This file is indexed.

/usr/lib/python2.7/dist-packages/carbon/cache.py is in graphite-carbon 1.0.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""Copyright 2009 Chris Davis

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""

import time
import threading
from operator import itemgetter
from random import choice
from collections import defaultdict

from carbon.conf import settings
from carbon import events, log
from carbon.pipeline import Processor


def by_timestamp(t_v):  # useful sort key function
  """Return the timestamp of a ``(timestamp, value)`` datapoint pair.

  Meant to be passed as ``key=`` when sorting datapoints. The pair is
  unpacked in the body rather than in the signature because tuple
  parameters are Python 2-only syntax; this form runs on Python 2 and 3.
  Anything that is not a 2-item iterable still fails on unpacking.
  """
  (timestamp, _) = t_v
  return timestamp


class CacheFeedingProcessor(Processor):
  """Processor that stores every datapoint it receives in the MetricCache.

  Declares ``plugin_name = 'write'``.
  """
  plugin_name = 'write'

  def __init__(self, *args, **kwargs):
    # super() must name *this* class. Naming the base class (Processor)
    # starts the MRO lookup after Processor and skips its __init__.
    super(CacheFeedingProcessor, self).__init__(*args, **kwargs)
    # MetricCache() hands back the module-level singleton cache.
    self.cache = MetricCache()

  def process(self, metric, datapoint):
    """Store `datapoint` for `metric` in the cache and return
    Processor.NO_OUTPUT."""
    self.cache.store(metric, datapoint)
    return Processor.NO_OUTPUT


class DrainStrategy(object):
  """Implements the strategy for writing metrics.
  The strategy chooses what order (if any) metrics
  will be popped from the backing cache"""
  def __init__(self, cache):
    # The cache this strategy picks metric names from.
    self.cache = cache

  def choose_item(self):
    """Return the name of the next metric to drain.

    Subclasses must override this; the base implementation always raises
    NotImplementedError.
    """
    # NotImplemented is a comparison sentinel, not an exception; raising it
    # produces a confusing TypeError. NotImplementedError is the right one.
    raise NotImplementedError()


class NaiveStrategy(DrainStrategy):
  """Pop points in an unordered fashion."""
  def __init__(self, cache):
    super(NaiveStrategy, self).__init__(cache)

    def _generate_queue():
      # Endless generator: snapshot the current metric names, hand them out
      # one by one, then take a fresh snapshot. If the cache is empty the
      # snapshot is empty and this loop spins without yielding, so
      # choose_item() must only be called on a non-empty cache.
      while True:
        # list() gives a real, poppable snapshot on Python 3 as well, where
        # keys() is a view rather than a list.
        metric_names = list(self.cache.keys())
        while metric_names:
          yield metric_names.pop()

    self.queue = _generate_queue()

  def choose_item(self):
    """Return the next metric name from the current snapshot."""
    # next() builtin works on Python 2.6+ and 3; gen.next() is Python 2-only.
    return next(self.queue)


class MaxStrategy(DrainStrategy):
  """Always drain the metric that currently holds the most points.
  This leads to less variance in pointsPerUpdate, but metrics that are
  updated infrequently or irregularly may not be written until shutdown."""
  def choose_item(self):
    """Return the name of the metric with the most cached datapoints.

    Raises ValueError (from max()) when the cache is empty.
    """
    # Every cache item is a (metric_name, datapoints) pair; rank the pairs
    # by how many datapoints they carry.
    def point_count(entry):
      return len(entry[1])

    fullest = max(self.cache.items(), key=point_count)
    return fullest[0]


class RandomStrategy(DrainStrategy):
  """Pop points randomly"""
  def choose_item(self):
    """Return a randomly chosen metric name from the cache.

    Raises IndexError (from random.choice) when the cache is empty.
    """
    # random.choice needs an indexable sequence; on Python 3 keys() is a
    # view, so materialise it first. On Python 2 this is just a list copy.
    return choice(list(self.cache.keys()))


class SortedStrategy(DrainStrategy):
  """ The default strategy which prefers metrics with a greater number
  of cached points but guarantees every point gets written exactly once during
  a loop of the cache """
  def __init__(self, cache):
    super(SortedStrategy, self).__init__(cache)

    def _generate_queue():
      # Endless generator: each pass snapshots (metric, count) pairs, sorts
      # them by count, and yields metric names until the snapshot is used up.
      # With an empty cache the snapshot is empty and the loop spins without
      # yielding, so choose_item() must only be called on a non-empty cache.
      while True:
        t = time.time()
        metric_counts = sorted(self.cache.counts, key=lambda x: x[1])
        size = len(metric_counts)
        if settings.LOG_CACHE_QUEUE_SORTS and size:
          log.msg("Sorted %d cache queues in %.6f seconds" % (size, time.time() - t))
        # The list is sorted ascending, so popping from the end hands out
        # the metric with the most points first.
        while metric_counts:
          yield itemgetter(0)(metric_counts.pop())
        if settings.LOG_CACHE_QUEUE_SORTS and size:
          log.msg("Queue consumed in %.6f seconds" % (time.time() - t))

    self.queue = _generate_queue()

  def choose_item(self):
    """Return the next metric name from the current sorted snapshot."""
    # next() builtin works on Python 2.6+ and 3; gen.next() is Python 2-only.
    return next(self.queue)


