SEBA-901 - handle adapter restart
Stops existing goroutines processing messages and
allows for ONUs not in the initialized state when
adapter reconnects to not attempt to rediscover.
Change-Id: Ie3951d6ad36b7c8b3a4ddfbf55850b8ed7cf35d8
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index 4fd18cf..9ab83fa 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -246,88 +246,105 @@
}
// ProcessOnuMessages starts indication channel for each ONU
-func (o *Onu) ProcessOnuMessages(stream openolt.Openolt_EnableIndicationServer, client openolt.OpenoltClient) {
+func (o *Onu) ProcessOnuMessages(ctx context.Context, stream openolt.Openolt_EnableIndicationServer, client openolt.OpenoltClient) {
onuLogger.WithFields(log.Fields{
"onuID": o.ID,
"onuSN": o.Sn(),
"ponPort": o.PonPortID,
}).Debug("Starting ONU Indication Channel")
- for message := range o.Channel {
- onuLogger.WithFields(log.Fields{
- "onuID": o.ID,
- "onuSN": o.Sn(),
- "messageType": message.Type,
- }).Tracef("Received message on ONU Channel")
-
- switch message.Type {
- case OnuDiscIndication:
- msg, _ := message.Data.(OnuDiscIndicationMessage)
- // NOTE we need to slow down and send ONU Discovery Indication in batches to better emulate a real scenario
- time.Sleep(time.Duration(int(o.ID)*o.PonPort.Olt.Delay) * time.Millisecond)
- o.sendOnuDiscIndication(msg, stream)
- case OnuIndication:
- msg, _ := message.Data.(OnuIndicationMessage)
- o.sendOnuIndication(msg, stream)
- case OMCI:
- msg, _ := message.Data.(OmciMessage)
- o.handleOmciMessage(msg, stream)
- case FlowUpdate:
- msg, _ := message.Data.(OnuFlowUpdateMessage)
- o.handleFlowUpdate(msg)
- case StartEAPOL:
- log.Infof("Receive StartEAPOL message on ONU Channel")
- eapol.SendEapStart(o.ID, o.PonPortID, o.Sn(), o.PortNo, o.HwAddress, o.InternalState, stream)
- case StartDHCP:
- log.Infof("Receive StartDHCP message on ONU Channel")
- // FIXME use id, ponId as SendEapStart
- dhcp.SendDHCPDiscovery(o.PonPortID, o.ID, o.Sn(), o.PortNo, o.InternalState, o.HwAddress, o.CTag, stream)
- case OnuPacketOut:
-
- msg, _ := message.Data.(OnuPacketMessage)
-
- log.WithFields(log.Fields{
- "IntfId": msg.IntfId,
- "OnuId": msg.OnuId,
- "pktType": msg.Type,
- }).Trace("Received OnuPacketOut Message")
-
- if msg.Type == packetHandlers.EAPOL {
- eapol.HandleNextPacket(msg.OnuId, msg.IntfId, o.Sn(), o.PortNo, o.InternalState, msg.Packet, stream, client)
- } else if msg.Type == packetHandlers.DHCP {
- // NOTE here we receive packets going from the DHCP Server to the ONU
- // for now we expect them to be double-tagged, but ideally the should be single tagged
- dhcp.HandleNextPacket(o.ID, o.PonPortID, o.Sn(), o.PortNo, o.HwAddress, o.CTag, o.InternalState, msg.Packet, stream)
+loop:
+ for {
+ select {
+ case <-ctx.Done():
+ onuLogger.WithFields(log.Fields{
+ "onuID": o.ID,
+ "onuSN": o.Sn(),
+ }).Tracef("ONU message handling canceled via context")
+ break loop
+ case message, ok := <-o.Channel:
+ if !ok || ctx.Err() != nil {
+ onuLogger.WithFields(log.Fields{
+ "onuID": o.ID,
+ "onuSN": o.Sn(),
+ }).Tracef("ONU message handling canceled via channel close")
+ break loop
}
- case OnuPacketIn:
- // NOTE we only receive BBR packets here.
- // Eapol.HandleNextPacket can handle both BBSim and BBr cases so the call is the same
- // in the DHCP case VOLTHA only act as a proxy, the behaviour is completely different thus we have a dhcp.HandleNextBbrPacket
- msg, _ := message.Data.(OnuPacketMessage)
+ onuLogger.WithFields(log.Fields{
+ "onuID": o.ID,
+ "onuSN": o.Sn(),
+ "messageType": message.Type,
+ }).Tracef("Received message on ONU Channel")
- log.WithFields(log.Fields{
- "IntfId": msg.IntfId,
- "OnuId": msg.OnuId,
- "pktType": msg.Type,
- }).Trace("Received OnuPacketIn Message")
+ switch message.Type {
+ case OnuDiscIndication:
+ msg, _ := message.Data.(OnuDiscIndicationMessage)
+ // NOTE we need to slow down and send ONU Discovery Indication in batches to better emulate a real scenario
+ time.Sleep(time.Duration(int(o.ID)*o.PonPort.Olt.Delay) * time.Millisecond)
+ o.sendOnuDiscIndication(msg, stream)
+ case OnuIndication:
+ msg, _ := message.Data.(OnuIndicationMessage)
+ o.sendOnuIndication(msg, stream)
+ case OMCI:
+ msg, _ := message.Data.(OmciMessage)
+ o.handleOmciMessage(msg, stream)
+ case FlowUpdate:
+ msg, _ := message.Data.(OnuFlowUpdateMessage)
+ o.handleFlowUpdate(msg)
+ case StartEAPOL:
+ log.Infof("Receive StartEAPOL message on ONU Channel")
+ eapol.SendEapStart(o.ID, o.PonPortID, o.Sn(), o.PortNo, o.HwAddress, o.InternalState, stream)
+ case StartDHCP:
+ log.Infof("Receive StartDHCP message on ONU Channel")
+ // FIXME use id, ponId as SendEapStart
+ dhcp.SendDHCPDiscovery(o.PonPortID, o.ID, o.Sn(), o.PortNo, o.InternalState, o.HwAddress, o.CTag, stream)
+ case OnuPacketOut:
- if msg.Type == packetHandlers.EAPOL {
- eapol.HandleNextPacket(msg.OnuId, msg.IntfId, o.Sn(), o.PortNo, o.InternalState, msg.Packet, stream, client)
- } else if msg.Type == packetHandlers.DHCP {
- dhcp.HandleNextBbrPacket(o.ID, o.PonPortID, o.Sn(), o.STag, o.HwAddress, o.DoneChannel, msg.Packet, client)
+ msg, _ := message.Data.(OnuPacketMessage)
+
+ log.WithFields(log.Fields{
+ "IntfId": msg.IntfId,
+ "OnuId": msg.OnuId,
+ "pktType": msg.Type,
+ }).Trace("Received OnuPacketOut Message")
+
+ if msg.Type == packetHandlers.EAPOL {
+ eapol.HandleNextPacket(msg.OnuId, msg.IntfId, o.Sn(), o.PortNo, o.InternalState, msg.Packet, stream, client)
+ } else if msg.Type == packetHandlers.DHCP {
+ // NOTE here we receive packets going from the DHCP Server to the ONU
+ // for now we expect them to be double-tagged, but ideally the should be single tagged
+ dhcp.HandleNextPacket(o.ID, o.PonPortID, o.Sn(), o.PortNo, o.HwAddress, o.CTag, o.InternalState, msg.Packet, stream)
+ }
+ case OnuPacketIn:
+ // NOTE we only receive BBR packets here.
+ // Eapol.HandleNextPacket can handle both BBSim and BBr cases so the call is the same
+ // in the DHCP case VOLTHA only act as a proxy, the behaviour is completely different thus we have a dhcp.HandleNextBbrPacket
+ msg, _ := message.Data.(OnuPacketMessage)
+
+ log.WithFields(log.Fields{
+ "IntfId": msg.IntfId,
+ "OnuId": msg.OnuId,
+ "pktType": msg.Type,
+ }).Trace("Received OnuPacketIn Message")
+
+ if msg.Type == packetHandlers.EAPOL {
+ eapol.HandleNextPacket(msg.OnuId, msg.IntfId, o.Sn(), o.PortNo, o.InternalState, msg.Packet, stream, client)
+ } else if msg.Type == packetHandlers.DHCP {
+ dhcp.HandleNextBbrPacket(o.ID, o.PonPortID, o.Sn(), o.STag, o.HwAddress, o.DoneChannel, msg.Packet, client)
+ }
+ case DyingGaspIndication:
+ msg, _ := message.Data.(DyingGaspIndicationMessage)
+ o.sendDyingGaspInd(msg, stream)
+ case OmciIndication:
+ msg, _ := message.Data.(OmciIndicationMessage)
+ o.handleOmci(msg, client)
+ case SendEapolFlow:
+ o.sendEapolFlow(client)
+ case SendDhcpFlow:
+ o.sendDhcpFlow(client)
+ default:
+ onuLogger.Warnf("Received unknown message data %v for type %v in OLT Channel", message.Data, message.Type)
}
- case DyingGaspIndication:
- msg, _ := message.Data.(DyingGaspIndicationMessage)
- o.sendDyingGaspInd(msg, stream)
- case OmciIndication:
- msg, _ := message.Data.(OmciIndicationMessage)
- o.handleOmci(msg, client)
- case SendEapolFlow:
- o.sendEapolFlow(client)
- case SendDhcpFlow:
- o.sendDhcpFlow(client)
- default:
- onuLogger.Warnf("Received unknown message data %v for type %v in OLT Channel", message.Data, message.Type)
}
}
onuLogger.WithFields(log.Fields{