This commit addresses some of the border conditions when a consul node
goes down and voltha needs to recuperate with the other other nodes.

Change-Id: I9c5bd997ddf9624295e50ce86638eaf8d803a6e7
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index cf38d69..a18b9f1 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -72,7 +72,7 @@
         self.worker_config = config['worker']
         self.leader_config = config['leader']
         self.membership_watch_relatch_delay = config.get(
-            'membership_watch_relatch_delay', 0.1)
+            'membership_watch_relatch_delay', 5.0)
         self.tracking_loop_delay = config.get(
             'tracking_loop_delay', 1)
         self.session_renewal_timeout = config.get(
@@ -106,7 +106,7 @@
         self.leader_id = None  # will be the instance id of the current leader
         self.shutting_down = False
         self.leader = None
-        self.session_renew_timer = None
+        self.membership_callback = None
 
         self.worker = Worker(self.instance_id, self)
 
@@ -130,7 +130,6 @@
     def stop(self):
         log.debug('stopping')
         self.shutting_down = True
-        self.session_renew_timer.stop()
         yield self._delete_session()  # this will delete the leader lock too
         yield self.worker.stop()
         if self.leader is not None:
@@ -221,12 +220,15 @@
 
     @inlineCallbacks
     def _delete_session(self):
-        yield self.consul.session.destroy(self.session_id)
+        try:
+            yield self.consul.session.destroy(self.session_id)
+        except Exception as e:
+            log.exception('failed-to-delete-session',
+                         session_id=self.session_id)
 
     @inlineCallbacks
     def _create_membership_record(self):
         yield self._do_create_membership_record_with_retries()
-        reactor.callLater(0, self._maintain_membership_record)
 
     def _create_membership_record_data(self):
         member_record = dict()
@@ -235,58 +237,55 @@
         return member_record
 
     @inlineCallbacks
+    def _assert_membership_record_valid(self):
+        try:
+            log.debug('membership-record-before')
+            (_, record) = yield self._retry('GET', self.membership_record_key)
+            log.debug('membership-record-after', record=record)
+            if record is None or \
+                        'Session' not in record or \
+                        record['Session'] != self.session_id:
+                log.debug('membership-record-change-detected',
+                          old_session=self.session_id,
+                          record=record)
+                returnValue(False)
+            else:
+                returnValue(True)
+        except Exception as e:
+            log.exception('membership-validation-exception', e=e)
+            returnValue(False)
+
+
+    @inlineCallbacks
     def _do_create_membership_record_with_retries(self):
         while 1:
-            result = yield self._retry(
-                'PUT',
-                self.membership_record_key,
-                dumps(self._create_membership_record_data()),
-                acquire=self.session_id)
-            if result:
-                log.debug('new-membership-record', session=self.session_id)
-                break
+            # First check whether we need to create membership
+            valid_membership = yield self._assert_membership_record_valid()
+            if not valid_membership:
+                try:
+                    # Delete old record
+                    yield self._retry('DELETE', self.membership_record_key)
+                except Exception as e:
+                    log.info('cannot-delete-older-membership', e=e)
+
+                # Just for consul to sync up, lets wait 1 second
+                yield asleep(1)
+
+                log.debug('recreating-membership', session=self.session_id)
+                result = yield self._retry(
+                    'PUT',
+                    self.membership_record_key,
+                    dumps(self._create_membership_record_data()),
+                    acquire=self.session_id)
+                if result:
+                    log.debug('new-membership-record-created',
+                              session=self.session_id)
+                    break
+                else:
+                    log.warn('cannot-create-membership-record')
+                    yield self._backoff('stale-membership-record')
             else:
-                log.warn('cannot-create-membership-record')
-                yield self._backoff('stale-membership-record')
-
-    @inlineCallbacks
-    def _do_create_membership_record(self):
-        consul = yield self.get_consul()
-        result = yield consul.kv.put(
-            self.membership_record_key,
-            dumps(self._create_membership_record_data()),
-            acquire=self.session_id)
-        if not result:
-            log.warn('cannot-create-membership-record')
-            raise StaleMembershipEntryException(self.instance_id)
-
-    @inlineCallbacks
-    def _maintain_membership_record(self):
-        index = None
-        try:
-            while 1:
-                (index, record) = yield self._retry('GET',
-                                                    self.membership_record_key,
-                                                    wait='5s',
-                                                    index=index)
-                if record is None or \
-                                'Session' not in record or \
-                                record['Session'] != self.session_id:
-                    log.debug('membership-record-change-detected',
-                              old_session=self.session_id,
-                              index=index,
-                              record=record)
-
-                    yield self._do_create_membership_record_with_retries()
-
-        except Exception, e:
-            log.exception('unexpected-error-leader-tracking', e=e)
-
-        finally:
-            # except in shutdown, the loop must continue (after a short delay)
-            if not self.shutting_down:
-                reactor.callLater(self.membership_watch_relatch_delay,
-                                  self._maintain_membership_record)
+                break
 
     def _start_session_tracking(self):
         reactor.callLater(0, self._session_tracking_loop)
@@ -297,10 +296,7 @@
         @inlineCallbacks
         def _redo_session():
             log.info('_redo_session-before')
-            try:
-                yield self._delete_session()
-            except Exception as e:
-                log.exception('could-not-delete-old-session', e=e)
+            yield self._delete_session()
 
             # Create a new consul connection/session
             try:
@@ -311,8 +307,15 @@
                     lock_delay=1)
                 log.debug('new-consul-session', session=self.session_id)
 
-                # Update my membership
-                yield self._do_create_membership_record_with_retries()
+                # Update my membership - first stop the previous invocation
+                # if it is still running
+                if self.membership_callback and not self.membership_callback.called:
+                    log.debug('cancelling-older-membership-request',
+                              session=self.session_id)
+                    self.membership_callback.cancel()
+
+                self.membership_callback = reactor.callLater(1,
+                               self._do_create_membership_record_with_retries)
 
             except Exception as e:
                 log.exception('could-not-create-a-consul-session', e=e)
@@ -477,8 +480,7 @@
         while 1:
             try:
                 consul = yield self.get_consul()
-                log.debug('consul', consul=consul, operation=operation,
-                         args=args)
+                log.debug('start', operation=operation, args=args)
                 if operation == 'GET':
                     result = yield consul.kv.get(*args, **kw)
                 elif operation == 'PUT':
@@ -515,4 +517,5 @@
                     log.exception(e)
                 yield self._backoff('unknown-error')
 
+        log.debug('end', operation=operation, args=args)
         returnValue(result)
diff --git a/voltha/worker.py b/voltha/worker.py
index bb8932a..c62c5fa 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -69,9 +69,15 @@
 
     def stop(self):
         log.debug('stopping')
+        self.halted = True
         if isinstance(self.assignment_soak_timer, DelayedCall):
             if not self.assignment_soak_timer.called:
                 self.assignment_soak_timer.cancel()
+
+        if isinstance(self.assignment_core_store_soak_timer, DelayedCall):
+            if not self.assignment_core_store_soak_timer.called:
+                self.assignment_core_store_soak_timer.cancel()
+
         log.info('stopped')
 
     @inlineCallbacks