VOL-2112 move to voltha-lib-go

Change-Id: Ic1af08003c1d2c698c0cce371e64f47b47b8d875
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index df1bb94..a7643df 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -38,14 +38,13 @@
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/internal/backoff"
 	"google.golang.org/grpc/internal/channelz"
-	"google.golang.org/grpc/internal/envconfig"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/transport"
 	"google.golang.org/grpc/keepalive"
-	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/resolver"
 	_ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
 	_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
+	"google.golang.org/grpc/serviceconfig"
 	"google.golang.org/grpc/status"
 )
 
@@ -69,11 +68,9 @@
 	errConnClosing = errors.New("grpc: the connection is closing")
 	// errBalancerClosed indicates that the balancer is closed.
 	errBalancerClosed = errors.New("grpc: balancer is closed")
-	// We use an accessor so that minConnectTimeout can be
-	// atomically read and updated while testing.
-	getMinConnectTimeout = func() time.Duration {
-		return minConnectTimeout
-	}
+	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
+	// service config.
+	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
 )
 
 // The following errors are returned from Dial and DialContext
@@ -140,6 +137,15 @@
 		opt.apply(&cc.dopts)
 	}
 
+	chainUnaryClientInterceptors(cc)
+	chainStreamClientInterceptors(cc)
+
+	defer func() {
+		if err != nil {
+			cc.Close()
+		}
+	}()
+
 	if channelz.IsOn() {
 		if cc.dopts.channelzParentID != 0 {
 			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
@@ -179,6 +185,13 @@
 		}
 	}
 
+	if cc.dopts.defaultServiceConfigRawJSON != nil {
+		sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
+		if err != nil {
+			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
+		}
+		cc.dopts.defaultServiceConfig = sc
+	}
 	cc.mkp = cc.dopts.copts.KeepaliveParams
 
 	if cc.dopts.copts.Dialer == nil {
@@ -201,17 +214,12 @@
 		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
 		defer cancel()
 	}
-
 	defer func() {
 		select {
 		case <-ctx.Done():
 			conn, err = nil, ctx.Err()
 		default:
 		}
-
-		if err != nil {
-			cc.Close()
-		}
 	}()
 
 	scSet := false
@@ -220,7 +228,7 @@
 		select {
 		case sc, ok := <-cc.dopts.scChan:
 			if ok {
-				cc.sc = sc
+				cc.sc = &sc
 				scSet = true
 			}
 		default:
@@ -266,7 +274,7 @@
 		select {
 		case sc, ok := <-cc.dopts.scChan:
 			if ok {
-				cc.sc = sc
+				cc.sc = &sc
 			}
 		case <-ctx.Done():
 			return nil, ctx.Err()
@@ -285,6 +293,7 @@
 		CredsBundle:      cc.dopts.copts.CredsBundle,
 		Dialer:           cc.dopts.copts.Dialer,
 		ChannelzParentID: cc.channelzID,
+		Target:           cc.parsedTarget,
 	}
 
 	// Build the resolver.
@@ -322,6 +331,68 @@
 	return cc, nil
 }
 
+// chainUnaryClientInterceptors chains all unary client interceptors into one.
+func chainUnaryClientInterceptors(cc *ClientConn) {
+	interceptors := cc.dopts.chainUnaryInts
+	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
+	// be executed before any other chained interceptors.
+	if cc.dopts.unaryInt != nil {
+		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
+	}
+	var chainedInt UnaryClientInterceptor
+	if len(interceptors) == 0 {
+		chainedInt = nil
+	} else if len(interceptors) == 1 {
+		chainedInt = interceptors[0]
+	} else {
+		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
+			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
+		}
+	}
+	cc.dopts.unaryInt = chainedInt
+}
+
+// getChainUnaryInvoker recursively generate the chained unary invoker.
+func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
+	if curr == len(interceptors)-1 {
+		return finalInvoker
+	}
+	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
+		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
+	}
+}
+
+// chainStreamClientInterceptors chains all stream client interceptors into one.
+func chainStreamClientInterceptors(cc *ClientConn) {
+	interceptors := cc.dopts.chainStreamInts
+	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
+	// be executed before any other chained interceptors.
+	if cc.dopts.streamInt != nil {
+		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
+	}
+	var chainedInt StreamClientInterceptor
+	if len(interceptors) == 0 {
+		chainedInt = nil
+	} else if len(interceptors) == 1 {
+		chainedInt = interceptors[0]
+	} else {
+		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
+			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
+		}
+	}
+	cc.dopts.streamInt = chainedInt
+}
+
+// getChainStreamer recursively generate the chained client stream constructor.
+func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
+	if curr == len(interceptors)-1 {
+		return finalStreamer
+	}
+	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
+		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
+	}
+}
+
 // connectivityStateManager keeps the connectivity.State of ClientConn.
 // This struct will eventually be exported so the balancers can access it.
 type connectivityStateManager struct {
@@ -388,14 +459,11 @@
 
 	mu              sync.RWMutex
 	resolverWrapper *ccResolverWrapper
-	sc              ServiceConfig
-	scRaw           string
+	sc              *ServiceConfig
 	conns           map[*addrConn]struct{}
 	// Keepalive parameter can be updated if a GoAway is received.
 	mkp             keepalive.ClientParameters
 	curBalancerName string
-	preBalancerName string // previous balancer name.
-	curAddresses    []resolver.Address
 	balancerWrapper *ccBalancerWrapper
 	retryThrottler  atomic.Value
 
@@ -437,8 +505,7 @@
 			cc.mu.Lock()
 			// TODO: load balance policy runtime change is ignored.
 			// We may revisit this decision in the future.
-			cc.sc = sc
-			cc.scRaw = ""
+			cc.sc = &sc
 			cc.mu.Unlock()
 		case <-cc.ctx.Done():
 			return
@@ -465,48 +532,45 @@
 	}
 }
 
-func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
+func (cc *ClientConn) updateResolverState(s resolver.State) error {
 	cc.mu.Lock()
 	defer cc.mu.Unlock()
+	// Check if the ClientConn is already closed. Some fields (e.g.
+	// balancerWrapper) are set to nil when closing the ClientConn, and could
+	// cause nil pointer panic if we don't have this check.
 	if cc.conns == nil {
-		// cc was closed.
-		return
+		return nil
 	}
 
-	if reflect.DeepEqual(cc.curAddresses, addrs) {
-		return
+	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
+		if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
+			cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
+		}
+	} else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
+		cc.applyServiceConfig(sc)
 	}
 
