/usr/bin/pysnmptrap is in python-pysnmp4-apps 0.3.2-1.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 | #!/usr/bin/python
#
# TRAP generator
#
# Copyright 1999-2012 by Ilya Etingof <ilya@glas.net>.
#
import sys, socket, time
from pysnmp_apps.cli import main, msgmod, secmod, target, pdu, mibview, base
from pysnmp.entity import engine, config
from pysnmp.entity.rfc3413 import ntforg, context
from pysnmp.proto.proxy import rfc2576
from pysnmp.proto import rfc1902
from pysnmp.proto.api import v1, v2c
from pysnmp import error
def getUsage():
return "Usage: %s [OPTIONS] <MANAGER> <PARAMETERS>\n\
%s%s%s%s\
TRAP options:\n\
-C<TRAPOPT>: set various application specific behaviours:\n\
i: send INFORM-PDU, expect a response\n\
%s\
SNMPv1 TRAP management parameters:\n\
enterprise-oid agent generic-trap specific-trap uptime <management-params>\n\
where:\n\
generic-trap: coldStart|warmStart|linkDown|linkUp|authenticationFailure|egpNeighborLoss|enterpriseSpecific\n\
SNMPv2/SNMPv3 management parameters:\n\
uptime trap-oid <management-params>\n\
%s" % (
sys.argv[0],
main.getUsage(),
msgmod.getUsage(),
secmod.getUsage(),
mibview.getUsage(),
target.getUsage(),
pdu.getWriteUsage()
)
# Construct c/l interpreter for this app
class Scanner(
msgmod.MPScannerMixIn,
secmod.SMScannerMixIn,
mibview.MibViewScannerMixIn,
target.TargetScannerMixIn,
pdu.ReadPduScannerMixIn,
main.MainScannerMixIn,
base.ScannerTemplate
):
def t_appopts(self, s):
r' -C '
self.rv.append(base.ConfigToken('appopts'))
def t_genericTrap(self, s):
r' coldStart|warmStart|linkDown|linkUp|authenticationFailure|egpNeighborLoss|enterpriseSpecific '
self.rv.append(base.ConfigToken('genericTrap', s))
class Parser(
msgmod.MPParserMixIn,
secmod.SMParserMixIn,
mibview.MibViewParserMixIn,
target.TargetParserMixIn,
pdu.WritePduParserMixIn,
main.MainParserMixIn,
base.ParserTemplate
):
def p_trapParams(self, args):
'''
TrapV1Params ::= EnterpriseOid whitespace AgentName whitespace GenericTrap whitespace SpecificTrap whitespace Uptime whitespace VarBinds
EnterpriseOid ::= string
AgentName ::= string
GenericTrap ::= genericTrap
SpecificTrap ::= string
Uptime ::= string
TrapV2cParams ::= Uptime whitespace TrapOid whitespace VarBinds
TrapOid ::= string
'''
def p_paramsSpec(self, args):
'''
Params ::= TrapV1Params
Params ::= TrapV2cParams
'''
def p_appOptions(self, args):
'''
Option ::= ApplicationOption
ApplicationOption ::= appopts whitespace string
ApplicationOption ::= appopts string
'''
class __Generator(base.GeneratorTemplate):
def n_ApplicationOption(self, cbCtx, node):
snmpEngine, ctx = cbCtx
if len(node) > 2:
opt = node[2].attr
else:
opt = node[1].attr
for c in opt:
if c == 'i':
ctx['informMode'] = 1
else:
raise error.PySnmpError('bad -C option - "%s"' % c)
def n_EnterpriseOid(self, cbCtx, node):
snmpEngine, ctx= cbCtx
ctx['EnterpriseOid'] = node[0].attr
def n_AgentName(self, cbCtx, node):
snmpEngine, ctx = cbCtx
try:
ctx['AgentName'] = socket.gethostbyname(node[0].attr)
except socket.error:
raise error.PySnmpError(
'Bad agent name %s: %s' % (node[0].attr, sys.exc_info()[1])
)
def n_GenericTrap(self, cbCtx, node):
snmpEngine, ctx = cbCtx
ctx['GenericTrap'] = node[0].attr
def n_SpecificTrap(self, cbCtx, node):
snmpEngine, ctx = cbCtx
ctx['SpecificTrap'] = node[0].attr
def n_Uptime(self, cbCtx, node):
snmpEngine, ctx = cbCtx
ctx['Uptime'] = int(node[0].attr)
def n_TrapV1Params_exit(self, cbCtx, node):
snmpEngine, ctx = cbCtx
# Hack SNMP engine's uptime
sysUpTime, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
'__SNMPv2-MIB', 'sysUpTime'
)
sysUpTime.syntax.createdAt = time.time() - float(ctx['Uptime'])/100
# Initialize v1 PDU with passed params, then proxy it into v2c PDU
v1Pdu = v1.TrapPDU()
v1.apiTrapPDU.setDefaults(v1Pdu)
if 'EnterpriseOid' in ctx:
v1.apiTrapPDU.setEnterprise(v1Pdu, ctx['EnterpriseOid'])
if 'AgentName' in ctx:
v1.apiTrapPDU.setAgentAddr(v1Pdu, ctx['AgentName'])
if 'GenericTrap' in ctx:
v1.apiTrapPDU.setGenericTrap(v1Pdu, ctx['GenericTrap'])
if 'SpecificTrap' in ctx:
v1.apiTrapPDU.setSpecificTrap(v1Pdu, ctx['SpecificTrap'])
if 'Uptime' in ctx:
v1.apiTrapPDU.setTimeStamp(v1Pdu, ctx['Uptime'])
v2cPdu = rfc2576.v1ToV2(v1Pdu)
# Drop first two var-binds of v2c PDU as they are set internally by
# SNMP engine
varBinds = v2c.apiPDU.getVarBinds(v2cPdu)
if 'varBinds' not in ctx:
ctx['varBinds'] = []
ctx['varBinds'] = varBinds[2:] + ctx['varBinds']
# Extract TrapOid from proxied PDU
ctx['TrapOid' ] = varBinds[1][1].prettyPrint()
def n_TrapOid(self, cbCtx, node):
snmpEngine, ctx = cbCtx
ctx['TrapOid'] = node[0].attr
def n_TrapV2cParams_exit(self, cbCtx, node):
snmpEngine, ctx = cbCtx
# Hack SNMP engine's uptime
sysUpTime, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'sysUpTime')
sysUpTime.syntax.createdAt = time.time() - float(ctx['Uptime'])/100
def generator(cbCtx, ast):
snmpEngine, ctx = cbCtx
return __Generator().preorder((snmpEngine, ctx), ast)
snmpEngine = engine.SnmpEngine()
try:
# Parse c/l into AST
ast = Parser().parse(
Scanner().tokenize(' '.join(sys.argv[1:]))
)
ctx = {}
# Apply configuration to SNMP entity
main.generator((snmpEngine, ctx), ast)
msgmod.generator((snmpEngine, ctx), ast)
secmod.generator((snmpEngine, ctx), ast)
mibview.generator((snmpEngine, ctx), ast)
target.generatorTrap((snmpEngine, ctx), ast)
pdu.writePduGenerator((snmpEngine, ctx), ast)
generator((snmpEngine, ctx), ast)
except error.PySnmpError:
sys.stderr.write('Error: %s\n%s' % (sys.exc_info()[1], getUsage()))
sys.exit(-1)
# Run SNMP engine
def cbFun(sendRequestHandle, errorIndication, cbCtx):
if errorIndication:
sys.stderr.write('%s\n' % errorIndication)
return
snmpContext = context.SnmpContext(snmpEngine)
# Agent-side VACM setup
config.addContext(snmpEngine, '')
config.addVacmUser(snmpEngine, 1, ctx['securityName'],
'noAuthNoPriv', (), (), (1,3,6),
contextName=ctx.get('contextName', '')) # v1
config.addVacmUser(snmpEngine, 2, ctx['securityName'],
'noAuthNoPriv', (), (), (1,3,6),
contextName=ctx.get('contextName', '')) # v2c
config.addVacmUser(snmpEngine, 3, ctx['securityName'],
'authPriv', (), (), (1,3,6),
contextName=ctx.get('contextName', '')) # v3
config.addVacmUser(snmpEngine, 3, ctx['securityName'],
'authNoPriv', (), (), (1,3,6),
contextName=ctx.get('contextName', '')) # v3
config.addVacmUser(snmpEngine, 3, ctx['securityName'],
'noAuthNoPriv', (), (), (1,3,6),
contextName=ctx.get('contextName', '')) # v3
if 'contextName' in ctx:
snmpContext.registerContextName(
ctx.get('contextName', ''),
# ref to base MIB instrum
snmpEngine.msgAndPduDsp.mibInstrumController
)
ctx['notificationName'] = 'myNotifyName'
config.addNotificationTarget(
snmpEngine, ctx['notificationName'], ctx['paramsName'],
ctx['transportTag'], 'informMode' in ctx and 'inform' or 'trap'
)
ntforg.NotificationOriginator(snmpContext).sendNotification(
snmpEngine, ctx['notificationName'], ctx['TrapOid'],
ctx['varBinds'], cbFun, ctx, ctx.get('contextName', '')
)
try:
snmpEngine.transportDispatcher.runDispatcher()
except error.PySnmpError:
sys.stderr.write('Error: %s\n' % sys.exc_info()[1])
sys.exit(-1)
|