/usr/lib/python3/dist-packages/saml2/assertion.py is in python3-pysaml2 4.0.2-0ubuntu3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 | #!/usr/bin/env python
# -*- coding: utf-8 -*-
import importlib
import logging
import re
from saml2.saml import NAME_FORMAT_URI
import six
from saml2 import xmlenc
from saml2 import saml
from saml2.time_util import instant, in_a_while
from saml2.attribute_converter import from_local, get_local_name
from saml2.s_utils import sid, MissingValue
from saml2.s_utils import factory
from saml2.s_utils import assertion_factory
logger = logging.getLogger(__name__)
def _filter_values(vals, vlist=None, must=False):
""" Removes values from *vals* that does not appear in vlist
:param vals: The values that are to be filtered
:param vlist: required or optional value
:param must: Whether the allowed values must appear
:return: The set of values after filtering
"""
if not vlist: # No value specified equals any value
return vals
if isinstance(vlist, six.string_types):
vlist = [vlist]
res = []
for val in vlist:
if val in vals:
res.append(val)
if must:
if res:
return res
else:
raise MissingValue("Required attribute value missing")
else:
return res
def _match(attr, ava):
if attr in ava:
return attr
_la = attr.lower()
if _la in ava:
return _la
for _at in ava.keys():
if _at.lower() == _la:
return _at
return None
def filter_on_attributes(ava, required=None, optional=None, acs=None,
                         fail_on_unfulfilled_requirements=True):
    """Pick out of *ava* the attributes (and values) that were requested.

    :param ava: An attribute value assertion as a dictionary
    :param required: list of RequestedAttribute instances defined to be
        required
    :param optional: list of RequestedAttribute instances defined to be
        optional
    :param acs: Attribute converters, handed to get_local_name when a
        requested attribute carries no "friendly_name"
    :param fail_on_unfulfilled_requirements: Whether a required attribute
        that is absent from *ava* raises or is silently skipped
    :return: A new dictionary holding the requested attributes
    :raises MissingValue: if a required attribute is absent (and
        *fail_on_unfulfilled_requirements* is set) or if none of the
        required values of a present attribute is available
    """
    collected = {}

    def _local_key(spec):
        # Key of *ava* that the requested attribute refers to, or a false
        # value when there is none.
        try:
            friendly = spec["friendly_name"]
        except KeyError:
            friendly = get_local_name(acs, spec["name"], spec["name_format"])
        # Fall back on the plain name, in the unlikely case that someone
        # has provided us with URIs as attribute names.
        return _match(friendly, ava) or _match(spec["name"], ava)

    def _collect(spec, key, must):
        # Add the acceptable values of ava[key] to the result; with *must*
        # the final _filter_values call raises when nothing matched.
        try:
            wanted = [av["text"] for av in spec["attribute_value"]]
        except KeyError:
            wanted = []

        try:
            collected[key].extend(_filter_values(ava[key], wanted))
        except KeyError:
            collected[key] = _filter_values(ava[key], wanted)

        return _filter_values(ava[key], wanted, must)

    for spec in ([] if required is None else required):
        key = _local_key(spec)
        if key:
            _collect(spec, key, True)
        elif fail_on_unfulfilled_requirements:
            raise MissingValue(
                "Required attribute missing: '%s' (%s)" % (spec["name"], key))

    for spec in ([] if optional is None else optional):
        key = _local_key(spec)
        if key:
            _collect(spec, key, False)

    return collected
def filter_on_demands(ava, required=None, optional=None):
    """Trim *ava* down to what the receiver actually asked for.

    Attribute names are compared case-insensitively. *ava* is modified in
    place: every attribute that is neither required nor optional is removed.

    :param ava: Attribute value assertion as a dictionary
    :param required: Dictionary of required attributes and their values
    :param optional: Dictionary of optional attributes
    :return: The same *ava* object, possibly reduced
    :raises MissingValue: if a required attribute, or one of its required
        values, is not present in *ava*
    """
    if required is None:
        required = {}
    if optional is None:
        optional = {}

    # lower-cased attribute name -> name as spelled in ava
    spelling = {name.lower(): name for name in ava.keys()}

    # First make sure everything that is required is there.
    for wanted, wanted_values in required.items():
        wanted = wanted.lower()
        if wanted not in spelling:
            raise MissingValue("Required attribute missing: %s" % (wanted,))
        if wanted_values:
            for value in wanted_values:
                if value not in ava[spelling[wanted]]:
                    raise MissingValue(
                        "Required attribute value missing: %s,%s" % (wanted,
                                                                     value))

    # Values that are not strictly necessary may be released, attributes
    # nobody asked for may not.
    asked_for = {name.lower() for name in required.keys()}
    asked_for.update(name.lower() for name in optional.keys())
    for lowered, original in spelling.items():
        if lowered not in asked_for:
            del ava[original]

    return ava
