/usr/share/pyshared/juju/state/firewall.py is in juju-0.7 0.7-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 | import logging
from twisted.internet.defer import inlineCallbacks
from juju.errors import MachinesNotFound
from juju.state.errors import (
ServiceStateNotFound, ServiceUnitStateNotFound, StateChanged,
StopWatcher)
from juju.state.machine import MachineStateManager
from juju.state.service import ServiceStateManager
log = logging.getLogger("juju.state.expose")
NotExposed = object()
class FirewallManager(object):
"""Manages the opening and closing of ports in the firewall.
"""
def __init__(self, client, is_running, provider):
"""Initialize a Firewall Manager.
:param client: A connected zookeeper client.
:param is_running: A function (usually a bound method) that
returns whether the associated agent is still running or
not.
:param provider: A machine provider, used for making the
actual changes in the environment to firewall settings.
"""
self.machine_state_manager = MachineStateManager(client)
self.service_state_manager = ServiceStateManager(client)
self.is_running = is_running
self.provider = provider
# Track all currently watched machines, using machine ID.
self._watched_machines = set()
# Map service name to either NotExposed or set of exposed unit names.
# If a service name is present in the dictionary, it means its
# respective expose node is being watched.
self._watched_services = {}
# Machines to retry open_close_ports because of earlier errors
self._retry_machines_on_port_error = set()
# Registration of observers for corresponding actions
self._open_close_ports_observers = set()
self._open_close_ports_on_machine_observers = set()
@inlineCallbacks
def process_machine(self, machine_state):
"""Ensures watch is setup per machine and performs any necessary retry.
:param machine_state: The machine state of the machine to be checked.
The watch that is established, via
:method:`juju.state.machine.MachineState.watch_assigned_units`,
handles the scenario where a service or service unit is
removed from the topology. Because the service unit is no
longer in the topology, the corresponding watch terminates and
is unable to `open_close_ports` in response to the
change. However, the specific machine watch will be called in
this case, and that suffices to determine that its port policy
should be checked.
In addition, this method can rely on the fact that the
provisioning agent periodically rechecks machines so as to
support retries of security group operations that failed for
that provider. This method is called by the corresponding
:method:`juju.agents.provision.ProvisioningAgent.process_machine`
in the provisioning agent.
"""
if machine_state.id in self._retry_machines_on_port_error:
self._retry_machines_on_port_error.remove(machine_state.id)
try:
yield self.open_close_ports_on_machine(machine_state.id)
except StopWatcher:
# open_close_ports_on_machine can also be called from
# a watch, so simply ignore this since it's just used
# to shutdown a watch in the case of agent shutdown
pass
def cb_watch_assigned_units(old_units, new_units):
"""Watch assigned units for changes possibly require port mgmt.
"""
log.debug("Assigned units for machine %r: old=%r, new=%r",
machine_state.id, old_units, new_units)
return self.open_close_ports_on_machine(machine_state.id)
if machine_state.id not in self._watched_machines:
self._watched_machines.add(machine_state.id)
yield machine_state.watch_assigned_units(cb_watch_assigned_units)
@inlineCallbacks
def watch_service_changes(self, old_services, new_services):
"""Manage watching service exposed status.
This method is called upon every change to the set of services
currently deployed. All services are then watched for changes
to their exposed flag setting.
:param old_services: the set of services before this change.
:param new_services: the current set of services.
"""
removed_services = old_services - new_services
for service_name in removed_services:
self._watched_services.pop(service_name, None)
for service_name in new_services:
yield self._setup_new_service_watch(service_name)
@inlineCallbacks
def _setup_new_service_watch(self, service_name):
"""Sets up the watching of the exposed flag for a new service.
If `service_name` is not watched (as known by
`self._watched_services`), adds the watch and a corresponding
entry in self._watched_services.
(This dict is necessary because there is currently no way to
introspect a service for whether it is watched or not.)
"""
if service_name in self._watched_services:
return # already watched
self._watched_services[service_name] = NotExposed
try:
service_state = yield self.service_state_manager.get_service_state(
service_name)
except ServiceStateNotFound:
log.debug("Cannot setup watch, since service %r no longer exists",
service_name)
self._watched_services.pop(service_name, None)
return
@inlineCallbacks
def cb_watch_service_exposed_flag(exposed):
if not self.is_running():
raise StopWatcher()
if exposed:
log.debug("Service %r is exposed", service_name)
else:
log.debug("Service %r is unexposed", service_name)
try:
unit_states = yield service_state.get_all_unit_states()
except StateChanged:
log.debug("Stopping watch on %r, no longer in topology",
service_name)
raise StopWatcher()
for unit_state in unit_states:
yield self.open_close_ports(unit_state)
if not exposed:
log.debug("Service %r is unexposed", service_name)
self._watched_services[service_name] = NotExposed
else:
log.debug("Service %r is exposed", service_name)
self._watched_services[service_name] = set()
yield self._setup_service_unit_watch(service_state)
yield service_state.watch_exposed_flag(cb_watch_service_exposed_flag)
log.debug("Started watch of %r on changes to being exposed",
service_name)
@inlineCallbacks
def _setup_service_unit_watch(self, service_state):
"""Setup watches on service units of newly exposed `service_name`."""
@inlineCallbacks
def cb_check_service_units(old_service_units, new_service_units):
watched_units = self._watched_services.get(
service_state.service_name, NotExposed)
if not self.is_running() or watched_units is NotExposed:
raise StopWatcher()
removed_service_units = old_service_units - new_service_units
for unit_name in removed_service_units:
watched_units.discard(unit_name)
if not self.is_running():
raise StopWatcher()
try:
unit_state = yield service_state.get_unit_state(unit_name)
except (ServiceUnitStateNotFound, StateChanged):
log.debug("Not setting up watch on %r, not in topology",
unit_name)
continue
yield self.open_close_ports(unit_state)
for unit_name in new_service_units:
if unit_name not in watched_units:
watched_units.add(unit_name)
yield self._setup_watch_ports(service_state, unit_name)
yield service_state.watch_service_unit_states(cb_check_service_units)
log.debug("Started watch of service units for exposed service %r",
service_state.service_name)
@inlineCallbacks
def _setup_watch_ports(self, service_state, unit_name):
"""Setup the watching of ports for `unit_name`."""
try:
unit_state = yield service_state.get_unit_state(unit_name)
except (ServiceUnitStateNotFound, StateChanged):
log.debug("Cannot setup watch on %r (no longer exists), ignoring",
unit_name)
return
@inlineCallbacks
def cb_watch_ports(value):
"""Permanently watch ports until service is no longer exposed."""
watched_units = self._watched_services.get(
service_state.service_name, NotExposed)
if (not self.is_running() or watched_units is NotExposed or
unit_name not in watched_units):
log.debug("Stopping ports watch for %r", unit_name)
raise StopWatcher()
yield self.open_close_ports(unit_state)
yield unit_state.watch_ports(cb_watch_ports)
log.debug("Started watch of %r on changes to open ports", unit_name)
def add_open_close_ports_observer(self, observer):
"""Set `observer` for calls to `open_close_ports`.
:param observer: The callback is called with the corresponding
:class:`juju.state.service.UnitState`.
"""
self._open_close_ports_observers.add(observer)
@inlineCallbacks
def open_close_ports(self, unit_state):
"""Called upon changes that *may* open/close ports for a service unit.
"""
if not self.is_running():
raise StopWatcher()
try:
try:
machine_id = yield unit_state.get_assigned_machine_id()
except StateChanged:
log.debug("Stopping watch, machine %r no longer in topology",
unit_state.unit_name)
raise StopWatcher()
if machine_id is not None:
yield self.open_close_ports_on_machine(machine_id)
finally:
# Ensure that the observations runs after the
# corresponding action completes. In particular, tests
# that use observation depend on this ordering to ensure
# that the action has in fact happened before they can
# proceed.
observers = list(self._open_close_ports_observers)
for observer in observers:
yield observer(unit_state)
def add_open_close_ports_on_machine_observer(self, observer):
"""Add `observer` for calls to `open_close_ports`.
:param observer: A callback receives the machine id for each call.
"""
self._open_close_ports_on_machine_observers.add(observer)
@inlineCallbacks
def open_close_ports_on_machine(self, machine_id):
"""Called upon changes that *may* open/close ports for a machine.
:param machine_id: The machine ID of the machine that needs to
be checked.
This machine supports multiple service units being assigned to a
machine; all service units are checked each time this is
called to determine the active set of ports to be opened.
"""
if not self.is_running():
raise StopWatcher()
try:
machine_state = yield self.machine_state_manager.get_machine_state(
machine_id)
instance_id = yield machine_state.get_instance_id()
machine = yield self.provider.get_machine(instance_id)
unit_states = yield machine_state.get_all_service_unit_states()
policy_ports = set()
for unit_state in unit_states:
service_state = yield self.service_state_manager.\
get_service_state(unit_state.service_name)
exposed = yield service_state.get_exposed_flag()
if exposed:
ports = yield unit_state.get_open_ports()
for port in ports:
policy_ports.add(
(port["port"], port["proto"]))
current_ports = yield self.provider.get_opened_ports(
machine, machine_id)
to_open = policy_ports - current_ports
to_close = current_ports - policy_ports
for port, proto in to_open:
yield self.provider.open_port(machine, machine_id, port, proto)
for port, proto in to_close:
yield self.provider.close_port(
machine, machine_id, port, proto)
except MachinesNotFound:
log.info("No provisioned machine for machine %r", machine_id)
except Exception:
log.exception("Got exception in opening/closing ports, will retry")
self._retry_machines_on_port_error.add(machine_id)
finally:
# Ensure that the observation runs after the corresponding
# action completes. In particular, tests that use
# observation depend on this ordering to ensure that this
# action has happened before they can proceed.
observers = list(self._open_close_ports_on_machine_observers)
for observer in observers:
yield observer(machine_id)
|