[VOL-4931] openoltAdapter: fix memory leak seen in long-term tests
Change-Id: Ieb60bd9b4bdf88fc22a3b6704854591c0c30717d
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 5dd278c..dadd316 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -265,6 +265,24 @@
return kvbackend
}
+// CloseKVClient invokes CloseKVClient on every non-nil resource manager and flow manager of this handler.
+func (dh *DeviceHandler) CloseKVClient(ctx context.Context) {
+	if dh.resourceMgr != nil {
+		for idx := range dh.resourceMgr {
+			if mgr := dh.resourceMgr[idx]; mgr != nil {
+				mgr.CloseKVClient(ctx)
+			}
+		}
+	}
+	if dh.flowMgr != nil {
+		for idx := range dh.flowMgr {
+			if mgr := dh.flowMgr[idx]; mgr != nil {
+				mgr.CloseKVClient(ctx)
+			}
+		}
+	}
+}
+
// start save the device to the data model
func (dh *DeviceHandler) start(ctx context.Context) {
dh.lockDevice.Lock()
@@ -974,6 +992,12 @@
var err error
dh.deviceInfo, err = dh.populateDeviceInfo(ctx)
+ if dh.flowMgr != nil {
+ dh.StopAllFlowRoutines(ctx)
+ }
+
+ dh.CloseKVClient(ctx)
+
if err != nil {
return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
}
@@ -2190,20 +2214,8 @@
*/
dh.setDeviceDeletionInProgressFlag(true)
- var wg sync.WaitGroup
- wg.Add(1) // for the mcast routine below to finish
- go dh.StopAllMcastHandlerRoutines(ctx, &wg)
- for _, flMgr := range dh.flowMgr {
- if flMgr != nil {
- wg.Add(1) // for the flow handler routine below to finish
- go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
- }
- }
- if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
- logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
- } else {
- logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
- }
+
+ dh.StopAllFlowRoutines(ctx)
dh.cleanupDeviceResources(ctx)
logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
@@ -2238,6 +2250,25 @@
dh.deleteAdapterClients(ctx)
return nil
}
+
+// StopAllFlowRoutines stops the multicast handler routines and the flow handler routines of every non-nil flow manager in parallel, waits at most 30 seconds for them, and logs whether they all finished in time.
+func (dh *DeviceHandler) StopAllFlowRoutines(ctx context.Context) {
+	stopWG := new(sync.WaitGroup)
+	stopWG.Add(1) // one slot for the multicast stopper, which calls wg.Done() when finished
+	go dh.StopAllMcastHandlerRoutines(ctx, stopWG)
+	for idx := range dh.flowMgr {
+		if fm := dh.flowMgr[idx]; fm != nil {
+			stopWG.Add(1) // one slot for this flow manager's stopper
+			go fm.StopAllFlowHandlerRoutines(ctx, stopWG)
+		}
+	}
+	if dh.waitForTimeoutOrCompletion(stopWG, 30*time.Second) {
+		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": dh.device.Id})
+		return
+	}
+	logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": dh.device.Id})
+}
+
func (dh *DeviceHandler) cleanupDeviceResources(ctx context.Context) {
if dh.resourceMgr != nil {
@@ -2262,6 +2293,8 @@
_ = dh.resourceMgr[dh.totalPonPorts].DeleteAllFlowIDsForGemForIntf(ctx)
}
+ dh.CloseKVClient(ctx)
+
// Take one final sweep at cleaning up KV store for the OLT device
// Clean everything at <base-path-prefix>/openolt/<device-id>
kvClient, err := kvstore.NewEtcdClient(ctx, dh.openOLT.KVStoreAddress, rsrcMgr.KvstoreTimeout, log.FatalLevel)
@@ -2273,6 +2306,7 @@
Timeout: rsrcMgr.KvstoreTimeout,
PathPrefix: fmt.Sprintf(rsrcMgr.BasePathKvStore, dh.cm.Backend.PathPrefix, dh.device.Id)}
_ = kvBackend.DeleteWithPrefix(ctx, "")
+ kvBackend.Client.Close(ctx)
}
/*Delete ONU map for the device*/
@@ -2658,6 +2692,7 @@
go dh.eventMgr.oltCommunicationEvent(ctx, cloned, raisedTs)
dh.cleanupDeviceResources(ctx)
+ logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
dh.lockDevice.RLock()
// Stop the Stats collector
@@ -2670,20 +2705,7 @@
}
dh.lockDevice.RUnlock()
- var wg sync.WaitGroup
- wg.Add(1) // for the multicast handler routine
- go dh.StopAllMcastHandlerRoutines(ctx, &wg)
- for _, flMgr := range dh.flowMgr {
- if flMgr != nil {
- wg.Add(1) // for the flow handler routine
- go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
- }
- }
- if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
- logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
- } else {
- logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
- }
+ dh.StopAllFlowRoutines(ctx)
//reset adapter reconcile flag
dh.adapterPreviouslyConnected = false
@@ -3214,6 +3236,16 @@
}
}
}
+
+ if dh.incomingMcastFlowOrGroup != nil {
+ for k := range dh.incomingMcastFlowOrGroup {
+ if dh.incomingMcastFlowOrGroup[k] != nil {
+ dh.incomingMcastFlowOrGroup[k] = nil
+ }
+ }
+ dh.incomingMcastFlowOrGroup = nil
+ }
+
wg.Done()
logger.Debug(ctx, "stopped all mcast handler routines")
}