blob: 1cfb4cde855597a1c9fe68c8996bc62da5602c1e [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001package bolt
2
3import (
4 "fmt"
5 "io"
6 "os"
7 "sort"
8 "strings"
9 "time"
10 "unsafe"
11)
12
// txid represents the internal transaction identifier.
// A writable transaction increments the id it copied from the meta page
// when it is initialized.
type txid uint64
15
// Tx represents a read-only or read/write transaction on the database.
// Read-only transactions can be used for retrieving values for keys and creating cursors.
// Read/write transactions can create and remove buckets and create and remove keys.
//
// IMPORTANT: You must commit or rollback transactions when you are done with
// them. Pages can not be reclaimed by the writer until no more transactions
// are using them. A long running read transaction can cause the database to
// quickly grow.
type Tx struct {
	// writable is true for read/write transactions.
	writable bool
	// managed, when set, makes Commit and Rollback fail their assertion.
	managed bool
	// db is the owning database; close resets it to nil.
	db *DB
	// meta is this transaction's private copy of the meta page (see init).
	meta *meta
	// root is the root bucket, seeded from meta.root in init.
	root Bucket
	// pages caches dirty pages by id; init only allocates it for writable
	// transactions.
	pages map[pgid]*page
	// stats accumulates counters and timings for this transaction.
	stats TxStats
	// commitHandlers are registered via OnCommit and invoked at the end of
	// a successful Commit.
	commitHandlers []func()

	// WriteFlag specifies the flag for write-related methods like WriteTo().
	// Tx opens the database file with the specified flag to copy the data.
	//
	// By default, the flag is unset, which works well for mostly in-memory
	// workloads. For databases that are much larger than available RAM,
	// set the flag to syscall.O_DIRECT to avoid thrashing the page cache.
	WriteFlag int
}
42
43// init initializes the transaction.
44func (tx *Tx) init(db *DB) {
45 tx.db = db
46 tx.pages = nil
47
48 // Copy the meta page since it can be changed by the writer.
49 tx.meta = &meta{}
50 db.meta().copy(tx.meta)
51
52 // Copy over the root bucket.
53 tx.root = newBucket(tx)
54 tx.root.bucket = &bucket{}
55 *tx.root.bucket = tx.meta.root
56
57 // Increment the transaction id and add a page cache for writable transactions.
58 if tx.writable {
59 tx.pages = make(map[pgid]*page)
60 tx.meta.txid += txid(1)
61 }
62}
63
64// ID returns the transaction id.
65func (tx *Tx) ID() int {
66 return int(tx.meta.txid)
67}
68
69// DB returns a reference to the database that created the transaction.
70func (tx *Tx) DB() *DB {
71 return tx.db
72}
73
74// Size returns current database size in bytes as seen by this transaction.
75func (tx *Tx) Size() int64 {
76 return int64(tx.meta.pgid) * int64(tx.db.pageSize)
77}
78
79// Writable returns whether the transaction can perform write operations.
80func (tx *Tx) Writable() bool {
81 return tx.writable
82}
83
84// Cursor creates a cursor associated with the root bucket.
85// All items in the cursor will return a nil value because all root bucket keys point to buckets.
86// The cursor is only valid as long as the transaction is open.
87// Do not use a cursor after the transaction is closed.
88func (tx *Tx) Cursor() *Cursor {
89 return tx.root.Cursor()
90}
91
92// Stats retrieves a copy of the current transaction statistics.
93func (tx *Tx) Stats() TxStats {
94 return tx.stats
95}
96
97// Bucket retrieves a bucket by name.
98// Returns nil if the bucket does not exist.
99// The bucket instance is only valid for the lifetime of the transaction.
100func (tx *Tx) Bucket(name []byte) *Bucket {
101 return tx.root.Bucket(name)
102}
103
104// CreateBucket creates a new bucket.
105// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
106// The bucket instance is only valid for the lifetime of the transaction.
107func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
108 return tx.root.CreateBucket(name)
109}
110
111// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
112// Returns an error if the bucket name is blank, or if the bucket name is too long.
113// The bucket instance is only valid for the lifetime of the transaction.
114func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
115 return tx.root.CreateBucketIfNotExists(name)
116}
117
118// DeleteBucket deletes a bucket.
119// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
120func (tx *Tx) DeleteBucket(name []byte) error {
121 return tx.root.DeleteBucket(name)
122}
123
124// ForEach executes a function for each bucket in the root.
125// If the provided function returns an error then the iteration is stopped and
126// the error is returned to the caller.
127func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
128 return tx.root.ForEach(func(k, v []byte) error {
129 if err := fn(k, tx.root.Bucket(k)); err != nil {
130 return err
131 }
132 return nil
133 })
134}
135
136// OnCommit adds a handler function to be executed after the transaction successfully commits.
137func (tx *Tx) OnCommit(fn func()) {
138 tx.commitHandlers = append(tx.commitHandlers, fn)
139}
140
141// Commit writes all changes to disk and updates the meta page.
142// Returns an error if a disk write error occurs, or if Commit is
143// called on a read-only transaction.
144func (tx *Tx) Commit() error {
145 _assert(!tx.managed, "managed tx commit not allowed")
146 if tx.db == nil {
147 return ErrTxClosed
148 } else if !tx.writable {
149 return ErrTxNotWritable
150 }
151
152 // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
153
154 // Rebalance nodes which have had deletions.
155 var startTime = time.Now()
156 tx.root.rebalance()
157 if tx.stats.Rebalance > 0 {
158 tx.stats.RebalanceTime += time.Since(startTime)
159 }
160
161 // spill data onto dirty pages.
162 startTime = time.Now()
163 if err := tx.root.spill(); err != nil {
164 tx.rollback()
165 return err
166 }
167 tx.stats.SpillTime += time.Since(startTime)
168
169 // Free the old root bucket.
170 tx.meta.root.root = tx.root.root
171
172 opgid := tx.meta.pgid
173
174 // Free the freelist and allocate new pages for it. This will overestimate
175 // the size of the freelist but not underestimate the size (which would be bad).
176 tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
177 p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
178 if err != nil {
179 tx.rollback()
180 return err
181 }
182 if err := tx.db.freelist.write(p); err != nil {
183 tx.rollback()
184 return err
185 }
186 tx.meta.freelist = p.id
187
188 // If the high water mark has moved up then attempt to grow the database.
189 if tx.meta.pgid > opgid {
190 if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
191 tx.rollback()
192 return err
193 }
194 }
195
196 // Write dirty pages to disk.
197 startTime = time.Now()
198 if err := tx.write(); err != nil {
199 tx.rollback()
200 return err
201 }
202
203 // If strict mode is enabled then perform a consistency check.
204 // Only the first consistency error is reported in the panic.
205 if tx.db.StrictMode {
206 ch := tx.Check()
207 var errs []string
208 for {
209 err, ok := <-ch
210 if !ok {
211 break
212 }
213 errs = append(errs, err.Error())
214 }
215 if len(errs) > 0 {
216 panic("check fail: " + strings.Join(errs, "\n"))
217 }
218 }
219
220 // Write meta to disk.
221 if err := tx.writeMeta(); err != nil {
222 tx.rollback()
223 return err
224 }
225 tx.stats.WriteTime += time.Since(startTime)
226
227 // Finalize the transaction.
228 tx.close()
229
230 // Execute commit handlers now that the locks have been removed.
231 for _, fn := range tx.commitHandlers {
232 fn()
233 }
234
235 return nil
236}
237
238// Rollback closes the transaction and ignores all previous updates. Read-only
239// transactions must be rolled back and not committed.
240func (tx *Tx) Rollback() error {
241 _assert(!tx.managed, "managed tx rollback not allowed")
242 if tx.db == nil {
243 return ErrTxClosed
244 }
245 tx.rollback()
246 return nil
247}
248
249func (tx *Tx) rollback() {
250 if tx.db == nil {
251 return
252 }
253 if tx.writable {
254 tx.db.freelist.rollback(tx.meta.txid)
255 tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
256 }
257 tx.close()
258}
259
260func (tx *Tx) close() {
261 if tx.db == nil {
262 return
263 }
264 if tx.writable {
265 // Grab freelist stats.
266 var freelistFreeN = tx.db.freelist.free_count()
267 var freelistPendingN = tx.db.freelist.pending_count()
268 var freelistAlloc = tx.db.freelist.size()
269
270 // Remove transaction ref & writer lock.
271 tx.db.rwtx = nil
272 tx.db.rwlock.Unlock()
273
274 // Merge statistics.
275 tx.db.statlock.Lock()
276 tx.db.stats.FreePageN = freelistFreeN
277 tx.db.stats.PendingPageN = freelistPendingN
278 tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
279 tx.db.stats.FreelistInuse = freelistAlloc
280 tx.db.stats.TxStats.add(&tx.stats)
281 tx.db.statlock.Unlock()
282 } else {
283 tx.db.removeTx(tx)
284 }
285
286 // Clear all references.
287 tx.db = nil
288 tx.meta = nil
289 tx.root = Bucket{tx: tx}
290 tx.pages = nil
291}
292
293// Copy writes the entire database to a writer.
294// This function exists for backwards compatibility. Use WriteTo() instead.
295func (tx *Tx) Copy(w io.Writer) error {
296 _, err := tx.WriteTo(w)
297 return err
298}
299
300// WriteTo writes the entire database to a writer.
301// If err == nil then exactly tx.Size() bytes will be written into the writer.
302func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
303 // Attempt to open reader with WriteFlag
304 f, err := os.OpenFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
305 if err != nil {
306 return 0, err
307 }
308 defer func() { _ = f.Close() }()
309
310 // Generate a meta page. We use the same page data for both meta pages.
311 buf := make([]byte, tx.db.pageSize)
312 page := (*page)(unsafe.Pointer(&buf[0]))
313 page.flags = metaPageFlag
314 *page.meta() = *tx.meta
315
316 // Write meta 0.
317 page.id = 0
318 page.meta().checksum = page.meta().sum64()
319 nn, err := w.Write(buf)
320 n += int64(nn)
321 if err != nil {
322 return n, fmt.Errorf("meta 0 copy: %s", err)
323 }
324
325 // Write meta 1 with a lower transaction id.
326 page.id = 1
327 page.meta().txid -= 1
328 page.meta().checksum = page.meta().sum64()
329 nn, err = w.Write(buf)
330 n += int64(nn)
331 if err != nil {
332 return n, fmt.Errorf("meta 1 copy: %s", err)
333 }
334
335 // Move past the meta pages in the file.
336 if _, err := f.Seek(int64(tx.db.pageSize*2), os.SEEK_SET); err != nil {
337 return n, fmt.Errorf("seek: %s", err)
338 }
339
340 // Copy data pages.
341 wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
342 n += wn
343 if err != nil {
344 return n, err
345 }
346
347 return n, f.Close()
348}
349
350// CopyFile copies the entire database to file at the given path.
351// A reader transaction is maintained during the copy so it is safe to continue
352// using the database while a copy is in progress.
353func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
354 f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
355 if err != nil {
356 return err
357 }
358
359 err = tx.Copy(f)
360 if err != nil {
361 _ = f.Close()
362 return err
363 }
364 return f.Close()
365}
366
367// Check performs several consistency checks on the database for this transaction.
368// An error is returned if any inconsistency is found.
369//
370// It can be safely run concurrently on a writable transaction. However, this
371// incurs a high cost for large databases and databases with a lot of subbuckets
372// because of caching. This overhead can be removed if running on a read-only
373// transaction, however, it is not safe to execute other writer transactions at
374// the same time.
375func (tx *Tx) Check() <-chan error {
376 ch := make(chan error)
377 go tx.check(ch)
378 return ch
379}
380
381func (tx *Tx) check(ch chan error) {
382 // Check if any pages are double freed.
383 freed := make(map[pgid]bool)
384 for _, id := range tx.db.freelist.all() {
385 if freed[id] {
386 ch <- fmt.Errorf("page %d: already freed", id)
387 }
388 freed[id] = true
389 }
390
391 // Track every reachable page.
392 reachable := make(map[pgid]*page)
393 reachable[0] = tx.page(0) // meta0
394 reachable[1] = tx.page(1) // meta1
395 for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
396 reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
397 }
398
399 // Recursively check buckets.
400 tx.checkBucket(&tx.root, reachable, freed, ch)
401
402 // Ensure all pages below high water mark are either reachable or freed.
403 for i := pgid(0); i < tx.meta.pgid; i++ {
404 _, isReachable := reachable[i]
405 if !isReachable && !freed[i] {
406 ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
407 }
408 }
409
410 // Close the channel to signal completion.
411 close(ch)
412}
413
414func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool, ch chan error) {
415 // Ignore inline buckets.
416 if b.root == 0 {
417 return
418 }
419
420 // Check every page used by this bucket.
421 b.tx.forEachPage(b.root, 0, func(p *page, _ int) {
422 if p.id > tx.meta.pgid {
423 ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))
424 }
425
426 // Ensure each page is only referenced once.
427 for i := pgid(0); i <= pgid(p.overflow); i++ {
428 var id = p.id + i
429 if _, ok := reachable[id]; ok {
430 ch <- fmt.Errorf("page %d: multiple references", int(id))
431 }
432 reachable[id] = p
433 }
434
435 // We should only encounter un-freed leaf and branch pages.
436 if freed[p.id] {
437 ch <- fmt.Errorf("page %d: reachable freed", int(p.id))
438 } else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 {
439 ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), p.typ())
440 }
441 })
442
443 // Check each bucket within this bucket.
444 _ = b.ForEach(func(k, v []byte) error {
445 if child := b.Bucket(k); child != nil {
446 tx.checkBucket(child, reachable, freed, ch)
447 }
448 return nil
449 })
450}
451
452// allocate returns a contiguous block of memory starting at a given page.
453func (tx *Tx) allocate(count int) (*page, error) {
454 p, err := tx.db.allocate(count)
455 if err != nil {
456 return nil, err
457 }
458
459 // Save to our page cache.
460 tx.pages[p.id] = p
461
462 // Update statistics.
463 tx.stats.PageCount++
464 tx.stats.PageAlloc += count * tx.db.pageSize
465
466 return p, nil
467}
468
// write writes any dirty pages to disk.
//
// The dirty pages are written in ascending page id order, each in chunks of
// at most maxAllocSize-1 bytes, followed by a file sync unless syncing is
// disabled. The page cache is reset before any I/O happens, and single-page
// buffers are zeroed and handed back to the database's page pool afterwards.
// The first write or sync error is returned.
func (tx *Tx) write() error {
	// Sort pages by id.
	pages := make(pages, 0, len(tx.pages))
	for _, p := range tx.pages {
		pages = append(pages, p)
	}
	// Clear out page cache early.
	tx.pages = make(map[pgid]*page)
	sort.Sort(pages)

	// Write pages to disk in order.
	for _, p := range pages {
		// A page spans overflow+1 physical pages.
		size := (int(p.overflow) + 1) * tx.db.pageSize
		offset := int64(p.id) * int64(tx.db.pageSize)

		// Write out page in "max allocation" sized chunks.
		ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
		for {
			// Limit our write to our max allocation size.
			sz := size
			if sz > maxAllocSize-1 {
				sz = maxAllocSize - 1
			}

			// Write chunk to disk.
			buf := ptr[:sz]
			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
				return err
			}

			// Update statistics.
			tx.stats.Write++

			// Exit inner for loop if we've written all the chunks.
			size -= sz
			if size == 0 {
				break
			}

			// Otherwise move offset forward and move pointer to next chunk.
			offset += int64(sz)
			ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
		}
	}

	// Ignore file sync if flag is set on DB.
	// IgnoreNoSync forces the sync even when NoSync is set.
	if !tx.db.NoSync || IgnoreNoSync {
		if err := fdatasync(tx.db); err != nil {
			return err
		}
	}

	// Put small pages back to page pool.
	for _, p := range pages {
		// Ignore page sizes over 1 page.
		// These are allocated using make() instead of the page pool.
		if int(p.overflow) != 0 {
			continue
		}

		buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize]

		// Zero the buffer before it is reused.
		// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
		for i := range buf {
			buf[i] = 0
		}
		tx.db.pagePool.Put(buf)
	}

	return nil
}
541
542// writeMeta writes the meta to the disk.
543func (tx *Tx) writeMeta() error {
544 // Create a temporary buffer for the meta page.
545 buf := make([]byte, tx.db.pageSize)
546 p := tx.db.pageInBuffer(buf, 0)
547 tx.meta.write(p)
548
549 // Write the meta page to file.
550 if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
551 return err
552 }
553 if !tx.db.NoSync || IgnoreNoSync {
554 if err := fdatasync(tx.db); err != nil {
555 return err
556 }
557 }
558
559 // Update statistics.
560 tx.stats.Write++
561
562 return nil
563}
564
565// page returns a reference to the page with a given id.
566// If page has been written to then a temporary buffered page is returned.
567func (tx *Tx) page(id pgid) *page {
568 // Check the dirty pages first.
569 if tx.pages != nil {
570 if p, ok := tx.pages[id]; ok {
571 return p
572 }
573 }
574
575 // Otherwise return directly from the mmap.
576 return tx.db.page(id)
577}
578
579// forEachPage iterates over every page within a given page and executes a function.
580func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
581 p := tx.page(pgid)
582
583 // Execute function.
584 fn(p, depth)
585
586 // Recursively loop over children.
587 if (p.flags & branchPageFlag) != 0 {
588 for i := 0; i < int(p.count); i++ {
589 elem := p.branchPageElement(uint16(i))
590 tx.forEachPage(elem.pgid, depth+1, fn)
591 }
592 }
593}
594
595// Page returns page information for a given page number.
596// This is only safe for concurrent use when used by a writable transaction.
597func (tx *Tx) Page(id int) (*PageInfo, error) {
598 if tx.db == nil {
599 return nil, ErrTxClosed
600 } else if pgid(id) >= tx.meta.pgid {
601 return nil, nil
602 }
603
604 // Build the page info.
605 p := tx.db.page(pgid(id))
606 info := &PageInfo{
607 ID: id,
608 Count: int(p.count),
609 OverflowCount: int(p.overflow),
610 }
611
612 // Determine the type (or if it's free).
613 if tx.db.freelist.freed(pgid(id)) {
614 info.Type = "free"
615 } else {
616 info.Type = p.typ()
617 }
618
619 return info, nil
620}
621
// TxStats collects counters and timings describing the work done by a
// transaction.
type TxStats struct {
	// Page statistics.
	PageCount int // number of page allocations
	PageAlloc int // total bytes allocated

	// Cursor statistics.
	CursorCount int // number of cursors created

	// Node statistics
	NodeCount int // number of node allocations
	NodeDeref int // number of node dereferences

	// Rebalance statistics.
	Rebalance     int           // number of node rebalances
	RebalanceTime time.Duration // total time spent rebalancing

	// Split/Spill statistics.
	Split     int           // number of nodes split
	Spill     int           // number of nodes spilled
	SpillTime time.Duration // total time spent spilling

	// Write statistics.
	Write     int           // number of writes performed
	WriteTime time.Duration // total time spent writing to disk
}