-	cc.curAddresses = addrs
-	cc.firstResolveEvent.Fire()
-
+	var balCfg serviceconfig.LoadBalancingConfig
 	if cc.dopts.balancerBuilder == nil {
 		// Only look at balancer types and switch balancer if balancer dial
 		// option is not set.
-		var isGRPCLB bool
-		for _, a := range addrs {
-			if a.Type == resolver.GRPCLB {
-				isGRPCLB = true
-				break
-			}
-		}
 		var newBalancerName string
-		if isGRPCLB {
-			newBalancerName = grpclbName
+		if cc.sc != nil && cc.sc.lbConfig != nil {
+			newBalancerName = cc.sc.lbConfig.name
+			balCfg = cc.sc.lbConfig.cfg
 		} else {
-			// Address list doesn't contain grpclb address. Try to pick a
-			// non-grpclb balancer.
-			newBalancerName = cc.curBalancerName
-			// If current balancer is grpclb, switch to the previous one.
-			if newBalancerName == grpclbName {
-				newBalancerName = cc.preBalancerName
+			var isGRPCLB bool
+			for _, a := range s.Addresses {
+				if a.Type == resolver.GRPCLB {
+					isGRPCLB = true
+					break
+				}
 			}
-			// The following could be true in two cases:
-			// - the first time handling resolved addresses
-			//   (curBalancerName="")
-			// - the first time handling non-grpclb addresses
-			//   (curBalancerName="grpclb", preBalancerName="")
-			if newBalancerName == "" {
+			if isGRPCLB {
+				newBalancerName = grpclbName
+			} else if cc.sc != nil && cc.sc.LB != nil {
+				newBalancerName = *cc.sc.LB
+			} else {
 				newBalancerName = PickFirstBalancerName
 			}
 		}
@@ -514,10 +578,12 @@
 	} else if cc.balancerWrapper == nil {
 		// Balancer dial option was set, and this is the first time handling
 		// resolved addresses. Build a balancer with dopts.balancerBuilder.
+		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
 		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
 	}
 
-	cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
+	cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
+	return nil
 }
 
 // switchBalancer starts the switching from current balancer to the balancer
@@ -529,11 +595,7 @@
 //
 // Caller must hold cc.mu.
 func (cc *ClientConn) switchBalancer(name string) {
-	if cc.conns == nil {
-		return
-	}
-
-	if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
+	if strings.EqualFold(cc.curBalancerName, name) {
 		return
 	}
 
@@ -542,15 +604,11 @@
 		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
 		return
 	}
-	// TODO(bar switching) change this to two steps: drain and close.
-	// Keep track of sc in wrapper.
 	if cc.balancerWrapper != nil {
 		cc.balancerWrapper.close()
 	}
 
 	builder := balancer.Get(name)
-	// TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
-	// we reuse previous one?
 	if channelz.IsOn() {
 		if builder == nil {
 			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
@@ -569,7 +627,6 @@
 		builder = newPickfirstBuilder()
 	}
 
-	cc.preBalancerName = cc.curBalancerName
 	cc.curBalancerName = builder.Name()
 	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
 }
@@ -677,6 +734,8 @@
 		ac.mu.Unlock()
 		return nil
 	}
+	// Update connectivity state within the lock to prevent subsequent or
+	// concurrent calls from resetting the transport more than once.
 	ac.updateConnectivityState(connectivity.Connecting)
 	ac.mu.Unlock()
 
@@ -687,7 +746,16 @@
 
 // tryUpdateAddrs tries to update ac.addrs with the new addresses list.
 //
-// It checks whether current connected address of ac is in the new addrs list.
+// If ac is Connecting, it returns false. The caller should tear down the ac and
+// create a new one. Note that the backoff will be reset when this happens.
+//
+// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
+// addresses will be picked up by retry in the next iteration after backoff.
+//
+// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
+//
+// If ac is Ready, it checks whether current connected address of ac is in the
+// new addrs list.
 //  - If true, it updates ac.addrs and returns true. The ac will keep using
 //    the existing connection.
 //  - If false, it does nothing and returns false.
@@ -695,17 +763,18 @@
 	ac.mu.Lock()
 	defer ac.mu.Unlock()
 	grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
-	if ac.state == connectivity.Shutdown {
+	if ac.state == connectivity.Shutdown ||
+		ac.state == connectivity.TransientFailure ||
+		ac.state == connectivity.Idle {
 		ac.addrs = addrs
 		return true
 	}
 
-	// Unless we're busy reconnecting already, let's reconnect from the top of
-	// the list.
-	if ac.state != connectivity.Ready {
+	if ac.state == connectivity.Connecting {
 		return false
 	}
 
+	// ac.state is Ready, try to find the connected address.
 	var curAddrFound bool
 	for _, a := range addrs {
 		if reflect.DeepEqual(ac.curAddr, a) {
@@ -732,6 +801,9 @@
 	// TODO: Avoid the locking here.
 	cc.mu.RLock()
 	defer cc.mu.RUnlock()
+	if cc.sc == nil {
+		return MethodConfig{}
+	}
 	m, ok := cc.sc.Methods[method]
 	if !ok {
 		i := strings.LastIndex(method, "/")
@@ -743,14 +815,15 @@
 func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
 	cc.mu.RLock()
 	defer cc.mu.RUnlock()
+	if cc.sc == nil {
+		return nil
+	}
 	return cc.sc.healthCheckConfig
 }
 
 func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
-	hdr, _ := metadata.FromOutgoingContext(ctx)
 	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
 		FullMethodName: method,
-		Header:         hdr,
 	})
 	if err != nil {
 		return nil, nil, toRPCErr(err)
@@ -758,65 +831,25 @@
 	return t, done, nil
 }
 
-// handleServiceConfig parses the service config string in JSON format to Go native
-// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
-func (cc *ClientConn) handleServiceConfig(js string) error {
-	if cc.dopts.disableServiceConfig {
-		return nil
+func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
+	if sc == nil {
+		// should never reach here.
+		return fmt.Errorf("got nil pointer for service config")
 	}
-	if cc.scRaw == js {
-		return nil
-	}
-	if channelz.IsOn() {
-		channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
-			// The special formatting of \"%s\" instead of %q is to provide nice printing of service config
-			// for human consumption.
-			Desc:     fmt.Sprintf("Channel has a new service config \"%s\"", js),
-			Severity: channelz.CtINFO,
-		})
-	}
-	sc, err := parseServiceConfig(js)
-	if err != nil {
-		return err
-	}
-	cc.mu.Lock()
-	// Check if the ClientConn is already closed. Some fields (e.g.
-	// balancerWrapper) are set to nil when closing the ClientConn, and could
-	// cause nil pointer panic if we don't have this check.
-	if cc.conns == nil {
-		cc.mu.Unlock()
-		return nil
-	}
-	cc.scRaw = js
 	cc.sc = sc
 
-	if sc.retryThrottling != nil {
+	if cc.sc.retryThrottling != nil {
 		newThrottler := &retryThrottler{
-			tokens: sc.retryThrottling.MaxTokens,
-			max:    sc.retryThrottling.MaxTokens,
-			thresh: sc.retryThrottling.MaxTokens / 2,
-			ratio:  sc.retryThrottling.TokenRatio,
+			tokens: cc.sc.retryThrottling.MaxTokens,
+			max:    cc.sc.retryThrottling.MaxTokens,
+			thresh: cc.sc.retryThrottling.MaxTokens / 2,
+			ratio:  cc.sc.retryThrottling.TokenRatio,
 		}
 		cc.retryThrottler.Store(newThrottler)
 	} else {
 		cc.retryThrottler.Store((*retryThrottler)(nil))
 	}
 
-	if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
-		if cc.curBalancerName == grpclbName {
-			// If current balancer is grpclb, there's at least one grpclb
-			// balancer address in the resolved list. Don't switch the balancer,
-			// but change the previous balancer name, so if a new resolved
-			// address list doesn't contain grpclb address, balancer will be
-			// switched to *sc.LB.
-			cc.preBalancerName = *sc.LB
-		} else {
-			cc.switchBalancer(*sc.LB)
-			cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
-		}
-	}
-
-	cc.mu.Unlock()
 	return nil
 }
 
@@ -892,7 +925,7 @@
 		}
 		channelz.AddTraceEvent(cc.channelzID, ted)
 		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
-		// the entity beng deleted, and thus prevent it from being deleted right away.
+		// the entity being deleted, and thus prevent it from being deleted right away.
 		channelz.RemoveEntry(cc.channelzID)
 	}
 	return nil
