/usr/lib/python3/dist-packages/designateclient/client.py is in python3-designateclient 2.1.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | # Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import json
import six
from six.moves.urllib import parse
from stevedore import extension
from designateclient import exceptions
@six.add_metaclass(abc.ABCMeta)
class Controller(object):
def __init__(self, client):
self.client = client
def build_url(self, url, criterion=None, marker=None, limit=None):
params = criterion or {}
if marker is not None:
params['marker'] = marker
if limit is not None:
params['limit'] = limit
q = parse.urlencode(params) if params else ''
return '%(url)s%(params)s' % {
'url': url,
'params': '?%s' % q
}
def _serialize(self, kwargs):
if 'data' in kwargs:
kwargs['data'] = json.dumps(kwargs['data'])
def _post(self, url, response_key=None, **kwargs):
self._serialize(kwargs)
resp, body = self.client.session.post(url, **kwargs)
if response_key is not None:
return body[response_key]
return body
def _get(self, url, response_key=None):
resp, body = self.client.session.get(url)
if response_key is not None:
return body[response_key]
return body
def _patch(self, url, response_key=None, **kwargs):
self._serialize(kwargs)
resp, body = self.client.session.patch(url, **kwargs)
if response_key is not None:
return body[response_key]
return body
def _put(self, url, response_key=None, **kwargs):
self._serialize(kwargs)
resp, body = self.client.session.put(url, **kwargs)
if response_key is not None:
return body[response_key]
return body
def _delete(self, url):
resp, body = self.client.session.delete(url)
@six.add_metaclass(abc.ABCMeta)
class CrudController(Controller):
@abc.abstractmethod
def list(self, *args, **kw):
"""
List a resource
"""
@abc.abstractmethod
def get(self, *args, **kw):
"""
Get a resource
"""
@abc.abstractmethod
def create(self, *args, **kw):
"""
Create a resource
"""
@abc.abstractmethod
def update(self, *args, **kw):
"""
Update a resource
"""
@abc.abstractmethod
def delete(self, *args, **kw):
"""
Delete a resource
"""
def get_versions():
mgr = extension.ExtensionManager('designateclient.versions')
return dict([(ep.name, ep.plugin) for ep in mgr.extensions])
def Client(version, *args, **kwargs): # noqa
versions = get_versions()
if version not in versions:
msg = 'Version %s is not supported, use one of (%s)' % (
version, list(six.iterkeys(versions)))
raise exceptions.UnsupportedVersion(msg)
return versions[version](*args, **kwargs)
|