/usr/lib/python2.7/dist-packages/stetl/inputs/deegreeinput.py is in python-stetl 1.1+ds-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 | #!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Input classes for ETL.
#
# Author: Just van den Broecke
#
import codecs
import re
from stetl.component import Config
from stetl.postgis import PostGIS
from stetl.input import Input
from stetl.util import Util, etree, StringIO
from stetl.packet import FORMAT
# Module logger used by DeegreeBlobstoreInput for progress messages and parse errors.
log = Util.get_log('deegreeinput')
class DeegreeBlobstoreInput(Input):
    """
    Read features from deegree Blobstore DB into an etree doc.

    The 'sql' config option supplies the query; column 0 of each result row is
    taken as a feature blob. Blobs are written between start_container and
    end_container (each optionally wrapped in start_feature_tag/end_feature_tag)
    and parsed into one etree doc per max_features_per_doc features. Objects
    referenced by local xlinks (xlink:href="#id") are fetched from the
    gml_objects table and written into the same doc, so the links resolve
    within that doc.

    produces=FORMAT.etree_doc
    """

    # Start attribute config meta
    @Config(ptype=int, required=False, default=10000)
    def max_features_per_doc(self):
        """
        Max features to read from input feature GML stream per internal document.
        """
        pass

    @Config(ptype=str, required=True, default=None)
    def start_container(self):
        """
        Tag that starts container.
        """
        pass

    @Config(ptype=str, required=True, default=None)
    def end_container(self):
        """
        Tag that ends container.
        """
        pass

    @Config(ptype=str, required=False, default=None)
    def start_feature_tag(self):
        """
        XML tag that starts Feature. Optional: nothing is written when unset.
        """
        pass

    @Config(ptype=str, required=False, default=None)
    def end_feature_tag(self):
        """
        XML tag that ends Feature. Optional: nothing is written when unset.
        """
        pass

    # End attribute config meta

    def __init__(self, configdict, section):
        Input.__init__(self, configdict, section, produces=FORMAT.etree_doc)
        self.cur_feature_blob = None
        self.rowcount = 0
        # Matches local xlinks: xlink:href="#id", xlink:href='#id' or xlink:href=#id.
        # Exactly one of the three groups captures the referenced id (without '#').
        # IGNORECASE is passed to compile() because an inline (?i) that is not at
        # the start of a pattern is deprecated in Python 3.6+ and an error in 3.11+.
        self.regex_xlink_href = re.compile(
            r"""xlink:href\s*=\s*(?:"#([^"]*)"|'#([^']*)'|#([^'">\s]+))""",
            re.IGNORECASE)
        self.db = None
        self.xlink_db = None
        # Cursor over the main query result; None before the first read and after the last record
        self.cur = None
        self.buffer = None
        self.feature_count = 0
        # gml ids of xlinked objects already written to the current doc; reset by init_buf()
        self.xlink_ids = set()
        # Reusable XML parser
        self.xml_parser = etree.XMLParser(remove_blank_text=True)

    def init(self):
        pass

    def read(self, packet):
        """
        Fill packet.data with the next etree doc of up to max_features_per_doc
        features (plus their xlinked objects).

        Marks the packet end-of-doc for the final doc and end-of-stream once all
        records have been consumed.
        """
        if packet.is_end_of_stream():
            return packet

        if self.db is None:
            # First time read
            self._open_db()

        while self.cur is not None:
            if self.buffer is None:
                self.buffer = self.init_buf()
                self.buffer.write(self.start_container)

            # Get next blob record
            record = self.cur.fetchone()
            if record is None:
                self._close_records(packet)
                break

            # Embed the feature blob in feature tags, then add any locally xlinked objects
            # so the local hrefs resolve within the same document.
            # TODO: in some cases we may need to be recursive (xlinks in xlinked features...)
            feature_blob = self.write_feature(record)
            self._write_xlinked_features(feature_blob)

            if self.feature_count >= self.max_features_per_doc:
                # Doc is full: close the container and emit it
                self.buffer.write(self.end_container)
                self.buffer_to_doc(packet)
                break

        if self.cur is None:
            # All records handled: close off
            packet.set_end_of_stream()

        return packet

    def _open_db(self):
        """Connect, run the configured 'sql' query and open a second connection for xlink lookups."""
        log.info("reading records from blobstore..")
        self.db = PostGIS(self.cfg.get_dict())
        self.db.connect()
        sql = self.cfg.get('sql')
        self.rowcount = self.db.execute(sql)
        self.cur = self.db.cursor
        log.info("Read records rowcount=%d" % self.rowcount)

        # Separate connection so xlink queries do not run on the connection whose
        # cursor (self.cur) is still being iterated.
        self.xlink_db = PostGIS(self.cfg.get_dict())
        self.xlink_db.connect()

    def _close_records(self, packet):
        """Finish the stream: close the container, commit, and emit the last doc if it has features."""
        self.buffer.write(self.end_container)
        self.cur = None
        self.db.commit()
        if self.feature_count > 0:
            self.buffer_to_doc(packet)
            packet.set_end_of_doc()
        else:
            # Container-only buffer: nothing to parse, just release it
            self.buffer.close()
            self.buffer = None

    def _new_xlink_ids(self, feature_blob):
        """
        Return the ids referenced by local xlinks in feature_blob that are not yet
        in the current doc, in order of appearance, and record them as written.
        """
        gml_ids = []
        for match in self.regex_xlink_href.finditer(feature_blob):
            gml_id = match.group(1) or match.group(2) or match.group(3)
            # Skip empty references (href="#") and objects already in this doc
            if not gml_id or gml_id in self.xlink_ids:
                continue
            self.xlink_ids.add(gml_id)
            gml_ids.append(gml_id)
        return gml_ids

    def _write_xlinked_features(self, feature_blob):
        """Fetch the objects locally xlinked from feature_blob and write them to the buffer."""
        gml_ids = self._new_xlink_ids(feature_blob)
        if not gml_ids:
            return

        # The ids come from blob content, so single quotes are doubled to keep each
        # id inside its SQL string literal.
        # NOTE(review): a parameterized query would be preferable if PostGIS.execute
        # accepts query parameters — confirm against stetl.postgis.
        id_list = ", ".join("'%s'" % gml_id.replace("'", "''") for gml_id in gml_ids)
        xlink_sql = "SELECT binary_object from gml_objects where gml_id IN (%s)" % id_list

        self.xlink_db.execute(xlink_sql)
        for xlink_record in iter(self.xlink_db.cursor.fetchone, None):
            self.write_feature(xlink_record)

    def write_feature(self, record):
        """
        Write record[0] as a feature blob to the buffer, wrapped in the configured
        feature tags (if any), and count it. Returns the blob as a string.
        """
        feature_blob = str(record[0])
        if self.start_feature_tag:
            self.buffer.write(self.start_feature_tag)
        self.buffer.write(feature_blob)
        if self.end_feature_tag:
            self.buffer.write(self.end_feature_tag)
        self.feature_count += 1
        return feature_blob

    def init_buf(self):
        """
        Return a new UTF-8 writer over an in-memory buffer and reset the per-doc
        state (feature count and the set of already written xlink ids).
        """
        stream = codecs.getwriter("utf8")(StringIO())
        self.feature_count = 0
        self.xlink_ids = set()
        return stream

    def buffer_to_doc(self, packet):
        """
        Parse the buffer into packet.data as an etree doc, then close and drop the
        buffer so the next read() starts a fresh one.

        Any parse exception is logged and re-raised.
        """
        # Rewind so the parser reads everything written so far
        self.buffer.seek(0)
        try:
            packet.data = etree.parse(self.buffer, self.xml_parser)
        except Exception as e:
            buf_str = self.buffer.getvalue()
            if not buf_str:
                log.info("parse buffer empty: content=[%s]" % buf_str)
            else:
                log.error("error in buffer parsing %s" % str(e))
            raise
        self.buffer.close()
        self.buffer = None
|