blob: 82339cb744a83382b683ca464910fed7a2d5877b [file] [log] [blame]
divyadesai81bb7ba2020-03-11 11:45:23 +00001package api
2
3import (
4 "fmt"
5 "sync"
6 "time"
7)
8
9const (
10 // DefaultLockSessionName is the Session Name we assign if none is provided
11 DefaultLockSessionName = "Consul API Lock"
12
13 // DefaultLockSessionTTL is the default session TTL if no Session is provided
14 // when creating a new Lock. This is used because we do not have another
15 // other check to depend upon.
16 DefaultLockSessionTTL = "15s"
17
18 // DefaultLockWaitTime is how long we block for at a time to check if lock
19 // acquisition is possible. This affects the minimum time it takes to cancel
20 // a Lock acquisition.
21 DefaultLockWaitTime = 15 * time.Second
22
23 // DefaultLockRetryTime is how long we wait after a failed lock acquisition
24 // before attempting to do the lock again. This is so that once a lock-delay
25 // is in effect, we do not hot loop retrying the acquisition.
26 DefaultLockRetryTime = 5 * time.Second
27
28 // DefaultMonitorRetryTime is how long we wait after a failed monitor check
29 // of a lock (500 response code). This allows the monitor to ride out brief
30 // periods of unavailability, subject to the MonitorRetries setting in the
31 // lock options which is by default set to 0, disabling this feature. This
32 // affects locks and semaphores.
33 DefaultMonitorRetryTime = 2 * time.Second
34
35 // LockFlagValue is a magic flag we set to indicate a key
36 // is being used for a lock. It is used to detect a potential
37 // conflict with a semaphore.
38 LockFlagValue = 0x2ddccbc058a50c18
39)
40
41var (
42 // ErrLockHeld is returned if we attempt to double lock
43 ErrLockHeld = fmt.Errorf("Lock already held")
44
45 // ErrLockNotHeld is returned if we attempt to unlock a lock
46 // that we do not hold.
47 ErrLockNotHeld = fmt.Errorf("Lock not held")
48
49 // ErrLockInUse is returned if we attempt to destroy a lock
50 // that is in use.
51 ErrLockInUse = fmt.Errorf("Lock in use")
52
53 // ErrLockConflict is returned if the flags on a key
54 // used for a lock do not match expectation
55 ErrLockConflict = fmt.Errorf("Existing key does not match lock use")
56)
57
58// Lock is used to implement client-side leader election. It is follows the
59// algorithm as described here: https://www.consul.io/docs/guides/leader-election.html.
60type Lock struct {
61 c *Client
62 opts *LockOptions
63
64 isHeld bool
65 sessionRenew chan struct{}
66 lockSession string
67 l sync.Mutex
68}
69
70// LockOptions is used to parameterize the Lock behavior.
71type LockOptions struct {
72 Key string // Must be set and have write permissions
73 Value []byte // Optional, value to associate with the lock
74 Session string // Optional, created if not specified
75 SessionOpts *SessionEntry // Optional, options to use when creating a session
76 SessionName string // Optional, defaults to DefaultLockSessionName (ignored if SessionOpts is given)
77 SessionTTL string // Optional, defaults to DefaultLockSessionTTL (ignored if SessionOpts is given)
78 MonitorRetries int // Optional, defaults to 0 which means no retries
79 MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
80 LockWaitTime time.Duration // Optional, defaults to DefaultLockWaitTime
81 LockTryOnce bool // Optional, defaults to false which means try forever
82}
83
84// LockKey returns a handle to a lock struct which can be used
85// to acquire and release the mutex. The key used must have
86// write permissions.
87func (c *Client) LockKey(key string) (*Lock, error) {
88 opts := &LockOptions{
89 Key: key,
90 }
91 return c.LockOpts(opts)
92}
93
94// LockOpts returns a handle to a lock struct which can be used
95// to acquire and release the mutex. The key used must have
96// write permissions.
97func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
98 if opts.Key == "" {
99 return nil, fmt.Errorf("missing key")
100 }
101 if opts.SessionName == "" {
102 opts.SessionName = DefaultLockSessionName
103 }
104 if opts.SessionTTL == "" {
105 opts.SessionTTL = DefaultLockSessionTTL
106 } else {
107 if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
108 return nil, fmt.Errorf("invalid SessionTTL: %v", err)
109 }
110 }
111 if opts.MonitorRetryTime == 0 {
112 opts.MonitorRetryTime = DefaultMonitorRetryTime
113 }
114 if opts.LockWaitTime == 0 {
115 opts.LockWaitTime = DefaultLockWaitTime
116 }
117 l := &Lock{
118 c: c,
119 opts: opts,
120 }
121 return l, nil
122}
123
124// Lock attempts to acquire the lock and blocks while doing so.
125// Providing a non-nil stopCh can be used to abort the lock attempt.
126// Returns a channel that is closed if our lock is lost or an error.
127// This channel could be closed at any time due to session invalidation,
128// communication errors, operator intervention, etc. It is NOT safe to
129// assume that the lock is held until Unlock() unless the Session is specifically
130// created without any associated health checks. By default Consul sessions
131// prefer liveness over safety and an application must be able to handle
132// the lock being lost.
133func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
134 // Hold the lock as we try to acquire
135 l.l.Lock()
136 defer l.l.Unlock()
137
138 // Check if we already hold the lock
139 if l.isHeld {
140 return nil, ErrLockHeld
141 }
142
143 // Check if we need to create a session first
144 l.lockSession = l.opts.Session
145 if l.lockSession == "" {
146 s, err := l.createSession()
147 if err != nil {
148 return nil, fmt.Errorf("failed to create session: %v", err)
149 }
150
151 l.sessionRenew = make(chan struct{})
152 l.lockSession = s
153 session := l.c.Session()
154 go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
155
156 // If we fail to acquire the lock, cleanup the session
157 defer func() {
158 if !l.isHeld {
159 close(l.sessionRenew)
160 l.sessionRenew = nil
161 }
162 }()
163 }
164
165 // Setup the query options
166 kv := l.c.KV()
167 qOpts := &QueryOptions{
168 WaitTime: l.opts.LockWaitTime,
169 }
170
171 start := time.Now()
172 attempts := 0
173WAIT:
174 // Check if we should quit
175 select {
176 case <-stopCh:
177 return nil, nil
178 default:
179 }
180
181 // Handle the one-shot mode.
182 if l.opts.LockTryOnce && attempts > 0 {
183 elapsed := time.Since(start)
184 if elapsed > l.opts.LockWaitTime {
185 return nil, nil
186 }
187
188 // Query wait time should not exceed the lock wait time
189 qOpts.WaitTime = l.opts.LockWaitTime - elapsed
190 }
191 attempts++
192
193 // Look for an existing lock, blocking until not taken
194 pair, meta, err := kv.Get(l.opts.Key, qOpts)
195 if err != nil {
196 return nil, fmt.Errorf("failed to read lock: %v", err)
197 }
198 if pair != nil && pair.Flags != LockFlagValue {
199 return nil, ErrLockConflict
200 }
201 locked := false
202 if pair != nil && pair.Session == l.lockSession {
203 goto HELD
204 }
205 if pair != nil && pair.Session != "" {
206 qOpts.WaitIndex = meta.LastIndex
207 goto WAIT
208 }
209
210 // Try to acquire the lock
211 pair = l.lockEntry(l.lockSession)
212 locked, _, err = kv.Acquire(pair, nil)
213 if err != nil {
214 return nil, fmt.Errorf("failed to acquire lock: %v", err)
215 }
216
217 // Handle the case of not getting the lock
218 if !locked {
219 // Determine why the lock failed
220 qOpts.WaitIndex = 0
221 pair, meta, err = kv.Get(l.opts.Key, qOpts)
222 if pair != nil && pair.Session != "" {
223 //If the session is not null, this means that a wait can safely happen
224 //using a long poll
225 qOpts.WaitIndex = meta.LastIndex
226 goto WAIT
227 } else {
228 // If the session is empty and the lock failed to acquire, then it means
229 // a lock-delay is in effect and a timed wait must be used
230 select {
231 case <-time.After(DefaultLockRetryTime):
232 goto WAIT
233 case <-stopCh:
234 return nil, nil
235 }
236 }
237 }
238
239HELD:
240 // Watch to ensure we maintain leadership
241 leaderCh := make(chan struct{})
242 go l.monitorLock(l.lockSession, leaderCh)
243
244 // Set that we own the lock
245 l.isHeld = true
246
247 // Locked! All done
248 return leaderCh, nil
249}
250
251// Unlock released the lock. It is an error to call this
252// if the lock is not currently held.
253func (l *Lock) Unlock() error {
254 // Hold the lock as we try to release
255 l.l.Lock()
256 defer l.l.Unlock()
257
258 // Ensure the lock is actually held
259 if !l.isHeld {
260 return ErrLockNotHeld
261 }
262
263 // Set that we no longer own the lock
264 l.isHeld = false
265
266 // Stop the session renew
267 if l.sessionRenew != nil {
268 defer func() {
269 close(l.sessionRenew)
270 l.sessionRenew = nil
271 }()
272 }
273
274 // Get the lock entry, and clear the lock session
275 lockEnt := l.lockEntry(l.lockSession)
276 l.lockSession = ""
277
278 // Release the lock explicitly
279 kv := l.c.KV()
280 _, _, err := kv.Release(lockEnt, nil)
281 if err != nil {
282 return fmt.Errorf("failed to release lock: %v", err)
283 }
284 return nil
285}
286
287// Destroy is used to cleanup the lock entry. It is not necessary
288// to invoke. It will fail if the lock is in use.
289func (l *Lock) Destroy() error {
290 // Hold the lock as we try to release
291 l.l.Lock()
292 defer l.l.Unlock()
293
294 // Check if we already hold the lock
295 if l.isHeld {
296 return ErrLockHeld
297 }
298
299 // Look for an existing lock
300 kv := l.c.KV()
301 pair, _, err := kv.Get(l.opts.Key, nil)
302 if err != nil {
303 return fmt.Errorf("failed to read lock: %v", err)
304 }
305
306 // Nothing to do if the lock does not exist
307 if pair == nil {
308 return nil
309 }
310
311 // Check for possible flag conflict
312 if pair.Flags != LockFlagValue {
313 return ErrLockConflict
314 }
315
316 // Check if it is in use
317 if pair.Session != "" {
318 return ErrLockInUse
319 }
320
321 // Attempt the delete
322 didRemove, _, err := kv.DeleteCAS(pair, nil)
323 if err != nil {
324 return fmt.Errorf("failed to remove lock: %v", err)
325 }
326 if !didRemove {
327 return ErrLockInUse
328 }
329 return nil
330}
331
332// createSession is used to create a new managed session
333func (l *Lock) createSession() (string, error) {
334 session := l.c.Session()
335 se := l.opts.SessionOpts
336 if se == nil {
337 se = &SessionEntry{
338 Name: l.opts.SessionName,
339 TTL: l.opts.SessionTTL,
340 }
341 }
342 id, _, err := session.Create(se, nil)
343 if err != nil {
344 return "", err
345 }
346 return id, nil
347}
348
349// lockEntry returns a formatted KVPair for the lock
350func (l *Lock) lockEntry(session string) *KVPair {
351 return &KVPair{
352 Key: l.opts.Key,
353 Value: l.opts.Value,
354 Session: session,
355 Flags: LockFlagValue,
356 }
357}
358
359// monitorLock is a long running routine to monitor a lock ownership
360// It closes the stopCh if we lose our leadership.
361func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
362 defer close(stopCh)
363 kv := l.c.KV()
364 opts := &QueryOptions{RequireConsistent: true}
365WAIT:
366 retries := l.opts.MonitorRetries
367RETRY:
368 pair, meta, err := kv.Get(l.opts.Key, opts)
369 if err != nil {
370 // If configured we can try to ride out a brief Consul unavailability
371 // by doing retries. Note that we have to attempt the retry in a non-
372 // blocking fashion so that we have a clean place to reset the retry
373 // counter if service is restored.
374 if retries > 0 && IsRetryableError(err) {
375 time.Sleep(l.opts.MonitorRetryTime)
376 retries--
377 opts.WaitIndex = 0
378 goto RETRY
379 }
380 return
381 }
382 if pair != nil && pair.Session == session {
383 opts.WaitIndex = meta.LastIndex
384 goto WAIT
385 }
386}