[VOL-2778] Handling IGMP

Change-Id: I54c26bc438a144c7d4c64a9b3d543461fc743ab2
diff --git a/internal/bbsim/alarmsim/alarmsim.go b/internal/bbsim/alarmsim/alarmsim.go
index d7c0b54..838e15a 100644
--- a/internal/bbsim/alarmsim/alarmsim.go
+++ b/internal/bbsim/alarmsim/alarmsim.go
@@ -25,7 +25,7 @@
 
 	"github.com/opencord/bbsim/api/bbsim"
 	"github.com/opencord/bbsim/internal/bbsim/devices"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -160,9 +160,14 @@
 	case bbsim.AlarmType_ONU_ITU_PON_STATS:
 		alarm = &openolt.AlarmIndication{
 			Data: &openolt.AlarmIndication_OnuItuPonStatsInd{OnuItuPonStatsInd: &openolt.OnuItuPonStatsIndication{
-				OnuId:     onu.ID,
-				IntfId:    onu.PonPortID,
-				RdiErrors: uint32(extractInt(req.Parameters, "RdiErrors", 0)),
+				OnuId:  onu.ID,
+				IntfId: onu.PonPortID,
+				Stats: &openolt.OnuItuPonStatsIndication_RdiErrorInd{
+					RdiErrorInd: &openolt.RdiErrorIndication{
+						RdiErrorCount: uint64(extractInt(req.Parameters, "RdiErrors", 0)),
+						Status:        req.Status,
+					},
+				},
 			}},
 		}
 	case bbsim.AlarmType_ONU_ALARM_LOS:
diff --git a/internal/bbsim/api/onus_handler.go b/internal/bbsim/api/onus_handler.go
index 332f237..10d671d 100644
--- a/internal/bbsim/api/onus_handler.go
+++ b/internal/bbsim/api/onus_handler.go
@@ -224,6 +224,9 @@
 }
 
 func (s BBSimServer) ChangeIgmpState(ctx context.Context, req *bbsim.IgmpRequest) (*bbsim.Response, error) {
+
+	// TODO check that the ONU is enabled and the services are initialized before changing the state
+
 	res := &bbsim.Response{}
 
 	logger.WithFields(log.Fields{
@@ -250,15 +253,42 @@
 			event = "igmp_join_startv3"
 		}
 
-		if igmpErr := onu.InternalState.Event(event); igmpErr != nil {
-			logger.WithFields(log.Fields{
-				"OnuId":  onu.ID,
-				"IntfId": onu.PonPortID,
-				"OnuSn":  onu.Sn(),
-			}).Errorf("IGMP request failed: %s", igmpErr.Error())
+		errors := []string{}
+		startedOn := []string{}
+		success := true
+
+		for _, s := range onu.Services {
+			service := s.(*devices.Service)
+			if service.NeedsIgmp {
+
+				logger.WithFields(log.Fields{
+					"OnuId":   onu.ID,
+					"IntfId":  onu.PonPortID,
+					"OnuSn":   onu.Sn(),
+					"Service": service.Name,
+				}).Debugf("Sending %s event on Service %s", event, service.Name)
+
+				if err := service.IGMPState.Event(event); err != nil {
+					logger.WithFields(log.Fields{
+						"OnuId":   onu.ID,
+						"IntfId":  onu.PonPortID,
+						"OnuSn":   onu.Sn(),
+						"Service": service.Name,
+					}).Errorf("IGMP request failed: %s", err.Error())
+					errors = append(errors, fmt.Sprintf("%s: %s", service.Name, err.Error()))
+					success = false
+				}
+				startedOn = append(startedOn, service.Name)
+			}
+		}
+
+		if success {
+			res.StatusCode = int32(codes.OK)
+			res.Message = fmt.Sprintf("Authentication restarted on Services %s for ONU %s.",
+				fmt.Sprintf("%v", startedOn), onu.Sn())
+		} else {
 			res.StatusCode = int32(codes.FailedPrecondition)
-			res.Message = igmpErr.Error()
-			return res, igmpErr
+			res.Message = fmt.Sprintf("%v", errors)
 		}
 	}
 
@@ -283,6 +313,7 @@
 	}
 
 	errors := []string{}
+	startedOn := []string{}
 	success := true
 
 	for _, s := range onu.Services {
@@ -298,12 +329,14 @@
 				errors = append(errors, fmt.Sprintf("%s: %s", service.Name, err.Error()))
 				success = false
 			}
+			startedOn = append(startedOn, service.Name)
 		}
 	}
 
 	if success {
 		res.StatusCode = int32(codes.OK)
-		res.Message = fmt.Sprintf("Authentication restarted for ONU %s.", onu.Sn())
+		res.Message = fmt.Sprintf("Authentication restarted on Services %s for ONU %s.",
+			fmt.Sprintf("%v", startedOn), onu.Sn())
 	} else {
 		res.StatusCode = int32(codes.FailedPrecondition)
 		res.Message = fmt.Sprintf("%v", errors)
@@ -330,6 +363,7 @@
 	}
 
 	errors := []string{}
+	startedOn := []string{}
 	success := true
 
 	for _, s := range onu.Services {
@@ -346,12 +380,14 @@
 				errors = append(errors, fmt.Sprintf("%s: %s", service.Name, err.Error()))
 				success = false
 			}
+			startedOn = append(startedOn, service.Name)
 		}
 	}
 
 	if success {
 		res.StatusCode = int32(codes.OK)
-		res.Message = fmt.Sprintf("DHCP restarted for ONU %s.", onu.Sn())
+		res.Message = fmt.Sprintf("DHCP restarted on Services %s for ONU %s.",
+			fmt.Sprintf("%v", startedOn), onu.Sn())
 	} else {
 		res.StatusCode = int32(codes.FailedPrecondition)
 		res.Message = fmt.Sprintf("%v", errors)
diff --git a/internal/bbsim/api/services_handler.go b/internal/bbsim/api/services_handler.go
index 18bb84c..85ee8f6 100644
--- a/internal/bbsim/api/services_handler.go
+++ b/internal/bbsim/api/services_handler.go
@@ -36,6 +36,7 @@
 		GemPort:       int32(s.GemPort),
 		EapolState:    s.EapolState.Current(),
 		DhcpState:     s.DHCPState.Current(),
+		IGMPState:     s.IGMPState.Current(),
 	}
 }
 
