This file is indexed.

/usr/lib/python3/dist-packages/s3transfer/tasks.py is in python3-s3transfer 0.1.13-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy
import logging

from s3transfer.utils import get_callbacks


logger = logging.getLogger(__name__)


class Task(object):
    """A task associated to a TransferFuture request

    This is a base class for other classes to subclass from. All subclassed
    classes must implement the _main() method.
    """
    def __init__(self, transfer_coordinator, main_kwargs=None,
                 pending_main_kwargs=None, done_callbacks=None,
                 is_final=False):
        """
        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The context associated to the
            TransferFuture for which this Task is associated with.

        :type main_kwargs: dict
        :param main_kwargs: The keyword args that can be immediately supplied
            to the _main() method of the task

        :type pending_main_kwargs: dict
        :param pending_main_kwargs: The keyword args that are depended upon
            by the result from a dependent future(s). The result returned by
            the future(s) will be used as the value for the keyword argument
            when _main() is called. The values for each key can be:
                * a single future - Once completed, its value will be the
                  result of that single future
                * a list of futures - Once all of the futures complete, the
                  value used will be a list of each completed future result
                  value in order of when they were originally supplied.

        :type done_callbacks: list of callbacks
        :param done_callbacks: A list of callbacks to call once the task is
            done completing. Each callback will be called with no arguments
            and will be called no matter if the task succeeds or an exception
            is raised.

        :type is_final: boolean
        :param is_final: True, to indicate that this task is the final task
            for the TransferFuture request. By setting this value to True, it
            will set the result of the entire TransferFuture to the result
            returned by this task's _main() method.
        """
        self._transfer_coordinator = transfer_coordinator

        # Each optional container gets a fresh empty default per instance so
        # that no mutable default is ever shared between tasks.
        self._main_kwargs = {} if main_kwargs is None else main_kwargs
        self._pending_main_kwargs = (
            {} if pending_main_kwargs is None else pending_main_kwargs)
        self._done_callbacks = [] if done_callbacks is None else done_callbacks

        self._is_final = is_final

    def __repr__(self):
        # These are the general main_kwarg parameters that we want to
        # display in the repr.
        params_to_display = [
            'bucket', 'key', 'part_number', 'final_filename',
            'transfer_future', 'offset', 'extra_args'
        ]
        main_kwargs_to_display = self._get_kwargs_with_params_to_include(
            self._main_kwargs, params_to_display)
        return '%s(transfer_id=%s, %s)' % (
            self.__class__.__name__, self._transfer_coordinator.transfer_id,
            main_kwargs_to_display)

    @property
    def transfer_id(self):
        """The id for the transfer request that the task belongs to"""
        return self._transfer_coordinator.transfer_id

    def _get_kwargs_with_params_to_include(self, kwargs, include):
        """Return a new dict holding only the ``include`` keys of ``kwargs``

        Keys listed in ``include`` that are missing from ``kwargs`` are
        skipped. The result follows the order of ``include``.
        """
        return dict(
            (param, kwargs[param]) for param in include if param in kwargs)

    def _get_kwargs_with_params_to_exclude(self, kwargs, exclude):
        """Return a new dict holding ``kwargs`` minus the ``exclude`` keys"""
        return dict(
            (param, value) for param, value in kwargs.items()
            if param not in exclude)

    def __call__(self):
        """The callable to use when submitting a Task to an executor

        :returns: The return value of _main() when it was executed
            successfully. None if the transfer was already done or if an
            exception was raised (the exception is set on the transfer
            coordinator instead of being propagated).
        """
        try:
            # Wait for all of futures this task depends on.
            self._wait_on_dependent_futures()
            # Gather up all of the main keyword arguments for main().
            # This includes the immediately provided main_kwargs and
            # the values for pending_main_kwargs that source from the return
            # values from the task's dependent futures.
            kwargs = self._get_all_main_kwargs()
            # If the task is not done (really only if some other related
            # task to the TransferFuture had failed) then execute the task's
            # main() method.
            if not self._transfer_coordinator.done():
                return self._execute_main(kwargs)
        except Exception as e:
            self._log_and_set_exception(e)
        finally:
            try:
                # Run any done callbacks associated to the task no matter
                # what.
                for done_callback in self._done_callbacks:
                    done_callback()
            finally:
                # The announcement lives in its own finally clause so that a
                # done callback that raises cannot prevent the final task
                # from signaling completion; the callback's exception still
                # propagates to the caller afterwards.
                if self._is_final:
                    # If this is the final task announce that it is done if
                    # results are waiting on its completion.
                    self._transfer_coordinator.announce_done()

    def _execute_main(self, kwargs):
        """Log, run _main() and record its result if this task is final

        :param kwargs: The complete keyword arguments to call _main() with

        :returns: The return value of _main()
        """
        # Do not display keyword args that should not be printed, especially
        # if they are going to make the logs hard to follow.
        params_to_exclude = ['data']
        kwargs_to_display = self._get_kwargs_with_params_to_exclude(
            kwargs, params_to_exclude)
        # Log what is about to be executed. The arguments are handed to the
        # logger unformatted so that the repr of the task and of its kwargs
        # is only computed when debug logging is actually enabled.
        logger.debug(
            "Executing task %s with kwargs %s", self, kwargs_to_display)

        return_value = self._main(**kwargs)
        # If the task is the final task, then set the TransferFuture's
        # value to the return value from main().
        if self._is_final:
            self._transfer_coordinator.set_result(return_value)
        return return_value

    def _log_and_set_exception(self, exception):
        """Log the exception and set it on the transfer coordinator"""
        # If an exception is ever thrown then set the exception for the
        # entire TransferFuture.
        logger.debug("Exception raised.", exc_info=True)
        self._transfer_coordinator.set_exception(exception)

    def _main(self, **kwargs):
        """The method that will be ran in the executor

        This method must be implemented by subclasses from Task. _main() can
        be implemented with any arguments decided upon by the subclass.
        """
        raise NotImplementedError('_main() must be implemented')

    def _wait_on_dependent_futures(self):
        """Block until every future in pending_main_kwargs has completed"""
        # Gather all of the futures that main() depends on.
        futures_to_wait_on = []
        for future in self._pending_main_kwargs.values():
            # If the pending main keyword arg is a list then extend the list.
            if isinstance(future, list):
                futures_to_wait_on.extend(future)
            # If the pending main keyword arg is a future append it to the
            # list.
            else:
                futures_to_wait_on.append(future)
        # Now wait for all of the futures to complete.
        self._wait_until_all_complete(futures_to_wait_on)

    def _wait_until_all_complete(self, futures):
        """Block until each of the provided futures has completed

        Exceptions raised by the futures are deliberately ignored here.
        """
        # This is a basic implementation of the concurrent.futures.wait()
        #
        # concurrent.futures.wait() is not used instead because of this
        # reported issue: https://bugs.python.org/issue20319.
        # The issue would occasionally cause multipart uploads to hang
        # when wait() was called. With this approach, it avoids the
        # concurrency bug by removing any association with concurrent.futures
        # implementation of waiters.
        logger.debug(
            '%s about to wait for the following futures %s', self, futures)
        for future in futures:
            try:
                logger.debug('%s about to wait for %s', self, future)
                future.result()
            except Exception:
                # result() can also produce exceptions. We want to ignore
                # these to be deferred to error handling down the road.
                pass
        logger.debug('%s done waiting for dependent futures', self)

    def _get_all_main_kwargs(self):
        """Merge main_kwargs with the results of the pending futures

        :returns: A new dict that can be used to call _main(). Results of
            the futures take precedence over main_kwargs for the same key.
        """
        # Copy over all of the kwargs that we know is available.
        kwargs = copy.copy(self._main_kwargs)

        # Iterate through the kwargs whose values are pending on the result
        # of a future.
        for key, pending_value in self._pending_main_kwargs.items():
            # If the value is a list of futures, collect the result from
            # each future in the order the futures were supplied.
            if isinstance(pending_value, list):
                result = [future.result() for future in pending_value]
            # Otherwise if the pending_value is a future, just wait for it.
            else:
                result = pending_value.result()
            # Add the retrieved value to the kwargs to be sent to the
            # main() call.
            kwargs[key] = result
        return kwargs


