[VOL-909] Add support to add and remove individual flows in voltha
core.

This update implements the logic in the voltha core to populate
only newly added flows or newly deleted flows to the adapters.  In
order for adapters to take advantage of that capability, the adapter
needs to set the accepts_add_remove_flow_updates flag to True when
registering their supported device types and also implements the
remove_from_flow_table() and add_to_flow_table() methods. Example
of these can be found in the ponsim adapter.  The above methods in
the ponsim adapters (one and olt) are not fully implemented as that
would require a change in the actual ponsim devices (a Jira will be
raised to follow-up this work).  Also, since these methods are not
implemented in the ponsim adapters then the accepts_add_remove_flow_updates
flag is set to False for now (the code was tested locally by setting that
flag to True and ensure the implemented logic works).

Additional work will eventually be required to optimized the flow decomposer
logic to decompose only newly added or deleted flows.  This work will
be performed as part of the voltha 2.0 effort.

 Please enter the commit message for your changes. Lines starting

Change-Id: I567771610f65aa0dde30eee9013c11218d1b9158
diff --git a/voltha/adapters/iadapter.py b/voltha/adapters/iadapter.py
index d8b6718..db08945 100644
--- a/voltha/adapters/iadapter.py
+++ b/voltha/adapters/iadapter.py
@@ -35,7 +35,10 @@
 
 @implementer(IAdapterInterface)
 class IAdapter(object):
-    def __init__(self, adapter_agent, config, device_handler_class, name, vendor, version, device_type, vendor_id):
+    def __init__(self, adapter_agent, config, device_handler_class, name,
+                 vendor, version, device_type, vendor_id,
+                 accepts_bulk_flow_update=True,
+                 accepts_add_remove_flow_updates=False):
         log.debug('Initializing adapter: {} {} {}'.format(vendor, name, version))
         self.adapter_agent = adapter_agent
         self.config = config
@@ -45,7 +48,8 @@
                 id=device_type,
                 vendor_id=vendor_id,
                 adapter=name,
