blob: d759d912be156b85f948d8bf9b2c497e9ff5fdda [file] [log] [blame]
Matteo Scandoloa4285862020-12-01 18:10:10 -08001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package wait
18
19import (
20 "context"
21 "errors"
22 "math"
23 "math/rand"
24 "sync"
25 "time"
26
27 "k8s.io/apimachinery/pkg/util/clock"
28 "k8s.io/apimachinery/pkg/util/runtime"
29)
30
var (
	// ForeverTestTimeout is the timeout to use in tests of the style:
	//
	//	select {
	//	...
	//	case <-time.After(timeout):
	//		t.Errorf("Timed out")
	//	}
	//
	// The value should effectively mean "forever". Tests must not truly lock
	// up, but 30s is long enough to ride out the things that slow down a run
	// on a heavily contended machine (GC, seeks, etc.). It is also short
	// enough that a developer will not Ctrl-C the run if they do break such
	// a test.
	ForeverTestTimeout = 30 * time.Second

	// NeverStop is a stop channel that is never closed. Only a receive-only
	// view of it is exported, so it cannot be closed through this variable.
	// Pass it to Until (or any function taking a stop channel) to make that
	// function never stop.
	NeverStop <-chan struct{} = make(chan struct{})
)
42
// Group starts a set of goroutines and waits for all of them to complete.
// The zero value is ready to use.
type Group struct {
	wg sync.WaitGroup
}

// Wait blocks until every goroutine started through this Group has returned.
func (g *Group) Wait() {
	g.wg.Wait()
}

// StartWithChannel starts f in a new goroutine in the group and passes it
// stopCh. f is expected to return once stopCh is closed.
func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) {
	g.Start(func() {
		f(stopCh)
	})
}

// StartWithContext starts f in a new goroutine in the group and passes it
// ctx. f is expected to return once ctx.Done() is closed.
func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) {
	g.Start(func() {
		f(ctx)
	})
}

// Start starts f in a new goroutine in the group. The goroutine is counted
// before Start returns, so a Wait issued afterwards also waits for it.
func (g *Group) Start(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f()
	}()
}
76
77// Forever calls f every period for ever.
78//
79// Forever is syntactic sugar on top of Until.
80func Forever(f func(), period time.Duration) {
81 Until(f, period, NeverStop)
82}
83
84// Until loops until stop channel is closed, running f every period.
85//
86// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
87// with sliding = true (which means the timer for period starts after the f
88// completes).
89func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
90 JitterUntil(f, period, 0.0, true, stopCh)
91}
92
93// UntilWithContext loops until context is done, running f every period.
94//
95// UntilWithContext is syntactic sugar on top of JitterUntilWithContext
96// with zero jitter factor and with sliding = true (which means the timer
97// for period starts after the f completes).
98func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
99 JitterUntilWithContext(ctx, f, period, 0.0, true)
100}
101
102// NonSlidingUntil loops until stop channel is closed, running f every
103// period.
104//
105// NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter
106// factor, with sliding = false (meaning the timer for period starts at the same
107// time as the function starts).
108func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
109 JitterUntil(f, period, 0.0, false, stopCh)
110}
111
112// NonSlidingUntilWithContext loops until context is done, running f every
113// period.
114//
115// NonSlidingUntilWithContext is syntactic sugar on top of JitterUntilWithContext
116// with zero jitter factor, with sliding = false (meaning the timer for period
117// starts at the same time as the function starts).
118func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
119 JitterUntilWithContext(ctx, f, period, 0.0, false)
120}
121
122// JitterUntil loops until stop channel is closed, running f every period.
123//
124// If jitterFactor is positive, the period is jittered before every run of f.
125// If jitterFactor is not positive, the period is unchanged and not jittered.
126//
127// If sliding is true, the period is computed after f runs. If it is false then
128// period includes the runtime for f.
129//
130// Close stopCh to stop. f may not be invoked if stop channel is already
131// closed. Pass NeverStop to if you don't want it stop.
132func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
133 BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
134}
135
136// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
137//
138// If sliding is true, the period is computed after f runs. If it is false then
139// period includes the runtime for f.
140func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
141 var t clock.Timer
142 for {
143 select {
144 case <-stopCh:
145 return
146 default:
147 }
148
149 if !sliding {
150 t = backoff.Backoff()
151 }
152
153 func() {
154 defer runtime.HandleCrash()
155 f()
156 }()
157
158 if sliding {
159 t = backoff.Backoff()
160 }
161
162 // NOTE: b/c there is no priority selection in golang
163 // it is possible for this to race, meaning we could
164 // trigger t.C and stopCh, and t.C select falls through.
165 // In order to mitigate we re-check stopCh at the beginning
166 // of every loop to prevent extra executions of f().
167 select {
168 case <-stopCh:
169 return
170 case <-t.C():
171 }
172 }
173}
174
175// JitterUntilWithContext loops until context is done, running f every period.
176//
177// If jitterFactor is positive, the period is jittered before every run of f.
178// If jitterFactor is not positive, the period is unchanged and not jittered.
179//
180// If sliding is true, the period is computed after f runs. If it is false then
181// period includes the runtime for f.
182//
183// Cancel context to stop. f may not be invoked if context is already expired.
184func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {
185 JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
186}
187
// Jitter returns a random duration in the range
// [duration, duration + maxFactor*duration), drawn from math/rand.
//
// Randomizing waits helps clients avoid converging on periodic behavior.
// When maxFactor is not positive, a default factor of 1.0 is used.
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
	factor := maxFactor
	if factor <= 0.0 {
		factor = 1.0
	}
	extra := rand.Float64() * factor * float64(duration)
	return duration + time.Duration(extra)
}
200
var (
	// ErrWaitTimeout is returned when a wait ends without the condition
	// ever being satisfied.
	ErrWaitTimeout = errors.New("timed out waiting for the condition")
)

