This file is indexed.

/usr/lib/python2.7/dist-packages/boto/cloudsearch2/optionstatus.py is in python-boto 2.34.0-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# Copyright (c) 2012 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2012 Amazon.com, Inc. or its affiliates.
# All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#

from boto.compat import json


class OptionStatus(dict):
    """
    Presents a combination of status field (defined below) which are
    accessed as attributes and option values which are stored in the
    native Python dictionary.  In this class, the option values are
    merged from a JSON object that is stored as the Option part of
    the object.

    :ivar domain_name: The name of the domain this option is associated with.
    :ivar create_date: A timestamp for when this option was created.
    :ivar state: The state of processing a change to an option.
        Possible values:

        * RequiresIndexDocuments: the option's latest value will not
          be visible in searches until IndexDocuments has been called
          and indexing is complete.
        * Processing: the option's latest value is not yet visible in
          all searches but is in the process of being activated.
        * Active: the option's latest value is completely visible.

    :ivar update_date: A timestamp for when this option was updated.
    :ivar update_version: A unique integer that indicates when this
        option was last updated.
    """

    def __init__(self, domain, data=None, refresh_fn=None, refresh_key=None,
                 save_fn=None):
        self.domain = domain
        self.refresh_fn = refresh_fn
        self.refresh_key = refresh_key
        self.save_fn = save_fn
        self.refresh(data)

    def _update_status(self, status):
        self.creation_date = status['CreationDate']
        self.status = status['State']
        self.update_date = status['UpdateDate']
        self.update_version = int(status['UpdateVersion'])

    def _update_options(self, options):
        if options:
            self.update(options)

    def refresh(self, data=None):
        """
        Refresh the local state of the object.  You can either pass
        new state data in as the parameter ``data`` or, if that parameter
        is omitted, the state data will be retrieved from CloudSearch.
        """
        if not data:
            if self.refresh_fn:
                data = self.refresh_fn(self.domain.name)

                if data and self.refresh_key:
                    # Attempt to pull out the right nested bag of data
                    for key in self.refresh_key:
                        data = data[key]
        if data:
            self._update_status(data['Status'])
            self._update_options(data['Options'])

    def to_json(self):
        """
        Return the JSON representation of the options as a string.
        """
        return json.dumps(self)

    def save(self):
        """
        Write the current state of the local object back to the
        CloudSearch service.
        """
        if self.save_fn:
            data = self.save_fn(self.domain.name, self.to_json())
            self.refresh(data)


class IndexFieldStatus(OptionStatus):
    def save(self):
        pass


class AvailabilityOptionsStatus(OptionStatus):
    def save(self):
        pass


class ScalingParametersStatus(IndexFieldStatus):
    pass


class ExpressionStatus(IndexFieldStatus):
    pass


class ServicePoliciesStatus(OptionStatus):

    def new_statement(self, arn, ip):
        """
        Returns a new policy statement that will allow
        access to the service described by ``arn`` by the
        ip specified in ``ip``.

        :type arn: string
        :param arn: The Amazon Resource Notation identifier for the
            service you wish to provide access to.  This would be
            either the search service or the document service.

        :type ip: string
        :param ip: An IP address or CIDR block you wish to grant access
            to.
        """
        return {
            "Effect": "Allow",
            "Action": "*",  # Docs say use GET, but denies unless *
            "Resource": arn,
            "Condition": {
                "IpAddress": {
                    "aws:SourceIp": [ip]
                }
            }
        }

    def _allow_ip(self, arn, ip):
        if 'Statement' not in self:
            s = self.new_statement(arn, ip)
            self['Statement'] = [s]
            self.save()
        else:
            add_statement = True
            for statement in self['Statement']:
                if statement['Resource'] == arn:
                    for condition_name in statement['Condition']:
                        if condition_name == 'IpAddress':
                            add_statement = False
                            condition = statement['Condition'][condition_name]
                            if ip not in condition['aws:SourceIp']:
                                condition['aws:SourceIp'].append(ip)

            if add_statement:
                s = self.new_statement(arn, ip)
                self['Statement'].append(s)
            self.save()

    def allow_search_ip(self, ip):
        """
        Add the provided ip address or CIDR block to the list of
        allowable address for the search service.

        :type ip: string
        :param ip: An IP address or CIDR block you wish to grant access
            to.
        """
        arn = self.domain.service_arn
        self._allow_ip(arn, ip)

    def allow_doc_ip(self, ip):
        """
        Add the provided ip address or CIDR block to the list of
        allowable address for the document service.

        :type ip: string
        :param ip: An IP address or CIDR block you wish to grant access
            to.
        """
        arn = self.domain.service_arn
        self._allow_ip(arn, ip)

    def _disallow_ip(self, arn, ip):
        if 'Statement' not in self:
            return
        need_update = False
        for statement in self['Statement']:
            if statement['Resource'] == arn:
                for condition_name in statement['Condition']:
                    if condition_name == 'IpAddress':
                        condition = statement['Condition'][condition_name]
                        if ip in condition['aws:SourceIp']:
                            condition['aws:SourceIp'].remove(ip)
                            need_update = True
        if need_update:
            self.save()

    def disallow_search_ip(self, ip):
        """
        Remove the provided ip address or CIDR block from the list of
        allowable address for the search service.

        :type ip: string
        :param ip: An IP address or CIDR block you wish to grant access
            to.
        """
        arn = self.domain.service_arn
        self._disallow_ip(arn, ip)

    def disallow_doc_ip(self, ip):
        """
        Remove the provided ip address or CIDR block from the list of
        allowable address for the document service.

        :type ip: string
        :param ip: An IP address or CIDR block you wish to grant access
            to.
        """
        arn = self.domain.service_arn
        self._disallow_ip(arn, ip)