This file is indexed.

/usr/share/pyshared/nose/plugins/logcapture.py is in python-nose 1.1.2-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""
This plugin captures logging statements issued during test execution. When an
error or failure occurs, the captured log messages are attached to the running
test in the test.capturedLogging attribute, and displayed with the error failure
output. It is enabled by default but can be turned off with the option
``--nologcapture``.

You can filter captured logging statements with the ``--logging-filter`` option. 
If set, it specifies which logger(s) will be captured; loggers that do not match
will be passed. Example: specifying ``--logging-filter=sqlalchemy,myapp``
will ensure that only statements logged via sqlalchemy.engine, myapp
or myapp.foo.bar logger will be logged.

You can remove other installed logging handlers with the
``--logging-clear-handlers`` option.
"""

import logging
from logging.handlers import BufferingHandler
import threading

from nose.plugins.base import Plugin
from nose.util import anyp, ln, safe_str

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

log = logging.getLogger(__name__)

class FilterSet(object):
    def __init__(self, filter_components):
        self.inclusive, self.exclusive = self._partition(filter_components)

    # @staticmethod
    def _partition(components):
        inclusive, exclusive = [], []
        for component in components:
            if component.startswith('-'):
                exclusive.append(component[1:])
            else:
                inclusive.append(component)
        return inclusive, exclusive
    _partition = staticmethod(_partition)

    def allow(self, record):
        """returns whether this record should be printed"""
        if not self:
            # nothing to filter
            return True
        return self._allow(record) and not self._deny(record)

    # @staticmethod
    def _any_match(matchers, record):
        """return the bool of whether `record` starts with
        any item in `matchers`"""
        def record_matches_key(key):
            return record == key or record.startswith(key + '.')
        return anyp(bool, map(record_matches_key, matchers))
    _any_match = staticmethod(_any_match)

    def _allow(self, record):
        if not self.inclusive:
            return True
        return self._any_match(self.inclusive, record)

    def _deny(self, record):
        if not self.exclusive:
            return False
        return self._any_match(self.exclusive, record)


class MyMemoryHandler(BufferingHandler):
    def __init__(self, capacity, logformat, logdatefmt, filters):
        BufferingHandler.__init__(self, capacity)
        fmt = logging.Formatter(logformat, logdatefmt)
        self.setFormatter(fmt)
        self.filterset = FilterSet(filters)
    def flush(self):
        pass # do nothing
    def truncate(self):
        self.buffer = []
    def filter(self, record):
        return self.filterset.allow(record.name)
    def __getstate__(self):
        state = self.__dict__.copy()
        del state['lock']
        return state
    def __setstate__(self, state):
        self.__dict__.update(state)
        self.lock = threading.RLock()


class LogCapture(Plugin):
    """
    Log capture plugin. Enabled by default. Disable with --nologcapture.
    This plugin captures logging statements issued during test execution,
    appending any output captured to the error or failure output,
    should the test fail or raise an error.
    """
    enabled = True
    env_opt = 'NOSE_NOLOGCAPTURE'
    name = 'logcapture'
    score = 500
    logformat = '%(name)s: %(levelname)s: %(message)s'
    logdatefmt = None
    clear = False
    filters = ['-nose']

    def options(self, parser, env):
        """Register commandline options.
        """
        parser.add_option(
            "--nologcapture", action="store_false",
            default=not env.get(self.env_opt), dest="logcapture",
            help="Disable logging capture plugin. "
                 "Logging configurtion will be left intact."
                 " [NOSE_NOLOGCAPTURE]")
        parser.add_option(
            "--logging-format", action="store", dest="logcapture_format",
            default=env.get('NOSE_LOGFORMAT') or self.logformat,
            metavar="FORMAT",
            help="Specify custom format to print statements. "
                 "Uses the same format as used by standard logging handlers."
                 " [NOSE_LOGFORMAT]")
        parser.add_option(
            "--logging-datefmt", action="store", dest="logcapture_datefmt",
            default=env.get('NOSE_LOGDATEFMT') or self.logdatefmt,
            metavar="FORMAT",
            help="Specify custom date/time format to print statements. "
                 "Uses the same format as used by standard logging handlers."
                 " [NOSE_LOGDATEFMT]")
        parser.add_option(
            "--logging-filter", action="store", dest="logcapture_filters",
            default=env.get('NOSE_LOGFILTER'),
            metavar="FILTER",
            help="Specify which statements to filter in/out. "
                 "By default, everything is captured. If the output is too"
                 " verbose,\nuse this option to filter out needless output.\n"
                 "Example: filter=foo will capture statements issued ONLY to\n"
                 " foo or foo.what.ever.sub but not foobar or other logger.\n"
                 "Specify multiple loggers with comma: filter=foo,bar,baz.\n"
                 "If any logger name is prefixed with a minus, eg filter=-foo,\n"
                 "it will be excluded rather than included. Default: "
                 "exclude logging messages from nose itself (-nose)."
                 " [NOSE_LOGFILTER]\n")
        parser.add_option(
            "--logging-clear-handlers", action="store_true",
            default=False, dest="logcapture_clear",
            help="Clear all other logging handlers")

    def configure(self, options, conf):
        """Configure plugin.
        """
        self.conf = conf
        # Disable if explicitly disabled, or if logging is
        # configured via logging config file
        if not options.logcapture or conf.loggingConfig:
            self.enabled = False
        self.logformat = options.logcapture_format
        self.logdatefmt = options.logcapture_datefmt
        self.clear = options.logcapture_clear
        if options.logcapture_filters:
            self.filters = options.logcapture_filters.split(',')

    def setupLoghandler(self):
        # setup our handler with root logger
        root_logger = logging.getLogger()
        if self.clear:
            if hasattr(root_logger, "handlers"):
                for handler in root_logger.handlers:
                    root_logger.removeHandler(handler)
            for logger in logging.Logger.manager.loggerDict.values():
                if hasattr(logger, "handlers"):
                    for handler in logger.handlers:
                        logger.removeHandler(handler)
        # make sure there isn't one already
        # you can't simply use "if self.handler not in root_logger.handlers"
        # since at least in unit tests this doesn't work --
        # LogCapture() is instantiated for each test case while root_logger
        # is module global
        # so we always add new MyMemoryHandler instance
        for handler in root_logger.handlers[:]:
            if isinstance(handler, MyMemoryHandler):
                root_logger.handlers.remove(handler)
        root_logger.addHandler(self.handler)
        # to make sure everything gets captured
        root_logger.setLevel(logging.NOTSET)

    def begin(self):
        """Set up logging handler before test run begins.
        """
        self.start()

    def start(self):
        self.handler = MyMemoryHandler(1000, self.logformat, self.logdatefmt,
                                       self.filters)
        self.setupLoghandler()

    def end(self):
        pass

    def beforeTest(self, test):
        """Clear buffers and handlers before test.
        """
        self.setupLoghandler()

    def afterTest(self, test):
        """Clear buffers after test.
        """
        self.handler.truncate()

    def formatFailure(self, test, err):
        """Add captured log messages to failure output.
        """
        return self.formatError(test, err)

    def formatError(self, test, err):
        """Add captured log messages to error output.
        """
        # logic flow copied from Capture.formatError
        test.capturedLogging = records = self.formatLogRecords()
        if not records:
            return err
        ec, ev, tb = err
        return (ec, self.addCaptureToErr(ev, records), tb)

    def formatLogRecords(self):
        format = self.handler.format
        return [safe_str(format(r)) for r in self.handler.buffer]

    def addCaptureToErr(self, ev, records):
        return '\n'.join([safe_str(ev), ln('>> begin captured logging <<')] + \
                          records + \
                          [ln('>> end captured logging <<')])