Revert "move pkt in into openolt_packet"
This reverts commit f188069d2716ac05ccfd09f0dd88726b126885b1.
Change-Id: Ifb71377c839b560e5af3f48018245c18ac26a3ac
diff --git a/voltha/adapters/openolt/grpc/openolt_grpc.py b/voltha/adapters/openolt/grpc/openolt_grpc.py
index 05605ee..1c47e5c 100644
--- a/voltha/adapters/openolt/grpc/openolt_grpc.py
+++ b/voltha/adapters/openolt/grpc/openolt_grpc.py
@@ -56,8 +56,7 @@
stub = openolt_pb2_grpc.OpenoltStub(channel)
stream = stub.EnableIndication(openolt_pb2.Empty())
- default_topic = 'openolt.ind-{}'.format(host_and_port.split(':')[0])
- pktin_topic = 'openolt.pktin-{}'.format(host_and_port.split(':')[0])
+ topic = 'openolt.ind-{}'.format(host_and_port.split(':')[0])
while True:
try:
@@ -67,14 +66,11 @@
log.warn('openolt grpc connection lost', error=e)
ind = openolt_pb2.Indication()
ind.olt_ind.oper_state = 'down'
- kafka_send_pb(default_topic, ind)
+ kafka_send_pb(topic, ind)
break
else:
log.debug("openolt grpc rx indication", indication=ind)
- if ind.HasField('pkt_ind'):
- kafka_send_pb(pktin_topic, ind)
- else:
- kafka_send_pb(default_topic, ind)
+ kafka_send_pb(topic, ind)
if __name__ == '__main__':
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index eec0f2c..ef94da7 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -143,11 +143,8 @@
self.data_model.olt_create(self.device_info)
- # FIXME
self._kadmin.delete_topics([
'voltha.pktout-{}'.format(self.data_model.logical_device_id)])
- self._kadmin.delete_topics(['openolt.pktin-{}'.format(
- self.host_and_port.split(':')[0])])
self._packet = OpenoltPacket(self)
self._packet.start()
diff --git a/voltha/adapters/openolt/openolt_indications.py b/voltha/adapters/openolt/openolt_indications.py
index fc5b32e..75f905d 100644
--- a/voltha/adapters/openolt/openolt_indications.py
+++ b/voltha/adapters/openolt/openolt_indications.py
@@ -19,6 +19,8 @@
from simplejson import loads
from twisted.internet import reactor
import structlog
+from scapy.layers.l2 import Ether, Packet
+from common.frameio.frameio import hexify
from voltha.adapters.openolt.protos import openolt_pb2
from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
@@ -79,6 +81,8 @@
reactor.callFromThread(self.device.onu_indication, ind.onu_ind)
elif ind.HasField('omci_ind'):
reactor.callFromThread(self.device.omci_indication, ind.omci_ind)
+ elif ind.HasField('pkt_ind'):
+ self.send_packet_in(ind.pkt_ind)
elif ind.HasField('port_stats'):
reactor.callFromThread(
self.device.stats_mgr.port_statistics_indication,
@@ -92,3 +96,40 @@
self.device.alarm_mgr.process_alarms, ind.alarm_ind)
else:
self.log.warn('unknown indication type')
+
+ def send_packet_in(self, pkt_indication):
+ self.log.debug("packet indication",
+ intf_type=pkt_indication.intf_type,
+ intf_id=pkt_indication.intf_id,
+ port_no=pkt_indication.port_no,
+ cookie=pkt_indication.cookie,
+ gemport_id=pkt_indication.gemport_id,
+ flow_id=pkt_indication.flow_id)
+ try:
+ logical_port_num = self.device.data_model.logical_port_num(
+ pkt_indication.intf_type,
+ pkt_indication.intf_id,
+ pkt_indication.port_no,
+ pkt_indication.gemport_id)
+ except ValueError:
+ self.log.error('No logical port found',
+ intf_type=pkt_indication.intf_type,
+ intf_id=pkt_indication.intf_id,
+ port_no=pkt_indication.port_no,
+ gemport_id=pkt_indication.gemport_id)
+ return
+
+ ether_pkt = Ether(pkt_indication.pkt)
+
+ if isinstance(ether_pkt, Packet):
+ ether_pkt = str(ether_pkt)
+
+ logical_device_id = self.device.data_model.logical_device_id
+ topic = 'packet-in:' + logical_device_id
+
+ self.log.debug('send-packet-in', logical_device_id=logical_device_id,
+ logical_port_num=logical_port_num,
+ packet=hexify(ether_pkt))
+
+ self.device.data_model.adapter_agent.event_bus.publish(
+ topic, (logical_port_num, str(ether_pkt)))
diff --git a/voltha/adapters/openolt/openolt_packet.py b/voltha/adapters/openolt/openolt_packet.py
index 318456e..1bc77e0 100644
--- a/voltha/adapters/openolt/openolt_packet.py
+++ b/voltha/adapters/openolt/openolt_packet.py
@@ -20,7 +20,6 @@
import structlog
from scapy.layers.l2 import Ether, Dot1Q
import binascii
-from scapy.layers.l2 import Packet
from common.frameio.frameio import hexify
from voltha.protos.openflow_13_pb2 import PacketOut
@@ -34,35 +33,23 @@
def __init__(self, device):
self.log = structlog.get_logger()
self.device = device
-
- self.packet_out_thread_handle = threading.Thread(
- target=self.packet_out_thread)
- self.packet_out_thread_handle.setDaemon(True)
-
- self.packet_in_thread_handle = threading.Thread(
- target=self.packet_in_thread)
- self.packet_in_thread_handle.setDaemon(True)
+ self.packet_thread_handle = threading.Thread(
+ target=self.packet_thread)
+ self.packet_thread_handle.setDaemon(True)
def start(self):
- self.packet_out_thread_handle.start()
- self.packet_in_thread_handle.start()
+ self.packet_thread_handle.start()
def stop(self):
pass
- def packet_out_thread(self):
+ def packet_thread(self):
self.log.debug('openolt packet-out thread starting')
- KConsumer(self.packet_out_process,
+ KConsumer(self.packet_process,
'voltha.pktout-{}'.format(
self.device.data_model.logical_device_id))
- def packet_in_thread(self):
- self.log.debug('openolt packet-in thread starting')
- topic = 'openolt.pktin-{}'.format(
- self.device.host_and_port.split(':')[0])
- KConsumer(self.packet_in_process, topic)
-
- def packet_out_process(self, topic, msg):
+ def packet_process(self, topic, msg):
def get_port_out(opo):
for action in opo.actions:
@@ -147,46 +134,3 @@
self.log.warn('Packet-out-to-this-interface-type-not-implemented',
egress_port=egress_port,
port_type=egress_port_type)
-
- def packet_in_process(self, topic, msg):
-
- ind = Parse(loads(msg), openolt_pb2.Indication(),
- ignore_unknown_fields=True)
- assert(ind.HasField('pkt_ind'))
- pkt_ind = ind.pkt_ind
-
- self.log.debug("packet indication",
- intf_type=pkt_ind.intf_type,
- intf_id=pkt_ind.intf_id,
- port_no=pkt_ind.port_no,
- cookie=pkt_ind.cookie,
- gemport_id=pkt_ind.gemport_id,
- flow_id=pkt_ind.flow_id)
- try:
- logical_port_num = self.device.data_model.logical_port_num(
- pkt_ind.intf_type,
- pkt_ind.intf_id,
- pkt_ind.port_no,
- pkt_ind.gemport_id)
- except ValueError:
- self.log.error('No logical port found',
- intf_type=pkt_ind.intf_type,
- intf_id=pkt_ind.intf_id,
- port_no=pkt_ind.port_no,
- gemport_id=pkt_ind.gemport_id)
- return
-
- ether_pkt = Ether(pkt_ind.pkt)
-
- if isinstance(ether_pkt, Packet):
- ether_pkt = str(ether_pkt)
-
- logical_device_id = self.device.data_model.logical_device_id
- topic = 'packet-in:' + logical_device_id
-
- self.log.debug('send-packet-in', logical_device_id=logical_device_id,
- logical_port_num=logical_port_num,
- packet=hexify(ether_pkt))
-
- self.device.data_model.adapter_agent.event_bus.publish(
- topic, (logical_port_num, str(ether_pkt)))