Revert "Revert "move pkt in into openolt_packet""

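This reverts commit 8793c13bc19e3f0100856d34c5dd3c66b0847d27, which
re-applies the original change:

- openolt_grpc.py sends packet indications (pkt_ind) to a per-OLT
  'openolt.pktin-<host>' topic instead of the default indication topic.
- OpenoltPacket starts a second thread that consumes that topic and
  publishes each frame on the 'packet-in:<logical_device_id>' event bus
  topic.
- openolt_indications.py no longer handles pkt_ind (send_packet_in is
  removed).
- openolt_device.py deletes the pktin topic before starting OpenoltPacket.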

Change-Id: If34ee560dc935143ad45192e9b859a492b505e32
diff --git a/voltha/adapters/openolt/grpc/openolt_grpc.py b/voltha/adapters/openolt/grpc/openolt_grpc.py
index 09c9d1d..2d04723 100644
--- a/voltha/adapters/openolt/grpc/openolt_grpc.py
+++ b/voltha/adapters/openolt/grpc/openolt_grpc.py
@@ -39,7 +39,7 @@
     stream = stub.EnableIndication(openolt_pb2.Empty())
 
     default_topic = 'openolt.ind-{}'.format(host_and_port.split(':')[0])
-    # pktin_topic = 'openolt.pktin-{}'.format(host_and_port.split(':')[0])
+    pktin_topic = 'openolt.pktin-{}'.format(host_and_port.split(':')[0])
 
     conf = {'bootstrap.servers': broker}
 
@@ -58,7 +58,10 @@
             break
         else:
             log.debug("openolt grpc rx indication", indication=ind)
-            kafka_send_pb(p, default_topic, ind)
+            if ind.HasField('pkt_ind'):
+                kafka_send_pb(p, pktin_topic, ind)
+            else:
+                kafka_send_pb(p, default_topic, ind)
 
 
 if __name__ == '__main__':
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index ef94da7..eec0f2c 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -143,8 +143,11 @@
 
         self.data_model.olt_create(self.device_info)
 
+        # FIXME
         self._kadmin.delete_topics([
             'voltha.pktout-{}'.format(self.data_model.logical_device_id)])
+        self._kadmin.delete_topics(['openolt.pktin-{}'.format(
+            self.host_and_port.split(':')[0])])
 
         self._packet = OpenoltPacket(self)
         self._packet.start()
diff --git a/voltha/adapters/openolt/openolt_indications.py b/voltha/adapters/openolt/openolt_indications.py
index 75f905d..fc5b32e 100644
--- a/voltha/adapters/openolt/openolt_indications.py
+++ b/voltha/adapters/openolt/openolt_indications.py
@@ -19,8 +19,6 @@
 from simplejson import loads
 from twisted.internet import reactor
 import structlog
-from scapy.layers.l2 import Ether, Packet
-from common.frameio.frameio import hexify
 
 from voltha.adapters.openolt.protos import openolt_pb2
 from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
@@ -81,8 +79,6 @@
             reactor.callFromThread(self.device.onu_indication, ind.onu_ind)
         elif ind.HasField('omci_ind'):
             reactor.callFromThread(self.device.omci_indication, ind.omci_ind)
-        elif ind.HasField('pkt_ind'):
-            self.send_packet_in(ind.pkt_ind)
         elif ind.HasField('port_stats'):
             reactor.callFromThread(
                 self.device.stats_mgr.port_statistics_indication,
@@ -96,40 +92,3 @@
                 self.device.alarm_mgr.process_alarms, ind.alarm_ind)
         else:
             self.log.warn('unknown indication type')
-
-    def send_packet_in(self, pkt_indication):
-        self.log.debug("packet indication",
-                       intf_type=pkt_indication.intf_type,
-                       intf_id=pkt_indication.intf_id,
-                       port_no=pkt_indication.port_no,
-                       cookie=pkt_indication.cookie,
-                       gemport_id=pkt_indication.gemport_id,
-                       flow_id=pkt_indication.flow_id)
-        try:
-            logical_port_num = self.device.data_model.logical_port_num(
-                pkt_indication.intf_type,
-                pkt_indication.intf_id,
-                pkt_indication.port_no,
-                pkt_indication.gemport_id)
-        except ValueError:
-            self.log.error('No logical port found',
-                           intf_type=pkt_indication.intf_type,
-                           intf_id=pkt_indication.intf_id,
-                           port_no=pkt_indication.port_no,
-                           gemport_id=pkt_indication.gemport_id)
-            return
-
-        ether_pkt = Ether(pkt_indication.pkt)
-
-        if isinstance(ether_pkt, Packet):
-            ether_pkt = str(ether_pkt)
-
-        logical_device_id = self.device.data_model.logical_device_id
-        topic = 'packet-in:' + logical_device_id
-
-        self.log.debug('send-packet-in', logical_device_id=logical_device_id,
-                       logical_port_num=logical_port_num,
-                       packet=hexify(ether_pkt))
-
-        self.device.data_model.adapter_agent.event_bus.publish(
-            topic, (logical_port_num, str(ether_pkt)))
diff --git a/voltha/adapters/openolt/openolt_packet.py b/voltha/adapters/openolt/openolt_packet.py
index 1bc77e0..318456e 100644
--- a/voltha/adapters/openolt/openolt_packet.py
+++ b/voltha/adapters/openolt/openolt_packet.py
@@ -20,6 +20,7 @@
 import structlog
 from scapy.layers.l2 import Ether, Dot1Q
 import binascii
+from scapy.layers.l2 import Packet
 
 from common.frameio.frameio import hexify
 from voltha.protos.openflow_13_pb2 import PacketOut
@@ -33,23 +34,35 @@
     def __init__(self, device):
         self.log = structlog.get_logger()
         self.device = device
-        self.packet_thread_handle = threading.Thread(
-            target=self.packet_thread)
-        self.packet_thread_handle.setDaemon(True)
+
+        self.packet_out_thread_handle = threading.Thread(
+            target=self.packet_out_thread)
+        self.packet_out_thread_handle.setDaemon(True)
+
+        self.packet_in_thread_handle = threading.Thread(
+            target=self.packet_in_thread)
+        self.packet_in_thread_handle.setDaemon(True)
 
     def start(self):
-        self.packet_thread_handle.start()
+        self.packet_out_thread_handle.start()
+        self.packet_in_thread_handle.start()
 
     def stop(self):
         pass
 
-    def packet_thread(self):
+    def packet_out_thread(self):
         self.log.debug('openolt packet-out thread starting')
-        KConsumer(self.packet_process,
+        KConsumer(self.packet_out_process,
                   'voltha.pktout-{}'.format(
                       self.device.data_model.logical_device_id))
 
-    def packet_process(self, topic, msg):
+    def packet_in_thread(self):
+        """Thread target: run KConsumer on this OLT's packet-in topic."""
+        self.log.debug('openolt packet-in thread starting')
+        olt_host = self.device.host_and_port.split(':')[0]
+        KConsumer(self.packet_in_process, 'openolt.pktin-{}'.format(olt_host))
+
+    def packet_out_process(self, topic, msg):
 
         def get_port_out(opo):
             for action in opo.actions:
@@ -134,3 +147,46 @@
             self.log.warn('Packet-out-to-this-interface-type-not-implemented',
                           egress_port=egress_port,
                           port_type=egress_port_type)
+
+    def packet_in_process(self, topic, msg):
+        """Publish one consumed packet indication on the event bus.
+
+        msg is decoded with loads() and parsed into an openolt_pb2.Indication;
+        its frame is published on 'packet-in:<logical_device_id>'.
+        """
+        ind = Parse(loads(msg), openolt_pb2.Indication(),
+                    ignore_unknown_fields=True)
+        # Not an assert: asserts vanish under -O and would otherwise raise
+        # out of the consumer callback. Log and drop the message instead.
+        if not ind.HasField('pkt_ind'):
+            self.log.error('no pkt_ind in packet-in message', topic=topic)
+            return
+        pkt_ind = ind.pkt_ind
+
+        self.log.debug("packet indication",
+                       intf_type=pkt_ind.intf_type, intf_id=pkt_ind.intf_id,
+                       port_no=pkt_ind.port_no, cookie=pkt_ind.cookie,
+                       gemport_id=pkt_ind.gemport_id, flow_id=pkt_ind.flow_id)
+        try:
+            logical_port_num = self.device.data_model.logical_port_num(
+                pkt_ind.intf_type, pkt_ind.intf_id,
+                pkt_ind.port_no, pkt_ind.gemport_id)
+        except ValueError:
+            self.log.error('No logical port found',
+                           intf_type=pkt_ind.intf_type,
+                           intf_id=pkt_ind.intf_id,
+                           port_no=pkt_ind.port_no,
+                           gemport_id=pkt_ind.gemport_id)
+            return
+
+        ether_pkt = Ether(pkt_ind.pkt)
+        if isinstance(ether_pkt, Packet):
+            ether_pkt = str(ether_pkt)
+
+        logical_device_id = self.device.data_model.logical_device_id
+        event_topic = 'packet-in:' + logical_device_id
+        self.log.debug('send-packet-in', logical_device_id=logical_device_id,
+                       logical_port_num=logical_port_num,
+                       packet=hexify(ether_pkt))
+        self.device.data_model.adapter_agent.event_bus.publish(
+            event_topic, (logical_port_num, str(ether_pkt)))