/usr/lib/python3/dist-packages/lockfile/sqlitelockfile.py is in python3-lockfile 1:0.12.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import, division
import time
import os
try:
unicode
except NameError:
unicode = str
from . import LockBase, NotLocked, NotMyLock, LockTimeout, AlreadyLocked
class SQLiteLockFile(LockBase):
    """Demonstrate SQL-based locking.

    Every instance opens its own connection to a single scratch SQLite
    database whose path is stored in the class attribute ``testdb``.  A lock
    on ``lock_file`` is considered held while the ``locks`` table contains a
    row for it; the row's ``unique_name`` column identifies the holder.

    ``self.path``, ``self.lock_file``, ``self.unique_name`` and
    ``self.timeout`` are read throughout and are assumed to be set by
    ``LockBase.__init__`` (not visible in this file).
    """

    # Path of the shared scratch database; picked lazily by the first
    # instance that is constructed.
    testdb = None

    def __init__(self, path, threaded=True, timeout=None):
        """
        >>> lock = SQLiteLockFile('somefile')
        >>> lock = SQLiteLockFile('somefile', threaded=False)
        """
        LockBase.__init__(self, path, threaded, timeout)
        # Coerce to the text type (``unicode`` on Python 2, ``str`` on
        # Python 3) -- presumably so both values bind cleanly as SQL
        # parameters.
        self.lock_file = unicode(self.lock_file)
        self.unique_name = unicode(self.unique_name)

        if SQLiteLockFile.testdb is None:
            import tempfile
            # Only the generated *name* is wanted: the file made by mkstemp
            # is closed and removed again straight away.
            _fd, testdb = tempfile.mkstemp()
            os.close(_fd)
            os.unlink(testdb)
            del _fd, tempfile
            SQLiteLockFile.testdb = testdb

        import sqlite3
        self.connection = sqlite3.connect(SQLiteLockFile.testdb)

        c = self.connection.cursor()
        try:
            c.execute("create table locks"
                      "("
                      " lock_file varchar(32),"
                      " unique_name varchar(32)"
                      ")")
        except sqlite3.OperationalError:
            # Deliberately ignored: most likely the table already exists
            # because an earlier instance created it.
            pass
        else:
            self.connection.commit()
            # Only the instance that actually created the table schedules
            # removal of the database file at interpreter exit, so the
            # cleanup is registered once.
            import atexit
            atexit.register(os.unlink, SQLiteLockFile.testdb)

    def acquire(self, timeout=None):
        """Acquire the lock, polling the database until it is free.

        ``timeout`` falls back to ``self.timeout`` when it is None.

        * None (after fallback): poll every 0.1 seconds, never give up.
        * ``<= 0``: do not sleep between polls; raise ``AlreadyLocked`` once
          the clock has moved past the time of the call.
        * ``> 0``: poll every ``timeout / 10`` seconds and raise
          ``LockTimeout`` once ``timeout`` seconds have elapsed.

        Returns None as soon as this object's row is the one holding the
        lock (including when it already held it before the call).
        """
        timeout = timeout if timeout is not None else self.timeout
        end_time = time.time()
        if timeout is not None and timeout > 0:
            end_time += timeout

        if timeout is None:
            wait = 0.1
        elif timeout <= 0:
            wait = 0
        else:
            wait = timeout / 10

        cursor = self.connection.cursor()

        while True:
            if not self.is_locked():
                # Not locked.  Try to lock it.
                cursor.execute("insert into locks"
                               " (lock_file, unique_name)"
                               " values"
                               " (?, ?)",
                               (self.lock_file, self.unique_name))
                self.connection.commit()

                # Check to see if we are the only lock holder.  The query
                # must look at every row for this lock file: filtering on
                # our own unique_name would only ever find our own row and
                # could never notice a competitor that inserted at the
                # same moment.
                cursor.execute("select * from locks"
                               " where lock_file = ?",
                               (self.lock_file,))
                rows = cursor.fetchall()
                if len(rows) > 1:
                    # Nope.  Someone else got there.  Remove our lock.
                    cursor.execute("delete from locks"
                                   " where unique_name = ?",
                                   (self.unique_name,))
                    self.connection.commit()
                else:
                    # Yup.  We're done, so go home.
                    return
            else:
                # Already locked -- check whether the holder is us.
                cursor.execute("select * from locks"
                               " where unique_name = ?",
                               (self.unique_name,))
                rows = cursor.fetchall()
                if len(rows) == 1:
                    # We're the locker, so go home.
                    return

            # Maybe we should wait a bit longer.
            if timeout is not None and time.time() > end_time:
                if timeout > 0:
                    # No more waiting.
                    raise LockTimeout("Timeout waiting to acquire"
                                      " lock for %s" %
                                      self.path)
                else:
                    # Someone else has the lock and we are impatient..
                    raise AlreadyLocked("%s is already locked" % self.path)

            # Well, okay.  We'll give it a bit longer.
            time.sleep(wait)

    def release(self):
        """Release the lock by deleting this object's row.

        Raises ``NotLocked`` if no row exists for the lock file, and
        ``NotMyLock`` if a row exists but not under our ``unique_name``.
        """
        if not self.is_locked():
            raise NotLocked("%s is not locked" % self.path)
        if not self.i_am_locking():
            # Report the locked path (as the other messages do), not our
            # own identity, as the thing that is locked.
            raise NotMyLock("%s is locked, but not by me (by %s)" %
                            (self.path, self._who_is_locking()))
        cursor = self.connection.cursor()
        cursor.execute("delete from locks"
                       " where unique_name = ?",
                       (self.unique_name,))
        self.connection.commit()

    def _who_is_locking(self):
        """Return the ``unique_name`` stored for the lock file.

        Returns None when no row exists (for example because the lock was
        released between the caller's check and this query) instead of
        failing on an empty result.
        """
        cursor = self.connection.cursor()
        cursor.execute("select unique_name from locks"
                       " where lock_file = ?",
                       (self.lock_file,))
        row = cursor.fetchone()
        return row[0] if row is not None else None

    def is_locked(self):
        """Return True if any row exists for the lock file."""
        cursor = self.connection.cursor()
        cursor.execute("select * from locks"
                       " where lock_file = ?",
                       (self.lock_file,))
        rows = cursor.fetchall()
        return bool(rows)

    def i_am_locking(self):
        """Return True if a row pairs the lock file with our unique name."""
        cursor = self.connection.cursor()
        cursor.execute("select * from locks"
                       " where lock_file = ?"
                       " and unique_name = ?",
                       (self.lock_file, self.unique_name))
        return bool(cursor.fetchall())

    def break_lock(self):
        """Delete every row for the lock file, whoever inserted it."""
        cursor = self.connection.cursor()
        cursor.execute("delete from locks"
                       " where lock_file = ?",
                       (self.lock_file,))
        self.connection.commit()
|