/usr/lib/python2.7/dist-packages/ipykernel/serialize.py is in python-ipykernel 4.8.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 | """serialization utilities for apply messages"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import warnings
warnings.warn("ipykernel.serialize is deprecated. It has moved to ipyparallel.serialize", DeprecationWarning)
try:
import cPickle
pickle = cPickle
except:
cPickle = None
import pickle
from itertools import chain
from ipython_genutils.py3compat import PY3, buffer_to_bytes_py2
from ipykernel.pickleutil import (
can, uncan, can_sequence, uncan_sequence, CannedObject,
istype, sequence_types, PICKLE_PROTOCOL,
)
from jupyter_client.session import MAX_ITEMS, MAX_BYTES
if PY3:
buffer = memoryview
#-----------------------------------------------------------------------------
# Serialization Functions
#-----------------------------------------------------------------------------
def _extract_buffers(obj, threshold=MAX_BYTES):
"""extract buffers larger than a certain threshold"""
buffers = []
if isinstance(obj, CannedObject) and obj.buffers:
for i,buf in enumerate(obj.buffers):
if len(buf) > threshold:
# buffer larger than threshold, prevent pickling
obj.buffers[i] = None
buffers.append(buf)
# buffer too small for separate send, coerce to bytes
# because pickling buffer objects just results in broken pointers
elif isinstance(buf, memoryview):
obj.buffers[i] = buf.tobytes()
elif isinstance(buf, buffer):
obj.buffers[i] = bytes(buf)
return buffers
def _restore_buffers(obj, buffers):
"""restore buffers extracted by """
if isinstance(obj, CannedObject) and obj.buffers:
for i,buf in enumerate(obj.buffers):
if buf is None:
obj.buffers[i] = buffers.pop(0)
def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
"""Serialize an object into a list of sendable buffers.
Parameters
----------
obj : object
The object to be serialized
buffer_threshold : int
The threshold (in bytes) for pulling out data buffers
to avoid pickling them.
item_threshold : int
The maximum number of items over which canning will iterate.
Containers (lists, dicts) larger than this will be pickled without
introspection.
Returns
-------
[bufs] : list of buffers representing the serialized object.
"""
buffers = []
if istype(obj, sequence_types) and len(obj) < item_threshold:
cobj = can_sequence(obj)
for c in cobj:
buffers.extend(_extract_buffers(c, buffer_threshold))
elif istype(obj, dict) and len(obj) < item_threshold:
cobj = {}
for k in sorted(obj):
c = can(obj[k])
buffers.extend(_extract_buffers(c, buffer_threshold))
cobj[k] = c
else:
cobj = can(obj)
buffers.extend(_extract_buffers(cobj, buffer_threshold))
buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
return buffers
def deserialize_object(buffers, g=None):
"""reconstruct an object serialized by serialize_object from data buffers.
Parameters
----------
bufs : list of buffers/bytes
g : globals to be used when uncanning
Returns
-------
(newobj, bufs) : unpacked object, and the list of remaining unused buffers.
"""
bufs = list(buffers)
pobj = buffer_to_bytes_py2(bufs.pop(0))
canned = pickle.loads(pobj)
if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
for c in canned:
_restore_buffers(c, bufs)
newobj = uncan_sequence(canned, g)
elif istype(canned, dict) and len(canned) < MAX_ITEMS:
newobj = {}
for k in sorted(canned):
c = canned[k]
_restore_buffers(c, bufs)
newobj[k] = uncan(c, g)
else:
_restore_buffers(canned, bufs)
newobj = uncan(canned, g)
return newobj, bufs
def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
"""pack up a function, args, and kwargs to be sent over the wire
Each element of args/kwargs will be canned for special treatment,
but inspection will not go any deeper than that.
Any object whose data is larger than `threshold` will not have their data copied
(only numpy arrays and bytes/buffers support zero-copy)
Message will be a list of bytes/buffers of the format:
[ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]
With length at least two + len(args) + len(kwargs)
"""
arg_bufs = list(chain.from_iterable(
serialize_object(arg, buffer_threshold, item_threshold) for arg in args))
kw_keys = sorted(kwargs.keys())
kwarg_bufs = list(chain.from_iterable(
serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys))
info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
msg = [pickle.dumps(can(f), PICKLE_PROTOCOL)]
msg.append(pickle.dumps(info, PICKLE_PROTOCOL))
msg.extend(arg_bufs)
msg.extend(kwarg_bufs)
return msg
def unpack_apply_message(bufs, g=None, copy=True):
"""unpack f,args,kwargs from buffers packed by pack_apply_message()
Returns: original f,args,kwargs"""
bufs = list(bufs) # allow us to pop
assert len(bufs) >= 2, "not enough buffers!"
pf = buffer_to_bytes_py2(bufs.pop(0))
f = uncan(pickle.loads(pf), g)
pinfo = buffer_to_bytes_py2(bufs.pop(0))
info = pickle.loads(pinfo)
arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:]
args = []
for i in range(info['nargs']):
arg, arg_bufs = deserialize_object(arg_bufs, g)
args.append(arg)
args = tuple(args)
assert not arg_bufs, "Shouldn't be any arg bufs left over"
kwargs = {}
for key in info['kw_keys']:
kwarg, kwarg_bufs = deserialize_object(kwarg_bufs, g)
kwargs[key] = kwarg
assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
return f,args,kwargs
|