VOL-1183: OFagent cannot reach vcore after vcore moved to a different node in the cluster
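Two gRPC calls made by OFagent's ConnectionManager, Subscribe and ListLogicalDevices, were
executed directly in the Twisted reactor thread. On rare occasions, either call never returns
when the vcore connected to the OFagent dies. Because the blocking gRPC call runs in the reactor
thread, the entire application is blocked and cannot detect communication errors with the
deceased vcore. The handling of these calls has been moved to the OFagent's internal GrpcClient,
which wraps all gRPC calls in the twisted.internet.threads.deferToThread function. The calls
wrapped this way are now also given a timeout (currently fixed at 10 seconds), and when
retrieving the logical device list the ConnectionManager treats a DEADLINE_EXCEEDED status the
same way as UNAVAILABLE, i.e. it sends SIGTERM to its own process.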
Change-Id: Ifa3479a3cbca7ae8d7f5e2ca0372e22507d5e4b9
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 13ce2a0..2496dd5 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -32,7 +32,6 @@
from grpc_client import GrpcClient
from agent import Agent
-from google.protobuf.empty_pb2 import Empty
from common.utils.dockerhelpers import get_my_containers_name
@@ -40,8 +39,8 @@
# _ = third_party
class ConnectionManager(object):
- def __init__(self, consul_endpoint, vcore_endpoint, controller_endpoints,
- instance_id,
+ def __init__(self, consul_endpoint, vcore_endpoint, vcore_grpc_timeout,
+ controller_endpoints, instance_id,
enable_tls=False, key_file=None, cert_file=None,
vcore_retry_interval=0.5, devices_refresh_interval=5,
subscription_refresh_interval=5):
@@ -51,6 +50,7 @@
self.controller_endpoints = controller_endpoints
self.consul_endpoint = consul_endpoint
self.vcore_endpoint = vcore_endpoint
+ self.grpc_timeout = vcore_grpc_timeout
self.instance_id = instance_id
self.enable_tls = enable_tls
self.key_file = key_file
@@ -160,8 +160,10 @@
# Send subscription request to register the current ofagent instance
container_name = self.instance_id
- stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
- subscription = stub.Subscribe(OfAgentSubscriber(ofagent_id=container_name))
+ if self.grpc_client is None:
+ self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout)
+ subscription = yield self.grpc_client.subscribe(
+ OfAgentSubscriber(ofagent_id=container_name))
# If the subscriber id matches the current instance
# ... then the subscription has succeeded
@@ -170,7 +172,7 @@
# Keep details on the current GRPC session and subscription
log.debug('subscription-with-vcore-successful', subscription=subscription)
self.subscription = subscription
- self.grpc_client = GrpcClient(self, self.channel).start()
+ self.grpc_client.start()
# Sleep a bit in between each subscribe
yield asleep(self.subscription_refresh_interval)
@@ -204,16 +206,16 @@
while self.running:
log.info('retrieve-logical-device-list')
try:
- stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
- devices = stub.ListLogicalDevices(Empty()).items
+ devices = yield self.grpc_client.list_logical_devices()
for device in devices:
log.info("logical-device-entry", id=device.id, datapath_id=device.datapath_id)
returnValue(devices)
except _Rendezvous, e:
- log.error('vcore-communication-failure', exception=e, status=e.code())
- if e.code() == StatusCode.UNAVAILABLE:
+ status = e.code()
+ log.error('vcore-communication-failure', exception=e, status=status)
+ if status == StatusCode.UNAVAILABLE or status == StatusCode.DEADLINE_EXCEEDED:
os.system("kill -15 {}".format(os.getpid()))
except Exception as e:
@@ -287,7 +289,8 @@
# see https://jira.opencord.org/browse/CORD-821
try:
- if self.channel is not None and self.grpc_client is not None:
+ if self.channel is not None and self.grpc_client is not None and \
+ self.subscription is not None:
# get current list from Voltha
devices = yield self.get_list_of_logical_devices_from_voltha()
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index af3ffe4..56af949 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -38,10 +38,11 @@
class GrpcClient(object):
- def __init__(self, connection_manager, channel):
+ def __init__(self, connection_manager, channel, grpc_timeout):
self.connection_manager = connection_manager
self.channel = channel
+ self.grpc_timeout = grpc_timeout
self.local_stub = VolthaLocalServiceStub(channel)
self.stopped = False
@@ -51,7 +52,7 @@
self.change_event_queue = DeferredQueue() # queue change events
def start(self):
- log.debug('starting')
+ log.debug('starting', grpc_timeout=self.grpc_timeout)
self.start_packet_out_stream()
self.start_packet_in_stream()
self.start_change_event_in_stream()
@@ -82,6 +83,7 @@
try:
self.local_stub.StreamPacketsOut(generator)
except _Rendezvous, e:
+ log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
@@ -100,6 +102,7 @@
packet_in=packet_in,
queue_len=len(self.packet_in_queue.pending))
except _Rendezvous, e:
+ log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
@@ -117,6 +120,7 @@
change_event=event,
queue_len=len(self.change_event_queue.pending))
except _Rendezvous, e:
+ log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
@@ -152,14 +156,14 @@
def get_port(self, device_id, port_id):
req = LogicalPortId(id=device_id, port_id=port_id)
res = yield threads.deferToThread(
- self.local_stub.GetLogicalDevicePort, req)
+ self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout)
returnValue(res)
@inlineCallbacks
def get_port_list(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
- self.local_stub.ListLogicalDevicePorts, req)
+ self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout)
returnValue(res.items)
@inlineCallbacks
@@ -169,7 +173,7 @@
port_id=port_id
)
res = yield threads.deferToThread(
- self.local_stub.EnableLogicalDevicePort, req)
+ self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout)
returnValue(res)
@inlineCallbacks
@@ -179,14 +183,14 @@
port_id=port_id
)
res = yield threads.deferToThread(
- self.local_stub.DisableLogicalDevicePort, req)
+ self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout)
returnValue(res)
@inlineCallbacks
def get_device_info(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
- self.local_stub.GetLogicalDevice, req)
+ self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout)
returnValue(res)
@inlineCallbacks
@@ -196,7 +200,7 @@
flow_mod=flow_mod
)
res = yield threads.deferToThread(
- self.local_stub.UpdateLogicalDeviceFlowTable, req)
+ self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout)
returnValue(res)
@inlineCallbacks
@@ -206,26 +210,38 @@
group_mod=group_mod
)
res = yield threads.deferToThread(
- self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
+ self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout)
returnValue(res)
@inlineCallbacks
def list_flows(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
- self.local_stub.ListLogicalDeviceFlows, req)
+ self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout)
returnValue(res.items)
@inlineCallbacks
def list_groups(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
- self.local_stub.ListLogicalDeviceFlowGroups, req)
+ self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout)
returnValue(res.items)
@inlineCallbacks
def list_ports(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
- self.local_stub.ListLogicalDevicePorts, req)
- returnValue(res.items)
\ No newline at end of file
+ self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout)
+ returnValue(res.items)
+
+ @inlineCallbacks
+ def list_logical_devices(self):
+ res = yield threads.deferToThread(
+ self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout)
+ returnValue(res.items)
+
+ @inlineCallbacks
+ def subscribe(self, subscriber):
+ res = yield threads.deferToThread(
+ self.local_stub.Subscribe, subscriber, timeout=self.grpc_timeout)
+ returnValue(res)
diff --git a/ofagent/main.py b/ofagent/main.py
index 23335bb..06c2ae3 100755
--- a/ofagent/main.py
+++ b/ofagent/main.py
@@ -203,6 +203,9 @@
self.args = args = parse_args()
self.config = load_config(args)
+ # May want to specify the gRPC timeout as an arg (in future)
+ # Right now, set a default value
+ self.grpc_timeout = 10
verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
self.log = setup_logging(self.config.get('logging', {}),
@@ -227,7 +230,8 @@
self.log.info('starting-internal-components')
args = self.args
self.connection_manager = yield ConnectionManager(
- args.consul, args.grpc_endpoint, args.controller, args.instance_id, \
+ args.consul, args.grpc_endpoint, self.grpc_timeout,
+ args.controller, args.instance_id,
args.enable_tls, args.key_file, args.cert_file).start()
self.log.info('started-internal-services')
diff --git a/tests/utests/README.md b/tests/utests/README.md
index 731ecfa..7a1c9b1 100644
--- a/tests/utests/README.md
+++ b/tests/utests/README.md
@@ -22,6 +22,5 @@
```
cd /cord/incubator/voltha/
. ./env.sh
-cd ofagent/
nosetests -s tests/utests/ofagent/
```
diff --git a/tests/utests/ofagent/test_connection_mgr.py b/tests/utests/ofagent/test_connection_mgr.py
index 4ca022c..fe7bcdf 100644
--- a/tests/utests/ofagent/test_connection_mgr.py
+++ b/tests/utests/ofagent/test_connection_mgr.py
@@ -17,6 +17,9 @@
class TestConection_mgr(TestCase):
+ # Set a default for the gRPC timeout (seconds)
+ grpc_timeout = 10
+
def gen_endpoints(self):
consul_endpoint = "localhost:8500"
voltha_endpoint= "localhost:8880"
@@ -45,7 +48,8 @@
def test_connection_mgr_init(self):
consul_endpoint,voltha_endpoint,controller_endpoints = self.gen_endpoints()
my_name = self.gen_container_name()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+ controller_endpoints, my_name)
self.assertEqual(test_connection_init.consul_endpoint,consul_endpoint)
self.assertEqual(test_connection_init.vcore_endpoint, voltha_endpoint)
self.assertEqual(test_connection_init.controller_endpoints, controller_endpoints)
@@ -53,7 +57,8 @@
def test_resolve_endpoint(self):
consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
my_name = self.gen_container_name()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+ controller_endpoints, my_name)
host,port = test_connection_init.resolve_endpoint(endpoint=consul_endpoint)
assert isinstance(port, int)
assert isinstance(host, basestring)
@@ -62,21 +67,24 @@
consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
my_name = self.gen_container_name()
devices,device = self.gen_devices()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+ controller_endpoints, my_name)
test_connection_init.refresh_agent_connections(devices)
def test_create_agent(self):
consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
my_name = self.gen_container_name()
devices,device = self.gen_devices()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+ controller_endpoints, my_name)
test_connection_init.create_agent(device)
def test_delete_agent(self):
consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
my_name = self.gen_container_name()
devices,device = self.gen_devices()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+ controller_endpoints, my_name)
test_connection_init.create_agent(device)
with self.assertRaises(Exception) as context:
test_connection_init.delete_agent(device.datapath_id)
@@ -88,7 +96,8 @@
my_name = self.gen_container_name()
devices,device = self.gen_devices()
packet_in = self.gen_packet_in()
- test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+ test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+ controller_endpoints, my_name)
test_connection_init.create_agent(device)
test_connection_init.forward_packet_in(device.id, packet_in)