[VOL-4931] openoltAdapter: memory leak seen in long term tests

Change-Id: Ieb60bd9b4bdf88fc22a3b6704854591c0c30717d
diff --git a/VERSION b/VERSION
index a50236a..f77856a 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-4.2.12
+4.3.1
diff --git a/go.mod b/go.mod
index de6b24b..fe0cc84 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/protobuf v1.5.2
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
-	github.com/opencord/voltha-lib-go/v7 v7.3.2
+	github.com/opencord/voltha-lib-go/v7 v7.4.1
 	github.com/opencord/voltha-protos/v5 v5.3.8
 	github.com/stretchr/testify v1.7.0
 	go.etcd.io/etcd v3.3.25+incompatible
diff --git a/go.sum b/go.sum
index 8057192..ae47da0 100644
--- a/go.sum
+++ b/go.sum
@@ -198,8 +198,8 @@
 github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
 github.com/onsi/gomega v1.14.0 h1:ep6kpPVwmr/nTbklSx2nrLNSIO62DoYAhnPNIMhK8gI=
 github.com/onsi/gomega v1.14.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
-github.com/opencord/voltha-lib-go/v7 v7.3.2 h1:mvQE+HTf3sLXIMulkDQJbbR67lIaV/Y6IIj1co0vrhU=
-github.com/opencord/voltha-lib-go/v7 v7.3.2/go.mod h1:3XnWQBHALGZTm5n3j401zKGG9aL2UqSU3/owGwNmcxM=
+github.com/opencord/voltha-lib-go/v7 v7.4.1 h1:8tLKwQMOxkBb2vvKtDfUx8uoD2p2lkogwxJDd/PFaIA=
+github.com/opencord/voltha-lib-go/v7 v7.4.1/go.mod h1:3XnWQBHALGZTm5n3j401zKGG9aL2UqSU3/owGwNmcxM=
 github.com/opencord/voltha-protos/v5 v5.3.8 h1:tL8I1wtOfuMnKMQvgN1Ul+8YL/LTBm0PpNuxU1usGDw=
 github.com/opencord/voltha-protos/v5 v5.3.8/go.mod h1:ZGcyW79kQKIo7AySo1LRu613E6uiozixrCF0yNB/4x8=
 github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 5dd278c..dadd316 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -265,6 +265,24 @@
 	return kvbackend
 }
 
+// CloseKVClient closes open KV clients
+func (dh *DeviceHandler) CloseKVClient(ctx context.Context) {
+	if dh.resourceMgr != nil {
+		for _, rscMgr := range dh.resourceMgr {
+			if rscMgr != nil {
+				rscMgr.CloseKVClient(ctx)
+			}
+		}
+	}
+	if dh.flowMgr != nil {
+		for _, flMgr := range dh.flowMgr {
+			if flMgr != nil {
+				flMgr.CloseKVClient(ctx)
+			}
+		}
+	}
+}
+
 // start save the device to the data model
 func (dh *DeviceHandler) start(ctx context.Context) {
 	dh.lockDevice.Lock()
@@ -974,6 +992,12 @@
 	var err error
 	dh.deviceInfo, err = dh.populateDeviceInfo(ctx)
 
+	if dh.flowMgr != nil {
+		dh.StopAllFlowRoutines(ctx)
+	}
+
+	dh.CloseKVClient(ctx)
+
 	if err != nil {
 		return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
 	}
@@ -2190,20 +2214,8 @@
 	*/
 
 	dh.setDeviceDeletionInProgressFlag(true)
-	var wg sync.WaitGroup
-	wg.Add(1) // for the mcast routine below to finish
-	go dh.StopAllMcastHandlerRoutines(ctx, &wg)
-	for _, flMgr := range dh.flowMgr {
-		if flMgr != nil {
-			wg.Add(1) // for the flow handler routine below to finish
-			go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
-		}
-	}
-	if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
-		logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
-	} else {
-		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
-	}
+
+	dh.StopAllFlowRoutines(ctx)
 
 	dh.cleanupDeviceResources(ctx)
 	logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
@@ -2238,6 +2250,25 @@
 	dh.deleteAdapterClients(ctx)
 	return nil
 }
