| package metrics |
| |
| import ( |
| "bytes" |
| "fmt" |
| "math" |
| "net/url" |
| "strings" |
| "sync" |
| "time" |
| ) |
| |
| // InmemSink provides a MetricSink that does in-memory aggregation |
| // without sending metrics over a network. It can be embedded within |
| // an application to provide profiling information. |
| type InmemSink struct { |
| // How long is each aggregation interval |
| interval time.Duration |
| |
| // Retain controls how many metrics interval we keep |
| retain time.Duration |
| |
| // maxIntervals is the maximum length of intervals. |
| // It is retain / interval. |
| maxIntervals int |
| |
| // intervals is a slice of the retained intervals |
| intervals []*IntervalMetrics |
| intervalLock sync.RWMutex |
| |
| rateDenom float64 |
| } |
| |
| // IntervalMetrics stores the aggregated metrics |
| // for a specific interval |
| type IntervalMetrics struct { |
| sync.RWMutex |
| |
| // The start time of the interval |
| Interval time.Time |
| |
| // Gauges maps the key to the last set value |
| Gauges map[string]GaugeValue |
| |
| // Points maps the string to the list of emitted values |
| // from EmitKey |
| Points map[string][]float32 |
| |
| // Counters maps the string key to a sum of the counter |
| // values |
| Counters map[string]SampledValue |
| |
| // Samples maps the key to an AggregateSample, |
| // which has the rolled up view of a sample |
| Samples map[string]SampledValue |
| } |
| |
| // NewIntervalMetrics creates a new IntervalMetrics for a given interval |
| func NewIntervalMetrics(intv time.Time) *IntervalMetrics { |
| return &IntervalMetrics{ |
| Interval: intv, |
| Gauges: make(map[string]GaugeValue), |
| Points: make(map[string][]float32), |
| Counters: make(map[string]SampledValue), |
| Samples: make(map[string]SampledValue), |
| } |
| } |
| |
| // AggregateSample is used to hold aggregate metrics |
| // about a sample |
| type AggregateSample struct { |
| Count int // The count of emitted pairs |
| Rate float64 // The values rate per time unit (usually 1 second) |
| Sum float64 // The sum of values |
| SumSq float64 `json:"-"` // The sum of squared values |
| Min float64 // Minimum value |
| Max float64 // Maximum value |
| LastUpdated time.Time `json:"-"` // When value was last updated |
| } |
| |
| // Computes a Stddev of the values |
| func (a *AggregateSample) Stddev() float64 { |
| num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2) |
| div := float64(a.Count * (a.Count - 1)) |
| if div == 0 { |
| return 0 |
| } |
| return math.Sqrt(num / div) |
| } |
| |
| // Computes a mean of the values |
| func (a *AggregateSample) Mean() float64 { |
| if a.Count == 0 { |
| return 0 |
| } |
| return a.Sum / float64(a.Count) |
| } |
| |
| // Ingest is used to update a sample |
| func (a *AggregateSample) Ingest(v float64, rateDenom float64) { |
| a.Count++ |
| a.Sum += v |
| a.SumSq += (v * v) |
| if v < a.Min || a.Count == 1 { |
| a.Min = v |
| } |
| if v > a.Max || a.Count == 1 { |
| a.Max = v |
| } |
| a.Rate = float64(a.Sum) / rateDenom |
| a.LastUpdated = time.Now() |
| } |
| |
| func (a *AggregateSample) String() string { |
| if a.Count == 0 { |
| return "Count: 0" |
| } else if a.Stddev() == 0 { |
| return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated) |
| } else { |
| return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s", |
| a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum, a.LastUpdated) |
| } |
| } |
| |
| // NewInmemSinkFromURL creates an InmemSink from a URL. It is used |
| // (and tested) from NewMetricSinkFromURL. |
| func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) { |
| params := u.Query() |
| |
| interval, err := time.ParseDuration(params.Get("interval")) |
| if err != nil { |
| return nil, fmt.Errorf("Bad 'interval' param: %s", err) |
| } |
| |
| retain, err := time.ParseDuration(params.Get("retain")) |
| if err != nil { |
| return nil, fmt.Errorf("Bad 'retain' param: %s", err) |
| } |
| |
| return NewInmemSink(interval, retain), nil |
| } |
| |
| // NewInmemSink is used to construct a new in-memory sink. |
| // Uses an aggregation interval and maximum retention period. |
| func NewInmemSink(interval, retain time.Duration) *InmemSink { |
| rateTimeUnit := time.Second |
| i := &InmemSink{ |
| interval: interval, |
| retain: retain, |
| maxIntervals: int(retain / interval), |
| rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()), |
| } |
| i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals) |
| return i |
| } |
| |
| func (i *InmemSink) SetGauge(key []string, val float32) { |
| i.SetGaugeWithLabels(key, val, nil) |
| } |
| |
| func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) { |
| k, name := i.flattenKeyLabels(key, labels) |
| intv := i.getInterval() |
| |
| intv.Lock() |
| defer intv.Unlock() |
| intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels} |
| } |
| |
| func (i *InmemSink) EmitKey(key []string, val float32) { |
| k := i.flattenKey(key) |
| intv := i.getInterval() |
| |
| intv.Lock() |
| defer intv.Unlock() |
| vals := intv.Points[k] |
| intv.Points[k] = append(vals, val) |
| } |
| |
| func (i *InmemSink) IncrCounter(key []string, val float32) { |
| i.IncrCounterWithLabels(key, val, nil) |
| } |
| |
| func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) { |
| k, name := i.flattenKeyLabels(key, labels) |
| intv := i.getInterval() |
| |
| intv.Lock() |
| defer intv.Unlock() |
| |
| agg, ok := intv.Counters[k] |
| if !ok { |
| agg = SampledValue{ |
| Name: name, |
| AggregateSample: &AggregateSample{}, |
| Labels: labels, |
| } |
| intv.Counters[k] = agg |
| } |
| agg.Ingest(float64(val), i.rateDenom) |
| } |
| |
| func (i *InmemSink) AddSample(key []string, val float32) { |
| i.AddSampleWithLabels(key, val, nil) |
| } |
| |
| func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) { |
| k, name := i.flattenKeyLabels(key, labels) |
| intv := i.getInterval() |
| |
| intv.Lock() |
| defer intv.Unlock() |
| |
| agg, ok := intv.Samples[k] |
| if !ok { |
| agg = SampledValue{ |
| Name: name, |
| AggregateSample: &AggregateSample{}, |
| Labels: labels, |
| } |
| intv.Samples[k] = agg |
| } |
| agg.Ingest(float64(val), i.rateDenom) |
| } |
| |
| // Data is used to retrieve all the aggregated metrics |
| // Intervals may be in use, and a read lock should be acquired |
| func (i *InmemSink) Data() []*IntervalMetrics { |
| // Get the current interval, forces creation |
| i.getInterval() |
| |
| i.intervalLock.RLock() |
| defer i.intervalLock.RUnlock() |
| |
| n := len(i.intervals) |
| intervals := make([]*IntervalMetrics, n) |
| |
| copy(intervals[:n-1], i.intervals[:n-1]) |
| current := i.intervals[n-1] |
| |
| // make its own copy for current interval |
| intervals[n-1] = &IntervalMetrics{} |
| copyCurrent := intervals[n-1] |
| current.RLock() |
| *copyCurrent = *current |
| |
| copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges)) |
| for k, v := range current.Gauges { |
| copyCurrent.Gauges[k] = v |
| } |
| // saved values will be not change, just copy its link |
| copyCurrent.Points = make(map[string][]float32, len(current.Points)) |
| for k, v := range current.Points { |
| copyCurrent.Points[k] = v |
| } |
| copyCurrent.Counters = make(map[string]SampledValue, len(current.Counters)) |
| for k, v := range current.Counters { |
| copyCurrent.Counters[k] = v |
| } |
| copyCurrent.Samples = make(map[string]SampledValue, len(current.Samples)) |
| for k, v := range current.Samples { |
| copyCurrent.Samples[k] = v |
| } |
| current.RUnlock() |
| |
| return intervals |
| } |
| |
| func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics { |
| i.intervalLock.RLock() |
| defer i.intervalLock.RUnlock() |
| |
| n := len(i.intervals) |
| if n > 0 && i.intervals[n-1].Interval == intv { |
| return i.intervals[n-1] |
| } |
| return nil |
| } |
| |
| func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics { |
| i.intervalLock.Lock() |
| defer i.intervalLock.Unlock() |
| |
| // Check for an existing interval |
| n := len(i.intervals) |
| if n > 0 && i.intervals[n-1].Interval == intv { |
| return i.intervals[n-1] |
| } |
| |
| // Add the current interval |
| current := NewIntervalMetrics(intv) |
| i.intervals = append(i.intervals, current) |
| n++ |
| |
| // Truncate the intervals if they are too long |
| if n >= i.maxIntervals { |
| copy(i.intervals[0:], i.intervals[n-i.maxIntervals:]) |
| i.intervals = i.intervals[:i.maxIntervals] |
| } |
| return current |
| } |
| |
| // getInterval returns the current interval to write to |
| func (i *InmemSink) getInterval() *IntervalMetrics { |
| intv := time.Now().Truncate(i.interval) |
| if m := i.getExistingInterval(intv); m != nil { |
| return m |
| } |
| return i.createInterval(intv) |
| } |
| |
| // Flattens the key for formatting, removes spaces |
| func (i *InmemSink) flattenKey(parts []string) string { |
| buf := &bytes.Buffer{} |
| replacer := strings.NewReplacer(" ", "_") |
| |
| if len(parts) > 0 { |
| replacer.WriteString(buf, parts[0]) |
| } |
| for _, part := range parts[1:] { |
| replacer.WriteString(buf, ".") |
| replacer.WriteString(buf, part) |
| } |
| |
| return buf.String() |
| } |
| |
| // Flattens the key for formatting along with its labels, removes spaces |
| func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) { |
| buf := &bytes.Buffer{} |
| replacer := strings.NewReplacer(" ", "_") |
| |
| if len(parts) > 0 { |
| replacer.WriteString(buf, parts[0]) |
| } |
| for _, part := range parts[1:] { |
| replacer.WriteString(buf, ".") |
| replacer.WriteString(buf, part) |
| } |
| |
| key := buf.String() |
| |
| for _, label := range labels { |
| replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value)) |
| } |
| |
| return buf.String(), key |
| } |