blob: f7d9e60c2e7d011fae28303ce93c79fe8afe4df9 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package backend
16
17import (
18 "fmt"
19 "hash/crc32"
20 "io"
21 "io/ioutil"
22 "os"
23 "path/filepath"
24 "sync"
25 "sync/atomic"
26 "time"
27
28 bolt "github.com/coreos/bbolt"
29 "github.com/coreos/pkg/capnslog"
30)
31
32var (
33 defaultBatchLimit = 10000
34 defaultBatchInterval = 100 * time.Millisecond
35
36 defragLimit = 10000
37
38 // initialMmapSize is the initial size of the mmapped region. Setting this larger than
39 // the potential max db size can prevent writer from blocking reader.
40 // This only works for linux.
41 initialMmapSize = uint64(10 * 1024 * 1024 * 1024)
42
43 plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc/backend")
44
45 // minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
46 minSnapshotWarningTimeout = time.Duration(30 * time.Second)
47)
48
// Backend describes the storage operations offered by a single boltdb
// file: transactional access, snapshotting, hashing, size reporting and
// defragmentation.
type Backend interface {
	// ReadTx returns a read transaction.
	ReadTx() ReadTx
	// BatchTx returns a batched read-write transaction.
	BatchTx() BatchTx

	// Snapshot returns a point-in-time view of the backend.
	Snapshot() Snapshot
	// Hash computes a CRC-32 (Castagnoli) hash over all buckets, keys and
	// values, skipping the entries listed in ignores.
	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
	// Size returns the current size of the backend.
	Size() int64
	// SizeInUse returns the current size of the backend logically in use.
	// Since the backend can manage free space in a non-byte unit such as
	// number of pages, the returned value can be not exactly accurate in bytes.
	SizeInUse() int64
	// Defrag rewrites the database file to reclaim free-page space.
	Defrag() error
	// ForceCommit forces the current batching tx to commit.
	ForceCommit()
	// Close stops the backend's background commit goroutine and closes the
	// underlying database.
	Close() error
}
65
// Snapshot is a point-in-time, read-only view of the backend that can be
// streamed out and must be closed when no longer needed.
type Snapshot interface {
	// Size gets the size of the snapshot.
	Size() int64
	// WriteTo writes the snapshot into the given writer.
	WriteTo(w io.Writer) (n int64, err error)
	// Close closes the snapshot.
	Close() error
}
74
// backend implements Backend on top of a bolt.DB. A background goroutine
// (run) commits the batch transaction periodically.
type backend struct {
	// size and commits are used with atomic operations so they must be
	// 64-bit aligned, otherwise 32-bit tests will crash

	// size is the number of bytes in the backend
	size int64

	// sizeInUse is the number of bytes actually used in the backend
	sizeInUse int64

	// commits counts number of commits since start
	commits int64

	// mu guards db; held for writing while the db handle is swapped out
	// during defragmentation.
	mu sync.RWMutex
	db *bolt.DB

	// batchInterval is the maximum time between automatic commits of batchTx.
	batchInterval time.Duration
	// batchLimit is the maximum number of puts before batchTx commits.
	batchLimit int
	batchTx    *batchTxBuffered

	readTx *readTx

	// stopc signals run() to stop; donec is closed when run() has exited.
	stopc chan struct{}
	donec chan struct{}
}
100
// BackendConfig carries the parameters needed to open a backend.
type BackendConfig struct {
	// Path is the file path to the backend file.
	Path string
	// BatchInterval is the maximum time before flushing the BatchTx.
	BatchInterval time.Duration
	// BatchLimit is the maximum puts before flushing the BatchTx.
	BatchLimit int
	// MmapSize is the number of bytes to mmap for the backend.
	MmapSize uint64
}
111
112func DefaultBackendConfig() BackendConfig {
113 return BackendConfig{
114 BatchInterval: defaultBatchInterval,
115 BatchLimit: defaultBatchLimit,
116 MmapSize: initialMmapSize,
117 }
118}
119
// New opens a Backend using the given configuration.
func New(bcfg BackendConfig) Backend {
	return newBackend(bcfg)
}
123
124func NewDefaultBackend(path string) Backend {
125 bcfg := DefaultBackendConfig()
126 bcfg.Path = path
127 return newBackend(bcfg)
128}
129
// newBackend opens (creating if necessary) the bolt database described by
// bcfg and starts the periodic batch-commit goroutine. It panics if the
// database cannot be opened.
func newBackend(bcfg BackendConfig) *backend {
	bopts := &bolt.Options{}
	if boltOpenOptions != nil {
		// start from the platform-specific open options, when provided
		*bopts = *boltOpenOptions
	}
	bopts.InitialMmapSize = bcfg.mmapSize()

	db, err := bolt.Open(bcfg.Path, 0600, bopts)
	if err != nil {
		plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
	}

	// In future, may want to make buffering optional for low-concurrency systems
	// or dynamically swap between buffered/non-buffered depending on workload.
	b := &backend{
		db: db,

		batchInterval: bcfg.BatchInterval,
		batchLimit:    bcfg.BatchLimit,

		readTx: &readTx{
			buf: txReadBuffer{
				txBuffer: txBuffer{make(map[string]*bucketBuffer)},
			},
			buckets: make(map[string]*bolt.Bucket),
		},

		stopc: make(chan struct{}),
		donec: make(chan struct{}),
	}
	// batchTx needs a back-reference to b, so it is created after b.
	b.batchTx = newBatchTxBuffered(b)
	go b.run()
	return b
}
164
// BatchTx returns the current batch tx in coalescer. The tx can be used for read and
// write operations. The write result can be retrieved within the same tx immediately.
// The write result is isolated with other txs until the current one gets committed.
func (b *backend) BatchTx() BatchTx {
	return b.batchTx
}
171
172func (b *backend) ReadTx() ReadTx { return b.readTx }
173
// ForceCommit forces the current batching tx to commit.
func (b *backend) ForceCommit() {
	b.batchTx.Commit()
}
178
179func (b *backend) Snapshot() Snapshot {
180 b.batchTx.Commit()
181
182 b.mu.RLock()
183 defer b.mu.RUnlock()
184 tx, err := b.db.Begin(false)
185 if err != nil {
186 plog.Fatalf("cannot begin tx (%s)", err)
187 }
188
189 stopc, donec := make(chan struct{}), make(chan struct{})
190 dbBytes := tx.Size()
191 go func() {
192 defer close(donec)
193 // sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
194 // assuming a min tcp throughput of 100MB/s.
195 var sendRateBytes int64 = 100 * 1024 * 1014
196 warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
197 if warningTimeout < minSnapshotWarningTimeout {
198 warningTimeout = minSnapshotWarningTimeout
199 }
200 start := time.Now()
201 ticker := time.NewTicker(warningTimeout)
202 defer ticker.Stop()
203 for {
204 select {
205 case <-ticker.C:
206 plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1014), start)
207 case <-stopc:
208 snapshotDurations.Observe(time.Since(start).Seconds())
209 return
210 }
211 }
212 }()
213
214 return &snapshot{tx, stopc, donec}
215}
216
// IgnoreKey identifies a single bucket/key pair to be skipped by Hash.
type IgnoreKey struct {
	Bucket string
	Key    string
}
221
222func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
223 h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
224
225 b.mu.RLock()
226 defer b.mu.RUnlock()
227 err := b.db.View(func(tx *bolt.Tx) error {
228 c := tx.Cursor()
229 for next, _ := c.First(); next != nil; next, _ = c.Next() {
230 b := tx.Bucket(next)
231 if b == nil {
232 return fmt.Errorf("cannot get hash of bucket %s", string(next))
233 }
234 h.Write(next)
235 b.ForEach(func(k, v []byte) error {
236 bk := IgnoreKey{Bucket: string(next), Key: string(k)}
237 if _, ok := ignores[bk]; !ok {
238 h.Write(k)
239 h.Write(v)
240 }
241 return nil
242 })
243 }
244 return nil
245 })
246
247 if err != nil {
248 return 0, err
249 }
250
251 return h.Sum32(), nil
252}
253
// Size returns the backend's cached total size in bytes (atomically read).
func (b *backend) Size() int64 {
	return atomic.LoadInt64(&b.size)
}
257
// SizeInUse returns the backend's cached logically-in-use size in bytes
// (atomically read).
func (b *backend) SizeInUse() int64 {
	return atomic.LoadInt64(&b.sizeInUse)
}
261
// run commits the batch transaction every batchInterval until stopc is
// closed, at which point it performs a final CommitAndStop and exits,
// closing donec. The timer is reset only after each commit finishes, so
// slow commits do not stack up.
func (b *backend) run() {
	defer close(b.donec)
	t := time.NewTimer(b.batchInterval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
		case <-b.stopc:
			b.batchTx.CommitAndStop()
			return
		}
		b.batchTx.Commit()
		t.Reset(b.batchInterval)
	}
}
277
// Close signals the commit goroutine to stop, waits for its final commit to
// finish, then closes the underlying database.
func (b *backend) Close() error {
	close(b.stopc)
	<-b.donec
	return b.db.Close()
}
283
// Commits returns total number of commits since start.
func (b *backend) Commits() int64 {
	return atomic.LoadInt64(&b.commits)
}
288
// Defrag rewrites the database file to reclaim free-page space.
func (b *backend) Defrag() error {
	return b.defrag()
}
292
// defrag copies the database into a compacted temporary file, renames it
// over the original, then re-opens the database and re-establishes the
// batch and read transactions. The batch-tx, db and read-tx locks are all
// held for the duration, so reads and writes block until it finishes.
// NOTE(review): if the temporary bolt.Open below fails, b.batchTx.tx has
// already been set to nil and is not restored before returning — confirm
// callers tolerate that state.
func (b *backend) defrag() error {
	now := time.Now()

	// TODO: make this non-blocking?
	// lock batchTx to ensure nobody is using previous tx, and then
	// close previous ongoing tx.
	b.batchTx.Lock()
	defer b.batchTx.Unlock()

	// lock database after lock tx to avoid deadlock.
	b.mu.Lock()
	defer b.mu.Unlock()

	// block concurrent read requests while resetting tx
	b.readTx.mu.Lock()
	defer b.readTx.mu.Unlock()

	b.batchTx.unsafeCommit(true)
	b.batchTx.tx = nil

	tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
	if err != nil {
		return err
	}

	err = defragdb(b.db, tmpdb, defragLimit)

	if err != nil {
		// copy failed: discard the partial temporary file
		tmpdb.Close()
		os.RemoveAll(tmpdb.Path())
		return err
	}

	dbp := b.db.Path()
	tdbp := tmpdb.Path()

	// both databases must be closed before the compacted file can be
	// renamed over the original; failures here leave the backend unusable,
	// hence Fatalf.
	err = b.db.Close()
	if err != nil {
		plog.Fatalf("cannot close database (%s)", err)
	}
	err = tmpdb.Close()
	if err != nil {
		plog.Fatalf("cannot close database (%s)", err)
	}
	err = os.Rename(tdbp, dbp)
	if err != nil {
		plog.Fatalf("cannot rename database (%s)", err)
	}

	b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
	if err != nil {
		plog.Panicf("cannot open database at %s (%v)", dbp, err)
	}
	b.batchTx.tx, err = b.db.Begin(true)
	if err != nil {
		plog.Fatalf("cannot begin tx (%s)", err)
	}

	b.readTx.reset()
	b.readTx.tx = b.unsafeBegin(false)

	// refresh the cached size statistics from the compacted file
	size := b.readTx.tx.Size()
	db := b.db
	atomic.StoreInt64(&b.size, size)
	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

	took := time.Since(now)
	defragDurations.Observe(took.Seconds())

	return nil
}
364
365func defragdb(odb, tmpdb *bolt.DB, limit int) error {
366 // open a tx on tmpdb for writes
367 tmptx, err := tmpdb.Begin(true)
368 if err != nil {
369 return err
370 }
371
372 // open a tx on old db for read
373 tx, err := odb.Begin(false)
374 if err != nil {
375 return err
376 }
377 defer tx.Rollback()
378
379 c := tx.Cursor()
380
381 count := 0
382 for next, _ := c.First(); next != nil; next, _ = c.Next() {
383 b := tx.Bucket(next)
384 if b == nil {
385 return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
386 }
387
388 tmpb, berr := tmptx.CreateBucketIfNotExists(next)
389 if berr != nil {
390 return berr
391 }
392 tmpb.FillPercent = 0.9 // for seq write in for each
393
394 b.ForEach(func(k, v []byte) error {
395 count++
396 if count > limit {
397 err = tmptx.Commit()
398 if err != nil {
399 return err
400 }
401 tmptx, err = tmpdb.Begin(true)
402 if err != nil {
403 return err
404 }
405 tmpb = tmptx.Bucket(next)
406 tmpb.FillPercent = 0.9 // for seq write in for each
407
408 count = 0
409 }
410 return tmpb.Put(k, v)
411 })
412 }
413
414 return tmptx.Commit()
415}
416
// begin starts a bolt transaction (read-write when write is true) under the
// db read lock and refreshes the backend's cached size statistics from it.
func (b *backend) begin(write bool) *bolt.Tx {
	b.mu.RLock()
	tx := b.unsafeBegin(write)
	b.mu.RUnlock()

	size := tx.Size()
	db := tx.DB()
	atomic.StoreInt64(&b.size, size)
	// in-use bytes = total size minus the space held by free pages
	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

	return tx
}
429
// unsafeBegin starts a bolt transaction without taking b.mu; the caller
// must hold the lock. It terminates the process if the tx cannot begin.
func (b *backend) unsafeBegin(write bool) *bolt.Tx {
	tx, err := b.db.Begin(write)
	if err != nil {
		plog.Fatalf("cannot begin tx (%s)", err)
	}
	return tx
}
437
438// NewTmpBackend creates a backend implementation for testing.
439func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
440 dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
441 if err != nil {
442 plog.Fatal(err)
443 }
444 tmpPath := filepath.Join(dir, "database")
445 bcfg := DefaultBackendConfig()
446 bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = tmpPath, batchInterval, batchLimit
447 return newBackend(bcfg), tmpPath
448}
449
// NewDefaultTmpBackend creates a testing backend with the package default
// batch interval and limit.
func NewDefaultTmpBackend() (*backend, string) {
	return NewTmpBackend(defaultBatchInterval, defaultBatchLimit)
}
453
// snapshot wraps a read-only bolt transaction as a Snapshot; stopc/donec
// coordinate shutdown of the long-running-snapshot warning goroutine.
type snapshot struct {
	*bolt.Tx
	stopc chan struct{}
	donec chan struct{}
}
459
// Close signals the warning goroutine to stop, waits for it to exit, then
// rolls back the underlying read transaction.
func (s *snapshot) Close() error {
	close(s.stopc)
	<-s.donec
	return s.Tx.Rollback()
}