khenaidoo | ffe076b | 2019-01-15 16:08:08 -0500 | [diff] [blame^] | 1 | // Copyright 2017 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package leasing |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "strings" |
| 20 | "sync" |
| 21 | "time" |
| 22 | |
| 23 | v3 "github.com/coreos/etcd/clientv3" |
| 24 | v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| 25 | "github.com/coreos/etcd/mvcc/mvccpb" |
| 26 | ) |
| 27 | |
// revokeBackoff is how long after a key has been evicted from the cache
// MayAcquire keeps reporting that the key must not be re-acquired.
const revokeBackoff = 2 * time.Second
| 29 | |
| 30 | type leaseCache struct { |
| 31 | mu sync.RWMutex |
| 32 | entries map[string]*leaseKey |
| 33 | revokes map[string]time.Time |
| 34 | header *v3pb.ResponseHeader |
| 35 | } |
| 36 | |
| 37 | type leaseKey struct { |
| 38 | response *v3.GetResponse |
| 39 | // rev is the leasing key revision. |
| 40 | rev int64 |
| 41 | waitc chan struct{} |
| 42 | } |
| 43 | |
| 44 | func (lc *leaseCache) Rev(key string) int64 { |
| 45 | lc.mu.RLock() |
| 46 | defer lc.mu.RUnlock() |
| 47 | if li := lc.entries[key]; li != nil { |
| 48 | return li.rev |
| 49 | } |
| 50 | return 0 |
| 51 | } |
| 52 | |
| 53 | func (lc *leaseCache) Lock(key string) (chan<- struct{}, int64) { |
| 54 | lc.mu.Lock() |
| 55 | defer lc.mu.Unlock() |
| 56 | if li := lc.entries[key]; li != nil { |
| 57 | li.waitc = make(chan struct{}) |
| 58 | return li.waitc, li.rev |
| 59 | } |
| 60 | return nil, 0 |
| 61 | } |
| 62 | |
| 63 | func (lc *leaseCache) LockRange(begin, end string) (ret []chan<- struct{}) { |
| 64 | lc.mu.Lock() |
| 65 | defer lc.mu.Unlock() |
| 66 | for k, li := range lc.entries { |
| 67 | if inRange(k, begin, end) { |
| 68 | li.waitc = make(chan struct{}) |
| 69 | ret = append(ret, li.waitc) |
| 70 | } |
| 71 | } |
| 72 | return ret |
| 73 | } |
| 74 | |
// inRange reports whether k lies in the half-open interval [begin, end).
// An end of "\x00" means there is no upper bound, so every key at or after
// begin matches.
func inRange(k, begin, end string) bool {
	switch {
	case strings.Compare(k, begin) < 0:
		// k sorts before the start of the range.
		return false
	case end == "\x00":
		// Open-ended range: only the lower bound applies.
		return true
	default:
		return strings.Compare(k, end) < 0
	}
}
| 84 | |
| 85 | func (lc *leaseCache) LockWriteOps(ops []v3.Op) (ret []chan<- struct{}) { |
| 86 | for _, op := range ops { |
| 87 | if op.IsGet() { |
| 88 | continue |
| 89 | } |
| 90 | key := string(op.KeyBytes()) |
| 91 | if end := string(op.RangeBytes()); end == "" { |
| 92 | if wc, _ := lc.Lock(key); wc != nil { |
| 93 | ret = append(ret, wc) |
| 94 | } |
| 95 | } else { |
| 96 | for k := range lc.entries { |
| 97 | if !inRange(k, key, end) { |
| 98 | continue |
| 99 | } |
| 100 | if wc, _ := lc.Lock(k); wc != nil { |
| 101 | ret = append(ret, wc) |
| 102 | } |
| 103 | } |
| 104 | } |
| 105 | } |
| 106 | return ret |
| 107 | } |
| 108 | |
| 109 | func (lc *leaseCache) NotifyOps(ops []v3.Op) (wcs []<-chan struct{}) { |
| 110 | for _, op := range ops { |
| 111 | if op.IsGet() { |
| 112 | if _, wc := lc.notify(string(op.KeyBytes())); wc != nil { |
| 113 | wcs = append(wcs, wc) |
| 114 | } |
| 115 | } |
| 116 | } |
| 117 | return wcs |
| 118 | } |
| 119 | |
| 120 | func (lc *leaseCache) MayAcquire(key string) bool { |
| 121 | lc.mu.RLock() |
| 122 | lr, ok := lc.revokes[key] |
| 123 | lc.mu.RUnlock() |
| 124 | return !ok || time.Since(lr) > revokeBackoff |
| 125 | } |
| 126 | |
| 127 | func (lc *leaseCache) Add(key string, resp *v3.GetResponse, op v3.Op) *v3.GetResponse { |
| 128 | lk := &leaseKey{resp, resp.Header.Revision, closedCh} |
| 129 | lc.mu.Lock() |
| 130 | if lc.header == nil || lc.header.Revision < resp.Header.Revision { |
| 131 | lc.header = resp.Header |
| 132 | } |
| 133 | lc.entries[key] = lk |
| 134 | ret := lk.get(op) |
| 135 | lc.mu.Unlock() |
| 136 | return ret |
| 137 | } |
| 138 | |
| 139 | func (lc *leaseCache) Update(key, val []byte, respHeader *v3pb.ResponseHeader) { |
| 140 | li := lc.entries[string(key)] |
| 141 | if li == nil { |
| 142 | return |
| 143 | } |
| 144 | cacheResp := li.response |
| 145 | if len(cacheResp.Kvs) == 0 { |
| 146 | kv := &mvccpb.KeyValue{ |
| 147 | Key: key, |
| 148 | CreateRevision: respHeader.Revision, |
| 149 | } |
| 150 | cacheResp.Kvs = append(cacheResp.Kvs, kv) |
| 151 | cacheResp.Count = 1 |
| 152 | } |
| 153 | cacheResp.Kvs[0].Version++ |
| 154 | if cacheResp.Kvs[0].ModRevision < respHeader.Revision { |
| 155 | cacheResp.Header = respHeader |
| 156 | cacheResp.Kvs[0].ModRevision = respHeader.Revision |
| 157 | cacheResp.Kvs[0].Value = val |
| 158 | } |
| 159 | } |
| 160 | |
| 161 | func (lc *leaseCache) Delete(key string, hdr *v3pb.ResponseHeader) { |
| 162 | lc.mu.Lock() |
| 163 | defer lc.mu.Unlock() |
| 164 | lc.delete(key, hdr) |
| 165 | } |
| 166 | |
| 167 | func (lc *leaseCache) delete(key string, hdr *v3pb.ResponseHeader) { |
| 168 | if li := lc.entries[key]; li != nil && hdr.Revision >= li.response.Header.Revision { |
| 169 | li.response.Kvs = nil |
| 170 | li.response.Header = copyHeader(hdr) |
| 171 | } |
| 172 | } |
| 173 | |
| 174 | func (lc *leaseCache) Evict(key string) (rev int64) { |
| 175 | lc.mu.Lock() |
| 176 | defer lc.mu.Unlock() |
| 177 | if li := lc.entries[key]; li != nil { |
| 178 | rev = li.rev |
| 179 | delete(lc.entries, key) |
| 180 | lc.revokes[key] = time.Now() |
| 181 | } |
| 182 | return rev |
| 183 | } |
| 184 | |
| 185 | func (lc *leaseCache) EvictRange(key, end string) { |
| 186 | lc.mu.Lock() |
| 187 | defer lc.mu.Unlock() |
| 188 | for k := range lc.entries { |
| 189 | if inRange(k, key, end) { |
| 190 | delete(lc.entries, key) |
| 191 | lc.revokes[key] = time.Now() |
| 192 | } |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | func isBadOp(op v3.Op) bool { return op.Rev() > 0 || len(op.RangeBytes()) > 0 } |
| 197 | |
| 198 | func (lc *leaseCache) Get(ctx context.Context, op v3.Op) (*v3.GetResponse, bool) { |
| 199 | if isBadOp(op) { |
| 200 | return nil, false |
| 201 | } |
| 202 | key := string(op.KeyBytes()) |
| 203 | li, wc := lc.notify(key) |
| 204 | if li == nil { |
| 205 | return nil, true |
| 206 | } |
| 207 | select { |
| 208 | case <-wc: |
| 209 | case <-ctx.Done(): |
| 210 | return nil, true |
| 211 | } |
| 212 | lc.mu.RLock() |
| 213 | lk := *li |
| 214 | ret := lk.get(op) |
| 215 | lc.mu.RUnlock() |
| 216 | return ret, true |
| 217 | } |
| 218 | |
| 219 | func (lk *leaseKey) get(op v3.Op) *v3.GetResponse { |
| 220 | ret := *lk.response |
| 221 | ret.Header = copyHeader(ret.Header) |
| 222 | empty := len(ret.Kvs) == 0 || op.IsCountOnly() |
| 223 | empty = empty || (op.MinModRev() > ret.Kvs[0].ModRevision) |
| 224 | empty = empty || (op.MaxModRev() != 0 && op.MaxModRev() < ret.Kvs[0].ModRevision) |
| 225 | empty = empty || (op.MinCreateRev() > ret.Kvs[0].CreateRevision) |
| 226 | empty = empty || (op.MaxCreateRev() != 0 && op.MaxCreateRev() < ret.Kvs[0].CreateRevision) |
| 227 | if empty { |
| 228 | ret.Kvs = nil |
| 229 | } else { |
| 230 | kv := *ret.Kvs[0] |
| 231 | kv.Key = make([]byte, len(kv.Key)) |
| 232 | copy(kv.Key, ret.Kvs[0].Key) |
| 233 | if !op.IsKeysOnly() { |
| 234 | kv.Value = make([]byte, len(kv.Value)) |
| 235 | copy(kv.Value, ret.Kvs[0].Value) |
| 236 | } |
| 237 | ret.Kvs = []*mvccpb.KeyValue{&kv} |
| 238 | } |
| 239 | return &ret |
| 240 | } |
| 241 | |
| 242 | func (lc *leaseCache) notify(key string) (*leaseKey, <-chan struct{}) { |
| 243 | lc.mu.RLock() |
| 244 | defer lc.mu.RUnlock() |
| 245 | if li := lc.entries[key]; li != nil { |
| 246 | return li, li.waitc |
| 247 | } |
| 248 | return nil, nil |
| 249 | } |
| 250 | |
| 251 | func (lc *leaseCache) clearOldRevokes(ctx context.Context) { |
| 252 | for { |
| 253 | select { |
| 254 | case <-ctx.Done(): |
| 255 | return |
| 256 | case <-time.After(time.Second): |
| 257 | lc.mu.Lock() |
| 258 | for k, lr := range lc.revokes { |
| 259 | if time.Now().Sub(lr.Add(revokeBackoff)) > 0 { |
| 260 | delete(lc.revokes, k) |
| 261 | } |
| 262 | } |
| 263 | lc.mu.Unlock() |
| 264 | } |
| 265 | } |
| 266 | } |
| 267 | |
| 268 | func (lc *leaseCache) evalCmp(cmps []v3.Cmp) (cmpVal bool, ok bool) { |
| 269 | for _, cmp := range cmps { |
| 270 | if len(cmp.RangeEnd) > 0 { |
| 271 | return false, false |
| 272 | } |
| 273 | lk := lc.entries[string(cmp.Key)] |
| 274 | if lk == nil { |
| 275 | return false, false |
| 276 | } |
| 277 | if !evalCmp(lk.response, cmp) { |
| 278 | return false, true |
| 279 | } |
| 280 | } |
| 281 | return true, true |
| 282 | } |
| 283 | |
| 284 | func (lc *leaseCache) evalOps(ops []v3.Op) ([]*v3pb.ResponseOp, bool) { |
| 285 | resps := make([]*v3pb.ResponseOp, len(ops)) |
| 286 | for i, op := range ops { |
| 287 | if !op.IsGet() || isBadOp(op) { |
| 288 | // TODO: support read-only Txn |
| 289 | return nil, false |
| 290 | } |
| 291 | lk := lc.entries[string(op.KeyBytes())] |
| 292 | if lk == nil { |
| 293 | return nil, false |
| 294 | } |
| 295 | resp := lk.get(op) |
| 296 | if resp == nil { |
| 297 | return nil, false |
| 298 | } |
| 299 | resps[i] = &v3pb.ResponseOp{ |
| 300 | Response: &v3pb.ResponseOp_ResponseRange{ |
| 301 | (*v3pb.RangeResponse)(resp), |
| 302 | }, |
| 303 | } |
| 304 | } |
| 305 | return resps, true |
| 306 | } |