SEBA-901 - handle adapter restart

Stops existing goroutines processing messages and
allows for ONUs not in the initialized state when
adapter reconnects to not attempt to rediscover.

Change-Id: Ie3951d6ad36b7c8b3a4ddfbf55850b8ed7cf35d8
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index 4fd18cf..9ab83fa 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -246,88 +246,105 @@
 }
 
 // ProcessOnuMessages starts indication channel for each ONU
-func (o *Onu) ProcessOnuMessages(stream openolt.Openolt_EnableIndicationServer, client openolt.OpenoltClient) {
+func (o *Onu) ProcessOnuMessages(ctx context.Context, stream openolt.Openolt_EnableIndicationServer, client openolt.OpenoltClient) {
 	onuLogger.WithFields(log.Fields{
 		"onuID":   o.ID,
 		"onuSN":   o.Sn(),
 		"ponPort": o.PonPortID,
 	}).Debug("Starting ONU Indication Channel")
 
-	for message := range o.Channel {
-		onuLogger.WithFields(log.Fields{
-			"onuID":       o.ID,
-			"onuSN":       o.Sn(),
-			"messageType": message.Type,
-		}).Tracef("Received message on ONU Channel")
-
-		switch message.Type {
-		case OnuDiscIndication:
-			msg, _ := message.Data.(OnuDiscIndicationMessage)
-			// NOTE we need to slow down and send ONU Discovery Indication in batches to better emulate a real scenario
-			time.Sleep(time.Duration(int(o.ID)*o.PonPort.Olt.Delay) * time.Millisecond)
-			o.sendOnuDiscIndication(msg, stream)
-		case OnuIndication:
-			msg, _ := message.Data.(OnuIndicationMessage)
-			o.sendOnuIndication(msg, stream)
-		case OMCI:
-			msg, _ := message.Data.(OmciMessage)
-			o.handleOmciMessage(msg, stream)
-		case FlowUpdate:
-			msg, _ := message.Data.(OnuFlowUpdateMessage)
-			o.handleFlowUpdate(msg)
-		case StartEAPOL:
-			log.Infof("Receive StartEAPOL message on ONU Channel")
-			eapol.SendEapStart(o.ID, o.PonPortID, o.Sn(), o.PortNo, o.HwAddress, o.InternalState, stream)
-		case StartDHCP:
-			log.Infof("Receive StartDHCP message on ONU Channel")
-			// FIXME use id, ponId as SendEapStart
-			dhcp.SendDHCPDiscovery(o.PonPortID, o.ID, o.Sn(), o.PortNo, o.InternalState, o.HwAddress, o.CTag, stream)
-		case OnuPacketOut:
-
-			msg, _ := message.Data.(OnuPacketMessage)
-
-			log.WithFields(log.Fields{
-				"IntfId":  msg.IntfId,
-				"OnuId":   msg.OnuId,
-				"pktType": msg.Type,
-			}).Trace("Received OnuPacketOut Message")
-
-			if msg.Type == packetHandlers.EAPOL {
-				eapol.HandleNextPacket(msg.OnuId, msg.IntfId, o.Sn(), o.PortNo, o.InternalState, msg.Packet, stream, client)
-			} else if msg.Type == packetHandlers.DHCP {
-				// NOTE here we receive packets going from the DHCP Server to the ONU
-				// for now we expect them to be double-tagged, but ideally the should be single tagged
-				dhcp.HandleNextPacket(o.ID, o.PonPortID, o.Sn(), o.PortNo, o.HwAddress, o.CTag, o.InternalState, msg.Packet, stream)
+loop:
+	for {
+		select {
+		case <-ctx.Done():
+			onuLogger.WithFields(log.Fields{
+				"onuID": o.ID,
+				"onuSN": o.Sn(),
+			}).Tracef("ONU message handling canceled via context")
+			break loop
+		case message, ok := <-o.Channel:
+			if !ok || ctx.Err() != nil {
+				onuLogger.WithFields(log.Fields{
+					"onuID": o.ID,
+					"onuSN": o.Sn(),
+				}).Tracef("ONU message handling canceled via channel close")
+				break loop
 			}
-		case OnuPacketIn:
-			// NOTE we only receive BBR packets here.
-			// Eapol.HandleNextPacket can handle both BBSim and BBr cases so the call is the same
-			// in the DHCP case VOLTHA only act as a proxy, the behaviour is completely different thus we have a dhcp.HandleNextBbrPacket
-			msg, _ := message.Data.(OnuPacketMessage)
+			onuLogger.WithFields(log.Fields{
+				"onuID":       o.ID,
+				"onuSN":       o.Sn(),
+				"messageType": message.Type,
+			}).Tracef("Received message on ONU Channel")
 
-			log.WithFields(log.Fields{
-				"IntfId":  msg.IntfId,
-				"OnuId":   msg.OnuId,
-				"pktType": msg.Type,
-			}).Trace("Received OnuPacketIn Message")
+			switch message.Type {
+			case OnuDiscIndication:
+				msg, _ := message.Data.(OnuDiscIndicationMessage)
+				// NOTE we need to slow down and send ONU Discovery Indication in batches to better emulate a real scenario
+				time.Sleep(time.Duration(int(o.ID)*o.PonPort.Olt.Delay) * time.Millisecond)
+				o.sendOnuDiscIndication(msg, stream)
+			case OnuIndication:
+				msg, _ := message.Data.(OnuIndicationMessage)
+				o.sendOnuIndication(msg, stream)
+			case OMCI:
+				msg, _ := message.Data.(OmciMessage)
+				o.handleOmciMessage(msg, stream)
+			case FlowUpdate:
+				msg, _ := message.Data.(OnuFlowUpdateMessage)
+				o.handleFlowUpdate(msg)
+			case StartEAPOL:
+				log.Infof("Receive StartEAPOL message on ONU Channel")
+				eapol.SendEapStart(o.ID, o.PonPortID, o.Sn(), o.PortNo, o.HwAddress, o.InternalState, stream)
+			case StartDHCP:
+				log.Infof("Receive StartDHCP message on ONU Channel")
+				// FIXME use id, ponId as SendEapStart
+				dhcp.SendDHCPDiscovery(o.PonPortID, o.ID, o.Sn(), o.PortNo, o.InternalState, o.HwAddress, o.CTag, stream)
+			case OnuPacketOut:
 
-			if msg.Type == packetHandlers.EAPOL {
-				eapol.HandleNextPacket(msg.OnuId, msg.IntfId, o.Sn(), o.PortNo, o.InternalState, msg.Packet, stream, client)
-			} else if msg.Type == packetHandlers.DHCP {
-				dhcp.HandleNextBbrPacket(o.ID, o.PonPortID, o.Sn(), o.STag, o.HwAddress, o.DoneChannel, msg.Packet, client)
+				msg, _ := message.Data.(OnuPacketMessage)
+
+				log.WithFields(log.Fields{
+					"IntfId":  msg.IntfId,
+					"OnuId":   msg.OnuId,
+					"pktType": msg.Type,
+				}).Trace("Received OnuPacketOut Message")
+
+				if msg.Type == packetHandlers.EAPOL {
+					eapol.HandleNextPacket(msg.OnuId, msg.IntfId, o.Sn(), o.PortNo, o.InternalState, msg.Packet, stream, client)
+				} else if msg.Type == packetHandlers.DHCP {
+					// NOTE here we receive packets going from the DHCP Server to the ONU
+					// for now we expect them to be double-tagged, but ideally the should be single tagged
+					dhcp.HandleNextPacket(o.ID, o.PonPortID, o.Sn(), o.PortNo, o.HwAddress, o.CTag, o.InternalState, msg.Packet, stream)
+				}
+			case OnuPacketIn:
+				// NOTE we only receive BBR packets here.
+				// Eapol.HandleNextPacket can handle both BBSim and BBr cases so the call is the same
+				// in the DHCP case VOLTHA only act as a proxy, the behaviour is completely different thus we have a dhcp.HandleNextBbrPacket
+				msg, _ := message.Data.(OnuPacketMessage)
+
+				log.WithFields(log.Fields{
+					"IntfId":  msg.IntfId,
+					"OnuId":   msg.OnuId,
+					"pktType": msg.Type,
+				}).Trace("Received OnuPacketIn Message")
+
+				if msg.Type == packetHandlers.EAPOL {
+					eapol.HandleNextPacket(msg.OnuId, msg.IntfId, o.Sn(), o.PortNo, o.InternalState, msg.Packet, stream, client)
+				} else if msg.Type == packetHandlers.DHCP {
+					dhcp.HandleNextBbrPacket(o.ID, o.PonPortID, o.Sn(), o.STag, o.HwAddress, o.DoneChannel, msg.Packet, client)
+				}
+			case DyingGaspIndication:
+				msg, _ := message.Data.(DyingGaspIndicationMessage)
+				o.sendDyingGaspInd(msg, stream)
+			case OmciIndication:
+				msg, _ := message.Data.(OmciIndicationMessage)
+				o.handleOmci(msg, client)
+			case SendEapolFlow:
+				o.sendEapolFlow(client)
+			case SendDhcpFlow:
+				o.sendDhcpFlow(client)
+			default:
+				onuLogger.Warnf("Received unknown message data %v for type %v in OLT Channel", message.Data, message.Type)
 			}
-		case DyingGaspIndication:
-			msg, _ := message.Data.(DyingGaspIndicationMessage)
-			o.sendDyingGaspInd(msg, stream)
-		case OmciIndication:
-			msg, _ := message.Data.(OmciIndicationMessage)
-			o.handleOmci(msg, client)
-		case SendEapolFlow:
-			o.sendEapolFlow(client)
-		case SendDhcpFlow:
-			o.sendDhcpFlow(client)
-		default:
-			onuLogger.Warnf("Received unknown message data %v for type %v in OLT Channel", message.Data, message.Type)
 		}
 	}
 	onuLogger.WithFields(log.Fields{