def filter_on_wire_representation(ava, acs, required=None, optional=None):
    """Select the attributes of *ava* whose wire name was asked for.

    For every local attribute name the converter registered for the
    requested attribute's name format is consulted (its ``_to`` mapping);
    the attribute is kept when the mapped name equals the requested name.

    :param ava: A dictionary with attributes and values
    :param acs: List of attribute converter instances
    :param required: A list of saml.Attributes
    :param optional: A list of saml.Attributes
    :return: Dictionary of expected/wanted attributes and values
    """
    converters = {converter.name_format: converter for converter in acs}
    required = [] if required is None else required
    optional = [] if optional is None else optional

    def _asked_for(local_name, spec):
        # An unknown name format or an unmapped attribute is simply no match.
        try:
            return converters[spec.name_format]._to[local_name] == spec.name
        except KeyError:
            return False

    wanted = {}
    for local_name, values in ava.items():
        # Every required entry is examined; the optional ones only when no
        # required entry matched, and then only up to the first match.
        if any([_asked_for(local_name, spec) for spec in required]) or \
                any(_asked_for(local_name, spec) for spec in optional):
            wanted[local_name] = values

    return wanted
def filter_attribute_value_assertions(ava, attribute_restrictions=None):
    """Weed out attributes and values that the restrictions do not allow.

    *ava* is modified in place. Restrictions are looked up by lower-cased
    attribute name; an attribute without an entry is removed, an entry of
    None lets every value through, and otherwise only the values matched by
    at least one of the compiled patterns are kept (without duplicates). An
    attribute that ends up with no values is removed.

    :param ava: The incoming attribute value assertion (dictionary)
    :param attribute_restrictions: The rules that govern which attributes
        and values that are allowed. (dictionary)
    :return: The modified attribute value assertion
    """
    if not attribute_restrictions:
        return ava

    # Iterate over a snapshot since entries are deleted along the way.
    for name, values in list(ava.items()):
        try:
            patterns = attribute_restrictions[name.lower()]
        except KeyError:
            del ava[name]
            continue

        if patterns is None:
            continue

        if isinstance(values, six.string_types):
            values = [values]

        accepted = [value
                    for pattern in patterns
                    for value in values
                    if pattern.match(value)]
        if accepted:
            ava[name] = list(set(accepted))
        else:
            del ava[name]

    return ava
def restriction_from_attribute_spec(attributes):
    """Derive value restrictions from a list of attribute specifications.

    For each attribute the result holds, under the attribute's name:

    * an empty dictionary when the attribute lists no values,
    * None as soon as a value without text is encountered,
    * otherwise the compiled pattern of the last listed value.

    :param attributes: Objects with ``name`` and ``attribute_value``, the
        latter being a sequence of objects with a ``text`` attribute
    :return: Dictionary mapping attribute name to its restriction
    """
    restrictions = {}
    for attribute in attributes:
        rule = {}
        for value in attribute.attribute_value:
            if not value.text:
                rule = None
                break
            rule = re.compile(value.text)
        restrictions[attribute.name] = rule
    return restrictions
def post_entity_categories(maps, **kwargs):
    """Work out which attributes an SP may get based on entity categories.

    :param maps: List of entity category maps. Each map goes from a key to
        a list of attribute names; the key is "" (always released), a single
        entity category, or a tuple of categories that must all apply.
    :param kwargs: Must contain "mds", an object whose
        ``entity_categories(entity_id)`` returns the categories of an
        entity, and "sp_entity_id", the entity ID of the SP.
    :return: Dictionary with one ``None`` entry per releasable attribute;
        empty when no "mds" is given
    """
    restrictions = {}
    mds = kwargs["mds"]
    if not mds:
        return restrictions

    try:
        ecs = mds.entity_categories(kwargs["sp_entity_id"])
    except KeyError:
        # Nothing is known about the SP: only release what is always
        # released.
        for ec_map in maps:
            for attr in ec_map[""]:
                restrictions[attr] = None
        return restrictions

    for ec_map in maps:
        for key, attrs in ec_map.items():
            if key == "":  # always released
                released = True
            elif isinstance(key, tuple):
                # Every category of the combination has to apply. This is an
                # explicit test rather than an ``assert`` because asserts
                # are removed when Python runs with -O, which would release
                # the attributes unconditionally.
                released = all(member in ecs for member in key)
            else:
                released = key in ecs

            if released:
                for attr in attrs:
                    restrictions[attr] = None

    return restrictions
