blob: 01e23ec98942244737e870f31ccce35d8a237b3e [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package raft
16
import (
	"bytes"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"sort"
	"strings"
	"sync"
	"time"

	"go.etcd.io/etcd/raft/confchange"
	"go.etcd.io/etcd/raft/quorum"
	pb "go.etcd.io/etcd/raft/raftpb"
	"go.etcd.io/etcd/raft/tracker"
)
32
33// None is a placeholder node ID used when there is no leader.
34const None uint64 = 0
35const noLimit = math.MaxUint64
36
37// Possible values for StateType.
38const (
39 StateFollower StateType = iota
40 StateCandidate
41 StateLeader
42 StatePreCandidate
43 numStates
44)
45
// ReadOnlyOption selects how read-only requests are served.
type ReadOnlyOption int

const (
	// ReadOnlySafe guarantees the linearizability of read-only requests by
	// communicating with a quorum. It is the zero value, the default, and the
	// suggested option.
	ReadOnlySafe ReadOnlyOption = 0
	// ReadOnlyLeaseBased ensures linearizability of read-only requests by
	// relying on the leader lease, which makes it sensitive to clock drift:
	// if drift is unbounded (clocks may move backward or pause without any
	// bound), the leader might keep the lease longer than it should, and
	// ReadIndex is not safe in that case.
	ReadOnlyLeaseBased ReadOnlyOption = 1
)
59
60// Possible values for CampaignType
61const (
62 // campaignPreElection represents the first phase of a normal election when
63 // Config.PreVote is true.
64 campaignPreElection CampaignType = "CampaignPreElection"
65 // campaignElection represents a normal (time-based) election (the second phase
66 // of the election when Config.PreVote is true).
67 campaignElection CampaignType = "CampaignElection"
68 // campaignTransfer represents the type of leader transfer
69 campaignTransfer CampaignType = "CampaignTransfer"
70)
71
72// ErrProposalDropped is returned when the proposal is ignored by some cases,
73// so that the proposer can be notified and fail fast.
74var ErrProposalDropped = errors.New("raft proposal dropped")
75
// lockedRand is a small wrapper around rand.Rand that serializes access with a
// mutex so one generator can be shared by multiple raft groups. Only the
// methods the package needs (e.g. Intn) are exposed.
type lockedRand struct {
	mu   sync.Mutex
	rand *rand.Rand
}

// Intn returns a pseudo-random int in [0, n) drawn from the shared generator.
// Like rand.Rand.Intn it panics if n <= 0. The unlock is deferred so that such
// a panic cannot leave the mutex held; a held mutex would deadlock every later
// caller if the panic were recovered.
func (r *lockedRand) Intn(n int) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.rand.Intn(n)
}

