VOL-405: Create GET timeouts in voltha core instead of relying on the Twisted Consul API for watch notifications. Also add a timeout to the voltha peer communications
Change-Id: Ifd1069eefb2fc1fb18c1cf6a2f421fd08fd8f76b
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 0c2d746..a82ebea 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -243,10 +243,10 @@
valid_membership = yield self._assert_membership_record_valid()
if not valid_membership:
log.info('recreating-membership-before',
- session=self.session_id)
+ session=self.session_id)
yield self._do_create_membership_record_with_retries()
log.info('recreating-membership-after',
- session=self.session_id)
+ session=self.session_id)
else:
log.debug('valid-membership', session=self.session_id)
# Async sleep before checking the membership record again
@@ -380,8 +380,6 @@
def _start_leader_tracking(self):
reactor.callLater(0, self._leadership_tracking_loop)
-
-
@inlineCallbacks
def _leadership_tracking_loop(self):
try:
@@ -418,17 +416,25 @@
yield self._assert_nonleadership(leader_id)
# if record was none, we shall try leadership again
-
- # using consul's watch feature, start tracking any changes to key
last = record
while last is not None:
# this shall return only when update is made to leader key
- # or expires after 5 seconds wait (in case consul took an
- # unexpected vacation)
- (index, updated) = yield self._retry('GET',
- self.leader_prefix,
- wait='5s',
- index=index)
+ # or expires after 5 seconds wait
+ is_timeout, (tmp_index, updated) = yield \
+ self.consul_get_with_timeout(
+ key=self.leader_prefix,
+ index=index,
+ timeout=5)
+ # A timeout means either lost connectivity to consul or
+ # no change to that key. Do nothing.
+ if is_timeout:
+ continue
+
+ # After a timeout event the index returned from
+ # consul_get_with_timeout is None. If we are here it's not a
+ # timeout, therefore the index is a valid one.
+ index = tmp_index
+
if updated is None or updated != last:
log.info('leader-key-change',
index=index, updated=updated, last=last)
@@ -550,3 +556,39 @@
log.info('end', operation=operation, args=args)
returnValue(result)
+
+ @inlineCallbacks
+ def consul_get_with_timeout(self, key, timeout, **kw):
+ """
+ Query consul with a timeout
+ :param key: Key to query
+ :param timeout: timeout value
+ :param kw: additional key-value params
+ :return: (is_timeout, (index, result)).
+ """
+
+ @inlineCallbacks
+ def _get(key, m_callback):
+ try:
+ (index, result) = yield self._retry('GET', key, **kw)
+ if not m_callback.called:
+ log.debug('got-result-cancelling-timer')
+ m_callback.callback((index, result))
+ except Exception as e:
+ log.exception('got-exception', e=e)
+
+ try:
+ rcvd = DeferredWithTimeout(timeout=timeout)
+ _get(key, rcvd)
+ try:
+ result = yield rcvd
+ log.debug('result-received', result=result)
+ returnValue((False, result))
+ except TimeOutError as e:
+ log.debug('timeout-or-no-data-change', key=key)
+ except Exception as e:
+ log.exception('exception', e=e)
+ except Exception as e:
+ log.exception('exception', e=e)
+
+ returnValue((True, (None, None)))
diff --git a/voltha/core/dispatcher.py b/voltha/core/dispatcher.py
index be253b8..1457124 100644
--- a/voltha/core/dispatcher.py
+++ b/voltha/core/dispatcher.py
@@ -340,9 +340,15 @@
try:
# Always request from the local service when making request to peer
+ # Add a long timeout of 15 seconds to balance between:
+ # (1) a query for large amount of data from a peer
+ # (2) an error where the peer is not responding and the
+ # request keeps waiting without getting a grpc
+ # rendez-vous exception.
stub = VolthaLocalServiceStub
method = getattr(stub(self.grpc_conn_map[core_id]), method_name)
response, rendezvous = yield method.with_call(request,
+ timeout=15,
metadata=context.invocation_metadata())
log.debug('peer-response',
core_id=core_id,
diff --git a/voltha/leader.py b/voltha/leader.py
index 3bfbc6c..62bf1d5 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -78,6 +78,10 @@
self.assignment_id_match = re.compile(
self.ASSIGNMENT_ID_EXTRACTOR % self.coord.assignment_prefix).match
+ self.members_tracking_sleep_to_prevent_flood = \
+ self.coord.leader_config.get(
+ 'members_track_error_to_prevent_flood', 1)
+
@inlineCallbacks
def start(self):
log.debug('starting')
@@ -240,22 +244,23 @@
def _track_members(self, index):
previous_index = index
try:
- (index, results) = yield self.coord.kv_get(
- self.coord.membership_prefix,
- wait='10s',
- index=index,
- recurse=True)
-
- if not results:
- log.info('no-data-yet', index=index)
- return
-
+ log.info('member-tracking-before')
+ is_timeout, (tmp_index, results) = yield \
+ self.coord.consul_get_with_timeout(
+ key=self.coord.membership_prefix,
+ recurse=True,
+ index=index,
+ timeout=10)
# Check whether we are still the leader - a new regime may be in
# place by the time we see a membership update
if self.halted:
log.info('I am no longer the leader')
return
+ if is_timeout:
+ log.debug('timeout-or-no-membership-changed')
+ return
+
# This can happen if consul went down and came back with no data
if not results:
log.error('no-active-members')
@@ -263,6 +268,13 @@
self.coord._just_lost_leadership()
return
+ # After a timeout event the index returned from
+ # consul_get_with_timeout is None. If we are here it's not a
+ # timeout, therefore the index is a valid one.
+ index = tmp_index
+
+ log.info('membership-tracking-data', index=index, results=results)
+
if previous_index != index:
log.info('membership-updated',
previous_index=previous_index, index=index)
@@ -303,11 +315,8 @@
except Exception, e:
log.exception('members-track-error', e=e)
- yield asleep(
- self.coord.leader_config.get(
- self.coord.leader_config[
- 'members_track_error_to_prevent_flood']), 1)
# to prevent flood
+ yield asleep(self.members_tracking_sleep_to_prevent_flood)
finally:
if not self.halted:
reactor.callLater(1, self._track_members, index)
diff --git a/voltha/worker.py b/voltha/worker.py
index d180cd1..b47accc 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -150,11 +150,21 @@
prev_index = index
if self.mycore_store_id:
# Wait for updates to the store assigment key
- (index, mappings) = yield self.coord.kv_get(
- self.coord.core_store_assignment_key,
- wait='10s',
- index=index,
- recurse=True)
+ is_timeout, (tmp_index, mappings) = yield \
+ self.coord.consul_get_with_timeout(
+ key=self.coord.core_store_assignment_key,
+ recurse=True,
+ index=index,
+ timeout=10)
+
+ if is_timeout:
+ return
+
+ # After a timeout event the index returned from
+ # consul_get_with_timeout is None. If we are here it's not a
+ # timeout, therefore the index is a valid one.
+ index = tmp_index
+
if mappings and index != prev_index:
new_map = loads(mappings[0]['Value'])
# Remove my id from my peers list