blob: 91fe72ec5589838cf79dadb8f8ef548f0be91bea [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package backend
16
17import (
18 "bytes"
19 "math"
20 "sync"
21
22 bolt "go.etcd.io/bbolt"
23)
24
// safeRangeBucket names the only bucket on which UnsafeRange may be called
// with a limit greater than one. It is a hack to avoid inadvertently reading
// duplicate keys: overwrites on a bucket should only fetch with limit=1, but
// safeRangeBucket is known to never overwrite any key, so ranging over it is
// safe. UnsafeRange panics when asked for more than one key from any other
// bucket.
var safeRangeBucket = []byte("key")
29
// ReadTx is the read-only transaction interface implemented in this file by
// readTx and concurrentReadTx.
type ReadTx interface {
	// Lock, Unlock, RLock and RUnlock bracket use of the transaction. What
	// they do is implementation specific: readTx maps them onto an RWMutex,
	// while concurrentReadTx treats all but RUnlock as no-ops.
	Lock()
	Unlock()
	RLock()
	RUnlock()

	// UnsafeRange returns the keys and values found in bucketName for the
	// given key/endKey and limit. In the implementations in this file a nil
	// endKey forces limit to 1, a limit <= 0 means math.MaxInt64, and a
	// limit greater than 1 on any bucket other than safeRangeBucket panics.
	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
	// UnsafeForEach calls visitor for the key/value pairs of bucketName and
	// returns the first error encountered, if any.
	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
}
39
// readTx is a ReadTx whose reads are answered from a txReadBuffer first and
// from a bolt transaction second. Its Lock/Unlock/RLock/RUnlock methods map
// directly onto mu.
type readTx struct {
	// mu protects accesses to the txReadBuffer
	mu  sync.RWMutex
	buf txReadBuffer

	// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
	// txMu protects accesses to buckets and tx on Range requests.
	txMu sync.RWMutex
	// tx is the bolt transaction consulted by UnsafeRange and UnsafeForEach;
	// reset sets it to nil.
	tx *bolt.Tx
	// buckets caches the result of tx.Bucket per bucket name. A nil result
	// (missing bucket) is cached as well, until reset replaces the map.
	buckets map[string]*bolt.Bucket
	// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
	txWg *sync.WaitGroup
}
53
54func (rt *readTx) Lock() { rt.mu.Lock() }
55func (rt *readTx) Unlock() { rt.mu.Unlock() }
56func (rt *readTx) RLock() { rt.mu.RLock() }
57func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
58
59func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
60 if endKey == nil {
61 // forbid duplicates for single keys
62 limit = 1
63 }
64 if limit <= 0 {
65 limit = math.MaxInt64
66 }
67 if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
68 panic("do not use unsafeRange on non-keys bucket")
69 }
70 keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
71 if int64(len(keys)) == limit {
72 return keys, vals
73 }
74
75 // find/cache bucket
76 bn := string(bucketName)
77 rt.txMu.RLock()
78 bucket, ok := rt.buckets[bn]
79 rt.txMu.RUnlock()
80 if !ok {
81 rt.txMu.Lock()
82 bucket = rt.tx.Bucket(bucketName)
83 rt.buckets[bn] = bucket
84 rt.txMu.Unlock()
85 }
86
87 // ignore missing bucket since may have been created in this batch
88 if bucket == nil {
89 return keys, vals
90 }
91 rt.txMu.Lock()
92 c := bucket.Cursor()
93 rt.txMu.Unlock()
94
95 k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
96 return append(k2, keys...), append(v2, vals...)
97}
98
99func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
100 dups := make(map[string]struct{})
101 getDups := func(k, v []byte) error {
102 dups[string(k)] = struct{}{}
103 return nil
104 }
105 visitNoDup := func(k, v []byte) error {
106 if _, ok := dups[string(k)]; ok {
107 return nil
108 }
109 return visitor(k, v)
110 }
111 if err := rt.buf.ForEach(bucketName, getDups); err != nil {
112 return err
113 }
114 rt.txMu.Lock()
115 err := unsafeForEach(rt.tx, bucketName, visitNoDup)
116 rt.txMu.Unlock()
117 if err != nil {
118 return err
119 }
120 return rt.buf.ForEach(bucketName, visitor)
121}
122
123func (rt *readTx) reset() {
124 rt.buf.reset()
125 rt.buckets = make(map[string]*bolt.Bucket)
126 rt.tx = nil
127 rt.txWg = new(sync.WaitGroup)
128}
129
// concurrentReadTx is a ReadTx whose Lock, Unlock and RLock are no-ops and
// whose RUnlock calls txWg.Done. Like readTx, its reads consult buf first and
// the bolt transaction tx second.
//
// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
type concurrentReadTx struct {
	// buf is held by value.
	// NOTE(review): presumably a copy of the read buffer taken when this
	// transaction is created — confirm against the constructor.
	buf txReadBuffer
	// txMu guards tx and buckets; it is a pointer, so the mutex may be
	// shared with another owner.
	txMu *sync.RWMutex
	tx   *bolt.Tx
	// buckets caches the result of tx.Bucket per bucket name, including a
	// nil result for a missing bucket.
	buckets map[string]*bolt.Bucket
	// txWg is signalled with Done by RUnlock.
	txWg *sync.WaitGroup
}
138
139func (rt *concurrentReadTx) Lock() {}
140func (rt *concurrentReadTx) Unlock() {}
141
142// RLock is no-op. concurrentReadTx does not need to be locked after it is created.
143func (rt *concurrentReadTx) RLock() {}
144
145// RUnlock signals the end of concurrentReadTx.
146func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
147
148func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
149 dups := make(map[string]struct{})
150 getDups := func(k, v []byte) error {
151 dups[string(k)] = struct{}{}
152 return nil
153 }
154 visitNoDup := func(k, v []byte) error {
155 if _, ok := dups[string(k)]; ok {
156 return nil
157 }
158 return visitor(k, v)
159 }
160 if err := rt.buf.ForEach(bucketName, getDups); err != nil {
161 return err
162 }
163 rt.txMu.Lock()
164 err := unsafeForEach(rt.tx, bucketName, visitNoDup)
165 rt.txMu.Unlock()
166 if err != nil {
167 return err
168 }
169 return rt.buf.ForEach(bucketName, visitor)
170}
171
172func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
173 if endKey == nil {
174 // forbid duplicates for single keys
175 limit = 1
176 }
177 if limit <= 0 {
178 limit = math.MaxInt64
179 }
180 if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
181 panic("do not use unsafeRange on non-keys bucket")
182 }
183 keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
184 if int64(len(keys)) == limit {
185 return keys, vals
186 }
187
188 // find/cache bucket
189 bn := string(bucketName)
190 rt.txMu.RLock()
191 bucket, ok := rt.buckets[bn]
192 rt.txMu.RUnlock()
193 if !ok {
194 rt.txMu.Lock()
195 bucket = rt.tx.Bucket(bucketName)
196 rt.buckets[bn] = bucket
197 rt.txMu.Unlock()
198 }
199
200 // ignore missing bucket since may have been created in this batch
201 if bucket == nil {
202 return keys, vals
203 }
204 rt.txMu.Lock()
205 c := bucket.Cursor()
206 rt.txMu.Unlock()
207
208 k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
209 return append(k2, keys...), append(v2, vals...)
210}