This file is indexed.

/usr/share/pyshared/axiom/scheduler.py is in python-axiom 0.7.1-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
# -*- test-case-name: axiom.test.test_scheduler -*-

"""
Timed event scheduling for Axiom databases.

With this module, applications can schedule an L{Item} to have its C{run} method
called at a particular point in the future.  This call will happen even if the
process which initially schedules it exits and the database is later re-opened
by another process (of course, if the scheduled time comes and goes while no
process is using the database, then the call will be delayed until some process
opens the database and starts its services).

This module contains two implementations of the L{axiom.iaxiom.IScheduler}
interface, one for site stores and one for sub-stores.  Items can only be
scheduled using an L{IScheduler} implementations from the store containing the
item.  This means a typical way to schedule an item to be run is::

    IScheduler(item.store).schedule(item, when)

The scheduler service can also be retrieved from the site store's service
collection by name::

    IServiceCollection(siteStore).getServiceNamed(SITE_SCHEDULER)
"""

import warnings

from zope.interface import implements

from twisted.internet import reactor

from twisted.application.service import IService, Service
from twisted.python import log, failure

from epsilon.extime import Time

from axiom.iaxiom import IScheduler
from axiom.item import Item, declareLegacyItem
from axiom.attributes import AND, timestamp, reference, integer, inmemory, bytes
from axiom.dependency import uninstallFrom
from axiom.upgrade import registerUpgrader
from axiom.substore import SubStore

VERBOSE = False

SITE_SCHEDULER = u"Site Scheduler"


class TimedEventFailureLog(Item):
    typeName = 'timed_event_failure_log'
    schemaVersion = 1

    desiredTime = timestamp()
    actualTime = timestamp()

    runnable = reference()
    traceback = bytes()


class TimedEvent(Item):
    typeName = 'timed_event'
    schemaVersion = 1

    time = timestamp(indexed=True)
    runnable = reference()

    running = inmemory(doc='True if this event is currently running.')

    def activate(self):
        self.running = False


    def _rescheduleFromRun(self, newTime):
        """
        Schedule this event to be run at the indicated time, or if the
        indicated time is None, delete this event.
        """
        if newTime is None:
            self.deleteFromStore()
        else:
            self.time = newTime


    def invokeRunnable(self):
        """
        Run my runnable, and reschedule or delete myself based on its result.
        Must be run in a transaction.
        """
        runnable = self.runnable
        if runnable is None:
            self.deleteFromStore()
        else:
            try:
                self.running = True
                newTime = runnable.run()
            finally:
                self.running = False
            self._rescheduleFromRun(newTime)


    def handleError(self, now, failureObj):
        """ An error occurred running my runnable.  Check my runnable for an
        error-handling method called 'timedEventErrorHandler' that will take
        the given failure as an argument, and execute that if available:
        otherwise, create a TimedEventFailureLog with information about what
        happened to this event.

        Must be run in a transaction.
        """
        errorHandler = getattr(self.runnable, 'timedEventErrorHandler', None)
        if errorHandler is not None:
            self._rescheduleFromRun(errorHandler(self, failureObj))
        else:
            self._defaultErrorHandler(now, failureObj)


    def _defaultErrorHandler(self, now, failureObj):
        TimedEventFailureLog(store=self.store,
                             desiredTime=self.time,
                             actualTime=now,
                             runnable=self.runnable,
                             traceback=failureObj.getTraceback())
        self.deleteFromStore()



class _WackyControlFlow(Exception):
    def __init__(self, eventObject, failureObject):
        Exception.__init__(self, "User code failed during timed event")
        self.eventObject = eventObject
        self.failureObject = failureObject


MAX_WORK_PER_TICK = 10

