blob: 8896fb86dbf26b199bbfbbf8af7cebba7cdee9a1 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18 "github.com/coreos/etcd/lease"
19 "github.com/coreos/etcd/mvcc/backend"
20 "github.com/coreos/etcd/mvcc/mvccpb"
21)
22
23type storeTxnRead struct {
24 s *store
25 tx backend.ReadTx
26
27 firstRev int64
28 rev int64
29}
30
31func (s *store) Read() TxnRead {
32 s.mu.RLock()
33 tx := s.b.ReadTx()
34 s.revMu.RLock()
35 tx.Lock()
36 firstRev, rev := s.compactMainRev, s.currentRev
37 s.revMu.RUnlock()
38 return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
39}
40
41func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
42func (tr *storeTxnRead) Rev() int64 { return tr.rev }
43
44func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
45 return tr.rangeKeys(key, end, tr.Rev(), ro)
46}
47
48func (tr *storeTxnRead) End() {
49 tr.tx.Unlock()
50 tr.s.mu.RUnlock()
51}
52
53type storeTxnWrite struct {
54 storeTxnRead
55 tx backend.BatchTx
56 // beginRev is the revision where the txn begins; it will write to the next revision.
57 beginRev int64
58 changes []mvccpb.KeyValue
59}
60
61func (s *store) Write() TxnWrite {
62 s.mu.RLock()
63 tx := s.b.BatchTx()
64 tx.Lock()
65 tw := &storeTxnWrite{
66 storeTxnRead: storeTxnRead{s, tx, 0, 0},
67 tx: tx,
68 beginRev: s.currentRev,
69 changes: make([]mvccpb.KeyValue, 0, 4),
70 }
71 return newMetricsTxnWrite(tw)
72}
73
74func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
75
76func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
77 rev := tw.beginRev
78 if len(tw.changes) > 0 {
79 rev++
80 }
81 return tw.rangeKeys(key, end, rev, ro)
82}
83
84func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
85 if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
86 return n, int64(tw.beginRev + 1)
87 }
88 return 0, int64(tw.beginRev)
89}
90
91func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
92 tw.put(key, value, lease)
93 return int64(tw.beginRev + 1)
94}
95
96func (tw *storeTxnWrite) End() {
97 // only update index if the txn modifies the mvcc state.
98 if len(tw.changes) != 0 {
99 tw.s.saveIndex(tw.tx)
100 // hold revMu lock to prevent new read txns from opening until writeback.
101 tw.s.revMu.Lock()
102 tw.s.currentRev++
103 }
104 tw.tx.Unlock()
105 if len(tw.changes) != 0 {
106 tw.s.revMu.Unlock()
107 }
108 tw.s.mu.RUnlock()
109}
110
111func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
112 rev := ro.Rev
113 if rev > curRev {
114 return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
115 }
116 if rev <= 0 {
117 rev = curRev
118 }
119 if rev < tr.s.compactMainRev {
120 return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
121 }
122
123 revpairs := tr.s.kvindex.Revisions(key, end, int64(rev))
124 if len(revpairs) == 0 {
125 return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
126 }
127 if ro.Count {
128 return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
129 }
130
131 limit := int(ro.Limit)
132 if limit <= 0 || limit > len(revpairs) {
133 limit = len(revpairs)
134 }
135
136 kvs := make([]mvccpb.KeyValue, limit)
137 revBytes := newRevBytes()
138 for i, revpair := range revpairs[:len(kvs)] {
139 revToBytes(revpair, revBytes)
140 _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
141 if len(vs) != 1 {
142 plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
143 }
144 if err := kvs[i].Unmarshal(vs[0]); err != nil {
145 plog.Fatalf("cannot unmarshal event: %v", err)
146 }
147 }
148 return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
149}
150
151func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
152 rev := tw.beginRev + 1
153 c := rev
154 oldLease := lease.NoLease
155
156 // if the key exists before, use its previous created and
157 // get its previous leaseID
158 _, created, ver, err := tw.s.kvindex.Get(key, rev)
159 if err == nil {
160 c = created.main
161 oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
162 }
163
164 ibytes := newRevBytes()
165 idxRev := revision{main: rev, sub: int64(len(tw.changes))}
166 revToBytes(idxRev, ibytes)
167
168 ver = ver + 1
169 kv := mvccpb.KeyValue{
170 Key: key,
171 Value: value,
172 CreateRevision: c,
173 ModRevision: rev,
174 Version: ver,
175 Lease: int64(leaseID),
176 }
177
178 d, err := kv.Marshal()
179 if err != nil {
180 plog.Fatalf("cannot marshal event: %v", err)
181 }
182
183 tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
184 tw.s.kvindex.Put(key, idxRev)
185 tw.changes = append(tw.changes, kv)
186
187 if oldLease != lease.NoLease {
188 if tw.s.le == nil {
189 panic("no lessor to detach lease")
190 }
191 err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
192 if err != nil {
193 plog.Errorf("unexpected error from lease detach: %v", err)
194 }
195 }
196 if leaseID != lease.NoLease {
197 if tw.s.le == nil {
198 panic("no lessor to attach lease")
199 }
200 err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
201 if err != nil {
202 panic("unexpected error from lease Attach")
203 }
204 }
205}
206
207func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
208 rrev := tw.beginRev
209 if len(tw.changes) > 0 {
210 rrev += 1
211 }
212 keys, revs := tw.s.kvindex.Range(key, end, rrev)
213 if len(keys) == 0 {
214 return 0
215 }
216 for i, key := range keys {
217 tw.delete(key, revs[i])
218 }
219 return int64(len(keys))
220}
221
222func (tw *storeTxnWrite) delete(key []byte, rev revision) {
223 ibytes := newRevBytes()
224 idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
225 revToBytes(idxRev, ibytes)
226 ibytes = appendMarkTombstone(ibytes)
227
228 kv := mvccpb.KeyValue{Key: key}
229
230 d, err := kv.Marshal()
231 if err != nil {
232 plog.Fatalf("cannot marshal event: %v", err)
233 }
234
235 tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
236 err = tw.s.kvindex.Tombstone(key, idxRev)
237 if err != nil {
238 plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
239 }
240 tw.changes = append(tw.changes, kv)
241
242 item := lease.LeaseItem{Key: string(key)}
243 leaseID := tw.s.le.GetLease(item)
244
245 if leaseID != lease.NoLease {
246 err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
247 if err != nil {
248 plog.Errorf("cannot detach %v", err)
249 }
250 }
251}
252
253func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }