[SEBA-817][SEBA-821]
Add C-tag/S-tag and hardware address fields to the ONU struct
Complete the DHCP state machine
Clean up logs
Change-Id: Iadb1d3967befe1c402e302a552b67faa2701f5c5
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index fac7ce1..77e912c 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -18,129 +18,130 @@
import (
"fmt"
- "github.com/opencord/bbsim/internal/bbsim/responders/eapol"
"github.com/google/gopacket/layers"
"github.com/looplab/fsm"
+ "github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
+ "github.com/opencord/bbsim/internal/bbsim/responders/eapol"
+ bbsim "github.com/opencord/bbsim/internal/bbsim/types"
omci "github.com/opencord/omci-sim"
"github.com/opencord/voltha-protos/go/openolt"
log "github.com/sirupsen/logrus"
- bbsim "github.com/opencord/bbsim/internal/bbsim/types"
+ "net"
)
var onuLogger = log.WithFields(log.Fields{
"module": "ONU",
})
-func CreateONU(olt OltDevice, pon PonPort, id uint32) Onu {
- o := Onu{
- ID: id,
- PonPortID: pon.ID,
- PonPort: pon,
- // NOTE can we combine everything in a single channel?
- channel: make(chan Message),
- eapolPktOutCh: make(chan *bbsim.ByteMsg, 1024),
- }
- o.SerialNumber = o.NewSN(olt.ID, pon.ID, o.ID)
+func CreateONU(olt OltDevice, pon PonPort, id uint32, sTag int, cTag int) Onu {
- // NOTE this state machine is used to track the operational
- // state as requested by VOLTHA
- o.OperState = getOperStateFSM(func(e *fsm.Event) {
- onuLogger.WithFields(log.Fields{
- "ID": o.ID,
- }).Debugf("Changing ONU OperState from %s to %s", e.Src, e.Dst)
- })
+ o := Onu{
+ ID: id,
+ PonPortID: pon.ID,
+ PonPort: pon,
+ STag: sTag,
+ CTag: cTag,
+ HwAddress: net.HardwareAddr{0x2e, 0x60, 0x70, 0x13, byte(pon.ID), byte(id)},
+ // NOTE can we combine everything in a single channel?
+ channel: make(chan Message, 2048),
+ eapolPktOutCh: make(chan *bbsim.ByteMsg, 1024),
+ dhcpPktOutCh: make(chan *bbsim.ByteMsg, 1024),
+ }
+ o.SerialNumber = o.NewSN(olt.ID, pon.ID, o.ID)
- // NOTE this state machine is used to activate the OMCI, EAPOL and DHCP clients
- o.InternalState = fsm.NewFSM(
- "created",
- fsm.Events{
- {Name: "discover", Src: []string{"created"}, Dst: "discovered"},
- {Name: "enable", Src: []string{"discovered"}, Dst: "enabled"},
- {Name: "receive_eapol_flow", Src: []string{"enabled", "gem_port_added"}, Dst: "eapol_flow_received"},
- {Name: "add_gem_port", Src: []string{"enabled", "eapol_flow_received"}, Dst: "gem_port_added"},
- {Name: "start_auth", Src: []string{"eapol_flow_received", "gem_port_added"}, Dst: "auth_started"},
- {Name: "eap_start_sent", Src: []string{"auth_started"}, Dst: "eap_start_sent"},
- {Name: "eap_resonse_identity_sent", Src: []string{"eap_start_sent"}, Dst: "eap_resonse_identity_sent"},
- {Name: "eap_resonse_challenge_sent", Src: []string{"eap_resonse_identity_sent"}, Dst: "eap_resonse_challenge_sent"},
- {Name: "eap_resonse_success_received", Src: []string{"eap_resonse_challenge_sent"}, Dst: "eap_resonse_success_received"},
- {Name: "auth_failed", Src: []string{"auth_started", "eap_start_sent", "eap_resonse_identity_sent", "eap_resonse_challenge_sent"}, Dst: "auth_failed"},
+ // NOTE this state machine is used to track the operational
+ // state as requested by VOLTHA
+ o.OperState = getOperStateFSM(func(e *fsm.Event) {
+ onuLogger.WithFields(log.Fields{
+ "ID": o.ID,
+ }).Debugf("Changing ONU OperState from %s to %s", e.Src, e.Dst)
+ })
+
+ // NOTE this state machine is used to activate the OMCI, EAPOL and DHCP clients
+ o.InternalState = fsm.NewFSM(
+ "created",
+ fsm.Events{
+ // DEVICE Activation
+ {Name: "discover", Src: []string{"created"}, Dst: "discovered"},
+ {Name: "enable", Src: []string{"discovered"}, Dst: "enabled"},
+ {Name: "receive_eapol_flow", Src: []string{"enabled", "gem_port_added"}, Dst: "eapol_flow_received"},
+ {Name: "add_gem_port", Src: []string{"enabled", "eapol_flow_received"}, Dst: "gem_port_added"},
+ // EAPOL
+ {Name: "start_auth", Src: []string{"eapol_flow_received", "gem_port_added"}, Dst: "auth_started"},
+ {Name: "eap_start_sent", Src: []string{"auth_started"}, Dst: "eap_start_sent"},
+ {Name: "eap_response_identity_sent", Src: []string{"eap_start_sent"}, Dst: "eap_response_identity_sent"},
+ {Name: "eap_response_challenge_sent", Src: []string{"eap_response_identity_sent"}, Dst: "eap_response_challenge_sent"},
+ {Name: "eap_response_success_received", Src: []string{"eap_response_challenge_sent"}, Dst: "eap_response_success_received"},
+ {Name: "auth_failed", Src: []string{"auth_started", "eap_start_sent", "eap_response_identity_sent", "eap_response_challenge_sent"}, Dst: "auth_failed"},
+ // DHCP
+ {Name: "start_dhcp", Src: []string{"eap_response_success_received"}, Dst: "dhcp_started"},
+ {Name: "dhcp_discovery_sent", Src: []string{"dhcp_started"}, Dst: "dhcp_discovery_sent"},
+ {Name: "dhcp_request_sent", Src: []string{"dhcp_discovery_sent"}, Dst: "dhcp_request_sent"},
+ {Name: "dhcp_ack_received", Src: []string{"dhcp_request_sent"}, Dst: "dhcp_ack_received"},
+ {Name: "dhcp_failed", Src: []string{"dhcp_started", "dhcp_discovery_sent", "dhcp_request_sent"}, Dst: "dhcp_failed"},
+ },
+ fsm.Callbacks{
+ "enter_state": func(e *fsm.Event) {
+ o.logStateChange(e.Src, e.Dst)
},
- fsm.Callbacks{
- "enter_state": func(e *fsm.Event) {
- o.logStateChange(e.Src, e.Dst)
- },
- "enter_eapol_flow_received": func(e *fsm.Event) {
- o.logStateChange(e.Src, e.Dst)
- if e.Src == "enter_gem_port_added" {
- if err := o.InternalState.Event("start_auth"); err != nil {
- log.Infof("Transitioning to StartAuth")
- onuLogger.WithFields(log.Fields{
- "OnuId": o.ID,
- "IntfId": o.PonPortID,
- "OnuSn": o.SerialNumber,
- }).Errorf("Error while transitioning ONU State")
- }
- }
- },
- "enter_gem_port_added": func(e *fsm.Event) {
- o.logStateChange(e.Src, e.Dst)
- if e.Src == "eapol_flow_received" {
- log.Infof("Transitioning to StartAuth")
- if err := o.InternalState.Event("start_auth"); err != nil {
- onuLogger.WithFields(log.Fields{
- "OnuId": o.ID,
- "IntfId": o.PonPortID,
- "OnuSn": o.SerialNumber,
- }).Errorf("Error while transitioning ONU State")
- }
- }
- },
- "enter_auth_started": func(e *fsm.Event) {
- o.logStateChange(e.Src, e.Dst)
- msg := Message{
- Type: StartEAPOL,
- Data: EapStartMessage{
- PonPortID: o.PonPortID,
- OnuID: o.ID,
- },
- }
- go func(msg Message){
- // you can only send a value on an unbuffered channel without blocking
- o.channel <- msg
- }(msg)
-
- },
- "enter_eap_resonse_success_received": func(e *fsm.Event) {
- o.logStateChange(e.Src, e.Dst)
- onuLogger.WithFields(log.Fields{
- "OnuId": o.ID,
- "IntfId": o.PonPortID,
- "OnuSn": o.SerialNumber,
- }).Warnf("TODO start DHCP request")
- },
+ "enter_auth_started": func(e *fsm.Event) {
+ o.logStateChange(e.Src, e.Dst)
+ msg := Message{
+ Type: StartEAPOL,
+ Data: PacketMessage{
+ PonPortID: o.PonPortID,
+ OnuID: o.ID,
+ },
+ }
+ o.channel <- msg
},
- )
- return o
+ "enter_auth_failed": func(e *fsm.Event) {
+ onuLogger.WithFields(log.Fields{
+ "OnuId": o.ID,
+ "IntfId": o.PonPortID,
+ "OnuSn": o.Sn(),
+ }).Errorf("ONU failed to authenticate!")
+ },
+ "enter_dhcp_started": func(e *fsm.Event) {
+ msg := Message{
+ Type: StartDHCP,
+ Data: PacketMessage{
+ PonPortID: o.PonPortID,
+ OnuID: o.ID,
+ },
+ }
+ o.channel <- msg
+ },
+ "enter_dhcp_failed": func(e *fsm.Event) {
+ onuLogger.WithFields(log.Fields{
+ "OnuId": o.ID,
+ "IntfId": o.PonPortID,
+ "OnuSn": o.Sn(),
+ }).Errorf("ONU failed to DHCP!")
+ },
+ },
+ )
+ return o
}
func (o Onu) logStateChange(src string, dst string) {
onuLogger.WithFields(log.Fields{
- "OnuId": o.ID,
+ "OnuId": o.ID,
"IntfId": o.PonPortID,
- "OnuSn": o.SerialNumber,
+ "OnuSn": o.Sn(),
}).Debugf("Changing ONU InternalState from %s to %s", src, dst)
}
-func (o Onu) processOnuMessages(stream openolt.Openolt_EnableIndicationServer) {
+func (o Onu) processOnuMessages(stream openolt.Openolt_EnableIndicationServer) {
onuLogger.WithFields(log.Fields{
"onuID": o.ID,
- "onuSN": o.SerialNumber,
+ "onuSN": o.Sn(),
}).Debug("Started ONU Indication Channel")
for message := range o.channel {
onuLogger.WithFields(log.Fields{
- "onuID": o.ID,
- "onuSN": o.SerialNumber,
+ "onuID": o.ID,
+ "onuSN": o.SerialNumber,
"messageType": message.Type,
}).Tracef("Received message on ONU Channel")
@@ -161,7 +162,29 @@
log.Infof("Receive StartEAPOL message on ONU channel")
go func() {
// TODO kill this thread
- eapol.CreateWPASupplicant(o.ID, o.PonPortID, o.SerialNumber, o.InternalState, stream, o.eapolPktOutCh)
+ eapol.CreateWPASupplicant(
+ o.ID,
+ o.PonPortID,
+ o.Sn(),
+ o.InternalState,
+ stream,
+ o.eapolPktOutCh,
+ )
+ }()
+ case StartDHCP:
+ log.Infof("Receive StartDHCP message on ONU channel")
+ go func() {
+ // TODO kill this thread
+ dhcp.CreateDHCPClient(
+ o.ID,
+ o.PonPortID,
+ o.Sn(),
+ o.HwAddress,
+ o.CTag,
+ o.InternalState,
+ stream,
+ o.dhcpPktOutCh,
+ )
}()
default:
onuLogger.Warnf("Received unknown message data %v for type %v in OLT channel", message.Data, message.Type)
@@ -169,19 +192,19 @@
}
}
-func (o Onu) processOmciMessages(stream openolt.Openolt_EnableIndicationServer) {
+func (o Onu) processOmciMessages(stream openolt.Openolt_EnableIndicationServer) {
ch := omci.GetChannel()
onuLogger.WithFields(log.Fields{
"onuID": o.ID,
- "onuSN": o.SerialNumber,
+ "onuSN": o.Sn(),
}).Debug("Started OMCI Indication Channel")
for message := range ch {
switch message.Type {
case omci.GemPortAdded:
log.WithFields(log.Fields{
- "OnuId": message.Data.OnuId,
+ "OnuId": message.Data.OnuId,
"IntfId": message.Data.IntfId,
}).Infof("GemPort Added")
@@ -191,7 +214,7 @@
if err := o.InternalState.Event("add_gem_port"); err != nil {
log.Errorf("Can't go to gem_port_added: %v", err)
}
- } else if o.InternalState.Is("eapol_flow_received"){
+ } else if o.InternalState.Is("eapol_flow_received") {
if err := o.InternalState.Event("start_auth"); err != nil {
log.Errorf("Can't go to auth_started: %v", err)
}
@@ -213,7 +236,7 @@
func (o Onu) sendOnuDiscIndication(msg OnuDiscIndicationMessage, stream openolt.Openolt_EnableIndicationServer) {
discoverData := &openolt.Indication_OnuDiscInd{OnuDiscInd: &openolt.OnuDiscIndication{
- IntfId: msg.Onu.PonPortID,
+ IntfId: msg.Onu.PonPortID,
SerialNumber: msg.Onu.SerialNumber,
}}
if err := stream.Send(&openolt.Indication{Data: discoverData}); err != nil {
@@ -222,8 +245,8 @@
o.InternalState.Event("discover")
onuLogger.WithFields(log.Fields{
"IntfId": msg.Onu.PonPortID,
- "SerialNumber": msg.Onu.SerialNumber,
- "OnuId": o.ID,
+ "OnuSn": msg.Onu.Sn(),
+ "OnuId": o.ID,
}).Debug("Sent Indication_OnuDiscInd")
}
@@ -235,10 +258,10 @@
o.OperState.Event("enable")
indData := &openolt.Indication_OnuInd{OnuInd: &openolt.OnuIndication{
- IntfId: o.PonPortID,
- OnuId: o.ID,
- OperState: o.OperState.Current(),
- AdminState: o.OperState.Current(),
+ IntfId: o.PonPortID,
+ OnuId: o.ID,
+ OperState: o.OperState.Current(),
+ AdminState: o.OperState.Current(),
SerialNumber: o.SerialNumber,
}}
if err := stream.Send(&openolt.Indication{Data: indData}); err != nil {
@@ -246,20 +269,20 @@
}
o.InternalState.Event("enable")
onuLogger.WithFields(log.Fields{
- "IntfId": o.PonPortID,
- "OnuId": o.ID,
- "OperState": msg.OperState.String(),
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "OperState": msg.OperState.String(),
"AdminState": msg.OperState.String(),
- "SerialNumber": o.SerialNumber,
+ "OnuSn": o.Sn(),
}).Debug("Sent Indication_OnuInd")
}
func (o Onu) handleOmciMessage(msg OmciMessage, stream openolt.Openolt_EnableIndicationServer) {
onuLogger.WithFields(log.Fields{
- "IntfId": o.PonPortID,
+ "IntfId": o.PonPortID,
"SerialNumber": o.SerialNumber,
- "omciPacket": msg.omciMsg.Pkt,
+ "omciPacket": msg.omciMsg.Pkt,
}).Tracef("Received OMCI message")
var omciInd openolt.OmciIndication
@@ -277,36 +300,48 @@
onuLogger.Errorf("send omci indication failed: %v", err)
}
onuLogger.WithFields(log.Fields{
- "IntfId": o.PonPortID,
+ "IntfId": o.PonPortID,
"SerialNumber": o.SerialNumber,
- "omciPacket": omciInd.Pkt,
+ "omciPacket": omciInd.Pkt,
}).Tracef("Sent OMCI message")
}
func (o Onu) handleFlowUpdate(msg OnuFlowUpdateMessage, stream openolt.Openolt_EnableIndicationServer) {
onuLogger.WithFields(log.Fields{
- "IntfId": msg.Flow.AccessIntfId,
- "OnuId": msg.Flow.OnuId,
- "EthType": fmt.Sprintf("%x", msg.Flow.Classifier.EthType),
+ "DstPort": msg.Flow.Classifier.DstPort,
+ "EthType": fmt.Sprintf("%x", msg.Flow.Classifier.EthType),
+ "FlowId": msg.Flow.FlowId,
+ "FlowType": msg.Flow.FlowType,
"InnerVlan": msg.Flow.Classifier.IVid,
+ "IntfId": msg.Flow.AccessIntfId,
+ "IpProto": msg.Flow.Classifier.IpProto,
+ "OnuId": msg.Flow.OnuId,
+ "OnuSn": o.Sn(),
"OuterVlan": msg.Flow.Classifier.OVid,
- "FlowType": msg.Flow.FlowType,
- "FlowId": msg.Flow.FlowId,
- "UniID": msg.Flow.UniId,
- "PortNo": msg.Flow.PortNo,
- }).Infof("ONU receives Flow")
+ "PortNo": msg.Flow.PortNo,
+ "SrcPort": msg.Flow.Classifier.SrcPort,
+ "UniID": msg.Flow.UniId,
+ }).Debug("ONU receives Flow")
+
if msg.Flow.Classifier.EthType == uint32(layers.EthernetTypeEAPOL) && msg.Flow.Classifier.OVid == 4091 {
// NOTE if we receive the EAPOL flows but we don't have GemPorts
// go an intermediate state, otherwise start auth
if o.InternalState.Is("enabled") {
if err := o.InternalState.Event("receive_eapol_flow"); err != nil {
- log.Errorf("Can't go to eapol_flow_received: %v", err)
+ log.Warnf("Can't go to eapol_flow_received: %v", err)
}
- } else if o.InternalState.Is("gem_port_added"){
+ } else if o.InternalState.Is("gem_port_added") {
if err := o.InternalState.Event("start_auth"); err != nil {
- log.Errorf("Can't go to auth_started: %v", err)
+ log.Warnf("Can't go to auth_started: %v", err)
}
}
+ } else if msg.Flow.Classifier.EthType == uint32(layers.EthernetTypeIPv4) &&
+ msg.Flow.Classifier.SrcPort == uint32(68) &&
+ msg.Flow.Classifier.DstPort == uint32(67) {
+ // NOTE we are receiving multiple DHCP flows but we shouldn't call the transition multiple times
+ if err := o.InternalState.Event("start_dhcp"); err != nil {
+ log.Warnf("Can't go to dhcp_started: %v", err)
+ }
}
}
@@ -321,4 +356,4 @@
}
onuLogger.Tracef("Omci decoded: %x.", p)
return p
-}
\ No newline at end of file
+}