VOL-405: Create GET timeouts in voltha core instead on relying on the Twisted Consul API for watch notifications.   Also add a timeout to the voltha peer communications

Change-Id: Ifd1069eefb2fc1fb18c1cf6a2f421fd08fd8f76b
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 0c2d746..a82ebea 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -243,10 +243,10 @@
                 valid_membership = yield self._assert_membership_record_valid()
                 if not valid_membership:
                     log.info('recreating-membership-before',
-                              session=self.session_id)
+                             session=self.session_id)
                     yield self._do_create_membership_record_with_retries()
                     log.info('recreating-membership-after',
-                              session=self.session_id)
+                             session=self.session_id)
                 else:
                     log.debug('valid-membership', session=self.session_id)
                 # Async sleep before checking the membership record again
@@ -380,8 +380,6 @@
     def _start_leader_tracking(self):
         reactor.callLater(0, self._leadership_tracking_loop)
 
-
-
     @inlineCallbacks
     def _leadership_tracking_loop(self):
         try:
@@ -418,17 +416,25 @@
                     yield self._assert_nonleadership(leader_id)
 
             # if record was none, we shall try leadership again
-
-            # using consul's watch feature, start tracking any changes to key
             last = record
             while last is not None:
                 # this shall return only when update is made to leader key
-                # or expires after 5 seconds wait (in case consul took an
-                # unexpected vacation)
-                (index, updated) = yield self._retry('GET',
-                                                     self.leader_prefix,
-                                                     wait='5s',
-                                                     index=index)
+                # or expires after 5 seconds wait
+                is_timeout, (tmp_index, updated) = yield \
+                    self.consul_get_with_timeout(
+                        key=self.leader_prefix,
+                        index=index,
+                        timeout=5)
+                # Timeout means either there is a lost connectivity to
+                # consul or there are no change to that key.  Do nothing.
+                if is_timeout:
+                    continue
+
+                # After timeout event the index returned from
+                # consul_get_with_timeout is None.  If we are here it's not a
+                # timeout, therefore the index is a valid one.
+                index=tmp_index
+
                 if updated is None or updated != last:
                     log.info('leader-key-change',
                              index=index, updated=updated, last=last)
@@ -550,3 +556,39 @@
 
         log.info('end', operation=operation, args=args)
         returnValue(result)
+
+    @inlineCallbacks
+    def consul_get_with_timeout(self, key, timeout, **kw):
+        """
+        Query consul with a timeout
+        :param key: Key to query
+        :param timeout: timeout value
+        :param kw: additional key-value params
+        :return: (is_timeout, (index, result)).
+        """
+
+        @inlineCallbacks
+        def _get(key, m_callback):
+            try:
+                (index, result) = yield self._retry('GET', key, **kw)
+                if not m_callback.called:
+                    log.debug('got-result-cancelling-timer')
+                    m_callback.callback((index, result))
+            except Exception as e:
+                log.exception('got-exception', e=e)
+
+        try:
+            rcvd = DeferredWithTimeout(timeout=timeout)
+            _get(key, rcvd)
+            try:
+                result = yield rcvd
+                log.debug('result-received', result=result)
+                returnValue((False, result))
+            except TimeOutError as e:
+                log.debug('timeout-or-no-data-change', key=key)
+            except Exception as e:
+                log.exception('exception', e=e)
+        except Exception as e:
+            log.exception('exception', e=e)
+
+        returnValue((True, (None, None)))
diff --git a/voltha/core/dispatcher.py b/voltha/core/dispatcher.py
index be253b8..1457124 100644
--- a/voltha/core/dispatcher.py
+++ b/voltha/core/dispatcher.py
@@ -340,9 +340,15 @@
 
         try:
             # Always request from the local service when making request to peer
+            # Add a long timeout of 15 seconds to balance between:
+            #       (1) a query for large amount of data from a peer
+            #       (2) an error where the peer is not responding and the
+            #           request keeps waiting without getting a grpc
+            #           rendez-vous exception.
             stub = VolthaLocalServiceStub
             method = getattr(stub(self.grpc_conn_map[core_id]), method_name)
             response, rendezvous = yield method.with_call(request,
+                                                          timeout=15,
                                                           metadata=context.invocation_metadata())
             log.debug('peer-response',
                       core_id=core_id,
diff --git a/voltha/leader.py b/voltha/leader.py
index 3bfbc6c..62bf1d5 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -78,6 +78,10 @@
         self.assignment_id_match = re.compile(
             self.ASSIGNMENT_ID_EXTRACTOR % self.coord.assignment_prefix).match
 
+        self.members_tracking_sleep_to_prevent_flood = \
+            self.coord.leader_config.get((self.coord.leader_config[
+                'members_track_error_to_prevent_flood']), 1)
+
     @inlineCallbacks
     def start(self):
         log.debug('starting')
@@ -240,22 +244,23 @@
     def _track_members(self, index):
         previous_index = index
         try:
-            (index, results) = yield self.coord.kv_get(
-                self.coord.membership_prefix,
-                wait='10s',
-                index=index,
-                recurse=True)
-
-            if not results:
-                log.info('no-data-yet', index=index)
-                return
-
+            log.info('member-tracking-before')
+            is_timeout, (tmp_index, results) = yield \
+                                    self.coord.consul_get_with_timeout(
+                                            key=self.coord.membership_prefix,
+                                            recurse=True,
+                                            index=index,
+                                            timeout=10)
             # Check whether we are still the leader - a new regime may be in
             # place by the time we see a membership update
             if self.halted:
                 log.info('I am no longer the leader')
                 return
 
+            if is_timeout:
+                log.debug('timeout-or-no-membership-changed')
+                return
+
             # This can happen if consul went down and came back with no data
             if not results:
                 log.error('no-active-members')
@@ -263,6 +268,13 @@
                 self.coord._just_lost_leadership()
                 return
 
+            # After timeout event the index returned from
+            # consul_get_with_timeout is None.  If we are here it's not a
+            # timeout, therefore the index is a valid one.
+            index=tmp_index
+
+            log.info('membership-tracking-data', index=index, results=results)
+
             if previous_index != index:
                 log.info('membership-updated',
                          previous_index=previous_index, index=index)
@@ -303,11 +315,8 @@
 
         except Exception, e:
             log.exception('members-track-error', e=e)
-            yield asleep(
-                self.coord.leader_config.get(
-                    self.coord.leader_config[
-                        'members_track_error_to_prevent_flood']), 1)
             # to prevent flood
+            yield asleep(self.members_tracking_sleep_to_prevent_flood)
         finally:
             if not self.halted:
                 reactor.callLater(1, self._track_members, index)
diff --git a/voltha/worker.py b/voltha/worker.py
index d180cd1..b47accc 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -150,11 +150,21 @@
             prev_index = index
             if self.mycore_store_id:
                 # Wait for updates to the store assigment key
-                (index, mappings) = yield self.coord.kv_get(
-                    self.coord.core_store_assignment_key,
-                    wait='10s',
-                    index=index,
-                    recurse=True)
+                is_timeout, (tmp_index, mappings) = yield \
+                                self.coord.consul_get_with_timeout(
+                                    key=self.coord.core_store_assignment_key,
+                                    recurse=True,
+                                    index=index,
+                                    timeout=10)
+
+                if is_timeout:
+                    return
+
+                # After timeout event the index returned from
+                # consul_get_with_timeout is None.  If we are here it's not a
+                # timeout, therefore the index is a valid one.
+                index=tmp_index
+
                 if mappings and index != prev_index:
                     new_map = loads(mappings[0]['Value'])
                     # Remove my id from my peers list