- listening for Flows and updating ONU state machine
- Sending the EAP_START packet
- Moving EAPOL handling into an external responder
Change-Id: I94fbbbb391467f44c71be8f2181cd41df7bd92f5
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index 172ead4..94e0705 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -17,10 +17,14 @@
package devices
import (
- "github.com/opencord/voltha-protos/go/openolt"
+ "fmt"
+ "github.com/opencord/bbsim/internal/bbsim/responders"
+ "github.com/google/gopacket/layers"
"github.com/looplab/fsm"
omci "github.com/opencord/omci-sim"
+ "github.com/opencord/voltha-protos/go/openolt"
log "github.com/sirupsen/logrus"
+ bbsim "github.com/opencord/bbsim/internal/bbsim/types"
)
var onuLogger = log.WithFields(log.Fields{
@@ -32,7 +36,9 @@
ID: id,
PonPortID: pon.ID,
PonPort: pon,
+ // NOTE can we combine everything in a single channel?
channel: make(chan Message),
+ eapolPktOutCh: make(chan *bbsim.ByteMsg, 1024),
}
o.SerialNumber = o.NewSN(olt.ID, pon.ID, o.ID)
@@ -50,19 +56,68 @@
fsm.Events{
{Name: "discover", Src: []string{"created"}, Dst: "discovered"},
{Name: "enable", Src: []string{"discovered"}, Dst: "enabled"},
- {Name: "start_omci", Src: []string{"enabled"}, Dst: "starting_openomci"},
+ {Name: "receive_eapol_flow", Src: []string{"enabled", "gem_port_added"}, Dst: "eapol_flow_received"},
+ {Name: "add_gem_port", Src: []string{"enabled", "eapol_flow_received"}, Dst: "gem_port_added"},
+ {Name: "start_auth", Src: []string{"eapol_flow_received", "gem_port_added"}, Dst: "auth_started"},
},
fsm.Callbacks{
"enter_state": func(e *fsm.Event) {
- onuLogger.WithFields(log.Fields{
- "ID": o.ID,
- }).Debugf("Changing ONU InternalState from %s to %s", e.Src, e.Dst)
+ o.logStateChange(e.Src, e.Dst)
+ },
+			// Callback for entering eapol_flow_received. e.Src carries a state name, so
+			// the GemPort check compares against "gem_port_added" (no "enter_" prefix).
+			// NOTE(review): Event is called from inside a callback — confirm fsm allows it.
+			"enter_eapol_flow_received": func(e *fsm.Event) {
+				o.logStateChange(e.Src, e.Dst)
+				if e.Src == "gem_port_added" {
+					log.Infof("Transitioning to StartAuth")
+					if err := o.InternalState.Event("start_auth"); err != nil {
+						fields := log.Fields{"OnuId": o.ID, "IntfId": o.PonPortID, "OnuSn": o.SerialNumber}
+						onuLogger.WithFields(fields).Errorf("Error while transitioning ONU State")
+					}
+				}
+			},
+ "enter_gem_port_added": func(e *fsm.Event) {
+ o.logStateChange(e.Src, e.Dst)
+ if e.Src == "eapol_flow_received" {
+ log.Infof("Transitioning to StartAuth")
+ if err := o.InternalState.Event("start_auth"); err != nil {
+ onuLogger.WithFields(log.Fields{
+ "OnuId": o.ID,
+ "IntfId": o.PonPortID,
+ "OnuSn": o.SerialNumber,
+ }).Errorf("Error while transitioning ONU State")
+ }
+ }
+ },
+ "enter_auth_started": func(e *fsm.Event) {
+ o.logStateChange(e.Src, e.Dst)
+ msg := Message{
+ Type: StartEAPOL,
+ Data: EapStartMessage{
+ PonPortID: o.PonPortID,
+ OnuID: o.ID,
+ },
+ }
+ go func(msg Message){
+ // a send on an unbuffered channel blocks until a receiver is ready, so it is done in a goroutine
+ o.channel <- msg
+ }(msg)
+
},
},
)
return o
}
+// logStateChange writes a debug entry for an InternalState change from src to
+// dst, tagged with the ONU id, the PON port id and the serial number.
+func (o Onu) logStateChange(src string, dst string) {
+	tags := log.Fields{"OnuId": o.ID, "IntfId": o.PonPortID, "OnuSn": o.SerialNumber}
+	entry := onuLogger.WithFields(tags)
+	entry.Debugf("Changing ONU InternalState from %s to %s", src, dst)
+}
+
func (o Onu) processOnuMessages(stream openolt.Openolt_EnableIndicationServer) {
onuLogger.WithFields(log.Fields{
"onuID": o.ID,
@@ -74,7 +129,7 @@
"onuID": o.ID,
"onuSN": o.SerialNumber,
"messageType": message.Type,
- }).Trace("Received message")
+ }).Tracef("Received message on ONU Channel")
switch message.Type {
case OnuDiscIndication:
@@ -85,14 +140,53 @@
o.sendOnuIndication(msg, stream)
case OMCI:
msg, _ := message.Data.(OmciMessage)
- o.InternalState.Event("start_omci")
o.handleOmciMessage(msg, stream)
+ case FlowUpdate:
+ msg, _ := message.Data.(OnuFlowUpdateMessage)
+ o.handleFlowUpdate(msg, stream)
+ case StartEAPOL:
+ log.Infof("Receive StartEAPOL message on ONU channel")
+ go func() {
+
+ responders.StartWPASupplicant(o.ID, o.PonPortID, o.SerialNumber, stream, o.eapolPktOutCh)
+ }()
default:
onuLogger.Warnf("Received unknown message data %v for type %v in OLT channel", message.Data, message.Type)
}
}
}
+// processOmciMessages drains the channel returned by omci.GetChannel and may
+// advance the ONU state machine on each GemPortAdded message; stream is unused.
+// NOTE(review): the message's OnuId/IntfId are only logged, never compared with
+// this ONU — confirm that every ONU is meant to react to every message.
+func (o Onu) processOmciMessages(stream openolt.Openolt_EnableIndicationServer) {
+	ch := omci.GetChannel()
+	startFields := log.Fields{"onuID": o.ID, "onuSN": o.SerialNumber}
+	onuLogger.WithFields(startFields).Debug("Started OMCI Indication Channel")
+
+	for omciMsg := range ch {
+		if omciMsg.Type != omci.GemPortAdded {
+			continue // no other message type is handled here
+		}
+		portFields := log.Fields{"OnuId": omciMsg.Data.OnuId, "IntfId": omciMsg.Data.IntfId}
+		log.WithFields(portFields).Infof("GemPort Added")
+
+		// A GemPort that arrives before the EAPOL flow parks the ONU in an
+		// intermediate state; if the flow is already there, start auth.
+		switch {
+		case o.InternalState.Is("enabled"):
+			if err := o.InternalState.Event("add_gem_port"); err != nil {
+				log.Errorf("Can't go to gem_port_added: %v", err)
+			}
+		case o.InternalState.Is("eapol_flow_received"):
+			if err := o.InternalState.Event("start_auth"); err != nil {
+				log.Errorf("Can't go to auth_started: %v", err)
+			}
+		}
+	}
+}
+
func (o Onu) NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
sn := new(openolt.SerialNumber)
@@ -116,6 +210,7 @@
onuLogger.WithFields(log.Fields{
"IntfId": msg.Onu.PonPortID,
"SerialNumber": msg.Onu.SerialNumber,
+ "OnuId": o.ID,
}).Debug("Sent Indication_OnuDiscInd")
}
@@ -175,6 +270,33 @@
}).Tracef("Sent OMCI message")
}
+// handleFlowUpdate logs the flow carried by msg and, when it is an EAPOL flow
+// with outer VLAN 4091, may advance the ONU state machine; stream is unused.
+// NOTE(review): msg.Flow and its Classifier are dereferenced without a check.
+func (o Onu) handleFlowUpdate(msg OnuFlowUpdateMessage, stream openolt.Openolt_EnableIndicationServer) {
+	onuLogger.WithFields(log.Fields{
+		"IntfId":    msg.Flow.AccessIntfId, "OnuId": msg.Flow.OnuId,
+		"EthType":   fmt.Sprintf("%x", msg.Flow.Classifier.EthType),
+		"InnerVlan": msg.Flow.Classifier.IVid, "OuterVlan": msg.Flow.Classifier.OVid,
+		"FlowType":  msg.Flow.FlowType, "FlowId": msg.Flow.FlowId,
+		"UniID":     msg.Flow.UniId, "PortNo": msg.Flow.PortNo,
+	}).Infof("ONU receives Flow")
+	if msg.Flow.Classifier.EthType != uint32(layers.EthernetTypeEAPOL) || msg.Flow.Classifier.OVid != 4091 {
+		return // only the EAPOL flow on outer VLAN 4091 drives the state machine
+	}
+	// Without a GemPort yet, park in an intermediate state; otherwise start auth.
+	switch {
+	case o.InternalState.Is("enabled"):
+		if err := o.InternalState.Event("receive_eapol_flow"); err != nil {
+			log.Errorf("Can't go to eapol_flow_received: %v", err)
+		}
+	case o.InternalState.Is("gem_port_added"):
+		if err := o.InternalState.Event("start_auth"); err != nil {
+			log.Errorf("Can't go to auth_started: %v", err)
+		}
+	}
+}
+
// HexDecode converts the hex encoding to binary
func HexDecode(pkt []byte) []byte {
p := make([]byte, len(pkt)/2)