/usr/lib/python2.7/dist-packages/aodh/notifier/zaqar.py is in python-aodh 2.0.0-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | #
# Copyright 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Zaqar alarm notifier."""
from oslo_config import cfg
from oslo_log import log
import six.moves.urllib.parse as urlparse
from aodh.i18n import _LE, _LI
from aodh import keystone_client
from aodh import notifier
LOG = log.getLogger(__name__)
SERVICE_OPTS = [
cfg.StrOpt('zaqar',
default='messaging',
help='Message queue service type.'),
]
cfg.CONF.register_opts(SERVICE_OPTS, group='service_types')
class ZaqarAlarmNotifier(notifier.AlarmNotifier):
"""Zaqar notifier."""
def __init__(self, conf):
super(ZaqarAlarmNotifier, self).__init__(conf)
self.conf = conf
self._zclient = None
def _get_endpoint(self):
try:
ks_client = keystone_client.get_client(self.conf)
return ks_client.service_catalog.url_for(
service_type=cfg.CONF.service_types.zaqar,
endpoint_type=self.conf.service_credentials.os_endpoint_type)
except Exception:
LOG.error(_LE("Aodh was configured to use zaqar:// action,"
" but Zaqar endpoint could not be found in Keystone"
" service catalog."))
def get_zaqar_client(self):
conf = self.conf.service_credentials
params = {
'auth_opts': {
'backend': 'keystone',
'options': {
'os_username': conf.os_username,
'os_password': conf.os_password,
'os_project_name': conf.os_tenant_name,
'os_auth_url': conf.os_auth_url,
'insecure': ''
}
}
}
try:
from zaqarclient.queues import client as zaqar_client
return zaqar_client.Client(self._get_endpoint(),
version=1.1, conf=params)
except Exception:
LOG.error(_LE("Failed to connect to Zaqar service "),
exc_info=True)
def notify(self, action, alarm_id, alarm_name, severity, previous,
current, reason, reason_data, headers=None):
LOG.info(_LI(
"Notifying alarm %(alarm_name)s %(alarm_id)s of %(severity)s "
"priority from %(previous)s to %(current)s with action %(action)s"
" because %(reason)s.") % ({'alarm_name': alarm_name,
'alarm_id': alarm_id,
'severity': severity,
'previous': previous,
'current': current,
'action': action,
'reason': reason}))
body = {'alarm_name': alarm_name, 'alarm_id': alarm_id,
'severity': severity, 'previous': previous,
'current': current, 'reason': reason,
'reason_data': reason_data}
message = dict(body=body)
self.notify_zaqar(action, message)
@property
def client(self):
if self._zclient is None:
self._zclient = self.get_zaqar_client()
return self._zclient
def notify_zaqar(self, action, message):
queue_info = urlparse.parse_qs(action.query)
try:
# queue_name is a combination of <alarm-id>-<topic>
queue_name = "%s-%s" % (message['body']['alarm_id'],
queue_info.get('topic')[-1])
# create a queue in zaqar
queue = self.client.queue(queue_name, force_create=True)
subscriber_list = queue_info.get('subscriber', [])
ttl = queue_info.get('ttl', [3600])[-1]
for subscriber in subscriber_list:
# add subscriber to the zaqar queue
subscription_data = dict(subscriber=subscriber,
ttl=ttl)
self.client.subscription(queue_name,
**subscription_data)
# post the message to the queue
queue.post(message)
except IndexError:
LOG.error(_LE("Required topic query option missing in action %s")
% action)
except Exception:
LOG.error(_LE("Unknown error occurred; Failed to post message to"
" Zaqar queue"),
exc_info=True)
|