VOL-3722 Processing onu related indications in occurence order
The ONU related indications from the OLT are processed in separate go routines. This may cause the indications be processed in wrong orders and sent to onu-adapter in wrong orders.
With this patch onuDiscoveryIndications and OnuIndications processed in per-oonu channels.
Change-Id: I79bab458a047f76759ff1d649c63eb15cfeb361b
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 229481f..413e490 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -86,7 +86,9 @@
stopIndications chan bool
isReadIndicationRoutineActive bool
- totalPonPorts uint32
+ totalPonPorts uint32
+ perOnuChannel map[string]onuIndicationChannels
+ perOnuChannelLock sync.Mutex
}
//OnuDevice represents ONU related info
@@ -101,6 +103,17 @@
rdiRaised bool
}
+type perOnuIndication struct {
+ ctx context.Context
+ indication *oop.Indication
+ serialNumber string
+}
+
+type onuIndicationChannels struct {
+ indicationChannel chan perOnuIndication
+ stopChannel chan struct{}
+}
+
var pmNames = []string{
"rx_bytes",
"rx_packets",
@@ -142,7 +155,7 @@
dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
dh.activePorts = sync.Map{}
dh.stopIndications = make(chan bool, 1)
-
+ dh.perOnuChannel = make(map[string]onuIndicationChannels)
//TODO initialize the support classes.
return &dh
}
@@ -508,22 +521,17 @@
onuDiscInd := indication.GetOnuDiscInd()
logger.Infow(ctx, "received-onu-discovery-indication", log.Fields{"OnuDiscInd": onuDiscInd, "device-id": dh.device.Id})
sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
- go func() {
- if err := dh.onuDiscIndication(ctx, onuDiscInd, sn); err != nil {
- _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "onu-discovery", "device-id": dh.device.Id}, err).Log()
- }
- }()
+ //put message to channel and return immediately
+ dh.putOnuIndicationToChannel(ctx, indication, sn)
case *oop.Indication_OnuInd:
span, ctx := log.CreateChildSpan(ctx, "onu-indication", log.Fields{"device-id": dh.device.Id})
defer span.Finish()
onuInd := indication.GetOnuInd()
logger.Infow(ctx, "received-onu-indication", log.Fields{"OnuInd": onuInd, "device-id": dh.device.Id})
- go func() {
- if err := dh.onuIndication(ctx, onuInd); err != nil {
- _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "onu", "device-id": dh.device.Id}, err).Log()
- }
- }()
+ sn := dh.stringifySerialNumber(onuInd.SerialNumber)
+ //put message to channel and return immediately
+ dh.putOnuIndicationToChannel(ctx, indication, sn)
case *oop.Indication_OmciInd:
span, ctx := log.CreateChildSpan(ctx, "omci-indication", log.Fields{"device-id": dh.device.Id})
defer span.Finish()
@@ -1204,8 +1212,7 @@
return nil
}
-func (dh *DeviceHandler) onuIndication(ctx context.Context, onuInd *oop.OnuIndication) error {
- serialNumber := dh.stringifySerialNumber(onuInd.SerialNumber)
+func (dh *DeviceHandler) onuIndication(ctx context.Context, onuInd *oop.OnuIndication, serialNumber string) error {
kwargs := make(map[string]interface{})
ponPort := IntfIDToPortNo(onuInd.GetIntfId(), voltha.Port_PON_OLT)
@@ -2138,6 +2145,7 @@
}
dh.onus.Delete(onuKey)
dh.discOnus.Delete(onuDevice.(*OnuDevice).serialNumber)
+ dh.removeOnuIndicationChannels(ctx, onuDevice.(*OnuDevice).serialNumber)
return nil
}
@@ -2250,3 +2258,76 @@
}
return intfID
}
+
+func (dh *DeviceHandler) getOnuIndicationChannel(ctx context.Context, sn string) chan perOnuIndication {
+ dh.perOnuChannelLock.Lock()
+ if ch, ok := dh.perOnuChannel[sn]; ok {
+ dh.perOnuChannelLock.Unlock()
+ return ch.indicationChannel
+ }
+ channels := onuIndicationChannels{
+ //We create a buffered channel here to avoid calling function to be blocked
+ //in case of multiple indications from the same ONU,
+ //especially in the case where indications are buffered in OLT.
+ indicationChannel: make(chan perOnuIndication, 5),
+ stopChannel: make(chan struct{}),
+ }
+ dh.perOnuChannel[sn] = channels
+ dh.perOnuChannelLock.Unlock()
+ go dh.perOnuIndicationsRoutine(&channels)
+ return channels.indicationChannel
+
+}
+
+func (dh *DeviceHandler) removeOnuIndicationChannels(ctx context.Context, sn string) {
+ logger.Debugw(ctx, "remove-onu-indication-channels", log.Fields{"sn": sn})
+ dh.perOnuChannelLock.Lock()
+ defer dh.perOnuChannelLock.Unlock()
+ if ch, ok := dh.perOnuChannel[sn]; ok {
+ close(ch.stopChannel)
+ delete(dh.perOnuChannel, sn)
+ }
+
+}
+
+func (dh *DeviceHandler) putOnuIndicationToChannel(ctx context.Context, indication *oop.Indication, sn string) {
+ ind := perOnuIndication{
+ ctx: ctx,
+ indication: indication,
+ serialNumber: sn,
+ }
+ logger.Debugw(ctx, "put-onu-indication-to-channel", log.Fields{"indication": indication, "sn": sn})
+ // Send the onuIndication on the ONU channel
+ dh.getOnuIndicationChannel(ctx, sn) <- ind
+}
+
+func (dh *DeviceHandler) perOnuIndicationsRoutine(onuChannels *onuIndicationChannels) {
+ for {
+ select {
+ // process one indication per onu, before proceeding to the next one
+ case onuInd := <-onuChannels.indicationChannel:
+ logger.Debugw(onuInd.ctx, "calling-indication", log.Fields{"device-id": dh.device.Id,
+ "ind": onuInd.indication, "sn": onuInd.serialNumber})
+ switch onuInd.indication.Data.(type) {
+ case *oop.Indication_OnuInd:
+ if err := dh.onuIndication(onuInd.ctx, onuInd.indication.GetOnuInd(), onuInd.serialNumber); err != nil {
+ _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{
+ "type": "onu-indication",
+ "device-id": dh.device.Id,
+ "sn": onuInd.serialNumber}, err).Log()
+ }
+ case *oop.Indication_OnuDiscInd:
+ if err := dh.onuDiscIndication(onuInd.ctx, onuInd.indication.GetOnuDiscInd(), onuInd.serialNumber); err != nil {
+ _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{
+ "type": "onu-discovery",
+ "device-id": dh.device.Id,
+ "sn": onuInd.serialNumber}, err).Log()
+ }
+ }
+ case <-onuChannels.stopChannel:
+ logger.Debugw(context.Background(), "stop-signal-received-for-onu-channel", log.Fields{"device-id": dh.device.Id})
+ close(onuChannels.indicationChannel)
+ return
+ }
+ }
+}