This file is indexed.

/usr/lib/python2.7/dist-packages/stetl/inputs/ogrinput.py is in python-stetl 1.1+ds-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Input classes for ETL via GDAL OGR.
#
# Author: Just van den Broecke
#
import subprocess
from stetl.component import Config
from stetl.util import Util, gdal, ogr
from stetl.input import Input
from stetl.packet import FORMAT

log = Util.get_log('ogrinput')


class OgrInput(Input):
    """
    Direct GDAL OGR input via Python OGR wrapper. Via the Python API http://gdal.org/python
    an OGR data source is accessed and from each layer the Features are read.
    Each Layer corresponds to a "doc", so for multi-layer sources the 'end-of-doc' flag is
    set after a Layer has been read.

    This input can read almost any geospatial dataformat. One can use the features directly
    in a Stetl Filter or use a converter to e.g. convert to GeoJSON structures.

    produces=FORMAT.ogr_feature or FORMAT.ogr_feature_array (all features)
    """

    # Start attribute config meta
    # Applying Decorator pattern with the Config class to provide
    # read-only config values from the configured properties.

    @Config(ptype=str, default=None, required=True)
    def data_source(self):
        """
        String denoting the OGR datasource. Usually a path to a file like "path/rivers.shp" or connection string
        to PostgreSQL like "PG: host=localhost dbname='rivers' user='postgres'".
        """
        pass

    @Config(ptype=str, default=None, required=False)
    def source_format(self):
        """
        Instructs GDAL to use driver by that name to open datasource. Not required for
        many standard formats that are self-describing like ESRI Shapefile.

        Examples: 'PostgreSQL', 'GeoJSON' etc
        """
        pass

    @Config(ptype=dict, default=None, required=False)
    def source_options(self):
        """
        Custom datasource-specific options. Used in gdal.SetConfigOption().
        """
        pass

    @Config(ptype=str, default=None, required=False)
    def sql(self):
        """
        String with SQL query. Mandatory for PostgreSQL OGR source.
        """
        pass

    # End attribute config meta

    # Constructor
    def __init__(self, configdict, section):
        Input.__init__(self, configdict, section, produces=[FORMAT.ogr_feature, FORMAT.ogr_feature_array])

    def init(self):

        self.ogr = ogr
        # http://trac.osgeo.org/gdal/wiki/PythonGotchas
        self.gdal = gdal
        self.gdal.UseExceptions()
        log.info("Using GDAL/OGR version: %d" % int(gdal.VersionInfo('VERSION_NUM')))

        # GDAL error handler function
        # http://pcjericks.github.io/py-gdalogr-cookbook/gdal_general.html
        def gdal_error_handler(err_class, err_num, err_msg):
            err_type = {
                gdal.CE_None: 'None',
                gdal.CE_Debug: 'Debug',
                gdal.CE_Warning: 'Warning',
                gdal.CE_Failure: 'Failure',
                gdal.CE_Fatal: 'Fatal'
            }
            err_msg = err_msg.replace('\n', ' ')
            err_class = err_type.get(err_class, 'None')
            log.error('Error Number: %s, Type: %s, Msg: %s' % (err_num, err_class, err_msg))

        # install error handler
        self.gdal.PushErrorHandler(gdal_error_handler)

        # Raise a dummy error for testing
        # self.gdal.Error(1, 2, 'test error')

        if self.source_options:
            for k in self.source_options:
                self.gdal.SetConfigOption(k, self.source_options[k])

        # Open OGR data source in read-only mode.
        if self.source_format:
            self.data_source_p = ogr.GetDriverByName(self.source_format).Open(self.data_source, 0)
        else:
            self.data_source_p = self.ogr.Open(self.data_source, 0)

        # Report failure if failed
        if self.data_source_p is None:
            log.error("Cannot open OGR datasource: %s with the following drivers." % self.data_source)

            for iDriver in range(self.ogr.GetDriverCount()):
                log.info("  ->  " + self.ogr.GetDriver(iDriver).GetName())

            raise Exception()
        else:
            # Open ok: initialize
            self.layer = None

            if self.sql:
                self.layer_count = 1
                self.layer_idx = -1
            else:
                self.layer_count = self.data_source_p.GetLayerCount()
                self.layer_idx = 0

            log.info("Opened OGR source ok: %s layer count=%d" % (self.data_source, self.layer_count))

    def read(self, packet):
        if not self.data_source_p:
            log.info("End reading from: %s" % self.data_source)
            return packet

        if self.layer is None:
            if self.sql and self.layer_idx == -1:
                # PostgreSQL: Layer is gotten via Query
                # http://trac.osgeo.org/postgis/wiki/UsersWikiOGR
                self.layer = self.data_source_p.ExecuteSQL(self.sql)
                self.layer_idx = 0
            elif self.layer_idx < self.layer_count:
                self.layer = self.data_source_p.GetLayer(self.layer_idx)
                self.layer_idx += 1
                if self.layer is None:
                    log.error("Could not fetch layer %d" % 0)
                    raise Exception()
                log.info("Start reading from OGR Source: %s, Layer: %s" % (self.data_source, self.layer.GetName()))
            else:
                # No more Layers left: cleanup
                packet.set_end_of_stream()
                log.info("Closing OGR source: %s" % self.data_source)
                # Destroy not required anymore: http://trac.osgeo.org/gdal/wiki/PythonGotchas
                # self.data_source_p.Destroy()
                self.data_source_p = None
                return packet

        # Return all features from Layer (ogr_feature_array) or next feature (ogr_feature)
        if self.output_format == FORMAT.ogr_feature_array:
            # Assemble all features
            features = list()
            for feature in self.layer:
                features.append(feature)

            packet.data = features
            log.info("End reading all features from Layer: %s count=%d" % (self.layer.GetName(), len(features)))
            packet.set_end_of_doc()
            self.layer = None
        else:
            # Next feature
            feature = self.layer.GetNextFeature()
            if feature:
                packet.data = feature
            else:
                log.info("End reading from Layer: %s" % self.layer.GetName())
                packet.set_end_of_doc()
                self.layer = None

        return packet


