[VOL-2108] Fix concurrent access to the discOnus and onus maps in DeviceHandler
Updated with review comments
Updated to use sync.Map
Updated with review comments
Change-Id: Iabe2557cf8e5e4a7863d86467a13bc49e24c628d
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 26b62b3..9f38133 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -74,8 +74,9 @@
flowMgr *OpenOltFlowMgr
eventMgr *OpenOltEventMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
- discOnus map[string]bool
- onus map[string]*OnuDevice
+
+ discOnus sync.Map
+ onus sync.Map
portStats *OpenOltStatisticsMgr
metrics *pmmetrics.PmMetrics
stopCollector chan bool
@@ -129,9 +130,7 @@
dh.device = cloned
dh.openOLT = adapter
dh.exitChannel = make(chan int, 1)
- dh.discOnus = make(map[string]bool)
dh.lockDevice = sync.RWMutex{}
- dh.onus = make(map[string]*OnuDevice)
dh.stopCollector = make(chan bool, 2)
dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
//TODO initialize the support classes.
@@ -437,7 +436,7 @@
}
/* Discovered ONUs entries need to be cleared , since after OLT
is up, it starts sending discovery indications again*/
- dh.discOnus = make(map[string]bool)
+ dh.discOnus = sync.Map{}
log.Debugw("do-state-down-end", log.Fields{"deviceID": device.Id})
return nil
}
@@ -674,7 +673,9 @@
var proxyDeviceID string
onuKey := dh.formOnuKey(omciInd.IntfId, omciInd.OnuId)
- if onuInCache, ok := dh.onus[onuKey]; !ok {
+
+ if onuInCache, ok := dh.onus.Load(onuKey); !ok {
+
log.Debugw("omci indication for a device not in cache.", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId})
ponPort := IntfIDToPortNo(omciInd.GetIntfId(), voltha.Port_PON_OLT)
kwargs := make(map[string]interface{})
@@ -690,13 +691,13 @@
deviceID = onuDevice.Id
proxyDeviceID = onuDevice.ProxyAddress.DeviceId
//if not exist in cache, then add to cache.
- dh.onus[onuKey] = NewOnuDevice(deviceID, deviceType, onuDevice.SerialNumber, omciInd.OnuId, omciInd.IntfId, proxyDeviceID)
+ dh.onus.Store(onuKey, NewOnuDevice(deviceID, deviceType, onuDevice.SerialNumber, omciInd.OnuId, omciInd.IntfId, proxyDeviceID))
} else {
//found in cache
log.Debugw("omci indication for a device in cache.", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId})
- deviceType = onuInCache.deviceType
- deviceID = onuInCache.deviceID
- proxyDeviceID = onuInCache.proxyDeviceID
+ deviceType = onuInCache.(*OnuDevice).deviceType
+ deviceID = onuInCache.(*OnuDevice).deviceID
+ proxyDeviceID = onuInCache.(*OnuDevice).proxyDeviceID
}
omciMsg := &ic.InterAdapterOmciMessage{Message: omciInd.Pkt}
@@ -802,16 +803,14 @@
parentPortNo := IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
log.Debugw("new-discovery-indication", log.Fields{"sn": sn})
- dh.lockDevice.Lock()
- if _, ok := dh.discOnus[sn]; ok {
- dh.lockDevice.Unlock()
+
+ if _, ok := dh.discOnus.Load(sn); ok {
+
log.Debugw("onu-sn-is-already-being-processed", log.Fields{"sn": sn})
return
}
- dh.discOnus[sn] = true
- log.Debugw("new-discovery-indications-list", log.Fields{"discOnus": dh.discOnus})
- dh.lockDevice.Unlock()
+ dh.discOnus.Store(sn, true)
kwargs := make(map[string]interface{})
if sn != "" {
@@ -853,10 +852,9 @@
"intfId": onuDiscInd.GetIntfId()})
onuKey := dh.formOnuKey(onuDiscInd.GetIntfId(), onuID)
- dh.lockDevice.Lock()
- dh.onus[onuKey] = NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuID, onuDiscInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId)
- log.Debugw("new-onu-device-discovered", log.Fields{"onu": dh.onus[onuKey]})
- dh.lockDevice.Unlock()
+ onuDev := NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuID, onuDiscInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId)
+ dh.onus.Store(onuKey, onuDev)
+ log.Debugw("new-onu-device-discovered", log.Fields{"onu": onuDev})
err = dh.coreProxy.DeviceStateUpdate(context.TODO(), onuDevice.Id, common.ConnectStatus_REACHABLE, common.OperStatus_DISCOVERED)
if err != nil {
@@ -882,10 +880,12 @@
log.Debugw("ONU indication key create", log.Fields{"onuId": onuInd.OnuId,
"intfId": onuInd.GetIntfId()})
onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.OnuId)
- if onuInCache, ok := dh.onus[onuKey]; ok {
+
+ if onuInCache, ok := dh.onus.Load(onuKey); ok {
+
//If ONU id is discovered before then use GetDevice to get onuDevice because it is cheaper.
foundInCache = true
- onuDevice, _ = dh.coreProxy.GetDevice(nil, dh.device.Id, onuInCache.deviceID)
+ onuDevice, _ = dh.coreProxy.GetDevice(nil, dh.device.Id, onuInCache.(*OnuDevice).deviceID)
} else {
//If ONU not found in adapter cache then we have to use GetChildDevice to get onuDevice
if serialNumber != "" {
@@ -908,9 +908,9 @@
}
if !foundInCache {
onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.GetOnuId())
- dh.lockDevice.Lock()
- dh.onus[onuKey] = NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuInd.GetOnuId(), onuInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId)
- dh.lockDevice.Unlock()
+
+ dh.onus.Store(onuKey, NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuInd.GetOnuId(), onuInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId))
+
}
dh.updateOnuStates(onuDevice, onuInd, foundInCache)
@@ -1024,12 +1024,11 @@
// AddUniPortToOnu adds the uni port to the onu device
func (dh *DeviceHandler) AddUniPortToOnu(intfID, onuID, uniPort uint32) {
onuKey := dh.formOnuKey(intfID, onuID)
- dh.lockDevice.Lock()
- defer dh.lockDevice.Unlock()
- if onuDevice, ok := dh.onus[onuKey]; ok {
+
+ if onuDevice, ok := dh.onus.Load(onuKey); ok {
// add it to the uniPort map for the onu device
- if _, ok = onuDevice.uniPorts[uniPort]; !ok {
- onuDevice.uniPorts[uniPort] = struct{}{}
+ if _, ok = onuDevice.(*OnuDevice).uniPorts[uniPort]; !ok {
+ onuDevice.(*OnuDevice).uniPorts[uniPort] = struct{}{}
log.Debugw("adding-uni-port", log.Fields{"port": uniPort, "intfID": intfID, "onuId": onuID})
}
}
@@ -1083,10 +1082,10 @@
log.Debugw("olt-disabled", log.Fields{"deviceID": device.Id})
/* Discovered ONUs entries need to be cleared , since on device disable the child devices goes to
UNREACHABLE state which needs to be configured again*/
- dh.lockDevice.Lock()
- dh.discOnus = make(map[string]bool)
- dh.onus = make(map[string]*OnuDevice)
- dh.lockDevice.Unlock()
+
+ dh.discOnus = sync.Map{}
+ dh.onus = sync.Map{}
+
go dh.notifyChildDevices()
cloned := proto.Clone(device).(*voltha.Device)
// Update the all ports state on that device to disable
@@ -1275,9 +1274,11 @@
}
/*Delete ONU map for the device*/
- for onu := range dh.onus {
- delete(dh.onus, onu)
- }
+ dh.onus.Range(func(key interface{}, value interface{}) bool {
+ dh.onus.Delete(key)
+ return true
+ })
+
log.Debug("Removed-device-from-Resource-manager-KV-store")
// Stop the Stats collector
dh.stopCollector <- true