This commit lays the groundwork for inter-core request forwarding.
It includes: 1) adding cluster-wide global ids for both logical device
and device ids, and 2) keeping a mapping of core instances along with
their respective docker IPs.
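
For context only (common/utils/id_generation.py is not part of this
diff), the cluster-wide ids are assumed to embed the owning core's id
as a prefix so any core can tell which core owns a given device or
logical device; a minimal sketch of such helpers, with the exact
format being an assumption:

    # Illustrative sketch only; the real helpers live in
    # common/utils/id_generation.py and may differ in format.
    from uuid import uuid4

    def create_cluster_device_id(core_id):
        # device id = 4-hex-digit core id prefix + locally unique suffix
        return '{}{}'.format(format(int(core_id), '04x'), uuid4().hex[:12])

    def create_cluster_logical_device_ids(core_id, switch_id):
        # logical device id (hex string) and datapath id (int) share the
        # same core id prefix, so both map back to the owning core
        ld_id = '{}{}'.format(format(int(core_id), '04x'),
                              format(switch_id, '012x'))
        return ld_id, int(ld_id, 16)

Under this assumed scheme, core '0001' with switch index 1 would yield
ld_id '0001000000000001' and the matching integer datapath id.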
Change-Id: I124b883cd639d6b507adef77ae09af7ca486fb14
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 65786ff..20164d4 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -29,6 +29,7 @@
from common.utils.asleep import asleep
from voltha.registry import IComponent
from worker import Worker
+from simplejson import dumps, loads
log = get_logger()
@@ -224,10 +225,17 @@
yield self._retry(self._do_create_membership_record)
reactor.callLater(0, self._maintain_membership_record)
+ def _create_membership_record_data(self):
+ member_record = dict()
+ member_record['status'] = 'alive'
+ member_record['host_address'] = self.external_host_address
+ return member_record
+
@inlineCallbacks
def _do_create_membership_record(self):
result = yield self.consul.kv.put(
- self.membership_record_key, 'alive',
+ self.membership_record_key,
+ dumps(self._create_membership_record_data()),
acquire=self.session_id)
if not result:
raise StaleMembershipEntryException(self.instance_id)
@@ -242,7 +250,9 @@
index=index)
log.debug('membership-record-change-detected',
index=index, record=record)
- if record is None or record['Session'] != self.session_id:
+ if record is None or \
+ 'Session' not in record or \
+ record['Session'] != self.session_id:
log.debug('remaking-membership-record')
yield self._retry(self._do_create_membership_record)
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 614d535..c6e2191 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -28,6 +28,7 @@
from common.event_bus import EventBusClient
from common.frameio.frameio import hexify
+from common.utils.id_generation import create_cluster_logical_device_ids
from voltha.adapters.interface import IAdapterAgent
from voltha.protos import third_party
from voltha.core.flow_decomposer import OUTPUT
@@ -375,10 +376,12 @@
logical_devices = self.root_proxy.get('/logical_devices')
existing_ids = set(ld.id for ld in logical_devices)
existing_datapath_ids = set(ld.datapath_id for ld in logical_devices)
+ core_id = registry('core').core_store_id
i = 1
while True:
- if i not in existing_datapath_ids and str(i) not in existing_ids:
- return i
+ ld_id, dp_id = create_cluster_logical_device_ids(core_id, i)
+ if dp_id not in existing_datapath_ids and ld_id not in existing_ids:
+ return ld_id, dp_id
i += 1
def get_logical_device(self, logical_device_id):
@@ -393,9 +396,9 @@
assert isinstance(logical_device, LogicalDevice)
if not logical_device.id:
- id = self._find_first_available_id()
- logical_device.id = str(id)
- logical_device.datapath_id = id
+ ld_id, dp_id = self._find_first_available_id()
+ logical_device.id = ld_id
+ logical_device.datapath_id = dp_id
self._make_up_to_date('/logical_devices',
logical_device.id, logical_device)
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 00d5ecf..824bc85 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -20,8 +20,8 @@
from grpc import StatusCode
from common.utils.grpc_utils import twisted_async
+from common.utils.id_generation import create_cluster_device_id
from voltha.core.config.config_root import ConfigRoot
-from voltha.core.config.config_backend import ConsulStore
from voltha.protos.openflow_13_pb2 import PacketIn, Flows, FlowGroups, \
ofp_port_status
from voltha.protos.voltha_pb2 import \
@@ -279,7 +279,8 @@
return Device()
# fill additional data
- device.id = uuid4().hex[:12]
+ device.id = create_cluster_device_id(self.core_store_id)
+ log.debug('device-id-created', device_id=device.id)
device_type = known_device_types[device.type]
device.adapter = device_type.adapter
if device.admin_state != AdminState.PREPROVISIONED:
diff --git a/voltha/leader.py b/voltha/leader.py
index 6e58bd7..47ed8bc 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -77,7 +77,7 @@
@inlineCallbacks
def start(self):
log.debug('starting')
- yield self._validate_workload()
+ # yield self._validate_workload()
yield self._start_tracking_assignments()
log.info('started')
@@ -95,22 +95,22 @@
# Private methods:
- @inlineCallbacks
- def _validate_workload(self):
- """
- Workload is defined as any k/v entries under the workload prefix
- in consul. Under normal operation, only the leader shall edit the
- workload list. But we make sure that in case an administrator
- manually edits the workload, we react to that properly.
- """
-
- # TODO for now we simply generate a fixed number of fake entries
- yield DeferredList([
- self.coord.kv_put(
- self.coord.workload_prefix + 'device_group_%04d' % (i + 1),
- 'placeholder for device group %d data' % (i + 1))
- for i in xrange(100)
- ])
+ # @inlineCallbacks
+ # def _validate_workload(self):
+ # """
+ # Workload is defined as any k/v entries under the workload prefix
+ # in consul. Under normal operation, only the leader shall edit the
+ # workload list. But we make sure that in case an administrator
+ # manually edits the workload, we react to that properly.
+ # """
+ #
+ # # TODO for now we simply generate a fixed number of fake entries
+ # yield DeferredList([
+ # self.coord.kv_put(
+ # self.coord.workload_prefix + 'device_group_%04d' % (i + 1),
+ # 'placeholder for device group %d data' % (i + 1))
+ # for i in xrange(100)
+ # ])
def _start_tracking_assignments(self):
"""
@@ -118,59 +118,63 @@
list. Upon change in either, we must rerun our sharding algorithm
and reassign work as/if needed.
"""
- reactor.callLater(0, self._track_workload, 0)
+ # reactor.callLater(0, self._track_workload, 0)
reactor.callLater(0, self._track_members, 0)
- @inlineCallbacks
- def _track_workload(self, index):
-
- try:
- (index, results) = yield self.coord.kv_get(
- self.coord.workload_prefix, index=index, recurse=True)
-
- matches = (self.workload_id_match(e['Key']) for e in results)
- workload = [m.group(2) for m in matches if m is not None]
-
- if workload != self.workload:
- log.info('workload-changed',
- old_workload_count=len(self.workload),
- new_workload_count=len(workload))
- self.workload = workload
- self._restart_reassignment_soak_timer()
-
- except Exception, e:
- log.exception('workload-track-error', e=e)
- yield asleep(
- self.coord.leader_config.get(
- self.coord.leader_config[
- 'workload_track_error_to_prevent_flood'], 1))
- # to prevent flood
-
- finally:
- if not self.halted:
- reactor.callLater(0, self._track_workload, index)
+ # @inlineCallbacks
+ # def _track_workload(self, index):
+ #
+ # try:
+ # (index, results) = yield self.coord.kv_get(
+ # self.coord.workload_prefix, index=index, recurse=True)
+ #
+ # matches = (self.workload_id_match(e['Key']) for e in results)
+ # workload = [m.group(2) for m in matches if m is not None]
+ #
+ # if workload != self.workload:
+ # log.info('workload-changed',
+ # old_workload_count=len(self.workload),
+ # new_workload_count=len(workload))
+ # self.workload = workload
+ # self._restart_reassignment_soak_timer()
+ #
+ # except Exception, e:
+ # log.exception('workload-track-error', e=e)
+ # yield asleep(
+ # self.coord.leader_config.get(
+ # self.coord.leader_config[
+ # 'workload_track_error_to_prevent_flood'], 1))
+ # # to prevent flood
+ #
+ # finally:
+ # if not self.halted:
+ # reactor.callLater(0, self._track_workload, index)
@inlineCallbacks
def _get_core_store_mappings(self):
- # Get the mapping record
- (_, mappings) = yield self.coord.kv_get(
- self.core_store_assignment_key, recurse=True)
- if mappings:
- self.core_store_assignment = loads(mappings[0]['Value'])
- return
- else: # Key has not been created yet
- # Create the key with an empty dictionary value
- value = dict()
- result = yield self.coord.kv_put(self.core_store_assignment_key,
- dumps(value))
- if not result:
- raise ConfigMappingException(self.instance_id)
-
- # Ensure the record was created
+ try:
+ # Get the mapping record
(_, mappings) = yield self.coord.kv_get(
self.core_store_assignment_key, recurse=True)
+ if mappings:
+ self.core_store_assignment = loads(mappings[0]['Value'])
+ return
+ else: # Key has not been created yet
+ # Create the key with an empty dictionary value
+ value = dict()
+ result = yield self.coord.kv_put(self.core_store_assignment_key,
+ dumps(value))
+ if not result:
+ raise ConfigMappingException(self.instance_id)
- self.core_store_assignment = loads(mappings[0]['Value'])
+ # Ensure the record was created
+ (_, mappings) = yield self.coord.kv_get(
+ self.core_store_assignment_key, recurse=True)
+
+ self.core_store_assignment = loads(mappings[0]['Value'])
+
+ except Exception, e:
+ log.exception('error', e=e)
@inlineCallbacks
def _update_core_store_references(self):
@@ -191,7 +195,7 @@
assignment=self.core_store_assignment)
except Exception, e:
- log.exception('get-config-error', e=e)
+ log.exception('error-update-store', e=e)
@inlineCallbacks
def _track_members(self, index):
@@ -201,11 +205,11 @@
self.coord.membership_prefix, index=index, recurse=True)
# Only members with valid session are considered active
- matches = (self.member_id_match(e['Key'])
- for e in results if 'Session' in e)
- members = [m.group(2) for m in matches if m is not None]
+ members = [{'id':self.member_id_match(e['Key']).group(2),
+ 'host': loads(e['Value'])['host_address']}
+ for e in results if 'Session' in e ]
- log.debug('active-members', active_members=members)
+ log.info('active-members', active_members=members)
# Check if the two sets are the same
if members != self.members:
@@ -258,34 +262,34 @@
def _get_core_data_id_from_instance(instance_name):
for id, instance in self.core_store_assignment.iteritems():
- if instance == instance_name:
+ if instance and instance['id'] == instance_name:
return id
try:
- log.debug('reassign-core-stores', curr_members=self.members)
+ log.info('core-members', curr_members=self.members,
+ prev_members=self.core_store_assignment)
# 1. clear the mapping for instances that are no longer running
updated_mapping = dict()
existing_active_config_members = set()
cleared_config_ids = set()
inactive_members = set()
- log.debug('previous-assignment',
- core_store_assignment=self.core_store_assignment)
if self.core_store_assignment:
for id, instance in self.core_store_assignment.iteritems():
if instance not in self.members:
updated_mapping[id] = None
cleared_config_ids.add(id)
- inactive_members.add(instance)
+ if instance:
+ inactive_members.add(instance['id'])
else:
updated_mapping[id] = instance
- existing_active_config_members.add(instance)
+ existing_active_config_members.add(instance['id'])
# 2. Update the mapping with the new set
current_id = max(self.core_store_assignment) \
- if self.core_store_assignment else '0'
+ if self.core_store_assignment else '0'
for instance in self.members:
- if instance not in existing_active_config_members:
+ if instance['id'] not in existing_active_config_members:
# Add the member to the config map
if cleared_config_ids:
# There is an empty slot
@@ -297,118 +301,118 @@
updated_mapping[current_id] = instance
self.core_store_assignment = updated_mapping
- log.debug('updated-assignment',
- core_store_assignment=self.core_store_assignment)
+ log.info('updated-assignment',
+ core_store_assignment=self.core_store_assignment,
+ inactive_members=inactive_members)
# 3. save the mapping into consul
yield self.coord.kv_put(self.core_store_assignment_key,
dumps(self.core_store_assignment))
# 4. Assign the new workload to the newly created members
- curr_members_set = set(self.members)
- new_members = curr_members_set.difference(
- existing_active_config_members)
- for new_member in new_members:
+ curr_members_set = set([m['id'] for m in self.members])
+ new_members = curr_members_set.difference(existing_active_config_members)
+ for new_member_id in new_members:
yield self.coord.kv_put(
self.coord.assignment_prefix
- + new_member + '/' +
+ + new_member_id + '/' +
self.coord.core_storage_suffix,
- _get_core_data_id_from_instance(new_member))
+ _get_core_data_id_from_instance(new_member_id))
# 5. Remove non-existent members
- for member in inactive_members:
+ for member_id in inactive_members:
yield self.coord.kv_delete(
- self.coord.assignment_prefix + member, recurse=True)
+ self.coord.assignment_prefix + member_id, recurse=True)
yield self.coord.kv_delete(
- self.coord.membership_prefix + member,
+ self.coord.membership_prefix + member_id,
recurse=True)
except Exception as e:
log.exception('config-reassignment-failure', e=e)
self._restart_core_store_reassignment_soak_timer()
- @inlineCallbacks
- def _reassign_work(self):
-
- log.info('reassign-work')
-
- # Plan
- #
- # Step 1: calculate desired assignment from current members and
- # workload list (e.g., using consistent hashing or any other
- # algorithm
- # Step 2: collect current assignments from consul
- # Step 3: find the delta between the desired and actual assignments:
- # these form two lists:
- # 1. new assignments to be made
- # 2. obsolete assignments to be revoked
- # graceful handling may be desirable when moving existing
- # assignment from existing member to another member (to make
- # sure it is abandoned by old member before new takes charge)
- # Step 4: orchestrate the assignment by adding/deleting(/locking)
- # entries in consul
- #
- # We must make sure while we are working on this, we do not re-enter
- # into same method!
-
- try:
-
- # Step 1: generate wanted assignment (mapping work to members)
-
- ring = HashRing(self.members)
- wanted_assignments = dict() # member_id -> set(work_id)
- _ = [
- wanted_assignments.setdefault(ring.get_node(work), set())
- .add(work)
- for work in self.workload
- ]
- for (member, work) in sorted(wanted_assignments.iteritems()):
- log.info('assignment',
- member=member, work_count=len(work))
-
- # Step 2: discover current assignment (from consul)
-
- (_, results) = yield self.coord.kv_get(
- self.coord.assignment_prefix, recurse=True)
-
- matches = [
- (self.assignment_match(e['Key']), e) for e in results or []]
-
- current_assignments = dict() # member_id -> set(work_id)
- _ = [
- current_assignments.setdefault(
- m.groupdict()['member_id'], set())
- .add(m.groupdict()['work_id'])
- for m, e in matches if m is not None
- ]
-
- # Step 3: handle revoked assignments first on a per member basis
-
- for member_id, current_work in current_assignments.iteritems():
- assert isinstance(current_work, set)
- wanted_work = wanted_assignments.get(member_id, set())
- work_to_revoke = current_work.difference(wanted_work)
-
- # revoking work by simply deleting the assignment entry
- # TODO if we want some feedback to see that member abandoned
- # work, we could add a consul-based protocol here
- for work_id in work_to_revoke:
- yield self.coord.kv_delete(
- self.coord.assignment_prefix
- + member_id + '/' + work_id)
-
- # Step 4: assign new work as needed
-
- for member_id, wanted_work in wanted_assignments.iteritems():
- assert isinstance(wanted_work, set)
- current_work = current_assignments.get(member_id, set())
- work_to_assign = wanted_work.difference(current_work)
-
- for work_id in work_to_assign:
- yield self.coord.kv_put(
- self.coord.assignment_prefix
- + member_id + '/' + work_id, '')
-
- except Exception, e:
- log.exception('failed-reassignment', e=e)
- self._restart_reassignment_soak_timer() # try again in a while
+ # @inlineCallbacks
+ # def _reassign_work(self):
+ #
+ # log.info('reassign-work')
+ #
+ # # Plan
+ # #
+ # # Step 1: calculate desired assignment from current members and
+ # # workload list (e.g., using consistent hashing or any other
+ # # algorithm
+ # # Step 2: collect current assignments from consul
+ # # Step 3: find the delta between the desired and actual assignments:
+ # # these form two lists:
+ # # 1. new assignments to be made
+ # # 2. obsolete assignments to be revoked
+ # # graceful handling may be desirable when moving existing
+ # # assignment from existing member to another member (to make
+ # # sure it is abandoned by old member before new takes charge)
+ # # Step 4: orchestrate the assignment by adding/deleting(/locking)
+ # # entries in consul
+ # #
+ # # We must make sure while we are working on this, we do not re-enter
+ # # into same method!
+ #
+ # try:
+ #
+ # # Step 1: generate wanted assignment (mapping work to members)
+ #
+ # ring = HashRing(self.members)
+ # wanted_assignments = dict() # member_id -> set(work_id)
+ # _ = [
+ # wanted_assignments.setdefault(ring.get_node(work), set())
+ # .add(work)
+ # for work in self.workload
+ # ]
+ # for (member, work) in sorted(wanted_assignments.iteritems()):
+ # log.info('assignment',
+ # member=member, work_count=len(work))
+ #
+ # # Step 2: discover current assignment (from consul)
+ #
+ # (_, results) = yield self.coord.kv_get(
+ # self.coord.assignment_prefix, recurse=True)
+ #
+ # matches = [
+ # (self.assignment_match(e['Key']), e) for e in results or []]
+ #
+ # current_assignments = dict() # member_id -> set(work_id)
+ # _ = [
+ # current_assignments.setdefault(
+ # m.groupdict()['member_id'], set())
+ # .add(m.groupdict()['work_id'])
+ # for m, e in matches if m is not None
+ # ]
+ #
+ # # Step 3: handle revoked assignments first on a per member basis
+ #
+ # for member_id, current_work in current_assignments.iteritems():
+ # assert isinstance(current_work, set)
+ # wanted_work = wanted_assignments.get(member_id, set())
+ # work_to_revoke = current_work.difference(wanted_work)
+ #
+ # # revoking work by simply deleting the assignment entry
+ # # TODO if we want some feedback to see that member abandoned
+ # # work, we could add a consul-based protocol here
+ # for work_id in work_to_revoke:
+ # yield self.coord.kv_delete(
+ # self.coord.assignment_prefix
+ # + member_id + '/' + work_id)
+ #
+ # # Step 4: assign new work as needed
+ #
+ # for member_id, wanted_work in wanted_assignments.iteritems():
+ # assert isinstance(wanted_work, set)
+ # current_work = current_assignments.get(member_id, set())
+ # work_to_assign = wanted_work.difference(current_work)
+ #
+ # for work_id in work_to_assign:
+ # yield self.coord.kv_put(
+ # self.coord.assignment_prefix
+ # + member_id + '/' + work_id, '')
+ #
+ # except Exception, e:
+ # log.exception('failed-reassignment', e=e)
+ # self._restart_reassignment_soak_timer() # try again in a while