/usr/lib/python3/dist-packages/pysnmp/entity/rfc3413/ntfrcv.py is in python3-pysnmp4 4.2.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 | import sys
from pyasn1.compat.octets import null
from pysnmp.proto import rfc3411, error
from pysnmp.proto.api import v1, v2c # backend is always SMIv2 compliant
from pysnmp.proto.proxy import rfc2576
from pysnmp import debug
# 3.4
class NotificationReceiver:
    """Notification receiver application.

    On construction the instance registers its `processPdu` method with
    the engine's message and PDU dispatcher for the PDU types listed in
    `pduTypes`.  Every notification handed over by the dispatcher is
    acknowledged when it is of a confirmed class and then passed to the
    user callback.

    The user callback may follow either of two calling conventions:

    * current: ``cbFun(snmpEngine, stateReference, contextEngineId,
      contextName, varBinds, cbCtx)``
    * legacy:  ``cbFun(snmpEngine, contextEngineId, contextName,
      varBinds, cbCtx)``

    Which one applies is probed on the first delivered notification.

    Numbered comments (3.4, 3.4.1, ...) presumably refer to sections of
    RFC 3413, as suggested by the module path.
    """
    # Tag sets of the PDU types this application asks the dispatcher for
    pduTypes = (
        v1.TrapPDU.tagSet,
        v2c.SNMPv2TrapPDU.tagSet,
        v2c.InformRequestPDU.tagSet
    )

    def __init__(self, snmpEngine, cbFun, cbCtx=None):
        """Register with the dispatcher and remember the user callback.

        snmpEngine -- engine whose `msgAndPduDsp` is registered with
        cbFun      -- user callable invoked for every notification
        cbCtx      -- opaque object passed through to `cbFun` unchanged
        """
        snmpEngine.msgAndPduDsp.registerContextEngineId(
            null, self.pduTypes, self.processPdu  # '' is a wildcard
        )
        # 0: calling convention of cbFun not established yet (the legacy
        #    one is tried first); 1: cbFun takes stateReference
        self.__cbFunVer = 0
        self.__cbFun = cbFun
        self.__cbCtx = cbCtx

    def close(self, snmpEngine):
        """Unregister from the dispatcher and drop callback references."""
        snmpEngine.msgAndPduDsp.unregisterContextEngineId(
            null, self.pduTypes
        )
        self.__cbFun = self.__cbCtx = None

    def processPdu(
        self,
        snmpEngine,
        messageProcessingModel,
        securityModel,
        securityName,
        securityLevel,
        contextEngineId,
        contextName,
        pduVersion,
        PDU,
        maxSizeResponseScopedPDU,
        stateReference
    ):
        """Handle one incoming notification PDU.

        A PDU arriving with `messageProcessingModel` 0 is translated with
        `rfc2576.v1ToV2` first, so the user callback always receives
        var-binds taken from a v2c-style PDU.  For a confirmed-class PDU
        a response echoing the var-binds is returned through the
        dispatcher; if that fails with `error.StatusInformation`, the
        `snmpSilentDrops` counter is incremented and processing goes on.
        Finally the user callback is invoked.

        Raises `error.ProtocolError` when the PDU is neither of confirmed
        nor of unconfirmed class.  Exceptions raised by the user callback
        propagate to the caller.
        """
        # Agent-side API complies with SMIv2
        if messageProcessingModel == 0:
            origPdu = PDU
            PDU = rfc2576.v1ToV2(PDU)
        else:
            origPdu = None

        errorStatus = 'noError'
        errorIndex = 0
        varBinds = v2c.apiPDU.getVarBinds(PDU)

        debug.logger & debug.flagApp and debug.logger('processPdu: stateReference %s, varBinds %s' % (stateReference, varBinds))

        # 3.4
        if PDU.tagSet in rfc3411.confirmedClassPDUs:
            # 3.4.1 --> no-op

            rspPDU = v2c.apiPDU.getResponse(PDU)

            # 3.4.2
            v2c.apiPDU.setErrorStatus(rspPDU, errorStatus)
            v2c.apiPDU.setErrorIndex(rspPDU, errorIndex)
            v2c.apiPDU.setVarBinds(rspPDU, varBinds)

            debug.logger & debug.flagApp and debug.logger('processPdu: stateReference %s, confirm PDU %s' % (stateReference, rspPDU.prettyPrint()))

            # Agent-side API complies with SMIv2
            if messageProcessingModel == 0:
                rspPDU = rfc2576.v2ToV1(rspPDU, origPdu)

            statusInformation = {}

            # 3.4.3
            try:
                snmpEngine.msgAndPduDsp.returnResponsePdu(
                    snmpEngine,
                    messageProcessingModel,
                    securityModel,
                    securityName,
                    securityLevel,
                    contextEngineId,
                    contextName,
                    pduVersion,
                    rspPDU,
                    maxSizeResponseScopedPDU,
                    stateReference,
                    statusInformation
                )
            except error.StatusInformation:
                # The response could not be sent: account for it and
                # still deliver the notification to the application.
                debug.logger & debug.flagApp and debug.logger('processPdu: stateReference %s, statusInformation %s' % (stateReference, sys.exc_info()[1]))
                snmpSilentDrops, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpSilentDrops')
                snmpSilentDrops.syntax = snmpSilentDrops.syntax + 1

        elif PDU.tagSet in rfc3411.unconfirmedClassPDUs:
            pass
        else:
            raise error.ProtocolError('Unexpected PDU class %s' % PDU.tagSet)

        debug.logger & debug.flagApp and debug.logger('processPdu: stateReference %s, user cbFun %s, cbCtx %s, varBinds %s' % (stateReference, self.__cbFun, self.__cbCtx, varBinds))

        if self.__cbFunVer:
            self.__cbFun(
                snmpEngine, stateReference, contextEngineId, contextName,
                varBinds, self.__cbCtx
            )
            return

        # Compatibility stub (handle legacy cbFun interface)
        try:
            self.__cbFun(
                snmpEngine, contextEngineId, contextName,
                varBinds, self.__cbCtx
            )
        except TypeError:
            # Either cbFun expects the current signature, or a legacy
            # cbFun raised TypeError from its own body.  Keep the error
            # so it can be reported if the retry below shows the latter.
            legacyError = sys.exc_info()[1]
        else:
            return

        self.__cbFunVer = 1
        try:
            self.__cbFun(
                snmpEngine, stateReference, contextEngineId, contextName,
                varBinds, self.__cbCtx
            )
        except TypeError:
            if sys.exc_info()[2].tb_next is None:
                # No frame below this one: the call itself was rejected
                # (argument count mismatch for a Python-level callable),
                # so cbFun is a legacy callback after all.  Do not leave
                # the receiver stuck on the wrong convention, and report
                # the callback's own error rather than the arity one.
                self.__cbFunVer = 0
                raise legacyError
            # The error originated inside a callback that accepted the
            # current signature: propagate it unchanged.
            raise
        finally:
            # Break the exception -> traceback -> frame reference cycle
            legacyError = None
|