/usr/lib/python2.7/dist-packages/volatility/scan.py is in volatility 2.6-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Volatility
# Copyright (C) 2007-2013 Volatility Foundation
#
# Derived from source in PyFlag developed by:
# Copyright 2004: Commonwealth of Australia.
# Michael Cohen <scudette@users.sourceforge.net>
# David Collett <daveco@users.sourceforge.net>
#
# This file is part of Volatility.
#
# Volatility is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Volatility is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Volatility. If not, see <http://www.gnu.org/licenses/>.
#
# Special thanks to Michael Cohen for ideas and comments!
#
#pylint: disable-msg=C0111
"""
@author: AAron Walters
@license: GNU General Public License 2.0
@contact: awalters@4tphi.net
@organization: Volatility Foundation
"""
import volatility.debug as debug
import volatility.registry as registry
import volatility.addrspace as addrspace
import volatility.constants as constants
import volatility.conf as conf
########### Following is the new implementation of the scanning
########### framework. The old framework was based on PyFlag's
########### scanning framework which is probably too complex for this.
class BaseScanner(object):
    """ A more thorough scanner which checks every byte.

    Subclasses list the checks to apply in ``checks`` as
    ``(class_name, kwargs)`` pairs. Each pair names a ScannerCheck plugin
    class which is instantiated against ``self.buffer`` every time scan()
    is started.
    """

    ## (class_name, kwargs) pairs describing the ScannerCheck classes to use
    checks = []

    ## Number of look-ahead bytes read past each SCAN_BLOCKSIZE block, so
    ## that a check running near the end of a block can still read a little
    ## beyond it.
    overlap = 20

    def __init__(self, window_size = 8):
        """ Set up the scratch buffer the checks read from.

        window_size is only stored on the instance; nothing in this class
        reads it back.
        """
        self.buffer = addrspace.BufferAddressSpace(conf.DummyConfig(), data = '\x00' * 1024)
        self.window_size = window_size
        self.constraints = []
        ## How many constraints are allowed to fail before an offset is rejected
        self.error_count = 0

    def check_addr(self, found):
        """ Runs all our constraints on the offset found and reports
        whether the offset is acceptable.

        A constraint fails when its check() returns a false value or
        raises. We shortcut the loop as soon as more than
        self.error_count constraints have failed, since the offset can
        no longer fit the criteria. This allows for an early exit and a
        speed boost.

        Returns True when at most self.error_count constraints failed
        (always True when there are no constraints), False otherwise.
        """
        cnt = 0
        for check in self.constraints:
            ## constraints can raise for an error
            try:
                val = check.check(found)
            except Exception:
                ## NOTE(review): debug.b() looks like a debugger breakpoint
                ## hook - confirm. The failure itself is treated as a
                ## non-match rather than aborting the scan.
                debug.b()
                val = False

            if not val:
                cnt = cnt + 1

            if cnt > self.error_count:
                return False

        return True

    def scan(self, address_space, offset = 0, maxlen = None):
        """ Generator yielding every offset for which check_addr() is True.

        address_space -- the space to scan; it must provide profile,
                         get_available_addresses() yielding (start, size)
                         pairs, and zread(offset, length).
        offset        -- first offset to consider.
        maxlen        -- if truthy, scanning stops at offset + maxlen;
                         None or 0 means no limit.

        The data is read in blocks of constants.SCAN_BLOCKSIZE bytes plus
        self.overlap look-ahead bytes. The look-ahead bytes are scanned
        both as the tail of one block and as the head of the next one, so
        we remember what was already yielded from the tail and report
        each offset only once.
        """
        self.buffer.profile = address_space.profile
        current_offset = offset

        ## Build our constraints from the specified ScannerCheck
        ## classes:
        self.constraints = []
        for class_name, args in self.checks:
            check = registry.get_plugin_classes(ScannerCheck)[class_name](self.buffer, **args)
            self.constraints.append(check)

        ## Which checks also have skippers?
        skippers = [ c for c in self.constraints if hasattr(c, "skip") ]

        ## Absolute offset at which scanning must stop (None means no limit)
        scan_limit = None
        if maxlen:
            scan_limit = offset + maxlen

        block_size = constants.SCAN_BLOCKSIZE

        ## Offsets already yielded from the look-ahead tail of an earlier
        ## block that the following block(s) will come across again
        reported = set()

        for (range_start, range_size) in sorted(address_space.get_available_addresses()):
            # The ranges are sorted, so once one starts at or past the limit
            # there is nothing left to scan
            if scan_limit is not None and range_start >= scan_limit:
                break

            # Jump to the next available point to scan from
            # current_offset jumps up to be at least range_start
            current_offset = max(range_start, current_offset)
            range_end = range_start + range_size

            # If we have a maximum length, we make sure the range does not
            # extend past it
            if scan_limit is not None:
                range_end = min(range_end, scan_limit)

            while current_offset < range_end:
                # We've now got range_start <= current_offset < range_end

                # Figure out how much data to read
                length = min(block_size + self.overlap, range_end - current_offset)

                # Populate the buffer with data
                # We use zread to scan what we can because there are often invalid
                # pages in the DTB
                data = address_space.zread(current_offset, length)
                self.buffer.assign_buffer(data, current_offset)

                ## Run checks throughout this block of data
                i = 0
                while i < length:
                    found = i + current_offset
                    if self.check_addr(found) and found not in reported:
                        ## yield the offset to the start of the memory
                        ## (after the pool tag)
                        yield found
                        ## Anything past the block proper is scanned again
                        ## by the next block, so remember it was reported
                        if i >= block_size:
                            reported.add(found)

                    ## Where should we go next? By default we go 1 byte
                    ## ahead, but if some of the checkers have skippers,
                    ## we may actually go much farther. Checkers with
                    ## skippers basically tell us that there is no way
                    ## they can match anything before the skipped result,
                    ## so there is no point in trying them on all the data
                    ## in between. This optimization is useful to really
                    ## speed things up. FIXME - currently skippers assume
                    ## that the check must match, therefore we can skip
                    ## the unmatchable region, but its possible that a
                    ## scanner needs to match only some checkers.
                    skip = 1
                    for s in skippers:
                        skip = max(skip, s.skip(data, i))

                    i += skip

                current_offset += min(block_size, length)

                ## Offsets we have moved past can never be scanned again
                if reported:
                    reported = set(o for o in reported if o >= current_offset)