+
+// StopAllFlowRoutines stops all flow routines
+func (dh *DeviceHandler) StopAllFlowRoutines(ctx context.Context) {
+	var wg sync.WaitGroup
+	wg.Add(1) // for the mcast routine below to finish
+	go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+	for _, flMgr := range dh.flowMgr {
+		if flMgr != nil {
+			wg.Add(1) // for the flow handler routine below to finish
+			go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+		}
+	}
+	if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+		logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": dh.device.Id})
+	} else {
+		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": dh.device.Id})
+	}
+}
+
 func (dh *DeviceHandler) cleanupDeviceResources(ctx context.Context) {
 
 	if dh.resourceMgr != nil {
@@ -2262,6 +2293,8 @@
 		_ = dh.resourceMgr[dh.totalPonPorts].DeleteAllFlowIDsForGemForIntf(ctx)
 	}
 
+	dh.CloseKVClient(ctx)
+
 	// Take one final sweep at cleaning up KV store for the OLT device
 	// Clean everything at <base-path-prefix>/openolt/<device-id>
 	kvClient, err := kvstore.NewEtcdClient(ctx, dh.openOLT.KVStoreAddress, rsrcMgr.KvstoreTimeout, log.FatalLevel)
@@ -2273,6 +2306,7 @@
 			Timeout:    rsrcMgr.KvstoreTimeout,
 			PathPrefix: fmt.Sprintf(rsrcMgr.BasePathKvStore, dh.cm.Backend.PathPrefix, dh.device.Id)}
 		_ = kvBackend.DeleteWithPrefix(ctx, "")
+		kvBackend.Client.Close(ctx)
 	}
 
 	/*Delete ONU map for the device*/
@@ -2658,6 +2692,7 @@
 	go dh.eventMgr.oltCommunicationEvent(ctx, cloned, raisedTs)
 
 	dh.cleanupDeviceResources(ctx)
+	logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
 
 	dh.lockDevice.RLock()
 	// Stop the Stats collector
@@ -2670,20 +2705,7 @@
 	}
 	dh.lockDevice.RUnlock()
 
-	var wg sync.WaitGroup
-	wg.Add(1) // for the multicast handler routine
-	go dh.StopAllMcastHandlerRoutines(ctx, &wg)
-	for _, flMgr := range dh.flowMgr {
-		if flMgr != nil {
-			wg.Add(1) // for the flow handler routine
-			go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
-		}
-	}
-	if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
-		logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
-	} else {
-		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
-	}
+	dh.StopAllFlowRoutines(ctx)
 
 	//reset adapter reconcile flag
 	dh.adapterPreviouslyConnected = false
@@ -3214,6 +3236,16 @@
 			}
 		}
 	}
+
+	if dh.incomingMcastFlowOrGroup != nil {
+		for k := range dh.incomingMcastFlowOrGroup {
+			if dh.incomingMcastFlowOrGroup[k] != nil {
+				dh.incomingMcastFlowOrGroup[k] = nil
+			}
+		}
+		dh.incomingMcastFlowOrGroup = nil
+	}
+
 	wg.Done()
 	logger.Debug(ctx, "stopped all mcast handler routines")
 }
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index b06af00..087f234 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -216,6 +216,13 @@
 	flowHandlerRoutineActive []bool
 }
 
+// CloseKVClient closes open KV clients
+func (f *OpenOltFlowMgr) CloseKVClient(ctx context.Context) {
+	if f.techprofile != nil {
+		f.techprofile.CloseKVClient(ctx)
+	}
+}
+
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
 func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr, grpMgr *OpenOltGroupMgr, ponPortIdx uint32) *OpenOltFlowMgr {
 	logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
@@ -2313,7 +2320,19 @@
 				logger.Warnw(ctx, "timeout stopping flow handler routine", log.Fields{"onuID": i, "deviceID": f.deviceHandler.device.Id})
 			}
 		}
+		f.stopFlowHandlerRoutine[i] = nil
 	}
+	f.stopFlowHandlerRoutine = nil
+
+	if f.incomingFlows != nil {
+		for k := range f.incomingFlows {
+			if f.incomingFlows[k] != nil {
+				f.incomingFlows[k] = nil
+			}
+		}
+		f.incomingFlows = nil
+	}
+
 	wg.Done()
 	logger.Debugw(ctx, "stopped all flow handler routines", log.Fields{"ponPortIdx": f.ponPortIdx})
 }
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index c6aa848..7e4044b 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -198,6 +198,17 @@
 	return kvbackend
 }
 
