/usr/lib/python3/dist-packages/provisioningserver/security.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2014-2017 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Cluster security code."""
# Names exported by a star-import of this module. Other public helpers
# defined below are not listed here and must be imported by name.
__all__ = [
"calculate_digest",
"get_shared_secret_filesystem_path",
"get_shared_secret_from_filesystem",
]
from base64 import (
urlsafe_b64decode,
urlsafe_b64encode,
)
import binascii
from binascii import (
a2b_hex,
b2a_hex,
)
import errno
from hashlib import sha256
from hmac import HMAC
from os import (
fchmod,
makedirs,
)
from os.path import dirname
from sys import (
stderr,
stdin,
)
from threading import Lock
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from provisioningserver.path import get_data_path
from provisioningserver.utils.fs import (
FileLock,
read_text_file,
write_text_file,
)
class MissingSharedSecret(RuntimeError):
    """Signals that no MAAS shared secret could be found.

    Raised by the Fernet key derivation code in this module when the secret
    file does not exist on the filesystem.
    """
def to_hex(b):
    """Encode a byte string as hexadecimal text.

    :param b: The byte string to encode.
    :return: A `str` holding the hexadecimal representation of `b`.
    """
    assert isinstance(b, bytes), "%r is not a byte string" % (b,)
    hex_bytes = b2a_hex(b)
    return str(hex_bytes, "ascii")
def to_bin(u):
    """Decode hexadecimal text into the byte string it represents.

    Leading and trailing ASCII whitespace is ignored.

    :param u: An ASCII-only `str` containing hex digits.
    :return: The decoded `bytes`.
    """
    assert isinstance(u, str), "%r is not a unicode string" % (u,)
    # Encode first, then strip: only ASCII whitespace is removed, and any
    # non-ASCII character is rejected by the encode step.
    ascii_hex = u.encode("ascii")
    trimmed = ascii_hex.strip()
    return a2b_hex(trimmed)
def get_shared_secret_filesystem_path():
    """Return the filesystem location of the shared-secret file.

    The location is built by `get_data_path` from the path components
    ``var/lib/maas/secret``.
    """
    components = ("var", "lib", "maas", "secret")
    return get_data_path(*components)
def get_shared_secret_from_filesystem():
    """Read the shared secret from the filesystem.

    The file is located via `get_shared_secret_filesystem_path`. Its parent
    directory (and any missing ancestors) is created if necessary, and the
    file is read while holding a `FileLock` on the secret path.

    :return: The secret as a byte string of arbitrary length, or `None` when
        the secret file does not exist.
    :raise IOError: If reading fails for any reason other than the file
        being absent.
    """
    path = get_shared_secret_filesystem_path()
    makedirs(dirname(path), exist_ok=True)
    with FileLock(path).wait(10):
        try:
            hex_text = read_text_file(path)
        except IOError as error:
            # Only a missing file means "no secret installed"; anything
            # else is a genuine failure and is propagated.
            if error.errno != errno.ENOENT:
                raise
            return None
        return to_bin(hex_text)
def set_shared_secret_on_filesystem(secret):
    """Store the shared secret on the filesystem, hex-encoded.

    The destination is given by `get_shared_secret_filesystem_path`. Its
    parent directory (and any missing ancestors) is created if necessary.
    The write happens while holding a `FileLock` on the secret path.

    :param secret: The secret to store.
    :type secret: A byte string of arbitrary length.
    """
    path = get_shared_secret_filesystem_path()
    makedirs(dirname(path), exist_ok=True)
    hex_text = to_hex(secret)
    with FileLock(path).wait(10):
        # Opening in append mode creates the file when absent without
        # truncating an existing one; the handle is only used to set the
        # permission bits to 0o640.
        with open(path, "ab") as handle:
            fchmod(handle.fileno(), 0o640)
        # Now store the hex-encoded secret itself.
        write_text_file(path, hex_text)
def calculate_digest(secret, message, salt):
    """Return the SHA-256 HMAC of `message` followed by `salt`.

    :param secret: The HMAC key, as a byte string.
    :param message: The message to authenticate, as a byte string.
    :param salt: Extra bytes fed into the HMAC after the message.
    :return: The raw (binary) digest, as `bytes`.
    """
    assert isinstance(secret, bytes), "%r is not a byte string." % (secret,)
    assert isinstance(message, bytes), "%r is not byte string." % (message,)
    assert isinstance(salt, bytes), "%r is not a byte string." % (salt,)
    # Seeding the HMAC with the message and then updating with the salt is
    # the same as feeding the two in sequence.
    mac = HMAC(secret, msg=message, digestmod=sha256)
    mac.update(salt)
    return mac.digest()
# PBKDF2 iteration count used when deriving the Fernet key from the MAAS
# secret.
# Warning: do not change this lightly. Every peer must use the same value,
# otherwise a MAAS server cannot communicate with them. It is kept fairly
# high so that brute-forcing the MAAS secret is impractical.
DEFAULT_ITERATION_COUNT = 100000

# The derived Fernet pre-shared key is expensive to compute, so it is cached
# here after the first derivation; `_fernet_lock` guards access to the cache.
# Note: if salts are ever supported, this cache has to become a dictionary.
_fernet_psk = None
_fernet_lock = Lock()
def _get_or_create_fernet_psk():
    """Return the Fernet pre-shared key, deriving it on first use.

    The key is derived from the MAAS secret (typically /var/lib/maas/secret)
    with PBKDF2-HMAC-SHA256 and then stored in the module-level cache, so
    the derivation cost is only paid once.

    :return: A pre-shared key suitable for use with the Fernet class.
    :raise MissingSharedSecret: If no MAAS secret is installed.
    """
    global _fernet_psk
    with _fernet_lock:
        if _fernet_psk is not None:
            # Already derived earlier; reuse the cached key.
            return _fernet_psk
        secret = get_shared_secret_from_filesystem()
        if secret is None:
            raise MissingSharedSecret("MAAS shared secret not found.")
        # PBKDF2 requires its keying material to be a byte string.
        derivation = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            # XXX: The maas_id might make a better salt, but all parties to
            # the encrypted communication would need to know it in advance,
            # and the format of the cached pre-shared key would also have
            # to change.
            salt=b"",
            # XXX: A variable (infrequently-changing) iteration count might
            # be nice, but it would need protocol support and a change to
            # the way the PSK is cached.
            iterations=DEFAULT_ITERATION_COUNT,
            backend=default_backend()
        )
        raw_key = derivation.derive(secret)
        _fernet_psk = urlsafe_b64encode(raw_key)
        return _fernet_psk
def _get_fernet_context():
    """Build a `Fernet` instance keyed with the MAAS-secret-derived PSK."""
    return Fernet(_get_or_create_fernet_psk())
def fernet_encrypt_psk(message, raw=False):
    """Encrypt `message` into a Fernet token using the MAAS secret.

    A Fernet token embeds the current time. Whoever decrypts the token may
    specify a TTL (in seconds) bounding how long the encrypted message is
    considered valid, so the system clock must be correct before this
    function is called.

    :param message: The message to encrypt.
    :type message: Must be of type 'bytes' or a UTF-8 'str'.
    :param raw: if True, returns the decoded base64 bytes representing the
        Fernet token. Those bytes must be converted back to base64 before
        they can be decrypted. (Or the 'raw' argument on the corresponding
        fernet_decrypt_psk() function can be used.)
    :return: the encryption token, as a base64-encoded byte string (or as
        the base64-decoded bytes when `raw` is True).
    """
    context = _get_fernet_context()
    payload = message.encode("utf-8") if isinstance(message, str) else message
    encrypted = context.encrypt(payload)
    # Only the literal True selects raw output, not merely a truthy value.
    return urlsafe_b64decode(encrypted) if raw is True else encrypted
def fernet_decrypt_psk(token, ttl=None, raw=False):
    """Decrypt a Fernet token using the MAAS secret.

    The plaintext is returned as a byte string; the caller is responsible
    for converting it to the correct format or encoding.

    :param token: The token to decrypt.
    :type token: Must be of type 'bytes', or an ASCII base64 string.
    :param ttl: Optional amount of time (in seconds) allowed to have elapsed
        before the message is rejected upon decryption. Note that the Fernet
        library considers times up to 60 seconds into the future (beyond the
        TTL) to be valid.
    :param raw: if True, treats `token` as the decoded base64 bytes of a
        Fernet token, and encodes them (as expected by the Fernet APIs)
        before decrypting.
    :return: bytes
    """
    # Only the literal True selects raw input, not merely a truthy value.
    encoded = urlsafe_b64encode(token) if raw is True else token
    context = _get_fernet_context()
    if isinstance(encoded, str):
        encoded = encoded.encode("ascii")
    return context.decrypt(encoded, ttl=ttl)
class InstallSharedSecretScript:
    """Install a shared-secret onto a cluster.

    This class conforms to the contract that :py:func:`MainScript.register`
    requires.
    """

    @staticmethod
    def add_arguments(parser):
        """Initialise options for storing a shared-secret.

        No options are defined; `parser` is left untouched.

        :param parser: An instance of :class:`ArgumentParser`.
        """

    @staticmethod
    def run(args):
        """Install a shared-secret to this cluster.

        When invoked interactively, you'll be prompted to enter the secret.
        Otherwise the secret will be read from the first line of stdin.

        In both cases, the secret must be hex/base16 encoded.

        :param args: Parsed command-line arguments (unused).
        :raise SystemExit: Always, unless interrupted from the keyboard:
            code 0 once the secret is installed, code 1 when the prompt hits
            end-of-file or the secret cannot be decoded.
        :raise KeyboardInterrupt: If interrupted at the interactive prompt.
        """
        # Obtain the secret from the invoker.
        if stdin.isatty():
            try:
                secret_hex = input("Secret (hex/base16 encoded): ")
            except EOFError:
                print()  # So that the shell prompt appears on the next line.
                raise SystemExit(1)
            except KeyboardInterrupt:
                print()  # So that the shell prompt appears on the next line.
                raise
        else:
            secret_hex = stdin.readline()
        # Decode and install the secret.
        try:
            secret = to_bin(secret_hex.strip())
        except (binascii.Error, UnicodeEncodeError) as error:
            # to_bin() encodes to ASCII before hex-decoding, so non-ASCII
            # input fails with UnicodeEncodeError rather than binascii.Error.
            # Both mean the same thing to the user: a malformed secret.
            print("Secret could not be decoded:", str(error), file=stderr)
            raise SystemExit(1)
        else:
            set_shared_secret_on_filesystem(secret)
            shared_secret_path = get_shared_secret_filesystem_path()
            print("Secret installed to %s." % shared_secret_path)
            raise SystemExit(0)
class CheckForSharedSecretScript:
    """Report whether a shared-secret is present on a cluster.

    This class conforms to the contract that :py:func:`MainScript.register`
    requires.
    """

    @staticmethod
    def add_arguments(parser):
        """Initialise options for checking the presence of a shared-secret.

        No options are defined; `parser` is left untouched.

        :param parser: An instance of :class:`ArgumentParser`.
        """

    @staticmethod
    def run(args):
        """Check for the presence of a shared-secret on this cluster.

        Prints the outcome and exits 0 (zero) if a shared-secret has been
        installed, or 1 if it has not.
        """
        installed = get_shared_secret_from_filesystem() is not None
        if installed:
            print("Shared-secret is installed.")
            raise SystemExit(0)
        print("Shared-secret is NOT installed.")
        raise SystemExit(1)
|