VOL-3882 Changing onuIndications channel to per PON instead of per ONU

Change-Id: Ieac25ae243a6e439f9aac46743cbfb83087528d7
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 8eaf0bf..ec5bf84 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -94,9 +94,9 @@
 	stopIndications               chan bool
 	isReadIndicationRoutineActive bool
 
-	totalPonPorts     uint32
-	perOnuChannel     map[string]onuIndicationChannels
-	perOnuChannelLock sync.Mutex
+	totalPonPorts                  uint32
+	perPonOnuIndicationChannel     map[uint32]onuIndicationChannels
+	perPonOnuIndicationChannelLock sync.Mutex
 
 	// Slice of channels. Each channel in slice, index by (mcast-group-id modulo MaxNumOfGroupHandlerChannels)
 	// A go routine per index, waits on a unique channel for incoming mcast flow or group (add/modify/remove).
@@ -115,14 +115,13 @@
 	rdiRaised     bool
 }
 
-type perOnuIndication struct {
-	ctx          context.Context
-	indication   *oop.Indication
-	serialNumber string
+type onuIndicationMsg struct {
+	ctx        context.Context
+	indication *oop.Indication
 }
 
 type onuIndicationChannels struct {
-	indicationChannel chan perOnuIndication
+	indicationChannel chan onuIndicationMsg
 	stopChannel       chan struct{}
 }
 
@@ -179,7 +178,7 @@
 	dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
 	dh.activePorts = sync.Map{}
 	dh.stopIndications = make(chan bool, 1)
-	dh.perOnuChannel = make(map[string]onuIndicationChannels)
+	dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
 	// Create a slice of buffered channels for handling concurrent mcast flow/group.
 	dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
 	for i := range dh.incomingMcastFlowOrGroup {
@@ -554,18 +553,16 @@
 
 		onuDiscInd := indication.GetOnuDiscInd()
 		logger.Infow(ctx, "received-onu-discovery-indication", log.Fields{"OnuDiscInd": onuDiscInd, "device-id": dh.device.Id})
-		sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
 		//put message  to channel and return immediately
-		dh.putOnuIndicationToChannel(ctx, indication, sn)
+		dh.putOnuIndicationToChannel(ctx, indication, onuDiscInd.GetIntfId())
 	case *oop.Indication_OnuInd:
 		span, ctx := log.CreateChildSpan(ctx, "onu-indication", log.Fields{"device-id": dh.device.Id})
 		defer span.Finish()
 
 		onuInd := indication.GetOnuInd()
 		logger.Infow(ctx, "received-onu-indication", log.Fields{"OnuInd": onuInd, "device-id": dh.device.Id})
-		sn := dh.stringifySerialNumber(onuInd.SerialNumber)
 		//put message  to channel and return immediately
-		dh.putOnuIndicationToChannel(ctx, indication, sn)
+		dh.putOnuIndicationToChannel(ctx, indication, onuInd.GetIntfId())
 	case *oop.Indication_OmciInd:
 		span, ctx := log.CreateChildSpan(ctx, "omci-indication", log.Fields{"device-id": dh.device.Id})
 		defer span.Finish()
@@ -1129,10 +1126,11 @@
 	return nil
 }
 
-func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication, sn string) error {
+func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication) error {
 	channelID := onuDiscInd.GetIntfId()
 	parentPortNo := IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
 
+	sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
 	logger.Infow(ctx, "new-discovery-indication", log.Fields{"sn": sn})
 
 	kwargs := make(map[string]interface{})
@@ -1263,7 +1261,7 @@
 	return nil
 }
 
-func (dh *DeviceHandler) onuIndication(ctx context.Context, onuInd *oop.OnuIndication, serialNumber string) error {
+func (dh *DeviceHandler) onuIndication(ctx context.Context, onuInd *oop.OnuIndication) error {
 
 	kwargs := make(map[string]interface{})
 	ponPort := IntfIDToPortNo(onuInd.GetIntfId(), voltha.Port_PON_OLT)
@@ -1275,6 +1273,7 @@
 			"intfId":    onuInd.GetIntfId(),
 			"device-id": dh.device.Id})
 	onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.OnuId)
+	serialNumber := dh.stringifySerialNumber(onuInd.SerialNumber)
 
 	errFields := log.Fields{"device-id": dh.device.Id}
 