class TimeSortedStrategy(DrainStrategy):
  """ This strategy prefers metrics which are lagging behind and
  guarantees every point gets written exactly once during
  a loop of the cache """
  def __init__(self, cache):
    super(TimeSortedStrategy, self).__init__(cache)

    def _generate_queue():
      # Endless generator: each pass snapshots the cache watermarks, i.e.
      # (metric, lowest_timestamp, highest_timestamp) triples, sorts them by
      # lowest timestamp, and yields metric names until the snapshot is used
      # up. When the snapshot is empty the loop spins without yielding, so
      # choose_item() must only be called when the cache holds datapoints.
      while True:
        t = time.time()
        metric_lw = sorted(self.cache.watermarks, key=lambda x: x[1], reverse=True)
        size = len(metric_lw)
        if settings.LOG_CACHE_QUEUE_SORTS and size:
          log.msg("Sorted %d cache queues in %.6f seconds" % (size, time.time() - t))
        # Sorted descending by lowest timestamp, so popping from the end
        # hands out the metric with the oldest cached point first.
        while metric_lw:
          yield itemgetter(0)(metric_lw.pop())
        if settings.LOG_CACHE_QUEUE_SORTS and size:
          log.msg("Queue consumed in %.6f seconds" % (time.time() - t))

    self.queue = _generate_queue()

  def choose_item(self):
    """Return the next metric name from the current sorted snapshot."""
    # next() builtin works on Python 2.6+ and 3; gen.next() is Python 2-only.
    return next(self.queue)


class _MetricCache(defaultdict):
  """A Singleton dictionary of metric names and their cached datapoints

  Each value is a dict mapping timestamp -> value for one metric.
  `size` counts the datapoints held across all metrics and is adjusted
  by `store` and `pop`.
  """
  def __init__(self, strategy=None):
    """`strategy` is an optional strategy class (not an instance). When
    given it is instantiated with this cache and consulted by
    `drain_metric` to pick the next metric."""
    # Initialise the defaultdict first so the cache is fully usable by the
    # time the strategy constructor receives it.
    super(_MetricCache, self).__init__(dict)
    self.lock = threading.Lock()
    self.size = 0
    self.strategy = None
    if strategy:
      self.strategy = strategy(self)

  @property
  def counts(self):
    """List of (metric, number of cached datapoints) pairs."""
    return [(metric, len(datapoints)) for (metric, datapoints) in self.items()]

  @property
  def watermarks(self):
    """List of (metric, lowest timestamp, highest timestamp) triples.
    Metrics with no cached datapoints are left out."""
    return [(metric, min(datapoints.keys()), max(datapoints.keys()))
            for (metric, datapoints) in self.items()
            if datapoints]

  @property
  def is_full(self):
    """True once `size` has reached settings.MAX_CACHE_SIZE; always False
    when that setting is infinity."""
    if settings.MAX_CACHE_SIZE == float('inf'):
      return False
    else:
      return self.size >= settings.MAX_CACHE_SIZE

  def _check_available_space(self):
    """Fire events.cacheSpaceAvailable() when the cache was flagged too full
    (state.cacheTooFull) and has dropped below the low watermark."""
    if state.cacheTooFull and self.size < settings.CACHE_SIZE_LOW_WATERMARK:
      log.msg("MetricCache below watermark: self.size=%d" % self.size)
      events.cacheSpaceAvailable()

  def drain_metric(self):
    """Returns a metric and its datapoints in order determined by the
    `DrainStrategy`_

    Returns ``(None, [])`` when the cache is empty."""
    if not self:
      return (None, [])
    if self.strategy:
      metric = self.strategy.choose_item()
    else:
      # Avoid .keys() as it dumps the whole list.
      # next(iter(...)) works on Python 2.6+ and 3; iterkeys() is 2-only.
      metric = next(iter(self))
    return (metric, self.pop(metric))

  def get_datapoints(self, metric):
    """Return a list of currently cached datapoints sorted by timestamp"""
    return sorted(self.get(metric, {}).items(), key=by_timestamp)

  def pop(self, metric):
    """Remove `metric` from the cache and return its datapoints as a list
    of (timestamp, value) pairs sorted by timestamp.

    Raises KeyError if `metric` is not cached."""
    with self.lock:
      datapoint_index = defaultdict.pop(self, metric)
      self.size -= len(datapoint_index)
    self._check_available_space()

    return sorted(datapoint_index.items(), key=by_timestamp)

  def store(self, metric, datapoint):
    """Cache a (timestamp, value) `datapoint` for `metric`.

    A point whose timestamp is already cached for the metric overwrites the
    old value without changing `size`. A new point is dropped, with
    events.cacheFull() fired, when the cache is full."""
    timestamp, value = datapoint
    # Check with .get(): indexing the defaultdict here would create an empty
    # entry for `metric` even when the point is then dropped because the
    # cache is full, leaving empty metrics behind to be drained later.
    if timestamp not in self.get(metric, ()):
      # Not a duplicate, hence process if cache is not full
      if self.is_full:
        log.msg("MetricCache is full: self.size=%d" % self.size)
        events.cacheFull()
      else:
        with self.lock:
          self.size += 1
          self[metric][timestamp] = value
    else:
      # Updating a duplicate does not increase the cache size
      self[metric][timestamp] = value


_Cache = None

def MetricCache():
  """Return the singleton cache instance, building it on the first call.

  The drain strategy is picked from settings.CACHE_WRITE_STRATEGY; any
  value other than the names listed below means no strategy is used.
  """
  global _Cache
  if _Cache is None:
    # TODO: use plugins.
    strategies_by_name = {
      'naive': NaiveStrategy,
      'max': MaxStrategy,
      'sorted': SortedStrategy,
      'timesorted': TimeSortedStrategy,
      'random': RandomStrategy,
    }
    chosen = strategies_by_name.get(settings.CACHE_WRITE_STRATEGY)
    _Cache = _MetricCache(chosen)
  return _Cache



# Avoid import circularities
from carbon import state