This file is indexed.

/usr/share/pyshared/txaws/client/ssl.py is in python-txaws 0.2-0ubuntu10.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
from glob import glob
import os
import re

from OpenSSL import SSL
from OpenSSL.crypto import load_certificate, FILETYPE_PEM

from twisted.internet.ssl import CertificateOptions


__all__ = ["VerifyingContextFactory", "get_ca_certs"]


class VerifyingContextFactory(CertificateOptions):
    """
    A SSL context factory to pass to C{connectSSL} to check for hostname
    validity.

    On top of the chain verification configured through
    C{CertificateOptions}, the leaf certificate must name C{host}: either in
    one of the C{DNS} entries of its C{subjectAltName} extension or, when the
    certificate carries no such entries, in its subject C{commonName}.

    @ivar host: The hostname the peer certificate is checked against.
    """

    def __init__(self, host, caCerts=None):
        """
        @param host: The hostname the peer certificate must match.
        @param caCerts: The trusted CA certificates, handed unchanged to
            C{CertificateOptions}. When C{None}, the result of
            L{get_global_ca_certs} is used.
        """
        if caCerts is None:
            caCerts = get_global_ca_certs()
        CertificateOptions.__init__(self, verify=True, caCerts=caCerts)
        self.host = host

    def _dnsname_match(self, dn, host):
        """
        Tell whether C{host} matches the certificate name pattern C{dn}.

        The comparison is case-insensitive and anchored at both ends. A label
        that is exactly C{*} matches one non-empty label; a C{*} embedded in
        a label matches any run of characters other than a dot (possibly
        empty).

        @param dn: A DNS name taken from the certificate, possibly containing
            wildcards.
        @param host: The hostname to test.
        @return: C{True} when C{host} matches C{dn}, C{False} otherwise.
        """
        pats = []
        for frag in dn.split(r"."):
            if frag == "*":
                pats.append("[^.]+")
            else:
                # Escape first so only the wildcard keeps a regex meaning.
                frag = re.escape(frag)
                pats.append(frag.replace(r"\*", "[^.]*"))

        rx = re.compile(r"\A" + r"\.".join(pats) + r"\Z", re.IGNORECASE)
        return bool(rx.match(host))

    def _get_alt_dns_names(self, x509):
        """
        Collect the C{DNS} entries of the first C{subjectAltName} extension.

        @param x509: The certificate to inspect.
        @return: A list of DNS names; empty when the certificate object does
            not expose extensions, has no C{subjectAltName} extension, or
            that extension holds no C{DNS} entry.
        """
        names = []
        # Some certificate objects do not provide extension access at all.
        if getattr(x509, "get_extension", None) is None:
            return names
        for index in range(x509.get_extension_count()):
            extension = x509.get_extension(index)
            # The short name may come back as text or as bytes depending on
            # the OpenSSL binding in use, so accept both spellings.
            if extension.get_short_name() not in (
                    "subjectAltName", b"subjectAltName"):
                continue
            for element in str(extension).split(", "):
                # Split on the first colon only: values such as IPv6
                # addresses or URIs contain colons themselves.
                key, _, value = element.partition(":")
                if key == "DNS":
                    names.append(value)
            # Only the first subjectAltName extension is considered.
            break
        return names

    def verify_callback(self, connection, x509, errno, depth, preverifyOK):
        """
        Verification callback installed on the SSL context.

        Certificates above the leaf (C{depth != 0}) are accepted or rejected
        solely according to C{preverifyOK}. For the leaf certificate the
        hostname is checked as well.

        @param connection: The connection being verified (unused).
        @param x509: The certificate currently being checked.
        @param errno: The verification error number (unused).
        @param depth: The position of C{x509} in the chain; C{0} is the leaf.
        @param preverifyOK: The verification result computed so far.
        @return: C{preverifyOK} when the hostname check passes or does not
            apply, C{False} when the hostname does not match.
        """
        # Only check depth == 0 on chained certificates.
        if depth != 0:
            return preverifyOK

        dns_names = self._get_alt_dns_names(x509)
        if dns_names:
            # When DNS alternative names exist they are authoritative: the
            # commonName is not consulted as a fallback.
            for name in dns_names:
                if self._dnsname_match(name, self.host):
                    return preverifyOK
            return False

        commonName = x509.get_subject().commonName
        if commonName is None:
            return False
        if not self._dnsname_match(commonName, self.host):
            return False
        return preverifyOK

    def _makeContext(self):
        """
        Build the context from C{CertificateOptions} and install
        L{verify_callback} on it, requiring the peer to present a
        certificate.

        @return: The configured SSL context.
        """
        context = CertificateOptions._makeContext(self)
        context.set_verify(
            SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
            self.verify_callback)
        return context


def get_ca_certs(files="/etc/ssl/certs/*.pem"):
    """Retrieve a list of CAs pointed by C{files}.

    @param files: A glob pattern selecting the PEM files to load.
    @return: A list with one loaded certificate per distinct SHA-1 digest;
        empty when the pattern matches no existing file.
    """
    certificateAuthorityMap = {}
    for certFileName in glob(files):
        # There might be some dead symlinks in there, so let's make sure it's
        # real.
        if not os.path.exists(certFileName):
            continue
        # Read raw bytes so stray non-ASCII text in a PEM file cannot fail on
        # locale decoding; the context manager closes the file even if the
        # read raises.
        with open(certFileName, "rb") as certFile:
            data = certFile.read()
        x509 = load_certificate(FILETYPE_PEM, data)
        digest = x509.digest("sha1")
        # Now, de-duplicate in case the same cert has multiple names.
        certificateAuthorityMap[digest] = x509
    # Materialize a real list, as documented, rather than a dict view.
    return list(certificateAuthorityMap.values())


# Module-level cache filled on the first call to get_global_ca_certs().
_ca_certs = None


def get_global_ca_certs():
    """
    Return the shared CA certificates, loading them on first use.

    The result of L{get_ca_certs} (called with its default arguments) is
    stored in the module-level C{_ca_certs} and handed back unchanged on
    every later call.
    """
    global _ca_certs
    if _ca_certs is not None:
        return _ca_certs
    _ca_certs = get_ca_certs()
    return _ca_certs