VOL-1183: OFagent cannot reach vcore after vcore moved to different node in cluster

Two gRPC calls made by OFagent's ConnectionManager, Subscribe and ListLogicalDevices, actually
run in the Twisted reactor thread. On rare occasions, either call never returns when the vcore
connected to the OFagent dies. Because the blocking gRPC runs in the reactor thread, the entire
application is blocked and cannot detect comms errors with the deceased vcore. The handling  of
these gRPCs has been moved to the OFagent's internal GrpcClient, which wraps all gRPC calls in
the twisted.internet.threads.deferToThread function.

Change-Id: Ifa3479a3cbca7ae8d7f5e2ca0372e22507d5e4b9
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 13ce2a0..2496dd5 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -32,7 +32,6 @@
 from grpc_client import GrpcClient
 
 from agent import Agent
-from google.protobuf.empty_pb2 import Empty
 from common.utils.dockerhelpers import get_my_containers_name
 
 
@@ -40,8 +39,8 @@
 # _ = third_party
 
 class ConnectionManager(object):
-    def __init__(self, consul_endpoint, vcore_endpoint, controller_endpoints,
-                 instance_id,
+    def __init__(self, consul_endpoint, vcore_endpoint, vcore_grpc_timeout,
+                 controller_endpoints, instance_id,
                  enable_tls=False, key_file=None, cert_file=None,
                  vcore_retry_interval=0.5, devices_refresh_interval=5,
                  subscription_refresh_interval=5):
@@ -51,6 +50,7 @@
         self.controller_endpoints = controller_endpoints
         self.consul_endpoint = consul_endpoint
         self.vcore_endpoint = vcore_endpoint
+        self.grpc_timeout = vcore_grpc_timeout
         self.instance_id = instance_id
         self.enable_tls = enable_tls
         self.key_file = key_file
@@ -160,8 +160,10 @@
 
                 # Send subscription request to register the current ofagent instance
                 container_name = self.instance_id
-                stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
-                subscription = stub.Subscribe(OfAgentSubscriber(ofagent_id=container_name))
+                if self.grpc_client is None:
+                    self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout)
+                subscription = yield self.grpc_client.subscribe(
+                    OfAgentSubscriber(ofagent_id=container_name))
 
                 # If the subscriber id matches the current instance
                 # ... then the subscription has succeeded
@@ -170,7 +172,7 @@
                         # Keep details on the current GRPC session and subscription
                         log.debug('subscription-with-vcore-successful', subscription=subscription)
                         self.subscription = subscription
-                        self.grpc_client = GrpcClient(self, self.channel).start()
+                        self.grpc_client.start()
 
                     # Sleep a bit in between each subscribe
                     yield asleep(self.subscription_refresh_interval)
@@ -204,16 +206,16 @@
         while self.running:
             log.info('retrieve-logical-device-list')
             try:
-                stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
-                devices = stub.ListLogicalDevices(Empty()).items
+                devices = yield self.grpc_client.list_logical_devices()
                 for device in devices:
                     log.info("logical-device-entry", id=device.id, datapath_id=device.datapath_id)
 
                 returnValue(devices)
 
             except _Rendezvous, e:
-                log.error('vcore-communication-failure', exception=e, status=e.code())
-                if e.code() == StatusCode.UNAVAILABLE:
+                status = e.code()
+                log.error('vcore-communication-failure', exception=e, status=status)
+                if status == StatusCode.UNAVAILABLE or status == StatusCode.DEADLINE_EXCEEDED:
                     os.system("kill -15 {}".format(os.getpid()))
 
             except Exception as e:
@@ -287,7 +289,8 @@
             # see https://jira.opencord.org/browse/CORD-821
 
             try:
-                if self.channel is not None and self.grpc_client is not None:
+                if self.channel is not None and self.grpc_client is not None and \
+                                self.subscription is not None:
                     # get current list from Voltha
                     devices = yield self.get_list_of_logical_devices_from_voltha()
 
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index af3ffe4..56af949 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -38,10 +38,11 @@
 
 class GrpcClient(object):
 
-    def __init__(self, connection_manager, channel):
+    def __init__(self, connection_manager, channel, grpc_timeout):
 
         self.connection_manager = connection_manager
         self.channel = channel
+        self.grpc_timeout = grpc_timeout
         self.local_stub = VolthaLocalServiceStub(channel)
 
         self.stopped = False
@@ -51,7 +52,7 @@
         self.change_event_queue = DeferredQueue()  # queue change events
 
     def start(self):
-        log.debug('starting')
+        log.debug('starting', grpc_timeout=self.grpc_timeout)
         self.start_packet_out_stream()
         self.start_packet_in_stream()
         self.start_change_event_in_stream()
@@ -82,6 +83,7 @@
             try:
                 self.local_stub.StreamPacketsOut(generator)
             except _Rendezvous, e:
+                log.error('grpc-exception', status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
                     os.system("kill -15 {}".format(os.getpid()))
 
@@ -100,6 +102,7 @@
                               packet_in=packet_in,
                               queue_len=len(self.packet_in_queue.pending))
             except _Rendezvous, e:
+                log.error('grpc-exception', status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
                     os.system("kill -15 {}".format(os.getpid()))
 
@@ -117,6 +120,7 @@
                               change_event=event,
                               queue_len=len(self.change_event_queue.pending))
             except _Rendezvous, e:
+                log.error('grpc-exception', status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
                     os.system("kill -15 {}".format(os.getpid()))
 
@@ -152,14 +156,14 @@
     def get_port(self, device_id, port_id):
         req = LogicalPortId(id=device_id, port_id=port_id)
         res = yield threads.deferToThread(
-            self.local_stub.GetLogicalDevicePort, req)
+            self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
     def get_port_list(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevicePorts, req)
+            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout)
         returnValue(res.items)
 
     @inlineCallbacks
