/usr/share/pyshared/axiom/upgrade.py is in python-axiom 0.6.0-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 | # -*- test-case-name: axiom.test.test_upgrading -*-
"""
Axiom Item/schema upgrade support.
"""
from twisted.python.failure import Failure
from twisted.python.log import msg
from twisted.python.reflect import qual
from axiom.errors import NoUpgradePathAvailable, UpgraderRecursion
from axiom.errors import ItemUpgradeError
from axiom.item import _legacyTypes, _typeNameToMostRecentClass
_upgradeRegistry = {}
class _StoreUpgrade(object):
"""
Manage Item upgrades and upgrade batching for a store.
@type _currentlyUpgrading: C{dict}
@ivar _currentlyUpgrading: A map of storeIDs to Items currently in the
middle of an upgrader. Used to make sure that the same item isn't
upgraded reentrantly.
@type _oldTypesRemaining: C{list}
@ivar _oldTypesRemaining: All the old types which have not been fully
upgraded in this database.
"""
def __init__(self, store):
self.store = store
self._currentlyUpgrading = {}
self._oldTypesRemaining = []
def upgradesPending(self):
return bool(self._oldTypesRemaining)
upgradesPending = property(
upgradesPending,
doc="""
Flag indicating whether there any types that still need to be upgraded
or not.
""")
def checkUpgradePaths(self):
"""
Check that all of the accumulated old Item types have a way to get
from their current version to the latest version.
@raise axiom.errors.NoUpgradePathAvailable: for any, and all, Items
that do not have a valid upgrade path
"""
cantUpgradeErrors = []
for oldVersion in self._oldTypesRemaining:
# We have to be able to get from oldVersion.schemaVersion to
# the most recent type.
currentType = _typeNameToMostRecentClass.get(
oldVersion.typeName, None)
if currentType is None:
# There isn't a current version of this type; it's entirely
# legacy, will be upgraded by deleting and replacing with
# something else.
continue
typeInQuestion = oldVersion.typeName
upgver = oldVersion.schemaVersion
while upgver < currentType.schemaVersion:
# Do we have enough of the schema present to upgrade?
if ((typeInQuestion, upgver)
not in _upgradeRegistry):
cantUpgradeErrors.append(
"No upgrader present for %s (%s) from %d to %d" % (
typeInQuestion, qual(currentType), upgver,
upgver + 1))
# Is there a type available for each upgrader version?
if upgver+1 != currentType.schemaVersion:
if (typeInQuestion, upgver+1) not in _legacyTypes:
cantUpgradeErrors.append(
"Type schema required for upgrade missing:"
" %s version %d" % (
typeInQuestion, upgver+1))
upgver += 1
if cantUpgradeErrors:
raise NoUpgradePathAvailable('\n '.join(cantUpgradeErrors))
def queueTypeUpgrade(self, oldtype):
"""
Queue a type upgrade for C{oldtype}.
"""
if oldtype not in self._oldTypesRemaining:
self._oldTypesRemaining.append(oldtype)
def upgradeItem(self, thisItem):
"""
Upgrade a legacy item.
@raise axiom.errors.UpgraderRecursion: If the given item is already in
the process of being upgraded.
"""
sid = thisItem.storeID
if sid in self._currentlyUpgrading:
raise UpgraderRecursion()
self._currentlyUpgrading[sid] = thisItem
try:
return upgradeAllTheWay(thisItem)
finally:
self._currentlyUpgrading.pop(sid)
def upgradeEverything(self):
"""
Upgrade every item in the store, one at a time.
@raise axiom.errors.ItemUpgradeError: if an item upgrade failed
@return: A generator that yields for each item upgrade.
"""
return self.upgradeBatch(1)
def upgradeBatch(self, n):
"""
Upgrade the entire store in batches, yielding after each batch.
@param n: Number of upgrades to perform per transaction
@type n: C{int}
@raise axiom.errors.ItemUpgradeError: if an item upgrade failed
@return: A generator that yields after each batch upgrade. This needs
to be consumed for upgrading to actually take place.
"""
store = self.store
def _doBatch(itemType):
upgradedAnything = False
for theItem in store.query(itemType, limit=n):
upgradedAnything = True
try:
self.upgradeItem(theItem)
except:
f = Failure()
raise ItemUpgradeError(
f, theItem.storeID, itemType,
_typeNameToMostRecentClass[itemType.typeName])
return upgradedAnything
if self.upgradesPending:
didAny = False
while self._oldTypesRemaining:
t0 = self._oldTypesRemaining[0]
upgradedAnything = store.transact(_doBatch, t0)
if not upgradedAnything:
self._oldTypesRemaining.pop(0)
if didAny:
msg("%s finished upgrading %s" % (store.dbdir.path, qual(t0)))
continue
elif not didAny:
didAny = True
msg("%s beginning upgrade..." % (store.dbdir.path,))
yield None
if didAny:
msg("%s completely upgraded." % (store.dbdir.path,))
def registerUpgrader(upgrader, typeName, oldVersion, newVersion):
"""
Register a callable which can perform a schema upgrade between two
particular versions.
@param upgrader: A one-argument callable which will upgrade an object. It
is invoked with an instance of the old version of the object.
@param typeName: The database typename for which this is an upgrader.
@param oldVersion: The version from which this will upgrade.
@param newVersion: The version to which this will upgrade. This must be
exactly one greater than C{oldVersion}.
"""
# assert (typeName, oldVersion, newVersion) not in _upgradeRegistry, "duplicate upgrader"
# ^ this makes the tests blow up so it's just disabled for now; perhaps we
# should have a specific test mode
# assert newVersion - oldVersion == 1, "read the doc string"
assert isinstance(typeName, str), "read the doc string"
_upgradeRegistry[typeName, oldVersion] = upgrader
def registerAttributeCopyingUpgrader(itemType, fromVersion, toVersion, postCopy=None):
"""
Register an upgrader for C{itemType}, from C{fromVersion} to C{toVersion},
which will copy all attributes from the legacy item to the new item. If
postCopy is provided, it will be called with the new item after upgrading.
@param itemType: L{axiom.item.Item} subclass
@param postCopy: a callable of one argument
@return: None
"""
def upgrader(old):
newitem = old.upgradeVersion(itemType.typeName, fromVersion, toVersion,
**dict((str(name), getattr(old, name))
for (name, _) in old.getSchema()))
if postCopy is not None:
postCopy(newitem)
return newitem
registerUpgrader(upgrader, itemType.typeName, fromVersion, toVersion)
def registerDeletionUpgrader(itemType, fromVersion, toVersion):
"""
Register an upgrader for C{itemType}, from C{fromVersion} to C{toVersion},
which will delete the item from the database.
@param itemType: L{axiom.item.Item} subclass
@return: None
"""
# XXX This should actually do something more special so that a new table is
# not created and such.
def upgrader(old):
old.deleteFromStore()
return None
registerUpgrader(upgrader, itemType.typeName, fromVersion, toVersion)
def upgradeAllTheWay(o):
assert o.__legacy__
while True:
try:
upgrader = _upgradeRegistry[o.typeName, o.schemaVersion]
except KeyError:
break
else:
o = upgrader(o)
if o is None:
# Object was explicitly destroyed during upgrading.
break
return o
__all__ = [
'registerUpgrader', 'registerAttributeCopyingUpgrader',
'registerDeletionUpgrader']
|