blob: 43f0503531fe6d4e706b38cbe048d6c054df3a67 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package lease
16
17import (
18 "encoding/binary"
19 "errors"
20 "math"
21 "sort"
22 "sync"
23 "time"
24
25 "github.com/coreos/etcd/lease/leasepb"
26 "github.com/coreos/etcd/mvcc/backend"
27)
28
// LeaseID uniquely identifies a lease. The zero value is reserved as NoLease.
type LeaseID int64

// NoLease is the reserved LeaseID that denotes the absence of a lease.
const NoLease LeaseID = 0

// MaxLeaseTTL is the largest TTL, in seconds, that a lease may be granted
// with (roughly 285 years); larger requests fail with ErrLeaseTTLTooLarge.
const MaxLeaseTTL = 9000000000

// Errors returned by the lessor.
var (
	ErrNotPrimary       = errors.New("not a primary lessor")
	ErrLeaseNotFound    = errors.New("lease not found")
	ErrLeaseExists      = errors.New("lease already exists")
	ErrLeaseTTLTooLarge = errors.New("too large lease TTL")
)

var (
	// forever is the zero time; it is used as the expiry of leases that
	// never expire.
	forever time.Time

	// leaseBucketName is the backend bucket in which leases are persisted.
	leaseBucketName = []byte("lease")

	// leaseRevokeRate is the maximum number of leases to revoke per second.
	// It is a variable rather than a constant so tests can tune it.
	leaseRevokeRate = 1000
)

// TxnDelete is a TxnWrite that only permits deletes. It is declared here,
// rather than imported, to avoid a circular dependency with mvcc.
type TxnDelete interface {
	DeleteRange(key, end []byte) (n, rev int64)
	End()
}

