blob: 1f06ad0dd6735604a8c31c4148ce01f5c703f67d [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "bytes"
19 "context"
20 "fmt"
21 "sort"
22 "time"
23
24 "go.etcd.io/etcd/auth"
25 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
26 "go.etcd.io/etcd/lease"
27 "go.etcd.io/etcd/mvcc"
28 "go.etcd.io/etcd/mvcc/mvccpb"
29 "go.etcd.io/etcd/pkg/types"
30
31 "github.com/gogo/protobuf/proto"
32 "go.uber.org/zap"
33)
34
35const (
36 warnApplyDuration = 100 * time.Millisecond
37)
38
39type applyResult struct {
40 resp proto.Message
41 err error
42 // physc signals the physical effect of the request has completed in addition
43 // to being logically reflected by the node. Currently only used for
44 // Compaction requests.
45 physc <-chan struct{}
46}
47
48// applierV3 is the interface for processing V3 raft messages
49type applierV3 interface {
50 Apply(r *pb.InternalRaftRequest) *applyResult
51
52 Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
53 Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
54 DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
55 Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
56 Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
57
58 LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
59 LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
60
61 LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)
62
63 Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error)
64
65 Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error)
66
67 AuthEnable() (*pb.AuthEnableResponse, error)
68 AuthDisable() (*pb.AuthDisableResponse, error)
69
70 UserAdd(ua *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
71 UserDelete(ua *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
72 UserChangePassword(ua *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
73 UserGrantRole(ua *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
74 UserGet(ua *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
75 UserRevokeRole(ua *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
76 RoleAdd(ua *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
77 RoleGrantPermission(ua *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
78 RoleGet(ua *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
79 RoleRevokePermission(ua *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
80 RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
81 UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
82 RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
83}
84
85type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error
86
87type applierV3backend struct {
88 s *EtcdServer
89
90 checkPut checkReqFunc
91 checkRange checkReqFunc
92}
93
94func (s *EtcdServer) newApplierV3Backend() applierV3 {
95 base := &applierV3backend{s: s}
96 base.checkPut = func(rv mvcc.ReadView, req *pb.RequestOp) error {
97 return base.checkRequestPut(rv, req)
98 }
99 base.checkRange = func(rv mvcc.ReadView, req *pb.RequestOp) error {
100 return base.checkRequestRange(rv, req)
101 }
102 return base
103}
104
105func (s *EtcdServer) newApplierV3() applierV3 {
106 return newAuthApplierV3(
107 s.AuthStore(),
108 newQuotaApplierV3(s, s.newApplierV3Backend()),
109 s.lessor,
110 )
111}
112
113func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
114 ar := &applyResult{}
115 defer func(start time.Time) {
116 warnOfExpensiveRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
117 }(time.Now())
118
119 // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
120 switch {
121 case r.Range != nil:
122 ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
123 case r.Put != nil:
124 ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
125 case r.DeleteRange != nil:
126 ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
127 case r.Txn != nil:
128 ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
129 case r.Compaction != nil:
130 ar.resp, ar.physc, ar.err = a.s.applyV3.Compaction(r.Compaction)
131 case r.LeaseGrant != nil:
132 ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
133 case r.LeaseRevoke != nil:
134 ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
135 case r.LeaseCheckpoint != nil:
136 ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
137 case r.Alarm != nil:
138 ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
139 case r.Authenticate != nil:
140 ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
141 case r.AuthEnable != nil:
142 ar.resp, ar.err = a.s.applyV3.AuthEnable()
143 case r.AuthDisable != nil:
144 ar.resp, ar.err = a.s.applyV3.AuthDisable()
145 case r.AuthUserAdd != nil:
146 ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
147 case r.AuthUserDelete != nil:
148 ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete)
149 case r.AuthUserChangePassword != nil:
150 ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword)
151 case r.AuthUserGrantRole != nil:
152 ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole)
153 case r.AuthUserGet != nil:
154 ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet)
155 case r.AuthUserRevokeRole != nil:
156 ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole)
157 case r.AuthRoleAdd != nil:
158 ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd)
159 case r.AuthRoleGrantPermission != nil:
160 ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)
161 case r.AuthRoleGet != nil:
162 ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet)
163 case r.AuthRoleRevokePermission != nil:
164 ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)
165 case r.AuthRoleDelete != nil:
166 ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete)
167 case r.AuthUserList != nil:
168 ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)
169 case r.AuthRoleList != nil:
170 ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
171 default:
172 panic("not implemented")
173 }
174 return ar
175}
176
177func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
178 resp = &pb.PutResponse{}
179 resp.Header = &pb.ResponseHeader{}
180
181 val, leaseID := p.Value, lease.LeaseID(p.Lease)
182 if txn == nil {
183 if leaseID != lease.NoLease {
184 if l := a.s.lessor.Lookup(leaseID); l == nil {
185 return nil, lease.ErrLeaseNotFound
186 }
187 }
188 txn = a.s.KV().Write()
189 defer txn.End()
190 }
191
192 var rr *mvcc.RangeResult
193 if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
194 rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
195 if err != nil {
196 return nil, err
197 }
198 }
199 if p.IgnoreValue || p.IgnoreLease {
200 if rr == nil || len(rr.KVs) == 0 {
201 // ignore_{lease,value} flag expects previous key-value pair
202 return nil, ErrKeyNotFound
203 }
204 }
205 if p.IgnoreValue {
206 val = rr.KVs[0].Value
207 }
208 if p.IgnoreLease {
209 leaseID = lease.LeaseID(rr.KVs[0].Lease)
210 }
211 if p.PrevKv {
212 if rr != nil && len(rr.KVs) != 0 {
213 resp.PrevKv = &rr.KVs[0]
214 }
215 }
216
217 resp.Header.Revision = txn.Put(p.Key, val, leaseID)
218 return resp, nil
219}
220
221func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
222 resp := &pb.DeleteRangeResponse{}
223 resp.Header = &pb.ResponseHeader{}
224 end := mkGteRange(dr.RangeEnd)
225
226 if txn == nil {
227 txn = a.s.kv.Write()
228 defer txn.End()
229 }
230
231 if dr.PrevKv {
232 rr, err := txn.Range(dr.Key, end, mvcc.RangeOptions{})
233 if err != nil {
234 return nil, err
235 }
236 if rr != nil {
237 resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
238 for i := range rr.KVs {
239 resp.PrevKvs[i] = &rr.KVs[i]
240 }
241 }
242 }
243
244 resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end)
245 return resp, nil
246}
247
248func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
249 resp := &pb.RangeResponse{}
250 resp.Header = &pb.ResponseHeader{}
251
252 if txn == nil {
253 txn = a.s.kv.Read()
254 defer txn.End()
255 }
256
257 limit := r.Limit
258 if r.SortOrder != pb.RangeRequest_NONE ||
259 r.MinModRevision != 0 || r.MaxModRevision != 0 ||
260 r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
261 // fetch everything; sort and truncate afterwards
262 limit = 0
263 }
264 if limit > 0 {
265 // fetch one extra for 'more' flag
266 limit = limit + 1
267 }
268
269 ro := mvcc.RangeOptions{
270 Limit: limit,
271 Rev: r.Revision,
272 Count: r.CountOnly,
273 }
274
275 rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro)
276 if err != nil {
277 return nil, err
278 }
279
280 if r.MaxModRevision != 0 {
281 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
282 pruneKVs(rr, f)
283 }
284 if r.MinModRevision != 0 {
285 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision }
286 pruneKVs(rr, f)
287 }
288 if r.MaxCreateRevision != 0 {
289 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision }
290 pruneKVs(rr, f)
291 }
292 if r.MinCreateRevision != 0 {
293 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
294 pruneKVs(rr, f)
295 }
296
297 sortOrder := r.SortOrder
298 if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
299 // Since current mvcc.Range implementation returns results
300 // sorted by keys in lexiographically ascending order,
301 // sort ASCEND by default only when target is not 'KEY'
302 sortOrder = pb.RangeRequest_ASCEND
303 }
304 if sortOrder != pb.RangeRequest_NONE {
305 var sorter sort.Interface
306 switch {
307 case r.SortTarget == pb.RangeRequest_KEY:
308 sorter = &kvSortByKey{&kvSort{rr.KVs}}
309 case r.SortTarget == pb.RangeRequest_VERSION:
310 sorter = &kvSortByVersion{&kvSort{rr.KVs}}
311 case r.SortTarget == pb.RangeRequest_CREATE:
312 sorter = &kvSortByCreate{&kvSort{rr.KVs}}
313 case r.SortTarget == pb.RangeRequest_MOD:
314 sorter = &kvSortByMod{&kvSort{rr.KVs}}
315 case r.SortTarget == pb.RangeRequest_VALUE:
316 sorter = &kvSortByValue{&kvSort{rr.KVs}}
317 }
318 switch {
319 case sortOrder == pb.RangeRequest_ASCEND:
320 sort.Sort(sorter)
321 case sortOrder == pb.RangeRequest_DESCEND:
322 sort.Sort(sort.Reverse(sorter))
323 }
324 }
325
326 if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
327 rr.KVs = rr.KVs[:r.Limit]
328 resp.More = true
329 }
330
331 resp.Header.Revision = rr.Rev
332 resp.Count = int64(rr.Count)
333 resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
334 for i := range rr.KVs {
335 if r.KeysOnly {
336 rr.KVs[i].Value = nil
337 }
338 resp.Kvs[i] = &rr.KVs[i]
339 }
340 return resp, nil
341}
342
343func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
344 isWrite := !isTxnReadonly(rt)
345 txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
346
347 txnPath := compareToPath(txn, rt)
348 if isWrite {
349 if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil {
350 txn.End()
351 return nil, err
352 }
353 }
354 if _, err := checkRequests(txn, rt, txnPath, a.checkRange); err != nil {
355 txn.End()
356 return nil, err
357 }
358
359 txnResp, _ := newTxnResp(rt, txnPath)
360
361 // When executing mutable txn ops, etcd must hold the txn lock so
362 // readers do not see any intermediate results. Since writes are
363 // serialized on the raft loop, the revision in the read view will
364 // be the revision of the write txn.
365 if isWrite {
366 txn.End()
367 txn = a.s.KV().Write()
368 }
369 a.applyTxn(txn, rt, txnPath, txnResp)
370 rev := txn.Rev()
371 if len(txn.Changes()) != 0 {
372 rev++
373 }
374 txn.End()
375
376 txnResp.Header.Revision = rev
377 return txnResp, nil
378}
379
380// newTxnResp allocates a txn response for a txn request given a path.
381func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txnCount int) {
382 reqs := rt.Success
383 if !txnPath[0] {
384 reqs = rt.Failure
385 }
386 resps := make([]*pb.ResponseOp, len(reqs))
387 txnResp = &pb.TxnResponse{
388 Responses: resps,
389 Succeeded: txnPath[0],
390 Header: &pb.ResponseHeader{},
391 }
392 for i, req := range reqs {
393 switch tv := req.Request.(type) {
394 case *pb.RequestOp_RequestRange:
395 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{}}
396 case *pb.RequestOp_RequestPut:
397 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{}}
398 case *pb.RequestOp_RequestDeleteRange:
399 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{}}
400 case *pb.RequestOp_RequestTxn:
401 resp, txns := newTxnResp(tv.RequestTxn, txnPath[1:])
402 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseTxn{ResponseTxn: resp}}
403 txnPath = txnPath[1+txns:]
404 txnCount += txns + 1
405 default:
406 }
407 }
408 return txnResp, txnCount
409}
410
411func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool {
412 txnPath := make([]bool, 1)
413 ops := rt.Success
414 if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] {
415 ops = rt.Failure
416 }
417 for _, op := range ops {
418 tv, ok := op.Request.(*pb.RequestOp_RequestTxn)
419 if !ok || tv.RequestTxn == nil {
420 continue
421 }
422 txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...)
423 }
424 return txnPath
425}
426
427func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool {
428 for _, c := range cmps {
429 if !applyCompare(rv, c) {
430 return false
431 }
432 }
433 return true
434}
435
436// applyCompare applies the compare request.
437// If the comparison succeeds, it returns true. Otherwise, returns false.
438func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
439 // TODO: possible optimizations
440 // * chunk reads for large ranges to conserve memory
441 // * rewrite rules for common patterns:
442 // ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
443 // * caching
444 rr, err := rv.Range(c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
445 if err != nil {
446 return false
447 }
448 if len(rr.KVs) == 0 {
449 if c.Target == pb.Compare_VALUE {
450 // Always fail if comparing a value on a key/keys that doesn't exist;
451 // nil == empty string in grpc; no way to represent missing value
452 return false
453 }
454 return compareKV(c, mvccpb.KeyValue{})
455 }
456 for _, kv := range rr.KVs {
457 if !compareKV(c, kv) {
458 return false
459 }
460 }
461 return true
462}
463
464func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
465 var result int
466 rev := int64(0)
467 switch c.Target {
468 case pb.Compare_VALUE:
469 v := []byte{}
470 if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
471 v = tv.Value
472 }
473 result = bytes.Compare(ckv.Value, v)
474 case pb.Compare_CREATE:
475 if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil {
476 rev = tv.CreateRevision
477 }
478 result = compareInt64(ckv.CreateRevision, rev)
479 case pb.Compare_MOD:
480 if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil {
481 rev = tv.ModRevision
482 }
483 result = compareInt64(ckv.ModRevision, rev)
484 case pb.Compare_VERSION:
485 if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil {
486 rev = tv.Version
487 }
488 result = compareInt64(ckv.Version, rev)
489 case pb.Compare_LEASE:
490 if tv, _ := c.TargetUnion.(*pb.Compare_Lease); tv != nil {
491 rev = tv.Lease
492 }
493 result = compareInt64(ckv.Lease, rev)
494 }
495 switch c.Result {
496 case pb.Compare_EQUAL:
497 return result == 0
498 case pb.Compare_NOT_EQUAL:
499 return result != 0
500 case pb.Compare_GREATER:
501 return result > 0
502 case pb.Compare_LESS:
503 return result < 0
504 }
505 return true
506}
507
508func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
509 reqs := rt.Success
510 if !txnPath[0] {
511 reqs = rt.Failure
512 }
513
514 lg := a.s.getLogger()
515 for i, req := range reqs {
516 respi := tresp.Responses[i].Response
517 switch tv := req.Request.(type) {
518 case *pb.RequestOp_RequestRange:
519 resp, err := a.Range(txn, tv.RequestRange)
520 if err != nil {
521 if lg != nil {
522 lg.Panic("unexpected error during txn", zap.Error(err))
523 } else {
524 plog.Panicf("unexpected error during txn: %v", err)
525 }
526 }
527 respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
528 case *pb.RequestOp_RequestPut:
529 resp, err := a.Put(txn, tv.RequestPut)
530 if err != nil {
531 if lg != nil {
532 lg.Panic("unexpected error during txn", zap.Error(err))
533 } else {
534 plog.Panicf("unexpected error during txn: %v", err)
535 }
536 }
537 respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
538 case *pb.RequestOp_RequestDeleteRange:
539 resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
540 if err != nil {
541 if lg != nil {
542 lg.Panic("unexpected error during txn", zap.Error(err))
543 } else {
544 plog.Panicf("unexpected error during txn: %v", err)
545 }
546 }
547 respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
548 case *pb.RequestOp_RequestTxn:
549 resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
550 applyTxns := a.applyTxn(txn, tv.RequestTxn, txnPath[1:], resp)
551 txns += applyTxns + 1
552 txnPath = txnPath[applyTxns+1:]
553 default:
554 // empty union
555 }
556 }
557 return txns
558}
559
560func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
561 resp := &pb.CompactionResponse{}
562 resp.Header = &pb.ResponseHeader{}
563 ch, err := a.s.KV().Compact(compaction.Revision)
564 if err != nil {
565 return nil, ch, err
566 }
567 // get the current revision. which key to get is not important.
568 rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{})
569 resp.Header.Revision = rr.Rev
570 return resp, ch, err
571}
572
573func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
574 l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL)
575 resp := &pb.LeaseGrantResponse{}
576 if err == nil {
577 resp.ID = int64(l.ID)
578 resp.TTL = l.TTL()
579 resp.Header = newHeader(a.s)
580 }
581 return resp, err
582}
583
584func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
585 err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
586 return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err
587}
588
589func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) {
590 for _, c := range lc.Checkpoints {
591 err := a.s.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL)
592 if err != nil {
593 return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, err
594 }
595 }
596 return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, nil
597}
598
599func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
600 resp := &pb.AlarmResponse{}
601 oldCount := len(a.s.alarmStore.Get(ar.Alarm))
602
603 lg := a.s.getLogger()
604 switch ar.Action {
605 case pb.AlarmRequest_GET:
606 resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
607 case pb.AlarmRequest_ACTIVATE:
608 m := a.s.alarmStore.Activate(types.ID(ar.MemberID), ar.Alarm)
609 if m == nil {
610 break
611 }
612 resp.Alarms = append(resp.Alarms, m)
613 activated := oldCount == 0 && len(a.s.alarmStore.Get(m.Alarm)) == 1
614 if !activated {
615 break
616 }
617
618 if lg != nil {
619 lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
620 } else {
621 plog.Warningf("alarm %v raised by peer %s", m.Alarm, types.ID(m.MemberID))
622 }
623 switch m.Alarm {
624 case pb.AlarmType_CORRUPT:
625 a.s.applyV3 = newApplierV3Corrupt(a)
626 case pb.AlarmType_NOSPACE:
627 a.s.applyV3 = newApplierV3Capped(a)
628 default:
629 if lg != nil {
630 lg.Warn("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m)))
631 } else {
632 plog.Errorf("unimplemented alarm activation (%+v)", m)
633 }
634 }
635 case pb.AlarmRequest_DEACTIVATE:
636 m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
637 if m == nil {
638 break
639 }
640 resp.Alarms = append(resp.Alarms, m)
641 deactivated := oldCount > 0 && len(a.s.alarmStore.Get(ar.Alarm)) == 0
642 if !deactivated {
643 break
644 }
645
646 switch m.Alarm {
647 case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT:
648 // TODO: check kv hash before deactivating CORRUPT?
649 if lg != nil {
650 lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
651 } else {
652 plog.Infof("alarm disarmed %+v", ar)
653 }
654 a.s.applyV3 = a.s.newApplierV3()
655 default:
656 if lg != nil {
657 lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m)))
658 } else {
659 plog.Errorf("unimplemented alarm deactivation (%+v)", m)
660 }
661 }
662 default:
663 return nil, nil
664 }
665 return resp, nil
666}
667
668type applierV3Capped struct {
669 applierV3
670 q backendQuota
671}
672
673// newApplierV3Capped creates an applyV3 that will reject Puts and transactions
674// with Puts so that the number of keys in the store is capped.
675func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
676
677func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
678 return nil, ErrNoSpace
679}
680
681func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
682 if a.q.Cost(r) > 0 {
683 return nil, ErrNoSpace
684 }
685 return a.applierV3.Txn(r)
686}
687
688func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
689 return nil, ErrNoSpace
690}
691
692func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
693 err := a.s.AuthStore().AuthEnable()
694 if err != nil {
695 return nil, err
696 }
697 return &pb.AuthEnableResponse{Header: newHeader(a.s)}, nil
698}
699
700func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
701 a.s.AuthStore().AuthDisable()
702 return &pb.AuthDisableResponse{Header: newHeader(a.s)}, nil
703}
704
705func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
706 ctx := context.WithValue(context.WithValue(a.s.ctx, auth.AuthenticateParamIndex{}, a.s.consistIndex.ConsistentIndex()), auth.AuthenticateParamSimpleTokenPrefix{}, r.SimpleToken)
707 resp, err := a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
708 if resp != nil {
709 resp.Header = newHeader(a.s)
710 }
711 return resp, err
712}
713
714func (a *applierV3backend) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
715 resp, err := a.s.AuthStore().UserAdd(r)
716 if resp != nil {
717 resp.Header = newHeader(a.s)
718 }
719 return resp, err
720}
721
722func (a *applierV3backend) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
723 resp, err := a.s.AuthStore().UserDelete(r)
724 if resp != nil {
725 resp.Header = newHeader(a.s)
726 }
727 return resp, err
728}
729
730func (a *applierV3backend) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
731 resp, err := a.s.AuthStore().UserChangePassword(r)
732 if resp != nil {
733 resp.Header = newHeader(a.s)
734 }
735 return resp, err
736}
737
738func (a *applierV3backend) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
739 resp, err := a.s.AuthStore().UserGrantRole(r)
740 if resp != nil {
741 resp.Header = newHeader(a.s)
742 }
743 return resp, err
744}
745
746func (a *applierV3backend) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
747 resp, err := a.s.AuthStore().UserGet(r)
748 if resp != nil {
749 resp.Header = newHeader(a.s)
750 }
751 return resp, err
752}
753
754func (a *applierV3backend) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
755 resp, err := a.s.AuthStore().UserRevokeRole(r)
756 if resp != nil {
757 resp.Header = newHeader(a.s)
758 }
759 return resp, err
760}
761
762func (a *applierV3backend) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
763 resp, err := a.s.AuthStore().RoleAdd(r)
764 if resp != nil {
765 resp.Header = newHeader(a.s)
766 }
767 return resp, err
768}
769
770func (a *applierV3backend) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
771 resp, err := a.s.AuthStore().RoleGrantPermission(r)
772 if resp != nil {
773 resp.Header = newHeader(a.s)
774 }
775 return resp, err
776}
777
778func (a *applierV3backend) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
779 resp, err := a.s.AuthStore().RoleGet(r)
780 if resp != nil {
781 resp.Header = newHeader(a.s)
782 }
783 return resp, err
784}
785
786func (a *applierV3backend) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
787 resp, err := a.s.AuthStore().RoleRevokePermission(r)
788 if resp != nil {
789 resp.Header = newHeader(a.s)
790 }
791 return resp, err
792}
793
794func (a *applierV3backend) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
795 resp, err := a.s.AuthStore().RoleDelete(r)
796 if resp != nil {
797 resp.Header = newHeader(a.s)
798 }
799 return resp, err
800}
801
802func (a *applierV3backend) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
803 resp, err := a.s.AuthStore().UserList(r)
804 if resp != nil {
805 resp.Header = newHeader(a.s)
806 }
807 return resp, err
808}
809
810func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
811 resp, err := a.s.AuthStore().RoleList(r)
812 if resp != nil {
813 resp.Header = newHeader(a.s)
814 }
815 return resp, err
816}
817
818type quotaApplierV3 struct {
819 applierV3
820 q Quota
821}
822
823func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
824 return &quotaApplierV3{app, NewBackendQuota(s, "v3-applier")}
825}
826
827func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
828 ok := a.q.Available(p)
829 resp, err := a.applierV3.Put(txn, p)
830 if err == nil && !ok {
831 err = ErrNoSpace
832 }
833 return resp, err
834}
835
836func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
837 ok := a.q.Available(rt)
838 resp, err := a.applierV3.Txn(rt)
839 if err == nil && !ok {
840 err = ErrNoSpace
841 }
842 return resp, err
843}
844
845func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
846 ok := a.q.Available(lc)
847 resp, err := a.applierV3.LeaseGrant(lc)
848 if err == nil && !ok {
849 err = ErrNoSpace
850 }
851 return resp, err
852}
853
854type kvSort struct{ kvs []mvccpb.KeyValue }
855
856func (s *kvSort) Swap(i, j int) {
857 t := s.kvs[i]
858 s.kvs[i] = s.kvs[j]
859 s.kvs[j] = t
860}
861func (s *kvSort) Len() int { return len(s.kvs) }
862
863type kvSortByKey struct{ *kvSort }
864
865func (s *kvSortByKey) Less(i, j int) bool {
866 return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
867}
868
869type kvSortByVersion struct{ *kvSort }
870
871func (s *kvSortByVersion) Less(i, j int) bool {
872 return (s.kvs[i].Version - s.kvs[j].Version) < 0
873}
874
875type kvSortByCreate struct{ *kvSort }
876
877func (s *kvSortByCreate) Less(i, j int) bool {
878 return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
879}
880
881type kvSortByMod struct{ *kvSort }
882
883func (s *kvSortByMod) Less(i, j int) bool {
884 return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
885}
886
887type kvSortByValue struct{ *kvSort }
888
889func (s *kvSortByValue) Less(i, j int) bool {
890 return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
891}
892
893func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) {
894 txnCount := 0
895 reqs := rt.Success
896 if !txnPath[0] {
897 reqs = rt.Failure
898 }
899 for _, req := range reqs {
900 if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil {
901 txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f)
902 if err != nil {
903 return 0, err
904 }
905 txnCount += txns + 1
906 txnPath = txnPath[txns+1:]
907 continue
908 }
909 if err := f(rv, req); err != nil {
910 return 0, err
911 }
912 }
913 return txnCount, nil
914}
915
916func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
917 tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut)
918 if !ok || tv.RequestPut == nil {
919 return nil
920 }
921 req := tv.RequestPut
922 if req.IgnoreValue || req.IgnoreLease {
923 // expects previous key-value, error if not exist
924 rr, err := rv.Range(req.Key, nil, mvcc.RangeOptions{})
925 if err != nil {
926 return err
927 }
928 if rr == nil || len(rr.KVs) == 0 {
929 return ErrKeyNotFound
930 }
931 }
932 if lease.LeaseID(req.Lease) != lease.NoLease {
933 if l := a.s.lessor.Lookup(lease.LeaseID(req.Lease)); l == nil {
934 return lease.ErrLeaseNotFound
935 }
936 }
937 return nil
938}
939
940func (a *applierV3backend) checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
941 tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange)
942 if !ok || tv.RequestRange == nil {
943 return nil
944 }
945 req := tv.RequestRange
946 switch {
947 case req.Revision == 0:
948 return nil
949 case req.Revision > rv.Rev():
950 return mvcc.ErrFutureRev
951 case req.Revision < rv.FirstRev():
952 return mvcc.ErrCompacted
953 }
954 return nil
955}
956
957func compareInt64(a, b int64) int {
958 switch {
959 case a < b:
960 return -1
961 case a > b:
962 return 1
963 default:
964 return 0
965 }
966}
967
968// mkGteRange determines if the range end is a >= range. This works around grpc
969// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
970// If it is a GTE range, then []byte{} is returned to indicate the empty byte
971// string (vs nil being no byte string).
972func mkGteRange(rangeEnd []byte) []byte {
973 if len(rangeEnd) == 1 && rangeEnd[0] == 0 {
974 return []byte{}
975 }
976 return rangeEnd
977}
978
979func noSideEffect(r *pb.InternalRaftRequest) bool {
980 return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil
981}
982
983func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
984 f := func(ops []*pb.RequestOp) []*pb.RequestOp {
985 j := 0
986 for i := 0; i < len(ops); i++ {
987 if _, ok := ops[i].Request.(*pb.RequestOp_RequestRange); ok {
988 continue
989 }
990 ops[j] = ops[i]
991 j++
992 }
993
994 return ops[:j]
995 }
996
997 txn.Success = f(txn.Success)
998 txn.Failure = f(txn.Failure)
999}
1000
1001func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
1002 j := 0
1003 for i := range rr.KVs {
1004 rr.KVs[j] = rr.KVs[i]
1005 if !isPrunable(&rr.KVs[i]) {
1006 j++
1007 }
1008 }
1009 rr.KVs = rr.KVs[:j]
1010}
1011
1012func newHeader(s *EtcdServer) *pb.ResponseHeader {
1013 return &pb.ResponseHeader{
1014 ClusterId: uint64(s.Cluster().ID()),
1015 MemberId: uint64(s.ID()),
1016 Revision: s.KV().Rev(),
1017 RaftTerm: s.Term(),
1018 }
1019}