Initial DT workflow

Sample PPPoE-based workflow.
Track the subscriber PPPoE protocols state, authentication and IP address assignement are done via PPPoE protocols (PAP, CHAP, IPCP...).
Work with BNG app running on ONOS that generates events in the bng.pppoe Kafka topic.

Change-Id: Iae57395dcc90d027932c790c1c36d7b3e3f3e19b
diff --git a/xos/synchronizer/model_policies/__init__.py b/xos/synchronizer/model_policies/__init__.py
new file mode 100755
index 0000000..8612cfd
--- /dev/null
+++ b/xos/synchronizer/model_policies/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/xos/synchronizer/model_policies/model_policy_dt_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/model_policy_dt_workflow_driver_serviceinstance.py
new file mode 100755
index 0000000..f885459
--- /dev/null
+++ b/xos/synchronizer/model_policies/model_policy_dt_workflow_driver_serviceinstance.py
@@ -0,0 +1,208 @@
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from helpers import DtHelpers
+from xossynchronizer.model_policies.policy import Policy
+
+import os
+import sys
+
+sync_path = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
+sys.path.append(sync_path)
+
+
+class DeferredException(Exception):
+    pass
+
+
+class DtWorkflowDriverServiceInstancePolicy(Policy):
+    model_name = "DtWorkflowDriverServiceInstance"
+
+    def handle_create(self, si):
+        self.logger.debug("MODEL_POLICY: handle_create for DtWorkflowDriverServiceInstance %s " % si.id)
+        self.handle_update(si)
+
+    def handle_update(self, si):
+        self.logger.debug("MODEL_POLICY: handle_update for DtWorkflowDriverServiceInstance %s " %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+
+        # Changing ONU state can change auth state
+        # Changing auth state can change IPCP state
+        # So need to process in this order
+        self.process_onu_state(si)
+
+        self.validate_states(si)
+
+        self.process_pppoe_state(si)
+        self.process_ipcp_state(si)
+
+        # handling the subscriber status
+        # It's a combination of all the other states
+        subscriber = self.get_subscriber(si.serial_number)
+        if subscriber:
+            self.update_subscriber(subscriber, si)
+
+        if si.pppoe_state != "CONNECTED" or si.oper_onu_status != "ENABLED" or si.admin_onu_state != "ENABLED":
+            # Clean-up of SI
+            si.pppoe_session_id = ""
+            si.ip_address = ""
+            si.mac_address = ""
+
+        si.save_changed_fields()
+
+    # Check the whitelist to see if the ONU is valid.  If it is, make sure that it's enabled.
+    def process_onu_state(self, si):
+        [valid, message] = DtHelpers.validate_onu(self.model_accessor, self.logger, si)
+        si.status_message = message
+        if valid:
+            si.admin_onu_state = "ENABLED"
+            self.update_onu(si.serial_number, "ENABLED")
+        else:
+            si.admin_onu_state = "DISABLED"
+            self.update_onu(si.serial_number, "DISABLED")
+
+    def process_pppoe_state(self, si):
+        pppoe_msgs = {
+            "AWAITING": " - Awaiting PPPoE connection",
+            "INITIATED": "",
+            "CONNECTED": "",
+            "DISCONNECTED": " - PPPoE session terminated",
+        }
+        si.status_message += pppoe_msgs[si.pppoe_state]
+
+    def process_ipcp_state(self, si):
+        ipcp_msgs = {
+            "AWAITING": "",
+            "CONF_ACK": " - IP address assigned",
+            "CONF_REQUEST": ""
+        }
+        si.status_message += ipcp_msgs[si.ipcp_state]
+
+    def validate_states(self, si):
+        if si.pppoe_state != "CONNECTED" or si.oper_onu_status != "ENABLED" or si.admin_onu_state != "ENABLED":
+            # Clean-up of SI
+            si.ipcp_state = "AWAITING"
+            si.authentication_state = "AWAITING"
+
+    def update_onu(self, serial_number, admin_state):
+        onu = [onu for onu in self.model_accessor.ONUDevice.objects.all() if onu.serial_number.lower()
+               == serial_number.lower()][0]
+        if onu.admin_state == "ADMIN_DISABLED":
+            self.logger.debug(
+                "MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
+                (serial_number, admin_state))
+            return
+        if onu.admin_state == admin_state:
+            self.logger.debug(
+                "MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
+                (serial_number, admin_state))
+        else:
+            self.logger.debug("MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" % (serial_number, admin_state))
+            onu.admin_state = admin_state
+            onu.save_changed_fields(always_update_timestamp=True)
+
+    def get_subscriber(self, serial_number):
+        try:
+            return [s for s in self.model_accessor.RCORDSubscriber.objects.all() if s.onu_device.lower()
+                    == serial_number.lower()][0]
+        except IndexError:
+            # If the subscriber doesn't exist we don't do anything
+            self.logger.debug(
+                "MODEL_POLICY: subscriber does not exists for this SI, doing nothing",
+                onu_device=serial_number)
+            return None
+
+    def update_subscriber_ip(self, subscriber, ip):
+        # TODO check if the subscriber has an IP and update it,
+        #  or create a new one
+        try:
+            ip = self.model_accessor.RCORDIpAddress.objects.filter(
+                subscriber_id=subscriber.id,
+                ip=ip
+            )[0]
+            self.logger.debug("MODEL_POLICY: found existing RCORDIpAddress for subscriber",
+                              onu_device=subscriber.onu_device, subscriber_status=subscriber.status, ip=ip)
+            ip.save_changed_fields()
+        except IndexError:
+            self.logger.debug(
+                "MODEL_POLICY: Creating new RCORDIpAddress for subscriber",
+                onu_device=subscriber.onu_device,
+                subscriber_status=subscriber.status,
+                ip=ip)
+            ip = self.model_accessor.RCORDIpAddress(
+                subscriber_id=subscriber.id,
+                ip=ip,
+                description="IPCP Assigned IP Address"
+            )
+            ip.save()
+
+    def delete_subscriber_ip(self, subscriber, ip):
+        try:
+            ip = self.model_accessor.RCORDIpAddress.objects.filter(
+                subscriber_id=subscriber.id,
+                ip=ip
+            )[0]
+            self.logger.debug(
+                "MODEL_POLICY: delete RCORDIpAddress for subscriber",
+                onu_device=subscriber.onu_device,
+                subscriber_status=subscriber.status,
+                ip=ip)
+            ip.delete()
+        except BaseException:
+            self.logger.warning("MODEL_POLICY: no RCORDIpAddress object found, cannot delete", ip=ip)
+
+    def update_subscriber(self, subscriber, si):
+        cur_status = subscriber.status
+        if si.oper_onu_status == "ENABLED" and si.admin_onu_state == "ENABLED":
+            subscriber.status = "enabled"
+        # FIXME: SEBA-670
+        # elif si.admin_onu_state == "DISABLED":
+        #     subscriber.status = "disabled"
+
+        # FIXME: we need subscriber to be always in enabled state to be able
+        #  to handle pppoe authentication via thew asg/pppoe relay, otherwise
+        #  packets will be dropped at the OLT.
+        #  We should either create an intermediate state to allow passing
+        #  traffic from the OLT to the ASG, or we should relay PPPPoE
+        #  control packets from the OLT.
+
+        if si.ipcp_state == "CONF_ACK" and si.ip_address:
+            self.update_subscriber_ip(subscriber, si.ip_address)
+        else:
+            self.delete_subscriber_ip(subscriber, si.ip_address)
+
+        if si.ipcp_state == "CONF_ACK" and si.mac_address:
+            subscriber.mac_address = si.mac_address
+        else:
+            subscriber.mac_address = ""
+
+        important_changes = cur_status != subscriber.status
+        if important_changes or si.pppoe_state == "DISCONNECTED" or si.ipcp_state == "CONF_ACK":
+            # NOTE SEBA-744
+            # Trigger sync_step only when the subscriber state change
+            self.logger.debug(
+                "MODEL_POLICY: updating subscriber",
+                onu_device=subscriber.onu_device,
+                authentication_state=si.authentication_state,
+                subscriber_status=subscriber.status,
+                always_update_timestamp=important_changes)
+            subscriber.save_changed_fields(always_update_timestamp=important_changes)
+        else:
+            self.logger.debug("MODEL_POLICY: subscriber status has not changed", onu_device=subscriber.onu_device,
+                              authentication_state=si.authentication_state, subscriber_status=subscriber.status)
+
+    def handle_delete(self, si):
+        pass
diff --git a/xos/synchronizer/model_policies/model_policy_dt_workflow_driver_whitelistentry.py b/xos/synchronizer/model_policies/model_policy_dt_workflow_driver_whitelistentry.py
new file mode 100755
index 0000000..9bd456c
--- /dev/null
+++ b/xos/synchronizer/model_policies/model_policy_dt_workflow_driver_whitelistentry.py
@@ -0,0 +1,82 @@
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from helpers import DtHelpers
+from xossynchronizer.model_policies.policy import Policy
+import os
+import sys
+
+sync_path = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
+sys.path.append(sync_path)
+
+
+class DtWorkflowDriverWhiteListEntryPolicy(Policy):
+    model_name = "DtWorkflowDriverWhiteListEntry"
+
+    def handle_create(self, whitelist):
+        self.handle_update(whitelist)
+
+    # Update the SI if the onu_state has changed.
+    # The SI model policy will take care of updating other state.
+    def validate_onu_state(self, si):
+        [valid, message] = DtHelpers.validate_onu(self.model_accessor, self.logger, si)
+        if valid:
+            si.admin_onu_state = "ENABLED"
+        else:
+            si.admin_onu_state = "DISABLED"
+
+        self.logger.debug(
+            "MODEL_POLICY: activating DtWorkflowDriverServiceInstance because of change in the whitelist",
+            si=si,
+            onu_state=si.admin_onu_state,
+            authentication_state=si.authentication_state)
+        si.save_changed_fields(always_update_timestamp=True)
+
+    def handle_update(self, whitelist):
+        self.logger.debug("MODEL_POLICY: handle_update for DtWorkflowDriverWhiteListEntry", whitelist=whitelist)
+
+        sis = self.model_accessor.DtWorkflowDriverServiceInstance.objects.all()
+
+        for si in sis:
+
+            if si.serial_number.lower() != whitelist.serial_number.lower():
+                # NOTE we don't care about this SI as it has a different serial number
+                continue
+
+            self.validate_onu_state(si)
+
+        whitelist.backend_need_delete_policy = True
+        whitelist.save_changed_fields()
+
+    def handle_delete(self, whitelist):
+        self.logger.debug(
+            "MODEL_POLICY: handle_delete for DtWorkflowDriverWhiteListEntry",
+            serial_number=whitelist.serial_number,
+            pon_port=whitelist.pon_port_id,
+            device=whitelist.device_id)
+
+        # BUG: Sometimes the delete policy is not called, because the reaper deletes
+
+        assert(whitelist.owner)
+
+        sis = self.model_accessor.DtWorkflowDriverServiceInstance.objects.all()
+        sis = [si for si in sis if si.serial_number.lower() == whitelist.serial_number.lower()]
+
+        for si in sis:
+            self.validate_onu_state(si)
+
+        whitelist.backend_need_reap = True
+        whitelist.save_changed_fields()
diff --git a/xos/synchronizer/model_policies/test_model_policy_dt_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/test_model_policy_dt_workflow_driver_serviceinstance.py
new file mode 100755
index 0000000..e520036
--- /dev/null
+++ b/xos/synchronizer/model_policies/test_model_policy_dt_workflow_driver_serviceinstance.py
@@ -0,0 +1,357 @@
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+class TestModelPolicyDtWorkflowDriverServiceInstance(unittest.TestCase):
+    def setUp(self):
+
+        self.sys_path_save = sys.path
+
+        config = os.path.join(test_path, "../test_config.yaml")
+        from xosconfig import Config
+        Config.clear()
+        Config.init(config, 'synchronizer-config-schema.yaml')
+
+        from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+        mock_modelaccessor_config(test_path, [("dt-workflow-driver", "dt-workflow-driver.xproto"),
+                                              ("olt-service", "volt.xproto"),
+                                              ("rcord", "rcord.xproto")])
+
+        import xossynchronizer.modelaccessor
+        import mock_modelaccessor
+        reload(mock_modelaccessor)  # in case nose2 loaded it in a previous test
+        reload(xossynchronizer.modelaccessor)      # in case nose2 loaded it in a previous test
+
+        from xossynchronizer.modelaccessor import model_accessor
+        from model_policy_dt_workflow_driver_serviceinstance import DtWorkflowDriverServiceInstancePolicy, DtHelpers
+        self.DtHelpers = DtHelpers
+
+        # import all class names to globals
+        for (k, v) in model_accessor.all_model_classes.items():
+            globals()[k] = v
+
+        # Some of the functions we call have side-effects. For example, creating a VSGServiceInstance may lead to
+        # creation of tags. Ideally, this wouldn't happen, but it does. So make sure we reset the world.
+        model_accessor.reset_all_object_stores()
+
+        self.policy = DtWorkflowDriverServiceInstancePolicy(model_accessor=model_accessor)
+        self.si = DtWorkflowDriverServiceInstance()
+        self.si.owner = DtWorkflowDriverService()
+        self.si.serial_number = "BRCM1234"
+
+    def tearDown(self):
+        sys.path = self.sys_path_save
+
+    def test_update_onu(self):
+
+        onu = ONUDevice(
+            serial_number="BRCM1234",
+            admin_state="ENABLED"
+        )
+        with patch.object(ONUDevice.objects, "get_items") as get_onu, \
+                patch.object(onu, "save") as onu_save:
+            get_onu.return_value = [onu]
+
+            self.policy.update_onu("brcm1234", "ENABLED")
+            onu_save.assert_not_called()
+
+            self.policy.update_onu("brcm1234", "DISABLED")
+            self.assertEqual(onu.admin_state, "DISABLED")
+            onu_save.assert_called_with(
+                always_update_timestamp=True, update_fields=[
+                    'admin_state', 'serial_number', 'updated'])
+
+    def test_enable_onu(self):
+        with patch.object(self.DtHelpers, "validate_onu") as validate_onu, \
+                patch.object(self.policy, "update_onu") as update_onu:
+            validate_onu.return_value = [True, "valid onu"]
+
+            self.policy.process_onu_state(self.si)
+
+            update_onu.assert_called_once()
+            update_onu.assert_called_with("BRCM1234", "ENABLED")
+
+            self.assertIn("valid onu", self.si.status_message)
+
+    def test_disable_onu(self):
+        with patch.object(self.DtHelpers, "validate_onu") as validate_onu, \
+                patch.object(self.policy, "update_onu") as update_onu:
+            validate_onu.return_value = [False, "invalid onu"]
+
+            self.policy.process_onu_state(self.si)
+
+            update_onu.assert_called_once()
+            update_onu.assert_called_with("BRCM1234", "DISABLED")
+
+            self.assertIn("invalid onu", self.si.status_message)
+
+    def test_handle_update_validate_onu(self):
+        """
+        Testing that handle_update calls validate_onu with the correct parameters
+        when necessary
+        """
+        with patch.object(self.policy, "process_onu_state") as process_onu_state, \
+                patch.object(self.policy, "update_onu") as update_onu, \
+                patch.object(self.policy, "get_subscriber") as get_subscriber:
+            update_onu.return_value = None
+            get_subscriber.return_value = None
+
+            self.si.admin_onu_state = "AWAITING"
+            self.si.oper_onu_status = "AWAITING"
+            self.policy.handle_update(self.si)
+            process_onu_state.assert_called_with(self.si)
+
+            self.si.admin_onu_state = "ENABLED"
+            self.si.oper_onu_status = "ENABLED"
+            self.policy.handle_update(self.si)
+            process_onu_state.assert_called_with(self.si)
+
+            self.si.admin_onu_state = "DISABLED"
+            self.si.oper_onu_status = "DISABLED"
+            self.policy.handle_update(self.si)
+            process_onu_state.assert_called_with(self.si)
+
+    def test_get_subscriber(self):
+
+        sub = RCORDSubscriber(
+            onu_device="BRCM1234"
+        )
+
+        with patch.object(RCORDSubscriber.objects, "get_items") as get_subscribers:
+            get_subscribers.return_value = [sub]
+
+            res = self.policy.get_subscriber("BRCM1234")
+            self.assertEqual(res, sub)
+
+            res = self.policy.get_subscriber("brcm1234")
+            self.assertEqual(res, sub)
+
+            res = self.policy.get_subscriber("foo")
+            self.assertEqual(res, None)
+
+    def test_update_subscriber_with_onu_event(self):
+
+        sub = RCORDSubscriber(
+            onu_device="BRCM1234",
+            status="pre-provisioned"
+        )
+
+        self.assertEqual(sub.status, "pre-provisioned")
+
+        with patch.object(sub, "save") as sub_save:
+            self.si.oper_onu_status = "AWAITING"
+            self.policy.update_subscriber(sub, self.si)
+            self.assertEqual(sub.status, "pre-provisioned")
+            sub_save.assert_not_called()
+            sub_save.reset_mock()
+
+            self.si.oper_onu_status = "ENABLED"
+            self.si.admin_onu_state = "ENABLED"
+            self.policy.update_subscriber(sub, self.si)
+            self.assertEqual(sub.status, "enabled")
+            sub_save.assert_called()
+            sub_save.reset_mock()
+
+            self.si.oper_onu_status = "DISABLED"
+            self.policy.update_subscriber(sub, self.si)
+            # FIXME: the result should be disabled, but for now we force it to remain enabled
+            self.assertEqual(sub.status, "enabled")
+            # sub_save.assert_called()
+            sub_save.reset_mock()
+
+    def test_not_update_subscriber_with_auth_events(self):
+
+        sub = RCORDSubscriber(
+            onu_device="BRCM1234"
+        )
+
+        self.si.status_message = "some content"
+        self.si.oper_onu_status = "ENABLED"
+
+        with patch.object(sub, "save") as sub_save:
+            self.si.authentication_state = "AWAITING"
+            self.policy.update_subscriber(sub, self.si)
+            self.assertEqual(sub.status, "enabled")
+            sub_save.assert_not_called()
+            sub_save.reset_mock()
+            sub.status = "enabled"
+
+            self.si.authentication_state = "REQUESTED"
+            self.policy.update_subscriber(sub, self.si)
+            self.assertEqual(sub.status, "enabled")
+            sub_save.assert_not_called()
+            sub_save.reset_mock()
+            sub.status = "enabled"
+
+            self.si.authentication_state = "STARTED"
+            self.policy.update_subscriber(sub, self.si)
+            self.assertEqual(sub.status, "enabled")
+            sub_save.assert_not_called()
+            sub_save.reset_mock()
+            sub.status = "enabled"
+
+            self.si.authentication_state = "APPROVED"
+            self.policy.update_subscriber(sub, self.si)
+            self.assertEqual(sub.status, "enabled")
+            sub_save.assert_not_called()
+            sub_save.reset_mock()
+            sub.status = "enabled"
+
+            self.si.authentication_state = "DENIED"
+            self.policy.update_subscriber(sub, self.si)
+            self.assertEqual(sub.status, "enabled")
+            sub_save.assert_not_called()
+            sub_save.reset_mock()
+            sub.status = "enabled"
+
+    def test_update_subscriber_not(self):
+        sub = RCORDSubscriber(
+            onu_device="BRCM1234"
+        )
+
+        self.si.oper_onu_status = "ENABLED"
+        sub.status = "enabled"
+
+        with patch.object(sub, "save") as sub_save:
+            self.si.authentication_state = "AWAITING"
+            self.policy.update_subscriber(sub, self.si)
+            sub_save.assert_not_called()
+
+            self.si.authentication_state = "REQUESTED"
+            self.policy.update_subscriber(sub, self.si)
+            sub_save.assert_not_called()
+
+            self.si.authentication_state = "STARTED"
+            self.policy.update_subscriber(sub, self.si)
+            sub_save.assert_not_called()
+
+            self.si.authentication_state = "APPROVED"
+            self.policy.update_subscriber(sub, self.si)
+            sub_save.assert_not_called()
+
+            self.si.authentication_state = "DENIED"
+            self.policy.update_subscriber(sub, self.si)
+            sub_save.assert_not_called()
+
+    def test_update_subscriber_ipcp_with_exiting_ip(self):
+        sub = RCORDSubscriber(
+            id=10,
+            onu_device="BRCM1234"
+        )
+
+        ip = RCORDIpAddress(
+            subscriber_id=sub.id,
+            ip='10.11.2.23'
+        )
+
+        self.si.pppoe_state = "CONNECTED"
+        self.si.authentication_state = "APPROVED"
+        self.si.oper_onu_status = "ENABLED"
+        self.si.ipcp_state = "CONF_ACK"
+        self.si.ip_address = "10.11.2.23"
+        self.si.mac_address = "4321"
+
+        with patch.object(sub, "save") as sub_save, \
+                patch.object(RCORDIpAddress.objects, "get_items") as get_ips, \
+                patch.object(ip, "save_changed_fields") as ip_mock:
+
+            get_ips.return_value = [ip]
+            ip_mock.return_value = []
+
+            self.policy.update_subscriber(sub, self.si)
+            sub_save.assert_called_with(always_update_timestamp=False,
+                update_fields=['id', 'mac_address', 'onu_device'])
+            self.assertEqual(sub.mac_address, self.si.mac_address)
+
+            ip_mock.assert_called_with()
+
+    def test_update_subscriber_ipcp_with_new_ip(self):
+        sub = RCORDSubscriber(
+            id=10,
+            onu_device="BRCM1234"
+        )
+
+        self.si.pppoe_state = "CONNECTED"
+        self.si.oper_onu_status = "ENABLED"
+        self.si.authentication_state = "APPROVED"
+        self.si.ipcp_state = "CONF_ACK"
+        self.si.ip_address = "10.11.2.23"
+        self.si.mac_address = "4321"
+
+        with patch.object(sub, "save") as sub_save, \
+                patch.object(RCORDIpAddress, "save", autospec=True) as ip_mock:
+
+            ip_mock.return_value = []
+
+            self.policy.update_subscriber(sub, self.si)
+            sub_save.assert_called_with(always_update_timestamp=False,
+                                        update_fields=['id', 'mac_address', 'onu_device'])
+            self.assertEqual(sub.mac_address, self.si.mac_address)
+
+            saved_ip = ip_mock.call_args[0][0]
+            self.assertEqual(saved_ip.ip, self.si.ip_address)
+            self.assertEqual(saved_ip.subscriber_id, sub.id)
+            self.assertEqual(saved_ip.description, "IPCP Assigned IP Address")
+
+    def test_handle_update_subscriber(self):
+        self.si.admin_onu_state = "DISABLED"
+
+        sub = RCORDSubscriber(
+            onu_device="BRCM1234"
+        )
+
+        with patch.object(self.policy, "get_subscriber") as get_subscriber, \
+                patch.object(self.policy, "update_onu") as update_onu, \
+                patch.object(self.policy, "update_subscriber") as update_subscriber:
+
+            get_subscriber.return_value = None
+            self.policy.handle_update(self.si)
+            update_onu.assert_called_with(sub.onu_device, "DISABLED")
+            self.assertEqual(update_subscriber.call_count, 0)
+
+            get_subscriber.return_value = sub
+            self.policy.handle_update(self.si)
+            update_subscriber.assert_called_with(sub, self.si)
+
+    def test_process_auth_state(self):
+        # testing change in admin_onu_state
+        self.si.admin_onu_state = "DISABLED"
+        self.si.oper_onu_status = "ENABLED"
+        self.si.authentication_state, "APPROVED"
+
+        self.policy.validate_states(self.si)
+        self.assertEqual(self.si.authentication_state, "AWAITING")
+
+        # testing change in oper_onu_status
+        self.si.admin_onu_state = "ENABLED"
+        self.si.oper_onu_status = "DISABLED"
+        self.si.authentication_state, "APPROVED"
+
+        self.policy.validate_states(self.si)
+        self.assertEqual(self.si.authentication_state, "AWAITING")
+
+
+if __name__ == '__main__':
+    sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
+    unittest.main()
diff --git a/xos/synchronizer/model_policies/test_model_policy_dt_workflow_driver_whitelistentry.py b/xos/synchronizer/model_policies/test_model_policy_dt_workflow_driver_whitelistentry.py
new file mode 100755
index 0000000..193845f
--- /dev/null
+++ b/xos/synchronizer/model_policies/test_model_policy_dt_workflow_driver_whitelistentry.py
@@ -0,0 +1,129 @@
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+class TestModelPolicyDtWorkflowDriverWhiteListEntry(unittest.TestCase):
+    def setUp(self):
+        self.sys_path_save = sys.path
+
+        config = os.path.join(test_path, "../test_config.yaml")
+        from xosconfig import Config
+        Config.clear()
+        Config.init(config, 'synchronizer-config-schema.yaml')
+
+        from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+        mock_modelaccessor_config(test_path, [("dt-workflow-driver", "dt-workflow-driver.xproto"),
+                                              ("olt-service", "volt.xproto"),
+                                              ("rcord", "rcord.xproto")])
+
+        import xossynchronizer.modelaccessor
+        import mock_modelaccessor
+        reload(mock_modelaccessor)  # in case nose2 loaded it in a previous test
+        reload(xossynchronizer.modelaccessor)      # in case nose2 loaded it in a previous test
+
+        from xossynchronizer.modelaccessor import model_accessor
+        from model_policy_dt_workflow_driver_whitelistentry import DtWorkflowDriverWhiteListEntryPolicy, DtHelpers
+        self.DtHelpers = DtHelpers
+
+        from mock_modelaccessor import MockObjectList
+        self.MockObjectList = MockObjectList
+
+        # import all class names to globals
+        for (k, v) in model_accessor.all_model_classes.items():
+            globals()[k] = v
+
+        # Some of the functions we call have side-effects. For example, creating a VSGServiceInstance may lead to
+        # creation of tags. Ideally, this wouldn't happen, but it does. So make sure we reset the world.
+        model_accessor.reset_all_object_stores()
+
+        self.policy = DtWorkflowDriverWhiteListEntryPolicy(model_accessor=model_accessor)
+
+        self.service = DtWorkflowDriverService()
+
+    def tearDown(self):
+        sys.path = self.sys_path_save
+        self.service = None
+
+    def test_enable_onu(self):
+        si = DtWorkflowDriverServiceInstance(serial_number="BRCM333", owner_id=self.service.id, valid="invalid")
+        with patch.object(self.DtHelpers, "validate_onu") as validate_onu, \
+                patch.object(si, "save") as save_si:
+            validate_onu.return_value = [True, "valid onu"]
+
+            self.policy.validate_onu_state(si)
+
+            save_si.assert_called_once()
+            save_si.assert_called_with(
+                always_update_timestamp=True, update_fields=[
+                    'admin_onu_state', 'serial_number', 'updated'])
+
+    def test_disable_onu(self):
+        si = DtWorkflowDriverServiceInstance(serial_number="BRCM333", owner_id=self.service.id, valid="invalid")
+        with patch.object(self.DtHelpers, "validate_onu") as validate_onu, \
+                patch.object(si, "save") as save_si:
+            validate_onu.return_value = [False, "invalid onu"]
+
+            self.policy.validate_onu_state(si)
+
+            save_si.assert_called_once()
+            save_si.assert_called_with(
+                always_update_timestamp=True, update_fields=[
+                    'admin_onu_state', 'serial_number', 'updated'])
+
+    def test_whitelist_update(self):
+        si = DtWorkflowDriverServiceInstance(serial_number="BRCM333", owner_id=self.service.id)
+        wle = DtWorkflowDriverWhiteListEntry(serial_number="brcm333", owner_id=self.service.id, owner=self.service)
+        with patch.object(DtWorkflowDriverServiceInstance.objects, "get_items") as oss_si_items, \
+                patch.object(self.policy, "validate_onu_state") as validate_onu_state, \
+                patch.object(wle, "save") as wle_save:
+            oss_si_items.return_value = [si]
+
+            self.policy.handle_update(wle)
+
+            validate_onu_state.assert_called_with(si)
+            self.assertTrue(wle.backend_need_delete_policy)
+            wle_save.assert_called_with(
+                always_update_timestamp=False, update_fields=[
+                    'backend_need_delete_policy', 'owner', 'serial_number'])
+
+    def test_whitelist_delete(self):
+        si = DtWorkflowDriverServiceInstance(serial_number="BRCM333", owner_id=self.service.id)
+        wle = DtWorkflowDriverWhiteListEntry(serial_number="brcm333", owner_id=self.service.id, owner=self.service)
+        with patch.object(DtWorkflowDriverServiceInstance.objects, "get_items") as oss_si_items, \
+                patch.object(self.policy, "validate_onu_state") as validate_onu_state, \
+                patch.object(wle, "save") as wle_save:
+            oss_si_items.return_value = [si]
+
+            self.policy.handle_delete(wle)
+
+            validate_onu_state.assert_called_with(si)
+            self.assertTrue(wle.backend_need_reap)
+            wle_save.assert_called_with(
+                always_update_timestamp=False, update_fields=[
+                    'backend_need_reap', 'owner', 'serial_number'])
+
+
+if __name__ == '__main__':
+    sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
+    unittest.main()