/usr/lib/python2.7/dist-packages/sagenb/testing/stress.py is in python-sagenb 1.0.1+ds1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
import re
from six.moves.urllib.request import urlopen
from sage.misc.sage_timeit import sage_timeit
from sagenb.misc.misc import walltime, cputime
from signal import alarm
def cancel_alarm():
    """
    Cancel any alarm that is currently pending for this process.

    Requesting an alarm with a delay of zero seconds schedules nothing
    and clears whatever alarm was previously scheduled.
    """
    no_delay = 0
    alarm(no_delay)
# Sentinel returned in place of a normal result by the timed helpers in this
# module when a KeyboardInterrupt is caught while an alarm is armed.
TIMEOUT = 'timeout'
class PubStressTest(object):
    """
    Stress test viewing things that a non-authenticated viewer can
    look at, namely the login screen, list of published worksheets,
    and each individual published worksheet.

    Time limits are enforced by arming ``alarm`` and catching
    ``KeyboardInterrupt``.
    NOTE(review): this presumably relies on the surrounding Sage
    environment turning SIGALRM into a ``KeyboardInterrupt`` subclass --
    confirm before using outside of Sage.
    """
    def __init__(self, url, verbose=True,
                 timeit_number=1, timeit_repeat=1,
                 url_timeout=10):
        """
        INPUT:

        - ``url`` -- url of the Sage notebook server

        - ``verbose`` -- bool; whether to print info about what is
          being tested as it is tested

        - ``timeit_number`` -- integer (default: 1); passed as
          ``number`` to ``sage_timeit``

        - ``timeit_repeat`` -- integer (default: 1); passed as
          ``repeat`` to ``sage_timeit``

        - ``url_timeout`` -- integer (default: 10); number of seconds
          given to ``alarm`` before a timed operation is abandoned
        """
        self._url = url
        self._verbose = verbose
        self._timeit_number = timeit_number
        self._timeit_repeat = timeit_repeat
        self._url_timeout = url_timeout

    def _base_url(self):
        """
        Return the server url without trailing slashes, so that paths
        starting with ``/`` can be appended without producing ``//``.
        """
        return self._url.rstrip('/')

    def url_login_screen(self):
        """
        Return the url of the login screen for the notebook server.
        """
        return self._url

    def url_pub(self):
        """
        Return the url of the list of published worksheets.
        """
        return self._base_url() + '/pub'

    def _timeit(self, code):
        """
        Time evaluation of the given code, timing out after
        self._url_timeout seconds, and using the default number and
        repeat values as options to timeit.

        OUTPUT: whatever ``sage_timeit`` returns, or ``TIMEOUT`` if a
        ``KeyboardInterrupt`` is caught while the alarm is armed.
        """
        try:
            alarm(self._url_timeout)
            return sage_timeit(code, globals(),
                               number=self._timeit_number,
                               repeat=self._timeit_repeat)
        except KeyboardInterrupt:
            return TIMEOUT
        finally:
            # Always disarm, whether we finished, timed out or failed.
            cancel_alarm()

    def _geturlcode(self, url):
        """
        Return code that when evaluated returns the data at the given
        url as a string.

        The url is embedded with ``%r`` so that quotes or backslashes
        inside it cannot break out of the generated string literal.
        """
        return "urlopen(%r).read()" % (url,)

    def _read_url(self, url):
        """
        Open the given url, return its full contents and close the
        connection, even if reading is interrupted.
        """
        response = urlopen(url)
        try:
            return response.read()
        finally:
            # A stress test opens many connections; do not leave them
            # for the garbage collector to clean up.
            response.close()

    def _geturl(self, url, use_alarm=True):
        """
        Download the given url.  If use_alarm is True (the default)
        timeout and return the TIMEOUT object if the default download
        timeout is exceeded.
        """
        if not use_alarm:
            return self._read_url(url)
        try:
            alarm(self._url_timeout)
            return self._read_url(url)
        except KeyboardInterrupt:
            return TIMEOUT
        finally:
            cancel_alarm()

    def test_login_screen(self):
        """
        Download the main login screen for the Sage notebook server.
        """
        if self._verbose:
            print("testing login screen...")
        return self._timeit(self._geturlcode(self.url_login_screen()))

    def test_pub(self):
        """
        Download the list of published worksheets.
        """
        if self._verbose:
            print("testing list of published worksheets...")
        return self._timeit(self._geturlcode(self.url_pub()))

    def _worksheet_urls(self, listing):
        """
        Return the absolute urls of the published worksheets linked
        from ``listing``, the downloaded contents of the page of
        published worksheets (text or bytes).
        """
        if not isinstance(listing, str):
            # On Python 3 the download is bytes, which a text pattern
            # cannot be matched against.
            listing = listing.decode('utf-8', 'replace')
        base = self._base_url()
        # Only double-quoted links are matched; the group drops the quotes.
        return [base + path
                for path in re.findall(r'"(/home/pub/[0-9]*)"', listing)]

    def get_urls_of_published_worksheets(self):
        """
        Get a list of the urls of all published worksheets.

        OUTPUT: a list of urls; the empty list (after printing
        ``TIMEOUT``) if downloading the listing timed out.
        """
        pub = self._geturl(self.url_pub())
        # Identity, not equality: page contents must never be mistaken
        # for the sentinel.
        if pub is TIMEOUT:
            print(TIMEOUT)
            return []
        return self._worksheet_urls(pub)

    def test_allpub(self):
        """
        View every single one of the published worksheets on the
        Sage notebook server.

        A single alarm of self._url_timeout seconds covers the whole
        loop, not each individual download.

        OUTPUT: the value of ``walltime(tm)`` taken after the last
        download, or ``TIMEOUT`` if a ``KeyboardInterrupt`` is caught
        first.
        """
        if self._verbose:
            print("testing download of all published worksheets...")
        tm = walltime()
        pub = self.get_urls_of_published_worksheets()
        total = len(pub)
        try:
            alarm(self._url_timeout)
            for i, X in enumerate(pub):
                t0 = walltime()
                # The alarm armed above is already running, so the
                # per-download alarm is switched off here.
                self._geturl(X, use_alarm=False)
                if self._verbose:
                    print("Got %s [%s/%s] %.2f seconds" % (X, i + 1, total, walltime(t0)))
            return walltime(tm)
        except KeyboardInterrupt:
            return TIMEOUT
        finally:
            cancel_alarm()

    def test(self):
        """
        Run all tests and return the results as a dictionary of pairs
        {'test_name':output}.

        Every attribute whose name starts with ``test_`` is called
        with no arguments.
        """
        v = {}
        for method in dir(self):
            if method.startswith('test_'):
                v[method] = getattr(self, method)()
        return v
|