khenaidoo | ffe076b | 2019-01-15 16:08:08 -0500 | [diff] [blame^] | 1 | // Copyright 2016 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package grpcproxy |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | |
| 20 | "github.com/coreos/etcd/clientv3" |
| 21 | pb "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| 22 | "github.com/coreos/etcd/proxy/grpcproxy/cache" |
| 23 | ) |
| 24 | |
| 25 | type kvProxy struct { |
| 26 | kv clientv3.KV |
| 27 | cache cache.Cache |
| 28 | } |
| 29 | |
| 30 | func NewKvProxy(c *clientv3.Client) (pb.KVServer, <-chan struct{}) { |
| 31 | kv := &kvProxy{ |
| 32 | kv: c.KV, |
| 33 | cache: cache.NewCache(cache.DefaultMaxEntries), |
| 34 | } |
| 35 | donec := make(chan struct{}) |
| 36 | close(donec) |
| 37 | return kv, donec |
| 38 | } |
| 39 | |
| 40 | func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { |
| 41 | if r.Serializable { |
| 42 | resp, err := p.cache.Get(r) |
| 43 | switch err { |
| 44 | case nil: |
| 45 | cacheHits.Inc() |
| 46 | return resp, nil |
| 47 | case cache.ErrCompacted: |
| 48 | cacheHits.Inc() |
| 49 | return nil, err |
| 50 | } |
| 51 | |
| 52 | cachedMisses.Inc() |
| 53 | } |
| 54 | |
| 55 | resp, err := p.kv.Do(ctx, RangeRequestToOp(r)) |
| 56 | if err != nil { |
| 57 | return nil, err |
| 58 | } |
| 59 | |
| 60 | // cache linearizable as serializable |
| 61 | req := *r |
| 62 | req.Serializable = true |
| 63 | gresp := (*pb.RangeResponse)(resp.Get()) |
| 64 | p.cache.Add(&req, gresp) |
| 65 | cacheKeys.Set(float64(p.cache.Size())) |
| 66 | |
| 67 | return gresp, nil |
| 68 | } |
| 69 | |
| 70 | func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { |
| 71 | p.cache.Invalidate(r.Key, nil) |
| 72 | cacheKeys.Set(float64(p.cache.Size())) |
| 73 | |
| 74 | resp, err := p.kv.Do(ctx, PutRequestToOp(r)) |
| 75 | return (*pb.PutResponse)(resp.Put()), err |
| 76 | } |
| 77 | |
| 78 | func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { |
| 79 | p.cache.Invalidate(r.Key, r.RangeEnd) |
| 80 | cacheKeys.Set(float64(p.cache.Size())) |
| 81 | |
| 82 | resp, err := p.kv.Do(ctx, DelRequestToOp(r)) |
| 83 | return (*pb.DeleteRangeResponse)(resp.Del()), err |
| 84 | } |
| 85 | |
| 86 | func (p *kvProxy) txnToCache(reqs []*pb.RequestOp, resps []*pb.ResponseOp) { |
| 87 | for i := range resps { |
| 88 | switch tv := resps[i].Response.(type) { |
| 89 | case *pb.ResponseOp_ResponsePut: |
| 90 | p.cache.Invalidate(reqs[i].GetRequestPut().Key, nil) |
| 91 | case *pb.ResponseOp_ResponseDeleteRange: |
| 92 | rdr := reqs[i].GetRequestDeleteRange() |
| 93 | p.cache.Invalidate(rdr.Key, rdr.RangeEnd) |
| 94 | case *pb.ResponseOp_ResponseRange: |
| 95 | req := *(reqs[i].GetRequestRange()) |
| 96 | req.Serializable = true |
| 97 | p.cache.Add(&req, tv.ResponseRange) |
| 98 | } |
| 99 | } |
| 100 | } |
| 101 | |
| 102 | func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { |
| 103 | op := TxnRequestToOp(r) |
| 104 | opResp, err := p.kv.Do(ctx, op) |
| 105 | if err != nil { |
| 106 | return nil, err |
| 107 | } |
| 108 | resp := opResp.Txn() |
| 109 | |
| 110 | // txn may claim an outdated key is updated; be safe and invalidate |
| 111 | for _, cmp := range r.Compare { |
| 112 | p.cache.Invalidate(cmp.Key, cmp.RangeEnd) |
| 113 | } |
| 114 | // update any fetched keys |
| 115 | if resp.Succeeded { |
| 116 | p.txnToCache(r.Success, resp.Responses) |
| 117 | } else { |
| 118 | p.txnToCache(r.Failure, resp.Responses) |
| 119 | } |
| 120 | |
| 121 | cacheKeys.Set(float64(p.cache.Size())) |
| 122 | |
| 123 | return (*pb.TxnResponse)(resp), nil |
| 124 | } |
| 125 | |
| 126 | func (p *kvProxy) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { |
| 127 | var opts []clientv3.CompactOption |
| 128 | if r.Physical { |
| 129 | opts = append(opts, clientv3.WithCompactPhysical()) |
| 130 | } |
| 131 | |
| 132 | resp, err := p.kv.Compact(ctx, r.Revision, opts...) |
| 133 | if err == nil { |
| 134 | p.cache.Compact(r.Revision) |
| 135 | } |
| 136 | |
| 137 | cacheKeys.Set(float64(p.cache.Size())) |
| 138 | |
| 139 | return (*pb.CompactionResponse)(resp), err |
| 140 | } |
| 141 | |
| 142 | func requestOpToOp(union *pb.RequestOp) clientv3.Op { |
| 143 | switch tv := union.Request.(type) { |
| 144 | case *pb.RequestOp_RequestRange: |
| 145 | if tv.RequestRange != nil { |
| 146 | return RangeRequestToOp(tv.RequestRange) |
| 147 | } |
| 148 | case *pb.RequestOp_RequestPut: |
| 149 | if tv.RequestPut != nil { |
| 150 | return PutRequestToOp(tv.RequestPut) |
| 151 | } |
| 152 | case *pb.RequestOp_RequestDeleteRange: |
| 153 | if tv.RequestDeleteRange != nil { |
| 154 | return DelRequestToOp(tv.RequestDeleteRange) |
| 155 | } |
| 156 | case *pb.RequestOp_RequestTxn: |
| 157 | if tv.RequestTxn != nil { |
| 158 | return TxnRequestToOp(tv.RequestTxn) |
| 159 | } |
| 160 | } |
| 161 | panic("unknown request") |
| 162 | } |
| 163 | |
| 164 | func RangeRequestToOp(r *pb.RangeRequest) clientv3.Op { |
| 165 | opts := []clientv3.OpOption{} |
| 166 | if len(r.RangeEnd) != 0 { |
| 167 | opts = append(opts, clientv3.WithRange(string(r.RangeEnd))) |
| 168 | } |
| 169 | opts = append(opts, clientv3.WithRev(r.Revision)) |
| 170 | opts = append(opts, clientv3.WithLimit(r.Limit)) |
| 171 | opts = append(opts, clientv3.WithSort( |
| 172 | clientv3.SortTarget(r.SortTarget), |
| 173 | clientv3.SortOrder(r.SortOrder)), |
| 174 | ) |
| 175 | opts = append(opts, clientv3.WithMaxCreateRev(r.MaxCreateRevision)) |
| 176 | opts = append(opts, clientv3.WithMinCreateRev(r.MinCreateRevision)) |
| 177 | opts = append(opts, clientv3.WithMaxModRev(r.MaxModRevision)) |
| 178 | opts = append(opts, clientv3.WithMinModRev(r.MinModRevision)) |
| 179 | if r.CountOnly { |
| 180 | opts = append(opts, clientv3.WithCountOnly()) |
| 181 | } |
| 182 | if r.KeysOnly { |
| 183 | opts = append(opts, clientv3.WithKeysOnly()) |
| 184 | } |
| 185 | if r.Serializable { |
| 186 | opts = append(opts, clientv3.WithSerializable()) |
| 187 | } |
| 188 | |
| 189 | return clientv3.OpGet(string(r.Key), opts...) |
| 190 | } |
| 191 | |
| 192 | func PutRequestToOp(r *pb.PutRequest) clientv3.Op { |
| 193 | opts := []clientv3.OpOption{} |
| 194 | opts = append(opts, clientv3.WithLease(clientv3.LeaseID(r.Lease))) |
| 195 | if r.IgnoreValue { |
| 196 | opts = append(opts, clientv3.WithIgnoreValue()) |
| 197 | } |
| 198 | if r.IgnoreLease { |
| 199 | opts = append(opts, clientv3.WithIgnoreLease()) |
| 200 | } |
| 201 | if r.PrevKv { |
| 202 | opts = append(opts, clientv3.WithPrevKV()) |
| 203 | } |
| 204 | return clientv3.OpPut(string(r.Key), string(r.Value), opts...) |
| 205 | } |
| 206 | |
| 207 | func DelRequestToOp(r *pb.DeleteRangeRequest) clientv3.Op { |
| 208 | opts := []clientv3.OpOption{} |
| 209 | if len(r.RangeEnd) != 0 { |
| 210 | opts = append(opts, clientv3.WithRange(string(r.RangeEnd))) |
| 211 | } |
| 212 | if r.PrevKv { |
| 213 | opts = append(opts, clientv3.WithPrevKV()) |
| 214 | } |
| 215 | return clientv3.OpDelete(string(r.Key), opts...) |
| 216 | } |
| 217 | |
| 218 | func TxnRequestToOp(r *pb.TxnRequest) clientv3.Op { |
| 219 | cmps := make([]clientv3.Cmp, len(r.Compare)) |
| 220 | thenops := make([]clientv3.Op, len(r.Success)) |
| 221 | elseops := make([]clientv3.Op, len(r.Failure)) |
| 222 | for i := range r.Compare { |
| 223 | cmps[i] = (clientv3.Cmp)(*r.Compare[i]) |
| 224 | } |
| 225 | for i := range r.Success { |
| 226 | thenops[i] = requestOpToOp(r.Success[i]) |
| 227 | } |
| 228 | for i := range r.Failure { |
| 229 | elseops[i] = requestOpToOp(r.Failure[i]) |
| 230 | } |
| 231 | return clientv3.OpTxn(cmps, thenops, elseops) |
| 232 | } |