- Listening for flows and updating the ONU state machine
- Sending the EAP_START packet
- Moving EAPOL handling into an external responder

Change-Id: I94fbbbb391467f44c71be8f2181cd41df7bd92f5
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index 172ead4..94e0705 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -17,10 +17,14 @@
 package devices
 
 import (
-	"github.com/opencord/voltha-protos/go/openolt"
+	"fmt"
+	"github.com/opencord/bbsim/internal/bbsim/responders"
+	"github.com/google/gopacket/layers"
 	"github.com/looplab/fsm"
 	omci "github.com/opencord/omci-sim"
+	"github.com/opencord/voltha-protos/go/openolt"
 	log "github.com/sirupsen/logrus"
+	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
 )
 
 var onuLogger = log.WithFields(log.Fields{
@@ -32,7 +36,9 @@
 			ID: id,
 			PonPortID: pon.ID,
 			PonPort: pon,
+			// NOTE can we combine everything in a single channel?
 			channel: make(chan Message),
+			eapolPktOutCh: make(chan *bbsim.ByteMsg, 1024),
 		}
 		o.SerialNumber = o.NewSN(olt.ID, pon.ID, o.ID)
 
@@ -50,19 +56,68 @@
 			fsm.Events{
 				{Name: "discover", Src: []string{"created"}, Dst: "discovered"},
 				{Name: "enable", Src: []string{"discovered"}, Dst: "enabled"},
-				{Name: "start_omci", Src: []string{"enabled"}, Dst: "starting_openomci"},
+				{Name: "receive_eapol_flow", Src: []string{"enabled", "gem_port_added"}, Dst: "eapol_flow_received"},
+				{Name: "add_gem_port", Src: []string{"enabled", "eapol_flow_received"}, Dst: "gem_port_added"},
+				{Name: "start_auth", Src: []string{"eapol_flow_received", "gem_port_added"}, Dst: "auth_started"},
 			},
 			fsm.Callbacks{
 				"enter_state": func(e *fsm.Event) {
-					onuLogger.WithFields(log.Fields{
-						"ID": o.ID,
-					}).Debugf("Changing ONU InternalState from %s to %s", e.Src, e.Dst)
+					o.logStateChange(e.Src, e.Dst)
+				},
+				// Entered once the EAPOL flow arrives. e.Src is the previous *state* name
+				// (e.g. "gem_port_added"), never a callback key such as "enter_gem_port_added".
+				// NOTE(review): confirm the fsm library allows calling Event from inside a callback.
+				"enter_eapol_flow_received": func(e *fsm.Event) {
+					o.logStateChange(e.Src, e.Dst)
+					if e.Src == "gem_port_added" {
+						log.Infof("Transitioning to StartAuth")
+						if err := o.InternalState.Event("start_auth"); err != nil {
+							fields := log.Fields{"OnuId": o.ID, "IntfId": o.PonPortID, "OnuSn": o.SerialNumber}
+							onuLogger.WithFields(fields).WithError(err).Errorf("Error while transitioning ONU State")
+						}
+					}
+				},
+				// Entered once the GemPort is added; if the EAPOL flow had already been
+				// received (previous state eapol_flow_received) authentication is started.
+				// NOTE(review): confirm the fsm library allows calling Event from inside a callback.
+				"enter_gem_port_added": func(e *fsm.Event) {
+					o.logStateChange(e.Src, e.Dst)
+					if e.Src == "eapol_flow_received" {
+						log.Infof("Transitioning to StartAuth")
+						if err := o.InternalState.Event("start_auth"); err != nil {
+							fields := log.Fields{"OnuId": o.ID, "IntfId": o.PonPortID, "OnuSn": o.SerialNumber}
+							onuLogger.WithFields(fields).WithError(err).Errorf("Error while transitioning ONU State")
+						}
+					}
+				},
+				"enter_auth_started": func(e *fsm.Event) {
+					// On entering auth_started, log the transition and queue a
+					// StartEAPOL message for this ONU on o.channel.
+					o.logStateChange(e.Src, e.Dst)
+					startEapol := Message{
+						Type: StartEAPOL,
+						Data: EapStartMessage{PonPortID: o.PonPortID, OnuID: o.ID},
+					}
+					// o.channel is created with make(chan Message), i.e. unbuffered,
+					// so a send blocks until a receiver is ready; sending from a
+					// goroutine lets this callback return without waiting.
+					go func(m Message) {
+						o.channel <- m
+					}(startEapol)
 				},
 			},
 		)
 		return o
 }
 
