[VOL-3279] Resetting the GemPort when all the flows in an ONU are removed

Change-Id: Icd01703900cd14e0e3baf2abb4669524a47dbeeb
diff --git a/VERSION b/VERSION
index abd4105..3a4036f 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.2.4
+0.2.5
diff --git a/internal/bbsim/devices/messageTypes.go b/internal/bbsim/devices/messageTypes.go
index 8bb1d25..5652520 100644
--- a/internal/bbsim/devices/messageTypes.go
+++ b/internal/bbsim/devices/messageTypes.go
@@ -31,7 +31,8 @@
 	OnuDiscIndication MessageType = 3
 	OnuIndication     MessageType = 4
 	OMCI              MessageType = 5
-	FlowUpdate        MessageType = 6
+	FlowAdd           MessageType = 6
+	FlowRemoved       MessageType = 18
 	StartEAPOL        MessageType = 7
 	StartDHCP         MessageType = 8
 	OnuPacketOut      MessageType = 9
@@ -58,7 +59,7 @@
 		"OnuDiscIndication",
 		"OnuIndication",
 		"OMCI",
-		"FlowUpdate",
+		"FlowAdd",
 		"StartEAPOL",
 		"StartDHCP",
 		"OnuPacketOut",
@@ -69,6 +70,7 @@
 		"IGMPMembershipReportV2",
 		"IGMPLeaveGroup",
 		"IGMPMembershipReportV3",
+		"FlowRemoved",
 	}
 	return names[m]
 }
