/usr/lib/python2.7/dist-packages/xdist/remote.py is in python-pytest-xdist 1.22.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | """
This module is executed in remote subprocesses and helps to
control a remote testing session and relay back information.
It assumes that 'py' is importable and does not have dependencies
on the rest of the xdist code. This means that the xdist-plugin
needs not to be installed in remote environments.
"""
import sys
import os
import time
import _pytest.hookspec
import pytest
class WorkerInteractor:
def __init__(self, config, channel):
self.config = config
self.workerid = config.workerinput.get('workerid', "?")
self.log = py.log.Producer("worker-%s" % self.workerid)
if not config.option.debug:
py.log.setconsumer(self.log._keywords, None)
self.channel = channel
config.pluginmanager.register(self)
def sendevent(self, name, **kwargs):
self.log("sending", name, kwargs)
self.channel.send((name, kwargs))
def pytest_internalerror(self, excrepr):
for line in str(excrepr).split("\n"):
self.log("IERROR>", line)
def pytest_sessionstart(self, session):
self.session = session
workerinfo = getinfodict()
self.sendevent("workerready", workerinfo=workerinfo)
@pytest.hookimpl(hookwrapper=True)
def pytest_sessionfinish(self, exitstatus):
self.config.workeroutput['exitstatus'] = exitstatus
yield
self.sendevent("workerfinished", workeroutput=self.config.workeroutput)
def pytest_collection(self, session):
self.sendevent("collectionstart")
def pytest_runtestloop(self, session):
self.log("entering main loop")
torun = []
while 1:
try:
name, kwargs = self.channel.receive()
except EOFError:
return True
self.log("received command", name, kwargs)
if name == "runtests":
torun.extend(kwargs['indices'])
elif name == "runtests_all":
torun.extend(range(len(session.items)))
self.log("items to run:", torun)
# only run if we have an item and a next item
while len(torun) >= 2:
self.run_one_test(torun)
if name == "shutdown":
if torun:
self.run_one_test(torun)
break
return True
def run_one_test(self, torun):
items = self.session.items
self.item_index = torun.pop(0)
item = items[self.item_index]
if torun:
nextitem = items[torun[0]]
else:
nextitem = None
start = time.time()
self.config.hook.pytest_runtest_protocol(
item=item,
nextitem=nextitem)
duration = time.time() - start
self.sendevent("runtest_protocol_complete", item_index=self.item_index,
duration=duration)
def pytest_collection_finish(self, session):
self.sendevent(
"collectionfinish",
topdir=str(session.fspath),
ids=[item.nodeid for item in session.items])
def pytest_runtest_logstart(self, nodeid, location):
self.sendevent("logstart", nodeid=nodeid, location=location)
# the pytest_runtest_logfinish hook was introduced in pytest 3.4
if hasattr(_pytest.hookspec, 'pytest_runtest_logfinish'):
def pytest_runtest_logfinish(self, nodeid, location):
self.sendevent("logfinish", nodeid=nodeid, location=location)
def pytest_runtest_logreport(self, report):
data = serialize_report(report)
data["item_index"] = self.item_index
data["worker_id"] = self.workerid
assert self.session.items[self.item_index].nodeid == report.nodeid
self.sendevent("testreport", data=data)
def pytest_collectreport(self, report):
data = serialize_report(report)
self.sendevent("collectreport", data=data)
def pytest_logwarning(self, message, code, nodeid, fslocation):
self.sendevent("logwarning", message=message, code=code, nodeid=nodeid,
fslocation=str(fslocation))
def serialize_report(rep):
def disassembled_report(rep):
reprtraceback = rep.longrepr.reprtraceback.__dict__.copy()
reprcrash = rep.longrepr.reprcrash.__dict__.copy()
new_entries = []
for entry in reprtraceback['reprentries']:
entry_data = {
'type': type(entry).__name__,
'data': entry.__dict__.copy(),
}
for key, value in entry_data['data'].items():
if hasattr(value, '__dict__'):
entry_data['data'][key] = value.__dict__.copy()
new_entries.append(entry_data)
reprtraceback['reprentries'] = new_entries
return {
'reprcrash': reprcrash,
'reprtraceback': reprtraceback,
'sections': rep.longrepr.sections
}
import py
d = rep.__dict__.copy()
if hasattr(rep.longrepr, 'toterminal'):
if hasattr(rep.longrepr, 'reprtraceback') \
and hasattr(rep.longrepr, 'reprcrash'):
d['longrepr'] = disassembled_report(rep)
else:
d['longrepr'] = str(rep.longrepr)
else:
d['longrepr'] = rep.longrepr
for name in d:
if isinstance(d[name], py.path.local):
d[name] = str(d[name])
elif name == "result":
d[name] = None # for now
return d
def getinfodict():
import platform
return dict(
version=sys.version,
version_info=tuple(sys.version_info),
sysplatform=sys.platform,
platform=platform.platform(),
executable=sys.executable,
cwd=os.getcwd(),
)
def remote_initconfig(option_dict, args):
from _pytest.config import Config
option_dict['plugins'].append("no:terminal")
config = Config.fromdictargs(option_dict, args)
config.option.looponfail = False
config.option.usepdb = False
config.option.dist = "no"
config.option.distload = False
config.option.numprocesses = None
config.args = args
return config
if __name__ == '__channelexec__':
channel = channel # noqa
workerinput, args, option_dict = channel.receive()
importpath = os.getcwd()
sys.path.insert(0, importpath) # XXX only for remote situations
os.environ['PYTHONPATH'] = (
importpath + os.pathsep +
os.environ.get('PYTHONPATH', ''))
os.environ['PYTEST_XDIST_WORKER'] = workerinput['workerid']
os.environ['PYTEST_XDIST_WORKER_COUNT'] = str(workerinput['workercount'])
# os.environ['PYTHONPATH'] = importpath
import py
config = remote_initconfig(option_dict, args)
config.workerinput = workerinput
config.workeroutput = {}
# TODO: deprecated name, backward compatibility only. Remove it in future
config.slaveinput = config.workerinput
config.slaveoutput = config.workeroutput
interactor = WorkerInteractor(config, channel)
config.hook.pytest_cmdline_main(config=config)
|