This commit cleans up the python directory to ensure that the adapters
and the CLI run properly.

Change-Id: Ic68a3ecd1f16a5af44296e3c020c808b185f4c18
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 643c9de..ab35037 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -250,6 +250,33 @@
 	return nil, nil
 }
 
+// packetOut forwards an OpenFlow packet-out message for a device to its
+// adapter.
+//
+// It invokes the "receive_packet_out" RPC on the Kafka topic named after
+// deviceType with the device id, output port and packet as key/value
+// arguments, and returns whatever unPackResponse makes of the RPC outcome.
+func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
+	log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
+	rpc := "receive_packet_out"
+	adapterTopic := kafka.Topic{Name: deviceType}
+
+	// The port is widened to int64 because that is what ca.IntType carries.
+	rpcArgs := []*kafka.KVArg{
+		{Key: "deviceId", Value: &ca.StrType{Val: deviceId}},
+		{Key: "outPort", Value: &ca.IntType{Val: int64(outPort)}},
+		{Key: "packet", Value: packet},
+	}
+
+	// TODO:  Do we need to wait for an ACK on a packet Out?
+	// NOTE(review): InvokeRPC is called with a nil first argument (presumably
+	// the context) — confirm the Kafka proxy tolerates that.
+	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &adapterTopic, false, rpcArgs...)
+	log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
+	return unPackResponse(rpc, deviceId, success, result)
+}
+
+
 func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
 	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
 	topic := kafka.Topic{Name: device.Type}
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 570b445..85e43be 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -457,3 +457,41 @@
 
 	return new(empty.Empty), nil
 }
+
+
+// PacketIn handles a packet-in request. It expects at least three arguments
+// and decodes the ones keyed "device_id", "port" and "packet"; arguments with
+// any other key are ignored. Unless the handler is in test mode, the payload
+// is passed to the device manager on a separate goroutine.
+func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ca.Argument) (*empty.Empty, error) {
+	if len(args) < 3 {
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return nil, errors.New("invalid-number-of-args")
+	}
+	deviceId, portNo, packet := &voltha.ID{}, &ca.IntType{}, &ca.Packet{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+				log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+				return nil, err
+			}
+		case "port":
+			if err := ptypes.UnmarshalAny(arg.Value, portNo); err != nil {
+				log.Warnw("cannot-unmarshal-port-no", log.Fields{"error": err})
+				return nil, err
+			}
+		case "packet":
+			if err := ptypes.UnmarshalAny(arg.Value, packet); err != nil {
+				log.Warnw("cannot-unmarshal-packet", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet})
+	if rhp.TestMode { // Execute only for test cases
+		return nil, nil
+	}
+	go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), packet.Payload) // fire and forget: the returned error is not observed here
+	return new(empty.Empty), nil
+}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index e045fc9..92f00bf 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -33,6 +33,7 @@
 
 type DeviceAgent struct {
 	deviceId         string
+	deviceType 		string
 	lastData         *voltha.Device
 	adapterProxy     *AdapterProxy
 	deviceMgr        *DeviceManager
@@ -60,6 +61,7 @@
 		cloned.Vlan = device.ProxyAddress.ChannelId
 	}
 	agent.deviceId = cloned.Id
+	agent.deviceType = cloned.Type
 	agent.lastData = cloned
 	agent.deviceMgr = deviceMgr
 	agent.exitChannel = make(chan int, 1)
@@ -368,6 +370,16 @@
 	}
 }
 
+// packetOut relays the packet to the adapter via the adapter proxy, logging and returning any error.
+func (agent *DeviceAgent) packetOut(outPort uint32, packet *ofp.OfpPacketOut) error {
+	err := agent.adapterProxy.packetOut(agent.deviceType, agent.deviceId, outPort, packet)
+	if err != nil {
+		log.Debugw("packet-out-error", log.Fields{"id": agent.lastData.Id, "error": err})
+	}
+	return err
+}
+
+
 // TODO: implement when callback from the data model is ready
 // processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
 func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index b2ab478..c4ac343 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -362,6 +362,34 @@
 	return nil
 }
 
+// packetOut hands the packet to the agent of the given device. It returns a
+// NotFound status error when the device has no agent.
+func (dMgr *DeviceManager) packetOut(deviceId string, outPort uint32, packet *ofp.OfpPacketOut) error {
+	log.Debugw("packetOut", log.Fields{"deviceId": deviceId, "outPort": outPort})
+	devAgent := dMgr.getDeviceAgent(deviceId)
+	if devAgent == nil {
+		return status.Errorf(codes.NotFound, "%s", deviceId)
+	}
+	return devAgent.packetOut(outPort, packet)
+}
+
+// PacketIn passes a packet received from the given device up to the logical
+// device manager, using the device's ParentId as the logical device id. The
+// device must exist and be a root device; otherwise an error is returned.
+func (dMgr *DeviceManager) PacketIn(deviceId string, port uint32, packet []byte) error {
+	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId, "port": port})
+	device, err := dMgr.GetDevice(deviceId)
+	if err != nil {
+		log.Errorw("device-not-found", log.Fields{"deviceId": deviceId})
+		return err
+	}
+	if !device.Root {
+		log.Errorw("device-not-root", log.Fields{"deviceId": deviceId})
+		return status.Errorf(codes.FailedPrecondition, "%s", deviceId)
+	}
+	return dMgr.logicalDeviceMgr.packetIn(device.ParentId, port, packet)
+}
+
 func (dMgr *DeviceManager) createLogicalDevice(cDevice *voltha.Device) error {
 	log.Info("createLogicalDevice")
 	var logicalId *string
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 5c9eced..4f53474 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -1023,3 +1023,19 @@
 	}
 	return nil
 }
