This file is indexed.

/usr/share/pyshared/checkbox/dispatcher.py is in checkbox 0.13.7.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# Copyright 2010-2011 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

# Python 2 idiom: classes defined in this module without an explicit base
# become new-style classes, which the two-argument super() calls below need.
# The assignment has no effect on Python 3.
__metaclass__ = type

# Names exported by "from <module> import *".
__all__ = [
    "Dispatcher",
    "DispatcherList",
    "DispatcherQueue",
    ]

import logging

from itertools import product


class Event:
    """Payload of a published event.

    Holds the event type together with the positional and keyword
    arguments that a listener passes on to its handler.
    """

    def __init__(self, type, *args, **kwargs):
        # ``type`` shadows the builtin here, but the name is part of the
        # constructor signature and is therefore kept.
        self.type, self.args, self.kwargs = type, args, kwargs


class Listener:
    """Binding between an event type and the handler called for it.

    ``count`` is the number of times the handler may still be called;
    ``None`` means the number of calls is not limited.
    """

    def __init__(self, event_type, handler, count):
        self.event_type, self.handler, self.count = event_type, handler, count

    def notify(self, event):
        """Call the handler with the payload of the event, if still allowed.

        :param event: The event whose args and kwargs are passed to the
            handler.
        """
        exhausted = self.count is not None and not self.count
        if exhausted:
            return

        self.handler(*event.args, **event.kwargs)
        # A limited listener uses up one call; an unlimited listener has
        # a count of None, which is left untouched.
        if self.count:
            self.count -= 1


class ListenerList(Listener):
    """Event listener bound to several event types at once.

    The handler is called with one keyword argument per event type, each
    holding the first positional argument of the latest event of that type.
    """

    def __init__(self, *args, **kwargs):
        super(ListenerList, self).__init__(*args, **kwargs)
        # For this class, event_type holds an iterable of event types
        # rather than a single name.
        self.event_types = set(self.event_type)
        # Latest argument received for each event type, keyed by type.
        self.kwargs = {}

    def notify(self, event):
        """Record the event and call the handler once all types were seen.

        The handler is only called when every event type of this listener
        has been published at least once. A repeated event overwrites the
        argument stored for its type, so the handler always receives the
        latest one.

        :param event: The event whose first positional argument is stored
            under the event's type.
        """
        if self.count is not None and not self.count:
            return

        self.kwargs[event.type] = event.args[0]
        still_missing = self.event_types.difference(self.kwargs)
        if still_missing:
            return

        self.handler(**self.kwargs)
        # Only a limited listener (count not None) uses up a call.
        if self.count:
            self.count -= 1


class ListenerQueue(ListenerList):
    """Event listener that queues events until all its types were seen."""

    def notify(self, event):
        """Only notify the handler when all the events for this listener
        have been published by the dispatcher. Duplicate events are enqueued
        and dequeued only when all events have been published.

        :param event: The event whose first positional argument is enqueued
            under the event's type.
        """
        arg = event.args[0]
        # While this method is in use, self.kwargs maps each event type to
        # the list of arguments published for it so far.
        queue = self.kwargs.setdefault(event.type, [])

        # Strip duplicates from the queue.
        if arg not in queue:
            queue.append(arg)

        # Once the handler has been called, the queue
        # then behaves like a list using the latest events.
        if self.event_types.issubset(self.kwargs):
            # Rebind notify on this instance to the ListenerList
            # implementation, so later events skip the queueing above.
            self.notify = notify = super(ListenerQueue, self).notify
            keys = self.kwargs.keys()
            # Replay every combination of the queued arguments, with one
            # ListenerList.notify call per combination. That call stores
            # event.args[0] under event.type again before it reaches the
            # handler.
            for values in product(*self.kwargs.values()):
                self.kwargs = dict(zip(keys, values))
                notify(event)


