blob: f88ca7248b0fdbff37db37f820ebba18fef1ab4b [file] [log] [blame]
// Package breaker implements the circuit-breaker resiliency pattern for Go.
2package breaker
3
4import (
5 "errors"
6 "sync"
7 "sync/atomic"
8 "time"
9)
10
// ErrBreakerOpen is the sentinel error handed back by Run and Go when they
// decline to execute the supplied function because the breaker is open.
var ErrBreakerOpen = errors.New("circuit breaker is open")
14
// Possible values of Breaker.state.
const (
	// closed is the initial state: work is executed and failures are counted.
	closed uint32 = iota
	// open means Run and Go reject work with ErrBreakerOpen.
	open
	// halfOpen means work is executed again, but a single failure re-opens
	// the breaker.
	halfOpen
)
20
// Breaker is a circuit breaker: it tracks the failures of the work it is asked
// to run and, once tripped, rejects further work for a while instead of
// running it.
type Breaker struct {
	// Configuration supplied to New.
	errorThreshold   int
	successThreshold int
	timeout          time.Duration

	// Mutable bookkeeping. It is modified only while lock is held; state is
	// additionally stored and loaded atomically so that Run and Go can read
	// it without taking the lock.
	lock      sync.Mutex
	state     uint32
	errors    int
	successes int
	lastError time.Time
}
31
32// New constructs a new circuit-breaker that starts closed.
33// From closed, the breaker opens if "errorThreshold" errors are seen
34// without an error-free period of at least "timeout". From open, the
35// breaker half-closes after "timeout". From half-open, the breaker closes
36// after "successThreshold" consecutive successes, or opens on a single error.
37func New(errorThreshold, successThreshold int, timeout time.Duration) *Breaker {
38 return &Breaker{
39 errorThreshold: errorThreshold,
40 successThreshold: successThreshold,
41 timeout: timeout,
42 }
43}
44
45// Run will either return ErrBreakerOpen immediately if the circuit-breaker is
46// already open, or it will run the given function and pass along its return
47// value. It is safe to call Run concurrently on the same Breaker.
48func (b *Breaker) Run(work func() error) error {
49 state := atomic.LoadUint32(&b.state)
50
51 if state == open {
52 return ErrBreakerOpen
53 }
54
55 return b.doWork(state, work)
56}
57
58// Go will either return ErrBreakerOpen immediately if the circuit-breaker is
59// already open, or it will run the given function in a separate goroutine.
60// If the function is run, Go will return nil immediately, and will *not* return
61// the return value of the function. It is safe to call Go concurrently on the
62// same Breaker.
63func (b *Breaker) Go(work func() error) error {
64 state := atomic.LoadUint32(&b.state)
65
66 if state == open {
67 return ErrBreakerOpen
68 }
69
70 // errcheck complains about ignoring the error return value, but
71 // that's on purpose; if you want an error from a goroutine you have to
72 // get it over a channel or something
73 go b.doWork(state, work)
74
75 return nil
76}
77
78func (b *Breaker) doWork(state uint32, work func() error) error {
79 var panicValue interface{}
80
81 result := func() error {
82 defer func() {
83 panicValue = recover()
84 }()
85 return work()
86 }()
87
88 if result == nil && panicValue == nil && state == closed {
89 // short-circuit the normal, success path without contending
90 // on the lock
91 return nil
92 }
93
94 // oh well, I guess we have to contend on the lock
95 b.processResult(result, panicValue)
96
97 if panicValue != nil {
98 // as close as Go lets us come to a "rethrow" although unfortunately
99 // we lose the original panicing location
100 panic(panicValue)
101 }
102
103 return result
104}
105
106func (b *Breaker) processResult(result error, panicValue interface{}) {
107 b.lock.Lock()
108 defer b.lock.Unlock()
109
110 if result == nil && panicValue == nil {
111 if b.state == halfOpen {
112 b.successes++
113 if b.successes == b.successThreshold {
114 b.closeBreaker()
115 }
116 }
117 } else {
118 if b.errors > 0 {
119 expiry := b.lastError.Add(b.timeout)
120 if time.Now().After(expiry) {
121 b.errors = 0
122 }
123 }
124
125 switch b.state {
126 case closed:
127 b.errors++
128 if b.errors == b.errorThreshold {
129 b.openBreaker()
130 } else {
131 b.lastError = time.Now()
132 }
133 case halfOpen:
134 b.openBreaker()
135 }
136 }
137}
138
139func (b *Breaker) openBreaker() {
140 b.changeState(open)
141 go b.timer()
142}
143
144func (b *Breaker) closeBreaker() {
145 b.changeState(closed)
146}
147
148func (b *Breaker) timer() {
149 time.Sleep(b.timeout)
150
151 b.lock.Lock()
152 defer b.lock.Unlock()
153
154 b.changeState(halfOpen)
155}
156
157func (b *Breaker) changeState(newState uint32) {
158 b.errors = 0
159 b.successes = 0
160 atomic.StoreUint32(&b.state, newState)
161}