/usr/lib/python3/dist-packages/twisted/internet/task.py is in python3-twisted-experimental 13.2.0-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- test-case-name: twisted.test.test_task,twisted.test.test_cooperator -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Scheduling utility methods and classes.
"""
from __future__ import division, absolute_import
__metaclass__ = type
import sys
import time
from zope.interface import implementer
from twisted.python import log
from twisted.python import _reflectpy3 as reflect
from twisted.python.failure import Failure
from twisted.internet import base, defer
from twisted.internet.interfaces import IReactorTime
from twisted.internet.error import ReactorNotRunning
class LoopingCall:
    """Call a function repeatedly.

    If C{f} returns a deferred, rescheduling will not take place until the
    deferred has fired. The result value is ignored.

    @ivar f: The function to call.
    @ivar a: A tuple of arguments to pass the function.
    @ivar kw: A dictionary of keyword arguments to pass to the function.
    @ivar clock: A provider of
        L{twisted.internet.interfaces.IReactorTime}.  The default is
        L{twisted.internet.reactor}. Feel free to set this to
        something else, but it probably ought to be set *before*
        calling L{start}.

    @type running: C{bool}
    @ivar running: A flag which is C{True} while C{f} is scheduled to be called
        (or is currently being called). It is set to C{True} when L{start} is
        called and set to C{False} when L{stop} is called or if C{f} raises an
        exception. In either case, it will be C{False} by the time the
        C{Deferred} returned by L{start} fires its callback or errback.

    @type _expectNextCallAt: C{float}
    @ivar _expectNextCallAt: The time at which this instance most recently
        scheduled itself to run.

    @type _realLastTime: C{float}
    @ivar _realLastTime: When counting skips, the time at which the skip
        counter was last invoked.

    @type _runAtStart: C{bool}
    @ivar _runAtStart: A flag indicating whether the 'now' argument was passed
        to L{LoopingCall.start}.
    """

    call = None
    running = False
    deferred = None
    interval = None
    _expectNextCallAt = 0.0
    _runAtStart = False
    starttime = None

    def __init__(self, f, *a, **kw):
        """
        Remember the callable and its arguments, and use the global reactor
        as the default clock.

        @param f: The function to call on every iteration.
        @param a: Positional arguments passed to C{f}.
        @param kw: Keyword arguments passed to C{f}.
        """
        self.f = f
        self.a = a
        self.kw = kw
        # Imported here rather than at module level so that merely importing
        # this module does not install a reactor.
        from twisted.internet import reactor
        self.clock = reactor

    @classmethod
    def withCount(cls, countCallable):
        """
        An alternate constructor for L{LoopingCall} that makes available the
        number of calls which should have occurred since it was last invoked.

        Note that this number is an C{int} value; It represents the discrete
        number of calls that should have been made.  For example, if you are
        using a looping call to display an animation with discrete frames, this
        number would be the number of frames to advance.

        The count is normally 1, but can be higher. For example, if the reactor
        is blocked and takes too long to invoke the L{LoopingCall}, a Deferred
        returned from a previous call is not fired before an interval has
        elapsed, or if the callable itself blocks for longer than an interval,
        preventing I{itself} from being called.

        When the loop runs with an interval of C{0} there are no intervals to
        count, so every invocation reports a count of 1.

        @param countCallable: A callable that will be invoked each time the
            resulting LoopingCall is run, with an integer specifying the number
            of calls that should have been invoked.

        @type countCallable: 1-argument callable which takes an C{int}

        @return: An instance of L{LoopingCall} with call counting enabled,
            which provides the count as the first positional argument.

        @rtype: L{LoopingCall}

        @since: 9.0
        """
        def counter():
            now = self.clock.seconds()

            if self.interval == 0:
                # _intervalOf divides by the interval; with a zero interval
                # that would raise ZeroDivisionError, so treat each
                # invocation as exactly one call.
                self._realLastTime = now
                return countCallable(1)

            lastTime = self._realLastTime
            if lastTime is None:
                # First invocation: measure from the start time, shifted back
                # one interval when the loop was asked to run immediately so
                # that the initial call counts as one.
                lastTime = self.starttime
                if self._runAtStart:
                    lastTime -= self.interval
            self._realLastTime = now
            lastInterval = self._intervalOf(lastTime)
            thisInterval = self._intervalOf(now)
            count = thisInterval - lastInterval
            return countCallable(count)

        # ``counter`` closes over ``self``, which is only bound here.
        self = cls(counter)
        self._realLastTime = None
        return self

    def _intervalOf(self, t):
        """
        Determine the number of intervals passed as of the given point in
        time.

        @param t: The specified time (from the start of the L{LoopingCall}) to
            be measured in intervals

        @return: The C{int} number of intervals which have passed as of the
            given point in time.
        """
        elapsedTime = t - self.starttime
        intervalNum = int(elapsedTime / self.interval)
        return intervalNum

    def start(self, interval, now=True):
        """
        Start running function every interval seconds.

        @param interval: The number of seconds between calls.  May be
            less than one.  Precision will depend on the underlying
            platform, the available hardware, and the load on the system.

        @param now: If True, run this call right now.  Otherwise, wait
            until the interval has elapsed before beginning.

        @return: A Deferred whose callback will be invoked with
            C{self} when C{self.stop} is called, or whose errback will be
            invoked when the function raises an exception or returned a
            deferred that has its errback invoked.

        @raise ValueError: if C{interval} is negative.
        """
        assert not self.running, ("Tried to start an already running "
                                  "LoopingCall.")
        if interval < 0:
            raise ValueError("interval must be >= 0")
        self.running = True
        d = self.deferred = defer.Deferred()
        self.starttime = self.clock.seconds()
        self._expectNextCallAt = self.starttime
        self.interval = interval
        self._runAtStart = now
        if now:
            self()
        else:
            self._reschedule()
        return d

    def stop(self):
        """Stop running function.

        If a call is currently scheduled it is cancelled and the Deferred
        returned by L{start} is fired with C{self} right away.  Otherwise the
        function is still in progress and that Deferred is fired by
        L{__call__}'s callback once the function has finished.
        """
        assert self.running, ("Tried to stop a LoopingCall that was "
                              "not running.")
        self.running = False
        if self.call is not None:
            self.call.cancel()
            self.call = None
            d, self.deferred = self.deferred, None
            d.callback(self)

    def reset(self):
        """
        Skip the next iteration and reset the timer.

        @since: 11.1
        """
        assert self.running, ("Tried to reset a LoopingCall that was "
                              "not running.")
        if self.call is not None:
            self.call.cancel()
            self.call = None
            self._expectNextCallAt = self.clock.seconds()
            self._reschedule()

    def __call__(self):
        """
        Run one iteration: call C{f} and, once its result is available,
        either reschedule or fire the Deferred returned by L{start}.
        """
        def cb(result):
            if self.running:
                self._reschedule()
            else:
                # stop() was called while f was in progress.
                d, self.deferred = self.deferred, None
                d.callback(self)

        def eb(failure):
            self.running = False
            d, self.deferred = self.deferred, None
            d.errback(failure)

        # No delayed call is pending while f is being run.
        self.call = None
        d = defer.maybeDeferred(self.f, *self.a, **self.kw)
        d.addCallback(cb)
        d.addErrback(eb)

    def _reschedule(self):
        """
        Schedule the next iteration of this looping call.
        """
        if self.interval == 0:
            self.call = self.clock.callLater(0, self)
            return

        currentTime = self.clock.seconds()
        # Find how long is left until the interval comes around again.
        untilNextTime = (self._expectNextCallAt - currentTime) % self.interval
        # Make sure it is in the future, in case more than one interval worth
        # of time passed since the previous call was made.
        nextTime = max(
            self._expectNextCallAt + self.interval, currentTime + untilNextTime)
        # If the interval falls on the current time exactly, skip it and
        # schedule the call for the next interval.
        if nextTime == currentTime:
            nextTime += self.interval
        self._expectNextCallAt = nextTime
        self.call = self.clock.callLater(nextTime - currentTime, self)

    def __repr__(self):
        """
        Describe this call as its interval, a name for C{f}, and the
        arguments C{f} is called with.
        """
        if hasattr(self.f, '__qualname__'):
            func = self.f.__qualname__
        elif hasattr(self.f, '__name__'):
            func = self.f.__name__
            if hasattr(self.f, 'im_class'):
                func = self.f.im_class.__name__ + '.' + func
        else:
            func = reflect.safe_repr(self.f)

        return 'LoopingCall<%r>(%s, *%s, **%s)' % (
            self.interval, func, reflect.safe_repr(self.a),
            reflect.safe_repr(self.kw))
class SchedulerError(Exception):
    """
    Base class for errors signalling that a scheduler, or one of the tasks
    it runs, was in a state which made the requested operation impossible.

    Code should raise one of the more specific subclasses rather than this
    class itself.
    """


class SchedulerStopped(SchedulerError):
    """
    The scheduler had already been stopped, or was stopped while the
    operation was in progress.
    """


class TaskFinished(SchedulerError):
    """
    The task is permanently finished: it completed, was stopped, or failed
    with an error, so the operation could not be carried out.
    """


class TaskDone(TaskFinished):
    """
    The task had already run to completion.
    """


class TaskStopped(TaskFinished):
    """
    The task had already been stopped.
    """


class TaskFailed(TaskFinished):
    """
    The task had already died with an unhandled error.
    """


class NotPaused(SchedulerError):
    """
    An attempt was made to resume a task which had not been paused.
    """
class _Timer(object):
    """
    A termination predicate which becomes true once C{MAX_SLICE} seconds of
    wall-clock time have passed since the instance was created.

    @ivar end: the C{time.time()} value at or after which calling this
        object returns C{True}.
    """

    MAX_SLICE = 0.01

    def __init__(self):
        createdAt = time.time()
        self.end = createdAt + self.MAX_SLICE

    def __call__(self):
        """
        @return: C{True} if the current time has reached C{self.end}.
        """
        rightNow = time.time()
        return rightNow >= self.end
# Delay, in seconds, handed to the reactor by the default scheduler.
_EPSILON = 0.00000001


def _defaultScheduler(x):
    """
    Schedule C{x} on the global reactor after a delay of C{_EPSILON} seconds.

    @param x: the no-argument callable to run.
    @return: whatever the reactor's C{callLater} returns for the scheduled
        call.
    """
    # Imported lazily so that importing this module does not install a
    # reactor.
    from twisted.internet import reactor
    scheduled = reactor.callLater(_EPSILON, x)
    return scheduled
class CooperativeTask(object):
    """
    A task living inside a L{Cooperator}.  It can be paused, resumed and
    stopped, and interested parties can be notified when it completes or
    terminates.

    @see: L{Cooperator.cooperate}

    @ivar _iterator: the iterator advanced each time this task is asked to
        do a unit of work.

    @ivar _cooperator: the L{Cooperator} this task belongs to; the task
        re-inserts itself there when resumed.
    @type _cooperator: L{Cooperator}

    @ivar _deferreds: the L{defer.Deferred}s to fire when this task
        completes, fails, or finishes.
    @type _deferreds: C{list}

    @ivar _pauseCount: how many outstanding pauses there are; the task is
        running when this is 0.
    @type _pauseCount: C{int}

    @ivar _completionState: C{None} while the task has not completed.
        Afterwards an instance of L{TaskStopped} if C{stop} ended the task
        early, of L{TaskFailed} if the iterator raised an exception, or of
        L{TaskDone} if the iterator finished normally by raising
        C{StopIteration}.
    @type _completionState: L{TaskFinished}

    @ivar _completionResult: the value with which the completion deferreds
        are fired once the task has completed.
    """

    def __init__(self, iterator, cooperator):
        """
        Private: use L{Cooperator.cooperate} to create a
        L{CooperativeTask}.  The new task registers itself with
        C{cooperator} immediately.
        """
        self._iterator = iterator
        self._cooperator = cooperator
        self._deferreds = []
        self._pauseCount = 0
        self._completionState = None
        self._completionResult = None
        cooperator._addTask(self)

    def whenDone(self):
        """
        Ask to be notified when this task is complete.

        @return: a L{defer.Deferred} which fires with the C{iterator} this
            task was created with once that iterator is exhausted (its
            C{next} method raised C{StopIteration}), or which fails with the
            exception raised by C{next} if it raised anything else.

        @rtype: L{defer.Deferred}
        """
        notification = defer.Deferred()
        if self._completionState is not None:
            # Already finished: deliver the recorded result right away.
            notification.callback(self._completionResult)
            return notification
        self._deferreds.append(notification)
        return notification

    def pause(self):
        """
        Stop doing work until L{CooperativeTask.resume} is called.  Pauses
        nest: C{resume} has to be called once for every call to C{pause}
        before work continues.

        @raise TaskFinished: if this task has already finished or completed.
        """
        self._checkFinish()
        wasRunning = self._pauseCount == 0
        self._pauseCount += 1
        if wasRunning:
            # Only the first pause takes the task out of the cooperator.
            self._cooperator._removeTask(self)

    def resume(self):
        """
        Undo one call to L{pause}, continuing work once no pauses remain.

        @raise NotPaused: if this L{CooperativeTask} is not paused.
        """
        if not self._pauseCount:
            raise NotPaused()
        self._pauseCount -= 1
        if self._pauseCount:
            return
        if self._completionState is None:
            self._cooperator._addTask(self)

    def _completeWith(self, completionState, deferredResult):
        """
        Record that this task is finished and notify everybody waiting on it.

        @param completionState: a L{TaskFinished} exception or a subclass
            thereof, indicating what exception should be raised when
            subsequent operations are performed.

        @param deferredResult: the result to fire all the deferreds with.
        """
        self._completionState = completionState
        self._completionResult = deferredResult
        if self._pauseCount == 0:
            self._cooperator._removeTask(self)

        # Fire the Deferreds only after the bookkeeping above, since a
        # callback may manipulate other tasks in the Cooperator.  If, say, a
        # whenDone() callback calls stop() on the cooperator, this task must
        # already be gone from it so that _completeWith is not re-entered;
        # re-entry would make these Deferreds raise AlreadyCalledError or
        # make _removeTask fail with ValueError.
        for waiting in self._deferreds:
            waiting.callback(deferredResult)

    def stop(self):
        """
        Stop further processing of this task.

        @raise TaskFinished: if this L{CooperativeTask} has previously
            completed, via C{stop}, completion, or failure.
        """
        self._checkFinish()
        self._completeWith(TaskStopped(), Failure(TaskStopped()))

    def _checkFinish(self):
        """
        Raise the recorded completion state (a subclass of L{TaskFinished})
        if this task has already finished; otherwise do nothing.
        """
        finishedState = self._completionState
        if finishedState is not None:
            raise finishedState

    def _oneWorkUnit(self):
        """
        Do one unit of work: take one item from the iterator, complete the
        task if the iterator is exhausted or raises, and pause the task if
        the item is a L{defer.Deferred}.
        """
        try:
            produced = next(self._iterator)
        except StopIteration:
            self._completeWith(TaskDone(), self._iterator)
            return
        except:
            # Failure() captures the exception currently being handled.
            self._completeWith(TaskFailed(), Failure())
            return

        if not isinstance(produced, defer.Deferred):
            return

        # Wait for the Deferred: resume when it succeeds, fail the task when
        # it fails.
        self.pause()

        def resumeLater(ignored):
            self.resume()

        def failLater(f):
            self._completeWith(TaskFailed(), f)

        produced.addCallbacks(resumeLater, failLater)
class Cooperator(object):
    """
    Cooperative task scheduler.

    A cooperative task is an iterator where each iteration represents an
    atomic unit of work.  When the iterator yields, it allows the
    L{Cooperator} to decide which of its tasks to execute next.  If the
    iterator yields a L{defer.Deferred} then work will pause until the
    L{defer.Deferred} fires and completes its callback chain.

    When a L{Cooperator} has more than one task, it distributes work between
    all tasks.

    There are two ways to add tasks to a L{Cooperator}, L{cooperate} and
    L{coiterate}.  L{cooperate} is the more useful of the two, as it returns a
    L{CooperativeTask}, which can be L{paused<CooperativeTask.pause>},
    L{resumed<CooperativeTask.resume>} and L{waited
    on<CooperativeTask.whenDone>}.  L{coiterate} has the same effect, but
    returns only a L{defer.Deferred} that fires when the task is done.

    L{Cooperator} can be used for many things, including but not limited to:

      - running one or more computationally intensive tasks without blocking
      - limiting parallelism by running a subset of the total tasks
        simultaneously
      - doing one thing, waiting for a L{Deferred<defer.Deferred>} to fire,
        doing the next thing, repeat (i.e. serializing a sequence of
        asynchronous tasks)

    Multiple L{Cooperator}s do not cooperate with each other, so for most
    cases you should use the L{global cooperator<task.cooperate>}.
    """

    def __init__(self,
                 terminationPredicateFactory=_Timer,
                 scheduler=_defaultScheduler,
                 started=True):
        """
        Create a scheduler-like object to which iterators may be added.

        @param terminationPredicateFactory: A no-argument callable which will
            be invoked at the beginning of each step and should return a
            no-argument callable which will return True when the step should
            be terminated.  The default factory is time-based and allows
            iterators to run for 1/100th of a second at a time.

        @param scheduler: A one-argument callable which takes a no-argument
            callable and should invoke it at some future point.  This will be
            used to schedule each step of this Cooperator.

        @param started: A boolean which indicates whether iterators should be
            stepped as soon as they are added, or if they will be queued up
            until L{Cooperator.start} is called.
        """
        self._tasks = []
        self._metarator = iter(())
        self._terminationPredicateFactory = terminationPredicateFactory
        self._scheduler = scheduler
        self._delayedCall = None
        self._stopped = False
        self._started = started

    def coiterate(self, iterator, doneDeferred=None):
        """
        Add an iterator to the list of iterators this L{Cooperator} is
        currently running.

        Equivalent to L{cooperate}, but returns a L{defer.Deferred} that will
        be fired when the task is done.

        @param doneDeferred: If specified, this will be the Deferred used as
            the completion deferred.  It is suggested that you use the
            default, which creates a new Deferred for you.

        @return: a Deferred that will fire when the iterator finishes.
        """
        if doneDeferred is None:
            doneDeferred = defer.Deferred()
        CooperativeTask(iterator, self).whenDone().chainDeferred(doneDeferred)
        return doneDeferred

    def cooperate(self, iterator):
        """
        Start running the given iterator as a long-running cooperative task,
        by calling next() on it as a periodic timed event.

        @param iterator: the iterator to invoke.

        @return: a L{CooperativeTask} object representing this task.
        """
        return CooperativeTask(iterator, self)

    def _addTask(self, task):
        """
        Add a L{CooperativeTask} object to this L{Cooperator}.

        If this L{Cooperator} has been stopped, the task is completed with
        L{SchedulerStopped} straight away instead of being scheduled.
        """
        if self._stopped:
            self._tasks.append(task)  # XXX silly, I know, but _completeWith
                                      # does the inverse
            task._completeWith(SchedulerStopped(), Failure(SchedulerStopped()))
        else:
            self._tasks.append(task)
            self._reschedule()

    def _removeTask(self, task):
        """
        Remove a L{CooperativeTask} from this L{Cooperator}.
        """
        self._tasks.remove(task)
        # If no work left to do, cancel the delayed call:
        if not self._tasks and self._delayedCall:
            self._delayedCall.cancel()
            self._delayedCall = None

    def _tasksWhileNotStopped(self):
        """
        Yield all L{CooperativeTask} objects in a loop as long as this
        L{Cooperator}'s termination condition has not been met.
        """
        terminator = self._terminationPredicateFactory()
        while self._tasks:
            # _metarator is kept on the instance so that the next tick
            # continues from where this one left off.
            for t in self._metarator:
                yield t
                if terminator():
                    return
            self._metarator = iter(self._tasks)

    def _tick(self):
        """
        Run one scheduler tick.
        """
        self._delayedCall = None
        for taskObj in self._tasksWhileNotStopped():
            taskObj._oneWorkUnit()
        self._reschedule()

    # Set on the instance when a reschedule is requested before start().
    _mustScheduleOnStart = False

    def _reschedule(self):
        """
        Schedule the next tick if there are tasks and no tick is pending.  If
        this L{Cooperator} has not been started yet, only remember that
        scheduling is needed.
        """
        if not self._started:
            self._mustScheduleOnStart = True
            return
        if self._delayedCall is None and self._tasks:
            self._delayedCall = self._scheduler(self._tick)

    def start(self):
        """
        Begin scheduling steps.
        """
        self._stopped = False
        self._started = True
        if self._mustScheduleOnStart:
            # Drop the instance attribute so the class-level default (False)
            # shows through again.
            del self._mustScheduleOnStart
            self._reschedule()

    def stop(self):
        """
        Stop scheduling steps.  Errback the completion Deferreds of all
        iterators which have been added and forget about them.
        """
        self._stopped = True
        # Iterate over a snapshot: _completeWith removes each task from
        # self._tasks, and mutating the list being iterated would skip every
        # other task, leaving its completion Deferreds unfired.
        for taskObj in list(self._tasks):
            if taskObj._completionState is not None:
                # Already completed by a callback run earlier in this loop;
                # completing it again would fire its Deferreds twice.
                continue
            taskObj._completeWith(SchedulerStopped(),
                                  Failure(SchedulerStopped()))
        self._tasks = []
        if self._delayedCall is not None:
            self._delayedCall.cancel()
            self._delayedCall = None

    @property
    def running(self):
        """
        Is this L{Cooperator} currently running?

        @return: C{True} if the L{Cooperator} is running, C{False} otherwise.
        @rtype: C{bool}
        """
        return (self._started and not self._stopped)
# The module-wide Cooperator used by the coiterate() and cooperate()
# functions of this module.
_theCooperator = Cooperator()


def coiterate(iterator):
    """
    Hand C{iterator} to the global cooperator, which divides runtime between
    it and every other iterator passed to this function that is not yet
    exhausted.

    @param iterator: the iterator to invoke.

    @return: a Deferred that will fire when the iterator finishes.
    """
    whenFinished = _theCooperator.coiterate(iterator)
    return whenFinished
def cooperate(iterator):
    """
    Run C{iterator} as a long-running cooperative task on the global
    L{Cooperator}, which calls next() on it as a periodic timed event.

    Use this for computationally expensive work which must not block the
    reactor: split the work so that the iterator yields frequently and the
    global L{Cooperator} will share time between all such tasks, never
    blocking for longer than a single iteration of a single task.

    @param iterator: the iterator to invoke.

    @return: a L{CooperativeTask} object representing this task.
    """
    task = _theCooperator.cooperate(iterator)
    return task
@implementer(IReactorTime)
class Clock:
    """
    A deterministic, easily-controlled implementation of
    L{IReactorTime.callLater}, mainly useful for writing deterministic unit
    tests for code which schedules events through that API.

    @ivar rightNow: the time this clock currently reports.
    @ivar calls: the pending delayed calls, kept sorted by scheduled time.
    """

    rightNow = 0.0

    def __init__(self):
        self.calls = []

    def seconds(self):
        """
        Pretend to be time.time().  Used internally when an operation such as
        L{IDelayedCall.reset} needs a time value relative to the current
        time.

        @rtype: C{float}
        @return: The time which should be considered the current time.
        """
        return self.rightNow

    def _sortCalls(self):
        """
        Order the pending calls by the time at which they are scheduled.
        """
        self.calls.sort(key=lambda pending: pending.getTime())

    def callLater(self, when, what, *a, **kw):
        """
        See L{twisted.internet.interfaces.IReactorTime.callLater}.
        """
        def ignoreReset(delayedCall):
            # Nothing to do here when a call is reset.
            return None

        scheduledFor = self.seconds() + when
        dc = base.DelayedCall(scheduledFor,
                              what, a, kw,
                              self.calls.remove,
                              ignoreReset,
                              self.seconds)
        self.calls.append(dc)
        self._sortCalls()
        return dc

    def getDelayedCalls(self):
        """
        See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls}
        """
        return self.calls

    def advance(self, amount):
        """
        Move this clock's time forward by C{amount} and run every pending
        call which has become due.

        @type amount: C{float}
        @param amount: The number of seconds which to advance this clock's
            time.
        """
        self.rightNow += amount
        self._sortCalls()
        while self.calls:
            if not (self.calls[0].getTime() <= self.seconds()):
                break
            due = self.calls.pop(0)
            due.called = 1
            due.func(*due.args, **due.kw)
            # The call just run may have scheduled or rescheduled others.
            self._sortCalls()

    def pump(self, timings):
        """
        Advance incrementally by the given set of times.

        @type timings: iterable of C{float}
        """
        for step in timings:
            self.advance(step)
def deferLater(clock, delay, callable, *args, **kw):
    """
    Call the given function after a certain period of time has passed.

    @type clock: L{IReactorTime} provider
    @param clock: The object which will be used to schedule the delayed
        call.

    @type delay: C{float} or C{int}
    @param delay: The number of seconds to wait before calling the function.

    @param callable: The object to call after the delay.

    @param *args: The positional arguments to pass to C{callable}.

    @param **kw: The keyword arguments to pass to C{callable}.

    @rtype: L{defer.Deferred}

    @return: A deferred that fires with the result of the callable when the
        specified time has elapsed.  Cancelling it cancels the delayed call.
    """
    def cancelDelayedCall(deferred):
        # delayedCall is bound below, before the Deferred can be cancelled
        # by a caller.
        delayedCall.cancel()

    def runCallable(ignored):
        return callable(*args, **kw)

    d = defer.Deferred(cancelDelayedCall)
    d.addCallback(runCallable)
    delayedCall = clock.callLater(delay, d.callback, None)
    return d
def react(main, argv=(), _reactor=None):
    """
    Call C{main} and run the reactor until the L{Deferred} it returns fires.

    This is intended as the way to start up an application with a
    well-defined completion condition.  Use it to write clients or one-off
    asynchronous operations.  Prefer this to calling C{reactor.run} directly,
    as this function will also:

      - Take care to call C{reactor.stop} once and only once, and at the
        right time.
      - Log any failures from the C{Deferred} returned by C{main}.
      - Exit the application when done, with exit code 0 in case of success
        and 1 in case of failure. If C{main} fails with a C{SystemExit}
        error, the code returned is used.

    @param main: A callable which returns a L{Deferred}.  It is called with
        the reactor followed by the elements of C{argv}.

    @param argv: A list of arguments to pass to C{main}. If omitted the
        callable will be invoked with no additional arguments.

    @param _reactor: An implementation detail to allow easier unit testing.
        Do not supply this parameter.

    @since: 12.3
    """
    if _reactor is None:
        from twisted.internet import reactor as _reactor
    finished = main(_reactor, *argv)
    # One-element list so that the nested function can update the exit code.
    exitCode = [0]
    # Becomes non-empty once reactor shutdown has begun.
    shuttingDown = []
    _reactor.addSystemEventTrigger(
        'before', 'shutdown', shuttingDown.append, True)

    def finish(result, stopReactor):
        if stopReactor:
            try:
                _reactor.stop()
            except ReactorNotRunning:
                pass

        if not isinstance(result, Failure):
            return
        if result.check(SystemExit) is None:
            log.err(result, "main function encountered error")
            exitCode[0] = 1
        else:
            exitCode[0] = result.value.code

    def onFinished(result):
        if shuttingDown:
            # The reactor is already stopping; just record the outcome.
            finish(result, False)
        else:
            _reactor.callWhenRunning(finish, result, True)

    finished.addBoth(onFinished)
    _reactor.run()
    sys.exit(exitCode[0])
# Public names exported by a star-import of this module.
__all__ = [
'LoopingCall',
'Clock',
'SchedulerStopped', 'Cooperator', 'coiterate',
'deferLater', 'react']
|