This file is indexed.

/usr/share/pyshared/carbon/routers.py is in graphite-carbon 0.9.12-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import imp
from carbon.relayrules import loadRelayRules
from carbon.hashing import ConsistentHashRing


class DatapointRouter:
  """Contract that every datapoint routing strategy implements.

  The methods here are placeholders with no behaviour of their own;
  concrete routers override them.
  """

  def addDestination(self, destination):
    """Register a destination, given as a (host, port, instance) triple."""

  def removeDestination(self, destination):
    """Unregister a destination, given as a (host, port, instance) triple."""

  def getDestinations(self, key):
    """Generate the destinations that the routing key maps to.

    Only configured destinations (ones previously passed to addDestination)
    may be generated by this method.
    """


class RelayRulesRouter(DatapointRouter):
  """Router that picks destinations by walking an ordered list of relay rules."""

  def __init__(self, rules_path):
    "rules_path is handed to loadRelayRules to build the rule list"
    self.destinations = set()
    self.rules_path = rules_path
    self.rules = loadRelayRules(rules_path)

  def addDestination(self, destination):
    "Mark destination as configured (adding it twice is harmless)"
    self.destinations.add(destination)

  def removeDestination(self, destination):
    "Forget destination; removing an unknown destination is not an error"
    self.destinations.discard(destination)

  def getDestinations(self, key):
    """Generate the configured destinations of each rule matching key.

    Rules are visited in order; the walk ends after the first matching rule
    whose continue_matching flag is false.
    """
    for rule in self.rules:
      if not rule.matches(key):
        continue

      # A rule may name destinations that are not currently configured;
      # those are silently skipped.
      for candidate in rule.destinations:
        if candidate in self.destinations:
          yield candidate

      if not rule.continue_matching:
        break


class ConsistentHashingRouter(DatapointRouter):
  """Router that picks destinations for a metric from a consistent hash ring.

  Ring nodes are (server, instance) pairs. The port of each node is kept in
  a side table (instance_ports) and re-attached when destinations are
  generated.
  """

  def __init__(self, replication_factor=1):
    "replication_factor caps how many destinations are generated per metric"
    self.replication_factor = int(replication_factor)
    self.instance_ports = {}  # { (server, instance) : port }
    self.ring = ConsistentHashRing([])

  def addDestination(self, destination):
    """Add a (server, port, instance) destination to the ring.

    Raises Exception if the (server, instance) pair is already configured,
    regardless of the port.
    """
    (server, port, instance) = destination
    if (server, instance) in self.instance_ports:
      raise Exception("destination instance (%s, %s) already configured" % (server, instance))
    self.instance_ports[(server, instance)] = port
    self.ring.add_node((server, instance))

  def removeDestination(self, destination):
    """Remove a (server, port, instance) destination from the ring.

    Raises Exception if the (server, instance) pair is not configured.
    """
    (server, port, instance) = destination
    if (server, instance) not in self.instance_ports:
      raise Exception("destination instance (%s, %s) not configured" % (server, instance))
    del self.instance_ports[(server, instance)]
    self.ring.remove_node((server, instance))

  def getDestinations(self, metric):
    """Generate up to replication_factor (server, port, instance) triples.

    The metric is first mapped through getKey; the resulting key is what the
    ring is asked about.
    """
    key = self.getKey(metric)

    for count,node in enumerate(self.ring.get_nodes(key)):
      # Stop once replication_factor destinations have been generated.
      if count == self.replication_factor:
        return
      (server, instance) = node
      port = self.instance_ports[ (server, instance) ]
      yield (server, port, instance)

  def getKey(self, metric):
    "Default key function: the metric name itself is the hashing key"
    return metric

  def setKeyFunction(self, func):
    """Replace the key function used by getDestinations.

    func is stored on the instance, so it is called with the metric as its
    only argument (no self).
    """
    self.getKey = func

  def setKeyFunctionFromModule(self, keyfunc_spec):
    """Load a key function from a python source file and install it.

    keyfunc_spec has the form "<module path>:<function name>"; it is split
    on the last colon.
    """
    module_path, func_name = keyfunc_spec.rsplit(':', 1)
    module_file = open(module_path, 'U')
    description = ('.py', 'U', imp.PY_SOURCE)
    # imp.load_module() leaves closing the file to the caller, so close it
    # here whether or not the load succeeds.
    try:
      module = imp.load_module('keyfunc_module', module_file, module_path, description)
    finally:
      module_file.close()
    keyfunc = getattr(module, func_name)
    self.setKeyFunction(keyfunc)

class AggregatedConsistentHashingRouter(DatapointRouter):
  """Consistent-hash router that hashes on a metric's aggregate names.

  Destination bookkeeping and hashing are delegated to an internal
  ConsistentHashingRouter.
  """

  def __init__(self, agg_rules_manager, replication_factor=1):
    "agg_rules_manager supplies the aggregation rules via its rules attribute"
    self.agg_rules_manager = agg_rules_manager
    self.hash_router = ConsistentHashingRouter(replication_factor)

  def addDestination(self, destination):
    "Delegate to the internal hash router"
    self.hash_router.addDestination(destination)

  def removeDestination(self, destination):
    "Delegate to the internal hash router"
    self.hash_router.removeDestination(destination)

  def getDestinations(self, key):
    """Generate each distinct destination for the aggregate forms of key."""
    # Ask every rule for its aggregate name; None means the rule does not
    # produce one for this metric.
    candidates = [rule.get_aggregate_metric(key) for rule in self.agg_rules_manager.rules]
    hash_keys = [name for name in candidates if name is not None]

    # A metric that will not be aggregated is sent raw
    # (it will pass through aggregation).
    if not hash_keys:
      hash_keys = [key]

    # Several aggregate names may hash to the same destination; the set
    # collapses those duplicates.
    unique_destinations = set()
    for hash_key in hash_keys:
      unique_destinations.update(self.hash_router.getDestinations(hash_key))

    for destination in unique_destinations:
      yield destination