class SubmissionTask(Task):
    """Base class for the top-level task of a transfer

    A submission task is the entry point of a transfer: it is the task that
    submits the series of tasks needed to carry the transfer out. Subclasses
    must implement the _submit() method.
    """
    def _main(self, transfer_future, **kwargs):
        """Run the queued callbacks and then submit the transfer's tasks

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :param kwargs: Any additional kwargs that you may want to pass
            to the _submit() method
        """
        try:
            self._transfer_coordinator.set_status_to_queued()

            # Every on_queued callback runs before any task is submitted.
            for queued_callback in get_callbacks(transfer_future, 'queued'):
                queued_callback()

            # The callbacks are done, so the transfer is now running.
            self._transfer_coordinator.set_status_to_running()

            # Hand off to the subclass to submit the tasks that execute the
            # transfer.
            self._submit(transfer_future=transfer_future, **kwargs)
        except BaseException as error:
            # A failure during submission may mean that the final task, the
            # one that signals completion and runs the cleanups, was never
            # submitted, so that work has to be done here instead.
            #
            # BaseException is caught rather than Exception because some
            # executor implementations, specifically the serial one, expose
            # the SubmissionTask directly to KeyboardInterrupts, which need
            # the same cleanup and done signal.

            # Record what caused the submission to fail.
            self._log_and_set_exception(error)

            # Let every future that may have been spawned by this submission
            # task finish before the transfer is announced as done.
            self._wait_for_all_submitted_futures_to_complete()

            # Announcing the transfer as done also runs any cleanups and
            # done callbacks.
            self._transfer_coordinator.announce_done()

    def _submit(self, transfer_future, **kwargs):
        """The submission method to be implemented by subclasses

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :param kwargs: Any additional keyword arguments you want to be passed
            in
        """
        raise NotImplementedError('_submit() must be implemented')

    def _wait_for_all_submitted_futures_to_complete(self):
        """Block until no associated future of the transfer is outstanding"""
        # Cleanup and done callbacks must not be called too early, so every
        # submitted future has to complete first. A submitted task may itself
        # have submitted more tasks while running, which is why the
        # associated futures are polled repeatedly instead of only once.

        # Start from the futures that were submitted up to this point.
        pending = self._transfer_coordinator.associated_futures
        while pending:
            # Wait for this batch of futures to complete.
            self._wait_until_all_complete(pending)
            # More futures may have been submitted during the wait, so look
            # at the associated futures again.
            refreshed = self._transfer_coordinator.associated_futures
            # An unchanged collection means nothing new was submitted while
            # waiting, so everything that may have spawned from the original
            # submission task has completed.
            if pending == refreshed:
                return
            pending = refreshed


