blob: 1bfffce46e217e209c6109c76daab38d1eec33a9 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package metrics
2
3import (
4 "bytes"
5 "fmt"
6 "log"
7 "net"
Stephane Barbarie260a5632019-02-26 16:12:49 -05008 "net/url"
khenaidooac637102019-01-14 15:44:34 -05009 "strings"
10 "time"
11)
12
13const (
14 // statsdMaxLen is the maximum size of a packet
15 // to send to statsd
16 statsdMaxLen = 1400
17)
18
19// StatsdSink provides a MetricSink that can be used
20// with a statsite or statsd metrics server. It uses
21// only UDP packets, while StatsiteSink uses TCP.
22type StatsdSink struct {
23 addr string
24 metricQueue chan string
25}
26
Stephane Barbarie260a5632019-02-26 16:12:49 -050027// NewStatsdSinkFromURL creates an StatsdSink from a URL. It is used
28// (and tested) from NewMetricSinkFromURL.
29func NewStatsdSinkFromURL(u *url.URL) (MetricSink, error) {
30 return NewStatsdSink(u.Host)
31}
32
khenaidooac637102019-01-14 15:44:34 -050033// NewStatsdSink is used to create a new StatsdSink
34func NewStatsdSink(addr string) (*StatsdSink, error) {
35 s := &StatsdSink{
36 addr: addr,
37 metricQueue: make(chan string, 4096),
38 }
39 go s.flushMetrics()
40 return s, nil
41}
42
43// Close is used to stop flushing to statsd
44func (s *StatsdSink) Shutdown() {
45 close(s.metricQueue)
46}
47
48func (s *StatsdSink) SetGauge(key []string, val float32) {
49 flatKey := s.flattenKey(key)
50 s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
51}
52
Stephane Barbarie260a5632019-02-26 16:12:49 -050053func (s *StatsdSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
54 flatKey := s.flattenKeyLabels(key, labels)
55 s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
56}
57
khenaidooac637102019-01-14 15:44:34 -050058func (s *StatsdSink) EmitKey(key []string, val float32) {
59 flatKey := s.flattenKey(key)
60 s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
61}
62
63func (s *StatsdSink) IncrCounter(key []string, val float32) {
64 flatKey := s.flattenKey(key)
65 s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
66}
67
Stephane Barbarie260a5632019-02-26 16:12:49 -050068func (s *StatsdSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
69 flatKey := s.flattenKeyLabels(key, labels)
70 s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
71}
72
khenaidooac637102019-01-14 15:44:34 -050073func (s *StatsdSink) AddSample(key []string, val float32) {
74 flatKey := s.flattenKey(key)
75 s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
76}
77
Stephane Barbarie260a5632019-02-26 16:12:49 -050078func (s *StatsdSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
79 flatKey := s.flattenKeyLabels(key, labels)
80 s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
81}
82
khenaidooac637102019-01-14 15:44:34 -050083// Flattens the key for formatting, removes spaces
84func (s *StatsdSink) flattenKey(parts []string) string {
85 joined := strings.Join(parts, ".")
86 return strings.Map(func(r rune) rune {
87 switch r {
88 case ':':
89 fallthrough
90 case ' ':
91 return '_'
92 default:
93 return r
94 }
95 }, joined)
96}
97
Stephane Barbarie260a5632019-02-26 16:12:49 -050098// Flattens the key along with labels for formatting, removes spaces
99func (s *StatsdSink) flattenKeyLabels(parts []string, labels []Label) string {
100 for _, label := range labels {
101 parts = append(parts, label.Value)
102 }
103 return s.flattenKey(parts)
104}
105
khenaidooac637102019-01-14 15:44:34 -0500106// Does a non-blocking push to the metrics queue
107func (s *StatsdSink) pushMetric(m string) {
108 select {
109 case s.metricQueue <- m:
110 default:
111 }
112}
113
114// Flushes metrics
115func (s *StatsdSink) flushMetrics() {
116 var sock net.Conn
117 var err error
118 var wait <-chan time.Time
119 ticker := time.NewTicker(flushInterval)
120 defer ticker.Stop()
121
122CONNECT:
123 // Create a buffer
124 buf := bytes.NewBuffer(nil)
125
126 // Attempt to connect
127 sock, err = net.Dial("udp", s.addr)
128 if err != nil {
129 log.Printf("[ERR] Error connecting to statsd! Err: %s", err)
130 goto WAIT
131 }
132
133 for {
134 select {
135 case metric, ok := <-s.metricQueue:
136 // Get a metric from the queue
137 if !ok {
138 goto QUIT
139 }
140
141 // Check if this would overflow the packet size
142 if len(metric)+buf.Len() > statsdMaxLen {
143 _, err := sock.Write(buf.Bytes())
144 buf.Reset()
145 if err != nil {
146 log.Printf("[ERR] Error writing to statsd! Err: %s", err)
147 goto WAIT
148 }
149 }
150
151 // Append to the buffer
152 buf.WriteString(metric)
153
154 case <-ticker.C:
155 if buf.Len() == 0 {
156 continue
157 }
158
159 _, err := sock.Write(buf.Bytes())
160 buf.Reset()
161 if err != nil {
162 log.Printf("[ERR] Error flushing to statsd! Err: %s", err)
163 goto WAIT
164 }
165 }
166 }
167
168WAIT:
169 // Wait for a while
170 wait = time.After(time.Duration(5) * time.Second)
171 for {
172 select {
173 // Dequeue the messages to avoid backlog
174 case _, ok := <-s.metricQueue:
175 if !ok {
176 goto QUIT
177 }
178 case <-wait:
179 goto CONNECT
180 }
181 }
182QUIT:
183 s.metricQueue = nil
184}