@@ -138,7 +140,7 @@
 
 const (
 	UP   OperState = iota
-	DOWN           // The device has been discovered, but not yet activated
+	DOWN  // The device has been discovered, but not yet activated
 )
 
 func (m OperState) String() string {
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index a2e3229..700fdeb 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -162,7 +162,7 @@
 			// create ONU devices
 			for j := 0; j < olt.NumOnuPerPon; j++ {
 				delay := time.Duration(olt.Delay*j) * time.Millisecond
-				o := CreateONU(&olt, *p, uint32(j+1), options.BBSim.STag, availableCTag, options.BBSim.EnableAuth, options.BBSim.EnableDhcp, delay, isMock)
+				o := CreateONU(&olt, p, uint32(j+1), options.BBSim.STag, availableCTag, options.BBSim.EnableAuth, options.BBSim.EnableDhcp, delay, isMock)
 				p.Onus = append(p.Onus, o)
 				availableCTag = availableCTag + 1
 			}
@@ -178,7 +178,7 @@
 			// create ONU devices
 			for j := 0; j < olt.NumOnuPerPon; j++ {
 				delay := time.Duration(olt.Delay*j) * time.Millisecond
-				o := CreateONU(&olt, *p, uint32(j+1), availableSTag, options.BBSim.CTag, options.BBSim.EnableAuth, options.BBSim.EnableDhcp, delay, isMock)
+				o := CreateONU(&olt, p, uint32(j+1), availableSTag, options.BBSim.CTag, options.BBSim.EnableAuth, options.BBSim.EnableDhcp, delay, isMock)
 				p.Onus = append(p.Onus, o)
 				availableSTag = availableSTag + 1
 			}
@@ -1046,11 +1046,11 @@
 	if flow.AccessIntfId == -1 {
 		oltLogger.WithFields(log.Fields{
 			"FlowId": flow.FlowId,
-		}).Debugf("This is an OLT flow")
+		}).Debug("Adding OLT flow")
 	} else if flow.FlowType == "multicast" {
 		oltLogger.WithFields(log.Fields{
 			"FlowId": flow.FlowId,
-		}).Debugf("This is a multicast flow")
+		}).Debug("Adding OLT multicast flow")
 	} else {
 		pon, err := o.GetPonById(uint32(flow.AccessIntfId))
 		if err != nil {
@@ -1078,7 +1078,7 @@
 		}
 
 		msg := Message{
-			Type: FlowUpdate,
+			Type: FlowAdd,
 			Data: OnuFlowUpdateMessage{
 				PonPortID: pon.ID,
 				OnuID:     onu.ID,
@@ -1093,10 +1093,11 @@
 
 // FlowRemove request from VOLTHA
 func (o OltDevice) FlowRemove(_ context.Context, flow *openolt.Flow) (*openolt.Empty, error) {
+
 	oltLogger.WithFields(log.Fields{
-		"FlowId":   flow.FlowId,
-		"FlowType": flow.FlowType,
-	}).Tracef("OLT receives FlowRemove")
+		"FlowId":           flow.FlowId,
+		"FlowType":         flow.FlowType,
+	}).Debug("OLT receives FlowRemove")
 
 	if !o.enablePerf { // remove only if flow were stored
 		flowKey := FlowKey{
@@ -1130,6 +1131,36 @@
 		// delete from olt flows
 		delete(o.Flows, flowKey)
 	}
+
+	if flow.AccessIntfId == -1 {
+		oltLogger.WithFields(log.Fields{
+			"FlowId": flow.FlowId,
+		}).Debug("Removing OLT flow")
+	} else if flow.FlowType == "multicast" {
+		oltLogger.WithFields(log.Fields{
+			"FlowId": flow.FlowId,
+		}).Debug("Removing OLT multicast flow")
+	} else {
+
+		onu, err := o.GetOnuByFlowId(flow.FlowId)
+		if err != nil {
+			oltLogger.WithFields(log.Fields{
+				"OnuId":  flow.OnuId,
+				"IntfId": flow.AccessIntfId,
+				"err":    err,
+			}).Error("Can't find Onu")
+			return nil, err
+		}
+
+		msg := Message{
+			Type: FlowRemoved,
+			Data: OnuFlowUpdateMessage{
+				Flow:      flow,
+			},
+		}
+		onu.Channel <- msg
+	}
+
 	return new(openolt.Empty), nil
 }
 
@@ -1141,6 +1172,19 @@
 	return &res, nil
 }
 
+func (o *OltDevice) GetOnuByFlowId(flowId uint32) (*Onu, error) {
+	for _, pon := range o.Pons {
+		for _, onu := range pon.Onus {
+			for _, fId := range onu.FlowIds {
+				if fId == flowId {
+					return onu, nil
+				}
+			}
+		}
+	}
+	return nil, errors.New(fmt.Sprintf("Cannot find Onu by flowId %d", flowId))
+}
+
 func (o OltDevice) GetDeviceInfo(context.Context, *openolt.Empty) (*openolt.DeviceInfo, error) {
 
 	oltLogger.WithFields(log.Fields{
diff --git a/internal/bbsim/devices/olt_test.go b/internal/bbsim/devices/olt_test.go
index bcb6f77..747ddf8 100644
--- a/internal/bbsim/devices/olt_test.go
+++ b/internal/bbsim/devices/olt_test.go
@@ -17,6 +17,7 @@
 package devices
 
 import (
+	"github.com/opencord/voltha-protos/v2/go/openolt"
 	"gotest.tools/assert"
 	"net"
 	"testing"
@@ -36,7 +37,7 @@
 			onuId := uint32(i + j)
 			onu := Onu{
 				ID:        onuId,
-				PonPort:   pon,
+				PonPort:   &pon,
 				PonPortID: pon.ID,
 				HwAddress: net.HardwareAddr{0x2e, 0x60, 0x70, 0x13, byte(pon.ID), byte(onuId)},
 			}
@@ -105,3 +106,42 @@
 
 	assert.Equal(t, err.Error(), "cannot-find-onu-by-mac-address-2e:60:70:13:03:03")
 }
+
+func Test_Olt_GetOnuByFlowId(t *testing.T) {
+	numPon := 4
+	numOnu := 4
+
+	olt := createMockOlt(numPon, numOnu)
+
+	// Add the flows to onus (to be found)
+	onu1, _ := olt.FindOnuBySn("BBSM00000303")
+	flow1 := openolt.Flow{
+		FlowId: 64,
+		Classifier: &openolt.Classifier{},
+	}
+	msg1 := OnuFlowUpdateMessage{
+		OnuID:     onu1.ID,
+		PonPortID: onu1.PonPortID,
+		Flow:      &flow1,
+	}
+	onu1.handleFlowAdd(msg1)
+
+	onu2, _ := olt.FindOnuBySn("BBSM00000103")
+	flow2 := openolt.Flow{
+		FlowId: 72,
+		Classifier: &openolt.Classifier{},
+	}
+	msg2 := OnuFlowUpdateMessage{
+		OnuID:     onu2.ID,
+		PonPortID: onu2.PonPortID,
+		Flow:      &flow2,
+	}
+	onu2.handleFlowAdd(msg2)
+
+
+
+	found, err := olt.GetOnuByFlowId(flow1.FlowId)
+
+	assert.Equal(t, err, nil)
+	assert.Equal(t, found.Sn(), onu1.Sn())
+}
\ No newline at end of file
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index 78569f5..1acb063 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -26,7 +26,7 @@
 
 	"github.com/cboling/omci"
 	"github.com/google/gopacket/layers"
-	backoff "github.com/jpillora/backoff"
+	"github.com/jpillora/backoff"
 	"github.com/looplab/fsm"
 	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
 	"github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
@@ -52,7 +52,7 @@
 type Onu struct {
 	ID                  uint32
 	PonPortID           uint32
-	PonPort             PonPort
+	PonPort             *PonPort
 	STag                int
 	CTag                int
 	Auth                bool // automatically start EAPOL if set to true
@@ -71,6 +71,7 @@
 	EapolFlowReceived bool
 	DhcpFlowReceived  bool
 	Flows             []FlowKey
+	FlowIds           []uint32 // keep track of the flows we currently have in the ONU
 
 	OperState    *fsm.FSM
 	SerialNumber *openolt.SerialNumber
@@ -79,10 +80,9 @@
 	GemPortChannels []chan bool  // this channels are used to notify everyone that is interested that a GemPort has been added
 
 	// OMCI params
-	tid        uint16
-	hpTid      uint16
-	seqNumber  uint16
-	HasGemPort bool
+	tid       uint16
+	hpTid     uint16
+	seqNumber uint16
 
 	DoneChannel       chan bool // this channel is used to signal once the onu is complete (when the struct is used by BBR)
 	TrafficSchedulers *tech_profile.TrafficSchedulers
@@ -98,10 +98,10 @@
 	return listener
 }
 
-func CreateONU(olt *OltDevice, pon PonPort, id uint32, sTag int, cTag int, auth bool, dhcp bool, delay time.Duration, isMock bool) *Onu {
+func CreateONU(olt *OltDevice, pon *PonPort, id uint32, sTag int, cTag int, auth bool, dhcp bool, delay time.Duration, isMock bool) *Onu {
 	b := &backoff.Backoff{
 		//These are the defaults
-		Min:    1 * time.Second,
+		Min:    5 * time.Second,
 		Max:    35 * time.Second,
 		Factor: 1.5,
 		Jitter: false,
@@ -401,9 +401,12 @@
 			case OMCI:
 				msg, _ := message.Data.(OmciMessage)
 				o.handleOmciMessage(msg, stream)
-			case FlowUpdate:
+			case FlowAdd:
 				msg, _ := message.Data.(OnuFlowUpdateMessage)
-				o.handleFlowUpdate(msg)
+				o.handleFlowAdd(msg)
+			case FlowRemoved:
+				msg, _ := message.Data.(OnuFlowUpdateMessage)
+				o.handleFlowRemove(msg)
 			case StartEAPOL:
 				o.handleEAPOLStart(stream)
 			case StartDHCP:
@@ -751,7 +754,7 @@
 	o.ID = id
 }
 
-func (o *Onu) handleFlowUpdate(msg OnuFlowUpdateMessage) {
+func (o *Onu) handleFlowAdd(msg OnuFlowUpdateMessage) {
 	onuLogger.WithFields(log.Fields{
 		"DstPort":          msg.Flow.Classifier.DstPort,
 		"EthType":          fmt.Sprintf("%x", msg.Flow.Classifier.EthType),
@@ -768,7 +771,7 @@
 		"SrcPort":          msg.Flow.Classifier.SrcPort,
 		"UniID":            msg.Flow.UniId,
 		"ClassifierOPbits": msg.Flow.Classifier.OPbits,
-	}).Debug("ONU receives Flow")
+	}).Debug("ONU receives FlowAdd")
 
 	if msg.Flow.UniId != 0 {
 		// as of now BBSim only support a single UNI, so ignore everything that is not targeted to it
@@ -780,6 +783,8 @@
 		return
 	}
 
+	o.FlowIds = append(o.FlowIds, msg.Flow.FlowId)
+
 	if msg.Flow.Classifier.EthType == uint32(layers.EthernetTypeEAPOL) && msg.Flow.Classifier.OVid == 4091 {
 		// NOTE storing the PortNO, it's needed when sending PacketIndications
 		o.storePortNumber(uint32(msg.Flow.PortNo))
@@ -860,6 +865,38 @@
 	}
 }
 
+func (o *Onu) handleFlowRemove(msg OnuFlowUpdateMessage) {
+	onuLogger.WithFields(log.Fields{
+		"IntfId":       o.PonPortID,
+		"OnuId":        o.ID,
+		"SerialNumber": o.Sn(),
+		"FlowId":       msg.Flow.FlowId,
+		"FlowType":     msg.Flow.FlowType,
+	}).Debug("ONU receives FlowRemove")
+
+	for idx, flow := range o.FlowIds {
+		// If the gemport is found, delete it from local cache.
+		if flow == msg.Flow.FlowId {
+			o.FlowIds = append(o.FlowIds[:idx], o.FlowIds[idx+1:]...)
+			break
+		}
+	}
+
+	if len(o.FlowIds) == 0 {
+		onuLogger.WithFields(log.Fields{
+			"IntfId":       o.PonPortID,
+			"OnuId":        o.ID,
+			"SerialNumber": o.Sn(),
+		}).Info("Resetting GemPort")
+		o.GemPortAdded = false
+
+		// TODO ideally we should keep track of the flow type (and not only the ID)
+		// so that we can properly set these two flag when the flow is removed
+		o.EapolFlowReceived = false
+		o.DhcpFlowReceived = false
+	}
+}
+
 // HexDecode converts the hex encoding to binary
 func HexDecode(pkt []byte) []byte {
 	p := make([]byte, len(pkt)/2)
@@ -964,12 +1001,12 @@
 		// In the same way we can create a GemPort even without setting up UNIs/TConts/...
 		// but we need the GemPort to trigger the state change
 
-		if !o.HasGemPort {
+		if !o.GemPortAdded {
 			// NOTE this sends a CreateRequestType and BBSim replies with a CreateResponseType
 			// thus we send this request only once
 			gemReq, _ := omcilib.CreateGemPortRequest(o.getNextTid(false))
 			sendOmciMsg(gemReq, o.PonPortID, o.ID, o.SerialNumber, "CreateGemPortRequest", client)
-			o.HasGemPort = true
+			o.GemPortAdded = true
 		} else {
 			if err := o.InternalState.Event("send_eapol_flow"); err != nil {
 				onuLogger.WithFields(log.Fields{
diff --git a/internal/bbsim/devices/onu_flow_test.go b/internal/bbsim/devices/onu_flow_test.go
index 9ab8abb..b01859e 100644
--- a/internal/bbsim/devices/onu_flow_test.go
+++ b/internal/bbsim/devices/onu_flow_test.go
@@ -47,10 +47,28 @@
 	assert.Equal(t, client.FlowAddSpy.Calls[1].PortNo, onu.ID)
 }
 
+// checks that the FlowId is added to the list
+func Test_HandleFlowAddFlowId(t *testing.T) {
+	onu := createMockOnu(1, 1, 900, 900, true, false)
+
+	flow := openolt.Flow{
+		FlowId: 64,
+		Classifier: &openolt.Classifier{},
+	}
+	msg := OnuFlowUpdateMessage{
+		OnuID:     onu.ID,
+		PonPortID: onu.PonPortID,
+		Flow:      &flow,
+	}
+	onu.handleFlowAdd(msg)
+	assert.Equal(t, len(onu.FlowIds), 1)
+	assert.Equal(t, onu.FlowIds[0], uint32(64))
+}
+
 // validates that when an ONU receives an EAPOL flow for UNI 0
 // and the GemPort has already been configured
 // it transition to auth_started state
-func Test_HandleFlowUpdateEapolWithGem(t *testing.T) {
+func Test_HandleFlowAddEapolWithGem(t *testing.T) {
 
 	onu := createMockOnu(1, 1, 900, 900, true, false)
 
@@ -85,13 +103,13 @@
 		Flow:      &flow,
 	}
 
-	onu.handleFlowUpdate(msg)
+	onu.handleFlowAdd(msg)
 	assert.Equal(t, onu.InternalState.Current(), "auth_started")
 }
 
 // validates that when an ONU receives an EAPOL flow for UNI that is not 0
 // no action is taken (this is independent of GemPort status
-func Test_HandleFlowUpdateEapolWrongUNI(t *testing.T) {
+func Test_HandleFlowAddEapolWrongUNI(t *testing.T) {
 
 	onu := createMockOnu(1, 1, 900, 900, true, false)
 
@@ -126,14 +144,14 @@
 		Flow:      &flow,
 	}
 
-	onu.handleFlowUpdate(msg)
+	onu.handleFlowAdd(msg)
 	assert.Equal(t, onu.InternalState.Current(), "enabled")
 }
 
 // validates that when an ONU receives an EAPOL flow for UNI 0
 // and the GemPort has not yet been configured
 // it transition to auth_started state
-func Test_HandleFlowUpdateEapolWithoutGem(t *testing.T) {
+func Test_HandleFlowAddEapolWithoutGem(t *testing.T) {
 
 	onu := createMockOnu(1, 1, 900, 900, true, false)
 	onu.GemPortAdded = false
@@ -169,7 +187,7 @@
 		Flow:      &flow,
 	}
 
-	onu.handleFlowUpdate(msg)
+	onu.handleFlowAdd(msg)
 
 	wg := sync.WaitGroup{}
 	wg.Add(1)
@@ -191,7 +209,7 @@
 
 // validates that when an ONU receives an EAPOL flow for UNI 0
 // but the noAuth bit is set no action is taken
-func Test_HandleFlowUpdateEapolNoAuth(t *testing.T) {
+func Test_HandleFlowAddEapolNoAuth(t *testing.T) {
 	onu := createMockOnu(1, 1, 900, 900, false, false)
 
 	onu.InternalState = fsm.NewFSM(
@@ -225,14 +243,14 @@
 		Flow:      &flow,
 	}
 
-	onu.handleFlowUpdate(msg)
+	onu.handleFlowAdd(msg)
 	assert.Equal(t, onu.InternalState.Current(), "enabled")
 }
 
 // validates that when an ONU receives a DHCP flow for UNI 0 and pbit 0
 // and the GemPort has already been configured
 // it transition to dhcp_started state
-func Test_HandleFlowUpdateDhcp(t *testing.T) {
+func Test_HandleFlowAddDhcp(t *testing.T) {
 	onu := createMockOnu(1, 1, 900, 900, false, true)
 
 	onu.InternalState = fsm.NewFSM(
@@ -268,7 +286,7 @@
 		Flow:      &flow,
 	}
 
-	onu.handleFlowUpdate(msg)
+	onu.handleFlowAdd(msg)
 	assert.Equal(t, onu.InternalState.Current(), "dhcp_started")
 	assert.Equal(t, onu.DhcpFlowReceived, true)
 }
@@ -276,7 +294,7 @@
 // validates that when an ONU receives a DHCP flow for UNI 0 and pbit 255
 // and the GemPort has already been configured
 // it transition to dhcp_started state
-func Test_HandleFlowUpdateDhcpPBit255(t *testing.T) {
+func Test_HandleFlowAddDhcpPBit255(t *testing.T) {
 	onu := createMockOnu(1, 1, 900, 900, false, true)
 
 	onu.InternalState = fsm.NewFSM(
@@ -312,7 +330,7 @@
 		Flow:      &flow,
 	}
 
-	onu.handleFlowUpdate(msg)
+	onu.handleFlowAdd(msg)
 	assert.Equal(t, onu.InternalState.Current(), "dhcp_started")
 	assert.Equal(t, onu.DhcpFlowReceived, true)
 }
@@ -320,7 +338,7 @@
 // validates that when an ONU receives a DHCP flow for UNI 0 and pbit not 0 or 255
 // and the GemPort has already been configured
 // it ignores the message
-func Test_HandleFlowUpdateDhcpIgnoreByPbit(t *testing.T) {
+func Test_HandleFlowAddDhcpIgnoreByPbit(t *testing.T) {
 	onu := createMockOnu(1, 1, 900, 900, false, true)
 
 	onu.InternalState = fsm.NewFSM(
@@ -356,14 +374,14 @@
 		Flow:      &flow,
 	}
 
-	onu.handleFlowUpdate(msg)
+	onu.handleFlowAdd(msg)
 	assert.Equal(t, onu.InternalState.Current(), "eap_response_success_received")
 	assert.Equal(t, onu.DhcpFlowReceived, false)
 }
 
 // validates that when an ONU receives a DHCP flow for UNI 0
 // but the noDchp bit is set no action is taken
-func Test_HandleFlowUpdateDhcpNoDhcp(t *testing.T) {
+func Test_HandleFlowAddDhcpNoDhcp(t *testing.T) {
 	onu := createMockOnu(1, 1, 900, 900, false, false)
 
 	onu.InternalState = fsm.NewFSM(
@@ -398,7 +416,7 @@
 		Flow:      &flow,
 	}
 
-	onu.handleFlowUpdate(msg)
+	onu.handleFlowAdd(msg)
 	assert.Equal(t, onu.InternalState.Current(), "eap_response_success_received")
 	assert.Equal(t, onu.DhcpFlowReceived, false)
 }
@@ -406,9 +424,9 @@
 // validates that when an ONU receives a DHCP flow for UNI 0 and pbit not 0 or 255
 // and the GemPort has not already been configured
 // it transition to dhcp_started state
-func Test_HandleFlowUpdateDhcpWithoutGem(t *testing.T) {
-	// NOTE that this feature is required when we do DHCP with no eapol
-	// as the DHCP flow will be the first one received
+func Test_HandleFlowAddDhcpWithoutGem(t *testing.T) {
+	// NOTE that this feature is required as there is no guarantee that the gemport is the same
+	// one we received with the EAPOL flow
 	onu := createMockOnu(1, 1, 900, 900, false, true)
 
 	onu.GemPortAdded = false
@@ -446,7 +464,7 @@
 		Flow:      &flow,
 	}
 
-	onu.handleFlowUpdate(msg)
+	onu.handleFlowAdd(msg)
 
 	wg := sync.WaitGroup{}
 	wg.Add(1)
@@ -465,3 +483,51 @@
 	}(&wg)
 	wg.Wait()
 }
+
+// checks that we only remove the correct flow
+func Test_HandleFlowRemoveFlowId(t *testing.T) {
+	onu := createMockOnu(1, 1, 900, 900, true, false)
+
+	onu.FlowIds = []uint32{1, 2, 34, 64, 92}
+
+	flow := openolt.Flow{
+		FlowId: 64,
+		Classifier: &openolt.Classifier{},
+	}
+	msg := OnuFlowUpdateMessage{
+		OnuID:     onu.ID,
+		PonPortID: onu.PonPortID,
+		Flow:      &flow,
+	}
+	onu.handleFlowRemove(msg)
+	assert.Equal(t, len(onu.FlowIds), 4)
+	assert.Equal(t, onu.FlowIds[0], uint32(1))
+	assert.Equal(t, onu.FlowIds[1], uint32(2))
+	assert.Equal(t, onu.FlowIds[2], uint32(34))
+	assert.Equal(t, onu.FlowIds[3], uint32(92))
+}
+
+// checks that when the last flow is removed we reset the stored flags in the ONU
+func Test_HandleFlowRemoveFlowId_LastFlow(t *testing.T) {
+	onu := createMockOnu(1, 1, 900, 900, true, false)
+	onu.GemPortAdded = true
+	onu.DhcpFlowReceived = true
+	onu.EapolFlowReceived = true
+
+	onu.FlowIds = []uint32{64}
+
+	flow := openolt.Flow{
+		FlowId: 64,
+		Classifier: &openolt.Classifier{},
+	}
+	msg := OnuFlowUpdateMessage{
+		OnuID:     onu.ID,
+		PonPortID: onu.PonPortID,
+		Flow:      &flow,
+	}
+	onu.handleFlowRemove(msg)
+	assert.Equal(t, len(onu.FlowIds), 0)
+	assert.Equal(t, onu.GemPortAdded, false)
+	assert.Equal(t, onu.DhcpFlowReceived, false)
+	assert.Equal(t, onu.EapolFlowReceived, false)
+}
\ No newline at end of file
diff --git a/internal/bbsim/devices/onu_test.go b/internal/bbsim/devices/onu_test.go
index 35b75e8..a3f1276 100644
--- a/internal/bbsim/devices/onu_test.go
+++ b/internal/bbsim/devices/onu_test.go
@@ -32,7 +32,7 @@
 		Olt: &olt,
 	}
 
-	onu := CreateONU(&olt, pon, 1, 900, 900, true, false, 0, false)
+	onu := CreateONU(&olt, &pon, 1, 900, 900, true, false, 0, false)
 
 	assert.Equal(t, onu.Sn(), "BBSM00000101")
 	assert.Equal(t, onu.STag, 900)
diff --git a/internal/bbsim/devices/onu_test_helpers.go b/internal/bbsim/devices/onu_test_helpers.go
index 5a991d6..b35dff0 100644
--- a/internal/bbsim/devices/onu_test_helpers.go
+++ b/internal/bbsim/devices/onu_test_helpers.go
@@ -131,7 +131,7 @@
 		ID:  1,
 		Olt: &olt,
 	}
-	onu := CreateONU(&olt, pon, 1, 900, 900, false, false, time.Duration(1*time.Millisecond), true)
+	onu := CreateONU(&olt, &pon, 1, 900, 900, false, false, time.Duration(1*time.Millisecond), true)
 	// NOTE we need this in order to create the OnuChannel
 	onu.InternalState.Event("initialize")
 	onu.DiscoveryRetryDelay = 100 * time.Millisecond
diff --git a/internal/bbsim/responders/dhcp/dhcp.go b/internal/bbsim/responders/dhcp/dhcp.go
index 628084b..1ce71d6 100644
--- a/internal/bbsim/responders/dhcp/dhcp.go
+++ b/internal/bbsim/responders/dhcp/dhcp.go
@@ -251,6 +251,14 @@
 		}).Errorf("Can't retrieve GemPortId: %s", err)
 		return err
 	}
+
+	log.WithFields(log.Fields{
+		"OnuId":  msg.OnuId,
+		"IntfId": msg.IntfId,
+		"GemPort": gemid,
+		"Type": "DHCP",
+	}).Trace("sending-pkt")
+
 	data := &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{
 		IntfType:  "pon",
 		IntfId:    msg.IntfId,
diff --git a/internal/bbsim/responders/eapol/eapol.go b/internal/bbsim/responders/eapol/eapol.go
index e40e3cd..7ee36fc 100644
--- a/internal/bbsim/responders/eapol/eapol.go
+++ b/internal/bbsim/responders/eapol/eapol.go
@@ -47,6 +47,14 @@
 		}).Errorf("Can't retrieve GemPortId: %s", err)
 		return
 	}
+
+	log.WithFields(log.Fields{
+		"OnuId":  msg.OnuId,
+		"IntfId": msg.IntfId,
+		"GemPort": gemid,
+		"Type": "EAPOL",
+	}).Trace("sending-pkt")
+
 	data := &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{
 		IntfType:  "pon",
 		IntfId:    msg.IntfId,