VOL-2112 move to voltha-lib-go
Change-Id: Ic1af08003c1d2c698c0cce371e64f47b47b8d875
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go b/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
index 3c44e70..d02a7ee 100644
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
@@ -12,24 +12,45 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Package balancer implements the etcd clientv3 client-side gRPC balancer.
package balancer
import (
- "fmt"
"strconv"
"sync"
"time"
+ "go.etcd.io/etcd/clientv3/balancer/connectivity"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
- "google.golang.org/grpc/connectivity"
+ grpcconnectivity "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)
+// Config defines balancer configurations.
+type Config struct {
+ // Policy configures balancer policy.
+ Policy picker.Policy
+
+ // Picker implements gRPC picker.
+ // Leave empty if "Policy" field is not custom.
+ // TODO: currently custom policy is not supported.
+ // Picker picker.Picker
+
+ // Name defines an additional name for balancer.
+ // Useful for balancer testing to avoid register conflicts.
+ // If empty, defaults to policy name.
+ Name string
+
+ // Logger configures balancer logging.
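+ // If nil, logs are discarded. NOTE(review): this change removes the zap.NewNop() fallback where baseBalancer is constructed; confirm a nil Logger is still handled elsewhere.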
+ Logger *zap.Logger
+}
+
// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
@@ -54,24 +75,18 @@
bb := &baseBalancer{
id: strconv.FormatInt(time.Now().UnixNano(), 36),
policy: b.cfg.Policy,
- name: b.cfg.Policy.String(),
+ name: b.cfg.Name,
lg: b.cfg.Logger,
addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
- scToSt: make(map[balancer.SubConn]connectivity.State),
+ scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
- currentConn: nil,
- csEvltr: &connectivityStateEvaluator{},
+ currentConn: nil,
+ connectivityRecorder: connectivity.New(b.cfg.Logger),
// initialize picker always returns "ErrNoSubConnAvailable"
- Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
- }
- if b.cfg.Name != "" {
- bb.name = b.cfg.Name
- }
- if bb.lg == nil {
- bb.lg = zap.NewNop()
+ picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
// TODO: support multiple connections
@@ -115,13 +130,12 @@
addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
- scToSt map[balancer.SubConn]connectivity.State
+ scToSt map[balancer.SubConn]grpcconnectivity.State
- currentConn balancer.ClientConn
- currentState connectivity.State
- csEvltr *connectivityStateEvaluator
+ currentConn balancer.ClientConn
+ connectivityRecorder connectivity.Recorder
- picker.Picker
+ picker picker.Picker
}
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
@@ -131,7 +145,11 @@
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
}
- bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
+ bb.lg.Info("resolved",
+ zap.String("picker", bb.picker.String()),
+ zap.String("balancer-id", bb.id),
+ zap.Strings("addresses", addrsToStrings(addrs)),
+ )
bb.mu.Lock()
defer bb.mu.Unlock()
@@ -142,12 +160,13 @@
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
- bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
+ bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
continue
}
+ bb.lg.Info("created subconn", zap.String("address", addr.Addr))
bb.addrToSc[addr] = sc
bb.scToAddr[sc] = addr
- bb.scToSt[sc] = connectivity.Idle
+ bb.scToSt[sc] = grpcconnectivity.Idle
sc.Connect()
}
}
@@ -160,6 +179,7 @@
bb.lg.Info(
"removed subconn",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
@@ -174,7 +194,7 @@
}
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
-func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
bb.mu.Lock()
defer bb.mu.Unlock()
@@ -182,8 +202,10 @@
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("subconn", scToString(sc)),
+ zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("state", s.String()),
)
return
@@ -191,9 +213,11 @@
bb.lg.Info(
"state changed",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
- zap.Bool("connected", s == connectivity.Ready),
+ zap.Bool("connected", s == grpcconnectivity.Ready),
zap.String("subconn", scToString(sc)),
+ zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("address", bb.scToAddr[sc].Addr),
zap.String("old-state", old.String()),
zap.String("new-state", s.String()),
@@ -201,69 +225,63 @@
bb.scToSt[sc] = s
switch s {
- case connectivity.Idle:
+ case grpcconnectivity.Idle:
sc.Connect()
- case connectivity.Shutdown:
+ case grpcconnectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scToSt. Remove state for this sc here.
delete(bb.scToAddr, sc)
delete(bb.scToSt, sc)
}
- oldAggrState := bb.currentState
- bb.currentState = bb.csEvltr.recordTransition(old, s)
+ oldAggrState := bb.connectivityRecorder.GetCurrentState()
+ bb.connectivityRecorder.RecordTransition(old, s)
- // Regenerate picker when one of the following happens:
+ // Update balancer picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
- if (s == connectivity.Ready) != (old == connectivity.Ready) ||
- (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
- bb.regeneratePicker()
+ if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
+ (bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
+ bb.updatePicker()
}
- bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
- return
+ bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
}
-func (bb *baseBalancer) regeneratePicker() {
- if bb.currentState == connectivity.TransientFailure {
+func (bb *baseBalancer) updatePicker() {
+ if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
+ bb.picker = picker.NewErr(balancer.ErrTransientFailure)
bb.lg.Info(
- "generated transient error picker",
+ "updated picker to transient error picker",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
)
- bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
return
}
// only pass ready subconns to picker
- scs := make([]balancer.SubConn, 0)
- addrToSc := make(map[resolver.Address]balancer.SubConn)
scToAddr := make(map[balancer.SubConn]resolver.Address)
for addr, sc := range bb.addrToSc {
- if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
- scs = append(scs, sc)
- addrToSc[addr] = sc
+ if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
scToAddr[sc] = addr
}
}
- switch bb.policy {
- case picker.RoundrobinBalanced:
- bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
-
- default:
- panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
- }
-
+ bb.picker = picker.New(picker.Config{
+ Policy: bb.policy,
+ Logger: bb.lg,
+ SubConnToResolverAddress: scToAddr,
+ })
bb.lg.Info(
- "generated picker",
+ "updated picker",
+ zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
- zap.Strings("subconn-ready", scsToStrings(addrToSc)),
- zap.Int("subconn-size", len(addrToSc)),
+ zap.Strings("subconn-ready", scsToStrings(scToAddr)),
+ zap.Int("subconn-size", len(scToAddr)),
)
}