blob: 93b0e0ad83cea5207cf34e39ac90bfac75d253a3 [file] [log] [blame]
Takahiro Suzuki241c10e2020-12-17 20:17:57 +09001package metrics
2
3import (
4 "bytes"
5 "fmt"
6 "math"
7 "net/url"
8 "strings"
9 "sync"
10 "time"
11)
12
13// InmemSink provides a MetricSink that does in-memory aggregation
14// without sending metrics over a network. It can be embedded within
15// an application to provide profiling information.
16type InmemSink struct {
17 // How long is each aggregation interval
18 interval time.Duration
19
20 // Retain controls how many metrics interval we keep
21 retain time.Duration
22
23 // maxIntervals is the maximum length of intervals.
24 // It is retain / interval.
25 maxIntervals int
26
27 // intervals is a slice of the retained intervals
28 intervals []*IntervalMetrics
29 intervalLock sync.RWMutex
30
31 rateDenom float64
32}
33
34// IntervalMetrics stores the aggregated metrics
35// for a specific interval
36type IntervalMetrics struct {
37 sync.RWMutex
38
39 // The start time of the interval
40 Interval time.Time
41
42 // Gauges maps the key to the last set value
43 Gauges map[string]GaugeValue
44
45 // Points maps the string to the list of emitted values
46 // from EmitKey
47 Points map[string][]float32
48
49 // Counters maps the string key to a sum of the counter
50 // values
51 Counters map[string]SampledValue
52
53 // Samples maps the key to an AggregateSample,
54 // which has the rolled up view of a sample
55 Samples map[string]SampledValue
56}
57
58// NewIntervalMetrics creates a new IntervalMetrics for a given interval
59func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
60 return &IntervalMetrics{
61 Interval: intv,
62 Gauges: make(map[string]GaugeValue),
63 Points: make(map[string][]float32),
64 Counters: make(map[string]SampledValue),
65 Samples: make(map[string]SampledValue),
66 }
67}
68
// AggregateSample holds the rolled-up statistics of a series of values
// fed in through Ingest.
type AggregateSample struct {
	Count       int       // The count of emitted pairs
	Rate        float64   // The values rate per time unit (usually 1 second)
	Sum         float64   // The sum of values
	SumSq       float64   `json:"-"` // The sum of squared values
	Min         float64   // Minimum value
	Max         float64   // Maximum value
	LastUpdated time.Time `json:"-"` // When value was last updated
}

// Stddev returns the sample standard deviation of the ingested values,
// computed from the running totals as
// sqrt((Count*SumSq - Sum^2) / (Count*(Count-1))).
// It returns 0 when fewer than two values have been ingested.
func (a *AggregateSample) Stddev() float64 {
	count := float64(a.Count)

	// Multiply in floating point: the integer product Count*(Count-1)
	// can overflow on platforms where int is 32 bits wide.
	div := count * (count - 1)
	if div == 0 {
		return 0
	}

	num := count*a.SumSq - math.Pow(a.Sum, 2)
	if num < 0 {
		// For (nearly) identical values, rounding error can push the
		// numerator slightly below zero; the true variance is never
		// negative, so report 0 instead of letting Sqrt return NaN.
		return 0
	}
	return math.Sqrt(num / div)
}

// Mean returns the arithmetic mean of the ingested values, or 0 when
// nothing has been ingested yet.
func (a *AggregateSample) Mean() float64 {
	if a.Count == 0 {
		return 0
	}
	return a.Sum / float64(a.Count)
}

// Ingest folds the value v into the aggregate: it updates the count,
// sum, sum of squares, minimum and maximum, recomputes Rate as
// Sum / rateDenom, and stamps LastUpdated with the current time.
func (a *AggregateSample) Ingest(v float64, rateDenom float64) {
	a.Count++
	a.Sum += v
	a.SumSq += v * v

	// The first value initializes both bounds; the zero values of Min
	// and Max must not take part in the comparison.
	if a.Count == 1 || v < a.Min {
		a.Min = v
	}
	if a.Count == 1 || v > a.Max {
		a.Max = v
	}

	a.Rate = a.Sum / rateDenom
	a.LastUpdated = time.Now()
}

