/usr/lib/python2.7/dist-packages/cogent/parse/blast_xml.py is in python-cogent 1.9-9.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/env python
"""Parsers for XML output of blast, psi-blast and blat.
"""
__author__ = "Kristian Rother"
__copyright__ = "Copyright 2007-2016, The Cogent Project"
__contributors__ = ["Micah Hamady"]
__credits__ = ["Rob Knight"]
__license__ = "GPL"
__version__ = "1.9"
__maintainer__ = "Kristian Rother"
__email__ = "krother@rubor.de"
__status__ = "Prototype"
import xml.dom.minidom
"""
CAUTION:
This XML BLAST PARSER uses minidom. This means poor performance on
big files (>5MB), and huge XML files will almost certainly crash the program!
(06/2009 Kristian)
Possible improvements:
- convert some values into floats automatically (feature request)
- MH recommends sax.* for faster processing.
- test against nt result
- test really big file.
- consider high speed parser for standard output
"""
from cogent.parse.blast import BlastResult
# Column labels for the hit-level values that parse_hit() places at the
# start of every row.
HIT_XML_FIELDNAMES = [
    'QUERY ID',
    'SUBJECT_ID',
    'HIT_DEF',
    'HIT_ACCESSION',
    'HIT_LENGTH',
]

# (column label, XML tag name) pairs for the HSP-level values, in column
# order. MISMATCHES is paired with an empty tag name, i.e. it has no XML tag
# to read from.
HSP_XML_FIELDS = (
    ('PERCENT_IDENTITY', 'Hsp_identity'),
    ('ALIGNMENT_LENGTH', 'Hsp_align-len'),
    ('MISMATCHES', ''),
    ('GAP_OPENINGS', 'Hsp_gaps'),
    ('QUERY_START', 'Hsp_query-from'),
    ('QUERY_END', 'Hsp_query-to'),
    ('SUBJECT_START', 'Hsp_hit-from'),
    ('SUBJECT_END', 'Hsp_hit-to'),
    ('E_VALUE', 'Hsp_evalue'),
    ('BIT_SCORE', 'Hsp_bit-score'),
    ('SCORE', 'Hsp_score'),
    ('POSITIVE', 'Hsp_positive'),
    ('QUERY_ALIGN', 'Hsp_qseq'),
    ('SUBJECT_ALIGN', 'Hsp_hseq'),
    ('MIDLINE_ALIGN', 'Hsp_midline'),
)

# Transpose the pairs into two parallel lists: labels and tag names.
HSP_XML_FIELDNAMES, HSP_XML_TAGNAMES = (
    list(column) for column in zip(*HSP_XML_FIELDS)
)
def get_tag(record, name, default=None):
    """
    Looks inside the XML tag 'record' for descendant tags named 'name'
    and returns the value of the first child node of the first match.

    'default' is returned when no such tag exists, or when the first
    matching tag has no child nodes (e.g. an empty element).
    """
    matches = record.getElementsByTagName(name)
    if not len(matches):
        return default
    children = matches[0].childNodes
    if not len(children):
        return default
    return children[0].nodeValue
def parse_hit(hit_tag, query_id=1):
    """
    Parses a 'Hit' dom object.

    hit_tag: the 'Hit' element to read.
    query_id: value stored in the first column of every returned row.

    Returns a list with one row (a list) per 'Hsp' element found under
    the hit. Each row is [query_id, Hit_id, Hit_def, Hit_accession,
    Hit_len as int] followed by the values returned by parse_hsp().
    The list is empty when the hit contains no 'Hsp' elements.

    Raises ValueError if the 'Hit_len' text is not a valid integer.
    """
    # parse elements from hit tag
    hit_id = get_tag(hit_tag, 'Hit_id')
    hit_def = get_tag(hit_tag, 'Hit_def')
    accession = get_tag(hit_tag, 'Hit_accession')
    # The 0 is the fallback for a missing/empty Hit_len. It must be passed
    # to get_tag(), not to int(), where it would be taken as the numeric
    # base and make int() fail on a missing tag.
    length = int(get_tag(hit_tag, 'Hit_len', 0))
    hit_data = [query_id, hit_id, hit_def, accession, length]
    # one output row per HSP in this hit, each prefixed with the hit data
    return [hit_data + parse_hsp(hsp_tag)
            for hsp_tag in hit_tag.getElementsByTagName('Hsp')]
def parse_hsp(hsp_tag):
    """
    Parses a 'Hsp' XML dom object.

    Returns a list holding one value per entry of HSP_XML_TAGNAMES, in
    that order. Values are returned exactly as get_tag() yields them (no
    numeric conversion is applied here); a tag that is absent or empty
    contributes 0.
    """
    return [get_tag(hsp_tag, tag_name, 0) for tag_name in HSP_XML_TAGNAMES]
def parse_header(tag):
    """
    Parses a 'BlastOutput' dom object.

    Returns a dict with the keys 'application', 'version', 'reference',
    'query', 'query_letters' (converted to int) and 'database', plus
    every key produced by parse_parameters() for each 'BlastOutput_param'
    element found under 'tag'.
    """
    header = {
        'application': get_tag(tag, 'BlastOutput_program'),
        'version': get_tag(tag, 'BlastOutput_version'),
        'reference': get_tag(tag, 'BlastOutput_reference'),
        'query': get_tag(tag, 'BlastOutput_query-def'),
        'query_letters': int(get_tag(tag, 'BlastOutput_query-len')),
        'database': get_tag(tag, 'BlastOutput_db'),
    }
    # merge in the data from the parameters section(s); later sections
    # overwrite keys set by earlier ones
    for param_tag in tag.getElementsByTagName('BlastOutput_param'):
        header.update(parse_parameters(param_tag))
    return header
