This file is indexed.

/usr/lib/python2.7/dist-packages/magnum/drivers/k8s_fedora_ironic_v1/template_def.py is in python-magnum 3.1.1-5.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from neutronclient.common import exceptions as n_exception
from neutronclient.neutron import v2_0 as neutronV20
import os
from oslo_log import log as logging

from magnum.common import exception
from magnum.drivers.common import k8s_fedora_template_def as kftd
from oslo_config import cfg

# Handle to the oslo.config global configuration object.
CONF = cfg.CONF

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


class FedoraK8sIronicTemplateDefinition(kftd.K8sFedoraTemplateDefinition):
    """Kubernetes template for a Fedora Baremetal.

    Extends the common Fedora Kubernetes template definition with a
    required ``fixed_subnet`` parameter and passes the network that owns
    that subnet to the parent as the ``fixed_network`` extra parameter.
    """

    provides = [
        {'server_type': 'bm',
         'os': 'fedora',
         'coe': 'kubernetes'},
    ]

    def __init__(self):
        """Register the required ``fixed_subnet`` template parameter."""
        super(FedoraK8sIronicTemplateDefinition, self).__init__()
        self.add_parameter('fixed_subnet',
                           cluster_template_attr='fixed_subnet',
                           param_type=str,
                           required=True)

    def get_fixed_network_id(self, osc, cluster_template):
        """Return the id of the network owning the template's fixed subnet.

        :param osc: client wrapper; ``osc.neutron()`` must return the
                    Neutron client used for the lookup.
        :param cluster_template: object whose ``fixed_subnet`` attribute
                                 holds the subnet name or id.
        :returns: the ``network_id`` field of the resolved subnet.
        :raises exception.InvalidSubnet: if the lookup fails with a Neutron
            exception whose ``status_code`` is below 500 (or that has no
            ``status_code`` at all), or if the subnet is not IPv4.
        :raises n_exception.NeutronException: re-raised unchanged when its
            ``status_code`` is 500 or above.
        """
        try:
            subnet = neutronV20.find_resource_by_name_or_id(
                osc.neutron(),
                'subnet',
                cluster_template.fixed_subnet
            )
        except n_exception.NeutronException as e:
            # NOTE(yuanying): NeutronCLIError doesn't have status_code
            # if subnet name is duplicated, NeutronClientNoUniqueMatch
            # (which is kind of NeutronCLIError) will be raised.
            # Exceptions without a status_code are therefore treated as
            # client-side (400) errors.
            if getattr(e, 'status_code', 400) < 500:
                raise exception.InvalidSubnet(message=("%s" % e))
            # Bare ``raise`` keeps the original traceback; ``raise e``
            # would restart it at this line on Python 2.
            raise

        if subnet['ip_version'] != 4:
            raise exception.InvalidSubnet(
                message="Subnet IP version should be 4"
            )

        return subnet['network_id']

    def get_params(self, context, cluster_template, cluster, **kwargs):
        """Build the template parameters, adding ``fixed_network``.

        Removes ``extra_params`` from ``kwargs`` (defaulting to an empty
        dict), stores the fixed network id in it under ``fixed_network``
        and delegates to the parent implementation.

        :param context: passed to ``self.get_osc`` and to the parent.
        :param cluster_template: source of the ``fixed_subnet`` value.
        :param cluster: forwarded unchanged to the parent.
        :param kwargs: remaining keyword arguments, forwarded to the parent.
        :returns: whatever the parent ``get_params`` returns.
        :raises exception.InvalidSubnet: propagated from
            ``get_fixed_network_id``.
        """
        ep = kwargs.pop('extra_params', {})

        osc = self.get_osc(context)
        ep['fixed_network'] = self.get_fixed_network_id(osc, cluster_template)

        return super(FedoraK8sIronicTemplateDefinition,
                     self).get_params(context, cluster_template, cluster,
                                      extra_params=ep,
                                      **kwargs)

    @property
    def driver_module_path(self):
        """Dotted name of this module with its last component removed."""
        return __name__[:__name__.rindex('.')]

    @property
    def template_path(self):
        """Path to ``templates/kubecluster.yaml`` next to this file.

        The directory is derived from the real (symlink-resolved) location
        of this module.
        """
        return os.path.join(os.path.dirname(os.path.realpath(__file__)),
                            'templates/kubecluster.yaml')