/usr/lib/python3/dist-packages/tornado/platform/asyncio.py is in python3-tornado 4.2.1-1ubuntu3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 | """Bridges between the `asyncio` module and Tornado IOLoop.
This is a work in progress and interfaces are subject to change.
To test:
python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop
python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOMainLoop
(the tests log a few warnings with AsyncIOMainLoop because they leave some
unfinished callbacks on the event loop that fail when it resumes)
"""
from __future__ import absolute_import, division, print_function, with_statement
import functools
import tornado.concurrent
from tornado.gen import convert_yielded
from tornado.ioloop import IOLoop
from tornado import stack_context
try:
# Import the real asyncio module for py33+ first. Older versions of the
# trollius backport also use this name.
import asyncio
except ImportError as e:
# Asyncio itself isn't available; see if trollius is (backport to py26+).
try:
import trollius as asyncio
except ImportError:
# Re-raise the original asyncio error, not the trollius one.
raise e
class BaseAsyncIOLoop(IOLoop):
def initialize(self, asyncio_loop, close_loop=False, **kwargs):
super(BaseAsyncIOLoop, self).initialize(**kwargs)
self.asyncio_loop = asyncio_loop
self.close_loop = close_loop
self.asyncio_loop.call_soon(self.make_current)
# Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
self.handlers = {}
# Set of fds listening for reads/writes
self.readers = set()
self.writers = set()
self.closing = False
def close(self, all_fds=False):
self.closing = True
for fd in list(self.handlers):
fileobj, handler_func = self.handlers[fd]
self.remove_handler(fd)
if all_fds:
self.close_fd(fileobj)
if self.close_loop:
self.asyncio_loop.close()
def add_handler(self, fd, handler, events):
fd, fileobj = self.split_fd(fd)
if fd in self.handlers:
raise ValueError("fd %s added twice" % fd)
self.handlers[fd] = (fileobj, stack_context.wrap(handler))
if events & IOLoop.READ:
self.asyncio_loop.add_reader(
fd, self._handle_events, fd, IOLoop.READ)
self.readers.add(fd)
if events & IOLoop.WRITE:
self.asyncio_loop.add_writer(
fd, self._handle_events, fd, IOLoop.WRITE)
self.writers.add(fd)
def update_handler(self, fd, events):
fd, fileobj = self.split_fd(fd)
if events & IOLoop.READ:
if fd not in self.readers:
self.asyncio_loop.add_reader(
fd, self._handle_events, fd, IOLoop.READ)
self.readers.add(fd)
else:
if fd in self.readers:
self.asyncio_loop.remove_reader(fd)
self.readers.remove(fd)
if events & IOLoop.WRITE:
if fd not in self.writers:
self.asyncio_loop.add_writer(
fd, self._handle_events, fd, IOLoop.WRITE)
self.writers.add(fd)
else:
if fd in self.writers:
self.asyncio_loop.remove_writer(fd)
self.writers.remove(fd)
def remove_handler(self, fd):
fd, fileobj = self.split_fd(fd)
if fd not in self.handlers:
return
if fd in self.readers:
self.asyncio_loop.remove_reader(fd)
self.readers.remove(fd)
if fd in self.writers:
self.asyncio_loop.remove_writer(fd)
self.writers.remove(fd)
del self.handlers[fd]
def _handle_events(self, fd, events):
fileobj, handler_func = self.handlers[fd]
handler_func(fileobj, events)
def start(self):
self._setup_logging()
self.asyncio_loop.run_forever()
def stop(self):
self.asyncio_loop.stop()
def call_at(self, when, callback, *args, **kwargs):
# asyncio.call_at supports *args but not **kwargs, so bind them here.
# We do not synchronize self.time and asyncio_loop.time, so
# convert from absolute to relative.
return self.asyncio_loop.call_later(
max(0, when - self.time()), self._run_callback,
functools.partial(stack_context.wrap(callback), *args, **kwargs))
def remove_timeout(self, timeout):
timeout.cancel()
def add_callback(self, callback, *args, **kwargs):
if self.closing:
raise RuntimeError("IOLoop is closing")
self.asyncio_loop.call_soon_threadsafe(
self._run_callback,
functools.partial(stack_context.wrap(callback), *args, **kwargs))
add_callback_from_signal = add_callback
class AsyncIOMainLoop(BaseAsyncIOLoop):
def initialize(self, **kwargs):
super(AsyncIOMainLoop, self).initialize(asyncio.get_event_loop(),
close_loop=False, **kwargs)
class AsyncIOLoop(BaseAsyncIOLoop):
def initialize(self, **kwargs):
super(AsyncIOLoop, self).initialize(asyncio.new_event_loop(),
close_loop=True, **kwargs)
def to_tornado_future(asyncio_future):
"""Convert an ``asyncio.Future`` to a `tornado.concurrent.Future`."""
tf = tornado.concurrent.Future()
tornado.concurrent.chain_future(asyncio_future, tf)
return tf
def to_asyncio_future(tornado_future):
"""Convert a `tornado.concurrent.Future` to an ``asyncio.Future``."""
af = asyncio.Future()
tornado.concurrent.chain_future(tornado_future, af)
return af
if hasattr(convert_yielded, 'register'):
convert_yielded.register(asyncio.Future, to_tornado_future)
|