VOL-2112 move to voltha-lib-go
Change-Id: I3435b8acb982deeab6b6ac28e798d7722ad01d0a
diff --git a/vendor/go.etcd.io/etcd/raft/README.md b/vendor/go.etcd.io/etcd/raft/README.md
index a78e5f7..83cf040 100644
--- a/vendor/go.etcd.io/etcd/raft/README.md
+++ b/vendor/go.etcd.io/etcd/raft/README.md
@@ -3,7 +3,7 @@
Raft is a protocol with which a cluster of nodes can maintain a replicated state machine.
The state machine is kept in sync through the use of a replicated log.
For more details on Raft, see "In Search of an Understandable Consensus Algorithm"
-(https://ramcloud.stanford.edu/raft.pdf) by Diego Ongaro and John Ousterhout.
+(https://raft.github.io/raft.pdf) by Diego Ongaro and John Ousterhout.
This Raft library is stable and feature complete. As of 2016, it is **the most widely used** Raft library in production, serving tens of thousands clusters each day. It powers distributed systems such as etcd, Kubernetes, Docker Swarm, Cloud Foundry Diego, CockroachDB, TiDB, Project Calico, Flannel, and more.
@@ -190,7 +190,7 @@
## Implementation notes
-This implementation is up to date with the final Raft thesis (https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although this implementation of the membership change protocol differs somewhat from that described in chapter 4. The key invariant that membership changes happen one node at a time is preserved, but in our implementation the membership change takes effect when its entry is applied, not when it is added to the log (so the entry is committed under the old membership instead of the new). This is equivalent in terms of safety, since the old and new configurations are guaranteed to overlap.
+This implementation is up to date with the final Raft thesis (https://github.com/ongardie/dissertation/blob/master/stanford.pdf), although this implementation of the membership change protocol differs somewhat from that described in chapter 4. The key invariant that membership changes happen one node at a time is preserved, but in our implementation the membership change takes effect when its entry is applied, not when it is added to the log (so the entry is committed under the old membership instead of the new). This is equivalent in terms of safety, since the old and new configurations are guaranteed to overlap.
To ensure there is no attempt to commit two membership changes at once by matching log positions (which would be unsafe since they should have different quorum requirements), any proposed membership change is simply disallowed while any uncommitted change appears in the leader's log.
diff --git a/vendor/go.etcd.io/etcd/raft/bootstrap.go b/vendor/go.etcd.io/etcd/raft/bootstrap.go
new file mode 100644
index 0000000..bd82b20
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/bootstrap.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+ "errors"
+
+ pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+// Bootstrap initializes the RawNode for first use by appending configuration
+// changes for the supplied peers. This method returns an error if peers is
+// empty or if the Storage is nonempty (i.e. its LastIndex is not zero).
+//
+// It is recommended that instead of calling this method, applications bootstrap
+// their state manually by setting up a Storage that has a first index > 1 and
+// which stores the desired ConfState as its InitialState.
+func (rn *RawNode) Bootstrap(peers []Peer) error {
+	if len(peers) == 0 {
+		return errors.New("must provide at least one peer to Bootstrap")
+	}
+	lastIndex, err := rn.raft.raftLog.storage.LastIndex()
+	if err != nil {
+		return err
+	}
+
+	if lastIndex != 0 {
+		return errors.New("can't bootstrap a nonempty Storage")
+	}
+
+	// The initial entries are faked out below, but nothing has been
+	// persisted. Start with an empty HardState (thus the first Ready will
+	// emit a HardState update for the app to persist).
+	rn.prevHardSt = emptyState
+
+	// TODO(tbg): remove StartNode and give the application the right tools to
+	// bootstrap the initial membership in a cleaner way.
+	rn.raft.becomeFollower(1, None)
+	ents := make([]pb.Entry, len(peers))
+	for i, peer := range peers {
+		cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
+		data, err := cc.Marshal()
+		if err != nil {
+			return err
+		}
+
+		ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
+	}
+	rn.raft.raftLog.append(ents...)
+
+	// Now apply them, mainly so that the application can call Campaign
+	// immediately after StartNode in tests. Note that these nodes will
+	// be added to raft twice: here and when the application's Ready
+	// loop calls ApplyConfChange. The calls to applyConfChange must come
+	// after all calls to raftLog.append so progress.next is set after these
+	// bootstrapping entries (it is an error if we try to append these
+	// entries since they have already been committed).
+	// We do not set raftLog.applied so the application will be able
+	// to observe all conf changes via Ready.CommittedEntries.
+	//
+	// TODO(bdarnell): These entries are still unstable; do we need to preserve
+	// the invariant that committed < unstable?
+	rn.raft.raftLog.committed = uint64(len(ents))
+	for _, peer := range peers {
+		rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
+	}
+	return nil
+}
diff --git a/vendor/go.etcd.io/etcd/raft/confchange/confchange.go b/vendor/go.etcd.io/etcd/raft/confchange/confchange.go
new file mode 100644
index 0000000..a0dc486
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/confchange/confchange.go
@@ -0,0 +1,425 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package confchange
+
+import (
+ "errors"
+ "fmt"
+ "strings"
+
+ "go.etcd.io/etcd/raft/quorum"
+ pb "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/raft/tracker"
+)
+
+// Changer facilitates configuration changes. It exposes methods to handle
+// simple and joint consensus while validating (and refusing) invalid
+// configuration changes before they affect the active configuration.
+type Changer struct {
+	Tracker tracker.ProgressTracker
+	// LastIndex seeds Progress.Next for newly tracked peers (see initProgress).
+	LastIndex uint64
+}
+
+// EnterJoint moves a non-joint configuration into a joint one. It verifies
+// that the outgoing (=right) majority config is empty and then fills it with a
+// copy of the incoming (=left) majority config, i.e. it transitions from
+//
+//     (1 2 3)&&()
+// to
+//     (1 2 3)&&(1 2 3).
+//
+// Afterwards the supplied changes are applied to the incoming majority config
+// and autoLeave is recorded in the result. In terms of the Raft thesis[1]
+// (Section 4.3) the joint configuration corresponds to `C_{new,old}`.
+//
+// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
+func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
+	cfg, prs, err := c.checkAndCopy()
+	switch {
+	case err != nil:
+		return c.err(err)
+	case joint(cfg):
+		return c.err(errors.New("config is already joint"))
+	case len(incoming(cfg.Voters)) == 0:
+		// Adding nodes to an empty config is allowed for convenience (testing
+		// and bootstrap), but entering a joint state from it is not.
+		return c.err(errors.New("can't make a zero-voter config joint"))
+	}
+
+	// Seed the outgoing half with a fresh copy of the incoming voters.
+	out := quorum.MajorityConfig{}
+	for id := range incoming(cfg.Voters) {
+		out[id] = struct{}{}
+	}
+	*outgoingPtr(&cfg.Voters) = out
+
+	// The requested changes only touch the incoming half (see apply); the
+	// outgoing half keeps the voters copied above.
+	if err := c.apply(&cfg, prs, ccs...); err != nil {
+		return c.err(err)
+	}
+	cfg.AutoLeave = autoLeave
+	return checkAndReturn(cfg, prs)
+}
+
+// LeaveJoint transitions out of a joint configuration. It is an error to call
+// this method if the configuration is not joint, i.e. if the outgoing majority
+// config Voters[1] is empty.
+//
+// The outgoing majority config of the joint configuration will be removed,
+// that is, the incoming config is promoted as the sole decision maker. In the
+// notation of the Raft thesis[1] (Section 4.3), this method transitions from
+// `C_{new,old}` into `C_new`.
+//
+// At the same time, any staged learners (LearnersNext) the addition of which
+// was held back by an overlapping voter in the former outgoing config will be
+// inserted into Learners.
+//
+// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
+func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
+	cfg, prs, err := c.checkAndCopy()
+	if err != nil {
+		return c.err(err)
+	}
+	// joint(cfg) is defined as len(outgoing(cfg.Voters)) > 0, so passing this
+	// guard already guarantees a non-empty outgoing config.
+	if !joint(cfg) {
+		err := errors.New("can't leave a non-joint config")
+		return c.err(err)
+	}
+	// Promote the staged learners now that the outgoing voters go away.
+	for id := range cfg.LearnersNext {
+		nilAwareAdd(&cfg.Learners, id)
+		prs[id].IsLearner = true
+	}
+	cfg.LearnersNext = nil
+
+	// Forget outgoing voters that have no role in the new configuration.
+	for id := range outgoing(cfg.Voters) {
+		_, isVoter := incoming(cfg.Voters)[id]
+		_, isLearner := cfg.Learners[id]
+
+		if !isVoter && !isLearner {
+			delete(prs, id)
+		}
+	}
+	*outgoingPtr(&cfg.Voters) = nil
+	cfg.AutoLeave = false
+
+	return checkAndReturn(cfg, prs)
+}
+
+// Simple carries out a series of configuration changes that (in aggregate)
+// mutates the incoming majority config Voters[0] by at most one. This method
+// will return an error if that is not the case, if the resulting quorum is
+// zero, if the result violates the config invariants, or if the configuration
+// is in a joint state (i.e. if there is an outgoing configuration).
+func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
+	cfg, prs, err := c.checkAndCopy()
+	if err != nil {
+		return c.err(err)
+	}
+	if joint(cfg) {
+		err := errors.New("can't apply simple config change in joint config")
+		return c.err(err)
+	}
+	if err := c.apply(&cfg, prs, ccs...); err != nil {
+		return c.err(err)
+	}
+	if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
+		return c.err(errors.New("more than one voter changed without entering joint config"))
+	}
+
+	// checkAndReturn validates the invariants of the result and reports a
+	// violation as an error; it must never be turned into a nil error, which
+	// callers could mistake for a valid, empty configuration.
+	return checkAndReturn(cfg, prs)
+}
+
+// apply carries out the given changes on cfg and prs. By convention, voter
+// changes always target the incoming majority config Voters[0]; Voters[1] is
+// either empty or holds the outgoing majority config while in a joint state.
+func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChangeSingle) error {
+	for i := range ccs {
+		id, typ := ccs[i].NodeID, ccs[i].Type
+		switch {
+		case id == 0:
+			// etcd replaces the NodeID with zero if it decides (downstream
+			// of raft) to not apply a change, so such entries have to be
+			// skipped explicitly, whatever their type.
+		case typ == pb.ConfChangeAddNode:
+			c.makeVoter(cfg, prs, id)
+		case typ == pb.ConfChangeAddLearnerNode:
+			c.makeLearner(cfg, prs, id)
+		case typ == pb.ConfChangeRemoveNode:
+			c.remove(cfg, prs, id)
+		case typ == pb.ConfChangeUpdateNode:
+			// Updates do not alter the configuration.
+		default:
+			return fmt.Errorf("unexpected conf type %d", typ)
+		}
+	}
+	if len(incoming(cfg.Voters)) == 0 {
+		return errors.New("removed all voters")
+	}
+	return nil
+}
+
+// makeVoter adds or promotes the given ID to be a voter in the incoming
+// majority config. A peer without a Progress gets a fresh one via
+// initProgress; a tracked peer loses its learner status and is removed from
+// Learners and LearnersNext.
+func (c Changer) makeVoter(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
+	if existing := prs[id]; existing != nil {
+		// Already tracked: promote it and drop it from both learner sets.
+		existing.IsLearner = false
+		nilAwareDelete(&cfg.Learners, id)
+		nilAwareDelete(&cfg.LearnersNext, id)
+		incoming(cfg.Voters)[id] = struct{}{}
+		return
+	}
+	// Unknown peer: start tracking it as a voter.
+	c.initProgress(cfg, prs, id, false /* isLearner */)
+}
+
+// makeLearner turns the given ID into a learner, either right away or once an
+// active joint configuration is exited.
+//
+// The immediate case applies when the peer is not a voter in the outgoing
+// config. Depending on whether it is already tracked, it is either added as
+// a new learner or demoted from being a voter in the incoming config. A peer
+// that already is a learner is left untouched.
+//
+// The deferred case applies when the configuration is joint and the peer is a
+// voter in the outgoing config. Adding it to Learners at that point would mean
+// tracking it as a voter and a learner simultaneously, so it is staged in
+// LearnersNext instead and moves to Learners the moment LeaveJoint() removes
+// the outgoing config.
+func (c Changer) makeLearner(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
+	pr := prs[id]
+	switch {
+	case pr == nil:
+		c.initProgress(cfg, prs, id, true /* isLearner */)
+		return
+	case pr.IsLearner:
+		return
+	}
+
+	// Take the peer out of the incoming config, then put its Progress back:
+	// remove() may have deleted it from the map.
+	c.remove(cfg, prs, id)
+	prs[id] = pr
+
+	// A peer that still votes in the outgoing config cannot become a learner
+	// yet, so it is staged in LearnersNext and converted by LeaveJoint().
+	if _, stillVoting := outgoing(cfg.Voters)[id]; stillVoting {
+		nilAwareAdd(&cfg.LearnersNext, id)
+		return
+	}
+	// Otherwise it becomes a regular learner right away.
+	pr.IsLearner = true
+	nilAwareAdd(&cfg.Learners, id)
+}
+
+// remove drops this peer, as a voter or learner, from the incoming config. Its
+// Progress is deleted too unless it is still a voter in the outgoing config.
+func (c Changer) remove(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
+	_, tracked := prs[id]
+	if !tracked {
+		return
+	}
+	delete(incoming(cfg.Voters), id)
+	nilAwareDelete(&cfg.Learners, id)
+	nilAwareDelete(&cfg.LearnersNext, id)
+	if _, stillOutgoing := outgoing(cfg.Voters)[id]; stillOutgoing {
+		return
+	}
+	delete(prs, id)
+}
+
+// initProgress starts tracking the given node, as a voter or as a learner.
+func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id uint64, isLearner bool) {
+	if isLearner {
+		nilAwareAdd(&cfg.Learners, id)
+	} else {
+		incoming(cfg.Voters)[id] = struct{}{}
+	}
+	pr := &tracker.Progress{
+		Match:     0,
+		Inflights: tracker.NewInflights(c.Tracker.MaxInflight),
+		IsLearner: isLearner,
+	}
+	// Initializing Next with the last index means that the follower can be
+	// probed (with the last index).
+	// TODO(tbg): seems awfully optimistic. Using the first index would be
+	// better. The general expectation here is that the follower has no log at
+	// all (and will thus likely need a snapshot), though the app may have
+	// applied a snapshot out of band before adding the replica (thus making
+	// the first index the better choice).
+	pr.Next = c.LastIndex
+	// A freshly added node is marked as recently active. Otherwise,
+	// CheckQuorum may cause us to step down if it is invoked before the added
+	// node has had a chance to communicate with us.
+	pr.RecentActive = true
+	prs[id] = pr
+}
+
+// checkInvariants makes sure that the config and progress are compatible with
+// each other. This is used to check both what the Changer is initialized with,
+// as well as what it returns. The first violation found is returned as error.
+func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error {
+	// NB: intentionally allow the empty config. In production we'll never see
+	// an empty config (we prevent it from being created) but we will need to
+	// be able to *create* an initial config, for example during bootstrap (or
+	// during tests). Instead of having to hand-code this, we allow
+	// transitioning from an empty config into any other legal and non-empty
+	// config.
+	for _, ids := range []map[uint64]struct{}{
+		cfg.Voters.IDs(),
+		cfg.Learners,
+		cfg.LearnersNext,
+	} {
+		for id := range ids {
+			if _, ok := prs[id]; !ok {
+				return fmt.Errorf("no progress for %d", id)
+			}
+		}
+	}
+
+	// Any staged learner was staged because it could not be directly added due
+	// to a conflicting voter in the outgoing config.
+	for id := range cfg.LearnersNext {
+		if _, ok := outgoing(cfg.Voters)[id]; !ok {
+			return fmt.Errorf("%d is in LearnersNext, but not Voters[1]", id)
+		}
+		if prs[id].IsLearner {
+			return fmt.Errorf("%d is in LearnersNext, but is already marked as learner", id)
+		}
+	}
+	// Conversely, Learners and Voters do not intersect at all.
+	for id := range cfg.Learners {
+		if _, ok := outgoing(cfg.Voters)[id]; ok {
+			return fmt.Errorf("%d is in Learners and Voters[1]", id)
+		}
+		if _, ok := incoming(cfg.Voters)[id]; ok {
+			return fmt.Errorf("%d is in Learners and Voters[0]", id)
+		}
+		if !prs[id].IsLearner {
+			return fmt.Errorf("%d is in Learners, but is not marked as learner", id)
+		}
+	}
+
+	if !joint(cfg) {
+		// We enforce that empty maps are nil rather than non-nil and empty.
+		if outgoing(cfg.Voters) != nil {
+			return fmt.Errorf("Voters[1] must be nil when not joint")
+		}
+		if cfg.LearnersNext != nil {
+			return fmt.Errorf("LearnersNext must be nil when not joint")
+		}
+		if cfg.AutoLeave {
+			return fmt.Errorf("AutoLeave must be false when not joint")
+		}
+	}
+
+	return nil
+}
+
+// checkAndCopy clones the tracker's config and copies its progress map (deeply
+// enough for the purposes of the Changer), then returns those copies. It
+// returns an error if checkInvariants does, in which case the copies are
+// discarded.
+func (c Changer) checkAndCopy() (tracker.Config, tracker.ProgressMap, error) {
+	// Each Progress is copied by value; a shallow copy suffices because the
+	// Changer only mutates the IsLearner field.
+	prs := make(tracker.ProgressMap, len(c.Tracker.Progress))
+	for id := range c.Tracker.Progress {
+		dup := *c.Tracker.Progress[id]
+		prs[id] = &dup
+	}
+	return checkAndReturn(c.Tracker.Config.Clone(), prs)
+}
+
+// checkAndReturn passes the input through unless checkInvariants rejects it.
+func checkAndReturn(cfg tracker.Config, prs tracker.ProgressMap) (tracker.Config, tracker.ProgressMap, error) {
+	err := checkInvariants(cfg, prs)
+	if err == nil {
+		return cfg, prs, nil
+	}
+	return tracker.Config{}, tracker.ProgressMap{}, err
+}
+
+// err returns a zero Config, a nil ProgressMap and the given error.
+func (c Changer) err(err error) (tracker.Config, tracker.ProgressMap, error) {
+	return tracker.Config{}, nil, err
+}
+
+// nilAwareAdd inserts id into the set *m, allocating the map if it is nil.
+func nilAwareAdd(m *map[uint64]struct{}, id uint64) {
+	if *m == nil {
+		*m = map[uint64]struct{}{}
+	}
+	(*m)[id] = struct{}{}
+}
+
+// nilAwareDelete removes id from the set *m and resets *m to nil once the set
+// is empty, so that empty sets are always represented as nil maps.
+func nilAwareDelete(m *map[uint64]struct{}, id uint64) {
+	// No nil guard is required: delete on a nil map is a no-op and its length
+	// is zero, so a nil *m simply stays nil.
+	delete(*m, id)
+	if len(*m) == 0 {
+		*m = nil
+	}
+}
+
+// symdiff returns the size of the symmetric difference of the two sets of
+// uint64s, i.e. len( (l - r) \union (r - l) ): the number of elements that are
+// contained in exactly one of l and r.
+func symdiff(l, r map[uint64]struct{}) int {
+	// missing counts the elements of a that are absent from b.
+	missing := func(a, b map[uint64]struct{}) int {
+		var n int
+		for id := range a {
+			if _, ok := b[id]; !ok {
+				n++
+			}
+		}
+		return n
+	}
+
+	return missing(l, r) + missing(r, l)
+}
+
+// joint reports whether cfg has a non-empty outgoing majority config.
+func joint(cfg tracker.Config) bool { return len(outgoing(cfg.Voters)) > 0 }
+
+// Accessors for the two halves of a joint config: [0] incoming, [1] outgoing.
+func incoming(voters quorum.JointConfig) quorum.MajorityConfig      { return voters[0] }
+func outgoing(voters quorum.JointConfig) quorum.MajorityConfig      { return voters[1] }
+func outgoingPtr(voters *quorum.JointConfig) *quorum.MajorityConfig { return &voters[1] }
+
+// Describe prints the type and NodeID of the configuration changes as a
+// space-delimited string.
+func Describe(ccs ...pb.ConfChangeSingle) string {
+ var buf strings.Builder
+ for _, cc := range ccs {
+ if buf.Len() > 0 {
+ buf.WriteByte(' ')
+ }
+ fmt.Fprintf(&buf, "%s(%d)", cc.Type, cc.NodeID)
+ }
+ return buf.String()
+}
diff --git a/vendor/go.etcd.io/etcd/raft/confchange/restore.go b/vendor/go.etcd.io/etcd/raft/confchange/restore.go
new file mode 100644
index 0000000..724068d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/confchange/restore.go
@@ -0,0 +1,155 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package confchange
+
+import (
+ pb "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/raft/tracker"
+)
+
+// toConfChangeSingle translates a conf state into 1) a slice of operations creating
+// first the config that will become the outgoing one, and then the incoming one, and
+// 2) another slice that, when applied to the config resulting from 1), represents the
+// ConfState.
+func toConfChangeSingle(cs pb.ConfState) (out []pb.ConfChangeSingle, in []pb.ConfChangeSingle) {
+	// Example to follow along this code:
+	// voters=(1 2 3) learners=(5) outgoing=(1 2 4 6) learners_next=(4)
+	//
+	// This means that before entering the joint config, the configuration
+	// had voters (1 2 4 6) and perhaps some learners that are already gone.
+	// The new set of voters is (1 2 3), i.e. (1 2) were kept around, and (4 6)
+	// are no longer voters; however 4 is poised to become a learner upon leaving
+	// the joint state.
+	// We can't tell whether 5 was a learner before entering the joint config,
+	// but it doesn't matter (we'll pretend that it wasn't).
+	//
+	// The code below will construct
+	// outgoing = add 1; add 2; add 4; add 6
+	// incoming = remove 1; remove 2; remove 4; remove 6
+	//            add 1;    add 2;    add 3;
+	//            add-learner 5;
+	//            add-learner 4;
+	//
+	// So, when starting with an empty config, after applying 'outgoing' we have
+	//
+	//   quorum=(1 2 4 6)
+	//
+	// From which we enter a joint state via 'incoming'
+	//
+	//   quorum=(1 2 3)&&(1 2 4 6) learners=(5) learners_next=(4)
+	//
+	// as desired.
+
+	for _, id := range cs.VotersOutgoing {
+		// If there are outgoing voters, first add them one by one so that the
+		// (non-joint) config has them all.
+		out = append(out, pb.ConfChangeSingle{
+			Type:   pb.ConfChangeAddNode,
+			NodeID: id,
+		})
+
+	}
+
+	// We're done constructing the outgoing slice, now on to the incoming one
+	// (which will apply on top of the config created by the outgoing slice).
+
+	// First, we'll remove all of the outgoing voters.
+	for _, id := range cs.VotersOutgoing {
+		in = append(in, pb.ConfChangeSingle{
+			Type:   pb.ConfChangeRemoveNode,
+			NodeID: id,
+		})
+	}
+	// Then we'll add the incoming voters and learners.
+	for _, id := range cs.Voters {
+		in = append(in, pb.ConfChangeSingle{
+			Type:   pb.ConfChangeAddNode,
+			NodeID: id,
+		})
+	}
+	for _, id := range cs.Learners {
+		in = append(in, pb.ConfChangeSingle{
+			Type:   pb.ConfChangeAddLearnerNode,
+			NodeID: id,
+		})
+	}
+	// Same for LearnersNext; these are nodes we want to be learners but which
+	// are currently voters in the outgoing config.
+	for _, id := range cs.LearnersNext {
+		in = append(in, pb.ConfChangeSingle{
+			Type:   pb.ConfChangeAddLearnerNode,
+			NodeID: id,
+		})
+	}
+	return out, in
+}
+
+func chain(chg Changer, ops ...func(Changer) (tracker.Config, tracker.ProgressMap, error)) (tracker.Config, tracker.ProgressMap, error) {
+ for _, op := range ops {
+ cfg, prs, err := op(chg)
+ if err != nil {
+ return tracker.Config{}, nil, err
+ }
+ chg.Tracker.Config = cfg
+ chg.Tracker.Progress = prs
+ }
+ return chg.Tracker.Config, chg.Tracker.Progress, nil
+}
+
+// Restore replays, on top of the given Changer (which must represent an empty
+// configuration), the sequence of changes that produces the configuration
+// described in the ConfState.
+//
+// TODO(tbg) it's silly that this takes a Changer. Unravel this by making sure
+// the Changer only needs a ProgressMap (not a whole Tracker) at which point
+// this can just take LastIndex and MaxInflight directly instead and cook up
+// the results from that alone.
+func Restore(chg Changer, cs pb.ConfState) (tracker.Config, tracker.ProgressMap, error) {
+	type op = func(Changer) (tracker.Config, tracker.ProgressMap, error)
+	// simple wraps a single change into an op that applies it via Simple.
+	simple := func(cc pb.ConfChangeSingle) op {
+		return func(c Changer) (tracker.Config, tracker.ProgressMap, error) {
+			return c.Simple(cc)
+		}
+	}
+
+	outCCs, inCCs := toConfChangeSingle(cs)
+	if len(outCCs) == 0 {
+		// No outgoing config, so just apply the incoming changes one by one.
+		ops := make([]op, 0, len(inCCs))
+		for _, cc := range inCCs {
+			ops = append(ops, simple(cc))
+		}
+		return chain(chg, ops...)
+	}
+
+	// The ConfState describes a joint configuration. First, apply all of the
+	// changes of the outgoing config one by one, so that it temporarily becomes
+	// the incoming active config. For example, if the config is
+	// (1 2 3)&(2 3 4), this will establish (2 3 4)&().
+	ops := make([]op, 0, len(outCCs)+1)
+	for _, cc := range outCCs {
+		ops = append(ops, simple(cc))
+	}
+	// Now enter the joint state, which rotates the above additions into the
+	// outgoing config, and adds the incoming config in. Continuing the
+	// example above, we'd get (1 2 3)&(2 3 4), i.e. the incoming operations
+	// would be removing 2,3,4 and then adding in 1,2,3 while transitioning
+	// into a joint state.
+	ops = append(ops, func(c Changer) (tracker.Config, tracker.ProgressMap, error) {
+		return c.EnterJoint(cs.AutoLeave, inCCs...)
+	})
+	return chain(chg, ops...)
+}
diff --git a/vendor/go.etcd.io/etcd/raft/doc.go b/vendor/go.etcd.io/etcd/raft/doc.go
index c30d884..68fe6f0 100644
--- a/vendor/go.etcd.io/etcd/raft/doc.go
+++ b/vendor/go.etcd.io/etcd/raft/doc.go
@@ -19,7 +19,7 @@
Raft is a protocol with which a cluster of nodes can maintain a replicated state machine.
The state machine is kept in sync through the use of a replicated log.
For more details on Raft, see "In Search of an Understandable Consensus Algorithm"
-(https://ramcloud.stanford.edu/raft.pdf) by Diego Ongaro and John Ousterhout.
+(https://raft.github.io/raft.pdf) by Diego Ongaro and John Ousterhout.
A simple example application, _raftexample_, is also available to help illustrate
how to use this package in practice:
@@ -172,7 +172,7 @@
Implementation notes
This implementation is up to date with the final Raft thesis
-(https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although our
+(https://github.com/ongardie/dissertation/blob/master/stanford.pdf), although our
implementation of the membership change protocol differs somewhat from
that described in chapter 4. The key invariant that membership changes
happen one node at a time is preserved, but in our implementation the
diff --git a/vendor/go.etcd.io/etcd/raft/log.go b/vendor/go.etcd.io/etcd/raft/log.go
index 03f83e6..77eedfc 100644
--- a/vendor/go.etcd.io/etcd/raft/log.go
+++ b/vendor/go.etcd.io/etcd/raft/log.go
@@ -332,8 +332,10 @@
if hi > l.unstable.offset {
unstable := l.unstable.slice(max(lo, l.unstable.offset), hi)
if len(ents) > 0 {
- ents = append([]pb.Entry{}, ents...)
- ents = append(ents, unstable...)
+ combined := make([]pb.Entry, len(ents)+len(unstable))
+ n := copy(combined, ents)
+ copy(combined[n:], unstable)
+ ents = combined
} else {
ents = unstable
}
diff --git a/vendor/go.etcd.io/etcd/raft/log_unstable.go b/vendor/go.etcd.io/etcd/raft/log_unstable.go
index 1005bf6..1bff5a7 100644
--- a/vendor/go.etcd.io/etcd/raft/log_unstable.go
+++ b/vendor/go.etcd.io/etcd/raft/log_unstable.go
@@ -55,10 +55,7 @@
// is any.
func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
if i < u.offset {
- if u.snapshot == nil {
- return 0, false
- }
- if u.snapshot.Metadata.Index == i {
+ if u.snapshot != nil && u.snapshot.Metadata.Index == i {
return u.snapshot.Metadata.Term, true
}
return 0, false
@@ -71,6 +68,7 @@
if i > last {
return 0, false
}
+
return u.entries[i-u.offset].Term, true
}
diff --git a/vendor/go.etcd.io/etcd/raft/logger.go b/vendor/go.etcd.io/etcd/raft/logger.go
index 426a77d..6d89629 100644
--- a/vendor/go.etcd.io/etcd/raft/logger.go
+++ b/vendor/go.etcd.io/etcd/raft/logger.go
@@ -19,6 +19,7 @@
"io/ioutil"
"log"
"os"
+ "sync"
)
type Logger interface {
@@ -41,11 +42,16 @@
Panicf(format string, v ...interface{})
}
-func SetLogger(l Logger) { raftLogger = l }
+func SetLogger(l Logger) {
+ raftLoggerMu.Lock()
+ raftLogger = l
+ raftLoggerMu.Unlock()
+}
var (
defaultLogger = &DefaultLogger{Logger: log.New(os.Stderr, "raft", log.LstdFlags)}
discardLogger = &DefaultLogger{Logger: log.New(ioutil.Discard, "", 0)}
+ raftLoggerMu sync.Mutex
raftLogger = Logger(defaultLogger)
)
diff --git a/vendor/go.etcd.io/etcd/raft/node.go b/vendor/go.etcd.io/etcd/raft/node.go
index 2ec2c3a..ab6185b 100644
--- a/vendor/go.etcd.io/etcd/raft/node.go
+++ b/vendor/go.etcd.io/etcd/raft/node.go
@@ -132,10 +132,20 @@
// Propose proposes that data be appended to the log. Note that proposals can be lost without
// notice, therefore it is user's job to ensure proposal retries.
Propose(ctx context.Context, data []byte) error
- // ProposeConfChange proposes config change.
- // At most one ConfChange can be in the process of going through consensus.
- // Application needs to call ApplyConfChange when applying EntryConfChange type entry.
- ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
+ // ProposeConfChange proposes a configuration change. Like any proposal, the
+ // configuration change may be dropped with or without an error being
+ // returned. In particular, configuration changes are dropped unless the
+ // leader has certainty that there is no prior unapplied configuration
+ // change in its log.
+ //
+ // The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2
+ // message. The latter allows arbitrary configuration changes via joint
+ // consensus, notably including replacing a voter. Passing a ConfChangeV2
+ // message is only allowed if all Nodes participating in the cluster run a
+ // version of this library aware of the V2 API. See pb.ConfChangeV2 for
+ // usage details and semantics.
+ ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error
+
// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error
@@ -156,11 +166,13 @@
// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
// progress, it can call Advance before finishing applying the last ready.
Advance()
- // ApplyConfChange applies config change to the local node.
- // Returns an opaque ConfState protobuf which must be recorded
- // in snapshots. Will never return nil; it returns a pointer only
- // to match MemoryStorage.Compact.
- ApplyConfChange(cc pb.ConfChange) *pb.ConfState
+ // ApplyConfChange applies a config change (previously passed to
+ // ProposeConfChange) to the node. This must be called whenever a config
+ // change is observed in Ready.CommittedEntries.
+ //
+ // Returns an opaque non-nil ConfState protobuf which must be recorded in
+ // snapshots.
+ ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState
// TransferLeadership attempts to transfer leadership to the given transferee.
TransferLeadership(ctx context.Context, lead, transferee uint64)
@@ -197,40 +209,21 @@
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
+//
+// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
- r := newRaft(c)
- // become the follower at term 1 and apply initial configuration
- // entries of term 1
- r.becomeFollower(1, None)
- for _, peer := range peers {
- cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
- d, err := cc.Marshal()
- if err != nil {
- panic("unexpected marshal error")
- }
- e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
- r.raftLog.append(e)
+ if len(peers) == 0 {
+ panic("no peers given; use RestartNode instead")
}
- // Mark these initial entries as committed.
- // TODO(bdarnell): These entries are still unstable; do we need to preserve
- // the invariant that committed < unstable?
- r.raftLog.committed = r.raftLog.lastIndex()
- // Now apply them, mainly so that the application can call Campaign
- // immediately after StartNode in tests. Note that these nodes will
- // be added to raft twice: here and when the application's Ready
- // loop calls ApplyConfChange. The calls to addNode must come after
- // all calls to raftLog.append so progress.next is set after these
- // bootstrapping entries (it is an error if we try to append these
- // entries since they have already been committed).
- // We do not set raftLog.applied so the application will be able
- // to observe all conf changes via Ready.CommittedEntries.
- for _, peer := range peers {
- r.addNode(peer.ID)
+ rn, err := NewRawNode(c)
+ if err != nil {
+ panic(err)
}
+ rn.Bootstrap(peers)
- n := newNode()
- n.logger = c.Logger
- go n.run(r)
+ n := newNode(rn)
+
+ go n.run()
return &n
}
@@ -239,11 +232,12 @@
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
- r := newRaft(c)
-
- n := newNode()
- n.logger = c.Logger
- go n.run(r)
+ rn, err := NewRawNode(c)
+ if err != nil {
+ panic(err)
+ }
+ n := newNode(rn)
+ go n.run()
return &n
}
@@ -256,7 +250,7 @@
type node struct {
propc chan msgWithResult
recvc chan pb.Message
- confc chan pb.ConfChange
+ confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
@@ -265,14 +259,14 @@
stop chan struct{}
status chan chan Status
- logger Logger
+ rn *RawNode
}
-func newNode() node {
+func newNode(rn *RawNode) node {
return node{
propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
- confc: make(chan pb.ConfChange),
+ confc: make(chan pb.ConfChangeV2),
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),
advancec: make(chan struct{}),
@@ -283,6 +277,7 @@
done: make(chan struct{}),
stop: make(chan struct{}),
status: make(chan chan Status),
+ rn: rn,
}
}
@@ -298,30 +293,30 @@
<-n.done
}
-func (n *node) run(r *raft) {
+func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
- var prevLastUnstablei, prevLastUnstablet uint64
- var havePrevLastUnstablei bool
- var prevSnapi uint64
- var applyingToI uint64
var rd Ready
+ r := n.rn.raft
+
lead := None
- prevSoftSt := r.softState()
- prevHardSt := emptyState
for {
if advancec != nil {
readyc = nil
- } else {
- rd = newReady(r, prevSoftSt, prevHardSt)
- if rd.containsUpdates() {
- readyc = n.readyc
- } else {
- readyc = nil
- }
+ } else if n.rn.HasReady() {
+ // Populate a Ready. Note that this Ready is not guaranteed to
+ // actually be handled. We will arm readyc, but there's no guarantee
+ // that we will actually send on it. It's possible that we will
+ // service another channel instead, loop around, and then populate
+ // the Ready again. We could instead force the previous Ready to be
+ // handled first, but it's generally good to emit larger Readys plus
+ // it simplifies testing (by emitting less frequently and more
+ // predictably).
+ rd = n.rn.readyWithoutAccept()
+ readyc = n.readyc
}
if lead != r.lead {
@@ -353,76 +348,46 @@
}
case m := <-n.recvc:
// filter out response message from unknown From.
- if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
+ if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
case cc := <-n.confc:
- if cc.NodeID == None {
- select {
- case n.confstatec <- pb.ConfState{
- Nodes: r.nodes(),
- Learners: r.learnerNodes()}:
- case <-n.done:
+ _, okBefore := r.prs.Progress[r.id]
+ cs := r.applyConfChange(cc)
+ // If the node was removed, block incoming proposals. Note that we
+ // only do this if the node was in the config before. Nodes may be
+ // a member of the group without knowing this (when they're catching
+ // up on the log and don't have the latest config) and we don't want
+ // to block the proposal channel in that case.
+ //
+ // NB: propc is reset when the leader changes, which, if we learn
+ // about it, sort of implies that we got readded, maybe? This isn't
+ // very sound and likely has bugs.
+ if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
+ var found bool
+ for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
+ for _, id := range sl {
+ if id == r.id {
+ found = true
+ }
+ }
}
- break
- }
- switch cc.Type {
- case pb.ConfChangeAddNode:
- r.addNode(cc.NodeID)
- case pb.ConfChangeAddLearnerNode:
- r.addLearner(cc.NodeID)
- case pb.ConfChangeRemoveNode:
- // block incoming proposal when local node is
- // removed
- if cc.NodeID == r.id {
+ if !found {
propc = nil
}
- r.removeNode(cc.NodeID)
- case pb.ConfChangeUpdateNode:
- default:
- panic("unexpected conf type")
}
select {
- case n.confstatec <- pb.ConfState{
- Nodes: r.nodes(),
- Learners: r.learnerNodes()}:
+ case n.confstatec <- cs:
case <-n.done:
}
case <-n.tickc:
- r.tick()
+ n.rn.Tick()
case readyc <- rd:
- if rd.SoftState != nil {
- prevSoftSt = rd.SoftState
- }
- if len(rd.Entries) > 0 {
- prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
- prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
- havePrevLastUnstablei = true
- }
- if !IsEmptyHardState(rd.HardState) {
- prevHardSt = rd.HardState
- }
- if !IsEmptySnap(rd.Snapshot) {
- prevSnapi = rd.Snapshot.Metadata.Index
- }
- if index := rd.appliedCursor(); index != 0 {
- applyingToI = index
- }
-
- r.msgs = nil
- r.readStates = nil
- r.reduceUncommittedSize(rd.CommittedEntries)
+ n.rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
- if applyingToI != 0 {
- r.raftLog.appliedTo(applyingToI)
- applyingToI = 0
- }
- if havePrevLastUnstablei {
- r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
- havePrevLastUnstablei = false
- }
- r.raftLog.stableSnapTo(prevSnapi)
+ n.rn.Advance(rd)
+ rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)
@@ -440,7 +405,7 @@
case n.tickc <- struct{}{}:
case <-n.done:
default:
- n.logger.Warningf("A tick missed to fire. Node blocks too long!")
+ n.rn.raft.logger.Warningf("%x (leader %v) A tick missed to fire. Node blocks too long!", n.rn.raft.id, n.rn.raft.id == n.rn.raft.lead)
}
}
@@ -459,12 +424,20 @@
return n.step(ctx, m)
}
-func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
- data, err := cc.Marshal()
+func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) {
+ typ, data, err := pb.MarshalConfChange(c)
+ if err != nil {
+ return pb.Message{}, err
+ }
+ return pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: typ, Data: data}}}, nil
+}
+
+func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
+ msg, err := confChangeToMsg(cc)
if err != nil {
return err
}
- return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
+ return n.Step(ctx, msg)
}
func (n *node) step(ctx context.Context, m pb.Message) error {
@@ -525,10 +498,10 @@
}
}
-func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
+func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
var cs pb.ConfState
select {
- case n.confc <- cc:
+ case n.confc <- cc.AsV2():
case <-n.done:
}
select {
diff --git a/vendor/go.etcd.io/etcd/raft/progress.go b/vendor/go.etcd.io/etcd/raft/progress.go
deleted file mode 100644
index ef3787d..0000000
--- a/vendor/go.etcd.io/etcd/raft/progress.go
+++ /dev/null
@@ -1,284 +0,0 @@
-// Copyright 2015 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package raft
-
-import "fmt"
-
-const (
- ProgressStateProbe ProgressStateType = iota
- ProgressStateReplicate
- ProgressStateSnapshot
-)
-
-type ProgressStateType uint64
-
-var prstmap = [...]string{
- "ProgressStateProbe",
- "ProgressStateReplicate",
- "ProgressStateSnapshot",
-}
-
-func (st ProgressStateType) String() string { return prstmap[uint64(st)] }
-
-// Progress represents a follower’s progress in the view of the leader. Leader maintains
-// progresses of all followers, and sends entries to the follower based on its progress.
-type Progress struct {
- Match, Next uint64
- // State defines how the leader should interact with the follower.
- //
- // When in ProgressStateProbe, leader sends at most one replication message
- // per heartbeat interval. It also probes actual progress of the follower.
- //
- // When in ProgressStateReplicate, leader optimistically increases next
- // to the latest entry sent after sending replication message. This is
- // an optimized state for fast replicating log entries to the follower.
- //
- // When in ProgressStateSnapshot, leader should have sent out snapshot
- // before and stops sending any replication message.
- State ProgressStateType
-
- // Paused is used in ProgressStateProbe.
- // When Paused is true, raft should pause sending replication message to this peer.
- Paused bool
- // PendingSnapshot is used in ProgressStateSnapshot.
- // If there is a pending snapshot, the pendingSnapshot will be set to the
- // index of the snapshot. If pendingSnapshot is set, the replication process of
- // this Progress will be paused. raft will not resend snapshot until the pending one
- // is reported to be failed.
- PendingSnapshot uint64
-
- // RecentActive is true if the progress is recently active. Receiving any messages
- // from the corresponding follower indicates the progress is active.
- // RecentActive can be reset to false after an election timeout.
- RecentActive bool
-
- // inflights is a sliding window for the inflight messages.
- // Each inflight message contains one or more log entries.
- // The max number of entries per message is defined in raft config as MaxSizePerMsg.
- // Thus inflight effectively limits both the number of inflight messages
- // and the bandwidth each Progress can use.
- // When inflights is full, no more message should be sent.
- // When a leader sends out a message, the index of the last
- // entry should be added to inflights. The index MUST be added
- // into inflights in order.
- // When a leader receives a reply, the previous inflights should
- // be freed by calling inflights.freeTo with the index of the last
- // received entry.
- ins *inflights
-
- // IsLearner is true if this progress is tracked for a learner.
- IsLearner bool
-}
-
-func (pr *Progress) resetState(state ProgressStateType) {
- pr.Paused = false
- pr.PendingSnapshot = 0
- pr.State = state
- pr.ins.reset()
-}
-
-func (pr *Progress) becomeProbe() {
- // If the original state is ProgressStateSnapshot, progress knows that
- // the pending snapshot has been sent to this peer successfully, then
- // probes from pendingSnapshot + 1.
- if pr.State == ProgressStateSnapshot {
- pendingSnapshot := pr.PendingSnapshot
- pr.resetState(ProgressStateProbe)
- pr.Next = max(pr.Match+1, pendingSnapshot+1)
- } else {
- pr.resetState(ProgressStateProbe)
- pr.Next = pr.Match + 1
- }
-}
-
-func (pr *Progress) becomeReplicate() {
- pr.resetState(ProgressStateReplicate)
- pr.Next = pr.Match + 1
-}
-
-func (pr *Progress) becomeSnapshot(snapshoti uint64) {
- pr.resetState(ProgressStateSnapshot)
- pr.PendingSnapshot = snapshoti
-}
-
-// maybeUpdate returns false if the given n index comes from an outdated message.
-// Otherwise it updates the progress and returns true.
-func (pr *Progress) maybeUpdate(n uint64) bool {
- var updated bool
- if pr.Match < n {
- pr.Match = n
- updated = true
- pr.resume()
- }
- if pr.Next < n+1 {
- pr.Next = n + 1
- }
- return updated
-}
-
-func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
-
-// maybeDecrTo returns false if the given to index comes from an out of order message.
-// Otherwise it decreases the progress next index to min(rejected, last) and returns true.
-func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
- if pr.State == ProgressStateReplicate {
- // the rejection must be stale if the progress has matched and "rejected"
- // is smaller than "match".
- if rejected <= pr.Match {
- return false
- }
- // directly decrease next to match + 1
- pr.Next = pr.Match + 1
- return true
- }
-
- // the rejection must be stale if "rejected" does not match next - 1
- if pr.Next-1 != rejected {
- return false
- }
-
- if pr.Next = min(rejected, last+1); pr.Next < 1 {
- pr.Next = 1
- }
- pr.resume()
- return true
-}
-
-func (pr *Progress) pause() { pr.Paused = true }
-func (pr *Progress) resume() { pr.Paused = false }
-
-// IsPaused returns whether sending log entries to this node has been
-// paused. A node may be paused because it has rejected recent
-// MsgApps, is currently waiting for a snapshot, or has reached the
-// MaxInflightMsgs limit.
-func (pr *Progress) IsPaused() bool {
- switch pr.State {
- case ProgressStateProbe:
- return pr.Paused
- case ProgressStateReplicate:
- return pr.ins.full()
- case ProgressStateSnapshot:
- return true
- default:
- panic("unexpected state")
- }
-}
-
-func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 }
-
-// needSnapshotAbort returns true if snapshot progress's Match
-// is equal or higher than the pendingSnapshot.
-func (pr *Progress) needSnapshotAbort() bool {
- return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot
-}
-
-func (pr *Progress) String() string {
- return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d", pr.Next, pr.Match, pr.State, pr.IsPaused(), pr.PendingSnapshot)
-}
-
-type inflights struct {
- // the starting index in the buffer
- start int
- // number of inflights in the buffer
- count int
-
- // the size of the buffer
- size int
-
- // buffer contains the index of the last entry
- // inside one message.
- buffer []uint64
-}
-
-func newInflights(size int) *inflights {
- return &inflights{
- size: size,
- }
-}
-
-// add adds an inflight into inflights
-func (in *inflights) add(inflight uint64) {
- if in.full() {
- panic("cannot add into a full inflights")
- }
- next := in.start + in.count
- size := in.size
- if next >= size {
- next -= size
- }
- if next >= len(in.buffer) {
- in.growBuf()
- }
- in.buffer[next] = inflight
- in.count++
-}
-
-// grow the inflight buffer by doubling up to inflights.size. We grow on demand
-// instead of preallocating to inflights.size to handle systems which have
-// thousands of Raft groups per process.
-func (in *inflights) growBuf() {
- newSize := len(in.buffer) * 2
- if newSize == 0 {
- newSize = 1
- } else if newSize > in.size {
- newSize = in.size
- }
- newBuffer := make([]uint64, newSize)
- copy(newBuffer, in.buffer)
- in.buffer = newBuffer
-}
-
-// freeTo frees the inflights smaller or equal to the given `to` flight.
-func (in *inflights) freeTo(to uint64) {
- if in.count == 0 || to < in.buffer[in.start] {
- // out of the left side of the window
- return
- }
-
- idx := in.start
- var i int
- for i = 0; i < in.count; i++ {
- if to < in.buffer[idx] { // found the first large inflight
- break
- }
-
- // increase index and maybe rotate
- size := in.size
- if idx++; idx >= size {
- idx -= size
- }
- }
- // free i inflights and set new start index
- in.count -= i
- in.start = idx
- if in.count == 0 {
- // inflights is empty, reset the start index so that we don't grow the
- // buffer unnecessarily.
- in.start = 0
- }
-}
-
-func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) }
-
-// full returns true if the inflights is full.
-func (in *inflights) full() bool {
- return in.count == in.size
-}
-
-// resets frees all inflights.
-func (in *inflights) reset() {
- in.count = 0
- in.start = 0
-}
diff --git a/vendor/go.etcd.io/etcd/raft/quorum/joint.go b/vendor/go.etcd.io/etcd/raft/quorum/joint.go
new file mode 100644
index 0000000..e3741e0
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/quorum/joint.go
@@ -0,0 +1,75 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package quorum
+
+// JointConfig is a configuration of two groups of (possibly overlapping)
+// majority configurations. Decisions require the support of both majorities.
+type JointConfig [2]MajorityConfig
+
+func (c JointConfig) String() string {
+ if len(c[1]) > 0 {
+ return c[0].String() + "&&" + c[1].String()
+ }
+ return c[0].String()
+}
+
+// IDs returns a newly initialized map representing the set of voters present
+// in the joint configuration.
+func (c JointConfig) IDs() map[uint64]struct{} {
+ m := map[uint64]struct{}{}
+ for _, cc := range c {
+ for id := range cc {
+ m[id] = struct{}{}
+ }
+ }
+ return m
+}
+
+// Describe returns a (multi-line) representation of the commit indexes for the
+// given lookuper.
+func (c JointConfig) Describe(l AckedIndexer) string {
+ return MajorityConfig(c.IDs()).Describe(l)
+}
+
+// CommittedIndex returns the largest committed index for the given joint
+// quorum. An index is jointly committed if it is committed in both constituent
+// majorities.
+func (c JointConfig) CommittedIndex(l AckedIndexer) Index {
+ idx0 := c[0].CommittedIndex(l)
+ idx1 := c[1].CommittedIndex(l)
+ if idx0 < idx1 {
+ return idx0
+ }
+ return idx1
+}
+
+// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns
+// a result indicating whether the vote is pending, lost, or won. A joint quorum
+// requires both majority quorums to vote in favor.
+func (c JointConfig) VoteResult(votes map[uint64]bool) VoteResult {
+ r1 := c[0].VoteResult(votes)
+ r2 := c[1].VoteResult(votes)
+
+ if r1 == r2 {
+ // If they agree, return the agreed state.
+ return r1
+ }
+ if r1 == VoteLost || r2 == VoteLost {
+ // If either config has lost, loss is the only possible outcome.
+ return VoteLost
+ }
+ // One side won and the other is still pending, so the overall outcome is pending.
+ return VotePending
+}
diff --git a/vendor/go.etcd.io/etcd/raft/quorum/majority.go b/vendor/go.etcd.io/etcd/raft/quorum/majority.go
new file mode 100644
index 0000000..8858a36
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/quorum/majority.go
@@ -0,0 +1,210 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package quorum
+
+import (
+ "fmt"
+ "math"
+ "sort"
+ "strings"
+)
+
+// MajorityConfig is a set of IDs that uses majority quorums to make decisions.
+type MajorityConfig map[uint64]struct{}
+
+func (c MajorityConfig) String() string {
+ sl := make([]uint64, 0, len(c))
+ for id := range c {
+ sl = append(sl, id)
+ }
+ sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] })
+ var buf strings.Builder
+ buf.WriteByte('(')
+ for i := range sl {
+ if i > 0 {
+ buf.WriteByte(' ')
+ }
+ fmt.Fprint(&buf, sl[i])
+ }
+ buf.WriteByte(')')
+ return buf.String()
+}
+
+// Describe returns a (multi-line) representation of the commit indexes for the
+// given lookuper.
+func (c MajorityConfig) Describe(l AckedIndexer) string {
+ if len(c) == 0 {
+ return "<empty majority quorum>"
+ }
+ type tup struct {
+ id uint64
+ idx Index
+ ok bool // idx found?
+ bar int // length of bar displayed for this tup
+ }
+
+ // Below, populate .bar with each entry's position in ascending index order
+ // (we plot this as sort of a progress bar). NOTE(review): an entry whose index
+ // equals its predecessor's keeps bar 0, so equal indexes can get unequal bars.
+
+ n := len(c)
+ info := make([]tup, 0, n)
+ for id := range c {
+ idx, ok := l.AckedIndex(id)
+ info = append(info, tup{id: id, idx: idx, ok: ok})
+ }
+
+ // Sort by index
+ sort.Slice(info, func(i, j int) bool {
+ if info[i].idx == info[j].idx {
+ return info[i].id < info[j].id
+ }
+ return info[i].idx < info[j].idx
+ })
+
+ // Populate .bar.
+ for i := range info {
+ if i > 0 && info[i-1].idx < info[i].idx {
+ info[i].bar = i
+ }
+ }
+
+ // Sort by ID.
+ sort.Slice(info, func(i, j int) bool {
+ return info[i].id < info[j].id
+ })
+
+ var buf strings.Builder
+
+ // Print.
+ fmt.Fprint(&buf, strings.Repeat(" ", n)+" idx\n")
+ for i := range info {
+ bar := info[i].bar
+ if !info[i].ok {
+ fmt.Fprint(&buf, "?"+strings.Repeat(" ", n))
+ } else {
+ fmt.Fprint(&buf, strings.Repeat("x", bar)+">"+strings.Repeat(" ", n-bar))
+ }
+ fmt.Fprintf(&buf, " %5d (id=%d)\n", info[i].idx, info[i].id)
+ }
+ return buf.String()
+}
+
+// Slice returns the MajorityConfig as a sorted slice.
+func (c MajorityConfig) Slice() []uint64 {
+ var sl []uint64
+ for id := range c {
+ sl = append(sl, id)
+ }
+ sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] })
+ return sl
+}
+
+func insertionSort(sl []uint64) {
+ a, b := 0, len(sl)
+ for i := a + 1; i < b; i++ {
+ for j := i; j > a && sl[j] < sl[j-1]; j-- {
+ sl[j], sl[j-1] = sl[j-1], sl[j]
+ }
+ }
+}
+
+// CommittedIndex computes the committed index from those supplied via the
+// provided AckedIndexer (for the active config).
+func (c MajorityConfig) CommittedIndex(l AckedIndexer) Index {
+ n := len(c)
+ if n == 0 {
+ // This plays well with joint quorums which, when one half is the zero
+ // MajorityConfig, should behave like the other half.
+ return math.MaxUint64
+ }
+
+ // Use an on-stack slice to collect the committed indexes when n <= 7
+ // (otherwise we alloc). The alternative is to stash a slice on
+ // MajorityConfig, but this impairs usability (as is, MajorityConfig is just
+ // a map, and that's nice). The assumption is that running with a
+ // replication factor of >7 is rare, and in cases in which it happens
+ // performance is a lesser concern (additionally the performance
+ // implications of an allocation here are far from drastic).
+ var stk [7]uint64
+ var srt []uint64
+ if len(stk) >= n {
+ srt = stk[:n]
+ } else {
+ srt = make([]uint64, n)
+ }
+
+ {
+ // Fill the slice with the indexes observed. Any unused slots will be
+ // left as zero; these correspond to voters that may report in, but
+ // haven't yet. We fill from the right (since the zeroes will end up on
+ // the left after sorting below anyway).
+ i := n - 1
+ for id := range c {
+ if idx, ok := l.AckedIndex(id); ok {
+ srt[i] = uint64(idx)
+ i--
+ }
+ }
+ }
+
+ // Sort by index. Use a bespoke algorithm (copied from the stdlib's sort
+ // package) to keep srt on the stack.
+ insertionSort(srt)
+
+ // The smallest index into the array for which the value is acked by a
+ // quorum. In other words, from the end of the slice, move n/2+1 to the
+ // left (accounting for zero-indexing).
+ pos := n - (n/2 + 1)
+ return Index(srt[pos])
+}
+
+// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns
+// a result indicating whether the vote is won (a quorum of yes has been
+// reached), lost (yes can no longer reach a quorum, even if every missing
+// voter votes yes), or pending (neither of the above has happened yet).
+func (c MajorityConfig) VoteResult(votes map[uint64]bool) VoteResult {
+ if len(c) == 0 {
+ // By convention, the elections on an empty config win. This comes in
+ // handy with joint quorums because it'll make a half-populated joint
+ // quorum behave like a majority quorum.
+ return VoteWon
+ }
+
+ ny := [2]int{} // vote counts for no and yes, respectively
+
+ var missing int
+ for id := range c {
+ v, ok := votes[id]
+ if !ok {
+ missing++
+ continue
+ }
+ if v {
+ ny[1]++
+ } else {
+ ny[0]++
+ }
+ }
+
+ q := len(c)/2 + 1
+ if ny[1] >= q {
+ return VoteWon
+ }
+ if ny[1]+missing >= q {
+ return VotePending
+ }
+ return VoteLost
+}
diff --git a/vendor/go.etcd.io/etcd/raft/quorum/quorum.go b/vendor/go.etcd.io/etcd/raft/quorum/quorum.go
new file mode 100644
index 0000000..2899e46
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/quorum/quorum.go
@@ -0,0 +1,58 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package quorum
+
+import (
+ "math"
+ "strconv"
+)
+
+// Index is a Raft log position.
+type Index uint64
+
+func (i Index) String() string {
+ if i == math.MaxUint64 {
+ return "∞"
+ }
+ return strconv.FormatUint(uint64(i), 10)
+}
+
+// AckedIndexer allows looking up a commit index for a given ID of a voter
+// from a corresponding MajorityConfig.
+type AckedIndexer interface {
+ AckedIndex(voterID uint64) (idx Index, found bool)
+}
+
+type mapAckIndexer map[uint64]Index
+
+func (m mapAckIndexer) AckedIndex(id uint64) (Index, bool) {
+ idx, ok := m[id]
+ return idx, ok
+}
+
+// VoteResult indicates the outcome of a vote.
+//
+//go:generate stringer -type=VoteResult
+type VoteResult uint8
+
+const (
+ // VotePending indicates that the decision of the vote depends on future
+ // votes, i.e. neither "yes" nor "no" has reached quorum yet.
+ VotePending VoteResult = 1 + iota
+ // VoteLost indicates that the quorum has voted "no".
+ VoteLost
+ // VoteWon indicates that the quorum has voted "yes".
+ VoteWon
+)
diff --git a/vendor/go.etcd.io/etcd/raft/quorum/voteresult_string.go b/vendor/go.etcd.io/etcd/raft/quorum/voteresult_string.go
new file mode 100644
index 0000000..9eca8fd
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/quorum/voteresult_string.go
@@ -0,0 +1,26 @@
+// Code generated by "stringer -type=VoteResult"; DO NOT EDIT.
+
+package quorum
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[VotePending-1]
+ _ = x[VoteLost-2]
+ _ = x[VoteWon-3]
+}
+
+const _VoteResult_name = "VotePendingVoteLostVoteWon"
+
+var _VoteResult_index = [...]uint8{0, 11, 19, 26}
+
+func (i VoteResult) String() string {
+ i -= 1
+ if i >= VoteResult(len(_VoteResult_index)-1) {
+ return "VoteResult(" + strconv.FormatInt(int64(i+1), 10) + ")"
+ }
+ return _VoteResult_name[_VoteResult_index[i]:_VoteResult_index[i+1]]
+}
diff --git a/vendor/go.etcd.io/etcd/raft/raft.go b/vendor/go.etcd.io/etcd/raft/raft.go
index e1e6a16..cdcb43d 100644
--- a/vendor/go.etcd.io/etcd/raft/raft.go
+++ b/vendor/go.etcd.io/etcd/raft/raft.go
@@ -25,7 +25,10 @@
"sync"
"time"
+ "go.etcd.io/etcd/raft/confchange"
+ "go.etcd.io/etcd/raft/quorum"
pb "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/raft/tracker"
)
// None is a placeholder node ID used when there is no leader.
@@ -261,18 +264,14 @@
maxMsgSize uint64
maxUncommittedSize uint64
- maxInflight int
- prs map[uint64]*Progress
- learnerPrs map[uint64]*Progress
- matchBuf uint64Slice
+ // TODO(tbg): rename to trk.
+ prs tracker.ProgressTracker
state StateType
// isLearner is true if the local raft node is a learner.
isLearner bool
- votes map[uint64]bool
-
msgs []pb.Message
// the leader id
@@ -330,28 +329,26 @@
if err != nil {
panic(err) // TODO(bdarnell)
}
- peers := c.peers
- learners := c.learners
- if len(cs.Nodes) > 0 || len(cs.Learners) > 0 {
- if len(peers) > 0 || len(learners) > 0 {
+
+ if len(c.peers) > 0 || len(c.learners) > 0 {
+ if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
// TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
- panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)")
+ panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)")
}
- peers = cs.Nodes
- learners = cs.Learners
+ cs.Voters = c.peers
+ cs.Learners = c.learners
}
+
r := &raft{
id: c.ID,
lead: None,
isLearner: false,
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
- maxInflight: c.MaxInflightMsgs,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
- prs: make(map[uint64]*Progress),
- learnerPrs: make(map[uint64]*Progress),
+ prs: tracker.MakeProgressTracker(c.MaxInflightMsgs),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
@@ -360,20 +357,17 @@
readOnly: newReadOnly(c.ReadOnlyOption),
disableProposalForwarding: c.DisableProposalForwarding,
}
- for _, p := range peers {
- r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
- }
- for _, p := range learners {
- if _, ok := r.prs[p]; ok {
- panic(fmt.Sprintf("node %x is in both learner and peer list", p))
- }
- r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true}
- if r.id == p {
- r.isLearner = true
- }
- }
- if !isHardStateEqual(hs, emptyState) {
+ cfg, prs, err := confchange.Restore(confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: raftlog.lastIndex(),
+ }, cs)
+ if err != nil {
+ panic(err)
+ }
+ assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
+
+ if !IsEmptyHardState(hs) {
r.loadState(hs)
}
if c.Applied > 0 {
@@ -382,7 +376,7 @@
r.becomeFollower(r.Term, None)
var nodesStrs []string
- for _, n := range r.nodes() {
+ for _, n := range r.prs.VoterNodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
@@ -403,26 +397,6 @@
}
}
-func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
-
-func (r *raft) nodes() []uint64 {
- nodes := make([]uint64, 0, len(r.prs))
- for id := range r.prs {
- nodes = append(nodes, id)
- }
- sort.Sort(uint64Slice(nodes))
- return nodes
-}
-
-func (r *raft) learnerNodes() []uint64 {
- nodes := make([]uint64, 0, len(r.learnerPrs))
- for id := range r.learnerPrs {
- nodes = append(nodes, id)
- }
- sort.Sort(uint64Slice(nodes))
- return nodes
-}
-
// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
m.From = r.id
@@ -457,14 +431,6 @@
r.msgs = append(r.msgs, m)
}
-func (r *raft) getProgress(id uint64) *Progress {
- if pr, ok := r.prs[id]; ok {
- return pr
- }
-
- return r.learnerPrs[id]
-}
-
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
@@ -477,7 +443,7 @@
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
- pr := r.getProgress(to)
+ pr := r.prs.Progress[to]
if pr.IsPaused() {
return false
}
@@ -512,7 +478,7 @@
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
- pr.becomeSnapshot(sindex)
+ pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp
@@ -522,13 +488,13 @@
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
- // optimistically increase the next when in ProgressStateReplicate
- case ProgressStateReplicate:
+ // optimistically increase the next when in StateReplicate
+ case tracker.StateReplicate:
last := m.Entries[n-1].Index
- pr.optimisticUpdate(last)
- pr.ins.add(last)
- case ProgressStateProbe:
- pr.pause()
+ pr.OptimisticUpdate(last)
+ pr.Inflights.Add(last)
+ case tracker.StateProbe:
+ pr.ProbeSent = true
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
@@ -546,7 +512,7 @@
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
- commit := min(r.getProgress(to).Match, r.raftLog.committed)
+ commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
@@ -557,24 +523,13 @@
r.send(m)
}
-func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) {
- for id, pr := range r.prs {
- f(id, pr)
- }
-
- for id, pr := range r.learnerPrs {
- f(id, pr)
- }
-}
-
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
- r.forEachProgress(func(id uint64, _ *Progress) {
+ r.prs.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {
return
}
-
r.sendAppend(id)
})
}
@@ -590,7 +545,7 @@
}
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
- r.forEachProgress(func(id uint64, _ *Progress) {
+ r.prs.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {
return
}
@@ -598,23 +553,51 @@
})
}
+func (r *raft) advance(rd Ready) {
+ // If entries were applied (or a snapshot), update our cursor for
+ // the next Ready. Note that if the current HardState contains a
+ // new Commit index, this does not mean that we're also applying
+ // all of the new entries due to commit pagination by size.
+ if index := rd.appliedCursor(); index > 0 {
+ r.raftLog.appliedTo(index)
+ if r.prs.Config.AutoLeave && index >= r.pendingConfIndex && r.state == StateLeader {
+ // If the current (and most recent, at least for this leader's term)
+ // configuration should be auto-left, initiate that now.
+ ccdata, err := (&pb.ConfChangeV2{}).Marshal()
+ if err != nil {
+ panic(err)
+ }
+ ent := pb.Entry{
+ Type: pb.EntryConfChangeV2,
+ Data: ccdata,
+ }
+ if !r.appendEntry(ent) {
+ // If we could not append the entry, bump the pending conf index
+ // so that we'll try again later.
+ //
+ // TODO(tbg): test this case.
+ r.pendingConfIndex = r.raftLog.lastIndex()
+ } else {
+ r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
+ }
+ }
+ }
+ r.reduceUncommittedSize(rd.CommittedEntries)
+
+ if len(rd.Entries) > 0 {
+ e := rd.Entries[len(rd.Entries)-1]
+ r.raftLog.stableTo(e.Index, e.Term)
+ }
+ if !IsEmptySnap(rd.Snapshot) {
+ r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
+ }
+}
+
// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
- // Preserving matchBuf across calls is an optimization
- // used to avoid allocating a new slice on each call.
- if cap(r.matchBuf) < len(r.prs) {
- r.matchBuf = make(uint64Slice, len(r.prs))
- }
- mis := r.matchBuf[:len(r.prs)]
- idx := 0
- for _, p := range r.prs {
- mis[idx] = p.Match
- idx++
- }
- sort.Sort(mis)
- mci := mis[len(mis)-r.quorum()]
+ mci := r.prs.Committed()
return r.raftLog.maybeCommit(mci, r.Term)
}
@@ -631,9 +614,14 @@
r.abortLeaderTransfer()
- r.votes = make(map[uint64]bool)
- r.forEachProgress(func(id uint64, pr *Progress) {
- *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
+ r.prs.ResetVotes()
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ *pr = tracker.Progress{
+ Match: 0,
+ Next: r.raftLog.lastIndex() + 1,
+ Inflights: tracker.NewInflights(r.prs.MaxInflight),
+ IsLearner: pr.IsLearner,
+ }
if id == r.id {
pr.Match = r.raftLog.lastIndex()
}
@@ -661,7 +649,7 @@
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
- r.getProgress(r.id).maybeUpdate(li)
+ r.prs.Progress[r.id].MaybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
@@ -734,7 +722,7 @@
// but doesn't change anything else. In particular it does not increase
// r.Term or change r.Vote.
r.step = stepCandidate
- r.votes = make(map[uint64]bool)
+ r.prs.ResetVotes()
r.tick = r.tickElection
r.lead = None
r.state = StatePreCandidate
@@ -755,7 +743,7 @@
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
- r.prs[r.id].becomeReplicate()
+ r.prs.Progress[r.id].BecomeReplicate()
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
@@ -777,7 +765,14 @@
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
+// campaign transitions the raft instance to candidate state. This must only be
+// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
+ if !r.promotable() {
+ // This path should not be hit (callers are supposed to check), but
+ // better safe than sorry.
+ r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
+ }
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
@@ -790,7 +785,7 @@
voteMsg = pb.MsgVote
term = r.Term
}
- if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
+ if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
@@ -800,7 +795,16 @@
}
return
}
- for id := range r.prs {
+ var ids []uint64
+ {
+ idMap := r.prs.Voters.IDs()
+ ids = make([]uint64, 0, len(idMap))
+ for id := range idMap {
+ ids = append(ids, id)
+ }
+ sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
+ }
+ for _, id := range ids {
if id == r.id {
continue
}
@@ -815,21 +819,14 @@
}
}
-func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) {
+func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
if v {
r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
} else {
r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
}
- if _, ok := r.votes[id]; !ok {
- r.votes[id] = v
- }
- for _, vv := range r.votes {
- if vv {
- granted++
- }
- }
- return granted
+ r.prs.RecordVote(id, v)
+ return r.prs.TallyVotes()
}
func (r *raft) Step(m pb.Message) error {
@@ -910,6 +907,10 @@
switch m.Type {
case pb.MsgHup:
if r.state != StateLeader {
+ if !r.promotable() {
+ r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
+ return nil
+ }
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
@@ -930,12 +931,6 @@
}
case pb.MsgVote, pb.MsgPreVote:
- if r.isLearner {
- // TODO: learner may need to vote, in case of node down when confchange.
- r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
- r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
- return nil
- }
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
@@ -944,12 +939,30 @@
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// ...and we believe the candidate is up to date.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
+ // Note: it turns out that learners must be allowed to cast votes.
+ // This seems counter-intuitive but is necessary in the situation in which
+ // a learner has been promoted (i.e. is now a voter) but has not learned
+ // about this yet.
+ // For example, consider a group in which id=1 is a learner and id=2 and
+ // id=3 are voters. A configuration change promoting 1 can be committed on
+ // the quorum `{2,3}` without the config change being appended to the
+ // learner's log. If the leader (say 2) fails, there are de facto two
+ // voters remaining. Only 3 can win an election (due to its log containing
+ // all committed entries), but to do so it will need 1 to vote. But 1
+ // considers itself a learner and will continue to do so until 3 has
+ // stepped up as leader, replicates the conf change to 1, and 1 applies it.
+ // Ultimately, by receiving a request to vote, the learner realizes that
+ // the candidate believes it to be a voter, and that it should act
+ // accordingly. The candidate's config may be stale, too; but in that case
+ // it won't win the election, at least in the absence of the bug discussed
+ // in:
+ // https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
// When responding to Msg{Pre,}Vote messages we include the term
- // from the message, not the local term. To see why consider the
+ // from the message, not the local term. To see why, consider the
// case where a single node was previously partitioned away and
- // it's local term is now of date. If we include the local term
+ // its local term is now out of date. If we include the local term
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out of date messages).
@@ -985,16 +998,32 @@
r.bcastHeartbeat()
return nil
case pb.MsgCheckQuorum:
- if !r.checkQuorumActive() {
+ // The leader should always see itself as active. As a precaution, handle
+ // the case in which the leader isn't in the configuration any more (for
+ // example if it just removed itself).
+ //
+ // TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
+ // leader steps down when removing itself. I might be missing something.
+ if pr := r.prs.Progress[r.id]; pr != nil {
+ pr.RecentActive = true
+ }
+ if !r.prs.QuorumActive() {
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
}
+ // Mark everyone (but ourselves) as inactive in preparation for the next
+ // CheckQuorum.
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ if id != r.id {
+ pr.RecentActive = false
+ }
+ })
return nil
case pb.MsgProp:
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
- if _, ok := r.prs[r.id]; !ok {
+ if r.prs.Progress[r.id] == nil {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
@@ -1005,11 +1034,38 @@
return ErrProposalDropped
}
- for i, e := range m.Entries {
+ for i := range m.Entries {
+ e := &m.Entries[i]
+ var cc pb.ConfChangeI
if e.Type == pb.EntryConfChange {
- if r.pendingConfIndex > r.raftLog.applied {
- r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
- e.String(), r.pendingConfIndex, r.raftLog.applied)
+ var ccc pb.ConfChange
+ if err := ccc.Unmarshal(e.Data); err != nil {
+ panic(err)
+ }
+ cc = ccc
+ } else if e.Type == pb.EntryConfChangeV2 {
+ var ccc pb.ConfChangeV2
+ if err := ccc.Unmarshal(e.Data); err != nil {
+ panic(err)
+ }
+ cc = ccc
+ }
+ if cc != nil {
+ alreadyPending := r.pendingConfIndex > r.raftLog.applied
+ alreadyJoint := len(r.prs.Config.Voters[1]) > 0
+ wantsLeaveJoint := len(cc.AsV2().Changes) == 0
+
+ var refused string
+ if alreadyPending {
+ refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
+ } else if alreadyJoint && !wantsLeaveJoint {
+ refused = "must transition out of joint config first"
+ } else if !alreadyJoint && wantsLeaveJoint {
+ refused = "not in joint state; refusing empty conf change"
+ }
+
+ if refused != "" {
+ r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
@@ -1023,7 +1079,9 @@
r.bcastAppend()
return nil
case pb.MsgReadIndex:
- if r.quorum() > 1 {
+ // If more than the local vote is needed, go through a full broadcast,
+ // otherwise optimize.
+ if !r.prs.IsSingleton() {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return nil
@@ -1035,24 +1093,30 @@
switch r.readOnly.option {
case ReadOnlySafe:
r.readOnly.addRequest(r.raftLog.committed, m)
+ // The local node automatically acks the request.
+ r.readOnly.recvAck(r.id, m.Entries[0].Data)
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
ri := r.raftLog.committed
if m.From == None || m.From == r.id { // from local member
- r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+ r.readStates = append(r.readStates, ReadState{Index: ri, RequestCtx: m.Entries[0].Data})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
}
- } else {
- r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+ } else { // only one voting member (the leader) in the cluster
+ if m.From == None || m.From == r.id { // from leader itself
+ r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+ } else { // from learner member
+ r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
+ }
}
return nil
}
// All other message types require a progress for m.From (pr).
- pr := r.getProgress(m.From)
+ pr := r.prs.Progress[m.From]
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
@@ -1062,32 +1126,35 @@
pr.RecentActive = true
if m.Reject {
- r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
+ r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
- if pr.maybeDecrTo(m.Index, m.RejectHint) {
+ if pr.MaybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
- if pr.State == ProgressStateReplicate {
- pr.becomeProbe()
+ if pr.State == tracker.StateReplicate {
+ pr.BecomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
- if pr.maybeUpdate(m.Index) {
+ if pr.MaybeUpdate(m.Index) {
switch {
- case pr.State == ProgressStateProbe:
- pr.becomeReplicate()
- case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
- r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+ case pr.State == tracker.StateProbe:
+ pr.BecomeReplicate()
+ case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
+ // TODO(tbg): we should also enter this branch if a snapshot is
+ // received that is below pr.PendingSnapshot but which makes it
+ // possible to use the log again.
+ r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
- pr.becomeProbe()
- pr.becomeReplicate()
- case pr.State == ProgressStateReplicate:
- pr.ins.freeTo(m.Index)
+ pr.BecomeProbe()
+ pr.BecomeReplicate()
+ case pr.State == tracker.StateReplicate:
+ pr.Inflights.FreeLE(m.Index)
}
if r.maybeCommit() {
@@ -1114,11 +1181,11 @@
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
- pr.resume()
+ pr.ProbeSent = false
// free one slot for the full inflights window to allow progress.
- if pr.State == ProgressStateReplicate && pr.ins.full() {
- pr.ins.freeFirstOne()
+ if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
+ pr.Inflights.FreeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
@@ -1128,8 +1195,7 @@
return nil
}
- ackCount := r.readOnly.recvAck(m)
- if ackCount < r.quorum() {
+ if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
@@ -1143,26 +1209,32 @@
}
}
case pb.MsgSnapStatus:
- if pr.State != ProgressStateSnapshot {
+ if pr.State != tracker.StateSnapshot {
return nil
}
+ // TODO(tbg): this code is very similar to the snapshot handling in
+ // MsgAppResp above. In fact, the code there is more correct than the
+ // code here and should likely be updated to match (or even better, the
+ // logic pulled into a newly created Progress state machine handler).
if !m.Reject {
- pr.becomeProbe()
+ pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
- pr.snapshotFailure()
- pr.becomeProbe()
+ // NB: the order here matters or we'll be probing erroneously from
+ // the snapshot index, but the snapshot never applied.
+ pr.PendingSnapshot = 0
+ pr.BecomeProbe()
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
- // If snapshot finish, wait for the msgAppResp from the remote node before sending
- // out the next msgApp.
+ // If snapshot finish, wait for the MsgAppResp from the remote node before sending
+ // out the next MsgApp.
// If snapshot failure, wait for a heartbeat interval before next try
- pr.pause()
+ pr.ProbeSent = true
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
- if pr.State == ProgressStateReplicate {
- pr.becomeProbe()
+ if pr.State == tracker.StateReplicate {
+ pr.BecomeProbe()
}
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
case pb.MsgTransferLeader:
@@ -1226,17 +1298,17 @@
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
- gr := r.poll(m.From, m.Type, !m.Reject)
- r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
- switch r.quorum() {
- case gr:
+ gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
+ r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
+ switch res {
+ case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
- case len(r.votes) - gr:
+ case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
@@ -1314,7 +1386,7 @@
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
- r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
+ r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
@@ -1339,11 +1411,51 @@
}
// restore recovers the state machine from a snapshot. It restores the log and the
-// configuration of state machine.
+// configuration of state machine. If this method returns false, the snapshot was
+// ignored, either because it was obsolete or because of an error.
func (r *raft) restore(s pb.Snapshot) bool {
if s.Metadata.Index <= r.raftLog.committed {
return false
}
+ if r.state != StateFollower {
+ // This is defense-in-depth: if the leader somehow ended up applying a
+ // snapshot, it could move into a new term without moving into a
+ // follower state. This should never fire, but if it did, we'd have
+ // prevented damage by returning early, so log only a loud warning.
+ //
+ // At the time of writing, the instance is guaranteed to be in follower
+ // state when this method is called.
+ r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
+ r.becomeFollower(r.Term+1, None)
+ return false
+ }
+
+ // More defense-in-depth: throw away snapshot if recipient is not in the
+ // config. This shouldn't ever happen (at the time of writing) but lots of
+ // code here and there assumes that r.id is in the progress tracker.
+ found := false
+ cs := s.Metadata.ConfState
+ for _, set := range [][]uint64{
+ cs.Voters,
+ cs.Learners,
+ } {
+ for _, id := range set {
+ if id == r.id {
+ found = true
+ break
+ }
+ }
+ }
+ if !found {
+ r.logger.Warningf(
+ "%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
+ r.id, cs,
+ )
+ return false
+ }
+
+ // Now go ahead and actually restore.
+
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
@@ -1351,123 +1463,115 @@
return false
}
- // The normal peer can't become learner.
- if !r.isLearner {
- for _, id := range s.Metadata.ConfState.Learners {
- if id == r.id {
- r.logger.Errorf("%x can't become learner when restores snapshot [index: %d, term: %d]", r.id, s.Metadata.Index, s.Metadata.Term)
- return false
- }
- }
- }
-
- r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
- r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
-
r.raftLog.restore(s)
- r.prs = make(map[uint64]*Progress)
- r.learnerPrs = make(map[uint64]*Progress)
- r.restoreNode(s.Metadata.ConfState.Nodes, false)
- r.restoreNode(s.Metadata.ConfState.Learners, true)
- return true
-}
-func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
- for _, n := range nodes {
- match, next := uint64(0), r.raftLog.lastIndex()+1
- if n == r.id {
- match = next - 1
- r.isLearner = isLearner
- }
- r.setProgress(n, match, next, isLearner)
- r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n))
+ // Reset the configuration and add the (potentially updated) peers in anew.
+ r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
+ cfg, prs, err := confchange.Restore(confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: r.raftLog.lastIndex(),
+ }, cs)
+
+ if err != nil {
+ // This should never happen. Either there's a bug in our config change
+ // handling or the client corrupted the conf change.
+ panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
}
+
+ assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
+
+ pr := r.prs.Progress[r.id]
+ pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
+
+ r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
+ r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
+ return true
}
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
- _, ok := r.prs[r.id]
- return ok
+ pr := r.prs.Progress[r.id]
+ return pr != nil && !pr.IsLearner
}
-func (r *raft) addNode(id uint64) {
- r.addNodeOrLearnerNode(id, false)
-}
-
-func (r *raft) addLearner(id uint64) {
- r.addNodeOrLearnerNode(id, true)
-}
-
-func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
- pr := r.getProgress(id)
- if pr == nil {
- r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
- } else {
- if isLearner && !pr.IsLearner {
- // can only change Learner to Voter
- r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
- return
+func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
+ cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {
+ changer := confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: r.raftLog.lastIndex(),
}
-
- if isLearner == pr.IsLearner {
- // Ignore any redundant addNode calls (which can happen because the
- // initial bootstrapping entries are applied twice).
- return
+ if cc.LeaveJoint() {
+ return changer.LeaveJoint()
+ } else if autoLeave, ok := cc.EnterJoint(); ok {
+ return changer.EnterJoint(autoLeave, cc.Changes...)
}
+ return changer.Simple(cc.Changes...)
+ }()
- // change Learner to Voter, use origin Learner progress
- delete(r.learnerPrs, id)
- pr.IsLearner = false
- r.prs[id] = pr
+ if err != nil {
+ // TODO(tbg): return the error to the caller.
+ panic(err)
}
- if r.id == id {
- r.isLearner = isLearner
- }
-
- // When a node is first added, we should mark it as recently active.
- // Otherwise, CheckQuorum may cause us to step down if it is invoked
- // before the added node has a chance to communicate with us.
- pr = r.getProgress(id)
- pr.RecentActive = true
+ return r.switchToConfig(cfg, prs)
}
-func (r *raft) removeNode(id uint64) {
- r.delProgress(id)
+// switchToConfig reconfigures this node to use the provided configuration. It
+// updates the in-memory state and, when necessary, carries out additional
+// actions such as reacting to the removal of nodes or changed quorum
+// requirements.
+//
+// The inputs usually result from restoring a ConfState or applying a ConfChange.
+func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState {
+ r.prs.Config = cfg
+ r.prs.Progress = prs
- // do not try to commit or abort transferring if there is no nodes in the cluster.
- if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
- return
+ r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
+ cs := r.prs.ConfState()
+ pr, ok := r.prs.Progress[r.id]
+
+ // Update whether the node itself is a learner, resetting to false when the
+ // node is removed.
+ r.isLearner = ok && pr.IsLearner
+
+ if (!ok || r.isLearner) && r.state == StateLeader {
+ // This node is leader and was removed or demoted. We prevent demotions
+ // at the time of writing but hypothetically we handle them the same way as
+ // removing the leader: stepping down into the next Term.
+ //
+ // TODO(tbg): step down (for sanity) and ask follower with largest Match
+ // to TimeoutNow (to avoid interruption). This might still drop some
+ // proposals but it's better than nothing.
+ //
+ // TODO(tbg): test this branch. It is untested at the time of writing.
+ return cs
}
- // The quorum size is now smaller, so see if any pending entries can
- // be committed.
+ // The remaining steps only make sense if this node is the leader and there
+ // are other nodes.
+ if r.state != StateLeader || len(cs.Voters) == 0 {
+ return cs
+ }
+
if r.maybeCommit() {
+ // If the configuration change means that more entries are committed now,
+ // broadcast/append to everyone in the updated config.
r.bcastAppend()
+ } else {
+ // Otherwise, still probe the newly added replicas; there's no reason to
+ // let them wait out a heartbeat interval (or the next incoming
+ // proposal).
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ r.maybeSendAppend(id, false /* sendIfEmpty */)
+ })
}
- // If the removed node is the leadTransferee, then abort the leadership transferring.
- if r.state == StateLeader && r.leadTransferee == id {
+ // If the leadTransferee was removed, abort the leadership transfer.
+ if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
}
-}
-func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
- if !isLearner {
- delete(r.learnerPrs, id)
- r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
- return
- }
-
- if _, ok := r.prs[id]; ok {
- panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id))
- }
- r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true}
-}
-
-func (r *raft) delProgress(id uint64) {
- delete(r.prs, id)
- delete(r.learnerPrs, id)
+ return cs
}
func (r *raft) loadState(state pb.HardState) {
@@ -1490,29 +1594,6 @@
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}
-// checkQuorumActive returns true if the quorum is active from
-// the view of the local raft state machine. Otherwise, it returns
-// false.
-// checkQuorumActive also resets all RecentActive to false.
-func (r *raft) checkQuorumActive() bool {
- var act int
-
- r.forEachProgress(func(id uint64, pr *Progress) {
- if id == r.id { // self is always active
- act++
- return
- }
-
- if pr.RecentActive && !pr.IsLearner {
- act++
- }
-
- pr.RecentActive = false
- })
-
- return act >= r.quorum()
-}
-
func (r *raft) sendTimeoutNow(to uint64) {
r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
}
diff --git a/vendor/go.etcd.io/etcd/raft/raftpb/confchange.go b/vendor/go.etcd.io/etcd/raft/raftpb/confchange.go
new file mode 100644
index 0000000..46a7a70
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/raftpb/confchange.go
@@ -0,0 +1,170 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raftpb
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+
+ "github.com/gogo/protobuf/proto"
+)
+
+// ConfChangeI is implemented by both ConfChangeV2 and the legacy ConfChange so
+// that callers can handle either kind of change through one interface.
+type ConfChangeI interface {
+ AsV2() ConfChangeV2 // the same change expressed as a ConfChangeV2
+ AsV1() (ConfChange, bool) // the legacy ConfChange; false if the receiver is not one
+}
+
+// MarshalConfChange serializes c with the Marshal method of its concrete
+// representation and reports the EntryType that matches that representation:
+// EntryConfChange for a legacy ConfChange, EntryConfChangeV2 otherwise. The
+// marshaling error, if any, is returned alongside the type and the bytes.
+func MarshalConfChange(c ConfChangeI) (EntryType, []byte, error) {
+	// Prefer the legacy encoding whenever c can be represented as a V1 change.
+	if legacy, isV1 := c.AsV1(); isV1 {
+		data, err := legacy.Marshal()
+		return EntryConfChange, data, err
+	}
+
+	// Everything else is encoded as a ConfChangeV2.
+	v2 := c.AsV2()
+	data, err := v2.Marshal()
+	return EntryConfChangeV2, data, err
+}
+
+// AsV2 wraps the single operation described by c (its Type and NodeID) in an
+// equivalent ConfChangeV2, carrying the Context over unchanged. The ID field
+// of c is not copied into the result.
+func (c ConfChange) AsV2() ConfChangeV2 {
+	single := ConfChangeSingle{Type: c.Type, NodeID: c.NodeID}
+	return ConfChangeV2{
+		Changes: []ConfChangeSingle{single},
+		Context: c.Context,
+	}
+}
+
+// AsV1 returns c itself and true: a ConfChange is already in V1 form.
+func (c ConfChange) AsV1() (ConfChange, bool) {
+ return c, true
+}
+
+// AsV2 returns c unchanged, since a ConfChangeV2 is already in V2 form.
+func (c ConfChangeV2) AsV2() ConfChangeV2 { return c }
+
+// AsV1 returns a zero ConfChange and false: no V1 form is produced for a ConfChangeV2.
+func (c ConfChangeV2) AsV1() (ConfChange, bool) { return ConfChange{}, false }
+
+// EnterJoint reports whether applying this change requires Joint Consensus
+// (ok) and, if it does, whether the joint configuration is left
+// automatically (autoLeave). autoLeave is never true unless ok is.
+//
+// Joint Consensus is used when it is requested explicitly through Transition
+// or when the change contains more than one operation. EnterJoint panics if
+// Transition is set to a value other than the three known constants.
+func (c *ConfChangeV2) EnterJoint() (autoLeave bool, ok bool) {
+	// NB: some multi-operation changes could in theory still use the "simple"
+	// protocol, but whether they qualify depends on the config they are applied
+	// to (e.g. adding two learners is not simple if both are currently voters).
+	// Such distinctions should not matter in practice, so Joint Consensus is
+	// used liberally.
+	if c.Transition == ConfChangeTransitionAuto && len(c.Changes) <= 1 {
+		return false, false
+	}
+
+	// Use Joint Consensus; the transition decides who leaves the joint state.
+	switch c.Transition {
+	case ConfChangeTransitionAuto, ConfChangeTransitionJointImplicit:
+		return true, true
+	case ConfChangeTransitionJointExplicit:
+		// The application must propose the leaving change itself.
+		return false, true
+	default:
+		panic(fmt.Sprintf("unknown transition: %+v", c))
+	}
+}
+
+// LeaveJoint reports whether this change leaves a joint configuration, which
+// is the case when it compares equal (via proto.Equal) to the zero
+// ConfChangeV2 once Context is ignored. The check works on a copy of c.
+func (c *ConfChangeV2) LeaveJoint() bool {
+	withoutContext := *c
+	withoutContext.Context = nil
+	return proto.Equal(&withoutContext, &ConfChangeV2{})
+}
+
+// ConfChangesFromString parses a whitespace-delimited sequence of operations
+// into a slice of ConfChangeSingle. The supported operations are:
+// - vn: make n a voter,
+// - ln: make n a learner,
+// - rn: remove n, and
+// - un: update n,
+// where n is a decimal node ID. Empty or all-whitespace input yields a nil
+// slice; a malformed token yields an error and a nil slice.
+func ConfChangesFromString(s string) ([]ConfChangeSingle, error) {
+	var ccs []ConfChangeSingle
+	// strings.Fields (unlike Split on " ") tolerates repeated separators and
+	// returns no tokens at all for blank input.
+	for _, tok := range strings.Fields(s) {
+		if len(tok) < 2 {
+			return nil, fmt.Errorf("unknown token %s", tok)
+		}
+		var cc ConfChangeSingle
+		switch tok[0] {
+		case 'v':
+			cc.Type = ConfChangeAddNode
+		case 'l':
+			cc.Type = ConfChangeAddLearnerNode
+		case 'r':
+			cc.Type = ConfChangeRemoveNode
+		case 'u':
+			cc.Type = ConfChangeUpdateNode
+		default:
+			return nil, fmt.Errorf("unknown input: %s", tok)
+		}
+		id, err := strconv.ParseUint(tok[1:], 10, 64)
+		if err != nil {
+			return nil, err
+		}
+		cc.NodeID = id
+		ccs = append(ccs, cc)
+	}
+	return ccs, nil
+}
+
+// ConfChangesToString is the inverse of ConfChangesFromString; a change of
+// unrecognized type is rendered with the prefix "unknown".
+func ConfChangesToString(ccs []ConfChangeSingle) string {
+	var sb strings.Builder
+	for idx := range ccs {
+		if idx != 0 {
+			sb.WriteByte(' ')
+		}
+		prefix := "unknown"
+		switch ccs[idx].Type {
+		case ConfChangeAddNode:
+			prefix = "v"
+		case ConfChangeAddLearnerNode:
+			prefix = "l"
+		case ConfChangeRemoveNode:
+			prefix = "r"
+		case ConfChangeUpdateNode:
+			prefix = "u"
+		}
+		sb.WriteString(prefix + strconv.FormatUint(ccs[idx].NodeID, 10))
+	}
+	return sb.String()
+}
diff --git a/vendor/go.etcd.io/etcd/raft/raftpb/confstate.go b/vendor/go.etcd.io/etcd/raft/raftpb/confstate.go
new file mode 100644
index 0000000..4bda932
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/raftpb/confstate.go
@@ -0,0 +1,45 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raftpb
+
+import (
+ "fmt"
+ "reflect"
+ "sort"
+)
+
+// Equivalent returns nil if cs and cs2 describe the same configuration once
+// each ID list is sorted (nil and empty lists compare equal) and unrecognized
+// fields are dropped; otherwise the error shows normalized and original inputs.
+func (cs ConfState) Equivalent(cs2 ConfState) error {
+	cs1 := cs
+	orig1, orig2 := cs1, cs2
+	normalized := func(ids []uint64) []uint64 {
+		sorted := append([]uint64(nil), ids...)
+		sort.Slice(sorted, func(a, b int) bool { return sorted[a] < sorted[b] })
+		return sorted
+	}
+	for _, state := range []*ConfState{&cs1, &cs2} {
+		state.Voters = normalized(state.Voters)
+		state.Learners = normalized(state.Learners)
+		state.VotersOutgoing = normalized(state.VotersOutgoing)
+		state.LearnersNext = normalized(state.LearnersNext)
+		state.XXX_unrecognized = nil
+	}
+	if reflect.DeepEqual(cs1, cs2) {
+		return nil
+	}
+	return fmt.Errorf("ConfStates not equivalent after sorting:\n%+#v\n%+#v\nInputs were:\n%+#v\n%+#v", cs1, cs2, orig1, orig2)
+}
diff --git a/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go b/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go
index fd9ee37..fcf259c 100644
--- a/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go
+++ b/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go
@@ -15,6 +15,8 @@
HardState
ConfState
ConfChange
+ ConfChangeSingle
+ ConfChangeV2
*/
package raftpb
@@ -44,17 +46,20 @@
type EntryType int32
const (
- EntryNormal EntryType = 0
- EntryConfChange EntryType = 1
+ EntryNormal EntryType = 0
+ EntryConfChange EntryType = 1
+ EntryConfChangeV2 EntryType = 2
)
var EntryType_name = map[int32]string{
0: "EntryNormal",
1: "EntryConfChange",
+ 2: "EntryConfChangeV2",
}
var EntryType_value = map[string]int32{
- "EntryNormal": 0,
- "EntryConfChange": 1,
+ "EntryNormal": 0,
+ "EntryConfChange": 1,
+ "EntryConfChangeV2": 2,
}
func (x EntryType) Enum() *EntryType {
@@ -160,6 +165,57 @@
}
func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{1} }
+// ConfChangeTransition specifies the behavior of a configuration change with
+// respect to joint consensus.
+type ConfChangeTransition int32
+
+const (
+ // Automatically use the simple protocol if possible, otherwise fall back
+ // to ConfChangeTransitionJointImplicit. Most applications will want to use this.
+ ConfChangeTransitionAuto ConfChangeTransition = 0
+ // Use joint consensus unconditionally, and transition out of it
+ // automatically (by proposing a zero configuration change).
+ //
+ // This option is suitable for applications that want to minimize the time
+ // spent in the joint configuration and do not store the joint configuration
+ // in the state machine (outside of InitialState).
+ ConfChangeTransitionJointImplicit ConfChangeTransition = 1
+ // Use joint consensus and remain in the joint configuration until the
+ // application proposes a no-op configuration change. This is suitable for
+ // applications that want to explicitly control the transitions, for example
+ // to use a custom payload (via the Context field).
+ ConfChangeTransitionJointExplicit ConfChangeTransition = 2
+)
+
+var ConfChangeTransition_name = map[int32]string{
+ 0: "ConfChangeTransitionAuto",
+ 1: "ConfChangeTransitionJointImplicit",
+ 2: "ConfChangeTransitionJointExplicit",
+}
+var ConfChangeTransition_value = map[string]int32{
+ "ConfChangeTransitionAuto": 0,
+ "ConfChangeTransitionJointImplicit": 1,
+ "ConfChangeTransitionJointExplicit": 2,
+}
+
+func (x ConfChangeTransition) Enum() *ConfChangeTransition {
+ p := new(ConfChangeTransition)
+ *p = x
+ return p
+}
+func (x ConfChangeTransition) String() string {
+ return proto.EnumName(ConfChangeTransition_name, int32(x))
+}
+func (x *ConfChangeTransition) UnmarshalJSON(data []byte) error {
+ value, err := proto.UnmarshalJSONEnum(ConfChangeTransition_value, data, "ConfChangeTransition")
+ if err != nil {
+ return err
+ }
+ *x = ConfChangeTransition(value)
+ return nil
+}
+func (ConfChangeTransition) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{2} }
+
type ConfChangeType int32
const (
@@ -198,7 +254,7 @@
*x = ConfChangeType(value)
return nil
}
-func (ConfChangeType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{2} }
+func (ConfChangeType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{3} }
type Entry struct {
Term uint64 `protobuf:"varint,2,opt,name=Term" json:"Term"`
@@ -270,9 +326,21 @@
func (*HardState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{4} }
type ConfState struct {
- Nodes []uint64 `protobuf:"varint,1,rep,name=nodes" json:"nodes,omitempty"`
- Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"`
- XXX_unrecognized []byte `json:"-"`
+ // The voters in the incoming config. (If the configuration is not joint,
+ // then the outgoing config is empty).
+ Voters []uint64 `protobuf:"varint,1,rep,name=voters" json:"voters,omitempty"`
+ // The learners in the incoming config.
+ Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"`
+ // The voters in the outgoing config.
+ VotersOutgoing []uint64 `protobuf:"varint,3,rep,name=voters_outgoing,json=votersOutgoing" json:"voters_outgoing,omitempty"`
+ // The nodes that will become learners when the outgoing config is removed.
+ // These nodes are necessarily currently in voters_outgoing (or they would have
+ // been added to the incoming config right away).
+ LearnersNext []uint64 `protobuf:"varint,4,rep,name=learners_next,json=learnersNext" json:"learners_next,omitempty"`
+ // If set, the config is joint and Raft will automatically transition into
+ // the final config (i.e. remove the outgoing config) when this is safe.
+ AutoLeave bool `protobuf:"varint,5,opt,name=auto_leave,json=autoLeave" json:"auto_leave"`
+ XXX_unrecognized []byte `json:"-"`
}
func (m *ConfState) Reset() { *m = ConfState{} }
@@ -281,11 +349,14 @@
func (*ConfState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{5} }
type ConfChange struct {
- ID uint64 `protobuf:"varint,1,opt,name=ID" json:"ID"`
- Type ConfChangeType `protobuf:"varint,2,opt,name=Type,enum=raftpb.ConfChangeType" json:"Type"`
- NodeID uint64 `protobuf:"varint,3,opt,name=NodeID" json:"NodeID"`
- Context []byte `protobuf:"bytes,4,opt,name=Context" json:"Context,omitempty"`
- XXX_unrecognized []byte `json:"-"`
+ Type ConfChangeType `protobuf:"varint,2,opt,name=type,enum=raftpb.ConfChangeType" json:"type"`
+ NodeID uint64 `protobuf:"varint,3,opt,name=node_id,json=nodeId" json:"node_id"`
+ Context []byte `protobuf:"bytes,4,opt,name=context" json:"context,omitempty"`
+ // NB: this is used only by etcd to thread through a unique identifier.
+ // Ideally it should really use the Context instead. No counterpart to
+ // this field exists in ConfChangeV2.
+ ID uint64 `protobuf:"varint,1,opt,name=id" json:"id"`
+ XXX_unrecognized []byte `json:"-"`
}
func (m *ConfChange) Reset() { *m = ConfChange{} }
@@ -293,6 +364,63 @@
func (*ConfChange) ProtoMessage() {}
func (*ConfChange) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{6} }
+// ConfChangeSingle is an individual configuration change operation. Multiple
+// such operations can be carried out atomically via a ConfChangeV2.
+type ConfChangeSingle struct {
+ Type ConfChangeType `protobuf:"varint,1,opt,name=type,enum=raftpb.ConfChangeType" json:"type"`
+ NodeID uint64 `protobuf:"varint,2,opt,name=node_id,json=nodeId" json:"node_id"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *ConfChangeSingle) Reset() { *m = ConfChangeSingle{} }
+func (m *ConfChangeSingle) String() string { return proto.CompactTextString(m) }
+func (*ConfChangeSingle) ProtoMessage() {}
+func (*ConfChangeSingle) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{7} }
+
+// ConfChangeV2 messages initiate configuration changes. They support both the
+// simple "one at a time" membership change protocol and full Joint Consensus
+// allowing for arbitrary changes in membership.
+//
+// The supplied context is treated as an opaque payload and can be used to
+// attach an action on the state machine to the application of the config change
+// proposal. Note that contrary to Joint Consensus as outlined in the Raft
+// paper[1], configuration changes become active when they are *applied* to the
+// state machine (not when they are appended to the log).
+//
+// The simple protocol can be used whenever only a single change is made.
+//
+// Non-simple changes require the use of Joint Consensus, for which two
+// configuration changes are run. The first configuration change specifies the
+// desired changes and transitions the Raft group into the joint configuration,
+// in which quorum requires a majority of both the pre-changes and post-changes
+// configuration. Joint Consensus avoids entering fragile intermediate
+// configurations that could compromise survivability. For example, without the
+// use of Joint Consensus and running across three availability zones with a
+// replication factor of three, it is not possible to replace a voter without
+// entering an intermediate configuration that does not survive the outage of
+// one availability zone.
+//
+// The provided ConfChangeTransition specifies how (and whether) Joint Consensus
+// is used, and assigns the task of leaving the joint configuration either to
+// Raft or the application. Leaving the joint configuration is accomplished by
+// proposing a ConfChangeV2 in which at most the Context field is
+// populated.
+//
+// For details on Raft membership changes, see:
+//
+// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
+type ConfChangeV2 struct {
+ Transition ConfChangeTransition `protobuf:"varint,1,opt,name=transition,enum=raftpb.ConfChangeTransition" json:"transition"`
+ Changes []ConfChangeSingle `protobuf:"bytes,2,rep,name=changes" json:"changes"`
+ Context []byte `protobuf:"bytes,3,opt,name=context" json:"context,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *ConfChangeV2) Reset() { *m = ConfChangeV2{} }
+func (m *ConfChangeV2) String() string { return proto.CompactTextString(m) }
+func (*ConfChangeV2) ProtoMessage() {}
+func (*ConfChangeV2) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{8} }
+
func init() {
proto.RegisterType((*Entry)(nil), "raftpb.Entry")
proto.RegisterType((*SnapshotMetadata)(nil), "raftpb.SnapshotMetadata")
@@ -301,8 +429,11 @@
proto.RegisterType((*HardState)(nil), "raftpb.HardState")
proto.RegisterType((*ConfState)(nil), "raftpb.ConfState")
proto.RegisterType((*ConfChange)(nil), "raftpb.ConfChange")
+ proto.RegisterType((*ConfChangeSingle)(nil), "raftpb.ConfChangeSingle")
+ proto.RegisterType((*ConfChangeV2)(nil), "raftpb.ConfChangeV2")
proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value)
proto.RegisterEnum("raftpb.MessageType", MessageType_name, MessageType_value)
+ proto.RegisterEnum("raftpb.ConfChangeTransition", ConfChangeTransition_name, ConfChangeTransition_value)
proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value)
}
func (m *Entry) Marshal() (dAtA []byte, err error) {
@@ -535,8 +666,8 @@
_ = i
var l int
_ = l
- if len(m.Nodes) > 0 {
- for _, num := range m.Nodes {
+ if len(m.Voters) > 0 {
+ for _, num := range m.Voters {
dAtA[i] = 0x8
i++
i = encodeVarintRaft(dAtA, i, uint64(num))
@@ -549,6 +680,28 @@
i = encodeVarintRaft(dAtA, i, uint64(num))
}
}
+ if len(m.VotersOutgoing) > 0 {
+ for _, num := range m.VotersOutgoing {
+ dAtA[i] = 0x18
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(num))
+ }
+ }
+ if len(m.LearnersNext) > 0 {
+ for _, num := range m.LearnersNext {
+ dAtA[i] = 0x20
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(num))
+ }
+ }
+ dAtA[i] = 0x28
+ i++
+ if m.AutoLeave {
+ dAtA[i] = 1
+ } else {
+ dAtA[i] = 0
+ }
+ i++
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
@@ -591,6 +744,75 @@
return i, nil
}
+func (m *ConfChangeSingle) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *ConfChangeSingle) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ dAtA[i] = 0x8
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Type))
+ dAtA[i] = 0x10
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.NodeID))
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func (m *ConfChangeV2) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *ConfChangeV2) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ dAtA[i] = 0x8
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Transition))
+ if len(m.Changes) > 0 {
+ for _, msg := range m.Changes {
+ dAtA[i] = 0x12
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(msg.Size()))
+ n, err := msg.MarshalTo(dAtA[i:])
+ if err != nil {
+ return 0, err
+ }
+ i += n
+ }
+ }
+ if m.Context != nil {
+ dAtA[i] = 0x1a
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(len(m.Context)))
+ i += copy(dAtA[i:], m.Context)
+ }
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
func encodeVarintRaft(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
@@ -689,8 +911,8 @@
func (m *ConfState) Size() (n int) {
var l int
_ = l
- if len(m.Nodes) > 0 {
- for _, e := range m.Nodes {
+ if len(m.Voters) > 0 {
+ for _, e := range m.Voters {
n += 1 + sovRaft(uint64(e))
}
}
@@ -699,6 +921,17 @@
n += 1 + sovRaft(uint64(e))
}
}
+ if len(m.VotersOutgoing) > 0 {
+ for _, e := range m.VotersOutgoing {
+ n += 1 + sovRaft(uint64(e))
+ }
+ }
+ if len(m.LearnersNext) > 0 {
+ for _, e := range m.LearnersNext {
+ n += 1 + sovRaft(uint64(e))
+ }
+ }
+ n += 2
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@@ -721,6 +954,37 @@
return n
}
+func (m *ConfChangeSingle) Size() (n int) {
+ var l int
+ _ = l
+ n += 1 + sovRaft(uint64(m.Type))
+ n += 1 + sovRaft(uint64(m.NodeID))
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *ConfChangeV2) Size() (n int) {
+ var l int
+ _ = l
+ n += 1 + sovRaft(uint64(m.Transition))
+ if len(m.Changes) > 0 {
+ for _, e := range m.Changes {
+ l = e.Size()
+ n += 1 + l + sovRaft(uint64(l))
+ }
+ }
+ if m.Context != nil {
+ l = len(m.Context)
+ n += 1 + l + sovRaft(uint64(l))
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
func sovRaft(x uint64) (n int) {
for {
n++
@@ -1573,7 +1837,7 @@
break
}
}
- m.Nodes = append(m.Nodes, v)
+ m.Voters = append(m.Voters, v)
} else if wireType == 2 {
var packedLen int
for shift := uint(0); ; shift += 7 {
@@ -1613,10 +1877,10 @@
break
}
}
- m.Nodes = append(m.Nodes, v)
+ m.Voters = append(m.Voters, v)
}
} else {
- return fmt.Errorf("proto: wrong wireType = %d for field Nodes", wireType)
+ return fmt.Errorf("proto: wrong wireType = %d for field Voters", wireType)
}
case 2:
if wireType == 0 {
@@ -1680,6 +1944,150 @@
} else {
return fmt.Errorf("proto: wrong wireType = %d for field Learners", wireType)
}
+ case 3:
+ if wireType == 0 {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.VotersOutgoing = append(m.VotersOutgoing, v)
+ } else if wireType == 2 {
+ var packedLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ packedLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if packedLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + packedLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ for iNdEx < postIndex {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.VotersOutgoing = append(m.VotersOutgoing, v)
+ }
+ } else {
+ return fmt.Errorf("proto: wrong wireType = %d for field VotersOutgoing", wireType)
+ }
+ case 4:
+ if wireType == 0 {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.LearnersNext = append(m.LearnersNext, v)
+ } else if wireType == 2 {
+ var packedLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ packedLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if packedLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + packedLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ for iNdEx < postIndex {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.LearnersNext = append(m.LearnersNext, v)
+ }
+ } else {
+ return fmt.Errorf("proto: wrong wireType = %d for field LearnersNext", wireType)
+ }
+ case 5:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field AutoLeave", wireType)
+ }
+ var v int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.AutoLeave = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skipRaft(dAtA[iNdEx:])
@@ -1841,6 +2249,227 @@
}
return nil
}
+func (m *ConfChangeSingle) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: ConfChangeSingle: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: ConfChangeSingle: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
+ }
+ m.Type = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Type |= (ConfChangeType(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 2:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType)
+ }
+ m.NodeID = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.NodeID |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRaft(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRaft
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *ConfChangeV2) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: ConfChangeV2: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: ConfChangeV2: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Transition", wireType)
+ }
+ m.Transition = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Transition |= (ConfChangeTransition(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 2:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Changes", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + msglen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Changes = append(m.Changes, ConfChangeSingle{})
+ if err := m.Changes[len(m.Changes)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ case 3:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Context", wireType)
+ }
+ var byteLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ byteLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if byteLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + byteLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Context = append(m.Context[:0], dAtA[iNdEx:postIndex]...)
+ if m.Context == nil {
+ m.Context = []byte{}
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRaft(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRaft
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
func skipRaft(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
@@ -1949,56 +2578,69 @@
func init() { proto.RegisterFile("raft.proto", fileDescriptorRaft) }
var fileDescriptorRaft = []byte{
- // 815 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0x23, 0x45,
- 0x10, 0xf6, 0x8c, 0xc7, 0x7f, 0x35, 0x8e, 0xd3, 0xa9, 0x35, 0xa8, 0x15, 0x45, 0xc6, 0xb2, 0x38,
- 0x58, 0x41, 0x1b, 0x20, 0x07, 0x0e, 0x48, 0x1c, 0x36, 0x09, 0x52, 0x22, 0xad, 0xa3, 0xc5, 0x9b,
- 0xe5, 0x80, 0x84, 0x50, 0xc7, 0x53, 0x9e, 0x18, 0x32, 0xd3, 0xa3, 0x9e, 0xf6, 0xb2, 0xb9, 0x20,
- 0x1e, 0x80, 0x07, 0xe0, 0xc2, 0xfb, 0xe4, 0xb8, 0x12, 0x77, 0xc4, 0x86, 0x17, 0x41, 0xdd, 0xd3,
- 0x63, 0xcf, 0x24, 0xb7, 0xae, 0xef, 0xab, 0xae, 0xfa, 0xea, 0xeb, 0x9a, 0x01, 0x50, 0x62, 0xa9,
- 0x8f, 0x32, 0x25, 0xb5, 0xc4, 0xb6, 0x39, 0x67, 0xd7, 0xfb, 0xc3, 0x58, 0xc6, 0xd2, 0x42, 0x9f,
- 0x9b, 0x53, 0xc1, 0x4e, 0x7e, 0x83, 0xd6, 0xb7, 0xa9, 0x56, 0x77, 0xf8, 0x19, 0x04, 0x57, 0x77,
- 0x19, 0x71, 0x6f, 0xec, 0x4d, 0x07, 0xc7, 0x7b, 0x47, 0xc5, 0xad, 0x23, 0x4b, 0x1a, 0xe2, 0x24,
- 0xb8, 0xff, 0xe7, 0x93, 0xc6, 0xdc, 0x26, 0x21, 0x87, 0xe0, 0x8a, 0x54, 0xc2, 0xfd, 0xb1, 0x37,
- 0x0d, 0x36, 0x0c, 0xa9, 0x04, 0xf7, 0xa1, 0x75, 0x91, 0x46, 0xf4, 0x8e, 0x37, 0x2b, 0x54, 0x01,
- 0x21, 0x42, 0x70, 0x26, 0xb4, 0xe0, 0xc1, 0xd8, 0x9b, 0xf6, 0xe7, 0xf6, 0x3c, 0xf9, 0xdd, 0x03,
- 0xf6, 0x3a, 0x15, 0x59, 0x7e, 0x23, 0xf5, 0x8c, 0xb4, 0x88, 0x84, 0x16, 0xf8, 0x15, 0xc0, 0x42,
- 0xa6, 0xcb, 0x9f, 0x72, 0x2d, 0x74, 0xa1, 0x28, 0xdc, 0x2a, 0x3a, 0x95, 0xe9, 0xf2, 0xb5, 0x21,
- 0x5c, 0xf1, 0xde, 0xa2, 0x04, 0x4c, 0xf3, 0x95, 0x6d, 0x5e, 0xd5, 0x55, 0x40, 0x46, 0xb2, 0x36,
- 0x92, 0xab, 0xba, 0x2c, 0x32, 0xf9, 0x01, 0xba, 0xa5, 0x02, 0x23, 0xd1, 0x28, 0xb0, 0x3d, 0xfb,
- 0x73, 0x7b, 0xc6, 0xaf, 0xa1, 0x9b, 0x38, 0x65, 0xb6, 0x70, 0x78, 0xcc, 0x4b, 0x2d, 0x8f, 0x95,
- 0xbb, 0xba, 0x9b, 0xfc, 0xc9, 0x5f, 0x4d, 0xe8, 0xcc, 0x28, 0xcf, 0x45, 0x4c, 0xf8, 0x1c, 0x02,
- 0xbd, 0x75, 0xf8, 0x59, 0x59, 0xc3, 0xd1, 0x55, 0x8f, 0x4d, 0x1a, 0x0e, 0xc1, 0xd7, 0xb2, 0x36,
- 0x89, 0xaf, 0xa5, 0x19, 0x63, 0xa9, 0xe4, 0xa3, 0x31, 0x0c, 0xb2, 0x19, 0x30, 0x78, 0x3c, 0x20,
- 0x8e, 0xa0, 0x73, 0x2b, 0x63, 0xfb, 0x60, 0xad, 0x0a, 0x59, 0x82, 0x5b, 0xdb, 0xda, 0x4f, 0x6d,
- 0x7b, 0x0e, 0x1d, 0x4a, 0xb5, 0x5a, 0x51, 0xce, 0x3b, 0xe3, 0xe6, 0x34, 0x3c, 0xde, 0xa9, 0x6d,
- 0x46, 0x59, 0xca, 0xe5, 0xe0, 0x01, 0xb4, 0x17, 0x32, 0x49, 0x56, 0x9a, 0x77, 0x2b, 0xb5, 0x1c,
- 0x86, 0xc7, 0xd0, 0xcd, 0x9d, 0x63, 0xbc, 0x67, 0x9d, 0x64, 0x8f, 0x9d, 0x2c, 0x1d, 0x2c, 0xf3,
- 0x4c, 0x45, 0x45, 0x3f, 0xd3, 0x42, 0x73, 0x18, 0x7b, 0xd3, 0x6e, 0x59, 0xb1, 0xc0, 0xf0, 0x53,
- 0x80, 0xe2, 0x74, 0xbe, 0x4a, 0x35, 0x0f, 0x2b, 0x3d, 0x2b, 0x38, 0x72, 0xe8, 0x2c, 0x64, 0xaa,
- 0xe9, 0x9d, 0xe6, 0x7d, 0xfb, 0xb0, 0x65, 0x38, 0xf9, 0x11, 0x7a, 0xe7, 0x42, 0x45, 0xc5, 0xfa,
- 0x94, 0x0e, 0x7a, 0x4f, 0x1c, 0xe4, 0x10, 0xbc, 0x95, 0x9a, 0xea, 0xfb, 0x6e, 0x90, 0xca, 0xc0,
- 0xcd, 0xa7, 0x03, 0x4f, 0xbe, 0x81, 0xde, 0x66, 0x5d, 0x71, 0x08, 0xad, 0x54, 0x46, 0x94, 0x73,
- 0x6f, 0xdc, 0x9c, 0x06, 0xf3, 0x22, 0xc0, 0x7d, 0xe8, 0xde, 0x92, 0x50, 0x29, 0xa9, 0x9c, 0xfb,
- 0x96, 0xd8, 0xc4, 0x93, 0x3f, 0x3c, 0x00, 0x73, 0xff, 0xf4, 0x46, 0xa4, 0xb1, 0xdd, 0x88, 0x8b,
- 0xb3, 0x9a, 0x3a, 0xff, 0xe2, 0x0c, 0xbf, 0x70, 0x1f, 0xae, 0x6f, 0xd7, 0xea, 0xe3, 0xea, 0x67,
- 0x52, 0xdc, 0x7b, 0xf2, 0xf5, 0x1e, 0x40, 0xfb, 0x52, 0x46, 0x74, 0x71, 0x56, 0xd7, 0x5c, 0x60,
- 0xc6, 0xac, 0x53, 0x67, 0x56, 0xf1, 0xa1, 0x96, 0xe1, 0xe1, 0x97, 0xd0, 0xdb, 0xfc, 0x0e, 0x70,
- 0x17, 0x42, 0x1b, 0x5c, 0x4a, 0x95, 0x88, 0x5b, 0xd6, 0xc0, 0x67, 0xb0, 0x6b, 0x81, 0x6d, 0x63,
- 0xe6, 0x1d, 0xfe, 0xed, 0x43, 0x58, 0x59, 0x70, 0x04, 0x68, 0xcf, 0xf2, 0xf8, 0x7c, 0x9d, 0xb1,
- 0x06, 0x86, 0xd0, 0x99, 0xe5, 0xf1, 0x09, 0x09, 0xcd, 0x3c, 0x17, 0xbc, 0x52, 0x32, 0x63, 0xbe,
- 0xcb, 0x7a, 0x91, 0x65, 0xac, 0x89, 0x03, 0x80, 0xe2, 0x3c, 0xa7, 0x3c, 0x63, 0x81, 0x4b, 0xfc,
- 0x5e, 0x6a, 0x62, 0x2d, 0x23, 0xc2, 0x05, 0x96, 0x6d, 0x3b, 0xd6, 0x2c, 0x13, 0xeb, 0x20, 0x83,
- 0xbe, 0x69, 0x46, 0x42, 0xe9, 0x6b, 0xd3, 0xa5, 0x8b, 0x43, 0x60, 0x55, 0xc4, 0x5e, 0xea, 0x21,
- 0xc2, 0x60, 0x96, 0xc7, 0x6f, 0x52, 0x45, 0x62, 0x71, 0x23, 0xae, 0x6f, 0x89, 0x01, 0xee, 0xc1,
- 0x8e, 0x2b, 0x64, 0x1e, 0x6f, 0x9d, 0xb3, 0xd0, 0xa5, 0x9d, 0xde, 0xd0, 0xe2, 0x97, 0xef, 0xd6,
- 0x52, 0xad, 0x13, 0xd6, 0xc7, 0x8f, 0x60, 0x6f, 0x96, 0xc7, 0x57, 0x4a, 0xa4, 0xf9, 0x92, 0xd4,
- 0x4b, 0x12, 0x11, 0x29, 0xb6, 0xe3, 0x6e, 0x5f, 0xad, 0x12, 0x92, 0x6b, 0x7d, 0x29, 0x7f, 0x65,
- 0x03, 0x27, 0x66, 0x4e, 0x22, 0xb2, 0x3f, 0x43, 0xb6, 0xeb, 0xc4, 0x6c, 0x10, 0x2b, 0x86, 0xb9,
- 0x79, 0x5f, 0x29, 0xb2, 0x23, 0xee, 0xb9, 0xae, 0x2e, 0xb6, 0x39, 0x78, 0x78, 0x07, 0x83, 0xfa,
- 0xf3, 0x1a, 0x1d, 0x5b, 0xe4, 0x45, 0x14, 0x99, 0xb7, 0x64, 0x0d, 0xe4, 0x30, 0xdc, 0xc2, 0x73,
- 0x4a, 0xe4, 0x5b, 0xb2, 0x8c, 0x57, 0x67, 0xde, 0x64, 0x91, 0xd0, 0x05, 0xe3, 0xe3, 0x01, 0xf0,
- 0x5a, 0xa9, 0x97, 0xc5, 0x36, 0x5a, 0xb6, 0x79, 0xc2, 0xef, 0x3f, 0x8c, 0x1a, 0xef, 0x3f, 0x8c,
- 0x1a, 0xf7, 0x0f, 0x23, 0xef, 0xfd, 0xc3, 0xc8, 0xfb, 0xf7, 0x61, 0xe4, 0xfd, 0xf9, 0xdf, 0xa8,
- 0xf1, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x86, 0x52, 0x5b, 0xe0, 0x74, 0x06, 0x00, 0x00,
+ // 1009 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xcd, 0x6e, 0xe3, 0x36,
+ 0x17, 0xb5, 0x64, 0xc5, 0x3f, 0xd7, 0x8e, 0xc3, 0xdc, 0xc9, 0x37, 0x20, 0x82, 0xc0, 0xe3, 0xcf,
+ 0xd3, 0x62, 0x8c, 0x14, 0x93, 0x16, 0x5e, 0x14, 0x45, 0x77, 0xf9, 0x19, 0x20, 0x29, 0xe2, 0x74,
+ 0xea, 0x64, 0xb2, 0x28, 0x50, 0x04, 0x8c, 0x45, 0x2b, 0x6a, 0x2d, 0x51, 0xa0, 0xe8, 0x34, 0xd9,
+ 0x14, 0x45, 0x9f, 0xa2, 0x9b, 0xd9, 0xf6, 0x01, 0xfa, 0x14, 0x59, 0x0e, 0xd0, 0xfd, 0xa0, 0x93,
+ 0xbe, 0x48, 0x41, 0x8a, 0xb2, 0x65, 0x27, 0x98, 0x45, 0x77, 0xe4, 0x39, 0x87, 0xf7, 0x9e, 0x7b,
+ 0x79, 0x45, 0x01, 0x48, 0x36, 0x56, 0x3b, 0x89, 0x14, 0x4a, 0x60, 0x45, 0xaf, 0x93, 0xcb, 0xcd,
+ 0x8d, 0x40, 0x04, 0xc2, 0x40, 0x9f, 0xeb, 0x55, 0xc6, 0x76, 0x7f, 0x81, 0x95, 0x57, 0xb1, 0x92,
+ 0xb7, 0xf8, 0x19, 0x78, 0x67, 0xb7, 0x09, 0xa7, 0x4e, 0xc7, 0xe9, 0xb5, 0xfa, 0xeb, 0x3b, 0xd9,
+ 0xa9, 0x1d, 0x43, 0x6a, 0x62, 0xcf, 0xbb, 0x7b, 0xff, 0xac, 0x34, 0x34, 0x22, 0xa4, 0xe0, 0x9d,
+ 0x71, 0x19, 0x51, 0xb7, 0xe3, 0xf4, 0xbc, 0x19, 0xc3, 0x65, 0x84, 0x9b, 0xb0, 0x72, 0x14, 0xfb,
+ 0xfc, 0x86, 0x96, 0x0b, 0x54, 0x06, 0x21, 0x82, 0x77, 0xc0, 0x14, 0xa3, 0x5e, 0xc7, 0xe9, 0x35,
+ 0x87, 0x66, 0xdd, 0xfd, 0xd5, 0x01, 0x72, 0x1a, 0xb3, 0x24, 0xbd, 0x12, 0x6a, 0xc0, 0x15, 0xf3,
+ 0x99, 0x62, 0xf8, 0x25, 0xc0, 0x48, 0xc4, 0xe3, 0x8b, 0x54, 0x31, 0x95, 0x39, 0x6a, 0xcc, 0x1d,
+ 0xed, 0x8b, 0x78, 0x7c, 0xaa, 0x09, 0x1b, 0xbc, 0x3e, 0xca, 0x01, 0x9d, 0x3c, 0x34, 0xc9, 0x8b,
+ 0xbe, 0x32, 0x48, 0x5b, 0x56, 0xda, 0x72, 0xd1, 0x97, 0x41, 0xba, 0xdf, 0x43, 0x2d, 0x77, 0xa0,
+ 0x2d, 0x6a, 0x07, 0x26, 0x67, 0x73, 0x68, 0xd6, 0xf8, 0x35, 0xd4, 0x22, 0xeb, 0xcc, 0x04, 0x6e,
+ 0xf4, 0x69, 0xee, 0x65, 0xd9, 0xb9, 0x8d, 0x3b, 0xd3, 0x77, 0xdf, 0x96, 0xa1, 0x3a, 0xe0, 0x69,
+ 0xca, 0x02, 0x8e, 0x2f, 0xc1, 0x53, 0xf3, 0x0e, 0x3f, 0xc9, 0x63, 0x58, 0xba, 0xd8, 0x63, 0x2d,
+ 0xc3, 0x0d, 0x70, 0x95, 0x58, 0xa8, 0xc4, 0x55, 0x42, 0x97, 0x31, 0x96, 0x62, 0xa9, 0x0c, 0x8d,
+ 0xcc, 0x0a, 0xf4, 0x96, 0x0b, 0xc4, 0x36, 0x54, 0x27, 0x22, 0x30, 0x17, 0xb6, 0x52, 0x20, 0x73,
+ 0x70, 0xde, 0xb6, 0xca, 0xc3, 0xb6, 0xbd, 0x84, 0x2a, 0x8f, 0x95, 0x0c, 0x79, 0x4a, 0xab, 0x9d,
+ 0x72, 0xaf, 0xd1, 0x5f, 0x5d, 0x98, 0x8c, 0x3c, 0x94, 0xd5, 0xe0, 0x16, 0x54, 0x46, 0x22, 0x8a,
+ 0x42, 0x45, 0x6b, 0x85, 0x58, 0x16, 0xc3, 0x3e, 0xd4, 0x52, 0xdb, 0x31, 0x5a, 0x37, 0x9d, 0x24,
+ 0xcb, 0x9d, 0xcc, 0x3b, 0x98, 0xeb, 0x74, 0x44, 0xc9, 0x7f, 0xe4, 0x23, 0x45, 0xa1, 0xe3, 0xf4,
+ 0x6a, 0x79, 0xc4, 0x0c, 0xc3, 0x4f, 0x00, 0xb2, 0xd5, 0x61, 0x18, 0x2b, 0xda, 0x28, 0xe4, 0x2c,
+ 0xe0, 0x48, 0xa1, 0x3a, 0x12, 0xb1, 0xe2, 0x37, 0x8a, 0x36, 0xcd, 0xc5, 0xe6, 0xdb, 0xee, 0x0f,
+ 0x50, 0x3f, 0x64, 0xd2, 0xcf, 0xc6, 0x27, 0xef, 0xa0, 0xf3, 0xa0, 0x83, 0x14, 0xbc, 0x6b, 0xa1,
+ 0xf8, 0xe2, 0xbc, 0x6b, 0xa4, 0x50, 0x70, 0xf9, 0x61, 0xc1, 0xdd, 0x3f, 0x1d, 0xa8, 0xcf, 0xe6,
+ 0x15, 0x9f, 0x42, 0x45, 0x9f, 0x91, 0x29, 0x75, 0x3a, 0xe5, 0x9e, 0x37, 0xb4, 0x3b, 0xdc, 0x84,
+ 0xda, 0x84, 0x33, 0x19, 0x6b, 0xc6, 0x35, 0xcc, 0x6c, 0x8f, 0x2f, 0x60, 0x2d, 0x53, 0x5d, 0x88,
+ 0xa9, 0x0a, 0x44, 0x18, 0x07, 0xb4, 0x6c, 0x24, 0xad, 0x0c, 0xfe, 0xd6, 0xa2, 0xf8, 0x1c, 0x56,
+ 0xf3, 0x43, 0x17, 0xb1, 0xae, 0xd4, 0x33, 0xb2, 0x66, 0x0e, 0x9e, 0xf0, 0x1b, 0x85, 0xcf, 0x01,
+ 0xd8, 0x54, 0x89, 0x8b, 0x09, 0x67, 0xd7, 0xdc, 0x0c, 0x43, 0xde, 0xd0, 0xba, 0xc6, 0x8f, 0x35,
+ 0xdc, 0x7d, 0xeb, 0x00, 0x68, 0xd3, 0xfb, 0x57, 0x2c, 0x0e, 0xf4, 0x47, 0xe5, 0x86, 0xbe, 0xed,
+ 0x09, 0x68, 0xed, 0xfd, 0xfb, 0x67, 0xee, 0xd1, 0xc1, 0xd0, 0x0d, 0x7d, 0xfc, 0xc2, 0x8e, 0xb4,
+ 0x6b, 0x46, 0xfa, 0x69, 0xf1, 0x13, 0xcd, 0x4e, 0x3f, 0x98, 0xea, 0x17, 0x50, 0x8d, 0x85, 0xcf,
+ 0x2f, 0x42, 0xdf, 0x36, 0xac, 0x65, 0x43, 0x56, 0x4e, 0x84, 0xcf, 0x8f, 0x0e, 0x86, 0x15, 0x4d,
+ 0x1f, 0xf9, 0xc5, 0x3b, 0xf3, 0x16, 0xef, 0x2c, 0x02, 0x32, 0x4f, 0x70, 0x1a, 0xc6, 0xc1, 0x84,
+ 0xcf, 0x8c, 0x38, 0xff, 0xc5, 0x88, 0xfb, 0x31, 0x23, 0xdd, 0x3f, 0x1c, 0x68, 0xce, 0xe3, 0x9c,
+ 0xf7, 0x71, 0x0f, 0x40, 0x49, 0x16, 0xa7, 0xa1, 0x0a, 0x45, 0x6c, 0x33, 0x6e, 0x3d, 0x92, 0x71,
+ 0xa6, 0xc9, 0x27, 0x72, 0x7e, 0x0a, 0xbf, 0x82, 0xea, 0xc8, 0xa8, 0xb2, 0x1b, 0x2f, 0x3c, 0x29,
+ 0xcb, 0xa5, 0xe5, 0x5f, 0x98, 0x95, 0x17, 0xfb, 0x52, 0x5e, 0xe8, 0xcb, 0xf6, 0x21, 0xd4, 0x67,
+ 0xaf, 0x35, 0xae, 0x41, 0xc3, 0x6c, 0x4e, 0x84, 0x8c, 0xd8, 0x84, 0x94, 0xf0, 0x09, 0xac, 0x19,
+ 0x60, 0x1e, 0x9f, 0x38, 0xf8, 0x3f, 0x58, 0x5f, 0x02, 0xcf, 0xfb, 0xc4, 0xdd, 0xfe, 0xcb, 0x85,
+ 0x46, 0xe1, 0x59, 0x42, 0x80, 0xca, 0x20, 0x0d, 0x0e, 0xa7, 0x09, 0x29, 0x61, 0x03, 0xaa, 0x83,
+ 0x34, 0xd8, 0xe3, 0x4c, 0x11, 0xc7, 0x6e, 0x5e, 0x4b, 0x91, 0x10, 0xd7, 0xaa, 0x76, 0x93, 0x84,
+ 0x94, 0xb1, 0x05, 0x90, 0xad, 0x87, 0x3c, 0x4d, 0x88, 0x67, 0x85, 0xe7, 0x42, 0x71, 0xb2, 0xa2,
+ 0xbd, 0xd9, 0x8d, 0x61, 0x2b, 0x96, 0xd5, 0x4f, 0x00, 0xa9, 0x22, 0x81, 0xa6, 0x4e, 0xc6, 0x99,
+ 0x54, 0x97, 0x3a, 0x4b, 0x0d, 0x37, 0x80, 0x14, 0x11, 0x73, 0xa8, 0x8e, 0x08, 0xad, 0x41, 0x1a,
+ 0xbc, 0x89, 0x25, 0x67, 0xa3, 0x2b, 0x76, 0x39, 0xe1, 0x04, 0x70, 0x1d, 0x56, 0x6d, 0x20, 0xfd,
+ 0xc5, 0x4d, 0x53, 0xd2, 0xb0, 0xb2, 0xfd, 0x2b, 0x3e, 0xfa, 0xe9, 0xbb, 0xa9, 0x90, 0xd3, 0x88,
+ 0x34, 0x75, 0xd9, 0x83, 0x34, 0x30, 0x17, 0x34, 0xe6, 0xf2, 0x98, 0x33, 0x9f, 0x4b, 0xb2, 0x6a,
+ 0x4f, 0x9f, 0x85, 0x11, 0x17, 0x53, 0x75, 0x22, 0x7e, 0x26, 0x2d, 0x6b, 0x66, 0xc8, 0x99, 0x6f,
+ 0x7e, 0x61, 0x64, 0xcd, 0x9a, 0x99, 0x21, 0xc6, 0x0c, 0xb1, 0xf5, 0xbe, 0x96, 0xdc, 0x94, 0xb8,
+ 0x6e, 0xb3, 0xda, 0xbd, 0xd1, 0xe0, 0xf6, 0x6f, 0x0e, 0x6c, 0x3c, 0x36, 0x1e, 0xb8, 0x05, 0xf4,
+ 0x31, 0x7c, 0x77, 0xaa, 0x04, 0x29, 0xe1, 0xa7, 0xf0, 0xff, 0xc7, 0xd8, 0x6f, 0x44, 0x18, 0xab,
+ 0xa3, 0x28, 0x99, 0x84, 0xa3, 0x50, 0x5f, 0xc5, 0xc7, 0x64, 0xaf, 0x6e, 0xac, 0xcc, 0xdd, 0xbe,
+ 0x85, 0xd6, 0xe2, 0x47, 0xa1, 0x9b, 0x31, 0x47, 0x76, 0x7d, 0x5f, 0x8f, 0x3f, 0x29, 0x21, 0x2d,
+ 0x9a, 0x1d, 0xf2, 0x48, 0x5c, 0x73, 0xc3, 0x38, 0x8b, 0xcc, 0x9b, 0xc4, 0x67, 0x2a, 0x63, 0xdc,
+ 0xc5, 0x42, 0x76, 0x7d, 0xff, 0x38, 0x7b, 0x7b, 0x0c, 0x5b, 0xde, 0xa3, 0x77, 0x1f, 0xda, 0xa5,
+ 0x77, 0x1f, 0xda, 0xa5, 0xbb, 0xfb, 0xb6, 0xf3, 0xee, 0xbe, 0xed, 0xfc, 0x7d, 0xdf, 0x76, 0x7e,
+ 0xff, 0xa7, 0x5d, 0xfa, 0x37, 0x00, 0x00, 0xff, 0xff, 0x87, 0x11, 0x6d, 0xd6, 0xaf, 0x08, 0x00,
+ 0x00,
}
diff --git a/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto b/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto
index 644ce7b..23d62ec 100644
--- a/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto
+++ b/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto
@@ -10,8 +10,9 @@
option (gogoproto.goproto_enum_prefix_all) = false;
enum EntryType {
- EntryNormal = 0;
- EntryConfChange = 1;
+ EntryNormal = 0;
+ EntryConfChange = 1; // corresponds to pb.ConfChange
+ EntryConfChangeV2 = 2; // corresponds to pb.ConfChangeV2
}
message Entry {
@@ -75,9 +76,41 @@
optional uint64 commit = 3 [(gogoproto.nullable) = false];
}
+// ConfChangeTransition specifies the behavior of a configuration change with
+// respect to joint consensus.
+enum ConfChangeTransition {
+ // Automatically use the simple protocol if possible, otherwise fall back
+ // to ConfChangeJointImplicit. Most applications will want to use this.
+ ConfChangeTransitionAuto = 0;
+ // Use joint consensus unconditionally, and transition out of it
+ // automatically (by proposing a zero configuration change).
+ //
+ // This option is suitable for applications that want to minimize the time
+ // spent in the joint configuration and do not store the joint configuration
+ // in the state machine (outside of InitialState).
+ ConfChangeTransitionJointImplicit = 1;
+ // Use joint consensus and remain in the joint configuration until the
+ // application proposes a no-op configuration change. This is suitable for
+ // applications that want to explicitly control the transitions, for example
+ // to use a custom payload (via the Context field).
+ ConfChangeTransitionJointExplicit = 2;
+}
+
message ConfState {
- repeated uint64 nodes = 1;
- repeated uint64 learners = 2;
+ // The voters in the incoming config. (If the configuration is not joint,
+ // then the outgoing config is empty).
+ repeated uint64 voters = 1;
+ // The learners in the incoming config.
+ repeated uint64 learners = 2;
+ // The voters in the outgoing config.
+ repeated uint64 voters_outgoing = 3;
+ // The nodes that will become learners when the outgoing config is removed.
+ // These nodes are necessarily currently in voters_outgoing (or they would
+ // have been added to the incoming config right away).
+ repeated uint64 learners_next = 4;
+ // If set, the config is joint and Raft will automatically transition into
+ // the final config (i.e. remove the outgoing config) when this is safe.
+ optional bool auto_leave = 5 [(gogoproto.nullable) = false];
}
enum ConfChangeType {
@@ -88,8 +121,57 @@
}
message ConfChange {
- optional uint64 ID = 1 [(gogoproto.nullable) = false];
- optional ConfChangeType Type = 2 [(gogoproto.nullable) = false];
- optional uint64 NodeID = 3 [(gogoproto.nullable) = false];
- optional bytes Context = 4;
+ optional ConfChangeType type = 2 [(gogoproto.nullable) = false];
+ optional uint64 node_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "NodeID" ];
+ optional bytes context = 4;
+
+ // NB: this is used only by etcd to thread through a unique identifier.
+ // Ideally it should really use the Context instead. No counterpart to
+ // this field exists in ConfChangeV2.
+ optional uint64 id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "ID" ];
+}
+
+// ConfChangeSingle is an individual configuration change operation. Multiple
+// such operations can be carried out atomically via a ConfChangeV2.
+message ConfChangeSingle {
+ optional ConfChangeType type = 1 [(gogoproto.nullable) = false];
+ optional uint64 node_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "NodeID"];
+}
+
+// ConfChangeV2 messages initiate configuration changes. They support both the
+// simple "one at a time" membership change protocol and full Joint Consensus
+// allowing for arbitrary changes in membership.
+//
+// The supplied context is treated as an opaque payload and can be used to
+// attach an action on the state machine to the application of the config change
+// proposal. Note that contrary to Joint Consensus as outlined in the Raft
+// paper[1], configuration changes become active when they are *applied* to the
+// state machine (not when they are appended to the log).
+//
+// The simple protocol can be used whenever only a single change is made.
+//
+// Non-simple changes require the use of Joint Consensus, for which two
+// configuration changes are run. The first configuration change specifies the
+// desired changes and transitions the Raft group into the joint configuration,
+// in which quorum requires a majority of both the pre-changes and post-changes
+// configuration. Joint Consensus avoids entering fragile intermediate
+// configurations that could compromise survivability. For example, without the
+// use of Joint Consensus and running across three availability zones with a
+// replication factor of three, it is not possible to replace a voter without
+// entering an intermediate configuration that does not survive the outage of
+// one availability zone.
+//
+// The provided ConfChangeTransition specifies how (and whether) Joint Consensus
+// is used, and assigns the task of leaving the joint configuration either to
+// Raft or the application. Leaving the joint configuration is accomplished by
+// proposing a ConfChangeV2 with only and optionally the Context field
+// populated.
+//
+// For details on Raft membership changes, see:
+//
+// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
+message ConfChangeV2 {
+ optional ConfChangeTransition transition = 1 [(gogoproto.nullable) = false];
+ repeated ConfChangeSingle changes = 2 [(gogoproto.nullable) = false];
+ optional bytes context = 3;
}
diff --git a/vendor/go.etcd.io/etcd/raft/rawnode.go b/vendor/go.etcd.io/etcd/raft/rawnode.go
index d7a272d..90eb694 100644
--- a/vendor/go.etcd.io/etcd/raft/rawnode.go
+++ b/vendor/go.etcd.io/etcd/raft/rawnode.go
@@ -18,6 +18,7 @@
"errors"
pb "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/raft/tracker"
)
// ErrStepLocalMsg is returned when try to step a local raft message
@@ -36,82 +37,20 @@
prevHardSt pb.HardState
}
-func (rn *RawNode) newReady() Ready {
- return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
-}
-
-func (rn *RawNode) commitReady(rd Ready) {
- if rd.SoftState != nil {
- rn.prevSoftSt = rd.SoftState
- }
- if !IsEmptyHardState(rd.HardState) {
- rn.prevHardSt = rd.HardState
- }
-
- // If entries were applied (or a snapshot), update our cursor for
- // the next Ready. Note that if the current HardState contains a
- // new Commit index, this does not mean that we're also applying
- // all of the new entries due to commit pagination by size.
- if index := rd.appliedCursor(); index > 0 {
- rn.raft.raftLog.appliedTo(index)
- }
-
- if len(rd.Entries) > 0 {
- e := rd.Entries[len(rd.Entries)-1]
- rn.raft.raftLog.stableTo(e.Index, e.Term)
- }
- if !IsEmptySnap(rd.Snapshot) {
- rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
- }
- if len(rd.ReadStates) != 0 {
- rn.raft.readStates = nil
- }
-}
-
-// NewRawNode returns a new RawNode given configuration and a list of raft peers.
-func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
- if config.ID == 0 {
- panic("config.ID must not be zero")
- }
+// NewRawNode instantiates a RawNode from the given configuration.
+//
+// See Bootstrap() for bootstrapping an initial state; this replaces the former
+// 'peers' argument to this method (with identical behavior). However, it is
+// recommended that instead of calling Bootstrap, applications bootstrap their
+// state manually by setting up a Storage that has a first index > 1 and which
+// stores the desired ConfState as its InitialState.
+func NewRawNode(config *Config) (*RawNode, error) {
r := newRaft(config)
rn := &RawNode{
raft: r,
}
- lastIndex, err := config.Storage.LastIndex()
- if err != nil {
- panic(err) // TODO(bdarnell)
- }
- // If the log is empty, this is a new RawNode (like StartNode); otherwise it's
- // restoring an existing RawNode (like RestartNode).
- // TODO(bdarnell): rethink RawNode initialization and whether the application needs
- // to be able to tell us when it expects the RawNode to exist.
- if lastIndex == 0 {
- r.becomeFollower(1, None)
- ents := make([]pb.Entry, len(peers))
- for i, peer := range peers {
- cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
- data, err := cc.Marshal()
- if err != nil {
- panic("unexpected marshal error")
- }
-
- ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
- }
- r.raftLog.append(ents...)
- r.raftLog.committed = uint64(len(ents))
- for _, peer := range peers {
- r.addNode(peer.ID)
- }
- }
-
- // Set the initial hard and soft states after performing all initialization.
rn.prevSoftSt = r.softState()
- if lastIndex == 0 {
- rn.prevHardSt = emptyState
- } else {
- rn.prevHardSt = r.hardState()
- }
-
+ rn.prevHardSt = r.hardState()
return rn, nil
}
@@ -149,37 +88,20 @@
}})
}
-// ProposeConfChange proposes a config change.
-func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
- data, err := cc.Marshal()
+// ProposeConfChange proposes a config change. See (Node).ProposeConfChange for
+// details.
+func (rn *RawNode) ProposeConfChange(cc pb.ConfChangeI) error {
+ m, err := confChangeToMsg(cc)
if err != nil {
return err
}
- return rn.raft.Step(pb.Message{
- Type: pb.MsgProp,
- Entries: []pb.Entry{
- {Type: pb.EntryConfChange, Data: data},
- },
- })
+ return rn.raft.Step(m)
}
// ApplyConfChange applies a config change to the local node.
-func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
- if cc.NodeID == None {
- return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()}
- }
- switch cc.Type {
- case pb.ConfChangeAddNode:
- rn.raft.addNode(cc.NodeID)
- case pb.ConfChangeAddLearnerNode:
- rn.raft.addLearner(cc.NodeID)
- case pb.ConfChangeRemoveNode:
- rn.raft.removeNode(cc.NodeID)
- case pb.ConfChangeUpdateNode:
- default:
- panic("unexpected conf type")
- }
- return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()}
+func (rn *RawNode) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
+ cs := rn.raft.applyConfChange(cc.AsV2())
+ return &cs
}
// Step advances the state machine using the given message.
@@ -188,20 +110,41 @@
if IsLocalMsg(m.Type) {
return ErrStepLocalMsg
}
- if pr := rn.raft.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
+ if pr := rn.raft.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
return rn.raft.Step(m)
}
return ErrStepPeerNotFound
}
-// Ready returns the current point-in-time state of this RawNode.
+// Ready returns the outstanding work that the application needs to handle. This
+// includes appending and applying entries or a snapshot, updating the HardState,
+// and sending messages. The returned Ready() *must* be handled and subsequently
+// passed back via Advance().
func (rn *RawNode) Ready() Ready {
- rd := rn.newReady()
- rn.raft.msgs = nil
- rn.raft.reduceUncommittedSize(rd.CommittedEntries)
+ rd := rn.readyWithoutAccept()
+ rn.acceptReady(rd)
return rd
}
+// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there
+// is no obligation that the Ready must be handled.
+func (rn *RawNode) readyWithoutAccept() Ready {
+ return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
+}
+
+// acceptReady is called when the consumer of the RawNode has decided to go
+// ahead and handle a Ready. Nothing must alter the state of the RawNode between
+// this call and the prior call to Ready().
+func (rn *RawNode) acceptReady(rd Ready) {
+ if rd.SoftState != nil {
+ rn.prevSoftSt = rd.SoftState
+ }
+ if len(rd.ReadStates) != 0 {
+ rn.raft.readStates = nil
+ }
+ rn.raft.msgs = nil
+}
+
// HasReady called when RawNode user need to check if any Ready pending.
// Checking logic in this method should be consistent with Ready.containsUpdates().
func (rn *RawNode) HasReady() bool {
@@ -227,21 +170,23 @@
// Advance notifies the RawNode that the application has applied and saved progress in the
// last Ready results.
func (rn *RawNode) Advance(rd Ready) {
- rn.commitReady(rd)
+ if !IsEmptyHardState(rd.HardState) {
+ rn.prevHardSt = rd.HardState
+ }
+ rn.raft.advance(rd)
}
-// Status returns the current status of the given group.
-func (rn *RawNode) Status() *Status {
+// Status returns the current status of the given group. This allocates, see
+// BasicStatus and WithProgress for allocation-friendlier choices.
+func (rn *RawNode) Status() Status {
status := getStatus(rn.raft)
- return &status
+ return status
}
-// StatusWithoutProgress returns a Status without populating the Progress field
-// (and returns the Status as a value to avoid forcing it onto the heap). This
-// is more performant if the Progress is not required. See WithProgress for an
-// allocation-free way to introspect the Progress.
-func (rn *RawNode) StatusWithoutProgress() Status {
- return getStatusWithoutProgress(rn.raft)
+// BasicStatus returns a BasicStatus. Notably this does not contain the
+// Progress map; see WithProgress for an allocation-free way to inspect it.
+func (rn *RawNode) BasicStatus() BasicStatus {
+ return getBasicStatus(rn.raft)
}
// ProgressType indicates the type of replica a Progress corresponds to.
@@ -256,17 +201,16 @@
// WithProgress is a helper to introspect the Progress for this node and its
// peers.
-func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) {
- for id, pr := range rn.raft.prs {
- pr := *pr
- pr.ins = nil
- visitor(id, ProgressTypePeer, pr)
- }
- for id, pr := range rn.raft.learnerPrs {
- pr := *pr
- pr.ins = nil
- visitor(id, ProgressTypeLearner, pr)
- }
+func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr tracker.Progress)) {
+ rn.raft.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ typ := ProgressTypePeer
+ if pr.IsLearner {
+ typ = ProgressTypeLearner
+ }
+ p := *pr
+ p.Inflights = nil
+ visitor(id, typ, p)
+ })
}
// ReportUnreachable reports the given node is not reachable for the last send.
diff --git a/vendor/go.etcd.io/etcd/raft/read_only.go b/vendor/go.etcd.io/etcd/raft/read_only.go
index aecc6b2..6987f1b 100644
--- a/vendor/go.etcd.io/etcd/raft/read_only.go
+++ b/vendor/go.etcd.io/etcd/raft/read_only.go
@@ -29,7 +29,11 @@
type readIndexStatus struct {
req pb.Message
index uint64
- acks map[uint64]struct{}
+ // NB: this never records 'false', but it's more convenient to use this
+ // instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If
+ // this becomes performance sensitive enough (doubtful), quorum.VoteResult
+ // can change to an API that is closer to that of CommittedIndex.
+ acks map[uint64]bool
}
type readOnly struct {
@@ -50,26 +54,25 @@
// the read only request.
// `m` is the original read only request message from the local or remote node.
func (ro *readOnly) addRequest(index uint64, m pb.Message) {
- ctx := string(m.Entries[0].Data)
- if _, ok := ro.pendingReadIndex[ctx]; ok {
+ s := string(m.Entries[0].Data)
+ if _, ok := ro.pendingReadIndex[s]; ok {
return
}
- ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
- ro.readIndexQueue = append(ro.readIndexQueue, ctx)
+ ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)}
+ ro.readIndexQueue = append(ro.readIndexQueue, s)
}
// recvAck notifies the readonly struct that the raft state machine received
// an acknowledgment of the heartbeat that attached with the read only request
// context.
-func (ro *readOnly) recvAck(m pb.Message) int {
- rs, ok := ro.pendingReadIndex[string(m.Context)]
+func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool {
+ rs, ok := ro.pendingReadIndex[string(context)]
if !ok {
- return 0
+ return nil
}
- rs.acks[m.From] = struct{}{}
- // add one to include an ack from local node
- return len(rs.acks) + 1
+ rs.acks[id] = true
+ return rs.acks
}
// advance advances the read only request queue kept by the readonly struct.
diff --git a/vendor/go.etcd.io/etcd/raft/status.go b/vendor/go.etcd.io/etcd/raft/status.go
index 9feca7c..adc6048 100644
--- a/vendor/go.etcd.io/etcd/raft/status.go
+++ b/vendor/go.etcd.io/etcd/raft/status.go
@@ -18,34 +18,44 @@
"fmt"
pb "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/raft/tracker"
)
+// Status contains information about this Raft peer and its view of the system.
+// The Progress is only populated on the leader.
type Status struct {
+ BasicStatus
+ Config tracker.Config
+ Progress map[uint64]tracker.Progress
+}
+
+// BasicStatus contains basic information about the Raft peer. It does not allocate.
+type BasicStatus struct {
ID uint64
pb.HardState
SoftState
- Applied uint64
- Progress map[uint64]Progress
+ Applied uint64
LeadTransferee uint64
}
-func getProgressCopy(r *raft) map[uint64]Progress {
- prs := make(map[uint64]Progress)
- for id, p := range r.prs {
- prs[id] = *p
- }
+func getProgressCopy(r *raft) map[uint64]tracker.Progress {
+ m := make(map[uint64]tracker.Progress)
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ var p tracker.Progress
+ p = *pr
+ p.Inflights = pr.Inflights.Clone()
+ pr = nil
- for id, p := range r.learnerPrs {
- prs[id] = *p
- }
- return prs
+ m[id] = p
+ })
+ return m
}
-func getStatusWithoutProgress(r *raft) Status {
- s := Status{
+func getBasicStatus(r *raft) BasicStatus {
+ s := BasicStatus{
ID: r.id,
LeadTransferee: r.leadTransferee,
}
@@ -57,10 +67,12 @@
// getStatus gets a copy of the current raft status.
func getStatus(r *raft) Status {
- s := getStatusWithoutProgress(r)
+ var s Status
+ s.BasicStatus = getBasicStatus(r)
if s.RaftState == StateLeader {
s.Progress = getProgressCopy(r)
}
+ s.Config = r.prs.Config.Clone()
return s
}
diff --git a/vendor/go.etcd.io/etcd/raft/storage.go b/vendor/go.etcd.io/etcd/raft/storage.go
index 14ad686..6be5745 100644
--- a/vendor/go.etcd.io/etcd/raft/storage.go
+++ b/vendor/go.etcd.io/etcd/raft/storage.go
@@ -44,6 +44,8 @@
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
+ // TODO(tbg): split this into two interfaces, LogStorage and StateStorage.
+
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
diff --git a/vendor/go.etcd.io/etcd/raft/tracker/inflights.go b/vendor/go.etcd.io/etcd/raft/tracker/inflights.go
new file mode 100644
index 0000000..1a05634
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/tracker/inflights.go
@@ -0,0 +1,132 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tracker
+
+// Inflights limits the number of MsgApp (represented by the largest index
+// contained within) sent to followers but not yet acknowledged by them. Callers
+// use Full() to check whether more messages can be sent, call Add() whenever
+// they are sending a new append, and release "quota" via FreeLE() whenever an
+// ack is received.
+type Inflights struct {
+ // start is the position in buffer of the first (oldest) inflight
+ start int
+ // count is the number of inflights currently held in buffer
+ count int
+
+ // size is the maximum number of inflights; buffer grows on demand up to it
+ size int
+
+ // buffer is used as a ring; each slot holds the index of the last entry
+ // inside one message.
+ buffer []uint64
+}
+
+// NewInflights returns an empty Inflights that allows up to 'size' inflight messages.
+func NewInflights(size int) *Inflights {
+ return &Inflights{
+ size: size, // buffer is left nil here; Add allocates it on demand via grow
+ }
+}
+
+// Clone returns an *Inflights that is identical to the receiver but shares no
+// memory with it.
+func (in *Inflights) Clone() *Inflights {
+ ins := *in
+ ins.buffer = append([]uint64(nil), in.buffer...) // fresh backing array, so the copy does not alias in.buffer
+ return &ins
+}
+
+// Add notifies the Inflights that a new message with the given index is being
+// dispatched. Full() must be called prior to Add() to verify that there is room
+// for one more message (Add panics otherwise), and consecutive calls to Add()
+// must provide a monotonic sequence of indexes.
+func (in *Inflights) Add(inflight uint64) {
+ if in.Full() {
+ panic("cannot add into a Full inflights")
+ }
+ next := in.start + in.count // slot just past the newest inflight
+ size := in.size
+ if next >= size { // wrap around the end of the ring
+ next -= size
+ }
+ if next >= len(in.buffer) { // slot not allocated yet; the buffer grows on demand
+ in.grow()
+ }
+ in.buffer[next] = inflight
+ in.count++
+}
+
+// grow enlarges the inflight buffer by doubling it, capped at in.size. We grow
+// on demand instead of preallocating to in.size to handle systems which have
+// thousands of Raft groups per process.
+func (in *Inflights) grow() {
+ newSize := len(in.buffer) * 2
+ if newSize == 0 {
+ newSize = 1 // first allocation: start with a single slot
+ } else if newSize > in.size {
+ newSize = in.size
+ }
+ newBuffer := make([]uint64, newSize)
+ copy(newBuffer, in.buffer) // existing entries keep their positions
+ in.buffer = newBuffer
+}
+
+// FreeLE frees all inflights whose index is less than or equal to the given `to`.
+func (in *Inflights) FreeLE(to uint64) {
+ if in.count == 0 || to < in.buffer[in.start] {
+ // nothing is inflight, or `to` lies left of the window (before the first inflight)
+ return
+ }
+
+ idx := in.start
+ var i int
+ for i = 0; i < in.count; i++ {
+ if to < in.buffer[idx] { // found the first inflight larger than `to`
+ break
+ }
+
+ // advance idx, wrapping around the ring at size
+ size := in.size
+ if idx++; idx >= size {
+ idx -= size
+ }
+ }
+ // free i inflights and set new start index
+ in.count -= i
+ in.start = idx
+ if in.count == 0 {
+ // inflights is empty, reset the start index so that we don't grow the
+ // buffer unnecessarily.
+ in.start = 0
+ }
+}
+
+// FreeFirstOne releases the first (oldest) inflight. This is a no-op if nothing
+// is inflight, including when the buffer has not been allocated yet.
+func (in *Inflights) FreeFirstOne() {
+	if in.count == 0 {
+		// Nothing to free. Return before indexing in.buffer: it stays nil
+		// until the first Add, so in.buffer[in.start] would panic with an
+		// out-of-range index instead of being the documented no-op.
+		return
+	}
+	in.FreeLE(in.buffer[in.start])
+}
+
+// Full returns true if no more messages can be sent at the moment, i.e. count has reached size.
+func (in *Inflights) Full() bool {
+ return in.count == in.size
+}
+
+// Count returns the number of inflight messages currently tracked.
+func (in *Inflights) Count() int { return in.count }
+
+// reset frees all inflights. The allocated buffer is kept; only count and start are zeroed.
+func (in *Inflights) reset() {
+ in.count = 0
+ in.start = 0
+}
diff --git a/vendor/go.etcd.io/etcd/raft/tracker/progress.go b/vendor/go.etcd.io/etcd/raft/tracker/progress.go
new file mode 100644
index 0000000..62c81f4
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/tracker/progress.go
@@ -0,0 +1,259 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tracker
+
+import (
+ "fmt"
+ "sort"
+ "strings"
+)
+
+// Progress represents a follower's progress in the view of the leader. Leader
+// maintains progresses of all followers, and sends entries to the follower
+// based on its progress.
+//
+// NB(tbg): Progress is basically a state machine whose transitions are mostly
+// strewn around `*raft.raft`. Additionally, some fields are only used when in a
+// certain State. All of this isn't ideal.
+type Progress struct {
+ Match, Next uint64
+ // State defines how the leader should interact with the follower.
+ //
+ // When in StateProbe, leader sends at most one replication message
+ // per heartbeat interval. It also probes actual progress of the follower.
+ //
+ // When in StateReplicate, leader optimistically increases next
+ // to the latest entry sent after sending replication message. This is
+ // an optimized state for fast replicating log entries to the follower.
+ //
+ // When in StateSnapshot, leader should have sent out snapshot
+ // before and stops sending any replication message.
+ State StateType
+
+ // PendingSnapshot is used in StateSnapshot.
+ // If there is a pending snapshot, PendingSnapshot will be set to the
+ // index of the snapshot. If PendingSnapshot is set, the replication process of
+ // this Progress will be paused. raft will not resend snapshot until the pending one
+ // is reported to have failed.
+ PendingSnapshot uint64
+
+ // RecentActive is true if the progress is recently active. Receiving any messages
+ // from the corresponding follower indicates the progress is active.
+ // RecentActive can be reset to false after an election timeout.
+ //
+ // TODO(tbg): the leader should always have this set to true.
+ RecentActive bool
+
+ // ProbeSent is used while this follower is in StateProbe. When ProbeSent is
+ // true, raft should pause sending replication message to this peer until
+ // ProbeSent is reset. See ProbeAcked() and IsPaused().
+ ProbeSent bool
+
+ // Inflights is a sliding window for the inflight messages.
+ // Each inflight message contains one or more log entries.
+ // The max number of entries per message is defined in raft config as MaxSizePerMsg.
+ // Thus inflight effectively limits both the number of inflight messages
+ // and the bandwidth each Progress can use.
+ // When Inflights is Full, no more messages should be sent.
+ // When a leader sends out a message, the index of the last
+ // entry should be added to Inflights. The index MUST be added
+ // into Inflights in order.
+ // When a leader receives a reply, the previous inflights should
+ // be freed by calling Inflights.FreeLE with the index of the last
+ // received entry.
+ Inflights *Inflights
+
+ // IsLearner is true if this progress is tracked for a learner.
+ IsLearner bool
+}
+
+// ResetState moves the Progress into the specified State, resetting ProbeSent,
+// PendingSnapshot, and Inflights.
+func (pr *Progress) ResetState(state StateType) {
+ pr.ProbeSent = false
+ pr.PendingSnapshot = 0
+ pr.State = state
+ pr.Inflights.reset() // frees all inflights
+}
+
+func max(a, b uint64) uint64 { // max returns the larger of a and b.
+ if a > b {
+ return a
+ }
+ return b
+}
+
+func min(a, b uint64) uint64 { // min returns the smaller of a and b.
+ if a > b {
+ return b
+ }
+ return a
+}
+
+// ProbeAcked is called when this peer has accepted an append. It clears
+// ProbeSent to signal that additional append messages should be sent without
+// further delay.
+func (pr *Progress) ProbeAcked() {
+ pr.ProbeSent = false
+}
+
+// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
+// optionally and if larger, the index of the pending snapshot plus one.
+func (pr *Progress) BecomeProbe() {
+ // If the original state is StateSnapshot, progress knows that
+ // the pending snapshot has been sent to this peer successfully, so it
+ // probes from pendingSnapshot + 1.
+ if pr.State == StateSnapshot {
+ pendingSnapshot := pr.PendingSnapshot // saved first: ResetState zeroes PendingSnapshot
+ pr.ResetState(StateProbe)
+ pr.Next = max(pr.Match+1, pendingSnapshot+1)
+ } else {
+ pr.ResetState(StateProbe)
+ pr.Next = pr.Match + 1
+ }
+}
+
+// BecomeReplicate transitions into StateReplicate via ResetState, then resets Next to Match+1.
+func (pr *Progress) BecomeReplicate() {
+ pr.ResetState(StateReplicate)
+ pr.Next = pr.Match + 1
+}
+
+// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
+// snapshot index.
+func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
+ pr.ResetState(StateSnapshot)
+ pr.PendingSnapshot = snapshoti // assigned after ResetState, which zeroes it
+}
+
+// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
+// index acked by it. The method returns false if the given index n comes from
+// an outdated message. Otherwise it updates the progress and returns true.
+func (pr *Progress) MaybeUpdate(n uint64) bool {
+ var updated bool
+ if pr.Match < n {
+ pr.Match = n
+ updated = true
+ pr.ProbeAcked()
+ }
+ if pr.Next < n+1 { // Next is raised even when Match was not (the return value only reflects Match)
+ pr.Next = n + 1
+ }
+ return updated
+}
+
+// OptimisticUpdate signals that appends all the way up to and including index n
+// are in-flight. As a result, Next is set to n+1.
+func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
+
+// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
+// arguments are the index the follower rejected to append to its log, and its
+// last index.
+//
+// Rejections can happen spuriously as messages are sent out of order or
+// duplicated. In such cases, the rejection pertains to an index that the
+// Progress already knows was previously acknowledged, and false is returned
+// without changing the Progress.
+//
+// If the rejection is genuine, Next is lowered sensibly, and the Progress is
+// cleared for sending log entries.
+func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
+ if pr.State == StateReplicate {
+ // The rejection must be stale if the progress has matched and "rejected"
+ // is smaller than or equal to "match".
+ if rejected <= pr.Match {
+ return false
+ }
+ // Directly decrease next to match + 1.
+ //
+ // TODO(tbg): why not use last if it's larger?
+ pr.Next = pr.Match + 1
+ return true
+ }
+
+ // The rejection must be stale if "rejected" does not match next - 1. This
+ // is because non-replicating followers are probed one entry at a time.
+ if pr.Next-1 != rejected {
+ return false
+ }
+
+ if pr.Next = min(rejected, last+1); pr.Next < 1 { // never let Next drop below 1
+ pr.Next = 1
+ }
+ pr.ProbeSent = false
+ return true
+}
+
+// IsPaused returns whether sending log entries to this node has been throttled.
+// This is done when a node has rejected recent MsgApps, is currently waiting
+// for a snapshot, or has reached the MaxInflightMsgs limit. In normal
+// operation, this is false. A throttled node will be contacted less frequently
+// until it has reached a state in which it's able to accept a steady stream of
+// log entries again. It panics if State is not one of the three known states.
+func (pr *Progress) IsPaused() bool {
+ switch pr.State {
+ case StateProbe:
+ return pr.ProbeSent
+ case StateReplicate:
+ return pr.Inflights.Full()
+ case StateSnapshot:
+ return true
+ default:
+ panic("unexpected state")
+ }
+}
+
+func (pr *Progress) String() string { // String renders the state, match and next, followed by optional markers.
+ var buf strings.Builder
+ fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next)
+ if pr.IsLearner {
+ fmt.Fprint(&buf, " learner")
+ }
+ if pr.IsPaused() {
+ fmt.Fprint(&buf, " paused")
+ }
+ if pr.PendingSnapshot > 0 {
+ fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot)
+ }
+ if !pr.RecentActive {
+ fmt.Fprintf(&buf, " inactive")
+ }
+ if n := pr.Inflights.Count(); n > 0 { // the inflight count is printed only when non-zero
+ fmt.Fprintf(&buf, " inflight=%d", n)
+ if pr.Inflights.Full() {
+ fmt.Fprint(&buf, "[full]")
+ }
+ }
+ return buf.String()
+}
+
+// ProgressMap is a map from ID to *Progress.
+type ProgressMap map[uint64]*Progress
+
+// String returns the ProgressMap rendered in sorted key order, one Progress per line.
+func (m ProgressMap) String() string {
+ ids := make([]uint64, 0, len(m))
+ for k := range m {
+ ids = append(ids, k)
+ }
+ sort.Slice(ids, func(i, j int) bool {
+ return ids[i] < ids[j]
+ })
+ var buf strings.Builder
+ for _, id := range ids {
+ fmt.Fprintf(&buf, "%d: %s\n", id, m[id]) // %s formats the entry via (*Progress).String
+ }
+ return buf.String()
+}
diff --git a/vendor/go.etcd.io/etcd/raft/tracker/state.go b/vendor/go.etcd.io/etcd/raft/tracker/state.go
new file mode 100644
index 0000000..285b4b8
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/tracker/state.go
@@ -0,0 +1,42 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tracker
+
+// StateType is the state of a tracked follower; see the State* constants.
+type StateType uint64
+
+const (
+ // StateProbe indicates a follower whose last index isn't known. Such a
+ // follower is "probed" (i.e. an append sent periodically) to narrow down
+ // its last index. In the ideal (and common) case, only one round of probing
+ // is necessary as the follower will react with a hint. Followers that are
+ // probed over extended periods of time are often offline.
+ StateProbe StateType = iota
+ // StateReplicate is the steady state in which a follower eagerly receives
+ // log entries to append to its log.
+ StateReplicate
+ // StateSnapshot indicates a follower that needs log entries not available
+ // from the leader's Raft log. Such a follower needs a full snapshot to
+ // return to StateReplicate.
+ StateSnapshot
+)
+
+var prstmap = [...]string{ // prstmap holds the name of each StateType, indexed by its numeric value.
+ "StateProbe",
+ "StateReplicate",
+ "StateSnapshot",
+}
+
+func (st StateType) String() string { return prstmap[uint64(st)] } // String returns st's name from prstmap; a value outside the three defined states indexes out of range.
diff --git a/vendor/go.etcd.io/etcd/raft/tracker/tracker.go b/vendor/go.etcd.io/etcd/raft/tracker/tracker.go
new file mode 100644
index 0000000..a458114
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/tracker/tracker.go
@@ -0,0 +1,288 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tracker
+
+import (
+ "fmt"
+ "sort"
+ "strings"
+
+ "go.etcd.io/etcd/raft/quorum"
+ pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+// Config reflects the configuration tracked in a ProgressTracker.
+type Config struct {
+ Voters quorum.JointConfig
+ // AutoLeave is true if the configuration is joint and a transition to the
+ // incoming configuration should be carried out automatically by Raft when
+ // this is possible. If false, the configuration will be joint until the
+ // application initiates the transition manually.
+ AutoLeave bool
+ // Learners is a set of IDs corresponding to the learners active in the
+ // current configuration.
+ //
+ // Invariant: Learners and Voters do not intersect, i.e. if a peer is in
+ // either half of the joint config, it can't be a learner; if it is a
+ // learner it can't be in either half of the joint config. This invariant
+ // simplifies the implementation since it allows peers to have clarity about
+ // their current role without taking into account joint consensus.
+ Learners map[uint64]struct{}
+ // When we turn a voter into a learner during a joint consensus transition,
+ // we cannot add the learner directly when entering the joint state. This is
+ // because this would violate the invariant that the intersection of
+ // voters and learners is empty. For example, assume a Voter is removed and
+ // immediately re-added as a learner (or in other words, it is demoted):
+ //
+ // Initially, the configuration will be
+ //
+ // voters: {1 2 3}
+ // learners: {}
+ //
+ // and we want to demote 3. Entering the joint configuration, we naively get
+ //
+ // voters: {1 2} & {1 2 3}
+ // learners: {3}
+ //
+ // but this violates the invariant (3 is both voter and learner). Instead,
+ // we get
+ //
+ // voters: {1 2} & {1 2 3}
+ // learners: {}
+ // next_learners: {3}
+ //
+ // Where 3 is now still purely a voter, but we are remembering the intention
+ // to make it a learner upon transitioning into the final configuration:
+ //
+ // voters: {1 2}
+ // learners: {3}
+ // next_learners: {}
+ //
+ // Note that next_learners is not used while adding a learner that is not
+ // also a voter in the joint config. In this case, the learner is added
+ // right away when entering the joint configuration, so that it is caught up
+ // as soon as possible.
+ LearnersNext map[uint64]struct{}
+}
+
+func (c Config) String() string { // String renders the voters, then learners, learners_next and autoleave when set.
+ var buf strings.Builder
+ fmt.Fprintf(&buf, "voters=%s", c.Voters)
+ if c.Learners != nil { // nil sets are omitted entirely
+ fmt.Fprintf(&buf, " learners=%s", quorum.MajorityConfig(c.Learners).String())
+ }
+ if c.LearnersNext != nil {
+ fmt.Fprintf(&buf, " learners_next=%s", quorum.MajorityConfig(c.LearnersNext).String())
+ }
+ if c.AutoLeave {
+ fmt.Fprintf(&buf, " autoleave")
+ }
+ return buf.String()
+}
+
+// Clone returns a copy of the Config that shares no memory with the original.
+// Every field is carried over: the voter and learner sets are deep-copied (nil
+// sets stay nil) and AutoLeave is copied by value.
+func (c *Config) Clone() Config {
+	// clone duplicates one ID set, preserving the nil/non-nil distinction
+	// that other code (e.g. String) relies on.
+	clone := func(m map[uint64]struct{}) map[uint64]struct{} {
+		if m == nil {
+			return nil
+		}
+		mm := make(map[uint64]struct{}, len(m))
+		for k := range m {
+			mm[k] = struct{}{}
+		}
+		return mm
+	}
+	return Config{
+		Voters: quorum.JointConfig{clone(c.Voters[0]), clone(c.Voters[1])},
+		// AutoLeave used to be dropped here, so a clone of an auto-leaving
+		// joint config wrongly reported AutoLeave == false.
+		AutoLeave:    c.AutoLeave,
+		Learners:     clone(c.Learners),
+		LearnersNext: clone(c.LearnersNext),
+	}
+}
+
+// ProgressTracker tracks the currently active configuration and the information
+// known about the nodes and learners in it. In particular, it tracks the match
+// index for each peer which in turn allows reasoning about the committed index.
+type ProgressTracker struct {
+ Config
+
+ Progress ProgressMap
+
+ Votes map[uint64]bool // votes recorded via RecordVote, keyed by node id
+
+ MaxInflight int // set by MakeProgressTracker; NOTE(review): presumably the Inflights size for tracked peers, confirm against callers
+}
+
+// MakeProgressTracker initializes a ProgressTracker with empty Votes and Progress maps and no learners.
+func MakeProgressTracker(maxInflight int) ProgressTracker {
+ p := ProgressTracker{
+ MaxInflight: maxInflight,
+ Config: Config{
+ Voters: quorum.JointConfig{
+ quorum.MajorityConfig{},
+ nil, // only populated when used
+ },
+ Learners: nil, // only populated when used
+ LearnersNext: nil, // only populated when used
+ },
+ Votes: map[uint64]bool{},
+ Progress: map[uint64]*Progress{},
+ }
+ return p
+}
+
+// ConfState returns a pb.ConfState representing the active configuration.
+func (p *ProgressTracker) ConfState() pb.ConfState {
+ return pb.ConfState{
+ Voters: p.Voters[0].Slice(),
+ VotersOutgoing: p.Voters[1].Slice(),
+ Learners: quorum.MajorityConfig(p.Learners).Slice(),
+ LearnersNext: quorum.MajorityConfig(p.LearnersNext).Slice(),
+ AutoLeave: p.AutoLeave,
+ }
+}
+
+// IsSingleton returns true if (and only if) there is only one voting member
+// (i.e. the leader) in the current configuration and Voters[1] is empty.
+func (p *ProgressTracker) IsSingleton() bool {
+ return len(p.Voters[0]) == 1 && len(p.Voters[1]) == 0
+}
+
+type matchAckIndexer map[uint64]*Progress // matchAckIndexer exposes each Progress's Match through quorum.AckedIndexer.
+
+var _ quorum.AckedIndexer = matchAckIndexer(nil) // compile-time check that the interface is satisfied
+
+// AckedIndex implements quorum.AckedIndexer.
+func (l matchAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) {
+ pr, ok := l[id]
+ if !ok { // id is not tracked
+ return 0, false
+ }
+ return quorum.Index(pr.Match), true
+}
+
+// Committed returns the largest log index known to be committed based on what
+// the voting members of the group have acknowledged (their Match indexes).
+func (p *ProgressTracker) Committed() uint64 {
+ return uint64(p.Voters.CommittedIndex(matchAckIndexer(p.Progress)))
+}
+
+func insertionSort(sl []uint64) { // insertionSort sorts sl in place in ascending order.
+ a, b := 0, len(sl)
+ for i := a + 1; i < b; i++ {
+ for j := i; j > a && sl[j] < sl[j-1]; j-- { // move sl[i] left until it is in order
+ sl[j], sl[j-1] = sl[j-1], sl[j]
+ }
+ }
+}
+
+// Visit invokes the supplied closure for all tracked progresses in ascending ID order.
+func (p *ProgressTracker) Visit(f func(id uint64, pr *Progress)) {
+ n := len(p.Progress)
+ // We need to sort the IDs and don't want to allocate since this is hot code.
+ // The optimization here mirrors that in `(MajorityConfig).CommittedIndex`,
+ // see there for details.
+ var sl [7]uint64
+ ids := sl[:]
+ if len(sl) >= n { // small case: reuse the fixed-size array
+ ids = sl[:n]
+ } else {
+ ids = make([]uint64, n)
+ }
+ for id := range p.Progress { // fill ids from the back; order is fixed by the sort below
+ n--
+ ids[n] = id
+ }
+ insertionSort(ids)
+ for _, id := range ids {
+ f(id, p.Progress[id])
+ }
+}
+
+// QuorumActive returns true if the quorum is active from the view of the local
+// raft state machine. Otherwise, it returns false.
+func (p *ProgressTracker) QuorumActive() bool {
+ votes := map[uint64]bool{}
+ p.Visit(func(id uint64, pr *Progress) {
+ if pr.IsLearner { // learners are left out of the tally
+ return
+ }
+ votes[id] = pr.RecentActive // each peer's RecentActive flag is counted as its vote
+ })
+
+ return p.Voters.VoteResult(votes) == quorum.VoteWon
+}
+
+// VoterNodes returns a slice of the voter IDs, sorted in ascending order.
+func (p *ProgressTracker) VoterNodes() []uint64 {
+ m := p.Voters.IDs()
+ nodes := make([]uint64, 0, len(m))
+ for id := range m {
+ nodes = append(nodes, id)
+ }
+ sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
+ return nodes
+}
+
+// LearnerNodes returns a slice of the learner IDs, sorted in ascending order, or nil if there are none.
+func (p *ProgressTracker) LearnerNodes() []uint64 {
+ if len(p.Learners) == 0 {
+ return nil
+ }
+ nodes := make([]uint64, 0, len(p.Learners))
+ for id := range p.Learners {
+ nodes = append(nodes, id)
+ }
+ sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
+ return nodes
+}
+
+// ResetVotes prepares for a new round of vote counting via RecordVote.
+func (p *ProgressTracker) ResetVotes() {
+ p.Votes = map[uint64]bool{}
+}
+
+// RecordVote records that the node with the given id voted for this Raft
+// instance if v == true (and declined it otherwise).
+func (p *ProgressTracker) RecordVote(id uint64, v bool) {
+ _, ok := p.Votes[id]
+ if !ok { // only the first vote recorded for id is kept
+ p.Votes[id] = v
+ }
+}
+
+// TallyVotes returns the number of granted and rejected Votes, and whether the
+// election outcome is known.
+func (p *ProgressTracker) TallyVotes() (granted int, rejected int, _ quorum.VoteResult) {
+ // Make sure to populate granted/rejected correctly even if the Votes map
+ // contains members no longer part of the configuration. This doesn't really
+ // matter in the way the numbers are used (they're informational), but might
+ // as well get it right.
+ for id, pr := range p.Progress {
+ if pr.IsLearner { // learners are not counted
+ continue
+ }
+ v, voted := p.Votes[id]
+ if !voted {
+ continue
+ }
+ if v {
+ granted++
+ } else {
+ rejected++
+ }
+ }
+ result := p.Voters.VoteResult(p.Votes)
+ return granted, rejected, result
+}
diff --git a/vendor/go.etcd.io/etcd/raft/util.go b/vendor/go.etcd.io/etcd/raft/util.go
index c145d26..785cf73 100644
--- a/vendor/go.etcd.io/etcd/raft/util.go
+++ b/vendor/go.etcd.io/etcd/raft/util.go
@@ -17,6 +17,7 @@
import (
"bytes"
"fmt"
+ "strings"
pb "go.etcd.io/etcd/raft/raftpb"
)
@@ -25,13 +26,6 @@
return []byte(fmt.Sprintf("%q", st.String())), nil
}
-// uint64Slice implements sort interface
-type uint64Slice []uint64
-
-func (p uint64Slice) Len() int { return len(p) }
-func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
-func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
-
func min(a, b uint64) uint64 {
if a > b {
return b
@@ -67,6 +61,69 @@
}
}
+func DescribeHardState(hs pb.HardState) string { // DescribeHardState formats hs as its Term, optional Vote, and Commit.
+ var buf strings.Builder
+ fmt.Fprintf(&buf, "Term:%d", hs.Term)
+ if hs.Vote != 0 { // a zero Vote is omitted from the output
+ fmt.Fprintf(&buf, " Vote:%d", hs.Vote)
+ }
+ fmt.Fprintf(&buf, " Commit:%d", hs.Commit)
+ return buf.String()
+}
+
+func DescribeSoftState(ss SoftState) string { // DescribeSoftState formats ss as its Lead and RaftState.
+ return fmt.Sprintf("Lead:%d State:%s", ss.Lead, ss.RaftState)
+}
+
+func DescribeConfState(state pb.ConfState) string { // DescribeConfState formats all five ConfState fields on one line.
+ return fmt.Sprintf(
+ "Voters:%v VotersOutgoing:%v Learners:%v LearnersNext:%v AutoLeave:%v",
+ state.Voters, state.VotersOutgoing, state.Learners, state.LearnersNext, state.AutoLeave,
+ )
+}
+
+func DescribeSnapshot(snap pb.Snapshot) string { // DescribeSnapshot formats only the snapshot's Metadata: Index, Term and ConfState.
+ m := snap.Metadata
+ return fmt.Sprintf("Index:%d Term:%d ConfState:%s", m.Index, m.Term, DescribeConfState(m.ConfState))
+}
+
+func DescribeReady(rd Ready, f EntryFormatter) string { // DescribeReady renders the non-empty parts of rd, one section each; f formats entry data.
+ var buf strings.Builder
+ if rd.SoftState != nil {
+ fmt.Fprint(&buf, DescribeSoftState(*rd.SoftState))
+ buf.WriteByte('\n')
+ }
+ if !IsEmptyHardState(rd.HardState) {
+ fmt.Fprintf(&buf, "HardState %s", DescribeHardState(rd.HardState))
+ buf.WriteByte('\n')
+ }
+ if len(rd.ReadStates) > 0 {
+ fmt.Fprintf(&buf, "ReadStates %v\n", rd.ReadStates)
+ }
+ if len(rd.Entries) > 0 {
+ buf.WriteString("Entries:\n")
+ fmt.Fprint(&buf, DescribeEntries(rd.Entries, f))
+ }
+ if !IsEmptySnap(rd.Snapshot) {
+ fmt.Fprintf(&buf, "Snapshot %s\n", DescribeSnapshot(rd.Snapshot))
+ }
+ if len(rd.CommittedEntries) > 0 {
+ buf.WriteString("CommittedEntries:\n")
+ fmt.Fprint(&buf, DescribeEntries(rd.CommittedEntries, f))
+ }
+ if len(rd.Messages) > 0 {
+ buf.WriteString("Messages:\n")
+ for _, msg := range rd.Messages {
+ fmt.Fprint(&buf, DescribeMessage(msg, f))
+ buf.WriteByte('\n')
+ }
+ }
+ if buf.Len() > 0 { // at least one section was written
+ return fmt.Sprintf("Ready MustSync=%t:\n%s", rd.MustSync, buf.String())
+ }
+ return "<empty Ready>"
+}
+
// EntryFormatter can be implemented by the application to provide human-readable formatting
// of entry data. Nil is a valid EntryFormatter and will use a default format.
type EntryFormatter func([]byte) string
@@ -93,7 +150,7 @@
fmt.Fprintf(&buf, "]")
}
if !IsEmptySnap(m.Snapshot) {
- fmt.Fprintf(&buf, " Snapshot:%v", m.Snapshot)
+ fmt.Fprintf(&buf, " Snapshot: %s", DescribeSnapshot(m.Snapshot))
}
return buf.String()
}
@@ -107,13 +164,39 @@
// DescribeEntry returns a concise human-readable description of an
// Entry for debugging.
func DescribeEntry(e pb.Entry, f EntryFormatter) string {
- var formatted string
- if e.Type == pb.EntryNormal && f != nil {
- formatted = f(e.Data)
- } else {
- formatted = fmt.Sprintf("%q", e.Data)
+ if f == nil { // fall back to quoting the raw bytes when no formatter is given
+ f = func(data []byte) string { return fmt.Sprintf("%q", data) }
}
- return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted)
+
+ formatConfChange := func(cc pb.ConfChangeI) string {
+ // TODO(tbg): give the EntryFormatter a type argument so that it gets
+ // a chance to expose the Context.
+ return pb.ConfChangesToString(cc.AsV2().Changes)
+ }
+
+ var formatted string
+ switch e.Type {
+ case pb.EntryNormal:
+ formatted = f(e.Data)
+ case pb.EntryConfChange:
+ var cc pb.ConfChange
+ if err := cc.Unmarshal(e.Data); err != nil {
+ formatted = err.Error() // show the decode error in place of the change
+ } else {
+ formatted = formatConfChange(cc)
+ }
+ case pb.EntryConfChangeV2:
+ var cc pb.ConfChangeV2
+ if err := cc.Unmarshal(e.Data); err != nil {
+ formatted = err.Error() // show the decode error in place of the change
+ } else {
+ formatted = formatConfChange(cc)
+ }
+ }
+ if formatted != "" { // add the separating space only when there is a payload to print
+ formatted = " " + formatted
+ }
+ return fmt.Sprintf("%d/%d %s%s", e.Term, e.Index, e.Type, formatted)
}
// DescribeEntries calls DescribeEntry for each Entry, adding a newline to
@@ -140,3 +223,11 @@
}
return ents[:limit]
}
+
+func assertConfStatesEquivalent(l Logger, cs1, cs2 pb.ConfState) { // calls l.Panic with the error unless cs1.Equivalent(cs2) returns nil
+ err := cs1.Equivalent(cs2)
+ if err == nil {
+ return
+ }
+ l.Panic(err)
+}