/usr/share/pyshared/axiom/_pysqlite2.py is in python-axiom 0.6.0-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 | # -*- test-case-name: axiom.test.test_pysqlite2 -*-
"""
PySQLite2 Connection and Cursor wrappers.
These provide a uniform interface on top of PySQLite2 for Axiom, particularly
including error handling behavior and exception types.
"""
import time
from pysqlite2 import dbapi2
from twisted.python import log
from axiom import errors, iaxiom
class Connection(object):
def __init__(self, connection, timeout=None):
self._connection = connection
self._timeout = timeout
def fromDatabaseName(cls, dbFilename, timeout=None, isolationLevel=None):
return cls(dbapi2.connect(dbFilename, timeout=0,
isolation_level=isolationLevel))
fromDatabaseName = classmethod(fromDatabaseName)
def cursor(self):
return Cursor(self, self._timeout)
def identifySQLError(self, sql, args, e):
"""
Identify an appropriate SQL error object for the given message for the
supported versions of sqlite.
@return: an SQLError
"""
message = e.args[0]
if message.startswith("table") and message.endswith("already exists"):
return errors.TableAlreadyExists(sql, args, e)
return errors.SQLError(sql, args, e)
class Cursor(object):
def __init__(self, connection, timeout):
self._connection = connection
self._cursor = connection._connection.cursor()
self.timeout = timeout
def __iter__(self):
return iter(self._cursor)
def time(self):
"""
Return the current wallclock time as a float representing seconds
from an fixed but arbitrary point.
"""
return time.time()
def sleep(self, seconds):
"""
Block for the given number of seconds.
@type seconds: C{float}
"""
time.sleep(seconds)
def execute(self, sql, args=()):
try:
try:
blockedTime = 0.0
t = self.time()
try:
# SQLite3 uses something like exponential backoff when
# trying to acquire a database lock. This means that even
# for very long timeouts, it may only attempt to acquire
# the lock a handful of times. Another process which is
# executing frequent, short-lived transactions may acquire
# and release the lock many times between any two attempts
# by this one to acquire it. If this process gets unlucky
# just a few times, this execute may fail to acquire the
# lock within the specified timeout.
# Since attempting to acquire the lock is a fairly cheap
# operation, we take another route. SQLite3 is always told
# to use a timeout of 0 - ie, acquire it on the first try
# or fail instantly. We will keep doing this, ten times a
# second, until the actual timeout expires.
# What would be really fantastic is a notification
# mechanism for information about the state of the lock
# changing. Of course this clearly insane, no one has ever
# managed to invent a tool for communicating one bit of
# information between multiple processes.
while 1:
try:
return self._cursor.execute(sql, args)
except dbapi2.OperationalError, e:
if e.args[0] == 'database is locked':
now = self.time()
if self.timeout is not None:
if (now - t) > self.timeout:
raise errors.TimeoutError(sql, self.timeout, e)
self.sleep(0.1)
blockedTime = self.time() - t
else:
raise
finally:
txntime = self.time() - t
if txntime - blockedTime > 2.0:
log.msg('Extremely long execute: %s' % (txntime - blockedTime,))
log.msg(sql)
# import traceback; traceback.print_stack()
log.msg(interface=iaxiom.IStatEvent,
stat_cursor_execute_time=txntime,
stat_cursor_blocked_time=blockedTime)
except dbapi2.OperationalError, e:
if e.args[0] == 'database schema has changed':
return self._cursor.execute(sql, args)
raise
except (dbapi2.ProgrammingError,
dbapi2.InterfaceError,
dbapi2.OperationalError), e:
raise self._connection.identifySQLError(sql, args, e)
def lastRowID(self):
return self._cursor.lastrowid
def close(self):
self._cursor.close()
|