[VOL-3837] Checking that OnuId, AllocId and GemPorts are unique per PON
Validate that received flows carry valid GemPortIds and AllocIds

Change-Id: I1b8928c7a9e580c9711f61320595a449df7c30f5
diff --git a/internal/bbsim/api/grpc_api_server.go b/internal/bbsim/api/grpc_api_server.go
index 01c7f4f..ec25a10 100644
--- a/internal/bbsim/api/grpc_api_server.go
+++ b/internal/bbsim/api/grpc_api_server.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"fmt"
+	"google.golang.org/grpc/status"
 	"strings"
 	"time"
 
@@ -67,9 +68,43 @@
 	}
 
 	for _, pon := range olt.Pons {
+
+		allocatedOnuIds := []*bbsim.PonAllocatedResources{}
+		allocatedAllocIds := []*bbsim.PonAllocatedResources{}
+		allocatedGemPorts := []*bbsim.PonAllocatedResources{}
+
+		for k, v := range pon.AllocatedOnuIds {
+			resource := &bbsim.PonAllocatedResources{
+				SerialNumber: common.OnuSnToString(v),
+				Id:           int32(k),
+			}
+			allocatedOnuIds = append(allocatedOnuIds, resource)
+		}
+
+		for k, v := range pon.AllocatedGemPorts {
+			resource := &bbsim.PonAllocatedResources{
+				SerialNumber: common.OnuSnToString(v),
+				Id:           int32(k),
+			}
+			allocatedGemPorts = append(allocatedGemPorts, resource)
+		}
+
+		for k, v := range pon.AllocatedAllocIds {
+			resource := &bbsim.PonAllocatedResources{
+				SerialNumber: common.OnuSnToString(v),
+				Id:           int32(k),
+			}
+			allocatedAllocIds = append(allocatedAllocIds, resource)
+		}
+
 		p := bbsim.PONPort{
-			ID:        int32(pon.ID),
-			OperState: pon.OperState.Current(),
+			ID:                int32(pon.ID),
+			OperState:         pon.OperState.Current(),
+			InternalState:     pon.InternalState.Current(),
+			PacketCount:       pon.PacketCount,
+			AllocatedOnuIds:   allocatedOnuIds,
+			AllocatedAllocIds: allocatedAllocIds,
+			AllocatedGemPorts: allocatedGemPorts,
 		}
 		pons = append(pons, &p)
 	}
@@ -91,6 +126,48 @@
 	return &res, nil
 }
 
+// resourcesMapToresourcesProto flattens the nested map[ponPortId][onuId][portNo][resourceId][flowId] into a proto holding one OltAllocatedResource per flow, each tagged with resourceType.String()
+func resourcesMapToresourcesProto(resourceType bbsim.OltAllocatedResourceType_Type, resources map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool) *bbsim.OltAllocatedResources {
+	proto := &bbsim.OltAllocatedResources{
+		Resources: []*bbsim.OltAllocatedResource{},
+	}
+	for ponId, ponValues := range resources {
+		for onuId, onuValues := range ponValues {
+			for uniId, uniValues := range onuValues {
+				for allocId, flows := range uniValues { // the key is an AllocId or a GemPortId, depending on the map passed by the caller
+					for flow := range flows {
+						resource := &bbsim.OltAllocatedResource{
+							Type:       resourceType.String(),
+							PonPortId:  ponId,
+							OnuId:      onuId,
+							PortNo:     uniId,
+							ResourceId: allocId,
+							FlowId:     flow,
+						}
+						proto.Resources = append(proto.Resources, resource)
+					}
+				}
+			}
+		}
+	}
+	return proto
+}
+
+func (s BBSimServer) GetOltAllocatedResources(ctx context.Context, req *bbsim.OltAllocatedResourceType) (*bbsim.OltAllocatedResources, error) {
+	o := devices.GetOLT()
+	// returns the AllocIds or GemPorts stored on the OLT, InvalidArgument for any other type; NOTE(review): the maps are read without taking AllocIDsLock/GemPortIDsLock
+	switch req.Type {
+	case bbsim.OltAllocatedResourceType_UNKNOWN:
+		return nil, status.Errorf(codes.InvalidArgument, "resource-type-%s-is-invalid", req.Type)
+	case bbsim.OltAllocatedResourceType_ALLOC_ID:
+		return resourcesMapToresourcesProto(bbsim.OltAllocatedResourceType_ALLOC_ID, o.AllocIDs), nil
+	case bbsim.OltAllocatedResourceType_GEM_PORT:
+		return resourcesMapToresourcesProto(bbsim.OltAllocatedResourceType_GEM_PORT, o.GemPortIDs), nil
+	default:
+		return nil, status.Errorf(codes.InvalidArgument, "unknown-resource-type-%s", req.Type)
+	}
+}
+
 func (s BBSimServer) PoweronOlt(ctx context.Context, req *bbsim.Empty) (*bbsim.Response, error) {
 	res := &bbsim.Response{}
 	o := devices.GetOLT()
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index f9ef936..87b9da4 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -90,6 +90,13 @@
 
 	OpenoltStream openolt.Openolt_EnableIndicationServer
 	enablePerf    bool
+
+	// Allocated Resources
+	// this data is used to verify that the openolt adapter does not duplicate resources
+	AllocIDsLock   sync.RWMutex
+	AllocIDs       map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool // map[ponPortId]map[OnuId]map[PortNo]map[AllocIds]map[FlowId]bool
+	GemPortIDsLock sync.RWMutex
+	GemPortIDs     map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool // map[ponPortId]map[OnuId]map[PortNo]map[GemPortIDs]map[FlowId]bool
 }
 
 var olt OltDevice
@@ -123,6 +130,8 @@
 		PortStatsInterval:   options.Olt.PortStatsInterval,
 		dhcpServer:          dhcp.NewDHCPServer(),
 		PreviouslyConnected: false,
+		AllocIDs:            make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
+		GemPortIDs:          make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
 	}
 
 	if val, ok := ControlledActivationModes[options.BBSim.ControlledActivation]; ok {
@@ -149,6 +158,10 @@
 				oltLogger.Debugf("Changing OLT InternalState from %s to %s", e.Src, e.Dst)
 			},
 			"enter_initialized": func(e *fsm.Event) { olt.InitOlt() },
+			"enter_deleted": func(e *fsm.Event) {
+				// remove all the resource allocations
+				olt.clearAllResources()
+			},
 		},
 	)
 
@@ -169,6 +182,11 @@
 
 	// create PON ports
 	for i := 0; i < olt.NumPon; i++ {
+
+		// initialize the resource maps for every PON Ports
+		olt.AllocIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+		olt.GemPortIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+
 		p := CreatePonPort(&olt, uint32(i))
 
 		// create ONU devices
@@ -253,6 +271,12 @@
 		// in-band management
 		o.Nnis[0].OperState.SetState("down")
 	}
+
+	for ponId := range o.Pons {
+		// initialize the resource maps for every PON Ports
+		olt.AllocIDs[uint32(ponId)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+		olt.GemPortIDs[uint32(ponId)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+	}
 }
 
 func (o *OltDevice) RestartOLT() error {
@@ -822,16 +846,14 @@
 	publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), onuSnToString(onu.SerialNumber))
 
 	pon, _ := o.GetPonById(onu.IntfId)
+
+	// Initialize the resource maps for this ONU
+	olt.AllocIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool)
+	olt.GemPortIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool)
+
 	_onu, _ := pon.GetOnuBySn(onu.SerialNumber)
 	_onu.SetID(onu.OnuId)
 
