This file is indexed.

/usr/share/doc/python-tables-doc/examples/multiprocess_access_benchmarks.py is in python-tables-doc 3.1.1-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# Benchmark four methods of using PyTables with multiple processes, where data
# is read from a PyTables file in one process and then sent to another
#
# 1. using multiprocessing.Pipe
# 2. using a memory mapped file that's shared between two processes, passed as
#    out argument to tables.Array.read.
# 3. using a Unix domain socket (this uses the "abstract namespace" and will
#    work only on Linux).
# 4. using an IPv4 socket
#
# In all four cases, an array is loaded from a file in one process, sent to
# another, and then modified by incrementing each array element.  This is meant
# to simulate retrieving data and then modifying it.

from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import multiprocessing
import os
import random
import select
import socket
import tempfile
import time

import numpy as np
import tables


# create a PyTables file with a single int64 array with the specified number of
# elements
def create_file(array_size):
    """Write 'test.h5' containing a single int64 array of ones.

    array_size -- shape handed to np.ones for the stored array

    The array is stored as node 'test' under the root group, and its
    on-disk size (in MB) is printed.
    """
    ones = np.ones(array_size, dtype='i8')
    with tables.open_file('test.h5', 'w') as h5_file:
        node = h5_file.create_array('/', 'test', ones)
        size_mb = node.size_on_disk / 1e6
        print('file created, size: {0} MB'.format(size_mb))


# process to receive an array using a multiprocessing.Pipe connection
class PipeReceive(multiprocessing.Process):
    """Process that receives an array through a multiprocessing pipe.

    The received array is incremented in place, and the timestamps taken
    right after the receive and right after the increment are sent back.
    """

    def __init__(self, receiver_pipe, result_send):
        """Remember the connections used by run().

        receiver_pipe -- connection the array is received on
        result_send -- connection the timestamp pair is sent on
        """
        super(PipeReceive, self).__init__()
        self.result_send = result_send
        self.receiver_pipe = receiver_pipe

    def run(self):
        """Receive one array, add 1 to it and report the timings."""
        # recv() blocks until the sender has put something on the pipe
        received = self.receiver_pipe.recv()
        received_at = time.time()
        # the simulated "work" done on the received data
        received += 1
        finished_at = time.time()
        assert np.all(received == 2)
        # hand both timestamps back to the originating process
        timings = (received_at, finished_at)
        self.result_send.send(timings)


def read_and_send_pipe(send_type, array_size):
    """Time sending an array from 'test.h5' over a multiprocessing.Pipe.

    send_type -- label passed through to print_results
    array_size -- stop index given to the PyTables read of node 'test'
    """
    # two one-way pipes: one carries the array to the receiver, the other
    # brings the receiver's timestamps back
    array_recv, array_send = multiprocessing.Pipe(False)
    result_recv, result_send = multiprocessing.Pipe(False)
    # launch the receiver and give it a moment to start up
    receiver = PipeReceive(array_recv, result_send)
    receiver.start()
    time.sleep(0.15)
    with tables.open_file('test.h5', 'r') as h5_file:
        node = h5_file.get_node('/', 'test')
        start_timestamp = time.time()
        # load the data from the PyTables file and ship it to the receiver
        data = node.read(0, array_size, 1)
        array_send.send(data)
        assert np.all(data + 1 == 2)
        # block until the receiver reports its timestamps
        recv_timestamp, finish_timestamp = result_recv.recv()
    print_results(send_type, start_timestamp, recv_timestamp, finish_timestamp)
    receiver.join()


# process to receive an array using a shared memory mapped file
# for real use, this would require creating some protocol to specify the
# array's data type and shape
class MemmapReceive(multiprocessing.Process):
    """Process that modifies an array through a shared memory-mapped file.

    Only the file path travels over the pipe; the file is mapped as a flat
    int64 array, incremented in place, and the timestamps taken right after
    mapping and right after the increment are sent back.
    """

    def __init__(self, path_recv, result_send):
        """Remember the connections used by run().

        path_recv -- connection the memmap file path is received on
        result_send -- connection the timestamp pair is sent on
        """
        super(MemmapReceive, self).__init__()
        self.result_send = result_send
        self.path_recv = path_recv

    def run(self):
        """Map the file named by the sender, add 1 to it and report timings."""
        # recv() blocks until the other process has sent the file path
        memmap_path = self.path_recv.recv()
        # open the existing file read/write as an int64 array
        mapped = np.memmap(memmap_path, 'i8', 'r+')
        mapped_at = time.time()
        # the simulated "work" done on the shared data
        mapped += 1
        finished_at = time.time()
        assert np.all(mapped == 2)
        # hand both timestamps back to the originating process
        timings = (mapped_at, finished_at)
        self.result_send.send(timings)


