[SEBA-164] Forcing subscriber auth if onu goes down
Change-Id: I6d1b721ac67533fb2b58048666771b50d3a84b2b
diff --git a/xos/synchronizer/event_steps/auth_event.py b/xos/synchronizer/event_steps/auth_event.py
index 5713f9f..9842406 100644
--- a/xos/synchronizer/event_steps/auth_event.py
+++ b/xos/synchronizer/event_steps/auth_event.py
@@ -36,27 +36,18 @@
return onu_sn
- def get_hippie_oss_si_by_sn(self, serial_number):
+ def get_si_by_sn(self, serial_number):
try:
return AttWorkflowDriverServiceInstance.objects.get(serial_number=serial_number)
except IndexError:
self.log.exception("authentication.events: Cannot find hippie-oss service instance for this event", kafka_event=value)
raise Exception("authentication.events: Cannot find hippie-oss service instance for this event")
-
- def activate_subscriber(self, subscriber):
- subscriber.status = 'enabled'
- subscriber.save()
-
- def disable_subscriber(self, subscriber):
- subscriber.status = 'auth-failed'
- subscriber.save()
-
def process_event(self, event):
value = json.loads(event.value)
onu_sn = self.get_onu_sn(value)
- si = self.get_hippie_oss_si_by_sn(onu_sn)
+ si = self.get_si_by_sn(onu_sn)
if not si:
self.log.exception("authentication.events: Cannot find hippie-oss service instance for this event", kafka_event=value)
raise Exception("authentication.events: Cannot find hippie-oss service instance for this event")
diff --git a/xos/synchronizer/event_steps/onu_event.py b/xos/synchronizer/event_steps/onu_event.py
index 045198f..14c6d9b 100644
--- a/xos/synchronizer/event_steps/onu_event.py
+++ b/xos/synchronizer/event_steps/onu_event.py
@@ -30,15 +30,12 @@
def __init__(self, *args, **kwargs):
super(ONUEventStep, self).__init__(*args, **kwargs)
- def handle_onu_activate_event(self, event):
-
- # NOTE do we need to wait of the ONU to be there?
-
- self.log.info("onu.events: validating ONU %s" % event["serial_number"], event_data=event)
-
+ def get_att_si(self, event):
try:
att_si = AttWorkflowDriverServiceInstance.objects.get(serial_number=event["serial_number"])
att_si.no_sync = False;
+ att_si.uni_port_id = event["uni_port_id"]
+ att_si.of_dpid = event["of_dpid"]
self.log.debug("onu.events: Found existing AttWorkflowDriverServiceInstance", si=att_si)
except IndexError:
# create an AttWorkflowDriverServiceInstance, the validation will be triggered in the corresponding sync step
@@ -48,13 +45,19 @@
uni_port_id=event["uni_port_id"]
)
self.log.debug("onu.events: Created new AttWorkflowDriverServiceInstance", si=att_si)
- att_si.save()
+ return att_si
def process_event(self, event):
value = json.loads(event.value)
self.log.info("onu.events: received event", value=value)
+ att_si = self.get_att_si(value)
if value["status"] == "activated":
- self.log.info("onu.events: activate onu", value=value)
- self.handle_onu_activate_event(value)
+ self.log.info("onu.events: activated onu", value=value)
+ att_si.onu_state = "ACTIVE"
+ elif value["status"] == "disabled":
+ self.log.info("onu.events: disabled onu", value=value)
+ att_si.onu_state = "DISABLED"
+ att_si.save(always_update_timestamp=True)
+
diff --git a/xos/synchronizer/event_steps/test_onu_events.py b/xos/synchronizer/event_steps/test_onu_events.py
index e81f876..b690663 100644
--- a/xos/synchronizer/event_steps/test_onu_events.py
+++ b/xos/synchronizer/event_steps/test_onu_events.py
@@ -94,7 +94,7 @@
with patch.object(AttWorkflowDriverServiceInstance.objects, "get_items") as att_si_mock , \
patch.object(AttWorkflowDriverServiceInstance, "save", autospec=True) as mock_save:
- att_si_mock.side_effect = IndexError
+ att_si_mock.return_value = []
self.event_step.process_event(self.event)
@@ -105,3 +105,56 @@
self.assertEqual(att_si.serial_number, self.event_dict['serial_number'])
self.assertEqual(att_si.of_dpid, self.event_dict['of_dpid'])
self.assertEqual(att_si.uni_port_id, self.event_dict['uni_port_id'])
+ self.assertEqual(att_si.onu_state, "ACTIVE")
+
+ def test_reuse_instance(self):
+
+ si = AttWorkflowDriverServiceInstance(
+ serial_number=self.event_dict["serial_number"],
+ of_dpid="foo",
+ uni_port_id="foo"
+ )
+
+ with patch.object(AttWorkflowDriverServiceInstance.objects, "get_items") as att_si_mock , \
+ patch.object(AttWorkflowDriverServiceInstance, "save", autospec=True) as mock_save:
+
+ att_si_mock.return_value = [si]
+
+ self.event_step.process_event(self.event)
+
+ att_si = mock_save.call_args[0][0]
+
+ self.assertEqual(mock_save.call_count, 1)
+
+ self.assertEqual(att_si.serial_number, self.event_dict['serial_number'])
+ self.assertEqual(att_si.of_dpid, self.event_dict['of_dpid'])
+ self.assertEqual(att_si.uni_port_id, self.event_dict['uni_port_id'])
+ self.assertEqual(att_si.onu_state, "ACTIVE")
+
+ def test_disable_onu(self):
+ self.event_dict = {
+ 'status': 'disabled',
+ 'serial_number': 'BRCM1234',
+ 'of_dpid': 'of:109299321',
+ 'uni_port_id': 16
+ }
+ self.event.value = json.dumps(self.event_dict)
+
+ with patch.object(AttWorkflowDriverServiceInstance.objects, "get_items") as att_si_mock , \
+ patch.object(AttWorkflowDriverServiceInstance, "save", autospec=True) as mock_save:
+
+ att_si_mock.return_value = []
+
+ self.event_step.process_event(self.event)
+
+ att_si = mock_save.call_args[0][0]
+
+ self.assertEqual(mock_save.call_count, 1)
+
+ self.assertEqual(att_si.serial_number, self.event_dict['serial_number'])
+ self.assertEqual(att_si.of_dpid, self.event_dict['of_dpid'])
+ self.assertEqual(att_si.uni_port_id, self.event_dict['uni_port_id'])
+ self.assertEqual(att_si.onu_state, "DISABLED")
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file
diff --git a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
index 37946ed..ef3363b 100644
--- a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
+++ b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
@@ -17,6 +17,9 @@
from synchronizers.new_base.modelaccessor import RCORDSubscriber, ONUDevice, model_accessor
from synchronizers.new_base.policy import Policy
+class DeferredException(Exception):
+ pass
+
class AttWorkflowDriverServiceInstancePolicy(Policy):
model_name = "AttWorkflowDriverServiceInstance"
@@ -24,37 +27,24 @@
self.logger.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
self.handle_update(si)
- def update_and_save_subscriber(self, subscriber, si, update_timestamp=False):
- if si.authentication_state == "STARTED":
- subscriber.status = "awaiting-auth"
- elif si.authentication_state == "REQUESTED":
- subscriber.status = "awaiting-auth"
- elif si.authentication_state == "APPROVED":
- subscriber.status = "enabled"
- elif si.authentication_state == "DENIED":
- subscriber.status = "auth-failed"
-
- subscriber.save(always_update_timestamp=update_timestamp)
-
- def create_subscriber(self, si):
- subscriber = RCORDSubscriber()
- subscriber.onu_device = si.serial_number
- subscriber.status == "awaiting-auth"
-
- return subscriber
-
def handle_update(self, si):
+
+ # TODO if si.onu_state = DISABLED set subscriber.status to need_auth
+ # TODO cleanup
+
self.logger.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s, valid=%s " % (si.id, si.valid))
# Check to make sure the object has been synced. This is to cover a race condition where the model_policy
# runs, is interrupted by the sync step, the sync step completes, and then the model policy ends up saving
# a policed_timestamp that is later the updated timestamp set by the sync_step.
if (si.backend_code!=1):
- raise Exception("MODEL_POLICY: AttWorkflowDriverServiceInstance %s has not been synced yet" % si.id)
+ raise DeferredException("MODEL_POLICY: AttWorkflowDriverServiceInstance %s has not been synced yet" % si.id)
+ # waiting for Whitelist validation
if not hasattr(si, 'valid') or si.valid is "awaiting":
- self.logger.debug("MODEL_POLICY: skipping handle_update for AttWorkflowDriverServiceInstance %s as not validated yet" % si.id)
- return
+ raise DeferredException("MODEL_POLICY: deferring handle_update for AttWorkflowDriverServiceInstance %s as not validated yet" % si.id)
+
+ # disabling ONU
if si.valid == "invalid":
self.logger.debug("MODEL_POLICY: disabling ONUDevice [%s] for AttWorkflowDriverServiceInstance %s" % (si.serial_number, si.id))
onu = ONUDevice.objects.get(serial_number=si.serial_number)
@@ -82,24 +72,25 @@
# we just want to find out if it exists or not
pass
+ if subscriber:
+ # if the subscriber is there and authentication is complete, update its state
+ self.logger.debug("MODEL_POLICY: handling subscriber", onu_device=si.serial_number, authentication_state=si.authentication_state, onu_state=si.onu_state)
+ if si.onu_state == "DISABLED":
+ # NOTE do not mess with onu.admin_state as that triggered this condition
+ subscriber.status = "awaiting-auth"
+ elif si.authentication_state == "STARTED":
+ subscriber.status = "awaiting-auth"
+ elif si.authentication_state == "REQUESTED":
+ subscriber.status = "awaiting-auth"
+ elif si.authentication_state == "APPROVED":
+ subscriber.status = "enabled"
+ elif si.authentication_state == "DENIED":
+ subscriber.status = "auth-failed"
+
+ subscriber.save(always_update_timestamp=True)
# if subscriber does not exist
- self.logger.debug("MODEL_POLICY: handling subscriber", onu_device=si.serial_number, create_on_discovery=si.owner.leaf_model.create_on_discovery)
- if not subscriber:
- # and create_on_discovery is false
- if not si.owner.leaf_model.create_on_discovery:
- # do not create the subscriber, unless it has been approved
- if si.authentication_state == "APPROVED":
- self.logger.debug("MODEL_POLICY: creating subscriber as authentication_sate=APPROVED")
- subscriber = self.create_subscriber(si)
- self.update_and_save_subscriber(subscriber, si)
- else:
- self.logger.debug("MODEL_POLICY: creating subscriber")
- subscriber = self.create_subscriber(si)
- self.update_and_save_subscriber(subscriber, si)
- # if the subscriber is there and authentication is complete, update its state
- elif subscriber and si.authentication_state == "APPROVED":
- self.logger.debug("MODEL_POLICY: updating subscriber status")
- self.update_and_save_subscriber(subscriber, si, update_timestamp=True)
+ else:
+ self.logger.warn("MODEL_POLICY: subscriber does not exists for this SI, doing nothing")
def handle_delete(self, si):
pass
diff --git a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
index 4386eb3..774efda 100644
--- a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
+++ b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
@@ -84,14 +84,17 @@
self.assertIn("has not been synced yet", e.exception.message)
- def test_skip_update(self):
+ def test_defer_update(self):
self.si.valid = "awaiting"
self.si.backend_code = 1
with patch.object(RCORDSubscriber, "save") as subscriber_save, \
patch.object(ONUDevice, "save") as onu_save:
- self.policy.handle_update(self.si)
+ with self.assertRaises(Exception) as e:
+ self.policy.handle_update(self.si)
+
+ self.assertEqual(e.exception.message, "MODEL_POLICY: deferring handle_update for AttWorkflowDriverServiceInstance 98052 as not validated yet")
subscriber_save.assert_not_called()
onu_save.assert_not_called()
@@ -99,6 +102,7 @@
self.si.valid = "invalid"
self.si.serial_number = "BRCM1234"
self.si.backend_code = 1
+ self.si.onu_state = "ENABLED"
onu = ONUDevice(
serial_number=self.si.serial_number
@@ -120,6 +124,7 @@
self.si.serial_number = "BRCM1234"
self.si.c_tag = None
self.si.backend_code = 1
+ self.si.onu_state = "ENABLED"
onu = ONUDevice(
serial_number=self.si.serial_number,
@@ -147,7 +152,7 @@
self.si.backend_code = 1
self.si.serial_number = "BRCM1234"
self.si.authentication_state = "DENIEND"
- self.si.owner.leaf_model.create_on_discovery = False
+ self.si.onu_state = "ENABLED"
onu = ONUDevice(
serial_number=self.si.serial_number,
@@ -166,40 +171,37 @@
onu_save.assert_called()
self.assertEqual(subscriber_save.call_count, 0)
- def test_create_subscriber(self):
+ def test_subscriber_awaiting_status_onu_state_disabled(self):
self.si.valid = "valid"
- self.si.serial_number = "BRCM1234"
self.si.backend_code = 1
+ self.si.serial_number = "BRCM1234"
+ self.si.onu_state = "DISABLED"
onu = ONUDevice(
serial_number=self.si.serial_number,
- admin_state="ENABLED"
+ admin_state="DISABLED"
+ )
+
+ subscriber = RCORDSubscriber(
+ onu_device=self.si.serial_number,
+ status='enabled'
)
with patch.object(ONUDevice.objects, "get_items") as onu_objects, \
- patch.object(RCORDSubscriber, "save", autospec=True) as subscriber_save, \
- patch.object(ONUDevice, "save") as onu_save:
-
+ patch.object(RCORDSubscriber.objects, "get_items") as subscriber_objects, \
+ patch.object(RCORDSubscriber, "save") as subscriber_save:
onu_objects.return_value = [onu]
+ subscriber_objects.return_value = [subscriber]
self.policy.handle_update(self.si)
- self.assertEqual(subscriber_save.call_count, 1)
+ self.assertEqual(subscriber.status, "awaiting-auth")
+ subscriber_save.assert_called()
- subscriber = subscriber_save.call_args[0][0]
- self.assertEqual(subscriber.onu_device, self.si.serial_number)
-
- onu_save.assert_not_called()
-
- def test_create_subscriber_no_create_on_discovery(self):
- """
- test_create_subscriber_no_create_on_discovery
- When si.owner.create_on_discovery = False we still need to create the subscriber after authentication
- """
-
+ def test_subscriber_enable_status_auth_state_approved(self):
self.si.valid = "valid"
- self.si.serial_number = "BRCM1234"
self.si.backend_code = 1
- self.si.owner.leaf_model.create_on_discovery = False
+ self.si.serial_number = "BRCM1234"
+ self.si.onu_state = "ENABLED"
self.si.authentication_state = "APPROVED"
onu = ONUDevice(
@@ -207,19 +209,20 @@
admin_state="ENABLED"
)
- with patch.object(ONUDevice.objects, "get_items") as onu_objects, \
- patch.object(RCORDSubscriber, "save", autospec=True) as subscriber_save, \
- patch.object(ONUDevice, "save") as onu_save:
+ subscriber = RCORDSubscriber(
+ onu_device=self.si.serial_number,
+ status='awaiting-auth'
+ )
+ with patch.object(ONUDevice.objects, "get_items") as onu_objects, \
+ patch.object(RCORDSubscriber.objects, "get_items") as subscriber_objects, \
+ patch.object(RCORDSubscriber, "save") as subscriber_save:
onu_objects.return_value = [onu]
+ subscriber_objects.return_value = [subscriber]
self.policy.handle_update(self.si)
- self.assertEqual(subscriber_save.call_count, 1)
-
- subscriber = subscriber_save.call_args[0][0]
- self.assertEqual(subscriber.onu_device, self.si.serial_number)
-
- onu_save.assert_not_called()
+ self.assertEqual(subscriber.status, "enabled")
+ subscriber_save.assert_called()
if __name__ == '__main__':
unittest.main()
diff --git a/xos/synchronizer/models/att-workflow-driver.xproto b/xos/synchronizer/models/att-workflow-driver.xproto
index f1a161a..0ca9201 100644
--- a/xos/synchronizer/models/att-workflow-driver.xproto
+++ b/xos/synchronizer/models/att-workflow-driver.xproto
@@ -4,8 +4,6 @@
message AttWorkflowDriverService (Service){
option verbose_name = "AttWorkflowDriver Service";
option kind = "OSS";
-
- required bool create_on_discovery = 2 [help_text = "Whether to create the subscriber when an ONU is discovered", db_index = False, default = True];
}
message AttWorkflowDriverServiceInstance (ServiceInstance){
@@ -17,6 +15,7 @@
required string authentication_state = 3 [default = "AWAITING", choices = "(('AWAITING', 'Awaiting'), ('STARTED', 'Started'), ('REQUESTED', 'Requested'), ('APPROVED', 'Approved'), ('DENIED', 'Denied'), )", max_length = 50, db_index = False];
required string of_dpid = 4 [max_length = 254, db_index = False];
required int32 uni_port_id = 5 [db_index = False];
+ required string onu_state = 6 [max_length = 254, db_index = False, default = "AWAITING", choices = "(('AWAITING', 'Awaiting'), ('ENABLED', 'Enabled'), ('DISABLED', 'Disabled'))"];
}
message AttWorkflowDriverWhiteListEntry (XOSBase) {
diff --git a/xos/synchronizer/steps/sync_att_workflow_driver_service_instance.py b/xos/synchronizer/steps/sync_att_workflow_driver_service_instance.py
index 0b9546b..6923062 100644
--- a/xos/synchronizer/steps/sync_att_workflow_driver_service_instance.py
+++ b/xos/synchronizer/steps/sync_att_workflow_driver_service_instance.py
@@ -40,6 +40,8 @@
return False
whitelisted = matching_entries[0]
+
+ # FIXME if the ONU is not there yet it raise an index error, if that happens raise DeferredException
pon_port = ONUDevice.objects.get(serial_number=si.serial_number).pon_port
if pon_port.port_no != whitelisted.pon_port_id or si.of_dpid != whitelisted.device_id:
log.warn("ONU disable as location don't match", object=str(si), serial_number=si.serial_number,