[VOL-2778] Introducing Service definition in order to support the TT workflow
Change-Id: Ib171502e8940b5d0b219620a4503f7095d376d7a
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index 6c53b68..0f78107 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -18,8 +18,10 @@
import (
"context"
- "errors"
"fmt"
+ "github.com/opencord/bbsim/internal/bbsim/packetHandlers"
+ "github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
+ "github.com/opencord/bbsim/internal/bbsim/responders/eapol"
"net"
"time"
@@ -28,10 +30,6 @@
"github.com/google/gopacket/layers"
"github.com/jpillora/backoff"
"github.com/looplab/fsm"
- "github.com/opencord/bbsim/internal/bbsim/packetHandlers"
- "github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
- "github.com/opencord/bbsim/internal/bbsim/responders/eapol"
- "github.com/opencord/bbsim/internal/bbsim/responders/igmp"
"github.com/opencord/bbsim/internal/common"
omcilib "github.com/opencord/bbsim/internal/common/omci"
omcisim "github.com/opencord/omci-sim"
@@ -53,31 +51,26 @@
ID uint32
PonPortID uint32
PonPort *PonPort
- STag int
- CTag int
- Auth bool // automatically start EAPOL if set to true
- Dhcp bool // automatically start DHCP if set to true
- HwAddress net.HardwareAddr
InternalState *fsm.FSM
DiscoveryRetryDelay time.Duration // this is the time between subsequent Discovery Indication
DiscoveryDelay time.Duration // this is the time to send the first Discovery Indication
- Backoff *backoff.Backoff
+
+ Services []ServiceIf
+
+ Backoff *backoff.Backoff
// ONU State
// PortNo comes with flows and it's used when sending packetIndications,
// There is one PortNo per UNI Port, for now we're only storing the first one
// FIXME add support for multiple UNIs (each UNI has a different PortNo)
- PortNo uint32
- GemPortAdded bool
- EapolFlowReceived bool
- DhcpFlowReceived bool
- Flows []FlowKey
- FlowIds []uint32 // keep track of the flows we currently have in the ONU
+ PortNo uint32
+ GemPortAdded bool
+ Flows []FlowKey
+ FlowIds []uint32 // keep track of the flows we currently have in the ONU
OperState *fsm.FSM
SerialNumber *openolt.SerialNumber
- Channel chan Message // this Channel is to track state changes OMCI messages, EAPOL and DHCP packets
- GemPortChannels []chan bool // this channels are used to notify everyone that is interested that a GemPort has been added
+ Channel chan Message // this Channel is to track state changes OMCI messages, EAPOL and DHCP packets
// OMCI params
tid uint16
@@ -92,13 +85,7 @@
return common.OnuSnToString(o.SerialNumber)
}
-func (o *Onu) GetGemPortChan() chan bool {
- listener := make(chan bool, 1)
- o.GemPortChannels = append(o.GemPortChannels, listener)
- return listener
-}
-
-func CreateONU(olt *OltDevice, pon *PonPort, id uint32, sTag int, cTag int, auth bool, dhcp bool, delay time.Duration, isMock bool) *Onu {
+func CreateONU(olt *OltDevice, pon *PonPort, id uint32, delay time.Duration, isMock bool) *Onu {
b := &backoff.Backoff{
//These are the defaults
Min: 5 * time.Second,
@@ -108,21 +95,14 @@
}
o := Onu{
- ID: 0,
+ ID: id,
PonPortID: pon.ID,
PonPort: pon,
- STag: sTag,
- CTag: cTag,
- Auth: auth,
- Dhcp: dhcp,
- HwAddress: net.HardwareAddr{0x2e, 0x60, 0x70, byte(olt.ID), byte(pon.ID), byte(id)},
PortNo: 0,
tid: 0x1,
hpTid: 0x8000,
seqNumber: 0,
DoneChannel: make(chan bool, 1),
- DhcpFlowReceived: false,
- EapolFlowReceived: false,
GemPortAdded: false,
DiscoveryRetryDelay: 60 * time.Second, // this is used to send OnuDiscoveryIndications until an activate call is received
Flows: []FlowKey{},
@@ -152,28 +132,10 @@
{Name: "disable", Src: []string{"enabled", "eap_response_success_received", "auth_failed", "dhcp_ack_received", "dhcp_failed", "pon_disabled"}, Dst: "disabled"},
// ONU state when PON port is disabled but ONU is power ON(more states should be added in src?)
{Name: "pon_disabled", Src: []string{"enabled", "eap_response_success_received", "auth_failed", "dhcp_ack_received", "dhcp_failed"}, Dst: "pon_disabled"},
- // EAPOL
- {Name: "start_auth", Src: []string{"enabled", "eap_start_sent", "eap_response_identity_sent", "eap_response_challenge_sent", "eap_response_success_received", "auth_failed", "dhcp_ack_received", "dhcp_failed", "igmp_join_started", "igmp_left", "igmp_join_error"}, Dst: "auth_started"},
- {Name: "eap_start_sent", Src: []string{"auth_started"}, Dst: "eap_start_sent"},
- {Name: "eap_response_identity_sent", Src: []string{"eap_start_sent"}, Dst: "eap_response_identity_sent"},
- {Name: "eap_response_challenge_sent", Src: []string{"eap_response_identity_sent"}, Dst: "eap_response_challenge_sent"},
- {Name: "eap_response_success_received", Src: []string{"eap_response_challenge_sent"}, Dst: "eap_response_success_received"},
- {Name: "auth_failed", Src: []string{"auth_started", "eap_start_sent", "eap_response_identity_sent", "eap_response_challenge_sent"}, Dst: "auth_failed"},
- // DHCP
- {Name: "start_dhcp", Src: []string{"enabled", "eap_response_success_received", "dhcp_discovery_sent", "dhcp_request_sent", "dhcp_ack_received", "dhcp_failed", "igmp_join_started", "igmp_left", "igmp_join_error"}, Dst: "dhcp_started"},
- {Name: "dhcp_discovery_sent", Src: []string{"dhcp_started"}, Dst: "dhcp_discovery_sent"},
- {Name: "dhcp_request_sent", Src: []string{"dhcp_discovery_sent"}, Dst: "dhcp_request_sent"},
- {Name: "dhcp_ack_received", Src: []string{"dhcp_request_sent"}, Dst: "dhcp_ack_received"},
- {Name: "dhcp_failed", Src: []string{"dhcp_started", "dhcp_discovery_sent", "dhcp_request_sent"}, Dst: "dhcp_failed"},
// BBR States
// TODO add start OMCI state
{Name: "send_eapol_flow", Src: []string{"initialized"}, Dst: "eapol_flow_sent"},
{Name: "send_dhcp_flow", Src: []string{"eapol_flow_sent"}, Dst: "dhcp_flow_sent"},
- // IGMP
- {Name: "igmp_join_start", Src: []string{"eap_response_success_received", "dhcp_ack_received", "igmp_left", "igmp_join_error", "igmp_join_started"}, Dst: "igmp_join_started"},
- {Name: "igmp_join_startv3", Src: []string{"eap_response_success_received", "dhcp_ack_received", "igmp_left", "igmp_join_error", "igmp_join_started"}, Dst: "igmp_join_started"},
- {Name: "igmp_join_error", Src: []string{"igmp_join_started"}, Dst: "igmp_join_error"},
- {Name: "igmp_leave", Src: []string{"igmp_join_started", "eap_response_success_received", "dhcp_ack_received"}, Dst: "igmp_left"},
},
fsm.Callbacks{
"enter_state": func(e *fsm.Event) {
@@ -193,7 +155,7 @@
if !isMock {
// start ProcessOnuMessages Go routine
- go o.ProcessOnuMessages(olt.enableContext, *olt.OpenoltStream, nil)
+ go o.ProcessOnuMessages(olt.enableContext, olt.OpenoltStream, nil)
}
},
"enter_discovered": func(e *fsm.Event) {
@@ -220,8 +182,6 @@
"enter_disabled": func(event *fsm.Event) {
// clean the ONU state
- o.DhcpFlowReceived = false
- o.EapolFlowReceived = false
o.GemPortAdded = false
o.PortNo = 0
o.Flows = []FlowKey{}
@@ -252,75 +212,7 @@
close(o.Channel)
}
},
- "before_start_auth": func(e *fsm.Event) {
- if !o.EapolFlowReceived {
- e.Cancel(errors.New("cannot-go-to-auth-started-as-eapol-flow-is-missing"))
- return
- }
- if !o.GemPortAdded {
- e.Cancel(errors.New("cannot-go-to-auth-started-as-gemport-is-missing"))
- return
- }
- },
- "enter_auth_started": func(e *fsm.Event) {
- o.logStateChange(e.Src, e.Dst)
- msg := Message{
- Type: StartEAPOL,
- Data: PacketMessage{
- PonPortID: o.PonPortID,
- OnuID: o.ID,
- },
- }
- o.Channel <- msg
- },
- "enter_eap_response_success_received": func(e *fsm.Event) {
- publishEvent("ONU-authentication-done", int32(o.PonPortID), int32(o.ID), o.Sn())
- },
- "enter_auth_failed": func(e *fsm.Event) {
- onuLogger.WithFields(log.Fields{
- "OnuId": o.ID,
- "IntfId": o.PonPortID,
- "OnuSn": o.Sn(),
- }).Errorf("ONU failed to authenticate!")
- },
- "before_start_dhcp": func(e *fsm.Event) {
-
- // we allow transition from eanbled to dhcp_started only if auth was set to false
- if o.InternalState.Current() == "enabled" && o.Auth {
- e.Cancel(errors.New("cannot-go-to-dhcp-started-as-authentication-is-required"))
- return
- }
-
- if !o.DhcpFlowReceived {
- e.Cancel(errors.New("cannot-go-to-dhcp-started-as-dhcp-flow-is-missing"))
- return
- }
-
- if !o.GemPortAdded {
- e.Cancel(errors.New("cannot-go-to-dhcp-started-as-gemport-is-missing"))
- return
- }
- },
- "enter_dhcp_started": func(e *fsm.Event) {
- msg := Message{
- Type: StartDHCP,
- Data: PacketMessage{
- PonPortID: o.PonPortID,
- OnuID: o.ID,
- },
- }
- o.Channel <- msg
- },
- "enter_dhcp_ack_received": func(e *fsm.Event) {
- publishEvent("ONU-DHCP-ACK-received", int32(o.PonPortID), int32(o.ID), o.Sn())
- },
- "enter_dhcp_failed": func(e *fsm.Event) {
- onuLogger.WithFields(log.Fields{
- "OnuId": o.ID,
- "IntfId": o.PonPortID,
- "OnuSn": o.Sn(),
- }).Errorf("ONU failed to DHCP!")
- },
+ // BBR states
"enter_eapol_flow_sent": func(e *fsm.Event) {
msg := Message{
Type: SendEapolFlow,
@@ -413,10 +305,6 @@
case FlowRemoved:
msg, _ := message.Data.(OnuFlowUpdateMessage)
o.handleFlowRemove(msg)
- case StartEAPOL:
- o.handleEAPOLStart(stream)
- case StartDHCP:
- o.handleDHCPStart(stream)
case OnuPacketOut:
msg, _ := message.Data.(OnuPacketMessage)
@@ -427,13 +315,20 @@
"pktType": msg.Type,
}).Trace("Received OnuPacketOut Message")
- if msg.Type == packetHandlers.EAPOL {
- eapol.HandleNextPacket(msg.OnuId, msg.IntfId, o.Sn(), o.PortNo, o.InternalState, msg.Packet, stream, client)
- } else if msg.Type == packetHandlers.DHCP {
- // NOTE here we receive packets going from the DHCP Server to the ONU
- // for now we expect them to be double-tagged, but ideally the should be single tagged
- _ = dhcp.HandleNextPacket(o.PonPort.Olt.ID, o.ID, o.PonPortID, o.Sn(), o.PortNo, o.HwAddress, o.CTag, o.InternalState, msg.Packet, stream)
+ service, err := o.findServiceByMacAddress(msg.MacAddress)
+ if err != nil {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": msg.IntfId,
+ "OnuId": msg.OnuId,
+ "pktType": msg.Type,
+ "MacAddress": msg.MacAddress,
+ "OnuSn": o.Sn(),
+ }).Error("Cannot find Service associated with packet")
+ return
}
+
+ service.PacketCh <- msg
+
case OnuPacketIn:
// NOTE we only receive BBR packets here.
// Eapol.HandleNextPacket can handle both BBSim and BBr cases so the call is the same
@@ -447,9 +342,9 @@
}).Trace("Received OnuPacketIn Message")
if msg.Type == packetHandlers.EAPOL {
- eapol.HandleNextPacket(msg.OnuId, msg.IntfId, o.Sn(), o.PortNo, o.InternalState, msg.Packet, stream, client)
+ eapol.HandleNextPacket(msg.OnuId, msg.IntfId, msg.GemPortId, o.Sn(), o.PortNo, o.InternalState, msg.Packet, stream, client)
} else if msg.Type == packetHandlers.DHCP {
- _ = dhcp.HandleNextBbrPacket(o.ID, o.PonPortID, o.Sn(), o.STag, o.HwAddress, o.DoneChannel, msg.Packet, client)
+ _ = dhcp.HandleNextBbrPacket(o.ID, o.PonPortID, o.Sn(), o.DoneChannel, msg.Packet, client)
}
case OmciIndication:
msg, _ := message.Data.(OmciIndicationMessage)
@@ -458,15 +353,6 @@
o.sendEapolFlow(client)
case SendDhcpFlow:
o.sendDhcpFlow(client)
- case IGMPMembershipReportV2:
- log.Infof("Recieved IGMPMembershipReportV2 message on ONU channel")
- _ = igmp.SendIGMPMembershipReportV2(o.PonPortID, o.ID, o.Sn(), o.PortNo, o.HwAddress, stream)
- case IGMPLeaveGroup:
- log.Infof("Recieved IGMPLeaveGroupV2 message on ONU channel")
- _ = igmp.SendIGMPLeaveGroupV2(o.PonPortID, o.ID, o.Sn(), o.PortNo, o.HwAddress, stream)
- case IGMPMembershipReportV3:
- log.Infof("Recieved IGMPMembershipReportV3 message on ONU channel")
- _ = igmp.SendIGMPMembershipReportV3(o.PonPortID, o.ID, o.Sn(), o.PortNo, o.HwAddress, stream)
default:
onuLogger.Warnf("Received unknown message data %v for type %v in OLT Channel", message.Data, message.Type)
}
@@ -485,7 +371,7 @@
"OnuId": message.Data.OnuId,
"IntfId": message.Data.IntfId,
"Type": message.Type,
- }).Infof("UNI Link Alarm")
+ }).Debug("UNI Link Alarm")
// TODO send to OLT
omciInd := openolt.OmciIndication{
@@ -510,60 +396,10 @@
"SerialNumber": o.Sn(),
"Type": message.Type,
"omciPacket": omciInd.Pkt,
- }).Info("UNI Link alarm sent")
-
- case omcisim.GemPortAdded:
- log.WithFields(log.Fields{
- "OnuId": message.Data.OnuId,
- "IntfId": message.Data.IntfId,
- "OnuSn": o.Sn(),
- }).Infof("GemPort Added")
-
- o.GemPortAdded = true
-
- // broadcast the change to all listeners
- // and close the channels as once the GemPort is set
- // it won't change anymore
- for _, ch := range o.GemPortChannels {
- ch <- true
- close(ch)
- }
- o.GemPortChannels = []chan bool{}
+ }).Debug("UNI Link alarm sent")
}
}
-func (o *Onu) handleEAPOLStart(stream openolt.Openolt_EnableIndicationServer) {
- log.Infof("Receive StartEAPOL message on ONU Channel")
- _ = eapol.SendEapStart(o.ID, o.PonPortID, o.Sn(), o.PortNo, o.HwAddress, o.InternalState, stream)
- go func(delay time.Duration) {
- time.Sleep(delay)
- if (o.InternalState.Current() == "eap_start_sent" ||
- o.InternalState.Current() == "eap_response_identity_sent" ||
- o.InternalState.Current() == "eap_response_challenge_sent" ||
- o.InternalState.Current() == "auth_failed") && common.Options.BBSim.AuthRetry {
- _ = o.InternalState.Event("start_auth")
- } else if o.InternalState.Current() == "eap_response_success_received" {
- o.Backoff.Reset()
- }
- }(o.Backoff.Duration())
-}
-
-func (o *Onu) handleDHCPStart(stream openolt.Openolt_EnableIndicationServer) {
- log.Infof("Receive StartDHCP message on ONU Channel")
- // FIXME use id, ponId as SendEapStart
- _ = dhcp.SendDHCPDiscovery(o.PonPort.Olt.ID, o.PonPortID, o.ID, o.Sn(), o.PortNo, o.InternalState, o.HwAddress, o.CTag, stream)
- go func(delay time.Duration) {
- time.Sleep(delay)
- if (o.InternalState.Current() == "dhcp_discovery_sent" ||
- o.InternalState.Current() == "dhcp_request_sent" ||
- o.InternalState.Current() == "dhcp_failed") && common.Options.BBSim.DhcpRetry {
- _ = o.InternalState.Event("start_dhcp")
- } else if o.InternalState.Current() == "dhcp_ack_received" {
- o.Backoff.Reset()
- }
- }(o.Backoff.Duration())
-}
-
func (o Onu) NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
sn := new(openolt.SerialNumber)
@@ -627,6 +463,10 @@
"OnuSn": o.Sn(),
}).Debug("Sent Indication_OnuInd")
+ for _, s := range o.Services {
+ go s.HandlePackets(stream)
+ }
+
}
func (o *Onu) publishOmciEvent(msg OmciMessage) {
@@ -790,83 +630,22 @@
}
o.FlowIds = append(o.FlowIds, msg.Flow.FlowId)
+ o.addGemPortToService(uint32(msg.Flow.GemportId), msg.Flow.Classifier.EthType, msg.Flow.Classifier.OVid, msg.Flow.Classifier.IVid)
if msg.Flow.Classifier.EthType == uint32(layers.EthernetTypeEAPOL) && msg.Flow.Classifier.OVid == 4091 {
// NOTE storing the PortNO, it's needed when sending PacketIndications
o.storePortNumber(uint32(msg.Flow.PortNo))
- o.EapolFlowReceived = true
- // if authentication is not enabled, do nothing
- if o.Auth {
- // NOTE if we receive the EAPOL flows but we don't have GemPorts
- // wait for it before starting auth
- if !o.GemPortAdded {
- // wait for Gem and then start auth
- go func() {
- for v := range o.GetGemPortChan() {
- if v {
- if err := o.InternalState.Event("start_auth"); err != nil {
- onuLogger.Warnf("Can't go to auth_started: %v", err)
- }
- }
- }
- onuLogger.Trace("GemPortChannel closed")
- }()
- } else {
- // start the EAPOL state machine
- if err := o.InternalState.Event("start_auth"); err != nil {
- onuLogger.Warnf("Can't go to auth_started: %v", err)
- }
- }
- } else {
- onuLogger.WithFields(log.Fields{
- "IntfId": o.PonPortID,
- "OnuId": o.ID,
- "SerialNumber": o.Sn(),
- }).Warn("Not starting authentication as Auth bit is not set in CLI parameters")
+
+ for _, s := range o.Services {
+ s.HandleAuth(o.PonPort.Olt.OpenoltStream)
}
} else if msg.Flow.Classifier.EthType == uint32(layers.EthernetTypeIPv4) &&
msg.Flow.Classifier.SrcPort == uint32(68) &&
msg.Flow.Classifier.DstPort == uint32(67) &&
(msg.Flow.Classifier.OPbits == 0 || msg.Flow.Classifier.OPbits == 255) {
- if o.Dhcp {
- if !o.DhcpFlowReceived {
- // keep track that we received the DHCP Flows
- // so that we can transition the state to dhcp_started
- // this is needed as a check in case someone trigger DHCP from the CLI
- o.DhcpFlowReceived = true
-
- if !o.GemPortAdded {
- // wait for Gem and then start DHCP
- go func() {
- for v := range o.GetGemPortChan() {
- if v {
- if err := o.InternalState.Event("start_dhcp"); err != nil {
- log.Errorf("Can't go to dhcp_started: %v", err)
- }
- }
- }
- }()
- } else {
- // start the DHCP state machine
- if err := o.InternalState.Event("start_dhcp"); err != nil {
- log.Errorf("Can't go to dhcp_started: %v", err)
- }
- }
- } else {
- onuLogger.WithFields(log.Fields{
- "IntfId": o.PonPortID,
- "OnuId": o.ID,
- "SerialNumber": o.Sn(),
- "DhcpFlowReceived": o.DhcpFlowReceived,
- }).Warn("DHCP already started")
- }
- } else {
- onuLogger.WithFields(log.Fields{
- "IntfId": o.PonPortID,
- "OnuId": o.ID,
- "SerialNumber": o.Sn(),
- }).Warn("Not starting DHCP as Dhcp bit is not set in CLI parameters")
+ for _, s := range o.Services {
+ s.HandleDhcp(o.PonPort.Olt.OpenoltStream, int(msg.Flow.Classifier.OVid))
}
}
}
@@ -896,11 +675,6 @@
}).Info("Resetting GemPort")
o.GemPortAdded = false
- // TODO ideally we should keep track of the flow type (and not only the ID)
- // so that we can properly set these two flag when the flow is removed
- o.EapolFlowReceived = false
- o.DhcpFlowReceived = false
-
// check if ONU delete is performed and
// terminate the ONU's ProcessOnuMessages Go routine
if o.InternalState.Current() == "disabled" {
@@ -1075,10 +849,15 @@
}
func (o *Onu) sendDhcpFlow(client openolt.OpenoltClient) {
+
+ // BBR only works with a single service (ATT HSIA)
+ hsia := o.Services[0].(*Service)
+
classifierProto := openolt.Classifier{
EthType: uint32(layers.EthernetTypeIPv4),
SrcPort: uint32(68),
DstPort: uint32(67),
+ OVid: uint32(hsia.CTag),
}
actionProto := openolt.Action{}
@@ -1158,3 +937,36 @@
}).Infof("Failed to transition ONU to discovered state: %s", err.Error())
}
}
+
+// addGemPortToService records gemport on every *Service of this ONU that the
+// flow described by ethType, oVlan and iVlan applies to. Entries of Services
+// that are not a *Service are ignored.
+func (o *Onu) addGemPortToService(gemport uint32, ethType uint32, oVlan uint32, iVlan uint32) {
+	for _, candidate := range o.Services {
+		svc, isService := candidate.(*Service)
+		if !isService {
+			continue
+		}
+
+		// Every matching rule stores the same gemport, so a single switch
+		// is enough: the first rule that matches wins.
+		switch {
+		case ethType == uint32(layers.EthernetTypeEAPOL) && svc.NeedsEapol:
+			// EAPOL packets are untagged, so no VLAN can be matched here;
+			// a single service requiring EAPOL is assumed.
+			svc.GemPort = gemport
+		case ethType == uint32(layers.EthernetTypeIPv4) && svc.NeedsDhcp && svc.CTag == int(oVlan):
+			// DHCP packets go out single tagged, so the flow carries only
+			// the CTag (as outer VLAN) and that identifies the service.
+			svc.GemPort = gemport
+		case svc.CTag == int(iVlan) && svc.STag == int(oVlan):
+			// Dataplane services are matched on both the C and the S tag.
+			svc.GemPort = gemport
+		}
+	}
+}
+
+// findServiceByMacAddress returns the Service whose HwAddress matches macAddress.
+//
+// Entries of o.Services that are not a *Service are skipped instead of being
+// asserted unconditionally, so a different ServiceIf implementation in the
+// list cannot panic the caller; this mirrors the comma-ok assertion used by
+// addGemPortToService. An error is returned when no service matches.
+func (o *Onu) findServiceByMacAddress(macAddress net.HardwareAddr) (*Service, error) {
+	// Render the lookup key once rather than on every iteration.
+	wanted := macAddress.String()
+	for _, s := range o.Services {
+		service, ok := s.(*Service)
+		if !ok {
+			continue
+		}
+		if service.HwAddress.String() == wanted {
+			return service, nil
+		}
+	}
+	return nil, fmt.Errorf("cannot-find-service-with-mac-address-%s", wanted)
+}