VOL-3722 Processing onu related indications in occurence order
The ONU related indications from the OLT are processed in separate go routines. This may cause the indications be processed in wrong orders and sent to onu-adapter in wrong orders.
With this patch onuDiscoveryIndications and OnuIndications processed in per-oonu channels.

Change-Id: I79bab458a047f76759ff1d649c63eb15cfeb361b
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 229481f..413e490 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -86,7 +86,9 @@
 	stopIndications               chan bool
 	isReadIndicationRoutineActive bool
 
-	totalPonPorts uint32
+	totalPonPorts     uint32
+	perOnuChannel     map[string]onuIndicationChannels
+	perOnuChannelLock sync.Mutex
 }
 
 //OnuDevice represents ONU related info
@@ -101,6 +103,17 @@
 	rdiRaised     bool
 }
 
+type perOnuIndication struct {
+	ctx          context.Context
+	indication   *oop.Indication
+	serialNumber string
+}
+
+type onuIndicationChannels struct {
+	indicationChannel chan perOnuIndication
+	stopChannel       chan struct{}
+}
+
 var pmNames = []string{
 	"rx_bytes",
 	"rx_packets",
@@ -142,7 +155,7 @@
 	dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
 	dh.activePorts = sync.Map{}
 	dh.stopIndications = make(chan bool, 1)
-
+	dh.perOnuChannel = make(map[string]onuIndicationChannels)
 	//TODO initialize the support classes.
 	return &dh
 }
@@ -508,22 +521,17 @@
 		onuDiscInd := indication.GetOnuDiscInd()
 		logger.Infow(ctx, "received-onu-discovery-indication", log.Fields{"OnuDiscInd": onuDiscInd, "device-id": dh.device.Id})
 		sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
-		go func() {
-			if err := dh.onuDiscIndication(ctx, onuDiscInd, sn); err != nil {
-				_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "onu-discovery", "device-id": dh.device.Id}, err).Log()
-			}
-		}()
+		//put message  to channel and return immediately
+		dh.putOnuIndicationToChannel(ctx, indication, sn)
 	case *oop.Indication_OnuInd:
 		span, ctx := log.CreateChildSpan(ctx, "onu-indication", log.Fields{"device-id": dh.device.Id})
 		defer span.Finish()
 
 		onuInd := indication.GetOnuInd()
 		logger.Infow(ctx, "received-onu-indication", log.Fields{"OnuInd": onuInd, "device-id": dh.device.Id})
-		go func() {
-			if err := dh.onuIndication(ctx, onuInd); err != nil {
-				_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "onu", "device-id": dh.device.Id}, err).Log()
-			}
-		}()
+		sn := dh.stringifySerialNumber(onuInd.SerialNumber)
+		//put message  to channel and return immediately
+		dh.putOnuIndicationToChannel(ctx, indication, sn)
 	case *oop.Indication_OmciInd:
 		span, ctx := log.CreateChildSpan(ctx, "omci-indication", log.Fields{"device-id": dh.device.Id})
 		defer span.Finish()
@@ -1204,8 +1212,7 @@
 	return nil
 }
 
-func (dh *DeviceHandler) onuIndication(ctx context.Context, onuInd *oop.OnuIndication) error {
-	serialNumber := dh.stringifySerialNumber(onuInd.SerialNumber)
+func (dh *DeviceHandler) onuIndication(ctx context.Context, onuInd *oop.OnuIndication, serialNumber string) error {
 
 	kwargs := make(map[string]interface{})
 	ponPort := IntfIDToPortNo(onuInd.GetIntfId(), voltha.Port_PON_OLT)
@@ -2138,6 +2145,7 @@
 	}
 	dh.onus.Delete(onuKey)
 	dh.discOnus.Delete(onuDevice.(*OnuDevice).serialNumber)
+	dh.removeOnuIndicationChannels(ctx, onuDevice.(*OnuDevice).serialNumber)
 	return nil
 }
 
@@ -2250,3 +2258,76 @@
 	}
 	return intfID
 }
+
+func (dh *DeviceHandler) getOnuIndicationChannel(ctx context.Context, sn string) chan perOnuIndication {
+	dh.perOnuChannelLock.Lock()
+	if ch, ok := dh.perOnuChannel[sn]; ok {
+		dh.perOnuChannelLock.Unlock()
+		return ch.indicationChannel
+	}
+	channels := onuIndicationChannels{
+		//We create a buffered channel here to avoid calling function to be  blocked
+		//in case of multiple indications from the same ONU,
+		//especially in the case where indications are buffered in  OLT.
+		indicationChannel: make(chan perOnuIndication, 5),
+		stopChannel:       make(chan struct{}),
+	}
+	dh.perOnuChannel[sn] = channels
+	dh.perOnuChannelLock.Unlock()
+	go dh.perOnuIndicationsRoutine(&channels)
+	return channels.indicationChannel
+
+}
+
+func (dh *DeviceHandler) removeOnuIndicationChannels(ctx context.Context, sn string) {
+	logger.Debugw(ctx, "remove-onu-indication-channels", log.Fields{"sn": sn})
+	dh.perOnuChannelLock.Lock()
+	defer dh.perOnuChannelLock.Unlock()
+	if ch, ok := dh.perOnuChannel[sn]; ok {
+		close(ch.stopChannel)
+		delete(dh.perOnuChannel, sn)
+	}
+
+}
+
+func (dh *DeviceHandler) putOnuIndicationToChannel(ctx context.Context, indication *oop.Indication, sn string) {
+	ind := perOnuIndication{
+		ctx:          ctx,
+		indication:   indication,
+		serialNumber: sn,
+	}
+	logger.Debugw(ctx, "put-onu-indication-to-channel", log.Fields{"indication": indication, "sn": sn})
+	// Send the onuIndication on the ONU channel
+	dh.getOnuIndicationChannel(ctx, sn) <- ind
+}
+
+func (dh *DeviceHandler) perOnuIndicationsRoutine(onuChannels *onuIndicationChannels) {
+	for {
+		select {
+		// process one indication per onu, before proceeding to the next one
+		case onuInd := <-onuChannels.indicationChannel:
+			logger.Debugw(onuInd.ctx, "calling-indication", log.Fields{"device-id": dh.device.Id,
+				"ind": onuInd.indication, "sn": onuInd.serialNumber})
+			switch onuInd.indication.Data.(type) {
+			case *oop.Indication_OnuInd:
+				if err := dh.onuIndication(onuInd.ctx, onuInd.indication.GetOnuInd(), onuInd.serialNumber); err != nil {
+					_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{
+						"type":      "onu-indication",
+						"device-id": dh.device.Id,
+						"sn":        onuInd.serialNumber}, err).Log()
+				}
+			case *oop.Indication_OnuDiscInd:
+				if err := dh.onuDiscIndication(onuInd.ctx, onuInd.indication.GetOnuDiscInd(), onuInd.serialNumber); err != nil {
+					_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{
+						"type":      "onu-discovery",
+						"device-id": dh.device.Id,
+						"sn":        onuInd.serialNumber}, err).Log()
+				}
+			}
+		case <-onuChannels.stopChannel:
+			logger.Debugw(context.Background(), "stop-signal-received-for-onu-channel", log.Fields{"device-id": dh.device.Id})
+			close(onuChannels.indicationChannel)
+			return
+		}
+	}
+}