VOL-669 Openolt adapter - Improve indication handling.
Change-Id: Ic6d4791075b78574da88b7c3f743752f14b0f84f
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 99fe0ba..804bc0e 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -20,6 +20,8 @@
import collections
import time
+from twisted.internet import reactor
+
from voltha.protos.device_pb2 import Port, Device
from voltha.protos.common_pb2 import OperStatus, AdminState, ConnectStatus
from voltha.protos.logical_device_pb2 import LogicalDevice
@@ -87,29 +89,29 @@
self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
# Start indications thread
- self.indications = self.stub.EnableIndication(openolt_pb2.Empty())
self.indications_thread = threading.Thread(target=self.process_indication)
self.indications_thread.daemon = True
self.indications_thread.start()
def process_indication(self):
+ self.indications = self.stub.EnableIndication(openolt_pb2.Empty())
while 1:
+ # get the next indication from olt
ind = next(self.indications)
self.log.debug("rx indication", indication=ind)
+ # schedule indication handlers to be run in the main event loop
if ind.HasField('olt_ind'):
- self.olt_indication(ind.olt_ind)
+ reactor.callFromThread(self.olt_indication, ind.olt_ind)
elif ind.HasField('intf_ind'):
- self.intf_indication(ind.intf_ind)
+ reactor.callFromThread(self.intf_indication, ind.intf_ind)
elif ind.HasField('intf_oper_ind'):
- self.intf_oper_indication(ind.intf_oper_ind)
+ reactor.callFromThread(self.intf_oper_indication, ind.intf_oper_ind)
elif ind.HasField('onu_disc_ind'):
- self.onu_discovery_indication(ind.onu_disc_ind)
+ reactor.callFromThread(self.onu_discovery_indication, ind.onu_disc_ind)
elif ind.HasField('onu_ind'):
- self.onu_indication(ind.onu_ind)
+ reactor.callFromThread(self.onu_indication, ind.onu_ind)
elif ind.HasField('omci_ind'):
- self.omci_indication(ind.omci_ind)
- # Throttle indications
- time.sleep(0.1)
+ reactor.callFromThread(self.omci_indication, ind.omci_ind)
def olt_indication(self, olt_indication):
self.log.debug("olt indication", olt_ind=olt_indication)
@@ -163,15 +165,18 @@
if onu_id is None:
onu_id = self.new_onu_id(onu_disc_indication.intf_id)
- self.add_onu_device(
- onu_disc_indication.intf_id,
- self.intf_id_to_port_no(onu_disc_indication.intf_id, Port.PON_OLT),
- onu_id,
- onu_disc_indication.serial_number)
-
- self.activate_onu(
- onu_disc_indication.intf_id, onu_id,
- serial_number=onu_disc_indication.serial_number)
+ try:
+ self.add_onu_device(
+ onu_disc_indication.intf_id,
+ self.intf_id_to_port_no(onu_disc_indication.intf_id, Port.PON_OLT),
+ onu_id,
+ onu_disc_indication.serial_number)
+ except Exception as e:
+ self.log.exception('onu activation failed', e=e)
+ else:
+ self.activate_onu(
+ onu_disc_indication.intf_id, onu_id,
+ serial_number=onu_disc_indication.serial_number)
else:
# FIXME - handle discovery of already activated onu
self.log.info("onu activation in progress",