/usr/share/pyshared/kinterbasdb/_connection_timeout.py is in python-kinterbasdb 3.3.0-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 | # KInterbasDB Python Package - Python Wrapper for Core
#
# Version 3.3
#
# The following contributors hold Copyright (C) over their respective
# portions of code (see license.txt for details):
#
# [Original Author (maintained through version 2.0-0.3.1):]
# 1998-2001 [alex] Alexander Kuznetsov <alexan@users.sourceforge.net>
# [Maintainers (after version 2.0-0.3.1):]
# 2001-2002 [maz] Marek Isalski <kinterbasdb@maz.nu>
# 2002-2006 [dsr] David Rushby <woodsplitter@rocketmail.com>
# [Contributors:]
# 2001 [eac] Evgeny A. Cherkashin <eugeneai@icc.ru>
# 2001-2002 [janez] Janez Jere <janez.jere@void.si>
# This module is private.
import threading
_lock = threading.Lock()
_objects = {}
def startTimeoutThreadIfNecessary(cttMain, cttStopFunc):
_lock.acquire()
try:
if 'ctt' not in _objects:
startedEvent = threading.Event()
# Start the CTT:
ctt = ConnectionTimeoutThread(cttMain, startedEvent)
ctt.start()
_objects['ctt'] = ctt
# Wait for the CTT to indicate that it has finished starting:
startedEvent.wait()
del startedEvent
_objects['stopFunc'] = cttStopFunc
finally:
_lock.release()
def isTimeoutThreadStarted():
_lock.acquire()
try:
return 'ctt' in _objects
finally:
_lock.release()
def stopConnectionTimeoutThread():
_lock.acquire()
try:
_objects['stopFunc']()
assert not _objects['ctt'].isAlive()
_objects.clear()
finally:
_lock.release()
class ConnectionTimeoutThread(threading.Thread):
# This class exists to facilitate:
# - Proper bootstrapping of Python thread state onto the
# ConnectionTimeoutThread.
# - Communication between Python code and the ConnectionTimeoutThread.
#
# All of the "real functionality" of the ConnectionTimeoutThread is written
# in C, and most of it executes with the GIL released.
def __init__(self, cttMain, startedEvent):
threading.Thread.__init__(self, name='kinterbasdb_ConTimeoutThread')
self.setDaemon(True)
self._cttMain = cttMain
self._startedEvent = startedEvent
def run(self):
startedEvent = self._startedEvent
del self._startedEvent
cttMain = self._cttMain
del self._cttMain
cttMain(self, startedEvent)
|