/usr/lib/python3/dist-packages/sqlalchemy/testing/assertsql.py is in python3-sqlalchemy 1.0.15+ds1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 | # testing/assertsql.py
# Copyright (C) 2005-2016 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from ..engine.default import DefaultDialect
from .. import util
import re
import collections
import contextlib
from .. import event
from sqlalchemy.schema import _DDLCompiles
from sqlalchemy.engine.util import _distill_params
from sqlalchemy.engine import url
class AssertRule(object):
is_consumed = False
errormessage = None
consume_statement = True
def process_statement(self, execute_observed):
pass
def no_more_statements(self):
assert False, 'All statements are complete, but pending '\
'assertion rules remain'
class SQLMatchRule(AssertRule):
pass
class CursorSQL(SQLMatchRule):
consume_statement = False
def __init__(self, statement, params=None):
self.statement = statement
self.params = params
def process_statement(self, execute_observed):
stmt = execute_observed.statements[0]
if self.statement != stmt.statement or (
self.params is not None and self.params != stmt.parameters):
self.errormessage = \
"Testing for exact SQL %s parameters %s received %s %s" % (
self.statement, self.params,
stmt.statement, stmt.parameters
)
else:
execute_observed.statements.pop(0)
self.is_consumed = True
if not execute_observed.statements:
self.consume_statement = True
class CompiledSQL(SQLMatchRule):
def __init__(self, statement, params=None, dialect='default'):
self.statement = statement
self.params = params
self.dialect = dialect
def _compare_sql(self, execute_observed, received_statement):
stmt = re.sub(r'[\n\t]', '', self.statement)
return received_statement == stmt
def _compile_dialect(self, execute_observed):
if self.dialect == 'default':
return DefaultDialect()
else:
# ugh
if self.dialect == 'postgresql':
params = {'implicit_returning': True}
else:
params = {}
return url.URL(self.dialect).get_dialect()(**params)
def _received_statement(self, execute_observed):
"""reconstruct the statement and params in terms
of a target dialect, which for CompiledSQL is just DefaultDialect."""
context = execute_observed.context
compare_dialect = self._compile_dialect(execute_observed)
if isinstance(context.compiled.statement, _DDLCompiles):
compiled = \
context.compiled.statement.compile(dialect=compare_dialect)
else:
compiled = (
context.compiled.statement.compile(
dialect=compare_dialect,
column_keys=context.compiled.column_keys,
inline=context.compiled.inline)
)
_received_statement = re.sub(r'[\n\t]', '', util.text_type(compiled))
parameters = execute_observed.parameters
if not parameters:
_received_parameters = [compiled.construct_params()]
else:
_received_parameters = [
compiled.construct_params(m) for m in parameters]
return _received_statement, _received_parameters
def process_statement(self, execute_observed):
context = execute_observed.context
_received_statement, _received_parameters = \
self._received_statement(execute_observed)
params = self._all_params(context)
equivalent = self._compare_sql(execute_observed, _received_statement)
if equivalent:
if params is not None:
all_params = list(params)
all_received = list(_received_parameters)
while all_params and all_received:
param = dict(all_params.pop(0))
for idx, received in enumerate(list(all_received)):
# do a positive compare only
for param_key in param:
# a key in param did not match current
# 'received'
if param_key not in received or \
received[param_key] != param[param_key]:
break
else:
# all keys in param matched 'received';
# onto next param
del all_received[idx]
break
else:
# param did not match any entry
# in all_received
equivalent = False
break
if all_params or all_received:
equivalent = False
if equivalent:
self.is_consumed = True
self.errormessage = None
else:
self.errormessage = self._failure_message(params) % {
'received_statement': _received_statement,
'received_parameters': _received_parameters
}
def _all_params(self, context):
if self.params:
if util.callable(self.params):
params = self.params(context)
else:
params = self.params
if not isinstance(params, list):
params = [params]
return params
else:
return None
def _failure_message(self, expected_params):
return (
'Testing for compiled statement %r partial params %r, '
'received %%(received_statement)r with params '
'%%(received_parameters)r' % (
self.statement.replace('%', '%%'), expected_params
)
)
class RegexSQL(CompiledSQL):
def __init__(self, regex, params=None):
SQLMatchRule.__init__(self)
self.regex = re.compile(regex)
self.orig_regex = regex
self.params = params
self.dialect = 'default'
def _failure_message(self, expected_params):
return (
'Testing for compiled statement ~%r partial params %r, '
'received %%(received_statement)r with params '
'%%(received_parameters)r' % (
self.orig_regex, expected_params
)
)
def _compare_sql(self, execute_observed, received_statement):
return bool(self.regex.match(received_statement))
class DialectSQL(CompiledSQL):
def _compile_dialect(self, execute_observed):
return execute_observed.context.dialect
def _compare_no_space(self, real_stmt, received_stmt):
stmt = re.sub(r'[\n\t]', '', real_stmt)
return received_stmt == stmt
def _received_statement(self, execute_observed):
received_stmt, received_params = super(DialectSQL, self).\
_received_statement(execute_observed)
# TODO: why do we need this part?
for real_stmt in execute_observed.statements:
if self._compare_no_space(real_stmt.statement, received_stmt):
break
else:
raise AssertionError(
"Can't locate compiled statement %r in list of "
"statements actually invoked" % received_stmt)
return received_stmt, execute_observed.context.compiled_parameters
def _compare_sql(self, execute_observed, received_statement):
stmt = re.sub(r'[\n\t]', '', self.statement)
# convert our comparison statement to have the
# paramstyle of the received
paramstyle = execute_observed.context.dialect.paramstyle
if paramstyle == 'pyformat':
stmt = re.sub(
r':([\w_]+)', r"%(\1)s", stmt)
else:
# positional params
repl = None
if paramstyle == 'qmark':
repl = "?"
elif paramstyle == 'format':
repl = r"%s"
elif paramstyle == 'numeric':
repl = None
stmt = re.sub(r':([\w_]+)', repl, stmt)
return received_statement == stmt
class CountStatements(AssertRule):
def __init__(self, count):
self.count = count
self._statement_count = 0
def process_statement(self, execute_observed):
self._statement_count += 1
def no_more_statements(self):
if self.count != self._statement_count:
assert False, 'desired statement count %d does not match %d' \
% (self.count, self._statement_count)
class AllOf(AssertRule):
def __init__(self, *rules):
self.rules = set(rules)
def process_statement(self, execute_observed):
for rule in list(self.rules):
rule.errormessage = None
rule.process_statement(execute_observed)
if rule.is_consumed:
self.rules.discard(rule)
if not self.rules:
self.is_consumed = True
break
elif not rule.errormessage:
# rule is not done yet
self.errormessage = None
break
else:
self.errormessage = list(self.rules)[0].errormessage
class Or(AllOf):
def process_statement(self, execute_observed):
for rule in self.rules:
rule.process_statement(execute_observed)
if rule.is_consumed:
self.is_consumed = True
break
else:
self.errormessage = list(self.rules)[0].errormessage
class SQLExecuteObserved(object):
def __init__(self, context, clauseelement, multiparams, params):
self.context = context
self.clauseelement = clauseelement
self.parameters = _distill_params(multiparams, params)
self.statements = []
class SQLCursorExecuteObserved(
collections.namedtuple(
"SQLCursorExecuteObserved",
["statement", "parameters", "context", "executemany"])
):
pass
class SQLAsserter(object):
def __init__(self):
self.accumulated = []
def _close(self):
self._final = self.accumulated
del self.accumulated
def assert_(self, *rules):
rules = list(rules)
observed = list(self._final)
while observed and rules:
rule = rules[0]
rule.process_statement(observed[0])
if rule.is_consumed:
rules.pop(0)
elif rule.errormessage:
assert False, rule.errormessage
if rule.consume_statement:
observed.pop(0)
if not observed and rules:
rules[0].no_more_statements()
elif not rules and observed:
assert False, "Additional SQL statements remain"
@contextlib.contextmanager
def assert_engine(engine):
asserter = SQLAsserter()
orig = []
@event.listens_for(engine, "before_execute")
def connection_execute(conn, clauseelement, multiparams, params):
# grab the original statement + params before any cursor
# execution
orig[:] = clauseelement, multiparams, params
@event.listens_for(engine, "after_cursor_execute")
def cursor_execute(conn, cursor, statement, parameters,
context, executemany):
if not context:
return
# then grab real cursor statements and associate them all
# around a single context
if asserter.accumulated and \
asserter.accumulated[-1].context is context:
obs = asserter.accumulated[-1]
else:
obs = SQLExecuteObserved(context, orig[0], orig[1], orig[2])
asserter.accumulated.append(obs)
obs.statements.append(
SQLCursorExecuteObserved(
statement, parameters, context, executemany)
)
try:
yield asserter
finally:
event.remove(engine, "after_cursor_execute", cursor_execute)
event.remove(engine, "before_execute", connection_execute)
asserter._close()
|