This commit does the following:
1) Update the consul session TTL and the voltha consul timeouts
2) Remove the dependency of session creation with consul membership update.
Change-Id: I8e64279abd3110d1a34c76c30733325561eafe06
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index a18b9f1..a1777a1 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -72,13 +72,17 @@
self.worker_config = config['worker']
self.leader_config = config['leader']
self.membership_watch_relatch_delay = config.get(
- 'membership_watch_relatch_delay', 5.0)
- self.tracking_loop_delay = config.get(
+ 'membership_watch_relatch_delay', 0.1)
+ self.tracking_loop_delay = self.config.get(
'tracking_loop_delay', 1)
- self.session_renewal_timeout = config.get(
- 'session_renewal_timeout', 2)
- self.session_renewal_loop_delay = config.get(
+ self.session_renewal_timeout = self.config.get(
+ 'session_renewal_timeout', 5)
+ self.session_renewal_loop_delay = self.config.get(
'session_renewal_loop_delay', 3)
+ self.membership_maintenance_loop_delay = self.config.get(
+ 'membership_maintenance_loop_delay', 5)
+ self.session_time_to_live = self.config.get(
+ 'session_time_to_live', 10)
self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
self.leader_prefix = '/'.join((self.prefix, self.config.get(
self.config['leader_key'], 'leader')))
@@ -212,7 +216,8 @@
consul = yield self.get_consul()
# create consul session
self.session_id = yield consul.session.create(
- behavior='release', ttl=10, lock_delay=1)
+ behavior='release', ttl=self.session_time_to_live,
+ lock_delay=1)
log.info('created-consul-session', session_id=self.session_id)
self._start_session_tracking()
@@ -224,11 +229,36 @@
yield self.consul.session.destroy(self.session_id)
except Exception as e:
log.exception('failed-to-delete-session',
- session_id=self.session_id)
+ session_id=self.session_id)
@inlineCallbacks
def _create_membership_record(self):
yield self._do_create_membership_record_with_retries()
+ reactor.callLater(0, self._maintain_membership_record)
+
+ @inlineCallbacks
+ def _maintain_membership_record(self):
+ try:
+ while 1:
+ valid_membership = yield self._assert_membership_record_valid()
+ if not valid_membership:
+ log.info('recreating-membership-before',
+ session=self.session_id)
+ yield self._do_create_membership_record_with_retries()
+ log.info('recreating-membership-after',
+ session=self.session_id)
+ else:
+ log.debug('valid-membership', session=self.session_id)
+ # Async sleep before checking the membership record again
+ yield asleep(self.membership_maintenance_loop_delay)
+
+ except Exception, e:
+ log.exception('unexpected-error-leader-trackin', e=e)
+ finally:
+ # except in shutdown, the loop must continue (after a short delay)
+ if not self.shutting_down:
+ reactor.callLater(self.membership_watch_relatch_delay,
+ self._maintain_membership_record)
def _create_membership_record_data(self):
member_record = dict()
@@ -239,15 +269,16 @@
@inlineCallbacks
def _assert_membership_record_valid(self):
try:
- log.debug('membership-record-before')
- (_, record) = yield self._retry('GET', self.membership_record_key)
- log.debug('membership-record-after', record=record)
+ log.info('membership-record-before')
+ (_, record) = yield self._retry('GET',
+ self.membership_record_key)
+ log.info('membership-record-after', record=record)
if record is None or \
- 'Session' not in record or \
- record['Session'] != self.session_id:
- log.debug('membership-record-change-detected',
- old_session=self.session_id,
- record=record)
+ 'Session' not in record or \
+ record['Session'] != self.session_id:
+ log.info('membership-record-change-detected',
+ old_session=self.session_id,
+ record=record)
returnValue(False)
else:
returnValue(True)
@@ -255,37 +286,22 @@
log.exception('membership-validation-exception', e=e)
returnValue(False)
-
@inlineCallbacks
def _do_create_membership_record_with_retries(self):
while 1:
- # First check whether we need to create membership
- valid_membership = yield self._assert_membership_record_valid()
- if not valid_membership:
- try:
- # Delete old record
- yield self._retry('DELETE', self.membership_record_key)
- except Exception as e:
- log.info('cannot-delete-older-membership', e=e)
-
- # Just for consul to sync up, lets wait 1 second
- yield asleep(1)
-
- log.debug('recreating-membership', session=self.session_id)
- result = yield self._retry(
- 'PUT',
- self.membership_record_key,
- dumps(self._create_membership_record_data()),
- acquire=self.session_id)
- if result:
- log.debug('new-membership-record-created',
- session=self.session_id)
- break
- else:
- log.warn('cannot-create-membership-record')
- yield self._backoff('stale-membership-record')
- else:
+ log.info('recreating-membership', session=self.session_id)
+ result = yield self._retry(
+ 'PUT',
+ self.membership_record_key,
+ dumps(self._create_membership_record_data()),
+ acquire=self.session_id)
+ if result:
+ log.info('new-membership-record-created',
+ session=self.session_id)
break
+ else:
+ log.warn('cannot-create-membership-record')
+ yield self._backoff('stale-membership-record')
def _start_session_tracking(self):
reactor.callLater(0, self._session_tracking_loop)
@@ -298,24 +314,14 @@
log.info('_redo_session-before')
yield self._delete_session()
- # Create a new consul connection/session
+ # Create a new consul connection/session with a TTL of 25 secs
try:
self.consul = Consul(host=self.host, port=self.port)
self.session_id = yield self.consul.session.create(
behavior='release',
- ttl=10,
+ ttl=self.session_time_to_live,
lock_delay=1)
- log.debug('new-consul-session', session=self.session_id)
-
- # Update my membership - first stop the previous invocation
- # if it is still running
- if self.membership_callback and not self.membership_callback.called:
- log.debug('cancelling-older-membership-request',
- session=self.session_id)
- self.membership_callback.cancel()
-
- self.membership_callback = reactor.callLater(1,
- self._do_create_membership_record_with_retries)
+ log.info('new-consul-session', session=self.session_id)
except Exception as e:
log.exception('could-not-create-a-consul-session', e=e)
@@ -323,36 +329,47 @@
@inlineCallbacks
def _renew_session(m_callback):
try:
- log.info('_renew_session-before')
- result = yield self.consul.session.renew(
+ log.debug('_renew_session-before')
+ consul_ref = self.consul
+ result = yield consul_ref.session.renew(
session_id=self.session_id)
- log.debug('just-renewed-session', result=result)
+ log.info('just-renewed-session', result=result)
if not m_callback.called:
# Triggering callback will cancel the timeout timer
- log.debug('trigger-callback-to-cancel-timout-timer')
+ log.info('trigger-callback-to-cancel-timout-timer')
m_callback.callback(result)
+ else:
+ # Timeout event has already been called. Just ignore
+ # this event
+ log.info('renew-called-after-timout',
+ new_consul_ref=self.consul,
+ old_consul_ref=consul_ref)
except Exception, e:
# Let the invoking method receive a timeout
log.exception('could-not-renew-session', e=e)
try:
- log.debug('session-tracking-start')
- # Set a timeout timer (~ 2 secs) - should be more than
- # enough to renew a session
- rcvd = DeferredWithTimeout(timeout=self.session_renewal_timeout)
- _renew_session(rcvd)
- try:
- _ = yield rcvd
- except TimeOutError as e:
- log.info('session-renew-timeout', e=e)
- # Redo the session
- yield _redo_session()
- except Exception as e:
- log.exception('session-renew-exception', e=e)
- else:
- log.debug('successfully-renewed-session')
+ while 1:
+ log.debug('session-tracking-start')
+ rcvd = DeferredWithTimeout(
+ timeout=self.session_renewal_timeout)
+ _renew_session(rcvd)
+ try:
+ _ = yield rcvd
+ except TimeOutError as e:
+ log.info('session-renew-timeout', e=e)
+ # Redo the session
+ yield _redo_session()
+ except Exception as e:
+ log.exception('session-renew-exception', e=e)
+ else:
+ log.debug('successfully-renewed-session')
+
+ # Async sleep before the next session tracking
+ yield asleep(self.session_renewal_loop_delay)
+
except Exception as e:
- print 'got an exception:{}'.format(e)
+ log.exception('renew-exception', e=e)
finally:
reactor.callLater(self.session_renewal_loop_delay,
self._session_tracking_loop)
@@ -368,12 +385,12 @@
# is then the value under the leader key service/voltha/leader.
# attempt acquire leader lock
- log.debug('leadership-attempt-before')
+ log.info('leadership-attempt-before')
result = yield self._retry('PUT',
self.leader_prefix,
self.instance_id,
acquire=self.session_id)
- log.debug('leadership-attempt-after')
+ log.info('leadership-attempt-after')
# read it back before being too happy; seeing our session id is a
# proof and now we have the change id that we can use to reliably
@@ -382,8 +399,8 @@
# the returned record can be None. Handle it.
(index, record) = yield self._retry('GET',
self.leader_prefix)
- log.debug('leader-prefix',
- i_am_leader=result, index=index, record=record)
+ log.info('leader-prefix',
+ i_am_leader=result, index=index, record=record)
if record is not None:
if result is True:
@@ -408,15 +425,15 @@
wait='5s',
index=index)
if updated is None or updated != last:
- log.debug('leader-key-change',
- index=index, updated=updated, last=last)
+ log.info('leader-key-change',
+ index=index, updated=updated, last=last)
# leadership has changed or vacated (or forcefully
# removed), apply now
# If I was previoulsy the leader then assert a non
# leadership role before going for election
if self.i_am_leader:
- log.debug('leaving-leaderdhip',
- leader=self.instance_id)
+ log.info('leaving-leaderdhip',
+ leader=self.instance_id)
yield self._assert_nonleadership(self.instance_id)
break
@@ -480,7 +497,7 @@
while 1:
try:
consul = yield self.get_consul()
- log.debug('start', operation=operation, args=args)
+ log.info('start', operation=operation, args=args)
if operation == 'GET':
result = yield consul.kv.get(*args, **kw)
elif operation == 'PUT':
@@ -517,5 +534,5 @@
log.exception(e)
yield self._backoff('unknown-error')
- log.debug('end', operation=operation, args=args)
+ log.info('end', operation=operation, args=args)
returnValue(result)
diff --git a/voltha/core/config/config_backend.py b/voltha/core/config/config_backend.py
index 366b299..1360375 100644
--- a/voltha/core/config/config_backend.py
+++ b/voltha/core/config/config_backend.py
@@ -114,7 +114,7 @@
while 1:
try:
consul = self._get_consul()
- log.info('consul', consul=consul, operation=operation,
+ log.debug('consul', consul=consul, operation=operation,
args=args)
if operation == 'GET':
index, result = consul.kv.get(*args, **kw)
diff --git a/voltha/voltha.yml b/voltha/voltha.yml
index edb1669..8b5b1ee 100644
--- a/voltha/voltha.yml
+++ b/voltha/voltha.yml
@@ -17,7 +17,7 @@
handlers:
console:
class : logging.StreamHandler
- level: DEBUG
+ level: INFO
formatter: default
stream: ext://sys.stdout
fluent:
@@ -26,14 +26,14 @@
port: 24224
tag: voltha.logging
formatter: fluent_fmt
- level: DEBUG
+ level: INFO
localRotatingFile:
class: logging.handlers.RotatingFileHandler
filename: voltha.log
formatter: default
maxBytes: 2097152
backupCount: 10
- level: DEBUG
+ level: INFO
null:
class: logging.NullHandler
@@ -61,9 +61,11 @@
assignment_key: 'assignments'
workload_key: 'work'
membership_watch_relatch_delay: 0.1
+ membership_maintenance_loop_delay: 5
tracking_loop_delay: 1
- session_renewal_loop_delay: 3
- session_renewal_timeout: 2
+ session_time_to_live: 300
+ session_renewal_loop_delay: 30
+ session_renewal_timeout: 60
worker:
time_to_let_leader_update: 5