| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package backend |
| |
| import ( |
| "fmt" |
| "hash/crc32" |
| "io" |
| "io/ioutil" |
| "os" |
| "path/filepath" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| "github.com/coreos/pkg/capnslog" |
| humanize "github.com/dustin/go-humanize" |
| bolt "go.etcd.io/bbolt" |
| "go.uber.org/zap" |
| ) |
| |
| var ( |
| defaultBatchLimit = 10000 |
| defaultBatchInterval = 100 * time.Millisecond |
| |
| defragLimit = 10000 |
| |
| // initialMmapSize is the initial size of the mmapped region. Setting this larger than |
| // the potential max db size can prevent a writer from blocking readers. |
| // This only works on Linux. |
| initialMmapSize = uint64(10 * 1024 * 1024 * 1024) |
| |
| plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc/backend") |
| |
| // minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning. |
| minSnapshotWarningTimeout = 30 * time.Second |
| ) |
| |
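| // Backend abstracts the low-level key/value store (boltdb) beneath mvcc. |
| // A minimal usage sketch; the path and bucket name here are illustrative |
| // assumptions, not values etcd itself uses: |
| // |
| //	b := NewDefaultBackend("/tmp/example/db") |
| //	defer b.Close() |
| //	tx := b.BatchTx() |
| //	tx.Lock() |
| //	tx.UnsafeCreateBucket([]byte("test")) |
| //	tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) |
| //	tx.Unlock() |
| //	b.ForceCommit() // flush to disk now rather than waiting for the batch interval |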
| type Backend interface { |
| // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523. |
| ReadTx() ReadTx |
| BatchTx() BatchTx |
| // ConcurrentReadTx returns a non-blocking read transaction. |
| ConcurrentReadTx() ReadTx |
| |
| Snapshot() Snapshot |
| Hash(ignores map[IgnoreKey]struct{}) (uint32, error) |
| // Size returns the current size of the backend physically allocated. |
| // The backend can hold DB space that is not utilized at the moment, |
| // since it can pre-allocate space or keep unused space for recycling. |
| // Use SizeInUse() instead for the actual DB size. |
| Size() int64 |
| // SizeInUse returns the current size of the backend logically in use. |
| // Since the backend can manage free space in a non-byte unit such as |
| // number of pages, the returned value may not be exact in bytes. |
| SizeInUse() int64 |
| // OpenReadTxN returns the number of currently open read transactions in the backend. |
| OpenReadTxN() int64 |
| Defrag() error |
| ForceCommit() |
| Close() error |
| } |
| |
| type Snapshot interface { |
| // Size gets the size of the snapshot. |
| Size() int64 |
| // WriteTo writes the snapshot into the given writer. |
| WriteTo(w io.Writer) (n int64, err error) |
| // Close closes the snapshot. |
| Close() error |
| } |
| |
| type backend struct { |
| // size and commits are used with atomic operations so they must be |
| // 64-bit aligned, otherwise 32-bit tests will crash |
| |
| // size is the number of bytes allocated in the backend |
| size int64 |
| // sizeInUse is the number of bytes actually used in the backend |
| sizeInUse int64 |
| // commits counts number of commits since start |
| commits int64 |
| // openReadTxN is the number of currently open read transactions in the backend |
| openReadTxN int64 |
| |
| mu sync.RWMutex |
| db *bolt.DB |
| |
| batchInterval time.Duration |
| batchLimit int |
| batchTx *batchTxBuffered |
| |
| readTx *readTx |
| |
| stopc chan struct{} |
| donec chan struct{} |
| |
| lg *zap.Logger |
| } |
| |
| type BackendConfig struct { |
| // Path is the file path to the backend file. |
| Path string |
| // BatchInterval is the maximum time before flushing the BatchTx. |
| BatchInterval time.Duration |
| // BatchLimit is the maximum puts before flushing the BatchTx. |
| BatchLimit int |
| // BackendFreelistType is the backend boltdb's freelist type. |
| BackendFreelistType bolt.FreelistType |
| // MmapSize is the number of bytes to mmap for the backend. |
| MmapSize uint64 |
| // Logger logs backend-side operations. |
| Logger *zap.Logger |
| } |
| |
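| // DefaultBackendConfig returns a BackendConfig preloaded with the package |
| // defaults. Callers typically override individual fields before calling New; |
| // for example (values here are illustrative): |
| // |
| //	bcfg := DefaultBackendConfig() |
| //	bcfg.Path = "/tmp/example/db" |
| //	bcfg.BatchInterval = 50 * time.Millisecond |
| //	bcfg.BatchLimit = 5000 |
| //	b := New(bcfg) |
| //	defer b.Close() |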
| func DefaultBackendConfig() BackendConfig { |
| return BackendConfig{ |
| BatchInterval: defaultBatchInterval, |
| BatchLimit: defaultBatchLimit, |
| MmapSize: initialMmapSize, |
| } |
| } |
| |
| func New(bcfg BackendConfig) Backend { |
| return newBackend(bcfg) |
| } |
| |
| func NewDefaultBackend(path string) Backend { |
| bcfg := DefaultBackendConfig() |
| bcfg.Path = path |
| return newBackend(bcfg) |
| } |
| |
| func newBackend(bcfg BackendConfig) *backend { |
| bopts := &bolt.Options{} |
| if boltOpenOptions != nil { |
| *bopts = *boltOpenOptions |
| } |
| bopts.InitialMmapSize = bcfg.mmapSize() |
| bopts.FreelistType = bcfg.BackendFreelistType |
| |
| db, err := bolt.Open(bcfg.Path, 0600, bopts) |
| if err != nil { |
| if bcfg.Logger != nil { |
| bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err)) |
| } else { |
| plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err) |
| } |
| } |
| |
| // In the future, we may want to make buffering optional for low-concurrency systems |
| // or dynamically swap between buffered/non-buffered depending on workload. |
| b := &backend{ |
| db: db, |
| |
| batchInterval: bcfg.BatchInterval, |
| batchLimit: bcfg.BatchLimit, |
| |
| readTx: &readTx{ |
| buf: txReadBuffer{ |
| txBuffer: txBuffer{make(map[string]*bucketBuffer)}, |
| }, |
| buckets: make(map[string]*bolt.Bucket), |
| txWg: new(sync.WaitGroup), |
| }, |
| |
| stopc: make(chan struct{}), |
| donec: make(chan struct{}), |
| |
| lg: bcfg.Logger, |
| } |
| b.batchTx = newBatchTxBuffered(b) |
| go b.run() |
| return b |
| } |
| |
| // BatchTx returns the current batch tx in coalescer. The tx can be used for read and |
| // write operations. The write result can be retrieved within the same tx immediately. |
| // The write result is isolated from other txs until the current one gets committed. |
| func (b *backend) BatchTx() BatchTx { |
| return b.batchTx |
| } |
| |
| func (b *backend) ReadTx() ReadTx { return b.readTx } |
| |
| // ConcurrentReadTx creates and returns a new ReadTx, which: |
| // A) creates and keeps a copy of backend.readTx.txReadBuffer, |
| // B) references the boltdb read Tx (and its bucket cache) of the current batch interval. |
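| // |
| // A sketch of the intended use (bucket and key names are illustrative): |
| // |
| //	rtx := b.ConcurrentReadTx() |
| //	rtx.RLock() |
| //	keys, vals := rtx.UnsafeRange([]byte("test"), []byte("foo"), nil, 0) |
| //	rtx.RUnlock() // drops the reference that pins the boltdb read Tx |
| //	fmt.Println(len(keys), len(vals)) |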
| func (b *backend) ConcurrentReadTx() ReadTx { |
| b.readTx.RLock() |
| defer b.readTx.RUnlock() |
| // prevent boltdb read Tx from being rolled back until store read Tx is done. Needs to be called when holding readTx.RLock(). |
| b.readTx.txWg.Add(1) |
| // TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval. |
| return &concurrentReadTx{ |
| buf: b.readTx.buf.unsafeCopy(), |
| tx: b.readTx.tx, |
| txMu: &b.readTx.txMu, |
| buckets: b.readTx.buckets, |
| txWg: b.readTx.txWg, |
| } |
| } |
| |
| // ForceCommit forces the current batching tx to commit. |
| func (b *backend) ForceCommit() { |
| b.batchTx.Commit() |
| } |
| |
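| // Snapshot forces a commit of the current batch tx and returns a read-only |
| // view of the entire database; a background goroutine warns if the transfer |
| // runs long. A usage sketch (the target path is illustrative): |
| // |
| //	snap := b.Snapshot() |
| //	defer snap.Close() // stops the warning goroutine and rolls back the read tx |
| //	f, err := os.Create("/tmp/example/db.snapshot") |
| //	if err != nil { |
| //		return err |
| //	} |
| //	defer f.Close() |
| //	if _, err := snap.WriteTo(f); err != nil { |
| //		return err |
| //	} |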
| func (b *backend) Snapshot() Snapshot { |
| b.batchTx.Commit() |
| |
| b.mu.RLock() |
| defer b.mu.RUnlock() |
| tx, err := b.db.Begin(false) |
| if err != nil { |
| if b.lg != nil { |
| b.lg.Fatal("failed to begin tx", zap.Error(err)) |
| } else { |
| plog.Fatalf("cannot begin tx (%s)", err) |
| } |
| } |
| |
| stopc, donec := make(chan struct{}), make(chan struct{}) |
| dbBytes := tx.Size() |
| go func() { |
| defer close(donec) |
| // sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection |
| // assuming a min tcp throughput of 100MB/s. |
| var sendRateBytes int64 = 100 * 1024 * 1024 |
| warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second))) |
| if warningTimeout < minSnapshotWarningTimeout { |
| warningTimeout = minSnapshotWarningTimeout |
| } |
| start := time.Now() |
| ticker := time.NewTicker(warningTimeout) |
| defer ticker.Stop() |
| for { |
| select { |
| case <-ticker.C: |
| if b.lg != nil { |
| b.lg.Warn( |
| "snapshotting taking too long to transfer", |
| zap.Duration("taking", time.Since(start)), |
| zap.Int64("bytes", dbBytes), |
| zap.String("size", humanize.Bytes(uint64(dbBytes))), |
| ) |
| } else { |
| plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1014), start) |
| } |
| |
| case <-stopc: |
| snapshotTransferSec.Observe(time.Since(start).Seconds()) |
| return |
| } |
| } |
| }() |
| |
| return &snapshot{tx, stopc, donec} |
| } |
| |
| type IgnoreKey struct { |
| Bucket string |
| Key string |
| } |
| |
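| // Hash computes a CRC-32 (Castagnoli) checksum over every bucket name, key, |
| // and value in the database, skipping any (bucket, key) pair present in |
| // ignores. A sketch (the ignored pair below is illustrative): |
| // |
| //	ignores := map[IgnoreKey]struct{}{ |
| //		{Bucket: "meta", Key: "consistent_index"}: {}, |
| //	} |
| //	sum, err := b.Hash(ignores) |
| //	if err != nil { |
| //		return err |
| //	} |
| //	fmt.Printf("backend hash: %08x\n", sum) |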
| func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) { |
| h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) |
| |
| b.mu.RLock() |
| defer b.mu.RUnlock() |
| err := b.db.View(func(tx *bolt.Tx) error { |
| c := tx.Cursor() |
| for next, _ := c.First(); next != nil; next, _ = c.Next() { |
| b := tx.Bucket(next) |
| if b == nil { |
| return fmt.Errorf("cannot get hash of bucket %s", string(next)) |
| } |
| h.Write(next) |
| if err := b.ForEach(func(k, v []byte) error { |
| bk := IgnoreKey{Bucket: string(next), Key: string(k)} |
| if _, ok := ignores[bk]; !ok { |
| h.Write(k) |
| h.Write(v) |
| } |
| return nil |
| }); err != nil { |
| return err |
| } |
| } |
| return nil |
| }) |
| |
| if err != nil { |
| return 0, err |
| } |
| |
| return h.Sum32(), nil |
| } |
| |
| func (b *backend) Size() int64 { |
| return atomic.LoadInt64(&b.size) |
| } |
| |
| func (b *backend) SizeInUse() int64 { |
| return atomic.LoadInt64(&b.sizeInUse) |
| } |
| |
| func (b *backend) run() { |
| defer close(b.donec) |
| t := time.NewTimer(b.batchInterval) |
| defer t.Stop() |
| for { |
| select { |
| case <-t.C: |
| case <-b.stopc: |
| b.batchTx.CommitAndStop() |
| return |
| } |
| if b.batchTx.safePending() != 0 { |
| b.batchTx.Commit() |
| } |
| t.Reset(b.batchInterval) |
| } |
| } |
| |
| func (b *backend) Close() error { |
| close(b.stopc) |
| <-b.donec |
| return b.db.Close() |
| } |
| |
| // Commits returns the total number of commits since start. |
| func (b *backend) Commits() int64 { |
| return atomic.LoadInt64(&b.commits) |
| } |
| |
| func (b *backend) Defrag() error { |
| return b.defrag() |
| } |
| |
| func (b *backend) defrag() error { |
| now := time.Now() |
| |
| // TODO: make this non-blocking? |
| // lock batchTx to ensure nobody is using the previous tx, and then |
| // close the previous ongoing tx. |
| b.batchTx.Lock() |
| defer b.batchTx.Unlock() |
| |
| // lock database after locking tx to avoid deadlock. |
| b.mu.Lock() |
| defer b.mu.Unlock() |
| |
| // block concurrent read requests while resetting tx |
| b.readTx.Lock() |
| defer b.readTx.Unlock() |
| |
| b.batchTx.unsafeCommit(true) |
| |
| b.batchTx.tx = nil |
| |
| tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions) |
| if err != nil { |
| return err |
| } |
| |
| dbp := b.db.Path() |
| tdbp := tmpdb.Path() |
| size1, sizeInUse1 := b.Size(), b.SizeInUse() |
| if b.lg != nil { |
| b.lg.Info( |
| "defragmenting", |
| zap.String("path", dbp), |
| zap.Int64("current-db-size-bytes", size1), |
| zap.String("current-db-size", humanize.Bytes(uint64(size1))), |
| zap.Int64("current-db-size-in-use-bytes", sizeInUse1), |
| zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))), |
| ) |
| } |
| |
| err = defragdb(b.db, tmpdb, defragLimit) |
| if err != nil { |
| tmpdb.Close() |
| os.RemoveAll(tmpdb.Path()) |
| return err |
| } |
| |
| err = b.db.Close() |
| if err != nil { |
| if b.lg != nil { |
| b.lg.Fatal("failed to close database", zap.Error(err)) |
| } else { |
| plog.Fatalf("cannot close database (%s)", err) |
| } |
| } |
| err = tmpdb.Close() |
| if err != nil { |
| if b.lg != nil { |
| b.lg.Fatal("failed to close tmp database", zap.Error(err)) |
| } else { |
| plog.Fatalf("cannot close database (%s)", err) |
| } |
| } |
| err = os.Rename(tdbp, dbp) |
| if err != nil { |
| if b.lg != nil { |
| b.lg.Fatal("failed to rename tmp database", zap.Error(err)) |
| } else { |
| plog.Fatalf("cannot rename database (%s)", err) |
| } |
| } |
| |
| b.db, err = bolt.Open(dbp, 0600, boltOpenOptions) |
| if err != nil { |
| if b.lg != nil { |
| b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err)) |
| } else { |
| plog.Panicf("cannot open database at %s (%v)", dbp, err) |
| } |
| } |
| b.batchTx.tx = b.unsafeBegin(true) |
| |
| b.readTx.reset() |
| b.readTx.tx = b.unsafeBegin(false) |
| |
| size := b.readTx.tx.Size() |
| db := b.readTx.tx.DB() |
| atomic.StoreInt64(&b.size, size) |
| atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize))) |
| |
| took := time.Since(now) |
| defragSec.Observe(took.Seconds()) |
| |
| size2, sizeInUse2 := b.Size(), b.SizeInUse() |
| if b.lg != nil { |
| b.lg.Info( |
| "defragmented", |
| zap.String("path", dbp), |
| zap.Int64("current-db-size-bytes-diff", size2-size1), |
| zap.Int64("current-db-size-bytes", size2), |
| zap.String("current-db-size", humanize.Bytes(uint64(size2))), |
| zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1), |
| zap.Int64("current-db-size-in-use-bytes", sizeInUse2), |
| zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))), |
| zap.Duration("took", took), |
| ) |
| } |
| return nil |
| } |
| |
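| // defragdb copies every bucket and key/value pair from odb into tmpdb in |
| // sorted order, committing the destination tx after every limit puts so the |
| // copy never accumulates one oversized transaction. |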
| func defragdb(odb, tmpdb *bolt.DB, limit int) error { |
| // open a tx on tmpdb for writes |
| tmptx, err := tmpdb.Begin(true) |
| if err != nil { |
| return err |
| } |
| |
| // open a tx on old db for read |
| tx, err := odb.Begin(false) |
| if err != nil { |
| return err |
| } |
| defer tx.Rollback() |
| |
| c := tx.Cursor() |
| |
| count := 0 |
| for next, _ := c.First(); next != nil; next, _ = c.Next() { |
| b := tx.Bucket(next) |
| if b == nil { |
| return fmt.Errorf("backend: cannot defrag bucket %s", string(next)) |
| } |
| |
| tmpb, berr := tmptx.CreateBucketIfNotExists(next) |
| if berr != nil { |
| return berr |
| } |
| tmpb.FillPercent = 0.9 // optimize for the sequential writes done by ForEach below |
| |
| if err = b.ForEach(func(k, v []byte) error { |
| count++ |
| if count > limit { |
| err = tmptx.Commit() |
| if err != nil { |
| return err |
| } |
| tmptx, err = tmpdb.Begin(true) |
| if err != nil { |
| return err |
| } |
| tmpb = tmptx.Bucket(next) |
| tmpb.FillPercent = 0.9 // optimize for the sequential writes done by ForEach |
| |
| count = 0 |
| } |
| return tmpb.Put(k, v) |
| }); err != nil { |
| return err |
| } |
| } |
| |
| return tmptx.Commit() |
| } |
| |
| func (b *backend) begin(write bool) *bolt.Tx { |
| b.mu.RLock() |
| tx := b.unsafeBegin(write) |
| b.mu.RUnlock() |
| |
| size := tx.Size() |
| db := tx.DB() |
| stats := db.Stats() |
| atomic.StoreInt64(&b.size, size) |
| atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize))) |
| atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN)) |
| |
| return tx |
| } |
| |
| func (b *backend) unsafeBegin(write bool) *bolt.Tx { |
| tx, err := b.db.Begin(write) |
| if err != nil { |
| if b.lg != nil { |
| b.lg.Fatal("failed to begin tx", zap.Error(err)) |
| } else { |
| plog.Fatalf("cannot begin tx (%s)", err) |
| } |
| } |
| return tx |
| } |
| |
| func (b *backend) OpenReadTxN() int64 { |
| return atomic.LoadInt64(&b.openReadTxN) |
| } |
| |
| // NewTmpBackend creates a backend implementation for testing. |
| func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) { |
| dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") |
| if err != nil { |
| panic(err) |
| } |
| tmpPath := filepath.Join(dir, "database") |
| bcfg := DefaultBackendConfig() |
| bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = tmpPath, batchInterval, batchLimit |
| return newBackend(bcfg), tmpPath |
| } |
| |
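| // NewDefaultTmpBackend is NewTmpBackend with the package default batch |
| // interval and limit. Typical test usage (cleanup is the caller's job): |
| // |
| //	b, tmpPath := NewDefaultTmpBackend() |
| //	defer func() { |
| //		b.Close() |
| //		os.RemoveAll(filepath.Dir(tmpPath)) |
| //	}() |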
| func NewDefaultTmpBackend() (*backend, string) { |
| return NewTmpBackend(defaultBatchInterval, defaultBatchLimit) |
| } |
| |
| type snapshot struct { |
| *bolt.Tx |
| stopc chan struct{} |
| donec chan struct{} |
| } |
| |
| func (s *snapshot) Close() error { |
| close(s.stopc) |
| <-s.donec |
| return s.Tx.Rollback() |
| } |