class Dispatcher:
    """Register handlers and publish events for them identified by strings."""

    # Class used to build the listener wrapping each registered handler;
    # it can be overridden per instance through the constructor.
    listener_factory = Listener

    def __init__(self, listener_factory=None):
        """Create a dispatcher without any registered handler.

        :param listener_factory: Optionally, the callable used to build
            listeners instead of the class default.
        """
        # Maps each event type to the list of listeners registered for it.
        # Event types are removed again when their last listener goes away.
        self._event_listeners = {}

        if listener_factory is not None:
            self.listener_factory = listener_factory

    def registerHandler(self, event_type, handler, count=None):
        """Register an event handler and return its listener.

        :param event_type: The name of the event type to handle.
        :param handler: The function handling the given event type.
        :param count: Optionally, the number times to call the handler.
        """
        listener = self.listener_factory(event_type, handler, count)

        listeners = self._event_listeners.setdefault(event_type, [])
        listeners.append(listener)

        return listener

    def unregisterHandler(self, handler):
        """Unregister a handler from every event type it is registered for.

        :param handler: The handler to unregister.
        """
        # Iterate over a snapshot: event types left without listeners are
        # deleted from the mapping inside the loop, which is not allowed
        # while iterating over the live items view on Python 3.
        for event_type, listeners in list(self._event_listeners.items()):
            # Keep the listeners of all the *other* handlers.
            remaining = [
                listener for listener in listeners
                if listener.handler != handler]
            if remaining:
                self._event_listeners[event_type] = remaining
            else:
                del self._event_listeners[event_type]

    def unregisterListener(self, listener, event_type=None):
        """Unregister a listener.

        :param listener: The listener of the handler to unregister.
        :param event_type: Optionally, the event_type to unregister.
        :raises KeyError: If no listener is registered for the event type.
        :raises ValueError: If the listener is not registered for the
            event type.
        """
        if event_type is None:
            event_type = listener.event_type

        self._event_listeners[event_type].remove(listener)
        if not self._event_listeners[event_type]:
            del self._event_listeners[event_type]

    def publishEvent(self, event_type, *args, **kwargs):
        """Publish an event of a given type and notify all listeners.

        Errors raised while notifying a listener are logged and do not
        prevent the remaining listeners from being notified.

        :param event_type: The name of the event type to publish.
        :param args: Positional arguments to pass to the registered handlers.
        :param kwargs: Keyword arguments to pass to the registered handlers.
        """
        if event_type in self._event_listeners:
            event = Event(event_type, *args, **kwargs)
            # Iterate over a copy because exhausted listeners are removed
            # from the registered list inside the loop.
            for listener in list(self._event_listeners[event_type]):
                try:
                    listener.notify(event)
                    # A listener whose count reached zero has used up all
                    # its calls and is dropped.
                    if listener.count is not None and not listener.count:
                        self.unregisterListener(listener)
                except Exception:
                    # Not a bare except, so that KeyboardInterrupt and
                    # SystemExit still propagate to the caller.
                    logging.exception(
                        "Error running event handler for %r with args %r %r",
                        event_type, args, kwargs)


class DispatcherList(Dispatcher):
    """
    Register handlers and publish events for them identified by lists
    of strings.
    """

    # Listeners are notified for lists of events instead of single ones.
    listener_factory = ListenerList

    def registerHandler(self, event_types, handler, count=None):
        """See Dispatcher.

        :param event_types: A single event type, or a list or tuple of
            event types that the handler is registered for.
        """
        if not isinstance(event_types, (list, tuple)):
            event_types = (event_types,)

        # One listener is shared by all the given event types.
        listener = self.listener_factory(event_types, handler, count)
        for event_type in event_types:
            listeners = self._event_listeners.setdefault(event_type, [])
            # A type repeated in event_types must not register the listener
            # twice: it would be notified twice per event, and
            # unregisterListener would leave a stale registration behind.
            if listener not in listeners:
                listeners.append(listener)

        return listener

    def unregisterListener(self, listener):
        """See Dispatcher.

        The listener is removed from every event type it listens to.
        """
        for event_type in listener.event_types:
            super(DispatcherList, self).unregisterListener(
                listener, event_type)

    def publishEvent(self, event_type, arg):
        """See Dispatcher.

        :param event_type: The name of the event type to publish.
        :param arg: The single argument carried by the event.
        """
        super(DispatcherList, self).publishEvent(event_type, arg)


class DispatcherQueue(DispatcherList):
    """
    Register handlers and publish events for them identified by lists
    of strings in queue order.

    Listeners are built with ListenerQueue instead of ListenerList.
    """

    # Listener class used by the inherited registerHandler.
    listener_factory = ListenerQueue