/usr/lib/python3/dist-packages/twisted/trial/_asynctest.py is in python3-twisted-experimental 13.2.0-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 | # -*- test-case-name: twisted.trial.test -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Things likely to be used by writers of unit tests.
Maintainer: Jonathan Lange
"""
from __future__ import division, absolute_import
import warnings
from zope.interface import implementer
# We can't import reactor at module-level because this code runs before trial
# installs a user-specified reactor, installing the default reactor and
# breaking reactor installation. See also #6047.
from twisted.internet import defer, utils
from twisted.python import failure
from twisted.trial import itrial, util
from twisted.trial._synctest import (
FailTest, SkipTest, SynchronousTestCase)
_wait_is_running = []
@implementer(itrial.ITestCase)
class TestCase(SynchronousTestCase):
"""
A unit test. The atom of the unit testing universe.
This class extends L{SynchronousTestCase} which extends C{unittest.TestCase}
from the standard library. The main feature is the ability to return
C{Deferred}s from tests and fixture methods and to have the suite wait for
those C{Deferred}s to fire. Also provides new assertions such as
L{assertFailure}.
@ivar timeout: A real number of seconds. If set, the test will
raise an error if it takes longer than C{timeout} seconds.
If not set, util.DEFAULT_TIMEOUT_DURATION is used.
"""
def __init__(self, methodName='runTest'):
"""
Construct an asynchronous test case for C{methodName}.
@param methodName: The name of a method on C{self}. This method should
be a unit test. That is, it should be a short method that calls some of
the assert* methods. If C{methodName} is unspecified,
L{SynchronousTestCase.runTest} will be used as the test method. This is
mostly useful for testing Trial.
"""
super(TestCase, self).__init__(methodName)
def assertFailure(self, deferred, *expectedFailures):
"""
Fail if C{deferred} does not errback with one of C{expectedFailures}.
Returns the original Deferred with callbacks added. You will need
to return this Deferred from your test case.
"""
def _cb(ignore):
raise self.failureException(
"did not catch an error, instead got %r" % (ignore,))
def _eb(failure):
if failure.check(*expectedFailures):
return failure.value
else:
output = ('\nExpected: %r\nGot:\n%s'
% (expectedFailures, str(failure)))
raise self.failureException(output)
return deferred.addCallbacks(_cb, _eb)
failUnlessFailure = assertFailure
def _run(self, methodName, result):
from twisted.internet import reactor
timeout = self.getTimeout()
def onTimeout(d):
e = defer.TimeoutError("%r (%s) still running at %s secs"
% (self, methodName, timeout))
f = failure.Failure(e)
# try to errback the deferred that the test returns (for no gorram
# reason) (see issue1005 and test_errorPropagation in
# test_deferred)
try:
d.errback(f)
except defer.AlreadyCalledError:
# if the deferred has been called already but the *back chain
# is still unfinished, crash the reactor and report timeout
# error ourself.
reactor.crash()
self._timedOut = True # see self._wait
todo = self.getTodo()
if todo is not None and todo.expected(f):
result.addExpectedFailure(self, f, todo)
else:
result.addError(self, f)
onTimeout = utils.suppressWarnings(
onTimeout, util.suppress(category=DeprecationWarning))
method = getattr(self, methodName)
d = defer.maybeDeferred(
utils.runWithWarningsSuppressed, self._getSuppress(), method)
call = reactor.callLater(timeout, onTimeout, d)
d.addBoth(lambda x : call.active() and call.cancel() or x)
return d
def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)
def deferSetUp(self, ignored, result):
d = self._run('setUp', result)
d.addCallbacks(self.deferTestMethod, self._ebDeferSetUp,
callbackArgs=(result,),
errbackArgs=(result,))
return d
def _ebDeferSetUp(self, failure, result):
if failure.check(SkipTest):
result.addSkip(self, self._getSkipReason(self.setUp, failure.value))
else:
result.addError(self, failure)
if failure.check(KeyboardInterrupt):
result.stop()
return self.deferRunCleanups(None, result)
def deferTestMethod(self, ignored, result):
d = self._run(self._testMethodName, result)
d.addCallbacks(self._cbDeferTestMethod, self._ebDeferTestMethod,
callbackArgs=(result,),
errbackArgs=(result,))
d.addBoth(self.deferRunCleanups, result)
d.addBoth(self.deferTearDown, result)
return d
def _cbDeferTestMethod(self, ignored, result):
if self.getTodo() is not None:
result.addUnexpectedSuccess(self, self.getTodo())
else:
self._passed = True
return ignored
def _ebDeferTestMethod(self, f, result):
todo = self.getTodo()
if todo is not None and todo.expected(f):
result.addExpectedFailure(self, f, todo)
elif f.check(self.failureException, FailTest):
result.addFailure(self, f)
elif f.check(KeyboardInterrupt):
result.addError(self, f)
result.stop()
elif f.check(SkipTest):
result.addSkip(
self,
self._getSkipReason(getattr(self, self._testMethodName), f.value))
else:
result.addError(self, f)
def deferTearDown(self, ignored, result):
d = self._run('tearDown', result)
d.addErrback(self._ebDeferTearDown, result)
return d
def _ebDeferTearDown(self, failure, result):
result.addError(self, failure)
if failure.check(KeyboardInterrupt):
result.stop()
self._passed = False
def deferRunCleanups(self, ignored, result):
"""
Run any scheduled cleanups and report errors (if any to the result
object.
"""
d = self._runCleanups()
d.addCallback(self._cbDeferRunCleanups, result)
return d
def _cbDeferRunCleanups(self, cleanupResults, result):
for flag, failure in cleanupResults:
if flag == defer.FAILURE:
result.addError(self, failure)
if failure.check(KeyboardInterrupt):
result.stop()
self._passed = False
def _cleanUp(self, result):
try:
clean = util._Janitor(self, result).postCaseCleanup()
if not clean:
self._passed = False
except:
result.addError(self, failure.Failure())
self._passed = False
for error in self._observer.getErrors():
result.addError(self, error)
self._passed = False
self.flushLoggedErrors()
self._removeObserver()
if self._passed:
result.addSuccess(self)
def _classCleanUp(self, result):
try:
util._Janitor(self, result).postClassCleanup()
except:
result.addError(self, failure.Failure())
def _makeReactorMethod(self, name):
"""
Create a method which wraps the reactor method C{name}. The new
method issues a deprecation warning and calls the original.
"""
def _(*a, **kw):
warnings.warn("reactor.%s cannot be used inside unit tests. "
"In the future, using %s will fail the test and may "
"crash or hang the test run."
% (name, name),
stacklevel=2, category=DeprecationWarning)
return self._reactorMethods[name](*a, **kw)
return _
def _deprecateReactor(self, reactor):
"""
Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is,
each method is wrapped in a function that issues a deprecation
warning, then calls the original.
@param reactor: The Twisted reactor.
"""
self._reactorMethods = {}
for name in ['crash', 'iterate', 'stop']:
self._reactorMethods[name] = getattr(reactor, name)
setattr(reactor, name, self._makeReactorMethod(name))
def _undeprecateReactor(self, reactor):
"""
Restore the deprecated reactor methods. Undoes what
L{_deprecateReactor} did.
@param reactor: The Twisted reactor.
"""
for name, method in self._reactorMethods.items():
setattr(reactor, name, method)
self._reactorMethods = {}
def _runCleanups(self):
"""
Run the cleanups added with L{addCleanup} in order.
@return: A C{Deferred} that fires when all cleanups are run.
"""
def _makeFunction(f, args, kwargs):
return lambda: f(*args, **kwargs)
callables = []
while len(self._cleanups) > 0:
f, args, kwargs = self._cleanups.pop()
callables.append(_makeFunction(f, args, kwargs))
return util._runSequentially(callables)
def _runFixturesAndTest(self, result):
"""
Really run C{setUp}, the test method, and C{tearDown}. Any of these may
return L{defer.Deferred}s. After they complete, do some reactor cleanup.
@param result: A L{TestResult} object.
"""
from twisted.internet import reactor
self._deprecateReactor(reactor)
self._timedOut = False
try:
d = self.deferSetUp(None, result)
try:
self._wait(d)
finally:
self._cleanUp(result)
self._classCleanUp(result)
finally:
self._undeprecateReactor(reactor)
def addCleanup(self, f, *args, **kwargs):
"""
Extend the base cleanup feature with support for cleanup functions which
return Deferreds.
If the function C{f} returns a Deferred, C{TestCase} will wait until the
Deferred has fired before proceeding to the next function.
"""
return super(TestCase, self).addCleanup(f, *args, **kwargs)
def getSuppress(self):
return self._getSuppress()
def getTimeout(self):
"""
Returns the timeout value set on this test. Checks on the instance
first, then the class, then the module, then packages. As soon as it
finds something with a C{timeout} attribute, returns that. Returns
L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See
L{TestCase} docstring for more details.
"""
timeout = util.acquireAttribute(self._parents, 'timeout',
util.DEFAULT_TIMEOUT_DURATION)
try:
return float(timeout)
except (ValueError, TypeError):
# XXX -- this is here because sometimes people will have methods
# called 'timeout', or set timeout to 'orange', or something
# Particularly, test_news.NewsTestCase and ReactorCoreTestCase
# both do this.
warnings.warn("'timeout' attribute needs to be a number.",
category=DeprecationWarning)
return util.DEFAULT_TIMEOUT_DURATION
def _wait(self, d, running=_wait_is_running):
"""Take a Deferred that only ever callbacks. Block until it happens.
"""
if running:
raise RuntimeError("_wait is not reentrant")
from twisted.internet import reactor
results = []
def append(any):
if results is not None:
results.append(any)
def crash(ign):
if results is not None:
reactor.crash()
crash = utils.suppressWarnings(
crash, util.suppress(message=r'reactor\.crash cannot be used.*',
category=DeprecationWarning))
def stop():
reactor.crash()
stop = utils.suppressWarnings(
stop, util.suppress(message=r'reactor\.crash cannot be used.*',
category=DeprecationWarning))
running.append(None)
try:
d.addBoth(append)
if results:
# d might have already been fired, in which case append is
# called synchronously. Avoid any reactor stuff.
return
d.addBoth(crash)
reactor.stop = stop
try:
reactor.run()
finally:
del reactor.stop
# If the reactor was crashed elsewhere due to a timeout, hopefully
# that crasher also reported an error. Just return.
# _timedOut is most likely to be set when d has fired but hasn't
# completed its callback chain (see self._run)
if results or self._timedOut: #defined in run() and _run()
return
# If the timeout didn't happen, and we didn't get a result or
# a failure, then the user probably aborted the test, so let's
# just raise KeyboardInterrupt.
# FIXME: imagine this:
# web/test/test_webclient.py:
# exc = self.assertRaises(error.Error, wait, method(url))
#
# wait() will raise KeyboardInterrupt, and assertRaises will
# swallow it. Therefore, wait() raising KeyboardInterrupt is
# insufficient to stop trial. A suggested solution is to have
# this code set a "stop trial" flag, or otherwise notify trial
# that it should really try to stop as soon as possible.
raise KeyboardInterrupt()
finally:
results = None
running.pop()
|