SEBA-450 Update Att-Workflow-Driver to use synchronizer library
Change-Id: I86740c694950e86f2d7afea6221d33e2aad8aeef
diff --git a/xos/synchronizer/event_steps/auth_event.py b/xos/synchronizer/event_steps/auth_event.py
index b181121..5f4c480 100644
--- a/xos/synchronizer/event_steps/auth_event.py
+++ b/xos/synchronizer/event_steps/auth_event.py
@@ -17,8 +17,7 @@
import time
import os
import sys
-from synchronizers.new_base.eventstep import EventStep
-from synchronizers.new_base.modelaccessor import model_accessor
+from xossynchronizer.event_steps.eventstep import EventStep
from helpers import AttHelpers
class SubscriberAuthEventStep(EventStep):
@@ -31,8 +30,8 @@
def process_event(self, event):
value = json.loads(event.value)
- onu_sn = AttHelpers.get_onu_sn(self.log, value)
- si = AttHelpers.get_si_by_sn(self.log, onu_sn)
+ onu_sn = AttHelpers.get_onu_sn(self.model_accessor, self.log, value)
+ si = AttHelpers.get_si_by_sn(self.model_accessor, self.log, onu_sn)
if not si:
self.log.exception("authentication.events: Cannot find att-workflow-driver service instance for this event", kafka_event=value)
raise Exception("authentication.events: Cannot find att-workflow-driver service instance for this event")
diff --git a/xos/synchronizer/event_steps/dhcp_event.py b/xos/synchronizer/event_steps/dhcp_event.py
index 1b3033a..ab411f7 100644
--- a/xos/synchronizer/event_steps/dhcp_event.py
+++ b/xos/synchronizer/event_steps/dhcp_event.py
@@ -17,8 +17,7 @@
import time
import os
import sys
-from synchronizers.new_base.eventstep import EventStep
-from synchronizers.new_base.modelaccessor import VOLTService, RCORDSubscriber, model_accessor
+from xossynchronizer.event_steps.eventstep import EventStep
from helpers import AttHelpers
class SubscriberDhcpEventStep(EventStep):
@@ -31,8 +30,8 @@
def process_event(self, event):
value = json.loads(event.value)
- onu_sn = AttHelpers.get_onu_sn(self.log, value)
- si = AttHelpers.get_si_by_sn(self.log, onu_sn)
+ onu_sn = AttHelpers.get_onu_sn(self.model_accessor, self.log, value)
+ si = AttHelpers.get_si_by_sn(self.model_accessor, self.log, onu_sn)
if not si:
self.log.exception("dhcp.events: Cannot find att-workflow-driver service instance for this event", kafka_event=value)
diff --git a/xos/synchronizer/event_steps/onu_event.py b/xos/synchronizer/event_steps/onu_event.py
index d205a59..905b6dc 100644
--- a/xos/synchronizer/event_steps/onu_event.py
+++ b/xos/synchronizer/event_steps/onu_event.py
@@ -15,8 +15,7 @@
import json
-from synchronizers.new_base.eventstep import EventStep
-from synchronizers.new_base.modelaccessor import AttWorkflowDriverService, AttWorkflowDriverServiceInstance, model_accessor
+from xossynchronizer.event_steps.eventstep import EventStep
class ONUEventStep(EventStep):
topics = ["onu.events"]
@@ -29,18 +28,18 @@
def get_att_si(self, event):
try:
- att_si = AttWorkflowDriverServiceInstance.objects.get(serial_number=event["serial_number"])
+ att_si = self.model_accessor.AttWorkflowDriverServiceInstance.objects.get(serial_number=event["serial_number"])
att_si.no_sync = False;
att_si.uni_port_id = event["uni_port_id"]
att_si.of_dpid = event["of_dpid"]
self.log.debug("onu.events: Found existing AttWorkflowDriverServiceInstance", si=att_si)
except IndexError:
# create an AttWorkflowDriverServiceInstance, the validation will be triggered in the corresponding sync step
- att_si = AttWorkflowDriverServiceInstance(
+ att_si = self.model_accessor.AttWorkflowDriverServiceInstance(
serial_number=event["serial_number"],
of_dpid=event["of_dpid"],
uni_port_id=event["uni_port_id"],
- owner=AttWorkflowDriverService.objects.first() # we assume there is only one AttWorkflowDriverService
+ owner=self.model_accessor.AttWorkflowDriverService.objects.first() # we assume there is only one AttWorkflowDriverService
)
self.log.debug("onu.events: Created new AttWorkflowDriverServiceInstance", si=att_si)
return att_si
diff --git a/xos/synchronizer/event_steps/test_auth_event.py b/xos/synchronizer/event_steps/test_auth_event.py
index 5587940..ea27358 100644
--- a/xos/synchronizer/event_steps/test_auth_event.py
+++ b/xos/synchronizer/event_steps/test_auth_event.py
@@ -18,34 +18,13 @@
import os, sys
-# Hack to load synchronizer framework
test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-service_dir=os.path.join(test_path, "../../../..")
-xos_dir=os.path.join(test_path, "../../..")
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir=os.path.join(xos_dir, "../../xos_services")
-# END Hack to load synchronizer framework
-
-# generate model from xproto
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- else:
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
-# END generate model from xproto
class TestSubscriberAuthEvent(unittest.TestCase):
def setUp(self):
self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
# Setting up the config module
from xosconfig import Config
@@ -56,24 +35,27 @@
log = create_logger(Config().get('logging'))
# END Setting up the config module
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
- # build_mock_modelaccessor(xos_dir, services_dir, [get_models_fn("olt-service", "volt.xproto")])
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("att-workflow-driver", "att-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("../profiles/rcord", "rcord.xproto")])
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
- import synchronizers.new_base.modelaccessor
- from auth_event import SubscriberAuthEventStep, model_accessor
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from auth_event import SubscriberAuthEventStep
# import all class names to globals
for (k, v) in model_accessor.all_model_classes.items():
globals()[k] = v
+ self.model_accessor = model_accessor
self.log = log
- self.event_step = SubscriberAuthEventStep(self.log)
+ self.event_step = SubscriberAuthEventStep(model_accessor=self.model_accessor, log=self.log)
self.event = Mock()
@@ -109,3 +91,7 @@
self.hippie_si.save.assert_called_with(always_update_timestamp=True, update_fields=['authentication_state', 'serial_number', 'updated'])
self.assertEqual(self.hippie_si.authentication_state, 'APPROVED')
+
+if __name__ == '__main__':
+ sys.path.append("..") # for import of helpers.py
+ unittest.main()
\ No newline at end of file
diff --git a/xos/synchronizer/event_steps/test_dhcp_event.py b/xos/synchronizer/event_steps/test_dhcp_event.py
index a31bf3f..ccd4d10 100644
--- a/xos/synchronizer/event_steps/test_dhcp_event.py
+++ b/xos/synchronizer/event_steps/test_dhcp_event.py
@@ -18,34 +18,13 @@
import os, sys
-# Hack to load synchronizer framework
test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-service_dir=os.path.join(test_path, "../../../..")
-xos_dir=os.path.join(test_path, "../../..")
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir=os.path.join(xos_dir, "../../xos_services")
-# END Hack to load synchronizer framework
-
-# generate model from xproto
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- else:
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
-# END generate model from xproto
class TestSubscriberAuthEvent(unittest.TestCase):
def setUp(self):
self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
# Setting up the config module
from xosconfig import Config
@@ -56,24 +35,27 @@
log = create_logger(Config().get('logging'))
# END Setting up the config module
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
- # build_mock_modelaccessor(xos_dir, services_dir, [get_models_fn("olt-service", "volt.xproto")])
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("att-workflow-driver", "att-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("../profiles/rcord", "rcord.xproto")])
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
- import synchronizers.new_base.modelaccessor
- from dhcp_event import SubscriberDhcpEventStep, model_accessor
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from dhcp_event import SubscriberDhcpEventStep
# import all class names to globals
for (k, v) in model_accessor.all_model_classes.items():
globals()[k] = v
+ self.model_accessor = model_accessor
self.log = log
- self.event_step = SubscriberDhcpEventStep(self.log)
+ self.event_step = SubscriberDhcpEventStep(model_accessor=self.model_accessor, log=self.log)
self.event = Mock()
@@ -110,6 +92,8 @@
patch.object(AttWorkflowDriverServiceInstance.objects, "get_items") as si_mock, \
patch.object(self.volt, "get_onu_sn_from_openflow") as get_onu_sn:
+ self.assertTrue(VOLTService.objects.first() is not None)
+
volt_service_mock.return_value = [self.volt]
get_onu_sn.return_value = "BRCM1234"
si_mock.return_value = [self.si]
@@ -120,3 +104,7 @@
self.assertEqual(self.si.dhcp_state, "DHCPREQUEST")
self.assertEqual(self.si.mac_address, self.mac_address)
self.assertEqual(self.si.ip_address, self.ip_address)
+
+if __name__ == '__main__':
+ sys.path.append("..") # for import of helpers.py
+ unittest.main()
\ No newline at end of file
diff --git a/xos/synchronizer/event_steps/test_onu_events.py b/xos/synchronizer/event_steps/test_onu_events.py
index bea8c0c..6bfb837 100644
--- a/xos/synchronizer/event_steps/test_onu_events.py
+++ b/xos/synchronizer/event_steps/test_onu_events.py
@@ -18,35 +18,13 @@
import os, sys
-# Hack to load synchronizer framework
test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-xos_dir=os.path.join(test_path, "../../..")
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir = os.path.join(xos_dir, "../../xos_services")
-sys.path.append(xos_dir)
-sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
-# END Hack to load synchronizer framework
-
-# generate model from xproto
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- else:
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
-# END generate model from xproto
class TestSyncOLTDevice(unittest.TestCase):
def setUp(self):
self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
# Setting up the config module
from xosconfig import Config
@@ -55,26 +33,27 @@
Config.init(config, "synchronizer-config-schema.yaml")
# END Setting up the config module
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
- # build_mock_modelaccessor(xos_dir, services_dir, [get_models_fn("olt-service", "volt.xproto")])
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("att-workflow-driver", "att-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("../profiles/rcord", "rcord.xproto")])
- # FIXME this is to get jenkins to pass the tests, somehow it is running tests in a different order
- # and apparently it is not overriding the generated model accessor
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
- import synchronizers.new_base.modelaccessor
- from onu_event import ONUEventStep, model_accessor
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from onu_event import ONUEventStep
# import all class names to globals
for (k, v) in model_accessor.all_model_classes.items():
globals()[k] = v
+ self.model_accessor = model_accessor
self.log = Mock()
- self.event_step = ONUEventStep(self.log)
+ self.event_step = ONUEventStep(model_accessor=self.model_accessor, log=self.log)
self.event = Mock()
self.event_dict = {
@@ -163,4 +142,5 @@
self.assertEqual(att_si.onu_state, "DISABLED")
if __name__ == '__main__':
+ sys.path.append("..") # for import of helpers.py
unittest.main()
\ No newline at end of file