class SchedulerMixin:
    def _oneTick(self, now):
        theEvent = self._getNextEvent(now)
        if theEvent is None:
            return False
        try:
            theEvent.invokeRunnable()
        except:
            raise _WackyControlFlow(theEvent, failure.Failure())
        self.lastEventAt = now
        return True


    def _getNextEvent(self, now):
        # o/` gonna party like it's 1984 o/`
        theEventL = list(self.store.query(TimedEvent,
                                          TimedEvent.time <= now,
                                          sort=TimedEvent.time.ascending,
                                          limit=1))
        if theEventL:
            return theEventL[0]


    def tick(self):
        now = self.now()
        self.nextEventAt = None
        workBeingDone = True
        workUnitsPerformed = 0
        errors = 0
        while workBeingDone and workUnitsPerformed < MAX_WORK_PER_TICK:
            try:
                workBeingDone = self.store.transact(self._oneTick, now)
            except _WackyControlFlow, wcf:
                self.store.transact(wcf.eventObject.handleError, now, wcf.failureObject)
                log.err(wcf.failureObject)
                errors += 1
                workBeingDone = True
            if workBeingDone:
                workUnitsPerformed += 1
        x = list(self.store.query(TimedEvent, sort=TimedEvent.time.ascending, limit=1))
        if x:
            self._transientSchedule(x[0].time, now)
        if errors or VERBOSE:
            log.msg("The scheduler ran %(eventCount)s events%(errors)s." % dict(
                    eventCount=workUnitsPerformed,
                    errors=(errors and (" (with %d errors)" % (errors,))) or ''))


    def schedule(self, runnable, when):
        TimedEvent(store=self.store, time=when, runnable=runnable)
        self._transientSchedule(when, self.now())


    def reschedule(self, runnable, fromWhen, toWhen):
        for evt in self.store.query(TimedEvent,
                                    AND(TimedEvent.time == fromWhen,
                                        TimedEvent.runnable == runnable)):
            evt.time = toWhen
            self._transientSchedule(toWhen, self.now())
            break
        else:
            raise ValueError("%r is not scheduled to run at %r" % (runnable, fromWhen))


    def unscheduleFirst(self, runnable):
        """
        Remove from given item from the schedule.

        If runnable is scheduled to run multiple times, only the temporally first
        is removed.
        """
        for evt in self.store.query(TimedEvent, TimedEvent.runnable == runnable, sort=TimedEvent.time.ascending):
            evt.deleteFromStore()
            break


    def unscheduleAll(self, runnable):
        for evt in self.store.query(TimedEvent, TimedEvent.runnable == runnable):
            evt.deleteFromStore()


    def scheduledTimes(self, runnable):
        """
        Return an iterable of the times at which the given item is scheduled to
        run.
        """
        events = self.store.query(
            TimedEvent, TimedEvent.runnable == runnable)
        return (event.time for event in events if not event.running)

_EPSILON = 1e-20      # A very small amount of time.



class _SiteScheduler(object, Service, SchedulerMixin):
    """
    Adapter from a site store to L{IScheduler}.
    """
    implements(IScheduler)

    timer = None
    callLater = reactor.callLater
    now = Time

    def __init__(self, store):
        self.store = store
        self.setName(SITE_SCHEDULER)


    def startService(self):
        """
        Start calling persistent timed events whose time has come.
        """
        super(_SiteScheduler, self).startService()
        self._transientSchedule(self.now(), self.now())


    def stopService(self):
        """
        Stop calling persistent timed events.
        """
        super(_SiteScheduler, self).stopService()
        if self.timer is not None:
            self.timer.cancel()
            self.timer = None


    def tick(self):
        self.timer = None
        return super(_SiteScheduler, self).tick()


    def _transientSchedule(self, when, now):
        """
        If the service is currently running, schedule a tick to happen no
        later than C{when}.

        @param when: The time at which to tick.
        @type when: L{epsilon.extime.Time}

        @param now: The current time.
        @type now: L{epsilon.extime.Time}
        """
        if not self.running:
            return
        if self.timer is not None:
            if self.timer.getTime() < when.asPOSIXTimestamp():
                return
            self.timer.cancel()
        delay = when.asPOSIXTimestamp() - now.asPOSIXTimestamp()

        # reactor.callLater allows only positive delay values.  The scheduler
        # may want to have scheduled things in the past and that's OK, since we
        # are dealing with Time() instances it's impossible to predict what
        # they are relative to the current time from user code anyway.
        delay = max(_EPSILON, delay)
        self.timer = self.callLater(delay, self.tick)
        self.nextEventAt = when



