// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"context"
	"encoding/binary"
	"errors"
	"hash/crc32"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/mvcc/mvccpb"
	"github.com/coreos/etcd/pkg/schedule"
	"github.com/coreos/pkg/capnslog"
)

var (
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	consistentIndexKeyName  = []byte("consistent_index")
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
	ErrCanceled  = errors.New("mvcc: watcher is canceled")
	ErrClosed    = errors.New("mvcc: closed")

	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
)

const (
	// markedRevBytesLen is the byte length of a marked revision.
	// The first `revBytesLen` bytes represent a normal revision.
	// The last byte is the mark.
	markedRevBytesLen = revBytesLen + 1
	markBytePosition  = markedRevBytesLen - 1
	markTombstone     byte = 't'
)

var restoreChunkKeys = 10000 // non-const for testing

// ConsistentIndexGetter is an interface that wraps the ConsistentIndex method.
// A consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
	// ConsistentIndex returns the consistent index of the currently executing entry.
	ConsistentIndex() uint64
}

type store struct {
	ReadView
	WriteView

	// consistentIndex caches the "consistent_index" key's value. Accessed
	// through atomics so must be 64-bit aligned.
	consistentIndex uint64

	// mu is read-locked for txns and write-locked for non-txn store changes.
	mu sync.RWMutex

	ig ConsistentIndexGetter

	b       backend.Backend
	kvindex index

	le lease.Lessor

	// revMu protects currentRev and compactMainRev.
	// Locked at the end of a write txn and released after the write txn is unlocked.
	// Locked before locking the read txn and released after locking.
	revMu sync.RWMutex
	// currentRev is the revision of the last completed transaction.
	currentRev int64
	// compactMainRev is the main revision of the last compaction.
	compactMainRev int64

	// bytesBuf8 is a byte slice of length 8
	// to avoid a repetitive allocation in saveIndex.
	bytesBuf8 []byte

	fifoSched schedule.Scheduler

	stopc chan struct{}
}

// NewStore returns a new store. It is intended for creating a store inside the
// mvcc package; external callers should only use it for testing.
func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
	s := &store{
		b:       b,
		ig:      ig,
		kvindex: newTreeIndex(),

		le: le,

		currentRev:     1,
		compactMainRev: -1,

		bytesBuf8: make([]byte, 8),
		fifoSched: schedule.NewFIFOScheduler(),

		stopc: make(chan struct{}),
	}
	s.ReadView = &readView{s}
	s.WriteView = &writeView{s}
	if s.le != nil {
		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
	}

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket(keyBucketName)
	tx.UnsafeCreateBucket(metaBucketName)
	tx.Unlock()
	s.b.ForceCommit()

	if err := s.restore(); err != nil {
		// TODO: return the error instead of panic here?
		panic("failed to recover store from backend")
	}

	return s
}

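// compactBarrier closes ch to signal that the compaction it guards has completed.
// If ctx is nil or already canceled (the compaction did not run), the barrier is
// rescheduled on the FIFO scheduler instead, unless the store is being stopped.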
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
	if ctx == nil || ctx.Err() != nil {
		s.mu.Lock()
		select {
		case <-s.stopc:
		default:
			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
			s.fifoSched.Schedule(f)
		}
		s.mu.Unlock()
		return
	}
	close(ch)
}

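// Hash commits any pending backend writes and returns a hash computed over the
// backend's contents, skipping the keys listed in DefaultIgnores, together with
// the store's current revision.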
func (s *store) Hash() (hash uint32, revision int64, err error) {
	start := time.Now()

	s.b.ForceCommit()
	h, err := s.b.Hash(DefaultIgnores)

	hashDurations.Observe(time.Since(start).Seconds())
	return h, s.currentRev, err
}

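// HashByRev computes a CRC32 hash over the key bucket for all revisions up to and
// including rev (0 means the current revision), skipping revisions removed or
// scheduled for removal by compaction. It returns ErrCompacted if rev has already
// been compacted and ErrFutureRev if rev is beyond the current revision.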
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
	start := time.Now()

	s.mu.RLock()
	s.revMu.RLock()
	compactRev, currentRev = s.compactMainRev, s.currentRev
	s.revMu.RUnlock()

	if rev > 0 && rev <= compactRev {
		s.mu.RUnlock()
		return 0, 0, compactRev, ErrCompacted
	} else if rev > 0 && rev > currentRev {
		s.mu.RUnlock()
		return 0, currentRev, 0, ErrFutureRev
	}

	if rev == 0 {
		rev = currentRev
	}
	keep := s.kvindex.Keep(rev)

	tx := s.b.ReadTx()
	tx.Lock()
	defer tx.Unlock()
	s.mu.RUnlock()

	upper := revision{main: rev + 1}
	lower := revision{main: compactRev + 1}
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	h.Write(keyBucketName)
	err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
		kr := bytesToRev(k)
		if !upper.GreaterThan(kr) {
			return nil
		}
		// skip revisions that are scheduled for deletion by compaction;
		// if there is no keep set, skip nothing.
		if lower.GreaterThan(kr) && len(keep) > 0 {
			if _, ok := keep[kr]; !ok {
				return nil
			}
		}
		h.Write(k)
		h.Write(v)
		return nil
	})
	hash = h.Sum32()

	hashRevDurations.Observe(time.Since(start).Seconds())
	return hash, currentRev, compactRev, err
}

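// Compact schedules a compaction of the tree index and backend storage up to the
// given revision and returns a channel that is closed once the compaction finishes.
// It returns ErrCompacted if rev has already been compacted and ErrFutureRev if rev
// is larger than the current revision.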
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.revMu.Lock()
	defer s.revMu.Unlock()

	if rev <= s.compactMainRev {
		ch := make(chan struct{})
		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
		s.fifoSched.Schedule(f)
		return ch, ErrCompacted
	}
	if rev > s.currentRev {
		return nil, ErrFutureRev
	}

	start := time.Now()

	s.compactMainRev = rev

	rbytes := newRevBytes()
	revToBytes(revision{main: rev}, rbytes)

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()
	// ensure that desired compaction is persisted
	s.b.ForceCommit()

	keep := s.kvindex.Compact(rev)
	ch := make(chan struct{})
	var j = func(ctx context.Context) {
		if ctx.Err() != nil {
			s.compactBarrier(ctx, ch)
			return
		}
		if !s.scheduleCompaction(rev, keep) {
			s.compactBarrier(nil, ch)
			return
		}
		close(ch)
	}

	s.fifoSched.Schedule(j)

	indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))
	return ch, nil
}

// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}

func init() {
	DefaultIgnores = map[backend.IgnoreKey]struct{}{
		// consistent index might be changed due to v2 internal sync, which
		// is not controllable by the user.
		{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
	}
}

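// Commit persists the consistent index and forces the backend to commit all
// pending writes.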
func (s *store) Commit() {
	s.mu.Lock()
	defer s.mu.Unlock()

	tx := s.b.BatchTx()
	tx.Lock()
	s.saveIndex(tx)
	tx.Unlock()
	s.b.ForceCommit()
}

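// Restore stops the current scheduler, swaps in the given backend, resets the
// in-memory state, and rebuilds the index and lease attachments from the new
// backend.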
func (s *store) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	close(s.stopc)
	s.fifoSched.Stop()

	atomic.StoreUint64(&s.consistentIndex, 0)
	s.b = b
	s.kvindex = newTreeIndex()
	s.currentRev = 1
	s.compactMainRev = -1
	s.fifoSched = schedule.NewFIFOScheduler()
	s.stopc = make(chan struct{})

	return s.restore()
}

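// restore rebuilds the in-memory state from the backend: it reads the compaction
// markers from the meta bucket, streams the key bucket in chunks into the tree
// index, reattaches leases, and resumes any scheduled but unfinished compaction.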
func (s *store) restore() error {
	b := s.b

	reportDbTotalSizeInBytesMu.Lock()
	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesMu.Unlock()
	reportDbTotalSizeInUseInBytesMu.Lock()
	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
	reportDbTotalSizeInUseInBytesMu.Unlock()

	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	keyToLease := make(map[string]lease.LeaseID)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()

	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
		plog.Printf("restore compact to %d", s.compactMainRev)
	}
	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	scheduledCompact := int64(0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
	}

	// index keys concurrently as they're loaded in from tx
	keysGauge.Set(0)
	rkvc, revc := restoreIntoIndex(s.kvindex)
	for {
		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
		if len(keys) == 0 {
			break
		}
		// rkvc blocks if the total pending keys exceeds the restore
		// chunk size to keep keys from consuming too much memory.
		restoreChunk(rkvc, keys, vals, keyToLease)
		if len(keys) < restoreChunkKeys {
			// partial set implies final set
			break
		}
		// next set begins after where this one ended
		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
		newMin.sub++
		revToBytes(newMin, min)
	}
	close(rkvc)
	s.currentRev = <-revc

	// All keys at revisions up to the compaction revision might have been deleted
	// by the compaction. In that case the current revision should be set to the
	// compaction revision, not the largest revision seen during the scan.
	if s.currentRev < s.compactMainRev {
		s.currentRev = s.compactMainRev
	}
	if scheduledCompact <= s.compactMainRev {
		scheduledCompact = 0
	}

	for key, lid := range keyToLease {
		if s.le == nil {
			panic("no lessor to attach lease")
		}
		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
		if err != nil {
			plog.Errorf("unexpected Attach error: %v", err)
		}
	}

	tx.Unlock()

	if scheduledCompact != 0 {
		s.Compact(scheduledCompact)
		plog.Printf("resume scheduled compaction at %d", scheduledCompact)
	}

	return nil
}

type revKeyValue struct {
	key  []byte
	kv   mvccpb.KeyValue
	kstr string
}

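// restoreIntoIndex starts a goroutine that rebuilds the given tree index from
// revKeyValue items received on the returned channel. The second return value
// reports the largest main revision seen once the input channel is closed.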
func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
	go func() {
		currentRev := int64(1)
		defer func() { revc <- currentRev }()
		// restore the tree index from streaming the unordered index.
		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
		for rkv := range rkvc {
			ki, ok := kiCache[rkv.kstr]
			// purge kiCache if many keys but still missing in the cache
			if !ok && len(kiCache) >= restoreChunkKeys {
				i := 10
				for k := range kiCache {
					delete(kiCache, k)
					if i--; i == 0 {
						break
					}
				}
			}
			// cache miss, fetch from tree index if there
			if !ok {
				ki = &keyIndex{key: rkv.kv.Key}
				if idxKey := idx.KeyIndex(ki); idxKey != nil {
					kiCache[rkv.kstr], ki = idxKey, idxKey
					ok = true
				}
			}
			rev := bytesToRev(rkv.key)
			currentRev = rev.main
			if ok {
				if isTombstone(rkv.key) {
					ki.tombstone(rev.main, rev.sub)
					continue
				}
				ki.put(rev.main, rev.sub)
			} else if !isTombstone(rkv.key) {
				ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
				idx.Insert(ki)
				kiCache[rkv.kstr] = ki
			}
		}
	}()
	return rkvc, revc
}

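// restoreChunk unmarshals a chunk of backend key/value pairs, updates keyToLease
// with the lease attachment of each live key (removing tombstoned keys), and
// forwards each entry to the index-restore channel.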
func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
	for i, key := range keys {
		rkv := revKeyValue{key: key}
		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
			plog.Fatalf("cannot unmarshal event: %v", err)
		}
		rkv.kstr = string(rkv.kv.Key)
		if isTombstone(key) {
			delete(keyToLease, rkv.kstr)
		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
			keyToLease[rkv.kstr] = lid
		} else {
			delete(keyToLease, rkv.kstr)
		}
		kvc <- rkv
	}
}

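// Close signals the store to stop by closing stopc and shutting down the FIFO
// scheduler.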
func (s *store) Close() error {
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}

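// saveIndex writes the consistent index reported by the ConsistentIndexGetter into
// the meta bucket and updates the cached value. It is a no-op if no getter is set.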
func (s *store) saveIndex(tx backend.BatchTx) {
	if s.ig == nil {
		return
	}
	bs := s.bytesBuf8
	ci := s.ig.ConsistentIndex()
	binary.BigEndian.PutUint64(bs, ci)
	// put the index into the underlying backend
	// tx has been locked in TxnBegin, so there is no need to lock it again
	tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
	atomic.StoreUint64(&s.consistentIndex, ci)
}

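// ConsistentIndex returns the cached consistent index if one has been set;
// otherwise it reads the value from the meta bucket, caches it, and returns it
// (0 if no value is stored).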
func (s *store) ConsistentIndex() uint64 {
	if ci := atomic.LoadUint64(&s.consistentIndex); ci > 0 {
		return ci
	}
	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
	if len(vs) == 0 {
		return 0
	}
	v := binary.BigEndian.Uint64(vs[0])
	atomic.StoreUint64(&s.consistentIndex, v)
	return v
}

// appendMarkTombstone appends a tombstone mark to normal revision bytes.
func appendMarkTombstone(b []byte) []byte {
	if len(b) != revBytesLen {
		plog.Panicf("cannot append mark to non normal revision bytes")
	}
	return append(b, markTombstone)
}

// isTombstone checks whether the revision bytes represent a tombstone.
func isTombstone(b []byte) bool {
	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}