VOL-1081: The cluster coordinator's leadership tracking loop stops working
* Ensured loop doesn't stop by placing the reactor.callLater within a finally
statement.
* Cleaned up the logging a bit.
Change-Id: I1e1276480ee551130b0122f8089843192d45492f
diff --git a/voltha/coordinator_etcd.py b/voltha/coordinator_etcd.py
index 3c13f67..c736104 100644
--- a/voltha/coordinator_etcd.py
+++ b/voltha/coordinator_etcd.py
@@ -395,65 +395,61 @@
@inlineCallbacks
def _leadership_tracking_loop(self):
- log.info('leadership-attempt-before')
-
- # Try to acquire leadership lease via test-and-set operation.
- # Success means the leader key was previously absent and was
- # just re-created by this instance.
- leader_prefix = bytes(self.leader_prefix)
- txn = Transaction(
- compare=[
- CompVersion(leader_prefix, '==', 0)
- ],
- success=[
- OpSet(leader_prefix, bytes(self.instance_id), lease=self.lease),
- OpGet(leader_prefix)
- ],
- failure=[]
- )
- newly_asserted = False
try:
- result = yield self.etcd.submit(txn)
- except Failed as failed:
- log.info('Leader key PRESENT')
- for response in failed.responses:
- log.info('Leader key already present', response=response)
- else:
- newly_asserted = True
- log.info('Leader key ABSENT')
- for response in result.responses:
- log.info('Leader key was absent', response=response)
+ # Try to seize leadership via test-and-set operation.
+ # Success means the leader key was previously absent
+ # and was just re-created by this instance.
- log.info('leadership-attempt-after')
-
- # Confirm that the assertion succeeded by reading back the value
- # of the leader key.
- leader = None
- result = yield self.etcd.get(b'service/voltha/leader')
- if result.kvs:
- kv = result.kvs[0]
- leader = kv.value
- log.info('Leader readback', leader=leader, instance=self.instance_id)
-
- if leader is None:
- log.info('Failed to read leader key')
- elif leader == self.instance_id:
- if newly_asserted:
- log.info("I JUST BECAME LEADER!")
- yield self._assert_leadership()
+ leader_prefix = bytes(self.leader_prefix)
+ txn = Transaction(
+ compare=[
+ CompVersion(leader_prefix, '==', 0)
+ ],
+ success=[
+ OpSet(leader_prefix, bytes(self.instance_id), lease=self.lease),
+ OpGet(leader_prefix)
+ ],
+ failure=[]
+ )
+ newly_asserted = False
+ try:
+ result = yield self.etcd.submit(txn)
+ except Failed as failed:
+ # Leader key already present
+ pass
else:
- log.info("I'm an aging LEADER")
- else:
- log.info('The LEADER is another', leader=leader)
- yield self._assert_nonleadership(leader)
+ newly_asserted = True
+ log.info('leader-key-absent')
- # May have to add code here to handle case where, for some reason, the lease
- # had been blown away and the txn failed for that reason
+ # Confirm that the assertion succeeded by reading back
+ # the value of the leader key.
+ leader = None
+ result = yield self.etcd.get(leader_prefix)
+ if result.kvs:
+ kv = result.kvs[0]
+ leader = kv.value
+ log.info('get-leader-key', leader=leader, instance=self.instance_id)
- # except in shutdown, the loop must continue (after a short delay)
- if not self.shutting_down:
- reactor.callLater(self.tracking_loop_delay,
- self._leadership_tracking_loop)
+ if leader is None:
+ log.error('get-leader-failed')
+ elif leader == self.instance_id:
+ if newly_asserted:
+ log.info('leadership-seized')
+ yield self._assert_leadership()
+ else:
+ log.info('already-leader')
+ else:
+ log.info('leader-is-another', leader=leader)
+ yield self._assert_nonleadership(leader)
+
+ except Exception, e:
+ log.exception('unexpected-error-leader-tracking', e=e)
+
+ finally:
+ # Except in shutdown, the loop must continue (after a short delay)
+ if not self.shutting_down:
+ reactor.callLater(self.tracking_loop_delay,
+ self._leadership_tracking_loop)
@inlineCallbacks
def _assert_leadership(self):