/usr/lib/python2.7/dist-packages/celery/worker/heartbeat.py is in python-celery 4.1.0-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""Heartbeat service.
This is the internal thread responsible for sending heartbeat events
at regular intervals (may not be an actual thread).
"""
from __future__ import absolute_import, unicode_literals
from celery.signals import heartbeat_sent
from celery.utils.sysinfo import load_average
from .state import SOFTWARE_INFO, active_requests, all_total_count
# Names exported by ``from <this module> import *``; only the Heart class.
__all__ = ['Heart']
class Heart(object):
    """Timer sending heartbeats at regular intervals.

    On construction the heart registers :meth:`start` and :meth:`stop`
    with the event dispatcher's ``on_enabled`` / ``on_disabled``
    callback sets, so the dispatcher starts and stops it.

    Arguments:
        timer (kombu.async.timer.Timer): Timer to use.
        eventer (celery.events.EventDispatcher): Event dispatcher
            to use.
        interval (float): Time in seconds between sending
            heartbeats.  Default is 2 seconds.

    Note:
        Whether the ``heartbeat_sent`` signal is dispatched is decided
        once, in ``__init__``, from the receivers registered at that
        moment.  Within this class the decision is never re-evaluated,
        so receivers connected afterwards are not notified by this
        instance.
    """

    def __init__(self, timer, eventer, interval=None):
        self.timer = timer
        self.eventer = eventer
        # A falsy interval (None or 0) falls back to the 2 second default.
        self.interval = float(interval or 2.0)
        # Handle returned by ``timer.call_repeatedly`` for the scheduled
        # heartbeat; None while no heartbeat is scheduled.
        self.tref = None

        # Make event dispatcher start/stop us when enabled/disabled.
        self.eventer.on_enabled.add(self.start)
        self.eventer.on_disabled.add(self.stop)

        # Only send heartbeat_sent signal if it has receivers.
        self._send_sent_signal = (
            heartbeat_sent.send if heartbeat_sent.receivers else None)

    def _cancel_timer(self):
        """Cancel the scheduled heartbeat, if any, and forget its handle."""
        if self.tref is not None:
            self.timer.cancel(self.tref)
            self.tref = None

    def _send(self, event):
        """Dispatch ``event`` together with the current worker statistics.

        The ``heartbeat_sent`` signal (when enabled for this instance)
        is sent before the event itself.

        Arguments:
            event (str): Event type, e.g. ``'worker-heartbeat'``.

        Returns:
            Whatever ``self.eventer.send`` returns.
        """
        if self._send_sent_signal is not None:
            self._send_sent_signal(sender=self)
        return self.eventer.send(event, freq=self.interval,
                                 active=len(active_requests),
                                 processed=all_total_count[0],
                                 loadavg=load_average(),
                                 **SOFTWARE_INFO)

    def start(self):
        """Announce the worker and schedule the repeating heartbeat.

        Does nothing unless the event dispatcher is enabled.
        """
        if self.eventer.enabled:
            # start() may run again without an intervening stop() (it is
            # an ``on_enabled`` callback).  Cancel the previous entry
            # first; otherwise its handle would be overwritten and the
            # old timer would keep firing duplicate heartbeats forever.
            self._cancel_timer()
            self._send('worker-online')
            self.tref = self.timer.call_repeatedly(
                self.interval, self._send, ('worker-heartbeat',),
            )

    def stop(self):
        """Cancel the repeating heartbeat and announce the worker offline.

        The timer entry is cancelled regardless of the dispatcher state;
        the ``worker-offline`` event is only sent while the dispatcher
        is still enabled.
        """
        self._cancel_timer()
        if self.eventer.enabled:
            self._send('worker-offline')
|