SEBA-207 Generate switch port alarms;
Add kind field to switch port;
Add managementAddress and pull step

Change-Id: I17ec83d3ae9e82c8c3f99151115a35e299ee0d60
diff --git a/xos/synchronizer/config.yaml b/xos/synchronizer/config.yaml
index 37caa46..826dfd4 100644
--- a/xos/synchronizer/config.yaml
+++ b/xos/synchronizer/config.yaml
@@ -23,6 +23,7 @@
 models_dir: "/opt/xos/synchronizers/fabric/models"
 model_policies_dir: "/opt/xos/synchronizers/fabric/model_policies"
 event_steps_dir: "/opt/xos/synchronizers/fabric/event_steps"
+pull_steps_dir: "/opt/xos/synchronizers/fabric/pull_steps"
 logging:
   version: 1
   handlers:
diff --git a/xos/synchronizer/event_steps/onos_event.py b/xos/synchronizer/event_steps/onos_event.py
index 27199f2..fcf8e27 100644
--- a/xos/synchronizer/event_steps/onos_event.py
+++ b/xos/synchronizer/event_steps/onos_event.py
@@ -15,9 +15,12 @@
 
 from __future__ import absolute_import
 
+import datetime
 import json
+import time
 from xossynchronizer.event_steps.eventstep import EventStep
 from xosconfig import Config
+from xoskafka import XOSKafkaProducer
 from multistructlog import create_logger
 
 log = create_logger(Config().get('logging'))
@@ -30,6 +33,33 @@
     def __init__(self, *args, **kwargs):
         super(OnosPortEventStep, self).__init__(*args, **kwargs)
 
+    def send_alarm(self, switch, port, value):
+        timestamp = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
+        state = "RAISED" if port.oper_status == "disabled" else "CLEARED"
+
+        context = {"portId": port.portId,
+                   "portKind": port.kind or "unknown",
+                   "switch.name": switch.name}
+
+        alarm = {"category": "SWITCH",
+                 "reported_ts": time.time(),
+                 "raised_ts": timestamp,
+                 "state": state,
+                 "alarm_type_name": "SWITCH.PORT_LOS",
+                 "severity": "MAJOR",
+                 "resource_id": switch.ofId,
+                 "context": context,
+                 "type": "COMMUNICATION",
+                 "id": "xos.fabricservice.%s.SWITCH_PORT_LOS" % switch.ofId,
+                 "description": "xos.fabricservice.%s - SWITCH PORT LOS Alarm -"
+                                " SWITCH_PORT_LOS - %s" % (switch.ofId, state)}
+
+        topic = "xos.alarms.fabric-service"
+        key = "%s:%s" % (switch.ofId, port.portId)
+        value = json.dumps(alarm, default=lambda o: repr(o))
+
+        XOSKafkaProducer.produce(topic, key, value)
+
     def process_event(self, event):
         value = json.loads(event.value)
 
@@ -58,3 +88,4 @@
         if oper_status != port.oper_status:
             port.oper_status = oper_status
             port.save_changed_fields()
+            self.send_alarm(switch, port, value)
diff --git a/xos/synchronizer/event_steps/test_onos_event.py b/xos/synchronizer/event_steps/test_onos_event.py
index 61b0185..af5e509 100644
--- a/xos/synchronizer/event_steps/test_onos_event.py
+++ b/xos/synchronizer/event_steps/test_onos_event.py
@@ -14,10 +14,12 @@
 
 #from __future__ import absolute_import
 
+import datetime
 import imp
 import unittest
 import json
-from mock import patch, Mock
+import time
+from mock import patch, Mock, MagicMock, ANY
 
 import os
 import sys
@@ -39,6 +41,15 @@
         Config.init(config, "synchronizer-config-schema.yaml")
         # END Setting up the config module
 
