This file is indexed.

/usr/lib/python2.7/dist-packages/taskflow/listeners/capturing.py is in python-taskflow 2.3.0-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
# -*- coding: utf-8 -*-

#    Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from taskflow.listeners import base


def _freeze_it(values):
    """Freezes a set of values (handling none/empty nicely)."""
    if not values:
        return frozenset()
    else:
        return frozenset(values)


class CaptureListener(base.Listener):
    """A listener that captures transitions and saves them locally.

    NOTE(harlowja): this listener is *mainly* useful for testing (where it is
    useful to test the appropriate/expected transitions, produced results...
    occurred after engine running) but it could have other usages as well.

    :ivar values: Captured transitions + details (the result of
                  the :py:meth:`._format_capture` method) are stored into this
                  list (a previous list to append to may be provided using the
                  constructor keyword argument of the same name); by default
                  this stores tuples of the format ``(kind, state, details)``.
    """

    # Constant 'kind' strings used in the default capture formatting (to
    # identify what was captured); these are saved into the accumulated
    # values as the first index (so that your code may differentiate between
    # what was captured).

    #: Kind that denotes a 'flow' capture.
    FLOW = 'flow'

    #: Kind that denotes a 'task' capture.
    TASK = 'task'

    #: Kind that denotes a 'retry' capture.
    RETRY = 'retry'

    def __init__(self, engine,
                 task_listen_for=base.DEFAULT_LISTEN_FOR,
                 flow_listen_for=base.DEFAULT_LISTEN_FOR,
                 retry_listen_for=base.DEFAULT_LISTEN_FOR,
                 # Easily override what you want captured and where it
                 # should save into and what should be skipped...
                 capture_flow=True, capture_task=True, capture_retry=True,
                 # Skip capturing *all* tasks, all retries, all flows...
                 skip_tasks=None, skip_retries=None, skip_flows=None,
                 # Provide your own list (or previous list) to accumulate
                 # into...
                 values=None):
        super(CaptureListener, self).__init__(
            engine,
            task_listen_for=task_listen_for,
            flow_listen_for=flow_listen_for,
            retry_listen_for=retry_listen_for)
        self._capture_flow = capture_flow
        self._capture_task = capture_task
        self._capture_retry = capture_retry
        self._skip_tasks = _freeze_it(skip_tasks)
        self._skip_flows = _freeze_it(skip_flows)
        self._skip_retries = _freeze_it(skip_retries)
        if values is None:
            self.values = []
        else:
            self.values = values

    @staticmethod
    def _format_capture(kind, state, details):
        """Tweak what is saved according to your desire(s)."""
        return (kind, state, details)

    def _task_receiver(self, state, details):
        if self._capture_task:
            if details['task_name'] not in self._skip_tasks:
                self.values.append(self._format_capture(self.TASK,
                                                        state, details))

    def _retry_receiver(self, state, details):
        if self._capture_retry:
            if details['retry_name'] not in self._skip_retries:
                self.values.append(self._format_capture(self.RETRY,
                                                        state, details))

    def _flow_receiver(self, state, details):
        if self._capture_flow:
            if details['flow_name'] not in self._skip_flows:
                self.values.append(self._format_capture(self.FLOW,
                                                        state, details))