/usr/share/pyshared/fedmsg/crypto/x509.py is in python-fedmsg 0.7.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# This file is part of fedmsg.
# Copyright (C) 2012 Red Hat, Inc.
#
# fedmsg is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# fedmsg is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with fedmsg; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Ralph Bean <rbean@redhat.com>
#
""" ``fedmsg.crypto.x509`` - X.509 backend for :mod:`fedmsg.crypto`. """
import os
import requests
import time
import fedmsg.crypto
import fedmsg.encoding
import logging
log = logging.getLogger(__name__)

# M2Crypto and m2ext are treated as optional.  When either import fails the
# module-level ``disabled`` flag is set to True so the rest of this module
# can detect that the crypto backend is unavailable.
try:
    import M2Crypto
    # FIXME - m2ext will be unnecessary once the following bug is closed.
    # https://bugzilla.osafoundation.org/show_bug.cgi?id=11690
    import m2ext
    disabled = False
except ImportError as e:
    # Make sure the warning below is emitted even if the application has not
    # configured logging yet.
    logging.basicConfig()
    log.warning("Crypto disabled %r", e)
    disabled = True
def sign(message, ssldir=None, certname=None, **config):
    """ Return a signed copy of ``message``.

    The returned dict holds every pair of the original message plus two
    new fields:

      - 'signature' - the base64 RSA signature of the SHA1 digest of the
        message as serialized by ``fedmsg.encoding.dumps``.
      - 'certificate' - the base64 encoding of the PEM X509 certificate read
        from ``<ssldir>/<certname>.crt``.

    When the crypto libraries could not be imported (``disabled`` is set)
    the message is handed back untouched.

    Raises ``ValueError`` if ``ssldir`` or ``certname`` is None.
    """
    if disabled:
        # No crypto backend available; pass the message through unsigned.
        return message

    if ssldir is None or certname is None:
        raise ValueError("You must set the ssldir and certname keyword arguments.")

    cert_path = "%s/%s.crt" % (ssldir, certname)
    key_path = "%s/%s.key" % (ssldir, certname)

    certificate = M2Crypto.X509.load_cert(cert_path).as_pem()

    # FIXME ? -- Opening this file requires elevated privileges in stg/prod.
    rsa_private = M2Crypto.RSA.load_key(key_path)

    # Sign the SHA1 digest of the serialized message with the private key.
    hasher = M2Crypto.EVP.MessageDigest('sha1')
    hasher.update(fedmsg.encoding.dumps(message))
    signature = rsa_private.sign(hasher.digest())

    # Build a fresh dict so the caller's message is left unmodified, then
    # attach the new authn fields.
    signed = dict(message.items())
    signed['signature'] = signature.encode('base64')
    signed['certificate'] = certificate.encode('base64')
    return signed