-	if err := _onu.OperState.Event("enable"); err != nil {
-		oltLogger.WithFields(log.Fields{
-			"IntfId": _onu.PonPortID,
-			"OnuSn":  _onu.Sn(),
-			"OnuId":  _onu.ID,
-		}).Infof("Failed to transition ONU.OperState to enabled state: %s", err.Error())
-	}
 	if err := _onu.InternalState.Event(OnuTxEnable); err != nil {
 		oltLogger.WithFields(log.Fields{
 			"IntfId": _onu.PonPortID,
@@ -845,7 +867,7 @@
 	return new(openolt.Empty), nil
 }
 
-func (o *OltDevice) DeactivateOnu(context.Context, *openolt.Onu) (*openolt.Empty, error) {
+func (o *OltDevice) DeactivateOnu(_ context.Context, onu *openolt.Onu) (*openolt.Empty, error) {
 	oltLogger.Error("DeactivateOnu not implemented")
 	return new(openolt.Empty), nil
 }
@@ -1056,6 +1078,21 @@
 			}
 		}
 
+		// validate that the flow reference correct IDs (Alloc, Gem)
+		if err := o.validateFlow(flow); err != nil {
+			oltLogger.WithFields(log.Fields{
+				"OnuId":        flow.OnuId,
+				"IntfId":       flow.AccessIntfId,
+				"Flow":         flow,
+				"SerialNumber": onu.Sn(),
+				"err":          err,
+			}).Error("invalid-flow-for-onu")
+			return nil, err
+		}
+
+		o.storeGemPortId(flow)
+		o.storeAllocId(flow)
+
 		msg := types.Message{
 			Type: types.FlowAdd,
 			Data: types.OnuFlowUpdateMessage{
@@ -1074,10 +1111,22 @@
 func (o *OltDevice) FlowRemove(_ context.Context, flow *openolt.Flow) (*openolt.Empty, error) {
 
 	oltLogger.WithFields(log.Fields{
-		"FlowId":   flow.FlowId,
-		"FlowType": flow.FlowType,
+		"AllocId":       flow.AllocId,
+		"Cookie":        flow.Cookie,
+		"FlowId":        flow.FlowId,
+		"FlowType":      flow.FlowType,
+		"GemportId":     flow.GemportId,
+		"IntfId":        flow.AccessIntfId,
+		"OnuId":         flow.OnuId,
+		"PortNo":        flow.PortNo,
+		"UniID":         flow.UniId,
+		"ReplicateFlow": flow.ReplicateFlow,
+		"PbitToGemport": flow.PbitToGemport,
 	}).Debug("OLT receives FlowRemove")
 
+	olt.freeGemPortId(flow)
+	olt.freeAllocId(flow)
+
 	if !o.enablePerf { // remove only if flow were stored
 		flowKey := FlowKey{
 			ID:        flow.FlowId,
@@ -1177,11 +1226,6 @@
 
 func (o *OltDevice) GetDeviceInfo(context.Context, *openolt.Empty) (*openolt.DeviceInfo, error) {
 
-	oltLogger.WithFields(log.Fields{
-		"oltId":    o.ID,
-		"PonPorts": o.NumPon,
-	}).Info("OLT receives GetDeviceInfo call from VOLTHA")
-
 	intfIDs := []uint32{}
 	for i := 0; i < o.NumPon; i++ {
 		intfIDs = append(intfIDs, uint32(i))
@@ -1538,3 +1582,179 @@
 func (o *OltDevice) GetOnuStatistics(ctx context.Context, in *openolt.Onu) (*openolt.OnuStatistics, error) {
 	return &openolt.OnuStatistics{}, nil
 }
+
+func (o *OltDevice) storeAllocId(flow *openolt.Flow) {
+	o.AllocIDsLock.Lock()
+	defer o.AllocIDsLock.Unlock()
+	// record flow.FlowId under [pon][onu][portNo][allocId], creating the two innermost map levels on demand
+	oltLogger.WithFields(log.Fields{
+		"IntfId":  flow.AccessIntfId,
+		"OnuId":   flow.OnuId,
+		"PortNo":  flow.PortNo,
+		"AllocId": flow.AllocId,
+	}).Trace("storing-alloc-id-via-flow")
+	perUni := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)]
+	if _, found := perUni[flow.PortNo]; !found {
+		perUni[flow.PortNo] = make(map[int32]map[uint64]bool)
+	}
+	if _, found := perUni[flow.PortNo][flow.AllocId]; !found {
+		perUni[flow.PortNo][flow.AllocId] = make(map[uint64]bool)
+	}
+	perUni[flow.PortNo][flow.AllocId][flow.FlowId] = true
+}
+
+func (o *OltDevice) freeAllocId(flow *openolt.Flow) {
+	// drop flow.FlowId from every AllocId referencing it; if this is the last flow referencing the AllocId then remove it
+	o.AllocIDsLock.Lock()
+	defer o.AllocIDsLock.Unlock()
+
+	oltLogger.WithFields(log.Fields{
+		"IntfId":  flow.AccessIntfId,
+		"OnuId":   flow.OnuId,
+		"PortNo":  flow.PortNo,
+		"AllocId": flow.AllocId,
+	}).Trace("freeing-alloc-id-via-flow")
+
+	// NOTE look at the freeGemPortId implementation for comments and context
+	for ponId, ponValues := range o.AllocIDs {
+		for onuId, onuValues := range ponValues {
+			for uniId, uniValues := range onuValues {
+				for allocId, flows := range uniValues {
+					for flowId := range flows {
+						// if the flow matches, remove it from the map.
+						if flow.FlowId == flowId {
+							delete(o.AllocIDs[ponId][onuId][uniId][allocId], flow.FlowId)
+						}
+						// if that was the last flow for a particular allocId, remove the entire allocId
+						if len(o.AllocIDs[ponId][onuId][uniId][allocId]) == 0 {
+							delete(o.AllocIDs[ponId][onuId][uniId], allocId)
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+func (o *OltDevice) storeGemPortId(flow *openolt.Flow) {
+	o.GemPortIDsLock.Lock()
+	defer o.GemPortIDsLock.Unlock()
+	// record flow.FlowId under [pon][onu][portNo][gemPortId], creating the two innermost map levels on demand
+	oltLogger.WithFields(log.Fields{
+		"IntfId":    flow.AccessIntfId,
+		"OnuId":     flow.OnuId,
+		"PortNo":    flow.PortNo,
+		"GemportId": flow.GemportId,
+	}).Trace("storing-gem-port-id-via-flow")
+	perUni := o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)]
+	if _, found := perUni[flow.PortNo]; !found {
+		perUni[flow.PortNo] = make(map[int32]map[uint64]bool)
+	}
+	if _, found := perUni[flow.PortNo][flow.GemportId]; !found {
+		perUni[flow.PortNo][flow.GemportId] = make(map[uint64]bool)
+	}
+	perUni[flow.PortNo][flow.GemportId][flow.FlowId] = true
+}
+
+func (o *OltDevice) freeGemPortId(flow *openolt.Flow) {
+	// drop flow.FlowId from every GemPort referencing it; if this is the last flow referencing the GemPort then remove it
+	o.GemPortIDsLock.Lock()
+	defer o.GemPortIDsLock.Unlock()
+
+	oltLogger.WithFields(log.Fields{
+		"IntfId":    flow.AccessIntfId,
+		"OnuId":     flow.OnuId,
+		"PortNo":    flow.PortNo,
+		"GemportId": flow.GemportId,
+	}).Trace("freeing-gem-port-id-via-flow")
+
+	// NOTE that this loop is not very performant: it scans every PON/ONU/UNI/GemPort looking for the flow ID.
+	// It would be better if the flow carried the same information that it carries during a FlowAdd,
+	// in which case we could directly remove items from the map, as in the snippet below:
+
+	//delete(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId], flow.FlowId)
+	//if len(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId]) == 0 {
+	//	delete(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo], flow.GemportId)
+	//}
+
+	// NOTE this loop assumes that flow IDs are unique per device
+	for ponId, ponValues := range o.GemPortIDs {
+		for onuId, onuValues := range ponValues {
+			for uniId, uniValues := range onuValues {
+				for gemId, flows := range uniValues {
+					for flowId := range flows {
+						// if the flow matches, remove it from the map.
+						if flow.FlowId == flowId {
+							delete(o.GemPortIDs[ponId][onuId][uniId][gemId], flow.FlowId)
+						}
+						// if that was the last flow for a particular gem, remove the entire gem
+						if len(o.GemPortIDs[ponId][onuId][uniId][gemId]) == 0 {
+							delete(o.GemPortIDs[ponId][onuId][uniId], gemId)
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+// validateFlow returns an error if, on the PON referenced by the flow:
+// - the AllocId is already used in a flow referencing a different ONU
+// - the GemPortId is already used in a flow referencing a different ONU
+func (o *OltDevice) validateFlow(flow *openolt.Flow) error {
+
+	// validate gemPort; the read lock is held for the whole scan as the nested maps are mutated by store/freeGemPortId
+	o.GemPortIDsLock.RLock()
+	defer o.GemPortIDsLock.RUnlock()
+	allocatedGems := o.GemPortIDs[uint32(flow.AccessIntfId)]
+	for onuId, onu := range allocatedGems {
+		if onuId == uint32(flow.OnuId) {
+			continue
+		}
+		for uniId, uni := range onu {
+			for gem := range uni {
+				if gem == flow.GemportId {
+					return fmt.Errorf("gem-%d-already-in-use-on-uni-%d-onu-%d", gem, uniId, onuId)
+				}
+			}
+		}
+	}
+	// validate allocId; the read lock is held for the whole scan as the nested maps are mutated by store/freeAllocId
+	o.AllocIDsLock.RLock()
+	defer o.AllocIDsLock.RUnlock()
+	allocatedAllocIds := o.AllocIDs[uint32(flow.AccessIntfId)]
+	for onuId, onu := range allocatedAllocIds {
+		if onuId == uint32(flow.OnuId) {
+			continue
+		}
+		for uniId, uni := range onu {
+			for allocId := range uni {
+				if allocId == flow.AllocId {
+					return fmt.Errorf("allocId-%d-already-in-use-on-uni-%d-onu-%d", allocId, uniId, onuId)
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
+// clearAllResources is invoked when the OLT enters the deleted state (e.g. upon OLT reboot) to remove all the allocated
+// GemPorts, AllocIds and ONU-IDs across the PONs
+func (o *OltDevice) clearAllResources() {
+
+	// remove the resources received via flows (the maps are replaced with empty ones, so the per-PON entries are dropped as well)
+	o.GemPortIDsLock.Lock()
+	o.GemPortIDs = make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool)
+	o.GemPortIDsLock.Unlock()
+	o.AllocIDsLock.Lock()
+	o.AllocIDs = make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool)
+	o.AllocIDsLock.Unlock()
+
+	// remove the resources received via OMCI
+	for _, pon := range o.Pons {
+		pon.removeAllAllocIds()
+		pon.removeAllGemPorts()
+		pon.removeAllOnuIds()
+	}
+}
diff --git a/internal/bbsim/devices/olt_test.go b/internal/bbsim/devices/olt_test.go
index 154942b..acbe7a7 100644
--- a/internal/bbsim/devices/olt_test.go
+++ b/internal/bbsim/devices/olt_test.go
@@ -28,15 +28,27 @@
 
 func createMockOlt(numPon int, numOnu int, services []ServiceIf) *OltDevice {
 	olt := &OltDevice{
-		ID: 0,
+		ID:         0,
+		AllocIDs:   make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
+		GemPortIDs: make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
 	}
 
 	for i := 0; i < numPon; i++ {
+
+		// initialize the resource maps for every PON Ports
+		olt.AllocIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+		olt.GemPortIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+
 		pon := PonPort{
 			ID: uint32(i),
 		}
 
 		for j := 0; j < numOnu; j++ {
+
+			// initialize the resource maps for every ONU and the first UNI
+			olt.AllocIDs[uint32(i)][uint32(j)] = make(map[uint32]map[int32]map[uint64]bool)
+			olt.GemPortIDs[uint32(i)][uint32(j)] = make(map[uint32]map[int32]map[uint64]bool)
+
 			onuId := uint32(i + j)
 			onu := Onu{
 				ID:        onuId,
@@ -51,7 +63,7 @@
 				onu.Services = append(onu.Services, service)
 			}
 
-			onu.SerialNumber = onu.NewSN(olt.ID, pon.ID, onu.ID)
+			onu.SerialNumber = NewSN(olt.ID, pon.ID, onu.ID)
 			pon.Onus = append(pon.Onus, &onu)
 		}
 		olt.Pons = append(olt.Pons, &pon)
@@ -227,3 +239,209 @@
 	assert.Equal(t, err, nil)
 	assert.Equal(t, found.Sn(), onu1.Sn())
 }
+
+func Test_Olt_storeGemPortId(t *testing.T) {
+
+	const (
+		pon  = 1
+		onu  = 1
+		uni  = 16
+		gem1 = 1024
+		gem2 = 1025
+	)
+
+	numPon := 2
+	numOnu := 2
+
+	olt := createMockOlt(numPon, numOnu, []ServiceIf{})
+
+	// add a first flow on the ONU
+	flow1 := &openolt.Flow{
+		AccessIntfId: pon,
+		OnuId:        onu,
+		PortNo:       uni,
+		FlowId:       1,
+		GemportId:    gem1,
+	}
+
+	olt.storeGemPortId(flow1)
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 1)       // we have 1 gem port
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 1) // and one flow referencing it
+
+	// add a second flow on the ONU (same gem)
+	flow2 := &openolt.Flow{
+		AccessIntfId: pon,
+		OnuId:        onu,
+		PortNo:       uni,
+		FlowId:       2,
+		GemportId:    gem1,
+	}
+
+	olt.storeGemPortId(flow2)
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 1)       // we have 1 gem port
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 2) // and two flows referencing it
+
+	// add a third flow on the ONU (different gem, and a flow ID of its own)
+	flow3 := &openolt.Flow{
+		AccessIntfId: pon,
+		OnuId:        onu,
+		PortNo:       uni,
+		FlowId:       3,
+		GemportId:    gem2,
+	}
+
+	olt.storeGemPortId(flow3)
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 2)       // we have 2 gem ports
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 2) // two flows referencing the first one
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem2]), 1) // and one flow referencing the second one
+}
+
+func Test_Olt_freeGemPortId(t *testing.T) {
+	const (
+		pon   = 1
+		onu   = 1
+		uni   = 16
+		gem1  = 1024
+		gem2  = 1025
+		flow1 = 1
+		flow2 = 2
+		flow3 = 3
+	)
+
+	numPon := 2
+	numOnu := 2
+
+	olt := createMockOlt(numPon, numOnu, []ServiceIf{})
+
+	olt.GemPortIDs[pon][onu][uni] = make(map[int32]map[uint64]bool)
+	olt.GemPortIDs[pon][onu][uni][gem1] = make(map[uint64]bool)
+	olt.GemPortIDs[pon][onu][uni][gem1][flow1] = true
+	olt.GemPortIDs[pon][onu][uni][gem1][flow2] = true
+	olt.GemPortIDs[pon][onu][uni][gem2] = make(map[uint64]bool)
+	olt.GemPortIDs[pon][onu][uni][gem2][flow3] = true
+
+	// remove one flow on the first gem, check that the gem is still allocated as there is still a flow referencing it
+	// NOTE that the flow remove only carries the flow ID, no other information
+	flowGem1 := &openolt.Flow{
+		FlowId: flow1,
+	}
+
+	olt.freeGemPortId(flowGem1)
+	// we still have two gems allocated on this UNI
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 2)
+
+	// gem1 should now be referenced by a single flow
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 1, "gemport-not-removed")
+
+	// the gem should still reference flow 2
+	assert.Equal(t, olt.GemPortIDs[pon][onu][uni][gem1][flow2], true)
+	// but should not reference flow1
+	_, flow1Exists := olt.GemPortIDs[pon][onu][uni][gem1][flow1]
+	assert.Equal(t, flow1Exists, false)
+
+	// this is the only flow remaining on this gem, the gem should be removed
+	flowGem2 := &openolt.Flow{
+		FlowId: flow2,
+	}
+	olt.freeGemPortId(flowGem2)
+
+	// we should now have a single gem referenced on this UNI
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 1, "gemport-not-removed")
+
+	// and it should be gem2
+	_, gem1exists := olt.GemPortIDs[pon][onu][uni][gem1]
+	assert.Equal(t, gem1exists, false)
+	_, gem2exists := olt.GemPortIDs[pon][onu][uni][gem2]
+	assert.Equal(t, gem2exists, true)
+}
+
+func Test_Olt_validateFlow(t *testing.T) {
+
+	const (
+		pon0            = 0
+		pon1            = 1
+		onu0            = 0
+		onu1            = 1
+		uniPort         = 0
+		usedGemIdPon0   = 1024
+		usedGemIdPon1   = 1025
+		usedAllocIdPon0 = 1
+		usedAllocIdPon1 = 2
+		flowId          = 1
+	)
+
+	numPon := 2
+	numOnu := 2
+
+	olt := createMockOlt(numPon, numOnu, []ServiceIf{})
+
+	olt.GemPortIDs[pon0][onu0][uniPort] = make(map[int32]map[uint64]bool)
+	olt.GemPortIDs[pon1][onu0][uniPort] = make(map[int32]map[uint64]bool)
+
+	olt.GemPortIDs[pon0][onu0][uniPort][usedGemIdPon0] = make(map[uint64]bool)
+	olt.GemPortIDs[pon0][onu0][uniPort][usedGemIdPon0][flowId] = true
+	olt.GemPortIDs[pon1][onu0][uniPort][usedGemIdPon1] = make(map[uint64]bool)
+	olt.GemPortIDs[pon1][onu0][uniPort][usedGemIdPon1][flowId] = true
+
+	olt.AllocIDs[pon0][onu0][uniPort] = make(map[int32]map[uint64]bool)
+	olt.AllocIDs[pon1][onu0][uniPort] = make(map[int32]map[uint64]bool)
+	olt.AllocIDs[pon0][onu0][uniPort][usedAllocIdPon0] = make(map[uint64]bool)
+	olt.AllocIDs[pon0][onu0][uniPort][usedAllocIdPon0][flowId] = true
+	olt.AllocIDs[pon1][onu0][uniPort][usedAllocIdPon1] = make(map[uint64]bool)
+	olt.AllocIDs[pon1][onu0][uniPort][usedAllocIdPon1][flowId] = true
+
+	// a GemPortID can be referenced across multiple flows on the same ONU
+	validGemFlow := &openolt.Flow{
+		AccessIntfId: pon0,
+		OnuId:        onu0,
+		GemportId:    usedGemIdPon0,
+	}
+
+	err := olt.validateFlow(validGemFlow)
+	assert.NilError(t, err)
+
+	// a GemPortID can NOT be referenced across different ONUs on the same PON
+	invalidGemFlow := &openolt.Flow{
+		AccessIntfId: pon0,
+		OnuId:        onu1,
+		GemportId:    usedGemIdPon0,
+	}
+	err = olt.validateFlow(invalidGemFlow)
+	assert.Error(t, err, "gem-1024-already-in-use-on-uni-0-onu-0")
+
+	// if a flow references the same GEM on a different PON it's a valid flow (despite the variable name, no error is expected)
+	invalidGemDifferentPonFlow := &openolt.Flow{
+		AccessIntfId: pon1,
+		OnuId:        onu1,
+		GemportId:    usedGemIdPon0,
+	}
+	err = olt.validateFlow(invalidGemDifferentPonFlow)
+	assert.NilError(t, err)
+
+	// an allocId can be referenced across multiple flows on the same ONU
+	validAllocFlow := &openolt.Flow{
+		AccessIntfId: pon0,
+		OnuId:        onu0,
+		AllocId:      usedAllocIdPon0,
+	}
+	err = olt.validateFlow(validAllocFlow)
+	assert.NilError(t, err)
+
+	// an allocId can NOT be referenced across different ONUs on the same PON
+	invalidAllocFlow := &openolt.Flow{
+		AccessIntfId: pon0,
+		OnuId:        onu1,
+		AllocId:      usedAllocIdPon0,
+	}
+	err = olt.validateFlow(invalidAllocFlow)
+	assert.Error(t, err, "allocId-1-already-in-use-on-uni-0-onu-0")
+
+	// if a flow references the same AllocId on a different PON it's a valid flow (despite the variable name, no error is expected)
+	invalidAllocDifferentPonFlow := &openolt.Flow{
+		AccessIntfId: pon1,
+		OnuId:        onu1,
+		AllocId:      usedAllocIdPon0,
+	}
+	err = olt.validateFlow(invalidAllocDifferentPonFlow)
+	assert.NilError(t, err)
+}
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index fd7eca0..ab7c35f 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -22,14 +22,13 @@
 	"fmt"
 	pb "github.com/opencord/bbsim/api/bbsim"
 	"github.com/opencord/bbsim/internal/bbsim/alarmsim"
-	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
-	me "github.com/opencord/omci-lib-go/generated"
-	"strconv"
-
 	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
 	"github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
 	"github.com/opencord/bbsim/internal/bbsim/responders/eapol"
+	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
+	me "github.com/opencord/omci-lib-go/generated"
 	"net"
+	"strconv"
 	"time"
 
 	"github.com/google/gopacket/layers"
@@ -102,11 +101,9 @@
 	// PortNo comes with flows and it's used when sending packetIndications,
 	// There is one PortNo per UNI Port, for now we're only storing the first one
 	// FIXME add support for multiple UNIs (each UNI has a different PortNo)
-	PortNo uint32
-	// deprecated (gemPort is on a Service basis)
-	GemPortAdded bool
-	Flows        []FlowKey
-	FlowIds      []uint64 // keep track of the flows we currently have in the ONU
+	PortNo  uint32
+	Flows   []FlowKey
+	FlowIds []uint64 // keep track of the flows we currently have in the ONU
 
 	OperState    *fsm.FSM
 	SerialNumber *openolt.SerialNumber
@@ -153,7 +150,7 @@
 		ActiveImageEntityId:           0, // when we start the SoftwareImage with ID 0 is active and committed
 		CommittedImageEntityId:        0,
 	}
-	o.SerialNumber = o.NewSN(olt.ID, pon.ID, id)
+	o.SerialNumber = NewSN(olt.ID, pon.ID, id)
 	// NOTE this state machine is used to track the operational
 	// state as requested by VOLTHA
 	o.OperState = getOperStateFSM(func(e *fsm.Event) {
@@ -217,6 +214,18 @@
 				o.Channel <- msg
 			},
 			"enter_enabled": func(event *fsm.Event) {
+
+				if used, sn := o.PonPort.isOnuIdAllocated(o.ID); used {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"SerialNumber": o.Sn(),
+					}).Errorf("received-omci-with-sn-%s", common.OnuSnToString(sn))
+					return
+				} else {
+					o.PonPort.storeOnuId(o.ID, o.SerialNumber)
+				}
+
 				msg := bbsim.Message{
 					Type: bbsim.OnuIndication,
 					Data: bbsim.OnuIndicationMessage{
@@ -235,9 +244,11 @@
 			"enter_disabled": func(event *fsm.Event) {
 
 				// clean the ONU state
-				o.GemPortAdded = false
 				o.PortNo = 0
 				o.Flows = []FlowKey{}
+				o.PonPort.removeOnuId(o.ID)
+				o.PonPort.removeAllocId(o.SerialNumber)
+				o.PonPort.removeGemPortBySn(o.SerialNumber)
 
 				// set the OperState to disabled
 				if err := o.OperState.Event("disable"); err != nil {
@@ -438,7 +449,7 @@
 	}).Debug("Stopped handling ONU Indication Channel")
 }
 
-func (o Onu) NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
+func NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
 
 	sn := new(openolt.SerialNumber)
 
@@ -477,10 +488,8 @@
 }
 
 func (o *Onu) sendOnuIndication(msg bbsim.OnuIndicationMessage, stream openolt.Openolt_EnableIndicationServer) {
-	// NOTE voltha returns an ID, but if we use that ID then it complains:
-	// expected_onu_id: 1, received_onu_id: 1024, event: ONU-id-mismatch, can happen if both voltha and the olt rebooted
-	// so we're using the internal ID that is 1
-	// o.ID = msg.OnuID
+	// NOTE the ONU ID is set by VOLTHA in the ActivateOnu call (via openolt.proto)
+	// and stored in the Onu struct via onu.SetID
 
 	indData := &openolt.Indication_OnuInd{OnuInd: &openolt.OnuIndication{
 		IntfId:       o.PonPortID,
@@ -495,11 +504,12 @@
 		return
 	}
 	onuLogger.WithFields(log.Fields{
-		"IntfId":     o.PonPortID,
-		"OnuId":      o.ID,
-		"OperState":  msg.OperState.String(),
-		"AdminState": msg.OperState.String(),
-		"OnuSn":      o.Sn(),
+		"IntfId":      o.PonPortID,
+		"OnuId":       o.ID,
+		"VolthaOnuId": msg.OnuID,
+		"OperState":   msg.OperState.String(),
+		"AdminState":  msg.OperState.String(),
+		"OnuSn":       o.Sn(),
 	}).Debug("Sent Indication_OnuInd")
 
 }
@@ -707,10 +717,7 @@
 	case omci.GetRequestType:
 		responsePkt, _ = omcilib.CreateGetResponse(omciPkt, omciMsg, o.SerialNumber, o.MibDataSync, o.ActiveImageEntityId, o.CommittedImageEntityId)
 	case omci.SetRequestType:
-		if responsePkt, errResp = omcilib.CreateSetResponse(omciPkt, omciMsg); errResp == nil {
-			o.MibDataSync++
-		}
-
+		success := true
 		msgObj, _ := omcilib.ParseSetRequest(omciPkt)
 		switch msgObj.EntityClass {
 		case me.PhysicalPathTerminationPointEthernetUniClassID:
@@ -734,12 +741,101 @@
 				}
 				o.Channel <- msg
 			}
+		case me.TContClassID:
+			allocId := msgObj.Attributes["AllocId"].(uint16)
+
+			// if the AllocId is 255 (0xFF) or 65535 (0xFFFF) it means we are removing it,
+			// otherwise we are adding it
+			if allocId == 255 || allocId == 65535 {
+				onuLogger.WithFields(log.Fields{
+					"IntfId":       o.PonPortID,
+					"OnuId":        o.ID,
+					"TContId":      msgObj.EntityInstance,
+					"AllocId":      allocId,
+					"SerialNumber": o.Sn(),
+				}).Trace("freeing-alloc-id-via-omci")
+				o.PonPort.removeAllocId(o.SerialNumber)
+			} else {
+				if used, sn := o.PonPort.isAllocIdAllocated(allocId); used {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"AllocId":      allocId,
+						"SerialNumber": o.Sn(),
+					}).Errorf("allocid-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(sn))
+					success = false
+				} else {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"TContId":      msgObj.EntityInstance,
+						"AllocId":      allocId,
+						"SerialNumber": o.Sn(),
+					}).Trace("storing-alloc-id-via-omci")
+					o.PonPort.storeAllocId(allocId, o.SerialNumber)
+				}
+			}
+
+		}
+
+		if success {
+			if responsePkt, errResp = omcilib.CreateSetResponse(omciPkt, omciMsg, me.Success); errResp == nil {
+				o.MibDataSync++
+			}
+		} else {
+			responsePkt, _ = omcilib.CreateSetResponse(omciPkt, omciMsg, me.AttributeFailure)
 		}
 	case omci.CreateRequestType:
-		if responsePkt, errResp = omcilib.CreateCreateResponse(omciPkt, omciMsg); errResp == nil {
-			o.MibDataSync++
+		// check for GemPortNetworkCtp and make sure there are no duplicates on the same PON
+		var used bool
+		var sn *openolt.SerialNumber
+		msgObj, err := omcilib.ParseCreateRequest(omciPkt)
+		if err == nil {
+			if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
+				if used, sn = o.PonPort.isGemPortAllocated(msgObj.EntityInstance); used {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"GemPortId":    msgObj.EntityInstance,
+						"SerialNumber": o.Sn(),
+					}).Errorf("gemport-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(sn))
+				} else {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"GemPortId":    msgObj.EntityInstance,
+						"SerialNumber": o.Sn(),
+					}).Trace("storing-gem-port-id-via-omci")
+					o.PonPort.storeGemPort(msgObj.EntityInstance, o.SerialNumber)
+				}
+			}
+		}
+
+		// if the gemPort is valid then increment the MDS and return a successful response
+		// otherwise fail the request
+		// for now the CreateRequest for the gemPort is the only one that can fail, if we start supporting multiple
+		// validations this check will need to be rewritten
+		if !used {
+			if responsePkt, errResp = omcilib.CreateCreateResponse(omciPkt, omciMsg, me.Success); errResp == nil {
+				o.MibDataSync++
+			}
+		} else {
+			responsePkt, _ = omcilib.CreateCreateResponse(omciPkt, omciMsg, me.ProcessingError)
 		}
 	case omci.DeleteRequestType:
+		msgObj, err := omcilib.ParseDeleteRequest(omciPkt)
+		if err == nil {
+			if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
+				onuLogger.WithFields(log.Fields{
+					"IntfId":       o.PonPortID,
+					"OnuId":        o.ID,
+					"GemPortId":    msgObj.EntityInstance,
+					"SerialNumber": o.Sn(),
+				}).Trace("freeing-gem-port-id-via-omci")
+				o.PonPort.removeGemPort(msgObj.EntityInstance)
+			}
+		}
+
 		if responsePkt, errResp = omcilib.CreateDeleteResponse(omciPkt, omciMsg); errResp == nil {
 			o.MibDataSync++
 		}
@@ -1037,6 +1133,7 @@
 
 func (o *Onu) handleFlowAdd(msg bbsim.OnuFlowUpdateMessage) {
 	onuLogger.WithFields(log.Fields{
+		"AllocId":           msg.Flow.AllocId,
 		"Cookie":            msg.Flow.Cookie,
 		"DstPort":           msg.Flow.Classifier.DstPort,
 		"FlowId":            msg.Flow.FlowId,
@@ -1123,7 +1220,6 @@
 			"OnuId":        o.ID,
 			"SerialNumber": o.Sn(),
 		}).Info("Resetting GemPort")
-		o.GemPortAdded = false
 
 		// check if ONU delete is performed and
 		// terminate the ONU's ProcessOnuMessages Go routine
@@ -1245,25 +1341,7 @@
 
 		if o.seqNumber > 290 {
 			// NOTE we are done with the MIB Upload (290 is the number of messages the omci-sim library will respond to)
-			galEnet, _ := omcilib.CreateGalEnetRequest(o.getNextTid(false))
-			sendOmciMsg(galEnet, o.PonPortID, o.ID, o.SerialNumber, "CreateGalEnetRequest", client)
-		} else {
-			mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
-			sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
-		}
-	case omci.CreateResponseType:
-		// NOTE Creating a GemPort,
-		// BBsim actually doesn't care about the values, so we can do we want with the parameters
-		// In the same way we can create a GemPort even without setting up UNIs/TConts/...
-		// but we need the GemPort to trigger the state change
-
-		if !o.GemPortAdded {
-			// NOTE this sends a CreateRequestType and BBSim replies with a CreateResponseType
-			// thus we send this request only once
-			gemReq, _ := omcilib.CreateGemPortRequest(o.getNextTid(false))
-			sendOmciMsg(gemReq, o.PonPortID, o.ID, o.SerialNumber, "CreateGemPortRequest", client)
-			o.GemPortAdded = true
-		} else {
+			// start sending the flows, we don't care about the OMCI setup in BBR, just that a lot of messages can go through
 			if err := o.InternalState.Event(BbrOnuTxSendEapolFlow); err != nil {
 				onuLogger.WithFields(log.Fields{
 					"OnuId":  o.ID,
@@ -1271,6 +1349,9 @@
 					"OnuSn":  o.Sn(),
 				}).Errorf("Error while transitioning ONU State %v", err)
 			}
+		} else {
+			mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
+			sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
 		}
 	}
 }