def parse_parameters(tag):
    """
    Parses a 'BlastOutput_param' dom object.

    Returns a dict with the keys 'matrix', 'expect', 'gap_open_penalty',
    'gap_extend_penalty' and 'filter'. The two gap penalties are
    converted to float; the other values are returned as read by
    get_tag() (None when the tag is absent).
    """
    return {
        'matrix': get_tag(tag, 'Parameters_matrix'),
        'expect': get_tag(tag, 'Parameters_expect'),
        'gap_open_penalty': float(get_tag(tag, 'Parameters_gap-open')),
        'gap_extend_penalty': float(get_tag(tag, 'Parameters_gap-extend')),
        'filter': get_tag(tag, 'Parameters_filter'),
    }
def MinimalBlastParser7(lines, include_column_names=False, format='xml'):
    """Yields successive (props, data_list) records parsed from lines.

    lines: iterable of strings that, joined together, form one XML BLAST
        output document.
    include_column_names, format: accepted for call compatibility; the
        body does not read them, and the label row is always emitted.

    output:
    props is the dict built by parse_header() for a 'BlastOutput' element.
    data_list is a list of lists: the FIRST ENTRY IS THE LIST OF COLUMN
    LABELS, followed by one row per HSP of every hit in the record.
    """
    xml_text = ''.join(lines)
    document = xml.dom.minidom.parseString(xml_text)
    # every row is tagged with the same query id
    query_id = 1
    for blast_output in document.getElementsByTagName('BlastOutput'):
        props = parse_header(blast_output)
        rows = [HIT_XML_FIELDNAMES + HSP_XML_FIELDNAMES]
        for hit_tag in blast_output.getElementsByTagName('Hit'):
            rows.extend(parse_hit(hit_tag, query_id))
        yield props, rows
class BlastXMLResult(BlastResult):
    """Blast results keyed by query id.

    Each value is a list with one entry per parsed record; every entry
    is a list of dicts, one per data row, mapping the column labels
    emitted by the parser to that row's values.
    """
    # FIELD NAMES
    # NOTE(review): these three labels differ from the 'QUERY_ALIGN',
    # 'SUBJECT_ALIGN' and 'MIDLINE_ALIGN' column labels listed in
    # HSP_XML_FIELDS -- confirm which spelling is intended.
    QUERY_ALIGN = 'HSP QSEQ'
    SUBJECT_ALIGN = 'HSP HSEQ'
    MIDLINE_ALIGN = 'HSP MIDLINE'
    HIT_DEF = 'HIT_DEF'
    HIT_ACCESSION = 'HIT_ACCESSION'
    HIT_LENGTH = 'HIT_LENGTH'
    SCORE = 'SCORE'
    POSITIVE = 'POSITIVE'

    # .. to be done: extend BlastResult.FieldComparisonOperators for the
    # fields added here.

    # HitKeys of the base class, extended with the fields declared above.
    HitKeys = BlastResult.HitKeys.union(
        set([HIT_DEF,
             HIT_ACCESSION,
             HIT_LENGTH,
             SCORE,
             POSITIVE,
             QUERY_ALIGN,
             SUBJECT_ALIGN,
             MIDLINE_ALIGN]))

    def __init__(self, data, psiblast=False, parser=None, xml=False):
        """
        Init from blast output.

        data: blast output, handed to the parser as its first argument.
        psiblast: if True (and neither 'parser' nor 'xml' is given),
            MinimalPsiBlastParser9 is used, else MinimalBlastParser9.
        parser: optional callable invoked as parser(data, True); it must
            yield (props, rec_data) pairs where rec_data[0] is the list
            of column labels and the remaining entries are data rows.
        xml: if True (and no 'parser' is given), MinimalBlastParser7 is
            used.
        """
        # further improvement:
        # add XML option to BlastResult __init__ instead of
        # using a separate class.
        if not parser:
            if xml:
                parser = MinimalBlastParser7
            else:
                # The m9 parsers are not imported at module level, so
                # they are pulled in here; without this the non-XML
                # defaults fail with NameError.
                from cogent.parse.blast import (MinimalBlastParser9,
                                                MinimalPsiBlastParser9)
                if psiblast:
                    parser = MinimalPsiBlastParser9
                else:
                    parser = MinimalBlastParser9

        for props, rec_data in parser(data, True):
            # Mirrors BlastResult: the iteration number is read (and
            # validated as an int) but not used further here.
            iteration = 1
            if self.ITERATION in props:
                iteration = int(props[self.ITERATION])

            labels = rec_data[0]
            if len(rec_data) > 1:
                hits = [dict(zip(labels, row)) for row in rec_data[1:]]
            else:
                # no hits found: keep one placeholder row with every
                # column set to the empty string
                hits = [dict.fromkeys(labels, '')]

            # get blast version of query id
            query_id = hits[0][self.QUERY_ID]
            self.setdefault(query_id, []).append(hits)
|