/usr/lib/python2.7/dist-packages/stetl/etl.py is in python-stetl 1.1+ds-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
#
# Main ETL program.
#
# Author: Just van den Broecke
#
import os
import sys
from ConfigParser import ConfigParser
import version
from util import Util
from chain import Chain
import StringIO
# Module-wide log object for this file, obtained from Util under the name 'ETL'.
log = Util.get_log('ETL')
class ETL:
    """The main class: builds ETL Chains with connected Components from a config and lets them run.

    Usually this class is called via :mod:`main` but it may be called directly for direct integration.
    """

    # Absolute directory of the config file; assigned (class-wide) by __init__.
    CONFIG_DIR = None

    def __init__(self, options_dict, args_dict=None):
        """
        Locate, optionally pre-process, and parse the ETL config file.

        :param options_dict: dictionary with options; ``config_file`` must hold the path to
            the config (.ini) file. ``config_section`` is optional and is read by :meth:`run`.
        :param args_dict: optional dictionary with arguments to be substituted for symbolic
            values (``str.format`` replacement fields) in the config file
        :return:
        :raises Exception: any error raised while reading, substituting or parsing the
            config file is logged and then re-raised.

        Exits the process with status 1 when ``config_file`` is missing from the options
        or does not point to an existing file.
        """
        log.info("INIT - Stetl version is %s" % str(version.__version__))

        self.options_dict = options_dict
        config_file = self.options_dict.get('config_file')

        if config_file is None or not os.path.isfile(config_file):
            log.error('No config file found at: %s' % config_file)
            sys.exit(1)

        ETL.CONFIG_DIR = os.path.dirname(os.path.abspath(config_file))
        log.info("Config/working dir = %s" % ETL.CONFIG_DIR)

        self.configdict = ConfigParser()

        # NOTE(review): presumably so that Python modules located next to the
        # config file can be imported - confirm against the component loader.
        sys.path.append(ETL.CONFIG_DIR)

        try:
            log.info("Reading config_file = %s" % config_file)
            if args_dict:
                log.info("Substituting %d args in config file from args_dict: %s" % (len(args_dict), str(args_dict)))

                # Get config file as string; the context manager closes the
                # handle even when read() raises.
                with open(config_file, 'r') as f:
                    config_str = f.read()

                # Do replacements see http://docs.python.org/2/library/string.html#formatstrings
                config_str = config_str.format(**args_dict)
                log.info("Substituting args OK")

                # Put Config string into buffer (readfp() needs a readline() method)
                config_buf = StringIO.StringIO(config_str)

                # Parse config from file buffer
                self.configdict.readfp(config_buf, config_file)
            else:
                # Parse config file directly
                self.configdict.read(config_file)
        except Exception as e:
            log.error("Fatal Error reading config file: err=%s" % str(e))
            # Do not continue with an empty or partially parsed config: the
            # failure would otherwise only surface later, in run(), with a
            # misleading error (or not at all).
            raise

    def run(self):
        """
        The main ETL processing: assemble and run every Chain listed in the config.

        The comma-separated ``chains`` entry is read from the config section named by
        ``options_dict['config_section']``, or from ``[etl]`` when that option is absent.

        :raises ValueError: when the ``chains`` entry of the selected section is empty.
            Errors raised by the config lookup itself (e.g. a missing section or
            option) are not caught here.
        """
        log.info("START")
        t1 = Util.start_timer("total ETL")

        # Get the ETL Chain pipeline config strings
        # Default is to use the section [etl], but may be overridden on cmd line
        config_section = self.options_dict.get('config_section')
        if config_section is None:
            config_section = 'etl'

        chains_str = self.configdict.get(config_section, 'chains')
        if not chains_str:
            # Report the section that was actually consulted, which is not
            # necessarily [etl].
            raise ValueError('ETL chain entry not defined in section [%s]' % config_section)

        # Multiple Chains may be specified in the config
        for chain_str in chains_str.split(','):
            # Build single Chain of components and let it run
            chain = Chain(chain_str, self.configdict)
            chain.assemble()

            # Run the ETL for this Chain
            chain.run()

        Util.end_timer(t1, "total ETL")
        log.info("ALL DONE")
|