Packet-In/Out Support and Removed default Leaf rules as it is overriding ONU default 4091 flows
Bug fixes in pon resource manager

Change-Id: Iafbbfa7360fec3b5f3f4d591f65cbcd8c8215eec
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index e1a7e34..e2d79fc 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -417,3 +417,32 @@
 		return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
 	}
 }
+
+func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
+	log.Debugw("SendPacketIn", log.Fields{"deviceId": deviceId, "port": port, "pktPayload": pktPayload})
+	rpc := "PacketIn"
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	toTopic := ap.getCoreTopic(deviceId)
+	replyToTopic := ap.getAdapterTopic()
+
+	args := make([]*kafka.KVArg, 3)
+	id := &voltha.ID{Id: deviceId}
+	args[0] = &kafka.KVArg{
+		Key:   "device_id",
+		Value: id,
+	}
+	portNo := &ic.IntType{Val: int64(port)}
+	args[1] = &kafka.KVArg{
+		Key:   "port",
+		Value: portNo,
+	}
+	pkt := &ic.Packet{Payload: pktPayload}
+	args[2] = &kafka.KVArg{
+		Key:   "packet",
+		Value: pkt,
+	}
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+	log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": deviceId, "success": success})
+	return unPackResponse(rpc, deviceId, success, result)
+}
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index e0c7860..3ac5c4f 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -284,6 +284,45 @@
 }
 
 func (rhp *RequestHandlerProxy) Receive_packet_out(args []*ic.Argument) (*empty.Empty, error) {
+	log.Debugw("Receive_packet_out", log.Fields{"args": args})
+	if len(args) < 3 {
+		log.Warn("Receive_packet_out-invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+	deviceId := &ic.StrType{}
+	egressPort := &ic.IntType{}
+	packet := &openflow_13.OfpPacketOut{}
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "deviceId":
+			if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+				log.Warnw("cannot-unmarshal-deviceId", log.Fields{"error": err})
+				return nil, err
+			}
+		case "outPort":
+			if err := ptypes.UnmarshalAny(arg.Value, egressPort); err != nil {
+				log.Warnw("cannot-unmarshal-egressPort", log.Fields{"error": err})
+				return nil, err
+			}
+		case "packet":
+			if err := ptypes.UnmarshalAny(arg.Value, packet); err != nil {
+				log.Warnw("cannot-unmarshal-packet", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	log.Debugw("Receive_packet_out", log.Fields{"deviceId": deviceId.Val, "outPort": egressPort, "packet": packet})
+	//Invoke the adopt device on the adapter
+	if err := rhp.adapter.Receive_packet_out(deviceId.Val, int(egressPort.Val), packet); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
 	return new(empty.Empty), nil
 }
 
diff --git a/adapters/iAdapter.go b/adapters/iAdapter.go
index 017082d..05df234 100644
--- a/adapters/iAdapter.go
+++ b/adapters/iAdapter.go
@@ -38,7 +38,7 @@
 	Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error
 	Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error
 	Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error
-	Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error
+	Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error
 	Suppress_alarm(filter *voltha.AlarmFilter) error
 	Unsuppress_alarm(filter *voltha.AlarmFilter) error
 	Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error)
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index d29fc1e..dd40b27 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -227,7 +227,7 @@
 	return errors.New("UnImplemented")
 }
 
-func (so *SimulatedOLT) Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error {
+func (so *SimulatedOLT) Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error {
 	return errors.New("UnImplemented")
 }
 
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index 1bdd4bb..e9fa82e 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -219,7 +219,7 @@
 	return errors.New("UnImplemented")
 }
 
-func (so *SimulatedONU) Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error {
+func (so *SimulatedONU) Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error {
 	return errors.New("UnImplemented")
 }
 
diff --git a/common/ponresourcemanager/ponresourcemanager.go b/common/ponresourcemanager/ponresourcemanager.go
index c37307b..2873dbc 100755
--- a/common/ponresourcemanager/ponresourcemanager.go
+++ b/common/ponresourcemanager/ponresourcemanager.go
@@ -133,6 +133,7 @@
 	SharedResourceMgrs map[string]*PONResourceManager
 	SharedIdxByType    map[string]string
 	IntfIDs            []uint32 // list of pon interface IDs
+	Globalorlocal      string
 }
 
 func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
@@ -254,10 +255,14 @@
 	log.Debugf("update ranges for %s, %d", StartIDx, StartID)
 
 	if StartID != 0 {
-		PONRMgr.PonResourceRanges[StartIDx] = StartID
+		if (PONRMgr.PonResourceRanges[StartIDx] == nil) || (PONRMgr.PonResourceRanges[StartIDx].(uint32) < StartID) {
+			PONRMgr.PonResourceRanges[StartIDx] = StartID
+		}
 	}
 	if EndID != 0 {
-		PONRMgr.PonResourceRanges[EndIDx] = EndID
+		if (PONRMgr.PonResourceRanges[EndIDx] == nil) || (PONRMgr.PonResourceRanges[EndIDx].(uint32) > EndID) {
+			PONRMgr.PonResourceRanges[EndIDx] = EndID
+		}
 	}
 	//if SharedPoolID != 0 {
 	PONRMgr.PonResourceRanges[SharedIDx] = SharedPoolID
@@ -567,6 +572,7 @@
 	if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
 		return SharedResourceMgr.GetResourceID(IntfID, ResourceType, NumIDs)
 	}
