[ 4222 ] Minor changes after code review
This is the initial commit for Persistence and Voltha restart.
It consists of the following:
1) Introduction of a store config id which represents the data of
a Voltha instance.
2) The Coordinator service dynamically allocates a store config id
to each voltha instance on startup. It also reallocates the same id
to another voltha instance in the event the previous voltha instance
with that store id went down.
3) All voltha data is stored in Consul as KV
4) When a Voltha instance is started and get allocated a config id that
refers to existing data (from an instance that went down), then it will
load all the data from Consul into its own memory and start a reconciliation
process.
5) During the reconciliation process, the necessary agents and
callbacks are created as per the data. A reconcile() API is also
invoked on the adapters to perform their side of the reconciliation.
6) The Reconciliation process is implemented in ponsim OLT and ONU
7) A set of integration tests focussed on persistence and voltha
restarts.
8) Fix a few bugs along the way
Change-Id: I8c2bbae3b2fc79d0afd8ce3b7b0be6bde93e492a
diff --git a/common/event_bus.py b/common/event_bus.py
index d572e62..8c903d9 100644
--- a/common/event_bus.py
+++ b/common/event_bus.py
@@ -111,7 +111,8 @@
try:
candidate.callback(topic, msg)
except Exception, e:
- log.warning('callback-failed', e=repr(e), topic=topic)
+ log.exception('callback-failed', e=repr(e), topic=topic)
+
default_bus = EventBus()
diff --git a/common/utils/consulhelpers.py b/common/utils/consulhelpers.py
index 7eeac38..df4dd58 100644
--- a/common/utils/consulhelpers.py
+++ b/common/utils/consulhelpers.py
@@ -59,7 +59,7 @@
items = services.keys()
if number_of_expected_services is not None and \
- len(items) != number_of_expected_services:
+ len(items) != number_of_expected_services:
return False
for item in items:
@@ -70,7 +70,6 @@
def get_all_services(consul_endpoint):
-
log.debug('getting-service-verify-health')
consul = connect_to_consult(consul_endpoint)
@@ -78,6 +77,23 @@
return services
+
+def get_all_instances_of_service(consul_endpoint, service_name):
+ log.debug('getting-all-instances-of-service', service=service_name)
+
+ consul = connect_to_consult(consul_endpoint)
+ _, services = consul.catalog.service(service_name)
+
+ for service in services:
+ log.debug('service',
+ name=service['ServiceName'],
+ serviceid=service['ServiceID'],
+ serviceport=service['ServicePort'],
+ createindex=service['CreateIndex'])
+
+ return services
+
+
def get_endpoint_from_consul(consul_endpoint, service_name):
"""
Get endpoint of service_name from consul.
@@ -108,7 +124,7 @@
if service['ServiceAddress'] == local_ipv4:
log.debug("picking address locally")
endpoint = '{}:{}'.format(service['ServiceAddress'],
- service['ServicePort'])
+ service['ServicePort'])
return endpoint
""" If service is not available locally, picak a random
@@ -121,5 +137,42 @@
return endpoint
+def get_healthy_instances(consul_endpoint, service_name=None,
+ number_of_expected_services=None):
+ """
+ Verify in consul if any service is healthy
+ :param consul_endpoint: a <host>:<port> string
+ :param service_name: name of service to check, optional
+ :param number_of_expected_services number of services to check for, optional
+ :return: true if healthy, false otherwise
+ """
+
+ def check_health(service):
+ _, serv_health = consul.health.service(service, passing=True)
+ return not serv_health == []
+
+ consul = connect_to_consult(consul_endpoint)
+
+ if service_name is not None:
+ return check_health(service_name)
+
+ services = get_all_services(consul_endpoint)
+
+ items = services.keys()
+
+ if number_of_expected_services is not None and \
+ len(items) != number_of_expected_services:
+ return False
+
+ for item in items:
+ if not check_health(item):
+ return False
+
+ return True
+
+
if __name__ == '__main__':
- print get_endpoint_from_consul('10.100.198.220:8500', 'kafka')
+ # print get_endpoint_from_consul('10.100.198.220:8500', 'kafka')
+ # print get_healthy_instances('10.100.198.220:8500', 'voltha-health')
+ # print get_healthy_instances('10.100.198.220:8500')
+ get_all_instances_of_service('10.100.198.220:8500', 'voltha-grpc')
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index fb6385f..7d15c16 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -50,6 +50,7 @@
command: [
"-ip=${DOCKER_HOST_IP}",
"-retry-attempts", "100",
+ "-cleanup",
# "-internal",
"consul://consul:8500"
]
@@ -123,6 +124,7 @@
"--kafka=@kafka",
"--instance-id-is-container-name",
"--interface=eth1",
+ "--backend=consul",
"-v"
]
ports:
diff --git a/ponsim/main.py b/ponsim/main.py
index 2100879..1b11407 100755
--- a/ponsim/main.py
+++ b/ponsim/main.py
@@ -125,8 +125,8 @@
dest='alarm_frequency',
action='store',
type=int,
- metavar="[30-300]",
- choices=range(30,301),
+ metavar="[5-300]",
+ choices=range(5,301),
default=60,
help=_help)
diff --git a/tests/itests/voltha/rest_base.py b/tests/itests/voltha/rest_base.py
index b304b03..3b880d6 100644
--- a/tests/itests/voltha/rest_base.py
+++ b/tests/itests/voltha/rest_base.py
@@ -26,8 +26,8 @@
return response.content
def get(self, path, expected_code=200,
- expected_content_type='application/json'):
- r = get(self.url(path))
+ expected_content_type='application/json', headers=None):
+ r = get(self.url(path), headers=headers)
self.assertEqual(r.status_code, expected_code,
msg='Code %d!=%d; msg:%s' % (
r.status_code, expected_code, r.content))
diff --git a/tests/itests/voltha/test_persistence.py b/tests/itests/voltha/test_persistence.py
new file mode 100644
index 0000000..428d51d
--- /dev/null
+++ b/tests/itests/voltha/test_persistence.py
@@ -0,0 +1,754 @@
+from google.protobuf.json_format import MessageToDict
+from time import time, sleep
+from voltha.core.flow_decomposer import *
+from voltha.protos import openflow_13_pb2 as ofp
+import simplejson, jsonschema
+import os
+import subprocess
+import select
+
+from tests.itests.docutests.test_utils import \
+ run_command_to_completion_with_raw_stdout, \
+ run_command_to_completion_with_stdout_in_list
+
+from common.utils.consulhelpers import verify_all_services_healthy
+
+from voltha.protos.device_pb2 import Device
+from tests.itests.voltha.rest_base import RestBase
+from common.utils.consulhelpers import get_endpoint_from_consul
+from voltha.protos.voltha_pb2 import AlarmFilter
+
+LOCAL_CONSUL = "localhost:8500"
+DOCKER_COMPOSE_FILE = "compose/docker-compose-system-test.yml"
+
+command_defs = dict(
+ docker_ps="docker ps",
+ docker_compose_start_all="docker-compose -f {} up -d "
+ .format(DOCKER_COMPOSE_FILE),
+ docker_stop_and_remove_all_containers="docker-compose -f {} down"
+ .format(DOCKER_COMPOSE_FILE),
+ docker_compose_start_voltha="docker-compose -f {} up -d voltha "
+ .format(DOCKER_COMPOSE_FILE),
+ docker_compose_stop_voltha="docker-compose -f {} stop voltha"
+ .format(DOCKER_COMPOSE_FILE),
+ docker_compose_remove_voltha="docker-compose -f {} rm -f voltha"
+ .format(DOCKER_COMPOSE_FILE),
+ kafka_topics="kafkacat -b {} -L",
+ kafka_alarms="kafkacat -o end -b {} -C -t voltha.alarms -c 2",
+ kafka_kpis="kafkacat -o end -b {} -C -t voltha.kpis -c 5"
+)
+
+ALARM_SCHEMA = {
+ "type": "object",
+ "properties": {
+ "id": {"type": "string"},
+ "type": {"type": "string"},
+ "category": {"type": "string"},
+ "state": {"type": "string"},
+ "severity": {"type": "string"},
+ "resource_id": {"type": "string"},
+ "raised_ts": {"type": "number"},
+ "reported_ts": {"type": "number"},
+ "changed_ts": {"type": "number"},
+ "description": {"type": "string"},
+ "context": {
+ "type": "object",
+ "additionalProperties": {"type": "string"}
+ }
+ }
+}
+
+
+class TestConsulPersistence(RestBase):
+ t0 = [time()]
+
+ def pt(self, msg=''):
+ t1 = time()
+ print '%20.8f ms - %s' % (1000 * (t1 - TestConsulPersistence.t0[0]),
+ msg)
+ TestConsulPersistence.t0[0] = t1
+
+ def test_all_scenarios(self):
+ self.basic_scenario()
+ self.data_integrity()
+
+ def basic_scenario(self):
+ # 1. Setup the test
+ # A. Stop and restart all containers (to start from clean)
+ # B. Setup the REST endpoint
+ self.pt('Test setup - starts')
+ self._stop_and_remove_all_containers()
+ sleep(5) # A small wait for the system to settle down
+ self.start_all_containers()
+ self.set_rest_endpoint()
+ self.set_kafka_endpoint()
+ self.pt('Test setup - ends')
+
+ # 2. Test 1 - Verify no data is present in voltha
+ self.pt('Test 1 - starts')
+ self.verify_instance_has_no_data()
+ self.pt('Test 1 - ends')
+
+ # 3. Test 2 - Verify voltha data is preserved after a restart
+ # A. Add data to voltha
+ # B. Stop voltha instance only (data is in consul)
+ # C. Start a new voltha instance
+ # D. Verify the data is previoulsy set data is in the new voltha
+ # instance
+ self.pt('Test 2 - starts')
+ self.add_data_to_voltha_instance()
+ instance_data_before = self.get_voltha_instance_data()
+ self.stop_remove_start_voltha()
+ instance_data_after = self.get_voltha_instance_data()
+ self.assertEqual(instance_data_before, instance_data_after)
+ self.pt('Test 2 - ends')
+
+ def data_integrity(self):
+ """
+ This test goes through several voltha restarts along with variations
+ of configurations in between to ensure data integrity is preserved.
+
+ During this test, the user will be prompted to start ponsim. Use
+ these commands to run ponsim with 1 OLT and 4 ONUs. THis will also
+ enable alarm at a frequency of 5 seconds:
+ sudo -s
+ . ./env.sh
+ ./ponsim/main.py -v -o 4 -a -f 5
+
+ The user will also be prompted to enable port forwarding on ponmgmt
+ bridge. Use these commands:
+ sudo -s
+ echo 8 > /sys/class/net/ponmgmt/bridge/group_fwd_mask
+ """
+
+ def prompt(input_func, text):
+ val = input_func(text)
+ return val
+
+ def prompt_for_return(text):
+ return raw_input(text)
+
+ # 1. Setup the test
+ # A. Stop and restart all containers (to start from clean)
+ # B. Setup the REST endpoint
+ self.pt('Test setup - starts')
+ self._stop_and_remove_all_containers()
+ sleep(5) # A small wait for the system to settle down
+ self.start_all_containers()
+ self.consume_kafka_message_starting_at = time()
+ self.set_rest_endpoint()
+ self.set_kafka_endpoint()
+ self.pt('Test setup - ends')
+
+ # Get the user to start PONSIM as root
+ prompt(prompt_for_return,
+ '\nStart PONSIM as root with alarms enabled in another window ...')
+
+ prompt(prompt_for_return,
+ '\nEnsure port forwarding is set on ponmgnt ...')
+
+ # 2. Configure some data on the volthainstance
+ self.assert_no_device_present()
+ olt = self.add_olt_device()
+ olt_id = olt['id']
+ self.pt(olt_id)
+ self.verify_device_preprovisioned_state(olt_id)
+ self.enable_device(olt_id)
+ ldev_id = self.wait_for_logical_device(olt_id)
+ onu_ids = self.wait_for_onu_discovery(olt_id)
+ self.verify_logical_ports(ldev_id, 5)
+ self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
+ self.verify_olt_eapol_flow(olt_id)
+ self.assert_kpis_event(olt_id)
+ self.assert_alarm_generation(olt_id)
+ alarm_filter = self.create_device_filter(olt_id)
+ self.consume_kafka_message_starting_at = time()
+ self.assert_alarm_generation(olt_id, event_present=False)
+
+ # 3. Kill and restart the voltha instance
+ self.assert_restart_voltha_successful()
+ self.assert_kpis_event(olt_id)
+ self.remove_device_filter(alarm_filter['id'])
+ self.assert_alarm_generation(olt_id)
+
+ self.pt('Voltha restart with initial set of data - successful')
+
+ # 4. Ensure we can keep doing operation on the new voltha instance
+ # as if nothing happened
+ olt_ids, onu_ids = self.get_olt_onu_devices()
+ self.disable_device(onu_ids[0])
+ self.verify_logical_ports(ldev_id, 4)
+
+ # 5. Kill and restart the voltha instance
+ self.assert_restart_voltha_successful()
+ self.assert_kpis_event(olt_id)
+ alarm_filter = self.create_device_filter(olt_id)
+ self.consume_kafka_message_starting_at = time()
+ self.assert_alarm_generation(olt_id, event_present=False)
+ self.remove_device_filter(alarm_filter['id'])
+ self.assert_alarm_generation(olt_id)
+
+ self.pt('Voltha restart with disabled ONU - successful')
+
+ # 6. Do some more operations
+ self.enable_device(onu_ids[0])
+ self.verify_logical_ports(ldev_id, 5)
+ self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
+ self.verify_olt_eapol_flow(olt_id)
+ self.disable_device(olt_ids[0])
+ self.assert_all_onus_state(olt_ids[0], 'DISABLED', 'UNKNOWN')
+ self.assert_no_logical_device()
+
+ # 6. Kill and restart the voltha instance
+ self.assert_restart_voltha_successful()
+ self.assert_kpis_event(olt_id, event_present=False)
+ self.assert_alarm_generation(olt_id, event_present=False)
+
+ self.pt('Voltha restart with disabled OLT - successful')
+
+ # 7. Enable OLT and very states of ONUs
+ self.enable_device(olt_ids[0])
+ self.assert_all_onus_state(olt_ids[0], 'ENABLED', 'ACTIVE')
+ self.wait_for_logical_device(olt_ids[0])
+
+ # 8. Kill and restart the voltha instance
+ self.assert_restart_voltha_successful()
+ self.assert_kpis_event(olt_id)
+ self.assert_alarm_generation(olt_id)
+
+ self.pt('Voltha restart with re-enabled OLT - successful')
+
+ # 9. Install EAPOL and disable ONU
+ self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
+ self.verify_olt_eapol_flow(olt_id)
+ self.disable_device(onu_ids[0])
+
+ # 10. Kill and restart the voltha instance
+ self.assert_restart_voltha_successful()
+ self.assert_kpis_event(olt_id)
+ self.assert_alarm_generation(olt_id)
+
+ self.pt('Voltha restart with EAPOL and disabled ONU - successful')
+
+ # 11. Delete the OLT and ONU
+ self.delete_device(onu_ids[0])
+ self.verify_logical_ports(ldev_id, 4)
+ self.disable_device(olt_ids[0])
+ self.delete_device(olt_ids[0])
+ self.assert_no_device_present()
+
+ # 13. Kill and restart the voltha instance
+ self.assert_restart_voltha_successful()
+ self.assert_kpis_event(olt_id, event_present=False)
+ self.assert_alarm_generation(olt_id, event_present=False)
+
+ self.pt('Voltha restart with no data - successful')
+
+ # 14. Verify no device present
+ self.assert_no_device_present()
+
+ def set_rest_endpoint(self):
+ self.rest_endpoint = get_endpoint_from_consul(LOCAL_CONSUL,
+ 'chameleon-rest')
+ self.base_url = 'http://' + self.rest_endpoint
+
+ def set_kafka_endpoint(self):
+ self.kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'kafka')
+
+ def assert_restart_voltha_successful(self):
+ self.maxDiff = None
+ instance_data_before = self.get_voltha_instance_data()
+ self.stop_remove_start_voltha()
+ instance_data_after = self.get_voltha_instance_data()
+ self.assertEqual(instance_data_before, instance_data_after)
+
+ def stop_remove_start_voltha(self):
+ self.stop_voltha(remove=True)
+ self.consume_kafka_message_starting_at = time()
+ self.start_voltha()
+ # REST endpoint may have changed after a voltha restart
+ # Send a basic command to trigger the REST endpoint to refresh itself
+ try:
+ self.get_devices()
+ except Exception as e:
+ self.pt('get-devices-fail expected')
+ # Wait for everything to settle
+ self.wait_till('chameleon service HEALTHY',
+ lambda: verify_all_services_healthy(LOCAL_CONSUL,
+ service_name='chameleon-rest') == True,
+ timeout=30)
+ # Chameleon takes some time to compile the protos and make them
+ # available. So let's wait 10 seconds
+ sleep(10)
+ # Update the REST endpoint info
+ self.set_rest_endpoint()
+
+ def wait_till(self, msg, predicate, interval=0.1, timeout=5.0):
+ deadline = time() + timeout
+ while time() < deadline:
+ if predicate():
+ return
+ sleep(interval)
+ self.fail('Timed out while waiting for condition: {}'.format(msg))
+
+ def _stop_and_remove_all_containers(self):
+ # check if there are any running containers first
+ cmd = command_defs['docker_ps']
+ out, err, rc = run_command_to_completion_with_stdout_in_list(cmd)
+ self.assertEqual(rc, 0)
+ if len(out) > 1: # not counting docker ps header
+ cmd = command_defs['docker_stop_and_remove_all_containers']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ def start_all_containers(self):
+ t0 = time()
+
+ # start all the containers
+ self.pt("Starting all containers ...")
+ cmd = command_defs['docker_compose_start_all']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ self.pt("Waiting for voltha and chameleon containers to be ready ...")
+ self.wait_till('voltha services HEALTHY',
+ lambda: verify_all_services_healthy(LOCAL_CONSUL,
+ service_name='voltha-grpc') == True,
+ timeout=10)
+ self.wait_till('chameleon services HEALTHY',
+ lambda: verify_all_services_healthy(LOCAL_CONSUL,
+ service_name='chameleon-rest') == True,
+ timeout=10)
+
+ # Chameleon takes some time to compile the protos and make them
+ # available. So let's wait 10 seconds
+ sleep(10)
+
+ def start_voltha(self):
+ t0 = time()
+ self.pt("Starting voltha ...")
+ cmd = command_defs['docker_compose_start_voltha']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ self.pt("Waiting for voltha to be ready ...")
+ self.wait_till('voltha service HEALTHY',
+ lambda: verify_all_services_healthy(LOCAL_CONSUL,
+ service_name='voltha-grpc') == True,
+ timeout=30)
+ self.pt("Voltha is ready ...")
+
+ def stop_voltha(self, remove=False):
+ t0 = time()
+ self.pt("Stopping voltha ...")
+ cmd = command_defs['docker_compose_stop_voltha']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ if remove:
+ cmd = command_defs['docker_compose_remove_voltha']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ def get_devices(self):
+ devices = self.get('/api/v1/devices')['items']
+ return devices
+
+ def get_logical_devices(self):
+ ldevices = self.get('/api/v1/logical_devices')['items']
+ return ldevices
+
+ def get_adapters(self):
+ adapters = self.get('/api/v1/local/adapters')['items']
+ return adapters
+
+ def verify_instance_has_no_data(self):
+ data = self.get_voltha_instance_data()
+ self.assertEqual(data['logical_devices'], [])
+ self.assertEqual(data['devices'], [])
+
+ def add_data_to_voltha_instance(self):
+ # Preprovision a bunch of ponsim devices
+ self.n_olts = 100
+ self.olt_devices = {}
+ for i in xrange(self.n_olts):
+ d = self.add_olt_device()
+ self.olt_devices[d['id']] = d
+
+ def get_voltha_instance_data(self):
+ return self.get('/api/v1/local', headers={'get-depth': '-1'})
+
+ def add_olt_device(self):
+ device = Device(
+ type='ponsim_olt',
+ host_and_port='172.17.0.1:50060'
+ )
+ device = self.post('/api/v1/devices', MessageToDict(device),
+ expected_code=200)
+ return device
+
+ def get_olt_onu_devices(self):
+ devices = self.get('/api/v1/devices')['items']
+ olt_ids = []
+ onu_ids = []
+ for d in devices:
+ if d['adapter'] == 'ponsim_olt':
+ olt_ids.append(d['id'])
+ elif d['adapter'] == 'ponsim_onu':
+ onu_ids.append(d['id'])
+ else:
+ onu_ids.append(d['id'])
+ return olt_ids, onu_ids
+
+ def verify_device_preprovisioned_state(self, olt_id):
+ # we also check that so far what we read back is same as what we get
+ # back on create
+ device = self.get('/api/v1/devices/{}'.format(olt_id))
+ self.assertNotEqual(device['id'], '')
+ self.assertEqual(device['adapter'], 'ponsim_olt')
+ self.assertEqual(device['admin_state'], 'PREPROVISIONED')
+ self.assertEqual(device['oper_status'], 'UNKNOWN')
+
+ def enable_device(self, olt_id):
+ path = '/api/v1/devices/{}'.format(olt_id)
+ self.post(path + '/enable', expected_code=200)
+ device = self.get(path)
+ self.assertEqual(device['admin_state'], 'ENABLED')
+
+ self.wait_till(
+ 'admin state moves to ACTIVATING or ACTIVE',
+ lambda: self.get(path)['oper_status'] in ('ACTIVATING', 'ACTIVE'),
+ timeout=0.5)
+
+ # eventually, it shall move to active state and by then we shall have
+ # device details filled, connect_state set, and device ports created
+ self.wait_till(
+ 'admin state ACTIVE',
+ lambda: self.get(path)['oper_status'] == 'ACTIVE',
+ timeout=0.5)
+ device = self.get(path)
+ self.assertEqual(device['connect_status'], 'REACHABLE')
+
+ ports = self.get(path + '/ports')['items']
+ self.assertEqual(len(ports), 2)
+
+ def wait_for_logical_device(self, olt_id):
+ # we shall find the logical device id from the parent_id of the olt
+ # (root) device
+ device = self.get(
+ '/api/v1/devices/{}'.format(olt_id))
+ self.assertNotEqual(device['parent_id'], '')
+ logical_device = self.get(
+ '/api/v1/logical_devices/{}'.format(device['parent_id']))
+
+ # the logical device shall be linked back to the hard device,
+ # its ports too
+ self.assertEqual(logical_device['root_device_id'], device['id'])
+
+ logical_ports = self.get(
+ '/api/v1/logical_devices/{}/ports'.format(
+ logical_device['id'])
+ )['items']
+ self.assertGreaterEqual(len(logical_ports), 1)
+ logical_port = logical_ports[0]
+ self.assertEqual(logical_port['id'], 'nni')
+ self.assertEqual(logical_port['ofp_port']['name'], 'nni')
+ self.assertEqual(logical_port['ofp_port']['port_no'], 0)
+ self.assertEqual(logical_port['device_id'], device['id'])
+ self.assertEqual(logical_port['device_port_no'], 2)
+ return logical_device['id']
+
+ def find_onus(self, olt_id):
+ devices = self.get('/api/v1/devices')['items']
+ return [
+ d for d in devices
+ if d['parent_id'] == olt_id
+ ]
+
+ def wait_for_onu_discovery(self, olt_id):
+ # shortly after we shall see the discovery of four new onus, linked to
+ # the olt device
+ self.wait_till(
+ 'find four ONUs linked to the olt device',
+ lambda: len(self.find_onus(olt_id)) >= 4,
+ 2
+ )
+ # verify that they are properly set
+ onus = self.find_onus(olt_id)
+ for onu in onus:
+ self.assertEqual(onu['admin_state'], 'ENABLED')
+ self.assertEqual(onu['oper_status'], 'ACTIVE')
+
+ return [onu['id'] for onu in onus]
+
+ def assert_all_onus_state(self, olt_id, admin_state, oper_state):
+ # verify all onus are in a given state
+ onus = self.find_onus(olt_id)
+ for onu in onus:
+ self.assertEqual(onu['admin_state'], admin_state)
+ self.assertEqual(onu['oper_status'], oper_state)
+
+ return [onu['id'] for onu in onus]
+
+ def assert_onu_state(self, onu_id, admin_state, oper_state):
+ # Verify the onu states are correctly set
+ onu = self.get('/api/v1/devices/{}'.format(onu_id))
+ self.assertEqual(onu['admin_state'], admin_state)
+ self.assertEqual(onu['oper_status'], oper_state)
+
+ def verify_logical_ports(self, ldev_id, num_ports):
+
+ # at this point we shall see num_ports logical ports on the
+ # logical device
+ logical_ports = self.get(
+ '/api/v1/logical_devices/{}/ports'.format(ldev_id)
+ )['items']
+ self.assertGreaterEqual(len(logical_ports), num_ports)
+
+ # verify that all logical ports are LIVE (state=4)
+ for lport in logical_ports:
+ self.assertEqual(lport['ofp_port']['state'], 4)
+
+ def simulate_eapol_flow_install(self, ldev_id, olt_id, onu_ids):
+
+ # emulate the flow mod requests that shall arrive from the SDN
+ # controller, one for each ONU
+ lports = self.get(
+ '/api/v1/logical_devices/{}/ports'.format(ldev_id)
+ )['items']
+
+ # device_id -> logical port map, which we will use to construct
+ # our flows
+ lport_map = dict((lp['device_id'], lp) for lp in lports)
+ for onu_id in onu_ids:
+ # if eth_type == 0x888e => send to controller
+ _in_port = lport_map[onu_id]['ofp_port']['port_no']
+ req = ofp.FlowTableUpdate(
+ id='ponsim1',
+ flow_mod=mk_simple_flow_mod(
+ match_fields=[
+ in_port(_in_port),
+ vlan_vid(ofp.OFPVID_PRESENT | 0),
+ eth_type(0x888e)],
+ actions=[
+ output(ofp.OFPP_CONTROLLER)
+ ],
+ priority=1000
+ )
+ )
+ res = self.post('/api/v1/logical_devices/{}/flows'.format(ldev_id),
+ MessageToDict(req,
+ preserving_proto_field_name=True),
+ expected_code=200)
+
+ # for sanity, verify that flows are in flow table of logical device
+ flows = self.get(
+ '/api/v1/logical_devices/{}/flows'.format(ldev_id))['items']
+ self.assertGreaterEqual(len(flows), 4)
+
+ def verify_olt_eapol_flow(self, olt_id):
+ # olt shall have two flow rules, one is the default and the
+ # second is the result of eapol forwarding with rule:
+ # if eth_type == 0x888e => push vlan(1000); out_port=nni_port
+ flows = self.get('/api/v1/devices/{}/flows'.format(olt_id))['items']
+ self.assertEqual(len(flows), 2)
+ flow = flows[1]
+ self.assertEqual(flow['table_id'], 0)
+ self.assertEqual(flow['priority'], 1000)
+
+ # TODO refine this
+ # self.assertEqual(flow['match'], {})
+ # self.assertEqual(flow['instructions'], [])
+
+ def disable_device(self, id):
+ path = '/api/v1/devices/{}'.format(id)
+ self.post(path + '/disable', expected_code=200)
+ device = self.get(path)
+ self.assertEqual(device['admin_state'], 'DISABLED')
+
+ self.wait_till(
+ 'operational state moves to UNKNOWN',
+ lambda: self.get(path)['oper_status'] == 'UNKNOWN',
+ timeout=0.5)
+
+ # eventually, the connect_state should be UNREACHABLE
+ self.wait_till(
+ 'connest status UNREACHABLE',
+ lambda: self.get(path)['connect_status'] == 'UNREACHABLE',
+ timeout=0.5)
+
+ # Device's ports should be INACTIVE
+ ports = self.get(path + '/ports')['items']
+ self.assertEqual(len(ports), 2)
+ for p in ports:
+ self.assertEqual(p['admin_state'], 'DISABLED')
+ self.assertEqual(p['oper_status'], 'UNKNOWN')
+
+ def delete_device(self, id):
+ path = '/api/v1/devices/{}'.format(id)
+ self.delete(path + '/delete', expected_code=200)
+ device = self.get(path, expected_code=404)
+ self.assertIsNone(device)
+
+ def assert_no_device_present(self):
+ path = '/api/v1/devices'
+ devices = self.get(path)['items']
+ self.assertEqual(devices, [])
+
+ def assert_no_logical_device(self):
+ path = '/api/v1/logical_devices'
+ ld = self.get(path)['items']
+ self.assertEqual(ld, [])
+
+ def delete_device_incorrect_state(self, id):
+ path = '/api/v1/devices/{}'.format(id)
+ self.delete(path + '/delete', expected_code=400)
+
+ def enable_unknown_device(self, id):
+ path = '/api/v1/devices/{}'.format(id)
+ self.post(path + '/enable', expected_code=404)
+
+ def disable_unknown_device(self, id):
+ path = '/api/v1/devices/{}'.format(id)
+ self.post(path + '/disable', expected_code=404)
+
+ def delete_unknown_device(self, id):
+ path = '/api/v1/devices/{}'.format(id)
+ self.delete(path + '/delete', expected_code=404)
+
+ def assert_alarm_generation(self, device_id, event_present=True):
+ # The olt device should start generating alarms periodically
+ alarm = self.assert_alarm_event(device_id, event_present)
+
+ if event_present:
+ self.assertIsNotNone(alarm)
+ # Make sure that the schema is valid
+ self.assert_alarm_event_schema(alarm)
+
+ # Validate a sample alarm for a specific device
+ def assert_alarm_event(self, device_id, event_present=True):
+ self.alarm_data = None
+
+ def validate_output(data):
+ alarm = simplejson.loads(data)
+
+ if not alarm or \
+ 'resource_id' not in alarm or \
+ 'reported_ts' not in alarm:
+ return False
+
+ # Check for new alarms only
+ if alarm['reported_ts'] > self.consume_kafka_message_starting_at:
+ if alarm['resource_id'] == device_id:
+ self.alarm_data = alarm
+ return True
+
+ cmd = command_defs['kafka_alarms'].format(self.kafka_endpoint)
+
+ self.run_command_and_wait_until(cmd, validate_output, 30, 'alarms',
+ expected_predicate_result=event_present)
+
+ return self.alarm_data
+
+ # Validate a sample kpi for a specific device
+ def assert_kpis_event(self, device_id, event_present=True):
+
+ def validate_output(data):
+ kpis_data = simplejson.loads(data)
+
+ if not kpis_data or \
+ 'ts' not in kpis_data or \
+ 'prefixes' not in kpis_data:
+ return False
+
+ # Check only new kpis
+ if kpis_data['ts'] > self.consume_kafka_message_starting_at:
+ for key, value in kpis_data['prefixes'].items():
+ if device_id in key:
+ return True
+ return False
+
+ cmd = command_defs['kafka_kpis'].format(self.kafka_endpoint)
+
+ self.run_command_and_wait_until(cmd, validate_output, 60, 'kpis',
+ expected_predicate_result=event_present)
+
+ # Verify that the alarm follows the proper schema structure
+ def assert_alarm_event_schema(self, alarm):
+ try:
+ jsonschema.validate(alarm, ALARM_SCHEMA)
+ except Exception as e:
+ self.assertTrue(
+ False, 'Validation failed for alarm : {}'.format(e.message))
+
+ def create_device_filter(self, device_id):
+ rules = list()
+ rule = dict()
+
+ # Create a filter with a single rule
+ rule['key'] = 'device_id'
+ rule['value'] = device_id
+ rules.append(rule)
+
+ alarm_filter = AlarmFilter(rules=rules)
+ alarm_filter = self.post('/api/v1/local/alarm_filters',
+ MessageToDict(alarm_filter),
+ expected_code=200)
+ self.assertIsNotNone(alarm_filter)
+ return alarm_filter
+
+ def remove_device_filter(self, alarm_filter_id):
+ path = '/api/v1/local/alarm_filters/{}'.format(alarm_filter_id)
+ self.delete(path, expected_code=200)
+ alarm_filter = self.get(path, expected_code=404)
+ self.assertIsNone(alarm_filter)
+
+ def run_command_and_wait_until(self, cmd, predicate, timeout, msg,
+ expected_predicate_result=True):
+ # Run until the predicate is True or timeout
+ try:
+ deadline = time() + timeout
+ env = os.environ.copy()
+ proc = subprocess.Popen(
+ cmd,
+ env=env,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ bufsize=1
+ )
+ poll_obj = select.poll()
+ poll_obj.register(proc.stdout, select.POLLIN)
+ while time() < deadline:
+ poll_result = poll_obj.poll(0)
+ if poll_result:
+ line = proc.stdout.readline()
+ if predicate(line):
+ try:
+ proc.terminate()
+ proc.wait()
+ subprocess.Popen(['reset']).wait()
+ except Exception as e:
+ print "Received exception {} when killing process " \
+ "started with {}".format(repr(e), cmd)
+ if not expected_predicate_result:
+ self.fail(
+ 'Predicate is True but is should be false:{}'.
+ format(msg))
+ else:
+ return
+ try:
+ proc.terminate()
+ proc.wait()
+ subprocess.Popen(['reset']).wait()
+ except Exception as e:
+ print "Received exception {} when killing process " \
+ "started with {}".format(repr(e), cmd)
+
+ if expected_predicate_result:
+ self.fail(
+ 'Timed out while waiting for condition: {}'.format(msg))
+
+ except Exception as e:
+ print 'Exception {} when running command:{}'.format(repr(e), cmd)
+ return
diff --git a/voltha/adapters/broadcom_onu/broadcom_onu.py b/voltha/adapters/broadcom_onu/broadcom_onu.py
index a424831..8adb0e8 100644
--- a/voltha/adapters/broadcom_onu/broadcom_onu.py
+++ b/voltha/adapters/broadcom_onu/broadcom_onu.py
@@ -97,6 +97,9 @@
reactor.callLater(0, self.devices_handlers[device.proxy_address.channel_id].activate, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/voltha/adapters/dpoe_onu/dpoe_onu.py b/voltha/adapters/dpoe_onu/dpoe_onu.py
index 5aa8838..4bf13f2 100644
--- a/voltha/adapters/dpoe_onu/dpoe_onu.py
+++ b/voltha/adapters/dpoe_onu/dpoe_onu.py
@@ -115,6 +115,9 @@
reactor.callLater(0.1, self._onu_device_activation, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
@inlineCallbacks
def _onu_device_activation(self, device):
# first we verify that we got parent reference and proxy info
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index e44537b..1b6881f 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -81,6 +81,19 @@
:return: (Deferred) Shall be fired to acknowledge device ownership.
"""
+ def reconcile_device(device):
+ """
+ Make sure the adapter looks after given device. Called when this
+ device has changed ownership from another Voltha instance to
+ this one (typically, this occurs when the previous voltha
+ instance went down).
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ :return: (Deferred) Shall be fired to acknowledge device ownership.
+ """
+
+
def abandon_device(device):
"""
Make sur ethe adapter no longer looks after device. This is called
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index f77db19..85430b0 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -430,6 +430,9 @@
reactor.callLater(0, self.devices_handlers[device.id].activate, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/voltha/adapters/microsemi_olt/microsemi_olt.py b/voltha/adapters/microsemi_olt/microsemi_olt.py
index 42dc56f..03783fb 100644
--- a/voltha/adapters/microsemi_olt/microsemi_olt.py
+++ b/voltha/adapters/microsemi_olt/microsemi_olt.py
@@ -105,6 +105,9 @@
log.info('adopted-device', device=device)
self.olts[target] = (olt, activation, comm)
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
self._abandon(device.mac_address)
diff --git a/voltha/adapters/pmcs_onu/pmcs_onu.py b/voltha/adapters/pmcs_onu/pmcs_onu.py
index a58353f..da5a5be 100644
--- a/voltha/adapters/pmcs_onu/pmcs_onu.py
+++ b/voltha/adapters/pmcs_onu/pmcs_onu.py
@@ -99,6 +99,9 @@
reactor.callLater(0.1, self._onu_device_activation, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index 138e248..be57fac 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -161,17 +161,22 @@
alarm_event = self.adapter.adapter_agent.create_alarm(
resource_id=self.device.id,
- description="{}.{} - {}".format(self.adapter.name, self.device.id,
- alarm_data['description']) if 'description' in alarm_data else None,
+ description="{}.{} - {}".format(self.adapter.name,
+ self.device.id,
+ alarm_data[
+ 'description']) if 'description' in alarm_data else None,
type=alarm_data['type'] if 'type' in alarm_data else None,
- category=alarm_data['category'] if 'category' in alarm_data else None,
- severity=alarm_data['severity'] if 'severity' in alarm_data else None,
+ category=alarm_data[
+ 'category'] if 'category' in alarm_data else None,
+ severity=alarm_data[
+ 'severity'] if 'severity' in alarm_data else None,
state=alarm_data['state'] if 'state' in alarm_data else None,
raised_ts=alarm_data['ts'] if 'ts' in alarm_data else 0,
context=current_context
)
- self.adapter.adapter_agent.submit_alarm(self.device.id, alarm_event)
+ self.adapter.adapter_agent.submit_alarm(self.device.id,
+ alarm_event)
except Exception as e:
log.exception('failed-to-send-alarm', e=e)
@@ -237,6 +242,23 @@
reactor.callLater(0, self.devices_handlers[device.id].activate, device)
return device
+ def reconcile_device(self, device):
+ try:
+ self.devices_handlers[device.id] = PonSimOltHandler(self,
+ device.id)
+ # Work only required for devices that are in ENABLED state
+ if device.admin_state == AdminState.ENABLED:
+ reactor.callLater(0,
+ self.devices_handlers[device.id].reconcile,
+ device)
+ else:
+ # Invoke the children reconciliation which would setup the
+ # basic children data structures
+ self.adapter_agent.reconcile_child_devices(device.id)
+ return device
+ except Exception, e:
+ log.exception('Exception', e=e)
+
def abandon_device(self, device):
raise NotImplementedError()
@@ -257,6 +279,7 @@
def delete_device(self, device):
log.info('delete-device', device_id=device.id)
+ # TODO: Update the logical device mapping
reactor.callLater(0, self.devices_handlers[device.id].delete)
return device
@@ -329,6 +352,12 @@
self.channel = grpc.insecure_channel(device.host_and_port)
return self.channel
+ def _get_nni_port(self):
+ ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
+ if ports:
+ # For now, we use on one NNI port
+ return ports[0]
+
def activate(self, device):
self.log.info('activating')
@@ -448,8 +477,66 @@
# Start collecting stats from the device after a brief pause
self.start_kpi_collection(device.id)
+ def reconcile(self, device):
+ self.log.info('reconciling-OLT-device-starts')
+
+ if not device.host_and_port:
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'No host_and_port field provided'
+ self.adapter_agent.update_device(device)
+ return
+
+ try:
+ stub = ponsim_pb2.PonSimStub(self.get_channel())
+ info = stub.GetDeviceInfo(Empty())
+ log.info('got-info', info=info)
+ # TODO: Verify we are connected to the same device we are
+ # reconciling - not much data in ponsim to differentiate at the
+ # time
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+ self.ofp_port_no = info.nni_port
+ self.nni_port = self._get_nni_port()
+ except Exception, e:
+ log.exception('device-unreachable', e=e)
+ device.connect_status = ConnectStatus.UNREACHABLE
+ device.oper_status = OperStatus.UNKNOWN
+ self.adapter_agent.update_device(device)
+ return
+
+ # Now set the initial PM configuration for this device
+ self.pm_metrics = AdapterPmMetrics(device)
+ pm_config = self.pm_metrics.make_proto()
+ log.info("initial-pm-config", pm_config=pm_config)
+ self.adapter_agent.update_device_pm_config(pm_config, init=True)
+
+ # Setup alarm handler
+ self.alarms = AdapterAlarms(self.adapter, device)
+
+ # TODO: Is there anything required to verify nni and PON ports
+
+ # Set the logical device id
+ device = self.adapter_agent.get_device(device.id)
+ if device.parent_id:
+ self.logical_device_id = device.parent_id
+ self.adapter_agent.reconcile_logical_device(device.parent_id)
+ else:
+ self.log.info('no-logical-device-set')
+
+ # Reconcile child devices
+ self.adapter_agent.reconcile_child_devices(device.id)
+
+ # finally, open the frameio port to receive in-band packet_in messages
+ self.io_port = registry('frameio').open_port(
+ self.interface, self.rcv_io, is_inband_frame)
+
+ # Start collecting stats from the device after a brief pause
+ self.start_kpi_collection(device.id)
+
+ self.log.info('reconciling-OLT-device-ends')
+
def rcv_io(self, port, frame):
- self.log.info('reveived', iface_name=port.iface_name,
+ self.log.info('received', iface_name=port.iface_name,
frame_len=len(frame))
pkt = Ether(frame)
if pkt.haslayer(Dot1Q):
@@ -572,6 +659,12 @@
# close the frameio port
registry('frameio').close_port(self.io_port)
+ # Update the logice device mapping
+ if self.logical_device_id in \
+ self.adapter.logical_device_id_to_root_device_id:
+ del self.adapter.logical_device_id_to_root_device_id[
+ self.logical_device_id]
+
# TODO:
# 1) Remove all flows from the device
# 2) Remove the device from ponsim
@@ -584,6 +677,15 @@
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
+ # Set the ofp_port_no and nni_port in case we bypassed the reconcile
+ # process if the device was in DISABLED state on voltha restart
+ if not self.ofp_port_no and not self.nni_port:
+ stub = ponsim_pb2.PonSimStub(self.get_channel())
+ info = stub.GetDeviceInfo(Empty())
+ log.info('got-info', info=info)
+ self.ofp_port_no = info.nni_port
+ self.nni_port = self._get_nni_port()
+
# Update the connect status to REACHABLE
device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
@@ -645,10 +747,8 @@
admin_state=AdminState.ENABLED)
# finally, open the frameio port to receive in-band packet_in messages
- self.log.info('registering-frameio')
self.io_port = registry('frameio').open_port(
self.interface, self.rcv_io, is_inband_frame)
- self.log.info('registered-frameio')
self.log.info('re-enabled', device_id=device.id)
diff --git a/voltha/adapters/ponsim_onu/ponsim_onu.py b/voltha/adapters/ponsim_onu/ponsim_onu.py
index cbaf3ab..05df340 100644
--- a/voltha/adapters/ponsim_onu/ponsim_onu.py
+++ b/voltha/adapters/ponsim_onu/ponsim_onu.py
@@ -94,6 +94,15 @@
reactor.callLater(0, self.devices_handlers[device.id].activate, device)
return device
+ def reconcile_device(self, device):
+ self.devices_handlers[device.id] = PonSimOnuHandler(self, device.id)
+ # Reconcile only if state was ENABLED
+ if device.admin_state == AdminState.ENABLED:
+ reactor.callLater(0,
+ self.devices_handlers[device.id].reconcile,
+ device)
+ return device
+
def abandon_device(self, device):
raise NotImplementedError()
@@ -136,8 +145,13 @@
def receive_proxied_message(self, proxy_address, msg):
log.info('receive-proxied-message', proxy_address=proxy_address,
device_id=proxy_address.device_id, msg=msg)
- handler = self.devices_handlers[proxy_address.device_id]
- handler.receive_message(msg)
+ # Device_id from the proxy_address is the olt device id. We need to
+ # get the onu device id using the port number in the proxy_address
+ device = self.adapter_agent. \
+ get_child_device_with_proxy_address(proxy_address)
+ if device:
+ handler = self.devices_handlers[device.id]
+ handler.receive_message(msg)
def receive_packet_out(self, logical_device_id, egress_port_no, msg):
log.info('packet-out', logical_device_id=logical_device_id,
@@ -240,6 +254,44 @@
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
+ def _get_uni_port(self):
+ ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
+ if ports:
+ # For now, we use on one uni port
+ return ports[0]
+
+ def _get_pon_port(self):
+ ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
+ if ports:
+ # For now, we use on one uni port
+ return ports[0]
+
+ def reconcile(self, device):
+ self.log.info('reconciling-ONU-device-starts')
+
+ # first we verify that we got parent reference and proxy info
+ assert device.parent_id
+ assert device.proxy_address.device_id
+ assert device.proxy_address.channel_id
+
+ # register for proxied messages right away
+ self.proxy_address = device.proxy_address
+ self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+
+ # Set the connection status to REACHABLE
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ # TODO: Verify that the uni, pon and logical ports exists
+
+ # Mark the device as REACHABLE and ACTIVE
+ device = self.adapter_agent.get_device(device.id)
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+
+ self.log.info('reconciling-ONU-device-ends')
+
@inlineCallbacks
def update_flow_table(self, flows):
@@ -314,6 +366,7 @@
portid=port_id)
# Remove pon port from parent
+ self.pon_port = self._get_pon_port()
self.adapter_agent.delete_port_reference_from_parent(self.device_id,
self.pon_port)
@@ -333,60 +386,66 @@
def reenable(self):
self.log.info('re-enabling', device_id=self.device_id)
+ try:
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
- # Get the latest device reference
- device = self.adapter_agent.get_device(self.device_id)
+ # First we verify that we got parent reference and proxy info
+ assert device.parent_id
+ assert device.proxy_address.device_id
+ assert device.proxy_address.channel_id
- # First we verify that we got parent reference and proxy info
- assert self.uni_port
- assert device.parent_id
- assert device.proxy_address.device_id
- assert device.proxy_address.channel_id
+ # Re-register for proxied messages right away
+ self.proxy_address = device.proxy_address
+ self.adapter_agent.register_for_proxied_messages(
+ device.proxy_address)
- # Re-register for proxied messages right away
- self.proxy_address = device.proxy_address
- self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+ # Re-enable the ports on that device
+ self.adapter_agent.enable_all_ports(self.device_id)
- # Re-enable the ports on that device
- self.adapter_agent.enable_all_ports(self.device_id)
+ # Refresh the port reference
+ self.uni_port = self._get_uni_port()
+ self.pon_port = self._get_pon_port()
- # Add the pon port reference to the parent
- self.adapter_agent.add_port_reference_to_parent(device.id,
- self.pon_port)
+ # Add the pon port reference to the parent
+ self.adapter_agent.add_port_reference_to_parent(device.id,
+ self.pon_port)
- # Update the connect status to REACHABLE
- device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.update_device(device)
+ # Update the connect status to REACHABLE
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
- # re-add uni port to logical device
- parent_device = self.adapter_agent.get_device(device.parent_id)
- logical_device_id = parent_device.parent_id
- assert logical_device_id
- port_no = device.proxy_address.channel_id
- cap = OFPPF_1GB_FD | OFPPF_FIBER
- self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
- id='uni-{}'.format(port_no),
- ofp_port=ofp_port(
- port_no=port_no,
- hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
- name='uni-{}'.format(port_no),
- config=0,
- state=OFPPS_LIVE,
- curr=cap,
- advertised=cap,
- peer=cap,
- curr_speed=OFPPF_1GB_FD,
- max_speed=OFPPF_1GB_FD
- ),
- device_id=device.id,
- device_port_no=self.uni_port.port_no
- ))
+ # re-add uni port to logical device
+ parent_device = self.adapter_agent.get_device(device.parent_id)
+ logical_device_id = parent_device.parent_id
+ assert logical_device_id
+ port_no = device.proxy_address.channel_id
+ cap = OFPPF_1GB_FD | OFPPF_FIBER
+ self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
+ id='uni-{}'.format(port_no),
+ ofp_port=ofp_port(
+ port_no=port_no,
+ hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
+ name='uni-{}'.format(port_no),
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=OFPPF_1GB_FD,
+ max_speed=OFPPF_1GB_FD
+ ),
+ device_id=device.id,
+ device_port_no=self.uni_port.port_no
+ ))
- device = self.adapter_agent.get_device(device.id)
- device.oper_status = OperStatus.ACTIVE
- self.adapter_agent.update_device(device)
+ device = self.adapter_agent.get_device(device.id)
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
- self.log.info('re-enabled', device_id=device.id)
+ self.log.info('re-enabled', device_id=device.id)
+ except Exception, e:
+ self.log.exception('error-reenabling', e=e)
def delete(self):
self.log.info('deleting', device_id=self.device_id)
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index f2e056b..f973381 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -215,6 +215,9 @@
reactor.callLater(0.2, self._simulate_device_activation, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/voltha/adapters/simulated_onu/simulated_onu.py b/voltha/adapters/simulated_onu/simulated_onu.py
index 83dfd85..b53fd11 100644
--- a/voltha/adapters/simulated_onu/simulated_onu.py
+++ b/voltha/adapters/simulated_onu/simulated_onu.py
@@ -90,6 +90,9 @@
reactor.callLater(0.2, self._simulate_device_activation, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index 5936107..3d9e264 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -192,6 +192,9 @@
self._activate_io_port()
reactor.callLater(0, self._launch_device_activation, device)
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def _activate_io_port(self):
if self.io_port is None:
self.io_port = registry('frameio').open_port(
diff --git a/voltha/adapters/tibit_onu/tibit_onu.py b/voltha/adapters/tibit_onu/tibit_onu.py
index 247a868..8b5f754 100644
--- a/voltha/adapters/tibit_onu/tibit_onu.py
+++ b/voltha/adapters/tibit_onu/tibit_onu.py
@@ -145,6 +145,9 @@
reactor.callLater(0.1, self._onu_device_activation, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
@inlineCallbacks
def _onu_device_activation(self, device):
# first we verify that we got parent reference and proxy info
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index a2b94a6..65786ff 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -81,6 +81,9 @@
self.config['assignment_key'], 'assignments'), ''))
self.workload_prefix = '/'.join((self.prefix, self.config.get(
self.config['workload_key'], 'work'), ''))
+ self.core_store_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['core_store_key'], 'data/core')))
+ self.core_storage_suffix='core_store'
self.retries = 0
self.instance_id = instance_id
@@ -138,6 +141,13 @@
self.wait_for_leader_deferreds.append(d)
return d
+
+ # Wait for a core data id to be assigned to this voltha instance
+ @inlineCallbacks
+ def get_core_store_id_and_prefix(self):
+ core_store_id = yield self.worker.get_core_store_id()
+ returnValue((core_store_id, self.core_store_prefix))
+
# Proxy methods for consul with retry support
def kv_get(self, *args, **kw):
@@ -196,7 +206,7 @@
# create consul session
self.session_id = yield self.consul.session.create(
- behavior='delete', ttl=10, lock_delay=1)
+ behavior='release', ttl=60, lock_delay=1)
log.info('created-consul-session', session_id=self.session_id)
# start renewing session it 3 times within the ttl
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 7daa0a7..614d535 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -152,6 +152,9 @@
def adopt_device(self, device):
return self.adapter.adopt_device(device)
+ def reconcile_device(self, device):
+ return self.adapter.reconcile_device(device)
+
def abandon_device(self, device):
return self.adapter.abandon_device(device)
@@ -295,6 +298,12 @@
self._make_up_to_date('/devices/{}/ports'.format(device_id),
port.port_no, port)
+ def get_ports(self, device_id, port_type):
+ # assert Port.PortType.DESCRIPTOR.values_by_name[port_type]
+ ports = self.root_proxy.get('/devices/{}/ports'.format(device_id))
+ return [p for p in ports if p.type == port_type]
+
+
def disable_all_ports(self, device_id):
"""
Disable all ports on that device, i.e. change the admin status to
@@ -400,6 +409,21 @@
return logical_device
+ def reconcile_logical_device(self, logical_device_id):
+ """
+ This is called by the adapter to reconcile the physical device with
+ the logical device. For now, we only set the packet-out subscription
+ :param logical_device_id:
+ :return:
+ """
+ # Keep a reference to the packet out subscription as it will be
+ # referred during removal
+ self.packet_out_subscription = self.event_bus.subscribe(
+ topic='packet-out:{}'.format(logical_device_id),
+ callback=lambda _, p: self.receive_packet_out(logical_device_id, p)
+ )
+
+
def delete_logical_device(self, logical_device):
"""
This will remove the logical device as well as all logical ports
@@ -449,6 +473,30 @@
'/logical_devices/{}/ports'.format(logical_device_id),
port.id, port)
+ def get_child_devices(self, parent_device_id):
+ try:
+ devices = self.root_proxy.get('/devices')
+ children = [d for d in devices if d.parent_id == parent_device_id]
+ return children
+ except Exception, e:
+ self.log.exception('failure', e=e)
+
+ def subscribe_to_proxy_child_messages(self, proxy_address):
+ topic = self._gen_tx_proxy_address_topic(proxy_address)
+ self._tx_event_subscriptions[topic] = self.event_bus.subscribe(
+ topic, lambda t, m: self._send_proxied_message(proxy_address, m))
+
+ def reconcile_child_devices(self, parent_device_id):
+ children = self.get_child_devices(parent_device_id)
+ for child in children:
+ # First subscribe to proxy messages from a chile device
+ self.subscribe_to_proxy_child_messages(child.proxy_address)
+
+ # Then trigger the reconciliation of the existing child device
+ device_agent = self.core.get_device_agent(child.id)
+ device_agent.reconcile_existing_device(child)
+
+
def child_device_detected(self,
parent_device_id,
parent_port_no,
@@ -473,6 +521,16 @@
self._tx_event_subscriptions[topic] = self.event_bus.subscribe(
topic, lambda t, m: self._send_proxied_message(proxy_address, m))
+ def get_child_device_with_proxy_address(self, proxy_address):
+ # Proxy address is defined as {parent id, channel_id}
+ devices = self.root_proxy.get('/devices')
+ children_ids = set(d.id for d in devices if d.parent_id ==
+ proxy_address.device_id)
+ for child_id in children_ids:
+ device = self.get_device(child_id)
+ if device.proxy_address == proxy_address:
+ return device
+
def remove_all_logical_ports(self, logical_device_id):
""" Remove all logical ports from a given logical device"""
ports = self.root_proxy.get('/logical_devices/{}/ports')
diff --git a/voltha/core/config/config_backend.py b/voltha/core/config/config_backend.py
index 21ef5ec..0b1a38f 100644
--- a/voltha/core/config/config_backend.py
+++ b/voltha/core/config/config_backend.py
@@ -14,6 +14,10 @@
#
from consul import Consul
+import structlog
+
+log = structlog.get_logger()
+
class ConsulStore(object):
""" Config kv store for consul with a cache for quicker subsequent reads
@@ -56,23 +60,28 @@
return False
def __setitem__(self, key, value):
- assert isinstance(value, basestring)
- self._cache[key] = value
- self._consul.kv.put(self.make_path(key), value)
+ try:
+ assert isinstance(value, basestring)
+ self._cache[key] = value
+ self._consul.kv.put(self.make_path(key), value)
+ except Exception, e:
+ log.exception('cannot-set-item', e=e)
def __delitem__(self, key):
self._cache.pop(key, None)
self._consul.kv.delete(self.make_path(key))
-def load_backend(args):
+def load_backend(store_id, store_prefix, args):
""" Return the kv store backend based on the command line arguments
"""
# TODO: Make this more dynamic
def load_consul_store():
+ instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
+
host, port = args.consul.split(':', 1)
- return ConsulStore(host, int(port), 'service/voltha/config_data')
+ return ConsulStore(host, int(port), instance_core_store_prefix)
loaders = {
'none': lambda: None,
diff --git a/voltha/core/config/config_node.py b/voltha/core/config/config_node.py
index f1c8fc0..ab73484 100644
--- a/voltha/core/config/config_node.py
+++ b/voltha/core/config/config_node.py
@@ -29,6 +29,9 @@
from voltha.protos import third_party
from voltha.protos import meta_pb2
+import structlog
+
+log = structlog.get_logger()
def message_to_dict(m):
return MessageToDict(m, True, True, False)
@@ -240,10 +243,21 @@
children = copy(rev._children[name])
idx, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
+ # chek if deep copy will work better
new_child_rev = child_node.update(
path, data, strict, txid, mk_branch)
if new_child_rev.hash == child_rev.hash:
- # no change, we can return
+ # When the new_child_rev goes out of scope,
+ # it's destructor gets invoked as it is not being
+ # referred by any other data structures. To prevent
+ # this to trigger the hash it is holding from being
+ # erased in the db, its hash is set to None. If the
+ # new_child_rev object is pointing at the same address
+ # as the child_rev address then do not clear the hash
+ if new_child_rev != child_rev:
+ log.debug('clear-hash',
+ hash=new_child_rev.hash, object_ref=new_child_rev)
+ new_child_rev.clear_hash()
return branch._latest
if getattr(new_child_rev.data, field.key) != key:
raise ValueError('Cannot change key field')
@@ -284,10 +298,15 @@
return branch._latest
def _make_latest(self, branch, rev, change_announcements=()):
- branch._latest = rev
+ # Update the latest branch only when the hash between the previous
+ # data and the new rev is different, otherwise this will trigger the
+ # data already saved in the db (with that hash) to be erased
if rev.hash not in branch._revs:
branch._revs[rev.hash] = rev
+ if not branch._latest or rev.hash != branch._latest.hash:
+ branch._latest = rev
+
# announce only if this is main branch
if change_announcements and branch._txid is None:
@@ -591,7 +610,7 @@
root = self._root
kv_store = root._kv_store
- branch = ConfigBranch(self, self._auto_prune)
+ branch = ConfigBranch(node=self, auto_prune=self._auto_prune)
rev = PersistedConfigRevision.load(
branch, kv_store, self._type, latest_hash)
self._make_latest(branch, rev)
diff --git a/voltha/core/config/config_rev.py b/voltha/core/config/config_rev.py
index 01c1ae2..8bfac18 100644
--- a/voltha/core/config/config_rev.py
+++ b/voltha/core/config/config_rev.py
@@ -32,6 +32,9 @@
from voltha.protos import third_party
from voltha.protos import meta_pb2
+import structlog
+
+log = structlog.get_logger()
def is_proto_message(o):
"""
@@ -284,6 +287,9 @@
def type(self):
return self._config.data.__class__
+ def clear_hash(self):
+ self._hash = None
+
def get(self, depth):
"""
Get config data of node. If depth > 0, recursively assemble the
diff --git a/voltha/core/config/config_rev_persisted.py b/voltha/core/config/config_rev_persisted.py
index 547641c..3464664 100644
--- a/voltha/core/config/config_rev_persisted.py
+++ b/voltha/core/config/config_rev_persisted.py
@@ -34,7 +34,7 @@
__slots__ = ('_kv_store',)
def __init__(self, branch, data, children=None):
- self._kv_store = branch._node._root.kv_store
+ self._kv_store = branch._node._root._kv_store
super(PersistedConfigRevision, self).__init__(branch, data, children)
def _finalize(self):
@@ -43,38 +43,47 @@
def __del__(self):
try:
- if self._config.__weakref__ is None:
- del self._kv_store[self._config._hash]
- assert self.__weakref__ is None
- del self._kv_store[self._hash]
+ if self._hash:
+ if self._config.__weakref__ is None:
+ if self._config._hash in self._kv_store:
+ del self._kv_store[self._config._hash]
+ assert self.__weakref__ is None
+ if self._hash in self._kv_store:
+ del self._kv_store[self._hash]
except Exception, e:
# this should never happen
- log.exception('del-error', hash=hash, e=e)
+ log.exception('del-error', hash=self.hash, e=e)
def store(self):
- # crude serialization of children hash and config data hash
- if self._hash in self._kv_store:
- return
- self.store_config()
+ try:
+ # crude serialization of children hash and config data hash
+ if self._hash in self._kv_store:
+ return
- children_lists = {}
- for field_name, children in self._children.iteritems():
- hashes = [rev.hash for rev in children]
- children_lists[field_name] = hashes
+ self.store_config()
- data = dict(
- children=children_lists,
- config=self._config._hash
- )
- blob = dumps(data)
- if self.compress:
- blob = compress(blob)
+ children_lists = {}
+ for field_name, children in self._children.iteritems():
+ hashes = [rev.hash for rev in children]
+ children_lists[field_name] = hashes
- self._kv_store[self._hash] = blob
+ data = dict(
+ children=children_lists,
+ config=self._config._hash
+ )
+ blob = dumps(data)
+ if self.compress:
+ blob = compress(blob)
+
+ self._kv_store[self._hash] = blob
+
+ except Exception, e:
+ log.exception('store-error', e=e)
@classmethod
def load(cls, branch, kv_store, msg_cls, hash):
+ # Update the branch's config store
blob = kv_store[hash]
if cls.compress:
blob = decompress(blob)
@@ -106,6 +115,7 @@
blob = self._config._data.SerializeToString()
if self.compress:
blob = compress(blob)
+
self._kv_store[self._config._hash] = blob
@classmethod
diff --git a/voltha/core/core.py b/voltha/core/core.py
index 88271a4..ae81306 100644
--- a/voltha/core/core.py
+++ b/voltha/core/core.py
@@ -40,10 +40,11 @@
@implementer(IComponent)
class VolthaCore(object):
- def __init__(self, instance_id, version, log_level):
+ def __init__(self, instance_id, core_store_id, version, log_level):
self.instance_id = instance_id
self.stopped = False
self.dispatcher = Dispatcher(self, instance_id)
+ self.core_store_id = core_store_id
self.global_handler = GlobalHandler(
dispatcher=self.dispatcher,
instance_id=instance_id,
@@ -52,6 +53,7 @@
self.local_handler = LocalHandler(
core=self,
instance_id=instance_id,
+ core_store_id=core_store_id,
version=version,
log_level=log_level)
self.local_root_proxy = None
@@ -84,6 +86,54 @@
def get_local_handler(self):
return self.local_handler
+ @inlineCallbacks
+ def reconcile_data(self):
+ # This method is used to trigger the necessary APIs when a voltha
+ # instance is started using an existing config
+ if self.local_handler.has_started_with_existing_data():
+ log.info('reconciliation-started')
+
+ # 1. Reconcile the logical device agents as they will be
+ # referred by the device agents
+ logical_devices = self.local_root_proxy.get('/logical_devices')
+ for logical_device in logical_devices:
+ self._handle_reconcile_logical_device(logical_device,
+ reconcile=True)
+
+ # 2. Reconcile the device agents
+ devices = self.local_root_proxy.get('/devices')
+
+ # First create the device agents for the ONU without reconciling
+ # them. Reconciliation will be triggered by the OLT adapter after
+ # it finishes reconciling the OLT. Note that the device_agent
+ # handling the ONU should be present before the ONU reconciliation
+ # occurs
+ for device in devices:
+ if device.type.endswith("_onu"):
+ yield self._handle_reconcile_existing_device(
+ device=device, reconcile=False)
+
+ # Then reconcile the OLT devices
+ for device in devices:
+ if device.type.endswith("_olt"):
+ yield self._handle_reconcile_existing_device(
+ device=device, reconcile=True)
+
+ # 3. Reconcile the alarm filters
+ alarm_filters = self.local_root_proxy.get('/alarm_filters')
+ for alarm_filter in alarm_filters:
+ yield self._handle_add_alarm_filter(alarm_filter)
+
+ log.info('reconciliation-ends')
+ else:
+ log.info('no-existing-data-to-reconcile')
+
+ def _get_devices(self):
+ pass
+
+ def _get_logical_devices(self):
+ pass
+
def get_proxy(self, path, exclusive=False):
return self.local_handler.get_proxy(path, exclusive)
@@ -121,6 +171,15 @@
self.device_agents[device.id] = yield DeviceAgent(self, device).start()
@inlineCallbacks
+ def _handle_reconcile_existing_device(self, device, reconcile):
+ assert isinstance(device, Device)
+ assert device.id not in self.device_agents
+ # We need to provide the existing device data to the start function
+ self.device_agents[device.id] = \
+ yield DeviceAgent(self, device).start(device=device,
+ reconcile=reconcile)
+
+ @inlineCallbacks
def _handle_remove_device(self, device):
if device.id in self.device_agents:
AlarmFilterAgent(self).remove_device_filters(device)
@@ -141,6 +200,16 @@
self.logical_device_agents[logical_device.id] = agent
@inlineCallbacks
+ def _handle_reconcile_logical_device(self, logical_device, reconcile):
+ assert isinstance(logical_device, LogicalDevice)
+ assert logical_device.id not in self.logical_device_agents
+ log.info('reconcile', reconcile=reconcile)
+ agent = yield LogicalDeviceAgent(self,
+ logical_device).start(
+ reconcile=reconcile)
+ self.logical_device_agents[logical_device.id] = agent
+
+ @inlineCallbacks
def _handle_remove_logical_device(self, logical_device):
if logical_device.id in self.logical_device_agents:
yield self.logical_device_agents[logical_device.id].stop()
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index b00bebf..9f7c8cf 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -69,11 +69,19 @@
self.log = structlog.get_logger(device_id=initial_data.id)
@inlineCallbacks
- def start(self):
- self.log.debug('starting')
+ def start(self, device=None, reconcile=False):
+ self.log.info('starting', device=device)
self._set_adapter_agent()
- yield self._process_update(self._tmp_initial_data)
+ if device:
+ # Starting from an existing data, so set the last_data
+ self.last_data = device
+ if reconcile:
+ self.reconcile_existing_device(device)
+ else:
+ yield self._process_update(self._tmp_initial_data)
+
del self._tmp_initial_data
+
self.log.info('started')
returnValue(self)
@@ -90,15 +98,16 @@
CallbackType.POST_UPDATE, self._process_update)
self.log.info('stopped')
+
@inlineCallbacks
def reboot_device(self, device, dry_run=False):
- self.log.info('reboot-device', device=device, dry_run=dry_run)
+ self.log.debug('reboot-device', device=device, dry_run=dry_run)
if not dry_run:
yield self.adapter_agent.reboot_device(device)
@inlineCallbacks
def get_device_details(self, device, dry_run=False):
- self.log.info('get-device-details', device=device, dry_run=dry_run)
+ self.log.debug('get-device-details', device=device, dry_run=dry_run)
if not dry_run:
yield self.adapter_agent.get_device_details(device)
@@ -118,6 +127,14 @@
except Exception as e:
self.log.exception(e.message)
+ @inlineCallbacks
+ def reconcile_existing_device(self, device, dry_run=False):
+ self.log.debug('reconcile-existing-device',
+ device=device,
+ dry_run=False)
+ if not dry_run:
+ yield self.adapter_agent.reconcile_device(device)
+
def _set_adapter_agent(self):
adapter_name = self._tmp_initial_data.adapter
if adapter_name == '':
@@ -161,6 +178,8 @@
old_admin_state = getattr(self.last_data, 'admin_state',
AdminState.UNKNOWN)
new_admin_state = device.admin_state
+ self.log.debug('device-admin-states', old_state=old_admin_state,
+ new_state=new_admin_state, dry_run=dry_run)
transition_handler = self.admin_state_fsm.get(
(old_admin_state, new_admin_state), None)
if transition_handler is None:
@@ -174,7 +193,7 @@
@inlineCallbacks
def _activate_device(self, device, dry_run=False):
- self.log.info('activate-device', device=device, dry_run=dry_run)
+ self.log.debug('activate-device', device=device, dry_run=dry_run)
if not dry_run:
device.oper_status = OperStatus.ACTIVATING
self.update_device(device)
@@ -189,14 +208,14 @@
self.pm_config_proxy.update('/', device_pm_config)
def _propagate_change(self, device, dry_run=False):
- self.log.info('propagate-change', device=device, dry_run=dry_run)
+ self.log.debug('propagate-change', device=device, dry_run=dry_run)
if device != self.last_data:
raise NotImplementedError()
else:
self.log.debug('no-op')
def _abandon_device(self, device, dry_run=False):
- self.log.info('abandon-device', device=device, dry_run=dry_run)
+ self.log.debug('abandon-device', device=device, dry_run=dry_run)
raise NotImplementedError()
def _delete_all_flows(self):
@@ -209,21 +228,24 @@
@inlineCallbacks
def _disable_device(self, device, dry_run=False):
- self.log.info('disable-device', device=device, dry_run=dry_run)
- if not dry_run:
- # Remove all flows before disabling device
- self._delete_all_flows()
- yield self.adapter_agent.disable_device(device)
+ try:
+ self.log.debug('disable-device', device=device, dry_run=dry_run)
+ if not dry_run:
+ # Remove all flows before disabling device
+ self._delete_all_flows()
+ yield self.adapter_agent.disable_device(device)
+ except Exception, e:
+ self.log.exception('error', e=e)
@inlineCallbacks
def _reenable_device(self, device, dry_run=False):
- self.log.info('reenable-device', device=device, dry_run=dry_run)
+ self.log.debug('reenable-device', device=device, dry_run=dry_run)
if not dry_run:
yield self.adapter_agent.reenable_device(device)
@inlineCallbacks
def _delete_device(self, device, dry_run=False):
- self.log.info('delete-device', device=device, dry_run=dry_run)
+ self.log.debug('delete-device', device=device, dry_run=dry_run)
if not dry_run:
yield self.adapter_agent.delete_device(device)
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 13e8b68..00d5ecf 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -37,22 +37,30 @@
class LocalHandler(VolthaLocalServiceServicer):
- def __init__(self, core, **init_kw):
+ def __init__(self, core, instance_id, core_store_id, **init_kw):
self.core = core
+ self.instance_id = instance_id
+ self.core_store_id = core_store_id
self.init_kw = init_kw
self.root = None
+ self.started_with_existing_data = False
self.stopped = False
def start(self, config_backend=None):
log.debug('starting')
if config_backend:
if 'root' in config_backend:
- # This is going to block the entire reactor until loading is completed
- log.info('loading config from persisted backend')
- self.root = ConfigRoot.load(VolthaInstance,
- kv_store=config_backend)
+ # This is going to block the entire reactor until loading is
+ # completed
+ log.info('loading-config-from-persisted-backend')
+ try:
+ self.root = ConfigRoot.load(VolthaInstance,
+ kv_store=config_backend)
+ self.started_with_existing_data = True
+ except Exception, e:
+ log.exception('Failure-loading-from-backend', e=e)
else:
- log.info('initializing new config')
+ log.info('initializing-a-new-config')
self.root = ConfigRoot(VolthaInstance(**self.init_kw),
kv_store=config_backend)
else:
@@ -71,6 +79,9 @@
def get_proxy(self, path, exclusive=False):
return self.root.get_proxy(path, exclusive)
+ def has_started_with_existing_data(self):
+ return self.started_with_existing_data
+
# gRPC service method implementations. BE CAREFUL; THESE ARE CALLED ON
# the gRPC threadpool threads.
@@ -338,6 +349,9 @@
'Device \'{}\' not found'.format(request.id))
context.set_code(StatusCode.NOT_FOUND)
+ except Exception, e:
+ log.exception('disable-exception', e=e)
+
return Empty()
@twisted_async
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index 21be9bf..083f42e 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -79,9 +79,15 @@
except Exception, e:
self.log.exception('init-error', e=e)
- def start(self):
+ def start(self, reconcile=False):
self.log.debug('starting')
- self.log.info('started')
+ if reconcile:
+ # Register the callbacks for the ports
+ ports = self.self_proxy.get('/ports')
+ for port in ports:
+ self._reconcile_port(port)
+ self.log.debug('ports-reconciled', ports=ports)
+ self.log.debug('started')
return self
def stop(self):
@@ -101,7 +107,7 @@
except Exception, e:
self.log.info('stop-exception', e=e)
- self.log.info('stopped')
+ self.log.debug('stopped')
def announce_flows_deleted(self, flows):
for f in flows:
@@ -551,6 +557,18 @@
)
)
+ def _reconcile_port(self, port):
+ self.log.debug('reconcile-port', port=port)
+ assert isinstance(port, LogicalPort)
+ self._port_list_updated(port)
+
+ # Set a proxy and callback for that specific port
+ self.port_proxy[port.id] = self.core.get_proxy(
+ '/logical_devices/{}/ports/{}'.format(self.logical_device_id,
+ port.id))
+ self.port_proxy[port.id].register_callback(
+ CallbackType.POST_UPDATE, self._port_changed)
+
def _port_removed(self, port):
self.log.debug('port-removed', port=port)
assert isinstance(port, LogicalPort)
diff --git a/voltha/leader.py b/voltha/leader.py
index fed32bd..6e58bd7 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -21,12 +21,17 @@
from twisted.internet import reactor
from twisted.internet.base import DelayedCall
from twisted.internet.defer import inlineCallbacks, DeferredList
+from simplejson import dumps, loads
from common.utils.asleep import asleep
log = get_logger()
+class ConfigMappingException(Exception):
+ pass
+
+
class Leader(object):
"""
A single instance of this object shall exist across the whole cluster.
@@ -37,6 +42,7 @@
ID_EXTRACTOR = '^(%s)([^/]+)$'
ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
+ CORE_STORE_KEY_EXTRACTOR = '^%s(?P<core_store_id>[^/]+)/root$'
# Public methods:
@@ -48,14 +54,23 @@
self.workload = []
self.members = []
+ self.core_store_ids = []
+ self.core_store_assignment = None
+
self.reassignment_soak_timer = None
self.workload_id_match = re.compile(
- self.ID_EXTRACTOR % self.coord.workload_prefix).match
+ self.ID_EXTRACTOR % self.coord.workload_prefix).match
self.member_id_match = re.compile(
self.ID_EXTRACTOR % self.coord.membership_prefix).match
+ self.core_data_id_match = re.compile(
+ self.CORE_STORE_KEY_EXTRACTOR % self.coord.core_store_prefix).match
+
+ self.core_store_assignment_key = self.coord.core_store_prefix + \
+ '/assignment'
+
self.assignment_match = re.compile(
self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match
@@ -118,8 +133,8 @@
if workload != self.workload:
log.info('workload-changed',
- old_workload_count=len(self.workload),
- new_workload_count=len(workload))
+ old_workload_count=len(self.workload),
+ new_workload_count=len(workload))
self.workload = workload
self._restart_reassignment_soak_timer()
@@ -136,21 +151,72 @@
reactor.callLater(0, self._track_workload, index)
@inlineCallbacks
+ def _get_core_store_mappings(self):
+ # Get the mapping record
+ (_, mappings) = yield self.coord.kv_get(
+ self.core_store_assignment_key, recurse=True)
+ if mappings:
+ self.core_store_assignment = loads(mappings[0]['Value'])
+ return
+ else: # Key has not been created yet
+ # Create the key with an empty dictionary value
+ value = dict()
+ result = yield self.coord.kv_put(self.core_store_assignment_key,
+ dumps(value))
+ if not result:
+ raise ConfigMappingException(self.instance_id)
+
+ # Ensure the record was created
+ (_, mappings) = yield self.coord.kv_get(
+ self.core_store_assignment_key, recurse=True)
+
+ self.core_store_assignment = loads(mappings[0]['Value'])
+
+ @inlineCallbacks
+ def _update_core_store_references(self):
+ try:
+ # Get the current set of configs keys
+ (_, results) = yield self.coord.kv_get(
+ self.coord.core_store_prefix, recurse=False, keys=True)
+
+ matches = (self.core_data_id_match(e) for e in results or [])
+ core_ids = [m.group(1) for m in matches if m is not None]
+
+ self.core_store_ids = core_ids
+
+ # Update the config mapping
+ self._get_core_store_mappings()
+
+ log.debug('core-data', core_ids=core_ids,
+ assignment=self.core_store_assignment)
+
+ except Exception, e:
+ log.exception('get-config-error', e=e)
+
+ @inlineCallbacks
def _track_members(self, index):
try:
(index, results) = yield self.coord.kv_get(
self.coord.membership_prefix, index=index, recurse=True)
- matches = (self.member_id_match(e['Key']) for e in results or [])
+ # Only members with valid session are considered active
+ matches = (self.member_id_match(e['Key'])
+ for e in results if 'Session' in e)
members = [m.group(2) for m in matches if m is not None]
+ log.debug('active-members', active_members=members)
+
+ # Check if the two sets are the same
if members != self.members:
+ # update the current set of config
+ yield self._update_core_store_references()
log.info('membership-changed',
- old_members_count=len(self.members),
- new_members_count=len(members))
+ prev_members=self.members,
+ curr_members=members,
+ core_store_mapping=self.core_store_assignment)
self.members = members
- self._restart_reassignment_soak_timer()
+ self._restart_core_store_reassignment_soak_timer()
except Exception, e:
log.exception('members-track-error', e=e)
@@ -174,6 +240,93 @@
self.reassignment_soak_timer = reactor.callLater(
self.soak_time, self._reassign_work)
+ def _restart_core_store_reassignment_soak_timer(self):
+
+ if self.reassignment_soak_timer is not None:
+ assert isinstance(self.reassignment_soak_timer, DelayedCall)
+ if not self.reassignment_soak_timer.called:
+ self.reassignment_soak_timer.cancel()
+
+ self.reassignment_soak_timer = reactor.callLater(
+ self.soak_time, self._reassign_core_stores)
+
+ @inlineCallbacks
+ def _reassign_core_stores(self):
+
+ def _get_new_str_id(max_val_in_str):
+ return str(int(max_val_in_str) + 1)
+
+ def _get_core_data_id_from_instance(instance_name):
+ for id, instance in self.core_store_assignment.iteritems():
+ if instance == instance_name:
+ return id
+
+ try:
+ log.debug('reassign-core-stores', curr_members=self.members)
+
+ # 1. clear the mapping for instances that are no longer running
+ updated_mapping = dict()
+ existing_active_config_members = set()
+ cleared_config_ids = set()
+ inactive_members = set()
+ log.debug('previous-assignment',
+ core_store_assignment=self.core_store_assignment)
+ if self.core_store_assignment:
+ for id, instance in self.core_store_assignment.iteritems():
+ if instance not in self.members:
+ updated_mapping[id] = None
+ cleared_config_ids.add(id)
+ inactive_members.add(instance)
+ else:
+ updated_mapping[id] = instance
+ existing_active_config_members.add(instance)
+
+ # 2. Update the mapping with the new set
+ current_id = max(self.core_store_assignment) \
+ if self.core_store_assignment else '0'
+ for instance in self.members:
+ if instance not in existing_active_config_members:
+ # Add the member to the config map
+ if cleared_config_ids:
+ # There is an empty slot
+ next_id = cleared_config_ids.pop()
+ updated_mapping[next_id] = instance
+ else:
+ # There are no empty slot, create new ids
+ current_id = _get_new_str_id(current_id)
+ updated_mapping[current_id] = instance
+
+ self.core_store_assignment = updated_mapping
+ log.debug('updated-assignment',
+ core_store_assignment=self.core_store_assignment)
+
+ # 3. save the mapping into consul
+ yield self.coord.kv_put(self.core_store_assignment_key,
+ dumps(self.core_store_assignment))
+
+ # 4. Assign the new workload to the newly created members
+ curr_members_set = set(self.members)
+ new_members = curr_members_set.difference(
+ existing_active_config_members)
+ for new_member in new_members:
+ yield self.coord.kv_put(
+ self.coord.assignment_prefix
+ + new_member + '/' +
+ self.coord.core_storage_suffix,
+ _get_core_data_id_from_instance(new_member))
+
+ # 5. Remove non-existent members
+ for member in inactive_members:
+ yield self.coord.kv_delete(
+ self.coord.assignment_prefix + member, recurse=True)
+ yield self.coord.kv_delete(
+ self.coord.membership_prefix + member,
+ recurse=True)
+
+ except Exception as e:
+ log.exception('config-reassignment-failure', e=e)
+ self._restart_core_store_reassignment_soak_timer()
+
@inlineCallbacks
def _reassign_work(self):
@@ -206,12 +359,12 @@
wanted_assignments = dict() # member_id -> set(work_id)
_ = [
wanted_assignments.setdefault(ring.get_node(work), set())
- .add(work)
+ .add(work)
for work in self.workload
]
for (member, work) in sorted(wanted_assignments.iteritems()):
log.info('assignment',
- member=member, work_count=len(work))
+ member=member, work_count=len(work))
# Step 2: discover current assignment (from consul)
@@ -225,7 +378,7 @@
_ = [
current_assignments.setdefault(
m.groupdict()['member_id'], set())
- .add(m.groupdict()['work_id'])
+ .add(m.groupdict()['work_id'])
for m, e in matches if m is not None
]
diff --git a/voltha/main.py b/voltha/main.py
index e889864..ae914d0 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -48,6 +48,7 @@
VERSION = '0.9.0'
+
defs = dict(
config=os.environ.get('CONFIG', './voltha.yml'),
consul=os.environ.get('CONSUL', 'localhost:8500'),
@@ -249,11 +250,20 @@
if not args.no_banner:
print_banner(self.log)
+ # Create a unique instnce id using the passed-in instanceid and
+ # UTC timestamp
+ current_time = arrow.utcnow().timestamp
+ self.instance_id = self.args.instance_id + '_' + str(current_time)
+
+ # Every voltha instance is given a core_storage id where the
+ # instance data is stored
+ self.core_store_id = None
+
self.startup_components()
if not args.no_heartbeat:
self.start_heartbeat()
- self.start_kafka_heartbeat(args.instance_id)
+ self.start_kafka_heartbeat(self.instance_id)
self.manhole = None
@@ -284,12 +294,18 @@
internal_host_address=self.args.internal_host_address,
external_host_address=self.args.external_host_address,
rest_port=self.args.rest_port,
- instance_id=self.args.instance_id,
+ instance_id=self.instance_id,
config=self.config,
consul=self.args.consul)
).start()
- init_rest_service(self.args.rest_port)
+ self.log.info('waiting-for-config-assignment')
+
+ # Wait until we get a config id before we proceed
+ self.core_store_id, store_prefix = \
+ yield registry('coordinator').get_core_store_id_and_prefix()
+
+ self.log.info('store-id', core_store_id=self.core_store_id)
yield registry.register(
'grpc_server',
@@ -297,6 +313,20 @@
).start()
yield registry.register(
+ 'core',
+ VolthaCore(
+ instance_id=self.instance_id,
+ core_store_id = self.core_store_id,
+ version=VERSION,
+ log_level=LogLevel.INFO
+ )
+ ).start(config_backend=load_backend(store_id=self.core_store_id,
+ store_prefix=store_prefix,
+ args=self.args))
+
+ init_rest_service(self.args.rest_port)
+
+ yield registry.register(
'kafka_proxy',
KafkaProxy(
self.args.consul,
@@ -306,15 +336,6 @@
).start()
yield registry.register(
- 'core',
- VolthaCore(
- instance_id=self.args.instance_id,
- version=VERSION,
- log_level=LogLevel.INFO
- )
- ).start(config_backend=load_backend(self.args))
-
- yield registry.register(
'frameio',
FrameIOManager()
).start()
@@ -332,10 +353,17 @@
if self.args.manhole_port is not None:
self.start_manhole(self.args.manhole_port)
+ # Now that all components are loaded, in the scenario where this
+ # voltha instance is picking up an existing set of data (from a
+ # voltha instance that dies/stopped) then we need to setup this
+ # instance from where the previous one left
+
+ yield registry('core').reconcile_data()
+
self.log.info('started-internal-services')
except Exception as e:
- self.log.exception('Failure to start all components {}'.format(e))
+ self.log.exception('Failure-to-start-all-components', e=e)
def start_manhole(self, port):
self.manhole = Manhole(
diff --git a/voltha/voltha.yml b/voltha/voltha.yml
index fa2a3ea..bd91ec4 100644
--- a/voltha/voltha.yml
+++ b/voltha/voltha.yml
@@ -55,6 +55,7 @@
coordinator:
voltha_kv_prefix: 'service/voltha'
+ core_store_key: 'data/core'
leader_key: 'leader'
membership_key: 'members'
assignment_key: 'assignments'
diff --git a/voltha/worker.py b/voltha/worker.py
index b9e14e2..aff83b1 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -18,7 +18,7 @@
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.base import DelayedCall
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
from common.utils.asleep import asleep
@@ -51,6 +51,10 @@
self.assignment_match = re.compile(
self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match
+ self.mycore_store_id = None
+
+ self.wait_for_core_store_assignment = Deferred()
+
@inlineCallbacks
def start(self):
log.debug('starting')
@@ -65,6 +69,15 @@
self.assignment_soak_timer.cancel()
log.info('stopped')
+ @inlineCallbacks
+ def get_core_store_id(self):
+ if self.mycore_store_id:
+ returnValue(self.mycore_store_id)
+ else:
+ # Let's wait until we get assigned a store_id from the leader
+ val = yield self.wait_for_core_store_assignment
+ returnValue(val)
+
# Private methods:
def _start_tracking_my_assignments(self):
@@ -88,15 +101,25 @@
self.coord.assignment_prefix + self.instance_id,
index=index, recurse=True)
- matches = [
- (self.assignment_match(e['Key']), e) for e in results or []]
+ # 1. Check whether we have been assigned a full voltha instance
+ if results and not self.mycore_store_id:
+ # We have no store id set yet
+ core_stores = [c['Value'] for c in results if
+ c['Key'] == self.coord.assignment_prefix +
+ self.instance_id + '/' +
+ self.coord.core_storage_suffix]
+ if core_stores:
+ self.mycore_store_id = core_stores[0]
+ log.debug('store-assigned',
+ mycore_store_id=self.mycore_store_id)
+ self._stash_and_restart_core_store_soak_timer()
- my_workload = set([
- m.groupdict()['work_id'] for m, e in matches if m is not None
- ])
-
- if my_workload != self.my_workload:
- self._stash_and_restart_soak_timer(my_workload)
+ # 2. Check whether we have been assigned a work item
+ if results and self.mycore_store_id:
+ # Check for difference between current worload and newer one
+ # TODO: Depending on how workload gets load balanced we may
+ # need to add workload distribution here
+ pass
except Exception, e:
log.exception('assignments-track-error', e=e)
@@ -127,8 +150,25 @@
Called when finally the dust has settled on our assignments.
:return: None
"""
- log.info('my-assignments-changed',
- old_count=len(self.my_workload),
- new_count=len(self.my_candidate_workload))
+ log.debug('my-assignments-changed',
+ old_count=len(self.my_workload),
+ new_count=len(self.my_candidate_workload),
+ workload=self.my_workload)
self.my_workload, self.my_candidate_workload = \
self.my_candidate_workload, None
+
+ def _stash_and_restart_core_store_soak_timer(self):
+
+ log.debug('re-start-assignment-config-soaking')
+
+ if self.assignment_soak_timer is not None:
+ if not self.assignment_soak_timer.called:
+ self.assignment_soak_timer.cancel()
+
+ self.assignment_soak_timer = reactor.callLater(
+ self.soak_time, self._process_config_assignment)
+
+ def _process_config_assignment(self):
+ log.debug('process-config-assignment',
+ mycore_store_id=self.mycore_store_id)
+ self.wait_for_core_store_assignment.callback(self.mycore_store_id)
\ No newline at end of file