@@ -1394,15 +1393,18 @@
 }
 
 func (dh *DeviceHandler) stringifyVendorSpecific(vendorSpecific []byte) string {
-	tmp := fmt.Sprintf("%x", (uint32(vendorSpecific[0])>>4)&0x0f) +
-		fmt.Sprintf("%x", uint32(vendorSpecific[0]&0x0f)) +
-		fmt.Sprintf("%x", (uint32(vendorSpecific[1])>>4)&0x0f) +
-		fmt.Sprintf("%x", (uint32(vendorSpecific[1]))&0x0f) +
-		fmt.Sprintf("%x", (uint32(vendorSpecific[2])>>4)&0x0f) +
-		fmt.Sprintf("%x", (uint32(vendorSpecific[2]))&0x0f) +
-		fmt.Sprintf("%x", (uint32(vendorSpecific[3])>>4)&0x0f) +
-		fmt.Sprintf("%x", (uint32(vendorSpecific[3]))&0x0f)
-	return tmp
+	if len(vendorSpecific) > 3 {
+		tmp := fmt.Sprintf("%x", (uint32(vendorSpecific[0])>>4)&0x0f) +
+			fmt.Sprintf("%x", uint32(vendorSpecific[0]&0x0f)) +
+			fmt.Sprintf("%x", (uint32(vendorSpecific[1])>>4)&0x0f) +
+			fmt.Sprintf("%x", (uint32(vendorSpecific[1]))&0x0f) +
+			fmt.Sprintf("%x", (uint32(vendorSpecific[2])>>4)&0x0f) +
+			fmt.Sprintf("%x", (uint32(vendorSpecific[2]))&0x0f) +
+			fmt.Sprintf("%x", (uint32(vendorSpecific[3])>>4)&0x0f) +
+			fmt.Sprintf("%x", (uint32(vendorSpecific[3]))&0x0f)
+		return tmp
+	}
+	return ""
 }
 
 //UpdateFlowsBulk upates the bulk flow
@@ -1733,6 +1735,7 @@
 			return olterrors.NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
 		}
 	}
+	dh.removeOnuIndicationChannels(ctx)
 	// There is no need to update the core about operation status and connection status of the OLT.
 	// The OLT is getting deleted anyway and the core might have already cleared the OLT device from its DB.
 	// So any attempt to update the operation status and connection status of the OLT will result in core throwing an error back,
@@ -2215,7 +2218,6 @@
 	}
 	dh.onus.Delete(onuKey)
 	dh.discOnus.Delete(onuDevice.(*OnuDevice).serialNumber)
-	dh.removeOnuIndicationChannels(ctx, onuDevice.(*OnuDevice).serialNumber)
 	return nil
 }
 
@@ -2329,69 +2331,65 @@
 	return intfID
 }
 
