| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package mvcc |
| |
| import ( |
| "context" |
| "encoding/binary" |
| "errors" |
| "fmt" |
| "hash/crc32" |
| "math" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| "go.etcd.io/etcd/lease" |
| "go.etcd.io/etcd/mvcc/backend" |
| "go.etcd.io/etcd/mvcc/mvccpb" |
| "go.etcd.io/etcd/pkg/schedule" |
| |
| "github.com/coreos/pkg/capnslog" |
| "go.uber.org/zap" |
| ) |
| |
| var ( |
| keyBucketName = []byte("key") |
| metaBucketName = []byte("meta") |
| |
| consistentIndexKeyName = []byte("consistent_index") |
| scheduledCompactKeyName = []byte("scheduledCompactRev") |
| finishedCompactKeyName = []byte("finishedCompactRev") |
| |
| ErrCompacted = errors.New("mvcc: required revision has been compacted") |
| ErrFutureRev = errors.New("mvcc: required revision is a future revision") |
| ErrCanceled = errors.New("mvcc: watcher is canceled") |
| ErrClosed = errors.New("mvcc: closed") |
| |
| plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc") |
| ) |
| |
| const ( |
| // markedRevBytesLen is the byte length of a marked revision. |
| // The first `revBytesLen` bytes represent a normal revision. The last |
| // byte is the mark. |
| markedRevBytesLen = revBytesLen + 1 |
| markBytePosition = markedRevBytesLen - 1 |
| markTombstone byte = 't' |
| ) |
| |
| var restoreChunkKeys = 10000 // non-const for testing |
| var defaultCompactBatchLimit = 1000 |
| |
| // ConsistentIndexGetter is an interface that wraps the ConsistentIndex method. |
| // A consistent index is the offset of an entry in a consistent replicated log. |
| type ConsistentIndexGetter interface { |
| // ConsistentIndex returns the consistent index of the currently executing entry. |
| ConsistentIndex() uint64 |
| } |
| |
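| // StoreConfig holds the tunable parameters for a store. |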
| type StoreConfig struct { |
| CompactionBatchLimit int |
| } |
| |
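| // store is an MVCC-backed key-value store. It keeps key revisions in an |
| // in-memory tree index (kvindex) and the actual key-value data in a backend. |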
| type store struct { |
| ReadView |
| WriteView |
| |
| // consistentIndex caches the "consistent_index" key's value. Accessed |
| // through atomics so must be 64-bit aligned. |
| consistentIndex uint64 |
| |
| cfg StoreConfig |
| |
| // mu is read-locked for txns and write-locked for non-txn store changes. |
| mu sync.RWMutex |
| |
| ig ConsistentIndexGetter |
| |
| b backend.Backend |
| kvindex index |
| |
| le lease.Lessor |
| |
| // revMu protects currentRev and compactMainRev. |
| // It is locked at the end of a write txn and released after the write txn's lock is released. |
| // It is locked before locking a read txn and released once the read txn is locked. |
| revMu sync.RWMutex |
| // currentRev is the revision of the last completed transaction. |
| currentRev int64 |
| // compactMainRev is the main revision of the last compaction. |
| compactMainRev int64 |
| |
| // bytesBuf8 is a byte slice of length 8 |
| // to avoid a repetitive allocation in saveIndex. |
| bytesBuf8 []byte |
| |
| fifoSched schedule.Scheduler |
| |
| stopc chan struct{} |
| |
| lg *zap.Logger |
| } |
| |
| // NewStore returns a new store. It is useful for creating a store inside the |
| // mvcc package. Externally it should be used only for testing. |
| func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store { |
| if cfg.CompactionBatchLimit == 0 { |
| cfg.CompactionBatchLimit = defaultCompactBatchLimit |
| } |
| s := &store{ |
| cfg: cfg, |
| b: b, |
| ig: ig, |
| kvindex: newTreeIndex(lg), |
| |
| le: le, |
| |
| currentRev: 1, |
| compactMainRev: -1, |
| |
| bytesBuf8: make([]byte, 8), |
| fifoSched: schedule.NewFIFOScheduler(), |
| |
| stopc: make(chan struct{}), |
| |
| lg: lg, |
| } |
| s.ReadView = &readView{s} |
| s.WriteView = &writeView{s} |
| if s.le != nil { |
| s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) |
| } |
| |
| tx := s.b.BatchTx() |
| tx.Lock() |
| tx.UnsafeCreateBucket(keyBucketName) |
| tx.UnsafeCreateBucket(metaBucketName) |
| tx.Unlock() |
| s.b.ForceCommit() |
| |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| if err := s.restore(); err != nil { |
| // TODO: return the error instead of panic here? |
| panic("failed to recover store from backend") |
| } |
| |
| return s |
| } |
| |
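| // compactBarrier closes ch when it runs with a valid context. If the context is |
| // nil or already canceled, it reschedules itself on the FIFO scheduler instead, |
| // unless the store is being stopped. |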
| func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) { |
| if ctx == nil || ctx.Err() != nil { |
| s.mu.Lock() |
| select { |
| case <-s.stopc: |
| default: |
| f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } |
| s.fifoSched.Schedule(f) |
| } |
| s.mu.Unlock() |
| return |
| } |
| close(ch) |
| } |
| |
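| // Hash commits any pending backend writes and returns a hash of the entire |
| // backend keyspace, excluding the keys listed in DefaultIgnores, together with |
| // the store's current revision. |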
| func (s *store) Hash() (hash uint32, revision int64, err error) { |
| // TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly |
| start := time.Now() |
| |
| s.b.ForceCommit() |
| h, err := s.b.Hash(DefaultIgnores) |
| |
| hashSec.Observe(time.Since(start).Seconds()) |
| return h, s.currentRev, err |
| } |
| |
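| // HashByRev computes a hash over the key bucket for all revisions up to and |
| // including rev, excluding revisions at or below the compacted revision unless |
| // the index still keeps them. A rev of 0 means the store's current revision. |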
| func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) { |
| start := time.Now() |
| |
| s.mu.RLock() |
| s.revMu.RLock() |
| compactRev, currentRev = s.compactMainRev, s.currentRev |
| s.revMu.RUnlock() |
| |
| if rev > 0 && rev <= compactRev { |
| s.mu.RUnlock() |
| return 0, 0, compactRev, ErrCompacted |
| } else if rev > 0 && rev > currentRev { |
| s.mu.RUnlock() |
| return 0, currentRev, 0, ErrFutureRev |
| } |
| |
| if rev == 0 { |
| rev = currentRev |
| } |
| keep := s.kvindex.Keep(rev) |
| |
| tx := s.b.ReadTx() |
| tx.RLock() |
| defer tx.RUnlock() |
| s.mu.RUnlock() |
| |
| upper := revision{main: rev + 1} |
| lower := revision{main: compactRev + 1} |
| h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) |
| |
| h.Write(keyBucketName) |
| err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error { |
| kr := bytesToRev(k) |
| if !upper.GreaterThan(kr) { |
| return nil |
| } |
| // skip revisions that are scheduled for deletion |
| // due to compaction; don't skip anything if there are no kept revisions. |
| if lower.GreaterThan(kr) && len(keep) > 0 { |
| if _, ok := keep[kr]; !ok { |
| return nil |
| } |
| } |
| h.Write(k) |
| h.Write(v) |
| return nil |
| }) |
| hash = h.Sum32() |
| |
| hashRevSec.Observe(time.Since(start).Seconds()) |
| return hash, currentRev, compactRev, err |
| } |
| |
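| // updateCompactRev validates rev against the compacted and current revisions, |
| // records it as the new compactMainRev, and persists it under |
| // scheduledCompactKeyName so the compaction can be resumed after a restart. |
| // If rev has already been compacted, it returns ErrCompacted together with a |
| // channel that is closed once the scheduled compact barrier runs. |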
| func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { |
| s.revMu.Lock() |
| if rev <= s.compactMainRev { |
| ch := make(chan struct{}) |
| f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } |
| s.fifoSched.Schedule(f) |
| s.revMu.Unlock() |
| return ch, ErrCompacted |
| } |
| if rev > s.currentRev { |
| s.revMu.Unlock() |
| return nil, ErrFutureRev |
| } |
| |
| s.compactMainRev = rev |
| |
| rbytes := newRevBytes() |
| revToBytes(revision{main: rev}, rbytes) |
| |
| tx := s.b.BatchTx() |
| tx.Lock() |
| tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes) |
| tx.Unlock() |
| // ensure that desired compaction is persisted |
| s.b.ForceCommit() |
| |
| s.revMu.Unlock() |
| |
| return nil, nil |
| } |
| |
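| // compact removes superseded revisions at or below rev from the in-memory index |
| // and schedules the corresponding backend compaction on the FIFO scheduler. The |
| // returned channel is closed when the scheduled compaction finishes. |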
| func (s *store) compact(rev int64) (<-chan struct{}, error) { |
| start := time.Now() |
| keep := s.kvindex.Compact(rev) |
| ch := make(chan struct{}) |
| var j = func(ctx context.Context) { |
| if ctx.Err() != nil { |
| s.compactBarrier(ctx, ch) |
| return |
| } |
| if !s.scheduleCompaction(rev, keep) { |
| s.compactBarrier(nil, ch) |
| return |
| } |
| close(ch) |
| } |
| |
| s.fifoSched.Schedule(j) |
| |
| indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) |
| return ch, nil |
| } |
| |
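| // compactLockfree runs a compaction at rev without acquiring s.mu; the caller |
| // (e.g. restore) is expected to already hold the lock. |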
| func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { |
| ch, err := s.updateCompactRev(rev) |
| if err != nil { |
| return ch, err |
| } |
| |
| return s.compact(rev) |
| } |
| |
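| // Compact schedules a compaction at the given revision. The returned channel is |
| // closed when the compaction completes. |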
| func (s *store) Compact(rev int64) (<-chan struct{}, error) { |
| s.mu.Lock() |
| |
| ch, err := s.updateCompactRev(rev) |
| |
| if err != nil { |
| s.mu.Unlock() |
| return ch, err |
| } |
| s.mu.Unlock() |
| |
| return s.compact(rev) |
| } |
| |
| // DefaultIgnores is a map of keys to ignore in hash checking. |
| var DefaultIgnores map[backend.IgnoreKey]struct{} |
| |
| func init() { |
| DefaultIgnores = map[backend.IgnoreKey]struct{}{ |
| // consistent index might be changed due to v2 internal sync, which |
| // is not controllable by the user. |
| {Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {}, |
| } |
| } |
| |
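| // Commit persists the consistent index to the meta bucket and forces the |
| // backend to commit its pending writes. |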
| func (s *store) Commit() { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| |
| tx := s.b.BatchTx() |
| tx.Lock() |
| s.saveIndex(tx) |
| tx.Unlock() |
| s.b.ForceCommit() |
| } |
| |
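| // Restore stops the current store, swaps in the given backend, and rebuilds the |
| // in-memory state (tree index, revisions, lease attachments) from it. |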
| func (s *store) Restore(b backend.Backend) error { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| |
| close(s.stopc) |
| s.fifoSched.Stop() |
| |
| atomic.StoreUint64(&s.consistentIndex, 0) |
| s.b = b |
| s.kvindex = newTreeIndex(s.lg) |
| s.currentRev = 1 |
| s.compactMainRev = -1 |
| s.fifoSched = schedule.NewFIFOScheduler() |
| s.stopc = make(chan struct{}) |
| |
| return s.restore() |
| } |
| |
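| // restore rebuilds the tree index, current and compacted revisions, and lease |
| // attachments from the backend, and resumes any compaction that was scheduled |
| // but had not finished. |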
| func (s *store) restore() error { |
| s.setupMetricsReporter() |
| |
| min, max := newRevBytes(), newRevBytes() |
| revToBytes(revision{main: 1}, min) |
| revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) |
| |
| keyToLease := make(map[string]lease.LeaseID) |
| |
| // restore index |
| tx := s.b.BatchTx() |
| tx.Lock() |
| |
| _, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0) |
| if len(finishedCompactBytes) != 0 { |
| s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main |
| |
| if s.lg != nil { |
| s.lg.Info( |
| "restored last compact revision", |
| zap.String("meta-bucket-name", string(metaBucketName)), |
| zap.String("meta-bucket-name-key", string(finishedCompactKeyName)), |
| zap.Int64("restored-compact-revision", s.compactMainRev), |
| ) |
| } else { |
| plog.Printf("restore compact to %d", s.compactMainRev) |
| } |
| } |
| _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) |
| scheduledCompact := int64(0) |
| if len(scheduledCompactBytes) != 0 { |
| scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main |
| } |
| |
| // index keys concurrently as they're loaded in from tx |
| keysGauge.Set(0) |
| rkvc, revc := restoreIntoIndex(s.lg, s.kvindex) |
| for { |
| keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys)) |
| if len(keys) == 0 { |
| break |
| } |
| // rkvc blocks if the number of pending keys exceeds the restore |
| // chunk size, to keep keys from consuming too much memory. |
| restoreChunk(s.lg, rkvc, keys, vals, keyToLease) |
| if len(keys) < restoreChunkKeys { |
| // partial set implies final set |
| break |
| } |
| // next set begins after where this one ended |
| newMin := bytesToRev(keys[len(keys)-1][:revBytesLen]) |
| newMin.sub++ |
| revToBytes(newMin, min) |
| } |
| close(rkvc) |
| s.currentRev = <-revc |
| |
| // Keys in the range [compacted revision - N, compaction revision] might all have been |
| // deleted due to compaction. In that case, the correct current revision is the compaction |
| // revision, not the largest revision we have seen. |
| if s.currentRev < s.compactMainRev { |
| s.currentRev = s.compactMainRev |
| } |
| if scheduledCompact <= s.compactMainRev { |
| scheduledCompact = 0 |
| } |
| |
| for key, lid := range keyToLease { |
| if s.le == nil { |
| panic("no lessor to attach lease") |
| } |
| err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}}) |
| if err != nil { |
| if s.lg != nil { |
| s.lg.Warn( |
| "failed to attach a lease", |
| zap.String("lease-id", fmt.Sprintf("%016x", lid)), |
| zap.Error(err), |
| ) |
| } else { |
| plog.Errorf("unexpected Attach error: %v", err) |
| } |
| } |
| } |
| |
| tx.Unlock() |
| |
| if scheduledCompact != 0 { |
| s.compactLockfree(scheduledCompact) |
| |
| if s.lg != nil { |
| s.lg.Info( |
| "resume scheduled compaction", |
| zap.String("meta-bucket-name", string(metaBucketName)), |
| zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)), |
| zap.Int64("scheduled-compact-revision", scheduledCompact), |
| ) |
| } else { |
| plog.Printf("resume scheduled compaction at %d", scheduledCompact) |
| } |
| } |
| |
| return nil |
| } |
| |
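| // revKeyValue carries one backend entry during restore: the raw revision key, |
| // the decoded KeyValue, and the key as a string for index cache lookups. |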
| type revKeyValue struct { |
| key []byte |
| kv mvccpb.KeyValue |
| kstr string |
| } |
| |
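| // restoreIntoIndex starts a goroutine that consumes revKeyValues from the |
| // returned channel and rebuilds the tree index from them. Once the channel is |
| // closed, the largest main revision seen is sent on the returned revision channel. |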
| func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) { |
| rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1) |
| go func() { |
| currentRev := int64(1) |
| defer func() { revc <- currentRev }() |
| // restore the tree index from the stream of unordered revision entries. |
| kiCache := make(map[string]*keyIndex, restoreChunkKeys) |
| for rkv := range rkvc { |
| ki, ok := kiCache[rkv.kstr] |
| // evict some entries from kiCache when it is full and the key is still missing |
| if !ok && len(kiCache) >= restoreChunkKeys { |
| i := 10 |
| for k := range kiCache { |
| delete(kiCache, k) |
| if i--; i == 0 { |
| break |
| } |
| } |
| } |
| // cache miss; fetch from the tree index if the key is already there |
| if !ok { |
| ki = &keyIndex{key: rkv.kv.Key} |
| if idxKey := idx.KeyIndex(ki); idxKey != nil { |
| kiCache[rkv.kstr], ki = idxKey, idxKey |
| ok = true |
| } |
| } |
| rev := bytesToRev(rkv.key) |
| currentRev = rev.main |
| if ok { |
| if isTombstone(rkv.key) { |
| ki.tombstone(lg, rev.main, rev.sub) |
| continue |
| } |
| ki.put(lg, rev.main, rev.sub) |
| } else if !isTombstone(rkv.key) { |
| ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version) |
| idx.Insert(ki) |
| kiCache[rkv.kstr] = ki |
| } |
| } |
| }() |
| return rkvc, revc |
| } |
| |
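| // restoreChunk decodes one chunk of raw keys and values, updates keyToLease for |
| // leased and deleted keys, and forwards each entry to kvc for indexing. |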
| func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) { |
| for i, key := range keys { |
| rkv := revKeyValue{key: key} |
| if err := rkv.kv.Unmarshal(vals[i]); err != nil { |
| if lg != nil { |
| lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) |
| } else { |
| plog.Fatalf("cannot unmarshal event: %v", err) |
| } |
| } |
| rkv.kstr = string(rkv.kv.Key) |
| if isTombstone(key) { |
| delete(keyToLease, rkv.kstr) |
| } else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease { |
| keyToLease[rkv.kstr] = lid |
| } else { |
| delete(keyToLease, rkv.kstr) |
| } |
| kvc <- rkv |
| } |
| } |
| |
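| // Close stops the store's background scheduler and signals shutdown via stopc. |
| // It does not close the backend. |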
| func (s *store) Close() error { |
| close(s.stopc) |
| s.fifoSched.Stop() |
| return nil |
| } |
| |
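| // saveIndex writes the current consistent index into the meta bucket and caches |
| // it in s.consistentIndex. It is a no-op when no ConsistentIndexGetter is set. |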
| func (s *store) saveIndex(tx backend.BatchTx) { |
| if s.ig == nil { |
| return |
| } |
| bs := s.bytesBuf8 |
| ci := s.ig.ConsistentIndex() |
| binary.BigEndian.PutUint64(bs, ci) |
| // put the index into the underlying backend |
| // tx has been locked in TxnBegin, so there is no need to lock it again |
| tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) |
| atomic.StoreUint64(&s.consistentIndex, ci) |
| } |
| |
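| // ConsistentIndex returns the cached consistent index, falling back to reading |
| // it from the meta bucket (and caching it) when the cache is still zero. |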
| func (s *store) ConsistentIndex() uint64 { |
| if ci := atomic.LoadUint64(&s.consistentIndex); ci > 0 { |
| return ci |
| } |
| tx := s.b.BatchTx() |
| tx.Lock() |
| defer tx.Unlock() |
| _, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0) |
| if len(vs) == 0 { |
| return 0 |
| } |
| v := binary.BigEndian.Uint64(vs[0]) |
| atomic.StoreUint64(&s.consistentIndex, v) |
| return v |
| } |
| |
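| // setupMetricsReporter points the package-level metrics closures at this |
| // store's backend size, in-use size, open read txn count, and revisions. |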
| func (s *store) setupMetricsReporter() { |
| b := s.b |
| reportDbTotalSizeInBytesMu.Lock() |
| reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) } |
| reportDbTotalSizeInBytesMu.Unlock() |
| reportDbTotalSizeInBytesDebugMu.Lock() |
| reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) } |
| reportDbTotalSizeInBytesDebugMu.Unlock() |
| reportDbTotalSizeInUseInBytesMu.Lock() |
| reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) } |
| reportDbTotalSizeInUseInBytesMu.Unlock() |
| reportDbOpenReadTxNMu.Lock() |
| reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) } |
| reportDbOpenReadTxNMu.Unlock() |
| reportCurrentRevMu.Lock() |
| reportCurrentRev = func() float64 { |
| s.revMu.RLock() |
| defer s.revMu.RUnlock() |
| return float64(s.currentRev) |
| } |
| reportCurrentRevMu.Unlock() |
| reportCompactRevMu.Lock() |
| reportCompactRev = func() float64 { |
| s.revMu.RLock() |
| defer s.revMu.RUnlock() |
| return float64(s.compactMainRev) |
| } |
| reportCompactRevMu.Unlock() |
| } |
| |
| // appendMarkTombstone appends the tombstone mark to normal revision bytes. |
| func appendMarkTombstone(lg *zap.Logger, b []byte) []byte { |
| if len(b) != revBytesLen { |
| if lg != nil { |
| lg.Panic( |
| "cannot append tombstone mark to non-normal revision bytes", |
| zap.Int("expected-revision-bytes-size", revBytesLen), |
| zap.Int("given-revision-bytes-size", len(b)), |
| ) |
| } else { |
| plog.Panicf("cannot append mark to non normal revision bytes") |
| } |
| } |
| return append(b, markTombstone) |
| } |
| |
| // isTombstone checks whether the given revision bytes mark a tombstone. |
| func isTombstone(b []byte) bool { |
| return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone |
| } |