@@ -921,8 +954,6 @@
 	// Use updateConnectivityState for updating addrConn's connectivity state.
 	state connectivity.State
 
-	tearDownErr error // The reason this addrConn is torn down.
-
 	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
 	resetBackoff chan struct{}
 
@@ -963,191 +994,147 @@
 
 func (ac *addrConn) resetTransport() {
 	for i := 0; ; i++ {
-		tryNextAddrFromStart := grpcsync.NewEvent()
-
-		ac.mu.Lock()
 		if i > 0 {
 			ac.cc.resolveNow(resolver.ResolveNowOption{})
 		}
-		addrs := ac.addrs
-		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
 
-		// This will be the duration that dial gets to finish.
-		dialDuration := getMinConnectTimeout()
-		if dialDuration < backoffFor {
-			// Give dial more time as we keep failing to connect.
-			dialDuration = backoffFor
-		}
-		connectDeadline := time.Now().Add(dialDuration)
-		ac.mu.Unlock()
-
-	addrLoop:
-		for _, addr := range addrs {
-			ac.mu.Lock()
-
-			if ac.state == connectivity.Shutdown {
-				ac.mu.Unlock()
-				return
-			}
-			ac.updateConnectivityState(connectivity.Connecting)
-			ac.transport = nil
-
-			ac.cc.mu.RLock()
-			ac.dopts.copts.KeepaliveParams = ac.cc.mkp
-			ac.cc.mu.RUnlock()
-
-			if ac.state == connectivity.Shutdown {
-				ac.mu.Unlock()
-				return
-			}
-
-			copts := ac.dopts.copts
-			if ac.scopts.CredsBundle != nil {
-				copts.CredsBundle = ac.scopts.CredsBundle
-			}
-			hctx, hcancel := context.WithCancel(ac.ctx)
-			defer hcancel()
-			ac.mu.Unlock()
-
-			if channelz.IsOn() {
-				channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
-					Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
-					Severity: channelz.CtINFO,
-				})
-			}
-
-			reconnect := grpcsync.NewEvent()
-			prefaceReceived := make(chan struct{})
-			newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
-			if err == nil {
-				ac.mu.Lock()
-				ac.curAddr = addr
-				ac.transport = newTr
-				ac.mu.Unlock()
-
-				healthCheckConfig := ac.cc.healthCheckConfig()
-				// LB channel health checking is only enabled when all the four requirements below are met:
-				// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
-				// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
-				// 3. a service config with non-empty healthCheckConfig field is provided,
-				// 4. the current load balancer allows it.
-				healthcheckManagingState := false
-				if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
-					if ac.cc.dopts.healthCheckFunc == nil {
-						// TODO: add a link to the health check doc in the error message.
-						grpclog.Error("the client side LB channel health check function has not been set.")
-					} else {
-						// TODO(deklerk) refactor to just return transport
-						go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
-						healthcheckManagingState = true
-					}
-				}
-				if !healthcheckManagingState {
-					ac.mu.Lock()
-					ac.updateConnectivityState(connectivity.Ready)
-					ac.mu.Unlock()
-				}
-			} else {
-				hcancel()
-				if err == errConnClosing {
-					return
-				}
-
-				if tryNextAddrFromStart.HasFired() {
-					break addrLoop
-				}
-				continue
-			}
-
-			backoffFor = 0
-			ac.mu.Lock()
-			reqHandshake := ac.dopts.reqHandshake
-			ac.mu.Unlock()
-
-			<-reconnect.Done()
-			hcancel()
-
-			if reqHandshake == envconfig.RequireHandshakeHybrid {
-				// In RequireHandshakeHybrid mode, we must check to see whether
-				// server preface has arrived yet to decide whether to start
-				// reconnecting at the top of the list (server preface received)
-				// or continue with the next addr in the list as if the
-				// connection were not successful (server preface not received).
-				select {
-				case <-prefaceReceived:
-					// We received a server preface - huzzah! We consider this
-					// a success and restart from the top of the addr list.
-					ac.mu.Lock()
-					ac.backoffIdx = 0
-					ac.mu.Unlock()
-					break addrLoop
-				default:
-					// Despite having set state to READY, in hybrid mode we
-					// consider this a failure and continue connecting at the
-					// next addr in the list.
-					ac.mu.Lock()
-					if ac.state == connectivity.Shutdown {
-						ac.mu.Unlock()
-						return
-					}
-
-					ac.updateConnectivityState(connectivity.TransientFailure)
-					ac.mu.Unlock()
-
-					if tryNextAddrFromStart.HasFired() {
-						break addrLoop
-					}
-				}
-			} else {
-				// In RequireHandshakeOn mode, we would have already waited for
-				// the server preface, so we consider this a success and restart
-				// from the top of the addr list. In RequireHandshakeOff mode,
-				// we don't care to wait for the server preface before
-				// considering this a success, so we also restart from the top
-				// of the addr list.
-				ac.mu.Lock()
-				ac.backoffIdx = 0
-				ac.mu.Unlock()
-				break addrLoop
-			}
-		}
-
-		// After exhausting all addresses, or after need to reconnect after a
-		// READY, the addrConn enters TRANSIENT_FAILURE.
 		ac.mu.Lock()
 		if ac.state == connectivity.Shutdown {
 			ac.mu.Unlock()
 			return
 		}
