[VOL-2778] Fixing reboots
Change-Id: I9751a6a9abd83a1a45a809b1c32834d60ec22254
diff --git a/internal/bbsim/api/services_handler.go b/internal/bbsim/api/services_handler.go
index d20b527..18bb84c 100644
--- a/internal/bbsim/api/services_handler.go
+++ b/internal/bbsim/api/services_handler.go
@@ -24,17 +24,18 @@
 
 func convertBBSimServiceToProtoService(s *devices.Service) *bbsim.Service {
 	return &bbsim.Service{
-		Name:       s.Name,
-		HwAddress:  s.HwAddress.String(),
-		OnuSn:      s.Onu.Sn(),
-		CTag:       int32(s.CTag),
-		STag:       int32(s.STag),
-		NeedsEapol: s.NeedsEapol,
-		NeedsDhcp:  s.NeedsDhcp,
-		NeedsIgmp:  s.NeedsIgmp,
-		GemPort:    int32(s.GemPort),
-		EapolState: s.EapolState.Current(),
-		DhcpState:  s.DHCPState.Current(),
+		Name:          s.Name,
+		InternalState: s.InternalState.Current(),
+		HwAddress:     s.HwAddress.String(),
+		OnuSn:         s.Onu.Sn(),
+		CTag:          int32(s.CTag),
+		STag:          int32(s.STag),
+		NeedsEapol:    s.NeedsEapol,
+		NeedsDhcp:     s.NeedsDhcp,
+		NeedsIgmp:     s.NeedsIgmp,
+		GemPort:       int32(s.GemPort),
+		EapolState:    s.EapolState.Current(),
+		DhcpState:     s.DHCPState.Current(),
 	}
 }
 
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index eff9c86..49ea3fe 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -277,20 +277,12 @@
 		return err
 	}
 
+	// PONs are already handled in the Disable call
 	for _, pon := range olt.Pons {
-		msg := Message{
-			Type: PonIndication,
-			Data: PonIndicationMessage{
-				OperState: DOWN,
-				PonPortID: pon.ID,
-			},
-		}
-		o.channel <- msg
-
+		// ONUs are not automatically disabled when a PON goes down
+		// as it's possible that it's an admin down and in that case the ONUs need to keep their state
 		for _, onu := range pon.Onus {
-			if onu.InternalState.Current() != "initialized" {
-				_ = onu.InternalState.Event("disable")
-			}
+			_ = onu.InternalState.Event("disable")
 		}
 	}
 
@@ -611,7 +603,7 @@
 	oltLogger.WithFields(log.Fields{
 		"IntfId":    pon.ID,
 		"OperState": pon.OperState.Current(),
-	}).Debug("Sent Indication_IntfInd")
+	}).Debug("Sent Indication_IntfInd for PON")
 
 	// Send IntfOperIndication for PON port
 	operData := &openolt.Indication_IntfOperInd{IntfOperInd: &openolt.IntfOperIndication{
@@ -1275,7 +1267,7 @@
 		"IntfId": onu.PonPortID,
 		"OnuId":  onu.ID,
 		"OnuSn":  onu.Sn(),
-	}).Info("Received OnuPacketOut")
+	}).Trace("Received OnuPacketOut")
 
 	rawpkt := gopacket.NewPacket(onuPkt.Pkt, layers.LayerTypeEthernet, gopacket.Default)
 	pktType, _ := packetHandlers.IsEapolOrDhcp(rawpkt)
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index 0f78107..9116b31 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -62,7 +62,8 @@
 	// PortNo comes with flows and it's used when sending packetIndications,
 	// There is one PortNo per UNI Port, for now we're only storing the first one
 	// FIXME add support for multiple UNIs (each UNI has a different PortNo)
-	PortNo       uint32
+	PortNo uint32
+	// deprecated (gemPort is on a Service basis)
 	GemPortAdded bool
 	Flows        []FlowKey
 	FlowIds      []uint32 // keep track of the flows we currently have in the ONU
@@ -178,6 +179,12 @@
 					},
 				}
 				o.Channel <- msg