-                accepts_bulk_flow_update=True
+                accepts_bulk_flow_update=accepts_bulk_flow_update,
+                accepts_add_remove_flow_updates=accepts_add_remove_flow_updates
             )
         ]
         self.descriptor = Adapter(
@@ -138,7 +142,20 @@
         return handler.update_flow_table(flows.items)
 
     def update_flows_incrementally(self, device, flow_changes, group_changes):
-        raise NotImplementedError()
+        log.info('incremental-flow-update', device_id=device.id,
+                 flows=flow_changes, groups=group_changes)
+        # For now, there is no support for group changes
+        assert len(group_changes.to_add.items) == 0
+        assert len(group_changes.to_remove.items) == 0
+
+        handler = self.devices_handlers[device.id]
+        # Remove flows
+        if len(flow_changes.to_remove.items) != 0:
+            handler.remove_from_flow_table(flow_changes.to_remove.items)
+
+        # Add flows
+        if len(flow_changes.to_add.items) != 0:
+            handler.add_to_flow_table(flow_changes.to_add.items)
 
     def update_pm_config(self, device, pm_config):
         log.info("adapter-update-pm-config", device=device,
@@ -245,15 +262,20 @@
 
 
 class OltAdapter(IAdapter):
-    def __init__(self, adapter_agent, config, device_handler_class, name, vendor, version, device_type):
-        super(OltAdapter, self).__init__(adapter_agent,
-                                         config,
-                                         device_handler_class,
-                                         name,
-                                         vendor,
-                                         version,
-                                         device_type,
-                                         None)
+    def __init__(self, adapter_agent, config, device_handler_class, name,
+                 vendor, version, device_type,
+                 accepts_bulk_flow_update=True,
+                 accepts_add_remove_flow_updates=False):
+        super(OltAdapter, self).__init__(adapter_agent=adapter_agent,
+                                         config=config,
+                                         device_handler_class=device_handler_class,
+                                         name=name,
+                                         vendor=vendor,
+                                         version=version,
+                                         device_type=device_type,
+                                         vendor_id=None,
+                                         accepts_bulk_flow_update=accepts_bulk_flow_update,
+                                         accepts_add_remove_flow_updates=accepts_add_remove_flow_updates)
         self.logical_device_id_to_root_device_id = dict()
 
     def reconcile_device(self, device):
@@ -297,15 +319,20 @@
 
 
 class OnuAdapter(IAdapter):
-    def __init__(self, adapter_agent, config, device_handler_class, name, vendor, version, device_type, vendor_id):
-        super(OnuAdapter, self).__init__(adapter_agent,
-                                         config,
-                                         device_handler_class,
-                                         name,
-                                         vendor,
-                                         version,
-                                         device_type,
-                                         vendor_id)
+    def __init__(self, adapter_agent, config, device_handler_class, name,
+                 vendor, version, device_type, vendor_id, accepts_bulk_flow_update=True,
+                 accepts_add_remove_flow_updates=False):
+        super(OnuAdapter, self).__init__(adapter_agent=adapter_agent,
+                                         config=config,
+                                         device_handler_class=device_handler_class,
+                                         name=name,
+                                         vendor=vendor,
+                                         version=version,
+                                         device_type=device_type,
+                                         vendor_id=vendor_id,
+                                         accepts_bulk_flow_update=accepts_bulk_flow_update,
+                                         accepts_add_remove_flow_updates=accepts_add_remove_flow_updates
+                                         )
 
     def reconcile_device(self, device):
         self.devices_handlers[device.id] = self.device_handler_class(self, device.id)
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index f8c6f28..af0acbf 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -223,11 +223,13 @@
 
     def update_flows_incrementally(device, flow_changes, group_changes):
         """
-        [This mode is not supported yet.]
+        Called after a flow table update, but only if the device supports
+        non-bulk mode, which is expressed by the 'accepts_add_remove_flow_updates'
+        capability attribute of the device type.
         :param device: A Voltha.Device object.
-        :param flow_changes:
-        :param group_changes:
-        :return:
+        :param flow_changes: An openflow_v13.FlowChanges object
+        :param group_changes: An openflow_v13.FlowGroupChanges object
+        :return: (Deferred or None)
         """
 
     def update_pm_config(device, pm_configs):
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index 1e0c464..dc6b297 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -51,7 +51,7 @@
 from voltha.registry import registry
 
 from voltha.protos.bbf_fiber_base_pb2 import \
-    ChannelgroupConfig, ChannelpartitionConfig, ChannelpairConfig,\
+    ChannelgroupConfig, ChannelpartitionConfig, ChannelpairConfig, \
     ChannelterminationConfig, OntaniConfig, VOntaniConfig, VEnetConfig
 from voltha.protos.bbf_fiber_traffic_descriptor_profile_body_pb2 import \
     TrafficDescriptorProfileData
@@ -161,6 +161,7 @@
                  device_id=self.device.id)
         self.lc.stop()
 
+
 class AdapterAlarms:
     def __init__(self, adapter, device):
         self.adapter = adapter
@@ -195,6 +196,7 @@
         except Exception as e:
             log.exception('failed-to-send-alarm', e=e)
 
+
 class PonSimOltAdapter(OltAdapter):
     def __init__(self, adapter_agent, config):
         super(PonSimOltAdapter, self).__init__(adapter_agent=adapter_agent,
@@ -203,7 +205,9 @@
                                                name='ponsim_olt',
                                                vendor='Voltha project',
                                                version='0.4',
-                                               device_type='ponsim_olt')
+                                               device_type='ponsim_olt',
+                                               accepts_bulk_flow_update=True,
+                                               accepts_add_remove_flow_updates=False)
 
     def update_pm_config(self, device, pm_config):
         log.info("adapter-update-pm-config", device=device,
@@ -292,6 +296,7 @@
             self.devices_handlers[device.id].remove_multicast_distribution_set(
                 data)
 
+
 class PonSimOltHandler(object):
     xpon_ponsim_olt_itfs = {
         'create_interface': {
@@ -339,7 +344,7 @@
         'remove_multicast_distribution_set': {
             'method_name': 'RemoveMulticastDistributionSet',
             'log': 'remove-multicast-distribution-set-data'},
-                            }
+    }
 
     def __init__(self, adapter, device_id):
         self.adapter = adapter
@@ -367,23 +372,28 @@
 
             # read in certificate
             try:
-               with open('/voltha/pki/voltha-CA.pem') as f:
-                  trusted_certs = f.read()
+                with open('/voltha/pki/voltha-CA.pem') as f:
+                    trusted_certs = f.read()
 
-               with open('/voltha/pki/voltha.crt') as f:
-                  client_cert = f.read()
+                with open('/voltha/pki/voltha.crt') as f:
+                    client_cert = f.read()
 
-               with open('/voltha/pki/voltha.key') as f:
-                  client_key = f.read()
+                with open('/voltha/pki/voltha.key') as f:
+                    client_key = f.read()
             except Exception as e:
-               log.error('failed-to-read-cert-keys', reason=e)
+                log.error('failed-to-read-cert-keys', reason=e)
 
             # create credentials
-            credentials = grpc.ssl_channel_credentials( root_certificates=trusted_certs, private_key=client_key, certificate_chain=client_cert)
+            credentials = grpc.ssl_channel_credentials(
+                root_certificates=trusted_certs, private_key=client_key,
+                certificate_chain=client_cert)
 
             # create channel using ssl credentials
-            my_server_host_override_string = "ABCD" # Server's CN Name, Ugly but no other Choice.
-            self.channel = grpc.secure_channel(device.host_and_port, credentials, options=(('grpc.ssl_target_name_override', my_server_host_override_string,),))
+            my_server_host_override_string = "ABCD"  # Server's CN Name, Ugly but no other Choice.
+            self.channel = grpc.secure_channel(device.host_and_port,
+                                               credentials, options=((
+                                                                         'grpc.ssl_target_name_override',
+                                                                         my_server_host_override_string,),))
 
         return self.channel
 
@@ -460,17 +470,18 @@
             desc=ofp_desc(
                 hw_desc='simualted pon',
                 sw_desc='simualted pon',
-                serial_num=uuid4().hex,
+                # serial_num=uuid4().hex,
+                serial_num="9b7cfd85441b407c87ee3261df7d4818",
                 dp_desc='n/a'
             ),
             switch_features=ofp_switch_features(
                 n_buffers=256,  # TODO fake for now
                 n_tables=2,  # TODO ditto
                 capabilities=(  # TODO and ditto
-                    OFPC_FLOW_STATS
-                    | OFPC_TABLE_STATS
-                    | OFPC_PORT_STATS
-                    | OFPC_GROUP_STATS
+                        OFPC_FLOW_STATS
+                        | OFPC_TABLE_STATS
+                        | OFPC_PORT_STATS
+                        | OFPC_GROUP_STATS
                 )
             ),
             root_device_id=device.id
@@ -607,8 +618,8 @@
                 cvid = inner_shim.vlan
                 logical_port = cvid
                 popped_frame = (
-                    Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
-                    inner_shim.payload
+                        Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
+                        inner_shim.payload
                 )
                 kw = dict(
                     logical_device_id=self.logical_device_id,
@@ -635,11 +646,12 @@
 
         try:
             for frame in self.frames:
-                self.log.info('received-grpc-frame', frame_len=len(frame.payload))
+                self.log.info('received-grpc-frame',
+                              frame_len=len(frame.payload))
                 self._rcv_frame(frame.payload)
 
         except _Rendezvous, e:
-            log.warn('grpc-connection-lost',message=e.message)
+            log.warn('grpc-connection-lost', message=e.message)
 
         self.log.info('stopped-receiving-grpc-frames')
 
@@ -657,6 +669,18 @@
         ))
         self.log.info('success')
 
