/usr/lib/python2.7/dist-packages/magnum/service/periodic.py is in python-magnum 3.1.1-5.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 | # Copyright (c) 2015 Intel Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from heatclient import exc as heat_exc
from oslo_config import cfg
from oslo_log import log
from oslo_service import periodic_task
import six
from magnum.common import clients
from magnum.common import context
from magnum.common import exception
from magnum.common import rpc
from magnum.conductor import monitors
from magnum.i18n import _
from magnum.i18n import _LI
from magnum.i18n import _LW
from magnum import objects
from magnum.objects import fields
CONF = cfg.CONF
LOG = log.getLogger(__name__)
def set_context(func):
@functools.wraps(func)
def handler(self, ctx):
ctx = context.make_admin_context(all_tenants=True)
context.set_ctx(ctx)
func(self, ctx)
context.set_ctx(None)
return handler
class MagnumPeriodicTasks(periodic_task.PeriodicTasks):
'''Magnum periodic Task class
Any periodic task job need to be added into this class
NOTE(suro-patz):
- oslo_service.periodic_task runs tasks protected within try/catch
block, with default raise_on_error as 'False', in run_periodic_tasks(),
which ensures the process does not die, even if a task encounters an
Exception.
- The periodic tasks here does not necessarily need another
try/catch block. The present try/catch block here helps putting
magnum-periodic-task-specific log/error message.
'''
def __init__(self, conf):
super(MagnumPeriodicTasks, self).__init__(conf)
self.notifier = rpc.get_notifier()
@periodic_task.periodic_task(run_immediately=True)
@set_context
def sync_cluster_status(self, ctx):
try:
LOG.debug('Starting to sync up cluster status')
osc = clients.OpenStackClients(ctx)
status = [fields.ClusterStatus.CREATE_IN_PROGRESS,
fields.ClusterStatus.UPDATE_IN_PROGRESS,
fields.ClusterStatus.DELETE_IN_PROGRESS,
fields.ClusterStatus.ROLLBACK_IN_PROGRESS]
filters = {'status': status}
clusters = objects.Cluster.list(ctx, filters=filters)
if not clusters:
return
sid_to_cluster_mapping = {cluster.stack_id:
cluster for cluster in clusters}
cluster_stack_ids = sid_to_cluster_mapping.keys()
if CONF.periodic_global_stack_list:
stacks = osc.heat().stacks.list(
global_tenant=True, filters={'id': cluster_stack_ids})
else:
ret = self._get_cluster_stacks(
clusters, sid_to_cluster_mapping, cluster_stack_ids)
[stacks, clusters, cluster_stack_ids,
sid_to_cluster_mapping] = ret
sid_to_stack_mapping = {s.id: s for s in stacks}
# intersection of clusters magnum has and heat has
for sid in (six.viewkeys(sid_to_cluster_mapping) &
six.viewkeys(sid_to_stack_mapping)):
stack = sid_to_stack_mapping[sid]
cluster = sid_to_cluster_mapping[sid]
self._sync_existing_cluster(cluster, stack)
# the stacks that magnum has but heat doesn't have
for sid in (six.viewkeys(sid_to_cluster_mapping) -
six.viewkeys(sid_to_stack_mapping)):
cluster = sid_to_cluster_mapping[sid]
self._sync_missing_heat_stack(cluster)
except Exception as e:
LOG.warning(_LW(
"Ignore error [%s] when syncing up cluster status."
), e, exc_info=True)
def _get_cluster_stacks(
self, clusters, sid_to_cluster_mapping, cluster_stack_ids):
stacks = []
_clusters = clusters
_sid_to_cluster_mapping = sid_to_cluster_mapping
_cluster_stack_ids = cluster_stack_ids
for cluster in _clusters:
try:
# Create client with cluster's trustee user context
bosc = clients.OpenStackClients(
context.make_cluster_context(cluster))
stack = bosc.heat().stacks.get(cluster.stack_id)
stacks.append(stack)
# No need to do anything in this case
except heat_exc.HTTPNotFound:
pass
except Exception as e:
# Any other exception means we do not perform any
# action on this cluster in the current sync run, so remove
# it from all records.
LOG.warning(
_LW("Exception while attempting to retrieve "
"Heat stack %(stack_id)s for cluster %(cluster_id)s. "
"Traceback follows."),
{'stack_id': cluster.stack_id, 'cluster_id': cluster.id})
LOG.warning(e)
_sid_to_cluster_mapping.pop(cluster.stack_id)
_cluster_stack_ids.remove(cluster.stack_id)
_clusters.remove(cluster)
return [stacks, _clusters, _cluster_stack_ids, _sid_to_cluster_mapping]
def _sync_existing_cluster(self, cluster, stack):
if cluster.status != stack.stack_status:
old_status = cluster.status
cluster.status = stack.stack_status
cluster.status_reason = stack.stack_status_reason
cluster.save()
LOG.info(_LI("Sync up cluster with id %(id)s from "
"%(old_status)s to %(status)s."),
{'id': cluster.id, 'old_status': old_status,
'status': cluster.status})
def _sync_missing_heat_stack(self, cluster):
if cluster.status == fields.ClusterStatus.DELETE_IN_PROGRESS:
self._sync_deleted_stack(cluster)
elif cluster.status == fields.ClusterStatus.CREATE_IN_PROGRESS:
self._sync_missing_stack(cluster,
fields.ClusterStatus.CREATE_FAILED)
elif cluster.status == fields.ClusterStatus.UPDATE_IN_PROGRESS:
self._sync_missing_stack(cluster,
fields.ClusterStatus.UPDATE_FAILED)
def _sync_deleted_stack(self, cluster):
try:
cluster.destroy()
except exception.ClusterNotFound:
LOG.info(_LI('The cluster %s has been deleted by others.'),
cluster.uuid)
else:
LOG.info(_LI("cluster with id %(id)s not found in heat "
"with stack id %(sid)s, with status_reason: "
"%(reason)s."), {'id': cluster.id,
'sid': cluster.stack_id,
'reason': cluster.status_reason})
def _sync_missing_stack(self, cluster, new_status):
cluster.status = new_status
cluster.status_reason = _("Stack with id %s not found in "
"Heat.") % cluster.stack_id
cluster.save()
LOG.info(_LI("Cluster with id %(id)s has been set to "
"%(status)s due to stack with id %(sid)s "
"not found in Heat."),
{'id': cluster.id, 'status': cluster.status,
'sid': cluster.stack_id})
@periodic_task.periodic_task(run_immediately=True)
@set_context
def _send_cluster_metrics(self, ctx):
LOG.debug('Starting to send cluster metrics')
for cluster in objects.Cluster.list(ctx):
if cluster.status not in [fields.ClusterStatus.CREATE_COMPLETE,
fields.ClusterStatus.UPDATE_COMPLETE]:
continue
monitor = monitors.create_monitor(ctx, cluster)
if monitor is None:
continue
try:
monitor.pull_data()
except Exception as e:
LOG.warning(
_LW("Skip pulling data from cluster %(cluster)s due to "
"error: %(e)s"),
{'e': e, 'cluster': cluster.uuid}, exc_info=True)
continue
metrics = list()
for name in monitor.get_metric_names():
try:
metric = {
'name': name,
'value': monitor.compute_metric_value(name),
'unit': monitor.get_metric_unit(name),
}
metrics.append(metric)
except Exception as e:
LOG.warning(_LW("Skip adding metric %(name)s due to "
"error: %(e)s"),
{'e': e, 'name': name}, exc_info=True)
message = dict(metrics=metrics,
user_id=cluster.user_id,
project_id=cluster.project_id,
resource_id=cluster.uuid)
LOG.debug("About to send notification: '%s'", message)
self.notifier.info(ctx, "magnum.cluster.metrics.update",
message)
def setup(conf, tg):
pt = MagnumPeriodicTasks(conf)
tg.add_dynamic_timer(
pt.run_periodic_tasks,
periodic_interval_max=conf.periodic_interval_max,
context=None)
|