// add accumulates every field of other into s.
func (s *TxStats) add(other *TxStats) {
	*s = TxStats{
		PageCount:     s.PageCount + other.PageCount,
		PageAlloc:     s.PageAlloc + other.PageAlloc,
		CursorCount:   s.CursorCount + other.CursorCount,
		NodeCount:     s.NodeCount + other.NodeCount,
		NodeDeref:     s.NodeDeref + other.NodeDeref,
		Rebalance:     s.Rebalance + other.Rebalance,
		RebalanceTime: s.RebalanceTime + other.RebalanceTime,
		Split:         s.Split + other.Split,
		Spill:         s.Spill + other.Spill,
		SpillTime:     s.SpillTime + other.SpillTime,
		Write:         s.Write + other.Write,
		WriteTime:     s.WriteTime + other.WriteTime,
	}
}

// Sub returns the field-by-field difference s - other without modifying
// either operand.
// This is useful when stats are sampled at two points in time and only the
// activity that happened in between is of interest.
func (s *TxStats) Sub(other *TxStats) TxStats {
	return TxStats{
		PageCount:     s.PageCount - other.PageCount,
		PageAlloc:     s.PageAlloc - other.PageAlloc,
		CursorCount:   s.CursorCount - other.CursorCount,
		NodeCount:     s.NodeCount - other.NodeCount,
		NodeDeref:     s.NodeDeref - other.NodeDeref,
		Rebalance:     s.Rebalance - other.Rebalance,
		RebalanceTime: s.RebalanceTime - other.RebalanceTime,
		Split:         s.Split - other.Split,
		Spill:         s.Spill - other.Spill,
		SpillTime:     s.SpillTime - other.SpillTime,
		Write:         s.Write - other.Write,
		WriteTime:     s.WriteTime - other.WriteTime,
	}
}