class Policy(object):
    """ Handles restrictions on assertions.

    The restrictions are a dictionary keyed by SP entity ID, with the
    special key "default" used as fallback. Each value is a dictionary of
    settings (or None) that is consulted through :meth:`get`.
    """

    def __init__(self, restrictions=None):
        """
        :param restrictions: The restrictions dictionary. When it is empty
            or None no restrictions are stored and every lookup yields its
            default.
        """
        if restrictions:
            self.compile(restrictions)
        else:
            self._restrictions = None
        self.acs = []

    def compile(self, restrictions):
        """ This is only for IdPs or AAs, and it's about limiting what
        is returned to the SP.
        In the configuration file, restrictions on which values that
        can be returned are specified with the help of regular expressions.
        This function goes through and pre-compiles the regular expressions
        and loads the entity category modules that are named.

        The dictionary handed in, and the per-SP dictionaries in it, are
        left untouched; the compiled result is stored on the instance.

        :param restrictions: The restrictions as read from the configuration
        :return: The restrictions with the string specifications replaced
            by compiled regular expressions.
        """
        self._restrictions = restrictions.copy()

        for who, spec in list(self._restrictions.items()):
            if spec is None:
                continue

            # Work on a private copy so the caller's configuration is not
            # rewritten; compiling the same configuration twice would
            # otherwise fail on the already converted entity categories.
            spec = dict(spec)
            self._restrictions[who] = spec

            try:
                categories = spec["entity_categories"]
            except KeyError:
                pass
            else:
                ecs = []
                for cat in categories:
                    _mod = importlib.import_module(
                        "saml2.entity_category.%s" % cat)
                    ecs.append(
                        {key: [name.lower() for name in names]
                         for key, names in _mod.RELEASE.items()})
                spec["entity_categories"] = ecs

            restr = spec.get("attribute_restrictions")
            if restr is None:
                continue

            _are = {}
            for key, values in restr.items():
                if not values:
                    # No patterns given: any value is allowed.
                    _are[key.lower()] = None
                else:
                    _are[key.lower()] = [re.compile(value)
                                         for value in values]
            spec["attribute_restrictions"] = _are

        logger.debug("policy restrictions: %s", self._restrictions)

        return self._restrictions

    def get(self, attribute, sp_entity_id, default=None, post_func=None,
            **kwargs):
        """ Look up a policy setting for an SP.

        The SP's own entry is consulted first and the "default" entry
        second. An entry that is missing or None is skipped.

        :param attribute: Name of the setting
        :param sp_entity_id: The SP entity ID
        :param default: Returned when no restrictions exist or the setting
            is unset/None
        :param post_func: Optional callable applied to a found value as
            ``post_func(value, sp_entity_id=sp_entity_id, **kwargs)``
        :return: The setting, the result of *post_func*, or *default*
        """
        if not self._restrictions:
            return default

        val = None
        for owner in (sp_entity_id, "default"):
            spec = self._restrictions.get(owner)
            if spec is None:
                # compile() accepts None as a spec, treat it as "nothing
                # specified" rather than failing on the subscript.
                continue
            try:
                val = spec[attribute]
            except KeyError:
                continue
            break

        if val is None:
            return default
        if post_func:
            return post_func(val, sp_entity_id=sp_entity_id, **kwargs)
        return val

    def get_nameid_format(self, sp_entity_id):
        """ Get the NameIDFormat to use for the entity id

        :param sp_entity_id: The SP entity ID
        :return: The format, saml.NAMEID_FORMAT_TRANSIENT unless configured
        """
        return self.get("nameid_format", sp_entity_id,
                        saml.NAMEID_FORMAT_TRANSIENT)

    def get_name_form(self, sp_entity_id):
        """ Get the NameFormat to use for the entity id

        :param sp_entity_id: The SP entity ID
        :return: The format, NAME_FORMAT_URI unless configured
        """
        return self.get("name_form", sp_entity_id, NAME_FORMAT_URI)

    def get_lifetime(self, sp_entity_id):
        """ The lifetime of the assertion

        :param sp_entity_id: The SP entity ID
        :return: lifetime as a dictionary
        """
        # default is one hour
        return self.get("lifetime", sp_entity_id, {"hours": 1})

    def get_attribute_restrictions(self, sp_entity_id):
        """ Return the attribute restriction for SP that want the information

        :param sp_entity_id: The SP entity ID
        :return: The restrictions, or None when there are none
        """
        return self.get("attribute_restrictions", sp_entity_id)

    def get_fail_on_missing_requested(self, sp_entity_id):
        """ Return whether the IdP should fail if the SP's requested
        attributes could not be found.

        :param sp_entity_id: The SP entity ID
        :return: The configured value, True unless configured
        """
        return self.get("fail_on_missing_requested", sp_entity_id, True)

    def entity_category_attributes(self, ec):
        """ Attributes that the default policy ties to an entity category

        :param ec: The entity category (a key of the entity category maps)
        :return: None when there are no restrictions at all, otherwise the
            attribute list of the first map that knows *ec*, or an empty
            list when no map does
        """
        if not self._restrictions:
            return None

        ec_maps = self._restrictions["default"]["entity_categories"]
        for ec_map in ec_maps:
            try:
                return ec_map[ec]
            except KeyError:
                pass
        return []

    def get_entity_categories(self, sp_entity_id, mds):
        """ Restrictions that follow from the SP's entity categories

        :param sp_entity_id: The SP entity ID
        :param mds: MetadataStore instance
        :return: A dictionary with restrictions
        """
        kwargs = {"mds": mds}

        return self.get("entity_categories", sp_entity_id, default={},
                        post_func=post_entity_categories, **kwargs)

    def not_on_or_after(self, sp_entity_id):
        """ When the assertion stops being valid, should not be
        used after this time.

        :param sp_entity_id: The SP entity ID
        :return: String representation of the time
        """
        return in_a_while(**self.get_lifetime(sp_entity_id))

    def filter(self, ava, sp_entity_id, mdstore, required=None, optional=None):
        """ What attribute and attribute values returns depends on what
        the SP has said it wants in the request or in the metadata file and
        what the IdP/AA wants to release. An assumption is that what the SP
        asks for overrides whatever is in the metadata. But of course the
        IdP never releases anything it doesn't want to.

        :param ava: The information about the subject as a dictionary
        :param sp_entity_id: The entity ID of the SP
        :param mdstore: A Metadata store
        :param required: Attributes that the SP requires in the assertion
        :param optional: Attributes that the SP regards as optional
        :return: A possibly modified AVA
        """
        _ava = None

        # 1. What the SP asks for.
        if required or optional:
            logger.debug("required: %s, optional: %s", required, optional)
            _ava = filter_on_attributes(
                ava.copy(), required, optional, self.acs,
                self.get_fail_on_missing_requested(sp_entity_id))

        # 2. What the SP's entity categories entitle it to; added on top of
        #    what was requested.
        _rest = self.get_entity_categories(sp_entity_id, mdstore)
        if _rest:
            ava_ec = filter_attribute_value_assertions(ava.copy(), _rest)
            if _ava is None:
                _ava = ava_ec
            else:
                _ava.update(ava_ec)

        # 3. What the IdP/AA is willing to release at all.
        _rest = self.get_attribute_restrictions(sp_entity_id)
        if _rest:
            if _ava is None:
                _ava = ava.copy()
            _ava = filter_attribute_value_assertions(_ava, _rest)
        elif _ava is None:
            _ava = ava.copy()

        return {} if _ava is None else _ava

    def restrict(self, ava, sp_entity_id, metadata=None):
        """ Identity attribute names are expected to be expressed in
        the local lingo (== friendlyName)

        :param ava: The information about the subject as a dictionary
        :param sp_entity_id: The entity ID of the SP
        :param metadata: Metadata store, asked for the SP's attribute
            requirements when given
        :return: A filtered ava according to the IdPs/AAs rules and
            the list of required/optional attributes according to the SP.
            If the requirements can't be met an exception is raised.
        """
        if metadata:
            spec = metadata.attribute_requirement(sp_entity_id)
            if spec:
                return self.filter(ava, sp_entity_id, metadata,
                                   spec["required"], spec["optional"])

        return self.filter(ava, sp_entity_id, metadata, [], [])

    def conditions(self, sp_entity_id):
        """ Return a saml.Conditions instance

        :param sp_entity_id: The SP entity ID
        :return: A saml.Conditions instance, valid from now on and with the
            SP as the only audience
        """
        return factory(saml.Conditions,
                       not_before=instant(),
                       # How long might depend on who's getting it
                       not_on_or_after=self.not_on_or_after(sp_entity_id),
                       audience_restriction=[factory(
                           saml.AudienceRestriction,
                           audience=[factory(saml.Audience,
                                             text=sp_entity_id)])])

    def get_sign(self, sp_entity_id):
        """
        Possible choices
        "sign": ["response", "assertion", "on_demand"]

        :param sp_entity_id: The SP entity ID
        :return: The configured list, empty unless configured
        """
        return self.get("sign", sp_entity_id, [])
