/usr/lib/python2.7/dist-packages/zmq/sugar/context.py is in python-zmq 14.4.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# coding: utf-8
"""Python bindings for 0MQ."""
# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.
import atexit
import weakref
from zmq.backend import Context as ContextBase
from . import constants
from .attrsettr import AttributeSetter
from .constants import ENOTSUP, ctx_opt_names
from .socket import Socket
from zmq.error import ZMQError
from zmq.utils.interop import cast_int_addr
class Context(ContextBase, AttributeSetter):
    """Create a zmq Context

    A zmq Context creates sockets via its ``ctx.socket`` method.

    Default socket options stored on the Context (see ``setsockopt``) are
    applied to every socket subsequently created by ``socket()``.
    """

    # Class-level defaults.  ``__init__`` overwrites ``sockopts``,
    # ``_shadow`` and ``_exiting`` on each instance; ``instance()`` stores
    # the shared global Context in ``_instance``.
    sockopts = None
    _instance = None
    _shadow = False
    _exiting = False

    def __init__(self, io_threads=1, **kwargs):
        """Initialize the Context.

        Parameters
        ----------
        io_threads : int
            Forwarded to the backend Context constructor.
        **kwargs
            Forwarded to the backend Context constructor.  A truthy
            ``shadow`` value marks this object as a shadow of an existing
            context, which ``__del__`` will not terminate.
        """
        super(Context, self).__init__(io_threads=io_threads, **kwargs)
        self._shadow = bool(kwargs.get('shadow', False))
        self.sockopts = {}
        self._exiting = False
        if not self._shadow:
            # Only a weak reference is captured, so the registered atexit
            # callback does not keep this Context alive.
            ctx_ref = weakref.ref(self)

            def _notify_atexit():
                ctx = ctx_ref()
                if ctx is not None:
                    # __del__ checks this flag and skips term() when set.
                    ctx._exiting = True

            atexit.register(_notify_atexit)

    def __del__(self):
        """deleting a Context should terminate it, without trying non-threadsafe destroy"""
        if self._shadow or self._exiting:
            # Shadow contexts, and contexts flagged by the atexit hook,
            # are left alone.
            return
        self.term()

    def __enter__(self):
        """Enter a ``with`` block; returns the Context itself."""
        return self

    def __exit__(self, *args, **kwargs):
        """Leave a ``with`` block by terminating the Context.

        Returns None, so an exception raised inside the block propagates.
        """
        self.term()

    @classmethod
    def shadow(cls, address):
        """Shadow an existing libzmq context

        address is the integer address of the libzmq context
        or an FFI pointer to it.

        .. versionadded:: 14.1
        """
        address = cast_int_addr(address)
        return cls(shadow=address)

    @classmethod
    def shadow_pyczmq(cls, ctx):
        """Shadow an existing pyczmq context

        ctx is the FFI `zctx_t *` pointer

        .. versionadded:: 14.1
        """
        # Imported lazily so pyczmq is only required when this is called.
        from pyczmq import zctx

        underlying = zctx.underlying(ctx)
        address = cast_int_addr(underlying)
        return cls(shadow=address)

    # classmethod pattern copied from tornado IOLoop.instance
    @classmethod
    def instance(cls, io_threads=1):
        """Returns a global Context instance.

        Most single-threaded applications have a single, global Context.
        Use this method instead of passing around Context instances
        throughout your code.

        A common pattern for classes that depend on Contexts is to use
        a default argument to enable programs with multiple Contexts
        but not require the argument for simpler applications::

            class MyClass(object):
                def __init__(self, context=None):
                    self.context = context or Context.instance()

        ``io_threads`` is only used when a new instance has to be created,
        i.e. when none exists yet or the existing one is closed.
        """
        if cls._instance is None or cls._instance.closed:
            cls._instance = cls(io_threads=io_threads)
        return cls._instance

    #-------------------------------------------------------------------------
    # Hooks for ctxopt completion
    #-------------------------------------------------------------------------

    def __dir__(self):
        """List class attributes plus the context option names."""
        keys = dir(self.__class__)
        keys.extend(ctx_opt_names)
        return keys

    #-------------------------------------------------------------------------
    # Creating Sockets
    #-------------------------------------------------------------------------

    @property
    def _socket_class(self):
        """The class instantiated by ``socket()``."""
        return Socket

    def socket(self, socket_type):
        """Create a Socket associated with this Context.

        Parameters
        ----------
        socket_type : int
            The socket type, which can be any of the 0MQ socket types:
            REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, etc.

        Raises
        ------
        ZMQError
            With ENOTSUP, if this Context is closed.
        """
        if self.closed:
            raise ZMQError(ENOTSUP)
        s = self._socket_class(self, socket_type)
        for opt, value in self.sockopts.items():
            try:
                s.setsockopt(opt, value)
            except ZMQError:
                # ignore ZMQErrors, which are likely for socket options
                # that do not apply to a particular socket type, e.g.
                # SUBSCRIBE for non-SUB sockets.
                pass
        return s

    def setsockopt(self, opt, value):
        """set default socket options for new sockets created by this Context

        .. versionadded:: 13.0
        """
        self.sockopts[opt] = value

    def getsockopt(self, opt):
        """get default socket options for new sockets created by this Context

        Raises KeyError if no default has been stored for ``opt``.

        .. versionadded:: 13.0
        """
        return self.sockopts[opt]

    def _set_attr_opt(self, name, opt, value):
        """set default sockopts as attributes

        Names listed in ``constants.ctx_opt_names`` are forwarded to
        ``self.set``; anything else is stored as a default socket option.
        """
        if name in constants.ctx_opt_names:
            return self.set(opt, value)
        self.sockopts[opt] = value

    def _get_attr_opt(self, name, opt):
        """get default sockopts as attributes

        Names listed in ``constants.ctx_opt_names`` are read via
        ``self.get``; anything else is looked up in the stored defaults,
        raising AttributeError(name) when no default is stored.
        """
        if name in constants.ctx_opt_names:
            return self.get(opt)
        if opt not in self.sockopts:
            raise AttributeError(name)
        return self.sockopts[opt]

    def __delattr__(self, key):
        """delete default sockopts as attributes

        Ordinary instance attributes (those present in ``__dict__``) are
        deleted normally.  Any other key is upper-cased and treated as the
        name of a socket option whose stored default is removed.

        Raises
        ------
        AttributeError
            If ``key`` is neither an instance attribute nor a known
            constant, or if no default is stored for that option.
        """
        if key in self.__dict__:
            # Plain instance attribute: without this branch every ``del``
            # would be misread as a socket-option deletion and fail.
            del self.__dict__[key]
            return
        key = key.upper()
        try:
            opt = getattr(constants, key)
        except AttributeError:
            raise AttributeError("no such socket option: %s" % key)
        if opt not in self.sockopts:
            raise AttributeError(key)
        del self.sockopts[opt]
# Names exported by ``from <this module> import *``.
__all__ = ['Context']
|