This file is indexed.

/usr/lib/python2.7/dist-packages/stetl/etl.py is in python-stetl 1.1+ds-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
# -*- coding: utf-8 -*-
#
# Main ETL program.
#
# Author: Just van den Broecke
#
import os
import sys
from ConfigParser import ConfigParser
import version
from util import Util
from chain import Chain
import StringIO

log = Util.get_log('ETL')


class ETL:
    """The main class: builds ETL Chains with connected Components from a config and let them run.

    Usually this class is called via :mod:`main`  but it may be called directly for direct integration.

    """

    CONFIG_DIR = None

    def __init__(self, options_dict, args_dict=None):
        """
        :param options_dict: dictionary with options, now only config_file files with path to config file
        :param args_dict: optional dictionary with arguments to be substituted for symbolic values in config
        :return:

        Assume path to config .ini file is in options dict
        """
        # args_dict is optional and is used to do string substitutions in options_dict.config file

        log.info("INIT - Stetl version is %s" % str(version.__version__))

        self.options_dict = options_dict
        config_file = self.options_dict.get('config_file')

        if config_file is None or not os.path.isfile(config_file):
            log.error('No config file found at: %s' % config_file)
            sys.exit(1)

        ETL.CONFIG_DIR = os.path.dirname(os.path.abspath(config_file))
        log.info("Config/working dir = %s" % ETL.CONFIG_DIR)

        self.configdict = ConfigParser()

        sys.path.append(ETL.CONFIG_DIR)

        try:
            log.info("Reading config_file = %s" % config_file)
            if args_dict:
                log.info("Substituting %d args in config file from args_dict: %s" % (len(args_dict), str(args_dict)))
                # Get config file as string
                f = open(config_file, 'r')
                config_str = f.read()
                f.close()

                # Do replacements  see http://docs.python.org/2/library/string.html#formatstrings
                config_str = config_str.format(**args_dict)

                log.info("Substituting args OK")
                # Put Config string into buffer (readfp() needs a readline() method)
                config_buf = StringIO.StringIO(config_str)

                # Parse config from file buffer
                self.configdict.readfp(config_buf, config_file)
            else:
                # Parse config file directly
                self.configdict.read(config_file)
        except Exception as e:
            log.error("Fatal Error reading config file: err=%s" % str(e))

    def run(self):
        # The main ETL processing
        log.info("START")
        t1 = Util.start_timer("total ETL")

        # Get the ETL Chain pipeline config strings
        # Default is to use the section [etl], but may be overidden on cmd line

        config_section = self.options_dict.get('config_section')
        if config_section is None:
            config_section = 'etl'

        chains_str = self.configdict.get(config_section, 'chains')
        if not chains_str:
            raise ValueError('ETL chain entry not defined in section [etl]')

        # Multiple Chains may be specified in the config
        chains_str_arr = chains_str.split(',')
        for chain_str in chains_str_arr:
            # Build single Chain of components and let it run
            chain = Chain(chain_str, self.configdict)
            chain.assemble()

            # Run the ETL for this Chain
            chain.run()

        Util.end_timer(t1, "total ETL")

        log.info("ALL DONE")