gRPC migration

Change-Id: Ib390f6dde0d5a8d6db12ccd7da41135570ad1354
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 0740693..4414ba8 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -35,10 +35,10 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/internal/backoff"
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/grpcsync"
-	"google.golang.org/grpc/internal/grpcutil"
 	"google.golang.org/grpc/internal/transport"
 	"google.golang.org/grpc/keepalive"
 	"google.golang.org/grpc/resolver"
@@ -151,7 +151,7 @@
 	if channelz.IsOn() {
 		if cc.dopts.channelzParentID != 0 {
 			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
-			channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{
+			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
 				Desc:     "Channel Created",
 				Severity: channelz.CtINFO,
 				Parent: &channelz.TraceEventDesc{
@@ -161,7 +161,10 @@
 			})
 		} else {
 			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
-			channelz.Info(cc.channelzID, "Channel Created")
+			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+				Desc:     "Channel Created",
+				Severity: channelz.CtINFO,
+			})
 		}
 		cc.csMgr.channelzID = cc.channelzID
 	}
@@ -194,13 +197,12 @@
 	cc.mkp = cc.dopts.copts.KeepaliveParams
 
 	if cc.dopts.copts.Dialer == nil {
-		cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
-			network, addr := parseDialTarget(addr)
-			return (&net.Dialer{}).DialContext(ctx, network, addr)
-		}
-		if cc.dopts.withProxy {
-			cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer)
-		}
+		cc.dopts.copts.Dialer = newProxyDialer(
+			func(ctx context.Context, addr string) (net.Conn, error) {
+				network, addr := parseDialTarget(addr)
+				return (&net.Dialer{}).DialContext(ctx, network, addr)
+			},
+		)
 	}
 
 	if cc.dopts.copts.UserAgent != "" {
@@ -237,26 +239,25 @@
 	if cc.dopts.bs == nil {
 		cc.dopts.bs = backoff.DefaultExponential
 	}
-
-	// Determine the resolver to use.
-	cc.parsedTarget = grpcutil.ParseTarget(cc.target)
-	channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
-	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
-	if resolverBuilder == nil {
-		// If resolver builder is still nil, the parsed target's scheme is
-		// not registered. Fallback to default resolver and set Endpoint to
-		// the original target.
-		channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
-		cc.parsedTarget = resolver.Target{
-			Scheme:   resolver.GetDefaultScheme(),
-			Endpoint: target,
+	if cc.dopts.resolverBuilder == nil {
+		// Only try to parse target when resolver builder is not already set.
+		cc.parsedTarget = parseTarget(cc.target)
+		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
+		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
+		if cc.dopts.resolverBuilder == nil {
+			// If resolver builder is still nil, the parsed target's scheme is
+			// not registered. Fallback to default resolver and set Endpoint to
+			// the original target.
+			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
+			cc.parsedTarget = resolver.Target{
+				Scheme:   resolver.GetDefaultScheme(),
+				Endpoint: target,
+			}
+			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
 		}
-		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
-		if resolverBuilder == nil {
-			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
-		}
+	} else {
+		cc.parsedTarget = resolver.Target{Endpoint: target}
 	}
-
 	creds := cc.dopts.copts.TransportCredentials
 	if creds != nil && creds.Info().ServerName != "" {
 		cc.authority = creds.Info().ServerName
@@ -296,14 +297,14 @@
 	}
 
 	// Build the resolver.
-	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
+	rWrapper, err := newCCResolverWrapper(cc)
 	if err != nil {
 		return nil, fmt.Errorf("failed to build resolver: %v", err)
 	}
+
 	cc.mu.Lock()
 	cc.resolverWrapper = rWrapper
 	cc.mu.Unlock()
-
 	// A blocking dial blocks until the clientConn is ready.
 	if cc.dopts.block {
 		for {
@@ -414,7 +415,12 @@
 		return
 	}
 	csm.state = state
-	channelz.Infof(csm.channelzID, "Channel Connectivity change to %v", state)
+	if channelz.IsOn() {
+		channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
+			Desc:     fmt.Sprintf("Channel Connectivity change to %v", state),
+			Severity: channelz.CtINFO,
+		})
+	}
 	if csm.notifyChan != nil {
 		// There are other goroutines waiting on this channel.
 		close(csm.notifyChan)
@@ -437,20 +443,6 @@
 	return csm.notifyChan
 }
 
-// ClientConnInterface defines the functions clients need to perform unary and
-// streaming RPCs.  It is implemented by *ClientConn, and is only intended to
-// be referenced by generated code.
-type ClientConnInterface interface {
-	// Invoke performs a unary RPC and returns after the response is received
-	// into reply.
-	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
-	// NewStream begins a streaming RPC.
-	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
-}
-
-// Assert *ClientConn implements ClientConnInterface.
-var _ ClientConnInterface = (*ClientConn)(nil)
-
 // ClientConn represents a virtual connection to a conceptual endpoint, to
 // perform RPCs.
 //
@@ -664,9 +656,9 @@
 		return
 	}
 
-	channelz.Infof(cc.channelzID, "ClientConn switching balancer to %q", name)
+	grpclog.Infof("ClientConn switching balancer to %q", name)
 	if cc.dopts.balancerBuilder != nil {
-		channelz.Info(cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
+		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
 		return
 	}
 	if cc.balancerWrapper != nil {
@@ -674,19 +666,29 @@
 	}
 
 	builder := balancer.Get(name)
+	if channelz.IsOn() {
+		if builder == nil {
+			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+				Desc:     fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
+				Severity: channelz.CtWarning,
+			})
+		} else {
+			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+				Desc:     fmt.Sprintf("Channel switches to new LB policy %q", name),
+				Severity: channelz.CtINFO,
+			})
+		}
+	}
 	if builder == nil {
-		channelz.Warningf(cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
-		channelz.Infof(cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
+		grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
 		builder = newPickfirstBuilder()
-	} else {
-		channelz.Infof(cc.channelzID, "Channel switches to new LB policy %q", name)
 	}
 
 	cc.curBalancerName = builder.Name()
 	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
 }
 
-func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
+func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
 	cc.mu.Lock()
 	if cc.conns == nil {
 		cc.mu.Unlock()
@@ -694,7 +696,7 @@
 	}
 	// TODO(bar switching) send updates to all balancer wrappers when balancer
 	// gracefully switching is supported.
-	cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
+	cc.balancerWrapper.handleSubConnStateChange(sc, s)
 	cc.mu.Unlock()
 }
 
