SEBA-450 Update Att-Workflow-Driver to use synchronizer library
Change-Id: I86740c694950e86f2d7afea6221d33e2aad8aeef
diff --git a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
index 67e25c4..8e591cb 100644
--- a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
+++ b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
@@ -15,8 +15,7 @@
-from synchronizers.new_base.modelaccessor import RCORDSubscriber, RCORDIpAddress, ONUDevice, model_accessor
-from synchronizers.new_base.policy import Policy
+from xossynchronizer.model_policies.policy import Policy
import os
import sys
@@ -57,7 +56,7 @@
si.save_changed_fields()
def process_onu_state(self, si):
- [valid, message] = AttHelpers.validate_onu(self.logger, si)
+ [valid, message] = AttHelpers.validate_onu(self.model_accessor, self.logger, si)
if si.onu_state == "AWAITING" or si.onu_state == "ENABLED":
si.status_message = message
if valid:
@@ -111,7 +110,7 @@
def update_onu(self, serial_number, admin_state):
- onu = [onu for onu in ONUDevice.objects.all() if onu.serial_number.lower() == serial_number.lower()][0]
+ onu = [onu for onu in self.model_accessor.ONUDevice.objects.all() if onu.serial_number.lower() == serial_number.lower()][0]
if onu.admin_state == admin_state:
self.logger.debug("MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" % (serial_number, admin_state))
else:
@@ -121,7 +120,7 @@
def get_subscriber(self, serial_number):
try:
- return [s for s in RCORDSubscriber.objects.all() if s.onu_device.lower() == serial_number.lower()][0]
+ return [s for s in self.model_accessor.RCORDSubscriber.objects.all() if s.onu_device.lower() == serial_number.lower()][0]
except IndexError:
# If the subscriber doesn't exist we don't do anything
self.logger.debug("MODEL_POLICY: subscriber does not exists for this SI, doing nothing", onu_device=serial_number)
@@ -131,7 +130,7 @@
# TODO check if the subscriber has an IP and update it,
# or create a new one
try:
- ip = RCORDIpAddress.objects.filter(
+ ip = self.model_accessor.RCORDIpAddress.objects.filter(
subscriber_id=subscriber.id,
ip=ip
)[0]
@@ -139,7 +138,7 @@
ip.save_changed_fields()
except IndexError:
self.logger.debug("MODEL_POLICY: Creating new RCORDIpAddress for subscriber", onu_device=subscriber.onu_device, subscriber_status=subscriber.status, ip=ip)
- ip = RCORDIpAddress(
+ ip = self.model_accessor.RCORDIpAddress(
subscriber_id=subscriber.id,
ip=ip,
description="DHCP Assigned IP Address"
@@ -148,7 +147,7 @@
def delete_subscriber_ip(self, subscriber, ip):
try:
- ip = RCORDIpAddress.objects.filter(
+ ip = self.model_accessor.RCORDIpAddress.objects.filter(
subscriber_id=subscriber.id,
ip=ip
)[0]
diff --git a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py
index 51f2ba4..f063cc4 100644
--- a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py
+++ b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py
@@ -14,8 +14,7 @@
# limitations under the License.
-from synchronizers.new_base.modelaccessor import AttWorkflowDriverServiceInstance, AttWorkflowDriverWhiteListEntry, model_accessor
-from synchronizers.new_base.policy import Policy
+from xossynchronizer.model_policies.policy import Policy
import os
import sys
@@ -31,7 +30,7 @@
self.handle_update(whitelist)
def validate_onu_state(self, si):
- [valid, message] = AttHelpers.validate_onu(self.logger, si)
+ [valid, message] = AttHelpers.validate_onu(self.model_accessor, self.logger, si)
si.status_message = message
if valid:
si.onu_state = "ENABLED"
@@ -46,7 +45,7 @@
def handle_update(self, whitelist):
self.logger.debug("MODEL_POLICY: handle_update for AttWorkflowDriverWhiteListEntry", whitelist=whitelist)
- sis = AttWorkflowDriverServiceInstance.objects.all()
+ sis = self.model_accessor.AttWorkflowDriverServiceInstance.objects.all()
for si in sis:
@@ -66,7 +65,7 @@
assert(whitelist.owner)
- sis = AttWorkflowDriverServiceInstance.objects.all()
+ sis = self.model_accessor.AttWorkflowDriverServiceInstance.objects.all()
sis = [si for si in sis if si.serial_number.lower() == whitelist.serial_number.lower()]
for si in sis:
diff --git a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
index 1abc7c8..19bcaba 100644
--- a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
+++ b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
@@ -20,39 +20,31 @@
import os, sys
test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-service_dir=os.path.join(test_path, "../../../..")
-xos_dir=os.path.join(test_path, "../../..")
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir=os.path.join(xos_dir, "../../xos_services")
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
class TestModelPolicyAttWorkflowDriverServiceInstance(unittest.TestCase):
def setUp(self):
self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
config = os.path.join(test_path, "../test_config.yaml")
from xosconfig import Config
Config.clear()
Config.init(config, 'synchronizer-config-schema.yaml')
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("att-workflow-driver", "att-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("../profiles/rcord", "rcord.xproto")])
- import synchronizers.new_base.modelaccessor
- from model_policy_att_workflow_driver_serviceinstance import AttWorkflowDriverServiceInstancePolicy, model_accessor
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from model_policy_att_workflow_driver_serviceinstance import AttWorkflowDriverServiceInstancePolicy, AttHelpers
+ self.AttHelpers = AttHelpers
from mock_modelaccessor import MockObjectList
@@ -65,7 +57,7 @@
model_accessor.reset_all_object_stores()
- self.policy = AttWorkflowDriverServiceInstancePolicy()
+ self.policy = AttWorkflowDriverServiceInstancePolicy(model_accessor=model_accessor)
self.si = AttWorkflowDriverServiceInstance()
self.si.owner = AttWorkflowDriverService()
self.si.serial_number = "BRCM1234"
@@ -92,8 +84,7 @@
def test_enable_onu(self):
- from helpers import AttHelpers
- with patch.object(AttHelpers, "validate_onu") as validate_onu, \
+ with patch.object(self.AttHelpers, "validate_onu") as validate_onu, \
patch.object(self.policy, "update_onu") as update_onu, \
patch.object(self.si, "save") as save_si:
validate_onu.return_value = [True, "valid onu"]
@@ -106,8 +97,7 @@
self.assertIn("valid onu", self.si.status_message)
def test_disable_onu(self):
- from helpers import AttHelpers
- with patch.object(AttHelpers, "validate_onu") as validate_onu, \
+ with patch.object(self.AttHelpers, "validate_onu") as validate_onu, \
patch.object(self.policy, "update_onu") as update_onu, \
patch.object(self.si, "save") as save_si:
validate_onu.return_value = [False, "invalid onu"]
diff --git a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_whitelistentry.py b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_whitelistentry.py
index c6e8818..4db0815 100644
--- a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_whitelistentry.py
+++ b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_whitelistentry.py
@@ -20,38 +20,29 @@
import os, sys
test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-service_dir=os.path.join(test_path, "../../../..")
-xos_dir=os.path.join(test_path, "../../..")
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir=os.path.join(xos_dir, "../../xos_services")
-
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
class TestModelPolicyAttWorkflowDriverWhiteListEntry(unittest.TestCase):
def setUp(self):
self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
config = os.path.join(test_path, "../test_config.yaml")
from xosconfig import Config
Config.clear()
Config.init(config, 'synchronizer-config-schema.yaml')
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("att-workflow-driver", "att-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("../profiles/rcord", "rcord.xproto")])
- import synchronizers.new_base.modelaccessor
- from model_policy_att_workflow_driver_whitelistentry import AttWorkflowDriverWhiteListEntryPolicy, model_accessor
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from model_policy_att_workflow_driver_whitelistentry import AttWorkflowDriverWhiteListEntryPolicy, AttHelpers
+ self.AttHelpers = AttHelpers
from mock_modelaccessor import MockObjectList
self.MockObjectList = MockObjectList
@@ -64,7 +55,7 @@
# tags. Ideally, this wouldn't happen, but it does. So make sure we reset the world.
model_accessor.reset_all_object_stores()
- self.policy = AttWorkflowDriverWhiteListEntryPolicy()
+ self.policy = AttWorkflowDriverWhiteListEntryPolicy(model_accessor=model_accessor)
self.service = AttWorkflowDriverService()
@@ -74,9 +65,8 @@
self.service = None
def test_enable_onu(self):
- from helpers import AttHelpers
si = AttWorkflowDriverServiceInstance(serial_number="BRCM333", owner_id=self.service.id, valid="invalid")
- with patch.object(AttHelpers, "validate_onu") as validate_onu, \
+ with patch.object(self.AttHelpers, "validate_onu") as validate_onu, \
patch.object(si, "save") as save_si:
validate_onu.return_value = [True, "valid onu"]
@@ -88,9 +78,8 @@
self.assertEqual("valid onu", si.status_message)
def test_disable_onu(self):
- from helpers import AttHelpers
si = AttWorkflowDriverServiceInstance(serial_number="BRCM333", owner_id=self.service.id, valid="invalid")
- with patch.object(AttHelpers, "validate_onu") as validate_onu, \
+ with patch.object(self.AttHelpers, "validate_onu") as validate_onu, \
patch.object(si, "save") as save_si:
validate_onu.return_value = [False, "invalid onu"]