// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| 14 | |
| 15 | package lease |
| 16 | |
| 17 | import ( |
| 18 | "encoding/binary" |
| 19 | "errors" |
| 20 | "math" |
| 21 | "sort" |
| 22 | "sync" |
| 23 | "time" |
| 24 | |
| 25 | "github.com/coreos/etcd/lease/leasepb" |
| 26 | "github.com/coreos/etcd/mvcc/backend" |
| 27 | ) |
| 28 | |
// LeaseID is the identifier of a lease.
type LeaseID int64

const (
	// NoLease is the sentinel LeaseID meaning "no lease".
	NoLease = LeaseID(0)

	// MaxLeaseTTL is the largest TTL a lease may be granted with; Grant
	// rejects anything above it with ErrLeaseTTLTooLarge.
	MaxLeaseTTL = 9000000000
)

var (
	// forever is the zero time; a lease whose expiry equals it never expires.
	forever time.Time

	// leaseBucketName is the backend bucket in which leases are stored.
	leaseBucketName = []byte("lease")

	// leaseRevokeRate is the maximum number of leases to revoke per second.
	// It is a variable rather than a constant so that tests can adjust it.
	leaseRevokeRate = 1000

	// ErrNotPrimary is returned when an operation that only the primary
	// lessor may perform is attempted on a non-primary lessor.
	ErrNotPrimary = errors.New("not a primary lessor")
	// ErrLeaseNotFound is returned when no lease matches the given ID.
	ErrLeaseNotFound = errors.New("lease not found")
	// ErrLeaseExists is returned when granting a lease whose ID is taken.
	ErrLeaseExists = errors.New("lease already exists")
	// ErrLeaseTTLTooLarge is returned when the requested TTL exceeds
	// MaxLeaseTTL.
	ErrLeaseTTLTooLarge = errors.New("too large lease TTL")
)

// TxnDelete is a write transaction restricted to deletions. It is declared
// in this package so that lease does not have to import mvcc, which would
// create an import cycle.
type TxnDelete interface {
	DeleteRange(key, end []byte) (n, rev int64)
	End()
}

