Scott Baker | eee8dd8 | 2019-09-24 12:52:34 -0700 | [diff] [blame] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package raft |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "errors" |
| 20 | |
| 21 | pb "go.etcd.io/etcd/raft/raftpb" |
| 22 | ) |
| 23 | |
| 24 | type SnapshotStatus int |
| 25 | |
| 26 | const ( |
| 27 | SnapshotFinish SnapshotStatus = 1 |
| 28 | SnapshotFailure SnapshotStatus = 2 |
| 29 | ) |
| 30 | |
| 31 | var ( |
| 32 | emptyState = pb.HardState{} |
| 33 | |
| 34 | // ErrStopped is returned by methods on Nodes that have been stopped. |
| 35 | ErrStopped = errors.New("raft: stopped") |
| 36 | ) |
| 37 | |
| 38 | // SoftState provides state that is useful for logging and debugging. |
| 39 | // The state is volatile and does not need to be persisted to the WAL. |
| 40 | type SoftState struct { |
| 41 | Lead uint64 // must use atomic operations to access; keep 64-bit aligned. |
| 42 | RaftState StateType |
| 43 | } |
| 44 | |
| 45 | func (a *SoftState) equal(b *SoftState) bool { |
| 46 | return a.Lead == b.Lead && a.RaftState == b.RaftState |
| 47 | } |
| 48 | |
| 49 | // Ready encapsulates the entries and messages that are ready to read, |
| 50 | // be saved to stable storage, committed or sent to other peers. |
| 51 | // All fields in Ready are read-only. |
| 52 | type Ready struct { |
| 53 | // The current volatile state of a Node. |
| 54 | // SoftState will be nil if there is no update. |
| 55 | // It is not required to consume or store SoftState. |
| 56 | *SoftState |
| 57 | |
| 58 | // The current state of a Node to be saved to stable storage BEFORE |
| 59 | // Messages are sent. |
| 60 | // HardState will be equal to empty state if there is no update. |
| 61 | pb.HardState |
| 62 | |
| 63 | // ReadStates can be used for node to serve linearizable read requests locally |
| 64 | // when its applied index is greater than the index in ReadState. |
| 65 | // Note that the readState will be returned when raft receives msgReadIndex. |
| 66 | // The returned is only valid for the request that requested to read. |
| 67 | ReadStates []ReadState |
| 68 | |
| 69 | // Entries specifies entries to be saved to stable storage BEFORE |
| 70 | // Messages are sent. |
| 71 | Entries []pb.Entry |
| 72 | |
| 73 | // Snapshot specifies the snapshot to be saved to stable storage. |
| 74 | Snapshot pb.Snapshot |
| 75 | |
| 76 | // CommittedEntries specifies entries to be committed to a |
| 77 | // store/state-machine. These have previously been committed to stable |
| 78 | // store. |
| 79 | CommittedEntries []pb.Entry |
| 80 | |
| 81 | // Messages specifies outbound messages to be sent AFTER Entries are |
| 82 | // committed to stable storage. |
| 83 | // If it contains a MsgSnap message, the application MUST report back to raft |
| 84 | // when the snapshot has been received or has failed by calling ReportSnapshot. |
| 85 | Messages []pb.Message |
| 86 | |
| 87 | // MustSync indicates whether the HardState and Entries must be synchronously |
| 88 | // written to disk or if an asynchronous write is permissible. |
| 89 | MustSync bool |
| 90 | } |
| 91 | |
| 92 | func isHardStateEqual(a, b pb.HardState) bool { |
| 93 | return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit |
| 94 | } |
| 95 | |
| 96 | // IsEmptyHardState returns true if the given HardState is empty. |
| 97 | func IsEmptyHardState(st pb.HardState) bool { |
| 98 | return isHardStateEqual(st, emptyState) |
| 99 | } |
| 100 | |
| 101 | // IsEmptySnap returns true if the given Snapshot is empty. |
| 102 | func IsEmptySnap(sp pb.Snapshot) bool { |
| 103 | return sp.Metadata.Index == 0 |
| 104 | } |
| 105 | |
| 106 | func (rd Ready) containsUpdates() bool { |
| 107 | return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || |
| 108 | !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 || |
| 109 | len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0 |
| 110 | } |
| 111 | |
| 112 | // appliedCursor extracts from the Ready the highest index the client has |
| 113 | // applied (once the Ready is confirmed via Advance). If no information is |
| 114 | // contained in the Ready, returns zero. |
| 115 | func (rd Ready) appliedCursor() uint64 { |
| 116 | if n := len(rd.CommittedEntries); n > 0 { |
| 117 | return rd.CommittedEntries[n-1].Index |
| 118 | } |
| 119 | if index := rd.Snapshot.Metadata.Index; index > 0 { |
| 120 | return index |
| 121 | } |
| 122 | return 0 |
| 123 | } |
| 124 | |
| 125 | // Node represents a node in a raft cluster. |
| 126 | type Node interface { |
| 127 | // Tick increments the internal logical clock for the Node by a single tick. Election |
| 128 | // timeouts and heartbeat timeouts are in units of ticks. |
| 129 | Tick() |
| 130 | // Campaign causes the Node to transition to candidate state and start campaigning to become leader. |
| 131 | Campaign(ctx context.Context) error |
| 132 | // Propose proposes that data be appended to the log. Note that proposals can be lost without |
| 133 | // notice, therefore it is user's job to ensure proposal retries. |
| 134 | Propose(ctx context.Context, data []byte) error |
| 135 | // ProposeConfChange proposes config change. |
| 136 | // At most one ConfChange can be in the process of going through consensus. |
| 137 | // Application needs to call ApplyConfChange when applying EntryConfChange type entry. |
| 138 | ProposeConfChange(ctx context.Context, cc pb.ConfChange) error |
| 139 | // Step advances the state machine using the given message. ctx.Err() will be returned, if any. |
| 140 | Step(ctx context.Context, msg pb.Message) error |
| 141 | |
| 142 | // Ready returns a channel that returns the current point-in-time state. |
| 143 | // Users of the Node must call Advance after retrieving the state returned by Ready. |
| 144 | // |
| 145 | // NOTE: No committed entries from the next Ready may be applied until all committed entries |
| 146 | // and snapshots from the previous one have finished. |
| 147 | Ready() <-chan Ready |
| 148 | |
| 149 | // Advance notifies the Node that the application has saved progress up to the last Ready. |
| 150 | // It prepares the node to return the next available Ready. |
| 151 | // |
| 152 | // The application should generally call Advance after it applies the entries in last Ready. |
| 153 | // |
| 154 | // However, as an optimization, the application may call Advance while it is applying the |
| 155 | // commands. For example. when the last Ready contains a snapshot, the application might take |
| 156 | // a long time to apply the snapshot data. To continue receiving Ready without blocking raft |
| 157 | // progress, it can call Advance before finishing applying the last ready. |
| 158 | Advance() |
| 159 | // ApplyConfChange applies config change to the local node. |
| 160 | // Returns an opaque ConfState protobuf which must be recorded |
| 161 | // in snapshots. Will never return nil; it returns a pointer only |
| 162 | // to match MemoryStorage.Compact. |
| 163 | ApplyConfChange(cc pb.ConfChange) *pb.ConfState |
| 164 | |
| 165 | // TransferLeadership attempts to transfer leadership to the given transferee. |
| 166 | TransferLeadership(ctx context.Context, lead, transferee uint64) |
| 167 | |
| 168 | // ReadIndex request a read state. The read state will be set in the ready. |
| 169 | // Read state has a read index. Once the application advances further than the read |
| 170 | // index, any linearizable read requests issued before the read request can be |
| 171 | // processed safely. The read state will have the same rctx attached. |
| 172 | ReadIndex(ctx context.Context, rctx []byte) error |
| 173 | |
| 174 | // Status returns the current status of the raft state machine. |
| 175 | Status() Status |
| 176 | // ReportUnreachable reports the given node is not reachable for the last send. |
| 177 | ReportUnreachable(id uint64) |
| 178 | // ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower |
| 179 | // who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure. |
| 180 | // Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a |
| 181 | // snapshot (for e.g., while streaming it from leader to follower), should be reported to the |
| 182 | // leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft |
| 183 | // log probes until the follower can apply the snapshot and advance its state. If the follower |
| 184 | // can't do that, for e.g., due to a crash, it could end up in a limbo, never getting any |
| 185 | // updates from the leader. Therefore, it is crucial that the application ensures that any |
| 186 | // failure in snapshot sending is caught and reported back to the leader; so it can resume raft |
| 187 | // log probing in the follower. |
| 188 | ReportSnapshot(id uint64, status SnapshotStatus) |
| 189 | // Stop performs any necessary termination of the Node. |
| 190 | Stop() |
| 191 | } |
| 192 | |
| 193 | type Peer struct { |
| 194 | ID uint64 |
| 195 | Context []byte |
| 196 | } |
| 197 | |
| 198 | // StartNode returns a new Node given configuration and a list of raft peers. |
| 199 | // It appends a ConfChangeAddNode entry for each given peer to the initial log. |
| 200 | func StartNode(c *Config, peers []Peer) Node { |
| 201 | r := newRaft(c) |
| 202 | // become the follower at term 1 and apply initial configuration |
| 203 | // entries of term 1 |
| 204 | r.becomeFollower(1, None) |
| 205 | for _, peer := range peers { |
| 206 | cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} |
| 207 | d, err := cc.Marshal() |
| 208 | if err != nil { |
| 209 | panic("unexpected marshal error") |
| 210 | } |
| 211 | e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d} |
| 212 | r.raftLog.append(e) |
| 213 | } |
| 214 | // Mark these initial entries as committed. |
| 215 | // TODO(bdarnell): These entries are still unstable; do we need to preserve |
| 216 | // the invariant that committed < unstable? |
| 217 | r.raftLog.committed = r.raftLog.lastIndex() |
| 218 | // Now apply them, mainly so that the application can call Campaign |
| 219 | // immediately after StartNode in tests. Note that these nodes will |
| 220 | // be added to raft twice: here and when the application's Ready |
| 221 | // loop calls ApplyConfChange. The calls to addNode must come after |
| 222 | // all calls to raftLog.append so progress.next is set after these |
| 223 | // bootstrapping entries (it is an error if we try to append these |
| 224 | // entries since they have already been committed). |
| 225 | // We do not set raftLog.applied so the application will be able |
| 226 | // to observe all conf changes via Ready.CommittedEntries. |
| 227 | for _, peer := range peers { |
| 228 | r.addNode(peer.ID) |
| 229 | } |
| 230 | |
| 231 | n := newNode() |
| 232 | n.logger = c.Logger |
| 233 | go n.run(r) |
| 234 | return &n |
| 235 | } |
| 236 | |
| 237 | // RestartNode is similar to StartNode but does not take a list of peers. |
| 238 | // The current membership of the cluster will be restored from the Storage. |
| 239 | // If the caller has an existing state machine, pass in the last log index that |
| 240 | // has been applied to it; otherwise use zero. |
| 241 | func RestartNode(c *Config) Node { |
| 242 | r := newRaft(c) |
| 243 | |
| 244 | n := newNode() |
| 245 | n.logger = c.Logger |
| 246 | go n.run(r) |
| 247 | return &n |
| 248 | } |
| 249 | |
| 250 | type msgWithResult struct { |
| 251 | m pb.Message |
| 252 | result chan error |
| 253 | } |
| 254 | |
| 255 | // node is the canonical implementation of the Node interface |
| 256 | type node struct { |
| 257 | propc chan msgWithResult |
| 258 | recvc chan pb.Message |
| 259 | confc chan pb.ConfChange |
| 260 | confstatec chan pb.ConfState |
| 261 | readyc chan Ready |
| 262 | advancec chan struct{} |
| 263 | tickc chan struct{} |
| 264 | done chan struct{} |
| 265 | stop chan struct{} |
| 266 | status chan chan Status |
| 267 | |
| 268 | logger Logger |
| 269 | } |
| 270 | |
| 271 | func newNode() node { |
| 272 | return node{ |
| 273 | propc: make(chan msgWithResult), |
| 274 | recvc: make(chan pb.Message), |
| 275 | confc: make(chan pb.ConfChange), |
| 276 | confstatec: make(chan pb.ConfState), |
| 277 | readyc: make(chan Ready), |
| 278 | advancec: make(chan struct{}), |
| 279 | // make tickc a buffered chan, so raft node can buffer some ticks when the node |
| 280 | // is busy processing raft messages. Raft node will resume process buffered |
| 281 | // ticks when it becomes idle. |
| 282 | tickc: make(chan struct{}, 128), |
| 283 | done: make(chan struct{}), |
| 284 | stop: make(chan struct{}), |
| 285 | status: make(chan chan Status), |
| 286 | } |
| 287 | } |
| 288 | |
| 289 | func (n *node) Stop() { |
| 290 | select { |
| 291 | case n.stop <- struct{}{}: |
| 292 | // Not already stopped, so trigger it |
| 293 | case <-n.done: |
| 294 | // Node has already been stopped - no need to do anything |
| 295 | return |
| 296 | } |
| 297 | // Block until the stop has been acknowledged by run() |
| 298 | <-n.done |
| 299 | } |
| 300 | |
| 301 | func (n *node) run(r *raft) { |
| 302 | var propc chan msgWithResult |
| 303 | var readyc chan Ready |
| 304 | var advancec chan struct{} |
| 305 | var prevLastUnstablei, prevLastUnstablet uint64 |
| 306 | var havePrevLastUnstablei bool |
| 307 | var prevSnapi uint64 |
| 308 | var applyingToI uint64 |
| 309 | var rd Ready |
| 310 | |
| 311 | lead := None |
| 312 | prevSoftSt := r.softState() |
| 313 | prevHardSt := emptyState |
| 314 | |
| 315 | for { |
| 316 | if advancec != nil { |
| 317 | readyc = nil |
| 318 | } else { |
| 319 | rd = newReady(r, prevSoftSt, prevHardSt) |
| 320 | if rd.containsUpdates() { |
| 321 | readyc = n.readyc |
| 322 | } else { |
| 323 | readyc = nil |
| 324 | } |
| 325 | } |
| 326 | |
| 327 | if lead != r.lead { |
| 328 | if r.hasLeader() { |
| 329 | if lead == None { |
| 330 | r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term) |
| 331 | } else { |
| 332 | r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term) |
| 333 | } |
| 334 | propc = n.propc |
| 335 | } else { |
| 336 | r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term) |
| 337 | propc = nil |
| 338 | } |
| 339 | lead = r.lead |
| 340 | } |
| 341 | |
| 342 | select { |
| 343 | // TODO: maybe buffer the config propose if there exists one (the way |
| 344 | // described in raft dissertation) |
| 345 | // Currently it is dropped in Step silently. |
| 346 | case pm := <-propc: |
| 347 | m := pm.m |
| 348 | m.From = r.id |
| 349 | err := r.Step(m) |
| 350 | if pm.result != nil { |
| 351 | pm.result <- err |
| 352 | close(pm.result) |
| 353 | } |
| 354 | case m := <-n.recvc: |
| 355 | // filter out response message from unknown From. |
| 356 | if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { |
| 357 | r.Step(m) |
| 358 | } |
| 359 | case cc := <-n.confc: |
| 360 | if cc.NodeID == None { |
| 361 | select { |
| 362 | case n.confstatec <- pb.ConfState{ |
| 363 | Nodes: r.nodes(), |
| 364 | Learners: r.learnerNodes()}: |
| 365 | case <-n.done: |
| 366 | } |
| 367 | break |
| 368 | } |
| 369 | switch cc.Type { |
| 370 | case pb.ConfChangeAddNode: |
| 371 | r.addNode(cc.NodeID) |
| 372 | case pb.ConfChangeAddLearnerNode: |
| 373 | r.addLearner(cc.NodeID) |
| 374 | case pb.ConfChangeRemoveNode: |
| 375 | // block incoming proposal when local node is |
| 376 | // removed |
| 377 | if cc.NodeID == r.id { |
| 378 | propc = nil |
| 379 | } |
| 380 | r.removeNode(cc.NodeID) |
| 381 | case pb.ConfChangeUpdateNode: |
| 382 | default: |
| 383 | panic("unexpected conf type") |
| 384 | } |
| 385 | select { |
| 386 | case n.confstatec <- pb.ConfState{ |
| 387 | Nodes: r.nodes(), |
| 388 | Learners: r.learnerNodes()}: |
| 389 | case <-n.done: |
| 390 | } |
| 391 | case <-n.tickc: |
| 392 | r.tick() |
| 393 | case readyc <- rd: |
| 394 | if rd.SoftState != nil { |
| 395 | prevSoftSt = rd.SoftState |
| 396 | } |
| 397 | if len(rd.Entries) > 0 { |
| 398 | prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index |
| 399 | prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term |
| 400 | havePrevLastUnstablei = true |
| 401 | } |
| 402 | if !IsEmptyHardState(rd.HardState) { |
| 403 | prevHardSt = rd.HardState |
| 404 | } |
| 405 | if !IsEmptySnap(rd.Snapshot) { |
| 406 | prevSnapi = rd.Snapshot.Metadata.Index |
| 407 | } |
| 408 | if index := rd.appliedCursor(); index != 0 { |
| 409 | applyingToI = index |
| 410 | } |
| 411 | |
| 412 | r.msgs = nil |
| 413 | r.readStates = nil |
| 414 | r.reduceUncommittedSize(rd.CommittedEntries) |
| 415 | advancec = n.advancec |
| 416 | case <-advancec: |
| 417 | if applyingToI != 0 { |
| 418 | r.raftLog.appliedTo(applyingToI) |
| 419 | applyingToI = 0 |
| 420 | } |
| 421 | if havePrevLastUnstablei { |
| 422 | r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) |
| 423 | havePrevLastUnstablei = false |
| 424 | } |
| 425 | r.raftLog.stableSnapTo(prevSnapi) |
| 426 | advancec = nil |
| 427 | case c := <-n.status: |
| 428 | c <- getStatus(r) |
| 429 | case <-n.stop: |
| 430 | close(n.done) |
| 431 | return |
| 432 | } |
| 433 | } |
| 434 | } |
| 435 | |
| 436 | // Tick increments the internal logical clock for this Node. Election timeouts |
| 437 | // and heartbeat timeouts are in units of ticks. |
| 438 | func (n *node) Tick() { |
| 439 | select { |
| 440 | case n.tickc <- struct{}{}: |
| 441 | case <-n.done: |
| 442 | default: |
| 443 | n.logger.Warningf("A tick missed to fire. Node blocks too long!") |
| 444 | } |
| 445 | } |
| 446 | |
| 447 | func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) } |
| 448 | |
| 449 | func (n *node) Propose(ctx context.Context, data []byte) error { |
| 450 | return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) |
| 451 | } |
| 452 | |
| 453 | func (n *node) Step(ctx context.Context, m pb.Message) error { |
| 454 | // ignore unexpected local messages receiving over network |
| 455 | if IsLocalMsg(m.Type) { |
| 456 | // TODO: return an error? |
| 457 | return nil |
| 458 | } |
| 459 | return n.step(ctx, m) |
| 460 | } |
| 461 | |
| 462 | func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { |
| 463 | data, err := cc.Marshal() |
| 464 | if err != nil { |
| 465 | return err |
| 466 | } |
| 467 | return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) |
| 468 | } |
| 469 | |
| 470 | func (n *node) step(ctx context.Context, m pb.Message) error { |
| 471 | return n.stepWithWaitOption(ctx, m, false) |
| 472 | } |
| 473 | |
| 474 | func (n *node) stepWait(ctx context.Context, m pb.Message) error { |
| 475 | return n.stepWithWaitOption(ctx, m, true) |
| 476 | } |
| 477 | |
| 478 | // Step advances the state machine using msgs. The ctx.Err() will be returned, |
| 479 | // if any. |
| 480 | func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error { |
| 481 | if m.Type != pb.MsgProp { |
| 482 | select { |
| 483 | case n.recvc <- m: |
| 484 | return nil |
| 485 | case <-ctx.Done(): |
| 486 | return ctx.Err() |
| 487 | case <-n.done: |
| 488 | return ErrStopped |
| 489 | } |
| 490 | } |
| 491 | ch := n.propc |
| 492 | pm := msgWithResult{m: m} |
| 493 | if wait { |
| 494 | pm.result = make(chan error, 1) |
| 495 | } |
| 496 | select { |
| 497 | case ch <- pm: |
| 498 | if !wait { |
| 499 | return nil |
| 500 | } |
| 501 | case <-ctx.Done(): |
| 502 | return ctx.Err() |
| 503 | case <-n.done: |
| 504 | return ErrStopped |
| 505 | } |
| 506 | select { |
| 507 | case err := <-pm.result: |
| 508 | if err != nil { |
| 509 | return err |
| 510 | } |
| 511 | case <-ctx.Done(): |
| 512 | return ctx.Err() |
| 513 | case <-n.done: |
| 514 | return ErrStopped |
| 515 | } |
| 516 | return nil |
| 517 | } |
| 518 | |
| 519 | func (n *node) Ready() <-chan Ready { return n.readyc } |
| 520 | |
| 521 | func (n *node) Advance() { |
| 522 | select { |
| 523 | case n.advancec <- struct{}{}: |
| 524 | case <-n.done: |
| 525 | } |
| 526 | } |
| 527 | |
| 528 | func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { |
| 529 | var cs pb.ConfState |
| 530 | select { |
| 531 | case n.confc <- cc: |
| 532 | case <-n.done: |
| 533 | } |
| 534 | select { |
| 535 | case cs = <-n.confstatec: |
| 536 | case <-n.done: |
| 537 | } |
| 538 | return &cs |
| 539 | } |
| 540 | |
| 541 | func (n *node) Status() Status { |
| 542 | c := make(chan Status) |
| 543 | select { |
| 544 | case n.status <- c: |
| 545 | return <-c |
| 546 | case <-n.done: |
| 547 | return Status{} |
| 548 | } |
| 549 | } |
| 550 | |
| 551 | func (n *node) ReportUnreachable(id uint64) { |
| 552 | select { |
| 553 | case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}: |
| 554 | case <-n.done: |
| 555 | } |
| 556 | } |
| 557 | |
| 558 | func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) { |
| 559 | rej := status == SnapshotFailure |
| 560 | |
| 561 | select { |
| 562 | case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}: |
| 563 | case <-n.done: |
| 564 | } |
| 565 | } |
| 566 | |
| 567 | func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) { |
| 568 | select { |
| 569 | // manually set 'from' and 'to', so that leader can voluntarily transfers its leadership |
| 570 | case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}: |
| 571 | case <-n.done: |
| 572 | case <-ctx.Done(): |
| 573 | } |
| 574 | } |
| 575 | |
| 576 | func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { |
| 577 | return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) |
| 578 | } |
| 579 | |
| 580 | func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { |
| 581 | rd := Ready{ |
| 582 | Entries: r.raftLog.unstableEntries(), |
| 583 | CommittedEntries: r.raftLog.nextEnts(), |
| 584 | Messages: r.msgs, |
| 585 | } |
| 586 | if softSt := r.softState(); !softSt.equal(prevSoftSt) { |
| 587 | rd.SoftState = softSt |
| 588 | } |
| 589 | if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { |
| 590 | rd.HardState = hardSt |
| 591 | } |
| 592 | if r.raftLog.unstable.snapshot != nil { |
| 593 | rd.Snapshot = *r.raftLog.unstable.snapshot |
| 594 | } |
| 595 | if len(r.readStates) != 0 { |
| 596 | rd.ReadStates = r.readStates |
| 597 | } |
| 598 | rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries)) |
| 599 | return rd |
| 600 | } |
| 601 | |
| 602 | // MustSync returns true if the hard state and count of Raft entries indicate |
| 603 | // that a synchronous write to persistent storage is required. |
| 604 | func MustSync(st, prevst pb.HardState, entsnum int) bool { |
| 605 | // Persistent state on all servers: |
| 606 | // (Updated on stable storage before responding to RPCs) |
| 607 | // currentTerm |
| 608 | // votedFor |
| 609 | // log entries[] |
| 610 | return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term |
| 611 | } |