def read_and_send_memmap(send_type, array_size):
    """Time sharing an array from 'test.h5' through a memory-mapped file.

    send_type -- label passed through to print_results
    array_size -- number of int64 elements in the memmap; also the stop
                  index given to the PyTables read of node 'test'

    The backing file is created with a unique name in the system temporary
    directory and removed again before returning, so concurrent runs do not
    clash and no large file is left behind.
    """
    # create a multiprocessing Pipe that will be used to send the memmap
    # file path to the receiving process
    path_recv, path_send = multiprocessing.Pipe(False)
    result_recv, result_send = multiprocessing.Pipe(False)
    # reserve a unique backing file; only its path is needed, so the
    # descriptor returned by mkstemp is closed right away
    descriptor, memmap_path = tempfile.mkstemp(prefix='array', suffix='.mmap')
    os.close(descriptor)
    try:
        # start the receiving process and pause to allow it to start up
        recv_process = MemmapReceive(path_recv, result_send)
        recv_process.start()
        time.sleep(0.15)
        with tables.open_file('test.h5', 'r') as fobj:
            array = fobj.get_node('/', 'test')
            start_timestamp = time.time()
            # memmap the file as a NumPy array in 'overwrite' mode
            output = np.memmap(memmap_path, 'i8', 'w+', shape=(array_size, ))
            # read an array from the PyTables file into the memory mapped
            # array
            array.read(0, array_size, 1, out=output)
            # use a multiprocessing.Pipe to send the file's path to the
            # receiving process
            path_send.send(memmap_path)
            # receive the timestamps from the other process
            recv_timestamp, finish_timestamp = result_recv.recv()
            # because 'output' is shared between processes, all elements
            # should now be equal to 2
            assert(np.all(output == 2))
        print_results(send_type, start_timestamp, recv_timestamp,
                      finish_timestamp)
        recv_process.join()
    finally:
        # drop the backing file whether or not the benchmark succeeded
        os.remove(memmap_path)


# process to receive an array using a socket
# for real use, this would require creating some protocol to specify the
# array's data type and shape
class SocketReceive(multiprocessing.Process):
    """Process that receives the raw bytes of an int64 array over a socket.

    The process listens on the given address, accepts one connection, reads
    exactly array_nbytes bytes, views them as an int64 array, increments the
    array in place, and sends back the timestamps taken right after the
    receive and right after the increment.
    """

    def __init__(self, socket_family, address, result_send, array_nbytes):
        """Remember the socket parameters and the result connection.

        socket_family -- address family passed to socket.socket
        address -- address the listening socket is bound to
        result_send -- connection the timestamp pair is sent on
        array_nbytes -- number of bytes expected from the sender
        """
        super(SocketReceive, self).__init__()
        self.socket_family = socket_family
        self.address = address
        self.result_send = result_send
        self.array_nbytes = array_nbytes

    def run(self):
        """Accept one connection, receive the array and report the timings.

        Raises EOFError if the peer closes the connection before
        array_nbytes bytes have arrived.
        """
        # create the socket, listen for a connection and use select to block
        # until a connection is made
        sock = socket.socket(self.socket_family, socket.SOCK_STREAM)
        try:
            sock.bind(self.address)
            sock.listen(1)
            select.select([sock], [], [])
            # accept the connection and read the sent data into a bytearray
            connection = sock.accept()[0]
            try:
                recv_buffer = bytearray(self.array_nbytes)
                view = memoryview(recv_buffer)
                bytes_recv = 0
                while bytes_recv < self.array_nbytes:
                    chunk_size = connection.recv_into(view[bytes_recv:])
                    if chunk_size == 0:
                        # recv_into returning 0 means the peer closed the
                        # connection; without this check the loop would
                        # spin forever
                        raise EOFError(
                            'connection closed after {0} of {1} bytes'.format(
                                bytes_recv, self.array_nbytes))
                    bytes_recv += chunk_size
                # convert the bytearray into a NumPy array
                array = np.frombuffer(recv_buffer, dtype='i8')
                recv_timestamp = time.time()
                # perform an operation on the received array
                array += 1
                finish_timestamp = time.time()
                assert(np.all(array == 2))
                # send the timestamps back to the originating process
                self.result_send.send((recv_timestamp, finish_timestamp))
            finally:
                connection.close()
        finally:
            sock.close()


