Define and wire in bulk flow update adapter API

This adds support for defining bulk or incremental
flow update capability on a per device type basis
and implements the callout mechanism.

It also defines 5 new API entries for proxying
messages from a child-device adapter (e.g. ONU adapter)
to the actual device as well as the APIs for registering
and receiving async messages from a device via a
parent device acting as a proxy. The implementation
of this is left for the next PR.

Change-Id: Ic48a458c170083842b6bc674d675b5b60c0827f6
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index feec225..f9426c6 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -92,9 +92,55 @@
     def deactivate_device(device):
         """
         Called if the device is to be deactivate based on a NBI call.
+        :param device: A Voltha.Device object.
         :return: (Deferred) Shall be fired to acknowledge deactivation.
         """
 
+    def update_flows_bulk(device, flows, groups):
+        """
+        Called after any flow table change, but only if the device supports
+        bulk mode, which is expressed by the 'accepts_bulk_flow_update'
+        capability attribute of the device type.
+        :param device: A Voltha.Device object.
+        :param flows: An openflow_v13.Flows object
+        :param groups: An  openflow_v13.Flows object
+        :return: (Deferred or None)
+        """
+
+    def update_flows_incrementally(device, flow_changes, group_changes):
+        """
+        [This mode is not supported yet.]
+        :param device: A Voltha.Device object.
+        :param flow_changes:
+        :param group_changes:
+        :return:
+        """
+
+    def send_proxied_message(proxy_address, msg):
+        """
+        Forward a msg to a child device of device, addressed by the given
+        proxy_address=Device.ProxyAddress().
+        :param proxy_address: Address info for the parent device
+         to route the message to the child device. This was given to the
+         child device by the parent device at the creation of the child
+         device.
+        :param msg: (str) The actual message to send.
+        :return: (Deferred(None) or None) The return of this method should
+         indicate that the message was successfully *sent*.
+        """
+
+    def receive_proxied_message(proxy_address, msg):
+        """
+        Pass an async message (arrived via a proxy) to this device.
+        :param proxy_address: Address info for the parent device
+         to route the message to the child device. This was given to the
+         child device by the parent device at the creation of the child
+         device. Note this is the proxy_address with which the adapter
+         had to register prior to receiving proxied messages.
+        :param msg: (str) The actual message received.
+        :return: None
+        """
+
     # TODO work in progress
     # ...
 
