[VOL-4442] grpc streaming connection monitoring
Change-Id: I435a03fdc0ac2b549dc4512220148cb19c16db19
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 0e46594..8248278 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -51,7 +51,6 @@
"github.com/opencord/voltha-protos/v5/go/common"
ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
"github.com/opencord/voltha-protos/v5/go/extension"
- "github.com/opencord/voltha-protos/v5/go/health"
ia "github.com/opencord/voltha-protos/v5/go/inter_adapter"
"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
of "github.com/opencord/voltha-protos/v5/go/openflow_13"
@@ -85,7 +84,7 @@
lockChildAdapterClients sync.RWMutex
EventProxy eventif.EventProxy
openOLT *OpenOLT
- exitChannel chan int
+ exitChannel chan struct{}
lockDevice sync.RWMutex
Client oop.OpenoltClient
transitionMap *TransitionMap
@@ -192,7 +191,7 @@
cloned := (proto.Clone(device)).(*voltha.Device)
dh.device = cloned
dh.openOLT = adapter
- dh.exitChannel = make(chan int, 1) // TODO: Why buffered?
+ dh.exitChannel = make(chan struct{})
dh.lockDevice = sync.RWMutex{}
dh.stopCollector = make(chan bool, 2) // TODO: Why buffered?
dh.stopHeartbeatCheck = make(chan bool, 2) // TODO: Why buffered?
@@ -229,13 +228,26 @@
logger.Debug(ctx, "device-agent-started")
}
-// stop stops the device dh. Not much to do for now
-func (dh *DeviceHandler) stop(ctx context.Context) {
+// Stop stops the device handler: it closes exitChannel and deletes (which also
+// stops) all the gRPC clients towards the child adapters. Only the first call
+// does this work; later calls return without doing anything.
+func (dh *DeviceHandler) Stop(ctx context.Context) {
dh.lockDevice.Lock()
defer dh.lockDevice.Unlock()
logger.Debug(ctx, "stopping-device-agent")
- dh.exitChannel <- 1
+	select {
+	case <-dh.exitChannel:
+		// Already stopped: closing exitChannel again would panic. The check and
+		// the close both run under lockDevice, so concurrent Stop calls cannot
+		// both get past it (assuming nothing else closes exitChannel).
+		logger.Debug(ctx, "device-agent-already-stopped")
+		return
+	default:
+	}
+	close(dh.exitChannel)
+	// Delete (which will stop also) all grpc connections to the child adapters
+	dh.deleteAdapterClients(ctx)
logger.Debug(ctx, "device-agent-stopped")
}
@@ -3219,11 +3220,13 @@
if dh.childAdapterClients[endpoint], err = vgrpc.NewClient(
dh.cfg.AdapterEndpoint,
endpoint,
- dh.onuAdapterRestarted); err != nil {
+ "onu_inter_adapter_service.OnuInterAdapterService",
+ dh.onuInterAdapterRestarted,
+ vgrpc.ClientContextData(dh.device.Id)); err != nil {
logger.Errorw(ctx, "grpc-client-not-created", log.Fields{"error": err, "endpoint": endpoint})
return err
}
- go dh.childAdapterClients[endpoint].Start(log.WithSpanFromContext(context.TODO(), ctx), dh.setAndTestOnuInterAdapterServiceHandler)
+ go dh.childAdapterClients[endpoint].Start(log.WithSpanFromContext(context.TODO(), ctx), dh.getOnuInterAdapterServiceClientHandler)
// Wait until we have a connection to the child adapter.
// Unlimited retries or until context expires
@@ -3281,30 +3284,18 @@
}
}
-// TODO: Any action the adapter needs to do following a onu adapter restart?
-func (dh *DeviceHandler) onuAdapterRestarted(ctx context.Context, endPoint string) error {
- logger.Warnw(ctx, "onu-adapter-reconnected", log.Fields{"endpoint": endPoint})
+// onuInterAdapterRestarted is the restart callback passed to vgrpc.NewClient; it only logs endPoint. TODO: is any action needed after an ONU inter adapter service restart?
+func (dh *DeviceHandler) onuInterAdapterRestarted(ctx context.Context, endPoint string) error {
+ logger.Warnw(ctx, "onu-inter-adapter-service-reconnected", log.Fields{"endpoint": endPoint})
return nil
}
-// setAndTestOnuInterAdapterServiceHandler is used to test whether the remote gRPC service is up
-func (dh *DeviceHandler) setAndTestOnuInterAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
- // The onu adapter needs to know whether the olt adapter can connect to it. Since the olt adapter
- // has a grpc client connection per device handler (i.e. per olt device) to the onu adapter
- // then the onu adapter needs to know whether that specific client can connect to it. Because the
- // client uses a polling mechanism then not all grpc clients could be connected at the same time,
- // a maximum difference of 5 sec. We therefore add the parent device as additional contextual information
- // to this request.
- dh.lockDevice.RLock()
- if dh.device != nil {
- clientConn.ContextInfo = dh.device.Id
- }
- dh.lockDevice.RUnlock()
- svc := onu_inter_adapter_service.NewOnuInterAdapterServiceClient(conn)
- if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
+// getOnuInterAdapterServiceClientHandler (the handler given to the child client's Start) wraps conn in an OnuInterAdapterService client; it returns a literal nil, not a typed nil, when conn is nil.
+func (dh *DeviceHandler) getOnuInterAdapterServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+ if conn == nil {
return nil
}
- return svc
+ return onu_inter_adapter_service.NewOnuInterAdapterServiceClient(conn)
}
func (dh *DeviceHandler) setDeviceDeletionInProgressFlag(flag bool) {
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index ea40a5c..8737ee2 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -1100,10 +1100,10 @@
dh := newMockDeviceHandler()
dh1 := negativeDeviceHandler()
dh.start(context.Background())
- dh.stop(context.Background())
+ dh.Stop(context.Background())
dh1.start(context.Background())
- dh1.stop(context.Background())
+ dh1.Stop(context.Background())
}
diff --git a/internal/pkg/core/openolt.go b/internal/pkg/core/openolt.go
index 3665cbd..a483111 100644
--- a/internal/pkg/core/openolt.go
+++ b/internal/pkg/core/openolt.go
@@ -20,6 +20,8 @@
import (
"context"
"errors"
+ "fmt"
+	"io"
"sync"
"time"
@@ -30,6 +31,7 @@
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
+ "github.com/opencord/voltha-protos/v5/go/adapter_service"
"github.com/opencord/voltha-protos/v5/go/common"
ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
"github.com/opencord/voltha-protos/v5/go/extension"
@@ -49,7 +51,7 @@
numOnus int
KVStoreAddress string
KVStoreType string
- exitChannel chan int
+ exitChannel chan struct{}
HeartbeatCheckInterval time.Duration
HeartbeatFailReportInterval time.Duration
GrpcTimeoutInterval time.Duration
@@ -64,7 +66,7 @@
coreClient *vgrpc.Client,
eventProxy eventif.EventProxy, cfg *config.AdapterFlags, cm *conf.ConfigManager) *OpenOLT {
var openOLT OpenOLT
- openOLT.exitChannel = make(chan int, 1)
+ openOLT.exitChannel = make(chan struct{})
openOLT.deviceHandlers = make(map[string]*DeviceHandler)
openOLT.config = cfg
openOLT.numOnus = cfg.OnuNumber
@@ -93,7 +95,26 @@
-//Stop terminates the session
+// Stop terminates the session: it closes exitChannel, then stops every device
+// handler and finally the core gRPC client connection. It always returns nil,
+// and calling it again after it has returned is a no-op.
func (oo *OpenOLT) Stop(ctx context.Context) error {
logger.Info(ctx, "stopping-device-manager")
- oo.exitChannel <- 1
+	select {
+	case <-oo.exitChannel:
+		// Already stopped: closing exitChannel a second time would panic.
+		// NOTE(review): this only guards sequential calls; concurrent Stop
+		// calls are not serialized here.
+		logger.Info(ctx, "device-manager-already-stopped")
+		return nil
+	default:
+	}
+	close(oo.exitChannel)
+	// Stop the device handlers
+	oo.stopAllDeviceHandlers(ctx)
+
+	// Stop the core grpc client connection
+	if oo.coreClient != nil {
+		oo.coreClient.Stop(ctx)
+	}
+
logger.Info(ctx, "device-manager-stopped")
return nil
}
@@ -121,9 +131,14 @@
return nil
}
-// GetHealthStatus is used as a service readiness validation as a grpc connection
-func (oo *OpenOLT) GetHealthStatus(ctx context.Context, clientConn *common.Connection) (*health.HealthStatus, error) {
- return &health.HealthStatus{State: health.HealthStatus_HEALTHY}, nil
+// stopAllDeviceHandlers calls Stop on every handler in oo.deviceHandlers,
+// holding lockDeviceHandlersMap for the whole iteration.
+func (oo *OpenOLT) stopAllDeviceHandlers(ctx context.Context) {
+	oo.lockDeviceHandlersMap.Lock()
+	defer oo.lockDeviceHandlersMap.Unlock()
+	for key := range oo.deviceHandlers {
+		oo.deviceHandlers[key].Stop(ctx)
+	}
}
// AdoptDevice creates a new device handler if not present already and then adopts the device
@@ -413,6 +426,64 @@
}
+// GetHealthStatus is used by a OltAdapterService client to detect a connection
+// lost with the gRPC server hosting the OltAdapterService service.
+//
+// Every message received on the stream is answered with a HEALTHY status. The
+// handler returns nil when the client closes its send side (io.EOF), when the
+// stream context is done or when Stop has been called, and the stream error
+// when receiving or sending fails.
+func (oo *OpenOLT) GetHealthStatus(stream adapter_service.AdapterService_GetHealthStatusServer) error {
+	ctx := context.Background()
+	logger.Debugw(ctx, "receive-stream-connection", log.Fields{"stream": stream})
+
+	if stream == nil {
+		return fmt.Errorf("conn-is-nil %v", stream)
+	}
+	initialRequestTime := time.Now()
+	var remoteClient *common.Connection
+	var tempClient *common.Connection
+	var err error
+loop:
+	for {
+		tempClient, err = stream.Recv()
+		if errors.Is(err, io.EOF) {
+			// The client closed its send side. That is a normal end of the
+			// stream, so report success to gRPC instead of io.EOF.
+			logger.Infow(ctx, "stream-closed-by-client", log.Fields{"remote-client": remoteClient})
+			err = nil
+			break loop
+		}
+		if err != nil {
+			logger.Warnw(ctx, "received-stream-error", log.Fields{"remote-client": remoteClient, "error": err})
+			break loop
+		}
+		// Send a response back
+		err = stream.Send(&health.HealthStatus{State: health.HealthStatus_HEALTHY})
+		if err != nil {
+			logger.Warnw(ctx, "sending-stream-error", log.Fields{"remote-client": remoteClient, "error": err})
+			break loop
+		}
+
+		remoteClient = tempClient
+		logger.Debugw(ctx, "received-keep-alive", log.Fields{"remote-client": remoteClient})
+
+		// Non-blocking termination check; it only runs after a message has been
+		// handled, because stream.Recv above blocks until the next one arrives.
+		select {
+		case <-stream.Context().Done():
+			logger.Infow(ctx, "stream-keep-alive-context-done", log.Fields{"remote-client": remoteClient, "error": stream.Context().Err()})
+			break loop
+		case <-oo.exitChannel:
+			logger.Warnw(ctx, "received-stop", log.Fields{"remote-client": remoteClient, "initial-conn-time": initialRequestTime})
+			break loop
+		default:
+		}
+	}
+	logger.Errorw(ctx, "connection-down", log.Fields{"remote-client": remoteClient, "error": err, "initial-conn-time": initialRequestTime})
+	return err
+}
+
/*
*
* Unimplemented APIs
diff --git a/internal/pkg/core/openoltInterAdapter.go b/internal/pkg/core/openoltInterAdapter.go
new file mode 100644
index 0000000..c8fb8bd
--- /dev/null
+++ b/internal/pkg/core/openoltInterAdapter.go
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2022-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//Package core provides the utility for olt devices, flows and statistics
+package core
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	"github.com/opencord/voltha-protos/v5/go/health"
+	ia "github.com/opencord/voltha-protos/v5/go/inter_adapter"
+	oltia "github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
+)
+
+// OpenOLTInterAdapter serves the OltInterAdapterService API by forwarding
+// requests to the OpenOLT adapter it holds a reference to.
+type OpenOLTInterAdapter struct {
+	oltAdapter  *OpenOLT
+	exitChannel chan struct{}
+
+	// stopOnce ensures exitChannel is closed only once, because closing an
+	// already-closed channel panics.
+	stopOnce sync.Once
+}
+
+// NewOpenOLTInterAdapter returns a new instance of OpenOLTInterAdapter that
+// forwards its requests to oltAdapter.
+func NewOpenOLTInterAdapter(oltAdapter *OpenOLT) *OpenOLTInterAdapter {
+	return &OpenOLTInterAdapter{oltAdapter: oltAdapter, exitChannel: make(chan struct{})}
+}
+
+// Start is currently a no-op and always returns nil.
+func (oo *OpenOLTInterAdapter) Start(ctx context.Context) error {
+	return nil
+}
+
+// Stop terminates the session by closing exitChannel, which makes each active
+// GetHealthStatus stream handler return once it has answered its next message.
+// Stop is idempotent: only the first call closes the channel. It returns nil.
+func (oo *OpenOLTInterAdapter) Stop(ctx context.Context) error {
+	oo.stopOnce.Do(func() { close(oo.exitChannel) })
+	return nil
+}
+
+// ProxyOmciRequest proxies an OMCI request from the child adapter by
+// forwarding it to OpenOLT.ProxyOmciRequest.
+func (oo *OpenOLTInterAdapter) ProxyOmciRequest(ctx context.Context, request *ia.OmciMessage) (*empty.Empty, error) {
+	return oo.oltAdapter.ProxyOmciRequest(ctx, request)
+}
+
+// ProxyOmciRequests proxies the OMCI requests carried by an ia.OmciMessages
+// from the child adapter by forwarding it to OpenOLT.ProxyOmciRequests.
+func (oo *OpenOLTInterAdapter) ProxyOmciRequests(ctx context.Context, request *ia.OmciMessages) (*empty.Empty, error) {
+	return oo.oltAdapter.ProxyOmciRequests(ctx, request)
+}
+
+// GetTechProfileInstance returns an instance of a tech profile, as provided by
+// OpenOLT.GetTechProfileInstance.
+func (oo *OpenOLTInterAdapter) GetTechProfileInstance(ctx context.Context, request *ia.TechProfileInstanceRequestMessage) (*ia.TechProfileDownloadMessage, error) {
+	return oo.oltAdapter.GetTechProfileInstance(ctx, request)
+}
+
+// GetHealthStatus is used by a OltInterAdapterService client to detect a connection
+// lost with the gRPC server hosting the OltInterAdapterService service.
+//
+// Every message received on the stream is answered with a HEALTHY status. The
+// handler returns nil when the client closes its send side (io.EOF), when the
+// stream context is done or when Stop has been called, and the stream error
+// when receiving or sending fails.
+func (oo *OpenOLTInterAdapter) GetHealthStatus(stream oltia.OltInterAdapterService_GetHealthStatusServer) error {
+	ctx := context.Background()
+	logger.Debugw(ctx, "receive-stream-connection", log.Fields{"stream": stream})
+
+	if stream == nil {
+		return fmt.Errorf("conn-is-nil %v", stream)
+	}
+	initialRequestTime := time.Now()
+	var remoteClient *common.Connection
+	var tempClient *common.Connection
+	var err error
+loop:
+	for {
+		tempClient, err = stream.Recv()
+		if errors.Is(err, io.EOF) {
+			// The client closed its send side. That is a normal end of the
+			// stream, so report success to gRPC instead of io.EOF.
+			logger.Infow(ctx, "stream-closed-by-client", log.Fields{"remote-client": remoteClient})
+			err = nil
+			break loop
+		}
+		if err != nil {
+			logger.Warnw(ctx, "received-stream-error", log.Fields{"remote-client": remoteClient, "error": err})
+			break loop
+		}
+		err = stream.Send(&health.HealthStatus{State: health.HealthStatus_HEALTHY})
+		if err != nil {
+			logger.Warnw(ctx, "sending-stream-error", log.Fields{"remote-client": remoteClient, "error": err})
+			break loop
+		}
+
+		remoteClient = tempClient
+		logger.Debugw(ctx, "received-keep-alive", log.Fields{"remote-client": remoteClient})
+
+		// Non-blocking termination check; it only runs after a message has been
+		// handled, because stream.Recv above blocks until the next one arrives.
+		select {
+		case <-stream.Context().Done():
+			logger.Infow(ctx, "stream-keep-alive-context-done", log.Fields{"remote-client": remoteClient, "error": stream.Context().Err()})
+			break loop
+		case <-oo.exitChannel:
+			logger.Warnw(ctx, "received-stop", log.Fields{"remote-client": remoteClient, "initial-conn-time": initialRequestTime})
+			break loop
+		default:
+		}
+	}
+	logger.Errorw(ctx, "connection-down", log.Fields{"remote-client": remoteClient, "error": err, "initial-conn-time": initialRequestTime})
+	return err
+}
diff --git a/internal/pkg/core/openolt_test.go b/internal/pkg/core/openolt_test.go
index c549d44..f909128 100644
--- a/internal/pkg/core/openolt_test.go
+++ b/internal/pkg/core/openolt_test.go
@@ -50,7 +50,7 @@
numOnus int
KVStoreAddress string
KVStoreType string
- exitChannel chan int
+ exitChannel chan struct{}
ctx context.Context
}
@@ -543,7 +543,7 @@
args args
wantErr error
}{
- {"stop-1", &fields{exitChannel: make(chan int, 1)}, args{}, errors.New("stop error")},
+ {"stop-1", &fields{exitChannel: make(chan struct{})}, args{}, errors.New("stop error")},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {