VOL-1497 : Add more control to kv/memory access
- Added kv locking mechanism (etcd only)
- (watch) control path access whenever possible
- (watch) use a transaction for updates and merge with memory
- cleaned up vendoring
- misc changes to fix exceptions found along the way
Amendments:
- Copyright header got removed in auto-generated file
- Changed default locking to false for KV list operation
- Updated backend api to allow the passing of locking parameter
Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/vendor/github.com/armon/go-metrics/inmem.go b/vendor/github.com/armon/go-metrics/inmem.go
index 83fb6bb..4e2d6a7 100644
--- a/vendor/github.com/armon/go-metrics/inmem.go
+++ b/vendor/github.com/armon/go-metrics/inmem.go
@@ -1,8 +1,10 @@
package metrics
import (
+ "bytes"
"fmt"
"math"
+ "net/url"
"strings"
"sync"
"time"
@@ -25,7 +27,7 @@
// intervals is a slice of the retained intervals
intervals []*IntervalMetrics
intervalLock sync.RWMutex
-
+
rateDenom float64
}
@@ -38,7 +40,7 @@
Interval time.Time
// Gauges maps the key to the last set value
- Gauges map[string]float32
+ Gauges map[string]GaugeValue
// Points maps the string to the list of emitted values
// from EmitKey
@@ -46,21 +48,21 @@
// Counters maps the string key to a sum of the counter
// values
- Counters map[string]*AggregateSample
+ Counters map[string]SampledValue
// Samples maps the key to an AggregateSample,
// which has the rolled up view of a sample
- Samples map[string]*AggregateSample
+ Samples map[string]SampledValue
}
// NewIntervalMetrics creates a new IntervalMetrics for a given interval
func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
return &IntervalMetrics{
Interval: intv,
- Gauges: make(map[string]float32),
+ Gauges: make(map[string]GaugeValue),
Points: make(map[string][]float32),
- Counters: make(map[string]*AggregateSample),
- Samples: make(map[string]*AggregateSample),
+ Counters: make(map[string]SampledValue),
+ Samples: make(map[string]SampledValue),
}
}
@@ -68,12 +70,12 @@
// about a sample
type AggregateSample struct {
Count int // The count of emitted pairs
- Rate float64 // The count of emitted pairs per time unit (usually 1 second)
+ Rate float64 // The values rate per time unit (usually 1 second)
Sum float64 // The sum of values
- SumSq float64 // The sum of squared values
+ SumSq float64 `json:"-"` // The sum of squared values
Min float64 // Minimum value
Max float64 // Maximum value
- LastUpdated time.Time // When value was last updated
+ LastUpdated time.Time `json:"-"` // When value was last updated
}
// Computes a Stddev of the values
@@ -105,7 +107,7 @@
if v > a.Max || a.Count == 1 {
a.Max = v
}
- a.Rate = float64(a.Count)/rateDenom
+ a.Rate = float64(a.Sum) / rateDenom
a.LastUpdated = time.Now()
}
@@ -120,6 +122,24 @@
}
}
+// NewInmemSinkFromURL creates an InmemSink from a URL. It is used
+// (and tested) from NewMetricSinkFromURL.
+func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) {
+ params := u.Query()
+
+ interval, err := time.ParseDuration(params.Get("interval"))
+ if err != nil {
+ return nil, fmt.Errorf("Bad 'interval' param: %s", err)
+ }
+
+ retain, err := time.ParseDuration(params.Get("retain"))
+ if err != nil {
+ return nil, fmt.Errorf("Bad 'retain' param: %s", err)
+ }
+
+ return NewInmemSink(interval, retain), nil
+}
+
// NewInmemSink is used to construct a new in-memory sink.
// Uses an aggregation interval and maximum retention period.
func NewInmemSink(interval, retain time.Duration) *InmemSink {
@@ -128,19 +148,23 @@
interval: interval,
retain: retain,
maxIntervals: int(retain / interval),
- rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
+ rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
}
i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
return i
}
func (i *InmemSink) SetGauge(key []string, val float32) {
- k := i.flattenKey(key)
+ i.SetGaugeWithLabels(key, val, nil)
+}
+
+func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
+ k, name := i.flattenKeyLabels(key, labels)
intv := i.getInterval()
intv.Lock()
defer intv.Unlock()
- intv.Gauges[k] = val
+ intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels}
}
func (i *InmemSink) EmitKey(key []string, val float32) {
@@ -154,30 +178,46 @@
}
func (i *InmemSink) IncrCounter(key []string, val float32) {
- k := i.flattenKey(key)
+ i.IncrCounterWithLabels(key, val, nil)
+}
+
+func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
+ k, name := i.flattenKeyLabels(key, labels)
intv := i.getInterval()
intv.Lock()
defer intv.Unlock()
- agg := intv.Counters[k]
- if agg == nil {
- agg = &AggregateSample{}
+ agg, ok := intv.Counters[k]
+ if !ok {
+ agg = SampledValue{
+ Name: name,
+ AggregateSample: &AggregateSample{},
+ Labels: labels,
+ }
intv.Counters[k] = agg
}
agg.Ingest(float64(val), i.rateDenom)
}
func (i *InmemSink) AddSample(key []string, val float32) {
- k := i.flattenKey(key)
+ i.AddSampleWithLabels(key, val, nil)
+}
+
+func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
+ k, name := i.flattenKeyLabels(key, labels)
intv := i.getInterval()
intv.Lock()
defer intv.Unlock()
- agg := intv.Samples[k]
- if agg == nil {
- agg = &AggregateSample{}
+ agg, ok := intv.Samples[k]
+ if !ok {
+ agg = SampledValue{
+ Name: name,
+ AggregateSample: &AggregateSample{},
+ Labels: labels,
+ }
intv.Samples[k] = agg
}
agg.Ingest(float64(val), i.rateDenom)
@@ -192,8 +232,37 @@
i.intervalLock.RLock()
defer i.intervalLock.RUnlock()
- intervals := make([]*IntervalMetrics, len(i.intervals))
- copy(intervals, i.intervals)
+ n := len(i.intervals)
+ intervals := make([]*IntervalMetrics, n)
+
+ copy(intervals[:n-1], i.intervals[:n-1])
+ current := i.intervals[n-1]
+
+ // make its own copy for current interval
+ intervals[n-1] = &IntervalMetrics{}
+ copyCurrent := intervals[n-1]
+ current.RLock()
+ *copyCurrent = *current
+
+ copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges))
+ for k, v := range current.Gauges {
+ copyCurrent.Gauges[k] = v
+ }
+ // saved values will be not change, just copy its link
+ copyCurrent.Points = make(map[string][]float32, len(current.Points))
+ for k, v := range current.Points {
+ copyCurrent.Points[k] = v
+ }
+ copyCurrent.Counters = make(map[string]SampledValue, len(current.Counters))
+ for k, v := range current.Counters {
+ copyCurrent.Counters[k] = v
+ }
+ copyCurrent.Samples = make(map[string]SampledValue, len(current.Samples))
+ for k, v := range current.Samples {
+ copyCurrent.Samples[k] = v
+ }
+ current.RUnlock()
+
return intervals
}
@@ -242,6 +311,38 @@
// Flattens the key for formatting, removes spaces
func (i *InmemSink) flattenKey(parts []string) string {
- joined := strings.Join(parts, ".")
- return strings.Replace(joined, " ", "_", -1)
+ buf := &bytes.Buffer{}
+ replacer := strings.NewReplacer(" ", "_")
+
+ if len(parts) > 0 {
+ replacer.WriteString(buf, parts[0])
+ }
+ for _, part := range parts[1:] {
+ replacer.WriteString(buf, ".")
+ replacer.WriteString(buf, part)
+ }
+
+ return buf.String()
+}
+
+// Flattens the key for formatting along with its labels, removes spaces
+func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) {
+ buf := &bytes.Buffer{}
+ replacer := strings.NewReplacer(" ", "_")
+
+ if len(parts) > 0 {
+ replacer.WriteString(buf, parts[0])
+ }
+ for _, part := range parts[1:] {
+ replacer.WriteString(buf, ".")
+ replacer.WriteString(buf, part)
+ }
+
+ key := buf.String()
+
+ for _, label := range labels {
+ replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value))
+ }
+
+ return buf.String(), key
}