Moving DHCP event handling in model policy, update the subscriber only when needed
Change-Id: I77ec7c05d802a5902d96a8216b932d7a703af4e9
diff --git a/xos/synchronizer/event_steps/auth_event.py b/xos/synchronizer/event_steps/auth_event.py
index 9842406..6786c98 100644
--- a/xos/synchronizer/event_steps/auth_event.py
+++ b/xos/synchronizer/event_steps/auth_event.py
@@ -18,7 +18,8 @@
import os
import sys
from synchronizers.new_base.eventstep import EventStep
-from synchronizers.new_base.modelaccessor import VOLTService, AttWorkflowDriverServiceInstance, model_accessor
+from synchronizers.new_base.modelaccessor import model_accessor
+from helpers import AttHelpers
class SubscriberAuthEventStep(EventStep):
topics = ["authentication.events"]
@@ -27,31 +28,14 @@
def __init__(self, *args, **kwargs):
super(SubscriberAuthEventStep, self).__init__(*args, **kwargs)
- def get_onu_sn(self, event):
- olt_service = VOLTService.objects.first()
- onu_sn = olt_service.get_onu_sn_from_openflow(event["deviceId"], event["portNumber"])
- if not onu_sn or onu_sn is None:
- self.log.exception("authentication.events: Cannot find onu serial number for this event", kafka_event=event)
- raise Exception("authentication.events: Cannot find onu serial number for this event")
-
- return onu_sn
-
- def get_si_by_sn(self, serial_number):
- try:
- return AttWorkflowDriverServiceInstance.objects.get(serial_number=serial_number)
- except IndexError:
- self.log.exception("authentication.events: Cannot find hippie-oss service instance for this event", kafka_event=value)
- raise Exception("authentication.events: Cannot find hippie-oss service instance for this event")
-
def process_event(self, event):
value = json.loads(event.value)
- onu_sn = self.get_onu_sn(value)
- si = self.get_si_by_sn(onu_sn)
+ onu_sn = AttHelpers.get_onu_sn(value)
+ si = AttHelpers.get_si_by_sn(onu_sn)
if not si:
- self.log.exception("authentication.events: Cannot find hippie-oss service instance for this event", kafka_event=value)
- raise Exception("authentication.events: Cannot find hippie-oss service instance for this event")
+ self.log.exception("authentication.events: Cannot find att-workflow-driver service instance for this event", kafka_event=value)
+ raise Exception("authentication.events: Cannot find att-workflow-driver service instance for this event")
si.authentication_state = value["authenticationState"];
- si.no_sync = True
- si.save(update_fields=["authentication_state", "no_sync", "updated"], always_update_timestamp=True)
+ si.save(update_fields=["authentication_state", "updated"], always_update_timestamp=True)
diff --git a/xos/synchronizer/event_steps/dhcp_event.py b/xos/synchronizer/event_steps/dhcp_event.py
index b41f803..3d04e96 100644
--- a/xos/synchronizer/event_steps/dhcp_event.py
+++ b/xos/synchronizer/event_steps/dhcp_event.py
@@ -19,6 +19,7 @@
import sys
from synchronizers.new_base.eventstep import EventStep
from synchronizers.new_base.modelaccessor import VOLTService, RCORDSubscriber, model_accessor
+from helpers import AttHelpers
class SubscriberDhcpEventStep(EventStep):
topics = ["dhcp.events"]
@@ -27,30 +28,31 @@
def __init__(self, *args, **kwargs):
super(SubscriberDhcpEventStep, self).__init__(*args, **kwargs)
- def get_onu_sn(self, event):
- olt_service = VOLTService.objects.first()
- onu_sn = olt_service.get_onu_sn_from_openflow(event["deviceId"], event["portNumber"])
- if not onu_sn or onu_sn is None:
- self.log.exception("dhcp.events: Cannot find onu serial number for this event", kafka_event=event)
- raise Exception("dhcp.events: Cannot find onu serial number for this event")
-
- return onu_sn
-
def process_event(self, event):
value = json.loads(event.value)
- onu_sn = self.get_onu_sn(value)
+ onu_sn = AttHelpers.get_onu_sn(value)
+ si = AttHelpers.get_si_by_sn(onu_sn)
- subscriber = RCORDSubscriber.objects.get(onu_device=onu_sn)
+ if not si:
+ self.log.exception("dhcp.events: Cannot find att-workflow-driver service instance for this event", kafka_event=value)
+ raise Exception("dhcp.events: Cannot find att-workflow-driver service instance for this event")
- self.log.info("dhcp.events: Got event for subscriber", subscriber=subscriber, event_value=value, onu_sn=onu_sn)
+ self.log.info("dhcp.events: Got event for subscriber", event_value=value, onu_sn=onu_sn, si=si)
- # NOTE it will be better to update the SI and use the model policy to update the subscriber,
- # if this fails for any reason the event is lost
- if subscriber.ip_address != value["ipAddress"] or \
- subscriber.mac_address != value["macAddress"]:
-
- # FIXME apparently it's always saving
- subscriber.ip_address = value["ipAddress"]
- subscriber.mac_address = value["macAddress"]
- subscriber.save()
+ si.dhcp_state = value["messageType"];
+ si.ip_address = value["ipAddress"];
+ si.mac_address = value["macAddress"];
+ si.save(update_fields=["dhcp_state", "ip_address", "mac_address", "updated"], always_update_timestamp=True)
+ # subscriber = RCORDSubscriber.objects.get(onu_device=onu_sn)
+ #
+ #
+ # # NOTE it will be better to update the SI and use the model policy to update the subscriber,
+ # # if this fails for any reason the event is lost
+ #
+ # if value["messageType"] == "DHCPACK":
+ #
+ # # FIXME apparently it's always saving
+ # subscriber.ip_address = value["ipAddress"]
+ # subscriber.mac_address = value["macAddress"]
+ # subscriber.save()
diff --git a/xos/synchronizer/event_steps/test_auth_event.py b/xos/synchronizer/event_steps/test_auth_event.py
index 08887dd..e04f70b 100644
--- a/xos/synchronizer/event_steps/test_auth_event.py
+++ b/xos/synchronizer/event_steps/test_auth_event.py
@@ -107,5 +107,5 @@
self.event_step.process_event(self.event)
- self.hippie_si.save.assert_called_with(always_update_timestamp=True, update_fields=['authentication_state', 'no_sync', 'updated'])
+ self.hippie_si.save.assert_called_with(always_update_timestamp=True, update_fields=['authentication_state', 'updated'])
self.assertEqual(self.hippie_si.authentication_state, 'APPROVED')
diff --git a/xos/synchronizer/event_steps/test_dhcp_event.py b/xos/synchronizer/event_steps/test_dhcp_event.py
index a13b2b3..a31bf3f 100644
--- a/xos/synchronizer/event_steps/test_dhcp_event.py
+++ b/xos/synchronizer/event_steps/test_dhcp_event.py
@@ -81,13 +81,17 @@
self.volt.name = "vOLT"
self.volt.leaf_model = Mock()
- self.subscriber = RCORDSubscriber()
- self.subscriber.onu_device = "BRCM1234"
- self.subscriber.save = Mock()
+ # self.subscriber = RCORDSubscriber()
+ # self.subscriber.onu_device = "BRCM1234"
+ # self.subscriber.save = Mock()
self.mac_address = "00:AA:00:00:00:01"
self.ip_address = "192.168.3.5"
+ self.si = AttWorkflowDriverServiceInstance()
+ self.si.serial_number = "BRCM1234"
+ self.si.save = Mock()
+
def tearDown(self):
sys.path = self.sys_path_save
@@ -98,19 +102,21 @@
"deviceId" : "of:0000000000000001",
"portNumber" : "1",
"macAddress" : self.mac_address,
- "ipAddress" : self.ip_address
+ "ipAddress" : self.ip_address,
+ "messageType": "DHCPREQUEST"
})
with patch.object(VOLTService.objects, "get_items") as volt_service_mock, \
- patch.object(RCORDSubscriber.objects, "get_items") as subscriber_mock, \
+ patch.object(AttWorkflowDriverServiceInstance.objects, "get_items") as si_mock, \
patch.object(self.volt, "get_onu_sn_from_openflow") as get_onu_sn:
volt_service_mock.return_value = [self.volt]
get_onu_sn.return_value = "BRCM1234"
- subscriber_mock.return_value = [self.subscriber]
+ si_mock.return_value = [self.si]
self.event_step.process_event(self.event)
- self.subscriber.save.assert_called()
- self.assertEqual(self.subscriber.mac_address, self.mac_address)
- self.assertEqual(self.subscriber.ip_address, self.ip_address)
+ self.si.save.assert_called()
+ self.assertEqual(self.si.dhcp_state, "DHCPREQUEST")
+ self.assertEqual(self.si.mac_address, self.mac_address)
+ self.assertEqual(self.si.ip_address, self.ip_address)
diff --git a/xos/synchronizer/helpers.py b/xos/synchronizer/helpers.py
index 14f0eb4..9fffd7c 100644
--- a/xos/synchronizer/helpers.py
+++ b/xos/synchronizer/helpers.py
@@ -13,7 +13,7 @@
# limitations under the License.
from synchronizers.new_base.syncstep import DeferredException
-from synchronizers.new_base.modelaccessor import AttWorkflowDriverWhiteListEntry, ONUDevice, model_accessor
+from synchronizers.new_base.modelaccessor import AttWorkflowDriverWhiteListEntry, AttWorkflowDriverServiceInstance, ONUDevice, VOLTService, model_accessor
from xosconfig import Config
from multistructlog import create_logger
@@ -52,8 +52,32 @@
raise DeferredException("ONU device %s is not know to XOS yet" % att_si.serial_number)
if pon_port.port_no != whitelisted.pon_port_id or att_si.of_dpid != whitelisted.device_id:
- log.warn("ONU disable as location don't match", object=str(att_si), serial_number=att_si.serial_number,
+ log.warn("ONU disable as location don't match",
+ object=str(att_si),
+ serial_number=att_si.serial_number,
+ pon_port=pon_port.port_no,
+ whitelisted_pon_port=whitelisted.pon_port_id,
+ device_id=att_si.of_dpid,
+ whitelisted_device_id=whitelisted.device_id,
**att_si.tologdict())
return [False, "ONU activated in wrong location"]
- return [True, "ONU has been validated"]
\ No newline at end of file
+ return [True, "ONU has been validated"]
+
+ @staticmethod
+ def get_onu_sn(event):
+ olt_service = VOLTService.objects.first()
+ onu_sn = olt_service.get_onu_sn_from_openflow(event["deviceId"], event["portNumber"])
+ if not onu_sn or onu_sn is None:
+ log.exception("Cannot find onu serial number for this event", kafka_event=event)
+ raise Exception("Cannot find onu serial number for this event")
+
+ return onu_sn
+
+ @staticmethod
+ def get_si_by_sn(serial_number):
+ try:
+ return AttWorkflowDriverServiceInstance.objects.get(serial_number=serial_number)
+ except IndexError:
+ log.exception("Cannot find att-workflow-driver service instance for this serial number", serial_number=serial_number)
+ raise Exception("Cannot find att-workflow-driver service instance for this serial number %s", serial_number)
\ No newline at end of file
diff --git a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
index 8513d41..32d0eaa 100644
--- a/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
+++ b/xos/synchronizer/model_policies/model_policy_att_workflow_driver_serviceinstance.py
@@ -46,7 +46,7 @@
self.validate_onu_state(si)
else:
# just clean the status
- si.status_message = ""
+ si.status_message = "ONU has been disabled"
# handling the subscriber status
subscriber = self.get_subscriber(si.serial_number)
@@ -101,12 +101,18 @@
subscriber.status = "auth-failed"
si.status_message += " - Authentication denied"
- if cur_status == subscriber.status:
+ # NOTE we save the subscriber only if:
+ # - the status has changed
+ # - we get a DHCPACK event
+ if cur_status != subscriber.status or si.dhcp_state == "DHCPACK":
+ self.logger.debug("MODEL_POLICY: updating subscriber", onu_device=subscriber.onu_device, authentication_state=si.authentication_state, subscriber_status=subscriber.status)
+ if si.ip_address and si.mac_address:
+ subscriber.ip_address = si.ip_address
+ subscriber.mac_address = si.mac_address
+ subscriber.save(always_update_timestamp=True)
+ else:
self.logger.debug("MODEL_POLICY: subscriber status has not changed", onu_device=subscriber.onu_device,
authentication_state=si.authentication_state, subscriber_status=subscriber.status)
- else:
- self.logger.debug("MODEL_POLICY: updating subscriber", onu_device=subscriber.onu_device, authentication_state=si.authentication_state, subscriber_status=subscriber.status)
- subscriber.save(always_update_timestamp=True)
def handle_delete(self, si):
pass
diff --git a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
index 8e15202..afa4183 100644
--- a/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
+++ b/xos/synchronizer/model_policies/test_model_policy_att_workflow_driver_serviceinstance.py
@@ -240,6 +240,22 @@
self.policy.update_subscriber(sub, self.si)
sub_save.assert_not_called()
+ def test_update_subscriber_dhcp(self):
+ sub = RCORDSubscriber(
+ onu_device="BRCM1234"
+ )
+
+ self.si.dhcp_state = "DHCPACK"
+ self.si.ip_address = "1234"
+ self.si.mac_address = "4321"
+
+ with patch.object(sub, "save") as sub_save:
+ self.policy.update_subscriber(sub, self.si)
+ sub_save.assert_called()
+ self.assertEqual(sub.mac_address, self.si.mac_address)
+ self.assertEqual(sub.ip_address, self.si.ip_address)
+
+
def test_handle_update_subscriber(self):
self.si.onu_state = "DISABLED"
@@ -249,7 +265,6 @@
)
with patch.object(self.policy, "get_subscriber") as get_subscriber, \
- patch.object(self.policy, "update_onu") as update_onu, \
patch.object(self.policy, "update_subscriber") as update_subscriber:
get_subscriber.return_value = None
diff --git a/xos/synchronizer/models/att-workflow-driver.xproto b/xos/synchronizer/models/att-workflow-driver.xproto
index 7216dff..233eb00 100644
--- a/xos/synchronizer/models/att-workflow-driver.xproto
+++ b/xos/synchronizer/models/att-workflow-driver.xproto
@@ -16,6 +16,9 @@
required int32 uni_port_id = 5 [db_index = False];
required string onu_state = 6 [max_length = 254, db_index = False, default = "AWAITING", choices = "(('AWAITING', 'Awaiting'), ('ENABLED', 'Enabled'), ('DISABLED', 'Disabled'))"];
optional string status_message = 7 [max_length = 254, db_index = False, default = ""];
+ optional string dhcp_state = 8 [max_length = 254, db_index = False, default = "AWAITING", choices = "(('AWAITING', 'Awaiting'), ('DHCPDISCOVER', 'DHCPDISCOVER'), ('DHCPACK', 'DHCPACK'), ('DHCPREQUEST', 'DHCPREQUEST'))"];
+ optional string ip_address = 9 [max_length = 20, db_index = False];
+ optional string mac_address = 10 [max_length = 20, db_index = False];
}
message AttWorkflowDriverWhiteListEntry (XOSBase) {