Simplifying ONU channels and packet responders

Change-Id: I1f3912367a96564986b4209b7864e9fd4b507e8e
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index e6248e8..33126a8 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -543,21 +543,37 @@
 	pon, _ := o.getPonById(onuPkt.IntfId)
 	onu, _ := pon.getOnuById(onuPkt.OnuId)
 
+	oltLogger.WithFields(log.Fields{
+		"IntfId": onu.PonPortID,
+		"OnuId":  onu.ID,
+		"OnuSn":  onu.Sn(),
+	}).Tracef("Received OnuPacketOut")
+
 	rawpkt := gopacket.NewPacket(onuPkt.Pkt, layers.LayerTypeEthernet, gopacket.Default)
 
-	etherType := rawpkt.Layer(layers.LayerTypeEthernet).(*layers.Ethernet).EthernetType
-
-	if etherType == layers.EthernetTypeEAPOL {
-		eapolPkt := bbsim.ByteMsg{IntfId: onuPkt.IntfId, OnuId: onuPkt.OnuId, Bytes: rawpkt.Data()}
-		onu.eapolPktOutCh <- &eapolPkt
-	} else if layerDHCP := rawpkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
-		// TODO use IsDhcpPacket
-		// TODO we need to untag the packets
-		// NOTE here we receive packets going from the DHCP Server to the ONU
-		// for now we expect them to be double-tagged, but ideally the should be single tagged
-		dhcpPkt := bbsim.ByteMsg{IntfId: onuPkt.IntfId, OnuId: onuPkt.OnuId, Bytes: rawpkt.Data()}
-		onu.dhcpPktOutCh <- &dhcpPkt
+	msg := Message{
+		Type: OnuPacketOut,
+		Data: OnuPacketOutMessage{
+			IntfId: onuPkt.IntfId,
+			OnuId:  onuPkt.OnuId,
+			Packet: rawpkt,
+		},
 	}
+	onu.Channel <- msg
+
+	//etherType := rawpkt.Layer(layers.LayerTypeEthernet).(*layers.Ethernet).EthernetType
+	//
+	//if etherType == layers.EthernetTypeEAPOL {
+	//	eapolPkt := bbsim.ByteMsg{IntfId: onuPkt.IntfId, OnuId: onuPkt.OnuId, Bytes: rawpkt.Data()}
+	//	onu.eapolPktOutCh <- &eapolPkt
+	//} else if layerDHCP := rawpkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
+	//	// TODO use IsDhcpPacket
+	//	// TODO we need to untag the packets
+	//	// NOTE here we receive packets going from the DHCP Server to the ONU
+	//	// for now we expect them to be double-tagged, but ideally the should be single tagged
+	//	dhcpPkt := bbsim.ByteMsg{IntfId: onuPkt.IntfId, OnuId: onuPkt.OnuId, Bytes: rawpkt.Data()}
+	//	onu.dhcpPktOutCh <- &dhcpPkt
+	//}
 	return new(openolt.Empty), nil
 }
 
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index 4587620..3477a62 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -20,6 +20,7 @@
 	"fmt"
 	"github.com/google/gopacket/layers"
 	"github.com/looplab/fsm"
+	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
 	"github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
 	"github.com/opencord/bbsim/internal/bbsim/responders/eapol"
 	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
@@ -184,33 +185,22 @@
 			o.handleFlowUpdate(msg, stream)
 		case StartEAPOL:
 			log.Infof("Receive StartEAPOL message on ONU Channel")
-			go func() {
-				// TODO kill this thread
-				eapol.CreateWPASupplicant(
-					o.ID,
-					o.PonPortID,
-					o.Sn(),
-					o.InternalState,
-					stream,
-					o.eapolPktOutCh,
-				)
-			}()
+			eapol.SendEapStart(o.ID, o.PonPortID, o.Sn(), o.InternalState, stream)
 		case StartDHCP:
 			log.Infof("Receive StartDHCP message on ONU Channel")
-			go func() {
-				// TODO kill this thread
-				dhcp.CreateDHCPClient(
-					o.ID,
-					o.PonPortID,
-					o.Sn(),
-					o.HwAddress,
-					o.CTag,
-					o.InternalState,
-					stream,
-					o.dhcpPktOutCh,
-				)
-			}()
+			dhcp.SendDHCPDiscovery(o.PonPortID, o.ID, o.Sn(), o.InternalState, o.HwAddress, o.CTag, stream)
+		case OnuPacketOut:
+			msg, _ := message.Data.(OnuPacketOutMessage)
+			pkt := msg.Packet
+			etherType := pkt.Layer(layers.LayerTypeEthernet).(*layers.Ethernet).EthernetType
 
+			if etherType == layers.EthernetTypeEAPOL {
+				eapol.HandleNextPacket(msg.OnuId, msg.IntfId, o.Sn(), o.InternalState, msg.Packet, stream)
+			} else if packetHandlers.IsDhcpPacket(pkt) {
+				// NOTE here we receive packets going from the DHCP Server to the ONU
+				// for now we expect them to be double-tagged, but ideally the should be single tagged
+				dhcp.HandleNextPacket(o.ID, o.PonPortID, o.Sn(), o.HwAddress, o.CTag, o.InternalState, msg.Packet, stream)
+			}
 		case DyingGaspIndication:
 			msg, _ := message.Data.(DyingGaspIndicationMessage)
 			o.sendDyingGaspInd(msg, stream)
diff --git a/internal/bbsim/devices/types.go b/internal/bbsim/devices/types.go
index 93998e9..eca15d4 100644
--- a/internal/bbsim/devices/types.go
+++ b/internal/bbsim/devices/types.go
@@ -20,6 +20,7 @@
 	"bytes"
 	"errors"
 	"fmt"
