SEBA-206 Report OLT link connectivity in OLTDevice model
Change-Id: Icb3a75d719adac6d8eea139fe9d00d34a11baecb
diff --git a/xos/synchronizer/event_steps/onos_event.py b/xos/synchronizer/event_steps/onos_event.py
new file mode 100644
index 0000000..574413d
--- /dev/null
+++ b/xos/synchronizer/event_steps/onos_event.py
@@ -0,0 +1,90 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import datetime
+import json
+import time
+from xossynchronizer.event_steps.eventstep import EventStep
+from xosconfig import Config
+from xoskafka import XOSKafkaProducer
+from multistructlog import create_logger
+
+log = create_logger(Config().get('logging'))
+
+
+class OnosPortEventStep(EventStep):
+ topics = ["onos.events.port"]
+ technology = "kafka"
+
+ def __init__(self, *args, **kwargs):
+ super(OnosPortEventStep, self).__init__(*args, **kwargs)
+
+ def subscriber_olt_closure(self, olt):
+ subscribers = []
+ for pon_port in olt.pon_ports.all():
+ for onu_device in pon_port.onu_devices.all():
+ for si in onu_device.volt_service_instances.all():
+ for subscriber in si.westbound_service_instances:
+ subscribers.append(subscriber)
+ return subscribers
+
+
+ def send_alarm(self, olt, value):
+ timestamp = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
+ state = "RAISED" if olt.link_status == "down" else "CLEARED"
+
+ # Hypothetically, a maximum of 64 subscribers per pon port, 16 pon ports, and 32 characters
+ # per subscriber name = 32KB of subscriber names in the event.
+ subscribers = self.subscriber_olt_closure(olt)
+ subscribers = [x.name for x in subscribers]
+
+ alarm = {"category": "OLT",
+ "reported_ts": time.time(),
+ "raised_ts": timestamp,
+ "state": state,
+ "alarm_type_name": "OLT.LOSS_OF_AGGSWITCH",
+ "severity": "MAJOR",
+ "resource_id": olt.device_id,
+ "logical_device_id": olt.dp_id,
+ "context": {"affected_subscribers": subscribers},
+ "type": "COMMUNICATION",
+ "id": "xos.voltservice.%s.OLT_LOSS_OF_AGGWSWITCH" % olt.device_id,
+ "description": "xos.voltservice.%s - OLT LOSS OF AGGSWITCH Alarm - OLT_LOSS_OF_AGGSWITCH - %s" % (olt.device_id, state)}
+
+ topic = "xos.alarms.olt-service"
+ key = olt.device_id
+ value = json.dumps(alarm, default=lambda o: repr(o))
+
+ XOSKafkaProducer.produce(topic, key, value)
+
+ def process_event(self, event):
+ log.info("Received ONOS Port Event", kafka_event=event)
+
+ value = json.loads(event.value)
+
+ olt = self.model_accessor.OLTDevice.objects.filter(switch_datapath_id=value["deviceId"],
+ switch_port=value["portId"])
+ if not olt:
+ log.info("Onos port event not for a known olt", deviceId=value["deviceId"], portId=value["portId"])
+ return
+
+ olt = olt[0]
+
+ link_status = "up" if value["enabled"] else "down"
+ if link_status != olt.link_status:
+ olt.link_status = link_status
+ olt.save_changed_fields()
+ self.send_alarm(olt, value)
diff --git a/xos/synchronizer/event_steps/test_onos_event.py b/xos/synchronizer/event_steps/test_onos_event.py
new file mode 100644
index 0000000..8f8f0d8
--- /dev/null
+++ b/xos/synchronizer/event_steps/test_onos_event.py
@@ -0,0 +1,235 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import json
+from mock import patch, Mock, MagicMock, ANY
+
+import datetime
+import os
+import sys
+import time
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+class TestOnosPortEvent(unittest.TestCase):
+
+ def setUp(self):
+ global DeferredException
+
+ self.sys_path_save = sys.path
+
+ # Setting up the config module
+ from xosconfig import Config
+ config = os.path.join(test_path, "../test_config.yaml")
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+
+ # Mock the kafka producer
+ self.mockxoskafka = MagicMock()
+ modules = {
+ 'xoskafka': self.mockxoskafka,
+ 'xoskafka.XOSKafkaProducer': self.mockxoskafka.XOSKafkaProducer,
+ }
+ self.module_patcher = patch.dict('sys.modules', modules)
+ self.module_patcher.start()
+
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("olt-service", "volt.xproto"),
+ ("rcord", "rcord.xproto")])
+
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from mock_modelaccessor import MockObjectList
+ from xossynchronizer.modelaccessor import model_accessor
+ self.model_accessor = model_accessor
+
+ # necessary to reset XOSKafkaProducer's call_count
+ import onos_event
+ reload(onos_event)
+
+ from onos_event import OnosPortEventStep, XOSKafkaProducer
+ from onos_event import XOSKafkaProducer
+ self.XOSKafkaProducer = XOSKafkaProducer
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ self.event_step = OnosPortEventStep
+
+ self.volt_service = VOLTService(name="volt",
+ id=1112,
+ backend_code=1,
+ backend_status="succeeded")
+
+ self.oltdevice = OLTDevice(name="myolt",
+ device_id="of:0000000000000001",
+ switch_dsatapath_id="of:0000000000000001",
+ switch_port="1")
+
+ self.ponport = PONPort(olt_device = self.oltdevice)
+
+ self.onudevice = ONUDevice(pon_port = self.ponport)
+
+ self.subscriber = RCORDSubscriber(name="somesubscriber")
+ self.voltsi = VOLTServiceInstance()
+
+ # chain it all together
+ self.oltdevice.pon_ports = MockObjectList([self.ponport])
+ self.ponport.onu_devices = MockObjectList([self.onudevice])
+ self.onudevice.volt_service_instances = MockObjectList([self.voltsi])
+ self.voltsi.westbound_service_instances = [self.subscriber]
+
+ self.log = Mock()
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+
+ def test_process_event_enable(self):
+ with patch.object(OLTDevice.objects, "get_items") as olt_objects:
+ olt_objects.return_value = [self.oltdevice]
+
+ event_dict = {"timestamp":"2019-03-21T18:00:26.613Z",
+ "deviceId": self.oltdevice.switch_datapath_id,
+ "portId": self.oltdevice.switch_port,
+ "enabled": True}
+ event = Mock()
+ event.value = json.dumps(event_dict)
+
+ step = self.event_step(model_accessor=self.model_accessor, log=self.log)
+ step.process_event(event)
+
+ self.assertEqual(self.oltdevice.link_status, "up")
+
+ def test_process_event_disable(self):
+ with patch.object(OLTDevice.objects, "get_items") as olt_objects:
+ olt_objects.return_value = [self.oltdevice]
+
+ event_dict = {"timestamp":"2019-03-21T18:00:26.613Z",
+ "deviceId": self.oltdevice.switch_datapath_id,
+ "portId": self.oltdevice.switch_port,
+ "enabled": False}
+ event = Mock()
+ event.value = json.dumps(event_dict)
+
+ step = self.event_step(model_accessor=self.model_accessor, log=self.log)
+ step.process_event(event)
+
+ self.assertEqual(self.oltdevice.link_status, "down")
+
+ def test_process_event_no_olt(self):
+ with patch.object(OLTDevice.objects, "get_items") as olt_objects:
+ olt_objects.return_value = [self.oltdevice]
+
+ event_dict = {"timestamp":"2019-03-21T18:00:26.613Z",
+ "deviceId": "doesnotexist",
+ "portId": self.oltdevice.switch_port,
+ "enabled": True}
+ event = Mock()
+ event.value = json.dumps(event_dict)
+
+ step = self.event_step(model_accessor=self.model_accessor, log=self.log)
+ step.process_event(event)
+
+ # should not have changed
+ self.assertEqual(self.oltdevice.link_status, None)
+
+ def test_send_alarm(self):
+ self.oltdevice.link_status = "down"
+ value = {"timestamp":"2019-03-21T18:00:26.613Z",
+ "deviceId":"of:0000000000000001",
+ "portId":"2",
+ "enabled":False,
+ "speed":10000,
+ "type":"COPPER"}
+
+ step = self.event_step(model_accessor=self.model_accessor, log=self.log)
+ step.send_alarm(self.oltdevice, value)
+
+ self.assertEqual(self.XOSKafkaProducer.produce.call_count, 1)
+ topic = self.XOSKafkaProducer.produce.call_args[0][0]
+ key = self.XOSKafkaProducer.produce.call_args[0][1]
+ event = json.loads(self.XOSKafkaProducer.produce.call_args[0][2])
+
+ self.assertEqual(topic, "xos.alarms.olt-service")
+ self.assertEqual(key, "of:0000000000000001")
+
+ raised_ts = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
+
+ self.maxDiff = None
+
+ expected_alarm = {
+ u"category": u"OLT",
+ u"reported_ts": ANY,
+ u"raised_ts": raised_ts,
+ u"state": u"RAISED",
+ u"alarm_type_name": u"OLT.LOSS_OF_AGGSWITCH",
+ u"severity": u"MAJOR",
+ u"resource_id": unicode(self.oltdevice.device_id),
+ u"logical_device_id": self.oltdevice.dp_id,
+ u"context": {u'affected_subscribers': [u'somesubscriber']},
+ u"type": u"COMMUNICATION",
+ u"id": u"xos.voltservice.%s.OLT_LOSS_OF_AGGWSWITCH" % self.oltdevice.device_id,
+ u"description": u"xos.voltservice.%s - OLT LOSS OF AGGSWITCH Alarm - OLT_LOSS_OF_AGGSWITCH - RAISED" % self.oltdevice.device_id}
+
+ self.assertDictEqual(expected_alarm, event)
+
+ def test_clear_alarm(self):
+ self.oltdevice.link_status = "up"
+ value = {"timestamp":"2019-03-21T18:00:26.613Z",
+ "deviceId":"of:0000000000000001",
+ "portId":"2",
+ "enabled":True,
+ "speed":10000,
+ "type":"COPPER"}
+
+ step = self.event_step(model_accessor=self.model_accessor, log=self.log)
+ step.send_alarm(self.oltdevice, value)
+
+ self.assertEqual(self.XOSKafkaProducer.produce.call_count, 1)
+ topic = self.XOSKafkaProducer.produce.call_args[0][0]
+ key = self.XOSKafkaProducer.produce.call_args[0][1]
+ event = json.loads(self.XOSKafkaProducer.produce.call_args[0][2])
+
+ self.assertEqual(topic, "xos.alarms.olt-service")
+ self.assertEqual(key, "of:0000000000000001")
+
+ raised_ts = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
+
+ self.maxDiff = None
+
+ expected_alarm = {
+ u"category": u"OLT",
+ u"reported_ts": ANY,
+ u"raised_ts": raised_ts,
+ u"state": u"CLEARED",
+ u"alarm_type_name": u"OLT.LOSS_OF_AGGSWITCH",
+ u"severity": u"MAJOR",
+ u"resource_id": unicode(self.oltdevice.device_id),
+ u"logical_device_id": self.oltdevice.dp_id,
+ u"context": {u'affected_subscribers': [u'somesubscriber']},
+ u"type": u"COMMUNICATION",
+ u"id": u"xos.voltservice.%s.OLT_LOSS_OF_AGGWSWITCH" % self.oltdevice.device_id,
+ u"description": u"xos.voltservice.%s - OLT LOSS OF AGGSWITCH Alarm - OLT_LOSS_OF_AGGSWITCH - CLEARED" % self.oltdevice.device_id}
+
+ self.assertDictEqual(expected_alarm, event)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/xos/synchronizer/migrations/0004_oltdevice_decl_link_status.py b/xos/synchronizer/migrations/0004_oltdevice_decl_link_status.py
new file mode 100644
index 0000000..10a73e3
--- /dev/null
+++ b/xos/synchronizer/migrations/0004_oltdevice_decl_link_status.py
@@ -0,0 +1,34 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.11 on 2019-03-19 22:46
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('volt', '0003_auto_20190312_1830'),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name='oltdevice_decl',
+ name='link_status',
+ field=models.TextField(blank=True, choices=[(b'up', b'up'), (b'down', b'down')], help_text=b'connectivity status, whether OLT has connectivity to agg switch', null=True),
+ ),
+ ]
diff --git a/xos/synchronizer/models/volt.xproto b/xos/synchronizer/models/volt.xproto
index d830300..20492a4 100644
--- a/xos/synchronizer/models/volt.xproto
+++ b/xos/synchronizer/models/volt.xproto
@@ -75,6 +75,10 @@
optional string oper_status = 12 [
help_text = "operational status, whether OLT is active",
feedback_state = True];
+ optional string link_status = 21 [
+ help_text = "connectivity status, whether OLT has connectivity to agg switch",
+ choices = "(('up', 'up'), ('down', 'down'))",
+ feedback_state = True];
optional string of_id = 13 [
help_text = "Logical device openflow id",
feedback_state = True];
diff --git a/xos/synchronizer/volt-synchronizer.py b/xos/synchronizer/volt-synchronizer.py
index 5f82ca5..a5e3019 100755
--- a/xos/synchronizer/volt-synchronizer.py
+++ b/xos/synchronizer/volt-synchronizer.py
@@ -19,6 +19,7 @@
import os
from xossynchronizer import Synchronizer
from xosconfig import Config
+from xoskafka import XOSKafkaProducer
base_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/config.yaml')
mounted_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/mounted_config.yaml')
@@ -28,4 +29,7 @@
else:
Config.init(base_config_file, 'synchronizer-config-schema.yaml')
+# init kafka producer connection
+XOSKafkaProducer.init()
+
Synchronizer().run()