khenaidoo | ab1f7bd | 2019-11-14 14:00:27 -0500 | [diff] [blame] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package etcdserver |
| 16 | |
| 17 | import ( |
| 18 | "bytes" |
| 19 | "context" |
| 20 | "encoding/binary" |
| 21 | "time" |
| 22 | |
| 23 | "go.etcd.io/etcd/auth" |
| 24 | "go.etcd.io/etcd/etcdserver/api/membership" |
| 25 | pb "go.etcd.io/etcd/etcdserver/etcdserverpb" |
| 26 | "go.etcd.io/etcd/lease" |
| 27 | "go.etcd.io/etcd/lease/leasehttp" |
| 28 | "go.etcd.io/etcd/mvcc" |
| 29 | "go.etcd.io/etcd/raft" |
| 30 | |
| 31 | "github.com/gogo/protobuf/proto" |
| 32 | "go.uber.org/zap" |
| 33 | ) |
| 34 | |
// maxGapBetweenApplyAndCommitIndex bounds how far the applied index may lag
// behind the committed index. In the healthy case the gap is small (tens of
// entries), but if committed entries are very heavy to apply the gap can keep
// growing; once it exceeds this many entries, new proposals are rejected.
const maxGapBetweenApplyAndCommitIndex = 5000
| 42 | |
| 43 | type RaftKV interface { |
| 44 | Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) |
| 45 | Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) |
| 46 | DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) |
| 47 | Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) |
| 48 | Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) |
| 49 | } |
| 50 | |
| 51 | type Lessor interface { |
| 52 | // LeaseGrant sends LeaseGrant request to raft and apply it after committed. |
| 53 | LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) |
| 54 | // LeaseRevoke sends LeaseRevoke request to raft and apply it after committed. |
| 55 | LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) |
| 56 | |
| 57 | // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error |
| 58 | // is returned. |
| 59 | LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) |
| 60 | |
| 61 | // LeaseTimeToLive retrieves lease information. |
| 62 | LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) |
| 63 | |
| 64 | // LeaseLeases lists all leases. |
| 65 | LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) |
| 66 | } |
| 67 | |
| 68 | type Authenticator interface { |
| 69 | AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) |
| 70 | AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) |
| 71 | Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) |
| 72 | UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) |
| 73 | UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) |
| 74 | UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) |
| 75 | UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) |
| 76 | UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) |
| 77 | UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) |
| 78 | RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) |
| 79 | RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) |
| 80 | RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) |
| 81 | RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) |
| 82 | RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) |
| 83 | UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) |
| 84 | RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) |
| 85 | } |
| 86 | |
| 87 | func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { |
| 88 | var resp *pb.RangeResponse |
| 89 | var err error |
| 90 | defer func(start time.Time) { |
| 91 | warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err) |
| 92 | }(time.Now()) |
| 93 | |
| 94 | if !r.Serializable { |
| 95 | err = s.linearizableReadNotify(ctx) |
| 96 | if err != nil { |
| 97 | return nil, err |
| 98 | } |
| 99 | } |
| 100 | chk := func(ai *auth.AuthInfo) error { |
| 101 | return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) |
| 102 | } |
| 103 | |
| 104 | get := func() { resp, err = s.applyV3Base.Range(nil, r) } |
| 105 | if serr := s.doSerialize(ctx, chk, get); serr != nil { |
| 106 | err = serr |
| 107 | return nil, err |
| 108 | } |
| 109 | return resp, err |
| 110 | } |
| 111 | |
| 112 | func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { |
| 113 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) |
| 114 | if err != nil { |
| 115 | return nil, err |
| 116 | } |
| 117 | return resp.(*pb.PutResponse), nil |
| 118 | } |
| 119 | |
| 120 | func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { |
| 121 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r}) |
| 122 | if err != nil { |
| 123 | return nil, err |
| 124 | } |
| 125 | return resp.(*pb.DeleteRangeResponse), nil |
| 126 | } |
| 127 | |
| 128 | func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { |
| 129 | if isTxnReadonly(r) { |
| 130 | if !isTxnSerializable(r) { |
| 131 | err := s.linearizableReadNotify(ctx) |
| 132 | if err != nil { |
| 133 | return nil, err |
| 134 | } |
| 135 | } |
| 136 | var resp *pb.TxnResponse |
| 137 | var err error |
| 138 | chk := func(ai *auth.AuthInfo) error { |
| 139 | return checkTxnAuth(s.authStore, ai, r) |
| 140 | } |
| 141 | |
| 142 | defer func(start time.Time) { |
| 143 | warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), start, r, resp, err) |
| 144 | }(time.Now()) |
| 145 | |
| 146 | get := func() { resp, err = s.applyV3Base.Txn(r) } |
| 147 | if serr := s.doSerialize(ctx, chk, get); serr != nil { |
| 148 | return nil, serr |
| 149 | } |
| 150 | return resp, err |
| 151 | } |
| 152 | |
| 153 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r}) |
| 154 | if err != nil { |
| 155 | return nil, err |
| 156 | } |
| 157 | return resp.(*pb.TxnResponse), nil |
| 158 | } |
| 159 | |
| 160 | func isTxnSerializable(r *pb.TxnRequest) bool { |
| 161 | for _, u := range r.Success { |
| 162 | if r := u.GetRequestRange(); r == nil || !r.Serializable { |
| 163 | return false |
| 164 | } |
| 165 | } |
| 166 | for _, u := range r.Failure { |
| 167 | if r := u.GetRequestRange(); r == nil || !r.Serializable { |
| 168 | return false |
| 169 | } |
| 170 | } |
| 171 | return true |
| 172 | } |
| 173 | |
| 174 | func isTxnReadonly(r *pb.TxnRequest) bool { |
| 175 | for _, u := range r.Success { |
| 176 | if r := u.GetRequestRange(); r == nil { |
| 177 | return false |
| 178 | } |
| 179 | } |
| 180 | for _, u := range r.Failure { |
| 181 | if r := u.GetRequestRange(); r == nil { |
| 182 | return false |
| 183 | } |
| 184 | } |
| 185 | return true |
| 186 | } |
| 187 | |
| 188 | func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { |
| 189 | result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r}) |
| 190 | if r.Physical && result != nil && result.physc != nil { |
| 191 | <-result.physc |
| 192 | // The compaction is done deleting keys; the hash is now settled |
| 193 | // but the data is not necessarily committed. If there's a crash, |
| 194 | // the hash may revert to a hash prior to compaction completing |
| 195 | // if the compaction resumes. Force the finished compaction to |
| 196 | // commit so it won't resume following a crash. |
| 197 | s.be.ForceCommit() |
| 198 | } |
| 199 | if err != nil { |
| 200 | return nil, err |
| 201 | } |
| 202 | if result.err != nil { |
| 203 | return nil, result.err |
| 204 | } |
| 205 | resp := result.resp.(*pb.CompactionResponse) |
| 206 | if resp == nil { |
| 207 | resp = &pb.CompactionResponse{} |
| 208 | } |
| 209 | if resp.Header == nil { |
| 210 | resp.Header = &pb.ResponseHeader{} |
| 211 | } |
| 212 | resp.Header.Revision = s.kv.Rev() |
| 213 | return resp, nil |
| 214 | } |
| 215 | |
| 216 | func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { |
| 217 | // no id given? choose one |
| 218 | for r.ID == int64(lease.NoLease) { |
| 219 | // only use positive int64 id's |
| 220 | r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1)) |
| 221 | } |
| 222 | resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r}) |
| 223 | if err != nil { |
| 224 | return nil, err |
| 225 | } |
| 226 | return resp.(*pb.LeaseGrantResponse), nil |
| 227 | } |
| 228 | |
| 229 | func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { |
| 230 | resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r}) |
| 231 | if err != nil { |
| 232 | return nil, err |
| 233 | } |
| 234 | return resp.(*pb.LeaseRevokeResponse), nil |
| 235 | } |
| 236 | |
| 237 | func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { |
| 238 | ttl, err := s.lessor.Renew(id) |
| 239 | if err == nil { // already requested to primary lessor(leader) |
| 240 | return ttl, nil |
| 241 | } |
| 242 | if err != lease.ErrNotPrimary { |
| 243 | return -1, err |
| 244 | } |
| 245 | |
| 246 | cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) |
| 247 | defer cancel() |
| 248 | |
| 249 | // renewals don't go through raft; forward to leader manually |
| 250 | for cctx.Err() == nil && err != nil { |
| 251 | leader, lerr := s.waitLeader(cctx) |
| 252 | if lerr != nil { |
| 253 | return -1, lerr |
| 254 | } |
| 255 | for _, url := range leader.PeerURLs { |
| 256 | lurl := url + leasehttp.LeasePrefix |
| 257 | ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) |
| 258 | if err == nil || err == lease.ErrLeaseNotFound { |
| 259 | return ttl, err |
| 260 | } |
| 261 | } |
| 262 | } |
| 263 | |
| 264 | if cctx.Err() == context.DeadlineExceeded { |
| 265 | return -1, ErrTimeout |
| 266 | } |
| 267 | return -1, ErrCanceled |
| 268 | } |
| 269 | |
| 270 | func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { |
| 271 | if s.Leader() == s.ID() { |
| 272 | // primary; timetolive directly from leader |
| 273 | le := s.lessor.Lookup(lease.LeaseID(r.ID)) |
| 274 | if le == nil { |
| 275 | return nil, lease.ErrLeaseNotFound |
| 276 | } |
| 277 | // TODO: fill out ResponseHeader |
| 278 | resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()} |
| 279 | if r.Keys { |
| 280 | ks := le.Keys() |
| 281 | kbs := make([][]byte, len(ks)) |
| 282 | for i := range ks { |
| 283 | kbs[i] = []byte(ks[i]) |
| 284 | } |
| 285 | resp.Keys = kbs |
| 286 | } |
| 287 | return resp, nil |
| 288 | } |
| 289 | |
| 290 | cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) |
| 291 | defer cancel() |
| 292 | |
| 293 | // forward to leader |
| 294 | for cctx.Err() == nil { |
| 295 | leader, err := s.waitLeader(cctx) |
| 296 | if err != nil { |
| 297 | return nil, err |
| 298 | } |
| 299 | for _, url := range leader.PeerURLs { |
| 300 | lurl := url + leasehttp.LeaseInternalPrefix |
| 301 | resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt) |
| 302 | if err == nil { |
| 303 | return resp.LeaseTimeToLiveResponse, nil |
| 304 | } |
| 305 | if err == lease.ErrLeaseNotFound { |
| 306 | return nil, err |
| 307 | } |
| 308 | } |
| 309 | } |
| 310 | |
| 311 | if cctx.Err() == context.DeadlineExceeded { |
| 312 | return nil, ErrTimeout |
| 313 | } |
| 314 | return nil, ErrCanceled |
| 315 | } |
| 316 | |
| 317 | func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) { |
| 318 | ls := s.lessor.Leases() |
| 319 | lss := make([]*pb.LeaseStatus, len(ls)) |
| 320 | for i := range ls { |
| 321 | lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)} |
| 322 | } |
| 323 | return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil |
| 324 | } |
| 325 | |
| 326 | func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) { |
| 327 | leader := s.cluster.Member(s.Leader()) |
| 328 | for leader == nil { |
| 329 | // wait an election |
| 330 | dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond |
| 331 | select { |
| 332 | case <-time.After(dur): |
| 333 | leader = s.cluster.Member(s.Leader()) |
| 334 | case <-s.stopping: |
| 335 | return nil, ErrStopped |
| 336 | case <-ctx.Done(): |
| 337 | return nil, ErrNoLeader |
| 338 | } |
| 339 | } |
| 340 | if leader == nil || len(leader.PeerURLs) == 0 { |
| 341 | return nil, ErrNoLeader |
| 342 | } |
| 343 | return leader, nil |
| 344 | } |
| 345 | |
| 346 | func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) { |
| 347 | resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r}) |
| 348 | if err != nil { |
| 349 | return nil, err |
| 350 | } |
| 351 | return resp.(*pb.AlarmResponse), nil |
| 352 | } |
| 353 | |
| 354 | func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) { |
| 355 | resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r}) |
| 356 | if err != nil { |
| 357 | return nil, err |
| 358 | } |
| 359 | return resp.(*pb.AuthEnableResponse), nil |
| 360 | } |
| 361 | |
| 362 | func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) { |
| 363 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r}) |
| 364 | if err != nil { |
| 365 | return nil, err |
| 366 | } |
| 367 | return resp.(*pb.AuthDisableResponse), nil |
| 368 | } |
| 369 | |
| 370 | func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) { |
| 371 | if err := s.linearizableReadNotify(ctx); err != nil { |
| 372 | return nil, err |
| 373 | } |
| 374 | |
| 375 | lg := s.getLogger() |
| 376 | |
| 377 | var resp proto.Message |
| 378 | for { |
| 379 | checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password) |
| 380 | if err != nil { |
| 381 | if err != auth.ErrAuthNotEnabled { |
| 382 | if lg != nil { |
| 383 | lg.Warn( |
| 384 | "invalid authentication was requested", |
| 385 | zap.String("user", r.Name), |
| 386 | zap.Error(err), |
| 387 | ) |
| 388 | } else { |
| 389 | plog.Errorf("invalid authentication request to user %s was issued", r.Name) |
| 390 | } |
| 391 | } |
| 392 | return nil, err |
| 393 | } |
| 394 | |
| 395 | st, err := s.AuthStore().GenTokenPrefix() |
| 396 | if err != nil { |
| 397 | return nil, err |
| 398 | } |
| 399 | |
| 400 | internalReq := &pb.InternalAuthenticateRequest{ |
| 401 | Name: r.Name, |
| 402 | Password: r.Password, |
| 403 | SimpleToken: st, |
| 404 | } |
| 405 | |
| 406 | resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq}) |
| 407 | if err != nil { |
| 408 | return nil, err |
| 409 | } |
| 410 | if checkedRevision == s.AuthStore().Revision() { |
| 411 | break |
| 412 | } |
| 413 | |
| 414 | if lg != nil { |
| 415 | lg.Info("revision when password checked became stale; retrying") |
| 416 | } else { |
| 417 | plog.Infof("revision when password checked is obsolete, retrying") |
| 418 | } |
| 419 | } |
| 420 | |
| 421 | return resp.(*pb.AuthenticateResponse), nil |
| 422 | } |
| 423 | |
| 424 | func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) { |
| 425 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r}) |
| 426 | if err != nil { |
| 427 | return nil, err |
| 428 | } |
| 429 | return resp.(*pb.AuthUserAddResponse), nil |
| 430 | } |
| 431 | |
| 432 | func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) { |
| 433 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r}) |
| 434 | if err != nil { |
| 435 | return nil, err |
| 436 | } |
| 437 | return resp.(*pb.AuthUserDeleteResponse), nil |
| 438 | } |
| 439 | |
| 440 | func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) { |
| 441 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r}) |
| 442 | if err != nil { |
| 443 | return nil, err |
| 444 | } |
| 445 | return resp.(*pb.AuthUserChangePasswordResponse), nil |
| 446 | } |
| 447 | |
| 448 | func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) { |
| 449 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r}) |
| 450 | if err != nil { |
| 451 | return nil, err |
| 452 | } |
| 453 | return resp.(*pb.AuthUserGrantRoleResponse), nil |
| 454 | } |
| 455 | |
| 456 | func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) { |
| 457 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r}) |
| 458 | if err != nil { |
| 459 | return nil, err |
| 460 | } |
| 461 | return resp.(*pb.AuthUserGetResponse), nil |
| 462 | } |
| 463 | |
| 464 | func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) { |
| 465 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r}) |
| 466 | if err != nil { |
| 467 | return nil, err |
| 468 | } |
| 469 | return resp.(*pb.AuthUserListResponse), nil |
| 470 | } |
| 471 | |
| 472 | func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) { |
| 473 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r}) |
| 474 | if err != nil { |
| 475 | return nil, err |
| 476 | } |
| 477 | return resp.(*pb.AuthUserRevokeRoleResponse), nil |
| 478 | } |
| 479 | |
| 480 | func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) { |
| 481 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r}) |
| 482 | if err != nil { |
| 483 | return nil, err |
| 484 | } |
| 485 | return resp.(*pb.AuthRoleAddResponse), nil |
| 486 | } |
| 487 | |
| 488 | func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) { |
| 489 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r}) |
| 490 | if err != nil { |
| 491 | return nil, err |
| 492 | } |
| 493 | return resp.(*pb.AuthRoleGrantPermissionResponse), nil |
| 494 | } |
| 495 | |
| 496 | func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) { |
| 497 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r}) |
| 498 | if err != nil { |
| 499 | return nil, err |
| 500 | } |
| 501 | return resp.(*pb.AuthRoleGetResponse), nil |
| 502 | } |
| 503 | |
| 504 | func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) { |
| 505 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r}) |
| 506 | if err != nil { |
| 507 | return nil, err |
| 508 | } |
| 509 | return resp.(*pb.AuthRoleListResponse), nil |
| 510 | } |
| 511 | |
| 512 | func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) { |
| 513 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r}) |
| 514 | if err != nil { |
| 515 | return nil, err |
| 516 | } |
| 517 | return resp.(*pb.AuthRoleRevokePermissionResponse), nil |
| 518 | } |
| 519 | |
| 520 | func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) { |
| 521 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r}) |
| 522 | if err != nil { |
| 523 | return nil, err |
| 524 | } |
| 525 | return resp.(*pb.AuthRoleDeleteResponse), nil |
| 526 | } |
| 527 | |
| 528 | func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { |
| 529 | result, err := s.processInternalRaftRequestOnce(ctx, r) |
| 530 | if err != nil { |
| 531 | return nil, err |
| 532 | } |
| 533 | if result.err != nil { |
| 534 | return nil, result.err |
| 535 | } |
| 536 | return result.resp, nil |
| 537 | } |
| 538 | |
| 539 | func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { |
| 540 | for { |
| 541 | resp, err := s.raftRequestOnce(ctx, r) |
| 542 | if err != auth.ErrAuthOldRevision { |
| 543 | return resp, err |
| 544 | } |
| 545 | } |
| 546 | } |
| 547 | |
| 548 | // doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure. |
| 549 | func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error { |
| 550 | ai, err := s.AuthInfoFromCtx(ctx) |
| 551 | if err != nil { |
| 552 | return err |
| 553 | } |
| 554 | if ai == nil { |
| 555 | // chk expects non-nil AuthInfo; use empty credentials |
| 556 | ai = &auth.AuthInfo{} |
| 557 | } |
| 558 | if err = chk(ai); err != nil { |
| 559 | return err |
| 560 | } |
| 561 | // fetch response for serialized request |
| 562 | get() |
| 563 | // check for stale token revision in case the auth store was updated while |
| 564 | // the request has been handled. |
| 565 | if ai.Revision != 0 && ai.Revision != s.authStore.Revision() { |
| 566 | return auth.ErrAuthOldRevision |
| 567 | } |
| 568 | return nil |
| 569 | } |
| 570 | |
// processInternalRaftRequestOnce proposes r to raft once and waits for its
// apply result.
//
// Before proposing it:
//   - returns ErrTooManyRequests when the committed index is more than
//     maxGapBetweenApplyAndCommitIndex entries ahead of the applied index;
//   - stamps r.Header with a fresh request ID and, when the caller has auth
//     info, its username and auth revision;
//   - returns ErrRequestTooLarge if the marshaled request exceeds
//     Cfg.MaxRequestBytes.
//
// The wait is keyed by r.ID when non-zero, otherwise by the header ID. Propose
// and wait share a Cfg.ReqTimeout deadline. On timeout or cancellation the
// error is mapped by parseProposeCtxErr; if the server shuts down first,
// ErrStopped is returned. On success the *applyResult is returned, and callers
// must still check its err field.
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
	// Back-pressure: refuse new proposals while the apply backlog is too large.
	ai := s.getAppliedIndex()
	ci := s.getCommittedIndex()
	if ci > ai+maxGapBetweenApplyAndCommitIndex {
		return nil, ErrTooManyRequests
	}

	r.Header = &pb.RequestHeader{
		ID: s.reqIDGen.Next(),
	}

	authInfo, err := s.AuthInfoFromCtx(ctx)
	if err != nil {
		return nil, err
	}
	if authInfo != nil {
		r.Header.Username = authInfo.Username
		r.Header.AuthRevision = authInfo.Revision
	}

	data, err := r.Marshal()
	if err != nil {
		return nil, err
	}

	if len(data) > int(s.Cfg.MaxRequestBytes) {
		return nil, ErrRequestTooLarge
	}

	// Requests that carry their own ID (r.ID) are waited on under that ID;
	// otherwise the freshly generated header ID is used.
	id := r.ID
	if id == 0 {
		id = r.Header.ID
	}
	// NOTE(review): registered before Propose, presumably so the apply path
	// cannot deliver the result before this goroutine is waiting — confirm.
	ch := s.w.Register(id)

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()

	start := time.Now()
	err = s.r.Propose(cctx, data)
	if err != nil {
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, err
	}
	proposalsPending.Inc()
	defer proposalsPending.Dec()

	select {
	case x := <-ch:
		return x.(*applyResult), nil
	case <-cctx.Done():
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, s.parseProposeCtxErr(cctx.Err(), start)
	case <-s.done:
		// NOTE(review): unlike the cctx.Done branch, this path does not call
		// s.w.Trigger(id, nil), so the wait registration is not released here.
		return nil, ErrStopped
	}
}
| 630 | |
| 631 | // Watchable returns a watchable interface attached to the etcdserver. |
| 632 | func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } |
| 633 | |
// linearizableReadLoop serves linearizable reads in batches until the server
// is stopping, or until raft's ReadIndex returns raft.ErrStopped.
//
// Each round:
//  1. waits for a signal on s.readwaitc (sent by linearizableReadNotify); a
//     leader change observed before that signal simply restarts the round;
//  2. swaps in a fresh notifier, so readers arriving later join the next
//     round, and keeps the current one (nr) for readers already waiting;
//  3. asks raft for a read index, tagged with an 8-byte request ID;
//  4. waits for the ReadState whose RequestCtx matches that tag, ignoring
//     responses to earlier requests;
//  5. waits until the applied index reaches rs.Index, then releases the
//     whole batch with nr.notify(nil).
//
// Failures are delivered to the batch through nr.notify instead: the ReadIndex
// error, ErrLeaderChanged, or ErrTimeout.
func (s *EtcdServer) linearizableReadLoop() {
	var rs raft.ReadState

	for {
		// Tag this round's read-index request with a unique ID; the matching
		// ReadState echoes it back in RequestCtx.
		ctxToSend := make([]byte, 8)
		id1 := s.reqIDGen.Next()
		binary.BigEndian.PutUint64(ctxToSend, id1)
		leaderChangedNotifier := s.leaderChangedNotify()
		select {
		case <-leaderChangedNotifier:
			continue
		case <-s.readwaitc:
		case <-s.stopping:
			return
		}

		// Readers that call linearizableReadNotify from now on wait on nextnr;
		// nr belongs to the readers already queued for this round.
		nextnr := newNotifier()

		s.readMu.Lock()
		nr := s.readNotifier
		s.readNotifier = nextnr
		s.readMu.Unlock()

		lg := s.getLogger()
		cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
		if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
			cancel()
			if err == raft.ErrStopped {
				return
			}
			if lg != nil {
				lg.Warn("failed to get read index from Raft", zap.Error(err))
			} else {
				plog.Errorf("failed to get read index from raft: %v", err)
			}
			readIndexFailed.Inc()
			nr.notify(err)
			continue
		}
		cancel()

		var (
			timeout bool
			done    bool
		)
		// NOTE(review): time.After is re-evaluated on every iteration, so each
		// stale ReadState received restarts the ReqTimeout window.
		for !timeout && !done {
			select {
			case rs = <-s.r.readStateC:
				done = bytes.Equal(rs.RequestCtx, ctxToSend)
				if !done {
					// a previous request might time out. now we should ignore the response of it and
					// continue waiting for the response of the current requests.
					id2 := uint64(0)
					if len(rs.RequestCtx) == 8 {
						id2 = binary.BigEndian.Uint64(rs.RequestCtx)
					}
					if lg != nil {
						lg.Warn(
							"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
							zap.Uint64("sent-request-id", id1),
							zap.Uint64("received-request-id", id2),
						)
					} else {
						plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
					}
					slowReadIndex.Inc()
				}
			case <-leaderChangedNotifier:
				timeout = true
				readIndexFailed.Inc()
				// return a retryable error.
				nr.notify(ErrLeaderChanged)
			case <-time.After(s.Cfg.ReqTimeout()):
				if lg != nil {
					lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
				} else {
					plog.Warningf("timed out waiting for read index response (local node might have slow network)")
				}
				nr.notify(ErrTimeout)
				timeout = true
				slowReadIndex.Inc()
			case <-s.stopping:
				return
			}
		}
		// The batch has already been notified of the failure above.
		if !done {
			continue
		}

		// Block until the local state machine has applied up to the read
		// index, so the batch observes at least everything committed by then.
		if ai := s.getAppliedIndex(); ai < rs.Index {
			select {
			case <-s.applyWait.Wait(rs.Index):
			case <-s.stopping:
				return
			}
		}
		// unblock all l-reads requested at indices before rs.Index
		nr.notify(nil)
	}
}
| 734 | |
| 735 | func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error { |
| 736 | s.readMu.RLock() |
| 737 | nc := s.readNotifier |
| 738 | s.readMu.RUnlock() |
| 739 | |
| 740 | // signal linearizable loop for current notify if it hasn't been already |
| 741 | select { |
| 742 | case s.readwaitc <- struct{}{}: |
| 743 | default: |
| 744 | } |
| 745 | |
| 746 | // wait for read state notification |
| 747 | select { |
| 748 | case <-nc.c: |
| 749 | return nc.err |
| 750 | case <-ctx.Done(): |
| 751 | return ctx.Err() |
| 752 | case <-s.done: |
| 753 | return ErrStopped |
| 754 | } |
| 755 | } |
| 756 | |
| 757 | func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) { |
| 758 | authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx) |
| 759 | if authInfo != nil || err != nil { |
| 760 | return authInfo, err |
| 761 | } |
| 762 | if !s.Cfg.ClientCertAuthEnabled { |
| 763 | return nil, nil |
| 764 | } |
| 765 | authInfo = s.AuthStore().AuthInfoFromTLS(ctx) |
| 766 | return authInfo, nil |
| 767 | |
| 768 | } |