def validate(message, ssldir=None, **config):
    """ Return true or false if the message is signed appropriately.

    Four things must be true:

      1) The X509 cert must be signed by our CA
      2) The cert must not be in our CRL.
      3) We must be able to verify the signature using the RSA public key
         contained in the X509 cert.
      4) The topic of the message and the CN on the cert must appear in the
         :term:`routing_policy` dict.

    :param message: dict expected to carry base64 string fields
        'signature' and 'certificate', plus a 'topic'.
    :param ssldir: must not be None; it is only checked for presence here.
    :param config: optional settings read by this function:
        ``ca_cert_location``, ``ca_cert_cache``, ``ca_cert_cache_expiry``,
        ``crl_location``, ``crl_cache``, ``crl_cache_expiry``,
        ``routing_policy`` and ``routing_nitpicky``.
    :returns: True if every check passes, False (after logging a warning
        with the reason) otherwise.
    :raises ValueError: if ``ssldir`` is None.
    """
    if ssldir is None:
        raise ValueError("You must set the ssldir keyword argument.")

    def fail(reason):
        """ Log why validation failed and return False. """
        log.warning("Failed validation. %s", reason)
        return False

    if disabled:
        # Without the crypto libraries nothing below can run, so the
        # message must be rejected right here rather than falling through.
        return fail("M2Crypto and/or m2ext missing!")

    # Some sanity checking
    for field in ['signature', 'certificate']:
        if field not in message:
            return fail("No %r field found." % field)
        if not isinstance(message[field], basestring):
            return fail("msg[%r] is not a string" % field)

    # Peel off the auth datums
    signature = message['signature'].decode('base64')
    certificate = message['certificate'].decode('base64')
    message = fedmsg.crypto.strip_credentials(message)

    # Build an X509 object
    cert = M2Crypto.X509.load_cert_string(certificate)

    # Validate the cert.  Make sure it is signed by our CA.
    #   validate_certificate will one day be a part of M2Crypto.SSL.Context
    #   https://bugzilla.osafoundation.org/show_bug.cgi?id=11690
    default_ca_cert_loc = 'https://fedoraproject.org/fedmsg/ca.crt'
    cafile = _load_remote_cert(
        config.get('ca_cert_location', default_ca_cert_loc),
        config.get('ca_cert_cache', '/etc/pki/fedmsg/ca.crt'),
        config.get('ca_cert_cache_expiry', 0),
        **config)

    ctx = m2ext.SSL.Context()
    ctx.load_verify_locations(cafile=cafile)
    if not ctx.validate_certificate(cert):
        return fail("X509 certificate is not valid.")

    # Load and check against the CRL
    crl = _load_remote_cert(
        config.get('crl_location', 'https://fedoraproject.org/fedmsg/crl.pem'),
        config.get('crl_cache', '/var/cache/fedmsg/crl.pem'),
        config.get('crl_cache_expiry', 1800),
        **config)
    crl = M2Crypto.X509.load_crl(crl)

    # FIXME -- We need to check that the CRL is signed by our own CA.
    # See https://bugzilla.osafoundation.org/show_bug.cgi?id=12954#c2
    #if not ctx.validate_certificate(crl):
    #    return fail("X509 CRL is not valid.")

    # FIXME -- we check the CRL, but by doing string comparison ourselves.
    # This is not what we want to be doing.
    # There is a patch into M2Crypto to handle this for us.  We should use it
    # once its integrated upstream.
    # See https://bugzilla.osafoundation.org/show_bug.cgi?id=12954#c2
    #
    # Every 'Serial Number:' line of the CRL text dump carries one revoked
    # serial in hexadecimal after the ': ' separator.
    revoked_serials = set(
        long(line.split(': ')[1].strip(), base=16)
        for line in crl.as_text().split('\n')
        if 'Serial Number:' in line
    )
    if cert.get_serial_number() in revoked_serials:
        return fail("X509 certificate is in the Revocation List (CRL)")

    # If the cert is good, then test to see if the signature in the messages
    # matches up with the provided cert.
    rsa_public = cert.get_pubkey().get_rsa()
    digest = M2Crypto.EVP.MessageDigest('sha1')
    digest.update(fedmsg.encoding.dumps(message))
    try:
        verified = rsa_public.verify(digest.digest(), signature)
    except M2Crypto.RSA.RSAError as e:
        return fail(str(e))
    if not verified:
        return fail("RSA signature failed to validate.")

    # Now we know that the cert is valid.  The message is *authenticated*.
    # * Next step:  Authorization *

    # Load our policy from the config dict.
    routing_policy = config.get('routing_policy', {})

    # Determine the name of the signer of the message.
    # This will be something like "shell-pkgs01.stg.phx2.fedoraproject.org"
    subject = cert.get_subject()
    signer = subject.get_entries_by_nid(subject.nid['CN'])[0]\
        .get_data().as_text()

    # Perform the authz dance
    topic = message['topic']
    if topic in routing_policy:
        # We have a list of permitted senders for this topic; the signer
        # must be explicitly whitelisted in it.
        if signer not in routing_policy[topic]:
            # We have a policy for this topic and $homeboy isn't on the list.
            return fail("Authorization/routing_policy error. "
                        "Topic %r. Signer %r." % (topic, signer))
    elif config.get('routing_nitpicky', False):
        # We don't have a policy for this topic and we *are* in nitpicky
        # mode: *nobody* gets in without a pass, so fail the message.
        # (Nitpicky mode is left disabled while standing up fedmsg across
        # an environment so the policy can be built up incrementally; in
        # that case an unlisted topic is simply let through.)
        return fail("Authorization/routing_policy underspecified.")

    return True
def _load_remote_cert(location, cache, cache_expiry, **config):
    """ Return the filename of a local copy of ``location``.

    The remote file is downloaded when the local copy is missing or stale:

      - if ``cache_expiry`` is falsy, only when ``cache`` does not exist yet;
      - otherwise, when ``cache`` is older than ``cache_expiry`` seconds (a
        missing file counts as infinitely old).

    If the download fails (connection problem, timeout, HTTP error status)
    a warning is logged and the existing ``cache`` path is returned as-is;
    the file at that path may be outdated or absent.

    If ``cache`` cannot be written, the content is stored under
    ``~/.local`` + ``cache`` instead and that path is returned.

    :param location: URL to fetch.
    :param cache: preferred local filename for the cached copy.
    :param cache_expiry: maximum age of the cache in seconds; falsy means
        "never refresh an existing file".
    :param config: accepted so callers can forward their whole config dict;
        not read by this function.
    :returns: path of the local file.
    """
    try:
        modtime = os.stat(cache).st_mtime
    except OSError:
        # File does not exist yet.
        modtime = 0

    if cache_expiry:
        stale = time.time() - modtime > cache_expiry
    else:
        stale = not modtime
    if not stale:
        return cache

    # Fetch first, in its own try block, so the write fallback below never
    # runs without a response in hand.  Checking the status keeps HTTP error
    # pages from being cached as if they were certificates.
    try:
        response = requests.get(location)
        response.raise_for_status()
    except requests.exceptions.RequestException:
        log.warning("Could not access %r", location)
        return cache
    content = response.content

    try:
        # ``content`` is raw bytes, so write in binary mode.
        with open(cache, 'wb') as f:
            f.write(content)
    except IOError:
        # If we couldn't write to the specified cache location, try a
        # similar place but inside our home directory instead.
        cache = os.path.expanduser("~/.local" + cache)
        usr_dir = os.path.dirname(cache)
        if not os.path.isdir(usr_dir):
            os.makedirs(usr_dir)
        with open(cache, 'wb') as f:
            f.write(content)

    return cache
|