Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package raft |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "errors" |
| 20 | |
| 21 | pb "go.etcd.io/etcd/raft/raftpb" |
| 22 | ) |
| 23 | |
// SnapshotStatus is the outcome of sending a snapshot to a follower, as
// reported to the Node through ReportSnapshot.
type SnapshotStatus int

const (
	// SnapshotFinish reports that the snapshot was sent successfully.
	SnapshotFinish SnapshotStatus = 1
	// SnapshotFailure reports that sending or applying the snapshot failed.
	SnapshotFailure SnapshotStatus = 2
)
| 30 | |
| 31 | var ( |
| 32 | emptyState = pb.HardState{} |
| 33 | |
| 34 | // ErrStopped is returned by methods on Nodes that have been stopped. |
| 35 | ErrStopped = errors.New("raft: stopped") |
| 36 | ) |
| 37 | |
| 38 | // SoftState provides state that is useful for logging and debugging. |
| 39 | // The state is volatile and does not need to be persisted to the WAL. |
| 40 | type SoftState struct { |
| 41 | Lead uint64 // must use atomic operations to access; keep 64-bit aligned. |
| 42 | RaftState StateType |
| 43 | } |
| 44 | |
| 45 | func (a *SoftState) equal(b *SoftState) bool { |
| 46 | return a.Lead == b.Lead && a.RaftState == b.RaftState |
| 47 | } |
| 48 | |
| 49 | // Ready encapsulates the entries and messages that are ready to read, |
| 50 | // be saved to stable storage, committed or sent to other peers. |
| 51 | // All fields in Ready are read-only. |
| 52 | type Ready struct { |
| 53 | // The current volatile state of a Node. |
| 54 | // SoftState will be nil if there is no update. |
| 55 | // It is not required to consume or store SoftState. |
| 56 | *SoftState |
| 57 | |
| 58 | // The current state of a Node to be saved to stable storage BEFORE |
| 59 | // Messages are sent. |
| 60 | // HardState will be equal to empty state if there is no update. |
| 61 | pb.HardState |
| 62 | |
| 63 | // ReadStates can be used for node to serve linearizable read requests locally |
| 64 | // when its applied index is greater than the index in ReadState. |
| 65 | // Note that the readState will be returned when raft receives msgReadIndex. |
| 66 | // The returned is only valid for the request that requested to read. |
| 67 | ReadStates []ReadState |
| 68 | |
| 69 | // Entries specifies entries to be saved to stable storage BEFORE |
| 70 | // Messages are sent. |
| 71 | Entries []pb.Entry |
| 72 | |
| 73 | // Snapshot specifies the snapshot to be saved to stable storage. |
| 74 | Snapshot pb.Snapshot |
| 75 | |
| 76 | // CommittedEntries specifies entries to be committed to a |
| 77 | // store/state-machine. These have previously been committed to stable |
| 78 | // store. |
| 79 | CommittedEntries []pb.Entry |
| 80 | |
| 81 | // Messages specifies outbound messages to be sent AFTER Entries are |
| 82 | // committed to stable storage. |
| 83 | // If it contains a MsgSnap message, the application MUST report back to raft |
| 84 | // when the snapshot has been received or has failed by calling ReportSnapshot. |
| 85 | Messages []pb.Message |
| 86 | |
| 87 | // MustSync indicates whether the HardState and Entries must be synchronously |
| 88 | // written to disk or if an asynchronous write is permissible. |
| 89 | MustSync bool |
| 90 | } |
| 91 | |
| 92 | func isHardStateEqual(a, b pb.HardState) bool { |
| 93 | return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit |
| 94 | } |
| 95 | |
| 96 | // IsEmptyHardState returns true if the given HardState is empty. |
| 97 | func IsEmptyHardState(st pb.HardState) bool { |
| 98 | return isHardStateEqual(st, emptyState) |
| 99 | } |
| 100 | |
| 101 | // IsEmptySnap returns true if the given Snapshot is empty. |
| 102 | func IsEmptySnap(sp pb.Snapshot) bool { |
| 103 | return sp.Metadata.Index == 0 |
| 104 | } |
| 105 | |
| 106 | func (rd Ready) containsUpdates() bool { |
| 107 | return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || |
| 108 | !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 || |
| 109 | len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0 |
| 110 | } |
| 111 | |
| 112 | // appliedCursor extracts from the Ready the highest index the client has |
| 113 | // applied (once the Ready is confirmed via Advance). If no information is |
| 114 | // contained in the Ready, returns zero. |
| 115 | func (rd Ready) appliedCursor() uint64 { |
| 116 | if n := len(rd.CommittedEntries); n > 0 { |
| 117 | return rd.CommittedEntries[n-1].Index |
| 118 | } |
| 119 | if index := rd.Snapshot.Metadata.Index; index > 0 { |
| 120 | return index |
| 121 | } |
| 122 | return 0 |
| 123 | } |
| 124 | |
| 125 | // Node represents a node in a raft cluster. |
| 126 | type Node interface { |
| 127 | // Tick increments the internal logical clock for the Node by a single tick. Election |
| 128 | // timeouts and heartbeat timeouts are in units of ticks. |
| 129 | Tick() |
| 130 | // Campaign causes the Node to transition to candidate state and start campaigning to become leader. |
| 131 | Campaign(ctx context.Context) error |
| 132 | // Propose proposes that data be appended to the log. Note that proposals can be lost without |
| 133 | // notice, therefore it is user's job to ensure proposal retries. |
| 134 | Propose(ctx context.Context, data []byte) error |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 135 | // ProposeConfChange proposes a configuration change. Like any proposal, the |
| 136 | // configuration change may be dropped with or without an error being |
| 137 | // returned. In particular, configuration changes are dropped unless the |
| 138 | // leader has certainty that there is no prior unapplied configuration |
| 139 | // change in its log. |
| 140 | // |
| 141 | // The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2 |
| 142 | // message. The latter allows arbitrary configuration changes via joint |
| 143 | // consensus, notably including replacing a voter. Passing a ConfChangeV2 |
| 144 | // message is only allowed if all Nodes participating in the cluster run a |
| 145 | // version of this library aware of the V2 API. See pb.ConfChangeV2 for |
| 146 | // usage details and semantics. |
| 147 | ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error |
| 148 | |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 149 | // Step advances the state machine using the given message. ctx.Err() will be returned, if any. |
| 150 | Step(ctx context.Context, msg pb.Message) error |
| 151 | |
| 152 | // Ready returns a channel that returns the current point-in-time state. |
| 153 | // Users of the Node must call Advance after retrieving the state returned by Ready. |
| 154 | // |
| 155 | // NOTE: No committed entries from the next Ready may be applied until all committed entries |
| 156 | // and snapshots from the previous one have finished. |
| 157 | Ready() <-chan Ready |
| 158 | |
| 159 | // Advance notifies the Node that the application has saved progress up to the last Ready. |
| 160 | // It prepares the node to return the next available Ready. |
| 161 | // |
| 162 | // The application should generally call Advance after it applies the entries in last Ready. |
| 163 | // |
| 164 | // However, as an optimization, the application may call Advance while it is applying the |
| 165 | // commands. For example. when the last Ready contains a snapshot, the application might take |
| 166 | // a long time to apply the snapshot data. To continue receiving Ready without blocking raft |
| 167 | // progress, it can call Advance before finishing applying the last ready. |
| 168 | Advance() |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 169 | // ApplyConfChange applies a config change (previously passed to |
| 170 | // ProposeConfChange) to the node. This must be called whenever a config |
| 171 | // change is observed in Ready.CommittedEntries. |
| 172 | // |
| 173 | // Returns an opaque non-nil ConfState protobuf which must be recorded in |
| 174 | // snapshots. |
| 175 | ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 176 | |
| 177 | // TransferLeadership attempts to transfer leadership to the given transferee. |
| 178 | TransferLeadership(ctx context.Context, lead, transferee uint64) |
| 179 | |
| 180 | // ReadIndex request a read state. The read state will be set in the ready. |
| 181 | // Read state has a read index. Once the application advances further than the read |
| 182 | // index, any linearizable read requests issued before the read request can be |
| 183 | // processed safely. The read state will have the same rctx attached. |
| 184 | ReadIndex(ctx context.Context, rctx []byte) error |
| 185 | |
| 186 | // Status returns the current status of the raft state machine. |
| 187 | Status() Status |
| 188 | // ReportUnreachable reports the given node is not reachable for the last send. |
| 189 | ReportUnreachable(id uint64) |
| 190 | // ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower |
| 191 | // who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure. |
| 192 | // Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a |
| 193 | // snapshot (for e.g., while streaming it from leader to follower), should be reported to the |
| 194 | // leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft |
| 195 | // log probes until the follower can apply the snapshot and advance its state. If the follower |
| 196 | // can't do that, for e.g., due to a crash, it could end up in a limbo, never getting any |
| 197 | // updates from the leader. Therefore, it is crucial that the application ensures that any |
| 198 | // failure in snapshot sending is caught and reported back to the leader; so it can resume raft |
| 199 | // log probing in the follower. |
| 200 | ReportSnapshot(id uint64, status SnapshotStatus) |
| 201 | // Stop performs any necessary termination of the Node. |
| 202 | Stop() |
| 203 | } |
| 204 | |
// Peer describes a cluster member handed to StartNode, which appends a
// ConfChangeAddNode entry for each Peer to the initial log.
//
// NOTE(review): Context is presumably carried into that bootstrap conf
// change entry — confirm against RawNode.Bootstrap.
type Peer struct {
	ID      uint64
	Context []byte
}
| 209 | |
| 210 | // StartNode returns a new Node given configuration and a list of raft peers. |
| 211 | // It appends a ConfChangeAddNode entry for each given peer to the initial log. |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 212 | // |
| 213 | // Peers must not be zero length; call RestartNode in that case. |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 214 | func StartNode(c *Config, peers []Peer) Node { |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 215 | if len(peers) == 0 { |
| 216 | panic("no peers given; use RestartNode instead") |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 217 | } |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 218 | rn, err := NewRawNode(c) |
| 219 | if err != nil { |
| 220 | panic(err) |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 221 | } |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 222 | rn.Bootstrap(peers) |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 223 | |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 224 | n := newNode(rn) |
| 225 | |
| 226 | go n.run() |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 227 | return &n |
| 228 | } |
| 229 | |
| 230 | // RestartNode is similar to StartNode but does not take a list of peers. |
| 231 | // The current membership of the cluster will be restored from the Storage. |
| 232 | // If the caller has an existing state machine, pass in the last log index that |
| 233 | // has been applied to it; otherwise use zero. |
| 234 | func RestartNode(c *Config) Node { |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 235 | rn, err := NewRawNode(c) |
| 236 | if err != nil { |
| 237 | panic(err) |
| 238 | } |
| 239 | n := newNode(rn) |
| 240 | go n.run() |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 241 | return &n |
| 242 | } |
| 243 | |
| 244 | type msgWithResult struct { |
| 245 | m pb.Message |
| 246 | result chan error |
| 247 | } |
| 248 | |
| 249 | // node is the canonical implementation of the Node interface |
| 250 | type node struct { |
| 251 | propc chan msgWithResult |
| 252 | recvc chan pb.Message |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 253 | confc chan pb.ConfChangeV2 |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 254 | confstatec chan pb.ConfState |
| 255 | readyc chan Ready |
| 256 | advancec chan struct{} |
| 257 | tickc chan struct{} |
| 258 | done chan struct{} |
| 259 | stop chan struct{} |
| 260 | status chan chan Status |
| 261 | |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 262 | rn *RawNode |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 263 | } |
| 264 | |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 265 | func newNode(rn *RawNode) node { |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 266 | return node{ |
| 267 | propc: make(chan msgWithResult), |
| 268 | recvc: make(chan pb.Message), |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 269 | confc: make(chan pb.ConfChangeV2), |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 270 | confstatec: make(chan pb.ConfState), |
| 271 | readyc: make(chan Ready), |
| 272 | advancec: make(chan struct{}), |
| 273 | // make tickc a buffered chan, so raft node can buffer some ticks when the node |
| 274 | // is busy processing raft messages. Raft node will resume process buffered |
| 275 | // ticks when it becomes idle. |
| 276 | tickc: make(chan struct{}, 128), |
| 277 | done: make(chan struct{}), |
| 278 | stop: make(chan struct{}), |
| 279 | status: make(chan chan Status), |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 280 | rn: rn, |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 281 | } |
| 282 | } |
| 283 | |
| 284 | func (n *node) Stop() { |
| 285 | select { |
| 286 | case n.stop <- struct{}{}: |
| 287 | // Not already stopped, so trigger it |
| 288 | case <-n.done: |
| 289 | // Node has already been stopped - no need to do anything |
| 290 | return |
| 291 | } |
| 292 | // Block until the stop has been acknowledged by run() |
| 293 | <-n.done |
| 294 | } |
| 295 | |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 296 | func (n *node) run() { |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 297 | var propc chan msgWithResult |
| 298 | var readyc chan Ready |
| 299 | var advancec chan struct{} |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 300 | var rd Ready |
| 301 | |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 302 | r := n.rn.raft |
| 303 | |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 304 | lead := None |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 305 | |
| 306 | for { |
| 307 | if advancec != nil { |
| 308 | readyc = nil |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 309 | } else if n.rn.HasReady() { |
| 310 | // Populate a Ready. Note that this Ready is not guaranteed to |
| 311 | // actually be handled. We will arm readyc, but there's no guarantee |
| 312 | // that we will actually send on it. It's possible that we will |
| 313 | // service another channel instead, loop around, and then populate |
| 314 | // the Ready again. We could instead force the previous Ready to be |
| 315 | // handled first, but it's generally good to emit larger Readys plus |
| 316 | // it simplifies testing (by emitting less frequently and more |
| 317 | // predictably). |
| 318 | rd = n.rn.readyWithoutAccept() |
| 319 | readyc = n.readyc |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 320 | } |
| 321 | |
| 322 | if lead != r.lead { |
| 323 | if r.hasLeader() { |
| 324 | if lead == None { |
| 325 | r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term) |
| 326 | } else { |
| 327 | r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term) |
| 328 | } |
| 329 | propc = n.propc |
| 330 | } else { |
| 331 | r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term) |
| 332 | propc = nil |
| 333 | } |
| 334 | lead = r.lead |
| 335 | } |
| 336 | |
| 337 | select { |
| 338 | // TODO: maybe buffer the config propose if there exists one (the way |
| 339 | // described in raft dissertation) |
| 340 | // Currently it is dropped in Step silently. |
| 341 | case pm := <-propc: |
| 342 | m := pm.m |
| 343 | m.From = r.id |
| 344 | err := r.Step(m) |
| 345 | if pm.result != nil { |
| 346 | pm.result <- err |
| 347 | close(pm.result) |
| 348 | } |
| 349 | case m := <-n.recvc: |
| 350 | // filter out response message from unknown From. |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 351 | if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) { |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 352 | r.Step(m) |
| 353 | } |
| 354 | case cc := <-n.confc: |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 355 | _, okBefore := r.prs.Progress[r.id] |
| 356 | cs := r.applyConfChange(cc) |
| 357 | // If the node was removed, block incoming proposals. Note that we |
| 358 | // only do this if the node was in the config before. Nodes may be |
| 359 | // a member of the group without knowing this (when they're catching |
| 360 | // up on the log and don't have the latest config) and we don't want |
| 361 | // to block the proposal channel in that case. |
| 362 | // |
| 363 | // NB: propc is reset when the leader changes, which, if we learn |
| 364 | // about it, sort of implies that we got readded, maybe? This isn't |
| 365 | // very sound and likely has bugs. |
| 366 | if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter { |
| 367 | var found bool |
| 368 | for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} { |
| 369 | for _, id := range sl { |
| 370 | if id == r.id { |
| 371 | found = true |
| 372 | } |
| 373 | } |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 374 | } |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 375 | if !found { |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 376 | propc = nil |
| 377 | } |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 378 | } |
| 379 | select { |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 380 | case n.confstatec <- cs: |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 381 | case <-n.done: |
| 382 | } |
| 383 | case <-n.tickc: |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 384 | n.rn.Tick() |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 385 | case readyc <- rd: |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 386 | n.rn.acceptReady(rd) |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 387 | advancec = n.advancec |
| 388 | case <-advancec: |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 389 | n.rn.Advance(rd) |
| 390 | rd = Ready{} |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 391 | advancec = nil |
| 392 | case c := <-n.status: |
| 393 | c <- getStatus(r) |
| 394 | case <-n.stop: |
| 395 | close(n.done) |
| 396 | return |
| 397 | } |
| 398 | } |
| 399 | } |
| 400 | |
| 401 | // Tick increments the internal logical clock for this Node. Election timeouts |
| 402 | // and heartbeat timeouts are in units of ticks. |
| 403 | func (n *node) Tick() { |
| 404 | select { |
| 405 | case n.tickc <- struct{}{}: |
| 406 | case <-n.done: |
| 407 | default: |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 408 | n.rn.raft.logger.Warningf("%x (leader %v) A tick missed to fire. Node blocks too long!", n.rn.raft.id, n.rn.raft.id == n.rn.raft.lead) |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 409 | } |
| 410 | } |
| 411 | |
| 412 | func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) } |
| 413 | |
| 414 | func (n *node) Propose(ctx context.Context, data []byte) error { |
| 415 | return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) |
| 416 | } |
| 417 | |
| 418 | func (n *node) Step(ctx context.Context, m pb.Message) error { |
| 419 | // ignore unexpected local messages receiving over network |
| 420 | if IsLocalMsg(m.Type) { |
| 421 | // TODO: return an error? |
| 422 | return nil |
| 423 | } |
| 424 | return n.step(ctx, m) |
| 425 | } |
| 426 | |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 427 | func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) { |
| 428 | typ, data, err := pb.MarshalConfChange(c) |
| 429 | if err != nil { |
| 430 | return pb.Message{}, err |
| 431 | } |
| 432 | return pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: typ, Data: data}}}, nil |
| 433 | } |
| 434 | |
| 435 | func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error { |
| 436 | msg, err := confChangeToMsg(cc) |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 437 | if err != nil { |
| 438 | return err |
| 439 | } |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 440 | return n.Step(ctx, msg) |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 441 | } |
| 442 | |
| 443 | func (n *node) step(ctx context.Context, m pb.Message) error { |
| 444 | return n.stepWithWaitOption(ctx, m, false) |
| 445 | } |
| 446 | |
| 447 | func (n *node) stepWait(ctx context.Context, m pb.Message) error { |
| 448 | return n.stepWithWaitOption(ctx, m, true) |
| 449 | } |
| 450 | |
| 451 | // Step advances the state machine using msgs. The ctx.Err() will be returned, |
| 452 | // if any. |
| 453 | func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error { |
| 454 | if m.Type != pb.MsgProp { |
| 455 | select { |
| 456 | case n.recvc <- m: |
| 457 | return nil |
| 458 | case <-ctx.Done(): |
| 459 | return ctx.Err() |
| 460 | case <-n.done: |
| 461 | return ErrStopped |
| 462 | } |
| 463 | } |
| 464 | ch := n.propc |
| 465 | pm := msgWithResult{m: m} |
| 466 | if wait { |
| 467 | pm.result = make(chan error, 1) |
| 468 | } |
| 469 | select { |
| 470 | case ch <- pm: |
| 471 | if !wait { |
| 472 | return nil |
| 473 | } |
| 474 | case <-ctx.Done(): |
| 475 | return ctx.Err() |
| 476 | case <-n.done: |
| 477 | return ErrStopped |
| 478 | } |
| 479 | select { |
William Kurkian | daa6bb2 | 2019-03-07 12:26:28 -0500 | [diff] [blame] | 480 | case err := <-pm.result: |
| 481 | if err != nil { |
| 482 | return err |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 483 | } |
| 484 | case <-ctx.Done(): |
| 485 | return ctx.Err() |
| 486 | case <-n.done: |
| 487 | return ErrStopped |
| 488 | } |
| 489 | return nil |
| 490 | } |
| 491 | |
| 492 | func (n *node) Ready() <-chan Ready { return n.readyc } |
| 493 | |
| 494 | func (n *node) Advance() { |
| 495 | select { |
| 496 | case n.advancec <- struct{}{}: |
| 497 | case <-n.done: |
| 498 | } |
| 499 | } |
| 500 | |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 501 | func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState { |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 502 | var cs pb.ConfState |
| 503 | select { |
Scott Baker | beb3cfa | 2019-10-01 14:44:30 -0700 | [diff] [blame^] | 504 | case n.confc <- cc.AsV2(): |
Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 505 | case <-n.done: |
| 506 | } |
| 507 | select { |
| 508 | case cs = <-n.confstatec: |
| 509 | case <-n.done: |
| 510 | } |
| 511 | return &cs |
| 512 | } |
| 513 | |
| 514 | func (n *node) Status() Status { |
| 515 | c := make(chan Status) |
| 516 | select { |
| 517 | case n.status <- c: |
| 518 | return <-c |
| 519 | case <-n.done: |
| 520 | return Status{} |
| 521 | } |
| 522 | } |
| 523 | |
| 524 | func (n *node) ReportUnreachable(id uint64) { |
| 525 | select { |
| 526 | case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}: |
| 527 | case <-n.done: |
| 528 | } |
| 529 | } |
| 530 | |
| 531 | func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) { |
| 532 | rej := status == SnapshotFailure |
| 533 | |
| 534 | select { |
| 535 | case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}: |
| 536 | case <-n.done: |
| 537 | } |
| 538 | } |
| 539 | |
| 540 | func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) { |
| 541 | select { |
| 542 | // manually set 'from' and 'to', so that leader can voluntarily transfers its leadership |
| 543 | case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}: |
| 544 | case <-n.done: |
| 545 | case <-ctx.Done(): |
| 546 | } |
| 547 | } |
| 548 | |
| 549 | func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { |
| 550 | return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) |
| 551 | } |
| 552 | |
| 553 | func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { |
| 554 | rd := Ready{ |
| 555 | Entries: r.raftLog.unstableEntries(), |
| 556 | CommittedEntries: r.raftLog.nextEnts(), |
| 557 | Messages: r.msgs, |
| 558 | } |
| 559 | if softSt := r.softState(); !softSt.equal(prevSoftSt) { |
| 560 | rd.SoftState = softSt |
| 561 | } |
| 562 | if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { |
| 563 | rd.HardState = hardSt |
| 564 | } |
| 565 | if r.raftLog.unstable.snapshot != nil { |
| 566 | rd.Snapshot = *r.raftLog.unstable.snapshot |
| 567 | } |
| 568 | if len(r.readStates) != 0 { |
| 569 | rd.ReadStates = r.readStates |
| 570 | } |
| 571 | rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries)) |
| 572 | return rd |
| 573 | } |
| 574 | |
| 575 | // MustSync returns true if the hard state and count of Raft entries indicate |
| 576 | // that a synchronous write to persistent storage is required. |
| 577 | func MustSync(st, prevst pb.HardState, entsnum int) bool { |
| 578 | // Persistent state on all servers: |
| 579 | // (Updated on stable storage before responding to RPCs) |
| 580 | // currentTerm |
| 581 | // votedFor |
| 582 | // log entries[] |
| 583 | return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term |
| 584 | } |