VOL-2112 move to voltha-lib-go
Change-Id: Ic1af08003c1d2c698c0cce371e64f47b47b8d875
diff --git a/vendor/go.etcd.io/etcd/raft/raft.go b/vendor/go.etcd.io/etcd/raft/raft.go
index e1e6a16..cdcb43d 100644
--- a/vendor/go.etcd.io/etcd/raft/raft.go
+++ b/vendor/go.etcd.io/etcd/raft/raft.go
@@ -25,7 +25,10 @@
"sync"
"time"
+ "go.etcd.io/etcd/raft/confchange"
+ "go.etcd.io/etcd/raft/quorum"
pb "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/raft/tracker"
)
// None is a placeholder node ID used when there is no leader.
@@ -261,18 +264,14 @@
maxMsgSize uint64
maxUncommittedSize uint64
- maxInflight int
- prs map[uint64]*Progress
- learnerPrs map[uint64]*Progress
- matchBuf uint64Slice
+ // TODO(tbg): rename to trk.
+ prs tracker.ProgressTracker
state StateType
// isLearner is true if the local raft node is a learner.
isLearner bool
- votes map[uint64]bool
-
msgs []pb.Message
// the leader id
@@ -330,28 +329,26 @@
if err != nil {
panic(err) // TODO(bdarnell)
}
- peers := c.peers
- learners := c.learners
- if len(cs.Nodes) > 0 || len(cs.Learners) > 0 {
- if len(peers) > 0 || len(learners) > 0 {
+
+ if len(c.peers) > 0 || len(c.learners) > 0 {
+ if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
// TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
- panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)")
+ panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)")
}
- peers = cs.Nodes
- learners = cs.Learners
+ cs.Voters = c.peers
+ cs.Learners = c.learners
}
+
r := &raft{
id: c.ID,
lead: None,
isLearner: false,
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
- maxInflight: c.MaxInflightMsgs,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
- prs: make(map[uint64]*Progress),
- learnerPrs: make(map[uint64]*Progress),
+ prs: tracker.MakeProgressTracker(c.MaxInflightMsgs),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
@@ -360,20 +357,17 @@
readOnly: newReadOnly(c.ReadOnlyOption),
disableProposalForwarding: c.DisableProposalForwarding,
}
- for _, p := range peers {
- r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
- }
- for _, p := range learners {
- if _, ok := r.prs[p]; ok {
- panic(fmt.Sprintf("node %x is in both learner and peer list", p))
- }
- r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true}
- if r.id == p {
- r.isLearner = true
- }
- }
- if !isHardStateEqual(hs, emptyState) {
+ cfg, prs, err := confchange.Restore(confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: raftlog.lastIndex(),
+ }, cs)
+ if err != nil {
+ panic(err)
+ }
+ assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
+
+ if !IsEmptyHardState(hs) {
r.loadState(hs)
}
if c.Applied > 0 {
@@ -382,7 +376,7 @@
r.becomeFollower(r.Term, None)
var nodesStrs []string
- for _, n := range r.nodes() {
+ for _, n := range r.prs.VoterNodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
@@ -403,26 +397,6 @@
}
}
-func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
-
-func (r *raft) nodes() []uint64 {
- nodes := make([]uint64, 0, len(r.prs))
- for id := range r.prs {
- nodes = append(nodes, id)
- }
- sort.Sort(uint64Slice(nodes))
- return nodes
-}
-
-func (r *raft) learnerNodes() []uint64 {
- nodes := make([]uint64, 0, len(r.learnerPrs))
- for id := range r.learnerPrs {
- nodes = append(nodes, id)
- }
- sort.Sort(uint64Slice(nodes))
- return nodes
-}
-
// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
m.From = r.id
@@ -457,14 +431,6 @@
r.msgs = append(r.msgs, m)
}
-func (r *raft) getProgress(id uint64) *Progress {
- if pr, ok := r.prs[id]; ok {
- return pr
- }
-
- return r.learnerPrs[id]
-}
-
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
@@ -477,7 +443,7 @@
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
- pr := r.getProgress(to)
+ pr := r.prs.Progress[to]
if pr.IsPaused() {
return false
}
@@ -512,7 +478,7 @@
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
- pr.becomeSnapshot(sindex)
+ pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp
@@ -522,13 +488,13 @@
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
- // optimistically increase the next when in ProgressStateReplicate
- case ProgressStateReplicate:
+ // optimistically increase the next when in StateReplicate
+ case tracker.StateReplicate:
last := m.Entries[n-1].Index
- pr.optimisticUpdate(last)
- pr.ins.add(last)
- case ProgressStateProbe:
- pr.pause()
+ pr.OptimisticUpdate(last)
+ pr.Inflights.Add(last)
+ case tracker.StateProbe:
+ pr.ProbeSent = true
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
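The switch above is driven by the per-follower Progress state machine from the new tracker package. A rough sketch of those states and their roles (names match the diff; the comments paraphrase the tracker package's behavior, this is illustration only):

package trackersketch

// StateType mirrors tracker.StateType for illustration only.
type StateType int

const (
	// StateProbe: the leader does not yet know the follower's log position,
	// so it sends at most one append at a time; ProbeSent pauses sending
	// until the follower responds.
	StateProbe StateType = iota
	// StateReplicate: steady state. The leader streams entries, optimistically
	// advancing Next and recording in-flight appends in the Inflights window.
	StateReplicate
	// StateSnapshot: the follower is too far behind the leader's log and
	// first needs a snapshot; appends are suspended until it reports back.
	StateSnapshot
)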
@@ -546,7 +512,7 @@
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
- commit := min(r.getProgress(to).Match, r.raftLog.committed)
+ commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
@@ -557,24 +523,13 @@
r.send(m)
}
-func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) {
- for id, pr := range r.prs {
- f(id, pr)
- }
-
- for id, pr := range r.learnerPrs {
- f(id, pr)
- }
-}
-
// bcastAppend sends an append RPC, with entries, to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
- r.forEachProgress(func(id uint64, _ *Progress) {
+ r.prs.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {
return
}
-
r.sendAppend(id)
})
}
@@ -590,7 +545,7 @@
}
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
- r.forEachProgress(func(id uint64, _ *Progress) {
+ r.prs.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {
return
}
@@ -598,23 +553,51 @@
})
}
+func (r *raft) advance(rd Ready) {
+ // If entries were applied (or a snapshot), update our cursor for
+ // the next Ready. Note that if the current HardState contains a
+ // new Commit index, this does not mean that we're also applying
+ // all of the new entries due to commit pagination by size.
+ if index := rd.appliedCursor(); index > 0 {
+ r.raftLog.appliedTo(index)
+ if r.prs.Config.AutoLeave && index >= r.pendingConfIndex && r.state == StateLeader {
+ // If the current (and most recent, at least for this leader's term)
+ // configuration should be auto-left, initiate that now.
+ ccdata, err := (&pb.ConfChangeV2{}).Marshal()
+ if err != nil {
+ panic(err)
+ }
+ ent := pb.Entry{
+ Type: pb.EntryConfChangeV2,
+ Data: ccdata,
+ }
+ if !r.appendEntry(ent) {
+ // If we could not append the entry, bump the pending conf index
+ // so that we'll try again later.
+ //
+ // TODO(tbg): test this case.
+ r.pendingConfIndex = r.raftLog.lastIndex()
+ } else {
+ r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
+ }
+ }
+ }
+ r.reduceUncommittedSize(rd.CommittedEntries)
+
+ if len(rd.Entries) > 0 {
+ e := rd.Entries[len(rd.Entries)-1]
+ r.raftLog.stableTo(e.Index, e.Term)
+ }
+ if !IsEmptySnap(rd.Snapshot) {
+ r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
+ }
+}
+
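The new advance method is reached through the public Node API: the application consumes a Ready, persists and applies it, then calls Advance. A minimal sketch of that loop (save, send and apply are hypothetical application hooks, not part of the library):

package raftloop

import (
	"go.etcd.io/etcd/raft"
	pb "go.etcd.io/etcd/raft/raftpb"
)

func runReadyLoop(
	n raft.Node,
	save func(pb.HardState, []pb.Entry, pb.Snapshot),
	send func([]pb.Message),
	apply func([]pb.Entry),
) {
	for rd := range n.Ready() {
		// Persist HardState, entries and snapshot before sending messages,
		// as the raft contract requires.
		save(rd.HardState, rd.Entries, rd.Snapshot)
		send(rd.Messages)
		apply(rd.CommittedEntries)
		// Advance marks this Ready as processed; internally the node ends up
		// in r.advance(rd), moving the applied/stable cursors as above.
		n.Advance()
	}
}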
// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
- // Preserving matchBuf across calls is an optimization
- // used to avoid allocating a new slice on each call.
- if cap(r.matchBuf) < len(r.prs) {
- r.matchBuf = make(uint64Slice, len(r.prs))
- }
- mis := r.matchBuf[:len(r.prs)]
- idx := 0
- for _, p := range r.prs {
- mis[idx] = p.Match
- idx++
- }
- sort.Sort(mis)
- mci := mis[len(mis)-r.quorum()]
+ mci := r.prs.Committed()
return r.raftLog.maybeCommit(mci, r.Term)
}
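r.prs.Committed() replaces the removed sort-and-index computation. For a simple (non-joint) majority quorum it reduces to the rule sketched below; the real tracker additionally takes the minimum across both majorities of a joint configuration. The matches slice is a hypothetical stand-in for the per-voter Match indexes:

package main

import (
	"fmt"
	"sort"
)

// committed returns the highest index replicated to a majority: sort the
// Match values and take the quorum-th largest, exactly as the removed
// maybeCommit body did.
func committed(matches []uint64) uint64 {
	sorted := append([]uint64(nil), matches...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	quorum := len(sorted)/2 + 1
	return sorted[len(sorted)-quorum]
}

func main() {
	// Three voters matched up to 5, 7 and 9; a majority (two) has at least 7.
	fmt.Println(committed([]uint64{5, 7, 9})) // 7
}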
@@ -631,9 +614,14 @@
r.abortLeaderTransfer()
- r.votes = make(map[uint64]bool)
- r.forEachProgress(func(id uint64, pr *Progress) {
- *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
+ r.prs.ResetVotes()
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ *pr = tracker.Progress{
+ Match: 0,
+ Next: r.raftLog.lastIndex() + 1,
+ Inflights: tracker.NewInflights(r.prs.MaxInflight),
+ IsLearner: pr.IsLearner,
+ }
if id == r.id {
pr.Match = r.raftLog.lastIndex()
}
@@ -661,7 +649,7 @@
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
- r.getProgress(r.id).maybeUpdate(li)
+ r.prs.Progress[r.id].MaybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
@@ -734,7 +722,7 @@
// but doesn't change anything else. In particular it does not increase
// r.Term or change r.Vote.
r.step = stepCandidate
- r.votes = make(map[uint64]bool)
+ r.prs.ResetVotes()
r.tick = r.tickElection
r.lead = None
r.state = StatePreCandidate
@@ -755,7 +743,7 @@
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
- r.prs[r.id].becomeReplicate()
+ r.prs.Progress[r.id].BecomeReplicate()
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
@@ -777,7 +765,14 @@
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
+// campaign transitions the raft instance to candidate state. This must only be
+// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
+ if !r.promotable() {
+ // This path should not be hit (callers are supposed to check), but
+ // better safe than sorry.
+ r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
+ }
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
@@ -790,7 +785,7 @@
voteMsg = pb.MsgVote
term = r.Term
}
- if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
+ if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
@@ -800,7 +795,16 @@
}
return
}
- for id := range r.prs {
+ var ids []uint64
+ {
+ idMap := r.prs.Voters.IDs()
+ ids = make([]uint64, 0, len(idMap))
+ for id := range idMap {
+ ids = append(ids, id)
+ }
+ sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
+ }
+ for _, id := range ids {
if id == r.id {
continue
}
@@ -815,21 +819,14 @@
}
}
-func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) {
+func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
if v {
r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
} else {
r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
}
- if _, ok := r.votes[id]; !ok {
- r.votes[id] = v
- }
- for _, vv := range r.votes {
- if vv {
- granted++
- }
- }
- return granted
+ r.prs.RecordVote(id, v)
+ return r.prs.TallyVotes()
}
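RecordVote/TallyVotes subsume the removed r.votes bookkeeping. A self-contained sketch of the same record-then-count rule over a simple majority (quorum.VoteResult additionally understands joint quorums, which this sketch does not):

package main

import "fmt"

type voteResult int

const (
	votePending voteResult = iota
	voteWon
	voteLost
)

// tally mirrors the record-then-count pattern of the removed poll body:
// one vote per voter is recorded in the map, and the outcome is decided
// against a simple majority of `voters` nodes.
func tally(votes map[uint64]bool, voters int) voteResult {
	granted, rejected := 0, 0
	for _, v := range votes {
		if v {
			granted++
		} else {
			rejected++
		}
	}
	q := voters/2 + 1
	switch {
	case granted >= q:
		return voteWon
	case rejected > voters-q:
		return voteLost
	default:
		return votePending
	}
}

func main() {
	votes := map[uint64]bool{1: true, 2: true, 3: false}
	fmt.Println(tally(votes, 3) == voteWon) // true: 2 of 3 granted
}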
func (r *raft) Step(m pb.Message) error {
@@ -910,6 +907,10 @@
switch m.Type {
case pb.MsgHup:
if r.state != StateLeader {
+ if !r.promotable() {
+ r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
+ return nil
+ }
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
@@ -930,12 +931,6 @@
}
case pb.MsgVote, pb.MsgPreVote:
- if r.isLearner {
- // TODO: learner may need to vote, in case of node down when confchange.
- r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
- r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
- return nil
- }
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
@@ -944,12 +939,30 @@
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// ...and we believe the candidate is up to date.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
+ // Note: it turns out that learners must be allowed to cast votes.
+ // This seems counter-intuitive but is necessary in the situation in which
+ // a learner has been promoted (i.e. is now a voter) but has not learned
+ // about this yet.
+ // For example, consider a group in which id=1 is a learner and id=2 and
+ // id=3 are voters. A configuration change promoting 1 can be committed on
+ // the quorum `{2,3}` without the config change being appended to the
+ // learner's log. If the leader (say 2) fails, there are de facto two
+ // voters remaining. Only 3 can win an election (due to its log containing
+ // all committed entries), but to do so it will need 1 to vote. But 1
+ // considers itself a learner and will continue to do so until 3 has
+ // stepped up as leader, replicates the conf change to 1, and 1 applies it.
+ // Ultimately, by receiving a request to vote, the learner realizes that
+ // the candidate believes it to be a voter, and that it should act
+ // accordingly. The candidate's config may be stale, too; but in that case
+ // it won't win the election, at least in the absence of the bug discussed
+ // in:
+ // https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
// When responding to Msg{Pre,}Vote messages we include the term
- // from the message, not the local term. To see why consider the
+ // from the message, not the local term. To see why, consider the
// case where a single node was previously partitioned away and
- // it's local term is now of date. If we include the local term
+ // its local term is now out of date. If we include the local term
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out of date messages).
@@ -985,16 +998,32 @@
r.bcastHeartbeat()
return nil
case pb.MsgCheckQuorum:
- if !r.checkQuorumActive() {
+ // The leader should always see itself as active. As a precaution, handle
+ // the case in which the leader isn't in the configuration any more (for
+ // example if it just removed itself).
+ //
+ // TODO(tbg): I added a TODO in removeNode; it doesn't seem that the
+ // leader steps down when removing itself. I might be missing something.
+ if pr := r.prs.Progress[r.id]; pr != nil {
+ pr.RecentActive = true
+ }
+ if !r.prs.QuorumActive() {
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
}
+ // Mark everyone (but ourselves) as inactive in preparation for the next
+ // CheckQuorum.
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ if id != r.id {
+ pr.RecentActive = false
+ }
+ })
return nil
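QuorumActive replaces the removed checkQuorumActive helper (still visible near the bottom of this diff). For a simple majority configuration the check amounts to the following sketch, where recentActive is a hypothetical voter-id-to-activity map:

package main

import "fmt"

// quorumActive mirrors the removed checkQuorumActive helper: count the
// voters seen since the last check (the leader always counts itself) and
// compare against a simple majority.
func quorumActive(self uint64, recentActive map[uint64]bool) bool {
	act := 0
	for id, active := range recentActive {
		if id == self || active {
			act++
		}
	}
	return act >= len(recentActive)/2+1
}

func main() {
	// Leader 1 heard from nobody else among three voters: quorum inactive.
	fmt.Println(quorumActive(1, map[uint64]bool{1: false, 2: false, 3: false}))
}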
case pb.MsgProp:
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
- if _, ok := r.prs[r.id]; !ok {
+ if r.prs.Progress[r.id] == nil {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
@@ -1005,11 +1034,38 @@
return ErrProposalDropped
}
- for i, e := range m.Entries {
+ for i := range m.Entries {
+ e := &m.Entries[i]
+ var cc pb.ConfChangeI
if e.Type == pb.EntryConfChange {
- if r.pendingConfIndex > r.raftLog.applied {
- r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
- e.String(), r.pendingConfIndex, r.raftLog.applied)
+ var ccc pb.ConfChange
+ if err := ccc.Unmarshal(e.Data); err != nil {
+ panic(err)
+ }
+ cc = ccc
+ } else if e.Type == pb.EntryConfChangeV2 {
+ var ccc pb.ConfChangeV2
+ if err := ccc.Unmarshal(e.Data); err != nil {
+ panic(err)
+ }
+ cc = ccc
+ }
+ if cc != nil {
+ alreadyPending := r.pendingConfIndex > r.raftLog.applied
+ alreadyJoint := len(r.prs.Config.Voters[1]) > 0
+ wantsLeaveJoint := len(cc.AsV2().Changes) == 0
+
+ var refused string
+ if alreadyPending {
+ refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
+ } else if alreadyJoint && !wantsLeaveJoint {
+ refused = "must transition out of joint config first"
+ } else if !alreadyJoint && wantsLeaveJoint {
+ refused = "not in joint state; refusing empty conf change"
+ }
+
+ if refused != "" {
+ r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
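For illustration, the guard above plays out as follows when a caller proposes a multi-node change through the public API. This is a hedged sketch: the node IDs are made up, and with the default auto transition the explicit leave below would be unnecessary because advance() proposes it automatically:

package confexample

import (
	"context"

	"go.etcd.io/etcd/raft"
	pb "go.etcd.io/etcd/raft/raftpb"
)

// proposeJointChange swaps voter 1 for node 4 via joint consensus. With an
// explicit transition, the guard above insists on a second, empty
// ConfChangeV2 to leave the joint configuration before admitting any
// further change.
func proposeJointChange(ctx context.Context, n raft.Node) error {
	cc := pb.ConfChangeV2{
		Transition: pb.ConfChangeTransitionJointExplicit,
		Changes: []pb.ConfChangeSingle{
			{Type: pb.ConfChangeAddNode, NodeID: 4},
			{Type: pb.ConfChangeRemoveNode, NodeID: 1},
		},
	}
	if err := n.ProposeConfChange(ctx, cc); err != nil {
		return err
	}
	// ...once the joint config is applied, leave it:
	return n.ProposeConfChange(ctx, pb.ConfChangeV2{})
}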
@@ -1023,7 +1079,9 @@
r.bcastAppend()
return nil
case pb.MsgReadIndex:
- if r.quorum() > 1 {
+ // If more than the local vote is needed, go through a full broadcast,
+ // otherwise optimize.
+ if !r.prs.IsSingleton() {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return nil
@@ -1035,24 +1093,30 @@
switch r.readOnly.option {
case ReadOnlySafe:
r.readOnly.addRequest(r.raftLog.committed, m)
+ // The local node automatically acks the request.
+ r.readOnly.recvAck(r.id, m.Entries[0].Data)
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
ri := r.raftLog.committed
if m.From == None || m.From == r.id { // from local member
- r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+ r.readStates = append(r.readStates, ReadState{Index: ri, RequestCtx: m.Entries[0].Data})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
}
- } else {
- r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+ } else { // only one voting member (the leader) in the cluster
+ if m.From == None || m.From == r.id { // from leader itself
+ r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+ } else { // from learner member
+ r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
+ }
}
return nil
}
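On the application side, this ReadIndex machinery is typically used as sketched below: attach a unique request context, then watch for the matching ReadState in the Ready loop. matchReadState and the surrounding application loop are hypothetical; only Node.ReadIndex and Ready.ReadStates come from the library:

package raftreads

import (
	"bytes"
	"context"

	"go.etcd.io/etcd/raft"
)

// requestRead kicks off a linearizable read: the rctx payload uniquely
// identifies the request and comes back in a ReadState once a quorum of
// heartbeat acks (or the singleton fast path above) has confirmed the
// leader's commit index.
func requestRead(ctx context.Context, n raft.Node, rctx []byte) error {
	return n.ReadIndex(ctx, rctx)
}

// matchReadState is called from the application's Ready loop; it returns
// the index the state machine must have applied before serving the read.
func matchReadState(rd raft.Ready, rctx []byte) (uint64, bool) {
	for _, rs := range rd.ReadStates {
		if bytes.Equal(rs.RequestCtx, rctx) {
			return rs.Index, true
		}
	}
	return 0, false
}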
// All other message types require a progress for m.From (pr).
- pr := r.getProgress(m.From)
+ pr := r.prs.Progress[m.From]
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
@@ -1062,32 +1126,35 @@
pr.RecentActive = true
if m.Reject {
- r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
+ r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
- if pr.maybeDecrTo(m.Index, m.RejectHint) {
+ if pr.MaybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
- if pr.State == ProgressStateReplicate {
- pr.becomeProbe()
+ if pr.State == tracker.StateReplicate {
+ pr.BecomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
- if pr.maybeUpdate(m.Index) {
+ if pr.MaybeUpdate(m.Index) {
switch {
- case pr.State == ProgressStateProbe:
- pr.becomeReplicate()
- case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
- r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+ case pr.State == tracker.StateProbe:
+ pr.BecomeReplicate()
+ case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
+ // TODO(tbg): we should also enter this branch if a snapshot is
+ // received that is below pr.PendingSnapshot but which makes it
+ // possible to use the log again.
+ r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
- pr.becomeProbe()
- pr.becomeReplicate()
- case pr.State == ProgressStateReplicate:
- pr.ins.freeTo(m.Index)
+ pr.BecomeProbe()
+ pr.BecomeReplicate()
+ case pr.State == tracker.StateReplicate:
+ pr.Inflights.FreeLE(m.Index)
}
if r.maybeCommit() {
@@ -1114,11 +1181,11 @@
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
- pr.resume()
+ pr.ProbeSent = false
// free one slot for the full inflights window to allow progress.
- if pr.State == ProgressStateReplicate && pr.ins.full() {
- pr.ins.freeFirstOne()
+ if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
+ pr.Inflights.FreeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
@@ -1128,8 +1195,7 @@
return nil
}
- ackCount := r.readOnly.recvAck(m)
- if ackCount < r.quorum() {
+ if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
@@ -1143,26 +1209,32 @@
}
}
case pb.MsgSnapStatus:
- if pr.State != ProgressStateSnapshot {
+ if pr.State != tracker.StateSnapshot {
return nil
}
+ // TODO(tbg): this code is very similar to the snapshot handling in
+ // MsgAppResp above. In fact, the code there is more correct than the
+ // code here and should likely be updated to match (or even better, the
+ // logic pulled into a newly created Progress state machine handler).
if !m.Reject {
- pr.becomeProbe()
+ pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
- pr.snapshotFailure()
- pr.becomeProbe()
+ // NB: the order here matters or we'll be probing erroneously from
+ // the snapshot index even though the snapshot was never applied.
+ pr.PendingSnapshot = 0
+ pr.BecomeProbe()
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
- // If snapshot finish, wait for the msgAppResp from the remote node before sending
- // out the next msgApp.
+ // If the snapshot finished, wait for the MsgAppResp from the remote node
+ // before sending out the next MsgApp.
// If the snapshot failed, wait for a heartbeat interval before the next try.
- pr.pause()
+ pr.ProbeSent = true
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
- if pr.State == ProgressStateReplicate {
- pr.becomeProbe()
+ if pr.State == tracker.StateReplicate {
+ pr.BecomeProbe()
}
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
case pb.MsgTransferLeader:
@@ -1226,17 +1298,17 @@
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
- gr := r.poll(m.From, m.Type, !m.Reject)
- r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
- switch r.quorum() {
- case gr:
+ gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
+ r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
+ switch res {
+ case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
- case len(r.votes) - gr:
+ case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
@@ -1314,7 +1386,7 @@
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
- r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
+ r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
@@ -1339,11 +1411,51 @@
}
// restore recovers the state machine from a snapshot. It restores the log and the
-// configuration of state machine.
+// configuration of the state machine. If this method returns false, the snapshot was
+// ignored, either because it was obsolete or because of an error.
func (r *raft) restore(s pb.Snapshot) bool {
if s.Metadata.Index <= r.raftLog.committed {
return false
}
+ if r.state != StateFollower {
+ // This is defense-in-depth: if the leader somehow ended up applying a
+ // snapshot, it could move into a new term without moving into a
+ // follower state. This should never fire, but if it did, we'd have
+ // prevented damage by returning early, so log only a loud warning.
+ //
+ // At the time of writing, the instance is guaranteed to be in follower
+ // state when this method is called.
+ r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
+ r.becomeFollower(r.Term+1, None)
+ return false
+ }
+
+ // More defense-in-depth: throw away snapshot if recipient is not in the
+ // config. This shouldn't ever happen (at the time of writing) but lots of
+ // code here and there assumes that r.id is in the progress tracker.
+ found := false
+ cs := s.Metadata.ConfState
+ for _, set := range [][]uint64{
+ cs.Voters,
+ cs.Learners,
+ } {
+ for _, id := range set {
+ if id == r.id {
+ found = true
+ break
+ }
+ }
+ }
+ if !found {
+ r.logger.Warningf(
+ "%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
+ r.id, cs,
+ )
+ return false
+ }
+
+ // Now go ahead and actually restore.
+
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
@@ -1351,123 +1463,115 @@
return false
}
- // The normal peer can't become learner.
- if !r.isLearner {
- for _, id := range s.Metadata.ConfState.Learners {
- if id == r.id {
- r.logger.Errorf("%x can't become learner when restores snapshot [index: %d, term: %d]", r.id, s.Metadata.Index, s.Metadata.Term)
- return false
- }
- }
- }
-
- r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
- r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
-
r.raftLog.restore(s)
- r.prs = make(map[uint64]*Progress)
- r.learnerPrs = make(map[uint64]*Progress)
- r.restoreNode(s.Metadata.ConfState.Nodes, false)
- r.restoreNode(s.Metadata.ConfState.Learners, true)
- return true
-}
-func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
- for _, n := range nodes {
- match, next := uint64(0), r.raftLog.lastIndex()+1
- if n == r.id {
- match = next - 1
- r.isLearner = isLearner
- }
- r.setProgress(n, match, next, isLearner)
- r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n))
+ // Reset the configuration and add the (potentially updated) peers back in.
+ r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
+ cfg, prs, err := confchange.Restore(confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: r.raftLog.lastIndex(),
+ }, cs)
+
+ if err != nil {
+ // This should never happen. Either there's a bug in our config change
+ // handling or the client corrupted the conf change.
+ panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
}
+
+ assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
+
+ pr := r.prs.Progress[r.id]
+ pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
+
+ r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
+ r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
+ return true
}
// promotable indicates whether the state machine can be promoted to leader,
// which is true when its own id is in the progress list and it is not a learner.
func (r *raft) promotable() bool {
- _, ok := r.prs[r.id]
- return ok
+ pr := r.prs.Progress[r.id]
+ return pr != nil && !pr.IsLearner
}
-func (r *raft) addNode(id uint64) {
- r.addNodeOrLearnerNode(id, false)
-}
-
-func (r *raft) addLearner(id uint64) {
- r.addNodeOrLearnerNode(id, true)
-}
-
-func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
- pr := r.getProgress(id)
- if pr == nil {
- r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
- } else {
- if isLearner && !pr.IsLearner {
- // can only change Learner to Voter
- r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
- return
+func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
+ cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {
+ changer := confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: r.raftLog.lastIndex(),
}
-
- if isLearner == pr.IsLearner {
- // Ignore any redundant addNode calls (which can happen because the
- // initial bootstrapping entries are applied twice).
- return
+ if cc.LeaveJoint() {
+ return changer.LeaveJoint()
+ } else if autoLeave, ok := cc.EnterJoint(); ok {
+ return changer.EnterJoint(autoLeave, cc.Changes...)
}
+ return changer.Simple(cc.Changes...)
+ }()
- // change Learner to Voter, use origin Learner progress
- delete(r.learnerPrs, id)
- pr.IsLearner = false
- r.prs[id] = pr
+ if err != nil {
+ // TODO(tbg): return the error to the caller.
+ panic(err)
}
- if r.id == id {
- r.isLearner = isLearner
- }
-
- // When a node is first added, we should mark it as recently active.
- // Otherwise, CheckQuorum may cause us to step down if it is invoked
- // before the added node has a chance to communicate with us.
- pr = r.getProgress(id)
- pr.RecentActive = true
+ return r.switchToConfig(cfg, prs)
}
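The three-way dispatch in applyConfChange follows directly from the ConfChangeV2 semantics in raftpb. A small sketch of the same classification, using the LeaveJoint/EnterJoint predicates the diff relies on:

package main

import (
	"fmt"

	pb "go.etcd.io/etcd/raft/raftpb"
)

// describe classifies a ConfChangeV2 the same way applyConfChange does:
// an empty change leaves a joint configuration, more than one change (or
// an explicitly requested joint transition) enters one, and anything else
// is applied with the simple one-at-a-time protocol.
func describe(cc pb.ConfChangeV2) string {
	if cc.LeaveJoint() {
		return "leave joint"
	}
	if autoLeave, ok := cc.EnterJoint(); ok {
		return fmt.Sprintf("enter joint (autoLeave=%t)", autoLeave)
	}
	return "simple"
}

func main() {
	fmt.Println(describe(pb.ConfChangeV2{})) // leave joint
	fmt.Println(describe(pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
		{Type: pb.ConfChangeAddNode, NodeID: 4},
		{Type: pb.ConfChangeRemoveNode, NodeID: 1},
	}})) // enter joint (autoLeave=true)
}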
-func (r *raft) removeNode(id uint64) {
- r.delProgress(id)
+// switchToConfig reconfigures this node to use the provided configuration. It
+// updates the in-memory state and, when necessary, carries out additional
+// actions such as reacting to the removal of nodes or changed quorum
+// requirements.
+//
+// The inputs usually result from restoring a ConfState or applying a ConfChange.
+func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState {
+ r.prs.Config = cfg
+ r.prs.Progress = prs
- // do not try to commit or abort transferring if there is no nodes in the cluster.
- if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
- return
+ r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
+ cs := r.prs.ConfState()
+ pr, ok := r.prs.Progress[r.id]
+
+ // Update whether the node itself is a learner, resetting to false when the
+ // node is removed.
+ r.isLearner = ok && pr.IsLearner
+
+ if (!ok || r.isLearner) && r.state == StateLeader {
+ // This node is leader and was removed or demoted. We prevent demotions
+ // at the time writing but hypothetically we handle them the same way as
+ // removing the leader: stepping down into the next Term.
+ //
+ // TODO(tbg): step down (for sanity) and ask follower with largest Match
+ // to TimeoutNow (to avoid interruption). This might still drop some
+ // proposals but it's better than nothing.
+ //
+ // TODO(tbg): test this branch. It is untested at the time of writing.
+ return cs
}
- // The quorum size is now smaller, so see if any pending entries can
- // be committed.
+ // The remaining steps only make sense if this node is the leader and there
+ // are other nodes.
+ if r.state != StateLeader || len(cs.Voters) == 0 {
+ return cs
+ }
+
if r.maybeCommit() {
+ // If the configuration change means that more entries are committed now,
+ // broadcast/append to everyone in the updated config.
r.bcastAppend()
+ } else {
+ // Otherwise, still probe the newly added replicas; there's no reason to
+ // let them wait out a heartbeat interval (or the next incoming
+ // proposal).
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ r.maybeSendAppend(id, false /* sendIfEmpty */)
+ })
}
- // If the removed node is the leadTransferee, then abort the leadership transferring.
- if r.state == StateLeader && r.leadTransferee == id {
+ // If the leadTransferee was removed, abort the leadership transfer.
+ if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
}
-}
-func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
- if !isLearner {
- delete(r.learnerPrs, id)
- r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
- return
- }
-
- if _, ok := r.prs[id]; ok {
- panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id))
- }
- r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true}
-}
-
-func (r *raft) delProgress(id uint64) {
- delete(r.prs, id)
- delete(r.learnerPrs, id)
+ return cs
}
func (r *raft) loadState(state pb.HardState) {
@@ -1490,29 +1594,6 @@
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}
-// checkQuorumActive returns true if the quorum is active from
-// the view of the local raft state machine. Otherwise, it returns
-// false.
-// checkQuorumActive also resets all RecentActive to false.
-func (r *raft) checkQuorumActive() bool {
- var act int
-
- r.forEachProgress(func(id uint64, pr *Progress) {
- if id == r.id { // self is always active
- act++
- return
- }
-
- if pr.RecentActive && !pr.IsLearner {
- act++
- }
-
- pr.RecentActive = false
- })
-
- return act >= r.quorum()
-}
-
func (r *raft) sendTimeoutNow(to uint64) {
r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
}