/usr/share/pyshared/landscape/package/taskhandler.py is in landscape-common 12.04.3-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import os
import re
import logging
from twisted.internet.defer import succeed, Deferred
from landscape.lib.lock import lock_path, LockError
from landscape.lib.log import log_failure
from landscape.lib.lsb_release import LSB_RELEASE_FILENAME, parse_lsb_release
from landscape.reactor import TwistedReactor
from landscape.deployment import Configuration, init_logging
from landscape.package.store import PackageStore, InvalidHashIdDb
from landscape.broker.amp import RemoteBrokerConnector
class PackageTaskError(Exception):
    """Signal that a package task could not be completed successfully."""
class PackageTaskHandlerConfiguration(Configuration):
    """Configuration exposing the filesystem paths used by package tasks.

    Every path is derived from the C{data_path} option.
    """

    @property
    def package_directory(self):
        """The C{package} directory located under C{data_path}."""
        base_path = self.data_path
        return os.path.join(base_path, "package")

    @property
    def store_filename(self):
        """The path of the SQLite file used by the L{PackageStore}."""
        directory = self.package_directory
        return os.path.join(directory, "database")

    @property
    def hash_id_directory(self):
        """The directory where the stock hash-id stores are kept."""
        directory = self.package_directory
        return os.path.join(directory, "hash-id")

    @property
    def update_stamp_filename(self):
        """The path of the update-stamp file."""
        directory = self.package_directory
        return os.path.join(directory, "update-stamp")
class LazyRemoteBroker(object):
    """Wrapper class around L{RemoteBroker} providing lazy initialization.

    This class is a wrapper around a regular L{RemoteBroker}. It connects to
    the remote broker object only when one of its attributes is first called.

    @param connector: The L{RemoteBrokerConnector} which will be used
        to connect to the broker.

    @note: This behaviour is needed in particular by the ReleaseUpgrader and
        the PackageChanger, because if they connect early and the
        landscape-client package gets upgraded while they run, they will lose
        the connection and will not be able to reconnect for a potentially
        long window of time (till the new landscape-client package version is
        fully configured and the service is started again).
    """

    def __init__(self, connector):
        self._connector = connector
        self._remote = None

    def __getattr__(self, method):
        """Return the remote attribute C{method}, connecting lazily.

        Once a connection has been established the attribute is looked up
        directly on the remote object. Before that, a wrapper is returned:
        calling it connects through the connector, stores the remote object
        and then invokes C{method} on it with the given arguments.

        @param method: The name of the attribute to look up.
        @return: The remote attribute, or a callable returning the result
            of C{connect()} with the actual call chained as a callback.
        @raise AttributeError: If C{method} names one of this wrapper's own
            attributes and it is not set on the instance.
        """
        # __getattr__ only runs when the normal lookup failed, so reaching
        # this point for one of our own attributes means the instance was
        # never initialized. Fail cleanly instead of recursing forever
        # through the self._remote access below.
        if method in ("_connector", "_remote"):
            raise AttributeError(method)

        # Compare against None explicitly: a connected remote object must be
        # used even if it happens to evaluate as false.
        if self._remote is not None:
            return getattr(self._remote, method)

        def wrapper(*args, **kwargs):

            def got_connection(remote):
                self._remote = remote
                return getattr(self._remote, method)(*args, **kwargs)

            result = self._connector.connect()
            return result.addCallback(got_connection)

        return wrapper
