/usr/share/pymsnt/src/xdb.py is in pymsnt 0.11.3+hg224-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2004-2006 James Bunton <james@delx.cjb.net>
# Licensed for distribution under the GPL version 2, check COPYING for details
from twisted.words.xish.domish import parseFile, Element
from debug import LogEvent, INFO, WARN
import os
import os.path
import shutil
import config
try:
from hashlib import md5
except ImportError:
from md5 import md5
# Path separator, used where spool paths are built by string concatenation.
X = os.path.sep
# umask installed while spool directories and files are created; 0077 clears
# every group/other permission bit, so new entries are owner-only.
SPOOL_UMASK = 0077
def unmangle(file):
    """ Reverses mangle() for a single name: the last '%' becomes '@'.

    Only the final '%' is converted; any earlier '%' characters are kept.
    A name that contains no '%' is returned unchanged, so names that never
    held an '@' survive a mangle()/unmangle() round trip.
    """
    if "%" not in file:
        # Nothing to restore; don't invent a leading '@'.
        return file
    chunks = file.split("%")
    end = chunks.pop()
    return "%s@%s" % ("%".join(chunks), end)
def mangle(file):
    """ Returns 'file' with every '@' swapped for '%' """
    pieces = file.split("@")
    return "%".join(pieces)
def makeHash(file):
    """ Returns the first three hex digits of the MD5 digest of 'file'.

    Text (unicode) names are encoded as UTF-8 before hashing so that
    non-ASCII names hash instead of failing; byte strings are hashed as
    they are. For ASCII names the result is the same either way.
    """
    # type(u"") is 'unicode' on Python 2 and 'str' on Python 3.
    if isinstance(file, type(u"")):
        file = file.encode("utf-8")
    return md5(file).hexdigest()[:3]
class XDB:
    """
    Class for storage of data.
    Create one instance of the class for each XDB 'folder' you want.
    Call request()/set() with the xdbns argument you wish to retrieve

    Each XDB 'file' is stored as <spooldir>/<name>/<hash>/<file>.xml, where
    <hash> is makeHash() of the (optionally mangled) file name.
    """
    def __init__(self, name, mangle=False):
        """ Creates an XDB object. If mangle is True then any '@' signs in filenames will be changed to '%' """
        self.name = os.path.join(os.path.abspath(config.spooldir), name)
        if not os.path.exists(self.name):
            os.makedirs(self.name)
        self.mangle = mangle

    def __locate(self, file):
        """ Returns (directory prefix, on-disk base name) for the XDB 'file' """
        if self.mangle:
            file = mangle(file)
        return self.name + X + makeHash(file) + X, file

    def __getFile(self, file):
        """ Parses the XML document stored for 'file' and returns it """
        pre, file = self.__locate(file)
        return parseFile(pre + file + ".xml")

    def __writeFile(self, file, text):
        """ Writes 'text' as the new contents of the XDB 'file'.

        The text goes to a ".xml.new" file first and is then moved over the
        real ".xml" file. IOErrors are logged and re-raised.
        """
        pre, file = self.__locate(file)
        target = pre + file + ".xml"
        prev_umask = os.umask(SPOOL_UMASK)
        try:
            if not os.path.exists(pre):
                os.makedirs(pre)
            try:
                f = open(target + ".new", "w")
                try:
                    f.write(text)
                finally:
                    # Close the handle even when the write fails
                    f.close()
                shutil.move(target + ".new", target)
            except IOError as e:
                LogEvent(WARN, "", "IOError " + str(e))
                raise
        finally:
            # Always restore the process umask, including on errors
            os.umask(prev_umask)

    def files(self):
        """ Returns a list containing the files in the current XDB database """
        files = []
        for entry in os.listdir(self.name):
            subdir = os.path.join(self.name, entry)
            if os.path.isdir(subdir):
                files.extend(os.listdir(subdir))
        # Only finished ".xml" entries are XDB files; this skips leftovers
        # such as the ".xml.new" files used while writing.
        files = [x for x in files if x.endswith(".xml")]
        if self.mangle:
            files = [unmangle(x) for x in files]
        # Strip the ".xml" extension and drop names that end up empty
        files = [x[:-4] for x in files]
        return [x for x in files if x]

    def request(self, file, xdbns):
        """ Requests a specific xdb namespace from the XDB 'file'.

        Returns the matching child element, or None when the file cannot be
        read or parsed or holds no element for 'xdbns'.
        """
        try:
            document = self.__getFile(file)
            for child in document.elements():
                if child.getAttribute("xdbns") == xdbns:
                    return child
        except Exception:
            # Best effort: any failure to load the file means "not found"
            return None
        return None

    def set(self, file, xdbns, element):
        """ Sets a specific xdb namespace in the XDB 'file' to element.

        'element' gets its "xdbns" attribute set to 'xdbns'. IOErrors are
        logged and re-raised.
        """
        try:
            element.attributes["xdbns"] = xdbns
            document = None
            try:
                document = self.__getFile(file)
            except IOError:
                pass
            if not document:
                document = Element((None, "xdb"))

            # Remove the existing node (if any). Iterate over a snapshot,
            # since removing from document.children while walking it can
            # skip elements.
            for child in list(document.elements()):
                if child.getAttribute("xdbns") == xdbns:
                    document.children.remove(child)
            # Add the new one
            document.addChild(element)

            self.__writeFile(file, document.toXml())
        except IOError as e:
            LogEvent(WARN, "", "IOError " + str(e))
            raise

    def remove(self, file):
        """ Removes an XDB file. Failures are logged and re-raised. """
        pre, file = self.__locate(file)
        try:
            os.remove(pre + file + ".xml")
        except (IOError, OSError) as e:
            # os.remove reports failures as OSError
            LogEvent(WARN, "", "IOError " + str(e))
            raise
|