/usr/lib/python3/dist-packages/postgresql/notifyman.py is in python3-postgresql 1.1.0-1build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 | ##
# .notifyman - Receive and manage NOTIFY events.
##
"""
Notification Management Tools
Primarily this module houses the `NotificationManager` class which provides an
iterator for a NOTIFY event loop against a set of connections.
>>> import postgresql
>>> db = postgresql.open(...)
>>> from postgresql.notifyman import NotificationManager
>>> nm = NotificationManager(db, timeout = 10) # idle events every 10 seconds
>>> for x in nm:
... if x is None:
... # idle event
... ...
... db, notifies = x
... for channel, payload, pid in notifies:
... ...
"""
from time import time
from select import select
from itertools import chain
class NotificationManager(object):
"""
A class for managing the asynchronous notifications received by a
set of connections.
Instances provide the iterator for an event loop that responds to NOTIFYs
received by the connections being watched. There is no thread safety, so
when a connection is being managed, it should not be used concurrently in
other threads while being managed.
"""
__slots__ = (
'connections',
'garbage',
'incoming',
'timeout',
'_last_time',
'_pulled',
)
def __init__(self, *connections, timeout = None):
self.settimeout(timeout)
self.connections = set(connections)
# Connections that failed.
self.garbage = set()
# Used to store NOTIFYs consumed from the connections
self.incoming = None
self._last_time = None
# connection -> sequence of NOTIFYs
self._pulled = dict()
# Check the wire *and* wait for new messages.
def _wait_on_wires(self, time = time, select = select):
if self.timeout == 0:
# We're polling.
max_duration = 0
else:
# If timeout is None, we don't issue idle events, but
# we still cycle in case the timeout is changed.
if self._last_time is not None:
max_duration = (self.timeout or 10) - (time() - self._last_time)
if max_duration < 0:
max_duration = 0
else:
self._last_time = time()
max_duration = self.timeout or 10
# Connections already marked as "bad" should not be checked.
check = self.connections - self.garbage
for db in check:
if db.closed:
self.connections.remove(db)
self.garbage.add(db)
check = self.connections - self.garbage
r, w, x = select(check, (), check, max_duration)
# Make sure the connection's _notifies get filled.
for db in r:
# Collect any pending events.
try:
# Even if db is in a failed transaction, this
# 'null' command will succeed.
# (only connection failures blow up)
db.execute('')
except Exception:
# failed to collect notifies; put in exception list.
# It is very unlikely that this is *not* a FATAL error.
x.append(db)
self.trash(x)
def trash(self, connections):
"""
Remove the given connections from the set of good connections, and add
them to the `garbage` set.
This method can be overridden by subclasses to take a callback approach
to connection failures.
"""
# Identify the bad connections.
self.garbage.update(connections)
self.connections.difference_update(connections)
def queue(self, db, notifies):
"""
Queue the notifies for the specified connection. Upon success, the
This method can be overridden by subclasses to take a callback approach
to notification management.
"""
l = self._pulled.setdefault(db, list())
l.extend(notifies)
# Check the connection's _notifies list; just scan everything.
def _pull_from_connections(self):
for db in self.connections:
if not db._notifies:
# nothing queued up, look at the next connection
continue
# Pull notifies into the NotificationManager
decode = db.typio.decode
notifies = [
(decode(x.channel), decode(x.payload), x.pid)
for x in db._notifies
]
self.queue(db, notifies)
del db._notifies[:len(notifies)]
# "Append" the pulled NOTIFYs to the 'incoming' iterator.
def _queue_next(self):
new_seqs = []
for db in self._pulled:
decode = db.typio.decode
new_seqs.append((db, self._pulled[db]))
if new_seqs:
if self.incoming:
# Already have incoming; not an expected condition,
# but let's compensate.
self.incoming, self._pulled = chain(self.incoming, iter(new_seqs)), {}
else:
self.incoming, self._pulled = iter(new_seqs), {}
elif self.incoming is None:
# Use this to trigger the StopIteration case of zero-timeout.
self.incoming, self._pulled = iter(()), {}
def _timedout(self, time = time):
# Idles are guaranteed to occur, but make sure that
# __next__ has a chance to check the connections and the wires.
now = time()
if self._last_time is None:
self._last_time = now
elif self.timeout and now >= (self._last_time + self.timeout):
# Set last_time to None in case the timeout is so low
# that this condition keeps NOTIFYs from being seen.
self._last_time = None
# Signal timeout.
return True
else:
# toggle back to None.
self._last_time = None
return False
def settimeout(self, seconds):
"""
Set the maximum duration, in seconds, for waiting for NOTIFYs on the
set of managed connections. The given `seconds` argument can be a number
or `None`.
A timeout of `None` means no timeout, and "idle" events will never
occur.
A timeout of `0` means to never wait for NOTIFYs. This has the effect of
a StopIteration being raised by `__next__` when there are no more
Notifications available for any of the connections in the set. "Idle"
events will never occur in this situation as well.
A timeout greater than zero means to emit `None` as "idle" events into
the loop at the specified interval. Idle events are guaranteed to occur.
"""
if seconds is not None and seconds < 0:
raise ValueError("cannot set timeout less than zero")
self.timeout = seconds
def gettimeout(self):
'Get the timeout.'
return self.timeout
def __iter__(self):
return self
def __next__(self, time = time):
checked_wire = True
# Loop until NOTIFY received or timeout.
while True:
if self.incoming is not None:
try:
return next(self.incoming)
except StopIteration:
# Nothing more in this incoming.
self.incoming = None
# Allow a zero timeout to be used to indicate
# that there are no NOTIFYs to be read.
# This can be used to poll a set of
# connections instead of listening.
if self.timeout == 0 or not self.connections:
raise
# timeout happened? yield the "idle" event.
# This check **must** happen after .incoming is checked.
# Never emit idle when there are real events.
if self._timedout():
return None
if not checked_wire and self.connections:
# Nothing queued up, check connections if any.
self._wait_on_wires()
checked_wire = True
else:
checked_wire = False
self._pull_from_connections()
self._queue_next()
|