This commit addresses some of the boundary conditions that arise when a consul
node goes down and voltha needs to recover with the other nodes.
Change-Id: I9c5bd997ddf9624295e50ce86638eaf8d803a6e7
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index cf38d69..a18b9f1 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -72,7 +72,7 @@
self.worker_config = config['worker']
self.leader_config = config['leader']
self.membership_watch_relatch_delay = config.get(
- 'membership_watch_relatch_delay', 0.1)
+ 'membership_watch_relatch_delay', 5.0)
self.tracking_loop_delay = config.get(
'tracking_loop_delay', 1)
self.session_renewal_timeout = config.get(
@@ -106,7 +106,7 @@
self.leader_id = None # will be the instance id of the current leader
self.shutting_down = False
self.leader = None
- self.session_renew_timer = None
+ self.membership_callback = None
self.worker = Worker(self.instance_id, self)
@@ -130,7 +130,6 @@
def stop(self):
log.debug('stopping')
self.shutting_down = True
- self.session_renew_timer.stop()
yield self._delete_session() # this will delete the leader lock too
yield self.worker.stop()
if self.leader is not None:
@@ -221,12 +220,15 @@
@inlineCallbacks
def _delete_session(self):
- yield self.consul.session.destroy(self.session_id)
+ try:
+ yield self.consul.session.destroy(self.session_id)
+ except Exception as e:
+ log.exception('failed-to-delete-session',
+ session_id=self.session_id)
@inlineCallbacks
def _create_membership_record(self):
yield self._do_create_membership_record_with_retries()
- reactor.callLater(0, self._maintain_membership_record)
def _create_membership_record_data(self):
member_record = dict()
@@ -235,58 +237,55 @@
return member_record
@inlineCallbacks
+ def _assert_membership_record_valid(self):
+ try:
+ log.debug('membership-record-before')
+ (_, record) = yield self._retry('GET', self.membership_record_key)
+ log.debug('membership-record-after', record=record)
+ if record is None or \
+ 'Session' not in record or \
+ record['Session'] != self.session_id:
+ log.debug('membership-record-change-detected',
+ old_session=self.session_id,
+ record=record)
+ returnValue(False)
+ else:
+ returnValue(True)
+ except Exception as e:
+ log.exception('membership-validation-exception', e=e)
+ returnValue(False)
+
+
+ @inlineCallbacks
def _do_create_membership_record_with_retries(self):
while 1:
- result = yield self._retry(
- 'PUT',
- self.membership_record_key,
- dumps(self._create_membership_record_data()),
- acquire=self.session_id)
- if result:
- log.debug('new-membership-record', session=self.session_id)
- break
+ # First check whether we need to create membership
+ valid_membership = yield self._assert_membership_record_valid()
+ if not valid_membership:
+ try:
+ # Delete old record
+ yield self._retry('DELETE', self.membership_record_key)
+ except Exception as e:
+ log.info('cannot-delete-older-membership', e=e)
+
+ # Just for consul to sync up, lets wait 1 second
+ yield asleep(1)
+
+ log.debug('recreating-membership', session=self.session_id)
+ result = yield self._retry(
+ 'PUT',
+ self.membership_record_key,
+ dumps(self._create_membership_record_data()),
+ acquire=self.session_id)
+ if result:
+ log.debug('new-membership-record-created',
+ session=self.session_id)
+ break
+ else:
+ log.warn('cannot-create-membership-record')
+ yield self._backoff('stale-membership-record')
else:
- log.warn('cannot-create-membership-record')
- yield self._backoff('stale-membership-record')
-
- @inlineCallbacks
- def _do_create_membership_record(self):
- consul = yield self.get_consul()
- result = yield consul.kv.put(
- self.membership_record_key,
- dumps(self._create_membership_record_data()),
- acquire=self.session_id)
- if not result:
- log.warn('cannot-create-membership-record')
- raise StaleMembershipEntryException(self.instance_id)
-
- @inlineCallbacks
- def _maintain_membership_record(self):
- index = None
- try:
- while 1:
- (index, record) = yield self._retry('GET',
- self.membership_record_key,
- wait='5s',
- index=index)
- if record is None or \
- 'Session' not in record or \
- record['Session'] != self.session_id:
- log.debug('membership-record-change-detected',
- old_session=self.session_id,
- index=index,
- record=record)
-
- yield self._do_create_membership_record_with_retries()
-
- except Exception, e:
- log.exception('unexpected-error-leader-tracking', e=e)
-
- finally:
- # except in shutdown, the loop must continue (after a short delay)
- if not self.shutting_down:
- reactor.callLater(self.membership_watch_relatch_delay,
- self._maintain_membership_record)
+ break
def _start_session_tracking(self):
reactor.callLater(0, self._session_tracking_loop)
@@ -297,10 +296,7 @@
@inlineCallbacks
def _redo_session():
log.info('_redo_session-before')
- try:
- yield self._delete_session()
- except Exception as e:
- log.exception('could-not-delete-old-session', e=e)
+ yield self._delete_session()
# Create a new consul connection/session
try:
@@ -311,8 +307,15 @@
lock_delay=1)
log.debug('new-consul-session', session=self.session_id)
- # Update my membership
- yield self._do_create_membership_record_with_retries()
+ # Update my membership - first stop the previous invocation
+ # if it is still running
+ if self.membership_callback and not self.membership_callback.called:
+ log.debug('cancelling-older-membership-request',
+ session=self.session_id)
+ self.membership_callback.cancel()
+
+ self.membership_callback = reactor.callLater(1,
+ self._do_create_membership_record_with_retries)
except Exception as e:
log.exception('could-not-create-a-consul-session', e=e)
@@ -477,8 +480,7 @@
while 1:
try:
consul = yield self.get_consul()
- log.debug('consul', consul=consul, operation=operation,
- args=args)
+ log.debug('start', operation=operation, args=args)
if operation == 'GET':
result = yield consul.kv.get(*args, **kw)
elif operation == 'PUT':
@@ -515,4 +517,5 @@
log.exception(e)
yield self._backoff('unknown-error')
+ log.debug('end', operation=operation, args=args)
returnValue(result)
diff --git a/voltha/worker.py b/voltha/worker.py
index bb8932a..c62c5fa 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -69,9 +69,15 @@
def stop(self):
log.debug('stopping')
+ self.halted = True
if isinstance(self.assignment_soak_timer, DelayedCall):
if not self.assignment_soak_timer.called:
self.assignment_soak_timer.cancel()
+
+ if isinstance(self.assignment_core_store_soak_timer, DelayedCall):
+ if not self.assignment_core_store_soak_timer.called:
+ self.assignment_core_store_soak_timer.cancel()
+
log.info('stopped')
@inlineCallbacks