// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| 14 | |
| 15 | package raft |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "errors" |
| 20 | |
| 21 | pb "github.com/coreos/etcd/raft/raftpb" |
| 22 | ) |
| 23 | |
// SnapshotStatus is the outcome of a snapshot transfer, as reported to the
// node through ReportSnapshot.
type SnapshotStatus int

// Values accepted by ReportSnapshot.
const (
	// SnapshotFinish reports a snapshot that was not rejected.
	SnapshotFinish SnapshotStatus = iota + 1
	// SnapshotFailure reports a failed snapshot; ReportSnapshot forwards it
	// as a rejected MsgSnapStatus message.
	SnapshotFailure
)
| 30 | |
| 31 | var ( |
| 32 | emptyState = pb.HardState{} |
| 33 | |
| 34 | // ErrStopped is returned by methods on Nodes that have been stopped. |
| 35 | ErrStopped = errors.New("raft: stopped") |
| 36 | ) |
| 37 | |
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
//
// Two SoftStates are considered equal when both Lead and RaftState match.
// NOTE(review): Lead is presumably the ID of the leader as currently known to
// this node, and RaftState the node's current role — confirm against
// raft.softState.
type SoftState struct {
	Lead      uint64 // must use atomic operations to access; keep 64-bit aligned.
	RaftState StateType
}
| 44 | |
| 45 | func (a *SoftState) equal(b *SoftState) bool { |
| 46 | return a.Lead == b.Lead && a.RaftState == b.RaftState |
| 47 | } |
| 48 | |
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update since the previous Ready.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to the empty state if there is no update.
	pb.HardState

	// ReadStates can be used by the node to serve linearizable read requests
	// locally when its applied index is greater than the index in ReadState.
	// Note that a ReadState is returned when raft receives MsgReadIndex.
	// A returned ReadState is only valid for the request that asked for it.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	// It is considered empty when its Metadata.Index is zero.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}
