/usr/lib/python2.7/dist-packages/aodh/tests/functional/storage/test_impl_mongodb.py is in python-aodh 2.0.0-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#
# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for aodh/storage/impl_mongodb.py
.. note::
In order to run the tests against another MongoDB server set the
environment variable aodh_TEST_MONGODB_URL to point to a MongoDB
server before running the tests.
"""
import unittest
try:
from aodh.storage import impl_mongodb
except ImportError:
impl_mongodb = None
from aodh.tests import base as test_base
from aodh.tests.functional import db as tests_db
@unittest.skipUnless(impl_mongodb, "pymongo not available")
@tests_db.run_with('mongodb')
class MongoDBConnection(tests_db.TestBase):
    """Connection-level checks for ``impl_mongodb.Connection``.

    Skipped when ``impl_mongodb`` could not be imported.
    """

    def test_connection_pooling(self):
        # A Connection built from the configured URL must expose a ``conn``
        # equal to the one held by ``self.alarm_conn`` (presumably provided
        # by the tests_db.TestBase fixture -- confirm there).
        db_url = self.CONF.database.connection
        second = impl_mongodb.Connection(self.CONF, db_url)
        self.assertEqual(self.alarm_conn.conn, second.conn)

    def test_replica_set(self):
        # Appending a replicaSet query option to the configured URL must
        # still produce a Connection whose ``conn`` is truthy.
        suffix = '?replicaSet=foobar'
        replica = impl_mongodb.Connection(
            self.CONF, self.CONF.database.connection + suffix)
        self.assertTrue(replica.conn)
@unittest.skipUnless(impl_mongodb, "pymongo not available")
@tests_db.run_with('mongodb')
class IndexTest(tests_db.TestBase):
    """Checks that ``upgrade()`` adds/removes the ``<collection>_ttl`` index.

    Each scenario overrides a time-to-live option in the ``database``
    config group, calls ``upgrade()`` on the connection and inspects the
    collection's ``index_information()``.
    """

    def _apply_ttl(self, conn, ttl_opt, seconds):
        # Override the TTL option, then let the driver re-run its upgrade.
        self.CONF.set_override(ttl_opt, seconds, group='database',
                               enforce_type=True)
        conn.upgrade()

    def _assert_ttl_index(self, collection, ttl_index, seconds):
        # The named index must be listed with the given expiry.
        index_info = collection.index_information()
        self.assertEqual(seconds,
                         index_info[ttl_index]['expireAfterSeconds'])

    def _test_ttl_index_absent(self, conn, coll_name, ttl_opt):
        """Start with TTL -1 (index not listed), then switch to 456789.

        :param conn: connection object exposing ``db`` and ``upgrade()``
        :param coll_name: attribute name of the collection on ``conn.db``
        :param ttl_opt: name of the TTL option in the ``database`` group
        """
        ttl_index = '%s_ttl' % coll_name
        collection = getattr(conn.db, coll_name)

        # With a TTL of -1 the index must not be listed after upgrading.
        self._apply_ttl(conn, ttl_opt, -1)
        self.assertNotIn(ttl_index, collection.index_information())

        # With a TTL of 456789 the index must carry that expiry.
        self._apply_ttl(conn, ttl_opt, 456789)
        self._assert_ttl_index(collection, ttl_index, 456789)

    def test_alarm_history_ttl_index_absent(self):
        self._test_ttl_index_absent(
            self.alarm_conn,
            'alarm_history',
            'alarm_history_time_to_live',
        )

    def _test_ttl_index_present(self, conn, coll_name, ttl_opt):
        """Start with TTL 456789 (index listed), then switch to -1.

        :param conn: connection object exposing ``db`` and ``upgrade()``
        :param coll_name: attribute name of the collection on ``conn.db``
        :param ttl_opt: name of the TTL option in the ``database`` group
        """
        collection = getattr(conn.db, coll_name)

        # With a TTL of 456789 the index must carry that expiry.
        self._apply_ttl(conn, ttl_opt, 456789)
        ttl_index = '%s_ttl' % coll_name
        self._assert_ttl_index(collection, ttl_index, 456789)

        # Switching to -1 and upgrading again must remove the index.
        self._apply_ttl(conn, ttl_opt, -1)
        self.assertNotIn(ttl_index, collection.index_information())

    def test_alarm_history_ttl_index_present(self):
        self._test_ttl_index_present(
            self.alarm_conn,
            'alarm_history',
            'alarm_history_time_to_live',
        )
class CapabilitiesTest(test_base.BaseTestCase):
    """Checks the capabilities reported by ``impl_mongodb.Connection``."""

    @unittest.skipUnless(impl_mongodb, "pymongo not available")
    def test_alarm_capabilities(self):
        # Both alarm queries and alarm-history queries are expected to
        # report simple and complex support.
        query_support = {'simple': True, 'complex': True}
        expected = {
            'alarms': {
                'query': dict(query_support),
                'history': {'query': dict(query_support)},
            },
        }
        reported = impl_mongodb.Connection.get_capabilities()
        self.assertEqual(expected, reported)
|