class CreateMultipartUploadTask(Task):
    """Task that starts a multipart upload"""
    def _main(self, client, bucket, key, extra_args):
        """
        :param client: The client to use when calling CreateMultipartUpload
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the initialization.

        :returns: The upload id of the multipart upload
        """
        # Start the multipart upload and pull its id out of the response.
        upload_id = client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args)['UploadId']

        # Register an abort of this upload as a failure cleanup on the
        # transfer coordinator.
        abort_kwargs = {'Bucket': bucket, 'Key': key, 'UploadId': upload_id}
        self._transfer_coordinator.add_failure_cleanup(
            client.abort_multipart_upload, **abort_kwargs)
        return upload_id


class CompleteMultipartUploadTask(Task):
    """Task that finishes a multipart upload"""
    def _main(self, client, bucket, key, upload_id, parts, extra_args):
        """
        :param client: The client to use when calling CompleteMultipartUpload
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param parts: A list of parts to use to complete the multipart upload::

            [{'ETag': etag_value, 'PartNumber': part_number}, ...]

            Each element in the list consists of a return value from
            an ``UploadPartTask``.
        :param extra_args:  A dictionary of any extra arguments that may be
            used in completing the multipart transfer.
        """
        # The parts are passed as-is, wrapped in the MultipartUpload
        # structure of the request.
        completed_upload = {'Parts': parts}
        client.complete_multipart_upload(
            Bucket=bucket, Key=key, UploadId=upload_id,
            MultipartUpload=completed_upload, **extra_args)