This file is indexed.

/usr/share/pyshared/checkbox/contrib/REThread.py is in checkbox 0.13.7.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
'''Enhanced threading.Thread which can deliver a return value and propagate
exceptions from the called thread to the calling thread.

Copyright (C) 2007 Canonical Ltd.
Author: Martin Pitt <martin.pitt@ubuntu.com>

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
the full text of the license.
'''

import threading, sys

class REThread(threading.Thread):
    '''Enhanced threading.Thread which can deliver a return value and propagate
    exceptions from the called thread to the calling thread.

    run() stores either the target's return value or the sys.exc_info()
    triple of whatever the target raised on the instance. After join(), the
    starting thread can collect them with return_value(), exc_info() and
    exc_raise().
    '''

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None,
            verbose=None):
        '''Initialize Thread, same arguments as threading.Thread.__init__().

        kwargs defaults to None, meaning "no keyword arguments"; a fresh dict
        is created per instance so that no mutable default is shared between
        threads.

        verbose is only forwarded to threading.Thread when the caller actually
        supplied it, because newer threading.Thread implementations no longer
        accept that argument. Passing it there raises TypeError from the base
        class.
        '''
        if kwargs is None:
            kwargs = {}

        if verbose is None:
            threading.Thread.__init__(self, group, target, name, args, kwargs)
        else:
            threading.Thread.__init__(self, group, target, name, args, kwargs,
                verbose=verbose)

        # Keep our own references: the base class stores these under private
        # names which differ between Python versions.
        self.__target = target
        self.__args = args
        self.__kwargs = kwargs
        self._retval = None
        self._exception = None

    def run(self):
        '''Run target function and remember its result.

        On success the return value is stored for return_value(). If the
        target raises, the sys.exc_info() triple is stored for exc_info() and
        exc_raise() instead. Does nothing if no target was given.
        '''
        if not self.__target:
            return

        try:
            self._retval = self.__target(*self.__args, **self.__kwargs)
        except:
            # Deliberately catch everything: the whole point of this class is
            # to hand any failure of the target over to the calling thread
            # instead of letting it die inside this one.
            self._exception = sys.exc_info()

    def return_value(self):
        '''Return value from target function.

        This can only be called after the thread has finished, i. e. when
        is_alive() is False, and only if it did not terminate with an
        exception.

        Raises AssertionError if the thread is still running or if the target
        raised an exception. The check is an explicit raise rather than an
        assert statement, so it also holds when Python runs with -O.
        '''
        if self.is_alive():
            raise AssertionError('thread is still running')
        if self._exception is not None:
            raise AssertionError('thread terminated with an exception')
        return self._retval

    def exc_info(self):
        '''Return a tuple (type, value, traceback) of the exception caught in
        run(), or None if no exception was caught.'''

        return self._exception

    def exc_raise(self):
        '''Raises the exception caught in the thread.

        The original traceback is preserved. Does nothing if no exception was
        caught.'''

        if not self._exception:
            return

        exc_type, exc_value, exc_tb = self._exception
        if sys.version_info[0] >= 3:
            raise exc_value.with_traceback(exc_tb)

        # Python 2 needs the three-expression raise statement to keep the
        # traceback, but that statement is a syntax error on Python 3. Hide it
        # in a string so that this module still parses on both.
        exec('raise exc_type, exc_value, exc_tb',
             {'exc_type': exc_type, 'exc_value': exc_value, 'exc_tb': exc_tb})

#
# Unit test
#

if __name__ == '__main__':
    # Self-test, only executed when this module is run as a script.
    import time
    import traceback
    import unittest

    def idle(seconds):
        '''Test thread to just wait a bit.'''

        time.sleep(seconds)

    def div(x, y):
        '''Test thread to divide two numbers.'''

        return x / y

    class _REThreadTest(unittest.TestCase):
        '''Exercise return value and exception hand-over of REThread.'''

        def test_return_value(self):
            '''Test that return value works properly.'''

            t = REThread(target=div, args=(42, 2))
            t.start()
            t.join()
            # exc_raise() should be a no-op on successful functions
            t.exc_raise()
            self.assertEqual(t.return_value(), 21)
            self.assertEqual(t.exc_info(), None)

        def test_no_return_value(self):
            '''Test that REThread works if run() does not return anything.'''

            t = REThread(target=idle, args=(0.5,))
            t.start()
            # thread must be joined first
            self.assertRaises(AssertionError, t.return_value)
            t.join()
            self.assertEqual(t.return_value(), None)
            self.assertEqual(t.exc_info(), None)

        def test_exception(self):
            '''Test that exception in thread is caught and passed.'''

            t = REThread(target=div, args=(1, 0))
            t.start()
            t.join()
            # thread did not terminate normally, no return value
            self.assertRaises(AssertionError, t.return_value)
            # ZeroDivisionError is a builtin; no extra module is needed
            self.assertEqual(t.exc_info()[0], ZeroDivisionError)
            exc = traceback.format_exception(t.exc_info()[0], t.exc_info()[1],
                t.exc_info()[2])
            self.assertTrue(exc[-1].startswith('ZeroDivisionError'))
            # containment instead of endswith(): the formatter may append
            # further lines after the source line of the innermost frame
            self.assertTrue('return x / y\n' in exc[-2])

        def test_exc_raise(self):
            '''Test that exc_raise() raises caught thread exception.'''

            t = REThread(target=div, args=(1, 0))
            t.start()
            t.join()
            # thread did not terminate normally, no return value
            self.assertRaises(AssertionError, t.return_value)
            try:
                t.exc_raise()
            except ZeroDivisionError:
                e = sys.exc_info()
            else:
                self.fail('exc_raise() did not raise the thread exception')
            exc = traceback.format_exception(e[0], e[1], e[2])
            self.assertTrue(exc[-1].startswith('ZeroDivisionError'))
            # the re-raised exception must still point at the failing line
            # inside the thread's target function
            self.assertTrue('return x / y\n' in exc[-2])

    unittest.main()