[VOL-4931] openoltAdapter: memory leak seen in long term tests

Change-Id: Ieb60bd9b4bdf88fc22a3b6704854591c0c30717d
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 5dd278c..dadd316 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -265,6 +265,24 @@
 	return kvbackend
 }
 
+// CloseKVClient closes open KV clients
+func (dh *DeviceHandler) CloseKVClient(ctx context.Context) {
+	if dh.resourceMgr != nil {
+		for _, rscMgr := range dh.resourceMgr {
+			if rscMgr != nil {
+				rscMgr.CloseKVClient(ctx)
+			}
+		}
+	}
+	if dh.flowMgr != nil {
+		for _, flMgr := range dh.flowMgr {
+			if flMgr != nil {
+				flMgr.CloseKVClient(ctx)
+			}
+		}
+	}
+}
+
 // start save the device to the data model
 func (dh *DeviceHandler) start(ctx context.Context) {
 	dh.lockDevice.Lock()
@@ -974,6 +992,12 @@
 	var err error
 	dh.deviceInfo, err = dh.populateDeviceInfo(ctx)
 
+	if dh.flowMgr != nil {
+		dh.StopAllFlowRoutines(ctx)
+	}
+
+	dh.CloseKVClient(ctx)
+
 	if err != nil {
 		return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
 	}
@@ -2190,20 +2214,8 @@
 	*/
 
 	dh.setDeviceDeletionInProgressFlag(true)
-	var wg sync.WaitGroup
-	wg.Add(1) // for the mcast routine below to finish
-	go dh.StopAllMcastHandlerRoutines(ctx, &wg)
-	for _, flMgr := range dh.flowMgr {
-		if flMgr != nil {
-			wg.Add(1) // for the flow handler routine below to finish
-			go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
-		}
-	}
-	if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
-		logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
-	} else {
-		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
-	}
+
+	dh.StopAllFlowRoutines(ctx)
 
 	dh.cleanupDeviceResources(ctx)
 	logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
@@ -2238,6 +2250,25 @@
 	dh.deleteAdapterClients(ctx)
 	return nil
 }
+
+// StopAllFlowRoutines stops all flow routines
+func (dh *DeviceHandler) StopAllFlowRoutines(ctx context.Context) {
+	var wg sync.WaitGroup
+	wg.Add(1) // for the mcast routine below to finish
+	go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+	for _, flMgr := range dh.flowMgr {
+		if flMgr != nil {
+			wg.Add(1) // for the flow handler routine below to finish
+			go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+		}
+	}
+	if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+		logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": dh.device.Id})
+	} else {
+		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": dh.device.Id})
+	}
+}
+
 func (dh *DeviceHandler) cleanupDeviceResources(ctx context.Context) {
 
 	if dh.resourceMgr != nil {
@@ -2262,6 +2293,8 @@
 		_ = dh.resourceMgr[dh.totalPonPorts].DeleteAllFlowIDsForGemForIntf(ctx)
 	}
 
+	dh.CloseKVClient(ctx)
+
 	// Take one final sweep at cleaning up KV store for the OLT device
 	// Clean everything at <base-path-prefix>/openolt/<device-id>
 	kvClient, err := kvstore.NewEtcdClient(ctx, dh.openOLT.KVStoreAddress, rsrcMgr.KvstoreTimeout, log.FatalLevel)
@@ -2273,6 +2306,7 @@
 			Timeout:    rsrcMgr.KvstoreTimeout,
 			PathPrefix: fmt.Sprintf(rsrcMgr.BasePathKvStore, dh.cm.Backend.PathPrefix, dh.device.Id)}
 		_ = kvBackend.DeleteWithPrefix(ctx, "")
+		kvBackend.Client.Close(ctx)
 	}
 
 	/*Delete ONU map for the device*/
@@ -2658,6 +2692,7 @@
 	go dh.eventMgr.oltCommunicationEvent(ctx, cloned, raisedTs)
 
 	dh.cleanupDeviceResources(ctx)
+	logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
 
 	dh.lockDevice.RLock()
 	// Stop the Stats collector
@@ -2670,20 +2705,7 @@
 	}
 	dh.lockDevice.RUnlock()
 
-	var wg sync.WaitGroup
-	wg.Add(1) // for the multicast handler routine
-	go dh.StopAllMcastHandlerRoutines(ctx, &wg)
-	for _, flMgr := range dh.flowMgr {
-		if flMgr != nil {
-			wg.Add(1) // for the flow handler routine
-			go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
-		}
-	}
-	if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
-		logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
-	} else {
-		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
-	}
+	dh.StopAllFlowRoutines(ctx)
 
 	//reset adapter reconcile flag
 	dh.adapterPreviouslyConnected = false
@@ -3214,6 +3236,16 @@
 			}
 		}
 	}
+
+	if dh.incomingMcastFlowOrGroup != nil {
+		for k := range dh.incomingMcastFlowOrGroup {
+			if dh.incomingMcastFlowOrGroup[k] != nil {
+				dh.incomingMcastFlowOrGroup[k] = nil
+			}
+		}
+		dh.incomingMcastFlowOrGroup = nil
+	}
+
 	wg.Done()
 	logger.Debug(ctx, "stopped all mcast handler routines")
 }