/usr/share/gocode/src/github.com/armon/go-metrics/statsite.go is in golang-github-armon-go-metrics-dev 0.0~git20150601-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
package metrics
import (
"bufio"
"fmt"
"log"
"net"
"strings"
"time"
)
// flushInterval is the period of the ticker that drives flushMetrics:
// every time it fires, whatever is sitting in the buffered writer is
// pushed out to the statsite socket, so metrics never linger in the
// buffer for longer than this.
const flushInterval = 100 * time.Millisecond
// StatsiteSink provides a MetricSink that can be used with a
// statsite metrics server. Metrics are formatted into text lines by the
// emitting methods, queued on metricQueue, and written to the server by
// the flushMetrics goroutine started in NewStatsiteSink.
type StatsiteSink struct {
// addr is the address handed to net.Dial("tcp", ...) by flushMetrics.
addr string
// metricQueue carries already-formatted metric lines from pushMetric to
// flushMetrics. It is created with a buffer of 4096 in NewStatsiteSink
// and closed by Shutdown.
metricQueue chan string
}
// NewStatsiteSink builds a StatsiteSink that reports to the statsite
// server at addr and starts its background flushMetrics goroutine.
// No connection is attempted here, so the returned error is always nil;
// connection problems are handled (and logged) by the flush goroutine.
func NewStatsiteSink(addr string) (*StatsiteSink, error) {
	sink := new(StatsiteSink)
	sink.addr = addr
	// Bounded queue: pushMetric drops metrics once this fills up.
	sink.metricQueue = make(chan string, 4096)

	go sink.flushMetrics()
	return sink, nil
}
// Shutdown is used to stop flushing to statsite. It closes the metric
// queue; flushMetrics notices the closed channel on its next receive and
// exits.
//
// Shutdown must be called at most once: closing an already-closed channel
// panics.
// NOTE(review): pushMetric sends on this same channel, so emitting a
// metric concurrently with or after Shutdown can panic with "send on
// closed channel" — confirm callers stop emitting before shutting down.
func (s *StatsiteSink) Shutdown() {
close(s.metricQueue)
}
// SetGauge queues a gauge line of the form "<key>:<val>|g\n", where <key>
// is the flattened key and <val> is rendered with %f. The push is
// non-blocking, so the line is dropped if the queue is full.
func (s *StatsiteSink) SetGauge(key []string, val float32) {
	line := fmt.Sprintf("%s:%f|g\n", s.flattenKey(key), val)
	s.pushMetric(line)
}
// EmitKey queues a key/value line of the form "<key>:<val>|kv\n", where
// <key> is the flattened key and <val> is rendered with %f. The push is
// non-blocking, so the line is dropped if the queue is full.
func (s *StatsiteSink) EmitKey(key []string, val float32) {
	line := fmt.Sprintf("%s:%f|kv\n", s.flattenKey(key), val)
	s.pushMetric(line)
}
// IncrCounter queues a counter line of the form "<key>:<val>|c\n", where
// <key> is the flattened key and <val> is rendered with %f. The push is
// non-blocking, so the line is dropped if the queue is full.
func (s *StatsiteSink) IncrCounter(key []string, val float32) {
	line := fmt.Sprintf("%s:%f|c\n", s.flattenKey(key), val)
	s.pushMetric(line)
}
// AddSample queues a sample line of the form "<key>:<val>|ms\n", where
// <key> is the flattened key and <val> is rendered with %f. The push is
// non-blocking, so the line is dropped if the queue is full.
func (s *StatsiteSink) AddSample(key []string, val float32) {
	line := fmt.Sprintf("%s:%f|ms\n", s.flattenKey(key), val)
	s.pushMetric(line)
}
// flattenKey joins the key parts with "." and replaces every space and
// every colon in the result with an underscore, so the key cannot collide
// with the ":" that separates key and value in an emitted metric line.
func (s *StatsiteSink) flattenKey(parts []string) string {
	sanitize := func(r rune) rune {
		if r == ':' || r == ' ' {
			return '_'
		}
		return r
	}
	return strings.Map(sanitize, strings.Join(parts, "."))
}
// pushMetric offers m to the metric queue without ever blocking the
// caller: if the queue cannot accept the value right now, the metric is
// silently dropped.
func (s *StatsiteSink) pushMetric(m string) {
	queue := s.metricQueue
	select {
	case queue <- m:
		// Accepted by the queue.
	default:
		// Queue not ready to receive; drop the metric.
	}
}
// flushMetrics is the background loop that ships queued metrics to
// statsite. It runs until the metric queue is closed (see Shutdown).
//
// Each round it dials s.addr over TCP and streams queued metrics through
// a buffered writer that is flushed every flushInterval. If the dial, a
// write, or a flush fails, the error is logged, the connection is closed,
// and the loop waits five seconds before dialing again; metrics that
// arrive during that wait are discarded so the queue does not back up.
//
// When the queue is closed, anything still buffered is flushed, the
// connection is closed, and the function returns.
func (s *StatsiteSink) flushMetrics() {
	// Pause between a failure and the next connection attempt.
	const reconnectDelay = 5 * time.Second

	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	for {
		sock, err := net.Dial("tcp", s.addr)
		if err != nil {
			log.Printf("[ERR] Error connecting to statsite! Err: %s", err)
		} else {
			done := s.streamMetrics(sock, ticker.C)
			// Always release the socket before reconnecting or exiting so
			// repeated failures do not leak file descriptors. The close
			// error is ignored: the connection is being abandoned anyway.
			_ = sock.Close()
			if done {
				break
			}
		}

		if s.discardMetrics(reconnectDelay) {
			break
		}
	}

	// NOTE(review): this write is not synchronized with pushMetric or
	// Shutdown; it is kept as-is to preserve existing behavior.
	s.metricQueue = nil
}

// streamMetrics writes queued metrics to sock through a buffered writer,
// flushing on every tick. It returns true when the metric queue has been
// closed (after a final flush) and false when a write or flush failed and
// the caller should reconnect.
func (s *StatsiteSink) streamMetrics(sock net.Conn, tick <-chan time.Time) bool {
	buffered := bufio.NewWriter(sock)
	for {
		select {
		case metric, ok := <-s.metricQueue:
			if !ok {
				// Shutting down: push out whatever is still buffered so
				// the last metrics are not lost.
				if err := buffered.Flush(); err != nil {
					log.Printf("[ERR] Error flushing to statsite! Err: %s", err)
				}
				return true
			}
			if _, err := buffered.WriteString(metric); err != nil {
				log.Printf("[ERR] Error writing to statsite! Err: %s", err)
				return false
			}
		case <-tick:
			if err := buffered.Flush(); err != nil {
				log.Printf("[ERR] Error flushing to statsite! Err: %s", err)
				return false
			}
		}
	}
}

// discardMetrics drops queued metrics for the duration d so the queue
// does not back up while no connection is available. It returns true if
// the metric queue was closed during the wait and false once d elapses.
func (s *StatsiteSink) discardMetrics(d time.Duration) bool {
	wait := time.After(d)
	for {
		select {
		case _, ok := <-s.metricQueue:
			if !ok {
				return true
			}
		case <-wait:
			return false
		}
	}
}
|