| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package etcdserver |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/binary" |
| "time" |
| |
| "go.etcd.io/etcd/auth" |
| "go.etcd.io/etcd/etcdserver/api/membership" |
| pb "go.etcd.io/etcd/etcdserver/etcdserverpb" |
| "go.etcd.io/etcd/lease" |
| "go.etcd.io/etcd/lease/leasehttp" |
| "go.etcd.io/etcd/mvcc" |
| "go.etcd.io/etcd/raft" |
| |
| "github.com/gogo/protobuf/proto" |
| "go.uber.org/zap" |
| ) |
| |
// maxGapBetweenApplyAndCommitIndex bounds how far the committed index may run
// ahead of the applied index before new proposals are rejected.
//
// In the healthy case there is only a small gap (tens of entries) between the
// applied index and the committed index. If the committed entries are very
// heavy to apply, however, the gap may grow; once it grows past this bound the
// server stops accepting new proposals.
const maxGapBetweenApplyAndCommitIndex = 5000
| |
| type RaftKV interface { |
| Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) |
| Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) |
| DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) |
| Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) |
| Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) |
| } |
| |
| type Lessor interface { |
| // LeaseGrant sends LeaseGrant request to raft and apply it after committed. |
| LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) |
| // LeaseRevoke sends LeaseRevoke request to raft and apply it after committed. |
| LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) |
| |
| // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error |
| // is returned. |
| LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) |
| |
| // LeaseTimeToLive retrieves lease information. |
| LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) |
| |
| // LeaseLeases lists all leases. |
| LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) |
| } |
| |
| type Authenticator interface { |
| AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) |
| AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) |
| Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) |
| UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) |
| UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) |
| UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) |
| UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) |
| UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) |
| UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) |
| RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) |
| RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) |
| RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) |
| RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) |
| RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) |
| UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) |
| RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) |
| } |
| |
| func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { |
| var resp *pb.RangeResponse |
| var err error |
| defer func(start time.Time) { |
| warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err) |
| }(time.Now()) |
| |
| if !r.Serializable { |
| err = s.linearizableReadNotify(ctx) |
| if err != nil { |
| return nil, err |
| } |
| } |
| chk := func(ai *auth.AuthInfo) error { |
| return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) |
| } |
| |
| get := func() { resp, err = s.applyV3Base.Range(nil, r) } |
| if serr := s.doSerialize(ctx, chk, get); serr != nil { |
| err = serr |
| return nil, err |
| } |
| return resp, err |
| } |
| |
| func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.PutResponse), nil |
| } |
| |
| func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.DeleteRangeResponse), nil |
| } |
| |
| func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { |
| if isTxnReadonly(r) { |
| if !isTxnSerializable(r) { |
| err := s.linearizableReadNotify(ctx) |
| if err != nil { |
| return nil, err |
| } |
| } |
| var resp *pb.TxnResponse |
| var err error |
| chk := func(ai *auth.AuthInfo) error { |
| return checkTxnAuth(s.authStore, ai, r) |
| } |
| |
| defer func(start time.Time) { |
| warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), start, r, resp, err) |
| }(time.Now()) |
| |
| get := func() { resp, err = s.applyV3Base.Txn(r) } |
| if serr := s.doSerialize(ctx, chk, get); serr != nil { |
| return nil, serr |
| } |
| return resp, err |
| } |
| |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.TxnResponse), nil |
| } |
| |
| func isTxnSerializable(r *pb.TxnRequest) bool { |
| for _, u := range r.Success { |
| if r := u.GetRequestRange(); r == nil || !r.Serializable { |
| return false |
| } |
| } |
| for _, u := range r.Failure { |
| if r := u.GetRequestRange(); r == nil || !r.Serializable { |
| return false |
| } |
| } |
| return true |
| } |
| |
| func isTxnReadonly(r *pb.TxnRequest) bool { |
| for _, u := range r.Success { |
| if r := u.GetRequestRange(); r == nil { |
| return false |
| } |
| } |
| for _, u := range r.Failure { |
| if r := u.GetRequestRange(); r == nil { |
| return false |
| } |
| } |
| return true |
| } |
| |
| func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { |
| result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r}) |
| if r.Physical && result != nil && result.physc != nil { |
| <-result.physc |
| // The compaction is done deleting keys; the hash is now settled |
| // but the data is not necessarily committed. If there's a crash, |
| // the hash may revert to a hash prior to compaction completing |
| // if the compaction resumes. Force the finished compaction to |
| // commit so it won't resume following a crash. |
| s.be.ForceCommit() |
| } |
| if err != nil { |
| return nil, err |
| } |
| if result.err != nil { |
| return nil, result.err |
| } |
| resp := result.resp.(*pb.CompactionResponse) |
| if resp == nil { |
| resp = &pb.CompactionResponse{} |
| } |
| if resp.Header == nil { |
| resp.Header = &pb.ResponseHeader{} |
| } |
| resp.Header.Revision = s.kv.Rev() |
| return resp, nil |
| } |
| |
| func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { |
| // no id given? choose one |
| for r.ID == int64(lease.NoLease) { |
| // only use positive int64 id's |
| r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1)) |
| } |
| resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.LeaseGrantResponse), nil |
| } |
| |
| func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { |
| resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.LeaseRevokeResponse), nil |
| } |
| |
| func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { |
| ttl, err := s.lessor.Renew(id) |
| if err == nil { // already requested to primary lessor(leader) |
| return ttl, nil |
| } |
| if err != lease.ErrNotPrimary { |
| return -1, err |
| } |
| |
| cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) |
| defer cancel() |
| |
| // renewals don't go through raft; forward to leader manually |
| for cctx.Err() == nil && err != nil { |
| leader, lerr := s.waitLeader(cctx) |
| if lerr != nil { |
| return -1, lerr |
| } |
| for _, url := range leader.PeerURLs { |
| lurl := url + leasehttp.LeasePrefix |
| ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) |
| if err == nil || err == lease.ErrLeaseNotFound { |
| return ttl, err |
| } |
| } |
| } |
| |
| if cctx.Err() == context.DeadlineExceeded { |
| return -1, ErrTimeout |
| } |
| return -1, ErrCanceled |
| } |
| |
| func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { |
| if s.Leader() == s.ID() { |
| // primary; timetolive directly from leader |
| le := s.lessor.Lookup(lease.LeaseID(r.ID)) |
| if le == nil { |
| return nil, lease.ErrLeaseNotFound |
| } |
| // TODO: fill out ResponseHeader |
| resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()} |
| if r.Keys { |
| ks := le.Keys() |
| kbs := make([][]byte, len(ks)) |
| for i := range ks { |
| kbs[i] = []byte(ks[i]) |
| } |
| resp.Keys = kbs |
| } |
| return resp, nil |
| } |
| |
| cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) |
| defer cancel() |
| |
| // forward to leader |
| for cctx.Err() == nil { |
| leader, err := s.waitLeader(cctx) |
| if err != nil { |
| return nil, err |
| } |
| for _, url := range leader.PeerURLs { |
| lurl := url + leasehttp.LeaseInternalPrefix |
| resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt) |
| if err == nil { |
| return resp.LeaseTimeToLiveResponse, nil |
| } |
| if err == lease.ErrLeaseNotFound { |
| return nil, err |
| } |
| } |
| } |
| |
| if cctx.Err() == context.DeadlineExceeded { |
| return nil, ErrTimeout |
| } |
| return nil, ErrCanceled |
| } |
| |
| func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) { |
| ls := s.lessor.Leases() |
| lss := make([]*pb.LeaseStatus, len(ls)) |
| for i := range ls { |
| lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)} |
| } |
| return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil |
| } |
| |
| func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) { |
| leader := s.cluster.Member(s.Leader()) |
| for leader == nil { |
| // wait an election |
| dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond |
| select { |
| case <-time.After(dur): |
| leader = s.cluster.Member(s.Leader()) |
| case <-s.stopping: |
| return nil, ErrStopped |
| case <-ctx.Done(): |
| return nil, ErrNoLeader |
| } |
| } |
| if leader == nil || len(leader.PeerURLs) == 0 { |
| return nil, ErrNoLeader |
| } |
| return leader, nil |
| } |
| |
| func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) { |
| resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AlarmResponse), nil |
| } |
| |
| func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) { |
| resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthEnableResponse), nil |
| } |
| |
| func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthDisableResponse), nil |
| } |
| |
| func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) { |
| if err := s.linearizableReadNotify(ctx); err != nil { |
| return nil, err |
| } |
| |
| lg := s.getLogger() |
| |
| var resp proto.Message |
| for { |
| checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password) |
| if err != nil { |
| if err != auth.ErrAuthNotEnabled { |
| if lg != nil { |
| lg.Warn( |
| "invalid authentication was requested", |
| zap.String("user", r.Name), |
| zap.Error(err), |
| ) |
| } else { |
| plog.Errorf("invalid authentication request to user %s was issued", r.Name) |
| } |
| } |
| return nil, err |
| } |
| |
| st, err := s.AuthStore().GenTokenPrefix() |
| if err != nil { |
| return nil, err |
| } |
| |
| internalReq := &pb.InternalAuthenticateRequest{ |
| Name: r.Name, |
| Password: r.Password, |
| SimpleToken: st, |
| } |
| |
| resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq}) |
| if err != nil { |
| return nil, err |
| } |
| if checkedRevision == s.AuthStore().Revision() { |
| break |
| } |
| |
| if lg != nil { |
| lg.Info("revision when password checked became stale; retrying") |
| } else { |
| plog.Infof("revision when password checked is obsolete, retrying") |
| } |
| } |
| |
| return resp.(*pb.AuthenticateResponse), nil |
| } |
| |
| func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthUserAddResponse), nil |
| } |
| |
| func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthUserDeleteResponse), nil |
| } |
| |
| func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthUserChangePasswordResponse), nil |
| } |
| |
| func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthUserGrantRoleResponse), nil |
| } |
| |
| func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthUserGetResponse), nil |
| } |
| |
| func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthUserListResponse), nil |
| } |
| |
| func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthUserRevokeRoleResponse), nil |
| } |
| |
| func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthRoleAddResponse), nil |
| } |
| |
| func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthRoleGrantPermissionResponse), nil |
| } |
| |
| func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthRoleGetResponse), nil |
| } |
| |
| func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthRoleListResponse), nil |
| } |
| |
| func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthRoleRevokePermissionResponse), nil |
| } |
| |
| func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) { |
| resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r}) |
| if err != nil { |
| return nil, err |
| } |
| return resp.(*pb.AuthRoleDeleteResponse), nil |
| } |
| |
| func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { |
| result, err := s.processInternalRaftRequestOnce(ctx, r) |
| if err != nil { |
| return nil, err |
| } |
| if result.err != nil { |
| return nil, result.err |
| } |
| return result.resp, nil |
| } |
| |
| func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { |
| for { |
| resp, err := s.raftRequestOnce(ctx, r) |
| if err != auth.ErrAuthOldRevision { |
| return resp, err |
| } |
| } |
| } |
| |
| // doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure. |
| func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error { |
| ai, err := s.AuthInfoFromCtx(ctx) |
| if err != nil { |
| return err |
| } |
| if ai == nil { |
| // chk expects non-nil AuthInfo; use empty credentials |
| ai = &auth.AuthInfo{} |
| } |
| if err = chk(ai); err != nil { |
| return err |
| } |
| // fetch response for serialized request |
| get() |
| // check for stale token revision in case the auth store was updated while |
| // the request has been handled. |
| if ai.Revision != 0 && ai.Revision != s.authStore.Revision() { |
| return auth.ErrAuthOldRevision |
| } |
| return nil |
| } |
| |
| func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) { |
| ai := s.getAppliedIndex() |
| ci := s.getCommittedIndex() |
| if ci > ai+maxGapBetweenApplyAndCommitIndex { |
| return nil, ErrTooManyRequests |
| } |
| |
| r.Header = &pb.RequestHeader{ |
| ID: s.reqIDGen.Next(), |
| } |
| |
| authInfo, err := s.AuthInfoFromCtx(ctx) |
| if err != nil { |
| return nil, err |
| } |
| if authInfo != nil { |
| r.Header.Username = authInfo.Username |
| r.Header.AuthRevision = authInfo.Revision |
| } |
| |
| data, err := r.Marshal() |
| if err != nil { |
| return nil, err |
| } |
| |
| if len(data) > int(s.Cfg.MaxRequestBytes) { |
| return nil, ErrRequestTooLarge |
| } |
| |
| id := r.ID |
| if id == 0 { |
| id = r.Header.ID |
| } |
| ch := s.w.Register(id) |
| |
| cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) |
| defer cancel() |
| |
| start := time.Now() |
| err = s.r.Propose(cctx, data) |
| if err != nil { |
| proposalsFailed.Inc() |
| s.w.Trigger(id, nil) // GC wait |
| return nil, err |
| } |
| proposalsPending.Inc() |
| defer proposalsPending.Dec() |
| |
| select { |
| case x := <-ch: |
| return x.(*applyResult), nil |
| case <-cctx.Done(): |
| proposalsFailed.Inc() |
| s.w.Trigger(id, nil) // GC wait |
| return nil, s.parseProposeCtxErr(cctx.Err(), start) |
| case <-s.done: |
| return nil, ErrStopped |
| } |
| } |
| |
| // Watchable returns a watchable interface attached to the etcdserver. |
| func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } |
| |
| func (s *EtcdServer) linearizableReadLoop() { |
| var rs raft.ReadState |
| |
| for { |
| ctxToSend := make([]byte, 8) |
| id1 := s.reqIDGen.Next() |
| binary.BigEndian.PutUint64(ctxToSend, id1) |
| leaderChangedNotifier := s.leaderChangedNotify() |
| select { |
| case <-leaderChangedNotifier: |
| continue |
| case <-s.readwaitc: |
| case <-s.stopping: |
| return |
| } |
| |
| nextnr := newNotifier() |
| |
| s.readMu.Lock() |
| nr := s.readNotifier |
| s.readNotifier = nextnr |
| s.readMu.Unlock() |
| |
| lg := s.getLogger() |
| cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) |
| if err := s.r.ReadIndex(cctx, ctxToSend); err != nil { |
| cancel() |
| if err == raft.ErrStopped { |
| return |
| } |
| if lg != nil { |
| lg.Warn("failed to get read index from Raft", zap.Error(err)) |
| } else { |
| plog.Errorf("failed to get read index from raft: %v", err) |
| } |
| readIndexFailed.Inc() |
| nr.notify(err) |
| continue |
| } |
| cancel() |
| |
| var ( |
| timeout bool |
| done bool |
| ) |
| for !timeout && !done { |
| select { |
| case rs = <-s.r.readStateC: |
| done = bytes.Equal(rs.RequestCtx, ctxToSend) |
| if !done { |
| // a previous request might time out. now we should ignore the response of it and |
| // continue waiting for the response of the current requests. |
| id2 := uint64(0) |
| if len(rs.RequestCtx) == 8 { |
| id2 = binary.BigEndian.Uint64(rs.RequestCtx) |
| } |
| if lg != nil { |
| lg.Warn( |
| "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader", |
| zap.Uint64("sent-request-id", id1), |
| zap.Uint64("received-request-id", id2), |
| ) |
| } else { |
| plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2) |
| } |
| slowReadIndex.Inc() |
| } |
| case <-leaderChangedNotifier: |
| timeout = true |
| readIndexFailed.Inc() |
| // return a retryable error. |
| nr.notify(ErrLeaderChanged) |
| case <-time.After(s.Cfg.ReqTimeout()): |
| if lg != nil { |
| lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout())) |
| } else { |
| plog.Warningf("timed out waiting for read index response (local node might have slow network)") |
| } |
| nr.notify(ErrTimeout) |
| timeout = true |
| slowReadIndex.Inc() |
| case <-s.stopping: |
| return |
| } |
| } |
| if !done { |
| continue |
| } |
| |
| if ai := s.getAppliedIndex(); ai < rs.Index { |
| select { |
| case <-s.applyWait.Wait(rs.Index): |
| case <-s.stopping: |
| return |
| } |
| } |
| // unblock all l-reads requested at indices before rs.Index |
| nr.notify(nil) |
| } |
| } |
| |
| func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error { |
| s.readMu.RLock() |
| nc := s.readNotifier |
| s.readMu.RUnlock() |
| |
| // signal linearizable loop for current notify if it hasn't been already |
| select { |
| case s.readwaitc <- struct{}{}: |
| default: |
| } |
| |
| // wait for read state notification |
| select { |
| case <-nc.c: |
| return nc.err |
| case <-ctx.Done(): |
| return ctx.Err() |
| case <-s.done: |
| return ErrStopped |
| } |
| } |
| |
| func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) { |
| authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx) |
| if authInfo != nil || err != nil { |
| return authInfo, err |
| } |
| if !s.Cfg.ClientCertAuthEnabled { |
| return nil, nil |
| } |
| authInfo = s.AuthStore().AuthInfoFromTLS(ctx) |
| return authInfo, nil |
| |
| } |