VOL-1868 move simulated onu from voltha-go to voltha-simonu-adapter
Sourced from voltha-go commit 251a11c0ffe60512318a644cd6ce0dc4e12f4018
Change-Id: Iab179bc2f3dd772ed7f488d1c03d1a84ba75e874
diff --git a/vendor/github.com/armon/go-metrics/.gitignore b/vendor/github.com/armon/go-metrics/.gitignore
new file mode 100644
index 0000000..8c03ec1
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/.gitignore
@@ -0,0 +1,24 @@
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
+
+/metrics.out
diff --git a/vendor/github.com/armon/go-metrics/LICENSE b/vendor/github.com/armon/go-metrics/LICENSE
new file mode 100644
index 0000000..106569e
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2013 Armon Dadgar
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/vendor/github.com/armon/go-metrics/README.md b/vendor/github.com/armon/go-metrics/README.md
new file mode 100644
index 0000000..aa73348
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/README.md
@@ -0,0 +1,91 @@
+go-metrics
+==========
+
+This library provides a `metrics` package which can be used to instrument code,
+expose application metrics, and profile runtime performance in a flexible manner.
+
+Current API: [![GoDoc](https://godoc.org/github.com/armon/go-metrics?status.svg)](https://godoc.org/github.com/armon/go-metrics)
+
+Sinks
+-----
+
+The `metrics` package makes use of a `MetricSink` interface to support delivery
+to any type of backend. Currently the following sinks are provided:
+
+* StatsiteSink : Sinks to a [statsite](https://github.com/armon/statsite/) instance (TCP)
+* StatsdSink: Sinks to a [StatsD](https://github.com/etsy/statsd/) / statsite instance (UDP)
+* PrometheusSink: Sinks to a [Prometheus](http://prometheus.io/) metrics endpoint (exposed via HTTP for scrapes)
+* InmemSink : Provides in-memory aggregation, can be used to export stats
+* FanoutSink : Sinks to multiple sinks. Enables writing to multiple statsite instances for example.
+* BlackholeSink : Sinks to nowhere
+
+In addition to the sinks, the `InmemSignal` can be used to catch a signal,
+and dump a formatted output of recent metrics. For example, when a process gets
+a SIGUSR1, it can dump to stderr recent performance metrics for debugging.
+
+Labels
+------
+
+Most metric methods have an equivalent ending with `WithLabels`; these methods
+push metrics with labels and make use of features of the underlying sinks
+(e.g. labels are translated into Prometheus labels).
+
+Since some of these labels may greatly increase the cardinality of metrics, the
+library allows labels to be filtered using a blacklist/whitelist filtering system
+which is global to all metrics.
+
+* If `Config.AllowedLabels` is not nil, then only labels specified in this value will be sent to underlying Sink, otherwise, all labels are sent by default.
+* If `Config.BlockedLabels` is not nil, any label specified in this value will not be sent to underlying Sinks.
+
+By default, both `Config.AllowedLabels` and `Config.BlockedLabels` are nil, meaning that
+no labels are filtered at all, but this allows a user to globally block labels with high
+cardinality at the application level.
+
+Examples
+--------
+
+Here is an example of using the package:
+
+```go
+func SlowMethod() {
+ // Profiling the runtime of a method
+ defer metrics.MeasureSince([]string{"SlowMethod"}, time.Now())
+}
+
+// Configure a statsite sink as the global metrics sink
+sink, _ := metrics.NewStatsiteSink("statsite:8125")
+metrics.NewGlobal(metrics.DefaultConfig("service-name"), sink)
+
+// Emit a Key/Value pair
+metrics.EmitKey([]string{"questions", "meaning of life"}, 42)
+```
+
+Here is an example of setting up a signal handler:
+
+```go
+// Setup the inmem sink and signal handler
+inm := metrics.NewInmemSink(10*time.Second, time.Minute)
+sig := metrics.DefaultInmemSignal(inm)
+metrics.NewGlobal(metrics.DefaultConfig("service-name"), inm)
+
+// Run some code
+inm.SetGauge([]string{"foo"}, 42)
+inm.EmitKey([]string{"bar"}, 30)
+
+inm.IncrCounter([]string{"baz"}, 42)
+inm.IncrCounter([]string{"baz"}, 1)
+inm.IncrCounter([]string{"baz"}, 80)
+
+inm.AddSample([]string{"method", "wow"}, 42)
+inm.AddSample([]string{"method", "wow"}, 100)
+inm.AddSample([]string{"method", "wow"}, 22)
+
+....
+```
+
+When a signal comes in, output like the following will be dumped to stderr:
+
+ [2014-01-28 14:57:33.04 -0800 PST][G] 'foo': 42.000
+ [2014-01-28 14:57:33.04 -0800 PST][P] 'bar': 30.000
+ [2014-01-28 14:57:33.04 -0800 PST][C] 'baz': Count: 3 Min: 1.000 Mean: 41.000 Max: 80.000 Stddev: 39.509
+ [2014-01-28 14:57:33.04 -0800 PST][S] 'method.wow': Count: 3 Min: 22.000 Mean: 54.667 Max: 100.000 Stddev: 40.513
\ No newline at end of file
diff --git a/vendor/github.com/armon/go-metrics/const_unix.go b/vendor/github.com/armon/go-metrics/const_unix.go
new file mode 100644
index 0000000..31098dd
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/const_unix.go
@@ -0,0 +1,12 @@
+// +build !windows
+
+package metrics
+
+import (
+ "syscall"
+)
+
+const (
+ // DefaultSignal is used with DefaultInmemSignal
+ DefaultSignal = syscall.SIGUSR1
+)
diff --git a/vendor/github.com/armon/go-metrics/const_windows.go b/vendor/github.com/armon/go-metrics/const_windows.go
new file mode 100644
index 0000000..38136af
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/const_windows.go
@@ -0,0 +1,13 @@
+// +build windows
+
+package metrics
+
+import (
+ "syscall"
+)
+
+const (
+ // DefaultSignal is used with DefaultInmemSignal
+ // Windows has no SIGUSR1, use SIGBREAK
+ DefaultSignal = syscall.Signal(21)
+)
diff --git a/vendor/github.com/armon/go-metrics/inmem.go b/vendor/github.com/armon/go-metrics/inmem.go
new file mode 100644
index 0000000..4e2d6a7
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/inmem.go
@@ -0,0 +1,348 @@
+package metrics
+
+import (
+ "bytes"
+ "fmt"
+ "math"
+ "net/url"
+ "strings"
+ "sync"
+ "time"
+)
+
+// InmemSink provides a MetricSink that does in-memory aggregation
+// without sending metrics over a network. It can be embedded within
+// an application to provide profiling information.
+type InmemSink struct {
+ // How long is each aggregation interval
+ interval time.Duration
+
+	// retain controls how long metric intervals are kept
+ retain time.Duration
+
+ // maxIntervals is the maximum length of intervals.
+ // It is retain / interval.
+ maxIntervals int
+
+ // intervals is a slice of the retained intervals
+ intervals []*IntervalMetrics
+ intervalLock sync.RWMutex
+
+ rateDenom float64
+}
+
+// IntervalMetrics stores the aggregated metrics
+// for a specific interval
+type IntervalMetrics struct {
+ sync.RWMutex
+
+ // The start time of the interval
+ Interval time.Time
+
+ // Gauges maps the key to the last set value
+ Gauges map[string]GaugeValue
+
+ // Points maps the string to the list of emitted values
+ // from EmitKey
+ Points map[string][]float32
+
+ // Counters maps the string key to a sum of the counter
+ // values
+ Counters map[string]SampledValue
+
+ // Samples maps the key to an AggregateSample,
+ // which has the rolled up view of a sample
+ Samples map[string]SampledValue
+}
+
+// NewIntervalMetrics creates a new IntervalMetrics for a given interval
+func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
+ return &IntervalMetrics{
+ Interval: intv,
+ Gauges: make(map[string]GaugeValue),
+ Points: make(map[string][]float32),
+ Counters: make(map[string]SampledValue),
+ Samples: make(map[string]SampledValue),
+ }
+}
+
+// AggregateSample is used to hold aggregate metrics
+// about a sample
+type AggregateSample struct {
+ Count int // The count of emitted pairs
+ Rate float64 // The values rate per time unit (usually 1 second)
+ Sum float64 // The sum of values
+ SumSq float64 `json:"-"` // The sum of squared values
+ Min float64 // Minimum value
+ Max float64 // Maximum value
+ LastUpdated time.Time `json:"-"` // When value was last updated
+}
+
+// Computes a Stddev of the values
+func (a *AggregateSample) Stddev() float64 {
+ num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2)
+ div := float64(a.Count * (a.Count - 1))
+ if div == 0 {
+ return 0
+ }
+ return math.Sqrt(num / div)
+}
+
+// Computes a mean of the values
+func (a *AggregateSample) Mean() float64 {
+ if a.Count == 0 {
+ return 0
+ }
+ return a.Sum / float64(a.Count)
+}
+
+// Ingest is used to update a sample
+func (a *AggregateSample) Ingest(v float64, rateDenom float64) {
+ a.Count++
+ a.Sum += v
+ a.SumSq += (v * v)
+ if v < a.Min || a.Count == 1 {
+ a.Min = v
+ }
+ if v > a.Max || a.Count == 1 {
+ a.Max = v
+ }
+ a.Rate = float64(a.Sum) / rateDenom
+ a.LastUpdated = time.Now()
+}
+
+func (a *AggregateSample) String() string {
+ if a.Count == 0 {
+ return "Count: 0"
+ } else if a.Stddev() == 0 {
+ return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated)
+ } else {
+ return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s",
+ a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum, a.LastUpdated)
+ }
+}
+
+// NewInmemSinkFromURL creates an InmemSink from a URL. It is used
+// (and tested) from NewMetricSinkFromURL. Durations NewInmemSink would panic on are rejected.
+func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) {
+	params := u.Query()
+	interval, err := time.ParseDuration(params.Get("interval"))
+	if err != nil {
+		return nil, fmt.Errorf("Bad 'interval' param: %s", err)
+	}
+	retain, err := time.ParseDuration(params.Get("retain"))
+	if err != nil {
+		return nil, fmt.Errorf("Bad 'retain' param: %s", err)
+	}
+	if interval <= 0 || retain < 0 {
+		return nil, fmt.Errorf("Bad 'interval' or 'retain' param: interval must be positive and retain non-negative")
+	}
+	return NewInmemSink(interval, retain), nil
+}
+
+// NewInmemSink is used to construct a new in-memory sink.
+// Uses an aggregation interval and maximum retention period.
+func NewInmemSink(interval, retain time.Duration) *InmemSink {
+ rateTimeUnit := time.Second
+ i := &InmemSink{
+ interval: interval,
+ retain: retain,
+ maxIntervals: int(retain / interval),
+ rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
+ }
+ i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
+ return i
+}
+
+func (i *InmemSink) SetGauge(key []string, val float32) {
+ i.SetGaugeWithLabels(key, val, nil)
+}
+
+func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
+ k, name := i.flattenKeyLabels(key, labels)
+ intv := i.getInterval()
+
+ intv.Lock()
+ defer intv.Unlock()
+ intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels}
+}
+
+func (i *InmemSink) EmitKey(key []string, val float32) {
+ k := i.flattenKey(key)
+ intv := i.getInterval()
+
+ intv.Lock()
+ defer intv.Unlock()
+ vals := intv.Points[k]
+ intv.Points[k] = append(vals, val)
+}
+
+func (i *InmemSink) IncrCounter(key []string, val float32) {
+ i.IncrCounterWithLabels(key, val, nil)
+}
+
+func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
+ k, name := i.flattenKeyLabels(key, labels)
+ intv := i.getInterval()
+
+ intv.Lock()
+ defer intv.Unlock()
+
+ agg, ok := intv.Counters[k]
+ if !ok {
+ agg = SampledValue{
+ Name: name,
+ AggregateSample: &AggregateSample{},
+ Labels: labels,
+ }
+ intv.Counters[k] = agg
+ }
+ agg.Ingest(float64(val), i.rateDenom)
+}
+
+func (i *InmemSink) AddSample(key []string, val float32) {
+ i.AddSampleWithLabels(key, val, nil)
+}
+
+func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
+ k, name := i.flattenKeyLabels(key, labels)
+ intv := i.getInterval()
+
+ intv.Lock()
+ defer intv.Unlock()
+
+ agg, ok := intv.Samples[k]
+ if !ok {
+ agg = SampledValue{
+ Name: name,
+ AggregateSample: &AggregateSample{},
+ Labels: labels,
+ }
+ intv.Samples[k] = agg
+ }
+ agg.Ingest(float64(val), i.rateDenom)
+}
+
+// Data is used to retrieve all the aggregated metrics. Finished intervals are
+// shared and need their read lock; the current interval is a private copy.
+func (i *InmemSink) Data() []*IntervalMetrics {
+	// Get the current interval, forces creation
+	i.getInterval()
+	i.intervalLock.RLock()
+	defer i.intervalLock.RUnlock()
+
+	n := len(i.intervals)
+	if n == 0 { // nothing is retained (maxIntervals is 0); slicing below would panic
+		return []*IntervalMetrics{}
+	}
+	intervals := make([]*IntervalMetrics, n)
+	copy(intervals[:n-1], i.intervals[:n-1])
+	current := i.intervals[n-1]
+
+	// Build the copy field by field: assigning the whole struct would also
+	// copy its embedded RWMutex, which must not be copied (least of all locked).
+	current.RLock()
+	defer current.RUnlock()
+	copyCurrent := &IntervalMetrics{
+		Interval: current.Interval,
+		Gauges:   make(map[string]GaugeValue, len(current.Gauges)),
+		Points:   make(map[string][]float32, len(current.Points)), // value slices are shared, not cloned
+		Counters: make(map[string]SampledValue, len(current.Counters)),
+		Samples:  make(map[string]SampledValue, len(current.Samples)),
+	}
+	for k, v := range current.Gauges {
+		copyCurrent.Gauges[k] = v
+	}
+	for k, v := range current.Points {
+		copyCurrent.Points[k] = v
+	}
+	for k, v := range current.Counters {
+		copyCurrent.Counters[k] = v
+	}
+	for k, v := range current.Samples {
+		copyCurrent.Samples[k] = v
+	}
+	intervals[n-1] = copyCurrent
+	return intervals
+}
+
+func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics {
+ i.intervalLock.RLock()
+ defer i.intervalLock.RUnlock()
+
+ n := len(i.intervals)
+ if n > 0 && i.intervals[n-1].Interval == intv {
+ return i.intervals[n-1]
+ }
+ return nil
+}
+
+func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics {
+ i.intervalLock.Lock()
+ defer i.intervalLock.Unlock()
+
+ // Check for an existing interval
+ n := len(i.intervals)
+ if n > 0 && i.intervals[n-1].Interval == intv {
+ return i.intervals[n-1]
+ }
+
+ // Add the current interval
+ current := NewIntervalMetrics(intv)
+ i.intervals = append(i.intervals, current)
+ n++
+
+ // Truncate the intervals if they are too long
+ if n >= i.maxIntervals {
+ copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
+ i.intervals = i.intervals[:i.maxIntervals]
+ }
+ return current
+}
+
+// getInterval returns the current interval to write to
+func (i *InmemSink) getInterval() *IntervalMetrics {
+ intv := time.Now().Truncate(i.interval)
+ if m := i.getExistingInterval(intv); m != nil {
+ return m
+ }
+ return i.createInterval(intv)
+}
+
+// Flattens the key for formatting, removes spaces
+func (i *InmemSink) flattenKey(parts []string) string {
+ buf := &bytes.Buffer{}
+ replacer := strings.NewReplacer(" ", "_")
+
+ if len(parts) > 0 {
+ replacer.WriteString(buf, parts[0])
+ }
+ for _, part := range parts[1:] {
+ replacer.WriteString(buf, ".")
+ replacer.WriteString(buf, part)
+ }
+
+ return buf.String()
+}
+
+// Flattens the key for formatting along with its labels, removes spaces
+func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) {
+ buf := &bytes.Buffer{}
+ replacer := strings.NewReplacer(" ", "_")
+
+ if len(parts) > 0 {
+ replacer.WriteString(buf, parts[0])
+ }
+ for _, part := range parts[1:] {
+ replacer.WriteString(buf, ".")
+ replacer.WriteString(buf, part)
+ }
+
+ key := buf.String()
+
+ for _, label := range labels {
+ replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value))
+ }
+
+ return buf.String(), key
+}
diff --git a/vendor/github.com/armon/go-metrics/inmem_endpoint.go b/vendor/github.com/armon/go-metrics/inmem_endpoint.go
new file mode 100644
index 0000000..504f1b3
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/inmem_endpoint.go
@@ -0,0 +1,118 @@
+package metrics
+
+import (
+ "fmt"
+ "net/http"
+ "sort"
+ "time"
+)
+
+// MetricsSummary holds a roll-up of metrics info for a given interval
+type MetricsSummary struct {
+ Timestamp string
+ Gauges []GaugeValue
+ Points []PointValue
+ Counters []SampledValue
+ Samples []SampledValue
+}
+
+type GaugeValue struct {
+ Name string
+ Hash string `json:"-"`
+ Value float32
+
+ Labels []Label `json:"-"`
+ DisplayLabels map[string]string `json:"Labels"`
+}
+
+type PointValue struct {
+ Name string
+ Points []float32
+}
+
+type SampledValue struct {
+ Name string
+ Hash string `json:"-"`
+ *AggregateSample
+ Mean float64
+ Stddev float64
+
+ Labels []Label `json:"-"`
+ DisplayLabels map[string]string `json:"Labels"`
+}
+
+// DisplayMetrics returns a summary of the metrics from the most recent finished interval.
+func (i *InmemSink) DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+	data := i.Data() // snapshot; i.intervals itself must not be indexed without intervalLock
+	var interval *IntervalMetrics
+	n := len(data)
+	switch {
+	case n == 0:
+		return nil, fmt.Errorf("no metric intervals have been initialized yet")
+	case n == 1:
+		// Show the current interval if it's all we have (data holds a private copy of it)
+		interval = data[0]
+	default:
+		// Show the most recent finished interval if we have one. It is shared
+		// with the sink, so hold its read lock while it is summarized.
+		interval = data[n-2]
+		interval.RLock()
+		defer interval.RUnlock()
+	}
+
+	summary := MetricsSummary{
+		Timestamp: interval.Interval.Round(time.Second).UTC().String(),
+		Gauges:    make([]GaugeValue, 0, len(interval.Gauges)),
+		Points:    make([]PointValue, 0, len(interval.Points)),
+	}
+
+	// Format and sort the output of each metric type, so it gets displayed in a
+	// deterministic order.
+	for name, points := range interval.Points {
+		summary.Points = append(summary.Points, PointValue{name, points})
+	}
+	sort.Slice(summary.Points, func(i, j int) bool {
+		return summary.Points[i].Name < summary.Points[j].Name
+	})
+
+	for hash, value := range interval.Gauges {
+		value.Hash = hash
+		value.DisplayLabels = make(map[string]string)
+		for _, label := range value.Labels {
+			value.DisplayLabels[label.Name] = label.Value
+		}
+		value.Labels = nil
+		summary.Gauges = append(summary.Gauges, value)
+	}
+	sort.Slice(summary.Gauges, func(i, j int) bool {
+		return summary.Gauges[i].Hash < summary.Gauges[j].Hash
+	})
+
+	summary.Counters = formatSamples(interval.Counters)
+	summary.Samples = formatSamples(interval.Samples)
+	return summary, nil
+}
+
+func formatSamples(source map[string]SampledValue) []SampledValue {
+ output := make([]SampledValue, 0, len(source))
+ for hash, sample := range source {
+ displayLabels := make(map[string]string)
+ for _, label := range sample.Labels {
+ displayLabels[label.Name] = label.Value
+ }
+
+ output = append(output, SampledValue{
+ Name: sample.Name,
+ Hash: hash,
+ AggregateSample: sample.AggregateSample,
+ Mean: sample.AggregateSample.Mean(),
+ Stddev: sample.AggregateSample.Stddev(),
+ DisplayLabels: displayLabels,
+ })
+ }
+ sort.Slice(output, func(i, j int) bool {
+ return output[i].Hash < output[j].Hash
+ })
+
+ return output
+}
diff --git a/vendor/github.com/armon/go-metrics/inmem_signal.go b/vendor/github.com/armon/go-metrics/inmem_signal.go
new file mode 100644
index 0000000..0937f4a
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/inmem_signal.go
@@ -0,0 +1,117 @@
+package metrics
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "os"
+ "os/signal"
+ "strings"
+ "sync"
+ "syscall"
+)
+
+// InmemSignal is used to listen for a given signal, and when received,
+// to dump the current metrics from the InmemSink to an io.Writer
+type InmemSignal struct {
+ signal syscall.Signal
+ inm *InmemSink
+ w io.Writer
+ sigCh chan os.Signal
+
+ stop bool
+ stopCh chan struct{}
+ stopLock sync.Mutex
+}
+
+// NewInmemSignal creates a new InmemSignal which listens for a given signal,
+// and dumps the current metrics out to a writer
+func NewInmemSignal(inmem *InmemSink, sig syscall.Signal, w io.Writer) *InmemSignal {
+ i := &InmemSignal{
+ signal: sig,
+ inm: inmem,
+ w: w,
+ sigCh: make(chan os.Signal, 1),
+ stopCh: make(chan struct{}),
+ }
+ signal.Notify(i.sigCh, sig)
+ go i.run()
+ return i
+}
+
+// DefaultInmemSignal returns a new InmemSignal that responds to SIGUSR1
+// and writes output to stderr. Windows uses SIGBREAK
+func DefaultInmemSignal(inmem *InmemSink) *InmemSignal {
+ return NewInmemSignal(inmem, DefaultSignal, os.Stderr)
+}
+
+// Stop is used to stop the InmemSignal from listening
+func (i *InmemSignal) Stop() {
+ i.stopLock.Lock()
+ defer i.stopLock.Unlock()
+
+ if i.stop {
+ return
+ }
+ i.stop = true
+ close(i.stopCh)
+ signal.Stop(i.sigCh)
+}
+
+// run is a long running routine that handles signals
+func (i *InmemSignal) run() {
+ for {
+ select {
+ case <-i.sigCh:
+ i.dumpStats()
+ case <-i.stopCh:
+ return
+ }
+ }
+}
+
+// dumpStats is used to dump the data to output writer
+func (i *InmemSignal) dumpStats() {
+ buf := bytes.NewBuffer(nil)
+
+ data := i.inm.Data()
+ // Skip the last period which is still being aggregated
+ for j := 0; j < len(data)-1; j++ {
+ intv := data[j]
+ intv.RLock()
+ for _, val := range intv.Gauges {
+ name := i.flattenLabels(val.Name, val.Labels)
+ fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
+ }
+ for name, vals := range intv.Points {
+ for _, val := range vals {
+ fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val)
+ }
+ }
+ for _, agg := range intv.Counters {
+ name := i.flattenLabels(agg.Name, agg.Labels)
+ fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
+ }
+ for _, agg := range intv.Samples {
+ name := i.flattenLabels(agg.Name, agg.Labels)
+ fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
+ }
+ intv.RUnlock()
+ }
+
+ // Write out the bytes
+ i.w.Write(buf.Bytes())
+}
+
+// Flattens the name and its label values for formatting; spaces and colons in label values become underscores
+func (i *InmemSignal) flattenLabels(name string, labels []Label) string {
+ buf := bytes.NewBufferString(name)
+ replacer := strings.NewReplacer(" ", "_", ":", "_")
+
+ for _, label := range labels {
+ replacer.WriteString(buf, ".")
+ replacer.WriteString(buf, label.Value)
+ }
+
+ return buf.String()
+}
diff --git a/vendor/github.com/armon/go-metrics/metrics.go b/vendor/github.com/armon/go-metrics/metrics.go
new file mode 100644
index 0000000..cf9def7
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/metrics.go
@@ -0,0 +1,278 @@
+package metrics
+
+import (
+ "runtime"
+ "strings"
+ "time"
+
+ "github.com/hashicorp/go-immutable-radix"
+)
+
+type Label struct {
+ Name string
+ Value string
+}
+
+func (m *Metrics) SetGauge(key []string, val float32) {
+ m.SetGaugeWithLabels(key, val, nil)
+}
+
+func (m *Metrics) SetGaugeWithLabels(key []string, val float32, labels []Label) {
+ if m.HostName != "" {
+ if m.EnableHostnameLabel {
+ labels = append(labels, Label{"host", m.HostName})
+ } else if m.EnableHostname {
+ key = insert(0, m.HostName, key)
+ }
+ }
+ if m.EnableTypePrefix {
+ key = insert(0, "gauge", key)
+ }
+ if m.ServiceName != "" {
+ if m.EnableServiceLabel {
+ labels = append(labels, Label{"service", m.ServiceName})
+ } else {
+ key = insert(0, m.ServiceName, key)
+ }
+ }
+ allowed, labelsFiltered := m.allowMetric(key, labels)
+ if !allowed {
+ return
+ }
+ m.sink.SetGaugeWithLabels(key, val, labelsFiltered)
+}
+
+func (m *Metrics) EmitKey(key []string, val float32) {
+ if m.EnableTypePrefix {
+ key = insert(0, "kv", key)
+ }
+ if m.ServiceName != "" {
+ key = insert(0, m.ServiceName, key)
+ }
+ allowed, _ := m.allowMetric(key, nil)
+ if !allowed {
+ return
+ }
+ m.sink.EmitKey(key, val)
+}
+
+func (m *Metrics) IncrCounter(key []string, val float32) {
+ m.IncrCounterWithLabels(key, val, nil)
+}
+
+func (m *Metrics) IncrCounterWithLabels(key []string, val float32, labels []Label) {
+ if m.HostName != "" && m.EnableHostnameLabel {
+ labels = append(labels, Label{"host", m.HostName})
+ }
+ if m.EnableTypePrefix {
+ key = insert(0, "counter", key)
+ }
+ if m.ServiceName != "" {
+ if m.EnableServiceLabel {
+ labels = append(labels, Label{"service", m.ServiceName})
+ } else {
+ key = insert(0, m.ServiceName, key)
+ }
+ }
+ allowed, labelsFiltered := m.allowMetric(key, labels)
+ if !allowed {
+ return
+ }
+ m.sink.IncrCounterWithLabels(key, val, labelsFiltered)
+}
+
+func (m *Metrics) AddSample(key []string, val float32) {
+ m.AddSampleWithLabels(key, val, nil)
+}
+
+func (m *Metrics) AddSampleWithLabels(key []string, val float32, labels []Label) {
+ if m.HostName != "" && m.EnableHostnameLabel {
+ labels = append(labels, Label{"host", m.HostName})
+ }
+ if m.EnableTypePrefix {
+ key = insert(0, "sample", key)
+ }
+ if m.ServiceName != "" {
+ if m.EnableServiceLabel {
+ labels = append(labels, Label{"service", m.ServiceName})
+ } else {
+ key = insert(0, m.ServiceName, key)
+ }
+ }
+ allowed, labelsFiltered := m.allowMetric(key, labels)
+ if !allowed {
+ return
+ }
+ m.sink.AddSampleWithLabels(key, val, labelsFiltered)
+}
+
+func (m *Metrics) MeasureSince(key []string, start time.Time) {
+ m.MeasureSinceWithLabels(key, start, nil)
+}
+
+func (m *Metrics) MeasureSinceWithLabels(key []string, start time.Time, labels []Label) {
+ if m.HostName != "" && m.EnableHostnameLabel {
+ labels = append(labels, Label{"host", m.HostName})
+ }
+ if m.EnableTypePrefix {
+ key = insert(0, "timer", key)
+ }
+ if m.ServiceName != "" {
+ if m.EnableServiceLabel {
+ labels = append(labels, Label{"service", m.ServiceName})
+ } else {
+ key = insert(0, m.ServiceName, key)
+ }
+ }
+ allowed, labelsFiltered := m.allowMetric(key, labels)
+ if !allowed {
+ return
+ }
+ now := time.Now()
+ elapsed := now.Sub(start)
+ msec := float32(elapsed.Nanoseconds()) / float32(m.TimerGranularity)
+ m.sink.AddSampleWithLabels(key, msec, labelsFiltered)
+}
+
+// UpdateFilter overwrites the existing filter with the given rules.
+func (m *Metrics) UpdateFilter(allow, block []string) {
+ m.UpdateFilterAndLabels(allow, block, m.AllowedLabels, m.BlockedLabels)
+}
+
+// UpdateFilterAndLabels overwrites the existing filter with the given rules.
+func (m *Metrics) UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) {
+ m.filterLock.Lock()
+ defer m.filterLock.Unlock()
+
+ m.AllowedPrefixes = allow
+ m.BlockedPrefixes = block
+
+ if allowedLabels == nil {
+		// No white list: labels are only subject to the block list
+ m.allowedLabels = nil
+ } else {
+ m.allowedLabels = make(map[string]bool)
+ for _, v := range allowedLabels {
+ m.allowedLabels[v] = true
+ }
+ }
+ m.blockedLabels = make(map[string]bool)
+ for _, v := range blockedLabels {
+ m.blockedLabels[v] = true
+ }
+ m.AllowedLabels = allowedLabels
+ m.BlockedLabels = blockedLabels
+
+ m.filter = iradix.New()
+ for _, prefix := range m.AllowedPrefixes {
+ m.filter, _, _ = m.filter.Insert([]byte(prefix), true)
+ }
+ for _, prefix := range m.BlockedPrefixes {
+ m.filter, _, _ = m.filter.Insert([]byte(prefix), false)
+ }
+}
+
+// labelIsAllowed returns true if the label should be included in the metric.
+// The caller should hold m.filterLock while calling this method.
+func (m *Metrics) labelIsAllowed(label *Label) bool {
+ labelName := (*label).Name
+ if m.blockedLabels != nil {
+ _, ok := m.blockedLabels[labelName]
+ if ok {
+ // If present, let's remove this label
+ return false
+ }
+ }
+ if m.allowedLabels != nil {
+ _, ok := m.allowedLabels[labelName]
+ return ok
+ }
+ // Allow by default
+ return true
+}
+
+// filterLabels returns only the allowed labels in a newly allocated slice, so
+// the caller's slice is never rewritten. The caller should hold m.filterLock.
+func (m *Metrics) filterLabels(labels []Label) []Label {
+	if labels == nil {
+		return nil
+	}
+	toReturn := make([]Label, 0, len(labels))
+	for idx := range labels {
+		if m.labelIsAllowed(&labels[idx]) {
+			toReturn = append(toReturn, labels[idx])
+		}
+	}
+	return toReturn
+}
+
+// Returns whether the metric should be allowed based on configured prefix filters
+// Also return the applicable labels
+func (m *Metrics) allowMetric(key []string, labels []Label) (bool, []Label) {
+ m.filterLock.RLock()
+ defer m.filterLock.RUnlock()
+
+ if m.filter == nil || m.filter.Len() == 0 {
+ return m.Config.FilterDefault, m.filterLabels(labels)
+ }
+
+ _, allowed, ok := m.filter.Root().LongestPrefix([]byte(strings.Join(key, ".")))
+ if !ok {
+ return m.Config.FilterDefault, m.filterLabels(labels)
+ }
+
+ return allowed.(bool), m.filterLabels(labels)
+}
+
+// Periodically collects runtime stats to publish
+func (m *Metrics) collectStats() {
+ for {
+ time.Sleep(m.ProfileInterval)
+ m.emitRuntimeStats()
+ }
+}
+
+// Emits various runtime statistics
+func (m *Metrics) emitRuntimeStats() {
+ // Export number of Goroutines
+ numRoutines := runtime.NumGoroutine()
+ m.SetGauge([]string{"runtime", "num_goroutines"}, float32(numRoutines))
+
+ // Export memory stats
+ var stats runtime.MemStats
+ runtime.ReadMemStats(&stats)
+ m.SetGauge([]string{"runtime", "alloc_bytes"}, float32(stats.Alloc))
+ m.SetGauge([]string{"runtime", "sys_bytes"}, float32(stats.Sys))
+ m.SetGauge([]string{"runtime", "malloc_count"}, float32(stats.Mallocs))
+ m.SetGauge([]string{"runtime", "free_count"}, float32(stats.Frees))
+ m.SetGauge([]string{"runtime", "heap_objects"}, float32(stats.HeapObjects))
+ m.SetGauge([]string{"runtime", "total_gc_pause_ns"}, float32(stats.PauseTotalNs))
+ m.SetGauge([]string{"runtime", "total_gc_runs"}, float32(stats.NumGC))
+
+ // Export info about the last few GC runs
+ num := stats.NumGC
+
+ // Handle wrap around
+ if num < m.lastNumGC {
+ m.lastNumGC = 0
+ }
+
+ // Ensure we don't scan more than 256
+ if num-m.lastNumGC >= 256 {
+ m.lastNumGC = num - 255
+ }
+
+ for i := m.lastNumGC; i < num; i++ {
+ pause := stats.PauseNs[i%256]
+ m.AddSample([]string{"runtime", "gc_pause_ns"}, float32(pause))
+ }
+ m.lastNumGC = num
+}
+
+// Inserts a string value at an index into a fresh copy of the slice; s itself is left untouched
+func insert(i int, v string, s []string) []string {
+	out := make([]string, 0, len(s)+1)
+	out = append(out, s[:i]...)
+	out = append(out, v)
+	return append(out, s[i:]...)
+}
diff --git a/vendor/github.com/armon/go-metrics/sink.go b/vendor/github.com/armon/go-metrics/sink.go
new file mode 100644
index 0000000..0b7d6e4
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/sink.go
@@ -0,0 +1,115 @@
+package metrics
+
+import (
+ "fmt"
+ "net/url"
+)
+
+// MetricSink is the interface through which metrics are transmitted to an
+// external system; the *WithLabels variants additionally carry labels
+type MetricSink interface {
+ // SetGauge: a gauge should retain the last value it is set to
+ SetGauge(key []string, val float32)
+ SetGaugeWithLabels(key []string, val float32, labels []Label)
+
+ // EmitKey: should emit a key/value pair for each call
+ EmitKey(key []string, val float32)
+
+ // IncrCounter: counters should accumulate values
+ IncrCounter(key []string, val float32)
+ IncrCounterWithLabels(key []string, val float32, labels []Label)
+
+ // AddSample: samples are for timing information, where quantiles are used
+ AddSample(key []string, val float32)
+ AddSampleWithLabels(key []string, val float32, labels []Label)
+}
+
+// BlackholeSink is a MetricSink whose methods all have empty bodies, so every metric is discarded
+type BlackholeSink struct{}
+
+func (*BlackholeSink) SetGauge(key []string, val float32) {}
+func (*BlackholeSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {}
+func (*BlackholeSink) EmitKey(key []string, val float32) {}
+func (*BlackholeSink) IncrCounter(key []string, val float32) {}
+func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {}
+func (*BlackholeSink) AddSample(key []string, val float32) {}
+func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {}
+
+// FanoutSink is a list of sinks; each call is repeated on every member in order.
+type FanoutSink []MetricSink
+
+// SetGaugeWithLabels sets the labelled gauge on every sink in the fanout.
+func (fh FanoutSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
+	for i := range fh {
+		fh[i].SetGaugeWithLabels(key, val, labels)
+	}
+}
+
+// EmitKey emits the key/value pair on every sink in the fanout.
+func (fh FanoutSink) EmitKey(key []string, val float32) {
+	for i := range fh {
+		fh[i].EmitKey(key, val)
+	}
+}
+
+// IncrCounterWithLabels increments the labelled counter on every sink in the fanout.
+func (fh FanoutSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
+	for i := range fh {
+		fh[i].IncrCounterWithLabels(key, val, labels)
+	}
+}
+
+// AddSampleWithLabels adds the labelled sample on every sink in the fanout.
+func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
+	for i := range fh {
+		fh[i].AddSampleWithLabels(key, val, labels)
+	}
+}
+
+// SetGauge, IncrCounter and AddSample are the label-free forms. Each one
+// calls its *WithLabels counterpart above with nil labels, so the fan-out
+// loop is written only once per metric type. EmitKey has no labelled form
+// and therefore loops over the sinks itself.
+func (fh FanoutSink) SetGauge(key []string, val float32)    { fh.SetGaugeWithLabels(key, val, nil) }
+func (fh FanoutSink) IncrCounter(key []string, val float32) { fh.IncrCounterWithLabels(key, val, nil) }
+func (fh FanoutSink) AddSample(key []string, val float32)   { fh.AddSampleWithLabels(key, val, nil) }
+
+// sinkURLFactoryFunc is a generic interface around the *SinkFromURL() function provided
+// by each sink type
+type sinkURLFactoryFunc func(*url.URL) (MetricSink, error)
+
+// sinkRegistry supports the generic NewMetricSinkFromURL function by mapping
+// URL schemes to metric sink factory functions
+var sinkRegistry = map[string]sinkURLFactoryFunc{
+ "statsd": NewStatsdSinkFromURL,
+ "statsite": NewStatsiteSinkFromURL,
+ "inmem": NewInmemSinkFromURL,
+}
+
+// NewMetricSinkFromURL allows a generic URL input to configure any of the
+// supported sinks. The scheme of the URL identifies the type of the sink, and
+// the host and query parameters are used to set options.
+//
+// "statsd://" - Initializes a StatsdSink. The host and port are passed through
+// as the "addr" of the sink
+//
+// "statsite://" - Initializes a StatsiteSink. The host and port become the
+// "addr" of the sink
+//
+// "inmem://" - Initializes an InmemSink. The host and port are ignored. The
+// "interval" and "duration" query parameters must be specified with valid
+// durations, see NewInmemSink for details.
+func NewMetricSinkFromURL(urlStr string) (MetricSink, error) {
+	parsed, err := url.Parse(urlStr)
+	if err != nil {
+		return nil, err
+	}
+
+	// Dispatch on the scheme; a scheme with no registered factory is an error.
+	if factory := sinkRegistry[parsed.Scheme]; factory != nil {
+		return factory(parsed)
+	}
+
+	return nil, fmt.Errorf(
+		"cannot create metric sink, unrecognized sink name: %q", parsed.Scheme)
+}
diff --git a/vendor/github.com/armon/go-metrics/start.go b/vendor/github.com/armon/go-metrics/start.go
new file mode 100644
index 0000000..32a28c4
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/start.go
@@ -0,0 +1,141 @@
+package metrics
+
+import (
+ "os"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/hashicorp/go-immutable-radix"
+)
+
+// Config holds the settings for a Metrics instance; DefaultConfig returns a populated one
+type Config struct {
+ ServiceName string // Prefix added to keys to separate services
+ HostName string // Hostname to use. If not provided and EnableHostname, it will be os.Hostname
+ EnableHostname bool // Enable prefixing gauge values with hostname
+ EnableHostnameLabel bool // Enable adding hostname to labels
+ EnableServiceLabel bool // Enable adding service to labels
+ EnableRuntimeMetrics bool // Enables profiling of runtime metrics (GC, Goroutines, Memory)
+ EnableTypePrefix bool // Prefixes key with a type ("counter", "gauge", "timer")
+ TimerGranularity time.Duration // Granularity of timers.
+ ProfileInterval time.Duration // Time collectStats sleeps between runtime metrics emissions
+
+ AllowedPrefixes []string // A list of metric prefixes to allow, with '.' as the separator
+ BlockedPrefixes []string // A list of metric prefixes to block, with '.' as the separator
+ AllowedLabels []string // A list of metric labels to allow, with '.' as the separator
+ BlockedLabels []string // A list of metric labels to block, with '.' as the separator
+ FilterDefault bool // Whether to allow metrics whose key matches no configured prefix
+}
+
+// Metrics pairs a Config with the MetricSink that metrics are emitted to,
+// plus the prefix filter and label allow/block sets used to screen them
+type Metrics struct {
+ Config
+ lastNumGC uint32 // NumGC value recorded by the previous emitRuntimeStats call
+ sink MetricSink
+ filter *iradix.Tree // Queried with LongestPrefix on the "."-joined key
+ allowedLabels map[string]bool
+ blockedLabels map[string]bool
+ filterLock sync.RWMutex // Lock filters and allowedLabels/blockedLabels access
+}
+
+// globalMetrics holds the shared instance used by the package-level functions; NewGlobal replaces it
+var globalMetrics atomic.Value // *Metrics
+
+func init() {
+	// Start with a blackhole-backed instance so the package-level functions always find a *Metrics
+	globalMetrics.Store(&Metrics{sink: new(BlackholeSink)})
+}
+
+// DefaultConfig returns a Config for serviceName with the hostname prefix and
+// runtime metrics enabled, the type prefix disabled, millisecond timers, a
+// one-second profile interval and metrics allowed by default.
+func DefaultConfig(serviceName string) *Config {
+	// Best effort: when os.Hostname fails its error is dropped and HostName
+	// is simply whatever name it returned.
+	host, _ := os.Hostname()
+	return &Config{
+		ServiceName:          serviceName,      // Use client provided service
+		HostName:             host,             // Result of os.Hostname
+		EnableHostname:       true,             // Enable hostname prefix
+		EnableRuntimeMetrics: true,             // Enable runtime profiling
+		EnableTypePrefix:     false,            // Disable type prefix
+		TimerGranularity:     time.Millisecond, // Timers are in milliseconds
+		ProfileInterval:      time.Second,      // Poll runtime every second
+		FilterDefault:        true,             // Don't filter metrics by default
+	}
+}
+
+// New builds a Metrics from a copy of conf and the given sink, installs the
+// prefix and label filters from conf and, when conf.EnableRuntimeMetrics is
+// set, starts the background runtime collector. The error is always nil.
+func New(conf *Config, sink MetricSink) (*Metrics, error) {
+	met := &Metrics{Config: *conf, sink: sink}
+	met.UpdateFilterAndLabels(conf.AllowedPrefixes, conf.BlockedPrefixes, conf.AllowedLabels, conf.BlockedLabels)
+
+	// collectStats loops forever, so this goroutine is never stopped.
+	if conf.EnableRuntimeMetrics {
+		go met.collectStats()
+	}
+	return met, nil
+}
+
+// NewGlobal is New plus, on success, storing the result as the global instance.
+func NewGlobal(conf *Config, sink MetricSink) (*Metrics, error) {
+	created, err := New(conf, sink)
+	if err != nil {
+		return created, err
+	}
+	globalMetrics.Store(created)
+	return created, nil
+}
+
+// The package-level functions below forward to the method of the same name
+// on the *Metrics currently stored in globalMetrics.
+
+// SetGauge calls SetGauge on the global Metrics instance.
+func SetGauge(key []string, val float32) { globalMetrics.Load().(*Metrics).SetGauge(key, val) }
+
+// SetGaugeWithLabels calls SetGaugeWithLabels on the global Metrics instance.
+func SetGaugeWithLabels(key []string, val float32, labels []Label) {
+	globalMetrics.Load().(*Metrics).SetGaugeWithLabels(key, val, labels)
+}
+
+// EmitKey calls EmitKey on the global Metrics instance.
+func EmitKey(key []string, val float32) { globalMetrics.Load().(*Metrics).EmitKey(key, val) }
+
+// IncrCounter calls IncrCounter on the global Metrics instance.
+func IncrCounter(key []string, val float32) { globalMetrics.Load().(*Metrics).IncrCounter(key, val) }
+
+// IncrCounterWithLabels calls IncrCounterWithLabels on the global Metrics instance.
+func IncrCounterWithLabels(key []string, val float32, labels []Label) {
+	globalMetrics.Load().(*Metrics).IncrCounterWithLabels(key, val, labels)
+}
+
+// AddSample calls AddSample on the global Metrics instance.
+func AddSample(key []string, val float32) { globalMetrics.Load().(*Metrics).AddSample(key, val) }
+
+// AddSampleWithLabels calls AddSampleWithLabels on the global Metrics instance.
+func AddSampleWithLabels(key []string, val float32, labels []Label) {
+	globalMetrics.Load().(*Metrics).AddSampleWithLabels(key, val, labels)
+}
+
+// MeasureSince calls MeasureSince on the global Metrics instance.
+func MeasureSince(key []string, start time.Time) { globalMetrics.Load().(*Metrics).MeasureSince(key, start) }
+
+// MeasureSinceWithLabels calls MeasureSinceWithLabels on the global Metrics instance.
+func MeasureSinceWithLabels(key []string, start time.Time, labels []Label) {
+	globalMetrics.Load().(*Metrics).MeasureSinceWithLabels(key, start, labels)
+}
+
+// UpdateFilter calls UpdateFilter on the global Metrics instance.
+func UpdateFilter(allow, block []string) { globalMetrics.Load().(*Metrics).UpdateFilter(allow, block) }
+
+// UpdateFilterAndLabels sets the allow/block prefixes of metrics, while
+// allowedLabels and blockedLabels - when not nil - allow filtering of labels in
+// order to block/allow labels globally (especially useful when a label has a
+// large number of values). See README.md for more information about usage.
+func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) {
+	globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels)
+}
diff --git a/vendor/github.com/armon/go-metrics/statsd.go b/vendor/github.com/armon/go-metrics/statsd.go
new file mode 100644
index 0000000..1bfffce
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/statsd.go
@@ -0,0 +1,184 @@
+package metrics
+
+import (
+ "bytes"
+ "fmt"
+ "log"
+ "net"
+ "net/url"
+ "strings"
+ "time"
+)
+
+const (
+ // statsdMaxLen is the size in bytes past which flushMetrics writes the
+ // buffered metrics to statsd as one packet before buffering more
+ statsdMaxLen = 1400
+)
+
+// StatsdSink provides a MetricSink that can be used
+// with a statsite or statsd metrics server. It uses
+// only UDP packets, while StatsiteSink uses TCP.
+type StatsdSink struct {
+ addr string // Address flushMetrics dials with net.Dial("udp", addr)
+ metricQueue chan string // Formatted metric lines waiting for flushMetrics
+}
+
+// NewStatsdSinkFromURL creates a StatsdSink from a URL, using only u.Host as
+// the address. It is registered under the "statsd" scheme in sinkRegistry.
+func NewStatsdSinkFromURL(u *url.URL) (MetricSink, error) {
+ return NewStatsdSink(u.Host)
+}
+
+// NewStatsdSink returns a StatsdSink for addr with a 4096-entry metric
+// queue. Dialing happens later in flushMetrics, so the error is always nil.
+func NewStatsdSink(addr string) (*StatsdSink, error) {
+	sink := &StatsdSink{addr: addr, metricQueue: make(chan string, 4096)}
+
+	// The flusher goroutine runs until Shutdown closes the queue.
+	go sink.flushMetrics()
+	return sink, nil
+}
+
+// Shutdown closes the metric queue, which makes flushMetrics stop flushing to statsd and return
+func (s *StatsdSink) Shutdown() {
+ close(s.metricQueue)
+}
+
+// SetGauge queues a "key:value|g" line, where key is the flattened form of the key parts.
+func (s *StatsdSink) SetGauge(key []string, val float32) {
+	s.pushMetric(fmt.Sprintf("%s:%f|g\n", s.flattenKey(key), val))
+}
+
+// SetGaugeWithLabels queues a "key:value|g" line whose key also includes each label's Value.
+func (s *StatsdSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
+	s.pushMetric(fmt.Sprintf("%s:%f|g\n", s.flattenKeyLabels(key, labels), val))
+}
+
+// EmitKey queues a "key:value|kv" line, where key is the flattened form of the key parts.
+func (s *StatsdSink) EmitKey(key []string, val float32) {
+	s.pushMetric(fmt.Sprintf("%s:%f|kv\n", s.flattenKey(key), val))
+}
+
+// IncrCounter queues a "key:value|c" line, where key is the flattened form of the key parts.
+func (s *StatsdSink) IncrCounter(key []string, val float32) {
+	s.pushMetric(fmt.Sprintf("%s:%f|c\n", s.flattenKey(key), val))
+}
+
+// IncrCounterWithLabels queues a "key:value|c" line whose key also includes each label's Value.
+func (s *StatsdSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
+	s.pushMetric(fmt.Sprintf("%s:%f|c\n", s.flattenKeyLabels(key, labels), val))
+}
+
+// AddSample queues a "key:value|ms" line, where key is the flattened form of the key parts.
+func (s *StatsdSink) AddSample(key []string, val float32) {
+	s.pushMetric(fmt.Sprintf("%s:%f|ms\n", s.flattenKey(key), val))
+}
+
+// AddSampleWithLabels queues a "key:value|ms" line whose key also includes each label's Value.
+func (s *StatsdSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
+	s.pushMetric(fmt.Sprintf("%s:%f|ms\n", s.flattenKeyLabels(key, labels), val))
+}
+
+// flattenKey joins parts with "." and then rewrites every ':' and ' ' in
+// the joined string to '_'. All other runes pass through unchanged.
+// ':' is the character that separates key from value in the lines this
+// sink emits.
+func (s *StatsdSink) flattenKey(parts []string) string {
+	sanitize := func(r rune) rune {
+		if r == ':' || r == ' ' {
+			return '_'
+		}
+		return r
+	}
+
+	return strings.Map(sanitize, strings.Join(parts, "."))
+}
+
+// flattenKeyLabels appends every label's Value to parts (append may reuse the caller's spare capacity) and flattens the result.
+func (s *StatsdSink) flattenKeyLabels(parts []string, labels []Label) string {
+	for i := range labels {
+		parts = append(parts, labels[i].Value)
+	}
+	return s.flattenKey(parts)
+}
+
+// pushMetric enqueues m without blocking; if the send cannot proceed immediately m is dropped
+func (s *StatsdSink) pushMetric(m string) {
+ select {
+ case s.metricQueue <- m:
+ default: // queue full, or nil after flushMetrics has returned: drop m
+ }
+}
+
+// flushMetrics runs until metricQueue is closed. While connected it batches
+// queued metrics and writes them over UDP; after a dial error, or a write
+// error (which also closes the socket), it discards metrics for 5 seconds and redials.
+func (s *StatsdSink) flushMetrics() {
+	var sock net.Conn
+	var err error
+	var wait <-chan time.Time
+	ticker := time.NewTicker(flushInterval)
+	defer ticker.Stop()
+
+CONNECT:
+	buf := bytes.NewBuffer(nil)
+
+	sock, err = net.Dial("udp", s.addr)
+	if err != nil {
+		log.Printf("[ERR] Error connecting to statsd! Err: %s", err)
+		goto WAIT
+	}
+
+	// Every path that leaves this loop closes sock, so neither a reconnect nor a shutdown leaves it open.
+	for {
+		select {
+		case metric, ok := <-s.metricQueue:
+			if !ok {
+				sock.Close()
+				goto QUIT
+			}
+
+			// Check if this would overflow the packet size
+			if len(metric)+buf.Len() > statsdMaxLen {
+				_, err := sock.Write(buf.Bytes())
+				buf.Reset()
+				if err != nil {
+					log.Printf("[ERR] Error writing to statsd! Err: %s", err)
+					sock.Close()
+					goto WAIT
+				}
+			}
+			buf.WriteString(metric)
+
+		case <-ticker.C:
+			if buf.Len() == 0 {
+				continue
+			}
+
+			_, err := sock.Write(buf.Bytes())
+			buf.Reset()
+			if err != nil {
+				log.Printf("[ERR] Error flushing to statsd! Err: %s", err)
+				sock.Close()
+				goto WAIT
+			}
+		}
+	}
+
+WAIT:
+	wait = time.After(time.Duration(5) * time.Second)
+	for {
+		select {
+		// Dequeue the messages to avoid backlog
+		case _, ok := <-s.metricQueue:
+			if !ok {
+				goto QUIT
+			}
+		case <-wait:
+			goto CONNECT
+		}
+	}
+QUIT:
+	s.metricQueue = nil
+}
diff --git a/vendor/github.com/armon/go-metrics/statsite.go b/vendor/github.com/armon/go-metrics/statsite.go
new file mode 100644
index 0000000..6c0d284
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/statsite.go
@@ -0,0 +1,172 @@
+package metrics
+
+import (
+ "bufio"
+ "fmt"
+ "log"
+ "net"
+ "net/url"
+ "strings"
+ "time"
+)
+
+const (
+ // Buffered metrics are force flushed after this period, which prevents
+ // stats from getting stuck in a buffer forever. The interval drives the
+ // flush ticker of both the statsite and the statsd sink.
+ flushInterval = 100 * time.Millisecond
+)
+
+// NewStatsiteSinkFromURL creates a StatsiteSink from a URL, using only u.Host as
+// the address. It is registered under the "statsite" scheme in sinkRegistry.
+func NewStatsiteSinkFromURL(u *url.URL) (MetricSink, error) {
+ return NewStatsiteSink(u.Host)
+}
+
+// StatsiteSink provides a MetricSink that can be used with a
+// statsite metrics server; flushMetrics sends to it over TCP
+type StatsiteSink struct {
+ addr string // Address flushMetrics dials with net.Dial("tcp", addr)
+ metricQueue chan string // Formatted metric lines waiting for flushMetrics
+}
+
+// NewStatsiteSink returns a StatsiteSink for addr with a 4096-entry metric
+// queue. Dialing happens later in flushMetrics, so the error is always nil.
+func NewStatsiteSink(addr string) (*StatsiteSink, error) {
+	sink := &StatsiteSink{addr: addr, metricQueue: make(chan string, 4096)}
+
+	// The flusher goroutine runs until Shutdown closes the queue.
+	go sink.flushMetrics()
+	return sink, nil
+}
+
+// Shutdown closes the metric queue, which makes flushMetrics stop flushing to statsite and return
+func (s *StatsiteSink) Shutdown() {
+ close(s.metricQueue)
+}
+
+// SetGauge queues a "key:value|g" line, where key is the flattened form of the key parts.
+func (s *StatsiteSink) SetGauge(key []string, val float32) {
+	s.pushMetric(fmt.Sprintf("%s:%f|g\n", s.flattenKey(key), val))
+}
+
+// SetGaugeWithLabels queues a "key:value|g" line whose key also includes each label's Value.
+func (s *StatsiteSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
+	s.pushMetric(fmt.Sprintf("%s:%f|g\n", s.flattenKeyLabels(key, labels), val))
+}
+
+// EmitKey queues a "key:value|kv" line, where key is the flattened form of the key parts.
+func (s *StatsiteSink) EmitKey(key []string, val float32) {
+	s.pushMetric(fmt.Sprintf("%s:%f|kv\n", s.flattenKey(key), val))
+}
+
+// IncrCounter queues a "key:value|c" line, where key is the flattened form of the key parts.
+func (s *StatsiteSink) IncrCounter(key []string, val float32) {
+	s.pushMetric(fmt.Sprintf("%s:%f|c\n", s.flattenKey(key), val))
+}
+
+// IncrCounterWithLabels queues a "key:value|c" line whose key also includes each label's Value.
+func (s *StatsiteSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
+	s.pushMetric(fmt.Sprintf("%s:%f|c\n", s.flattenKeyLabels(key, labels), val))
+}
+
+// AddSample queues a "key:value|ms" line, where key is the flattened form of the key parts.
+func (s *StatsiteSink) AddSample(key []string, val float32) {
+	s.pushMetric(fmt.Sprintf("%s:%f|ms\n", s.flattenKey(key), val))
+}
+
+// AddSampleWithLabels queues a "key:value|ms" line whose key also includes each label's Value.
+func (s *StatsiteSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
+	s.pushMetric(fmt.Sprintf("%s:%f|ms\n", s.flattenKeyLabels(key, labels), val))
+}
+
+// flattenKey joins parts with "." and then rewrites every ':' and ' ' in
+// the joined string to '_'. All other runes pass through unchanged.
+// ':' is the character that separates key from value in the lines this
+// sink emits.
+func (s *StatsiteSink) flattenKey(parts []string) string {
+	sanitize := func(r rune) rune {
+		if r == ':' || r == ' ' {
+			return '_'
+		}
+		return r
+	}
+
+	return strings.Map(sanitize, strings.Join(parts, "."))
+}
+
+// flattenKeyLabels appends every label's Value to parts (append may reuse the caller's spare capacity) and flattens the result.
+func (s *StatsiteSink) flattenKeyLabels(parts []string, labels []Label) string {
+	for i := range labels {
+		parts = append(parts, labels[i].Value)
+	}
+	return s.flattenKey(parts)
+}
+
+// pushMetric enqueues m without blocking; if the send cannot proceed immediately m is dropped
+func (s *StatsiteSink) pushMetric(m string) {
+ select {
+ case s.metricQueue <- m:
+ default: // queue full, or nil after flushMetrics has returned: drop m
+ }
+}
+
+// flushMetrics runs until metricQueue is closed. While connected it writes
+// queued metrics to a buffered TCP writer that is flushed on every tick. After a dial error, or a
+// write/flush error (which also closes the socket), it discards metrics for 5 seconds and redials.
+func (s *StatsiteSink) flushMetrics() {
+	var sock net.Conn
+	var err error
+	var wait <-chan time.Time
+	var buffered *bufio.Writer
+	ticker := time.NewTicker(flushInterval)
+	defer ticker.Stop()
+
+CONNECT:
+	sock, err = net.Dial("tcp", s.addr)
+	if err != nil {
+		log.Printf("[ERR] Error connecting to statsite! Err: %s", err)
+		goto WAIT
+	}
+	buffered = bufio.NewWriter(sock)
+
+	// Every path that leaves this loop closes sock, so neither a reconnect
+	// nor a shutdown leaves the connection open.
+	for {
+		select {
+		case metric, ok := <-s.metricQueue:
+			if !ok {
+				sock.Close()
+				goto QUIT
+			}
+
+			if _, err := buffered.WriteString(metric); err != nil {
+				log.Printf("[ERR] Error writing to statsite! Err: %s", err)
+				sock.Close()
+				goto WAIT
+			}
+		case <-ticker.C:
+			if err := buffered.Flush(); err != nil {
+				log.Printf("[ERR] Error flushing to statsite! Err: %s", err)
+				sock.Close()
+				goto WAIT
+			}
+		}
+	}
+
+WAIT:
+	wait = time.After(time.Duration(5) * time.Second)
+	for {
+		select {
+		// Dequeue the messages to avoid backlog
+		case _, ok := <-s.metricQueue:
+			if !ok {
+				goto QUIT
+			}
+		case <-wait:
+			goto CONNECT
+		}
+	}
+QUIT:
+	s.metricQueue = nil
+}