/usr/lib/python2.7/dist-packages/apscheduler/jobstores/zookeeper.py is in python-apscheduler 3.4.0-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 | from __future__ import absolute_import
import os
from datetime import datetime
from pytz import utc
from kazoo.exceptions import NoNodeError, NodeExistsError
from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
from apscheduler.job import Job
try:
import cPickle as pickle
except ImportError: # pragma: nocover
import pickle
try:
from kazoo.client import KazooClient
except ImportError: # pragma: nocover
raise ImportError('ZooKeeperJobStore requires Kazoo installed')
class ZooKeeperJobStore(BaseJobStore):
"""
Stores jobs in a ZooKeeper tree. Any leftover keyword arguments are directly passed to
kazoo's `KazooClient
<http://kazoo.readthedocs.io/en/latest/api/client.html>`_.
Plugin alias: ``zookeeper``
:param str path: path to store jobs in
:param client: a :class:`~kazoo.client.KazooClient` instance to use instead of
providing connection arguments
:param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
highest available
"""
def __init__(self, path='/apscheduler', client=None, close_connection_on_exit=False,
pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
super(ZooKeeperJobStore, self).__init__()
self.pickle_protocol = pickle_protocol
self.close_connection_on_exit = close_connection_on_exit
if not path:
raise ValueError('The "path" parameter must not be empty')
self.path = path
if client:
self.client = maybe_ref(client)
else:
self.client = KazooClient(**connect_args)
self._ensured_path = False
def _ensure_paths(self):
if not self._ensured_path:
self.client.ensure_path(self.path)
self._ensured_path = True
def start(self, scheduler, alias):
super(ZooKeeperJobStore, self).start(scheduler, alias)
if not self.client.connected:
self.client.start()
def lookup_job(self, job_id):
self._ensure_paths()
node_path = os.path.join(self.path, job_id)
try:
content, _ = self.client.get(node_path)
doc = pickle.loads(content)
job = self._reconstitute_job(doc['job_state'])
return job
except:
return None
def get_due_jobs(self, now):
timestamp = datetime_to_utc_timestamp(now)
jobs = [job_def['job'] for job_def in self._get_jobs()
if job_def['next_run_time'] is not None and job_def['next_run_time'] <= timestamp]
return jobs
def get_next_run_time(self):
next_runs = [job_def['next_run_time'] for job_def in self._get_jobs()
if job_def['next_run_time'] is not None]
return utc_timestamp_to_datetime(min(next_runs)) if len(next_runs) > 0 else None
def get_all_jobs(self):
jobs = [job_def['job'] for job_def in self._get_jobs()]
self._fix_paused_jobs_sorting(jobs)
return jobs
def add_job(self, job):
self._ensure_paths()
node_path = os.path.join(self.path, str(job.id))
value = {
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': job.__getstate__()
}
data = pickle.dumps(value, self.pickle_protocol)
try:
self.client.create(node_path, value=data)
except NodeExistsError:
raise ConflictingIdError(job.id)
def update_job(self, job):
self._ensure_paths()
node_path = os.path.join(self.path, str(job.id))
changes = {
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': job.__getstate__()
}
data = pickle.dumps(changes, self.pickle_protocol)
try:
self.client.set(node_path, value=data)
except NoNodeError:
raise JobLookupError(job.id)
def remove_job(self, job_id):
self._ensure_paths()
node_path = os.path.join(self.path, str(job_id))
try:
self.client.delete(node_path)
except NoNodeError:
raise JobLookupError(job_id)
def remove_all_jobs(self):
try:
self.client.delete(self.path, recursive=True)
except NoNodeError:
pass
self._ensured_path = False
def shutdown(self):
if self.close_connection_on_exit:
self.client.stop()
self.client.close()
def _reconstitute_job(self, job_state):
job_state = job_state
job = Job.__new__(Job)
job.__setstate__(job_state)
job._scheduler = self._scheduler
job._jobstore_alias = self._alias
return job
def _get_jobs(self):
self._ensure_paths()
jobs = []
failed_job_ids = []
all_ids = self.client.get_children(self.path)
for node_name in all_ids:
try:
node_path = os.path.join(self.path, node_name)
content, _ = self.client.get(node_path)
doc = pickle.loads(content)
job_def = {
'job_id': node_name,
'next_run_time': doc['next_run_time'] if doc['next_run_time'] else None,
'job_state': doc['job_state'],
'job': self._reconstitute_job(doc['job_state']),
'creation_time': _.ctime
}
jobs.append(job_def)
except:
self._logger.exception('Unable to restore job "%s" -- removing it' % node_name)
failed_job_ids.append(node_name)
# Remove all the jobs we failed to restore
if failed_job_ids:
for failed_id in failed_job_ids:
self.remove_job(failed_id)
paused_sort_key = datetime(9999, 12, 31, tzinfo=utc)
return sorted(jobs, key=lambda job_def: (job_def['job'].next_run_time or paused_sort_key,
job_def['creation_time']))
def __repr__(self):
self._logger.exception('<%s (client=%s)>' % (self.__class__.__name__, self.client))
return '<%s (client=%s)>' % (self.__class__.__name__, self.client)
|