This file is indexed.

/usr/lib/python2.7/dist-packages/neutron_dynamic_routing/extensions/bgp_dragentscheduler.py is in python-neutron-dynamic-routing 2:9.0.0-1.2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# Copyright 2016 Huawei Technologies India Pvt. Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import abc
import six
import webob

from neutron_lib import exceptions as n_exc
from oslo_log import log as logging

from neutron.api import extensions
from neutron.api.v2 import base
from neutron.api.v2 import resource
from neutron.extensions import agent
from neutron import manager
from neutron import wsgi

from neutron_dynamic_routing._i18n import _, _LE
from neutron_dynamic_routing.extensions import bgp as bgp_ext


# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Alias returned by Bgp_dragentscheduler.get_alias().
BGP_DRAGENT_SCHEDULER_EXT_ALIAS = 'bgp_dragent_scheduler'

# Singular resource names and their plural (collection) forms; the plural
# forms are the collection names passed to ResourceExtension in
# Bgp_dragentscheduler.get_resources().
BGP_DRINSTANCE = 'bgp-drinstance'
BGP_DRINSTANCES = '%ss' % BGP_DRINSTANCE
BGP_DRAGENT = 'bgp-dragent'
BGP_DRAGENTS = '%ss' % BGP_DRAGENT


class DrAgentInvalid(agent.AgentNotFound):
    """Error reporting that a BgpDrAgent is invalid or has been disabled.

    The message template interpolates an ``id`` keyword.
    """

    message = _(
        "BgpDrAgent %(id)s is invalid or has been disabled."
    )


class DrAgentNotHostingBgpSpeaker(n_exc.NotFound):
    """Error reporting that a BGP speaker is not hosted by a BgpDrAgent.

    The message template interpolates ``bgp_speaker_id`` and ``agent_id``
    keywords.
    """

    message = _(
        "BGP speaker %(bgp_speaker_id)s is not hosted by the "
        "BgpDrAgent %(agent_id)s."
    )


class DrAgentAssociationError(n_exc.Conflict):
    """Error reporting that a BgpDrAgent already has a BGP speaker.

    The message template interpolates an ``agent_id`` keyword.
    """

    message = _(
        "BgpDrAgent %(agent_id)s is already "
        "associated to a BGP speaker."
    )


class BgpDrSchedulerController(wsgi.Controller):
    """Controller for the BGP speakers scheduled on a BgpDrAgent.

    Every action looks up the BGP service plugin and forwards the request
    to the matching scheduler method on it.
    """

    def get_plugin(self):
        """Return the service plugin registered as ``bgp_ext.BGP_EXT_ALIAS``.

        :raises webob.exc.HTTPNotFound: if no such plugin is registered
            (an error is logged first).
        """
        service_plugins = manager.NeutronManager.get_service_plugins()
        bgp_plugin = service_plugins.get(bgp_ext.BGP_EXT_ALIAS)
        if bgp_plugin:
            return bgp_plugin

        LOG.error(_LE('No plugin for BGP routing registered'))
        raise webob.exc.HTTPNotFound(_('The resource could not be found.'))

    def index(self, request, **kwargs):
        """Return ``list_bgp_speaker_on_dragent`` for ``kwargs['agent_id']``."""
        list_speakers = self.get_plugin().list_bgp_speaker_on_dragent
        return list_speakers(request.context, kwargs['agent_id'])

    def create(self, request, body, **kwargs):
        """Forward to the plugin's ``add_bgp_speaker_to_dragent``.

        The agent id is read from ``kwargs['agent_id']`` and the speaker id
        from ``body['bgp_speaker_id']``.
        """
        add_speaker = self.get_plugin().add_bgp_speaker_to_dragent
        context = request.context
        agent_id = kwargs['agent_id']
        speaker_id = body['bgp_speaker_id']
        return add_speaker(context, agent_id, speaker_id)

    def delete(self, request, id, **kwargs):
        """Forward to the plugin's ``remove_bgp_speaker_from_dragent``.

        ``id`` is passed as the speaker id and ``kwargs['agent_id']`` as the
        agent id.
        """
        remove_speaker = self.get_plugin().remove_bgp_speaker_from_dragent
        return remove_speaker(request.context, kwargs['agent_id'], id)


