This file is indexed.

/usr/lib/python2.7/dist-packages/osc/oscssl.py is in osc 0.143.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# Copyright (C) 2009 Novell Inc.
# This program is free software; it may be used, copied, modified
# and distributed under the terms of the GNU General Public Licence,
# either version 2, or (at your option) any later version.

from __future__ import print_function

import M2Crypto.httpslib
from M2Crypto.SSL.Checker import SSLVerificationError
from M2Crypto import m2, SSL
import M2Crypto.m2urllib2
import socket
import sys

try:
    from urllib.parse import urlparse, splithost, splitport, splittype
    from urllib.request import addinfourl
    from http.client import HTTPSConnection
except ImportError:
    #python 2.x
    from urlparse import urlparse
    from urllib import addinfourl, splithost, splitport, splittype
    from httplib import HTTPSConnection

from .core import raw_input

class TrustedCertStore:
    """Per-host store of server certificates the user decided to trust.

    A certificate can be trusted for the lifetime of the process (kept in
    the class-level ``_tmptrusted`` dict) or permanently (saved as a PEM
    file below ``~/.config/<app>/trusted-certs``).
    """

    # Maps the "host[_port]" key to the certificate trusted for this process.
    # This is a class attribute on purpose: every instance shares the same
    # temporary trust decisions.
    _tmptrusted = {}

    def __init__(self, host, port, app, cert):
        """Describe the store entry for one host/port/certificate.

        host -- server host name, must not be None
        port -- server port; when truthy it is appended to the key as
                "_<port>" (formatted with %d, so it has to be a number)
        app  -- application name used to build the config directory
        cert -- certificate presented by the server; this class calls its
                as_pem() and save_pem() methods (presumably an M2Crypto
                X509 object -- confirm against the callers)

        Raises Exception("empty host") when host is None.
        """
        import os

        if host is None:
            raise Exception("empty host")

        self.cert = cert
        self.host = host
        if port:
            self.host += "_%d" % port
        self.dir = os.path.expanduser('~/.config/%s/trusted-certs' % app)
        self.file = self.dir + '/%s.pem' % self.host

    def is_known(self):
        """Return True if any certificate is stored for this host key,
        either temporarily or as a file on disk."""
        import os
        return self.host in self._tmptrusted or os.path.exists(self.file)

    def is_trusted(self):
        """Return True if the stored certificate is identical to self.cert.

        The temporary store takes precedence over the file on disk.  The
        comparison is done on the PEM representation.
        """
        import os
        if self.host in self._tmptrusted:
            stored = self._tmptrusted[self.host]
        elif os.path.exists(self.file):
            from M2Crypto import X509
            stored = X509.load_cert(self.file)
        else:
            return False
        return self.cert.as_pem() == stored.as_pem()

    def trust_tmp(self):
        """Trust self.cert for this host key until the process exits."""
        self._tmptrusted[self.host] = self.cert

    def trust_always(self):
        """Trust self.cert now and save it as a PEM file for later runs."""
        import os
        self.trust_tmp()
        # Create the directory without a separate exists() check so that a
        # concurrent process creating it at the same time is not an error.
        try:
            os.makedirs(self.dir)
        except OSError:
            if not os.path.isdir(self.dir):
                raise
        self.cert.save_pem(self.file)


# verify_cb is called once for each verification error.
# We only collect the errors and return success; the connection
# will be aborted later if it needs to be.
def verify_cb(ctx, ok, store):
    """Certificate verification callback that records failures on ctx.

    ctx   -- object with a ``verrs`` attribute; a ValidationErrors instance
             is created there when the attribute is falsy
    ok    -- falsy when the current verification step failed
    store -- object providing get_current_cert(), get_error() and
             get_error_depth() for the failing step

    Returns 1 normally (even when a failure was recorded) and 0 only if
    recording itself raised; that exception is printed to stderr.
    """
    if not ctx.verrs:
        ctx.verrs = ValidationErrors()

    try:
        if ok:
            return 1
        failed_cert = store.get_current_cert()
        error_code = store.get_error()
        error_depth = store.get_error_depth()
        ctx.verrs.record(failed_cert, error_code, error_depth)
    except Exception as exc:
        print(exc, file=sys.stderr)
        return 0
    return 1

