/usr/lib/python2.7/dist-packages/stetl/filters/xmlassembler.py is in python-stetl 1.1+ds-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Assembles a stream of XML elements into etree docs, one doc per buffered batch.
#
# Author: Just van den Broecke
#
from stetl.util import Util, etree
from stetl.filter import Filter
from stetl.packet import FORMAT
# Module-level logger for this filter, obtained via stetl's Util.get_log helper.
log = Util.get_log('xmlassembler')
class XmlAssembler(Filter):
    """
    Split a stream of etree DOM XML elements (usually Features) into etree DOM docs.
    Consumes and buffers elements until max_elements reached, will then produce an etree doc.

    Configuration read by the constructor:
      max_elements:          buffered-element count that triggers a flush (default 10000).
      container_doc:         XML string parsed as the skeleton of every produced doc.
      element_container_tag: local name of the element inside container_doc that
                             receives the buffered elements.

    consumes=FORMAT.etree_element, produces=FORMAT.etree_doc
    """

    # XPath template selecting elements by local name, i.e. regardless of namespace.
    xpath_base = "//*[local-name() = '%s']"

    def __init__(self, configdict, section):
        """
        Read the filter configuration and set up the element buffer.

        :param configdict: stetl configuration object.
        :param section: name of this filter's config section.
        """
        Filter.__init__(self, configdict, section, consumes=FORMAT.etree_element, produces=FORMAT.etree_doc)

        log.info("cfg = %s", self.cfg.to_string())
        self.max_elements = self.cfg.get_int('max_elements', 10000)
        self.container_doc = self.cfg.get('container_doc')
        self.element_container_xpath = XmlAssembler.xpath_base % self.cfg.get('element_container_tag')

        # Number of elements consumed over the filter's lifetime (for logging).
        self.total_element_count = 0
        # Elements buffered until the next doc is assembled.
        self.element_arr = []

        # Reusable XML parser for the container doc; drops ignorable whitespace.
        self.xml_parser = etree.XMLParser(remove_blank_text=True)

    def invoke(self, packet):
        """
        Buffer the element carried by ``packet`` and emit a doc when one is due.

        A doc is assembled when the packet signals end-of-stream or end-of-doc,
        or when the buffer holds at least ``max_elements`` elements.

        :param packet: incoming packet; its data, if any, is consumed.
        :return: the same packet, whose data is the assembled doc if one was made.
        """
        if packet.data is not None:
            # Valid element: consume and buffer it.
            self.consume_element(packet)

            # The doc is obviously not finished while elements keep arriving,
            # so reset any EoD/EoS flags in the packet.
            # NOTE(review): if upstream sets EoS on the packet that carries the
            # final element, that EoS is cleared here; this assumes the input
            # signals EoS again on a later data-less packet -- TODO confirm.
            packet.set_end_of_stream(False)
            packet.set_end_of_doc(False)

        if packet.is_end_of_stream() or packet.is_end_of_doc() or len(self.element_arr) >= self.max_elements:
            # End of stream/doc, or buffer full: turn the buffered elements into a doc.
            self.flush_elements(packet)

        return packet

    def consume_element(self, packet):
        """
        Move the element out of ``packet`` into the buffer.

        :param packet: packet whose data is consumed (taken out of the packet).
        :return: the same packet.
        """
        element = packet.consume()
        if element is not None:
            self.total_element_count += 1
            self.element_arr.append(element)
        return packet

    def flush_elements(self, packet):
        """
        Assemble all buffered elements into a fresh container doc on ``packet``.

        The packet is always marked end-of-doc. When the buffer is empty, nothing
        else is changed. When the container doc cannot be parsed, or the element
        container is not found in it, an error is logged and the buffered elements
        are discarded. The configuration is fixed, so retrying would only fail the
        same way while the buffer kept growing.

        :param packet: packet that receives the assembled doc in ``packet.data``.
        :return: the same packet.
        """
        packet.set_end_of_doc()
        if not self.element_arr:
            return packet

        # Start new doc (TODO clone)
        try:
            etree_doc = etree.fromstring(self.container_doc, self.xml_parser)
        except Exception as e:
            log.error('new container doc not OK: %s' % str(e))
            self._discard_elements()
            return packet

        parent_elements = etree_doc.xpath(self.element_container_xpath)
        if not parent_elements:
            # Without this guard the elements were appended to the empty xpath
            # result list, silently lost, and an empty container doc was emitted.
            log.error('element container not found in container doc: %s', self.element_container_xpath)
            self._discard_elements()
            return packet

        parent_element = parent_elements[0]
        for element in self.element_arr:
            parent_element.append(element)

        log.info('xmldoc ready: elms=%d total_elms=%d' % (len(self.element_arr), self.total_element_count))
        packet.data = etree_doc
        self.element_arr = []
        return packet

    def _discard_elements(self):
        """Drop all buffered elements after a failed assembly, logging how many."""
        log.error('discarding %d buffered elements', len(self.element_arr))
        self.element_arr = []
|