class PackageTaskHandler(object):
    """Base class for handlers processing the tasks of a package queue.

    Tasks are fetched from the package store queue named C{queue_name} and
    handed over one at a time to L{handle_task}.

    @cvar config_factory: The configuration class instantiated by
        L{run_task_handler}.
    @cvar queue_name: The name of the task queue to consume.
    @cvar lsb_release_filename: The file parsed to find out the
        distribution code-name.
    @cvar package_store_class: The class instantiated by
        L{run_task_handler} to build the package store.
    """

    config_factory = PackageTaskHandlerConfiguration

    queue_name = "default"
    lsb_release_filename = LSB_RELEASE_FILENAME
    package_store_class = PackageStore

    def __init__(self, package_store, package_facade, remote_broker, config):
        self._store = package_store
        self._facade = package_facade
        self._broker = remote_broker
        self._config = config
        self._count = 0

    def run(self):
        """Start handling the queued tasks, see L{handle_tasks}."""
        return self.handle_tasks()

    def handle_tasks(self):
        """Handle the tasks in the queue.

        The tasks will be handed over one by one to L{handle_task} until the
        queue is empty or a task fails.

        @see: L{handle_task}
        """
        return self._handle_next_task(None)

    def _handle_next_task(self, result, last_task=None):
        """Pick the next task from the queue and pass it to C{handle_task}."""

        if last_task is not None:
            # Last task succeeded.  We can safely kill it now.
            last_task.remove()
            self._count += 1

        task = self._store.get_next_task(self.queue_name)

        if task:
            # We have another task.  Let's handle it.
            result = self.handle_task(task)
            result.addCallback(self._handle_next_task, last_task=task)
            result.addErrback(self._handle_task_failure)
            return result

        else:
            # No more tasks!  We're done!
            return succeed(None)

    def _handle_task_failure(self, failure):
        """Gracefully handle a L{PackageTaskError} and stop handling tasks."""
        failure.trap(PackageTaskError)

    def handle_task(self, task):
        """Handle a single task.

        Sub-classes must override this method in order to trigger task-specific
        actions.

        This method must return a L{Deferred} firing the task result. If the
        deferred is successful the task will be removed from the queue and the
        next one will be picked. If the task can't be completed, this method
        must raise a L{PackageTaskError}, in this case the handler will stop
        processing tasks and the failed task won't be removed from the queue.
        """
        return succeed(None)

    @property
    def handled_tasks_count(self):
        """
        Return the number of tasks that have been successfully handled so far.
        """
        return self._count

    def use_hash_id_db(self):
        """
        Attach the appropriate pre-canned hash=>id database to our store.

        @return: a deferred firing once the database has been attached or
            the attempt has been given up.
        """

        def use_it(hash_id_db_filename):

            if hash_id_db_filename is None:
                # Couldn't determine which hash=>id database to use,
                # just ignore the failure and go on
                return

            if not os.path.exists(hash_id_db_filename):
                # The appropriate database isn't there, but nevermind
                # and just go on
                return

            try:
                self._store.add_hash_id_db(hash_id_db_filename)
            except InvalidHashIdDb:
                # The appropriate database is there but broken,
                # let's remove it and go on
                logging.warning("Invalid hash=>id database %s" %
                                hash_id_db_filename)
                os.remove(hash_id_db_filename)
                return

        result = self._determine_hash_id_db_filename()
        result.addCallback(use_it)
        return result

    def _determine_hash_id_db_filename(self):
        """Build up the filename of the hash=>id database to use.

        @return: a deferred resulting in the filename to use or C{None}
            in case of errors.
        """

        def got_server_uuid(server_uuid):

            warning = "Couldn't determine which hash=>id database to use: %s"

            if server_uuid is None:
                logging.warning(warning % "server UUID not available")
                return None

            try:
                lsb_release_info = parse_lsb_release(self.lsb_release_filename)
            except IOError as error:
                logging.warning(warning % str(error))
                return None
            try:
                codename = lsb_release_info["code-name"]
            except KeyError:
                # Build the reason first, then embed it in the warning, so
                # the result doesn't depend on how chained "%" is evaluated.
                reason = ("missing code-name key in %s" %
                          self.lsb_release_filename)
                logging.warning(warning % reason)
                return None

            arch = self._facade.get_arch()
            if not arch:
                # The Apt code should always return a non-empty string,
                # so this branch shouldn't get executed at all. However
                # this check is kept as an extra paranoia sanity check.
                logging.warning(warning % "unknown dpkg architecture")
                return None

            return os.path.join(self._config.hash_id_directory,
                                "%s_%s_%s" % (server_uuid, codename, arch))

        result = self._broker.get_server_uuid()
        result.addCallback(got_server_uuid)
        return result
def run_task_handler(cls, args, reactor=None):
    """Build a C{cls} task handler and run it inside the reactor.

    The configuration is loaded from C{args}, the package directories are
    created if missing, a lock file named after C{cls.queue_name} is taken,
    logging is initialized and finally the handler's C{run} method is invoked
    once the reactor is running. The reactor is stopped when the handler is
    done or has failed.

    @param cls: The task handler class to instantiate; it must provide the
        C{config_factory}, C{queue_name} and C{package_store_class}
        attributes.
    @param args: The command line arguments to load the configuration from.
    @param reactor: Optionally, the reactor to use instead of a new
        L{TwistedReactor}.
    @return: The L{Deferred} driving the handler run.
    @raise SystemExit: If the lock file can't be acquired.
    """
    # please only pass reactor when you have totally mangled everything with
    # mocker. Otherwise bad things will happen.
    if reactor is None:
        reactor = TwistedReactor()

    config = cls.config_factory()
    config.load(args)

    for directory in [config.package_directory, config.hash_id_directory]:
        if not os.path.isdir(directory):
            os.mkdir(directory)

    program_name = cls.queue_name
    lock_filename = os.path.join(config.package_directory,
                                 program_name + ".lock")
    try:
        lock_path(lock_filename)
    except LockError:
        if config.quiet:
            raise SystemExit()
        raise SystemExit("error: package %s is already running"
                         % program_name)

    # The log name is the class name split on capitalized words, lower-cased
    # and joined with dashes.
    words = re.findall(r"[A-Z][a-z]+", cls.__name__)
    init_logging(config, "-".join(word.lower() for word in words))

    # Setup our umask for Smart to use, this needs to setup file permissions to
    # 0644 so...
    os.umask(0o022)

    package_store = cls.package_store_class(config.store_filename)
    # Delay importing of the facades so that we don't
    # import Smart unless we need to.
    from landscape.package.facade import (
        AptFacade, SmartFacade, has_new_enough_apt)

    if has_new_enough_apt:
        package_facade = AptFacade()
    else:
        package_facade = SmartFacade()

    def finish():
        connector.disconnect()
        # For some obscure reason our TwistedReactor.stop method calls
        # reactor.crash() instead of reactor.stop(), which doesn't work
        # here. Maybe TwistedReactor.stop should simply use reactor.stop().
        reactor.call_later(0, reactor._reactor.stop)

    def got_error(failure):
        log_failure(failure)
        finish()

    connector = RemoteBrokerConnector(reactor, config, retry_on_reconnect=True)
    remote = LazyRemoteBroker(connector)
    handler = cls(package_store, package_facade, remote, config)
    result = Deferred()
    result.addCallback(lambda x: handler.run())
    result.addCallback(lambda x: finish())
    result.addErrback(got_error)
    reactor.call_when_running(lambda: result.callback(None))
    reactor.run()

    return result
|