This file is indexed.

/usr/lib/python2.7/dist-packages/ooni/deck.py is in ooniprobe 1.3.2-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# -*- coding: utf-8 -*-

from ooni.oonibclient import OONIBClient
from ooni.nettest import NetTestLoader
from ooni.settings import config
from ooni.utils import log
from ooni import errors as e

from twisted.python.filepath import FilePath
from twisted.internet import defer

import os
import yaml
import json
from hashlib import sha256


class InputFile(object):
    def __init__(self, input_hash, base_path=config.inputs_directory):
        self.id = input_hash
        cache_path = os.path.join(os.path.abspath(base_path), input_hash)
        self.cached_file = cache_path
        self.cached_descriptor = cache_path + '.desc'

    @property
    def descriptorCached(self):
        if os.path.exists(self.cached_descriptor):
            with open(self.cached_descriptor) as f:
                descriptor = json.load(f)
                self.load(descriptor)
            return True
        return False

    @property
    def fileCached(self):
        if os.path.exists(self.cached_file):
            try:
                self.verify()
            except AssertionError:
                log.err("The input %s failed validation."
                        "Going to consider it not cached." % self.id)
                return False
            return True
        return False

    def save(self):
        with open(self.cached_descriptor, 'w+') as f:
            json.dump({
                'name': self.name,
                'id': self.id,
                'version': self.version,
                'author': self.author,
                'date': self.date,
                'description': self.description
            }, f)

    def load(self, descriptor):
        self.name = descriptor['name']
        self.version = descriptor['version']
        self.author = descriptor['author']
        self.date = descriptor['date']
        self.description = descriptor['description']

    def verify(self):
        digest = os.path.basename(self.cached_file)
        with open(self.cached_file) as f:
            file_hash = sha256(f.read())
            assert file_hash.hexdigest() == digest


def nettest_to_path(path, allow_arbitrary_paths=False):
    """
    Takes as input either a path or a nettest name.

    Args:

        allow_arbitrary_paths:
            allow also paths that are not relative to the nettest_directory.

    Returns:

        full path to the nettest file.
    """
    if allow_arbitrary_paths and os.path.exists(path):
        return path

    fp = FilePath(config.nettest_directory).preauthChild(path + '.py')
    if fp.exists():
        return fp.path
    else:
        raise e.NetTestNotFound(path)


