/usr/lib/python2.7/dist-packages/stetl/merger.py is in python-stetl 1.1+ds-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
#
# Merger Component base class for ETL.
#
# Author: Just van den Broecke
#
import random
from util import Util
from component import Component
log = Util.get_log('merger')
class Merger(Component):
    """
    Input Component that combines several child Input Components into one.

    Typical use is merging multiple input streams (for instance a number of
    API endpoints) so the rest of the Chain sees them as a single Input.

    All work is delegated to the embedded children. A child is either a single
    Input Component or a sub-Chain such as (Input|Filter|Filter...). For every
    child both ends are kept: the first Component receives the delegated calls,
    the last Component is the one that gets coupled to whatever follows the
    Merger. For a single-Component child both ends are the same object.

    NB: a Merger can only be used at the Input position.
    """

    def __init__(self, config_dict, child_list):
        """
        Register the children and set this Merger up as a Component.

        :param config_dict: configuration object; a section named after the
            children is added to it via ``add_section``
        :param child_list: first Components of the child (sub-)Chains
        """
        self.children = []
        name_parts = []
        for head in child_list:
            # Child id plus a random number makes up this child's part of the
            # section name.
            name_parts.append('-%s_%d' % (head.get_id(), random.randrange(0, 100000)))

            # Keep (first, last) of the sub-Chain; both are needed later to
            # link and unlink the sub-Chain. They coincide for a lone Component.
            self.children.append((head, head.get_last()))

        section_name = ''.join(name_parts)

        # Component expects a config section for us, so create one.
        config_dict.add_section(section_name)

        # Formats are taken from the first child only; compatibility is
        # verified afterwards (see is_compatible).
        leading = self.children[0]
        Component.__init__(self, config_dict, section_name,
                           consumes=self.first(leading)._input_format,
                           produces=self.last(leading)._output_format)

        # Number of children that have not yet reported End-of-Stream.
        self.end_count = len(self.children)

    def add_next(self, next_component):
        """
        Couple the tail of every child directly to the given Component.

        :param next_component: the Component following this Merger
        """
        for sub_chain in self.children:
            self.last(sub_chain).add_next(next_component)

        # Keep a reference ourselves; do_init/do_exit/process rely on it.
        self.next = next_component

    def first(self, child):
        """
        Return the head of a child entry.

        :param child: a (first, last) entry from ``self.children``
        :return: the first Component of that sub-Chain
        """
        return child[0]

    def last(self, child):
        """
        Return the tail of a child entry.

        :param child: a (first, last) entry from ``self.children``
        :return: the last Component of that sub-Chain
        """
        return child[1]

    def is_compatible(self):
        """
        Check compatibility of all children.

        Only the last Component of each sub-Chain is asked.

        :return: True when every child tail reports compatible, else False
        """
        return all(self.last(sub_chain).is_compatible() for sub_chain in self.children)

    def process(self, packet):
        """
        Let every still-active child process the Packet.

        A child that signals End-of-Stream is deactivated by unlinking its
        tail, and the Packet is re-initialised so the remaining children start
        from a clean Packet. The returned Packet is only flagged End-of-Stream
        once all children have ended.

        :param packet: the Packet handed to each child
        :return: the same Packet
        """
        for sub_chain in self.children:
            tail = self.last(sub_chain)

            # A tail without a next marks a child that already ended.
            if not tail.next:
                continue

            self.first(sub_chain).process(packet)

            if not packet.is_end_of_stream():
                continue

            # Unlink the finished child, otherwise it would keep
            # delivering End-of-Stream on every round.
            tail.next = None
            self.end_count -= 1

            # Reset the Packet for the children still to come.
            packet.init()

        # End-of-Stream for the Merger means End-of-Stream for all children.
        if self.end_count == 0:
            packet.set_end_of_stream()

        return packet

    def do_init(self):
        """
        Initialise each child, then the Component that follows the Merger.

        Every child tail is temporarily unlinked while its sub-Chain is
        initialised, so the following Component is initialised exactly once
        (by the final call here) instead of once per child.
        """
        for sub_chain in self.children:
            tail = self.last(sub_chain)
            tail.next = None
            self.first(sub_chain).do_init()
            # Restore the link that was cut for the duration of the init.
            tail.next = self.next

        self.next.do_init()

    def do_exit(self):
        """
        Shut down each child, then the Component that follows the Merger.

        Every child tail is unlinked before its sub-Chain exits, so the
        following Component is exited exactly once (by the final call here).
        The links are not restored afterwards.
        """
        for sub_chain in self.children:
            self.last(sub_chain).next = None
            self.first(sub_chain).do_exit()

        self.next.do_exit()
|