+	log.Debugf("Fetching resource from %s rsrc mgr for resource %s", PONRMgr.Globalorlocal, ResourceType)
 
 	Path := PONRMgr.GetPath(IntfID, ResourceType)
 	if Path == "" {
diff --git a/common/techprofile/tech_profile.go b/common/techprofile/tech_profile.go
index 00cfdc1..2879e99 100644
--- a/common/techprofile/tech_profile.go
+++ b/common/techprofile/tech_profile.go
@@ -382,6 +382,7 @@
 		log.Errorw("Error getting alloc id from rsrcrMgr", log.Fields{"intfId": intfId, "numTconts": numOfTconts})
 		return nil
 	}
+	log.Debugw("Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
 	if gemPorts, err = t.resourceMgr.GetResourceID(intfId, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts); err != nil {
 		log.Errorw("Error getting gemport ids from rsrcrMgr", log.Fields{"intfId": intfId, "numGemports": tp.NumGemPorts})
 		return nil
@@ -593,7 +594,6 @@
 		}
 	}
 	tconts := []*openolt_pb.Tcont{}
-	// TODO: Fix me , UPSTREAM direction is not proper
 	// upstream scheduler
 	tcont_us := &openolt_pb.Tcont{
 		Direction: usSched.Direction,
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 7b2783e..1555ca5 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -1058,8 +1058,6 @@
 	for deviceId := range deviceNodeIds {
 		if deviceId == ld.RootDeviceId {
 			rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
-		} else {
-			rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
 		}
 	}
 	return rules
@@ -1110,8 +1108,6 @@
 	for deviceId := range deviceNodeIds {
 		if deviceId == agent.rootDeviceId {
 			rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
-		} else {
-			rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
 		}
 	}
 	agent.DefaultFlowRules = rules
@@ -1137,8 +1133,6 @@
 	for deviceId := range deviceNodeIds {
 		if deviceId == agent.rootDeviceId {
 			rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
-		} else {
-			rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
 		}
 	}
 	agent.DefaultFlowRules = rules
diff --git a/rw_core/utils/flow_utils.go b/rw_core/utils/flow_utils.go
index 0c485bb..c2c9287 100644
--- a/rw_core/utils/flow_utils.go
+++ b/rw_core/utils/flow_utils.go
@@ -171,9 +171,11 @@
 
 func (dr *DeviceRules) Copy() *DeviceRules {
 	copyDR := NewDeviceRules()
-	for key, val := range dr.Rules {
-		if val != nil {
-			copyDR.Rules[key] = val.Copy()
+	if dr != nil {
+		for key, val := range dr.Rules {
+			if val != nil {
+				copyDR.Rules[key] = val.Copy()
+			}
 		}
 	}
 	return copyDR