[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/coreos/etcd/clientv3/balancer/balancer.go b/vendor/github.com/coreos/etcd/clientv3/balancer/balancer.go
new file mode 100644
index 0000000..9306385
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/clientv3/balancer/balancer.go
@@ -0,0 +1,293 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package balancer implements client balancer.
+package balancer
+
+import (
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/clientv3/balancer/connectivity"
+	"github.com/coreos/etcd/clientv3/balancer/picker"
+
+	"go.uber.org/zap"
+	"google.golang.org/grpc/balancer"
+	grpcconnectivity "google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/resolver"
+	_ "google.golang.org/grpc/resolver/dns"         // register DNS resolver
+	_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
+)
+
+// Config defines balancer configurations.
+type Config struct {
+	// Policy configures balancer policy.
+	Policy picker.Policy
+
+	// Picker implements gRPC picker.
+	// Leave empty if "Policy" field is not custom.
+	// TODO: currently custom policy is not supported.
+	// Picker picker.Picker
+
+	// Name defines an additional name for balancer.
+	// Useful for balancer testing to avoid register conflicts.
+	// If empty, defaults to policy name.
+	Name string
+
+	// Logger configures balancer logging.
+	// If nil, logs are discarded.
+	Logger *zap.Logger
+}
+
+// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
+// must be invoked at initialization time.
+func RegisterBuilder(cfg Config) {
+	bb := &builder{cfg}
+	balancer.Register(bb)
+
+	bb.cfg.Logger.Debug(
+		"registered balancer",
+		zap.String("policy", bb.cfg.Policy.String()),
+		zap.String("name", bb.cfg.Name),
+	)
+}
+
+type builder struct {
+	cfg Config
+}
+
+// Build is called initially when creating "ccBalancerWrapper".
+// "grpc.Dial" is called to this client connection.
+// Then, resolved addresses will be handled via "HandleResolvedAddrs".
+func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
+	bb := &baseBalancer{
+		id:     strconv.FormatInt(time.Now().UnixNano(), 36),
+		policy: b.cfg.Policy,
+		name:   b.cfg.Name,
+		lg:     b.cfg.Logger,
+
+		addrToSc: make(map[resolver.Address]balancer.SubConn),
+		scToAddr: make(map[balancer.SubConn]resolver.Address),
+		scToSt:   make(map[balancer.SubConn]grpcconnectivity.State),
+
+		currentConn:          nil,
+		connectivityRecorder: connectivity.New(b.cfg.Logger),
+
+		// initialize picker always returns "ErrNoSubConnAvailable"
+		picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
+	}
+
+	// TODO: support multiple connections
+	bb.mu.Lock()
+	bb.currentConn = cc
+	bb.mu.Unlock()
+
+	bb.lg.Info(
+		"built balancer",
+		zap.String("balancer-id", bb.id),
+		zap.String("policy", bb.policy.String()),
+		zap.String("resolver-target", cc.Target()),
+	)
+	return bb
+}
+
+// Name implements "grpc/balancer.Builder" interface.
+func (b *builder) Name() string { return b.cfg.Name }
+
+// Balancer defines client balancer interface.
+type Balancer interface {
+	// Balancer is called on specified client connection. Client initiates gRPC
+	// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
+	// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
+	// For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
+	// "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
+	// changes, thus requires failover logic in this method.
+	balancer.Balancer
+
+	// Picker calls "Pick" for every client request.
+	picker.Picker
+}
+
+type baseBalancer struct {
+	id     string
+	policy picker.Policy
+	name   string
+	lg     *zap.Logger
+
+	mu sync.RWMutex
+
+	addrToSc map[resolver.Address]balancer.SubConn
+	scToAddr map[balancer.SubConn]resolver.Address
+	scToSt   map[balancer.SubConn]grpcconnectivity.State
+
+	currentConn          balancer.ClientConn
+	connectivityRecorder connectivity.Recorder
+
+	picker picker.Picker
+}
+
+// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
+// gRPC sends initial or updated resolved addresses from "Build".
+func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
+	if err != nil {
+		bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
+		return
+	}
+	bb.lg.Info("resolved",
+		zap.String("picker", bb.picker.String()),
+		zap.String("balancer-id", bb.id),
+		zap.Strings("addresses", addrsToStrings(addrs)),
+	)
+
+	bb.mu.Lock()
+	defer bb.mu.Unlock()
+
+	resolved := make(map[resolver.Address]struct{})
+	for _, addr := range addrs {
+		resolved[addr] = struct{}{}
+		if _, ok := bb.addrToSc[addr]; !ok {
+			sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
+			if err != nil {
+				bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
+				continue
+			}
+			bb.lg.Info("created subconn", zap.String("address", addr.Addr))
+			bb.addrToSc[addr] = sc
+			bb.scToAddr[sc] = addr
+			bb.scToSt[sc] = grpcconnectivity.Idle
+			sc.Connect()
+		}
+	}
+
+	for addr, sc := range bb.addrToSc {
+		if _, ok := resolved[addr]; !ok {
+			// was removed by resolver or failed to create subconn
+			bb.currentConn.RemoveSubConn(sc)
+			delete(bb.addrToSc, addr)
+
+			bb.lg.Info(
+				"removed subconn",
+				zap.String("picker", bb.picker.String()),
+				zap.String("balancer-id", bb.id),
+				zap.String("address", addr.Addr),
+				zap.String("subconn", scToString(sc)),
+			)
+
+			// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
+			// The entry will be deleted in HandleSubConnStateChange.
+			// (DO NOT) delete(bb.scToAddr, sc)
+			// (DO NOT) delete(bb.scToSt, sc)
+		}
+	}
+}
+
+// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
+func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
+	bb.mu.Lock()
+	defer bb.mu.Unlock()
+
+	old, ok := bb.scToSt[sc]
+	if !ok {
+		bb.lg.Warn(
+			"state change for an unknown subconn",
+			zap.String("picker", bb.picker.String()),
+			zap.String("balancer-id", bb.id),
+			zap.String("subconn", scToString(sc)),
+			zap.Int("subconn-size", len(bb.scToAddr)),
+			zap.String("state", s.String()),
+		)
+		return
+	}
+
+	bb.lg.Info(
+		"state changed",
+		zap.String("picker", bb.picker.String()),
+		zap.String("balancer-id", bb.id),
+		zap.Bool("connected", s == grpcconnectivity.Ready),
+		zap.String("subconn", scToString(sc)),
+		zap.Int("subconn-size", len(bb.scToAddr)),
+		zap.String("address", bb.scToAddr[sc].Addr),
+		zap.String("old-state", old.String()),
+		zap.String("new-state", s.String()),
+	)
+
+	bb.scToSt[sc] = s
+	switch s {
+	case grpcconnectivity.Idle:
+		sc.Connect()
+	case grpcconnectivity.Shutdown:
+		// When an address was removed by resolver, b called RemoveSubConn but
+		// kept the sc's state in scToSt. Remove state for this sc here.
+		delete(bb.scToAddr, sc)
+		delete(bb.scToSt, sc)
+	}
+
+	oldAggrState := bb.connectivityRecorder.GetCurrentState()
+	bb.connectivityRecorder.RecordTransition(old, s)
+
+	// Update balancer picker when one of the following happens:
+	//  - this sc became ready from not-ready
+	//  - this sc became not-ready from ready
+	//  - the aggregated state of balancer became TransientFailure from non-TransientFailure
+	//  - the aggregated state of balancer became non-TransientFailure from TransientFailure
+	if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
+		(bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
+		bb.updatePicker()
+	}
+
+	bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
+}
+
+func (bb *baseBalancer) updatePicker() {
+	if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
+		bb.picker = picker.NewErr(balancer.ErrTransientFailure)
+		bb.lg.Info(
+			"updated picker to transient error picker",
+			zap.String("picker", bb.picker.String()),
+			zap.String("balancer-id", bb.id),
+			zap.String("policy", bb.policy.String()),
+		)
+		return
+	}
+
+	// only pass ready subconns to picker
+	scToAddr := make(map[balancer.SubConn]resolver.Address)
+	for addr, sc := range bb.addrToSc {
+		if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
+			scToAddr[sc] = addr
+		}
+	}
+
+	bb.picker = picker.New(picker.Config{
+		Policy:                   bb.policy,
+		Logger:                   bb.lg,
+		SubConnToResolverAddress: scToAddr,
+	})
+	bb.lg.Info(
+		"updated picker",
+		zap.String("picker", bb.picker.String()),
+		zap.String("balancer-id", bb.id),
+		zap.String("policy", bb.policy.String()),
+		zap.Strings("subconn-ready", scsToStrings(scToAddr)),
+		zap.Int("subconn-size", len(scToAddr)),
+	)
+}
+
+// Close implements "grpc/balancer.Balancer" interface.
+// Close is a nop because base balancer doesn't have internal state to clean up,
+// and it doesn't need to call RemoveSubConn for the SubConns.
+func (bb *baseBalancer) Close() {
+	// TODO
+}