This file is indexed.

/usr/lib/python2.7/dist-packages/stetl/chain.py is in python-stetl 1.1+ds-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# -*- coding: utf-8 -*-
#
# Chain class: holds pipeline of components.
#
# Author: Just van den Broecke
#

from factory import factory
from packet import Packet
from util import Util
from splitter import Splitter
from merger import Merger

log = Util.get_log('chain')


class Chain:
    """
    Holder for single invokable pipeline of components
    A Chain is basically a singly linked list of Components
    Each Component executes a part of the total ETL.
    Data along the Chain is passed within a Packet object.
    The compatibility of input and output for linked
    Components is checked when adding a Component to the Chain.
    """

    def __init__(self, chain_str, config_dict):
        self.first_comp = None
        self.cur_comp = None
        self.config_dict = config_dict
        self.chain_str = chain_str.strip()

    def assemble(self):
        """
        Builder method: build a Chain of linked Components
        :return:
        """
        log.info('Assembling Chain: %s...' % self.chain_str)

        # Create linked list of input/filter/output (ETL Component) objects
        chain_str = self.chain_str
        sub_comps = []
        while chain_str:
            chain_str = chain_str.strip()

            # Check and handle Splitter construct
            # e.g. input_xml_file |(transformer_xslt|output_file) (output_std) (transformer_xslt|output_std)
            if chain_str.startswith('('):
                etl_section_name, chain_str = chain_str.split(')', 1)
                etl_section_name = etl_section_name.strip('(')

                # Check for subchain (split at Filter level)
                if '|' in etl_section_name:
                    # Have subchain: use Chain to assemble
                    sub_chain = Chain(etl_section_name, self.config_dict)
                    sub_chain.assemble()
                    child_comp = sub_chain.first_comp
                else:
                    # Single component (Output) to split
                    child_comp = factory.create_obj(self.config_dict, etl_section_name.strip())

                # Assemble Components (can be subchains) for Splitter later
                sub_comps.append(child_comp)
                if '(' in chain_str:
                    # Still components (subchains) to assemble for Splitter
                    continue

            if len(sub_comps) > 0:
                if chain_str.startswith('|'):
                    # Next component is Merger with children
                    etl_comp = Merger(self.config_dict, sub_comps)
                    dummy, chain_str = chain_str.split('|', 1)
                else:
                    # Next component is Splitter with children
                    etl_comp = Splitter(self.config_dict, sub_comps)
                sub_comps = []
            else:

                # "Normal" case: regular Components piped in Chain
                if '|' in chain_str:
                    # More than one component in remaining Chain
                    etl_section_name, chain_str = chain_str.split('|', 1)
                else:
                    # Last element, we're done!
                    etl_section_name = chain_str
                    chain_str = None

                # Create the ETL component by name and properties
                etl_comp = factory.create_obj(self.config_dict, etl_section_name.strip())

            # Add component to end of Chain
            self.add(etl_comp)

    def add(self, etl_comp):
        """
        Add component to end of Chain
        :param etl_comp:
        :return:
        """
        if not self.first_comp:
            self.first_comp = etl_comp
        else:
            # Already component(s) in chain add to current
            self.cur_comp.add_next(etl_comp)

        # Remember current
        self.cur_comp = etl_comp

    def get_by_class(self, clazz):
        """
        Get Component instance from Chain by class, mainly for testing.
        :param clazz:
        :return Component:
        """
        cur_comp = self.first_comp
        while cur_comp:
            if cur_comp.__class__ == clazz:
                return cur_comp

            # Try next in Chain
            cur_comp = cur_comp.next

        return None

    def get_by_id(self, id):
        """
        Get Component instance from Chain, mainly for testing.
        :param name:
        :return Component:
        """
        cur_comp = self.first_comp
        while cur_comp:
            if cur_comp.get_id() == id:
                return cur_comp

            # Try next in Chain
            cur_comp = cur_comp.next

        return None

    def get_by_index(self, index):
        """
        Get Component instance from Chain by position/index in Chain, mainly for testing.
        :param clazz:
        :return Component:
        """
        cur_comp = self.first_comp
        i = 0
        while cur_comp and i < index:
            # Try next in Chain
            cur_comp = cur_comp.next
            i += 1

        return cur_comp

    def run(self):
        """
        Run the ETL Chain.
        :return:
        """
        log.info('Running Chain: %s' % self.chain_str)

        # One time init for entire Chain
        self.first_comp.do_init()

        # Do ETL as long as input available in Packet
        packet = Packet()
        rounds = 0
        try:
            while not packet.is_end_of_stream():
                # try:
                # Invoke the first component to start the chain
                packet.init()
                packet = self.first_comp.process(packet)
                rounds += 1
                #            except (Exception), e:
                #                log.error("Fatal Error in ETL: %s"% str(e))
                #                break
        finally:
            # Always one time exit for entire Chain
            self.first_comp.do_exit()

        log.info('DONE - %d rounds - chain=%s ' % (rounds, self.chain_str))