blob: 9698254644db4d8407413e58e5e4df18cbeb5f3b [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18 "go.etcd.io/etcd/lease"
19 "go.etcd.io/etcd/mvcc/backend"
20 "go.etcd.io/etcd/mvcc/mvccpb"
21 "go.uber.org/zap"
22)
23
24type storeTxnRead struct {
25 s *store
26 tx backend.ReadTx
27
28 firstRev int64
29 rev int64
30}
31
32func (s *store) Read() TxnRead {
33 s.mu.RLock()
34 s.revMu.RLock()
35 // backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
36 // ConcurrentReadTx is created, it will not block write transaction.
37 tx := s.b.ConcurrentReadTx()
38 tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
39 firstRev, rev := s.compactMainRev, s.currentRev
40 s.revMu.RUnlock()
41 return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
42}
43
44func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
45func (tr *storeTxnRead) Rev() int64 { return tr.rev }
46
47func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
48 return tr.rangeKeys(key, end, tr.Rev(), ro)
49}
50
51func (tr *storeTxnRead) End() {
52 tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
53 tr.s.mu.RUnlock()
54}
55
56type storeTxnWrite struct {
57 storeTxnRead
58 tx backend.BatchTx
59 // beginRev is the revision where the txn begins; it will write to the next revision.
60 beginRev int64
61 changes []mvccpb.KeyValue
62}
63
64func (s *store) Write() TxnWrite {
65 s.mu.RLock()
66 tx := s.b.BatchTx()
67 tx.Lock()
68 tw := &storeTxnWrite{
69 storeTxnRead: storeTxnRead{s, tx, 0, 0},
70 tx: tx,
71 beginRev: s.currentRev,
72 changes: make([]mvccpb.KeyValue, 0, 4),
73 }
74 return newMetricsTxnWrite(tw)
75}
76
77func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
78
79func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
80 rev := tw.beginRev
81 if len(tw.changes) > 0 {
82 rev++
83 }
84 return tw.rangeKeys(key, end, rev, ro)
85}
86
87func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
88 if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
89 return n, tw.beginRev + 1
90 }
91 return 0, tw.beginRev
92}
93
94func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
95 tw.put(key, value, lease)
96 return tw.beginRev + 1
97}
98
99func (tw *storeTxnWrite) End() {
100 // only update index if the txn modifies the mvcc state.
101 if len(tw.changes) != 0 {
102 tw.s.saveIndex(tw.tx)
103 // hold revMu lock to prevent new read txns from opening until writeback.
104 tw.s.revMu.Lock()
105 tw.s.currentRev++
106 }
107 tw.tx.Unlock()
108 if len(tw.changes) != 0 {
109 tw.s.revMu.Unlock()
110 }
111 tw.s.mu.RUnlock()
112}
113
114func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
115 rev := ro.Rev
116 if rev > curRev {
117 return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
118 }
119 if rev <= 0 {
120 rev = curRev
121 }
122 if rev < tr.s.compactMainRev {
123 return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
124 }
125
126 revpairs := tr.s.kvindex.Revisions(key, end, rev)
127 if len(revpairs) == 0 {
128 return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
129 }
130 if ro.Count {
131 return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
132 }
133
134 limit := int(ro.Limit)
135 if limit <= 0 || limit > len(revpairs) {
136 limit = len(revpairs)
137 }
138
139 kvs := make([]mvccpb.KeyValue, limit)
140 revBytes := newRevBytes()
141 for i, revpair := range revpairs[:len(kvs)] {
142 revToBytes(revpair, revBytes)
143 _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
144 if len(vs) != 1 {
145 if tr.s.lg != nil {
146 tr.s.lg.Fatal(
147 "range failed to find revision pair",
148 zap.Int64("revision-main", revpair.main),
149 zap.Int64("revision-sub", revpair.sub),
150 )
151 } else {
152 plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
153 }
154 }
155 if err := kvs[i].Unmarshal(vs[0]); err != nil {
156 if tr.s.lg != nil {
157 tr.s.lg.Fatal(
158 "failed to unmarshal mvccpb.KeyValue",
159 zap.Error(err),
160 )
161 } else {
162 plog.Fatalf("cannot unmarshal event: %v", err)
163 }
164 }
165 }
166 return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
167}
168
169func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
170 rev := tw.beginRev + 1
171 c := rev
172 oldLease := lease.NoLease
173
174 // if the key exists before, use its previous created and
175 // get its previous leaseID
176 _, created, ver, err := tw.s.kvindex.Get(key, rev)
177 if err == nil {
178 c = created.main
179 oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
180 }
181
182 ibytes := newRevBytes()
183 idxRev := revision{main: rev, sub: int64(len(tw.changes))}
184 revToBytes(idxRev, ibytes)
185
186 ver = ver + 1
187 kv := mvccpb.KeyValue{
188 Key: key,
189 Value: value,
190 CreateRevision: c,
191 ModRevision: rev,
192 Version: ver,
193 Lease: int64(leaseID),
194 }
195
196 d, err := kv.Marshal()
197 if err != nil {
198 if tw.storeTxnRead.s.lg != nil {
199 tw.storeTxnRead.s.lg.Fatal(
200 "failed to marshal mvccpb.KeyValue",
201 zap.Error(err),
202 )
203 } else {
204 plog.Fatalf("cannot marshal event: %v", err)
205 }
206 }
207
208 tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
209 tw.s.kvindex.Put(key, idxRev)
210 tw.changes = append(tw.changes, kv)
211
212 if oldLease != lease.NoLease {
213 if tw.s.le == nil {
214 panic("no lessor to detach lease")
215 }
216 err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
217 if err != nil {
218 if tw.storeTxnRead.s.lg != nil {
219 tw.storeTxnRead.s.lg.Fatal(
220 "failed to detach old lease from a key",
221 zap.Error(err),
222 )
223 } else {
224 plog.Errorf("unexpected error from lease detach: %v", err)
225 }
226 }
227 }
228 if leaseID != lease.NoLease {
229 if tw.s.le == nil {
230 panic("no lessor to attach lease")
231 }
232 err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
233 if err != nil {
234 panic("unexpected error from lease Attach")
235 }
236 }
237}
238
239func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
240 rrev := tw.beginRev
241 if len(tw.changes) > 0 {
242 rrev++
243 }
244 keys, _ := tw.s.kvindex.Range(key, end, rrev)
245 if len(keys) == 0 {
246 return 0
247 }
248 for _, key := range keys {
249 tw.delete(key)
250 }
251 return int64(len(keys))
252}
253
254func (tw *storeTxnWrite) delete(key []byte) {
255 ibytes := newRevBytes()
256 idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
257 revToBytes(idxRev, ibytes)
258
259 if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
260 ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
261 } else {
262 // TODO: remove this in v3.5
263 ibytes = appendMarkTombstone(nil, ibytes)
264 }
265
266 kv := mvccpb.KeyValue{Key: key}
267
268 d, err := kv.Marshal()
269 if err != nil {
270 if tw.storeTxnRead.s.lg != nil {
271 tw.storeTxnRead.s.lg.Fatal(
272 "failed to marshal mvccpb.KeyValue",
273 zap.Error(err),
274 )
275 } else {
276 plog.Fatalf("cannot marshal event: %v", err)
277 }
278 }
279
280 tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
281 err = tw.s.kvindex.Tombstone(key, idxRev)
282 if err != nil {
283 if tw.storeTxnRead.s.lg != nil {
284 tw.storeTxnRead.s.lg.Fatal(
285 "failed to tombstone an existing key",
286 zap.String("key", string(key)),
287 zap.Error(err),
288 )
289 } else {
290 plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
291 }
292 }
293 tw.changes = append(tw.changes, kv)
294
295 item := lease.LeaseItem{Key: string(key)}
296 leaseID := tw.s.le.GetLease(item)
297
298 if leaseID != lease.NoLease {
299 err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
300 if err != nil {
301 if tw.storeTxnRead.s.lg != nil {
302 tw.storeTxnRead.s.lg.Fatal(
303 "failed to detach old lease from a key",
304 zap.Error(err),
305 )
306 } else {
307 plog.Errorf("cannot detach %v", err)
308 }
309 }
310 }
311}
312
313func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }