Packet-in channel plumbed thru whole stack

Change-Id: I1df0265139259cc1757f29af19132c5384943c15
diff --git a/ofagent/converter.py b/ofagent/converter.py
index 7252c43..4d40692 100644
--- a/ofagent/converter.py
+++ b/ofagent/converter.py
@@ -71,6 +71,9 @@
         if field_type == pb2.OFPXMT_OFB_ETH_TYPE:
             loxi_match_fields.append(
                 of13.oxm.eth_type(value=ofb_field['eth_type']))
+        elif field_type == pb2.OFPXMT_OFB_IN_PORT:
+            loxi_match_fields.append(
+                of13.oxm.in_port(value=ofb_field['port']))
         else:
             raise NotImplementedError(
                 'OXM match field for type %s' % field_type)
@@ -102,11 +105,18 @@
     del kw['id']
     return of13.flow_stats_entry(**kw)
 
+
 def ofp_packet_in_to_loxi_packet_in(pb):
-    kw = pb2dict(pb)
-    if 'match' in kw:
-        kw['match'] = make_loxi_match(kw['match'])
-    return of13.message.packet_in(**kw)
+    packet_in = of13.message.packet_in(
+        buffer_id=pb.buffer_id,
+        reason=pb.reason,
+        table_id=pb.table_id,
+        cookie=pb.cookie,
+        match=make_loxi_match(pb2dict(pb.match)),
+        data=pb.data
+    )
+    return packet_in
+
 
 def ofp_group_entry_to_loxi_group_entry(pb):
     return of13.group_stats_entry(
diff --git a/ofagent/of_protocol_handler.py b/ofagent/of_protocol_handler.py
index 5ac8712..86f8cb2 100644
--- a/ofagent/of_protocol_handler.py
+++ b/ofagent/of_protocol_handler.py
@@ -257,6 +257,7 @@
     }
 
     def forward_packet_in(self, ofp_packet_in):
