// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package raft sends and receives messages in the Protocol Buffer format
defined in the raftpb package.

Raft is a protocol with which a cluster of nodes can maintain a replicated state machine.
The state machine is kept in sync through the use of a replicated log.
For more details on Raft, see "In Search of an Understandable Consensus Algorithm"
(https://ramcloud.stanford.edu/raft.pdf) by Diego Ongaro and John Ousterhout.

A simple example application, _raftexample_, is also available to help illustrate
how to use this package in practice:
https://github.com/etcd-io/etcd/tree/master/contrib/raftexample

Usage

The primary object in raft is a Node. You either start a Node from scratch
using raft.StartNode or start a Node from some initial state using raft.RestartNode.

To start a node from scratch:

  storage := raft.NewMemoryStorage()
  c := &raft.Config{
    ID:              0x01,
    ElectionTick:    10,
    HeartbeatTick:   1,
    Storage:         storage,
    MaxSizePerMsg:   4096,
    MaxInflightMsgs: 256,
  }
  n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

To restart a node from previous state:

  storage := raft.NewMemoryStorage()

  // Recover the in-memory storage from the persisted
  // snapshot, state and entries.
  storage.ApplySnapshot(snapshot)
  storage.SetHardState(state)
  storage.Append(entries)

  c := &raft.Config{
    ID:              0x01,
    ElectionTick:    10,
    HeartbeatTick:   1,
    Storage:         storage,
    MaxSizePerMsg:   4096,
    MaxInflightMsgs: 256,
  }

  // Restart raft without peer information.
  // Peer information is already included in the storage.
  n := raft.RestartNode(c)

Now that you are holding onto a Node you have a few responsibilities:

First, you must read from the Node.Ready() channel and process the updates
it contains. These steps may be performed in parallel, except as noted in step
2.

1. Write HardState, Entries, and Snapshot to persistent storage if they are
not empty. Note that when writing an Entry with Index i, any
previously-persisted entries with Index >= i must be discarded.

2. Send all Messages to the nodes named in the To field. No message may be
sent until the latest HardState has been persisted to disk, along with
all Entries written by any previous Ready batch (Messages may be sent while
entries from the same batch are being persisted). To reduce I/O latency, an
optimization can be applied that lets the leader write to disk in parallel with
its followers (as explained in section 10.2.1 of the Raft thesis). If any Message
has type MsgSnap, call Node.ReportSnapshot() after it has been sent (these
messages may be large).

Note: Marshalling messages is not thread-safe; make sure that no new entries
are persisted while marshalling. The easiest way to achieve this is to
serialize the messages directly inside your main raft loop.

3. Apply Snapshot (if any) and CommittedEntries to the state machine.
If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange()
to apply it to the node. The configuration change may be cancelled at this point
by setting the NodeID field to zero before calling ApplyConfChange
(but ApplyConfChange must be called one way or the other, and the decision to cancel
must be based solely on the state machine and not external information such as
the observed health of the node).

4. Call Node.Advance() to signal readiness for the next batch of updates.
This may be done at any time after step 1, although all updates must be processed
in the order they were returned by Ready.
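
The overwrite rule in step 1 (discard stored entries with Index >= i before
writing an entry with Index i) can be sketched as follows. This is only an
in-memory illustration: 'entry' and 'appendEntries' are hypothetical names that
are not part of this package, and a real implementation would apply the same
rule to its stable storage.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for raftpb.Entry, reduced to the fields
// this sketch needs.
type entry struct {
	Index uint64
	Term  uint64
}

// appendEntries returns log with ents appended. Any previously stored entry
// whose Index is >= the Index of the first new entry is discarded first.
// It assumes both slices are sorted by Index and contain no gaps.
func appendEntries(log, ents []entry) []entry {
	if len(ents) == 0 {
		return log
	}
	first := ents[0].Index
	keep := 0
	for keep < len(log) && log[keep].Index < first {
		keep++
	}
	// Copy so that the caller's backing array is not overwritten.
	out := make([]entry, 0, keep+len(ents))
	out = append(out, log[:keep]...)
	return append(out, ents...)
}

func main() {
	log := []entry{{1, 1}, {2, 1}, {3, 1}}
	// Entries 2 and 3 from term 1 are replaced by the entries from term 2.
	log = appendEntries(log, []entry{{2, 2}, {3, 2}, {4, 2}})
	for _, e := range log {
		fmt.Printf("index=%d term=%d\n", e.Index, e.Term)
	}
}
```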

Second, all persisted log entries must be made available via an
implementation of the Storage interface. The provided MemoryStorage
type can be used for this (if you repopulate its state upon a
restart), or you can supply your own disk-backed implementation.

Third, when you receive a message from another node, pass it to Node.Step:

  func recvRaftRPC(ctx context.Context, m raftpb.Message) {
    n.Step(ctx, m)
  }

Finally, you need to call Node.Tick() at regular intervals (probably
via a time.Ticker). Raft has two important timeouts: the heartbeat timeout
and the election timeout. However, internally to the raft package time is
represented by an abstract "tick".

The total state machine handling loop will look something like this:

  for {
    select {
    case <-s.Ticker:
      s.Node.Tick()
    case rd := <-s.Node.Ready():
      saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
      send(rd.Messages)
      if !raft.IsEmptySnap(rd.Snapshot) {
        processSnapshot(rd.Snapshot)
      }
      for _, entry := range rd.CommittedEntries {
        process(entry)
        if entry.Type == raftpb.EntryConfChange {
          var cc raftpb.ConfChange
          cc.Unmarshal(entry.Data)
          s.Node.ApplyConfChange(cc)
        }
      }
      s.Node.Advance()
    case <-s.done:
      return
    }
  }

To propose changes to the state machine from your node, take your application
data, serialize it into a byte slice and call:

  n.Propose(ctx, data)

If the proposal is committed, data will appear in committed entries with type
raftpb.EntryNormal. There is no guarantee that a proposed command will be
committed; you may have to re-propose after a timeout.

To add or remove a node in a cluster, build a ConfChange struct 'cc' and call:

  n.ProposeConfChange(ctx, cc)

After the config change is committed, a committed entry with type
raftpb.EntryConfChange will be returned. You must apply it to the node through:

  var cc raftpb.ConfChange
  cc.Unmarshal(data)
  n.ApplyConfChange(cc)

Note: An ID represents a unique node in a cluster for all time. A
given ID MUST be used only once even if the old node has been removed.
This means that, for example, IP addresses make poor node IDs since they
may be reused. Node IDs must be non-zero.

Implementation notes

This implementation is up to date with the final Raft thesis
(https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although our
implementation of the membership change protocol differs somewhat from
that described in chapter 4. The key invariant that membership changes
happen one node at a time is preserved, but in our implementation the
membership change takes effect when its entry is applied, not when it
is added to the log (so the entry is committed under the old
membership instead of the new). This is equivalent in terms of safety,
since the old and new configurations are guaranteed to overlap.

To ensure that we do not attempt to commit two membership changes at
once by matching log positions (which would be unsafe since they
should have different quorum requirements), we simply disallow any
proposed membership change while any uncommitted change appears in
the leader's log.
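
The guard described above can be pictured with the following sketch. It
follows the wording of this section and compares against the commit index;
the 'leader' type, its fields and 'proposeConfChange' are hypothetical
simplifications, and the package's internal bookkeeping may differ.

```go
package main

import "fmt"

// leader is a hypothetical, heavily simplified view of a leader's state.
type leader struct {
	lastIndex        uint64 // index of the last entry in the log
	commitIndex      uint64 // highest index known to be committed
	pendingConfIndex uint64 // index of the latest conf change entry, 0 if none
}

// proposeConfChange appends a membership change unless an earlier one is
// still uncommitted. It reports whether the proposal was accepted.
func (l *leader) proposeConfChange() bool {
	if l.pendingConfIndex > l.commitIndex {
		return false // an uncommitted change is still in the log
	}
	l.lastIndex++
	l.pendingConfIndex = l.lastIndex
	return true
}

func main() {
	l := &leader{lastIndex: 5, commitIndex: 5}
	fmt.Println("first change accepted: ", l.proposeConfChange())
	fmt.Println("second change accepted:", l.proposeConfChange())
	l.commitIndex = l.lastIndex // the first change commits
	fmt.Println("retry accepted:        ", l.proposeConfChange())
}
```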

This approach introduces a problem when you try to remove a member
from a two-member cluster: if one of the members dies before the
other one receives the commit of the confchange entry, then the member
can no longer be removed, since the cluster cannot make progress.
For this reason it is highly recommended to use three or more nodes in
every cluster.
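
The arithmetic behind this recommendation is shown below: a two-member
cluster needs both members for a majority, so it cannot tolerate any failure.
'quorum' and 'faultTolerance' are hypothetical helper names used only for
this illustration.

```go
package main

import "fmt"

// quorum returns the size of a majority in a cluster of n voting members.
func quorum(n int) int { return n/2 + 1 }

// faultTolerance returns how many members can fail while a majority remains.
func faultTolerance(n int) int { return n - quorum(n) }

func main() {
	for _, n := range []int{1, 2, 3, 4, 5} {
		fmt.Printf("members=%d quorum=%d tolerated failures=%d\n",
			n, quorum(n), faultTolerance(n))
	}
}
```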

MessageType

Package raft sends and receives messages in Protocol Buffer format (defined
in the raftpb package). Each state (follower, candidate, leader) implements its
own 'step' method ('stepFollower', 'stepCandidate', 'stepLeader') when
advancing with the given raftpb.Message. Each step is determined by its
raftpb.MessageType. Note that every step is checked by one common method
'Step' that safety-checks the terms of the node and the incoming message to
prevent stale log entries:

	'MsgHup' is used for election. If a node is a follower or candidate, the
	'tick' function in the 'raft' struct is set to 'tickElection'. If a follower or
	candidate has not received any heartbeat before the election timeout, it
	passes 'MsgHup' to its Step method and becomes (or remains) a candidate to
	start a new election.

	'MsgBeat' is an internal type that signals the leader to send a heartbeat of
	the 'MsgHeartbeat' type. If a node is a leader, the 'tick' function in
	the 'raft' struct is set to 'tickHeartbeat', and triggers the leader to
	send periodic 'MsgHeartbeat' messages to its followers.

	'MsgProp' proposes to append data to the log. This is a special type used
	to redirect proposals to the leader. For that reason the 'send' method
	does not attach the node's local term to a 'MsgProp'. When 'MsgProp' is
	passed to the leader's 'Step' method, the leader first calls the
	'appendEntry' method to append entries to its log, and then calls the
	'bcastAppend' method to send those entries to its peers. When passed to a
	candidate, 'MsgProp' is dropped. When passed to a follower, 'MsgProp' is
	stored in the follower's mailbox (msgs) by the 'send' method. It is stored
	with the sender's ID and later forwarded to the leader by the rafthttp
	package.

	'MsgApp' contains log entries to replicate. A leader calls 'bcastAppend',
	which calls 'sendAppend', which sends the entries to be replicated in
	messages of type 'MsgApp'. When 'MsgApp' is passed to a candidate's Step
	method, the candidate reverts back to follower, because the message
	indicates that there is a valid leader sending 'MsgApp' messages.
	Candidates and followers respond to this message with 'MsgAppResp'.

	'MsgAppResp' is the response to a log replication request ('MsgApp'). When
	'MsgApp' is passed to a candidate's or follower's Step method, the node
	responds by calling the 'handleAppendEntries' method, which sends
	'MsgAppResp' to the raft mailbox.

	'MsgVote' requests votes for election. When a node is a follower or
	candidate and 'MsgHup' is passed to its Step method, the node calls the
	'campaign' method to campaign to become leader. Once 'campaign' is called,
	the node becomes a candidate and sends 'MsgVote' to the peers in the
	cluster to request votes. When passed to a leader's or candidate's Step
	method and the message's Term is lower than the leader's or candidate's,
	'MsgVote' will be rejected ('MsgVoteResp' is returned with Reject true).
	If a leader or candidate receives 'MsgVote' with a higher term, it will
	revert back to follower. When 'MsgVote' is passed to a follower, it votes
	for the sender only when the sender's last log term is greater than the
	follower's last log term, or when the two terms are equal and the sender's
	last log index is greater than or equal to the follower's.

	'MsgVoteResp' contains responses to a voting request. When 'MsgVoteResp' is
	passed to a candidate, the candidate counts how many votes it has won. If
	it has won a majority (quorum), it becomes leader and calls 'bcastAppend'.
	If the candidate receives rejections from a majority, it reverts back to
	follower.

	'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase election
	protocol. When Config.PreVote is true, a pre-election is carried out first
	(using the same rules as a regular election), and no node increases its term
	number unless the pre-election indicates that the campaigning node would win.
	This minimizes disruption when a partitioned node rejoins the cluster.

	'MsgSnap' requests that a follower install a snapshot. When a node has just
	become a leader or the leader receives a 'MsgProp' message, it calls the
	'bcastAppend' method, which then calls the 'sendAppend' method for each
	follower. In 'sendAppend', if the leader fails to get the term or entries,
	it sends a snapshot instead, in a message of type 'MsgSnap'.

	'MsgSnapStatus' reports the result of a snapshot install message. When a
	follower rejects 'MsgSnap', it indicates that the snapshot request failed
	because of network issues that kept the network layer from sending the
	snapshot to the follower. The leader then sets the follower's progress to
	probe. When 'MsgSnap' is not rejected, it indicates that the snapshot
	succeeded; the leader sets the follower's progress to probe and resumes
	log replication.

	'MsgHeartbeat' sends a heartbeat from the leader. When 'MsgHeartbeat' is
	passed to a candidate and the message's term is higher than the candidate's,
	the candidate reverts back to follower and updates its committed index from
	the one in this heartbeat. It then sends the message to its mailbox. When
	'MsgHeartbeat' is passed to a follower's Step method and the message's term
	is higher than the follower's, the follower updates its leaderID with the ID
	from the message.

	'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
	is passed to the leader's Step method, the leader knows which follower
	responded. Only when the leader's last log index is greater than the
	follower's Match index does the leader run the 'sendAppend' method.

	'MsgUnreachable' reports that a request (message) was not delivered. When
	'MsgUnreachable' is passed to the leader's Step method, the leader learns
	that the follower this 'MsgUnreachable' refers to is not reachable, which
	often indicates that a 'MsgApp' was lost. If the follower's progress state
	is replicate, the leader sets it back to probe.
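
The log comparison a follower applies before granting a 'MsgVote' can be
written as a single predicate. The sketch below is an illustration with
hypothetical parameter names; it covers only the log comparison and leaves out
other conditions, such as whether the follower has already voted in the
current term.

```go
package main

import "fmt"

// isUpToDate reports whether a candidate's log, described by the term and
// index of its last entry, is at least as up to date as the voter's log.
func isUpToDate(candTerm, candIndex, voterTerm, voterIndex uint64) bool {
	return candTerm > voterTerm ||
		(candTerm == voterTerm && candIndex >= voterIndex)
}

func main() {
	// A higher last term wins even with a shorter log.
	fmt.Println(isUpToDate(3, 1, 2, 9))
	// With equal last terms, the candidate needs an index at least as large.
	fmt.Println(isUpToDate(2, 5, 2, 5))
	fmt.Println(isUpToDate(2, 4, 2, 5))
	// A lower last term loses regardless of log length.
	fmt.Println(isUpToDate(1, 9, 2, 1))
}
```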

*/
package raft