This file is indexed.

/usr/share/pyshared/async/test/task.py is in python-async 0.6.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""Module containing task implementations useful for testing them"""
from async.task import *

import threading
import weakref

class _TestTaskBase(object):
	"""Note: causes great slowdown due to the required locking of task variables"""
	def __init__(self, *args, **kwargs):
		super(_TestTaskBase, self).__init__(*args, **kwargs)
		self.should_fail = False
		self.lock = threading.Lock()		# yes, can't safely do x = x + 1 :)
		self.plock = threading.Lock()
		self.item_count = 0
		self.process_count = 0
	
	def do_fun(self, item):
		self.lock.acquire()
		self.item_count += 1
		self.lock.release()
		if self.should_fail:
			raise AssertionError("I am failing just for the fun of it")
		return item
	
	def process(self, count=1):
		# must do it first, otherwise we might read and check results before
		# the thread gets here :). Its a lesson !
		self.plock.acquire()
		self.process_count += 1
		self.plock.release()
		super(_TestTaskBase, self).process(count)
		
	def _assert(self, pc, fc, check_scheduled=False):
		"""Assert for num process counts (pc) and num function counts (fc)
		:return: self"""
		self.lock.acquire()
		if self.item_count != fc:
			print self.item_count, fc
		assert self.item_count == fc
		self.lock.release()
		
		# NOTE: asserting num-writers fails every now and then, implying a thread is 
		# still processing (an empty chunk) when we are checking it. This can 
		# only be prevented by checking the scheduled items, which requires locking
		# and causes slowdows, so we don't do that. If the num_writers 
		# counter wouldn't be maintained properly, more tests would fail, so 
		# we can safely refrain from checking this here
		# self._wlock.acquire()
		# assert self._num_writers == 0
		# self._wlock.release()
		return self


class TestThreadTask(_TestTaskBase, IteratorThreadTask):
	pass
		

class TestFailureThreadTask(TestThreadTask):
	"""Fails after X items"""
	def __init__(self, *args, **kwargs):
		self.fail_after = kwargs.pop('fail_after')
		super(TestFailureThreadTask, self).__init__(*args, **kwargs)
		
	def do_fun(self, item):
		item = TestThreadTask.do_fun(self, item)
		
		self.lock.acquire()
		try:
			if self.item_count > self.fail_after:
				raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
		finally:
			self.lock.release()
		# END handle fail after
		return item
		

class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask):
	"""Apply a transformation on items read from an input channel"""
	def __init__(self, *args, **kwargs):
		self.fail_after = kwargs.pop('fail_after', 0)
		super(TestChannelThreadTask, self).__init__(*args, **kwargs)
	
	def do_fun(self, item):
		"""return tuple(i, i*2)"""
		item = super(TestChannelThreadTask, self).do_fun(item)
		
		# fail after support
		if self.fail_after:
			self.lock.acquire()
			try:
				if self.item_count > self.fail_after:
					raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
			finally:
				self.lock.release()
		# END handle fail-after
		
		if isinstance(item, tuple):
			i = item[0]
			return item + (i * self.id, )
		else:
			return (item, item * self.id)
		# END handle tuple


class TestPerformanceThreadTask(ChannelThreadTask):
	"""Applies no operation to the item, and does not lock, measuring
	the actual throughput of the system"""
	
	def do_fun(self, item):
		return item


class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask):
	"""An input channel task, which verifies the result of its input channels, 
	should be last in the chain.
	Id must be int"""
	
	def do_fun(self, item):
		"""return tuple(i, i*2)"""
		item = super(TestVerifyChannelThreadTask, self).do_fun(item)
		
		# make sure the computation order matches
		assert isinstance(item, tuple), "input was no tuple: %s" % item
		
		base = item[0]
		for id, num in enumerate(item[1:]):
			assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item))
		# END verify order
		
		return item

#{ Utilities

def make_proxy_method(t):
	"""required to prevent binding self into the method we call"""
	wt = weakref.proxy(t)
	return lambda item: wt.do_fun(item)

def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0, 
					feedercls=TestThreadTask, transformercls=TestChannelThreadTask, 
					include_verifier=True):
	"""Create a task chain of feeder, count transformers and order verifcator 
	to the pool p, like t1 -> t2 -> t3
	:param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would 
		make the third transformer fail after 20 items
	:param feeder_channel: if set to a channel, it will be used as input of the 
		first transformation task. The respective first task in the return value 
		will be None.
	:param id_offset: defines the id of the first transformation task, all subsequent
		ones will add one
	:return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
	nt = p.num_tasks()
	
	feeder = None
	frc = feeder_channel
	if feeder_channel is None:
		feeder = make_iterator_task(ni, taskcls=feedercls)
		frc = p.add_task(feeder)
	# END handle specific feeder
	
	rcs = [frc]
	tasks = [feeder]
	
	inrc = frc
	for tc in xrange(count):
		t = transformercls(inrc, tc+id_offset, None)
		
		t.fun = make_proxy_method(t)
		#t.fun = t.do_fun
		inrc = p.add_task(t)
		
		tasks.append(t)
		rcs.append(inrc)
	# END create count transformers
	
	# setup failure
	for id, fail_after in fail_setup:
		tasks[1+id].fail_after = fail_after
	# END setup failure 
	
	if include_verifier:
		verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None)
		#verifier.fun = verifier.do_fun
		verifier.fun = make_proxy_method(verifier)
		vrc = p.add_task(verifier)
		
		
		tasks.append(verifier)
		rcs.append(vrc)
	# END handle include verifier
	return tasks, rcs
	
def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs):
	""":return: task which yields ni items
	:param taskcls: the actual iterator type to use
	:param kwargs: additional kwargs to be passed to the task"""
	t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
	if isinstance(t, _TestTaskBase):
		t.fun = make_proxy_method(t)
	return t

#} END utilities