blob: a8183dd7e21f85aa05f8e030cbe1804daf912b2b [file] [log] [blame]
Pragya Arya324337e2020-02-20 14:35:08 +05301package metrics
2
3import (
4 "math"
5 "sync"
6 "sync/atomic"
7)
8
// EWMA is an exponentially-weighted moving average. Implementations do not
// keep time themselves; they rely on an outside source of clock ticks that
// calls Tick.
type EWMA interface {
	// Update reports a number of new events to the average.
	Update(int64)
	// Tick advances the average by one clock tick.
	Tick()
	// Rate returns the current value of the moving average.
	Rate() float64
	// Snapshot returns an EWMA capturing the current state.
	Snapshot() EWMA
}
17
18// NewEWMA constructs a new EWMA with the given alpha.
19func NewEWMA(alpha float64) EWMA {
20 if UseNilMetrics {
21 return NilEWMA{}
22 }
23 return &StandardEWMA{alpha: alpha}
24}
25
26// NewEWMA1 constructs a new EWMA for a one-minute moving average.
27func NewEWMA1() EWMA {
28 return NewEWMA(1 - math.Exp(-5.0/60.0/1))
29}
30
31// NewEWMA5 constructs a new EWMA for a five-minute moving average.
32func NewEWMA5() EWMA {
33 return NewEWMA(1 - math.Exp(-5.0/60.0/5))
34}
35
36// NewEWMA15 constructs a new EWMA for a fifteen-minute moving average.
37func NewEWMA15() EWMA {
38 return NewEWMA(1 - math.Exp(-5.0/60.0/15))
39}
40
// EWMASnapshot is a read-only copy of another EWMA. The value itself is the
// captured rate.
type EWMASnapshot float64

// Rate returns the rate of events per second at the time the snapshot was
// taken.
func (a EWMASnapshot) Rate() float64 {
	rate := float64(a)
	return rate
}
47
48// Snapshot returns the snapshot.
49func (a EWMASnapshot) Snapshot() EWMA { return a }
50
51// Tick panics.
52func (EWMASnapshot) Tick() {
53 panic("Tick called on an EWMASnapshot")
54}
55
56// Update panics.
57func (EWMASnapshot) Update(int64) {
58 panic("Update called on an EWMASnapshot")
59}
60
// NilEWMA is an EWMA implementation whose methods all do nothing.
type NilEWMA struct{}

// Rate is a no-op; it always reports a rate of zero.
func (NilEWMA) Rate() float64 {
	return 0
}
66
67// Snapshot is a no-op.
68func (NilEWMA) Snapshot() EWMA { return NilEWMA{} }
69
70// Tick is a no-op.
71func (NilEWMA) Tick() {}
72
73// Update is a no-op.
74func (NilEWMA) Update(n int64) {}
75
// StandardEWMA is the standard implementation of an EWMA. It accumulates
// events that have not yet been counted and folds them into the moving
// average on each tick, using the sync/atomic package to manage the
// uncounted events.
type StandardEWMA struct {
	// uncounted is the number of events reported since the last tick.
	// /!\ It must remain the first member to ensure 64-bit alignment.
	uncounted int64

	// alpha is the smoothing factor applied on every tick.
	alpha float64

	// rate holds the float64 bit pattern of the current moving average; it
	// is read and written with sync/atomic.
	rate uint64

	// init is 1 once the first tick has seeded rate, and 0 before that.
	init uint32

	// mutex serialises the first-tick initialisation.
	mutex sync.Mutex
}
86
87// Rate returns the moving average rate of events per second.
88func (a *StandardEWMA) Rate() float64 {
89 currentRate := math.Float64frombits(atomic.LoadUint64(&a.rate)) * float64(1e9)
90 return currentRate
91}
92
93// Snapshot returns a read-only copy of the EWMA.
94func (a *StandardEWMA) Snapshot() EWMA {
95 return EWMASnapshot(a.Rate())
96}
97
98// Tick ticks the clock to update the moving average. It assumes it is called
99// every five seconds.
100func (a *StandardEWMA) Tick() {
101 // Optimization to avoid mutex locking in the hot-path.
102 if atomic.LoadUint32(&a.init) == 1 {
103 a.updateRate(a.fetchInstantRate())
104 } else {
105 // Slow-path: this is only needed on the first Tick() and preserves transactional updating
106 // of init and rate in the else block. The first conditional is needed below because
107 // a different thread could have set a.init = 1 between the time of the first atomic load and when
108 // the lock was acquired.
109 a.mutex.Lock()
110 if atomic.LoadUint32(&a.init) == 1 {
111 // The fetchInstantRate() uses atomic loading, which is unecessary in this critical section
112 // but again, this section is only invoked on the first successful Tick() operation.
113 a.updateRate(a.fetchInstantRate())
114 } else {
115 atomic.StoreUint32(&a.init, 1)
116 atomic.StoreUint64(&a.rate, math.Float64bits(a.fetchInstantRate()))
117 }
118 a.mutex.Unlock()
119 }
120}
121
122func (a *StandardEWMA) fetchInstantRate() float64 {
123 count := atomic.LoadInt64(&a.uncounted)
124 atomic.AddInt64(&a.uncounted, -count)
125 instantRate := float64(count) / float64(5e9)
126 return instantRate
127}
128
129func (a *StandardEWMA) updateRate(instantRate float64) {
130 currentRate := math.Float64frombits(atomic.LoadUint64(&a.rate))
131 currentRate += a.alpha * (instantRate - currentRate)
132 atomic.StoreUint64(&a.rate, math.Float64bits(currentRate))
133}
134
135// Update adds n uncounted events.
136func (a *StandardEWMA) Update(n int64) {
137 atomic.AddInt64(&a.uncounted, n)
138}