VOL-1921 - updated to use go mod
Change-Id: I8d5187fa91fa619494f972bc29d3bd61e5be3a82
diff --git a/vendor/go.etcd.io/etcd/Documentation/README.md b/vendor/go.etcd.io/etcd/Documentation/README.md
deleted file mode 120000
index 8828313..0000000
--- a/vendor/go.etcd.io/etcd/Documentation/README.md
+++ /dev/null
@@ -1 +0,0 @@
-docs.md
\ No newline at end of file
diff --git a/vendor/go.etcd.io/etcd/NOTICE b/vendor/go.etcd.io/etcd/NOTICE
deleted file mode 100644
index b39ddfa..0000000
--- a/vendor/go.etcd.io/etcd/NOTICE
+++ /dev/null
@@ -1,5 +0,0 @@
-CoreOS Project
-Copyright 2014 CoreOS, Inc
-
-This product includes software developed at CoreOS, Inc.
-(http://www.coreos.com/).
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go b/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
index c39702e..d02a7ee 100644
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
@@ -12,24 +12,45 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Package balancer implements client balancer.
package balancer
import (
- "fmt"
"strconv"
"sync"
"time"
+ "go.etcd.io/etcd/clientv3/balancer/connectivity"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
- "google.golang.org/grpc/connectivity"
+ grpcconnectivity "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)
+// Config defines balancer configurations.
+type Config struct {
+ // Policy configures balancer policy.
+ Policy picker.Policy
+
+ // Picker implements gRPC picker.
+ // Leave empty if "Policy" field is not custom.
+ // TODO: currently custom policy is not supported.
+ // Picker picker.Picker
+
+ // Name defines an additional name for balancer.
+ // Useful for balancer testing to avoid register conflicts.
+ // If empty, defaults to policy name.
+ Name string
+
+ // Logger configures balancer logging.
+ // If nil, logs are discarded.
+ Logger *zap.Logger
+}
+
// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
@@ -59,16 +80,13 @@
addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
- scToSt: make(map[balancer.SubConn]connectivity.State),
+ scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
- currentConn: nil,
- csEvltr: &connectivityStateEvaluator{},
+ currentConn: nil,
+ connectivityRecorder: connectivity.New(b.cfg.Logger),
// initialize picker always returns "ErrNoSubConnAvailable"
- Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
- }
- if bb.lg == nil {
- bb.lg = zap.NewNop()
+ picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
// TODO: support multiple connections
@@ -112,13 +130,12 @@
addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
- scToSt map[balancer.SubConn]connectivity.State
+ scToSt map[balancer.SubConn]grpcconnectivity.State
- currentConn balancer.ClientConn
- currentState connectivity.State
- csEvltr *connectivityStateEvaluator
+ currentConn balancer.ClientConn
+ connectivityRecorder connectivity.Recorder
- picker.Picker
+ picker picker.Picker
}
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
@@ -128,7 +145,11 @@
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
}
- bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
+ bb.lg.Info("resolved",
+ zap.String("picker", bb.picker.String()),
+ zap.String("balancer-id", bb.id),
+ zap.Strings("addresses", addrsToStrings(addrs)),
+ )
bb.mu.Lock()
defer bb.mu.Unlock()
@@ -139,12 +160,13 @@
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
- bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
+ bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
continue
}
+ bb.lg.Info("created subconn", zap.String("address", addr.Addr))
bb.addrToSc[addr] = sc
bb.scToAddr[sc] = addr
- bb.scToSt[sc] = connectivity.Idle
+ bb.scToSt[sc] = grpcconnectivity.Idle
sc.Connect()
}
}
@@ -157,6 +179,7 @@
bb.lg.Info(
"removed subconn",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
@@ -171,7 +194,7 @@
}
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
-func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
bb.mu.Lock()
defer bb.mu.Unlock()
@@ -179,8 +202,10 @@
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("subconn", scToString(sc)),
+ zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("state", s.String()),
)
return
@@ -188,9 +213,11 @@
bb.lg.Info(
"state changed",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
- zap.Bool("connected", s == connectivity.Ready),
+ zap.Bool("connected", s == grpcconnectivity.Ready),
zap.String("subconn", scToString(sc)),
+ zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("address", bb.scToAddr[sc].Addr),
zap.String("old-state", old.String()),
zap.String("new-state", s.String()),
@@ -198,68 +225,63 @@
bb.scToSt[sc] = s
switch s {
- case connectivity.Idle:
+ case grpcconnectivity.Idle:
sc.Connect()
- case connectivity.Shutdown:
+ case grpcconnectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scToSt. Remove state for this sc here.
delete(bb.scToAddr, sc)
delete(bb.scToSt, sc)
}
- oldAggrState := bb.currentState
- bb.currentState = bb.csEvltr.recordTransition(old, s)
+ oldAggrState := bb.connectivityRecorder.GetCurrentState()
+ bb.connectivityRecorder.RecordTransition(old, s)
- // Regenerate picker when one of the following happens:
+ // Update balancer picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
- if (s == connectivity.Ready) != (old == connectivity.Ready) ||
- (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
- bb.regeneratePicker()
+ if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
+ (bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
+ bb.updatePicker()
}
- bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
+ bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
}
-func (bb *baseBalancer) regeneratePicker() {
- if bb.currentState == connectivity.TransientFailure {
+func (bb *baseBalancer) updatePicker() {
+ if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
+ bb.picker = picker.NewErr(balancer.ErrTransientFailure)
bb.lg.Info(
- "generated transient error picker",
+ "updated picker to transient error picker",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
)
- bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
return
}
// only pass ready subconns to picker
- scs := make([]balancer.SubConn, 0)
- addrToSc := make(map[resolver.Address]balancer.SubConn)
scToAddr := make(map[balancer.SubConn]resolver.Address)
for addr, sc := range bb.addrToSc {
- if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
- scs = append(scs, sc)
- addrToSc[addr] = sc
+ if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
scToAddr[sc] = addr
}
}
- switch bb.policy {
- case picker.RoundrobinBalanced:
- bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
-
- default:
- panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
- }
-
+ bb.picker = picker.New(picker.Config{
+ Policy: bb.policy,
+ Logger: bb.lg,
+ SubConnToResolverAddress: scToAddr,
+ })
bb.lg.Info(
- "generated picker",
+ "updated picker",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
- zap.Strings("subconn-ready", scsToStrings(addrToSc)),
- zap.Int("subconn-size", len(addrToSc)),
+ zap.Strings("subconn-ready", scsToStrings(scToAddr)),
+ zap.Int("subconn-size", len(scToAddr)),
)
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/config.go b/vendor/go.etcd.io/etcd/clientv3/balancer/config.go
deleted file mode 100644
index 0339a84..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/config.go
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package balancer
-
-import (
- "go.etcd.io/etcd/clientv3/balancer/picker"
-
- "go.uber.org/zap"
-)
-
-// Config defines balancer configurations.
-type Config struct {
- // Policy configures balancer policy.
- Policy picker.Policy
-
- // Name defines an additional name for balancer.
- // Useful for balancer testing to avoid register conflicts.
- // If empty, defaults to policy name.
- Name string
-
- // Logger configures balancer logging.
- // If nil, logs are discarded.
- Logger *zap.Logger
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity.go b/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity.go
deleted file mode 100644
index 6cdeb3f..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity.go
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package balancer
-
-import "google.golang.org/grpc/connectivity"
-
-// connectivityStateEvaluator gets updated by addrConns when their
-// states transition, based on which it evaluates the state of
-// ClientConn.
-type connectivityStateEvaluator struct {
- numReady uint64 // Number of addrConns in ready state.
- numConnecting uint64 // Number of addrConns in connecting state.
- numTransientFailure uint64 // Number of addrConns in transientFailure.
-}
-
-// recordTransition records state change happening in every subConn and based on
-// that it evaluates what aggregated state should be.
-// It can only transition between Ready, Connecting and TransientFailure. Other states,
-// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
-// before any subConn is created ClientConn is in idle state. In the end when ClientConn
-// closes it is in Shutdown state.
-//
-// recordTransition should only be called synchronously from the same goroutine.
-func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
- // Update counters.
- for idx, state := range []connectivity.State{oldState, newState} {
- updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
- switch state {
- case connectivity.Ready:
- cse.numReady += updateVal
- case connectivity.Connecting:
- cse.numConnecting += updateVal
- case connectivity.TransientFailure:
- cse.numTransientFailure += updateVal
- }
- }
-
- // Evaluate.
- if cse.numReady > 0 {
- return connectivity.Ready
- }
- if cse.numConnecting > 0 {
- return connectivity.Connecting
- }
- return connectivity.TransientFailure
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity/connectivity.go b/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity/connectivity.go
new file mode 100644
index 0000000..4c4ad36
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity/connectivity.go
@@ -0,0 +1,93 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package connectivity implements client connectivity operations.
+package connectivity
+
+import (
+ "sync"
+
+ "go.uber.org/zap"
+ "google.golang.org/grpc/connectivity"
+)
+
+// Recorder records gRPC connectivity.
+type Recorder interface {
+ GetCurrentState() connectivity.State
+ RecordTransition(oldState, newState connectivity.State)
+}
+
+// New returns a new Recorder.
+func New(lg *zap.Logger) Recorder {
+ return &recorder{lg: lg}
+}
+
+// recorder takes the connectivity states of multiple SubConns
+// and returns one aggregated connectivity state.
+// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
+type recorder struct {
+ lg *zap.Logger
+
+ mu sync.RWMutex
+
+ cur connectivity.State
+
+ numReady uint64 // Number of addrConns in ready state.
+ numConnecting uint64 // Number of addrConns in connecting state.
+ numTransientFailure uint64 // Number of addrConns in transientFailure.
+}
+
+func (rc *recorder) GetCurrentState() (state connectivity.State) {
+ rc.mu.RLock()
+ defer rc.mu.RUnlock()
+ return rc.cur
+}
+
+// RecordTransition records state change happening in subConn and based on that
+// it evaluates what aggregated state should be.
+//
+// - If at least one SubConn in Ready, the aggregated state is Ready;
+// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
+// - Else the aggregated state is TransientFailure.
+//
+// Idle and Shutdown are not considered.
+//
+// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
+func (rc *recorder) RecordTransition(oldState, newState connectivity.State) {
+ rc.mu.Lock()
+ defer rc.mu.Unlock()
+
+ for idx, state := range []connectivity.State{oldState, newState} {
+ updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
+ switch state {
+ case connectivity.Ready:
+ rc.numReady += updateVal
+ case connectivity.Connecting:
+ rc.numConnecting += updateVal
+ case connectivity.TransientFailure:
+ rc.numTransientFailure += updateVal
+ default:
+ rc.lg.Warn("connectivity recorder received unknown state", zap.String("connectivity-state", state.String()))
+ }
+ }
+
+ switch { // must be exclusive, no overlap
+ case rc.numReady > 0:
+ rc.cur = connectivity.Ready
+ case rc.numConnecting > 0:
+ rc.cur = connectivity.Connecting
+ default:
+ rc.cur = connectivity.TransientFailure
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/doc.go b/vendor/go.etcd.io/etcd/clientv3/balancer/doc.go
deleted file mode 100644
index 45af5e9..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/doc.go
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package balancer implements client balancer.
-package balancer
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/grpc1.7-health.go b/vendor/go.etcd.io/etcd/clientv3/balancer/grpc1.7-health.go
deleted file mode 100644
index 2153767..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/grpc1.7-health.go
+++ /dev/null
@@ -1,657 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package balancer
-
-import (
- "context"
- "errors"
- "io/ioutil"
- "net/url"
- "strings"
- "sync"
- "time"
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/grpclog"
- healthpb "google.golang.org/grpc/health/grpc_health_v1"
- "google.golang.org/grpc/status"
-)
-
-// TODO: replace with something better
-var lg = grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard)
-
-const (
- minHealthRetryDuration = 3 * time.Second
- unknownService = "unknown service grpc.health.v1.Health"
-)
-
-// ErrNoAddrAvailable is returned by Get() when the balancer does not have
-// any active connection to endpoints at the time.
-// This error is returned only when opts.BlockingWait is true.
-var ErrNoAddrAvailable = status.Error(codes.Unavailable, "there is no address available")
-
-type NotifyMsg int
-
-const (
- NotifyReset NotifyMsg = iota
- NotifyNext
-)
-
-// GRPC17Health does the bare minimum to expose multiple eps
-// to the grpc reconnection code path
-type GRPC17Health struct {
- // addrs are the client's endpoint addresses for grpc
- addrs []grpc.Address
-
- // eps holds the raw endpoints from the client
- eps []string
-
- // notifyCh notifies grpc of the set of addresses for connecting
- notifyCh chan []grpc.Address
-
- // readyc closes once the first connection is up
- readyc chan struct{}
- readyOnce sync.Once
-
- // healthCheck checks an endpoint's health.
- healthCheck func(ep string) (bool, error)
- healthCheckTimeout time.Duration
-
- unhealthyMu sync.RWMutex
- unhealthyHostPorts map[string]time.Time
-
- // mu protects all fields below.
- mu sync.RWMutex
-
- // upc closes when pinAddr transitions from empty to non-empty or the balancer closes.
- upc chan struct{}
-
- // downc closes when grpc calls down() on pinAddr
- downc chan struct{}
-
- // stopc is closed to signal updateNotifyLoop should stop.
- stopc chan struct{}
- stopOnce sync.Once
- wg sync.WaitGroup
-
- // donec closes when all goroutines are exited
- donec chan struct{}
-
- // updateAddrsC notifies updateNotifyLoop to update addrs.
- updateAddrsC chan NotifyMsg
-
- // grpc issues TLS cert checks using the string passed into dial so
- // that string must be the host. To recover the full scheme://host URL,
- // have a map from hosts to the original endpoint.
- hostPort2ep map[string]string
-
- // pinAddr is the currently pinned address; set to the empty string on
- // initialization and shutdown.
- pinAddr string
-
- closed bool
-}
-
-// DialFunc defines gRPC dial function.
-type DialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)
-
-// NewGRPC17Health returns a new health balancer with gRPC v1.7.
-func NewGRPC17Health(
- eps []string,
- timeout time.Duration,
- dialFunc DialFunc,
-) *GRPC17Health {
- notifyCh := make(chan []grpc.Address)
- addrs := eps2addrs(eps)
- hb := &GRPC17Health{
- addrs: addrs,
- eps: eps,
- notifyCh: notifyCh,
- readyc: make(chan struct{}),
- healthCheck: func(ep string) (bool, error) { return grpcHealthCheck(ep, dialFunc) },
- unhealthyHostPorts: make(map[string]time.Time),
- upc: make(chan struct{}),
- stopc: make(chan struct{}),
- downc: make(chan struct{}),
- donec: make(chan struct{}),
- updateAddrsC: make(chan NotifyMsg),
- hostPort2ep: getHostPort2ep(eps),
- }
- if timeout < minHealthRetryDuration {
- timeout = minHealthRetryDuration
- }
- hb.healthCheckTimeout = timeout
-
- close(hb.downc)
- go hb.updateNotifyLoop()
- hb.wg.Add(1)
- go func() {
- defer hb.wg.Done()
- hb.updateUnhealthy()
- }()
- return hb
-}
-
-func (b *GRPC17Health) Start(target string, config grpc.BalancerConfig) error { return nil }
-
-func (b *GRPC17Health) ConnectNotify() <-chan struct{} {
- b.mu.Lock()
- defer b.mu.Unlock()
- return b.upc
-}
-
-func (b *GRPC17Health) UpdateAddrsC() chan NotifyMsg { return b.updateAddrsC }
-func (b *GRPC17Health) StopC() chan struct{} { return b.stopc }
-
-func (b *GRPC17Health) Ready() <-chan struct{} { return b.readyc }
-
-func (b *GRPC17Health) Endpoint(hostPort string) string {
- b.mu.RLock()
- defer b.mu.RUnlock()
- return b.hostPort2ep[hostPort]
-}
-
-func (b *GRPC17Health) Pinned() string {
- b.mu.RLock()
- defer b.mu.RUnlock()
- return b.pinAddr
-}
-
-func (b *GRPC17Health) HostPortError(hostPort string, err error) {
- if b.Endpoint(hostPort) == "" {
- lg.Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error())
- return
- }
-
- b.unhealthyMu.Lock()
- b.unhealthyHostPorts[hostPort] = time.Now()
- b.unhealthyMu.Unlock()
- lg.Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error())
-}
-
-func (b *GRPC17Health) removeUnhealthy(hostPort, msg string) {
- if b.Endpoint(hostPort) == "" {
- lg.Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg)
- return
- }
-
- b.unhealthyMu.Lock()
- delete(b.unhealthyHostPorts, hostPort)
- b.unhealthyMu.Unlock()
- lg.Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg)
-}
-
-func (b *GRPC17Health) countUnhealthy() (count int) {
- b.unhealthyMu.RLock()
- count = len(b.unhealthyHostPorts)
- b.unhealthyMu.RUnlock()
- return count
-}
-
-func (b *GRPC17Health) isUnhealthy(hostPort string) (unhealthy bool) {
- b.unhealthyMu.RLock()
- _, unhealthy = b.unhealthyHostPorts[hostPort]
- b.unhealthyMu.RUnlock()
- return unhealthy
-}
-
-func (b *GRPC17Health) cleanupUnhealthy() {
- b.unhealthyMu.Lock()
- for k, v := range b.unhealthyHostPorts {
- if time.Since(v) > b.healthCheckTimeout {
- delete(b.unhealthyHostPorts, k)
- lg.Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout)
- }
- }
- b.unhealthyMu.Unlock()
-}
-
-func (b *GRPC17Health) liveAddrs() ([]grpc.Address, map[string]struct{}) {
- unhealthyCnt := b.countUnhealthy()
-
- b.mu.RLock()
- defer b.mu.RUnlock()
-
- hbAddrs := b.addrs
- if len(b.addrs) == 1 || unhealthyCnt == 0 || unhealthyCnt == len(b.addrs) {
- liveHostPorts := make(map[string]struct{}, len(b.hostPort2ep))
- for k := range b.hostPort2ep {
- liveHostPorts[k] = struct{}{}
- }
- return hbAddrs, liveHostPorts
- }
-
- addrs := make([]grpc.Address, 0, len(b.addrs)-unhealthyCnt)
- liveHostPorts := make(map[string]struct{}, len(addrs))
- for _, addr := range b.addrs {
- if !b.isUnhealthy(addr.Addr) {
- addrs = append(addrs, addr)
- liveHostPorts[addr.Addr] = struct{}{}
- }
- }
- return addrs, liveHostPorts
-}
-
-func (b *GRPC17Health) updateUnhealthy() {
- for {
- select {
- case <-time.After(b.healthCheckTimeout):
- b.cleanupUnhealthy()
- pinned := b.Pinned()
- if pinned == "" || b.isUnhealthy(pinned) {
- select {
- case b.updateAddrsC <- NotifyNext:
- case <-b.stopc:
- return
- }
- }
- case <-b.stopc:
- return
- }
- }
-}
-
-// NeedUpdate returns true if all connections are down or
-// addresses do not include current pinned address.
-func (b *GRPC17Health) NeedUpdate() bool {
- // updating notifyCh can trigger new connections,
- // need update addrs if all connections are down
- // or addrs does not include pinAddr.
- b.mu.RLock()
- update := !hasAddr(b.addrs, b.pinAddr)
- b.mu.RUnlock()
- return update
-}
-
-func (b *GRPC17Health) UpdateAddrs(eps ...string) {
- np := getHostPort2ep(eps)
-
- b.mu.Lock()
- defer b.mu.Unlock()
-
- match := len(np) == len(b.hostPort2ep)
- if match {
- for k, v := range np {
- if b.hostPort2ep[k] != v {
- match = false
- break
- }
- }
- }
- if match {
- // same endpoints, so no need to update address
- return
- }
-
- b.hostPort2ep = np
- b.addrs, b.eps = eps2addrs(eps), eps
-
- b.unhealthyMu.Lock()
- b.unhealthyHostPorts = make(map[string]time.Time)
- b.unhealthyMu.Unlock()
-}
-
-func (b *GRPC17Health) Next() {
- b.mu.RLock()
- downc := b.downc
- b.mu.RUnlock()
- select {
- case b.updateAddrsC <- NotifyNext:
- case <-b.stopc:
- }
- // wait until disconnect so new RPCs are not issued on old connection
- select {
- case <-downc:
- case <-b.stopc:
- }
-}
-
-func (b *GRPC17Health) updateNotifyLoop() {
- defer close(b.donec)
-
- for {
- b.mu.RLock()
- upc, downc, addr := b.upc, b.downc, b.pinAddr
- b.mu.RUnlock()
- // downc or upc should be closed
- select {
- case <-downc:
- downc = nil
- default:
- }
- select {
- case <-upc:
- upc = nil
- default:
- }
- switch {
- case downc == nil && upc == nil:
- // stale
- select {
- case <-b.stopc:
- return
- default:
- }
- case downc == nil:
- b.notifyAddrs(NotifyReset)
- select {
- case <-upc:
- case msg := <-b.updateAddrsC:
- b.notifyAddrs(msg)
- case <-b.stopc:
- return
- }
- case upc == nil:
- select {
- // close connections that are not the pinned address
- case b.notifyCh <- []grpc.Address{{Addr: addr}}:
- case <-downc:
- case <-b.stopc:
- return
- }
- select {
- case <-downc:
- b.notifyAddrs(NotifyReset)
- case msg := <-b.updateAddrsC:
- b.notifyAddrs(msg)
- case <-b.stopc:
- return
- }
- }
- }
-}
-
-func (b *GRPC17Health) notifyAddrs(msg NotifyMsg) {
- if msg == NotifyNext {
- select {
- case b.notifyCh <- []grpc.Address{}:
- case <-b.stopc:
- return
- }
- }
- b.mu.RLock()
- pinAddr := b.pinAddr
- downc := b.downc
- b.mu.RUnlock()
- addrs, hostPorts := b.liveAddrs()
-
- var waitDown bool
- if pinAddr != "" {
- _, ok := hostPorts[pinAddr]
- waitDown = !ok
- }
-
- select {
- case b.notifyCh <- addrs:
- if waitDown {
- select {
- case <-downc:
- case <-b.stopc:
- }
- }
- case <-b.stopc:
- }
-}
-
-func (b *GRPC17Health) Up(addr grpc.Address) func(error) {
- if !b.mayPin(addr) {
- return func(err error) {}
- }
-
- b.mu.Lock()
- defer b.mu.Unlock()
-
- // gRPC might call Up after it called Close. We add this check
- // to "fix" it up at application layer. Otherwise, will panic
- // if b.upc is already closed.
- if b.closed {
- return func(err error) {}
- }
-
- // gRPC might call Up on a stale address.
- // Prevent updating pinAddr with a stale address.
- if !hasAddr(b.addrs, addr.Addr) {
- return func(err error) {}
- }
-
- if b.pinAddr != "" {
- lg.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
- return func(err error) {}
- }
-
- // notify waiting Get()s and pin first connected address
- close(b.upc)
- b.downc = make(chan struct{})
- b.pinAddr = addr.Addr
- lg.Infof("clientv3/balancer: pin %q", addr.Addr)
-
- // notify client that a connection is up
- b.readyOnce.Do(func() { close(b.readyc) })
-
- return func(err error) {
- // If connected to a black hole endpoint or a killed server, the gRPC ping
- // timeout will induce a network I/O error, and retrying until success;
- // finding healthy endpoint on retry could take several timeouts and redials.
- // To avoid wasting retries, gray-list unhealthy endpoints.
- b.HostPortError(addr.Addr, err)
-
- b.mu.Lock()
- b.upc = make(chan struct{})
- close(b.downc)
- b.pinAddr = ""
- b.mu.Unlock()
- lg.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
- }
-}
-
-func (b *GRPC17Health) mayPin(addr grpc.Address) bool {
- if b.Endpoint(addr.Addr) == "" { // stale host:port
- return false
- }
-
- b.unhealthyMu.RLock()
- unhealthyCnt := len(b.unhealthyHostPorts)
- failedTime, bad := b.unhealthyHostPorts[addr.Addr]
- b.unhealthyMu.RUnlock()
-
- b.mu.RLock()
- skip := len(b.addrs) == 1 || unhealthyCnt == 0 || len(b.addrs) == unhealthyCnt
- b.mu.RUnlock()
- if skip || !bad {
- return true
- }
-
- // prevent isolated member's endpoint from being infinitely retried, as follows:
- // 1. keepalive pings detects GoAway with http2.ErrCodeEnhanceYourCalm
- // 2. balancer 'Up' unpins with grpc: failed with network I/O error
- // 3. grpc-healthcheck still SERVING, thus retry to pin
- // instead, return before grpc-healthcheck if failed within healthcheck timeout
- if elapsed := time.Since(failedTime); elapsed < b.healthCheckTimeout {
- lg.Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout)
- return false
- }
-
- if ok, _ := b.healthCheck(addr.Addr); ok {
- b.removeUnhealthy(addr.Addr, "health check success")
- return true
- }
-
- b.HostPortError(addr.Addr, errors.New("health check failed"))
- return false
-}
-
-func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
- var (
- addr string
- closed bool
- )
-
- // If opts.BlockingWait is false (for fail-fast RPCs), it should return
- // an address it has notified via Notify immediately instead of blocking.
- if !opts.BlockingWait {
- b.mu.RLock()
- closed = b.closed
- addr = b.pinAddr
- b.mu.RUnlock()
- if closed {
- return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
- }
- if addr == "" {
- return grpc.Address{Addr: ""}, nil, ErrNoAddrAvailable
- }
- return grpc.Address{Addr: addr}, func() {}, nil
- }
-
- for {
- b.mu.RLock()
- ch := b.upc
- b.mu.RUnlock()
- select {
- case <-ch:
- case <-b.donec:
- return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
- case <-ctx.Done():
- return grpc.Address{Addr: ""}, nil, ctx.Err()
- }
- b.mu.RLock()
- closed = b.closed
- addr = b.pinAddr
- b.mu.RUnlock()
- // Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed.
- if closed {
- return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
- }
- if addr != "" {
- break
- }
- }
- return grpc.Address{Addr: addr}, func() {}, nil
-}
-
-func (b *GRPC17Health) Notify() <-chan []grpc.Address { return b.notifyCh }
-
-func (b *GRPC17Health) Close() error {
- b.mu.Lock()
- // In case gRPC calls close twice. TODO: remove the checking
- // when we are sure that gRPC wont call close twice.
- if b.closed {
- b.mu.Unlock()
- <-b.donec
- return nil
- }
- b.closed = true
- b.stopOnce.Do(func() { close(b.stopc) })
- b.pinAddr = ""
-
- // In the case of following scenario:
- // 1. upc is not closed; no pinned address
- // 2. client issues an RPC, calling invoke(), which calls Get(), enters for loop, blocks
- // 3. client.conn.Close() calls balancer.Close(); closed = true
- // 4. for loop in Get() never exits since ctx is the context passed in by the client and may not be canceled
- // we must close upc so Get() exits from blocking on upc
- select {
- case <-b.upc:
- default:
- // terminate all waiting Get()s
- close(b.upc)
- }
-
- b.mu.Unlock()
- b.wg.Wait()
-
- // wait for updateNotifyLoop to finish
- <-b.donec
- close(b.notifyCh)
-
- return nil
-}
-
-func grpcHealthCheck(ep string, dialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)) (bool, error) {
- conn, err := dialFunc(ep)
- if err != nil {
- return false, err
- }
- defer conn.Close()
- cli := healthpb.NewHealthClient(conn)
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
- cancel()
- if err != nil {
- if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
- if s.Message() == unknownService { // etcd < v3.3.0
- return true, nil
- }
- }
- return false, err
- }
- return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
-}
-
-func hasAddr(addrs []grpc.Address, targetAddr string) bool {
- for _, addr := range addrs {
- if targetAddr == addr.Addr {
- return true
- }
- }
- return false
-}
-
-func getHost(ep string) string {
- url, uerr := url.Parse(ep)
- if uerr != nil || !strings.Contains(ep, "://") {
- return ep
- }
- return url.Host
-}
-
-func eps2addrs(eps []string) []grpc.Address {
- addrs := make([]grpc.Address, len(eps))
- for i := range eps {
- addrs[i].Addr = getHost(eps[i])
- }
- return addrs
-}
-
-func getHostPort2ep(eps []string) map[string]string {
- hm := make(map[string]string, len(eps))
- for i := range eps {
- _, host, _ := parseEndpoint(eps[i])
- hm[host] = eps[i]
- }
- return hm
-}
-
-func parseEndpoint(endpoint string) (proto string, host string, scheme string) {
- proto = "tcp"
- host = endpoint
- url, uerr := url.Parse(endpoint)
- if uerr != nil || !strings.Contains(endpoint, "://") {
- return proto, host, scheme
- }
- scheme = url.Scheme
-
- // strip scheme:// prefix since grpc dials by host
- host = url.Host
- switch url.Scheme {
- case "http", "https":
- case "unix", "unixs":
- proto = "unix"
- host = url.Host + url.Path
- default:
- proto, host = "", ""
- }
- return proto, host, scheme
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go
index c70ce15..9e04378 100644
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go
@@ -22,13 +22,18 @@
// NewErr returns a picker that always returns err on "Pick".
func NewErr(err error) Picker {
- return &errPicker{err: err}
+ return &errPicker{p: Error, err: err}
}
type errPicker struct {
+ p Policy
err error
}
-func (p *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
- return nil, nil, p.err
+func (ep *errPicker) String() string {
+ return ep.p.String()
+}
+
+func (ep *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
+ return nil, nil, ep.err
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go
index 7ea761b..bd1a5d2 100644
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go
@@ -15,10 +15,77 @@
package picker
import (
+ "fmt"
+
+ "go.uber.org/zap"
"google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/resolver"
)
// Picker defines balancer Picker methods.
type Picker interface {
balancer.Picker
+ String() string
+}
+
+// Config defines picker configuration.
+type Config struct {
+ // Policy specifies etcd clientv3's built in balancer policy.
+ Policy Policy
+
+ // Logger defines picker logging object.
+ Logger *zap.Logger
+
+ // SubConnToResolverAddress maps each gRPC sub-connection to an address.
+ // Basically, it is a list of addresses that the Picker can pick from.
+ SubConnToResolverAddress map[balancer.SubConn]resolver.Address
+}
+
+// Policy defines balancer picker policy.
+type Policy uint8
+
+const (
+ // Error is error picker policy.
+ Error Policy = iota
+
+ // RoundrobinBalanced balances loads over multiple endpoints
+ // and implements failover in roundrobin fashion.
+ RoundrobinBalanced
+
+ // Custom defines custom balancer picker.
+ // TODO: custom picker is not supported yet.
+ Custom
+)
+
+func (p Policy) String() string {
+ switch p {
+ case Error:
+ return "picker-error"
+
+ case RoundrobinBalanced:
+ return "picker-roundrobin-balanced"
+
+ case Custom:
+ panic("'custom' picker policy is not supported yet")
+
+ default:
+ panic(fmt.Errorf("invalid balancer picker policy (%d)", p))
+ }
+}
+
+// New creates a new Picker.
+func New(cfg Config) Picker {
+ switch cfg.Policy {
+ case Error:
+ panic("'error' picker policy is not supported here; use 'picker.NewErr'")
+
+ case RoundrobinBalanced:
+ return newRoundrobinBalanced(cfg)
+
+ case Custom:
+ panic("'custom' picker policy is not supported yet")
+
+ default:
+ panic(fmt.Errorf("invalid balancer picker policy (%d)", cfg.Policy))
+ }
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker_policy.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker_policy.go
deleted file mode 100644
index 7bca39c..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker_policy.go
+++ /dev/null
@@ -1,49 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package picker
-
-import "fmt"
-
-// Policy defines balancer picker policy.
-type Policy uint8
-
-const (
- // TODO: custom picker is not supported yet.
- // custom defines custom balancer picker.
- custom Policy = iota
-
- // RoundrobinBalanced balance loads over multiple endpoints
- // and implements failover in roundrobin fashion.
- RoundrobinBalanced Policy = iota
-
- // TODO: only send loads to pinned address "RoundrobinFailover"
- // just like how 3.3 client works
- //
- // TODO: prioritize leader
- // TODO: health-check
- // TODO: weighted roundrobin
- // TODO: power of two random choice
-)
-
-func (p Policy) String() string {
- switch p {
- case custom:
- panic("'custom' picker policy is not supported yet")
- case RoundrobinBalanced:
- return "etcd-client-roundrobin-balanced"
- default:
- panic(fmt.Errorf("invalid balancer picker policy (%d)", p))
- }
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go
index b043d57..1b8b285 100644
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go
@@ -24,32 +24,33 @@
"google.golang.org/grpc/resolver"
)
-// NewRoundrobinBalanced returns a new roundrobin balanced picker.
-func NewRoundrobinBalanced(
- lg *zap.Logger,
- scs []balancer.SubConn,
- addrToSc map[resolver.Address]balancer.SubConn,
- scToAddr map[balancer.SubConn]resolver.Address,
-) Picker {
+// newRoundrobinBalanced returns a new roundrobin balanced picker.
+func newRoundrobinBalanced(cfg Config) Picker {
+ scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress))
+ for sc := range cfg.SubConnToResolverAddress {
+ scs = append(scs, sc)
+ }
return &rrBalanced{
- lg: lg,
+ p: RoundrobinBalanced,
+ lg: cfg.Logger,
scs: scs,
- addrToSc: addrToSc,
- scToAddr: scToAddr,
+ scToAddr: cfg.SubConnToResolverAddress,
}
}
type rrBalanced struct {
+ p Policy
+
lg *zap.Logger
- mu sync.RWMutex
- next int
- scs []balancer.SubConn
-
- addrToSc map[resolver.Address]balancer.SubConn
+ mu sync.RWMutex
+ next int
+ scs []balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
}
+func (rb *rrBalanced) String() string { return rb.p.String() }
+
// Pick is called for every client request.
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
rb.mu.RLock()
@@ -68,6 +69,7 @@
rb.lg.Debug(
"picked",
+ zap.String("picker", rb.p.String()),
zap.String("address", picked),
zap.Int("subconn-index", cur),
zap.Int("subconn-size", n),
@@ -77,6 +79,7 @@
// TODO: error handling?
fss := []zapcore.Field{
zap.Error(info.Err),
+ zap.String("picker", rb.p.String()),
zap.String("address", picked),
zap.Bool("success", info.Err == nil),
zap.Bool("bytes-sent", info.BytesSent),
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go b/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go
index a11faeb..48eb875 100644
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go
@@ -29,9 +29,9 @@
return fmt.Sprintf("%p", sc)
}
-func scsToStrings(scs map[resolver.Address]balancer.SubConn) (ss []string) {
+func scsToStrings(scs map[balancer.SubConn]resolver.Address) (ss []string) {
ss = make([]string, 0, len(scs))
- for a, sc := range scs {
+ for sc, a := range scs {
ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc)))
}
sort.Strings(ss)
diff --git a/vendor/go.etcd.io/etcd/clientv3/client.go b/vendor/go.etcd.io/etcd/clientv3/client.go
index b91cbf9..d6000a8 100644
--- a/vendor/go.etcd.io/etcd/clientv3/client.go
+++ b/vendor/go.etcd.io/etcd/clientv3/client.go
@@ -16,7 +16,6 @@
import (
"context"
- "crypto/tls"
"errors"
"fmt"
"net"
@@ -30,12 +29,13 @@
"go.etcd.io/etcd/clientv3/balancer"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
+ "go.etcd.io/etcd/clientv3/credentials"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
+ grpccredentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
@@ -51,12 +51,17 @@
func init() {
lg := zap.NewNop()
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
+ lcfg := logutil.DefaultZapLoggerConfig
+ lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
+
var err error
- lg, err = zap.NewProductionConfig().Build() // info level logging
+ lg, err = lcfg.Build() // debug level logging
if err != nil {
panic(err)
}
}
+
+ // TODO: support custom balancer
balancer.RegisterBuilder(balancer.Config{
Policy: picker.RoundrobinBalanced,
Name: roundRobinBalancerName,
@@ -76,7 +81,7 @@
conn *grpc.ClientConn
cfg Config
- creds *credentials.TransportCredentials
+ creds grpccredentials.TransportCredentials
resolverGroup *endpoint.ResolverGroup
mu *sync.RWMutex
@@ -86,9 +91,8 @@
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
- Password string
- // tokenCred is an instance of WithPerRPCCredentials()'s argument
- tokenCred *authTokenCredential
+ Password string
+ authTokenBundle credentials.Bundle
callOpts []grpc.CallOption
@@ -125,8 +129,12 @@
// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
- c.Watcher.Close()
- c.Lease.Close()
+ if c.Watcher != nil {
+ c.Watcher.Close()
+ }
+ if c.Lease != nil {
+ c.Lease.Close()
+ }
if c.resolverGroup != nil {
c.resolverGroup.Close()
}
@@ -193,24 +201,7 @@
}
}
-type authTokenCredential struct {
- token string
- tokenMu *sync.RWMutex
-}
-
-func (cred authTokenCredential) RequireTransportSecurity() bool {
- return false
-}
-
-func (cred authTokenCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) {
- cred.tokenMu.RLock()
- defer cred.tokenMu.RUnlock()
- return map[string]string{
- rpctypes.TokenFieldNameGRPC: cred.token,
- }, nil
-}
-
-func (c *Client) processCreds(scheme string) (creds *credentials.TransportCredentials) {
+func (c *Client) processCreds(scheme string) (creds grpccredentials.TransportCredentials) {
creds = c.creds
switch scheme {
case "unix":
@@ -220,9 +211,7 @@
if creds != nil {
break
}
- tlsconfig := &tls.Config{}
- emptyCreds := credentials.NewTLS(tlsconfig)
- creds = &emptyCreds
+ creds = credentials.NewBundle(credentials.Config{}).TransportCredentials()
default:
creds = nil
}
@@ -230,7 +219,7 @@
}
// dialSetupOpts gives the dial opts prior to any authentication.
-func (c *Client) dialSetupOpts(creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
+func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
if c.cfg.DialKeepAliveTime > 0 {
params := keepalive.ClientParameters{
Time: c.cfg.DialKeepAliveTime,
@@ -255,7 +244,7 @@
opts = append(opts, grpc.WithDialer(f))
if creds != nil {
- opts = append(opts, grpc.WithTransportCredentials(*creds))
+ opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
@@ -289,8 +278,8 @@
var err error // return last error in a case of fail
var auth *authenticator
- for i := 0; i < len(c.cfg.Endpoints); i++ {
- ep := c.cfg.Endpoints[i]
+ eps := c.Endpoints()
+ for _, ep := range eps {
// use dial options without dopts to avoid reusing the client balancer
var dOpts []grpc.DialOption
_, host, _ := endpoint.ParseEndpoint(ep)
@@ -318,10 +307,7 @@
continue
}
- c.tokenCred.tokenMu.Lock()
- c.tokenCred.token = resp.Token
- c.tokenCred.tokenMu.Unlock()
-
+ c.authTokenBundle.UpdateAuthToken(resp.Token)
return nil
}
@@ -338,16 +324,14 @@
}
// dial configures and dials any grpc balancer target.
-func (c *Client) dial(target string, creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
+func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts, err := c.dialSetupOpts(creds, dopts...)
if err != nil {
return nil, fmt.Errorf("failed to configure dialer: %v", err)
}
if c.Username != "" && c.Password != "" {
- c.tokenCred = &authTokenCredential{
- tokenMu: &sync.RWMutex{},
- }
+ c.authTokenBundle = credentials.NewBundle(credentials.Config{})
ctx, cancel := c.ctx, func() {}
if c.cfg.DialTimeout > 0 {
@@ -364,7 +348,7 @@
return nil, err
}
} else {
- opts = append(opts, grpc.WithPerRPCCredentials(c.tokenCred))
+ opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
}
cancel()
}
@@ -385,26 +369,25 @@
return conn, nil
}
-func (c *Client) directDialCreds(ep string) *credentials.TransportCredentials {
+func (c *Client) directDialCreds(ep string) grpccredentials.TransportCredentials {
_, hostPort, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
creds = c.processCreds(scheme)
if creds != nil {
- c := *creds
- clone := c.Clone()
+ clone := creds.Clone()
// Set the server name must to the endpoint hostname without port since grpc
// otherwise attempts to check if x509 cert is valid for the full endpoint
// including the scheme and port, which fails.
host, _ := endpoint.ParseHostPort(hostPort)
clone.OverrideServerName(host)
- creds = &clone
+ creds = clone
}
}
return creds
}
-func (c *Client) dialWithBalancerCreds(ep string) *credentials.TransportCredentials {
+func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials {
_, _, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
@@ -424,10 +407,9 @@
if cfg == nil {
cfg = &Config{}
}
- var creds *credentials.TransportCredentials
+ var creds grpccredentials.TransportCredentials
if cfg.TLS != nil {
- c := credentials.NewTLS(cfg.TLS)
- creds = &c
+ creds = credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials()
}
// use a temporary skeleton client to bootstrap first connection
@@ -541,13 +523,17 @@
func (c *Client) checkVersion() (err error) {
var wg sync.WaitGroup
- errc := make(chan error, len(c.cfg.Endpoints))
+
+ eps := c.Endpoints()
+ errc := make(chan error, len(eps))
ctx, cancel := context.WithCancel(c.ctx)
if c.cfg.DialTimeout > 0 {
- ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout)
+ cancel()
+ ctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
}
- wg.Add(len(c.cfg.Endpoints))
- for _, ep := range c.cfg.Endpoints {
+
+ wg.Add(len(eps))
+ for _, ep := range eps {
// if cluster is current, any endpoint gives a recent version
go func(e string) {
defer wg.Done()
@@ -559,8 +545,15 @@
vs := strings.Split(resp.Version, ".")
maj, min := 0, 0
if len(vs) >= 2 {
- maj, _ = strconv.Atoi(vs[0])
- min, rerr = strconv.Atoi(vs[1])
+ var serr error
+ if maj, serr = strconv.Atoi(vs[0]); serr != nil {
+ errc <- serr
+ return
+ }
+ if min, serr = strconv.Atoi(vs[1]); serr != nil {
+ errc <- serr
+ return
+ }
}
if maj < 3 || (maj == 3 && min < 2) {
rerr = ErrOldCluster
@@ -569,7 +562,7 @@
}(ep)
}
// wait for success
- for i := 0; i < len(c.cfg.Endpoints); i++ {
+ for range eps {
if err = <-errc; err == nil {
break
}
@@ -609,10 +602,13 @@
if err == nil {
return false
}
- ev, _ := status.FromError(err)
- // Unavailable codes mean the system will be right back.
- // (e.g., can't connect, lost leader)
- return ev.Code() == codes.Unavailable
+ ev, ok := status.FromError(err)
+ if ok {
+ // Unavailable codes mean the system will be right back.
+ // (e.g., can't connect, lost leader)
+ return ev.Code() == codes.Unavailable
+ }
+ return false
}
func toErr(ctx context.Context, err error) error {
@@ -632,9 +628,6 @@
if ctx.Err() != nil {
err = ctx.Err()
}
- case codes.Unavailable:
- case codes.FailedPrecondition:
- err = grpc.ErrClientConnClosing
}
}
return err
@@ -654,16 +647,19 @@
if err == nil {
return false
}
- // >= gRPC v1.10.x
+
+ // >= gRPC v1.23.x
s, ok := status.FromError(err)
if ok {
// connection is canceled or server has already closed the connection
return s.Code() == codes.Canceled || s.Message() == "transport is closing"
}
+
// >= gRPC v1.10.x
if err == context.Canceled {
return true
}
+
// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
return strings.Contains(err.Error(), "grpc: the client connection is closing")
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/mutex.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/mutex.go
index 0135341..306470b 100644
--- a/vendor/go.etcd.io/etcd/clientv3/concurrency/mutex.go
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/mutex.go
@@ -16,6 +16,7 @@
import (
"context"
+ "errors"
"fmt"
"sync"
@@ -23,6 +24,9 @@
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
)
+// ErrLocked is returned by TryLock when Mutex is already locked by another session.
+var ErrLocked = errors.New("mutex: Locked by another session")
+
// Mutex implements the sync Locker interface with etcd
type Mutex struct {
s *Session
@@ -37,9 +41,56 @@
return &Mutex{s, pfx + "/", "", -1, nil}
}
+// TryLock locks the mutex if not already locked by another session.
+// If the lock is held by another session, TryLock returns immediately after attempting the necessary cleanup.
+// The ctx argument is used for the sending/receiving Txn RPC.
+func (m *Mutex) TryLock(ctx context.Context) error {
+ resp, err := m.tryAcquire(ctx)
+ if err != nil {
+ return err
+ }
+ // if no key on prefix / the minimum rev is key, already hold the lock
+ ownerKey := resp.Responses[1].GetResponseRange().Kvs
+ if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
+ m.hdr = resp.Header
+ return nil
+ }
+ client := m.s.Client()
+ // Cannot lock, so delete the key
+ if _, err := client.Delete(ctx, m.myKey); err != nil {
+ return err
+ }
+ m.myKey = "\x00"
+ m.myRev = -1
+ return ErrLocked
+}
+
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
+ resp, err := m.tryAcquire(ctx)
+ if err != nil {
+ return err
+ }
+ // if no key on prefix / the minimum rev is key, already hold the lock
+ ownerKey := resp.Responses[1].GetResponseRange().Kvs
+ if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
+ m.hdr = resp.Header
+ return nil
+ }
+ client := m.s.Client()
+ // wait for deletion revisions prior to myKey
+ hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
+ // release lock key if wait failed
+ if werr != nil {
+ m.Unlock(client.Ctx())
+ } else {
+ m.hdr = hdr
+ }
+ return werr
+}
+
+func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
s := m.s
client := m.s.Client()
@@ -53,28 +104,13 @@
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
- return err
+ return nil, err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
- // if no key on prefix / the minimum rev is key, already hold the lock
- ownerKey := resp.Responses[1].GetResponseRange().Kvs
- if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
- m.hdr = resp.Header
- return nil
- }
-
- // wait for deletion revisions prior to myKey
- hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
- // release lock key if wait failed
- if werr != nil {
- m.Unlock(client.Ctx())
- } else {
- m.hdr = hdr
- }
- return werr
+ return resp, nil
}
func (m *Mutex) Unlock(ctx context.Context) error {
diff --git a/vendor/go.etcd.io/etcd/clientv3/config.go b/vendor/go.etcd.io/etcd/clientv3/config.go
index bd03768..11d447d 100644
--- a/vendor/go.etcd.io/etcd/clientv3/config.go
+++ b/vendor/go.etcd.io/etcd/clientv3/config.go
@@ -68,6 +68,8 @@
RejectOldCluster bool `json:"reject-old-cluster"`
// DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
+ // For example, pass "grpc.WithBlock()" to block until the underlying connection is up.
+ // Without this, Dial returns immediately and connecting the server happens in background.
DialOptions []grpc.DialOption
// Context is the default client context; it can be used to cancel grpc dial out and
@@ -81,4 +83,6 @@
// PermitWithoutStream when set will allow client to send keepalive pings to server without any active streams(RPCs).
PermitWithoutStream bool `json:"permit-without-stream"`
+
+ // TODO: support custom balancer picker
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/credentials/credentials.go b/vendor/go.etcd.io/etcd/clientv3/credentials/credentials.go
new file mode 100644
index 0000000..e6fd75c
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/credentials/credentials.go
@@ -0,0 +1,155 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package credentials implements gRPC credential interface with etcd specific logic.
+// e.g., client handshake with custom authority parameter
+package credentials
+
+import (
+ "context"
+ "crypto/tls"
+ "net"
+ "sync"
+
+ "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ grpccredentials "google.golang.org/grpc/credentials"
+)
+
+// Config defines gRPC credential configuration.
+type Config struct {
+ TLSConfig *tls.Config
+}
+
+// Bundle defines gRPC credential interface.
+type Bundle interface {
+ grpccredentials.Bundle
+ UpdateAuthToken(token string)
+}
+
+// NewBundle constructs a new gRPC credential bundle.
+func NewBundle(cfg Config) Bundle {
+ return &bundle{
+ tc: newTransportCredential(cfg.TLSConfig),
+ rc: newPerRPCCredential(),
+ }
+}
+
+// bundle implements "grpccredentials.Bundle" interface.
+type bundle struct {
+ tc *transportCredential
+ rc *perRPCCredential
+}
+
+func (b *bundle) TransportCredentials() grpccredentials.TransportCredentials {
+ return b.tc
+}
+
+func (b *bundle) PerRPCCredentials() grpccredentials.PerRPCCredentials {
+ return b.rc
+}
+
+func (b *bundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
+ // no-op
+ return nil, nil
+}
+
+// transportCredential implements "grpccredentials.TransportCredentials" interface.
+type transportCredential struct {
+ gtc grpccredentials.TransportCredentials
+}
+
+func newTransportCredential(cfg *tls.Config) *transportCredential {
+ return &transportCredential{
+ gtc: grpccredentials.NewTLS(cfg),
+ }
+}
+
+func (tc *transportCredential) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
+ // Only overwrite when authority is an IP address!
+ // Let's say, a server runs SRV records on "etcd.local" that resolves
+ // to "m1.etcd.local", and its SAN field also includes "m1.etcd.local".
+ // But what if SAN does not include its resolved IP address (e.g. 127.0.0.1)?
+ // Then, the server should only authenticate using its DNS hostname "m1.etcd.local",
+ // instead of overwriting it with its IP address.
+ // And we do not overwrite "localhost" either. Only overwrite IP addresses!
+ if isIP(authority) {
+ target := rawConn.RemoteAddr().String()
+ if authority != target {
+ // When user dials with "grpc.WithDialer", "grpc.DialContext" "cc.parsedTarget"
+ // update only happens once. This is problematic, because when TLS is enabled,
+ // retries happen through "grpc.WithDialer" with static "cc.parsedTarget" from
+ // the initial dial call.
+ // If the server authenticates by IP addresses, we want to set a new endpoint as
+ // a new authority. Otherwise
+ // "transport: authentication handshake failed: x509: certificate is valid for 127.0.0.1, 192.168.121.180, not 192.168.223.156"
+ // when the new dial target is "192.168.121.180" whose certificate host name is also "192.168.121.180"
+ // but client tries to authenticate with previously set "cc.parsedTarget" field "192.168.223.156"
+ authority = target
+ }
+ }
+ return tc.gtc.ClientHandshake(ctx, authority, rawConn)
+}
+
+// isIP returns true if the given string is an IP address.
+func isIP(ep string) bool {
+ return net.ParseIP(ep) != nil
+}
+
+func (tc *transportCredential) ServerHandshake(rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
+ return tc.gtc.ServerHandshake(rawConn)
+}
+
+func (tc *transportCredential) Info() grpccredentials.ProtocolInfo {
+ return tc.gtc.Info()
+}
+
+func (tc *transportCredential) Clone() grpccredentials.TransportCredentials {
+ return &transportCredential{
+ gtc: tc.gtc.Clone(),
+ }
+}
+
+func (tc *transportCredential) OverrideServerName(serverNameOverride string) error {
+ return tc.gtc.OverrideServerName(serverNameOverride)
+}
+
+// perRPCCredential implements "grpccredentials.PerRPCCredentials" interface.
+type perRPCCredential struct {
+ authToken string
+ authTokenMu sync.RWMutex
+}
+
+func newPerRPCCredential() *perRPCCredential { return &perRPCCredential{} }
+
+func (rc *perRPCCredential) RequireTransportSecurity() bool { return false }
+
+func (rc *perRPCCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) {
+ rc.authTokenMu.RLock()
+ authToken := rc.authToken
+ rc.authTokenMu.RUnlock()
+ return map[string]string{rpctypes.TokenFieldNameGRPC: authToken}, nil
+}
+
+func (b *bundle) UpdateAuthToken(token string) {
+ if b.rc == nil {
+ return
+ }
+ b.rc.UpdateAuthToken(token)
+}
+
+func (rc *perRPCCredential) UpdateAuthToken(token string) {
+ rc.authTokenMu.Lock()
+ rc.authToken = token
+ rc.authTokenMu.Unlock()
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/doc.go b/vendor/go.etcd.io/etcd/clientv3/doc.go
index 01a3f59..913cd28 100644
--- a/vendor/go.etcd.io/etcd/clientv3/doc.go
+++ b/vendor/go.etcd.io/etcd/clientv3/doc.go
@@ -90,7 +90,7 @@
// // with etcd clientv3 <= v3.3
// if err == context.Canceled {
// // grpc balancer calls 'Get' with an inflight client.Close
-// } else if err == grpc.ErrClientConnClosing {
+// } else if err == grpc.ErrClientConnClosing { // <= gRPC v1.7.x
// // grpc balancer calls 'Get' after client.Close.
// }
// // with etcd clientv3 >= v3.4
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go
index 3bbc26b..e6a2814 100644
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go
@@ -54,6 +54,7 @@
ErrGRPCUserNotFound = status.New(codes.FailedPrecondition, "etcdserver: user name not found").Err()
ErrGRPCRoleAlreadyExist = status.New(codes.FailedPrecondition, "etcdserver: role name already exists").Err()
ErrGRPCRoleNotFound = status.New(codes.FailedPrecondition, "etcdserver: role name not found").Err()
+ ErrGRPCRoleEmpty = status.New(codes.InvalidArgument, "etcdserver: role name is empty").Err()
ErrGRPCAuthFailed = status.New(codes.InvalidArgument, "etcdserver: authentication failed, invalid user ID or password").Err()
ErrGRPCPermissionDenied = status.New(codes.PermissionDenied, "etcdserver: permission denied").Err()
ErrGRPCRoleNotGranted = status.New(codes.FailedPrecondition, "etcdserver: role is not granted to the user").Err()
@@ -110,6 +111,7 @@
ErrorDesc(ErrGRPCUserNotFound): ErrGRPCUserNotFound,
ErrorDesc(ErrGRPCRoleAlreadyExist): ErrGRPCRoleAlreadyExist,
ErrorDesc(ErrGRPCRoleNotFound): ErrGRPCRoleNotFound,
+ ErrorDesc(ErrGRPCRoleEmpty): ErrGRPCRoleEmpty,
ErrorDesc(ErrGRPCAuthFailed): ErrGRPCAuthFailed,
ErrorDesc(ErrGRPCPermissionDenied): ErrGRPCPermissionDenied,
ErrorDesc(ErrGRPCRoleNotGranted): ErrGRPCRoleNotGranted,
@@ -168,6 +170,7 @@
ErrUserNotFound = Error(ErrGRPCUserNotFound)
ErrRoleAlreadyExist = Error(ErrGRPCRoleAlreadyExist)
ErrRoleNotFound = Error(ErrGRPCRoleNotFound)
+ ErrRoleEmpty = Error(ErrGRPCRoleEmpty)
ErrAuthFailed = Error(ErrGRPCAuthFailed)
ErrPermissionDenied = Error(ErrGRPCPermissionDenied)
ErrRoleNotGranted = Error(ErrGRPCRoleNotGranted)
diff --git a/vendor/go.etcd.io/etcd/etcdserver/etcdserverpb/rpc.pb.go b/vendor/go.etcd.io/etcd/etcdserver/etcdserverpb/rpc.pb.go
index 73efc30..199ee62 100644
--- a/vendor/go.etcd.io/etcd/etcdserver/etcdserverpb/rpc.pb.go
+++ b/vendor/go.etcd.io/etcd/etcdserver/etcdserverpb/rpc.pb.go
@@ -4537,7 +4537,7 @@
AuthDisable(ctx context.Context, in *AuthDisableRequest, opts ...grpc.CallOption) (*AuthDisableResponse, error)
// Authenticate processes an authenticate request.
Authenticate(ctx context.Context, in *AuthenticateRequest, opts ...grpc.CallOption) (*AuthenticateResponse, error)
- // UserAdd adds a new user.
+ // UserAdd adds a new user. User name cannot be empty.
UserAdd(ctx context.Context, in *AuthUserAddRequest, opts ...grpc.CallOption) (*AuthUserAddResponse, error)
// UserGet gets detailed user information.
UserGet(ctx context.Context, in *AuthUserGetRequest, opts ...grpc.CallOption) (*AuthUserGetResponse, error)
@@ -4551,7 +4551,7 @@
UserGrantRole(ctx context.Context, in *AuthUserGrantRoleRequest, opts ...grpc.CallOption) (*AuthUserGrantRoleResponse, error)
// UserRevokeRole revokes a role of specified user.
UserRevokeRole(ctx context.Context, in *AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (*AuthUserRevokeRoleResponse, error)
- // RoleAdd adds a new role.
+ // RoleAdd adds a new role. Role name cannot be empty.
RoleAdd(ctx context.Context, in *AuthRoleAddRequest, opts ...grpc.CallOption) (*AuthRoleAddResponse, error)
// RoleGet gets detailed role information.
RoleGet(ctx context.Context, in *AuthRoleGetRequest, opts ...grpc.CallOption) (*AuthRoleGetResponse, error)
@@ -4726,7 +4726,7 @@
AuthDisable(context.Context, *AuthDisableRequest) (*AuthDisableResponse, error)
// Authenticate processes an authenticate request.
Authenticate(context.Context, *AuthenticateRequest) (*AuthenticateResponse, error)
- // UserAdd adds a new user.
+ // UserAdd adds a new user. User name cannot be empty.
UserAdd(context.Context, *AuthUserAddRequest) (*AuthUserAddResponse, error)
// UserGet gets detailed user information.
UserGet(context.Context, *AuthUserGetRequest) (*AuthUserGetResponse, error)
@@ -4740,7 +4740,7 @@
UserGrantRole(context.Context, *AuthUserGrantRoleRequest) (*AuthUserGrantRoleResponse, error)
// UserRevokeRole revokes a role of specified user.
UserRevokeRole(context.Context, *AuthUserRevokeRoleRequest) (*AuthUserRevokeRoleResponse, error)
- // RoleAdd adds a new role.
+ // RoleAdd adds a new role. Role name cannot be empty.
RoleAdd(context.Context, *AuthRoleAddRequest) (*AuthRoleAddResponse, error)
// RoleGet gets detailed role information.
RoleGet(context.Context, *AuthRoleGetRequest) (*AuthRoleGetResponse, error)
diff --git a/vendor/go.etcd.io/etcd/etcdserver/etcdserverpb/rpc.proto b/vendor/go.etcd.io/etcd/etcdserver/etcdserverpb/rpc.proto
index 565f8fa..423eaba 100644
--- a/vendor/go.etcd.io/etcd/etcdserver/etcdserverpb/rpc.proto
+++ b/vendor/go.etcd.io/etcd/etcdserver/etcdserverpb/rpc.proto
@@ -264,7 +264,7 @@
};
}
- // UserAdd adds a new user.
+ // UserAdd adds a new user. User name cannot be empty.
rpc UserAdd(AuthUserAddRequest) returns (AuthUserAddResponse) {
option (google.api.http) = {
post: "/v3/auth/user/add"
@@ -320,7 +320,7 @@
};
}
- // RoleAdd adds a new role.
+ // RoleAdd adds a new role. Role name cannot be empty.
rpc RoleAdd(AuthRoleAddRequest) returns (AuthRoleAddResponse) {
option (google.api.http) = {
post: "/v3/auth/role/add"
diff --git a/vendor/go.etcd.io/etcd/pkg/logutil/log_level.go b/vendor/go.etcd.io/etcd/pkg/logutil/log_level.go
new file mode 100644
index 0000000..d57e173
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/pkg/logutil/log_level.go
@@ -0,0 +1,70 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logutil
+
+import (
+ "fmt"
+
+ "github.com/coreos/pkg/capnslog"
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+)
+
+var DefaultLogLevel = "info"
+
+// ConvertToZapLevel converts log level string to zapcore.Level.
+func ConvertToZapLevel(lvl string) zapcore.Level {
+ switch lvl {
+ case "debug":
+ return zap.DebugLevel
+ case "info":
+ return zap.InfoLevel
+ case "warn":
+ return zap.WarnLevel
+ case "error":
+ return zap.ErrorLevel
+ case "dpanic":
+ return zap.DPanicLevel
+ case "panic":
+ return zap.PanicLevel
+ case "fatal":
+ return zap.FatalLevel
+ default:
+ panic(fmt.Sprintf("unknown level %q", lvl))
+ }
+}
+
+// ConvertToCapnslogLogLevel converts log level string to capnslog.LogLevel.
+// TODO: deprecate this in 3.5
+func ConvertToCapnslogLogLevel(lvl string) capnslog.LogLevel {
+ switch lvl {
+ case "debug":
+ return capnslog.DEBUG
+ case "info":
+ return capnslog.INFO
+ case "warn":
+ return capnslog.WARNING
+ case "error":
+ return capnslog.ERROR
+ case "dpanic":
+ return capnslog.CRITICAL
+ case "panic":
+ return capnslog.CRITICAL
+ case "fatal":
+ return capnslog.CRITICAL
+ default:
+ panic(fmt.Sprintf("unknown level %q", lvl))
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/pkg/logutil/zap.go b/vendor/go.etcd.io/etcd/pkg/logutil/zap.go
index 313d914..8fc6e03 100644
--- a/vendor/go.etcd.io/etcd/pkg/logutil/zap.go
+++ b/vendor/go.etcd.io/etcd/pkg/logutil/zap.go
@@ -23,7 +23,7 @@
// DefaultZapLoggerConfig defines default zap logger configuration.
var DefaultZapLoggerConfig = zap.Config{
- Level: zap.NewAtomicLevelAt(zap.InfoLevel),
+ Level: zap.NewAtomicLevelAt(ConvertToZapLevel(DefaultLogLevel)),
Development: false,
Sampling: &zap.SamplingConfig{
@@ -53,15 +53,12 @@
ErrorOutputPaths: []string{"stderr"},
}
-// AddOutputPaths adds output paths to the existing output paths, resolving conflicts.
-func AddOutputPaths(cfg zap.Config, outputPaths, errorOutputPaths []string) zap.Config {
+// MergeOutputPaths merges logging output paths, resolving conflicts.
+func MergeOutputPaths(cfg zap.Config) zap.Config {
outputs := make(map[string]struct{})
for _, v := range cfg.OutputPaths {
outputs[v] = struct{}{}
}
- for _, v := range outputPaths {
- outputs[v] = struct{}{}
- }
outputSlice := make([]string, 0)
if _, ok := outputs["/dev/null"]; ok {
// "/dev/null" to discard all
@@ -78,9 +75,6 @@
for _, v := range cfg.ErrorOutputPaths {
errOutputs[v] = struct{}{}
}
- for _, v := range errorOutputPaths {
- errOutputs[v] = struct{}{}
- }
errOutputSlice := make([]string, 0)
if _, ok := errOutputs["/dev/null"]; ok {
// "/dev/null" to discard all
diff --git a/vendor/go.etcd.io/etcd/pkg/logutil/zap_raft.go b/vendor/go.etcd.io/etcd/pkg/logutil/zap_raft.go
index e92cba0..f016b30 100644
--- a/vendor/go.etcd.io/etcd/pkg/logutil/zap_raft.go
+++ b/vendor/go.etcd.io/etcd/pkg/logutil/zap_raft.go
@@ -23,7 +23,7 @@
"go.uber.org/zap/zapcore"
)
-// NewRaftLogger converts "*zap.Logger" to "raft.Logger".
+// NewRaftLogger builds "raft.Logger" from "*zap.Config".
func NewRaftLogger(lcfg *zap.Config) (raft.Logger, error) {
if lcfg == nil {
return nil, errors.New("nil zap.Config")
@@ -35,6 +35,11 @@
return &zapRaftLogger{lg: lg, sugar: lg.Sugar()}, nil
}
+// NewRaftLoggerZap converts "*zap.Logger" to "raft.Logger".
+func NewRaftLoggerZap(lg *zap.Logger) raft.Logger {
+ return &zapRaftLogger{lg: lg, sugar: lg.Sugar()}
+}
+
// NewRaftLoggerFromZapCore creates "raft.Logger" from "zap.Core"
// and "zapcore.WriteSyncer".
func NewRaftLoggerFromZapCore(cr zapcore.Core, syncer zapcore.WriteSyncer) raft.Logger {
diff --git a/vendor/go.etcd.io/etcd/pkg/types/set.go b/vendor/go.etcd.io/etcd/pkg/types/set.go
index c111b0c..e7a3cdc 100644
--- a/vendor/go.etcd.io/etcd/pkg/types/set.go
+++ b/vendor/go.etcd.io/etcd/pkg/types/set.go
@@ -148,6 +148,14 @@
func (ts *tsafeSet) Equals(other Set) bool {
ts.m.RLock()
defer ts.m.RUnlock()
+
+ // If ts and other represent the same variable, avoid calling
+ // ts.us.Equals(other), to avoid double RLock bug
+ if _other, ok := other.(*tsafeSet); ok {
+ if _other == ts {
+ return true
+ }
+ }
return ts.us.Equals(other)
}
@@ -173,6 +181,15 @@
func (ts *tsafeSet) Sub(other Set) Set {
ts.m.RLock()
defer ts.m.RUnlock()
+
+ // If ts and other represent the same variable, avoid calling
+ // ts.us.Sub(other), to avoid double RLock bug
+ if _other, ok := other.(*tsafeSet); ok {
+ if _other == ts {
+ usResult := NewUnsafeSet()
+ return &tsafeSet{usResult, sync.RWMutex{}}
+ }
+ }
usResult := ts.us.Sub(other).(*unsafeSet)
return &tsafeSet{usResult, sync.RWMutex{}}
}
diff --git a/vendor/go.etcd.io/etcd/raft/bootstrap.go b/vendor/go.etcd.io/etcd/raft/bootstrap.go
new file mode 100644
index 0000000..bd82b20
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/bootstrap.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+ "errors"
+
+ pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+// Bootstrap initializes the RawNode for first use by appending configuration
+// changes for the supplied peers. This method returns an error if the Storage
+// is nonempty.
+//
+// It is recommended that instead of calling this method, applications bootstrap
+// their state manually by setting up a Storage that has a first index > 1 and
+// which stores the desired ConfState as its InitialState.
+func (rn *RawNode) Bootstrap(peers []Peer) error {
+ if len(peers) == 0 {
+ return errors.New("must provide at least one peer to Bootstrap")
+ }
+ lastIndex, err := rn.raft.raftLog.storage.LastIndex()
+ if err != nil {
+ return err
+ }
+
+ if lastIndex != 0 {
+ return errors.New("can't bootstrap a nonempty Storage")
+ }
+
+ // We fake out the initial entries below, but nothing has been
+ // persisted. Start with an empty HardState (thus the first Ready will
+ // emit a HardState update for the app to persist).
+ rn.prevHardSt = emptyState
+
+ // TODO(tbg): remove StartNode and give the application the right tools to
+ // bootstrap the initial membership in a cleaner way.
+ rn.raft.becomeFollower(1, None)
+ ents := make([]pb.Entry, len(peers))
+ for i, peer := range peers {
+ cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
+ data, err := cc.Marshal()
+ if err != nil {
+ return err
+ }
+
+ ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
+ }
+ rn.raft.raftLog.append(ents...)
+
+ // Now apply them, mainly so that the application can call Campaign
+ // immediately after StartNode in tests. Note that these nodes will
+ // be added to raft twice: here and when the application's Ready
+ // loop calls ApplyConfChange. The calls to addNode must come after
+ // all calls to raftLog.append so progress.next is set after these
+ // bootstrapping entries (it is an error if we try to append these
+ // entries since they have already been committed).
+ // We do not set raftLog.applied so the application will be able
+ // to observe all conf changes via Ready.CommittedEntries.
+ //
+ // TODO(bdarnell): These entries are still unstable; do we need to preserve
+ // the invariant that committed < unstable?
+ rn.raft.raftLog.committed = uint64(len(ents))
+ for _, peer := range peers {
+ rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
+ }
+ return nil
+}
diff --git a/vendor/go.etcd.io/etcd/raft/confchange/confchange.go b/vendor/go.etcd.io/etcd/raft/confchange/confchange.go
index fd75aed..a0dc486 100644
--- a/vendor/go.etcd.io/etcd/raft/confchange/confchange.go
+++ b/vendor/go.etcd.io/etcd/raft/confchange/confchange.go
@@ -41,12 +41,12 @@
// to
// (1 2 3)&&(1 2 3).
//
-// The supplied ConfChanges are then applied to the incoming majority config,
+// The supplied changes are then applied to the incoming majority config,
// resulting in a joint configuration that in terms of the Raft thesis[1]
// (Section 4.3) corresponds to `C_{new,old}`.
//
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
-func (c Changer) EnterJoint(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) {
+func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
cfg, prs, err := c.checkAndCopy()
if err != nil {
return c.err(err)
@@ -62,10 +62,7 @@
return c.err(err)
}
// Clear the outgoing config.
- {
- *outgoingPtr(&cfg.Voters) = quorum.MajorityConfig{}
-
- }
+ *outgoingPtr(&cfg.Voters) = quorum.MajorityConfig{}
// Copy incoming to outgoing.
for id := range incoming(cfg.Voters) {
outgoing(cfg.Voters)[id] = struct{}{}
@@ -74,7 +71,7 @@
if err := c.apply(&cfg, prs, ccs...); err != nil {
return c.err(err)
}
-
+ cfg.AutoLeave = autoLeave
return checkAndReturn(cfg, prs)
}
@@ -120,6 +117,7 @@
}
}
*outgoingPtr(&cfg.Voters) = nil
+ cfg.AutoLeave = false
return checkAndReturn(cfg, prs)
}
@@ -129,7 +127,7 @@
// will return an error if that is not the case, if the resulting quorum is
// zero, or if the configuration is in a joint state (i.e. if there is an
// outgoing configuration).
-func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) {
+func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
cfg, prs, err := c.checkAndCopy()
if err != nil {
return c.err(err)
@@ -142,7 +140,7 @@
return c.err(err)
}
if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
- return tracker.Config{}, nil, errors.New("more than voter changed without entering joint config")
+ return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config")
}
if err := checkInvariants(cfg, prs); err != nil {
return tracker.Config{}, tracker.ProgressMap{}, nil
@@ -151,14 +149,14 @@
return checkAndReturn(cfg, prs)
}
-// apply a ConfChange to the configuration. By convention, changes to voters are
+// apply a change to the configuration. By convention, changes to voters are
// always made to the incoming majority config Voters[0]. Voters[1] is either
// empty or preserves the outgoing majority configuration while in a joint state.
-func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChange) error {
+func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChangeSingle) error {
for _, cc := range ccs {
if cc.NodeID == 0 {
// etcd replaces the NodeID with zero if it decides (downstream of
- // raft) to not apply a ConfChange, so we have to have explicit code
+ // raft) to not apply a change, so we have to have explicit code
// here to ignore these.
continue
}
@@ -259,11 +257,15 @@
nilAwareAdd(&cfg.Learners, id)
}
prs[id] = &tracker.Progress{
- // We initialize Progress.Next with lastIndex+1 so that the peer will be
- // probed without an index first.
+ // Initializing the Progress with the last index means that the follower
+ // can be probed (with the last index).
//
- // TODO(tbg): verify that, this is just my best guess.
- Next: c.LastIndex + 1,
+ // TODO(tbg): seems awfully optimistic. Using the first index would be
+ // better. The general expectation here is that the follower has no log
+ // at all (and will thus likely need a snapshot), though the app may
+ // have applied a snapshot out of band before adding the replica (thus
+ // making the first index the better choice).
+ Next: c.LastIndex,
Match: 0,
Inflights: tracker.NewInflights(c.Tracker.MaxInflight),
IsLearner: isLearner,
@@ -327,6 +329,9 @@
if cfg.LearnersNext != nil {
return fmt.Errorf("LearnersNext must be nil when not joint")
}
+ if cfg.AutoLeave {
+ return fmt.Errorf("AutoLeave must be false when not joint")
+ }
}
return nil
@@ -408,7 +413,7 @@
// Describe prints the type and NodeID of the configuration changes as a
// space-delimited string.
-func Describe(ccs ...pb.ConfChange) string {
+func Describe(ccs ...pb.ConfChangeSingle) string {
var buf strings.Builder
for _, cc := range ccs {
if buf.Len() > 0 {
diff --git a/vendor/go.etcd.io/etcd/raft/confchange/restore.go b/vendor/go.etcd.io/etcd/raft/confchange/restore.go
new file mode 100644
index 0000000..724068d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/confchange/restore.go
@@ -0,0 +1,155 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package confchange
+
+import (
+ pb "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/raft/tracker"
+)
+
+// toConfChangeSingle translates a conf state into 1) a slice of operations creating
+// first the config that will become the outgoing one, and then the incoming one, and
+// 2) another slice that, when applied to the config resulting from 1), represents the
+// ConfState.
+func toConfChangeSingle(cs pb.ConfState) (out []pb.ConfChangeSingle, in []pb.ConfChangeSingle) {
+ // Example to follow along this code:
+ // voters=(1 2 3) learners=(5) outgoing=(1 2 4 6) learners_next=(4)
+ //
+ // This means that before entering the joint config, the configuration
+ // had voters (1 2 4 6) and perhaps some learners that are already gone.
+ // The new set of voters is (1 2 3), i.e. (1 2) were kept around, and (4 6)
+ // are no longer voters; however 4 is poised to become a learner upon leaving
+ // the joint state.
+ // We can't tell whether 5 was a learner before entering the joint config,
+ // but it doesn't matter (we'll pretend that it wasn't).
+ //
+ // The code below will construct
+ // outgoing = add 1; add 2; add 4; add 6
+ // incoming = remove 1; remove 2; remove 4; remove 6
+ // add 1; add 2; add 3;
+ // add-learner 5;
+ // add-learner 4;
+ //
+ // So, when starting with an empty config, after applying 'outgoing' we have
+ //
+ // quorum=(1 2 4 6)
+ //
+ // From which we enter a joint state via 'incoming'
+ //
+ // quorum=(1 2 3)&&(1 2 4 6) learners=(5) learners_next=(4)
+ //
+ // as desired.
+
+ for _, id := range cs.VotersOutgoing {
+ // If there are outgoing voters, first add them one by one so that the
+ // (non-joint) config has them all.
+ out = append(out, pb.ConfChangeSingle{
+ Type: pb.ConfChangeAddNode,
+ NodeID: id,
+ })
+
+ }
+
+ // We're done constructing the outgoing slice, now on to the incoming one
+ // (which will apply on top of the config created by the outgoing slice).
+
+ // First, we'll remove all of the outgoing voters.
+ for _, id := range cs.VotersOutgoing {
+ in = append(in, pb.ConfChangeSingle{
+ Type: pb.ConfChangeRemoveNode,
+ NodeID: id,
+ })
+ }
+ // Then we'll add the incoming voters and learners.
+ for _, id := range cs.Voters {
+ in = append(in, pb.ConfChangeSingle{
+ Type: pb.ConfChangeAddNode,
+ NodeID: id,
+ })
+ }
+ for _, id := range cs.Learners {
+ in = append(in, pb.ConfChangeSingle{
+ Type: pb.ConfChangeAddLearnerNode,
+ NodeID: id,
+ })
+ }
+ // Same for LearnersNext; these are nodes we want to be learners but which
+ // are currently voters in the outgoing config.
+ for _, id := range cs.LearnersNext {
+ in = append(in, pb.ConfChangeSingle{
+ Type: pb.ConfChangeAddLearnerNode,
+ NodeID: id,
+ })
+ }
+ return out, in
+}
+
+func chain(chg Changer, ops ...func(Changer) (tracker.Config, tracker.ProgressMap, error)) (tracker.Config, tracker.ProgressMap, error) {
+ for _, op := range ops {
+ cfg, prs, err := op(chg)
+ if err != nil {
+ return tracker.Config{}, nil, err
+ }
+ chg.Tracker.Config = cfg
+ chg.Tracker.Progress = prs
+ }
+ return chg.Tracker.Config, chg.Tracker.Progress, nil
+}
+
+// Restore takes a Changer (which must represent an empty configuration), and
+// runs a sequence of changes enacting the configuration described in the
+// ConfState.
+//
+// TODO(tbg) it's silly that this takes a Changer. Unravel this by making sure
+// the Changer only needs a ProgressMap (not a whole Tracker) at which point
+// this can just take LastIndex and MaxInflight directly instead and cook up
+// the results from that alone.
+func Restore(chg Changer, cs pb.ConfState) (tracker.Config, tracker.ProgressMap, error) {
+ outgoing, incoming := toConfChangeSingle(cs)
+
+ var ops []func(Changer) (tracker.Config, tracker.ProgressMap, error)
+
+ if len(outgoing) == 0 {
+ // No outgoing config, so just apply the incoming changes one by one.
+ for _, cc := range incoming {
+ cc := cc // loop-local copy
+ ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) {
+ return chg.Simple(cc)
+ })
+ }
+ } else {
+ // The ConfState describes a joint configuration.
+ //
+ // First, apply all of the changes of the outgoing config one by one, so
+ // that it temporarily becomes the incoming active config. For example,
+ // if the config is (1 2 3)&(2 3 4), this will establish (2 3 4)&().
+ for _, cc := range outgoing {
+ cc := cc // loop-local copy
+ ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) {
+ return chg.Simple(cc)
+ })
+ }
+ // Now enter the joint state, which rotates the above additions into the
+ // outgoing config, and adds the incoming config in. Continuing the
+ // example above, we'd get (1 2 3)&(2 3 4), i.e. the incoming operations
+ // would be removing 2,3,4 and then adding in 1,2,3 while transitioning
+ // into a joint state.
+ ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) {
+ return chg.EnterJoint(cs.AutoLeave, incoming...)
+ })
+ }
+
+ return chain(chg, ops...)
+}
diff --git a/vendor/go.etcd.io/etcd/raft/log_unstable.go b/vendor/go.etcd.io/etcd/raft/log_unstable.go
index 1005bf6..1bff5a7 100644
--- a/vendor/go.etcd.io/etcd/raft/log_unstable.go
+++ b/vendor/go.etcd.io/etcd/raft/log_unstable.go
@@ -55,10 +55,7 @@
// is any.
func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
if i < u.offset {
- if u.snapshot == nil {
- return 0, false
- }
- if u.snapshot.Metadata.Index == i {
+ if u.snapshot != nil && u.snapshot.Metadata.Index == i {
return u.snapshot.Metadata.Term, true
}
return 0, false
@@ -71,6 +68,7 @@
if i > last {
return 0, false
}
+
return u.entries[i-u.offset].Term, true
}
diff --git a/vendor/go.etcd.io/etcd/raft/logger.go b/vendor/go.etcd.io/etcd/raft/logger.go
index 426a77d..6d89629 100644
--- a/vendor/go.etcd.io/etcd/raft/logger.go
+++ b/vendor/go.etcd.io/etcd/raft/logger.go
@@ -19,6 +19,7 @@
"io/ioutil"
"log"
"os"
+ "sync"
)
type Logger interface {
@@ -41,11 +42,16 @@
Panicf(format string, v ...interface{})
}
-func SetLogger(l Logger) { raftLogger = l }
+func SetLogger(l Logger) {
+ raftLoggerMu.Lock()
+ raftLogger = l
+ raftLoggerMu.Unlock()
+}
var (
defaultLogger = &DefaultLogger{Logger: log.New(os.Stderr, "raft", log.LstdFlags)}
discardLogger = &DefaultLogger{Logger: log.New(ioutil.Discard, "", 0)}
+ raftLoggerMu sync.Mutex
raftLogger = Logger(defaultLogger)
)
diff --git a/vendor/go.etcd.io/etcd/raft/node.go b/vendor/go.etcd.io/etcd/raft/node.go
index 4a3b2f1..ab6185b 100644
--- a/vendor/go.etcd.io/etcd/raft/node.go
+++ b/vendor/go.etcd.io/etcd/raft/node.go
@@ -132,10 +132,20 @@
// Propose proposes that data be appended to the log. Note that proposals can be lost without
// notice, therefore it is user's job to ensure proposal retries.
Propose(ctx context.Context, data []byte) error
- // ProposeConfChange proposes config change.
- // At most one ConfChange can be in the process of going through consensus.
- // Application needs to call ApplyConfChange when applying EntryConfChange type entry.
- ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
+ // ProposeConfChange proposes a configuration change. Like any proposal, the
+ // configuration change may be dropped with or without an error being
+ // returned. In particular, configuration changes are dropped unless the
+ // leader has certainty that there is no prior unapplied configuration
+ // change in its log.
+ //
+ // The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2
+ // message. The latter allows arbitrary configuration changes via joint
+ // consensus, notably including replacing a voter. Passing a ConfChangeV2
+ // message is only allowed if all Nodes participating in the cluster run a
+ // version of this library aware of the V2 API. See pb.ConfChangeV2 for
+ // usage details and semantics.
+ ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error
+
// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error
@@ -156,11 +166,13 @@
// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
// progress, it can call Advance before finishing applying the last ready.
Advance()
- // ApplyConfChange applies config change to the local node.
- // Returns an opaque ConfState protobuf which must be recorded
- // in snapshots. Will never return nil; it returns a pointer only
- // to match MemoryStorage.Compact.
- ApplyConfChange(cc pb.ConfChange) *pb.ConfState
+ // ApplyConfChange applies a config change (previously passed to
+ // ProposeConfChange) to the node. This must be called whenever a config
+ // change is observed in Ready.CommittedEntries.
+ //
+ // Returns an opaque non-nil ConfState protobuf which must be recorded in
+ // snapshots.
+ ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState
// TransferLeadership attempts to transfer leadership to the given transferee.
TransferLeadership(ctx context.Context, lead, transferee uint64)
@@ -197,52 +209,21 @@
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
+//
+// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
- r := newRaft(c)
- // become the follower at term 1 and apply initial configuration
- // entries of term 1
- r.becomeFollower(1, None)
- for _, peer := range peers {
- cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
- d, err := cc.Marshal()
- if err != nil {
- panic("unexpected marshal error")
- }
- // TODO(tbg): this should append the ConfChange for the own node first
- // and also call applyConfChange below for that node first. Otherwise
- // we have a Raft group (for a little while) that doesn't have itself
- // in its config, which is bad.
- // This whole way of setting things up is rickety. The app should just
- // populate the initial ConfState appropriately and then all of this
- // goes away.
- e := pb.Entry{
- Type: pb.EntryConfChange,
- Term: 1,
- Index: r.raftLog.lastIndex() + 1,
- Data: d,
- }
- r.raftLog.append(e)
+ if len(peers) == 0 {
+ panic("no peers given; use RestartNode instead")
}
- // Mark these initial entries as committed.
- // TODO(bdarnell): These entries are still unstable; do we need to preserve
- // the invariant that committed < unstable?
- r.raftLog.committed = r.raftLog.lastIndex()
- // Now apply them, mainly so that the application can call Campaign
- // immediately after StartNode in tests. Note that these nodes will
- // be added to raft twice: here and when the application's Ready
- // loop calls ApplyConfChange. The calls to addNode must come after
- // all calls to raftLog.append so progress.next is set after these
- // bootstrapping entries (it is an error if we try to append these
- // entries since they have already been committed).
- // We do not set raftLog.applied so the application will be able
- // to observe all conf changes via Ready.CommittedEntries.
- for _, peer := range peers {
- r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
+ rn, err := NewRawNode(c)
+ if err != nil {
+ panic(err)
}
+ rn.Bootstrap(peers)
- n := newNode()
- n.logger = c.Logger
- go n.run(r)
+ n := newNode(rn)
+
+ go n.run()
return &n
}
@@ -251,11 +232,12 @@
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
- r := newRaft(c)
-
- n := newNode()
- n.logger = c.Logger
- go n.run(r)
+ rn, err := NewRawNode(c)
+ if err != nil {
+ panic(err)
+ }
+ n := newNode(rn)
+ go n.run()
return &n
}
@@ -268,7 +250,7 @@
type node struct {
propc chan msgWithResult
recvc chan pb.Message
- confc chan pb.ConfChange
+ confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
@@ -277,14 +259,14 @@
stop chan struct{}
status chan chan Status
- logger Logger
+ rn *RawNode
}
-func newNode() node {
+func newNode(rn *RawNode) node {
return node{
propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
- confc: make(chan pb.ConfChange),
+ confc: make(chan pb.ConfChangeV2),
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),
advancec: make(chan struct{}),
@@ -295,6 +277,7 @@
done: make(chan struct{}),
stop: make(chan struct{}),
status: make(chan chan Status),
+ rn: rn,
}
}
@@ -310,30 +293,30 @@
<-n.done
}
-func (n *node) run(r *raft) {
+func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
- var prevLastUnstablei, prevLastUnstablet uint64
- var havePrevLastUnstablei bool
- var prevSnapi uint64
- var applyingToI uint64
var rd Ready
+ r := n.rn.raft
+
lead := None
- prevSoftSt := r.softState()
- prevHardSt := emptyState
for {
if advancec != nil {
readyc = nil
- } else {
- rd = newReady(r, prevSoftSt, prevHardSt)
- if rd.containsUpdates() {
- readyc = n.readyc
- } else {
- readyc = nil
- }
+ } else if n.rn.HasReady() {
+ // Populate a Ready. Note that this Ready is not guaranteed to
+ // actually be handled. We will arm readyc, but there's no guarantee
+ // that we will actually send on it. It's possible that we will
+ // service another channel instead, loop around, and then populate
+ // the Ready again. We could instead force the previous Ready to be
+ // handled first, but it's generally good to emit larger Readys plus
+ // it simplifies testing (by emitting less frequently and more
+ // predictably).
+ rd = n.rn.readyWithoutAccept()
+ readyc = n.readyc
}
if lead != r.lead {
@@ -369,11 +352,27 @@
r.Step(m)
}
case cc := <-n.confc:
+ _, okBefore := r.prs.Progress[r.id]
cs := r.applyConfChange(cc)
- if _, ok := r.prs.Progress[r.id]; !ok {
- // block incoming proposal when local node is
- // removed
- if cc.NodeID == r.id {
+ // If the node was removed, block incoming proposals. Note that we
+ // only do this if the node was in the config before. Nodes may be
+ // a member of the group without knowing this (when they're catching
+ // up on the log and don't have the latest config) and we don't want
+ // to block the proposal channel in that case.
+ //
+ // NB: propc is reset when the leader changes, which, if we learn
+ // about it, sort of implies that we got readded, maybe? This isn't
+ // very sound and likely has bugs.
+ if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
+ var found bool
+ for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
+ for _, id := range sl {
+ if id == r.id {
+ found = true
+ }
+ }
+ }
+ if !found {
propc = nil
}
}
@@ -382,40 +381,13 @@
case <-n.done:
}
case <-n.tickc:
- r.tick()
+ n.rn.Tick()
case readyc <- rd:
- if rd.SoftState != nil {
- prevSoftSt = rd.SoftState
- }
- if len(rd.Entries) > 0 {
- prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
- prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
- havePrevLastUnstablei = true
- }
- if !IsEmptyHardState(rd.HardState) {
- prevHardSt = rd.HardState
- }
- if !IsEmptySnap(rd.Snapshot) {
- prevSnapi = rd.Snapshot.Metadata.Index
- }
- if index := rd.appliedCursor(); index != 0 {
- applyingToI = index
- }
-
- r.msgs = nil
- r.readStates = nil
- r.reduceUncommittedSize(rd.CommittedEntries)
+ n.rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
- if applyingToI != 0 {
- r.raftLog.appliedTo(applyingToI)
- applyingToI = 0
- }
- if havePrevLastUnstablei {
- r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
- havePrevLastUnstablei = false
- }
- r.raftLog.stableSnapTo(prevSnapi)
+ n.rn.Advance(rd)
+ rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)
@@ -433,7 +405,7 @@
case n.tickc <- struct{}{}:
case <-n.done:
default:
- n.logger.Warningf("A tick missed to fire. Node blocks too long!")
+ n.rn.raft.logger.Warningf("%x (leader %v) A tick missed to fire. Node blocks too long!", n.rn.raft.id, n.rn.raft.id == n.rn.raft.lead)
}
}
@@ -452,12 +424,20 @@
return n.step(ctx, m)
}
-func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
- data, err := cc.Marshal()
+func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) {
+ typ, data, err := pb.MarshalConfChange(c)
+ if err != nil {
+ return pb.Message{}, err
+ }
+ return pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: typ, Data: data}}}, nil
+}
+
+func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
+ msg, err := confChangeToMsg(cc)
if err != nil {
return err
}
- return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
+ return n.Step(ctx, msg)
}
func (n *node) step(ctx context.Context, m pb.Message) error {
@@ -518,10 +498,10 @@
}
}
-func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
+func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
var cs pb.ConfState
select {
- case n.confc <- cc:
+ case n.confc <- cc.AsV2():
case <-n.done:
}
select {
diff --git a/vendor/go.etcd.io/etcd/raft/quorum/majority.go b/vendor/go.etcd.io/etcd/raft/quorum/majority.go
index 5eba503..8858a36 100644
--- a/vendor/go.etcd.io/etcd/raft/quorum/majority.go
+++ b/vendor/go.etcd.io/etcd/raft/quorum/majority.go
@@ -102,9 +102,17 @@
return buf.String()
}
-type uint64Slice []uint64
+// Slice returns the MajorityConfig as a sorted slice.
+func (c MajorityConfig) Slice() []uint64 {
+ var sl []uint64
+ for id := range c {
+ sl = append(sl, id)
+ }
+ sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] })
+ return sl
+}
-func insertionSort(sl uint64Slice) {
+func insertionSort(sl []uint64) {
a, b := 0, len(sl)
for i := a + 1; i < b; i++ {
for j := i; j > a && sl[j] < sl[j-1]; j-- {
@@ -131,12 +139,12 @@
// performance is a lesser concern (additionally the performance
// implications of an allocation here are far from drastic).
var stk [7]uint64
- srt := uint64Slice(stk[:])
-
- if cap(srt) < n {
+ var srt []uint64
+ if len(stk) >= n {
+ srt = stk[:n]
+ } else {
srt = make([]uint64, n)
}
- srt = srt[:n]
{
// Fill the slice with the indexes observed. Any unused slots will be
diff --git a/vendor/go.etcd.io/etcd/raft/raft.go b/vendor/go.etcd.io/etcd/raft/raft.go
index 01e23ec..cdcb43d 100644
--- a/vendor/go.etcd.io/etcd/raft/raft.go
+++ b/vendor/go.etcd.io/etcd/raft/raft.go
@@ -20,6 +20,7 @@
"fmt"
"math"
"math/rand"
+ "sort"
"strings"
"sync"
"time"
@@ -263,7 +264,8 @@
maxMsgSize uint64
maxUncommittedSize uint64
- prs tracker.ProgressTracker
+ // TODO(tbg): rename to trk.
+ prs tracker.ProgressTracker
state StateType
@@ -327,18 +329,18 @@
if err != nil {
panic(err) // TODO(bdarnell)
}
- peers := c.peers
- learners := c.learners
- if len(cs.Nodes) > 0 || len(cs.Learners) > 0 {
- if len(peers) > 0 || len(learners) > 0 {
+
+ if len(c.peers) > 0 || len(c.learners) > 0 {
+ if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
// TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
- panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)")
+ panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)")
}
- peers = cs.Nodes
- learners = cs.Learners
+ cs.Voters = c.peers
+ cs.Learners = c.learners
}
+
r := &raft{
id: c.ID,
lead: None,
@@ -355,16 +357,17 @@
readOnly: newReadOnly(c.ReadOnlyOption),
disableProposalForwarding: c.DisableProposalForwarding,
}
- for _, p := range peers {
- // Add node to active config.
- r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p})
- }
- for _, p := range learners {
- // Add learner to active config.
- r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p})
- }
- if !isHardStateEqual(hs, emptyState) {
+ cfg, prs, err := confchange.Restore(confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: raftlog.lastIndex(),
+ }, cs)
+ if err != nil {
+ panic(err)
+ }
+ assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
+
+ if !IsEmptyHardState(hs) {
r.loadState(hs)
}
if c.Applied > 0 {
@@ -527,7 +530,6 @@
if id == r.id {
return
}
-
r.sendAppend(id)
})
}
@@ -551,6 +553,46 @@
})
}
+func (r *raft) advance(rd Ready) {
+ // If entries were applied (or a snapshot), update our cursor for
+ // the next Ready. Note that if the current HardState contains a
+ // new Commit index, this does not mean that we're also applying
+ // all of the new entries due to commit pagination by size.
+ if index := rd.appliedCursor(); index > 0 {
+ r.raftLog.appliedTo(index)
+ if r.prs.Config.AutoLeave && index >= r.pendingConfIndex && r.state == StateLeader {
+ // If the current (and most recent, at least for this leader's term)
+ // configuration should be auto-left, initiate that now.
+ ccdata, err := (&pb.ConfChangeV2{}).Marshal()
+ if err != nil {
+ panic(err)
+ }
+ ent := pb.Entry{
+ Type: pb.EntryConfChangeV2,
+ Data: ccdata,
+ }
+ if !r.appendEntry(ent) {
+ // If we could not append the entry, bump the pending conf index
+ // so that we'll try again later.
+ //
+ // TODO(tbg): test this case.
+ r.pendingConfIndex = r.raftLog.lastIndex()
+ } else {
+ r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
+ }
+ }
+ }
+ r.reduceUncommittedSize(rd.CommittedEntries)
+
+ if len(rd.Entries) > 0 {
+ e := rd.Entries[len(rd.Entries)-1]
+ r.raftLog.stableTo(e.Index, e.Term)
+ }
+ if !IsEmptySnap(rd.Snapshot) {
+ r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
+ }
+}
+
// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
@@ -753,7 +795,16 @@
}
return
}
- for id := range r.prs.Voters.IDs() {
+ var ids []uint64
+ {
+ idMap := r.prs.Voters.IDs()
+ ids = make([]uint64, 0, len(idMap))
+ for id := range idMap {
+ ids = append(ids, id)
+ }
+ sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
+ }
+ for _, id := range ids {
if id == r.id {
continue
}
@@ -880,12 +931,6 @@
}
case pb.MsgVote, pb.MsgPreVote:
- if r.isLearner {
- // TODO: learner may need to vote, in case of node down when confchange.
- r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
- r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
- return nil
- }
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
@@ -894,6 +939,24 @@
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// ...and we believe the candidate is up to date.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
+ // Note: it turns out that learners must be allowed to cast votes.
+ // This seems counter-intuitive but is necessary in the situation in which
+ // a learner has been promoted (i.e. is now a voter) but has not learned
+ // about this yet.
+ // For example, consider a group in which id=1 is a learner and id=2 and
+ // id=3 are voters. A configuration change promoting 1 can be committed on
+ // the quorum `{2,3}` without the config change being appended to the
+ // learner's log. If the leader (say 2) fails, there are de facto two
+ // voters remaining. Only 3 can win an election (due to its log containing
+ // all committed entries), but to do so it will need 1 to vote. But 1
+ // considers itself a learner and will continue to do so until 3 has
+ // stepped up as leader, replicates the conf change to 1, and 1 applies it.
+ // Ultimately, by receiving a request to vote, the learner realizes that
+ // the candidate believes it to be a voter, and that it should act
+ // accordingly. The candidate's config may be stale, too; but in that case
+ // it won't win the election, at least in the absence of the bug discussed
+ // in:
+ // https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
// When responding to Msg{Pre,}Vote messages we include the term
@@ -973,10 +1036,36 @@
for i := range m.Entries {
e := &m.Entries[i]
+ var cc pb.ConfChangeI
if e.Type == pb.EntryConfChange {
- if r.pendingConfIndex > r.raftLog.applied {
- r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
- e, r.pendingConfIndex, r.raftLog.applied)
+ var ccc pb.ConfChange
+ if err := ccc.Unmarshal(e.Data); err != nil {
+ panic(err)
+ }
+ cc = ccc
+ } else if e.Type == pb.EntryConfChangeV2 {
+ var ccc pb.ConfChangeV2
+ if err := ccc.Unmarshal(e.Data); err != nil {
+ panic(err)
+ }
+ cc = ccc
+ }
+ if cc != nil {
+ alreadyPending := r.pendingConfIndex > r.raftLog.applied
+ alreadyJoint := len(r.prs.Config.Voters[1]) > 0
+ wantsLeaveJoint := len(cc.AsV2().Changes) == 0
+
+ var refused string
+ if alreadyPending {
+ refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
+ } else if alreadyJoint && !wantsLeaveJoint {
+ refused = "must transition out of joint config first"
+ } else if !alreadyJoint && wantsLeaveJoint {
+ refused = "not in joint state; refusing empty conf change"
+ }
+
+ if refused != "" {
+ r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
@@ -1010,7 +1099,7 @@
case ReadOnlyLeaseBased:
ri := r.raftLog.committed
if m.From == None || m.From == r.id { // from local member
- r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+ r.readStates = append(r.readStates, ReadState{Index: ri, RequestCtx: m.Entries[0].Data})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
@@ -1037,7 +1126,7 @@
pr.RecentActive = true
if m.Reject {
- r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
+ r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
if pr.MaybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
@@ -1053,6 +1142,9 @@
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
+ // TODO(tbg): we should also enter this branch if a snapshot is
+ // received that is below pr.PendingSnapshot but which makes it
+ // possible to use the log again.
r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
@@ -1134,8 +1226,8 @@
pr.BecomeProbe()
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
- // If snapshot finish, wait for the msgAppResp from the remote node before sending
- // out the next msgApp.
+ // If snapshot finish, wait for the MsgAppResp from the remote node before sending
+ // out the next MsgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.ProbeSent = true
case pb.MsgUnreachable:
@@ -1294,7 +1386,7 @@
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
- r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
+ r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
@@ -1339,12 +1431,12 @@
}
// More defense-in-depth: throw away snapshot if recipient is not in the
- // config. This shouuldn't ever happen (at the time of writing) but lots of
+ // config. This shouldn't ever happen (at the time of writing) but lots of
// code here and there assumes that r.id is in the progress tracker.
found := false
cs := s.Metadata.ConfState
for _, set := range [][]uint64{
- cs.Nodes,
+ cs.Voters,
cs.Learners,
} {
for _, id := range set {
@@ -1375,12 +1467,18 @@
// Reset the configuration and add the (potentially updated) peers in anew.
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
- for _, id := range s.Metadata.ConfState.Nodes {
- r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode})
+ cfg, prs, err := confchange.Restore(confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: r.raftLog.lastIndex(),
+ }, cs)
+
+ if err != nil {
+ // This should never happen. Either there's a bug in our config change
+ // handling or the client corrupted the conf change.
+ panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
}
- for _, id := range s.Metadata.ConfState.Learners {
- r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode})
- }
+
+ assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
pr := r.prs.Progress[r.id]
pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
@@ -1397,21 +1495,40 @@
return pr != nil && !pr.IsLearner
}
-func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
- cfg, prs, err := confchange.Changer{
- Tracker: r.prs,
- LastIndex: r.raftLog.lastIndex(),
- }.Simple(cc)
+func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
+ cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {
+ changer := confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: r.raftLog.lastIndex(),
+ }
+ if cc.LeaveJoint() {
+ return changer.LeaveJoint()
+ } else if autoLeave, ok := cc.EnterJoint(); ok {
+ return changer.EnterJoint(autoLeave, cc.Changes...)
+ }
+ return changer.Simple(cc.Changes...)
+ }()
+
if err != nil {
+ // TODO(tbg): return the error to the caller.
panic(err)
}
+
+ return r.switchToConfig(cfg, prs)
+}
+
+// switchToConfig reconfigures this node to use the provided configuration. It
+// updates the in-memory state and, when necessary, carries out additional
+// actions such as reacting to the removal of nodes or changed quorum
+// requirements.
+//
+// The inputs usually result from restoring a ConfState or applying a ConfChange.
+func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState {
r.prs.Config = cfg
r.prs.Progress = prs
r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
- // Now that the configuration is updated, handle any side effects.
-
- cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()}
+ cs := r.prs.ConfState()
pr, ok := r.prs.Progress[r.id]
// Update whether the node itself is a learner, resetting to false when the
@@ -1433,13 +1550,21 @@
// The remaining steps only make sense if this node is the leader and there
// are other nodes.
- if r.state != StateLeader || len(cs.Nodes) == 0 {
+ if r.state != StateLeader || len(cs.Voters) == 0 {
return cs
}
+
if r.maybeCommit() {
- // The quorum size may have been reduced (but not to zero), so see if
- // any pending entries can be committed.
+ // If the configuration change means that more entries are committed now,
+ // broadcast/append to everyone in the updated config.
r.bcastAppend()
+ } else {
+ // Otherwise, still probe the newly added replicas; there's no reason to
+ // let them wait out a heartbeat interval (or the next incoming
+ // proposal).
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+ r.maybeSendAppend(id, false /* sendIfEmpty */)
+ })
}
// If the the leadTransferee was removed, abort the leadership transfer.
if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
diff --git a/vendor/go.etcd.io/etcd/raft/raftpb/confchange.go b/vendor/go.etcd.io/etcd/raft/raftpb/confchange.go
new file mode 100644
index 0000000..46a7a70
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/raftpb/confchange.go
@@ -0,0 +1,170 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raftpb
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+
+ "github.com/gogo/protobuf/proto"
+)
+
+// ConfChangeI abstracts over ConfChangeV2 and (legacy) ConfChange to allow
+// treating them in a unified manner.
+type ConfChangeI interface {
+ AsV2() ConfChangeV2
+ AsV1() (ConfChange, bool)
+}
+
+// MarshalConfChange calls Marshal on the underlying ConfChange or ConfChangeV2
+// and returns the result along with the corresponding EntryType.
+func MarshalConfChange(c ConfChangeI) (EntryType, []byte, error) {
+ var typ EntryType
+ var ccdata []byte
+ var err error
+ if ccv1, ok := c.AsV1(); ok {
+ typ = EntryConfChange
+ ccdata, err = ccv1.Marshal()
+ } else {
+ ccv2 := c.AsV2()
+ typ = EntryConfChangeV2
+ ccdata, err = ccv2.Marshal()
+ }
+ return typ, ccdata, err
+}
+
+// AsV2 returns a V2 configuration change carrying out the same operation.
+func (c ConfChange) AsV2() ConfChangeV2 {
+ return ConfChangeV2{
+ Changes: []ConfChangeSingle{{
+ Type: c.Type,
+ NodeID: c.NodeID,
+ }},
+ Context: c.Context,
+ }
+}
+
+// AsV1 returns the ConfChange and true.
+func (c ConfChange) AsV1() (ConfChange, bool) {
+ return c, true
+}
+
+// AsV2 is the identity.
+func (c ConfChangeV2) AsV2() ConfChangeV2 { return c }
+
+// AsV1 returns ConfChange{} and false.
+func (c ConfChangeV2) AsV1() (ConfChange, bool) { return ConfChange{}, false }
+
+// EnterJoint returns two bools. The second bool is true if and only if this
+// config change will use Joint Consensus, which is the case if it contains more
+// than one change or if the use of Joint Consensus was requested explicitly.
+// The first bool can only be true if second one is, and indicates whether the
+// Joint State will be left automatically.
+func (c *ConfChangeV2) EnterJoint() (autoLeave bool, ok bool) {
+ // NB: in theory, more config changes could qualify for the "simple"
+ // protocol but it depends on the config on top of which the changes apply.
+ // For example, adding two learners is not OK if both nodes are part of the
+ // base config (i.e. two voters are turned into learners in the process of
+ // applying the conf change). In practice, these distinctions should not
+ // matter, so we keep it simple and use Joint Consensus liberally.
+ if c.Transition != ConfChangeTransitionAuto || len(c.Changes) > 1 {
+ // Use Joint Consensus.
+ var autoLeave bool
+ switch c.Transition {
+ case ConfChangeTransitionAuto:
+ autoLeave = true
+ case ConfChangeTransitionJointImplicit:
+ autoLeave = true
+ case ConfChangeTransitionJointExplicit:
+ default:
+ panic(fmt.Sprintf("unknown transition: %+v", c))
+ }
+ return autoLeave, true
+ }
+ return false, false
+}
+
+// LeaveJoint is true if the configuration change leaves a joint configuration.
+// This is the case if the ConfChangeV2 is zero, with the possible exception of
+// the Context field.
+func (c *ConfChangeV2) LeaveJoint() bool {
+ cpy := *c
+ cpy.Context = nil
+ return proto.Equal(&cpy, &ConfChangeV2{})
+}
+
+// ConfChangesFromString parses a space-delimited sequence of operations into a
+// slice of ConfChangeSingle. The supported operations are:
+// - vn: make n a voter,
+// - ln: make n a learner,
+// - rn: remove n, and
+// - un: update n.
+func ConfChangesFromString(s string) ([]ConfChangeSingle, error) {
+ var ccs []ConfChangeSingle
+ toks := strings.Split(strings.TrimSpace(s), " ")
+ if toks[0] == "" {
+ toks = nil
+ }
+ for _, tok := range toks {
+ if len(tok) < 2 {
+ return nil, fmt.Errorf("unknown token %s", tok)
+ }
+ var cc ConfChangeSingle
+ switch tok[0] {
+ case 'v':
+ cc.Type = ConfChangeAddNode
+ case 'l':
+ cc.Type = ConfChangeAddLearnerNode
+ case 'r':
+ cc.Type = ConfChangeRemoveNode
+ case 'u':
+ cc.Type = ConfChangeUpdateNode
+ default:
+ return nil, fmt.Errorf("unknown input: %s", tok)
+ }
+ id, err := strconv.ParseUint(tok[1:], 10, 64)
+ if err != nil {
+ return nil, err
+ }
+ cc.NodeID = id
+ ccs = append(ccs, cc)
+ }
+ return ccs, nil
+}
+
+// ConfChangesToString is the inverse to ConfChangesFromString.
+func ConfChangesToString(ccs []ConfChangeSingle) string {
+ var buf strings.Builder
+ for i, cc := range ccs {
+ if i > 0 {
+ buf.WriteByte(' ')
+ }
+ switch cc.Type {
+ case ConfChangeAddNode:
+ buf.WriteByte('v')
+ case ConfChangeAddLearnerNode:
+ buf.WriteByte('l')
+ case ConfChangeRemoveNode:
+ buf.WriteByte('r')
+ case ConfChangeUpdateNode:
+ buf.WriteByte('u')
+ default:
+ buf.WriteString("unknown")
+ }
+ fmt.Fprintf(&buf, "%d", cc.NodeID)
+ }
+ return buf.String()
+}
diff --git a/vendor/go.etcd.io/etcd/raft/raftpb/confstate.go b/vendor/go.etcd.io/etcd/raft/raftpb/confstate.go
new file mode 100644
index 0000000..4bda932
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/raftpb/confstate.go
@@ -0,0 +1,45 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raftpb
+
+import (
+ "fmt"
+ "reflect"
+ "sort"
+)
+
+// Equivalent returns a nil error if the inputs describe the same configuration.
+// On mismatch, returns a descriptive error showing the differences.
+func (cs ConfState) Equivalent(cs2 ConfState) error {
+ cs1 := cs
+ orig1, orig2 := cs1, cs2
+ s := func(sl *[]uint64) {
+ *sl = append([]uint64(nil), *sl...)
+ sort.Slice(*sl, func(i, j int) bool { return (*sl)[i] < (*sl)[j] })
+ }
+
+ for _, cs := range []*ConfState{&cs1, &cs2} {
+ s(&cs.Voters)
+ s(&cs.Learners)
+ s(&cs.VotersOutgoing)
+ s(&cs.LearnersNext)
+ cs.XXX_unrecognized = nil
+ }
+
+ if !reflect.DeepEqual(cs1, cs2) {
+ return fmt.Errorf("ConfStates not equivalent after sorting:\n%+#v\n%+#v\nInputs were:\n%+#v\n%+#v", cs1, cs2, orig1, orig2)
+ }
+ return nil
+}
diff --git a/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go b/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go
index fd9ee37..fcf259c 100644
--- a/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go
+++ b/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go
@@ -15,6 +15,8 @@
HardState
ConfState
ConfChange
+ ConfChangeSingle
+ ConfChangeV2
*/
package raftpb
@@ -44,17 +46,20 @@
type EntryType int32
const (
- EntryNormal EntryType = 0
- EntryConfChange EntryType = 1
+ EntryNormal EntryType = 0
+ EntryConfChange EntryType = 1
+ EntryConfChangeV2 EntryType = 2
)
var EntryType_name = map[int32]string{
0: "EntryNormal",
1: "EntryConfChange",
+ 2: "EntryConfChangeV2",
}
var EntryType_value = map[string]int32{
- "EntryNormal": 0,
- "EntryConfChange": 1,
+ "EntryNormal": 0,
+ "EntryConfChange": 1,
+ "EntryConfChangeV2": 2,
}
func (x EntryType) Enum() *EntryType {
@@ -160,6 +165,57 @@
}
func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{1} }
+// ConfChangeTransition specifies the behavior of a configuration change with
+// respect to joint consensus.
+type ConfChangeTransition int32
+
+const (
+ // Automatically use the simple protocol if possible, otherwise fall back
+ // to ConfChangeTransitionJointImplicit. Most applications will want to use this.
+ ConfChangeTransitionAuto ConfChangeTransition = 0
+ // Use joint consensus unconditionally, and transition out of it
+ // automatically (by proposing a zero configuration change).
+ //
+ // This option is suitable for applications that want to minimize the time
+ // spent in the joint configuration and do not store the joint configuration
+ // in the state machine (outside of InitialState).
+ ConfChangeTransitionJointImplicit ConfChangeTransition = 1
+ // Use joint consensus and remain in the joint configuration until the
+ // application proposes a no-op configuration change. This is suitable for
+ // applications that want to explicitly control the transitions, for example
+ // to use a custom payload (via the Context field).
+ ConfChangeTransitionJointExplicit ConfChangeTransition = 2
+)
+
+var ConfChangeTransition_name = map[int32]string{
+ 0: "ConfChangeTransitionAuto",
+ 1: "ConfChangeTransitionJointImplicit",
+ 2: "ConfChangeTransitionJointExplicit",
+}
+var ConfChangeTransition_value = map[string]int32{
+ "ConfChangeTransitionAuto": 0,
+ "ConfChangeTransitionJointImplicit": 1,
+ "ConfChangeTransitionJointExplicit": 2,
+}
+
+func (x ConfChangeTransition) Enum() *ConfChangeTransition {
+ p := new(ConfChangeTransition)
+ *p = x
+ return p
+}
+func (x ConfChangeTransition) String() string {
+ return proto.EnumName(ConfChangeTransition_name, int32(x))
+}
+func (x *ConfChangeTransition) UnmarshalJSON(data []byte) error {
+ value, err := proto.UnmarshalJSONEnum(ConfChangeTransition_value, data, "ConfChangeTransition")
+ if err != nil {
+ return err
+ }
+ *x = ConfChangeTransition(value)
+ return nil
+}
+func (ConfChangeTransition) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{2} }
+
type ConfChangeType int32
const (
@@ -198,7 +254,7 @@
*x = ConfChangeType(value)
return nil
}
-func (ConfChangeType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{2} }
+func (ConfChangeType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{3} }
type Entry struct {
Term uint64 `protobuf:"varint,2,opt,name=Term" json:"Term"`
@@ -270,9 +326,21 @@
func (*HardState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{4} }
type ConfState struct {
- Nodes []uint64 `protobuf:"varint,1,rep,name=nodes" json:"nodes,omitempty"`
- Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"`
- XXX_unrecognized []byte `json:"-"`
+ // The voters in the incoming config. (If the configuration is not joint,
+ // then the outgoing config is empty).
+ Voters []uint64 `protobuf:"varint,1,rep,name=voters" json:"voters,omitempty"`
+ // The learners in the incoming config.
+ Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"`
+ // The voters in the outgoing config.
+ VotersOutgoing []uint64 `protobuf:"varint,3,rep,name=voters_outgoing,json=votersOutgoing" json:"voters_outgoing,omitempty"`
+ // The nodes that will become learners when the outgoing config is removed.
+ // These nodes are necessarily currently in voters_outgoing (or they would have
+ // been added to the incoming config right away).
+ LearnersNext []uint64 `protobuf:"varint,4,rep,name=learners_next,json=learnersNext" json:"learners_next,omitempty"`
+ // If set, the config is joint and Raft will automatically transition into
+ // the final config (i.e. remove the outgoing config) when this is safe.
+ AutoLeave bool `protobuf:"varint,5,opt,name=auto_leave,json=autoLeave" json:"auto_leave"`
+ XXX_unrecognized []byte `json:"-"`
}
func (m *ConfState) Reset() { *m = ConfState{} }
@@ -281,11 +349,14 @@
func (*ConfState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{5} }
type ConfChange struct {
- ID uint64 `protobuf:"varint,1,opt,name=ID" json:"ID"`
- Type ConfChangeType `protobuf:"varint,2,opt,name=Type,enum=raftpb.ConfChangeType" json:"Type"`
- NodeID uint64 `protobuf:"varint,3,opt,name=NodeID" json:"NodeID"`
- Context []byte `protobuf:"bytes,4,opt,name=Context" json:"Context,omitempty"`
- XXX_unrecognized []byte `json:"-"`
+ Type ConfChangeType `protobuf:"varint,2,opt,name=type,enum=raftpb.ConfChangeType" json:"type"`
+ NodeID uint64 `protobuf:"varint,3,opt,name=node_id,json=nodeId" json:"node_id"`
+ Context []byte `protobuf:"bytes,4,opt,name=context" json:"context,omitempty"`
+ // NB: this is used only by etcd to thread through a unique identifier.
+ // Ideally it should really use the Context instead. No counterpart to
+ // this field exists in ConfChangeV2.
+ ID uint64 `protobuf:"varint,1,opt,name=id" json:"id"`
+ XXX_unrecognized []byte `json:"-"`
}
func (m *ConfChange) Reset() { *m = ConfChange{} }
@@ -293,6 +364,63 @@
func (*ConfChange) ProtoMessage() {}
func (*ConfChange) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{6} }
+// ConfChangeSingle is an individual configuration change operation. Multiple
+// such operations can be carried out atomically via a ConfChangeV2.
+type ConfChangeSingle struct {
+ Type ConfChangeType `protobuf:"varint,1,opt,name=type,enum=raftpb.ConfChangeType" json:"type"`
+ NodeID uint64 `protobuf:"varint,2,opt,name=node_id,json=nodeId" json:"node_id"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *ConfChangeSingle) Reset() { *m = ConfChangeSingle{} }
+func (m *ConfChangeSingle) String() string { return proto.CompactTextString(m) }
+func (*ConfChangeSingle) ProtoMessage() {}
+func (*ConfChangeSingle) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{7} }
+
+// ConfChangeV2 messages initiate configuration changes. They support both the
+// simple "one at a time" membership change protocol and full Joint Consensus
+// allowing for arbitrary changes in membership.
+//
+// The supplied context is treated as an opaque payload and can be used to
+// attach an action on the state machine to the application of the config change
+// proposal. Note that contrary to Joint Consensus as outlined in the Raft
+// paper[1], configuration changes become active when they are *applied* to the
+// state machine (not when they are appended to the log).
+//
+// The simple protocol can be used whenever only a single change is made.
+//
+// Non-simple changes require the use of Joint Consensus, for which two
+// configuration changes are run. The first configuration change specifies the
+// desired changes and transitions the Raft group into the joint configuration,
+// in which quorum requires a majority of both the pre-changes and post-changes
+// configuration. Joint Consensus avoids entering fragile intermediate
+// configurations that could compromise survivability. For example, without the
+// use of Joint Consensus and running across three availability zones with a
+// replication factor of three, it is not possible to replace a voter without
+// entering an intermediate configuration that does not survive the outage of
+// one availability zone.
+//
+// The provided ConfChangeTransition specifies how (and whether) Joint Consensus
+// is used, and assigns the task of leaving the joint configuration either to
+// Raft or the application. Leaving the joint configuration is accomplished by
+// proposing a ConfChangeV2 in which at most the Context field is
+// populated.
+//
+// For details on Raft membership changes, see:
+//
+// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
+type ConfChangeV2 struct {
+ Transition ConfChangeTransition `protobuf:"varint,1,opt,name=transition,enum=raftpb.ConfChangeTransition" json:"transition"`
+ Changes []ConfChangeSingle `protobuf:"bytes,2,rep,name=changes" json:"changes"`
+ Context []byte `protobuf:"bytes,3,opt,name=context" json:"context,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *ConfChangeV2) Reset() { *m = ConfChangeV2{} }
+func (m *ConfChangeV2) String() string { return proto.CompactTextString(m) }
+func (*ConfChangeV2) ProtoMessage() {}
+func (*ConfChangeV2) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{8} }
+
func init() {
proto.RegisterType((*Entry)(nil), "raftpb.Entry")
proto.RegisterType((*SnapshotMetadata)(nil), "raftpb.SnapshotMetadata")
@@ -301,8 +429,11 @@
proto.RegisterType((*HardState)(nil), "raftpb.HardState")
proto.RegisterType((*ConfState)(nil), "raftpb.ConfState")
proto.RegisterType((*ConfChange)(nil), "raftpb.ConfChange")
+ proto.RegisterType((*ConfChangeSingle)(nil), "raftpb.ConfChangeSingle")
+ proto.RegisterType((*ConfChangeV2)(nil), "raftpb.ConfChangeV2")
proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value)
proto.RegisterEnum("raftpb.MessageType", MessageType_name, MessageType_value)
+ proto.RegisterEnum("raftpb.ConfChangeTransition", ConfChangeTransition_name, ConfChangeTransition_value)
proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value)
}
func (m *Entry) Marshal() (dAtA []byte, err error) {
@@ -535,8 +666,8 @@
_ = i
var l int
_ = l
- if len(m.Nodes) > 0 {
- for _, num := range m.Nodes {
+ if len(m.Voters) > 0 {
+ for _, num := range m.Voters {
dAtA[i] = 0x8
i++
i = encodeVarintRaft(dAtA, i, uint64(num))
@@ -549,6 +680,28 @@
i = encodeVarintRaft(dAtA, i, uint64(num))
}
}
+ if len(m.VotersOutgoing) > 0 {
+ for _, num := range m.VotersOutgoing {
+ dAtA[i] = 0x18
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(num))
+ }
+ }
+ if len(m.LearnersNext) > 0 {
+ for _, num := range m.LearnersNext {
+ dAtA[i] = 0x20
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(num))
+ }
+ }
+ dAtA[i] = 0x28
+ i++
+ if m.AutoLeave {
+ dAtA[i] = 1
+ } else {
+ dAtA[i] = 0
+ }
+ i++
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
@@ -591,6 +744,75 @@
return i, nil
}
+func (m *ConfChangeSingle) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *ConfChangeSingle) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ dAtA[i] = 0x8
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Type))
+ dAtA[i] = 0x10
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.NodeID))
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func (m *ConfChangeV2) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *ConfChangeV2) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ dAtA[i] = 0x8
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Transition))
+ if len(m.Changes) > 0 {
+ for _, msg := range m.Changes {
+ dAtA[i] = 0x12
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(msg.Size()))
+ n, err := msg.MarshalTo(dAtA[i:])
+ if err != nil {
+ return 0, err
+ }
+ i += n
+ }
+ }
+ if m.Context != nil {
+ dAtA[i] = 0x1a
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(len(m.Context)))
+ i += copy(dAtA[i:], m.Context)
+ }
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
func encodeVarintRaft(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
@@ -689,8 +911,8 @@
func (m *ConfState) Size() (n int) {
var l int
_ = l
- if len(m.Nodes) > 0 {
- for _, e := range m.Nodes {
+ if len(m.Voters) > 0 {
+ for _, e := range m.Voters {
n += 1 + sovRaft(uint64(e))
}
}
@@ -699,6 +921,17 @@
n += 1 + sovRaft(uint64(e))
}
}
+ if len(m.VotersOutgoing) > 0 {
+ for _, e := range m.VotersOutgoing {
+ n += 1 + sovRaft(uint64(e))
+ }
+ }
+ if len(m.LearnersNext) > 0 {
+ for _, e := range m.LearnersNext {
+ n += 1 + sovRaft(uint64(e))
+ }
+ }
+ n += 2
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@@ -721,6 +954,37 @@
return n
}
+func (m *ConfChangeSingle) Size() (n int) {
+ var l int
+ _ = l
+ n += 1 + sovRaft(uint64(m.Type))
+ n += 1 + sovRaft(uint64(m.NodeID))
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *ConfChangeV2) Size() (n int) {
+ var l int
+ _ = l
+ n += 1 + sovRaft(uint64(m.Transition))
+ if len(m.Changes) > 0 {
+ for _, e := range m.Changes {
+ l = e.Size()
+ n += 1 + l + sovRaft(uint64(l))
+ }
+ }
+ if m.Context != nil {
+ l = len(m.Context)
+ n += 1 + l + sovRaft(uint64(l))
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
func sovRaft(x uint64) (n int) {
for {
n++
@@ -1573,7 +1837,7 @@
break
}
}
- m.Nodes = append(m.Nodes, v)
+ m.Voters = append(m.Voters, v)
} else if wireType == 2 {
var packedLen int
for shift := uint(0); ; shift += 7 {
@@ -1613,10 +1877,10 @@
break
}
}
- m.Nodes = append(m.Nodes, v)
+ m.Voters = append(m.Voters, v)
}
} else {
- return fmt.Errorf("proto: wrong wireType = %d for field Nodes", wireType)
+ return fmt.Errorf("proto: wrong wireType = %d for field Voters", wireType)
}
case 2:
if wireType == 0 {
@@ -1680,6 +1944,150 @@
} else {
return fmt.Errorf("proto: wrong wireType = %d for field Learners", wireType)
}
+ case 3:
+ if wireType == 0 {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.VotersOutgoing = append(m.VotersOutgoing, v)
+ } else if wireType == 2 {
+ var packedLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ packedLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if packedLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + packedLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ for iNdEx < postIndex {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.VotersOutgoing = append(m.VotersOutgoing, v)
+ }
+ } else {
+ return fmt.Errorf("proto: wrong wireType = %d for field VotersOutgoing", wireType)
+ }
+ case 4:
+ if wireType == 0 {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.LearnersNext = append(m.LearnersNext, v)
+ } else if wireType == 2 {
+ var packedLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ packedLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if packedLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + packedLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ for iNdEx < postIndex {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.LearnersNext = append(m.LearnersNext, v)
+ }
+ } else {
+ return fmt.Errorf("proto: wrong wireType = %d for field LearnersNext", wireType)
+ }
+ case 5:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field AutoLeave", wireType)
+ }
+ var v int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.AutoLeave = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skipRaft(dAtA[iNdEx:])
@@ -1841,6 +2249,227 @@
}
return nil
}
+func (m *ConfChangeSingle) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: ConfChangeSingle: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: ConfChangeSingle: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
+ }
+ m.Type = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Type |= (ConfChangeType(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 2:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType)
+ }
+ m.NodeID = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.NodeID |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRaft(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRaft
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *ConfChangeV2) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: ConfChangeV2: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: ConfChangeV2: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Transition", wireType)
+ }
+ m.Transition = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Transition |= (ConfChangeTransition(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 2:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Changes", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + msglen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Changes = append(m.Changes, ConfChangeSingle{})
+ if err := m.Changes[len(m.Changes)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ case 3:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Context", wireType)
+ }
+ var byteLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ byteLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if byteLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + byteLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Context = append(m.Context[:0], dAtA[iNdEx:postIndex]...)
+ if m.Context == nil {
+ m.Context = []byte{}
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRaft(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRaft
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
func skipRaft(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
@@ -1949,56 +2578,69 @@
func init() { proto.RegisterFile("raft.proto", fileDescriptorRaft) }
var fileDescriptorRaft = []byte{
- // 815 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0x23, 0x45,
- 0x10, 0xf6, 0x8c, 0xc7, 0x7f, 0x35, 0x8e, 0xd3, 0xa9, 0x35, 0xa8, 0x15, 0x45, 0xc6, 0xb2, 0x38,
- 0x58, 0x41, 0x1b, 0x20, 0x07, 0x0e, 0x48, 0x1c, 0x36, 0x09, 0x52, 0x22, 0xad, 0xa3, 0xc5, 0x9b,
- 0xe5, 0x80, 0x84, 0x50, 0xc7, 0x53, 0x9e, 0x18, 0x32, 0xd3, 0xa3, 0x9e, 0xf6, 0xb2, 0xb9, 0x20,
- 0x1e, 0x80, 0x07, 0xe0, 0xc2, 0xfb, 0xe4, 0xb8, 0x12, 0x77, 0xc4, 0x86, 0x17, 0x41, 0xdd, 0xd3,
- 0x63, 0xcf, 0x24, 0xb7, 0xae, 0xef, 0xab, 0xae, 0xfa, 0xea, 0xeb, 0x9a, 0x01, 0x50, 0x62, 0xa9,
- 0x8f, 0x32, 0x25, 0xb5, 0xc4, 0xb6, 0x39, 0x67, 0xd7, 0xfb, 0xc3, 0x58, 0xc6, 0xd2, 0x42, 0x9f,
- 0x9b, 0x53, 0xc1, 0x4e, 0x7e, 0x83, 0xd6, 0xb7, 0xa9, 0x56, 0x77, 0xf8, 0x19, 0x04, 0x57, 0x77,
- 0x19, 0x71, 0x6f, 0xec, 0x4d, 0x07, 0xc7, 0x7b, 0x47, 0xc5, 0xad, 0x23, 0x4b, 0x1a, 0xe2, 0x24,
- 0xb8, 0xff, 0xe7, 0x93, 0xc6, 0xdc, 0x26, 0x21, 0x87, 0xe0, 0x8a, 0x54, 0xc2, 0xfd, 0xb1, 0x37,
- 0x0d, 0x36, 0x0c, 0xa9, 0x04, 0xf7, 0xa1, 0x75, 0x91, 0x46, 0xf4, 0x8e, 0x37, 0x2b, 0x54, 0x01,
- 0x21, 0x42, 0x70, 0x26, 0xb4, 0xe0, 0xc1, 0xd8, 0x9b, 0xf6, 0xe7, 0xf6, 0x3c, 0xf9, 0xdd, 0x03,
- 0xf6, 0x3a, 0x15, 0x59, 0x7e, 0x23, 0xf5, 0x8c, 0xb4, 0x88, 0x84, 0x16, 0xf8, 0x15, 0xc0, 0x42,
- 0xa6, 0xcb, 0x9f, 0x72, 0x2d, 0x74, 0xa1, 0x28, 0xdc, 0x2a, 0x3a, 0x95, 0xe9, 0xf2, 0xb5, 0x21,
- 0x5c, 0xf1, 0xde, 0xa2, 0x04, 0x4c, 0xf3, 0x95, 0x6d, 0x5e, 0xd5, 0x55, 0x40, 0x46, 0xb2, 0x36,
- 0x92, 0xab, 0xba, 0x2c, 0x32, 0xf9, 0x01, 0xba, 0xa5, 0x02, 0x23, 0xd1, 0x28, 0xb0, 0x3d, 0xfb,
- 0x73, 0x7b, 0xc6, 0xaf, 0xa1, 0x9b, 0x38, 0x65, 0xb6, 0x70, 0x78, 0xcc, 0x4b, 0x2d, 0x8f, 0x95,
- 0xbb, 0xba, 0x9b, 0xfc, 0xc9, 0x5f, 0x4d, 0xe8, 0xcc, 0x28, 0xcf, 0x45, 0x4c, 0xf8, 0x1c, 0x02,
- 0xbd, 0x75, 0xf8, 0x59, 0x59, 0xc3, 0xd1, 0x55, 0x8f, 0x4d, 0x1a, 0x0e, 0xc1, 0xd7, 0xb2, 0x36,
- 0x89, 0xaf, 0xa5, 0x19, 0x63, 0xa9, 0xe4, 0xa3, 0x31, 0x0c, 0xb2, 0x19, 0x30, 0x78, 0x3c, 0x20,
- 0x8e, 0xa0, 0x73, 0x2b, 0x63, 0xfb, 0x60, 0xad, 0x0a, 0x59, 0x82, 0x5b, 0xdb, 0xda, 0x4f, 0x6d,
- 0x7b, 0x0e, 0x1d, 0x4a, 0xb5, 0x5a, 0x51, 0xce, 0x3b, 0xe3, 0xe6, 0x34, 0x3c, 0xde, 0xa9, 0x6d,
- 0x46, 0x59, 0xca, 0xe5, 0xe0, 0x01, 0xb4, 0x17, 0x32, 0x49, 0x56, 0x9a, 0x77, 0x2b, 0xb5, 0x1c,
- 0x86, 0xc7, 0xd0, 0xcd, 0x9d, 0x63, 0xbc, 0x67, 0x9d, 0x64, 0x8f, 0x9d, 0x2c, 0x1d, 0x2c, 0xf3,
- 0x4c, 0x45, 0x45, 0x3f, 0xd3, 0x42, 0x73, 0x18, 0x7b, 0xd3, 0x6e, 0x59, 0xb1, 0xc0, 0xf0, 0x53,
- 0x80, 0xe2, 0x74, 0xbe, 0x4a, 0x35, 0x0f, 0x2b, 0x3d, 0x2b, 0x38, 0x72, 0xe8, 0x2c, 0x64, 0xaa,
- 0xe9, 0x9d, 0xe6, 0x7d, 0xfb, 0xb0, 0x65, 0x38, 0xf9, 0x11, 0x7a, 0xe7, 0x42, 0x45, 0xc5, 0xfa,
- 0x94, 0x0e, 0x7a, 0x4f, 0x1c, 0xe4, 0x10, 0xbc, 0x95, 0x9a, 0xea, 0xfb, 0x6e, 0x90, 0xca, 0xc0,
- 0xcd, 0xa7, 0x03, 0x4f, 0xbe, 0x81, 0xde, 0x66, 0x5d, 0x71, 0x08, 0xad, 0x54, 0x46, 0x94, 0x73,
- 0x6f, 0xdc, 0x9c, 0x06, 0xf3, 0x22, 0xc0, 0x7d, 0xe8, 0xde, 0x92, 0x50, 0x29, 0xa9, 0x9c, 0xfb,
- 0x96, 0xd8, 0xc4, 0x93, 0x3f, 0x3c, 0x00, 0x73, 0xff, 0xf4, 0x46, 0xa4, 0xb1, 0xdd, 0x88, 0x8b,
- 0xb3, 0x9a, 0x3a, 0xff, 0xe2, 0x0c, 0xbf, 0x70, 0x1f, 0xae, 0x6f, 0xd7, 0xea, 0xe3, 0xea, 0x67,
- 0x52, 0xdc, 0x7b, 0xf2, 0xf5, 0x1e, 0x40, 0xfb, 0x52, 0x46, 0x74, 0x71, 0x56, 0xd7, 0x5c, 0x60,
- 0xc6, 0xac, 0x53, 0x67, 0x56, 0xf1, 0xa1, 0x96, 0xe1, 0xe1, 0x97, 0xd0, 0xdb, 0xfc, 0x0e, 0x70,
- 0x17, 0x42, 0x1b, 0x5c, 0x4a, 0x95, 0x88, 0x5b, 0xd6, 0xc0, 0x67, 0xb0, 0x6b, 0x81, 0x6d, 0x63,
- 0xe6, 0x1d, 0xfe, 0xed, 0x43, 0x58, 0x59, 0x70, 0x04, 0x68, 0xcf, 0xf2, 0xf8, 0x7c, 0x9d, 0xb1,
- 0x06, 0x86, 0xd0, 0x99, 0xe5, 0xf1, 0x09, 0x09, 0xcd, 0x3c, 0x17, 0xbc, 0x52, 0x32, 0x63, 0xbe,
- 0xcb, 0x7a, 0x91, 0x65, 0xac, 0x89, 0x03, 0x80, 0xe2, 0x3c, 0xa7, 0x3c, 0x63, 0x81, 0x4b, 0xfc,
- 0x5e, 0x6a, 0x62, 0x2d, 0x23, 0xc2, 0x05, 0x96, 0x6d, 0x3b, 0xd6, 0x2c, 0x13, 0xeb, 0x20, 0x83,
- 0xbe, 0x69, 0x46, 0x42, 0xe9, 0x6b, 0xd3, 0xa5, 0x8b, 0x43, 0x60, 0x55, 0xc4, 0x5e, 0xea, 0x21,
- 0xc2, 0x60, 0x96, 0xc7, 0x6f, 0x52, 0x45, 0x62, 0x71, 0x23, 0xae, 0x6f, 0x89, 0x01, 0xee, 0xc1,
- 0x8e, 0x2b, 0x64, 0x1e, 0x6f, 0x9d, 0xb3, 0xd0, 0xa5, 0x9d, 0xde, 0xd0, 0xe2, 0x97, 0xef, 0xd6,
- 0x52, 0xad, 0x13, 0xd6, 0xc7, 0x8f, 0x60, 0x6f, 0x96, 0xc7, 0x57, 0x4a, 0xa4, 0xf9, 0x92, 0xd4,
- 0x4b, 0x12, 0x11, 0x29, 0xb6, 0xe3, 0x6e, 0x5f, 0xad, 0x12, 0x92, 0x6b, 0x7d, 0x29, 0x7f, 0x65,
- 0x03, 0x27, 0x66, 0x4e, 0x22, 0xb2, 0x3f, 0x43, 0xb6, 0xeb, 0xc4, 0x6c, 0x10, 0x2b, 0x86, 0xb9,
- 0x79, 0x5f, 0x29, 0xb2, 0x23, 0xee, 0xb9, 0xae, 0x2e, 0xb6, 0x39, 0x78, 0x78, 0x07, 0x83, 0xfa,
- 0xf3, 0x1a, 0x1d, 0x5b, 0xe4, 0x45, 0x14, 0x99, 0xb7, 0x64, 0x0d, 0xe4, 0x30, 0xdc, 0xc2, 0x73,
- 0x4a, 0xe4, 0x5b, 0xb2, 0x8c, 0x57, 0x67, 0xde, 0x64, 0x91, 0xd0, 0x05, 0xe3, 0xe3, 0x01, 0xf0,
- 0x5a, 0xa9, 0x97, 0xc5, 0x36, 0x5a, 0xb6, 0x79, 0xc2, 0xef, 0x3f, 0x8c, 0x1a, 0xef, 0x3f, 0x8c,
- 0x1a, 0xf7, 0x0f, 0x23, 0xef, 0xfd, 0xc3, 0xc8, 0xfb, 0xf7, 0x61, 0xe4, 0xfd, 0xf9, 0xdf, 0xa8,
- 0xf1, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x86, 0x52, 0x5b, 0xe0, 0x74, 0x06, 0x00, 0x00,
+ // 1009 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xcd, 0x6e, 0xe3, 0x36,
+ 0x17, 0xb5, 0x64, 0xc5, 0x3f, 0xd7, 0x8e, 0xc3, 0xdc, 0xc9, 0x37, 0x20, 0x82, 0xc0, 0xe3, 0xcf,
+ 0xd3, 0x62, 0x8c, 0x14, 0x93, 0x16, 0x5e, 0x14, 0x45, 0x77, 0xf9, 0x19, 0x20, 0x29, 0xe2, 0x74,
+ 0xea, 0x64, 0xb2, 0x28, 0x50, 0x04, 0x8c, 0x45, 0x2b, 0x6a, 0x2d, 0x51, 0xa0, 0xe8, 0x34, 0xd9,
+ 0x14, 0x45, 0x9f, 0xa2, 0x9b, 0xd9, 0xf6, 0x01, 0xfa, 0x14, 0x59, 0x0e, 0xd0, 0xfd, 0xa0, 0x93,
+ 0xbe, 0x48, 0x41, 0x8a, 0xb2, 0x65, 0x27, 0x98, 0x45, 0x77, 0xe4, 0x39, 0x87, 0xf7, 0x9e, 0x7b,
+ 0x79, 0x45, 0x01, 0x48, 0x36, 0x56, 0x3b, 0x89, 0x14, 0x4a, 0x60, 0x45, 0xaf, 0x93, 0xcb, 0xcd,
+ 0x8d, 0x40, 0x04, 0xc2, 0x40, 0x9f, 0xeb, 0x55, 0xc6, 0x76, 0x7f, 0x81, 0x95, 0x57, 0xb1, 0x92,
+ 0xb7, 0xf8, 0x19, 0x78, 0x67, 0xb7, 0x09, 0xa7, 0x4e, 0xc7, 0xe9, 0xb5, 0xfa, 0xeb, 0x3b, 0xd9,
+ 0xa9, 0x1d, 0x43, 0x6a, 0x62, 0xcf, 0xbb, 0x7b, 0xff, 0xac, 0x34, 0x34, 0x22, 0xa4, 0xe0, 0x9d,
+ 0x71, 0x19, 0x51, 0xb7, 0xe3, 0xf4, 0xbc, 0x19, 0xc3, 0x65, 0x84, 0x9b, 0xb0, 0x72, 0x14, 0xfb,
+ 0xfc, 0x86, 0x96, 0x0b, 0x54, 0x06, 0x21, 0x82, 0x77, 0xc0, 0x14, 0xa3, 0x5e, 0xc7, 0xe9, 0x35,
+ 0x87, 0x66, 0xdd, 0xfd, 0xd5, 0x01, 0x72, 0x1a, 0xb3, 0x24, 0xbd, 0x12, 0x6a, 0xc0, 0x15, 0xf3,
+ 0x99, 0x62, 0xf8, 0x25, 0xc0, 0x48, 0xc4, 0xe3, 0x8b, 0x54, 0x31, 0x95, 0x39, 0x6a, 0xcc, 0x1d,
+ 0xed, 0x8b, 0x78, 0x7c, 0xaa, 0x09, 0x1b, 0xbc, 0x3e, 0xca, 0x01, 0x9d, 0x3c, 0x34, 0xc9, 0x8b,
+ 0xbe, 0x32, 0x48, 0x5b, 0x56, 0xda, 0x72, 0xd1, 0x97, 0x41, 0xba, 0xdf, 0x43, 0x2d, 0x77, 0xa0,
+ 0x2d, 0x6a, 0x07, 0x26, 0x67, 0x73, 0x68, 0xd6, 0xf8, 0x35, 0xd4, 0x22, 0xeb, 0xcc, 0x04, 0x6e,
+ 0xf4, 0x69, 0xee, 0x65, 0xd9, 0xb9, 0x8d, 0x3b, 0xd3, 0x77, 0xdf, 0x96, 0xa1, 0x3a, 0xe0, 0x69,
+ 0xca, 0x02, 0x8e, 0x2f, 0xc1, 0x53, 0xf3, 0x0e, 0x3f, 0xc9, 0x63, 0x58, 0xba, 0xd8, 0x63, 0x2d,
+ 0xc3, 0x0d, 0x70, 0x95, 0x58, 0xa8, 0xc4, 0x55, 0x42, 0x97, 0x31, 0x96, 0x62, 0xa9, 0x0c, 0x8d,
+ 0xcc, 0x0a, 0xf4, 0x96, 0x0b, 0xc4, 0x36, 0x54, 0x27, 0x22, 0x30, 0x17, 0xb6, 0x52, 0x20, 0x73,
+ 0x70, 0xde, 0xb6, 0xca, 0xc3, 0xb6, 0xbd, 0x84, 0x2a, 0x8f, 0x95, 0x0c, 0x79, 0x4a, 0xab, 0x9d,
+ 0x72, 0xaf, 0xd1, 0x5f, 0x5d, 0x98, 0x8c, 0x3c, 0x94, 0xd5, 0xe0, 0x16, 0x54, 0x46, 0x22, 0x8a,
+ 0x42, 0x45, 0x6b, 0x85, 0x58, 0x16, 0xc3, 0x3e, 0xd4, 0x52, 0xdb, 0x31, 0x5a, 0x37, 0x9d, 0x24,
+ 0xcb, 0x9d, 0xcc, 0x3b, 0x98, 0xeb, 0x74, 0x44, 0xc9, 0x7f, 0xe4, 0x23, 0x45, 0xa1, 0xe3, 0xf4,
+ 0x6a, 0x79, 0xc4, 0x0c, 0xc3, 0x4f, 0x00, 0xb2, 0xd5, 0x61, 0x18, 0x2b, 0xda, 0x28, 0xe4, 0x2c,
+ 0xe0, 0x48, 0xa1, 0x3a, 0x12, 0xb1, 0xe2, 0x37, 0x8a, 0x36, 0xcd, 0xc5, 0xe6, 0xdb, 0xee, 0x0f,
+ 0x50, 0x3f, 0x64, 0xd2, 0xcf, 0xc6, 0x27, 0xef, 0xa0, 0xf3, 0xa0, 0x83, 0x14, 0xbc, 0x6b, 0xa1,
+ 0xf8, 0xe2, 0xbc, 0x6b, 0xa4, 0x50, 0x70, 0xf9, 0x61, 0xc1, 0xdd, 0x3f, 0x1d, 0xa8, 0xcf, 0xe6,
+ 0x15, 0x9f, 0x42, 0x45, 0x9f, 0x91, 0x29, 0x75, 0x3a, 0xe5, 0x9e, 0x37, 0xb4, 0x3b, 0xdc, 0x84,
+ 0xda, 0x84, 0x33, 0x19, 0x6b, 0xc6, 0x35, 0xcc, 0x6c, 0x8f, 0x2f, 0x60, 0x2d, 0x53, 0x5d, 0x88,
+ 0xa9, 0x0a, 0x44, 0x18, 0x07, 0xb4, 0x6c, 0x24, 0xad, 0x0c, 0xfe, 0xd6, 0xa2, 0xf8, 0x1c, 0x56,
+ 0xf3, 0x43, 0x17, 0xb1, 0xae, 0xd4, 0x33, 0xb2, 0x66, 0x0e, 0x9e, 0xf0, 0x1b, 0x85, 0xcf, 0x01,
+ 0xd8, 0x54, 0x89, 0x8b, 0x09, 0x67, 0xd7, 0xdc, 0x0c, 0x43, 0xde, 0xd0, 0xba, 0xc6, 0x8f, 0x35,
+ 0xdc, 0x7d, 0xeb, 0x00, 0x68, 0xd3, 0xfb, 0x57, 0x2c, 0x0e, 0xf4, 0x47, 0xe5, 0x86, 0xbe, 0xed,
+ 0x09, 0x68, 0xed, 0xfd, 0xfb, 0x67, 0xee, 0xd1, 0xc1, 0xd0, 0x0d, 0x7d, 0xfc, 0xc2, 0x8e, 0xb4,
+ 0x6b, 0x46, 0xfa, 0x69, 0xf1, 0x13, 0xcd, 0x4e, 0x3f, 0x98, 0xea, 0x17, 0x50, 0x8d, 0x85, 0xcf,
+ 0x2f, 0x42, 0xdf, 0x36, 0xac, 0x65, 0x43, 0x56, 0x4e, 0x84, 0xcf, 0x8f, 0x0e, 0x86, 0x15, 0x4d,
+ 0x1f, 0xf9, 0xc5, 0x3b, 0xf3, 0x16, 0xef, 0x2c, 0x02, 0x32, 0x4f, 0x70, 0x1a, 0xc6, 0xc1, 0x84,
+ 0xcf, 0x8c, 0x38, 0xff, 0xc5, 0x88, 0xfb, 0x31, 0x23, 0xdd, 0x3f, 0x1c, 0x68, 0xce, 0xe3, 0x9c,
+ 0xf7, 0x71, 0x0f, 0x40, 0x49, 0x16, 0xa7, 0xa1, 0x0a, 0x45, 0x6c, 0x33, 0x6e, 0x3d, 0x92, 0x71,
+ 0xa6, 0xc9, 0x27, 0x72, 0x7e, 0x0a, 0xbf, 0x82, 0xea, 0xc8, 0xa8, 0xb2, 0x1b, 0x2f, 0x3c, 0x29,
+ 0xcb, 0xa5, 0xe5, 0x5f, 0x98, 0x95, 0x17, 0xfb, 0x52, 0x5e, 0xe8, 0xcb, 0xf6, 0x21, 0xd4, 0x67,
+ 0xaf, 0x35, 0xae, 0x41, 0xc3, 0x6c, 0x4e, 0x84, 0x8c, 0xd8, 0x84, 0x94, 0xf0, 0x09, 0xac, 0x19,
+ 0x60, 0x1e, 0x9f, 0x38, 0xf8, 0x3f, 0x58, 0x5f, 0x02, 0xcf, 0xfb, 0xc4, 0xdd, 0xfe, 0xcb, 0x85,
+ 0x46, 0xe1, 0x59, 0x42, 0x80, 0xca, 0x20, 0x0d, 0x0e, 0xa7, 0x09, 0x29, 0x61, 0x03, 0xaa, 0x83,
+ 0x34, 0xd8, 0xe3, 0x4c, 0x11, 0xc7, 0x6e, 0x5e, 0x4b, 0x91, 0x10, 0xd7, 0xaa, 0x76, 0x93, 0x84,
+ 0x94, 0xb1, 0x05, 0x90, 0xad, 0x87, 0x3c, 0x4d, 0x88, 0x67, 0x85, 0xe7, 0x42, 0x71, 0xb2, 0xa2,
+ 0xbd, 0xd9, 0x8d, 0x61, 0x2b, 0x96, 0xd5, 0x4f, 0x00, 0xa9, 0x22, 0x81, 0xa6, 0x4e, 0xc6, 0x99,
+ 0x54, 0x97, 0x3a, 0x4b, 0x0d, 0x37, 0x80, 0x14, 0x11, 0x73, 0xa8, 0x8e, 0x08, 0xad, 0x41, 0x1a,
+ 0xbc, 0x89, 0x25, 0x67, 0xa3, 0x2b, 0x76, 0x39, 0xe1, 0x04, 0x70, 0x1d, 0x56, 0x6d, 0x20, 0xfd,
+ 0xc5, 0x4d, 0x53, 0xd2, 0xb0, 0xb2, 0xfd, 0x2b, 0x3e, 0xfa, 0xe9, 0xbb, 0xa9, 0x90, 0xd3, 0x88,
+ 0x34, 0x75, 0xd9, 0x83, 0x34, 0x30, 0x17, 0x34, 0xe6, 0xf2, 0x98, 0x33, 0x9f, 0x4b, 0xb2, 0x6a,
+ 0x4f, 0x9f, 0x85, 0x11, 0x17, 0x53, 0x75, 0x22, 0x7e, 0x26, 0x2d, 0x6b, 0x66, 0xc8, 0x99, 0x6f,
+ 0x7e, 0x61, 0x64, 0xcd, 0x9a, 0x99, 0x21, 0xc6, 0x0c, 0xb1, 0xf5, 0xbe, 0x96, 0xdc, 0x94, 0xb8,
+ 0x6e, 0xb3, 0xda, 0xbd, 0xd1, 0xe0, 0xf6, 0x6f, 0x0e, 0x6c, 0x3c, 0x36, 0x1e, 0xb8, 0x05, 0xf4,
+ 0x31, 0x7c, 0x77, 0xaa, 0x04, 0x29, 0xe1, 0xa7, 0xf0, 0xff, 0xc7, 0xd8, 0x6f, 0x44, 0x18, 0xab,
+ 0xa3, 0x28, 0x99, 0x84, 0xa3, 0x50, 0x5f, 0xc5, 0xc7, 0x64, 0xaf, 0x6e, 0xac, 0xcc, 0xdd, 0xbe,
+ 0x85, 0xd6, 0xe2, 0x47, 0xa1, 0x9b, 0x31, 0x47, 0x76, 0x7d, 0x5f, 0x8f, 0x3f, 0x29, 0x21, 0x2d,
+ 0x9a, 0x1d, 0xf2, 0x48, 0x5c, 0x73, 0xc3, 0x38, 0x8b, 0xcc, 0x9b, 0xc4, 0x67, 0x2a, 0x63, 0xdc,
+ 0xc5, 0x42, 0x76, 0x7d, 0xff, 0x38, 0x7b, 0x7b, 0x0c, 0x5b, 0xde, 0xa3, 0x77, 0x1f, 0xda, 0xa5,
+ 0x77, 0x1f, 0xda, 0xa5, 0xbb, 0xfb, 0xb6, 0xf3, 0xee, 0xbe, 0xed, 0xfc, 0x7d, 0xdf, 0x76, 0x7e,
+ 0xff, 0xa7, 0x5d, 0xfa, 0x37, 0x00, 0x00, 0xff, 0xff, 0x87, 0x11, 0x6d, 0xd6, 0xaf, 0x08, 0x00,
+ 0x00,
}
diff --git a/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto b/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto
index 644ce7b..23d62ec 100644
--- a/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto
+++ b/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto
@@ -10,8 +10,9 @@
option (gogoproto.goproto_enum_prefix_all) = false;
enum EntryType {
- EntryNormal = 0;
- EntryConfChange = 1;
+ EntryNormal = 0;
+ EntryConfChange = 1; // corresponds to pb.ConfChange
+ EntryConfChangeV2 = 2; // corresponds to pb.ConfChangeV2
}
message Entry {
@@ -75,9 +76,41 @@
optional uint64 commit = 3 [(gogoproto.nullable) = false];
}
+// ConfChangeTransition specifies the behavior of a configuration change with
+// respect to joint consensus.
+enum ConfChangeTransition {
+ // Automatically use the simple protocol if possible, otherwise fall back
+ // to ConfChangeTransitionJointImplicit. Most applications will want to use this.
+ ConfChangeTransitionAuto = 0;
+ // Use joint consensus unconditionally, and transition out of it
+ // automatically (by proposing a zero configuration change).
+ //
+ // This option is suitable for applications that want to minimize the time
+ // spent in the joint configuration and do not store the joint configuration
+ // in the state machine (outside of InitialState).
+ ConfChangeTransitionJointImplicit = 1;
+ // Use joint consensus and remain in the joint configuration until the
+ // application proposes a no-op configuration change. This is suitable for
+ // applications that want to explicitly control the transitions, for example
+ // to use a custom payload (via the Context field).
+ ConfChangeTransitionJointExplicit = 2;
+}
+
message ConfState {
- repeated uint64 nodes = 1;
- repeated uint64 learners = 2;
+ // The voters in the incoming config. (If the configuration is not joint,
+ // then the outgoing config is empty).
+ repeated uint64 voters = 1;
+ // The learners in the incoming config.
+ repeated uint64 learners = 2;
+ // The voters in the outgoing config.
+ repeated uint64 voters_outgoing = 3;
+ // The nodes that will become learners when the outgoing config is removed.
+ // These nodes are necessarily currently in voters_outgoing (or they would have
+ // been added to the incoming config right away).
+ repeated uint64 learners_next = 4;
+ // If set, the config is joint and Raft will automatically transition into
+ // the final config (i.e. remove the outgoing config) when this is safe.
+ optional bool auto_leave = 5 [(gogoproto.nullable) = false];
}
enum ConfChangeType {
@@ -88,8 +121,57 @@
}
message ConfChange {
- optional uint64 ID = 1 [(gogoproto.nullable) = false];
- optional ConfChangeType Type = 2 [(gogoproto.nullable) = false];
- optional uint64 NodeID = 3 [(gogoproto.nullable) = false];
- optional bytes Context = 4;
+ optional ConfChangeType type = 2 [(gogoproto.nullable) = false];
+ optional uint64 node_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "NodeID" ];
+ optional bytes context = 4;
+
+ // NB: this is used only by etcd to thread through a unique identifier.
+ // Ideally it should really use the Context instead. No counterpart to
+ // this field exists in ConfChangeV2.
+ optional uint64 id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "ID" ];
+}
+
+// ConfChangeSingle is an individual configuration change operation. Multiple
+// such operations can be carried out atomically via a ConfChangeV2.
+message ConfChangeSingle {
+ optional ConfChangeType type = 1 [(gogoproto.nullable) = false];
+ optional uint64 node_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "NodeID"];
+}
+
+// ConfChangeV2 messages initiate configuration changes. They support both the
+// simple "one at a time" membership change protocol and full Joint Consensus
+// allowing for arbitrary changes in membership.
+//
+// The supplied context is treated as an opaque payload and can be used to
+// attach an action on the state machine to the application of the config change
+// proposal. Note that contrary to Joint Consensus as outlined in the Raft
+// paper[1], configuration changes become active when they are *applied* to the
+// state machine (not when they are appended to the log).
+//
+// The simple protocol can be used whenever only a single change is made.
+//
+// Non-simple changes require the use of Joint Consensus, for which two
+// configuration changes are run. The first configuration change specifies the
+// desired changes and transitions the Raft group into the joint configuration,
+// in which quorum requires a majority of both the pre-changes and post-changes
+// configuration. Joint Consensus avoids entering fragile intermediate
+// configurations that could compromise survivability. For example, without the
+// use of Joint Consensus and running across three availability zones with a
+// replication factor of three, it is not possible to replace a voter without
+// entering an intermediate configuration that does not survive the outage of
+// one availability zone.
+//
+// The provided ConfChangeTransition specifies how (and whether) Joint Consensus
+// is used, and assigns the task of leaving the joint configuration either to
+// Raft or the application. Leaving the joint configuration is accomplished by
+// proposing a ConfChangeV2 that contains no changes and, optionally, a
+// populated Context field.
+//
+// For details on Raft membership changes, see:
+//
+// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
+message ConfChangeV2 {
+ optional ConfChangeTransition transition = 1 [(gogoproto.nullable) = false];
+ repeated ConfChangeSingle changes = 2 [(gogoproto.nullable) = false];
+ optional bytes context = 3;
}
diff --git a/vendor/go.etcd.io/etcd/raft/rawnode.go b/vendor/go.etcd.io/etcd/raft/rawnode.go
index 77183b7..90eb694 100644
--- a/vendor/go.etcd.io/etcd/raft/rawnode.go
+++ b/vendor/go.etcd.io/etcd/raft/rawnode.go
@@ -37,82 +37,20 @@
prevHardSt pb.HardState
}
-func (rn *RawNode) newReady() Ready {
- return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
-}
-
-func (rn *RawNode) commitReady(rd Ready) {
- if rd.SoftState != nil {
- rn.prevSoftSt = rd.SoftState
- }
- if !IsEmptyHardState(rd.HardState) {
- rn.prevHardSt = rd.HardState
- }
-
- // If entries were applied (or a snapshot), update our cursor for
- // the next Ready. Note that if the current HardState contains a
- // new Commit index, this does not mean that we're also applying
- // all of the new entries due to commit pagination by size.
- if index := rd.appliedCursor(); index > 0 {
- rn.raft.raftLog.appliedTo(index)
- }
-
- if len(rd.Entries) > 0 {
- e := rd.Entries[len(rd.Entries)-1]
- rn.raft.raftLog.stableTo(e.Index, e.Term)
- }
- if !IsEmptySnap(rd.Snapshot) {
- rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
- }
- if len(rd.ReadStates) != 0 {
- rn.raft.readStates = nil
- }
-}
-
-// NewRawNode returns a new RawNode given configuration and a list of raft peers.
-func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
- if config.ID == 0 {
- panic("config.ID must not be zero")
- }
+// NewRawNode instantiates a RawNode from the given configuration.
+//
+// See Bootstrap() for bootstrapping an initial state; this replaces the former
+// 'peers' argument to this method (with identical behavior). However, it is
+// recommended that instead of calling Bootstrap, applications bootstrap their
+// state manually by setting up a Storage that has a first index > 1 and which
+// stores the desired ConfState as its InitialState.
+func NewRawNode(config *Config) (*RawNode, error) {
r := newRaft(config)
rn := &RawNode{
raft: r,
}
- lastIndex, err := config.Storage.LastIndex()
- if err != nil {
- panic(err) // TODO(bdarnell)
- }
- // If the log is empty, this is a new RawNode (like StartNode); otherwise it's
- // restoring an existing RawNode (like RestartNode).
- // TODO(bdarnell): rethink RawNode initialization and whether the application needs
- // to be able to tell us when it expects the RawNode to exist.
- if lastIndex == 0 {
- r.becomeFollower(1, None)
- ents := make([]pb.Entry, len(peers))
- for i, peer := range peers {
- cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
- data, err := cc.Marshal()
- if err != nil {
- panic("unexpected marshal error")
- }
-
- ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
- }
- r.raftLog.append(ents...)
- r.raftLog.committed = uint64(len(ents))
- for _, peer := range peers {
- r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
- }
- }
-
- // Set the initial hard and soft states after performing all initialization.
rn.prevSoftSt = r.softState()
- if lastIndex == 0 {
- rn.prevHardSt = emptyState
- } else {
- rn.prevHardSt = r.hardState()
- }
-
+ rn.prevHardSt = r.hardState()
return rn, nil
}
@@ -150,23 +88,19 @@
}})
}
-// ProposeConfChange proposes a config change.
-func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
- data, err := cc.Marshal()
+// ProposeConfChange proposes a config change. See (Node).ProposeConfChange for
+// details.
+func (rn *RawNode) ProposeConfChange(cc pb.ConfChangeI) error {
+ m, err := confChangeToMsg(cc)
if err != nil {
return err
}
- return rn.raft.Step(pb.Message{
- Type: pb.MsgProp,
- Entries: []pb.Entry{
- {Type: pb.EntryConfChange, Data: data},
- },
- })
+ return rn.raft.Step(m)
}
// ApplyConfChange applies a config change to the local node.
-func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
- cs := rn.raft.applyConfChange(cc)
+func (rn *RawNode) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
+ cs := rn.raft.applyConfChange(cc.AsV2())
return &cs
}
@@ -182,14 +116,35 @@
return ErrStepPeerNotFound
}
-// Ready returns the current point-in-time state of this RawNode.
+// Ready returns the outstanding work that the application needs to handle. This
+// includes appending and applying entries or a snapshot, updating the HardState,
+// and sending messages. The returned Ready() *must* be handled and subsequently
+// passed back via Advance().
func (rn *RawNode) Ready() Ready {
- rd := rn.newReady()
- rn.raft.msgs = nil
- rn.raft.reduceUncommittedSize(rd.CommittedEntries)
+ rd := rn.readyWithoutAccept()
+ rn.acceptReady(rd)
return rd
}
+// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there
+// is no obligation that the Ready must be handled.
+func (rn *RawNode) readyWithoutAccept() Ready {
+ return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
+}
+
+// acceptReady is called when the consumer of the RawNode has decided to go
+// ahead and handle a Ready. Nothing must alter the state of the RawNode between
+// this call and the prior call to Ready().
+func (rn *RawNode) acceptReady(rd Ready) {
+ if rd.SoftState != nil {
+ rn.prevSoftSt = rd.SoftState
+ }
+ if len(rd.ReadStates) != 0 {
+ rn.raft.readStates = nil
+ }
+ rn.raft.msgs = nil
+}
+
// HasReady called when RawNode user need to check if any Ready pending.
// Checking logic in this method should be consistent with Ready.containsUpdates().
func (rn *RawNode) HasReady() bool {
@@ -215,21 +170,23 @@
// Advance notifies the RawNode that the application has applied and saved progress in the
// last Ready results.
func (rn *RawNode) Advance(rd Ready) {
- rn.commitReady(rd)
+ if !IsEmptyHardState(rd.HardState) {
+ rn.prevHardSt = rd.HardState
+ }
+ rn.raft.advance(rd)
}
-// Status returns the current status of the given group.
-func (rn *RawNode) Status() *Status {
+// Status returns the current status of the given group. This allocates, see
+// BasicStatus and WithProgress for allocation-friendlier choices.
+func (rn *RawNode) Status() Status {
status := getStatus(rn.raft)
- return &status
+ return status
}
-// StatusWithoutProgress returns a Status without populating the Progress field
-// (and returns the Status as a value to avoid forcing it onto the heap). This
-// is more performant if the Progress is not required. See WithProgress for an
-// allocation-free way to introspect the Progress.
-func (rn *RawNode) StatusWithoutProgress() Status {
- return getStatusWithoutProgress(rn.raft)
+// BasicStatus returns a BasicStatus. Notably this does not contain the
+// Progress map; see WithProgress for an allocation-free way to inspect it.
+func (rn *RawNode) BasicStatus() BasicStatus {
+ return getBasicStatus(rn.raft)
}
// ProgressType indicates the type of replica a Progress corresponds to.
diff --git a/vendor/go.etcd.io/etcd/raft/status.go b/vendor/go.etcd.io/etcd/raft/status.go
index bf4898c..adc6048 100644
--- a/vendor/go.etcd.io/etcd/raft/status.go
+++ b/vendor/go.etcd.io/etcd/raft/status.go
@@ -21,14 +21,22 @@
"go.etcd.io/etcd/raft/tracker"
)
+// Status contains information about this Raft peer and its view of the system.
+// The Progress is only populated on the leader.
type Status struct {
+ BasicStatus
+ Config tracker.Config
+ Progress map[uint64]tracker.Progress
+}
+
+// BasicStatus contains basic information about the Raft peer. It does not allocate.
+type BasicStatus struct {
ID uint64
pb.HardState
SoftState
- Applied uint64
- Progress map[uint64]tracker.Progress
+ Applied uint64
LeadTransferee uint64
}
@@ -37,19 +45,17 @@
m := make(map[uint64]tracker.Progress)
r.prs.Visit(func(id uint64, pr *tracker.Progress) {
var p tracker.Progress
- p, pr = *pr, nil /* avoid accidental reuse below */
-
- // The inflight buffer is tricky to copy and besides, it isn't exposed
- // to the client, so pretend it's nil.
- p.Inflights = nil
+ p = *pr
+ p.Inflights = pr.Inflights.Clone()
+ pr = nil
m[id] = p
})
return m
}
-func getStatusWithoutProgress(r *raft) Status {
- s := Status{
+func getBasicStatus(r *raft) BasicStatus {
+ s := BasicStatus{
ID: r.id,
LeadTransferee: r.leadTransferee,
}
@@ -61,10 +67,12 @@
// getStatus gets a copy of the current raft status.
func getStatus(r *raft) Status {
- s := getStatusWithoutProgress(r)
+ var s Status
+ s.BasicStatus = getBasicStatus(r)
if s.RaftState == StateLeader {
s.Progress = getProgressCopy(r)
}
+ s.Config = r.prs.Config.Clone()
return s
}
diff --git a/vendor/go.etcd.io/etcd/raft/storage.go b/vendor/go.etcd.io/etcd/raft/storage.go
index 14ad686..6be5745 100644
--- a/vendor/go.etcd.io/etcd/raft/storage.go
+++ b/vendor/go.etcd.io/etcd/raft/storage.go
@@ -44,6 +44,8 @@
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
+ // TODO(tbg): split this into two interfaces, LogStorage and StateStorage.
+
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
diff --git a/vendor/go.etcd.io/etcd/raft/tracker/inflights.go b/vendor/go.etcd.io/etcd/raft/tracker/inflights.go
index 9e209e2..1a05634 100644
--- a/vendor/go.etcd.io/etcd/raft/tracker/inflights.go
+++ b/vendor/go.etcd.io/etcd/raft/tracker/inflights.go
@@ -40,6 +40,14 @@
}
}
+// Clone returns an *Inflights that is identical to but shares no memory with
+// the receiver.
+func (in *Inflights) Clone() *Inflights {
+ ins := *in
+ ins.buffer = append([]uint64(nil), in.buffer...)
+ return &ins
+}
+
// Add notifies the Inflights that a new message with the given index is being
// dispatched. Full() must be called prior to Add() to verify that there is room
// for one more message, and consecutive calls to add Add() must provide a
diff --git a/vendor/go.etcd.io/etcd/raft/tracker/progress.go b/vendor/go.etcd.io/etcd/raft/tracker/progress.go
index 697277b..62c81f4 100644
--- a/vendor/go.etcd.io/etcd/raft/tracker/progress.go
+++ b/vendor/go.etcd.io/etcd/raft/tracker/progress.go
@@ -52,6 +52,8 @@
// RecentActive is true if the progress is recently active. Receiving any messages
// from the corresponding follower indicates the progress is active.
// RecentActive can be reset to false after an election timeout.
+ //
+ // TODO(tbg): the leader should always have this set to true.
RecentActive bool
// ProbeSent is used while this follower is in StateProbe. When ProbeSent is
diff --git a/vendor/go.etcd.io/etcd/raft/tracker/tracker.go b/vendor/go.etcd.io/etcd/raft/tracker/tracker.go
index a2638f5..a458114 100644
--- a/vendor/go.etcd.io/etcd/raft/tracker/tracker.go
+++ b/vendor/go.etcd.io/etcd/raft/tracker/tracker.go
@@ -20,11 +20,17 @@
"strings"
"go.etcd.io/etcd/raft/quorum"
+ pb "go.etcd.io/etcd/raft/raftpb"
)
// Config reflects the configuration tracked in a ProgressTracker.
type Config struct {
Voters quorum.JointConfig
+ // AutoLeave is true if the configuration is joint and a transition to the
+ // incoming configuration should be carried out automatically by Raft when
+ // this is possible. If false, the configuration will be joint until the
+ // application initiates the transition manually.
+ AutoLeave bool
// Learners is a set of IDs corresponding to the learners active in the
// current configuration.
//
@@ -80,6 +86,9 @@
if c.LearnersNext != nil {
fmt.Fprintf(&buf, " learners_next=%s", quorum.MajorityConfig(c.LearnersNext).String())
}
+ if c.AutoLeave {
+ fmt.Fprintf(&buf, " autoleave")
+ }
return buf.String()
}
@@ -133,6 +142,17 @@
return p
}
+// ConfState returns a ConfState representing the active configuration.
+func (p *ProgressTracker) ConfState() pb.ConfState {
+ return pb.ConfState{
+ Voters: p.Voters[0].Slice(),
+ VotersOutgoing: p.Voters[1].Slice(),
+ Learners: quorum.MajorityConfig(p.Learners).Slice(),
+ LearnersNext: quorum.MajorityConfig(p.LearnersNext).Slice(),
+ AutoLeave: p.AutoLeave,
+ }
+}
+
// IsSingleton returns true if (and only if) there is only one voting member
// (i.e. the leader) in the current configuration.
func (p *ProgressTracker) IsSingleton() bool {
@@ -158,10 +178,35 @@
return uint64(p.Voters.CommittedIndex(matchAckIndexer(p.Progress)))
}
-// Visit invokes the supplied closure for all tracked progresses.
+func insertionSort(sl []uint64) {
+ a, b := 0, len(sl)
+ for i := a + 1; i < b; i++ {
+ for j := i; j > a && sl[j] < sl[j-1]; j-- {
+ sl[j], sl[j-1] = sl[j-1], sl[j]
+ }
+ }
+}
+
+// Visit invokes the supplied closure for all tracked progresses in stable order.
func (p *ProgressTracker) Visit(f func(id uint64, pr *Progress)) {
- for id, pr := range p.Progress {
- f(id, pr)
+ n := len(p.Progress)
+ // We need to sort the IDs and don't want to allocate since this is hot code.
+ // The optimization here mirrors that in `(MajorityConfig).CommittedIndex`,
+ // see there for details.
+ var sl [7]uint64
+ ids := sl[:]
+ if len(sl) >= n {
+ ids = sl[:n]
+ } else {
+ ids = make([]uint64, n)
+ }
+ for id := range p.Progress {
+ n--
+ ids[n] = id
+ }
+ insertionSort(ids)
+ for _, id := range ids {
+ f(id, p.Progress[id])
}
}
@@ -192,6 +237,9 @@
// LearnerNodes returns a sorted slice of learners.
func (p *ProgressTracker) LearnerNodes() []uint64 {
+ if len(p.Learners) == 0 {
+ return nil
+ }
nodes := make([]uint64, 0, len(p.Learners))
for id := range p.Learners {
nodes = append(nodes, id)
diff --git a/vendor/go.etcd.io/etcd/raft/util.go b/vendor/go.etcd.io/etcd/raft/util.go
index c145d26..785cf73 100644
--- a/vendor/go.etcd.io/etcd/raft/util.go
+++ b/vendor/go.etcd.io/etcd/raft/util.go
@@ -17,6 +17,7 @@
import (
"bytes"
"fmt"
+ "strings"
pb "go.etcd.io/etcd/raft/raftpb"
)
@@ -25,13 +26,6 @@
return []byte(fmt.Sprintf("%q", st.String())), nil
}
-// uint64Slice implements sort interface
-type uint64Slice []uint64
-
-func (p uint64Slice) Len() int { return len(p) }
-func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
-func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
-
func min(a, b uint64) uint64 {
if a > b {
return b
@@ -67,6 +61,69 @@
}
}
+func DescribeHardState(hs pb.HardState) string {
+ var buf strings.Builder
+ fmt.Fprintf(&buf, "Term:%d", hs.Term)
+ if hs.Vote != 0 {
+ fmt.Fprintf(&buf, " Vote:%d", hs.Vote)
+ }
+ fmt.Fprintf(&buf, " Commit:%d", hs.Commit)
+ return buf.String()
+}
+
+func DescribeSoftState(ss SoftState) string {
+ return fmt.Sprintf("Lead:%d State:%s", ss.Lead, ss.RaftState)
+}
+
+func DescribeConfState(state pb.ConfState) string {
+ return fmt.Sprintf(
+ "Voters:%v VotersOutgoing:%v Learners:%v LearnersNext:%v AutoLeave:%v",
+ state.Voters, state.VotersOutgoing, state.Learners, state.LearnersNext, state.AutoLeave,
+ )
+}
+
+func DescribeSnapshot(snap pb.Snapshot) string {
+ m := snap.Metadata
+ return fmt.Sprintf("Index:%d Term:%d ConfState:%s", m.Index, m.Term, DescribeConfState(m.ConfState))
+}
+
+func DescribeReady(rd Ready, f EntryFormatter) string {
+ var buf strings.Builder
+ if rd.SoftState != nil {
+ fmt.Fprint(&buf, DescribeSoftState(*rd.SoftState))
+ buf.WriteByte('\n')
+ }
+ if !IsEmptyHardState(rd.HardState) {
+ fmt.Fprintf(&buf, "HardState %s", DescribeHardState(rd.HardState))
+ buf.WriteByte('\n')
+ }
+ if len(rd.ReadStates) > 0 {
+ fmt.Fprintf(&buf, "ReadStates %v\n", rd.ReadStates)
+ }
+ if len(rd.Entries) > 0 {
+ buf.WriteString("Entries:\n")
+ fmt.Fprint(&buf, DescribeEntries(rd.Entries, f))
+ }
+ if !IsEmptySnap(rd.Snapshot) {
+ fmt.Fprintf(&buf, "Snapshot %s\n", DescribeSnapshot(rd.Snapshot))
+ }
+ if len(rd.CommittedEntries) > 0 {
+ buf.WriteString("CommittedEntries:\n")
+ fmt.Fprint(&buf, DescribeEntries(rd.CommittedEntries, f))
+ }
+ if len(rd.Messages) > 0 {
+ buf.WriteString("Messages:\n")
+ for _, msg := range rd.Messages {
+ fmt.Fprint(&buf, DescribeMessage(msg, f))
+ buf.WriteByte('\n')
+ }
+ }
+ if buf.Len() > 0 {
+ return fmt.Sprintf("Ready MustSync=%t:\n%s", rd.MustSync, buf.String())
+ }
+ return "<empty Ready>"
+}
+
// EntryFormatter can be implemented by the application to provide human-readable formatting
// of entry data. Nil is a valid EntryFormatter and will use a default format.
type EntryFormatter func([]byte) string
@@ -93,7 +150,7 @@
fmt.Fprintf(&buf, "]")
}
if !IsEmptySnap(m.Snapshot) {
- fmt.Fprintf(&buf, " Snapshot:%v", m.Snapshot)
+ fmt.Fprintf(&buf, " Snapshot: %s", DescribeSnapshot(m.Snapshot))
}
return buf.String()
}
@@ -107,13 +164,39 @@
// DescribeEntry returns a concise human-readable description of an
// Entry for debugging.
func DescribeEntry(e pb.Entry, f EntryFormatter) string {
- var formatted string
- if e.Type == pb.EntryNormal && f != nil {
- formatted = f(e.Data)
- } else {
- formatted = fmt.Sprintf("%q", e.Data)
+ if f == nil {
+ f = func(data []byte) string { return fmt.Sprintf("%q", data) }
}
- return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted)
+
+ formatConfChange := func(cc pb.ConfChangeI) string {
+ // TODO(tbg): give the EntryFormatter a type argument so that it gets
+ // a chance to expose the Context.
+ return pb.ConfChangesToString(cc.AsV2().Changes)
+ }
+
+ var formatted string
+ switch e.Type {
+ case pb.EntryNormal:
+ formatted = f(e.Data)
+ case pb.EntryConfChange:
+ var cc pb.ConfChange
+ if err := cc.Unmarshal(e.Data); err != nil {
+ formatted = err.Error()
+ } else {
+ formatted = formatConfChange(cc)
+ }
+ case pb.EntryConfChangeV2:
+ var cc pb.ConfChangeV2
+ if err := cc.Unmarshal(e.Data); err != nil {
+ formatted = err.Error()
+ } else {
+ formatted = formatConfChange(cc)
+ }
+ }
+ if formatted != "" {
+ formatted = " " + formatted
+ }
+ return fmt.Sprintf("%d/%d %s%s", e.Term, e.Index, e.Type, formatted)
}
// DescribeEntries calls DescribeEntry for each Entry, adding a newline to
@@ -140,3 +223,11 @@
}
return ents[:limit]
}
+
+func assertConfStatesEquivalent(l Logger, cs1, cs2 pb.ConfState) {
+ err := cs1.Equivalent(cs2)
+ if err == nil {
+ return
+ }
+ l.Panic(err)
+}