class Deck(InputFile):
    def __init__(self, deck_hash=None,
                 deckFile=None,
                 decks_directory=config.decks_directory,
                 no_collector=False):
        self.id = deck_hash
        self.requiresTor = False
        self.no_collector = no_collector
        self.bouncer = ''
        self.netTestLoaders = []
        self.inputs = []

        self.oonibclient = OONIBClient(self.bouncer)

        self.decksDirectory = os.path.abspath(decks_directory)
        self.deckHash = deck_hash

        if deckFile:
            self.loadDeck(deckFile)

    @property
    def cached_file(self):
        return os.path.join(self.decksDirectory, self.deckHash)

    @property
    def cached_descriptor(self):
        return self.cached_file + '.desc'

    def loadDeck(self, deckFile):
        with open(deckFile) as f:
            self.deckHash = sha256(f.read()).hexdigest()
            f.seek(0)
            test_deck = yaml.safe_load(f)

        for test in test_deck:
            try:
                nettest_path = nettest_to_path(test['options']['test_file'])
            except e.NetTestNotFound:
                log.err("Could not find %s" % test['options']['test_file'])
                log.msg("Skipping...")
                continue
            net_test_loader = NetTestLoader(test['options']['subargs'],
                                            test_file=nettest_path)
            if test['options']['collector']:
                net_test_loader.collector = test['options']['collector']
            self.insert(net_test_loader)

    def insert(self, net_test_loader):
        """ Add a NetTestLoader to this test deck """

        def has_test_helper(missing_option):
            for rth in net_test_loader.requiredTestHelpers:
                if missing_option == rth['option']:
                    return True
            return False

        try:
            net_test_loader.checkOptions()
            if net_test_loader.requiresTor:
                self.requiresTor = True
        except e.MissingRequiredOption as missing_options:
            if not self.bouncer:
                raise
            for missing_option in missing_options.message:
                if not has_test_helper(missing_option):
                    raise
            self.requiresTor = True
        self.netTestLoaders.append(net_test_loader)

    @defer.inlineCallbacks
    def setup(self):
        """ fetch and verify inputs for all NetTests in the deck """
        log.msg("Fetching required net test inputs...")
        for net_test_loader in self.netTestLoaders:
            yield self.fetchAndVerifyNetTestInput(net_test_loader)

        if self.bouncer:
            log.msg("Looking up collector and test helpers")
            yield self.lookupCollector()

    @defer.inlineCallbacks
    def lookupCollector(self):
        self.oonibclient.address = self.bouncer

        required_nettests = []

        requires_test_helpers = False
        requires_collector = False
        for net_test_loader in self.netTestLoaders:
            nettest = {
                'name': net_test_loader.testDetails['test_name'],
                'version': net_test_loader.testDetails['test_version'],
                'test-helpers': [],
                'input-hashes': [x['hash'] for x in net_test_loader.inputFiles]
            }
            if not net_test_loader.collector and not self.no_collector:
                requires_collector = True

            for th in net_test_loader.requiredTestHelpers:
                # {'name':'', 'option':'', 'test_class':''}
                if th['test_class'].localOptions[th['option']]:
                    continue
                nettest['test-helpers'].append(th['name'])
                requires_test_helpers = True

            required_nettests.append(nettest)

        if not requires_test_helpers and not requires_collector:
            defer.returnValue(None)

        response = yield self.oonibclient.lookupTestCollector(required_nettests)
        provided_net_tests = response['net-tests']

        def find_collector_and_test_helpers(test_name, test_version, input_files):
            input_files = [u""+x['hash'] for x in input_files]
            for net_test in provided_net_tests:
                if net_test['name'] != test_name:
                    continue
                if net_test['version'] != test_version:
                    continue
                if set(net_test['input-hashes']) != set(input_files):
                    continue
                return net_test['collector'], net_test['test-helpers']

        for net_test_loader in self.netTestLoaders:
            log.msg("Setting collector and test helpers for %s" % net_test_loader.testDetails['test_name'])

            collector, test_helpers = \
                find_collector_and_test_helpers(net_test_loader.testDetails['test_name'],
                                                net_test_loader.testDetails['test_version'],
                                                net_test_loader.inputFiles)

            for th in net_test_loader.requiredTestHelpers:
                if not th['test_class'].localOptions[th['option']]:
                    th['test_class'].localOptions[th['option']] = test_helpers[th['name']].encode('utf-8')
                net_test_loader.testHelpers[th['option']] = th['test_class'].localOptions[th['option']]

            if not net_test_loader.collector:
                net_test_loader.collector = collector.encode('utf-8')

    @defer.inlineCallbacks
    def lookupTestHelpers(self):
        self.oonibclient.address = self.bouncer

        required_test_helpers = []
        requires_collector = []
        for net_test_loader in self.netTestLoaders:
            if not net_test_loader.collector and not self.no_collector:
                requires_collector.append(net_test_loader)

            for th in net_test_loader.requiredTestHelpers:
                # {'name':'', 'option':'', 'test_class':''}
                if th['test_class'].localOptions[th['option']]:
                    continue
                required_test_helpers.append(th['name'])

        if not required_test_helpers and not requires_collector:
            defer.returnValue(None)

        response = yield self.oonibclient.lookupTestHelpers(required_test_helpers)

        for net_test_loader in self.netTestLoaders:
            log.msg("Setting collector and test helpers for %s" %
                    net_test_loader.testDetails['test_name'])

            # Only set the collector if the no collector has been specified
            # from the command line or via the test deck.
            if not net_test_loader.requiredTestHelpers and \
                            net_test_loader in requires_collector:
                log.msg("Using the default collector: %s" %
                        response['default']['collector'])
                net_test_loader.collector = response['default']['collector'].encode('utf-8')
                continue

            for th in net_test_loader.requiredTestHelpers:
                # Only set helpers which are not already specified
                if th['name'] not in required_test_helpers:
                    continue
                test_helper = response[th['name']]
                log.msg("Using this helper: %s" % test_helper)
                th['test_class'].localOptions[th['option']] = test_helper['address'].encode('utf-8')
                if net_test_loader in requires_collector:
                    net_test_loader.collector = test_helper['collector'].encode('utf-8')

    @defer.inlineCallbacks
    def fetchAndVerifyNetTestInput(self, net_test_loader):
        """ fetch and verify a single NetTest's inputs """
        log.debug("Fetching and verifying inputs")
        for i in net_test_loader.inputFiles:
            if 'url' in i:
                log.debug("Downloading %s" % i['url'])
                self.oonibclient.address = i['address']

                try:
                    input_file = yield self.oonibclient.downloadInput(i['hash'])
                except:
                    raise e.UnableToLoadDeckInput

                try:
                    input_file.verify()
                except AssertionError:
                    raise e.UnableToLoadDeckInput

                i['test_class'].localOptions[i['key']] = input_file.cached_file