blob: 68730139a73a7e25eb6e9c2b2040099a52de3a39 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package metrics
2
3import (
4 "bufio"
5 "fmt"
6 "log"
7 "net"
8 "strings"
9 "time"
10)
11
// flushInterval is the period of the ticker that force-flushes buffered
// statsite metrics to the socket, so that stats written during a quiet
// spell do not sit in the write buffer forever.
const flushInterval = 100 * time.Millisecond
18
// StatsiteSink provides a MetricSink that ships metrics to a statsite
// metrics server.
type StatsiteSink struct {
	// addr is the statsite address that the flush goroutine dials over TCP.
	addr string
	// metricQueue carries formatted metric lines from the emitting methods
	// to the flush goroutine.
	metricQueue chan string
}
25
26// NewStatsiteSink is used to create a new StatsiteSink
27func NewStatsiteSink(addr string) (*StatsiteSink, error) {
28 s := &StatsiteSink{
29 addr: addr,
30 metricQueue: make(chan string, 4096),
31 }
32 go s.flushMetrics()
33 return s, nil
34}
35
36// Close is used to stop flushing to statsite
37func (s *StatsiteSink) Shutdown() {
38 close(s.metricQueue)
39}
40
41func (s *StatsiteSink) SetGauge(key []string, val float32) {
42 flatKey := s.flattenKey(key)
43 s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
44}
45
46func (s *StatsiteSink) EmitKey(key []string, val float32) {
47 flatKey := s.flattenKey(key)
48 s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
49}
50
51func (s *StatsiteSink) IncrCounter(key []string, val float32) {
52 flatKey := s.flattenKey(key)
53 s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
54}
55
56func (s *StatsiteSink) AddSample(key []string, val float32) {
57 flatKey := s.flattenKey(key)
58 s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
59}
60
61// Flattens the key for formatting, removes spaces
62func (s *StatsiteSink) flattenKey(parts []string) string {
63 joined := strings.Join(parts, ".")
64 return strings.Map(func(r rune) rune {
65 switch r {
66 case ':':
67 fallthrough
68 case ' ':
69 return '_'
70 default:
71 return r
72 }
73 }, joined)
74}
75
76// Does a non-blocking push to the metrics queue
77func (s *StatsiteSink) pushMetric(m string) {
78 select {
79 case s.metricQueue <- m:
80 default:
81 }
82}
83
84// Flushes metrics
85func (s *StatsiteSink) flushMetrics() {
86 var sock net.Conn
87 var err error
88 var wait <-chan time.Time
89 var buffered *bufio.Writer
90 ticker := time.NewTicker(flushInterval)
91 defer ticker.Stop()
92
93CONNECT:
94 // Attempt to connect
95 sock, err = net.Dial("tcp", s.addr)
96 if err != nil {
97 log.Printf("[ERR] Error connecting to statsite! Err: %s", err)
98 goto WAIT
99 }
100
101 // Create a buffered writer
102 buffered = bufio.NewWriter(sock)
103
104 for {
105 select {
106 case metric, ok := <-s.metricQueue:
107 // Get a metric from the queue
108 if !ok {
109 goto QUIT
110 }
111
112 // Try to send to statsite
113 _, err := buffered.Write([]byte(metric))
114 if err != nil {
115 log.Printf("[ERR] Error writing to statsite! Err: %s", err)
116 goto WAIT
117 }
118 case <-ticker.C:
119 if err := buffered.Flush(); err != nil {
120 log.Printf("[ERR] Error flushing to statsite! Err: %s", err)
121 goto WAIT
122 }
123 }
124 }
125
126WAIT:
127 // Wait for a while
128 wait = time.After(time.Duration(5) * time.Second)
129 for {
130 select {
131 // Dequeue the messages to avoid backlog
132 case _, ok := <-s.metricQueue:
133 if !ok {
134 goto QUIT
135 }
136 case <-wait:
137 goto CONNECT
138 }
139 }
140QUIT:
141 s.metricQueue = nil
142}