VOL-2112 move to voltha-lib-go
Change-Id: I3435b8acb982deeab6b6ac28e798d7722ad01d0a
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go b/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
index 3c44e70..d02a7ee 100644
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
@@ -12,24 +12,45 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Package balancer implements client balancer.
package balancer
import (
- "fmt"
"strconv"
"sync"
"time"
+ "go.etcd.io/etcd/clientv3/balancer/connectivity"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
- "google.golang.org/grpc/connectivity"
+ grpcconnectivity "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)
+// Config defines balancer configurations.
+type Config struct {
+ // Policy configures balancer policy.
+ Policy picker.Policy
+
+ // Picker implements gRPC picker.
+ // Leave empty if "Policy" field is not custom.
+ // TODO: currently custom policy is not supported.
+ // Picker picker.Picker
+
+ // Name defines an additional name for balancer.
+ // Useful for balancer testing to avoid register conflicts.
+ // If empty, defaults to policy name.
+ Name string
+
+ // Logger configures balancer logging.
+ // If nil, logs are discarded.
+ Logger *zap.Logger
+}
+
// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
@@ -54,24 +75,18 @@
bb := &baseBalancer{
id: strconv.FormatInt(time.Now().UnixNano(), 36),
policy: b.cfg.Policy,
- name: b.cfg.Policy.String(),
+ name: b.cfg.Name,
lg: b.cfg.Logger,
addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
- scToSt: make(map[balancer.SubConn]connectivity.State),
+ scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
- currentConn: nil,
- csEvltr: &connectivityStateEvaluator{},
+ currentConn: nil,
+ connectivityRecorder: connectivity.New(b.cfg.Logger),
// initialize picker always returns "ErrNoSubConnAvailable"
- Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
- }
- if b.cfg.Name != "" {
- bb.name = b.cfg.Name
- }
- if bb.lg == nil {
- bb.lg = zap.NewNop()
+ picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
// TODO: support multiple connections
@@ -115,13 +130,12 @@
addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
- scToSt map[balancer.SubConn]connectivity.State
+ scToSt map[balancer.SubConn]grpcconnectivity.State
- currentConn balancer.ClientConn
- currentState connectivity.State
- csEvltr *connectivityStateEvaluator
+ currentConn balancer.ClientConn
+ connectivityRecorder connectivity.Recorder
- picker.Picker
+ picker picker.Picker
}
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
@@ -131,7 +145,11 @@
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
}
- bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
+ bb.lg.Info("resolved",
+ zap.String("picker", bb.picker.String()),
+ zap.String("balancer-id", bb.id),
+ zap.Strings("addresses", addrsToStrings(addrs)),
+ )
bb.mu.Lock()
defer bb.mu.Unlock()
@@ -142,12 +160,13 @@
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
- bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
+ bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
continue
}
+ bb.lg.Info("created subconn", zap.String("address", addr.Addr))
bb.addrToSc[addr] = sc
bb.scToAddr[sc] = addr
- bb.scToSt[sc] = connectivity.Idle
+ bb.scToSt[sc] = grpcconnectivity.Idle
sc.Connect()
}
}
@@ -160,6 +179,7 @@
bb.lg.Info(
"removed subconn",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
@@ -174,7 +194,7 @@
}
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
-func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
bb.mu.Lock()
defer bb.mu.Unlock()
@@ -182,8 +202,10 @@
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("subconn", scToString(sc)),
+ zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("state", s.String()),
)
return
@@ -191,9 +213,11 @@
bb.lg.Info(
"state changed",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
- zap.Bool("connected", s == connectivity.Ready),
+ zap.Bool("connected", s == grpcconnectivity.Ready),
zap.String("subconn", scToString(sc)),
+ zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("address", bb.scToAddr[sc].Addr),
zap.String("old-state", old.String()),
zap.String("new-state", s.String()),
@@ -201,69 +225,63 @@
bb.scToSt[sc] = s
switch s {
- case connectivity.Idle:
+ case grpcconnectivity.Idle:
sc.Connect()
- case connectivity.Shutdown:
+ case grpcconnectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scToSt. Remove state for this sc here.
delete(bb.scToAddr, sc)
delete(bb.scToSt, sc)
}
- oldAggrState := bb.currentState
- bb.currentState = bb.csEvltr.recordTransition(old, s)
+ oldAggrState := bb.connectivityRecorder.GetCurrentState()
+ bb.connectivityRecorder.RecordTransition(old, s)
- // Regenerate picker when one of the following happens:
+ // Update balancer picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
- if (s == connectivity.Ready) != (old == connectivity.Ready) ||
- (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
- bb.regeneratePicker()
+ if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
+ (bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
+ bb.updatePicker()
}
- bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
- return
+ bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
}
-func (bb *baseBalancer) regeneratePicker() {
- if bb.currentState == connectivity.TransientFailure {
+func (bb *baseBalancer) updatePicker() {
+ if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
+ bb.picker = picker.NewErr(balancer.ErrTransientFailure)
bb.lg.Info(
- "generated transient error picker",
+ "updated picker to transient error picker",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
)
- bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
return
}
// only pass ready subconns to picker
- scs := make([]balancer.SubConn, 0)
- addrToSc := make(map[resolver.Address]balancer.SubConn)
scToAddr := make(map[balancer.SubConn]resolver.Address)
for addr, sc := range bb.addrToSc {
- if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
- scs = append(scs, sc)
- addrToSc[addr] = sc
+ if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
scToAddr[sc] = addr
}
}
- switch bb.policy {
- case picker.RoundrobinBalanced:
- bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
-
- default:
- panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
- }
-
+ bb.picker = picker.New(picker.Config{
+ Policy: bb.policy,
+ Logger: bb.lg,
+ SubConnToResolverAddress: scToAddr,
+ })
bb.lg.Info(
- "generated picker",
+ "updated picker",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
- zap.Strings("subconn-ready", scsToStrings(addrToSc)),
- zap.Int("subconn-size", len(addrToSc)),
+ zap.Strings("subconn-ready", scsToStrings(scToAddr)),
+ zap.Int("subconn-size", len(scToAddr)),
)
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/config.go b/vendor/go.etcd.io/etcd/clientv3/balancer/config.go
deleted file mode 100644
index 0339a84..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/config.go
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package balancer
-
-import (
- "go.etcd.io/etcd/clientv3/balancer/picker"
-
- "go.uber.org/zap"
-)
-
-// Config defines balancer configurations.
-type Config struct {
- // Policy configures balancer policy.
- Policy picker.Policy
-
- // Name defines an additional name for balancer.
- // Useful for balancer testing to avoid register conflicts.
- // If empty, defaults to policy name.
- Name string
-
- // Logger configures balancer logging.
- // If nil, logs are discarded.
- Logger *zap.Logger
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity.go b/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity.go
deleted file mode 100644
index 6cdeb3f..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity.go
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package balancer
-
-import "google.golang.org/grpc/connectivity"
-
-// connectivityStateEvaluator gets updated by addrConns when their
-// states transition, based on which it evaluates the state of
-// ClientConn.
-type connectivityStateEvaluator struct {
- numReady uint64 // Number of addrConns in ready state.
- numConnecting uint64 // Number of addrConns in connecting state.
- numTransientFailure uint64 // Number of addrConns in transientFailure.
-}
-
-// recordTransition records state change happening in every subConn and based on
-// that it evaluates what aggregated state should be.
-// It can only transition between Ready, Connecting and TransientFailure. Other states,
-// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
-// before any subConn is created ClientConn is in idle state. In the end when ClientConn
-// closes it is in Shutdown state.
-//
-// recordTransition should only be called synchronously from the same goroutine.
-func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
- // Update counters.
- for idx, state := range []connectivity.State{oldState, newState} {
- updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
- switch state {
- case connectivity.Ready:
- cse.numReady += updateVal
- case connectivity.Connecting:
- cse.numConnecting += updateVal
- case connectivity.TransientFailure:
- cse.numTransientFailure += updateVal
- }
- }
-
- // Evaluate.
- if cse.numReady > 0 {
- return connectivity.Ready
- }
- if cse.numConnecting > 0 {
- return connectivity.Connecting
- }
- return connectivity.TransientFailure
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity/connectivity.go b/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity/connectivity.go
new file mode 100644
index 0000000..4c4ad36
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity/connectivity.go
@@ -0,0 +1,93 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package connectivity implements client connectivity operations.
+package connectivity
+
+import (
+ "sync"
+
+ "go.uber.org/zap"
+ "google.golang.org/grpc/connectivity"
+)
+
+// Recorder records gRPC connectivity.
+type Recorder interface {
+ GetCurrentState() connectivity.State
+ RecordTransition(oldState, newState connectivity.State)
+}
+
+// New returns a new Recorder.
+func New(lg *zap.Logger) Recorder {
+ return &recorder{lg: lg}
+}
+
+// recorder takes the connectivity states of multiple SubConns
+// and returns one aggregated connectivity state.
+// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
+type recorder struct {
+ lg *zap.Logger
+
+ mu sync.RWMutex
+
+ cur connectivity.State
+
+ numReady uint64 // Number of addrConns in ready state.
+ numConnecting uint64 // Number of addrConns in connecting state.
+ numTransientFailure uint64 // Number of addrConns in transientFailure.
+}
+
+func (rc *recorder) GetCurrentState() (state connectivity.State) {
+ rc.mu.RLock()
+ defer rc.mu.RUnlock()
+ return rc.cur
+}
+
+// RecordTransition records state change happening in subConn and based on that
+// it evaluates what aggregated state should be.
+//
+// - If at least one SubConn in Ready, the aggregated state is Ready;
+// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
+// - Else the aggregated state is TransientFailure.
+//
+// Idle and Shutdown are not considered.
+//
+// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
+func (rc *recorder) RecordTransition(oldState, newState connectivity.State) {
+ rc.mu.Lock()
+ defer rc.mu.Unlock()
+
+ for idx, state := range []connectivity.State{oldState, newState} {
+ updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
+ switch state {
+ case connectivity.Ready:
+ rc.numReady += updateVal
+ case connectivity.Connecting:
+ rc.numConnecting += updateVal
+ case connectivity.TransientFailure:
+ rc.numTransientFailure += updateVal
+ default:
+ rc.lg.Warn("connectivity recorder received unknown state", zap.String("connectivity-state", state.String()))
+ }
+ }
+
+ switch { // must be exclusive, no overlap
+ case rc.numReady > 0:
+ rc.cur = connectivity.Ready
+ case rc.numConnecting > 0:
+ rc.cur = connectivity.Connecting
+ default:
+ rc.cur = connectivity.TransientFailure
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/doc.go b/vendor/go.etcd.io/etcd/clientv3/balancer/doc.go
deleted file mode 100644
index 45af5e9..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/doc.go
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package balancer implements client balancer.
-package balancer
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/grpc1.7-health.go b/vendor/go.etcd.io/etcd/clientv3/balancer/grpc1.7-health.go
deleted file mode 100644
index 2153767..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/grpc1.7-health.go
+++ /dev/null
@@ -1,657 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package balancer
-
-import (
- "context"
- "errors"
- "io/ioutil"
- "net/url"
- "strings"
- "sync"
- "time"
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/grpclog"
- healthpb "google.golang.org/grpc/health/grpc_health_v1"
- "google.golang.org/grpc/status"
-)
-
-// TODO: replace with something better
-var lg = grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard)
-
-const (
- minHealthRetryDuration = 3 * time.Second
- unknownService = "unknown service grpc.health.v1.Health"
-)
-
-// ErrNoAddrAvailable is returned by Get() when the balancer does not have
-// any active connection to endpoints at the time.
-// This error is returned only when opts.BlockingWait is true.
-var ErrNoAddrAvailable = status.Error(codes.Unavailable, "there is no address available")
-
-type NotifyMsg int
-
-const (
- NotifyReset NotifyMsg = iota
- NotifyNext
-)
-
-// GRPC17Health does the bare minimum to expose multiple eps
-// to the grpc reconnection code path
-type GRPC17Health struct {
- // addrs are the client's endpoint addresses for grpc
- addrs []grpc.Address
-
- // eps holds the raw endpoints from the client
- eps []string
-
- // notifyCh notifies grpc of the set of addresses for connecting
- notifyCh chan []grpc.Address
-
- // readyc closes once the first connection is up
- readyc chan struct{}
- readyOnce sync.Once
-
- // healthCheck checks an endpoint's health.
- healthCheck func(ep string) (bool, error)
- healthCheckTimeout time.Duration
-
- unhealthyMu sync.RWMutex
- unhealthyHostPorts map[string]time.Time
-
- // mu protects all fields below.
- mu sync.RWMutex
-
- // upc closes when pinAddr transitions from empty to non-empty or the balancer closes.
- upc chan struct{}
-
- // downc closes when grpc calls down() on pinAddr
- downc chan struct{}
-
- // stopc is closed to signal updateNotifyLoop should stop.
- stopc chan struct{}
- stopOnce sync.Once
- wg sync.WaitGroup
-
- // donec closes when all goroutines are exited
- donec chan struct{}
-
- // updateAddrsC notifies updateNotifyLoop to update addrs.
- updateAddrsC chan NotifyMsg
-
- // grpc issues TLS cert checks using the string passed into dial so
- // that string must be the host. To recover the full scheme://host URL,
- // have a map from hosts to the original endpoint.
- hostPort2ep map[string]string
-
- // pinAddr is the currently pinned address; set to the empty string on
- // initialization and shutdown.
- pinAddr string
-
- closed bool
-}
-
-// DialFunc defines gRPC dial function.
-type DialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)
-
-// NewGRPC17Health returns a new health balancer with gRPC v1.7.
-func NewGRPC17Health(
- eps []string,
- timeout time.Duration,
- dialFunc DialFunc,
-) *GRPC17Health {
- notifyCh := make(chan []grpc.Address)
- addrs := eps2addrs(eps)
- hb := &GRPC17Health{
- addrs: addrs,
- eps: eps,
- notifyCh: notifyCh,
- readyc: make(chan struct{}),
- healthCheck: func(ep string) (bool, error) { return grpcHealthCheck(ep, dialFunc) },
- unhealthyHostPorts: make(map[string]time.Time),
- upc: make(chan struct{}),
- stopc: make(chan struct{}),
- downc: make(chan struct{}),
- donec: make(chan struct{}),
- updateAddrsC: make(chan NotifyMsg),
- hostPort2ep: getHostPort2ep(eps),
- }
- if timeout < minHealthRetryDuration {
- timeout = minHealthRetryDuration
- }
- hb.healthCheckTimeout = timeout
-
- close(hb.downc)
- go hb.updateNotifyLoop()
- hb.wg.Add(1)
- go func() {
- defer hb.wg.Done()
- hb.updateUnhealthy()
- }()
- return hb
-}
-
-func (b *GRPC17Health) Start(target string, config grpc.BalancerConfig) error { return nil }
-
-func (b *GRPC17Health) ConnectNotify() <-chan struct{} {
- b.mu.Lock()
- defer b.mu.Unlock()
- return b.upc
-}
-
-func (b *GRPC17Health) UpdateAddrsC() chan NotifyMsg { return b.updateAddrsC }
-func (b *GRPC17Health) StopC() chan struct{} { return b.stopc }
-
-func (b *GRPC17Health) Ready() <-chan struct{} { return b.readyc }
-
-func (b *GRPC17Health) Endpoint(hostPort string) string {
- b.mu.RLock()
- defer b.mu.RUnlock()
- return b.hostPort2ep[hostPort]
-}
-
-func (b *GRPC17Health) Pinned() string {
- b.mu.RLock()
- defer b.mu.RUnlock()
- return b.pinAddr
-}
-
-func (b *GRPC17Health) HostPortError(hostPort string, err error) {
- if b.Endpoint(hostPort) == "" {
- lg.Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error())
- return
- }
-
- b.unhealthyMu.Lock()
- b.unhealthyHostPorts[hostPort] = time.Now()
- b.unhealthyMu.Unlock()
- lg.Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error())
-}
-
-func (b *GRPC17Health) removeUnhealthy(hostPort, msg string) {
- if b.Endpoint(hostPort) == "" {
- lg.Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg)
- return
- }
-
- b.unhealthyMu.Lock()
- delete(b.unhealthyHostPorts, hostPort)
- b.unhealthyMu.Unlock()
- lg.Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg)
-}
-
-func (b *GRPC17Health) countUnhealthy() (count int) {
- b.unhealthyMu.RLock()
- count = len(b.unhealthyHostPorts)
- b.unhealthyMu.RUnlock()
- return count
-}
-
-func (b *GRPC17Health) isUnhealthy(hostPort string) (unhealthy bool) {
- b.unhealthyMu.RLock()
- _, unhealthy = b.unhealthyHostPorts[hostPort]
- b.unhealthyMu.RUnlock()
- return unhealthy
-}
-
-func (b *GRPC17Health) cleanupUnhealthy() {
- b.unhealthyMu.Lock()
- for k, v := range b.unhealthyHostPorts {
- if time.Since(v) > b.healthCheckTimeout {
- delete(b.unhealthyHostPorts, k)
- lg.Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout)
- }
- }
- b.unhealthyMu.Unlock()
-}
-
-func (b *GRPC17Health) liveAddrs() ([]grpc.Address, map[string]struct{}) {
- unhealthyCnt := b.countUnhealthy()
-
- b.mu.RLock()
- defer b.mu.RUnlock()
-
- hbAddrs := b.addrs
- if len(b.addrs) == 1 || unhealthyCnt == 0 || unhealthyCnt == len(b.addrs) {
- liveHostPorts := make(map[string]struct{}, len(b.hostPort2ep))
- for k := range b.hostPort2ep {
- liveHostPorts[k] = struct{}{}
- }
- return hbAddrs, liveHostPorts
- }
-
- addrs := make([]grpc.Address, 0, len(b.addrs)-unhealthyCnt)
- liveHostPorts := make(map[string]struct{}, len(addrs))
- for _, addr := range b.addrs {
- if !b.isUnhealthy(addr.Addr) {
- addrs = append(addrs, addr)
- liveHostPorts[addr.Addr] = struct{}{}
- }
- }
- return addrs, liveHostPorts
-}
-
-func (b *GRPC17Health) updateUnhealthy() {
- for {
- select {
- case <-time.After(b.healthCheckTimeout):
- b.cleanupUnhealthy()
- pinned := b.Pinned()
- if pinned == "" || b.isUnhealthy(pinned) {
- select {
- case b.updateAddrsC <- NotifyNext:
- case <-b.stopc:
- return
- }
- }
- case <-b.stopc:
- return
- }
- }
-}
-
-// NeedUpdate returns true if all connections are down or
-// addresses do not include current pinned address.
-func (b *GRPC17Health) NeedUpdate() bool {
- // updating notifyCh can trigger new connections,
- // need update addrs if all connections are down
- // or addrs does not include pinAddr.
- b.mu.RLock()
- update := !hasAddr(b.addrs, b.pinAddr)
- b.mu.RUnlock()
- return update
-}
-
-func (b *GRPC17Health) UpdateAddrs(eps ...string) {
- np := getHostPort2ep(eps)
-
- b.mu.Lock()
- defer b.mu.Unlock()
-
- match := len(np) == len(b.hostPort2ep)
- if match {
- for k, v := range np {
- if b.hostPort2ep[k] != v {
- match = false
- break
- }
- }
- }
- if match {
- // same endpoints, so no need to update address
- return
- }
-
- b.hostPort2ep = np
- b.addrs, b.eps = eps2addrs(eps), eps
-
- b.unhealthyMu.Lock()
- b.unhealthyHostPorts = make(map[string]time.Time)
- b.unhealthyMu.Unlock()
-}
-
-func (b *GRPC17Health) Next() {
- b.mu.RLock()
- downc := b.downc
- b.mu.RUnlock()
- select {
- case b.updateAddrsC <- NotifyNext:
- case <-b.stopc:
- }
- // wait until disconnect so new RPCs are not issued on old connection
- select {
- case <-downc:
- case <-b.stopc:
- }
-}
-
-func (b *GRPC17Health) updateNotifyLoop() {
- defer close(b.donec)
-
- for {
- b.mu.RLock()
- upc, downc, addr := b.upc, b.downc, b.pinAddr
- b.mu.RUnlock()
- // downc or upc should be closed
- select {
- case <-downc:
- downc = nil
- default:
- }
- select {
- case <-upc:
- upc = nil
- default:
- }
- switch {
- case downc == nil && upc == nil:
- // stale
- select {
- case <-b.stopc:
- return
- default:
- }
- case downc == nil:
- b.notifyAddrs(NotifyReset)
- select {
- case <-upc:
- case msg := <-b.updateAddrsC:
- b.notifyAddrs(msg)
- case <-b.stopc:
- return
- }
- case upc == nil:
- select {
- // close connections that are not the pinned address
- case b.notifyCh <- []grpc.Address{{Addr: addr}}:
- case <-downc:
- case <-b.stopc:
- return
- }
- select {
- case <-downc:
- b.notifyAddrs(NotifyReset)
- case msg := <-b.updateAddrsC:
- b.notifyAddrs(msg)
- case <-b.stopc:
- return
- }
- }
- }
-}
-
-func (b *GRPC17Health) notifyAddrs(msg NotifyMsg) {
- if msg == NotifyNext {
- select {
- case b.notifyCh <- []grpc.Address{}:
- case <-b.stopc:
- return
- }
- }
- b.mu.RLock()
- pinAddr := b.pinAddr
- downc := b.downc
- b.mu.RUnlock()
- addrs, hostPorts := b.liveAddrs()
-
- var waitDown bool
- if pinAddr != "" {
- _, ok := hostPorts[pinAddr]
- waitDown = !ok
- }
-
- select {
- case b.notifyCh <- addrs:
- if waitDown {
- select {
- case <-downc:
- case <-b.stopc:
- }
- }
- case <-b.stopc:
- }
-}
-
-func (b *GRPC17Health) Up(addr grpc.Address) func(error) {
- if !b.mayPin(addr) {
- return func(err error) {}
- }
-
- b.mu.Lock()
- defer b.mu.Unlock()
-
- // gRPC might call Up after it called Close. We add this check
- // to "fix" it up at application layer. Otherwise, will panic
- // if b.upc is already closed.
- if b.closed {
- return func(err error) {}
- }
-
- // gRPC might call Up on a stale address.
- // Prevent updating pinAddr with a stale address.
- if !hasAddr(b.addrs, addr.Addr) {
- return func(err error) {}
- }
-
- if b.pinAddr != "" {
- lg.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
- return func(err error) {}
- }
-
- // notify waiting Get()s and pin first connected address
- close(b.upc)
- b.downc = make(chan struct{})
- b.pinAddr = addr.Addr
- lg.Infof("clientv3/balancer: pin %q", addr.Addr)
-
- // notify client that a connection is up
- b.readyOnce.Do(func() { close(b.readyc) })
-
- return func(err error) {
- // If connected to a black hole endpoint or a killed server, the gRPC ping
- // timeout will induce a network I/O error, and retrying until success;
- // finding healthy endpoint on retry could take several timeouts and redials.
- // To avoid wasting retries, gray-list unhealthy endpoints.
- b.HostPortError(addr.Addr, err)
-
- b.mu.Lock()
- b.upc = make(chan struct{})
- close(b.downc)
- b.pinAddr = ""
- b.mu.Unlock()
- lg.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
- }
-}
-
-func (b *GRPC17Health) mayPin(addr grpc.Address) bool {
- if b.Endpoint(addr.Addr) == "" { // stale host:port
- return false
- }
-
- b.unhealthyMu.RLock()
- unhealthyCnt := len(b.unhealthyHostPorts)
- failedTime, bad := b.unhealthyHostPorts[addr.Addr]
- b.unhealthyMu.RUnlock()
-
- b.mu.RLock()
- skip := len(b.addrs) == 1 || unhealthyCnt == 0 || len(b.addrs) == unhealthyCnt
- b.mu.RUnlock()
- if skip || !bad {
- return true
- }
-
- // prevent isolated member's endpoint from being infinitely retried, as follows:
- // 1. keepalive pings detects GoAway with http2.ErrCodeEnhanceYourCalm
- // 2. balancer 'Up' unpins with grpc: failed with network I/O error
- // 3. grpc-healthcheck still SERVING, thus retry to pin
- // instead, return before grpc-healthcheck if failed within healthcheck timeout
- if elapsed := time.Since(failedTime); elapsed < b.healthCheckTimeout {
- lg.Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout)
- return false
- }
-
- if ok, _ := b.healthCheck(addr.Addr); ok {
- b.removeUnhealthy(addr.Addr, "health check success")
- return true
- }
-
- b.HostPortError(addr.Addr, errors.New("health check failed"))
- return false
-}
-
-func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
- var (
- addr string
- closed bool
- )
-
- // If opts.BlockingWait is false (for fail-fast RPCs), it should return
- // an address it has notified via Notify immediately instead of blocking.
- if !opts.BlockingWait {
- b.mu.RLock()
- closed = b.closed
- addr = b.pinAddr
- b.mu.RUnlock()
- if closed {
- return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
- }
- if addr == "" {
- return grpc.Address{Addr: ""}, nil, ErrNoAddrAvailable
- }
- return grpc.Address{Addr: addr}, func() {}, nil
- }
-
- for {
- b.mu.RLock()
- ch := b.upc
- b.mu.RUnlock()
- select {
- case <-ch:
- case <-b.donec:
- return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
- case <-ctx.Done():
- return grpc.Address{Addr: ""}, nil, ctx.Err()
- }
- b.mu.RLock()
- closed = b.closed
- addr = b.pinAddr
- b.mu.RUnlock()
- // Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed.
- if closed {
- return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
- }
- if addr != "" {
- break
- }
- }
- return grpc.Address{Addr: addr}, func() {}, nil
-}
-
-func (b *GRPC17Health) Notify() <-chan []grpc.Address { return b.notifyCh }
-
-func (b *GRPC17Health) Close() error {
- b.mu.Lock()
- // In case gRPC calls close twice. TODO: remove the checking
- // when we are sure that gRPC wont call close twice.
- if b.closed {
- b.mu.Unlock()
- <-b.donec
- return nil
- }
- b.closed = true
- b.stopOnce.Do(func() { close(b.stopc) })
- b.pinAddr = ""
-
- // In the case of following scenario:
- // 1. upc is not closed; no pinned address
- // 2. client issues an RPC, calling invoke(), which calls Get(), enters for loop, blocks
- // 3. client.conn.Close() calls balancer.Close(); closed = true
- // 4. for loop in Get() never exits since ctx is the context passed in by the client and may not be canceled
- // we must close upc so Get() exits from blocking on upc
- select {
- case <-b.upc:
- default:
- // terminate all waiting Get()s
- close(b.upc)
- }
-
- b.mu.Unlock()
- b.wg.Wait()
-
- // wait for updateNotifyLoop to finish
- <-b.donec
- close(b.notifyCh)
-
- return nil
-}
-
-func grpcHealthCheck(ep string, dialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)) (bool, error) {
- conn, err := dialFunc(ep)
- if err != nil {
- return false, err
- }
- defer conn.Close()
- cli := healthpb.NewHealthClient(conn)
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
- cancel()
- if err != nil {
- if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
- if s.Message() == unknownService { // etcd < v3.3.0
- return true, nil
- }
- }
- return false, err
- }
- return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
-}
-
-func hasAddr(addrs []grpc.Address, targetAddr string) bool {
- for _, addr := range addrs {
- if targetAddr == addr.Addr {
- return true
- }
- }
- return false
-}
-
-func getHost(ep string) string {
- url, uerr := url.Parse(ep)
- if uerr != nil || !strings.Contains(ep, "://") {
- return ep
- }
- return url.Host
-}
-
-func eps2addrs(eps []string) []grpc.Address {
- addrs := make([]grpc.Address, len(eps))
- for i := range eps {
- addrs[i].Addr = getHost(eps[i])
- }
- return addrs
-}
-
-func getHostPort2ep(eps []string) map[string]string {
- hm := make(map[string]string, len(eps))
- for i := range eps {
- _, host, _ := parseEndpoint(eps[i])
- hm[host] = eps[i]
- }
- return hm
-}
-
-func parseEndpoint(endpoint string) (proto string, host string, scheme string) {
- proto = "tcp"
- host = endpoint
- url, uerr := url.Parse(endpoint)
- if uerr != nil || !strings.Contains(endpoint, "://") {
- return proto, host, scheme
- }
- scheme = url.Scheme
-
- // strip scheme:// prefix since grpc dials by host
- host = url.Host
- switch url.Scheme {
- case "http", "https":
- case "unix", "unixs":
- proto = "unix"
- host = url.Host + url.Path
- default:
- proto, host = "", ""
- }
- return proto, host, scheme
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go
index c70ce15..9e04378 100644
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go
@@ -22,13 +22,18 @@
// NewErr returns a picker that always returns err on "Pick".
func NewErr(err error) Picker {
- return &errPicker{err: err}
+ return &errPicker{p: Error, err: err}
}
type errPicker struct {
+ p Policy
err error
}
-func (p *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
- return nil, nil, p.err
+func (ep *errPicker) String() string {
+ return ep.p.String()
+}
+
+func (ep *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
+ return nil, nil, ep.err
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go
index 7ea761b..bd1a5d2 100644
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go
@@ -15,10 +15,77 @@
package picker
import (
+ "fmt"
+
+ "go.uber.org/zap"
"google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/resolver"
)
// Picker defines balancer Picker methods.
type Picker interface {
balancer.Picker
+ String() string
+}
+
+// Config defines picker configuration.
+type Config struct {
+ // Policy specifies etcd clientv3's built in balancer policy.
+ Policy Policy
+
+ // Logger defines picker logging object.
+ Logger *zap.Logger
+
+ // SubConnToResolverAddress maps each gRPC sub-connection to an address.
+ // Basically, it is a list of addresses that the Picker can pick from.
+ SubConnToResolverAddress map[balancer.SubConn]resolver.Address
+}
+
+// Policy defines balancer picker policy.
+type Policy uint8
+
+const (
+ // Error is error picker policy.
+ Error Policy = iota
+
+ // RoundrobinBalanced balances loads over multiple endpoints
+ // and implements failover in roundrobin fashion.
+ RoundrobinBalanced
+
+ // Custom defines custom balancer picker.
+ // TODO: custom picker is not supported yet.
+ Custom
+)
+
+func (p Policy) String() string {
+ switch p {
+ case Error:
+ return "picker-error"
+
+ case RoundrobinBalanced:
+ return "picker-roundrobin-balanced"
+
+ case Custom:
+ panic("'custom' picker policy is not supported yet")
+
+ default:
+ panic(fmt.Errorf("invalid balancer picker policy (%d)", p))
+ }
+}
+
+// New creates a new Picker.
+func New(cfg Config) Picker {
+ switch cfg.Policy {
+ case Error:
+ panic("'error' picker policy is not supported here; use 'picker.NewErr'")
+
+ case RoundrobinBalanced:
+ return newRoundrobinBalanced(cfg)
+
+ case Custom:
+ panic("'custom' picker policy is not supported yet")
+
+ default:
+ panic(fmt.Errorf("invalid balancer picker policy (%d)", cfg.Policy))
+ }
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker_policy.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker_policy.go
deleted file mode 100644
index 463ddc2..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker_policy.go
+++ /dev/null
@@ -1,49 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package picker
-
-import "fmt"
-
-// Policy defines balancer picker policy.
-type Policy uint8
-
-const (
- // TODO: custom picker is not supported yet.
- // custom defines custom balancer picker.
- custom Policy = iota
-
- // RoundrobinBalanced balance loads over multiple endpoints
- // and implements failover in roundrobin fashion.
- RoundrobinBalanced Policy = iota
-
- // TODO: only send loads to pinned address "RoundrobinFailover"
- // just like how 3.3 client works
- //
- // TODO: priotize leader
- // TODO: health-check
- // TODO: weighted roundrobin
- // TODO: power of two random choice
-)
-
-func (p Policy) String() string {
- switch p {
- case custom:
- panic("'custom' picker policy is not supported yet")
- case RoundrobinBalanced:
- return "etcd-client-roundrobin-balanced"
- default:
- panic(fmt.Errorf("invalid balancer picker policy (%d)", p))
- }
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go
index b043d57..1b8b285 100644
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go
@@ -24,32 +24,33 @@
"google.golang.org/grpc/resolver"
)
-// NewRoundrobinBalanced returns a new roundrobin balanced picker.
-func NewRoundrobinBalanced(
- lg *zap.Logger,
- scs []balancer.SubConn,
- addrToSc map[resolver.Address]balancer.SubConn,
- scToAddr map[balancer.SubConn]resolver.Address,
-) Picker {
+// newRoundrobinBalanced returns a new roundrobin balanced picker.
+func newRoundrobinBalanced(cfg Config) Picker {
+ scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress))
+ for sc := range cfg.SubConnToResolverAddress {
+ scs = append(scs, sc)
+ }
return &rrBalanced{
- lg: lg,
+ p: RoundrobinBalanced,
+ lg: cfg.Logger,
scs: scs,
- addrToSc: addrToSc,
- scToAddr: scToAddr,
+ scToAddr: cfg.SubConnToResolverAddress,
}
}
type rrBalanced struct {
+ p Policy
+
lg *zap.Logger
- mu sync.RWMutex
- next int
- scs []balancer.SubConn
-
- addrToSc map[resolver.Address]balancer.SubConn
+ mu sync.RWMutex
+ next int
+ scs []balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
}
+func (rb *rrBalanced) String() string { return rb.p.String() }
+
// Pick is called for every client request.
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
rb.mu.RLock()
@@ -68,6 +69,7 @@
rb.lg.Debug(
"picked",
+ zap.String("picker", rb.p.String()),
zap.String("address", picked),
zap.Int("subconn-index", cur),
zap.Int("subconn-size", n),
@@ -77,6 +79,7 @@
// TODO: error handling?
fss := []zapcore.Field{
zap.Error(info.Err),
+ zap.String("picker", rb.p.String()),
zap.String("address", picked),
zap.Bool("success", info.Err == nil),
zap.Bool("bytes-sent", info.BytesSent),
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go b/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go
index a11faeb..48eb875 100644
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go
@@ -29,9 +29,9 @@
return fmt.Sprintf("%p", sc)
}
-func scsToStrings(scs map[resolver.Address]balancer.SubConn) (ss []string) {
+func scsToStrings(scs map[balancer.SubConn]resolver.Address) (ss []string) {
ss = make([]string, 0, len(scs))
- for a, sc := range scs {
+ for sc, a := range scs {
ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc)))
}
sort.Strings(ss)