[SEBA-241] Cleaning up and fixing behaviour
Change-Id: I2657c690a6b114613aa2aa434875f4f9ef4a7ee2
diff --git a/README.md b/README.md
index 1725dda..fa6200e 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,7 @@
# AT&T Workflow Driver
-This service is intended to be an example implementation of a service that integrates XOS with an external OSS system.
-As the name suggests this service will be very welcoming and validate any ONU that is connected to the system.
-
-Peace and Love
+This service implements the ONU and Subscriber management logic required by AT&T.
+It's also a good start if you need to implement a different logic to suit your use-case.
> NOTE: This service depends on RCORDSubscriber and ONUDevice so make sure that the `rcord-synchronizer` and `volt-synchronzier` are running
@@ -23,4 +21,55 @@
helm install -n att-workflow xos-services/att-workflow-driver/ -f examples/image-tag-candidate.yaml -f examples/imagePullPolicy-IfNotPresent.yaml
```
-## Configure this service
+## Service Instances State Machine
+
+| Topic | Event | Actions | ONU State | Subscriber State | Message |
+|-------------------------|----------------------------------|-----------------------------------------------------------|--------------|----------------------|----------------------------------------------------------|
+| `onu.events` | `status: activated` | Validate against whitelist (successful) | enabled | awaiting-auth | ONU has been validated |
+| `onu.events` | `status: activated` | Validate against whitelist (failed, missing) | disabled | awaiting-auth | ONU not found in whitelist |
+| `onu.events` | `status: activated` | Validate against whitelist (failed, location) | disabled | awaiting-auth | ONU activated in wrong location |
+| `onu.events` | `status: disabled` | Mark ONU as disabled and revoke subscriber authentication | disabled | awaiting-auth | ONU has been disabled, revoked subscriber authentication |
+| `authentication.events` | `authenticationState: STARTED` | Update subscriber status | enabled | awaiting-auth | Authentication started |
+| `authentication.events` | `authenticationState: REQUESTED` | Update subscriber status | enabled | awaiting-auth | Authentication requested |
+| `authentication.events` | `authenticationState: APPROVED` | Update subscriber status | enabled | enabled | Authentication succeded |
+| `authentication.events` | `authenticationState: DENIED` | Update subscriber status | enabled | auth-failed | Authentication denied |
+| `dhcp.events` | -- | Update subscriber ip and mac address | enabled | enabled | DHCP information added |
+
+## Events format
+
+This events are generated by various applications running on top of ONOS and published on a Kafka bus.
+Here is the structure of the events and their topics.
+
+### onu.events
+
+```json
+{
+ "timestamp": "2018-09-11T01:00:49.506Z",
+ "status": "activated", // or disabled
+ "serial_number": "ALPHe3d1cfde", // ONU serial number
+ "uni_port_id": 16, // uni port
+ "of_dpid": "of:000000000a5a0072" // OLT OpenFlow Id
+}
+```
+
+### authentication.events
+
+```json
+{
+ "timestamp": "2018-09-11T00:41:47.483Z",
+ "deviceId": "of:000000000a5a0072", // OLT OpenFlow Id
+ "portNumber": "16", // uni port
+ "authenticationState": "STARTED" // REQUESTED, APPROVED, DENIED
+}
+```
+
+### dhcp.events
+
+```json
+{
+ "deviceId" : "of:000000000a5a0072",
+ "portNumber" : "16",
+ "macAddress" : "90:e2:ba:82:fa:81",
+ "ipAddress" : "10.11.1.1"
+}
+```
\ No newline at end of file
diff --git a/xos/synchronizer/__init__.py b/xos/synchronizer/__init__.py
new file mode 100644
index 0000000..8bbb6fa
--- /dev/null
+++ b/xos/synchronizer/__init__.py
@@ -0,0 +1,14 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/xos/synchronizer/config.yaml b/xos/synchronizer/config.yaml
index 90c0dbf..920aa5e 100644
--- a/xos/synchronizer/config.yaml
+++ b/xos/synchronizer/config.yaml
@@ -22,7 +22,6 @@
- ONUDevice
model_policies_dir: "/opt/xos/synchronizers/att-workflow-driver/model_policies"
models_dir: "/opt/xos/synchronizers/att-workflow-driver/models"
-steps_dir: "/opt/xos/synchronizers/att-workflow-driver/steps"
event_steps_dir: "/opt/xos/synchronizers/att-workflow-driver/event_steps"
logging:
version: 1
diff --git a/xos/synchronizer/event_steps/dhcp_event.py b/xos/synchronizer/event_steps/dhcp_event.py
index 2483e46..d6a789f 100644
--- a/xos/synchronizer/event_steps/dhcp_event.py
+++ b/xos/synchronizer/event_steps/dhcp_event.py
@@ -45,6 +45,8 @@
self.log.debug("dhcp.events: Got event for subscriber", subscriber=subscriber, event_value=value, onu_sn=onu_sn)
+ # NOTE it will be better to update the SI and use the model policy to update the subscriber,
+ # if this fails for any reason the event is lost
subscriber.ip_address = value["ipAddress"]
subscriber.mac_address = value["macAddress"]
subscriber.save()
diff --git a/xos/synchronizer/event_steps/onu_event.py b/xos/synchronizer/event_steps/onu_event.py
index f21231c..4b770a1 100644
--- a/xos/synchronizer/event_steps/onu_event.py
+++ b/xos/synchronizer/event_steps/onu_event.py
@@ -15,9 +15,6 @@
import json
-import time
-import os
-import sys
from synchronizers.new_base.eventstep import EventStep
from synchronizers.new_base.modelaccessor import AttWorkflowDriverService, AttWorkflowDriverServiceInstance, model_accessor
@@ -60,6 +57,8 @@
self.log.info("onu.events: disabled onu", value=value)
att_si.onu_state = "DISABLED"
att_si.authentication_state = "AWAITING"
+ else:
+ self.log.warn("onu.events: Unkown status value: %s" % value["status"], value=value)
att_si.save(always_update_timestamp=True)
diff --git a/xos/synchronizer/helpers.py b/xos/synchronizer/helpers.py
new file mode 100644
index 0000000..14f0eb4
--- /dev/null
+++ b/xos/synchronizer/helpers.py
@@ -0,0 +1,59 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synchronizers.new_base.syncstep import DeferredException
+from synchronizers.new_base.modelaccessor import AttWorkflowDriverWhiteListEntry, ONUDevice, model_accessor
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get('logging'))
+
+class AttHelpers():
+
+ @staticmethod
+ def validate_onu(att_si):
+ """
+ This method validate an ONU against the whitelist and set the appropriate state.
+ It's expected that the deferred exception is managed in the caller method,
+ for example a model_policy or a sync_step.
+
+ :param att_si: AttWorkflowDriverServiceInstance
+ :return: [boolean, string]
+ """
+
+ oss_service = att_si.owner.leaf_model
+
+ # See if there is a matching entry in the whitelist.
+ matching_entries = AttWorkflowDriverWhiteListEntry.objects.filter(
+ owner_id=oss_service.id,
+ )
+ matching_entries = [e for e in matching_entries if e.serial_number.lower() == att_si.serial_number.lower()]
+
+ if len(matching_entries) == 0:
+ log.warn("ONU not found in whitelist", object=str(att_si), serial_number=att_si.serial_number, **att_si.tologdict())
+ return [False, "ONU not found in whitelist"]
+
+ whitelisted = matching_entries[0]
+ try:
+ pon_port = ONUDevice.objects.get(serial_number=att_si.serial_number).pon_port
+ except IndexError:
+ raise DeferredException("ONU device %s is not know to XOS yet" % att_si.serial_number)
+
+ if pon_port.port_no != whitelisted.pon_port_id or att_si.of_dpid != whitelisted.device_id:
+ log.warn("ONU disable as location don't match", object=str(att_si), serial_number=att_si.serial_number,
+ **att_si.tologdict())
+ return [False, "ONU activated in wrong location"]
+
+ return [True, "ONU has been validated"]
\ No newline at end of file
diff --git a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_service.py b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_service.py
index 0c9d978..f738c3a 100644
--- a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_service.py
+++ b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_service.py
@@ -25,8 +25,7 @@
sis = AttWorkflowDriverServiceInstance.objects.all()
- # TODO(smbaker): This is redudant with AttWorkflowDriverWhiteListEntry model policy, though etaining this does provide
- # a handy way to trigger a full reexamination of the whitelist.
+ # TODO(teone): use the method defined in helpers.py
whitelist = [x.serial_number.lower() for x in service.whitelist_entries.all()]
diff --git a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
index 26b0367..dffd154 100644
--- a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
+++ b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
@@ -14,83 +14,90 @@
# limitations under the License.
+
from synchronizers.new_base.modelaccessor import RCORDSubscriber, ONUDevice, model_accessor
from synchronizers.new_base.policy import Policy
+import os
+import sys
+
+sync_path = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
+sys.path.append(sync_path)
+
+from helpers import AttHelpers
+
class DeferredException(Exception):
pass
class AttWorkflowDriverServiceInstancePolicy(Policy):
model_name = "AttWorkflowDriverServiceInstance"
+ separator = " // "
+
def handle_create(self, si):
self.logger.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
self.handle_update(si)
def handle_update(self, si):
+ self.logger.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " % (si.id))
- # TODO if si.onu_state = DISABLED set subscriber.status to need_auth
- # TODO cleanup
+ # validating ONU
+ if si.onu_state == "AWAITING" or si.onu_state == "ENABLED":
+ # we validate the ONU state only if it is enabled or awaiting,
+ # if it's disabled it means someone has disabled it
+ self.validate_onu_state(si)
- self.logger.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s, valid=%s " % (si.id, si.valid))
+ # handling the subscriber status
+ subscriber = self.get_subscriber(si.serial_number)
- # Check to make sure the object has been synced. This is to cover a race condition where the model_policy
- # runs, is interrupted by the sync step, the sync step completes, and then the model policy ends up saving
- # a policed_timestamp that is later the updated timestamp set by the sync_step.
- if (si.backend_code!=1):
- raise DeferredException("MODEL_POLICY: AttWorkflowDriverServiceInstance %s has not been synced yet" % si.id)
+ if subscriber:
+ self.update_subscriber(subscriber, si)
- # waiting for Whitelist validation
- if not hasattr(si, 'valid') or si.valid is "awaiting":
- raise DeferredException("MODEL_POLICY: deferring handle_update for AttWorkflowDriverServiceInstance %s as not validated yet" % si.id)
+ si.save()
- # disabling ONU
- if si.valid == "invalid":
- self.logger.debug("MODEL_POLICY: disabling ONUDevice [%s] for AttWorkflowDriverServiceInstance %s" % (si.serial_number, si.id))
- onu = ONUDevice.objects.get(serial_number=si.serial_number)
- onu.admin_state = "DISABLED"
- onu.save(always_update_timestamp=True)
- return
- if si.valid == "valid":
+ def validate_onu_state(self, si):
+ [valid, message] = AttHelpers.validate_onu(si)
+ si.status_message += self.separator + message
+ if valid:
+ si.onu_state = "ENABLED"
+ self.update_onu(si.serial_number, "ENABLED")
+ else:
+ si.onu_state = "DISABLED"
+ self.update_onu(si.serial_number, "DISABLED")
- # reactivating the ONUDevice
- try:
- onu = ONUDevice.objects.get(serial_number=si.serial_number)
- except IndexError:
- raise Exception("MODEL_POLICY: cannot find ONUDevice [%s] for AttWorkflowDriverServiceInstance %s" % (si.serial_number, si.id))
- if onu.admin_state == "DISABLED":
- self.logger.debug("MODEL_POLICY: enabling ONUDevice [%s] for AttWorkflowDriverServiceInstance %s" % (si.serial_number, si.id))
- onu.admin_state = "ENABLED"
- onu.save(always_update_timestamp=True)
+ def update_onu(self, serial_number, admin_state):
+ self.logger.debug("MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" % (serial_number, admin_state))
+ onu = ONUDevice.objects.get(serial_number=serial_number)
+ onu.admin_state = admin_state
+ onu.save(always_update_timestamp=True)
- # handling the subscriber status
+ def get_subscriber(self, serial_number):
+ try:
+ return [s for s in RCORDSubscriber.objects.all() if s.onu_device.lower() == serial_number.lower()][0]
+ except IndexError:
+ # If the subscriber doesn't exist we don't do anything
+ self.logger.debug("MODEL_POLICY: subscriber does not exists for this SI, doing nothing", onu_device=serial_number)
+ return None
- subscriber = None
- try:
- subscriber = [s for s in RCORDSubscriber.objects.all() if s.onu_device.lower() == si.serial_number.lower()][0]
- except IndexError:
- # we just want to find out if it exists or not
- pass
+ def update_subscriber(self, subscriber, si):
+ if si.authentication_state == "AWAITING":
+ subscriber.status = "awaiting-auth"
+ si.status_message += self.separator + "Awaiting Authentication"
+ elif si.authentication_state == "REQUESTED":
+ subscriber.status = "awaiting-auth"
+ si.status_message += self.separator + "Authentication requested"
+ elif si.authentication_state == "STARTED":
+ subscriber.status = "awaiting-auth"
+ si.status_message += self.separator + "Authentication started"
+ elif si.authentication_state == "APPROVED":
+ subscriber.status = "enabled"
+ si.status_message += self.separator + "Authentication succeded"
+ elif si.authentication_state == "DENIED":
+ subscriber.status = "auth-failed"
+ si.status_message += self.separator + "Authentication denied"
+ self.logger.debug("MODEL_POLICY: handling subscriber", onu_device=subscriber.onu_device, authentication_state=si.authentication_state, subscriber_status=subscriber.status)
- if subscriber:
- # if the subscriber is there and authentication is complete, update its state
- self.logger.debug("MODEL_POLICY: handling subscriber", onu_device=si.serial_number, authentication_state=si.authentication_state, onu_state=si.onu_state)
- if si.onu_state == "DISABLED":
- # NOTE do not mess with onu.admin_state as that triggered this condition
- subscriber.status = "awaiting-auth"
- elif si.authentication_state == "STARTED":
- subscriber.status = "awaiting-auth"
- elif si.authentication_state == "REQUESTED":
- subscriber.status = "awaiting-auth"
- elif si.authentication_state == "APPROVED":
- subscriber.status = "enabled"
- elif si.authentication_state == "DENIED":
- subscriber.status = "auth-failed"
-
- subscriber.save(always_update_timestamp=True)
- # if subscriber does not exist
- else:
- self.logger.warn("MODEL_POLICY: subscriber does not exists for this SI, doing nothing")
+ subscriber.save(always_update_timestamp=True)
def handle_delete(self, si):
pass
diff --git a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py
index c788d23..5cb4c23 100644
--- a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py
+++ b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_whitelistentry.py
@@ -31,6 +31,8 @@
# serial_number = whitelist.serial_number,
# owner_id = whitelist.owner.id)
+ # TODO(teone): use the method defined in helpers.py
+
sis = AttWorkflowDriverServiceInstance.objects.all()
for si in sis:
diff --git a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
index c47738d..18c8174 100644
--- a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
+++ b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
@@ -64,165 +64,146 @@
# tags. Ideally, this wouldn't happen, but it does. So make sure we reset the world.
model_accessor.reset_all_object_stores()
+
self.policy = AttWorkflowDriverServiceInstancePolicy()
self.si = AttWorkflowDriverServiceInstance()
self.si.owner = AttWorkflowDriverService()
+ self.si.serial_number = "BRCM1234"
def tearDown(self):
sys.path = self.sys_path_save
- self.si = None
-
- def test_not_synced(self):
- self.si.valid = "awaiting"
- self.si.backend_code = 0
-
- with patch.object(RCORDSubscriber, "save") as subscriber_save, \
- patch.object(ONUDevice, "save") as onu_save:
-
- with self.assertRaises(Exception) as e:
- self.policy.handle_update(self.si)
-
- self.assertIn("has not been synced yet", e.exception.message)
-
- def test_defer_update(self):
- self.si.valid = "awaiting"
- self.si.backend_code = 1
-
- with patch.object(RCORDSubscriber, "save") as subscriber_save, \
- patch.object(ONUDevice, "save") as onu_save:
-
- with self.assertRaises(Exception) as e:
- self.policy.handle_update(self.si)
-
- self.assertEqual(e.exception.message, "MODEL_POLICY: deferring handle_update for AttWorkflowDriverServiceInstance 98052 as not validated yet")
- subscriber_save.assert_not_called()
- onu_save.assert_not_called()
-
- def test_disable_onu(self):
- self.si.valid = "invalid"
- self.si.serial_number = "BRCM1234"
- self.si.backend_code = 1
- self.si.onu_state = "ENABLED"
-
- onu = ONUDevice(
- serial_number=self.si.serial_number
- )
-
- with patch.object(ONUDevice.objects, "get_items") as onu_objects, \
- patch.object(RCORDSubscriber, "save") as subscriber_save, \
- patch.object(ONUDevice, "save") as onu_save:
-
- onu_objects.return_value = [onu]
-
- self.policy.handle_update(self.si)
- subscriber_save.assert_not_called()
- self.assertEqual(onu.admin_state, "DISABLED")
- onu_save.assert_called()
def test_enable_onu(self):
- self.si.valid = "valid"
- self.si.serial_number = "BRCM1234"
- self.si.c_tag = None
- self.si.backend_code = 1
- self.si.onu_state = "ENABLED"
+ from helpers import AttHelpers
+ with patch.object(AttHelpers, "validate_onu") as validate_onu, \
+ patch.object(self.policy, "update_onu") as update_onu, \
+ patch.object(self.si, "save") as save_si:
+ validate_onu.return_value = [True, "valid onu"]
- onu = ONUDevice(
- serial_number=self.si.serial_number,
- admin_state="DISABLED"
- )
+ self.policy.validate_onu_state(self.si)
- subscriber = RCORDSubscriber(
- onu_device=self.si.serial_number,
- status='pre-provisioned'
- )
+ update_onu.assert_called_once()
+ update_onu.assert_called_with("BRCM1234", "ENABLED")
- with patch.object(ONUDevice.objects, "get_items") as onu_objects, \
- patch.object(RCORDSubscriber.objects, "get_items") as subscriber_objects, \
- patch.object(ONUDevice, "save") as onu_save:
+ self.assertIn("valid onu", self.si.status_message)
- onu_objects.return_value = [onu]
- subscriber_objects.return_value = [subscriber]
+ def test_disable_onu(self):
+ from helpers import AttHelpers
+ with patch.object(AttHelpers, "validate_onu") as validate_onu, \
+ patch.object(self.policy, "update_onu") as update_onu, \
+ patch.object(self.si, "save") as save_si:
+ validate_onu.return_value = [False, "invalid onu"]
+ self.policy.validate_onu_state(self.si)
+
+ update_onu.assert_called_once()
+ update_onu.assert_called_with("BRCM1234", "DISABLED")
+
+ self.assertIn("invalid onu", self.si.status_message)
+
+ def test_handle_update_validate_onu(self):
+ """
+ Testing that handle_update calls validate_onu with the correct parameters
+ when necessary
+ """
+ with patch.object(self.policy, "validate_onu_state") as validate_onu_state, \
+ patch.object(self.policy, "update_onu") as update_onu, \
+ patch.object(self.policy, "get_subscriber") as get_subscriber:
+ update_onu.return_value = None
+ get_subscriber.return_value = None
+
+ self.si.onu_state = "AWAITING"
self.policy.handle_update(self.si)
- self.assertEqual(onu.admin_state, "ENABLED")
- onu_save.assert_called()
+ validate_onu_state.assert_called_with(self.si)
- def test_do_not_create_subscriber(self):
- self.si.valid = "valid"
- self.si.backend_code = 1
- self.si.serial_number = "BRCM1234"
- self.si.authentication_state = "DENIEND"
- self.si.onu_state = "ENABLED"
-
- onu = ONUDevice(
- serial_number=self.si.serial_number,
- admin_state="DISABLED"
- )
-
- with patch.object(ONUDevice.objects, "get_items") as onu_objects, \
- patch.object(RCORDSubscriber, "save", autospec=True) as subscriber_save, \
- patch.object(ONUDevice, "save") as onu_save:
-
- onu_objects.return_value = [onu]
-
+ self.si.onu_state = "ENABLED"
self.policy.handle_update(self.si)
+ validate_onu_state.assert_called_with(self.si)
- self.assertEqual(onu.admin_state, "ENABLED")
- onu_save.assert_called()
- self.assertEqual(subscriber_save.call_count, 0)
+ self.si.onu_state = "DISABLED"
+ self.policy.handle_update(self.si)
+ self.assertEqual(validate_onu_state.call_count, 2)
- def test_subscriber_awaiting_status_onu_state_disabled(self):
- self.si.valid = "valid"
- self.si.backend_code = 1
- self.si.serial_number = "BRCM1234"
+ def test_get_subscriber(self):
+
+ sub = RCORDSubscriber(
+ onu_device="BRCM1234"
+ )
+
+ with patch.object(RCORDSubscriber.objects, "get_items") as get_subscribers:
+ get_subscribers.return_value = [sub]
+
+ res = self.policy.get_subscriber("BRCM1234")
+ self.assertEqual(res, sub)
+
+ res = self.policy.get_subscriber("brcm1234")
+ self.assertEqual(res, sub)
+
+ res = self.policy.get_subscriber("foo")
+ self.assertEqual(res, None)
+
+ def test_update_subscriber(self):
+
+ sub = RCORDSubscriber(
+ onu_device="BRCM1234"
+ )
+
+ self.si.status_message = "some content"
+
+ with patch.object(sub, "save") as sub_save:
+ self.si.authentication_state = "AWAITING"
+ self.policy.update_subscriber(sub, self.si)
+ self.assertEqual(sub.status, "awaiting-auth")
+ self.assertIn("Awaiting Authentication", self.si.status_message)
+ sub_save.assert_called()
+ sub_save.reset_mock()
+
+ self.si.authentication_state = "REQUESTED"
+ self.policy.update_subscriber(sub, self.si)
+ self.assertEqual(sub.status, "awaiting-auth")
+ self.assertIn("Authentication requested", self.si.status_message)
+ sub_save.assert_called()
+ sub_save.reset_mock()
+
+ self.si.authentication_state = "STARTED"
+ self.policy.update_subscriber(sub, self.si)
+ self.assertEqual(sub.status, "awaiting-auth")
+ self.assertIn("Authentication started", self.si.status_message)
+ sub_save.assert_called()
+ sub_save.reset_mock()
+
+ self.si.authentication_state = "APPROVED"
+ self.policy.update_subscriber(sub, self.si)
+ self.assertEqual(sub.status, "enabled")
+ self.assertIn("Authentication succeded", self.si.status_message)
+ sub_save.assert_called()
+ sub_save.reset_mock()
+
+ self.si.authentication_state = "DENIED"
+ self.policy.update_subscriber(sub, self.si)
+ self.assertEqual(sub.status, "auth-failed")
+ self.assertIn("Authentication denied", self.si.status_message)
+ sub_save.assert_called()
+ sub_save.reset_mock()
+
+ def test_handle_update_subscriber(self):
self.si.onu_state = "DISABLED"
- onu = ONUDevice(
- serial_number=self.si.serial_number,
- admin_state="DISABLED"
+ sub = RCORDSubscriber(
+ onu_device="BRCM1234"
)
- subscriber = RCORDSubscriber(
- onu_device=self.si.serial_number,
- status='enabled'
- )
+ with patch.object(self.policy, "get_subscriber") as get_subscriber, \
+ patch.object(self.policy, "update_subscriber") as update_subscriber:
- with patch.object(ONUDevice.objects, "get_items") as onu_objects, \
- patch.object(RCORDSubscriber.objects, "get_items") as subscriber_objects, \
- patch.object(RCORDSubscriber, "save") as subscriber_save:
- onu_objects.return_value = [onu]
- subscriber_objects.return_value = [subscriber]
-
+ get_subscriber.return_value = None
self.policy.handle_update(self.si)
- self.assertEqual(subscriber.status, "awaiting-auth")
- subscriber_save.assert_called()
+ self.assertEqual(update_subscriber.call_count, 0)
- def test_subscriber_enable_status_auth_state_approved(self):
- self.si.valid = "valid"
- self.si.backend_code = 1
- self.si.serial_number = "brcm1234"
- self.si.onu_state = "ENABLED"
- self.si.authentication_state = "APPROVED"
-
- onu = ONUDevice(
- serial_number=self.si.serial_number,
- admin_state="ENABLED"
- )
-
- subscriber = RCORDSubscriber(
- onu_device="BRCM1234",
- status='awaiting-auth'
- )
-
- with patch.object(ONUDevice.objects, "get_items") as onu_objects, \
- patch.object(RCORDSubscriber.objects, "get_items") as subscriber_objects, \
- patch.object(RCORDSubscriber, "save") as subscriber_save:
- onu_objects.return_value = [onu]
- subscriber_objects.return_value = [subscriber]
-
+ get_subscriber.return_value = sub
self.policy.handle_update(self.si)
- self.assertEqual(subscriber.status, "enabled")
- subscriber_save.assert_called()
+ update_subscriber.assert_called_with(sub, self.si)
+
if __name__ == '__main__':
unittest.main()
diff --git a/xos/synchronizer/models/att-workflow-driver.xproto b/xos/synchronizer/models/att-workflow-driver.xproto
index 0ca9201..5a24d8b 100644
--- a/xos/synchronizer/models/att-workflow-driver.xproto
+++ b/xos/synchronizer/models/att-workflow-driver.xproto
@@ -3,19 +3,19 @@
message AttWorkflowDriverService (Service){
option verbose_name = "AttWorkflowDriver Service";
- option kind = "OSS";
+ option kind = "control";
}
message AttWorkflowDriverServiceInstance (ServiceInstance){
option owner_class_name = "AttWorkflowDriverService";
option verbose_name = "AttWorkflowDriver Service Instance";
- required string valid = 1 [default = "awaiting", choices = "(('awaiting', 'Awaiting Validation'), ('valid', 'Valid'), ('invalid', 'Invalid'))", help_text = "Wether this ONU has been validated by the external OSS"];
required string serial_number = 2 [max_length = 254, db_index = False, tosca_key=True, unique = True];
required string authentication_state = 3 [default = "AWAITING", choices = "(('AWAITING', 'Awaiting'), ('STARTED', 'Started'), ('REQUESTED', 'Requested'), ('APPROVED', 'Approved'), ('DENIED', 'Denied'), )", max_length = 50, db_index = False];
required string of_dpid = 4 [max_length = 254, db_index = False];
required int32 uni_port_id = 5 [db_index = False];
required string onu_state = 6 [max_length = 254, db_index = False, default = "AWAITING", choices = "(('AWAITING', 'Awaiting'), ('ENABLED', 'Enabled'), ('DISABLED', 'Disabled'))"];
+ optional string status_message = 7 [max_length = 254, db_index = False, default = ""];
}
message AttWorkflowDriverWhiteListEntry (XOSBase) {
diff --git a/xos/synchronizer/steps/sync_att_workflow_driver_service_instance.py b/xos/synchronizer/steps/sync_att_workflow_driver_service_instance.py
deleted file mode 100644
index 6923062..0000000
--- a/xos/synchronizer/steps/sync_att_workflow_driver_service_instance.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-from synchronizers.new_base.syncstep import SyncStep, model_accessor
-from synchronizers.new_base.modelaccessor import AttWorkflowDriverServiceInstance, AttWorkflowDriverWhiteListEntry, ONUDevice
-
-from xosconfig import Config
-from multistructlog import create_logger
-
-log = create_logger(Config().get('logging'))
-
-class SyncAttWorkflowDriverServiceInstance(SyncStep):
- provides = [AttWorkflowDriverServiceInstance]
- observes = AttWorkflowDriverServiceInstance
-
- def validate_onu(self, si):
- # This is where you may want to call your OSS Database to verify if this ONU can be activated
- oss_service = si.owner.leaf_model
-
- # See if there is a matching entry in the whitelist.
-
- matching_entries = AttWorkflowDriverWhiteListEntry.objects.filter(owner_id=oss_service.id,
- serial_number=si.serial_number)
-
- # check that it's in the whitelist
- if len(matching_entries) == 0:
- log.warn("ONU disable as not in whitelist", object=str(si), serial_number=si.serial_number, **si.tologdict())
- return False
-
- whitelisted = matching_entries[0]
-
- # FIXME if the ONU is not there yet it raise an index error, if that happens raise DeferredException
- pon_port = ONUDevice.objects.get(serial_number=si.serial_number).pon_port
- if pon_port.port_no != whitelisted.pon_port_id or si.of_dpid != whitelisted.device_id:
- log.warn("ONU disable as location don't match", object=str(si), serial_number=si.serial_number,
- **si.tologdict())
- return False
-
- return True
-
- def sync_record(self, si):
- log.info("synching AttWorkflowDriverServiceInstance", object=str(si), **si.tologdict())
-
- if not self.validate_onu(si):
- log.error("ONU with serial number %s is not valid in the OSS Database" % si.serial_number)
- si.valid = "invalid"
- else:
- si.valid = "valid"
-
- si.save()
-
- def delete_record(self, o):
- pass
diff --git a/xos/synchronizer/steps/test_sync_att_workflow_driver_service_instance.py b/xos/synchronizer/steps/test_sync_att_workflow_driver_service_instance.py
deleted file mode 100644
index 2b82b1b..0000000
--- a/xos/synchronizer/steps/test_sync_att_workflow_driver_service_instance.py
+++ /dev/null
@@ -1,146 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-import functools
-from mock import patch, call, Mock, PropertyMock
-import requests_mock
-import multistructlog
-from multistructlog import create_logger
-
-import os, sys
-
-# Hack to load synchronizer framework
-test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-xos_dir=os.path.join(test_path, "../../..")
-if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir = os.path.join(xos_dir, "../../xos_services")
-sys.path.append(xos_dir)
-sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
-# END Hack to load synchronizer framework
-
-# generate model from xproto
-def get_models_fn(service_name, xproto_name):
- name = os.path.join(service_name, "xos", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- else:
- name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
- if os.path.exists(os.path.join(services_dir, name)):
- return name
- raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
-# END generate model from xproto
-
-class TestSyncAttWorkflowDriverServiceInstance(unittest.TestCase):
-
- def setUp(self):
-
- self.sys_path_save = sys.path
- sys.path.append(xos_dir)
- sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
-
- # Setting up the config module
- from xosconfig import Config
- config = os.path.join(test_path, "../test_config.yaml")
- Config.clear()
- Config.init(config, "synchronizer-config-schema.yaml")
- # END Setting up the config module
-
- from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
- build_mock_modelaccessor(xos_dir, services_dir, [
- get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
- get_models_fn("olt-service", "volt.xproto"),
- get_models_fn("../profiles/rcord", "rcord.xproto")
- ])
- import synchronizers.new_base.modelaccessor
-
- from sync_att_workflow_driver_service_instance import SyncAttWorkflowDriverServiceInstance, model_accessor
-
- # import all class names to globals
- for (k, v) in model_accessor.all_model_classes.items():
- globals()[k] = v
-
-
- self.sync_step = SyncAttWorkflowDriverServiceInstance
-
- self.oss = AttWorkflowDriverService()
- self.oss.name = "oss"
- self.oss.id = 5367
-
- # create a mock AttWorkflowDriverServiceInstance instance
- self.o = Mock()
- self.o.serial_number = "BRCM1234"
- self.o.of_dpid = "of:109299321"
- self.o.pon_port_id = 1
- self.o.owner.leaf_model = self.oss
- self.o.tologdict.return_value = {}
-
- self.pon_port = PONPort(
- port_no=1
- )
- self.onu = ONUDevice(
- serial_number=self.o.serial_number,
- pon_port=self.pon_port
- )
-
- def tearDown(self):
- self.o = None
- sys.path = self.sys_path_save
-
- def test_sync_valid(self):
- with patch.object(AttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_items, \
- patch.object(ONUDevice.objects, "get_items") as onu_items:
- # Create a whitelist entry for self.o's serial number
- whitelist_entry = AttWorkflowDriverWhiteListEntry(
- owner_id=self.oss.id,
- serial_number=self.o.serial_number,
- device_id=self.o.of_dpid,
- pon_port_id=1,
- )
- whitelist_items.return_value = [whitelist_entry]
- onu_items.return_value = [self.onu]
-
- self.sync_step().sync_record(self.o)
-
- self.assertEqual(self.o.valid, "valid")
- self.o.save.assert_called()
-
- def test_sync_bad_location(self):
- with patch.object(AttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_items, \
- patch.object(ONUDevice.objects, "get_items") as onu_items:
- # Create a whitelist entry for self.o's serial number
- whitelist_entry = AttWorkflowDriverWhiteListEntry(
- owner_id=self.oss.id,
- serial_number=self.o.serial_number,
- device_id="foo",
- pon_port_id=666
- )
- whitelist_items.return_value = [whitelist_entry]
- onu_items.return_value = [self.onu]
-
- self.sync_step().sync_record(self.o)
-
- self.assertEqual(self.o.valid, "invalid")
- self.o.save.assert_called()
-
- def test_sync_no_whitelist(self):
- self.sync_step().sync_record(self.o)
-
- self.assertEqual(self.o.valid, "invalid")
- self.o.save.assert_called()
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/xos/synchronizer/test_helpers.py b/xos/synchronizer/test_helpers.py
new file mode 100644
index 0000000..910930b
--- /dev/null
+++ b/xos/synchronizer/test_helpers.py
@@ -0,0 +1,171 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch, call, Mock, PropertyMock
+import json
+
+import os, sys
+
+# Hack to load synchronizer framework
+test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+if not os.path.exists(os.path.join(test_path, "new_base")):
+ xos_dir=os.path.join(test_path, "../../../../../orchestration/xos/xos")
+ services_dir=os.path.join(xos_dir, "../../xos_services")
+# END Hack to load synchronizer framework
+
+
+# generate model from xproto
+def get_models_fn(service_name, xproto_name):
+ name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
+ if os.path.exists(os.path.join(services_dir, name)):
+ return name
+ raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
+# END generate model from xproto
+
+class TestAttHelpers(unittest.TestCase):
+
+ def setUp(self):
+
+ self.sys_path_save = sys.path
+ sys.path.append(xos_dir)
+ sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
+
+ # Setting up the config module
+ from xosconfig import Config
+ config = os.path.join(test_path, "test_config.yaml")
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+ # END Setting up the config module
+
+ from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
+
+ build_mock_modelaccessor(xos_dir, services_dir, [
+ get_models_fn("att-workflow-driver", "att-workflow-driver.xproto"),
+ get_models_fn("olt-service", "volt.xproto"),
+ get_models_fn("../profiles/rcord", "rcord.xproto")
+ ])
+ import synchronizers.new_base.modelaccessor
+ from helpers import AttHelpers, model_accessor
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ self.helpers = AttHelpers
+
+ self._volt = VOLTService()
+ self._volt.id = 1
+
+ self.volt = Service()
+ self.volt.id = 1
+ self.volt.name = "vOLT"
+ self.volt.leaf_model = self._volt
+
+ self.pon_port = PONPort()
+ self.pon_port.port_no = 1234
+
+ self.onu = ONUDevice()
+ self.onu.pon_port = self.pon_port
+ self.onu.serial_number = "BRCM1234"
+
+ self.att_si = AttWorkflowDriverServiceInstance(
+ serial_number="BRCM1234",
+ owner=self.volt,
+ owner_id=self.volt.id,
+ of_dpid="of:1234"
+ )
+
+ self.whitelist_entry = AttWorkflowDriverWhiteListEntry(
+ serial_number="BRCM1234",
+ owner=self.volt,
+ owner_id=self.volt.id,
+ pon_port_id=1234,
+ device_id="of:1234"
+ )
+
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+
+ def test_not_in_whitelist(self):
+
+ with patch.object(AttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock:
+ whitelist_mock.return_value = []
+
+ [res, message] = self.helpers.validate_onu(self.att_si)
+
+ self.assertFalse(res)
+ self.assertEqual(message, "ONU not found in whitelist")
+
+ def test_wrong_location_port(self):
+ self.pon_port.port_no = 666
+ with patch.object(AttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock, \
+ patch.object(ONUDevice.objects, "get_items") as onu_mock:
+ whitelist_mock.return_value = [self.whitelist_entry]
+ onu_mock.return_value = [self.onu]
+
+ [res, message] = self.helpers.validate_onu(self.att_si)
+
+ self.assertFalse(res)
+ self.assertEqual(message, "ONU activated in wrong location")
+
+ def test_wrong_location_device(self):
+ self.att_si.of_dpid = 666
+ with patch.object(AttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock, \
+ patch.object(ONUDevice.objects, "get_items") as onu_mock:
+ whitelist_mock.return_value = [self.whitelist_entry]
+ onu_mock.return_value = [self.onu]
+
+ [res, message] = self.helpers.validate_onu(self.att_si)
+
+ self.assertFalse(res)
+ self.assertEqual(message, "ONU activated in wrong location")
+
+ def test_deferred_validation(self):
+ with patch.object(AttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock, \
+ patch.object(ONUDevice.objects, "get_items") as onu_mock:
+ whitelist_mock.return_value = [self.whitelist_entry]
+ onu_mock.return_value = []
+
+ with self.assertRaises(Exception) as e:
+ self.helpers.validate_onu(self.att_si)
+
+ self.assertEqual(e.exception.message, "ONU device %s is not know to XOS yet" % self.att_si.serial_number)
+
+ def test_validating_onu(self):
+ with patch.object(AttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock, \
+ patch.object(ONUDevice.objects, "get_items") as onu_mock:
+ whitelist_mock.return_value = [self.whitelist_entry]
+ onu_mock.return_value = [self.onu]
+
+ [res, message] = self.helpers.validate_onu(self.att_si)
+
+ self.assertTrue(res)
+ self.assertEqual(message, "ONU has been validated")
+
+ def test_validating_onu_lowercase(self):
+ self.whitelist_entry.serial_number = "brcm1234"
+ with patch.object(AttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock, \
+ patch.object(ONUDevice.objects, "get_items") as onu_mock:
+ whitelist_mock.return_value = [self.whitelist_entry]
+ onu_mock.return_value = [self.onu]
+
+ [res, message] = self.helpers.validate_onu(self.att_si)
+
+ self.assertTrue(res)
+ self.assertEqual(message, "ONU has been validated")
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file