blob: cc20283167ea74f0b902675bd50028777c2a418d [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package lease
16
17import (
18 "container/heap"
19 "context"
20 "encoding/binary"
21 "errors"
22 "math"
23 "sort"
24 "sync"
25 "time"
26
27 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
28 "go.etcd.io/etcd/lease/leasepb"
29 "go.etcd.io/etcd/mvcc/backend"
30 "go.uber.org/zap"
31)
32
// NoLease is a special LeaseID representing the absence of a lease.
const NoLease = LeaseID(0)

// MaxLeaseTTL is the maximum lease TTL value, in seconds. Grant rejects any
// request whose TTL exceeds it with ErrLeaseTTLTooLarge.
const MaxLeaseTTL = 9000000000
38
var (
	// forever is the zero time.Time. A lease whose expiry is zero never
	// expires (see Lease.Remaining).
	forever = time.Time{}

	// leaseBucketName is the backend bucket in which leases are persisted.
	leaseBucketName = []byte("lease")

	// maximum number of leases to revoke per second; configurable for tests
	leaseRevokeRate = 1000

	// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
	leaseCheckpointRate = 1000

	// the default interval of lease checkpoint
	defaultLeaseCheckpointInterval = 5 * time.Minute

	// maximum number of lease checkpoints to batch into a single consensus log entry
	maxLeaseCheckpointBatchSize = 1000

	// the default interval to check if the expired lease is revoked
	defaultExpiredleaseRetryInterval = 3 * time.Second

	// ErrNotPrimary is returned by Renew when this lessor is not (or is no
	// longer) the primary lessor.
	ErrNotPrimary = errors.New("not a primary lessor")
	// ErrLeaseNotFound is returned when no lease exists for the given ID.
	// Grant also returns it when asked to grant NoLease.
	ErrLeaseNotFound = errors.New("lease not found")
	// ErrLeaseExists is returned by Grant when the requested ID is already in use.
	ErrLeaseExists = errors.New("lease already exists")
	// ErrLeaseTTLTooLarge is returned by Grant when the TTL exceeds MaxLeaseTTL.
	ErrLeaseTTLTooLarge = errors.New("too large lease TTL")
)
64
// TxnDelete is a TxnWrite that only permits deletes. Defined here
// to avoid circular dependency with mvcc.
type TxnDelete interface {
	// DeleteRange deletes the given key range. Revoke calls it once per
	// attached key with a nil end. The exact meaning of n and rev is defined
	// by the implementation (presumably the number of deleted keys and the
	// resulting revision; verify against mvcc).
	DeleteRange(key, end []byte) (n, rev int64)
	// End finishes the transaction. Revoke calls it after the lease record
	// has been deleted from the backend.
	End()
}
71
// RangeDeleter is a TxnDelete constructor. Revoke invokes it once per
// revocation to obtain the transaction used to delete the lease's keys.
type RangeDeleter func() TxnDelete
74
// Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to
// avoid circular dependency with mvcc.
// The lessor invokes it from Renew (to clear a checkpointed remaining TTL) and
// from its background loop (to submit due checkpoints).
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest)
78
// LeaseID identifies a lease. The zero value, NoLease, means "no lease".
type LeaseID int64
80
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
type Lessor interface {
	// SetRangeDeleter lets the lessor create TxnDeletes to the store.
	// Lessor deletes the items in the revoked or expired lease by creating
	// new TxnDeletes.
	SetRangeDeleter(rd RangeDeleter)

	// SetCheckpointer sets the Checkpointer the lessor uses to record lease
	// remaining TTLs to the consensus log.
	SetCheckpointer(cp Checkpointer)

	// Grant grants a lease that expires at least after TTL seconds.
	Grant(id LeaseID, ttl int64) (*Lease, error)
	// Revoke revokes a lease with given ID. The items attached to the
	// given lease will be removed. If the ID does not exist, an error
	// will be returned.
	Revoke(id LeaseID) error

	// Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set
	// the expiry of leases to less than the full TTL when possible.
	Checkpoint(id LeaseID, remainingTTL int64) error

	// Attach attaches given leaseItem to the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Attach(id LeaseID, items []LeaseItem) error

	// GetLease returns LeaseID for given item.
	// If no lease found, NoLease value will be returned.
	GetLease(item LeaseItem) LeaseID

	// Detach detaches given leaseItem from the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Detach(id LeaseID, items []LeaseItem) error

	// Promote promotes the lessor to be the primary lessor. Primary lessor manages
	// the expiration and renewal of leases.
	// A newly promoted lessor renews the TTL of every lease to extend + previous TTL.
	Promote(extend time.Duration)

	// Demote demotes the lessor from being the primary lessor.
	Demote()

	// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
	// an error will be returned.
	Renew(id LeaseID) (int64, error)

	// Lookup gives the lease at a given lease id, if any
	Lookup(id LeaseID) *Lease

	// Leases lists all leases.
	Leases() []*Lease

	// ExpiredLeasesC returns a chan that is used to receive expired leases.
	ExpiredLeasesC() <-chan []*Lease

	// Recover recovers the lessor state from the given backend and RangeDeleter.
	Recover(b backend.Backend, rd RangeDeleter)

	// Stop stops the lessor for managing leases. The behavior of calling Stop multiple
	// times is undefined.
	Stop()
}
141
// lessor implements Lessor interface.
// TODO: use clockwork for testability.
type lessor struct {
	// mu protects leaseMap, itemMap, demotec and the other mutable fields below.
	mu sync.RWMutex

	// demotec is set when the lessor is the primary.
	// demotec will be closed if the lessor is demoted.
	demotec chan struct{}

	// leaseMap holds every known lease, keyed by its ID.
	leaseMap map[LeaseID]*Lease
	// leaseExpiredNotifier tracks leases by expiry time so the next candidate
	// for expiration can be polled (type defined elsewhere in the package).
	leaseExpiredNotifier *LeaseExpiredNotifier
	// leaseCheckpointHeap holds the pending checkpoints; it is manipulated
	// with container/heap and its head is the next checkpoint that is due.
	leaseCheckpointHeap LeaseQueue
	// itemMap maps an attached item to the ID of the lease that owns it.
	itemMap map[LeaseItem]LeaseID

	// When a lease expires, the lessor will delete the
	// leased range (or key) by the RangeDeleter.
	rd RangeDeleter

	// When a lease's deadline should be persisted to preserve the remaining TTL across leader
	// elections and restarts, the lessor will checkpoint the lease by the Checkpointer.
	cp Checkpointer

	// backend to persist leases. We only persist lease ID and expiry for now.
	// The leased items can be recovered by iterating all the keys in kv.
	b backend.Backend

	// minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
	// requests for shorter TTLs are extended to the minimum TTL.
	minLeaseTTL int64

	// expiredC carries batches of expired leases to whoever consumes
	// ExpiredLeasesC.
	expiredC chan []*Lease
	// stopC is a channel whose closure indicates that the lessor should be stopped.
	stopC chan struct{}
	// doneC is a channel whose closure indicates that the lessor is stopped.
	doneC chan struct{}

	// lg is the optional logger; nil disables debug logging.
	lg *zap.Logger

	// Wait duration between lease checkpoints.
	checkpointInterval time.Duration
	// the interval to check if the expired lease is revoked
	expiredLeaseRetryInterval time.Duration
}
185
// LessorConfig carries the tunables accepted by NewLessor.
type LessorConfig struct {
	// MinLeaseTTL is the minimum TTL, in seconds, a lease may have; shorter
	// requests are extended to it.
	MinLeaseTTL int64
	// CheckpointInterval is the wait between lease checkpoints. Zero selects
	// the package default.
	CheckpointInterval time.Duration
	// ExpiredLeasesRetryInterval is how long to wait before re-checking that
	// an expired lease has been revoked. Zero selects the package default.
	ExpiredLeasesRetryInterval time.Duration
}
191
// NewLessor creates a Lessor backed by b. It recovers any leases already
// persisted in the backend and starts the lessor's background loop; call Stop
// to terminate it.
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
	return newLessor(lg, b, cfg)
}
195
196func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
197 checkpointInterval := cfg.CheckpointInterval
198 expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
199 if checkpointInterval == 0 {
200 checkpointInterval = defaultLeaseCheckpointInterval
201 }
202 if expiredLeaseRetryInterval == 0 {
203 expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval
204 }
205 l := &lessor{
206 leaseMap: make(map[LeaseID]*Lease),
207 itemMap: make(map[LeaseItem]LeaseID),
208 leaseExpiredNotifier: newLeaseExpiredNotifier(),
209 leaseCheckpointHeap: make(LeaseQueue, 0),
210 b: b,
211 minLeaseTTL: cfg.MinLeaseTTL,
212 checkpointInterval: checkpointInterval,
213 expiredLeaseRetryInterval: expiredLeaseRetryInterval,
214 // expiredC is a small buffered chan to avoid unnecessary blocking.
215 expiredC: make(chan []*Lease, 16),
216 stopC: make(chan struct{}),
217 doneC: make(chan struct{}),
218 lg: lg,
219 }
220 l.initAndRecover()
221
222 go l.runLoop()
223
224 return l
225}
226
// isPrimary indicates if this lessor is the primary lessor. The primary
// lessor manages lease expiration and renewal.
//
// In etcd, the raft leader is the primary. Thus there might be two primary
// leaders at the same time (raft allows concurrent leaders but with different terms)
// for at most a leader election timeout.
// The old primary leader cannot affect the correctness since its proposal has a
// smaller term and will not be committed.
//
// TODO: raft followers do not forward lease management proposals. There might be a
// very small window (normally within a second, depending on go scheduling) in which
// a raft follower is the primary between the raft leader demotion and lessor demotion.
// Usually this should not be a problem. Lease should not be that sensitive to timing.
//
// It reads le.demotec, so callers are expected to hold le.mu.
func (le *lessor) isPrimary() bool {
	return le.demotec != nil
}
243
// SetRangeDeleter installs the RangeDeleter that Revoke uses to delete the
// keys attached to a lease. It takes le.mu for writing.
func (le *lessor) SetRangeDeleter(rd RangeDeleter) {
	le.mu.Lock()
	defer le.mu.Unlock()

	le.rd = rd
}
250
// SetCheckpointer installs the Checkpointer used to submit lease checkpoints.
// While it is nil no checkpoints are scheduled or submitted. It takes le.mu
// for writing.
func (le *lessor) SetCheckpointer(cp Checkpointer) {
	le.mu.Lock()
	defer le.mu.Unlock()

	le.cp = cp
}
257
258func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
259 if id == NoLease {
260 return nil, ErrLeaseNotFound
261 }
262
263 if ttl > MaxLeaseTTL {
264 return nil, ErrLeaseTTLTooLarge
265 }
266
267 // TODO: when lessor is under high load, it should give out lease
268 // with longer TTL to reduce renew load.
269 l := &Lease{
270 ID: id,
271 ttl: ttl,
272 itemSet: make(map[LeaseItem]struct{}),
273 revokec: make(chan struct{}),
274 }
275
276 le.mu.Lock()
277 defer le.mu.Unlock()
278
279 if _, ok := le.leaseMap[id]; ok {
280 return nil, ErrLeaseExists
281 }
282
283 if l.ttl < le.minLeaseTTL {
284 l.ttl = le.minLeaseTTL
285 }
286
287 if le.isPrimary() {
288 l.refresh(0)
289 } else {
290 l.forever()
291 }
292
293 le.leaseMap[id] = l
294 item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
295 le.leaseExpiredNotifier.RegisterOrUpdate(item)
296 l.persistTo(le.b)
297
298 leaseTotalTTLs.Observe(float64(l.ttl))
299 leaseGranted.Inc()
300
301 if le.isPrimary() {
302 le.scheduleCheckpointIfNeeded(l)
303 }
304
305 return l, nil
306}
307
308func (le *lessor) Revoke(id LeaseID) error {
309 le.mu.Lock()
310
311 l := le.leaseMap[id]
312 if l == nil {
313 le.mu.Unlock()
314 return ErrLeaseNotFound
315 }
316 defer close(l.revokec)
317 // unlock before doing external work
318 le.mu.Unlock()
319
320 if le.rd == nil {
321 return nil
322 }
323
324 txn := le.rd()
325
326 // sort keys so deletes are in same order among all members,
327 // otherwise the backend hashes will be different
328 keys := l.Keys()
329 sort.StringSlice(keys).Sort()
330 for _, key := range keys {
331 txn.DeleteRange([]byte(key), nil)
332 }
333
334 le.mu.Lock()
335 defer le.mu.Unlock()
336 delete(le.leaseMap, l.ID)
337 // lease deletion needs to be in the same backend transaction with the
338 // kv deletion. Or we might end up with not executing the revoke or not
339 // deleting the keys if etcdserver fails in between.
340 le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
341
342 txn.End()
343
344 leaseRevoked.Inc()
345 return nil
346}
347
348func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
349 le.mu.Lock()
350 defer le.mu.Unlock()
351
352 if l, ok := le.leaseMap[id]; ok {
353 // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
354 l.remainingTTL = remainingTTL
355 if le.isPrimary() {
356 // schedule the next checkpoint as needed
357 le.scheduleCheckpointIfNeeded(l)
358 }
359 }
360 return nil
361}
362
// Renew renews an existing lease and returns its TTL in seconds. On failure
// it returns -1 and an error:
//   - ErrNotPrimary if this lessor is not the primary, or is demoted or
//     stopped while waiting on an expired lease;
//   - ErrLeaseNotFound if the lease does not exist, or had already expired
//     and its revocation completes while waiting.
//
// An already expired lease is never renewed: the call blocks until the lease
// is revoked, the lessor is demoted, or the lessor is stopped.
func (le *lessor) Renew(id LeaseID) (int64, error) {
	le.mu.RLock()
	if !le.isPrimary() {
		// Not the primary: report ErrNotPrimary so that the caller can
		// forward the renew request to the primary.
		le.mu.RUnlock()
		return -1, ErrNotPrimary
	}

	// Capture the current demote channel under the lock; it is waited on
	// below after the lock has been released.
	demotec := le.demotec

	l := le.leaseMap[id]
	if l == nil {
		le.mu.RUnlock()
		return -1, ErrLeaseNotFound
	}
	// Clear remaining TTL when we renew if it is set
	clearRemainingTTL := le.cp != nil && l.remainingTTL > 0

	le.mu.RUnlock()
	if l.expired() {
		select {
		// A expired lease might be pending for revoking or going through
		// quorum to be revoked. To be accurate, renew request must wait for the
		// deletion to complete.
		case <-l.revokec:
			return -1, ErrLeaseNotFound
		// The expired lease might fail to be revoked if the primary changes.
		// The caller will retry on ErrNotPrimary.
		case <-demotec:
			return -1, ErrNotPrimary
		case <-le.stopC:
			return -1, ErrNotPrimary
		}
	}

	// Clear remaining TTL when we renew if it is set
	// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
	// of RAFT entries written per lease to a max of 2 per checkpoint interval.
	if clearRemainingTTL {
		le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
	}

	// Re-acquire the lock for writing to push the expiry out and update the
	// lease's entry in the expiry notifier.
	le.mu.Lock()
	l.refresh(0)
	item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
	le.leaseExpiredNotifier.RegisterOrUpdate(item)
	le.mu.Unlock()

	leaseRenewed.Inc()
	return l.ttl, nil
}
416
417func (le *lessor) Lookup(id LeaseID) *Lease {
418 le.mu.RLock()
419 defer le.mu.RUnlock()
420 return le.leaseMap[id]
421}
422
423func (le *lessor) unsafeLeases() []*Lease {
424 leases := make([]*Lease, 0, len(le.leaseMap))
425 for _, l := range le.leaseMap {
426 leases = append(leases, l)
427 }
428 return leases
429}
430
431func (le *lessor) Leases() []*Lease {
432 le.mu.RLock()
433 ls := le.unsafeLeases()
434 le.mu.RUnlock()
435 sort.Sort(leasesByExpiry(ls))
436 return ls
437}
438
439func (le *lessor) Promote(extend time.Duration) {
440 le.mu.Lock()
441 defer le.mu.Unlock()
442
443 le.demotec = make(chan struct{})
444
445 // refresh the expiries of all leases.
446 for _, l := range le.leaseMap {
447 l.refresh(extend)
448 item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
449 le.leaseExpiredNotifier.RegisterOrUpdate(item)
450 }
451
452 if len(le.leaseMap) < leaseRevokeRate {
453 // no possibility of lease pile-up
454 return
455 }
456
457 // adjust expiries in case of overlap
458 leases := le.unsafeLeases()
459 sort.Sort(leasesByExpiry(leases))
460
461 baseWindow := leases[0].Remaining()
462 nextWindow := baseWindow + time.Second
463 expires := 0
464 // have fewer expires than the total revoke rate so piled up leases
465 // don't consume the entire revoke limit
466 targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
467 for _, l := range leases {
468 remaining := l.Remaining()
469 if remaining > nextWindow {
470 baseWindow = remaining
471 nextWindow = baseWindow + time.Second
472 expires = 1
473 continue
474 }
475 expires++
476 if expires <= targetExpiresPerSecond {
477 continue
478 }
479 rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
480 // If leases are extended by n seconds, leases n seconds ahead of the
481 // base window should be extended by only one second.
482 rateDelay -= float64(remaining - baseWindow)
483 delay := time.Duration(rateDelay)
484 nextWindow = baseWindow + delay
485 l.refresh(delay + extend)
486 item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
487 le.leaseExpiredNotifier.RegisterOrUpdate(item)
488 le.scheduleCheckpointIfNeeded(l)
489 }
490}
491
492type leasesByExpiry []*Lease
493
494func (le leasesByExpiry) Len() int { return len(le) }
495func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() }
496func (le leasesByExpiry) Swap(i, j int) { le[i], le[j] = le[j], le[i] }
497
498func (le *lessor) Demote() {
499 le.mu.Lock()
500 defer le.mu.Unlock()
501
502 // set the expiries of all leases to forever
503 for _, l := range le.leaseMap {
504 l.forever()
505 }
506
507 le.clearScheduledLeasesCheckpoints()
508
509 if le.demotec != nil {
510 close(le.demotec)
511 le.demotec = nil
512 }
513}
514
515// Attach attaches items to the lease with given ID. When the lease
516// expires, the attached items will be automatically removed.
517// If the given lease does not exist, an error will be returned.
518func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
519 le.mu.Lock()
520 defer le.mu.Unlock()
521
522 l := le.leaseMap[id]
523 if l == nil {
524 return ErrLeaseNotFound
525 }
526
527 l.mu.Lock()
528 for _, it := range items {
529 l.itemSet[it] = struct{}{}
530 le.itemMap[it] = id
531 }
532 l.mu.Unlock()
533 return nil
534}
535
536func (le *lessor) GetLease(item LeaseItem) LeaseID {
537 le.mu.RLock()
538 id := le.itemMap[item]
539 le.mu.RUnlock()
540 return id
541}
542
543// Detach detaches items from the lease with given ID.
544// If the given lease does not exist, an error will be returned.
545func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
546 le.mu.Lock()
547 defer le.mu.Unlock()
548
549 l := le.leaseMap[id]
550 if l == nil {
551 return ErrLeaseNotFound
552 }
553
554 l.mu.Lock()
555 for _, it := range items {
556 delete(l.itemSet, it)
557 delete(le.itemMap, it)
558 }
559 l.mu.Unlock()
560 return nil
561}
562
563func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) {
564 le.mu.Lock()
565 defer le.mu.Unlock()
566
567 le.b = b
568 le.rd = rd
569 le.leaseMap = make(map[LeaseID]*Lease)
570 le.itemMap = make(map[LeaseItem]LeaseID)
571 le.initAndRecover()
572}
573
// ExpiredLeasesC returns the receive side of the channel on which the lessor
// publishes batches of expired leases that should be revoked.
func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
	return le.expiredC
}
577
// Stop signals the background loop to exit by closing stopC and blocks until
// the loop has closed doneC. Because it closes stopC, it must not be called
// more than once.
func (le *lessor) Stop() {
	close(le.stopC)
	<-le.doneC
}
582
583func (le *lessor) runLoop() {
584 defer close(le.doneC)
585
586 for {
587 le.revokeExpiredLeases()
588 le.checkpointScheduledLeases()
589
590 select {
591 case <-time.After(500 * time.Millisecond):
592 case <-le.stopC:
593 return
594 }
595 }
596}
597
598// revokeExpiredLeases finds all leases past their expiry and sends them to expired channel for
599// to be revoked.
600func (le *lessor) revokeExpiredLeases() {
601 var ls []*Lease
602
603 // rate limit
604 revokeLimit := leaseRevokeRate / 2
605
606 le.mu.RLock()
607 if le.isPrimary() {
608 ls = le.findExpiredLeases(revokeLimit)
609 }
610 le.mu.RUnlock()
611
612 if len(ls) != 0 {
613 select {
614 case <-le.stopC:
615 return
616 case le.expiredC <- ls:
617 default:
618 // the receiver of expiredC is probably busy handling
619 // other stuff
620 // let's try this next time after 500ms
621 }
622 }
623}
624
625// checkpointScheduledLeases finds all scheduled lease checkpoints that are due and
626// submits them to the checkpointer to persist them to the consensus log.
627func (le *lessor) checkpointScheduledLeases() {
628 var cps []*pb.LeaseCheckpoint
629
630 // rate limit
631 for i := 0; i < leaseCheckpointRate/2; i++ {
632 le.mu.Lock()
633 if le.isPrimary() {
634 cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
635 }
636 le.mu.Unlock()
637
638 if len(cps) != 0 {
639 le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
640 }
641 if len(cps) < maxLeaseCheckpointBatchSize {
642 return
643 }
644 }
645}
646
// clearScheduledLeasesCheckpoints drops every pending checkpoint by replacing
// the checkpoint heap with an empty one. It does no locking; Demote calls it
// with le.mu held.
func (le *lessor) clearScheduledLeasesCheckpoints() {
	le.leaseCheckpointHeap = make(LeaseQueue, 0)
}
650
651// expireExists returns true if expiry items exist.
652// It pops only when expiry item exists.
653// "next" is true, to indicate that it may exist in next attempt.
654func (le *lessor) expireExists() (l *Lease, ok bool, next bool) {
655 if le.leaseExpiredNotifier.Len() == 0 {
656 return nil, false, false
657 }
658
659 item := le.leaseExpiredNotifier.Poll()
660 l = le.leaseMap[item.id]
661 if l == nil {
662 // lease has expired or been revoked
663 // no need to revoke (nothing is expiry)
664 le.leaseExpiredNotifier.Unregister() // O(log N)
665 return nil, false, true
666 }
667 now := time.Now()
668 if now.UnixNano() < item.time /* expiration time */ {
669 // Candidate expirations are caught up, reinsert this item
670 // and no need to revoke (nothing is expiry)
671 return l, false, false
672 }
673
674 // recheck if revoke is complete after retry interval
675 item.time = now.Add(le.expiredLeaseRetryInterval).UnixNano()
676 le.leaseExpiredNotifier.RegisterOrUpdate(item)
677 return l, true, false
678}
679
680// findExpiredLeases loops leases in the leaseMap until reaching expired limit
681// and returns the expired leases that needed to be revoked.
682func (le *lessor) findExpiredLeases(limit int) []*Lease {
683 leases := make([]*Lease, 0, 16)
684
685 for {
686 l, ok, next := le.expireExists()
687 if !ok && !next {
688 break
689 }
690 if !ok {
691 continue
692 }
693 if next {
694 continue
695 }
696
697 if l.expired() {
698 leases = append(leases, l)
699
700 // reach expired limit
701 if len(leases) == limit {
702 break
703 }
704 }
705 }
706
707 return leases
708}
709
710func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) {
711 if le.cp == nil {
712 return
713 }
714
715 if lease.RemainingTTL() > int64(le.checkpointInterval.Seconds()) {
716 if le.lg != nil {
717 le.lg.Debug("Scheduling lease checkpoint",
718 zap.Int64("leaseID", int64(lease.ID)),
719 zap.Duration("intervalSeconds", le.checkpointInterval),
720 )
721 }
722 heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{
723 id: lease.ID,
724 time: time.Now().Add(le.checkpointInterval).UnixNano(),
725 })
726 }
727}
728
729func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCheckpoint {
730 if le.cp == nil {
731 return nil
732 }
733
734 now := time.Now()
735 cps := []*pb.LeaseCheckpoint{}
736 for le.leaseCheckpointHeap.Len() > 0 && len(cps) < checkpointLimit {
737 lt := le.leaseCheckpointHeap[0]
738 if lt.time /* next checkpoint time */ > now.UnixNano() {
739 return cps
740 }
741 heap.Pop(&le.leaseCheckpointHeap)
742 var l *Lease
743 var ok bool
744 if l, ok = le.leaseMap[lt.id]; !ok {
745 continue
746 }
747 if !now.Before(l.expiry) {
748 continue
749 }
750 remainingTTL := int64(math.Ceil(l.expiry.Sub(now).Seconds()))
751 if remainingTTL >= l.ttl {
752 continue
753 }
754 if le.lg != nil {
755 le.lg.Debug("Checkpointing lease",
756 zap.Int64("leaseID", int64(lt.id)),
757 zap.Int64("remainingTTL", remainingTTL),
758 )
759 }
760 cps = append(cps, &pb.LeaseCheckpoint{ID: int64(lt.id), Remaining_TTL: remainingTTL})
761 }
762 return cps
763}
764
765func (le *lessor) initAndRecover() {
766 tx := le.b.BatchTx()
767 tx.Lock()
768
769 tx.UnsafeCreateBucket(leaseBucketName)
770 _, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
771 // TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
772 for i := range vs {
773 var lpb leasepb.Lease
774 err := lpb.Unmarshal(vs[i])
775 if err != nil {
776 tx.Unlock()
777 panic("failed to unmarshal lease proto item")
778 }
779 ID := LeaseID(lpb.ID)
780 if lpb.TTL < le.minLeaseTTL {
781 lpb.TTL = le.minLeaseTTL
782 }
783 le.leaseMap[ID] = &Lease{
784 ID: ID,
785 ttl: lpb.TTL,
786 // itemSet will be filled in when recover key-value pairs
787 // set expiry to forever, refresh when promoted
788 itemSet: make(map[LeaseItem]struct{}),
789 expiry: forever,
790 revokec: make(chan struct{}),
791 }
792 }
793 le.leaseExpiredNotifier.Init()
794 heap.Init(&le.leaseCheckpointHeap)
795 tx.Unlock()
796
797 le.b.ForceCommit()
798}
799
// Lease is a single lease: an ID, a TTL, an expiry and the set of items
// attached to it.
type Lease struct {
	// ID is the identifier of the lease.
	ID           LeaseID
	ttl          int64 // time to live of the lease in seconds
	remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
	// expiryMu protects concurrent accesses to expiry
	expiryMu sync.RWMutex
	// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
	expiry time.Time

	// mu protects concurrent accesses to itemSet
	mu sync.RWMutex
	// itemSet is the set of items attached to the lease.
	itemSet map[LeaseItem]struct{}
	// revokec is closed by the lessor's Revoke when it returns; Renew waits
	// on it for expired leases.
	revokec chan struct{}
}
814
// expired reports whether the lease has no time remaining. A lease whose
// expiry is unset (forever) is never expired.
func (l *Lease) expired() bool {
	return l.Remaining() <= 0
}
818
819func (l *Lease) persistTo(b backend.Backend) {
820 key := int64ToBytes(int64(l.ID))
821
822 lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
823 val, err := lpb.Marshal()
824 if err != nil {
825 panic("failed to marshal lease proto item")
826 }
827
828 b.BatchTx().Lock()
829 b.BatchTx().UnsafePut(leaseBucketName, key, val)
830 b.BatchTx().Unlock()
831}
832
// TTL returns the TTL of the Lease, in seconds.
func (l *Lease) TTL() int64 {
	return l.ttl
}
837
838// RemainingTTL returns the last checkpointed remaining TTL of the lease.
839// TODO(jpbetz): do not expose this utility method
840func (l *Lease) RemainingTTL() int64 {
841 if l.remainingTTL > 0 {
842 return l.remainingTTL
843 }
844 return l.ttl
845}
846
847// refresh refreshes the expiry of the lease.
848func (l *Lease) refresh(extend time.Duration) {
849 newExpiry := time.Now().Add(extend + time.Duration(l.RemainingTTL())*time.Second)
850 l.expiryMu.Lock()
851 defer l.expiryMu.Unlock()
852 l.expiry = newExpiry
853}
854
855// forever sets the expiry of lease to be forever.
856func (l *Lease) forever() {
857 l.expiryMu.Lock()
858 defer l.expiryMu.Unlock()
859 l.expiry = forever
860}
861
862// Keys returns all the keys attached to the lease.
863func (l *Lease) Keys() []string {
864 l.mu.RLock()
865 keys := make([]string, 0, len(l.itemSet))
866 for k := range l.itemSet {
867 keys = append(keys, k.Key)
868 }
869 l.mu.RUnlock()
870 return keys
871}
872
873// Remaining returns the remaining time of the lease.
874func (l *Lease) Remaining() time.Duration {
875 l.expiryMu.RLock()
876 defer l.expiryMu.RUnlock()
877 if l.expiry.IsZero() {
878 return time.Duration(math.MaxInt64)
879 }
880 return time.Until(l.expiry)
881}
882
// LeaseItem is an item that can be attached to a lease, identified by its key.
type LeaseItem struct {
	// Key is the key of the attached item.
	Key string
}
886
// int64ToBytes encodes n as 8 bytes in big-endian order.
func int64ToBytes(n int64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(n))
	return buf[:]
}
892
// FakeLessor is a fake implementation of Lessor interface.
// Used for testing only. Every method is a no-op that returns zero values,
// except Renew, which always reports a TTL of 10.
type FakeLessor struct{}

