// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
	"bytes"
	"math"
	"sync"
	"sync/atomic"
	"time"

	bolt "github.com/coreos/bbolt"
)

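// BatchTx is a writable, batching transaction on top of the bolt backend.
// The Unsafe* methods must be called while holding the transaction lock; the
// embedded ReadTx (defined elsewhere in this package) is assumed to provide
// Lock/Unlock. Writes accumulate until Commit/CommitAndStop, or until Unlock
// flushes a full batch. A minimal usage sketch, assuming a backend value b
// with a BatchTx() accessor as used in the rest of this package:
//
//	tx := b.BatchTx()
//	tx.Lock()
//	tx.UnsafePut(bucketName, key, value)
//	tx.Unlock() // may auto-commit once the backend's batchLimit is reached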
type BatchTx interface {
	ReadTx
	UnsafeCreateBucket(name []byte)
	UnsafePut(bucketName []byte, key []byte, value []byte)
	UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
	UnsafeDelete(bucketName []byte, key []byte)
	// Commit commits a previous tx and begins a new writable one.
	Commit()
	// CommitAndStop commits the previous tx and does not create a new one.
	CommitAndStop()
}

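// batchTx wraps a single writable bolt.Tx and counts its uncommitted writes
// in pending; Unlock auto-commits once pending reaches the backend's
// batchLimit.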
type batchTx struct {
	sync.Mutex
	tx      *bolt.Tx
	backend *backend

	pending int
}

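// UnsafeCreateBucket must be called holding the lock on the tx.
// An already-existing bucket is not treated as an error.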
func (t *batchTx) UnsafeCreateBucket(name []byte) {
	_, err := t.tx.CreateBucket(name)
	if err != nil && err != bolt.ErrBucketExists {
		plog.Fatalf("cannot create bucket %s (%v)", name, err)
	}
	t.pending++
}

// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
	t.unsafePut(bucketName, key, value, false)
}

// UnsafeSeqPut must be called holding the lock on the tx.
func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
	t.unsafePut(bucketName, key, value, true)
}

func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
	bucket := t.tx.Bucket(bucketName)
	if bucket == nil {
		plog.Fatalf("bucket %s does not exist", bucketName)
	}
	if seq {
		// it is useful to increase fill percent when the workloads are mostly append-only.
		// this can delay the page split and reduce space usage.
		bucket.FillPercent = 0.9
	}
	if err := bucket.Put(key, value); err != nil {
		plog.Fatalf("cannot put key into bucket (%v)", err)
	}
	t.pending++
}

// UnsafeRange must be called holding the lock on the tx.
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	bucket := t.tx.Bucket(bucketName)
	if bucket == nil {
		plog.Fatalf("bucket %s does not exist", bucketName)
	}
	return unsafeRange(bucket.Cursor(), key, endKey, limit)
}

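// unsafeRange walks the cursor starting at key. With a non-empty endKey it
// returns keys in the half-open interval [key, endKey); with an empty endKey
// it returns at most the single exact match for key. A non-positive limit
// means no limit.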
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
	if limit <= 0 {
		limit = math.MaxInt64
	}
	var isMatch func(b []byte) bool
	if len(endKey) > 0 {
		isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
	} else {
		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
		limit = 1
	}
	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
		vs = append(vs, cv)
		keys = append(keys, ck)
		if limit == int64(len(keys)) {
			break
		}
	}
	return keys, vs
}

// UnsafeDelete must be called holding the lock on the tx.
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
	bucket := t.tx.Bucket(bucketName)
	if bucket == nil {
		plog.Fatalf("bucket %s does not exist", bucketName)
	}
	err := bucket.Delete(key)
	if err != nil {
		plog.Fatalf("cannot delete key from bucket (%v)", err)
	}
	t.pending++
}

// UnsafeForEach must be called holding the lock on the tx.
func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
	return unsafeForEach(t.tx, bucketName, visitor)
}

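// unsafeForEach visits every key/value pair in the bucket, if the bucket exists.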
func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
	if b := tx.Bucket(bucket); b != nil {
		return b.ForEach(visitor)
	}
	return nil
}

// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
	t.Lock()
	t.commit(false)
	t.Unlock()
}

// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
	t.Lock()
	t.commit(true)
	t.Unlock()
}

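// Unlock commits the batch if it has grown past the backend's batchLimit,
// then releases the underlying mutex.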
func (t *batchTx) Unlock() {
	if t.pending >= t.backend.batchLimit {
		t.commit(false)
	}
	t.Mutex.Unlock()
}

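// commit flushes the current bolt tx when it has pending writes or when stop
// is set, then begins a fresh writable tx unless stopping. With nothing
// pending and stop unset, the existing tx is kept open for further batching.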
func (t *batchTx) commit(stop bool) {
	// commit the last tx
	if t.tx != nil {
		if t.pending == 0 && !stop {
			return
		}

		start := time.Now()

		// gofail: var beforeCommit struct{}
		err := t.tx.Commit()
		// gofail: var afterCommit struct{}

		commitDurations.Observe(time.Since(start).Seconds())
		atomic.AddInt64(&t.backend.commits, 1)

		t.pending = 0
		if err != nil {
			plog.Fatalf("cannot commit tx (%s)", err)
		}
	}
	if !stop {
		t.tx = t.backend.begin(true)
	}
}

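// batchTxBuffered is a batchTx that additionally mirrors its writes into an
// in-memory buffer; Unlock writes that buffer back into the backend's read
// buffer so concurrent reads can observe not-yet-committed updates.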
type batchTxBuffered struct {
	batchTx
	buf txWriteBuffer
}

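// newBatchTxBuffered allocates a buffered batch tx and calls Commit once so
// the initial bolt transactions are opened before first use.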
func newBatchTxBuffered(backend *backend) *batchTxBuffered {
	tx := &batchTxBuffered{
		batchTx: batchTx{backend: backend},
		buf: txWriteBuffer{
			txBuffer: txBuffer{make(map[string]*bucketBuffer)},
			seq:      true,
		},
	}
	tx.Commit()
	return tx
}

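// Unlock writes any pending updates back into the backend's read buffer,
// commits early if the batch limit has been reached, and then releases the
// lock via batchTx.Unlock.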
func (t *batchTxBuffered) Unlock() {
	if t.pending != 0 {
		t.backend.readTx.mu.Lock()
		t.buf.writeback(&t.backend.readTx.buf)
		t.backend.readTx.mu.Unlock()
		if t.pending >= t.backend.batchLimit {
			t.commit(false)
		}
	}
	t.batchTx.Unlock()
}

func (t *batchTxBuffered) Commit() {
	t.Lock()
	t.commit(false)
	t.Unlock()
}

func (t *batchTxBuffered) CommitAndStop() {
	t.Lock()
	t.commit(true)
	t.Unlock()
}

func (t *batchTxBuffered) commit(stop bool) {
	// all read txs must be closed to acquire boltdb commit rwlock
	t.backend.readTx.mu.Lock()
	t.unsafeCommit(stop)
	t.backend.readTx.mu.Unlock()
}

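// unsafeCommit rolls back the outstanding read-only bolt tx, commits the
// write tx via batchTx.commit, and, unless stopping, opens a fresh read-only
// tx for the backend.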
func (t *batchTxBuffered) unsafeCommit(stop bool) {
	if t.backend.readTx.tx != nil {
		if err := t.backend.readTx.tx.Rollback(); err != nil {
			plog.Fatalf("cannot rollback tx (%s)", err)
		}
		t.backend.readTx.reset()
	}

	t.batchTx.commit(stop)

	if !stop {
		t.backend.readTx.tx = t.backend.begin(false)
	}
}

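// UnsafePut must be called holding the lock on the tx; the update is also
// recorded in the write buffer so reads can see it after the next Unlock.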
func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
	t.batchTx.UnsafePut(bucketName, key, value)
	t.buf.put(bucketName, key, value)
}

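// UnsafeSeqPut must be called holding the lock on the tx; the sequential
// update is also recorded in the write buffer so reads can see it after the
// next Unlock.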
func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
	t.batchTx.UnsafeSeqPut(bucketName, key, value)
	t.buf.putSeq(bucketName, key, value)
}