@@ -703,7 +705,6 @@
 // Caller needs to make sure len(addrs) > 0.
 func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
 	ac := &addrConn{
-		state:        connectivity.Idle,
 		cc:           cc,
 		addrs:        addrs,
 		scopts:       opts,
@@ -720,7 +721,7 @@
 	}
 	if channelz.IsOn() {
 		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
-		channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
+		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
 			Desc:     "Subchannel Created",
 			Severity: channelz.CtINFO,
 			Parent: &channelz.TraceEventDesc{
@@ -792,7 +793,7 @@
 	}
 	// Update connectivity state within the lock to prevent subsequent or
 	// concurrent calls from resetting the transport more than once.
-	ac.updateConnectivityState(connectivity.Connecting, nil)
+	ac.updateConnectivityState(connectivity.Connecting)
 	ac.mu.Unlock()
 
 	// Start a goroutine connecting to the server asynchronously.
@@ -818,7 +819,7 @@
 func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
 	ac.mu.Lock()
 	defer ac.mu.Unlock()
-	channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
+	grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
 	if ac.state == connectivity.Shutdown ||
 		ac.state == connectivity.TransientFailure ||
 		ac.state == connectivity.Idle {
@@ -838,7 +839,7 @@
 			break
 		}
 	}
-	channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
+	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
 	if curAddrFound {
 		ac.addrs = addrs
 	}
@@ -878,8 +879,7 @@
 }
 
 func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
-	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
-		Ctx:            ctx,
+	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
 		FullMethodName: method,
 	})
 	if err != nil {
@@ -938,7 +938,7 @@
 	}
 }
 
-func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
+func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
 	cc.mu.RLock()
 	r := cc.resolverWrapper
 	cc.mu.RUnlock()
@@ -1009,7 +1009,7 @@
 				Severity: channelz.CtINFO,
 			}
 		}
-		channelz.AddTraceEvent(cc.channelzID, 0, ted)
+		channelz.AddTraceEvent(cc.channelzID, ted)
 		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
 		// the entity being deleted, and thus prevent it from being deleted right away.
 		channelz.RemoveEntry(cc.channelzID)
@@ -1048,13 +1048,20 @@
 }
 
 // Note: this requires a lock on ac.mu.
-func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
+func (ac *addrConn) updateConnectivityState(s connectivity.State) {
 	if ac.state == s {
 		return
 	}
+
+	updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
 	ac.state = s
-	channelz.Infof(ac.channelzID, "Subchannel Connectivity change to %v", s)
-	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
+	if channelz.IsOn() {
+		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+			Desc:     updateMsg,
+			Severity: channelz.CtINFO,
+		})
+	}
+	ac.cc.handleSubConnStateChange(ac.acbw, s)
 }
 
 // adjustParams updates parameters used to create transports upon
