VOL-1183: OFagent cannot reach vcore after vcore moved to different node in cluster

Two gRPC calls made by OFagent's ConnectionManager, Subscribe and ListLogicalDevices, actually
run in the Twisted reactor thread. On rare occasions, either call never returns when the vcore
connected to the OFagent dies. Because the blocking gRPC runs in the reactor thread, the entire
application is blocked and cannot detect comms errors with the deceased vcore. The handling  of
these gRPCs has been moved to the OFagent's internal GrpcClient, which wraps all gRPC calls in
the twisted.internet.threads.deferToThread function.

Change-Id: Ifa3479a3cbca7ae8d7f5e2ca0372e22507d5e4b9
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 13ce2a0..2496dd5 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -32,7 +32,6 @@
 from grpc_client import GrpcClient
 
 from agent import Agent
-from google.protobuf.empty_pb2 import Empty
 from common.utils.dockerhelpers import get_my_containers_name
 
 
@@ -40,8 +39,8 @@
 # _ = third_party
 
 class ConnectionManager(object):
-    def __init__(self, consul_endpoint, vcore_endpoint, controller_endpoints,
-                 instance_id,
+    def __init__(self, consul_endpoint, vcore_endpoint, vcore_grpc_timeout,
+                 controller_endpoints, instance_id,
                  enable_tls=False, key_file=None, cert_file=None,
                  vcore_retry_interval=0.5, devices_refresh_interval=5,
                  subscription_refresh_interval=5):
@@ -51,6 +50,7 @@
         self.controller_endpoints = controller_endpoints
         self.consul_endpoint = consul_endpoint
         self.vcore_endpoint = vcore_endpoint
+        self.grpc_timeout = vcore_grpc_timeout
         self.instance_id = instance_id
         self.enable_tls = enable_tls
         self.key_file = key_file
@@ -160,8 +160,10 @@
 
                 # Send subscription request to register the current ofagent instance
                 container_name = self.instance_id
-                stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
-                subscription = stub.Subscribe(OfAgentSubscriber(ofagent_id=container_name))
+                if self.grpc_client is None:
+                    self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout)
+                subscription = yield self.grpc_client.subscribe(
+                    OfAgentSubscriber(ofagent_id=container_name))
 
                 # If the subscriber id matches the current instance
                 # ... then the subscription has succeeded
@@ -170,7 +172,7 @@
                         # Keep details on the current GRPC session and subscription
                         log.debug('subscription-with-vcore-successful', subscription=subscription)
                         self.subscription = subscription
-                        self.grpc_client = GrpcClient(self, self.channel).start()
+                        self.grpc_client.start()
 
                     # Sleep a bit in between each subscribe
                     yield asleep(self.subscription_refresh_interval)
@@ -204,16 +206,16 @@
         while self.running:
             log.info('retrieve-logical-device-list')
             try:
-                stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
-                devices = stub.ListLogicalDevices(Empty()).items
+                devices = yield self.grpc_client.list_logical_devices()
                 for device in devices:
                     log.info("logical-device-entry", id=device.id, datapath_id=device.datapath_id)
 
                 returnValue(devices)
 
             except _Rendezvous, e:
-                log.error('vcore-communication-failure', exception=e, status=e.code())
-                if e.code() == StatusCode.UNAVAILABLE:
+                status = e.code()
+                log.error('vcore-communication-failure', exception=e, status=status)
+                if status == StatusCode.UNAVAILABLE or status == StatusCode.DEADLINE_EXCEEDED:
                     os.system("kill -15 {}".format(os.getpid()))
 
             except Exception as e:
@@ -287,7 +289,8 @@
             # see https://jira.opencord.org/browse/CORD-821
 
             try:
-                if self.channel is not None and self.grpc_client is not None:
+                if self.channel is not None and self.grpc_client is not None and \
+                                self.subscription is not None:
                     # get current list from Voltha
                     devices = yield self.get_list_of_logical_devices_from_voltha()