/usr/lib/python3/dist-packages/wormhole/_key.py is in magic-wormhole 0.10.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import print_function, absolute_import, unicode_literals
from hashlib import sha256
import six
from zope.interface import implementer
from attr import attrs, attrib
from attr.validators import provides, instance_of
from spake2 import SPAKE2_Symmetric
from hkdf import Hkdf
from nacl.secret import SecretBox
from nacl.exceptions import CryptoError
from nacl import utils
from automat import MethodicalMachine
from .util import (to_bytes, bytes_to_hexstr, hexstr_to_bytes,
bytes_to_dict, dict_to_bytes)
from . import _interfaces
# Bare reference to the imported exception class. It has no runtime effect;
# presumably it keeps linters from flagging the re-export import as unused.
CryptoError

# Public names of this module.
__all__ = [
    "derive_key",
    "derive_phase_key",
    "CryptoError",
    "Key",
]
def HKDF(skm, outlen, salt=None, CTXinfo=b""):
    """Expand ``skm`` into ``outlen`` bytes of key material.

    Thin wrapper around the ``hkdf`` package: an ``Hkdf`` object is built
    from ``salt`` and ``skm``, then asked to expand ``CTXinfo`` to
    ``outlen`` bytes.

    :param skm: source key material.
    :param outlen: number of bytes requested from ``expand``.
    :param salt: salt handed to ``Hkdf`` (``None`` by default).
    :param CTXinfo: context/info value handed to ``expand``.
    :returns: whatever ``Hkdf.expand`` returns for these arguments.
    """
    extractor = Hkdf(salt, skm)
    return extractor.expand(CTXinfo, outlen)
def derive_key(key, purpose, length=SecretBox.KEY_SIZE):
    """Derive a ``length``-byte subkey of ``key`` for a given ``purpose``.

    :param key: master key, must be a byte string.
    :param purpose: byte string passed to ``HKDF`` as its ``CTXinfo``.
    :param length: output size in bytes; defaults to ``SecretBox.KEY_SIZE``.
    :returns: the result of ``HKDF(key, length, CTXinfo=purpose)``.
    :raises TypeError: if ``key`` or ``purpose`` is not a byte string, or
        ``length`` is not an integer. The offending type is the exception
        argument.
    """
    # Checked in argument order so the first bad argument is the one reported.
    requirements = (
        (key, bytes),
        (purpose, bytes),
        (length, six.integer_types),
    )
    for value, expected in requirements:
        if not isinstance(value, expected):
            raise TypeError(type(value))
    return HKDF(key, length, CTXinfo=purpose)
def derive_phase_key(key, side, phase):
    """Derive the per-side, per-phase key from the shared ``key``.

    The purpose string handed to ``derive_key`` is ``b"wormhole:phase:"``
    followed by the SHA-256 digest of ``side`` and then the SHA-256 digest
    of ``phase`` (both ASCII-encoded first).

    :param key: master key, forwarded unchanged to ``derive_key``.
    :param side: text string identifying the sending side.
    :param phase: text string naming the message phase.
    :returns: the value returned by ``derive_key(key, purpose)``.
    :raises TypeError: if ``side`` or ``phase`` is not a text string. This
        is raised explicitly (rather than via ``assert``) so the check
        survives ``python -O`` and matches ``derive_key``'s error style.
    :raises UnicodeEncodeError: if ``side`` or ``phase`` is not pure ASCII.
    """
    # With unicode_literals in effect this is ``unicode`` on Python 2 and
    # ``str`` on Python 3.
    text_type = type("")
    if not isinstance(side, text_type):
        raise TypeError(type(side))
    if not isinstance(phase, text_type):
        raise TypeError(type(phase))
    side_bytes = side.encode("ascii")
    phase_bytes = phase.encode("ascii")
    purpose = (b"wormhole:phase:"
               + sha256(side_bytes).digest()
               + sha256(phase_bytes).digest())
    return derive_key(key, purpose)
