VOL-1183: OFagent cannot reach vcore after vcore moved to different node in cluster

Two gRPC calls made by OFagent's ConnectionManager, Subscribe and ListLogicalDevices, actually
run in the Twisted reactor thread. On rare occasions, either call may never return when the vcore
connected to the OFagent dies. Because the blocking gRPC call runs in the reactor thread, the entire
application is blocked and cannot detect communication errors with the dead vcore. The handling of
these gRPC calls has been moved to the OFagent's internal GrpcClient, which wraps all gRPC calls in
the twisted.internet.threads.deferToThread function.

Change-Id: Ifa3479a3cbca7ae8d7f5e2ca0372e22507d5e4b9
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index af3ffe4..56af949 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -38,10 +38,11 @@
 
 class GrpcClient(object):
 
-    def __init__(self, connection_manager, channel):
+    def __init__(self, connection_manager, channel, grpc_timeout):
 
         self.connection_manager = connection_manager
         self.channel = channel
+        self.grpc_timeout = grpc_timeout
         self.local_stub = VolthaLocalServiceStub(channel)
 
         self.stopped = False
@@ -51,7 +52,7 @@
         self.change_event_queue = DeferredQueue()  # queue change events
 
     def start(self):
-        log.debug('starting')
+        log.debug('starting', grpc_timeout=self.grpc_timeout)
         self.start_packet_out_stream()
         self.start_packet_in_stream()
         self.start_change_event_in_stream()
@@ -82,6 +83,7 @@
             try:
                 self.local_stub.StreamPacketsOut(generator)
             except _Rendezvous, e:
+                log.error('grpc-exception', status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
                     os.system("kill -15 {}".format(os.getpid()))
 
@@ -100,6 +102,7 @@
                               packet_in=packet_in,
                               queue_len=len(self.packet_in_queue.pending))
             except _Rendezvous, e:
+                log.error('grpc-exception', status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
                     os.system("kill -15 {}".format(os.getpid()))
 
@@ -117,6 +120,7 @@
                               change_event=event,
                               queue_len=len(self.change_event_queue.pending))
             except _Rendezvous, e:
+                log.error('grpc-exception', status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
                     os.system("kill -15 {}".format(os.getpid()))
 
@@ -152,14 +156,14 @@
     def get_port(self, device_id, port_id):
         req = LogicalPortId(id=device_id, port_id=port_id)
         res = yield threads.deferToThread(
-            self.local_stub.GetLogicalDevicePort, req)
+            self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
     def get_port_list(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevicePorts, req)
+            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout)
         returnValue(res.items)
 
     @inlineCallbacks
@@ -169,7 +173,7 @@
             port_id=port_id
         )
         res = yield threads.deferToThread(
-            self.local_stub.EnableLogicalDevicePort, req)
+            self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
@@ -179,14 +183,14 @@
             port_id=port_id
         )
         res = yield threads.deferToThread(
-            self.local_stub.DisableLogicalDevicePort, req)
+            self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
     def get_device_info(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.GetLogicalDevice, req)
+            self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
@@ -196,7 +200,7 @@
             flow_mod=flow_mod
         )
         res = yield threads.deferToThread(
-            self.local_stub.UpdateLogicalDeviceFlowTable, req)
+            self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
@@ -206,26 +210,38 @@
             group_mod=group_mod
         )
         res = yield threads.deferToThread(
-            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
+            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout)
         returnValue(res)
 
     @inlineCallbacks
     def list_flows(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDeviceFlows, req)
+            self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout)
         returnValue(res.items)
 
     @inlineCallbacks
     def list_groups(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDeviceFlowGroups, req)
+            self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout)
         returnValue(res.items)
 
     @inlineCallbacks
     def list_ports(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevicePorts, req)
-        returnValue(res.items)
\ No newline at end of file
+            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout)
+        returnValue(res.items)
+
+    @inlineCallbacks
+    def list_logical_devices(self):
+        res = yield threads.deferToThread(
+            self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout)
+        returnValue(res.items)
+
+    @inlineCallbacks
+    def subscribe(self, subscriber):
+        res = yield threads.deferToThread(
+            self.local_stub.Subscribe, subscriber, timeout=self.grpc_timeout)
+        returnValue(res)