[VOL-4010] openonuAdapterGo - investigate and resolve data race conditions
Change-Id: I8e957d8bd59b91db27ee4f303a5a222a8f83e8c4
diff --git a/internal/pkg/onuadaptercore/uniportadmin.go b/internal/pkg/onuadaptercore/uniportadmin.go
index 0d7a117..57a1b58 100644
--- a/internal/pkg/onuadaptercore/uniportadmin.go
+++ b/internal/pkg/onuadaptercore/uniportadmin.go
@@ -20,6 +20,7 @@
import (
"context"
"fmt"
+ "sync"
"time"
"github.com/looplab/fsm"
@@ -36,10 +37,12 @@
pDeviceHandler *deviceHandler
deviceID string
pOmciCC *omciCC
+ mutexAdminState sync.RWMutex
adminState bool
requestEvent OnuDeviceEvent
omciLockResponseReceived chan bool //seperate channel needed for checking UNI port OMCi message responses
pAdaptFsm *AdapterFsm
+ mutexPLastTxMeInstance sync.RWMutex
pLastTxMeInstance *me.ManagedEntity
}
@@ -199,15 +202,19 @@
func (oFsm *lockStateFsm) enterSettingOnuGState(ctx context.Context, e *fsm.Event) {
var omciAdminState uint8 = 1 //default locked
+ oFsm.mutexAdminState.RLock()
if !oFsm.adminState {
omciAdminState = 0
}
+ oFsm.mutexAdminState.RUnlock()
logger.Debugw(ctx, "LockStateFSM Tx Set::ONU-G:admin", log.Fields{
"omciAdmin": omciAdminState, "in state": e.FSM.Current(), "device-id": oFsm.deviceID})
requestedAttributes := me.AttributeValueMap{"AdministrativeState": omciAdminState}
+ oFsm.mutexPLastTxMeInstance.Lock()
meInstance, err := oFsm.pOmciCC.sendSetOnuGLS(log.WithSpanFromContext(context.TODO(), ctx), oFsm.pDeviceHandler.pOpenOnuAc.omciTimeout, true,
requestedAttributes, oFsm.pAdaptFsm.commChan)
if err != nil {
+ oFsm.mutexPLastTxMeInstance.Unlock()
logger.Errorw(ctx, "OnuGLS set failed, aborting LockStateFSM", log.Fields{"device-id": oFsm.deviceID})
pLockStateAFsm := oFsm.pAdaptFsm
if pLockStateAFsm != nil {
@@ -223,11 +230,13 @@
// - this avoids misinterpretation of new received OMCI messages
oFsm.pLastTxMeInstance = meInstance
if oFsm.pLastTxMeInstance == nil {
+ oFsm.mutexPLastTxMeInstance.Unlock()
logger.Errorw(ctx, "could not send OMCI message from LockStateFsm", log.Fields{
"device-id": oFsm.deviceID})
//some more sophisticated approach is possible, e.g. repeating once, by now let's reset the state machine in order to release all resources now
pLockStateAFsm := oFsm.pAdaptFsm
if pLockStateAFsm != nil {
+
// obviously calling some FSM event here directly does not work - so trying to decouple it ...
go func(a_pAFsm *AdapterFsm) {
if a_pAFsm != nil && a_pAFsm.pFsm != nil {
@@ -235,12 +244,16 @@
}
}(pLockStateAFsm)
}
+ return
}
+ oFsm.mutexPLastTxMeInstance.Unlock()
}
func (oFsm *lockStateFsm) enterSettingUnisState(ctx context.Context, e *fsm.Event) {
+ oFsm.mutexAdminState.RLock() // read lock covers only the adminState read in the log fields below
logger.Debugw(ctx, "LockStateFSM - starting UniTP adminState loop", log.Fields{
"in state": e.FSM.Current(), "device-id": oFsm.deviceID, "LockState": oFsm.adminState})
+ oFsm.mutexAdminState.RUnlock() // released before the goroutine is started; performUniPortAdminSet takes its own read lock
go oFsm.performUniPortAdminSet(ctx)
}
@@ -281,7 +294,9 @@
_ = a_pAFsm.pFsm.Event(uniEvRestart)
}
}(pLockStateAFsm)
+ oFsm.mutexPLastTxMeInstance.Lock()
oFsm.pLastTxMeInstance = nil
+ oFsm.mutexPLastTxMeInstance.Unlock()
}
}
@@ -345,6 +360,7 @@
}
//should never appear, left here for robustness
+ oFsm.mutexPLastTxMeInstance.RLock()
if oFsm.pLastTxMeInstance != nil {
// compare comments above for CreateResponse (apply also here ...)
if msgObj.EntityClass == oFsm.pLastTxMeInstance.GetClassID() &&
@@ -356,18 +372,28 @@
switch oFsm.pLastTxMeInstance.GetName() {
case "OnuG":
{ // let the FSM proceed ...
+ oFsm.mutexPLastTxMeInstance.RUnlock()
_ = oFsm.pAdaptFsm.pFsm.Event(uniEvRxOnugResp)
}
case "PhysicalPathTerminationPointEthernetUni", "VirtualEthernetInterfacePoint":
{ // let the PPTP init proceed by stopping the wait function
+ oFsm.mutexPLastTxMeInstance.RUnlock()
oFsm.omciLockResponseReceived <- true
}
+ default:
+ {
+ logger.Warnw(ctx, "Unsupported ME name received!",
+ log.Fields{"ME name": oFsm.pLastTxMeInstance.GetName(), "device-id": oFsm.deviceID})
+ oFsm.mutexPLastTxMeInstance.RUnlock()
+ }
}
} else {
+ oFsm.mutexPLastTxMeInstance.RUnlock()
logger.Warnf(ctx, "LockStateFsm - Received SetResponse Data for %s with wrong classID or entityID ",
log.Fields{"device-id": oFsm.deviceID, "data-fields": msgObj}, msgObj.EntityClass)
}
} else {
+ oFsm.mutexPLastTxMeInstance.RUnlock()
logger.Errorw(ctx, "pLastTxMeInstance is nil", log.Fields{"device-id": oFsm.deviceID})
return
}
@@ -379,9 +405,11 @@
func (oFsm *lockStateFsm) performUniPortAdminSet(ctx context.Context) {
var omciAdminState uint8 = 1 //default locked
+ oFsm.mutexAdminState.RLock()
if !oFsm.adminState {
omciAdminState = 0
}
+ oFsm.mutexAdminState.RUnlock()
//set PPTPEthUni or VEIP AdminState
requestedAttributes := me.AttributeValueMap{"AdministrativeState": omciAdminState}
@@ -393,41 +421,50 @@
if uniPort.portType == uniPPTP {
logger.Debugw(ctx, "Setting PPTP admin state", log.Fields{
"device-id": oFsm.deviceID, "for PortNo": uniNo, "state (0-unlock)": omciAdminState})
+ oFsm.mutexPLastTxMeInstance.Lock()
meInstance, err := oFsm.pOmciCC.sendSetPptpEthUniLS(log.WithSpanFromContext(context.TODO(), ctx),
uniPort.entityID, oFsm.pDeviceHandler.pOpenOnuAc.omciTimeout,
true, requestedAttributes, oFsm.pAdaptFsm.commChan)
if err != nil {
+ oFsm.mutexPLastTxMeInstance.Unlock()
logger.Errorw(ctx, "SetPptpEthUniLS set failed, aborting LockStateFsm!",
log.Fields{"device-id": oFsm.deviceID})
_ = oFsm.pAdaptFsm.pFsm.Event(uniEvReset)
return
}
oFsm.pLastTxMeInstance = meInstance
+ oFsm.mutexPLastTxMeInstance.Unlock()
} else if uniPort.portType == uniVEIP {
logger.Debugw(ctx, "Setting VEIP admin state", log.Fields{
"device-id": oFsm.deviceID, "for PortNo": uniNo, "state (0-unlock)": omciAdminState})
+ oFsm.mutexPLastTxMeInstance.Lock()
meInstance, err := oFsm.pOmciCC.sendSetVeipLS(log.WithSpanFromContext(context.TODO(), ctx),
uniPort.entityID, oFsm.pDeviceHandler.pOpenOnuAc.omciTimeout,
true, requestedAttributes, oFsm.pAdaptFsm.commChan)
if err != nil {
+ oFsm.mutexPLastTxMeInstance.Unlock()
logger.Errorw(ctx, "SetVeipLS set failed, aborting LockStateFsm!",
log.Fields{"device-id": oFsm.deviceID})
_ = oFsm.pAdaptFsm.pFsm.Event(uniEvReset)
return
}
oFsm.pLastTxMeInstance = meInstance
+ oFsm.mutexPLastTxMeInstance.Unlock()
} else {
logger.Warnw(ctx, "Unsupported UniTP type - skip",
log.Fields{"device-id": oFsm.deviceID, "Port": uniNo})
continue
}
+ oFsm.mutexPLastTxMeInstance.RLock()
if oFsm.pLastTxMeInstance == nil {
+ oFsm.mutexPLastTxMeInstance.RUnlock()
logger.Errorw(ctx, "could not send PortAdmin OMCI message from LockStateFsm", log.Fields{
"device-id": oFsm.deviceID, "Port": uniNo})
//some more sophisticated approach is possible, e.g. repeating once, by now let's reset the state machine in order to release all resources now
_ = oFsm.pAdaptFsm.pFsm.Event(uniEvReset)
return
}
+ oFsm.mutexPLastTxMeInstance.RUnlock()
//verify response
err := oFsm.waitforOmciResponse(ctx, meInstance)