-		ac.updateConnectivityState(connectivity.TransientFailure)
 
-		// Backoff.
-		b := ac.resetBackoff
-		timer := time.NewTimer(backoffFor)
-		acctx := ac.ctx
+		addrs := ac.addrs
+		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
+		// This will be the duration that dial gets to finish.
+		dialDuration := minConnectTimeout
+		if ac.dopts.minConnectTimeout != nil {
+			dialDuration = ac.dopts.minConnectTimeout()
+		}
+
+		if dialDuration < backoffFor {
+			// Give dial more time as we keep failing to connect.
+			dialDuration = backoffFor
+		}
+		// We can potentially spend all the time trying the first address, and
+		// if the server accepts the connection and then hangs, the following
+		// addresses will never be tried.
+		//
+		// The spec doesn't mention what should be done for multiple addresses.
+		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
+		connectDeadline := time.Now().Add(dialDuration)
+
+		ac.updateConnectivityState(connectivity.Connecting)
+		ac.transport = nil
 		ac.mu.Unlock()
 
-		select {
-		case <-timer.C:
+		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
+		if err != nil {
+			// After exhausting all addresses, the addrConn enters
+			// TRANSIENT_FAILURE.
 			ac.mu.Lock()
-			ac.backoffIdx++
+			if ac.state == connectivity.Shutdown {
+				ac.mu.Unlock()
+				return
+			}
+			ac.updateConnectivityState(connectivity.TransientFailure)
+
+			// Backoff.
+			b := ac.resetBackoff
 			ac.mu.Unlock()
-		case <-b:
-			timer.Stop()
-		case <-acctx.Done():
-			timer.Stop()
+
+			timer := time.NewTimer(backoffFor)
+			select {
+			case <-timer.C:
+				ac.mu.Lock()
+				ac.backoffIdx++
+				ac.mu.Unlock()
+			case <-b:
+				timer.Stop()
+			case <-ac.ctx.Done():
+				timer.Stop()
+				return
+			}
+			continue
+		}
+
+		ac.mu.Lock()
+		if ac.state == connectivity.Shutdown {
+			ac.mu.Unlock()
+			newTr.Close()
 			return
 		}
+		ac.curAddr = addr
+		ac.transport = newTr
+		ac.backoffIdx = 0
+
+		hctx, hcancel := context.WithCancel(ac.ctx)
+		ac.startHealthCheck(hctx)
+		ac.mu.Unlock()
+
+		// Block until the created transport is down. And when this happens,
+		// we restart from the top of the addr list.
+		<-reconnect.Done()
+		hcancel()
+		// restart connecting - the top of the loop will set state to
+		// CONNECTING.  This is against the current connectivity semantics doc,
+		// however it allows for graceful behavior for RPCs not yet dispatched
+		// - unfortunate timing would otherwise lead to the RPC failing even
+		// though the TRANSIENT_FAILURE state (called for by the doc) would be
+		// instantaneous.
+		//
+		// Ideally we should transition to Idle here and block until there is
+		// RPC activity that leads to the balancer requesting a reconnect of
+		// the associated SubConn.
 	}
 }
 
