New event handling imported from att-workflow-driver
Change-Id: I692e4eb7604a42d47416474fac1c45dbf0ccfbe1
diff --git a/VERSION b/VERSION
index 6e8bf73..c992723 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.1.0
+0.1.1-dev
diff --git a/docs/README.md b/docs/README.md
index 569e90b..82084db 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -109,7 +109,8 @@
"deviceId" : "of:000000000a5a0072",
"portNumber" : "16",
"macAddress" : "90:e2:ba:82:fa:81",
- "ipAddress" : "10.11.1.1"
+ "ipAddress" : "10.11.1.1",
+ "serialNumber": "ALPHe3d1cfde" // ONU serial number
}
```
diff --git a/tox.ini b/tox.ini
index 9064a11..a6dde5f 100644
--- a/tox.ini
+++ b/tox.ini
@@ -35,6 +35,11 @@
[unittest]
plugins = nose2.plugins.junitxml
+code-directories=synchronizer
+ model_policies
+ steps
+ pull_steps
+ event_steps
[junit-xml]
path = nose2-results.xml
diff --git a/xos/synchronizer/__init__.py b/xos/synchronizer/__init__.py
index 8bbb6fa..42722a8 100644
--- a/xos/synchronizer/__init__.py
+++ b/xos/synchronizer/__init__.py
@@ -11,4 +11,4 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
diff --git a/xos/synchronizer/event_steps/dhcp_event.py b/xos/synchronizer/event_steps/dhcp_event.py
index 8b3ea8d..92828f6 100644
--- a/xos/synchronizer/event_steps/dhcp_event.py
+++ b/xos/synchronizer/event_steps/dhcp_event.py
@@ -14,12 +14,9 @@
# limitations under the License.
import json
-import time
-import os
-import sys
from xossynchronizer.event_steps.eventstep import EventStep
+from helpers import TtHelpers
-from ..helpers import TtHelpers
class SubscriberDhcpEventStep(EventStep):
topics = ["dhcp.events"]
@@ -30,18 +27,11 @@
def process_event(self, event):
value = json.loads(event.value)
+ self.log.info("dhcp.events: Got event for subscriber", event_value=value)
- onu_sn = TtHelpers.get_onu_sn(self.model_accessor, self.log, value)
- si = TtHelpers.get_si_by_sn(self.model_accessor, self.log, onu_sn)
-
- if not si:
- self.log.exception("dhcp.events: Cannot find tt-workflow-driver service instance for this event", kafka_event=value)
- raise Exception("dhcp.events: Cannot find tt-workflow-driver service instance for this event")
-
- self.log.info("dhcp.events: Got event for subscriber", event_value=value, onu_sn=onu_sn, si=si)
-
+ si = TtHelpers.find_or_create_tt_si(self.model_accessor, self.log, value)
+ self.log.debug("dhcp.events: Updating service instance", si=si)
si.dhcp_state = value["messageType"]
si.ip_address = value["ipAddress"]
si.mac_address = value["macAddress"]
-
si.save_changed_fields(always_update_timestamp=True)
diff --git a/xos/synchronizer/event_steps/onu_event.py b/xos/synchronizer/event_steps/onu_event.py
index 3a18c26..8d64254 100644
--- a/xos/synchronizer/event_steps/onu_event.py
+++ b/xos/synchronizer/event_steps/onu_event.py
@@ -16,6 +16,8 @@
import json
from xossynchronizer.event_steps.eventstep import EventStep
+from helpers import TtHelpers
+
class ONUEventStep(EventStep):
topics = ["onu.events"]
@@ -26,31 +28,16 @@
def __init__(self, *args, **kwargs):
super(ONUEventStep, self).__init__(*args, **kwargs)
- def get_tt_si(self, event):
- try:
- tt_si = self.model_accessor.TtWorkflowDriverServiceInstance.objects.get(serial_number=event["serialNumber"])
- tt_si.no_sync = False;
- tt_si.uni_port_id = long(event["portNumber"])
- tt_si.of_dpid = event["deviceId"]
- self.log.debug("onu.events: Found existing TtWorkflowDriverServiceInstance", si=tt_si)
- except IndexError:
- # create an TtWorkflowDriverServiceInstance, the validation will be triggered in the corresponding sync step
- tt_si = self.model_accessor.TtWorkflowDriverServiceInstance(
- serial_number=event["serialNumber"],
- of_dpid=event["deviceId"],
- uni_port_id=long(event["portNumber"]),
- owner=self.model_accessor.TtWorkflowDriverService.objects.first() # we assume there is only one TtWorkflowDriverService
- )
- self.log.debug("onu.events: Created new TtWorkflowDriverServiceInstance", si=tt_si)
- return tt_si
-
def process_event(self, event):
value = json.loads(event.value)
self.log.info("onu.events: received event", value=value)
if value["status"] == "activated":
self.log.info("onu.events: activated onu", value=value)
- tt_si = self.get_tt_si(value)
+ tt_si = TtHelpers.find_or_create_tt_si(self.model_accessor, self.log, value)
+ tt_si.no_sync = False
+ tt_si.uni_port_id = long(value["portNumber"])
+ tt_si.of_dpid = value["deviceId"]
tt_si.onu_state = "ENABLED"
tt_si.save_changed_fields(always_update_timestamp=True)
elif value["status"] == "disabled":
diff --git a/xos/synchronizer/event_steps/test_dhcp_event.py b/xos/synchronizer/event_steps/test_dhcp_event.py
index d7934da..d51ee7a 100644
--- a/xos/synchronizer/event_steps/test_dhcp_event.py
+++ b/xos/synchronizer/event_steps/test_dhcp_event.py
@@ -13,12 +13,14 @@
# limitations under the License.
import unittest
-from mock import patch, call, Mock, PropertyMock
+from mock import patch, Mock
import json
-import os, sys
+import os
+import sys
-test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
class TestSubscriberAuthEvent(unittest.TestCase):
@@ -42,7 +44,7 @@
import xossynchronizer.modelaccessor
import mock_modelaccessor
- reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
from xossynchronizer.modelaccessor import model_accessor
@@ -74,28 +76,22 @@
self.si.serial_number = "BRCM1234"
self.si.save = Mock()
-
def tearDown(self):
sys.path = self.sys_path_save
def test_dhcp_subscriber(self):
self.event.value = json.dumps({
- "deviceId" : "of:0000000000000001",
- "portNumber" : "1",
- "macAddress" : self.mac_address,
- "ipAddress" : self.ip_address,
- "messageType": "DHCPREQUEST"
+ "deviceId": "of:0000000000000001",
+ "portNumber": "1",
+ "macAddress": self.mac_address,
+ "ipAddress": self.ip_address,
+ "messageType": "DHCPREQUEST",
+ 'serialNumber': "BRCM1234",
})
- with patch.object(VOLTService.objects, "get_items") as volt_service_mock, \
- patch.object(TtWorkflowDriverServiceInstance.objects, "get_items") as si_mock, \
- patch.object(self.volt, "get_onu_sn_from_openflow") as get_onu_sn:
+ with patch.object(TtWorkflowDriverServiceInstance.objects, "get_items") as si_mock:
- self.assertTrue(VOLTService.objects.first() is not None)
-
- volt_service_mock.return_value = [self.volt]
- get_onu_sn.return_value = "BRCM1234"
si_mock.return_value = [self.si]
self.event_step.process_event(self.event)
@@ -105,6 +101,7 @@
self.assertEqual(self.si.mac_address, self.mac_address)
self.assertEqual(self.si.ip_address, self.ip_address)
+
if __name__ == '__main__':
- sys.path.append("..") # for import of helpers.py
- unittest.main()
\ No newline at end of file
+ sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) # for import of helpers.py
+ unittest.main()
diff --git a/xos/synchronizer/event_steps/test_onu_events.py b/xos/synchronizer/event_steps/test_onu_events.py
index 0bd1c31..c11812f 100644
--- a/xos/synchronizer/event_steps/test_onu_events.py
+++ b/xos/synchronizer/event_steps/test_onu_events.py
@@ -131,5 +131,5 @@
if __name__ == '__main__':
- sys.path.append("..") # for import of helpers.py
+ sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) # for import of helpers.py
unittest.main()
diff --git a/xos/synchronizer/helpers.py b/xos/synchronizer/helpers.py
index e67890a..a300892 100644
--- a/xos/synchronizer/helpers.py
+++ b/xos/synchronizer/helpers.py
@@ -58,19 +58,21 @@
return [True, "ONU has been validated"]
@staticmethod
- def get_onu_sn(model_accessor, log, event):
- olt_service = model_accessor.VOLTService.objects.first()
- onu_sn = olt_service.get_onu_sn_from_openflow(event["deviceId"], event["portNumber"])
- if not onu_sn or onu_sn is None:
- log.exception("Cannot find onu serial number for this event", kafka_event=event)
- raise Exception("Cannot find onu serial number for this event")
-
- return onu_sn
-
- @staticmethod
- def get_si_by_sn(model_accessor, log, serial_number):
+ def find_or_create_tt_si(model_accessor, log, event):
try:
- return model_accessor.TtWorkflowDriverServiceInstance.objects.get(serial_number=serial_number)
+ tt_si = model_accessor.TtWorkflowDriverServiceInstance.objects.get(
+ serial_number=event["serialNumber"]
+ )
+ log.debug("TtHelpers: Found existing TtWorkflowDriverServiceInstance", si=tt_si)
except IndexError:
- log.exception("Cannot find tt-workflow-driver service instance for this serial number", serial_number=serial_number)
- raise Exception("Cannot find tt-workflow-driver service instance for this serial number %s", serial_number)
\ No newline at end of file
+ # create a TtWorkflowDriverServiceInstance, the validation will be
+ # triggered in the corresponding sync step
+ tt_si = model_accessor.TtWorkflowDriverServiceInstance(
+ serial_number=event["serialNumber"],
+ of_dpid=event["deviceId"],
+ uni_port_id=long(event["portNumber"]),
+ # we assume there is only one TtWorkflowDriverService
+ owner=model_accessor.TtWorkflowDriverService.objects.first()
+ )
+ log.debug("TtHelpers: Created new TtWorkflowDriverServiceInstance", si=tt_si)
+ return tt_si
diff --git a/xos/synchronizer/tt-workflow-driver-synchronizer.py b/xos/synchronizer/tt-workflow-driver-synchronizer.py
index 9b420e7..eb378ad 100644
--- a/xos/synchronizer/tt-workflow-driver-synchronizer.py
+++ b/xos/synchronizer/tt-workflow-driver-synchronizer.py
@@ -31,4 +31,4 @@
else:
Config.init(base_config_file, 'synchronizer-config-schema.yaml')
-Synchronizer().run()
\ No newline at end of file
+Synchronizer().run()