blob: 65a5021a0574327559bbaae63c39ca25fd76084d [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package metrics
2
3import (
4 "bytes"
5 "fmt"
6 "log"
7 "net"
8 "strings"
9 "time"
10)
11
12const (
13 // statsdMaxLen is the maximum size of a packet
14 // to send to statsd
15 statsdMaxLen = 1400
16)
17
18// StatsdSink provides a MetricSink that can be used
19// with a statsite or statsd metrics server. It uses
20// only UDP packets, while StatsiteSink uses TCP.
21type StatsdSink struct {
22 addr string
23 metricQueue chan string
24}
25
26// NewStatsdSink is used to create a new StatsdSink
27func NewStatsdSink(addr string) (*StatsdSink, error) {
28 s := &StatsdSink{
29 addr: addr,
30 metricQueue: make(chan string, 4096),
31 }
32 go s.flushMetrics()
33 return s, nil
34}
35
36// Close is used to stop flushing to statsd
37func (s *StatsdSink) Shutdown() {
38 close(s.metricQueue)
39}
40
41func (s *StatsdSink) SetGauge(key []string, val float32) {
42 flatKey := s.flattenKey(key)
43 s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
44}
45
46func (s *StatsdSink) EmitKey(key []string, val float32) {
47 flatKey := s.flattenKey(key)
48 s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
49}
50
51func (s *StatsdSink) IncrCounter(key []string, val float32) {
52 flatKey := s.flattenKey(key)
53 s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
54}
55
56func (s *StatsdSink) AddSample(key []string, val float32) {
57 flatKey := s.flattenKey(key)
58 s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
59}
60
61// Flattens the key for formatting, removes spaces
62func (s *StatsdSink) flattenKey(parts []string) string {
63 joined := strings.Join(parts, ".")
64 return strings.Map(func(r rune) rune {
65 switch r {
66 case ':':
67 fallthrough
68 case ' ':
69 return '_'
70 default:
71 return r
72 }
73 }, joined)
74}
75
76// Does a non-blocking push to the metrics queue
77func (s *StatsdSink) pushMetric(m string) {
78 select {
79 case s.metricQueue <- m:
80 default:
81 }
82}
83
84// Flushes metrics
85func (s *StatsdSink) flushMetrics() {
86 var sock net.Conn
87 var err error
88 var wait <-chan time.Time
89 ticker := time.NewTicker(flushInterval)
90 defer ticker.Stop()
91
92CONNECT:
93 // Create a buffer
94 buf := bytes.NewBuffer(nil)
95
96 // Attempt to connect
97 sock, err = net.Dial("udp", s.addr)
98 if err != nil {
99 log.Printf("[ERR] Error connecting to statsd! Err: %s", err)
100 goto WAIT
101 }
102
103 for {
104 select {
105 case metric, ok := <-s.metricQueue:
106 // Get a metric from the queue
107 if !ok {
108 goto QUIT
109 }
110
111 // Check if this would overflow the packet size
112 if len(metric)+buf.Len() > statsdMaxLen {
113 _, err := sock.Write(buf.Bytes())
114 buf.Reset()
115 if err != nil {
116 log.Printf("[ERR] Error writing to statsd! Err: %s", err)
117 goto WAIT
118 }
119 }
120
121 // Append to the buffer
122 buf.WriteString(metric)
123
124 case <-ticker.C:
125 if buf.Len() == 0 {
126 continue
127 }
128
129 _, err := sock.Write(buf.Bytes())
130 buf.Reset()
131 if err != nil {
132 log.Printf("[ERR] Error flushing to statsd! Err: %s", err)
133 goto WAIT
134 }
135 }
136 }
137
138WAIT:
139 // Wait for a while
140 wait = time.After(time.Duration(5) * time.Second)
141 for {
142 select {
143 // Dequeue the messages to avoid backlog
144 case _, ok := <-s.metricQueue:
145 if !ok {
146 goto QUIT
147 }
148 case <-wait:
149 goto CONNECT
150 }
151 }
152QUIT:
153 s.metricQueue = nil
154}