/usr/share/pyshared/chemfp/fps_io.py is in python-chemfp 1.1p1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 | from __future__ import absolute_import
from cStringIO import StringIO
from __builtin__ import open as _builtin_open
import binascii
import _chemfp
import re
import sys
import heapq
import itertools
import ctypes
from . import Metadata, ParseError, FingerprintReader
from . import fps_search
from . import io
# I tried a wide range of sizes for my laptop, with both compressed
# and uncompressed files, and found that the best size was around
# 2**17. Actually, 2**16.8 was the absolute best, which gives
BLOCKSIZE=11400
# (BTW, the compressed time took 1.3x the uncompressed time)
class FPSParseError(ParseError):
def __init__(self, errcode, lineno, filename):
self.errcode = errcode
self.lineno = lineno
self.filename = filename
def __repr__(self):
return "FPSParseError(%d, %d, %s)" % (self.errcode, self.lineno, self.filename)
def __str__(self):
msg = _chemfp.strerror(self.errcode)
msg += " at line %d" % (self.lineno,)
if self.filename is not None:
msg += " of %r" % (self.filename,)
return msg
def open_fps(source, format=None):
format_name, compression = io.normalize_format(source, format)
if format_name != "fps":
raise ValueError("Unknown format %r" % (format_name,))
infile = io.open_compressed_input_universal(source, compression)
filename = io.get_filename(source)
metadata, lineno, block = read_header(infile, filename)
return FPSReader(infile, metadata, lineno, block)
# This never buffers
def _read_blocks(infile):
while 1:
block = infile.read(BLOCKSIZE)
if not block:
break
if block[-1:] == "\n":
yield block
continue
line = infile.readline()
if not line:
# Note: this might not end with a newline!
yield block
break
yield block + line
class FPSReader(FingerprintReader):
_search = fps_search
def __init__(self, infile, metadata, first_fp_lineno, first_fp_block):
self._infile = infile
self._filename = getattr(infile, "name", "<unknown>")
self.metadata = metadata
self._first_fp_lineno = first_fp_lineno
self._first_fp_block = first_fp_block
self._expected_hex_len = 2*metadata.num_bytes
self._hex_len_source = "size in header"
self._at_start = True
self._it = None
self._block_reader = None
# Not sure if this is complete. Also, should have a context manager
# def close(self):
# self._infile.close()
def iter_blocks(self):
if self._block_reader is None:
self._block_reader = iter(self._iter_blocks())
return self._block_reader
def _iter_blocks(self):
if not self._at_start:
raise TypeError("Already iterating")
self._at_start = False
if self._first_fp_block is None:
return
block_stream = _read_blocks(self._infile)
yield self._first_fp_block
for block in block_stream:
yield block
def iter_rows(self):
unhexlify = binascii.unhexlify
lineno = self._first_fp_lineno
expected_hex_len = self._expected_hex_len
for block in self.iter_blocks():
for line in block.splitlines(True):
err = _chemfp.fps_line_validate(expected_hex_len, line)
if err:
raise FPSParseError(err, lineno, self._filename)
yield line[:-1].split("\t")
lineno += 1
def __iter__(self):
unhexlify = binascii.unhexlify
lineno = self._first_fp_lineno
expected_hex_len = self._expected_hex_len
for block in self.iter_blocks():
for line in block.splitlines(True):
err, id_fp = _chemfp.fps_parse_id_fp(expected_hex_len, line)
if err:
# Include the line?
raise FPSParseError(err, lineno, self._filename)
yield id_fp
lineno += 1
def _check_at_start(self):
if not self._at_start:
raise TypeError("FPS file is not at the start of the file; cannot search")
def count_tanimoto_hits_fp(self, query_fp, threshold=0.7):
self._check_at_start()
return fps_search.count_tanimoto_hits_fp(query_fp, self, threshold)
def count_tanimoto_hits_arena(self, queries, threshold=0.7, arena_size=100):
self._check_at_start()
return fps_search.count_tanimoto_hits_arena(queries, self, threshold)
def threshold_tanimoto_search_fp(self, query_fp, threshold=0.7):
self._check_at_start()
return fps_search.threshold_tanimoto_search_fp(query_fp, self, threshold)
def threshold_tanimoto_search_arena(self, queries, threshold=0.7, arena_size=100):
self._check_at_start()
return fps_search.threshold_tanimoto_search_arena(queries, self, threshold)
def knearest_tanimoto_search_fp(self, query_fp, k=3, threshold=0.7):
self._check_at_start()
return fps_search.knearest_tanimoto_search_fp(query_fp, self, k, threshold)
def knearest_tanimoto_search_arena(self, queries, k=3, threshold=0.7, arena_size=100):
self._check_at_start()
return fps_search.knearest_tanimoto_search_arena(queries, self, k, threshold)
def _where(filename, lineno):
if filename is None:
return "line %d" % (lineno,)
else:
return "%r line %d" % (filename, lineno)
# XXX Use Python's warning system
def warn_to_stderr(filename, lineno, message):
where = _where(filename, lineno)
sys.stderr.write("WARNING: %s at %s\n" % (message, where))
def read_header(f, filename, warn=warn_to_stderr):
metadata = Metadata()
lineno = 1
for block in _read_blocks(f):
# A block must be non-empty
start = 0
while 1:
c = block[start:start+1]
if c == "":
# End of the block; get the next one
break
if c != '#':
# End of the header. This block contains the first fingerprint line
block = block[start:]
if metadata.num_bits is None:
# We can figure this out from the fingerprint on the first line
err = _chemfp.fps_line_validate(-1, block)
if err:
raise FPSParseError(err, lineno, filename)
i = block.index("\t")
# If you don't specify the number of bits then I'll do it for you.
metadata.num_bits = i * 4
metadata.num_bytes = i // 2
return metadata, lineno, block
start += 1 # Skip the '#'
end = block.find("\n", start)
if end == -1:
# Only happens when the last line of the file contains
# no newlines. In that case, we're at the last block.
line = block[start:]
start = len(block)
else:
line = block[start:end]
start = end+1
# Right! We've got a line. Check if it's magic
# This is the only line which cannot contain a '='
if lineno == 1:
if line.rstrip() == "FPS1":
lineno += 1
continue
if line.startswith("x-") or line.startswith("X-"):
# Completely ignore the contents of 'experimental' lines
continue
if "=" not in line:
raise TypeError("header line must contain an '=': %r at %s" %
(line, _where(filename, lineno)))
key, value = line.split("=", 1)
key = key.strip()
value = value.strip()
if key == "num_bits":
try:
metadata.num_bits = int(value)
metadata.num_bytes = (metadata.num_bits + 7)//8
if not (metadata.num_bits > 0):
raise ValueError
except ValueError:
raise TypeError(
"num_bits header must be a positive integer, not %r: %s" %
(value, _where(filename, lineno)))
metadata.num_bytes = (metadata.num_bits+7)//8
elif key == "software":
metadata.software = value.decode("utf8")
elif key == "type":
# Should I have an auto-normalization step here which
# removes excess whitespace?
#metadata.type = normalize_type(value)
metadata.type = value
elif key == "source":
metadata.sources.append(value)
elif key == "date":
metadata.date = value
elif key == "aromaticity":
metadata.aromaticity = value
elif key.startswith("x-"):
pass
else:
#print "UNKNOWN", repr(line), repr(key), repr(value)
#warn(filename, lineno, "Unknown header %r" % (value,))
pass
lineno += 1
# Reached the end of file. No fingerprint lines and nothing left to process.
if metadata.num_bits is None:
metadata.num_bits = 0
metadata.num_bytes = 0
return metadata, lineno, None
|