+
+// packetOut sends the packet to the root device via the device manager and logs any failure.
+func (agent *LogicalDeviceAgent) packetOut(packet *ofp.OfpPacketOut) {
+	log.Debugw("packet-out", log.Fields{"packet": packet.GetInPort()})
+	//TODO: Use a channel between the logical agent and the device agent
+	if err := agent.deviceMgr.packetOut(agent.rootDeviceId, fd.GetPacketOutPort(packet), packet); err != nil {
+		log.Errorw("packet-out-failed", log.Fields{"rootDeviceId": agent.rootDeviceId, "error": err})
+	}
+}
+
+// packetIn wraps the frame received on port in an OpenFlow packet-in message.
+// NOTE(review): the message is only logged here; nothing in this function sends it on.
+func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
+	log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
+	log.Debugw("sending-packet-in", log.Fields{"packet-in": fd.MkPacketIn(port, packet)})
+}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 9d365aa..4625518 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -324,3 +324,28 @@
 	}
 	sendAPIResponse(ctx, ch, res)
 }
+
+// packetOut routes a packet-out to the agent of the logical device it names.
+// The packet is dropped, with an error log, if no such agent exists.
+func (ldMgr *LogicalDeviceManager) packetOut(packetOut *openflow_13.PacketOut) {
+	log.Debugw("packetOut", log.Fields{"logicalDeviceId": packetOut.Id})
+	agent := ldMgr.getLogicalDeviceAgent(packetOut.Id)
+	if agent == nil {
+		log.Errorw("logical-device-not-exist", log.Fields{"logicalDeviceId": packetOut.Id})
+		return
+	}
+	agent.packetOut(packetOut.PacketOut)
+}
+
+// packetIn hands a packet received on port to the agent of the given logical
+// device. If no such agent exists the packet is dropped with an error log;
+// nil is returned in either case.
+func (ldMgr *LogicalDeviceManager) packetIn(logicalDeviceId string, port uint32, packet []byte) error {
+	log.Debugw("packetIn", log.Fields{"logicalDeviceId": logicalDeviceId, "port": port})
+	if agent := ldMgr.getLogicalDeviceAgent(logicalDeviceId); agent != nil {
+		agent.packetIn(port, packet)
+	} else {
+		log.Errorw("logical-device-not-exist", log.Fields{"logicalDeviceId": logicalDeviceId})
+	}
+	return nil
+}
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index 284bef2..bd0e591 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -407,6 +407,18 @@
 	}
 }
 
+// GetPacketOutPort returns the port of the first OUTPUT action in packet, or 0 if packet is nil or has none.
+func GetPacketOutPort(packet *ofp.OfpPacketOut) uint32 {
+	if packet != nil {
+		for _, action := range packet.GetActions() {
+			if action.Type == OUTPUT {
+				return action.GetOutput().GetPort() // GetPort (generated getter, assumed nil-safe) instead of .Port: no panic when Output is unset
+			}
+		}
+	}
+	return 0
+}
+
 func GetOutPort(flow *ofp.OfpFlowStats) uint32 {
 	if flow == nil {
 		return 0
@@ -696,6 +708,24 @@
 	return mod
 }
 
+// MkPacketIn builds an OfpPacketIn carrying packet as its data, with reason
+// OFPR_ACTION and an OXM match whose single OPENFLOW_BASIC field is the one
+// returned by InPort(port).
+func MkPacketIn(port uint32, packet []byte) *ofp.OfpPacketIn {
+	inPortField := &ofp.OfpOxmField{
+		OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC,
+		Field:    &ofp.OfpOxmField_OfbField{OfbField: InPort(port)},
+	}
+	return &ofp.OfpPacketIn{
+		Reason: ofp.OfpPacketInReason_OFPR_ACTION,
+		Match: &ofp.OfpMatch{
+			Type:      ofp.OfpMatchType_OFPMT_OXM,
+			OxmFields: []*ofp.OfpOxmField{inPortField},
+		},
+		Data: packet,
+	}
+}
+
 // MkFlowStat is a helper method to build flows
 func MkFlowStat(fa *fu.FlowArgs) *ofp.OfpFlowStats {
 	//Build the matchfields