VOL-2112 move to voltha-lib-go
Change-Id: I3435b8acb982deeab6b6ac28e798d7722ad01d0a
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index df1bb94..a7643df 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -38,14 +38,13 @@
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
- "google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
+ "google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
)
@@ -69,11 +68,9 @@
errConnClosing = errors.New("grpc: the connection is closing")
// errBalancerClosed indicates that the balancer is closed.
errBalancerClosed = errors.New("grpc: balancer is closed")
- // We use an accessor so that minConnectTimeout can be
- // atomically read and updated while testing.
- getMinConnectTimeout = func() time.Duration {
- return minConnectTimeout
- }
+ // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
+ // service config.
+ invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)
// The following errors are returned from Dial and DialContext
@@ -140,6 +137,15 @@
opt.apply(&cc.dopts)
}
+ chainUnaryClientInterceptors(cc)
+ chainStreamClientInterceptors(cc)
+
+ defer func() {
+ if err != nil {
+ cc.Close()
+ }
+ }()
+
if channelz.IsOn() {
if cc.dopts.channelzParentID != 0 {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
@@ -179,6 +185,13 @@
}
}
+ if cc.dopts.defaultServiceConfigRawJSON != nil {
+ sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
+ if err != nil {
+ return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
+ }
+ cc.dopts.defaultServiceConfig = sc
+ }
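
For reference only, not part of the vendored patch: a minimal sketch of how a dialer would feed the block above, assuming the WithDefaultServiceConfig DialOption present in this grpc-go snapshot; the target address and service name are hypothetical.

package main

import (
	"log"

	"google.golang.org/grpc"
)

func main() {
	// Hypothetical default service config. Dial parses it eagerly; invalid
	// JSON makes Dial fail with the invalidDefaultServiceConfigErrPrefix
	// error added above. It is only applied when the resolver supplies no
	// service config (or service config lookup is disabled).
	const defaultSC = `{
	  "methodConfig": [{
	    "name": [{"service": "example.Echo"}],
	    "waitForReady": true
	  }]
	}`

	conn, err := grpc.Dial("localhost:50051", // hypothetical target
		grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(defaultSC),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
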
cc.mkp = cc.dopts.copts.KeepaliveParams
if cc.dopts.copts.Dialer == nil {
@@ -201,17 +214,12 @@
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
defer cancel()
}
-
defer func() {
select {
case <-ctx.Done():
conn, err = nil, ctx.Err()
default:
}
-
- if err != nil {
- cc.Close()
- }
}()
scSet := false
@@ -220,7 +228,7 @@
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
- cc.sc = sc
+ cc.sc = &sc
scSet = true
}
default:
@@ -266,7 +274,7 @@
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
- cc.sc = sc
+ cc.sc = &sc
}
case <-ctx.Done():
return nil, ctx.Err()
@@ -285,6 +293,7 @@
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
ChannelzParentID: cc.channelzID,
+ Target: cc.parsedTarget,
}
// Build the resolver.
@@ -322,6 +331,68 @@
return cc, nil
}
+// chainUnaryClientInterceptors chains all unary client interceptors into one.
+func chainUnaryClientInterceptors(cc *ClientConn) {
+ interceptors := cc.dopts.chainUnaryInts
+ // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
+ // be executed before any other chained interceptors.
+ if cc.dopts.unaryInt != nil {
+ interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
+ }
+ var chainedInt UnaryClientInterceptor
+ if len(interceptors) == 0 {
+ chainedInt = nil
+ } else if len(interceptors) == 1 {
+ chainedInt = interceptors[0]
+ } else {
+ chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
+ return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
+ }
+ }
+ cc.dopts.unaryInt = chainedInt
+}
+
+// getChainUnaryInvoker recursively generates the chained unary invoker.
+func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
+ if curr == len(interceptors)-1 {
+ return finalInvoker
+ }
+ return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
+ return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
+ }
+}
+
+// chainStreamClientInterceptors chains all stream client interceptors into one.
+func chainStreamClientInterceptors(cc *ClientConn) {
+ interceptors := cc.dopts.chainStreamInts
+ // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
+ // be executed before any other chained interceptors.
+ if cc.dopts.streamInt != nil {
+ interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
+ }
+ var chainedInt StreamClientInterceptor
+ if len(interceptors) == 0 {
+ chainedInt = nil
+ } else if len(interceptors) == 1 {
+ chainedInt = interceptors[0]
+ } else {
+ chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
+ return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
+ }
+ }
+ cc.dopts.streamInt = chainedInt
+}
+
+// getChainStreamer recursively generates the chained client stream constructor.
+func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
+ if curr == len(interceptors)-1 {
+ return finalStreamer
+ }
+ return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
+ return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
+ }
+}
+
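
For reference only, not part of the vendored patch: a small sketch of how a caller would combine the existing WithUnaryInterceptor option with the new WithChainUnaryInterceptor option that populates chainUnaryInts; the target address and interceptor tags are hypothetical.

package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
)

// logged returns a unary client interceptor that logs a tag and then calls
// the next invoker in the chain.
func logged(tag string) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{},
		cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		log.Printf("%s: %s", tag, method)
		return invoker(ctx, method, req, reply, cc, opts...)
	}
}

func main() {
	// chainUnaryClientInterceptors above prepends the WithUnaryInterceptor
	// option, so "outer" runs before "second" and "third" on every unary RPC.
	conn, err := grpc.Dial("localhost:50051", // hypothetical target
		grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(logged("outer")),
		grpc.WithChainUnaryInterceptor(logged("second"), logged("third")),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
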
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
@@ -388,14 +459,11 @@
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
- sc ServiceConfig
- scRaw string
+ sc *ServiceConfig
conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
curBalancerName string
- preBalancerName string // previous balancer name.
- curAddresses []resolver.Address
balancerWrapper *ccBalancerWrapper
retryThrottler atomic.Value
@@ -437,8 +505,7 @@
cc.mu.Lock()
// TODO: load balance policy runtime change is ignored.
// We may revisit this decision in the future.
- cc.sc = sc
- cc.scRaw = ""
+ cc.sc = &sc
cc.mu.Unlock()
case <-cc.ctx.Done():
return
@@ -465,48 +532,45 @@
}
}
-func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
+func (cc *ClientConn) updateResolverState(s resolver.State) error {
cc.mu.Lock()
defer cc.mu.Unlock()
+ // Check if the ClientConn is already closed. Some fields (e.g.
+ // balancerWrapper) are set to nil when closing the ClientConn, and could
+ // cause nil pointer panic if we don't have this check.
if cc.conns == nil {
- // cc was closed.
- return
+ return nil
}
- if reflect.DeepEqual(cc.curAddresses, addrs) {
- return
+ if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
+ if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
+ cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
+ }
+ } else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
+ cc.applyServiceConfig(sc)
}
- cc.curAddresses = addrs
- cc.firstResolveEvent.Fire()
-
+ var balCfg serviceconfig.LoadBalancingConfig
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
- var isGRPCLB bool
- for _, a := range addrs {
- if a.Type == resolver.GRPCLB {
- isGRPCLB = true
- break
- }
- }
var newBalancerName string
- if isGRPCLB {
- newBalancerName = grpclbName
+ if cc.sc != nil && cc.sc.lbConfig != nil {
+ newBalancerName = cc.sc.lbConfig.name
+ balCfg = cc.sc.lbConfig.cfg
} else {
- // Address list doesn't contain grpclb address. Try to pick a
- // non-grpclb balancer.
- newBalancerName = cc.curBalancerName
- // If current balancer is grpclb, switch to the previous one.
- if newBalancerName == grpclbName {
- newBalancerName = cc.preBalancerName
+ var isGRPCLB bool
+ for _, a := range s.Addresses {
+ if a.Type == resolver.GRPCLB {
+ isGRPCLB = true
+ break
+ }
}
- // The following could be true in two cases:
- // - the first time handling resolved addresses
- // (curBalancerName="")
- // - the first time handling non-grpclb addresses
- // (curBalancerName="grpclb", preBalancerName="")
- if newBalancerName == "" {
+ if isGRPCLB {
+ newBalancerName = grpclbName
+ } else if cc.sc != nil && cc.sc.LB != nil {
+ newBalancerName = *cc.sc.LB
+ } else {
newBalancerName = PickFirstBalancerName
}
}
@@ -514,10 +578,12 @@
} else if cc.balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
+ cc.curBalancerName = cc.dopts.balancerBuilder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}
- cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
+ cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
+ return nil
}
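
For reference only, a condensed restatement (not part of the patch) of the balancer selection order that updateResolverState implements above; the function name and parameters are invented for illustration.

package sketch

// pickBalancerName condenses the selection order above:
//  1. a balancer installed via a Balancer DialOption always wins (handled
//     before this function would be reached),
//  2. otherwise the service config's loadBalancingConfig entry,
//  3. otherwise grpclb when any resolved address has Type == resolver.GRPCLB,
//  4. otherwise the service config's top-level LB (loadBalancingPolicy),
//  5. otherwise pick_first.
func pickBalancerName(lbConfigName string, hasGRPCLBAddr bool, scLB *string) string {
	switch {
	case lbConfigName != "":
		return lbConfigName
	case hasGRPCLBAddr:
		return "grpclb"
	case scLB != nil && *scLB != "":
		return *scLB
	default:
		return "pick_first"
	}
}
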
// switchBalancer starts the switching from current balancer to the balancer
@@ -529,11 +595,7 @@
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
- if cc.conns == nil {
- return
- }
-
- if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
+ if strings.EqualFold(cc.curBalancerName, name) {
return
}
@@ -542,15 +604,11 @@
grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
return
}
- // TODO(bar switching) change this to two steps: drain and close.
- // Keep track of sc in wrapper.
if cc.balancerWrapper != nil {
cc.balancerWrapper.close()
}
builder := balancer.Get(name)
- // TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
- // we reuse previous one?
if channelz.IsOn() {
if builder == nil {
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
@@ -569,7 +627,6 @@
builder = newPickfirstBuilder()
}
- cc.preBalancerName = cc.curBalancerName
cc.curBalancerName = builder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}
@@ -677,6 +734,8 @@
ac.mu.Unlock()
return nil
}
+ // Update connectivity state within the lock to prevent subsequent or
+ // concurrent calls from resetting the transport more than once.
ac.updateConnectivityState(connectivity.Connecting)
ac.mu.Unlock()
@@ -687,7 +746,16 @@
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
-// It checks whether current connected address of ac is in the new addrs list.
+// If ac is Connecting, it returns false. The caller should tear down the ac and
+// create a new one. Note that the backoff will be reset when this happens.
+//
+// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
+// addresses will be picked up by retry in the next iteration after backoff.
+//
+// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
+//
+// If ac is Ready, it checks whether current connected address of ac is in the
+// new addrs list.
// - If true, it updates ac.addrs and returns true. The ac will keep using
// the existing connection.
// - If false, it does nothing and returns false.
@@ -695,17 +763,18 @@
ac.mu.Lock()
defer ac.mu.Unlock()
grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
- if ac.state == connectivity.Shutdown {
+ if ac.state == connectivity.Shutdown ||
+ ac.state == connectivity.TransientFailure ||
+ ac.state == connectivity.Idle {
ac.addrs = addrs
return true
}
- // Unless we're busy reconnecting already, let's reconnect from the top of
- // the list.
- if ac.state != connectivity.Ready {
+ if ac.state == connectivity.Connecting {
return false
}
+ // ac.state is Ready, try to find the connected address.
var curAddrFound bool
for _, a := range addrs {
if reflect.DeepEqual(ac.curAddr, a) {
@@ -732,6 +801,9 @@
// TODO: Avoid the locking here.
cc.mu.RLock()
defer cc.mu.RUnlock()
+ if cc.sc == nil {
+ return MethodConfig{}
+ }
m, ok := cc.sc.Methods[method]
if !ok {
i := strings.LastIndex(method, "/")
@@ -743,14 +815,15 @@
func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
cc.mu.RLock()
defer cc.mu.RUnlock()
+ if cc.sc == nil {
+ return nil
+ }
return cc.sc.healthCheckConfig
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
- hdr, _ := metadata.FromOutgoingContext(ctx)
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
FullMethodName: method,
- Header: hdr,
})
if err != nil {
return nil, nil, toRPCErr(err)
@@ -758,65 +831,25 @@
return t, done, nil
}
-// handleServiceConfig parses the service config string in JSON format to Go native
-// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
-func (cc *ClientConn) handleServiceConfig(js string) error {
- if cc.dopts.disableServiceConfig {
- return nil
+func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
+ if sc == nil {
+ // should never reach here.
+ return fmt.Errorf("got nil pointer for service config")
}
- if cc.scRaw == js {
- return nil
- }
- if channelz.IsOn() {
- channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
- // The special formatting of \"%s\" instead of %q is to provide nice printing of service config
- // for human consumption.
- Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
- Severity: channelz.CtINFO,
- })
- }
- sc, err := parseServiceConfig(js)
- if err != nil {
- return err
- }
- cc.mu.Lock()
- // Check if the ClientConn is already closed. Some fields (e.g.
- // balancerWrapper) are set to nil when closing the ClientConn, and could
- // cause nil pointer panic if we don't have this check.
- if cc.conns == nil {
- cc.mu.Unlock()
- return nil
- }
- cc.scRaw = js
cc.sc = sc
- if sc.retryThrottling != nil {
+ if cc.sc.retryThrottling != nil {
newThrottler := &retryThrottler{
- tokens: sc.retryThrottling.MaxTokens,
- max: sc.retryThrottling.MaxTokens,
- thresh: sc.retryThrottling.MaxTokens / 2,
- ratio: sc.retryThrottling.TokenRatio,
+ tokens: cc.sc.retryThrottling.MaxTokens,
+ max: cc.sc.retryThrottling.MaxTokens,
+ thresh: cc.sc.retryThrottling.MaxTokens / 2,
+ ratio: cc.sc.retryThrottling.TokenRatio,
}
cc.retryThrottler.Store(newThrottler)
} else {
cc.retryThrottler.Store((*retryThrottler)(nil))
}
- if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
- if cc.curBalancerName == grpclbName {
- // If current balancer is grpclb, there's at least one grpclb
- // balancer address in the resolved list. Don't switch the balancer,
- // but change the previous balancer name, so if a new resolved
- // address list doesn't contain grpclb address, balancer will be
- // switched to *sc.LB.
- cc.preBalancerName = *sc.LB
- } else {
- cc.switchBalancer(*sc.LB)
- cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
- }
- }
-
- cc.mu.Unlock()
return nil
}
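
For reference only, a hedged sketch (not part of the patch) of the service-config fragment that feeds the retryThrottler built in applyServiceConfig above; field names follow the gRPC service config schema, and note that retry support itself may still be gated behind an environment flag in grpc-go of this vintage.

package sketch

// throttledSC is a hypothetical service config fragment enabling retry
// throttling for the channel.
const throttledSC = `{
  "retryThrottling": {
    "maxTokens": 10,
    "tokenRatio": 0.5
  }
}`

// applyServiceConfig above would turn this into a retryThrottler with
// tokens = 10, max = 10, thresh = 5 (maxTokens / 2) and ratio = 0.5.
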
@@ -892,7 +925,7 @@
}
channelz.AddTraceEvent(cc.channelzID, ted)
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
- // the entity beng deleted, and thus prevent it from being deleted right away.
+ // the entity being deleted, and thus prevent it from being deleted right away.
channelz.RemoveEntry(cc.channelzID)
}
return nil
@@ -921,8 +954,6 @@
// Use updateConnectivityState for updating addrConn's connectivity state.
state connectivity.State
- tearDownErr error // The reason this addrConn is torn down.
-
backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
@@ -963,191 +994,147 @@
func (ac *addrConn) resetTransport() {
for i := 0; ; i++ {
- tryNextAddrFromStart := grpcsync.NewEvent()
-
- ac.mu.Lock()
if i > 0 {
ac.cc.resolveNow(resolver.ResolveNowOption{})
}
- addrs := ac.addrs
- backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
- // This will be the duration that dial gets to finish.
- dialDuration := getMinConnectTimeout()
- if dialDuration < backoffFor {
- // Give dial more time as we keep failing to connect.
- dialDuration = backoffFor
- }
- connectDeadline := time.Now().Add(dialDuration)
- ac.mu.Unlock()
-
- addrLoop:
- for _, addr := range addrs {
- ac.mu.Lock()
-
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
- ac.updateConnectivityState(connectivity.Connecting)
- ac.transport = nil
-
- ac.cc.mu.RLock()
- ac.dopts.copts.KeepaliveParams = ac.cc.mkp
- ac.cc.mu.RUnlock()
-
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
-
- copts := ac.dopts.copts
- if ac.scopts.CredsBundle != nil {
- copts.CredsBundle = ac.scopts.CredsBundle
- }
- hctx, hcancel := context.WithCancel(ac.ctx)
- defer hcancel()
- ac.mu.Unlock()
-
- if channelz.IsOn() {
- channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
- Severity: channelz.CtINFO,
- })
- }
-
- reconnect := grpcsync.NewEvent()
- prefaceReceived := make(chan struct{})
- newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
- if err == nil {
- ac.mu.Lock()
- ac.curAddr = addr
- ac.transport = newTr
- ac.mu.Unlock()
-
- healthCheckConfig := ac.cc.healthCheckConfig()
- // LB channel health checking is only enabled when all the four requirements below are met:
- // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
- // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
- // 3. a service config with non-empty healthCheckConfig field is provided,
- // 4. the current load balancer allows it.
- healthcheckManagingState := false
- if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
- if ac.cc.dopts.healthCheckFunc == nil {
- // TODO: add a link to the health check doc in the error message.
- grpclog.Error("the client side LB channel health check function has not been set.")
- } else {
- // TODO(deklerk) refactor to just return transport
- go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
- healthcheckManagingState = true
- }
- }
- if !healthcheckManagingState {
- ac.mu.Lock()
- ac.updateConnectivityState(connectivity.Ready)
- ac.mu.Unlock()
- }
- } else {
- hcancel()
- if err == errConnClosing {
- return
- }
-
- if tryNextAddrFromStart.HasFired() {
- break addrLoop
- }
- continue
- }
-
- backoffFor = 0
- ac.mu.Lock()
- reqHandshake := ac.dopts.reqHandshake
- ac.mu.Unlock()
-
- <-reconnect.Done()
- hcancel()
-
- if reqHandshake == envconfig.RequireHandshakeHybrid {
- // In RequireHandshakeHybrid mode, we must check to see whether
- // server preface has arrived yet to decide whether to start
- // reconnecting at the top of the list (server preface received)
- // or continue with the next addr in the list as if the
- // connection were not successful (server preface not received).
- select {
- case <-prefaceReceived:
- // We received a server preface - huzzah! We consider this
- // a success and restart from the top of the addr list.
- ac.mu.Lock()
- ac.backoffIdx = 0
- ac.mu.Unlock()
- break addrLoop
- default:
- // Despite having set state to READY, in hybrid mode we
- // consider this a failure and continue connecting at the
- // next addr in the list.
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
-
- ac.updateConnectivityState(connectivity.TransientFailure)
- ac.mu.Unlock()
-
- if tryNextAddrFromStart.HasFired() {
- break addrLoop
- }
- }
- } else {
- // In RequireHandshakeOn mode, we would have already waited for
- // the server preface, so we consider this a success and restart
- // from the top of the addr list. In RequireHandshakeOff mode,
- // we don't care to wait for the server preface before
- // considering this a success, so we also restart from the top
- // of the addr list.
- ac.mu.Lock()
- ac.backoffIdx = 0
- ac.mu.Unlock()
- break addrLoop
- }
- }
-
- // After exhausting all addresses, or after need to reconnect after a
- // READY, the addrConn enters TRANSIENT_FAILURE.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
- ac.updateConnectivityState(connectivity.TransientFailure)
- // Backoff.
- b := ac.resetBackoff
- timer := time.NewTimer(backoffFor)
- acctx := ac.ctx
+ addrs := ac.addrs
+ backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
+ // This will be the duration that dial gets to finish.
+ dialDuration := minConnectTimeout
+ if ac.dopts.minConnectTimeout != nil {
+ dialDuration = ac.dopts.minConnectTimeout()
+ }
+
+ if dialDuration < backoffFor {
+ // Give dial more time as we keep failing to connect.
+ dialDuration = backoffFor
+ }
+ // We can potentially spend all the time trying the first address, and
+ // if the server accepts the connection and then hangs, the following
+ // addresses will never be tried.
+ //
+ // The spec doesn't mention what should be done for multiple addresses.
+ // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
+ connectDeadline := time.Now().Add(dialDuration)
+
+ ac.updateConnectivityState(connectivity.Connecting)
+ ac.transport = nil
ac.mu.Unlock()
- select {
- case <-timer.C:
+ newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
+ if err != nil {
+ // After exhausting all addresses, the addrConn enters
+ // TRANSIENT_FAILURE.
ac.mu.Lock()
- ac.backoffIdx++
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+ ac.updateConnectivityState(connectivity.TransientFailure)
+
+ // Backoff.
+ b := ac.resetBackoff
ac.mu.Unlock()
- case <-b:
- timer.Stop()
- case <-acctx.Done():
- timer.Stop()
+
+ timer := time.NewTimer(backoffFor)
+ select {
+ case <-timer.C:
+ ac.mu.Lock()
+ ac.backoffIdx++
+ ac.mu.Unlock()
+ case <-b:
+ timer.Stop()
+ case <-ac.ctx.Done():
+ timer.Stop()
+ return
+ }
+ continue
+ }
+
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ newTr.Close()
return
}
+ ac.curAddr = addr
+ ac.transport = newTr
+ ac.backoffIdx = 0
+
+ hctx, hcancel := context.WithCancel(ac.ctx)
+ ac.startHealthCheck(hctx)
+ ac.mu.Unlock()
+
+ // Block until the created transport is down. And when this happens,
+ // we restart from the top of the addr list.
+ <-reconnect.Done()
+ hcancel()
+ // restart connecting - the top of the loop will set state to
+ // CONNECTING. This is against the current connectivity semantics doc,
+ // however it allows for graceful behavior for RPCs not yet dispatched
+ // - unfortunate timing would otherwise lead to the RPC failing even
+ // though the TRANSIENT_FAILURE state (called for by the doc) would be
+ // instantaneous.
+ //
+ // Ideally we should transition to Idle here and block until there is
+ // RPC activity that leads to the balancer requesting a reconnect of
+ // the associated SubConn.
}
}
-// createTransport creates a connection to one of the backends in addrs. It
-// sets ac.transport in the success case, or it returns an error if it was
-// unable to successfully create a transport.
-//
-// If waitForHandshake is enabled, it blocks until server preface arrives.
-func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
+// tryAllAddrs tries to create a connection to the addresses, and stops at the
+// first successful one. It returns the transport, the address and an Event in
+// the successful case. The Event fires when the returned transport disconnects.
+func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
+ for _, addr := range addrs {
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return nil, resolver.Address{}, nil, errConnClosing
+ }
+
+ ac.cc.mu.RLock()
+ ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+ ac.cc.mu.RUnlock()
+
+ copts := ac.dopts.copts
+ if ac.scopts.CredsBundle != nil {
+ copts.CredsBundle = ac.scopts.CredsBundle
+ }
+ ac.mu.Unlock()
+
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
+ Severity: channelz.CtINFO,
+ })
+ }
+
+ newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
+ if err == nil {
+ return newTr, addr, reconnect, nil
+ }
+ ac.cc.blockingpicker.updateConnectionError(err)
+ }
+
+ // Couldn't connect to any address.
+ return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address")
+}
+
+// createTransport creates a connection to addr. It returns the transport and a
+// Event in the successful case. The Event fires when the returned transport
+// disconnects.
+func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
+ prefaceReceived := make(chan struct{})
onCloseCalled := make(chan struct{})
+ reconnect := grpcsync.NewEvent()
target := transport.TargetInfo{
Addr: addr.Addr,
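
For reference only, a sketch (not part of the patch) restating how resetTransport above derives the per-attempt dial deadline: the connect window is the larger of the minimum connect timeout (20 seconds by default in this file, unless overridden via the internal minConnectTimeout dial option) and the current backoff interval.

package sketch

import "time"

// connectWindow mirrors the dialDuration logic in resetTransport above:
// dialing gets more time as consecutive attempts keep failing.
func connectWindow(minConnectTimeout, backoffFor time.Duration) time.Time {
	dialDuration := minConnectTimeout
	if backoffFor > dialDuration {
		// Give dial more time as we keep failing to connect.
		dialDuration = backoffFor
	}
	return time.Now().Add(dialDuration)
}
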
@@ -1155,24 +1142,41 @@
Authority: ac.cc.authority,
}
- prefaceTimer := time.NewTimer(time.Until(connectDeadline))
-
+ once := sync.Once{}
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
+ once.Do(func() {
+ if ac.state == connectivity.Ready {
+ // Prevent this SubConn from being used for new RPCs by setting its
+ // state to Connecting.
+ //
+ // TODO: this should be Idle when grpc-go properly supports it.
+ ac.updateConnectivityState(connectivity.Connecting)
+ }
+ })
ac.mu.Unlock()
reconnect.Fire()
}
onClose := func() {
+ ac.mu.Lock()
+ once.Do(func() {
+ if ac.state == connectivity.Ready {
+ // Prevent this SubConn from being used for new RPCs by setting its
+ // state to Connecting.
+ //
+ // TODO: this should be Idle when grpc-go properly supports it.
+ ac.updateConnectivityState(connectivity.Connecting)
+ }
+ })
+ ac.mu.Unlock()
close(onCloseCalled)
- prefaceTimer.Stop()
reconnect.Fire()
}
onPrefaceReceipt := func() {
close(prefaceReceived)
- prefaceTimer.Stop()
}
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
@@ -1182,107 +1186,105 @@
}
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
-
- if err == nil {
- if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
- select {
- case <-prefaceTimer.C:
- // We didn't get the preface in time.
- newTr.Close()
- err = errors.New("timed out waiting for server handshake")
- case <-prefaceReceived:
- // We got the preface - huzzah! things are good.
- case <-onCloseCalled:
- // The transport has already closed - noop.
- return nil, errors.New("connection closed")
- }
- } else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
- go func() {
- select {
- case <-prefaceTimer.C:
- // We didn't get the preface in time.
- newTr.Close()
- case <-prefaceReceived:
- // We got the preface just in the nick of time - huzzah!
- case <-onCloseCalled:
- // The transport has already closed - noop.
- }
- }()
- }
- }
-
if err != nil {
// newTr is either nil, or closed.
- ac.cc.blockingpicker.updateConnectionError(err)
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- // ac.tearDown(...) has been invoked.
- ac.mu.Unlock()
-
- return nil, errConnClosing
- }
- ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
- return nil, err
+ return nil, nil, err
}
- // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
+ select {
+ case <-time.After(connectDeadline.Sub(time.Now())):
+ // We didn't get the preface in time.
newTr.Close()
- return nil, errConnClosing
+ grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
+ return nil, nil, errors.New("timed out waiting for server handshake")
+ case <-prefaceReceived:
+ // We got the preface - huzzah! things are good.
+ case <-onCloseCalled:
+ // The transport has already closed - noop.
+ return nil, nil, errors.New("connection closed")
+ // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
- ac.mu.Unlock()
-
- // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- newTr.Close()
- return nil, errConnClosing
- }
- ac.mu.Unlock()
-
- return newTr, nil
+ return newTr, reconnect, nil
}
-func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
- // Set up the health check helper functions
- newStream := func() (interface{}, error) {
- return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
+// startHealthCheck starts the health checking stream (RPC) to watch the health
+// stats of this connection if health checking is requested and configured.
+//
+// LB channel health checking is enabled when all requirements below are met:
+// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
+// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
+// 3. a service config with non-empty healthCheckConfig field is provided
+// 4. the load balancer requests it
+//
+// It sets addrConn to READY if the health checking stream is not started.
+//
+// Caller must hold ac.mu.
+func (ac *addrConn) startHealthCheck(ctx context.Context) {
+ var healthcheckManagingState bool
+ defer func() {
+ if !healthcheckManagingState {
+ ac.updateConnectivityState(connectivity.Ready)
+ }
+ }()
+
+ if ac.cc.dopts.disableHealthCheck {
+ return
}
- firstReady := true
- reportHealth := func(ok bool) {
+ healthCheckConfig := ac.cc.healthCheckConfig()
+ if healthCheckConfig == nil {
+ return
+ }
+ if !ac.scopts.HealthCheckEnabled {
+ return
+ }
+ healthCheckFunc := ac.cc.dopts.healthCheckFunc
+ if healthCheckFunc == nil {
+ // The health check function is not set because the health package has not been imported.
+ //
+ // TODO: add a link to the health check doc in the error message.
+ grpclog.Error("Health check is requested but health check function is not set.")
+ return
+ }
+
+ healthcheckManagingState = true
+
+ // Set up the health check helper functions.
+ currentTr := ac.transport
+ newStream := func(method string) (interface{}, error) {
+ ac.mu.Lock()
+ if ac.transport != currentTr {
+ ac.mu.Unlock()
+ return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
+ }
+ ac.mu.Unlock()
+ return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
+ }
+ setConnectivityState := func(s connectivity.State) {
ac.mu.Lock()
defer ac.mu.Unlock()
- if ac.transport != newTr {
+ if ac.transport != currentTr {
return
}
- if ok {
- if firstReady {
- firstReady = false
- ac.curAddr = addr
- }
- ac.updateConnectivityState(connectivity.Ready)
- } else {
- ac.updateConnectivityState(connectivity.TransientFailure)
- }
+ ac.updateConnectivityState(s)
}
- err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
- if err != nil {
- if status.Code(err) == codes.Unimplemented {
- if channelz.IsOn() {
- channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
- Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
- Severity: channelz.CtError,
- })
+ // Start the health checking stream.
+ go func() {
+ err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
+ if err != nil {
+ if status.Code(err) == codes.Unimplemented {
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+ Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
+ Severity: channelz.CtError,
+ })
+ }
+ grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
+ } else {
+ grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
}
- grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
- } else {
- grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
}
- }
+ }()
}
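
For reference only, a hedged sketch (not part of the patch) of client code satisfying the four requirements listed in startHealthCheck's doc comment above. It assumes, as in upstream grpc-go of this vintage, that a blank import of google.golang.org/grpc/health registers the client-side health check function and that round_robin opts its subconns into health checking; the target address and service name are hypothetical.

package main

import (
	"log"

	"google.golang.org/grpc"
	// Registers round_robin, whose base config enables per-subconn health
	// checking (requirement 4 above).
	_ "google.golang.org/grpc/balancer/roundrobin"
	// Blank import assumed to set the client health check function
	// (requirement 2 above).
	_ "google.golang.org/grpc/health"
)

func main() {
	// Hypothetical config satisfying requirement 3: a non-empty
	// healthCheckConfig naming the service to watch.
	const sc = `{
	  "loadBalancingPolicy": "round_robin",
	  "healthCheckConfig": {"serviceName": "example.Echo"}
	}`

	conn, err := grpc.Dial("localhost:50051", // hypothetical target
		grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(sc),
		// Requirement 1: do not pass grpc.WithDisableHealthCheck().
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
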
func (ac *addrConn) resetConnectBackoff() {
@@ -1332,7 +1334,6 @@
// between setting the state and logic that waits on context cancelation / etc.
ac.updateConnectivityState(connectivity.Shutdown)
ac.cancel()
- ac.tearDownErr = err
ac.curAddr = resolver.Address{}
if err == errConnDrain && curTr != nil {
// GracefulClose(...) may be executed multiple times when