blob: 6c0d284d2ddb8b1d1642e4f64ff1fd9b609a31dd [file] [log] [blame]
divyadesai81bb7ba2020-03-11 11:45:23 +00001package metrics
2
3import (
4 "bufio"
5 "fmt"
6 "log"
7 "net"
8 "net/url"
9 "strings"
10 "time"
11)
12
13const (
14 // We force flush the statsite metrics after this period of
15 // inactivity. Prevents stats from getting stuck in a buffer
16 // forever.
17 flushInterval = 100 * time.Millisecond
18)
19
20// NewStatsiteSinkFromURL creates an StatsiteSink from a URL. It is used
21// (and tested) from NewMetricSinkFromURL.
22func NewStatsiteSinkFromURL(u *url.URL) (MetricSink, error) {
23 return NewStatsiteSink(u.Host)
24}
25
26// StatsiteSink provides a MetricSink that can be used with a
27// statsite metrics server
28type StatsiteSink struct {
29 addr string
30 metricQueue chan string
31}
32
33// NewStatsiteSink is used to create a new StatsiteSink
34func NewStatsiteSink(addr string) (*StatsiteSink, error) {
35 s := &StatsiteSink{
36 addr: addr,
37 metricQueue: make(chan string, 4096),
38 }
39 go s.flushMetrics()
40 return s, nil
41}
42
43// Close is used to stop flushing to statsite
44func (s *StatsiteSink) Shutdown() {
45 close(s.metricQueue)
46}
47
48func (s *StatsiteSink) SetGauge(key []string, val float32) {
49 flatKey := s.flattenKey(key)
50 s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
51}
52
53func (s *StatsiteSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
54 flatKey := s.flattenKeyLabels(key, labels)
55 s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
56}
57
58func (s *StatsiteSink) EmitKey(key []string, val float32) {
59 flatKey := s.flattenKey(key)
60 s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
61}
62
63func (s *StatsiteSink) IncrCounter(key []string, val float32) {
64 flatKey := s.flattenKey(key)
65 s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
66}
67
68func (s *StatsiteSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
69 flatKey := s.flattenKeyLabels(key, labels)
70 s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
71}
72
73func (s *StatsiteSink) AddSample(key []string, val float32) {
74 flatKey := s.flattenKey(key)
75 s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
76}
77
78func (s *StatsiteSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
79 flatKey := s.flattenKeyLabels(key, labels)
80 s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
81}
82
83// Flattens the key for formatting, removes spaces
84func (s *StatsiteSink) flattenKey(parts []string) string {
85 joined := strings.Join(parts, ".")
86 return strings.Map(func(r rune) rune {
87 switch r {
88 case ':':
89 fallthrough
90 case ' ':
91 return '_'
92 default:
93 return r
94 }
95 }, joined)
96}
97
98// Flattens the key along with labels for formatting, removes spaces
99func (s *StatsiteSink) flattenKeyLabels(parts []string, labels []Label) string {
100 for _, label := range labels {
101 parts = append(parts, label.Value)
102 }
103 return s.flattenKey(parts)
104}
105
106// Does a non-blocking push to the metrics queue
107func (s *StatsiteSink) pushMetric(m string) {
108 select {
109 case s.metricQueue <- m:
110 default:
111 }
112}
113
114// Flushes metrics
115func (s *StatsiteSink) flushMetrics() {
116 var sock net.Conn
117 var err error
118 var wait <-chan time.Time
119 var buffered *bufio.Writer
120 ticker := time.NewTicker(flushInterval)
121 defer ticker.Stop()
122
123CONNECT:
124 // Attempt to connect
125 sock, err = net.Dial("tcp", s.addr)
126 if err != nil {
127 log.Printf("[ERR] Error connecting to statsite! Err: %s", err)
128 goto WAIT
129 }
130
131 // Create a buffered writer
132 buffered = bufio.NewWriter(sock)
133
134 for {
135 select {
136 case metric, ok := <-s.metricQueue:
137 // Get a metric from the queue
138 if !ok {
139 goto QUIT
140 }
141
142 // Try to send to statsite
143 _, err := buffered.Write([]byte(metric))
144 if err != nil {
145 log.Printf("[ERR] Error writing to statsite! Err: %s", err)
146 goto WAIT
147 }
148 case <-ticker.C:
149 if err := buffered.Flush(); err != nil {
150 log.Printf("[ERR] Error flushing to statsite! Err: %s", err)
151 goto WAIT
152 }
153 }
154 }
155
156WAIT:
157 // Wait for a while
158 wait = time.After(time.Duration(5) * time.Second)
159 for {
160 select {
161 // Dequeue the messages to avoid backlog
162 case _, ok := <-s.metricQueue:
163 if !ok {
164 goto QUIT
165 }
166 case <-wait:
167 goto CONNECT
168 }
169 }
170QUIT:
171 s.metricQueue = nil
172}