class BgpDrAgentController(wsgi.Controller):
    """Controller listing BgpDrAgents for a BGP speaker.

    The only action, ``index``, delegates to the BGP service plugin's
    ``list_dragent_hosting_bgp_speaker`` method.
    """

    def get_plugin(self):
        """Return the service plugin registered as ``bgp_ext.BGP_EXT_ALIAS``.

        :returns: the registered BGP service plugin.
        :raises webob.exc.HTTPNotFound: if no such plugin is registered
            (an error is logged first).
        """
        plugin = manager.NeutronManager.get_service_plugins().get(
            bgp_ext.BGP_EXT_ALIAS)
        if not plugin:
            LOG.error(_LE('No plugin for BGP routing registered'))
            msg = _('The resource could not be found.')
            raise webob.exc.HTTPNotFound(msg)
        return plugin

    def index(self, request, **kwargs):
        """Return the plugin's answer for ``kwargs['bgp_speaker_id']``.

        :param request: incoming request; its ``context`` is forwarded.
        :param kwargs: must contain ``bgp_speaker_id``.
        :returns: whatever ``list_dragent_hosting_bgp_speaker`` returns.
        :raises webob.exc.HTTPNotFound: if no BGP plugin is registered.
        """
        # Resolve the plugin through get_plugin() so that a missing plugin
        # produces the logged HTTPNotFound instead of an AttributeError on
        # None, matching BgpDrSchedulerController's actions.
        plugin = self.get_plugin()
        return plugin.list_dragent_hosting_bgp_speaker(
            request.context, kwargs['bgp_speaker_id'])


class Bgp_dragentscheduler(extensions.ExtensionDescriptor):
    """Extension descriptor for the BGP dynamic routing agent scheduler."""

    @classmethod
    def get_name(cls):
        """Return the extension's display name."""
        return "BGP Dynamic Routing Agent Scheduler"

    @classmethod
    def get_alias(cls):
        """Return the extension alias constant."""
        return BGP_DRAGENT_SCHEDULER_EXT_ALIAS

    @classmethod
    def get_description(cls):
        """Return the one-line extension description."""
        return "Schedules BgpSpeakers on BgpDrAgent"

    @classmethod
    def get_updated(cls):
        """Return the extension's "updated" timestamp string."""
        return "2015-07-30T10:00:00-00:00"

    @classmethod
    def get_resources(cls):
        """Build the two ResourceExtension objects of this extension.

        The first nests BGP_DRINSTANCES under the "agents" parent and is
        served by BgpDrSchedulerController; the second nests BGP_DRAGENTS
        under the "bgp-speakers" parent and is served by
        BgpDrAgentController.
        """
        # (collection, controller class, parent member, parent collection)
        specs = (
            (BGP_DRINSTANCES, BgpDrSchedulerController,
             "agent", "agents"),
            (BGP_DRAGENTS, BgpDrAgentController,
             "bgp_speaker", "bgp-speakers"),
        )

        resources = []
        for collection, controller_cls, member, parent_collection in specs:
            parent = {"member_name": member,
                      "collection_name": parent_collection}
            wrapped = resource.Resource(controller_cls(), base.FAULT_MAP)
            resources.append(
                extensions.ResourceExtension(collection, wrapped, parent))
        return resources

    def get_extended_resources(self, version):
        """Return an empty dict regardless of ``version``."""
        return {}


@six.add_metaclass(abc.ABCMeta)
class BgpDrSchedulerPluginBase(object):
    """Abstract interface for the BGP dynamic routing agent scheduler.

    All the methods must be executed in admin context.
    """

    def get_plugin_description(self):
        """Return a fixed human-readable description of the plugin."""
        return "Neutron BGP dynamic routing scheduler Plugin"

    def get_plugin_type(self):
        """Return ``bgp_ext.BGP_EXT_ALIAS`` as the plugin type."""
        return bgp_ext.BGP_EXT_ALIAS

    @abc.abstractmethod
    def add_bgp_speaker_to_dragent(self, context, agent_id, speaker_id):
        """Abstract; invoked by BgpDrSchedulerController.create.

        Receives the request context, the agent id and the speaker id
        taken from the request body.
        """

    @abc.abstractmethod
    def remove_bgp_speaker_from_dragent(self, context, agent_id, speaker_id):
        """Abstract; invoked by BgpDrSchedulerController.delete.

        Receives the request context, the agent id and the speaker id.
        """

    @abc.abstractmethod
    def list_dragent_hosting_bgp_speaker(self, context, speaker_id):
        """Abstract; invoked by BgpDrAgentController.index.

        Receives the request context and the speaker id.
        """

    @abc.abstractmethod
    def list_bgp_speaker_on_dragent(self, context, agent_id):
        """Abstract; invoked by BgpDrSchedulerController.index.

        Receives the request context and the agent id.
        """

    @abc.abstractmethod
    def get_bgp_speakers_for_agent_host(self, context, host):
        """Abstract; no caller is visible in this module.

        NOTE(review): presumably looks up the BGP speakers for the agent
        on ``host`` -- confirm against the implementing plugin.
        """

    @abc.abstractmethod
    def get_bgp_speaker_by_speaker_id(self, context, speaker_id):
        """Abstract; no caller is visible in this module.

        NOTE(review): presumably fetches one BGP speaker by its id --
        confirm against the implementing plugin.
        """

    @abc.abstractmethod
    def get_bgp_peer_by_peer_id(self, context, bgp_peer_id):
        """Abstract; no caller is visible in this module.

        NOTE(review): presumably fetches one BGP peer by its id --
        confirm against the implementing plugin.
        """