+    def remove_from_flow_table(self, flows):
+        self.log.debug('remove-from-flow-table', flows=flows)
+        # TODO: Update PONSIM code to accept incremental flow changes
+        # Once completed, the accepts_add_remove_flow_updates for this
+        # device type can be set to True
+
+    def add_to_flow_table(self, flows):
+        self.log.debug('add-to-flow-table', flows=flows)
+        # TODO: Update PONSIM code to accept incremental flow changes
+        # Once completed, the accepts_add_remove_flow_updates for this
+        # device type can be set to True
+
     def update_pm_config(self, device, pm_config):
         log.info("handler-update-pm-config", device=device,
                  pm_config=pm_config)
@@ -675,10 +699,10 @@
                       msg=hexify(msg))
         pkt = Ether(msg)
         out_pkt = (
-            Ether(src=pkt.src, dst=pkt.dst) /
-            Dot1Q(vlan=4000) /
-            Dot1Q(vlan=egress_port, type=pkt.type) /
-            pkt.payload
+                Ether(src=pkt.src, dst=pkt.dst) /
+                Dot1Q(vlan=4000) /
+                Dot1Q(vlan=egress_port, type=pkt.type) /
+                pkt.payload
         )
 
         if self.ponsim_comm == 'grpc':
@@ -690,7 +714,6 @@
             # send over frameio
             self.io_port.send(str(out_pkt))
 
