/usr/share/fence/fencing_snmp.py is in fence-agents 4.0.25-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 | #!/usr/bin/python -tt
# For example of use please see fence_cisco_mds
import re, pexpect
import logging
from fencing import *
from fencing import fail, fail_usage, EC_TIMED_OUT, run_delay
__all__ = ['FencingSnmp']
## do not add code here.
#BEGIN_VERSION_GENERATION
RELEASE_VERSION="4.0.25"
BUILD_DATE="(built Sat, 10 Feb 2018 00:55:27 -0800)"
REDHAT_COPYRIGHT="Copyright (C) Red Hat, Inc. 2004-2010 All rights reserved."
#END_VERSION_GENERATION
class FencingSnmp:
def __init__(self, options):
self.options = options
run_delay(options)
def quote_for_run(self, string):
return string.replace(r"'", "'\\''")
def complete_missed_params(self):
mapping = [[
['snmp-priv-passwd', 'password', '!snmp-sec-level'],
'self.options["--snmp-sec-level"]="authPriv"'
], [
['!snmp-version', 'community', '!username', '!snmp-priv-passwd', '!password'],
'self.options["--snmp-version"]="2c"'
]]
for val in mapping:
e = val[0]
res = True
for item in e:
if item[0] == '!' and "--" + item[1:] in self.options:
res = False
break
if item[0] != '!' and "--" + item[0:] not in self.options:
res = False
break
if res:
exec(val[1])
def prepare_cmd(self, command):
cmd = "%s -m '' -Oeqn "% (command)
self.complete_missed_params()
#mapping from our option to snmpcmd option
mapping = (('snmp-version', 'v'), ('community', 'c'))
for item in mapping:
if "--" + item[0] in self.options:
cmd += " -%s '%s'"% (item[1], self.quote_for_run(self.options["--" + item[0]]))
# Some options make sense only for v3 (and for v1/2c can cause "problems")
if ("--snmp-version" in self.options) and (self.options["--snmp-version"] == "3"):
# Mapping from our options to snmpcmd options for v3
mapping_v3 = (('snmp-auth-prot', 'a'), ('snmp-sec-level', 'l'), ('snmp-priv-prot', 'x'), \
('snmp-priv-passwd', 'X'), ('password', 'A'), ('username', 'u'))
for item in mapping_v3:
if "--"+item[0] in self.options:
cmd += " -%s '%s'"% (item[1], self.quote_for_run(self.options["--" + item[0]]))
force_ipvx = ""
if "--inet6-only" in self.options:
force_ipvx = "udp6:"
if "--inet4-only" in self.options:
force_ipvx = "udp:"
cmd += " '%s%s%s'"% (force_ipvx, self.quote_for_run(self.options["--ip"]),
"--ipport" in self.options and self.quote_for_run(":" + str(self.options["--ipport"])) or "")
return cmd
def run_command(self, command, additional_timemout=0):
try:
logging.debug("%s\n", command)
(res_output, res_code) = pexpect.run(command,
int(self.options["--shell-timeout"]) +
int(self.options["--login-timeout"]) +
additional_timemout, True)
if res_code == None:
fail(EC_TIMED_OUT)
logging.debug("%s\n", res_output)
if (res_code != 0) or (re.search("^Error ", res_output, re.MULTILINE) != None):
fail_usage("Returned %d: %s"% (res_code, res_output))
except pexpect.ExceptionPexpect:
fail_usage("Cannot run command %s"%(command))
return res_output
def get(self, oid, additional_timemout=0):
cmd = "%s '%s'"% (self.prepare_cmd(self.options["--snmpget-path"]), self.quote_for_run(oid))
output = self.run_command(cmd, additional_timemout).splitlines()
return output[len(output)-1].split(None, 1)
def set(self, oid, value, additional_timemout=0):
mapping = ((int, 'i'), (str, 's'))
type_of_value = ''
for item in mapping:
if isinstance(value, item[0]):
type_of_value = item[1]
break
cmd = "%s '%s' %s '%s'" % (self.prepare_cmd(self.options["--snmpset-path"]),
self.quote_for_run(oid), type_of_value, self.quote_for_run(str(value)))
self.run_command(cmd, additional_timemout)
def walk(self, oid, additional_timemout=0):
cmd = "%s '%s'"% (self.prepare_cmd(self.options["--snmpwalk-path"]), self.quote_for_run(oid))
output = self.run_command(cmd, additional_timemout).splitlines()
return [x.split(None, 1) for x in output if x.startswith(".")]
|