-// createTransport creates a connection to one of the backends in addrs. It
-// sets ac.transport in the success case, or it returns an error if it was
-// unable to successfully create a transport.
-//
-// If waitForHandshake is enabled, it blocks until server preface arrives.
-func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
+// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
+// first successful one. It returns the transport, the address and a Event in
+// the successful case. The Event fires when the returned transport disconnects.
+func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
+	for _, addr := range addrs {
+		ac.mu.Lock()
+		if ac.state == connectivity.Shutdown {
+			ac.mu.Unlock()
+			return nil, resolver.Address{}, nil, errConnClosing
+		}
+
+		ac.cc.mu.RLock()
+		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+		ac.cc.mu.RUnlock()
+
+		copts := ac.dopts.copts
+		if ac.scopts.CredsBundle != nil {
+			copts.CredsBundle = ac.scopts.CredsBundle
+		}
+		ac.mu.Unlock()
+
+		if channelz.IsOn() {
+			channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+				Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
+				Severity: channelz.CtINFO,
+			})
+		}
+
+		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
+		if err == nil {
+			return newTr, addr, reconnect, nil
+		}
+		ac.cc.blockingpicker.updateConnectionError(err)
+	}
+
+	// Couldn't connect to any address.
+	return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address")
+}
+
+// createTransport creates a connection to addr. It returns the transport and a
+// Event in the successful case. The Event fires when the returned transport
+// disconnects.
+func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
+	prefaceReceived := make(chan struct{})
 	onCloseCalled := make(chan struct{})
