SEBA-450 Update Att-Workflow-Driver to use the xossynchronizer library
Change-Id: I86740c694950e86f2d7afea6221d33e2aad8aeef
diff --git a/xos/synchronizer/att-workflow-driver-synchronizer.py b/xos/synchronizer/att-workflow-driver-synchronizer.py
old mode 100755
new mode 100644
index f9ea235..9b420e7
--- a/xos/synchronizer/att-workflow-driver-synchronizer.py
+++ b/xos/synchronizer/att-workflow-driver-synchronizer.py
@@ -18,11 +18,11 @@
# This imports and runs ../../xos-observer.py
-import importlib
import os
-import sys
+from xossynchronizer import Synchronizer
from xosconfig import Config
+
base_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/config.yaml')
mounted_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/mounted_config.yaml')
@@ -31,7 +31,4 @@
else:
Config.init(base_config_file, 'synchronizer-config-schema.yaml')
-observer_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),"../../synchronizers/new_base")
-sys.path.append(observer_path)
-mod = importlib.import_module("xos-synchronizer")
-mod.main()
+Synchronizer().run()
\ No newline at end of file
diff --git a/xos/synchronizer/event_steps/auth_event.py b/xos/synchronizer/event_steps/auth_event.py
index b181121..5f4c480 100644
--- a/xos/synchronizer/event_steps/auth_event.py
+++ b/xos/synchronizer/event_steps/auth_event.py
@@ -17,8 +17,7 @@
import time
import os
import sys
-from synchronizers.new_base.eventstep import EventStep
-from synchronizers.new_base.modelaccessor import model_accessor
+from xossynchronizer.event_steps.eventstep import EventStep
from helpers import AttHelpers
class SubscriberAuthEventStep(EventStep):
@@ -31,8 +30,8 @@
def process_event(self, event):
value = json.loads(event.value)
- onu_sn = AttHelpers.get_onu_sn(self.log, value)
- si = AttHelpers.get_si_by_sn(self.log, onu_sn)
+ onu_sn = AttHelpers.get_onu_sn(self.model_accessor, self.log, value)
+ si = AttHelpers.get_si_by_sn(self.model_accessor, self.log, onu_sn)
if not si:
self.log.exception("authentication.events: Cannot find att-workflow-driver service instance for this event", kafka_event=value)
raise Exception("authentication.events: Cannot find att-workflow-driver service instance for this event")
diff --git a/xos/synchronizer/event_steps/dhcp_event.py b/xos/synchronizer/event_steps/dhcp_event.py
index 1b3033a..ab411f7 100644
--- a/xos/synchronizer/event_steps/dhcp_event.py
+++ b/xos/synchronizer/event_steps/dhcp_event.py
@@ -17,8 +17,7 @@
import time
import os
import sys
-from synchronizers.new_base.eventstep import EventStep
-from synchronizers.new_base.modelaccessor import VOLTService, RCORDSubscriber, model_accessor
+from xossynchronizer.event_steps.eventstep import EventStep
from helpers import AttHelpers
class SubscriberDhcpEventStep(EventStep):
@@ -31,8 +30,8 @@
def process_event(self, event):
value = json.loads(event.value)
- onu_sn = AttHelpers.get_onu_sn(self.log, value)
- si = AttHelpers.get_si_by_sn(self.log, onu_sn)
+ onu_sn = AttHelpers.get_onu_sn(self.model_accessor, self.log, value)
+ si = AttHelpers.get_si_by_sn(self.model_accessor, self.log, onu_sn)
if not si:
self.log.exception("dhcp.events: Cannot find att-workflow-driver service instance for this event", kafka_event=value)
diff --git a/xos/synchronizer/event_steps/onu_event.py b/xos/synchronizer/event_steps/onu_event.py
index d205a59..905b6dc 100644
--- a/xos/synchronizer/event_steps/onu_event.py
+++ b/xos/synchronizer/event_steps/onu_event.py
@@ -15,8 +15,7 @@
import json
-from synchronizers.new_base.eventstep import EventStep
-from synchronizers.new_base.modelaccessor import AttWorkflowDriverService, AttWorkflowDriverServiceInstance, model_accessor
+from xossynchronizer.event_steps.eventstep import EventStep
class ONUEventStep(EventStep):
topics = ["onu.events"]
@@ -29,18 +28,18 @@
def get_att_si(self, event):
try:
- att_si = AttWorkflowDriverServiceInstance.objects.get(serial_number=event["serial_number"])
+ att_si = self.model_accessor.AttWorkflowDriverServiceInstance.objects.get(serial_number=event["serial_number"])
att_si.no_sync = False;
att_si.uni_port_id = event["uni_port_id"]
att_si.of_dpid = event["of_dpid"]
self.log.debug("onu.events: Found existing AttWorkflowDriverServiceInstance", si=att_si)
except IndexError:
# create an AttWorkflowDriverServiceInstance, the validation will be triggered in the corresponding sync step
- att_si = AttWorkflowDriverServiceInstance(
+ att_si = self.model_accessor.AttWorkflowDriverServiceInstance(
serial_number=event["serial_number"],
of_dpid=event["of_dpid"],
uni_port_id=event["uni_port_id"],
- owner=AttWorkflowDriverService.objects.first() # we assume there is only one AttWorkflowDriverService
+ owner=self.model_accessor.AttWorkflowDriverService.objects.first() # we assume there is only one AttWorkflowDriverService
)
self.log.debug("onu.events: Created new AttWorkflowDriverServiceInstance", si=att_si)
return att_si
diff --git a/xos/synchronizer/event_steps/test_auth_event.py b/xos/synchronizer/event_steps/test_auth_event.py
index 5587940..ea27358 100644
--- a/xos/synchronizer/event_steps/test_auth_event.py
+++ b/xos/synchronizer/event_steps/test_auth_event.py
@@ -18,34 +18,13 @@
import os, sys
-# Hack to load synchronizer framework
test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-service_dir=os.path.join(test_path, "../../../..")
-xos_dir=os.path.join(test_path, "../../..")
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir=os.path.join(xos_dir, "../../xos_services")
-# END Hack to load synchronizer framework
-
-# generate model from xproto
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- else:
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
-# END generate model from xproto
class TestSubscriberAuthEvent(unittest.TestCase):
def setUp(self):
self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
# Setting up the config module
from xosconfig import Config
@@ -56,24 +35,27 @@
log = create_logger(Config().get('logging'))
# END Setting up the config module
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
- # build_mock_modelaccessor(xos_dir, services_dir, [get_models_fn("olt-service", "volt.xproto")])
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("att-workflow-driver", "att-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("../profiles/rcord", "rcord.xproto")])
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
- import synchronizers.new_base.modelaccessor
- from auth_event import SubscriberAuthEventStep, model_accessor
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from auth_event import SubscriberAuthEventStep
# import all class names to globals
for (k, v) in model_accessor.all_model_classes.items():
globals()[k] = v
+ self.model_accessor = model_accessor
self.log = log
- self.event_step = SubscriberAuthEventStep(self.log)
+ self.event_step = SubscriberAuthEventStep(model_accessor=self.model_accessor, log=self.log)
self.event = Mock()
@@ -109,3 +91,7 @@
self.hippie_si.save.assert_called_with(always_update_timestamp=True, update_fields=['authentication_state', 'serial_number', 'updated'])
self.assertEqual(self.hippie_si.authentication_state, 'APPROVED')
+
+if __name__ == '__main__':
+ sys.path.append("..") # for import of helpers.py
+ unittest.main()
\ No newline at end of file
diff --git a/xos/synchronizer/event_steps/test_dhcp_event.py b/xos/synchronizer/event_steps/test_dhcp_event.py
index a31bf3f..ccd4d10 100644
--- a/xos/synchronizer/event_steps/test_dhcp_event.py
+++ b/xos/synchronizer/event_steps/test_dhcp_event.py
@@ -18,34 +18,13 @@
import os, sys
-# Hack to load synchronizer framework
test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-service_dir=os.path.join(test_path, "../../../..")
-xos_dir=os.path.join(test_path, "../../..")
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir=os.path.join(xos_dir, "../../xos_services")
-# END Hack to load synchronizer framework
-
-# generate model from xproto
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- else:
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
-# END generate model from xproto
class TestSubscriberAuthEvent(unittest.TestCase):
def setUp(self):
self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
# Setting up the config module
from xosconfig import Config
@@ -56,24 +35,27 @@
log = create_logger(Config().get('logging'))
# END Setting up the config module
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
- # build_mock_modelaccessor(xos_dir, services_dir, [get_models_fn("olt-service", "volt.xproto")])
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("att-workflow-driver", "att-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("../profiles/rcord", "rcord.xproto")])
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
- import synchronizers.new_base.modelaccessor
- from dhcp_event import SubscriberDhcpEventStep, model_accessor
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from dhcp_event import SubscriberDhcpEventStep
# import all class names to globals
for (k, v) in model_accessor.all_model_classes.items():
globals()[k] = v
+ self.model_accessor = model_accessor
self.log = log
- self.event_step = SubscriberDhcpEventStep(self.log)
+ self.event_step = SubscriberDhcpEventStep(model_accessor=self.model_accessor, log=self.log)
self.event = Mock()
@@ -110,6 +92,8 @@
patch.object(AttWorkflowDriverServiceInstance.objects, "get_items") as si_mock, \
patch.object(self.volt, "get_onu_sn_from_openflow") as get_onu_sn:
+ self.assertTrue(VOLTService.objects.first() is not None)
+
volt_service_mock.return_value = [self.volt]
get_onu_sn.return_value = "BRCM1234"
si_mock.return_value = [self.si]
@@ -120,3 +104,7 @@
self.assertEqual(self.si.dhcp_state, "DHCPREQUEST")
self.assertEqual(self.si.mac_address, self.mac_address)
self.assertEqual(self.si.ip_address, self.ip_address)
+
+if __name__ == '__main__':
+ sys.path.append("..") # for import of helpers.py
+ unittest.main()
\ No newline at end of file
diff --git a/xos/synchronizer/event_steps/test_onu_events.py b/xos/synchronizer/event_steps/test_onu_events.py
index bea8c0c..6bfb837 100644
--- a/xos/synchronizer/event_steps/test_onu_events.py
+++ b/xos/synchronizer/event_steps/test_onu_events.py
@@ -18,35 +18,13 @@
import os, sys
-# Hack to load synchronizer framework
test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-xos_dir=os.path.join(test_path, "../../..")
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir = os.path.join(xos_dir, "../../xos_services")
-sys.path.append(xos_dir)
-sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
-# END Hack to load synchronizer framework
-
-# generate model from xproto
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- else:
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
-# END generate model from xproto
class TestSyncOLTDevice(unittest.TestCase):
def setUp(self):
self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
# Setting up the config module
from xosconfig import Config
@@ -55,26 +33,27 @@
Config.init(config, "synchronizer-config-schema.yaml")
# END Setting up the config module
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
- # build_mock_modelaccessor(xos_dir, services_dir, [get_models_fn("olt-service", "volt.xproto")])
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("att-workflow-driver", "att-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("../profiles/rcord", "rcord.xproto")])
- # FIXME this is to get jenkins to pass the tests, somehow it is running tests in a different order
- # and apparently it is not overriding the generated model accessor
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
- import synchronizers.new_base.modelaccessor
- from onu_event import ONUEventStep, model_accessor
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from onu_event import ONUEventStep
# import all class names to globals
for (k, v) in model_accessor.all_model_classes.items():
globals()[k] = v
+ self.model_accessor = model_accessor
self.log = Mock()
- self.event_step = ONUEventStep(self.log)
+ self.event_step = ONUEventStep(model_accessor=self.model_accessor, log=self.log)
self.event = Mock()
self.event_dict = {
@@ -163,4 +142,5 @@
self.assertEqual(att_si.onu_state, "DISABLED")
if __name__ == '__main__':
+ sys.path.append("..") # for import of helpers.py
unittest.main()
\ No newline at end of file
diff --git a/xos/synchronizer/helpers.py b/xos/synchronizer/helpers.py
index 6b4b2b5..9a2d035 100644
--- a/xos/synchronizer/helpers.py
+++ b/xos/synchronizer/helpers.py
@@ -12,12 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synchronizers.new_base.syncstep import DeferredException
-from synchronizers.new_base.modelaccessor import AttWorkflowDriverWhiteListEntry, AttWorkflowDriverServiceInstance, ONUDevice, VOLTService, model_accessor
+from xossynchronizer.steps.syncstep import DeferredException
class AttHelpers():
@staticmethod
- def validate_onu(log, att_si):
+ def validate_onu(model_accessor, log, att_si):
"""
This method validate an ONU against the whitelist and set the appropriate state.
It's expected that the deferred exception is managed in the caller method,
@@ -30,7 +29,7 @@
oss_service = att_si.owner.leaf_model
# See if there is a matching entry in the whitelist.
- matching_entries = AttWorkflowDriverWhiteListEntry.objects.filter(
+ matching_entries = model_accessor.AttWorkflowDriverWhiteListEntry.objects.filter(
owner_id=oss_service.id,
)
matching_entries = [e for e in matching_entries if e.serial_number.lower() == att_si.serial_number.lower()]
@@ -41,7 +40,7 @@
whitelisted = matching_entries[0]
try:
- pon_port = ONUDevice.objects.get(serial_number=att_si.serial_number).pon_port
+ pon_port = model_accessor.ONUDevice.objects.get(serial_number=att_si.serial_number).pon_port
except IndexError:
raise DeferredException("ONU device %s is not know to XOS yet" % att_si.serial_number)
@@ -59,8 +58,8 @@
return [True, "ONU has been validated"]
@staticmethod
- def get_onu_sn(log, event):
- olt_service = VOLTService.objects.first()
+ def get_onu_sn(model_accessor, log, event):
+ olt_service = model_accessor.VOLTService.objects.first()
onu_sn = olt_service.get_onu_sn_from_openflow(event["deviceId"], event["portNumber"])
if not onu_sn or onu_sn is None:
log.exception("Cannot find onu serial number for this event", kafka_event=event)
@@ -69,9 +68,9 @@
return onu_sn
@staticmethod
- def get_si_by_sn(log, serial_number):
+ def get_si_by_sn(model_accessor, log, serial_number):
try:
- return AttWorkflowDriverServiceInstance.objects.get(serial_number=serial_number)
+ return model_accessor.AttWorkflowDriverServiceInstance.objects.get(serial_number=serial_number)
except IndexError:
log.exception("Cannot find att-workflow-driver service instance for this serial number", serial_number=serial_number)
raise Exception("Cannot find att-workflow-driver service instance for this serial number %s", serial_number)
\ No newline at end of file
diff --git a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
index 67e25c4..8e591cb 100644
--- a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
+++ b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
@@ -15,8 +15,7 @@
-from synchronizers.new_base.modelaccessor import RCORDSubscriber, RCORDIpAddress, ONUDevice, model_accessor
-from synchronizers.new_base.policy import Policy
+from xossynchronizer.model_policies.policy import Policy
import os
import sys
@@ -57,7 +56,7 @@
si.save_changed_fields()
def process_onu_state(self, si):
- [valid, message] = AttHelpers.validate_onu(self.logger, si)
+ [valid, message] = AttHelpers.validate_onu(self.model_accessor, self.logger, si)
if si.onu_state == "AWAITING" or si.onu_state == "ENABLED":
si.status_message = message
if valid:
@@ -111,7 +110,7 @@
def update_onu(self, serial_number, admin_state):
- onu = [onu for onu in ONUDevice.objects.all() if onu.serial_number.lower() == serial_number.lower()][0]
+ onu = [onu for onu in self.model_accessor.ONUDevice.objects.all() if onu.serial_number.lower() == serial_number.lower()][0]
if onu.admin_state == admin_state:
self.logger.debug("MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" % (serial_number, admin_state))
else:
@@ -121,7 +120,7 @@
def get_subscriber(self, serial_number):
try:
- return [s for s in RCORDSubscriber.objects.all() if s.onu_device.lower() == serial_number.lower()][0]
+ return [s for s in self.model_accessor.RCORDSubscriber.objects.all() if s.onu_device.lower() == serial_number.lower()][0]
except IndexError:
# If the subscriber doesn't exist we don't do anything
self.logger.debug("MODEL_POLICY: subscriber does not exists for this SI, doing nothing", onu_device=serial_number)
@@ -131,7 +130,7 @@
# TODO check if the subscriber has an IP and update it,
# or create a new one
try:
- ip = RCORDIpAddress.objects.filter(
+ ip = self.model_accessor.RCORDIpAddress.objects.filter(
subscriber_id=subscriber.id,
ip=ip
)[0]
@@ -139,7 +138,7 @@
ip.save_changed_fields()
except IndexError:
self.logger.debug("MODEL_POLICY: Creating new RCORDIpAddress for subscriber", onu_device=subscriber.onu_device, subscriber_status=subscriber.status, ip=ip)
- ip = RCORDIpAddress(
+ ip = self.model_accessor.RCORDIpAddress(
subscriber_id=subscriber.id,
ip=ip,
description="DHCP Assigned IP Address"
@@ -148,7 +147,7 @@
def delete_subscriber_ip(self, subscriber, ip):
try:
- ip = RCORDIpAddress.objects.filter(
+ ip = self.model_accessor.RCORDIpAddress.objects.filter(
subscriber_id=subscriber.id,
ip=ip
)[0]
diff --git a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py
index 51f2ba4..f063cc4 100644
--- a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py
+++ b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py
@@ -14,8 +14,7 @@
# limitations under the License.
-from synchronizers.new_base.modelaccessor import AttWorkflowDriverServiceInstance, AttWorkflowDriverWhiteListEntry, model_accessor
-from synchronizers.new_base.policy import Policy
+from xossynchronizer.model_policies.policy import Policy
import os
import sys
@@ -31,7 +30,7 @@
self.handle_update(whitelist)
def validate_onu_state(self, si):
- [valid, message] = AttHelpers.validate_onu(self.logger, si)
+ [valid, message] = AttHelpers.validate_onu(self.model_accessor, self.logger, si)
si.status_message = message
if valid:
si.onu_state = "ENABLED"
@@ -46,7 +45,7 @@
def handle_update(self, whitelist):
self.logger.debug("MODEL_POLICY: handle_update for AttWorkflowDriverWhiteListEntry", whitelist=whitelist)
- sis = AttWorkflowDriverServiceInstance.objects.all()
+ sis = self.model_accessor.AttWorkflowDriverServiceInstance.objects.all()
for si in sis:
@@ -66,7 +65,7 @@
assert(whitelist.owner)
- sis = AttWorkflowDriverServiceInstance.objects.all()
+ sis = self.model_accessor.AttWorkflowDriverServiceInstance.objects.all()
sis = [si for si in sis if si.serial_number.lower() == whitelist.serial_number.lower()]
for si in sis:
diff --git a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
index 1abc7c8..19bcaba 100644
--- a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
+++ b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
@@ -20,39 +20,31 @@
import os, sys
test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-service_dir=os.path.join(test_path, "../../../..")
-xos_dir=os.path.join(test_path, "../../..")
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir=os.path.join(xos_dir, "../../xos_services")
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
class TestModelPolicyAttWorkflowDriverServiceInstance(unittest.TestCase):
def setUp(self):
self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
config = os.path.join(test_path, "../test_config.yaml")
from xosconfig import Config
Config.clear()
Config.init(config, 'synchronizer-config-schema.yaml')
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("att-workflow-driver", "att-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("../profiles/rcord", "rcord.xproto")])
- import synchronizers.new_base.modelaccessor
- from model_policy_att_workflow_driver_serviceinstance import AttWorkflowDriverServiceInstancePolicy, model_accessor
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from model_policy_att_workflow_driver_serviceinstance import AttWorkflowDriverServiceInstancePolicy, AttHelpers
+ self.AttHelpers = AttHelpers
from mock_modelaccessor import MockObjectList
@@ -65,7 +57,7 @@
model_accessor.reset_all_object_stores()
- self.policy = AttWorkflowDriverServiceInstancePolicy()
+ self.policy = AttWorkflowDriverServiceInstancePolicy(model_accessor=model_accessor)
self.si = AttWorkflowDriverServiceInstance()
self.si.owner = AttWorkflowDriverService()
self.si.serial_number = "BRCM1234"
@@ -92,8 +84,7 @@
def test_enable_onu(self):
- from helpers import AttHelpers
- with patch.object(AttHelpers, "validate_onu") as validate_onu, \
+ with patch.object(self.AttHelpers, "validate_onu") as validate_onu, \
patch.object(self.policy, "update_onu") as update_onu, \
patch.object(self.si, "save") as save_si:
validate_onu.return_value = [True, "valid onu"]
@@ -106,8 +97,7 @@
self.assertIn("valid onu", self.si.status_message)
def test_disable_onu(self):
- from helpers import AttHelpers
- with patch.object(AttHelpers, "validate_onu") as validate_onu, \
+ with patch.object(self.AttHelpers, "validate_onu") as validate_onu, \
patch.object(self.policy, "update_onu") as update_onu, \
patch.object(self.si, "save") as save_si:
validate_onu.return_value = [False, "invalid onu"]
diff --git a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_whitelistentry.py b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_whitelistentry.py
index c6e8818..4db0815 100644
--- a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_whitelistentry.py
+++ b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_whitelistentry.py
@@ -20,38 +20,29 @@
import os, sys
test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-service_dir=os.path.join(test_path, "../../../..")
-xos_dir=os.path.join(test_path, "../../..")
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir=os.path.join(xos_dir, "../../xos_services")
-
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
class TestModelPolicyAttWorkflowDriverWhiteListEntry(unittest.TestCase):
def setUp(self):
self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
config = os.path.join(test_path, "../test_config.yaml")
from xosconfig import Config
Config.clear()
Config.init(config, 'synchronizer-config-schema.yaml')
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("att-workflow-driver", "att-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("../profiles/rcord", "rcord.xproto")])
- import synchronizers.new_base.modelaccessor
- from model_policy_att_workflow_driver_whitelistentry import AttWorkflowDriverWhiteListEntryPolicy, model_accessor
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from model_policy_att_workflow_driver_whitelistentry import AttWorkflowDriverWhiteListEntryPolicy, AttHelpers
+ self.AttHelpers = AttHelpers
from mock_modelaccessor import MockObjectList
self.MockObjectList = MockObjectList
@@ -64,7 +55,7 @@
# tags. Ideally, this wouldn't happen, but it does. So make sure we reset the world.
model_accessor.reset_all_object_stores()
- self.policy = AttWorkflowDriverWhiteListEntryPolicy()
+ self.policy = AttWorkflowDriverWhiteListEntryPolicy(model_accessor=model_accessor)
self.service = AttWorkflowDriverService()
@@ -74,9 +65,8 @@
self.service = None
def test_enable_onu(self):
- from helpers import AttHelpers
si = AttWorkflowDriverServiceInstance(serial_number="BRCM333", owner_id=self.service.id, valid="invalid")
- with patch.object(AttHelpers, "validate_onu") as validate_onu, \
+ with patch.object(self.AttHelpers, "validate_onu") as validate_onu, \
patch.object(si, "save") as save_si:
validate_onu.return_value = [True, "valid onu"]
@@ -88,9 +78,8 @@
self.assertEqual("valid onu", si.status_message)
def test_disable_onu(self):
- from helpers import AttHelpers
si = AttWorkflowDriverServiceInstance(serial_number="BRCM333", owner_id=self.service.id, valid="invalid")
- with patch.object(AttHelpers, "validate_onu") as validate_onu, \
+ with patch.object(self.AttHelpers, "validate_onu") as validate_onu, \
patch.object(si, "save") as save_si:
validate_onu.return_value = [False, "invalid onu"]
diff --git a/xos/synchronizer/test_helpers.py b/xos/synchronizer/test_helpers.py
index c30cac3..98737d1 100644
--- a/xos/synchronizer/test_helpers.py
+++ b/xos/synchronizer/test_helpers.py
@@ -18,29 +18,14 @@
import os, sys
-# Hack to load synchronizer framework
test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../orchestration/xos/xos")
- services_dir=os.path.join(xos_dir, "../../xos_services")
-# END Hack to load synchronizer framework
-# generate model from xproto
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
-# END generate model from xproto
-
class TestAttHelpers(unittest.TestCase):
def setUp(self):
self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
# Setting up the config module
from xosconfig import Config
@@ -52,21 +37,25 @@
from multistructlog import create_logger
self.log = create_logger(Config().get('logging'))
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("att-workflow-driver", "att-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("../profiles/rcord", "rcord.xproto")])
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
- import synchronizers.new_base.modelaccessor
- from helpers import AttHelpers, model_accessor
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from helpers import AttHelpers
# import all class names to globals
for (k, v) in model_accessor.all_model_classes.items():
globals()[k] = v
self.helpers = AttHelpers
+ self.model_accessor = model_accessor
self._volt = VOLTService()
self._volt.id = 1
@@ -107,7 +96,7 @@
with patch.object(AttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock:
whitelist_mock.return_value = []
- [res, message] = self.helpers.validate_onu(self.log, self.att_si)
+ [res, message] = self.helpers.validate_onu(self.model_accessor, self.log, self.att_si)
self.assertFalse(res)
self.assertEqual(message, "ONU not found in whitelist")
@@ -119,7 +108,7 @@
whitelist_mock.return_value = [self.whitelist_entry]
onu_mock.return_value = [self.onu]
- [res, message] = self.helpers.validate_onu(self.log, self.att_si)
+ [res, message] = self.helpers.validate_onu(self.model_accessor, self.log, self.att_si)
self.assertFalse(res)
self.assertEqual(message, "ONU activated in wrong location")
@@ -131,7 +120,7 @@
whitelist_mock.return_value = [self.whitelist_entry]
onu_mock.return_value = [self.onu]
- [res, message] = self.helpers.validate_onu(self.log, self.att_si)
+ [res, message] = self.helpers.validate_onu(self.model_accessor, self.log, self.att_si)
self.assertFalse(res)
self.assertEqual(message, "ONU activated in wrong location")
@@ -143,7 +132,7 @@
onu_mock.return_value = []
with self.assertRaises(Exception) as e:
- self.helpers.validate_onu(self.log, self.att_si)
+ self.helpers.validate_onu(self.model_accessor, self.log, self.att_si)
self.assertEqual(e.exception.message, "ONU device %s is not know to XOS yet" % self.att_si.serial_number)
@@ -153,7 +142,7 @@
whitelist_mock.return_value = [self.whitelist_entry]
onu_mock.return_value = [self.onu]
- [res, message] = self.helpers.validate_onu(self.log, self.att_si)
+ [res, message] = self.helpers.validate_onu(self.model_accessor, self.log, self.att_si)
self.assertTrue(res)
self.assertEqual(message, "ONU has been validated")
@@ -165,7 +154,7 @@
whitelist_mock.return_value = [self.whitelist_entry]
onu_mock.return_value = [self.onu]
- [res, message] = self.helpers.validate_onu(self.log, self.att_si)
+ [res, message] = self.helpers.validate_onu(self.model_accessor, self.log, self.att_si)
self.assertTrue(res)
self.assertEqual(message, "ONU has been validated")