Move some blocking grpc calls to threads
Change-Id: I94e9ddcc9ed2806db1387fe2152f3c0795c96758
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 2bc036a..0fb5353 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -19,6 +19,7 @@
import time
from scapy.layers.l2 import Ether, Dot1Q
from transitions import Machine
+from twisted.internet import reactor
from voltha.protos.device_pb2 import Port
from voltha.adapters.openolt.protos import openolt_pb2
@@ -318,7 +319,7 @@
def send_proxied_message(self, proxy_address, msg):
omci = openolt_pb2.OmciMsg(intf_id=proxy_address.channel_id,
onu_id=proxy_address.onu_id, pkt=str(msg))
- self._grpc.stub.OmciMsgOut(omci)
+ reactor.callInThread(self._grpc.stub.OmciMsgOut, omci)
def update_flow_table(self, flows):
self.log.debug('No updates here now, all is done in logical flows '
@@ -381,19 +382,9 @@
serial_number=serial_number)
onu = openolt_pb2.Onu(intf_id=intf_id, onu_id=onu_id,
serial_number=serial_number)
- try:
- self._grpc.stub.ActivateOnu(onu)
- except grpc.RpcError as grpc_e:
- if grpc_e.code() == grpc.StatusCode.ALREADY_EXISTS:
- self.log.info('onu activation in progress',
- serial_number=serial_number_str,
- e=grpc_e)
- else:
- self.log.error('onu activation failed',
- serial_number=serial_number_str,
- grpc_error=grpc_e)
- else:
- self.log.info('onu-activated', serial_number=serial_number_str)
+
+ self.log.info('activating onu', serial_number=serial_number_str)
+ reactor.callInThread(self._grpc.stub.ActivateOnu, onu)
# FIXME - instead of passing child_device around, delete_child_device
# needs to change to use serial_number.
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 1150d9a..94a15cf 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -896,7 +896,7 @@
def add_flow_to_device(self, flow, logical_flow):
self.log.debug('pushing flow to device', flow=flow)
try:
- self.stub.FlowAdd(flow)
+ reactor.callInThread(self.stub.FlowAdd, flow)
except grpc.RpcError as grpc_e:
if grpc_e.code() == grpc.StatusCode.ALREADY_EXISTS:
self.log.warn('flow already exists', e=grpc_e, flow=flow)