[VOL-4931] openoltAdapter: memory leak seen in long term tests
Change-Id: Ieb60bd9b4bdf88fc22a3b6704854591c0c30717d
diff --git a/VERSION b/VERSION
index a50236a..f77856a 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-4.2.12
+4.3.1
diff --git a/go.mod b/go.mod
index de6b24b..fe0cc84 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
- github.com/opencord/voltha-lib-go/v7 v7.3.2
+ github.com/opencord/voltha-lib-go/v7 v7.4.1
github.com/opencord/voltha-protos/v5 v5.3.8
github.com/stretchr/testify v1.7.0
go.etcd.io/etcd v3.3.25+incompatible
diff --git a/go.sum b/go.sum
index 8057192..ae47da0 100644
--- a/go.sum
+++ b/go.sum
@@ -198,8 +198,8 @@
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
github.com/onsi/gomega v1.14.0 h1:ep6kpPVwmr/nTbklSx2nrLNSIO62DoYAhnPNIMhK8gI=
github.com/onsi/gomega v1.14.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
-github.com/opencord/voltha-lib-go/v7 v7.3.2 h1:mvQE+HTf3sLXIMulkDQJbbR67lIaV/Y6IIj1co0vrhU=
-github.com/opencord/voltha-lib-go/v7 v7.3.2/go.mod h1:3XnWQBHALGZTm5n3j401zKGG9aL2UqSU3/owGwNmcxM=
+github.com/opencord/voltha-lib-go/v7 v7.4.1 h1:8tLKwQMOxkBb2vvKtDfUx8uoD2p2lkogwxJDd/PFaIA=
+github.com/opencord/voltha-lib-go/v7 v7.4.1/go.mod h1:3XnWQBHALGZTm5n3j401zKGG9aL2UqSU3/owGwNmcxM=
github.com/opencord/voltha-protos/v5 v5.3.8 h1:tL8I1wtOfuMnKMQvgN1Ul+8YL/LTBm0PpNuxU1usGDw=
github.com/opencord/voltha-protos/v5 v5.3.8/go.mod h1:ZGcyW79kQKIo7AySo1LRu613E6uiozixrCF0yNB/4x8=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 5dd278c..dadd316 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -265,6 +265,24 @@
return kvbackend
}
+// CloseKVClient closes open KV clients
+func (dh *DeviceHandler) CloseKVClient(ctx context.Context) {
+ if dh.resourceMgr != nil {
+ for _, rscMgr := range dh.resourceMgr {
+ if rscMgr != nil {
+ rscMgr.CloseKVClient(ctx)
+ }
+ }
+ }
+ if dh.flowMgr != nil {
+ for _, flMgr := range dh.flowMgr {
+ if flMgr != nil {
+ flMgr.CloseKVClient(ctx)
+ }
+ }
+ }
+}
+
// start save the device to the data model
func (dh *DeviceHandler) start(ctx context.Context) {
dh.lockDevice.Lock()
@@ -974,6 +992,12 @@
var err error
dh.deviceInfo, err = dh.populateDeviceInfo(ctx)
+ if dh.flowMgr != nil {
+ dh.StopAllFlowRoutines(ctx)
+ }
+
+ dh.CloseKVClient(ctx)
+
if err != nil {
return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
}
@@ -2190,20 +2214,8 @@
*/
dh.setDeviceDeletionInProgressFlag(true)
- var wg sync.WaitGroup
- wg.Add(1) // for the mcast routine below to finish
- go dh.StopAllMcastHandlerRoutines(ctx, &wg)
- for _, flMgr := range dh.flowMgr {
- if flMgr != nil {
- wg.Add(1) // for the flow handler routine below to finish
- go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
- }
- }
- if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
- logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
- } else {
- logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
- }
+
+ dh.StopAllFlowRoutines(ctx)
dh.cleanupDeviceResources(ctx)
logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
@@ -2238,6 +2250,25 @@
dh.deleteAdapterClients(ctx)
return nil
}
+
+// StopAllFlowRoutines stops all flow routines
+func (dh *DeviceHandler) StopAllFlowRoutines(ctx context.Context) {
+ var wg sync.WaitGroup
+ wg.Add(1) // for the mcast routine below to finish
+ go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+ for _, flMgr := range dh.flowMgr {
+ if flMgr != nil {
+ wg.Add(1) // for the flow handler routine below to finish
+ go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+ }
+ }
+ if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+ logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": dh.device.Id})
+ } else {
+ logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": dh.device.Id})
+ }
+}
+
func (dh *DeviceHandler) cleanupDeviceResources(ctx context.Context) {
if dh.resourceMgr != nil {
@@ -2262,6 +2293,8 @@
_ = dh.resourceMgr[dh.totalPonPorts].DeleteAllFlowIDsForGemForIntf(ctx)
}
+ dh.CloseKVClient(ctx)
+
// Take one final sweep at cleaning up KV store for the OLT device
// Clean everything at <base-path-prefix>/openolt/<device-id>
kvClient, err := kvstore.NewEtcdClient(ctx, dh.openOLT.KVStoreAddress, rsrcMgr.KvstoreTimeout, log.FatalLevel)
@@ -2273,6 +2306,7 @@
Timeout: rsrcMgr.KvstoreTimeout,
PathPrefix: fmt.Sprintf(rsrcMgr.BasePathKvStore, dh.cm.Backend.PathPrefix, dh.device.Id)}
_ = kvBackend.DeleteWithPrefix(ctx, "")
+ kvBackend.Client.Close(ctx)
}
/*Delete ONU map for the device*/
@@ -2658,6 +2692,7 @@
go dh.eventMgr.oltCommunicationEvent(ctx, cloned, raisedTs)
dh.cleanupDeviceResources(ctx)
+ logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
dh.lockDevice.RLock()
// Stop the Stats collector
@@ -2670,20 +2705,7 @@
}
dh.lockDevice.RUnlock()
- var wg sync.WaitGroup
- wg.Add(1) // for the multicast handler routine
- go dh.StopAllMcastHandlerRoutines(ctx, &wg)
- for _, flMgr := range dh.flowMgr {
- if flMgr != nil {
- wg.Add(1) // for the flow handler routine
- go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
- }
- }
- if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
- logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
- } else {
- logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
- }
+ dh.StopAllFlowRoutines(ctx)
//reset adapter reconcile flag
dh.adapterPreviouslyConnected = false
@@ -3214,6 +3236,16 @@
}
}
}
+
+ if dh.incomingMcastFlowOrGroup != nil {
+ for k := range dh.incomingMcastFlowOrGroup {
+ if dh.incomingMcastFlowOrGroup[k] != nil {
+ dh.incomingMcastFlowOrGroup[k] = nil
+ }
+ }
+ dh.incomingMcastFlowOrGroup = nil
+ }
+
wg.Done()
logger.Debug(ctx, "stopped all mcast handler routines")
}
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index b06af00..087f234 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -216,6 +216,13 @@
flowHandlerRoutineActive []bool
}
+// CloseKVClient closes open KV clients
+func (f *OpenOltFlowMgr) CloseKVClient(ctx context.Context) {
+ if f.techprofile != nil {
+ f.techprofile.CloseKVClient(ctx)
+ }
+}
+
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr, grpMgr *OpenOltGroupMgr, ponPortIdx uint32) *OpenOltFlowMgr {
logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
@@ -2313,7 +2320,19 @@
logger.Warnw(ctx, "timeout stopping flow handler routine", log.Fields{"onuID": i, "deviceID": f.deviceHandler.device.Id})
}
}
+ f.stopFlowHandlerRoutine[i] = nil
}
+ f.stopFlowHandlerRoutine = nil
+
+ if f.incomingFlows != nil {
+ for k := range f.incomingFlows {
+ if f.incomingFlows[k] != nil {
+ f.incomingFlows[k] = nil
+ }
+ }
+ f.incomingFlows = nil
+ }
+
wg.Done()
logger.Debugw(ctx, "stopped all flow handler routines", log.Fields{"ponPortIdx": f.ponPortIdx})
}
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index c6aa848..7e4044b 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -198,6 +198,17 @@
return kvbackend
}
+// CloseKVClient closes open KV clients
+func (rsrcMgr *OpenOltResourceMgr) CloseKVClient(ctx context.Context) {
+ if rsrcMgr.KVStore != nil {
+ rsrcMgr.KVStore.Client.Close(ctx)
+ rsrcMgr.KVStore = nil
+ }
+ if rsrcMgr.PonRsrMgr != nil {
+ rsrcMgr.PonRsrMgr.CloseKVClient(ctx)
+ }
+}
+
// NewResourceMgr init a New resource manager instance which in turn instantiates pon resource manager
// instances according to technology. Initializes the default resource ranges for all
// the resources.
diff --git a/pkg/mocks/mockTechprofile.go b/pkg/mocks/mockTechprofile.go
index 1cc60a2..5234de5 100644
--- a/pkg/mocks/mockTechprofile.go
+++ b/pkg/mocks/mockTechprofile.go
@@ -34,6 +34,10 @@
return &db.Backend{Client: &MockKVClient{}}
}
+// CloseKVClient to mock techprofile CloseKVClient method
+func (m MockTechProfile) CloseKVClient(ctx context.Context) {
+}
+
// GetTPInstance to mock techprofile GetTPInstance method
func (m MockTechProfile) GetTPInstance(ctx context.Context, path string) (interface{}, error) {
logger.Debug(ctx, "GetTPInstanceFromKVStore")
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/ponresourcemanager/ponresourcemanager.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/ponresourcemanager/ponresourcemanager.go
index d751723..807d4c8 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/ponresourcemanager/ponresourcemanager.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/ponresourcemanager/ponresourcemanager.go
@@ -192,6 +192,17 @@
return kvbackend
}
+func (PONRMgr *PONResourceManager) CloseKVClient(ctx context.Context) {
+ if PONRMgr.KVStore != nil {
+ PONRMgr.KVStore.Client.Close(ctx)
+ PONRMgr.KVStore = nil
+ }
+ if PONRMgr.KVStoreForConfig != nil {
+ PONRMgr.KVStoreForConfig.Client.Close(ctx)
+ PONRMgr.KVStoreForConfig = nil
+ }
+}
+
// NewPONResourceManager creates a new PON resource manager.
func NewPONResourceManager(ctx context.Context, Technology string, DeviceType string, DeviceID string, Backend string, Address string, basePathKvStore string) (*PONResourceManager, error) {
var PONMgr PONResourceManager
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile.go
index 6d68f5a..dce43ed 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile.go
@@ -178,6 +178,21 @@
*/
}
+func (t *TechProfileMgr) CloseKVClient(ctx context.Context) {
+ if t.config.KVBackend != nil {
+ t.config.KVBackend.Client.Close(ctx)
+ t.config.KVBackend = nil
+ }
+ if t.config.DefaultTpKVBackend != nil {
+ t.config.DefaultTpKVBackend.Client.Close(ctx)
+ t.config.DefaultTpKVBackend = nil
+ }
+ if t.config.ResourceInstanceKVBacked != nil {
+ t.config.ResourceInstanceKVBacked.Client.Close(ctx)
+ t.config.ResourceInstanceKVBacked = nil
+ }
+}
+
func NewTechProfile(ctx context.Context, resourceMgr iPonResourceMgr, kvStoreType string, kvStoreAddress string, basePathKvStore string) (*TechProfileMgr, error) {
var techprofileObj TechProfileMgr
logger.Debug(ctx, "initializing-techprofile-mananger")
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile_if.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile_if.go
index 89a66b2..c936a56 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/techprofile/tech_profile_if.go
@@ -25,6 +25,7 @@
type TechProfileIf interface {
SetKVClient(ctx context.Context, pathPrefix string) *db.Backend
+ CloseKVClient(ctx context.Context)
GetTechProfileInstanceKey(ctx context.Context, tpID uint32, uniPortName string) string
GetTPInstance(ctx context.Context, path string) (interface{}, error)
CreateTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string, intfID uint32) (interface{}, error)
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 6de13ee..faaca9c 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -135,7 +135,7 @@
github.com/klauspost/compress/huff0
github.com/klauspost/compress/zstd
github.com/klauspost/compress/zstd/internal/xxhash
-# github.com/opencord/voltha-lib-go/v7 v7.3.2
+# github.com/opencord/voltha-lib-go/v7 v7.4.1
## explicit
github.com/opencord/voltha-lib-go/v7/pkg/config
github.com/opencord/voltha-lib-go/v7/pkg/db