diff --git a/internal/bbsim/devices/helpers.go b/internal/bbsim/devices/helpers.go
index e72093b..6b3d220 100644
--- a/internal/bbsim/devices/helpers.go
+++ b/internal/bbsim/devices/helpers.go
@@ -23,7 +23,7 @@
 
 	"github.com/looplab/fsm"
 	"github.com/opencord/bbsim/internal/common"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 )
 
 type mode int
diff --git a/internal/bbsim/devices/messageTypes.go b/internal/bbsim/devices/messageTypes.go
index cfe7200..b677416 100644
--- a/internal/bbsim/devices/messageTypes.go
+++ b/internal/bbsim/devices/messageTypes.go
@@ -19,7 +19,7 @@
 import (
 	"github.com/google/gopacket"
 	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	"net"
 )
 
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index a9420de..c883e95 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -20,6 +20,7 @@
 	"context"
 	"encoding/hex"
 	"fmt"
+	"github.com/opencord/voltha-protos/v3/go/ext/config"
 	"net"
 	"sync"
 	"time"
@@ -32,8 +33,9 @@
 	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
 	"github.com/opencord/bbsim/internal/common"
 	omcisim "github.com/opencord/omci-sim"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
-	"github.com/opencord/voltha-protos/v2/go/tech_profile"
+	common_protos "github.com/opencord/voltha-protos/v3/go/common"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/tech_profile"
 	log "github.com/sirupsen/logrus"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -1074,7 +1076,21 @@
 		}).Debug("Adding OLT flow")
 	} else if flow.FlowType == "multicast" {
 		oltLogger.WithFields(log.Fields{
-			"FlowId": flow.FlowId,
+			"Cookie":           flow.Cookie,
+			"DstPort":          flow.Classifier.DstPort,
+			"EthType":          fmt.Sprintf("%x", flow.Classifier.EthType),
+			"FlowId":           flow.FlowId,
+			"FlowType":         flow.FlowType,
+			"GemportId":        flow.GemportId,
+			"InnerVlan":        flow.Classifier.IVid,
+			"IntfId":           flow.AccessIntfId,
+			"IpProto":          flow.Classifier.IpProto,
+			"OnuId":            flow.OnuId,
+			"OuterVlan":        flow.Classifier.OVid,
+			"PortNo":           flow.PortNo,
+			"SrcPort":          flow.Classifier.SrcPort,
+			"UniID":            flow.UniId,
+			"ClassifierOPbits": flow.Classifier.OPbits,
 		}).Debug("Adding OLT multicast flow")
 	} else {
 		pon, err := o.GetPonById(uint32(flow.AccessIntfId))
@@ -1301,13 +1317,14 @@
 	}).Trace("Received OnuPacketOut")
 
 	rawpkt := gopacket.NewPacket(onuPkt.Pkt, layers.LayerTypeEthernet, gopacket.Default)
-	pktType, err := packetHandlers.IsEapolOrDhcp(rawpkt)
+
+	pktType, err := packetHandlers.GetPktType(rawpkt)
 	if err != nil {
 		onuLogger.WithFields(log.Fields{
 			"IntfId": onu.PonPortID,
 			"OnuId":  onu.ID,
 			"OnuSn":  onu.Sn(),
-			"Pkt":    rawpkt.Data(),
+			"Pkt":    hex.EncodeToString(rawpkt.Data()),
 		}).Error("Can't find pktType in packet, droppint it")
 		return new(openolt.Empty), nil
 	}
@@ -1473,3 +1490,39 @@
 	o.channel <- msg
 	return nil
 }
+
+func (o *OltDevice) PerformGroupOperation(ctx context.Context, group *openolt.Group) (*openolt.Empty, error) {
+	oltLogger.WithFields(log.Fields{
+		"GroupId": group.GroupId,
+		"Command": group.Command,
+		"Members": group.Members,
+		"Action":  group.Action,
+	}).Debug("received PerformGroupOperation")
+	return &openolt.Empty{}, nil
+}
+
+func (o *OltDevice) DeleteGroup(ctx context.Context, group *openolt.Group) (*openolt.Empty, error) {
+	oltLogger.WithFields(log.Fields{
+		"GroupId": group.GroupId,
+		"Command": group.Command,
+		"Members": group.Members,
+		"Action":  group.Action,
+	}).Debug("received PerformGroupOperation")
+	return &openolt.Empty{}, nil
+}
+
+func (o *OltDevice) GetExtValue(ctx context.Context, in *openolt.ValueParam) (*common_protos.ReturnValues, error) {
+	return &common_protos.ReturnValues{}, nil
+}
+
+func (o *OltDevice) OnuItuPonAlarmSet(ctx context.Context, in *config.OnuItuPonAlarm) (*openolt.Empty, error) {
+	return &openolt.Empty{}, nil
+}
+
+func (o *OltDevice) GetLogicalOnuDistanceZero(ctx context.Context, in *openolt.Onu) (*openolt.OnuLogicalDistance, error) {
+	return &openolt.OnuLogicalDistance{}, nil
+}
+
+func (o *OltDevice) GetLogicalOnuDistance(ctx context.Context, in *openolt.Onu) (*openolt.OnuLogicalDistance, error) {
+	return &openolt.OnuLogicalDistance{}, nil
+}
diff --git a/internal/bbsim/devices/olt_test.go b/internal/bbsim/devices/olt_test.go
index 53f9f44..1536baf 100644
--- a/internal/bbsim/devices/olt_test.go
+++ b/internal/bbsim/devices/olt_test.go
@@ -21,7 +21,7 @@
 	"net"
 	"testing"
 
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	"gotest.tools/assert"
 )
 
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index 80308da..5300b38 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -18,6 +18,7 @@
 
 import (
 	"context"
+	"encoding/hex"
 	"fmt"
 	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
 	"github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
@@ -33,8 +34,8 @@
 	"github.com/opencord/bbsim/internal/common"
 	omcilib "github.com/opencord/bbsim/internal/common/omci"
 	omcisim "github.com/opencord/omci-sim"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
-	"github.com/opencord/voltha-protos/v2/go/tech_profile"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/tech_profile"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -235,23 +236,6 @@
 				}
 				o.Channel <- msg
 			},
