/usr/lib/python3/dist-packages/oauth2client/crypt.py is in python3-oauth2client 2.0.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
#
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Crypto-related routines for oauth2client."""
import json
import logging
import time
from oauth2client._helpers import _from_bytes
from oauth2client._helpers import _json_encode
from oauth2client._helpers import _to_bytes
from oauth2client._helpers import _urlsafe_b64decode
from oauth2client._helpers import _urlsafe_b64encode
from oauth2client._pure_python_crypt import RsaSigner
from oauth2client._pure_python_crypt import RsaVerifier
# Tolerance, in seconds, that _verify_time_range() subtracts from a token's
# 'iat' and adds to its 'exp' before comparing them with the local clock.
CLOCK_SKEW_SECS = 300 # 5 minutes in seconds
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably consumed by code that imports it -- confirm before changing.
AUTH_TOKEN_LIFETIME_SECS = 300 # 5 minutes in seconds
# Upper bound, in seconds from "now", on how far in the future a token's
# 'exp' may lie; _verify_time_range() rejects anything at or beyond it.
MAX_TOKEN_LIFETIME_SECS = 86400 # 1 day in seconds
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class AppIdentityError(Exception):
    """Raised when a token fails a cryptographic or claim check.

    Within this module it signals a malformed token, an unverifiable
    signature, a missing or mismatched audience, or an unacceptable
    issued-at / expiration time.
    """
def _bad_pkcs12_key_as_pem(*args, **kwargs):
raise NotImplementedError('pkcs12_key_as_pem requires OpenSSL.')
# Optional backend 1: OpenSSL. When it cannot be imported, its classes are
# recorded as unavailable and PKCS#12 conversion is replaced by a stub that
# raises NotImplementedError.
try:
    from oauth2client._openssl_crypt import OpenSSLVerifier
    from oauth2client._openssl_crypt import OpenSSLSigner
    from oauth2client._openssl_crypt import pkcs12_key_as_pem
except ImportError:  # pragma: NO COVER
    OpenSSLVerifier = OpenSSLSigner = None
    pkcs12_key_as_pem = _bad_pkcs12_key_as_pem

# Optional backend 2: PyCrypto.
try:
    from oauth2client._pycrypto_crypt import PyCryptoVerifier
    from oauth2client._pycrypto_crypt import PyCryptoSigner
except ImportError:  # pragma: NO COVER
    PyCryptoVerifier = PyCryptoSigner = None

# Choose the module-wide Signer / Verifier pair. Priority order: OpenSSL,
# then PyCrypto, then the pure-Python RSA implementation imported above.
if OpenSSLSigner:
    Signer, Verifier = OpenSSLSigner, OpenSSLVerifier
elif PyCryptoSigner:  # pragma: NO COVER
    Signer, Verifier = PyCryptoSigner, PyCryptoVerifier
else:  # pragma: NO COVER
    Signer, Verifier = RsaSigner, RsaVerifier
def make_signed_jwt(signer, payload, key_id=None):
    """Build a JWT for ``payload`` and sign it with ``signer``.

    See http://self-issued.info/docs/draft-jones-json-web-token.html.

    Args:
        signer: crypt.Signer, Cryptographic signer. Its ``sign`` method is
                called with the encoded header and payload joined by a dot.
        payload: dict, Dictionary of data to convert to JSON and then sign.
        key_id: string, (Optional) Key ID header. When given, it is stored
                under ``'kid'`` in the JWT header.

    Returns:
        The JWT for the payload: the encoded header, payload and signature
        segments joined with ``b'.'``.
    """
    jwt_header = {'typ': 'JWT', 'alg': 'RS256'}
    if key_id is not None:
        jwt_header['kid'] = key_id

    encoded_header = _urlsafe_b64encode(_json_encode(jwt_header))
    encoded_payload = _urlsafe_b64encode(_json_encode(payload))
    segments = [encoded_header, encoded_payload]

    # The signature covers exactly "<header>.<payload>".
    raw_signature = signer.sign(b'.'.join(segments))
    segments.append(_urlsafe_b64encode(raw_signature))

    logger.debug(str(segments))
    return b'.'.join(segments)
def _verify_signature(message, signature, certs):
    """Check ``signature`` over ``message`` against each certificate.

    Certificates are tried in iteration order and checking stops at the
    first one whose verifier accepts the signature.

    Args:
        message: string or bytes, The message to verify.
        signature: string or bytes, The signature on the message.
        certs: iterable, certificates in PEM format.

    Raises:
        AppIdentityError: If none of the certificates can verify the message
                          against the signature.
    """
    # any() short-circuits, so later certificates are not even parsed once
    # one of them has confirmed the signature.
    confirmed = any(
        Verifier.from_string(pem, is_x509_cert=True).verify(
            message, signature)
        for pem in certs)
    if not confirmed:
        raise AppIdentityError('Invalid token signature')
def _check_audience(payload_dict, audience):
"""Checks audience field from a JWT payload.
Does nothing if the passed in ``audience`` is null.
Args:
payload_dict: dict, A dictionary containing a JWT payload.
audience: string or NoneType, an audience to check for in
the JWT payload.
Raises:
AppIdentityError: If there is no ``'aud'`` field in the payload
dictionary but there is an ``audience`` to check.
AppIdentityError: If the ``'aud'`` field in the payload dictionary
does not match the ``audience``.
"""
if audience is None:
return
audience_in_payload = payload_dict.get('aud')
if audience_in_payload is None:
raise AppIdentityError('No aud field in token: %s' %
(payload_dict,))
if audience_in_payload != audience:
raise AppIdentityError('Wrong recipient, %s != %s: %s' %
(audience_in_payload, audience, payload_dict))
def _verify_time_range(payload_dict):
    """Validate the ``'iat'`` and ``'exp'`` claims against the local clock.

    The current time must lie between the issued-at and expiration times of
    the JWT, each widened by ``CLOCK_SKEW_SECS``, and the expiration must be
    less than ``MAX_TOKEN_LIFETIME_SECS`` from now.

    Args:
        payload_dict: dict, A dictionary containing a JWT payload.

    Raises:
        AppIdentityError: If there is no ``'iat'`` field in the payload
                          dictionary.
        AppIdentityError: If there is no ``'exp'`` field in the payload
                          dictionary.
        AppIdentityError: If the JWT expiration is too far in the future (i.e.
                          if the expiration would imply a token lifetime
                          longer than what is allowed.)
        AppIdentityError: If the token appears to have been issued in the
                          future (up to clock skew).
        AppIdentityError: If the token appears to have expired in the past
                          (up to clock skew).
    """
    # One clock reading is used for every comparison below.
    current_time = int(time.time())

    # Both claims are mandatory; 'iat' is checked first.
    iat = payload_dict.get('iat')
    if iat is None:
        raise AppIdentityError('No iat field in token: %s' % (payload_dict,))
    exp = payload_dict.get('exp')
    if exp is None:
        raise AppIdentityError('No exp field in token: %s' % (payload_dict,))

    # Reject tokens whose expiration lies at or beyond the allowed lifetime.
    if exp >= current_time + MAX_TOKEN_LIFETIME_SECS:
        raise AppIdentityError('exp field too far in future: %s' %
                               (payload_dict,))

    # Lower bound: issued-at, relaxed by the permitted clock skew.
    not_before = iat - CLOCK_SKEW_SECS
    if current_time < not_before:
        raise AppIdentityError('Token used too early, %d < %d: %s' %
                               (current_time, not_before, payload_dict))

    # Upper bound: expiration, relaxed by the permitted clock skew.
    not_after = exp + CLOCK_SKEW_SECS
    if current_time > not_after:
        raise AppIdentityError('Token used too late, %d > %d: %s' %
                               (current_time, not_after, payload_dict))
def verify_signed_jwt_with_certs(jwt, certs, audience=None):
    """Verify a JWT against public certs.

    See http://self-issued.info/docs/draft-jones-json-web-token.html.

    The token is split into its three dot-separated segments, the payload
    is decoded and parsed as JSON, and then the signature, the time range
    and (optionally) the audience are checked, in that order.

    Args:
        jwt: string, A JWT.
        certs: dict, Dictionary whose values are public keys (certificates)
               in PEM format. Only the values are used.
        audience: string, The audience, 'aud', that this JWT should contain. If
                  None then the JWT's 'aud' parameter is not verified.

    Returns:
        dict, The deserialized JSON payload in the JWT.

    Raises:
        AppIdentityError: if any checks are failed.
    """
    jwt = _to_bytes(jwt)

    # A well-formed token has exactly three segments: header.payload.signature
    if jwt.count(b'.') != 2:
        raise AppIdentityError(
            'Wrong number of segments in token: %s' % (jwt,))

    header, payload, signature = jwt.split(b'.')
    # The signature covers the still-encoded header and payload.
    message_to_sign = header + b'.' + payload
    signature = _urlsafe_b64decode(signature)

    # Parse token.
    payload_bytes = _urlsafe_b64decode(payload)
    try:
        payload_dict = json.loads(_from_bytes(payload_bytes))
    except Exception:
        # Deliberately not a bare ``except:``; KeyboardInterrupt and
        # SystemExit must propagate instead of being reported as a
        # token parse failure.
        raise AppIdentityError('Can\'t parse token: %s' % (payload_bytes,))

    # Verify that the signature matches the message.
    _verify_signature(message_to_sign, signature, certs.values())

    # Verify the issued at and expiration times in the payload.
    _verify_time_range(payload_dict)

    # Check audience.
    _check_audience(payload_dict, audience)

    return payload_dict
|