/usr/lib/python2.7/dist-packages/carbon/routers.py is in graphite-carbon 0.9.12-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | import imp
from carbon.relayrules import loadRelayRules
from carbon.hashing import ConsistentHashRing
class DatapointRouter:
    """Common interface shared by the datapoint routing implementations.

    The methods here have no body beyond their docstrings; concrete routers
    override them.
    """

    def addDestination(self, destination):
        """Add a destination; `destination` is a (host, port, instance) triple."""

    def removeDestination(self, destination):
        """Remove a destination; `destination` is a (host, port, instance) triple."""

    def getDestinations(self, key):
        """Generate the destinations that the given routing key maps to.

        Only configured destinations (those for which addDestination has been
        called) may be generated by this method.
        """
class RelayRulesRouter(DatapointRouter):
    """Router that selects destinations by matching keys against relay rules."""

    def __init__(self, rules_path):
        """Load the rules found at `rules_path`; start with no destinations."""
        self.destinations = set()
        self.rules_path = rules_path
        self.rules = loadRelayRules(rules_path)

    def addDestination(self, destination):
        """Mark `destination` as configured."""
        self.destinations.add(destination)

    def removeDestination(self, destination):
        """Forget `destination`; a destination that was never added is ignored."""
        self.destinations.discard(destination)

    def getDestinations(self, key):
        """Yield the configured destinations of every rule that matches `key`.

        Rules are tried in order. Evaluation stops after the first matching
        rule whose `continue_matching` flag is false.
        """
        for rule in self.rules:
            if not rule.matches(key):
                continue
            # A rule may list destinations that are not configured on this
            # router; those are skipped.
            for candidate in rule.destinations:
                if candidate in self.destinations:
                    yield candidate
            if not rule.continue_matching:
                break
class ConsistentHashingRouter(DatapointRouter):
    """Router that picks destinations for a metric from a consistent hash ring.

    Ring nodes are (server, instance) pairs. The port of each pair is kept in
    `instance_ports` and re-attached when destinations are generated.
    """

    def __init__(self, replication_factor=1):
        """Create a router with an empty ring.

        `replication_factor` is coerced with int() and is the maximum number
        of destinations getDestinations yields for one metric.
        """
        self.replication_factor = int(replication_factor)
        self.instance_ports = {}  # { (server, instance) : port }
        self.ring = ConsistentHashRing([])

    def addDestination(self, destination):
        """Add a (server, port, instance) destination to the ring.

        Raises Exception if the (server, instance) pair is already configured.
        """
        (server, port, instance) = destination
        if (server, instance) in self.instance_ports:
            raise Exception("destination instance (%s, %s) already configured" % (server, instance))
        self.instance_ports[(server, instance)] = port
        self.ring.add_node((server, instance))

    def removeDestination(self, destination):
        """Remove a (server, port, instance) destination from the ring.

        Raises Exception if the (server, instance) pair is not configured.
        """
        (server, port, instance) = destination
        if (server, instance) not in self.instance_ports:
            raise Exception("destination instance (%s, %s) not configured" % (server, instance))
        del self.instance_ports[(server, instance)]
        self.ring.remove_node((server, instance))

    def getDestinations(self, metric):
        """Yield up to `replication_factor` (server, port, instance) triples.

        The metric is first mapped to a hashing key with getKey, then the ring
        is asked for nodes for that key.
        """
        key = self.getKey(metric)
        for count, node in enumerate(self.ring.get_nodes(key)):
            # Stop once replication_factor destinations have been produced.
            if count == self.replication_factor:
                return
            (server, instance) = node
            port = self.instance_ports[(server, instance)]
            yield (server, port, instance)

    def getKey(self, metric):
        """Return the hashing key for `metric`; by default the metric itself."""
        return metric

    def setKeyFunction(self, func):
        """Use `func` (called with the metric as its only argument) as getKey."""
        # Assigned on the instance, so it shadows the getKey method for this
        # router only.
        self.getKey = func

    def setKeyFunctionFromModule(self, keyfunc_spec):
        """Load a key function from a Python source file and install it.

        `keyfunc_spec` has the form "<path to source file>:<function name>"
        and is split on its last ':'.
        """
        module_path, func_name = keyfunc_spec.rsplit(':', 1)
        description = ('.py', 'U', imp.PY_SOURCE)
        # imp.load_module leaves closing the file to the caller, so the with
        # block guarantees it is closed even if loading the module raises.
        with open(module_path, 'U') as module_file:
            module = imp.load_module('keyfunc_module', module_file, module_path, description)
        keyfunc = getattr(module, func_name)
        self.setKeyFunction(keyfunc)
class AggregatedConsistentHashingRouter(DatapointRouter):
    """Consistent-hash router that hashes on a metric's aggregate names."""

    def __init__(self, agg_rules_manager, replication_factor=1):
        """Wrap a ConsistentHashingRouter and keep the aggregation rules manager."""
        self.hash_router = ConsistentHashingRouter(replication_factor)
        self.agg_rules_manager = agg_rules_manager

    def addDestination(self, destination):
        """Delegate to the wrapped hash router."""
        self.hash_router.addDestination(destination)

    def removeDestination(self, destination):
        """Delegate to the wrapped hash router."""
        self.hash_router.removeDestination(destination)

    def getDestinations(self, key):
        """Yield each distinct destination for the aggregate forms of `key`.

        Every rule is asked for the aggregate metric of `key`; rules that
        return None are ignored. The destinations of all resulting names are
        merged into a set, so each one is yielded once, in set order.
        """
        # Resolve the metric to its aggregate forms.
        candidates = [rule.get_aggregate_metric(key)
                      for rule in self.agg_rules_manager.rules]
        targets = [name for name in candidates if name is not None]

        # A metric that no rule aggregates is routed under its raw name
        # (it will pass through aggregation).
        if not targets:
            targets = [key]

        # Collect the consistent-hashing destinations of every aggregate form.
        unique = set()
        for target in targets:
            unique.update(self.hash_router.getDestinations(target))

        for destination in unique:
            yield destination
|