VOL-1497 : Add more control to kv/memory access
- Added kv locking mechanism (etcd only)
- (watch) control path access whenever possible
- (watch) use a transaction for updates and merge with memory
- cleaned up vendoring
- misc changes to fix exceptions found along the way
Amendments:
- Copyright header got removed in auto-generated file
- Changed default locking to false for KV list operation
- Updated backend api to allow the passing of locking parameter
Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/vendor/github.com/armon/go-metrics/.gitignore b/vendor/github.com/armon/go-metrics/.gitignore
old mode 100755
new mode 100644
diff --git a/vendor/github.com/armon/go-metrics/README.md b/vendor/github.com/armon/go-metrics/README.md
index 7b6f23e..aa73348 100644
--- a/vendor/github.com/armon/go-metrics/README.md
+++ b/vendor/github.com/armon/go-metrics/README.md
@@ -7,7 +7,7 @@
Current API: [![GoDoc](https://godoc.org/github.com/armon/go-metrics?status.svg)](https://godoc.org/github.com/armon/go-metrics)
Sinks
-=====
+-----
The `metrics` package makes use of a `MetricSink` interface to support delivery
to any type of backend. Currently the following sinks are provided:
@@ -23,49 +23,69 @@
and dump a formatted output of recent metrics. For example, when a process gets
a SIGUSR1, it can dump to stderr recent performance metrics for debugging.
+Labels
+------
+
+Most metrics do have an equivalent ending with `WithLabels`, such methods
+allow to push metrics with labels and use some features of underlying Sinks
+(ex: translated into Prometheus labels).
+
+Since some of these labels may increase greatly cardinality of metrics, the
+library allow to filter labels using a blacklist/whitelist filtering system
+which is global to all metrics.
+
+* If `Config.AllowedLabels` is not nil, then only labels specified in this value will be sent to underlying Sink, otherwise, all labels are sent by default.
+* If `Config.BlockedLabels` is not nil, any label specified in this value will not be sent to underlying Sinks.
+
+By default, both `Config.AllowedLabels` and `Config.BlockedLabels` are nil, meaning that
+no tags are filetered at all, but it allow to a user to globally block some tags with high
+cardinality at application level.
+
Examples
-========
+--------
Here is an example of using the package:
- func SlowMethod() {
- // Profiling the runtime of a method
- defer metrics.MeasureSince([]string{"SlowMethod"}, time.Now())
- }
+```go
+func SlowMethod() {
+ // Profiling the runtime of a method
+ defer metrics.MeasureSince([]string{"SlowMethod"}, time.Now())
+}
- // Configure a statsite sink as the global metrics sink
- sink, _ := metrics.NewStatsiteSink("statsite:8125")
- metrics.NewGlobal(metrics.DefaultConfig("service-name"), sink)
+// Configure a statsite sink as the global metrics sink
+sink, _ := metrics.NewStatsiteSink("statsite:8125")
+metrics.NewGlobal(metrics.DefaultConfig("service-name"), sink)
- // Emit a Key/Value pair
- metrics.EmitKey([]string{"questions", "meaning of life"}, 42)
+// Emit a Key/Value pair
+metrics.EmitKey([]string{"questions", "meaning of life"}, 42)
+```
+Here is an example of setting up a signal handler:
-Here is an example of setting up an signal handler:
+```go
+// Setup the inmem sink and signal handler
+inm := metrics.NewInmemSink(10*time.Second, time.Minute)
+sig := metrics.DefaultInmemSignal(inm)
+metrics.NewGlobal(metrics.DefaultConfig("service-name"), inm)
- // Setup the inmem sink and signal handler
- inm := metrics.NewInmemSink(10*time.Second, time.Minute)
- sig := metrics.DefaultInmemSignal(inm)
- metrics.NewGlobal(metrics.DefaultConfig("service-name"), inm)
+// Run some code
+inm.SetGauge([]string{"foo"}, 42)
+inm.EmitKey([]string{"bar"}, 30)
- // Run some code
- inm.SetGauge([]string{"foo"}, 42)
- inm.EmitKey([]string{"bar"}, 30)
+inm.IncrCounter([]string{"baz"}, 42)
+inm.IncrCounter([]string{"baz"}, 1)
+inm.IncrCounter([]string{"baz"}, 80)
- inm.IncrCounter([]string{"baz"}, 42)
- inm.IncrCounter([]string{"baz"}, 1)
- inm.IncrCounter([]string{"baz"}, 80)
+inm.AddSample([]string{"method", "wow"}, 42)
+inm.AddSample([]string{"method", "wow"}, 100)
+inm.AddSample([]string{"method", "wow"}, 22)
- inm.AddSample([]string{"method", "wow"}, 42)
- inm.AddSample([]string{"method", "wow"}, 100)
- inm.AddSample([]string{"method", "wow"}, 22)
-
- ....
+....
+```
When a signal comes in, output like the following will be dumped to stderr:
[2014-01-28 14:57:33.04 -0800 PST][G] 'foo': 42.000
[2014-01-28 14:57:33.04 -0800 PST][P] 'bar': 30.000
[2014-01-28 14:57:33.04 -0800 PST][C] 'baz': Count: 3 Min: 1.000 Mean: 41.000 Max: 80.000 Stddev: 39.509
- [2014-01-28 14:57:33.04 -0800 PST][S] 'method.wow': Count: 3 Min: 22.000 Mean: 54.667 Max: 100.000 Stddev: 40.513
-
+ [2014-01-28 14:57:33.04 -0800 PST][S] 'method.wow': Count: 3 Min: 22.000 Mean: 54.667 Max: 100.000 Stddev: 40.513
\ No newline at end of file
diff --git a/vendor/github.com/armon/go-metrics/inmem.go b/vendor/github.com/armon/go-metrics/inmem.go
index 83fb6bb..4e2d6a7 100644
--- a/vendor/github.com/armon/go-metrics/inmem.go
+++ b/vendor/github.com/armon/go-metrics/inmem.go
@@ -1,8 +1,10 @@
package metrics
import (
+ "bytes"
"fmt"
"math"
+ "net/url"
"strings"
"sync"
"time"
@@ -25,7 +27,7 @@
// intervals is a slice of the retained intervals
intervals []*IntervalMetrics
intervalLock sync.RWMutex
-
+
rateDenom float64
}
@@ -38,7 +40,7 @@
Interval time.Time
// Gauges maps the key to the last set value
- Gauges map[string]float32
+ Gauges map[string]GaugeValue
// Points maps the string to the list of emitted values
// from EmitKey
@@ -46,21 +48,21 @@
// Counters maps the string key to a sum of the counter
// values
- Counters map[string]*AggregateSample
+ Counters map[string]SampledValue
// Samples maps the key to an AggregateSample,
// which has the rolled up view of a sample
- Samples map[string]*AggregateSample
+ Samples map[string]SampledValue
}
// NewIntervalMetrics creates a new IntervalMetrics for a given interval
func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
return &IntervalMetrics{
Interval: intv,
- Gauges: make(map[string]float32),
+ Gauges: make(map[string]GaugeValue),
Points: make(map[string][]float32),
- Counters: make(map[string]*AggregateSample),
- Samples: make(map[string]*AggregateSample),
+ Counters: make(map[string]SampledValue),
+ Samples: make(map[string]SampledValue),
}
}
@@ -68,12 +70,12 @@
// about a sample
type AggregateSample struct {
Count int // The count of emitted pairs
- Rate float64 // The count of emitted pairs per time unit (usually 1 second)
+ Rate float64 // The values rate per time unit (usually 1 second)
Sum float64 // The sum of values
- SumSq float64 // The sum of squared values
+ SumSq float64 `json:"-"` // The sum of squared values
Min float64 // Minimum value
Max float64 // Maximum value
- LastUpdated time.Time // When value was last updated
+ LastUpdated time.Time `json:"-"` // When value was last updated
}
// Computes a Stddev of the values
@@ -105,7 +107,7 @@
if v > a.Max || a.Count == 1 {
a.Max = v
}
- a.Rate = float64(a.Count)/rateDenom
+ a.Rate = float64(a.Sum) / rateDenom
a.LastUpdated = time.Now()
}
@@ -120,6 +122,24 @@
}
}
+// NewInmemSinkFromURL creates an InmemSink from a URL. It is used
+// (and tested) from NewMetricSinkFromURL.
+func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) {
+ params := u.Query()
+
+ interval, err := time.ParseDuration(params.Get("interval"))
+ if err != nil {
+ return nil, fmt.Errorf("Bad 'interval' param: %s", err)
+ }
+
+ retain, err := time.ParseDuration(params.Get("retain"))
+ if err != nil {
+ return nil, fmt.Errorf("Bad 'retain' param: %s", err)
+ }
+
+ return NewInmemSink(interval, retain), nil
+}
+
// NewInmemSink is used to construct a new in-memory sink.
// Uses an aggregation interval and maximum retention period.
func NewInmemSink(interval, retain time.Duration) *InmemSink {
@@ -128,19 +148,23 @@
interval: interval,
retain: retain,
maxIntervals: int(retain / interval),
- rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
+ rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
}
i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
return i
}
func (i *InmemSink) SetGauge(key []string, val float32) {
- k := i.flattenKey(key)
+ i.SetGaugeWithLabels(key, val, nil)
+}
+
+func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
+ k, name := i.flattenKeyLabels(key, labels)
intv := i.getInterval()
intv.Lock()
defer intv.Unlock()
- intv.Gauges[k] = val
+ intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels}
}
func (i *InmemSink) EmitKey(key []string, val float32) {
@@ -154,30 +178,46 @@
}
func (i *InmemSink) IncrCounter(key []string, val float32) {
- k := i.flattenKey(key)
+ i.IncrCounterWithLabels(key, val, nil)
+}
+
+func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
+ k, name := i.flattenKeyLabels(key, labels)
intv := i.getInterval()
intv.Lock()
defer intv.Unlock()
- agg := intv.Counters[k]
- if agg == nil {
- agg = &AggregateSample{}
+ agg, ok := intv.Counters[k]
+ if !ok {
+ agg = SampledValue{
+ Name: name,
+ AggregateSample: &AggregateSample{},
+ Labels: labels,
+ }
intv.Counters[k] = agg
}
agg.Ingest(float64(val), i.rateDenom)
}
func (i *InmemSink) AddSample(key []string, val float32) {
- k := i.flattenKey(key)
+ i.AddSampleWithLabels(key, val, nil)
+}
+
+func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
+ k, name := i.flattenKeyLabels(key, labels)
intv := i.getInterval()
intv.Lock()
defer intv.Unlock()
- agg := intv.Samples[k]
- if agg == nil {
- agg = &AggregateSample{}
+ agg, ok := intv.Samples[k]
+ if !ok {
+ agg = SampledValue{
+ Name: name,
+ AggregateSample: &AggregateSample{},
+ Labels: labels,
+ }
intv.Samples[k] = agg
}
agg.Ingest(float64(val), i.rateDenom)
@@ -192,8 +232,37 @@
i.intervalLock.RLock()
defer i.intervalLock.RUnlock()
- intervals := make([]*IntervalMetrics, len(i.intervals))
- copy(intervals, i.intervals)
+ n := len(i.intervals)
+ intervals := make([]*IntervalMetrics, n)
+
+ copy(intervals[:n-1], i.intervals[:n-1])
+ current := i.intervals[n-1]
+
+ // make its own copy for current interval
+ intervals[n-1] = &IntervalMetrics{}
+ copyCurrent := intervals[n-1]
+ current.RLock()
+ *copyCurrent = *current
+
+ copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges))
+ for k, v := range current.Gauges {
+ copyCurrent.Gauges[k] = v
+ }
+ // saved values will be not change, just copy its link
+ copyCurrent.Points = make(map[string][]float32, len(current.Points))
+ for k, v := range current.Points {
+ copyCurrent.Points[k] = v
+ }
+ copyCurrent.Counters = make(map[string]SampledValue, len(current.Counters))
+ for k, v := range current.Counters {
+ copyCurrent.Counters[k] = v
+ }
+ copyCurrent.Samples = make(map[string]SampledValue, len(current.Samples))
+ for k, v := range current.Samples {
+ copyCurrent.Samples[k] = v
+ }
+ current.RUnlock()
+
return intervals
}
@@ -242,6 +311,38 @@
// Flattens the key for formatting, removes spaces
func (i *InmemSink) flattenKey(parts []string) string {
- joined := strings.Join(parts, ".")
- return strings.Replace(joined, " ", "_", -1)
+ buf := &bytes.Buffer{}
+ replacer := strings.NewReplacer(" ", "_")
+
+ if len(parts) > 0 {
+ replacer.WriteString(buf, parts[0])
+ }
+ for _, part := range parts[1:] {
+ replacer.WriteString(buf, ".")
+ replacer.WriteString(buf, part)
+ }
+
+ return buf.String()
+}
+
+// Flattens the key for formatting along with its labels, removes spaces
+func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) {
+ buf := &bytes.Buffer{}
+ replacer := strings.NewReplacer(" ", "_")
+
+ if len(parts) > 0 {
+ replacer.WriteString(buf, parts[0])
+ }
+ for _, part := range parts[1:] {
+ replacer.WriteString(buf, ".")
+ replacer.WriteString(buf, part)
+ }
+
+ key := buf.String()
+
+ for _, label := range labels {
+ replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value))
+ }
+
+ return buf.String(), key
}
diff --git a/vendor/github.com/armon/go-metrics/inmem_endpoint.go b/vendor/github.com/armon/go-metrics/inmem_endpoint.go
new file mode 100644
index 0000000..504f1b3
--- /dev/null
+++ b/vendor/github.com/armon/go-metrics/inmem_endpoint.go
@@ -0,0 +1,118 @@
+package metrics
+
+import (
+ "fmt"
+ "net/http"
+ "sort"
+ "time"
+)
+
+// MetricsSummary holds a roll-up of metrics info for a given interval
+type MetricsSummary struct {
+ Timestamp string
+ Gauges []GaugeValue
+ Points []PointValue
+ Counters []SampledValue
+ Samples []SampledValue
+}
+
+type GaugeValue struct {
+ Name string
+ Hash string `json:"-"`
+ Value float32
+
+ Labels []Label `json:"-"`
+ DisplayLabels map[string]string `json:"Labels"`
+}
+
+type PointValue struct {
+ Name string
+ Points []float32
+}
+
+type SampledValue struct {
+ Name string
+ Hash string `json:"-"`
+ *AggregateSample
+ Mean float64
+ Stddev float64
+
+ Labels []Label `json:"-"`
+ DisplayLabels map[string]string `json:"Labels"`
+}
+
+// DisplayMetrics returns a summary of the metrics from the most recent finished interval.
+func (i *InmemSink) DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+ data := i.Data()
+
+ var interval *IntervalMetrics
+ n := len(data)
+ switch {
+ case n == 0:
+ return nil, fmt.Errorf("no metric intervals have been initialized yet")
+ case n == 1:
+ // Show the current interval if it's all we have
+ interval = i.intervals[0]
+ default:
+ // Show the most recent finished interval if we have one
+ interval = i.intervals[n-2]
+ }
+
+ summary := MetricsSummary{
+ Timestamp: interval.Interval.Round(time.Second).UTC().String(),
+ Gauges: make([]GaugeValue, 0, len(interval.Gauges)),
+ Points: make([]PointValue, 0, len(interval.Points)),
+ }
+
+ // Format and sort the output of each metric type, so it gets displayed in a
+ // deterministic order.
+ for name, points := range interval.Points {
+ summary.Points = append(summary.Points, PointValue{name, points})
+ }
+ sort.Slice(summary.Points, func(i, j int) bool {
+ return summary.Points[i].Name < summary.Points[j].Name
+ })
+
+ for hash, value := range interval.Gauges {
+ value.Hash = hash
+ value.DisplayLabels = make(map[string]string)
+ for _, label := range value.Labels {
+ value.DisplayLabels[label.Name] = label.Value
+ }
+ value.Labels = nil
+
+ summary.Gauges = append(summary.Gauges, value)
+ }
+ sort.Slice(summary.Gauges, func(i, j int) bool {
+ return summary.Gauges[i].Hash < summary.Gauges[j].Hash
+ })
+
+ summary.Counters = formatSamples(interval.Counters)
+ summary.Samples = formatSamples(interval.Samples)
+
+ return summary, nil
+}
+
+func formatSamples(source map[string]SampledValue) []SampledValue {
+ output := make([]SampledValue, 0, len(source))
+ for hash, sample := range source {
+ displayLabels := make(map[string]string)
+ for _, label := range sample.Labels {
+ displayLabels[label.Name] = label.Value
+ }
+
+ output = append(output, SampledValue{
+ Name: sample.Name,
+ Hash: hash,
+ AggregateSample: sample.AggregateSample,
+ Mean: sample.AggregateSample.Mean(),
+ Stddev: sample.AggregateSample.Stddev(),
+ DisplayLabels: displayLabels,
+ })
+ }
+ sort.Slice(output, func(i, j int) bool {
+ return output[i].Hash < output[j].Hash
+ })
+
+ return output
+}
diff --git a/vendor/github.com/armon/go-metrics/inmem_signal.go b/vendor/github.com/armon/go-metrics/inmem_signal.go
index 95d08ee..0937f4a 100644
--- a/vendor/github.com/armon/go-metrics/inmem_signal.go
+++ b/vendor/github.com/armon/go-metrics/inmem_signal.go
@@ -6,6 +6,7 @@
"io"
"os"
"os/signal"
+ "strings"
"sync"
"syscall"
)
@@ -75,22 +76,25 @@
data := i.inm.Data()
// Skip the last period which is still being aggregated
- for i := 0; i < len(data)-1; i++ {
- intv := data[i]
+ for j := 0; j < len(data)-1; j++ {
+ intv := data[j]
intv.RLock()
- for name, val := range intv.Gauges {
- fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val)
+ for _, val := range intv.Gauges {
+ name := i.flattenLabels(val.Name, val.Labels)
+ fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
}
for name, vals := range intv.Points {
for _, val := range vals {
fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val)
}
}
- for name, agg := range intv.Counters {
- fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg)
+ for _, agg := range intv.Counters {
+ name := i.flattenLabels(agg.Name, agg.Labels)
+ fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
}
- for name, agg := range intv.Samples {
- fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg)
+ for _, agg := range intv.Samples {
+ name := i.flattenLabels(agg.Name, agg.Labels)
+ fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
}
intv.RUnlock()
}
@@ -98,3 +102,16 @@
// Write out the bytes
i.w.Write(buf.Bytes())
}
+
+// Flattens the key for formatting along with its labels, removes spaces
+func (i *InmemSignal) flattenLabels(name string, labels []Label) string {
+ buf := bytes.NewBufferString(name)
+ replacer := strings.NewReplacer(" ", "_", ":", "_")
+
+ for _, label := range labels {
+ replacer.WriteString(buf, ".")
+ replacer.WriteString(buf, label.Value)
+ }
+
+ return buf.String()
+}
diff --git a/vendor/github.com/armon/go-metrics/metrics.go b/vendor/github.com/armon/go-metrics/metrics.go
old mode 100755
new mode 100644
index b818e41..cf9def7
--- a/vendor/github.com/armon/go-metrics/metrics.go
+++ b/vendor/github.com/armon/go-metrics/metrics.go
@@ -2,20 +2,44 @@
import (
"runtime"
+ "strings"
"time"
+
+ "github.com/hashicorp/go-immutable-radix"
)
+type Label struct {
+ Name string
+ Value string
+}
+
func (m *Metrics) SetGauge(key []string, val float32) {
- if m.HostName != "" && m.EnableHostname {
- key = insert(0, m.HostName, key)
+ m.SetGaugeWithLabels(key, val, nil)
+}
+
+func (m *Metrics) SetGaugeWithLabels(key []string, val float32, labels []Label) {
+ if m.HostName != "" {
+ if m.EnableHostnameLabel {
+ labels = append(labels, Label{"host", m.HostName})
+ } else if m.EnableHostname {
+ key = insert(0, m.HostName, key)
+ }
}
if m.EnableTypePrefix {
key = insert(0, "gauge", key)
}
if m.ServiceName != "" {
- key = insert(0, m.ServiceName, key)
+ if m.EnableServiceLabel {
+ labels = append(labels, Label{"service", m.ServiceName})
+ } else {
+ key = insert(0, m.ServiceName, key)
+ }
}
- m.sink.SetGauge(key, val)
+ allowed, labelsFiltered := m.allowMetric(key, labels)
+ if !allowed {
+ return
+ }
+ m.sink.SetGaugeWithLabels(key, val, labelsFiltered)
}
func (m *Metrics) EmitKey(key []string, val float32) {
@@ -25,40 +49,179 @@
if m.ServiceName != "" {
key = insert(0, m.ServiceName, key)
}
+ allowed, _ := m.allowMetric(key, nil)
+ if !allowed {
+ return
+ }
m.sink.EmitKey(key, val)
}
func (m *Metrics) IncrCounter(key []string, val float32) {
+ m.IncrCounterWithLabels(key, val, nil)
+}
+
+func (m *Metrics) IncrCounterWithLabels(key []string, val float32, labels []Label) {
+ if m.HostName != "" && m.EnableHostnameLabel {
+ labels = append(labels, Label{"host", m.HostName})
+ }
if m.EnableTypePrefix {
key = insert(0, "counter", key)
}
if m.ServiceName != "" {
- key = insert(0, m.ServiceName, key)
+ if m.EnableServiceLabel {
+ labels = append(labels, Label{"service", m.ServiceName})
+ } else {
+ key = insert(0, m.ServiceName, key)
+ }
}
- m.sink.IncrCounter(key, val)
+ allowed, labelsFiltered := m.allowMetric(key, labels)
+ if !allowed {
+ return
+ }
+ m.sink.IncrCounterWithLabels(key, val, labelsFiltered)
}
func (m *Metrics) AddSample(key []string, val float32) {
+ m.AddSampleWithLabels(key, val, nil)
+}
+
+func (m *Metrics) AddSampleWithLabels(key []string, val float32, labels []Label) {
+ if m.HostName != "" && m.EnableHostnameLabel {
+ labels = append(labels, Label{"host", m.HostName})
+ }
if m.EnableTypePrefix {
key = insert(0, "sample", key)
}
if m.ServiceName != "" {
- key = insert(0, m.ServiceName, key)
+ if m.EnableServiceLabel {
+ labels = append(labels, Label{"service", m.ServiceName})
+ } else {
+ key = insert(0, m.ServiceName, key)
+ }
}
- m.sink.AddSample(key, val)
+ allowed, labelsFiltered := m.allowMetric(key, labels)
+ if !allowed {
+ return
+ }
+ m.sink.AddSampleWithLabels(key, val, labelsFiltered)
}
func (m *Metrics) MeasureSince(key []string, start time.Time) {
+ m.MeasureSinceWithLabels(key, start, nil)
+}
+
+func (m *Metrics) MeasureSinceWithLabels(key []string, start time.Time, labels []Label) {
+ if m.HostName != "" && m.EnableHostnameLabel {
+ labels = append(labels, Label{"host", m.HostName})
+ }
if m.EnableTypePrefix {
key = insert(0, "timer", key)
}
if m.ServiceName != "" {
- key = insert(0, m.ServiceName, key)
+ if m.EnableServiceLabel {
+ labels = append(labels, Label{"service", m.ServiceName})
+ } else {
+ key = insert(0, m.ServiceName, key)
+ }
+ }
+ allowed, labelsFiltered := m.allowMetric(key, labels)
+ if !allowed {
+ return
}
now := time.Now()
elapsed := now.Sub(start)
msec := float32(elapsed.Nanoseconds()) / float32(m.TimerGranularity)
- m.sink.AddSample(key, msec)
+ m.sink.AddSampleWithLabels(key, msec, labelsFiltered)
+}
+
+// UpdateFilter overwrites the existing filter with the given rules.
+func (m *Metrics) UpdateFilter(allow, block []string) {
+ m.UpdateFilterAndLabels(allow, block, m.AllowedLabels, m.BlockedLabels)
+}
+
+// UpdateFilterAndLabels overwrites the existing filter with the given rules.
+func (m *Metrics) UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) {
+ m.filterLock.Lock()
+ defer m.filterLock.Unlock()
+
+ m.AllowedPrefixes = allow
+ m.BlockedPrefixes = block
+
+ if allowedLabels == nil {
+ // Having a white list means we take only elements from it
+ m.allowedLabels = nil
+ } else {
+ m.allowedLabels = make(map[string]bool)
+ for _, v := range allowedLabels {
+ m.allowedLabels[v] = true
+ }
+ }
+ m.blockedLabels = make(map[string]bool)
+ for _, v := range blockedLabels {
+ m.blockedLabels[v] = true
+ }
+ m.AllowedLabels = allowedLabels
+ m.BlockedLabels = blockedLabels
+
+ m.filter = iradix.New()
+ for _, prefix := range m.AllowedPrefixes {
+ m.filter, _, _ = m.filter.Insert([]byte(prefix), true)
+ }
+ for _, prefix := range m.BlockedPrefixes {
+ m.filter, _, _ = m.filter.Insert([]byte(prefix), false)
+ }
+}
+
+// labelIsAllowed return true if a should be included in metric
+// the caller should lock m.filterLock while calling this method
+func (m *Metrics) labelIsAllowed(label *Label) bool {
+ labelName := (*label).Name
+ if m.blockedLabels != nil {
+ _, ok := m.blockedLabels[labelName]
+ if ok {
+ // If present, let's remove this label
+ return false
+ }
+ }
+ if m.allowedLabels != nil {
+ _, ok := m.allowedLabels[labelName]
+ return ok
+ }
+ // Allow by default
+ return true
+}
+
+// filterLabels return only allowed labels
+// the caller should lock m.filterLock while calling this method
+func (m *Metrics) filterLabels(labels []Label) []Label {
+ if labels == nil {
+ return nil
+ }
+ toReturn := labels[:0]
+ for _, label := range labels {
+ if m.labelIsAllowed(&label) {
+ toReturn = append(toReturn, label)
+ }
+ }
+ return toReturn
+}
+
+// Returns whether the metric should be allowed based on configured prefix filters
+// Also return the applicable labels
+func (m *Metrics) allowMetric(key []string, labels []Label) (bool, []Label) {
+ m.filterLock.RLock()
+ defer m.filterLock.RUnlock()
+
+ if m.filter == nil || m.filter.Len() == 0 {
+ return m.Config.FilterDefault, m.filterLabels(labels)
+ }
+
+ _, allowed, ok := m.filter.Root().LongestPrefix([]byte(strings.Join(key, ".")))
+ if !ok {
+ return m.Config.FilterDefault, m.filterLabels(labels)
+ }
+
+ return allowed.(bool), m.filterLabels(labels)
}
// Periodically collects runtime stats to publish
diff --git a/vendor/github.com/armon/go-metrics/sink.go b/vendor/github.com/armon/go-metrics/sink.go
old mode 100755
new mode 100644
index 0c240c2..0b7d6e4
--- a/vendor/github.com/armon/go-metrics/sink.go
+++ b/vendor/github.com/armon/go-metrics/sink.go
@@ -1,35 +1,50 @@
package metrics
+import (
+ "fmt"
+ "net/url"
+)
+
// The MetricSink interface is used to transmit metrics information
// to an external system
type MetricSink interface {
// A Gauge should retain the last value it is set to
SetGauge(key []string, val float32)
+ SetGaugeWithLabels(key []string, val float32, labels []Label)
// Should emit a Key/Value pair for each call
EmitKey(key []string, val float32)
// Counters should accumulate values
IncrCounter(key []string, val float32)
+ IncrCounterWithLabels(key []string, val float32, labels []Label)
// Samples are for timing information, where quantiles are used
AddSample(key []string, val float32)
+ AddSampleWithLabels(key []string, val float32, labels []Label)
}
// BlackholeSink is used to just blackhole messages
type BlackholeSink struct{}
-func (*BlackholeSink) SetGauge(key []string, val float32) {}
-func (*BlackholeSink) EmitKey(key []string, val float32) {}
-func (*BlackholeSink) IncrCounter(key []string, val float32) {}
-func (*BlackholeSink) AddSample(key []string, val float32) {}
+func (*BlackholeSink) SetGauge(key []string, val float32) {}
+func (*BlackholeSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {}
+func (*BlackholeSink) EmitKey(key []string, val float32) {}
+func (*BlackholeSink) IncrCounter(key []string, val float32) {}
+func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {}
+func (*BlackholeSink) AddSample(key []string, val float32) {}
+func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {}
// FanoutSink is used to sink to fanout values to multiple sinks
type FanoutSink []MetricSink
func (fh FanoutSink) SetGauge(key []string, val float32) {
+ fh.SetGaugeWithLabels(key, val, nil)
+}
+
+func (fh FanoutSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
for _, s := range fh {
- s.SetGauge(key, val)
+ s.SetGaugeWithLabels(key, val, labels)
}
}
@@ -40,13 +55,61 @@
}
func (fh FanoutSink) IncrCounter(key []string, val float32) {
+ fh.IncrCounterWithLabels(key, val, nil)
+}
+
+func (fh FanoutSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
for _, s := range fh {
- s.IncrCounter(key, val)
+ s.IncrCounterWithLabels(key, val, labels)
}
}
func (fh FanoutSink) AddSample(key []string, val float32) {
+ fh.AddSampleWithLabels(key, val, nil)
+}
+
+func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
for _, s := range fh {
- s.AddSample(key, val)
+ s.AddSampleWithLabels(key, val, labels)
}
}
+
+// sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided
+// by each sink type
+type sinkURLFactoryFunc func(*url.URL) (MetricSink, error)
+
+// sinkRegistry supports the generic NewMetricSink function by mapping URL
+// schemes to metric sink factory functions
+var sinkRegistry = map[string]sinkURLFactoryFunc{
+ "statsd": NewStatsdSinkFromURL,
+ "statsite": NewStatsiteSinkFromURL,
+ "inmem": NewInmemSinkFromURL,
+}
+
+// NewMetricSinkFromURL allows a generic URL input to configure any of the
+// supported sinks. The scheme of the URL identifies the type of the sink, the
+// and query parameters are used to set options.
+//
+// "statsd://" - Initializes a StatsdSink. The host and port are passed through
+// as the "addr" of the sink
+//
+// "statsite://" - Initializes a StatsiteSink. The host and port become the
+// "addr" of the sink
+//
+// "inmem://" - Initializes an InmemSink. The host and port are ignored. The
+// "interval" and "duration" query parameters must be specified with valid
+// durations, see NewInmemSink for details.
+func NewMetricSinkFromURL(urlStr string) (MetricSink, error) {
+ u, err := url.Parse(urlStr)
+ if err != nil {
+ return nil, err
+ }
+
+ sinkURLFactoryFunc := sinkRegistry[u.Scheme]
+ if sinkURLFactoryFunc == nil {
+ return nil, fmt.Errorf(
+ "cannot create metric sink, unrecognized sink name: %q", u.Scheme)
+ }
+
+ return sinkURLFactoryFunc(u)
+}
diff --git a/vendor/github.com/armon/go-metrics/start.go b/vendor/github.com/armon/go-metrics/start.go
old mode 100755
new mode 100644
index 44113f1..32a28c4
--- a/vendor/github.com/armon/go-metrics/start.go
+++ b/vendor/github.com/armon/go-metrics/start.go
@@ -2,34 +2,50 @@
import (
"os"
+ "sync"
+ "sync/atomic"
"time"
+
+ "github.com/hashicorp/go-immutable-radix"
)
// Config is used to configure metrics settings
type Config struct {
- ServiceName string // Prefixed with keys to seperate services
+ ServiceName string // Prefixed with keys to separate services
HostName string // Hostname to use. If not provided and EnableHostname, it will be os.Hostname
EnableHostname bool // Enable prefixing gauge values with hostname
+ EnableHostnameLabel bool // Enable adding hostname to labels
+ EnableServiceLabel bool // Enable adding service to labels
EnableRuntimeMetrics bool // Enables profiling of runtime metrics (GC, Goroutines, Memory)
EnableTypePrefix bool // Prefixes key with a type ("counter", "gauge", "timer")
TimerGranularity time.Duration // Granularity of timers.
ProfileInterval time.Duration // Interval to profile runtime metrics
+
+ AllowedPrefixes []string // A list of metric prefixes to allow, with '.' as the separator
+ BlockedPrefixes []string // A list of metric prefixes to block, with '.' as the separator
+ AllowedLabels []string // A list of metric labels to allow, with '.' as the separator
+ BlockedLabels []string // A list of metric labels to block, with '.' as the separator
+ FilterDefault bool // Whether to allow metrics by default
}
// Metrics represents an instance of a metrics sink that can
// be used to emit
type Metrics struct {
Config
- lastNumGC uint32
- sink MetricSink
+ lastNumGC uint32
+ sink MetricSink
+ filter *iradix.Tree
+ allowedLabels map[string]bool
+ blockedLabels map[string]bool
+ filterLock sync.RWMutex // Lock filters and allowedLabels/blockedLabels access
}
// Shared global metrics instance
-var globalMetrics *Metrics
+var globalMetrics atomic.Value // *Metrics
func init() {
// Initialize to a blackhole sink to avoid errors
- globalMetrics = &Metrics{sink: &BlackholeSink{}}
+ globalMetrics.Store(&Metrics{sink: &BlackholeSink{}})
}
// DefaultConfig provides a sane default configuration
@@ -42,6 +58,7 @@
EnableTypePrefix: false, // Disable type prefix
TimerGranularity: time.Millisecond, // Timers are in milliseconds
ProfileInterval: time.Second, // Poll runtime every second
+ FilterDefault: true, // Don't filter metrics by default
}
// Try to get the hostname
@@ -55,6 +72,7 @@
met := &Metrics{}
met.Config = *conf
met.sink = sink
+ met.UpdateFilterAndLabels(conf.AllowedPrefixes, conf.BlockedPrefixes, conf.AllowedLabels, conf.BlockedLabels)
// Start the runtime collector
if conf.EnableRuntimeMetrics {
@@ -68,28 +86,56 @@
func NewGlobal(conf *Config, sink MetricSink) (*Metrics, error) {
metrics, err := New(conf, sink)
if err == nil {
- globalMetrics = metrics
+ globalMetrics.Store(metrics)
}
return metrics, err
}
// Proxy all the methods to the globalMetrics instance
func SetGauge(key []string, val float32) {
- globalMetrics.SetGauge(key, val)
+ globalMetrics.Load().(*Metrics).SetGauge(key, val)
+}
+
+func SetGaugeWithLabels(key []string, val float32, labels []Label) {
+ globalMetrics.Load().(*Metrics).SetGaugeWithLabels(key, val, labels)
}
func EmitKey(key []string, val float32) {
- globalMetrics.EmitKey(key, val)
+ globalMetrics.Load().(*Metrics).EmitKey(key, val)
}
func IncrCounter(key []string, val float32) {
- globalMetrics.IncrCounter(key, val)
+ globalMetrics.Load().(*Metrics).IncrCounter(key, val)
+}
+
+func IncrCounterWithLabels(key []string, val float32, labels []Label) {
+ globalMetrics.Load().(*Metrics).IncrCounterWithLabels(key, val, labels)
}
func AddSample(key []string, val float32) {
- globalMetrics.AddSample(key, val)
+ globalMetrics.Load().(*Metrics).AddSample(key, val)
+}
+
+func AddSampleWithLabels(key []string, val float32, labels []Label) {
+ globalMetrics.Load().(*Metrics).AddSampleWithLabels(key, val, labels)
}
func MeasureSince(key []string, start time.Time) {
- globalMetrics.MeasureSince(key, start)
+ globalMetrics.Load().(*Metrics).MeasureSince(key, start)
+}
+
+func MeasureSinceWithLabels(key []string, start time.Time, labels []Label) {
+ globalMetrics.Load().(*Metrics).MeasureSinceWithLabels(key, start, labels)
+}
+
+func UpdateFilter(allow, block []string) {
+ globalMetrics.Load().(*Metrics).UpdateFilter(allow, block)
+}
+
+// UpdateFilterAndLabels set allow/block prefixes of metrics while allowedLabels
+// and blockedLabels - when not nil - allow filtering of labels in order to
+// block/allow globally labels (especially useful when having large number of
+// values for a given label). See README.md for more information about usage.
+func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) {
+ globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels)
}
diff --git a/vendor/github.com/armon/go-metrics/statsd.go b/vendor/github.com/armon/go-metrics/statsd.go
index 65a5021..1bfffce 100644
--- a/vendor/github.com/armon/go-metrics/statsd.go
+++ b/vendor/github.com/armon/go-metrics/statsd.go
@@ -5,6 +5,7 @@
"fmt"
"log"
"net"
+ "net/url"
"strings"
"time"
)
@@ -23,6 +24,12 @@
metricQueue chan string
}
+// NewStatsdSinkFromURL creates an StatsdSink from a URL. It is used
+// (and tested) from NewMetricSinkFromURL.
+func NewStatsdSinkFromURL(u *url.URL) (MetricSink, error) {
+ return NewStatsdSink(u.Host)
+}
+
// NewStatsdSink is used to create a new StatsdSink
func NewStatsdSink(addr string) (*StatsdSink, error) {
s := &StatsdSink{
@@ -43,6 +50,11 @@
s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
}
+func (s *StatsdSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
+ flatKey := s.flattenKeyLabels(key, labels)
+ s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
+}
+
func (s *StatsdSink) EmitKey(key []string, val float32) {
flatKey := s.flattenKey(key)
s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
@@ -53,11 +65,21 @@
s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
}
+func (s *StatsdSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
+ flatKey := s.flattenKeyLabels(key, labels)
+ s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
+}
+
func (s *StatsdSink) AddSample(key []string, val float32) {
flatKey := s.flattenKey(key)
s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
}
+func (s *StatsdSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
+ flatKey := s.flattenKeyLabels(key, labels)
+ s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
+}
+
// Flattens the key for formatting, removes spaces
func (s *StatsdSink) flattenKey(parts []string) string {
joined := strings.Join(parts, ".")
@@ -73,6 +95,14 @@
}, joined)
}
+// Flattens the key along with labels for formatting, removes spaces
+func (s *StatsdSink) flattenKeyLabels(parts []string, labels []Label) string {
+ for _, label := range labels {
+ parts = append(parts, label.Value)
+ }
+ return s.flattenKey(parts)
+}
+
// Does a non-blocking push to the metrics queue
func (s *StatsdSink) pushMetric(m string) {
select {
diff --git a/vendor/github.com/armon/go-metrics/statsite.go b/vendor/github.com/armon/go-metrics/statsite.go
old mode 100755
new mode 100644
index 6873013..6c0d284
--- a/vendor/github.com/armon/go-metrics/statsite.go
+++ b/vendor/github.com/armon/go-metrics/statsite.go
@@ -5,6 +5,7 @@
"fmt"
"log"
"net"
+ "net/url"
"strings"
"time"
)
@@ -16,6 +17,12 @@
flushInterval = 100 * time.Millisecond
)
+// NewStatsiteSinkFromURL creates an StatsiteSink from a URL. It is used
+// (and tested) from NewMetricSinkFromURL.
+func NewStatsiteSinkFromURL(u *url.URL) (MetricSink, error) {
+ return NewStatsiteSink(u.Host)
+}
+
// StatsiteSink provides a MetricSink that can be used with a
// statsite metrics server
type StatsiteSink struct {
@@ -43,6 +50,11 @@
s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
}
+func (s *StatsiteSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
+ flatKey := s.flattenKeyLabels(key, labels)
+ s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
+}
+
func (s *StatsiteSink) EmitKey(key []string, val float32) {
flatKey := s.flattenKey(key)
s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
@@ -53,11 +65,21 @@
s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
}
+func (s *StatsiteSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
+ flatKey := s.flattenKeyLabels(key, labels)
+ s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
+}
+
func (s *StatsiteSink) AddSample(key []string, val float32) {
flatKey := s.flattenKey(key)
s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
}
+func (s *StatsiteSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
+ flatKey := s.flattenKeyLabels(key, labels)
+ s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
+}
+
// Flattens the key for formatting, removes spaces
func (s *StatsiteSink) flattenKey(parts []string) string {
joined := strings.Join(parts, ".")
@@ -73,6 +95,14 @@
}, joined)
}
+// Flattens the key along with labels for formatting, removes spaces
+func (s *StatsiteSink) flattenKeyLabels(parts []string, labels []Label) string {
+ for _, label := range labels {
+ parts = append(parts, label.Value)
+ }
+ return s.flattenKey(parts)
+}
+
// Does a non-blocking push to the metrics queue
func (s *StatsiteSink) pushMetric(m string) {
select {