[VOL-4442] grpc streaming connection monitoring
Change-Id: I435a03fdc0ac2b549dc4512220148cb19c16db19
diff --git a/cmd/openolt-adapter/main.go b/cmd/openolt-adapter/main.go
index 434fc83..cffd2ab 100644
--- a/cmd/openolt-adapter/main.go
+++ b/cmd/openolt-adapter/main.go
@@ -38,10 +38,8 @@
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
ac "github.com/opencord/voltha-openolt-adapter/internal/pkg/core"
"github.com/opencord/voltha-protos/v5/go/adapter_service"
- "github.com/opencord/voltha-protos/v5/go/common"
ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
"github.com/opencord/voltha-protos/v5/go/core_service"
- "github.com/opencord/voltha-protos/v5/go/health"
"github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc"
@@ -55,16 +53,17 @@
)
type adapter struct {
- instanceID string
- config *config.AdapterFlags
- grpcServer *vgrpc.GrpcServer
- oltAdapter *ac.OpenOLT
- kafkaClient kafka.Client
- kvClient kvstore.Client
- coreClient *vgrpc.Client
- eventProxy eventif.EventProxy
- halted bool
- exitChannel chan int
+ instanceID string
+ config *config.AdapterFlags
+ grpcServer *vgrpc.GrpcServer
+ oltAdapter *ac.OpenOLT
+ oltInterAdapter *ac.OpenOLTInterAdapter
+ kafkaClient kafka.Client
+ kvClient kvstore.Client
+ coreClient *vgrpc.Client
+ eventProxy eventif.EventProxy
+ halted bool
+ exitChannel chan int
}
func newAdapter(cf *config.AdapterFlags) *adapter {
@@ -133,17 +132,23 @@
if a.coreClient, err = vgrpc.NewClient(
a.config.AdapterEndpoint,
a.config.CoreEndpoint,
+ "core_service.CoreService",
a.coreRestarted); err != nil {
logger.Fatal(ctx, "grpc-client-not-created")
}
// Start the core grpc client
- go a.coreClient.Start(ctx, setAndTestCoreServiceHandler)
+ go a.coreClient.Start(ctx, getCoreServiceClientHandler)
// Create the open OLT adapter
if a.oltAdapter, err = a.startOpenOLT(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
logger.Fatalw(ctx, "error-starting-openolt", log.Fields{"error": err})
}
+ // Create the open OLT Inter adapter adapter
+ if a.oltInterAdapter, err = a.startOpenOLTInterAdapter(ctx, a.oltAdapter); err != nil {
+ logger.Fatalw(ctx, "error-starting-openolt-inter-adapter", log.Fields{"error": err})
+ }
+
// Create and start the grpc server
a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
@@ -151,7 +156,7 @@
a.addAdapterService(ctx, a.grpcServer, a.oltAdapter)
//Register the olt inter-adapter service
- a.addOltInterAdapterService(ctx, a.grpcServer, a.oltAdapter)
+ a.addOltInterAdapterService(ctx, a.grpcServer, a.oltInterAdapter)
// Start the grpc server
go a.startGRPCService(ctx, a.grpcServer, oltAdapterService)
@@ -171,13 +176,12 @@
return nil
}
-// setAndTestCoreServiceHandler is used to test whether the remote gRPC service is up
-func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
- svc := core_service.NewCoreServiceClient(conn)
- if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
+// getCoreServiceClientHandler is used to test whether the remote gRPC service is up
+func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+ if conn == nil {
return nil
}
- return svc
+ return core_service.NewCoreServiceClient(conn)
}
/**
@@ -239,6 +243,14 @@
// send exit signal
a.exitChannel <- 0
+ // Stop all grpc processing
+ if err := a.oltAdapter.Stop(ctx); err != nil {
+ logger.Errorw(ctx, "failure-stopping-olt-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
+ }
+ if err := a.oltInterAdapter.Stop(ctx); err != nil {
+ logger.Errorw(ctx, "failure-stopping-olt-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
+ }
+
// Cleanup - applies only if we had a kvClient
if a.kvClient != nil {
// Release all reservations
@@ -346,6 +358,19 @@
return sOLT, nil
}
+func (a *adapter) startOpenOLTInterAdapter(ctx context.Context, oo *ac.OpenOLT) (*ac.OpenOLTInterAdapter, error) {
+ logger.Info(ctx, "starting-open-olt-inter-adapter")
+ var err error
+ sOLTInterAdapter := ac.NewOpenOLTInterAdapter(oo)
+
+ if err = sOLTInterAdapter.Start(ctx); err != nil {
+ return nil, err
+ }
+
+ logger.Info(ctx, "open-olt-inter-adapter-started")
+ return sOLTInterAdapter, nil
+}
+
func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
adapterID := fmt.Sprintf("openolt_%d", a.config.CurrentReplica)
logger.Infow(ctx, "registering-with-core", log.Fields{
diff --git a/cmd/openolt-adapter/main_test.go b/cmd/openolt-adapter/main_test.go
index a70949d..5c470ac 100644
--- a/cmd/openolt-adapter/main_test.go
+++ b/cmd/openolt-adapter/main_test.go
@@ -86,11 +86,12 @@
if ad.coreClient, err = vgrpc.NewClient(
"olt-endpoint",
ms.ApiEndpoint,
+ "core_service.CoreService",
ad.coreRestarted); err != nil {
t.Errorf("grpc client: expected error:nil, got error: %v", err)
}
// Start the core grpc client
- go ad.coreClient.Start(ctx, setAndTestCoreServiceHandler)
+ go ad.coreClient.Start(ctx, getCoreServiceClientHandler)
defer ad.coreClient.Stop(ctx)
err = ad.registerWithCore(ctx, coreService, 1)
if err != nil {