+	"github.com/google/gopacket"
 	"github.com/looplab/fsm"
 	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
 	"github.com/opencord/voltha-protos/go/openolt"
@@ -128,7 +129,8 @@
 	FlowUpdate          MessageType = 6
 	StartEAPOL          MessageType = 7
 	StartDHCP           MessageType = 8
-	DyingGaspIndication MessageType = 9
+	OnuPacketOut        MessageType = 9
+	DyingGaspIndication MessageType = 10
 )
 
 func (m MessageType) String() string {
@@ -142,6 +144,7 @@
 		"FlowUpdate",
 		"StartEAPOL",
 		"StartDHCP",
+		"OnuPacketOut",
 		"DyingGaspIndication",
 	}
 	return names[m]
@@ -201,6 +204,12 @@
 	Status    string
 }
 
+type OnuPacketOutMessage struct {
+	IntfId uint32
+	OnuId  uint32
+	Packet gopacket.Packet
+}
+
 type OperState int
 
 const (
diff --git a/internal/bbsim/responders/dhcp/dhcp.go b/internal/bbsim/responders/dhcp/dhcp.go
index d2a3db7..64e83dd 100644
--- a/internal/bbsim/responders/dhcp/dhcp.go
+++ b/internal/bbsim/responders/dhcp/dhcp.go
@@ -30,6 +30,8 @@
 	"reflect"
 )
 
+var GetGemPortId = omci.GetGemPortId
+
 var dhcpLogger = log.WithFields(log.Fields{
 	"module": "DHCP",
 })
@@ -194,7 +196,7 @@
 
 func sendDHCPPktIn(msg bbsim.ByteMsg, stream openolt.Openolt_EnableIndicationServer) error {
 	// FIXME unify sendDHCPPktIn and sendEapolPktIn methods
-	gemid, err := omci.GetGemPortId(msg.IntfId, msg.OnuId)
+	gemid, err := GetGemPortId(msg.IntfId, msg.OnuId)
 	if err != nil {
 		dhcpLogger.WithFields(log.Fields{
 			"OnuId":  msg.OnuId,
@@ -213,33 +215,6 @@
 	return nil
 }
 
-func sendDHCPDiscovery(ponPortId uint32, onuId uint32, serialNumber string, onuHwAddress net.HardwareAddr, cTag int, stream openolt.Openolt_EnableIndicationServer) error {
-	dhcp := createDHCPDisc(ponPortId, onuId)
-	pkt, err := serializeDHCPPacket(ponPortId, onuId, onuHwAddress, dhcp)
-	if err != nil {
-		dhcpLogger.Errorf("Cannot serializeDHCPPacket: %s", err)
-		return err
-	}
-	// NOTE I don't think we need to tag the packet
-	//taggedPkt, err := packetHandlers.PushSingleTag(cTag, pkt)
-
-	msg := bbsim.ByteMsg{
-		IntfId: ponPortId,
-		OnuId:  onuId,
-		Bytes:  pkt,
-	}
-
-	if err := sendDHCPPktIn(msg, stream); err != nil {
-		return err
-	}
-	dhcpLogger.WithFields(log.Fields{
-		"OnuId":  onuId,
-		"IntfId": ponPortId,
-		"OnuSn":  serialNumber,
-	}).Infof("DHCPDiscovery Sent")
-	return nil
-}
-
 func sendDHCPRequest(ponPortId uint32, onuId uint32, serialNumber string, onuHwAddress net.HardwareAddr, cTag int, stream openolt.Openolt_EnableIndicationServer) error {
 	dhcp := createDHCPReq(ponPortId, onuId)
 	pkt, err := serializeDHCPPacket(ponPortId, onuId, onuHwAddress, dhcp)
@@ -268,38 +243,45 @@
 	return nil
 }
 
-func CreateDHCPClient(onuId uint32, ponPortId uint32, serialNumber string, onuHwAddress net.HardwareAddr, cTag int, onuStateMachine *fsm.FSM, stream openolt.Openolt_EnableIndicationServer, pktOutCh chan *bbsim.ByteMsg) {
-	// NOTE pckOutCh is channel to listen on for packets received by VOLTHA
-	// the OLT device will publish messages on that channel
-
-	dhcpLogger.WithFields(log.Fields{
-		"OnuId":  onuId,
-		"IntfId": ponPortId,
-		"OnuSn":  serialNumber,
-	}).Infof("DHCP State Machine starting")
-
-	defer dhcpLogger.WithFields(log.Fields{
-		"OnuId":  onuId,
-		"IntfId": ponPortId,
-		"OnuSn":  serialNumber,
-	}).Infof("DHCP State machine completed")
-
-	// Send DHCP Discovery packet
-	if err := sendDHCPDiscovery(ponPortId, onuId, serialNumber, onuHwAddress, cTag, stream); err != nil {
+func updateDhcpFailed(onuId uint32, ponPortId uint32, serialNumber string, onuStateMachine *fsm.FSM) error {
+	if err := onuStateMachine.Event("dhcp_failed"); err != nil {
 		dhcpLogger.WithFields(log.Fields{
 			"OnuId":  onuId,
 			"IntfId": ponPortId,
 			"OnuSn":  serialNumber,
-		}).Errorf("Can't send DHCP Discovery: %s", err)
-		if err := onuStateMachine.Event("dhcp_failed"); err != nil {
-			dhcpLogger.WithFields(log.Fields{
-				"OnuId":  onuId,
-				"IntfId": ponPortId,
-				"OnuSn":  serialNumber,
-			}).Errorf("Error while transitioning ONU State %v", err)
-		}
-		return
+		}).Errorf("Error while transitioning ONU State %v", err)
+		return err
 	}
+	return nil
+}
+
+func SendDHCPDiscovery(ponPortId uint32, onuId uint32, serialNumber string, onuStateMachine *fsm.FSM, onuHwAddress net.HardwareAddr, cTag int, stream openolt.Openolt_EnableIndicationServer) error {
+	dhcp := createDHCPDisc(ponPortId, onuId)
+	pkt, err := serializeDHCPPacket(ponPortId, onuId, onuHwAddress, dhcp)
+	if err != nil {
+		dhcpLogger.Errorf("Cannot serializeDHCPPacket: %s", err)
+		return err
+	}
+	// NOTE I don't think we need to tag the packet
+	//taggedPkt, err := packetHandlers.PushSingleTag(cTag, pkt)
+
+	msg := bbsim.ByteMsg{
+		IntfId: ponPortId,
+		OnuId:  onuId,
+		Bytes:  pkt,
+	}
+
+	if err := sendDHCPPktIn(msg, stream); err != nil {
+		if err := updateDhcpFailed(onuId, ponPortId, serialNumber, onuStateMachine); err != nil {
+			return err
+		}
+		return err
+	}
+	dhcpLogger.WithFields(log.Fields{
+		"OnuId":  onuId,
+		"IntfId": ponPortId,
+		"OnuSn":  serialNumber,
+	}).Infof("DHCPDiscovery Sent")
 
 	if err := onuStateMachine.Event("dhcp_discovery_sent"); err != nil {
 		dhcpLogger.WithFields(log.Fields{
@@ -308,80 +290,79 @@
 			"OnuSn":  serialNumber,
 		}).Errorf("Error while transitioning ONU State %v", err)
 	}
+	return nil
+}
 
-	dhcpLogger.WithFields(log.Fields{
-		"OnuId":  onuId,
-		"IntfId": ponPortId,
-		"OnuSn":  serialNumber,
-	}).Infof("Listening on dhcpPktOutCh")
+func HandleNextPacket(onuId uint32, ponPortId uint32, serialNumber string, onuHwAddress net.HardwareAddr, cTag int, onuStateMachine *fsm.FSM, pkt gopacket.Packet, stream openolt.Openolt_EnableIndicationServer) error {
 
-	for msg := range pktOutCh {
-		dhcpLogger.Tracef("Received DHCP message %v", msg)
-
-		pkt := gopacket.NewPacket(msg.Bytes, layers.LayerTypeEthernet, gopacket.Default)
-		dhcpLayer, err := getDhcpLayer(pkt)
-		if err != nil {
-			dhcpLogger.WithFields(log.Fields{
-				"OnuId":  onuId,
-				"IntfId": ponPortId,
-				"OnuSn":  serialNumber,
-			}).Errorf("Can't get DHCP Layer from Packet: %v", err)
-			continue
+	dhcpLayer, err := getDhcpLayer(pkt)
+	if err != nil {
+		dhcpLogger.WithFields(log.Fields{
+			"OnuId":  onuId,
+			"IntfId": ponPortId,
+			"OnuSn":  serialNumber,
+		}).Errorf("Can't get DHCP Layer from Packet: %v", err)
+		if err := updateDhcpFailed(onuId, ponPortId, serialNumber, onuStateMachine); err != nil {
+			return err
 		}
-		dhcpMessageType, err := getDhcpMessageType(dhcpLayer)
-		if err != nil {
-			dhcpLogger.WithFields(log.Fields{
-				"OnuId":  onuId,
-				"IntfId": ponPortId,
-				"OnuSn":  serialNumber,
-			}).Errorf("Can't get DHCP Message Type from DHCP Layer: %v", err)
-			continue
-		}
-
-		if dhcpLayer.Operation == layers.DHCPOpReply {
-			if dhcpMessageType == layers.DHCPMsgTypeOffer {
-				if err := sendDHCPRequest(ponPortId, onuId, serialNumber, onuHwAddress, cTag, stream); err != nil {
-					dhcpLogger.WithFields(log.Fields{
-						"OnuId":  onuId,
-						"IntfId": ponPortId,
-						"OnuSn":  serialNumber,
-					}).Errorf("Can't send DHCP Request: %s", err)
-					if err := onuStateMachine.Event("dhcp_failed"); err != nil {
-						dhcpLogger.WithFields(log.Fields{
-							"OnuId":  onuId,
-							"IntfId": ponPortId,
-							"OnuSn":  serialNumber,
-						}).Errorf("Error while transitioning ONU State %v", err)
-					}
-					return
-				}
-				if err := onuStateMachine.Event("dhcp_request_sent"); err != nil {
-					dhcpLogger.WithFields(log.Fields{
-						"OnuId":  onuId,
-						"IntfId": ponPortId,
-						"OnuSn":  serialNumber,
-					}).Errorf("Error while transitioning ONU State %v", err)
-				}
-
-			} else if dhcpMessageType == layers.DHCPMsgTypeAck {
-				// NOTE once the ack is received we don't need to do anything but change the state
-				if err := onuStateMachine.Event("dhcp_ack_received"); err != nil {
-					dhcpLogger.WithFields(log.Fields{
-						"OnuId":  onuId,
-						"IntfId": ponPortId,
-						"OnuSn":  serialNumber,
-					}).Errorf("Error while transitioning ONU State %v", err)
-				}
-				return
-			}
-			// NOTE do we need to care about DHCPMsgTypeRelease??
-		} else {
-			dhcpLogger.WithFields(log.Fields{
-				"OnuId":  onuId,
-				"IntfId": ponPortId,
-				"OnuSn":  serialNumber,
-			}).Warnf("Unsupported DHCP Operation: %s", dhcpLayer.Operation.String())
-			continue
-		}
+		return err
 	}
+	dhcpMessageType, err := getDhcpMessageType(dhcpLayer)
+	if err != nil {
+		dhcpLogger.WithFields(log.Fields{
+			"OnuId":  onuId,
+			"IntfId": ponPortId,
+			"OnuSn":  serialNumber,
+		}).Errorf("Can't get DHCP Message Type from DHCP Layer: %v", err)
+		if err := updateDhcpFailed(onuId, ponPortId, serialNumber, onuStateMachine); err != nil {
+			return err
+		}
+		return err
+	}
+
+	if dhcpLayer.Operation == layers.DHCPOpReply {
+		if dhcpMessageType == layers.DHCPMsgTypeOffer {
+			if err := sendDHCPRequest(ponPortId, onuId, serialNumber, onuHwAddress, cTag, stream); err != nil {
+				dhcpLogger.WithFields(log.Fields{
+					"OnuId":  onuId,
+					"IntfId": ponPortId,
+					"OnuSn":  serialNumber,
+				}).Errorf("Can't send DHCP Request: %s", err)
+				if err := updateDhcpFailed(onuId, ponPortId, serialNumber, onuStateMachine); err != nil {
+					return err
+				}
+				return err
+			}
+			if err := onuStateMachine.Event("dhcp_request_sent"); err != nil {
+				dhcpLogger.WithFields(log.Fields{
+					"OnuId":  onuId,
+					"IntfId": ponPortId,
+					"OnuSn":  serialNumber,
+				}).Errorf("Error while transitioning ONU State %v", err)
+			}
+
+		} else if dhcpMessageType == layers.DHCPMsgTypeAck {
+			// NOTE once the ack is received we don't need to do anything but change the state
+			if err := onuStateMachine.Event("dhcp_ack_received"); err != nil {
+				dhcpLogger.WithFields(log.Fields{
+					"OnuId":  onuId,
+					"IntfId": ponPortId,
+					"OnuSn":  serialNumber,
+				}).Errorf("Error while transitioning ONU State %v", err)
+			}
+			dhcpLogger.WithFields(log.Fields{
+				"OnuId":  onuId,
+				"IntfId": ponPortId,
+				"OnuSn":  serialNumber,
+			}).Infof("DHCP State machine completed")
+		}
+		// NOTE do we need to care about DHCPMsgTypeRelease??
+	} else {
+		dhcpLogger.WithFields(log.Fields{
+			"OnuId":  onuId,
+			"IntfId": ponPortId,
+			"OnuSn":  serialNumber,
+		}).Warnf("Unsupported DHCP Operation: %s", dhcpLayer.Operation.String())
+	}
+	return nil
 }
diff --git a/internal/bbsim/responders/dhcp/dhcp_test.go b/internal/bbsim/responders/dhcp/dhcp_test.go
new file mode 100644
index 0000000..0733cc0
--- /dev/null
+++ b/internal/bbsim/responders/dhcp/dhcp_test.go
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dhcp
+
+import (
+	"errors"
+	"github.com/looplab/fsm"
+	"github.com/opencord/voltha-protos/go/openolt"
+	"google.golang.org/grpc"
+	"gotest.tools/assert"
+	"net"
+	"testing"
+)
+
+// MOCKS
+var calledSend = 0
+
+var dhcpStateMachine = fsm.NewFSM(
+	"dhcp_started",
+	fsm.Events{
+		{Name: "dhcp_discovery_sent", Src: []string{"dhcp_started"}, Dst: "dhcp_discovery_sent"},
+		{Name: "dhcp_request_sent", Src: []string{"dhcp_discovery_sent"}, Dst: "dhcp_request_sent"},
+		{Name: "dhcp_ack_received", Src: []string{"dhcp_request_sent"}, Dst: "dhcp_ack_received"},
+		{Name: "dhcp_failed", Src: []string{"dhcp_started", "dhcp_discovery_sent", "dhcp_request_sent"}, Dst: "dhcp_failed"},
+	},
+	fsm.Callbacks{},
+)
+
+type mockStreamSuccess struct {
+	grpc.ServerStream
+}
+
+func (s mockStreamSuccess) Send(ind *openolt.Indication) error {
+	calledSend++
+	return nil
+}
+
+type mockStreamError struct {
+	grpc.ServerStream
+}
+
+func (s mockStreamError) Send(ind *openolt.Indication) error {
+	calledSend++
+	return errors.New("stream-error")
+}
+
+// TESTS
+
+func TestSendDHCPDiscovery(t *testing.T) {
+	calledSend = 0
+	dhcpStateMachine.SetState("dhcp_started")
+
+	// Save current function and restore at the end:
+	old := GetGemPortId
+	defer func() { GetGemPortId = old }()
+
+	GetGemPortId = func(intfId uint32, onuId uint32) (uint16, error) {
+		return 1, nil
+	}
+
+	var onuId uint32 = 1
+	var ponPortId uint32 = 0
+	var serialNumber string = "BBSM00000001"
+	var mac = net.HardwareAddr{0x2e, 0x60, 0x70, 0x13, byte(ponPortId), byte(onuId)}
+
+	stream := mockStreamSuccess{}
+
+	if err := SendDHCPDiscovery(ponPortId, onuId, serialNumber, dhcpStateMachine, mac, 1, stream); err != nil {
+		t.Errorf("SendDHCPDiscovery returned an error: %v", err)
+		t.Fail()
+	}
+
+	assert.Equal(t, calledSend, 1)
+
+	assert.Equal(t, dhcpStateMachine.Current(), "dhcp_discovery_sent")
+}
+
+// TODO test dhcp.HandleNextPacket
+
+func TestUpdateDhcpFailed(t *testing.T) {
+
+	var onuId uint32 = 1
+	var ponPortId uint32 = 0
+	var serialNumber string = "BBSM00000001"
+
+	dhcpStateMachine.SetState("dhcp_started")
+	updateDhcpFailed(onuId, ponPortId, serialNumber, dhcpStateMachine)
+	assert.Equal(t, dhcpStateMachine.Current(), "dhcp_failed")
+
+	dhcpStateMachine.SetState("dhcp_discovery_sent")
+	updateDhcpFailed(onuId, ponPortId, serialNumber, dhcpStateMachine)
+	assert.Equal(t, dhcpStateMachine.Current(), "dhcp_failed")
+
+	dhcpStateMachine.SetState("dhcp_request_sent")
+	updateDhcpFailed(onuId, ponPortId, serialNumber, dhcpStateMachine)
+	assert.Equal(t, dhcpStateMachine.Current(), "dhcp_failed")
+
+	dhcpStateMachine.SetState("dhcp_ack_received")
+	err := updateDhcpFailed(onuId, ponPortId, serialNumber, dhcpStateMachine)
+	if err == nil {
+		t.Errorf("updateDhcpFailed did not return an error")
+		t.Fail()
+	}
+	assert.Equal(t, err.Error(), "event dhcp_failed inappropriate in current state dhcp_ack_received")
+
+}
diff --git a/internal/bbsim/responders/eapol/eapol.go b/internal/bbsim/responders/eapol/eapol.go
index 602a78e..da08cf5 100644
--- a/internal/bbsim/responders/eapol/eapol.go
+++ b/internal/bbsim/responders/eapol/eapol.go
@@ -34,125 +34,7 @@
 })
 
 var eapolVersion uint8 = 1
-
-func CreateWPASupplicant(onuId uint32, ponPortId uint32, serialNumber string, onuStateMachine *fsm.FSM, stream openolt.Openolt_EnableIndicationServer, pktOutCh chan *bbsim.ByteMsg) {
-	// NOTE pckOutCh is channel to listen on for packets received by VOLTHA
-	// the OLT device will publish messages on that channel
-
-	eapolLogger.WithFields(log.Fields{
-		"OnuId":  onuId,
-		"IntfId": ponPortId,
-		"OnuSn":  serialNumber,
-	}).Infof("EAPOL State Machine starting")
-
-	defer eapolLogger.WithFields(log.Fields{
-		"OnuId":  onuId,
-		"IntfId": ponPortId,
-		"OnuSn":  serialNumber,
-	}).Infof("EAPOL State machine completed")
-
-	if err := sendEapStart(onuId, ponPortId, serialNumber, stream); err != nil {
-		eapolLogger.WithFields(log.Fields{
-			"OnuId":  onuId,
-			"IntfId": ponPortId,
-			"OnuSn":  serialNumber,
-		}).Errorf("Can't send EapStart Message: %s", err)
-		if err := onuStateMachine.Event("auth_failed"); err != nil {
-			eapolLogger.WithFields(log.Fields{
-				"OnuId":  onuId,
-				"IntfId": ponPortId,
-				"OnuSn":  serialNumber,
-			}).Errorf("Error while transitioning ONU State %v", err)
-		}
-		return
-	}
-	if err := onuStateMachine.Event("eap_start_sent"); err != nil {
-		eapolLogger.WithFields(log.Fields{
-			"OnuId":  onuId,
-			"IntfId": ponPortId,
-			"OnuSn":  serialNumber,
-		}).Errorf("Error while transitioning ONU State %v", err)
-	}
-
-	eapolLogger.WithFields(log.Fields{
-		"OnuId":  onuId,
-		"IntfId": ponPortId,
-		"OnuSn":  serialNumber,
-	}).Infof("Listening on eapolPktOutCh")
-
-	for msg := range pktOutCh {
-		recvpkt := gopacket.NewPacket(msg.Bytes, layers.LayerTypeEthernet, gopacket.Default)
-		eap, err := extractEAP(recvpkt)
-		if err != nil {
-			eapolLogger.Errorf("%s", err)
-		}
-
-		if eap.Code == layers.EAPCodeRequest && eap.Type == layers.EAPTypeIdentity {
-			reseap := createEAPIdentityResponse(eap.Id)
-			pkt := createEAPOLPkt(reseap, onuId, ponPortId)
-
-			msg := bbsim.ByteMsg{
-				IntfId: ponPortId,
-				OnuId:  onuId,
-				Bytes:  pkt,
-			}
-
-			sendEapolPktIn(msg, stream)
-			eapolLogger.WithFields(log.Fields{
-				"OnuId":  onuId,
-				"IntfId": ponPortId,
-				"OnuSn":  serialNumber,
-			}).Infof("Sent EAPIdentityResponse packet")
-			if err := onuStateMachine.Event("eap_response_identity_sent"); err != nil {
-				eapolLogger.WithFields(log.Fields{
-					"OnuId":  onuId,
-					"IntfId": ponPortId,
-					"OnuSn":  serialNumber,
-				}).Errorf("Error while transitioning ONU State %v", err)
-			}
-
-		} else if eap.Code == layers.EAPCodeRequest && eap.Type == layers.EAPTypeOTP {
-			senddata := getMD5Data(eap)
-			senddata = append([]byte{0x10}, senddata...)
-			sendeap := createEAPChallengeResponse(eap.Id, senddata)
-			pkt := createEAPOLPkt(sendeap, onuId, ponPortId)
-
-			msg := bbsim.ByteMsg{
-				IntfId: ponPortId,
-				OnuId:  onuId,
-				Bytes:  pkt,
-			}
-
-			sendEapolPktIn(msg, stream)
-			eapolLogger.WithFields(log.Fields{
-				"OnuId":  onuId,
-				"IntfId": ponPortId,
-				"OnuSn":  serialNumber,
-			}).Infof("Sent EAPChallengeResponse packet")
-			if err := onuStateMachine.Event("eap_response_challenge_sent"); err != nil {
-				eapolLogger.WithFields(log.Fields{
-					"OnuId":  onuId,
-					"IntfId": ponPortId,
-					"OnuSn":  serialNumber,
-				}).Errorf("Error while transitioning ONU State %v", err)
-			}
-		} else if eap.Code == layers.EAPCodeSuccess && eap.Type == layers.EAPTypeNone {
-			eapolLogger.WithFields(log.Fields{
-				"OnuId":  onuId,
-				"IntfId": ponPortId,
-				"OnuSn":  serialNumber,
-			}).Infof("Received EAPSuccess packet")
-			if err := onuStateMachine.Event("eap_response_success_received"); err != nil {
-				eapolLogger.WithFields(log.Fields{
-					"OnuId":  onuId,
-					"IntfId": ponPortId,
-					"OnuSn":  serialNumber,
-				}).Errorf("Error while transitioning ONU State %v", err)
-			}
-			return
-		}
-	}
-}
+var GetGemPortId = omci.GetGemPortId
 
 func sendEapolPktIn(msg bbsim.ByteMsg, stream openolt.Openolt_EnableIndicationServer) {
 	// FIXME unify sendDHCPPktIn and sendEapolPktIn methods
@@ -235,11 +117,32 @@
 	return eap, nil
 }
 
-var sendEapStart = func(onuId uint32, ponPortId uint32, serialNumber string, stream openolt.Openolt_EnableIndicationServer) error {
+func updateAuthFailed(onuId uint32, ponPortId uint32, serialNumber string, onuStateMachine *fsm.FSM) error {
+	if err := onuStateMachine.Event("auth_failed"); err != nil {
+		eapolLogger.WithFields(log.Fields{
+			"OnuId":  onuId,
+			"IntfId": ponPortId,
+			"OnuSn":  serialNumber,
+		}).Errorf("Error while transitioning ONU State %v", err)
+		return err
+	}
+	return nil
+}
+
+func SendEapStart(onuId uint32, ponPortId uint32, serialNumber string, onuStateMachine *fsm.FSM, stream openolt.Openolt_EnableIndicationServer) error {
 
 	// send the packet (hacked together)
-	gemid, err := omci.GetGemPortId(ponPortId, onuId)
+	gemId, err := GetGemPortId(ponPortId, onuId)
 	if err != nil {
+		eapolLogger.WithFields(log.Fields{
+			"OnuId":  onuId,
+			"IntfId": ponPortId,
+			"OnuSn":  serialNumber,
+		}).Errorf("Can't retrieve GemPortId: %s", err)
+
+		if err := updateAuthFailed(onuId, ponPortId, serialNumber, onuStateMachine); err != nil {
+			return err
+		}
 		return err
 	}
 
@@ -265,14 +168,113 @@
 		PktInd: &openolt.PacketIndication{
 			IntfType:  "pon",
 			IntfId:    ponPortId,
-			GemportId: uint32(gemid),
+			GemportId: uint32(gemId),
 			Pkt:       msg,
 		},
 	}
-	// end of hacked (move in an EAPOL state machine)
-	if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
-		eapolLogger.Errorf("Fail to send EAPOL PktInd indication. %v", err)
+
+	err = stream.Send(&openolt.Indication{Data: data})
+	if err != nil {
+		eapolLogger.WithFields(log.Fields{
+			"OnuId":  onuId,
+			"IntfId": ponPortId,
+			"OnuSn":  serialNumber,
+		}).Errorf("Can't send EapStart Message: %s", err)
+
+		if err := updateAuthFailed(onuId, ponPortId, serialNumber, onuStateMachine); err != nil {
+			return err
+		}
+		return err
+	}
+
+	if err != nil {
+
+	}
+
+	if err := onuStateMachine.Event("eap_start_sent"); err != nil {
+		eapolLogger.WithFields(log.Fields{
+			"OnuId":  onuId,
+			"IntfId": ponPortId,
+			"OnuSn":  serialNumber,
+		}).Errorf("Error while transitioning ONU State %v", err)
 		return err
 	}
 	return nil
 }
+
+func HandleNextPacket(onuId uint32, ponPortId uint32, serialNumber string, onuStateMachine *fsm.FSM, recvpkt gopacket.Packet, stream openolt.Openolt_EnableIndicationServer) {
+	eap, err := extractEAP(recvpkt)
+	if err != nil {
+		eapolLogger.Errorf("%s", err)
+	}
+
+	if eap.Code == layers.EAPCodeRequest && eap.Type == layers.EAPTypeIdentity {
+		reseap := createEAPIdentityResponse(eap.Id)
+		pkt := createEAPOLPkt(reseap, onuId, ponPortId)
+
+		msg := bbsim.ByteMsg{
+			IntfId: ponPortId,
+			OnuId:  onuId,
+			Bytes:  pkt,
+		}
+
+		sendEapolPktIn(msg, stream)
+		eapolLogger.WithFields(log.Fields{
+			"OnuId":  onuId,
+			"IntfId": ponPortId,
+			"OnuSn":  serialNumber,
+		}).Infof("Sent EAPIdentityResponse packet")
+		if err := onuStateMachine.Event("eap_response_identity_sent"); err != nil {
+			eapolLogger.WithFields(log.Fields{
+				"OnuId":  onuId,
+				"IntfId": ponPortId,
+				"OnuSn":  serialNumber,
+			}).Errorf("Error while transitioning ONU State %v", err)
+		}
+
+	} else if eap.Code == layers.EAPCodeRequest && eap.Type == layers.EAPTypeOTP {
+		senddata := getMD5Data(eap)
+		senddata = append([]byte{0x10}, senddata...)
+		sendeap := createEAPChallengeResponse(eap.Id, senddata)
+		pkt := createEAPOLPkt(sendeap, onuId, ponPortId)
+
+		msg := bbsim.ByteMsg{
+			IntfId: ponPortId,
+			OnuId:  onuId,
+			Bytes:  pkt,
+		}
+
+		sendEapolPktIn(msg, stream)
+		eapolLogger.WithFields(log.Fields{
+			"OnuId":  onuId,
+			"IntfId": ponPortId,
+			"OnuSn":  serialNumber,
+		}).Infof("Sent EAPChallengeResponse packet")
+		if err := onuStateMachine.Event("eap_response_challenge_sent"); err != nil {
+			eapolLogger.WithFields(log.Fields{
+				"OnuId":  onuId,
+				"IntfId": ponPortId,
+				"OnuSn":  serialNumber,
+			}).Errorf("Error while transitioning ONU State %v", err)
+		}
+	} else if eap.Code == layers.EAPCodeSuccess && eap.Type == layers.EAPTypeNone {
+		eapolLogger.WithFields(log.Fields{
+			"OnuId":  onuId,
+			"IntfId": ponPortId,
+			"OnuSn":  serialNumber,
+		}).Infof("Received EAPSuccess packet")
+		if err := onuStateMachine.Event("eap_response_success_received"); err != nil {
+			eapolLogger.WithFields(log.Fields{
+				"OnuId":  onuId,
+				"IntfId": ponPortId,
+				"OnuSn":  serialNumber,
+			}).Errorf("Error while transitioning ONU State %v", err)
+		}
+		eapolLogger.WithFields(log.Fields{
+			"OnuId":  onuId,
+			"IntfId": ponPortId,
+			"OnuSn":  serialNumber,
+		}).Infof("EAPOL State machine completed")
+		return
+	}
+}
diff --git a/internal/bbsim/responders/eapol/eapol_test.go b/internal/bbsim/responders/eapol/eapol_test.go
index 9e37dae..a3df56f 100644
--- a/internal/bbsim/responders/eapol/eapol_test.go
+++ b/internal/bbsim/responders/eapol/eapol_test.go
@@ -17,97 +17,169 @@
 package eapol
 
 import (
+	"errors"
 	"github.com/looplab/fsm"
-	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
 	"github.com/opencord/voltha-protos/go/openolt"
 	"google.golang.org/grpc"
 	"gotest.tools/assert"
-	"os"
-	"sync"
 	"testing"
-	"time"
 )
 
-var (
-	originalSendEapStart func(onuId uint32, ponPortId uint32, serialNumber string, stream openolt.Openolt_EnableIndicationServer) error
+// MOCKS
+var calledSend = 0
+
+var eapolStateMachine = fsm.NewFSM(
+	"auth_started",
+	fsm.Events{
+		{Name: "eap_start_sent", Src: []string{"auth_started"}, Dst: "eap_start_sent"},
+		{Name: "eap_response_identity_sent", Src: []string{"eap_start_sent"}, Dst: "eap_response_identity_sent"},
+		{Name: "eap_response_challenge_sent", Src: []string{"eap_response_identity_sent"}, Dst: "eap_response_challenge_sent"},
+		{Name: "eap_response_success_received", Src: []string{"eap_response_challenge_sent"}, Dst: "eap_response_success_received"},
+		{Name: "auth_failed", Src: []string{"auth_started", "eap_start_sent", "eap_response_identity_sent", "eap_response_challenge_sent"}, Dst: "auth_failed"},
+	},
+	fsm.Callbacks{},
 )
 
-type fakeStream struct {
-	calledSend int
+type mockStreamSuccess struct {
 	grpc.ServerStream
 }
 
-func (s fakeStream) Send(flow *openolt.Indication) error {
-	s.calledSend++
+func (s mockStreamSuccess) Send(ind *openolt.Indication) error {
+	calledSend++
 	return nil
 }
 
-func setUp() {
-	originalSendEapStart = sendEapStart
+type mockStreamError struct {
+	grpc.ServerStream
 }
 
-func tearDown() {
-	sendEapStart = originalSendEapStart
+func (s mockStreamError) Send(ind *openolt.Indication) error {
+	calledSend++
+	return errors.New("stream-error")
 }
 
-func TestMain(m *testing.M) {
-	setUp()
-	code := m.Run()
-	tearDown()
-	os.Exit(code)
-}
+// TESTS
 
-func TestCreateWPASupplicant(t *testing.T) {
+func TestSendEapStartSuccess(t *testing.T) {
+	calledSend = 0
+	eapolStateMachine.SetState("auth_started")
 
-	// mocks
-	mockSendEapStartCalled := 0
-	mockSendEapStartArgs := struct {
-		onuId        uint32
-		ponPortId    uint32
-		serialNumber *openolt.SerialNumber
-		stream       openolt.Openolt_EnableIndicationServer
-	}{}
-	mockSendEapStart := func(onuId uint32, ponPortId uint32, serialNumber string, stream openolt.Openolt_EnableIndicationServer) error {
-		mockSendEapStartCalled++
-		mockSendEapStartArgs.onuId = onuId
-		mockSendEapStartArgs.ponPortId = ponPortId
-		return nil
+	// Save current function and restore at the end:
+	old := GetGemPortId
+	defer func() { GetGemPortId = old }()
+
+	GetGemPortId = func(intfId uint32, onuId uint32) (uint16, error) {
+		return 1, nil
 	}
-	sendEapStart = mockSendEapStart
 
 	// params for the function under test
 	var onuId uint32 = 1
 	var ponPortId uint32 = 0
 	var serialNumber string = "BBSM00000001"
 
-	eapolStateMachine := fsm.NewFSM(
-		"auth_started",
-		fsm.Events{
-			{Name: "eap_start_sent", Src: []string{"auth_started"}, Dst: "eap_start_sent"},
-			{Name: "eap_response_identity_sent", Src: []string{"eap_start_sent"}, Dst: "eap_response_identity_sent"},
-			{Name: "eap_response_challenge_sent", Src: []string{"eap_response_identity_sent"}, Dst: "eap_response_challenge_sent"},
-			{Name: "eap_response_success_received", Src: []string{"eap_response_challenge_sent"}, Dst: "eap_response_success_received"},
-		},
-		fsm.Callbacks{},
-	)
+	stream := mockStreamSuccess{}
 
-	pktOutCh := make(chan *bbsim.ByteMsg, 1024)
+	if err := SendEapStart(onuId, ponPortId, serialNumber, eapolStateMachine, stream); err != nil {
+		t.Errorf("SendEapStart returned an error: %v", err)
+		t.Fail()
+	}
 
-	stream := fakeStream{}
+	assert.Equal(t, calledSend, 1)
 
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	assert.Equal(t, eapolStateMachine.Current(), "eap_start_sent")
 
-	go CreateWPASupplicant(onuId, ponPortId, serialNumber, eapolStateMachine, stream, pktOutCh)
-	go func() {
-		time.Sleep(1 * time.Second)
-		close(pktOutCh)
-		wg.Done()
-	}()
+}
 
-	wg.Wait()
+func TestSendEapStartFailNoGemPort(t *testing.T) {
+	calledSend = 0
+	eapolStateMachine.SetState("auth_started")
 
-	assert.Equal(t, mockSendEapStartCalled, 1)
-	assert.Equal(t, mockSendEapStartArgs.onuId, onuId)
-	assert.Equal(t, mockSendEapStartArgs.ponPortId, ponPortId)
+	// Save current function and restore at the end:
+	old := GetGemPortId
+	defer func() { GetGemPortId = old }()
+
+	GetGemPortId = func(intfId uint32, onuId uint32) (uint16, error) {
+		return 0, errors.New("no-gem-port")
+	}
+
+	// params for the function under test
+	var onuId uint32 = 1
+	var ponPortId uint32 = 0
+	var serialNumber string = "BBSM00000001"
+
+	stream := mockStreamSuccess{}
+
+	err := SendEapStart(onuId, ponPortId, serialNumber, eapolStateMachine, stream)
+	if err == nil {
+		t.Errorf("SendEapStart did not return an error")
+		t.Fail()
+	}
+
+	assert.Equal(t, err.Error(), "no-gem-port")
+
+	assert.Equal(t, eapolStateMachine.Current(), "auth_failed")
+}
+
+func TestSendEapStartFailStreamError(t *testing.T) {
+	calledSend = 0
+	eapolStateMachine.SetState("auth_started")
+
+	// Save current function and restore at the end:
+	old := GetGemPortId
+	defer func() { GetGemPortId = old }()
+
+	GetGemPortId = func(intfId uint32, onuId uint32) (uint16, error) {
+		return 1, nil
+	}
+
+	// params for the function under test
+	var onuId uint32 = 1
+	var ponPortId uint32 = 0
+	var serialNumber string = "BBSM00000001"
+
+	stream := mockStreamError{}
+
+	err := SendEapStart(onuId, ponPortId, serialNumber, eapolStateMachine, stream)
+	if err == nil {
+		t.Errorf("SendEapStart did not return an error")
+		t.Fail()
+	}
+
+	assert.Equal(t, err.Error(), "stream-error")
+
+	assert.Equal(t, eapolStateMachine.Current(), "auth_failed")
+}
+
+// TODO test eapol.HandleNextPacket
+
+func TestUpdateAuthFailed(t *testing.T) {
+
+	var onuId uint32 = 1
+	var ponPortId uint32 = 0
+	var serialNumber string = "BBSM00000001"
+
+	eapolStateMachine.SetState("auth_started")
+	updateAuthFailed(onuId, ponPortId, serialNumber, eapolStateMachine)
+	assert.Equal(t, eapolStateMachine.Current(), "auth_failed")
+
+	eapolStateMachine.SetState("eap_start_sent")
+	updateAuthFailed(onuId, ponPortId, serialNumber, eapolStateMachine)
+	assert.Equal(t, eapolStateMachine.Current(), "auth_failed")
+
+	eapolStateMachine.SetState("eap_response_identity_sent")
+	updateAuthFailed(onuId, ponPortId, serialNumber, eapolStateMachine)
+	assert.Equal(t, eapolStateMachine.Current(), "auth_failed")
+
+	eapolStateMachine.SetState("eap_response_challenge_sent")
+	updateAuthFailed(onuId, ponPortId, serialNumber, eapolStateMachine)
+	assert.Equal(t, eapolStateMachine.Current(), "auth_failed")
+
+	eapolStateMachine.SetState("eap_response_success_received")
+	err := updateAuthFailed(onuId, ponPortId, serialNumber, eapolStateMachine)
+	if err == nil {
+		t.Errorf("updateAuthFailed did not return an error")
+		t.Fail()
+	}
+	assert.Equal(t, err.Error(), "event auth_failed inappropriate in current state eap_response_success_received")
+
 }
diff --git a/internal/bbsim/types/types.go b/internal/bbsim/types/types.go
index 731ca58..83f44fb 100644
--- a/internal/bbsim/types/types.go
+++ b/internal/bbsim/types/types.go
@@ -18,6 +18,7 @@
 
 import "github.com/google/gopacket"
 
+// deprecated
 type ByteMsg struct {
 	IntfId uint32
 	OnuId  uint32