# /usr/lib/python3/dist-packages/molotov/util.py is in python3-molotov 1.4-1.
# This file is owned by root:root, with mode 0o644.
# The actual contents of the file can be viewed below.
from io import StringIO
import traceback
import sys
import functools
import json
import socket
import os
import asyncio
import time
from urllib.parse import urlparse, urlunparse
from socket import gethostbyname
from aiohttp import ClientSession
# Cache of host name -> address lookups already performed by resolve().
_DNS_CACHE = {}
# Flag switched on by stop() and read back through is_stopped().
_STOP = False
# Value stored by set_timer() and handed back by get_timer().
_TIMER = None
def get_timer():
    """Returns the current value of the module-level timer.

    The value is whatever was last stored in ``_TIMER``.
    """
    return _TIMER
def set_timer(value=None):
    """Stores a new value in the module-level timer.

    Options:

    - value: the value to store (defaults: the current time, in whole
      seconds)
    """
    global _TIMER
    _TIMER = int(time.time()) if value is None else value
def stop():
    """Raises the module-level stop flag.

    After this call, ``_STOP`` is True.
    """
    global _STOP
    _STOP = True
def is_stopped():
    """Returns the current value of the module-level stop flag."""
    return _STOP
def resolve(url):
    """Resolves the host of a URL and returns the rewritten URL.

    Options:

    - url: the URL to resolve

    Returns a ``(new_url, original, host)`` tuple:

    - new_url: the URL with its host replaced by the resolved address
      (the host is left untouched for https URLs)
    - original: the host name found in the URL
    - host: the host used in new_url

    When the port is not 80 or 443, it is appended to both ``original``
    and ``host``. When the lookup fails, the URL is returned unchanged
    along with the bare host name.

    Lookups are remembered in ``_DNS_CACHE``.
    """
    parts = urlparse(url)

    # Strip the credentials on the *last* '@', like urlparse does for
    # .username/.password, so a password containing '@' does not leak
    # into the host name.
    if '@' in parts.netloc:
        username, password = parts.username, parts.password
        netloc = parts.netloc.rpartition('@')[2]
    else:
        username, password = None, None
        netloc = parts.netloc

    # NOTE(review): bracketed IPv6 literals are not handled by this split.
    host = netloc.split(':')[0]

    port = parts.port
    if not port and parts.scheme == 'https':
        port, port_provided = 443, False
    elif not port and parts.scheme == 'http':
        port, port_provided = 80, False
    else:
        # Other schemes have no default here; the port may be missing,
        # in which case there is nothing to append to the host.
        port_provided = port is not None

    original = host
    if host in _DNS_CACHE:
        resolved = _DNS_CACHE[host]
    else:
        try:
            resolved = gethostbyname(host)
            _DNS_CACHE[host] = resolved
        except socket.gaierror:
            return url, original, host

    # Don't use a resolved hostname for SSL requests otherwise the
    # certificate will not match the IP address (resolved)
    host = resolved if parts.scheme != 'https' else host
    netloc = host
    if port_provided:
        netloc += ':%d' % port
    if username is not None:
        if password is not None:
            netloc = '%s:%s@%s' % (username, password, netloc)
        else:
            netloc = '%s@%s' % (username, netloc)

    if port is not None and port not in (443, 80):
        host += ':%d' % port
        original += ':%d' % port

    new = urlunparse((parts.scheme, netloc, parts.path or '', '',
                      parts.query or '', parts.fragment or ''))
    return new, original, host
class OptionError(Exception):
    """Error raised when the options of a scenario cannot be loaded."""
def _expand_args(args, options):
    """Copies every entry of options onto args as an attribute.

    Options:

    - args: the object that receives the attributes
    - options: a mapping of attribute names to values
    """
    for entry in options.items():
        setattr(args, *entry)
def expand_options(config, scenario, args):
    """Loads the options of a scenario from a JSON config into args.

    Options:

    - config: the path of a JSON file, or an object with a read()
      method that returns JSON
    - scenario: the name of the entry to look up under molotov/tests
    - args: the object that receives the options as attributes

    Raises OptionError when the config cannot be found or parsed, or
    when it does not hold a molotov/tests/<scenario> entry.
    """
    if not isinstance(config, str):
        source = config
        try:
            config = json.loads(source.read())
        except Exception as exc:
            raise OptionError("Can't parse %r" % source) from exc
    else:
        if not os.path.exists(config):
            raise OptionError("Can't find %r" % config)
        with open(config) as f:
            try:
                config = json.loads(f.read())
            except ValueError as exc:
                # config still holds the path here: the assignment
                # above did not happen
                raise OptionError("Can't parse %r" % config) from exc

    # Valid JSON is not necessarily made of objects; report any other
    # shape as a bad config instead of failing on the lookups below.
    if not isinstance(config, dict) or 'molotov' not in config:
        raise OptionError("Bad config -- no molotov key")

    molotov = config['molotov']
    if not isinstance(molotov, dict) or 'tests' not in molotov:
        raise OptionError("Bad config -- no molotov/tests key")

    tests = molotov['tests']
    if not isinstance(tests, dict) or scenario not in tests:
        raise OptionError("Can't find %r in the config" % scenario)

    _expand_args(args, tests[scenario])