+	reconnect := grpcsync.NewEvent()
 
 	target := transport.TargetInfo{
 		Addr:      addr.Addr,
@@ -1155,24 +1142,41 @@
 		Authority: ac.cc.authority,
 	}
 
-	prefaceTimer := time.NewTimer(time.Until(connectDeadline))
-
+	once := sync.Once{}
 	onGoAway := func(r transport.GoAwayReason) {
 		ac.mu.Lock()
 		ac.adjustParams(r)
+		once.Do(func() {
+			if ac.state == connectivity.Ready {
+				// Prevent this SubConn from being used for new RPCs by setting its
+				// state to Connecting.
+				//
+				// TODO: this should be Idle when grpc-go properly supports it.
+				ac.updateConnectivityState(connectivity.Connecting)
+			}
+		})
 		ac.mu.Unlock()
 		reconnect.Fire()
 	}
 
 	onClose := func() {
+		ac.mu.Lock()
+		once.Do(func() {
+			if ac.state == connectivity.Ready {
+				// Prevent this SubConn from being used for new RPCs by setting its
+				// state to Connecting.
+				//
+				// TODO: this should be Idle when grpc-go properly supports it.
+				ac.updateConnectivityState(connectivity.Connecting)
+			}
+		})
+		ac.mu.Unlock()
 		close(onCloseCalled)
-		prefaceTimer.Stop()
 		reconnect.Fire()
 	}
 
 	onPrefaceReceipt := func() {
 		close(prefaceReceived)
-		prefaceTimer.Stop()
 	}
 
 	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
@@ -1182,107 +1186,105 @@
 	}
 
 	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
-
-	if err == nil {
-		if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
-			select {
-			case <-prefaceTimer.C:
-				// We didn't get the preface in time.
-				newTr.Close()
-				err = errors.New("timed out waiting for server handshake")
-			case <-prefaceReceived:
-				// We got the preface - huzzah! things are good.
-			case <-onCloseCalled:
-				// The transport has already closed - noop.
-				return nil, errors.New("connection closed")
-			}
-		} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
-			go func() {
-				select {
-				case <-prefaceTimer.C:
-					// We didn't get the preface in time.
-					newTr.Close()
-				case <-prefaceReceived:
-					// We got the preface just in the nick of time - huzzah!
-				case <-onCloseCalled:
-					// The transport has already closed - noop.
-				}
-			}()
-		}
-	}
-
 	if err != nil {
 		// newTr is either nil, or closed.
-		ac.cc.blockingpicker.updateConnectionError(err)
-		ac.mu.Lock()
-		if ac.state == connectivity.Shutdown {
-			// ac.tearDown(...) has been invoked.
-			ac.mu.Unlock()
-
-			return nil, errConnClosing
-		}
-		ac.mu.Unlock()
 		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
-		return nil, err
+		return nil, nil, err
 	}
 
