blob: 556e6432eb1002d8fcb08a54b1d1fd2167101ded [file] [log] [blame]
Matteo Scandoloa4285862020-12-01 18:10:10 -08001/*
2Copyright 2016 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package workqueue
18
19import (
20 "sync"
21 "time"
22
23 "k8s.io/apimachinery/pkg/util/clock"
24)
25
// This file provides abstractions for setting the metrics provider (e.g.,
// Prometheus) used by work queues.
28
29type queueMetrics interface {
30 add(item t)
31 get(item t)
32 done(item t)
33 updateUnfinishedWork()
34}
35
// GaugeMetric represents a single numerical value that may move up or down
// by one step at a time.
type GaugeMetric interface {
	// Inc raises the value by one.
	Inc()
	// Dec lowers the value by one.
	Dec()
}
42
// SettableGaugeMetric represents a single numerical value that can be
// overwritten with an arbitrary new value. It is kept separate from
// GaugeMetric so that existing GaugeMetric implementations remain valid.
type SettableGaugeMetric interface {
	// Set replaces the current value.
	Set(float64)
}
48
// CounterMetric represents a single numerical value that can only increase.
type CounterMetric interface {
	// Inc raises the value by one.
	Inc()
}
54
// SummaryMetric captures individual observations.
type SummaryMetric interface {
	// Observe records one sample.
	Observe(float64)
}
59
// HistogramMetric counts individual observations.
type HistogramMetric interface {
	// Observe records one sample.
	Observe(float64)
}
64
// noopMetric has the method sets of GaugeMetric, SettableGaugeMetric,
// CounterMetric, SummaryMetric and HistogramMetric, and discards every
// update. It is the value noopMetricsProvider hands out.
type noopMetric struct{}

// Inc does nothing.
func (noopMetric) Inc() {}

// Dec does nothing.
func (noopMetric) Dec() {}

// Set ignores its argument.
func (noopMetric) Set(float64) {}

// Observe ignores its argument.
func (noopMetric) Observe(float64) {}
71
72// defaultQueueMetrics expects the caller to lock before setting any metrics.
73type defaultQueueMetrics struct {
74 clock clock.Clock
75
76 // current depth of a workqueue
77 depth GaugeMetric
78 // total number of adds handled by a workqueue
79 adds CounterMetric
80 // how long an item stays in a workqueue
81 latency HistogramMetric
82 // how long processing an item from a workqueue takes
83 workDuration HistogramMetric
84 addTimes map[t]time.Time
85 processingStartTimes map[t]time.Time
86
87 // how long have current threads been working?
88 unfinishedWorkSeconds SettableGaugeMetric
89 longestRunningProcessor SettableGaugeMetric
90}
91
92func (m *defaultQueueMetrics) add(item t) {
93 if m == nil {
94 return
95 }
96
97 m.adds.Inc()
98 m.depth.Inc()
99 if _, exists := m.addTimes[item]; !exists {
100 m.addTimes[item] = m.clock.Now()
101 }
102}
103
104func (m *defaultQueueMetrics) get(item t) {
105 if m == nil {
106 return
107 }
108
109 m.depth.Dec()
110 m.processingStartTimes[item] = m.clock.Now()
111 if startTime, exists := m.addTimes[item]; exists {
112 m.latency.Observe(m.sinceInSeconds(startTime))
113 delete(m.addTimes, item)
114 }
115}
116
117func (m *defaultQueueMetrics) done(item t) {
118 if m == nil {
119 return
120 }
121
122 if startTime, exists := m.processingStartTimes[item]; exists {
123 m.workDuration.Observe(m.sinceInSeconds(startTime))
124 delete(m.processingStartTimes, item)
125 }
126}
127
128func (m *defaultQueueMetrics) updateUnfinishedWork() {
129 // Note that a summary metric would be better for this, but prometheus
130 // doesn't seem to have non-hacky ways to reset the summary metrics.
131 var total float64
132 var oldest float64
133 for _, t := range m.processingStartTimes {
134 age := m.sinceInSeconds(t)
135 total += age
136 if age > oldest {
137 oldest = age
138 }
139 }
140 m.unfinishedWorkSeconds.Set(total)
141 m.longestRunningProcessor.Set(oldest)
142}
143
144type noMetrics struct{}
145
146func (noMetrics) add(item t) {}
147func (noMetrics) get(item t) {}
148func (noMetrics) done(item t) {}
149func (noMetrics) updateUnfinishedWork() {}
150
151// Gets the time since the specified start in seconds.
152func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 {
153 return m.clock.Since(start).Seconds()
154}
155
// retryMetrics is the internal hook for counting retries. In this file it is
// implemented by *defaultRetryMetrics.
type retryMetrics interface {
	// retry records a single retry.
	retry()
}
159
160type defaultRetryMetrics struct {
161 retries CounterMetric
162}
163
164func (m *defaultRetryMetrics) retry() {
165 if m == nil {
166 return
167 }
168
169 m.retries.Inc()
170}
171
172// MetricsProvider generates various metrics used by the queue.
173type MetricsProvider interface {
174 NewDepthMetric(name string) GaugeMetric
175 NewAddsMetric(name string) CounterMetric
176 NewLatencyMetric(name string) HistogramMetric
177 NewWorkDurationMetric(name string) HistogramMetric
178 NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
179 NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
180 NewRetriesMetric(name string) CounterMetric
181}
182
183type noopMetricsProvider struct{}
184
185func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric {
186 return noopMetric{}
187}
188
189func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
190 return noopMetric{}
191}
192
193func (_ noopMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
194 return noopMetric{}
195}
196
197func (_ noopMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
198 return noopMetric{}
199}
200
201func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
202 return noopMetric{}
203}
204
205func (_ noopMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
206 return noopMetric{}
207}
208
209func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
210 return noopMetric{}
211}
212
213var globalMetricsFactory = queueMetricsFactory{
214 metricsProvider: noopMetricsProvider{},
215}
216
217type queueMetricsFactory struct {
218 metricsProvider MetricsProvider
219
220 onlyOnce sync.Once
221}
222
223func (f *queueMetricsFactory) setProvider(mp MetricsProvider) {
224 f.onlyOnce.Do(func() {
225 f.metricsProvider = mp
226 })
227}
228
229func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
230 mp := f.metricsProvider
231 if len(name) == 0 || mp == (noopMetricsProvider{}) {
232 return noMetrics{}
233 }
234 return &defaultQueueMetrics{
235 clock: clock,
236 depth: mp.NewDepthMetric(name),
237 adds: mp.NewAddsMetric(name),
238 latency: mp.NewLatencyMetric(name),
239 workDuration: mp.NewWorkDurationMetric(name),
240 unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
241 longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
242 addTimes: map[t]time.Time{},
243 processingStartTimes: map[t]time.Time{},
244 }
245}
246
247func newRetryMetrics(name string) retryMetrics {
248 var ret *defaultRetryMetrics
249 if len(name) == 0 {
250 return ret
251 }
252 return &defaultRetryMetrics{
253 retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
254 }
255}
256
257// SetProvider sets the metrics provider for all subsequently created work
258// queues. Only the first call has an effect.
259func SetProvider(metricsProvider MetricsProvider) {
260 globalMetricsFactory.setProvider(metricsProvider)
261}