blob: 3adce23e748bffcef8c82069e065bb50468352c7 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import time
from xossynchronizer.event_steps.eventstep import EventStep
from xosconfig import Config
from xoskafka import XOSKafkaProducer
from multistructlog import create_logger
log = create_logger(Config().get('logging'))
class OnosPortEventStep(EventStep):
topics = ["onos.events.port"]
technology = "kafka"
def __init__(self, *args, **kwargs):
super(OnosPortEventStep, self).__init__(*args, **kwargs)
def subscriber_olt_closure(self, olt):
subscribers = []
for pon_port in olt.pon_ports.all():
for onu_device in pon_port.onu_devices.all():
for si in onu_device.volt_service_instances.all():
for subscriber in si.westbound_service_instances:
subscribers.append(subscriber)
return subscribers
def send_alarm(self, olt, value):
timestamp = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
state = "RAISED" if olt.link_status == "down" else "CLEARED"
# Hypothetically, a maximum of 64 subscribers per pon port, 16 pon ports, and 32 characters
# per subscriber name = 32KB of subscriber names in the event.
subscribers = self.subscriber_olt_closure(olt)
subscribers = [x.name for x in subscribers]
alarm = {"category": "OLT",
"reported_ts": time.time(),
"raised_ts": timestamp,
"state": state,
"alarm_type_name": "OLT.PORT_LOS",
"severity": "MAJOR",
"resource_id": olt.device_id,
"logical_device_id": olt.dp_id,
"context": {"affected_subscribers": subscribers,
"switch_datapath_id": olt.switch_datapath_id,
"switch_port": olt.switch_port,
"oltdevice.name": olt.name},
"type": "COMMUNICATION",
"id": "xos.voltservice.%s.OLT_PORT_LOS" % olt.device_id,
"description": "xos.voltservice.%s - OLT PORT LOS Alarm -"
" OLT_PORT_LOS - %s" % (olt.device_id, state)}
topic = "xos.alarms.olt-service"
key = olt.device_id
value = json.dumps(alarm, default=lambda o: repr(o))
XOSKafkaProducer.produce(topic, key, value)
def process_event(self, event):
log.info("Received ONOS Port Event", kafka_event=event)
value = json.loads(event.value)
olt = self.model_accessor.OLTDevice.objects.filter(switch_datapath_id=value["deviceId"],
switch_port=value["portId"])
if not olt:
log.info("Onos port event not for a known olt", deviceId=value["deviceId"], portId=value["portId"])
return
olt = olt[0]
link_status = "up" if value["enabled"] else "down"
if link_status != olt.link_status:
olt.link_status = link_status
olt.save_changed_fields()
self.send_alarm(olt, value)