-			"igmp_join_start": func(e *fsm.Event) {
-				msg := Message{
-					Type: IGMPMembershipReportV2,
-				}
-				o.Channel <- msg
-			},
-			"igmp_leave": func(e *fsm.Event) {
-				msg := Message{
-					Type: IGMPLeaveGroup}
-				o.Channel <- msg
-			},
-			"igmp_join_startv3": func(e *fsm.Event) {
-				msg := Message{
-					Type: IGMPMembershipReportV3,
-				}
-				o.Channel <- msg
-			},
 		},
 	)
 
@@ -325,27 +309,32 @@
 					"pktType": msg.Type,
 				}).Trace("Received OnuPacketOut Message")
 
-				service, err := o.findServiceByMacAddress(msg.MacAddress)
-				if err != nil {
-					onuLogger.WithFields(log.Fields{
-						"IntfId":     msg.IntfId,
-						"OnuId":      msg.OnuId,
-						"pktType":    msg.Type,
-						"MacAddress": msg.MacAddress,
-						"OnuSn":      o.Sn(),
-					}).Error("Cannot find Service associated with packet")
-					return
+				if msg.Type == packetHandlers.EAPOL || msg.Type == packetHandlers.DHCP {
+
+					service, err := o.findServiceByMacAddress(msg.MacAddress)
+					if err != nil {
+						onuLogger.WithFields(log.Fields{
+							"IntfId":     msg.IntfId,
+							"OnuId":      msg.OnuId,
+							"pktType":    msg.Type,
+							"MacAddress": msg.MacAddress,
+							"Pkt":        hex.EncodeToString(msg.Packet.Data()),
+							"OnuSn":      o.Sn(),
+						}).Error("Cannot find Service associated with packet")
+						return
+					}
+					service.PacketCh <- msg
+				} else if msg.Type == packetHandlers.IGMP {
+					// if it's an IGMP packet we assume we have a single IGMP service
+					for _, s := range o.Services {
+						service := s.(*Service)
+
+						if service.NeedsIgmp {
+							service.PacketCh <- msg
+						}
+					}
 				}
 
-				service.PacketCh <- msg
-
-				onuLogger.WithFields(log.Fields{
-					"IntfId":      msg.IntfId,
-					"OnuId":       msg.OnuId,
-					"pktType":     msg.Type,
-					"ServiceName": service.Name,
-				}).Info("OnuPacketOut Sent on Service Packet channel")
-
 			case OnuPacketIn:
 				// NOTE we only receive BBR packets here.
 				// Eapol.HandleNextPacket can handle both BBSim and BBr cases so the call is the same
diff --git a/internal/bbsim/devices/onu_flow_test.go b/internal/bbsim/devices/onu_flow_test.go
index 3f0d702..0b8037a 100644
--- a/internal/bbsim/devices/onu_flow_test.go
+++ b/internal/bbsim/devices/onu_flow_test.go
@@ -19,7 +19,7 @@
 import (
 	"github.com/google/gopacket/layers"
 	"github.com/looplab/fsm"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	"gotest.tools/assert"
 	"testing"
 )
diff --git a/internal/bbsim/devices/onu_indications_test.go b/internal/bbsim/devices/onu_indications_test.go
index 76c7a7d..1905c20 100644
--- a/internal/bbsim/devices/onu_indications_test.go
+++ b/internal/bbsim/devices/onu_indications_test.go
@@ -22,7 +22,7 @@
 	"testing"
 	"time"
 
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	"google.golang.org/grpc"
 	"gotest.tools/assert"
 )
diff --git a/internal/bbsim/devices/onu_test_helpers.go b/internal/bbsim/devices/onu_test_helpers.go
index 76f7917..d98dc3f 100644
--- a/internal/bbsim/devices/onu_test_helpers.go
+++ b/internal/bbsim/devices/onu_test_helpers.go
@@ -19,10 +19,12 @@
 import (
 	"context"
 	"errors"
+	"github.com/opencord/voltha-protos/v3/go/common"
+	"github.com/opencord/voltha-protos/v3/go/ext/config"
 	"time"
 
-	"github.com/opencord/voltha-protos/v2/go/openolt"
-	"github.com/opencord/voltha-protos/v2/go/tech_profile"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/tech_profile"
 	"google.golang.org/grpc"
 )
 
@@ -104,6 +106,24 @@
 func (s *mockClient) EnableIndication(ctx context.Context, in *openolt.Empty, opts ...grpc.CallOption) (openolt.Openolt_EnableIndicationClient, error) {
 	return nil, errors.New("unimplemented-in-mock-client")
 }
