Add more tests to the global dispatcher and some minor fixes
Change-Id: I8c62e18c52a09052b68f824ba9055a6b6d30a1d6
diff --git a/tests/itests/README.md b/tests/itests/README.md
index dff4850..aec496a 100644
--- a/tests/itests/README.md
+++ b/tests/itests/README.md
@@ -94,7 +94,7 @@
of configurations in between to ensure data integrity is preserved.
During this test, the user will be prompted to start ponsim. Use
-these commands to run ponsim with 1 OLT and 4 ONUs. THis will also
+these commands to run ponsim with 1 OLT and 4 ONUs. This will also
enable alarm at a frequency of 5 seconds:
```
sudo -s
@@ -134,6 +134,25 @@
* **Dispatcher**: This test exercises the requests forwarding via the Global
handler.
+
+During this test, the user will be prompted to start ponsim. Use
+these commands to run ponsim with 1 OLT and 4 ONUs.
+
+```
+sudo -s
+. ./env.sh
+./ponsim/main.py -v -o 4
+```
+
+The user will also be prompted to enable port forwarding on ponmgmt
+bridge. Use these commands:
+
+```
+sudo -s
+echo 8 > /sys/class/net/ponmgmt/bridge/group_fwd_mask
+```
+
+Run the test:
```
cd /cord/incubator/voltha
. ./env.sh
diff --git a/tests/itests/voltha/test_dispatcher.py b/tests/itests/voltha/test_dispatcher.py
index c369fc0..0f6ba73 100644
--- a/tests/itests/voltha/test_dispatcher.py
+++ b/tests/itests/voltha/test_dispatcher.py
@@ -2,17 +2,22 @@
from time import time, sleep
from google.protobuf.json_format import MessageToDict
-from unittest import main, TestCase
+from unittest import main
from voltha.protos.device_pb2 import Device
from tests.itests.voltha.rest_base import RestBase
-from voltha.core.flow_decomposer import mk_simple_flow_mod, in_port, output
-from voltha.protos import openflow_13_pb2 as ofp
-from common.utils.consulhelpers import get_endpoint_from_consul
+from common.utils.consulhelpers import get_endpoint_from_consul, \
+ get_all_instances_of_service
from common.utils.consulhelpers import verify_all_services_healthy
from tests.itests.docutests.test_utils import \
run_command_to_completion_with_raw_stdout, \
run_command_to_completion_with_stdout_in_list
from voltha.protos.voltha_pb2 import AlarmFilter
+from google.protobuf.empty_pb2 import Empty
+import grpc
+from voltha.protos import third_party
+from voltha.protos import voltha_pb2
+from voltha.core.flow_decomposer import *
+from voltha.protos.openflow_13_pb2 import FlowTableUpdate
LOCAL_CONSUL = "localhost:8500"
DOCKER_COMPOSE_FILE = "compose/docker-compose-system-test.yml"
@@ -29,12 +34,17 @@
.format(DOCKER_COMPOSE_FILE),
docker_compose_remove_voltha="docker-compose -f {} rm -f voltha"
.format(DOCKER_COMPOSE_FILE),
+ docker_compose_scale_voltha="docker-compose -f {} scale "
+ "voltha=".format(DOCKER_COMPOSE_FILE),
kafka_topics="kafkacat -b {} -L",
kafka_alarms="kafkacat -o end -b {} -C -t voltha.alarms -c 2",
kafka_kpis="kafkacat -o end -b {} -C -t voltha.kpis -c 5"
)
+
class DispatcherTest(RestBase):
+ def setUp(self):
+ self.grpc_channels = dict()
t0 = [time()]
@@ -44,7 +54,6 @@
msg)
DispatcherTest.t0[0] = t1
-
def wait_till(self, msg, predicate, interval=0.1, timeout=5.0):
deadline = time() + timeout
while time() < deadline:
@@ -53,6 +62,11 @@
sleep(interval)
self.fail('Timed out while waiting for condition: {}'.format(msg))
+ def get_channel(self, voltha_grpc):
+ if voltha_grpc not in self.grpc_channels:
+ self.grpc_channels[voltha_grpc] = grpc.insecure_channel(
+ voltha_grpc)
+ return self.grpc_channels[voltha_grpc]
def test_01_global_rest_apis(self):
# Start the voltha ensemble with a single voltha instance
@@ -62,39 +76,163 @@
self.set_rest_endpoint()
self.set_kafka_endpoint()
- self._get_root()
- self._get_schema()
- self._get_health()
- self._get_voltha()
- self._list_voltha_instances()
- self._get_voltha_instance()
- olt_id = self._add_olt_device()
- self._verify_device_preprovisioned_state(olt_id)
- self._activate_device(olt_id)
- ldev_id = self._wait_for_logical_device(olt_id)
- ldevices = self._list_logical_devices()
+ self._get_root_rest()
+ self._get_schema_rest()
+ self._get_health_rest()
+ self._get_voltha_rest()
+ self._list_voltha_instances_rest()
+ self._get_voltha_instance_rest()
+ olt_id = self._add_olt_device_rest()
+ self._verify_device_preprovisioned_state_rest(olt_id)
+ self._activate_device_rest(olt_id)
+ ldev_id = self._wait_for_logical_device_rest(olt_id)
+ ldevices = self._list_logical_devices_rest()
logical_device_id = ldevices['items'][0]['id']
- self._get_logical_device(logical_device_id)
- self._list_logical_device_ports(logical_device_id)
- self._list_and_update_logical_device_flows(logical_device_id)
- self._list_and_update_logical_device_flow_groups(logical_device_id)
- devices = self._list_devices()
+ self._get_logical_device_rest(logical_device_id)
+ self._list_logical_device_ports_rest(logical_device_id)
+ self._list_and_update_logical_device_flows_rest(logical_device_id)
+ self._list_and_update_logical_device_flow_groups_rest(
+ logical_device_id)
+ devices = self._list_devices_rest()
device_id = devices['items'][0]['id']
- self._get_device(device_id)
- self._list_device_ports(device_id)
- self._list_device_flows(device_id)
- self._list_device_flow_groups(device_id)
- self._get_images(device_id)
- self._self_test(device_id)
- dtypes = self._list_device_types()
- self._get_device_type(dtypes['items'][0]['id'])
- alarm_filter = self._create_device_filter(olt_id)
- self._remove_device_filter(alarm_filter['id'])
+ self._get_device_rest(device_id)
+ self._list_device_ports_rest(device_id)
+ self._list_device_flows_rest(device_id)
+ self._list_device_flow_groups_rest(device_id)
+ self._get_images_rest(device_id)
+ self._self_test_rest(device_id)
+ dtypes = self._list_device_types_rest()
+ self._get_device_type_rest(dtypes['items'][0]['id'])
+ alarm_filter = self._create_device_filter_rest(olt_id)
+ self._remove_device_filter_rest(alarm_filter['id'])
# TODO: PM APIs test
def test_02_cross_instances_dispatch(self):
- """ TODO: So far manual tests done. Needs to be automated. """
- pass
+
+ def prompt(input_func, text):
+ val = input_func(text)
+ return val
+
+ def prompt_for_return(text):
+ return raw_input(text)
+
+ # Start the voltha ensemble with a single voltha instance
+ self._stop_and_remove_all_containers()
+ sleep(5) # A small wait for the system to settle down
+ self.start_all_containers()
+ self.set_rest_endpoint()
+ self.set_kafka_endpoint()
+
+ # Scale voltha to 3 instances and setup the voltha grpc assigments
+ self._scale_voltha(3)
+ sleep(10) # A small wait for the system to settle down
+ voltha_instances = get_all_instances_of_service(LOCAL_CONSUL,
+ 'voltha-grpc')
+ self.assertEqual(len(voltha_instances), 3)
+ self.ponsim_voltha_stub_local = voltha_pb2.VolthaLocalServiceStub(
+ self.get_channel(self._get_grpc_address(voltha_instances[2])))
+ self.ponsim_voltha_stub_global = voltha_pb2.VolthaGlobalServiceStub(
+ self.get_channel(self._get_grpc_address(voltha_instances[2])))
+
+ self.simulated_voltha_stub_local = voltha_pb2.VolthaLocalServiceStub(
+ self.get_channel(self._get_grpc_address(voltha_instances[1])))
+ self.simulated_voltha_stub_global = voltha_pb2.VolthaGlobalServiceStub(
+ self.get_channel(self._get_grpc_address(voltha_instances[1])))
+
+ self.empty_voltha_stub_local = voltha_pb2.VolthaLocalServiceStub(
+ self.get_channel(self._get_grpc_address(voltha_instances[0])))
+ self.empty_voltha_stub_global = voltha_pb2.VolthaGlobalServiceStub(
+ self.get_channel(self._get_grpc_address(voltha_instances[0])))
+
+ # Prompt the user to start ponsim
+ # Get the user to start PONSIM as root
+ prompt(prompt_for_return,
+ '\nStart PONSIM as root in another window ...')
+
+ prompt(prompt_for_return,
+ '\nEnsure port forwarding is set on ponmgnt ...')
+
+ # Test 1:
+ # A. Provision a pomsim olt using the ponsim_voltha_stub
+ # B. Enable the posim olt using the simulated_voltha_stub
+ # C. Wait for onu discovery using the empty_voltha_stub
+ ponsim_olt = self._provision_ponsim_olt_grpc(
+ self.ponsim_voltha_stub_local)
+ ponsim_logical_device_id = self._enable_device_grpc(
+ self.simulated_voltha_stub_global,
+ ponsim_olt.id)
+ self._wait_for_onu_discovery_grpc(self.empty_voltha_stub_global,
+ ponsim_olt.id,
+ count=4)
+ # Test 2:
+ # A. Provision a simulated olt using the simulated_voltha_stub
+ # B. Enable the simulated olt using the ponsim_voltha_stub
+ # C. Wait for onu discovery using the empty_voltha_stub
+ simulated_olt = self._provision_simulated_olt_grpc(
+ self.simulated_voltha_stub_local)
+ simulated_logical_device_id = self._enable_device_grpc(
+ self.ponsim_voltha_stub_global, simulated_olt.id)
+ self._wait_for_onu_discovery_grpc(self.empty_voltha_stub_global,
+ simulated_olt.id, count=4)
+
+ # Test 3:
+ # Verify that we have at least 8 devices created using the global
+ # REST and also via direct grpc in the empty stub
+ devices_via_rest = self._list_devices_rest(8)['items']
+ devices_via_global_grpc = self._get_devices_grpc(
+ self.empty_voltha_stub_global)
+ assert len(devices_via_rest) == len(devices_via_global_grpc)
+
+ # Test 4:
+ # A. Create 2 Alarms filters using REST
+ # B. Ensure it is present across all instances
+ # C. Ensure when requesting the alarm filters we do not get
+ # duplicate results
+ alarm_filter1 = self._create_device_filter_rest(ponsim_olt.id)
+ alarm_filter2 = self._create_device_filter_rest(simulated_olt.id)
+ global_filters = self._get_alarm_filters_rest()
+ filter = self._get_alarm_filters_grpc(self.simulated_voltha_stub_local)
+ assert global_filters == MessageToDict(filter)
+ filter = self._get_alarm_filters_grpc(self.ponsim_voltha_stub_local)
+ assert global_filters == MessageToDict(filter)
+ filter = self._get_alarm_filters_grpc(self.empty_voltha_stub_local)
+ assert global_filters == MessageToDict(filter)
+ filter = self._get_alarm_filters_grpc(self.empty_voltha_stub_global)
+ assert global_filters == MessageToDict(filter)
+
+ # Test 5:
+ # A. Delete an alarm filter
+ # B. Ensure that filter is deleted from all instances
+ self._remove_device_filter_rest(alarm_filter1['id'])
+ previous_filters = global_filters
+ global_filters = self._get_alarm_filters_rest()
+ assert global_filters != previous_filters
+ filter = self._get_alarm_filters_grpc(self.simulated_voltha_stub_local)
+ assert global_filters == MessageToDict(filter)
+ filter = self._get_alarm_filters_grpc(self.ponsim_voltha_stub_local)
+ assert global_filters == MessageToDict(filter)
+ filter = self._get_alarm_filters_grpc(self.empty_voltha_stub_local)
+ assert global_filters == MessageToDict(filter)
+ filter = self._get_alarm_filters_grpc(self.empty_voltha_stub_global)
+ assert global_filters == MessageToDict(filter)
+
+ # Test 6:
+ # A. Simulate EAPOL install on ponsim instance using grpc
+ # B. Validate the flows using global REST
+ # C. Retrieve the flows from global grpc using empty voltha instance
+ self._install_eapol_flow_grpc(self.ponsim_voltha_stub_local,
+ ponsim_logical_device_id)
+ self._verify_olt_eapol_flow_rest(ponsim_olt.id)
+ res = self._get_olt_flows_grpc(self.empty_voltha_stub_global,
+ ponsim_logical_device_id)
+
+ # TODO: More tests to be added as new features are added
+
+
+ def _get_grpc_address(self, voltha_instance):
+ address = '{}:{}'.format(voltha_instance['ServiceAddress'],
+ voltha_instance['ServicePort'])
+ return address
def _stop_and_remove_all_containers(self):
# check if there are any running containers first
@@ -122,7 +260,8 @@
timeout=10)
self.wait_till('chameleon services HEALTHY',
lambda: verify_all_services_healthy(
- LOCAL_CONSUL,service_name='chameleon-rest') == True,
+ LOCAL_CONSUL,
+ service_name='chameleon-rest') == True,
timeout=10)
# Chameleon takes some time to compile the protos and make them
@@ -137,36 +276,42 @@
def set_kafka_endpoint(self):
self.kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'kafka')
+ def _scale_voltha(self, scale=2):
+ self.pt("Scaling voltha ...")
+ cmd = command_defs['docker_compose_scale_voltha'] + str(scale)
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
- def _get_root(self):
+ def _get_root_rest(self):
res = self.get('/', expected_content_type='text/html')
self.assertGreaterEqual(res.find('swagger'), 0)
- def _get_schema(self):
+ def _get_schema_rest(self):
res = self.get('/schema')
- self.assertEqual(set(res.keys()), {'protos', 'yang_from','swagger_from'})
+ self.assertEqual(set(res.keys()),
+ {'protos', 'yang_from', 'swagger_from'})
- def _get_health(self):
+ def _get_health_rest(self):
res = self.get('/health')
self.assertEqual(res['state'], 'HEALTHY')
# ~~~~~~~~~~~~~~~~~~~~~ TOP LEVEL VOLTHA OPERATIONS ~~~~~~~~~~~~~~~~~~~~~~~
- def _get_voltha(self):
+ def _get_voltha_rest(self):
res = self.get('/api/v1')
self.assertEqual(res['version'], '0.9.0')
- def _list_voltha_instances(self):
+ def _list_voltha_instances_rest(self):
res = self.get('/api/v1/instances')
self.assertEqual(len(res['items']), 1)
- def _get_voltha_instance(self):
+ def _get_voltha_instance_rest(self):
res = self.get('/api/v1/instances')
- voltha_id=res['items'][0]
+ voltha_id = res['items'][0]
res = self.get('/api/v1/instances/{}'.format(voltha_id))
self.assertEqual(res['version'], '0.9.0')
- def _add_olt_device(self):
+ def _add_olt_device_rest(self, grpc=None):
device = Device(
type='simulated_olt',
mac_address='00:00:00:00:00:01'
@@ -175,7 +320,81 @@
expected_code=200)
return device['id']
- def _verify_device_preprovisioned_state(self, olt_id):
+ def _provision_simulated_olt_grpc(self, stub):
+ device = Device(
+ type='simulated_olt',
+ mac_address='00:00:00:00:00:01'
+ )
+ device = stub.CreateDevice(device)
+ return device
+
+ def _provision_ponsim_olt_grpc(self, stub):
+ device = Device(
+ type='ponsim_olt',
+ host_and_port='172.17.0.1:50060'
+ )
+ device = stub.CreateDevice(device)
+ return device
+
+ def _enable_device_grpc(self, stub, device_id):
+ logical_device_id = None
+ try:
+ stub.EnableDevice(voltha_pb2.ID(id=device_id))
+ while True:
+ device = stub.GetDevice(voltha_pb2.ID(id=device_id))
+ # If this is an OLT then acquire logical device id
+ if device.oper_status == voltha_pb2.OperStatus.ACTIVE:
+ if device.type.endswith('_olt'):
+ assert device.parent_id
+ logical_device_id = device.parent_id
+ self.pt('success (logical device id = {})'.format(
+ logical_device_id))
+ else:
+ self.pt('success (device id = {})'.format(device.id))
+ break
+ self.pt('waiting for device to be enabled...')
+ sleep(.5)
+ except Exception, e:
+ self.pt('Error enabling {}. Error:{}'.format(device_id, e))
+ return logical_device_id
+
+ def _delete_device_grpc(self, stub, device_id):
+ try:
+ stub.DeleteDevice(voltha_pb2.ID(id=device_id))
+ while True:
+ device = stub.GetDevice(voltha_pb2.ID(id=device_id))
+ assert not device
+ except Exception, e:
+ self.pt('deleting device {}. Error:{}'.format(device_id, e))
+
+ def _get_devices_grpc(self, stub):
+ res = stub.ListDevices(Empty())
+ return res.items
+
+ def _find_onus_grpc(self, stub, olt_id):
+ devices = self._get_devices_grpc(stub)
+ return [
+ d for d in devices
+ if d.parent_id == olt_id
+ ]
+
+ def _wait_for_onu_discovery_grpc(self, stub, olt_id, count=4):
+ # shortly after we shall see the discovery of four new onus, linked to
+ # the olt device
+ self.wait_till(
+ 'find ONUs linked to the olt device',
+ lambda: len(self._find_onus_grpc(stub, olt_id)) >= count,
+ 2
+ )
+ # verify that they are properly set
+ onus = self._find_onus_grpc(stub, olt_id)
+ for onu in onus:
+ self.assertEqual(onu.admin_state, 3) # ENABLED
+ self.assertEqual(onu.oper_status, 4) # ACTIVE
+
+ return [onu.id for onu in onus]
+
+ def _verify_device_preprovisioned_state_rest(self, olt_id):
# we also check that so far what we read back is same as what we get
# back on create
device = self.get('/api/v1/devices/{}'.format(olt_id))
@@ -184,7 +403,7 @@
self.assertEqual(device['admin_state'], 'PREPROVISIONED')
self.assertEqual(device['oper_status'], 'UNKNOWN')
- def _activate_device(self, olt_id):
+ def _activate_device_rest(self, olt_id):
path = '/api/v1/devices/{}'.format(olt_id)
self.post(path + '/enable', expected_code=200)
device = self.get(path)
@@ -212,7 +431,7 @@
ports = self.get(path + '/ports')['items']
self.assertEqual(len(ports), 2)
- def _wait_for_logical_device(self, olt_id):
+ def _wait_for_logical_device_rest(self, olt_id):
# we shall find the logical device id from the parent_id of the olt
# (root) device
device = self.get(
@@ -238,20 +457,20 @@
self.assertEqual(logical_port['device_port_no'], 2)
return logical_device['id']
- def _list_logical_devices(self):
+ def _list_logical_devices_rest(self):
res = self.get('/api/v1/logical_devices')
self.assertGreaterEqual(len(res['items']), 1)
return res
- def _get_logical_device(self, id):
+ def _get_logical_device_rest(self, id):
res = self.get('/api/v1/logical_devices/{}'.format(id))
self.assertIsNotNone(res['datapath_id'])
- def _list_logical_device_ports(self, id):
+ def _list_logical_device_ports_rest(self, id):
res = self.get('/api/v1/logical_devices/{}/ports'.format(id))
self.assertGreaterEqual(len(res['items']), 1)
- def _list_and_update_logical_device_flows(self, id):
+ def _list_and_update_logical_device_flows_rest(self, id):
# retrieve flow list
res = self.get('/api/v1/logical_devices/{}/flows'.format(id))
@@ -280,7 +499,7 @@
len_after = len(res['items'])
self.assertGreater(len_after, len_before)
- def _list_and_update_logical_device_flow_groups(self, id):
+ def _list_and_update_logical_device_flow_groups_rest(self, id):
# retrieve flow list
res = self.get('/api/v1/logical_devices/{}/flow_groups'.format(id))
@@ -316,34 +535,34 @@
len_after = len(res['items'])
self.assertGreater(len_after, len_before)
- def _list_devices(self):
+ def _list_devices_rest(self, count=2):
res = self.get('/api/v1/devices')
- self.assertGreaterEqual(len(res['items']), 2)
+ self.assertGreaterEqual(len(res['items']), count)
return res
- def _get_device(self, id):
+ def _get_device_rest(self, id):
res = self.get('/api/v1/devices/{}'.format(id))
# TODO test result
- def _list_device_ports(self, id):
+ def _list_device_ports_rest(self, id, count=2):
res = self.get('/api/v1/devices/{}/ports'.format(id))
- self.assertGreaterEqual(len(res['items']), 2)
+ self.assertGreaterEqual(len(res['items']), count)
- def _list_device_flows(self, id):
+ def _list_device_flows_rest(self, id, count=1):
# pump some flows into the logical device
res = self.get('/api/v1/devices/{}/flows'.format(id))
- self.assertGreaterEqual(len(res['items']), 1)
+ self.assertGreaterEqual(len(res['items']), count)
- def _list_device_flow_groups(self,id):
+ def _list_device_flow_groups_rest(self, id, count=0):
res = self.get('/api/v1/devices/{}/flow_groups'.format(id))
- self.assertGreaterEqual(len(res['items']), 0)
+ self.assertGreaterEqual(len(res['items']), count)
- def _list_device_types(self):
+ def _list_device_types_rest(self, count=2):
res = self.get('/api/v1/device_types')
- self.assertGreaterEqual(len(res['items']), 2)
+ self.assertGreaterEqual(len(res['items']), count)
return res
- def _get_device_type(self, dtype):
+ def _get_device_type_rest(self, dtype):
res = self.get('/api/v1/device_types/{}'.format(dtype))
self.assertIsNotNone(res)
# TODO test the result
@@ -358,16 +577,16 @@
# res = self.get('/api/v1/device_groups/1')
# # TODO test the result
- def _get_images(self, id):
+ def _get_images_rest(self, id):
res = self.get('/api/v1/devices/{}/images'.format(id))
self.assertIsNotNone(res)
- def _self_test(self, id):
+ def _self_test_rest(self, id):
res = self.post('/api/v1/devices/{}/self_test'.format(id),
expected_code=200)
self.assertIsNotNone(res)
- def _create_device_filter(self, device_id):
+ def _create_device_filter_rest(self, device_id):
rules = list()
rule = dict()
@@ -383,12 +602,93 @@
self.assertIsNotNone(alarm_filter)
return alarm_filter
- def _remove_device_filter(self, alarm_filter_id):
+ def _remove_device_filter_rest(self, alarm_filter_id):
path = '/api/v1/alarm_filters/{}'.format(alarm_filter_id)
self.delete(path, expected_code=200)
alarm_filter = self.get(path, expected_code=404)
self.assertIsNone(alarm_filter)
+ def _get_alarm_filter_grpc(self, stub, alarm_filter_id):
+ res = stub.GetAlarmFilter(voltha_pb2.ID(id=alarm_filter_id))
+ return res
+
+ def _get_alarm_filters_grpc(self, stub):
+ res = stub.ListAlarmFilters(Empty())
+ return res
+
+ def _get_alarm_filters_rest(self):
+ res = self.get('/api/v1/alarm_filters')
+ return res
+
+ def _install_eapol_flow_grpc(self, stub, logical_device_id):
+ """
+ Install an EAPOL flow on the given logical device. If device is not
+ given, it will be applied to logical device of the last pre-provisioned
+ OLT device.
+ """
+
+ # gather NNI and UNI port IDs
+ nni_port_no, unis = self._get_logical_ports(stub, logical_device_id)
+
+ # construct and push flow rule
+ for uni_port_no, _ in unis:
+ update = FlowTableUpdate(
+ id=logical_device_id,
+ flow_mod=mk_simple_flow_mod(
+ priority=2000,
+ match_fields=[in_port(uni_port_no), eth_type(0x888e)],
+ actions=[
+ # push_vlan(0x8100),
+ # set_field(vlan_vid(4096 + 4000)),
+ output(ofp.OFPP_CONTROLLER)
+ ]
+ )
+ )
+ res = stub.UpdateLogicalDeviceFlowTable(update)
+ self.pt('success for uni {} ({})'.format(uni_port_no, res))
+
+ def _get_logical_ports(self, stub, logical_device_id):
+ """
+ Return the NNI port number and the first usable UNI port of logical
+ device, and the vlan associated with the latter.
+ """
+ ports = stub.ListLogicalDevicePorts(
+ voltha_pb2.ID(id=logical_device_id)).items
+ nni = None
+ unis = []
+ for port in ports:
+ if port.root_port:
+ assert nni is None, "There shall be only one root port"
+ nni = port.ofp_port.port_no
+ else:
+ uni = port.ofp_port.port_no
+ uni_device = self._get_device_grpc(stub, port.device_id)
+ vlan = uni_device.vlan
+ unis.append((uni, vlan))
+
+ assert nni is not None, "No NNI port found"
+ assert unis, "Not a single UNI?"
+
+ return nni, unis
+
+ def _get_device_grpc(self, stub, device_id, depth=0):
+ res = stub.GetDevice(voltha_pb2.ID(id=device_id),
+ metadata=(('get-depth', str(depth)),))
+ return res
+
+ def _verify_olt_eapol_flow_rest(self, logical_device_id):
+ flows = self.get('/api/v1/devices/{}/flows'.format(logical_device_id))[
+ 'items']
+ self.assertEqual(len(flows), 2)
+ flow = flows[1]
+ self.assertEqual(flow['table_id'], 0)
+ self.assertEqual(flow['priority'], 2000)
+
+
+ def _get_olt_flows_grpc(self, stub, logical_device_id):
+ res = stub.ListLogicalDeviceFlows(voltha_pb2.ID(id=logical_device_id))
+ return res
+
if __name__ == '__main__':
main()
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index ce3242e..465efef 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -512,6 +512,7 @@
device_id=device_id,
channel_id=vlan_id
),
+ admin_state=AdminState.ENABLED,
vlan=vlan_id
)
except Exception as e:
diff --git a/voltha/core/dispatcher.py b/voltha/core/dispatcher.py
index 99357ac..3b7a217 100644
--- a/voltha/core/dispatcher.py
+++ b/voltha/core/dispatcher.py
@@ -173,7 +173,11 @@
request,
context)
# Then get peers results
+ current_responses = [result]
for core_id in self.peers_map:
+ if core_id == self.core_store_id:
+ continue # already processed
+
if self.peers_map[core_id] and self.grpc_conn_map[core_id]:
res = yield self._dispatch_to_peer(core_id,
method_name,
@@ -183,8 +187,9 @@
log.warning('ignoring-peer',
core_id=core_id,
error_code=res.error_code)
- else:
+ elif res not in current_responses:
result.MergeFrom(res)
+ current_responses.append(res)
returnValue(result)
def _local_dispatch(self, core_id, method_name, request, context):