This file is indexed.

/usr/lib/python3/dist-packages/apscheduler/schedulers/twisted.py is in python3-apscheduler 3.4.0-2ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from __future__ import absolute_import

from functools import wraps

from apscheduler.schedulers.base import BaseScheduler
from apscheduler.util import maybe_ref

try:
    from twisted.internet import reactor as default_reactor
except ImportError:  # pragma: nocover
    raise ImportError('TwistedScheduler requires Twisted installed')


def run_in_reactor(func):
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        self._reactor.callFromThread(func, self, *args, **kwargs)
    return wrapper


class TwistedScheduler(BaseScheduler):
    """
    A scheduler that runs on a Twisted reactor.

    Extra options:

    =========== ========================================================
    ``reactor`` Reactor instance to use (defaults to the global reactor)
    =========== ========================================================
    """

    _reactor = None
    _delayedcall = None

    def _configure(self, config):
        self._reactor = maybe_ref(config.pop('reactor', default_reactor))
        super(TwistedScheduler, self)._configure(config)

    @run_in_reactor
    def shutdown(self, wait=True):
        super(TwistedScheduler, self).shutdown(wait)
        self._stop_timer()

    def _start_timer(self, wait_seconds):
        self._stop_timer()
        if wait_seconds is not None:
            self._delayedcall = self._reactor.callLater(wait_seconds, self.wakeup)

    def _stop_timer(self):
        if self._delayedcall and self._delayedcall.active():
            self._delayedcall.cancel()
            del self._delayedcall

    @run_in_reactor
    def wakeup(self):
        self._stop_timer()
        wait_seconds = self._process_jobs()
        self._start_timer(wait_seconds)

    def _create_default_executor(self):
        from apscheduler.executors.twisted import TwistedExecutor
        return TwistedExecutor()