// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
	"bytes"
	"math"
	"sync"
	"sync/atomic"
	"time"

	bolt "go.etcd.io/bbolt"
	"go.uber.org/zap"
)

type BatchTx interface {
	ReadTx
	UnsafeCreateBucket(name []byte)
	UnsafePut(bucketName []byte, key []byte, value []byte)
	UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
	UnsafeDelete(bucketName []byte, key []byte)
	// Commit commits a previous tx and begins a new writable one.
	Commit()
	// CommitAndStop commits the previous tx and does not create a new one.
	CommitAndStop()
}
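
// Illustrative usage sketch, assuming a Backend value "be" that exposes
// BatchTx(): callers hold the batch tx lock around the Unsafe* mutations, and
// Unlock decides whether to commit based on the backend's batch limit.
//
//	tx := be.BatchTx()
//	tx.Lock()
//	tx.UnsafeCreateBucket([]byte("bucket"))
//	tx.UnsafePut([]byte("bucket"), []byte("k"), []byte("v"))
//	tx.Unlock() // commits once pending writes reach the batch limit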

type batchTx struct {
	sync.Mutex
	tx      *bolt.Tx
	backend *backend

	pending int
}

func (t *batchTx) Lock() {
	t.Mutex.Lock()
}

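// Unlock commits the accumulated pending writes once they reach the backend's
// batch limit, then releases the batch tx mutex.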
func (t *batchTx) Unlock() {
	if t.pending >= t.backend.batchLimit {
		t.commit(false)
	}
	t.Mutex.Unlock()
}

// BatchTx embeds the ReadTx interface, but RLock() and RUnlock() do not have
// appropriate semantics for a batch (write) transaction, so they must not be
// called on a BatchTx.
// TODO: might want to decouple ReadTx and BatchTx

func (t *batchTx) RLock() {
	panic("unexpected RLock")
}

func (t *batchTx) RUnlock() {
	panic("unexpected RUnlock")
}

func (t *batchTx) UnsafeCreateBucket(name []byte) {
	_, err := t.tx.CreateBucket(name)
	if err != nil && err != bolt.ErrBucketExists {
		if t.backend.lg != nil {
			t.backend.lg.Fatal(
				"failed to create a bucket",
				zap.String("bucket-name", string(name)),
				zap.Error(err),
			)
		} else {
			plog.Fatalf("cannot create bucket %s (%v)", name, err)
		}
	}
	t.pending++
}

// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
	t.unsafePut(bucketName, key, value, false)
}

// UnsafeSeqPut must be called holding the lock on the tx.
func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
	t.unsafePut(bucketName, key, value, true)
}

func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
	bucket := t.tx.Bucket(bucketName)
	if bucket == nil {
		if t.backend.lg != nil {
			t.backend.lg.Fatal(
				"failed to find a bucket",
				zap.String("bucket-name", string(bucketName)),
			)
		} else {
			plog.Fatalf("bucket %s does not exist", bucketName)
		}
	}
	if seq {
		// It is useful to increase the fill percent when the workload is mostly
		// append-only. This can delay page splits and reduce space usage.
		bucket.FillPercent = 0.9
	}
	if err := bucket.Put(key, value); err != nil {
		if t.backend.lg != nil {
			t.backend.lg.Fatal(
				"failed to write to a bucket",
				zap.String("bucket-name", string(bucketName)),
				zap.Error(err),
			)
		} else {
			plog.Fatalf("cannot put key into bucket (%v)", err)
		}
	}
	t.pending++
}

// UnsafeRange must be called holding the lock on the tx.
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	bucket := t.tx.Bucket(bucketName)
	if bucket == nil {
		if t.backend.lg != nil {
			t.backend.lg.Fatal(
				"failed to find a bucket",
				zap.String("bucket-name", string(bucketName)),
			)
		} else {
			plog.Fatalf("bucket %s does not exist", bucketName)
		}
	}
	return unsafeRange(bucket.Cursor(), key, endKey, limit)
}

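// unsafeRange iterates the cursor starting at key. With a non-empty endKey it
// returns keys in the half-open interval [key, endKey), up to limit entries
// (a non-positive limit means no limit); with an empty endKey it returns at
// most the single exact match for key.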
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
	if limit <= 0 {
		limit = math.MaxInt64
	}
	var isMatch func(b []byte) bool
	if len(endKey) > 0 {
		isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
	} else {
		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
		limit = 1
	}

	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
		vs = append(vs, cv)
		keys = append(keys, ck)
		if limit == int64(len(keys)) {
			break
		}
	}
	return keys, vs
}

// UnsafeDelete must be called holding the lock on the tx.
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
	bucket := t.tx.Bucket(bucketName)
	if bucket == nil {
		if t.backend.lg != nil {
			t.backend.lg.Fatal(
				"failed to find a bucket",
				zap.String("bucket-name", string(bucketName)),
			)
		} else {
			plog.Fatalf("bucket %s does not exist", bucketName)
		}
	}
	err := bucket.Delete(key)
	if err != nil {
		if t.backend.lg != nil {
			t.backend.lg.Fatal(
				"failed to delete a key",
				zap.String("bucket-name", string(bucketName)),
				zap.Error(err),
			)
		} else {
			plog.Fatalf("cannot delete key from bucket (%v)", err)
		}
	}
	t.pending++
}

// UnsafeForEach must be called holding the lock on the tx.
func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
	return unsafeForEach(t.tx, bucketName, visitor)
}

func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
	if b := tx.Bucket(bucket); b != nil {
		return b.ForEach(visitor)
	}
	return nil
}

// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
	t.Lock()
	t.commit(false)
	t.Unlock()
}

// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
	t.Lock()
	t.commit(true)
	t.Unlock()
}

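// safePending returns the number of pending writes; unlike reading t.pending
// directly, it acquires the mutex, so callers need not hold the batch tx lock.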
func (t *batchTx) safePending() int {
	t.Mutex.Lock()
	defer t.Mutex.Unlock()
	return t.pending
}

func (t *batchTx) commit(stop bool) {
	// commit the last tx
	if t.tx != nil {
		if t.pending == 0 && !stop {
			return
		}

		start := time.Now()

		// gofail: var beforeCommit struct{}
		err := t.tx.Commit()
		// gofail: var afterCommit struct{}

		rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
		spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
		writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
		commitSec.Observe(time.Since(start).Seconds())
		atomic.AddInt64(&t.backend.commits, 1)

		t.pending = 0
		if err != nil {
			if t.backend.lg != nil {
				t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
			} else {
				plog.Fatalf("cannot commit tx (%s)", err)
			}
		}
	}
	if !stop {
		t.tx = t.backend.begin(true)
	}
}

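// batchTxBuffered extends batchTx with a write buffer: on Unlock, buffered
// writes are copied into the backend's read buffer, so concurrent reads can
// observe them before the next boltdb commit.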
type batchTxBuffered struct {
	batchTx
	buf txWriteBuffer
}

func newBatchTxBuffered(backend *backend) *batchTxBuffered {
	tx := &batchTxBuffered{
		batchTx: batchTx{backend: backend},
		buf: txWriteBuffer{
			txBuffer: txBuffer{make(map[string]*bucketBuffer)},
			seq:      true,
		},
	}
	// the initial Commit begins the first writable batch tx as well as the
	// read-only tx backing readTx.
	tx.Commit()
	return tx
}

func (t *batchTxBuffered) Unlock() {
	if t.pending != 0 {
		t.backend.readTx.Lock() // blocks txReadBuffer for writing.
		t.buf.writeback(&t.backend.readTx.buf)
		t.backend.readTx.Unlock()
		if t.pending >= t.backend.batchLimit {
			t.commit(false)
		}
	}
	t.batchTx.Unlock()
}

func (t *batchTxBuffered) Commit() {
	t.Lock()
	t.commit(false)
	t.Unlock()
}

func (t *batchTxBuffered) CommitAndStop() {
	t.Lock()
	t.commit(true)
	t.Unlock()
}

func (t *batchTxBuffered) commit(stop bool) {
	// all read txs must be closed to acquire boltdb commit rwlock
	t.backend.readTx.Lock()
	t.unsafeCommit(stop)
	t.backend.readTx.Unlock()
}

func (t *batchTxBuffered) unsafeCommit(stop bool) {
	if t.backend.readTx.tx != nil {
		// wait for all store read transactions using the current boltdb tx to
		// finish, then close the boltdb tx
		go func(tx *bolt.Tx, wg *sync.WaitGroup) {
			wg.Wait()
			if err := tx.Rollback(); err != nil {
				if t.backend.lg != nil {
					t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
				} else {
					plog.Fatalf("cannot rollback tx (%s)", err)
				}
			}
		}(t.backend.readTx.tx, t.backend.readTx.txWg)
		t.backend.readTx.reset()
	}

	t.batchTx.commit(stop)

	if !stop {
		t.backend.readTx.tx = t.backend.begin(false)
	}
}

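// UnsafePut writes through to the boltdb tx and also records the change in the
// write buffer so that Unlock can write it back into the backend's read buffer.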
func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
	t.batchTx.UnsafePut(bucketName, key, value)
	t.buf.put(bucketName, key, value)
}

func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
	t.batchTx.UnsafeSeqPut(bucketName, key, value)
	t.buf.putSeq(bucketName, key, value)
}