class _UserScheduler(object, Service, SchedulerMixin):
    """
    Adapter from a non-site store to L{IScheduler}.
    """
    implements(IScheduler)

    def __init__(self, store):
        self.store = store


    def now(self):
        """
        Report the current time, as reported by the parent's scheduler.
        """
        return IScheduler(self.store.parent).now()


    def _transientSchedule(self, when, now):
        """
        If this service's store is attached to its parent, ask the parent to
        schedule this substore to tick at the given time.

        @param when: The time at which to tick.
        @type when: L{epsilon.extime.Time}

        @param now: Present for signature compatibility with
            L{_SiteScheduler._transientSchedule}, but ignored otherwise.
        """
        if self.store.parent is not None:
            subStore = self.store.parent.getItemByID(self.store.idInParent)
            hook = self.store.parent.findOrCreate(
                _SubSchedulerParentHook,
                subStore=subStore)
            hook._schedule(when)


    def migrateDown(self):
        """
        Remove the components in the site store for this SubScheduler.
        """
        subStore = self.store.parent.getItemByID(self.store.idInParent)
        ssph = self.store.parent.findUnique(
            _SubSchedulerParentHook,
            _SubSchedulerParentHook.subStore == subStore,
            default=None)
        if ssph is not None:
            te = self.store.parent.findUnique(TimedEvent,
                                              TimedEvent.runnable == ssph,
                                              default=None)
            if te is not None:
                te.deleteFromStore()
            ssph.deleteFromStore()


    def migrateUp(self):
        """
        Recreate the hooks in the site store to trigger this SubScheduler.
        """
        te = self.store.findFirst(TimedEvent, sort=TimedEvent.time.descending)
        if te is not None:
            self._transientSchedule(te.time, None)



class _SchedulerCompatMixin(object):
    """
    Backwards compatibility helper for L{Scheduler} and L{SubScheduler}.

    This mixin provides all the attributes from L{IScheduler}, but provides
    them by adapting the L{Store} the item is in to L{IScheduler} and
    getting them from the resulting object.  Primarily in support of test
    code, it also supports rebinding those attributes by rebinding them on
    the L{IScheduler} powerup.

    @see: L{IScheduler}
    """
    implements(IScheduler)

    def forwardToReal(name):
        def get(self):
            return getattr(IScheduler(self.store), name)
        def set(self, value):
            setattr(IScheduler(self.store), name, value)
        return property(get, set)

    now = forwardToReal("now")
    tick = forwardToReal("tick")
    schedule = forwardToReal("schedule")
    reschedule = forwardToReal("reschedule")
    unschedule = forwardToReal("unschedule")
    unscheduleAll = forwardToReal("unscheduleAll")
    scheduledTimes = forwardToReal("scheduledTimes")


    def activate(self):
        """
        Whenever L{Scheduler} or L{SubScheduler} is created, either newly or
        when loaded from a database, emit a deprecation warning referring
        people to L{IScheduler}.
        """
        # This is unfortunate.  Perhaps it is the best thing which works (it is
        # the first I found). -exarkun
        if '_axiom_memory_dummy' in vars(self):
            stacklevel = 7
        else:
            stacklevel = 5
        warnings.warn(
            self.__class__.__name__ + " is deprecated since Axiom 0.5.32.  "
            "Just adapt stores to IScheduler.",
            category=PendingDeprecationWarning,
            stacklevel=stacklevel)



