blob: f8cc6df88cfbd564388b41e02e6b6c65c86dc265 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18 "sort"
19 "sync"
20
21 "github.com/google/btree"
22 "go.uber.org/zap"
23)
24
25type index interface {
26 Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
27 Range(key, end []byte, atRev int64) ([][]byte, []revision)
28 Revisions(key, end []byte, atRev int64) []revision
29 Put(key []byte, rev revision)
30 Tombstone(key []byte, rev revision) error
31 RangeSince(key, end []byte, rev int64) []revision
32 Compact(rev int64) map[revision]struct{}
33 Keep(rev int64) map[revision]struct{}
34 Equal(b index) bool
35
36 Insert(ki *keyIndex)
37 KeyIndex(ki *keyIndex) *keyIndex
38}
39
40type treeIndex struct {
41 sync.RWMutex
42 tree *btree.BTree
43 lg *zap.Logger
44}
45
46func newTreeIndex(lg *zap.Logger) index {
47 return &treeIndex{
48 tree: btree.New(32),
49 lg: lg,
50 }
51}
52
53func (ti *treeIndex) Put(key []byte, rev revision) {
54 keyi := &keyIndex{key: key}
55
56 ti.Lock()
57 defer ti.Unlock()
58 item := ti.tree.Get(keyi)
59 if item == nil {
60 keyi.put(ti.lg, rev.main, rev.sub)
61 ti.tree.ReplaceOrInsert(keyi)
62 return
63 }
64 okeyi := item.(*keyIndex)
65 okeyi.put(ti.lg, rev.main, rev.sub)
66}
67
68func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
69 keyi := &keyIndex{key: key}
70 ti.RLock()
71 defer ti.RUnlock()
72 if keyi = ti.keyIndex(keyi); keyi == nil {
73 return revision{}, revision{}, 0, ErrRevisionNotFound
74 }
75 return keyi.get(ti.lg, atRev)
76}
77
78func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
79 ti.RLock()
80 defer ti.RUnlock()
81 return ti.keyIndex(keyi)
82}
83
84func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
85 if item := ti.tree.Get(keyi); item != nil {
86 return item.(*keyIndex)
87 }
88 return nil
89}
90
91func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
92 keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
93
94 ti.RLock()
95 defer ti.RUnlock()
96
97 ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
98 if len(endi.key) > 0 && !item.Less(endi) {
99 return false
100 }
101 f(item.(*keyIndex))
102 return true
103 })
104}
105
106func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
107 if end == nil {
108 rev, _, _, err := ti.Get(key, atRev)
109 if err != nil {
110 return nil
111 }
112 return []revision{rev}
113 }
114 ti.visit(key, end, func(ki *keyIndex) {
115 if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
116 revs = append(revs, rev)
117 }
118 })
119 return revs
120}
121
122func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
123 if end == nil {
124 rev, _, _, err := ti.Get(key, atRev)
125 if err != nil {
126 return nil, nil
127 }
128 return [][]byte{key}, []revision{rev}
129 }
130 ti.visit(key, end, func(ki *keyIndex) {
131 if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
132 revs = append(revs, rev)
133 keys = append(keys, ki.key)
134 }
135 })
136 return keys, revs
137}
138
139func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
140 keyi := &keyIndex{key: key}
141
142 ti.Lock()
143 defer ti.Unlock()
144 item := ti.tree.Get(keyi)
145 if item == nil {
146 return ErrRevisionNotFound
147 }
148
149 ki := item.(*keyIndex)
150 return ki.tombstone(ti.lg, rev.main, rev.sub)
151}
152
153// RangeSince returns all revisions from key(including) to end(excluding)
154// at or after the given rev. The returned slice is sorted in the order
155// of revision.
156func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
157 keyi := &keyIndex{key: key}
158
159 ti.RLock()
160 defer ti.RUnlock()
161
162 if end == nil {
163 item := ti.tree.Get(keyi)
164 if item == nil {
165 return nil
166 }
167 keyi = item.(*keyIndex)
168 return keyi.since(ti.lg, rev)
169 }
170
171 endi := &keyIndex{key: end}
172 var revs []revision
173 ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
174 if len(endi.key) > 0 && !item.Less(endi) {
175 return false
176 }
177 curKeyi := item.(*keyIndex)
178 revs = append(revs, curKeyi.since(ti.lg, rev)...)
179 return true
180 })
181 sort.Sort(revisions(revs))
182
183 return revs
184}
185
186func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
187 available := make(map[revision]struct{})
188 if ti.lg != nil {
189 ti.lg.Info("compact tree index", zap.Int64("revision", rev))
190 } else {
191 plog.Printf("store.index: compact %d", rev)
192 }
193 ti.Lock()
194 clone := ti.tree.Clone()
195 ti.Unlock()
196
197 clone.Ascend(func(item btree.Item) bool {
198 keyi := item.(*keyIndex)
199 //Lock is needed here to prevent modification to the keyIndex while
200 //compaction is going on or revision added to empty before deletion
201 ti.Lock()
202 keyi.compact(ti.lg, rev, available)
203 if keyi.isEmpty() {
204 item := ti.tree.Delete(keyi)
205 if item == nil {
206 if ti.lg != nil {
207 ti.lg.Panic("failed to delete during compaction")
208 } else {
209 plog.Panic("store.index: unexpected delete failure during compaction")
210 }
211 }
212 }
213 ti.Unlock()
214 return true
215 })
216 return available
217}
218
219// Keep finds all revisions to be kept for a Compaction at the given rev.
220func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
221 available := make(map[revision]struct{})
222 ti.RLock()
223 defer ti.RUnlock()
224 ti.tree.Ascend(func(i btree.Item) bool {
225 keyi := i.(*keyIndex)
226 keyi.keep(rev, available)
227 return true
228 })
229 return available
230}
231
232func (ti *treeIndex) Equal(bi index) bool {
233 b := bi.(*treeIndex)
234
235 if ti.tree.Len() != b.tree.Len() {
236 return false
237 }
238
239 equal := true
240
241 ti.tree.Ascend(func(item btree.Item) bool {
242 aki := item.(*keyIndex)
243 bki := b.tree.Get(item).(*keyIndex)
244 if !aki.equal(bki) {
245 equal = false
246 return false
247 }
248 return true
249 })
250
251 return equal
252}
253
254func (ti *treeIndex) Insert(ki *keyIndex) {
255 ti.Lock()
256 defer ti.Unlock()
257 ti.tree.ReplaceOrInsert(ki)
258}