# /usr/share/checkbox/plugins/jobs_prompt.py is in checkbox 0.13.7.
# This file is owned by root:root, with mode 0o644.
# The actual contents of the file can be viewed below.
#
# This file is part of Checkbox.
#
# Copyright 2010 Canonical Ltd.
#
# Checkbox is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Checkbox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
#
from checkbox.contrib.persist import Persist, MemoryBackend
from checkbox.job import JobStore, PASS, UNINITIATED, UNTESTED
from checkbox.properties import Int, Path
from checkbox.plugin import Plugin
from checkbox.user_interface import NEXT, PREV
class JobsPrompt(Plugin):
    """Plugin that stores reported jobs and prompts for them one by one.

    Jobs are kept in a ``JobStore`` built on the ``"jobs_prompt"`` root of
    the persist object.  All behaviour is driven by the reactor events
    hooked up in ``register``.
    """

    # Directory where messages are stored
    store_directory = Path(default="%(checkbox_data)s/store")

    # Maximum number of messages per directory
    store_directory_size = Int(default=1000)

    @property
    def persist(self):
        """Return the persist root used by this plugin.

        If no persist object was supplied through ``begin_persist``, one
        with a ``MemoryBackend`` is created on first access.
        """
        if self._persist is None:
            self._persist = Persist(backend=MemoryBackend())

        return self._persist.root_at("jobs_prompt")

    @property
    def store(self):
        """Return the ``JobStore``, creating it on first access."""
        if self._store is None:
            self._store = JobStore(self.persist, self.store_directory,
                                   self.store_directory_size)

        return self._store

    def register(self, manager):
        """Reset plugin state and attach the event handlers to the reactor."""
        super(JobsPrompt, self).register(manager)

        self._ignore = []
        self._persist = None
        self._store = None

        for (rt, rh) in [
                ("begin-persist", self.begin_persist),
                ("begin-recover", self.begin_recover),
                ("ignore-jobs", self.ignore_jobs),
                ("prompt-job", self.prompt_job),
                ("prompt-jobs", self.prompt_jobs),
                ("prompt-finish", self.prompt_finish),
                ("report", self.report),
                ("report-job", self.report_job)]:
            self._manager.reactor.call_on(rt, rh)

        # This should fire first thing during the gathering phase.
        self._manager.reactor.call_on("gather", self.begin_gather, -900)

        # This should fire last during gathering (i.e. after
        # all other gathering callbacks are finished)
        self._manager.reactor.call_on("gather", self.end_gather, 900)

    def begin_persist(self, persist):
        """Remember the persist object handed over by the reactor."""
        self._persist = persist

    def begin_recover(self, recover):
        """Delete all stored messages unless a recovery was requested."""
        if not recover:
            self.store.delete_all_messages()

    def begin_gather(self):
        """Turn off ``safe_file_closing`` on the store while gathering."""
        # Speed boost during the gathering phase. Not critical data anyway.
        self.store.safe_file_closing = False

    def end_gather(self):
        """Turn ``safe_file_closing`` back on once gathering has ended."""
        # Back to saving data very carefully once gathering is done.
        self.store.safe_file_closing = True

    def ignore_jobs(self, jobs):
        """Replace the collection of job identifiers to skip when prompting."""
        self._ignore = jobs

    def report_job(self, job):
        """Give *job* a default status, announce it and add it to the store.

        Fires ``"report-<plugin>"`` (using ``job["plugin"]``) before the job
        is stored.
        """
        # Update job
        job.setdefault("status", UNINITIATED)
        self._manager.reactor.fire("report-%s" % job["plugin"], job)

        self.store.add(job)

    def prompt_job(self, interface, job):
        """Prompt for a single *job* unless it is ignored or blocked.

        A job whose ``"description"`` (for type ``"suite"``) or ``"name"``
        (otherwise) is in the ignore collection is marked ``UNTESTED`` and
        not prompted.  A job with a ``"depends"`` entry is skipped silently
        when any stored message named in it has a status other than
        ``PASS``.  Otherwise ``"prompt-<plugin>"`` is fired.
        """
        attribute = "description" if job.get("type") == "suite" else "name"
        if job[attribute] in self._ignore:
            job["status"] = UNTESTED
        else:
            if "depends" in job:
                # Temporarily rewind the pending offset to read every
                # stored message, then restore the previous position.
                offset = self.store.get_pending_offset()
                self.store.set_pending_offset(0)
                messages = self.store.get_pending_messages()
                self.store.set_pending_offset(offset)

                # Skip if any message in the depends doesn't pass
                depends = job["depends"]
                for message in messages:
                    if message["name"] in depends \
                            and message["status"] != PASS:
                        return

            self._manager.reactor.fire(
                "prompt-%s" % job["plugin"], interface, job)

    def prompt_jobs(self, interface):
        """Walk through the pending jobs following ``interface.direction``.

        On each pass: when the direction is ``PREV`` the pending offset is
        stepped back (stopping if the store reports it cannot); one pending
        message is fetched (stopping if there is none); ``"prompt-job"`` is
        fired for it and the job is written back to the store; when the
        direction is ``NEXT`` the pending offset is advanced.
        """
        while True:
            if interface.direction == PREV:
                if not self.store.remove_pending_offset():
                    break

            messages = self.store.get_pending_messages(1)
            if not messages:
                break

            job = messages[0]
            self._manager.reactor.fire("prompt-job", interface, job)
            self.store.update(job)

            if interface.direction == NEXT:
                self.store.add_pending_offset()

    def prompt_finish(self, interface):
        """Delete all stored messages when finishing in the ``NEXT`` direction."""
        if interface.direction == NEXT:
            self.store.delete_all_messages()

    def report(self):
        """Fire the report events for tests, suites and attachments.

        Reads every message from offset 0, moves the pending offset past
        them, then fires ``"report-tests"`` (types ``"test"`` and
        ``"metric"``), ``"report-suites"`` and ``"report-attachments"``
        (attachments that carry a ``"data"`` key).
        """
        self.store.set_pending_offset(0)
        messages = self.store.get_pending_messages()
        self.store.add_pending_offset(len(messages))

        tests = [m for m in messages if m.get("type") in ("test", "metric")]
        self._manager.reactor.fire("report-tests", tests)

        suites = [m for m in messages if m.get("type") == "suite"]
        self._manager.reactor.fire("report-suites", suites)

        attachments = [m for m in messages
                       if m.get("type") == "attachment" and "data" in m]
        self._manager.reactor.fire("report-attachments", attachments)
# Module-level alias for the plugin class; presumably the name the Checkbox
# plugin loader looks up -- confirm against checkbox.plugin.
factory = JobsPrompt