// String renders the aggregate for display. An empty aggregate prints
// only its count; an aggregate whose standard deviation is zero prints
// count, sum and last update time; anything else additionally prints
// min, mean, max and standard deviation.
func (a *AggregateSample) String() string {
	switch {
	case a.Count == 0:
		return "Count: 0"
	case a.Stddev() == 0:
		return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated)
	default:
		return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s",
			a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum, a.LastUpdated)
	}
}
124
125// NewInmemSinkFromURL creates an InmemSink from a URL. It is used
126// (and tested) from NewMetricSinkFromURL.
127func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) {
128 params := u.Query()
129
130 interval, err := time.ParseDuration(params.Get("interval"))
131 if err != nil {
132 return nil, fmt.Errorf("Bad 'interval' param: %s", err)
133 }
134
135 retain, err := time.ParseDuration(params.Get("retain"))
136 if err != nil {
137 return nil, fmt.Errorf("Bad 'retain' param: %s", err)
138 }
139
140 return NewInmemSink(interval, retain), nil
141}
142
143// NewInmemSink is used to construct a new in-memory sink.
144// Uses an aggregation interval and maximum retention period.
145func NewInmemSink(interval, retain time.Duration) *InmemSink {
146 rateTimeUnit := time.Second
147 i := &InmemSink{
148 interval: interval,
149 retain: retain,
150 maxIntervals: int(retain / interval),
151 rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
152 }
153 i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
154 return i
155}
156
157func (i *InmemSink) SetGauge(key []string, val float32) {
158 i.SetGaugeWithLabels(key, val, nil)
159}
160
161func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
162 k, name := i.flattenKeyLabels(key, labels)
163 intv := i.getInterval()
164
165 intv.Lock()
166 defer intv.Unlock()
167 intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels}
168}
169
170func (i *InmemSink) EmitKey(key []string, val float32) {
171 k := i.flattenKey(key)
172 intv := i.getInterval()
173
174 intv.Lock()
175 defer intv.Unlock()
176 vals := intv.Points[k]
177 intv.Points[k] = append(vals, val)
178}
179
180func (i *InmemSink) IncrCounter(key []string, val float32) {
181 i.IncrCounterWithLabels(key, val, nil)
182}
183
184func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
185 k, name := i.flattenKeyLabels(key, labels)
186 intv := i.getInterval()
187
188 intv.Lock()
189 defer intv.Unlock()
190
191 agg, ok := intv.Counters[k]
192 if !ok {
193 agg = SampledValue{
194 Name: name,
195 AggregateSample: &AggregateSample{},
196 Labels: labels,
197 }
198 intv.Counters[k] = agg
199 }
200 agg.Ingest(float64(val), i.rateDenom)
201}
202
203func (i *InmemSink) AddSample(key []string, val float32) {
204 i.AddSampleWithLabels(key, val, nil)
205}
206
207func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
208 k, name := i.flattenKeyLabels(key, labels)
209 intv := i.getInterval()
210
211 intv.Lock()
212 defer intv.Unlock()
213
214 agg, ok := intv.Samples[k]
215 if !ok {
216 agg = SampledValue{
217 Name: name,
218 AggregateSample: &AggregateSample{},
219 Labels: labels,
220 }
221 intv.Samples[k] = agg
222 }
223 agg.Ingest(float64(val), i.rateDenom)
224}
225
226// Data is used to retrieve all the aggregated metrics
227// Intervals may be in use, and a read lock should be acquired
228func (i *InmemSink) Data() []*IntervalMetrics {
229 // Get the current interval, forces creation
230 i.getInterval()
231
232 i.intervalLock.RLock()
233 defer i.intervalLock.RUnlock()
234
235 n := len(i.intervals)
236 intervals := make([]*IntervalMetrics, n)
237
238 copy(intervals[:n-1], i.intervals[:n-1])
239 current := i.intervals[n-1]
240
241 // make its own copy for current interval
242 intervals[n-1] = &IntervalMetrics{}
243 copyCurrent := intervals[n-1]
244 current.RLock()
245 *copyCurrent = *current
246
247 copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges))
248 for k, v := range current.Gauges {
249 copyCurrent.Gauges[k] = v
250 }
251 // saved values will be not change, just copy its link
252 copyCurrent.Points = make(map[string][]float32, len(current.Points))
253 for k, v := range current.Points {
254 copyCurrent.Points[k] = v
255 }
256 copyCurrent.Counters = make(map[string]SampledValue, len(current.Counters))
257 for k, v := range current.Counters {
258 copyCurrent.Counters[k] = v.deepCopy()
259 }
260 copyCurrent.Samples = make(map[string]SampledValue, len(current.Samples))
261 for k, v := range current.Samples {
262 copyCurrent.Samples[k] = v.deepCopy()
263 }
264 current.RUnlock()
265
266 return intervals
267}
268
269func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics {
270 i.intervalLock.RLock()
271 defer i.intervalLock.RUnlock()
272
273 n := len(i.intervals)
274 if n > 0 && i.intervals[n-1].Interval == intv {
275 return i.intervals[n-1]
276 }
277 return nil
278}
279
280func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics {
281 i.intervalLock.Lock()
282 defer i.intervalLock.Unlock()
283
284 // Check for an existing interval
285 n := len(i.intervals)
286 if n > 0 && i.intervals[n-1].Interval == intv {
287 return i.intervals[n-1]
288 }
289
290 // Add the current interval
291 current := NewIntervalMetrics(intv)
292 i.intervals = append(i.intervals, current)
293 n++
294
295 // Truncate the intervals if they are too long
296 if n >= i.maxIntervals {
297 copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
298 i.intervals = i.intervals[:i.maxIntervals]
299 }
300 return current
301}
302
303// getInterval returns the current interval to write to
304func (i *InmemSink) getInterval() *IntervalMetrics {
305 intv := time.Now().Truncate(i.interval)
306 if m := i.getExistingInterval(intv); m != nil {
307 return m
308 }
309 return i.createInterval(intv)
310}
311
312// Flattens the key for formatting, removes spaces
313func (i *InmemSink) flattenKey(parts []string) string {
314 buf := &bytes.Buffer{}
315 replacer := strings.NewReplacer(" ", "_")
316
317 if len(parts) > 0 {
318 replacer.WriteString(buf, parts[0])
319 }
320 for _, part := range parts[1:] {
321 replacer.WriteString(buf, ".")
322 replacer.WriteString(buf, part)
323 }
324
325 return buf.String()
326}
327
328// Flattens the key for formatting along with its labels, removes spaces
329func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) {
330 buf := &bytes.Buffer{}
331 replacer := strings.NewReplacer(" ", "_")
332
333 if len(parts) > 0 {
334 replacer.WriteString(buf, parts[0])
335 }
336 for _, part := range parts[1:] {
337 replacer.WriteString(buf, ".")
338 replacer.WriteString(buf, part)
339 }
340
341 key := buf.String()
342
343 for _, label := range labels {
344 replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value))
345 }
346
347 return buf.String(), key
348}