blob: 5a5e2312b7268a38a590370b492092f2df79262d [file] [log] [blame]
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15package leasing
16
17import (
18 "context"
19 "strings"
20 "sync"
21 "time"
22
23 v3 "github.com/coreos/etcd/clientv3"
24 "github.com/coreos/etcd/clientv3/concurrency"
25 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
26 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
27 "github.com/coreos/etcd/mvcc/mvccpb"
28
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
31)
32
33type leasingKV struct {
34 cl *v3.Client
35 kv v3.KV
36 pfx string
37 leases leaseCache
38
39 ctx context.Context
40 cancel context.CancelFunc
41 wg sync.WaitGroup
42
43 sessionOpts []concurrency.SessionOption
44 session *concurrency.Session
45 sessionc chan struct{}
46}
47
// closedCh is a channel that is closed at package initialization, so a
// receive from it never blocks.
var closedCh = func() chan struct{} {
	ch := make(chan struct{})
	close(ch)
	return ch
}()
54
55// NewKV wraps a KV instance so that all requests are wired through a leasing protocol.
56func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV, func(), error) {
57 cctx, cancel := context.WithCancel(cl.Ctx())
58 lkv := &leasingKV{
59 cl: cl,
60 kv: cl.KV,
61 pfx: pfx,
62 leases: leaseCache{revokes: make(map[string]time.Time)},
63 ctx: cctx,
64 cancel: cancel,
65 sessionOpts: opts,
66 sessionc: make(chan struct{}),
67 }
68 lkv.wg.Add(2)
69 go func() {
70 defer lkv.wg.Done()
71 lkv.monitorSession()
72 }()
73 go func() {
74 defer lkv.wg.Done()
75 lkv.leases.clearOldRevokes(cctx)
76 }()
77 return lkv, lkv.Close, lkv.waitSession(cctx)
78}
79
80func (lkv *leasingKV) Close() {
81 lkv.cancel()
82 lkv.wg.Wait()
83}
84
85func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetResponse, error) {
86 return lkv.get(ctx, v3.OpGet(key, opts...))
87}
88
89func (lkv *leasingKV) Put(ctx context.Context, key, val string, opts ...v3.OpOption) (*v3.PutResponse, error) {
90 return lkv.put(ctx, v3.OpPut(key, val, opts...))
91}
92
93func (lkv *leasingKV) Delete(ctx context.Context, key string, opts ...v3.OpOption) (*v3.DeleteResponse, error) {
94 return lkv.delete(ctx, v3.OpDelete(key, opts...))
95}
96
97func (lkv *leasingKV) Do(ctx context.Context, op v3.Op) (v3.OpResponse, error) {
98 switch {
99 case op.IsGet():
100 resp, err := lkv.get(ctx, op)
101 return resp.OpResponse(), err
102 case op.IsPut():
103 resp, err := lkv.put(ctx, op)
104 return resp.OpResponse(), err
105 case op.IsDelete():
106 resp, err := lkv.delete(ctx, op)
107 return resp.OpResponse(), err
108 case op.IsTxn():
109 cmps, thenOps, elseOps := op.Txn()
110 resp, err := lkv.Txn(ctx).If(cmps...).Then(thenOps...).Else(elseOps...).Commit()
111 return resp.OpResponse(), err
112 }
113 return v3.OpResponse{}, nil
114}
115
116func (lkv *leasingKV) Compact(ctx context.Context, rev int64, opts ...v3.CompactOption) (*v3.CompactResponse, error) {
117 return lkv.kv.Compact(ctx, rev, opts...)
118}
119
120func (lkv *leasingKV) Txn(ctx context.Context) v3.Txn {
121 return &txnLeasing{Txn: lkv.kv.Txn(ctx), lkv: lkv, ctx: ctx}
122}
123
124func (lkv *leasingKV) monitorSession() {
125 for lkv.ctx.Err() == nil {
126 if lkv.session != nil {
127 select {
128 case <-lkv.session.Done():
129 case <-lkv.ctx.Done():
130 return
131 }
132 }
133 lkv.leases.mu.Lock()
134 select {
135 case <-lkv.sessionc:
136 lkv.sessionc = make(chan struct{})
137 default:
138 }
139 lkv.leases.entries = make(map[string]*leaseKey)
140 lkv.leases.mu.Unlock()
141
142 s, err := concurrency.NewSession(lkv.cl, lkv.sessionOpts...)
143 if err != nil {
144 continue
145 }
146
147 lkv.leases.mu.Lock()
148 lkv.session = s
149 close(lkv.sessionc)
150 lkv.leases.mu.Unlock()
151 }
152}
153
154func (lkv *leasingKV) monitorLease(ctx context.Context, key string, rev int64) {
155 cctx, cancel := context.WithCancel(lkv.ctx)
156 defer cancel()
157 for cctx.Err() == nil {
158 if rev == 0 {
159 resp, err := lkv.kv.Get(ctx, lkv.pfx+key)
160 if err != nil {
161 continue
162 }
163 rev = resp.Header.Revision
164 if len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) == "REVOKE" {
165 lkv.rescind(cctx, key, rev)
166 return
167 }
168 }
169 wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1))
170 for resp := range wch {
171 for _, ev := range resp.Events {
172 if string(ev.Kv.Value) != "REVOKE" {
173 continue
174 }
175 if v3.LeaseID(ev.Kv.Lease) == lkv.leaseID() {
176 lkv.rescind(cctx, key, ev.Kv.ModRevision)
177 }
178 return
179 }
180 }
181 rev = 0
182 }
183}
184
185// rescind releases a lease from this client.
186func (lkv *leasingKV) rescind(ctx context.Context, key string, rev int64) {
187 if lkv.leases.Evict(key) > rev {
188 return
189 }
190 cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev)
191 op := v3.OpDelete(lkv.pfx + key)
192 for ctx.Err() == nil {
193 if _, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit(); err == nil {
194 return
195 }
196 }
197}
198
199func (lkv *leasingKV) waitRescind(ctx context.Context, key string, rev int64) error {
200 cctx, cancel := context.WithCancel(ctx)
201 defer cancel()
202 wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(rev+1))
203 for resp := range wch {
204 for _, ev := range resp.Events {
205 if ev.Type == v3.EventTypeDelete {
206 return ctx.Err()
207 }
208 }
209 }
210 return ctx.Err()
211}
212
213func (lkv *leasingKV) tryModifyOp(ctx context.Context, op v3.Op) (*v3.TxnResponse, chan<- struct{}, error) {
214 key := string(op.KeyBytes())
215 wc, rev := lkv.leases.Lock(key)
216 cmp := v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1)
217 resp, err := lkv.kv.Txn(ctx).If(cmp).Then(op).Commit()
218 switch {
219 case err != nil:
220 lkv.leases.Evict(key)
221 fallthrough
222 case !resp.Succeeded:
223 if wc != nil {
224 close(wc)
225 }
226 return nil, nil, err
227 }
228 return resp, wc, nil
229}
230
231func (lkv *leasingKV) put(ctx context.Context, op v3.Op) (pr *v3.PutResponse, err error) {
232 if err := lkv.waitSession(ctx); err != nil {
233 return nil, err
234 }
235 for ctx.Err() == nil {
236 resp, wc, err := lkv.tryModifyOp(ctx, op)
237 if err != nil || wc == nil {
238 resp, err = lkv.revoke(ctx, string(op.KeyBytes()), op)
239 }
240 if err != nil {
241 return nil, err
242 }
243 if resp.Succeeded {
244 lkv.leases.mu.Lock()
245 lkv.leases.Update(op.KeyBytes(), op.ValueBytes(), resp.Header)
246 lkv.leases.mu.Unlock()
247 pr = (*v3.PutResponse)(resp.Responses[0].GetResponsePut())
248 pr.Header = resp.Header
249 }
250 if wc != nil {
251 close(wc)
252 }
253 if resp.Succeeded {
254 return pr, nil
255 }
256 }
257 return nil, ctx.Err()
258}
259
260func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
261 for ctx.Err() == nil {
262 if err := lkv.waitSession(ctx); err != nil {
263 return nil, err
264 }
265 lcmp := v3.Cmp{Key: []byte(key), Target: pb.Compare_LEASE}
266 resp, err := lkv.kv.Txn(ctx).If(
267 v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0),
268 v3.Compare(lcmp, "=", 0)).
269 Then(
270 op,
271 v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
272 Else(
273 op,
274 v3.OpGet(lkv.pfx+key),
275 ).Commit()
276 if err == nil {
277 if !resp.Succeeded {
278 kvs := resp.Responses[1].GetResponseRange().Kvs
279 // if txn failed since already owner, lease is acquired
280 resp.Succeeded = len(kvs) > 0 && v3.LeaseID(kvs[0].Lease) == lkv.leaseID()
281 }
282 return resp, nil
283 }
284 // retry if transient error
285 if _, ok := err.(rpctypes.EtcdError); ok {
286 return nil, err
287 }
288 if ev, _ := status.FromError(err); ev.Code() != codes.Unavailable {
289 return nil, err
290 }
291 }
292 return nil, ctx.Err()
293}
294
295func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error) {
296 do := func() (*v3.GetResponse, error) {
297 r, err := lkv.kv.Do(ctx, op)
298 return r.Get(), err
299 }
300 if !lkv.readySession() {
301 return do()
302 }
303
304 if resp, ok := lkv.leases.Get(ctx, op); resp != nil {
305 return resp, nil
306 } else if !ok || op.IsSerializable() {
307 // must be handled by server or can skip linearization
308 return do()
309 }
310
311 key := string(op.KeyBytes())
312 if !lkv.leases.MayAcquire(key) {
313 resp, err := lkv.kv.Do(ctx, op)
314 return resp.Get(), err
315 }
316
317 resp, err := lkv.acquire(ctx, key, v3.OpGet(key))
318 if err != nil {
319 return nil, err
320 }
321 getResp := (*v3.GetResponse)(resp.Responses[0].GetResponseRange())
322 getResp.Header = resp.Header
323 if resp.Succeeded {
324 getResp = lkv.leases.Add(key, getResp, op)
325 lkv.wg.Add(1)
326 go func() {
327 defer lkv.wg.Done()
328 lkv.monitorLease(ctx, key, resp.Header.Revision)
329 }()
330 }
331 return getResp, nil
332}
333
334func (lkv *leasingKV) deleteRangeRPC(ctx context.Context, maxLeaseRev int64, key, end string) (*v3.DeleteResponse, error) {
335 lkey, lend := lkv.pfx+key, lkv.pfx+end
336 resp, err := lkv.kv.Txn(ctx).If(
337 v3.Compare(v3.CreateRevision(lkey).WithRange(lend), "<", maxLeaseRev+1),
338 ).Then(
339 v3.OpGet(key, v3.WithRange(end), v3.WithKeysOnly()),
340 v3.OpDelete(key, v3.WithRange(end)),
341 ).Commit()
342 if err != nil {
343 lkv.leases.EvictRange(key, end)
344 return nil, err
345 }
346 if !resp.Succeeded {
347 return nil, nil
348 }
349 for _, kv := range resp.Responses[0].GetResponseRange().Kvs {
350 lkv.leases.Delete(string(kv.Key), resp.Header)
351 }
352 delResp := (*v3.DeleteResponse)(resp.Responses[1].GetResponseDeleteRange())
353 delResp.Header = resp.Header
354 return delResp, nil
355}
356
357func (lkv *leasingKV) deleteRange(ctx context.Context, op v3.Op) (*v3.DeleteResponse, error) {
358 key, end := string(op.KeyBytes()), string(op.RangeBytes())
359 for ctx.Err() == nil {
360 maxLeaseRev, err := lkv.revokeRange(ctx, key, end)
361 if err != nil {
362 return nil, err
363 }
364 wcs := lkv.leases.LockRange(key, end)
365 delResp, err := lkv.deleteRangeRPC(ctx, maxLeaseRev, key, end)
366 closeAll(wcs)
367 if err != nil || delResp != nil {
368 return delResp, err
369 }
370 }
371 return nil, ctx.Err()
372}
373
374func (lkv *leasingKV) delete(ctx context.Context, op v3.Op) (dr *v3.DeleteResponse, err error) {
375 if err := lkv.waitSession(ctx); err != nil {
376 return nil, err
377 }
378 if len(op.RangeBytes()) > 0 {
379 return lkv.deleteRange(ctx, op)
380 }
381 key := string(op.KeyBytes())
382 for ctx.Err() == nil {
383 resp, wc, err := lkv.tryModifyOp(ctx, op)
384 if err != nil || wc == nil {
385 resp, err = lkv.revoke(ctx, key, op)
386 }
387 if err != nil {
388 // don't know if delete was processed
389 lkv.leases.Evict(key)
390 return nil, err
391 }
392 if resp.Succeeded {
393 dr = (*v3.DeleteResponse)(resp.Responses[0].GetResponseDeleteRange())
394 dr.Header = resp.Header
395 lkv.leases.Delete(key, dr.Header)
396 }
397 if wc != nil {
398 close(wc)
399 }
400 if resp.Succeeded {
401 return dr, nil
402 }
403 }
404 return nil, ctx.Err()
405}
406
407func (lkv *leasingKV) revoke(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) {
408 rev := lkv.leases.Rev(key)
409 txn := lkv.kv.Txn(ctx).If(v3.Compare(v3.CreateRevision(lkv.pfx+key), "<", rev+1)).Then(op)
410 resp, err := txn.Else(v3.OpPut(lkv.pfx+key, "REVOKE", v3.WithIgnoreLease())).Commit()
411 if err != nil || resp.Succeeded {
412 return resp, err
413 }
414 return resp, lkv.waitRescind(ctx, key, resp.Header.Revision)
415}
416
417func (lkv *leasingKV) revokeRange(ctx context.Context, begin, end string) (int64, error) {
418 lkey, lend := lkv.pfx+begin, ""
419 if len(end) > 0 {
420 lend = lkv.pfx + end
421 }
422 leaseKeys, err := lkv.kv.Get(ctx, lkey, v3.WithRange(lend))
423 if err != nil {
424 return 0, err
425 }
426 return lkv.revokeLeaseKvs(ctx, leaseKeys.Kvs)
427}
428
429func (lkv *leasingKV) revokeLeaseKvs(ctx context.Context, kvs []*mvccpb.KeyValue) (int64, error) {
430 maxLeaseRev := int64(0)
431 for _, kv := range kvs {
432 if rev := kv.CreateRevision; rev > maxLeaseRev {
433 maxLeaseRev = rev
434 }
435 if v3.LeaseID(kv.Lease) == lkv.leaseID() {
436 // don't revoke own keys
437 continue
438 }
439 key := strings.TrimPrefix(string(kv.Key), lkv.pfx)
440 if _, err := lkv.revoke(ctx, key, v3.OpGet(key)); err != nil {
441 return 0, err
442 }
443 }
444 return maxLeaseRev, nil
445}
446
447func (lkv *leasingKV) waitSession(ctx context.Context) error {
448 lkv.leases.mu.RLock()
449 sessionc := lkv.sessionc
450 lkv.leases.mu.RUnlock()
451 select {
452 case <-sessionc:
453 return nil
454 case <-lkv.ctx.Done():
455 return lkv.ctx.Err()
456 case <-ctx.Done():
457 return ctx.Err()
458 }
459}
460
461func (lkv *leasingKV) readySession() bool {
462 lkv.leases.mu.RLock()
463 defer lkv.leases.mu.RUnlock()
464 if lkv.session == nil {
465 return false
466 }
467 select {
468 case <-lkv.session.Done():
469 default:
470 return true
471 }
472 return false
473}
474
475func (lkv *leasingKV) leaseID() v3.LeaseID {
476 lkv.leases.mu.RLock()
477 defer lkv.leases.mu.RUnlock()
478 return lkv.session.Lease()
479}