@@ -1074,7 +1081,7 @@
 func (ac *addrConn) resetTransport() {
 	for i := 0; ; i++ {
 		if i > 0 {
-			ac.cc.resolveNow(resolver.ResolveNowOptions{})
+			ac.cc.resolveNow(resolver.ResolveNowOption{})
 		}
 
 		ac.mu.Lock()
@@ -1103,7 +1110,7 @@
 		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
 		connectDeadline := time.Now().Add(dialDuration)
 
-		ac.updateConnectivityState(connectivity.Connecting, nil)
+		ac.updateConnectivityState(connectivity.Connecting)
 		ac.transport = nil
 		ac.mu.Unlock()
 
@@ -1116,7 +1123,7 @@
 				ac.mu.Unlock()
 				return
 			}
-			ac.updateConnectivityState(connectivity.TransientFailure, err)
+			ac.updateConnectivityState(connectivity.TransientFailure)
 
 			// Backoff.
 			b := ac.resetBackoff
@@ -1172,7 +1179,6 @@
 // first successful one. It returns the transport, the address and a Event in
 // the successful case. The Event fires when the returned transport disconnects.
 func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
-	var firstConnErr error
 	for _, addr := range addrs {
 		ac.mu.Lock()
 		if ac.state == connectivity.Shutdown {
@@ -1190,20 +1196,22 @@
 		}
 		ac.mu.Unlock()
 
-		channelz.Infof(ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
+		if channelz.IsOn() {
+			channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+				Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
+				Severity: channelz.CtINFO,
+			})
+		}
 
 		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
 		if err == nil {
 			return newTr, addr, reconnect, nil
 		}
-		if firstConnErr == nil {
-			firstConnErr = err
-		}
 		ac.cc.blockingpicker.updateConnectionError(err)
 	}
 
 	// Couldn't connect to any address.
-	return nil, resolver.Address{}, nil, firstConnErr
+	return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address")
 }
 
 // createTransport creates a connection to addr. It returns the transport and a
@@ -1236,7 +1244,7 @@
 				// state to Connecting.
 				//
 				// TODO: this should be Idle when grpc-go properly supports it.
-				ac.updateConnectivityState(connectivity.Connecting, nil)
+				ac.updateConnectivityState(connectivity.Connecting)
 			}
 		})
 		ac.mu.Unlock()
@@ -1251,7 +1259,7 @@
 				// state to Connecting.
 				//
 				// TODO: this should be Idle when grpc-go properly supports it.
-				ac.updateConnectivityState(connectivity.Connecting, nil)
+				ac.updateConnectivityState(connectivity.Connecting)
 			}
 		})
 		ac.mu.Unlock()
@@ -1272,15 +1280,15 @@
 	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
 	if err != nil {
 		// newTr is either nil, or closed.
-		channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
+		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
 		return nil, nil, err
 	}
 
 	select {
-	case <-time.After(time.Until(connectDeadline)):
+	case <-time.After(connectDeadline.Sub(time.Now())):
 		// We didn't get the preface in time.
 		newTr.Close()
-		channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
+		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
 		return nil, nil, errors.New("timed out waiting for server handshake")
 	case <-prefaceReceived:
 		// We got the preface - huzzah! things are good.
@@ -1308,7 +1316,7 @@
 	var healthcheckManagingState bool
 	defer func() {
 		if !healthcheckManagingState {
-			ac.updateConnectivityState(connectivity.Ready, nil)
+			ac.updateConnectivityState(connectivity.Ready)
 		}
 	}()
 
@@ -1327,7 +1335,7 @@
 		// The health package is not imported to set health check function.
 		//
 		// TODO: add a link to the health check doc in the error message.
-		channelz.Error(ac.channelzID, "Health check is requested but health check function is not set.")
+		grpclog.Error("Health check is requested but health check function is not set.")
 		return
 	}
 
@@ -1344,22 +1352,28 @@
 		ac.mu.Unlock()
 		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
 	}
-	setConnectivityState := func(s connectivity.State, lastErr error) {
+	setConnectivityState := func(s connectivity.State) {
 		ac.mu.Lock()
 		defer ac.mu.Unlock()
 		if ac.transport != currentTr {
 			return
 		}
-		ac.updateConnectivityState(s, lastErr)
+		ac.updateConnectivityState(s)
 	}
 	// Start the health checking stream.
 	go func() {
 		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
 		if err != nil {
 			if status.Code(err) == codes.Unimplemented {
-				channelz.Error(ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
+				if channelz.IsOn() {
+					channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+						Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
+						Severity: channelz.CtError,
+					})
+				}
+				grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
 			} else {
-				channelz.Errorf(ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
+				grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
 			}
 		}
 	}()
@@ -1410,7 +1424,7 @@
 	ac.transport = nil
 	// We have to set the state to Shutdown before anything else to prevent races
 	// between setting the state and logic that waits on context cancellation / etc.
-	ac.updateConnectivityState(connectivity.Shutdown, nil)
+	ac.updateConnectivityState(connectivity.Shutdown)
 	ac.cancel()
 	ac.curAddr = resolver.Address{}
 	if err == errConnDrain && curTr != nil {
@@ -1424,7 +1438,7 @@
 		ac.mu.Lock()
 	}
 	if channelz.IsOn() {
-		channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
+		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
 			Desc:     "Subchannel Deleted",
 			Severity: channelz.CtINFO,
 			Parent: &channelz.TraceEventDesc{
@@ -1523,12 +1537,3 @@
 // Deprecated: This error is never returned by grpc and should not be
 // referenced by users.
 var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
-
-func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
-	for _, rb := range cc.dopts.resolvers {
-		if scheme == rb.Scheme() {
-			return rb
-		}
-	}
-	return resolver.Get(scheme)
-}