+func (s *mockClient) PerformGroupOperation(ctx context.Context, group *openolt.Group, opts ...grpc.CallOption) (*openolt.Empty, error) {
+	return nil, errors.New("unimplemented-in-mock-client")
+}
+func (s *mockClient) DeleteGroup(ctx context.Context, group *openolt.Group, opts ...grpc.CallOption) (*openolt.Empty, error) {
+	return nil, errors.New("unimplemented-in-mock-client")
+}
+func (s *mockClient) GetExtValue(ctx context.Context, in *openolt.ValueParam, opts ...grpc.CallOption) (*common.ReturnValues, error) {
+	return nil, errors.New("unimplemented-in-mock-client")
+}
+func (s *mockClient) OnuItuPonAlarmSet(ctx context.Context, in *config.OnuItuPonAlarm, opts ...grpc.CallOption) (*openolt.Empty, error) {
+	return nil, errors.New("unimplemented-in-mock-client")
+}
+func (s *mockClient) GetLogicalOnuDistanceZero(ctx context.Context, in *openolt.Onu, opts ...grpc.CallOption) (*openolt.OnuLogicalDistance, error) {
+	return nil, errors.New("unimplemented-in-mock-client")
+}
+func (s *mockClient) GetLogicalOnuDistance(ctx context.Context, in *openolt.Onu, opts ...grpc.CallOption) (*openolt.OnuLogicalDistance, error) {
+	return nil, errors.New("unimplemented-in-mock-client")
+}
 
 // this method creates a fake ONU used in the tests
 func createMockOnu(id uint32, ponPortId uint32) *Onu {
diff --git a/internal/bbsim/devices/pon.go b/internal/bbsim/devices/pon.go
index a48dfb6..f9f9290 100644
--- a/internal/bbsim/devices/pon.go
+++ b/internal/bbsim/devices/pon.go
@@ -21,7 +21,7 @@
 	"fmt"
 
 	"github.com/looplab/fsm"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	log "github.com/sirupsen/logrus"
 )
 
diff --git a/internal/bbsim/devices/service_test.go b/internal/bbsim/devices/service_test.go
index 2191178..c585906 100644
--- a/internal/bbsim/devices/service_test.go
+++ b/internal/bbsim/devices/service_test.go
@@ -18,7 +18,7 @@
 
 import (
 	"github.com/opencord/bbsim/internal/bbsim/types"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	"github.com/stretchr/testify/assert"
 	"net"
 	"testing"
diff --git a/internal/bbsim/devices/services.go b/internal/bbsim/devices/services.go
index c8e9461..1d45ef7 100644
--- a/internal/bbsim/devices/services.go
+++ b/internal/bbsim/devices/services.go
@@ -17,10 +17,12 @@
 package devices
 
 import (
+	"encoding/hex"
 	"github.com/looplab/fsm"
 	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
 	"github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
 	"github.com/opencord/bbsim/internal/bbsim/responders/eapol"
+	"github.com/opencord/bbsim/internal/bbsim/responders/igmp"
 	bbsimTypes "github.com/opencord/bbsim/internal/bbsim/types"
 	log "github.com/sirupsen/logrus"
 	"net"
@@ -61,6 +63,7 @@
 	InternalState *fsm.FSM
 	EapolState    *fsm.FSM
 	DHCPState     *fsm.FSM
+	IGMPState     *fsm.FSM
 	Channel       chan Message          // drive Service lifecycle
 	PacketCh      chan OnuPacketMessage // handle packets
 	Stream        bbsimTypes.Stream     // the gRPC stream to communicate with the adapter, created in the initialize transition
@@ -175,6 +178,35 @@
 		},
 	)
 
+	service.IGMPState = fsm.NewFSM(
+		"created",
+		fsm.Events{
+			{Name: "igmp_join_start", Src: []string{"created", "igmp_left", "igmp_join_error", "igmp_join_started"}, Dst: "igmp_join_started"},
+			{Name: "igmp_join_startv3", Src: []string{"igmp_left", "igmp_join_error", "igmp_join_started"}, Dst: "igmp_join_started"},
+			{Name: "igmp_join_error", Src: []string{"igmp_join_started"}, Dst: "igmp_join_error"},
+			{Name: "igmp_leave", Src: []string{"igmp_join_started"}, Dst: "igmp_left"},
+		},
+		fsm.Callbacks{
+			"igmp_join_start": func(e *fsm.Event) {
+				msg := Message{
+					Type: IGMPMembershipReportV2,
+				}
+				service.Channel <- msg
+			},
+			"igmp_leave": func(e *fsm.Event) {
+				msg := Message{
+					Type: IGMPLeaveGroup}
+				service.Channel <- msg
+			},
+			"igmp_join_startv3": func(e *fsm.Event) {
+				msg := Message{
+					Type: IGMPMembershipReportV3,
+				}
+				service.Channel <- msg
+			},
+		},
+	)
+
 	return &service, nil
 }
 
@@ -272,6 +304,9 @@
 			eapol.HandleNextPacket(msg.OnuId, msg.IntfId, s.GemPort, s.Onu.Sn(), s.Onu.PortNo, s.EapolState, msg.Packet, s.Stream, nil)
 		} else if msg.Type == packetHandlers.DHCP {
 			_ = dhcp.HandleNextPacket(s.Onu.PonPort.Olt.ID, s.Onu.ID, s.Onu.PonPortID, s.Name, s.Onu.Sn(), s.Onu.PortNo, s.CTag, s.GemPort, s.HwAddress, s.DHCPState, msg.Packet, s.UsPonCTagPriority, s.Stream)
+		} else if msg.Type == packetHandlers.IGMP {
+			log.Warn(hex.EncodeToString(msg.Packet.Data()))
+			_ = igmp.HandleNextPacket(s.Onu.PonPortID, s.Onu.ID, s.Onu.Sn(), s.Onu.PortNo, s.GemPort, s.HwAddress, msg.Packet, s.CTag, s.UsPonCTagPriority, s.Stream)
 		}
 	}
 }
