[VOL-5291] On demand statistics for ONU and OLT

Change-Id: I4850bb0f0d2235122cb0c1bcf835b3672bb34436
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
index 2c47cd5..b408b36 100644
--- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
+++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
@@ -19,149 +19,204 @@
 package grpc
 
 import (
-	"fmt"
+	"context"
 	"strings"
 	"sync"
 
 	"google.golang.org/grpc/balancer"
-	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/grpcsync"
+	"google.golang.org/grpc/internal/pretty"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/serviceconfig"
 )
 
+// resolverStateUpdater wraps the single method used by ccResolverWrapper to
+// report a state update from the actual resolver implementation.
+type resolverStateUpdater interface {
+	updateResolverState(s resolver.State, err error) error
+}
+
 // ccResolverWrapper is a wrapper on top of cc for resolvers.
 // It implements resolver.ClientConn interface.
 type ccResolverWrapper struct {
-	cc         *ClientConn
-	resolverMu sync.Mutex
-	resolver   resolver.Resolver
-	done       *grpcsync.Event
-	curState   resolver.State
+	// The following fields are initialized when the wrapper is created and are
+	// read-only afterwards, and therefore can be accessed without a mutex.
+	cc                  resolverStateUpdater
+	channelzID          *channelz.Identifier
+	ignoreServiceConfig bool
+	opts                ccResolverWrapperOpts
+	serializer          *grpcsync.CallbackSerializer // To serialize all incoming calls.
+	serializerCancel    context.CancelFunc           // To close the serializer, accessed only from close().
 
-	incomingMu sync.Mutex // Synchronizes all the incoming calls.
+	// All incoming (resolver --> gRPC) calls are guaranteed to execute in a
+	// mutually exclusive manner as they are scheduled on the serializer.
+	// Fields accessed *only* in these serializer callbacks, can therefore be
+	// accessed without a mutex.
+	curState resolver.State
+
+	// mu guards access to the below fields.
+	mu       sync.Mutex
+	closed   bool
+	resolver resolver.Resolver // Accessed only from outgoing calls.
+}
+
+// ccResolverWrapperOpts wraps the arguments to be passed when creating a new
+// ccResolverWrapper.
+type ccResolverWrapperOpts struct {
+	target     resolver.Target       // User specified dial target to resolve.
+	builder    resolver.Builder      // Resolver builder to use.
+	bOpts      resolver.BuildOptions // Resolver build options to use.
+	channelzID *channelz.Identifier  // Channelz identifier for the channel.
 }
 
 // newCCResolverWrapper uses the resolver.Builder to build a Resolver and
 // returns a ccResolverWrapper object which wraps the newly built resolver.
-func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
+func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (*ccResolverWrapper, error) {
+	ctx, cancel := context.WithCancel(context.Background())
 	ccr := &ccResolverWrapper{
-		cc:   cc,
-		done: grpcsync.NewEvent(),
+		cc:                  cc,
+		channelzID:          opts.channelzID,
+		ignoreServiceConfig: opts.bOpts.DisableServiceConfig,
+		opts:                opts,
+		serializer:          grpcsync.NewCallbackSerializer(ctx),
+		serializerCancel:    cancel,
 	}
 
-	var credsClone credentials.TransportCredentials
-	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
-		credsClone = creds.Clone()
-	}
-	rbo := resolver.BuildOptions{
-		DisableServiceConfig: cc.dopts.disableServiceConfig,
-		DialCreds:            credsClone,
-		CredsBundle:          cc.dopts.copts.CredsBundle,
-		Dialer:               cc.dopts.copts.Dialer,
-	}
-
-	var err error
-	// We need to hold the lock here while we assign to the ccr.resolver field
-	// to guard against a data race caused by the following code path,
-	// rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
-	// accessing ccr.resolver which is being assigned here.
-	ccr.resolverMu.Lock()
-	defer ccr.resolverMu.Unlock()
-	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
+	// Cannot hold the lock at build time because the resolver can send an
+	// update or error inline and these incoming calls grab the lock to schedule
+	// a callback in the serializer.
+	r, err := opts.builder.Build(opts.target, ccr, opts.bOpts)
 	if err != nil {
+		cancel()
 		return nil, err
 	}
+
+	// Any error reported by the resolver at build time that leads to a
+	// re-resolution request from the balancer is dropped by grpc until we
+	// return from this function. So, we don't have to handle pending resolveNow
+	// requests here.
+	ccr.mu.Lock()
+	ccr.resolver = r
+	ccr.mu.Unlock()
+
 	return ccr, nil
 }
 
 func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
-	ccr.resolverMu.Lock()
-	if !ccr.done.HasFired() {
-		ccr.resolver.ResolveNow(o)
+	ccr.mu.Lock()
+	defer ccr.mu.Unlock()
+
+	// ccr.resolver field is set only after the call to Build() returns. But in
+	// the process of building, the resolver may send an error update which when
+	// propagated to the balancer may result in a re-resolution request.
+	if ccr.closed || ccr.resolver == nil {
+		return
 	}
-	ccr.resolverMu.Unlock()
+	ccr.resolver.ResolveNow(o)
 }
 
 func (ccr *ccResolverWrapper) close() {
-	ccr.resolverMu.Lock()
-	ccr.resolver.Close()
-	ccr.done.Fire()
-	ccr.resolverMu.Unlock()
+	ccr.mu.Lock()
+	if ccr.closed {
+		ccr.mu.Unlock()
+		return
+	}
+
+	channelz.Info(logger, ccr.channelzID, "Closing the name resolver")
+
+	// Close the serializer to ensure that no more calls from the resolver are
+	// handled, before actually closing the resolver.
+	ccr.serializerCancel()
+	ccr.closed = true
+	r := ccr.resolver
+	ccr.mu.Unlock()
+
+	// Give enqueued callbacks a chance to finish.
+	<-ccr.serializer.Done
+
+	// Spawn a goroutine to close the resolver (since it may block trying to
+	// cleanup all allocated resources) and return early.
+	go r.Close()
 }
 
