/usr/lib/python2.7/dist-packages/kombu/transport/pyro.py is in python-kombu 3.0.33-1ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 | """
kombu.transport.pyro
======================
Pyro transport.
Requires the :mod:`Pyro4` library to be installed.
"""
from __future__ import absolute_import
import sys
from kombu.five import reraise
from kombu.utils import cached_property
from . import virtual
try:
import Pyro4 as pyro
from Pyro4.errors import NamingError
except ImportError: # pragma: no cover
pyro = NamingError = None # noqa
# Port passed to the Pyro name-server lookup; also exposed as
# ``Transport.default_port`` further down in this module.
DEFAULT_PORT = 9090
# Error-message template.  It is rendered with ``str.format`` and one
# positional argument, which must provide ``virtual_host`` and ``hostname``
# attributes.  The trailing backslashes suppress the newlines right after
# the opening quotes and before the closing quotes.
E_LOOKUP = """\
Unable to locate pyro nameserver {0.virtual_host} on host {0.hostname}\
"""
class Channel(virtual.Channel):
    """Channel that forwards every queue operation to ``shared_queues``.

    ``shared_queues`` is taken from the owning connection; each method
    below delegates to the matching method on that object.
    """

    @cached_property
    def shared_queues(self):
        """Return the ``shared_queues`` object of the owning connection."""
        return self.connection.shared_queues

    def queues(self):
        """Return the queue names reported by the shared queue object."""
        return self.shared_queues.get_queue_names()

    def _new_queue(self, queue, **kwargs):
        """Create ``queue`` unless its name is already listed.

        Extra keyword arguments are accepted and ignored.
        """
        if queue in self.queues():
            return
        self.shared_queues.new_queue(queue)

    def _queue_for(self, queue):
        """Make sure ``queue`` exists, then hand the same name back."""
        self._new_queue(queue)
        return queue

    def _get(self, queue, timeout=None):
        """Fetch one message from ``queue``, creating the queue if needed.

        ``timeout`` is part of the signature but is not used here.
        """
        name = self._queue_for(queue)
        return self.shared_queues._get(name)

    def _put(self, queue, message, **kwargs):
        """Store ``message`` on ``queue``, creating the queue if needed.

        Extra keyword arguments are accepted and ignored.
        """
        name = self._queue_for(queue)
        self.shared_queues._put(name, message)

    def _size(self, queue):
        """Return the size the shared queue object reports for ``queue``."""
        return self.shared_queues._size(queue)

    def _delete(self, queue, *args):
        """Delete ``queue``; extra positional arguments are ignored."""
        self.shared_queues._delete(queue)

    def _purge(self, queue):
        """Purge ``queue`` and return what the shared queue object returns."""
        return self.shared_queues._purge(queue)

    def after_reply_message_received(self, queue):
        """Do nothing; no cleanup is performed for reply queues."""
        pass
class Transport(virtual.Transport):
    """Virtual transport backed by a remote Pyro object.

    The remote object is registered in a Pyro name server under the
    connection's ``virtual_host`` name; the name server itself is looked
    up on the connection's ``hostname``.
    """

    Channel = Channel

    #: broker state is a class attribute, so every instance of this
    #: transport shares the same object.
    state = virtual.BrokerState()

    default_port = DEFAULT_PORT

    driver_type = driver_name = 'pyro'

    def _open(self):
        """Locate the name server and return a proxy for the shared queues.

        The name server is contacted on the port configured on the
        connection, falling back to :attr:`default_port` when the
        connection does not specify one.

        :returns: a ``pyro.Proxy`` for the URI registered under the
            connection's ``virtual_host``.
        :raises NamingError: when a ``NamingError`` occurs while locating
            the name server or looking up the name; it is re-raised with
            the :data:`E_LOOKUP` message and the original traceback.
        """
        conninfo = self.client
        # NOTE: this assigns process-wide Pyro configuration.
        pyro.config.HMAC_KEY = conninfo.virtual_host
        try:
            # Honour an explicitly configured port instead of always
            # using the default one.
            nameserver = pyro.locateNS(
                host=conninfo.hostname,
                port=conninfo.port or self.default_port)
            # name of registered pyro object
            uri = nameserver.lookup(conninfo.virtual_host)
            return pyro.Proxy(uri)
        except NamingError:
            reraise(NamingError, NamingError(E_LOOKUP.format(conninfo)),
                    sys.exc_info()[2])

    def driver_version(self):
        """Return the version string of the imported Pyro library."""
        return pyro.__version__

    @cached_property
    def shared_queues(self):
        """Proxy returned by :meth:`_open`, created on first access."""
        return self._open()
|