// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package etcdserver
16
17import (
18 "bytes"
19 "context"
20 "sort"
21 "time"
22
23 "github.com/coreos/etcd/auth"
24 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
25 "github.com/coreos/etcd/lease"
26 "github.com/coreos/etcd/mvcc"
27 "github.com/coreos/etcd/mvcc/mvccpb"
28 "github.com/coreos/etcd/pkg/types"
29
30 "github.com/gogo/protobuf/proto"
31)
32
// warnApplyDuration is a 100ms duration constant; judging by its name it is
// the apply-latency threshold used for slow-request warnings (the consumer is
// not visible in this file).
const warnApplyDuration = 100 * time.Millisecond
36
// applyResult carries the outcome of applying a single raft request.
type applyResult struct {
	// resp is the response message produced by the applier call.
	resp proto.Message
	// err is the error returned by the applier call, if any.
	err error
	// physc signals the physical effect of the request has completed in addition
	// to being logically reflected by the node. Currently only used for
	// Compaction requests.
	physc <-chan struct{}
}
45
46// applierV3 is the interface for processing V3 raft messages
47type applierV3 interface {
48 Apply(r *pb.InternalRaftRequest) *applyResult
49
50 Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
51 Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
52 DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
53 Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
54 Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
55
56 LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
57 LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
58
59 Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error)
60
61 Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error)
62
63 AuthEnable() (*pb.AuthEnableResponse, error)
64 AuthDisable() (*pb.AuthDisableResponse, error)
65
66 UserAdd(ua *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
67 UserDelete(ua *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
68 UserChangePassword(ua *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
69 UserGrantRole(ua *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
70 UserGet(ua *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
71 UserRevokeRole(ua *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
72 RoleAdd(ua *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
73 RoleGrantPermission(ua *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
74 RoleGet(ua *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
75 RoleRevokePermission(ua *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
76 RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
77 UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
78 RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
79}
80
81type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error
82
83type applierV3backend struct {
84 s *EtcdServer
85
86 checkPut checkReqFunc
87 checkRange checkReqFunc
88}
89
90func (s *EtcdServer) newApplierV3Backend() applierV3 {
91 base := &applierV3backend{s: s}
92 base.checkPut = func(rv mvcc.ReadView, req *pb.RequestOp) error {
93 return base.checkRequestPut(rv, req)
94 }
95 base.checkRange = func(rv mvcc.ReadView, req *pb.RequestOp) error {
96 return base.checkRequestRange(rv, req)
97 }
98 return base
99}
100
101func (s *EtcdServer) newApplierV3() applierV3 {
102 return newAuthApplierV3(
103 s.AuthStore(),
104 newQuotaApplierV3(s, s.newApplierV3Backend()),
105 s.lessor,
106 )
107}
108
109func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
110 ar := &applyResult{}
111 defer func(start time.Time) {
112 warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
113 if ar.err != nil {
114 warnOfFailedRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
115 }
116 }(time.Now())
117
118 // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
119 switch {
120 case r.Range != nil:
121 ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
122 case r.Put != nil:
123 ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
124 case r.DeleteRange != nil:
125 ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
126 case r.Txn != nil:
127 ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
128 case r.Compaction != nil:
129 ar.resp, ar.physc, ar.err = a.s.applyV3.Compaction(r.Compaction)
130 case r.LeaseGrant != nil:
131 ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
132 case r.LeaseRevoke != nil:
133 ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
134 case r.Alarm != nil:
135 ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
136 case r.Authenticate != nil:
137 ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
138 case r.AuthEnable != nil:
139 ar.resp, ar.err = a.s.applyV3.AuthEnable()
140 case r.AuthDisable != nil:
141 ar.resp, ar.err = a.s.applyV3.AuthDisable()
142 case r.AuthUserAdd != nil:
143 ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
144 case r.AuthUserDelete != nil:
145 ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete)
146 case r.AuthUserChangePassword != nil:
147 ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword)
148 case r.AuthUserGrantRole != nil:
149 ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole)
150 case r.AuthUserGet != nil:
151 ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet)
152 case r.AuthUserRevokeRole != nil:
153 ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole)
154 case r.AuthRoleAdd != nil:
155 ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd)
156 case r.AuthRoleGrantPermission != nil:
157 ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)
158 case r.AuthRoleGet != nil:
159 ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet)
160 case r.AuthRoleRevokePermission != nil:
161 ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)
162 case r.AuthRoleDelete != nil:
163 ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete)
164 case r.AuthUserList != nil:
165 ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)
166 case r.AuthRoleList != nil:
167 ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
168 default:
169 panic("not implemented")
170 }
171 return ar
172}
173
174func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
175 resp = &pb.PutResponse{}
176 resp.Header = &pb.ResponseHeader{}
177
178 val, leaseID := p.Value, lease.LeaseID(p.Lease)
179 if txn == nil {
180 if leaseID != lease.NoLease {
181 if l := a.s.lessor.Lookup(leaseID); l == nil {
182 return nil, lease.ErrLeaseNotFound
183 }
184 }
185 txn = a.s.KV().Write()
186 defer txn.End()
187 }
188
189 var rr *mvcc.RangeResult
190 if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
191 rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
192 if err != nil {
193 return nil, err
194 }
195 }
196 if p.IgnoreValue || p.IgnoreLease {
197 if rr == nil || len(rr.KVs) == 0 {
198 // ignore_{lease,value} flag expects previous key-value pair
199 return nil, ErrKeyNotFound
200 }
201 }
202 if p.IgnoreValue {
203 val = rr.KVs[0].Value
204 }
205 if p.IgnoreLease {
206 leaseID = lease.LeaseID(rr.KVs[0].Lease)
207 }
208 if p.PrevKv {
209 if rr != nil && len(rr.KVs) != 0 {
210 resp.PrevKv = &rr.KVs[0]
211 }
212 }
213
214 resp.Header.Revision = txn.Put(p.Key, val, leaseID)
215 return resp, nil
216}
217
218func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
219 resp := &pb.DeleteRangeResponse{}
220 resp.Header = &pb.ResponseHeader{}
221 end := mkGteRange(dr.RangeEnd)
222
223 if txn == nil {
224 txn = a.s.kv.Write()
225 defer txn.End()
226 }
227
228 if dr.PrevKv {
229 rr, err := txn.Range(dr.Key, end, mvcc.RangeOptions{})
230 if err != nil {
231 return nil, err
232 }
233 if rr != nil {
234 resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
235 for i := range rr.KVs {
236 resp.PrevKvs[i] = &rr.KVs[i]
237 }
238 }
239 }
240
241 resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end)
242 return resp, nil
243}
244
245func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
246 resp := &pb.RangeResponse{}
247 resp.Header = &pb.ResponseHeader{}
248
249 if txn == nil {
250 txn = a.s.kv.Read()
251 defer txn.End()
252 }
253
254 limit := r.Limit
255 if r.SortOrder != pb.RangeRequest_NONE ||
256 r.MinModRevision != 0 || r.MaxModRevision != 0 ||
257 r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
258 // fetch everything; sort and truncate afterwards
259 limit = 0
260 }
261 if limit > 0 {
262 // fetch one extra for 'more' flag
263 limit = limit + 1
264 }
265
266 ro := mvcc.RangeOptions{
267 Limit: limit,
268 Rev: r.Revision,
269 Count: r.CountOnly,
270 }
271
272 rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro)
273 if err != nil {
274 return nil, err
275 }
276
277 if r.MaxModRevision != 0 {
278 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
279 pruneKVs(rr, f)
280 }
281 if r.MinModRevision != 0 {
282 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision }
283 pruneKVs(rr, f)
284 }
285 if r.MaxCreateRevision != 0 {
286 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision }
287 pruneKVs(rr, f)
288 }
289 if r.MinCreateRevision != 0 {
290 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
291 pruneKVs(rr, f)
292 }
293
294 sortOrder := r.SortOrder
295 if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
296 // Since current mvcc.Range implementation returns results
297 // sorted by keys in lexiographically ascending order,
298 // sort ASCEND by default only when target is not 'KEY'
299 sortOrder = pb.RangeRequest_ASCEND
300 }
301 if sortOrder != pb.RangeRequest_NONE {
302 var sorter sort.Interface
303 switch {
304 case r.SortTarget == pb.RangeRequest_KEY:
305 sorter = &kvSortByKey{&kvSort{rr.KVs}}
306 case r.SortTarget == pb.RangeRequest_VERSION:
307 sorter = &kvSortByVersion{&kvSort{rr.KVs}}
308 case r.SortTarget == pb.RangeRequest_CREATE:
309 sorter = &kvSortByCreate{&kvSort{rr.KVs}}
310 case r.SortTarget == pb.RangeRequest_MOD:
311 sorter = &kvSortByMod{&kvSort{rr.KVs}}
312 case r.SortTarget == pb.RangeRequest_VALUE:
313 sorter = &kvSortByValue{&kvSort{rr.KVs}}
314 }
315 switch {
316 case sortOrder == pb.RangeRequest_ASCEND:
317 sort.Sort(sorter)
318 case sortOrder == pb.RangeRequest_DESCEND:
319 sort.Sort(sort.Reverse(sorter))
320 }
321 }
322
323 if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
324 rr.KVs = rr.KVs[:r.Limit]
325 resp.More = true
326 }
327
328 resp.Header.Revision = rr.Rev
329 resp.Count = int64(rr.Count)
330 resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
331 for i := range rr.KVs {
332 if r.KeysOnly {
333 rr.KVs[i].Value = nil
334 }
335 resp.Kvs[i] = &rr.KVs[i]
336 }
337 return resp, nil
338}
339
340func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
341 isWrite := !isTxnReadonly(rt)
342 txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
343
344 txnPath := compareToPath(txn, rt)
345 if isWrite {
346 if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil {
347 txn.End()
348 return nil, err
349 }
350 }
351 if _, err := checkRequests(txn, rt, txnPath, a.checkRange); err != nil {
352 txn.End()
353 return nil, err
354 }
355
356 txnResp, _ := newTxnResp(rt, txnPath)
357
358 // When executing mutable txn ops, etcd must hold the txn lock so
359 // readers do not see any intermediate results. Since writes are
360 // serialized on the raft loop, the revision in the read view will
361 // be the revision of the write txn.
362 if isWrite {
363 txn.End()
364 txn = a.s.KV().Write()
365 }
366 a.applyTxn(txn, rt, txnPath, txnResp)
367 rev := txn.Rev()
368 if len(txn.Changes()) != 0 {
369 rev++
370 }
371 txn.End()
372
373 txnResp.Header.Revision = rev
374 return txnResp, nil
375}
376
377// newTxnResp allocates a txn response for a txn request given a path.
378func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txnCount int) {
379 reqs := rt.Success
380 if !txnPath[0] {
381 reqs = rt.Failure
382 }
383 resps := make([]*pb.ResponseOp, len(reqs))
384 txnResp = &pb.TxnResponse{
385 Responses: resps,
386 Succeeded: txnPath[0],
387 Header: &pb.ResponseHeader{},
388 }
389 for i, req := range reqs {
390 switch tv := req.Request.(type) {
391 case *pb.RequestOp_RequestRange:
392 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{}}
393 case *pb.RequestOp_RequestPut:
394 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{}}
395 case *pb.RequestOp_RequestDeleteRange:
396 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{}}
397 case *pb.RequestOp_RequestTxn:
398 resp, txns := newTxnResp(tv.RequestTxn, txnPath[1:])
399 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseTxn{ResponseTxn: resp}}
400 txnPath = txnPath[1+txns:]
401 txnCount += txns + 1
402 default:
403 }
404 }
405 return txnResp, txnCount
406}
407
408func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool {
409 txnPath := make([]bool, 1)
410 ops := rt.Success
411 if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] {
412 ops = rt.Failure
413 }
414 for _, op := range ops {
415 tv, ok := op.Request.(*pb.RequestOp_RequestTxn)
416 if !ok || tv.RequestTxn == nil {
417 continue
418 }
419 txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...)
420 }
421 return txnPath
422}
423
424func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool {
425 for _, c := range cmps {
426 if !applyCompare(rv, c) {
427 return false
428 }
429 }
430 return true
431}
432
433// applyCompare applies the compare request.
434// If the comparison succeeds, it returns true. Otherwise, returns false.
435func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
436 // TODO: possible optimizations
437 // * chunk reads for large ranges to conserve memory
438 // * rewrite rules for common patterns:
439 // ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
440 // * caching
441 rr, err := rv.Range(c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
442 if err != nil {
443 return false
444 }
445 if len(rr.KVs) == 0 {
446 if c.Target == pb.Compare_VALUE {
447 // Always fail if comparing a value on a key/keys that doesn't exist;
448 // nil == empty string in grpc; no way to represent missing value
449 return false
450 }
451 return compareKV(c, mvccpb.KeyValue{})
452 }
453 for _, kv := range rr.KVs {
454 if !compareKV(c, kv) {
455 return false
456 }
457 }
458 return true
459}
460
461func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
462 var result int
463 rev := int64(0)
464 switch c.Target {
465 case pb.Compare_VALUE:
466 v := []byte{}
467 if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
468 v = tv.Value
469 }
470 result = bytes.Compare(ckv.Value, v)
471 case pb.Compare_CREATE:
472 if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil {
473 rev = tv.CreateRevision
474 }
475 result = compareInt64(ckv.CreateRevision, rev)
476 case pb.Compare_MOD:
477 if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil {
478 rev = tv.ModRevision
479 }
480 result = compareInt64(ckv.ModRevision, rev)
481 case pb.Compare_VERSION:
482 if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil {
483 rev = tv.Version
484 }
485 result = compareInt64(ckv.Version, rev)
486 case pb.Compare_LEASE:
487 if tv, _ := c.TargetUnion.(*pb.Compare_Lease); tv != nil {
488 rev = tv.Lease
489 }
490 result = compareInt64(ckv.Lease, rev)
491 }
492 switch c.Result {
493 case pb.Compare_EQUAL:
494 return result == 0
495 case pb.Compare_NOT_EQUAL:
496 return result != 0
497 case pb.Compare_GREATER:
498 return result > 0
499 case pb.Compare_LESS:
500 return result < 0
501 }
502 return true
503}
504
505func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
506 reqs := rt.Success
507 if !txnPath[0] {
508 reqs = rt.Failure
509 }
510 for i, req := range reqs {
511 respi := tresp.Responses[i].Response
512 switch tv := req.Request.(type) {
513 case *pb.RequestOp_RequestRange:
514 resp, err := a.Range(txn, tv.RequestRange)
515 if err != nil {
516 plog.Panicf("unexpected error during txn: %v", err)
517 }
518 respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
519 case *pb.RequestOp_RequestPut:
520 resp, err := a.Put(txn, tv.RequestPut)
521 if err != nil {
522 plog.Panicf("unexpected error during txn: %v", err)
523 }
524 respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
525 case *pb.RequestOp_RequestDeleteRange:
526 resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
527 if err != nil {
528 plog.Panicf("unexpected error during txn: %v", err)
529 }
530 respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
531 case *pb.RequestOp_RequestTxn:
532 resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
533 applyTxns := a.applyTxn(txn, tv.RequestTxn, txnPath[1:], resp)
534 txns += applyTxns + 1
535 txnPath = txnPath[applyTxns+1:]
536 default:
537 // empty union
538 }
539 }
540 return txns
541}
542
543func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
544 resp := &pb.CompactionResponse{}
545 resp.Header = &pb.ResponseHeader{}
546 ch, err := a.s.KV().Compact(compaction.Revision)
547 if err != nil {
548 return nil, ch, err
549 }
550 // get the current revision. which key to get is not important.
551 rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{})
552 resp.Header.Revision = rr.Rev
553 return resp, ch, err
554}
555
556func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
557 l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL)
558 resp := &pb.LeaseGrantResponse{}
559 if err == nil {
560 resp.ID = int64(l.ID)
561 resp.TTL = l.TTL()
562 resp.Header = newHeader(a.s)
563 }
564 return resp, err
565}
566
567func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
568 err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
569 return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err
570}
571
572func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
573 resp := &pb.AlarmResponse{}
574 oldCount := len(a.s.alarmStore.Get(ar.Alarm))
575
576 switch ar.Action {
577 case pb.AlarmRequest_GET:
578 resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
579 case pb.AlarmRequest_ACTIVATE:
580 m := a.s.alarmStore.Activate(types.ID(ar.MemberID), ar.Alarm)
581 if m == nil {
582 break
583 }
584 resp.Alarms = append(resp.Alarms, m)
585 activated := oldCount == 0 && len(a.s.alarmStore.Get(m.Alarm)) == 1
586 if !activated {
587 break
588 }
589
590 plog.Warningf("alarm %v raised by peer %s", m.Alarm, types.ID(m.MemberID))
591 switch m.Alarm {
592 case pb.AlarmType_CORRUPT:
593 a.s.applyV3 = newApplierV3Corrupt(a)
594 case pb.AlarmType_NOSPACE:
595 a.s.applyV3 = newApplierV3Capped(a)
596 default:
597 plog.Errorf("unimplemented alarm activation (%+v)", m)
598 }
599 case pb.AlarmRequest_DEACTIVATE:
600 m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
601 if m == nil {
602 break
603 }
604 resp.Alarms = append(resp.Alarms, m)
605 deactivated := oldCount > 0 && len(a.s.alarmStore.Get(ar.Alarm)) == 0
606 if !deactivated {
607 break
608 }
609
610 switch m.Alarm {
611 case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT:
612 // TODO: check kv hash before deactivating CORRUPT?
613 plog.Infof("alarm disarmed %+v", ar)
614 a.s.applyV3 = a.s.newApplierV3()
615 default:
616 plog.Errorf("unimplemented alarm deactivation (%+v)", m)
617 }
618 default:
619 return nil, nil
620 }
621 return resp, nil
622}
623
624type applierV3Capped struct {
625 applierV3
626 q backendQuota
627}
628
629// newApplierV3Capped creates an applyV3 that will reject Puts and transactions
630// with Puts so that the number of keys in the store is capped.
631func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
632
633func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
634 return nil, ErrNoSpace
635}
636
637func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
638 if a.q.Cost(r) > 0 {
639 return nil, ErrNoSpace
640 }
641 return a.applierV3.Txn(r)
642}
643
644func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
645 return nil, ErrNoSpace
646}
647
648func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
649 err := a.s.AuthStore().AuthEnable()
650 if err != nil {
651 return nil, err
652 }
653 return &pb.AuthEnableResponse{Header: newHeader(a.s)}, nil
654}
655
656func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
657 a.s.AuthStore().AuthDisable()
658 return &pb.AuthDisableResponse{Header: newHeader(a.s)}, nil
659}
660
661func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
662 ctx := context.WithValue(context.WithValue(a.s.ctx, auth.AuthenticateParamIndex{}, a.s.consistIndex.ConsistentIndex()), auth.AuthenticateParamSimpleTokenPrefix{}, r.SimpleToken)
663 resp, err := a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
664 if resp != nil {
665 resp.Header = newHeader(a.s)
666 }
667 return resp, err
668}
669
670func (a *applierV3backend) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
671 resp, err := a.s.AuthStore().UserAdd(r)
672 if resp != nil {
673 resp.Header = newHeader(a.s)
674 }
675 return resp, err
676}
677
678func (a *applierV3backend) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
679 resp, err := a.s.AuthStore().UserDelete(r)
680 if resp != nil {
681 resp.Header = newHeader(a.s)
682 }
683 return resp, err
684}
685
686func (a *applierV3backend) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
687 resp, err := a.s.AuthStore().UserChangePassword(r)
688 if resp != nil {
689 resp.Header = newHeader(a.s)
690 }
691 return resp, err
692}
693
694func (a *applierV3backend) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
695 resp, err := a.s.AuthStore().UserGrantRole(r)
696 if resp != nil {
697 resp.Header = newHeader(a.s)
698 }
699 return resp, err
700}
701
702func (a *applierV3backend) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
703 resp, err := a.s.AuthStore().UserGet(r)
704 if resp != nil {
705 resp.Header = newHeader(a.s)
706 }
707 return resp, err
708}
709
710func (a *applierV3backend) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
711 resp, err := a.s.AuthStore().UserRevokeRole(r)
712 if resp != nil {
713 resp.Header = newHeader(a.s)
714 }
715 return resp, err
716}
717
718func (a *applierV3backend) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
719 resp, err := a.s.AuthStore().RoleAdd(r)
720 if resp != nil {
721 resp.Header = newHeader(a.s)
722 }
723 return resp, err
724}
725
726func (a *applierV3backend) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
727 resp, err := a.s.AuthStore().RoleGrantPermission(r)
728 if resp != nil {
729 resp.Header = newHeader(a.s)
730 }
731 return resp, err
732}
733
734func (a *applierV3backend) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
735 resp, err := a.s.AuthStore().RoleGet(r)
736 if resp != nil {
737 resp.Header = newHeader(a.s)
738 }
739 return resp, err
740}
741
742func (a *applierV3backend) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
743 resp, err := a.s.AuthStore().RoleRevokePermission(r)
744 if resp != nil {
745 resp.Header = newHeader(a.s)
746 }
747 return resp, err
748}
749
750func (a *applierV3backend) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
751 resp, err := a.s.AuthStore().RoleDelete(r)
752 if resp != nil {
753 resp.Header = newHeader(a.s)
754 }
755 return resp, err
756}
757
758func (a *applierV3backend) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
759 resp, err := a.s.AuthStore().UserList(r)
760 if resp != nil {
761 resp.Header = newHeader(a.s)
762 }
763 return resp, err
764}
765
766func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
767 resp, err := a.s.AuthStore().RoleList(r)
768 if resp != nil {
769 resp.Header = newHeader(a.s)
770 }
771 return resp, err
772}
773
774type quotaApplierV3 struct {
775 applierV3
776 q Quota
777}
778
779func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
780 return &quotaApplierV3{app, NewBackendQuota(s)}
781}
782
783func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
784 ok := a.q.Available(p)
785 resp, err := a.applierV3.Put(txn, p)
786 if err == nil && !ok {
787 err = ErrNoSpace
788 }
789 return resp, err
790}
791
792func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
793 ok := a.q.Available(rt)
794 resp, err := a.applierV3.Txn(rt)
795 if err == nil && !ok {
796 err = ErrNoSpace
797 }
798 return resp, err
799}
800
801func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
802 ok := a.q.Available(lc)
803 resp, err := a.applierV3.LeaseGrant(lc)
804 if err == nil && !ok {
805 err = ErrNoSpace
806 }
807 return resp, err
808}
809
810type kvSort struct{ kvs []mvccpb.KeyValue }
811
812func (s *kvSort) Swap(i, j int) {
813 t := s.kvs[i]
814 s.kvs[i] = s.kvs[j]
815 s.kvs[j] = t
816}
817func (s *kvSort) Len() int { return len(s.kvs) }
818
819type kvSortByKey struct{ *kvSort }
820
821func (s *kvSortByKey) Less(i, j int) bool {
822 return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
823}
824
825type kvSortByVersion struct{ *kvSort }
826
827func (s *kvSortByVersion) Less(i, j int) bool {
828 return (s.kvs[i].Version - s.kvs[j].Version) < 0
829}
830
831type kvSortByCreate struct{ *kvSort }
832
833func (s *kvSortByCreate) Less(i, j int) bool {
834 return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
835}
836
837type kvSortByMod struct{ *kvSort }
838
839func (s *kvSortByMod) Less(i, j int) bool {
840 return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
841}
842
843type kvSortByValue struct{ *kvSort }
844
845func (s *kvSortByValue) Less(i, j int) bool {
846 return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
847}
848
849func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) {
850 txnCount := 0
851 reqs := rt.Success
852 if !txnPath[0] {
853 reqs = rt.Failure
854 }
855 for _, req := range reqs {
856 if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil {
857 txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f)
858 if err != nil {
859 return 0, err
860 }
861 txnCount += txns + 1
862 txnPath = txnPath[txns+1:]
863 continue
864 }
865 if err := f(rv, req); err != nil {
866 return 0, err
867 }
868 }
869 return txnCount, nil
870}
871
872func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
873 tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut)
874 if !ok || tv.RequestPut == nil {
875 return nil
876 }
877 req := tv.RequestPut
878 if req.IgnoreValue || req.IgnoreLease {
879 // expects previous key-value, error if not exist
880 rr, err := rv.Range(req.Key, nil, mvcc.RangeOptions{})
881 if err != nil {
882 return err
883 }
884 if rr == nil || len(rr.KVs) == 0 {
885 return ErrKeyNotFound
886 }
887 }
888 if lease.LeaseID(req.Lease) != lease.NoLease {
889 if l := a.s.lessor.Lookup(lease.LeaseID(req.Lease)); l == nil {
890 return lease.ErrLeaseNotFound
891 }
892 }
893 return nil
894}
895
896func (a *applierV3backend) checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
897 tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange)
898 if !ok || tv.RequestRange == nil {
899 return nil
900 }
901 req := tv.RequestRange
902 switch {
903 case req.Revision == 0:
904 return nil
905 case req.Revision > rv.Rev():
906 return mvcc.ErrFutureRev
907 case req.Revision < rv.FirstRev():
908 return mvcc.ErrCompacted
909 }
910 return nil
911}
912
// compareInt64 returns -1 when a < b, 1 when a > b and 0 when they are equal.
func compareInt64(a, b int64) int {
	if a < b {
		return -1
	}
	if a > b {
		return 1
	}
	return 0
}
923
// mkGteRange determines if the range end is a >= range. This works around grpc
// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
// If it is a GTE range, then []byte{} is returned to indicate the empty byte
// string (vs nil being no byte string). Any other range end is returned
// unchanged.
func mkGteRange(rangeEnd []byte) []byte {
	isGte := len(rangeEnd) == 1 && rangeEnd[0] == 0
	if !isGte {
		return rangeEnd
	}
	return []byte{}
}
934
935func noSideEffect(r *pb.InternalRaftRequest) bool {
936 return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil
937}
938
939func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
940 f := func(ops []*pb.RequestOp) []*pb.RequestOp {
941 j := 0
942 for i := 0; i < len(ops); i++ {
943 if _, ok := ops[i].Request.(*pb.RequestOp_RequestRange); ok {
944 continue
945 }
946 ops[j] = ops[i]
947 j++
948 }
949
950 return ops[:j]
951 }
952
953 txn.Success = f(txn.Success)
954 txn.Failure = f(txn.Failure)
955}
956
957func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
958 j := 0
959 for i := range rr.KVs {
960 rr.KVs[j] = rr.KVs[i]
961 if !isPrunable(&rr.KVs[i]) {
962 j++
963 }
964 }
965 rr.KVs = rr.KVs[:j]
966}
967
968func newHeader(s *EtcdServer) *pb.ResponseHeader {
969 return &pb.ResponseHeader{
970 ClusterId: uint64(s.Cluster().ID()),
971 MemberId: uint64(s.ID()),
972 Revision: s.KV().Rev(),
973 RaftTerm: s.Term(),
974 }
975}