// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
	"fmt"
	"hash/crc32"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"github.com/coreos/pkg/capnslog"
	humanize "github.com/dustin/go-humanize"
	bolt "go.etcd.io/bbolt"
	"go.uber.org/zap"
)

var (
	defaultBatchLimit    = 10000
	defaultBatchInterval = 100 * time.Millisecond

	defragLimit = 10000

	// initialMmapSize is the initial size of the mmapped region. Setting this larger than
	// the potential max db size can prevent the writer from blocking readers.
	// This only works on Linux.
	initialMmapSize = uint64(10 * 1024 * 1024 * 1024)

	plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc/backend")

	// minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
	minSnapshotWarningTimeout = 30 * time.Second
)

type Backend interface {
	// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
	ReadTx() ReadTx
	BatchTx() BatchTx
	// ConcurrentReadTx returns a non-blocking read transaction.
	ConcurrentReadTx() ReadTx

	Snapshot() Snapshot
	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
	// Size returns the current size of the backend physically allocated.
	// The backend can hold DB space that is not utilized at the moment,
	// since it can conduct pre-allocation or spare unused space for recycling.
	// Use SizeInUse() instead for the actual DB size.
	Size() int64
	// SizeInUse returns the current size of the backend logically in use.
	// Since the backend can manage free space in a non-byte unit such as
	// number of pages, the returned value may not be byte-accurate.
	SizeInUse() int64
	// OpenReadTxN returns the number of currently open read transactions in the backend.
	OpenReadTxN() int64
	Defrag() error
	ForceCommit()
	Close() error
}
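
// exampleBackendLifecycle is an illustrative sketch, not part of the etcd
// API: open a backend, write through its batch tx, force a commit, and
// close. It assumes the BatchTx methods (Lock, UnsafeCreateBucket,
// UnsafePut, Unlock) declared in this package's batch_tx.go.
func exampleBackendLifecycle(path string) error {
	b := NewDefaultBackend(path)

	tx := b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket([]byte("key"))
	tx.UnsafePut([]byte("key"), []byte("foo"), []byte("bar"))
	tx.Unlock()

	// Pending writes are flushed every batchInterval or after batchLimit
	// puts; ForceCommit makes them durable immediately.
	b.ForceCommit()
	return b.Close()
}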

type Snapshot interface {
	// Size gets the size of the snapshot.
	Size() int64
	// WriteTo writes the snapshot into the given writer.
	WriteTo(w io.Writer) (n int64, err error)
	// Close closes the snapshot.
	Close() error
}

type backend struct {
	// size and commits are used with atomic operations so they must be
	// 64-bit aligned, otherwise 32-bit tests will crash

	// size is the number of bytes allocated in the backend
	size int64
	// sizeInUse is the number of bytes actually used in the backend
	sizeInUse int64
	// commits counts number of commits since start
	commits int64
	// openReadTxN is the number of currently open read transactions in the backend
	openReadTxN int64

	mu sync.RWMutex
	db *bolt.DB

	batchInterval time.Duration
	batchLimit    int
	batchTx       *batchTxBuffered

	readTx *readTx

	stopc chan struct{}
	donec chan struct{}

	lg *zap.Logger
}

type BackendConfig struct {
	// Path is the file path to the backend file.
	Path string
	// BatchInterval is the maximum time before flushing the BatchTx.
	BatchInterval time.Duration
	// BatchLimit is the maximum puts before flushing the BatchTx.
	BatchLimit int
	// BackendFreelistType is the backend boltdb's freelist type.
	BackendFreelistType bolt.FreelistType
	// MmapSize is the number of bytes to mmap for the backend.
	MmapSize uint64
	// Logger logs backend-side operations.
	Logger *zap.Logger
}

func DefaultBackendConfig() BackendConfig {
	return BackendConfig{
		BatchInterval: defaultBatchInterval,
		BatchLimit:    defaultBatchLimit,
		MmapSize:      initialMmapSize,
	}
}
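
// exampleCustomConfig is an illustrative sketch of tuning the defaults
// above before opening a backend; the values here are arbitrary, not
// recommendations.
func exampleCustomConfig(path string) Backend {
	bcfg := DefaultBackendConfig()
	bcfg.Path = path
	bcfg.BatchInterval = 50 * time.Millisecond // flush pending writes more often
	bcfg.BatchLimit = 5000                     // ...or after fewer puts
	return New(bcfg)
}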

func New(bcfg BackendConfig) Backend {
	return newBackend(bcfg)
}

func NewDefaultBackend(path string) Backend {
	bcfg := DefaultBackendConfig()
	bcfg.Path = path
	return newBackend(bcfg)
}

func newBackend(bcfg BackendConfig) *backend {
	bopts := &bolt.Options{}
	if boltOpenOptions != nil {
		*bopts = *boltOpenOptions
	}
	bopts.InitialMmapSize = bcfg.mmapSize()
	bopts.FreelistType = bcfg.BackendFreelistType

	db, err := bolt.Open(bcfg.Path, 0600, bopts)
	if err != nil {
		if bcfg.Logger != nil {
			bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
		} else {
			plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
		}
	}

	// In the future, we may want to make buffering optional for low-concurrency
	// systems, or dynamically swap between buffered/non-buffered depending on workload.
	b := &backend{
		db: db,

		batchInterval: bcfg.BatchInterval,
		batchLimit:    bcfg.BatchLimit,

		readTx: &readTx{
			buf: txReadBuffer{
				txBuffer: txBuffer{make(map[string]*bucketBuffer)},
			},
			buckets: make(map[string]*bolt.Bucket),
			txWg:    new(sync.WaitGroup),
		},

		stopc: make(chan struct{}),
		donec: make(chan struct{}),

		lg: bcfg.Logger,
	}
	b.batchTx = newBatchTxBuffered(b)
	go b.run()
	return b
}

// BatchTx returns the current batch tx in coalescer. The tx can be used for read and
// write operations. The write result can be retrieved within the same tx immediately.
// The write result is isolated from other txs until the current one gets committed.
func (b *backend) BatchTx() BatchTx {
	return b.batchTx
}
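
// exampleReadYourWrite is an illustrative sketch of the semantics documented
// above: an uncommitted put is visible through the same batch tx. It assumes
// bucket "key" already exists and the UnsafeRange method from this package's
// ReadTx interface.
func exampleReadYourWrite(b Backend) [][]byte {
	tx := b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	tx.UnsafePut([]byte("key"), []byte("k1"), []byte("v1"))
	// Readable here before commit; other txs see it only after commit.
	_, vals := tx.UnsafeRange([]byte("key"), []byte("k1"), nil, 0)
	return vals
}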

func (b *backend) ReadTx() ReadTx { return b.readTx }
197
198// ConcurrentReadTx creates and returns a new ReadTx, which:
199// A) creates and keeps a copy of backend.readTx.txReadBuffer,
200// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
201func (b *backend) ConcurrentReadTx() ReadTx {
202 b.readTx.RLock()
203 defer b.readTx.RUnlock()
204 // prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
205 b.readTx.txWg.Add(1)
206 // TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
207 return &concurrentReadTx{
208 buf: b.readTx.buf.unsafeCopy(),
209 tx: b.readTx.tx,
210 txMu: &b.readTx.txMu,
211 buckets: b.readTx.buckets,
212 txWg: b.readTx.txWg,
213 }
214}
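
// exampleConcurrentRead is an illustrative sketch of ConcurrentReadTx usage.
// It assumes the behavior of concurrentReadTx in this package's read_tx.go:
// RLock is a no-op on the returned tx, and RUnlock releases the txWg
// reference taken above, which keeps the shared boltdb read tx from being
// rolled back while the read is in flight.
func exampleConcurrentRead(b Backend) int {
	tx := b.ConcurrentReadTx()
	tx.RLock()
	defer tx.RUnlock() // must be called, or the batch interval's read tx cannot be rolled back
	keys, _ := tx.UnsafeRange([]byte("key"), []byte("k1"), []byte("k9"), 0)
	return len(keys)
}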

// ForceCommit forces the current batching tx to commit.
func (b *backend) ForceCommit() {
	b.batchTx.Commit()
}

func (b *backend) Snapshot() Snapshot {
	b.batchTx.Commit()

	b.mu.RLock()
	defer b.mu.RUnlock()
	tx, err := b.db.Begin(false)
	if err != nil {
		if b.lg != nil {
			b.lg.Fatal("failed to begin tx", zap.Error(err))
		} else {
			plog.Fatalf("cannot begin tx (%s)", err)
		}
	}

	stopc, donec := make(chan struct{}), make(chan struct{})
	dbBytes := tx.Size()
	go func() {
		defer close(donec)
		// sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection,
		// assuming a minimum TCP throughput of 100MB/s.
		var sendRateBytes int64 = 100 * 1024 * 1024
		warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
		if warningTimeout < minSnapshotWarningTimeout {
			warningTimeout = minSnapshotWarningTimeout
		}
		start := time.Now()
		ticker := time.NewTicker(warningTimeout)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if b.lg != nil {
					b.lg.Warn(
						"snapshotting taking too long to transfer",
						zap.Duration("taking", time.Since(start)),
						zap.Int64("bytes", dbBytes),
						zap.String("size", humanize.Bytes(uint64(dbBytes))),
					)
				} else {
					plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1024), start)
				}

			case <-stopc:
				snapshotTransferSec.Observe(time.Since(start).Seconds())
				return
			}
		}
	}()

	return &snapshot{tx, stopc, donec}
}
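
// exampleSnapshotToFile is an illustrative sketch of the Snapshot API above:
// stream the backend contents to a file, then Close to stop the
// progress-warning goroutine and roll back the underlying read tx.
func exampleSnapshotToFile(b Backend, path string) error {
	snap := b.Snapshot()
	defer snap.Close()

	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	_, err = snap.WriteTo(f)
	return err
}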

type IgnoreKey struct {
	Bucket string
	Key    string
}

func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	b.mu.RLock()
	defer b.mu.RUnlock()
	err := b.db.View(func(tx *bolt.Tx) error {
		c := tx.Cursor()
		for next, _ := c.First(); next != nil; next, _ = c.Next() {
			b := tx.Bucket(next)
			if b == nil {
				return fmt.Errorf("cannot get hash of bucket %s", string(next))
			}
			h.Write(next)
			b.ForEach(func(k, v []byte) error {
				bk := IgnoreKey{Bucket: string(next), Key: string(k)}
				if _, ok := ignores[bk]; !ok {
					h.Write(k)
					h.Write(v)
				}
				return nil
			})
		}
		return nil
	})

	if err != nil {
		return 0, err
	}

	return h.Sum32(), nil
}
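
// exampleHashIgnoring is an illustrative sketch of Hash: keys listed in
// ignores are excluded from the checksum, so two backends that differ only
// in volatile keys hash identically. The "meta"/"consistent_index" pair is
// just an example of such a key, not a requirement of this API.
func exampleHashIgnoring(b Backend) (uint32, error) {
	ignores := map[IgnoreKey]struct{}{
		{Bucket: "meta", Key: "consistent_index"}: {},
	}
	return b.Hash(ignores)
}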

func (b *backend) Size() int64 {
	return atomic.LoadInt64(&b.size)
}

func (b *backend) SizeInUse() int64 {
	return atomic.LoadInt64(&b.sizeInUse)
}

func (b *backend) run() {
	defer close(b.donec)
	t := time.NewTimer(b.batchInterval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
		case <-b.stopc:
			b.batchTx.CommitAndStop()
			return
		}
		if b.batchTx.safePending() != 0 {
			b.batchTx.Commit()
		}
		t.Reset(b.batchInterval)
	}
}

func (b *backend) Close() error {
	close(b.stopc)
	<-b.donec
	return b.db.Close()
}

// Commits returns total number of commits since start
func (b *backend) Commits() int64 {
	return atomic.LoadInt64(&b.commits)
}

func (b *backend) Defrag() error {
	return b.defrag()
}

func (b *backend) defrag() error {
	now := time.Now()

	// TODO: make this non-blocking?
	// lock batchTx to ensure nobody is using the previous tx, and then
	// close the previous ongoing tx.
	b.batchTx.Lock()
	defer b.batchTx.Unlock()

	// lock the database after locking the tx to avoid deadlock.
	b.mu.Lock()
	defer b.mu.Unlock()

	// block concurrent read requests while resetting tx
	b.readTx.Lock()
	defer b.readTx.Unlock()

	b.batchTx.unsafeCommit(true)

	b.batchTx.tx = nil

	tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
	if err != nil {
		return err
	}

	dbp := b.db.Path()
	tdbp := tmpdb.Path()
	size1, sizeInUse1 := b.Size(), b.SizeInUse()
	if b.lg != nil {
		b.lg.Info(
			"defragmenting",
			zap.String("path", dbp),
			zap.Int64("current-db-size-bytes", size1),
			zap.String("current-db-size", humanize.Bytes(uint64(size1))),
			zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
		)
	}

	err = defragdb(b.db, tmpdb, defragLimit)
	if err != nil {
		tmpdb.Close()
		os.RemoveAll(tmpdb.Path())
		return err
	}

	err = b.db.Close()
	if err != nil {
		if b.lg != nil {
			b.lg.Fatal("failed to close database", zap.Error(err))
		} else {
			plog.Fatalf("cannot close database (%s)", err)
		}
	}
	err = tmpdb.Close()
	if err != nil {
		if b.lg != nil {
			b.lg.Fatal("failed to close tmp database", zap.Error(err))
		} else {
			plog.Fatalf("cannot close database (%s)", err)
		}
	}
	err = os.Rename(tdbp, dbp)
	if err != nil {
		if b.lg != nil {
			b.lg.Fatal("failed to rename tmp database", zap.Error(err))
		} else {
			plog.Fatalf("cannot rename database (%s)", err)
		}
	}

	b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
	if err != nil {
		if b.lg != nil {
			b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
		} else {
			plog.Panicf("cannot open database at %s (%v)", dbp, err)
		}
	}
	b.batchTx.tx = b.unsafeBegin(true)

	b.readTx.reset()
	b.readTx.tx = b.unsafeBegin(false)

	size := b.readTx.tx.Size()
	db := b.readTx.tx.DB()
	atomic.StoreInt64(&b.size, size)
	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

	took := time.Since(now)
	defragSec.Observe(took.Seconds())

	size2, sizeInUse2 := b.Size(), b.SizeInUse()
	if b.lg != nil {
		b.lg.Info(
			"defragmented",
			zap.String("path", dbp),
			zap.Int64("current-db-size-bytes-diff", size2-size1),
			zap.Int64("current-db-size-bytes", size2),
			zap.String("current-db-size", humanize.Bytes(uint64(size2))),
			zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
			zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
			zap.Duration("took", took),
		)
	}
	return nil
}

func defragdb(odb, tmpdb *bolt.DB, limit int) error {
	// open a tx on tmpdb for writes
	tmptx, err := tmpdb.Begin(true)
	if err != nil {
		return err
	}

	// open a tx on old db for read
	tx, err := odb.Begin(false)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	c := tx.Cursor()

	count := 0
	for next, _ := c.First(); next != nil; next, _ = c.Next() {
		b := tx.Bucket(next)
		if b == nil {
			return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
		}

		tmpb, berr := tmptx.CreateBucketIfNotExists(next)
		if berr != nil {
			return berr
		}
		tmpb.FillPercent = 0.9 // for sequential writes in the ForEach below

		b.ForEach(func(k, v []byte) error {
			count++
			if count > limit {
				err = tmptx.Commit()
				if err != nil {
					return err
				}
				tmptx, err = tmpdb.Begin(true)
				if err != nil {
					return err
				}
				tmpb = tmptx.Bucket(next)
				tmpb.FillPercent = 0.9 // for sequential writes in the ForEach below

				count = 0
			}
			return tmpb.Put(k, v)
		})
	}

	return tmptx.Commit()
}

func (b *backend) begin(write bool) *bolt.Tx {
	b.mu.RLock()
	tx := b.unsafeBegin(write)
	b.mu.RUnlock()

	size := tx.Size()
	db := tx.DB()
	stats := db.Stats()
	atomic.StoreInt64(&b.size, size)
	atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
	atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))

	return tx
}

func (b *backend) unsafeBegin(write bool) *bolt.Tx {
	tx, err := b.db.Begin(write)
	if err != nil {
		if b.lg != nil {
			b.lg.Fatal("failed to begin tx", zap.Error(err))
		} else {
			plog.Fatalf("cannot begin tx (%s)", err)
		}
	}
	return tx
}

func (b *backend) OpenReadTxN() int64 {
	return atomic.LoadInt64(&b.openReadTxN)
}

// NewTmpBackend creates a backend implementation for testing.
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
	if err != nil {
		panic(err)
	}
	tmpPath := filepath.Join(dir, "database")
	bcfg := DefaultBackendConfig()
	bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = tmpPath, batchInterval, batchLimit
	return newBackend(bcfg), tmpPath
}
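
// exampleTmpBackendUse is an illustrative sketch of the test helper above:
// create a throwaway backend with a fast flush cadence, and remove both the
// backend and its temp directory when done.
func exampleTmpBackendUse() {
	b, tmpPath := NewTmpBackend(10*time.Millisecond, 100)
	defer os.RemoveAll(filepath.Dir(tmpPath))
	defer b.Close()
	// ... exercise b.BatchTx() / b.ReadTx() here ...
}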

func NewDefaultTmpBackend() (*backend, string) {
	return NewTmpBackend(defaultBatchInterval, defaultBatchLimit)
}

type snapshot struct {
	*bolt.Tx
	stopc chan struct{}
	donec chan struct{}
}

func (s *snapshot) Close() error {
	close(s.stopc)
	<-s.donec
	return s.Tx.Rollback()
}