// RangeDeleter constructs a TxnDelete.
type RangeDeleter func() TxnDelete
60
// Lessor owns leases. It can grant, revoke, renew and modify leases on behalf
// of lessees.
type Lessor interface {
	// SetRangeDeleter lets the lessor create TxnDeletes to the store.
	// The lessor deletes the items of a revoked or expired lease by creating
	// new TxnDeletes.
	SetRangeDeleter(rd RangeDeleter)

	// Grant grants a lease that expires at least after TTL seconds.
	// It fails if id is NoLease, if ttl exceeds MaxLeaseTTL, or if a lease
	// with the same ID already exists.
	Grant(id LeaseID, ttl int64) (*Lease, error)
	// Revoke revokes the lease with the given ID. The items attached to the
	// lease will be removed. If the ID does not exist, an error will be
	// returned.
	Revoke(id LeaseID) error

	// Attach attaches the given leaseItems to the lease with the given LeaseID.
	// If the lease does not exist, an error will be returned.
	Attach(id LeaseID, items []LeaseItem) error

	// GetLease returns the LeaseID the given item is attached to.
	// If no lease is found, NoLease will be returned.
	GetLease(item LeaseItem) LeaseID

	// Detach detaches the given leaseItems from the lease with the given
	// LeaseID. If the lease does not exist, an error will be returned.
	Detach(id LeaseID, items []LeaseItem) error

	// Promote promotes the lessor to be the primary lessor. The primary lessor
	// manages the expiration and renewal of leases.
	// A newly promoted lessor renews the TTL of every lease to extend plus
	// its previous TTL.
	Promote(extend time.Duration)

	// Demote demotes the lessor from being the primary lessor.
	Demote()

	// Renew renews the lease with the given ID and returns the renewed TTL.
	// If the ID does not exist, or this lessor is not the primary, an error
	// will be returned.
	Renew(id LeaseID) (int64, error)

	// Lookup returns the lease with the given ID, or nil if there is none.
	Lookup(id LeaseID) *Lease

	// Leases lists all leases.
	Leases() []*Lease

	// ExpiredLeasesC returns a channel on which expired leases are delivered
	// in batches.
	ExpiredLeasesC() <-chan []*Lease

	// Recover recovers the lessor state from the given backend and RangeDeleter.
	Recover(b backend.Backend, rd RangeDeleter)

	// Stop stops the lessor from managing leases. The behavior of calling Stop
	// multiple times is undefined.
	Stop()
}
115
// lessor implements Lessor interface.
// TODO: use clockwork for testability.
type lessor struct {
	// mu guards demotec, leaseMap, itemMap, rd and b: after construction,
	// every write to them in this file happens with mu held. Helpers such as
	// isPrimary, unsafeLeases and findExpiredLeases expect the caller to
	// already hold mu.
	mu sync.Mutex

	// demotec is set when the lessor is the primary.
	// demotec will be closed if the lessor is demoted.
	demotec chan struct{}

	// TODO: probably this should be a heap with a secondary
	// id index.
	// Now it is O(N) to loop over the leases to find expired ones.
	// We want to make Grant, Revoke, and findExpiredLeases all O(logN) and
	// Renew O(1).
	// findExpiredLeases and Renew should be the most frequent operations.
	leaseMap map[LeaseID]*Lease

	// itemMap maps each attached item to the ID of the lease it belongs to.
	itemMap map[LeaseItem]LeaseID

	// When a lease expires, the lessor will delete the
	// leased range (or key) by the RangeDeleter.
	rd RangeDeleter

	// backend to persist leases. We only persist lease ID and expiry for now.
	// The leased items can be recovered by iterating all the keys in kv.
	b backend.Backend

	// minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
	// requests for shorter TTLs are extended to the minimum TTL.
	minLeaseTTL int64

	// expiredC carries batches of expired leases from runLoop to whoever
	// reads ExpiredLeasesC; runLoop skips a batch rather than block on it.
	expiredC chan []*Lease
	// stopC is a channel whose closure indicates that the lessor should be stopped.
	stopC chan struct{}
	// doneC is a channel whose closure indicates that the lessor is stopped.
	doneC chan struct{}
}
153
154func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor {
155 return newLessor(b, minLeaseTTL)
156}
157
158func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
159 l := &lessor{
160 leaseMap: make(map[LeaseID]*Lease),
161 itemMap: make(map[LeaseItem]LeaseID),
162 b: b,
163 minLeaseTTL: minLeaseTTL,
164 // expiredC is a small buffered chan to avoid unnecessary blocking.
165 expiredC: make(chan []*Lease, 16),
166 stopC: make(chan struct{}),
167 doneC: make(chan struct{}),
168 }
169 l.initAndRecover()
170
171 go l.runLoop()
172
173 return l
174}
175
176// isPrimary indicates if this lessor is the primary lessor. The primary
177// lessor manages lease expiration and renew.
178//
179// in etcd, raft leader is the primary. Thus there might be two primary
180// leaders at the same time (raft allows concurrent leader but with different term)
181// for at most a leader election timeout.
182// The old primary leader cannot affect the correctness since its proposal has a
183// smaller term and will not be committed.
184//
185// TODO: raft follower do not forward lease management proposals. There might be a
186// very small window (within second normally which depends on go scheduling) that
187// a raft follow is the primary between the raft leader demotion and lessor demotion.
188// Usually this should not be a problem. Lease should not be that sensitive to timing.
189func (le *lessor) isPrimary() bool {
190 return le.demotec != nil
191}
192
193func (le *lessor) SetRangeDeleter(rd RangeDeleter) {
194 le.mu.Lock()
195 defer le.mu.Unlock()
196
197 le.rd = rd
198}
199
200func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
201 if id == NoLease {
202 return nil, ErrLeaseNotFound
203 }
204
205 if ttl > MaxLeaseTTL {
206 return nil, ErrLeaseTTLTooLarge
207 }
208
209 // TODO: when lessor is under high load, it should give out lease
210 // with longer TTL to reduce renew load.
211 l := &Lease{
212 ID: id,
213 ttl: ttl,
214 itemSet: make(map[LeaseItem]struct{}),
215 revokec: make(chan struct{}),
216 }
217
218 le.mu.Lock()
219 defer le.mu.Unlock()
220
221 if _, ok := le.leaseMap[id]; ok {
222 return nil, ErrLeaseExists
223 }
224
225 if l.ttl < le.minLeaseTTL {
226 l.ttl = le.minLeaseTTL
227 }
228
229 if le.isPrimary() {
230 l.refresh(0)
231 } else {
232 l.forever()
233 }
234
235 le.leaseMap[id] = l
236 l.persistTo(le.b)
237
238 return l, nil
239}
240
241func (le *lessor) Revoke(id LeaseID) error {
242 le.mu.Lock()
243
244 l := le.leaseMap[id]
245 if l == nil {
246 le.mu.Unlock()
247 return ErrLeaseNotFound
248 }
249 defer close(l.revokec)
250 // unlock before doing external work
251 le.mu.Unlock()
252
253 if le.rd == nil {
254 return nil
255 }
256
257 txn := le.rd()
258
259 // sort keys so deletes are in same order among all members,
260 // otherwise the backened hashes will be different
261 keys := l.Keys()
262 sort.StringSlice(keys).Sort()
263 for _, key := range keys {
264 txn.DeleteRange([]byte(key), nil)
265 }
266
267 le.mu.Lock()
268 defer le.mu.Unlock()
269 delete(le.leaseMap, l.ID)
270 // lease deletion needs to be in the same backend transaction with the
271 // kv deletion. Or we might end up with not executing the revoke or not
272 // deleting the keys if etcdserver fails in between.
273 le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
274
275 txn.End()
276 return nil
277}
278
279// Renew renews an existing lease. If the given lease does not exist or
280// has expired, an error will be returned.
281func (le *lessor) Renew(id LeaseID) (int64, error) {
282 le.mu.Lock()
283
284 unlock := func() { le.mu.Unlock() }
285 defer func() { unlock() }()
286
287 if !le.isPrimary() {
288 // forward renew request to primary instead of returning error.
289 return -1, ErrNotPrimary
290 }
291
292 demotec := le.demotec
293
294 l := le.leaseMap[id]
295 if l == nil {
296 return -1, ErrLeaseNotFound
297 }
298
299 if l.expired() {
300 le.mu.Unlock()
301 unlock = func() {}
302 select {
303 // A expired lease might be pending for revoking or going through
304 // quorum to be revoked. To be accurate, renew request must wait for the
305 // deletion to complete.
306 case <-l.revokec:
307 return -1, ErrLeaseNotFound
308 // The expired lease might fail to be revoked if the primary changes.
309 // The caller will retry on ErrNotPrimary.
310 case <-demotec:
311 return -1, ErrNotPrimary
312 case <-le.stopC:
313 return -1, ErrNotPrimary
314 }
315 }
316
317 l.refresh(0)
318 return l.ttl, nil
319}
320
321func (le *lessor) Lookup(id LeaseID) *Lease {
322 le.mu.Lock()
323 defer le.mu.Unlock()
324 return le.leaseMap[id]
325}
326
327func (le *lessor) unsafeLeases() []*Lease {
328 leases := make([]*Lease, 0, len(le.leaseMap))
329 for _, l := range le.leaseMap {
330 leases = append(leases, l)
331 }
332 sort.Sort(leasesByExpiry(leases))
333 return leases
334}
335
336func (le *lessor) Leases() []*Lease {
337 le.mu.Lock()
338 ls := le.unsafeLeases()
339 le.mu.Unlock()
340 return ls
341}
342
343func (le *lessor) Promote(extend time.Duration) {
344 le.mu.Lock()
345 defer le.mu.Unlock()
346
347 le.demotec = make(chan struct{})
348
349 // refresh the expiries of all leases.
350 for _, l := range le.leaseMap {
351 l.refresh(extend)
352 }
353
354 if len(le.leaseMap) < leaseRevokeRate {
355 // no possibility of lease pile-up
356 return
357 }
358
359 // adjust expiries in case of overlap
360 leases := le.unsafeLeases()
361
362 baseWindow := leases[0].Remaining()
363 nextWindow := baseWindow + time.Second
364 expires := 0
365 // have fewer expires than the total revoke rate so piled up leases
366 // don't consume the entire revoke limit
367 targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
368 for _, l := range leases {
369 remaining := l.Remaining()
370 if remaining > nextWindow {
371 baseWindow = remaining
372 nextWindow = baseWindow + time.Second
373 expires = 1
374 continue
375 }
376 expires++
377 if expires <= targetExpiresPerSecond {
378 continue
379 }
380 rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
381 // If leases are extended by n seconds, leases n seconds ahead of the
382 // base window should be extended by only one second.
383 rateDelay -= float64(remaining - baseWindow)
384 delay := time.Duration(rateDelay)
385 nextWindow = baseWindow + delay
386 l.refresh(delay + extend)
387 }
388}
389
390type leasesByExpiry []*Lease
391
392func (le leasesByExpiry) Len() int { return len(le) }
393func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() }
394func (le leasesByExpiry) Swap(i, j int) { le[i], le[j] = le[j], le[i] }
395
396func (le *lessor) Demote() {
397 le.mu.Lock()
398 defer le.mu.Unlock()
399
400 // set the expiries of all leases to forever
401 for _, l := range le.leaseMap {
402 l.forever()
403 }
404
405 if le.demotec != nil {
406 close(le.demotec)
407 le.demotec = nil
408 }
409}
410
411// Attach attaches items to the lease with given ID. When the lease
412// expires, the attached items will be automatically removed.
413// If the given lease does not exist, an error will be returned.
414func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
415 le.mu.Lock()
416 defer le.mu.Unlock()
417
418 l := le.leaseMap[id]
419 if l == nil {
420 return ErrLeaseNotFound
421 }
422
423 l.mu.Lock()
424 for _, it := range items {
425 l.itemSet[it] = struct{}{}
426 le.itemMap[it] = id
427 }
428 l.mu.Unlock()
429 return nil
430}
431
432func (le *lessor) GetLease(item LeaseItem) LeaseID {
433 le.mu.Lock()
434 id := le.itemMap[item]
435 le.mu.Unlock()
436 return id
437}
438
439// Detach detaches items from the lease with given ID.
440// If the given lease does not exist, an error will be returned.
441func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
442 le.mu.Lock()
443 defer le.mu.Unlock()
444
445 l := le.leaseMap[id]
446 if l == nil {
447 return ErrLeaseNotFound
448 }
449
450 l.mu.Lock()
451 for _, it := range items {
452 delete(l.itemSet, it)
453 delete(le.itemMap, it)
454 }
455 l.mu.Unlock()
456 return nil
457}
458
459func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) {
460 le.mu.Lock()
461 defer le.mu.Unlock()
462
463 le.b = b
464 le.rd = rd
465 le.leaseMap = make(map[LeaseID]*Lease)
466 le.itemMap = make(map[LeaseItem]LeaseID)
467 le.initAndRecover()
468}
469
470func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
471 return le.expiredC
472}
473
474func (le *lessor) Stop() {
475 close(le.stopC)
476 <-le.doneC
477}
478
479func (le *lessor) runLoop() {
480 defer close(le.doneC)
481
482 for {
483 var ls []*Lease
484
485 // rate limit
486 revokeLimit := leaseRevokeRate / 2
487
488 le.mu.Lock()
489 if le.isPrimary() {
490 ls = le.findExpiredLeases(revokeLimit)
491 }
492 le.mu.Unlock()
493
494 if len(ls) != 0 {
495 select {
496 case <-le.stopC:
497 return
498 case le.expiredC <- ls:
499 default:
500 // the receiver of expiredC is probably busy handling
501 // other stuff
502 // let's try this next time after 500ms
503 }
504 }
505
506 select {
507 case <-time.After(500 * time.Millisecond):
508 case <-le.stopC:
509 return
510 }
511 }
512}
513
514// findExpiredLeases loops leases in the leaseMap until reaching expired limit
515// and returns the expired leases that needed to be revoked.
516func (le *lessor) findExpiredLeases(limit int) []*Lease {
517 leases := make([]*Lease, 0, 16)
518
519 for _, l := range le.leaseMap {
520 // TODO: probably should change to <= 100-500 millisecond to
521 // make up committing latency.
522 if l.expired() {
523 leases = append(leases, l)
524
525 // reach expired limit
526 if len(leases) == limit {
527 break
528 }
529 }
530 }
531
532 return leases
533}
534
535func (le *lessor) initAndRecover() {
536 tx := le.b.BatchTx()
537 tx.Lock()
538
539 tx.UnsafeCreateBucket(leaseBucketName)
540 _, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
541 // TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
542 for i := range vs {
543 var lpb leasepb.Lease
544 err := lpb.Unmarshal(vs[i])
545 if err != nil {
546 tx.Unlock()
547 panic("failed to unmarshal lease proto item")
548 }
549 ID := LeaseID(lpb.ID)
550 if lpb.TTL < le.minLeaseTTL {
551 lpb.TTL = le.minLeaseTTL
552 }
553 le.leaseMap[ID] = &Lease{
554 ID: ID,
555 ttl: lpb.TTL,
556 // itemSet will be filled in when recover key-value pairs
557 // set expiry to forever, refresh when promoted
558 itemSet: make(map[LeaseItem]struct{}),
559 expiry: forever,
560 revokec: make(chan struct{}),
561 }
562 }
563 tx.Unlock()
564
565 le.b.ForceCommit()
566}
567
// Lease is a lease granted by a lessor. It holds an ID, a TTL, an expiry
// deadline and the set of items (keys) attached to it.
type Lease struct {
	ID LeaseID
	// ttl is the time to live in seconds. It is set when the lease is
	// created (Grant, initAndRecover) and not modified afterwards in this
	// file, which is why it is read without a lock.
	ttl int64 // time to live in seconds
	// expiryMu protects concurrent accesses to expiry
	expiryMu sync.RWMutex
	// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
	expiry time.Time

	// mu protects concurrent accesses to itemSet
	mu      sync.RWMutex
	itemSet map[LeaseItem]struct{}
	// revokec is closed when the lease is revoked (see lessor.Revoke); Renew
	// waits on it for expired leases.
	revokec chan struct{}
}
581
582func (l *Lease) expired() bool {
583 return l.Remaining() <= 0
584}
585
586func (l *Lease) persistTo(b backend.Backend) {
587 key := int64ToBytes(int64(l.ID))
588
589 lpb := leasepb.Lease{ID: int64(l.ID), TTL: int64(l.ttl)}
590 val, err := lpb.Marshal()
591 if err != nil {
592 panic("failed to marshal lease proto item")
593 }
594
595 b.BatchTx().Lock()
596 b.BatchTx().UnsafePut(leaseBucketName, key, val)
597 b.BatchTx().Unlock()
598}
599
600// TTL returns the TTL of the Lease.
601func (l *Lease) TTL() int64 {
602 return l.ttl
603}
604
605// refresh refreshes the expiry of the lease.
606func (l *Lease) refresh(extend time.Duration) {
607 newExpiry := time.Now().Add(extend + time.Duration(l.ttl)*time.Second)
608 l.expiryMu.Lock()
609 defer l.expiryMu.Unlock()
610 l.expiry = newExpiry
611}
612
613// forever sets the expiry of lease to be forever.
614func (l *Lease) forever() {
615 l.expiryMu.Lock()
616 defer l.expiryMu.Unlock()
617 l.expiry = forever
618}
619
620// Keys returns all the keys attached to the lease.
621func (l *Lease) Keys() []string {
622 l.mu.RLock()
623 keys := make([]string, 0, len(l.itemSet))
624 for k := range l.itemSet {
625 keys = append(keys, k.Key)
626 }
627 l.mu.RUnlock()
628 return keys
629}
630
631// Remaining returns the remaining time of the lease.
632func (l *Lease) Remaining() time.Duration {
633 l.expiryMu.RLock()
634 defer l.expiryMu.RUnlock()
635 if l.expiry.IsZero() {
636 return time.Duration(math.MaxInt64)
637 }
638 return time.Until(l.expiry)
639}
640
// LeaseItem is an item that can be attached to a lease; it is identified by
// its key.
type LeaseItem struct {
	Key string
}
644
// int64ToBytes encodes n as 8 big-endian bytes (two's complement for negative
// values). Non-negative values therefore compare in numeric order as byte
// keys. Each call returns a freshly allocated slice.
func int64ToBytes(n int64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(n))
	return buf[:]
}
650
651// FakeLessor is a fake implementation of Lessor interface.
652// Used for testing only.
653type FakeLessor struct{}
654
655func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {}
656
657func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil }
658
659func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }
660
661func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil }
662
663func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID { return 0 }
664func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return nil }
665
666func (fl *FakeLessor) Promote(extend time.Duration) {}
667
668func (fl *FakeLessor) Demote() {}
669
670func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil }
671
672func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return nil }
673
674func (fl *FakeLessor) Leases() []*Lease { return nil }
675
676func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }
677
678func (fl *FakeLessor) Recover(b backend.Backend, rd RangeDeleter) {}
679
680func (fl *FakeLessor) Stop() {}