/usr/lib/python2.7/dist-packages/gevent/hub.py is in python-gevent 1.2.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 | # Copyright (c) 2009-2015 Denis Bilenko. See LICENSE for details.
"""
Event-loop hub.
"""
from __future__ import absolute_import
# XXX: FIXME: Refactor to make this smaller
# pylint:disable=too-many-lines
from functools import partial as _functools_partial
import os
import sys
import traceback
from greenlet import greenlet as RawGreenlet, getcurrent, GreenletExit
__all__ = [
'getcurrent',
'GreenletExit',
'spawn_raw',
'sleep',
'kill',
'signal',
'reinit',
'get_hub',
'Hub',
'Waiter',
]
from gevent._compat import string_types
from gevent._compat import xrange
from gevent._util import _NONE
from gevent._util import readproperty
if sys.version_info[0] <= 2:
import thread # pylint:disable=import-error
else:
import _thread as thread # python 2 pylint:disable=import-error
# These must be the "real" native thread versions,
# not monkey-patched.
threadlocal = thread._local
class _threadlocal(threadlocal):
def __init__(self):
# Use a class with an initializer so that we can test
# for 'is None' instead of catching AttributeError, making
# the code cleaner and possibly solving some corner cases
# (like #687)
threadlocal.__init__(self)
self.Hub = None
self.loop = None
self.hub = None
_threadlocal = _threadlocal()
get_ident = thread.get_ident
MAIN_THREAD = get_ident()
class LoopExit(Exception):
"""
Exception thrown when the hub finishes running.
In a normal application, this is never thrown or caught
explicitly. The internal implementation of functions like
:func:`join` and :func:`joinall` may catch it, but user code
generally should not.
.. caution::
Errors in application programming can also lead to this exception being
raised. Some examples include (but are not limited too):
- greenlets deadlocking on a lock;
- using a socket or other gevent object with native thread
affinity from a different thread
"""
pass
class BlockingSwitchOutError(AssertionError):
pass
class InvalidSwitchError(AssertionError):
pass
class ConcurrentObjectUseError(AssertionError):
# raised when an object is used (waited on) by two greenlets
# independently, meaning the object was entered into a blocking
# state by one greenlet and then another while still blocking in the
# first one
pass
def spawn_raw(function, *args, **kwargs):
"""
Create a new :class:`greenlet.greenlet` object and schedule it to
run ``function(*args, **kwargs)``.
This returns a raw :class:`~greenlet.greenlet` which does not have all the useful
methods that :class:`gevent.Greenlet` has. Typically, applications
should prefer :func:`~gevent.spawn`, but this method may
occasionally be useful as an optimization if there are many
greenlets involved.
.. versionchanged:: 1.1b1
If *function* is not callable, immediately raise a :exc:`TypeError`
instead of spawning a greenlet that will raise an uncaught TypeError.
.. versionchanged:: 1.1rc2
Accept keyword arguments for ``function`` as previously (incorrectly)
documented. Note that this may incur an additional expense.
.. versionchanged:: 1.1a3
Verify that ``function`` is callable, raising a TypeError if not. Previously,
the spawned greenlet would have failed the first time it was switched to.
"""
if not callable(function):
raise TypeError("function must be callable")
hub = get_hub()
# The callback class object that we use to run this doesn't
# accept kwargs (and those objects are heavily used, as well as being
# implemented twice in core.ppyx and corecffi.py) so do it with a partial
if kwargs:
function = _functools_partial(function, *args, **kwargs)
g = RawGreenlet(function, hub)
hub.loop.run_callback(g.switch)
else:
g = RawGreenlet(function, hub)
hub.loop.run_callback(g.switch, *args)
return g
def sleep(seconds=0, ref=True):
"""
Put the current greenlet to sleep for at least *seconds*.
*seconds* may be specified as an integer, or a float if fractional
seconds are desired.
.. tip:: In the current implementation, a value of 0 (the default)
means to yield execution to any other runnable greenlets, but
this greenlet may be scheduled again before the event loop
cycles (in an extreme case, a greenlet that repeatedly sleeps
with 0 can prevent greenlets that are ready to do I/O from
being scheduled for some (small) period of time); a value greater than
0, on the other hand, will delay running this greenlet until
the next iteration of the loop.
If *ref* is False, the greenlet running ``sleep()`` will not prevent :func:`gevent.wait`
from exiting.
.. seealso:: :func:`idle`
"""
hub = get_hub()
loop = hub.loop
if seconds <= 0:
waiter = Waiter()
loop.run_callback(waiter.switch)
waiter.get()
else:
hub.wait(loop.timer(seconds, ref=ref))
def idle(priority=0):
"""
Cause the calling greenlet to wait until the event loop is idle.
Idle is defined as having no other events of the same or higher
*priority* pending. That is, as long as sockets, timeouts or even
signals of the same or higher priority are being processed, the loop
is not idle.
.. seealso:: :func:`sleep`
"""
hub = get_hub()
watcher = hub.loop.idle()
if priority:
watcher.priority = priority
hub.wait(watcher)
def kill(greenlet, exception=GreenletExit):
"""
Kill greenlet asynchronously. The current greenlet is not unscheduled.
.. note::
The method :meth:`Greenlet.kill` method does the same and
more (and the same caveats listed there apply here). However, the MAIN
greenlet - the one that exists initially - does not have a
``kill()`` method, and neither do any created with :func:`spawn_raw`,
so you have to use this function.
.. caution:: Use care when killing greenlets. If they are not prepared for
exceptions, this could result in corrupted state.
.. versionchanged:: 1.1a2
If the ``greenlet`` has a :meth:`kill <Greenlet.kill>` method, calls it. This prevents a
greenlet from being switched to for the first time after it's been
killed but not yet executed.
"""
if not greenlet.dead:
if hasattr(greenlet, 'kill'):
# dealing with gevent.greenlet.Greenlet. Use it, especially
# to avoid allowing one to be switched to for the first time
# after it's been killed
greenlet.kill(exception=exception, block=False)
else:
get_hub().loop.run_callback(greenlet.throw, exception)
class signal(object):
"""
Call the *handler* with the *args* and *kwargs* when the process
receives the signal *signalnum*.
The *handler* will be run in a new greenlet when the signal is delivered.
This returns an object with the useful method ``cancel``, which, when called,
will prevent future deliveries of *signalnum* from calling *handler*.
.. note::
This may not operate correctly with SIGCHLD if libev child watchers
are used (as they are by default with os.fork).
.. versionchanged:: 1.2a1
The ``handler`` argument is required to be callable at construction time.
"""
# XXX: This is manually documented in gevent.rst while it is aliased in
# the gevent module.
greenlet_class = None
def __init__(self, signalnum, handler, *args, **kwargs):
if not callable(handler):
raise TypeError("signal handler must be callable.")
self.hub = get_hub()
self.watcher = self.hub.loop.signal(signalnum, ref=False)
self.watcher.start(self._start)
self.handler = handler
self.args = args
self.kwargs = kwargs
if self.greenlet_class is None:
from gevent import Greenlet
self.greenlet_class = Greenlet
def _get_ref(self):
return self.watcher.ref
def _set_ref(self, value):
self.watcher.ref = value
ref = property(_get_ref, _set_ref)
del _get_ref, _set_ref
def cancel(self):
self.watcher.stop()
def _start(self):
try:
greenlet = self.greenlet_class(self.handle)
greenlet.switch()
except: # pylint:disable=bare-except
self.hub.handle_error(None, *sys._exc_info()) # pylint:disable=no-member
def handle(self):
try:
self.handler(*self.args, **self.kwargs)
except: # pylint:disable=bare-except
self.hub.handle_error(None, *sys.exc_info())
def reinit():
"""
Prepare the gevent hub to run in a new (forked) process.
This should be called *immediately* after :func:`os.fork` in the
child process. This is done automatically by
:func:`gevent.os.fork` or if the :mod:`os` module has been
monkey-patched. If this function is not called in a forked
process, symptoms may include hanging of functions like
:func:`socket.getaddrinfo`, and the hub's threadpool is unlikely
to work.
.. note:: Registered fork watchers may or may not run before
this function (and thus ``gevent.os.fork``) return. If they have
not run, they will run "soon", after an iteration of the event loop.
You can force this by inserting a few small (but non-zero) calls to :func:`sleep`
after fork returns. (As of gevent 1.1 and before, fork watchers will
not have run, but this may change in the future.)
.. note:: This function may be removed in a future major release
if the fork process can be more smoothly managed.
.. warning:: See remarks in :func:`gevent.os.fork` about greenlets
and libev watchers in the child process.
"""
# The loop reinit function in turn calls libev's ev_loop_fork
# function.
hub = _get_hub()
if hub is not None:
# Note that we reinit the existing loop, not destroy it.
# See https://github.com/gevent/gevent/issues/200.
hub.loop.reinit()
# libev's fork watchers are slow to fire because the only fire
# at the beginning of a loop; due to our use of callbacks that
# run at the end of the loop, that may be too late. The
# threadpool and resolvers depend on the fork handlers being
# run (specifically, the threadpool will fail in the forked
# child if there were any threads in it, which there will be
# if the resolver_thread was in use (the default) before the
# fork.)
#
# If the forked process wants to use the threadpool or
# resolver immediately (in a queued callback), it would hang.
#
# The below is a workaround. Fortunately, both of these
# methods are idempotent and can be called multiple times
# following a fork if the suddenly started working, or were
# already working on some platforms. Other threadpools and fork handlers
# will be called at an arbitrary time later ('soon')
if hasattr(hub.threadpool, '_on_fork'):
hub.threadpool._on_fork()
# resolver_ares also has a fork watcher that's not firing
if hasattr(hub.resolver, '_on_fork'):
hub.resolver._on_fork()
# TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a
# pass around before returning to this greenlet. That will allow any
# user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if
# we do this, certain tests that heavily mix threads and forking,
# like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear
# why.
#sleep(0.00001)
#sleep(0.00001)
def get_hub_class():
"""Return the type of hub to use for the current thread.
If there's no type of hub for the current thread yet, 'gevent.hub.Hub' is used.
"""
hubtype = _threadlocal.Hub
if hubtype is None:
hubtype = _threadlocal.Hub = Hub
return hubtype
def get_hub(*args, **kwargs):
"""
Return the hub for the current thread.
If a hub does not exist in the current thread, a new one is
created of the type returned by :func:`get_hub_class`.
"""
hub = _threadlocal.hub
if hub is None:
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype(*args, **kwargs)
return hub
def _get_hub():
"""Return the hub for the current thread.
Return ``None`` if no hub has been created yet.
"""
return _threadlocal.hub
def set_hub(hub):
_threadlocal.hub = hub
def _import(path):
# pylint:disable=too-many-branches
if isinstance(path, list):
if not path:
raise ImportError('Cannot import from empty list: %r' % (path, ))
for item in path[:-1]:
try:
return _import(item)
except ImportError:
pass
return _import(path[-1])
if not isinstance(path, string_types):
return path
if '.' not in path:
raise ImportError("Cannot import %r (required format: [path/][package.]module.class)" % path)
if '/' in path:
package_path, path = path.rsplit('/', 1)
sys.path = [package_path] + sys.path
else:
package_path = None
try:
module, item = path.rsplit('.', 1)
x = __import__(module)
for attr in path.split('.')[1:]:
oldx = x
x = getattr(x, attr, _NONE)
if x is _NONE:
raise ImportError('Cannot import %r from %r' % (attr, oldx))
return x
finally:
try:
sys.path.remove(package_path)
except ValueError:
pass
def config(default, envvar):
result = os.environ.get(envvar) or default # absolute import gets confused pylint: disable=no-member
if isinstance(result, string_types):
return result.split(',')
return result
def resolver_config(default, envvar):
result = config(default, envvar)
return [_resolvers.get(x, x) for x in result]
_resolvers = {'ares': 'gevent.resolver_ares.Resolver',
'thread': 'gevent.resolver_thread.Resolver',
'block': 'gevent.socket.BlockingResolver'}
_DEFAULT_LOOP_CLASS = 'gevent.core.loop'
class Hub(RawGreenlet):
"""A greenlet that runs the event loop.
It is created automatically by :func:`get_hub`.
**Switching**
Every time this greenlet (i.e., the event loop) is switched *to*, if
the current greenlet has a ``switch_out`` method, it will be called. This
allows a greenlet to take some cleanup actions before yielding control. This method
should not call any gevent blocking functions.
"""
#: If instances of these classes are raised into the event loop,
#: they will be propagated out to the main greenlet (where they will
#: usually be caught by Python itself)
SYSTEM_ERROR = (KeyboardInterrupt, SystemExit, SystemError)
#: Instances of these classes are not considered to be errors and
#: do not get logged/printed when raised by the event loop.
NOT_ERROR = (GreenletExit, SystemExit)
loop_class = config(_DEFAULT_LOOP_CLASS, 'GEVENT_LOOP')
# For the standard class, go ahead and import it when this class
# is defined. This is no loss of generality because the envvar is
# only read when this class is defined, and we know that the
# standard class will be available. This can solve problems with
# the class being imported from multiple threads at once, leading
# to one of the imports failing. Only do this for the object we
# need in the constructor, as the rest of the factories are
# themselves handled lazily. See #687. (People using a custom loop_class
# can probably manage to get_hub() from the main thread or otherwise import
# that loop_class themselves.)
if loop_class == [_DEFAULT_LOOP_CLASS]:
loop_class = [_import(loop_class)]
resolver_class = ['gevent.resolver_thread.Resolver',
'gevent.resolver_ares.Resolver',
'gevent.socket.BlockingResolver']
#: The class or callable object, or the name of a factory function or class,
#: that will be used to create :attr:`resolver`. By default, configured according to
#: :doc:`dns`. If a list, a list of objects in preference order.
resolver_class = resolver_config(resolver_class, 'GEVENT_RESOLVER')
threadpool_class = config('gevent.threadpool.ThreadPool', 'GEVENT_THREADPOOL')
backend = config(None, 'GEVENT_BACKEND')
threadpool_size = 10
# using pprint.pformat can override custom __repr__ methods on dict/list
# subclasses, which can be a security concern
format_context = 'pprint.saferepr'
def __init__(self, loop=None, default=None):
RawGreenlet.__init__(self)
if hasattr(loop, 'run'):
if default is not None:
raise TypeError("Unexpected argument: default")
self.loop = loop
elif _threadlocal.loop is not None:
# Reuse a loop instance previously set by
# destroying a hub without destroying the associated
# loop. See #237 and #238.
self.loop = _threadlocal.loop
else:
if default is None and get_ident() != MAIN_THREAD:
default = False
loop_class = _import(self.loop_class)
if loop is None:
loop = self.backend
self.loop = loop_class(flags=loop, default=default)
self._resolver = None
self._threadpool = None
self.format_context = _import(self.format_context)
def __repr__(self):
if self.loop is None:
info = 'destroyed'
else:
try:
info = self.loop._format()
except Exception as ex: # pylint:disable=broad-except
info = str(ex) or repr(ex) or 'error'
result = '<%s at 0x%x %s' % (self.__class__.__name__, id(self), info)
if self._resolver is not None:
result += ' resolver=%r' % self._resolver
if self._threadpool is not None:
result += ' threadpool=%r' % self._threadpool
return result + '>'
def handle_error(self, context, type, value, tb):
"""
Called by the event loop when an error occurs. The arguments
type, value, and tb are the standard tuple returned by :func:`sys.exc_info`.
Applications can set a property on the hub with this same signature
to override the error handling provided by this class.
Errors that are :attr:`system errors <SYSTEM_ERROR>` are passed
to :meth:`handle_system_error`.
:param context: If this is ``None``, indicates a system error that
should generally result in exiting the loop and being thrown to the
parent greenlet.
"""
if isinstance(value, str):
# Cython can raise errors where the value is a plain string
# e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback>
value = type(value)
if not issubclass(type, self.NOT_ERROR):
self.print_exception(context, type, value, tb)
if context is None or issubclass(type, self.SYSTEM_ERROR):
self.handle_system_error(type, value)
def handle_system_error(self, type, value):
current = getcurrent()
if current is self or current is self.parent or self.loop is None:
self.parent.throw(type, value)
else:
# in case system error was handled and life goes on
# switch back to this greenlet as well
cb = None
try:
cb = self.loop.run_callback(current.switch)
except: # pylint:disable=bare-except
traceback.print_exc(file=self.exception_stream)
try:
self.parent.throw(type, value)
finally:
if cb is not None:
cb.stop()
@readproperty
def exception_stream(self):
"""
The stream to which exceptions will be written.
Defaults to ``sys.stderr`` unless assigned to.
.. versionadded:: 1.2a1
"""
# Unwrap any FileObjectThread we have thrown around sys.stderr
# (because it can't be used in the hub). Tricky because we are
# called in error situations when it's not safe to import.
stderr = sys.stderr
if type(stderr).__name__ == 'FileObjectThread':
stderr = stderr.io # pylint:disable=no-member
return stderr
def print_exception(self, context, type, value, tb):
# Python 3 does not gracefully handle None value or tb in
# traceback.print_exception() as previous versions did.
# pylint:disable=no-member
errstream = self.exception_stream
if value is None:
errstream.write('%s\n' % type.__name__)
else:
traceback.print_exception(type, value, tb, file=errstream)
del tb
try:
import time
errstream.write(time.ctime())
errstream.write(' ' if context is not None else '\n')
except: # pylint:disable=bare-except
# Possible not safe to import under certain
# error conditions in Python 2
pass
if context is not None:
if not isinstance(context, str):
try:
context = self.format_context(context)
except: # pylint:disable=bare-except
traceback.print_exc(file=self.exception_stream)
context = repr(context)
errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), ))
def switch(self):
switch_out = getattr(getcurrent(), 'switch_out', None)
if switch_out is not None:
switch_out()
return RawGreenlet.switch(self)
def switch_out(self):
raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')
def wait(self, watcher):
"""
Wait until the *watcher* (which should not be started) is ready.
The current greenlet will be unscheduled during this time.
.. seealso:: :class:`gevent.core.io`, :class:`gevent.core.timer`,
:class:`gevent.core.signal`, :class:`gevent.core.idle`, :class:`gevent.core.prepare`,
:class:`gevent.core.check`, :class:`gevent.core.fork`, :class:`gevent.core.async`,
:class:`gevent.core.child`, :class:`gevent.core.stat`
"""
waiter = Waiter()
unique = object()
watcher.start(waiter.switch, unique)
try:
result = waiter.get()
if result is not unique:
raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
finally:
watcher.stop()
def cancel_wait(self, watcher, error):
"""
Cancel an in-progress call to :meth:`wait` by throwing the given *error*
in the waiting greenlet.
"""
if watcher.callback is not None:
self.loop.run_callback(self._cancel_wait, watcher, error)
def _cancel_wait(self, watcher, error):
if watcher.active:
switch = watcher.callback
if switch is not None:
greenlet = getattr(switch, '__self__', None)
if greenlet is not None:
greenlet.throw(error)
def run(self):
"""
Entry-point to running the loop. This method is called automatically
when the hub greenlet is scheduled; do not call it directly.
:raises LoopExit: If the loop finishes running. This means
that there are no other scheduled greenlets, and no active
watchers or servers. In some situations, this indicates a
programming error.
"""
assert self is getcurrent(), 'Do not call Hub.run() directly'
while True:
loop = self.loop
loop.error_handler = self
try:
loop.run()
finally:
loop.error_handler = None # break the refcount cycle
self.parent.throw(LoopExit('This operation would block forever', self))
# this function must never return, as it will cause switch() in the parent greenlet
# to return an unexpected value
# It is still possible to kill this greenlet with throw. However, in that case
# switching to it is no longer safe, as switch will return immediatelly
def join(self, timeout=None):
"""Wait for the event loop to finish. Exits only when there are
no more spawned greenlets, started servers, active timeouts or watchers.
If *timeout* is provided, wait no longer for the specified number of seconds.
Returns True if exited because the loop finished execution.
Returns False if exited because of timeout expired.
"""
assert getcurrent() is self.parent, "only possible from the MAIN greenlet"
if self.dead:
return True
waiter = Waiter()
if timeout is not None:
timeout = self.loop.timer(timeout, ref=False)
timeout.start(waiter.switch)
try:
try:
waiter.get()
except LoopExit:
return True
finally:
if timeout is not None:
timeout.stop()
return False
def destroy(self, destroy_loop=None):
if self._resolver is not None:
self._resolver.close()
del self._resolver
if self._threadpool is not None:
self._threadpool.kill()
del self._threadpool
if destroy_loop is None:
destroy_loop = not self.loop.default
if destroy_loop:
if _threadlocal.loop is self.loop:
# Don't let anyone try to reuse this
_threadlocal.loop = None
self.loop.destroy()
else:
# Store in case another hub is created for this
# thread.
_threadlocal.loop = self.loop
self.loop = None
if _threadlocal.hub is self:
_threadlocal.hub = None
def _get_resolver(self):
if self._resolver is None:
if self.resolver_class is not None:
self.resolver_class = _import(self.resolver_class)
self._resolver = self.resolver_class(hub=self)
return self._resolver
def _set_resolver(self, value):
self._resolver = value
def _del_resolver(self):
del self._resolver
resolver = property(_get_resolver, _set_resolver, _del_resolver)
def _get_threadpool(self):
if self._threadpool is None:
if self.threadpool_class is not None:
self.threadpool_class = _import(self.threadpool_class)
self._threadpool = self.threadpool_class(self.threadpool_size, hub=self)
return self._threadpool
def _set_threadpool(self, value):
self._threadpool = value
def _del_threadpool(self):
del self._threadpool
threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool)
class Waiter(object):
"""
A low level communication utility for greenlets.
Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:
* switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
* any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
* if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
will store the value/exception. The following :meth:`get` will return the value/raise the exception.
The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
The :meth:`get` method must be called from a greenlet other than :class:`Hub`.
>>> result = Waiter()
>>> timer = get_hub().loop.timer(0.1)
>>> timer.start(result.switch, 'hello from Waiter')
>>> result.get() # blocks for 0.1 seconds
'hello from Waiter'
If switch is called before the greenlet gets a chance to call :meth:`get` then
:class:`Waiter` stores the value.
>>> result = Waiter()
>>> timer = get_hub().loop.timer(0.1)
>>> timer.start(result.switch, 'hi from Waiter')
>>> sleep(0.2)
>>> result.get() # returns immediatelly without blocking
'hi from Waiter'
.. warning::
This a limited and dangerous way to communicate between
greenlets. It can easily leave a greenlet unscheduled forever
if used incorrectly. Consider using safer classes such as
:class:`gevent.event.Event`, :class:`gevent.event.AsyncResult`,
or :class:`gevent.queue.Queue`.
"""
__slots__ = ['hub', 'greenlet', 'value', '_exception']
def __init__(self, hub=None):
if hub is None:
self.hub = get_hub()
else:
self.hub = hub
self.greenlet = None
self.value = None
self._exception = _NONE
def clear(self):
self.greenlet = None
self.value = None
self._exception = _NONE
def __str__(self):
if self._exception is _NONE:
return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet)
if self._exception is None:
return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value)
return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info)
def ready(self):
"""Return true if and only if it holds a value or an exception"""
return self._exception is not _NONE
def successful(self):
"""Return true if and only if it is ready and holds a value"""
return self._exception is None
@property
def exc_info(self):
"Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``."
if self._exception is not _NONE:
return self._exception
def switch(self, value=None):
"""Switch to the greenlet if one's available. Otherwise store the value."""
greenlet = self.greenlet
if greenlet is None:
self.value = value
self._exception = None
else:
assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
switch = greenlet.switch
try:
switch(value)
except: # pylint:disable=bare-except
self.hub.handle_error(switch, *sys.exc_info())
def switch_args(self, *args):
return self.switch(args)
def throw(self, *throw_args):
"""Switch to the greenlet with the exception. If there's no greenlet, store the exception."""
greenlet = self.greenlet
if greenlet is None:
self._exception = throw_args
else:
assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
throw = greenlet.throw
try:
throw(*throw_args)
except: # pylint:disable=bare-except
self.hub.handle_error(throw, *sys.exc_info())
def get(self):
"""If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
if self._exception is not _NONE:
if self._exception is None:
return self.value
else:
getcurrent().throw(*self._exception)
else:
if self.greenlet is not None:
raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
self.greenlet = getcurrent()
try:
return self.hub.switch()
finally:
self.greenlet = None
def __call__(self, source):
if source.exception is None:
self.switch(source.value)
else:
self.throw(source.exception)
# can also have a debugging version, that wraps the value in a tuple (self, value) in switch()
# and unwraps it in wait() thus checking that switch() was indeed called
class _MultipleWaiter(Waiter):
"""
An internal extension of Waiter that can be used if multiple objects
must be waited on, and there is a chance that in between waits greenlets
might be switched out. All greenlets that switch to this waiter
will have their value returned.
This does not handle exceptions or throw methods.
"""
__slots__ = ['_values']
def __init__(self, *args, **kwargs):
Waiter.__init__(self, *args, **kwargs)
# we typically expect a relatively small number of these to be outstanding.
# since we pop from the left, a deque might be slightly
# more efficient, but since we're in the hub we avoid imports if
# we can help it to better support monkey-patching, and delaying the import
# here can be impractical (see https://github.com/gevent/gevent/issues/652)
self._values = list()
def switch(self, value): # pylint:disable=signature-differs
self._values.append(value)
Waiter.switch(self, True)
def get(self):
if not self._values:
Waiter.get(self)
Waiter.clear(self)
return self._values.pop(0)
def iwait(objects, timeout=None, count=None):
"""
Iteratively yield *objects* as they are ready, until all (or *count*) are ready
or *timeout* expired.
:param objects: A sequence (supporting :func:`len`) containing objects
implementing the wait protocol (rawlink() and unlink()).
:keyword int count: If not `None`, then a number specifying the maximum number
of objects to wait for. If ``None`` (the default), all objects
are waited for.
:keyword float timeout: If given, specifies a maximum number of seconds
to wait. If the timeout expires before the desired waited-for objects
are available, then this method returns immediately.
.. seealso:: :func:`wait`
.. versionchanged:: 1.1a1
Add the *count* parameter.
.. versionchanged:: 1.1a2
No longer raise :exc:`LoopExit` if our caller switches greenlets
in between items yielded by this function.
"""
# QQQ would be nice to support iterable here that can be generated slowly (why?)
if objects is None:
yield get_hub().join(timeout=timeout)
return
count = len(objects) if count is None else min(count, len(objects))
waiter = _MultipleWaiter()
switch = waiter.switch
if timeout is not None:
timer = get_hub().loop.timer(timeout, priority=-1)
timer.start(switch, _NONE)
try:
for obj in objects:
obj.rawlink(switch)
for _ in xrange(count):
item = waiter.get()
waiter.clear()
if item is _NONE:
return
yield item
finally:
if timeout is not None:
timer.stop()
for aobj in objects:
unlink = getattr(aobj, 'unlink', None)
if unlink:
try:
unlink(switch)
except: # pylint:disable=bare-except
traceback.print_exc()
def wait(objects=None, timeout=None, count=None):
"""
Wait for ``objects`` to become ready or for event loop to finish.
If ``objects`` is provided, it must be a list containing objects
implementing the wait protocol (rawlink() and unlink() methods):
- :class:`gevent.Greenlet` instance
- :class:`gevent.event.Event` instance
- :class:`gevent.lock.Semaphore` instance
- :class:`gevent.subprocess.Popen` instance
If ``objects`` is ``None`` (the default), ``wait()`` blocks until
the current event loop has nothing to do (or until ``timeout`` passes):
- all greenlets have finished
- all servers were stopped
- all event loop watchers were stopped.
If ``count`` is ``None`` (the default), wait for all ``objects``
to become ready.
If ``count`` is a number, wait for (up to) ``count`` objects to become
ready. (For example, if count is ``1`` then the function exits
when any object in the list is ready).
If ``timeout`` is provided, it specifies the maximum number of
seconds ``wait()`` will block.
Returns the list of ready objects, in the order in which they were
ready.
.. seealso:: :func:`iwait`
"""
if objects is None:
return get_hub().join(timeout=timeout)
return list(iwait(objects, timeout, count))
class linkproxy(object):
__slots__ = ['callback', 'obj']
def __init__(self, callback, obj):
self.callback = callback
self.obj = obj
def __call__(self, *args):
callback = self.callback
obj = self.obj
self.callback = None
self.obj = None
callback(obj)
|