class EntityCategories(object):
    """Empty placeholder class; it defines no attributes or methods."""
    pass
def _authn_context_class_ref(authn_class, authn_auth=None):
    """
    Build an AuthnContext around an authn context class reference

    :param authn_class: The authn context class reference
    :param authn_auth: Authenticating Authority; only added to the context
        when given
    :return: An AuthnContext instance
    """
    context_args = {
        "authn_context_class_ref": factory(saml.AuthnContextClassRef,
                                           text=authn_class),
    }
    if authn_auth:
        context_args["authenticating_authority"] = factory(
            saml.AuthenticatingAuthority, text=authn_auth)

    return factory(saml.AuthnContext, **context_args)
def _authn_context_decl(decl, authn_auth=None):
    """
    Build an AuthnContext around an authn context declaration

    :param decl: The authn context declaration
    :param authn_auth: Authenticating Authority; an AuthenticatingAuthority
        element is created from it even when it is None
    :return: An AuthnContext instance
    """
    authority = factory(saml.AuthenticatingAuthority, text=authn_auth)
    return factory(saml.AuthnContext,
                   authn_context_decl=decl,
                   authenticating_authority=authority)
def _authn_context_decl_ref(decl_ref, authn_auth=None):
    """
    Build an AuthnContext around an authn context declaration reference

    :param decl_ref: The authn context declaration reference
    :param authn_auth: Authenticating Authority; an AuthenticatingAuthority
        element is created from it even when it is None
    :return: An AuthnContext instance
    """
    authority = factory(saml.AuthenticatingAuthority, text=authn_auth)
    return factory(saml.AuthnContext,
                   authn_context_decl_ref=decl_ref,
                   authenticating_authority=authority)
