CORD-479: Pull literals out into the voltha.yml config file
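
Move the hard-coded Consul key prefixes and retry/backoff delays out of
voltha/coordinator.py, voltha/leader.py and voltha/worker.py into new
coordinator, worker and leader sections in voltha/voltha.yml. The parsed
config is passed into Coordinator, which derives the key prefixes and
delays at init time, falling back to the old literals when a key is
missing.
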
Change-Id: I6f948bac24e6a8f13253592f21de12196888e2e5
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 636e383..1c8c57b 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -48,11 +48,11 @@
CONNECT_RETRY_INTERVAL_SEC = 1
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
- LEADER_KEY = 'service/voltha/leader'
- MEMBERSHIP_PREFIX = 'service/voltha/members/'
- ASSIGNMENT_PREFIX = 'service/voltha/assignments/'
- WORKLOAD_PREFIX = 'service/voltha/work/'
# Public methods:
@@ -61,15 +61,36 @@
external_host_address,
instance_id,
rest_port,
+ config,
consul='localhost:8500',
leader_class=Leader):
+ self.log = get_logger()
+ self.log.info('initializing-coordinator')
+ self.config = config['coordinator']
+ self.worker_config = config['worker']
+ self.leader_config = config['leader']
+ self.membership_watch_relatch_delay = self.config.get(
+ 'membership_watch_relatch_delay', 0.1)
+ self.tracking_loop_delay = self.config.get(
+ 'tracking_loop_delay', 1)
+ self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
+ self.leader_prefix = '/'.join([self.prefix, self.config.get(
+ 'leader_key', 'leader')])
+ # the trailing '' keeps the trailing '/' the old prefix constants had
+ self.membership_prefix = '/'.join([self.prefix, self.config.get(
+ 'membership_key', 'members'), ''])
+ self.assignment_prefix = '/'.join([self.prefix, self.config.get(
+ 'assignment_key', 'assignments'), ''])
+ self.workload_prefix = '/'.join([self.prefix, self.config.get(
+ 'workload_key', 'work'), ''])
+
self.retries = 0
self.instance_id = instance_id
self.internal_host_address = internal_host_address
self.external_host_address = external_host_address
self.rest_port = rest_port
- self.membership_record_key = self.MEMBERSHIP_PREFIX + self.instance_id
+ self.membership_record_key = self.membership_prefix + self.instance_id
self.session_id = None
self.i_am_leader = False
@@ -79,9 +100,6 @@
self.worker = Worker(self.instance_id, self)
- self.log = get_logger()
- self.log.info('initializing-coordinator')
-
host = consul.split(':')[0].strip()
port = int(consul.split(':')[1].strip())
@@ -211,7 +229,8 @@
finally:
# except in shutdown, the loop must continue (after a short delay)
if not self.shutting_down:
- reactor.callLater(0.1, self._maintain_membership_record)
+ reactor.callLater(self.membership_watch_relatch_delay,
+ self._maintain_membership_record)
def _start_leader_tracking(self):
reactor.callLater(0, self._leadership_tracking_loop)
@@ -228,7 +247,7 @@
# attempt acquire leader lock
self.log.debug('leadership-attempt')
result = yield self._retry(self.consul.kv.put,
- self.LEADER_KEY,
+ self.leader_prefix,
self.instance_id,
acquire=self.session_id)
@@ -238,7 +257,7 @@
# key gets wiped out administratively since the previous line,
# the returned record can be None. Handle it.
(index, record) = yield self._retry(self.consul.kv.get,
- self.LEADER_KEY)
+ self.leader_prefix)
self.log.debug('leadership-key',
i_am_leader=result, index=index, record=record)
@@ -259,7 +278,7 @@
while last is not None:
# this shall return only when update is made to leader key
(index, updated) = yield self._retry(self.consul.kv.get,
- self.LEADER_KEY,
+ self.leader_prefix,
index=index)
self.log.debug('leader-key-change',
index=index, updated=updated)
@@ -275,7 +294,8 @@
finally:
# except in shutdown, the loop must continue (after a short delay)
if not self.shutting_down:
- reactor.callLater(1, self._leadership_tracking_loop)
+ reactor.callLater(self.tracking_loop_delay,
+ self._leadership_tracking_loop)
@inlineCallbacks
def _assert_leadership(self):
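
Note on the prefix derivation above: with the defaults shipped in
voltha.yml the derived keys reduce to the old constants. A minimal
sketch of the computation (plain Python; the config dict below simply
mirrors the coordinator section of voltha.yml):

    config = {'voltha_kv_prefix': 'service/voltha',
              'leader_key': 'leader',
              'membership_key': 'members'}
    prefix = config.get('voltha_kv_prefix', 'service/voltha')
    leader_prefix = '/'.join([prefix, config.get('leader_key', 'leader')])
    # the trailing '' restores the trailing slash of the old constants
    membership_prefix = '/'.join(
        [prefix, config.get('membership_key', 'members'), ''])
    assert leader_prefix == 'service/voltha/leader'
    assert membership_prefix == 'service/voltha/members/'
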
diff --git a/voltha/leader.py b/voltha/leader.py
index 9f6b715..7f1e177 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -51,13 +51,13 @@
self.reassignment_soak_timer = None
self.workload_id_match = re.compile(
- self.ID_EXTRACTOR % self.coord.WORKLOAD_PREFIX).match
+ self.ID_EXTRACTOR % self.coord.workload_prefix).match
self.member_id_match = re.compile(
- self.ID_EXTRACTOR % self.coord.MEMBERSHIP_PREFIX).match
+ self.ID_EXTRACTOR % self.coord.membership_prefix).match
self.assignment_match = re.compile(
- self.ASSIGNMENT_EXTRACTOR % self.coord.ASSIGNMENT_PREFIX).match
+ self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match
@inlineCallbacks
def start(self):
@@ -89,7 +89,7 @@
# TODO for now we simply generate a fixed number of fake entries
yield DeferredList([
self.coord.kv_put(
- self.coord.WORKLOAD_PREFIX + 'device_group_%04d' % (i + 1),
+ self.coord.workload_prefix + 'device_group_%04d' % (i + 1),
'placeholder for device group %d data' % (i + 1))
for i in xrange(100)
])
@@ -108,7 +108,7 @@
try:
(index, results) = yield self.coord.kv_get(
- self.coord.WORKLOAD_PREFIX, index=index, recurse=True)
+ self.coord.workload_prefix, index=index, recurse=True)
matches = (self.workload_id_match(e['Key']) for e in results)
workload = [m.group(2) for m in matches if m is not None]
@@ -122,7 +122,11 @@
except Exception, e:
self.log.exception('workload-track-error', e=e)
- yield asleep(1.0) # to prevent flood
+ yield asleep(self.coord.leader_config.get(
+ 'workload_track_error_to_prevent_flood', 1.0))  # to prevent flood
finally:
if not self.halted:
@@ -133,7 +137,7 @@
try:
(index, results) = yield self.coord.kv_get(
- self.coord.MEMBERSHIP_PREFIX, index=index, recurse=True)
+ self.coord.membership_prefix, index=index, recurse=True)
matches = (self.member_id_match(e['Key']) for e in results or [])
members = [m.group(2) for m in matches if m is not None]
@@ -147,7 +151,11 @@
except Exception, e:
self.log.exception('members-track-error', e=e)
- yield asleep(1.0) # to prevent flood
+ yield asleep(self.coord.leader_config.get(
+ 'members_track_error_to_prevent_flood', 1.0))  # to prevent flood
finally:
if not self.halted:
@@ -205,7 +213,7 @@
# Step 2: discover current assignment (from consul)
(_, results) = yield self.coord.kv_get(
- self.coord.ASSIGNMENT_PREFIX, recurse=True)
+ self.coord.assignment_prefix, recurse=True)
matches = [
(self.assignment_match(e['Key']), e) for e in results or []]
@@ -230,7 +238,7 @@
# work, we could add a consul-based protocol here
for work_id in work_to_revoke:
yield self.coord.kv_delete(
- self.coord.ASSIGNMENT_PREFIX
+ self.coord.assignment_prefix
+ member_id + '/' + work_id)
# Step 4: assign new work as needed
@@ -242,7 +250,7 @@
for work_id in work_to_assign:
yield self.coord.kv_put(
- self.coord.ASSIGNMENT_PREFIX
+ self.coord.assignment_prefix
+ member_id + '/' + work_id, '')
except Exception, e:
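
Note on the asleep() backoffs above: each reads its delay with a single
dict.get, so a missing key silently falls back to the old hard-coded
value. A minimal sketch of that behaviour (the dicts stand in for the
leader section of voltha.yml):

    leader_config = {'workload_track_error_to_prevent_flood': 1}
    assert leader_config.get(
        'workload_track_error_to_prevent_flood', 1.0) == 1
    # with the section empty, the old literal still applies
    assert {}.get('members_track_error_to_prevent_flood', 1.0) == 1.0
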
diff --git a/voltha/main.py b/voltha/main.py
index 29edc0b..3d03653 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -239,6 +242,7 @@
external_host_address=self.args.external_host_address,
rest_port=self.args.rest_port,
instance_id=self.args.instance_id,
+ config=self.config,
consul=self.args.consul)
init_rest_service(self.args.rest_port)
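
Note on the wiring above: Coordinator now receives the whole parsed
voltha.yml dict. A minimal sketch of the call (yaml.safe_load stands in
for whatever loader main.py actually uses; the host, port and instance
values are placeholders):

    import yaml

    with open('voltha/voltha.yml') as f:
        config = yaml.safe_load(f)

    coordinator = Coordinator(
        internal_host_address='10.0.0.1',
        external_host_address='10.0.0.1',
        rest_port=8880,
        instance_id='voltha-1',
        config=config,
        consul='localhost:8500')
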
diff --git a/voltha/voltha.yml b/voltha/voltha.yml
index ae19f05..e5d7e18 100644
--- a/voltha/voltha.yml
+++ b/voltha/voltha.yml
@@ -42,3 +42,20 @@
level: INFO # this can be bumped up/down by -q and -v command line
# options
propagate: False
+
+coordinator:
+ voltha_kv_prefix: 'service/voltha'
+ leader_key: 'leader'
+ membership_key: 'members'
+ assignment_key: 'assignments'
+ workload_key: 'work'
+ membership_watch_relatch_delay: 0.1
+ tracking_loop_delay: 1
+
+worker:
+ time_to_let_leader_update: 5
+ assignments_track_error_to_avoid_flood: 1
+
+leader:
+ workload_track_error_to_prevent_flood: 1
+ members_track_error_to_prevent_flood: 1
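
Note on the new sections above: each one maps onto a dict attribute in
Coordinator (config['coordinator'], config['worker'], config['leader']),
so adding a tunable only needs a yaml entry plus a .get() at the point
of use. A minimal sketch of a worker-side read (the dict stands in for
the parsed worker section):

    worker_config = {'time_to_let_leader_update': 5,
                     'assignments_track_error_to_avoid_flood': 1}
    soak = worker_config.get('time_to_let_leader_update', 5)
    assert soak == 5
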
diff --git a/voltha/worker.py b/voltha/worker.py
index 2e531a4..8d0206c 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -49,7 +49,7 @@
self.my_candidate_workload = set() # we stash here during soaking
self.assignment_match = re.compile(
- self.ASSIGNMENT_EXTRACTOR % self.coord.ASSIGNMENT_PREFIX).match
+ self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match
@inlineCallbacks
def start(self):
@@ -78,10 +78,11 @@
yield d
# additional time to let leader update
# assignments, to minimize potential churn
- yield asleep(5)
+ yield asleep(self.coord.worker_config.get(
+ 'time_to_let_leader_update', 5))
(index, results) = yield self.coord.kv_get(
- self.coord.ASSIGNMENT_PREFIX + self.instance_id,
+ self.coord.assignment_prefix + self.instance_id,
index=index, recurse=True)
matches = [
@@ -96,7 +97,11 @@
except Exception, e:
self.log.exception('assignments-track-error', e=e)
- yield asleep(1) # to prevent flood
+ yield asleep(self.coord.worker_config.get(
+ 'assignments_track_error_to_avoid_flood', 1))  # to prevent flood
finally:
if not self.halted:
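
Note on the config contract as a whole: Coordinator indexes
config['coordinator'], config['worker'] and config['leader'] directly,
so the three sections are mandatory in voltha.yml; only the keys inside
them fall back to defaults through .get(). A minimal sketch of the
contract, using a plain dict in place of the parsed yaml:

    config = {'coordinator': {}, 'worker': {}, 'leader': {}}
    # mandatory sections: plain indexing, KeyError if a section is absent
    coordinator_config = config['coordinator']
    # optional keys: .get() with the old literal as the default
    assert coordinator_config.get(
        'voltha_kv_prefix', 'service/voltha') == 'service/voltha'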