-func (dh *DeviceHandler) getOnuIndicationChannel(ctx context.Context, sn string) chan perOnuIndication {
-	dh.perOnuChannelLock.Lock()
-	if ch, ok := dh.perOnuChannel[sn]; ok {
-		dh.perOnuChannelLock.Unlock()
+func (dh *DeviceHandler) getOnuIndicationChannel(ctx context.Context, intfID uint32) chan onuIndicationMsg {
+	dh.perPonOnuIndicationChannelLock.Lock()
+	if ch, ok := dh.perPonOnuIndicationChannel[intfID]; ok {
+		dh.perPonOnuIndicationChannelLock.Unlock()
 		return ch.indicationChannel
 	}
 	channels := onuIndicationChannels{
 		//We create a buffered channel here to avoid calling function to be  blocked
-		//in case of multiple indications from the same ONU,
+		//in case of multiple indications from the ONUs,
 		//especially in the case where indications are buffered in  OLT.
-		indicationChannel: make(chan perOnuIndication, 5),
+		indicationChannel: make(chan onuIndicationMsg, 500),
 		stopChannel:       make(chan struct{}),
 	}
-	dh.perOnuChannel[sn] = channels
-	dh.perOnuChannelLock.Unlock()
-	go dh.perOnuIndicationsRoutine(&channels)
+	dh.perPonOnuIndicationChannel[intfID] = channels
+	dh.perPonOnuIndicationChannelLock.Unlock()
+	go dh.onuIndicationsRoutine(&channels)
 	return channels.indicationChannel
 
 }
 
-func (dh *DeviceHandler) removeOnuIndicationChannels(ctx context.Context, sn string) {
-	logger.Debugw(ctx, "remove-onu-indication-channels", log.Fields{"sn": sn})
-	dh.perOnuChannelLock.Lock()
-	defer dh.perOnuChannelLock.Unlock()
-	if ch, ok := dh.perOnuChannel[sn]; ok {
-		close(ch.stopChannel)
-		delete(dh.perOnuChannel, sn)
+func (dh *DeviceHandler) removeOnuIndicationChannels(ctx context.Context) {
+	logger.Debug(ctx, "remove-onu-indication-channels", log.Fields{"device-id": dh.device.Id})
+	dh.perPonOnuIndicationChannelLock.Lock()
+	defer dh.perPonOnuIndicationChannelLock.Unlock()
+	for _, v := range dh.perPonOnuIndicationChannel {
+		close(v.stopChannel)
 	}
-
+	dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
 }
 
-func (dh *DeviceHandler) putOnuIndicationToChannel(ctx context.Context, indication *oop.Indication, sn string) {
-	ind := perOnuIndication{
-		ctx:          ctx,
-		indication:   indication,
-		serialNumber: sn,
+func (dh *DeviceHandler) putOnuIndicationToChannel(ctx context.Context, indication *oop.Indication, intfID uint32) {
+	ind := onuIndicationMsg{
+		ctx:        ctx,
+		indication: indication,
 	}
-	logger.Debugw(ctx, "put-onu-indication-to-channel", log.Fields{"indication": indication, "sn": sn})
+	logger.Debugw(ctx, "put-onu-indication-to-channel", log.Fields{"indication": indication, "intfID": intfID})
 	// Send the onuIndication on the ONU channel
-	dh.getOnuIndicationChannel(ctx, sn) <- ind
+	dh.getOnuIndicationChannel(ctx, intfID) <- ind
 }
 
-func (dh *DeviceHandler) perOnuIndicationsRoutine(onuChannels *onuIndicationChannels) {
+func (dh *DeviceHandler) onuIndicationsRoutine(onuChannels *onuIndicationChannels) {
 	for {
 		select {
 		// process one indication per onu, before proceeding to the next one
 		case onuInd := <-onuChannels.indicationChannel:
 			logger.Debugw(onuInd.ctx, "calling-indication", log.Fields{"device-id": dh.device.Id,
-				"ind": onuInd.indication, "sn": onuInd.serialNumber})
+				"ind": onuInd.indication})
 			switch onuInd.indication.Data.(type) {
 			case *oop.Indication_OnuInd:
-				if err := dh.onuIndication(onuInd.ctx, onuInd.indication.GetOnuInd(), onuInd.serialNumber); err != nil {
+				if err := dh.onuIndication(onuInd.ctx, onuInd.indication.GetOnuInd()); err != nil {
 					_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{
 						"type":      "onu-indication",
-						"device-id": dh.device.Id,
-						"sn":        onuInd.serialNumber}, err).Log()
+						"device-id": dh.device.Id}, err).Log()
 				}
 			case *oop.Indication_OnuDiscInd:
-				if err := dh.onuDiscIndication(onuInd.ctx, onuInd.indication.GetOnuDiscInd(), onuInd.serialNumber); err != nil {
+				if err := dh.onuDiscIndication(onuInd.ctx, onuInd.indication.GetOnuDiscInd()); err != nil {
 					_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{
 						"type":      "onu-discovery",
-						"device-id": dh.device.Id,
-						"sn":        onuInd.serialNumber}, err).Log()
+						"device-id": dh.device.Id}, err).Log()
 				}
 			}
 		case <-onuChannels.stopChannel:
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index bba8601..9770f15 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -1089,7 +1089,7 @@
 		t.Run(tt.name, func(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 			defer cancel()
-			_ = tt.devicehandler.onuDiscIndication(ctx, tt.args.onuDiscInd, tt.args.sn)
+			_ = tt.devicehandler.onuDiscIndication(ctx, tt.args.onuDiscInd)
 			//TODO: actually verify test cases
 		})
 	}