@@ -295,7 +330,8 @@
 		}).Debug("Done Listening on Service Channel")
 	}()
 	for msg := range s.Channel {
-		if msg.Type == StartEAPOL {
+		switch msg.Type {
+		case StartEAPOL:
 			if err := s.handleEapolStart(s.Stream); err != nil {
 				serviceLogger.WithFields(log.Fields{
 					"OnuId":  s.Onu.ID,
@@ -306,7 +342,7 @@
 				}).Error("Error while sending EapolStart packet")
 				_ = s.EapolState.Event("auth_failed")
 			}
-		} else if msg.Type == StartDHCP {
+		case StartDHCP:
 			if err := s.handleDHCPStart(s.Stream); err != nil {
 				serviceLogger.WithFields(log.Fields{
 					"OnuId":  s.Onu.ID,
@@ -316,7 +352,33 @@
 					"err":    err,
 				}).Error("Error while sending DHCPDiscovery packet")
 				_ = s.DHCPState.Event("dhcp_failed")
+
 			}
+		case IGMPMembershipReportV2:
+			serviceLogger.WithFields(log.Fields{
+				"OnuId":  s.Onu.ID,
+				"IntfId": s.Onu.PonPortID,
+				"OnuSn":  s.Onu.Sn(),
+				"Name":   s.Name,
+			}).Debug("Recieved IGMPMembershipReportV2 message on ONU channel")
+			_ = igmp.SendIGMPMembershipReportV2(s.Onu.PonPortID, s.Onu.ID, s.Onu.Sn(), s.Onu.PortNo, s.GemPort, s.HwAddress, s.CTag, s.UsPonCTagPriority, s.Stream)
+		case IGMPLeaveGroup:
+			serviceLogger.WithFields(log.Fields{
+				"OnuId":  s.Onu.ID,
+				"IntfId": s.Onu.PonPortID,
+				"OnuSn":  s.Onu.Sn(),
+				"Name":   s.Name,
+			}).Debug("Recieved IGMPLeaveGroupV2 message on ONU channel")
+			_ = igmp.SendIGMPLeaveGroupV2(s.Onu.PonPortID, s.Onu.ID, s.Onu.Sn(), s.Onu.PortNo, s.GemPort, s.HwAddress, s.CTag, s.UsPonCTagPriority, s.Stream)
+		case IGMPMembershipReportV3:
+			serviceLogger.WithFields(log.Fields{
+				"OnuId":  s.Onu.ID,
+				"IntfId": s.Onu.PonPortID,
+				"OnuSn":  s.Onu.Sn(),
+				"Name":   s.Name,
+			}).Debug("Recieved IGMPMembershipReportV3 message on ONU channel")
+			_ = igmp.SendIGMPMembershipReportV3(s.Onu.PonPortID, s.Onu.ID, s.Onu.Sn(), s.Onu.PortNo, s.GemPort, s.HwAddress, s.CTag, s.UsPonCTagPriority, s.Stream)
+
 		}
 	}
 }
diff --git a/internal/bbsim/packetHandlers/filters.go b/internal/bbsim/packetHandlers/filters.go
index 9ada88c..c46f87d 100644
--- a/internal/bbsim/packetHandlers/filters.go
+++ b/internal/bbsim/packetHandlers/filters.go
@@ -30,6 +30,13 @@
 	return false
 }
 
+func IsIgmpPacket(pkt gopacket.Packet) bool {
+	if igmpLayer := pkt.Layer(layers.LayerTypeIGMP); igmpLayer != nil {
+		return true
+	}
+	return false
+}
+
 func IsLldpPacket(pkt gopacket.Packet) bool {
 	if layer := pkt.Layer(layers.LayerTypeLinkLayerDiscovery); layer != nil {
 		return true
@@ -78,11 +85,13 @@
 }
 
 // returns wether it's an EAPOL or DHCP packet, error if it's none
-func IsEapolOrDhcp(pkt gopacket.Packet) (PacketType, error) {
+func GetPktType(pkt gopacket.Packet) (PacketType, error) {
 	if pkt.Layer(layers.LayerTypeEAP) != nil || pkt.Layer(layers.LayerTypeEAPOL) != nil {
 		return EAPOL, nil
 	} else if IsDhcpPacket(pkt) {
 		return DHCP, nil
+	} else if IsIgmpPacket(pkt) {
+		return IGMP, nil
 	}
-	return UNKNOWN, errors.New("packet-is-neither-eapol-or-dhcp")
+	return UNKNOWN, errors.New("unknown-packet-type")
 }
diff --git a/internal/bbsim/packetHandlers/filters_test.go b/internal/bbsim/packetHandlers/filters_test.go
index 8d5f96a..c182e82 100644
--- a/internal/bbsim/packetHandlers/filters_test.go
+++ b/internal/bbsim/packetHandlers/filters_test.go
@@ -20,6 +20,7 @@
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
 	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
+	"github.com/opencord/bbsim/internal/bbsim/responders/igmp"
 	"gotest.tools/assert"
 	"net"
 	"testing"
@@ -62,6 +63,29 @@
 	assert.Equal(t, res, false)
 }
 
+func Test_IsIgmpPacket(t *testing.T) {
+	igmp := &igmp.IGMP{
+		Type:         layers.IGMPMembershipReportV2, //IGMPV2 Membership Report
+		Checksum:     0,
+		GroupAddress: net.IPv4(224, 0, 0, 22),
+		Version:      2,
+	}
+	buffer := gopacket.NewSerializeBuffer()
+	options := gopacket.SerializeOptions{
+		ComputeChecksums: true,
+		FixLengths:       true,
+	}
+
+	if err := gopacket.SerializeLayers(buffer, options, igmp); err != nil {
+		t.Fatal(err)
+	}
+
+	pkt := gopacket.NewPacket(buffer.Bytes(), layers.LayerTypeIGMP, gopacket.DecodeOptions{})
+
+	res := packetHandlers.IsIgmpPacket(pkt)
+	assert.Equal(t, res, true)
+}
+
 func Test_IsLldpPacket_True(t *testing.T) {
 	layer := &layers.LinkLayerDiscovery{
 		PortID: layers.LLDPPortID{
diff --git a/internal/bbsim/packetHandlers/packetTypes.go b/internal/bbsim/packetHandlers/packetTypes.go
index fe424f0..c89443b 100644
--- a/internal/bbsim/packetHandlers/packetTypes.go
+++ b/internal/bbsim/packetHandlers/packetTypes.go
@@ -22,6 +22,7 @@
 	UNKNOWN PacketType = iota
 	EAPOL
 	DHCP
+	IGMP
 )
 
 func (t PacketType) String() string {
@@ -29,6 +30,7 @@
 		"UNKNOWN",
 		"EAPOL",
 		"DHCP",
+		"IGMP",
 	}
 	return names[t]
 }
diff --git a/internal/bbsim/responders/dhcp/dhcp.go b/internal/bbsim/responders/dhcp/dhcp.go
index 5fc8a64..9b48c05 100644
--- a/internal/bbsim/responders/dhcp/dhcp.go
+++ b/internal/bbsim/responders/dhcp/dhcp.go
@@ -30,7 +30,7 @@
 	"github.com/looplab/fsm"
 	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
 	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	log "github.com/sirupsen/logrus"
 )
 
diff --git a/internal/bbsim/responders/dhcp/dhcp_test.go b/internal/bbsim/responders/dhcp/dhcp_test.go
index cb6d9b7..2a49836 100644
--- a/internal/bbsim/responders/dhcp/dhcp_test.go
+++ b/internal/bbsim/responders/dhcp/dhcp_test.go
@@ -22,7 +22,7 @@
 	"testing"
 
 	"github.com/looplab/fsm"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	"google.golang.org/grpc"
 	"gotest.tools/assert"
 )