def unix_socket_address():
    """Return a random Unix domain socket address in the abstract namespace.

    The address is a NUL byte followed by five random bytes; abstract
    addresses like this only work on Linux.
    """
    random_suffix = os.urandom(5)
    return b'\x00' + random_suffix


def ipv4_socket_address():
    """Return a (host, port) pair on the loopback interface.

    The port is picked at random from 9000 to 10000 inclusive.
    """
    host = '127.0.0.1'
    port = random.randrange(9000, 10001)
    return (host, port)


def read_and_send_socket(send_type, array_size, array_bytes, address_func,
                         socket_family):
    """Time sending an array from 'test.h5' to another process over a socket.

    send_type -- label passed through to print_results
    array_size -- stop index given to the PyTables read of node 'test'
    array_bytes -- number of bytes the receiving process should expect
    address_func -- callable returning the address to bind/connect to
    socket_family -- address family passed to socket.socket
    """
    address = address_func()
    # start the receiving process and pause to allow it to start up
    result_recv, result_send = multiprocessing.Pipe(False)
    recv_process = SocketReceive(socket_family, address, result_send,
                                 array_bytes)
    recv_process.start()
    time.sleep(0.15)
    with tables.open_file('test.h5', 'r') as fobj:
        array = fobj.get_node('/', 'test')
        start_timestamp = time.time()
        # connect to the receiving process' socket
        sock = socket.socket(socket_family, socket.SOCK_STREAM)
        try:
            sock.connect(address)
            # read the array from the PyTables file and send its
            # data buffer to the receiving process
            output = array.read(0, array_size, 1)
            # sendall keeps sending until the whole buffer has gone out;
            # a single send() may transmit only part of it, which would
            # leave the receiver waiting for bytes that never arrive
            sock.sendall(output.data)
            assert(np.all(output + 1 == 2))
            # receive the timestamps from the other process
            recv_timestamp, finish_timestamp = result_recv.recv()
        finally:
            # release the socket even if connecting or sending failed
            sock.close()
    print_results(send_type, start_timestamp, recv_timestamp, finish_timestamp)
    recv_process.join()


def print_results(send_type, start_timestamp, recv_timestamp,
                  finish_timestamp):
    """Print one line of timings for a benchmark run.

    send_type -- label identifying the transfer method
    start_timestamp -- time taken before the read/send started
    recv_timestamp -- time taken when the receiver had the array
    finish_timestamp -- time taken after the receiver modified the array

    The three printed durations are receive (start to recv), add (recv to
    finish) and total (start to finish).
    """
    receive_time = recv_timestamp - start_timestamp
    add_time = finish_timestamp - recv_timestamp
    total_time = finish_timestamp - start_timestamp
    template = 'type: {0}\t receive: {1:5.5f}, add:{2:5.5f}, total: {3:5.5f}'
    print(template.format(send_type, receive_time, add_time, total_time))


if __name__ == '__main__':

    # seed the random module from OS entropy
    random.seed(os.urandom(2))
    # payload sizes to benchmark: 1e5, 1e6, 1e7 and 1e8 bytes
    array_num_bytes = [10 ** exponent for exponent in range(5, 9)]

    for array_bytes in array_num_bytes:
        # the stored array is int64, i.e. 8 bytes per element
        array_size = array_bytes // 8

        create_file(array_size)
        read_and_send_pipe('multiproc.Pipe', array_size)
        read_and_send_memmap('memmap     ', array_size)
        # comment out this call to run on an OS other than Linux
        read_and_send_socket(
            'Unix socket', array_size, array_bytes,
            unix_socket_address, socket.AF_UNIX)
        read_and_send_socket(
            'IPv4 socket', array_size, array_bytes,
            ipv4_socket_address, socket.AF_INET)
        # blank line between the result groups of different sizes
        print()