khenaidoo | ab1f7bd | 2019-11-14 14:00:27 -0500 | [diff] [blame] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package lease |
| 16 | |
| 17 | import ( |
| 18 | "container/heap" |
| 19 | "context" |
| 20 | "encoding/binary" |
| 21 | "errors" |
| 22 | "math" |
| 23 | "sort" |
| 24 | "sync" |
| 25 | "time" |
| 26 | |
| 27 | pb "go.etcd.io/etcd/etcdserver/etcdserverpb" |
| 28 | "go.etcd.io/etcd/lease/leasepb" |
| 29 | "go.etcd.io/etcd/mvcc/backend" |
| 30 | "go.uber.org/zap" |
| 31 | ) |
| 32 | |
| 33 | // NoLease is a special LeaseID representing the absence of a lease. |
| 34 | const NoLease = LeaseID(0) |
| 35 | |
| 36 | // MaxLeaseTTL is the maximum lease TTL value |
| 37 | const MaxLeaseTTL = 9000000000 |
| 38 | |
var (
	// forever is the zero time.Time; a lease whose expiry equals it never expires.
	forever = time.Time{}

	// leaseBucketName names the backend bucket in which leases are persisted.
	leaseBucketName = []byte("lease")

	// leaseRevokeRate caps how many leases are revoked per second; tests may change it.
	leaseRevokeRate = 1000
	// leaseCheckpointRate caps how many lease checkpoints are recorded to the
	// consensus log per second; tests may change it.
	leaseCheckpointRate = 1000
	// maxLeaseCheckpointBatchSize caps how many lease checkpoints are batched
	// into a single consensus log entry.
	maxLeaseCheckpointBatchSize = 1000

	// defaultLeaseCheckpointInterval is used when LessorConfig.CheckpointInterval is zero.
	defaultLeaseCheckpointInterval = 5 * time.Minute
	// defaultExpiredleaseRetryInterval is used when
	// LessorConfig.ExpiredLeasesRetryInterval is zero: how long to wait before
	// checking again whether an expired lease has been revoked.
	defaultExpiredleaseRetryInterval = 3 * time.Second
)

var (
	// ErrNotPrimary is returned by operations that only the primary lessor may perform.
	ErrNotPrimary = errors.New("not a primary lessor")
	// ErrLeaseNotFound is returned when the requested lease does not exist.
	ErrLeaseNotFound = errors.New("lease not found")
	// ErrLeaseExists is returned by Grant when the lease ID is already in use.
	ErrLeaseExists = errors.New("lease already exists")
	// ErrLeaseTTLTooLarge is returned by Grant when the TTL exceeds MaxLeaseTTL.
	ErrLeaseTTLTooLarge = errors.New("too large lease TTL")
)
| 64 | |
// TxnDelete is the delete-only subset of a TxnWrite. It is declared in this
// package, rather than imported, to avoid a circular dependency on mvcc.
type TxnDelete interface {
	// DeleteRange deletes the range described by key and end and reports the
	// n and rev values produced by the underlying store.
	DeleteRange(key, end []byte) (n, rev int64)
	// End finishes the transaction.
	End()
}

