blob: 574413d176e96f5a0405d4f250195ad3c5183482 [file] [log] [blame]
Scott Baker12687732019-03-19 15:40:57 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
17import datetime
18import json
19import time
20from xossynchronizer.event_steps.eventstep import EventStep
21from xosconfig import Config
22from xoskafka import XOSKafkaProducer
23from multistructlog import create_logger
24
25log = create_logger(Config().get('logging'))
26
27
28class OnosPortEventStep(EventStep):
29 topics = ["onos.events.port"]
30 technology = "kafka"
31
32 def __init__(self, *args, **kwargs):
33 super(OnosPortEventStep, self).__init__(*args, **kwargs)
34
35 def subscriber_olt_closure(self, olt):
36 subscribers = []
37 for pon_port in olt.pon_ports.all():
38 for onu_device in pon_port.onu_devices.all():
39 for si in onu_device.volt_service_instances.all():
40 for subscriber in si.westbound_service_instances:
41 subscribers.append(subscriber)
42 return subscribers
43
44
45 def send_alarm(self, olt, value):
46 timestamp = time.mktime(datetime.datetime.strptime(value["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ").timetuple())
47 state = "RAISED" if olt.link_status == "down" else "CLEARED"
48
49 # Hypothetically, a maximum of 64 subscribers per pon port, 16 pon ports, and 32 characters
50 # per subscriber name = 32KB of subscriber names in the event.
51 subscribers = self.subscriber_olt_closure(olt)
52 subscribers = [x.name for x in subscribers]
53
54 alarm = {"category": "OLT",
55 "reported_ts": time.time(),
56 "raised_ts": timestamp,
57 "state": state,
58 "alarm_type_name": "OLT.LOSS_OF_AGGSWITCH",
59 "severity": "MAJOR",
60 "resource_id": olt.device_id,
61 "logical_device_id": olt.dp_id,
62 "context": {"affected_subscribers": subscribers},
63 "type": "COMMUNICATION",
64 "id": "xos.voltservice.%s.OLT_LOSS_OF_AGGWSWITCH" % olt.device_id,
65 "description": "xos.voltservice.%s - OLT LOSS OF AGGSWITCH Alarm - OLT_LOSS_OF_AGGSWITCH - %s" % (olt.device_id, state)}
66
67 topic = "xos.alarms.olt-service"
68 key = olt.device_id
69 value = json.dumps(alarm, default=lambda o: repr(o))
70
71 XOSKafkaProducer.produce(topic, key, value)
72
73 def process_event(self, event):
74 log.info("Received ONOS Port Event", kafka_event=event)
75
76 value = json.loads(event.value)
77
78 olt = self.model_accessor.OLTDevice.objects.filter(switch_datapath_id=value["deviceId"],
79 switch_port=value["portId"])
80 if not olt:
81 log.info("Onos port event not for a known olt", deviceId=value["deviceId"], portId=value["portId"])
82 return
83
84 olt = olt[0]
85
86 link_status = "up" if value["enabled"] else "down"
87 if link_status != olt.link_status:
88 olt.link_status = link_status
89 olt.save_changed_fields()
90 self.send_alarm(olt, value)