/usr/lib/python2.7/dist-packages/keystoneauth1/identity/v3/k2k.py is in python-keystoneauth1 3.4.0-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from keystoneauth1 import access
from keystoneauth1 import exceptions
from keystoneauth1.identity.v3 import federation
from keystoneauth1 import plugin
__all__ = ('Keystone2Keystone',)
class Keystone2Keystone(federation._Rescoped):
"""Plugin to execute the Keystone to Keyestone authentication flow.
In this plugin, an ECP wrapped SAML assertion provided by a keystone
Identity Provider (IdP) is used to request an OpenStack unscoped token
from a keystone Service Provider (SP).
:param base_plugin: Auth plugin already authenticated against the keystone
IdP.
:type base_plugin: keystoneauth1.identity.v3.base.BaseAuth
:param service_provider: The Service Provider ID as returned by
ServiceProviderManager.list()
:type service_provider: str
"""
REQUEST_ECP_URL = '/auth/OS-FEDERATION/saml2/ecp'
"""Path where the ECP wrapped SAML assertion should be presented to the
Keystone Service Provider."""
HTTP_MOVED_TEMPORARILY = 302
HTTP_SEE_OTHER = 303
def __init__(self, base_plugin, service_provider, **kwargs):
super(Keystone2Keystone, self).__init__(auth_url=None, **kwargs)
self._local_cloud_plugin = base_plugin
self._sp_id = service_provider
@classmethod
def _remote_auth_url(cls, auth_url):
"""Return auth_url of the remote Keystone Service Provider.
Remote cloud's auth_url is an endpoint for getting federated unscoped
token, typically that would be
``https://remote.example.com:5000/v3/OS-FEDERATION/identity_providers/
<idp>/protocols/<protocol_id>/auth``. However we need to generate a
real auth_url, used for token scoping. This function assumes there are
static values today in the remote auth_url stored in the Service
Provider attribute and those can be used as a delimiter. If the
sp_auth_url doesn't comply with standard federation auth url the
function will simply return whole string.
:param auth_url: auth_url of the remote cloud
:type auth_url: str
:returns: auth_url of remote cloud where a token can be validated or
scoped.
:rtype: str
"""
PATTERN = '/OS-FEDERATION/'
idx = auth_url.index(PATTERN) if PATTERN in auth_url else len(auth_url)
return auth_url[:idx]
def _get_ecp_assertion(self, session):
body = {
'auth': {
'identity': {
'methods': ['token'],
'token': {
'id': self._local_cloud_plugin.get_token(session)
}
},
'scope': {
'service_provider': {
'id': self._sp_id
}
}
}
}
endpoint_filter = {'version': (3, 0),
'interface': plugin.AUTH_INTERFACE}
headers = {'Accept': 'application/json'}
resp = session.post(self.REQUEST_ECP_URL,
json=body,
auth=self._local_cloud_plugin,
endpoint_filter=endpoint_filter,
headers=headers,
authenticated=False,
raise_exc=False)
# NOTE(marek-denis): I am not sure whether disabling exceptions in the
# Session object and testing if resp.ok is sufficient. An alternative
# would be catching locally all exceptions and reraising with custom
# warning.
if not resp.ok:
msg = ("Error while requesting ECP wrapped assertion: response "
"exit code: %(status_code)d, reason: %(err)s")
msg = msg % {'status_code': resp.status_code, 'err': resp.reason}
raise exceptions.AuthorizationFailure(msg)
if not resp.text:
raise exceptions.InvalidResponse(resp)
return six.text_type(resp.text)
def _send_service_provider_ecp_authn_response(self, session, sp_url,
sp_auth_url):
"""Present ECP wrapped SAML assertion to the keystone SP.
The assertion is issued by the keystone IdP and it is targeted to the
keystone that will serve as Service Provider.
:param session: a session object to send out HTTP requests.
:param sp_url: URL where the ECP wrapped SAML assertion will be
presented to the keystone SP. Usually, something like:
https://sp.com/Shibboleth.sso/SAML2/ECP
:type sp_url: str
:param sp_auth_url: Federated authentication URL of the keystone SP.
It is specified by IdP, for example:
https://sp.com/v3/OS-FEDERATION/identity_providers/
idp_id/protocols/protocol_id/auth
:type sp_auth_url: str
"""
response = session.post(
sp_url,
headers={'Content-Type': 'application/vnd.paos+xml'},
data=self._get_ecp_assertion(session),
authenticated=False,
redirect=False)
# Don't follow HTTP specs - after the HTTP 302/303 response don't
# repeat the call directed to the Location URL. In this case, this is
# an indication that SAML2 session is now active and protected resource
# can be accessed.
if response.status_code in (self.HTTP_MOVED_TEMPORARILY,
self.HTTP_SEE_OTHER):
response = session.get(
sp_auth_url,
headers={'Content-Type': 'application/vnd.paos+xml'},
authenticated=False)
return response
def get_unscoped_auth_ref(self, session, **kwargs):
sp_auth_url = self._local_cloud_plugin.get_sp_auth_url(
session, self._sp_id)
sp_url = self._local_cloud_plugin.get_sp_url(session, self._sp_id)
self.auth_url = self._remote_auth_url(sp_auth_url)
response = self._send_service_provider_ecp_authn_response(
session, sp_url, sp_auth_url)
return access.create(resp=response)
|