def authn_statement(authn_class=None, authn_auth=None,
                    authn_decl=None, authn_decl_ref=None, authn_instant="",
                    subject_locality=""):
    """
    Build the AuthnStatement

    The authentication context is taken from the first of *authn_class*,
    *authn_decl* and *authn_decl_ref* that is set; when none is, the
    statement is created without a context.

    :param authn_class: Authentication Context Class reference
    :param authn_auth: Authenticating Authority
    :param authn_decl: Authentication Context Declaration
    :param authn_decl_ref: Authentication Context Declaration reference
    :param authn_instant: When the Authentication was performed.
        Assumed to be seconds since the Epoch. Defaults to now.
    :param subject_locality: Specifies the DNS domain name and IP address
        for the system from which the assertion subject was apparently
        authenticated.
    :return: An AuthnStatement instance
    """
    when = instant(time_stamp=authn_instant) if authn_instant else instant()

    fields = {"authn_instant": when, "session_index": sid()}
    if authn_class:
        fields["authn_context"] = _authn_context_class_ref(
            authn_class, authn_auth)
    elif authn_decl:
        fields["authn_context"] = _authn_context_decl(authn_decl, authn_auth)
    elif authn_decl_ref:
        fields["authn_context"] = _authn_context_decl_ref(authn_decl_ref,
                                                          authn_auth)

    statement = factory(saml.AuthnStatement, **fields)

    if subject_locality:
        statement.subject_locality = saml.SubjectLocality(
            text=subject_locality)

    return statement