class FailCert:
    """A certificate together with the verification errors seen for it."""

    def __init__(self, cert):
        # The error list starts empty; ValidationErrors.record() appends the
        # error codes to it.
        self.cert, self.errs = cert, []

class ValidationErrors:
    """Collects certificate verification failures, grouped by chain depth.

    Attributes:
    chain_ok -- False once a failure at a depth other than 0 was recorded
    cert_ok  -- False once a failure at depth 0 was recorded
    failures -- dict mapping depth to a FailCert (certificate + error codes)
    """

    def __init__(self):
        self.chain_ok = True
        self.cert_ok = True
        self.failures = {}

    def record(self, cert, err, depth):
        """Remember that verifying cert at the given depth failed with err.

        Raises Exception if a different certificate (compared by
        get_fingerprint()) was already recorded for the same depth.
        """
        if depth == 0:
            self.cert_ok = False
        else:
            self.chain_ok = False

        if depth not in self.failures:
            self.failures[depth] = FailCert(cert)
        elif self.failures[depth].cert.get_fingerprint() != cert.get_fingerprint():
            raise Exception("Certificate changed unexpectedly. This should not happen")
        self.failures[depth].errs.append(err)

    def show(self, out):
        """Print a description of every recorded failure to the file out.

        Failures are listed in ascending depth order so that the output does
        not depend on the dict iteration order of the running interpreter.
        """
        for depth in sorted(self.failures):
            failure = self.failures[depth]
            cert = failure.cert
            print("*** certificate verify failed at depth %d" % depth, file=out)
            print("Subject: ", cert.get_subject(), file=out)
            print("Issuer:  ", cert.get_issuer(), file=out)
            print("Valid: ", cert.get_not_before(), "-", cert.get_not_after(), file=out)
            print("Fingerprint(MD5):  ", cert.get_fingerprint('md5'), file=out)
            print("Fingerprint(SHA1): ", cert.get_fingerprint('sha1'), file=out)

            for err in failure.errs:
                reason = "Unknown"
                try:
                    import M2Crypto.Err
                    reason = M2Crypto.Err.get_x509_verify_error(err)
                except Exception:
                    # Best effort only: keep "Unknown" when the error text
                    # cannot be looked up.  KeyboardInterrupt and SystemExit
                    # are deliberately not swallowed here.
                    pass
                print("Reason:", reason, file=out)

    # check if the encountered errors could be ignored
    def could_ignore(self):
        """Return True if the user may be offered to ignore the failures.

        Only the failures recorded at depth 0 are inspected; when there are
        none the result is True.  Otherwise every depth 0 error code has to
        be one of the non-fatal codes listed below.
        """
        if 0 not in self.failures:
            return True

        nonfatal_errors = (
                m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
                m2.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN,
                m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
                m2.X509_V_ERR_CERT_UNTRUSTED,
                m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE,

                m2.X509_V_ERR_CERT_NOT_YET_VALID,
                m2.X509_V_ERR_CERT_HAS_EXPIRED,
                m2.X509_V_OK,
                )

        return all(err in nonfatal_errors for err in self.failures[0].errs)

class mySSLContext(SSL.Context):
    """SSL context that records verification errors instead of aborting.

    The verify callback stores every failure in ``self.verrs`` (see
    verify_cb); verify_certificate() evaluates them after the handshake.
    """

    def __init__(self):
        SSL.Context.__init__(self, 'sslv23')
        # SSLv2 and SSLv3 are switched off explicitly.
        self.set_options(m2.SSL_OP_NO_SSLv2 | m2.SSL_OP_NO_SSLv3)
        # RC4 used to be enabled here.  It is excluded now because RFC 7465
        # prohibits RC4 cipher suites in TLS.
        self.set_cipher_list("ECDHE-RSA-AES128-SHA256:AES128-GCM-SHA256:HIGH:!RC4:!MD5:!aNULL:!EDH")
        self.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
        # Filled with a ValidationErrors instance by verify_cb on demand.
        self.verrs = None
        #self.set_info_callback() # debug
        self.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, depth=9, callback=lambda ok, store: verify_cb(self, ok, store))