class Scheduler(Item, _SchedulerCompatMixin):
    """
    Track and execute persistent timed events for a I{site} store.

    This is deprecated and present only for backwards compatibility.  Adapt
    the store to L{IScheduler} instead.
    """
    implements(IService)

    typeName = 'axiom_scheduler'
    schemaVersion = 2

    dummy = integer()

    def activate(self):
        _SchedulerCompatMixin.activate(self)


    def setServiceParent(self, parent):
        """
        L{Scheduler} is no longer an L{IService}, but still provides this
        method as a no-op in case an instance which was still an L{IService}
        powerup is loaded (in which case it will be used like a service
        once).
        """



declareLegacyItem(
    Scheduler.typeName, 1,
    dict(eventsRun=integer(default=0),
         lastEventAt=timestamp(),
         nextEventAt=timestamp()))


def scheduler1to2(old):
    new = old.upgradeVersion(Scheduler.typeName, 1, 2)
    new.store.powerDown(new, IService)
    new.store.powerDown(new, IScheduler)
    return new

registerUpgrader(scheduler1to2, Scheduler.typeName, 1, 2)


class _SubSchedulerParentHook(Item):
    schemaVersion = 4
    typeName = 'axiom_subscheduler_parent_hook'

    subStore = reference(
        doc="""
        The L{SubStore} for which this scheduling hook exists.
        """, reftype=SubStore)

    def run(self):
        """
        Tick our C{subStore}'s L{SubScheduler}.
        """
        IScheduler(self.subStore).tick()


    def _schedule(self, when):
        """
        Ensure that this hook is scheduled to run at or before C{when}.
        """
        sched = IScheduler(self.store)
        for scheduledAt in sched.scheduledTimes(self):
            if when < scheduledAt:
                sched.reschedule(self, scheduledAt, when)
            break
        else:
            sched.schedule(self, when)


def upgradeParentHook1to2(oldHook):
    """
    Add the scheduler attribute to the given L{_SubSchedulerParentHook}.
    """
    newHook = oldHook.upgradeVersion(
        oldHook.typeName, 1, 2,
        loginAccount=oldHook.loginAccount,
        scheduledAt=oldHook.scheduledAt,
        scheduler=oldHook.store.findFirst(Scheduler))
    return newHook

registerUpgrader(upgradeParentHook1to2, _SubSchedulerParentHook.typeName, 1, 2)

declareLegacyItem(
    _SubSchedulerParentHook.typeName, 2,
    dict(loginAccount=reference(),
         scheduledAt=timestamp(default=None),
         scheduler=reference()))

def upgradeParentHook2to3(old):
    """
    Copy the C{loginAccount} attribute, but drop the others.
    """
    return old.upgradeVersion(
        old.typeName, 2, 3,
        loginAccount=old.loginAccount)

registerUpgrader(upgradeParentHook2to3, _SubSchedulerParentHook.typeName, 2, 3)

declareLegacyItem(
    _SubSchedulerParentHook.typeName, 3,
    dict(loginAccount=reference(),
         scheduler=reference()))

def upgradeParentHook3to4(old):
    """
    Copy C{loginAccount} to C{subStore} and remove the installation marker.
    """
    new = old.upgradeVersion(
        old.typeName, 3, 4, subStore=old.loginAccount)
    uninstallFrom(new, new.store)
    return new


registerUpgrader(upgradeParentHook3to4, _SubSchedulerParentHook.typeName, 3, 4)


class SubScheduler(Item, _SchedulerCompatMixin):
    """
    Track and execute persistent timed events for a substore.

    This is deprecated and present only for backwards compatibility.  Adapt
    the store to L{IScheduler} instead.
    """
    schemaVersion = 2
    typeName = 'axiom_subscheduler'

    dummy = integer()

    def activate(self):
        _SchedulerCompatMixin.activate(self)


def subscheduler1to2(old):
    new = old.upgradeVersion(SubScheduler.typeName, 1, 2)
    try:
        new.store.powerDown(new, IScheduler)
    except ValueError:
        # Someone might have created a SubScheduler but failed to power it
        # up.  Fine.
        pass
    return new

registerUpgrader(subscheduler1to2, SubScheduler.typeName, 1, 2)