// ConditionFunc reports whether a condition is satisfied (done == true). It
// returns a non-nil err when the surrounding loop should be aborted.
type ConditionFunc func() (done bool, err error)
207
208// runConditionWithCrashProtection runs a ConditionFunc with crash protection
209func runConditionWithCrashProtection(condition ConditionFunc) (bool, error) {
210 defer runtime.HandleCrash()
211 return condition()
212}
213
214// Backoff holds parameters applied to a Backoff function.
215type Backoff struct {
216 // The initial duration.
217 Duration time.Duration
218 // Duration is multiplied by factor each iteration, if factor is not zero
219 // and the limits imposed by Steps and Cap have not been reached.
220 // Should not be negative.
221 // The jitter does not contribute to the updates to the duration parameter.
222 Factor float64
223 // The sleep at each iteration is the duration plus an additional
224 // amount chosen uniformly at random from the interval between
225 // zero and `jitter*duration`.
226 Jitter float64
227 // The remaining number of iterations in which the duration
228 // parameter may change (but progress can be stopped earlier by
229 // hitting the cap). If not positive, the duration is not
230 // changed. Used for exponential backoff in combination with
231 // Factor and Cap.
232 Steps int
233 // A limit on revised values of the duration parameter. If a
234 // multiplication by the factor parameter would make the duration
235 // exceed the cap then the duration is set to the cap and the
236 // steps parameter is set to zero.
237 Cap time.Duration
238}
239
240// Step (1) returns an amount of time to sleep determined by the
241// original Duration and Jitter and (2) mutates the provided Backoff
242// to update its Steps and Duration.
243func (b *Backoff) Step() time.Duration {
244 if b.Steps < 1 {
245 if b.Jitter > 0 {
246 return Jitter(b.Duration, b.Jitter)
247 }
248 return b.Duration
249 }
250 b.Steps--
251
252 duration := b.Duration
253
254 // calculate the next step
255 if b.Factor != 0 {
256 b.Duration = time.Duration(float64(b.Duration) * b.Factor)
257 if b.Cap > 0 && b.Duration > b.Cap {
258 b.Duration = b.Cap
259 b.Steps = 0
260 }
261 }
262
263 if b.Jitter > 0 {
264 duration = Jitter(duration, b.Jitter)
265 }
266 return duration
267}
268
// contextForChannel returns a context whose Done channel closes when either
// the returned cancel function is called or parentCh is closed, whichever
// happens first.
//
// The caller must *always* call the returned CancelFunc. Until it is called
// or parentCh closes, a helper goroutine stays blocked and is leaked.
func contextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) {
	derived, stop := context.WithCancel(context.Background())

	go func() {
		select {
		case <-derived.Done():
			// Cancelled by the caller; nothing left to watch.
		case <-parentCh:
			stop()
		}
	}()

	return derived, stop
}
287
288// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides
289// an interface to return a timer for backoff, and caller shall backoff until Timer.C() drains. If the second Backoff()
290// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained and result in
291// undetermined behavior.
292// The BackoffManager is supposed to be called in a single-threaded environment.
293type BackoffManager interface {
294 Backoff() clock.Timer
295}
296
297type exponentialBackoffManagerImpl struct {
298 backoff *Backoff
299 backoffTimer clock.Timer
300 lastBackoffStart time.Time
301 initialBackoff time.Duration
302 backoffResetDuration time.Duration
303 clock clock.Clock
304}
305
306// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
307// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
308// This backoff manager is used to reduce load during upstream unhealthiness.
309func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
310 return &exponentialBackoffManagerImpl{
311 backoff: &Backoff{
312 Duration: initBackoff,
313 Factor: backoffFactor,
314 Jitter: jitter,
315
316 // the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not
317 // what we ideally need here, we set it to max int and assume we will never use up the steps
318 Steps: math.MaxInt32,
319 Cap: maxBackoff,
320 },
321 backoffTimer: nil,
322 initialBackoff: initBackoff,
323 lastBackoffStart: c.Now(),
324 backoffResetDuration: resetDuration,
325 clock: c,
326 }
327}
328
329func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
330 if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
331 b.backoff.Steps = math.MaxInt32
332 b.backoff.Duration = b.initialBackoff
333 }
334 b.lastBackoffStart = b.clock.Now()
335 return b.backoff.Step()
336}
337
338// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff.
339// The returned timer must be drained before calling Backoff() the second time
340func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
341 if b.backoffTimer == nil {
342 b.backoffTimer = b.clock.NewTimer(b.getNextBackoff())
343 } else {
344 b.backoffTimer.Reset(b.getNextBackoff())
345 }
346 return b.backoffTimer
347}
348
349type jitteredBackoffManagerImpl struct {
350 clock clock.Clock
351 duration time.Duration
352 jitter float64
353 backoffTimer clock.Timer
354}
355
356// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
357// is negative, backoff will not be jittered.
358func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
359 return &jitteredBackoffManagerImpl{
360 clock: c,
361 duration: duration,
362 jitter: jitter,
363 backoffTimer: nil,
364 }
365}
366
367func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
368 jitteredPeriod := j.duration
369 if j.jitter > 0.0 {
370 jitteredPeriod = Jitter(j.duration, j.jitter)
371 }
372 return jitteredPeriod
373}
374
375// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff.
376// The returned timer must be drained before calling Backoff() the second time
377func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
378 backoff := j.getNextBackoff()
379 if j.backoffTimer == nil {
380 j.backoffTimer = j.clock.NewTimer(backoff)
381 } else {
382 j.backoffTimer.Reset(backoff)
383 }
384 return j.backoffTimer
385}
386
387// ExponentialBackoff repeats a condition check with exponential backoff.
388//
389// It repeatedly checks the condition and then sleeps, using `backoff.Step()`
390// to determine the length of the sleep and adjust Duration and Steps.
391// Stops and returns as soon as:
392// 1. the condition check returns true or an error,
393// 2. `backoff.Steps` checks of the condition have been done, or
394// 3. a sleep truncated by the cap on duration has been completed.
395// In case (1) the returned error is what the condition function returned.
396// In all other cases, ErrWaitTimeout is returned.
397func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
398 for backoff.Steps > 0 {
399 if ok, err := runConditionWithCrashProtection(condition); err != nil || ok {
400 return err
401 }
402 if backoff.Steps == 1 {
403 break
404 }
405 time.Sleep(backoff.Step())
406 }
407 return ErrWaitTimeout
408}
409
410// Poll tries a condition func until it returns true, an error, or the timeout
411// is reached.
412//
413// Poll always waits the interval before the run of 'condition'.
414// 'condition' will always be invoked at least once.
415//
416// Some intervals may be missed if the condition takes too long or the time
417// window is too short.
418//
419// If you want to Poll something forever, see PollInfinite.
420func Poll(interval, timeout time.Duration, condition ConditionFunc) error {
421 return pollInternal(poller(interval, timeout), condition)
422}
423
424func pollInternal(wait WaitFunc, condition ConditionFunc) error {
425 done := make(chan struct{})
426 defer close(done)
427 return WaitFor(wait, condition, done)
428}
429
430// PollImmediate tries a condition func until it returns true, an error, or the timeout
431// is reached.
432//
433// PollImmediate always checks 'condition' before waiting for the interval. 'condition'
434// will always be invoked at least once.
435//
436// Some intervals may be missed if the condition takes too long or the time
437// window is too short.
438//
439// If you want to immediately Poll something forever, see PollImmediateInfinite.
440func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error {
441 return pollImmediateInternal(poller(interval, timeout), condition)
442}
443
444func pollImmediateInternal(wait WaitFunc, condition ConditionFunc) error {
445 done, err := runConditionWithCrashProtection(condition)
446 if err != nil {
447 return err
448 }
449 if done {
450 return nil
451 }
452 return pollInternal(wait, condition)
453}
454
455// PollInfinite tries a condition func until it returns true or an error
456//
457// PollInfinite always waits the interval before the run of 'condition'.
458//
459// Some intervals may be missed if the condition takes too long or the time
460// window is too short.
461func PollInfinite(interval time.Duration, condition ConditionFunc) error {
462 done := make(chan struct{})
463 defer close(done)
464 return PollUntil(interval, condition, done)
465}
466
467// PollImmediateInfinite tries a condition func until it returns true or an error
468//
469// PollImmediateInfinite runs the 'condition' before waiting for the interval.
470//
471// Some intervals may be missed if the condition takes too long or the time
472// window is too short.
473func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error {
474 done, err := runConditionWithCrashProtection(condition)
475 if err != nil {
476 return err
477 }
478 if done {
479 return nil
480 }
481 return PollInfinite(interval, condition)
482}
483
484// PollUntil tries a condition func until it returns true, an error or stopCh is
485// closed.
486//
487// PollUntil always waits interval before the first run of 'condition'.
488// 'condition' will always be invoked at least once.
489func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
490 ctx, cancel := contextForChannel(stopCh)
491 defer cancel()
492 return WaitFor(poller(interval, 0), condition, ctx.Done())
493}
494
495// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.
496//
497// PollImmediateUntil runs the 'condition' before waiting for the interval.
498// 'condition' will always be invoked at least once.
499func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
500 done, err := condition()
501 if err != nil {
502 return err
503 }
504 if done {
505 return nil
506 }
507 select {
508 case <-stopCh:
509 return ErrWaitTimeout
510 default:
511 return PollUntil(interval, condition, stopCh)
512 }
513}
514
515// WaitFunc creates a channel that receives an item every time a test
516// should be executed and is closed when the last test should be invoked.
517type WaitFunc func(done <-chan struct{}) <-chan struct{}
518
519// WaitFor continually checks 'fn' as driven by 'wait'.
520//
521// WaitFor gets a channel from 'wait()'', and then invokes 'fn' once for every value
522// placed on the channel and once more when the channel is closed. If the channel is closed
523// and 'fn' returns false without error, WaitFor returns ErrWaitTimeout.
524//
525// If 'fn' returns an error the loop ends and that error is returned. If
526// 'fn' returns true the loop ends and nil is returned.
527//
528// ErrWaitTimeout will be returned if the 'done' channel is closed without fn ever
529// returning true.
530//
531// When the done channel is closed, because the golang `select` statement is
532// "uniform pseudo-random", the `fn` might still run one or multiple time,
533// though eventually `WaitFor` will return.
534func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
535 stopCh := make(chan struct{})
536 defer close(stopCh)
537 c := wait(stopCh)
538 for {
539 select {
540 case _, open := <-c:
541 ok, err := runConditionWithCrashProtection(fn)
542 if err != nil {
543 return err
544 }
545 if ok {
546 return nil
547 }
548 if !open {
549 return ErrWaitTimeout
550 }
551 case <-done:
552 return ErrWaitTimeout
553 }
554 }
555}
556
557// poller returns a WaitFunc that will send to the channel every interval until
558// timeout has elapsed and then closes the channel.
559//
560// Over very short intervals you may receive no ticks before the channel is
561// closed. A timeout of 0 is interpreted as an infinity, and in such a case
562// it would be the caller's responsibility to close the done channel.
563// Failure to do so would result in a leaked goroutine.
564//
565// Output ticks are not buffered. If the channel is not ready to receive an
566// item, the tick is skipped.
567func poller(interval, timeout time.Duration) WaitFunc {
568 return WaitFunc(func(done <-chan struct{}) <-chan struct{} {
569 ch := make(chan struct{})
570
571 go func() {
572 defer close(ch)
573
574 tick := time.NewTicker(interval)
575 defer tick.Stop()
576
577 var after <-chan time.Time
578 if timeout != 0 {
579 // time.After is more convenient, but it
580 // potentially leaves timers around much longer
581 // than necessary if we exit early.
582 timer := time.NewTimer(timeout)
583 after = timer.C
584 defer timer.Stop()
585 }
586
587 for {
588 select {
589 case <-tick.C:
590 // If the consumer isn't ready for this signal drop it and
591 // check the other channels.
592 select {
593 case ch <- struct{}{}:
594 default:
595 }
596 case <-after:
597 return
598 case <-done:
599 return
600 }
601 }
602 }()
603
604 return ch
605 })
606}