[ 4460 ] Minor fix when getting invalid voltha instance
Initial commit of the Global Forwarder.
Change-Id: I6c619a8589abaeecba00c004a42beaf063f31448
diff --git a/common/utils/id_generation.py b/common/utils/id_generation.py
index 984c19c..eedf434 100644
--- a/common/utils/id_generation.py
+++ b/common/utils/id_generation.py
@@ -17,6 +17,21 @@
from uuid import uuid4
+
+BROADCAST_CORE_ID = hex(0xFFFF)[2:]
+
+def get_next_core_id(current_id_in_hex_str):
+ """
+ :param current_id_in_hex_str: the highest core id assigned so far, as a
+ hex string without the leading '0x' characters
+ :return: current_id_in_hex_str + 1, as a 4-character hex string
+ """
+ if not current_id_in_hex_str or current_id_in_hex_str == '':
+ return '0001'
+ else:
+ return format(int(current_id_in_hex_str, 16) + 1, '04x')
+
+
def create_cluster_logical_device_ids(core_id, switch_id):
"""
Creates a logical device id and an OpenFlow datapath id that is unique
@@ -28,9 +43,21 @@
"""
switch_id = format(switch_id, '012x')
id = '{}{}'.format(format(int(core_id), '04x'), switch_id)
- hex_int=int(id,16)
+ hex_int = int(id, 16)
return id, hex_int
+def is_broadcast_core_id(id):
+ assert id and len(id) == 16
+ return id[:4] == BROADCAST_CORE_ID
+
+def create_cluster_id():
+ """
+ Returns an id that is common across all voltha instances. The id
+ is a str of 64 bits (16 hex characters). The lower 48 bits refer to an
+ id specific to that object while the upper 16 bits refer to the
+ broadcast core_id
+ :return: A common id across all Voltha instances
+ """
+ return '{}{}'.format(BROADCAST_CORE_ID, uuid4().hex[:12])
def create_cluster_device_id(core_id):
"""
@@ -42,9 +69,11 @@
"""
return '{}{}'.format(format(int(core_id), '04x'), uuid4().hex[:12])
+
def get_core_id_from_device_id(device_id):
# Device id is a string and the first 4 characters represent the core_id
- assert device_id and device_id.len() == 16
+ assert device_id and len(device_id) == 16
+ # Return the first 4 hex characters, which encode the core_id
return device_id[:4]
@@ -55,9 +84,11 @@
:param logical_device_id:
:return: core_id string
"""
- assert logical_device_id and logical_device_id.len() == 16
+ assert logical_device_id and len(logical_device_id) == 16
+ # Return the first 4 hex characters, which encode the core_id
return logical_device_id[:4]
+
def get_core_id_from_datapath_id(datapath_id):
"""
datapath id is a uint64 where:
@@ -68,6 +99,6 @@
"""
assert datapath_id
# Get the hex string and remove the '0x' prefix
- id_in_hex_str=hex(datapath_id)[2:]
- assert id_in_hex_str.len() > 12
+ id_in_hex_str = hex(datapath_id)[2:]
+ assert len(id_in_hex_str) > 12
return id_in_hex_str[:-12]
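A minimal usage sketch of the id helpers added above (assuming they are imported from common.utils.id_generation as defined in this patch; the random hex suffixes shown in comments are illustrative):
```
from common.utils.id_generation import (
    create_cluster_device_id, create_cluster_id, get_next_core_id,
    get_core_id_from_device_id, is_broadcast_core_id)

device_id = create_cluster_device_id(1)    # '0001' + 12 random hex chars
assert get_core_id_from_device_id(device_id) == '0001'

cluster_id = create_cluster_id()           # 'ffff' + 12 random hex chars
assert is_broadcast_core_id(cluster_id)

assert get_next_core_id('') == '0001'      # first core id ever assigned
assert get_next_core_id('000f') == '0010'  # increment, zero-padded to 4 chars
```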
diff --git a/tests/itests/README.md b/tests/itests/README.md
index 00d4020..658d95e 100644
--- a/tests/itests/README.md
+++ b/tests/itests/README.md
@@ -69,6 +69,8 @@
```
Run the test:
```
+cd /cord/incubator/voltha
+. ./env.sh
nosetests -s tests/itests/voltha/test_device_state_changes.py
```
@@ -93,6 +95,8 @@
Run the test:
```
+cd /cord/incubator/voltha
+. ./env.sh
nosetests -s tests/itests/voltha/test_persistence.py
```
@@ -108,6 +112,14 @@
nosetests -s tests/itests/voltha/test_voltha_rest_apis.py
```
-* Voltha_alarm_events: TODO
+* **Voltha_alarm_events**: TODO
-* Voltha_alarm_filters: TODO
+* **Voltha_alarm_filters**: TODO
+
+* **Dispatcher**: This test exercises request forwarding via the Global
+handler.
+```
+cd /cord/incubator/voltha
+. ./env.sh
+nosetests -s tests/itests/voltha/test_dispatcher.py
+```
diff --git a/tests/itests/voltha/test_dispatcher.py b/tests/itests/voltha/test_dispatcher.py
new file mode 100644
index 0000000..c369fc0
--- /dev/null
+++ b/tests/itests/voltha/test_dispatcher.py
@@ -0,0 +1,394 @@
+from random import randint
+from time import time, sleep
+
+from google.protobuf.json_format import MessageToDict
+from unittest import main, TestCase
+from voltha.protos.device_pb2 import Device
+from tests.itests.voltha.rest_base import RestBase
+from voltha.core.flow_decomposer import mk_simple_flow_mod, in_port, output
+from voltha.protos import openflow_13_pb2 as ofp
+from common.utils.consulhelpers import get_endpoint_from_consul
+from common.utils.consulhelpers import verify_all_services_healthy
+from tests.itests.docutests.test_utils import \
+ run_command_to_completion_with_raw_stdout, \
+ run_command_to_completion_with_stdout_in_list
+from voltha.protos.voltha_pb2 import AlarmFilter
+
+LOCAL_CONSUL = "localhost:8500"
+DOCKER_COMPOSE_FILE = "compose/docker-compose-system-test.yml"
+
+command_defs = dict(
+ docker_ps="docker ps",
+ docker_compose_start_all="docker-compose -f {} up -d "
+ .format(DOCKER_COMPOSE_FILE),
+ docker_stop_and_remove_all_containers="docker-compose -f {} down"
+ .format(DOCKER_COMPOSE_FILE),
+ docker_compose_start_voltha="docker-compose -f {} up -d voltha "
+ .format(DOCKER_COMPOSE_FILE),
+ docker_compose_stop_voltha="docker-compose -f {} stop voltha"
+ .format(DOCKER_COMPOSE_FILE),
+ docker_compose_remove_voltha="docker-compose -f {} rm -f voltha"
+ .format(DOCKER_COMPOSE_FILE),
+ kafka_topics="kafkacat -b {} -L",
+ kafka_alarms="kafkacat -o end -b {} -C -t voltha.alarms -c 2",
+ kafka_kpis="kafkacat -o end -b {} -C -t voltha.kpis -c 5"
+)
+
+class DispatcherTest(RestBase):
+
+ t0 = [time()]
+
+ def pt(self, msg=''):
+ t1 = time()
+ print '%20.8f ms - %s' % (1000 * (t1 - DispatcherTest.t0[0]),
+ msg)
+ DispatcherTest.t0[0] = t1
+
+
+ def wait_till(self, msg, predicate, interval=0.1, timeout=5.0):
+ deadline = time() + timeout
+ while time() < deadline:
+ if predicate():
+ return
+ sleep(interval)
+ self.fail('Timed out while waiting for condition: {}'.format(msg))
+
+
+ def test_01_global_rest_apis(self):
+ # Start the voltha ensemble with a single voltha instance
+ self._stop_and_remove_all_containers()
+ sleep(5) # A small wait for the system to settle down
+ self.start_all_containers()
+ self.set_rest_endpoint()
+ self.set_kafka_endpoint()
+
+ self._get_root()
+ self._get_schema()
+ self._get_health()
+ self._get_voltha()
+ self._list_voltha_instances()
+ self._get_voltha_instance()
+ olt_id = self._add_olt_device()
+ self._verify_device_preprovisioned_state(olt_id)
+ self._activate_device(olt_id)
+ ldev_id = self._wait_for_logical_device(olt_id)
+ ldevices = self._list_logical_devices()
+ logical_device_id = ldevices['items'][0]['id']
+ self._get_logical_device(logical_device_id)
+ self._list_logical_device_ports(logical_device_id)
+ self._list_and_update_logical_device_flows(logical_device_id)
+ self._list_and_update_logical_device_flow_groups(logical_device_id)
+ devices = self._list_devices()
+ device_id = devices['items'][0]['id']
+ self._get_device(device_id)
+ self._list_device_ports(device_id)
+ self._list_device_flows(device_id)
+ self._list_device_flow_groups(device_id)
+ self._get_images(device_id)
+ self._self_test(device_id)
+ dtypes = self._list_device_types()
+ self._get_device_type(dtypes['items'][0]['id'])
+ alarm_filter = self._create_device_filter(olt_id)
+ self._remove_device_filter(alarm_filter['id'])
+ # TODO: PM APIs test
+
+ def test_02_cross_instances_dispatch(self):
+ """ TODO: So far manual tests done. Needs to be automated. """
+ pass
+
+ def _stop_and_remove_all_containers(self):
+ # check if there are any running containers first
+ cmd = command_defs['docker_ps']
+ out, err, rc = run_command_to_completion_with_stdout_in_list(cmd)
+ self.assertEqual(rc, 0)
+ if len(out) > 1: # not counting docker ps header
+ cmd = command_defs['docker_stop_and_remove_all_containers']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ def start_all_containers(self):
+ t0 = time()
+
+ # start all the containers
+ self.pt("Starting all containers ...")
+ cmd = command_defs['docker_compose_start_all']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ self.pt("Waiting for voltha and chameleon containers to be ready ...")
+ self.wait_till('voltha services HEALTHY',
+ lambda: verify_all_services_healthy(
+ LOCAL_CONSUL, service_name='voltha-grpc') == True,
+ timeout=10)
+ self.wait_till('chameleon services HEALTHY',
+ lambda: verify_all_services_healthy(
+ LOCAL_CONSUL,service_name='chameleon-rest') == True,
+ timeout=10)
+
+ # Chameleon takes some time to compile the protos and make them
+ # available. So let's wait 10 seconds
+ sleep(10)
+
+ def set_rest_endpoint(self):
+ self.rest_endpoint = get_endpoint_from_consul(LOCAL_CONSUL,
+ 'chameleon-rest')
+ self.base_url = 'http://' + self.rest_endpoint
+
+ def set_kafka_endpoint(self):
+ self.kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'kafka')
+
+
+ def _get_root(self):
+ res = self.get('/', expected_content_type='text/html')
+ self.assertGreaterEqual(res.find('swagger'), 0)
+
+ def _get_schema(self):
+ res = self.get('/schema')
+ self.assertEqual(set(res.keys()), {'protos', 'yang_from','swagger_from'})
+
+ def _get_health(self):
+ res = self.get('/health')
+ self.assertEqual(res['state'], 'HEALTHY')
+
+ # ~~~~~~~~~~~~~~~~~~~~~ TOP LEVEL VOLTHA OPERATIONS ~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _get_voltha(self):
+ res = self.get('/api/v1')
+ self.assertEqual(res['version'], '0.9.0')
+
+ def _list_voltha_instances(self):
+ res = self.get('/api/v1/instances')
+ self.assertEqual(len(res['items']), 1)
+
+ def _get_voltha_instance(self):
+ res = self.get('/api/v1/instances')
+ voltha_id = res['items'][0]
+ res = self.get('/api/v1/instances/{}'.format(voltha_id))
+ self.assertEqual(res['version'], '0.9.0')
+
+ def _add_olt_device(self):
+ device = Device(
+ type='simulated_olt',
+ mac_address='00:00:00:00:00:01'
+ )
+ device = self.post('/api/v1/devices', MessageToDict(device),
+ expected_code=200)
+ return device['id']
+
+ def _verify_device_preprovisioned_state(self, olt_id):
+ # we also check that what we read back is the same as what we got
+ # back on create
+ device = self.get('/api/v1/devices/{}'.format(olt_id))
+ self.assertNotEqual(device['id'], '')
+ self.assertEqual(device['adapter'], 'simulated_olt')
+ self.assertEqual(device['admin_state'], 'PREPROVISIONED')
+ self.assertEqual(device['oper_status'], 'UNKNOWN')
+
+ def _activate_device(self, olt_id):
+ path = '/api/v1/devices/{}'.format(olt_id)
+ self.post(path + '/enable', expected_code=200)
+ device = self.get(path)
+ self.assertEqual(device['admin_state'], 'ENABLED')
+
+ self.wait_till(
+ 'admin state moves to ACTIVATING or ACTIVE',
+ lambda: self.get(path)['oper_status'] in ('ACTIVATING', 'ACTIVE'),
+ timeout=0.5)
+
+ # eventually, it shall move to active state and by then we shall have
+ # device details filled, connect_state set, and device ports created
+ self.wait_till(
+ 'admin state ACTIVE',
+ lambda: self.get(path)['oper_status'] == 'ACTIVE',
+ timeout=0.5)
+ device = self.get(path)
+ images = device['images']
+ image = images['image']
+ image_1 = image[0]
+ version = image_1['version']
+ self.assertNotEqual(version, '')
+ self.assertEqual(device['connect_status'], 'REACHABLE')
+
+ ports = self.get(path + '/ports')['items']
+ self.assertEqual(len(ports), 2)
+
+ def _wait_for_logical_device(self, olt_id):
+ # we shall find the logical device id from the parent_id of the olt
+ # (root) device
+ device = self.get(
+ '/api/v1/devices/{}'.format(olt_id))
+ self.assertNotEqual(device['parent_id'], '')
+ logical_device = self.get(
+ '/api/v1/logical_devices/{}'.format(device['parent_id']))
+
+ # the logical device shall be linked back to the hard device,
+ # its ports too
+ self.assertEqual(logical_device['root_device_id'], device['id'])
+
+ logical_ports = self.get(
+ '/api/v1/logical_devices/{}/ports'.format(
+ logical_device['id'])
+ )['items']
+ self.assertGreaterEqual(len(logical_ports), 1)
+ logical_port = logical_ports[0]
+ self.assertEqual(logical_port['id'], 'nni')
+ self.assertEqual(logical_port['ofp_port']['name'], 'nni')
+ self.assertEqual(logical_port['ofp_port']['port_no'], 129)
+ self.assertEqual(logical_port['device_id'], device['id'])
+ self.assertEqual(logical_port['device_port_no'], 2)
+ return logical_device['id']
+
+ def _list_logical_devices(self):
+ res = self.get('/api/v1/logical_devices')
+ self.assertGreaterEqual(len(res['items']), 1)
+ return res
+
+ def _get_logical_device(self, id):
+ res = self.get('/api/v1/logical_devices/{}'.format(id))
+ self.assertIsNotNone(res['datapath_id'])
+
+ def _list_logical_device_ports(self, id):
+ res = self.get('/api/v1/logical_devices/{}/ports'.format(id))
+ self.assertGreaterEqual(len(res['items']), 1)
+
+ def _list_and_update_logical_device_flows(self, id):
+
+ # retrieve flow list
+ res = self.get('/api/v1/logical_devices/{}/flows'.format(id))
+ len_before = len(res['items'])
+
+ # add some flows
+ req = ofp.FlowTableUpdate(
+ id=id,
+ flow_mod=mk_simple_flow_mod(
+ cookie=randint(1, 10000000000),
+ priority=len_before,
+ match_fields=[
+ in_port(129)
+ ],
+ actions=[
+ output(1)
+ ]
+ )
+ )
+ res = self.post('/api/v1/logical_devices/{}/flows'.format(id),
+ MessageToDict(req, preserving_proto_field_name=True),
+ expected_code=200)
+ # TODO check some stuff on res
+
+ res = self.get('/api/v1/logical_devices/{}/flows'.format(id))
+ len_after = len(res['items'])
+ self.assertGreater(len_after, len_before)
+
+ def _list_and_update_logical_device_flow_groups(self, id):
+
+ # retrieve flow group list
+ res = self.get('/api/v1/logical_devices/{}/flow_groups'.format(id))
+ len_before = len(res['items'])
+
+ # add a flow group
+ req = ofp.FlowGroupTableUpdate(
+ id=id,
+ group_mod=ofp.ofp_group_mod(
+ command=ofp.OFPGC_ADD,
+ type=ofp.OFPGT_ALL,
+ group_id=len_before + 1,
+ buckets=[
+ ofp.ofp_bucket(
+ actions=[
+ ofp.ofp_action(
+ type=ofp.OFPAT_OUTPUT,
+ output=ofp.ofp_action_output(
+ port=1
+ )
+ )
+ ]
+ )
+ ]
+ )
+ )
+ res = self.post('/api/v1/logical_devices/{}/flow_groups'.format(id),
+ MessageToDict(req, preserving_proto_field_name=True),
+ expected_code=200)
+ # TODO check some stuff on res
+
+ res = self.get('/api/v1/logical_devices/{}/flow_groups'.format(id))
+ len_after = len(res['items'])
+ self.assertGreater(len_after, len_before)
+
+ def _list_devices(self):
+ res = self.get('/api/v1/devices')
+ self.assertGreaterEqual(len(res['items']), 2)
+ return res
+
+ def _get_device(self, id):
+ res = self.get('/api/v1/devices/{}'.format(id))
+ # TODO test result
+
+ def _list_device_ports(self, id):
+ res = self.get('/api/v1/devices/{}/ports'.format(id))
+ self.assertGreaterEqual(len(res['items']), 2)
+
+ def _list_device_flows(self, id):
+ # flows pushed earlier via the logical device should show up here
+ res = self.get('/api/v1/devices/{}/flows'.format(id))
+ self.assertGreaterEqual(len(res['items']), 1)
+
+ def _list_device_flow_groups(self, id):
+ res = self.get('/api/v1/devices/{}/flow_groups'.format(id))
+ self.assertGreaterEqual(len(res['items']), 0)
+
+ def _list_device_types(self):
+ res = self.get('/api/v1/device_types')
+ self.assertGreaterEqual(len(res['items']), 2)
+ return res
+
+ def _get_device_type(self, dtype):
+ res = self.get('/api/v1/device_types/{}'.format(dtype))
+ self.assertIsNotNone(res)
+ # TODO test the result
+
+ def _list_device_groups(self):
+ pass
+ # res = self.get('/api/v1/device_groups')
+ # self.assertGreaterEqual(len(res['items']), 1)
+
+ def _get_device_group(self):
+ pass
+ # res = self.get('/api/v1/device_groups/1')
+ # # TODO test the result
+
+ def _get_images(self, id):
+ res = self.get('/api/v1/devices/{}/images'.format(id))
+ self.assertIsNotNone(res)
+
+ def _self_test(self, id):
+ res = self.post('/api/v1/devices/{}/self_test'.format(id),
+ expected_code=200)
+ self.assertIsNotNone(res)
+
+ def _create_device_filter(self, device_id):
+ rules = list()
+ rule = dict()
+
+ # Create a filter with a single rule
+ rule['key'] = 'device_id'
+ rule['value'] = device_id
+ rules.append(rule)
+
+ alarm_filter = AlarmFilter(rules=rules)
+ alarm_filter = self.post('/api/v1/alarm_filters',
+ MessageToDict(alarm_filter),
+ expected_code=200)
+ self.assertIsNotNone(alarm_filter)
+ return alarm_filter
+
+ def _remove_device_filter(self, alarm_filter_id):
+ path = '/api/v1/alarm_filters/{}'.format(alarm_filter_id)
+ self.delete(path, expected_code=200)
+ alarm_filter = self.get(path, expected_code=404)
+ self.assertIsNone(alarm_filter)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/tests/itests/voltha/test_persistence.py b/tests/itests/voltha/test_persistence.py
index 428d51d..1111c1e 100644
--- a/tests/itests/voltha/test_persistence.py
+++ b/tests/itests/voltha/test_persistence.py
@@ -691,14 +691,14 @@
rules.append(rule)
alarm_filter = AlarmFilter(rules=rules)
- alarm_filter = self.post('/api/v1/local/alarm_filters',
+ alarm_filter = self.post('/api/v1/alarm_filters',
MessageToDict(alarm_filter),
expected_code=200)
self.assertIsNotNone(alarm_filter)
return alarm_filter
def remove_device_filter(self, alarm_filter_id):
- path = '/api/v1/local/alarm_filters/{}'.format(alarm_filter_id)
+ path = '/api/v1/alarm_filters/{}'.format(alarm_filter_id)
self.delete(path, expected_code=200)
alarm_filter = self.get(path, expected_code=404)
self.assertIsNone(alarm_filter)
diff --git a/tests/itests/voltha/test_voltha_alarm_filters.py b/tests/itests/voltha/test_voltha_alarm_filters.py
index 88744ea..4b73509 100644
--- a/tests/itests/voltha/test_voltha_alarm_filters.py
+++ b/tests/itests/voltha/test_voltha_alarm_filters.py
@@ -91,7 +91,7 @@
device = Device(
type='simulated_olt',
)
- device = self.post('/api/v1/local/devices', MessageToDict(device),
+ device = self.post('/api/v1/devices', MessageToDict(device),
expected_code=200)
return device
@@ -106,7 +106,7 @@
rules.append(rule)
alarm_filter = AlarmFilter(rules=rules)
- alarm_filter = self.post('/api/v1/local/alarm_filters', MessageToDict(alarm_filter),
+ alarm_filter = self.post('/api/v1/alarm_filters', MessageToDict(alarm_filter),
expected_code=200)
return alarm_filter
@@ -114,7 +114,7 @@
# Active the simulated device.
# This will trigger the simulation of random alarms
def activate_device(self, device_id):
- path = '/api/v1/local/devices/{}'.format(device_id)
+ path = '/api/v1/devices/{}'.format(device_id)
self.post(path + '/enable', expected_code=200)
device = self.get(path)
self.assertEqual(device['admin_state'], 'ENABLED')
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index 4de3f0f..e3860b0 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -499,20 +499,23 @@
@inlineCallbacks
def _simulate_detection_of_onus(self, device_id):
- for i in xrange(1, 5):
- log.info('activate-olt-for-onu-{}'.format(i))
- vlan_id = self._olt_side_onu_activation(i)
- yield asleep(0.05)
- self.adapter_agent.child_device_detected(
- parent_device_id=device_id,
- parent_port_no=1,
- child_device_type='simulated_onu',
- proxy_address=Device.ProxyAddress(
- device_id=device_id,
- channel_id=vlan_id
- ),
- vlan=vlan_id
- )
+ try:
+ for i in xrange(1, 5):
+ log.info('activate-olt-for-onu-{}'.format(i))
+ vlan_id = self._olt_side_onu_activation(i)
+ yield asleep(0.05)
+ self.adapter_agent.child_device_detected(
+ parent_device_id=device_id,
+ parent_port_no=1,
+ child_device_type='simulated_onu',
+ proxy_address=Device.ProxyAddress(
+ device_id=device_id,
+ channel_id=vlan_id
+ ),
+ vlan=vlan_id
+ )
+ except Exception as e:
+ log.exception('error', e=e)
def _olt_side_onu_activation(self, seq):
"""
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 20164d4..6eecc11 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -27,6 +27,7 @@
from leader import Leader
from common.utils.asleep import asleep
+from common.utils.message_queue import MessageQueue
from voltha.registry import IComponent
from worker import Worker
from simplejson import dumps, loads
@@ -75,16 +76,18 @@
'tracking_loop_delay', 1)
self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
self.leader_prefix = '/'.join((self.prefix, self.config.get(
- self.config['leader_key'], 'leader')))
+ self.config['leader_key'], 'leader')))
self.membership_prefix = '/'.join((self.prefix, self.config.get(
- self.config['membership_key'], 'members'), ''))
+ self.config['membership_key'], 'members'), ''))
self.assignment_prefix = '/'.join((self.prefix, self.config.get(
- self.config['assignment_key'], 'assignments'), ''))
+ self.config['assignment_key'], 'assignments'), ''))
self.workload_prefix = '/'.join((self.prefix, self.config.get(
- self.config['workload_key'], 'work'), ''))
+ self.config['workload_key'], 'work'), ''))
self.core_store_prefix = '/'.join((self.prefix, self.config.get(
- self.config['core_store_key'], 'data/core')))
- self.core_storage_suffix='core_store'
+ self.config['core_store_key'], 'data/core')))
+ self.core_store_assignment_key = self.core_store_prefix + \
+ '/assignment'
+ self.core_storage_suffix = 'core_store'
self.retries = 0
self.instance_id = instance_id
@@ -110,6 +113,8 @@
self.wait_for_leader_deferreds = []
+ self.peers_mapping_queue = MessageQueue()
+
def start(self):
log.debug('starting')
reactor.callLater(0, self._async_init)
@@ -142,13 +147,18 @@
self.wait_for_leader_deferreds.append(d)
return d
-
# Wait for a core data id to be assigned to this voltha instance
@inlineCallbacks
def get_core_store_id_and_prefix(self):
core_store_id = yield self.worker.get_core_store_id()
returnValue((core_store_id, self.core_store_prefix))
+ def recv_peers_map(self):
+ return self.peers_mapping_queue.get()
+
+ def publish_peers_map_change(self, msg):
+ self.peers_mapping_queue.put(msg)
+
# Proxy methods for consul with retry support
def kv_get(self, *args, **kw):
@@ -249,10 +259,10 @@
self.membership_record_key,
index=index)
log.debug('membership-record-change-detected',
- index=index, record=record)
+ index=index, record=record)
if record is None or \
- 'Session' not in record or \
- record['Session'] != self.session_id:
+ 'Session' not in record or \
+ record['Session'] != self.session_id:
log.debug('remaking-membership-record')
yield self._retry(self._do_create_membership_record)
@@ -292,7 +302,7 @@
(index, record) = yield self._retry(self.consul.kv.get,
self.leader_prefix)
log.debug('leadership-key',
- i_am_leader=result, index=index, record=record)
+ i_am_leader=result, index=index, record=record)
if record is not None:
if result is True:
@@ -314,7 +324,7 @@
self.leader_prefix,
index=index)
log.debug('leader-key-change',
- index=index, updated=updated)
+ index=index, updated=updated)
if updated is None or updated != last:
# leadership has changed or vacated (or forcefully
# removed), apply now
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index dff4695..9140cc5 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -39,6 +39,7 @@
from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey
from voltha.registry import registry
+from common.utils.id_generation import create_cluster_device_id
@implementer(IAdapterAgent)
@@ -512,7 +513,8 @@
# we create new ONU device objects and insert them into the config
# TODO should we auto-enable the freshly created device? Probably.
device = Device(
- id=uuid4().hex[:12],
+ id=create_cluster_device_id(self.core.core_store_id),
+ # id=uuid4().hex[:12],
type=child_device_type,
parent_id=parent_device_id,
parent_port_no=parent_port_no,
diff --git a/voltha/core/core.py b/voltha/core/core.py
index 6d82d31..80bc5c1 100644
--- a/voltha/core/core.py
+++ b/voltha/core/core.py
@@ -40,10 +40,18 @@
@implementer(IComponent)
class VolthaCore(object):
- def __init__(self, instance_id, core_store_id, version, log_level):
+ def __init__(self,
+ instance_id,
+ core_store_id,
+ grpc_port,
+ version,
+ log_level):
self.instance_id = instance_id
self.stopped = False
- self.dispatcher = Dispatcher(self, instance_id)
+ self.dispatcher = Dispatcher(self,
+ instance_id,
+ core_store_id,
+ grpc_port)
self.core_store_id = core_store_id
self.global_handler = GlobalHandler(
dispatcher=self.dispatcher,
diff --git a/voltha/core/dispatcher.py b/voltha/core/dispatcher.py
index e0c6c0a..99357ac 100644
--- a/voltha/core/dispatcher.py
+++ b/voltha/core/dispatcher.py
@@ -20,22 +20,38 @@
calls are forwarded to the LocalHandler.
"""
import structlog
-
+from twisted.internet.defer import inlineCallbacks, returnValue
from voltha.protos.voltha_pb2 import VolthaLocalServiceStub
+from voltha.registry import registry
+from twisted.internet import reactor
+import grpc
+from grpc import StatusCode
+from grpc._channel import _Rendezvous
+from common.utils.id_generation import get_core_id_from_device_id, \
+ is_broadcast_core_id
log = structlog.get_logger()
-class Dispatcher(object):
+class DispatchError(object):
+ def __init__(self, error_code):
+ self.error_code = error_code
- def __init__(self, core, instance_id):
+
+class Dispatcher(object):
+ def __init__(self, core, instance_id, core_store_id, grpc_port):
self.core = core
self.instance_id = instance_id
+ self.core_store_id = core_store_id
+ self.grpc_port = grpc_port
self.local_handler = None
+ self.peers_map = dict()
+ self.grpc_conn_map = {}
def start(self):
log.debug('starting')
self.local_handler = self.core.get_local_handler()
+ reactor.callLater(0, self._start_tracking_peers)
log.info('started')
return self
@@ -43,22 +59,101 @@
log.debug('stopping')
log.info('stopped')
- def dispatch(self, instance_id, stub, method_name, input, context):
- log.debug('dispatch', instance_id=instance_id, stub=stub,
- _method_name=method_name, input=input)
- # special case if instance_id is us
- if instance_id == self.instance_id:
- # for now, we assume it is always the local stub
- assert stub == VolthaLocalServiceStub
- method = getattr(self.local_handler, method_name)
- log.debug('dispatching', method=method)
- res = method(input, context=context)
- log.debug('dispatch-success', res=res)
- return res
+ @inlineCallbacks
+ def dispatch(self,
+ method_name,
+ request,
+ context,
+ core_id=None,
+ id=None,
+ broadcast=False):
+ """
+ Called whenever a global request is received from the NBI. The
+ request will be dispatched as follows:
+ 1) to a specific voltha Instance if the core_id is specified
+ 2) to the local Voltha Instance if the request specifies an ID that
+ matches the core id of the local Voltha instance
+ 3) to a remote Voltha Instance if the request specifies an ID that
+ matches the core id of that voltha instance
+ 4) to all Voltha Instances if it's a broadcast request,
+ e.g. getDevices, i.e. broadcast=True. The id may or may not be
+ None. In this case, the responses from all instances are merged
+ into a single response for the global handler
+ 5) to the local voltha instance if id=None and broadcast=False.
+ This occurs in cases where any Voltha instance will return the same
+ output, e.g. getAdapters()
+ :param method_name: rpc name
+ :param request: the input parameters
+ :param context: grpc context
+ :param core_id: the core id of a specific voltha instance to target, if any
+ :param id: the id in the request, if present
+ :param broadcast: if True, send the request to all voltha instances
+ :return: the response to the dispatched request
+ """
+ log.debug('start',
+ _method_name=method_name,
+ id=id,
+ request=request)
- else:
- log.warning('no-real-dispatch-yet')
- raise KeyError()
+ core_id_from_request_id = None
+ if id:
+ try:
+ core_id_from_request_id = get_core_id_from_device_id(id)
+ except Exception, e:
+ log.warning('invalid-id', request=request, id=id)
+ returnValue(DispatchError(StatusCode.NOT_FOUND))
+
+ try:
+ # Broadcast request if set
+ if broadcast:
+ # broadcast to all instances (including locally)
+ res = yield self._broadcast_request(method_name,
+ request,
+ context)
+ returnValue(res)
+
+ # Local Dispatch
+ elif (core_id and core_id == self.core_store_id) or (not id) or \
+ (core_id_from_request_id and (
+ (core_id_from_request_id == self.core_store_id) or
+ (is_broadcast_core_id(id))
+ )
+ ):
+ returnValue(self._local_dispatch(self.core_store_id,
+ method_name,
+ request,
+ context))
+ # Peer Dispatch
+ elif core_id_from_request_id:
+ res = yield self._dispatch_to_peer(core_id_from_request_id,
+ method_name,
+ request,
+ context)
+ returnValue(res)
+ else:
+ log.warning('invalid-request', request=request, id=id,
+ core_id=core_id, broadcast=broadcast)
+ returnValue(DispatchError(StatusCode.INVALID_ARGUMENT))
+
+ except Exception as e:
+ log.exception('remote-dispatch-exception', e=e)
+ returnValue(DispatchError(StatusCode.UNKNOWN))
+
+ def get_core_id_from_instance_id(self, instance_id):
+ """
+ :param instance_id: instance name
+ :return: core id of that instance
+ """
+ if instance_id == self.instance_id:
+ return self.core_store_id
+ for id, instance in self.peers_map.iteritems():
+ if instance['id'] == instance_id:
+ return id
+
+ def get_cluster_instances(self):
+ result = []
+ result.append(self.instance_id)
+ for id, instance in self.peers_map.iteritems():
+ result.append(instance['id'])
+ return result
def instance_id_by_logical_device_id(self, logical_device_id):
log.warning('temp-mapping-logical-device-id')
@@ -69,3 +164,203 @@
log.warning('temp-mapping-logical-device-id')
# TODO no true dispatchong yet, we blindly map everything to self
return self.instance_id
+
+ @inlineCallbacks
+ def _broadcast_request(self, method_name, request, context):
+ # First get local result
+ result = self._local_dispatch(self.core_store_id,
+ method_name,
+ request,
+ context)
+ # Then get peers results
+ for core_id in self.peers_map:
+ if self.peers_map[core_id] and self.grpc_conn_map[core_id]:
+ res = yield self._dispatch_to_peer(core_id,
+ method_name,
+ request,
+ context)
+ if isinstance(res, DispatchError):
+ log.warning('ignoring-peer',
+ core_id=core_id,
+ error_code=res.error_code)
+ else:
+ result.MergeFrom(res)
+ returnValue(result)
+
+ def _local_dispatch(self, core_id, method_name, request, context):
+ log.debug('local-dispatch', core_id=core_id)
+ method = getattr(self.local_handler, method_name)
+ res = method(request, context=context)
+ log.debug('local-dispatch-result', res=res, context=context)
+ return res
+
+ @inlineCallbacks
+ def _start_tracking_peers(self):
+ try:
+ while True:
+ peers_map = yield registry('coordinator').recv_peers_map()
+ log.info('peers-map-changed', peers_map=peers_map)
+ yield self.update_grpc_client_map(peers_map)
+ self.peers_map = peers_map
+ except Exception, e:
+ log.exception('exception', e=e)
+
+ @inlineCallbacks
+ def update_grpc_client_map(self, peers_map):
+ try:
+ # 1. Get the list of connection to open and to close
+ to_open = dict()
+ to_close = set()
+ for id, instance in peers_map.iteritems():
+ # Check for no change
+ if id in self.peers_map and self.peers_map[id] == instance:
+ continue
+
+ if id not in self.peers_map:
+ if instance:
+ to_open[id] = instance['host']
+ elif instance:
+ to_open[id] = instance['host']
+ if self.peers_map[id]:
+ to_close.add(id)
+ else:
+ if self.peers_map[id]:
+ to_close.add(id)
+
+ # Close connections that are no longer referenced
+ old_ids = set(self.peers_map.keys()) - set(peers_map.keys())
+ for id in old_ids:
+ if self.peers_map[id]:
+ to_close.add(id)
+
+ # 2. Refresh the grpc connections
+ yield self._refresh_grpc_connections(to_open, to_close)
+ except Exception, e:
+ log.exception('exception', e=e)
+
+ @inlineCallbacks
+ def _refresh_grpc_connections(self, to_open, to_close):
+ try:
+ log.info('grpc-channel-refresh', to_open=to_open,
+ to_close=to_close)
+ # First open the connection
+ for id, host in to_open.iteritems():
+ if id in self.grpc_conn_map and self.grpc_conn_map[id]:
+ # clear connection
+ self._disconnect_from_peer(id)
+ if host:
+ self.grpc_conn_map[id] = \
+ yield self._connect_to_peer(host, self.grpc_port)
+
+ # Close the unused connection
+ for id in to_close:
+ if self.grpc_conn_map[id]:
+ # clear connection
+ self._disconnect_from_peer(id)
+ except Exception, e:
+ log.exception('exception', e=e)
+
+ @inlineCallbacks
+ def _connect_to_peer(self, host, port):
+ try:
+ channel = yield grpc.insecure_channel('{}:{}'.format(host, port))
+ log.info('grpc-channel-created-with-peer', peer=host)
+ returnValue(channel)
+ except Exception, e:
+ log.exception('exception', e=e)
+
+ def _disconnect_from_peer(self, peer_id):
+ try:
+ if self.grpc_conn_map[peer_id]:
+ # Let garbage collection clear the connect - no API exist to
+ # close the connection
+ # yield self.grpc_conn_map[peer_id].close()
+ self.grpc_conn_map[peer_id] = None
+ log.info('grpc-channel-closed-with-peer', peer_id=peer_id)
+ except Exception, e:
+ log.exception('exception', e=e)
+ finally:
+ self.grpc_conn_map.pop(peer_id)
+
+ @inlineCallbacks
+ def _reconnect_to_peer(self, peer_id):
+ try:
+ # First disconnect
+ yield self._disconnect_from_peer(peer_id)
+ # Then reconnect
+ peer_instance = self.peers_map.get(peer_id, None)
+ if peer_instance:
+ self.grpc_conn_map[peer_id] = \
+ yield self._connect_to_peer(peer_instance['host'],
+ self.grpc_port)
+ log.info('reconnected-to-peer', peer_id=peer_id)
+ returnValue(True)
+ else:
+ log.info('peer-unavailable', peer_id=peer_id)
+ except Exception, e:
+ log.exception('exception', e=e)
+ returnValue(False)
+
+ @inlineCallbacks
+ def _dispatch_to_peer(self,
+ core_id,
+ method_name,
+ request,
+ context,
+ retry=1):
+ """
+ Invoke a gRPC call to the remote server and return the response.
+ :param core_id: The voltha instance where this request needs to be sent
+ :param method_name: The method name inside the service stub
+ :param request: The request protobuf message
+ :param context: grprc context
+ :return: The response as a protobuf message
+ """
+ log.debug('peer-dispatch',
+ core_id=core_id,
+ _method_name=method_name,
+ request=request)
+
+ if core_id not in self.peers_map or not self.peers_map[core_id]:
+ log.exception('non-existent-core-id', core_id=core_id,
+ peers_map=self.peers_map)
+ return
+
+ try:
+ # Always request from the local service when making request to peer
+ stub = VolthaLocalServiceStub
+ method = getattr(stub(self.grpc_conn_map[core_id]), method_name)
+ response, rendezvous = yield method.with_call(request,
+ metadata=context.invocation_metadata())
+ log.debug('peer-response',
+ core_id=core_id,
+ response=response,
+ rendezvous_metadata=rendezvous.trailing_metadata())
+ # TODO: Should we return the metadata as well
+ returnValue(response)
+ except grpc._channel._Rendezvous, e:
+ code = e.code()
+ if code == grpc.StatusCode.UNAVAILABLE:
+ # Try to reconnect
+ status = yield self._reconnect_to_peer(core_id)
+ if status and retry > 0:
+ response = yield self._dispatch_to_peer(core_id,
+ method_name,
+ request,
+ context,
+ retry=retry - 1)
+ returnValue(response)
+ elif code in (
+ grpc.StatusCode.NOT_FOUND,
+ grpc.StatusCode.INVALID_ARGUMENT,
+ grpc.StatusCode.ALREADY_EXISTS,
+ grpc.StatusCode.UNAUTHENTICATED,
+ grpc.StatusCode.PERMISSION_DENIED):
+
+ pass # don't log error, these occur naturally
+
+ else:
+ log.exception('error-invoke', e=e)
+
+ log.warning('error-from-peer', code=code)
+ returnValue(DispatchError(code))
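To summarize the routing modes described in the dispatch() docstring, a minimal sketch of the call shapes a global handler uses (placeholder names; the real call sites follow in global_handler.py below, inside @twisted_async/@inlineCallbacks methods):
```
# Hypothetical call shapes only; see global_handler.py for the actual usage.

# 1) Route by the core_id embedded in the request id.
response = yield self.dispatcher.dispatch('GetDevice', request, context,
                                           id=request.id)

# 2) Route to a specific voltha instance by its core id.
response = yield self.dispatcher.dispatch('GetVolthaInstance', Empty(),
                                           context, core_id=core_id)

# 3) Broadcast to every instance and merge the responses.
response = yield self.dispatcher.dispatch('ListDevices', Empty(), context,
                                           broadcast=True)

# Errors come back as DispatchError objects rather than exceptions.
if isinstance(response, DispatchError):
    context.set_code(response.error_code)
```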
diff --git a/voltha/core/global_handler.py b/voltha/core/global_handler.py
index 6979de3..7c80826 100644
--- a/voltha/core/global_handler.py
+++ b/voltha/core/global_handler.py
@@ -18,20 +18,23 @@
from twisted.internet.defer import returnValue
from common.utils.grpc_utils import twisted_async
+from common.utils.id_generation import create_cluster_id
from voltha.core.config.config_root import ConfigRoot
from voltha.protos.device_pb2 import PmConfigs, Images
from voltha.protos.voltha_pb2 import \
add_VolthaGlobalServiceServicer_to_server, VolthaLocalServiceStub, \
VolthaGlobalServiceServicer, Voltha, VolthaInstances, VolthaInstance, \
- LogicalDevice, Ports, Flows, FlowGroups, Device, SelfTestResponse
+ LogicalDevice, Ports, Flows, FlowGroups, Device, SelfTestResponse, \
+ VolthaGlobalServiceStub, Devices, DeviceType, DeviceTypes, DeviceGroup, \
+ AlarmFilter, AlarmFilters
from voltha.registry import registry
from google.protobuf.empty_pb2 import Empty
+from dispatcher import DispatchError
log = structlog.get_logger()
class GlobalHandler(VolthaGlobalServiceServicer):
-
def __init__(self, dispatcher, instance_id, **init_kw):
self.dispatcher = dispatcher
self.instance_id = instance_id
@@ -61,539 +64,585 @@
return self.root.get('/', depth=1)
@twisted_async
- @inlineCallbacks
def ListVolthaInstances(self, request, context):
log.info('grpc-request', request=request)
- items = yield registry('coordinator').get_members()
- returnValue(VolthaInstances(items=items))
+ items = self.dispatcher.get_cluster_instances()
+ return VolthaInstances(items=items)
@twisted_async
+ @inlineCallbacks
def GetVolthaInstance(self, request, context):
log.info('grpc-request', request=request)
- instance_id = request.id
- try:
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'GetVolthaInstance',
- Empty(),
- context)
- except KeyError:
- context.set_details(
- 'Voltha instance \'{}\' not found'.format(instance_id))
+ core_id = self.dispatcher.get_core_id_from_instance_id(request.id)
+ if not core_id:
+ log.info('invalid-instance-id', instance=request.id)
+ context.set_details('Voltha Instance error')
context.set_code(StatusCode.NOT_FOUND)
- return VolthaInstance()
+ returnValue(VolthaInstance())
+
+ response = yield self.dispatcher.dispatch('GetVolthaInstance',
+ Empty(),
+ context,
+ core_id=core_id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Voltha Instance error')
+ context.set_code(response.error_code)
+ returnValue(VolthaInstance())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListLogicalDevices(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'ListLogicalDevices',
- Empty(),
- context)
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('ListLogicalDevices',
+ Empty(),
+ context,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetLogicalDevice(self, request, context):
log.info('grpc-request', request=request)
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
+ response = yield self.dispatcher.dispatch('GetLogicalDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return LogicalDevice()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'GetLogicalDevice',
- request,
- context)
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(LogicalDevice())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListLogicalDevicePorts(self, request, context):
log.info('grpc-request', request=request)
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
+ response = yield self.dispatcher.dispatch('ListLogicalDevicePorts',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Ports()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListLogicalDevicePorts',
- request,
- context)
+ 'Logical device ports \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Ports())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListLogicalDeviceFlows(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
+ response = yield self.dispatcher.dispatch('ListLogicalDeviceFlows',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Flows()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListLogicalDeviceFlows',
- request,
- context)
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Flows())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def UpdateLogicalDeviceFlowTable(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Empty()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
+ response = yield self.dispatcher.dispatch(
'UpdateLogicalDeviceFlowTable',
request,
- context)
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details(
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListLogicalDeviceFlowGroups(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return FlowGroups()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
+ response = yield self.dispatcher.dispatch(
'ListLogicalDeviceFlowGroups',
request,
- context)
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details(
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(FlowGroups())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def UpdateLogicalDeviceFlowGroupTable(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Empty()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
+ response = yield self.dispatcher.dispatch(
'UpdateLogicalDeviceFlowGroupTable',
request,
- context)
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details(
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDevices(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'ListDevices',
- request,
- context)
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('ListDevices',
+ Empty(),
+ context,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetDevice(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Device()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'GetDevice',
- request,
- context)
+ response = yield self.dispatcher.dispatch('GetDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Device())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def CreateDevice(self, request, context):
log.info('grpc-request', request=request)
- # TODO dispatching to local instead of passing it to leader
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'CreateDevice',
- request,
- context)
+ response = yield self.dispatcher.dispatch('CreateDevice',
+ request,
+ context)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Create device error')
+ context.set_code(response.error_code)
+ returnValue(Device())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def EnableDevice(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Device()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'EnableDevice',
- request,
- context)
-
+ response = yield self.dispatcher.dispatch('EnableDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Device())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def DisableDevice(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Device()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'DisableDevice',
- request,
- context)
+ response = yield self.dispatcher.dispatch('DisableDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Device())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def RebootDevice(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Device()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'RebootDevice',
- request,
- context)
+ response = yield self.dispatcher.dispatch('RebootDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Device())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def DeleteDevice(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Device()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'DeleteDevice',
- request,
- context)
+ response = yield self.dispatcher.dispatch('DeleteDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(Empty())
@twisted_async
+ @inlineCallbacks
def ListDevicePorts(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Ports()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListDevicePorts',
- request,
- context)
+ response = yield self.dispatcher.dispatch('ListDevicePorts',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Ports())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDevicePmConfigs(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return PmConfigs()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListDevicePmConfigs',
- request,
- context)
+ response = yield self.dispatcher.dispatch('ListDevicePmConfigs',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(PmConfigs())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def UpdateDevicePmConfigs(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Empty()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'UpdateDevicePmConfigs',
- request,
- context)
+ response = yield self.dispatcher.dispatch('UpdateDevicePmConfigs',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDeviceFlows(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Flows()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListDeviceFlows',
- request,
- context)
+ response = yield self.dispatcher.dispatch('ListDeviceFlows',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Flows())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDeviceFlowGroups(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return FlowGroups()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListDeviceFlowGroups',
- request,
- context)
+ response = yield self.dispatcher.dispatch('ListDeviceFlowGroups',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(FlowGroups())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDeviceTypes(self, request, context):
log.info('grpc-request', request=request)
# we always deflect this to the local instance, as we assume
# they all loaded the same adapters, supporting the same device
# types
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'ListDeviceTypes',
- request,
- context)
+ response = yield self.dispatcher.dispatch('ListDeviceTypes',
+ request,
+ context)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device types error')
+ context.set_code(response.error_code)
+ returnValue(DeviceTypes())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetDeviceType(self, request, context):
log.info('grpc-request', request=request)
# we always deflect this to the local instance, as we assume
# they all loaded the same adapters, supporting the same device
# types
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'GetDeviceType',
- request,
- context)
+ response = yield self.dispatcher.dispatch('GetDeviceType',
+ request,
+ context)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device type \'{}\' error'.format(
+ request.id))
+ context.set_code(response.error_code)
+ returnValue(DeviceType())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDeviceGroups(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'ListDeviceGroups',
- Empty(),
- context)
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('ListDeviceGroups',
+ Empty(),
+ context,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetDeviceGroup(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'GetDeviceGroup',
- request,
- context)
-
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('GetDeviceGroup',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device group \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(DeviceGroup())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def CreateAlarmFilter(self, request, context):
log.info('grpc-request', request=request)
- # TODO dispatching to local instead of passing it to leader
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'CreateAlarmFilter',
- request,
- context)
+ # Since an AlarmFilter applies to the entire cluster, it is assigned
+ # a global id (using a global core_id) and every Voltha instance will
+ # hold the same data. Because the Voltha instances are managed by
+ # docker swarm mode, an instance that goes down is brought back up
+ # right away, which reduces the chance of two instances ending up
+ # with different data. In a future phase we should adopt a distinct
+ # persistence model for cluster data, as opposed to instance data.
+ try:
+ assert isinstance(request, AlarmFilter)
+ request.id = create_cluster_id()
+ except AssertionError, e:
+ context.set_details(e.message)
+ context.set_code(StatusCode.INVALID_ARGUMENT)
+ returnValue(AlarmFilter())
+
+ response = yield self.dispatcher.dispatch('CreateAlarmFilter',
+ request,
+ context,
+ id=request.id,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Create alarm filter error')
+ context.set_code(response.error_code)
+ returnValue(AlarmFilter())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetAlarmFilter(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'GetAlarmFilter',
- request,
- context)
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('GetAlarmFilter',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Alarm filter \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(AlarmFilter())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def UpdateAlarmFilter(self, request, context):
log.info('grpc-request', request=request)
- # TODO dispatching to local instead of passing it to leader
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'UpdateAlarmFilter',
- request,
- context)
+ response = yield self.dispatcher.dispatch('UpdateAlarmFilter',
+ request,
+ context,
+ id=request.id,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Alarm filter \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(AlarmFilter())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def DeleteAlarmFilter(self, request, context):
log.info('grpc-request', request=request)
- # TODO dispatching to local instead of passing it to leader
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'DeleteAlarmFilter',
- request,
- context)
+ response = yield self.dispatcher.dispatch('DeleteAlarmFilter',
+ request,
+ context,
+ id=request.id,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Alarm filter \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(Empty())
@twisted_async
+ @inlineCallbacks
def ListAlarmFilters(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'ListAlarmFilters',
- Empty(),
- context)
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('ListAlarmFilters',
+ Empty(),
+ context,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Alarm filters error')
+ context.set_code(response.error_code)
+ returnValue(AlarmFilters())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetImages(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(request.id)
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Images()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'GetImages',
- request,
- context)
+ response = yield self.dispatcher.dispatch('GetImages',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Images())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def SelfTest(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return SelfTestResponse()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'SelfTest',
- request,
- context)
+ response = yield self.dispatcher.dispatch('SelfTest',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(SelfTestResponse())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
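
Every global-handler method in the hunks above follows the same dispatch-and-fallback shape: forward the request through the dispatcher (routed by device id, or broadcast to every core), and if a `DispatchError` comes back, set the gRPC status on the context and return an empty message of the method's return type. Below is a minimal, dependency-free sketch of that shape; `DispatchError` here is a simplified stand-in for the real class and `dispatch_or_default` is a hypothetical helper, not code from this patch, and the real methods additionally run under `@twisted_async`/`@inlineCallbacks`.

```
# Minimal sketch of the dispatch-and-fallback pattern used by the methods
# above. DispatchError is a simplified stand-in for the real class and
# dispatch_or_default is a hypothetical helper, not code from this patch.

class DispatchError(object):
    def __init__(self, error_code):
        self.error_code = error_code


def dispatch_or_default(dispatcher, context, method, request,
                        default_factory, id=None, broadcast=False):
    """Forward a request via the dispatcher; on error set the gRPC status
    on the context and return an empty message of the expected type."""
    response = dispatcher.dispatch(method, request, context,
                                   id=id, broadcast=broadcast)
    if isinstance(response, DispatchError):
        context.set_details('{} error'.format(method))
        context.set_code(response.error_code)
        return default_factory()
    return response
```

The real handlers inline this logic per RPC because each one logs the exchange and formats its error detail slightly differently.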
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 63efcef..bc36a35 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -625,6 +625,7 @@
try:
alarm_filter = self.root.get('/alarm_filters/{}'.format(request.id))
+
return alarm_filter
except KeyError:
context.set_details(
@@ -654,16 +655,13 @@
try:
assert isinstance(request, AlarmFilter)
alarm_filter = request
- assert alarm_filter.id == '', 'Alarm filter to be created cannot have id yet'
-
+ assert alarm_filter.id, 'Local Alarm filter to be ' \
+ 'created must have id'
except AssertionError, e:
context.set_details(e.message)
context.set_code(StatusCode.INVALID_ARGUMENT)
return AlarmFilter()
- # fill additional data
- alarm_filter.id = uuid4().hex[:12]
-
# add device to tree
self.root.add('/alarm_filters', alarm_filter)
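
The two halves of alarm-filter creation now fit together as follows: the global handler mints a cluster-wide id via `create_cluster_id` and broadcasts the create to every core, while each local handler only checks that an id is already present before adding the filter to its tree. The sketch below is a rough illustration of that split, assuming it runs inside the voltha tree so the id helper can be imported; `dispatcher` and `root` stand for whatever objects provide `dispatch()` and `add()` in the real handlers.

```
# Rough sketch of the global/local split for CreateAlarmFilter introduced
# by this change; the two functions are illustrative only.
from common.utils.id_generation import create_cluster_id


def global_create_alarm_filter(dispatcher, request, context):
    # Global side: assign a cluster-wide id, then broadcast to all cores.
    request.id = create_cluster_id()
    return dispatcher.dispatch('CreateAlarmFilter', request, context,
                               id=request.id, broadcast=True)


def local_create_alarm_filter(root, alarm_filter):
    # Local side: the id must already have been set by the global handler.
    assert alarm_filter.id, 'Local Alarm filter to be created must have id'
    root.add('/alarm_filters', alarm_filter)
    return alarm_filter
```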
diff --git a/voltha/leader.py b/voltha/leader.py
index 47ed8bc..ca8d9b4 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -24,6 +24,7 @@
from simplejson import dumps, loads
from common.utils.asleep import asleep
+from common.utils.id_generation import get_next_core_id
log = get_logger()
@@ -68,9 +69,6 @@
self.core_data_id_match = re.compile(
self.CORE_STORE_KEY_EXTRACTOR % self.coord.core_store_prefix).match
- self.core_store_assignment_key = self.coord.core_store_prefix + \
- '/assignment'
-
self.assignment_match = re.compile(
self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match
@@ -155,21 +153,22 @@
try:
# Get the mapping record
(_, mappings) = yield self.coord.kv_get(
- self.core_store_assignment_key, recurse=True)
+ self.coord.core_store_assignment_key, recurse=True)
if mappings:
self.core_store_assignment = loads(mappings[0]['Value'])
return
else: # Key has not been created yet
# Create the key with an empty dictionary value
value = dict()
- result = yield self.coord.kv_put(self.core_store_assignment_key,
- dumps(value))
+ result = yield self.coord.kv_put(
+ self.coord.core_store_assignment_key,
+ dumps(value))
if not result:
raise ConfigMappingException(self.instance_id)
# Ensure the record was created
(_, mappings) = yield self.coord.kv_get(
- self.core_store_assignment_key, recurse=True)
+ self.coord.core_store_assignment_key, recurse=True)
self.core_store_assignment = loads(mappings[0]['Value'])
@@ -205,9 +204,9 @@
self.coord.membership_prefix, index=index, recurse=True)
# Only members with valid session are considered active
- members = [{'id':self.member_id_match(e['Key']).group(2),
+ members = [{'id': self.member_id_match(e['Key']).group(2),
'host': loads(e['Value'])['host_address']}
- for e in results if 'Session' in e ]
+ for e in results if 'Session' in e]
log.info('active-members', active_members=members)
@@ -257,9 +256,6 @@
@inlineCallbacks
def _reassign_core_stores(self):
- def _get_new_str_id(max_val_in_str):
- return str(int(max_val_in_str) + 1)
-
def _get_core_data_id_from_instance(instance_name):
for id, instance in self.core_store_assignment.iteritems():
if instance and instance['id'] == instance_name:
@@ -287,7 +283,7 @@
# 2. Update the mapping with the new set
current_id = max(self.core_store_assignment) \
- if self.core_store_assignment else '0'
+ if self.core_store_assignment else '0000'
for instance in self.members:
if instance['id'] not in existing_active_config_members:
# Add the member to the config map
@@ -297,21 +293,22 @@
updated_mapping[next_id] = instance
else:
# There are no empty slot, create new ids
- current_id = _get_new_str_id(current_id)
+ current_id = get_next_core_id(current_id)
updated_mapping[current_id] = instance
self.core_store_assignment = updated_mapping
log.info('updated-assignment',
- core_store_assignment=self.core_store_assignment,
- inactive_members=inactive_members)
+ core_store_assignment=self.core_store_assignment,
+ inactive_members=inactive_members)
# 3. save the mapping into consul
- yield self.coord.kv_put(self.core_store_assignment_key,
+ yield self.coord.kv_put(self.coord.core_store_assignment_key,
dumps(self.core_store_assignment))
# 4. Assign the new workload to the newly created members
curr_members_set = set([m['id'] for m in self.members])
- new_members = curr_members_set.difference(existing_active_config_members)
+ new_members = curr_members_set.difference(
+ existing_active_config_members)
for new_member_id in new_members:
yield self.coord.kv_put(
self.coord.assignment_prefix
@@ -331,88 +328,88 @@
log.exception('config-reassignment-failure', e=e)
self._restart_core_store_reassignment_soak_timer()
- # @inlineCallbacks
- # def _reassign_work(self):
- #
- # log.info('reassign-work')
- #
- # # Plan
- # #
- # # Step 1: calculate desired assignment from current members and
- # # workload list (e.g., using consistent hashing or any other
- # # algorithm
- # # Step 2: collect current assignments from consul
- # # Step 3: find the delta between the desired and actual assignments:
- # # these form two lists:
- # # 1. new assignments to be made
- # # 2. obsolete assignments to be revoked
- # # graceful handling may be desirable when moving existing
- # # assignment from existing member to another member (to make
- # # sure it is abandoned by old member before new takes charge)
- # # Step 4: orchestrate the assignment by adding/deleting(/locking)
- # # entries in consul
- # #
- # # We must make sure while we are working on this, we do not re-enter
- # # into same method!
- #
- # try:
- #
- # # Step 1: generate wanted assignment (mapping work to members)
- #
- # ring = HashRing(self.members)
- # wanted_assignments = dict() # member_id -> set(work_id)
- # _ = [
- # wanted_assignments.setdefault(ring.get_node(work), set())
- # .add(work)
- # for work in self.workload
- # ]
- # for (member, work) in sorted(wanted_assignments.iteritems()):
- # log.info('assignment',
- # member=member, work_count=len(work))
- #
- # # Step 2: discover current assignment (from consul)
- #
- # (_, results) = yield self.coord.kv_get(
- # self.coord.assignment_prefix, recurse=True)
- #
- # matches = [
- # (self.assignment_match(e['Key']), e) for e in results or []]
- #
- # current_assignments = dict() # member_id -> set(work_id)
- # _ = [
- # current_assignments.setdefault(
- # m.groupdict()['member_id'], set())
- # .add(m.groupdict()['work_id'])
- # for m, e in matches if m is not None
- # ]
- #
- # # Step 3: handle revoked assignments first on a per member basis
- #
- # for member_id, current_work in current_assignments.iteritems():
- # assert isinstance(current_work, set)
- # wanted_work = wanted_assignments.get(member_id, set())
- # work_to_revoke = current_work.difference(wanted_work)
- #
- # # revoking work by simply deleting the assignment entry
- # # TODO if we want some feedback to see that member abandoned
- # # work, we could add a consul-based protocol here
- # for work_id in work_to_revoke:
- # yield self.coord.kv_delete(
- # self.coord.assignment_prefix
- # + member_id + '/' + work_id)
- #
- # # Step 4: assign new work as needed
- #
- # for member_id, wanted_work in wanted_assignments.iteritems():
- # assert isinstance(wanted_work, set)
- # current_work = current_assignments.get(member_id, set())
- # work_to_assign = wanted_work.difference(current_work)
- #
- # for work_id in work_to_assign:
- # yield self.coord.kv_put(
- # self.coord.assignment_prefix
- # + member_id + '/' + work_id, '')
- #
- # except Exception, e:
- # log.exception('failed-reassignment', e=e)
- # self._restart_reassignment_soak_timer() # try again in a while
+ # @inlineCallbacks
+ # def _reassign_work(self):
+ #
+ # log.info('reassign-work')
+ #
+ # # Plan
+ # #
+ # # Step 1: calculate desired assignment from current members and
+ # # workload list (e.g., using consistent hashing or any other
+ # # algorithm
+ # # Step 2: collect current assignments from consul
+ # # Step 3: find the delta between the desired and actual assignments:
+ # # these form two lists:
+ # # 1. new assignments to be made
+ # # 2. obsolete assignments to be revoked
+ # # graceful handling may be desirable when moving existing
+ # # assignment from existing member to another member (to make
+ # # sure it is abandoned by old member before new takes charge)
+ # # Step 4: orchestrate the assignment by adding/deleting(/locking)
+ # # entries in consul
+ # #
+ # # We must make sure while we are working on this, we do not re-enter
+ # # into same method!
+ #
+ # try:
+ #
+ # # Step 1: generate wanted assignment (mapping work to members)
+ #
+ # ring = HashRing(self.members)
+ # wanted_assignments = dict() # member_id -> set(work_id)
+ # _ = [
+ # wanted_assignments.setdefault(ring.get_node(work), set())
+ # .add(work)
+ # for work in self.workload
+ # ]
+ # for (member, work) in sorted(wanted_assignments.iteritems()):
+ # log.info('assignment',
+ # member=member, work_count=len(work))
+ #
+ # # Step 2: discover current assignment (from consul)
+ #
+ # (_, results) = yield self.coord.kv_get(
+ # self.coord.assignment_prefix, recurse=True)
+ #
+ # matches = [
+ # (self.assignment_match(e['Key']), e) for e in results or []]
+ #
+ # current_assignments = dict() # member_id -> set(work_id)
+ # _ = [
+ # current_assignments.setdefault(
+ # m.groupdict()['member_id'], set())
+ # .add(m.groupdict()['work_id'])
+ # for m, e in matches if m is not None
+ # ]
+ #
+ # # Step 3: handle revoked assignments first on a per member basis
+ #
+ # for member_id, current_work in current_assignments.iteritems():
+ # assert isinstance(current_work, set)
+ # wanted_work = wanted_assignments.get(member_id, set())
+ # work_to_revoke = current_work.difference(wanted_work)
+ #
+ # # revoking work by simply deleting the assignment entry
+ # # TODO if we want some feedback to see that member abandoned
+ # # work, we could add a consul-based protocol here
+ # for work_id in work_to_revoke:
+ # yield self.coord.kv_delete(
+ # self.coord.assignment_prefix
+ # + member_id + '/' + work_id)
+ #
+ # # Step 4: assign new work as needed
+ #
+ # for member_id, wanted_work in wanted_assignments.iteritems():
+ # assert isinstance(wanted_work, set)
+ # current_work = current_assignments.get(member_id, set())
+ # work_to_assign = wanted_work.difference(current_work)
+ #
+ # for work_id in work_to_assign:
+ # yield self.coord.kv_put(
+ # self.coord.assignment_prefix
+ # + member_id + '/' + work_id, '')
+ #
+ # except Exception, e:
+ # log.exception('failed-reassignment', e=e)
+ # self._restart_reassignment_soak_timer() # try again in a while
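
For the core-store reassignment above: the leader keeps a consul-backed map of 4-hex-digit core ids to member records, reuses empty slots first, and only mints a new id with `get_next_core_id` when no slot is free. The sketch below is a rough reconstruction of that allocation step (the empty-slot bookkeeping is not fully visible in the hunk); `assign_core_stores` is a hypothetical standalone helper, and the import assumes the code runs inside the voltha tree.

```
# Rough reconstruction of the slot-filling step in _reassign_core_stores;
# assign_core_stores is a hypothetical helper, not code from this patch.
from common.utils.id_generation import get_next_core_id


def assign_core_stores(assignment, new_members):
    """Place each new member in an empty slot of the existing assignment
    map if one exists, otherwise under a freshly allocated core id."""
    updated = dict(assignment)
    current_id = max(assignment) if assignment else '0000'
    empty_slots = [cid for cid, member in sorted(assignment.items())
                   if not member]
    for member in new_members:
        if empty_slots:
            updated[empty_slots.pop(0)] = member
        else:
            current_id = get_next_core_id(current_id)
            updated[current_id] = member
    return updated

# e.g. assign_core_stores({'0001': None, '0002': {'id': 'vcore_2'}},
#                         [{'id': 'vcore_3'}, {'id': 'vcore_4'}])
# -> {'0001': {'id': 'vcore_3'}, '0002': {'id': 'vcore_2'},
#     '0003': {'id': 'vcore_4'}}
```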
diff --git a/voltha/main.py b/voltha/main.py
index ae914d0..787eb36 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -317,6 +317,7 @@
VolthaCore(
instance_id=self.instance_id,
core_store_id = self.core_store_id,
+ grpc_port=self.args.grpc_port,
version=VERSION,
log_level=LogLevel.INFO
)
diff --git a/voltha/worker.py b/voltha/worker.py
index aff83b1..bb8932a 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -19,6 +19,7 @@
from twisted.internet import reactor
from twisted.internet.base import DelayedCall
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from simplejson import dumps, loads
from common.utils.asleep import asleep
@@ -46,6 +47,7 @@
self.my_workload = set() # list of work_id's assigned to me
self.assignment_soak_timer = None
+ self.assignment_core_store_soak_timer = None
self.my_candidate_workload = set() # we stash here during soaking
self.assignment_match = re.compile(
@@ -55,10 +57,13 @@
self.wait_for_core_store_assignment = Deferred()
+ self.peers_map = None
+
@inlineCallbacks
def start(self):
log.debug('starting')
yield self._start_tracking_my_assignments()
+ yield self._start_tracking_my_peers()
log.info('started')
returnValue(self)
@@ -79,15 +84,15 @@
returnValue(val)
# Private methods:
-
def _start_tracking_my_assignments(self):
reactor.callLater(0, self._track_my_assignments, 0)
+ def _start_tracking_my_peers(self):
+ reactor.callLater(0, self._track_my_peers, 0)
+
@inlineCallbacks
def _track_my_assignments(self, index):
-
try:
-
# if there is no leader yet, wait for a stable leader
d = self.coord.wait_for_a_leader()
if not d.called:
@@ -105,13 +110,13 @@
if results and not self.mycore_store_id:
# We have no store id set yet
core_stores = [c['Value'] for c in results if
- c['Key'] == self.coord.assignment_prefix +
- self.instance_id + '/' +
- self.coord.core_storage_suffix]
+ c['Key'] == self.coord.assignment_prefix +
+ self.instance_id + '/' +
+ self.coord.core_storage_suffix]
if core_stores:
self.mycore_store_id = core_stores[0]
log.debug('store-assigned',
- mycore_store_id=self.mycore_store_id)
+ mycore_store_id=self.mycore_store_id)
self._stash_and_restart_core_store_soak_timer()
# 2. Check whether we have been assigned a work item
@@ -130,9 +135,40 @@
# to prevent flood
finally:
- if not self.halted:
+ if not self.halted and not self.mycore_store_id:
reactor.callLater(0, self._track_my_assignments, index)
+ @inlineCallbacks
+ def _track_my_peers(self, index):
+ try:
+ if self.mycore_store_id:
+ # Wait for updates to the store assignment key
+ (index, mappings) = yield self.coord.kv_get(
+ self.coord.core_store_assignment_key,
+ index=index,
+ recurse=True)
+ if mappings:
+ new_map = loads(mappings[0]['Value'])
+ # Remove my id from my peers list
+ new_map.pop(self.mycore_store_id)
+ if self.peers_map is None or self.peers_map != new_map:
+ self.coord.publish_peers_map_change(new_map)
+ self.peers_map = new_map
+ log.debug('peer-mapping-changed', mapping=new_map)
+
+ except Exception, e:
+ log.exception('peer-track-error', e=e)
+ yield asleep(
+ self.coord.worker_config.get(
+ 'assignments_track_error_to_avoid_flood',
+ 1))
+ # to prevent flood
+ finally:
+ if not self.halted:
+ # Wait longer if we have not received a core id yet
+ reactor.callLater(0 if self.mycore_store_id else 5,
+ self._track_my_peers, index)
+
def _stash_and_restart_soak_timer(self, candidate_workload):
log.debug('re-start-assignment-soaking')
@@ -151,9 +187,9 @@
:return: None
"""
log.debug('my-assignments-changed',
- old_count=len(self.my_workload),
- new_count=len(self.my_candidate_workload),
- workload=self.my_workload)
+ old_count=len(self.my_workload),
+ new_count=len(self.my_candidate_workload),
+ workload=self.my_workload)
self.my_workload, self.my_candidate_workload = \
self.my_candidate_workload, None
@@ -161,14 +197,14 @@
log.debug('re-start-assignment-config-soaking')
- if self.assignment_soak_timer is not None:
- if not self.assignment_soak_timer.called:
- self.assignment_soak_timer.cancel()
+ if self.assignment_core_store_soak_timer is not None:
+ if not self.assignment_core_store_soak_timer.called:
+ self.assignment_core_store_soak_timer.cancel()
- self.assignment_soak_timer = reactor.callLater(
+ self.assignment_core_store_soak_timer = reactor.callLater(
self.soak_time, self._process_config_assignment)
def _process_config_assignment(self):
log.debug('process-config-assignment',
- mycore_store_id=self.mycore_store_id)
- self.wait_for_core_store_assignment.callback(self.mycore_store_id)
\ No newline at end of file
+ mycore_store_id=self.mycore_store_id)
+ self.wait_for_core_store_assignment.callback(self.mycore_store_id)
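
The worker-side peer tracking added above boils down to: read the core-store assignment map from consul, drop this instance's own core id, and publish a peers-map change only when the map actually differs from the last one seen. The sketch below isolates just that bookkeeping, stripped of the consul blocking query and Twisted scheduling; `compute_peers` and `maybe_publish` are hypothetical helper names, `publish_peers_map_change` is the coordinator callback used in the diff, and `simplejson` is the same module the worker imports.

```
# Bookkeeping behind _track_my_peers, without consul access or Twisted
# scheduling; compute_peers and maybe_publish are hypothetical helpers.
from simplejson import loads


def compute_peers(raw_assignment_json, my_core_store_id):
    """Return the assignment map with this instance's own core id removed."""
    peers = loads(raw_assignment_json)
    peers.pop(my_core_store_id, None)
    return peers


def maybe_publish(coord, last_peers, new_peers):
    """Publish a peers-map change only when the map differs from the last
    one seen; return the map to remember for the next poll."""
    if last_peers is None or last_peers != new_peers:
        coord.publish_peers_map_change(new_peers)
    return new_peers
```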