/usr/share/pyshared/txaws/client/ssl.py is in python-txaws 0.2-0ubuntu10.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 | from glob import glob
import os
import re
from OpenSSL import SSL
from OpenSSL.crypto import load_certificate, FILETYPE_PEM
from twisted.internet.ssl import CertificateOptions
# Names exported by a wildcard import of this module.
__all__ = ["VerifyingContextFactory", "get_ca_certs"]
class VerifyingContextFactory(CertificateOptions):
    """
    A SSL context factory to pass to C{connectSSL} to check for hostname
    validity.

    In addition to the chain verification requested from
    C{CertificateOptions} (C{verify=True}), the peer certificate's DNS
    C{subjectAltName} entries, or its subject C{commonName} when no DNS entry
    is present, must match the C{host} given at construction time.
    """

    def __init__(self, host, caCerts=None):
        """
        @param host: the hostname the peer certificate must be valid for.
        @param caCerts: the CA certificates handed to C{CertificateOptions};
            when C{None}, the result of C{get_global_ca_certs()} is used.
        """
        if caCerts is None:
            caCerts = get_global_ca_certs()
        CertificateOptions.__init__(self, verify=True, caCerts=caCerts)
        self.host = host

    def _dnsname_match(self, dn, host):
        """
        Tell whether C{host} matches the certificate name C{dn}.

        C{dn} is compared label by label (labels are separated by dots), and
        the whole of C{host} must match, ignoring case. A label that is
        exactly C{*} matches one or more non-dot characters; a C{*} embedded
        in a longer label matches zero or more non-dot characters.

        @return: C{True} if C{host} matches, C{False} otherwise.
        """
        pats = []
        for frag in dn.split(r"."):
            if frag == "*":
                pats.append("[^.]+")
            else:
                # Escape first so that only the wildcard keeps a special
                # meaning in the final pattern.
                frag = re.escape(frag)
                pats.append(frag.replace(r"\*", "[^.]*"))
        rx = re.compile(r"\A" + r"\.".join(pats) + r"\Z", re.IGNORECASE)
        return bool(rx.match(host))

    def verify_callback(self, connection, x509, errno, depth, preverifyOK):
        """
        Verification callback installed on the SSL context.

        For the certificate at C{depth} 0 the hostname is checked against the
        DNS entries of the first C{subjectAltName} extension; if that yields
        no DNS entry at all, the subject C{commonName} is checked instead.
        Certificates at any other depth are not name-checked.

        @return: C{False} if the hostname check fails, C{preverifyOK}
            otherwise.
        """
        # Only check depth == 0 on chained certificates.
        if depth == 0:
            dns_found = False
            if getattr(x509, "get_extension", None) is not None:
                for index in range(x509.get_extension_count()):
                    extension = x509.get_extension(index)
                    # The short name may come back as text or as bytes
                    # depending on the pyOpenSSL/Python combination, so
                    # accept both spellings.
                    if extension.get_short_name() not in (
                            "subjectAltName", b"subjectAltName"):
                        continue
                    data = str(extension)
                    for element in data.split(", "):
                        # Split on the first colon only: values such as IPv6
                        # addresses or URIs contain colons themselves, and an
                        # entry without any colon must simply be skipped
                        # rather than abort the verification with an error.
                        key, _, value = element.partition(":")
                        if key != "DNS":
                            continue
                        if self._dnsname_match(value, self.host):
                            return preverifyOK
                        dns_found = True
                    # Only the first subjectAltName extension is examined.
                    break
            if not dns_found:
                # No DNS alternative name: fall back to the common name.
                commonName = x509.get_subject().commonName
                if commonName is None:
                    return False
                if not self._dnsname_match(commonName, self.host):
                    return False
            else:
                # DNS names were present but none of them matched.
                return False
        return preverifyOK

    def _makeContext(self):
        """
        Build the context with C{CertificateOptions._makeContext} and install
        C{verify_callback} on it, requiring a peer certificate.
        """
        context = CertificateOptions._makeContext(self)
        context.set_verify(
            SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
            self.verify_callback)
        return context
def get_ca_certs(files="/etc/ssl/certs/*.pem"):
    """Retrieve a list of CAs pointed by C{files}.

    @param files: a glob pattern selecting the PEM certificate files to load.
    @return: a C{list} of the loaded certificates, holding one entry per
        distinct SHA-1 digest.
    """
    certificateAuthorityMap = {}
    for certFileName in glob(files):
        # There might be some dead symlinks in there, so let's make sure it's
        # real.
        if not os.path.exists(certFileName):
            continue
        # Read as bytes so that no text decoding is attempted, and let the
        # context manager close the file even if reading fails.
        with open(certFileName, "rb") as certFile:
            data = certFile.read()
        x509 = load_certificate(FILETYPE_PEM, data)
        digest = x509.digest("sha1")
        # Now, de-duplicate in case the same cert has multiple names.
        certificateAuthorityMap[digest] = x509
    # Return a real list, as documented, rather than a dictionary view.
    return list(certificateAuthorityMap.values())
# Module-wide cache filled on the first call to get_global_ca_certs().
_ca_certs = None


def get_global_ca_certs():
    """Return the shared CA certificates, loading them on first use.

    The result of C{get_ca_certs()} is stored in the module-level
    C{_ca_certs} and handed back unchanged on later calls.
    """
    global _ca_certs
    if _ca_certs is not None:
        return _ca_certs
    _ca_certs = get_ca_certs()
    return _ca_certs
|