[VOL-4442] gRPC streaming connection monitoring
Change-Id: I8a361473a252f6d2b64578a97980b2b7b3618f55
diff --git a/cmd/openonu-adapter/main.go b/cmd/openonu-adapter/main.go
index 96bfabe..edc5088 100644
--- a/cmd/openonu-adapter/main.go
+++ b/cmd/openonu-adapter/main.go
@@ -38,9 +38,7 @@
"github.com/opencord/voltha-lib-go/v7/pkg/probe"
"github.com/opencord/voltha-lib-go/v7/pkg/version"
"github.com/opencord/voltha-protos/v5/go/adapter_service"
- "github.com/opencord/voltha-protos/v5/go/common"
"github.com/opencord/voltha-protos/v5/go/core_service"
- "github.com/opencord/voltha-protos/v5/go/health"
"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc"
@@ -60,14 +58,15 @@
type adapter struct {
//defaultAppName string
- instanceID string
- config *config.AdapterFlags
- kafkaClient kafka.Client
- kvClient kvstore.Client
- eventProxy eventif.EventProxy
- grpcServer *vgrpc.GrpcServer
- onuAdapter *ac.OpenONUAC
- coreClient *vgrpc.Client
+ instanceID string
+ config *config.AdapterFlags
+ kafkaClient kafka.Client
+ kvClient kvstore.Client
+ eventProxy eventif.EventProxy
+ grpcServer *vgrpc.GrpcServer // gRPC server on which the adapter and ONU inter-adapter services are registered
+ onuAdapter *ac.OpenONUAC // registered as the adapter service on grpcServer
+ onuInterAdapter *ac.OpenONUACInterAdapter // registered as the ONU inter-adapter service on grpcServer
+ coreClient *vgrpc.Client // gRPC client created against config.CoreEndpoint
}
func newAdapter(cf *config.AdapterFlags) *adapter {
@@ -133,15 +132,21 @@
if a.coreClient, err = vgrpc.NewClient(
a.config.AdapterEndpoint,
a.config.CoreEndpoint,
+ "core_service.CoreService",
a.coreRestarted); err != nil {
logger.Fatal(ctx, "grpc-client-not-created")
}
// Start the core grpc client
- go a.coreClient.Start(ctx, setAndTestCoreServiceHandler)
+ go a.coreClient.Start(ctx, getCoreServiceClientHandler)
// Create the open ONU interface adapter
- if a.onuAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
- logger.Fatalw(ctx, "error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
+ if a.onuAdapter, err = a.startONUAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
+ logger.Fatalw(ctx, "error-starting-startONUAdapter", log.Fields{"error": err})
+ }
+
+ // Create the open ONU Inter adapter
+ if a.onuInterAdapter, err = a.startONUInterAdapter(ctx, a.onuAdapter); err != nil {
+ logger.Fatalw(ctx, "error-starting-startONUInterAdapter", log.Fields{"error": err})
}
// Create and start the grpc server
@@ -151,7 +156,7 @@
a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
//Register the onu inter adapter service
- a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuAdapter)
+ a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuInterAdapter)
go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
@@ -171,16 +176,22 @@
return nil
}
-// setAndTestCoreServiceHandler is used to test whether the remote gRPC service is up
-func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
- svc := core_service.NewCoreServiceClient(conn)
- if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
+// getCoreServiceClientHandler returns a core service client bound to conn, or nil when conn is nil
+func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+ if conn == nil {
return nil
}
- return svc
+ return core_service.NewCoreServiceClient(conn)
}
func (a *adapter) stop(ctx context.Context) {
+ // Cleanup the grpc services first
+ if err := a.onuAdapter.Stop(ctx); err != nil {
+ logger.Errorw(ctx, "failure-stopping-onu-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
+ }
+ if err := a.onuInterAdapter.Stop(ctx); err != nil {
+ logger.Errorw(ctx, "failure-stopping-onu-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
+ }
// Cleanup - applies only if we had a kvClient
if a.kvClient != nil {
// Release all reservations
@@ -248,7 +259,7 @@
return nil
}
-func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
+func (a *adapter) startONUAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
var err error
sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
@@ -262,6 +273,19 @@
return sAcONU, nil
}
+// startONUInterAdapter creates and starts the ONU inter-adapter for onuA; a start failure is returned to the caller.
+func (a *adapter) startONUInterAdapter(ctx context.Context, onuA *ac.OpenONUAC) (*ac.OpenONUACInterAdapter, error) {
+	sAcONUInterAdapter := ac.NewOpenONUACAdapter(ctx, onuA)
+	if err := sAcONUInterAdapter.Start(ctx); err != nil {
+		// Errorw rather than Fatalw: Fatalw terminates the process, which would make
+		// the error return below unreachable. The caller decides whether this is fatal.
+		logger.Errorw(ctx, "error-starting-OpenONUACInterAdapter", log.Fields{"error": err})
+		return nil, err
+	}
+
+	logger.Info(ctx, "OpenONUACInterAdapter-started")
+	return sAcONUInterAdapter, nil
+}
+
func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")