This file is indexed.

/usr/lib/python3/dist-packages/kombu/async/semaphore.py is in python3-kombu 3.0.33-1ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
# -*- coding: utf-8 -*-
"""
kombu.async.semaphore
=====================

Semaphores and concurrency primitives.

"""
from __future__ import absolute_import

from collections import deque

__all__ = ['DummyLock', 'LaxBoundedSemaphore']


class LaxBoundedSemaphore(object):
    """Asynchronous Bounded Semaphore.

    Lax means that the value will stay within the specified
    range even if released more times than it was acquired.

    Example:

        >>> from future import print_statement as printf
        # ^ ignore: just fooling stupid pyflakes

        >>> x = LaxBoundedSemaphore(2)

        >>> x.acquire(printf, 'HELLO 1')
        HELLO 1

        >>> x.acquire(printf, 'HELLO 2')
        HELLO 2

        >>> x.acquire(printf, 'HELLO 3')
        >>> x._waiters   # private, do not access directly
        [print, ('HELLO 3', )]

        >>> x.release()
        HELLO 3

    """

    def __init__(self, value):
        self.initial_value = self.value = value
        self._waiting = deque()
        self._add_waiter = self._waiting.append
        self._pop_waiter = self._waiting.popleft

    def acquire(self, callback, *partial_args):
        """Acquire semaphore, applying ``callback`` if
        the resource is available.

        :param callback: The callback to apply.
        :param \*partial_args: partial arguments to callback.

        """
        value = self.value
        if value <= 0:
            self._add_waiter((callback, partial_args))
            return False
        else:
            self.value = max(value - 1, 0)
            callback(*partial_args)
            return True

    def release(self):
        """Release semaphore.

        If there are any waiters this will apply the first waiter
        that is waiting for the resource (FIFO order).

        """
        try:
            waiter, args = self._pop_waiter()
        except IndexError:
            self.value = min(self.value + 1, self.initial_value)
        else:
            waiter(*args)

    def grow(self, n=1):
        """Change the size of the semaphore to accept more users."""
        self.initial_value += n
        self.value += n
        [self.release() for _ in range(n)]

    def shrink(self, n=1):
        """Change the size of the semaphore to accept less users."""
        self.initial_value = max(self.initial_value - n, 0)
        self.value = max(self.value - n, 0)

    def clear(self):
        """Reset the semaphore, which also wipes out any waiting callbacks."""
        self._waiting.clear()
        self.value = self.initial_value

    def __repr__(self):
        return '<{0} at {1:#x} value:{2} waiting:{3}>'.format(
            self.__class__.__name__, id(self), self.value, len(self._waiting),
        )


class DummyLock(object):
    """Pretending to be a lock."""

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        pass