This file is indexed.

/usr/lib/python2.7/dist-packages/kombu/transport/couchdb.py is in python-kombu 3.0.33-1ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
kombu.transport.couchdb
=======================

CouchDB transport.

:copyright: (c) 2010 - 2013 by David Clymer.
:license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

import socket

from anyjson import loads, dumps

from kombu.five import Empty
from kombu.utils import uuid4
from kombu.utils.encoding import bytes_to_str

from . import virtual

# The CouchDB driver is an optional dependency: when it cannot be imported the
# module still loads with ``couchdb`` bound to None, and ``Transport.__init__``
# raises ImportError at instantiation time instead.
try:
    import couchdb
except ImportError:  # pragma: no cover
    couchdb = None   # noqa

# Port used when the connection info does not provide one.
DEFAULT_PORT = 5984
# Database name used when the connection's virtual host is empty or '/'.
DEFAULT_DATABASE = 'kombu_default'

__author__ = 'David Clymer <david@zettazebra.com>'


def create_message_view(db):
    """Make sure the ``kombu/messages`` view is present in ``db``.

    The view's map function emits one row per document that has both a
    ``queue`` and a ``payload`` field, keyed by the queue name and carrying
    the whole document as its value.

    :param db: database handle passed to the view definition's
        ``get_doc`` and ``sync`` calls.
    """
    from couchdb import design

    map_function = """
        function (doc) {
          if (doc.queue && doc.payload)
            emit(doc.queue, doc);
        }
        """
    messages_view = design.ViewDefinition('kombu', 'messages', map_function)

    # Nothing to do when the design document can already be retrieved.
    if messages_view.get_doc(db):
        return
    messages_view.sync(db)


class Channel(virtual.Channel):
    """Virtual channel that keeps queued messages as CouchDB documents.

    Each message is saved as a document carrying the queue name and the
    serialized payload; queues are read back through the ``kombu/messages``
    view, keyed by queue name.
    """

    # Cached database handle; populated on first access of ``client``.
    _client = None

    # Set to True on the instance once ``create_message_view`` has been run.
    view_created = False

    def _put(self, queue, message, **kwargs):
        """Save ``message`` as a new document tagged with ``queue``."""
        database = self.client
        database.save({
            '_id': uuid4().hex,
            'queue': queue,
            'payload': dumps(message),
        })

    def _get(self, queue):
        """Remove one message from ``queue`` and return it deserialized.

        :raises Empty: when the view returns no rows for ``queue``.
        """
        hits = self._query(queue, limit=1)
        if not hits:
            raise Empty()

        document = hits.rows[0].value
        # The document is deleted before its payload is decoded.
        self.client.delete(document)
        payload = bytes_to_str(document['payload'])
        return loads(payload)

    def _purge(self, queue):
        """Delete every document found for ``queue``; return how many."""
        matches = self._query(queue)
        for row in matches:
            self.client.delete(row.value)
        return len(matches)

    def _size(self, queue):
        """Return the number of rows the view yields for ``queue``."""
        pending = self._query(queue)
        return len(pending)

    def _open(self):
        """Connect to the server and return the database to use.

        The URL is assembled from the connection's ``ssl``, ``hostname`` and
        ``port`` settings (``DEFAULT_PORT`` when no port is given).  The
        database is named after the virtual host, or ``DEFAULT_DATABASE``
        when that is empty or ``'/'``, and is created if the server reports
        it as not found.
        """
        params = self.connection.client
        database_name = params.virtual_host
        scheme = 'https' if params.ssl else 'http'
        if not database_name or database_name == '/':
            database_name = DEFAULT_DATABASE
        port = params.port or DEFAULT_PORT
        url = '%s://%s:%s/' % (scheme, params.hostname, port)
        server = couchdb.Server(url)

        # Use username and password if available.  An AttributeError raised
        # anywhere in this step is ignored -- presumably to tolerate objects
        # lacking these attributes; confirm before narrowing.
        try:
            if params.userid:
                server.resource.credentials = (params.userid,
                                               params.password)
        except AttributeError:
            pass

        try:
            return server[database_name]
        except couchdb.http.ResourceNotFound:
            return server.create(database_name)

    def _query(self, queue, **kwargs):
        """Query the ``kombu/messages`` view for rows keyed by ``queue``.

        Extra keyword arguments (e.g. ``limit``) are passed straight through
        to the view call.
        """
        if not self.view_created:
            # First query on this channel: ensure the message view exists.
            create_message_view(self.client)
            self.view_created = True
        return self.client.view('kombu/messages', key=queue, **kwargs)

    @property
    def client(self):
        """Database handle, opened on first access and cached afterwards."""
        database = self._client
        if database is None:
            database = self._client = self._open()
        return database


def _couchdb_errors(*names):
    """Return the driver exception classes named by ``names``, in order.

    Names that cannot be resolved (the driver is not installed, or it does
    not expose that attribute) are skipped, so the result is always safe to
    use inside an ``except`` clause.
    """
    candidates = (getattr(couchdb, name, None) for name in names)
    return tuple(exc for exc in candidates if exc is not None)


class Transport(virtual.Transport):
    """Kombu transport that uses :class:`Channel` to talk to CouchDB."""

    Channel = Channel

    # NOTE(review): presumably seconds between polls -- confirm against
    # ``virtual.Transport``.
    polling_interval = 1
    default_port = DEFAULT_PORT

    # Unresolvable driver exceptions are left out instead of being stored as
    # None: on Python 3 a None entry in an ``except`` tuple raises TypeError
    # while the exception is being matched.
    connection_errors = (
        virtual.Transport.connection_errors +
        (socket.error,) +
        _couchdb_errors('HTTPError', 'ServerError', 'Unauthorized')
    )
    channel_errors = (
        virtual.Transport.channel_errors +
        _couchdb_errors('HTTPError',
                        'ServerError',
                        'PreconditionFailed',
                        'ResourceConflict',
                        'ResourceNotFound')
    )
    driver_type = 'couchdb'
    driver_name = 'couchdb'

    def __init__(self, *args, **kwargs):
        """Initialise the transport.

        :raises ImportError: when the ``couchdb`` library is not installed.
        """
        if couchdb is None:
            raise ImportError('Missing couchdb library (pip install couchdb)')
        super(Transport, self).__init__(*args, **kwargs)

    def driver_version(self):
        """Return the version string reported by the ``couchdb`` module."""
        return couchdb.__version__