SEBA-206 Report OLT link connectivity in OLTDevice model

Change-Id: Icb3a75d719adac6d8eea139fe9d00d34a11baecb
diff --git a/xos/synchronizer/event_steps/onos_event.py b/xos/synchronizer/event_steps/onos_event.py
new file mode 100644
index 0000000..574413d
--- /dev/null
+++ b/xos/synchronizer/event_steps/onos_event.py
@@ -0,0 +1,90 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import datetime
+import json
+import time
+from xossynchronizer.event_steps.eventstep import EventStep
+from xosconfig import Config
+from xoskafka import XOSKafkaProducer
+from multistructlog import create_logger
+
+log = create_logger(Config().get('logging'))
+
+
+class OnosPortEventStep(EventStep):
+    topics = ["onos.events.port"]
+    technology = "kafka"
+
+    def __init__(self, *args, **kwargs):
+        super(OnosPortEventStep, self).__init__(*args, **kwargs)
+
+    def subscriber_olt_closure(self, olt):
+        subscribers = []
+        for pon_port in olt.pon_ports.all():
+            for onu_device in pon_port.onu_devices.all():
+                for si in onu_device.volt_service_instances.all():
+                    for subscriber in si.westbound_service_instances:
+                        subscribers.append(subscriber)
+        return subscribers
+
+
+    def send_alarm(self, olt, value):
+        timestamp = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
+        state = "RAISED" if olt.link_status == "down" else "CLEARED"
+
+        # Hypothetically, a maximum of 64 subscribers per pon port, 16 pon ports, and 32 characters
+        # per subscriber name = 32KB of subscriber names in the event.
+        subscribers = self.subscriber_olt_closure(olt)
+        subscribers = [x.name for x in subscribers]
+
+        alarm = {"category": "OLT",
+                 "reported_ts": time.time(),
+                 "raised_ts": timestamp,
+                 "state": state,
+                 "alarm_type_name": "OLT.LOSS_OF_AGGSWITCH",
+                 "severity": "MAJOR",
+                 "resource_id": olt.device_id,
+                 "logical_device_id": olt.dp_id,
+                 "context": {"affected_subscribers": subscribers},
+                 "type": "COMMUNICATION",
+                 "id": "xos.voltservice.%s.OLT_LOSS_OF_AGGWSWITCH" % olt.device_id,
+                 "description": "xos.voltservice.%s - OLT LOSS OF AGGSWITCH Alarm - OLT_LOSS_OF_AGGSWITCH - %s" % (olt.device_id, state)}
+
+        topic = "xos.alarms.olt-service"
+        key = olt.device_id
+        value = json.dumps(alarm, default=lambda o: repr(o))
+
+        XOSKafkaProducer.produce(topic, key, value)
+
+    def process_event(self, event):
+        log.info("Received ONOS Port Event", kafka_event=event)
+
+        value = json.loads(event.value)
+
+        olt = self.model_accessor.OLTDevice.objects.filter(switch_datapath_id=value["deviceId"],
+                                                           switch_port=value["portId"])
+        if not olt:
+            log.info("Onos port event not for a known olt", deviceId=value["deviceId"], portId=value["portId"])
+            return
+
+        olt = olt[0]
+
+        link_status = "up" if value["enabled"] else "down"
+        if link_status != olt.link_status:
+            olt.link_status = link_status
+            olt.save_changed_fields()
+            self.send_alarm(olt, value)