This file is indexed.

/usr/lib/python3/dist-packages/pysnmp/entity/observer.py is in python3-pysnmp4 4.3.2-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#
# This file is part of pysnmp software.
#
# Copyright (c) 2005-2016, Ilya Etingof <ilya@glas.net>
# License: http://pysnmp.sf.net/license.html
#
from pysnmp import error

class MetaObserver:
    """This is a simple facility for exposing internal SNMP Engine
       working details to pysnmp applications. These details are
       basically local scope variables at a fixed point of execution.

       Two modes of operations are offered:
       1. App can request an execution point context by execution point ID.
       2. App can register its callback function (and context) to be invoked
          once execution reaches specified point. All local scope variables
          will be passed to the callback as in #1.

       It's important to realize that execution context is only guaranteed
       to exist to functions that are at the same or deeper level of invocation
       relative to execution point specified.

       Internal state:
       * observers:  execution point ID -> list of callbacks subscribed to it
       * contexts:   callback -> opaque `cbCtx` object given at registration
       * execpoints: execution point ID -> variables most recently stored
    """
    def __init__(self):
        self.__observers = {}
        self.__contexts = {}
        self.__execpoints = {}

    def registerObserver(self, cbFun, *execpoints, **kwargs):
        """Subscribe `cbFun` to one or more execution points.

        `cbFun` is later invoked as
        ``cbFun(snmpEngine, execpoint, variables, cbCtx)``, where `cbCtx`
        is the optional ``cbCtx`` keyword argument (``None`` if omitted).

        Raises error.PySnmpError if `cbFun` is already registered.
        """
        if cbFun in self.__contexts:
            raise error.PySnmpError('duplicate observer %s' % cbFun)
        self.__contexts[cbFun] = kwargs.get('cbCtx')
        for execpoint in execpoints:
            self.__observers.setdefault(execpoint, []).append(cbFun)

    def unregisterObserver(self, cbFun=None):
        """Unsubscribe `cbFun` from all execution points.

        With `cbFun` omitted (or ``None``) every observer is dropped.
        Unregistering a callback that is not registered is a no-op.
        """
        if cbFun is None:
            self.__observers.clear()
            self.__contexts.clear()
            return

        # Iterate over a snapshot of the keys: entries may be deleted below.
        for execpoint in list(self.__observers):
            # Drop every occurrence, not just the first one, so a callback
            # registered twice for the same point does not linger.
            remaining = [x for x in self.__observers[execpoint] if x != cbFun]
            if remaining:
                self.__observers[execpoint] = remaining
            else:
                del self.__observers[execpoint]

        # Forget the callback's context as well; otherwise the context
        # object leaks and the same callback can never be registered again.
        self.__contexts.pop(cbFun, None)

    def storeExecutionContext(self, snmpEngine, execpoint, variables):
        """Record `variables` for `execpoint` and notify its observers."""
        self.__execpoints[execpoint] = variables
        # Work on a snapshot so callbacks may (un)register observers
        # without disturbing this notification round.
        for cbFun in tuple(self.__observers.get(execpoint, ())):
            if cbFun not in self.__contexts:
                # Unregistered by an earlier callback in this round.
                continue
            cbFun(snmpEngine, execpoint, variables, self.__contexts[cbFun])

    def clearExecutionContext(self, snmpEngine, *execpoints):
        """Discard stored variables for the given execution points.

        With no `execpoints` given, all stored contexts are discarded.
        Raises KeyError if a named execution point has nothing stored.
        """
        if execpoints:
            for execpoint in execpoints:
                del self.__execpoints[execpoint]
        else:
            self.__execpoints.clear()

    def getExecutionContext(self, execpoint):
        """Return the variables last stored for `execpoint`.

        Raises KeyError if nothing is stored for `execpoint`.
        """
        return self.__execpoints[execpoint]