@@ -1290,14 +1371,16 @@
 		UniId:         int32(0), // NOTE do not hardcode this, we need to support multiple UNIs
 		FlowId:        uint64(o.ID),
 		FlowType:      "downstream",
-		AllocId:       int32(0),
 		NetworkIntfId: int32(0),
-		GemportId:     int32(1), // FIXME use the same value as CreateGemPortRequest PortID, do not hardcode
 		Classifier:    &classifierProto,
 		Action:        &actionProto,
 		Priority:      int32(100),
 		Cookie:        uint64(o.ID),
-		PortNo:        uint32(o.ID), // NOTE we are using this to map an incoming packetIndication to an ONU
+		PortNo:        o.ID, // NOTE we are using this to map an incoming packetIndication to an ONU
+		// AllocId and GemPorts need to be unique per PON
+		// for now use the ONU-ID, will need to change once we support multiple UNIs
+		AllocId:   int32(o.ID),
+		GemportId: int32(o.ID),
 	}
 
 	if _, err := client.FlowAdd(context.Background(), &downstreamFlow); err != nil {
@@ -1307,6 +1390,7 @@
 			"FlowId":       downstreamFlow.FlowId,
 			"PortNo":       downstreamFlow.PortNo,
 			"SerialNumber": common.OnuSnToString(o.SerialNumber),
+			"Err":          err,
 		}).Fatalf("Failed to add EAPOL Flow")
 	}
 	log.WithFields(log.Fields{
@@ -1338,14 +1422,16 @@
 		UniId:         int32(0), // FIXME do not hardcode this
 		FlowId:        uint64(o.ID),
 		FlowType:      "downstream",
-		AllocId:       int32(0),
 		NetworkIntfId: int32(0),
-		GemportId:     int32(1), // FIXME use the same value as CreateGemPortRequest PortID, do not hardcode
 		Classifier:    &classifierProto,
 		Action:        &actionProto,
 		Priority:      int32(100),
 		Cookie:        uint64(o.ID),
-		PortNo:        uint32(o.ID), // NOTE we are using this to map an incoming packetIndication to an ONU
+		PortNo:        o.ID, // NOTE we are using this to map an incoming packetIndication to an ONU
+		// AllocId and GemPorts need to be unique per PON
+		// for now use the ONU-ID, will need to change once we support multiple UNIs
+		AllocId:   int32(o.ID),
+		GemportId: int32(o.ID),
 	}
 
 	if _, err := client.FlowAdd(context.Background(), &downstreamFlow); err != nil {
@@ -1355,6 +1441,7 @@
 			"FlowId":       downstreamFlow.FlowId,
 			"PortNo":       downstreamFlow.PortNo,
 			"SerialNumber": common.OnuSnToString(o.SerialNumber),
+			"Err":          err,
 		}).Fatalf("Failed to send DHCP Flow")
 	}
 	log.WithFields(log.Fields{
diff --git a/internal/bbsim/devices/onu_flow_test.go b/internal/bbsim/devices/onu_flow_test.go
index e0ad9fe..b116843 100644
--- a/internal/bbsim/devices/onu_flow_test.go
+++ b/internal/bbsim/devices/onu_flow_test.go
@@ -100,8 +100,6 @@
 		fsm.Callbacks{},
 	)
 
-	onu.GemPortAdded = true
-
 	onu.FlowIds = []uint64{64}
 
 	flow := openolt.Flow{
@@ -115,7 +113,6 @@
 	}
 	onu.handleFlowRemove(msg)
 	assert.Equal(t, len(onu.FlowIds), 0)
-	assert.Equal(t, onu.GemPortAdded, false)
 }
 
 func TestOnu_HhandleEAPOLStart(t *testing.T) {
@@ -429,8 +426,6 @@
 	// one we received with the EAPOL flow
 	onu := createMockOnu(1, 1)
 
-	onu.GemPortAdded = false
-
 	onu.InternalState = fsm.NewFSM(
 		"enabled",
 		fsm.Events{
diff --git a/internal/bbsim/devices/onu_omci_test.go b/internal/bbsim/devices/onu_omci_test.go
index 4b9cc26..4d706b9 100644
--- a/internal/bbsim/devices/onu_omci_test.go
+++ b/internal/bbsim/devices/onu_omci_test.go
@@ -17,6 +17,7 @@
 package devices
 
 import (
+	"github.com/google/gopacket"
 	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
 	omcilib "github.com/opencord/bbsim/internal/common/omci"
 	"github.com/opencord/omci-lib-go"
@@ -27,7 +28,6 @@
 )
 
 var mockAttr = me.AttributeValueMap{
-	"ManagedEntityId":                     12,
 	"PortId":                              0,
 	"TContPointer":                        0,
 	"Direction":                           0,
@@ -46,6 +46,7 @@
 		},
 		Attributes: mockAttr,
 	}
+
 	omciPkt, err := omcilib.Serialize(omci.CreateRequestType, omciReq, 66)
 	if err != nil {
 		t.Fatal(err.Error())
@@ -119,6 +120,34 @@
 	}
 }
 
+func omciBytesToMsg(t *testing.T, data []byte) (*omci.OMCI, *gopacket.Packet) {
+	packet := gopacket.NewPacket(data, omci.LayerTypeOMCI, gopacket.NoCopy)
+	if packet == nil {
+		t.Fatal("could not decode rxMsg as OMCI")
+	}
+	omciLayer := packet.Layer(omci.LayerTypeOMCI)
+	if omciLayer == nil {
+		t.Fatal("could not decode omci layer")
+	}
+	omciMsg, ok := omciLayer.(*omci.OMCI)
+	if !ok {
+		t.Fatal("could not assign omci layer")
+	}
+	return omciMsg, &packet
+}
+
+func omciToCreateResponse(t *testing.T, omciPkt *gopacket.Packet) *omci.CreateResponse {
+	msgLayer := (*omciPkt).Layer(omci.LayerTypeCreateResponse)
+	if msgLayer == nil {
+		t.Fatal("omci Msg layer could not be detected for CreateResponse - handling of MibSyncChan stopped")
+	}
+	msgObj, msgOk := msgLayer.(*omci.CreateResponse)
+	if !msgOk {
+		t.Fatal("omci Msg layer could not be assigned for CreateResponse - handling of MibSyncChan stopped")
+	}
+	return msgObj
+}
+
 func Test_MibDataSyncIncrease(t *testing.T) {
 	onu := createMockOnu(1, 1)
 
@@ -174,3 +203,39 @@
 	onu.handleOmciRequest(makeOmciMessage(t, onu, makeOmciDeleteRequest(t)), stream)
 	assert.Equal(t, onu.MibDataSync, uint8(0))
 }
+
+func Test_GemPortValidation(t *testing.T) {
+
+	// setup
+	onu := createMockOnu(1, 1)
+
+	stream := &mockStream{
+		Calls: make(map[int]*openolt.Indication),
+	}
+
+	// create a gem port via OMCI (gemPortId 12)
+	onu.handleOmciRequest(makeOmciMessage(t, onu, makeOmciCreateRequest(t)), stream)
+
+	// the first time we created the gemPort
+	// the MDS should be incremented
+	assert.Equal(t, stream.CallCount, 1)
+	assert.Equal(t, onu.MibDataSync, uint8(1))
+
+	// and the OMCI response status should be me.Success
+	indication := stream.Calls[1].GetOmciInd()
+	_, omciPkt := omciBytesToMsg(t, indication.Pkt)
+	responseLayer := omciToCreateResponse(t, omciPkt)
+	assert.Equal(t, responseLayer.Result, me.Success)
+
+	// send a request to create the same gem port via OMCI (gemPortId 12)
+	onu.handleOmciRequest(makeOmciMessage(t, onu, makeOmciCreateRequest(t)), stream)
+
+	// this time the MDS should not be incremented
+	assert.Equal(t, stream.CallCount, 2)
+	assert.Equal(t, onu.MibDataSync, uint8(1))
+
+	// and the OMCI response status should be me.ProcessingError
+	_, omciPkt = omciBytesToMsg(t, stream.Calls[2].GetOmciInd().Pkt)
+	responseLayer = omciToCreateResponse(t, omciPkt)
+	assert.Equal(t, responseLayer.Result, me.ProcessingError)
+}
diff --git a/internal/bbsim/devices/onu_state_machine_test.go b/internal/bbsim/devices/onu_state_machine_test.go
index 6f62540..f0fd232 100644
--- a/internal/bbsim/devices/onu_state_machine_test.go
+++ b/internal/bbsim/devices/onu_state_machine_test.go
@@ -37,7 +37,6 @@
 	assert.Equal(t, onu.InternalState.Current(), OnuStateEnabled)
 
 	onu.PortNo = 16
-	onu.GemPortAdded = true
 	onu.Flows = []FlowKey{
 		{ID: 1, Direction: "upstream"},
 		{ID: 2, Direction: "downstream"},
@@ -46,7 +45,6 @@
 	_ = onu.InternalState.Event(OnuTxDisable)
 	assert.Equal(t, onu.InternalState.Current(), OnuStateDisabled)
 
-	assert.Equal(t, onu.GemPortAdded, false)
 	assert.Equal(t, onu.PortNo, uint32(0))
 	assert.Equal(t, len(onu.Flows), 0)
 }
@@ -95,7 +93,6 @@
 	assert.Equal(t, onu.InternalState.Current(), OnuStateEnabled)
 
 	// succeed
-	onu.GemPortAdded = true
 	_ = onu.InternalState.Event("start_auth")
 	assert.Equal(t, onu.InternalState.Current(), "auth_started")
 }
