/usr/lib/python2.7/dist-packages/stetl/outputs/deegreeoutput.py is in python-stetl 1.0.9+ds-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 | #!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Output Components for deegree server storage (www.deegree.org).
#
# Author: Just van den Broecke
#
# NB deegree also supports WFS-T!
#
from stetl.postgis import PostGIS
from stetl.output import Output
from stetl.util import Util, etree
from stetl.packet import FORMAT
import os
log = Util.get_log('deegreeoutput')
class DeegreeBlobstoreOutput(Output):
"""
Insert features into deegree Blobstore from an etree doc.
consumes=FORMAT.etree_doc
"""
def __init__(self, configdict, section):
Output.__init__(self, configdict, section, consumes=FORMAT.etree_doc)
self.overwrite = self.cfg.get_bool('overwrite')
self.srid = self.cfg.get_int('srid', -1)
self.feature_member_tag = self.cfg.get('feature_member_tag')
self.feature_type_ids = {}
def init(self):
self.get_feature_types()
if self.overwrite:
self.delete_features()
# not required for now
self.pg_srs_constraint()
def get_feature_types(self):
log.info('reading all featuretypes from DB')
db = PostGIS(self.cfg.get_dict())
db.connect()
sql = "SELECT id,qname FROM feature_types"
db.execute(sql)
cur = db.cursor
for record in cur:
self.feature_type_ids[record[1]] = record[0]
def delete_features(self):
log.info('deleting ALL features in DB')
db = PostGIS(self.cfg.get_dict())
db.tx_execute("TRUNCATE gml_objects")
def pg_srs_constraint(self):
log.info('set srs constraint')
db = PostGIS(self.cfg.get_dict())
srid = self.srid
sql = "ALTER TABLE gml_objects DROP CONSTRAINT enforce_srid_gml_bounded_by;"
db.tx_execute(sql)
sql = "ALTER TABLE gml_objects ADD CONSTRAINT enforce_srid_gml_bounded_by CHECK (st_srid(gml_bounded_by) = (%s));" % srid
db.tx_execute(sql)
def write(self, packet):
if packet.data is None:
return packet
gml_doc = packet.data
log.info('inserting features in DB')
db = PostGIS(self.cfg.get_dict())
db.connect()
# print self.to_string(gml_doc, False, False)
# NS = {'base': 'urn:x-inspire:specification:gmlas:BaseTypes:3.2', 'gml': 'http://www.opengis.net/gml/3.2'}
# featureMembers = gml_doc.xpath('//base:member/*', namespaces=NS)
featureMembers = gml_doc.xpath("//*[local-name() = '%s']/*" % self.feature_member_tag)
count = 0
gml_ns = None
for childNode in featureMembers:
if gml_ns is None:
if childNode.nsmap.has_key('gml'):
gml_ns = childNode.nsmap['gml']
else:
if childNode.nsmap.has_key('GML'):
gml_ns = childNode.nsmap['GML']
gml_id = childNode.get('{%s}id' % gml_ns)
feature_type_id = self.feature_type_ids[childNode.tag]
# Find a GML geometry in the GML NS
ogrGeomWKT = None
# gmlMembers = childNode.xpath(".//gml:Point|.//gml:Curve|.//gml:Surface|.//gml:MultiSurface", namespaces=NS)
gmlMembers = childNode.xpath(
".//*[local-name() = 'Point']|.//*[local-name() = 'Polygon']|.//*[local-name() = 'Curve']|.//*[local-name() = 'Surface']|.//*[local-name() = 'MultiSurface']")
geom_str = None
for gmlMember in gmlMembers:
if geom_str is None:
geom_str = etree.tostring(gmlMember)
# no need for GDAL Python bindings for now, maybe when we'll optimize with COPY iso INSERT
# ogrGeom = ogr.CreateGeometryFromGML(str(gmlStr))
# if ogrGeom is not None:
# ogrGeomWKT = ogrGeom.ExportToWkt()
# if ogrGeomWKT is not None:
# break
blob = etree.tostring(childNode, pretty_print=False, xml_declaration=False, encoding='UTF-8')
if geom_str is None:
sql = "INSERT INTO gml_objects(gml_id, ft_type, binary_object) VALUES (%s, %s, %s)"
parameters = (gml_id, feature_type_id, db.make_bytea(blob))
else:
# ST_SetSRID(ST_GeomFromGML(%s)),-1)
sql = "INSERT INTO gml_objects(gml_id, ft_type, binary_object, gml_bounded_by) VALUES (%s, %s, %s, ST_SetSRID( ST_GeomFromGML(%s),%s) )"
parameters = (gml_id, feature_type_id, db.make_bytea(blob), geom_str, self.srid)
if db.execute(sql, parameters) == -1:
log.error("feat num# = %d error inserting feature blob=%s (but continuing)" % (count, blob))
# will fail but we will close connection also
db.commit()
# proceed...
log.info('retrying to proceed with remaining features...')
db = PostGIS(self.cfg.get_dict())
db.connect()
count = 0
count += 1
exception = db.commit()
if exception is not None:
log.error("error in commit")
log.info("inserted %s features" % count)
return packet
#
class DeegreeFSLoaderOutput(Output):
"""
Insert features via deegree using deegree's FSLoader tool from an etree doc.
consumes=FORMAT.etree_doc
"""
# d3toolbox FeatureStoreLoader -action insert -dataset ${DATASET} -format ${GML_VERSION} -fsconfig ${1} -idgen ${IDGEN_MODE} -workspace ${WORKSPACE}
cmd_tmpl = '%s|FeatureStoreLoader|-action|insert|-dataset|%s|-format|%s|-fsconfig|%s|-idgen|%s|-workspace|%s'
def __init__(self, configdict, section):
Output.__init__(self, configdict, section, consumes=FORMAT.etree_doc)
def write(self, packet):
from subprocess import Popen, PIPE
if packet.data is None:
return packet
gml_doc = packet.data
d3tools_path = self.cfg.get('d3tools_path')
workspace_path = self.cfg.get('workspace_path')
feature_store = self.cfg.get('feature_store')
format = self.cfg.get('format', 'GML_32')
idgen = self.cfg.get('idgen', 'USE_EXISTING')
java_opts = self.cfg.get('java_opts', "-Xms128m -Xmx1024m")
dataset = self.cfg.get('dataset_path', '/dev/stdin')
os.putenv("JAVA_OPTS", java_opts)
d3tools = d3tools_path + '/bin/d3toolbox'
cmd = DeegreeFSLoaderOutput.cmd_tmpl % (d3tools, dataset, format, feature_store, idgen, workspace_path)
cmd = cmd.split('|')
p = Popen(cmd, stdin=PIPE)
p.stdin.write(packet.to_string())
result = p.communicate()[0]
return packet
# print(result)
|