def _run_in_fresh_loop(coro):
    """Runs a coroutine function to completion on a brand new event loop.

    Options:

    - coro: a callable that accepts a ``loop`` keyword argument and
      returns a coroutine

    Returns the result of the coroutine. The loop is closed before
    returning, whether the coroutine succeeded or not.
    """
    fresh = asyncio.new_event_loop()
    try:
        pending = fresh.create_task(coro(loop=fresh))
        return fresh.run_until_complete(pending)
    finally:
        fresh.close()
async def _request(endpoint, verb='GET', session_options=None,
                   json=False, loop=None, **options):
    """Performs one HTTP request with a dedicated ClientSession.

    Options:

    - endpoint: the endpoint to call
    - verb: the HTTP verb to use (defaults: GET)
    - session_options: a dict containing options to initialize the
      session (defaults: None)
    - json: when true, the content is decoded with resp.json()
      instead of resp.text() (defaults: False)
    - loop: the event loop handed to the session (defaults: None)
    - options: extra options for the request

    Returns a dict with the content, status and headers keys.
    """
    session_kwargs = {} if session_options is None else session_options
    async with ClientSession(loop=loop, **session_kwargs) as session:
        # the session exposes one method per verb: get, post, ...
        send = getattr(session, verb.lower())
        async with send(endpoint, **options) as resp:
            body = await (resp.json() if json else resp.text())
            return {
                'content': body,
                'status': resp.status,
                'headers': resp.headers,
            }
def request(endpoint, verb='GET', session_options=None, **options):
    """Runs a single request and blocks until the response is read.

    The call is executed on its own event loop, with its own
    aiohttp.ClientSession object.

    Options:

    - endpoint: the endpoint to call
    - verb: the HTTP verb to use (defaults: GET)
    - session_options: a dict containing options to initialize the session
      (defaults: None)
    - options: extra options for the request (defaults: None)

    Returns a dict object with the following keys:

    - content: the content of the response
    - status: the status
    - headers: a dict with all the response headers
    """
    return _run_in_fresh_loop(
        functools.partial(_request, endpoint, verb, session_options,
                          **options))
def json_request(endpoint, verb='GET', session_options=None, **options):
    """Same as :func:`molotov.request`, with the content decoded as JSON.
    """
    return _run_in_fresh_loop(
        functools.partial(_request, endpoint, verb, session_options,
                          json=True, **options))
# Registry of named values written by set_var() and read by get_var().
_VARS = {}
def set_var(name, value):
    """Stores a value under a global name.

    A value already stored under that name is replaced.

    Options:

    - name: name of the variable
    - value: object to set
    """
    _VARS.update({name: value})
def get_var(name, factory=None):
    """Looks up a global variable by name.

    When the variable is missing and a factory is given, the factory
    is called without arguments and its result is stored under that
    name, then returned.

    Returns None when the variable is missing and no factory is given.
    """
    if name in _VARS:
        return _VARS[name]
    if factory is None:
        return None
    created = factory()
    _VARS[name] = created
    return created
# taken from https://stackoverflow.com/a/37211337
def _make_sleep():
    """Builds a sleep() coroutine function whose pending calls can be
    cancelled all at once.

    The returned function behaves like asyncio.sleep(delay, result) and
    carries two extra attributes:

    - tasks: the set of sleeps currently in progress
    - cancel_all: a callable that cancels every sleep in progress and
      returns how many cancellations were requested

    A sleep that gets cancelled returns ``result`` instead of raising
    asyncio.CancelledError.
    """
    async def sleep(delay, result=None, *, loop=None):
        # asyncio.sleep() no longer accepts a ``loop`` argument since
        # Python 3.10; the loop is only handed to ensure_future(), which
        # still takes it.
        coro = asyncio.sleep(delay, result=result)
        task = asyncio.ensure_future(coro, loop=loop)
        sleep.tasks.add(task)
        try:
            return await task
        except asyncio.CancelledError:
            return result
        finally:
            sleep.tasks.remove(task)

    sleep.tasks = set()
    sleep.cancel_all = lambda: sum(task.cancel() for task in sleep.tasks)
    return sleep


cancellable_sleep = _make_sleep()
def printable_error(error, tb=None):
    """Renders an error and a traceback as a list of lines.

    Options:

    - error: the error to describe
    - tb: the traceback to render (defaults: the traceback found in
      sys.exc_info())

    Returns a list of strings: the repr of the error, followed by the
    lines of the formatted traceback without their line endings.
    """
    lines = [repr(error)]
    if tb is None:
        tb = sys.exc_info()[2]
    rendered = ''.join(traceback.format_tb(tb))
    rows = rendered.split('\n')
    # the text ends with a newline (or is empty), which leaves one
    # empty piece at the end that is not a line of the traceback
    if rows[-1] == '':
        rows.pop()
    lines.extend(rows)
    return lines
|