VOL-1691 Fix openolt adapter getting stuck while registartion with core
Change-Id: Ide8131f325bc15f1b909e14d7af6ee9bcd6b3b5b
diff --git a/vendor/go.etcd.io/etcd/raft/raft.go b/vendor/go.etcd.io/etcd/raft/raft.go
index 61ad12c..846ff49 100644
--- a/vendor/go.etcd.io/etcd/raft/raft.go
+++ b/vendor/go.etcd.io/etcd/raft/raft.go
@@ -20,12 +20,13 @@
"fmt"
"math"
"math/rand"
- "sort"
"strings"
"sync"
"time"
+ "go.etcd.io/etcd/raft/quorum"
pb "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/raft/tracker"
)
// None is a placeholder node ID used when there is no leader.
@@ -261,18 +262,13 @@
maxMsgSize uint64
maxUncommittedSize uint64
- maxInflight int
- prs map[uint64]*Progress
- learnerPrs map[uint64]*Progress
- matchBuf uint64Slice
+ prs tracker.ProgressTracker
state StateType
// isLearner is true if the local raft node is a learner.
isLearner bool
- votes map[uint64]bool
-
msgs []pb.Message
// the leader id
@@ -348,10 +344,8 @@
isLearner: false,
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
- maxInflight: c.MaxInflightMsgs,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
- prs: make(map[uint64]*Progress),
- learnerPrs: make(map[uint64]*Progress),
+ prs: tracker.MakeProgressTracker(c.MaxInflightMsgs),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
@@ -361,13 +355,13 @@
disableProposalForwarding: c.DisableProposalForwarding,
}
for _, p := range peers {
- r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
+ // Add node to active config.
+ r.prs.InitProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */)
}
for _, p := range learners {
- if _, ok := r.prs[p]; ok {
- panic(fmt.Sprintf("node %x is in both learner and peer list", p))
- }
- r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true}
+ // Add learner to active config.
+ r.prs.InitProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */)
+
if r.id == p {
r.isLearner = true
}
@@ -382,7 +376,7 @@
r.becomeFollower(r.Term, None)
var nodesStrs []string
- for _, n := range r.nodes() {
+ for _, n := range r.prs.VoterNodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
@@ -403,26 +397,6 @@
}
}
-func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
-
-func (r *raft) nodes() []uint64 {
- nodes := make([]uint64, 0, len(r.prs))
- for id := range r.prs {
- nodes = append(nodes, id)
- }
- sort.Sort(uint64Slice(nodes))
- return nodes
-}
-
-func (r *raft) learnerNodes() []uint64 {
- nodes := make([]uint64, 0, len(r.learnerPrs))
- for id := range r.learnerPrs {
- nodes = append(nodes, id)
- }
- sort.Sort(uint64Slice(nodes))
- return nodes
-}
-
// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
m.From = r.id
@@ -457,14 +431,6 @@
r.msgs = append(r.msgs, m)
}
-func (r *raft) getProgress(id uint64) *Progress {
- if pr, ok := r.prs[id]; ok {
- return pr
- }
-
- return r.learnerPrs[id]
-}
-
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
@@ -477,7 +443,7 @@
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
- pr := r.getProgress(to)
+ pr := r.prs.Progress[to]
if pr.IsPaused() {
return false
}
@@ -512,7 +478,7 @@
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
- pr.becomeSnapshot(sindex)
+ pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp
@@ -522,13 +488,13 @@
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
- // optimistically increase the next when in ProgressStateReplicate
- case ProgressStateReplicate:
+ // optimistically increase the next when in StateReplicate
+ case tracker.StateReplicate:
last := m.Entries[n-1].Index
- pr.optimisticUpdate(last)
- pr.ins.add(last)
- case ProgressStateProbe:
- pr.pause()
+ pr.OptimisticUpdate(last)
+ pr.Inflights.Add(last)
+ case tracker.StateProbe:
+ pr.ProbeSent = true
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
@@ -546,7 +512,7 @@
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
- commit := min(r.getProgress(to).Match, r.raftLog.committed)
+ commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
@@ -557,20 +523,10 @@
r.send(m)
}
-func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) {
- for id, pr := range r.prs {
- f(id, pr)
- }
-
- for id, pr := range r.learnerPrs {
- f(id, pr)
- }
-}
-
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
- r.forEachProgress(func(id uint64, _ *Progress) {
+ r.prs.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {
return
}
@@ -590,7 +546,7 @@
}
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
- r.forEachProgress(func(id uint64, _ *Progress) {
+ r.prs.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {
return
}
@@ -602,19 +558,7 @@
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
- // Preserving matchBuf across calls is an optimization
- // used to avoid allocating a new slice on each call.
- if cap(r.matchBuf) < len(r.prs) {
- r.matchBuf = make(uint64Slice, len(r.prs))
- }
- mis := r.matchBuf[:len(r.prs)]
- idx := 0
- for _, p := range r.prs {
- mis[idx] = p.Match
- idx++
- }
- sort.Sort(mis)
- mci := mis[len(mis)-r.quorum()]
+ mci := r.prs.Committed()
return r.raftLog.maybeCommit(mci, r.Term)
}
@@ -631,9 +575,14 @@
r.abortLeaderTransfer()
- r.votes = make(map[uint64]bool)
- r.forEachProgress(func(id uint64, pr *Progress) {
- *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
+ r.prs.ResetVotes()
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ *pr = tracker.Progress{
+ Match: 0,
+ Next: r.raftLog.lastIndex() + 1,
+ Inflights: tracker.NewInflights(r.prs.MaxInflight),
+ IsLearner: pr.IsLearner,
+ }
if id == r.id {
pr.Match = r.raftLog.lastIndex()
}
@@ -661,7 +610,7 @@
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
- r.getProgress(r.id).maybeUpdate(li)
+ r.prs.Progress[r.id].MaybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
@@ -734,7 +683,7 @@
// but doesn't change anything else. In particular it does not increase
// r.Term or change r.Vote.
r.step = stepCandidate
- r.votes = make(map[uint64]bool)
+ r.prs.ResetVotes()
r.tick = r.tickElection
r.lead = None
r.state = StatePreCandidate
@@ -755,7 +704,7 @@
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
- r.prs[r.id].becomeReplicate()
+ r.prs.Progress[r.id].BecomeReplicate()
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
@@ -777,7 +726,14 @@
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
+// campaign transitions the raft instance to candidate state. This must only be
+// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
+ if !r.promotable() {
+ // This path should not be hit (callers are supposed to check), but
+ // better safe than sorry.
+ r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
+ }
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
@@ -790,7 +746,7 @@
voteMsg = pb.MsgVote
term = r.Term
}
- if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
+ if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
@@ -800,7 +756,7 @@
}
return
}
- for id := range r.prs {
+ for id := range r.prs.Voters.IDs() {
if id == r.id {
continue
}
@@ -815,21 +771,14 @@
}
}
-func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) {
+func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
if v {
r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
} else {
r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
}
- if _, ok := r.votes[id]; !ok {
- r.votes[id] = v
- }
- for _, vv := range r.votes {
- if vv {
- granted++
- }
- }
- return granted
+ r.prs.RecordVote(id, v)
+ return r.prs.TallyVotes()
}
func (r *raft) Step(m pb.Message) error {
@@ -910,6 +859,10 @@
switch m.Type {
case pb.MsgHup:
if r.state != StateLeader {
+ if !r.promotable() {
+ r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
+ return nil
+ }
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
@@ -985,16 +938,32 @@
r.bcastHeartbeat()
return nil
case pb.MsgCheckQuorum:
- if !r.checkQuorumActive() {
+ // The leader should always see itself as active. As a precaution, handle
+ // the case in which the leader isn't in the configuration any more (for
+ // example if it just removed itself).
+ //
+ // TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
+ // leader steps down when removing itself. I might be missing something.
+ if pr := r.prs.Progress[r.id]; pr != nil {
+ pr.RecentActive = true
+ }
+ if !r.prs.QuorumActive() {
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
}
+ // Mark everyone (but ourselves) as inactive in preparation for the next
+ // CheckQuorum.
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ if id != r.id {
+ pr.RecentActive = false
+ }
+ })
return nil
case pb.MsgProp:
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
- if _, ok := r.prs[r.id]; !ok {
+ if r.prs.Progress[r.id] == nil {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
@@ -1005,11 +974,12 @@
return ErrProposalDropped
}
- for i, e := range m.Entries {
+ for i := range m.Entries {
+ e := &m.Entries[i]
if e.Type == pb.EntryConfChange {
if r.pendingConfIndex > r.raftLog.applied {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
- e.String(), r.pendingConfIndex, r.raftLog.applied)
+ e, r.pendingConfIndex, r.raftLog.applied)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
@@ -1023,7 +993,9 @@
r.bcastAppend()
return nil
case pb.MsgReadIndex:
- if r.quorum() > 1 {
+ // If more than the local vote is needed, go through a full broadcast,
+ // otherwise optimize.
+ if !r.prs.IsSingleton() {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return nil
@@ -1035,6 +1007,8 @@
switch r.readOnly.option {
case ReadOnlySafe:
r.readOnly.addRequest(r.raftLog.committed, m)
+ // The local node automatically acks the request.
+ r.readOnly.recvAck(r.id, m.Entries[0].Data)
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
ri := r.raftLog.committed
@@ -1044,7 +1018,7 @@
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
}
- } else { // there is only one voting member (the leader) in the cluster
+ } else { // only one voting member (the leader) in the cluster
if m.From == None || m.From == r.id { // from leader itself
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else { // from learner member
@@ -1056,7 +1030,7 @@
}
// All other message types require a progress for m.From (pr).
- pr := r.getProgress(m.From)
+ pr := r.prs.Progress[m.From]
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
@@ -1068,30 +1042,30 @@
if m.Reject {
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
- if pr.maybeDecrTo(m.Index, m.RejectHint) {
+ if pr.MaybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
- if pr.State == ProgressStateReplicate {
- pr.becomeProbe()
+ if pr.State == tracker.StateReplicate {
+ pr.BecomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
- if pr.maybeUpdate(m.Index) {
+ if pr.MaybeUpdate(m.Index) {
switch {
- case pr.State == ProgressStateProbe:
- pr.becomeReplicate()
- case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
- r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+ case pr.State == tracker.StateProbe:
+ pr.BecomeReplicate()
+ case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
+ r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
- pr.becomeProbe()
- pr.becomeReplicate()
- case pr.State == ProgressStateReplicate:
- pr.ins.freeTo(m.Index)
+ pr.BecomeProbe()
+ pr.BecomeReplicate()
+ case pr.State == tracker.StateReplicate:
+ pr.Inflights.FreeLE(m.Index)
}
if r.maybeCommit() {
@@ -1118,11 +1092,11 @@
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
- pr.resume()
+ pr.ProbeSent = false
// free one slot for the full inflights window to allow progress.
- if pr.State == ProgressStateReplicate && pr.ins.full() {
- pr.ins.freeFirstOne()
+ if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
+ pr.Inflights.FreeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
@@ -1132,8 +1106,7 @@
return nil
}
- ackCount := r.readOnly.recvAck(m)
- if ackCount < r.quorum() {
+ if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
@@ -1147,26 +1120,32 @@
}
}
case pb.MsgSnapStatus:
- if pr.State != ProgressStateSnapshot {
+ if pr.State != tracker.StateSnapshot {
return nil
}
+ // TODO(tbg): this code is very similar to the snapshot handling in
+ // MsgAppResp above. In fact, the code there is more correct than the
+ // code here and should likely be updated to match (or even better, the
+ // logic pulled into a newly created Progress state machine handler).
if !m.Reject {
- pr.becomeProbe()
+ pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
- pr.snapshotFailure()
- pr.becomeProbe()
+ // NB: the order here matters or we'll be probing erroneously from
+ // the snapshot index, but the snapshot never applied.
+ pr.PendingSnapshot = 0
+ pr.BecomeProbe()
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
// If snapshot finish, wait for the msgAppResp from the remote node before sending
// out the next msgApp.
// If snapshot failure, wait for a heartbeat interval before next try
- pr.pause()
+ pr.ProbeSent = true
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
- if pr.State == ProgressStateReplicate {
- pr.becomeProbe()
+ if pr.State == tracker.StateReplicate {
+ pr.BecomeProbe()
}
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
case pb.MsgTransferLeader:
@@ -1230,17 +1209,17 @@
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
- gr := r.poll(m.From, m.Type, !m.Reject)
- r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
- switch r.quorum() {
- case gr:
+ gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
+ r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
+ switch res {
+ case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
- case len(r.votes) - gr:
+ case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
@@ -1343,11 +1322,51 @@
}
// restore recovers the state machine from a snapshot. It restores the log and the
-// configuration of state machine.
+// configuration of state machine. If this method returns false, the snapshot was
+// ignored, either because it was obsolete or because of an error.
func (r *raft) restore(s pb.Snapshot) bool {
if s.Metadata.Index <= r.raftLog.committed {
return false
}
+ if r.state != StateFollower {
+ // This is defense-in-depth: if the leader somehow ended up applying a
+ // snapshot, it could move into a new term without moving into a
+ // follower state. This should never fire, but if it did, we'd have
+ // prevented damage by returning early, so log only a loud warning.
+ //
+ // At the time of writing, the instance is guaranteed to be in follower
+ // state when this method is called.
+ r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
+ r.becomeFollower(r.Term+1, None)
+ return false
+ }
+
+ // More defense-in-depth: throw away snapshot if recipient is not in the
+ // config. This shouuldn't ever happen (at the time of writing) but lots of
+ // code here and there assumes that r.id is in the progress tracker.
+ found := false
+ cs := s.Metadata.ConfState
+ for _, set := range [][]uint64{
+ cs.Nodes,
+ cs.Learners,
+ } {
+ for _, id := range set {
+ if id == r.id {
+ found = true
+ break
+ }
+ }
+ }
+ if !found {
+ r.logger.Warningf(
+ "%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
+ r.id, cs,
+ )
+ return false
+ }
+
+ // Now go ahead and actually restore.
+
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
@@ -1355,123 +1374,124 @@
return false
}
- // The normal peer can't become learner.
- if !r.isLearner {
- for _, id := range s.Metadata.ConfState.Learners {
- if id == r.id {
- r.logger.Errorf("%x can't become learner when restores snapshot [index: %d, term: %d]", r.id, s.Metadata.Index, s.Metadata.Term)
- return false
- }
- }
- }
-
- r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
- r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
-
r.raftLog.restore(s)
- r.prs = make(map[uint64]*Progress)
- r.learnerPrs = make(map[uint64]*Progress)
- r.restoreNode(s.Metadata.ConfState.Nodes, false)
- r.restoreNode(s.Metadata.ConfState.Learners, true)
- return true
-}
-func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
- for _, n := range nodes {
- match, next := uint64(0), r.raftLog.lastIndex()+1
- if n == r.id {
- match = next - 1
- r.isLearner = isLearner
- }
- r.setProgress(n, match, next, isLearner)
- r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n))
+ // Reset the configuration and add the (potentially updated) peers in anew.
+ r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
+ for _, id := range s.Metadata.ConfState.Nodes {
+ r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode})
}
+ for _, id := range s.Metadata.ConfState.Learners {
+ r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode})
+ }
+
+ pr := r.prs.Progress[r.id]
+ pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
+
+ r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
+ r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
+ return true
}
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
- _, ok := r.prs[r.id]
- return ok
+ pr := r.prs.Progress[r.id]
+ return pr != nil && !pr.IsLearner
}
-func (r *raft) addNode(id uint64) {
- r.addNodeOrLearnerNode(id, false)
-}
+func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
+ addNodeOrLearnerNode := func(id uint64, isLearner bool) {
+ // NB: this method is intentionally hidden from view. All mutations of
+ // the conf state must call applyConfChange directly.
+ pr := r.prs.Progress[id]
+ if pr == nil {
+ r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
+ } else {
+ if isLearner && !pr.IsLearner {
+ // Can only change Learner to Voter.
+ //
+ // TODO(tbg): why?
+ r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
+ return
+ }
-func (r *raft) addLearner(id uint64) {
- r.addNodeOrLearnerNode(id, true)
-}
+ if isLearner == pr.IsLearner {
+ // Ignore any redundant addNode calls (which can happen because the
+ // initial bootstrapping entries are applied twice).
+ return
+ }
-func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
- pr := r.getProgress(id)
- if pr == nil {
- r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
- } else {
- if isLearner && !pr.IsLearner {
- // can only change Learner to Voter
- r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
- return
+ // Change Learner to Voter, use origin Learner progress.
+ r.prs.RemoveAny(id)
+ r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
+ pr.IsLearner = false
+ *r.prs.Progress[id] = *pr
}
- if isLearner == pr.IsLearner {
- // Ignore any redundant addNode calls (which can happen because the
- // initial bootstrapping entries are applied twice).
- return
+ // When a node is first added, we should mark it as recently active.
+ // Otherwise, CheckQuorum may cause us to step down if it is invoked
+ // before the added node has had a chance to communicate with us.
+ r.prs.Progress[id].RecentActive = true
+ }
+
+ var removed int
+ if cc.NodeID != None {
+ switch cc.Type {
+ case pb.ConfChangeAddNode:
+ addNodeOrLearnerNode(cc.NodeID, false /* isLearner */)
+ case pb.ConfChangeAddLearnerNode:
+ addNodeOrLearnerNode(cc.NodeID, true /* isLearner */)
+ case pb.ConfChangeRemoveNode:
+ removed++
+ r.prs.RemoveAny(cc.NodeID)
+ case pb.ConfChangeUpdateNode:
+ default:
+ panic("unexpected conf type")
}
-
- // change Learner to Voter, use origin Learner progress
- delete(r.learnerPrs, id)
- pr.IsLearner = false
- r.prs[id] = pr
}
- if r.id == id {
- r.isLearner = isLearner
+ r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
+ // Now that the configuration is updated, handle any side effects.
+
+ cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()}
+ pr, ok := r.prs.Progress[r.id]
+
+ // Update whether the node itself is a learner, resetting to false when the
+ // node is removed.
+ r.isLearner = ok && pr.IsLearner
+
+ if (!ok || r.isLearner) && r.state == StateLeader {
+ // This node is leader and was removed or demoted. We prevent demotions
+ // at the time writing but hypothetically we handle them the same way as
+ // removing the leader: stepping down into the next Term.
+ //
+ // TODO(tbg): step down (for sanity) and ask follower with largest Match
+ // to TimeoutNow (to avoid interruption). This might still drop some
+ // proposals but it's better than nothing.
+ //
+ // TODO(tbg): test this branch. It is untested at the time of writing.
+ return cs
}
- // When a node is first added, we should mark it as recently active.
- // Otherwise, CheckQuorum may cause us to step down if it is invoked
- // before the added node has a chance to communicate with us.
- pr = r.getProgress(id)
- pr.RecentActive = true
-}
-
-func (r *raft) removeNode(id uint64) {
- r.delProgress(id)
-
- // do not try to commit or abort transferring if there is no nodes in the cluster.
- if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
- return
+ // The remaining steps only make sense if this node is the leader and there
+ // are other nodes.
+ if r.state != StateLeader || len(cs.Nodes) == 0 {
+ return cs
}
-
- // The quorum size is now smaller, so see if any pending entries can
- // be committed.
- if r.maybeCommit() {
- r.bcastAppend()
+ if removed > 0 {
+ // The quorum size may have been reduced (but not to zero), so see if
+ // any pending entries can be committed.
+ if r.maybeCommit() {
+ r.bcastAppend()
+ }
}
- // If the removed node is the leadTransferee, then abort the leadership transferring.
- if r.state == StateLeader && r.leadTransferee == id {
+ // If the the leadTransferee was removed, abort the leadership transfer.
+ if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
}
-}
-func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
- if !isLearner {
- delete(r.learnerPrs, id)
- r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
- return
- }
-
- if _, ok := r.prs[id]; ok {
- panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id))
- }
- r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true}
-}
-
-func (r *raft) delProgress(id uint64) {
- delete(r.prs, id)
- delete(r.learnerPrs, id)
+ return cs
}
func (r *raft) loadState(state pb.HardState) {
@@ -1494,29 +1514,6 @@
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}
-// checkQuorumActive returns true if the quorum is active from
-// the view of the local raft state machine. Otherwise, it returns
-// false.
-// checkQuorumActive also resets all RecentActive to false.
-func (r *raft) checkQuorumActive() bool {
- var act int
-
- r.forEachProgress(func(id uint64, pr *Progress) {
- if id == r.id { // self is always active
- act++
- return
- }
-
- if pr.RecentActive && !pr.IsLearner {
- act++
- }
-
- pr.RecentActive = false
- })
-
- return act >= r.quorum()
-}
-
func (r *raft) sendTimeoutNow(to uint64) {
r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
}