/usr/share/pyshared/asrun/common/lockfile.py is in code-aster-run 1.13.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""
Facility to manipulate lock files
"""
import os
import os.path as osp
import time
import logging
# directly use hashlib.sha1 in python >= 2.5
from asrun.common.utils import sha1
# Root logger configuration done when the module is imported:
# time stamps with milliseconds, ERROR level.
logging.basicConfig(
    level=logging.ERROR,
    datefmt='%H:%M:%S',
    format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
)

# Logger used by everything defined in this module.
log = logging.getLogger('lockfile.log')
class LockError(Exception):
    """Error raised when locking file.

    The description of the failure is available as the `msg` attribute
    and through `str(err)`.
    """

    def __init__(self, msg):
        """Store the message `msg` describing the failure."""
        # Forward the message to Exception so that str(err) and err.args
        # carry it too, not only the `msg` attribute.
        Exception.__init__(self, msg)
        self.msg = msg
class LockedFile(object):
    """Class to use a lock file transparently to write into a file.

    Each call to `write` creates the lock file (exclusive creation), opens
    the target file, writes into it, closes it and removes the lock file.
    """

    def __init__(self, filename, mode='a+b',
                 max_attempt=50, interval=0.2,
                 lockdir=None,
                 info=0):
        """Initialization.

        filename    : path of the file to write into
        mode        : mode used to open the file at each write
                      (an append mode is expected)
        max_attempt : maximum number of attempts to acquire the lock
        interval    : delay (in seconds) between two attempts
        lockdir     : directory where the lock file is created,
                      by default the directory of `filename`
        info        : information level, 0 (silent), 1 (info), 2 (debug)
        """
        self._fname = filename
        self._mode = mode
        # The lock file name is derived from the full path (hash) and from
        # the base name of the target file.
        digest = sha1(self._as_bytes(self._fname)).hexdigest()
        rootname = digest + '_' + osp.basename(self._fname) + '.lock'
        if lockdir is None:
            lockdir = osp.dirname(self._fname)
        self._lockfile = osp.join(lockdir, rootname)
        self._locked = False
        self._max_attempt = max_attempt
        self._interval = interval
        # NOTE: a lock file that already exists is not removed here.
        if info >= 2:
            log.setLevel(logging.DEBUG)
        elif info >= 1:
            log.setLevel(logging.INFO)
        log.debug(u"locked file '%s' created, mode '%s'", self._fname, self._mode)
        log.debug(u"lock file is '%s'" % self._lockfile)

    @staticmethod
    def _as_bytes(text):
        """Return `text` as a byte string (UTF-8 encoded if it is not one)."""
        if isinstance(text, bytes):
            return text
        return text.encode('utf-8')

    def write(self, *args):
        """Equivalent of file.write

        The write is cancelled (and only logged at info level) if the lock
        can not be acquired. An error raised while opening or writing the
        file is propagated, but the lock is released first.
        """
        try:
            self._acquire()
        except LockError as err:
            log.info(u"%s: write cancelled", err.msg)
            return
        try:
            with open(self._fname, self._mode) as fobj:
                fobj.write(*args)
        finally:
            # never leave the lock file behind, it would block all writers
            self._release()

    def close(self):
        """In case it has not been done."""
        self._release()

    def _acquire(self):
        """Acquire a lock.

        Raises LockError if the lock file can not be created after
        `max_attempt` attempts.
        """
        if self._locked:
            log.info(u"'%s' is already locked", self._fname)
            return
        content = self._as_bytes("lock file of %s" % self._fname)
        # O_EXCL makes the creation fail if the lock file already exists
        flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
        for i in range(self._max_attempt):
            if i > 0:
                time.sleep(self._interval)
            try:
                fd = os.open(self._lockfile, flags, 0o644)
                try:
                    os.write(fd, content)
                finally:
                    # do not leak the descriptor if writing fails
                    os.close(fd)
            except OSError:
                log.info(u"'%s' already locked, wait for %s s", self._fname, self._interval)
            else:
                log.debug(u"'%s' locked", self._fname)
                self._locked = True
                return
        raise LockError(u"unable to lock '%s'" % self._fname)

    def _release(self):
        """Release current lock.

        A failure to remove the lock file is only logged.
        """
        if not self._locked:
            log.info(u"'%s' is not locked !", self._fname)
            return
        try:
            os.remove(self._lockfile)
        except OSError as err:
            log.info(u"can not remove '%s', reason: %s", self._lockfile, err)
        self._locked = False
        log.debug(u"'%s' released", self._fname)
if __name__ == "__main__":
    # Manual test: write 100 time-stamped lines through a LockedFile into
    # the file given as first argument (default: /tmp/testlock).
    # NOTE(review): presumably meant to be started from several processes
    # at once on the same file - confirm.
    import sys

    fname = "/tmp/testlock"
    if len(sys.argv) > 1:
        fname = sys.argv[1]

    def message():
        """Return a line with the current time (with ms) and the process id."""
        ct = time.time()
        msecs = (ct - int(ct)) * 1000
        # format the same instant `ct` the milliseconds are taken from
        stamp = time.strftime('%H:%M:%S', time.localtime(ct))
        txt = "%s.%03d written by process %s\n" \
            % (stamp, msecs, os.getpid())
        return txt

    lof = LockedFile(fname, 'a', info=2)
    for i in range(100):
        lof.write(message())
|