-
     @inlineCallbacks
     def reboot(self):
         self.log.info('rebooting', device_id=self.device_id)
@@ -818,10 +841,10 @@
                 n_buffers=256,  # TODO fake for now
                 n_tables=2,  # TODO ditto
                 capabilities=(  # TODO and ditto
-                    OFPC_FLOW_STATS
-                    | OFPC_TABLE_STATS
-                    | OFPC_PORT_STATS
-                    | OFPC_GROUP_STATS
+                        OFPC_FLOW_STATS
+                        | OFPC_TABLE_STATS
+                        | OFPC_PORT_STATS
+                        | OFPC_GROUP_STATS
                 )
             ),
             root_device_id=device.id
@@ -928,7 +951,6 @@
     def stop_kpi_collection(self):
         self.pm_metrics.stop_collector()
 
-
     def get_interface_config(self, data):
         interfaceConfig = InterfaceConfig()
         if isinstance(data, ChannelgroupConfig):
@@ -965,7 +987,7 @@
         if interfaceConfig is not None:
             self.log.info(
                 'forwarding-{}-request-to-olt-for-interface-type'
-                .format(self.xpon_ponsim_olt_itfs[method_name]['log']),
+                    .format(self.xpon_ponsim_olt_itfs[method_name]['log']),
                 interface_type=type(data))
             stub = ponsim_pb2.XPonSimStub(self.get_channel())
             _method = getattr(
@@ -981,64 +1003,64 @@
             self.log.info('success')
 
     def create_interface(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
 
     def update_interface(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
 
     def remove_interface(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
 
     def create_tcont(self, tcont_data, traffic_descriptor_data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, tcont_data,
-                                      traffic_descriptor_data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, tcont_data,
+                                       traffic_descriptor_data);
 
     def update_tcont(self, tcont_data, traffic_descriptor_data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, tcont_data,
-                                      traffic_descriptor_data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, tcont_data,
+                                       traffic_descriptor_data);
 
     def remove_tcont(self, tcont_data, traffic_descriptor_data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, tcont_data,
-                                      traffic_descriptor_data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, tcont_data,
+                                       traffic_descriptor_data);
 
     def create_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
 
     def update_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
 
     def remove_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
 
     def create_multicast_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
 
     def update_multicast_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
 
     def remove_multicast_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
 
     def create_multicast_distribution_set(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
 
     def update_multicast_distribution_set(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
 
     def remove_multicast_distribution_set(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_olt_interface(_method_name, data);
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_olt_interface(_method_name, data);
diff --git a/voltha/adapters/ponsim_onu/ponsim_onu.py b/voltha/adapters/ponsim_onu/ponsim_onu.py
index 5467aad..7f770e1 100644
--- a/voltha/adapters/ponsim_onu/ponsim_onu.py
+++ b/voltha/adapters/ponsim_onu/ponsim_onu.py
@@ -79,20 +79,23 @@
         'log': 'update-multicast-distribution-set-data'},
     'remove_multicast_distribution_set': {
         'log': 'remove-multicast-distribution-set-data'},
-                        }
+}
+
 
 class PonSimOnuAdapter(OnuAdapter):
     def __init__(self, adapter_agent, config):
-        #DeviceType of ONU should be same as VENDOR ID of ONU Serial Number as specified by standard
-        #requires for identifying correct adapter or ranged ONU
+        # DeviceType of ONU should be same as VENDOR ID of ONU Serial Number as specified by standard
+        # requires for identifying correct adapter or ranged ONU
         super(PonSimOnuAdapter, self).__init__(adapter_agent=adapter_agent,
                                                config=config,
-                                               device_handler_class = PonSimOnuHandler,
+                                               device_handler_class=PonSimOnuHandler,
                                                name='ponsim_onu',
                                                vendor='Voltha project',
                                                version='0.4',
                                                device_type='ponsim_onu',
-                                               vendor_id='PSMO')
+                                               vendor_id='PSMO',
+                                               accepts_bulk_flow_update=True,
+                                               accepts_add_remove_flow_updates=False)
 
     def xpon_ponsim_onu_adapter_interface(self, method_name, device, data,
                                           data2=None):
@@ -108,67 +111,71 @@
                     _method(data)
 
     def create_interface(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
 
     def update_interface(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
 
     def remove_interface(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
 
     def create_tcont(self, device, tcont_data, traffic_descriptor_data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, tcont_data,
-                                              traffic_descriptor_data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device,
+                                               tcont_data,
+                                               traffic_descriptor_data)
 
     def update_tcont(self, device, tcont_data, traffic_descriptor_data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, tcont_data,
-                                              traffic_descriptor_data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device,
+                                               tcont_data,
+                                               traffic_descriptor_data)
 
     def remove_tcont(self, device, tcont_data, traffic_descriptor_data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, tcont_data,
-                                              traffic_descriptor_data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device,
+                                               tcont_data,
+                                               traffic_descriptor_data)
 
     def create_gemport(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
 
     def update_gemport(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
 
     def remove_gemport(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
 
     def create_multicast_gemport(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
 
     def update_multicast_gemport(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
 
     def remove_multicast_gemport(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
 
     def create_multicast_distribution_set(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
 
     def update_multicast_distribution_set(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
 
     def remove_multicast_distribution_set(self, device, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
+
 
 class PonSimOnuHandler(object):
     def __init__(self, adapter, device_id):
@@ -312,6 +319,18 @@
 
         yield self.incoming_messages.get()
 
+    def remove_from_flow_table(self, flows):
+        self.log.debug('remove-from-flow-table', flows=flows)
+        # TODO: Update PONSIM code to accept incremental flow changes.
+        # Once completed, the accepts_add_remove_flow_updates for this
+        # device type can be set to True
+
+    def add_to_flow_table(self, flows):
+        self.log.debug('add-to-flow-table', flows=flows)
+        # TODO: Update PONSIM code to accept incremental flow changes
+        # Once completed, the accepts_add_remove_flow_updates for this
+        # device type can be set to True
+
     @inlineCallbacks
     def reboot(self):
         self.log.info('rebooting', device_id=self.device_id)
@@ -493,7 +512,7 @@
             return None
         return interfaceConfig
 
-    def xpon_ponsim_onu_interface (self, method_name, data, data2=None):
+    def xpon_ponsim_onu_interface(self, method_name, data, data2=None):
         interfaceConfig = self.get_interface_config(data)
         if interfaceConfig is not None:
             self.log.info('forwarding-{}-request-to-onu-for-interface-type'
@@ -503,64 +522,64 @@
                 self.log.info(interface_type=type(data2))
 
     def create_interface(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
 
     def update_interface(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
 
     def remove_interface(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
 
     def create_tcont(self, tcont_data, traffic_descriptor_data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, tcont_data,
-                                      traffic_descriptor_data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, tcont_data,
+                                       traffic_descriptor_data)
 
     def update_tcont(self, tcont_data, traffic_descriptor_data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, tcont_data,
-                                      traffic_descriptor_data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, tcont_data,
+                                       traffic_descriptor_data)
 
     def remove_tcont(self, tcont_data, traffic_descriptor_data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, tcont_data,
-                                      traffic_descriptor_data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, tcont_data,
+                                       traffic_descriptor_data)
 
     def create_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
 
     def update_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
 
     def remove_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
 
     def create_multicast_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
 
     def update_multicast_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
 
     def remove_multicast_gemport(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
 
     def create_multicast_distribution_set(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
 
     def update_multicast_distribution_set(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
 
     def remove_multicast_distribution_set(self, data):
-       _method_name = sys._getframe().f_code.co_name
-       self.xpon_ponsim_onu_interface(_method_name, data)
+        _method_name = sys._getframe().f_code.co_name
+        self.xpon_ponsim_onu_interface(_method_name, data)
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index cde651a..951cc67 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -212,8 +212,7 @@
         return self.adapter.update_flows_bulk(device, flows, groups)
 
     def update_flows_incrementally(self, device, flow_changes, group_changes):
-        return self.update_flows_incrementally(
-            device, flow_changes, group_changes)
+        return self.adapter.update_flows_incrementally(device, flow_changes, group_changes)
 
     def suppress_alarm(self, filter):
         return self.adapter.suppress_alarm(filter)
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index a5f11bb..5ed9115 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -24,10 +24,12 @@
 
 from voltha.core.config.config_proxy import CallbackType
 from voltha.protos.common_pb2 import AdminState, OperStatus, ConnectStatus, \
-                                     OperationResp
+    OperationResp
 from voltha.protos.device_pb2 import ImageDownload
 from voltha.registry import registry
-from voltha.protos.openflow_13_pb2 import Flows, FlowGroups
+from voltha.protos.openflow_13_pb2 import Flows, FlowGroups, FlowChanges, \
+    FlowGroupChanges
+
 
 class InvalidStateTransition(Exception): pass
 
@@ -58,6 +60,9 @@
             CallbackType.POST_UPDATE, self._process_update)
 
         self.flows_proxy.register_callback(
+            CallbackType.PRE_UPDATE, self._pre_process_flows)
+
+        self.flows_proxy.register_callback(
             CallbackType.POST_UPDATE, self._flow_table_updated)
         self.groups_proxy.register_callback(
             CallbackType.POST_UPDATE, self._group_table_updated)
@@ -70,6 +75,7 @@
             '/device_types/{}'.format(initial_data.type)).get()
 
         self.adapter_agent = None
+        self.flow_changes = None
         self.log = structlog.get_logger(device_id=initial_data.id)
 
     @inlineCallbacks
@@ -111,7 +117,8 @@
     def register_image_download(self, request):
         try:
             self.log.debug('register-image-download', request=request)
-            path = '/devices/{}/image_downloads/{}'.format(request.id, request.name)
+            path = '/devices/{}/image_downloads/{}'.format(request.id,
+                                                           request.name)
             self.img_dnld_proxies[request.name] = self.core.get_proxy(path)
             self.img_dnld_proxies[request.name].register_callback(
                 CallbackType.POST_UPDATE, self._update_image)
@@ -119,7 +126,7 @@
             request.state = ImageDownload.DOWNLOAD_REQUESTED
             self.img_dnld_proxies[request.name].update('/', request)
         except Exception as e:
-                self.log.exception(e.message)
+            self.log.exception(e.message)
 
     def activate_image_update(self, request):
         try:
@@ -127,7 +134,7 @@
             request.image_state = ImageDownload.IMAGE_ACTIVATE
             self.img_dnld_proxies[request.name].update('/', request)
         except Exception as e:
-                self.log.exception(e.message)
+            self.log.exception(e.message)
 
     def revert_image_update(self, request):
         try:
@@ -135,7 +142,7 @@
             request.image_state = ImageDownload.IMAGE_REVERT
             self.img_dnld_proxies[request.name].update('/', request)
         except Exception as e:
-                self.log.exception(e.message)
+            self.log.exception(e.message)
 
     @inlineCallbacks
     def _download_image(self, device, img_dnld):
@@ -148,7 +155,7 @@
     def get_image_download_status(self, request):
         try:
             self.log.debug('get-image-download-status',
-                    request=request)
+                           request=request)
             device = self.proxy.get('/')
             self.adapter_agent.get_image_download_status(device, request)
         except Exception as e:
@@ -157,7 +164,7 @@
     def cancel_image_download(self, img_dnld):
         try:
             self.log.debug('cancel-image-download',
-                    img_dnld=img_dnld)
+                           img_dnld=img_dnld)
             device = self.proxy.get('/')
             self.adapter_agent.cancel_image_download(device, img_dnld)
         except Exception as e:
@@ -166,22 +173,22 @@
     def update_device_image_download(self, img_dnld):
         try:
             self.log.debug('update-device-image-download',
-                    img_dnld=img_dnld)
-            self.proxy.update('/image_downloads/{}'\
-                    .format(img_dnld.name), img_dnld)
+                           img_dnld=img_dnld)
+            self.proxy.update('/image_downloads/{}' \
+                              .format(img_dnld.name), img_dnld)
         except Exception as e:
             self.log.exception(e.message)
 
     def unregister_device_image_download(self, name):
         try:
             self.log.debug('unregister-device-image-download',
-                            name=name)
+                           name=name)
             self.self_proxies[name].unregister_callback(
                 CallbackType.POST_ADD, self._download_image)
             self.self_proxies[name].unregister_callback(
                 CallbackType.POST_UPDATE, self._process_image)
         except Exception as e:
-                self.log.exception(e.message)
+            self.log.exception(e.message)
 
     @inlineCallbacks
     def _update_image(self, img_dnld):
@@ -193,12 +200,13 @@
                 yield self._download_image(device, img_dnld)
             if img_dnld.image_state == ImageDownload.IMAGE_ACTIVATE:
                 device = self.proxy.get('/')
-                yield self.adapter_agent.activate_image_update(device, img_dnld)
+                yield self.adapter_agent.activate_image_update(device,
+                                                               img_dnld)
             elif img_dnld.image_state == ImageDownload.IMAGE_REVERT:
                 device = self.proxy.get('/')
                 yield self.adapter_agent.revert_image_update(device, img_dnld)
         except Exception as e:
-                self.log.exception(e.message)
+            self.log.exception(e.message)
 
     @inlineCallbacks
     def self_test(self, device, dry_run=False):
@@ -232,8 +240,8 @@
     @inlineCallbacks
     def reconcile_existing_device(self, device, dry_run=False):
         self.log.debug('reconcile-existing-device',
-                      device=device,
-                      dry_run=False)
+                       device=device,
+                       dry_run=False)
         if not dry_run:
             yield self.adapter_agent.reconcile_device(device)
 
@@ -278,7 +286,7 @@
     def _process_state_transitions(self, device, dry_run=False):
 
         old_admin_state = getattr(self.last_data, 'admin_state',
-                                   AdminState.UNKNOWN)
+                                  AdminState.UNKNOWN)
         new_admin_state = device.admin_state
         self.log.debug('device-admin-states', old_state=old_admin_state,
                        new_state=new_admin_state, dry_run=dry_run)
@@ -309,14 +317,14 @@
         self.last_data = device  # so that we don't propagate back
         self.proxy.update('/', device)
         if device.admin_state == AdminState.ENABLED and \
-           device.oper_status == OperStatus.ACTIVE and \
-           device.connect_status == ConnectStatus.REACHABLE:
+                device.oper_status == OperStatus.ACTIVE and \
+                device.connect_status == ConnectStatus.REACHABLE:
             self.log.info('replay-create-interfaces ', device=device.id)
             self.core.xpon_agent.replay_interface(device.id)
             # if device accepts bulk flow update, lets just call that
             if self.device_type.accepts_bulk_flow_update:
                 flows = self.flows_proxy.get('/')  # gather flows
-                groups = self.groups_proxy.get('/') # gather flow groups
+                groups = self.groups_proxy.get('/')  # gather flow groups
                 self.log.info('replay-flows ', device=device.id)
                 yield self.adapter_agent.update_flows_bulk(
                     device=device,
@@ -324,14 +332,14 @@
                     groups=groups)
 
     def update_device_pm_config(self, device_pm_config, init=False):
-        self.callback_data = init# so that we don't push init data
+        self.callback_data = init  # so that we don't push init data
         self.pm_config_proxy.update('/', device_pm_config)
 
     def _propagate_change(self, device, dry_run=False):
         self.log.debug('propagate-change', device=device, dry_run=dry_run)
         if device != self.last_data:
             self.log.warn('Not-implemented-default-to-noop')
-            #raise NotImplementedError()
+            # raise NotImplementedError()
         else:
             self.log.debug('no-op')
 
@@ -397,7 +405,7 @@
 
     ## <======================= PM CONFIG UPDATE HANDLING ====================
 
-    #@inlineCallbacks
+    # @inlineCallbacks
     def _pm_config_updated(self, pm_configs):
         self.log.debug('pm-config-updated', pm_configs=pm_configs,
                        callback_data=self.callback_data)
@@ -408,24 +416,82 @@
 
     ## <======================= FLOW TABLE UPDATE HANDLING ====================
 
+    def _pre_process_flows(self, flows):
+        """
+        This method is invoked before a device flow table data model is
+        updated. If the device supports accepts_add_remove_flow_updates then it
+        pre-processes the desired flows against what currently
+        exist on the device to figure out which flows to delete and which
+        ones to add. The resulting data is stored locally and the flow table is
+        updated during the post-processing phase, i.e. via the POST_UPDATE
+        callback
+        :param flows: Desired flows
+        :return: None
+        """
+        if self.device_type.accepts_add_remove_flow_updates:
+            self.current_flows = self.flows_proxy.get('/')
+            self.log.debug('pre-processing-flows',
+                           logical_device_id=self.last_data.id,
+                           desired_flows=flows,
+                           existing_flows=self.current_flows)
+
+            if self.flow_changes is None:
+                self.flow_changes = FlowChanges()
+            else:
+                del self.flow_changes.to_add.items[:]
+                del self.flow_changes.to_remove.items[:]
+
+            current_flow_ids = set(f.id for f in self.current_flows.items)
+            desired_flow_ids = set(f.id for f in flows.items)
+
+            ids_to_add = desired_flow_ids.difference(current_flow_ids)
+            ids_to_del = current_flow_ids.difference(desired_flow_ids)
+
+            for f in flows.items:
+                if f.id in ids_to_add:
+                    self.flow_changes.to_add.items.extend([f])
+
+            for f in self.current_flows.items:
+                if f.id in ids_to_del:
+                    self.flow_changes.to_remove.items.extend([f])
+
+            self.log.debug('pre-processed-flows',
+                           logical_device_id=self.last_data.id,
+                           flow_changes=self.flow_changes)
+        else:
+            self.log.debug('no-pre-processing-required')
+
     @inlineCallbacks
     def _flow_table_updated(self, flows):
         self.log.debug('flow-table-updated',
-                  logical_device_id=self.last_data.id, flows=flows)
+                       logical_device_id=self.last_data.id, flows=flows)
+
+        # if device accepts non-bulk flow update, lets just call that first
+        if self.device_type.accepts_add_remove_flow_updates:
+            if (len(self.flow_changes.to_remove.items) == 0) and (len(
+                    self.flow_changes.to_add.items) == 0):
+                self.log.debug('no-flow-update-required',
+                               logical_device_id=self.last_data.id)
+            else:
+                try:
+                    yield self.adapter_agent.update_flows_incrementally(
+                        device=self.last_data,
+                        flow_changes=self.flow_changes,
+                        group_changes=FlowGroupChanges()
+                    )
+                except Exception as e:
+                    self.log.exception("Failure-updating-flows", e=e)
 
         # if device accepts bulk flow update, lets just call that
-        if self.device_type.accepts_bulk_flow_update:
-            groups = self.groups_proxy.get('/') # gather flow groups
+        elif self.device_type.accepts_bulk_flow_update:
+            self.log.debug('invoking bulk')
+            groups = self.groups_proxy.get('/')  # gather flow groups
             yield self.adapter_agent.update_flows_bulk(
                 device=self.last_data,
                 flows=flows,
                 groups=groups)
             # add ability to notify called when an flow update completes
             # see https://jira.opencord.org/browse/CORD-839
-
-        elif self.device_type.accepts_add_remove_flow_updates:
-            raise NotImplementedError()
-
         else:
             raise NotImplementedError()
 
@@ -434,8 +500,8 @@
     @inlineCallbacks
     def _group_table_updated(self, groups):
         self.log.debug('group-table-updated',
-                  logical_device_id=self.last_data.id,
-                  flow_groups=groups)
+                       logical_device_id=self.last_data.id,
+                       flow_groups=groups)
 
         # if device accepts bulk flow update, lets just call that
         if self.device_type.accepts_bulk_flow_update:
diff --git a/voltha/protos/openflow_13.proto b/voltha/protos/openflow_13.proto
index ea8235a..48b7944 100644
--- a/voltha/protos/openflow_13.proto
+++ b/voltha/protos/openflow_13.proto
@@ -2270,6 +2270,16 @@
     repeated ofp_group_entry items = 1;
 }
 
+message FlowChanges {
+    Flows to_add = 1;
+    Flows to_remove = 2;
+}
+
+message FlowGroupChanges {
+    FlowGroups to_add = 1;
+    FlowGroups to_remove = 2;
+}
+
 message PacketIn {
     string id = 1;  // LogicalDevice.id
     ofp_packet_in packet_in = 2;