This file is indexed.

/usr/lib/python2.7/dist-packages/os_faults/tests/unit/drivers/cloud/test_universal.py is in python-os-faults 0.1.17-0ubuntu1.1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ddt
import mock

from os_faults.api import node_collection
from os_faults.drivers.cloud import universal
from os_faults.tests.unit import fakes
from os_faults.tests.unit import test


@ddt.ddt
class UniversalManagementTestCase(test.TestCase):
    """Unit tests for ``universal.UniversalCloudManagement``.

    ``AnsibleRunner`` is patched in every test, so the runner used by the
    manager is a mock whose ``execute`` results are canned
    ``fakes.FakeAnsibleResult`` objects.
    """

    @staticmethod
    def _ansible_stdout(text):
        # Build a fake Ansible result whose payload carries only ``stdout``.
        return fakes.FakeAnsibleResult(payload={'stdout': text})

    @mock.patch('os_faults.ansible.executor.AnsibleRunner', autospec=True)
    @ddt.data(
        # Case 1: config that supplies only the address and a username.
        (
            {'address': 'os.local', 'auth': {'username': 'root'}},
            {
                'remote_user': 'root',
                'private_key_file': None,
                'password': None,
                'become': None,
                'become_password': None,
                'jump_host': None,
                'jump_user': None,
                'serial': None,
            },
        ),
        # Case 2: privilege escalation settings plus an explicit ``serial``.
        (
            {
                'address': 'os.local',
                'auth': {
                    'username': 'user',
                    'become': True,
                    'become_password': 'secret',
                },
                'serial': 42,
            },
            {
                'remote_user': 'user',
                'private_key_file': None,
                'password': None,
                'become': True,
                'become_password': 'secret',
                'jump_host': None,
                'jump_user': None,
                'serial': 42,
            },
        ),
    )
    @ddt.unpack
    def test_init(self, config, expected_runner_call, mock_ansible_runner):
        # Constructing the manager is expected to create the runner with
        # the given keyword arguments and to expose that runner instance
        # as ``cloud_executor``.
        manager = universal.UniversalCloudManagement(config)

        mock_ansible_runner.assert_called_with(**expected_runner_call)
        self.assertIs(manager.cloud_executor,
                      mock_ansible_runner.return_value)

    @mock.patch('os_faults.ansible.executor.AnsibleRunner', autospec=True)
    @mock.patch('os_faults.drivers.cloud.universal.UniversalCloudManagement.'
                'discover_hosts')
    def test_verify(self, mock_discover_hosts, mock_ansible_runner):
        # With host discovery stubbed out, ``verify`` is expected to run
        # the ``hostname`` command on the discovered hosts.
        ip = '10.0.0.10'
        discovered = [node_collection.Host(ip=ip)]
        mock_discover_hosts.return_value = discovered

        runner = mock_ansible_runner.return_value
        runner.execute.side_effect = [
            [self._ansible_stdout('openstack.local')],
        ]

        manager = universal.UniversalCloudManagement({'address': ip})
        manager.verify()

        expected_calls = [mock.call(discovered, {'command': 'hostname'})]
        runner.execute.assert_has_calls(expected_calls)

    @mock.patch('os_faults.ansible.executor.AnsibleRunner', autospec=True)
    def test_discover_hosts(self, mock_ansible_runner):
        # Without ``iface`` in the config a single command result is
        # consumed; its stdout becomes the host FQDN and the MAC is None.
        ip = '10.0.0.10'
        fqdn = 'openstack.local'

        runner = mock_ansible_runner.return_value
        runner.execute.side_effect = [
            [self._ansible_stdout(fqdn)],
        ]

        manager = universal.UniversalCloudManagement({'address': ip})
        wanted = [node_collection.Host(ip=ip, mac=None, fqdn=fqdn)]

        self.assertEqual(wanted, manager.discover_hosts())

    @mock.patch('os_faults.ansible.executor.AnsibleRunner', autospec=True)
    def test_discover_hosts_with_iface(self, mock_ansible_runner):
        # With ``iface`` configured two command results are queued: the
        # first supplies the MAC address, the second the host FQDN.
        ip = '10.0.0.10'
        fqdn = 'openstack.local'
        hw_address = '0b:fe:fe:13:12:11'

        runner = mock_ansible_runner.return_value
        runner.execute.side_effect = [
            [self._ansible_stdout(hw_address)],
            [self._ansible_stdout(fqdn)],
        ]

        manager = universal.UniversalCloudManagement(
            {'address': ip, 'iface': 'eth1'})
        wanted = [node_collection.Host(ip=ip, mac=hw_address, fqdn=fqdn)]

        self.assertEqual(wanted, manager.discover_hosts())