diff --git a/internal/bbsim/responders/eapol/eapol.go b/internal/bbsim/responders/eapol/eapol.go
index 8a37f02..f9488fb 100644
--- a/internal/bbsim/responders/eapol/eapol.go
+++ b/internal/bbsim/responders/eapol/eapol.go
@@ -27,7 +27,7 @@
 	"github.com/looplab/fsm"
 	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
 	omci "github.com/opencord/omci-sim"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	log "github.com/sirupsen/logrus"
 )
 
diff --git a/internal/bbsim/responders/eapol/eapol_test.go b/internal/bbsim/responders/eapol/eapol_test.go
index 824ce65..05dcf9f 100644
--- a/internal/bbsim/responders/eapol/eapol_test.go
+++ b/internal/bbsim/responders/eapol/eapol_test.go
@@ -22,7 +22,7 @@
 	"testing"
 
 	"github.com/looplab/fsm"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	"google.golang.org/grpc"
 	"gotest.tools/assert"
 )
diff --git a/internal/bbsim/responders/igmp/igmp.go b/internal/bbsim/responders/igmp/igmp.go
index defbe19..5390347 100644
--- a/internal/bbsim/responders/igmp/igmp.go
+++ b/internal/bbsim/responders/igmp/igmp.go
@@ -15,25 +15,28 @@
 
 import (
 	"encoding/binary"
+	"encoding/hex"
+	"errors"
+	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
 	"net"
 	"time"
 
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
 	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
-	omci "github.com/opencord/omci-sim"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	log "github.com/sirupsen/logrus"
 )
 
