// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package raft
16
17import (
18 "bytes"
19 "errors"
20 "fmt"
21 "math"
22 "math/rand"
23 "sort"
24 "strings"
25 "sync"
26 "time"
27
28 "go.etcd.io/etcd/raft/confchange"
29 "go.etcd.io/etcd/raft/quorum"
30 pb "go.etcd.io/etcd/raft/raftpb"
31 "go.etcd.io/etcd/raft/tracker"
32)
33
34// None is a placeholder node ID used when there is no leader.
35const None uint64 = 0
36const noLimit = math.MaxUint64
37
38// Possible values for StateType.
39const (
40 StateFollower StateType = iota
41 StateCandidate
42 StateLeader
43 StatePreCandidate
44 numStates
45)
46
47type ReadOnlyOption int
48
49const (
50 // ReadOnlySafe guarantees the linearizability of the read only request by
51 // communicating with the quorum. It is the default and suggested option.
52 ReadOnlySafe ReadOnlyOption = iota
53 // ReadOnlyLeaseBased ensures linearizability of the read only request by
54 // relying on the leader lease. It can be affected by clock drift.
55 // If the clock drift is unbounded, leader might keep the lease longer than it
56 // should (clock can move backward/pause without any bound). ReadIndex is not safe
57 // in that case.
58 ReadOnlyLeaseBased
59)
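
// For illustration only: a Config that selects ReadOnlyLeaseBased must also
// enable CheckQuorum, e.g.
//
//	Config{ReadOnlyOption: ReadOnlyLeaseBased, CheckQuorum: true}
//
// otherwise Config.validate rejects the configuration.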
60
61// Possible values for CampaignType
62const (
63 // campaignPreElection represents the first phase of a normal election when
64 // Config.PreVote is true.
65 campaignPreElection CampaignType = "CampaignPreElection"
66 // campaignElection represents a normal (time-based) election (the second phase
67 // of the election when Config.PreVote is true).
68 campaignElection CampaignType = "CampaignElection"
69 // campaignTransfer represents the type of leader transfer
70 campaignTransfer CampaignType = "CampaignTransfer"
71)
72
// ErrProposalDropped is returned when a proposal is ignored in certain cases
// (for example, when there is no leader or a leadership transfer is in
// progress), so that the proposer can be notified and fail fast.
75var ErrProposalDropped = errors.New("raft proposal dropped")
76
77// lockedRand is a small wrapper around rand.Rand to provide
78// synchronization among multiple raft groups. Only the methods needed
79// by the code are exposed (e.g. Intn).
80type lockedRand struct {
81 mu sync.Mutex
82 rand *rand.Rand
83}
84
85func (r *lockedRand) Intn(n int) int {
86 r.mu.Lock()
87 v := r.rand.Intn(n)
88 r.mu.Unlock()
89 return v
90}
91
92var globalRand = &lockedRand{
93 rand: rand.New(rand.NewSource(time.Now().UnixNano())),
94}
95
// CampaignType represents the type of campaigning.
// We use a string rather than a uint64 because it is simpler to compare
// and to fill into raft entries.
99type CampaignType string
100
101// StateType represents the role of a node in a cluster.
102type StateType uint64
103
104var stmap = [...]string{
105 "StateFollower",
106 "StateCandidate",
107 "StateLeader",
108 "StatePreCandidate",
109}
110
111func (st StateType) String() string {
112 return stmap[uint64(st)]
113}
114
115// Config contains the parameters to start a raft.
116type Config struct {
117 // ID is the identity of the local raft. ID cannot be 0.
118 ID uint64
119
	// peers contains the IDs of all nodes (including self) in the raft cluster. It
	// should only be set when starting a new raft cluster. Restarting raft from a
	// previous configuration will panic if peers is set. peers is private and only
	// used for testing right now.
124 peers []uint64
125
	// learners contains the IDs of all learner nodes (including self if the
	// local node is a learner) in the raft cluster. Learners only receive
	// entries from the leader node; they do not vote and cannot promote themselves.
129 learners []uint64
130
131 // ElectionTick is the number of Node.Tick invocations that must pass between
132 // elections. That is, if a follower does not receive any message from the
133 // leader of current term before ElectionTick has elapsed, it will become
134 // candidate and start an election. ElectionTick must be greater than
135 // HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
136 // unnecessary leader switching.
137 ElectionTick int
138 // HeartbeatTick is the number of Node.Tick invocations that must pass between
139 // heartbeats. That is, a leader sends heartbeat messages to maintain its
140 // leadership every HeartbeatTick ticks.
141 HeartbeatTick int
142
	// Storage is the storage for raft. raft generates entries and states to be
	// stored in Storage, and reads the persisted entries and states back out of
	// Storage when it needs them. raft also reads the previous state and
	// configuration out of Storage when restarting.
147 Storage Storage
	// Applied is the last applied index. It should only be set when restarting
	// raft. raft will not return entries to the application at or below
	// Applied. If Applied is unset when restarting, raft might return previously
	// applied entries. This is a very application-dependent configuration.
152 Applied uint64
153
	// MaxSizePerMsg limits the max byte size of each append message. A smaller
	// value lowers the raft recovery cost (initial probing and message loss
	// during normal operation). On the other hand, it might affect the
	// throughput during normal replication. Note: math.MaxUint64 for unlimited,
	// 0 for at most one entry per message.
159 MaxSizePerMsg uint64
	// MaxCommittedSizePerReady limits the total size of the committed entries
	// that can be applied in a single Ready.
163 // MaxUncommittedEntriesSize limits the aggregate byte size of the
164 // uncommitted entries that may be appended to a leader's log. Once this
165 // limit is exceeded, proposals will begin to return ErrProposalDropped
166 // errors. Note: 0 for no limit.
167 MaxUncommittedEntriesSize uint64
	// MaxInflightMsgs limits the max number of in-flight append messages during
	// the optimistic replication phase. The application transport layer usually
	// has its own sending buffer over TCP/UDP. Set MaxInflightMsgs to avoid
	// overflowing that sending buffer. TODO (xiangli): feedback to application to
	// limit the proposal rate?
173 MaxInflightMsgs int
174
175 // CheckQuorum specifies if the leader should check quorum activity. Leader
176 // steps down when quorum is not active for an electionTimeout.
177 CheckQuorum bool
178
179 // PreVote enables the Pre-Vote algorithm described in raft thesis section
180 // 9.6. This prevents disruption when a node that has been partitioned away
181 // rejoins the cluster.
182 PreVote bool
183
184 // ReadOnlyOption specifies how the read only request is processed.
185 //
186 // ReadOnlySafe guarantees the linearizability of the read only request by
187 // communicating with the quorum. It is the default and suggested option.
188 //
189 // ReadOnlyLeaseBased ensures linearizability of the read only request by
190 // relying on the leader lease. It can be affected by clock drift.
191 // If the clock drift is unbounded, leader might keep the lease longer than it
192 // should (clock can move backward/pause without any bound). ReadIndex is not safe
193 // in that case.
194 // CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
195 ReadOnlyOption ReadOnlyOption
196
	// Logger is the logger used for the raft log. For a multi-node setup that
	// hosts multiple raft groups, each raft group can have its own logger.
199 Logger Logger
200
	// DisableProposalForwarding, when set to true, means that followers will drop
	// proposals rather than forwarding them to the leader. One use case for
203 // this feature would be in a situation where the Raft leader is used to
204 // compute the data of a proposal, for example, adding a timestamp from a
205 // hybrid logical clock to data in a monotonically increasing way. Forwarding
206 // should be disabled to prevent a follower with an inaccurate hybrid
207 // logical clock from assigning the timestamp and then forwarding the data
208 // to the leader.
209 DisableProposalForwarding bool
210}
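
// For illustration only (this sketch is not used anywhere in the package): a
// minimal Config for a node might look as follows, where NewMemoryStorage is
// the in-memory Storage implementation provided by this package:
//
//	cfg := &Config{
//		ID:              1,
//		ElectionTick:    10,
//		HeartbeatTick:   1,
//		Storage:         NewMemoryStorage(),
//		MaxSizePerMsg:   1024 * 1024,
//		MaxInflightMsgs: 256,
//	}
//	if err := cfg.validate(); err != nil {
//		panic(err)
//	}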
211
212func (c *Config) validate() error {
213 if c.ID == None {
214 return errors.New("cannot use none as id")
215 }
216
217 if c.HeartbeatTick <= 0 {
218 return errors.New("heartbeat tick must be greater than 0")
219 }
220
221 if c.ElectionTick <= c.HeartbeatTick {
222 return errors.New("election tick must be greater than heartbeat tick")
223 }
224
225 if c.Storage == nil {
226 return errors.New("storage cannot be nil")
227 }
228
229 if c.MaxUncommittedEntriesSize == 0 {
230 c.MaxUncommittedEntriesSize = noLimit
231 }
232
233 // default MaxCommittedSizePerReady to MaxSizePerMsg because they were
234 // previously the same parameter.
235 if c.MaxCommittedSizePerReady == 0 {
236 c.MaxCommittedSizePerReady = c.MaxSizePerMsg
237 }
238
239 if c.MaxInflightMsgs <= 0 {
240 return errors.New("max inflight messages must be greater than 0")
241 }
242
243 if c.Logger == nil {
244 c.Logger = raftLogger
245 }
246
247 if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
248 return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
249 }
250
251 return nil
252}
253
254type raft struct {
255 id uint64
256
257 Term uint64
258 Vote uint64
259
260 readStates []ReadState
261
262 // the log
263 raftLog *raftLog
264
265 maxMsgSize uint64
266 maxUncommittedSize uint64
267 // TODO(tbg): rename to trk.
268 prs tracker.ProgressTracker
269
270 state StateType
271
272 // isLearner is true if the local raft node is a learner.
273 isLearner bool
274
275 msgs []pb.Message
276
277 // the leader id
278 lead uint64
	// leadTransferee is the id of the leader transfer target when its value is not zero.
280 // Follow the procedure defined in raft thesis 3.10.
281 leadTransferee uint64
282 // Only one conf change may be pending (in the log, but not yet
283 // applied) at a time. This is enforced via pendingConfIndex, which
284 // is set to a value >= the log index of the latest pending
285 // configuration change (if any). Config changes are only allowed to
286 // be proposed if the leader's applied index is greater than this
287 // value.
288 pendingConfIndex uint64
289 // an estimate of the size of the uncommitted tail of the Raft log. Used to
290 // prevent unbounded log growth. Only maintained by the leader. Reset on
291 // term changes.
292 uncommittedSize uint64
293
294 readOnly *readOnly
295
	// electionElapsed is the number of ticks since it reached the last
	// electionTimeout when it is leader or candidate, or the number of ticks
	// since it reached the last electionTimeout or received a valid message
	// from the current leader when it is a follower.
300 electionElapsed int
301
	// heartbeatElapsed is the number of ticks since it reached the last
	// heartbeatTimeout. Only the leader keeps track of heartbeatElapsed.
304 heartbeatElapsed int
305
306 checkQuorum bool
307 preVote bool
308
309 heartbeatTimeout int
310 electionTimeout int
311 // randomizedElectionTimeout is a random number between
312 // [electiontimeout, 2 * electiontimeout - 1]. It gets reset
313 // when raft changes its state to follower or candidate.
314 randomizedElectionTimeout int
315 disableProposalForwarding bool
316
317 tick func()
318 step stepFunc
319
320 logger Logger
321}
322
323func newRaft(c *Config) *raft {
324 if err := c.validate(); err != nil {
325 panic(err.Error())
326 }
327 raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
328 hs, cs, err := c.Storage.InitialState()
329 if err != nil {
330 panic(err) // TODO(bdarnell)
331 }
332
333 if len(c.peers) > 0 || len(c.learners) > 0 {
334 if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
335 // TODO(bdarnell): the peers argument is always nil except in
336 // tests; the argument should be removed and these tests should be
337 // updated to specify their nodes through a snapshot.
338 panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)")
339 }
340 cs.Voters = c.peers
341 cs.Learners = c.learners
342 }
343
344 r := &raft{
345 id: c.ID,
346 lead: None,
347 isLearner: false,
348 raftLog: raftlog,
349 maxMsgSize: c.MaxSizePerMsg,
350 maxUncommittedSize: c.MaxUncommittedEntriesSize,
351 prs: tracker.MakeProgressTracker(c.MaxInflightMsgs),
352 electionTimeout: c.ElectionTick,
353 heartbeatTimeout: c.HeartbeatTick,
354 logger: c.Logger,
355 checkQuorum: c.CheckQuorum,
356 preVote: c.PreVote,
357 readOnly: newReadOnly(c.ReadOnlyOption),
358 disableProposalForwarding: c.DisableProposalForwarding,
359 }
360
361 cfg, prs, err := confchange.Restore(confchange.Changer{
362 Tracker: r.prs,
363 LastIndex: raftlog.lastIndex(),
364 }, cs)
365 if err != nil {
366 panic(err)
367 }
368 assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
369
370 if !IsEmptyHardState(hs) {
371 r.loadState(hs)
372 }
373 if c.Applied > 0 {
374 raftlog.appliedTo(c.Applied)
375 }
376 r.becomeFollower(r.Term, None)
377
378 var nodesStrs []string
379 for _, n := range r.prs.VoterNodes() {
380 nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
381 }
382
383 r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
384 r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
385 return r
386}
387
388func (r *raft) hasLeader() bool { return r.lead != None }
389
390func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
391
392func (r *raft) hardState() pb.HardState {
393 return pb.HardState{
394 Term: r.Term,
395 Vote: r.Vote,
396 Commit: r.raftLog.committed,
397 }
398}
399
// send prepares m (setting From and, where appropriate, Term) and schedules it
// for sending by appending it to r.msgs; the application persists state and
// transmits the message when it processes the next Ready.
401func (r *raft) send(m pb.Message) {
402 m.From = r.id
403 if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
404 if m.Term == 0 {
405 // All {pre-,}campaign messages need to have the term set when
406 // sending.
407 // - MsgVote: m.Term is the term the node is campaigning for,
408 // non-zero as we increment the term when campaigning.
409 // - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
410 // granted, non-zero for the same reason MsgVote is
411 // - MsgPreVote: m.Term is the term the node will campaign,
412 // non-zero as we use m.Term to indicate the next term we'll be
413 // campaigning for
414 // - MsgPreVoteResp: m.Term is the term received in the original
415 // MsgPreVote if the pre-vote was granted, non-zero for the
416 // same reasons MsgPreVote is
417 panic(fmt.Sprintf("term should be set when sending %s", m.Type))
418 }
419 } else {
420 if m.Term != 0 {
421 panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
422 }
423 // do not attach term to MsgProp, MsgReadIndex
424 // proposals are a way to forward to the leader and
425 // should be treated as local message.
426 // MsgReadIndex is also forwarded to leader.
427 if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
428 m.Term = r.Term
429 }
430 }
431 r.msgs = append(r.msgs, m)
432}
433
434// sendAppend sends an append RPC with new entries (if any) and the
435// current commit index to the given peer.
436func (r *raft) sendAppend(to uint64) {
437 r.maybeSendAppend(to, true)
438}
439
440// maybeSendAppend sends an append RPC with new entries to the given peer,
441// if necessary. Returns true if a message was sent. The sendIfEmpty
442// argument controls whether messages with no entries will be sent
443// ("empty" messages are useful to convey updated Commit indexes, but
444// are undesirable when we're sending multiple messages in a batch).
445func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
446 pr := r.prs.Progress[to]
447 if pr.IsPaused() {
448 return false
449 }
450 m := pb.Message{}
451 m.To = to
452
453 term, errt := r.raftLog.term(pr.Next - 1)
454 ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
455 if len(ents) == 0 && !sendIfEmpty {
456 return false
457 }
458
459 if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
460 if !pr.RecentActive {
461 r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
462 return false
463 }
464
465 m.Type = pb.MsgSnap
466 snapshot, err := r.raftLog.snapshot()
467 if err != nil {
468 if err == ErrSnapshotTemporarilyUnavailable {
469 r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
470 return false
471 }
472 panic(err) // TODO(bdarnell)
473 }
474 if IsEmptySnap(snapshot) {
475 panic("need non-empty snapshot")
476 }
477 m.Snapshot = snapshot
478 sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
479 r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
480 r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
481 pr.BecomeSnapshot(sindex)
482 r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
483 } else {
484 m.Type = pb.MsgApp
485 m.Index = pr.Next - 1
486 m.LogTerm = term
487 m.Entries = ents
488 m.Commit = r.raftLog.committed
489 if n := len(m.Entries); n != 0 {
490 switch pr.State {
491 // optimistically increase the next when in StateReplicate
492 case tracker.StateReplicate:
493 last := m.Entries[n-1].Index
494 pr.OptimisticUpdate(last)
495 pr.Inflights.Add(last)
496 case tracker.StateProbe:
497 pr.ProbeSent = true
498 default:
499 r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
500 }
501 }
502 }
503 r.send(m)
504 return true
505}
506
507// sendHeartbeat sends a heartbeat RPC to the given peer.
508func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
509 // Attach the commit as min(to.matched, r.committed).
	// When the leader sends out a heartbeat message,
	// the receiver (follower) might not be matched with the leader
	// or it might not have all the committed entries.
513 // The leader MUST NOT forward the follower's commit to
514 // an unmatched index.
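	// For example, if this follower has only matched entries up to index 5
	// while the leader has committed through index 10, the heartbeat carries
	// Commit=5 rather than 10.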
515 commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
516 m := pb.Message{
517 To: to,
518 Type: pb.MsgHeartbeat,
519 Commit: commit,
520 Context: ctx,
521 }
522
523 r.send(m)
524}
525
// bcastAppend sends an append RPC, with entries, to every peer that is not
// up-to-date according to the progress recorded in r.prs.
528func (r *raft) bcastAppend() {
529 r.prs.Visit(func(id uint64, _ *tracker.Progress) {
530 if id == r.id {
531 return
532 }
533 r.sendAppend(id)
534 })
535}
536
// bcastHeartbeat sends a heartbeat RPC, without entries, to all peers.
538func (r *raft) bcastHeartbeat() {
539 lastCtx := r.readOnly.lastPendingRequestCtx()
540 if len(lastCtx) == 0 {
541 r.bcastHeartbeatWithCtx(nil)
542 } else {
543 r.bcastHeartbeatWithCtx([]byte(lastCtx))
544 }
545}
546
547func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
548 r.prs.Visit(func(id uint64, _ *tracker.Progress) {
549 if id == r.id {
550 return
551 }
552 r.sendHeartbeat(id, ctx)
553 })
554}
555
556func (r *raft) advance(rd Ready) {
557 // If entries were applied (or a snapshot), update our cursor for
558 // the next Ready. Note that if the current HardState contains a
559 // new Commit index, this does not mean that we're also applying
560 // all of the new entries due to commit pagination by size.
561 if index := rd.appliedCursor(); index > 0 {
562 r.raftLog.appliedTo(index)
563 if r.prs.Config.AutoLeave && index >= r.pendingConfIndex && r.state == StateLeader {
564 // If the current (and most recent, at least for this leader's term)
565 // configuration should be auto-left, initiate that now.
566 ccdata, err := (&pb.ConfChangeV2{}).Marshal()
567 if err != nil {
568 panic(err)
569 }
570 ent := pb.Entry{
571 Type: pb.EntryConfChangeV2,
572 Data: ccdata,
573 }
574 if !r.appendEntry(ent) {
575 // If we could not append the entry, bump the pending conf index
576 // so that we'll try again later.
577 //
578 // TODO(tbg): test this case.
579 r.pendingConfIndex = r.raftLog.lastIndex()
580 } else {
581 r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
582 }
583 }
584 }
585 r.reduceUncommittedSize(rd.CommittedEntries)
586
587 if len(rd.Entries) > 0 {
588 e := rd.Entries[len(rd.Entries)-1]
589 r.raftLog.stableTo(e.Index, e.Term)
590 }
591 if !IsEmptySnap(rd.Snapshot) {
592 r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
593 }
594}
595
596// maybeCommit attempts to advance the commit index. Returns true if
597// the commit index changed (in which case the caller should call
598// r.bcastAppend).
599func (r *raft) maybeCommit() bool {
600 mci := r.prs.Committed()
601 return r.raftLog.maybeCommit(mci, r.Term)
602}
603
604func (r *raft) reset(term uint64) {
605 if r.Term != term {
606 r.Term = term
607 r.Vote = None
608 }
609 r.lead = None
610
611 r.electionElapsed = 0
612 r.heartbeatElapsed = 0
613 r.resetRandomizedElectionTimeout()
614
615 r.abortLeaderTransfer()
616
617 r.prs.ResetVotes()
618 r.prs.Visit(func(id uint64, pr *tracker.Progress) {
619 *pr = tracker.Progress{
620 Match: 0,
621 Next: r.raftLog.lastIndex() + 1,
622 Inflights: tracker.NewInflights(r.prs.MaxInflight),
623 IsLearner: pr.IsLearner,
624 }
625 if id == r.id {
626 pr.Match = r.raftLog.lastIndex()
627 }
628 })
629
630 r.pendingConfIndex = 0
631 r.uncommittedSize = 0
632 r.readOnly = newReadOnly(r.readOnly.option)
633}
634
635func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
636 li := r.raftLog.lastIndex()
637 for i := range es {
638 es[i].Term = r.Term
639 es[i].Index = li + 1 + uint64(i)
640 }
641 // Track the size of this uncommitted proposal.
642 if !r.increaseUncommittedSize(es) {
643 r.logger.Debugf(
644 "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
645 r.id,
646 )
647 // Drop the proposal.
648 return false
649 }
650 // use latest "last" index after truncate/append
651 li = r.raftLog.append(es...)
652 r.prs.Progress[r.id].MaybeUpdate(li)
653 // Regardless of maybeCommit's return, our caller will call bcastAppend.
654 r.maybeCommit()
655 return true
656}
657
658// tickElection is run by followers and candidates after r.electionTimeout.
659func (r *raft) tickElection() {
660 r.electionElapsed++
661
662 if r.promotable() && r.pastElectionTimeout() {
663 r.electionElapsed = 0
664 r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
665 }
666}
667
668// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
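// For example, with HeartbeatTick = 1 and ElectionTick = 10, a leader emits a
// MsgBeat on every tick and, when CheckQuorum is enabled, a MsgCheckQuorum
// every 10 ticks.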
669func (r *raft) tickHeartbeat() {
670 r.heartbeatElapsed++
671 r.electionElapsed++
672
673 if r.electionElapsed >= r.electionTimeout {
674 r.electionElapsed = 0
675 if r.checkQuorum {
676 r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
677 }
		// If the current leader cannot transfer leadership within an electionTimeout,
		// it abandons the transfer and resumes acting as a normal leader.
679 if r.state == StateLeader && r.leadTransferee != None {
680 r.abortLeaderTransfer()
681 }
682 }
683
684 if r.state != StateLeader {
685 return
686 }
687
688 if r.heartbeatElapsed >= r.heartbeatTimeout {
689 r.heartbeatElapsed = 0
690 r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
691 }
692}
693
694func (r *raft) becomeFollower(term uint64, lead uint64) {
695 r.step = stepFollower
696 r.reset(term)
697 r.tick = r.tickElection
698 r.lead = lead
699 r.state = StateFollower
700 r.logger.Infof("%x became follower at term %d", r.id, r.Term)
701}
702
703func (r *raft) becomeCandidate() {
704 // TODO(xiangli) remove the panic when the raft implementation is stable
705 if r.state == StateLeader {
706 panic("invalid transition [leader -> candidate]")
707 }
708 r.step = stepCandidate
709 r.reset(r.Term + 1)
710 r.tick = r.tickElection
711 r.Vote = r.id
712 r.state = StateCandidate
713 r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
714}
715
716func (r *raft) becomePreCandidate() {
717 // TODO(xiangli) remove the panic when the raft implementation is stable
718 if r.state == StateLeader {
719 panic("invalid transition [leader -> pre-candidate]")
720 }
721 // Becoming a pre-candidate changes our step functions and state,
722 // but doesn't change anything else. In particular it does not increase
723 // r.Term or change r.Vote.
724 r.step = stepCandidate
725 r.prs.ResetVotes()
726 r.tick = r.tickElection
727 r.lead = None
728 r.state = StatePreCandidate
729 r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
730}
731
732func (r *raft) becomeLeader() {
733 // TODO(xiangli) remove the panic when the raft implementation is stable
734 if r.state == StateFollower {
735 panic("invalid transition [follower -> leader]")
736 }
737 r.step = stepLeader
738 r.reset(r.Term)
739 r.tick = r.tickHeartbeat
740 r.lead = r.id
741 r.state = StateLeader
742 // Followers enter replicate mode when they've been successfully probed
743 // (perhaps after having received a snapshot as a result). The leader is
744 // trivially in this state. Note that r.reset() has initialized this
745 // progress with the last index already.
746 r.prs.Progress[r.id].BecomeReplicate()
747
748 // Conservatively set the pendingConfIndex to the last index in the
749 // log. There may or may not be a pending config change, but it's
750 // safe to delay any future proposals until we commit all our
751 // pending log entries, and scanning the entire tail of the log
752 // could be expensive.
753 r.pendingConfIndex = r.raftLog.lastIndex()
754
755 emptyEnt := pb.Entry{Data: nil}
756 if !r.appendEntry(emptyEnt) {
757 // This won't happen because we just called reset() above.
758 r.logger.Panic("empty entry was dropped")
759 }
760 // As a special case, don't count the initial empty entry towards the
761 // uncommitted log quota. This is because we want to preserve the
762 // behavior of allowing one entry larger than quota if the current
763 // usage is zero.
764 r.reduceUncommittedSize([]pb.Entry{emptyEnt})
765 r.logger.Infof("%x became leader at term %d", r.id, r.Term)
766}
767
768// campaign transitions the raft instance to candidate state. This must only be
769// called after verifying that this is a legitimate transition.
770func (r *raft) campaign(t CampaignType) {
771 if !r.promotable() {
772 // This path should not be hit (callers are supposed to check), but
773 // better safe than sorry.
774 r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
775 }
776 var term uint64
777 var voteMsg pb.MessageType
778 if t == campaignPreElection {
779 r.becomePreCandidate()
780 voteMsg = pb.MsgPreVote
781 // PreVote RPCs are sent for the next term before we've incremented r.Term.
782 term = r.Term + 1
783 } else {
784 r.becomeCandidate()
785 voteMsg = pb.MsgVote
786 term = r.Term
787 }
788 if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
789 // We won the election after voting for ourselves (which must mean that
790 // this is a single-node cluster). Advance to the next state.
791 if t == campaignPreElection {
792 r.campaign(campaignElection)
793 } else {
794 r.becomeLeader()
795 }
796 return
797 }
798 var ids []uint64
799 {
800 idMap := r.prs.Voters.IDs()
801 ids = make([]uint64, 0, len(idMap))
802 for id := range idMap {
803 ids = append(ids, id)
804 }
805 sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
806 }
807 for _, id := range ids {
808 if id == r.id {
809 continue
810 }
811 r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
812 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
813
814 var ctx []byte
815 if t == campaignTransfer {
816 ctx = []byte(t)
817 }
818 r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
819 }
820}
821
822func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
823 if v {
824 r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
825 } else {
826 r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
827 }
828 r.prs.RecordVote(id, v)
829 return r.prs.TallyVotes()
830}
831
832func (r *raft) Step(m pb.Message) error {
833 // Handle the message term, which may result in our stepping down to a follower.
834 switch {
835 case m.Term == 0:
836 // local message
837 case m.Term > r.Term:
838 if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
839 force := bytes.Equal(m.Context, []byte(campaignTransfer))
840 inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
841 if !force && inLease {
842 // If a server receives a RequestVote request within the minimum election timeout
843 // of hearing from a current leader, it does not update its term or grant its vote
844 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
845 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
846 return nil
847 }
848 }
849 switch {
850 case m.Type == pb.MsgPreVote:
851 // Never change our term in response to a PreVote
852 case m.Type == pb.MsgPreVoteResp && !m.Reject:
853 // We send pre-vote requests with a term in our future. If the
854 // pre-vote is granted, we will increment our term when we get a
855 // quorum. If it is not, the term comes from the node that
856 // rejected our vote so we should become a follower at the new
857 // term.
858 default:
859 r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
860 r.id, r.Term, m.Type, m.From, m.Term)
861 if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
862 r.becomeFollower(m.Term, m.From)
863 } else {
864 r.becomeFollower(m.Term, None)
865 }
866 }
867
868 case m.Term < r.Term:
869 if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
870 // We have received messages from a leader at a lower term. It is possible
871 // that these messages were simply delayed in the network, but this could
872 // also mean that this node has advanced its term number during a network
873 // partition, and it is now unable to either win an election or to rejoin
874 // the majority on the old term. If checkQuorum is false, this will be
875 // handled by incrementing term numbers in response to MsgVote with a
876 // higher term, but if checkQuorum is true we may not advance the term on
877 // MsgVote and must generate other messages to advance the term. The net
878 // result of these two features is to minimize the disruption caused by
879 // nodes that have been removed from the cluster's configuration: a
880 // removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
			// but it will not receive MsgApp or MsgHeartbeat, so it will not create
			// disruptive term increases; instead, the MsgAppResp sent below notifies
			// the leader of this node's activeness. The above also applies to Pre-Vote.
			//
			// When a follower gets isolated, it soon starts an election and ends
			// up with a higher term than the leader, although it won't receive enough
			// votes to win the election. When it regains connectivity, this response
			// with "pb.MsgAppResp" of a higher term would force the leader to step down.
			// However, this disruption is inevitable to free the stuck node with a
			// fresh election. It can be prevented with the Pre-Vote phase.
891 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
892 } else if m.Type == pb.MsgPreVote {
			// Before Pre-Vote was enabled, there may have been a candidate with a higher
			// term but a shorter log. After upgrading to Pre-Vote, the cluster may
			// deadlock if we drop messages with a lower term.
896 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
897 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
898 r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
899 } else {
900 // ignore other cases
901 r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
902 r.id, r.Term, m.Type, m.From, m.Term)
903 }
904 return nil
905 }
906
907 switch m.Type {
908 case pb.MsgHup:
909 if r.state != StateLeader {
910 if !r.promotable() {
911 r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
912 return nil
913 }
914 ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
915 if err != nil {
916 r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
917 }
918 if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
919 r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
920 return nil
921 }
922
923 r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
924 if r.preVote {
925 r.campaign(campaignPreElection)
926 } else {
927 r.campaign(campaignElection)
928 }
929 } else {
930 r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
931 }
932
933 case pb.MsgVote, pb.MsgPreVote:
934 // We can vote if this is a repeat of a vote we've already cast...
935 canVote := r.Vote == m.From ||
936 // ...we haven't voted and we don't think there's a leader yet in this term...
937 (r.Vote == None && r.lead == None) ||
938 // ...or this is a PreVote for a future term...
939 (m.Type == pb.MsgPreVote && m.Term > r.Term)
940 // ...and we believe the candidate is up to date.
941 if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			// Note: it turns out that learners must be allowed to cast votes.
			// This seems counter-intuitive but is necessary in the situation in which
944 // a learner has been promoted (i.e. is now a voter) but has not learned
945 // about this yet.
946 // For example, consider a group in which id=1 is a learner and id=2 and
947 // id=3 are voters. A configuration change promoting 1 can be committed on
948 // the quorum `{2,3}` without the config change being appended to the
949 // learner's log. If the leader (say 2) fails, there are de facto two
950 // voters remaining. Only 3 can win an election (due to its log containing
951 // all committed entries), but to do so it will need 1 to vote. But 1
952 // considers itself a learner and will continue to do so until 3 has
953 // stepped up as leader, replicates the conf change to 1, and 1 applies it.
954 // Ultimately, by receiving a request to vote, the learner realizes that
955 // the candidate believes it to be a voter, and that it should act
956 // accordingly. The candidate's config may be stale, too; but in that case
957 // it won't win the election, at least in the absence of the bug discussed
958 // in:
959 // https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
960 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
961 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
962 // When responding to Msg{Pre,}Vote messages we include the term
963 // from the message, not the local term. To see why, consider the
964 // case where a single node was previously partitioned away and
			// its local term is now out of date. If we include the local term
966 // (recall that for pre-votes we don't update the local term), the
967 // (pre-)campaigning node on the other end will proceed to ignore
968 // the message (it ignores all out of date messages).
969 // The term in the original message and current local term are the
970 // same in the case of regular votes, but different for pre-votes.
971 r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
972 if m.Type == pb.MsgVote {
973 // Only record real votes.
974 r.electionElapsed = 0
975 r.Vote = m.From
976 }
977 } else {
978 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
979 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
980 r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
981 }
982
983 default:
984 err := r.step(r, m)
985 if err != nil {
986 return err
987 }
988 }
989 return nil
990}
991
992type stepFunc func(r *raft, m pb.Message) error
993
994func stepLeader(r *raft, m pb.Message) error {
995 // These message types do not require any progress for m.From.
996 switch m.Type {
997 case pb.MsgBeat:
998 r.bcastHeartbeat()
999 return nil
1000 case pb.MsgCheckQuorum:
1001 // The leader should always see itself as active. As a precaution, handle
1002 // the case in which the leader isn't in the configuration any more (for
1003 // example if it just removed itself).
1004 //
1005 // TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
1006 // leader steps down when removing itself. I might be missing something.
1007 if pr := r.prs.Progress[r.id]; pr != nil {
1008 pr.RecentActive = true
1009 }
1010 if !r.prs.QuorumActive() {
1011 r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
1012 r.becomeFollower(r.Term, None)
1013 }
1014 // Mark everyone (but ourselves) as inactive in preparation for the next
1015 // CheckQuorum.
1016 r.prs.Visit(func(id uint64, pr *tracker.Progress) {
1017 if id != r.id {
1018 pr.RecentActive = false
1019 }
1020 })
1021 return nil
1022 case pb.MsgProp:
1023 if len(m.Entries) == 0 {
1024 r.logger.Panicf("%x stepped empty MsgProp", r.id)
1025 }
1026 if r.prs.Progress[r.id] == nil {
			// If we are not currently a member of the cluster (i.e. this node
			// was removed from the configuration while serving as leader),
1029 // drop any new proposals.
1030 return ErrProposalDropped
1031 }
1032 if r.leadTransferee != None {
1033 r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
1034 return ErrProposalDropped
1035 }
1036
1037 for i := range m.Entries {
1038 e := &m.Entries[i]
1039 var cc pb.ConfChangeI
1040 if e.Type == pb.EntryConfChange {
1041 var ccc pb.ConfChange
1042 if err := ccc.Unmarshal(e.Data); err != nil {
1043 panic(err)
1044 }
1045 cc = ccc
1046 } else if e.Type == pb.EntryConfChangeV2 {
1047 var ccc pb.ConfChangeV2
1048 if err := ccc.Unmarshal(e.Data); err != nil {
1049 panic(err)
1050 }
1051 cc = ccc
1052 }
1053 if cc != nil {
1054 alreadyPending := r.pendingConfIndex > r.raftLog.applied
1055 alreadyJoint := len(r.prs.Config.Voters[1]) > 0
1056 wantsLeaveJoint := len(cc.AsV2().Changes) == 0
1057
1058 var refused string
1059 if alreadyPending {
1060 refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
1061 } else if alreadyJoint && !wantsLeaveJoint {
1062 refused = "must transition out of joint config first"
1063 } else if !alreadyJoint && wantsLeaveJoint {
1064 refused = "not in joint state; refusing empty conf change"
1065 }
1066
1067 if refused != "" {
1068 r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
1069 m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
1070 } else {
1071 r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
1072 }
1073 }
1074 }
1075
1076 if !r.appendEntry(m.Entries...) {
1077 return ErrProposalDropped
1078 }
1079 r.bcastAppend()
1080 return nil
1081 case pb.MsgReadIndex:
1082 // If more than the local vote is needed, go through a full broadcast,
1083 // otherwise optimize.
1084 if !r.prs.IsSingleton() {
1085 if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
1086 // Reject read only request when this leader has not committed any log entry at its term.
1087 return nil
1088 }
1089
			// thinking: use an internally defined context instead of the user-given context.
1091 // We can express this in terms of the term and index instead of a user-supplied value.
1092 // This would allow multiple reads to piggyback on the same message.
1093 switch r.readOnly.option {
1094 case ReadOnlySafe:
1095 r.readOnly.addRequest(r.raftLog.committed, m)
1096 // The local node automatically acks the request.
1097 r.readOnly.recvAck(r.id, m.Entries[0].Data)
1098 r.bcastHeartbeatWithCtx(m.Entries[0].Data)
1099 case ReadOnlyLeaseBased:
1100 ri := r.raftLog.committed
1101 if m.From == None || m.From == r.id { // from local member
1102 r.readStates = append(r.readStates, ReadState{Index: ri, RequestCtx: m.Entries[0].Data})
1103 } else {
1104 r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
1105 }
1106 }
1107 } else { // only one voting member (the leader) in the cluster
1108 if m.From == None || m.From == r.id { // from leader itself
1109 r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
1110 } else { // from learner member
1111 r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
1112 }
1113 }
1114
1115 return nil
1116 }
1117
1118 // All other message types require a progress for m.From (pr).
1119 pr := r.prs.Progress[m.From]
1120 if pr == nil {
1121 r.logger.Debugf("%x no progress available for %x", r.id, m.From)
1122 return nil
1123 }
1124 switch m.Type {
1125 case pb.MsgAppResp:
1126 pr.RecentActive = true
1127
1128 if m.Reject {
1129 r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, lastindex: %d) from %x for index %d",
1130 r.id, m.RejectHint, m.From, m.Index)
1131 if pr.MaybeDecrTo(m.Index, m.RejectHint) {
1132 r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
1133 if pr.State == tracker.StateReplicate {
1134 pr.BecomeProbe()
1135 }
1136 r.sendAppend(m.From)
1137 }
1138 } else {
1139 oldPaused := pr.IsPaused()
1140 if pr.MaybeUpdate(m.Index) {
1141 switch {
1142 case pr.State == tracker.StateProbe:
1143 pr.BecomeReplicate()
1144 case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
1145 // TODO(tbg): we should also enter this branch if a snapshot is
1146 // received that is below pr.PendingSnapshot but which makes it
1147 // possible to use the log again.
1148 r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1149 // Transition back to replicating state via probing state
1150 // (which takes the snapshot into account). If we didn't
1151 // move to replicating state, that would only happen with
1152 // the next round of appends (but there may not be a next
1153 // round for a while, exposing an inconsistent RaftStatus).
1154 pr.BecomeProbe()
1155 pr.BecomeReplicate()
1156 case pr.State == tracker.StateReplicate:
1157 pr.Inflights.FreeLE(m.Index)
1158 }
1159
1160 if r.maybeCommit() {
1161 r.bcastAppend()
1162 } else if oldPaused {
1163 // If we were paused before, this node may be missing the
1164 // latest commit index, so send it.
1165 r.sendAppend(m.From)
1166 }
1167 // We've updated flow control information above, which may
1168 // allow us to send multiple (size-limited) in-flight messages
1169 // at once (such as when transitioning from probe to
1170 // replicate, or when freeTo() covers multiple messages). If
1171 // we have more entries to send, send as many messages as we
1172 // can (without sending empty messages for the commit index)
1173 for r.maybeSendAppend(m.From, false) {
1174 }
1175 // Transfer leadership is in progress.
1176 if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
1177 r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
1178 r.sendTimeoutNow(m.From)
1179 }
1180 }
1181 }
1182 case pb.MsgHeartbeatResp:
1183 pr.RecentActive = true
1184 pr.ProbeSent = false
1185
1186 // free one slot for the full inflights window to allow progress.
1187 if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
1188 pr.Inflights.FreeFirstOne()
1189 }
1190 if pr.Match < r.raftLog.lastIndex() {
1191 r.sendAppend(m.From)
1192 }
1193
1194 if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
1195 return nil
1196 }
1197
1198 if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
1199 return nil
1200 }
1201
1202 rss := r.readOnly.advance(m)
1203 for _, rs := range rss {
1204 req := rs.req
1205 if req.From == None || req.From == r.id { // from local member
1206 r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
1207 } else {
1208 r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
1209 }
1210 }
1211 case pb.MsgSnapStatus:
1212 if pr.State != tracker.StateSnapshot {
1213 return nil
1214 }
1215 // TODO(tbg): this code is very similar to the snapshot handling in
1216 // MsgAppResp above. In fact, the code there is more correct than the
1217 // code here and should likely be updated to match (or even better, the
1218 // logic pulled into a newly created Progress state machine handler).
1219 if !m.Reject {
1220 pr.BecomeProbe()
1221 r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1222 } else {
			// NB: the order here matters or we'll be probing erroneously from
			// the snapshot index even though the snapshot was never applied.
1225 pr.PendingSnapshot = 0
1226 pr.BecomeProbe()
1227 r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1228 }
		// If the snapshot finished, wait for the MsgAppResp from the remote node
		// before sending out the next MsgApp.
		// If the snapshot failed, wait for a heartbeat interval before the next attempt.
1232 pr.ProbeSent = true
1233 case pb.MsgUnreachable:
		// During optimistic replication, if the remote becomes unreachable,
		// there is a high probability that a MsgApp was lost.
1236 if pr.State == tracker.StateReplicate {
1237 pr.BecomeProbe()
1238 }
1239 r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
1240 case pb.MsgTransferLeader:
1241 if pr.IsLearner {
1242 r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
1243 return nil
1244 }
1245 leadTransferee := m.From
1246 lastLeadTransferee := r.leadTransferee
1247 if lastLeadTransferee != None {
1248 if lastLeadTransferee == leadTransferee {
1249 r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
1250 r.id, r.Term, leadTransferee, leadTransferee)
1251 return nil
1252 }
1253 r.abortLeaderTransfer()
1254 r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
1255 }
1256 if leadTransferee == r.id {
1257 r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
1258 return nil
1259 }
1260 // Transfer leadership to third party.
1261 r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
1262 // Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
1263 r.electionElapsed = 0
1264 r.leadTransferee = leadTransferee
1265 if pr.Match == r.raftLog.lastIndex() {
1266 r.sendTimeoutNow(leadTransferee)
1267 r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
1268 } else {
1269 r.sendAppend(leadTransferee)
1270 }
1271 }
1272 return nil
1273}
1274
1275// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
1276// whether they respond to MsgVoteResp or MsgPreVoteResp.
1277func stepCandidate(r *raft, m pb.Message) error {
1278 // Only handle vote responses corresponding to our candidacy (while in
1279 // StateCandidate, we may get stale MsgPreVoteResp messages in this term from
1280 // our pre-candidate state).
1281 var myVoteRespType pb.MessageType
1282 if r.state == StatePreCandidate {
1283 myVoteRespType = pb.MsgPreVoteResp
1284 } else {
1285 myVoteRespType = pb.MsgVoteResp
1286 }
1287 switch m.Type {
1288 case pb.MsgProp:
1289 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
1290 return ErrProposalDropped
1291 case pb.MsgApp:
1292 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1293 r.handleAppendEntries(m)
1294 case pb.MsgHeartbeat:
1295 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1296 r.handleHeartbeat(m)
1297 case pb.MsgSnap:
1298 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1299 r.handleSnapshot(m)
1300 case myVoteRespType:
1301 gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
1302 r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
1303 switch res {
1304 case quorum.VoteWon:
1305 if r.state == StatePreCandidate {
1306 r.campaign(campaignElection)
1307 } else {
1308 r.becomeLeader()
1309 r.bcastAppend()
1310 }
1311 case quorum.VoteLost:
			// pb.MsgPreVoteResp contains the future term of the pre-candidate
			// (m.Term > r.Term); reuse r.Term.
1314 r.becomeFollower(r.Term, None)
1315 }
1316 case pb.MsgTimeoutNow:
1317 r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
1318 }
1319 return nil
1320}
1321
1322func stepFollower(r *raft, m pb.Message) error {
1323 switch m.Type {
1324 case pb.MsgProp:
1325 if r.lead == None {
1326 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
1327 return ErrProposalDropped
1328 } else if r.disableProposalForwarding {
1329 r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
1330 return ErrProposalDropped
1331 }
1332 m.To = r.lead
1333 r.send(m)
1334 case pb.MsgApp:
1335 r.electionElapsed = 0
1336 r.lead = m.From
1337 r.handleAppendEntries(m)
1338 case pb.MsgHeartbeat:
1339 r.electionElapsed = 0
1340 r.lead = m.From
1341 r.handleHeartbeat(m)
1342 case pb.MsgSnap:
1343 r.electionElapsed = 0
1344 r.lead = m.From
1345 r.handleSnapshot(m)
1346 case pb.MsgTransferLeader:
1347 if r.lead == None {
1348 r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
1349 return nil
1350 }
1351 m.To = r.lead
1352 r.send(m)
1353 case pb.MsgTimeoutNow:
1354 if r.promotable() {
1355 r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
1356 // Leadership transfers never use pre-vote even if r.preVote is true; we
1357 // know we are not recovering from a partition so there is no need for the
1358 // extra round trip.
1359 r.campaign(campaignTransfer)
1360 } else {
1361 r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
1362 }
1363 case pb.MsgReadIndex:
1364 if r.lead == None {
1365 r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
1366 return nil
1367 }
1368 m.To = r.lead
1369 r.send(m)
1370 case pb.MsgReadIndexResp:
1371 if len(m.Entries) != 1 {
1372 r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
1373 return nil
1374 }
1375 r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
1376 }
1377 return nil
1378}
1379
1380func (r *raft) handleAppendEntries(m pb.Message) {
1381 if m.Index < r.raftLog.committed {
1382 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
1383 return
1384 }
1385
1386 if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
1387 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
1388 } else {
1389 r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
1390 r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
1391 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
1392 }
1393}
1394
1395func (r *raft) handleHeartbeat(m pb.Message) {
1396 r.raftLog.commitTo(m.Commit)
1397 r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
1398}
1399
1400func (r *raft) handleSnapshot(m pb.Message) {
1401 sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
1402 if r.restore(m.Snapshot) {
1403 r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
1404 r.id, r.raftLog.committed, sindex, sterm)
1405 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
1406 } else {
1407 r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
1408 r.id, r.raftLog.committed, sindex, sterm)
1409 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
1410 }
1411}
1412
1413// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of the state machine. If this method returns false, the snapshot was
1415// ignored, either because it was obsolete or because of an error.
1416func (r *raft) restore(s pb.Snapshot) bool {
1417 if s.Metadata.Index <= r.raftLog.committed {
1418 return false
1419 }
1420 if r.state != StateFollower {
1421 // This is defense-in-depth: if the leader somehow ended up applying a
1422 // snapshot, it could move into a new term without moving into a
1423 // follower state. This should never fire, but if it did, we'd have
1424 // prevented damage by returning early, so log only a loud warning.
1425 //
1426 // At the time of writing, the instance is guaranteed to be in follower
1427 // state when this method is called.
1428 r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
1429 r.becomeFollower(r.Term+1, None)
1430 return false
1431 }
1432
1433 // More defense-in-depth: throw away snapshot if recipient is not in the
1434 // config. This shouldn't ever happen (at the time of writing) but lots of
1435 // code here and there assumes that r.id is in the progress tracker.
1436 found := false
1437 cs := s.Metadata.ConfState
1438 for _, set := range [][]uint64{
1439 cs.Voters,
1440 cs.Learners,
1441 } {
1442 for _, id := range set {
1443 if id == r.id {
1444 found = true
1445 break
1446 }
1447 }
1448 }
1449 if !found {
1450 r.logger.Warningf(
1451 "%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
1452 r.id, cs,
1453 )
1454 return false
1455 }
1456
1457 // Now go ahead and actually restore.
1458
1459 if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
1460 r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
1461 r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
1462 r.raftLog.commitTo(s.Metadata.Index)
1463 return false
1464 }
1465
1466 r.raftLog.restore(s)
1467
	// Reset the configuration and add the (potentially updated) peers anew.
1469 r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
1470 cfg, prs, err := confchange.Restore(confchange.Changer{
1471 Tracker: r.prs,
1472 LastIndex: r.raftLog.lastIndex(),
1473 }, cs)
1474
1475 if err != nil {
1476 // This should never happen. Either there's a bug in our config change
1477 // handling or the client corrupted the conf change.
1478 panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
1479 }
1480
1481 assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
1482
1483 pr := r.prs.Progress[r.id]
1484 pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
1485
1486 r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
1487 r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
1488 return true
1489}
1490
// promotable indicates whether the state machine can be promoted to leader,
// which is true when its own id is in the progress list.
1493func (r *raft) promotable() bool {
1494 pr := r.prs.Progress[r.id]
1495 return pr != nil && !pr.IsLearner
1496}
1497
1498func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
1499 cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {
1500 changer := confchange.Changer{
1501 Tracker: r.prs,
1502 LastIndex: r.raftLog.lastIndex(),
1503 }
1504 if cc.LeaveJoint() {
1505 return changer.LeaveJoint()
1506 } else if autoLeave, ok := cc.EnterJoint(); ok {
1507 return changer.EnterJoint(autoLeave, cc.Changes...)
1508 }
1509 return changer.Simple(cc.Changes...)
1510 }()
1511
1512 if err != nil {
1513 // TODO(tbg): return the error to the caller.
1514 panic(err)
1515 }
1516
1517 return r.switchToConfig(cfg, prs)
1518}
1519
1520// switchToConfig reconfigures this node to use the provided configuration. It
1521// updates the in-memory state and, when necessary, carries out additional
1522// actions such as reacting to the removal of nodes or changed quorum
1523// requirements.
1524//
1525// The inputs usually result from restoring a ConfState or applying a ConfChange.
1526func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState {
1527 r.prs.Config = cfg
1528 r.prs.Progress = prs
1529
1530 r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
1531 cs := r.prs.ConfState()
1532 pr, ok := r.prs.Progress[r.id]
1533
1534 // Update whether the node itself is a learner, resetting to false when the
1535 // node is removed.
1536 r.isLearner = ok && pr.IsLearner
1537
1538 if (!ok || r.isLearner) && r.state == StateLeader {
		// This node is leader and was removed or demoted. We prevent demotions
		// at the time of writing, but hypothetically we handle them the same way as
		// removing the leader: stepping down into the next Term.
1542 //
1543 // TODO(tbg): step down (for sanity) and ask follower with largest Match
1544 // to TimeoutNow (to avoid interruption). This might still drop some
1545 // proposals but it's better than nothing.
1546 //
1547 // TODO(tbg): test this branch. It is untested at the time of writing.
1548 return cs
1549 }
1550
1551 // The remaining steps only make sense if this node is the leader and there
1552 // are other nodes.
1553 if r.state != StateLeader || len(cs.Voters) == 0 {
1554 return cs
1555 }
1556
1557 if r.maybeCommit() {
1558 // If the configuration change means that more entries are committed now,
1559 // broadcast/append to everyone in the updated config.
1560 r.bcastAppend()
1561 } else {
1562 // Otherwise, still probe the newly added replicas; there's no reason to
1563 // let them wait out a heartbeat interval (or the next incoming
1564 // proposal).
1565 r.prs.Visit(func(id uint64, pr *tracker.Progress) {
1566 r.maybeSendAppend(id, false /* sendIfEmpty */)
1567 })
1568 }
	// If the leadTransferee was removed, abort the leadership transfer.
1570 if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
1571 r.abortLeaderTransfer()
1572 }
1573
1574 return cs
1575}
1576
1577func (r *raft) loadState(state pb.HardState) {
1578 if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
1579 r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
1580 }
1581 r.raftLog.committed = state.Commit
1582 r.Term = state.Term
1583 r.Vote = state.Vote
1584}
1585
1586// pastElectionTimeout returns true iff r.electionElapsed is greater
1587// than or equal to the randomized election timeout in
1588// [electiontimeout, 2 * electiontimeout - 1].
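// For example, with electionTimeout = 10 the randomized timeout is some value
// in [10, 19].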
1589func (r *raft) pastElectionTimeout() bool {
1590 return r.electionElapsed >= r.randomizedElectionTimeout
1591}
1592
1593func (r *raft) resetRandomizedElectionTimeout() {
1594 r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
1595}
1596
1597func (r *raft) sendTimeoutNow(to uint64) {
1598 r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
1599}
1600
1601func (r *raft) abortLeaderTransfer() {
1602 r.leadTransferee = None
1603}
1604
1605// increaseUncommittedSize computes the size of the proposed entries and
// determines whether they would push the leader over its maxUncommittedSize limit.
1607// If the new entries would exceed the limit, the method returns false. If not,
1608// the increase in uncommitted entry size is recorded and the method returns
1609// true.
1610func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
1611 var s uint64
1612 for _, e := range ents {
1613 s += uint64(PayloadSize(e))
1614 }
1615
1616 if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
1617 // If the uncommitted tail of the Raft log is empty, allow any size
1618 // proposal. Otherwise, limit the size of the uncommitted tail of the
1619 // log and drop any proposal that would push the size over the limit.
1620 return false
1621 }
1622 r.uncommittedSize += s
1623 return true
1624}
1625
// reduceUncommittedSize accounts for the newly committed entries by decreasing
// the tracked uncommitted entry size.
1628func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
1629 if r.uncommittedSize == 0 {
1630 // Fast-path for followers, who do not track or enforce the limit.
1631 return
1632 }
1633
1634 var s uint64
1635 for _, e := range ents {
1636 s += uint64(PayloadSize(e))
1637 }
1638 if s > r.uncommittedSize {
1639 // uncommittedSize may underestimate the size of the uncommitted Raft
1640 // log tail but will never overestimate it. Saturate at 0 instead of
1641 // allowing overflow.
1642 r.uncommittedSize = 0
1643 } else {
1644 r.uncommittedSize -= s
1645 }
1646}
1647
1648func numOfPendingConf(ents []pb.Entry) int {
1649 n := 0
1650 for i := range ents {
1651 if ents[i].Type == pb.EntryConfChange {
1652 n++
1653 }
1654 }
1655 return n
1656}