blob: 74e679c3b03f578dd41881fb285bebcf0c53ce96 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "bytes"
19 "context"
20 "encoding/binary"
21 "time"
22
23 "github.com/coreos/etcd/auth"
24 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
25 "github.com/coreos/etcd/etcdserver/membership"
26 "github.com/coreos/etcd/lease"
27 "github.com/coreos/etcd/lease/leasehttp"
28 "github.com/coreos/etcd/mvcc"
29 "github.com/coreos/etcd/raft"
30
31 "github.com/gogo/protobuf/proto"
32)
33
34const (
35 // In the health case, there might be a small gap (10s of entries) between
36 // the applied index and committed index.
37 // However, if the committed entries are very heavy to apply, the gap might grow.
38 // We should stop accepting new proposals if the gap growing to a certain point.
39 maxGapBetweenApplyAndCommitIndex = 5000
40)
41
42type RaftKV interface {
43 Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
44 Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
45 DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
46 Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
47 Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
48}
49
50type Lessor interface {
51 // LeaseGrant sends LeaseGrant request to raft and apply it after committed.
52 LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
53 // LeaseRevoke sends LeaseRevoke request to raft and apply it after committed.
54 LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
55
56 // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
57 // is returned.
58 LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
59
60 // LeaseTimeToLive retrieves lease information.
61 LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
62
63 // LeaseLeases lists all leases.
64 LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error)
65}
66
67type Authenticator interface {
68 AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error)
69 AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error)
70 Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error)
71 UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
72 UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
73 UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
74 UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
75 UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
76 UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
77 RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
78 RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
79 RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
80 RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
81 RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
82 UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
83 RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
84}
85
86func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
87 var resp *pb.RangeResponse
88 var err error
89 defer func(start time.Time) {
90 warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err)
91 }(time.Now())
92
93 if !r.Serializable {
94 err = s.linearizableReadNotify(ctx)
95 if err != nil {
96 return nil, err
97 }
98 }
99 chk := func(ai *auth.AuthInfo) error {
100 return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
101 }
102
103 get := func() { resp, err = s.applyV3Base.Range(nil, r) }
104 if serr := s.doSerialize(ctx, chk, get); serr != nil {
105 err = serr
106 return nil, err
107 }
108 return resp, err
109}
110
111func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
112 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
113 if err != nil {
114 return nil, err
115 }
116 return resp.(*pb.PutResponse), nil
117}
118
119func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
120 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
121 if err != nil {
122 return nil, err
123 }
124 return resp.(*pb.DeleteRangeResponse), nil
125}
126
127func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
128 if isTxnReadonly(r) {
129 if !isTxnSerializable(r) {
130 err := s.linearizableReadNotify(ctx)
131 if err != nil {
132 return nil, err
133 }
134 }
135 var resp *pb.TxnResponse
136 var err error
137 chk := func(ai *auth.AuthInfo) error {
138 return checkTxnAuth(s.authStore, ai, r)
139 }
140
141 defer func(start time.Time) {
142 warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err)
143 }(time.Now())
144
145 get := func() { resp, err = s.applyV3Base.Txn(r) }
146 if serr := s.doSerialize(ctx, chk, get); serr != nil {
147 return nil, serr
148 }
149 return resp, err
150 }
151
152 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
153 if err != nil {
154 return nil, err
155 }
156 return resp.(*pb.TxnResponse), nil
157}
158
159func isTxnSerializable(r *pb.TxnRequest) bool {
160 for _, u := range r.Success {
161 if r := u.GetRequestRange(); r == nil || !r.Serializable {
162 return false
163 }
164 }
165 for _, u := range r.Failure {
166 if r := u.GetRequestRange(); r == nil || !r.Serializable {
167 return false
168 }
169 }
170 return true
171}
172
173func isTxnReadonly(r *pb.TxnRequest) bool {
174 for _, u := range r.Success {
175 if r := u.GetRequestRange(); r == nil {
176 return false
177 }
178 }
179 for _, u := range r.Failure {
180 if r := u.GetRequestRange(); r == nil {
181 return false
182 }
183 }
184 return true
185}
186
187func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
188 result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
189 if r.Physical && result != nil && result.physc != nil {
190 <-result.physc
191 // The compaction is done deleting keys; the hash is now settled
192 // but the data is not necessarily committed. If there's a crash,
193 // the hash may revert to a hash prior to compaction completing
194 // if the compaction resumes. Force the finished compaction to
195 // commit so it won't resume following a crash.
196 s.be.ForceCommit()
197 }
198 if err != nil {
199 return nil, err
200 }
201 if result.err != nil {
202 return nil, result.err
203 }
204 resp := result.resp.(*pb.CompactionResponse)
205 if resp == nil {
206 resp = &pb.CompactionResponse{}
207 }
208 if resp.Header == nil {
209 resp.Header = &pb.ResponseHeader{}
210 }
211 resp.Header.Revision = s.kv.Rev()
212 return resp, nil
213}
214
215func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
216 // no id given? choose one
217 for r.ID == int64(lease.NoLease) {
218 // only use positive int64 id's
219 r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
220 }
221 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r})
222 if err != nil {
223 return nil, err
224 }
225 return resp.(*pb.LeaseGrantResponse), nil
226}
227
228func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
229 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
230 if err != nil {
231 return nil, err
232 }
233 return resp.(*pb.LeaseRevokeResponse), nil
234}
235
236func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
237 ttl, err := s.lessor.Renew(id)
238 if err == nil { // already requested to primary lessor(leader)
239 return ttl, nil
240 }
241 if err != lease.ErrNotPrimary {
242 return -1, err
243 }
244
245 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
246 defer cancel()
247
248 // renewals don't go through raft; forward to leader manually
249 for cctx.Err() == nil && err != nil {
250 leader, lerr := s.waitLeader(cctx)
251 if lerr != nil {
252 return -1, lerr
253 }
254 for _, url := range leader.PeerURLs {
255 lurl := url + leasehttp.LeasePrefix
256 ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
257 if err == nil || err == lease.ErrLeaseNotFound {
258 return ttl, err
259 }
260 }
261 }
262 return -1, ErrTimeout
263}
264
265func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
266 if s.Leader() == s.ID() {
267 // primary; timetolive directly from leader
268 le := s.lessor.Lookup(lease.LeaseID(r.ID))
269 if le == nil {
270 return nil, lease.ErrLeaseNotFound
271 }
272 // TODO: fill out ResponseHeader
273 resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()}
274 if r.Keys {
275 ks := le.Keys()
276 kbs := make([][]byte, len(ks))
277 for i := range ks {
278 kbs[i] = []byte(ks[i])
279 }
280 resp.Keys = kbs
281 }
282 return resp, nil
283 }
284
285 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
286 defer cancel()
287
288 // forward to leader
289 for cctx.Err() == nil {
290 leader, err := s.waitLeader(cctx)
291 if err != nil {
292 return nil, err
293 }
294 for _, url := range leader.PeerURLs {
295 lurl := url + leasehttp.LeaseInternalPrefix
296 resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
297 if err == nil {
298 return resp.LeaseTimeToLiveResponse, nil
299 }
300 if err == lease.ErrLeaseNotFound {
301 return nil, err
302 }
303 }
304 }
305 return nil, ErrTimeout
306}
307
308func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
309 ls := s.lessor.Leases()
310 lss := make([]*pb.LeaseStatus, len(ls))
311 for i := range ls {
312 lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)}
313 }
314 return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil
315}
316
317func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
318 leader := s.cluster.Member(s.Leader())
319 for leader == nil {
320 // wait an election
321 dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
322 select {
323 case <-time.After(dur):
324 leader = s.cluster.Member(s.Leader())
325 case <-s.stopping:
326 return nil, ErrStopped
327 case <-ctx.Done():
328 return nil, ErrNoLeader
329 }
330 }
331 if leader == nil || len(leader.PeerURLs) == 0 {
332 return nil, ErrNoLeader
333 }
334 return leader, nil
335}
336
337func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
338 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r})
339 if err != nil {
340 return nil, err
341 }
342 return resp.(*pb.AlarmResponse), nil
343}
344
345func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
346 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r})
347 if err != nil {
348 return nil, err
349 }
350 return resp.(*pb.AuthEnableResponse), nil
351}
352
353func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
354 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
355 if err != nil {
356 return nil, err
357 }
358 return resp.(*pb.AuthDisableResponse), nil
359}
360
361func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
362 if err := s.linearizableReadNotify(ctx); err != nil {
363 return nil, err
364 }
365
366 var resp proto.Message
367 for {
368 checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
369 if err != nil {
370 if err != auth.ErrAuthNotEnabled {
371 plog.Errorf("invalid authentication request to user %s was issued", r.Name)
372 }
373 return nil, err
374 }
375
376 st, err := s.AuthStore().GenTokenPrefix()
377 if err != nil {
378 return nil, err
379 }
380
381 // internalReq doesn't need to have Password because the above s.AuthStore().CheckPassword() already did it.
382 // In addition, it will let a WAL entry not record password as a plain text.
383 internalReq := &pb.InternalAuthenticateRequest{
384 Name: r.Name,
385 SimpleToken: st,
386 }
387
388 resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
389 if err != nil {
390 return nil, err
391 }
392 if checkedRevision == s.AuthStore().Revision() {
393 break
394 }
395 plog.Infof("revision when password checked is obsolete, retrying")
396 }
397
398 return resp.(*pb.AuthenticateResponse), nil
399}
400
401func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
402 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
403 if err != nil {
404 return nil, err
405 }
406 return resp.(*pb.AuthUserAddResponse), nil
407}
408
409func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
410 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
411 if err != nil {
412 return nil, err
413 }
414 return resp.(*pb.AuthUserDeleteResponse), nil
415}
416
417func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
418 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
419 if err != nil {
420 return nil, err
421 }
422 return resp.(*pb.AuthUserChangePasswordResponse), nil
423}
424
425func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
426 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
427 if err != nil {
428 return nil, err
429 }
430 return resp.(*pb.AuthUserGrantRoleResponse), nil
431}
432
433func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
434 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
435 if err != nil {
436 return nil, err
437 }
438 return resp.(*pb.AuthUserGetResponse), nil
439}
440
441func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
442 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r})
443 if err != nil {
444 return nil, err
445 }
446 return resp.(*pb.AuthUserListResponse), nil
447}
448
449func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
450 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
451 if err != nil {
452 return nil, err
453 }
454 return resp.(*pb.AuthUserRevokeRoleResponse), nil
455}
456
457func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
458 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
459 if err != nil {
460 return nil, err
461 }
462 return resp.(*pb.AuthRoleAddResponse), nil
463}
464
465func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
466 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
467 if err != nil {
468 return nil, err
469 }
470 return resp.(*pb.AuthRoleGrantPermissionResponse), nil
471}
472
473func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
474 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
475 if err != nil {
476 return nil, err
477 }
478 return resp.(*pb.AuthRoleGetResponse), nil
479}
480
481func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
482 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r})
483 if err != nil {
484 return nil, err
485 }
486 return resp.(*pb.AuthRoleListResponse), nil
487}
488
489func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
490 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
491 if err != nil {
492 return nil, err
493 }
494 return resp.(*pb.AuthRoleRevokePermissionResponse), nil
495}
496
497func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
498 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
499 if err != nil {
500 return nil, err
501 }
502 return resp.(*pb.AuthRoleDeleteResponse), nil
503}
504
505func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
506 result, err := s.processInternalRaftRequestOnce(ctx, r)
507 if err != nil {
508 return nil, err
509 }
510 if result.err != nil {
511 return nil, result.err
512 }
513 return result.resp, nil
514}
515
516func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
517 return s.raftRequestOnce(ctx, r)
518}
519
520// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
521func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
522 ai, err := s.AuthInfoFromCtx(ctx)
523 if err != nil {
524 return err
525 }
526 if ai == nil {
527 // chk expects non-nil AuthInfo; use empty credentials
528 ai = &auth.AuthInfo{}
529 }
530 if err = chk(ai); err != nil {
531 return err
532 }
533 // fetch response for serialized request
534 get()
535 // check for stale token revision in case the auth store was updated while
536 // the request has been handled.
537 if ai.Revision != 0 && ai.Revision != s.authStore.Revision() {
538 return auth.ErrAuthOldRevision
539 }
540 return nil
541}
542
543func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
544 ai := s.getAppliedIndex()
545 ci := s.getCommittedIndex()
546 if ci > ai+maxGapBetweenApplyAndCommitIndex {
547 return nil, ErrTooManyRequests
548 }
549
550 r.Header = &pb.RequestHeader{
551 ID: s.reqIDGen.Next(),
552 }
553
554 authInfo, err := s.AuthInfoFromCtx(ctx)
555 if err != nil {
556 return nil, err
557 }
558 if authInfo != nil {
559 r.Header.Username = authInfo.Username
560 r.Header.AuthRevision = authInfo.Revision
561 }
562
563 data, err := r.Marshal()
564 if err != nil {
565 return nil, err
566 }
567
568 if len(data) > int(s.Cfg.MaxRequestBytes) {
569 return nil, ErrRequestTooLarge
570 }
571
572 id := r.ID
573 if id == 0 {
574 id = r.Header.ID
575 }
576 ch := s.w.Register(id)
577
578 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
579 defer cancel()
580
581 start := time.Now()
582 s.r.Propose(cctx, data)
583 proposalsPending.Inc()
584 defer proposalsPending.Dec()
585
586 select {
587 case x := <-ch:
588 return x.(*applyResult), nil
589 case <-cctx.Done():
590 proposalsFailed.Inc()
591 s.w.Trigger(id, nil) // GC wait
592 return nil, s.parseProposeCtxErr(cctx.Err(), start)
593 case <-s.done:
594 return nil, ErrStopped
595 }
596}
597
598// Watchable returns a watchable interface attached to the etcdserver.
599func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
600
601func (s *EtcdServer) linearizableReadLoop() {
602 var rs raft.ReadState
603
604 for {
605 ctxToSend := make([]byte, 8)
606 id1 := s.reqIDGen.Next()
607 binary.BigEndian.PutUint64(ctxToSend, id1)
608
609 leaderChangedNotifier := s.leaderChangedNotify()
610 select {
611 case <-leaderChangedNotifier:
612 continue
613 case <-s.readwaitc:
614 case <-s.stopping:
615 return
616 }
617
618 nextnr := newNotifier()
619
620 s.readMu.Lock()
621 nr := s.readNotifier
622 s.readNotifier = nextnr
623 s.readMu.Unlock()
624
625 cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
626 if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
627 cancel()
628 if err == raft.ErrStopped {
629 return
630 }
631 plog.Errorf("failed to get read index from raft: %v", err)
632 readIndexFailed.Inc()
633 nr.notify(err)
634 continue
635 }
636 cancel()
637
638 var (
639 timeout bool
640 done bool
641 )
642 for !timeout && !done {
643 select {
644 case rs = <-s.r.readStateC:
645 done = bytes.Equal(rs.RequestCtx, ctxToSend)
646 if !done {
647 // a previous request might time out. now we should ignore the response of it and
648 // continue waiting for the response of the current requests.
649 id2 := uint64(0)
650 if len(rs.RequestCtx) == 8 {
651 id2 = binary.BigEndian.Uint64(rs.RequestCtx)
652 }
653 plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
654 slowReadIndex.Inc()
655 }
656
657 case <-leaderChangedNotifier:
658 timeout = true
659 readIndexFailed.Inc()
660 // return a retryable error.
661 nr.notify(ErrLeaderChanged)
662
663 case <-time.After(s.Cfg.ReqTimeout()):
664 plog.Warningf("timed out waiting for read index response (local node might have slow network)")
665 nr.notify(ErrTimeout)
666 timeout = true
667 slowReadIndex.Inc()
668
669 case <-s.stopping:
670 return
671 }
672 }
673 if !done {
674 continue
675 }
676
677 if ai := s.getAppliedIndex(); ai < rs.Index {
678 select {
679 case <-s.applyWait.Wait(rs.Index):
680 case <-s.stopping:
681 return
682 }
683 }
684 // unblock all l-reads requested at indices before rs.Index
685 nr.notify(nil)
686 }
687}
688
689func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
690 s.readMu.RLock()
691 nc := s.readNotifier
692 s.readMu.RUnlock()
693
694 // signal linearizable loop for current notify if it hasn't been already
695 select {
696 case s.readwaitc <- struct{}{}:
697 default:
698 }
699
700 // wait for read state notification
701 select {
702 case <-nc.c:
703 return nc.err
704 case <-ctx.Done():
705 return ctx.Err()
706 case <-s.done:
707 return ErrStopped
708 }
709}
710
711func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
712 authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
713 if authInfo != nil || err != nil {
714 return authInfo, err
715 }
716 if !s.Cfg.ClientCertAuthEnabled {
717 return nil, nil
718 }
719 authInfo = s.AuthStore().AuthInfoFromTLS(ctx)
720 return authInfo, nil
721}