blob: 83fb6bba094d9563c3bef13124ef1b813f480f6d [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package metrics
2
3import (
4 "fmt"
5 "math"
6 "strings"
7 "sync"
8 "time"
9)
10
11// InmemSink provides a MetricSink that does in-memory aggregation
12// without sending metrics over a network. It can be embedded within
13// an application to provide profiling information.
14type InmemSink struct {
15 // How long is each aggregation interval
16 interval time.Duration
17
18 // Retain controls how many metrics interval we keep
19 retain time.Duration
20
21 // maxIntervals is the maximum length of intervals.
22 // It is retain / interval.
23 maxIntervals int
24
25 // intervals is a slice of the retained intervals
26 intervals []*IntervalMetrics
27 intervalLock sync.RWMutex
28
29 rateDenom float64
30}
31
32// IntervalMetrics stores the aggregated metrics
33// for a specific interval
34type IntervalMetrics struct {
35 sync.RWMutex
36
37 // The start time of the interval
38 Interval time.Time
39
40 // Gauges maps the key to the last set value
41 Gauges map[string]float32
42
43 // Points maps the string to the list of emitted values
44 // from EmitKey
45 Points map[string][]float32
46
47 // Counters maps the string key to a sum of the counter
48 // values
49 Counters map[string]*AggregateSample
50
51 // Samples maps the key to an AggregateSample,
52 // which has the rolled up view of a sample
53 Samples map[string]*AggregateSample
54}
55
56// NewIntervalMetrics creates a new IntervalMetrics for a given interval
57func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
58 return &IntervalMetrics{
59 Interval: intv,
60 Gauges: make(map[string]float32),
61 Points: make(map[string][]float32),
62 Counters: make(map[string]*AggregateSample),
63 Samples: make(map[string]*AggregateSample),
64 }
65}
66
// AggregateSample holds running aggregate statistics about a stream of
// sampled values. The zero value is an empty sample ready for Ingest.
type AggregateSample struct {
	Count       int       // The count of emitted pairs
	Rate        float64   // The count of emitted pairs per time unit (usually 1 second)
	Sum         float64   // The sum of values
	SumSq       float64   // The sum of squared values
	Min         float64   // Minimum value
	Max         float64   // Maximum value
	LastUpdated time.Time // When value was last updated
}

// Stddev computes the sample standard deviation of the ingested values
// from Count, Sum and SumSq. It returns 0 when fewer than two values have
// been ingested.
func (a *AggregateSample) Stddev() float64 {
	if a.Count < 2 {
		return 0
	}
	n := float64(a.Count)
	num := n*a.SumSq - a.Sum*a.Sum
	// Mathematically num is never negative, but floating-point cancellation
	// on (nearly) identical values can push it slightly below zero, which
	// would make math.Sqrt return NaN. Treat that as zero spread.
	if num < 0 {
		num = 0
	}
	// The divisor is computed in float64 so that a very large Count cannot
	// overflow the integer product Count * (Count - 1).
	return math.Sqrt(num / (n * (n - 1)))
}

// Mean computes the arithmetic mean of the ingested values, or 0 when
// nothing has been ingested yet.
func (a *AggregateSample) Mean() float64 {
	if a.Count == 0 {
		return 0
	}
	return a.Sum / float64(a.Count)
}

// Ingest folds the value v into the sample: it bumps Count, accumulates
// Sum and SumSq, widens Min/Max, recomputes Rate as Count / rateDenom and
// stamps LastUpdated with the current time.
func (a *AggregateSample) Ingest(v float64, rateDenom float64) {
	a.Count++
	a.Sum += v
	a.SumSq += v * v
	// The first value initialises both extremes; the zero values of
	// Min/Max must not take part in the comparison.
	if a.Count == 1 || v < a.Min {
		a.Min = v
	}
	if a.Count == 1 || v > a.Max {
		a.Max = v
	}
	a.Rate = float64(a.Count) / rateDenom
	a.LastUpdated = time.Now()
}