// globalRand is the package-wide shared generator, seeded from the current
// wall-clock time.
var globalRand = &lockedRand{
	rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
94
// CampaignType represents the type of campaigning. It is a string rather than
// a uint64 because that is simpler to compare and to fill into raft entries.
type CampaignType string

// StateType represents the role of a node in a cluster.
type StateType uint64

// stmap holds the name of each StateType value, indexed by that value. It must
// list exactly the state constants declared before numStates, in order.
var stmap = [...]string{
	"StateFollower",
	"StateCandidate",
	"StateLeader",
	"StatePreCandidate",
}

// String returns the name of st, e.g. "StateLeader". A value outside the
// known range is rendered as "StateType(N)" rather than panicking with an
// index-out-of-range error.
func (st StateType) String() string {
	if st < StateType(len(stmap)) {
		return stmap[st]
	}
	// Convert to uint64 so formatting never re-enters String.
	return fmt.Sprintf("StateType(%d)", uint64(st))
}
113
114// Config contains the parameters to start a raft.
115type Config struct {
116 // ID is the identity of the local raft. ID cannot be 0.
117 ID uint64
118
119 // peers contains the IDs of all nodes (including self) in the raft cluster. It
120 // should only be set when starting a new raft cluster. Restarting raft from
121 // previous configuration will panic if peers is set. peer is private and only
122 // used for testing right now.
123 peers []uint64
124
125 // learners contains the IDs of all learner nodes (including self if the
126 // local node is a learner) in the raft cluster. learners only receives
127 // entries from the leader node. It does not vote or promote itself.
128 learners []uint64
129
130 // ElectionTick is the number of Node.Tick invocations that must pass between
131 // elections. That is, if a follower does not receive any message from the
132 // leader of current term before ElectionTick has elapsed, it will become
133 // candidate and start an election. ElectionTick must be greater than
134 // HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
135 // unnecessary leader switching.
136 ElectionTick int
137 // HeartbeatTick is the number of Node.Tick invocations that must pass between
138 // heartbeats. That is, a leader sends heartbeat messages to maintain its
139 // leadership every HeartbeatTick ticks.
140 HeartbeatTick int
141
142 // Storage is the storage for raft. raft generates entries and states to be
143 // stored in storage. raft reads the persisted entries and states out of
144 // Storage when it needs. raft reads out the previous state and configuration
145 // out of storage when restarting.
146 Storage Storage
147 // Applied is the last applied index. It should only be set when restarting
148 // raft. raft will not return entries to the application smaller or equal to
149 // Applied. If Applied is unset when restarting, raft might return previous
150 // applied entries. This is a very application dependent configuration.
151 Applied uint64
152
153 // MaxSizePerMsg limits the max byte size of each append message. Smaller
154 // value lowers the raft recovery cost(initial probing and message lost
155 // during normal operation). On the other side, it might affect the
156 // throughput during normal replication. Note: math.MaxUint64 for unlimited,
157 // 0 for at most one entry per message.
158 MaxSizePerMsg uint64
159 // MaxCommittedSizePerReady limits the size of the committed entries which
160 // can be applied.
161 MaxCommittedSizePerReady uint64
162 // MaxUncommittedEntriesSize limits the aggregate byte size of the
163 // uncommitted entries that may be appended to a leader's log. Once this
164 // limit is exceeded, proposals will begin to return ErrProposalDropped
165 // errors. Note: 0 for no limit.
166 MaxUncommittedEntriesSize uint64
167 // MaxInflightMsgs limits the max number of in-flight append messages during
168 // optimistic replication phase. The application transportation layer usually
169 // has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
170 // overflowing that sending buffer. TODO (xiangli): feedback to application to
171 // limit the proposal rate?
172 MaxInflightMsgs int
173
174 // CheckQuorum specifies if the leader should check quorum activity. Leader
175 // steps down when quorum is not active for an electionTimeout.
176 CheckQuorum bool
177
178 // PreVote enables the Pre-Vote algorithm described in raft thesis section
179 // 9.6. This prevents disruption when a node that has been partitioned away
180 // rejoins the cluster.
181 PreVote bool
182
183 // ReadOnlyOption specifies how the read only request is processed.
184 //
185 // ReadOnlySafe guarantees the linearizability of the read only request by
186 // communicating with the quorum. It is the default and suggested option.
187 //
188 // ReadOnlyLeaseBased ensures linearizability of the read only request by
189 // relying on the leader lease. It can be affected by clock drift.
190 // If the clock drift is unbounded, leader might keep the lease longer than it
191 // should (clock can move backward/pause without any bound). ReadIndex is not safe
192 // in that case.
193 // CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
194 ReadOnlyOption ReadOnlyOption
195
196 // Logger is the logger used for raft log. For multinode which can host
197 // multiple raft group, each raft group can have its own logger
198 Logger Logger
199
200 // DisableProposalForwarding set to true means that followers will drop
201 // proposals, rather than forwarding them to the leader. One use case for
202 // this feature would be in a situation where the Raft leader is used to
203 // compute the data of a proposal, for example, adding a timestamp from a
204 // hybrid logical clock to data in a monotonically increasing way. Forwarding
205 // should be disabled to prevent a follower with an inaccurate hybrid
206 // logical clock from assigning the timestamp and then forwarding the data
207 // to the leader.
208 DisableProposalForwarding bool
209}
210
211func (c *Config) validate() error {
212 if c.ID == None {
213 return errors.New("cannot use none as id")
214 }
215
216 if c.HeartbeatTick <= 0 {
217 return errors.New("heartbeat tick must be greater than 0")
218 }
219
220 if c.ElectionTick <= c.HeartbeatTick {
221 return errors.New("election tick must be greater than heartbeat tick")
222 }
223
224 if c.Storage == nil {
225 return errors.New("storage cannot be nil")
226 }
227
228 if c.MaxUncommittedEntriesSize == 0 {
229 c.MaxUncommittedEntriesSize = noLimit
230 }
231
232 // default MaxCommittedSizePerReady to MaxSizePerMsg because they were
233 // previously the same parameter.
234 if c.MaxCommittedSizePerReady == 0 {
235 c.MaxCommittedSizePerReady = c.MaxSizePerMsg
236 }
237
238 if c.MaxInflightMsgs <= 0 {
239 return errors.New("max inflight messages must be greater than 0")
240 }
241
242 if c.Logger == nil {
243 c.Logger = raftLogger
244 }
245
246 if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
247 return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
248 }
249
250 return nil
251}
252
253type raft struct {
254 id uint64
255
256 Term uint64
257 Vote uint64
258
259 readStates []ReadState
260
261 // the log
262 raftLog *raftLog
263
264 maxMsgSize uint64
265 maxUncommittedSize uint64
Abhilash S.L3b494632019-07-16 15:51:09 +0530266 prs tracker.ProgressTracker
William Kurkianea869482019-04-09 15:16:11 -0400267
268 state StateType
269
270 // isLearner is true if the local raft node is a learner.
271 isLearner bool
272
William Kurkianea869482019-04-09 15:16:11 -0400273 msgs []pb.Message
274
275 // the leader id
276 lead uint64
277 // leadTransferee is id of the leader transfer target when its value is not zero.
278 // Follow the procedure defined in raft thesis 3.10.
279 leadTransferee uint64
280 // Only one conf change may be pending (in the log, but not yet
281 // applied) at a time. This is enforced via pendingConfIndex, which
282 // is set to a value >= the log index of the latest pending
283 // configuration change (if any). Config changes are only allowed to
284 // be proposed if the leader's applied index is greater than this
285 // value.
286 pendingConfIndex uint64
287 // an estimate of the size of the uncommitted tail of the Raft log. Used to
288 // prevent unbounded log growth. Only maintained by the leader. Reset on
289 // term changes.
290 uncommittedSize uint64
291
292 readOnly *readOnly
293
294 // number of ticks since it reached last electionTimeout when it is leader
295 // or candidate.
296 // number of ticks since it reached last electionTimeout or received a
297 // valid message from current leader when it is a follower.
298 electionElapsed int
299
300 // number of ticks since it reached last heartbeatTimeout.
301 // only leader keeps heartbeatElapsed.
302 heartbeatElapsed int
303
304 checkQuorum bool
305 preVote bool
306
307 heartbeatTimeout int
308 electionTimeout int
309 // randomizedElectionTimeout is a random number between
310 // [electiontimeout, 2 * electiontimeout - 1]. It gets reset
311 // when raft changes its state to follower or candidate.
312 randomizedElectionTimeout int
313 disableProposalForwarding bool
314
315 tick func()
316 step stepFunc
317
318 logger Logger
319}
320
321func newRaft(c *Config) *raft {
322 if err := c.validate(); err != nil {
323 panic(err.Error())
324 }
325 raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
326 hs, cs, err := c.Storage.InitialState()
327 if err != nil {
328 panic(err) // TODO(bdarnell)
329 }
330 peers := c.peers
331 learners := c.learners
332 if len(cs.Nodes) > 0 || len(cs.Learners) > 0 {
333 if len(peers) > 0 || len(learners) > 0 {
334 // TODO(bdarnell): the peers argument is always nil except in
335 // tests; the argument should be removed and these tests should be
336 // updated to specify their nodes through a snapshot.
337 panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)")
338 }
339 peers = cs.Nodes
340 learners = cs.Learners
341 }
342 r := &raft{
343 id: c.ID,
344 lead: None,
345 isLearner: false,
346 raftLog: raftlog,
347 maxMsgSize: c.MaxSizePerMsg,
William Kurkianea869482019-04-09 15:16:11 -0400348 maxUncommittedSize: c.MaxUncommittedEntriesSize,
Abhilash S.L3b494632019-07-16 15:51:09 +0530349 prs: tracker.MakeProgressTracker(c.MaxInflightMsgs),
William Kurkianea869482019-04-09 15:16:11 -0400350 electionTimeout: c.ElectionTick,
351 heartbeatTimeout: c.HeartbeatTick,
352 logger: c.Logger,
353 checkQuorum: c.CheckQuorum,
354 preVote: c.PreVote,
355 readOnly: newReadOnly(c.ReadOnlyOption),
356 disableProposalForwarding: c.DisableProposalForwarding,
357 }
358 for _, p := range peers {
Abhilash S.L3b494632019-07-16 15:51:09 +0530359 // Add node to active config.
Devmalya Paulfb990a52019-07-09 10:01:49 -0400360 r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p})
William Kurkianea869482019-04-09 15:16:11 -0400361 }
362 for _, p := range learners {
Abhilash S.L3b494632019-07-16 15:51:09 +0530363 // Add learner to active config.
Devmalya Paulfb990a52019-07-09 10:01:49 -0400364 r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p})
William Kurkianea869482019-04-09 15:16:11 -0400365 }
366
367 if !isHardStateEqual(hs, emptyState) {
368 r.loadState(hs)
369 }
370 if c.Applied > 0 {
371 raftlog.appliedTo(c.Applied)
372 }
373 r.becomeFollower(r.Term, None)
374
375 var nodesStrs []string
Abhilash S.L3b494632019-07-16 15:51:09 +0530376 for _, n := range r.prs.VoterNodes() {
William Kurkianea869482019-04-09 15:16:11 -0400377 nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
378 }
379
380 r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
381 r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
382 return r
383}
384
385func (r *raft) hasLeader() bool { return r.lead != None }
386
387func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
388
389func (r *raft) hardState() pb.HardState {
390 return pb.HardState{
391 Term: r.Term,
392 Vote: r.Vote,
393 Commit: r.raftLog.committed,
394 }
395}
396
William Kurkianea869482019-04-09 15:16:11 -0400397// send persists state to stable storage and then sends to its mailbox.
398func (r *raft) send(m pb.Message) {
399 m.From = r.id
400 if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
401 if m.Term == 0 {
402 // All {pre-,}campaign messages need to have the term set when
403 // sending.
404 // - MsgVote: m.Term is the term the node is campaigning for,
405 // non-zero as we increment the term when campaigning.
406 // - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
407 // granted, non-zero for the same reason MsgVote is
408 // - MsgPreVote: m.Term is the term the node will campaign,
409 // non-zero as we use m.Term to indicate the next term we'll be
410 // campaigning for
411 // - MsgPreVoteResp: m.Term is the term received in the original
412 // MsgPreVote if the pre-vote was granted, non-zero for the
413 // same reasons MsgPreVote is
414 panic(fmt.Sprintf("term should be set when sending %s", m.Type))
415 }
416 } else {
417 if m.Term != 0 {
418 panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
419 }
420 // do not attach term to MsgProp, MsgReadIndex
421 // proposals are a way to forward to the leader and
422 // should be treated as local message.
423 // MsgReadIndex is also forwarded to leader.
424 if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
425 m.Term = r.Term
426 }
427 }
428 r.msgs = append(r.msgs, m)
429}
430
William Kurkianea869482019-04-09 15:16:11 -0400431// sendAppend sends an append RPC with new entries (if any) and the
432// current commit index to the given peer.
433func (r *raft) sendAppend(to uint64) {
434 r.maybeSendAppend(to, true)
435}
436
437// maybeSendAppend sends an append RPC with new entries to the given peer,
438// if necessary. Returns true if a message was sent. The sendIfEmpty
439// argument controls whether messages with no entries will be sent
440// ("empty" messages are useful to convey updated Commit indexes, but
441// are undesirable when we're sending multiple messages in a batch).
442func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
Abhilash S.L3b494632019-07-16 15:51:09 +0530443 pr := r.prs.Progress[to]
William Kurkianea869482019-04-09 15:16:11 -0400444 if pr.IsPaused() {
445 return false
446 }
447 m := pb.Message{}
448 m.To = to
449
450 term, errt := r.raftLog.term(pr.Next - 1)
451 ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
452 if len(ents) == 0 && !sendIfEmpty {
453 return false
454 }
455
456 if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
457 if !pr.RecentActive {
458 r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
459 return false
460 }
461
462 m.Type = pb.MsgSnap
463 snapshot, err := r.raftLog.snapshot()
464 if err != nil {
465 if err == ErrSnapshotTemporarilyUnavailable {
466 r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
467 return false
468 }
469 panic(err) // TODO(bdarnell)
470 }
471 if IsEmptySnap(snapshot) {
472 panic("need non-empty snapshot")
473 }
474 m.Snapshot = snapshot
475 sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
476 r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
477 r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
Abhilash S.L3b494632019-07-16 15:51:09 +0530478 pr.BecomeSnapshot(sindex)
William Kurkianea869482019-04-09 15:16:11 -0400479 r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
480 } else {
481 m.Type = pb.MsgApp
482 m.Index = pr.Next - 1
483 m.LogTerm = term
484 m.Entries = ents
485 m.Commit = r.raftLog.committed
486 if n := len(m.Entries); n != 0 {
487 switch pr.State {
Abhilash S.L3b494632019-07-16 15:51:09 +0530488 // optimistically increase the next when in StateReplicate
489 case tracker.StateReplicate:
William Kurkianea869482019-04-09 15:16:11 -0400490 last := m.Entries[n-1].Index
Abhilash S.L3b494632019-07-16 15:51:09 +0530491 pr.OptimisticUpdate(last)
492 pr.Inflights.Add(last)
493 case tracker.StateProbe:
494 pr.ProbeSent = true
William Kurkianea869482019-04-09 15:16:11 -0400495 default:
496 r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
497 }
498 }
499 }
500 r.send(m)
501 return true
502}
503
504// sendHeartbeat sends a heartbeat RPC to the given peer.
505func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
506 // Attach the commit as min(to.matched, r.committed).
507 // When the leader sends out heartbeat message,
508 // the receiver(follower) might not be matched with the leader
509 // or it might not have all the committed entries.
510 // The leader MUST NOT forward the follower's commit to
511 // an unmatched index.
Abhilash S.L3b494632019-07-16 15:51:09 +0530512 commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
William Kurkianea869482019-04-09 15:16:11 -0400513 m := pb.Message{
514 To: to,
515 Type: pb.MsgHeartbeat,
516 Commit: commit,
517 Context: ctx,
518 }
519
520 r.send(m)
521}
522
William Kurkianea869482019-04-09 15:16:11 -0400523// bcastAppend sends RPC, with entries to all peers that are not up-to-date
524// according to the progress recorded in r.prs.
525func (r *raft) bcastAppend() {
Abhilash S.L3b494632019-07-16 15:51:09 +0530526 r.prs.Visit(func(id uint64, _ *tracker.Progress) {
William Kurkianea869482019-04-09 15:16:11 -0400527 if id == r.id {
528 return
529 }
530
531 r.sendAppend(id)
532 })
533}
534
535// bcastHeartbeat sends RPC, without entries to all the peers.
536func (r *raft) bcastHeartbeat() {
537 lastCtx := r.readOnly.lastPendingRequestCtx()
538 if len(lastCtx) == 0 {
539 r.bcastHeartbeatWithCtx(nil)
540 } else {
541 r.bcastHeartbeatWithCtx([]byte(lastCtx))
542 }
543}
544
545func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
Abhilash S.L3b494632019-07-16 15:51:09 +0530546 r.prs.Visit(func(id uint64, _ *tracker.Progress) {
William Kurkianea869482019-04-09 15:16:11 -0400547 if id == r.id {
548 return
549 }
550 r.sendHeartbeat(id, ctx)
551 })
552}
553
554// maybeCommit attempts to advance the commit index. Returns true if
555// the commit index changed (in which case the caller should call
556// r.bcastAppend).
557func (r *raft) maybeCommit() bool {
Abhilash S.L3b494632019-07-16 15:51:09 +0530558 mci := r.prs.Committed()
William Kurkianea869482019-04-09 15:16:11 -0400559 return r.raftLog.maybeCommit(mci, r.Term)
560}
561
562func (r *raft) reset(term uint64) {
563 if r.Term != term {
564 r.Term = term
565 r.Vote = None
566 }
567 r.lead = None
568
569 r.electionElapsed = 0
570 r.heartbeatElapsed = 0
571 r.resetRandomizedElectionTimeout()
572
573 r.abortLeaderTransfer()
574
Abhilash S.L3b494632019-07-16 15:51:09 +0530575 r.prs.ResetVotes()
576 r.prs.Visit(func(id uint64, pr *tracker.Progress) {
577 *pr = tracker.Progress{
578 Match: 0,
579 Next: r.raftLog.lastIndex() + 1,
580 Inflights: tracker.NewInflights(r.prs.MaxInflight),
581 IsLearner: pr.IsLearner,
582 }
William Kurkianea869482019-04-09 15:16:11 -0400583 if id == r.id {
584 pr.Match = r.raftLog.lastIndex()
585 }
586 })
587
588 r.pendingConfIndex = 0
589 r.uncommittedSize = 0
590 r.readOnly = newReadOnly(r.readOnly.option)
591}
592
593func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
594 li := r.raftLog.lastIndex()
595 for i := range es {
596 es[i].Term = r.Term
597 es[i].Index = li + 1 + uint64(i)
598 }
599 // Track the size of this uncommitted proposal.
600 if !r.increaseUncommittedSize(es) {
601 r.logger.Debugf(
602 "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
603 r.id,
604 )
605 // Drop the proposal.
606 return false
607 }
608 // use latest "last" index after truncate/append
609 li = r.raftLog.append(es...)
Abhilash S.L3b494632019-07-16 15:51:09 +0530610 r.prs.Progress[r.id].MaybeUpdate(li)
William Kurkianea869482019-04-09 15:16:11 -0400611 // Regardless of maybeCommit's return, our caller will call bcastAppend.
612 r.maybeCommit()
613 return true
614}
615
616// tickElection is run by followers and candidates after r.electionTimeout.
617func (r *raft) tickElection() {
618 r.electionElapsed++
619
620 if r.promotable() && r.pastElectionTimeout() {
621 r.electionElapsed = 0
622 r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
623 }
624}
625
626// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
627func (r *raft) tickHeartbeat() {
628 r.heartbeatElapsed++
629 r.electionElapsed++
630
631 if r.electionElapsed >= r.electionTimeout {
632 r.electionElapsed = 0
633 if r.checkQuorum {
634 r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
635 }
636 // If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
637 if r.state == StateLeader && r.leadTransferee != None {
638 r.abortLeaderTransfer()
639 }
640 }
641
642 if r.state != StateLeader {
643 return
644 }
645
646 if r.heartbeatElapsed >= r.heartbeatTimeout {
647 r.heartbeatElapsed = 0
648 r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
649 }
650}
651
652func (r *raft) becomeFollower(term uint64, lead uint64) {
653 r.step = stepFollower
654 r.reset(term)
655 r.tick = r.tickElection
656 r.lead = lead
657 r.state = StateFollower
658 r.logger.Infof("%x became follower at term %d", r.id, r.Term)
659}
660
661func (r *raft) becomeCandidate() {
662 // TODO(xiangli) remove the panic when the raft implementation is stable
663 if r.state == StateLeader {
664 panic("invalid transition [leader -> candidate]")
665 }
666 r.step = stepCandidate
667 r.reset(r.Term + 1)
668 r.tick = r.tickElection
669 r.Vote = r.id
670 r.state = StateCandidate
671 r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
672}
673
674func (r *raft) becomePreCandidate() {
675 // TODO(xiangli) remove the panic when the raft implementation is stable
676 if r.state == StateLeader {
677 panic("invalid transition [leader -> pre-candidate]")
678 }
679 // Becoming a pre-candidate changes our step functions and state,
680 // but doesn't change anything else. In particular it does not increase
681 // r.Term or change r.Vote.
682 r.step = stepCandidate
Abhilash S.L3b494632019-07-16 15:51:09 +0530683 r.prs.ResetVotes()
William Kurkianea869482019-04-09 15:16:11 -0400684 r.tick = r.tickElection
685 r.lead = None
686 r.state = StatePreCandidate
687 r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
688}
689
690func (r *raft) becomeLeader() {
691 // TODO(xiangli) remove the panic when the raft implementation is stable
692 if r.state == StateFollower {
693 panic("invalid transition [follower -> leader]")
694 }
695 r.step = stepLeader
696 r.reset(r.Term)
697 r.tick = r.tickHeartbeat
698 r.lead = r.id
699 r.state = StateLeader
700 // Followers enter replicate mode when they've been successfully probed
701 // (perhaps after having received a snapshot as a result). The leader is
702 // trivially in this state. Note that r.reset() has initialized this
703 // progress with the last index already.
Abhilash S.L3b494632019-07-16 15:51:09 +0530704 r.prs.Progress[r.id].BecomeReplicate()
William Kurkianea869482019-04-09 15:16:11 -0400705
706 // Conservatively set the pendingConfIndex to the last index in the
707 // log. There may or may not be a pending config change, but it's
708 // safe to delay any future proposals until we commit all our
709 // pending log entries, and scanning the entire tail of the log
710 // could be expensive.
711 r.pendingConfIndex = r.raftLog.lastIndex()
712
713 emptyEnt := pb.Entry{Data: nil}
714 if !r.appendEntry(emptyEnt) {
715 // This won't happen because we just called reset() above.
716 r.logger.Panic("empty entry was dropped")
717 }
718 // As a special case, don't count the initial empty entry towards the
719 // uncommitted log quota. This is because we want to preserve the
720 // behavior of allowing one entry larger than quota if the current
721 // usage is zero.
722 r.reduceUncommittedSize([]pb.Entry{emptyEnt})
723 r.logger.Infof("%x became leader at term %d", r.id, r.Term)
724}
725
Abhilash S.L3b494632019-07-16 15:51:09 +0530726// campaign transitions the raft instance to candidate state. This must only be
727// called after verifying that this is a legitimate transition.
William Kurkianea869482019-04-09 15:16:11 -0400728func (r *raft) campaign(t CampaignType) {
Abhilash S.L3b494632019-07-16 15:51:09 +0530729 if !r.promotable() {
730 // This path should not be hit (callers are supposed to check), but
731 // better safe than sorry.
732 r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
733 }
William Kurkianea869482019-04-09 15:16:11 -0400734 var term uint64
735 var voteMsg pb.MessageType
736 if t == campaignPreElection {
737 r.becomePreCandidate()
738 voteMsg = pb.MsgPreVote
739 // PreVote RPCs are sent for the next term before we've incremented r.Term.
740 term = r.Term + 1
741 } else {
742 r.becomeCandidate()
743 voteMsg = pb.MsgVote
744 term = r.Term
745 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530746 if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
William Kurkianea869482019-04-09 15:16:11 -0400747 // We won the election after voting for ourselves (which must mean that
748 // this is a single-node cluster). Advance to the next state.
749 if t == campaignPreElection {
750 r.campaign(campaignElection)
751 } else {
752 r.becomeLeader()
753 }
754 return
755 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530756 for id := range r.prs.Voters.IDs() {
William Kurkianea869482019-04-09 15:16:11 -0400757 if id == r.id {
758 continue
759 }
760 r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
761 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
762
763 var ctx []byte
764 if t == campaignTransfer {
765 ctx = []byte(t)
766 }
767 r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
768 }
769}
770
Abhilash S.L3b494632019-07-16 15:51:09 +0530771func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
William Kurkianea869482019-04-09 15:16:11 -0400772 if v {
773 r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
774 } else {
775 r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
776 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530777 r.prs.RecordVote(id, v)
778 return r.prs.TallyVotes()
William Kurkianea869482019-04-09 15:16:11 -0400779}
780
781func (r *raft) Step(m pb.Message) error {
782 // Handle the message term, which may result in our stepping down to a follower.
783 switch {
784 case m.Term == 0:
785 // local message
786 case m.Term > r.Term:
787 if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
788 force := bytes.Equal(m.Context, []byte(campaignTransfer))
789 inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
790 if !force && inLease {
791 // If a server receives a RequestVote request within the minimum election timeout
792 // of hearing from a current leader, it does not update its term or grant its vote
793 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
794 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
795 return nil
796 }
797 }
798 switch {
799 case m.Type == pb.MsgPreVote:
800 // Never change our term in response to a PreVote
801 case m.Type == pb.MsgPreVoteResp && !m.Reject:
802 // We send pre-vote requests with a term in our future. If the
803 // pre-vote is granted, we will increment our term when we get a
804 // quorum. If it is not, the term comes from the node that
805 // rejected our vote so we should become a follower at the new
806 // term.
807 default:
808 r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
809 r.id, r.Term, m.Type, m.From, m.Term)
810 if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
811 r.becomeFollower(m.Term, m.From)
812 } else {
813 r.becomeFollower(m.Term, None)
814 }
815 }
816
817 case m.Term < r.Term:
818 if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
819 // We have received messages from a leader at a lower term. It is possible
820 // that these messages were simply delayed in the network, but this could
821 // also mean that this node has advanced its term number during a network
822 // partition, and it is now unable to either win an election or to rejoin
823 // the majority on the old term. If checkQuorum is false, this will be
824 // handled by incrementing term numbers in response to MsgVote with a
825 // higher term, but if checkQuorum is true we may not advance the term on
826 // MsgVote and must generate other messages to advance the term. The net
827 // result of these two features is to minimize the disruption caused by
828 // nodes that have been removed from the cluster's configuration: a
829 // removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
830 // but it will not receive MsgApp or MsgHeartbeat, so it will not create
831 // disruptive term increases, by notifying leader of this node's activeness.
832 // The above comments also true for Pre-Vote
833 //
834 // When follower gets isolated, it soon starts an election ending
835 // up with a higher term than leader, although it won't receive enough
836 // votes to win the election. When it regains connectivity, this response
837 // with "pb.MsgAppResp" of higher term would force leader to step down.
838 // However, this disruption is inevitable to free this stuck node with
839 // fresh election. This can be prevented with Pre-Vote phase.
840 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
841 } else if m.Type == pb.MsgPreVote {
842 // Before Pre-Vote enable, there may have candidate with higher term,
843 // but less log. After update to Pre-Vote, the cluster may deadlock if
844 // we drop messages with a lower term.
845 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
846 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
847 r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
848 } else {
849 // ignore other cases
850 r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
851 r.id, r.Term, m.Type, m.From, m.Term)
852 }
853 return nil
854 }
855
856 switch m.Type {
857 case pb.MsgHup:
858 if r.state != StateLeader {
Abhilash S.L3b494632019-07-16 15:51:09 +0530859 if !r.promotable() {
860 r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
861 return nil
862 }
William Kurkianea869482019-04-09 15:16:11 -0400863 ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
864 if err != nil {
865 r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
866 }
867 if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
868 r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
869 return nil
870 }
871
872 r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
873 if r.preVote {
874 r.campaign(campaignPreElection)
875 } else {
876 r.campaign(campaignElection)
877 }
878 } else {
879 r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
880 }
881
882 case pb.MsgVote, pb.MsgPreVote:
883 if r.isLearner {
884 // TODO: learner may need to vote, in case of node down when confchange.
885 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
886 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
887 return nil
888 }
889 // We can vote if this is a repeat of a vote we've already cast...
890 canVote := r.Vote == m.From ||
891 // ...we haven't voted and we don't think there's a leader yet in this term...
892 (r.Vote == None && r.lead == None) ||
893 // ...or this is a PreVote for a future term...
894 (m.Type == pb.MsgPreVote && m.Term > r.Term)
895 // ...and we believe the candidate is up to date.
896 if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
897 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
898 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
899 // When responding to Msg{Pre,}Vote messages we include the term
900 // from the message, not the local term. To see why, consider the
901 // case where a single node was previously partitioned away and
902 // it's local term is now out of date. If we include the local term
903 // (recall that for pre-votes we don't update the local term), the
904 // (pre-)campaigning node on the other end will proceed to ignore
905 // the message (it ignores all out of date messages).
906 // The term in the original message and current local term are the
907 // same in the case of regular votes, but different for pre-votes.
908 r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
909 if m.Type == pb.MsgVote {
910 // Only record real votes.
911 r.electionElapsed = 0
912 r.Vote = m.From
913 }
914 } else {
915 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
916 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
917 r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
918 }
919
920 default:
921 err := r.step(r, m)
922 if err != nil {
923 return err
924 }
925 }
926 return nil
927}
928
// stepFunc is the signature of the role-specific message handlers
// (stepLeader, stepCandidate, stepFollower). Step hands every message it does
// not handle itself to r.step(r, m) and returns the error it produces.
type stepFunc func(r *raft, m pb.Message) error
930
// stepLeader handles a message on behalf of a node in StateLeader. Step's
// default case dispatches here through r.step.
//
// MsgBeat, MsgCheckQuorum, MsgProp and MsgReadIndex are handled without
// looking up a Progress for m.From. Every other message type requires a
// Progress entry for m.From and is dropped (with a debug log) if none exists.
//
// It returns ErrProposalDropped when a MsgProp cannot be accepted, and nil in
// all other cases.
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgBeat:
		r.bcastHeartbeat()
		return nil
	case pb.MsgCheckQuorum:
		// The leader should always see itself as active. As a precaution, handle
		// the case in which the leader isn't in the configuration any more (for
		// example if it just removed itself).
		//
		// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
		// leader steps down when removing itself. I might be missing something.
		if pr := r.prs.Progress[r.id]; pr != nil {
			pr.RecentActive = true
		}
		if !r.prs.QuorumActive() {
			r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
			r.becomeFollower(r.Term, None)
		}
		// Mark everyone (but ourselves) as inactive in preparation for the next
		// CheckQuorum.
		r.prs.Visit(func(id uint64, pr *tracker.Progress) {
			if id != r.id {
				pr.RecentActive = false
			}
		})
		return nil
	case pb.MsgProp:
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
		if r.prs.Progress[r.id] == nil {
			// If we are not currently a member of the range (i.e. this node
			// was removed from the configuration while serving as leader),
			// drop any new proposals.
			return ErrProposalDropped
		}
		if r.leadTransferee != None {
			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
			return ErrProposalDropped
		}

		// At most one configuration change may be pending, i.e. proposed but
		// not yet applied. A conf change proposed while r.pendingConfIndex is
		// still above the applied index is replaced by an empty normal entry.
		// Its slot in the batch is kept, but it has no effect. Otherwise
		// r.pendingConfIndex records the index the entry is expected to
		// occupy once appended below (lastIndex + position in batch + 1).
		//
		// NB: this rewrites m.Entries in place. The slice shares its backing
		// array with the caller's message.
		for i := range m.Entries {
			e := &m.Entries[i]
			if e.Type == pb.EntryConfChange {
				if r.pendingConfIndex > r.raftLog.applied {
					r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
						e, r.pendingConfIndex, r.raftLog.applied)
					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
				} else {
					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
				}
			}
		}

		if !r.appendEntry(m.Entries...) {
			return ErrProposalDropped
		}
		r.bcastAppend()
		return nil
	case pb.MsgReadIndex:
		// m.Entries[0].Data is used as the read request's context throughout
		// this case. It is indexed without a length check, so callers must
		// supply at least one entry.
		//
		// If more than the local vote is needed, go through a full broadcast,
		// otherwise optimize.
		if !r.prs.IsSingleton() {
			if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
				// Reject read only request when this leader has not committed any log entry at its term.
				// NB: the request is dropped silently; no response is sent.
				return nil
			}

			// thinking: use an internally defined context instead of the user given context.
			// We can express this in terms of the term and index instead of a user-supplied value.
			// This would allow multiple reads to piggyback on the same message.
			switch r.readOnly.option {
			case ReadOnlySafe:
				// Queue the request at the current commit index and confirm
				// leadership with a heartbeat round carrying the request
				// context. The request is released from MsgHeartbeatResp
				// handling once a quorum of voters has acked.
				r.readOnly.addRequest(r.raftLog.committed, m)
				// The local node automatically acks the request.
				r.readOnly.recvAck(r.id, m.Entries[0].Data)
				r.bcastHeartbeatWithCtx(m.Entries[0].Data)
			case ReadOnlyLeaseBased:
				// Answer immediately at the current commit index, relying on
				// the leader lease rather than a heartbeat round.
				ri := r.raftLog.committed
				if m.From == None || m.From == r.id { // from local member
					r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
				} else {
					r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
				}
			}
		} else { // only one voting member (the leader) in the cluster
			if m.From == None || m.From == r.id { // from leader itself
				r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
			} else { // from learner member
				r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
			}
		}

		return nil
	}

	// All other message types require a progress for m.From (pr).
	pr := r.prs.Progress[m.From]
	if pr == nil {
		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
		return nil
	}
	switch m.Type {
	case pb.MsgAppResp:
		pr.RecentActive = true

		if m.Reject {
			// The follower could not append at m.Index. Back off (using its
			// reported last index as a hint) and, if the progress actually
			// moved backwards, retry immediately.
			r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
				r.id, m.RejectHint, m.From, m.Index)
			if pr.MaybeDecrTo(m.Index, m.RejectHint) {
				r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
				if pr.State == tracker.StateReplicate {
					pr.BecomeProbe()
				}
				r.sendAppend(m.From)
			}
		} else {
			// Captured before MaybeUpdate so we know whether the follower
			// might have missed commit-index updates while paused.
			oldPaused := pr.IsPaused()
			if pr.MaybeUpdate(m.Index) {
				switch {
				case pr.State == tracker.StateProbe:
					pr.BecomeReplicate()
				case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
					r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
					// Transition back to replicating state via probing state
					// (which takes the snapshot into account). If we didn't
					// move to replicating state, that would only happen with
					// the next round of appends (but there may not be a next
					// round for a while, exposing an inconsistent RaftStatus).
					pr.BecomeProbe()
					pr.BecomeReplicate()
				case pr.State == tracker.StateReplicate:
					pr.Inflights.FreeLE(m.Index)
				}

				if r.maybeCommit() {
					r.bcastAppend()
				} else if oldPaused {
					// If we were paused before, this node may be missing the
					// latest commit index, so send it.
					r.sendAppend(m.From)
				}
				// We've updated flow control information above, which may
				// allow us to send multiple (size-limited) in-flight messages
				// at once (such as when transitioning from probe to
				// replicate, or when Inflights.FreeLE() covers multiple
				// messages). If we have more entries to send, send as many
				// messages as we can (without sending empty messages for the
				// commit index).
				for r.maybeSendAppend(m.From, false) {
				}
				// Transfer leadership is in progress.
				if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
					r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
					r.sendTimeoutNow(m.From)
				}
			}
		}
	case pb.MsgHeartbeatResp:
		pr.RecentActive = true
		pr.ProbeSent = false

		// free one slot for the full inflights window to allow progress.
		if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
			pr.Inflights.FreeFirstOne()
		}
		if pr.Match < r.raftLog.lastIndex() {
			r.sendAppend(m.From)
		}

		// The rest only applies to heartbeats sent for ReadOnlySafe read
		// requests, which carry the request context in m.Context.
		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
			return nil
		}

		// Record the ack from m.From. Nothing can be released until the
		// voters have reached a quorum on it.
		if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
			return nil
		}

		// readOnly.advance returns the read requests that can now be served.
		// Presumably these are the ones queued up to and including this
		// context; see readOnly.advance. Local requests become ReadStates,
		// while forwarded ones are answered with MsgReadIndexResp.
		rss := r.readOnly.advance(m)
		for _, rs := range rss {
			req := rs.req
			if req.From == None || req.From == r.id { // from local member
				r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
			} else {
				r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
			}
		}
	case pb.MsgSnapStatus:
		if pr.State != tracker.StateSnapshot {
			return nil
		}
		// TODO(tbg): this code is very similar to the snapshot handling in
		// MsgAppResp above. In fact, the code there is more correct than the
		// code here and should likely be updated to match (or even better, the
		// logic pulled into a newly created Progress state machine handler).
		if !m.Reject {
			pr.BecomeProbe()
			r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
		} else {
			// NB: the order here matters or we'll be probing erroneously from
			// the snapshot index, but the snapshot never applied.
			pr.PendingSnapshot = 0
			pr.BecomeProbe()
			r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
		}
		// If snapshot finish, wait for the msgAppResp from the remote node before sending
		// out the next msgApp.
		// If snapshot failure, wait for a heartbeat interval before next try
		pr.ProbeSent = true
	case pb.MsgUnreachable:
		// During optimistic replication, if the remote becomes unreachable,
		// there is huge probability that a MsgApp is lost.
		if pr.State == tracker.StateReplicate {
			pr.BecomeProbe()
		}
		r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
	case pb.MsgTransferLeader:
		// m.From names the node that should become the new leader.
		if pr.IsLearner {
			r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
			return nil
		}
		leadTransferee := m.From
		lastLeadTransferee := r.leadTransferee
		if lastLeadTransferee != None {
			if lastLeadTransferee == leadTransferee {
				r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
					r.id, r.Term, leadTransferee, leadTransferee)
				return nil
			}
			r.abortLeaderTransfer()
			r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
		}
		if leadTransferee == r.id {
			r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
			return nil
		}
		// Transfer leadership to third party.
		r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
		// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
		r.electionElapsed = 0
		r.leadTransferee = leadTransferee
		if pr.Match == r.raftLog.lastIndex() {
			r.sendTimeoutNow(leadTransferee)
			r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
		} else {
			// Catch the transferee up first. MsgAppResp handling sends
			// MsgTimeoutNow once its Match reaches our last index.
			r.sendAppend(leadTransferee)
		}
	}
	return nil
}
1182
1183// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
1184// whether they respond to MsgVoteResp or MsgPreVoteResp.
1185func stepCandidate(r *raft, m pb.Message) error {
1186 // Only handle vote responses corresponding to our candidacy (while in
1187 // StateCandidate, we may get stale MsgPreVoteResp messages in this term from
1188 // our pre-candidate state).
1189 var myVoteRespType pb.MessageType
1190 if r.state == StatePreCandidate {
1191 myVoteRespType = pb.MsgPreVoteResp
1192 } else {
1193 myVoteRespType = pb.MsgVoteResp
1194 }
1195 switch m.Type {
1196 case pb.MsgProp:
1197 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
1198 return ErrProposalDropped
1199 case pb.MsgApp:
1200 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1201 r.handleAppendEntries(m)
1202 case pb.MsgHeartbeat:
1203 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1204 r.handleHeartbeat(m)
1205 case pb.MsgSnap:
1206 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1207 r.handleSnapshot(m)
1208 case myVoteRespType:
Abhilash S.L3b494632019-07-16 15:51:09 +05301209 gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
1210 r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
1211 switch res {
1212 case quorum.VoteWon:
William Kurkianea869482019-04-09 15:16:11 -04001213 if r.state == StatePreCandidate {
1214 r.campaign(campaignElection)
1215 } else {
1216 r.becomeLeader()
1217 r.bcastAppend()
1218 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301219 case quorum.VoteLost:
William Kurkianea869482019-04-09 15:16:11 -04001220 // pb.MsgPreVoteResp contains future term of pre-candidate
1221 // m.Term > r.Term; reuse r.Term
1222 r.becomeFollower(r.Term, None)
1223 }
1224 case pb.MsgTimeoutNow:
1225 r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
1226 }
1227 return nil
1228}
1229
1230func stepFollower(r *raft, m pb.Message) error {
1231 switch m.Type {
1232 case pb.MsgProp:
1233 if r.lead == None {
1234 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
1235 return ErrProposalDropped
1236 } else if r.disableProposalForwarding {
1237 r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
1238 return ErrProposalDropped
1239 }
1240 m.To = r.lead
1241 r.send(m)
1242 case pb.MsgApp:
1243 r.electionElapsed = 0
1244 r.lead = m.From
1245 r.handleAppendEntries(m)
1246 case pb.MsgHeartbeat:
1247 r.electionElapsed = 0
1248 r.lead = m.From
1249 r.handleHeartbeat(m)
1250 case pb.MsgSnap:
1251 r.electionElapsed = 0
1252 r.lead = m.From
1253 r.handleSnapshot(m)
1254 case pb.MsgTransferLeader:
1255 if r.lead == None {
1256 r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
1257 return nil
1258 }
1259 m.To = r.lead
1260 r.send(m)
1261 case pb.MsgTimeoutNow:
1262 if r.promotable() {
1263 r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
1264 // Leadership transfers never use pre-vote even if r.preVote is true; we
1265 // know we are not recovering from a partition so there is no need for the
1266 // extra round trip.
1267 r.campaign(campaignTransfer)
1268 } else {
1269 r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
1270 }
1271 case pb.MsgReadIndex:
1272 if r.lead == None {
1273 r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
1274 return nil
1275 }
1276 m.To = r.lead
1277 r.send(m)
1278 case pb.MsgReadIndexResp:
1279 if len(m.Entries) != 1 {
1280 r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
1281 return nil
1282 }
1283 r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
1284 }
1285 return nil
1286}
1287
1288func (r *raft) handleAppendEntries(m pb.Message) {
1289 if m.Index < r.raftLog.committed {
1290 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
1291 return
1292 }
1293
1294 if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
1295 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
1296 } else {
1297 r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
1298 r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
1299 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
1300 }
1301}
1302
1303func (r *raft) handleHeartbeat(m pb.Message) {
1304 r.raftLog.commitTo(m.Commit)
1305 r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
1306}
1307
1308func (r *raft) handleSnapshot(m pb.Message) {
1309 sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
1310 if r.restore(m.Snapshot) {
1311 r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
1312 r.id, r.raftLog.committed, sindex, sterm)
1313 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
1314 } else {
1315 r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
1316 r.id, r.raftLog.committed, sindex, sterm)
1317 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
1318 }
1319}
1320
1321// restore recovers the state machine from a snapshot. It restores the log and the
Abhilash S.L3b494632019-07-16 15:51:09 +05301322// configuration of state machine. If this method returns false, the snapshot was
1323// ignored, either because it was obsolete or because of an error.
William Kurkianea869482019-04-09 15:16:11 -04001324func (r *raft) restore(s pb.Snapshot) bool {
1325 if s.Metadata.Index <= r.raftLog.committed {
1326 return false
1327 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301328 if r.state != StateFollower {
1329 // This is defense-in-depth: if the leader somehow ended up applying a
1330 // snapshot, it could move into a new term without moving into a
1331 // follower state. This should never fire, but if it did, we'd have
1332 // prevented damage by returning early, so log only a loud warning.
1333 //
1334 // At the time of writing, the instance is guaranteed to be in follower
1335 // state when this method is called.
1336 r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
1337 r.becomeFollower(r.Term+1, None)
1338 return false
1339 }
1340
1341 // More defense-in-depth: throw away snapshot if recipient is not in the
1342 // config. This shouuldn't ever happen (at the time of writing) but lots of
1343 // code here and there assumes that r.id is in the progress tracker.
1344 found := false
1345 cs := s.Metadata.ConfState
1346 for _, set := range [][]uint64{
1347 cs.Nodes,
1348 cs.Learners,
1349 } {
1350 for _, id := range set {
1351 if id == r.id {
1352 found = true
1353 break
1354 }
1355 }
1356 }
1357 if !found {
1358 r.logger.Warningf(
1359 "%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
1360 r.id, cs,
1361 )
1362 return false
1363 }
1364
1365 // Now go ahead and actually restore.
1366
William Kurkianea869482019-04-09 15:16:11 -04001367 if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
1368 r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
1369 r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
1370 r.raftLog.commitTo(s.Metadata.Index)
1371 return false
1372 }
1373
William Kurkianea869482019-04-09 15:16:11 -04001374 r.raftLog.restore(s)
William Kurkianea869482019-04-09 15:16:11 -04001375
Abhilash S.L3b494632019-07-16 15:51:09 +05301376 // Reset the configuration and add the (potentially updated) peers in anew.
1377 r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
1378 for _, id := range s.Metadata.ConfState.Nodes {
1379 r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode})
William Kurkianea869482019-04-09 15:16:11 -04001380 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301381 for _, id := range s.Metadata.ConfState.Learners {
1382 r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode})
1383 }
1384
1385 pr := r.prs.Progress[r.id]
1386 pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
1387
1388 r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
1389 r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
1390 return true
William Kurkianea869482019-04-09 15:16:11 -04001391}
1392
1393// promotable indicates whether state machine can be promoted to leader,
1394// which is true when its own id is in progress list.
1395func (r *raft) promotable() bool {
Abhilash S.L3b494632019-07-16 15:51:09 +05301396 pr := r.prs.Progress[r.id]
1397 return pr != nil && !pr.IsLearner
William Kurkianea869482019-04-09 15:16:11 -04001398}
1399
Abhilash S.L3b494632019-07-16 15:51:09 +05301400func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
Devmalya Paulfb990a52019-07-09 10:01:49 -04001401 cfg, prs, err := confchange.Changer{
1402 Tracker: r.prs,
1403 LastIndex: r.raftLog.lastIndex(),
1404 }.Simple(cc)
1405 if err != nil {
1406 panic(err)
Abhilash S.L3b494632019-07-16 15:51:09 +05301407 }
Devmalya Paulfb990a52019-07-09 10:01:49 -04001408 r.prs.Config = cfg
1409 r.prs.Progress = prs
William Kurkianea869482019-04-09 15:16:11 -04001410
Abhilash S.L3b494632019-07-16 15:51:09 +05301411 r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
1412 // Now that the configuration is updated, handle any side effects.
1413
1414 cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()}
1415 pr, ok := r.prs.Progress[r.id]
1416
1417 // Update whether the node itself is a learner, resetting to false when the
1418 // node is removed.
1419 r.isLearner = ok && pr.IsLearner
1420
1421 if (!ok || r.isLearner) && r.state == StateLeader {
1422 // This node is leader and was removed or demoted. We prevent demotions
1423 // at the time writing but hypothetically we handle them the same way as
1424 // removing the leader: stepping down into the next Term.
1425 //
1426 // TODO(tbg): step down (for sanity) and ask follower with largest Match
1427 // to TimeoutNow (to avoid interruption). This might still drop some
1428 // proposals but it's better than nothing.
1429 //
1430 // TODO(tbg): test this branch. It is untested at the time of writing.
1431 return cs
William Kurkianea869482019-04-09 15:16:11 -04001432 }
1433
Abhilash S.L3b494632019-07-16 15:51:09 +05301434 // The remaining steps only make sense if this node is the leader and there
1435 // are other nodes.
1436 if r.state != StateLeader || len(cs.Nodes) == 0 {
1437 return cs
William Kurkianea869482019-04-09 15:16:11 -04001438 }
Devmalya Paulfb990a52019-07-09 10:01:49 -04001439 if r.maybeCommit() {
Abhilash S.L3b494632019-07-16 15:51:09 +05301440 // The quorum size may have been reduced (but not to zero), so see if
1441 // any pending entries can be committed.
Devmalya Paulfb990a52019-07-09 10:01:49 -04001442 r.bcastAppend()
William Kurkianea869482019-04-09 15:16:11 -04001443 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301444 // If the the leadTransferee was removed, abort the leadership transfer.
1445 if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
William Kurkianea869482019-04-09 15:16:11 -04001446 r.abortLeaderTransfer()
1447 }
William Kurkianea869482019-04-09 15:16:11 -04001448
Abhilash S.L3b494632019-07-16 15:51:09 +05301449 return cs
William Kurkianea869482019-04-09 15:16:11 -04001450}
1451
1452func (r *raft) loadState(state pb.HardState) {
1453 if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
1454 r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
1455 }
1456 r.raftLog.committed = state.Commit
1457 r.Term = state.Term
1458 r.Vote = state.Vote
1459}
1460
1461// pastElectionTimeout returns true iff r.electionElapsed is greater
1462// than or equal to the randomized election timeout in
1463// [electiontimeout, 2 * electiontimeout - 1].
1464func (r *raft) pastElectionTimeout() bool {
1465 return r.electionElapsed >= r.randomizedElectionTimeout
1466}
1467
1468func (r *raft) resetRandomizedElectionTimeout() {
1469 r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
1470}
1471
William Kurkianea869482019-04-09 15:16:11 -04001472func (r *raft) sendTimeoutNow(to uint64) {
1473 r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
1474}
1475
// abortLeaderTransfer cancels any in-progress leadership transfer by clearing
// r.leadTransferee. Nothing is sent to the former transferee.
func (r *raft) abortLeaderTransfer() {
	r.leadTransferee = None
}
1479
1480// increaseUncommittedSize computes the size of the proposed entries and
1481// determines whether they would push leader over its maxUncommittedSize limit.
1482// If the new entries would exceed the limit, the method returns false. If not,
1483// the increase in uncommitted entry size is recorded and the method returns
1484// true.
1485func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
1486 var s uint64
1487 for _, e := range ents {
1488 s += uint64(PayloadSize(e))
1489 }
1490
1491 if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
1492 // If the uncommitted tail of the Raft log is empty, allow any size
1493 // proposal. Otherwise, limit the size of the uncommitted tail of the
1494 // log and drop any proposal that would push the size over the limit.
1495 return false
1496 }
1497 r.uncommittedSize += s
1498 return true
1499}
1500
1501// reduceUncommittedSize accounts for the newly committed entries by decreasing
1502// the uncommitted entry size limit.
1503func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
1504 if r.uncommittedSize == 0 {
1505 // Fast-path for followers, who do not track or enforce the limit.
1506 return
1507 }
1508
1509 var s uint64
1510 for _, e := range ents {
1511 s += uint64(PayloadSize(e))
1512 }
1513 if s > r.uncommittedSize {
1514 // uncommittedSize may underestimate the size of the uncommitted Raft
1515 // log tail but will never overestimate it. Saturate at 0 instead of
1516 // allowing overflow.
1517 r.uncommittedSize = 0
1518 } else {
1519 r.uncommittedSize -= s
1520 }
1521}
1522
1523func numOfPendingConf(ents []pb.Entry) int {
1524 n := 0
1525 for i := range ents {
1526 if ents[i].Type == pb.EntryConfChange {
1527 n++
1528 }
1529 }
1530 return n
1531}