SEBA-207 Generate switch port alarms;
Add kind field to switch port;
Add managementAddress and pull step
Change-Id: I17ec83d3ae9e82c8c3f99151115a35e299ee0d60
diff --git a/xos/synchronizer/config.yaml b/xos/synchronizer/config.yaml
index 37caa46..826dfd4 100644
--- a/xos/synchronizer/config.yaml
+++ b/xos/synchronizer/config.yaml
@@ -23,6 +23,7 @@
models_dir: "/opt/xos/synchronizers/fabric/models"
model_policies_dir: "/opt/xos/synchronizers/fabric/model_policies"
event_steps_dir: "/opt/xos/synchronizers/fabric/event_steps"
+pull_steps_dir: "/opt/xos/synchronizers/fabric/pull_steps"
logging:
version: 1
handlers:
diff --git a/xos/synchronizer/event_steps/onos_event.py b/xos/synchronizer/event_steps/onos_event.py
index 27199f2..fcf8e27 100644
--- a/xos/synchronizer/event_steps/onos_event.py
+++ b/xos/synchronizer/event_steps/onos_event.py
@@ -15,9 +15,12 @@
from __future__ import absolute_import
+import datetime
import json
+import time
from xossynchronizer.event_steps.eventstep import EventStep
from xosconfig import Config
+from xoskafka import XOSKafkaProducer
from multistructlog import create_logger
log = create_logger(Config().get('logging'))
@@ -30,6 +33,33 @@
def __init__(self, *args, **kwargs):
super(OnosPortEventStep, self).__init__(*args, **kwargs)
+ def send_alarm(self, switch, port, value):
+ timestamp = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
+ state = "RAISED" if port.oper_status == "disabled" else "CLEARED"
+
+ context = {"portId": port.portId,
+ "portKind": port.kind or "unknown",
+ "switch.name": switch.name}
+
+ alarm = {"category": "SWITCH",
+ "reported_ts": time.time(),
+ "raised_ts": timestamp,
+ "state": state,
+ "alarm_type_name": "SWITCH.PORT_LOS",
+ "severity": "MAJOR",
+ "resource_id": switch.ofId,
+ "context": context,
+ "type": "COMMUNICATION",
+ "id": "xos.fabricservice.%s.SWITCH_PORT_LOS" % switch.ofId,
+ "description": "xos.fabricservice.%s - SWITCH PORT LOS Alarm -"
+ " SWITCH_PORT_LOS - %s" % (switch.ofId, state)}
+
+ topic = "xos.alarms.fabric-service"
+ key = "%s:%s" % (switch.ofId, port.portId)
+ value = json.dumps(alarm, default=lambda o: repr(o))
+
+ XOSKafkaProducer.produce(topic, key, value)
+
def process_event(self, event):
value = json.loads(event.value)
@@ -58,3 +88,4 @@
if oper_status != port.oper_status:
port.oper_status = oper_status
port.save_changed_fields()
+ self.send_alarm(switch, port, value)
diff --git a/xos/synchronizer/event_steps/test_onos_event.py b/xos/synchronizer/event_steps/test_onos_event.py
index 61b0185..af5e509 100644
--- a/xos/synchronizer/event_steps/test_onos_event.py
+++ b/xos/synchronizer/event_steps/test_onos_event.py
@@ -14,10 +14,12 @@
#from __future__ import absolute_import
+import datetime
import imp
import unittest
import json
-from mock import patch, Mock
+import time
+from mock import patch, Mock, MagicMock, ANY
import os
import sys
@@ -39,6 +41,15 @@
Config.init(config, "synchronizer-config-schema.yaml")
# END Setting up the config module
+ # Mock the kafka producer
+ self.mockxoskafka = MagicMock()
+ modules = {
+ 'xoskafka': self.mockxoskafka,
+ 'xoskafka.XOSKafkaProducer': self.mockxoskafka.XOSKafkaProducer,
+ }
+ self.module_patcher = patch.dict('sys.modules', modules)
+ self.module_patcher.start()
+
from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
mock_modelaccessor_config(test_path, [("fabric", "fabric.xproto"),
("onos-service", "onos.xproto")])
@@ -52,7 +63,13 @@
self.model_accessor = model_accessor
from mock_modelaccessor import MockObjectList
- from onos_event import OnosPortEventStep
+
+ # necessary to reset XOSKafkaProducer's call_count
+ import onos_event
+ reload(onos_event)
+
+ from onos_event import OnosPortEventStep, XOSKafkaProducer
+ self.XOSKafkaProducer = XOSKafkaProducer
# import all class names to globals
for (k, v) in model_accessor.all_model_classes.items():
@@ -79,6 +96,7 @@
backend_status="succeeded")
self.port2 = SwitchPort(name="switch1port2",
+ kind="access",
switch=self.switch,
switch_id=self.switch.id,
portId="2",
@@ -99,7 +117,8 @@
switch_objects.return_value = [self.switch]
switchport_objects.return_value = [self.port1, self.port2]
- event_dict = {"deviceId": self.switch.ofId,
+ event_dict = {"timestamp":"2019-03-21T18:00:26.613Z",
+ "deviceId": self.switch.ofId,
"portId": self.port1.portId,
"enabled": True}
event = Mock()
@@ -116,7 +135,8 @@
switch_objects.return_value = [self.switch]
switchport_objects.return_value = [self.port1, self.port2]
- event_dict = {"deviceId": self.switch.ofId,
+ event_dict = {"timestamp":"2019-03-21T18:00:26.613Z",
+ "deviceId": self.switch.ofId,
"portId": self.port1.portId,
"enabled": False}
event = Mock()
@@ -133,7 +153,8 @@
switch_objects.return_value = [self.switch]
switchport_objects.return_value = [self.port1, self.port2]
- event_dict = {"deviceId": "doesnotexist",
+ event_dict = {"timestamp":"2019-03-21T18:00:26.613Z",
+ "deviceId": "doesnotexist",
"portId": self.port1.portId,
"enabled": True}
event = Mock()
@@ -152,7 +173,8 @@
switch_objects.return_value = [self.switch]
switchport_objects.return_value = [self.port1, self.port2]
- event_dict = {"deviceId": self.switch.ofId,
+ event_dict = {"timestamp":"2019-03-21T18:00:26.613Z",
+ "deviceId": self.switch.ofId,
"portId": "doesnotexist",
"enabled": True}
event = Mock()
@@ -165,6 +187,86 @@
# should not have changed
self.assertEqual(self.port1.oper_status, None)
+ def test_send_alarm(self):
+ self.port2.oper_status = "disabled"
+ value = {"timestamp":"2019-03-21T18:00:26.613Z",
+ "deviceId":"of:0000000000000001",
+ "portId":"2",
+ "enabled":False,
+ "speed":10000,
+ "type":"COPPER"}
+
+ step = self.event_step(model_accessor=self.model_accessor, log=self.log)
+ step.send_alarm(self.switch, self.port2, value)
+
+ self.assertEqual(self.XOSKafkaProducer.produce.call_count, 1)
+ topic = self.XOSKafkaProducer.produce.call_args[0][0]
+ key = self.XOSKafkaProducer.produce.call_args[0][1]
+ event = json.loads(self.XOSKafkaProducer.produce.call_args[0][2])
+
+ self.assertEqual(topic, "xos.alarms.fabric-service")
+ self.assertEqual(key, "of:0000000000000001:2")
+
+ raised_ts = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
+
+ self.maxDiff = None
+
+ expected_alarm = {
+ u"category": u"SWITCH",
+ u"reported_ts": ANY,
+ u"raised_ts": raised_ts,
+ u"state": u"RAISED",
+ u"alarm_type_name": u"SWITCH.PORT_LOS",
+ u"severity": u"MAJOR",
+ u"resource_id": unicode(self.switch.ofId),
+ u"context": {u"portId": u"2", u"portKind": u"access",
+ u'switch.name': u'switch1'},
+ u"type": u"COMMUNICATION",
+ u"id": u"xos.fabricservice.%s.SWITCH_PORT_LOS" % self.switch.ofId,
+ u"description": u"xos.fabricservice.%s - SWITCH PORT LOS Alarm - SWITCH_PORT_LOS - RAISED" % self.switch.ofId}
+
+ self.assertDictEqual(expected_alarm, event)
+
+ def test_clear_alarm(self):
+ self.port2.oper_status = "enabled"
+ value = {"timestamp":"2019-03-21T18:00:26.613Z",
+ "deviceId":"of:0000000000000001",
+ "portId":"2",
+ "enabled":False,
+ "speed":10000,
+ "type":"COPPER"}
+
+ step = self.event_step(model_accessor=self.model_accessor, log=self.log)
+ step.send_alarm(self.switch, self.port2, value)
+
+ self.assertEqual(self.XOSKafkaProducer.produce.call_count, 1)
+ topic = self.XOSKafkaProducer.produce.call_args[0][0]
+ key = self.XOSKafkaProducer.produce.call_args[0][1]
+ event = json.loads(self.XOSKafkaProducer.produce.call_args[0][2])
+
+ self.assertEqual(topic, "xos.alarms.fabric-service")
+ self.assertEqual(key, "of:0000000000000001:2")
+
+ raised_ts = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
+
+ self.maxDiff = None
+
+ expected_alarm = {
+ u"category": u"SWITCH",
+ u"reported_ts": ANY,
+ u"raised_ts": raised_ts,
+ u"state": u"CLEARED",
+ u"alarm_type_name": u"SWITCH.PORT_LOS",
+ u"severity": u"MAJOR",
+ u"resource_id": unicode(self.switch.ofId),
+ u"context": {u"portId": u"2", u"portKind": u"access",
+ u'switch.name': u'switch1'},
+ u"type": u"COMMUNICATION",
+ u"id": u"xos.fabricservice.%s.SWITCH_PORT_LOS" % self.switch.ofId,
+ u"description": u"xos.fabricservice.%s - SWITCH PORT LOS Alarm - SWITCH_PORT_LOS - CLEARED" % self.switch.ofId}
+
+ self.assertDictEqual(expected_alarm, event)
+
if __name__ == '__main__':
unittest.main()
diff --git a/xos/synchronizer/fabric-synchronizer.py b/xos/synchronizer/fabric-synchronizer.py
index 1c98ecd..c4ce3a3 100755
--- a/xos/synchronizer/fabric-synchronizer.py
+++ b/xos/synchronizer/fabric-synchronizer.py
@@ -19,6 +19,7 @@
import os
from xossynchronizer import Synchronizer
from xosconfig import Config
+from xoskafka import XOSKafkaProducer
base_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/config.yaml')
mounted_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/mounted_config.yaml')
@@ -28,4 +29,7 @@
else:
Config.init(base_config_file, 'synchronizer-config-schema.yaml')
+# init kafka producer connection
+XOSKafkaProducer.init()
+
Synchronizer().run()
diff --git a/xos/synchronizer/migrations/0006_auto_20190416_1711.py b/xos/synchronizer/migrations/0006_auto_20190416_1711.py
new file mode 100644
index 0000000..7bc9571
--- /dev/null
+++ b/xos/synchronizer/migrations/0006_auto_20190416_1711.py
@@ -0,0 +1,39 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.20 on 2019-04-16 21:11
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('fabric', '0005_auto_20190409_1831'),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name='switch',
+ name='managementAddress',
+ field=models.CharField(blank=True, help_text=b'Address where this switch can be managed', max_length=1024, null=True),
+ ),
+ migrations.AddField(
+ model_name='switchport',
+ name='kind',
+ field=models.CharField(blank=True, choices=[(b'access', b'access'), (b'internet', b'internet')], help_text=b'type of device attached to port', max_length=32, null=True),
+ ),
+ ]
diff --git a/xos/synchronizer/models/fabric.xproto b/xos/synchronizer/models/fabric.xproto
index df4275c..dc89c95 100644
--- a/xos/synchronizer/models/fabric.xproto
+++ b/xos/synchronizer/models/fabric.xproto
@@ -35,6 +35,10 @@
required bool isEdgeRouter = 7 [
help_text="True if the fabric switch is a leaf, False if it is a spine",
default = True];
+ optional string managementAddress = 8 [
+ help_text = "Address where this switch can be managed",
+ feedback_state = True,
+ max_length = 1024];
}
message SwitchPort(XOSBase) {
@@ -61,6 +65,10 @@
choices = "(('enabled', 'enabled'), ('disabled', 'disabled'))",
feedback_state = True,
max_length = 32];
+ optional string kind = 6 [
+ help_text = "type of device attached to port",
+ choices = "(('access', 'access'), ('internet', 'internet'))",
+ max_length = 32];
}
message PortInterface(XOSBase) {
diff --git a/xos/synchronizer/pull_steps/pull_onos_devices.py b/xos/synchronizer/pull_steps/pull_onos_devices.py
new file mode 100644
index 0000000..427b02a
--- /dev/null
+++ b/xos/synchronizer/pull_steps/pull_onos_devices.py
@@ -0,0 +1,78 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xossynchronizer.pull_steps.pullstep import PullStep
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+import requests
+from requests import ConnectionError
+from requests.auth import HTTPBasicAuth
+from requests.models import InvalidURL
+
+import os, sys
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from helpers import Helpers
+
+log = create_logger(Config().get('logging'))
+
+class ONOSDevicePullStep(PullStep):
+ def __init__(self, model_accessor):
+ super(ONOSDevicePullStep, self).__init__(model_accessor=model_accessor)
+
+ def get_onos_fabric_service(self):
+ # FIXME do not select by name but follow ServiceDependency
+ fabric_service = self.model_accessor.Service.objects.get(name="fabric")
+ onos_fabric_service = fabric_service.provider_services[0].leaf_model
+ return onos_fabric_service
+
+ def pull_records(self):
+ log.debug("[ONOS device pull step] pulling devices from ONOS")
+
+ onos = self.get_onos_fabric_service()
+
+ url = 'http://%s:%s/onos/v1/devices/' % (onos.rest_hostname, onos.rest_port)
+
+ r = requests.get(url, auth=HTTPBasicAuth(onos.rest_username, onos.rest_password))
+
+ if r.status_code != 200:
+ log.error(r.text)
+ raise Exception("Failed to get onos devices")
+ else:
+ try:
+ log.info("Get devices response", json=r.json())
+ except Exception:
+ log.info("Get devices exception response", text=r.text)
+
+ for device in r.json()["devices"]:
+ if device["type"] != "SWITCH":
+ continue
+
+ xos_devices = self.model_accessor.Switch.objects.filter(ofId = device["id"])
+ if not xos_devices:
+ continue
+
+ xos_device = xos_devices[0]
+ changed = False
+
+ managementAddress = device.get("annotations", {}).get("managementAddress")
+ if (xos_device.managementAddress != managementAddress):
+ log.info("Setting managementAddress on switch %s to %s" % (xos_device.id, managementAddress))
+ xos_device.managementAddress = managementAddress
+ changed = True
+
+ if changed:
+ xos_device.save_changed_fields()