| package metrics |
| |
| import ( |
| "bufio" |
| "fmt" |
| "log" |
| "net" |
| "net/url" |
| "strings" |
| "time" |
| ) |
| |
| const ( |
| // We force flush the statsite metrics after this period of |
| // inactivity. Prevents stats from getting stuck in a buffer |
| // forever. |
| flushInterval = 100 * time.Millisecond |
| ) |
| |
| // NewStatsiteSinkFromURL creates an StatsiteSink from a URL. It is used |
| // (and tested) from NewMetricSinkFromURL. |
| func NewStatsiteSinkFromURL(u *url.URL) (MetricSink, error) { |
| return NewStatsiteSink(u.Host) |
| } |
| |
| // StatsiteSink provides a MetricSink that can be used with a |
| // statsite metrics server |
| type StatsiteSink struct { |
| addr string |
| metricQueue chan string |
| } |
| |
| // NewStatsiteSink is used to create a new StatsiteSink |
| func NewStatsiteSink(addr string) (*StatsiteSink, error) { |
| s := &StatsiteSink{ |
| addr: addr, |
| metricQueue: make(chan string, 4096), |
| } |
| go s.flushMetrics() |
| return s, nil |
| } |
| |
| // Close is used to stop flushing to statsite |
| func (s *StatsiteSink) Shutdown() { |
| close(s.metricQueue) |
| } |
| |
| func (s *StatsiteSink) SetGauge(key []string, val float32) { |
| flatKey := s.flattenKey(key) |
| s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val)) |
| } |
| |
| func (s *StatsiteSink) SetGaugeWithLabels(key []string, val float32, labels []Label) { |
| flatKey := s.flattenKeyLabels(key, labels) |
| s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val)) |
| } |
| |
| func (s *StatsiteSink) EmitKey(key []string, val float32) { |
| flatKey := s.flattenKey(key) |
| s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val)) |
| } |
| |
| func (s *StatsiteSink) IncrCounter(key []string, val float32) { |
| flatKey := s.flattenKey(key) |
| s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val)) |
| } |
| |
| func (s *StatsiteSink) IncrCounterWithLabels(key []string, val float32, labels []Label) { |
| flatKey := s.flattenKeyLabels(key, labels) |
| s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val)) |
| } |
| |
| func (s *StatsiteSink) AddSample(key []string, val float32) { |
| flatKey := s.flattenKey(key) |
| s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val)) |
| } |
| |
| func (s *StatsiteSink) AddSampleWithLabels(key []string, val float32, labels []Label) { |
| flatKey := s.flattenKeyLabels(key, labels) |
| s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val)) |
| } |
| |
| // Flattens the key for formatting, removes spaces |
| func (s *StatsiteSink) flattenKey(parts []string) string { |
| joined := strings.Join(parts, ".") |
| return strings.Map(func(r rune) rune { |
| switch r { |
| case ':': |
| fallthrough |
| case ' ': |
| return '_' |
| default: |
| return r |
| } |
| }, joined) |
| } |
| |
| // Flattens the key along with labels for formatting, removes spaces |
| func (s *StatsiteSink) flattenKeyLabels(parts []string, labels []Label) string { |
| for _, label := range labels { |
| parts = append(parts, label.Value) |
| } |
| return s.flattenKey(parts) |
| } |
| |
| // Does a non-blocking push to the metrics queue |
| func (s *StatsiteSink) pushMetric(m string) { |
| select { |
| case s.metricQueue <- m: |
| default: |
| } |
| } |
| |
| // Flushes metrics |
| func (s *StatsiteSink) flushMetrics() { |
| var sock net.Conn |
| var err error |
| var wait <-chan time.Time |
| var buffered *bufio.Writer |
| ticker := time.NewTicker(flushInterval) |
| defer ticker.Stop() |
| |
| CONNECT: |
| // Attempt to connect |
| sock, err = net.Dial("tcp", s.addr) |
| if err != nil { |
| log.Printf("[ERR] Error connecting to statsite! Err: %s", err) |
| goto WAIT |
| } |
| |
| // Create a buffered writer |
| buffered = bufio.NewWriter(sock) |
| |
| for { |
| select { |
| case metric, ok := <-s.metricQueue: |
| // Get a metric from the queue |
| if !ok { |
| goto QUIT |
| } |
| |
| // Try to send to statsite |
| _, err := buffered.Write([]byte(metric)) |
| if err != nil { |
| log.Printf("[ERR] Error writing to statsite! Err: %s", err) |
| goto WAIT |
| } |
| case <-ticker.C: |
| if err := buffered.Flush(); err != nil { |
| log.Printf("[ERR] Error flushing to statsite! Err: %s", err) |
| goto WAIT |
| } |
| } |
| } |
| |
| WAIT: |
| // Wait for a while |
| wait = time.After(time.Duration(5) * time.Second) |
| for { |
| select { |
| // Dequeue the messages to avoid backlog |
| case _, ok := <-s.metricQueue: |
| if !ok { |
| goto QUIT |
| } |
| case <-wait: |
| goto CONNECT |
| } |
| } |
| QUIT: |
| s.metricQueue = nil |
| } |