def decrypt_data(key, encrypted):
    """Decrypt ``encrypted`` with a ``SecretBox`` built from ``key``.

    :param key: byte string of exactly ``SecretBox.KEY_SIZE`` bytes.
    :param encrypted: byte string to pass to ``SecretBox.decrypt``.
    :returns: the value returned by ``SecretBox(key).decrypt(encrypted)``.
    :raises TypeError: if ``key`` or ``encrypted`` is not a byte string.
    :raises ValueError: if ``key`` has the wrong length.

    Failures inside ``SecretBox.decrypt`` propagate unchanged (presumably
    as the ``CryptoError`` this module re-exports -- confirm against PyNaCl).

    The argument checks raise real exceptions instead of using ``assert``
    so they are still enforced under ``python -O``.
    """
    if not isinstance(key, bytes):
        raise TypeError(type(key))
    if not isinstance(encrypted, bytes):
        raise TypeError(type(encrypted))
    if len(key) != SecretBox.KEY_SIZE:
        raise ValueError("key must be %d bytes long, not %d"
                         % (SecretBox.KEY_SIZE, len(key)))
    box = SecretBox(key)
    return box.decrypt(encrypted)
def encrypt_data(key, plaintext):
    """Encrypt ``plaintext`` with a ``SecretBox`` built from ``key``.

    A fresh nonce of ``SecretBox.NONCE_SIZE`` bytes is drawn from
    ``nacl.utils.random`` for every call and passed to ``SecretBox.encrypt``.

    :param key: byte string of exactly ``SecretBox.KEY_SIZE`` bytes.
    :param plaintext: byte string to encrypt.
    :returns: the value returned by ``SecretBox(key).encrypt(plaintext, nonce)``.
    :raises TypeError: if ``key`` or ``plaintext`` is not a byte string.
    :raises ValueError: if ``key`` has the wrong length.

    The argument checks raise real exceptions instead of using ``assert``
    so they are still enforced under ``python -O``.
    """
    if not isinstance(key, bytes):
        raise TypeError(type(key))
    if not isinstance(plaintext, bytes):
        raise TypeError(type(plaintext))
    if len(key) != SecretBox.KEY_SIZE:
        raise ValueError("key must be %d bytes long, not %d"
                         % (SecretBox.KEY_SIZE, len(key)))
    box = SecretBox(key)
    nonce = utils.random(SecretBox.NONCE_SIZE)
    return box.encrypt(plaintext, nonce)
# the Key we expose to callers (Boss, Ordering) is responsible for sorting
# the two messages (got_code and got_pake), then delivering them to
# _SortedKey in the right order.
@attrs
@implementer(_interfaces.IKey)
class Key(object):
    """Order-insensitive front end for ``_SortedKey``.

    ``got_code`` and ``got_pake`` may be fed in either order. This machine
    forwards them to the wrapped ``_SortedKey`` so that the code is always
    delivered before the PAKE body: a body that arrives early is stashed
    and replayed immediately after the code.
    """

    _appid = attrib(validator=instance_of(type(u"")))
    _versions = attrib(validator=instance_of(dict))
    _side = attrib(validator=instance_of(type(u"")))
    _timing = attrib(validator=provides(_interfaces.ITiming))

    m = MethodicalMachine()
    # Falls back to a no-op when the machine has no ``_setTrace`` attribute.
    set_trace = getattr(m, "_setTrace", lambda self, f: None)

    def __attrs_post_init__(self):
        """Create the inner ``_SortedKey`` from this object's own settings."""
        inner = _SortedKey(
            self._appid,
            self._versions,
            self._side,
            self._timing,
        )
        self._SK = inner
        self._debug_pake_stashed = False  # for tests

    def wire(self, boss, mailbox, receive):
        """Pass the collaborating objects through to the inner machine."""
        self._SK.wire(boss, mailbox, receive)

    # State names read as S<code seen><pake seen>.
    @m.state(initial=True)
    def S00(self): pass  # pragma: no cover

    @m.state()
    def S01(self): pass  # pragma: no cover

    @m.state()
    def S10(self): pass  # pragma: no cover

    @m.state()
    def S11(self): pass  # pragma: no cover

    @m.input()
    def got_code(self, code): pass

    @m.input()
    def got_pake(self, body): pass

    @m.output()
    def stash_pake(self, body):
        """Hold on to a PAKE body that arrived before the code."""
        self._pake = body
        self._debug_pake_stashed = True

    @m.output()
    def deliver_code(self, code):
        """Forward the code to the inner machine."""
        self._SK.got_code(code)

    @m.output()
    def deliver_pake(self, body):
        """Forward a PAKE body to the inner machine."""
        self._SK.got_pake(body)

    @m.output()
    def deliver_code_and_stashed_pake(self, code):
        """Forward the code, then the previously stashed PAKE body."""
        inner = self._SK
        inner.got_code(code)
        inner.got_pake(self._pake)

    # Code first: both messages are passed straight through.
    S00.upon(got_code, enter=S10, outputs=[deliver_code])
    S10.upon(got_pake, enter=S11, outputs=[deliver_pake])
    # PAKE first: stash it until the code shows up, then deliver both.
    S00.upon(got_pake, enter=S01, outputs=[stash_pake])
    S01.upon(got_code, enter=S11, outputs=[deliver_code_and_stashed_pake])