class myHTTPSHandler(M2Crypto.m2urllib2.HTTPSHandler):
    """HTTPS handler that opens its connections through myHTTPSConnection
    or myProxyHTTPSConnection, which verify the server certificate."""
    handler_order = 499
    saved_session = None

    def __init__(self, *args, **kwargs):
        # 'appname' is our own keyword; remove it before the base class
        # gets to see the keyword arguments.
        self.appname = kwargs.pop('appname', 'generic')
        M2Crypto.m2urllib2.HTTPSHandler.__init__(self, *args, **kwargs)

    # copied from M2Crypto.m2urllib2.HTTPSHandler
    # its sole purpose is to use our myHTTPSConnection/myProxyHTTPSConnection
    # classes. Ideally the m2urllib2.HTTPSHandler.https_open() method would
    # be split into "do_open()" and "https_open()" so that we just need to
    # override the small "https_open()" method...
    def https_open(self, req):
        """Send the request req over HTTPS and return an addinfourl response.

        Raises M2Crypto.m2urllib2.URLError when req has no host or when a
        socket.error occurs while talking to the server.
        """
        host = req.get_host()
        if not host:
            raise M2Crypto.m2urllib2.URLError('no host given: ' + req.get_full_url())

        # Our change: Check to see if we're using a proxy.
        # Then create an appropriate ssl-aware connection.
        full_url = req.get_full_url()
        target_host = urlparse(full_url)[1]

        if target_host == host:
            conn = myHTTPSConnection(host=host, appname=self.appname, ssl_context=self.ctx)
            selector = req.get_selector()
        else:
            # The request host differs from the host in the URL, so the
            # request goes through a proxy.
            conn = myProxyHTTPSConnection(host=host, appname=self.appname, ssl_context=self.ctx)
            # M2Crypto.ProxyHTTPSConnection.putrequest expects a fullurl
            selector = full_url
        # End our change
        conn.set_debuglevel(self._debuglevel)
        if self.saved_session:
            conn.set_session(self.saved_session)

        headers = dict(req.headers)
        headers.update(req.unredirected_hdrs)
        # We want to make an HTTP/1.1 request, but the addinfourl
        # class isn't prepared to deal with a persistent connection.
        # It will try to read all remaining data from the socket,
        # which will block while the server waits for the next request.
        # So make sure the connection gets closed after the (only)
        # request.
        headers["Connection"] = "close"
        try:
            conn.request(req.get_method(), selector, req.data, headers)
            session = conn.get_session()
            if session:
                # remember the session so the next connection can reuse it
                self.saved_session = session
            response = conn.getresponse()
        except socket.error as err: # XXX what error?
            err.filename = full_url
            raise M2Crypto.m2urllib2.URLError(err)

        # Pick apart the HTTPResponse object to get the addinfourl
        # object initialized properly.

        # Wrap the HTTPResponse object in socket's file object adapter
        # for Windows.  That adapter calls recv(), so delegate recv()
        # to read().  This weird wrapping allows the returned object to
        # have readline() and readlines() methods.

        # XXX It might be better to extract the read buffering code
        # out of socket._fileobject() and into a base class.

        response.recv = response.read
        wrapped = socket._fileobject(response)

        result = addinfourl(wrapped, response.msg, req.get_full_url())
        result.code = response.status
        result.msg = response.reason
        return result

class myHTTPSConnection(M2Crypto.httpslib.HTTPSConnection):
    """Direct HTTPS connection that verifies the server certificate right
    after connecting."""

    def __init__(self, *args, **kwargs):
        # 'appname' is consumed here; everything else goes to the base class.
        appname = kwargs.pop('appname', 'generic')
        self.appname = appname
        M2Crypto.httpslib.HTTPSConnection.__init__(self, *args, **kwargs)

    def connect(self, *args):
        """Connect via the base class, then run verify_certificate()."""
        M2Crypto.httpslib.HTTPSConnection.connect(self, *args)
        verify_certificate(self)

    def getHost(self):
        """Return the host this connection talks to."""
        return self.host

    def getPort(self):
        """Return the port this connection talks to."""
        return self.port

