// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package leasing
16
17import (
18 "context"
19 "strings"
20 "sync"
21 "time"
22
23 v3 "github.com/coreos/etcd/clientv3"
24 v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
25 "github.com/coreos/etcd/mvcc/mvccpb"
26)
27
28const revokeBackoff = 2 * time.Second
29
30type leaseCache struct {
31 mu sync.RWMutex
32 entries map[string]*leaseKey
33 revokes map[string]time.Time
34 header *v3pb.ResponseHeader
35}
36
37type leaseKey struct {
38 response *v3.GetResponse
39 // rev is the leasing key revision.
40 rev int64
41 waitc chan struct{}
42}
43
44func (lc *leaseCache) Rev(key string) int64 {
45 lc.mu.RLock()
46 defer lc.mu.RUnlock()
47 if li := lc.entries[key]; li != nil {
48 return li.rev
49 }
50 return 0
51}
52
53func (lc *leaseCache) Lock(key string) (chan<- struct{}, int64) {
54 lc.mu.Lock()
55 defer lc.mu.Unlock()
56 if li := lc.entries[key]; li != nil {
57 li.waitc = make(chan struct{})
58 return li.waitc, li.rev
59 }
60 return nil, 0
61}
62
63func (lc *leaseCache) LockRange(begin, end string) (ret []chan<- struct{}) {
64 lc.mu.Lock()
65 defer lc.mu.Unlock()
66 for k, li := range lc.entries {
67 if inRange(k, begin, end) {
68 li.waitc = make(chan struct{})
69 ret = append(ret, li.waitc)
70 }
71 }
72 return ret
73}
74
75func inRange(k, begin, end string) bool {
76 if strings.Compare(k, begin) < 0 {
77 return false
78 }
79 if end != "\x00" && strings.Compare(k, end) >= 0 {
80 return false
81 }
82 return true
83}
84
85func (lc *leaseCache) LockWriteOps(ops []v3.Op) (ret []chan<- struct{}) {
86 for _, op := range ops {
87 if op.IsGet() {
88 continue
89 }
90 key := string(op.KeyBytes())
91 if end := string(op.RangeBytes()); end == "" {
92 if wc, _ := lc.Lock(key); wc != nil {
93 ret = append(ret, wc)
94 }
95 } else {
96 for k := range lc.entries {
97 if !inRange(k, key, end) {
98 continue
99 }
100 if wc, _ := lc.Lock(k); wc != nil {
101 ret = append(ret, wc)
102 }
103 }
104 }
105 }
106 return ret
107}
108
109func (lc *leaseCache) NotifyOps(ops []v3.Op) (wcs []<-chan struct{}) {
110 for _, op := range ops {
111 if op.IsGet() {
112 if _, wc := lc.notify(string(op.KeyBytes())); wc != nil {
113 wcs = append(wcs, wc)
114 }
115 }
116 }
117 return wcs
118}
119
120func (lc *leaseCache) MayAcquire(key string) bool {
121 lc.mu.RLock()
122 lr, ok := lc.revokes[key]
123 lc.mu.RUnlock()
124 return !ok || time.Since(lr) > revokeBackoff
125}
126
127func (lc *leaseCache) Add(key string, resp *v3.GetResponse, op v3.Op) *v3.GetResponse {
128 lk := &leaseKey{resp, resp.Header.Revision, closedCh}
129 lc.mu.Lock()
130 if lc.header == nil || lc.header.Revision < resp.Header.Revision {
131 lc.header = resp.Header
132 }
133 lc.entries[key] = lk
134 ret := lk.get(op)
135 lc.mu.Unlock()
136 return ret
137}
138
139func (lc *leaseCache) Update(key, val []byte, respHeader *v3pb.ResponseHeader) {
140 li := lc.entries[string(key)]
141 if li == nil {
142 return
143 }
144 cacheResp := li.response
145 if len(cacheResp.Kvs) == 0 {
146 kv := &mvccpb.KeyValue{
147 Key: key,
148 CreateRevision: respHeader.Revision,
149 }
150 cacheResp.Kvs = append(cacheResp.Kvs, kv)
151 cacheResp.Count = 1
152 }
153 cacheResp.Kvs[0].Version++
154 if cacheResp.Kvs[0].ModRevision < respHeader.Revision {
155 cacheResp.Header = respHeader
156 cacheResp.Kvs[0].ModRevision = respHeader.Revision
157 cacheResp.Kvs[0].Value = val
158 }
159}
160
161func (lc *leaseCache) Delete(key string, hdr *v3pb.ResponseHeader) {
162 lc.mu.Lock()
163 defer lc.mu.Unlock()
164 lc.delete(key, hdr)
165}
166
167func (lc *leaseCache) delete(key string, hdr *v3pb.ResponseHeader) {
168 if li := lc.entries[key]; li != nil && hdr.Revision >= li.response.Header.Revision {
169 li.response.Kvs = nil
170 li.response.Header = copyHeader(hdr)
171 }
172}
173
174func (lc *leaseCache) Evict(key string) (rev int64) {
175 lc.mu.Lock()
176 defer lc.mu.Unlock()
177 if li := lc.entries[key]; li != nil {
178 rev = li.rev
179 delete(lc.entries, key)
180 lc.revokes[key] = time.Now()
181 }
182 return rev
183}
184
185func (lc *leaseCache) EvictRange(key, end string) {
186 lc.mu.Lock()
187 defer lc.mu.Unlock()
188 for k := range lc.entries {
189 if inRange(k, key, end) {
190 delete(lc.entries, key)
191 lc.revokes[key] = time.Now()
192 }
193 }
194}
195
196func isBadOp(op v3.Op) bool { return op.Rev() > 0 || len(op.RangeBytes()) > 0 }
197
198func (lc *leaseCache) Get(ctx context.Context, op v3.Op) (*v3.GetResponse, bool) {
199 if isBadOp(op) {
200 return nil, false
201 }
202 key := string(op.KeyBytes())
203 li, wc := lc.notify(key)
204 if li == nil {
205 return nil, true
206 }
207 select {
208 case <-wc:
209 case <-ctx.Done():
210 return nil, true
211 }
212 lc.mu.RLock()
213 lk := *li
214 ret := lk.get(op)
215 lc.mu.RUnlock()
216 return ret, true
217}
218
219func (lk *leaseKey) get(op v3.Op) *v3.GetResponse {
220 ret := *lk.response
221 ret.Header = copyHeader(ret.Header)
222 empty := len(ret.Kvs) == 0 || op.IsCountOnly()
223 empty = empty || (op.MinModRev() > ret.Kvs[0].ModRevision)
224 empty = empty || (op.MaxModRev() != 0 && op.MaxModRev() < ret.Kvs[0].ModRevision)
225 empty = empty || (op.MinCreateRev() > ret.Kvs[0].CreateRevision)
226 empty = empty || (op.MaxCreateRev() != 0 && op.MaxCreateRev() < ret.Kvs[0].CreateRevision)
227 if empty {
228 ret.Kvs = nil
229 } else {
230 kv := *ret.Kvs[0]
231 kv.Key = make([]byte, len(kv.Key))
232 copy(kv.Key, ret.Kvs[0].Key)
233 if !op.IsKeysOnly() {
234 kv.Value = make([]byte, len(kv.Value))
235 copy(kv.Value, ret.Kvs[0].Value)
236 }
237 ret.Kvs = []*mvccpb.KeyValue{&kv}
238 }
239 return &ret
240}
241
242func (lc *leaseCache) notify(key string) (*leaseKey, <-chan struct{}) {
243 lc.mu.RLock()
244 defer lc.mu.RUnlock()
245 if li := lc.entries[key]; li != nil {
246 return li, li.waitc
247 }
248 return nil, nil
249}
250
251func (lc *leaseCache) clearOldRevokes(ctx context.Context) {
252 for {
253 select {
254 case <-ctx.Done():
255 return
256 case <-time.After(time.Second):
257 lc.mu.Lock()
258 for k, lr := range lc.revokes {
259 if time.Now().Sub(lr.Add(revokeBackoff)) > 0 {
260 delete(lc.revokes, k)
261 }
262 }
263 lc.mu.Unlock()
264 }
265 }
266}
267
268func (lc *leaseCache) evalCmp(cmps []v3.Cmp) (cmpVal bool, ok bool) {
269 for _, cmp := range cmps {
270 if len(cmp.RangeEnd) > 0 {
271 return false, false
272 }
273 lk := lc.entries[string(cmp.Key)]
274 if lk == nil {
275 return false, false
276 }
277 if !evalCmp(lk.response, cmp) {
278 return false, true
279 }
280 }
281 return true, true
282}
283
284func (lc *leaseCache) evalOps(ops []v3.Op) ([]*v3pb.ResponseOp, bool) {
285 resps := make([]*v3pb.ResponseOp, len(ops))
286 for i, op := range ops {
287 if !op.IsGet() || isBadOp(op) {
288 // TODO: support read-only Txn
289 return nil, false
290 }
291 lk := lc.entries[string(op.KeyBytes())]
292 if lk == nil {
293 return nil, false
294 }
295 resp := lk.get(op)
296 if resp == nil {
297 return nil, false
298 }
299 resps[i] = &v3pb.ResponseOp{
300 Response: &v3pb.ResponseOp_ResponseRange{
301 (*v3pb.RangeResponse)(resp),
302 },
303 }
304 }
305 return resps, true
306}