package bbolt

import (
	"fmt"
	"io"
	"os"
	"reflect"
	"sort"
	"strings"
	"time"
	"unsafe"
)

// txid represents the internal transaction identifier.
type txid uint64

// Tx represents a read-only or read/write transaction on the database.
// Read-only transactions can be used for retrieving values for keys and creating cursors.
// Read/write transactions can create and remove buckets and create and remove keys.
//
// IMPORTANT: You must commit or roll back transactions when you are done with
// them. Pages cannot be reclaimed by the writer until no more transactions
// are using them. A long-running read transaction can cause the database to
// grow quickly.
type Tx struct {
	writable       bool
	managed        bool
	db             *DB
	meta           *meta
	root           Bucket
	pages          map[pgid]*page
	stats          TxStats
	commitHandlers []func()

	// WriteFlag specifies the flag for write-related methods like WriteTo().
	// Tx opens the database file with the specified flag to copy the data.
	//
	// By default, the flag is unset, which works well for mostly in-memory
	// workloads. For databases that are much larger than available RAM,
	// set the flag to syscall.O_DIRECT to avoid thrashing the page cache.
	WriteFlag int
}
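
// Usage example (an illustrative sketch, not part of the original source): the
// usual way to satisfy the commit/rollback requirement above is to run the
// transaction through the managed DB.Update and DB.View helpers, which commit
// or roll back automatically. The bucket and key names are placeholders.
//
//	err := db.Update(func(tx *Tx) error {
//		b, err := tx.CreateBucketIfNotExists([]byte("widgets"))
//		if err != nil {
//			return err
//		}
//		return b.Put([]byte("foo"), []byte("bar"))
//	})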

// init initializes the transaction.
func (tx *Tx) init(db *DB) {
	tx.db = db
	tx.pages = nil

	// Copy the meta page since it can be changed by the writer.
	tx.meta = &meta{}
	db.meta().copy(tx.meta)

	// Copy over the root bucket.
	tx.root = newBucket(tx)
	tx.root.bucket = &bucket{}
	*tx.root.bucket = tx.meta.root

	// Increment the transaction id and add a page cache for writable transactions.
	if tx.writable {
		tx.pages = make(map[pgid]*page)
		tx.meta.txid += txid(1)
	}
}

// ID returns the transaction id.
func (tx *Tx) ID() int {
	return int(tx.meta.txid)
}

// DB returns a reference to the database that created the transaction.
func (tx *Tx) DB() *DB {
	return tx.db
}

// Size returns current database size in bytes as seen by this transaction.
func (tx *Tx) Size() int64 {
	return int64(tx.meta.pgid) * int64(tx.db.pageSize)
}

// Writable returns whether the transaction can perform write operations.
func (tx *Tx) Writable() bool {
	return tx.writable
}

// Cursor creates a cursor associated with the root bucket.
// All items in the cursor will return a nil value because all root bucket keys point to buckets.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
func (tx *Tx) Cursor() *Cursor {
	return tx.root.Cursor()
}
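
// Example (an illustrative sketch, not part of the original source): list the
// names of all top-level buckets with a root cursor inside a read-only
// transaction. Values are always nil here because root keys point to buckets.
//
//	err := db.View(func(tx *Tx) error {
//		c := tx.Cursor()
//		for k, _ := c.First(); k != nil; k, _ = c.Next() {
//			fmt.Printf("bucket: %s\n", k)
//		}
//		return nil
//	})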

// Stats retrieves a copy of the current transaction statistics.
func (tx *Tx) Stats() TxStats {
	return tx.stats
}

// Bucket retrieves a bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) Bucket(name []byte) *Bucket {
	return tx.root.Bucket(name)
}

// CreateBucket creates a new bucket.
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
	return tx.root.CreateBucket(name)
}

// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
	return tx.root.CreateBucketIfNotExists(name)
}
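
// Example (an illustrative sketch, not part of the original source):
// CreateBucket returns ErrBucketExists when the bucket is already present,
// while CreateBucketIfNotExists simply returns the existing bucket. The
// bucket name is a placeholder.
//
//	err := db.Update(func(tx *Tx) error {
//		if _, err := tx.CreateBucket([]byte("events")); err != nil {
//			return err // ErrBucketExists if "events" was already created
//		}
//		_, err := tx.CreateBucketIfNotExists([]byte("events")) // no error
//		return err
//	})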

// DeleteBucket deletes a bucket.
// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
func (tx *Tx) DeleteBucket(name []byte) error {
	return tx.root.DeleteBucket(name)
}

// ForEach executes a function for each bucket in the root.
// If the provided function returns an error then the iteration is stopped and
// the error is returned to the caller.
func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
	return tx.root.ForEach(func(k, v []byte) error {
		return fn(k, tx.root.Bucket(k))
	})
}
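
// Example (an illustrative sketch, not part of the original source): report
// the number of keys in every top-level bucket; returning a non-nil error
// from the callback would stop the iteration.
//
//	err := db.View(func(tx *Tx) error {
//		return tx.ForEach(func(name []byte, b *Bucket) error {
//			fmt.Printf("%s: %d keys\n", name, b.Stats().KeyN)
//			return nil
//		})
//	})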

// OnCommit adds a handler function to be executed after the transaction successfully commits.
func (tx *Tx) OnCommit(fn func()) {
	tx.commitHandlers = append(tx.commitHandlers, fn)
}
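
// Example (an illustrative sketch, not part of the original source): register
// a handler so an application-level cache (the `cache` value below is a
// hypothetical placeholder) is invalidated only after the write has actually
// been persisted. Assumes the "widgets" bucket already exists.
//
//	err := db.Update(func(tx *Tx) error {
//		tx.OnCommit(func() { cache.Invalidate("widgets") })
//		return tx.Bucket([]byte("widgets")).Put([]byte("foo"), []byte("bar"))
//	})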

// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
func (tx *Tx) Commit() error {
	_assert(!tx.managed, "managed tx commit not allowed")
	if tx.db == nil {
		return ErrTxClosed
	} else if !tx.writable {
		return ErrTxNotWritable
	}

	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

	// Rebalance nodes which have had deletions.
	var startTime = time.Now()
	tx.root.rebalance()
	if tx.stats.Rebalance > 0 {
		tx.stats.RebalanceTime += time.Since(startTime)
	}

	// Spill data onto dirty pages.
	startTime = time.Now()
	if err := tx.root.spill(); err != nil {
		tx.rollback()
		return err
	}
	tx.stats.SpillTime += time.Since(startTime)

	// Free the old root bucket.
	tx.meta.root.root = tx.root.root

	// Free the old freelist because commit writes out a fresh freelist.
	if tx.meta.freelist != pgidNoFreelist {
		tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
	}

	if !tx.db.NoFreelistSync {
		err := tx.commitFreelist()
		if err != nil {
			return err
		}
	} else {
		tx.meta.freelist = pgidNoFreelist
	}

	// Write dirty pages to disk.
	startTime = time.Now()
	if err := tx.write(); err != nil {
		tx.rollback()
		return err
	}

	// If strict mode is enabled then perform a consistency check.
	// All consistency errors found are reported in the panic.
	if tx.db.StrictMode {
		ch := tx.Check()
		var errs []string
		for {
			err, ok := <-ch
			if !ok {
				break
			}
			errs = append(errs, err.Error())
		}
		if len(errs) > 0 {
			panic("check fail: " + strings.Join(errs, "\n"))
		}
	}

	// Write meta to disk.
	if err := tx.writeMeta(); err != nil {
		tx.rollback()
		return err
	}
	tx.stats.WriteTime += time.Since(startTime)

	// Finalize the transaction.
	tx.close()

	// Execute commit handlers now that the locks have been removed.
	for _, fn := range tx.commitHandlers {
		fn()
	}

	return nil
}
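
// Example (an illustrative sketch, not part of the original source): when
// managing the transaction manually with DB.Begin, pair Commit with a
// deferred Rollback so the transaction is always closed on error paths; the
// deferred Rollback after a successful Commit merely returns ErrTxClosed.
//
//	tx, err := db.Begin(true)
//	if err != nil {
//		return err
//	}
//	defer tx.Rollback()
//
//	if _, err := tx.CreateBucket([]byte("widgets")); err != nil {
//		return err
//	}
//	return tx.Commit()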

func (tx *Tx) commitFreelist() error {
	// Allocate new pages for the new free list. This will overestimate
	// the size of the freelist but not underestimate the size (which would be bad).
	opgid := tx.meta.pgid
	p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
	if err != nil {
		tx.rollback()
		return err
	}
	if err := tx.db.freelist.write(p); err != nil {
		tx.rollback()
		return err
	}
	tx.meta.freelist = p.id
	// If the high water mark has moved up then attempt to grow the database.
	if tx.meta.pgid > opgid {
		if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
			tx.rollback()
			return err
		}
	}

	return nil
}

// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
func (tx *Tx) Rollback() error {
	_assert(!tx.managed, "managed tx rollback not allowed")
	if tx.db == nil {
		return ErrTxClosed
	}
	tx.nonPhysicalRollback()
	return nil
}

// nonPhysicalRollback is called when the user calls Rollback directly; in this
// case we do not need to reload the free pages from disk.
func (tx *Tx) nonPhysicalRollback() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		tx.db.freelist.rollback(tx.meta.txid)
	}
	tx.close()
}

// rollback needs to reload the free pages from disk in case a system error
// occurs, such as an fsync error.
func (tx *Tx) rollback() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		tx.db.freelist.rollback(tx.meta.txid)
		if !tx.db.hasSyncedFreelist() {
			// Reconstruct the free page list by scanning the whole DB.
			// Note: scanning the whole DB is expensive when the database is large in NoFreelistSync mode.
			tx.db.freelist.noSyncReload(tx.db.freepages())
		} else {
			// Read the free page list from the freelist page.
			tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
		}
	}
	tx.close()
}

func (tx *Tx) close() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		// Grab freelist stats.
		var freelistFreeN = tx.db.freelist.free_count()
		var freelistPendingN = tx.db.freelist.pending_count()
		var freelistAlloc = tx.db.freelist.size()

		// Remove transaction ref & writer lock.
		tx.db.rwtx = nil
		tx.db.rwlock.Unlock()

		// Merge statistics.
		tx.db.statlock.Lock()
		tx.db.stats.FreePageN = freelistFreeN
		tx.db.stats.PendingPageN = freelistPendingN
		tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
		tx.db.stats.FreelistInuse = freelistAlloc
		tx.db.stats.TxStats.add(&tx.stats)
		tx.db.statlock.Unlock()
	} else {
		tx.db.removeTx(tx)
	}

	// Clear all references.
	tx.db = nil
	tx.meta = nil
	tx.root = Bucket{tx: tx}
	tx.pages = nil
}

// Copy writes the entire database to a writer.
// This function exists for backwards compatibility.
//
// Deprecated: Use WriteTo() instead.
func (tx *Tx) Copy(w io.Writer) error {
	_, err := tx.WriteTo(w)
	return err
}

// WriteTo writes the entire database to a writer.
// If err == nil then exactly tx.Size() bytes will be written into the writer.
func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
	// Attempt to open reader with WriteFlag
	f, err := tx.db.openFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
	if err != nil {
		return 0, err
	}
	defer func() {
		if cerr := f.Close(); err == nil {
			err = cerr
		}
	}()

	// Generate a meta page. We use the same page data for both meta pages.
	buf := make([]byte, tx.db.pageSize)
	page := (*page)(unsafe.Pointer(&buf[0]))
	page.flags = metaPageFlag
	*page.meta() = *tx.meta

	// Write meta 0.
	page.id = 0
	page.meta().checksum = page.meta().sum64()
	nn, err := w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta 0 copy: %s", err)
	}

	// Write meta 1 with a lower transaction id.
	page.id = 1
	page.meta().txid -= 1
	page.meta().checksum = page.meta().sum64()
	nn, err = w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta 1 copy: %s", err)
	}

	// Move past the meta pages in the file.
	if _, err := f.Seek(int64(tx.db.pageSize*2), io.SeekStart); err != nil {
		return n, fmt.Errorf("seek: %s", err)
	}

	// Copy data pages.
	wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
	n += wn
	if err != nil {
		return n, err
	}

	return n, nil
}
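
// Example (an illustrative sketch, not part of the original source): stream a
// hot backup over HTTP while the database stays open for other readers and
// writers. Assumes a caller-side *DB named db plus the net/http and strconv
// packages; the handler name and attachment filename are placeholders.
//
//	func backupHandler(w http.ResponseWriter, req *http.Request) {
//		err := db.View(func(tx *Tx) error {
//			w.Header().Set("Content-Type", "application/octet-stream")
//			w.Header().Set("Content-Disposition", `attachment; filename="my.db"`)
//			w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size())))
//			_, err := tx.WriteTo(w)
//			return err
//		})
//		if err != nil {
//			http.Error(w, err.Error(), http.StatusInternalServerError)
//		}
//	}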

// CopyFile copies the entire database to a file at the given path.
// A reader transaction is maintained during the copy so it is safe to continue
// using the database while a copy is in progress.
func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
	f, err := tx.db.openFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
	if err != nil {
		return err
	}

	err = tx.Copy(f)
	if err != nil {
		_ = f.Close()
		return err
	}
	return f.Close()
}
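
// Example (an illustrative sketch, not part of the original source): snapshot
// the database to a backup file from within a read-only transaction; the
// destination path and file mode are placeholders.
//
//	err := db.View(func(tx *Tx) error {
//		return tx.CopyFile("/tmp/backup.db", 0600)
//	})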

// Check performs several consistency checks on the database for this transaction.
// An error is returned if any inconsistency is found.
//
// It can be safely run concurrently on a writable transaction. However, this
// incurs a high cost for large databases and databases with a lot of subbuckets
// because of caching. This overhead can be removed if running on a read-only
// transaction; however, it is not safe to execute other writer transactions at
// the same time.
func (tx *Tx) Check() <-chan error {
	ch := make(chan error)
	go tx.check(ch)
	return ch
}
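
// Example (an illustrative sketch, not part of the original source): drain
// the error channel returned by Check inside a read-only transaction; the
// channel is closed once the check has finished.
//
//	err := db.View(func(tx *Tx) error {
//		for cerr := range tx.Check() {
//			fmt.Println(cerr)
//		}
//		return nil
//	})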

func (tx *Tx) check(ch chan error) {
	// Force loading free list if opened in ReadOnly mode.
	tx.db.loadFreelist()

	// Check if any pages are double freed.
	freed := make(map[pgid]bool)
	all := make([]pgid, tx.db.freelist.count())
	tx.db.freelist.copyall(all)
	for _, id := range all {
		if freed[id] {
			ch <- fmt.Errorf("page %d: already freed", id)
		}
		freed[id] = true
	}

	// Track every reachable page.
	reachable := make(map[pgid]*page)
	reachable[0] = tx.page(0) // meta0
	reachable[1] = tx.page(1) // meta1
	if tx.meta.freelist != pgidNoFreelist {
		for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
			reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
		}
	}

	// Recursively check buckets.
	tx.checkBucket(&tx.root, reachable, freed, ch)

	// Ensure all pages below high water mark are either reachable or freed.
	for i := pgid(0); i < tx.meta.pgid; i++ {
		_, isReachable := reachable[i]
		if !isReachable && !freed[i] {
			ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
		}
	}

	// Close the channel to signal completion.
	close(ch)
}

func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool, ch chan error) {
	// Ignore inline buckets.
	if b.root == 0 {
		return
	}

	// Check every page used by this bucket.
	b.tx.forEachPage(b.root, 0, func(p *page, _ int) {
		if p.id > tx.meta.pgid {
			ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))
		}

		// Ensure each page is only referenced once.
		for i := pgid(0); i <= pgid(p.overflow); i++ {
			var id = p.id + i
			if _, ok := reachable[id]; ok {
				ch <- fmt.Errorf("page %d: multiple references", int(id))
			}
			reachable[id] = p
		}

		// We should only encounter un-freed leaf and branch pages.
		if freed[p.id] {
			ch <- fmt.Errorf("page %d: reachable freed", int(p.id))
		} else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 {
			ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), p.typ())
		}
	})

	// Check each bucket within this bucket.
	_ = b.ForEach(func(k, v []byte) error {
		if child := b.Bucket(k); child != nil {
			tx.checkBucket(child, reachable, freed, ch)
		}
		return nil
	})
}

// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*page, error) {
	p, err := tx.db.allocate(tx.meta.txid, count)
	if err != nil {
		return nil, err
	}

	// Save to our page cache.
	tx.pages[p.id] = p

	// Update statistics.
	tx.stats.PageCount += count
	tx.stats.PageAlloc += count * tx.db.pageSize

	return p, nil
}

// write writes any dirty pages to disk.
func (tx *Tx) write() error {
	// Sort pages by id.
	pages := make(pages, 0, len(tx.pages))
	for _, p := range tx.pages {
		pages = append(pages, p)
	}
	// Clear out page cache early.
	tx.pages = make(map[pgid]*page)
	sort.Sort(pages)

	// Write pages to disk in order.
	for _, p := range pages {
		size := (int(p.overflow) + 1) * tx.db.pageSize
		offset := int64(p.id) * int64(tx.db.pageSize)

		// Write out page in "max allocation" sized chunks.
		ptr := uintptr(unsafe.Pointer(p))
		for {
			// Limit our write to our max allocation size.
			sz := size
			if sz > maxAllocSize-1 {
				sz = maxAllocSize - 1
			}

			// Write chunk to disk.
			buf := *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
				Data: ptr,
				Len:  sz,
				Cap:  sz,
			}))
			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
				return err
			}

			// Update statistics.
			tx.stats.Write++

			// Exit inner for loop if we've written all the chunks.
			size -= sz
			if size == 0 {
				break
			}

			// Otherwise move offset forward and move pointer to next chunk.
			offset += int64(sz)
			ptr += uintptr(sz)
		}
	}

	// Ignore file sync if flag is set on DB.
	if !tx.db.NoSync || IgnoreNoSync {
		if err := fdatasync(tx.db); err != nil {
			return err
		}
	}

	// Put small pages back to page pool.
	for _, p := range pages {
		// Ignore page sizes over 1 page.
		// These are allocated using make() instead of the page pool.
		if int(p.overflow) != 0 {
			continue
		}

		buf := *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
			Data: uintptr(unsafe.Pointer(p)),
			Len:  tx.db.pageSize,
			Cap:  tx.db.pageSize,
		}))

		// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
		for i := range buf {
			buf[i] = 0
		}
		tx.db.pagePool.Put(buf)
	}

	return nil
}

// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error {
	// Create a temporary buffer for the meta page.
	buf := make([]byte, tx.db.pageSize)
	p := tx.db.pageInBuffer(buf, 0)
	tx.meta.write(p)

	// Write the meta page to file.
	if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
		return err
	}
	if !tx.db.NoSync || IgnoreNoSync {
		if err := fdatasync(tx.db); err != nil {
			return err
		}
	}

	// Update statistics.
	tx.stats.Write++

	return nil
}

// page returns a reference to the page with a given id.
// If the page has been written to then a temporary buffered page is returned.
func (tx *Tx) page(id pgid) *page {
	// Check the dirty pages first.
	if tx.pages != nil {
		if p, ok := tx.pages[id]; ok {
			return p
		}
	}

	// Otherwise return directly from the mmap.
	return tx.db.page(id)
}

// forEachPage iterates over every page within a given page and executes a function.
func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
	p := tx.page(pgid)

	// Execute function.
	fn(p, depth)

	// Recursively loop over children.
	if (p.flags & branchPageFlag) != 0 {
		for i := 0; i < int(p.count); i++ {
			elem := p.branchPageElement(uint16(i))
			tx.forEachPage(elem.pgid, depth+1, fn)
		}
	}
}

// Page returns page information for a given page number.
// This is only safe for concurrent use when used by a writable transaction.
func (tx *Tx) Page(id int) (*PageInfo, error) {
	if tx.db == nil {
		return nil, ErrTxClosed
	} else if pgid(id) >= tx.meta.pgid {
		return nil, nil
	}

	// Build the page info.
	p := tx.db.page(pgid(id))
	info := &PageInfo{
		ID:            id,
		Count:         int(p.count),
		OverflowCount: int(p.overflow),
	}

	// Determine the type (or if it's free).
	if tx.db.freelist.freed(pgid(id)) {
		info.Type = "free"
	} else {
		info.Type = p.typ()
	}

	return info, nil
}
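
// Example (an illustrative sketch, not part of the original source): walk the
// page table up to the high water mark and print each page's type, e.g. when
// investigating space usage. A writable transaction is used because Page is
// only safe for concurrent use from one; Page returns (nil, nil) once the id
// passes the high water mark.
//
//	err := db.Update(func(tx *Tx) error {
//		for id := 0; ; id++ {
//			info, err := tx.Page(id)
//			if err != nil {
//				return err
//			}
//			if info == nil { // past the high water mark
//				return nil
//			}
//			fmt.Printf("page %d: %s (overflow=%d)\n", info.ID, info.Type, info.OverflowCount)
//		}
//	})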

// TxStats represents statistics about the actions performed by the transaction.
type TxStats struct {
	// Page statistics.
	PageCount int // number of page allocations
	PageAlloc int // total bytes allocated

	// Cursor statistics.
	CursorCount int // number of cursors created

	// Node statistics.
	NodeCount int // number of node allocations
	NodeDeref int // number of node dereferences

	// Rebalance statistics.
	Rebalance     int           // number of node rebalances
	RebalanceTime time.Duration // total time spent rebalancing

	// Split/Spill statistics.
	Split     int           // number of nodes split
	Spill     int           // number of nodes spilled
	SpillTime time.Duration // total time spent spilling

	// Write statistics.
	Write     int           // number of writes performed
	WriteTime time.Duration // total time spent writing to disk
}

func (s *TxStats) add(other *TxStats) {
	s.PageCount += other.PageCount
	s.PageAlloc += other.PageAlloc
	s.CursorCount += other.CursorCount
	s.NodeCount += other.NodeCount
	s.NodeDeref += other.NodeDeref
	s.Rebalance += other.Rebalance
	s.RebalanceTime += other.RebalanceTime
	s.Split += other.Split
	s.Spill += other.Spill
	s.SpillTime += other.SpillTime
	s.Write += other.Write
	s.WriteTime += other.WriteTime
}

// Sub calculates and returns the difference between two sets of transaction stats.
// This is useful when obtaining stats at two different points in time and
// you need the performance counters that occurred within that time span.
func (s *TxStats) Sub(other *TxStats) TxStats {
	var diff TxStats
	diff.PageCount = s.PageCount - other.PageCount
	diff.PageAlloc = s.PageAlloc - other.PageAlloc
	diff.CursorCount = s.CursorCount - other.CursorCount
	diff.NodeCount = s.NodeCount - other.NodeCount
	diff.NodeDeref = s.NodeDeref - other.NodeDeref
	diff.Rebalance = s.Rebalance - other.Rebalance
	diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime
	diff.Split = s.Split - other.Split
	diff.Spill = s.Spill - other.Spill
	diff.SpillTime = s.SpillTime - other.SpillTime
	diff.Write = s.Write - other.Write
	diff.WriteTime = s.WriteTime - other.WriteTime
	return diff
}
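
// Example (an illustrative sketch, not part of the original source): sample
// DB.Stats at two points in time and use Sub to look at only the transaction
// work performed in between; the ten-second interval is arbitrary.
//
//	prev := db.Stats()
//	time.Sleep(10 * time.Second)
//	stats := db.Stats()
//	diff := stats.TxStats.Sub(&prev.TxStats)
//	fmt.Printf("writes in the last 10s: %d\n", diff.Write)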