This file is indexed.

/usr/lib/python2.7/dist-packages/kombu/transport/beanstalk.py is in python-kombu 3.0.33-1ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""
kombu.transport.beanstalk
=========================

Beanstalk transport.

:copyright: (c) 2010 - 2013 by David Ziegler.
:license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

import socket

from anyjson import loads, dumps

from kombu.five import Empty
from kombu.utils.encoding import bytes_to_str

from . import virtual

try:
    import beanstalkc
except ImportError:  # pragma: no cover
    beanstalkc = None  # noqa

DEFAULT_PORT = 11300

__author__ = 'David Ziegler <david.ziegler@gmail.com>'


class Channel(virtual.Channel):
    _client = None

    def _parse_job(self, job):
        item, dest = None, None
        if job:
            try:
                item = loads(bytes_to_str(job.body))
                dest = job.stats()['tube']
            except Exception:
                job.bury()
            else:
                job.delete()
        else:
            raise Empty()
        return item, dest

    def _put(self, queue, message, **kwargs):
        extra = {}
        priority = message['properties']['delivery_info']['priority']
        ttr = message['properties'].get('ttr')
        if ttr is not None:
            extra['ttr'] = ttr

        self.client.use(queue)
        self.client.put(dumps(message), priority=priority, **extra)

    def _get(self, queue):
        if queue not in self.client.watching():
            self.client.watch(queue)

        [self.client.ignore(active) for active in self.client.watching()
         if active != queue]

        job = self.client.reserve(timeout=1)
        item, dest = self._parse_job(job)
        return item

    def _get_many(self, queues, timeout=1):
        # timeout of None will cause beanstalk to timeout waiting
        # for a new request
        if timeout is None:
            timeout = 1

        watching = self.client.watching()

        [self.client.watch(active) for active in queues
         if active not in watching]

        [self.client.ignore(active) for active in watching
         if active not in queues]

        job = self.client.reserve(timeout=timeout)
        return self._parse_job(job)

    def _purge(self, queue):
        if queue not in self.client.watching():
            self.client.watch(queue)

        [self.client.ignore(active)
         for active in self.client.watching()
         if active != queue]
        count = 0
        while 1:
            job = self.client.reserve(timeout=1)
            if job:
                job.delete()
                count += 1
            else:
                break
        return count

    def _size(self, queue):
        return 0

    def _open(self):
        conninfo = self.connection.client
        host = conninfo.hostname or 'localhost'
        port = conninfo.port or DEFAULT_PORT
        conn = beanstalkc.Connection(host=host, port=port)
        conn.connect()
        return conn

    def close(self):
        if self._client is not None:
            return self._client.close()
        super(Channel, self).close()

    @property
    def client(self):
        if self._client is None:
            self._client = self._open()
        return self._client


class Transport(virtual.Transport):
    Channel = Channel

    polling_interval = 1
    default_port = DEFAULT_PORT
    connection_errors = (
        virtual.Transport.connection_errors + (
            socket.error, IOError,
            getattr(beanstalkc, 'SocketError', None),
        )
    )
    channel_errors = (
        virtual.Transport.channel_errors + (
            socket.error, IOError,
            getattr(beanstalkc, 'SocketError', None),
            getattr(beanstalkc, 'BeanstalkcException', None),
        )
    )
    driver_type = 'beanstalk'
    driver_name = 'beanstalkc'

    def __init__(self, *args, **kwargs):
        if beanstalkc is None:
            raise ImportError(
                'Missing beanstalkc library (pip install beanstalkc)')
        super(Transport, self).__init__(*args, **kwargs)

    def driver_version(self):
        return beanstalkc.__version__