// RangeDeleter opens a new TxnDelete against the store.
type RangeDeleter func() TxnDelete
| 74 | |
| 75 | // Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to |
| 76 | // avoid circular dependency with mvcc. |
| 77 | type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) |
| 78 | |
| 79 | type LeaseID int64 |
| 80 | |
| 81 | // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee. |
| 82 | type Lessor interface { |
| 83 | // SetRangeDeleter lets the lessor create TxnDeletes to the store. |
| 84 | // Lessor deletes the items in the revoked or expired lease by creating |
| 85 | // new TxnDeletes. |
| 86 | SetRangeDeleter(rd RangeDeleter) |
| 87 | |
| 88 | SetCheckpointer(cp Checkpointer) |
| 89 | |
| 90 | // Grant grants a lease that expires at least after TTL seconds. |
| 91 | Grant(id LeaseID, ttl int64) (*Lease, error) |
| 92 | // Revoke revokes a lease with given ID. The item attached to the |
| 93 | // given lease will be removed. If the ID does not exist, an error |
| 94 | // will be returned. |
| 95 | Revoke(id LeaseID) error |
| 96 | |
| 97 | // Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set |
| 98 | // the expiry of leases to less than the full TTL when possible. |
| 99 | Checkpoint(id LeaseID, remainingTTL int64) error |
| 100 | |
| 101 | // Attach attaches given leaseItem to the lease with given LeaseID. |
| 102 | // If the lease does not exist, an error will be returned. |
| 103 | Attach(id LeaseID, items []LeaseItem) error |
| 104 | |
| 105 | // GetLease returns LeaseID for given item. |
| 106 | // If no lease found, NoLease value will be returned. |
| 107 | GetLease(item LeaseItem) LeaseID |
| 108 | |
| 109 | // Detach detaches given leaseItem from the lease with given LeaseID. |
| 110 | // If the lease does not exist, an error will be returned. |
| 111 | Detach(id LeaseID, items []LeaseItem) error |
| 112 | |
| 113 | // Promote promotes the lessor to be the primary lessor. Primary lessor manages |
| 114 | // the expiration and renew of leases. |
| 115 | // Newly promoted lessor renew the TTL of all lease to extend + previous TTL. |
| 116 | Promote(extend time.Duration) |
| 117 | |
| 118 | // Demote demotes the lessor from being the primary lessor. |
| 119 | Demote() |
| 120 | |
| 121 | // Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist, |
| 122 | // an error will be returned. |
| 123 | Renew(id LeaseID) (int64, error) |
| 124 | |
| 125 | // Lookup gives the lease at a given lease id, if any |
| 126 | Lookup(id LeaseID) *Lease |
| 127 | |
| 128 | // Leases lists all leases. |
| 129 | Leases() []*Lease |
| 130 | |
| 131 | // ExpiredLeasesC returns a chan that is used to receive expired leases. |
| 132 | ExpiredLeasesC() <-chan []*Lease |
| 133 | |
| 134 | // Recover recovers the lessor state from the given backend and RangeDeleter. |
| 135 | Recover(b backend.Backend, rd RangeDeleter) |
| 136 | |
| 137 | // Stop stops the lessor for managing leases. The behavior of calling Stop multiple |
| 138 | // times is undefined. |
| 139 | Stop() |
| 140 | } |
| 141 | |
| 142 | // lessor implements Lessor interface. |
| 143 | // TODO: use clockwork for testability. |
| 144 | type lessor struct { |
| 145 | mu sync.RWMutex |
| 146 | |
| 147 | // demotec is set when the lessor is the primary. |
| 148 | // demotec will be closed if the lessor is demoted. |
| 149 | demotec chan struct{} |
| 150 | |
| 151 | leaseMap map[LeaseID]*Lease |
| 152 | leaseExpiredNotifier *LeaseExpiredNotifier |
| 153 | leaseCheckpointHeap LeaseQueue |
| 154 | itemMap map[LeaseItem]LeaseID |
| 155 | |
| 156 | // When a lease expires, the lessor will delete the |
| 157 | // leased range (or key) by the RangeDeleter. |
| 158 | rd RangeDeleter |
| 159 | |
| 160 | // When a lease's deadline should be persisted to preserve the remaining TTL across leader |
| 161 | // elections and restarts, the lessor will checkpoint the lease by the Checkpointer. |
| 162 | cp Checkpointer |
| 163 | |
| 164 | // backend to persist leases. We only persist lease ID and expiry for now. |
| 165 | // The leased items can be recovered by iterating all the keys in kv. |
| 166 | b backend.Backend |
| 167 | |
| 168 | // minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any |
| 169 | // requests for shorter TTLs are extended to the minimum TTL. |
| 170 | minLeaseTTL int64 |
| 171 | |
| 172 | expiredC chan []*Lease |
| 173 | // stopC is a channel whose closure indicates that the lessor should be stopped. |
| 174 | stopC chan struct{} |
| 175 | // doneC is a channel whose closure indicates that the lessor is stopped. |
| 176 | doneC chan struct{} |
| 177 | |
| 178 | lg *zap.Logger |
| 179 | |
| 180 | // Wait duration between lease checkpoints. |
| 181 | checkpointInterval time.Duration |
| 182 | // the interval to check if the expired lease is revoked |
| 183 | expiredLeaseRetryInterval time.Duration |
| 184 | } |
| 185 | |
// LessorConfig configures a lessor created by NewLessor. Zero durations
// select the package defaults.
type LessorConfig struct {
	// MinLeaseTTL is the smallest TTL, in seconds, that a lease is granted;
	// shorter requests are raised to it.
	MinLeaseTTL int64
	// CheckpointInterval is the wait between lease checkpoints
	// (default: 5 minutes).
	CheckpointInterval time.Duration
	// ExpiredLeasesRetryInterval is how long to wait before checking again
	// whether an expired lease was revoked (default: 3 seconds).
	ExpiredLeasesRetryInterval time.Duration
}
| 191 | |
| 192 | func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { |
| 193 | return newLessor(lg, b, cfg) |
| 194 | } |
| 195 | |
| 196 | func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { |
| 197 | checkpointInterval := cfg.CheckpointInterval |
| 198 | expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval |
| 199 | if checkpointInterval == 0 { |
| 200 | checkpointInterval = defaultLeaseCheckpointInterval |
| 201 | } |
| 202 | if expiredLeaseRetryInterval == 0 { |
| 203 | expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval |
| 204 | } |
| 205 | l := &lessor{ |
| 206 | leaseMap: make(map[LeaseID]*Lease), |
| 207 | itemMap: make(map[LeaseItem]LeaseID), |
| 208 | leaseExpiredNotifier: newLeaseExpiredNotifier(), |
| 209 | leaseCheckpointHeap: make(LeaseQueue, 0), |
| 210 | b: b, |
| 211 | minLeaseTTL: cfg.MinLeaseTTL, |
| 212 | checkpointInterval: checkpointInterval, |
| 213 | expiredLeaseRetryInterval: expiredLeaseRetryInterval, |
| 214 | // expiredC is a small buffered chan to avoid unnecessary blocking. |
| 215 | expiredC: make(chan []*Lease, 16), |
| 216 | stopC: make(chan struct{}), |
| 217 | doneC: make(chan struct{}), |
| 218 | lg: lg, |
| 219 | } |
| 220 | l.initAndRecover() |
| 221 | |
| 222 | go l.runLoop() |
| 223 | |
| 224 | return l |
| 225 | } |
| 226 | |
| 227 | // isPrimary indicates if this lessor is the primary lessor. The primary |
| 228 | // lessor manages lease expiration and renew. |
| 229 | // |
| 230 | // in etcd, raft leader is the primary. Thus there might be two primary |
| 231 | // leaders at the same time (raft allows concurrent leader but with different term) |
| 232 | // for at most a leader election timeout. |
| 233 | // The old primary leader cannot affect the correctness since its proposal has a |
| 234 | // smaller term and will not be committed. |
| 235 | // |
| 236 | // TODO: raft follower do not forward lease management proposals. There might be a |
| 237 | // very small window (within second normally which depends on go scheduling) that |
| 238 | // a raft follow is the primary between the raft leader demotion and lessor demotion. |
| 239 | // Usually this should not be a problem. Lease should not be that sensitive to timing. |
| 240 | func (le *lessor) isPrimary() bool { |
| 241 | return le.demotec != nil |
| 242 | } |
| 243 | |
| 244 | func (le *lessor) SetRangeDeleter(rd RangeDeleter) { |
| 245 | le.mu.Lock() |
| 246 | defer le.mu.Unlock() |
| 247 | |
| 248 | le.rd = rd |
| 249 | } |
| 250 | |
| 251 | func (le *lessor) SetCheckpointer(cp Checkpointer) { |
| 252 | le.mu.Lock() |
| 253 | defer le.mu.Unlock() |
| 254 | |
| 255 | le.cp = cp |
| 256 | } |
| 257 | |
| 258 | func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { |
| 259 | if id == NoLease { |
| 260 | return nil, ErrLeaseNotFound |
| 261 | } |
| 262 | |
| 263 | if ttl > MaxLeaseTTL { |
| 264 | return nil, ErrLeaseTTLTooLarge |
| 265 | } |
| 266 | |
| 267 | // TODO: when lessor is under high load, it should give out lease |
| 268 | // with longer TTL to reduce renew load. |
| 269 | l := &Lease{ |
| 270 | ID: id, |
| 271 | ttl: ttl, |
| 272 | itemSet: make(map[LeaseItem]struct{}), |
| 273 | revokec: make(chan struct{}), |
| 274 | } |
| 275 | |
| 276 | le.mu.Lock() |
| 277 | defer le.mu.Unlock() |
| 278 | |
| 279 | if _, ok := le.leaseMap[id]; ok { |
| 280 | return nil, ErrLeaseExists |
| 281 | } |
| 282 | |
| 283 | if l.ttl < le.minLeaseTTL { |
| 284 | l.ttl = le.minLeaseTTL |
| 285 | } |
| 286 | |
| 287 | if le.isPrimary() { |
| 288 | l.refresh(0) |
| 289 | } else { |
| 290 | l.forever() |
| 291 | } |
| 292 | |
| 293 | le.leaseMap[id] = l |
| 294 | item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} |
| 295 | le.leaseExpiredNotifier.RegisterOrUpdate(item) |
| 296 | l.persistTo(le.b) |
| 297 | |
| 298 | leaseTotalTTLs.Observe(float64(l.ttl)) |
| 299 | leaseGranted.Inc() |
| 300 | |
| 301 | if le.isPrimary() { |
| 302 | le.scheduleCheckpointIfNeeded(l) |
| 303 | } |
| 304 | |
| 305 | return l, nil |
| 306 | } |
| 307 | |
| 308 | func (le *lessor) Revoke(id LeaseID) error { |
| 309 | le.mu.Lock() |
| 310 | |
| 311 | l := le.leaseMap[id] |
| 312 | if l == nil { |
| 313 | le.mu.Unlock() |
| 314 | return ErrLeaseNotFound |
| 315 | } |
| 316 | defer close(l.revokec) |
| 317 | // unlock before doing external work |
| 318 | le.mu.Unlock() |
| 319 | |
| 320 | if le.rd == nil { |
| 321 | return nil |
| 322 | } |
| 323 | |
| 324 | txn := le.rd() |
| 325 | |
| 326 | // sort keys so deletes are in same order among all members, |
| 327 | // otherwise the backend hashes will be different |
| 328 | keys := l.Keys() |
| 329 | sort.StringSlice(keys).Sort() |
| 330 | for _, key := range keys { |
| 331 | txn.DeleteRange([]byte(key), nil) |
| 332 | } |
| 333 | |
| 334 | le.mu.Lock() |
| 335 | defer le.mu.Unlock() |
| 336 | delete(le.leaseMap, l.ID) |
| 337 | // lease deletion needs to be in the same backend transaction with the |
| 338 | // kv deletion. Or we might end up with not executing the revoke or not |
| 339 | // deleting the keys if etcdserver fails in between. |
| 340 | le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) |
| 341 | |
| 342 | txn.End() |
| 343 | |
| 344 | leaseRevoked.Inc() |
| 345 | return nil |
| 346 | } |
| 347 | |
| 348 | func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error { |
| 349 | le.mu.Lock() |
| 350 | defer le.mu.Unlock() |
| 351 | |
| 352 | if l, ok := le.leaseMap[id]; ok { |
| 353 | // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry |
| 354 | l.remainingTTL = remainingTTL |
| 355 | if le.isPrimary() { |
| 356 | // schedule the next checkpoint as needed |
| 357 | le.scheduleCheckpointIfNeeded(l) |
| 358 | } |
| 359 | } |
| 360 | return nil |
| 361 | } |
| 362 | |
| 363 | // Renew renews an existing lease. If the given lease does not exist or |
| 364 | // has expired, an error will be returned. |
| 365 | func (le *lessor) Renew(id LeaseID) (int64, error) { |
| 366 | le.mu.RLock() |
| 367 | if !le.isPrimary() { |
| 368 | // forward renew request to primary instead of returning error. |
| 369 | le.mu.RUnlock() |
| 370 | return -1, ErrNotPrimary |
| 371 | } |
| 372 | |
| 373 | demotec := le.demotec |
| 374 | |
| 375 | l := le.leaseMap[id] |
| 376 | if l == nil { |
| 377 | le.mu.RUnlock() |
| 378 | return -1, ErrLeaseNotFound |
| 379 | } |
| 380 | // Clear remaining TTL when we renew if it is set |
| 381 | clearRemainingTTL := le.cp != nil && l.remainingTTL > 0 |
| 382 | |
| 383 | le.mu.RUnlock() |
| 384 | if l.expired() { |
| 385 | select { |
| 386 | // A expired lease might be pending for revoking or going through |
| 387 | // quorum to be revoked. To be accurate, renew request must wait for the |
| 388 | // deletion to complete. |
| 389 | case <-l.revokec: |
| 390 | return -1, ErrLeaseNotFound |
| 391 | // The expired lease might fail to be revoked if the primary changes. |
| 392 | // The caller will retry on ErrNotPrimary. |
| 393 | case <-demotec: |
| 394 | return -1, ErrNotPrimary |
| 395 | case <-le.stopC: |
| 396 | return -1, ErrNotPrimary |
| 397 | } |
| 398 | } |
| 399 | |
| 400 | // Clear remaining TTL when we renew if it is set |
| 401 | // By applying a RAFT entry only when the remainingTTL is already set, we limit the number |
| 402 | // of RAFT entries written per lease to a max of 2 per checkpoint interval. |
| 403 | if clearRemainingTTL { |
| 404 | le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}) |
| 405 | } |
| 406 | |
| 407 | le.mu.Lock() |
| 408 | l.refresh(0) |
| 409 | item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} |
| 410 | le.leaseExpiredNotifier.RegisterOrUpdate(item) |
| 411 | le.mu.Unlock() |
| 412 | |
| 413 | leaseRenewed.Inc() |
| 414 | return l.ttl, nil |
| 415 | } |
| 416 | |
| 417 | func (le *lessor) Lookup(id LeaseID) *Lease { |
| 418 | le.mu.RLock() |
| 419 | defer le.mu.RUnlock() |
| 420 | return le.leaseMap[id] |
| 421 | } |
| 422 | |
| 423 | func (le *lessor) unsafeLeases() []*Lease { |
| 424 | leases := make([]*Lease, 0, len(le.leaseMap)) |
| 425 | for _, l := range le.leaseMap { |
| 426 | leases = append(leases, l) |
| 427 | } |
| 428 | return leases |
| 429 | } |
| 430 | |
| 431 | func (le *lessor) Leases() []*Lease { |
| 432 | le.mu.RLock() |
| 433 | ls := le.unsafeLeases() |
| 434 | le.mu.RUnlock() |
| 435 | sort.Sort(leasesByExpiry(ls)) |
| 436 | return ls |
| 437 | } |
| 438 | |
| 439 | func (le *lessor) Promote(extend time.Duration) { |
| 440 | le.mu.Lock() |
| 441 | defer le.mu.Unlock() |
| 442 | |
| 443 | le.demotec = make(chan struct{}) |
| 444 | |
| 445 | // refresh the expiries of all leases. |
| 446 | for _, l := range le.leaseMap { |
| 447 | l.refresh(extend) |
| 448 | item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} |
| 449 | le.leaseExpiredNotifier.RegisterOrUpdate(item) |
| 450 | } |
| 451 | |
| 452 | if len(le.leaseMap) < leaseRevokeRate { |
| 453 | // no possibility of lease pile-up |
| 454 | return |
| 455 | } |
| 456 | |
| 457 | // adjust expiries in case of overlap |
| 458 | leases := le.unsafeLeases() |
| 459 | sort.Sort(leasesByExpiry(leases)) |
| 460 | |
| 461 | baseWindow := leases[0].Remaining() |
| 462 | nextWindow := baseWindow + time.Second |
| 463 | expires := 0 |
| 464 | // have fewer expires than the total revoke rate so piled up leases |
| 465 | // don't consume the entire revoke limit |
| 466 | targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 |
| 467 | for _, l := range leases { |
| 468 | remaining := l.Remaining() |
| 469 | if remaining > nextWindow { |
| 470 | baseWindow = remaining |
| 471 | nextWindow = baseWindow + time.Second |
| 472 | expires = 1 |
| 473 | continue |
| 474 | } |
| 475 | expires++ |
| 476 | if expires <= targetExpiresPerSecond { |
| 477 | continue |
| 478 | } |
| 479 | rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond)) |
| 480 | // If leases are extended by n seconds, leases n seconds ahead of the |
| 481 | // base window should be extended by only one second. |
| 482 | rateDelay -= float64(remaining - baseWindow) |
| 483 | delay := time.Duration(rateDelay) |
| 484 | nextWindow = baseWindow + delay |
| 485 | l.refresh(delay + extend) |
| 486 | item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} |
| 487 | le.leaseExpiredNotifier.RegisterOrUpdate(item) |
| 488 | le.scheduleCheckpointIfNeeded(l) |
| 489 | } |
| 490 | } |
| 491 | |
| 492 | type leasesByExpiry []*Lease |
| 493 | |
| 494 | func (le leasesByExpiry) Len() int { return len(le) } |
| 495 | func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() } |
| 496 | func (le leasesByExpiry) Swap(i, j int) { le[i], le[j] = le[j], le[i] } |
| 497 | |
| 498 | func (le *lessor) Demote() { |
| 499 | le.mu.Lock() |
| 500 | defer le.mu.Unlock() |
| 501 | |
| 502 | // set the expiries of all leases to forever |
| 503 | for _, l := range le.leaseMap { |
| 504 | l.forever() |
| 505 | } |
| 506 | |
| 507 | le.clearScheduledLeasesCheckpoints() |
| 508 | |
| 509 | if le.demotec != nil { |
| 510 | close(le.demotec) |
| 511 | le.demotec = nil |
| 512 | } |
| 513 | } |
| 514 | |
| 515 | // Attach attaches items to the lease with given ID. When the lease |
| 516 | // expires, the attached items will be automatically removed. |
| 517 | // If the given lease does not exist, an error will be returned. |
| 518 | func (le *lessor) Attach(id LeaseID, items []LeaseItem) error { |
| 519 | le.mu.Lock() |
| 520 | defer le.mu.Unlock() |
| 521 | |
| 522 | l := le.leaseMap[id] |
| 523 | if l == nil { |
| 524 | return ErrLeaseNotFound |
| 525 | } |
| 526 | |
| 527 | l.mu.Lock() |
| 528 | for _, it := range items { |
| 529 | l.itemSet[it] = struct{}{} |
| 530 | le.itemMap[it] = id |
| 531 | } |
| 532 | l.mu.Unlock() |
| 533 | return nil |
| 534 | } |
| 535 | |
| 536 | func (le *lessor) GetLease(item LeaseItem) LeaseID { |
| 537 | le.mu.RLock() |
| 538 | id := le.itemMap[item] |
| 539 | le.mu.RUnlock() |
| 540 | return id |
| 541 | } |
| 542 | |
| 543 | // Detach detaches items from the lease with given ID. |
| 544 | // If the given lease does not exist, an error will be returned. |
| 545 | func (le *lessor) Detach(id LeaseID, items []LeaseItem) error { |
| 546 | le.mu.Lock() |
| 547 | defer le.mu.Unlock() |
| 548 | |
| 549 | l := le.leaseMap[id] |
| 550 | if l == nil { |
| 551 | return ErrLeaseNotFound |
| 552 | } |
| 553 | |
| 554 | l.mu.Lock() |
| 555 | for _, it := range items { |
| 556 | delete(l.itemSet, it) |
| 557 | delete(le.itemMap, it) |
| 558 | } |
| 559 | l.mu.Unlock() |
| 560 | return nil |
| 561 | } |
| 562 | |
| 563 | func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) { |
| 564 | le.mu.Lock() |
| 565 | defer le.mu.Unlock() |
| 566 | |
| 567 | le.b = b |
| 568 | le.rd = rd |
| 569 | le.leaseMap = make(map[LeaseID]*Lease) |
| 570 | le.itemMap = make(map[LeaseItem]LeaseID) |
| 571 | le.initAndRecover() |
| 572 | } |
| 573 | |
| 574 | func (le *lessor) ExpiredLeasesC() <-chan []*Lease { |
| 575 | return le.expiredC |
| 576 | } |
| 577 | |
| 578 | func (le *lessor) Stop() { |
| 579 | close(le.stopC) |
| 580 | <-le.doneC |
| 581 | } |
| 582 | |
| 583 | func (le *lessor) runLoop() { |
| 584 | defer close(le.doneC) |
| 585 | |
| 586 | for { |
| 587 | le.revokeExpiredLeases() |
| 588 | le.checkpointScheduledLeases() |
| 589 | |
| 590 | select { |
| 591 | case <-time.After(500 * time.Millisecond): |
| 592 | case <-le.stopC: |
| 593 | return |
| 594 | } |
| 595 | } |
| 596 | } |
| 597 | |
| 598 | // revokeExpiredLeases finds all leases past their expiry and sends them to expired channel for |
| 599 | // to be revoked. |
| 600 | func (le *lessor) revokeExpiredLeases() { |
| 601 | var ls []*Lease |
| 602 | |
| 603 | // rate limit |
| 604 | revokeLimit := leaseRevokeRate / 2 |
| 605 | |
| 606 | le.mu.RLock() |
| 607 | if le.isPrimary() { |
| 608 | ls = le.findExpiredLeases(revokeLimit) |
| 609 | } |
| 610 | le.mu.RUnlock() |
| 611 | |
| 612 | if len(ls) != 0 { |
| 613 | select { |
| 614 | case <-le.stopC: |
| 615 | return |
| 616 | case le.expiredC <- ls: |
| 617 | default: |
| 618 | // the receiver of expiredC is probably busy handling |
| 619 | // other stuff |
| 620 | // let's try this next time after 500ms |
| 621 | } |
| 622 | } |
| 623 | } |
| 624 | |
| 625 | // checkpointScheduledLeases finds all scheduled lease checkpoints that are due and |
| 626 | // submits them to the checkpointer to persist them to the consensus log. |
| 627 | func (le *lessor) checkpointScheduledLeases() { |
| 628 | var cps []*pb.LeaseCheckpoint |
| 629 | |
| 630 | // rate limit |
| 631 | for i := 0; i < leaseCheckpointRate/2; i++ { |
| 632 | le.mu.Lock() |
| 633 | if le.isPrimary() { |
| 634 | cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize) |
| 635 | } |
| 636 | le.mu.Unlock() |
| 637 | |
| 638 | if len(cps) != 0 { |
| 639 | le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}) |
| 640 | } |
| 641 | if len(cps) < maxLeaseCheckpointBatchSize { |
| 642 | return |
| 643 | } |
| 644 | } |
| 645 | } |
| 646 | |
| 647 | func (le *lessor) clearScheduledLeasesCheckpoints() { |
| 648 | le.leaseCheckpointHeap = make(LeaseQueue, 0) |
| 649 | } |
| 650 | |
| 651 | // expireExists returns true if expiry items exist. |
| 652 | // It pops only when expiry item exists. |
| 653 | // "next" is true, to indicate that it may exist in next attempt. |
| 654 | func (le *lessor) expireExists() (l *Lease, ok bool, next bool) { |
| 655 | if le.leaseExpiredNotifier.Len() == 0 { |
| 656 | return nil, false, false |
| 657 | } |
| 658 | |
| 659 | item := le.leaseExpiredNotifier.Poll() |
| 660 | l = le.leaseMap[item.id] |
| 661 | if l == nil { |
| 662 | // lease has expired or been revoked |
| 663 | // no need to revoke (nothing is expiry) |
| 664 | le.leaseExpiredNotifier.Unregister() // O(log N) |
| 665 | return nil, false, true |
| 666 | } |
| 667 | now := time.Now() |
| 668 | if now.UnixNano() < item.time /* expiration time */ { |
| 669 | // Candidate expirations are caught up, reinsert this item |
| 670 | // and no need to revoke (nothing is expiry) |
| 671 | return l, false, false |
| 672 | } |
| 673 | |
| 674 | // recheck if revoke is complete after retry interval |
| 675 | item.time = now.Add(le.expiredLeaseRetryInterval).UnixNano() |
| 676 | le.leaseExpiredNotifier.RegisterOrUpdate(item) |
| 677 | return l, true, false |
| 678 | } |
| 679 | |
| 680 | // findExpiredLeases loops leases in the leaseMap until reaching expired limit |
| 681 | // and returns the expired leases that needed to be revoked. |
| 682 | func (le *lessor) findExpiredLeases(limit int) []*Lease { |
| 683 | leases := make([]*Lease, 0, 16) |
| 684 | |
| 685 | for { |
| 686 | l, ok, next := le.expireExists() |
| 687 | if !ok && !next { |
| 688 | break |
| 689 | } |
| 690 | if !ok { |
| 691 | continue |
| 692 | } |
| 693 | if next { |
| 694 | continue |
| 695 | } |
| 696 | |
| 697 | if l.expired() { |
| 698 | leases = append(leases, l) |
| 699 | |
| 700 | // reach expired limit |
| 701 | if len(leases) == limit { |
| 702 | break |
| 703 | } |
| 704 | } |
| 705 | } |
| 706 | |
| 707 | return leases |
| 708 | } |
| 709 | |
| 710 | func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) { |
| 711 | if le.cp == nil { |
| 712 | return |
| 713 | } |
| 714 | |
| 715 | if lease.RemainingTTL() > int64(le.checkpointInterval.Seconds()) { |
| 716 | if le.lg != nil { |
| 717 | le.lg.Debug("Scheduling lease checkpoint", |
| 718 | zap.Int64("leaseID", int64(lease.ID)), |
| 719 | zap.Duration("intervalSeconds", le.checkpointInterval), |
| 720 | ) |
| 721 | } |
| 722 | heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{ |
| 723 | id: lease.ID, |
| 724 | time: time.Now().Add(le.checkpointInterval).UnixNano(), |
| 725 | }) |
| 726 | } |
| 727 | } |
| 728 | |
| 729 | func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCheckpoint { |
| 730 | if le.cp == nil { |
| 731 | return nil |
| 732 | } |
| 733 | |
| 734 | now := time.Now() |
| 735 | cps := []*pb.LeaseCheckpoint{} |
| 736 | for le.leaseCheckpointHeap.Len() > 0 && len(cps) < checkpointLimit { |
| 737 | lt := le.leaseCheckpointHeap[0] |
| 738 | if lt.time /* next checkpoint time */ > now.UnixNano() { |
| 739 | return cps |
| 740 | } |
| 741 | heap.Pop(&le.leaseCheckpointHeap) |
| 742 | var l *Lease |
| 743 | var ok bool |
| 744 | if l, ok = le.leaseMap[lt.id]; !ok { |
| 745 | continue |
| 746 | } |
| 747 | if !now.Before(l.expiry) { |
| 748 | continue |
| 749 | } |
| 750 | remainingTTL := int64(math.Ceil(l.expiry.Sub(now).Seconds())) |
| 751 | if remainingTTL >= l.ttl { |
| 752 | continue |
| 753 | } |
| 754 | if le.lg != nil { |
| 755 | le.lg.Debug("Checkpointing lease", |
| 756 | zap.Int64("leaseID", int64(lt.id)), |
| 757 | zap.Int64("remainingTTL", remainingTTL), |
| 758 | ) |
| 759 | } |
| 760 | cps = append(cps, &pb.LeaseCheckpoint{ID: int64(lt.id), Remaining_TTL: remainingTTL}) |
| 761 | } |
| 762 | return cps |
| 763 | } |
| 764 | |
| 765 | func (le *lessor) initAndRecover() { |
| 766 | tx := le.b.BatchTx() |
| 767 | tx.Lock() |
| 768 | |
| 769 | tx.UnsafeCreateBucket(leaseBucketName) |
| 770 | _, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0) |
| 771 | // TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue. |
| 772 | for i := range vs { |
| 773 | var lpb leasepb.Lease |
| 774 | err := lpb.Unmarshal(vs[i]) |
| 775 | if err != nil { |
| 776 | tx.Unlock() |
| 777 | panic("failed to unmarshal lease proto item") |
| 778 | } |
| 779 | ID := LeaseID(lpb.ID) |
| 780 | if lpb.TTL < le.minLeaseTTL { |
| 781 | lpb.TTL = le.minLeaseTTL |
| 782 | } |
| 783 | le.leaseMap[ID] = &Lease{ |
| 784 | ID: ID, |
| 785 | ttl: lpb.TTL, |
| 786 | // itemSet will be filled in when recover key-value pairs |
| 787 | // set expiry to forever, refresh when promoted |
| 788 | itemSet: make(map[LeaseItem]struct{}), |
| 789 | expiry: forever, |
| 790 | revokec: make(chan struct{}), |
| 791 | } |
| 792 | } |
| 793 | le.leaseExpiredNotifier.Init() |
| 794 | heap.Init(&le.leaseCheckpointHeap) |
| 795 | tx.Unlock() |
| 796 | |
| 797 | le.b.ForceCommit() |
| 798 | } |
| 799 | |
| 800 | type Lease struct { |
| 801 | ID LeaseID |
| 802 | ttl int64 // time to live of the lease in seconds |
| 803 | remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used |
| 804 | // expiryMu protects concurrent accesses to expiry |
| 805 | expiryMu sync.RWMutex |
| 806 | // expiry is time when lease should expire. no expiration when expiry.IsZero() is true |
| 807 | expiry time.Time |
| 808 | |
| 809 | // mu protects concurrent accesses to itemSet |
| 810 | mu sync.RWMutex |
| 811 | itemSet map[LeaseItem]struct{} |
| 812 | revokec chan struct{} |
| 813 | } |
| 814 | |
| 815 | func (l *Lease) expired() bool { |
| 816 | return l.Remaining() <= 0 |
| 817 | } |
| 818 | |
| 819 | func (l *Lease) persistTo(b backend.Backend) { |
| 820 | key := int64ToBytes(int64(l.ID)) |
| 821 | |
| 822 | lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL} |
| 823 | val, err := lpb.Marshal() |
| 824 | if err != nil { |
| 825 | panic("failed to marshal lease proto item") |
| 826 | } |
| 827 | |
| 828 | b.BatchTx().Lock() |
| 829 | b.BatchTx().UnsafePut(leaseBucketName, key, val) |
| 830 | b.BatchTx().Unlock() |
| 831 | } |
| 832 | |
| 833 | // TTL returns the TTL of the Lease. |
| 834 | func (l *Lease) TTL() int64 { |
| 835 | return l.ttl |
| 836 | } |
| 837 | |
| 838 | // RemainingTTL returns the last checkpointed remaining TTL of the lease. |
| 839 | // TODO(jpbetz): do not expose this utility method |
| 840 | func (l *Lease) RemainingTTL() int64 { |
| 841 | if l.remainingTTL > 0 { |
| 842 | return l.remainingTTL |
| 843 | } |
| 844 | return l.ttl |
| 845 | } |
| 846 | |
| 847 | // refresh refreshes the expiry of the lease. |
| 848 | func (l *Lease) refresh(extend time.Duration) { |
| 849 | newExpiry := time.Now().Add(extend + time.Duration(l.RemainingTTL())*time.Second) |
| 850 | l.expiryMu.Lock() |
| 851 | defer l.expiryMu.Unlock() |
| 852 | l.expiry = newExpiry |
| 853 | } |
| 854 | |
| 855 | // forever sets the expiry of lease to be forever. |
| 856 | func (l *Lease) forever() { |
| 857 | l.expiryMu.Lock() |
| 858 | defer l.expiryMu.Unlock() |
| 859 | l.expiry = forever |
| 860 | } |
| 861 | |
| 862 | // Keys returns all the keys attached to the lease. |
| 863 | func (l *Lease) Keys() []string { |
| 864 | l.mu.RLock() |
| 865 | keys := make([]string, 0, len(l.itemSet)) |
| 866 | for k := range l.itemSet { |
| 867 | keys = append(keys, k.Key) |
| 868 | } |
| 869 | l.mu.RUnlock() |
| 870 | return keys |
| 871 | } |
| 872 | |
| 873 | // Remaining returns the remaining time of the lease. |
| 874 | func (l *Lease) Remaining() time.Duration { |
| 875 | l.expiryMu.RLock() |
| 876 | defer l.expiryMu.RUnlock() |
| 877 | if l.expiry.IsZero() { |
| 878 | return time.Duration(math.MaxInt64) |
| 879 | } |
| 880 | return time.Until(l.expiry) |
| 881 | } |
| 882 | |
// LeaseItem identifies something that can be attached to a lease; currently a
// key. It is comparable and is used as a map key.
type LeaseItem struct {
	Key string
}
| 886 | |
// int64ToBytes encodes n as an 8-byte big-endian slice (two's complement for
// negative values). For non-negative n, byte order matches numeric order,
// which makes the result usable as a range-scannable backend key.
func int64ToBytes(n int64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(n))
	return buf[:]
}
| 892 | |
| 893 | // FakeLessor is a fake implementation of Lessor interface. |
| 894 | // Used for testing only. |
| 895 | type FakeLessor struct{} |
| 896 | |
| 897 | func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {} |
| 898 | |
| 899 | func (fl *FakeLessor) SetCheckpointer(cp Checkpointer) {} |
| 900 | |
| 901 | func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil } |
| 902 | |
| 903 | func (fl *FakeLessor) Revoke(id LeaseID) error { return nil } |
| 904 | |
| 905 | func (fl *FakeLessor) Checkpoint(id LeaseID, remainingTTL int64) error { return nil } |
| 906 | |
| 907 | func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil } |
| 908 | |
| 909 | func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID { return 0 } |
| 910 | func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return nil } |
| 911 | |
| 912 | func (fl *FakeLessor) Promote(extend time.Duration) {} |
| 913 | |
| 914 | func (fl *FakeLessor) Demote() {} |
| 915 | |
| 916 | func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil } |
| 917 | |
| 918 | func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return nil } |
| 919 | |
| 920 | func (fl *FakeLessor) Leases() []*Lease { return nil } |
| 921 | |
| 922 | func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil } |
| 923 | |
| 924 | func (fl *FakeLessor) Recover(b backend.Backend, rd RangeDeleter) {} |
| 925 | |
| 926 | func (fl *FakeLessor) Stop() {} |
| 927 | |
| 928 | type FakeTxnDelete struct { |
| 929 | backend.BatchTx |
| 930 | } |
| 931 | |
| 932 | func (ftd *FakeTxnDelete) DeleteRange(key, end []byte) (n, rev int64) { return 0, 0 } |
| 933 | func (ftd *FakeTxnDelete) End() { ftd.Unlock() } |