+// logStateChange writes a debug log entry for an ONU state machine transition,
+// tagged with the ONU ID, PON interface ID and serial number.
+func (o Onu) logStateChange(src string, dst string) {
+	const format = "Changing ONU InternalState from %s to %s"
+	fields := log.Fields{"OnuId": o.ID, "IntfId": o.PonPortID, "OnuSn": o.SerialNumber}
+	onuLogger.WithFields(fields).Debugf(format, src, dst)
+}
+
 func (o Onu) processOnuMessages(stream openolt.Openolt_EnableIndicationServer)  {
 	onuLogger.WithFields(log.Fields{
 		"onuID": o.ID,
@@ -74,7 +129,7 @@
 			"onuID": o.ID,
 			"onuSN": o.SerialNumber,
 			"messageType": message.Type,
-		}).Trace("Received message")
+		}).Tracef("Received message on ONU Channel")
 
 		switch message.Type {
 		case OnuDiscIndication:
@@ -85,14 +140,53 @@
 			o.sendOnuIndication(msg, stream)
 		case OMCI:
 			msg, _ := message.Data.(OmciMessage)
-			o.InternalState.Event("start_omci")
 			o.handleOmciMessage(msg, stream)
+		case FlowUpdate:
+			msg, _ := message.Data.(OnuFlowUpdateMessage)
+			o.handleFlowUpdate(msg, stream)
+		case StartEAPOL:
+			log.Infof("Receive StartEAPOL message on ONU channel")
+			go func() {
+
+				responders.StartWPASupplicant(o.ID, o.PonPortID, o.SerialNumber, stream, o.eapolPktOutCh)
+			}()
 		default:
 			onuLogger.Warnf("Received unknown message data %v for type %v in OLT channel", message.Data, message.Type)
 		}
 	}
 }
 
+// processOmciMessages reads events from the channel returned by
+// omci.GetChannel() and, on GemPortAdded, advances the ONU state machine.
+// It returns once that channel is closed; stream is not used in the body.
+//
+// NOTE(review): the event's OnuId/IntfId are logged but never compared with
+// this ONU's identifiers — confirm the channel is not shared between ONUs.
+func (o Onu) processOmciMessages(stream openolt.Openolt_EnableIndicationServer) {
+	omciEvents := omci.GetChannel()
+	onuLogger.WithFields(log.Fields{"onuID": o.ID, "onuSN": o.SerialNumber}).Debug("Started OMCI Indication Channel")
+
+	for event := range omciEvents {
+		if event.Type != omci.GemPortAdded {
+			continue // only GemPort additions drive the state machine here
+		}
+		log.WithFields(log.Fields{"OnuId": event.Data.OnuId, "IntfId": event.Data.IntfId}).Infof("GemPort Added")
+
+		// A GemPort without an EAPOL flow parks the ONU in gem_port_added;
+		// if the EAPOL flow was already received, authentication can start.
+		switch {
+		case o.InternalState.Is("enabled"):
+			if err := o.InternalState.Event("add_gem_port"); err != nil {
+				log.Errorf("Can't go to gem_port_added: %v", err)
+			}
+		case o.InternalState.Is("eapol_flow_received"):
+			if err := o.InternalState.Event("start_auth"); err != nil {
+				log.Errorf("Can't go to auth_started: %v", err)
+			}
+		}
+	}
+}
+
 func (o Onu) NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
 
 	sn := new(openolt.SerialNumber)
@@ -116,6 +210,7 @@
 	onuLogger.WithFields(log.Fields{
 		"IntfId": msg.Onu.PonPortID,
 		"SerialNumber": msg.Onu.SerialNumber,
+		"OnuId": o.ID,
 	}).Debug("Sent Indication_OnuDiscInd")
 }
 
@@ -175,6 +270,33 @@
 	}).Tracef("Sent OMCI message")
 }
 
+// handleFlowUpdate logs the received flow and, for an EAPOL flow on outer VLAN 4091,
+// fires receive_eapol_flow (state enabled) or start_auth (state gem_port_added).
+func (o Onu) handleFlowUpdate(msg OnuFlowUpdateMessage, stream openolt.Openolt_EnableIndicationServer) {
+	onuLogger.WithFields(log.Fields{
+		"IntfId":    msg.Flow.AccessIntfId,
+		"OnuId":     msg.Flow.OnuId,
+		"EthType":   fmt.Sprintf("%x", msg.Flow.Classifier.EthType),
+		"InnerVlan": msg.Flow.Classifier.IVid,
+		"OuterVlan": msg.Flow.Classifier.OVid,
+		"FlowType":  msg.Flow.FlowType,
+		"FlowId":    msg.Flow.FlowId,
+		"UniID":     msg.Flow.UniId,
+		"PortNo":    msg.Flow.PortNo,
+	}).Infof("ONU receives Flow")
+	isEapolFlow := msg.Flow.Classifier.EthType == uint32(layers.EthernetTypeEAPOL) && msg.Flow.Classifier.OVid == 4091
+	switch {
+	case isEapolFlow && o.InternalState.Is("enabled"):
+		if err := o.InternalState.Event("receive_eapol_flow"); err != nil {
+			log.Errorf("Can't go to eapol_flow_received: %v", err)
+		}
+	case isEapolFlow && o.InternalState.Is("gem_port_added"):
+		if err := o.InternalState.Event("start_auth"); err != nil {
+			log.Errorf("Can't go to auth_started: %v", err)
+		}
+	}
+}
+
 // HexDecode converts the hex encoding to binary
 func HexDecode(pkt []byte) []byte {
 	p := make([]byte, len(pkt)/2)