/usr/share/pyshared/juju/charm/publisher.py is in juju-0.7 0.7-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 | import logging
from zookeeper import NodeExistsException, NoNodeException
from twisted.internet.defer import (
DeferredList, inlineCallbacks, returnValue, succeed, FirstError)
from juju.lib import under
from juju.state.charm import CharmStateManager
from juju.state.errors import CharmStateNotFound, StateChanged
log = logging.getLogger("juju.charm")
class CharmPublisher(object):
    """Publishes charms to an environment.

    Charms are scheduled with `add_charm` and then, on `publish`, uploaded
    to the file storage and registered through the charm state manager.
    """

    def __init__(self, client, storage):
        """Create a publisher.

        :param client: client object handed to `CharmStateManager`.
        :param storage: file storage used via `put(path, file)` and
            `get_url(path)`.
        """
        self._client = client
        self._storage = storage
        self._charm_state_manager = CharmStateManager(self._client)
        # (charm_id, charm) pairs scheduled by add_charm, in call order.
        self._charm_add_queue = []
        # charm_id -> charm state for charms found to be already published.
        self._charm_state_cache = {}

    @classmethod
    @inlineCallbacks
    def for_environment(cls, environment):
        """Build a publisher from an environment.

        The environment's machine provider supplies both the file storage
        and the connected client. Returns a deferred that fires with the
        new publisher instance.
        """
        provider = environment.get_machine_provider()
        storage = provider.get_file_storage()
        client = yield provider.connect()
        returnValue(cls(client, storage))

    @inlineCallbacks
    def add_charm(self, charm_id, charm):
        """Schedule a charm for addition to an juju environment.

        Returns true if the charm is scheduled for upload, false if
        the charm is already present in juju.

        The charm is always appended to the publish queue, even when it is
        already present: `publish` reports a state for every queued charm
        and serves the already-present ones from the cache.
        """
        self._charm_add_queue.append((charm_id, charm))

        if charm_id in self._charm_state_cache:
            returnValue(False)

        try:
            state = yield self._charm_state_manager.get_charm_state(
                charm_id)
        except CharmStateNotFound:
            # Not published yet; fall through and report it as scheduled.
            pass
        else:
            # Lazy logger arguments: same message, formatted only if emitted.
            log.info("Using cached charm version of %s", charm.metadata.name)
            self._charm_state_cache[charm_id] = state
            returnValue(False)
        returnValue(True)

    def _publish_one(self, charm_id, charm):
        """Upload a single charm and store its state.

        Returns a deferred firing with the charm state. Charms already in
        the state cache are answered immediately without any upload.
        """
        if charm_id in self._charm_state_cache:
            return succeed(self._charm_state_cache[charm_id])

        bundle = charm.as_bundle()
        # Compute the storage path before opening the file so a failure
        # here cannot leak an open file handle.
        charm_store_path = under.quote(
            "%s:%s" % (charm_id, bundle.get_sha256()))
        charm_file = open(bundle.path, "rb")

        def close_charm_file(passthrough):
            # Runs on success and failure alike; hands the result through.
            charm_file.close()
            return passthrough

        def get_charm_url(result):
            return self._storage.get_url(charm_store_path)

        try:
            d = self._storage.put(charm_store_path, charm_file)
        except Exception:
            # put() raised synchronously, so no deferred exists to run
            # close_charm_file; close here and let the error propagate.
            charm_file.close()
            raise

        d.addBoth(close_charm_file)
        d.addCallback(get_charm_url)
        d.addCallback(self._cb_store_charm_state, charm_id, bundle)
        d.addErrback(self._eb_verify_duplicate, charm_id, bundle)
        return d

    def publish(self):
        """Publish all added charms to provider storage and zookeeper.

        Returns the charm_state of all scheduled charms.
        """
        publish_deferreds = [
            self._publish_one(charm_id, charm)
            for charm_id, charm in self._charm_add_queue]

        publish_deferred = DeferredList(publish_deferreds,
                                        fireOnOneErrback=1,
                                        consumeErrors=1)
        # callbacks and deferreds to unwind the dlist
        publish_deferred.addCallback(self._cb_extract_charm_state)
        publish_deferred.addErrback(self._eb_extract_error)
        return publish_deferred

    def _cb_extract_charm_state(self, result):
        """Keep only the second item of each DeferredList result pair."""
        return [state for _, state in result]

    def _eb_extract_error(self, failure):
        """Unwrap the DeferredList's FirstError into the underlying failure."""
        failure.trap(FirstError)
        return failure.value.subFailure

    def _cb_store_charm_state(self, charm_url, charm_id, charm):
        """Record the uploaded charm via the charm state manager."""
        return self._charm_state_manager.add_charm_state(
            charm_id, charm, charm_url)

    @inlineCallbacks
    def _eb_verify_duplicate(self, failure, charm_id, charm):
        """Detects duplicates vs. conflicts, raises stateerror on conflict.

        Only handles NodeExistsException; any other failure is re-raised
        by the trap. For a true duplicate (matching checksum) the existing
        charm state is returned, so `publish` still reports a state for
        this charm.

        Raises StateChanged if the charm state disappeared in the meantime
        or if the stored checksum differs from this charm's checksum.
        """
        failure.trap(NodeExistsException)

        try:
            charm_state = \
                yield self._charm_state_manager.get_charm_state(charm_id)
        except (NoNodeException, CharmStateNotFound):
            # Check if the state goes away due to concurrent removal.
            # add_charm shows get_charm_state signalling a missing charm
            # with CharmStateNotFound, so that is handled here as well.
            msg = "Charm removed concurrently during publish, please retry."
            raise StateChanged(msg)

        if charm_state.get_sha256() != charm.get_sha256():
            msg = "Concurrent upload of charm has different checksum %s" % (
                charm_id)
            raise StateChanged(msg)

        returnValue(charm_state)
|