class Assertion(dict):
    """ Handles assertions about subjects.

    The dictionary content is the attribute value assertion: attribute
    names mapped to their values.
    """

    def __init__(self, dic=None):
        """
        :param dic: Initial attributes and values, anything the dict
            constructor accepts. Omitted or None gives an empty assertion.
        """
        # dict.__init__(self, None) raises TypeError, so the advertised
        # default has to be handled separately.
        if dic is None:
            dict.__init__(self)
        else:
            dict.__init__(self, dic)
        self.acs = []

    def construct(self, sp_entity_id, in_response_to, consumer_url,
                  name_id, attrconvs, policy, issuer, authn_class=None,
                  authn_auth=None, authn_decl=None, encrypt=None,
                  sec_context=None, authn_decl_ref=None, authn_instant="",
                  subject_locality="", authn_statem=None, add_subject=True):
        """ Construct the Assertion

        :param sp_entity_id: The entityid of the SP
        :param in_response_to: An identifier of the message, this message is
            a response to
        :param consumer_url: The intended consumer of the assertion
        :param name_id: An NameID instance
        :param attrconvs: AttributeConverters
        :param policy: The policy that should be adhered to when replying
        :param issuer: Who is issuing the statement
        :param authn_class: The authentication class
        :param authn_auth: The authentication instance
        :param authn_decl: An Authentication Context declaration
        :param encrypt: Whether to encrypt parts or all of the Assertion;
            only the value "attributes" is acted upon here
        :param sec_context: The security context used when encrypting
        :param authn_decl_ref: An Authentication Context declaration reference
        :param authn_instant: When the Authentication was performed
        :param subject_locality: Specifies the DNS domain name and IP address
            for the system from which the assertion subject was apparently
            authenticated.
        :param authn_statem: A AuthnStatement instance; when given it is
            used instead of building one from the authn_* parameters
        :param add_subject: Whether the assertion gets a Subject
        :return: An Assertion instance
        """
        if policy:
            _name_format = policy.get_name_form(sp_entity_id)
        else:
            _name_format = NAME_FORMAT_URI

        attr_statement = saml.AttributeStatement(attribute=from_local(
            attrconvs, self, _name_format))

        if encrypt == "attributes":
            # Replace every attribute by its encrypted counterpart.
            for attr in attr_statement.attribute:
                enc = sec_context.encrypt(text="%s" % attr)

                encd = xmlenc.encrypted_data_from_string(enc)
                encattr = saml.EncryptedAttribute(encrypted_data=encd)
                attr_statement.encrypted_attribute.append(encattr)

            attr_statement.attribute = []

        # start using now and for some time
        # NOTE(review): policy is used unconditionally from here on, so a
        # false policy only gets past the name format selection above.
        conds = policy.conditions(sp_entity_id)

        if authn_statem:
            _authn_statement = authn_statem
        elif authn_auth or authn_class or authn_decl or authn_decl_ref:
            _authn_statement = authn_statement(authn_class, authn_auth,
                                               authn_decl, authn_decl_ref,
                                               authn_instant,
                                               subject_locality)
        else:
            _authn_statement = None

        if not add_subject:
            _ass = assertion_factory(
                issuer=issuer,
                conditions=conds,
                subject=None
            )
        else:
            _ass = assertion_factory(
                issuer=issuer,
                conditions=conds,
                subject=factory(
                    saml.Subject,
                    name_id=name_id,
                    subject_confirmation=[factory(
                        saml.SubjectConfirmation,
                        method=saml.SCM_BEARER,
                        subject_confirmation_data=factory(
                            saml.SubjectConfirmationData,
                            in_response_to=in_response_to,
                            recipient=consumer_url,
                            not_on_or_after=policy.not_on_or_after(
                                sp_entity_id)))]
                ),
            )

        if _authn_statement:
            _ass.authn_statement = [_authn_statement]

        if not attr_statement.empty():
            _ass.attribute_statement = [attr_statement]

        return _ass

    def apply_policy(self, sp_entity_id, policy, metadata=None):
        """ Apply policy to the assertion I'm representing

        The policy's attribute converters are set to mine, and my own
        content is reduced to what the policy lets through.

        :param sp_entity_id: The SP entity ID
        :param policy: The policy
        :param metadata: Metadata to use
        :return: The resulting AVA after the policy is applied
        """
        policy.acs = self.acs
        ava = policy.restrict(self, sp_entity_id, metadata)

        # Iterate over a snapshot of the keys since entries are removed.
        for key in list(self):
            if key in ava:
                self[key] = ava[key]
            else:
                del self[key]

        return ava
|