@attrs
class _SortedKey(object):
    """State machine that turns a code plus the peer's PAKE message into a key.

    Inputs are expected in order: ``got_code`` first, then ``got_pake``.
    On ``got_code`` it starts SPAKE2 and posts its own "pake" message; on a
    well-formed peer message it finishes SPAKE2, reports the key to the
    boss and receiver, and posts the encrypted "version" message. A peer
    message without a ``pake_v1`` entry moves the machine to the terminal
    scared state.
    """

    _appid = attrib(validator=instance_of(type(u"")))
    _versions = attrib(validator=instance_of(dict))
    _side = attrib(validator=instance_of(type(u"")))
    _timing = attrib(validator=provides(_interfaces.ITiming))

    m = MethodicalMachine()
    # Falls back to a no-op when the machine has no ``_setTrace`` attribute.
    set_trace = getattr(m, "_setTrace", lambda self, f: None)

    def wire(self, boss, mailbox, receive):
        """Adapt and remember the boss, mailbox and receive collaborators."""
        self._B = _interfaces.IBoss(boss)
        self._M = _interfaces.IMailbox(mailbox)
        self._R = _interfaces.IReceive(receive)

    @m.state(initial=True)
    def S0_know_nothing(self): pass  # pragma: no cover

    @m.state()
    def S1_know_code(self): pass  # pragma: no cover

    @m.state()
    def S2_know_key(self): pass  # pragma: no cover

    @m.state(terminal=True)
    def S3_scared(self): pass  # pragma: no cover

    # from Boss
    @m.input()
    def got_code(self, code): pass

    # from Ordering
    def got_pake(self, body):
        """Decode a peer PAKE body and feed the matching machine input.

        Not a machine input itself: it dispatches to ``got_pake_good`` when
        the decoded payload has a ``pake_v1`` entry, else ``got_pake_bad``.
        """
        assert isinstance(body, type(b"")), type(body)
        payload = bytes_to_dict(body)
        if "pake_v1" not in payload:
            self.got_pake_bad()
            return
        peer_msg = hexstr_to_bytes(payload["pake_v1"])
        self.got_pake_good(peer_msg)

    @m.input()
    def got_pake_good(self, msg2): pass

    @m.input()
    def got_pake_bad(self): pass

    @m.output()
    def build_pake(self, code):
        """Start SPAKE2 with ``code`` and post our "pake" message."""
        with self._timing.add("pake1", waiting="crypto"):
            self._sp = SPAKE2_Symmetric(
                to_bytes(code),
                idSymmetric=to_bytes(self._appid),
            )
            msg1 = self._sp.start()
        hex_msg = bytes_to_hexstr(msg1)
        self._M.add_message("pake", dict_to_bytes({"pake_v1": hex_msg}))

    @m.output()
    def scared(self):
        """Tell the boss that the peer's PAKE message was unusable."""
        self._B.scared()

    @m.output()
    def compute_key(self, msg2):
        """Finish SPAKE2 with the peer's message and announce the key.

        The boss is told first, then the encrypted "version" message is
        posted to the mailbox, and finally the receiver is given the key.
        """
        assert isinstance(msg2, type(b""))
        with self._timing.add("pake2", waiting="crypto"):
            key = self._sp.finish(msg2)
        self._B.got_key(key)
        # Our versions dict is sent encrypted under the "version" phase key.
        version_key = derive_phase_key(key, self._side, "version")
        encrypted = encrypt_data(version_key, dict_to_bytes(self._versions))
        self._M.add_message("version", encrypted)
        self._R.got_key(key)

    S0_know_nothing.upon(got_code, enter=S1_know_code, outputs=[build_pake])
    S1_know_code.upon(got_pake_good, enter=S2_know_key,
                      outputs=[compute_key])
    S1_know_code.upon(got_pake_bad, enter=S3_scared, outputs=[scared])
|