This file is indexed.

/usr/share/pyshared/celery/concurrency/base.py is in python-celery 2.4.6-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import logging
import os
import sys
import time
import traceback

from functools import partial

from .. import log
from ..datastructures import ExceptionInfo
from ..utils import timer2
from ..utils.encoding import safe_repr


def apply_target(target, args=(), kwargs=None, callback=None,
        accept_callback=None, pid=None):
    """Run ``target(*args, **kwargs)`` and hand the result to ``callback``.

    :param target: callable to execute.
    :param args: positional arguments for ``target``.
    :param kwargs: keyword arguments for ``target`` (``None`` means none).
    :keyword callback: called with the return value of ``target``;
        skipped when not given.
    :keyword accept_callback: if given, called as
        ``accept_callback(pid, timestamp)`` before ``target`` is run.
    :keyword pid: process id reported to ``accept_callback``; the id of
        the current process is used when this is not set.

    Exceptions raised by ``target`` or by the callbacks are not caught
    here.

    """
    if accept_callback:
        accept_callback(pid or os.getpid(), time.time())
    result = target(*args, **(kwargs or {}))
    # ``callback`` is optional in the signature, so do not call ``None``.
    if callback is not None:
        callback(result)


class BasePool(object):
    """Common interface and state bookkeeping for task pools.

    Subclasses customize behavior by overriding the ``on_*`` hooks,
    :meth:`terminate_job` and :meth:`_get_info`; the versions defined
    here do nothing (or raise :exc:`NotImplementedError`).

    """
    # Lifecycle states stored in ``_state``.
    RUN = 0x1
    CLOSE = 0x2
    TERMINATE = 0x3

    # Timer class associated with this pool type.
    Timer = timer2.Timer

    # Capability flags; nothing in this class reads them.
    # NOTE(review): presumably consulted by the worker to decide whether
    # signals/rlimits can be used with this pool and whether it is
    # greenlet based -- confirm against the callers.
    signal_safe = True
    rlimit_safe = True
    is_green = False

    # One of RUN/CLOSE/TERMINATE; ``None`` until start/stop/terminate.
    _state = None
    # Not used in this class.
    # NOTE(review): presumably set by subclasses -- confirm.
    _pool = None

    def __init__(self, limit=None, putlocks=True, logger=None, **options):
        """Store the pool configuration.

        :keyword limit: value reported by :attr:`num_processes`.
        :keyword putlocks: passed to :meth:`on_apply` as ``waitforslot``.
        :keyword logger: logger to use; defaults to
            ``log.get_default_logger()``.
        :keyword \*\*options: kept as-is in ``self.options``.

        """
        self.limit = limit
        self.putlocks = putlocks
        self.logger = logger or log.get_default_logger()
        self.options = options
        # Cached once so apply_async can skip building debug messages.
        self.does_debug = self.logger.isEnabledFor(logging.DEBUG)

    def on_start(self):
        """Hook called by :meth:`start`; does nothing by default."""
        pass

    def on_stop(self):
        """Hook called by :meth:`stop`; does nothing by default."""
        pass

    def on_apply(self, *args, **kwargs):
        """Hook called by :meth:`apply_async` to execute the target.

        Does nothing (returns ``None``) by default.

        """
        pass

    def on_terminate(self):
        """Hook called by :meth:`terminate`; does nothing by default."""
        pass

    def terminate_job(self, pid):
        """Not supported by the base pool.

        :raises NotImplementedError: always, unless overridden.

        """
        raise NotImplementedError(
                "%s does not implement terminate_job" % (self.__class__, ))

    def stop(self):
        """Mark the pool as closing, run :meth:`on_stop`, then mark it
        as terminated."""
        self._state = self.CLOSE
        self.on_stop()
        self._state = self.TERMINATE

    def terminate(self):
        """Mark the pool as terminated, then run :meth:`on_terminate`."""
        self._state = self.TERMINATE
        self.on_terminate()

    def start(self):
        """Run :meth:`on_start`, then mark the pool as running."""
        self.on_start()
        self._state = self.RUN

    def apply_async(self, target, args=None, kwargs=None, callback=None,
            errback=None, accept_callback=None, timeout_callback=None,
            soft_timeout=None, timeout=None, **compat):
        """Equivalent of the :func:`apply` built-in function.

        Callbacks should optimally return as soon as possible since
        otherwise the thread which handles the result will get blocked.

        ``callback`` and ``errback`` are wrapped by :meth:`on_ready`,
        ``errback`` additionally by :meth:`on_worker_error`, and the
        call is delegated to :meth:`on_apply`, whose return value is
        returned.  Extra keyword arguments (``**compat``) are accepted
        and ignored.

        """
        args = args or []
        kwargs = kwargs or {}

        on_ready = partial(self.on_ready, callback, errback)
        on_worker_error = partial(self.on_worker_error, errback)

        if self.does_debug:
            self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)",
                            target, safe_repr(args), safe_repr(kwargs))

        return self.on_apply(target, args, kwargs,
                             callback=on_ready,
                             accept_callback=accept_callback,
                             timeout_callback=timeout_callback,
                             error_callback=on_worker_error,
                             waitforslot=self.putlocks,
                             soft_timeout=soft_timeout,
                             timeout=timeout)

    def on_ready(self, callback, errback, ret_value):
        """What to do when a worker task is ready and its return value has
        been collected.

        An :class:`ExceptionInfo` result is sent to ``errback``, unless
        it wraps :exc:`SystemExit` or :exc:`KeyboardInterrupt`, which is
        re-raised.  Any other result is sent to ``callback``.

        """
        if isinstance(ret_value, ExceptionInfo):
            if isinstance(ret_value.exception, (
                    SystemExit, KeyboardInterrupt)):
                raise ret_value.exception
            self.safe_apply_callback(errback, ret_value)
        else:
            self.safe_apply_callback(callback, ret_value)

    def on_worker_error(self, errback, exc_info):
        """Forward a worker error to ``errback``.

        ``errback`` is optional in :meth:`apply_async`; when it is
        missing the error is logged rather than calling ``None``.

        """
        if errback:
            errback(exc_info)
        else:
            self.logger.error("Pool worker error (no errback): %r",
                              exc_info)

    def safe_apply_callback(self, fun, *args):
        """Call ``fun(*args)`` if ``fun`` is set, logging any exception.

        :exc:`BaseException` is caught, so even :exc:`SystemExit` and
        :exc:`KeyboardInterrupt` raised by the callback are logged
        instead of propagating to the caller.

        """
        if fun:
            try:
                fun(*args)
            except BaseException:
                self.logger.error("Pool callback raised exception: %s",
                                  traceback.format_exc(),
                                  exc_info=sys.exc_info())

    def _get_info(self):
        """Return the data exposed by :attr:`info`; empty by default."""
        return {}

    @property
    def info(self):
        """Result of :meth:`_get_info`."""
        return self._get_info()

    @property
    def active(self):
        """``True`` while the pool is in the ``RUN`` state."""
        return self._state == self.RUN

    @property
    def num_processes(self):
        """The ``limit`` the pool was created with."""
        return self.limit