/usr/lib/python3/dist-packages/pyutilib/component/config/tempfiles.py is in python3-pyutilib 5.3.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 | # _________________________________________________________________________
#
# PyUtilib: A Python utility library.
# Copyright (c) 2008 Sandia Corporation.
# This software is distributed under the BSD License.
# Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
# the U.S. Government retains certain rights in this software.
# _________________________________________________________________________
"""A plugin that manages temporary files."""
__all__ = ['ITempfileManager', 'TempfileManagerPlugin', 'TempfileManager']
import os
import time
import tempfile
import logging
import shutil
from pyutilib.component.config.options import *
from pyutilib.component.config.managed_plugin import *
deletion_errors_are_fatal = True
class ITempfileManager(Interface):
"""Interface for managing temporary files."""
def create_tempfile(self, suffix=None, prefix=None, text=False, dir=None):
"""Return the absolute path of a temporary file that is guaranteed to be unique."""
def create_tempdir(self, suffix=None, prefix=None, dir=None):
"""Return the absolute path of a temporary directory that is guaranteed to be unique."""
def add_tempfile(self, filename):
"""Declare this file to be temporary."""
def clear_tempfiles(self, remove=True):
"""Delete all temporary files."""
def sequential_files(self, ctr=0):
"""Start generating sequential files, using the specified counter"""
def unique_files(self):
"""Start generating unique files"""
def push(self):
"""Push tempfiles onto a stack. This creates a new context for tempfiles."""
def pop(self, remove=True):
"""Pop tempfiles onto a stack. Tempfiles that are popped off the stack are deleted."""
class TempfileManagerPlugin(ManagedSingletonPlugin):
"""A plugin that manages temporary files."""
implements(ITempfileManager)
def __init__(self, **kwds):
if getattr(self, '_initialized', False) is True:
return
self._initialized = True
#
kwds['name']='TempfileManager'
ManagedSingletonPlugin.__init__(self,**kwds)
self._tempfiles = [[]]
declare_option("tempdir", default=None)
self._ctr=-1
def create_tempfile(self, suffix=None, prefix=None, text=False, dir=None):
"""
Return the absolute path of a temporary filename that is
guaranteed to be unique. This function generates the file and returns
the filename.
"""
if suffix is None:
suffix=''
if prefix is None:
prefix='tmp'
if dir is None:
dir=self.tempdir
ans = tempfile.mkstemp(suffix=suffix, prefix=prefix, text=text, dir=dir)
ans = list(ans)
if not os.path.isabs(ans[1]): #pragma:nocover
fname = os.path.join(dir,ans[1])
else:
fname = ans[1]
os.close(ans[0])
if self._ctr >= 0:
new_fname = os.path.join(dir,prefix+str(self._ctr)+suffix)
# Delete any file having the sequential name and then
# rename
if os.path.exists(new_fname):
os.remove(new_fname)
shutil.move(fname, new_fname)
fname = new_fname
self._ctr += 1
self._tempfiles[-1].append(fname)
return fname
def create_tempdir(self, suffix=None, prefix=None, dir=None):
"""
Return the absolute path of a temporary directory that is
guaranteed to be unique. This function generates the directory and returns
the directory name.
"""
if suffix is None:
suffix=''
if prefix is None:
prefix='tmp'
if dir is None:
dir=self.tempdir
dirname = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
if self._ctr >= 0:
new_dirname = os.path.join(dir,prefix+str(self._ctr)+suffix)
# Delete any directory having the sequential name and then
# rename
if os.path.exists(new_dirname):
shutil.rmtree(new_dirname)
shutil.move(dirname, new_dirname)
dirname = new_dirname
self._ctr += 1
self._tempfiles[-1].append(dirname)
return dirname
def add_tempfile(self, filename, exists=True):
"""Declare this file to be temporary."""
tmp = os.path.abspath(filename)
if exists and not os.path.exists(tmp):
raise IOError("Temporary file does not exist: "+tmp)
self._tempfiles[-1].append(tmp)
def clear_tempfiles(self, remove=True):
"""Delete all temporary files."""
while len(self._tempfiles) > 1:
self.pop(remove)
self.pop(remove)
def sequential_files(self, ctr=0):
"""Start generating sequential files, using the specified counter"""
self._ctr=ctr
def unique_files(self):
"""Stop generating sequential files, using the specified counter"""
self._ctr=-1
#
# Support "with" statements, where the pop automatically
# takes place on exit.
#
def push(self):
self._tempfiles.append([])
return self
def __exit__(self, type, value, traceback):
self.pop(remove=True)
def pop(self, remove=True):
files = self._tempfiles.pop()
if remove:
for filename in files:
if os.path.exists(filename):
if os.path.isdir(filename):
shutil.rmtree(filename, ignore_errors=not deletion_errors_are_fatal)
else:
try:
os.remove(filename)
except WindowsError:
# Sometimes Windows doesn't release the
# file lock immediately when the process
# terminates. If we get an error, wait a
# second and try again.
try:
time.sleep(1)
os.remove(filename)
except WindowsError:
if deletion_errors_are_fatal:
raise
else:
# Failure to delete a tempfile
# should NOT be fatal
logger = logging.getLogger(
'pyutilib.component.config')
logger.warning("Unable to delete temporary "
"file %s" % ( filename, ) )
if len(self._tempfiles) == 0:
self._tempfiles = [[]]
#
# This global class provides a convenient handle to this
# singleton plugin
#
TempfileManager = TempfileManagerPlugin()
|