+
+				// Once the ONU is enabled start listening for packets
+				for _, s := range o.Services {
+					s.Initialize()
+					go s.HandlePackets(o.PonPort.Olt.OpenoltStream)
+				}
 			},
 			"enter_disabled": func(event *fsm.Event) {
 
@@ -186,7 +193,7 @@
 				o.PortNo = 0
 				o.Flows = []FlowKey{}
 
-				// set the OpenState to disabled
+				// set the OperState to disabled
 				if err := o.OperState.Event("disable"); err != nil {
 					onuLogger.WithFields(log.Fields{
 						"OnuId":  o.ID,
@@ -211,6 +218,10 @@
 				if len(o.FlowIds) == 0 {
 					close(o.Channel)
 				}
+
+				for _, s := range o.Services {
+					s.Disable()
+				}
 			},
 			// BBR states
 			"enter_eapol_flow_sent": func(e *fsm.Event) {
@@ -346,6 +357,7 @@
 				} else if msg.Type == packetHandlers.DHCP {
 					_ = dhcp.HandleNextBbrPacket(o.ID, o.PonPortID, o.Sn(), o.DoneChannel, msg.Packet, client)
 				}
+			// BBR specific messages
 			case OmciIndication:
 				msg, _ := message.Data.(OmciIndicationMessage)
 				o.handleOmci(msg, client)
@@ -454,6 +466,7 @@
 	if err := stream.Send(&openolt.Indication{Data: indData}); err != nil {
 		// NOTE do we need to transition to a broken state?
 		log.Errorf("Failed to send Indication_OnuInd: %v", err)
+		return
 	}
 	onuLogger.WithFields(log.Fields{
 		"IntfId":     o.PonPortID,
@@ -463,10 +476,6 @@
 		"OnuSn":      o.Sn(),
 	}).Debug("Sent Indication_OnuInd")
 
-	for _, s := range o.Services {
-		go s.HandlePackets(stream)
-	}
-
 }
 
 func (o *Onu) publishOmciEvent(msg OmciMessage) {
diff --git a/internal/bbsim/devices/service_test.go b/internal/bbsim/devices/service_test.go
index f83193a..93ea368 100644
--- a/internal/bbsim/devices/service_test.go
+++ b/internal/bbsim/devices/service_test.go
@@ -19,7 +19,7 @@
 import (
 	"github.com/opencord/bbsim/internal/bbsim/types"
 	"github.com/opencord/voltha-protos/v2/go/openolt"
-	"gotest.tools/assert"
+	"github.com/stretchr/testify/assert"
 	"net"
 	"testing"
 )
@@ -43,6 +43,30 @@
 	s.HandlePacketsCallCount = s.HandlePacketsCallCount + 1
 }
 
+func (s *mockService) Initialize() {}
+func (s *mockService) Disable()    {}
+
+// test the internalState transitions
+func TestService_InternalState(t *testing.T) {
+	mac := net.HardwareAddr{0x2e, 0x60, byte(1), byte(1), byte(1), byte(1)}
+	onu := createMockOnu(1, 1)
+	s, err := NewService("testService", mac, onu, 900, 900,
+		false, false, false, 64, 0, false,
+		0, 0, 0, 0)
+
+	assert.Nil(t, err)
+
+	assert.Empty(t, s.PacketCh)
+	s.Initialize()
+
+	assert.NotNil(t, s.PacketCh)
+
+	s.Disable()
+	assert.Equal(t, "created", s.EapolState.Current())
+	assert.Equal(t, "created", s.DHCPState.Current())
+}
+
+// make sure that if the service does not need EAPOL we're not sending any packet
 func TestService_HandleAuth_noEapol(t *testing.T) {
 	mac := net.HardwareAddr{0x2e, 0x60, byte(1), byte(1), byte(1), byte(1)}
 	onu := createMockOnu(1, 1)
@@ -50,7 +74,7 @@
 		false, false, false, 64, 0, false,
 		0, 0, 0, 0)
 
-	assert.NilError(t, err)
+	assert.Nil(t, err)
 
 	stream := &mockStream{
 		Calls:   make(map[int]*openolt.Indication),
@@ -66,6 +90,7 @@
 	assert.Equal(t, s.EapolState.Current(), "created")
 }
 
+// make sure that if the service does need EAPOL we're sending any packet
 func TestService_HandleAuth_withEapol(t *testing.T) {
 	mac := net.HardwareAddr{0x2e, 0x60, byte(1), byte(1), byte(1), byte(1)}
 	onu := createMockOnu(1, 1)
@@ -73,7 +98,7 @@
 		true, false, false, 64, 0, false,
 		0, 0, 0, 0)
 
-	assert.NilError(t, err)
+	assert.Nil(t, err)
 
 	stream := &mockStream{
 		Calls: make(map[int]*openolt.Indication),
@@ -87,3 +112,71 @@
 	// state should not change
 	assert.Equal(t, s.EapolState.Current(), "eap_start_sent")
 }
+
+// make sure that if the service does not need DHCP we're not sending any packet
+func TestService_HandleDhcp_not_needed(t *testing.T) {
+	mac := net.HardwareAddr{0x2e, 0x60, byte(1), byte(1), byte(1), byte(1)}
+	onu := createMockOnu(1, 1)
+	s, err := NewService("testService", mac, onu, 900, 900,
+		false, false, false, 64, 0, false,
+		0, 0, 0, 0)
+
+	assert.Nil(t, err)
+
+	stream := &mockStream{
+		Calls: make(map[int]*openolt.Indication),
+	}
+
+	s.HandleDhcp(stream, 900)
+
+	assert.Equal(t, stream.CallCount, 0)
+
+	// state should not change
+	assert.Equal(t, s.DHCPState.Current(), "created")
+}
+
+// when we receive a DHCP flow we call HandleDhcp an all the ONU Services
+// each service device whether the tag matches it's own configuration
+func TestService_HandleDhcp_different_c_Tag(t *testing.T) {
+	mac := net.HardwareAddr{0x2e, 0x60, byte(1), byte(1), byte(1), byte(1)}
+	onu := createMockOnu(1, 1)
+	s, err := NewService("testService", mac, onu, 900, 900,
+		false, false, false, 64, 0, false,
+		0, 0, 0, 0)
+
+	assert.Nil(t, err)
+
+	stream := &mockStream{
+		Calls: make(map[int]*openolt.Indication),
+	}
+
+	// NOTE that the c_tag is different from the one configured in the service
+	s.HandleDhcp(stream, 800)
+
+	assert.Equal(t, stream.CallCount, 0)
+
+	// state should not change
+	assert.Equal(t, s.DHCPState.Current(), "created")
+}
+
+// make sure that if the service does need DHCP we're sending any packet
+func TestService_HandleDhcp_needed(t *testing.T) {
+	mac := net.HardwareAddr{0x2e, 0x60, byte(1), byte(1), byte(1), byte(1)}
+	onu := createMockOnu(1, 1)
+	s, err := NewService("testService", mac, onu, 900, 900,
+		false, true, false, 64, 0, false,
+		0, 0, 0, 0)
+
+	assert.Nil(t, err)
+
+	stream := &mockStream{
+		Calls: make(map[int]*openolt.Indication),
+	}
+
+	s.HandleDhcp(stream, 900)
+
+	assert.Equal(t, stream.CallCount, 1)
+
+	// state should not change
+	assert.Equal(t, s.DHCPState.Current(), "dhcp_discovery_sent")
+}
diff --git a/internal/bbsim/devices/services.go b/internal/bbsim/devices/services.go
index c7940c9..15d313a 100644
--- a/internal/bbsim/devices/services.go
+++ b/internal/bbsim/devices/services.go
@@ -34,6 +34,9 @@
 	HandlePackets(stream bbsimTypes.Stream)        // start listening on the PacketCh
 	HandleAuth(stream bbsimTypes.Stream)           // Sends the EapoStart packet
 	HandleDhcp(stream bbsimTypes.Stream, cTag int) // Sends the DHCPDiscover packet
+
+	Initialize()
+	Disable()
 }
 
 type Service struct {
@@ -54,10 +57,11 @@
 	DsPonSTagPriority   int
 
 	// state
-	GemPort    uint32
-	EapolState *fsm.FSM
-	DHCPState  *fsm.FSM
-	PacketCh   chan OnuPacketMessage
+	GemPort       uint32
+	InternalState *fsm.FSM
+	EapolState    *fsm.FSM
+	DHCPState     *fsm.FSM
+	PacketCh      chan OnuPacketMessage
 }
 
 func NewService(name string, hwAddress net.HardwareAddr, onu *Onu, cTag int, sTag int,
@@ -80,9 +84,32 @@
 		UsPonSTagPriority:   usPonSTagPriority,
 		DsPonCTagPriority:   dsPonCTagPriority,
 		DsPonSTagPriority:   dsPonSTagPriority,
-		PacketCh:            make(chan OnuPacketMessage),
 	}
 
+	service.InternalState = fsm.NewFSM(
+		"created",
+		fsm.Events{
+			{Name: "initialized", Src: []string{"created", "disabled"}, Dst: "initialized"},
+			{Name: "disabled", Src: []string{"initialized"}, Dst: "disabled"},
+		},
+		fsm.Callbacks{
+			"enter_state": func(e *fsm.Event) {
+				service.logStateChange("InternalState", e.Src, e.Dst)
+			},
+			"enter_initialized": func(e *fsm.Event) {
+				service.PacketCh = make(chan OnuPacketMessage)
+			},
+			"enter_disabled": func(e *fsm.Event) {
+				// reset the state machines
+				service.EapolState.SetState("created")
+				service.DHCPState.SetState("created")
+
+				// stop listening for packets
+				close(service.PacketCh)
+			},
+		},
+	)
+
 	service.EapolState = fsm.NewFSM(
 		"created",
 		fsm.Events{
@@ -158,7 +185,6 @@
 
 func (s *Service) HandleDhcp(stream bbsimTypes.Stream, cTag int) {
 
-	// FIXME start dhcp only for the Service that matches the tag
 	if s.CTag != cTag {
 		serviceLogger.WithFields(log.Fields{
 			"OnuId":  s.Onu.ID,
@@ -205,6 +231,66 @@
 	}
 }
 
+func (s *Service) HandlePackets(stream bbsimTypes.Stream) {
+	serviceLogger.WithFields(log.Fields{
+		"OnuId":     s.Onu.ID,
+		"IntfId":    s.Onu.PonPortID,
+		"OnuSn":     s.Onu.Sn(),
+		"GemPortId": s.GemPort,
+		"Name":      s.Name,
+	}).Debug("Listening on Service Packet Channel")
+
+	defer func() {
+		serviceLogger.WithFields(log.Fields{
+			"OnuId":     s.Onu.ID,
+			"IntfId":    s.Onu.PonPortID,
+			"OnuSn":     s.Onu.Sn(),
+			"GemPortId": s.GemPort,
+			"Name":      s.Name,
+		}).Debug("Done Listening on Service Packet Channel")
+	}()
+
+	for msg := range s.PacketCh {
+		serviceLogger.WithFields(log.Fields{
+			"OnuId":       s.Onu.ID,
+			"IntfId":      s.Onu.PonPortID,
+			"OnuSn":       s.Onu.Sn(),
+			"Name":        s.Name,
+			"messageType": msg.Type,
+		}).Debug("Received message on Service Packet Channel")
+
+		if msg.Type == packetHandlers.EAPOL {
+			eapol.HandleNextPacket(msg.OnuId, msg.IntfId, s.GemPort, s.Onu.Sn(), s.Onu.PortNo, s.EapolState, msg.Packet, stream, nil)
+		} else if msg.Type == packetHandlers.DHCP {
+			_ = dhcp.HandleNextPacket(s.Onu.PonPort.Olt.ID, s.Onu.ID, s.Onu.PonPortID, s.Onu.Sn(), s.Onu.PortNo, s.CTag, s.GemPort, s.HwAddress, s.DHCPState, msg.Packet, stream)
+		}
+	}
+}
+
+func (s *Service) Initialize() {
+	if err := s.InternalState.Event("initialized"); err != nil {
+		serviceLogger.WithFields(log.Fields{
+			"OnuId":  s.Onu.ID,
+			"IntfId": s.Onu.PonPortID,
+			"OnuSn":  s.Onu.Sn(),
+			"Name":   s.Name,
+			"Err":    err,
+		}).Error("Cannot initialize service")
+	}
+}
+
+func (s *Service) Disable() {
+	if err := s.InternalState.Event("disabled"); err != nil {
+		serviceLogger.WithFields(log.Fields{
+			"OnuId":  s.Onu.ID,
+			"IntfId": s.Onu.PonPortID,
+			"OnuSn":  s.Onu.Sn(),
+			"Name":   s.Name,
+			"Err":    err,
+		}).Error("Cannot disable service")
+	}
+}
+
 func (s *Service) handleEapolStart(stream bbsimTypes.Stream) error {
 
 	serviceLogger.WithFields(log.Fields{
@@ -253,42 +339,6 @@
 	return nil
 }
 
-func (s *Service) HandlePackets(stream bbsimTypes.Stream) {
-	serviceLogger.WithFields(log.Fields{
-		"OnuId":     s.Onu.ID,
-		"IntfId":    s.Onu.PonPortID,
-		"OnuSn":     s.Onu.Sn(),
-		"GemPortId": s.GemPort,
-		"Name":      s.Name,
-	}).Debug("Listening on Service Packet Channel")
-
-	defer func() {
-		serviceLogger.WithFields(log.Fields{
-			"OnuId":     s.Onu.ID,
-			"IntfId":    s.Onu.PonPortID,
-			"OnuSn":     s.Onu.Sn(),
-			"GemPortId": s.GemPort,
-			"Name":      s.Name,
-		}).Debug("Done Listening on Service Packet Channel")
-	}()
-
-	for msg := range s.PacketCh {
-		serviceLogger.WithFields(log.Fields{
-			"OnuId":       s.Onu.ID,
-			"IntfId":      s.Onu.PonPortID,
-			"OnuSn":       s.Onu.Sn(),
-			"Name":        s.Name,
-			"messageType": msg.Type,
-		}).Debug("Received message on Service Packet Channel")
-
-		if msg.Type == packetHandlers.EAPOL {
-			eapol.HandleNextPacket(msg.OnuId, msg.IntfId, s.GemPort, s.Onu.Sn(), s.Onu.PortNo, s.EapolState, msg.Packet, stream, nil)
-		} else if msg.Type == packetHandlers.DHCP {
-			_ = dhcp.HandleNextPacket(s.Onu.PonPort.Olt.ID, s.Onu.ID, s.Onu.PonPortID, s.Onu.Sn(), s.Onu.PortNo, s.CTag, s.GemPort, s.HwAddress, s.DHCPState, msg.Packet, stream)
-		}
-	}
-}
-
 func (s *Service) logStateChange(stateMachine string, src string, dst string) {
 	serviceLogger.WithFields(log.Fields{
 		"OnuId":  s.Onu.ID,
diff --git a/internal/bbsim/responders/eapol/eapol.go b/internal/bbsim/responders/eapol/eapol.go
index 5595d5b..8a37f02 100644
--- a/internal/bbsim/responders/eapol/eapol.go
+++ b/internal/bbsim/responders/eapol/eapol.go
@@ -38,7 +38,7 @@
 var eapolVersion uint8 = 1
 var GetGemPortId = omci.GetGemPortId
 
-func sendEapolPktIn(msg bbsim.ByteMsg, portNo uint32, gemid uint32, stream bbsim.Stream) {
+func sendEapolPktIn(msg bbsim.ByteMsg, portNo uint32, gemid uint32, stream bbsim.Stream) error {
 	// FIXME unify sendDHCPPktIn and sendEapolPktIn methods
 
 	log.WithFields(log.Fields{
@@ -58,8 +58,9 @@
 
 	if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
 		eapolLogger.Errorf("Fail to send EAPOL PktInd indication. %v", err)
-		return
+		return err
 	}
+	return nil
 }
 
 func getMD5Data(eap *layers.EAP) []byte {
@@ -312,7 +313,10 @@
 			Bytes:  pkt,
 		}
 
-		sendEapolPktIn(msg, portNo, gemPortId, stream)
+		if err := sendEapolPktIn(msg, portNo, gemPortId, stream); err != nil {
+			_ = stateMachine.Event("auth_failed")
+			return
+		}
 		eapolLogger.WithFields(log.Fields{
 			"OnuId":  onuId,
 			"IntfId": ponPortId,
@@ -360,7 +364,10 @@
 			Bytes:  pkt,
 		}
 
-		sendEapolPktIn(msg, portNo, gemPortId, stream)
+		if err := sendEapolPktIn(msg, portNo, gemPortId, stream); err != nil {
+			_ = stateMachine.Event("auth_failed")
+			return
+		}
 		eapolLogger.WithFields(log.Fields{
 			"OnuId":  onuId,
 			"IntfId": ponPortId,