-func SendIGMPLeaveGroupV2(ponPortId uint32, onuId uint32, serialNumber string, portNo uint32, macAddress net.HardwareAddr, stream bbsim.Stream) error {
+func SendIGMPLeaveGroupV2(ponPortId uint32, onuId uint32, serialNumber string, portNo uint32,
+	gemPortId uint32, macAddress net.HardwareAddr, cTag int, pbit uint8, stream bbsim.Stream) error {
 	log.WithFields(log.Fields{
 		"OnuId":        onuId,
 		"SerialNumber": serialNumber,
 		"PortNo":       portNo,
 	}).Debugf("Entered SendIGMPLeaveGroupV2")
 	igmp := createIGMPV2LeaveRequestPacket()
-	pkt, err := serializeIgmpPacket(ponPortId, onuId, macAddress, igmp)
+	pkt, err := serializeIgmpPacket(ponPortId, onuId, cTag, macAddress, pbit, igmp)
 
 	if err != nil {
 		log.WithFields(log.Fields{
@@ -44,21 +47,11 @@
 		return err
 	}
 
-	gemid, err := omci.GetGemPortId(ponPortId, onuId)
-	if err != nil {
-		log.WithFields(log.Fields{
-			"OnuId":        onuId,
-			"IntfId":       ponPortId,
-			"SerialNumber": serialNumber,
-		}).Errorf("Can't retrieve GemPortId for IGMP: %s", err)
-		return err
-	}
-
 	data := &openolt.Indication_PktInd{
 		PktInd: &openolt.PacketIndication{
 			IntfType:  "pon",
 			IntfId:    ponPortId,
-			GemportId: uint32(gemid),
+			GemportId: gemPortId,
 			Pkt:       pkt,
 			PortNo:    portNo,
 		},
@@ -77,14 +70,11 @@
 	return nil
 }
 
-func SendIGMPMembershipReportV2(ponPortId uint32, onuId uint32, serialNumber string, portNo uint32, macAddress net.HardwareAddr, stream bbsim.Stream) error {
-	log.WithFields(log.Fields{
-		"OnuId":        onuId,
-		"SerialNumber": serialNumber,
-		"PortNo":       portNo,
-	}).Debugf("Entered SendIGMPMembershipReportV2")
+func SendIGMPMembershipReportV2(ponPortId uint32, onuId uint32, serialNumber string, portNo uint32,
+	gemPortId uint32, macAddress net.HardwareAddr, cTag int, pbit uint8, stream bbsim.Stream) error {
+
 	igmp := createIGMPV2MembershipReportPacket()
-	pkt, err := serializeIgmpPacket(ponPortId, onuId, macAddress, igmp)
+	pkt, err := serializeIgmpPacket(ponPortId, onuId, cTag, macAddress, pbit, igmp)
 
 	if err != nil {
 		log.WithFields(log.Fields{
@@ -95,21 +85,11 @@
 		return err
 	}
 
-	gemid, err := omci.GetGemPortId(ponPortId, onuId)
-	if err != nil {
-		log.WithFields(log.Fields{
-			"OnuId":        onuId,
-			"IntfId":       ponPortId,
-			"SerialNumber": serialNumber,
-		}).Errorf("Can't retrieve GemPortId for IGMP: %s", err)
-		return err
-	}
-
 	data := &openolt.Indication_PktInd{
 		PktInd: &openolt.PacketIndication{
 			IntfType:  "pon",
 			IntfId:    ponPortId,
-			GemportId: uint32(gemid),
+			GemportId: gemPortId,
 			Pkt:       pkt,
 			PortNo:    portNo,
 		},
@@ -125,17 +105,24 @@
 		}).Errorf("Fail to send IGMP PktInd indication")
 		return err
 	}
+
+	log.WithFields(log.Fields{
+		"OnuId":        onuId,
+		"SerialNumber": serialNumber,
+		"PortNo":       portNo,
+	}).Debugf("Sent SendIGMPMembershipReportV2")
 	return nil
 }
 
-func SendIGMPMembershipReportV3(ponPortId uint32, onuId uint32, serialNumber string, portNo uint32, macAddress net.HardwareAddr, stream bbsim.Stream) error {
+func SendIGMPMembershipReportV3(ponPortId uint32, onuId uint32, serialNumber string, portNo uint32,
+	gemPortId uint32, macAddress net.HardwareAddr, cTag int, pbit uint8, stream bbsim.Stream) error {
 	log.WithFields(log.Fields{
 		"OnuId":        onuId,
 		"SerialNumber": serialNumber,
 		"PortNo":       portNo,
 	}).Debugf("Entered SendIGMPMembershipReportV3")
 	igmp := createIGMPV3MembershipReportPacket()
-	pkt, err := serializeIgmpPacket(ponPortId, onuId, macAddress, igmp)
+	pkt, err := serializeIgmpPacket(ponPortId, onuId, cTag, macAddress, pbit, igmp)
 
 	if err != nil {
 		log.WithFields(log.Fields{
@@ -146,21 +133,11 @@
 		return err
 	}
 
-	gemid, err := omci.GetGemPortId(ponPortId, onuId)
-	if err != nil {
-		log.WithFields(log.Fields{
-			"OnuId":        onuId,
-			"IntfId":       ponPortId,
-			"SerialNumber": serialNumber,
-		}).Errorf("Can't retrieve GemPortId for IGMP: %s", err)
-		return err
-	}
-
 	data := &openolt.Indication_PktInd{
 		PktInd: &openolt.PacketIndication{
 			IntfType:  "pon",
 			IntfId:    ponPortId,
-			GemportId: uint32(gemid),
+			GemportId: gemPortId,
 			Pkt:       pkt,
 			PortNo:    portNo,
 		},
@@ -178,7 +155,34 @@
 	return nil
 }
 
-func createIGMPV3MembershipReportPacket() IGMP {
+func HandleNextPacket(ponPortId uint32, onuId uint32, serialNumber string, portNo uint32,
+	gemPortId uint32, macAddress net.HardwareAddr, pkt gopacket.Packet, cTag int, pbit uint8, stream bbsim.Stream) error {
+
+	igmpLayer := pkt.Layer(layers.LayerTypeIGMP)
+	if igmpLayer == nil {
+		log.WithFields(log.Fields{
+			"OnuId":        onuId,
+			"SerialNumber": serialNumber,
+			"PortNo":       portNo,
+			"Pkt":          hex.EncodeToString(pkt.Data()),
+		}).Error("This is not an IGMP packet")
+		return errors.New("packet-is-not-igmp")
+	}
+
+	log.WithFields(log.Fields{
+		"Pkt": pkt.Data(),
+	}).Trace("IGMP packet")
+
+	igmp := igmpLayer.(*layers.IGMPv1or2)
+
+	if igmp.Type == layers.IGMPMembershipQuery {
+		_ = SendIGMPMembershipReportV2(ponPortId, onuId, serialNumber, portNo, gemPortId, macAddress, cTag, pbit, stream)
+	}
+
+	return nil
+}
+
+func createIGMPV3MembershipReportPacket() *IGMP {
 
 	groupRecord1 := IGMPv3GroupRecord{
 		Type:             IGMPv3GroupRecordType(IGMPIsIn),
@@ -198,8 +202,8 @@
 		AuxData:          0, // NOT USED
 	}
 
-	igmpDefault := IGMP{
-		Type:                    0x22, //IGMPV3 Membership Report
+	igmpDefault := &IGMP{
+		Type:                    layers.IGMPMembershipReportV3, //IGMPV3 Membership Report
 		MaxResponseTime:         time.Duration(1),
 		Checksum:                0,
 		GroupAddress:            net.IPv4(224, 0, 0, 22),
@@ -216,10 +220,18 @@
 	return igmpDefault
 }
 
-//func serializeIgmpPacket(intfId uint32, onuId uint32, srcMac net.HardwareAddr, igmp *layers.IGMP) ([]byte, error) {
-func createIGMPV2MembershipReportPacket() IGMP {
-	return IGMP{
-		Type:            0x16, //IGMPV2 Membership Report
+func createIGMPV2MembershipReportPacket() *IGMP {
+	return &IGMP{
+		Type:         layers.IGMPMembershipReportV2, //IGMPV2 Membership Report
+		Checksum:     0,
+		GroupAddress: net.IPv4(224, 0, 0, 22),
+		Version:      2,
+	}
+}
+
+func createIGMPV2LeaveRequestPacket() *IGMP {
+	return &IGMP{
+		Type:            layers.IGMPLeaveGroup, //IGMPV2 Leave Group
 		MaxResponseTime: time.Duration(1),
 		Checksum:        0,
 		GroupAddress:    net.IPv4(224, 0, 0, 22),
@@ -227,17 +239,7 @@
 	}
 }
 
-func createIGMPV2LeaveRequestPacket() IGMP {
-	return IGMP{
-		Type:            0x17, //IGMPV2 Leave Group
-		MaxResponseTime: time.Duration(1),
-		Checksum:        0,
-		GroupAddress:    net.IPv4(224, 0, 0, 22),
-		Version:         2,
-	}
-}
-
-func serializeIgmpPacket(intfId uint32, onuId uint32, srcMac net.HardwareAddr, igmp IGMP) ([]byte, error) {
+func serializeIgmpPacket(intfId uint32, onuId uint32, cTag int, srcMac net.HardwareAddr, pbit uint8, igmp *IGMP) ([]byte, error) {
 	buffer := gopacket.NewSerializeBuffer()
 	options := gopacket.SerializeOptions{
 		ComputeChecksums: true,
@@ -265,28 +267,22 @@
 		return nil, err
 	}
 
-	return buffer.Bytes(), nil
+	untaggedPkt := gopacket.NewPacket(buffer.Bytes(), layers.LayerTypeEthernet, gopacket.Default)
+	taggedPkt, err := packetHandlers.PushSingleTag(cTag, untaggedPkt, pbit)
+
+	if err != nil {
+		log.Error("TagPacket")
+		return nil, err
+	}
+
+	return taggedPkt.Data(), nil
 }
 
 //-----------------------------------------***********************---------------------------------
-// BaseLayer is a convenience struct which implements the LayerData and
-// LayerPayload functions of the Layer interface.
-type BaseLayer struct {
-	// Contents is the set of bytes that make up this layer.  IE: for an
-	// Ethernet packet, this would be the set of bytes making up the
-	// Ethernet frame.
-	Contents []byte
-	// Payload is the set of bytes contained by (but not part of) this
-	// Layer.  Again, to take Ethernet as an example, this would be the
-	// set of bytes encapsulated by the Ethernet protocol.
-	Payload []byte
-}
-
-type IGMPType uint8
 
 type IGMP struct {
-	BaseLayer
-	Type                    IGMPType
+	layers.BaseLayer
+	Type                    layers.IGMPType
 	MaxResponseTime         time.Duration
 	Checksum                uint16
 	GroupAddress            net.IP
@@ -343,8 +339,7 @@
 // SerializeTo writes the serialized form of this layer into the
 // SerializationBuffer, implementing gopacket.SerializableLayer.
 // See the docs for gopacket.SerializableLayer for more info.
-func (igmp IGMP) SerializeTo(b gopacket.SerializeBuffer, opts gopacket.SerializeOptions) error {
-	log.Debugf("Serializing IGMP Packet")
+func (igmp *IGMP) SerializeTo(b gopacket.SerializeBuffer, opts gopacket.SerializeOptions) error {
 	data, err := b.PrependBytes(8915)
 	if err != nil {
 		return err
@@ -410,4 +405,6 @@
 	return ^uint16(csum)
 }
 
-func (IGMP) LayerType() gopacket.LayerType { return layers.LayerTypeIGMP }
+func (i *IGMP) LayerType() gopacket.LayerType { return layers.LayerTypeIGMP }
+func (i *IGMP) LayerContents() []byte         { return i.Contents }
+func (i *IGMP) LayerPayload() []byte          { return i.Payload }
diff --git a/internal/bbsim/responders/igmp/igmp_test.go b/internal/bbsim/responders/igmp/igmp_test.go
new file mode 100644
index 0000000..a1788c0
--- /dev/null
+++ b/internal/bbsim/responders/igmp/igmp_test.go
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package igmp
+
+import (
+	"encoding/hex"
+	"fmt"
+	"github.com/google/gopacket"
+	"github.com/google/gopacket/layers"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc"
+	"net"
+	"testing"
+)
+
+type mockStream struct {
+	CallCount int
+	Calls     map[int]*openolt.Indication
+	grpc.ServerStream
+}
+
+func (s *mockStream) Send(ind *openolt.Indication) error {
+	s.CallCount++
+	s.Calls[s.CallCount] = ind
+	return nil
+}
+
+func TestHandleNextPacket(t *testing.T) {
+
+	t.Skip("Need to find how to serialize an IGMP packet")
+
+	stream := &mockStream{
+		CallCount: 0,
+		Calls:     make(map[int]*openolt.Indication),
+	}
+
+	mac := net.HardwareAddr{0x2e, 0x60, 0x70, 0x13, 0x15, 0x16}
+
+	packetData := []byte{
+		1, 0, 94, 0, 0, 22, 222, 173, 190, 239, 186, 17, 8, 0, 70, 0, 0, 32, 0, 0, 0, 0, 120, 2, 191,
+		215, 10, 244, 2, 246, 224, 0, 0, 22, 148, 4, 0, 0, 17, 10, 14, 223, 224, 0, 0, 22, 0, 0, 0, 0,
+		0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+	}
+	fmt.Println(hex.EncodeToString(packetData))
+
+	packet := gopacket.NewPacket(packetData, layers.LayerTypeIPv4, gopacket.Default)
+
+	fmt.Println(hex.EncodeToString(packet.Data()))
+
+	fmt.Println(packet.Layers())
+
+	err := HandleNextPacket(0, 0, "FOO", 1, 1024, mac, packet, 55, 5, stream)
+	assert.Nil(t, err)
+
+	assert.Equal(t, 1, stream.CallCount)
+}
diff --git a/internal/bbsim/types/interfaces.go b/internal/bbsim/types/interfaces.go
index 53a05b4..6a12136 100644
--- a/internal/bbsim/types/interfaces.go
+++ b/internal/bbsim/types/interfaces.go
@@ -17,7 +17,7 @@
 package types
 
 import (
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 )
 
 // represent a gRPC stream