/usr/lib/python2.7/dist-packages/stetl/chain.py is in python-stetl 1.0.9+ds-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
#
# Chain class: holds pipeline of components.
#
# Author: Just van den Broecke
#
from factory import factory
from packet import Packet
from util import Util
# Module-level log object named 'chain'; used by Chain for progress messages.
log = Util.get_log('chain')
class Chain:
    """
    Holder for single invokable pipeline of components.

    A Chain is basically a singly linked list of Components.
    Each Component executes a part of the total ETL.
    Data along the Chain is passed within a Packet object.
    The compatibility of input and output for linked
    Components is checked when adding a Component to the Chain.
    """

    def __init__(self, chain_str, config_dict):
        """
        Create an empty (not yet assembled) Chain.

        :param chain_str: '|'-separated names of the ETL sections that make
            up the Chain, in processing order
        :param config_dict: configuration handed to the factory when the
            component for each section is created
        """
        # Head of the linked list; stays None until the first add().
        self.first_comp = None
        # Tail of the linked list: the most recently added component.
        self.cur_comp = None
        self.config_dict = config_dict
        self.chain_str = chain_str

    def assemble(self):
        """
        Builder method: build a Chain of linked Components.

        Splits the chain string on '|', creates one component per section
        name through the factory and appends each to the Chain in order.

        :return: None
        """
        log.info('Assembling Chain: %s...' % self.chain_str)

        # Create linked list of input/filter/output (ETL Component) objects
        for raw_name in self.chain_str.split('|'):
            # Tolerate whitespace around the separators ("in | out") so the
            # section lookup is done with the bare name.
            etl_section_name = raw_name.strip()

            # Create the ETL component by name and properties
            etl_comp = factory.create_obj(self.config_dict, etl_section_name)

            # Add component to end of Chain
            self.add(etl_comp)

    def add(self, etl_comp):
        """
        Add component to end of Chain.

        :param etl_comp: component to append; must provide ``add_next``
            when it is not the last component of the Chain
        :return: None
        """
        # Compare against None explicitly: a component object may define its
        # own truthiness, and a falsy head must not be mistaken for "empty".
        if self.first_comp is None:
            self.first_comp = etl_comp
        else:
            # Already component(s) in chain: link after the current tail
            self.cur_comp.add_next(etl_comp)

        # Remember current (new tail)
        self.cur_comp = etl_comp

    def run(self):
        """
        Run the ETL Chain.

        Initializes the Chain once through its first component, then keeps
        invoking the first component with a Packet until the Packet reports
        end-of-stream. The one-time exit of the Chain is always performed
        once processing has started, also when processing raises; such an
        exception is not swallowed and propagates to the caller.

        :return: None
        """
        log.info('Running Chain: %s' % self.chain_str)

        # One time init for entire Chain
        self.first_comp.do_init()

        # Do ETL as long as input available in Packet
        packet = Packet()
        rounds = 0
        try:
            while not packet.is_end_of_stream():
                # Invoke the first component to start the chain
                packet.init()
                packet = self.first_comp.process(packet)
                rounds += 1
        finally:
            # Always one time exit for entire Chain
            self.first_comp.do_exit()

            log.info('DONE - %d rounds - chain=%s ' % (rounds, self.chain_str))
|