// RangeDeleter creates a new TxnDelete.
type RangeDeleter func() TxnDelete
| 60 | |
| 61 | // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee. |
| 62 | type Lessor interface { |
| 63 | // SetRangeDeleter lets the lessor create TxnDeletes to the store. |
| 64 | // Lessor deletes the items in the revoked or expired lease by creating |
| 65 | // new TxnDeletes. |
| 66 | SetRangeDeleter(rd RangeDeleter) |
| 67 | |
| 68 | // Grant grants a lease that expires at least after TTL seconds. |
| 69 | Grant(id LeaseID, ttl int64) (*Lease, error) |
| 70 | // Revoke revokes a lease with given ID. The item attached to the |
| 71 | // given lease will be removed. If the ID does not exist, an error |
| 72 | // will be returned. |
| 73 | Revoke(id LeaseID) error |
| 74 | |
| 75 | // Attach attaches given leaseItem to the lease with given LeaseID. |
| 76 | // If the lease does not exist, an error will be returned. |
| 77 | Attach(id LeaseID, items []LeaseItem) error |
| 78 | |
| 79 | // GetLease returns LeaseID for given item. |
| 80 | // If no lease found, NoLease value will be returned. |
| 81 | GetLease(item LeaseItem) LeaseID |
| 82 | |
| 83 | // Detach detaches given leaseItem from the lease with given LeaseID. |
| 84 | // If the lease does not exist, an error will be returned. |
| 85 | Detach(id LeaseID, items []LeaseItem) error |
| 86 | |
| 87 | // Promote promotes the lessor to be the primary lessor. Primary lessor manages |
| 88 | // the expiration and renew of leases. |
| 89 | // Newly promoted lessor renew the TTL of all lease to extend + previous TTL. |
| 90 | Promote(extend time.Duration) |
| 91 | |
| 92 | // Demote demotes the lessor from being the primary lessor. |
| 93 | Demote() |
| 94 | |
| 95 | // Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist, |
| 96 | // an error will be returned. |
| 97 | Renew(id LeaseID) (int64, error) |
| 98 | |
| 99 | // Lookup gives the lease at a given lease id, if any |
| 100 | Lookup(id LeaseID) *Lease |
| 101 | |
| 102 | // Leases lists all leases. |
| 103 | Leases() []*Lease |
| 104 | |
| 105 | // ExpiredLeasesC returns a chan that is used to receive expired leases. |
| 106 | ExpiredLeasesC() <-chan []*Lease |
| 107 | |
| 108 | // Recover recovers the lessor state from the given backend and RangeDeleter. |
| 109 | Recover(b backend.Backend, rd RangeDeleter) |
| 110 | |
| 111 | // Stop stops the lessor for managing leases. The behavior of calling Stop multiple |
| 112 | // times is undefined. |
| 113 | Stop() |
| 114 | } |
| 115 | |
| 116 | // lessor implements Lessor interface. |
| 117 | // TODO: use clockwork for testability. |
| 118 | type lessor struct { |
| 119 | mu sync.Mutex |
| 120 | |
| 121 | // demotec is set when the lessor is the primary. |
| 122 | // demotec will be closed if the lessor is demoted. |
| 123 | demotec chan struct{} |
| 124 | |
| 125 | // TODO: probably this should be a heap with a secondary |
| 126 | // id index. |
| 127 | // Now it is O(N) to loop over the leases to find expired ones. |
| 128 | // We want to make Grant, Revoke, and findExpiredLeases all O(logN) and |
| 129 | // Renew O(1). |
| 130 | // findExpiredLeases and Renew should be the most frequent operations. |
| 131 | leaseMap map[LeaseID]*Lease |
| 132 | |
| 133 | itemMap map[LeaseItem]LeaseID |
| 134 | |
| 135 | // When a lease expires, the lessor will delete the |
| 136 | // leased range (or key) by the RangeDeleter. |
| 137 | rd RangeDeleter |
| 138 | |
| 139 | // backend to persist leases. We only persist lease ID and expiry for now. |
| 140 | // The leased items can be recovered by iterating all the keys in kv. |
| 141 | b backend.Backend |
| 142 | |
| 143 | // minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any |
| 144 | // requests for shorter TTLs are extended to the minimum TTL. |
| 145 | minLeaseTTL int64 |
| 146 | |
| 147 | expiredC chan []*Lease |
| 148 | // stopC is a channel whose closure indicates that the lessor should be stopped. |
| 149 | stopC chan struct{} |
| 150 | // doneC is a channel whose closure indicates that the lessor is stopped. |
| 151 | doneC chan struct{} |
| 152 | } |
| 153 | |
| 154 | func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor { |
| 155 | return newLessor(b, minLeaseTTL) |
| 156 | } |
| 157 | |
| 158 | func newLessor(b backend.Backend, minLeaseTTL int64) *lessor { |
| 159 | l := &lessor{ |
| 160 | leaseMap: make(map[LeaseID]*Lease), |
| 161 | itemMap: make(map[LeaseItem]LeaseID), |
| 162 | b: b, |
| 163 | minLeaseTTL: minLeaseTTL, |
| 164 | // expiredC is a small buffered chan to avoid unnecessary blocking. |
| 165 | expiredC: make(chan []*Lease, 16), |
| 166 | stopC: make(chan struct{}), |
| 167 | doneC: make(chan struct{}), |
| 168 | } |
| 169 | l.initAndRecover() |
| 170 | |
| 171 | go l.runLoop() |
| 172 | |
| 173 | return l |
| 174 | } |
| 175 | |
| 176 | // isPrimary indicates if this lessor is the primary lessor. The primary |
| 177 | // lessor manages lease expiration and renew. |
| 178 | // |
| 179 | // in etcd, raft leader is the primary. Thus there might be two primary |
| 180 | // leaders at the same time (raft allows concurrent leader but with different term) |
| 181 | // for at most a leader election timeout. |
| 182 | // The old primary leader cannot affect the correctness since its proposal has a |
| 183 | // smaller term and will not be committed. |
| 184 | // |
| 185 | // TODO: raft follower do not forward lease management proposals. There might be a |
| 186 | // very small window (within second normally which depends on go scheduling) that |
| 187 | // a raft follow is the primary between the raft leader demotion and lessor demotion. |
| 188 | // Usually this should not be a problem. Lease should not be that sensitive to timing. |
| 189 | func (le *lessor) isPrimary() bool { |
| 190 | return le.demotec != nil |
| 191 | } |
| 192 | |
| 193 | func (le *lessor) SetRangeDeleter(rd RangeDeleter) { |
| 194 | le.mu.Lock() |
| 195 | defer le.mu.Unlock() |
| 196 | |
| 197 | le.rd = rd |
| 198 | } |
| 199 | |
| 200 | func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { |
| 201 | if id == NoLease { |
| 202 | return nil, ErrLeaseNotFound |
| 203 | } |
| 204 | |
| 205 | if ttl > MaxLeaseTTL { |
| 206 | return nil, ErrLeaseTTLTooLarge |
| 207 | } |
| 208 | |
| 209 | // TODO: when lessor is under high load, it should give out lease |
| 210 | // with longer TTL to reduce renew load. |
| 211 | l := &Lease{ |
| 212 | ID: id, |
| 213 | ttl: ttl, |
| 214 | itemSet: make(map[LeaseItem]struct{}), |
| 215 | revokec: make(chan struct{}), |
| 216 | } |
| 217 | |
| 218 | le.mu.Lock() |
| 219 | defer le.mu.Unlock() |
| 220 | |
| 221 | if _, ok := le.leaseMap[id]; ok { |
| 222 | return nil, ErrLeaseExists |
| 223 | } |
| 224 | |
| 225 | if l.ttl < le.minLeaseTTL { |
| 226 | l.ttl = le.minLeaseTTL |
| 227 | } |
| 228 | |
| 229 | if le.isPrimary() { |
| 230 | l.refresh(0) |
| 231 | } else { |
| 232 | l.forever() |
| 233 | } |
| 234 | |
| 235 | le.leaseMap[id] = l |
| 236 | l.persistTo(le.b) |
| 237 | |
| 238 | return l, nil |
| 239 | } |
| 240 | |
| 241 | func (le *lessor) Revoke(id LeaseID) error { |
| 242 | le.mu.Lock() |
| 243 | |
| 244 | l := le.leaseMap[id] |
| 245 | if l == nil { |
| 246 | le.mu.Unlock() |
| 247 | return ErrLeaseNotFound |
| 248 | } |
| 249 | defer close(l.revokec) |
| 250 | // unlock before doing external work |
| 251 | le.mu.Unlock() |
| 252 | |
| 253 | if le.rd == nil { |
| 254 | return nil |
| 255 | } |
| 256 | |
| 257 | txn := le.rd() |
| 258 | |
| 259 | // sort keys so deletes are in same order among all members, |
| 260 | // otherwise the backened hashes will be different |
| 261 | keys := l.Keys() |
| 262 | sort.StringSlice(keys).Sort() |
| 263 | for _, key := range keys { |
| 264 | txn.DeleteRange([]byte(key), nil) |
| 265 | } |
| 266 | |
| 267 | le.mu.Lock() |
| 268 | defer le.mu.Unlock() |
| 269 | delete(le.leaseMap, l.ID) |
| 270 | // lease deletion needs to be in the same backend transaction with the |
| 271 | // kv deletion. Or we might end up with not executing the revoke or not |
| 272 | // deleting the keys if etcdserver fails in between. |
| 273 | le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) |
| 274 | |
| 275 | txn.End() |
| 276 | return nil |
| 277 | } |
| 278 | |
| 279 | // Renew renews an existing lease. If the given lease does not exist or |
| 280 | // has expired, an error will be returned. |
| 281 | func (le *lessor) Renew(id LeaseID) (int64, error) { |
| 282 | le.mu.Lock() |
| 283 | |
| 284 | unlock := func() { le.mu.Unlock() } |
| 285 | defer func() { unlock() }() |
| 286 | |
| 287 | if !le.isPrimary() { |
| 288 | // forward renew request to primary instead of returning error. |
| 289 | return -1, ErrNotPrimary |
| 290 | } |
| 291 | |
| 292 | demotec := le.demotec |
| 293 | |
| 294 | l := le.leaseMap[id] |
| 295 | if l == nil { |
| 296 | return -1, ErrLeaseNotFound |
| 297 | } |
| 298 | |
| 299 | if l.expired() { |
| 300 | le.mu.Unlock() |
| 301 | unlock = func() {} |
| 302 | select { |
| 303 | // A expired lease might be pending for revoking or going through |
| 304 | // quorum to be revoked. To be accurate, renew request must wait for the |
| 305 | // deletion to complete. |
| 306 | case <-l.revokec: |
| 307 | return -1, ErrLeaseNotFound |
| 308 | // The expired lease might fail to be revoked if the primary changes. |
| 309 | // The caller will retry on ErrNotPrimary. |
| 310 | case <-demotec: |
| 311 | return -1, ErrNotPrimary |
| 312 | case <-le.stopC: |
| 313 | return -1, ErrNotPrimary |
| 314 | } |
| 315 | } |
| 316 | |
| 317 | l.refresh(0) |
| 318 | return l.ttl, nil |
| 319 | } |
| 320 | |
| 321 | func (le *lessor) Lookup(id LeaseID) *Lease { |
| 322 | le.mu.Lock() |
| 323 | defer le.mu.Unlock() |
| 324 | return le.leaseMap[id] |
| 325 | } |
| 326 | |
| 327 | func (le *lessor) unsafeLeases() []*Lease { |
| 328 | leases := make([]*Lease, 0, len(le.leaseMap)) |
| 329 | for _, l := range le.leaseMap { |
| 330 | leases = append(leases, l) |
| 331 | } |
| 332 | sort.Sort(leasesByExpiry(leases)) |
| 333 | return leases |
| 334 | } |
| 335 | |
| 336 | func (le *lessor) Leases() []*Lease { |
| 337 | le.mu.Lock() |
| 338 | ls := le.unsafeLeases() |
| 339 | le.mu.Unlock() |
| 340 | return ls |
| 341 | } |
| 342 | |
| 343 | func (le *lessor) Promote(extend time.Duration) { |
| 344 | le.mu.Lock() |
| 345 | defer le.mu.Unlock() |
| 346 | |
| 347 | le.demotec = make(chan struct{}) |
| 348 | |
| 349 | // refresh the expiries of all leases. |
| 350 | for _, l := range le.leaseMap { |
| 351 | l.refresh(extend) |
| 352 | } |
| 353 | |
| 354 | if len(le.leaseMap) < leaseRevokeRate { |
| 355 | // no possibility of lease pile-up |
| 356 | return |
| 357 | } |
| 358 | |
| 359 | // adjust expiries in case of overlap |
| 360 | leases := le.unsafeLeases() |
| 361 | |
| 362 | baseWindow := leases[0].Remaining() |
| 363 | nextWindow := baseWindow + time.Second |
| 364 | expires := 0 |
| 365 | // have fewer expires than the total revoke rate so piled up leases |
| 366 | // don't consume the entire revoke limit |
| 367 | targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 |
| 368 | for _, l := range leases { |
| 369 | remaining := l.Remaining() |
| 370 | if remaining > nextWindow { |
| 371 | baseWindow = remaining |
| 372 | nextWindow = baseWindow + time.Second |
| 373 | expires = 1 |
| 374 | continue |
| 375 | } |
| 376 | expires++ |
| 377 | if expires <= targetExpiresPerSecond { |
| 378 | continue |
| 379 | } |
| 380 | rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond)) |
| 381 | // If leases are extended by n seconds, leases n seconds ahead of the |
| 382 | // base window should be extended by only one second. |
| 383 | rateDelay -= float64(remaining - baseWindow) |
| 384 | delay := time.Duration(rateDelay) |
| 385 | nextWindow = baseWindow + delay |
| 386 | l.refresh(delay + extend) |
| 387 | } |
| 388 | } |
| 389 | |
| 390 | type leasesByExpiry []*Lease |
| 391 | |
| 392 | func (le leasesByExpiry) Len() int { return len(le) } |
| 393 | func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() } |
| 394 | func (le leasesByExpiry) Swap(i, j int) { le[i], le[j] = le[j], le[i] } |
| 395 | |
| 396 | func (le *lessor) Demote() { |
| 397 | le.mu.Lock() |
| 398 | defer le.mu.Unlock() |
| 399 | |
| 400 | // set the expiries of all leases to forever |
| 401 | for _, l := range le.leaseMap { |
| 402 | l.forever() |
| 403 | } |
| 404 | |
| 405 | if le.demotec != nil { |
| 406 | close(le.demotec) |
| 407 | le.demotec = nil |
| 408 | } |
| 409 | } |
| 410 | |
| 411 | // Attach attaches items to the lease with given ID. When the lease |
| 412 | // expires, the attached items will be automatically removed. |
| 413 | // If the given lease does not exist, an error will be returned. |
| 414 | func (le *lessor) Attach(id LeaseID, items []LeaseItem) error { |
| 415 | le.mu.Lock() |
| 416 | defer le.mu.Unlock() |
| 417 | |
| 418 | l := le.leaseMap[id] |
| 419 | if l == nil { |
| 420 | return ErrLeaseNotFound |
| 421 | } |
| 422 | |
| 423 | l.mu.Lock() |
| 424 | for _, it := range items { |
| 425 | l.itemSet[it] = struct{}{} |
| 426 | le.itemMap[it] = id |
| 427 | } |
| 428 | l.mu.Unlock() |
| 429 | return nil |
| 430 | } |
| 431 | |
| 432 | func (le *lessor) GetLease(item LeaseItem) LeaseID { |
| 433 | le.mu.Lock() |
| 434 | id := le.itemMap[item] |
| 435 | le.mu.Unlock() |
| 436 | return id |
| 437 | } |
| 438 | |
| 439 | // Detach detaches items from the lease with given ID. |
| 440 | // If the given lease does not exist, an error will be returned. |
| 441 | func (le *lessor) Detach(id LeaseID, items []LeaseItem) error { |
| 442 | le.mu.Lock() |
| 443 | defer le.mu.Unlock() |
| 444 | |
| 445 | l := le.leaseMap[id] |
| 446 | if l == nil { |
| 447 | return ErrLeaseNotFound |
| 448 | } |
| 449 | |
| 450 | l.mu.Lock() |
| 451 | for _, it := range items { |
| 452 | delete(l.itemSet, it) |
| 453 | delete(le.itemMap, it) |
| 454 | } |
| 455 | l.mu.Unlock() |
| 456 | return nil |
| 457 | } |
| 458 | |
| 459 | func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) { |
| 460 | le.mu.Lock() |
| 461 | defer le.mu.Unlock() |
| 462 | |
| 463 | le.b = b |
| 464 | le.rd = rd |
| 465 | le.leaseMap = make(map[LeaseID]*Lease) |
| 466 | le.itemMap = make(map[LeaseItem]LeaseID) |
| 467 | le.initAndRecover() |
| 468 | } |
| 469 | |
| 470 | func (le *lessor) ExpiredLeasesC() <-chan []*Lease { |
| 471 | return le.expiredC |
| 472 | } |
| 473 | |
| 474 | func (le *lessor) Stop() { |
| 475 | close(le.stopC) |
| 476 | <-le.doneC |
| 477 | } |
| 478 | |
| 479 | func (le *lessor) runLoop() { |
| 480 | defer close(le.doneC) |
| 481 | |
| 482 | for { |
| 483 | var ls []*Lease |
| 484 | |
| 485 | // rate limit |
| 486 | revokeLimit := leaseRevokeRate / 2 |
| 487 | |
| 488 | le.mu.Lock() |
| 489 | if le.isPrimary() { |
| 490 | ls = le.findExpiredLeases(revokeLimit) |
| 491 | } |
| 492 | le.mu.Unlock() |
| 493 | |
| 494 | if len(ls) != 0 { |
| 495 | select { |
| 496 | case <-le.stopC: |
| 497 | return |
| 498 | case le.expiredC <- ls: |
| 499 | default: |
| 500 | // the receiver of expiredC is probably busy handling |
| 501 | // other stuff |
| 502 | // let's try this next time after 500ms |
| 503 | } |
| 504 | } |
| 505 | |
| 506 | select { |
| 507 | case <-time.After(500 * time.Millisecond): |
| 508 | case <-le.stopC: |
| 509 | return |
| 510 | } |
| 511 | } |
| 512 | } |
| 513 | |
| 514 | // findExpiredLeases loops leases in the leaseMap until reaching expired limit |
| 515 | // and returns the expired leases that needed to be revoked. |
| 516 | func (le *lessor) findExpiredLeases(limit int) []*Lease { |
| 517 | leases := make([]*Lease, 0, 16) |
| 518 | |
| 519 | for _, l := range le.leaseMap { |
| 520 | // TODO: probably should change to <= 100-500 millisecond to |
| 521 | // make up committing latency. |
| 522 | if l.expired() { |
| 523 | leases = append(leases, l) |
| 524 | |
| 525 | // reach expired limit |
| 526 | if len(leases) == limit { |
| 527 | break |
| 528 | } |
| 529 | } |
| 530 | } |
| 531 | |
| 532 | return leases |
| 533 | } |
| 534 | |
| 535 | func (le *lessor) initAndRecover() { |
| 536 | tx := le.b.BatchTx() |
| 537 | tx.Lock() |
| 538 | |
| 539 | tx.UnsafeCreateBucket(leaseBucketName) |
| 540 | _, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0) |
| 541 | // TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue. |
| 542 | for i := range vs { |
| 543 | var lpb leasepb.Lease |
| 544 | err := lpb.Unmarshal(vs[i]) |
| 545 | if err != nil { |
| 546 | tx.Unlock() |
| 547 | panic("failed to unmarshal lease proto item") |
| 548 | } |
| 549 | ID := LeaseID(lpb.ID) |
| 550 | if lpb.TTL < le.minLeaseTTL { |
| 551 | lpb.TTL = le.minLeaseTTL |
| 552 | } |
| 553 | le.leaseMap[ID] = &Lease{ |
| 554 | ID: ID, |
| 555 | ttl: lpb.TTL, |
| 556 | // itemSet will be filled in when recover key-value pairs |
| 557 | // set expiry to forever, refresh when promoted |
| 558 | itemSet: make(map[LeaseItem]struct{}), |
| 559 | expiry: forever, |
| 560 | revokec: make(chan struct{}), |
| 561 | } |
| 562 | } |
| 563 | tx.Unlock() |
| 564 | |
| 565 | le.b.ForceCommit() |
| 566 | } |
| 567 | |
| 568 | type Lease struct { |
| 569 | ID LeaseID |
| 570 | ttl int64 // time to live in seconds |
| 571 | // expiryMu protects concurrent accesses to expiry |
| 572 | expiryMu sync.RWMutex |
| 573 | // expiry is time when lease should expire. no expiration when expiry.IsZero() is true |
| 574 | expiry time.Time |
| 575 | |
| 576 | // mu protects concurrent accesses to itemSet |
| 577 | mu sync.RWMutex |
| 578 | itemSet map[LeaseItem]struct{} |
| 579 | revokec chan struct{} |
| 580 | } |
| 581 | |
| 582 | func (l *Lease) expired() bool { |
| 583 | return l.Remaining() <= 0 |
| 584 | } |
| 585 | |
| 586 | func (l *Lease) persistTo(b backend.Backend) { |
| 587 | key := int64ToBytes(int64(l.ID)) |
| 588 | |
| 589 | lpb := leasepb.Lease{ID: int64(l.ID), TTL: int64(l.ttl)} |
| 590 | val, err := lpb.Marshal() |
| 591 | if err != nil { |
| 592 | panic("failed to marshal lease proto item") |
| 593 | } |
| 594 | |
| 595 | b.BatchTx().Lock() |
| 596 | b.BatchTx().UnsafePut(leaseBucketName, key, val) |
| 597 | b.BatchTx().Unlock() |
| 598 | } |
| 599 | |
| 600 | // TTL returns the TTL of the Lease. |
| 601 | func (l *Lease) TTL() int64 { |
| 602 | return l.ttl |
| 603 | } |
| 604 | |
| 605 | // refresh refreshes the expiry of the lease. |
| 606 | func (l *Lease) refresh(extend time.Duration) { |
| 607 | newExpiry := time.Now().Add(extend + time.Duration(l.ttl)*time.Second) |
| 608 | l.expiryMu.Lock() |
| 609 | defer l.expiryMu.Unlock() |
| 610 | l.expiry = newExpiry |
| 611 | } |
| 612 | |
| 613 | // forever sets the expiry of lease to be forever. |
| 614 | func (l *Lease) forever() { |
| 615 | l.expiryMu.Lock() |
| 616 | defer l.expiryMu.Unlock() |
| 617 | l.expiry = forever |
| 618 | } |
| 619 | |
| 620 | // Keys returns all the keys attached to the lease. |
| 621 | func (l *Lease) Keys() []string { |
| 622 | l.mu.RLock() |
| 623 | keys := make([]string, 0, len(l.itemSet)) |
| 624 | for k := range l.itemSet { |
| 625 | keys = append(keys, k.Key) |
| 626 | } |
| 627 | l.mu.RUnlock() |
| 628 | return keys |
| 629 | } |
| 630 | |
| 631 | // Remaining returns the remaining time of the lease. |
| 632 | func (l *Lease) Remaining() time.Duration { |
| 633 | l.expiryMu.RLock() |
| 634 | defer l.expiryMu.RUnlock() |
| 635 | if l.expiry.IsZero() { |
| 636 | return time.Duration(math.MaxInt64) |
| 637 | } |
| 638 | return time.Until(l.expiry) |
| 639 | } |
| 640 | |
// LeaseItem is an item that can be attached to a lease; it is identified by
// its key.
type LeaseItem struct {
	// Key is the key of the attached item.
	Key string
}
| 644 | |
// int64ToBytes encodes n as an 8-byte big-endian slice.
func int64ToBytes(n int64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(n))
	return buf[:]
}
| 650 | |
| 651 | // FakeLessor is a fake implementation of Lessor interface. |
| 652 | // Used for testing only. |
| 653 | type FakeLessor struct{} |
| 654 | |
| 655 | func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {} |
| 656 | |
| 657 | func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil } |
| 658 | |
| 659 | func (fl *FakeLessor) Revoke(id LeaseID) error { return nil } |
| 660 | |
| 661 | func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil } |
| 662 | |
| 663 | func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID { return 0 } |
| 664 | func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return nil } |
| 665 | |
| 666 | func (fl *FakeLessor) Promote(extend time.Duration) {} |
| 667 | |
| 668 | func (fl *FakeLessor) Demote() {} |
| 669 | |
| 670 | func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil } |
| 671 | |
| 672 | func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return nil } |
| 673 | |
| 674 | func (fl *FakeLessor) Leases() []*Lease { return nil } |
| 675 | |
| 676 | func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil } |
| 677 | |
| 678 | func (fl *FakeLessor) Recover(b backend.Backend, rd RangeDeleter) {} |
| 679 | |
| 680 | func (fl *FakeLessor) Stop() {} |