| 91 | |
| 92 | func isHardStateEqual(a, b pb.HardState) bool { |
| 93 | return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit |
| 94 | } |
| 95 | |
| 96 | // IsEmptyHardState returns true if the given HardState is empty. |
| 97 | func IsEmptyHardState(st pb.HardState) bool { |
| 98 | return isHardStateEqual(st, emptyState) |
| 99 | } |
| 100 | |
| 101 | // IsEmptySnap returns true if the given Snapshot is empty. |
| 102 | func IsEmptySnap(sp pb.Snapshot) bool { |
| 103 | return sp.Metadata.Index == 0 |
| 104 | } |
| 105 | |
| 106 | func (rd Ready) containsUpdates() bool { |
| 107 | return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || |
| 108 | !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 || |
| 109 | len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0 |
| 110 | } |
| 111 | |
// Node represents a node in a raft cluster.
type Node interface {
	// Tick increments the internal logical clock for the Node by a single tick. Election
	// timeouts and heartbeat timeouts are in units of ticks.
	Tick()
	// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
	Campaign(ctx context.Context) error
	// Propose proposes that data be appended to the log.
	Propose(ctx context.Context, data []byte) error
	// ProposeConfChange proposes a config change.
	// At most one ConfChange can be in the process of going through consensus.
	// The application needs to call ApplyConfChange when applying an
	// EntryConfChange type entry.
	ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
	Step(ctx context.Context, msg pb.Message) error

	// Ready returns a channel that returns the current point-in-time state.
	// Users of the Node must call Advance after retrieving the state returned by Ready.
	//
	// NOTE: No committed entries from the next Ready may be applied until all committed entries
	// and snapshots from the previous one have finished.
	Ready() <-chan Ready

	// Advance notifies the Node that the application has saved progress up to the last Ready.
	// It prepares the node to return the next available Ready.
	//
	// The application should generally call Advance after it applies the entries in the last Ready.
	//
	// However, as an optimization, the application may call Advance while it is applying the
	// commands. For example, when the last Ready contains a snapshot, the application might take
	// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
	// progress, it can call Advance before finishing applying the last Ready.
	Advance()
	// ApplyConfChange applies a config change to the local node.
	// Returns an opaque ConfState protobuf which must be recorded
	// in snapshots. Will never return nil; it returns a pointer only
	// to match MemoryStorage.Compact.
	ApplyConfChange(cc pb.ConfChange) *pb.ConfState

	// TransferLeadership attempts to transfer leadership to the given transferee.
	TransferLeadership(ctx context.Context, lead, transferee uint64)

	// ReadIndex requests a read state. The read state will be set in the Ready.
	// A read state has a read index. Once the application advances further than the read
	// index, any linearizable read requests issued before the read request can be
	// processed safely. The read state will have the same rctx attached.
	ReadIndex(ctx context.Context, rctx []byte) error

	// Status returns the current status of the raft state machine.
	Status() Status
	// ReportUnreachable reports that the given node was not reachable for the last send.
	ReportUnreachable(id uint64)
	// ReportSnapshot reports the status of the sent snapshot.
	ReportSnapshot(id uint64, status SnapshotStatus)
	// Stop performs any necessary termination of the Node.
	Stop()
}
| 169 | |
// Peer describes one initial cluster member handed to StartNode. StartNode
// turns each Peer into a ConfChangeAddNode entry whose NodeID is ID and whose
// Context is passed through unchanged.
type Peer struct {
	ID      uint64
	Context []byte
}
| 174 | |
| 175 | // StartNode returns a new Node given configuration and a list of raft peers. |
| 176 | // It appends a ConfChangeAddNode entry for each given peer to the initial log. |
| 177 | func StartNode(c *Config, peers []Peer) Node { |
| 178 | r := newRaft(c) |
| 179 | // become the follower at term 1 and apply initial configuration |
| 180 | // entries of term 1 |
| 181 | r.becomeFollower(1, None) |
| 182 | for _, peer := range peers { |
| 183 | cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} |
| 184 | d, err := cc.Marshal() |
| 185 | if err != nil { |
| 186 | panic("unexpected marshal error") |
| 187 | } |
| 188 | e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d} |
| 189 | r.raftLog.append(e) |
| 190 | } |
| 191 | // Mark these initial entries as committed. |
| 192 | // TODO(bdarnell): These entries are still unstable; do we need to preserve |
| 193 | // the invariant that committed < unstable? |
| 194 | r.raftLog.committed = r.raftLog.lastIndex() |
| 195 | // Now apply them, mainly so that the application can call Campaign |
| 196 | // immediately after StartNode in tests. Note that these nodes will |
| 197 | // be added to raft twice: here and when the application's Ready |
| 198 | // loop calls ApplyConfChange. The calls to addNode must come after |
| 199 | // all calls to raftLog.append so progress.next is set after these |
| 200 | // bootstrapping entries (it is an error if we try to append these |
| 201 | // entries since they have already been committed). |
| 202 | // We do not set raftLog.applied so the application will be able |
| 203 | // to observe all conf changes via Ready.CommittedEntries. |
| 204 | for _, peer := range peers { |
| 205 | r.addNode(peer.ID) |
| 206 | } |
| 207 | |
| 208 | n := newNode() |
| 209 | n.logger = c.Logger |
| 210 | go n.run(r) |
| 211 | return &n |
| 212 | } |
| 213 | |
| 214 | // RestartNode is similar to StartNode but does not take a list of peers. |
| 215 | // The current membership of the cluster will be restored from the Storage. |
| 216 | // If the caller has an existing state machine, pass in the last log index that |
| 217 | // has been applied to it; otherwise use zero. |
| 218 | func RestartNode(c *Config) Node { |
| 219 | r := newRaft(c) |
| 220 | |
| 221 | n := newNode() |
| 222 | n.logger = c.Logger |
| 223 | go n.run(r) |
| 224 | return &n |
| 225 | } |
| 226 | |
| 227 | // node is the canonical implementation of the Node interface |
| 228 | type node struct { |
| 229 | propc chan pb.Message |
| 230 | recvc chan pb.Message |
| 231 | confc chan pb.ConfChange |
| 232 | confstatec chan pb.ConfState |
| 233 | readyc chan Ready |
| 234 | advancec chan struct{} |
| 235 | tickc chan struct{} |
| 236 | done chan struct{} |
| 237 | stop chan struct{} |
| 238 | status chan chan Status |
| 239 | |
| 240 | logger Logger |
| 241 | } |
| 242 | |
| 243 | func newNode() node { |
| 244 | return node{ |
| 245 | propc: make(chan pb.Message), |
| 246 | recvc: make(chan pb.Message), |
| 247 | confc: make(chan pb.ConfChange), |
| 248 | confstatec: make(chan pb.ConfState), |
| 249 | readyc: make(chan Ready), |
| 250 | advancec: make(chan struct{}), |
| 251 | // make tickc a buffered chan, so raft node can buffer some ticks when the node |
| 252 | // is busy processing raft messages. Raft node will resume process buffered |
| 253 | // ticks when it becomes idle. |
| 254 | tickc: make(chan struct{}, 128), |
| 255 | done: make(chan struct{}), |
| 256 | stop: make(chan struct{}), |
| 257 | status: make(chan chan Status), |
| 258 | } |
| 259 | } |
| 260 | |
| 261 | func (n *node) Stop() { |
| 262 | select { |
| 263 | case n.stop <- struct{}{}: |
| 264 | // Not already stopped, so trigger it |
| 265 | case <-n.done: |
| 266 | // Node has already been stopped - no need to do anything |
| 267 | return |
| 268 | } |
| 269 | // Block until the stop has been acknowledged by run() |
| 270 | <-n.done |
| 271 | } |
| 272 | |
// run is the node's event loop. It services the node's channels one event at
// a time, applying each to the raft state machine r, until a value arrives on
// n.stop; it then closes n.done and returns.
//
// Each iteration offers at most one Ready on n.readyc. After a Ready has been
// taken, no further Ready is built or offered until a signal arrives on
// n.advancec.
func (n *node) run(r *raft) {
	// Local channel variables: a nil channel never becomes ready in a select,
	// so setting one of these to nil disables the corresponding case.
	var propc chan pb.Message
	var readyc chan Ready
	var advancec chan struct{}
	// Index and term of the last entry in the most recently delivered Ready
	// that contained entries; consumed on the next advance.
	var prevLastUnstablei, prevLastUnstablet uint64
	var havePrevLastUnstablei bool
	// Index of the most recently delivered non-empty snapshot.
	var prevSnapi uint64
	var rd Ready

	lead := None
	prevSoftSt := r.softState()
	prevHardSt := emptyState

	for {
		if advancec != nil {
			// Still waiting for Advance on the previous Ready: offer nothing.
			readyc = nil
		} else {
			// Build a fresh Ready and only offer it if it has content.
			rd = newReady(r, prevSoftSt, prevHardSt)
			if rd.containsUpdates() {
				readyc = n.readyc
			} else {
				readyc = nil
			}
		}

		// Track leader changes: proposals are accepted only while a leader is
		// known, and every transition is logged.
		if lead != r.lead {
			if r.hasLeader() {
				if lead == None {
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				} else {
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				}
				propc = n.propc
			} else {
				r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
				propc = nil
			}
			lead = r.lead
		}

		select {
		// TODO: maybe buffer the config propose if there exists one (the way
		// described in raft dissertation)
		// Currently it is dropped in Step silently.
		case m := <-propc:
			// Proposals originate locally; stamp them with this node's ID.
			m.From = r.id
			r.Step(m)
		case m := <-n.recvc:
			// filter out response message from unknown From.
			if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
				r.Step(m) // raft never returns an error
			}
		case cc := <-n.confc:
			if cc.NodeID == None {
				// No target node: only clear the pending-conf state and reply
				// with the current membership.
				r.resetPendingConf()
				select {
				case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
				case <-n.done:
				}
				// Leaves the enclosing select, not the for loop.
				break
			}
			switch cc.Type {
			case pb.ConfChangeAddNode:
				r.addNode(cc.NodeID)
			case pb.ConfChangeAddLearnerNode:
				r.addLearner(cc.NodeID)
			case pb.ConfChangeRemoveNode:
				// block incoming proposal when local node is
				// removed
				if cc.NodeID == r.id {
					propc = nil
				}
				r.removeNode(cc.NodeID)
			case pb.ConfChangeUpdateNode:
				r.resetPendingConf()
			default:
				panic("unexpected conf type")
			}
			// Reply to ApplyConfChange with the membership after the change.
			select {
			case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
			case <-n.done:
			}
		case <-n.tickc:
			r.tick()
		case readyc <- rd:
			// The application took rd. Remember what it was given so the next
			// Ready is computed relative to it and so the advance step below
			// knows what to mark stable/applied.
			if rd.SoftState != nil {
				prevSoftSt = rd.SoftState
			}
			if len(rd.Entries) > 0 {
				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
				prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
				havePrevLastUnstablei = true
			}
			if !IsEmptyHardState(rd.HardState) {
				prevHardSt = rd.HardState
			}
			if !IsEmptySnap(rd.Snapshot) {
				prevSnapi = rd.Snapshot.Metadata.Index
			}

			// Messages and read states were handed over inside rd; clear them
			// so they are not delivered again.
			r.msgs = nil
			r.readStates = nil
			// Start waiting for Advance.
			advancec = n.advancec
		case <-advancec:
			// Advance received: record applied/stable progress in the raft log
			// for what the previous Ready handed out.
			if prevHardSt.Commit != 0 {
				r.raftLog.appliedTo(prevHardSt.Commit)
			}
			if havePrevLastUnstablei {
				r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
				havePrevLastUnstablei = false
			}
			r.raftLog.stableSnapTo(prevSnapi)
			// Allow the next Ready to be built on the following iteration.
			advancec = nil
		case c := <-n.status:
			c <- getStatus(r)
		case <-n.stop:
			// Closing done unblocks every method selecting on it.
			close(n.done)
			return
		}
	}
}
| 394 | |
| 395 | // Tick increments the internal logical clock for this Node. Election timeouts |
| 396 | // and heartbeat timeouts are in units of ticks. |
| 397 | func (n *node) Tick() { |
| 398 | select { |
| 399 | case n.tickc <- struct{}{}: |
| 400 | case <-n.done: |
| 401 | default: |
| 402 | n.logger.Warningf("A tick missed to fire. Node blocks too long!") |
| 403 | } |
| 404 | } |
| 405 | |
| 406 | func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) } |
| 407 | |
| 408 | func (n *node) Propose(ctx context.Context, data []byte) error { |
| 409 | return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) |
| 410 | } |
| 411 | |
| 412 | func (n *node) Step(ctx context.Context, m pb.Message) error { |
| 413 | // ignore unexpected local messages receiving over network |
| 414 | if IsLocalMsg(m.Type) { |
| 415 | // TODO: return an error? |
| 416 | return nil |
| 417 | } |
| 418 | return n.step(ctx, m) |
| 419 | } |
| 420 | |
| 421 | func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { |
| 422 | data, err := cc.Marshal() |
| 423 | if err != nil { |
| 424 | return err |
| 425 | } |
| 426 | return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) |
| 427 | } |
| 428 | |
| 429 | // Step advances the state machine using msgs. The ctx.Err() will be returned, |
| 430 | // if any. |
| 431 | func (n *node) step(ctx context.Context, m pb.Message) error { |
| 432 | ch := n.recvc |
| 433 | if m.Type == pb.MsgProp { |
| 434 | ch = n.propc |
| 435 | } |
| 436 | |
| 437 | select { |
| 438 | case ch <- m: |
| 439 | return nil |
| 440 | case <-ctx.Done(): |
| 441 | return ctx.Err() |
| 442 | case <-n.done: |
| 443 | return ErrStopped |
| 444 | } |
| 445 | } |
| 446 | |
| 447 | func (n *node) Ready() <-chan Ready { return n.readyc } |
| 448 | |
| 449 | func (n *node) Advance() { |
| 450 | select { |
| 451 | case n.advancec <- struct{}{}: |
| 452 | case <-n.done: |
| 453 | } |
| 454 | } |
| 455 | |
| 456 | func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { |
| 457 | var cs pb.ConfState |
| 458 | select { |
| 459 | case n.confc <- cc: |
| 460 | case <-n.done: |
| 461 | } |
| 462 | select { |
| 463 | case cs = <-n.confstatec: |
| 464 | case <-n.done: |
| 465 | } |
| 466 | return &cs |
| 467 | } |
| 468 | |
| 469 | func (n *node) Status() Status { |
| 470 | c := make(chan Status) |
| 471 | select { |
| 472 | case n.status <- c: |
| 473 | return <-c |
| 474 | case <-n.done: |
| 475 | return Status{} |
| 476 | } |
| 477 | } |
| 478 | |
| 479 | func (n *node) ReportUnreachable(id uint64) { |
| 480 | select { |
| 481 | case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}: |
| 482 | case <-n.done: |
| 483 | } |
| 484 | } |
| 485 | |
| 486 | func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) { |
| 487 | rej := status == SnapshotFailure |
| 488 | |
| 489 | select { |
| 490 | case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}: |
| 491 | case <-n.done: |
| 492 | } |
| 493 | } |
| 494 | |
| 495 | func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) { |
| 496 | select { |
| 497 | // manually set 'from' and 'to', so that leader can voluntarily transfers its leadership |
| 498 | case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}: |
| 499 | case <-n.done: |
| 500 | case <-ctx.Done(): |
| 501 | } |
| 502 | } |
| 503 | |
| 504 | func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { |
| 505 | return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) |
| 506 | } |
| 507 | |
| 508 | func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { |
| 509 | rd := Ready{ |
| 510 | Entries: r.raftLog.unstableEntries(), |
| 511 | CommittedEntries: r.raftLog.nextEnts(), |
| 512 | Messages: r.msgs, |
| 513 | } |
| 514 | if softSt := r.softState(); !softSt.equal(prevSoftSt) { |
| 515 | rd.SoftState = softSt |
| 516 | } |
| 517 | if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { |
| 518 | rd.HardState = hardSt |
| 519 | } |
| 520 | if r.raftLog.unstable.snapshot != nil { |
| 521 | rd.Snapshot = *r.raftLog.unstable.snapshot |
| 522 | } |
| 523 | if len(r.readStates) != 0 { |
| 524 | rd.ReadStates = r.readStates |
| 525 | } |
| 526 | rd.MustSync = MustSync(rd.HardState, prevHardSt, len(rd.Entries)) |
| 527 | return rd |
| 528 | } |
| 529 | |
| 530 | // MustSync returns true if the hard state and count of Raft entries indicate |
| 531 | // that a synchronous write to persistent storage is required. |
| 532 | func MustSync(st, prevst pb.HardState, entsnum int) bool { |
| 533 | // Persistent state on all servers: |
| 534 | // (Updated on stable storage before responding to RPCs) |
| 535 | // currentTerm |
| 536 | // votedFor |
| 537 | // log entries[] |
| 538 | return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term |
| 539 | } |