This file is indexed.

/usr/lib/python2.7/dist-packages/keystonemiddleware/auth_token/_revocations.py is in python-keystonemiddleware 4.9.0-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import logging
import os

from oslo_serialization import jsonutils
from oslo_utils import timeutils

from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.i18n import _

_LOG = logging.getLogger(__name__)


class Revocations(object):
    _FILE_NAME = 'revoked.pem'

    def __init__(self, timeout, signing_directory, identity_server,
                 cms_verify, log=_LOG):
        self._cache_timeout = timeout
        self._signing_directory = signing_directory
        self._identity_server = identity_server
        self._cms_verify = cms_verify
        self._log = log

        self._fetched_time_prop = None
        self._list_prop = None

    @property
    def _fetched_time(self):
        if not self._fetched_time_prop:
            # If the fetched list has been written to disk, use its
            # modification time.
            file_path = self._signing_directory.calc_path(self._FILE_NAME)
            if os.path.exists(file_path):
                mtime = os.path.getmtime(file_path)
                fetched_time = datetime.datetime.utcfromtimestamp(mtime)
            # Otherwise the list will need to be fetched.
            else:
                fetched_time = datetime.datetime.min
            self._fetched_time_prop = fetched_time
        return self._fetched_time_prop

    @_fetched_time.setter
    def _fetched_time(self, value):
        self._fetched_time_prop = value

    def _fetch(self):
        revocation_list_data = self._identity_server.fetch_revocation_list()
        return self._cms_verify(revocation_list_data)

    @property
    def _list(self):
        timeout = self._fetched_time + self._cache_timeout
        list_is_current = timeutils.utcnow() < timeout

        if list_is_current:
            # Load the list from disk if required
            if not self._list_prop:
                self._list_prop = jsonutils.loads(
                    self._signing_directory.read_file(self._FILE_NAME))
        else:
            self._list = self._fetch()
        return self._list_prop

    @_list.setter
    def _list(self, value):
        """Save a revocation list to memory and to disk.

        :param value: A json-encoded revocation list

        """
        self._list_prop = jsonutils.loads(value)
        self._fetched_time = timeutils.utcnow()
        self._signing_directory.write_file(self._FILE_NAME, value)

    def _is_revoked(self, token_id):
        """Indicate whether the token_id appears in the revocation list."""
        revoked_tokens = self._list.get('revoked', None)
        if not revoked_tokens:
            return False

        revoked_ids = (x['id'] for x in revoked_tokens)
        return token_id in revoked_ids

    def _any_revoked(self, token_ids):
        for token_id in token_ids:
            if self._is_revoked(token_id):
                return True
        return False

    def check(self, token_ids):
        if self._any_revoked(token_ids):
            self._log.debug('Token is marked as having been revoked')
            raise exc.InvalidToken(_('Token has been revoked'))

    def check_by_audit_id(self, audit_ids):
        """Check whether the audit_id appears in the revocation list.

        :raises keystonemiddleware.auth_token._exceptions.InvalidToken:
            if the audit ID(s) appear in the revocation list.

        """
        revoked_tokens = self._list.get('revoked', None)
        if not revoked_tokens:
            # There's no revoked tokens, so nothing to do.
            return

        # The audit_id may not be present in the revocation events because
        # earlier versions of the identity server didn't provide them.
        revoked_ids = set(
            x['audit_id'] for x in revoked_tokens if 'audit_id' in x)
        for audit_id in audit_ids:
            if audit_id in revoked_ids:
                self._log.debug(
                    'Token is marked as having been revoked by audit id')
                raise exc.InvalidToken(_('Token has been revoked'))