+        # Mock the kafka producer
+        self.mockxoskafka = MagicMock()
+        modules = {
+            'xoskafka': self.mockxoskafka,
+            'xoskafka.XOSKafkaProducer': self.mockxoskafka.XOSKafkaProducer,
+        }
+        self.module_patcher = patch.dict('sys.modules', modules)
+        self.module_patcher.start()
+
         from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
         mock_modelaccessor_config(test_path, [("fabric", "fabric.xproto"),
                                               ("onos-service", "onos.xproto")])
@@ -52,7 +63,13 @@
         self.model_accessor = model_accessor
 
         from mock_modelaccessor import MockObjectList
-        from onos_event import OnosPortEventStep
+
+        # necessary to reset XOSKafkaProducer's call_count
+        import onos_event
+        reload(onos_event)
+
+        from onos_event import OnosPortEventStep, XOSKafkaProducer
+        self.XOSKafkaProducer = XOSKafkaProducer
 
         # import all class names to globals
         for (k, v) in model_accessor.all_model_classes.items():
@@ -79,6 +96,7 @@
                                 backend_status="succeeded")
 
         self.port2 = SwitchPort(name="switch1port2",
+                                kind="access",
                                 switch=self.switch,
                                 switch_id=self.switch.id,
                                 portId="2",
@@ -99,7 +117,8 @@
             switch_objects.return_value = [self.switch]
             switchport_objects.return_value = [self.port1, self.port2]
 
-            event_dict = {"deviceId": self.switch.ofId,
+            event_dict = {"timestamp":"2019-03-21T18:00:26.613Z",
+                          "deviceId": self.switch.ofId,
                           "portId": self.port1.portId,
                           "enabled": True}
             event = Mock()
@@ -116,7 +135,8 @@
             switch_objects.return_value = [self.switch]
             switchport_objects.return_value = [self.port1, self.port2]
 
-            event_dict = {"deviceId": self.switch.ofId,
+            event_dict = {"timestamp":"2019-03-21T18:00:26.613Z",
+                          "deviceId": self.switch.ofId,
                           "portId": self.port1.portId,
                           "enabled": False}
             event = Mock()
@@ -133,7 +153,8 @@
             switch_objects.return_value = [self.switch]
             switchport_objects.return_value = [self.port1, self.port2]
 
-            event_dict = {"deviceId": "doesnotexist",
+            event_dict = {"timestamp":"2019-03-21T18:00:26.613Z",
+                          "deviceId": "doesnotexist",
                           "portId": self.port1.portId,
                           "enabled": True}
             event = Mock()
@@ -152,7 +173,8 @@
             switch_objects.return_value = [self.switch]
             switchport_objects.return_value = [self.port1, self.port2]
 
-            event_dict = {"deviceId": self.switch.ofId,
+            event_dict = {"timestamp":"2019-03-21T18:00:26.613Z",
+                          "deviceId": self.switch.ofId,
                           "portId": "doesnotexist",
                           "enabled": True}
             event = Mock()
@@ -165,6 +187,86 @@
             # should not have changed
             self.assertEqual(self.port1.oper_status, None)
 
+    def test_send_alarm(self):
+        self.port2.oper_status = "disabled"
+        value = {"timestamp":"2019-03-21T18:00:26.613Z",
+                 "deviceId":"of:0000000000000001",
+                 "portId":"2",
+                 "enabled":False,
+                 "speed":10000,
+                 "type":"COPPER"}
+
+        step = self.event_step(model_accessor=self.model_accessor, log=self.log)
+        step.send_alarm(self.switch, self.port2, value)
+
+        self.assertEqual(self.XOSKafkaProducer.produce.call_count, 1)
+        topic = self.XOSKafkaProducer.produce.call_args[0][0]
+        key = self.XOSKafkaProducer.produce.call_args[0][1]
+        event = json.loads(self.XOSKafkaProducer.produce.call_args[0][2])
+
+        self.assertEqual(topic, "xos.alarms.fabric-service")
+        self.assertEqual(key, "of:0000000000000001:2")
+
+        raised_ts = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
+
+        self.maxDiff = None
+
+        expected_alarm = {
+                 u"category": u"SWITCH",
+                 u"reported_ts": ANY,
+                 u"raised_ts": raised_ts,
+                 u"state": u"RAISED",
+                 u"alarm_type_name": u"SWITCH.PORT_LOS",
+                 u"severity": u"MAJOR",
+                 u"resource_id": unicode(self.switch.ofId),
+                 u"context": {u"portId": u"2", u"portKind": u"access",
+                              u'switch.name': u'switch1'},
+                 u"type": u"COMMUNICATION",
+                 u"id": u"xos.fabricservice.%s.SWITCH_PORT_LOS" % self.switch.ofId,
+                 u"description": u"xos.fabricservice.%s - SWITCH PORT LOS Alarm - SWITCH_PORT_LOS - RAISED" % self.switch.ofId}
+
+        self.assertDictEqual(expected_alarm, event)
+
+    def test_clear_alarm(self):
+        self.port2.oper_status = "enabled"
+        value = {"timestamp":"2019-03-21T18:00:26.613Z",
+                 "deviceId":"of:0000000000000001",
+                 "portId":"2",
+                 "enabled":False,
+                 "speed":10000,
+                 "type":"COPPER"}
+
+        step = self.event_step(model_accessor=self.model_accessor, log=self.log)
+        step.send_alarm(self.switch, self.port2, value)
+
+        self.assertEqual(self.XOSKafkaProducer.produce.call_count, 1)
+        topic = self.XOSKafkaProducer.produce.call_args[0][0]
+        key = self.XOSKafkaProducer.produce.call_args[0][1]
+        event = json.loads(self.XOSKafkaProducer.produce.call_args[0][2])
+
+        self.assertEqual(topic, "xos.alarms.fabric-service")
+        self.assertEqual(key, "of:0000000000000001:2")
+
+        raised_ts = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
+
+        self.maxDiff = None
+
+        expected_alarm = {
+                 u"category": u"SWITCH",
+                 u"reported_ts": ANY,
+                 u"raised_ts": raised_ts,
+                 u"state": u"CLEARED",
+                 u"alarm_type_name": u"SWITCH.PORT_LOS",
+                 u"severity": u"MAJOR",
+                 u"resource_id": unicode(self.switch.ofId),
+                 u"context": {u"portId": u"2", u"portKind": u"access",
+                              u'switch.name': u'switch1'},
+                 u"type": u"COMMUNICATION",
+                 u"id": u"xos.fabricservice.%s.SWITCH_PORT_LOS" % self.switch.ofId,
+                 u"description": u"xos.fabricservice.%s - SWITCH PORT LOS Alarm - SWITCH_PORT_LOS - CLEARED" % self.switch.ofId}
+
+        self.assertDictEqual(expected_alarm, event)
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/xos/synchronizer/fabric-synchronizer.py b/xos/synchronizer/fabric-synchronizer.py
index 1c98ecd..c4ce3a3 100755
--- a/xos/synchronizer/fabric-synchronizer.py
+++ b/xos/synchronizer/fabric-synchronizer.py
@@ -19,6 +19,7 @@
 import os
 from xossynchronizer import Synchronizer
 from xosconfig import Config
+from xoskafka import XOSKafkaProducer
 
 base_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/config.yaml')
 mounted_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/mounted_config.yaml')
@@ -28,4 +29,7 @@
 else:
     Config.init(base_config_file, 'synchronizer-config-schema.yaml')
 
+# init kafka producer connection
+XOSKafkaProducer.init()
+
 Synchronizer().run()
diff --git a/xos/synchronizer/migrations/0006_auto_20190416_1711.py b/xos/synchronizer/migrations/0006_auto_20190416_1711.py
new file mode 100644
index 0000000..7bc9571
--- /dev/null
+++ b/xos/synchronizer/migrations/0006_auto_20190416_1711.py
@@ -0,0 +1,39 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.20 on 2019-04-16 21:11
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fabric', '0005_auto_20190409_1831'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='switch',
+            name='managementAddress',
+            field=models.CharField(blank=True, help_text=b'Address where this switch can be managed', max_length=1024, null=True),
+        ),
+        migrations.AddField(
+            model_name='switchport',
+            name='kind',
+            field=models.CharField(blank=True, choices=[(b'access', b'access'), (b'internet', b'internet')], help_text=b'type of device attached to port', max_length=32, null=True),
+        ),
+    ]
diff --git a/xos/synchronizer/models/fabric.xproto b/xos/synchronizer/models/fabric.xproto
index df4275c..dc89c95 100644
--- a/xos/synchronizer/models/fabric.xproto
+++ b/xos/synchronizer/models/fabric.xproto
@@ -35,6 +35,10 @@
     required bool isEdgeRouter = 7 [
         help_text="True if the fabric switch is a leaf, False if it is a spine",
         default = True];
+    optional string managementAddress = 8 [
+        help_text = "Address where this switch can be managed",
+        feedback_state = True,
+        max_length = 1024];
 }
 
 message SwitchPort(XOSBase) {
@@ -61,6 +65,10 @@
         choices = "(('enabled', 'enabled'), ('disabled', 'disabled'))",
         feedback_state = True,
         max_length = 32];
+    optional string kind = 6 [
+        help_text = "type of device attached to port",
+        choices = "(('access', 'access'), ('internet', 'internet'))",
+        max_length = 32];
 }
 
 message PortInterface(XOSBase) {
diff --git a/xos/synchronizer/pull_steps/pull_onos_devices.py b/xos/synchronizer/pull_steps/pull_onos_devices.py
new file mode 100644
index 0000000..427b02a
--- /dev/null
+++ b/xos/synchronizer/pull_steps/pull_onos_devices.py
@@ -0,0 +1,78 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xossynchronizer.pull_steps.pullstep import PullStep
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+import requests
+from requests import ConnectionError
+from requests.auth import HTTPBasicAuth
+from requests.models import InvalidURL
+
+import os, sys
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from helpers import Helpers
+
+log = create_logger(Config().get('logging'))
+
+class ONOSDevicePullStep(PullStep):
+    def __init__(self, model_accessor):
+        super(ONOSDevicePullStep, self).__init__(model_accessor=model_accessor)
+
+    def get_onos_fabric_service(self):
+        # FIXME do not select by name but follow ServiceDependency
+        fabric_service = self.model_accessor.Service.objects.get(name="fabric")
+        onos_fabric_service = fabric_service.provider_services[0].leaf_model
+        return onos_fabric_service
+
+    def pull_records(self):
+        log.debug("[ONOS device pull step] pulling devices from ONOS")
+
+        onos = self.get_onos_fabric_service()
+
+        url = 'http://%s:%s/onos/v1/devices/' % (onos.rest_hostname, onos.rest_port)
+
+        r = requests.get(url, auth=HTTPBasicAuth(onos.rest_username, onos.rest_password))
+
+        if r.status_code != 200:
+            log.error(r.text)
+            raise Exception("Failed to get onos devices")
+        else:
+            try:
+                log.info("Get devices response", json=r.json())
+            except Exception:
+                log.info("Get devices exception response", text=r.text)
+
+        for device in r.json()["devices"]:
+            if device["type"] != "SWITCH":
+                continue
+
+            xos_devices = self.model_accessor.Switch.objects.filter(ofId = device["id"])
+            if not xos_devices:
+                continue
+
+            xos_device = xos_devices[0]
+            changed = False
+
+            managementAddress = device.get("annotations", {}).get("managementAddress")
+            if (xos_device.managementAddress != managementAddress):
+                log.info("Setting managementAddress on switch %s to %s" % (xos_device.id, managementAddress))
+                xos_device.managementAddress = managementAddress
+                changed = True
+
+            if changed:
+                xos_device.save_changed_fields()