package metrics

import (
	"bytes"
	"fmt"
	"math"
	"net/url"
	"strings"
	"sync"
	"time"
)

// InmemSink provides a MetricSink that does in-memory aggregation
// without sending metrics over a network. It can be embedded within
// an application to provide profiling information.
type InmemSink struct {
	// interval is the length of each aggregation interval
	interval time.Duration

	// retain controls how many intervals of metrics we keep
	retain time.Duration

	// maxIntervals is the maximum number of retained intervals.
	// It is retain / interval.
	maxIntervals int

	// intervals is a slice of the retained intervals
	intervals    []*IntervalMetrics
	intervalLock sync.RWMutex

	rateDenom float64
}

// IntervalMetrics stores the aggregated metrics
// for a specific interval
type IntervalMetrics struct {
	sync.RWMutex

	// Interval is the start time of the interval
	Interval time.Time

	// Gauges maps the key to the last set value
	Gauges map[string]GaugeValue

	// Points maps the key to the list of values emitted
	// via EmitKey
	Points map[string][]float32

	// Counters maps the key to the sum of the counter
	// values
	Counters map[string]SampledValue

	// Samples maps the key to an AggregateSample,
	// which has the rolled up view of a sample
	Samples map[string]SampledValue
}

// NewIntervalMetrics creates a new IntervalMetrics for a given interval
func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
	return &IntervalMetrics{
		Interval: intv,
		Gauges:   make(map[string]GaugeValue),
		Points:   make(map[string][]float32),
		Counters: make(map[string]SampledValue),
		Samples:  make(map[string]SampledValue),
	}
}

// AggregateSample is used to hold aggregate metrics
// about a sample
type AggregateSample struct {
	Count       int       // The count of emitted pairs
	Rate        float64   // The values' rate per time unit (usually per second)
	Sum         float64   // The sum of values
	SumSq       float64   `json:"-"` // The sum of squared values
	Min         float64   // Minimum value
	Max         float64   // Maximum value
	LastUpdated time.Time `json:"-"` // When the value was last updated
}

// Stddev computes the sample standard deviation of the values
func (a *AggregateSample) Stddev() float64 {
	num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2)
	div := float64(a.Count * (a.Count - 1))
	if div == 0 {
		return 0
	}
	return math.Sqrt(num / div)
}

// Mean computes the mean of the values
func (a *AggregateSample) Mean() float64 {
	if a.Count == 0 {
		return 0
	}
	return a.Sum / float64(a.Count)
}

// Ingest is used to update a sample with a new value
func (a *AggregateSample) Ingest(v float64, rateDenom float64) {
	a.Count++
	a.Sum += v
	a.SumSq += (v * v)
	if v < a.Min || a.Count == 1 {
		a.Min = v
	}
	if v > a.Max || a.Count == 1 {
		a.Max = v
	}
	a.Rate = a.Sum / rateDenom
	a.LastUpdated = time.Now()
}

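// String returns a human-readable summary of the aggregate sample.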
func (a *AggregateSample) String() string {
	if a.Count == 0 {
		return "Count: 0"
	} else if a.Stddev() == 0 {
		return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated)
	} else {
		return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s",
			a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum, a.LastUpdated)
	}
}

// NewInmemSinkFromURL creates an InmemSink from a URL. It is used
// (and tested) by NewMetricSinkFromURL.
func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) {
	params := u.Query()

	interval, err := time.ParseDuration(params.Get("interval"))
	if err != nil {
		return nil, fmt.Errorf("Bad 'interval' param: %s", err)
	}

	retain, err := time.ParseDuration(params.Get("retain"))
	if err != nil {
		return nil, fmt.Errorf("Bad 'retain' param: %s", err)
	}

	return NewInmemSink(interval, retain), nil
}

// NewInmemSink is used to construct a new in-memory sink.
// It takes an aggregation interval and a maximum retention period.
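//
// A minimal usage sketch (the metric names are illustrative only):
//
//	sink := NewInmemSink(10*time.Second, time.Minute)
//	sink.SetGauge([]string{"foo", "bar"}, 42)
//	sink.IncrCounter([]string{"requests"}, 1)
//	sink.AddSample([]string{"latency"}, 13.7)
//	intervals := sink.Data() // snapshot of the retained intervals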
func NewInmemSink(interval, retain time.Duration) *InmemSink {
	rateTimeUnit := time.Second
	i := &InmemSink{
		interval:     interval,
		retain:       retain,
		maxIntervals: int(retain / interval),
		rateDenom:    float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
	}
	i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
	return i
}

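// SetGauge stores the value for the gauge key, keeping only the most recent
// value within the current interval.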
func (i *InmemSink) SetGauge(key []string, val float32) {
	i.SetGaugeWithLabels(key, val, nil)
}

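// SetGaugeWithLabels stores the value for the gauge key and label combination,
// keeping only the most recent value within the current interval.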
func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
	k, name := i.flattenKeyLabels(key, labels)
	intv := i.getInterval()

	intv.Lock()
	defer intv.Unlock()
	intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels}
}

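// EmitKey appends the value to the list of points emitted for the key within
// the current interval.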
func (i *InmemSink) EmitKey(key []string, val float32) {
	k := i.flattenKey(key)
	intv := i.getInterval()

	intv.Lock()
	defer intv.Unlock()
	vals := intv.Points[k]
	intv.Points[k] = append(vals, val)
}

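// IncrCounter adds the value to the counter identified by the key within the
// current interval.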
func (i *InmemSink) IncrCounter(key []string, val float32) {
	i.IncrCounterWithLabels(key, val, nil)
}

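// IncrCounterWithLabels adds the value to the counter identified by the key
// and label combination within the current interval.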
func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
	k, name := i.flattenKeyLabels(key, labels)
	intv := i.getInterval()

	intv.Lock()
	defer intv.Unlock()

	agg, ok := intv.Counters[k]
	if !ok {
		agg = SampledValue{
			Name:            name,
			AggregateSample: &AggregateSample{},
			Labels:          labels,
		}
		intv.Counters[k] = agg
	}
	agg.Ingest(float64(val), i.rateDenom)
}

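// AddSample folds the value into the aggregate sample identified by the key
// within the current interval.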
func (i *InmemSink) AddSample(key []string, val float32) {
	i.AddSampleWithLabels(key, val, nil)
}

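// AddSampleWithLabels folds the value into the aggregate sample identified by
// the key and label combination within the current interval.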
func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
	k, name := i.flattenKeyLabels(key, labels)
	intv := i.getInterval()

	intv.Lock()
	defer intv.Unlock()

	agg, ok := intv.Samples[k]
	if !ok {
		agg = SampledValue{
			Name:            name,
			AggregateSample: &AggregateSample{},
			Labels:          labels,
		}
		intv.Samples[k] = agg
	}
	agg.Ingest(float64(val), i.rateDenom)
}

// Data is used to retrieve all the aggregated metrics.
// The current interval may still be in use, so it is copied under its read lock.
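//
// For example, to print the counters in the most recent interval (a sketch,
// where sink is an *InmemSink; Data always returns at least one interval
// because it forces creation of the current one):
//
//	intervals := sink.Data()
//	latest := intervals[len(intervals)-1]
//	for name, c := range latest.Counters {
//		fmt.Println(name, c.AggregateSample)
//	}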
func (i *InmemSink) Data() []*IntervalMetrics {
	// Get the current interval, forces creation
	i.getInterval()

	i.intervalLock.RLock()
	defer i.intervalLock.RUnlock()

	n := len(i.intervals)
	intervals := make([]*IntervalMetrics, n)

	copy(intervals[:n-1], i.intervals[:n-1])
	current := i.intervals[n-1]

	// Make a private copy of the current interval, since it may still be updated
	intervals[n-1] = &IntervalMetrics{}
	copyCurrent := intervals[n-1]
	current.RLock()
	*copyCurrent = *current

	copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges))
	for k, v := range current.Gauges {
		copyCurrent.Gauges[k] = v
	}
	// The saved point values will not change, so copying the slice headers is enough
	copyCurrent.Points = make(map[string][]float32, len(current.Points))
	for k, v := range current.Points {
		copyCurrent.Points[k] = v
	}
	copyCurrent.Counters = make(map[string]SampledValue, len(current.Counters))
	for k, v := range current.Counters {
		copyCurrent.Counters[k] = v.deepCopy()
	}
	copyCurrent.Samples = make(map[string]SampledValue, len(current.Samples))
	for k, v := range current.Samples {
		copyCurrent.Samples[k] = v.deepCopy()
	}
	current.RUnlock()

	return intervals
}

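// getExistingInterval returns the current interval if its start time matches
// intv, or nil if a new interval needs to be created.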
func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics {
	i.intervalLock.RLock()
	defer i.intervalLock.RUnlock()

	n := len(i.intervals)
	if n > 0 && i.intervals[n-1].Interval == intv {
		return i.intervals[n-1]
	}
	return nil
}

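// createInterval returns the existing IntervalMetrics for intv if present,
// otherwise it creates one and truncates the retained intervals if needed.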
func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics {
	i.intervalLock.Lock()
	defer i.intervalLock.Unlock()

	// Check for an existing interval
	n := len(i.intervals)
	if n > 0 && i.intervals[n-1].Interval == intv {
		return i.intervals[n-1]
	}

	// Add the current interval
	current := NewIntervalMetrics(intv)
	i.intervals = append(i.intervals, current)
	n++

	// Truncate the intervals if they are too long
	if n >= i.maxIntervals {
		copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
		i.intervals = i.intervals[:i.maxIntervals]
	}
	return current
}

// getInterval returns the current interval to write to
func (i *InmemSink) getInterval() *IntervalMetrics {
	intv := time.Now().Truncate(i.interval)
	if m := i.getExistingInterval(intv); m != nil {
		return m
	}
	return i.createInterval(intv)
}

// flattenKey flattens the key parts into a dot-separated string,
// replacing spaces with underscores
func (i *InmemSink) flattenKey(parts []string) string {
	buf := &bytes.Buffer{}
	replacer := strings.NewReplacer(" ", "_")

	if len(parts) > 0 {
		replacer.WriteString(buf, parts[0])
	}
	for _, part := range parts[1:] {
		replacer.WriteString(buf, ".")
		replacer.WriteString(buf, part)
	}

	return buf.String()
}

// flattenKeyLabels flattens the key parts along with the labels into a single
// string, replacing spaces with underscores. It returns both the labeled key
// and the plain flattened key (the name).
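// For example, parts {"api", "latency"} with a label {Name: "method", Value: "GET"}
// yield ("api.latency;method=GET", "api.latency").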
func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) {
	buf := &bytes.Buffer{}
	replacer := strings.NewReplacer(" ", "_")

	if len(parts) > 0 {
		replacer.WriteString(buf, parts[0])
	}
	for _, part := range parts[1:] {
		replacer.WriteString(buf, ".")
		replacer.WriteString(buf, part)
	}

	key := buf.String()

	for _, label := range labels {
		replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value))
	}

	return buf.String(), key
}