[ 4222 ] Minor changes after code review
This is the initial commit for Persistence and Voltha restart.
It consists of the following:
1) Introduction of a store config id which represents the data of
a Voltha instance.
2) The Coordinator service dynamically allocates a store config id
to each Voltha instance on startup. It also reassigns the same id
to another Voltha instance if the Voltha instance previously holding
that store id goes down.
3) All Voltha data is stored in Consul as key-value (KV) pairs.
4) When a Voltha instance starts and is allocated a store config id that
refers to existing data (from an instance that went down), it
loads all the data from Consul into its own memory and starts a reconciliation
process.
5) During the reconciliation process, the necessary agents and
callbacks are created from the stored data. A reconcile() API is also
invoked on the adapters so they can perform their side of the reconciliation.
6) The reconciliation process is implemented in the ponsim OLT and ONU adapters.
7) A set of integration tests focused on persistence and Voltha
restarts.
8) Fixes for a few bugs found along the way.
Change-Id: I8c2bbae3b2fc79d0afd8ce3b7b0be6bde93e492a
diff --git a/voltha/adapters/broadcom_onu/broadcom_onu.py b/voltha/adapters/broadcom_onu/broadcom_onu.py
index a424831..8adb0e8 100644
--- a/voltha/adapters/broadcom_onu/broadcom_onu.py
+++ b/voltha/adapters/broadcom_onu/broadcom_onu.py
@@ -97,6 +97,9 @@
reactor.callLater(0, self.devices_handlers[device.proxy_address.channel_id].activate, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/voltha/adapters/dpoe_onu/dpoe_onu.py b/voltha/adapters/dpoe_onu/dpoe_onu.py
index 5aa8838..4bf13f2 100644
--- a/voltha/adapters/dpoe_onu/dpoe_onu.py
+++ b/voltha/adapters/dpoe_onu/dpoe_onu.py
@@ -115,6 +115,9 @@
reactor.callLater(0.1, self._onu_device_activation, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
@inlineCallbacks
def _onu_device_activation(self, device):
# first we verify that we got parent reference and proxy info
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index e44537b..1b6881f 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -81,6 +81,19 @@
:return: (Deferred) Shall be fired to acknowledge device ownership.
"""
+ def reconcile_device(device):
+ """
+ Make sure the adapter looks after given device. Called when this
+ device has changed ownership from another Voltha instance to
+ this one (typically, this occurs when the previous voltha
+ instance went down).
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ :return: (Deferred) Shall be fired to acknowledge device ownership.
+ """
+
+
def abandon_device(device):
"""
Make sur ethe adapter no longer looks after device. This is called
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index f77db19..85430b0 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -430,6 +430,9 @@
reactor.callLater(0, self.devices_handlers[device.id].activate, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/voltha/adapters/microsemi_olt/microsemi_olt.py b/voltha/adapters/microsemi_olt/microsemi_olt.py
index 42dc56f..03783fb 100644
--- a/voltha/adapters/microsemi_olt/microsemi_olt.py
+++ b/voltha/adapters/microsemi_olt/microsemi_olt.py
@@ -105,6 +105,9 @@
log.info('adopted-device', device=device)
self.olts[target] = (olt, activation, comm)
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
self._abandon(device.mac_address)
diff --git a/voltha/adapters/pmcs_onu/pmcs_onu.py b/voltha/adapters/pmcs_onu/pmcs_onu.py
index a58353f..da5a5be 100644
--- a/voltha/adapters/pmcs_onu/pmcs_onu.py
+++ b/voltha/adapters/pmcs_onu/pmcs_onu.py
@@ -99,6 +99,9 @@
reactor.callLater(0.1, self._onu_device_activation, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index 138e248..be57fac 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -161,17 +161,22 @@
alarm_event = self.adapter.adapter_agent.create_alarm(
resource_id=self.device.id,
- description="{}.{} - {}".format(self.adapter.name, self.device.id,
- alarm_data['description']) if 'description' in alarm_data else None,
+ description="{}.{} - {}".format(self.adapter.name,
+ self.device.id,
+ alarm_data[
+ 'description']) if 'description' in alarm_data else None,
type=alarm_data['type'] if 'type' in alarm_data else None,
- category=alarm_data['category'] if 'category' in alarm_data else None,
- severity=alarm_data['severity'] if 'severity' in alarm_data else None,
+ category=alarm_data[
+ 'category'] if 'category' in alarm_data else None,
+ severity=alarm_data[
+ 'severity'] if 'severity' in alarm_data else None,
state=alarm_data['state'] if 'state' in alarm_data else None,
raised_ts=alarm_data['ts'] if 'ts' in alarm_data else 0,
context=current_context
)
- self.adapter.adapter_agent.submit_alarm(self.device.id, alarm_event)
+ self.adapter.adapter_agent.submit_alarm(self.device.id,
+ alarm_event)
except Exception as e:
log.exception('failed-to-send-alarm', e=e)
@@ -237,6 +242,23 @@
reactor.callLater(0, self.devices_handlers[device.id].activate, device)
return device
+ def reconcile_device(self, device):
+ try:
+ self.devices_handlers[device.id] = PonSimOltHandler(self,
+ device.id)
+ # Work only required for devices that are in ENABLED state
+ if device.admin_state == AdminState.ENABLED:
+ reactor.callLater(0,
+ self.devices_handlers[device.id].reconcile,
+ device)
+ else:
+ # Invoke the child-device reconciliation, which should set up the
+ # basic child data structures
+ self.adapter_agent.reconcile_child_devices(device.id)
+ return device
+ except Exception, e:
+ log.exception('Exception', e=e)
+
def abandon_device(self, device):
raise NotImplementedError()
@@ -257,6 +279,7 @@
def delete_device(self, device):
log.info('delete-device', device_id=device.id)
+ # TODO: Update the logical device mapping
reactor.callLater(0, self.devices_handlers[device.id].delete)
return device
@@ -329,6 +352,12 @@
self.channel = grpc.insecure_channel(device.host_and_port)
return self.channel
+ def _get_nni_port(self):
+ ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
+ if ports:
+ # For now, only one NNI port is used
+ return ports[0]
+
def activate(self, device):
self.log.info('activating')
@@ -448,8 +477,66 @@
# Start collecting stats from the device after a brief pause
self.start_kpi_collection(device.id)
+ def reconcile(self, device):
+ self.log.info('reconciling-OLT-device-starts')
+
+ if not device.host_and_port:
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'No host_and_port field provided'
+ self.adapter_agent.update_device(device)
+ return
+
+ try:
+ stub = ponsim_pb2.PonSimStub(self.get_channel())
+ info = stub.GetDeviceInfo(Empty())
+ log.info('got-info', info=info)
+ # TODO: Verify we are connected to the same device we are
+ # reconciling - not much data in ponsim to differentiate at the
+ # time
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+ self.ofp_port_no = info.nni_port
+ self.nni_port = self._get_nni_port()
+ except Exception, e:
+ log.exception('device-unreachable', e=e)
+ device.connect_status = ConnectStatus.UNREACHABLE
+ device.oper_status = OperStatus.UNKNOWN
+ self.adapter_agent.update_device(device)
+ return
+
+ # Now set the initial PM configuration for this device
+ self.pm_metrics = AdapterPmMetrics(device)
+ pm_config = self.pm_metrics.make_proto()
+ log.info("initial-pm-config", pm_config=pm_config)
+ self.adapter_agent.update_device_pm_config(pm_config, init=True)
+
+ # Setup alarm handler
+ self.alarms = AdapterAlarms(self.adapter, device)
+
+ # TODO: Is there anything required to verify nni and PON ports
+
+ # Set the logical device id
+ device = self.adapter_agent.get_device(device.id)
+ if device.parent_id:
+ self.logical_device_id = device.parent_id
+ self.adapter_agent.reconcile_logical_device(device.parent_id)
+ else:
+ self.log.info('no-logical-device-set')
+
+ # Reconcile child devices
+ self.adapter_agent.reconcile_child_devices(device.id)
+
+ # finally, open the frameio port to receive in-band packet_in messages
+ self.io_port = registry('frameio').open_port(
+ self.interface, self.rcv_io, is_inband_frame)
+
+ # Start collecting stats from the device after a brief pause
+ self.start_kpi_collection(device.id)
+
+ self.log.info('reconciling-OLT-device-ends')
+
def rcv_io(self, port, frame):
- self.log.info('reveived', iface_name=port.iface_name,
+ self.log.info('received', iface_name=port.iface_name,
frame_len=len(frame))
pkt = Ether(frame)
if pkt.haslayer(Dot1Q):
@@ -572,6 +659,12 @@
# close the frameio port
registry('frameio').close_port(self.io_port)
+ # Update the logical device mapping
+ if self.logical_device_id in \
+ self.adapter.logical_device_id_to_root_device_id:
+ del self.adapter.logical_device_id_to_root_device_id[
+ self.logical_device_id]
+
# TODO:
# 1) Remove all flows from the device
# 2) Remove the device from ponsim
@@ -584,6 +677,15 @@
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
+ # Set the ofp_port_no and nni_port in case the reconcile process was
+ # bypassed because the device was in DISABLED state on Voltha restart
+ if not self.ofp_port_no and not self.nni_port:
+ stub = ponsim_pb2.PonSimStub(self.get_channel())
+ info = stub.GetDeviceInfo(Empty())
+ log.info('got-info', info=info)
+ self.ofp_port_no = info.nni_port
+ self.nni_port = self._get_nni_port()
+
# Update the connect status to REACHABLE
device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
@@ -645,10 +747,8 @@
admin_state=AdminState.ENABLED)
# finally, open the frameio port to receive in-band packet_in messages
- self.log.info('registering-frameio')
self.io_port = registry('frameio').open_port(
self.interface, self.rcv_io, is_inband_frame)
- self.log.info('registered-frameio')
self.log.info('re-enabled', device_id=device.id)
diff --git a/voltha/adapters/ponsim_onu/ponsim_onu.py b/voltha/adapters/ponsim_onu/ponsim_onu.py
index cbaf3ab..05df340 100644
--- a/voltha/adapters/ponsim_onu/ponsim_onu.py
+++ b/voltha/adapters/ponsim_onu/ponsim_onu.py
@@ -94,6 +94,15 @@
reactor.callLater(0, self.devices_handlers[device.id].activate, device)
return device
+ def reconcile_device(self, device):
+ self.devices_handlers[device.id] = PonSimOnuHandler(self, device.id)
+ # Reconcile only if state was ENABLED
+ if device.admin_state == AdminState.ENABLED:
+ reactor.callLater(0,
+ self.devices_handlers[device.id].reconcile,
+ device)
+ return device
+
def abandon_device(self, device):
raise NotImplementedError()
@@ -136,8 +145,13 @@
def receive_proxied_message(self, proxy_address, msg):
log.info('receive-proxied-message', proxy_address=proxy_address,
device_id=proxy_address.device_id, msg=msg)
- handler = self.devices_handlers[proxy_address.device_id]
- handler.receive_message(msg)
+ # Device_id from the proxy_address is the olt device id. We need to
+ # get the onu device id using the port number in the proxy_address
+ device = self.adapter_agent. \
+ get_child_device_with_proxy_address(proxy_address)
+ if device:
+ handler = self.devices_handlers[device.id]
+ handler.receive_message(msg)
def receive_packet_out(self, logical_device_id, egress_port_no, msg):
log.info('packet-out', logical_device_id=logical_device_id,
@@ -240,6 +254,44 @@
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
+ def _get_uni_port(self):
+ ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
+ if ports:
+ # For now, only one UNI port is used
+ return ports[0]
+
+ def _get_pon_port(self):
+ ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
+ if ports:
+ # For now, only one PON port is used
+ return ports[0]
+
+ def reconcile(self, device):
+ self.log.info('reconciling-ONU-device-starts')
+
+ # first we verify that we got parent reference and proxy info
+ assert device.parent_id
+ assert device.proxy_address.device_id
+ assert device.proxy_address.channel_id
+
+ # register for proxied messages right away
+ self.proxy_address = device.proxy_address
+ self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+
+ # Set the connection status to REACHABLE
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ # TODO: Verify that the uni, pon and logical ports exist
+
+ # Mark the device as REACHABLE and ACTIVE
+ device = self.adapter_agent.get_device(device.id)
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+
+ self.log.info('reconciling-ONU-device-ends')
+
@inlineCallbacks
def update_flow_table(self, flows):
@@ -314,6 +366,7 @@
portid=port_id)
# Remove pon port from parent
+ self.pon_port = self._get_pon_port()
self.adapter_agent.delete_port_reference_from_parent(self.device_id,
self.pon_port)
@@ -333,60 +386,66 @@
def reenable(self):
self.log.info('re-enabling', device_id=self.device_id)
+ try:
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
- # Get the latest device reference
- device = self.adapter_agent.get_device(self.device_id)
+ # First we verify that we got parent reference and proxy info
+ assert device.parent_id
+ assert device.proxy_address.device_id
+ assert device.proxy_address.channel_id
- # First we verify that we got parent reference and proxy info
- assert self.uni_port
- assert device.parent_id
- assert device.proxy_address.device_id
- assert device.proxy_address.channel_id
+ # Re-register for proxied messages right away
+ self.proxy_address = device.proxy_address
+ self.adapter_agent.register_for_proxied_messages(
+ device.proxy_address)
- # Re-register for proxied messages right away
- self.proxy_address = device.proxy_address
- self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+ # Re-enable the ports on that device
+ self.adapter_agent.enable_all_ports(self.device_id)
- # Re-enable the ports on that device
- self.adapter_agent.enable_all_ports(self.device_id)
+ # Refresh the port reference
+ self.uni_port = self._get_uni_port()
+ self.pon_port = self._get_pon_port()
- # Add the pon port reference to the parent
- self.adapter_agent.add_port_reference_to_parent(device.id,
- self.pon_port)
+ # Add the pon port reference to the parent
+ self.adapter_agent.add_port_reference_to_parent(device.id,
+ self.pon_port)
- # Update the connect status to REACHABLE
- device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.update_device(device)
+ # Update the connect status to REACHABLE
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
- # re-add uni port to logical device
- parent_device = self.adapter_agent.get_device(device.parent_id)
- logical_device_id = parent_device.parent_id
- assert logical_device_id
- port_no = device.proxy_address.channel_id
- cap = OFPPF_1GB_FD | OFPPF_FIBER
- self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
- id='uni-{}'.format(port_no),
- ofp_port=ofp_port(
- port_no=port_no,
- hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
- name='uni-{}'.format(port_no),
- config=0,
- state=OFPPS_LIVE,
- curr=cap,
- advertised=cap,
- peer=cap,
- curr_speed=OFPPF_1GB_FD,
- max_speed=OFPPF_1GB_FD
- ),
- device_id=device.id,
- device_port_no=self.uni_port.port_no
- ))
+ # re-add uni port to logical device
+ parent_device = self.adapter_agent.get_device(device.parent_id)
+ logical_device_id = parent_device.parent_id
+ assert logical_device_id
+ port_no = device.proxy_address.channel_id
+ cap = OFPPF_1GB_FD | OFPPF_FIBER
+ self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
+ id='uni-{}'.format(port_no),
+ ofp_port=ofp_port(
+ port_no=port_no,
+ hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
+ name='uni-{}'.format(port_no),
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=OFPPF_1GB_FD,
+ max_speed=OFPPF_1GB_FD
+ ),
+ device_id=device.id,
+ device_port_no=self.uni_port.port_no
+ ))
- device = self.adapter_agent.get_device(device.id)
- device.oper_status = OperStatus.ACTIVE
- self.adapter_agent.update_device(device)
+ device = self.adapter_agent.get_device(device.id)
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
- self.log.info('re-enabled', device_id=device.id)
+ self.log.info('re-enabled', device_id=device.id)
+ except Exception, e:
+ self.log.exception('error-reenabling', e=e)
def delete(self):
self.log.info('deleting', device_id=self.device_id)
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index f2e056b..f973381 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -215,6 +215,9 @@
reactor.callLater(0.2, self._simulate_device_activation, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/voltha/adapters/simulated_onu/simulated_onu.py b/voltha/adapters/simulated_onu/simulated_onu.py
index 83dfd85..b53fd11 100644
--- a/voltha/adapters/simulated_onu/simulated_onu.py
+++ b/voltha/adapters/simulated_onu/simulated_onu.py
@@ -90,6 +90,9 @@
reactor.callLater(0.2, self._simulate_device_activation, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index 5936107..3d9e264 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -192,6 +192,9 @@
self._activate_io_port()
reactor.callLater(0, self._launch_device_activation, device)
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
def _activate_io_port(self):
if self.io_port is None:
self.io_port = registry('frameio').open_port(
diff --git a/voltha/adapters/tibit_onu/tibit_onu.py b/voltha/adapters/tibit_onu/tibit_onu.py
index 247a868..8b5f754 100644
--- a/voltha/adapters/tibit_onu/tibit_onu.py
+++ b/voltha/adapters/tibit_onu/tibit_onu.py
@@ -145,6 +145,9 @@
reactor.callLater(0.1, self._onu_device_activation, device)
return device
+ def reconcile_device(self, device):
+ raise NotImplementedError()
+
@inlineCallbacks
def _onu_device_activation(self, device):
# first we verify that we got parent reference and proxy info