/usr/lib/python3/dist-packages/ipykernel/inprocess/manager.py is in python3-ipykernel 4.8.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 | """A kernel manager for in-process kernels."""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
from traitlets import Instance, DottedObjectName, default
from jupyter_client.managerabc import KernelManagerABC
from jupyter_client.manager import KernelManager
from jupyter_client.session import Session
from .constants import INPROCESS_KEY
class InProcessKernelManager(KernelManager):
"""A manager for an in-process kernel.
This class implements the interface of
`jupyter_client.kernelmanagerabc.KernelManagerABC` and allows
(asynchronous) frontends to be used seamlessly with an in-process kernel.
See `jupyter_client.kernelmanager.KernelManager` for docstrings.
"""
# The kernel process with which the KernelManager is communicating.
kernel = Instance('ipykernel.inprocess.ipkernel.InProcessKernel',
allow_none=True)
# the client class for KM.client() shortcut
client_class = DottedObjectName('ipykernel.inprocess.BlockingInProcessKernelClient')
@default('blocking_class')
def _default_blocking_class(self):
from .blocking import BlockingInProcessKernelClient
return BlockingInProcessKernelClient
@default('session')
def _default_session(self):
# don't sign in-process messages
return Session(key=INPROCESS_KEY, parent=self)
#--------------------------------------------------------------------------
# Kernel management methods
#--------------------------------------------------------------------------
def start_kernel(self, **kwds):
from ipykernel.inprocess.ipkernel import InProcessKernel
self.kernel = InProcessKernel(parent=self, session=self.session)
def shutdown_kernel(self):
self.kernel.iopub_thread.stop()
self._kill_kernel()
def restart_kernel(self, now=False, **kwds):
self.shutdown_kernel()
self.start_kernel(**kwds)
@property
def has_kernel(self):
return self.kernel is not None
def _kill_kernel(self):
self.kernel = None
def interrupt_kernel(self):
raise NotImplementedError("Cannot interrupt in-process kernel.")
def signal_kernel(self, signum):
raise NotImplementedError("Cannot signal in-process kernel.")
def is_alive(self):
return self.kernel is not None
def client(self, **kwargs):
kwargs['kernel'] = self.kernel
return super(InProcessKernelManager, self).client(**kwargs)
#-----------------------------------------------------------------------------
# ABC Registration
#-----------------------------------------------------------------------------
KernelManagerABC.register(InProcessKernelManager)
|