blob: da5b83a8abb9c8d9dc23e619d68e2f5cc99105ff [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package leasing
16
17import (
18 "context"
19 "strings"
20
21 v3 "github.com/coreos/etcd/clientv3"
22 v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
23)
24
25type txnLeasing struct {
26 v3.Txn
27 lkv *leasingKV
28 ctx context.Context
29 cs []v3.Cmp
30 opst []v3.Op
31 opse []v3.Op
32}
33
34func (txn *txnLeasing) If(cs ...v3.Cmp) v3.Txn {
35 txn.cs = append(txn.cs, cs...)
36 txn.Txn = txn.Txn.If(cs...)
37 return txn
38}
39
40func (txn *txnLeasing) Then(ops ...v3.Op) v3.Txn {
41 txn.opst = append(txn.opst, ops...)
42 txn.Txn = txn.Txn.Then(ops...)
43 return txn
44}
45
46func (txn *txnLeasing) Else(ops ...v3.Op) v3.Txn {
47 txn.opse = append(txn.opse, ops...)
48 txn.Txn = txn.Txn.Else(ops...)
49 return txn
50}
51
52func (txn *txnLeasing) Commit() (*v3.TxnResponse, error) {
53 if resp, err := txn.eval(); resp != nil || err != nil {
54 return resp, err
55 }
56 return txn.serverTxn()
57}
58
59func (txn *txnLeasing) eval() (*v3.TxnResponse, error) {
60 // TODO: wait on keys in comparisons
61 thenOps, elseOps := gatherOps(txn.opst), gatherOps(txn.opse)
62 ops := make([]v3.Op, 0, len(thenOps)+len(elseOps))
63 ops = append(ops, thenOps...)
64 ops = append(ops, elseOps...)
65
66 for _, ch := range txn.lkv.leases.NotifyOps(ops) {
67 select {
68 case <-ch:
69 case <-txn.ctx.Done():
70 return nil, txn.ctx.Err()
71 }
72 }
73
74 txn.lkv.leases.mu.RLock()
75 defer txn.lkv.leases.mu.RUnlock()
76 succeeded, ok := txn.lkv.leases.evalCmp(txn.cs)
77 if !ok || txn.lkv.leases.header == nil {
78 return nil, nil
79 }
80 if ops = txn.opst; !succeeded {
81 ops = txn.opse
82 }
83
84 resps, ok := txn.lkv.leases.evalOps(ops)
85 if !ok {
86 return nil, nil
87 }
88 return &v3.TxnResponse{copyHeader(txn.lkv.leases.header), succeeded, resps}, nil
89}
90
91// fallback computes the ops to fetch all possible conflicting
92// leasing keys for a list of ops.
93func (txn *txnLeasing) fallback(ops []v3.Op) (fbOps []v3.Op) {
94 for _, op := range ops {
95 if op.IsGet() {
96 continue
97 }
98 lkey, lend := txn.lkv.pfx+string(op.KeyBytes()), ""
99 if len(op.RangeBytes()) > 0 {
100 lend = txn.lkv.pfx + string(op.RangeBytes())
101 }
102 fbOps = append(fbOps, v3.OpGet(lkey, v3.WithRange(lend)))
103 }
104 return fbOps
105}
106
107func (txn *txnLeasing) guardKeys(ops []v3.Op) (cmps []v3.Cmp) {
108 seen := make(map[string]bool)
109 for _, op := range ops {
110 key := string(op.KeyBytes())
111 if op.IsGet() || len(op.RangeBytes()) != 0 || seen[key] {
112 continue
113 }
114 rev := txn.lkv.leases.Rev(key)
115 cmps = append(cmps, v3.Compare(v3.CreateRevision(txn.lkv.pfx+key), "<", rev+1))
116 seen[key] = true
117 }
118 return cmps
119}
120
121func (txn *txnLeasing) guardRanges(ops []v3.Op) (cmps []v3.Cmp, err error) {
122 for _, op := range ops {
123 if op.IsGet() || len(op.RangeBytes()) == 0 {
124 continue
125 }
126
127 key, end := string(op.KeyBytes()), string(op.RangeBytes())
128 maxRevLK, err := txn.lkv.revokeRange(txn.ctx, key, end)
129 if err != nil {
130 return nil, err
131 }
132
133 opts := append(v3.WithLastRev(), v3.WithRange(end))
134 getResp, err := txn.lkv.kv.Get(txn.ctx, key, opts...)
135 if err != nil {
136 return nil, err
137 }
138 maxModRev := int64(0)
139 if len(getResp.Kvs) > 0 {
140 maxModRev = getResp.Kvs[0].ModRevision
141 }
142
143 noKeyUpdate := v3.Compare(v3.ModRevision(key).WithRange(end), "<", maxModRev+1)
144 noLeaseUpdate := v3.Compare(
145 v3.CreateRevision(txn.lkv.pfx+key).WithRange(txn.lkv.pfx+end),
146 "<",
147 maxRevLK+1)
148 cmps = append(cmps, noKeyUpdate, noLeaseUpdate)
149 }
150 return cmps, nil
151}
152
153func (txn *txnLeasing) guard(ops []v3.Op) ([]v3.Cmp, error) {
154 cmps := txn.guardKeys(ops)
155 rangeCmps, err := txn.guardRanges(ops)
156 return append(cmps, rangeCmps...), err
157}
158
159func (txn *txnLeasing) commitToCache(txnResp *v3pb.TxnResponse, userTxn v3.Op) {
160 ops := gatherResponseOps(txnResp.Responses, []v3.Op{userTxn})
161 txn.lkv.leases.mu.Lock()
162 for _, op := range ops {
163 key := string(op.KeyBytes())
164 if op.IsDelete() && len(op.RangeBytes()) > 0 {
165 end := string(op.RangeBytes())
166 for k := range txn.lkv.leases.entries {
167 if inRange(k, key, end) {
168 txn.lkv.leases.delete(k, txnResp.Header)
169 }
170 }
171 } else if op.IsDelete() {
172 txn.lkv.leases.delete(key, txnResp.Header)
173 }
174 if op.IsPut() {
175 txn.lkv.leases.Update(op.KeyBytes(), op.ValueBytes(), txnResp.Header)
176 }
177 }
178 txn.lkv.leases.mu.Unlock()
179}
180
181func (txn *txnLeasing) revokeFallback(fbResps []*v3pb.ResponseOp) error {
182 for _, resp := range fbResps {
183 _, err := txn.lkv.revokeLeaseKvs(txn.ctx, resp.GetResponseRange().Kvs)
184 if err != nil {
185 return err
186 }
187 }
188 return nil
189}
190
191func (txn *txnLeasing) serverTxn() (*v3.TxnResponse, error) {
192 if err := txn.lkv.waitSession(txn.ctx); err != nil {
193 return nil, err
194 }
195
196 userOps := gatherOps(append(txn.opst, txn.opse...))
197 userTxn := v3.OpTxn(txn.cs, txn.opst, txn.opse)
198 fbOps := txn.fallback(userOps)
199
200 defer closeAll(txn.lkv.leases.LockWriteOps(userOps))
201 for {
202 cmps, err := txn.guard(userOps)
203 if err != nil {
204 return nil, err
205 }
206 resp, err := txn.lkv.kv.Txn(txn.ctx).If(cmps...).Then(userTxn).Else(fbOps...).Commit()
207 if err != nil {
208 for _, cmp := range cmps {
209 txn.lkv.leases.Evict(strings.TrimPrefix(string(cmp.Key), txn.lkv.pfx))
210 }
211 return nil, err
212 }
213 if resp.Succeeded {
214 txn.commitToCache((*v3pb.TxnResponse)(resp), userTxn)
215 userResp := resp.Responses[0].GetResponseTxn()
216 userResp.Header = resp.Header
217 return (*v3.TxnResponse)(userResp), nil
218 }
219 if err := txn.revokeFallback(resp.Responses); err != nil {
220 return nil, err
221 }
222 }
223}