blob: 6903a785c78901e2357d7dd1fb893d253d479751 [file] [log] [blame]
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leasing
import (
"context"
"strings"
"sync"
"time"
v3 "github.com/coreos/etcd/clientv3"
v3pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
)
const revokeBackoff = 2 * time.Second
type leaseCache struct {
mu sync.RWMutex
entries map[string]*leaseKey
revokes map[string]time.Time
header *v3pb.ResponseHeader
}
type leaseKey struct {
response *v3.GetResponse
// rev is the leasing key revision.
rev int64
waitc chan struct{}
}
func (lc *leaseCache) Rev(key string) int64 {
lc.mu.RLock()
defer lc.mu.RUnlock()
if li := lc.entries[key]; li != nil {
return li.rev
}
return 0
}
func (lc *leaseCache) Lock(key string) (chan<- struct{}, int64) {
lc.mu.Lock()
defer lc.mu.Unlock()
if li := lc.entries[key]; li != nil {
li.waitc = make(chan struct{})
return li.waitc, li.rev
}
return nil, 0
}
func (lc *leaseCache) LockRange(begin, end string) (ret []chan<- struct{}) {
lc.mu.Lock()
defer lc.mu.Unlock()
for k, li := range lc.entries {
if inRange(k, begin, end) {
li.waitc = make(chan struct{})
ret = append(ret, li.waitc)
}
}
return ret
}
func inRange(k, begin, end string) bool {
if strings.Compare(k, begin) < 0 {
return false
}
if end != "\x00" && strings.Compare(k, end) >= 0 {
return false
}
return true
}
func (lc *leaseCache) LockWriteOps(ops []v3.Op) (ret []chan<- struct{}) {
for _, op := range ops {
if op.IsGet() {
continue
}
key := string(op.KeyBytes())
if end := string(op.RangeBytes()); end == "" {
if wc, _ := lc.Lock(key); wc != nil {
ret = append(ret, wc)
}
} else {
for k := range lc.entries {
if !inRange(k, key, end) {
continue
}
if wc, _ := lc.Lock(k); wc != nil {
ret = append(ret, wc)
}
}
}
}
return ret
}
func (lc *leaseCache) NotifyOps(ops []v3.Op) (wcs []<-chan struct{}) {
for _, op := range ops {
if op.IsGet() {
if _, wc := lc.notify(string(op.KeyBytes())); wc != nil {
wcs = append(wcs, wc)
}
}
}
return wcs
}
func (lc *leaseCache) MayAcquire(key string) bool {
lc.mu.RLock()
lr, ok := lc.revokes[key]
lc.mu.RUnlock()
return !ok || time.Since(lr) > revokeBackoff
}
func (lc *leaseCache) Add(key string, resp *v3.GetResponse, op v3.Op) *v3.GetResponse {
lk := &leaseKey{resp, resp.Header.Revision, closedCh}
lc.mu.Lock()
if lc.header == nil || lc.header.Revision < resp.Header.Revision {
lc.header = resp.Header
}
lc.entries[key] = lk
ret := lk.get(op)
lc.mu.Unlock()
return ret
}
func (lc *leaseCache) Update(key, val []byte, respHeader *v3pb.ResponseHeader) {
li := lc.entries[string(key)]
if li == nil {
return
}
cacheResp := li.response
if len(cacheResp.Kvs) == 0 {
kv := &mvccpb.KeyValue{
Key: key,
CreateRevision: respHeader.Revision,
}
cacheResp.Kvs = append(cacheResp.Kvs, kv)
cacheResp.Count = 1
}
cacheResp.Kvs[0].Version++
if cacheResp.Kvs[0].ModRevision < respHeader.Revision {
cacheResp.Header = respHeader
cacheResp.Kvs[0].ModRevision = respHeader.Revision
cacheResp.Kvs[0].Value = val
}
}
func (lc *leaseCache) Delete(key string, hdr *v3pb.ResponseHeader) {
lc.mu.Lock()
defer lc.mu.Unlock()
lc.delete(key, hdr)
}
func (lc *leaseCache) delete(key string, hdr *v3pb.ResponseHeader) {
if li := lc.entries[key]; li != nil && hdr.Revision >= li.response.Header.Revision {
li.response.Kvs = nil
li.response.Header = copyHeader(hdr)
}
}
func (lc *leaseCache) Evict(key string) (rev int64) {
lc.mu.Lock()
defer lc.mu.Unlock()
if li := lc.entries[key]; li != nil {
rev = li.rev
delete(lc.entries, key)
lc.revokes[key] = time.Now()
}
return rev
}
func (lc *leaseCache) EvictRange(key, end string) {
lc.mu.Lock()
defer lc.mu.Unlock()
for k := range lc.entries {
if inRange(k, key, end) {
delete(lc.entries, key)
lc.revokes[key] = time.Now()
}
}
}
func isBadOp(op v3.Op) bool { return op.Rev() > 0 || len(op.RangeBytes()) > 0 }
func (lc *leaseCache) Get(ctx context.Context, op v3.Op) (*v3.GetResponse, bool) {
if isBadOp(op) {
return nil, false
}
key := string(op.KeyBytes())
li, wc := lc.notify(key)
if li == nil {
return nil, true
}
select {
case <-wc:
case <-ctx.Done():
return nil, true
}
lc.mu.RLock()
lk := *li
ret := lk.get(op)
lc.mu.RUnlock()
return ret, true
}
func (lk *leaseKey) get(op v3.Op) *v3.GetResponse {
ret := *lk.response
ret.Header = copyHeader(ret.Header)
empty := len(ret.Kvs) == 0 || op.IsCountOnly()
empty = empty || (op.MinModRev() > ret.Kvs[0].ModRevision)
empty = empty || (op.MaxModRev() != 0 && op.MaxModRev() < ret.Kvs[0].ModRevision)
empty = empty || (op.MinCreateRev() > ret.Kvs[0].CreateRevision)
empty = empty || (op.MaxCreateRev() != 0 && op.MaxCreateRev() < ret.Kvs[0].CreateRevision)
if empty {
ret.Kvs = nil
} else {
kv := *ret.Kvs[0]
kv.Key = make([]byte, len(kv.Key))
copy(kv.Key, ret.Kvs[0].Key)
if !op.IsKeysOnly() {
kv.Value = make([]byte, len(kv.Value))
copy(kv.Value, ret.Kvs[0].Value)
}
ret.Kvs = []*mvccpb.KeyValue{&kv}
}
return &ret
}
func (lc *leaseCache) notify(key string) (*leaseKey, <-chan struct{}) {
lc.mu.RLock()
defer lc.mu.RUnlock()
if li := lc.entries[key]; li != nil {
return li, li.waitc
}
return nil, nil
}
func (lc *leaseCache) clearOldRevokes(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
lc.mu.Lock()
for k, lr := range lc.revokes {
if time.Now().Sub(lr.Add(revokeBackoff)) > 0 {
delete(lc.revokes, k)
}
}
lc.mu.Unlock()
}
}
}
func (lc *leaseCache) evalCmp(cmps []v3.Cmp) (cmpVal bool, ok bool) {
for _, cmp := range cmps {
if len(cmp.RangeEnd) > 0 {
return false, false
}
lk := lc.entries[string(cmp.Key)]
if lk == nil {
return false, false
}
if !evalCmp(lk.response, cmp) {
return false, true
}
}
return true, true
}
func (lc *leaseCache) evalOps(ops []v3.Op) ([]*v3pb.ResponseOp, bool) {
resps := make([]*v3pb.ResponseOp, len(ops))
for i, op := range ops {
if !op.IsGet() || isBadOp(op) {
// TODO: support read-only Txn
return nil, false
}
lk := lc.entries[string(op.KeyBytes())]
if lk == nil {
return nil, false
}
resp := lk.get(op)
if resp == nil {
return nil, false
}
resps[i] = &v3pb.ResponseOp{
Response: &v3pb.ResponseOp_ResponseRange{
(*v3pb.RangeResponse)(resp),
},
}
}
return resps, true
}