Define and wire in bulk flow update adapter API
This adds support for defining bulk or incremental
flow update capability on a per device type basis
and implements the callout mechanism.
It also defines 5 new API entries for proxying
messages from a child-device adapter (e.g. ONU adapter)
to the actual device as well as the APIs for registering
and receiving async messages from a device via a
parent device acting as a proxy. The implementation
of this is left for the next PR.
Change-Id: Ic48a458c170083842b6bc674d675b5b60c0827f6
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index feec225..f9426c6 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -92,9 +92,55 @@
def deactivate_device(device):
"""
Called if the device is to be deactivate based on a NBI call.
+ :param device: A Voltha.Device object.
:return: (Deferred) Shall be fired to acknowledge deactivation.
"""
+ def update_flows_bulk(device, flows, groups):
+ """
+ Called after any flow table change, but only if the device supports
+ bulk mode, which is expressed by the 'accepts_bulk_flow_update'
+ capability attribute of the device type.
+ :param device: A Voltha.Device object.
+ :param flows: An openflow_v13.Flows object
+ :param groups: An openflow_v13.Flows object
+ :return: (Deferred or None)
+ """
+
+ def update_flows_incrementally(device, flow_changes, group_changes):
+ """
+ [This mode is not supported yet.]
+ :param device: A Voltha.Device object.
+ :param flow_changes:
+ :param group_changes:
+ :return:
+ """
+
+ def send_proxied_message(proxy_address, msg):
+ """
+ Forward a msg to a child device of device, addressed by the given
+ proxy_address=Device.ProxyAddress().
+ :param proxy_address: Address info for the parent device
+ to route the message to the child device. This was given to the
+ child device by the parent device at the creation of the child
+ device.
+ :param msg: (str) The actual message to send.
+ :return: (Deferred(None) or None) The return of this method should
+ indicate that the message was successfully *sent*.
+ """
+
+ def receive_proxied_message(proxy_address, msg):
+ """
+ Pass an async message (arrived via a proxy) to this device.
+ :param proxy_address: Address info for the parent device
+ to route the message to the child device. This was given to the
+ child device by the parent device at the creation of the child
+ device. Note this is the proxy_address with which the adapter
+ had to register prior to receiving proxied messages.
+ :param msg: (str) The actual message received.
+ :return: None
+ """
+
# TODO work in progress
# ...
@@ -106,7 +152,7 @@
toward Voltha's CORE via the APIs defined here.
"""
- def get_device(selfdevice_id):
+ def get_device(device_id):
# TODO add doc
""""""
@@ -131,12 +177,54 @@
""""""
def child_device_detected(parent_device_id,
+ parent_port_no,
child_device_type,
child_device_address_kw):
# TODO add doc
""""""
- # TODO work in progress
- pass
+ def send_proxied_message(proxy_address, msg):
+ """
+ Forward a msg to a child device of device, addressed by the given
+ proxy_address=Device.ProxyAddress().
+ :param proxy_address: Address info for the parent device
+ to route the message to the child device. This was given to the
+ child device by the parent device at the creation of the child
+ device.
+ :param msg: (str) The actual message to send.
+ :return: (Deferred(None) or None) The return of this method should
+ indicate that the message was successfully *sent*.
+ """
+
+ def receive_proxied_message(proxy_address, msg):
+ """
+ Pass an async message (arrived via a proxy) to this device.
+ :param proxy_address: Address info for the parent device
+ to route the message to the child device. This was given to the
+ child device by the parent device at the creation of the child
+ device. Note this is the proxy_address with which the adapter
+ had to register prior to receiving proxied messages.
+ :param msg: (str) The actual message received.
+ :return: None
+ """
+
+ def register_for_proxied_messages(proxy_address):
+ """
+ A child device adapter can use this to indicate its intent to
+ receive async messages sent via a parent device. Example: an
+ ONU adapter can use this to register for OMCI messages received
+ via the OLT and the OLT adapter.
+ :param child_device_address: Address info that was given to the
+ child device by the parent device at the creation of the child
+ device. Its uniqueness acts as a router information for the
+ registration.
+ :return: None
+ """
+
+ def unregister_for_proxied_messages(proxy_address):
+ """
+ Cancel a previous registration
+ :return:
+ """
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index e358b26..c6a6e11 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -45,6 +45,14 @@
name = 'simulated_olt'
+ supported_device_types = [
+ DeviceType(
+ id='simulated_olt',
+ adapter=name,
+ accepts_bulk_flow_update=True
+ )
+ ]
+
def __init__(self, adapter_agent, config):
self.adapter_agent = adapter_agent
self.config = config
@@ -69,10 +77,7 @@
return self.descriptor
def device_types(self):
- return DeviceTypes(items=[
- DeviceType(id='simulated_olt', adapter=self.name),
- # DeviceType(id='simulated_onu', adapter=self.name)
- ])
+ return DeviceTypes(items=self.supported_device_types)
def health(self):
return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
@@ -317,7 +322,7 @@
parent_port_no=1,
child_device_type='simulated_onu',
child_device_address_kw=dict(
- proxy_device=Device.ProxyDevice(
+ proxy_device=Device.ProxyAddress(
device_id=device.id,
channel_id=vlan_id
),
@@ -336,3 +341,15 @@
vlan_id = seq + 100
return gemport, vlan_id
+ def update_flows_bulk(self, device, flows, groups):
+ log.debug('bulk-flow-update', device_id=device.id,
+ flows=flows, groups=groups)
+
+ def update_flows_incrementally(self, device, flow_changes, group_changes):
+ raise NotImplementedError()
+
+ def send_proxied_message(self, proxy_address, msg):
+ raise NotImplementedError()
+
+ def receive_proxied_message(self, proxy_address, msg):
+ raise NotImplementedError()
diff --git a/voltha/adapters/simulated_onu/simulated_onu.py b/voltha/adapters/simulated_onu/simulated_onu.py
index ff8a01a..38cb2c5 100644
--- a/voltha/adapters/simulated_onu/simulated_onu.py
+++ b/voltha/adapters/simulated_onu/simulated_onu.py
@@ -45,6 +45,14 @@
name = 'simulated_onu'
+ supported_device_types = [
+ DeviceType(
+ id='simulated_onu',
+ adapter=name,
+ accepts_bulk_flow_update=True
+ )
+ ]
+
def __init__(self, adapter_agent, config):
self.adapter_agent = adapter_agent
self.config = config
@@ -67,9 +75,7 @@
return self.descriptor
def device_types(self):
- return DeviceTypes(items=[
- DeviceType(id='simulated_onu', adapter=self.name)
- ])
+ return DeviceTypes(items=self.supported_device_types)
def health(self):
return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
@@ -83,8 +89,8 @@
return device
def abandon_device(self, device):
- raise NotImplementedError(0
- )
+ raise NotImplementedError()
+
def deactivate_device(self, device):
raise NotImplementedError()
@@ -168,3 +174,16 @@
device = self.adapter_agent.get_device(device.id)
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
+
+ def update_flows_bulk(self, device, flows, groups):
+ log.debug('bulk-flow-update', device_id=device.id,
+ flows=flows, groups=groups)
+
+ def update_flows_incrementally(self, device, flow_changes, group_changes):
+ raise NotImplementedError()
+
+ def send_proxied_message(self, proxy_address, msg):
+ raise NotImplementedError()
+
+ def receive_proxied_message(self, proxy_address, msg):
+ raise NotImplementedError()
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 319f084..5659f85 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -127,6 +127,13 @@
def deactivate_device(self, device):
return self.adapter.deactivate_device(device)
+ def update_flows_bulk(self, device, flows, groups):
+ return self.adapter.update_flows_bulk(device, flows, groups)
+
+ def update_flows_incrementally(self, device, flow_changes, group_changes):
+ return self.update_flows_incrementally(
+ device, flow_changes, group_changes)
+
# ~~~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def get_device(self, device_id):
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index 150a9fc..17ec620 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -35,14 +35,31 @@
class DeviceAgent(object):
def __init__(self, core, initial_data):
+
self.core = core
self._tmp_initial_data = initial_data
+ self.last_data = None
+
self.proxy = core.get_proxy('/devices/{}'.format(initial_data.id))
+ self.flows_proxy = core.get_proxy(
+ '/devices/{}/flows'.format(initial_data.id))
+ self.groups_proxy = core.get_proxy(
+ '/devices/{}/flow_groups'.format(initial_data.id))
+
self.proxy.register_callback(
CallbackType.PRE_UPDATE, self._validate_update)
self.proxy.register_callback(
CallbackType.POST_UPDATE, self._process_update)
- self.last_data = None
+
+ self.flows_proxy.register_callback(
+ CallbackType.POST_UPDATE, self._flow_table_updated)
+ self.groups_proxy.register_callback(
+ CallbackType.POST_UPDATE, self._group_table_updated)
+
+ # to know device capabilities
+ self.device_type = core.get_proxy(
+ '/device_types/{}'.format(initial_data.type)).get()
+
self.adapter_agent = None
@inlineCallbacks
@@ -172,3 +189,49 @@
(AdminState.DISABLED, AdminState.ENABLED): _reenable_device
}
+
+ ## <======================= FLOW TABLE UPDATE HANDLING ====================
+
+ @inlineCallbacks
+ def _flow_table_updated(self, flows):
+ log.debug('flow-table-updated',
+ logical_device_id=self.last_data.id, flows=flows)
+
+ # if device accepts bulk flow update, lets just call that
+ if self.device_type.accepts_bulk_flow_update:
+ groups = self.groups_proxy.get('/') # gather flow groups
+ yield self.adapter_agent.update_flows_bulk(
+ device=self.last_data,
+ flows=flows,
+ groups=groups)
+ # TODO place to feed back completion
+
+ elif self.accepts_add_remove_flow_updates:
+ raise NotImplementedError()
+
+ else:
+ raise NotImplementedError()
+
+ ## <======================= GROUP TABLE UPDATE HANDLING ===================
+
+ @inlineCallbacks
+ def _group_table_updated(self, groups):
+ log.debug('group-table-updated',
+ logical_device_id=self.last_data.id,
+ flow_groups=groups)
+
+ # if device accepts bulk flow update, lets just call that
+ if self.device_type.accepts_bulk_flow_update:
+ flows = self.flows_proxy.get('/') # gather flows
+ yield self.adapter_agent.update_flows_bulk(
+ device=self.last_data,
+ flows=flows,
+ groups=groups)
+ # TODO place to feed back completion
+
+ elif self.accepts_add_remove_flow_updates:
+ raise NotImplementedError()
+
+ else:
+ raise NotImplementedError()
+
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index 0ada073..5b629ca 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -46,6 +46,7 @@
self.core = core
self.grpc_server = registry('grpc_server')
self.logical_device_id = logical_device.id
+
self.root_proxy = core.get_proxy('/')
self.flows_proxy = core.get_proxy(
'/logical_devices/{}/flows'.format(logical_device.id))
@@ -53,6 +54,7 @@
'/logical_devices/{}/flow_groups'.format(logical_device.id))
self.self_proxy = core.get_proxy(
'/logical_devices/{}'.format(logical_device.id))
+
self.flows_proxy.register_callback(
CallbackType.POST_UPDATE, self._flow_table_updated)
self.groups_proxy.register_callback(
@@ -69,6 +71,14 @@
def stop(self):
log.debug('stopping')
+ self.flows_proxy.unregister_callback(
+ CallbackType.POST_UPDATE, self._flow_table_updated)
+ self.groups_proxy.unregister_callback(
+ CallbackType.POST_UPDATE, self._group_table_updated)
+ self.self_proxy.unregister_callback(
+ CallbackType.POST_ADD, self._port_list_updated)
+ self.self_proxy.unregister_callback(
+ CallbackType.POST_REMOVE, self._port_list_updated)
log.info('stopped')
def announce_flows_deleted(self, flows):
diff --git a/voltha/protos/device.proto b/voltha/protos/device.proto
index 476cbc2..0a1e9b2 100644
--- a/voltha/protos/device.proto
+++ b/voltha/protos/device.proto
@@ -16,8 +16,10 @@
// Name of the adapter that handles device type
string adapter = 2;
- // TODO
- // ...
+ // Capabilitities
+
+ bool accepts_bulk_flow_update = 3;
+ bool accepts_add_remove_flow_updates = 4;
}
@@ -92,7 +94,7 @@
// Device contact on vlan (if 0, no vlan)
uint32 vlan = 12;
- message ProxyDevice {
+ message ProxyAddress {
string device_id = 1; // Which device to use as proxy to this device
uint32 channel_id = 2; // Sub-address within proxy device
};
@@ -108,7 +110,7 @@
// ("xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx")
string ipv6_address = 15;
- ProxyDevice proxy_device = 19;
+ ProxyAddress proxy_device = 19;
};
AdminState.AdminState admin_state = 16;