khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package etcdserver |
| 16 | |
| 17 | import ( |
| 18 | "bytes" |
| 19 | "context" |
| 20 | "encoding/binary" |
| 21 | "time" |
| 22 | |
| 23 | "github.com/coreos/etcd/auth" |
| 24 | pb "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| 25 | "github.com/coreos/etcd/etcdserver/membership" |
| 26 | "github.com/coreos/etcd/lease" |
| 27 | "github.com/coreos/etcd/lease/leasehttp" |
| 28 | "github.com/coreos/etcd/mvcc" |
| 29 | "github.com/coreos/etcd/raft" |
| 30 | |
| 31 | "github.com/gogo/protobuf/proto" |
| 32 | ) |
| 33 | |
// maxGapBetweenApplyAndCommitIndex is the largest allowed distance between the
// committed index and the applied index before new proposals are rejected.
//
// In the healthy case there is only a small gap (tens of entries) between the
// applied index and the committed index. If the committed entries are very
// heavy to apply, however, the gap may grow; once it grows past this value the
// server stops accepting new proposals.
const maxGapBetweenApplyAndCommitIndex = 5000
| 41 | |
| 42 | type RaftKV interface { |
| 43 | Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) |
| 44 | Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) |
| 45 | DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) |
| 46 | Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) |
| 47 | Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) |
| 48 | } |
| 49 | |
| 50 | type Lessor interface { |
| 51 | // LeaseGrant sends LeaseGrant request to raft and apply it after committed. |
| 52 | LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) |
| 53 | // LeaseRevoke sends LeaseRevoke request to raft and apply it after committed. |
| 54 | LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) |
| 55 | |
| 56 | // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error |
| 57 | // is returned. |
| 58 | LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) |
| 59 | |
| 60 | // LeaseTimeToLive retrieves lease information. |
| 61 | LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) |
| 62 | |
| 63 | // LeaseLeases lists all leases. |
| 64 | LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) |
| 65 | } |
| 66 | |
| 67 | type Authenticator interface { |
| 68 | AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) |
| 69 | AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) |
| 70 | Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) |
| 71 | UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) |
| 72 | UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) |
| 73 | UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) |
| 74 | UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) |
| 75 | UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) |
| 76 | UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) |
| 77 | RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) |
| 78 | RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) |
| 79 | RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) |
| 80 | RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) |
| 81 | RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) |
| 82 | UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) |
| 83 | RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) |
| 84 | } |
| 85 | |
| 86 | func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { |
| 87 | var resp *pb.RangeResponse |
| 88 | var err error |
| 89 | defer func(start time.Time) { |
| 90 | warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err) |
| 91 | }(time.Now()) |
| 92 | |
| 93 | if !r.Serializable { |
| 94 | err = s.linearizableReadNotify(ctx) |
| 95 | if err != nil { |
| 96 | return nil, err |
| 97 | } |
| 98 | } |
| 99 | chk := func(ai *auth.AuthInfo) error { |
| 100 | return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) |
| 101 | } |
| 102 | |
| 103 | get := func() { resp, err = s.applyV3Base.Range(nil, r) } |
| 104 | if serr := s.doSerialize(ctx, chk, get); serr != nil { |
| 105 | err = serr |
| 106 | return nil, err |
| 107 | } |
| 108 | return resp, err |
| 109 | } |
| 110 | |
| 111 | func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { |
| 112 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) |
| 113 | if err != nil { |
| 114 | return nil, err |
| 115 | } |
| 116 | return resp.(*pb.PutResponse), nil |
| 117 | } |
| 118 | |
| 119 | func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { |
| 120 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r}) |
| 121 | if err != nil { |
| 122 | return nil, err |
| 123 | } |
| 124 | return resp.(*pb.DeleteRangeResponse), nil |
| 125 | } |
| 126 | |
| 127 | func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { |
| 128 | if isTxnReadonly(r) { |
| 129 | if !isTxnSerializable(r) { |
| 130 | err := s.linearizableReadNotify(ctx) |
| 131 | if err != nil { |
| 132 | return nil, err |
| 133 | } |
| 134 | } |
| 135 | var resp *pb.TxnResponse |
| 136 | var err error |
| 137 | chk := func(ai *auth.AuthInfo) error { |
| 138 | return checkTxnAuth(s.authStore, ai, r) |
| 139 | } |
| 140 | |
| 141 | defer func(start time.Time) { |
| 142 | warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err) |
| 143 | }(time.Now()) |
| 144 | |
| 145 | get := func() { resp, err = s.applyV3Base.Txn(r) } |
| 146 | if serr := s.doSerialize(ctx, chk, get); serr != nil { |
| 147 | return nil, serr |
| 148 | } |
| 149 | return resp, err |
| 150 | } |
| 151 | |
| 152 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r}) |
| 153 | if err != nil { |
| 154 | return nil, err |
| 155 | } |
| 156 | return resp.(*pb.TxnResponse), nil |
| 157 | } |
| 158 | |
| 159 | func isTxnSerializable(r *pb.TxnRequest) bool { |
| 160 | for _, u := range r.Success { |
| 161 | if r := u.GetRequestRange(); r == nil || !r.Serializable { |
| 162 | return false |
| 163 | } |
| 164 | } |
| 165 | for _, u := range r.Failure { |
| 166 | if r := u.GetRequestRange(); r == nil || !r.Serializable { |
| 167 | return false |
| 168 | } |
| 169 | } |
| 170 | return true |
| 171 | } |
| 172 | |
| 173 | func isTxnReadonly(r *pb.TxnRequest) bool { |
| 174 | for _, u := range r.Success { |
| 175 | if r := u.GetRequestRange(); r == nil { |
| 176 | return false |
| 177 | } |
| 178 | } |
| 179 | for _, u := range r.Failure { |
| 180 | if r := u.GetRequestRange(); r == nil { |
| 181 | return false |
| 182 | } |
| 183 | } |
| 184 | return true |
| 185 | } |
| 186 | |
| 187 | func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { |
| 188 | result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r}) |
| 189 | if r.Physical && result != nil && result.physc != nil { |
| 190 | <-result.physc |
| 191 | // The compaction is done deleting keys; the hash is now settled |
| 192 | // but the data is not necessarily committed. If there's a crash, |
| 193 | // the hash may revert to a hash prior to compaction completing |
| 194 | // if the compaction resumes. Force the finished compaction to |
| 195 | // commit so it won't resume following a crash. |
| 196 | s.be.ForceCommit() |
| 197 | } |
| 198 | if err != nil { |
| 199 | return nil, err |
| 200 | } |
| 201 | if result.err != nil { |
| 202 | return nil, result.err |
| 203 | } |
| 204 | resp := result.resp.(*pb.CompactionResponse) |
| 205 | if resp == nil { |
| 206 | resp = &pb.CompactionResponse{} |
| 207 | } |
| 208 | if resp.Header == nil { |
| 209 | resp.Header = &pb.ResponseHeader{} |
| 210 | } |
| 211 | resp.Header.Revision = s.kv.Rev() |
| 212 | return resp, nil |
| 213 | } |
| 214 | |
| 215 | func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { |
| 216 | // no id given? choose one |
| 217 | for r.ID == int64(lease.NoLease) { |
| 218 | // only use positive int64 id's |
| 219 | r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1)) |
| 220 | } |
| 221 | resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r}) |
| 222 | if err != nil { |
| 223 | return nil, err |
| 224 | } |
| 225 | return resp.(*pb.LeaseGrantResponse), nil |
| 226 | } |
| 227 | |
| 228 | func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { |
| 229 | resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r}) |
| 230 | if err != nil { |
| 231 | return nil, err |
| 232 | } |
| 233 | return resp.(*pb.LeaseRevokeResponse), nil |
| 234 | } |
| 235 | |
| 236 | func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { |
| 237 | ttl, err := s.lessor.Renew(id) |
| 238 | if err == nil { // already requested to primary lessor(leader) |
| 239 | return ttl, nil |
| 240 | } |
| 241 | if err != lease.ErrNotPrimary { |
| 242 | return -1, err |
| 243 | } |
| 244 | |
| 245 | cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) |
| 246 | defer cancel() |
| 247 | |
| 248 | // renewals don't go through raft; forward to leader manually |
| 249 | for cctx.Err() == nil && err != nil { |
| 250 | leader, lerr := s.waitLeader(cctx) |
| 251 | if lerr != nil { |
| 252 | return -1, lerr |
| 253 | } |
| 254 | for _, url := range leader.PeerURLs { |
| 255 | lurl := url + leasehttp.LeasePrefix |
| 256 | ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) |
| 257 | if err == nil || err == lease.ErrLeaseNotFound { |
| 258 | return ttl, err |
| 259 | } |
| 260 | } |
| 261 | } |
| 262 | return -1, ErrTimeout |
| 263 | } |
| 264 | |
| 265 | func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { |
| 266 | if s.Leader() == s.ID() { |
| 267 | // primary; timetolive directly from leader |
| 268 | le := s.lessor.Lookup(lease.LeaseID(r.ID)) |
| 269 | if le == nil { |
| 270 | return nil, lease.ErrLeaseNotFound |
| 271 | } |
| 272 | // TODO: fill out ResponseHeader |
| 273 | resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()} |
| 274 | if r.Keys { |
| 275 | ks := le.Keys() |
| 276 | kbs := make([][]byte, len(ks)) |
| 277 | for i := range ks { |
| 278 | kbs[i] = []byte(ks[i]) |
| 279 | } |
| 280 | resp.Keys = kbs |
| 281 | } |
| 282 | return resp, nil |
| 283 | } |
| 284 | |
| 285 | cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) |
| 286 | defer cancel() |
| 287 | |
| 288 | // forward to leader |
| 289 | for cctx.Err() == nil { |
| 290 | leader, err := s.waitLeader(cctx) |
| 291 | if err != nil { |
| 292 | return nil, err |
| 293 | } |
| 294 | for _, url := range leader.PeerURLs { |
| 295 | lurl := url + leasehttp.LeaseInternalPrefix |
| 296 | resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt) |
| 297 | if err == nil { |
| 298 | return resp.LeaseTimeToLiveResponse, nil |
| 299 | } |
| 300 | if err == lease.ErrLeaseNotFound { |
| 301 | return nil, err |
| 302 | } |
| 303 | } |
| 304 | } |
| 305 | return nil, ErrTimeout |
| 306 | } |
| 307 | |
| 308 | func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) { |
| 309 | ls := s.lessor.Leases() |
| 310 | lss := make([]*pb.LeaseStatus, len(ls)) |
| 311 | for i := range ls { |
| 312 | lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)} |
| 313 | } |
| 314 | return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil |
| 315 | } |
| 316 | |
| 317 | func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) { |
| 318 | leader := s.cluster.Member(s.Leader()) |
| 319 | for leader == nil { |
| 320 | // wait an election |
| 321 | dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond |
| 322 | select { |
| 323 | case <-time.After(dur): |
| 324 | leader = s.cluster.Member(s.Leader()) |
| 325 | case <-s.stopping: |
| 326 | return nil, ErrStopped |
| 327 | case <-ctx.Done(): |
| 328 | return nil, ErrNoLeader |
| 329 | } |
| 330 | } |
| 331 | if leader == nil || len(leader.PeerURLs) == 0 { |
| 332 | return nil, ErrNoLeader |
| 333 | } |
| 334 | return leader, nil |
| 335 | } |
| 336 | |
| 337 | func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) { |
| 338 | resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r}) |
| 339 | if err != nil { |
| 340 | return nil, err |
| 341 | } |
| 342 | return resp.(*pb.AlarmResponse), nil |
| 343 | } |
| 344 | |
| 345 | func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) { |
| 346 | resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r}) |
| 347 | if err != nil { |
| 348 | return nil, err |
| 349 | } |
| 350 | return resp.(*pb.AuthEnableResponse), nil |
| 351 | } |
| 352 | |
| 353 | func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) { |
| 354 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r}) |
| 355 | if err != nil { |
| 356 | return nil, err |
| 357 | } |
| 358 | return resp.(*pb.AuthDisableResponse), nil |
| 359 | } |
| 360 | |
| 361 | func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) { |
| 362 | if err := s.linearizableReadNotify(ctx); err != nil { |
| 363 | return nil, err |
| 364 | } |
| 365 | |
| 366 | var resp proto.Message |
| 367 | for { |
| 368 | checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password) |
| 369 | if err != nil { |
| 370 | if err != auth.ErrAuthNotEnabled { |
| 371 | plog.Errorf("invalid authentication request to user %s was issued", r.Name) |
| 372 | } |
| 373 | return nil, err |
| 374 | } |
| 375 | |
| 376 | st, err := s.AuthStore().GenTokenPrefix() |
| 377 | if err != nil { |
| 378 | return nil, err |
| 379 | } |
| 380 | |
| 381 | // internalReq doesn't need to have Password because the above s.AuthStore().CheckPassword() already did it. |
| 382 | // In addition, it will let a WAL entry not record password as a plain text. |
| 383 | internalReq := &pb.InternalAuthenticateRequest{ |
| 384 | Name: r.Name, |
| 385 | SimpleToken: st, |
| 386 | } |
| 387 | |
| 388 | resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq}) |
| 389 | if err != nil { |
| 390 | return nil, err |
| 391 | } |
| 392 | if checkedRevision == s.AuthStore().Revision() { |
| 393 | break |
| 394 | } |
| 395 | plog.Infof("revision when password checked is obsolete, retrying") |
| 396 | } |
| 397 | |
| 398 | return resp.(*pb.AuthenticateResponse), nil |
| 399 | } |
| 400 | |
| 401 | func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) { |
| 402 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r}) |
| 403 | if err != nil { |
| 404 | return nil, err |
| 405 | } |
| 406 | return resp.(*pb.AuthUserAddResponse), nil |
| 407 | } |
| 408 | |
| 409 | func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) { |
| 410 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r}) |
| 411 | if err != nil { |
| 412 | return nil, err |
| 413 | } |
| 414 | return resp.(*pb.AuthUserDeleteResponse), nil |
| 415 | } |
| 416 | |
| 417 | func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) { |
| 418 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r}) |
| 419 | if err != nil { |
| 420 | return nil, err |
| 421 | } |
| 422 | return resp.(*pb.AuthUserChangePasswordResponse), nil |
| 423 | } |
| 424 | |
| 425 | func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) { |
| 426 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r}) |
| 427 | if err != nil { |
| 428 | return nil, err |
| 429 | } |
| 430 | return resp.(*pb.AuthUserGrantRoleResponse), nil |
| 431 | } |
| 432 | |
| 433 | func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) { |
| 434 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r}) |
| 435 | if err != nil { |
| 436 | return nil, err |
| 437 | } |
| 438 | return resp.(*pb.AuthUserGetResponse), nil |
| 439 | } |
| 440 | |
| 441 | func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) { |
| 442 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r}) |
| 443 | if err != nil { |
| 444 | return nil, err |
| 445 | } |
| 446 | return resp.(*pb.AuthUserListResponse), nil |
| 447 | } |
| 448 | |
| 449 | func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) { |
| 450 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r}) |
| 451 | if err != nil { |
| 452 | return nil, err |
| 453 | } |
| 454 | return resp.(*pb.AuthUserRevokeRoleResponse), nil |
| 455 | } |
| 456 | |
| 457 | func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) { |
| 458 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r}) |
| 459 | if err != nil { |
| 460 | return nil, err |
| 461 | } |
| 462 | return resp.(*pb.AuthRoleAddResponse), nil |
| 463 | } |
| 464 | |
| 465 | func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) { |
| 466 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r}) |
| 467 | if err != nil { |
| 468 | return nil, err |
| 469 | } |
| 470 | return resp.(*pb.AuthRoleGrantPermissionResponse), nil |
| 471 | } |
| 472 | |
| 473 | func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) { |
| 474 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r}) |
| 475 | if err != nil { |
| 476 | return nil, err |
| 477 | } |
| 478 | return resp.(*pb.AuthRoleGetResponse), nil |
| 479 | } |
| 480 | |
| 481 | func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) { |
| 482 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r}) |
| 483 | if err != nil { |
| 484 | return nil, err |
| 485 | } |
| 486 | return resp.(*pb.AuthRoleListResponse), nil |
| 487 | } |
| 488 | |
| 489 | func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) { |
| 490 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r}) |
| 491 | if err != nil { |
| 492 | return nil, err |
| 493 | } |
| 494 | return resp.(*pb.AuthRoleRevokePermissionResponse), nil |
| 495 | } |
| 496 | |
| 497 | func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) { |
| 498 | resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r}) |
| 499 | if err != nil { |
| 500 | return nil, err |
| 501 | } |
| 502 | return resp.(*pb.AuthRoleDeleteResponse), nil |
| 503 | } |
| 504 | |
| 505 | func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { |
| 506 | result, err := s.processInternalRaftRequestOnce(ctx, r) |
| 507 | if err != nil { |
| 508 | return nil, err |
| 509 | } |
| 510 | if result.err != nil { |
| 511 | return nil, result.err |
| 512 | } |
| 513 | return result.resp, nil |
| 514 | } |
| 515 | |
| 516 | func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { |
| 517 | return s.raftRequestOnce(ctx, r) |
| 518 | } |
| 519 | |
| 520 | // doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure. |
| 521 | func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error { |
| 522 | ai, err := s.AuthInfoFromCtx(ctx) |
| 523 | if err != nil { |
| 524 | return err |
| 525 | } |
| 526 | if ai == nil { |
| 527 | // chk expects non-nil AuthInfo; use empty credentials |
| 528 | ai = &auth.AuthInfo{} |
| 529 | } |
| 530 | if err = chk(ai); err != nil { |
| 531 | return err |
| 532 | } |
| 533 | // fetch response for serialized request |
| 534 | get() |
| 535 | // check for stale token revision in case the auth store was updated while |
| 536 | // the request has been handled. |
| 537 | if ai.Revision != 0 && ai.Revision != s.authStore.Revision() { |
| 538 | return auth.ErrAuthOldRevision |
| 539 | } |
| 540 | return nil |
| 541 | } |
| 542 | |
| 543 | func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) { |
| 544 | ai := s.getAppliedIndex() |
| 545 | ci := s.getCommittedIndex() |
| 546 | if ci > ai+maxGapBetweenApplyAndCommitIndex { |
| 547 | return nil, ErrTooManyRequests |
| 548 | } |
| 549 | |
| 550 | r.Header = &pb.RequestHeader{ |
| 551 | ID: s.reqIDGen.Next(), |
| 552 | } |
| 553 | |
| 554 | authInfo, err := s.AuthInfoFromCtx(ctx) |
| 555 | if err != nil { |
| 556 | return nil, err |
| 557 | } |
| 558 | if authInfo != nil { |
| 559 | r.Header.Username = authInfo.Username |
| 560 | r.Header.AuthRevision = authInfo.Revision |
| 561 | } |
| 562 | |
| 563 | data, err := r.Marshal() |
| 564 | if err != nil { |
| 565 | return nil, err |
| 566 | } |
| 567 | |
| 568 | if len(data) > int(s.Cfg.MaxRequestBytes) { |
| 569 | return nil, ErrRequestTooLarge |
| 570 | } |
| 571 | |
| 572 | id := r.ID |
| 573 | if id == 0 { |
| 574 | id = r.Header.ID |
| 575 | } |
| 576 | ch := s.w.Register(id) |
| 577 | |
| 578 | cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) |
| 579 | defer cancel() |
| 580 | |
| 581 | start := time.Now() |
| 582 | s.r.Propose(cctx, data) |
| 583 | proposalsPending.Inc() |
| 584 | defer proposalsPending.Dec() |
| 585 | |
| 586 | select { |
| 587 | case x := <-ch: |
| 588 | return x.(*applyResult), nil |
| 589 | case <-cctx.Done(): |
| 590 | proposalsFailed.Inc() |
| 591 | s.w.Trigger(id, nil) // GC wait |
| 592 | return nil, s.parseProposeCtxErr(cctx.Err(), start) |
| 593 | case <-s.done: |
| 594 | return nil, ErrStopped |
| 595 | } |
| 596 | } |
| 597 | |
| 598 | // Watchable returns a watchable interface attached to the etcdserver. |
| 599 | func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } |
| 600 | |
| 601 | func (s *EtcdServer) linearizableReadLoop() { |
| 602 | var rs raft.ReadState |
| 603 | |
| 604 | for { |
| 605 | ctxToSend := make([]byte, 8) |
| 606 | id1 := s.reqIDGen.Next() |
| 607 | binary.BigEndian.PutUint64(ctxToSend, id1) |
| 608 | |
| 609 | leaderChangedNotifier := s.leaderChangedNotify() |
| 610 | select { |
| 611 | case <-leaderChangedNotifier: |
| 612 | continue |
| 613 | case <-s.readwaitc: |
| 614 | case <-s.stopping: |
| 615 | return |
| 616 | } |
| 617 | |
| 618 | nextnr := newNotifier() |
| 619 | |
| 620 | s.readMu.Lock() |
| 621 | nr := s.readNotifier |
| 622 | s.readNotifier = nextnr |
| 623 | s.readMu.Unlock() |
| 624 | |
| 625 | cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) |
| 626 | if err := s.r.ReadIndex(cctx, ctxToSend); err != nil { |
| 627 | cancel() |
| 628 | if err == raft.ErrStopped { |
| 629 | return |
| 630 | } |
| 631 | plog.Errorf("failed to get read index from raft: %v", err) |
| 632 | readIndexFailed.Inc() |
| 633 | nr.notify(err) |
| 634 | continue |
| 635 | } |
| 636 | cancel() |
| 637 | |
| 638 | var ( |
| 639 | timeout bool |
| 640 | done bool |
| 641 | ) |
| 642 | for !timeout && !done { |
| 643 | select { |
| 644 | case rs = <-s.r.readStateC: |
| 645 | done = bytes.Equal(rs.RequestCtx, ctxToSend) |
| 646 | if !done { |
| 647 | // a previous request might time out. now we should ignore the response of it and |
| 648 | // continue waiting for the response of the current requests. |
| 649 | id2 := uint64(0) |
| 650 | if len(rs.RequestCtx) == 8 { |
| 651 | id2 = binary.BigEndian.Uint64(rs.RequestCtx) |
| 652 | } |
| 653 | plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2) |
| 654 | slowReadIndex.Inc() |
| 655 | } |
| 656 | |
| 657 | case <-leaderChangedNotifier: |
| 658 | timeout = true |
| 659 | readIndexFailed.Inc() |
| 660 | // return a retryable error. |
| 661 | nr.notify(ErrLeaderChanged) |
| 662 | |
| 663 | case <-time.After(s.Cfg.ReqTimeout()): |
| 664 | plog.Warningf("timed out waiting for read index response (local node might have slow network)") |
| 665 | nr.notify(ErrTimeout) |
| 666 | timeout = true |
| 667 | slowReadIndex.Inc() |
| 668 | |
| 669 | case <-s.stopping: |
| 670 | return |
| 671 | } |
| 672 | } |
| 673 | if !done { |
| 674 | continue |
| 675 | } |
| 676 | |
| 677 | if ai := s.getAppliedIndex(); ai < rs.Index { |
| 678 | select { |
| 679 | case <-s.applyWait.Wait(rs.Index): |
| 680 | case <-s.stopping: |
| 681 | return |
| 682 | } |
| 683 | } |
| 684 | // unblock all l-reads requested at indices before rs.Index |
| 685 | nr.notify(nil) |
| 686 | } |
| 687 | } |
| 688 | |
| 689 | func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error { |
| 690 | s.readMu.RLock() |
| 691 | nc := s.readNotifier |
| 692 | s.readMu.RUnlock() |
| 693 | |
| 694 | // signal linearizable loop for current notify if it hasn't been already |
| 695 | select { |
| 696 | case s.readwaitc <- struct{}{}: |
| 697 | default: |
| 698 | } |
| 699 | |
| 700 | // wait for read state notification |
| 701 | select { |
| 702 | case <-nc.c: |
| 703 | return nc.err |
| 704 | case <-ctx.Done(): |
| 705 | return ctx.Err() |
| 706 | case <-s.done: |
| 707 | return ErrStopped |
| 708 | } |
| 709 | } |
| 710 | |
| 711 | func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) { |
| 712 | authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx) |
| 713 | if authInfo != nil || err != nil { |
| 714 | return authInfo, err |
| 715 | } |
| 716 | if !s.Cfg.ClientCertAuthEnabled { |
| 717 | return nil, nil |
| 718 | } |
| 719 | authInfo = s.AuthStore().AuthInfoFromTLS(ctx) |
| 720 | return authInfo, nil |
| 721 | } |