class OgrPostgisInput(Input):
    """
     Input from PostGIS via ogr2ogr command. For now hardcoded to produce an ogr GML line stream.
     OgrInput may be a better alternative.

     Alternatives: either stetl.input.PostgresqlInput or stetl.input.OgrInput.

     produces=FORMAT.xml_line_stream
    """

    # Start attribute config meta
    @Config(ptype=str, required=False, default='localhost')
    def in_pg_host(self):
        """
        Host of input DB.
        """
        pass

    @Config(ptype=str, required=False, default='5432')
    def in_pg_port(self):
        """
        Port of input DB.
        """
        pass

    @Config(ptype=str, required=True, default=None)
    def in_pg_db(self):
        """
        Database name input DB.

        """
        pass

    @Config(ptype=str, required=False, default=None)
    def in_pg_schema(self):
        """
        DB Schema name input DB.
        """
        pass

    @Config(ptype=str, required=False, default='postgres')
    def in_pg_user(self):
        """
        User input DB.
        """
        pass

    @Config(ptype=str, required=False, default='postgres')
    def in_pg_password(self):
        """
        Password input DB.
        """
        pass

    @Config(ptype=str, required=False, default=None)
    def in_srs(self):
        """
        SRS (projection) (ogr2ogr -s_srs) input DB e.g. 'EPSG:28992'.
        """
        pass

    @Config(ptype=str, required=False, default=None)
    def in_pg_sql(self):
        """
        The input query (string) to fire.
        """
        pass

    @Config(ptype=str, required=False, default=None)
    def out_srs(self):
        """
        Target SRS (ogr2ogr -t_srs) code output stream.
        """
        pass

    @Config(ptype=str, required=False, default='2')
    def out_dimension(self):
        """
        Dimension (OGR: DIM=N) of features in output stream.
        """
        pass

    @Config(ptype=str, required=False, default=None)
    def out_gml_format(self):
        """
        GML format OGR name in output stream, e.g. 'GML3'.
        """
        pass

    @Config(ptype=str, required=False, default=None)
    def out_layer_name(self):
        """
        New Layer name (ogr2ogr -nln) output stream, e.g. 'address'.
        """
        pass

    @Config(ptype=str, required=False, default=None)
    def out_geotype(self):
        """
        OGR Geometry type new layer in output stream, e.g. POINT.
        """
        pass

    # End attribute config meta

    # TODO make this template configurable so we can have generic ogr2ogr input....
    pg_conn_tmpl = "PG:host=%s dbname=%s active_schema=%s user=%s password=%s port=%s"
    cmd_tmpl = 'ogr2ogr|-t_srs|%s|-s_srs|%s|-f|GML|%s|-dsco|FORMAT=%s|-lco|DIM=%s|%s|-SQL|%s|-nln|%s|%s'

    # Constructor
    def __init__(self, configdict, section):
        Input.__init__(self, configdict, section, produces=FORMAT.xml_line_stream)

    def init(self):
        self.ogr_process = None
        self.eof_stdout = False
        self.eof_stderr = False
        self.out_file = '/vsistdout/'

        #
        # Build ogr2ogr command line
        #
        # PostGIS PG: options
        self.pg = OgrPostgisInput.pg_conn_tmpl % (
            self.in_pg_host, self.in_pg_db, self.in_pg_schema, self.in_pg_user, self.in_pg_password, self.in_pg_port)

        # Entire ogr2ogr command line
        self.cmd = OgrPostgisInput.cmd_tmpl % (
            self.out_srs, self.in_srs, self.out_file, self.out_gml_format, self.out_dimension, self.pg, self.in_pg_sql,
            self.out_layer_name, self.out_geotype)

        # Make array to make it easy for Popen with quotes etc
        self.cmd = self.cmd.split('|')

    def exec_cmd(self):
        log.info("start ogr2ogr cmd = %s" % repr(self.cmd))
        self.ogr_process = subprocess.Popen(self.cmd,
                                            shell=False,
                                            stdout=subprocess.PIPE,
                                            stderr=subprocess.PIPE)

        err_line = self.readline_err()
        if err_line:
            log.warning('ogr2ogr: %s ' % err_line)

            #        exit_code = self.ogr_process.returncode

    def readline(self):
        if self.eof_stdout is True:
            return None

        line = self.ogr_process.stdout.readline()
        if not line:
            self.eof_stdout = True
            log.info("EOF stdout")
            return None

        line = line.decode('utf-8')
        return line

    def readline_err(self):
        if self.eof_stderr is True:
            return None

        line = self.ogr_process.stderr.readline()
        if not line:
            self.eof_stderr = True
            log.info("EOF stderr")
            return None

        return line

    def read(self, packet):
        if packet.is_end_of_stream():
            return packet

        # First time here : spawn the ogr2ogr command
        if self.ogr_process is None:
            # New splitter for each layer
            self.exec_cmd()

        packet.data = self.readline()
        if not packet.data:
            err_line = self.readline_err()
            if err_line:
                log.warning('ogr2ogr: %s ' % err_line)

            packet.set_end_of_stream()
            log.info('EOF ogr2ogr output')

        return packet