VOL-1921 - updated to use go mod
Change-Id: I8d5187fa91fa619494f972bc29d3bd61e5be3a82
diff --git a/vendor/go.etcd.io/etcd/raft/raft.go b/vendor/go.etcd.io/etcd/raft/raft.go
index 01e23ec..cdcb43d 100644
--- a/vendor/go.etcd.io/etcd/raft/raft.go
+++ b/vendor/go.etcd.io/etcd/raft/raft.go
@@ -20,6 +20,7 @@
"fmt"
"math"
"math/rand"
+ "sort"
"strings"
"sync"
"time"
@@ -263,7 +264,8 @@
maxMsgSize uint64
maxUncommittedSize uint64
- prs tracker.ProgressTracker
+ // TODO(tbg): rename to trk.
+ prs tracker.ProgressTracker
state StateType
@@ -327,18 +329,18 @@
if err != nil {
panic(err) // TODO(bdarnell)
}
- peers := c.peers
- learners := c.learners
- if len(cs.Nodes) > 0 || len(cs.Learners) > 0 {
- if len(peers) > 0 || len(learners) > 0 {
+
+ if len(c.peers) > 0 || len(c.learners) > 0 {
+ if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
// TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
- panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)")
+ panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)")
}
- peers = cs.Nodes
- learners = cs.Learners
+ cs.Voters = c.peers
+ cs.Learners = c.learners
}
+
r := &raft{
id: c.ID,
lead: None,
@@ -355,16 +357,17 @@
readOnly: newReadOnly(c.ReadOnlyOption),
disableProposalForwarding: c.DisableProposalForwarding,
}
- for _, p := range peers {
- // Add node to active config.
- r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p})
- }
- for _, p := range learners {
- // Add learner to active config.
- r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p})
- }
- if !isHardStateEqual(hs, emptyState) {
+ cfg, prs, err := confchange.Restore(confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: raftlog.lastIndex(),
+ }, cs)
+ if err != nil {
+ panic(err)
+ }
+ assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
+
+ if !IsEmptyHardState(hs) {
r.loadState(hs)
}
if c.Applied > 0 {
@@ -527,7 +530,6 @@
if id == r.id {
return
}
-
r.sendAppend(id)
})
}
@@ -551,6 +553,46 @@
})
}
+func (r *raft) advance(rd Ready) {
+ // If entries were applied (or a snapshot), update our cursor for
+ // the next Ready. Note that if the current HardState contains a
+ // new Commit index, this does not mean that we're also applying
+ // all of the new entries due to commit pagination by size.
+ if index := rd.appliedCursor(); index > 0 {
+ r.raftLog.appliedTo(index)
+ if r.prs.Config.AutoLeave && index >= r.pendingConfIndex && r.state == StateLeader {
+ // If the current (and most recent, at least for this leader's term)
+ // configuration should be auto-left, initiate that now.
+ ccdata, err := (&pb.ConfChangeV2{}).Marshal()
+ if err != nil {
+ panic(err)
+ }
+ ent := pb.Entry{
+ Type: pb.EntryConfChangeV2,
+ Data: ccdata,
+ }
+ if !r.appendEntry(ent) {
+ // If we could not append the entry, bump the pending conf index
+ // so that we'll try again later.
+ //
+ // TODO(tbg): test this case.
+ r.pendingConfIndex = r.raftLog.lastIndex()
+ } else {
+ r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
+ }
+ }
+ }
+ r.reduceUncommittedSize(rd.CommittedEntries)
+
+ if len(rd.Entries) > 0 {
+ e := rd.Entries[len(rd.Entries)-1]
+ r.raftLog.stableTo(e.Index, e.Term)
+ }
+ if !IsEmptySnap(rd.Snapshot) {
+ r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
+ }
+}
+
// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
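The advance helper added above keys off rd.appliedCursor() to decide how far the applied index may move once a Ready has been handled. As a hedged illustration of what such a cursor computes (simplified stand-in types, not the raft package's own code):

package raftsketch

// appliedCursorOf mirrors the idea behind rd.appliedCursor() as used by the
// advance helper above: after a Ready has been handled, the applied index can
// move up to the last committed entry that was delivered, or to the snapshot
// index if the Ready carried a snapshot; otherwise it stays put (returns 0).
// readyView is a simplified stand-in, not the raft package's Ready type.
type readyView struct {
	committedEntryIndexes []uint64 // indexes of rd.CommittedEntries
	snapshotIndex         uint64   // rd.Snapshot.Metadata.Index, 0 if no snapshot
}

func appliedCursorOf(rd readyView) uint64 {
	if n := len(rd.committedEntryIndexes); n > 0 {
		return rd.committedEntryIndexes[n-1]
	}
	return rd.snapshotIndex
}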
@@ -753,7 +795,16 @@
}
return
}
- for id := range r.prs.Voters.IDs() {
+ var ids []uint64
+ {
+ idMap := r.prs.Voters.IDs()
+ ids = make([]uint64, 0, len(idMap))
+ for id := range idMap {
+ ids = append(ids, id)
+ }
+ sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
+ }
+ for _, id := range ids {
if id == r.id {
continue
}
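A note on why the hunk above collects and sorts the voter IDs before ranging over them: Go randomizes map iteration order, so ranging directly over r.prs.Voters.IDs() would visit the voters in a different order on every run; sorting the keys first makes the order deterministic, which matters mostly for reproducible behavior in tests. A minimal standalone sketch of the same idiom (the map literal is only a stand-in for the voter set):

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Stand-in for r.prs.Voters.IDs(): map iteration order is randomized.
	idMap := map[uint64]struct{}{3: {}, 1: {}, 2: {}}

	// Collect the keys and sort them so callers see a stable order.
	ids := make([]uint64, 0, len(idMap))
	for id := range idMap {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })

	fmt.Println(ids) // always [1 2 3]
}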
@@ -880,12 +931,6 @@
}
case pb.MsgVote, pb.MsgPreVote:
- if r.isLearner {
- // TODO: learner may need to vote, in case of node down when confchange.
- r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
- r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
- return nil
- }
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
@@ -894,6 +939,24 @@
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// ...and we believe the candidate is up to date.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
+ // Note: it turns out that learners must be allowed to cast votes.
+ // This seems counter-intuitive but is necessary in the situation in which
+ // a learner has been promoted (i.e. is now a voter) but has not learned
+ // about this yet.
+ // For example, consider a group in which id=1 is a learner and id=2 and
+ // id=3 are voters. A configuration change promoting 1 can be committed on
+ // the quorum `{2,3}` without the config change being appended to the
+ // learner's log. If the leader (say 2) fails, there are de facto two
+ // voters remaining. Only 3 can win an election (due to its log containing
+ // all committed entries), but to do so it will need 1 to vote. But 1
+ // considers itself a learner and will continue to do so until 3 has
+ // stepped up as leader, replicates the conf change to 1, and 1 applies it.
+ // Ultimately, by receiving a request to vote, the learner realizes that
+ // the candidate believes it to be a voter, and that it should act
+ // accordingly. The candidate's config may be stale, too; but in that case
+ // it won't win the election, at least in the absence of the bug discussed
+ // in:
+ // https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
// When responding to Msg{Pre,}Vote messages we include the term
@@ -973,10 +1036,36 @@
for i := range m.Entries {
e := &m.Entries[i]
+ var cc pb.ConfChangeI
if e.Type == pb.EntryConfChange {
- if r.pendingConfIndex > r.raftLog.applied {
- r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
- e, r.pendingConfIndex, r.raftLog.applied)
+ var ccc pb.ConfChange
+ if err := ccc.Unmarshal(e.Data); err != nil {
+ panic(err)
+ }
+ cc = ccc
+ } else if e.Type == pb.EntryConfChangeV2 {
+ var ccc pb.ConfChangeV2
+ if err := ccc.Unmarshal(e.Data); err != nil {
+ panic(err)
+ }
+ cc = ccc
+ }
+ if cc != nil {
+ alreadyPending := r.pendingConfIndex > r.raftLog.applied
+ alreadyJoint := len(r.prs.Config.Voters[1]) > 0
+ wantsLeaveJoint := len(cc.AsV2().Changes) == 0
+
+ var refused string
+ if alreadyPending {
+ refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
+ } else if alreadyJoint && !wantsLeaveJoint {
+ refused = "must transition out of joint config first"
+ } else if !alreadyJoint && wantsLeaveJoint {
+ refused = "not in joint state; refusing empty conf change"
+ }
+
+ if refused != "" {
+ r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
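The three guards added above decide whether a leader refuses a proposed config change. As a hedged sketch of just that decision logic (illustrative names, not part of the raft package API):

package main

import "fmt"

// refuseConfChange mirrors the checks introduced above: an earlier config
// change may still be unapplied, a joint configuration has to be left before
// another change is accepted, and an empty (leave-joint) change only makes
// sense while the configuration is actually joint.
func refuseConfChange(alreadyPending, alreadyJoint, wantsLeaveJoint bool) string {
	switch {
	case alreadyPending:
		return "possible unapplied conf change"
	case alreadyJoint && !wantsLeaveJoint:
		return "must transition out of joint config first"
	case !alreadyJoint && wantsLeaveJoint:
		return "not in joint state; refusing empty conf change"
	}
	return "" // accepted
}

func main() {
	fmt.Println(refuseConfChange(false, true, false))           // refused: still joint
	fmt.Println(refuseConfChange(false, false, false) == "")    // simple change accepted
}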
@@ -1010,7 +1099,7 @@
case ReadOnlyLeaseBased:
ri := r.raftLog.committed
if m.From == None || m.From == r.id { // from local member
- r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+ r.readStates = append(r.readStates, ReadState{Index: ri, RequestCtx: m.Entries[0].Data})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
@@ -1037,7 +1126,7 @@
pr.RecentActive = true
if m.Reject {
- r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
+ r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
if pr.MaybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
@@ -1053,6 +1142,9 @@
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
+ // TODO(tbg): we should also enter this branch if a snapshot is
+ // received that is below pr.PendingSnapshot but which makes it
+ // possible to use the log again.
r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
@@ -1134,8 +1226,8 @@
pr.BecomeProbe()
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
- // If snapshot finish, wait for the msgAppResp from the remote node before sending
- // out the next msgApp.
+ // If the snapshot finished, wait for the MsgAppResp from the remote node before sending
+ // out the next MsgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.ProbeSent = true
case pb.MsgUnreachable:
@@ -1294,7 +1386,7 @@
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
- r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
+ r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
@@ -1339,12 +1431,12 @@
}
// More defense-in-depth: throw away snapshot if recipient is not in the
- // config. This shouuldn't ever happen (at the time of writing) but lots of
+ // config. This shouldn't ever happen (at the time of writing) but lots of
// code here and there assumes that r.id is in the progress tracker.
found := false
cs := s.Metadata.ConfState
for _, set := range [][]uint64{
- cs.Nodes,
+ cs.Voters,
cs.Learners,
} {
for _, id := range set {
@@ -1375,12 +1467,18 @@
// Reset the configuration and add the (potentially updated) peers in anew.
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
- for _, id := range s.Metadata.ConfState.Nodes {
- r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode})
+ cfg, prs, err := confchange.Restore(confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: r.raftLog.lastIndex(),
+ }, cs)
+
+ if err != nil {
+ // This should never happen. Either there's a bug in our config change
+ // handling or the client corrupted the conf change.
+ panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
}
- for _, id := range s.Metadata.ConfState.Learners {
- r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode})
- }
+
+ assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
pr := r.prs.Progress[r.id]
pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
@@ -1397,21 +1495,40 @@
return pr != nil && !pr.IsLearner
}
-func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
- cfg, prs, err := confchange.Changer{
- Tracker: r.prs,
- LastIndex: r.raftLog.lastIndex(),
- }.Simple(cc)
+func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
+ cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {
+ changer := confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: r.raftLog.lastIndex(),
+ }
+ if cc.LeaveJoint() {
+ return changer.LeaveJoint()
+ } else if autoLeave, ok := cc.EnterJoint(); ok {
+ return changer.EnterJoint(autoLeave, cc.Changes...)
+ }
+ return changer.Simple(cc.Changes...)
+ }()
+
if err != nil {
+ // TODO(tbg): return the error to the caller.
panic(err)
}
+
+ return r.switchToConfig(cfg, prs)
+}
+
+// switchToConfig reconfigures this node to use the provided configuration. It
+// updates the in-memory state and, when necessary, carries out additional
+// actions such as reacting to the removal of nodes or changed quorum
+// requirements.
+//
+// The inputs usually result from restoring a ConfState or applying a ConfChange.
+func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState {
r.prs.Config = cfg
r.prs.Progress = prs
r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
- // Now that the configuration is updated, handle any side effects.
-
- cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()}
+ cs := r.prs.ConfState()
pr, ok := r.prs.Progress[r.id]
// Update whether the node itself is a learner, resetting to false when the
@@ -1433,13 +1550,21 @@
// The remaining steps only make sense if this node is the leader and there
// are other nodes.
- if r.state != StateLeader || len(cs.Nodes) == 0 {
+ if r.state != StateLeader || len(cs.Voters) == 0 {
return cs
}
+
if r.maybeCommit() {
- // The quorum size may have been reduced (but not to zero), so see if
- // any pending entries can be committed.
+ // If the configuration change means that more entries are committed now,
+ // broadcast/append to everyone in the updated config.
r.bcastAppend()
+ } else {
+ // Otherwise, still probe the newly added replicas; there's no reason to
+ // let them wait out a heartbeat interval (or the next incoming
+ // proposal).
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ r.maybeSendAppend(id, false /* sendIfEmpty */)
+ })
}
// If the leadTransferee was removed, abort the leadership transfer.
if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {