// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
	"fmt"
	"hash/crc32"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	bolt "github.com/coreos/bbolt"
	"github.com/coreos/pkg/capnslog"
)

var (
	defaultBatchLimit    = 10000
	defaultBatchInterval = 100 * time.Millisecond

	defragLimit = 10000

	// initialMmapSize is the initial size of the mmapped region. Setting this larger than
	// the potential max db size can prevent the writer from blocking readers.
	// This only works on Linux.
	initialMmapSize = uint64(10 * 1024 * 1024 * 1024)

	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc/backend")

	// minSnapshotWarningTimeout is the minimum threshold to trigger a long-running snapshot warning.
	minSnapshotWarningTimeout = 30 * time.Second
)

type Backend interface {
	ReadTx() ReadTx
	BatchTx() BatchTx

	Snapshot() Snapshot
	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
	// Size returns the current size of the backend.
	Size() int64
	// SizeInUse returns the current size of the backend logically in use.
	// Since the backend can manage free space in a non-byte unit such as
	// number of pages, the returned value may not be exact in bytes.
	SizeInUse() int64
	Defrag() error
	ForceCommit()
	Close() error
}

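// The sketch below is illustrative only (not part of this file's API): a typical
// Backend lifecycle of open, buffered write, forced commit, and close. The
// Unsafe* methods are assumed to be the ones declared on BatchTx elsewhere in
// this package (batch_tx.go); the path and bucket/key names are hypothetical.
//
//	b := NewDefaultBackend("/path/to/db")
//	defer b.Close()
//
//	tx := b.BatchTx()
//	tx.Lock()
//	tx.UnsafeCreateBucket([]byte("key"))
//	tx.UnsafePut([]byte("key"), []byte("foo"), []byte("bar"))
//	tx.Unlock()
//
//	// Flush immediately instead of waiting for the batch interval/limit.
//	b.ForceCommit()
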
type Snapshot interface {
	// Size gets the size of the snapshot.
	Size() int64
	// WriteTo writes the snapshot into the given writer.
	WriteTo(w io.Writer) (n int64, err error)
	// Close closes the snapshot.
	Close() error
}

type backend struct {
	// size and commits are used with atomic operations so they must be
	// 64-bit aligned, otherwise 32-bit tests will crash

	// size is the number of bytes in the backend
	size int64

	// sizeInUse is the number of bytes actually used in the backend
	sizeInUse int64

	// commits counts number of commits since start
	commits int64

	mu sync.RWMutex
	db *bolt.DB

	batchInterval time.Duration
	batchLimit    int
	batchTx       *batchTxBuffered

	readTx *readTx

	stopc chan struct{}
	donec chan struct{}
}

type BackendConfig struct {
	// Path is the file path to the backend file.
	Path string
	// BatchInterval is the maximum time before flushing the BatchTx.
	BatchInterval time.Duration
	// BatchLimit is the maximum puts before flushing the BatchTx.
	BatchLimit int
	// MmapSize is the number of bytes to mmap for the backend.
	MmapSize uint64
}

func DefaultBackendConfig() BackendConfig {
	return BackendConfig{
		BatchInterval: defaultBatchInterval,
		BatchLimit:    defaultBatchLimit,
		MmapSize:      initialMmapSize,
	}
}

func New(bcfg BackendConfig) Backend {
	return newBackend(bcfg)
}

func NewDefaultBackend(path string) Backend {
	bcfg := DefaultBackendConfig()
	bcfg.Path = path
	return newBackend(bcfg)
}

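// Illustrative sketch (not part of this file): tuning the batch settings by
// starting from DefaultBackendConfig and overriding fields before calling New.
// The path and values below are hypothetical.
//
//	bcfg := DefaultBackendConfig()
//	bcfg.Path = "/path/to/db"
//	bcfg.BatchInterval = 50 * time.Millisecond // flush buffered writes more often
//	bcfg.BatchLimit = 5000                     // or after fewer pending puts
//	be := New(bcfg)
//	defer be.Close()
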
func newBackend(bcfg BackendConfig) *backend {
	bopts := &bolt.Options{}
	if boltOpenOptions != nil {
		*bopts = *boltOpenOptions
	}
	bopts.InitialMmapSize = bcfg.mmapSize()

	db, err := bolt.Open(bcfg.Path, 0600, bopts)
	if err != nil {
		plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
	}

	// In the future, we may want to make buffering optional for low-concurrency systems
	// or dynamically swap between buffered/non-buffered depending on workload.
	b := &backend{
		db: db,

		batchInterval: bcfg.BatchInterval,
		batchLimit:    bcfg.BatchLimit,

		readTx: &readTx{
			buf: txReadBuffer{
				txBuffer: txBuffer{make(map[string]*bucketBuffer)},
			},
			buckets: make(map[string]*bolt.Bucket),
		},

		stopc: make(chan struct{}),
		donec: make(chan struct{}),
	}
	b.batchTx = newBatchTxBuffered(b)
	go b.run()
	return b
}

// BatchTx returns the current batch tx in the coalescer. The tx can be used for read and
// write operations. Writes can be read back within the same tx immediately, and are
// isolated from other txs until the current one is committed.
func (b *backend) BatchTx() BatchTx {
	return b.batchTx
}

func (b *backend) ReadTx() ReadTx { return b.readTx }

// ForceCommit forces the current batching tx to commit.
func (b *backend) ForceCommit() {
	b.batchTx.Commit()
}

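// Illustrative sketch (not part of this file) of the read-your-writes semantics
// described above: a value written through the batch tx is visible to a read on
// the same tx before the coalesced commit happens. UnsafeRange is assumed to be
// the range method declared on the embedded ReadTx (read_tx.go); the bucket and
// key names are hypothetical.
//
//	tx := b.BatchTx()
//	tx.Lock()
//	tx.UnsafePut([]byte("key"), []byte("foo"), []byte("bar"))
//	_, vals := tx.UnsafeRange([]byte("key"), []byte("foo"), nil, 0)
//	// vals[0] == []byte("bar") even though the batch has not been committed yet.
//	tx.Unlock()
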
func (b *backend) Snapshot() Snapshot {
	b.batchTx.Commit()

	b.mu.RLock()
	defer b.mu.RUnlock()
	tx, err := b.db.Begin(false)
	if err != nil {
		plog.Fatalf("cannot begin tx (%s)", err)
	}

	stopc, donec := make(chan struct{}), make(chan struct{})
	dbBytes := tx.Size()
	go func() {
		defer close(donec)
		// sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
		// assuming a min tcp throughput of 100MB/s.
		var sendRateBytes int64 = 100 * 1024 * 1024
		warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
		if warningTimeout < minSnapshotWarningTimeout {
			warningTimeout = minSnapshotWarningTimeout
		}
		start := time.Now()
		ticker := time.NewTicker(warningTimeout)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1024), start)
			case <-stopc:
				snapshotDurations.Observe(time.Since(start).Seconds())
				return
			}
		}
	}()

	return &snapshot{tx, stopc, donec}
}

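// Illustrative sketch (not part of this file): streaming a snapshot to a file
// with WriteTo and always closing it so that the long-running-snapshot warning
// goroutine above is stopped. The destination path is hypothetical.
//
//	snap := b.Snapshot()
//	defer snap.Close()
//
//	f, err := os.Create("/path/to/snapshot.db")
//	if err != nil {
//		plog.Fatalf("cannot create snapshot file (%v)", err)
//	}
//	defer f.Close()
//	if _, err := snap.WriteTo(f); err != nil {
//		plog.Fatalf("cannot write snapshot (%v)", err)
//	}
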
type IgnoreKey struct {
	Bucket string
	Key    string
}

func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	b.mu.RLock()
	defer b.mu.RUnlock()
	err := b.db.View(func(tx *bolt.Tx) error {
		c := tx.Cursor()
		for next, _ := c.First(); next != nil; next, _ = c.Next() {
			b := tx.Bucket(next)
			if b == nil {
				return fmt.Errorf("cannot get hash of bucket %s", string(next))
			}
			h.Write(next)
			b.ForEach(func(k, v []byte) error {
				bk := IgnoreKey{Bucket: string(next), Key: string(k)}
				if _, ok := ignores[bk]; !ok {
					h.Write(k)
					h.Write(v)
				}
				return nil
			})
		}
		return nil
	})

	if err != nil {
		return 0, err
	}

	return h.Sum32(), nil
}

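// Illustrative sketch (not part of this file): hashing the backend while
// skipping a key that is expected to differ across members. The bucket and key
// names here are hypothetical.
//
//	ignores := map[IgnoreKey]struct{}{
//		{Bucket: "meta", Key: "consistent_index"}: {},
//	}
//	sum, err := b.Hash(ignores)
//	if err != nil {
//		plog.Fatalf("cannot hash backend (%v)", err)
//	}
//	plog.Infof("backend hash: %x", sum)
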
func (b *backend) Size() int64 {
	return atomic.LoadInt64(&b.size)
}

func (b *backend) SizeInUse() int64 {
	return atomic.LoadInt64(&b.sizeInUse)
}

func (b *backend) run() {
	defer close(b.donec)
	t := time.NewTimer(b.batchInterval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
		case <-b.stopc:
			b.batchTx.CommitAndStop()
			return
		}
		b.batchTx.Commit()
		t.Reset(b.batchInterval)
	}
}

func (b *backend) Close() error {
	close(b.stopc)
	<-b.donec
	return b.db.Close()
}

// Commits returns the total number of commits since start.
func (b *backend) Commits() int64 {
	return atomic.LoadInt64(&b.commits)
}

func (b *backend) Defrag() error {
	return b.defrag()
}

func (b *backend) defrag() error {
	now := time.Now()

	// TODO: make this non-blocking?
	// lock batchTx to ensure nobody is using the previous tx, and then
	// close the previous ongoing tx.
	b.batchTx.Lock()
	defer b.batchTx.Unlock()

	// lock database after locking tx to avoid deadlock.
	b.mu.Lock()
	defer b.mu.Unlock()

	// block concurrent read requests while resetting tx
	b.readTx.mu.Lock()
	defer b.readTx.mu.Unlock()

	b.batchTx.unsafeCommit(true)
	b.batchTx.tx = nil

	// Create a temporary file to ensure we start with a clean slate.
	// Snapshotter.cleanupSnapdir cleans up any of these that are found during startup.
	dir := filepath.Dir(b.db.Path())
	temp, err := ioutil.TempFile(dir, "db.tmp.*")
	if err != nil {
		return err
	}
	options := bolt.Options{}
	if boltOpenOptions != nil {
		options = *boltOpenOptions
	}
	// Reuse the already-created temp file instead of letting bolt open a new one.
	options.OpenFile = func(path string, flag int, mode os.FileMode) (*os.File, error) {
		return temp, nil
	}
	tdbp := temp.Name()
	tmpdb, err := bolt.Open(tdbp, 0600, &options)
	if err != nil {
		return err
	}

	// gofail: var defragBeforeCopy struct{}
	err = defragdb(b.db, tmpdb, defragLimit)
	if err != nil {
		tmpdb.Close()
		if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
			plog.Fatalf("failed to remove db.tmp after defragmentation completed: %v", rmErr)
		}
		return err
	}

	dbp := b.db.Path()

	err = b.db.Close()
	if err != nil {
		plog.Fatalf("cannot close database (%s)", err)
	}
	err = tmpdb.Close()
	if err != nil {
		plog.Fatalf("cannot close database (%s)", err)
	}
	// gofail: var defragBeforeRename struct{}
	err = os.Rename(tdbp, dbp)
	if err != nil {
		plog.Fatalf("cannot rename database (%s)", err)
	}

	b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
	if err != nil {
		plog.Panicf("cannot open database at %s (%v)", dbp, err)
	}
	b.batchTx.tx, err = b.db.Begin(true)
	if err != nil {
		plog.Fatalf("cannot begin tx (%s)", err)
	}

	b.readTx.reset()
	b.readTx.tx = b.unsafeBegin(false)

	size := b.readTx.tx.Size()
	db := b.db
	atomic.StoreInt64(&b.size, size)
	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

	took := time.Since(now)
	defragDurations.Observe(took.Seconds())

	return nil
}

func defragdb(odb, tmpdb *bolt.DB, limit int) error {
	// open a tx on tmpdb for writes
	tmptx, err := tmpdb.Begin(true)
	if err != nil {
		return err
	}

	// open a tx on the old db for reads
	tx, err := odb.Begin(false)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	c := tx.Cursor()

	count := 0
	for next, _ := c.First(); next != nil; next, _ = c.Next() {
		b := tx.Bucket(next)
		if b == nil {
			return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
		}

		tmpb, berr := tmptx.CreateBucketIfNotExists(next)
		if berr != nil {
			return berr
		}
		tmpb.FillPercent = 0.9 // for the sequential writes in the ForEach below

		if err = b.ForEach(func(k, v []byte) error {
			count++
			if count > limit {
				err = tmptx.Commit()
				if err != nil {
					return err
				}
				tmptx, err = tmpdb.Begin(true)
				if err != nil {
					return err
				}
				tmpb = tmptx.Bucket(next)
				tmpb.FillPercent = 0.9 // for the sequential writes in the ForEach below

				count = 0
			}
			return tmpb.Put(k, v)
		}); err != nil {
			return err
		}
	}

	return tmptx.Commit()
}

func (b *backend) begin(write bool) *bolt.Tx {
	b.mu.RLock()
	tx := b.unsafeBegin(write)
	b.mu.RUnlock()

	size := tx.Size()
	db := tx.DB()
	atomic.StoreInt64(&b.size, size)
	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

	return tx
}

func (b *backend) unsafeBegin(write bool) *bolt.Tx {
	tx, err := b.db.Begin(write)
	if err != nil {
		plog.Fatalf("cannot begin tx (%s)", err)
	}
	return tx
}

// NewTmpBackend creates a backend implementation for testing.
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
	if err != nil {
		plog.Fatal(err)
	}
	tmpPath := filepath.Join(dir, "database")
	bcfg := DefaultBackendConfig()
	bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = tmpPath, batchInterval, batchLimit
	return newBackend(bcfg), tmpPath
}

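// Illustrative sketch (not part of this file): using the temporary backend in a
// test and removing the temporary directory afterwards. The test function shown
// here is hypothetical.
//
//	func TestSomething(t *testing.T) {
//		be, tmpPath := NewDefaultTmpBackend()
//		defer func() {
//			be.Close()
//			os.RemoveAll(filepath.Dir(tmpPath))
//		}()
//		// ... exercise be.BatchTx() / be.ReadTx() here ...
//	}
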
func NewDefaultTmpBackend() (*backend, string) {
	return NewTmpBackend(defaultBatchInterval, defaultBatchLimit)
}

type snapshot struct {
	*bolt.Tx
	stopc chan struct{}
	donec chan struct{}
}

func (s *snapshot) Close() error {
	close(s.stopc)
	<-s.donec
	return s.Tx.Rollback()
}