blob: f214a1926bb9ab9a3b69902f1b585f61029446a3 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "bytes"
19 "context"
20 "encoding/binary"
21 "time"
22
23 "github.com/coreos/etcd/auth"
24 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
25 "github.com/coreos/etcd/etcdserver/membership"
26 "github.com/coreos/etcd/lease"
27 "github.com/coreos/etcd/lease/leasehttp"
28 "github.com/coreos/etcd/mvcc"
29 "github.com/coreos/etcd/raft"
30
31 "github.com/gogo/protobuf/proto"
32)
33
34const (
35 // In the health case, there might be a small gap (10s of entries) between
36 // the applied index and committed index.
37 // However, if the committed entries are very heavy to apply, the gap might grow.
38 // We should stop accepting new proposals if the gap growing to a certain point.
39 maxGapBetweenApplyAndCommitIndex = 5000
40)
41
42type RaftKV interface {
43 Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
44 Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
45 DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
46 Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
47 Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
48}
49
50type Lessor interface {
51 // LeaseGrant sends LeaseGrant request to raft and apply it after committed.
52 LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
53 // LeaseRevoke sends LeaseRevoke request to raft and apply it after committed.
54 LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
55
56 // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
57 // is returned.
58 LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
59
60 // LeaseTimeToLive retrieves lease information.
61 LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
62
63 // LeaseLeases lists all leases.
64 LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error)
65}
66
67type Authenticator interface {
68 AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error)
69 AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error)
70 Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error)
71 UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
72 UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
73 UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
74 UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
75 UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
76 UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
77 RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
78 RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
79 RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
80 RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
81 RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
82 UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
83 RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
84}
85
86func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
87 var resp *pb.RangeResponse
88 var err error
89 defer func(start time.Time) {
90 warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err)
91 }(time.Now())
92
93 if !r.Serializable {
94 err = s.linearizableReadNotify(ctx)
95 if err != nil {
96 return nil, err
97 }
98 }
99 chk := func(ai *auth.AuthInfo) error {
100 return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
101 }
102
103 get := func() { resp, err = s.applyV3Base.Range(nil, r) }
104 if serr := s.doSerialize(ctx, chk, get); serr != nil {
105 err = serr
106 return nil, err
107 }
108 return resp, err
109}
110
111func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
112 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
113 if err != nil {
114 return nil, err
115 }
116 return resp.(*pb.PutResponse), nil
117}
118
119func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
120 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
121 if err != nil {
122 return nil, err
123 }
124 return resp.(*pb.DeleteRangeResponse), nil
125}
126
127func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
128 if isTxnReadonly(r) {
129 if !isTxnSerializable(r) {
130 err := s.linearizableReadNotify(ctx)
131 if err != nil {
132 return nil, err
133 }
134 }
135 var resp *pb.TxnResponse
136 var err error
137 chk := func(ai *auth.AuthInfo) error {
138 return checkTxnAuth(s.authStore, ai, r)
139 }
140
141 defer func(start time.Time) {
142 warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err)
143 }(time.Now())
144
145 get := func() { resp, err = s.applyV3Base.Txn(r) }
146 if serr := s.doSerialize(ctx, chk, get); serr != nil {
147 return nil, serr
148 }
149 return resp, err
150 }
151
152 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
153 if err != nil {
154 return nil, err
155 }
156 return resp.(*pb.TxnResponse), nil
157}
158
159func isTxnSerializable(r *pb.TxnRequest) bool {
160 for _, u := range r.Success {
161 if r := u.GetRequestRange(); r == nil || !r.Serializable {
162 return false
163 }
164 }
165 for _, u := range r.Failure {
166 if r := u.GetRequestRange(); r == nil || !r.Serializable {
167 return false
168 }
169 }
170 return true
171}
172
173func isTxnReadonly(r *pb.TxnRequest) bool {
174 for _, u := range r.Success {
175 if r := u.GetRequestRange(); r == nil {
176 return false
177 }
178 }
179 for _, u := range r.Failure {
180 if r := u.GetRequestRange(); r == nil {
181 return false
182 }
183 }
184 return true
185}
186
187func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
188 result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
189 if r.Physical && result != nil && result.physc != nil {
190 <-result.physc
191 // The compaction is done deleting keys; the hash is now settled
192 // but the data is not necessarily committed. If there's a crash,
193 // the hash may revert to a hash prior to compaction completing
194 // if the compaction resumes. Force the finished compaction to
195 // commit so it won't resume following a crash.
196 s.be.ForceCommit()
197 }
198 if err != nil {
199 return nil, err
200 }
201 if result.err != nil {
202 return nil, result.err
203 }
204 resp := result.resp.(*pb.CompactionResponse)
205 if resp == nil {
206 resp = &pb.CompactionResponse{}
207 }
208 if resp.Header == nil {
209 resp.Header = &pb.ResponseHeader{}
210 }
211 resp.Header.Revision = s.kv.Rev()
212 return resp, nil
213}
214
215func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
216 // no id given? choose one
217 for r.ID == int64(lease.NoLease) {
218 // only use positive int64 id's
219 r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
220 }
221 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r})
222 if err != nil {
223 return nil, err
224 }
225 return resp.(*pb.LeaseGrantResponse), nil
226}
227
228func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
229 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
230 if err != nil {
231 return nil, err
232 }
233 return resp.(*pb.LeaseRevokeResponse), nil
234}
235
236func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
237 ttl, err := s.lessor.Renew(id)
238 if err == nil { // already requested to primary lessor(leader)
239 return ttl, nil
240 }
241 if err != lease.ErrNotPrimary {
242 return -1, err
243 }
244
245 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
246 defer cancel()
247
248 // renewals don't go through raft; forward to leader manually
249 for cctx.Err() == nil && err != nil {
250 leader, lerr := s.waitLeader(cctx)
251 if lerr != nil {
252 return -1, lerr
253 }
254 for _, url := range leader.PeerURLs {
255 lurl := url + leasehttp.LeasePrefix
256 ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
257 if err == nil || err == lease.ErrLeaseNotFound {
258 return ttl, err
259 }
260 }
261 }
262 return -1, ErrTimeout
263}
264
265func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
266 if s.Leader() == s.ID() {
267 // primary; timetolive directly from leader
268 le := s.lessor.Lookup(lease.LeaseID(r.ID))
269 if le == nil {
270 return nil, lease.ErrLeaseNotFound
271 }
272 // TODO: fill out ResponseHeader
273 resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()}
274 if r.Keys {
275 ks := le.Keys()
276 kbs := make([][]byte, len(ks))
277 for i := range ks {
278 kbs[i] = []byte(ks[i])
279 }
280 resp.Keys = kbs
281 }
282 return resp, nil
283 }
284
285 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
286 defer cancel()
287
288 // forward to leader
289 for cctx.Err() == nil {
290 leader, err := s.waitLeader(cctx)
291 if err != nil {
292 return nil, err
293 }
294 for _, url := range leader.PeerURLs {
295 lurl := url + leasehttp.LeaseInternalPrefix
296 resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
297 if err == nil {
298 return resp.LeaseTimeToLiveResponse, nil
299 }
300 if err == lease.ErrLeaseNotFound {
301 return nil, err
302 }
303 }
304 }
305 return nil, ErrTimeout
306}
307
308func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
309 ls := s.lessor.Leases()
310 lss := make([]*pb.LeaseStatus, len(ls))
311 for i := range ls {
312 lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)}
313 }
314 return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil
315}
316
317func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
318 leader := s.cluster.Member(s.Leader())
319 for leader == nil {
320 // wait an election
321 dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
322 select {
323 case <-time.After(dur):
324 leader = s.cluster.Member(s.Leader())
325 case <-s.stopping:
326 return nil, ErrStopped
327 case <-ctx.Done():
328 return nil, ErrNoLeader
329 }
330 }
331 if leader == nil || len(leader.PeerURLs) == 0 {
332 return nil, ErrNoLeader
333 }
334 return leader, nil
335}
336
337func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
338 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r})
339 if err != nil {
340 return nil, err
341 }
342 return resp.(*pb.AlarmResponse), nil
343}
344
345func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
346 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r})
347 if err != nil {
348 return nil, err
349 }
350 return resp.(*pb.AuthEnableResponse), nil
351}
352
353func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
354 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
355 if err != nil {
356 return nil, err
357 }
358 return resp.(*pb.AuthDisableResponse), nil
359}
360
361func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
362 if err := s.linearizableReadNotify(ctx); err != nil {
363 return nil, err
364 }
365
366 var resp proto.Message
367 for {
368 checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
369 if err != nil {
370 if err != auth.ErrAuthNotEnabled {
371 plog.Errorf("invalid authentication request to user %s was issued", r.Name)
372 }
373 return nil, err
374 }
375
376 st, err := s.AuthStore().GenTokenPrefix()
377 if err != nil {
378 return nil, err
379 }
380
381 internalReq := &pb.InternalAuthenticateRequest{
382 Name: r.Name,
383 Password: r.Password,
384 SimpleToken: st,
385 }
386
387 resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
388 if err != nil {
389 return nil, err
390 }
391 if checkedRevision == s.AuthStore().Revision() {
392 break
393 }
394 plog.Infof("revision when password checked is obsolete, retrying")
395 }
396
397 return resp.(*pb.AuthenticateResponse), nil
398}
399
400func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
401 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
402 if err != nil {
403 return nil, err
404 }
405 return resp.(*pb.AuthUserAddResponse), nil
406}
407
408func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
409 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
410 if err != nil {
411 return nil, err
412 }
413 return resp.(*pb.AuthUserDeleteResponse), nil
414}
415
416func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
417 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
418 if err != nil {
419 return nil, err
420 }
421 return resp.(*pb.AuthUserChangePasswordResponse), nil
422}
423
424func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
425 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
426 if err != nil {
427 return nil, err
428 }
429 return resp.(*pb.AuthUserGrantRoleResponse), nil
430}
431
432func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
433 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
434 if err != nil {
435 return nil, err
436 }
437 return resp.(*pb.AuthUserGetResponse), nil
438}
439
440func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
441 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r})
442 if err != nil {
443 return nil, err
444 }
445 return resp.(*pb.AuthUserListResponse), nil
446}
447
448func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
449 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
450 if err != nil {
451 return nil, err
452 }
453 return resp.(*pb.AuthUserRevokeRoleResponse), nil
454}
455
456func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
457 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
458 if err != nil {
459 return nil, err
460 }
461 return resp.(*pb.AuthRoleAddResponse), nil
462}
463
464func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
465 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
466 if err != nil {
467 return nil, err
468 }
469 return resp.(*pb.AuthRoleGrantPermissionResponse), nil
470}
471
472func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
473 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
474 if err != nil {
475 return nil, err
476 }
477 return resp.(*pb.AuthRoleGetResponse), nil
478}
479
480func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
481 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r})
482 if err != nil {
483 return nil, err
484 }
485 return resp.(*pb.AuthRoleListResponse), nil
486}
487
488func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
489 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
490 if err != nil {
491 return nil, err
492 }
493 return resp.(*pb.AuthRoleRevokePermissionResponse), nil
494}
495
496func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
497 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
498 if err != nil {
499 return nil, err
500 }
501 return resp.(*pb.AuthRoleDeleteResponse), nil
502}
503
504func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
505 result, err := s.processInternalRaftRequestOnce(ctx, r)
506 if err != nil {
507 return nil, err
508 }
509 if result.err != nil {
510 return nil, result.err
511 }
512 return result.resp, nil
513}
514
515func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
516 for {
517 resp, err := s.raftRequestOnce(ctx, r)
518 if err != auth.ErrAuthOldRevision {
519 return resp, err
520 }
521 }
522}
523
524// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
525func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
526 for {
527 ai, err := s.AuthInfoFromCtx(ctx)
528 if err != nil {
529 return err
530 }
531 if ai == nil {
532 // chk expects non-nil AuthInfo; use empty credentials
533 ai = &auth.AuthInfo{}
534 }
535 if err = chk(ai); err != nil {
536 if err == auth.ErrAuthOldRevision {
537 continue
538 }
539 return err
540 }
541 // fetch response for serialized request
542 get()
543 // empty credentials or current auth info means no need to retry
544 if ai.Revision == 0 || ai.Revision == s.authStore.Revision() {
545 return nil
546 }
547 // avoid TOCTOU error, retry of the request is required.
548 }
549}
550
551func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
552 ai := s.getAppliedIndex()
553 ci := s.getCommittedIndex()
554 if ci > ai+maxGapBetweenApplyAndCommitIndex {
555 return nil, ErrTooManyRequests
556 }
557
558 r.Header = &pb.RequestHeader{
559 ID: s.reqIDGen.Next(),
560 }
561
562 authInfo, err := s.AuthInfoFromCtx(ctx)
563 if err != nil {
564 return nil, err
565 }
566 if authInfo != nil {
567 r.Header.Username = authInfo.Username
568 r.Header.AuthRevision = authInfo.Revision
569 }
570
571 data, err := r.Marshal()
572 if err != nil {
573 return nil, err
574 }
575
576 if len(data) > int(s.Cfg.MaxRequestBytes) {
577 return nil, ErrRequestTooLarge
578 }
579
580 id := r.ID
581 if id == 0 {
582 id = r.Header.ID
583 }
584 ch := s.w.Register(id)
585
586 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
587 defer cancel()
588
589 start := time.Now()
590 s.r.Propose(cctx, data)
591 proposalsPending.Inc()
592 defer proposalsPending.Dec()
593
594 select {
595 case x := <-ch:
596 return x.(*applyResult), nil
597 case <-cctx.Done():
598 proposalsFailed.Inc()
599 s.w.Trigger(id, nil) // GC wait
600 return nil, s.parseProposeCtxErr(cctx.Err(), start)
601 case <-s.done:
602 return nil, ErrStopped
603 }
604}
605
606// Watchable returns a watchable interface attached to the etcdserver.
607func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
608
609func (s *EtcdServer) linearizableReadLoop() {
610 var rs raft.ReadState
611
612 for {
613 ctxToSend := make([]byte, 8)
614 id1 := s.reqIDGen.Next()
615 binary.BigEndian.PutUint64(ctxToSend, id1)
616
617 select {
618 case <-s.readwaitc:
619 case <-s.stopping:
620 return
621 }
622
623 nextnr := newNotifier()
624
625 s.readMu.Lock()
626 nr := s.readNotifier
627 s.readNotifier = nextnr
628 s.readMu.Unlock()
629
630 cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
631 if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
632 cancel()
633 if err == raft.ErrStopped {
634 return
635 }
636 plog.Errorf("failed to get read index from raft: %v", err)
637 readIndexFailed.Inc()
638 nr.notify(err)
639 continue
640 }
641 cancel()
642
643 var (
644 timeout bool
645 done bool
646 )
647 for !timeout && !done {
648 select {
649 case rs = <-s.r.readStateC:
650 done = bytes.Equal(rs.RequestCtx, ctxToSend)
651 if !done {
652 // a previous request might time out. now we should ignore the response of it and
653 // continue waiting for the response of the current requests.
654 id2 := uint64(0)
655 if len(rs.RequestCtx) == 8 {
656 id2 = binary.BigEndian.Uint64(rs.RequestCtx)
657 }
658 plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
659 slowReadIndex.Inc()
660 }
661
662 case <-time.After(s.Cfg.ReqTimeout()):
663 plog.Warningf("timed out waiting for read index response (local node might have slow network)")
664 nr.notify(ErrTimeout)
665 timeout = true
666 slowReadIndex.Inc()
667
668 case <-s.stopping:
669 return
670 }
671 }
672 if !done {
673 continue
674 }
675
676 if ai := s.getAppliedIndex(); ai < rs.Index {
677 select {
678 case <-s.applyWait.Wait(rs.Index):
679 case <-s.stopping:
680 return
681 }
682 }
683 // unblock all l-reads requested at indices before rs.Index
684 nr.notify(nil)
685 }
686}
687
688func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
689 s.readMu.RLock()
690 nc := s.readNotifier
691 s.readMu.RUnlock()
692
693 // signal linearizable loop for current notify if it hasn't been already
694 select {
695 case s.readwaitc <- struct{}{}:
696 default:
697 }
698
699 // wait for read state notification
700 select {
701 case <-nc.c:
702 return nc.err
703 case <-ctx.Done():
704 return ctx.Err()
705 case <-s.done:
706 return ErrStopped
707 }
708}
709
710func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
711 authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
712 if authInfo != nil || err != nil {
713 return authInfo, err
714 }
715 if !s.Cfg.ClientCertAuthEnabled {
716 return nil, nil
717 }
718 authInfo = s.AuthStore().AuthInfoFromTLS(ctx)
719 return authInfo, nil
720}