+// serializerScheduleLocked is a convenience method to schedule a function to be
+// run on the serializer while holding ccr.mu.
+func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) {
+	ccr.mu.Lock()
+	ccr.serializer.Schedule(f)
+	ccr.mu.Unlock()
+}
+
+// UpdateState is called by resolver implementations to report new state to gRPC
+// which includes addresses and service config.
 func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
-	ccr.incomingMu.Lock()
-	defer ccr.incomingMu.Unlock()
-	if ccr.done.HasFired() {
+	errCh := make(chan error, 1)
+	ok := ccr.serializer.Schedule(func(context.Context) {
+		ccr.addChannelzTraceEvent(s)
+		ccr.curState = s
+		if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
+			errCh <- balancer.ErrBadResolverState
+			return
+		}
+		errCh <- nil
+	})
+	if !ok {
+		// The only time when Schedule() fail to add the callback to the
+		// serializer is when the serializer is closed, and this happens only
+		// when the resolver wrapper is closed.
 		return nil
 	}
-	channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
-	if channelz.IsOn() {
-		ccr.addChannelzTraceEvent(s)
-	}
-	ccr.curState = s
-	if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
-		return balancer.ErrBadResolverState
-	}
-	return nil
+	return <-errCh
 }
 
+// ReportError is called by resolver implementations to report errors
+// encountered during name resolution to gRPC.
 func (ccr *ccResolverWrapper) ReportError(err error) {
-	ccr.incomingMu.Lock()
-	defer ccr.incomingMu.Unlock()
-	if ccr.done.HasFired() {
-		return
-	}
-	channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
-	ccr.cc.updateResolverState(resolver.State{}, err)
+	ccr.serializerScheduleLocked(func(_ context.Context) {
+		channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
+		ccr.cc.updateResolverState(resolver.State{}, err)
+	})
 }
 
-// NewAddress is called by the resolver implementation to send addresses to gRPC.
+// NewAddress is called by the resolver implementation to send addresses to
+// gRPC.
 func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
-	ccr.incomingMu.Lock()
-	defer ccr.incomingMu.Unlock()
-	if ccr.done.HasFired() {
-		return
-	}
-	channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending new addresses to cc: %v", addrs)
-	if channelz.IsOn() {
+	ccr.serializerScheduleLocked(func(_ context.Context) {
 		ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
-	}
-	ccr.curState.Addresses = addrs
-	ccr.cc.updateResolverState(ccr.curState, nil)
+		ccr.curState.Addresses = addrs
+		ccr.cc.updateResolverState(ccr.curState, nil)
+	})
 }
 
 // NewServiceConfig is called by the resolver implementation to send service
 // configs to gRPC.
 func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
-	ccr.incomingMu.Lock()
-	defer ccr.incomingMu.Unlock()
-	if ccr.done.HasFired() {
-		return
-	}
-	channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %v", sc)
-	if ccr.cc.dopts.disableServiceConfig {
-		channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config")
-		return
-	}
-	scpr := parseServiceConfig(sc)
-	if scpr.Err != nil {
-		channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
-		return
-	}
-	if channelz.IsOn() {
+	ccr.serializerScheduleLocked(func(_ context.Context) {
+		channelz.Infof(logger, ccr.channelzID, "ccResolverWrapper: got new service config: %s", sc)
+		if ccr.ignoreServiceConfig {
+			channelz.Info(logger, ccr.channelzID, "Service config lookups disabled; ignoring config")
+			return
+		}
+		scpr := parseServiceConfig(sc)
+		if scpr.Err != nil {
+			channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
+			return
+		}
 		ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
-	}
-	ccr.curState.ServiceConfig = scpr
-	ccr.cc.updateResolverState(ccr.curState, nil)
+		ccr.curState.ServiceConfig = scpr
+		ccr.cc.updateResolverState(ccr.curState, nil)
+	})
 }
 
+// ParseServiceConfig is called by resolver implementations to parse a JSON
+// representation of the service config.
 func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
 	return parseServiceConfig(scJSON)
 }
 
+// addChannelzTraceEvent adds a channelz trace event containing the new
+// state received from resolver implementations.
 func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
 	var updates []string
 	var oldSC, newSC *ServiceConfig
@@ -180,8 +235,5 @@
 	} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
 		updates = append(updates, "resolver returned new addresses")
 	}
-	channelz.AddTraceEvent(logger, ccr.cc.channelzID, 0, &channelz.TraceEventDesc{
-		Desc:     fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
-		Severity: channelz.CtInfo,
-	})
+	channelz.Infof(logger, ccr.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
 }