blob: 93e78e390c80b0d155775a65131cb3bb6a7772a3 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "bytes"
19 "context"
20 "sort"
21 "time"
22
23 "github.com/coreos/etcd/auth"
24 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
25 "github.com/coreos/etcd/lease"
26 "github.com/coreos/etcd/mvcc"
27 "github.com/coreos/etcd/mvcc/mvccpb"
28 "github.com/coreos/etcd/pkg/types"
29
30 "github.com/gogo/protobuf/proto"
31)
32
// warnApplyDuration is a 100ms threshold for apply-duration warnings. It is
// not referenced in this part of the file; presumably it is consumed by
// warnOfExpensiveRequest (TODO confirm).
const warnApplyDuration = 100 * time.Millisecond
36
37type applyResult struct {
38 resp proto.Message
39 err error
40 // physc signals the physical effect of the request has completed in addition
41 // to being logically reflected by the node. Currently only used for
42 // Compaction requests.
43 physc <-chan struct{}
44}
45
46// applierV3 is the interface for processing V3 raft messages
47type applierV3 interface {
48 Apply(r *pb.InternalRaftRequest) *applyResult
49
50 Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
51 Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
52 DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
53 Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
54 Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
55
56 LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
57 LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
58
59 Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error)
60
61 Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error)
62
63 AuthEnable() (*pb.AuthEnableResponse, error)
64 AuthDisable() (*pb.AuthDisableResponse, error)
65
66 UserAdd(ua *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
67 UserDelete(ua *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
68 UserChangePassword(ua *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
69 UserGrantRole(ua *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
70 UserGet(ua *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
71 UserRevokeRole(ua *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
72 RoleAdd(ua *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
73 RoleGrantPermission(ua *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
74 RoleGet(ua *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
75 RoleRevokePermission(ua *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
76 RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
77 UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
78 RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
79}
80
81type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error
82
83type applierV3backend struct {
84 s *EtcdServer
85
86 checkPut checkReqFunc
87 checkRange checkReqFunc
88}
89
90func (s *EtcdServer) newApplierV3Backend() applierV3 {
91 base := &applierV3backend{s: s}
92 base.checkPut = func(rv mvcc.ReadView, req *pb.RequestOp) error {
93 return base.checkRequestPut(rv, req)
94 }
95 base.checkRange = func(rv mvcc.ReadView, req *pb.RequestOp) error {
96 return base.checkRequestRange(rv, req)
97 }
98 return base
99}
100
101func (s *EtcdServer) newApplierV3() applierV3 {
102 return newAuthApplierV3(
103 s.AuthStore(),
104 newQuotaApplierV3(s, s.newApplierV3Backend()),
105 s.lessor,
106 )
107}
108
// Apply dispatches a single raft request to the matching applierV3 method
// and records the response (and, for compactions, the physical-completion
// channel) in the returned applyResult.
//
// Exactly one case runs: the first non-nil request field, in the order listed
// below, selects the handler. A request with none of these fields set panics
// with "not implemented". A deferred call passes the start time, the request,
// and the final response/error to warnOfExpensiveRequest.
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
	ar := &applyResult{}
	defer func(start time.Time) {
		warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
	}(time.Now())

	// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
	switch {
	case r.Range != nil:
		ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
	case r.Put != nil:
		ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
	case r.DeleteRange != nil:
		ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
	case r.Txn != nil:
		ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
	case r.Compaction != nil:
		// Compaction additionally returns a channel signalling physical completion.
		ar.resp, ar.physc, ar.err = a.s.applyV3.Compaction(r.Compaction)
	case r.LeaseGrant != nil:
		ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
	case r.LeaseRevoke != nil:
		ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
	case r.Alarm != nil:
		ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
	case r.Authenticate != nil:
		ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
	case r.AuthEnable != nil:
		ar.resp, ar.err = a.s.applyV3.AuthEnable()
	case r.AuthDisable != nil:
		ar.resp, ar.err = a.s.applyV3.AuthDisable()
	case r.AuthUserAdd != nil:
		ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
	case r.AuthUserDelete != nil:
		ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete)
	case r.AuthUserChangePassword != nil:
		ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword)
	case r.AuthUserGrantRole != nil:
		ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole)
	case r.AuthUserGet != nil:
		ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet)
	case r.AuthUserRevokeRole != nil:
		ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole)
	case r.AuthRoleAdd != nil:
		ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd)
	case r.AuthRoleGrantPermission != nil:
		ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)
	case r.AuthRoleGet != nil:
		ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet)
	case r.AuthRoleRevokePermission != nil:
		ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)
	case r.AuthRoleDelete != nil:
		ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete)
	case r.AuthUserList != nil:
		ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)
	case r.AuthRoleList != nil:
		ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
	default:
		// No recognized request field is set.
		panic("not implemented")
	}
	return ar
}
170
171func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
172 resp = &pb.PutResponse{}
173 resp.Header = &pb.ResponseHeader{}
174
175 val, leaseID := p.Value, lease.LeaseID(p.Lease)
176 if txn == nil {
177 if leaseID != lease.NoLease {
178 if l := a.s.lessor.Lookup(leaseID); l == nil {
179 return nil, lease.ErrLeaseNotFound
180 }
181 }
182 txn = a.s.KV().Write()
183 defer txn.End()
184 }
185
186 var rr *mvcc.RangeResult
187 if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
188 rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
189 if err != nil {
190 return nil, err
191 }
192 }
193 if p.IgnoreValue || p.IgnoreLease {
194 if rr == nil || len(rr.KVs) == 0 {
195 // ignore_{lease,value} flag expects previous key-value pair
196 return nil, ErrKeyNotFound
197 }
198 }
199 if p.IgnoreValue {
200 val = rr.KVs[0].Value
201 }
202 if p.IgnoreLease {
203 leaseID = lease.LeaseID(rr.KVs[0].Lease)
204 }
205 if p.PrevKv {
206 if rr != nil && len(rr.KVs) != 0 {
207 resp.PrevKv = &rr.KVs[0]
208 }
209 }
210
211 resp.Header.Revision = txn.Put(p.Key, val, leaseID)
212 return resp, nil
213}
214
215func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
216 resp := &pb.DeleteRangeResponse{}
217 resp.Header = &pb.ResponseHeader{}
218 end := mkGteRange(dr.RangeEnd)
219
220 if txn == nil {
221 txn = a.s.kv.Write()
222 defer txn.End()
223 }
224
225 if dr.PrevKv {
226 rr, err := txn.Range(dr.Key, end, mvcc.RangeOptions{})
227 if err != nil {
228 return nil, err
229 }
230 if rr != nil {
231 resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
232 for i := range rr.KVs {
233 resp.PrevKvs[i] = &rr.KVs[i]
234 }
235 }
236 }
237
238 resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end)
239 return resp, nil
240}
241
242func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
243 resp := &pb.RangeResponse{}
244 resp.Header = &pb.ResponseHeader{}
245
246 if txn == nil {
247 txn = a.s.kv.Read()
248 defer txn.End()
249 }
250
251 limit := r.Limit
252 if r.SortOrder != pb.RangeRequest_NONE ||
253 r.MinModRevision != 0 || r.MaxModRevision != 0 ||
254 r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
255 // fetch everything; sort and truncate afterwards
256 limit = 0
257 }
258 if limit > 0 {
259 // fetch one extra for 'more' flag
260 limit = limit + 1
261 }
262
263 ro := mvcc.RangeOptions{
264 Limit: limit,
265 Rev: r.Revision,
266 Count: r.CountOnly,
267 }
268
269 rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro)
270 if err != nil {
271 return nil, err
272 }
273
274 if r.MaxModRevision != 0 {
275 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
276 pruneKVs(rr, f)
277 }
278 if r.MinModRevision != 0 {
279 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision }
280 pruneKVs(rr, f)
281 }
282 if r.MaxCreateRevision != 0 {
283 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision }
284 pruneKVs(rr, f)
285 }
286 if r.MinCreateRevision != 0 {
287 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
288 pruneKVs(rr, f)
289 }
290
291 sortOrder := r.SortOrder
292 if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
293 // Since current mvcc.Range implementation returns results
294 // sorted by keys in lexiographically ascending order,
295 // sort ASCEND by default only when target is not 'KEY'
296 sortOrder = pb.RangeRequest_ASCEND
297 }
298 if sortOrder != pb.RangeRequest_NONE {
299 var sorter sort.Interface
300 switch {
301 case r.SortTarget == pb.RangeRequest_KEY:
302 sorter = &kvSortByKey{&kvSort{rr.KVs}}
303 case r.SortTarget == pb.RangeRequest_VERSION:
304 sorter = &kvSortByVersion{&kvSort{rr.KVs}}
305 case r.SortTarget == pb.RangeRequest_CREATE:
306 sorter = &kvSortByCreate{&kvSort{rr.KVs}}
307 case r.SortTarget == pb.RangeRequest_MOD:
308 sorter = &kvSortByMod{&kvSort{rr.KVs}}
309 case r.SortTarget == pb.RangeRequest_VALUE:
310 sorter = &kvSortByValue{&kvSort{rr.KVs}}
311 }
312 switch {
313 case sortOrder == pb.RangeRequest_ASCEND:
314 sort.Sort(sorter)
315 case sortOrder == pb.RangeRequest_DESCEND:
316 sort.Sort(sort.Reverse(sorter))
317 }
318 }
319
320 if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
321 rr.KVs = rr.KVs[:r.Limit]
322 resp.More = true
323 }
324
325 resp.Header.Revision = rr.Rev
326 resp.Count = int64(rr.Count)
327 resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
328 for i := range rr.KVs {
329 if r.KeysOnly {
330 rr.KVs[i].Value = nil
331 }
332 resp.Kvs[i] = &rr.KVs[i]
333 }
334 return resp, nil
335}
336
337func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
338 isWrite := !isTxnReadonly(rt)
339 txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
340
341 txnPath := compareToPath(txn, rt)
342 if isWrite {
343 if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil {
344 txn.End()
345 return nil, err
346 }
347 }
348 if _, err := checkRequests(txn, rt, txnPath, a.checkRange); err != nil {
349 txn.End()
350 return nil, err
351 }
352
353 txnResp, _ := newTxnResp(rt, txnPath)
354
355 // When executing mutable txn ops, etcd must hold the txn lock so
356 // readers do not see any intermediate results. Since writes are
357 // serialized on the raft loop, the revision in the read view will
358 // be the revision of the write txn.
359 if isWrite {
360 txn.End()
361 txn = a.s.KV().Write()
362 }
363 a.applyTxn(txn, rt, txnPath, txnResp)
364 rev := txn.Rev()
365 if len(txn.Changes()) != 0 {
366 rev++
367 }
368 txn.End()
369
370 txnResp.Header.Revision = rev
371 return txnResp, nil
372}
373
374// newTxnResp allocates a txn response for a txn request given a path.
375func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txnCount int) {
376 reqs := rt.Success
377 if !txnPath[0] {
378 reqs = rt.Failure
379 }
380 resps := make([]*pb.ResponseOp, len(reqs))
381 txnResp = &pb.TxnResponse{
382 Responses: resps,
383 Succeeded: txnPath[0],
384 Header: &pb.ResponseHeader{},
385 }
386 for i, req := range reqs {
387 switch tv := req.Request.(type) {
388 case *pb.RequestOp_RequestRange:
389 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{}}
390 case *pb.RequestOp_RequestPut:
391 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{}}
392 case *pb.RequestOp_RequestDeleteRange:
393 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{}}
394 case *pb.RequestOp_RequestTxn:
395 resp, txns := newTxnResp(tv.RequestTxn, txnPath[1:])
396 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseTxn{ResponseTxn: resp}}
397 txnPath = txnPath[1+txns:]
398 txnCount += txns + 1
399 default:
400 }
401 }
402 return txnResp, txnCount
403}
404
405func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool {
406 txnPath := make([]bool, 1)
407 ops := rt.Success
408 if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] {
409 ops = rt.Failure
410 }
411 for _, op := range ops {
412 tv, ok := op.Request.(*pb.RequestOp_RequestTxn)
413 if !ok || tv.RequestTxn == nil {
414 continue
415 }
416 txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...)
417 }
418 return txnPath
419}
420
421func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool {
422 for _, c := range cmps {
423 if !applyCompare(rv, c) {
424 return false
425 }
426 }
427 return true
428}
429
430// applyCompare applies the compare request.
431// If the comparison succeeds, it returns true. Otherwise, returns false.
432func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
433 // TODO: possible optimizations
434 // * chunk reads for large ranges to conserve memory
435 // * rewrite rules for common patterns:
436 // ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
437 // * caching
438 rr, err := rv.Range(c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
439 if err != nil {
440 return false
441 }
442 if len(rr.KVs) == 0 {
443 if c.Target == pb.Compare_VALUE {
444 // Always fail if comparing a value on a key/keys that doesn't exist;
445 // nil == empty string in grpc; no way to represent missing value
446 return false
447 }
448 return compareKV(c, mvccpb.KeyValue{})
449 }
450 for _, kv := range rr.KVs {
451 if !compareKV(c, kv) {
452 return false
453 }
454 }
455 return true
456}
457
458func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
459 var result int
460 rev := int64(0)
461 switch c.Target {
462 case pb.Compare_VALUE:
463 v := []byte{}
464 if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
465 v = tv.Value
466 }
467 result = bytes.Compare(ckv.Value, v)
468 case pb.Compare_CREATE:
469 if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil {
470 rev = tv.CreateRevision
471 }
472 result = compareInt64(ckv.CreateRevision, rev)
473 case pb.Compare_MOD:
474 if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil {
475 rev = tv.ModRevision
476 }
477 result = compareInt64(ckv.ModRevision, rev)
478 case pb.Compare_VERSION:
479 if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil {
480 rev = tv.Version
481 }
482 result = compareInt64(ckv.Version, rev)
483 case pb.Compare_LEASE:
484 if tv, _ := c.TargetUnion.(*pb.Compare_Lease); tv != nil {
485 rev = tv.Lease
486 }
487 result = compareInt64(ckv.Lease, rev)
488 }
489 switch c.Result {
490 case pb.Compare_EQUAL:
491 return result == 0
492 case pb.Compare_NOT_EQUAL:
493 return result != 0
494 case pb.Compare_GREATER:
495 return result > 0
496 case pb.Compare_LESS:
497 return result < 0
498 }
499 return true
500}
501
502func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
503 reqs := rt.Success
504 if !txnPath[0] {
505 reqs = rt.Failure
506 }
507 for i, req := range reqs {
508 respi := tresp.Responses[i].Response
509 switch tv := req.Request.(type) {
510 case *pb.RequestOp_RequestRange:
511 resp, err := a.Range(txn, tv.RequestRange)
512 if err != nil {
513 plog.Panicf("unexpected error during txn: %v", err)
514 }
515 respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
516 case *pb.RequestOp_RequestPut:
517 resp, err := a.Put(txn, tv.RequestPut)
518 if err != nil {
519 plog.Panicf("unexpected error during txn: %v", err)
520 }
521 respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
522 case *pb.RequestOp_RequestDeleteRange:
523 resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
524 if err != nil {
525 plog.Panicf("unexpected error during txn: %v", err)
526 }
527 respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
528 case *pb.RequestOp_RequestTxn:
529 resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
530 applyTxns := a.applyTxn(txn, tv.RequestTxn, txnPath[1:], resp)
531 txns += applyTxns + 1
532 txnPath = txnPath[applyTxns+1:]
533 default:
534 // empty union
535 }
536 }
537 return txns
538}
539
540func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
541 resp := &pb.CompactionResponse{}
542 resp.Header = &pb.ResponseHeader{}
543 ch, err := a.s.KV().Compact(compaction.Revision)
544 if err != nil {
545 return nil, ch, err
546 }
547 // get the current revision. which key to get is not important.
548 rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{})
549 resp.Header.Revision = rr.Rev
550 return resp, ch, err
551}
552
553func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
554 l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL)
555 resp := &pb.LeaseGrantResponse{}
556 if err == nil {
557 resp.ID = int64(l.ID)
558 resp.TTL = l.TTL()
559 resp.Header = newHeader(a.s)
560 }
561 return resp, err
562}
563
564func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
565 err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
566 return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err
567}
568
569func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
570 resp := &pb.AlarmResponse{}
571 oldCount := len(a.s.alarmStore.Get(ar.Alarm))
572
573 switch ar.Action {
574 case pb.AlarmRequest_GET:
575 resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
576 case pb.AlarmRequest_ACTIVATE:
577 m := a.s.alarmStore.Activate(types.ID(ar.MemberID), ar.Alarm)
578 if m == nil {
579 break
580 }
581 resp.Alarms = append(resp.Alarms, m)
582 activated := oldCount == 0 && len(a.s.alarmStore.Get(m.Alarm)) == 1
583 if !activated {
584 break
585 }
586
587 plog.Warningf("alarm %v raised by peer %s", m.Alarm, types.ID(m.MemberID))
588 switch m.Alarm {
589 case pb.AlarmType_CORRUPT:
590 a.s.applyV3 = newApplierV3Corrupt(a)
591 case pb.AlarmType_NOSPACE:
592 a.s.applyV3 = newApplierV3Capped(a)
593 default:
594 plog.Errorf("unimplemented alarm activation (%+v)", m)
595 }
596 case pb.AlarmRequest_DEACTIVATE:
597 m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
598 if m == nil {
599 break
600 }
601 resp.Alarms = append(resp.Alarms, m)
602 deactivated := oldCount > 0 && len(a.s.alarmStore.Get(ar.Alarm)) == 0
603 if !deactivated {
604 break
605 }
606
607 switch m.Alarm {
608 case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT:
609 // TODO: check kv hash before deactivating CORRUPT?
610 plog.Infof("alarm disarmed %+v", ar)
611 a.s.applyV3 = a.s.newApplierV3()
612 default:
613 plog.Errorf("unimplemented alarm deactivation (%+v)", m)
614 }
615 default:
616 return nil, nil
617 }
618 return resp, nil
619}
620
621type applierV3Capped struct {
622 applierV3
623 q backendQuota
624}
625
626// newApplierV3Capped creates an applyV3 that will reject Puts and transactions
627// with Puts so that the number of keys in the store is capped.
628func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
629
630func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
631 return nil, ErrNoSpace
632}
633
634func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
635 if a.q.Cost(r) > 0 {
636 return nil, ErrNoSpace
637 }
638 return a.applierV3.Txn(r)
639}
640
641func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
642 return nil, ErrNoSpace
643}
644
645func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
646 err := a.s.AuthStore().AuthEnable()
647 if err != nil {
648 return nil, err
649 }
650 return &pb.AuthEnableResponse{Header: newHeader(a.s)}, nil
651}
652
653func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
654 a.s.AuthStore().AuthDisable()
655 return &pb.AuthDisableResponse{Header: newHeader(a.s)}, nil
656}
657
658func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
659 ctx := context.WithValue(context.WithValue(a.s.ctx, auth.AuthenticateParamIndex{}, a.s.consistIndex.ConsistentIndex()), auth.AuthenticateParamSimpleTokenPrefix{}, r.SimpleToken)
660 resp, err := a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
661 if resp != nil {
662 resp.Header = newHeader(a.s)
663 }
664 return resp, err
665}
666
667func (a *applierV3backend) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
668 resp, err := a.s.AuthStore().UserAdd(r)
669 if resp != nil {
670 resp.Header = newHeader(a.s)
671 }
672 return resp, err
673}
674
675func (a *applierV3backend) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
676 resp, err := a.s.AuthStore().UserDelete(r)
677 if resp != nil {
678 resp.Header = newHeader(a.s)
679 }
680 return resp, err
681}
682
683func (a *applierV3backend) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
684 resp, err := a.s.AuthStore().UserChangePassword(r)
685 if resp != nil {
686 resp.Header = newHeader(a.s)
687 }
688 return resp, err
689}
690
691func (a *applierV3backend) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
692 resp, err := a.s.AuthStore().UserGrantRole(r)
693 if resp != nil {
694 resp.Header = newHeader(a.s)
695 }
696 return resp, err
697}
698
699func (a *applierV3backend) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
700 resp, err := a.s.AuthStore().UserGet(r)
701 if resp != nil {
702 resp.Header = newHeader(a.s)
703 }
704 return resp, err
705}
706
707func (a *applierV3backend) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
708 resp, err := a.s.AuthStore().UserRevokeRole(r)
709 if resp != nil {
710 resp.Header = newHeader(a.s)
711 }
712 return resp, err
713}
714
715func (a *applierV3backend) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
716 resp, err := a.s.AuthStore().RoleAdd(r)
717 if resp != nil {
718 resp.Header = newHeader(a.s)
719 }
720 return resp, err
721}
722
723func (a *applierV3backend) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
724 resp, err := a.s.AuthStore().RoleGrantPermission(r)
725 if resp != nil {
726 resp.Header = newHeader(a.s)
727 }
728 return resp, err
729}
730
731func (a *applierV3backend) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
732 resp, err := a.s.AuthStore().RoleGet(r)
733 if resp != nil {
734 resp.Header = newHeader(a.s)
735 }
736 return resp, err
737}
738
739func (a *applierV3backend) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
740 resp, err := a.s.AuthStore().RoleRevokePermission(r)
741 if resp != nil {
742 resp.Header = newHeader(a.s)
743 }
744 return resp, err
745}
746
747func (a *applierV3backend) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
748 resp, err := a.s.AuthStore().RoleDelete(r)
749 if resp != nil {
750 resp.Header = newHeader(a.s)
751 }
752 return resp, err
753}
754
755func (a *applierV3backend) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
756 resp, err := a.s.AuthStore().UserList(r)
757 if resp != nil {
758 resp.Header = newHeader(a.s)
759 }
760 return resp, err
761}
762
763func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
764 resp, err := a.s.AuthStore().RoleList(r)
765 if resp != nil {
766 resp.Header = newHeader(a.s)
767 }
768 return resp, err
769}
770
771type quotaApplierV3 struct {
772 applierV3
773 q Quota
774}
775
776func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
777 return &quotaApplierV3{app, NewBackendQuota(s)}
778}
779
780func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
781 ok := a.q.Available(p)
782 resp, err := a.applierV3.Put(txn, p)
783 if err == nil && !ok {
784 err = ErrNoSpace
785 }
786 return resp, err
787}
788
789func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
790 ok := a.q.Available(rt)
791 resp, err := a.applierV3.Txn(rt)
792 if err == nil && !ok {
793 err = ErrNoSpace
794 }
795 return resp, err
796}
797
798func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
799 ok := a.q.Available(lc)
800 resp, err := a.applierV3.LeaseGrant(lc)
801 if err == nil && !ok {
802 err = ErrNoSpace
803 }
804 return resp, err
805}
806
807type kvSort struct{ kvs []mvccpb.KeyValue }
808
809func (s *kvSort) Swap(i, j int) {
810 t := s.kvs[i]
811 s.kvs[i] = s.kvs[j]
812 s.kvs[j] = t
813}
814func (s *kvSort) Len() int { return len(s.kvs) }
815
816type kvSortByKey struct{ *kvSort }
817
818func (s *kvSortByKey) Less(i, j int) bool {
819 return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
820}
821
822type kvSortByVersion struct{ *kvSort }
823
824func (s *kvSortByVersion) Less(i, j int) bool {
825 return (s.kvs[i].Version - s.kvs[j].Version) < 0
826}
827
828type kvSortByCreate struct{ *kvSort }
829
830func (s *kvSortByCreate) Less(i, j int) bool {
831 return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
832}
833
834type kvSortByMod struct{ *kvSort }
835
836func (s *kvSortByMod) Less(i, j int) bool {
837 return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
838}
839
840type kvSortByValue struct{ *kvSort }
841
842func (s *kvSortByValue) Less(i, j int) bool {
843 return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
844}
845
846func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) {
847 txnCount := 0
848 reqs := rt.Success
849 if !txnPath[0] {
850 reqs = rt.Failure
851 }
852 for _, req := range reqs {
853 if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil {
854 txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f)
855 if err != nil {
856 return 0, err
857 }
858 txnCount += txns + 1
859 txnPath = txnPath[txns+1:]
860 continue
861 }
862 if err := f(rv, req); err != nil {
863 return 0, err
864 }
865 }
866 return txnCount, nil
867}
868
869func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
870 tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut)
871 if !ok || tv.RequestPut == nil {
872 return nil
873 }
874 req := tv.RequestPut
875 if req.IgnoreValue || req.IgnoreLease {
876 // expects previous key-value, error if not exist
877 rr, err := rv.Range(req.Key, nil, mvcc.RangeOptions{})
878 if err != nil {
879 return err
880 }
881 if rr == nil || len(rr.KVs) == 0 {
882 return ErrKeyNotFound
883 }
884 }
885 if lease.LeaseID(req.Lease) != lease.NoLease {
886 if l := a.s.lessor.Lookup(lease.LeaseID(req.Lease)); l == nil {
887 return lease.ErrLeaseNotFound
888 }
889 }
890 return nil
891}
892
893func (a *applierV3backend) checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
894 tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange)
895 if !ok || tv.RequestRange == nil {
896 return nil
897 }
898 req := tv.RequestRange
899 switch {
900 case req.Revision == 0:
901 return nil
902 case req.Revision > rv.Rev():
903 return mvcc.ErrFutureRev
904 case req.Revision < rv.FirstRev():
905 return mvcc.ErrCompacted
906 }
907 return nil
908}
909
// compareInt64 returns -1, 0 or 1 as a is less than, equal to, or greater
// than b.
func compareInt64(a, b int64) int {
	if a < b {
		return -1
	}
	if a > b {
		return 1
	}
	return 0
}
920
// mkGteRange determines if the range end is a >= range. This works around grpc
// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
// If it is a GTE range, then []byte{} is returned to indicate the empty byte
// string (vs nil being no byte string). Any other rangeEnd is returned
// unchanged (same slice).
func mkGteRange(rangeEnd []byte) []byte {
	isGteMarker := len(rangeEnd) == 1 && rangeEnd[0] == 0
	if !isGteMarker {
		return rangeEnd
	}
	return []byte{}
}
931
932func noSideEffect(r *pb.InternalRaftRequest) bool {
933 return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil
934}
935
936func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
937 f := func(ops []*pb.RequestOp) []*pb.RequestOp {
938 j := 0
939 for i := 0; i < len(ops); i++ {
940 if _, ok := ops[i].Request.(*pb.RequestOp_RequestRange); ok {
941 continue
942 }
943 ops[j] = ops[i]
944 j++
945 }
946
947 return ops[:j]
948 }
949
950 txn.Success = f(txn.Success)
951 txn.Failure = f(txn.Failure)
952}
953
954func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
955 j := 0
956 for i := range rr.KVs {
957 rr.KVs[j] = rr.KVs[i]
958 if !isPrunable(&rr.KVs[i]) {
959 j++
960 }
961 }
962 rr.KVs = rr.KVs[:j]
963}
964
965func newHeader(s *EtcdServer) *pb.ResponseHeader {
966 return &pb.ResponseHeader{
967 ClusterId: uint64(s.Cluster().ID()),
968 MemberId: uint64(s.ID()),
969 Revision: s.KV().Rev(),
970 RaftTerm: s.Term(),
971 }
972}