William Kurkian | ea86948 | 2019-04-09 15:16:11 -0400 | [diff] [blame] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | /* |
| 16 | Package raft sends and receives messages in the Protocol Buffer format |
| 17 | defined in the raftpb package. |
| 18 | |
| 19 | Raft is a protocol with which a cluster of nodes can maintain a replicated state machine. |
| 20 | The state machine is kept in sync through the use of a replicated log. |
| 21 | For more details on Raft, see "In Search of an Understandable Consensus Algorithm" |
Abhilash S.L | 3b49463 | 2019-07-16 15:51:09 +0530 | [diff] [blame] | 22 | (https://raft.github.io/raft.pdf) by Diego Ongaro and John Ousterhout. |
William Kurkian | ea86948 | 2019-04-09 15:16:11 -0400 | [diff] [blame] | 23 | |
| 24 | A simple example application, _raftexample_, is also available to help illustrate |
| 25 | how to use this package in practice: |
| 26 | https://github.com/etcd-io/etcd/tree/master/contrib/raftexample |
| 27 | |
| 28 | Usage |
| 29 | |
| 30 | The primary object in raft is a Node. You either start a Node from scratch |
| 31 | using raft.StartNode or start a Node from some initial state using raft.RestartNode. |
| 32 | |
| 33 | To start a node from scratch: |
| 34 | |
| 35 | storage := raft.NewMemoryStorage() |
| 36 | c := &Config{ |
| 37 | ID: 0x01, |
| 38 | ElectionTick: 10, |
| 39 | HeartbeatTick: 1, |
| 40 | Storage: storage, |
| 41 | MaxSizePerMsg: 4096, |
| 42 | MaxInflightMsgs: 256, |
| 43 | } |
| 44 | n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}}) |
| 45 | |
| 46 | To restart a node from previous state: |
| 47 | |
| 48 | storage := raft.NewMemoryStorage() |
| 49 | |
| 50 | // recover the in-memory storage from persistent |
| 51 | // snapshot, state and entries. |
| 52 | storage.ApplySnapshot(snapshot) |
| 53 | storage.SetHardState(state) |
| 54 | storage.Append(entries) |
| 55 | |
| 56 | c := &Config{ |
| 57 | ID: 0x01, |
| 58 | ElectionTick: 10, |
| 59 | HeartbeatTick: 1, |
| 60 | Storage: storage, |
| 61 | MaxSizePerMsg: 4096, |
| 62 | MaxInflightMsgs: 256, |
| 63 | } |
| 64 | |
| 65 | // restart raft without peer information. |
| 66 | // peer information is already included in the storage. |
| 67 | n := raft.RestartNode(c) |
| 68 | |
| 69 | Now that you are holding onto a Node you have a few responsibilities: |
| 70 | |
| 71 | First, you must read from the Node.Ready() channel and process the updates |
| 72 | it contains. These steps may be performed in parallel, except as noted in step |
| 73 | 2. |
| 74 | |
| 75 | 1. Write HardState, Entries, and Snapshot to persistent storage if they are |
| 76 | not empty. Note that when writing an Entry with Index i, any |
| 77 | previously-persisted entries with Index >= i must be discarded. |
| 78 | |
| 79 | 2. Send all Messages to the nodes named in the To field. It is important that |
| 80 | no messages be sent until the latest HardState has been persisted to disk, |
| 81 | and all Entries written by any previous Ready batch (Messages may be sent while |
| 82 | entries from the same batch are being persisted). To reduce the I/O latency, an |
| 83 | optimization can be applied to make leader write to disk in parallel with its |
| 84 | followers (as explained at section 10.2.1 in Raft thesis). If any Message has type |
| 85 | MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be |
| 86 | large). |
| 87 | |
| 88 | Note: Marshalling messages is not thread-safe; it is important that you |
| 89 | make sure that no new entries are persisted while marshalling. |
| 90 | The easiest way to achieve this is to serialize the messages directly inside |
| 91 | your main raft loop. |
| 92 | |
| 93 | 3. Apply Snapshot (if any) and CommittedEntries to the state machine. |
| 94 | If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange() |
| 95 | to apply it to the node. The configuration change may be cancelled at this point |
| 96 | by setting the NodeID field to zero before calling ApplyConfChange |
| 97 | (but ApplyConfChange must be called one way or the other, and the decision to cancel |
| 98 | must be based solely on the state machine and not external information such as |
| 99 | the observed health of the node). |
| 100 | |
| 101 | 4. Call Node.Advance() to signal readiness for the next batch of updates. |
| 102 | This may be done at any time after step 1, although all updates must be processed |
| 103 | in the order they were returned by Ready. |
| 104 | |
| 105 | Second, all persisted log entries must be made available via an |
| 106 | implementation of the Storage interface. The provided MemoryStorage |
| 107 | type can be used for this (if you repopulate its state upon a |
| 108 | restart), or you can supply your own disk-backed implementation. |
| 109 | |
| 110 | Third, when you receive a message from another node, pass it to Node.Step: |
| 111 | |
| 112 | func recvRaftRPC(ctx context.Context, m raftpb.Message) { |
| 113 | n.Step(ctx, m) |
| 114 | } |
| 115 | |
| 116 | Finally, you need to call Node.Tick() at regular intervals (probably |
| 117 | via a time.Ticker). Raft has two important timeouts: heartbeat and the |
| 118 | election timeout. However, internally to the raft package time is |
| 119 | represented by an abstract "tick". |
| 120 | |
| 121 | The total state machine handling loop will look something like this: |
| 122 | |
| 123 | for { |
| 124 | select { |
| 125 | case <-s.Ticker: |
| 126 | n.Tick() |
| 127 | case rd := <-s.Node.Ready(): |
| 128 | saveToStorage(rd.State, rd.Entries, rd.Snapshot) |
| 129 | send(rd.Messages) |
| 130 | if !raft.IsEmptySnap(rd.Snapshot) { |
| 131 | processSnapshot(rd.Snapshot) |
| 132 | } |
| 133 | for _, entry := range rd.CommittedEntries { |
| 134 | process(entry) |
| 135 | if entry.Type == raftpb.EntryConfChange { |
| 136 | var cc raftpb.ConfChange |
| 137 | cc.Unmarshal(entry.Data) |
| 138 | s.Node.ApplyConfChange(cc) |
| 139 | } |
| 140 | } |
| 141 | s.Node.Advance() |
| 142 | case <-s.done: |
| 143 | return |
| 144 | } |
| 145 | } |
| 146 | |
| 147 | To propose changes to the state machine from your node take your application |
| 148 | data, serialize it into a byte slice and call: |
| 149 | |
| 150 | n.Propose(ctx, data) |
| 151 | |
| 152 | If the proposal is committed, data will appear in committed entries with type |
| 153 | raftpb.EntryNormal. There is no guarantee that a proposed command will be |
| 154 | committed; you may have to re-propose after a timeout. |
| 155 | |
| 156 | To add or remove a node in a cluster, build ConfChange struct 'cc' and call: |
| 157 | |
| 158 | n.ProposeConfChange(ctx, cc) |
| 159 | |
| 160 | After config change is committed, some committed entry with type |
| 161 | raftpb.EntryConfChange will be returned. You must apply it to node through: |
| 162 | |
| 163 | var cc raftpb.ConfChange |
| 164 | cc.Unmarshal(data) |
| 165 | n.ApplyConfChange(cc) |
| 166 | |
| 167 | Note: An ID represents a unique node in a cluster for all time. A |
| 168 | given ID MUST be used only once even if the old node has been removed. |
| 169 | This means that for example IP addresses make poor node IDs since they |
| 170 | may be reused. Node IDs must be non-zero. |
| 171 | |
| 172 | Implementation notes |
| 173 | |
| 174 | This implementation is up to date with the final Raft thesis |
Abhilash S.L | 3b49463 | 2019-07-16 15:51:09 +0530 | [diff] [blame] | 175 | (https://github.com/ongardie/dissertation/blob/master/stanford.pdf), although our |
William Kurkian | ea86948 | 2019-04-09 15:16:11 -0400 | [diff] [blame] | 176 | implementation of the membership change protocol differs somewhat from |
| 177 | that described in chapter 4. The key invariant that membership changes |
| 178 | happen one node at a time is preserved, but in our implementation the |
| 179 | membership change takes effect when its entry is applied, not when it |
| 180 | is added to the log (so the entry is committed under the old |
| 181 | membership instead of the new). This is equivalent in terms of safety, |
| 182 | since the old and new configurations are guaranteed to overlap. |
| 183 | |
| 184 | To ensure that we do not attempt to commit two membership changes at |
| 185 | once by matching log positions (which would be unsafe since they |
| 186 | should have different quorum requirements), we simply disallow any |
| 187 | proposed membership change while any uncommitted change appears in |
| 188 | the leader's log. |
| 189 | |
| 190 | This approach introduces a problem when you try to remove a member |
| 191 | from a two-member cluster: If one of the members dies before the |
| 192 | other one receives the commit of the confchange entry, then the member |
| 193 | cannot be removed any more since the cluster cannot make progress. |
| 194 | For this reason it is highly recommended to use three or more nodes in |
| 195 | every cluster. |
| 196 | |
| 197 | MessageType |
| 198 | |
| 199 | Package raft sends and receives message in Protocol Buffer format (defined |
| 200 | in raftpb package). Each state (follower, candidate, leader) implements its |
| 201 | own 'step' method ('stepFollower', 'stepCandidate', 'stepLeader') when |
| 202 | advancing with the given raftpb.Message. Each step is determined by its |
| 203 | raftpb.MessageType. Note that every step is checked by one common method |
| 204 | 'Step' that safety-checks the terms of node and incoming message to prevent |
| 205 | stale log entries: |
| 206 | |
| 207 | 'MsgHup' is used for election. If a node is a follower or candidate, the |
| 208 | 'tick' function in 'raft' struct is set as 'tickElection'. If a follower or |
| 209 | candidate has not received any heartbeat before the election timeout, it |
| 210 | passes 'MsgHup' to its Step method and becomes (or remains) a candidate to |
| 211 | start a new election. |
| 212 | |
| 213 | 'MsgBeat' is an internal type that signals the leader to send a heartbeat of |
| 214 | the 'MsgHeartbeat' type. If a node is a leader, the 'tick' function in |
| 215 | the 'raft' struct is set as 'tickHeartbeat', and triggers the leader to |
| 216 | send periodic 'MsgHeartbeat' messages to its followers. |
| 217 | |
| 218 | 'MsgProp' proposes to append data to its log entries. This is a special |
| 219 | type to redirect proposals to leader. Therefore, send method overwrites |
| 220 | raftpb.Message's term with its HardState's term to avoid attaching its |
| 221 | local term to 'MsgProp'. When 'MsgProp' is passed to the leader's 'Step' |
| 222 | method, the leader first calls the 'appendEntry' method to append entries |
| 223 | to its log, and then calls 'bcastAppend' method to send those entries to |
| 224 | its peers. When passed to candidate, 'MsgProp' is dropped. When passed to |
| 225 | follower, 'MsgProp' is stored in follower's mailbox(msgs) by the send |
| 226 | method. It is stored with sender's ID and later forwarded to leader by |
| 227 | rafthttp package. |
| 228 | |
| 229 | 'MsgApp' contains log entries to replicate. A leader calls bcastAppend, |
| 230 | which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp' |
| 231 | type. When 'MsgApp' is passed to candidate's Step method, candidate reverts |
| 232 | back to follower, because it indicates that there is a valid leader sending |
| 233 | 'MsgApp' messages. Candidate and follower respond to this message in |
| 234 | 'MsgAppResp' type. |
| 235 | |
| 236 | 'MsgAppResp' is response to log replication request('MsgApp'). When |
| 237 | 'MsgApp' is passed to candidate or follower's Step method, it responds by |
| 238 | calling 'handleAppendEntries' method, which sends 'MsgAppResp' to raft |
| 239 | mailbox. |
| 240 | |
| 241 | 'MsgVote' requests votes for election. When a node is a follower or |
| 242 | candidate and 'MsgHup' is passed to its Step method, then the node calls |
| 243 | 'campaign' method to campaign itself to become a leader. Once 'campaign' |
| 244 | method is called, the node becomes candidate and sends 'MsgVote' to peers |
| 245 | in cluster to request votes. When passed to leader or candidate's Step |
| 246 | method and the message's Term is lower than leader's or candidate's, |
| 247 | 'MsgVote' will be rejected ('MsgVoteResp' is returned with Reject true). |
| 248 | If leader or candidate receives 'MsgVote' with higher term, it will revert |
| 249 | back to follower. When 'MsgVote' is passed to follower, it votes for the |
| 250 | sender only when sender's last term is greater than MsgVote's term or |
| 251 | sender's last term is equal to MsgVote's term but sender's last committed |
| 252 | index is greater than or equal to follower's. |
| 253 | |
| 254 | 'MsgVoteResp' contains responses from voting request. When 'MsgVoteResp' is |
| 255 | passed to candidate, the candidate calculates how many votes it has won. If |
| 256 | it's more than majority (quorum), it becomes leader and calls 'bcastAppend'. |
| 257 | If candidate receives majority of votes of denials, it reverts back to |
| 258 | follower. |
| 259 | |
| 260 | 'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase election |
| 261 | protocol. When Config.PreVote is true, a pre-election is carried out first |
| 262 | (using the same rules as a regular election), and no node increases its term |
| 263 | number unless the pre-election indicates that the campaigning node would win. |
| 264 | This minimizes disruption when a partitioned node rejoins the cluster. |
| 265 | |
| 266 | 'MsgSnap' requests to install a snapshot message. When a node has just |
| 267 | become a leader or the leader receives 'MsgProp' message, it calls |
| 268 | 'bcastAppend' method, which then calls 'sendAppend' method to each |
| 269 | follower. In 'sendAppend', if a leader fails to get term or entries, |
| 270 | the leader requests snapshot by sending 'MsgSnap' type message. |
| 271 | |
| 272 | 'MsgSnapStatus' tells the result of snapshot install message. When a |
| 273 | follower rejected 'MsgSnap', it indicates the snapshot request with |
| 274 | 'MsgSnap' had failed from network issues which causes the network layer |
| 275 | to fail to send out snapshots to its followers. Then leader considers |
| 276 | follower's progress as probe. When 'MsgSnap' were not rejected, it |
| 277 | indicates that the snapshot succeeded and the leader sets follower's |
| 278 | progress to probe and resumes its log replication. |
| 279 | |
| 280 | 'MsgHeartbeat' sends heartbeat from leader. When 'MsgHeartbeat' is passed |
| 281 | to candidate and message's term is higher than candidate's, the candidate |
| 282 | reverts back to follower and updates its committed index from the one in |
| 283 | this heartbeat. And it sends the message to its mailbox. When |
| 284 | 'MsgHeartbeat' is passed to follower's Step method and message's term is |
| 285 | higher than follower's, the follower updates its leaderID with the ID |
| 286 | from the message. |
| 287 | |
| 288 | 'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp' |
| 289 | is passed to leader's Step method, the leader knows which follower |
| 290 | responded. And only when the leader's last committed index is greater than |
| 291 | follower's Match index, the leader runs 'sendAppend` method. |
| 292 | |
| 293 | 'MsgUnreachable' tells that request(message) wasn't delivered. When |
| 294 | 'MsgUnreachable' is passed to leader's Step method, the leader discovers |
| 295 | that the follower that sent this 'MsgUnreachable' is not reachable, often |
| 296 | indicating 'MsgApp' is lost. When follower's progress state is replicate, |
| 297 | the leader sets it back to probe. |
| 298 | |
| 299 | */ |
| 300 | package raft |