/usr/lib/python2.7/dist-packages/pysimplesoap/wsse.py is in python-pysimplesoap 1.16-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | #!/usr/bin/python
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 3, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
"""Pythonic simple SOAP Client plugins for WebService Security extensions"""
from __future__ import unicode_literals
import sys
if sys.version > '3':
basestring = unicode = str
import datetime
from decimal import Decimal
import os
import logging
import hashlib
import warnings
from . import __author__, __copyright__, __license__, __version__
from .simplexml import SimpleXMLElement
# Namespaces:
WSSE_URI = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd'
WSU_URI = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"
XMLDSIG_URI = "http://www.w3.org/2000/09/xmldsig#"
X509v3_URI = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3"
Base64Binary_URI = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary"
class UsernameToken:
"WebService Security extension to add a basic credentials to xml request"
def __init__(self, username="", password=""):
self.token = {
'wsse:UsernameToken': {
'wsse:Username': username,
'wsse:Password': password,
}
}
def preprocess(self, client, request, method, args, kwargs, headers, soap_uri):
"Add basic credentials to outgoing message"
# always extract WS Security header and send it
header = request('Header', ns=soap_uri, )
k = 'wsse:Security'
# for backward compatibility, use header if given:
if k in headers:
self.token = headers[k]
# convert the token to xml
header.marshall(k, self.token, ns=False, add_children_ns=False)
header(k)['xmlns:wsse'] = WSSE_URI
#<wsse:UsernameToken xmlns:wsu='http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd'>
def postprocess(self, client, response, method, args, kwargs, headers, soap_uri):
"Analyze incoming credentials"
# TODO: add some password validation callback?
pass
BIN_TOKEN_TMPL = """<?xml version="1.0" encoding="UTF-8"?>
<wsse:Security soapenv:mustUnderstand="1" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
<wsse:BinarySecurityToken EncodingType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary" ValueType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3" wsu:Id="CertId-45851B081998E431E8132880700036719" xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">
%(certificate)s</wsse:BinarySecurityToken>
<ds:Signature Id="Signature-13" xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
%(signed_info)s
<ds:SignatureValue>%(signature_value)s</ds:SignatureValue>
<ds:KeyInfo Id="KeyId-45851B081998E431E8132880700036720">
<wsse:SecurityTokenReference wsu:Id="STRId-45851B081998E431E8132880700036821" xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">
<wsse:Reference URI="#CertId-45851B081998E431E8132880700036719" ValueType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3"/>
</wsse:SecurityTokenReference>
</ds:KeyInfo>
</ds:Signature>
</wsse:Security>
"""
class BinaryTokenSignature:
"WebService Security extension to add a basic signature to xml request"
def __init__(self, certificate="", private_key="", password=None, cacert=None):
# read the X509v3 certificate (PEM)
self.certificate = ''.join([line for line in open(certificate)
if not line.startswith("---")])
self.private_key = private_key
self.password = password
self.cacert = cacert
def preprocess(self, client, request, method, args, kwargs, headers, soap_uri):
"Sign the outgoing SOAP request"
# get xml elements:
body = request('Body', ns=soap_uri, )
header = request('Header', ns=soap_uri, )
# prepare body xml attributes to be signed (reference)
body['wsu:Id'] = "id-14"
body['xmlns:wsu'] = WSU_URI
# workaround: copy namespaces so lxml can parse the xml to be signed
for attr, value in request[:]:
if attr.startswith("xmlns"):
body[attr] = value
# use the internal tag xml representation (not the full xml document)
ref_xml = repr(body)
# sign using RSA-SHA1 (XML Security)
from . import xmlsec
vars = xmlsec.rsa_sign(ref_xml, "#id-14",
self.private_key, self.password)
vars['certificate'] = self.certificate
# generate the xml (filling the placeholders)
wsse = SimpleXMLElement(BIN_TOKEN_TMPL % vars)
header.import_node(wsse)
def postprocess(self, client, response, method, args, kwargs, headers, soap_uri):
"Verify the signature of the incoming response"
from . import xmlsec
# get xml elements:
body = response('Body', ns=soap_uri, )
header = response('Header', ns=soap_uri, )
wsse = header("Security", ns=WSSE_URI)
cert = wsse("BinarySecurityToken", ns=WSSE_URI)
# check that the cert (binary token) is coming in the correct format:
self.__check(cert["EncodingType"], Base64Binary_URI)
self.__check(cert["ValueType"], X509v3_URI)
# extract the certificate (in DER to avoid new line & padding issues!)
cert_der = str(cert).decode("base64")
public_key = xmlsec.x509_extract_rsa_public_key(cert_der, binary=True)
# validate the certificate using the certification authority:
if not self.cacert:
warnings.warn("No CA provided, WSSE not validating certificate")
elif not xmlsec.x509_verify(self.cacert, cert_der, binary=True):
raise RuntimeError("WSSE certificate validation failed")
# check body xml attributes was signed correctly (reference)
self.__check(body['xmlns:wsu'], WSU_URI)
ref_uri = body['wsu:Id']
signature = wsse("Signature", ns=XMLDSIG_URI)
signed_info = signature("SignedInfo")
signature_value = signature("SignatureValue")
# TODO: these sanity checks should be moved to xmlsec?
self.__check(signed_info("Reference")['URI'], "#" + ref_uri)
self.__check(signed_info("SignatureMethod")['Algorithm'],
XMLDSIG_URI + "rsa-sha1")
self.__check(signed_info("Reference")("DigestMethod")['Algorithm'],
XMLDSIG_URI + "sha1")
# TODO: check KeyInfo uses the correct SecurityTokenReference
# workaround: copy namespaces so lxml can parse the xml to be signed
for attr, value in response[:]:
if attr.startswith("xmlns"):
body[attr] = value
# use the internal tag xml representation (not the full xml document)
ref_xml = xmlsec.canonicalize(repr(body))
# verify the signed hash
computed_hash = xmlsec.sha1_hash_digest(ref_xml)
digest_value = str(signed_info("Reference")("DigestValue"))
if computed_hash != digest_value:
raise RuntimeError("WSSE SHA1 hash digests mismatch")
# workaround: prepare the signed info (assure the parent ns is present)
signed_info['xmlns'] = XMLDSIG_URI
xml = repr(signed_info)
# verify the signature using RSA-SHA1 (XML Security)
ok = xmlsec.rsa_verify(xml, str(signature_value), public_key)
if not ok:
raise RuntimeError("WSSE RSA-SHA1 signature verification failed")
# TODO: remove any unsigned part from the xml?
def __check(self, value, expected, msg="WSSE sanity check failed"):
if value != expected:
raise RuntimeError(msg)
|