class DiscontigScanner(BaseScanner):
    """ Deprecated scanner kept for backwards compatibility; it only
    forwards to BaseScanner.
    """

    def scan(self, address_space, offset = 0, maxlen = None):
        """ Emit a deprecation warning, then yield every match produced by
        BaseScanner.scan() for the same arguments.
        """
        debug.warning("DiscontigScanner has been deprecated, all functionality is now contained in BaseScanner")
        matches = BaseScanner.scan(self, address_space, offset = offset, maxlen = maxlen)
        for found in matches:
            yield found
class ScannerCheck(object):
    """ Base class for all scanner checks.

    A check is created on an address space and is asked, one offset at a
    time, whether its condition holds there. The main method is
    check(self, offset), which returns True if the condition is true or
    False otherwise.
    """

    def __init__(self, address_space, **_kwargs):
        """ Keep hold of the address space; extra keyword arguments are
        accepted and ignored here.
        """
        self.address_space = address_space

    def object_offset(self, offset, address_space):
        """ Return offset unchanged; address_space is not used here. """
        return offset

    def check(self, _offset):
        """ The base check never matches. """
        return False

    ## To speed up the scanning a check may define the optional method
    ## below - it will be used to skip the data which is obviously not
    ## going to match. It needs to return the number of bytes from offset
    ## to skip to. We take the maximum number of bytes to guarantee
    ## that all checks have a chance of passing. It is deliberately left
    ## undefined on this base class.
    #def skip(self, data, offset):
    #    return -1
|