@@ -106,7 +152,7 @@
     toward Voltha's CORE via the APIs defined here.
     """
 
-    def get_device(selfdevice_id):
+    def get_device(device_id):
         # TODO add doc
         """"""
 
@@ -131,12 +177,54 @@
         """"""
 
     def child_device_detected(parent_device_id,
+                              parent_port_no,
                               child_device_type,
                               child_device_address_kw):
         # TODO add doc
         """"""
 
-    # TODO work in progress
-    pass
+    def send_proxied_message(proxy_address, msg):
+        """
+        Forward a msg to a child device of device, addressed by the given
+        proxy_address=Device.ProxyAddress().
+        :param proxy_address: Address info for the parent device
+         to route the message to the child device. This was given to the
+         child device by the parent device at the creation of the child
+         device.
+        :param msg: (str) The actual message to send.
+        :return: (Deferred(None) or None) The return of this method should
+         indicate that the message was successfully *sent*.
+        """
+
+    def receive_proxied_message(proxy_address, msg):
+        """
+        Pass an async message (arrived via a proxy) to this device.
+        :param proxy_address: Address info for the parent device
+         to route the message to the child device. This was given to the
+         child device by the parent device at the creation of the child
+         device. Note this is the proxy_address with which the adapter
+         had to register prior to receiving proxied messages.
+        :param msg: (str) The actual message received.
+        :return: None
+        """
+
+    def register_for_proxied_messages(proxy_address):
+        """
+        A child device adapter can use this to indicate its intent to
+        receive async messages sent via a parent device. Example: an
+        ONU adapter can use this to register for OMCI messages received
+        via the OLT and the OLT adapter.
+        :param child_device_address: Address info that was given to the
+         child device by the parent device at the creation of the child
+         device. Its uniqueness acts as a router information for the
+         registration.
+        :return: None
+        """
+
+    def unregister_for_proxied_messages(proxy_address):
+        """
+        Cancel a previous registration
+        :return:
+        """
 
 
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index e358b26..c6a6e11 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -45,6 +45,14 @@
 
     name = 'simulated_olt'
 
+    supported_device_types = [
+        DeviceType(
+            id='simulated_olt',
+            adapter=name,
+            accepts_bulk_flow_update=True
+        )
+    ]
+
     def __init__(self, adapter_agent, config):
         self.adapter_agent = adapter_agent
         self.config = config
@@ -69,10 +77,7 @@
         return self.descriptor
 
     def device_types(self):
-        return DeviceTypes(items=[
-            DeviceType(id='simulated_olt', adapter=self.name),
-            # DeviceType(id='simulated_onu', adapter=self.name)
-        ])
+        return DeviceTypes(items=self.supported_device_types)
 
     def health(self):
         return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
@@ -317,7 +322,7 @@
                 parent_port_no=1,
                 child_device_type='simulated_onu',
                 child_device_address_kw=dict(
-                    proxy_device=Device.ProxyDevice(
+                    proxy_device=Device.ProxyAddress(
                         device_id=device.id,
                         channel_id=vlan_id
                     ),
@@ -336,3 +341,15 @@
         vlan_id = seq + 100
         return gemport, vlan_id
 
+    def update_flows_bulk(self, device, flows, groups):
+        log.debug('bulk-flow-update', device_id=device.id,
+                  flows=flows, groups=groups)
+
+    def update_flows_incrementally(self, device, flow_changes, group_changes):
+        raise NotImplementedError()
+
+    def send_proxied_message(self, proxy_address, msg):
+        raise NotImplementedError()
+
+    def receive_proxied_message(self, proxy_address, msg):
+        raise NotImplementedError()
diff --git a/voltha/adapters/simulated_onu/simulated_onu.py b/voltha/adapters/simulated_onu/simulated_onu.py
index ff8a01a..38cb2c5 100644
--- a/voltha/adapters/simulated_onu/simulated_onu.py
+++ b/voltha/adapters/simulated_onu/simulated_onu.py
@@ -45,6 +45,14 @@
 
     name = 'simulated_onu'
 
+    supported_device_types = [
+        DeviceType(
+            id='simulated_onu',
+            adapter=name,
+            accepts_bulk_flow_update=True
+        )
+    ]
+
     def __init__(self, adapter_agent, config):
         self.adapter_agent = adapter_agent
         self.config = config
@@ -67,9 +75,7 @@
         return self.descriptor
 
     def device_types(self):
-        return DeviceTypes(items=[
-            DeviceType(id='simulated_onu', adapter=self.name)
-        ])
+        return DeviceTypes(items=self.supported_device_types)
 
     def health(self):
         return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
@@ -83,8 +89,8 @@
         return device
 
     def abandon_device(self, device):
-        raise NotImplementedError(0
-                                  )
+        raise NotImplementedError()
+
     def deactivate_device(self, device):
         raise NotImplementedError()
 
@@ -168,3 +174,16 @@
         device = self.adapter_agent.get_device(device.id)
         device.oper_status = OperStatus.ACTIVE
         self.adapter_agent.update_device(device)
+
+    def update_flows_bulk(self, device, flows, groups):
+        log.debug('bulk-flow-update', device_id=device.id,
+                  flows=flows, groups=groups)
+
+    def update_flows_incrementally(self, device, flow_changes, group_changes):
+        raise NotImplementedError()
+
+    def send_proxied_message(self, proxy_address, msg):
+        raise NotImplementedError()
+
+    def receive_proxied_message(self, proxy_address, msg):
+        raise NotImplementedError()
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 319f084..5659f85 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -127,6 +127,13 @@
     def deactivate_device(self, device):
         return self.adapter.deactivate_device(device)
 
+    def update_flows_bulk(self, device, flows, groups):
+        return self.adapter.update_flows_bulk(device, flows, groups)
+
+    def update_flows_incrementally(self, device, flow_changes, group_changes):
+        return self.update_flows_incrementally(
+            device, flow_changes, group_changes)
+
     # ~~~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
     def get_device(self, device_id):
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index 150a9fc..17ec620 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -35,14 +35,31 @@
 class DeviceAgent(object):
 
     def __init__(self, core, initial_data):
+
         self.core = core
         self._tmp_initial_data = initial_data
+        self.last_data = None
+
         self.proxy = core.get_proxy('/devices/{}'.format(initial_data.id))
+        self.flows_proxy = core.get_proxy(
+            '/devices/{}/flows'.format(initial_data.id))
+        self.groups_proxy = core.get_proxy(
+            '/devices/{}/flow_groups'.format(initial_data.id))
+
         self.proxy.register_callback(
             CallbackType.PRE_UPDATE, self._validate_update)
         self.proxy.register_callback(
             CallbackType.POST_UPDATE, self._process_update)
-        self.last_data = None
+
+        self.flows_proxy.register_callback(
+            CallbackType.POST_UPDATE, self._flow_table_updated)
+        self.groups_proxy.register_callback(
+            CallbackType.POST_UPDATE, self._group_table_updated)
+
+        # to know device capabilities
+        self.device_type = core.get_proxy(
+            '/device_types/{}'.format(initial_data.type)).get()
+
         self.adapter_agent = None
 
     @inlineCallbacks
@@ -172,3 +189,49 @@
         (AdminState.DISABLED, AdminState.ENABLED): _reenable_device
 
     }
+
+    ## <======================= FLOW TABLE UPDATE HANDLING ====================
+
+    @inlineCallbacks
+    def _flow_table_updated(self, flows):
+        log.debug('flow-table-updated',
+                  logical_device_id=self.last_data.id, flows=flows)
+
+        # if device accepts bulk flow update, lets just call that
+        if self.device_type.accepts_bulk_flow_update:
+            groups = self.groups_proxy.get('/') # gather flow groups
+            yield self.adapter_agent.update_flows_bulk(
+                device=self.last_data,
+                flows=flows,
+                groups=groups)
+            # TODO place to feed back completion
+
+        elif self.accepts_add_remove_flow_updates:
+            raise NotImplementedError()
+
+        else:
+            raise NotImplementedError()
+
+    ## <======================= GROUP TABLE UPDATE HANDLING ===================
+
+    @inlineCallbacks
+    def _group_table_updated(self, groups):
+        log.debug('group-table-updated',
+                  logical_device_id=self.last_data.id,
+                  flow_groups=groups)
+
+        # if device accepts bulk flow update, lets just call that
+        if self.device_type.accepts_bulk_flow_update:
+            flows = self.flows_proxy.get('/')  # gather flows
+            yield self.adapter_agent.update_flows_bulk(
+                device=self.last_data,
+                flows=flows,
+                groups=groups)
+            # TODO place to feed back completion
+
+        elif self.accepts_add_remove_flow_updates:
+            raise NotImplementedError()
+
+        else:
+            raise NotImplementedError()
+
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index 0ada073..5b629ca 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -46,6 +46,7 @@
         self.core = core
         self.grpc_server = registry('grpc_server')
         self.logical_device_id = logical_device.id
+
         self.root_proxy = core.get_proxy('/')
         self.flows_proxy = core.get_proxy(
             '/logical_devices/{}/flows'.format(logical_device.id))
@@ -53,6 +54,7 @@
             '/logical_devices/{}/flow_groups'.format(logical_device.id))
         self.self_proxy = core.get_proxy(
             '/logical_devices/{}'.format(logical_device.id))
+
         self.flows_proxy.register_callback(
             CallbackType.POST_UPDATE, self._flow_table_updated)
         self.groups_proxy.register_callback(
@@ -69,6 +71,14 @@
 
     def stop(self):
         log.debug('stopping')
+        self.flows_proxy.unregister_callback(
+            CallbackType.POST_UPDATE, self._flow_table_updated)
+        self.groups_proxy.unregister_callback(
+            CallbackType.POST_UPDATE, self._group_table_updated)
+        self.self_proxy.unregister_callback(
+            CallbackType.POST_ADD, self._port_list_updated)
+        self.self_proxy.unregister_callback(
+            CallbackType.POST_REMOVE, self._port_list_updated)
         log.info('stopped')
 
     def announce_flows_deleted(self, flows):
diff --git a/voltha/protos/device.proto b/voltha/protos/device.proto
index 476cbc2..0a1e9b2 100644
--- a/voltha/protos/device.proto
+++ b/voltha/protos/device.proto
@@ -16,8 +16,10 @@
     // Name of the adapter that handles device type
     string adapter = 2;
 
-    // TODO
-    // ...
+    // Capabilitities
+
+    bool accepts_bulk_flow_update = 3;
+    bool accepts_add_remove_flow_updates = 4;
 
 }
 
@@ -92,7 +94,7 @@
     // Device contact on vlan (if 0, no vlan)
     uint32 vlan = 12;
 
-    message ProxyDevice {
+    message ProxyAddress {
         string device_id = 1;  // Which device to use as proxy to this device
         uint32 channel_id = 2;  // Sub-address within proxy device
     };
@@ -108,7 +110,7 @@
         // ("xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx")
         string ipv6_address = 15;
 
-        ProxyDevice proxy_device = 19;
+        ProxyAddress proxy_device = 19;
     };
 
     AdminState.AdminState admin_state = 16;