class myProxyHTTPSConnection(M2Crypto.httpslib.ProxyHTTPSConnection, HTTPSConnection):
    """HTTPS connection through a proxy that verifies the server certificate
    once SSL has been started."""

    def __init__(self, *args, **kwargs):
        # 'appname' is consumed here; everything else goes to the base class.
        appname = kwargs.pop('appname', 'generic')
        self.appname = appname
        M2Crypto.httpslib.ProxyHTTPSConnection.__init__(self, *args, **kwargs)

    def _start_ssl(self):
        """Start SSL via the base class, then run verify_certificate()."""
        M2Crypto.httpslib.ProxyHTTPSConnection._start_ssl(self)
        verify_certificate(self)

    def endheaders(self, *args, **kwargs):
        """Compute the proxy auth data if still unset, then finish the
        headers through HTTPSConnection.endheaders()."""
        if self._proxy_auth is None:
            self._proxy_auth = self._encode_auth()
        HTTPSConnection.endheaders(self, *args, **kwargs)

    # broken in m2crypto: port needs to be an int
    def putrequest(self, method, url, skip_host=0, skip_accept_encoding=0):
        """Remember the real target host/port taken from url, then delegate
        to M2Crypto.httpslib.HTTPSConnection.putrequest().

        Raises ValueError if url has no scheme, or if it has no port and the
        scheme is not found in self._ports.
        """
        # putrequest is called before connect, so we can interpret the url
        # and get the real host/port to be used for the CONNECT request to
        # the proxy
        scheme, remainder = splittype(url)
        if scheme is None:
            raise ValueError("unknown URL type: %s" % url)
        # split off "host[:port]", then separate host and port
        hostport, remainder = splithost(remainder)
        real_host, real_port = splitport(hostport)
        # no explicit port: fall back to the default port of the scheme
        if real_port is None:
            try:
                real_port = self._ports[scheme]
            except KeyError:
                raise ValueError("unknown protocol for: %s" % url)
        self._real_host = real_host
        self._real_port = int(real_port)
        M2Crypto.httpslib.HTTPSConnection.putrequest(self, method, url, skip_host, skip_accept_encoding)

    def getHost(self):
        """Return the real target host (not the proxy)."""
        return self._real_host

    def getPort(self):
        """Return the real target port (not the proxy port) as an int."""
        return self._real_port

def verify_certificate(connection):
    """Decide whether the certificate of an established connection is
    acceptable, asking the user when automatic verification failed.

    connection -- object providing sock (with ctx, get_peer_cert() and
                  verify_ok()), getHost(), getPort(), appname and close()

    Returns None when the certificate is accepted.  Raises
    SSLVerificationError otherwise; the connection is closed first on every
    rejection path.
    """
    ctx = connection.sock.ctx
    # Take over the errors collected by verify_cb and reset the slot so that
    # the next connection using this context starts clean.
    verrs = ctx.verrs
    ctx.verrs = None
    cert = connection.sock.get_peer_cert()
    if not cert:
        connection.close()
        raise SSLVerificationError("server did not present a certificate")

    # XXX: should we check if the certificate is known anyway?
    # Maybe it changed to something valid.
    if connection.sock.verify_ok():
        return

    tc = TrustedCertStore(connection.getHost(), connection.getPort(), connection.appname, cert)

    if tc.is_known():
        if tc.is_trusted(): # ok, same cert as the stored one
            return
        print("WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!", file=sys.stderr)
        print("IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!", file=sys.stderr)
        print("offending certificate is at '%s'" % tc.file, file=sys.stderr)
        connection.close()
        raise SSLVerificationError("remote host identification has changed")

    # if http_debug is set we redirect sys.stdout to an StringIO
    # instance in order to do some header filtering (see conf module)
    # so we have to use the "original" stdout for printing
    out = getattr(connection, '_orig_stdout', sys.stdout)
    verrs.show(out)

    print(file=out)

    if not verrs.could_ignore():
        connection.close()
        raise SSLVerificationError("Certificate validation error cannot be ignored")

    if not verrs.chain_ok:
        print("A certificate in the chain failed verification", file=out)
    if not verrs.cert_ok:
        print("The server certificate failed verification", file=out)

    # Keep asking until one of the terminating choices is entered; '9' and
    # any unrecognized input show the menu again.
    while True:
        print("""
Would you like to
0 - quit (default)
1 - continue anyways
2 - trust the server certificate permanently
9 - review the server certificate
""", file=out)

        print("Enter choice [0129]: ", end='', file=out)
        r = raw_input()
        if not r or r == '0':
            connection.close()
            raise SSLVerificationError("Untrusted Certificate")
        elif r == '1':
            tc.trust_tmp()
            return
        elif r == '2':
            tc.trust_always()
            return
        elif r == '9':
            print(cert.as_text(), file=out)

# vim: sw=4 et