VOL-2112 move to voltha-lib-go
Change-Id: Ic1af08003c1d2c698c0cce371e64f47b47b8d875
diff --git a/vendor/go.etcd.io/etcd/raft/node.go b/vendor/go.etcd.io/etcd/raft/node.go
index 2ec2c3a..ab6185b 100644
--- a/vendor/go.etcd.io/etcd/raft/node.go
+++ b/vendor/go.etcd.io/etcd/raft/node.go
@@ -132,10 +132,20 @@
// Propose proposes that data be appended to the log. Note that proposals can be lost without
// notice, therefore it is user's job to ensure proposal retries.
Propose(ctx context.Context, data []byte) error
- // ProposeConfChange proposes config change.
- // At most one ConfChange can be in the process of going through consensus.
- // Application needs to call ApplyConfChange when applying EntryConfChange type entry.
- ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
+ // ProposeConfChange proposes a configuration change. Like any proposal, the
+ // configuration change may be dropped with or without an error being
+ // returned. In particular, configuration changes are dropped unless the
+ // leader has certainty that there is no prior unapplied configuration
+ // change in its log.
+ //
+ // The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2
+ // message. The latter allows arbitrary configuration changes via joint
+ // consensus, notably including replacing a voter. Passing a ConfChangeV2
+ // message is only allowed if all Nodes participating in the cluster run a
+ // version of this library aware of the V2 API. See pb.ConfChangeV2 for
+ // usage details and semantics.
+ ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error
+
// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error
@@ -156,11 +166,13 @@
// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
// progress, it can call Advance before finishing applying the last ready.
Advance()
- // ApplyConfChange applies config change to the local node.
- // Returns an opaque ConfState protobuf which must be recorded
- // in snapshots. Will never return nil; it returns a pointer only
- // to match MemoryStorage.Compact.
- ApplyConfChange(cc pb.ConfChange) *pb.ConfState
+ // ApplyConfChange applies a config change (previously passed to
+ // ProposeConfChange) to the node. This must be called whenever a config
+ // change is observed in Ready.CommittedEntries.
+ //
+ // Returns an opaque non-nil ConfState protobuf which must be recorded in
+ // snapshots.
+ ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState
// TransferLeadership attempts to transfer leadership to the given transferee.
TransferLeadership(ctx context.Context, lead, transferee uint64)
@@ -197,40 +209,21 @@
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
+//
+// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
- r := newRaft(c)
- // become the follower at term 1 and apply initial configuration
- // entries of term 1
- r.becomeFollower(1, None)
- for _, peer := range peers {
- cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
- d, err := cc.Marshal()
- if err != nil {
- panic("unexpected marshal error")
- }
- e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
- r.raftLog.append(e)
+ if len(peers) == 0 {
+ panic("no peers given; use RestartNode instead")
}
- // Mark these initial entries as committed.
- // TODO(bdarnell): These entries are still unstable; do we need to preserve
- // the invariant that committed < unstable?
- r.raftLog.committed = r.raftLog.lastIndex()
- // Now apply them, mainly so that the application can call Campaign
- // immediately after StartNode in tests. Note that these nodes will
- // be added to raft twice: here and when the application's Ready
- // loop calls ApplyConfChange. The calls to addNode must come after
- // all calls to raftLog.append so progress.next is set after these
- // bootstrapping entries (it is an error if we try to append these
- // entries since they have already been committed).
- // We do not set raftLog.applied so the application will be able
- // to observe all conf changes via Ready.CommittedEntries.
- for _, peer := range peers {
- r.addNode(peer.ID)
+ rn, err := NewRawNode(c)
+ if err != nil {
+ panic(err)
}
+ rn.Bootstrap(peers)
- n := newNode()
- n.logger = c.Logger
- go n.run(r)
+ n := newNode(rn)
+
+ go n.run()
return &n
}
@@ -239,11 +232,12 @@
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
- r := newRaft(c)
-
- n := newNode()
- n.logger = c.Logger
- go n.run(r)
+ rn, err := NewRawNode(c)
+ if err != nil {
+ panic(err)
+ }
+ n := newNode(rn)
+ go n.run()
return &n
}
@@ -256,7 +250,7 @@
type node struct {
propc chan msgWithResult
recvc chan pb.Message
- confc chan pb.ConfChange
+ confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
@@ -265,14 +259,14 @@
stop chan struct{}
status chan chan Status
- logger Logger
+ rn *RawNode
}
-func newNode() node {
+func newNode(rn *RawNode) node {
return node{
propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
- confc: make(chan pb.ConfChange),
+ confc: make(chan pb.ConfChangeV2),
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),
advancec: make(chan struct{}),
@@ -283,6 +277,7 @@
done: make(chan struct{}),
stop: make(chan struct{}),
status: make(chan chan Status),
+ rn: rn,
}
}
@@ -298,30 +293,30 @@
<-n.done
}
-func (n *node) run(r *raft) {
+func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
- var prevLastUnstablei, prevLastUnstablet uint64
- var havePrevLastUnstablei bool
- var prevSnapi uint64
- var applyingToI uint64
var rd Ready
+ r := n.rn.raft
+
lead := None
- prevSoftSt := r.softState()
- prevHardSt := emptyState
for {
if advancec != nil {
readyc = nil
- } else {
- rd = newReady(r, prevSoftSt, prevHardSt)
- if rd.containsUpdates() {
- readyc = n.readyc
- } else {
- readyc = nil
- }
+ } else if n.rn.HasReady() {
+ // Populate a Ready. Note that this Ready is not guaranteed to
+ // actually be handled. We will arm readyc, but there's no guarantee
+ // that we will actually send on it. It's possible that we will
+ // service another channel instead, loop around, and then populate
+ // the Ready again. We could instead force the previous Ready to be
+ // handled first, but it's generally good to emit larger Readys plus
+ // it simplifies testing (by emitting less frequently and more
+ // predictably).
+ rd = n.rn.readyWithoutAccept()
+ readyc = n.readyc
}
if lead != r.lead {
@@ -353,76 +348,46 @@
}
case m := <-n.recvc:
// filter out response message from unknown From.
- if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
+ if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
case cc := <-n.confc:
- if cc.NodeID == None {
- select {
- case n.confstatec <- pb.ConfState{
- Nodes: r.nodes(),
- Learners: r.learnerNodes()}:
- case <-n.done:
+ _, okBefore := r.prs.Progress[r.id]
+ cs := r.applyConfChange(cc)
+ // If the node was removed, block incoming proposals. Note that we
+ // only do this if the node was in the config before. Nodes may be
+ // a member of the group without knowing this (when they're catching
+ // up on the log and don't have the latest config) and we don't want
+ // to block the proposal channel in that case.
+ //
+ // NB: propc is reset when the leader changes, which, if we learn
+ // about it, sort of implies that we got readded, maybe? This isn't
+ // very sound and likely has bugs.
+ if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
+ var found bool
+ for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
+ for _, id := range sl {
+ if id == r.id {
+ found = true
+ }
+ }
}
- break
- }
- switch cc.Type {
- case pb.ConfChangeAddNode:
- r.addNode(cc.NodeID)
- case pb.ConfChangeAddLearnerNode:
- r.addLearner(cc.NodeID)
- case pb.ConfChangeRemoveNode:
- // block incoming proposal when local node is
- // removed
- if cc.NodeID == r.id {
+ if !found {
propc = nil
}
- r.removeNode(cc.NodeID)
- case pb.ConfChangeUpdateNode:
- default:
- panic("unexpected conf type")
}
select {
- case n.confstatec <- pb.ConfState{
- Nodes: r.nodes(),
- Learners: r.learnerNodes()}:
+ case n.confstatec <- cs:
case <-n.done:
}
case <-n.tickc:
- r.tick()
+ n.rn.Tick()
case readyc <- rd:
- if rd.SoftState != nil {
- prevSoftSt = rd.SoftState
- }
- if len(rd.Entries) > 0 {
- prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
- prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
- havePrevLastUnstablei = true
- }
- if !IsEmptyHardState(rd.HardState) {
- prevHardSt = rd.HardState
- }
- if !IsEmptySnap(rd.Snapshot) {
- prevSnapi = rd.Snapshot.Metadata.Index
- }
- if index := rd.appliedCursor(); index != 0 {
- applyingToI = index
- }
-
- r.msgs = nil
- r.readStates = nil
- r.reduceUncommittedSize(rd.CommittedEntries)
+ n.rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
- if applyingToI != 0 {
- r.raftLog.appliedTo(applyingToI)
- applyingToI = 0
- }
- if havePrevLastUnstablei {
- r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
- havePrevLastUnstablei = false
- }
- r.raftLog.stableSnapTo(prevSnapi)
+ n.rn.Advance(rd)
+ rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)
@@ -440,7 +405,7 @@
case n.tickc <- struct{}{}:
case <-n.done:
default:
- n.logger.Warningf("A tick missed to fire. Node blocks too long!")
+ n.rn.raft.logger.Warningf("%x A tick missed to fire. Node blocks too long!", n.rn.raft.id)
}
}
@@ -459,12 +424,20 @@
return n.step(ctx, m)
}
-func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
- data, err := cc.Marshal()
+func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) {
+ typ, data, err := pb.MarshalConfChange(c)
+ if err != nil {
+ return pb.Message{}, err
+ }
+ return pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: typ, Data: data}}}, nil
+}
+
+func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
+ msg, err := confChangeToMsg(cc)
if err != nil {
return err
}
- return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
+ return n.Step(ctx, msg)
}
func (n *node) step(ctx context.Context, m pb.Message) error {
@@ -525,10 +498,10 @@
}
}
-func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
+func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
var cs pb.ConfState
select {
- case n.confc <- cc:
+ case n.confc <- cc.AsV2():
case <-n.done:
}
select {