@@ -104,8 +101,6 @@
 	t.Skip("Needs to be moved in the Service struct")
 	onu := createTestOnu()
 
-	onu.GemPortAdded = true
-
 	onu.InternalState.SetState("auth_started")
 
 	assert.Equal(t, onu.InternalState.Current(), "auth_started")
@@ -168,8 +163,6 @@
 	onu.InternalState.SetState("eap_response_success_received")
 	assert.Equal(t, onu.InternalState.Current(), "eap_response_success_received")
 
-	onu.GemPortAdded = false
-
 	err := onu.InternalState.Event("start_dhcp")
 	if err == nil {
 		t.Fail()
@@ -181,7 +174,6 @@
 func Test_Onu_StateMachine_dhcp_start(t *testing.T) {
 	t.Skip("Needs to be moved in the Service struct")
 	onu := createTestOnu()
-	onu.GemPortAdded = true
 
 	onu.InternalState.SetState("eap_response_success_received")
 	assert.Equal(t, onu.InternalState.Current(), "eap_response_success_received")
@@ -195,8 +187,6 @@
 	t.Skip("Needs to be moved in the Service struct")
 	onu := createTestOnu()
 
-	onu.GemPortAdded = true
-
 	onu.InternalState.SetState("dhcp_started")
 
 	assert.Equal(t, onu.InternalState.Current(), "dhcp_started")
diff --git a/internal/bbsim/devices/onu_test_helpers.go b/internal/bbsim/devices/onu_test_helpers.go
index f0d5dab..2f18199 100644
--- a/internal/bbsim/devices/onu_test_helpers.go
+++ b/internal/bbsim/devices/onu_test_helpers.go
@@ -137,15 +137,16 @@
 // this method creates a fake ONU used in the tests
 func createMockOnu(id uint32, ponPortId uint32) *Onu {
 	o := Onu{
-		ID:           id,
-		PonPortID:    ponPortId,
-		PortNo:       0,
-		GemPortAdded: true,
+		ID:        id,
+		PonPortID: ponPortId,
+		PortNo:    0,
 		PonPort: &PonPort{
-			Olt: &OltDevice{},
+			AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+			AllocatedAllocIds: make(map[uint16]*openolt.SerialNumber),
+			Olt:               &OltDevice{},
 		},
 	}
-	o.SerialNumber = o.NewSN(0, ponPortId, o.ID)
+	o.SerialNumber = NewSN(0, ponPortId, o.ID)
 	o.Channel = make(chan types.Message, 10)
 	return &o
 }
@@ -155,11 +156,10 @@
 	olt := OltDevice{
 		ID: 0,
 	}
-	pon := PonPort{
-		ID:  1,
-		Olt: &olt,
-	}
-	onu := CreateONU(&olt, &pon, 1, time.Duration(1*time.Millisecond), true)
+
+	pon := CreatePonPort(&olt, 1)
+
+	onu := CreateONU(&olt, pon, 1, time.Duration(1*time.Millisecond), true)
 	// NOTE we need this in order to create the OnuChannel
 	_ = onu.InternalState.Event(OnuTxInitialize)
 	onu.DiscoveryRetryDelay = 100 * time.Millisecond
diff --git a/internal/bbsim/devices/pon.go b/internal/bbsim/devices/pon.go
index 1e0483d..3d74eb1 100644
--- a/internal/bbsim/devices/pon.go
+++ b/internal/bbsim/devices/pon.go
@@ -19,6 +19,7 @@
 import (
 	"bytes"
 	"fmt"
+	"sync"
 
 	"github.com/looplab/fsm"
 	"github.com/opencord/voltha-protos/v4/go/openolt"
@@ -37,17 +38,30 @@
 	// PON Attributes
 	OperState *fsm.FSM
 	Type      string
+
+	// Allocated resources
+	// Some resources (eg: OnuId, AllocId and GemPorts) have to be unique per PON port
+	// we keep track of them (keyed by ID) so that we can throw an error in case we receive duplicates
+	AllocatedGemPorts     map[uint16]*openolt.SerialNumber
+	allocatedGemPortsLock sync.RWMutex
+	AllocatedOnuIds       map[uint32]*openolt.SerialNumber
+	allocatedOnuIdsLock   sync.RWMutex
+	AllocatedAllocIds     map[uint16]*openolt.SerialNumber
+	allocatedAllocIdsLock sync.RWMutex
 }
 
 // CreatePonPort creates pon port object
 func CreatePonPort(olt *OltDevice, id uint32) *PonPort {
 
 	ponPort := PonPort{
-		NumOnu: olt.NumOnuPerPon,
-		ID:     id,
-		Type:   "pon",
-		Olt:    olt,
-		Onus:   []*Onu{},
+		NumOnu:            olt.NumOnuPerPon,
+		ID:                id,
+		Type:              "pon",
+		Olt:               olt,
+		Onus:              []*Onu{},
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+		AllocatedOnuIds:   make(map[uint32]*openolt.SerialNumber),
+		AllocatedAllocIds: make(map[uint16]*openolt.SerialNumber),
 	}
 
 	ponPort.InternalState = fsm.NewFSM(
@@ -170,7 +184,7 @@
 	return &ponPort
 }
 
-func (p PonPort) GetOnuBySn(sn *openolt.SerialNumber) (*Onu, error) {
+func (p *PonPort) GetOnuBySn(sn *openolt.SerialNumber) (*Onu, error) {
 	for _, onu := range p.Onus {
 		if bytes.Equal(onu.SerialNumber.VendorSpecific, sn.VendorSpecific) {
 			return onu, nil
@@ -179,7 +193,7 @@
 	return nil, fmt.Errorf("Cannot find Onu with serial number %d in PonPort %d", sn, p.ID)
 }
 
-func (p PonPort) GetOnuById(id uint32) (*Onu, error) {
+func (p *PonPort) GetOnuById(id uint32) (*Onu, error) {
 	for _, onu := range p.Onus {
 		if onu.ID == id {
 			return onu, nil
@@ -189,7 +203,7 @@
 }
 
 // GetNumOfActiveOnus returns number of active ONUs for PON port
-func (p PonPort) GetNumOfActiveOnus() uint32 {
+func (p *PonPort) GetNumOfActiveOnus() uint32 {
 	var count uint32 = 0
 	for _, onu := range p.Onus {
 		if onu.InternalState.Current() == OnuStateInitialized || onu.InternalState.Current() == OnuStateCreated || onu.InternalState.Current() == OnuStateDisabled {
@@ -199,3 +213,111 @@
 	}
 	return count
 }
+
+// storeOnuId adds the Id to the ONU Ids already allocated to this PON port
+func (p *PonPort) storeOnuId(onuId uint32, onuSn *openolt.SerialNumber) {
+	p.allocatedOnuIdsLock.Lock()
+	defer p.allocatedOnuIdsLock.Unlock()
+	p.AllocatedOnuIds[onuId] = onuSn
+}
+
+// removeOnuId removes the OnuId from the allocated resources
+func (p *PonPort) removeOnuId(onuId uint32) {
+	p.allocatedOnuIdsLock.Lock()
+	defer p.allocatedOnuIdsLock.Unlock()
+	delete(p.AllocatedOnuIds, onuId)
+}
+
+func (p *PonPort) removeAllOnuIds() {
+	p.allocatedOnuIdsLock.Lock()
+	defer p.allocatedOnuIdsLock.Unlock()
+	p.AllocatedOnuIds = make(map[uint32]*openolt.SerialNumber)
+}
+
+// isOnuIdAllocated returns whether this OnuId is already in use on this PON
+func (p *PonPort) isOnuIdAllocated(onuId uint32) (bool, *openolt.SerialNumber) {
+	p.allocatedOnuIdsLock.RLock()
+	defer p.allocatedOnuIdsLock.RUnlock()
+
+	if _, ok := p.AllocatedOnuIds[onuId]; ok {
+		return true, p.AllocatedOnuIds[onuId]
+	}
+	return false, nil
+}
+
+// storeGemPort adds the gemPortId to the gemports already allocated to this PON port
+func (p *PonPort) storeGemPort(gemPortId uint16, onuSn *openolt.SerialNumber) {
+	p.allocatedGemPortsLock.Lock()
+	defer p.allocatedGemPortsLock.Unlock()
+	p.AllocatedGemPorts[gemPortId] = onuSn
+}
+
+// removeGemPort removes the gemPortId from the allocated resources
+func (p *PonPort) removeGemPort(gemPortId uint16) {
+	p.allocatedGemPortsLock.Lock()
+	defer p.allocatedGemPortsLock.Unlock()
+	delete(p.AllocatedGemPorts, gemPortId)
+}
+
+func (p *PonPort) removeGemPortBySn(onuSn *openolt.SerialNumber) {
+	p.allocatedGemPortsLock.Lock()
+	defer p.allocatedGemPortsLock.Unlock()
+	for gemPort, sn := range p.AllocatedGemPorts {
+		if sn == onuSn {
+			delete(p.AllocatedGemPorts, gemPort)
+		}
+	}
+}
+
+func (p *PonPort) removeAllGemPorts() {
+	p.allocatedGemPortsLock.Lock()
+	defer p.allocatedGemPortsLock.Unlock()
+	p.AllocatedGemPorts = make(map[uint16]*openolt.SerialNumber)
+}
+
+// isGemPortAllocated returns whether this gemPort is already in use on this PON
+func (p *PonPort) isGemPortAllocated(gemPortId uint16) (bool, *openolt.SerialNumber) {
+	p.allocatedGemPortsLock.RLock()
+	defer p.allocatedGemPortsLock.RUnlock()
+
+	if _, ok := p.AllocatedGemPorts[gemPortId]; ok {
+		return true, p.AllocatedGemPorts[gemPortId]
+	}
+	return false, nil
+}
+
+// storeAllocId adds the AllocId to the AllocIds already allocated to this PON port
+func (p *PonPort) storeAllocId(allocId uint16, onuSn *openolt.SerialNumber) {
+	p.allocatedAllocIdsLock.Lock()
+	defer p.allocatedAllocIdsLock.Unlock()
+	p.AllocatedAllocIds[allocId] = onuSn
+}
+
+// removeAllocId removes the AllocId from the allocated resources
+// this is done via SN as the AllocId is not removed but set to a default value
+func (p *PonPort) removeAllocId(onuSn *openolt.SerialNumber) {
+	p.allocatedAllocIdsLock.Lock()
+	defer p.allocatedAllocIdsLock.Unlock()
+	for allocId, sn := range p.AllocatedAllocIds {
+		if sn == onuSn {
+			delete(p.AllocatedAllocIds, allocId)
+		}
+	}
+}
+
+func (p *PonPort) removeAllAllocIds() {
+	p.allocatedAllocIdsLock.Lock()
+	defer p.allocatedAllocIdsLock.Unlock()
+	p.AllocatedAllocIds = make(map[uint16]*openolt.SerialNumber)
+}
+
+// isAllocIdAllocated returns whether this AllocId is already in use on this PON
+func (p *PonPort) isAllocIdAllocated(allocId uint16) (bool, *openolt.SerialNumber) {
+	p.allocatedAllocIdsLock.RLock()
+	defer p.allocatedAllocIdsLock.RUnlock()
+
+	if _, ok := p.AllocatedAllocIds[allocId]; ok {
+		return true, p.AllocatedAllocIds[allocId]
+	}
+	return false, nil
+}
diff --git a/internal/bbsim/devices/pon_test.go b/internal/bbsim/devices/pon_test.go
new file mode 100644
index 0000000..45e1861
--- /dev/null
+++ b/internal/bbsim/devices/pon_test.go
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package devices
+
+import (
+	"github.com/opencord/voltha-protos/v4/go/openolt"
+	"github.com/stretchr/testify/assert"
+	"sync"
+	"testing"
+)
+
+var sn1 = NewSN(0, 0, 1)
+var sn2 = NewSN(0, 0, 2)
+var sn3 = NewSN(0, 0, 3)
+
+// NOTE that we are using a benchmark test to actually test concurrency
+func Benchmark_storeGemPort(b *testing.B) {
+	pon := PonPort{
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(3)
+
+	// concurrently add multiple ports
+	go func(wg *sync.WaitGroup) { pon.storeGemPort(1, sn1); wg.Done() }(&wg)
+	go func(wg *sync.WaitGroup) { pon.storeGemPort(2, sn2); wg.Done() }(&wg)
+	go func(wg *sync.WaitGroup) { pon.storeGemPort(3, sn3); wg.Done() }(&wg)
+
+	wg.Wait()
+
+	assert.Equal(b, len(pon.AllocatedGemPorts), 3)
+}
+
+func Benchmark_removeGemPort(b *testing.B) {
+	pon := PonPort{
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	pon.storeGemPort(1, sn1)
+	pon.storeGemPort(2, sn2)
+	pon.storeGemPort(3, sn3)
+
+	assert.Equal(b, len(pon.AllocatedGemPorts), 3)
+
+	wg := sync.WaitGroup{}
+	wg.Add(3)
+
+	// concurrently remove multiple ports
+	go func(wg *sync.WaitGroup) { pon.removeGemPort(1); wg.Done() }(&wg)
+	go func(wg *sync.WaitGroup) { pon.removeGemPort(2); wg.Done() }(&wg)
+	go func(wg *sync.WaitGroup) { pon.removeGemPort(3); wg.Done() }(&wg)
+
+	wg.Wait()
+
+	assert.Equal(b, len(pon.AllocatedGemPorts), 0)
+}
+
+func Test_removeGemPort(t *testing.T) {
+	pon := &PonPort{
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	pon.storeGemPort(1, sn1)
+	pon.storeGemPort(2, sn2)
+	assert.Equal(t, len(pon.AllocatedGemPorts), 2)
+
+	// remove a non-existing gemPort
+	pon.removeGemPort(3)
+	assert.Equal(t, len(pon.AllocatedGemPorts), 2)
+
+	// remove an existing gemPort
+	pon.removeGemPort(1)
+	assert.Equal(t, len(pon.AllocatedGemPorts), 1)
+
+}
+
+func Test_removeGemPortBySn(t *testing.T) {
+	pon := &PonPort{
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	pon.storeGemPort(1, sn1)
+	pon.storeGemPort(2, sn2)
+	assert.Equal(t, len(pon.AllocatedGemPorts), 2)
+
+	// remove the gemPort allocated to sn1
+	pon.removeGemPortBySn(sn1)
+	assert.Equal(t, len(pon.AllocatedGemPorts), 1)
+	assert.Nil(t, pon.AllocatedGemPorts[1])
+	assert.Equal(t, pon.AllocatedGemPorts[2], sn2)
+}
+
+func Test_isGemPortAllocated(t *testing.T) {
+	pon := &PonPort{
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	pon.storeGemPort(1, sn1)
+
+	assert.Equal(t, len(pon.AllocatedGemPorts), 1)
+
+	free, sn := pon.isGemPortAllocated(1)
+
+	assert.Equal(t, free, true)
+	assert.Equal(t, sn, sn1)
+
+	used, sn_ := pon.isGemPortAllocated(2)
+
+	assert.Equal(t, used, false)
+	assert.Nil(t, sn_)
+}
+
+// the allocId is never removed, is always set to either 255 or 65535
+func Test_removeAllocId(t *testing.T) {
+
+	const (
+		allocId1 = 1024
+		allocId2 = 1025
+	)
+
+	pon := &PonPort{
+		AllocatedAllocIds: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	pon.AllocatedAllocIds[allocId1] = sn1
+	pon.AllocatedAllocIds[allocId2] = sn2
+
+	assert.Equal(t, len(pon.AllocatedAllocIds), 2)
+
+	pon.removeAllocId(sn1)
+
+	assert.Equal(t, len(pon.AllocatedAllocIds), 1)
+	assert.Nil(t, pon.AllocatedAllocIds[allocId1])
+	assert.Equal(t, pon.AllocatedAllocIds[allocId2], sn2)
+
+}
diff --git a/internal/bbsim/responders/dhcp/dhcp_test.go b/internal/bbsim/responders/dhcp/dhcp_test.go
index 7b598a0..3334e82 100644
--- a/internal/bbsim/responders/dhcp/dhcp_test.go
+++ b/internal/bbsim/responders/dhcp/dhcp_test.go
@@ -18,7 +18,6 @@
 
 import (
 	"errors"
-	"fmt"
 	"net"
 	"testing"
 
@@ -68,10 +67,6 @@
 	xid2 := macAddressToTxId(mac2)
 	xid3 := macAddressToTxId(mac3)
 
-	fmt.Println(xid1)
-	fmt.Println(xid2)
-	fmt.Println(xid3)
-
 	assert.NotEqual(t, xid1, xid2)
 	assert.NotEqual(t, xid1, xid3)
 	assert.NotEqual(t, xid2, xid3)
diff --git a/internal/bbsim/responders/sadis/sadis_test.go b/internal/bbsim/responders/sadis/sadis_test.go
index 76facb5..a3ca6df 100644
--- a/internal/bbsim/responders/sadis/sadis_test.go
+++ b/internal/bbsim/responders/sadis/sadis_test.go
@@ -38,7 +38,7 @@
 
 	mac := net.HardwareAddr{0x2e, 0x60, 0x01, byte(1), byte(1), byte(0)}
 
-	onu.SerialNumber = onu.NewSN(0, onu.PonPortID, onu.ID)
+	onu.SerialNumber = devices.NewSN(0, onu.PonPortID, onu.ID)
 	onu.Services = []devices.ServiceIf{
 		&devices.Service{Name: "hsia", CTag: 923, STag: 900, NeedsEapol: true, NeedsDhcp: true, NeedsIgmp: true, HwAddress: mac, TechnologyProfileID: 64},
 	}
diff --git a/internal/bbsimctl/commands/helpers.go b/internal/bbsimctl/commands/helpers.go
new file mode 100644
index 0000000..f2a9ee1
--- /dev/null
+++ b/internal/bbsimctl/commands/helpers.go
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package commands
+
+import (
+	pb "github.com/opencord/bbsim/api/bbsim"
+	"github.com/opencord/bbsim/internal/bbsimctl/config"
+	log "github.com/sirupsen/logrus"
+	"google.golang.org/grpc"
+)
+
+func connect() (pb.BBSimClient, *grpc.ClientConn) {
+	conn, err := grpc.Dial(config.GlobalConfig.Server, grpc.WithInsecure())
+
+	if err != nil {
+		log.Fatalf("did not connect: %v", err)
+		return nil, conn
+	}
+	return pb.NewBBSimClient(conn), conn
+}
diff --git a/internal/bbsimctl/commands/olt.go b/internal/bbsimctl/commands/olt.go
index bed3733..1789c86 100644
--- a/internal/bbsimctl/commands/olt.go
+++ b/internal/bbsimctl/commands/olt.go
@@ -22,6 +22,7 @@
 	"fmt"
 	"os"
 	"strconv"
+	"strings"
 
 	"github.com/jessevdk/go-flags"
 	"github.com/olekukonko/tablewriter"
@@ -29,12 +30,13 @@
 	"github.com/opencord/bbsim/internal/bbsimctl/config"
 	"github.com/opencord/cordctl/pkg/format"
 	log "github.com/sirupsen/logrus"
-	"google.golang.org/grpc"
 )
 
 const (
-	DEFAULT_OLT_DEVICE_HEADER_FORMAT = "table{{ .ID }}\t{{ .SerialNumber }}\t{{ .OperState }}\t{{ .InternalState }}\t{{ .IP }}"
-	DEFAULT_PORT_HEADER_FORMAT       = "table{{ .ID }}\t{{ .OperState }}"
+	DEFAULT_OLT_DEVICE_HEADER_FORMAT    = "table{{ .ID }}\t{{ .SerialNumber }}\t{{ .OperState }}\t{{ .InternalState }}\t{{ .IP }}"
+	DEFAULT_OLT_RESOURCES_HEADER_FORMAT = "table{{ .Type }}\t{{ .PonPortId }}\t{{ .OnuId }}\t{{ .PortNo }}\t{{ .ResourceId }}\t{{ .FlowId }}"
+	DEFAULT_NNI_PORT_HEADER_FORMAT      = "table{{ .ID }}\t{{ .OperState }}\t{{ .InternalState }}\t{{ .PacketCount }}"
+	DEFAULT_PON_PORT_HEADER_FORMAT      = "table{{ .ID }}\t{{ .OperState }}\t{{ .InternalState }}\t{{ .PacketCount }}\t{{ .AllocatedOnuIds }}\t{{ .AllocatedGemPorts }}\t{{ .AllocatedAllocIds }}"
 )
 
 type OltGet struct{}
@@ -64,8 +66,16 @@
 
 type OltShutdownAllOnus struct{}
 
+type oltResourcesType string
+type OltResources struct {
+	Args struct {
+		Type oltResourcesType
+	} `positional-args:"yes" required:"yes"`
+}
+
 type oltOptions struct {
 	Get             OltGet             `command:"get"`
+	GetResources    OltResources       `command:"resources"`
 	NNI             OltNNIs            `command:"nnis"`
 	PON             OltPONs            `command:"pons"`
 	Shutdown        OltShutdown        `command:"shutdown"`
@@ -85,19 +95,12 @@
 }
 
 func getOLT() *pb.Olt {
-	conn, err := grpc.Dial(config.GlobalConfig.Server, grpc.WithInsecure())
-	if err != nil {
-		log.Fatalf("did not connect: %v", err)
-		return nil
-	}
+	client, conn := connect()
 	defer conn.Close()
-	c := pb.NewBBSimClient(conn)
-
-	// Contact the server and print out its response.
 
 	ctx, cancel := context.WithTimeout(context.Background(), config.GlobalConfig.Grpc.Timeout)
 	defer cancel()
-	olt, err := c.GetOlt(ctx, &pb.Empty{})
+	olt, err := client.GetOlt(ctx, &pb.Empty{})
 	if err != nil {
 		log.Fatalf("could not get OLT: %v", err)
 		return nil
@@ -120,12 +123,34 @@
 	return nil
 }
 
+func (o *OltResources) Execute(args []string) error {
+	client, conn := connect()
+	defer conn.Close()
+
+	resourceType := pb.OltAllocatedResourceType_Type_value[string(o.Args.Type)]
+
+	ctx, cancel := context.WithTimeout(context.Background(), config.GlobalConfig.Grpc.Timeout)
+	defer cancel()
+	resources, err := client.GetOltAllocatedResources(ctx, &pb.OltAllocatedResourceType{Type: pb.OltAllocatedResourceType_Type(resourceType)})
+
+	if err != nil {
+		log.Fatalf("could not get OLT resources: %v", err)
+		return nil
+	}
+
+	tableFormat := format.Format(DEFAULT_OLT_RESOURCES_HEADER_FORMAT)
+	if err := tableFormat.Execute(os.Stdout, true, resources.Resources); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (o *OltNNIs) Execute(args []string) error {
 	olt := getOLT()
 
 	printOltHeader("NNI Ports for", olt)
 
-	tableFormat := format.Format(DEFAULT_PORT_HEADER_FORMAT)
+	tableFormat := format.Format(DEFAULT_NNI_PORT_HEADER_FORMAT)
 	_ = tableFormat.Execute(os.Stdout, true, olt.NNIPorts)
 
 	return nil
@@ -136,7 +161,7 @@
 
 	printOltHeader("PON Ports for", olt)
 
-	tableFormat := format.Format(DEFAULT_PORT_HEADER_FORMAT)
+	tableFormat := format.Format(DEFAULT_PON_PORT_HEADER_FORMAT)
 	_ = tableFormat.Execute(os.Stdout, true, olt.PONPorts)
 
 	return nil
@@ -354,3 +379,13 @@
 	fmt.Println(fmt.Sprintf("[Status: %d] %s", res.StatusCode, res.Message))
 	return nil
 }
+
+func (rt *oltResourcesType) Complete(match string) []flags.Completion {
+	list := make([]flags.Completion, 0)
+	for k := range pb.OltAllocatedResourceType_Type_value {
+		if strings.HasPrefix(k, strings.ToUpper(match)) && k != pb.OltAllocatedResourceType_UNKNOWN.String() {
+			list = append(list, flags.Completion{Item: k})
+		}
+	}
+	return list
+}
diff --git a/internal/bbsimctl/commands/onu.go b/internal/bbsimctl/commands/onu.go
index 78923e0..33bd5ef 100644
--- a/internal/bbsimctl/commands/onu.go
+++ b/internal/bbsimctl/commands/onu.go
@@ -30,7 +30,6 @@
 	"github.com/opencord/bbsim/internal/bbsimctl/config"
 	"github.com/opencord/cordctl/pkg/format"
 	log "github.com/sirupsen/logrus"
-	"google.golang.org/grpc"
 )
 
 const (
@@ -127,16 +126,6 @@
 	_, _ = parser.AddCommand("onu", "ONU Commands", "Commands to query and manipulate ONU devices", &ONUOptions{})
 }
 
-func connect() (pb.BBSimClient, *grpc.ClientConn) {
-	conn, err := grpc.Dial(config.GlobalConfig.Server, grpc.WithInsecure())
-
-	if err != nil {
-		log.Fatalf("did not connect: %v", err)
-		return nil, conn
-	}
-	return pb.NewBBSimClient(conn), conn
-}
-
 func getONUs() *pb.ONUs {
 
 	client, conn := connect()
diff --git a/internal/common/logger.go b/internal/common/logger.go
index cc6e48d..ce17bb5 100644
--- a/internal/common/logger.go
+++ b/internal/common/logger.go
@@ -21,6 +21,11 @@
 func SetLogLevel(logger *log.Logger, level string, caller bool) {
 
 	logger.SetReportCaller(caller)
+	Formatter := new(log.TextFormatter)
+	Formatter.TimestampFormat = "2006-01-02T15:04:05.999999999Z07:00"
+	Formatter.FullTimestamp = true
+	//Formatter.ForceColors = true
+	logger.SetFormatter(Formatter)
 
 	switch level {
 	case "trace":
diff --git a/internal/common/omci/create.go b/internal/common/omci/create.go
index c1e2b0b..d3c88b3 100644
--- a/internal/common/omci/create.go
+++ b/internal/common/omci/create.go
@@ -42,7 +42,7 @@
 	return msgObj, nil
 }
 
-func CreateCreateResponse(omciPkt gopacket.Packet, omciMsg *omci.OMCI) ([]byte, error) {
+func CreateCreateResponse(omciPkt gopacket.Packet, omciMsg *omci.OMCI, result me.Results) ([]byte, error) {
 
 	msgObj, err := ParseCreateRequest(omciPkt)
 
@@ -60,7 +60,7 @@
 			EntityClass:    msgObj.EntityClass,
 			EntityInstance: msgObj.EntityInstance,
 		},
-		Result: me.Success,
+		Result: result,
 	}
 
 	pkt, err := Serialize(omci.CreateResponseType, response, omciMsg.TransactionID)
@@ -78,75 +78,3 @@
 
 	return pkt, nil
 }
-
-// methods used by BBR to drive the OMCI state machine
-
-func CreateGalEnetRequest(tid uint16) ([]byte, error) {
-	params := me.ParamData{
-		EntityID:   1,
-		Attributes: me.AttributeValueMap{"MaximumGemPayloadSize": 48},
-	}
-	meDef, _ := me.NewGalEthernetProfile(params)
-	pkt, err := omci.GenFrame(meDef, omci.CreateRequestType, omci.TransactionID(tid))
-	if err != nil {
-		omciLogger.WithField("err", err).Fatalf("Can't generate GalEnetRequest")
-	}
-	return HexEncode(pkt)
-}
-
-func CreateEnableUniRequest(tid uint16, uniId uint16, enabled bool, isPtp bool) ([]byte, error) {
-
-	var _enabled uint8
-	if enabled {
-		_enabled = uint8(1)
-	} else {
-		_enabled = uint8(0)
-	}
-
-	data := me.ParamData{
-		EntityID: uniId,
-		Attributes: me.AttributeValueMap{
-			"AdministrativeState": _enabled,
-		},
-	}
-	var medef *me.ManagedEntity
-	var omciErr me.OmciErrors
-
-	if isPtp {
-		medef, omciErr = me.NewPhysicalPathTerminationPointEthernetUni(data)
-	} else {
-		medef, omciErr = me.NewVirtualEthernetInterfacePoint(data)
-	}
-	if omciErr != nil {
-		return nil, omciErr.GetError()
-	}
-	pkt, err := omci.GenFrame(medef, omci.SetRequestType, omci.TransactionID(tid))
-	if err != nil {
-		omciLogger.WithField("err", err).Fatalf("Can't generate EnableUniRequest")
-	}
-	return HexEncode(pkt)
-}
-
-func CreateGemPortRequest(tid uint16) ([]byte, error) {
-	params := me.ParamData{
-		EntityID: 1,
-		Attributes: me.AttributeValueMap{
-			"PortId":                              1,
-			"TContPointer":                        1,
-			"Direction":                           0,
-			"TrafficManagementPointerForUpstream": 0,
-			"TrafficDescriptorProfilePointerForUpstream": 0,
-			"UniCounter":                                   0,
-			"PriorityQueuePointerForDownStream":            0,
-			"EncryptionState":                              0,
-			"TrafficDescriptorProfilePointerForDownstream": 0,
-			"EncryptionKeyRing":                            0,
-		},
-	}
-	meDef, _ := me.NewGemPortNetworkCtp(params)
-	pkt, err := omci.GenFrame(meDef, omci.CreateRequestType, omci.TransactionID(tid))
-	if err != nil {
-		omciLogger.WithField("err", err).Fatalf("Can't generate GemPortRequest")
-	}
-	return HexEncode(pkt)
-}
diff --git a/internal/common/omci/create_test.go b/internal/common/omci/create_test.go
new file mode 100644
index 0000000..33d3a3e
--- /dev/null
+++ b/internal/common/omci/create_test.go
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package omci
+
+import (
+	"testing"
+
+	"github.com/google/gopacket"
+	"github.com/opencord/omci-lib-go"
+	me "github.com/opencord/omci-lib-go/generated"
+	"gotest.tools/assert"
+)
+
+// omciToCreateResponse extracts the CreateResponse layer from a decoded OMCI
+// packet. It fails the calling test immediately when the layer is missing or
+// is not a *omci.CreateResponse.
+func omciToCreateResponse(t *testing.T, omciPkt *gopacket.Packet) *omci.CreateResponse {
+	t.Helper() // report failures at the caller's line, not inside this helper
+	msgLayer := (*omciPkt).Layer(omci.LayerTypeCreateResponse)
+	if msgLayer == nil {
+		t.Fatal("omci Msg layer could not be detected for CreateResponse")
+	}
+	msgObj, msgOk := msgLayer.(*omci.CreateResponse)
+	if !msgOk {
+		t.Fatal("omci Msg layer could not be assigned for CreateResponse")
+	}
+	return msgObj
+}
+
+// createArgs holds the inputs of a TestCreateResponse case.
+type createArgs struct {
+	omciPkt []byte     // hex-encoded CreateRequest handed to the parser
+	result  me.Results // result code passed to CreateCreateResponse
+}
+
+// createWant holds the expectations of a TestCreateResponse case.
+type createWant struct {
+	result me.Results // result code expected in the decoded CreateResponse
+}
+
+// TestCreateResponse checks that CreateCreateResponse answers a GemPort
+// CreateRequest with a CreateResponse carrying the requested result code.
+func TestCreateResponse(t *testing.T) {
+	// generate a CreateRequest packet to create a GemPort
+	omciReq := &omci.CreateRequest{
+		MeBasePacket: omci.MeBasePacket{
+			EntityClass:    me.GemPortNetworkCtpClassID,
+			EntityInstance: 12,
+		},
+		Attributes: me.AttributeValueMap{
+			"PortId":                              0,
+			"TContPointer":                        0,
+			"Direction":                           0,
+			"TrafficManagementPointerForUpstream": 0,
+			"TrafficDescriptorProfilePointerForUpstream":   0,
+			"PriorityQueuePointerForDownStream":            0,
+			"TrafficDescriptorProfilePointerForDownstream": 0,
+			"EncryptionKeyRing":                            0,
+		},
+	}
+	omciPkt, err := Serialize(omci.CreateRequestType, omciReq, 66)
+	if err != nil {
+		t.Fatalf("serializing CreateRequest: %v", err)
+	}
+
+	omciPkt, err = HexEncode(omciPkt)
+	if err != nil {
+		t.Fatalf("hex-encoding CreateRequest: %v", err)
+	}
+
+	tests := []struct {
+		name string
+		args createArgs
+		want createWant
+	}{
+		{"createSuccess",
+			createArgs{omciPkt, me.Success},
+			createWant{me.Success},
+		},
+		{"createProcessingError",
+			createArgs{omciPkt, me.ProcessingError},
+			createWant{me.ProcessingError},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			pkt, msg, err := ParseOpenOltOmciPacket(tt.args.omciPkt)
+			if err != nil {
+				t.Fatalf("parsing CreateRequest: %v", err)
+			}
+			responsePkt, err := CreateCreateResponse(pkt, msg, tt.args.result)
+			if err != nil {
+				t.Fatalf("building CreateResponse: %v", err)
+			}
+
+			omciMsg, decodedPkt := omciBytesToMsg(t, responsePkt)
+
+			assert.Equal(t, omciMsg.MessageType, omci.CreateResponseType)
+
+			createResponseLayer := omciToCreateResponse(t, decodedPkt)
+
+			assert.Equal(t, createResponseLayer.Result, tt.want.result)
+		})
+	}
+}
diff --git a/internal/common/omci/get_test.go b/internal/common/omci/get_test.go
index ca580ed..d995bba 100644
--- a/internal/common/omci/get_test.go
+++ b/internal/common/omci/get_test.go
@@ -55,12 +55,12 @@
 	return msgObj
 }
 
-type args struct {
+type getArgs struct {
 	generatedPkt  *omci.GetResponse
 	transactionId uint16
 }
 
-type want struct {
+type getWant struct {
 	transactionId uint16
 	attributes    map[string]interface{}
 }
@@ -77,20 +77,20 @@
 
 	tests := []struct {
 		name string
-		args args
-		want want
+		args getArgs
+		want getWant
 	}{
 		{"getOnu2gResponse",
-			args{createOnu2gResponse(57344, 10), 1},
-			want{1, map[string]interface{}{"OpticalNetworkUnitManagementAndControlChannelOmccVersion": uint8(180)}},
+			getArgs{createOnu2gResponse(57344, 10), 1},
+			getWant{1, map[string]interface{}{"OpticalNetworkUnitManagementAndControlChannelOmccVersion": uint8(180)}},
 		},
 		{"getOnugResponse",
-			args{createOnugResponse(40960, 10, sn), 1},
-			want{1, map[string]interface{}{}},
+			getArgs{createOnugResponse(40960, 10, sn), 1},
+			getWant{1, map[string]interface{}{}},
 		},
 		{"getOnuDataResponse",
-			args{createOnuDataResponse(32768, 10, 129), 2},
-			want{2, map[string]interface{}{"MibDataSync": uint8(129)}},
+			getArgs{createOnuDataResponse(32768, 10, 129), 2},
+			getWant{2, map[string]interface{}{"MibDataSync": uint8(129)}},
 		},
 	}
 	for _, tt := range tests {
diff --git a/internal/common/omci/set.go b/internal/common/omci/set.go
index 64f048f..73e17df 100644
--- a/internal/common/omci/set.go
+++ b/internal/common/omci/set.go
@@ -40,7 +40,7 @@
 	return msgObj, nil
 }
 
-func CreateSetResponse(omciPkt gopacket.Packet, omciMsg *omci.OMCI) ([]byte, error) {
+func CreateSetResponse(omciPkt gopacket.Packet, omciMsg *omci.OMCI, result me.Results) ([]byte, error) {
 
 	msgObj, err := ParseSetRequest(omciPkt)
 
@@ -59,7 +59,7 @@
 			EntityClass:    msgObj.EntityClass,
 			EntityInstance: msgObj.EntityInstance,
 		},
-		Result: me.Success,
+		Result: result,
 	}
 
 	pkt, err := Serialize(omci.SetResponseType, response, omciMsg.TransactionID)