This file is indexed.

/usr/lib/python2.7/dist-packages/whoosh/multiproc.py is in python-whoosh 2.7.0-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

# Copyright 2011 Matt Chaput. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    1. Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#    2. Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of Matt Chaput.

from __future__ import with_statement
import os
from multiprocessing import Process, Queue, cpu_count

from whoosh.compat import queue, xrange, iteritems, pickle
from whoosh.codec import base
from whoosh.writing import PostingPool, SegmentWriter
from whoosh.externalsort import imerge
from whoosh.util import random_name


def finish_subsegment(writer, k=64):
    # Tell the pool to finish up the current file
    writer.pool.save()
    # Tell the pool to merge any and all runs in the pool until there
    # is only one run remaining. "k" is an optional parameter passed
    # from the parent which sets the maximum number of files to open
    # while reducing.
    writer.pool.reduce_to(1, k)

    # The filename of the single remaining run
    runname = writer.pool.runs[0]
    # The indexed field names
    fieldnames = writer.pool.fieldnames
    # The segment object (parent can use this to re-open the files created
    # by the sub-writer)
    segment = writer._partial_segment()

    return runname, fieldnames, segment


# Multiprocessing Writer

class SubWriterTask(Process):
    # This is a Process object that takes "jobs" off a job Queue, processes
    # them, and when it's done, puts a summary of its work on a results Queue

    def __init__(self, storage, indexname, jobqueue, resultqueue, kwargs,
                 multisegment):
        Process.__init__(self)
        self.storage = storage
        self.indexname = indexname
        self.jobqueue = jobqueue
        self.resultqueue = resultqueue
        self.kwargs = kwargs
        self.multisegment = multisegment
        self.running = True

    def run(self):
        # This is the main loop of the process. OK, so the way this works is
        # kind of brittle and stupid, but I had to figure out how to use the
        # multiprocessing module, work around bugs, and address performance
        # issues, so there is at least some reasoning behind some of this

        # The "parent" task farms individual documents out to the subtasks for
        # indexing. You could pickle the actual documents and put them in the
        # queue, but that is not very performant. Instead, we assume the tasks
        # share a filesystem and use that to pass the information around. The
        # parent task writes a certain number of documents to a file, then puts
        # the filename on the "job queue". A subtask gets the filename off the
        # queue and reads through the file processing the documents.
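        #
        # Schematically (shorthand only; the names below are illustrative and
        # the real calls live in MpWriter._enqueue() and _process_file()):
        #
        #   parent:  for fields in batch: pickle.dump((0, fields), jobfile, -1)
        #            jobqueue.put((jobfilename, len(batch)))
        #   child:   jobfilename, doc_count = jobqueue.get()
        #            for _ in xrange(doc_count):
        #                code, args = pickle.load(jobfile)
        #                writer.add_document(**args)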

        jobqueue = self.jobqueue
        resultqueue = self.resultqueue
        multisegment = self.multisegment

        # Open a placeholder object representing the index
        ix = self.storage.open_index(self.indexname)
        # Open a writer for the index. The _lk=False parameter means to not try
        # to lock the index (the parent object that started me takes care of
        # locking the index)
        writer = self.writer = SegmentWriter(ix, _lk=False, **self.kwargs)

        # If the parent task calls cancel() on me, it will set self.running to
        # False, so I'll notice the next time through the loop
        while self.running:
            # Take an object off the job queue
            jobinfo = jobqueue.get()
            # If the object is None, it means the parent task wants me to
            # finish up
            if jobinfo is None:
                break
            # The object from the queue is a tuple of (filename,
            # number_of_docs_in_file). Pass those two pieces of information as
            # arguments to _process_file().
            self._process_file(*jobinfo)
            # jobqueue.task_done()

        if not self.running:
            # I was cancelled, so I'll cancel my underlying writer
            writer.cancel()
        else:
            if multisegment:
                # Actually finish the segment and return it with no run
                runname = None
                fieldnames = writer.pool.fieldnames
                segment = writer._finalize_segment()
            else:
                # Merge all runs in the writer's pool into one run, close the
                # segment, and return the run name and the segment
                k = self.kwargs.get("k", 64)
                runname, fieldnames, segment = finish_subsegment(writer, k)

            # Put the results (the run filename and the segment object) on the
            # result queue
            resultqueue.put((runname, fieldnames, segment), timeout=5)

    def _process_file(self, filename, doc_count):
        # This method processes a "job file" written out by the parent task. A
        # job file is a series of pickled (code, arguments) tuples. Currently
        # the only command code is 0=add_document

        writer = self.writer
        tempstorage = writer.temp_storage()

        load = pickle.load
        with tempstorage.open_file(filename).raw_file() as f:
            for _ in xrange(doc_count):
                # Load the next pickled tuple from the file
                code, args = load(f)
                assert code == 0
                writer.add_document(**args)
        # Remove the job file
        tempstorage.delete_file(filename)

    def cancel(self):
        self.running = False


class MpWriter(SegmentWriter):
    def __init__(self, ix, procs=None, batchsize=100, subargs=None,
                 multisegment=False, **kwargs):
        # This is the "main" writer that will aggregate the results created by
        # the sub-tasks
        SegmentWriter.__init__(self, ix, **kwargs)

        self.procs = procs or cpu_count()
        # The maximum number of documents in each job file submitted to the
        # sub-tasks
        self.batchsize = batchsize
        # You can use keyword arguments or the "subargs" argument to pass
        # keyword arguments to the sub-writers
        self.subargs = subargs if subargs else kwargs
        # If multisegment is True, don't merge the segments created by the
        # sub-writers, just add them directly to the TOC
        self.multisegment = multisegment

        # A list to hold the sub-task Process objects
        self.tasks = []
        # A queue to pass the filenames of job files to the sub-tasks
        self.jobqueue = Queue(self.procs * 4)
        # A queue to get back the final results of the sub-tasks
        self.resultqueue = Queue()
        # A buffer for documents before they are flushed to a job file
        self.docbuffer = []

        self._grouping = 0
        self._added_sub = False

    def _new_task(self):
        task = SubWriterTask(self.storage, self.indexname,
                             self.jobqueue, self.resultqueue, self.subargs,
                             self.multisegment)
        self.tasks.append(task)
        task.start()
        return task

    def _enqueue(self):
        # Flush the documents stored in self.docbuffer to a file and put the
        # filename on the job queue
        docbuffer = self.docbuffer
        dump = pickle.dump
        length = len(docbuffer)

        filename = "%s.doclist" % random_name()
        with self.temp_storage().create_file(filename).raw_file() as f:
            for item in docbuffer:
                dump(item, f, -1)

        if len(self.tasks) < self.procs:
            self._new_task()
        jobinfo = (filename, length)
        self.jobqueue.put(jobinfo)
        self.docbuffer = []

    def cancel(self):
        try:
            for task in self.tasks:
                task.cancel()
        finally:
            SegmentWriter.cancel(self)

    def start_group(self):
        self._grouping += 1

    def end_group(self):
        if not self._grouping:
            raise Exception("Unbalanced end_group")
        self._grouping -= 1

    def add_document(self, **fields):
        # Add the document to the docbuffer
        self.docbuffer.append((0, fields))
        # If the buffer is full, flush it to the job queue
        if not self._grouping and len(self.docbuffer) >= self.batchsize:
            self._enqueue()
        self._added_sub = True

    def _read_and_renumber_run(self, path, offset):
        # Note that SortingPool._read_run() automatically deletes the run file
        # when it's finished

        gen = self.pool._read_run(path)
        # If offset is 0, just return the items unchanged
        if not offset:
            return gen
        else:
            # Otherwise, add the offset to each docnum
            return ((fname, text, docnum + offset, weight, value)
                    for fname, text, docnum, weight, value in gen)

    def commit(self, mergetype=None, optimize=None, merge=None):
        if self._added_sub:
            # If documents have been added to sub-writers, use the parallel
            # merge commit code
            self._commit(mergetype, optimize, merge)
        else:
            # Otherwise, just do a regular-old commit
            SegmentWriter.commit(self, mergetype=mergetype, optimize=optimize,
                                 merge=merge)

    def _commit(self, mergetype, optimize, merge):
        # Index the remaining documents in the doc buffer
        if self.docbuffer:
            self._enqueue()
        # Tell the tasks to finish
        for task in self.tasks:
            self.jobqueue.put(None)

        # Merge existing segments
        finalsegments = self._merge_segments(mergetype, optimize, merge)

        # Wait for the subtasks to finish
        for task in self.tasks:
            task.join()

        # Pull a (run_file_name, fieldnames, segment) tuple off the result
        # queue for each sub-task, representing the final results of the task
        results = []
        for _ in self.tasks:
            try:
                results.append(self.resultqueue.get(timeout=1))
            except queue.Empty:
                pass

        if self.multisegment:
            # If we're not merging the segments, we don't care about the runname
            # and fieldnames in the results... just pull out the segments and
            # add them to the list of final segments
            finalsegments += [s for _, _, s in results]
            if self._added:
                finalsegments.append(self._finalize_segment())
            else:
                self._close_segment()
            assert self.perdocwriter.is_closed
        else:
            # Merge the posting sources from the sub-writers and my
            # postings into this writer
            self._merge_subsegments(results, mergetype)
            self._close_segment()
            self._assemble_segment()
            finalsegments.append(self.get_segment())
            assert self.perdocwriter.is_closed

        self._commit_toc(finalsegments)
        self._finish()

    def _merge_subsegments(self, results, mergetype):
        schema = self.schema
        schemanames = set(schema.names())
        storage = self.storage
        codec = self.codec
        sources = []

        # If information was added to this writer the conventional way (e.g.
        # through add_reader or merging segments), add it as an extra source
        if self._added:
            sources.append(self.pool.iter_postings())

        pdrs = []
        for runname, fieldnames, segment in results:
            fieldnames = set(fieldnames) | schemanames
            pdr = codec.per_document_reader(storage, segment)
            pdrs.append(pdr)
            basedoc = self.docnum
            docmap = self.write_per_doc(fieldnames, pdr)
            assert docmap is None

            items = self._read_and_renumber_run(runname, basedoc)
            sources.append(items)

        # Create a MultiPerDocumentReader combining the per-document readers
        # from this writer and the subtask segments
        self.perdocwriter.close()
        pdrs.insert(0, self.per_document_reader())
        mpdr = base.MultiPerDocumentReader(pdrs)

        try:
            # Merge the iterators into the field writer
            self.fieldwriter.add_postings(schema, mpdr, imerge(sources))
        finally:
            mpdr.close()
        self._added = True


class SerialMpWriter(MpWriter):
    # A non-parallel version of the MpWriter for testing purposes

    def __init__(self, ix, procs=None, batchsize=100, subargs=None, **kwargs):
        SegmentWriter.__init__(self, ix, **kwargs)

        self.procs = procs or cpu_count()
        self.batchsize = batchsize
        self.subargs = subargs if subargs else kwargs
        self.tasks = [SegmentWriter(ix, _lk=False, **self.subargs)
                      for _ in xrange(self.procs)]
        self.pointer = 0
        self._added_sub = False

    def add_document(self, **fields):
        self.tasks[self.pointer].add_document(**fields)
        self.pointer = (self.pointer + 1) % len(self.tasks)
        self._added_sub = True

    def _commit(self, mergetype, optimize, merge):
        # Collect a (run_file_name, fieldnames, segment) tuple from each
        # sub-writer, representing the final results of that writer

        # Merge existing segments
        finalsegments = self._merge_segments(mergetype, optimize, merge)
        results = []
        for writer in self.tasks:
            results.append(finish_subsegment(writer))

        self._merge_subsegments(results, mergetype)
        self._close_segment()
        self._assemble_segment()
        finalsegments.append(self.get_segment())

        self._commit_toc(finalsegments)
        self._finish()


# For compatibility with old multiproc module
class MultiSegmentWriter(MpWriter):
    def __init__(self, *args, **kwargs):
        MpWriter.__init__(self, *args, **kwargs)
        self.multisegment = True
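
For reference, the writers in this module are normally obtained through the index's writer() method rather than instantiated directly: passing procs greater than 1 makes ix.writer() return an MpWriter. A minimal sketch follows; the schema, directory name, and document values are illustrative, and batchsize/multisegment are optional tuning parameters rather than required arguments.

    import os
    from whoosh import index
    from whoosh.fields import Schema, ID, TEXT

    # Illustrative schema and index directory
    schema = Schema(path=ID(stored=True, unique=True), body=TEXT)
    if not os.path.exists("indexdir"):
        os.mkdir("indexdir")
    ix = index.create_in("indexdir", schema)

    # procs > 1 makes ix.writer() return an MpWriter from this module;
    # multisegment=True keeps one segment per sub-writer instead of
    # merging the sub-results into a single segment
    writer = ix.writer(procs=4, batchsize=256, multisegment=True)
    for num in xrange(1000):
        writer.add_document(path=u"/doc/%d" % num,
                            body=u"document body %d" % num)
    writer.commit()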