Merge "VOL-2090 Move omci processing to olt, so that messages can be processed by the correct onu"
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index 27f4e78..bfaa5fd 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -25,6 +25,7 @@
 	"github.com/looplab/fsm"
 	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
 	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
+	omcisim "github.com/opencord/omci-sim"
 	"github.com/opencord/voltha-protos/go/openolt"
 	"github.com/opencord/voltha-protos/go/tech_profile"
 	log "github.com/sirupsen/logrus"
@@ -212,7 +213,7 @@
 		}
 		o.channel <- msg
 	}
-
+	go o.processOmciMessages()
 	// send PON Port indications
 	for _, pon := range o.Pons {
 		msg := Message{
@@ -226,7 +227,6 @@
 
 		for _, onu := range pon.Onus {
 			go onu.ProcessOnuMessages(stream, nil)
-			go onu.processOmciMessages(stream)
 			// FIXME move the message generation in the state transition
 			// from here only invoke the state transition
 			msg := Message{
@@ -244,6 +244,22 @@
 	return nil
 }
 
+// processOmciMessages reads every message from the omci-sim channel and hands it
+// to the ONU matching its IntfId/OnuId; messages for unknown ONUs are dropped.
+func (o OltDevice) processOmciMessages() {
+	ch := omcisim.GetChannel()
+	oltLogger.Debug("Started OMCI Indication Channel")
+
+	for message := range ch {
+		onu, err := o.FindOnuById(message.Data.IntfId, message.Data.OnuId)
+		if err != nil {
+			oltLogger.Errorf("Failed to find onu: %v", err)
+			continue // FindOnuById returns an empty Onu on failure; don't dispatch to it
+		}
+		go onu.processOmciMessage(message)
+	}
+}
+
 // Helpers method
 
 func (o OltDevice) GetPonById(id uint32) (*PonPort, error) {
@@ -433,6 +449,22 @@
 	return &Onu{}, errors.New(fmt.Sprintf("cannot-find-onu-by-serial-number-%s", serialNumber))
 }
 
+// FindOnuById returns the ONU with the given ID on the given PON interface, or an empty Onu and an error.
+func (o OltDevice) FindOnuById(intfId uint32, onuId uint32) (*Onu, error) {
+	// TODO this lookup can become a performance bottleneck with many ONUs; memoizing it would remove it
+	for _, pon := range o.Pons {
+		if pon.ID != intfId {
+			continue
+		}
+		for _, onu := range pon.Onus {
+			if onu.ID == onuId {
+				return onu, nil
+			}
+		}
+	}
+	return &Onu{}, fmt.Errorf("cannot-find-onu-by-id-%v-%v", intfId, onuId)
+}
+
 // returns an ONU with a given Mac Address
 func (o OltDevice) FindOnuByMacAddress(mac net.HardwareAddr) (*Onu, error) {
 	// TODO this function can be a perfoormance bottlenec when we have many ONUs,
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index f4ef84b..2aba5d2 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -294,32 +294,23 @@
 	}
 }
 
-func (o Onu) processOmciMessages(stream openolt.Openolt_EnableIndicationServer) {
-	ch := omcisim.GetChannel()
+func (o Onu) processOmciMessage(message omcisim.OmciChMessage) {
+	switch message.Type {
+	case omcisim.GemPortAdded:
+		log.WithFields(log.Fields{
+			"OnuId":  message.Data.OnuId,
+			"IntfId": message.Data.IntfId,
+		}).Infof("GemPort Added")
 
-	onuLogger.WithFields(log.Fields{
-		"onuID": o.ID,
-		"onuSN": o.Sn(),
-	}).Debug("Started OMCI Indication Channel")
-
-	for message := range ch {
-		switch message.Type {
-		case omcisim.GemPortAdded:
-			log.WithFields(log.Fields{
-				"OnuId":  message.Data.OnuId,
-				"IntfId": message.Data.IntfId,
-			}).Infof("GemPort Added")
-
-			// NOTE if we receive the GemPort but we don't have EAPOL flows
-			// go an intermediate state, otherwise start auth
-			if o.InternalState.Is("enabled") {
-				if err := o.InternalState.Event("add_gem_port"); err != nil {
-					log.Errorf("Can't go to gem_port_added: %v", err)
-				}
-			} else if o.InternalState.Is("eapol_flow_received") {
-				if err := o.InternalState.Event("start_auth"); err != nil {
-					log.Errorf("Can't go to auth_started: %v", err)
-				}
+		// NOTE if we receive the GemPort but we don't have EAPOL flows
+		// go to an intermediate state, otherwise start auth
+		if o.InternalState.Is("enabled") {
+			if err := o.InternalState.Event("add_gem_port"); err != nil {
+				log.Errorf("Can't go to gem_port_added: %v", err)
+			}
+		} else if o.InternalState.Is("eapol_flow_received") {
+			if err := o.InternalState.Event("start_auth"); err != nil {
+				log.Errorf("Can't go to auth_started: %v", err)
 			}
 		}
 	}