blob: b27a9e5433955f47ce1fa622bfe3a8d9466f2945 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18 "sort"
19 "sync"
20
21 "github.com/google/btree"
22)
23
24type index interface {
25 Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
26 Range(key, end []byte, atRev int64) ([][]byte, []revision)
27 Revisions(key, end []byte, atRev int64) []revision
28 Put(key []byte, rev revision)
29 Tombstone(key []byte, rev revision) error
30 RangeSince(key, end []byte, rev int64) []revision
31 Compact(rev int64) map[revision]struct{}
32 Keep(rev int64) map[revision]struct{}
33 Equal(b index) bool
34
35 Insert(ki *keyIndex)
36 KeyIndex(ki *keyIndex) *keyIndex
37}
38
39type treeIndex struct {
40 sync.RWMutex
41 tree *btree.BTree
42}
43
44func newTreeIndex() index {
45 return &treeIndex{
46 tree: btree.New(32),
47 }
48}
49
50func (ti *treeIndex) Put(key []byte, rev revision) {
51 keyi := &keyIndex{key: key}
52
53 ti.Lock()
54 defer ti.Unlock()
55 item := ti.tree.Get(keyi)
56 if item == nil {
57 keyi.put(rev.main, rev.sub)
58 ti.tree.ReplaceOrInsert(keyi)
59 return
60 }
61 okeyi := item.(*keyIndex)
62 okeyi.put(rev.main, rev.sub)
63}
64
65func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
66 keyi := &keyIndex{key: key}
67 ti.RLock()
68 defer ti.RUnlock()
69 if keyi = ti.keyIndex(keyi); keyi == nil {
70 return revision{}, revision{}, 0, ErrRevisionNotFound
71 }
72 return keyi.get(atRev)
73}
74
75func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
76 ti.RLock()
77 defer ti.RUnlock()
78 return ti.keyIndex(keyi)
79}
80
81func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
82 if item := ti.tree.Get(keyi); item != nil {
83 return item.(*keyIndex)
84 }
85 return nil
86}
87
88func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
89 keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
90
91 ti.RLock()
92 defer ti.RUnlock()
93
94 ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
95 if len(endi.key) > 0 && !item.Less(endi) {
96 return false
97 }
98 f(item.(*keyIndex))
99 return true
100 })
101}
102
103func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
104 if end == nil {
105 rev, _, _, err := ti.Get(key, atRev)
106 if err != nil {
107 return nil
108 }
109 return []revision{rev}
110 }
111 ti.visit(key, end, func(ki *keyIndex) {
112 if rev, _, _, err := ki.get(atRev); err == nil {
113 revs = append(revs, rev)
114 }
115 })
116 return revs
117}
118
119func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
120 if end == nil {
121 rev, _, _, err := ti.Get(key, atRev)
122 if err != nil {
123 return nil, nil
124 }
125 return [][]byte{key}, []revision{rev}
126 }
127 ti.visit(key, end, func(ki *keyIndex) {
128 if rev, _, _, err := ki.get(atRev); err == nil {
129 revs = append(revs, rev)
130 keys = append(keys, ki.key)
131 }
132 })
133 return keys, revs
134}
135
136func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
137 keyi := &keyIndex{key: key}
138
139 ti.Lock()
140 defer ti.Unlock()
141 item := ti.tree.Get(keyi)
142 if item == nil {
143 return ErrRevisionNotFound
144 }
145
146 ki := item.(*keyIndex)
147 return ki.tombstone(rev.main, rev.sub)
148}
149
150// RangeSince returns all revisions from key(including) to end(excluding)
151// at or after the given rev. The returned slice is sorted in the order
152// of revision.
153func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
154 keyi := &keyIndex{key: key}
155
156 ti.RLock()
157 defer ti.RUnlock()
158
159 if end == nil {
160 item := ti.tree.Get(keyi)
161 if item == nil {
162 return nil
163 }
164 keyi = item.(*keyIndex)
165 return keyi.since(rev)
166 }
167
168 endi := &keyIndex{key: end}
169 var revs []revision
170 ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
171 if len(endi.key) > 0 && !item.Less(endi) {
172 return false
173 }
174 curKeyi := item.(*keyIndex)
175 revs = append(revs, curKeyi.since(rev)...)
176 return true
177 })
178 sort.Sort(revisions(revs))
179
180 return revs
181}
182
183func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
184 available := make(map[revision]struct{})
185 var emptyki []*keyIndex
186 plog.Printf("store.index: compact %d", rev)
187 // TODO: do not hold the lock for long time?
188 // This is probably OK. Compacting 10M keys takes O(10ms).
189 ti.Lock()
190 defer ti.Unlock()
191 ti.tree.Ascend(compactIndex(rev, available, &emptyki))
192 for _, ki := range emptyki {
193 item := ti.tree.Delete(ki)
194 if item == nil {
195 plog.Panic("store.index: unexpected delete failure during compaction")
196 }
197 }
198 return available
199}
200
201// Keep finds all revisions to be kept for a Compaction at the given rev.
202func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
203 available := make(map[revision]struct{})
204 ti.RLock()
205 defer ti.RUnlock()
206 ti.tree.Ascend(func(i btree.Item) bool {
207 keyi := i.(*keyIndex)
208 keyi.keep(rev, available)
209 return true
210 })
211 return available
212}
213
214func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
215 return func(i btree.Item) bool {
216 keyi := i.(*keyIndex)
217 keyi.compact(rev, available)
218 if keyi.isEmpty() {
219 *emptyki = append(*emptyki, keyi)
220 }
221 return true
222 }
223}
224
225func (ti *treeIndex) Equal(bi index) bool {
226 b := bi.(*treeIndex)
227
228 if ti.tree.Len() != b.tree.Len() {
229 return false
230 }
231
232 equal := true
233
234 ti.tree.Ascend(func(item btree.Item) bool {
235 aki := item.(*keyIndex)
236 bki := b.tree.Get(item).(*keyIndex)
237 if !aki.equal(bki) {
238 equal = false
239 return false
240 }
241 return true
242 })
243
244 return equal
245}
246
247func (ti *treeIndex) Insert(ki *keyIndex) {
248 ti.Lock()
249 defer ti.Unlock()
250 ti.tree.ReplaceOrInsert(ki)
251}