blob: 223669bcb2920d8127422d37a524e87af0a1136c [file] [log] [blame]
Pragya Arya324337e2020-02-20 14:35:08 +05301package metrics
2
3import (
4 "math"
5 "sync"
6 "sync/atomic"
7 "time"
8)
9
10// Meters count events to produce exponentially-weighted moving average rates
11// at one-, five-, and fifteen-minutes and a mean rate.
12type Meter interface {
13 Count() int64
14 Mark(int64)
15 Rate1() float64
16 Rate5() float64
17 Rate15() float64
18 RateMean() float64
19 Snapshot() Meter
20 Stop()
21}
22
23// GetOrRegisterMeter returns an existing Meter or constructs and registers a
24// new StandardMeter.
25// Be sure to unregister the meter from the registry once it is of no use to
26// allow for garbage collection.
27func GetOrRegisterMeter(name string, r Registry) Meter {
28 if nil == r {
29 r = DefaultRegistry
30 }
31 return r.GetOrRegister(name, NewMeter).(Meter)
32}
33
34// NewMeter constructs a new StandardMeter and launches a goroutine.
35// Be sure to call Stop() once the meter is of no use to allow for garbage collection.
36func NewMeter() Meter {
37 if UseNilMetrics {
38 return NilMeter{}
39 }
40 m := newStandardMeter()
41 arbiter.Lock()
42 defer arbiter.Unlock()
43 arbiter.meters[m] = struct{}{}
44 if !arbiter.started {
45 arbiter.started = true
46 go arbiter.tick()
47 }
48 return m
49}
50
51// NewMeter constructs and registers a new StandardMeter and launches a
52// goroutine.
53// Be sure to unregister the meter from the registry once it is of no use to
54// allow for garbage collection.
55func NewRegisteredMeter(name string, r Registry) Meter {
56 c := NewMeter()
57 if nil == r {
58 r = DefaultRegistry
59 }
60 r.Register(name, c)
61 return c
62}
63
64// MeterSnapshot is a read-only copy of another Meter.
65type MeterSnapshot struct {
66 count int64
67 rate1, rate5, rate15, rateMean uint64
68}
69
70// Count returns the count of events at the time the snapshot was taken.
71func (m *MeterSnapshot) Count() int64 { return m.count }
72
73// Mark panics.
74func (*MeterSnapshot) Mark(n int64) {
75 panic("Mark called on a MeterSnapshot")
76}
77
78// Rate1 returns the one-minute moving average rate of events per second at the
79// time the snapshot was taken.
80func (m *MeterSnapshot) Rate1() float64 { return math.Float64frombits(m.rate1) }
81
82// Rate5 returns the five-minute moving average rate of events per second at
83// the time the snapshot was taken.
84func (m *MeterSnapshot) Rate5() float64 { return math.Float64frombits(m.rate5) }
85
86// Rate15 returns the fifteen-minute moving average rate of events per second
87// at the time the snapshot was taken.
88func (m *MeterSnapshot) Rate15() float64 { return math.Float64frombits(m.rate15) }
89
90// RateMean returns the meter's mean rate of events per second at the time the
91// snapshot was taken.
92func (m *MeterSnapshot) RateMean() float64 { return math.Float64frombits(m.rateMean) }
93
94// Snapshot returns the snapshot.
95func (m *MeterSnapshot) Snapshot() Meter { return m }
96
97// Stop is a no-op.
98func (m *MeterSnapshot) Stop() {}
99
100// NilMeter is a no-op Meter.
101type NilMeter struct{}
102
103// Count is a no-op.
104func (NilMeter) Count() int64 { return 0 }
105
106// Mark is a no-op.
107func (NilMeter) Mark(n int64) {}
108
109// Rate1 is a no-op.
110func (NilMeter) Rate1() float64 { return 0.0 }
111
112// Rate5 is a no-op.
113func (NilMeter) Rate5() float64 { return 0.0 }
114
115// Rate15is a no-op.
116func (NilMeter) Rate15() float64 { return 0.0 }
117
118// RateMean is a no-op.
119func (NilMeter) RateMean() float64 { return 0.0 }
120
121// Snapshot is a no-op.
122func (NilMeter) Snapshot() Meter { return NilMeter{} }
123
124// Stop is a no-op.
125func (NilMeter) Stop() {}
126
127// StandardMeter is the standard implementation of a Meter.
128type StandardMeter struct {
129 snapshot *MeterSnapshot
130 a1, a5, a15 EWMA
131 startTime time.Time
132 stopped uint32
133}
134
135func newStandardMeter() *StandardMeter {
136 return &StandardMeter{
137 snapshot: &MeterSnapshot{},
138 a1: NewEWMA1(),
139 a5: NewEWMA5(),
140 a15: NewEWMA15(),
141 startTime: time.Now(),
142 }
143}
144
145// Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
146func (m *StandardMeter) Stop() {
147 if atomic.CompareAndSwapUint32(&m.stopped, 0, 1) {
148 arbiter.Lock()
149 delete(arbiter.meters, m)
150 arbiter.Unlock()
151 }
152}
153
154// Count returns the number of events recorded.
155func (m *StandardMeter) Count() int64 {
156 return atomic.LoadInt64(&m.snapshot.count)
157}
158
159// Mark records the occurance of n events.
160func (m *StandardMeter) Mark(n int64) {
161 if atomic.LoadUint32(&m.stopped) == 1 {
162 return
163 }
164
165 atomic.AddInt64(&m.snapshot.count, n)
166
167 m.a1.Update(n)
168 m.a5.Update(n)
169 m.a15.Update(n)
170 m.updateSnapshot()
171}
172
173// Rate1 returns the one-minute moving average rate of events per second.
174func (m *StandardMeter) Rate1() float64 {
175 return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate1))
176}
177
178// Rate5 returns the five-minute moving average rate of events per second.
179func (m *StandardMeter) Rate5() float64 {
180 return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate5))
181}
182
183// Rate15 returns the fifteen-minute moving average rate of events per second.
184func (m *StandardMeter) Rate15() float64 {
185 return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate15))
186}
187
188// RateMean returns the meter's mean rate of events per second.
189func (m *StandardMeter) RateMean() float64 {
190 return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rateMean))
191}
192
193// Snapshot returns a read-only copy of the meter.
194func (m *StandardMeter) Snapshot() Meter {
195 copiedSnapshot := MeterSnapshot{
196 count: atomic.LoadInt64(&m.snapshot.count),
197 rate1: atomic.LoadUint64(&m.snapshot.rate1),
198 rate5: atomic.LoadUint64(&m.snapshot.rate5),
199 rate15: atomic.LoadUint64(&m.snapshot.rate15),
200 rateMean: atomic.LoadUint64(&m.snapshot.rateMean),
201 }
202 return &copiedSnapshot
203}
204
205func (m *StandardMeter) updateSnapshot() {
206 rate1 := math.Float64bits(m.a1.Rate())
207 rate5 := math.Float64bits(m.a5.Rate())
208 rate15 := math.Float64bits(m.a15.Rate())
209 rateMean := math.Float64bits(float64(m.Count()) / time.Since(m.startTime).Seconds())
210
211 atomic.StoreUint64(&m.snapshot.rate1, rate1)
212 atomic.StoreUint64(&m.snapshot.rate5, rate5)
213 atomic.StoreUint64(&m.snapshot.rate15, rate15)
214 atomic.StoreUint64(&m.snapshot.rateMean, rateMean)
215}
216
217func (m *StandardMeter) tick() {
218 m.a1.Tick()
219 m.a5.Tick()
220 m.a15.Tick()
221 m.updateSnapshot()
222}
223
224// meterArbiter ticks meters every 5s from a single goroutine.
225// meters are references in a set for future stopping.
226type meterArbiter struct {
227 sync.RWMutex
228 started bool
229 meters map[*StandardMeter]struct{}
230 ticker *time.Ticker
231}
232
233var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})}
234
235// Ticks meters on the scheduled interval
236func (ma *meterArbiter) tick() {
237 for {
238 select {
239 case <-ma.ticker.C:
240 ma.tickMeters()
241 }
242 }
243}
244
245func (ma *meterArbiter) tickMeters() {
246 ma.RLock()
247 defer ma.RUnlock()
248 for meter := range ma.meters {
249 meter.tick()
250 }
251}