[VOL-2108] Updated with fix for concurrent access of map in deviceHandler
Updated with review comments
Updated with sync.map
Updated with review comments
Change-Id: Iabe2557cf8e5e4a7863d86467a13bc49e24c628d
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 26b62b3..9f38133 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -74,8 +74,9 @@
flowMgr *OpenOltFlowMgr
eventMgr *OpenOltEventMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
- discOnus map[string]bool
- onus map[string]*OnuDevice
+
+ discOnus sync.Map
+ onus sync.Map
portStats *OpenOltStatisticsMgr
metrics *pmmetrics.PmMetrics
stopCollector chan bool
@@ -129,9 +130,7 @@
dh.device = cloned
dh.openOLT = adapter
dh.exitChannel = make(chan int, 1)
- dh.discOnus = make(map[string]bool)
dh.lockDevice = sync.RWMutex{}
- dh.onus = make(map[string]*OnuDevice)
dh.stopCollector = make(chan bool, 2)
dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
//TODO initialize the support classes.
@@ -437,7 +436,7 @@
}
/* Discovered ONUs entries need to be cleared , since after OLT
is up, it starts sending discovery indications again*/
- dh.discOnus = make(map[string]bool)
+ dh.discOnus = sync.Map{}
log.Debugw("do-state-down-end", log.Fields{"deviceID": device.Id})
return nil
}
@@ -674,7 +673,9 @@
var proxyDeviceID string
onuKey := dh.formOnuKey(omciInd.IntfId, omciInd.OnuId)
- if onuInCache, ok := dh.onus[onuKey]; !ok {
+
+ if onuInCache, ok := dh.onus.Load(onuKey); !ok {
+
log.Debugw("omci indication for a device not in cache.", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId})
ponPort := IntfIDToPortNo(omciInd.GetIntfId(), voltha.Port_PON_OLT)
kwargs := make(map[string]interface{})
@@ -690,13 +691,13 @@
deviceID = onuDevice.Id
proxyDeviceID = onuDevice.ProxyAddress.DeviceId
//if not exist in cache, then add to cache.
- dh.onus[onuKey] = NewOnuDevice(deviceID, deviceType, onuDevice.SerialNumber, omciInd.OnuId, omciInd.IntfId, proxyDeviceID)
+ dh.onus.Store(onuKey, NewOnuDevice(deviceID, deviceType, onuDevice.SerialNumber, omciInd.OnuId, omciInd.IntfId, proxyDeviceID))
} else {
//found in cache
log.Debugw("omci indication for a device in cache.", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId})
- deviceType = onuInCache.deviceType
- deviceID = onuInCache.deviceID
- proxyDeviceID = onuInCache.proxyDeviceID
+ deviceType = onuInCache.(*OnuDevice).deviceType
+ deviceID = onuInCache.(*OnuDevice).deviceID
+ proxyDeviceID = onuInCache.(*OnuDevice).proxyDeviceID
}
omciMsg := &ic.InterAdapterOmciMessage{Message: omciInd.Pkt}
@@ -802,16 +803,14 @@
parentPortNo := IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
log.Debugw("new-discovery-indication", log.Fields{"sn": sn})
- dh.lockDevice.Lock()
- if _, ok := dh.discOnus[sn]; ok {
- dh.lockDevice.Unlock()
+
+ if _, ok := dh.discOnus.Load(sn); ok {
+
log.Debugw("onu-sn-is-already-being-processed", log.Fields{"sn": sn})
return
}
- dh.discOnus[sn] = true
- log.Debugw("new-discovery-indications-list", log.Fields{"discOnus": dh.discOnus})
- dh.lockDevice.Unlock()
+ dh.discOnus.Store(sn, true)
kwargs := make(map[string]interface{})
if sn != "" {
@@ -853,10 +852,9 @@
"intfId": onuDiscInd.GetIntfId()})
onuKey := dh.formOnuKey(onuDiscInd.GetIntfId(), onuID)
- dh.lockDevice.Lock()
- dh.onus[onuKey] = NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuID, onuDiscInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId)
- log.Debugw("new-onu-device-discovered", log.Fields{"onu": dh.onus[onuKey]})
- dh.lockDevice.Unlock()
+ onuDev := NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuID, onuDiscInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId)
+ dh.onus.Store(onuKey, onuDev)
+ log.Debugw("new-onu-device-discovered", log.Fields{"onu": onuDev})
err = dh.coreProxy.DeviceStateUpdate(context.TODO(), onuDevice.Id, common.ConnectStatus_REACHABLE, common.OperStatus_DISCOVERED)
if err != nil {
@@ -882,10 +880,12 @@
log.Debugw("ONU indication key create", log.Fields{"onuId": onuInd.OnuId,
"intfId": onuInd.GetIntfId()})
onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.OnuId)
- if onuInCache, ok := dh.onus[onuKey]; ok {
+
+ if onuInCache, ok := dh.onus.Load(onuKey); ok {
+
//If ONU id is discovered before then use GetDevice to get onuDevice because it is cheaper.
foundInCache = true
- onuDevice, _ = dh.coreProxy.GetDevice(nil, dh.device.Id, onuInCache.deviceID)
+ onuDevice, _ = dh.coreProxy.GetDevice(nil, dh.device.Id, onuInCache.(*OnuDevice).deviceID)
} else {
//If ONU not found in adapter cache then we have to use GetChildDevice to get onuDevice
if serialNumber != "" {
@@ -908,9 +908,9 @@
}
if !foundInCache {
onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.GetOnuId())
- dh.lockDevice.Lock()
- dh.onus[onuKey] = NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuInd.GetOnuId(), onuInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId)
- dh.lockDevice.Unlock()
+
+ dh.onus.Store(onuKey, NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuInd.GetOnuId(), onuInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId))
+
}
dh.updateOnuStates(onuDevice, onuInd, foundInCache)
@@ -1024,12 +1024,11 @@
// AddUniPortToOnu adds the uni port to the onu device
func (dh *DeviceHandler) AddUniPortToOnu(intfID, onuID, uniPort uint32) {
onuKey := dh.formOnuKey(intfID, onuID)
- dh.lockDevice.Lock()
- defer dh.lockDevice.Unlock()
- if onuDevice, ok := dh.onus[onuKey]; ok {
+
+ if onuDevice, ok := dh.onus.Load(onuKey); ok {
// add it to the uniPort map for the onu device
- if _, ok = onuDevice.uniPorts[uniPort]; !ok {
- onuDevice.uniPorts[uniPort] = struct{}{}
+ if _, ok = onuDevice.(*OnuDevice).uniPorts[uniPort]; !ok {
+ onuDevice.(*OnuDevice).uniPorts[uniPort] = struct{}{}
log.Debugw("adding-uni-port", log.Fields{"port": uniPort, "intfID": intfID, "onuId": onuID})
}
}
@@ -1083,10 +1082,10 @@
log.Debugw("olt-disabled", log.Fields{"deviceID": device.Id})
/* Discovered ONUs entries need to be cleared , since on device disable the child devices goes to
UNREACHABLE state which needs to be configured again*/
- dh.lockDevice.Lock()
- dh.discOnus = make(map[string]bool)
- dh.onus = make(map[string]*OnuDevice)
- dh.lockDevice.Unlock()
+
+ dh.discOnus = sync.Map{}
+ dh.onus = sync.Map{}
+
go dh.notifyChildDevices()
cloned := proto.Clone(device).(*voltha.Device)
// Update the all ports state on that device to disable
@@ -1275,9 +1274,11 @@
}
/*Delete ONU map for the device*/
- for onu := range dh.onus {
- delete(dh.onus, onu)
- }
+ dh.onus.Range(func(key interface{}, value interface{}) bool {
+ dh.onus.Delete(key)
+ return true
+ })
+
log.Debug("Removed-device-from-Resource-manager-KV-store")
// Stop the Stats collector
dh.stopCollector <- true
diff --git a/adaptercore/device_handler_test.go b/adaptercore/device_handler_test.go
index 0e26d29..588796c 100644
--- a/adaptercore/device_handler_test.go
+++ b/adaptercore/device_handler_test.go
@@ -21,6 +21,7 @@
"context"
"net"
"reflect"
+ "sync"
"testing"
"time"
@@ -596,8 +597,10 @@
dh1 := newMockDeviceHandler()
dh2 := negativeDeviceHandler()
dh3 := newMockDeviceHandler()
- dh3.onus = map[string]*OnuDevice{"onu1": NewOnuDevice("onu1", "onu1", "onu1", 1, 1, "onu1"),
- "onu2": NewOnuDevice("onu2", "onu2", "onu2", 2, 2, "onu2")}
+ dh3.onus = sync.Map{}
+ dh3.onus.Store("onu1", NewOnuDevice("onu1", "onu1", "onu1", 1, 1, "onu1"))
+ dh3.onus.Store("onu2", NewOnuDevice("onu2", "onu2", "onu2", 2, 2, "onu2"))
+
type args struct {
indication *oop.Indication
}
@@ -960,7 +963,9 @@
func TestDeviceHandler_onuDiscIndication(t *testing.T) {
dh1 := newMockDeviceHandler()
- dh1.discOnus = map[string]bool{"onu1": true, "onu2": false}
+ dh1.discOnus = sync.Map{}
+ dh1.discOnus.Store("onu1", true)
+ dh1.discOnus.Store("onu2", false)
dh2 := negativeDeviceHandler()
type args struct {
onuDiscInd *oop.OnuDiscIndication
diff --git a/adaptercore/openolt_eventmgr.go b/adaptercore/openolt_eventmgr.go
index 8173ad1..4c74361 100644
--- a/adaptercore/openolt_eventmgr.go
+++ b/adaptercore/openolt_eventmgr.go
@@ -19,6 +19,7 @@
import (
"fmt"
+
"github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
oop "github.com/opencord/voltha-protos/v2/go/openolt"
@@ -186,8 +187,9 @@
context := make(map[string]string)
/* Populating event context */
serialNumber = ""
- if onu, ok := em.handler.onus[em.handler.formOnuKey(dgi.IntfId, dgi.OnuId)]; ok {
- serialNumber = onu.serialNumber
+ onu := em.handler.formOnuKey(dgi.IntfId, dgi.OnuId)
+ if onu, ok := em.handler.onus.Load(onu); ok {
+ serialNumber = onu.(*OnuDevice).serialNumber
}
context["serial-number"] = serialNumber
context["intf-id"] = string(dgi.IntfId)
diff --git a/adaptercore/openolt_eventmgr_test.go b/adaptercore/openolt_eventmgr_test.go
index 8012318..4bd7a65 100644
--- a/adaptercore/openolt_eventmgr_test.go
+++ b/adaptercore/openolt_eventmgr_test.go
@@ -18,6 +18,7 @@
package adaptercore
import (
+ "sync"
"testing"
"time"
@@ -28,11 +29,11 @@
func mockEventMgr() *OpenOltEventMgr {
ep := &mocks.MockEventProxy{}
dh := &DeviceHandler{}
- dh.onus = make(map[string]*OnuDevice)
- dh.onus[dh.formOnuKey(1, 1)] = &OnuDevice{deviceID: "TEST_ONU",
+ dh.onus = sync.Map{}
+ dh.onus.Store(dh.formOnuKey(1, 1), &OnuDevice{deviceID: "TEST_ONU",
deviceType: "ONU",
serialNumber: "TEST_ONU_123",
- onuID: 1, intfID: 1}
+ onuID: 1, intfID: 1})
return NewEventMgr(ep, dh)
}
func TestOpenOltEventMgr_ProcessEvents(t *testing.T) {