/usr/lib/python2.7/dist-packages/bcc/usdt.py is in python-bpfcc 0.5.0-5ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 | # Copyright 2016 Sasha Goldshtein
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ctypes as ct
import os, sys
from .libbcc import lib, _USDT_CB, _USDT_PROBE_CB, \
bcc_usdt_location, bcc_usdt_argument, \
BCC_USDT_ARGUMENT_FLAGS
class USDTException(Exception):
pass
class USDTProbeArgument(object):
def __init__(self, argument):
self.signed = argument.size < 0
self.size = abs(argument.size)
self.valid = argument.valid
if self.valid & BCC_USDT_ARGUMENT_FLAGS.CONSTANT != 0:
self.constant = argument.constant
if self.valid & BCC_USDT_ARGUMENT_FLAGS.DEREF_OFFSET != 0:
self.deref_offset = argument.deref_offset
if self.valid & BCC_USDT_ARGUMENT_FLAGS.DEREF_IDENT != 0:
self.deref_ident = argument.deref_ident
if self.valid & BCC_USDT_ARGUMENT_FLAGS.BASE_REGISTER_NAME != 0:
self.base_register_name = argument.base_register_name
if self.valid & BCC_USDT_ARGUMENT_FLAGS.INDEX_REGISTER_NAME != 0:
self.index_register_name = argument.index_register_name
if self.valid & BCC_USDT_ARGUMENT_FLAGS.SCALE != 0:
self.scale = argument.scale
def _size_prefix(self):
return "%d %s bytes" % \
(self.size, "signed " if self.signed else "unsigned")
def _format(self):
# This mimics the logic in cc/usdt_args.cc that gives meaning to the
# various argument settings. A change there will require a change here.
if self.valid & BCC_USDT_ARGUMENT_FLAGS.CONSTANT != 0:
return "%d" % self.constant
if self.valid & BCC_USDT_ARGUMENT_FLAGS.DEREF_OFFSET == 0:
return "%s" % self.base_register_name
if self.valid & BCC_USDT_ARGUMENT_FLAGS.DEREF_OFFSET != 0 and \
self.valid & BCC_USDT_ARGUMENT_FLAGS.DEREF_IDENT == 0:
if self.valid & BCC_USDT_ARGUMENT_FLAGS.INDEX_REGISTER_NAME != 0:
index_offset = " + %s" % self.index_register_name
if self.valid & BCC_USDT_ARGUMENT_FLAGS.SCALE != 0:
index_offset += " * %d" % self.scale
else:
index_offset = ""
sign = '+' if self.deref_offset >= 0 else '-'
return "*(%s %s %d%s)" % (self.base_register_name,
sign, abs(self.deref_offset), index_offset)
if self.valid & BCC_USDT_ARGUMENT_FLAGS.DEREF_OFFSET != 0 and \
self.valid & BCC_USDT_ARGUMENT_FLAGS.DEREF_IDENT != 0 and \
self.valid & BCC_USDT_ARGUMENT_FLAGS.BASE_REGISTER_NAME != 0 and \
self.base_register_name == "ip":
sign = '+' if self.deref_offset >= 0 else '-'
return "*(&%s %s %d)" % (self.deref_ident,
sign, abs(self.deref_offset))
# If we got here, this is an unrecognized case. Doesn't mean it's
# necessarily bad, so just provide the raw data. It just means that
# other tools won't be able to work with this argument.
return "unrecognized argument format, flags %d" % self.valid
def __str__(self):
return "%s @ %s" % (self._size_prefix(), self._format())
class USDTProbeLocation(object):
def __init__(self, probe, index, location):
self.probe = probe
self.index = index
self.num_arguments = probe.num_arguments
self.address = location.address
def __str__(self):
return "0x%x" % self.address
def get_argument(self, index):
arg = bcc_usdt_argument()
res = lib.bcc_usdt_get_argument(self.probe.context, self.probe.name,
self.index, index, ct.byref(arg))
if res != 0:
raise USDTException(
"error retrieving probe argument %d location %d" %
(index, self.index))
return USDTProbeArgument(arg)
class USDTProbe(object):
def __init__(self, context, probe):
self.context = context
self.provider = probe.provider
self.name = probe.name
self.bin_path = probe.bin_path
self.semaphore = probe.semaphore
self.num_locations = probe.num_locations
self.num_arguments = probe.num_arguments
def __str__(self):
return "%s %s:%s [sema 0x%x]" % \
(self.bin_path, self.provider, self.name, self.semaphore)
def short_name(self):
return "%s:%s" % (self.provider, self.name)
def get_location(self, index):
loc = bcc_usdt_location()
res = lib.bcc_usdt_get_location(self.context, self.name,
index, ct.byref(loc))
if res != 0:
raise USDTException("error retrieving probe location %d" % index)
return USDTProbeLocation(self, index, loc)
class USDT(object):
def __init__(self, pid=None, path=None):
if pid and pid != -1:
self.pid = pid
self.context = lib.bcc_usdt_new_frompid(pid)
if self.context == None:
raise USDTException("USDT failed to instrument PID %d" % pid)
elif path:
self.path = path
self.context = lib.bcc_usdt_new_frompath(path.encode('ascii'))
if self.context == None:
raise USDTException("USDT failed to instrument path %s" % path)
else:
raise USDTException(
"either a pid or a binary path must be specified")
def enable_probe(self, probe, fn_name):
if lib.bcc_usdt_enable_probe(self.context, probe.encode('ascii'),
fn_name.encode('ascii')) != 0:
raise USDTException(
("failed to enable probe '%s'; a possible cause " +
"can be that the probe requires a pid to enable") %
probe
)
def enable_probe_or_bail(self, probe, fn_name):
if lib.bcc_usdt_enable_probe(self.context, probe.encode('ascii'),
fn_name.encode('ascii')) != 0:
print(
"""Error attaching USDT probes: the specified pid might not contain the
given language's runtime, or the runtime was not built with the required
USDT probes. Look for a configure flag similar to --with-dtrace or
--enable-dtrace. To check which probes are present in the process, use the
tplist tool.""")
sys.exit(1)
def get_context(self):
return self.context
def get_probe_arg_ctype(self, probe_name, arg_index):
return lib.bcc_usdt_get_probe_argctype(
self.context, probe_name, arg_index)
def enumerate_probes(self):
probes = []
def _add_probe(probe):
probes.append(USDTProbe(self.context, probe.contents))
lib.bcc_usdt_foreach(self.context, _USDT_CB(_add_probe))
return probes
# This is called by the BPF module's __init__ when it realizes that there
# is a USDT context and probes need to be attached.
def attach_uprobes(self, bpf):
probes = self.enumerate_active_probes()
for (binpath, fn_name, addr, pid) in probes:
bpf.attach_uprobe(name=binpath.decode(), fn_name=fn_name.decode(),
addr=addr, pid=pid)
def enumerate_active_probes(self):
probes = []
def _add_probe(binpath, fn_name, addr, pid):
probes.append((binpath, fn_name, addr, pid))
lib.bcc_usdt_foreach_uprobe(self.context, _USDT_PROBE_CB(_add_probe))
return probes
|