blob: b2084618b8a8905de5deddb21c7628d5c4829f7e [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package etcdserver
16
17import (
18 "bytes"
19 "context"
20 "encoding/binary"
21 "time"
22
23 "go.etcd.io/etcd/auth"
24 "go.etcd.io/etcd/etcdserver/api/membership"
25 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
26 "go.etcd.io/etcd/lease"
27 "go.etcd.io/etcd/lease/leasehttp"
28 "go.etcd.io/etcd/mvcc"
29 "go.etcd.io/etcd/raft"
30
31 "github.com/gogo/protobuf/proto"
32 "go.uber.org/zap"
33)
34
// maxGapBetweenApplyAndCommitIndex is the largest allowed distance between
// the committed index and the applied index.
//
// On a healthy member the gap is small (tens of entries). If committed
// entries are very expensive to apply, the gap can grow; once it exceeds this
// bound, new proposals are rejected until the applier catches up.
const maxGapBetweenApplyAndCommitIndex = 5000
42
43type RaftKV interface {
44 Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
45 Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
46 DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
47 Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
48 Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
49}
50
51type Lessor interface {
52 // LeaseGrant sends LeaseGrant request to raft and apply it after committed.
53 LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
54 // LeaseRevoke sends LeaseRevoke request to raft and apply it after committed.
55 LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
56
57 // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
58 // is returned.
59 LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
60
61 // LeaseTimeToLive retrieves lease information.
62 LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
63
64 // LeaseLeases lists all leases.
65 LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error)
66}
67
68type Authenticator interface {
69 AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error)
70 AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error)
71 Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error)
72 UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
73 UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
74 UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
75 UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
76 UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
77 UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
78 RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
79 RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
80 RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
81 RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
82 RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
83 UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
84 RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
85}
86
87func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
88 var resp *pb.RangeResponse
89 var err error
90 defer func(start time.Time) {
91 warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err)
92 }(time.Now())
93
94 if !r.Serializable {
95 err = s.linearizableReadNotify(ctx)
96 if err != nil {
97 return nil, err
98 }
99 }
100 chk := func(ai *auth.AuthInfo) error {
101 return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
102 }
103
104 get := func() { resp, err = s.applyV3Base.Range(nil, r) }
105 if serr := s.doSerialize(ctx, chk, get); serr != nil {
106 err = serr
107 return nil, err
108 }
109 return resp, err
110}
111
112func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
113 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
114 if err != nil {
115 return nil, err
116 }
117 return resp.(*pb.PutResponse), nil
118}
119
120func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
121 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
122 if err != nil {
123 return nil, err
124 }
125 return resp.(*pb.DeleteRangeResponse), nil
126}
127
128func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
129 if isTxnReadonly(r) {
130 if !isTxnSerializable(r) {
131 err := s.linearizableReadNotify(ctx)
132 if err != nil {
133 return nil, err
134 }
135 }
136 var resp *pb.TxnResponse
137 var err error
138 chk := func(ai *auth.AuthInfo) error {
139 return checkTxnAuth(s.authStore, ai, r)
140 }
141
142 defer func(start time.Time) {
143 warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), start, r, resp, err)
144 }(time.Now())
145
146 get := func() { resp, err = s.applyV3Base.Txn(r) }
147 if serr := s.doSerialize(ctx, chk, get); serr != nil {
148 return nil, serr
149 }
150 return resp, err
151 }
152
153 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
154 if err != nil {
155 return nil, err
156 }
157 return resp.(*pb.TxnResponse), nil
158}
159
160func isTxnSerializable(r *pb.TxnRequest) bool {
161 for _, u := range r.Success {
162 if r := u.GetRequestRange(); r == nil || !r.Serializable {
163 return false
164 }
165 }
166 for _, u := range r.Failure {
167 if r := u.GetRequestRange(); r == nil || !r.Serializable {
168 return false
169 }
170 }
171 return true
172}
173
174func isTxnReadonly(r *pb.TxnRequest) bool {
175 for _, u := range r.Success {
176 if r := u.GetRequestRange(); r == nil {
177 return false
178 }
179 }
180 for _, u := range r.Failure {
181 if r := u.GetRequestRange(); r == nil {
182 return false
183 }
184 }
185 return true
186}
187
188func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
189 result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
190 if r.Physical && result != nil && result.physc != nil {
191 <-result.physc
192 // The compaction is done deleting keys; the hash is now settled
193 // but the data is not necessarily committed. If there's a crash,
194 // the hash may revert to a hash prior to compaction completing
195 // if the compaction resumes. Force the finished compaction to
196 // commit so it won't resume following a crash.
197 s.be.ForceCommit()
198 }
199 if err != nil {
200 return nil, err
201 }
202 if result.err != nil {
203 return nil, result.err
204 }
205 resp := result.resp.(*pb.CompactionResponse)
206 if resp == nil {
207 resp = &pb.CompactionResponse{}
208 }
209 if resp.Header == nil {
210 resp.Header = &pb.ResponseHeader{}
211 }
212 resp.Header.Revision = s.kv.Rev()
213 return resp, nil
214}
215
216func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
217 // no id given? choose one
218 for r.ID == int64(lease.NoLease) {
219 // only use positive int64 id's
220 r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
221 }
222 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r})
223 if err != nil {
224 return nil, err
225 }
226 return resp.(*pb.LeaseGrantResponse), nil
227}
228
229func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
230 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
231 if err != nil {
232 return nil, err
233 }
234 return resp.(*pb.LeaseRevokeResponse), nil
235}
236
237func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
238 ttl, err := s.lessor.Renew(id)
239 if err == nil { // already requested to primary lessor(leader)
240 return ttl, nil
241 }
242 if err != lease.ErrNotPrimary {
243 return -1, err
244 }
245
246 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
247 defer cancel()
248
249 // renewals don't go through raft; forward to leader manually
250 for cctx.Err() == nil && err != nil {
251 leader, lerr := s.waitLeader(cctx)
252 if lerr != nil {
253 return -1, lerr
254 }
255 for _, url := range leader.PeerURLs {
256 lurl := url + leasehttp.LeasePrefix
257 ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
258 if err == nil || err == lease.ErrLeaseNotFound {
259 return ttl, err
260 }
261 }
262 }
263
264 if cctx.Err() == context.DeadlineExceeded {
265 return -1, ErrTimeout
266 }
267 return -1, ErrCanceled
268}
269
270func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
271 if s.Leader() == s.ID() {
272 // primary; timetolive directly from leader
273 le := s.lessor.Lookup(lease.LeaseID(r.ID))
274 if le == nil {
275 return nil, lease.ErrLeaseNotFound
276 }
277 // TODO: fill out ResponseHeader
278 resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()}
279 if r.Keys {
280 ks := le.Keys()
281 kbs := make([][]byte, len(ks))
282 for i := range ks {
283 kbs[i] = []byte(ks[i])
284 }
285 resp.Keys = kbs
286 }
287 return resp, nil
288 }
289
290 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
291 defer cancel()
292
293 // forward to leader
294 for cctx.Err() == nil {
295 leader, err := s.waitLeader(cctx)
296 if err != nil {
297 return nil, err
298 }
299 for _, url := range leader.PeerURLs {
300 lurl := url + leasehttp.LeaseInternalPrefix
301 resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
302 if err == nil {
303 return resp.LeaseTimeToLiveResponse, nil
304 }
305 if err == lease.ErrLeaseNotFound {
306 return nil, err
307 }
308 }
309 }
310
311 if cctx.Err() == context.DeadlineExceeded {
312 return nil, ErrTimeout
313 }
314 return nil, ErrCanceled
315}
316
317func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
318 ls := s.lessor.Leases()
319 lss := make([]*pb.LeaseStatus, len(ls))
320 for i := range ls {
321 lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)}
322 }
323 return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil
324}
325
326func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
327 leader := s.cluster.Member(s.Leader())
328 for leader == nil {
329 // wait an election
330 dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
331 select {
332 case <-time.After(dur):
333 leader = s.cluster.Member(s.Leader())
334 case <-s.stopping:
335 return nil, ErrStopped
336 case <-ctx.Done():
337 return nil, ErrNoLeader
338 }
339 }
340 if leader == nil || len(leader.PeerURLs) == 0 {
341 return nil, ErrNoLeader
342 }
343 return leader, nil
344}
345
346func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
347 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r})
348 if err != nil {
349 return nil, err
350 }
351 return resp.(*pb.AlarmResponse), nil
352}
353
354func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
355 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r})
356 if err != nil {
357 return nil, err
358 }
359 return resp.(*pb.AuthEnableResponse), nil
360}
361
362func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
363 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
364 if err != nil {
365 return nil, err
366 }
367 return resp.(*pb.AuthDisableResponse), nil
368}
369
370func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
371 if err := s.linearizableReadNotify(ctx); err != nil {
372 return nil, err
373 }
374
375 lg := s.getLogger()
376
377 var resp proto.Message
378 for {
379 checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
380 if err != nil {
381 if err != auth.ErrAuthNotEnabled {
382 if lg != nil {
383 lg.Warn(
384 "invalid authentication was requested",
385 zap.String("user", r.Name),
386 zap.Error(err),
387 )
388 } else {
389 plog.Errorf("invalid authentication request to user %s was issued", r.Name)
390 }
391 }
392 return nil, err
393 }
394
395 st, err := s.AuthStore().GenTokenPrefix()
396 if err != nil {
397 return nil, err
398 }
399
400 internalReq := &pb.InternalAuthenticateRequest{
401 Name: r.Name,
402 Password: r.Password,
403 SimpleToken: st,
404 }
405
406 resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
407 if err != nil {
408 return nil, err
409 }
410 if checkedRevision == s.AuthStore().Revision() {
411 break
412 }
413
414 if lg != nil {
415 lg.Info("revision when password checked became stale; retrying")
416 } else {
417 plog.Infof("revision when password checked is obsolete, retrying")
418 }
419 }
420
421 return resp.(*pb.AuthenticateResponse), nil
422}
423
424func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
425 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
426 if err != nil {
427 return nil, err
428 }
429 return resp.(*pb.AuthUserAddResponse), nil
430}
431
432func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
433 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
434 if err != nil {
435 return nil, err
436 }
437 return resp.(*pb.AuthUserDeleteResponse), nil
438}
439
440func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
441 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
442 if err != nil {
443 return nil, err
444 }
445 return resp.(*pb.AuthUserChangePasswordResponse), nil
446}
447
448func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
449 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
450 if err != nil {
451 return nil, err
452 }
453 return resp.(*pb.AuthUserGrantRoleResponse), nil
454}
455
456func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
457 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
458 if err != nil {
459 return nil, err
460 }
461 return resp.(*pb.AuthUserGetResponse), nil
462}
463
464func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
465 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r})
466 if err != nil {
467 return nil, err
468 }
469 return resp.(*pb.AuthUserListResponse), nil
470}
471
472func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
473 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
474 if err != nil {
475 return nil, err
476 }
477 return resp.(*pb.AuthUserRevokeRoleResponse), nil
478}
479
480func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
481 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
482 if err != nil {
483 return nil, err
484 }
485 return resp.(*pb.AuthRoleAddResponse), nil
486}
487
488func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
489 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
490 if err != nil {
491 return nil, err
492 }
493 return resp.(*pb.AuthRoleGrantPermissionResponse), nil
494}
495
496func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
497 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
498 if err != nil {
499 return nil, err
500 }
501 return resp.(*pb.AuthRoleGetResponse), nil
502}
503
504func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
505 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r})
506 if err != nil {
507 return nil, err
508 }
509 return resp.(*pb.AuthRoleListResponse), nil
510}
511
512func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
513 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
514 if err != nil {
515 return nil, err
516 }
517 return resp.(*pb.AuthRoleRevokePermissionResponse), nil
518}
519
520func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
521 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
522 if err != nil {
523 return nil, err
524 }
525 return resp.(*pb.AuthRoleDeleteResponse), nil
526}
527
528func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
529 result, err := s.processInternalRaftRequestOnce(ctx, r)
530 if err != nil {
531 return nil, err
532 }
533 if result.err != nil {
534 return nil, result.err
535 }
536 return result.resp, nil
537}
538
539func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
540 for {
541 resp, err := s.raftRequestOnce(ctx, r)
542 if err != auth.ErrAuthOldRevision {
543 return resp, err
544 }
545 }
546}
547
548// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
549func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
550 ai, err := s.AuthInfoFromCtx(ctx)
551 if err != nil {
552 return err
553 }
554 if ai == nil {
555 // chk expects non-nil AuthInfo; use empty credentials
556 ai = &auth.AuthInfo{}
557 }
558 if err = chk(ai); err != nil {
559 return err
560 }
561 // fetch response for serialized request
562 get()
563 // check for stale token revision in case the auth store was updated while
564 // the request has been handled.
565 if ai.Revision != 0 && ai.Revision != s.authStore.Revision() {
566 return auth.ErrAuthOldRevision
567 }
568 return nil
569}
570
571func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
572 ai := s.getAppliedIndex()
573 ci := s.getCommittedIndex()
574 if ci > ai+maxGapBetweenApplyAndCommitIndex {
575 return nil, ErrTooManyRequests
576 }
577
578 r.Header = &pb.RequestHeader{
579 ID: s.reqIDGen.Next(),
580 }
581
582 authInfo, err := s.AuthInfoFromCtx(ctx)
583 if err != nil {
584 return nil, err
585 }
586 if authInfo != nil {
587 r.Header.Username = authInfo.Username
588 r.Header.AuthRevision = authInfo.Revision
589 }
590
591 data, err := r.Marshal()
592 if err != nil {
593 return nil, err
594 }
595
596 if len(data) > int(s.Cfg.MaxRequestBytes) {
597 return nil, ErrRequestTooLarge
598 }
599
600 id := r.ID
601 if id == 0 {
602 id = r.Header.ID
603 }
604 ch := s.w.Register(id)
605
606 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
607 defer cancel()
608
609 start := time.Now()
610 err = s.r.Propose(cctx, data)
611 if err != nil {
612 proposalsFailed.Inc()
613 s.w.Trigger(id, nil) // GC wait
614 return nil, err
615 }
616 proposalsPending.Inc()
617 defer proposalsPending.Dec()
618
619 select {
620 case x := <-ch:
621 return x.(*applyResult), nil
622 case <-cctx.Done():
623 proposalsFailed.Inc()
624 s.w.Trigger(id, nil) // GC wait
625 return nil, s.parseProposeCtxErr(cctx.Err(), start)
626 case <-s.done:
627 return nil, ErrStopped
628 }
629}
630
631// Watchable returns a watchable interface attached to the etcdserver.
632func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
633
634func (s *EtcdServer) linearizableReadLoop() {
635 var rs raft.ReadState
636
637 for {
638 ctxToSend := make([]byte, 8)
639 id1 := s.reqIDGen.Next()
640 binary.BigEndian.PutUint64(ctxToSend, id1)
641 leaderChangedNotifier := s.leaderChangedNotify()
642 select {
643 case <-leaderChangedNotifier:
644 continue
645 case <-s.readwaitc:
646 case <-s.stopping:
647 return
648 }
649
650 nextnr := newNotifier()
651
652 s.readMu.Lock()
653 nr := s.readNotifier
654 s.readNotifier = nextnr
655 s.readMu.Unlock()
656
657 lg := s.getLogger()
658 cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
659 if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
660 cancel()
661 if err == raft.ErrStopped {
662 return
663 }
664 if lg != nil {
665 lg.Warn("failed to get read index from Raft", zap.Error(err))
666 } else {
667 plog.Errorf("failed to get read index from raft: %v", err)
668 }
669 readIndexFailed.Inc()
670 nr.notify(err)
671 continue
672 }
673 cancel()
674
675 var (
676 timeout bool
677 done bool
678 )
679 for !timeout && !done {
680 select {
681 case rs = <-s.r.readStateC:
682 done = bytes.Equal(rs.RequestCtx, ctxToSend)
683 if !done {
684 // a previous request might time out. now we should ignore the response of it and
685 // continue waiting for the response of the current requests.
686 id2 := uint64(0)
687 if len(rs.RequestCtx) == 8 {
688 id2 = binary.BigEndian.Uint64(rs.RequestCtx)
689 }
690 if lg != nil {
691 lg.Warn(
692 "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
693 zap.Uint64("sent-request-id", id1),
694 zap.Uint64("received-request-id", id2),
695 )
696 } else {
697 plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
698 }
699 slowReadIndex.Inc()
700 }
701 case <-leaderChangedNotifier:
702 timeout = true
703 readIndexFailed.Inc()
704 // return a retryable error.
705 nr.notify(ErrLeaderChanged)
706 case <-time.After(s.Cfg.ReqTimeout()):
707 if lg != nil {
708 lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
709 } else {
710 plog.Warningf("timed out waiting for read index response (local node might have slow network)")
711 }
712 nr.notify(ErrTimeout)
713 timeout = true
714 slowReadIndex.Inc()
715 case <-s.stopping:
716 return
717 }
718 }
719 if !done {
720 continue
721 }
722
723 if ai := s.getAppliedIndex(); ai < rs.Index {
724 select {
725 case <-s.applyWait.Wait(rs.Index):
726 case <-s.stopping:
727 return
728 }
729 }
730 // unblock all l-reads requested at indices before rs.Index
731 nr.notify(nil)
732 }
733}
734
735func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
736 s.readMu.RLock()
737 nc := s.readNotifier
738 s.readMu.RUnlock()
739
740 // signal linearizable loop for current notify if it hasn't been already
741 select {
742 case s.readwaitc <- struct{}{}:
743 default:
744 }
745
746 // wait for read state notification
747 select {
748 case <-nc.c:
749 return nc.err
750 case <-ctx.Done():
751 return ctx.Err()
752 case <-s.done:
753 return ErrStopped
754 }
755}
756
757func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
758 authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
759 if authInfo != nil || err != nil {
760 return authInfo, err
761 }
762 if !s.Cfg.ClientCertAuthEnabled {
763 return nil, nil
764 }
765 authInfo = s.AuthStore().AuthInfoFromTLS(ctx)
766 return authInfo, nil
767
768}