@@ -169,7 +173,7 @@
             port_id=port_id
         )
         res = yield threads.deferToThread(
-            self.local_stub.EnableLogicalDevicePort, req)
+            self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
@@ -179,14 +183,14 @@
             port_id=port_id
         )
         res = yield threads.deferToThread(
-            self.local_stub.DisableLogicalDevicePort, req)
+            self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
     def get_device_info(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.GetLogicalDevice, req)
+            self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
@@ -196,7 +200,7 @@
             flow_mod=flow_mod
         )
         res = yield threads.deferToThread(
-            self.local_stub.UpdateLogicalDeviceFlowTable, req)
+            self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
@@ -206,26 +210,38 @@
             group_mod=group_mod
         )
         res = yield threads.deferToThread(
-            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
+            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
     def list_flows(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDeviceFlows, req)
+            self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout)
         returnValue(res.items)
 
     @inlineCallbacks
     def list_groups(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDeviceFlowGroups, req)
+            self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout)
         returnValue(res.items)
 
     @inlineCallbacks
     def list_ports(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevicePorts, req)
-        returnValue(res.items)
\ No newline at end of file
+            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout)
+        returnValue(res.items)
+
+    @inlineCallbacks
+    def list_logical_devices(self):
+        res = yield threads.deferToThread(
+            self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout)
+        returnValue(res.items)
+
+    @inlineCallbacks
+    def subscribe(self, subscriber):
+        res = yield threads.deferToThread(
+            self.local_stub.Subscribe, subscriber, timeout=self.grpc_timeout)
+        returnValue(res)
diff --git a/ofagent/main.py b/ofagent/main.py
index 23335bb..06c2ae3 100755
--- a/ofagent/main.py
+++ b/ofagent/main.py
@@ -203,6 +203,9 @@
 
         self.args = args = parse_args()
         self.config = load_config(args)
+        # May want to specify the gRPC timeout as an arg (in future)
+        # Right now, set a default value
+        self.grpc_timeout = 10
 
         verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
         self.log = setup_logging(self.config.get('logging', {}),
@@ -227,7 +230,8 @@
         self.log.info('starting-internal-components')
         args = self.args
         self.connection_manager = yield ConnectionManager(
-            args.consul, args.grpc_endpoint, args.controller, args.instance_id, \
+            args.consul, args.grpc_endpoint, self.grpc_timeout,
+            args.controller, args.instance_id,
             args.enable_tls, args.key_file, args.cert_file).start()
         self.log.info('started-internal-services')
 
diff --git a/tests/utests/README.md b/tests/utests/README.md
index 731ecfa..7a1c9b1 100644
--- a/tests/utests/README.md
+++ b/tests/utests/README.md
@@ -22,6 +22,5 @@
 ```
 cd /cord/incubator/voltha/
 . ./env.sh
-cd ofagent/
 nosetests -s tests/utests/ofagent/
 ```
diff --git a/tests/utests/ofagent/test_connection_mgr.py b/tests/utests/ofagent/test_connection_mgr.py
index 4ca022c..fe7bcdf 100644
--- a/tests/utests/ofagent/test_connection_mgr.py
+++ b/tests/utests/ofagent/test_connection_mgr.py
@@ -17,6 +17,9 @@
 
 class TestConection_mgr(TestCase):
 
+    # Set a default for the gRPC timeout (seconds)
+    grpc_timeout = 10
+
     def gen_endpoints(self):
         consul_endpoint = "localhost:8500"
         voltha_endpoint= "localhost:8880"
@@ -45,7 +48,8 @@
     def test_connection_mgr_init(self):
         consul_endpoint,voltha_endpoint,controller_endpoints  = self.gen_endpoints()
         my_name = self.gen_container_name()
-        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+                                                 controller_endpoints, my_name)
         self.assertEqual(test_connection_init.consul_endpoint,consul_endpoint)
         self.assertEqual(test_connection_init.vcore_endpoint, voltha_endpoint)
         self.assertEqual(test_connection_init.controller_endpoints, controller_endpoints)
@@ -53,7 +57,8 @@
     def test_resolve_endpoint(self):
         consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
         my_name = self.gen_container_name()
-        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+                                                 controller_endpoints, my_name)
         host,port = test_connection_init.resolve_endpoint(endpoint=consul_endpoint)
         assert isinstance(port, int)
         assert isinstance(host, basestring)
@@ -62,21 +67,24 @@
         consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
         my_name = self.gen_container_name()
         devices,device = self.gen_devices()
-        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+                                                 controller_endpoints, my_name)
         test_connection_init.refresh_agent_connections(devices)
 
     def test_create_agent(self):
         consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
         my_name = self.gen_container_name()
         devices,device = self.gen_devices()
-        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+                                                 controller_endpoints, my_name)
         test_connection_init.create_agent(device)
 
     def test_delete_agent(self):
         consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
         my_name = self.gen_container_name()
         devices,device = self.gen_devices()
-        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+                                                 controller_endpoints, my_name)
         test_connection_init.create_agent(device)
         with self.assertRaises(Exception) as context:
             test_connection_init.delete_agent(device.datapath_id)
@@ -88,7 +96,8 @@
         my_name = self.gen_container_name()
         devices,device = self.gen_devices()
         packet_in = self.gen_packet_in()
-        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints, my_name)
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, self.grpc_timeout,
+                                                 controller_endpoints, my_name)
         test_connection_init.create_agent(device)
         test_connection_init.forward_packet_in(device.id, packet_in)