// SetRangeDeleter does nothing.
func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {}

// SetCheckpointer does nothing.
func (fl *FakeLessor) SetCheckpointer(cp Checkpointer) {}

// Grant does nothing and returns a nil lease and a nil error.
func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil }

// Revoke does nothing and returns nil.
func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }

// Checkpoint does nothing and returns nil.
func (fl *FakeLessor) Checkpoint(id LeaseID, remainingTTL int64) error { return nil }

// Attach does nothing and returns nil.
func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil }

// GetLease always returns 0 (NoLease).
func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID            { return 0 }
// Detach does nothing and returns nil.
func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return nil }

// Promote does nothing.
func (fl *FakeLessor) Promote(extend time.Duration) {}

// Demote does nothing.
func (fl *FakeLessor) Demote() {}

// Renew always returns a TTL of 10 and a nil error.
func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil }

// Lookup always returns nil.
func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return nil }

// Leases always returns nil.
func (fl *FakeLessor) Leases() []*Lease { return nil }

// ExpiredLeasesC returns a nil channel, so receiving from it blocks forever.
func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }

// Recover does nothing.
func (fl *FakeLessor) Recover(b backend.Backend, rd RangeDeleter) {}

// Stop does nothing.
func (fl *FakeLessor) Stop() {}
927
// FakeTxnDelete is a fake TxnDelete built on an embedded backend.BatchTx.
// Presumably for tests only, like FakeLessor.
type FakeTxnDelete struct {
	backend.BatchTx
}

// DeleteRange deletes nothing and returns zeros.
func (ftd *FakeTxnDelete) DeleteRange(key, end []byte) (n, rev int64) { return 0, 0 }

// End unlocks the embedded BatchTx; the caller is expected to have locked it.
func (ftd *FakeTxnDelete) End()                                       { ftd.Unlock() }