[VOL-4290] Voltha go library updates for gRPC migration
Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index a7643df..4414ba8 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -31,7 +31,7 @@
"time"
"google.golang.org/grpc/balancer"
- _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
+ "google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
@@ -42,10 +42,12 @@
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
- _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
- _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
+
+ _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
+ _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
+ _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
)
const (
@@ -186,11 +188,11 @@
}
if cc.dopts.defaultServiceConfigRawJSON != nil {
- sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
- if err != nil {
- return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
+ scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
+ if scpr.Err != nil {
+ return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
}
- cc.dopts.defaultServiceConfig = sc
+ cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
}
cc.mkp = cc.dopts.copts.KeepaliveParams
@@ -235,9 +237,7 @@
}
}
if cc.dopts.bs == nil {
- cc.dopts.bs = backoff.Exponential{
- MaxDelay: DefaultBackoffConfig.MaxDelay,
- }
+ cc.dopts.bs = backoff.DefaultExponential
}
if cc.dopts.resolverBuilder == nil {
// Only try to parse target when resolver builder is not already set.
@@ -443,7 +443,18 @@
return csm.notifyChan
}
-// ClientConn represents a client connection to an RPC server.
+// ClientConn represents a virtual connection to a conceptual endpoint, to
+// perform RPCs.
+//
+// A ClientConn is free to have zero or more actual connections to the endpoint
+// based on configuration, load, etc. It is also free to determine which actual
+// endpoints to use and may change it every RPC, permitting client-side load
+// balancing.
+//
+// A ClientConn encapsulates a range of functionality including name
+// resolution, TCP connection establishment (with retries and backoff) and TLS
+// handshakes. It also handles errors on established connections by
+// re-resolving the name and reconnecting.
type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
@@ -532,58 +543,104 @@
}
}
-func (cc *ClientConn) updateResolverState(s resolver.State) error {
+var emptyServiceConfig *ServiceConfig
+
+func init() {
+ cfg := parseServiceConfig("{}")
+ if cfg.Err != nil {
+ panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
+ }
+ emptyServiceConfig = cfg.Config.(*ServiceConfig)
+}
+
+func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
+ if cc.sc != nil {
+ cc.applyServiceConfigAndBalancer(cc.sc, addrs)
+ return
+ }
+ if cc.dopts.defaultServiceConfig != nil {
+ cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
+ } else {
+ cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
+ }
+}
+
+func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
+ defer cc.firstResolveEvent.Fire()
cc.mu.Lock()
- defer cc.mu.Unlock()
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
// cause nil pointer panic if we don't have this check.
if cc.conns == nil {
+ cc.mu.Unlock()
return nil
}
- if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
- if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
- cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
+ if err != nil {
+ // May need to apply the initial service config in case the resolver
+ // doesn't support service configs, or doesn't provide a service config
+ // with the new addresses.
+ cc.maybeApplyDefaultServiceConfig(nil)
+
+ if cc.balancerWrapper != nil {
+ cc.balancerWrapper.resolverError(err)
}
- } else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
- cc.applyServiceConfig(sc)
+
+ // No addresses are valid with err set; return early.
+ cc.mu.Unlock()
+ return balancer.ErrBadResolverState
+ }
+
+ var ret error
+ if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
+ cc.maybeApplyDefaultServiceConfig(s.Addresses)
+ // TODO: do we need to apply a failing LB policy if there is no
+ // default, per the error handling design?
+ } else {
+ if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
+ cc.applyServiceConfigAndBalancer(sc, s.Addresses)
+ } else {
+ ret = balancer.ErrBadResolverState
+ if cc.balancerWrapper == nil {
+ var err error
+ if s.ServiceConfig.Err != nil {
+ err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
+ } else {
+ err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
+ }
+ cc.blockingpicker.updatePicker(base.NewErrPicker(err))
+ cc.csMgr.updateState(connectivity.TransientFailure)
+ cc.mu.Unlock()
+ return ret
+ }
+ }
}
var balCfg serviceconfig.LoadBalancingConfig
- if cc.dopts.balancerBuilder == nil {
- // Only look at balancer types and switch balancer if balancer dial
- // option is not set.
- var newBalancerName string
- if cc.sc != nil && cc.sc.lbConfig != nil {
- newBalancerName = cc.sc.lbConfig.name
- balCfg = cc.sc.lbConfig.cfg
- } else {
- var isGRPCLB bool
- for _, a := range s.Addresses {
- if a.Type == resolver.GRPCLB {
- isGRPCLB = true
- break
- }
- }
- if isGRPCLB {
- newBalancerName = grpclbName
- } else if cc.sc != nil && cc.sc.LB != nil {
- newBalancerName = *cc.sc.LB
- } else {
- newBalancerName = PickFirstBalancerName
- }
- }
- cc.switchBalancer(newBalancerName)
- } else if cc.balancerWrapper == nil {
- // Balancer dial option was set, and this is the first time handling
- // resolved addresses. Build a balancer with dopts.balancerBuilder.
- cc.curBalancerName = cc.dopts.balancerBuilder.Name()
- cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
+ if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
+ balCfg = cc.sc.lbConfig.cfg
}
- cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
- return nil
+ cbn := cc.curBalancerName
+ bw := cc.balancerWrapper
+ cc.mu.Unlock()
+ if cbn != grpclbName {
+ // Filter any grpclb addresses since we don't have the grpclb balancer.
+ for i := 0; i < len(s.Addresses); {
+ if s.Addresses[i].Type == resolver.GRPCLB {
+ copy(s.Addresses[i:], s.Addresses[i+1:])
+ s.Addresses = s.Addresses[:len(s.Addresses)-1]
+ continue
+ }
+ i++
+ }
+ }
+ uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
+ if ret == nil {
+ ret = uccsErr // prefer ErrBadResolverState since any other error is
+ // currently meaningless to the caller.
+ }
+ return ret
}
// switchBalancer starts the switching from current balancer to the balancer
@@ -831,10 +888,10 @@
return t, done, nil
}
-func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
+func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
if sc == nil {
// should never reach here.
- return fmt.Errorf("got nil pointer for service config")
+ return
}
cc.sc = sc
@@ -850,7 +907,35 @@
cc.retryThrottler.Store((*retryThrottler)(nil))
}
- return nil
+ if cc.dopts.balancerBuilder == nil {
+ // Only look at balancer types and switch balancer if balancer dial
+ // option is not set.
+ var newBalancerName string
+ if cc.sc != nil && cc.sc.lbConfig != nil {
+ newBalancerName = cc.sc.lbConfig.name
+ } else {
+ var isGRPCLB bool
+ for _, a := range addrs {
+ if a.Type == resolver.GRPCLB {
+ isGRPCLB = true
+ break
+ }
+ }
+ if isGRPCLB {
+ newBalancerName = grpclbName
+ } else if cc.sc != nil && cc.sc.LB != nil {
+ newBalancerName = *cc.sc.LB
+ } else {
+ newBalancerName = PickFirstBalancerName
+ }
+ }
+ cc.switchBalancer(newBalancerName)
+ } else if cc.balancerWrapper == nil {
+ // Balancer dial option was set, and this is the first time handling
+ // resolved addresses. Build a balancer with dopts.balancerBuilder.
+ cc.curBalancerName = cc.dopts.balancerBuilder.Name()
+ cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
+ }
}
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
@@ -875,8 +960,9 @@
// This API is EXPERIMENTAL.
func (cc *ClientConn) ResetConnectBackoff() {
cc.mu.Lock()
- defer cc.mu.Unlock()
- for ac := range cc.conns {
+ conns := cc.conns
+ cc.mu.Unlock()
+ for ac := range conns {
ac.resetConnectBackoff()
}
}
@@ -1136,10 +1222,16 @@
onCloseCalled := make(chan struct{})
reconnect := grpcsync.NewEvent()
+ authority := ac.cc.authority
+ // addr.ServerName takes precedence over ClientConn authority, if present.
+ if addr.ServerName != "" {
+ authority = addr.ServerName
+ }
+
target := transport.TargetInfo{
Addr: addr.Addr,
Metadata: addr.Metadata,
- Authority: ac.cc.authority,
+ Authority: authority,
}
once := sync.Once{}
@@ -1331,7 +1423,7 @@
curTr := ac.transport
ac.transport = nil
// We have to set the state to Shutdown before anything else to prevent races
- // between setting the state and logic that waits on context cancelation / etc.
+ // between setting the state and logic that waits on context cancellation / etc.
ac.updateConnectivityState(connectivity.Shutdown)
ac.cancel()
ac.curAddr = resolver.Address{}
@@ -1355,7 +1447,7 @@
},
})
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
- // the entity beng deleted, and thus prevent it from being deleted right away.
+ // the entity being deleted, and thus prevent it from being deleted right away.
channelz.RemoveEntry(ac.channelzID)
}
ac.mu.Unlock()