+        log.info('sending-packet-in', ofp_packet_in=ofp_packet_in)
         self.cxn.send(to_loxi(ofp_packet_in))
 
     def forward_port_status(self, ofp_port_status):
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index bb40eb0..e59d5dd 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -228,4 +228,13 @@
         :return:
         """
 
+    def send_packet_in(logical_device_id, logical_port_no, packet):
+        """
+        Forward given packet to the northbound toward an SDN controller.
+        :param device_id: logical device identifier
+        :param logical_port_no: logical port_no (as numbered in openflow)
+        :param packet: the actual packet; can be a serialized string or a scapy
+                       Packet.
+        :return: None returned on success
+        """
 
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index 8dc8fa2..d407c4b 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -21,6 +21,7 @@
 
 import structlog
 from klein import Klein
+from scapy.layers.l2 import Ether, EAPOL, Padding
 from twisted.internet import endpoints
 from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks
@@ -234,6 +235,9 @@
             )
             self.adapter_agent.add_logical_port(ld.id, port)
 
+        olt.parent_id = ld.id
+        self.adapter_agent.update_device(olt)
+
     @inlineCallbacks
     def _simulate_device_activation(self, device):
 
@@ -371,12 +375,26 @@
 
     # ~~~~~~~~~~~~~~~~~~~~ Embedded test Klein rest server ~~~~~~~~~~~~~~~~~~~~
 
-    @app.route('/devices/<string:id>/detect_onus')
-    def detect_onu(self, request, **kw):
-        log.info('detect-onus', request=request, **kw)
-        device_id = kw['id']
+    def get_test_control_site(self):
+        return Site(self.app.resource())
+
+    @app.route('/devices/<string:device_id>/detect_onus')
+    def detect_onus(self, request, device_id):
+        log.info('detect-onus', request=request, device_id=device_id)
         self._simulate_detection_of_onus(device_id)
         return '{"status": "OK"}'
 
-    def get_test_control_site(self):
-        return Site(self.app.resource())
+    @app.route('/devices/<string:device_id>/test_eapol_in')
+    def test_eapol_in(self, request, device_id):
+        """Simulate a packet in message posted upstream"""
+        log.info('test_eapol_in', request=request, device_id=device_id)
+        eapol_start = str(
+            (Ether(src='00:11:22:33:44:55') / EAPOL(type=1))
+            / Padding(load=42*'\x00')
+        )
+
+        device = self.adapter_agent.get_device(device_id)
+        self.adapter_agent.send_packet_in(logical_device_id=device.parent_id,
+                                          logical_port_no=1,
+                                          packet=eapol_start)
+        return '{"status": "sent"}'
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 83b4e89..2a6c61b 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -20,6 +20,7 @@
 from uuid import uuid4
 
 import structlog
+from scapy.packet import Packet
 from twisted.internet.defer import inlineCallbacks, returnValue
 from zope.interface import implementer
 
@@ -251,3 +252,15 @@
     def receive_proxied_message(self, proxy_address, msg):
         topic = self._gen_rx_proxy_address_topic(proxy_address)
         self.event_bus.publish(topic, msg)
+
+    # ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~
+
+    def send_packet_in(self, logical_device_id, logical_port_no, packet):
+        self.log.debug('send-packet-in', logical_device_id=logical_device_id,
+                       logical_port_no=logical_port_no, packet=packet)
+
+        if isinstance(packet, Packet):
+            packet = str(packet)
+
+        topic = 'packet-in:' + logical_device_id
+        self.event_bus.publish(topic, (logical_port_no, packet))
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index ec06379..afc4ef3 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -22,6 +22,7 @@
 
 import structlog
 
+from common.event_bus import EventBusClient
 from voltha.core.config.config_proxy import CallbackType
 from voltha.core.device_graph import DeviceGraph
 from voltha.core.flow_decomposer import FlowDecomposer, \
@@ -45,7 +46,6 @@
     def __init__(self, core, logical_device):
         self.core = core
         self.local_handler = core.get_local_handler()
-        self.grpc_server = registry('grpc_server')
         self.logical_device_id = logical_device.id
 
         self.root_proxy = core.get_proxy('/')
@@ -65,14 +65,15 @@
         self.self_proxy.register_callback(
             CallbackType.POST_REMOVE, self._port_removed)
 
+        self.event_bus = EventBusClient()
+        self.packet_in_subscription = self.event_bus.subscribe(
+            topic='packet-in:{}'.format(logical_device.id),
+            callback=self.handle_packet_in_event)
+
         self.log = structlog.get_logger(logical_device_id=logical_device.id)
 
         self._routes = None
 
-        self.log = structlog.get_logger(logical_device_id=logical_device.id)
-
-        self.log = structlog.get_logger(logical_device_id=logical_device.id)
-
     def start(self):
         self.log.debug('starting')
         self.log.info('started')
@@ -127,10 +128,8 @@
             self.flow_modify_strict(flow_mod)
 
         else:
-            self.log.warn('unhandled-flow-mod', command=command, flow_mod=flow_mod)
-
-    # def list_flows(self):
-    #     return self.flows
+            self.log.warn('unhandled-flow-mod',
+                          command=command, flow_mod=flow_mod)
 
     def update_group_table(self, group_mod):
 
@@ -146,13 +145,10 @@
             self.group_modify(group_mod)
 
         else:
-            self.log.warn('unhandled-group-mod', command=command,
-                     group_mod=group_mod)
+            self.log.warn('unhandled-group-mod',
+                          command=command, group_mod=group_mod)
 
-    def list_groups(self):
-        return self.groups.values()
-
-    ## <=============== LOW LEVEL FLOW HANDLERS ==============================>
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL FLOW HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
 
     def flow_add(self, mod):
         assert isinstance(mod, ofp.ofp_flow_mod)
@@ -379,7 +375,7 @@
 
         return bool(to_delete), flows
 
-    ## <=============== LOW LEVEL GROUP HANDLERS =============================>
+    # ~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL GROUP HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~~~~
 
     def group_add(self, group_mod):
         assert isinstance(group_mod, ofp.ofp_group_mod)
@@ -450,7 +446,7 @@
         if changed:
             self.groups_proxy.update('/', FlowGroups(items=groups.values()))
 
-    ## <=============== PACKET_OUT ===========================================>
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_OUT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
     def packet_out(self, ofp_packet_out):
         self.log.debug('packet-out', packet=ofp_packet_out)
@@ -464,14 +460,36 @@
                 data=ofp_packet_out.data
             ))
 
-    ## <=============== PACKET_IN ============================================>
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_IN ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def handle_packet_in_event(self, _, msg):
+        self.log.debug('handle-packet-in', msg=msg)
+        logical_port_no, packet = msg
+        packet_in = ofp.ofp_packet_in(
+            # buffer_id=0,
+            reason=ofp.OFPR_ACTION,
+            # table_id=0,
+            # cookie=0,
+            match=ofp.ofp_match(
+                type=ofp.OFPMT_OXM,
+                oxm_fields=[
+                    ofp.ofp_oxm_field(
+                        oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+                        ofb_field=in_port(logical_port_no)
+                    )
+                ]
+            ),
+            data=packet
+        )
+        self.packet_in(packet_in)
 
     def packet_in(self, ofp_packet_in):
         # TODO
         print 'PACKET_IN:', ofp_packet_in
-        self.grpc_server.send_packet_in(self.logical_device_id, ofp_packet_in)
+        self.local_handler.send_packet_in(
+            self.logical_device_id, ofp_packet_in)
 
-    ## <======================== FLOW TABLE UPDATE HANDLING ===================
+    # ~~~~~~~~~~~~~~~~~~~~~ FLOW TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
 
     def _flow_table_updated(self, flows):
         self.log.debug('flow-table-updated',
@@ -490,7 +508,7 @@
             self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
                                    FlowGroups(items=groups.values()))
 
-    ## <======================= GROUP TABLE UPDATE HANDLING ===================
+    # ~~~~~~~~~~~~~~~~~~~~ GROUP TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
 
     def _group_table_updated(self, flow_groups):
         self.log.debug('group-table-updated',
@@ -505,7 +523,7 @@
             self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
                                    FlowGroups(items=groups.values()))
 
-    ## <==================== APIs NEEDED BY FLOW DECOMPOSER ===================
+    # ~~~~~~~~~~~~~~~~~~~ APIs NEEDED BY FLOW DECOMPOSER ~~~~~~~~~~~~~~~~~~~~~~
 
     def _port_added(self, port):
         assert isinstance(port, LogicalPort)