// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"go.etcd.io/etcd/lease"
	"go.etcd.io/etcd/mvcc/backend"
	"go.etcd.io/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/pkg/schedule"

	"github.com/coreos/pkg/capnslog"
	"go.uber.org/zap"
)

var (
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	consistentIndexKeyName  = []byte("consistent_index")
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
	ErrCanceled  = errors.New("mvcc: watcher is canceled")
	ErrClosed    = errors.New("mvcc: closed")

	plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc")
)

const (
	// markedRevBytesLen is the byte length of a marked revision.
	// The first `revBytesLen` bytes represent a normal revision. The last
	// byte is the mark.
	markedRevBytesLen      = revBytesLen + 1
	markBytePosition       = markedRevBytesLen - 1
	markTombstone     byte = 't'
)

var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000

// ConsistentIndexGetter is an interface that wraps the Get method.
// A consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
	// ConsistentIndex returns the consistent index of the currently executing entry.
	ConsistentIndex() uint64
}

type StoreConfig struct {
	CompactionBatchLimit int
}

type store struct {
	ReadView
	WriteView

	// consistentIndex caches the "consistent_index" key's value. Accessed
	// through atomics so must be 64-bit aligned.
	consistentIndex uint64

	cfg StoreConfig

	// mu read locks for txns and write locks for non-txn store changes.
	mu sync.RWMutex

	ig ConsistentIndexGetter

	b       backend.Backend
	kvindex index

	le lease.Lessor

	// revMu protects currentRev and compactMainRev.
	// Locked at the end of a write txn and released after the write txn unlocks its lock.
	// Locked before locking a read txn and released after locking.
	revMu sync.RWMutex
	// currentRev is the revision of the last completed transaction.
	currentRev int64
	// compactMainRev is the main revision of the last compaction.
	compactMainRev int64

	// bytesBuf8 is a byte slice of length 8
	// to avoid a repetitive allocation in saveIndex.
	bytesBuf8 []byte

	fifoSched schedule.Scheduler

	stopc chan struct{}

	lg *zap.Logger
}

// NewStore returns a new store. It is intended for creating a store inside
// the mvcc package; externally it should only be used for testing.
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
	if cfg.CompactionBatchLimit == 0 {
		cfg.CompactionBatchLimit = defaultCompactBatchLimit
	}
	s := &store{
		cfg:     cfg,
		b:       b,
		ig:      ig,
		kvindex: newTreeIndex(lg),

		le: le,

		currentRev:     1,
		compactMainRev: -1,

		bytesBuf8: make([]byte, 8),
		fifoSched: schedule.NewFIFOScheduler(),

		stopc: make(chan struct{}),

		lg: lg,
	}
	s.ReadView = &readView{s}
	s.WriteView = &writeView{s}
	if s.le != nil {
		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
	}

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket(keyBucketName)
	tx.UnsafeCreateBucket(metaBucketName)
	tx.Unlock()
	s.b.ForceCommit()

	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.restore(); err != nil {
		// TODO: return the error instead of panicking here?
		panic("failed to recover store from backend")
	}

	return s
}

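// compactBarrier closes ch to signal that the compaction it guards has
// settled. If the context is nil or already canceled, it re-schedules itself
// on the FIFO scheduler (unless the store is stopping) so that ch is still
// eventually closed.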
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
	if ctx == nil || ctx.Err() != nil {
		s.mu.Lock()
		select {
		case <-s.stopc:
		default:
			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
			s.fifoSched.Schedule(f)
		}
		s.mu.Unlock()
		return
	}
	close(ch)
}

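// Hash forces a backend commit and computes a hash over the whole backend
// keyspace, excluding the keys in DefaultIgnores. It returns the hash
// together with the store's current revision.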
func (s *store) Hash() (hash uint32, revision int64, err error) {
	// TODO: hash and revision could be inconsistent; one possible fix is to add
	// s.revMu.RLock() at the beginning of the function, which is costly.
	start := time.Now()

	s.b.ForceCommit()
	h, err := s.b.Hash(DefaultIgnores)

	hashSec.Observe(time.Since(start).Seconds())
	return h, s.currentRev, err
}

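// HashByRev computes a hash of the key bucket covering all revisions up to
// the given rev, skipping revisions that a pending compaction will delete.
// A rev of 0 hashes up to the current revision. It returns ErrCompacted if
// rev has already been compacted and ErrFutureRev if rev is beyond the
// current revision.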
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
	start := time.Now()

	s.mu.RLock()
	s.revMu.RLock()
	compactRev, currentRev = s.compactMainRev, s.currentRev
	s.revMu.RUnlock()

	if rev > 0 && rev <= compactRev {
		s.mu.RUnlock()
		return 0, 0, compactRev, ErrCompacted
	} else if rev > 0 && rev > currentRev {
		s.mu.RUnlock()
		return 0, currentRev, 0, ErrFutureRev
	}

	if rev == 0 {
		rev = currentRev
	}
	keep := s.kvindex.Keep(rev)

	tx := s.b.ReadTx()
	tx.RLock()
	defer tx.RUnlock()
	s.mu.RUnlock()

	upper := revision{main: rev + 1}
	lower := revision{main: compactRev + 1}
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	h.Write(keyBucketName)
	err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
		kr := bytesToRev(k)
		if !upper.GreaterThan(kr) {
			return nil
		}
		// Skip revisions that are scheduled for deletion by a compaction;
		// if the keep set is empty, no compaction is pending, so skip nothing.
		if lower.GreaterThan(kr) && len(keep) > 0 {
			if _, ok := keep[kr]; !ok {
				return nil
			}
		}
		h.Write(k)
		h.Write(v)
		return nil
	})
	hash = h.Sum32()

	hashRevSec.Observe(time.Since(start).Seconds())
	return hash, currentRev, compactRev, err
}

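// updateCompactRev validates rev against the last compaction and the current
// revision, records it as the new compactMainRev, and persists the scheduled
// compaction revision to the meta bucket. If rev has already been compacted,
// it returns ErrCompacted along with a channel that compactBarrier will close.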
func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
	s.revMu.Lock()
	if rev <= s.compactMainRev {
		ch := make(chan struct{})
		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
		s.fifoSched.Schedule(f)
		s.revMu.Unlock()
		return ch, ErrCompacted
	}
	if rev > s.currentRev {
		s.revMu.Unlock()
		return nil, ErrFutureRev
	}

	s.compactMainRev = rev

	rbytes := newRevBytes()
	revToBytes(revision{main: rev}, rbytes)

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()
	// ensure that the desired compaction is persisted
	s.b.ForceCommit()

	s.revMu.Unlock()

	return nil, nil
}

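// compact drops all revisions at or below rev from the in-memory index and
// schedules the backend compaction on the FIFO scheduler. The returned
// channel is closed once the scheduled compaction has finished.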
func (s *store) compact(rev int64) (<-chan struct{}, error) {
	start := time.Now()
	keep := s.kvindex.Compact(rev)
	ch := make(chan struct{})
	var j = func(ctx context.Context) {
		if ctx.Err() != nil {
			s.compactBarrier(ctx, ch)
			return
		}
		if !s.scheduleCompaction(rev, keep) {
			s.compactBarrier(nil, ch)
			return
		}
		close(ch)
	}

	s.fifoSched.Schedule(j)

	indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
	return ch, nil
}

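// compactLockfree runs a compaction without taking s.mu; it assumes the
// caller already serializes access to the store (restore holds s.mu when
// resuming a scheduled compaction).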
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
	ch, err := s.updateCompactRev(rev)
	if err != nil {
		return ch, err
	}

	return s.compact(rev)
}

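// Compact updates the compaction revision under s.mu, then schedules the
// compaction work; the returned channel is closed when the compaction
// finishes.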
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
	s.mu.Lock()

	ch, err := s.updateCompactRev(rev)

	if err != nil {
		s.mu.Unlock()
		return ch, err
	}
	s.mu.Unlock()

	return s.compact(rev)
}

// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}

func init() {
	DefaultIgnores = map[backend.IgnoreKey]struct{}{
		// consistent index might be changed due to v2 internal sync, which
		// is not controllable by the user.
		{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
	}
}

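// Commit saves the consistent index into the current batch transaction and
// then forces a commit of the backend.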
func (s *store) Commit() {
	s.mu.Lock()
	defer s.mu.Unlock()

	tx := s.b.BatchTx()
	tx.Lock()
	s.saveIndex(tx)
	tx.Unlock()
	s.b.ForceCommit()
}

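// Restore swaps in the backend b and rebuilds the store's in-memory state
// (key index, revisions, lease attachments) from it, replacing the FIFO
// scheduler and stop channel in the process.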
func (s *store) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	close(s.stopc)
	s.fifoSched.Stop()

	atomic.StoreUint64(&s.consistentIndex, 0)
	s.b = b
	s.kvindex = newTreeIndex(s.lg)
	s.currentRev = 1
	s.compactMainRev = -1
	s.fifoSched = schedule.NewFIFOScheduler()
	s.stopc = make(chan struct{})

	return s.restore()
}

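// restore rebuilds the in-memory state from the backend: it reads the last
// finished and scheduled compactions from the meta bucket, streams the key
// bucket into the tree index in chunks of restoreChunkKeys, re-attaches
// leases, and resumes any compaction that was scheduled but never finished.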
func (s *store) restore() error {
	s.setupMetricsReporter()

	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	keyToLease := make(map[string]lease.LeaseID)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()

	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main

		if s.lg != nil {
			s.lg.Info(
				"restored last compact revision",
				zap.String("meta-bucket-name", string(metaBucketName)),
				zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
				zap.Int64("restored-compact-revision", s.compactMainRev),
			)
		} else {
			plog.Printf("restore compact to %d", s.compactMainRev)
		}
	}
	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	scheduledCompact := int64(0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
	}

	// index keys concurrently as they're loaded in from tx
	keysGauge.Set(0)
	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
	for {
		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
		if len(keys) == 0 {
			break
		}
		// rkvc blocks if the total pending keys exceed the restore
		// chunk size, to keep keys from consuming too much memory.
		restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
		if len(keys) < restoreChunkKeys {
			// a partial set implies the final set
			break
		}
		// the next set begins after where this one ended
		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
		newMin.sub++
		revToBytes(newMin, min)
	}
	close(rkvc)
	s.currentRev = <-revc

	// keys in the range [compacted revision - N, compaction revision] might all be
	// deleted due to compaction. In that case the correct revision is the compaction
	// revision, not the largest revision we have seen.
	if s.currentRev < s.compactMainRev {
		s.currentRev = s.compactMainRev
	}
	if scheduledCompact <= s.compactMainRev {
		scheduledCompact = 0
	}

	for key, lid := range keyToLease {
		if s.le == nil {
			panic("no lessor to attach lease")
		}
		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
		if err != nil {
			if s.lg != nil {
				s.lg.Warn(
					"failed to attach a lease",
					zap.String("lease-id", fmt.Sprintf("%016x", lid)),
					zap.Error(err),
				)
			} else {
				plog.Errorf("unexpected Attach error: %v", err)
			}
		}
	}

	tx.Unlock()

	if scheduledCompact != 0 {
		s.compactLockfree(scheduledCompact)

		if s.lg != nil {
			s.lg.Info(
				"resume scheduled compaction",
				zap.String("meta-bucket-name", string(metaBucketName)),
				zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
				zap.Int64("scheduled-compact-revision", scheduledCompact),
			)
		} else {
			plog.Printf("resume scheduled compaction at %d", scheduledCompact)
		}
	}

	return nil
}

type revKeyValue struct {
	key  []byte
	kv   mvccpb.KeyValue
	kstr string
}

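// restoreIntoIndex starts a goroutine that consumes revKeyValues from the
// returned channel and replays them into idx, keeping a bounded keyIndex
// cache to avoid repeated tree lookups. The int64 channel yields the largest
// main revision observed once the input channel is closed.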
func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
	go func() {
		currentRev := int64(1)
		defer func() { revc <- currentRev }()
		// restore the tree index by streaming the unordered index.
		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
		for rkv := range rkvc {
			ki, ok := kiCache[rkv.kstr]
			// purge kiCache if it holds many keys but the lookup still misses
			if !ok && len(kiCache) >= restoreChunkKeys {
				i := 10
				for k := range kiCache {
					delete(kiCache, k)
					if i--; i == 0 {
						break
					}
				}
			}
			// cache miss, fetch from the tree index if present
			if !ok {
				ki = &keyIndex{key: rkv.kv.Key}
				if idxKey := idx.KeyIndex(ki); idxKey != nil {
					kiCache[rkv.kstr], ki = idxKey, idxKey
					ok = true
				}
			}
			rev := bytesToRev(rkv.key)
			currentRev = rev.main
			if ok {
				if isTombstone(rkv.key) {
					ki.tombstone(lg, rev.main, rev.sub)
					continue
				}
				ki.put(lg, rev.main, rev.sub)
			} else if !isTombstone(rkv.key) {
				ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
				idx.Insert(ki)
				kiCache[rkv.kstr] = ki
			}
		}
	}()
	return rkvc, revc
}

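// restoreChunk unmarshals one chunk of keys and values, updates keyToLease
// (tombstones detach the key, leased keys attach it), and forwards each
// entry to kvc for indexing.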
func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
	for i, key := range keys {
		rkv := revKeyValue{key: key}
		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
			if lg != nil {
				lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
			} else {
				plog.Fatalf("cannot unmarshal event: %v", err)
			}
		}
		rkv.kstr = string(rkv.kv.Key)
		if isTombstone(key) {
			delete(keyToLease, rkv.kstr)
		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
			keyToLease[rkv.kstr] = lid
		} else {
			delete(keyToLease, rkv.kstr)
		}
		kvc <- rkv
	}
}

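// Close signals all store goroutines to stop and shuts down the FIFO
// scheduler. It does not close the backend.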
func (s *store) Close() error {
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}

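// saveIndex persists the consistent index reported by the
// ConsistentIndexGetter into the meta bucket and mirrors it into
// s.consistentIndex. The caller must hold the batch tx lock.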
func (s *store) saveIndex(tx backend.BatchTx) {
	if s.ig == nil {
		return
	}
	bs := s.bytesBuf8
	ci := s.ig.ConsistentIndex()
	binary.BigEndian.PutUint64(bs, ci)
	// put the index into the underlying backend
	// tx has been locked in TxnBegin, so there is no need to lock it again
	tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
	atomic.StoreUint64(&s.consistentIndex, ci)
}

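// ConsistentIndex returns the cached consistent index if it has been set,
// falling back to reading the persisted value from the meta bucket and
// caching it.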
func (s *store) ConsistentIndex() uint64 {
	if ci := atomic.LoadUint64(&s.consistentIndex); ci > 0 {
		return ci
	}
	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
	if len(vs) == 0 {
		return 0
	}
	v := binary.BigEndian.Uint64(vs[0])
	atomic.StoreUint64(&s.consistentIndex, v)
	return v
}

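// setupMetricsReporter points the package-level metrics closures at this
// store's backend size, in-use size, open read-txn count, and current and
// compact revisions.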
func (s *store) setupMetricsReporter() {
	b := s.b
	reportDbTotalSizeInBytesMu.Lock()
	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesMu.Unlock()
	reportDbTotalSizeInBytesDebugMu.Lock()
	reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesDebugMu.Unlock()
	reportDbTotalSizeInUseInBytesMu.Lock()
	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
	reportDbTotalSizeInUseInBytesMu.Unlock()
	reportDbOpenReadTxNMu.Lock()
	reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
	reportDbOpenReadTxNMu.Unlock()
	reportCurrentRevMu.Lock()
	reportCurrentRev = func() float64 {
		s.revMu.RLock()
		defer s.revMu.RUnlock()
		return float64(s.currentRev)
	}
	reportCurrentRevMu.Unlock()
	reportCompactRevMu.Lock()
	reportCompactRev = func() float64 {
		s.revMu.RLock()
		defer s.revMu.RUnlock()
		return float64(s.compactMainRev)
	}
	reportCompactRevMu.Unlock()
}

// appendMarkTombstone appends the tombstone mark to normal revision bytes.
func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
	if len(b) != revBytesLen {
		if lg != nil {
			lg.Panic(
				"cannot append tombstone mark to non-normal revision bytes",
				zap.Int("expected-revision-bytes-size", revBytesLen),
				zap.Int("given-revision-bytes-size", len(b)),
			)
		} else {
			plog.Panicf("cannot append mark to non-normal revision bytes")
		}
	}
	return append(b, markTombstone)
}

// isTombstone checks whether the given revision bytes carry a tombstone mark.
func isTombstone(b []byte) bool {
	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}