+// CloseKVClient closes open KV clients
+func (rsrcMgr *OpenOltResourceMgr) CloseKVClient(ctx context.Context) {
+	if rsrcMgr.KVStore != nil {
+		rsrcMgr.KVStore.Client.Close(ctx)
+		rsrcMgr.KVStore = nil
+	}
+	if rsrcMgr.PonRsrMgr != nil {
+		rsrcMgr.PonRsrMgr.CloseKVClient(ctx)
+	}
+}
+
 // NewResourceMgr init a New resource manager instance which in turn instantiates pon resource manager
 // instances according to technology. Initializes the default resource ranges for all
 // the resources.
diff --git a/pkg/mocks/mockTechprofile.go b/pkg/mocks/mockTechprofile.go
index 1cc60a2..5234de5 100644
--- a/pkg/mocks/mockTechprofile.go
+++ b/pkg/mocks/mockTechprofile.go
@@ -34,6 +34,10 @@
 	return &db.Backend{Client: &MockKVClient{}}
 }
 
+// CloseKVClient to mock techprofile CloseKVClient method
+func (m MockTechProfile) CloseKVClient(ctx context.Context) {
+}
+
 // GetTPInstance to mock techprofile GetTPInstance method
 func (m MockTechProfile) GetTPInstance(ctx context.Context, path string) (interface{}, error) {
 	logger.Debug(ctx, "GetTPInstanceFromKVStore")
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/ponresourcemanager/ponresourcemanager.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/ponresourcemanager/ponresourcemanager.go
index d751723..807d4c8 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/ponresourcemanager/ponresourcemanager.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/ponresourcemanager/ponresourcemanager.go
@@ -192,6 +192,17 @@
 	return kvbackend
 }
 
+func (PONRMgr *PONResourceManager) CloseKVClient(ctx context.Context) {
+	if PONRMgr.KVStore != nil {
+		PONRMgr.KVStore.Client.Close(ctx)
+		PONRMgr.KVStore = nil
+	}
+	if PONRMgr.KVStoreForConfig != nil {
+		PONRMgr.KVStoreForConfig.Client.Close(ctx)
+		PONRMgr.KVStoreForConfig = nil
+	}
+}
+
 // NewPONResourceManager creates a new PON resource manager.
 func NewPONResourceManager(ctx context.Context, Technology string, DeviceType string, DeviceID string, Backend string, Address string, basePathKvStore string) (*PONResourceManager, error) {
 	var PONMgr PONResourceManager
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile.go
index 6d68f5a..dce43ed 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile.go
@@ -178,6 +178,21 @@
 	*/
 }
 
+func (t *TechProfileMgr) CloseKVClient(ctx context.Context) {
+	if t.config.KVBackend != nil {
+		t.config.KVBackend.Client.Close(ctx)
+		t.config.KVBackend = nil
+	}
+	if t.config.DefaultTpKVBackend != nil {
+		t.config.DefaultTpKVBackend.Client.Close(ctx)
+		t.config.DefaultTpKVBackend = nil
+	}
+	if t.config.ResourceInstanceKVBacked != nil {
+		t.config.ResourceInstanceKVBacked.Client.Close(ctx)
+		t.config.ResourceInstanceKVBacked = nil
+	}
+}
+
 func NewTechProfile(ctx context.Context, resourceMgr iPonResourceMgr, kvStoreType string, kvStoreAddress string, basePathKvStore string) (*TechProfileMgr, error) {
 	var techprofileObj TechProfileMgr
 	logger.Debug(ctx, "initializing-techprofile-mananger")
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile_if.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile_if.go
index 89a66b2..c936a56 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile_if.go
@@ -25,6 +25,7 @@
 
 type TechProfileIf interface {
 	SetKVClient(ctx context.Context, pathPrefix string) *db.Backend
+	CloseKVClient(ctx context.Context)
 	GetTechProfileInstanceKey(ctx context.Context, tpID uint32, uniPortName string) string
 	GetTPInstance(ctx context.Context, path string) (interface{}, error)
 	CreateTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string, intfID uint32) (interface{}, error)
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 6de13ee..faaca9c 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -135,7 +135,7 @@
 github.com/klauspost/compress/huff0
 github.com/klauspost/compress/zstd
 github.com/klauspost/compress/zstd/internal/xxhash
-# github.com/opencord/voltha-lib-go/v7 v7.3.2
+# github.com/opencord/voltha-lib-go/v7 v7.4.1
 ## explicit
 github.com/opencord/voltha-lib-go/v7/pkg/config
 github.com/opencord/voltha-lib-go/v7/pkg/db