-	// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
-	ac.mu.Lock()
-	if ac.state == connectivity.Shutdown {
-		ac.mu.Unlock()
+	select {
+	case <-time.After(connectDeadline.Sub(time.Now())):
+		// We didn't get the preface in time.
 		newTr.Close()
-		return nil, errConnClosing
+		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
+		return nil, nil, errors.New("timed out waiting for server handshake")
+	case <-prefaceReceived:
+		// We got the preface - huzzah! things are good.
+	case <-onCloseCalled:
+		// The transport has already closed - noop.
+		return nil, nil, errors.New("connection closed")
+		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
 	}
-	ac.mu.Unlock()
-
-	// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
-	ac.mu.Lock()
-	if ac.state == connectivity.Shutdown {
-		ac.mu.Unlock()
-		newTr.Close()
-		return nil, errConnClosing
-	}
-	ac.mu.Unlock()
-
-	return newTr, nil
+	return newTr, reconnect, nil
 }
 
-func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
-	// Set up the health check helper functions
-	newStream := func() (interface{}, error) {
-		return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
+// startHealthCheck starts the health checking stream (RPC) to watch the health
+// stats of this connection if health checking is requested and configured.
+//
+// LB channel health checking is enabled when all requirements below are met:
+// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
+// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
+// 3. a service config with non-empty healthCheckConfig field is provided
+// 4. the load balancer requests it
+//
+// It sets addrConn to READY if the health checking stream is not started.
+//
+// Caller must hold ac.mu.
+func (ac *addrConn) startHealthCheck(ctx context.Context) {
+	var healthcheckManagingState bool
+	defer func() {
+		if !healthcheckManagingState {
+			ac.updateConnectivityState(connectivity.Ready)
+		}
+	}()
+
+	if ac.cc.dopts.disableHealthCheck {
+		return
 	}
-	firstReady := true
-	reportHealth := func(ok bool) {
+	healthCheckConfig := ac.cc.healthCheckConfig()
+	if healthCheckConfig == nil {
+		return
+	}
+	if !ac.scopts.HealthCheckEnabled {
+		return
+	}
+	healthCheckFunc := ac.cc.dopts.healthCheckFunc
+	if healthCheckFunc == nil {
+		// The health package is not imported to set health check function.
+		//
+		// TODO: add a link to the health check doc in the error message.
+		grpclog.Error("Health check is requested but health check function is not set.")
+		return
+	}
+
+	healthcheckManagingState = true
+
+	// Set up the health check helper functions.
+	currentTr := ac.transport
+	newStream := func(method string) (interface{}, error) {
+		ac.mu.Lock()
+		if ac.transport != currentTr {
+			ac.mu.Unlock()
+			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
+		}
+		ac.mu.Unlock()
+		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
+	}
+	setConnectivityState := func(s connectivity.State) {
 		ac.mu.Lock()
 		defer ac.mu.Unlock()
-		if ac.transport != newTr {
+		if ac.transport != currentTr {
 			return
 		}
-		if ok {
-			if firstReady {
-				firstReady = false
-				ac.curAddr = addr
-			}
-			ac.updateConnectivityState(connectivity.Ready)
-		} else {
-			ac.updateConnectivityState(connectivity.TransientFailure)
-		}
+		ac.updateConnectivityState(s)
 	}
-	err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
-	if err != nil {
-		if status.Code(err) == codes.Unimplemented {
-			if channelz.IsOn() {
-				channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
-					Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
-					Severity: channelz.CtError,
-				})
+	// Start the health checking stream.
+	go func() {
+		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
+		if err != nil {
+			if status.Code(err) == codes.Unimplemented {
+				if channelz.IsOn() {
+					channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+						Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
+						Severity: channelz.CtError,
+					})
+				}
+				grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
+			} else {
+				grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
 			}
-			grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
-		} else {
-			grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
 		}
-	}
+	}()
 }
 
 func (ac *addrConn) resetConnectBackoff() {
@@ -1332,7 +1334,6 @@
 	// between setting the state and logic that waits on context cancelation / etc.
 	ac.updateConnectivityState(connectivity.Shutdown)
 	ac.cancel()
-	ac.tearDownErr = err
 	ac.curAddr = resolver.Address{}
 	if err == errConnDrain && curTr != nil {
 		// GracefulClose(...) may be executed multiple times when