/usr/share/pyshared/quodlibet/util/copool.py is in exfalso 3.0.2-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2006 Joe Wreschnig, Alexandre Passos
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation
"""Manage a pool of routines using Python iterators."""
from gi.repository import GLib
# Running routines, keyed by funcid. Each value is the tuple
# (GLib source id, step callable, priority, timeout) stored by add()/resume().
__routines = {}
# Paused routines, keyed by funcid. pause() moves the routine's tuple here
# unchanged; resume() re-registers it and discards the old source id.
__paused = {}
def __wrap(func, funcid, args, kwargs):
    """Drive func's iterator, reporting whether more work remains.

    Yields True once for every item produced by func(*args, **kwargs);
    the produced items themselves are discarded. When the iterator is
    exhausted, the routine registered under funcid is removed and a
    single False is yielded.
    """
    produced = func(*args, **kwargs)
    for _ in produced:
        yield True
    # Exhausted: unregister ourselves before signalling completion.
    remove(funcid)
    yield False
def add(func, *args, **kwargs):
    """Register a routine to run in GLib main loop.

    func should be a function that returns a Python iterator (e.g.
    generator) that provides values until it should stop being called.
    Any positional and keyword arguments not listed below are passed
    through to func.

    Optional Keyword Arguments:
    priority -- priority to run at (default PRIORITY_LOW)
    funcid -- mutex/removal identifier for this function
    timeout -- use timeout_add (with given timeout) instead of idle_add

    Only one function with the same funcid can be running at once.
    Starting a new function with the same ID will stop the old one. If
    no funcid is given, the function itself is used. The funcid must
    be usable as a hash key.
    """
    funcid = kwargs.pop("funcid", func)
    if funcid in __routines or funcid in __paused:
        remove(funcid)

    priority = kwargs.pop("priority", GLib.PRIORITY_LOW)
    timeout = kwargs.pop("timeout", None)

    routine = __wrap(func, funcid, args, kwargs)

    def advance():
        # Use the builtin next() rather than the generator's ``.next``
        # attribute: the attribute only exists on Python 2, while the
        # builtin works on Python 2.6+ and Python 3 alike.
        return next(routine)

    # A falsy timeout (None or 0) means "run whenever the loop is idle".
    if timeout:
        src_id = GLib.timeout_add(timeout, advance, priority=priority)
    else:
        src_id = GLib.idle_add(advance, priority=priority)

    # Keep everything needed to pause/resume/step the routine later.
    __routines[funcid] = (src_id, advance, priority, timeout)
    print_d("Added copool function %r with id %r" % (func, funcid))
def remove(funcid):
    """Stop a registered routine.

    Works for both running and paused routines. An unknown funcid is
    not an error; the debug message is printed regardless.
    """
    if funcid in __routines:
        src_id = __routines[funcid][0]
        # Detach from the main loop first, then forget the routine.
        GLib.source_remove(src_id)
        del __routines[funcid]
    __paused.pop(funcid, None)
    print_d("Removed copool function id %r" % funcid)
def remove_all():
    """Stop all running routines.

    Paused routines are left untouched.
    """
    # Iterate over a snapshot of the keys: remove() deletes entries from
    # __routines, and mutating a dict while iterating its live key view
    # raises RuntimeError on Python 3.
    for funcid in list(__routines):
        remove(funcid)
def pause(funcid):
    """Temporarily pause a registered routine.

    Raises KeyError if funcid is not a currently running routine.
    """
    # Grab the bookkeeping tuple before remove() discards it.
    entry = __routines[funcid]
    remove(funcid)
    # The source id inside the tuple is no longer registered after
    # remove(); resume() creates a fresh source and ignores the old id.
    __paused[funcid] = entry
def resume(funcid):
    """Resume a paused routine.

    Does nothing if funcid is not currently paused.
    """
    if funcid not in __paused:
        return

    # The saved source id was already removed by pause(); only the
    # callable and its scheduling parameters are reused.
    _stale_src_id, callback, priority, timeout = __paused.pop(funcid)

    if timeout:
        src_id = GLib.timeout_add(timeout, callback, priority=priority)
    else:
        src_id = GLib.idle_add(callback, priority=priority)
    __routines[funcid] = (src_id, callback, priority, timeout)
def step(funcid):
    """Force this function to iterate once.

    Works for both running and paused routines.

    Raises ValueError if funcid is neither running nor paused.
    """
    if funcid in __routines:
        entry = __routines[funcid]
    elif funcid in __paused:
        entry = __paused[funcid]
    else:
        raise ValueError("no pooled routine %r" % funcid)
    # Both tables hold (src_id, callable, priority, timeout) tuples, so the
    # step callable is element 1. Calling the paused tuple itself, as the
    # code previously did, raised TypeError.
    entry[1]()
|