// String renders the sample for display. An empty sample prints only its
// count, a sample without any spread (Stddev of 0) prints count, sum and
// update time, and every other sample prints the full set of statistics.
func (a *AggregateSample) String() string {
	switch {
	case a.Count == 0:
		return "Count: 0"
	case a.Stddev() == 0:
		return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated)
	default:
		return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s",
			a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum, a.LastUpdated)
	}
}
122
123// NewInmemSink is used to construct a new in-memory sink.
124// Uses an aggregation interval and maximum retention period.
125func NewInmemSink(interval, retain time.Duration) *InmemSink {
126 rateTimeUnit := time.Second
127 i := &InmemSink{
128 interval: interval,
129 retain: retain,
130 maxIntervals: int(retain / interval),
131 rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
132 }
133 i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
134 return i
135}
136
137func (i *InmemSink) SetGauge(key []string, val float32) {
138 k := i.flattenKey(key)
139 intv := i.getInterval()
140
141 intv.Lock()
142 defer intv.Unlock()
143 intv.Gauges[k] = val
144}
145
146func (i *InmemSink) EmitKey(key []string, val float32) {
147 k := i.flattenKey(key)
148 intv := i.getInterval()
149
150 intv.Lock()
151 defer intv.Unlock()
152 vals := intv.Points[k]
153 intv.Points[k] = append(vals, val)
154}
155
156func (i *InmemSink) IncrCounter(key []string, val float32) {
157 k := i.flattenKey(key)
158 intv := i.getInterval()
159
160 intv.Lock()
161 defer intv.Unlock()
162
163 agg := intv.Counters[k]
164 if agg == nil {
165 agg = &AggregateSample{}
166 intv.Counters[k] = agg
167 }
168 agg.Ingest(float64(val), i.rateDenom)
169}
170
171func (i *InmemSink) AddSample(key []string, val float32) {
172 k := i.flattenKey(key)
173 intv := i.getInterval()
174
175 intv.Lock()
176 defer intv.Unlock()
177
178 agg := intv.Samples[k]
179 if agg == nil {
180 agg = &AggregateSample{}
181 intv.Samples[k] = agg
182 }
183 agg.Ingest(float64(val), i.rateDenom)
184}
185
186// Data is used to retrieve all the aggregated metrics
187// Intervals may be in use, and a read lock should be acquired
188func (i *InmemSink) Data() []*IntervalMetrics {
189 // Get the current interval, forces creation
190 i.getInterval()
191
192 i.intervalLock.RLock()
193 defer i.intervalLock.RUnlock()
194
195 intervals := make([]*IntervalMetrics, len(i.intervals))
196 copy(intervals, i.intervals)
197 return intervals
198}
199
200func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics {
201 i.intervalLock.RLock()
202 defer i.intervalLock.RUnlock()
203
204 n := len(i.intervals)
205 if n > 0 && i.intervals[n-1].Interval == intv {
206 return i.intervals[n-1]
207 }
208 return nil
209}
210
211func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics {
212 i.intervalLock.Lock()
213 defer i.intervalLock.Unlock()
214
215 // Check for an existing interval
216 n := len(i.intervals)
217 if n > 0 && i.intervals[n-1].Interval == intv {
218 return i.intervals[n-1]
219 }
220
221 // Add the current interval
222 current := NewIntervalMetrics(intv)
223 i.intervals = append(i.intervals, current)
224 n++
225
226 // Truncate the intervals if they are too long
227 if n >= i.maxIntervals {
228 copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
229 i.intervals = i.intervals[:i.maxIntervals]
230 }
231 return current
232}
233
234// getInterval returns the current interval to write to
235func (i *InmemSink) getInterval() *IntervalMetrics {
236 intv := time.Now().Truncate(i.interval)
237 if m := i.getExistingInterval(intv); m != nil {
238 return m
239 }
240 return i.createInterval(intv)
241}
242
243// Flattens the key for formatting, removes spaces
244func (i *InmemSink) flattenKey(parts []string) string {
245 joined := strings.Join(parts, ".")
246 return strings.Replace(joined, " ", "_", -1)
247}