/usr/lib/python2.7/dist-packages/kombu/transport/couchdb.py is in python-kombu 3.0.33-1ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | """
kombu.transport.couchdb
=======================
CouchDB transport.
:copyright: (c) 2010 - 2013 by David Clymer.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
import socket
from anyjson import loads, dumps
from kombu.five import Empty
from kombu.utils import uuid4
from kombu.utils.encoding import bytes_to_str
from . import virtual
try:
import couchdb
except ImportError: # pragma: no cover
couchdb = None # noqa
DEFAULT_PORT = 5984
DEFAULT_DATABASE = 'kombu_default'
__author__ = 'David Clymer <david@zettazebra.com>'
def create_message_view(db):
from couchdb import design
view = design.ViewDefinition('kombu', 'messages', """
function (doc) {
if (doc.queue && doc.payload)
emit(doc.queue, doc);
}
""")
if not view.get_doc(db):
view.sync(db)
class Channel(virtual.Channel):
_client = None
view_created = False
def _put(self, queue, message, **kwargs):
self.client.save({'_id': uuid4().hex,
'queue': queue,
'payload': dumps(message)})
def _get(self, queue):
result = self._query(queue, limit=1)
if not result:
raise Empty()
item = result.rows[0].value
self.client.delete(item)
return loads(bytes_to_str(item['payload']))
def _purge(self, queue):
result = self._query(queue)
for item in result:
self.client.delete(item.value)
return len(result)
def _size(self, queue):
return len(self._query(queue))
def _open(self):
conninfo = self.connection.client
dbname = conninfo.virtual_host
proto = conninfo.ssl and 'https' or 'http'
if not dbname or dbname == '/':
dbname = DEFAULT_DATABASE
port = conninfo.port or DEFAULT_PORT
server = couchdb.Server('%s://%s:%s/' % (proto,
conninfo.hostname,
port))
# Use username and password if avaliable
try:
if conninfo.userid:
server.resource.credentials = (conninfo.userid,
conninfo.password)
except AttributeError:
pass
try:
return server[dbname]
except couchdb.http.ResourceNotFound:
return server.create(dbname)
def _query(self, queue, **kwargs):
if not self.view_created:
# if the message view is not yet set up, we'll need it now.
create_message_view(self.client)
self.view_created = True
return self.client.view('kombu/messages', key=queue, **kwargs)
@property
def client(self):
if self._client is None:
self._client = self._open()
return self._client
class Transport(virtual.Transport):
Channel = Channel
polling_interval = 1
default_port = DEFAULT_PORT
connection_errors = (
virtual.Transport.connection_errors + (
socket.error,
getattr(couchdb, 'HTTPError', None),
getattr(couchdb, 'ServerError', None),
getattr(couchdb, 'Unauthorized', None),
)
)
channel_errors = (
virtual.Transport.channel_errors + (
getattr(couchdb, 'HTTPError', None),
getattr(couchdb, 'ServerError', None),
getattr(couchdb, 'PreconditionFailed', None),
getattr(couchdb, 'ResourceConflict', None),
getattr(couchdb, 'ResourceNotFound', None),
)
)
driver_type = 'couchdb'
driver_name = 'couchdb'
def __init__(self, *args, **kwargs):
if couchdb is None:
raise ImportError('Missing couchdb library (pip install couchdb)')
super(Transport, self).__init__(*args, **kwargs)
def driver_version(self):
return couchdb.__version__
|