/usr/share/pyshared/celery/bin/celeryev.py is in python-celery 2.4.6-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | # -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import with_statement
import os
import sys
from functools import partial
from celery import platforms
from celery.platforms import detached
from celery.bin.base import Command, Option, daemon_options
class EvCommand(Command):
supports_args = False
preload_options = (Command.preload_options
+ daemon_options(default_pidfile="celeryev.pid"))
def run(self, dump=False, camera=None, frequency=1.0, maxrate=None,
loglevel="INFO", logfile=None, prog_name="celeryev",
pidfile=None, uid=None, gid=None, umask=None,
working_directory=None, detach=False, **kwargs):
self.prog_name = prog_name
if dump:
return self.run_evdump()
if camera:
return self.run_evcam(camera, freq=frequency, maxrate=maxrate,
loglevel=loglevel, logfile=logfile,
pidfile=pidfile, uid=uid, gid=gid,
umask=umask,
working_directory=working_directory,
detach=detach)
return self.run_evtop()
def prepare_preload_options(self, options):
workdir = options.get("working_directory")
if workdir:
os.chdir(workdir)
def run_evdump(self):
from celery.events.dumper import evdump
self.set_process_status("dump")
return evdump(app=self.app)
def run_evtop(self):
from celery.events.cursesmon import evtop
self.set_process_status("top")
return evtop(app=self.app)
def run_evcam(self, camera, logfile=None, pidfile=None, uid=None,
gid=None, umask=None, working_directory=None,
detach=False, **kwargs):
from celery.events.snapshot import evcam
workdir = working_directory
self.set_process_status("cam")
kwargs["app"] = self.app
cam = partial(evcam, camera,
logfile=logfile, pidfile=pidfile, **kwargs)
if detach:
with detached(logfile, pidfile, uid, gid, umask, workdir):
return cam()
else:
return cam()
def set_process_status(self, prog, info=""):
prog = "%s:%s" % (self.prog_name, prog)
info = "%s %s" % (info, platforms.strargv(sys.argv))
return platforms.set_process_title(prog, info=info)
def get_options(self):
return (
Option('-d', '--dump',
action="store_true", dest="dump",
help="Dump events to stdout."),
Option('-c', '--camera',
action="store", dest="camera",
help="Camera class to take event snapshots with."),
Option('--detach',
default=False, action="store_true", dest="detach",
help="Recording: Detach and run in the background."),
Option('-F', '--frequency', '--freq',
action="store", dest="frequency",
type="float", default=1.0,
help="Recording: Snapshot frequency."),
Option('-r', '--maxrate',
action="store", dest="maxrate", default=None,
help="Recording: Shutter rate limit (e.g. 10/m)"),
Option('-l', '--loglevel',
action="store", dest="loglevel", default="INFO",
help="Loglevel. Default is WARNING."))
def main():
ev = EvCommand()
ev.execute_from_commandline()
if __name__ == "__main__": # pragma: no cover
main()
|