blob: 579804536493b6a59264db64fe5f03caf1927d34 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Agent to play gateway between CORE and an individual adapter.
"""
20from uuid import uuid4
21
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040022import arrow
Zsolt Haraszti66862032016-11-28 14:28:39 -080023import structlog
Zsolt Harasztief05ad22017-01-07 22:08:06 -080024from google.protobuf.json_format import MessageToJson
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -080025from scapy.packet import Packet
Zsolt Haraszti66862032016-11-28 14:28:39 -080026from twisted.internet.defer import inlineCallbacks, returnValue
27from zope.interface import implementer
28
Zsolt Haraszti89a27302016-12-08 16:53:06 -080029from common.event_bus import EventBusClient
Zsolt Harasztief05ad22017-01-07 22:08:06 -080030from common.frameio.frameio import hexify
Zsolt Haraszti66862032016-11-28 14:28:39 -080031from voltha.adapters.interface import IAdapterAgent
32from voltha.protos import third_party
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040033from voltha.core.flow_decomposer import OUTPUT
Sergio Slobodriana2eb52b2017-03-07 12:24:46 -050034from voltha.protos.device_pb2 import Device, Port, PmConfigs
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040035from voltha.protos.events_pb2 import AlarmEvent, AlarmEventType, \
36 AlarmEventSeverity, AlarmEventState, AlarmEventCategory
Sergio Slobodriana2eb52b2017-03-07 12:24:46 -050037from voltha.protos.events_pb2 import KpiEvent
Zsolt Haraszti66862032016-11-28 14:28:39 -080038from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040039 LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey
Zsolt Haraszti66862032016-11-28 14:28:39 -080040from voltha.registry import registry
41
42
@implementer(IAdapterAgent)
class AdapterAgent(object):
    """
    Gate-keeper between CORE and device adapters.

    On one side it interacts with Core's internal model and update/dispatch
    mechanisms.

    On the other side, it interacts with the adapters' standard interface;
    this class is declared as an implementer of IAdapterAgent
    (voltha.adapters.interface).
    """

    def __init__(self, adapter_name, adapter_cls):
        """
        :param adapter_name: name of the adapter; also used as the key of
            the adapter node under '/adapters' and as the event bus topic
            for inter-adapter messages
        :param adapter_cls: adapter class; instantiated in start() as
            adapter_cls(self, config)
        """
        self.adapter_name = adapter_name
        self.adapter_cls = adapter_cls
        self.core = registry('core')
        self.adapter = None
        self.adapter_node_proxy = None
        self.root_proxy = self.core.get_proxy('/')
        self._rx_event_subscriptions = {}
        self._tx_event_subscriptions = {}
        self.event_bus = EventBusClient()
        self.packet_out_subscription = None
        # Subscription returned by the event bus for inter-adapter
        # messages; kept so that it can be handed back to unsubscribe()
        self._inter_adapter_subscription = None
        self.log = structlog.get_logger(adapter_name=adapter_name)

    @inlineCallbacks
    def start(self):
        """
        Instantiate and start the adapter, then register the adapter node
        and its device types in the data model.

        Failures are logged and swallowed (best effort); the agent itself
        is always returned.
        :return: (via deferred) self
        """
        self.log.debug('starting')
        config = self._get_adapter_config()  # this may be None
        try:
            adapter = self.adapter_cls(self, config)
            yield adapter.start()
            self.adapter = adapter
            self.adapter_node_proxy = self._update_adapter_node()
            self._update_device_types()
        except Exception as e:
            self.log.exception(e)
        else:
            # only claim success when nothing above failed
            self.log.info('started')
        returnValue(self)

    @inlineCallbacks
    def stop(self):
        """Stop the adapter, if one was started, and drop the reference."""
        self.log.debug('stopping')
        if self.adapter is not None:
            yield self.adapter.stop()
            self.adapter = None
        self.log.info('stopped')

    def _get_adapter_config(self):
        """
        Opportunistically load persisted adapter configuration.
        Return None if no configuration exists yet.
        """
        proxy = self.core.get_proxy('/')
        try:
            return proxy.get('/adapters/' + self.adapter_name)
        except KeyError:
            return None

    def _update_adapter_node(self):
        """
        Creates or updates the adapter node object based on self
        description from the adapter.
        :return: proxy to the adapter node
        """
        adapter_desc = self.adapter.adapter_descriptor()
        assert adapter_desc.id == self.adapter_name
        path = self._make_up_to_date(
            '/adapters', self.adapter_name, adapter_desc)
        return self.core.get_proxy(path)

    def _update_device_types(self):
        """
        Make sure device types are registered in Core
        """
        device_types = self.adapter.device_types()
        for device_type in device_types.items:
            self._make_up_to_date(
                '/device_types', device_type.id, device_type)

    def _make_up_to_date(self, container_path, key, data):
        """
        Create or update a node in the data model.
        :param container_path: path of the container holding the node
        :param key: key of the node within the container
        :param data: node content
        :return: full path of the node
        """
        full_path = container_path + '/' + str(key)
        root_proxy = self.core.get_proxy('/')
        try:
            # get() raises KeyError when the node does not exist yet
            root_proxy.get(full_path)
            root_proxy.update(full_path, data)
        except KeyError:
            root_proxy.add(container_path, data)
        return full_path

    def _remove_node(self, container_path, key):
        """
        Remove a node from the data model
        :param container_path: path to node
        :param key: node
        :return: None
        """
        full_path = container_path + '/' + str(key)
        root_proxy = self.core.get_proxy('/')
        try:
            root_proxy.get(full_path)
            root_proxy.remove(full_path)
        except KeyError:
            # Node does not exist
            pass

    # ~~~~~~~~~~~~~~~~~~~~~ Core-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # The methods below simply delegate to the adapter instance.

    def adopt_device(self, device):
        return self.adapter.adopt_device(device)

    def abandon_device(self, device):
        return self.adapter.abandon_device(device)

    def disable_device(self, device):
        return self.adapter.disable_device(device)

    def reenable_device(self, device):
        return self.adapter.reenable_device(device)

    def reboot_device(self, device):
        return self.adapter.reboot_device(device)

    def delete_device(self, device):
        return self.adapter.delete_device(device)

    def get_device_details(self, device):
        return self.adapter.get_device_details(device)

    def update_flows_bulk(self, device, flows, groups):
        return self.adapter.update_flows_bulk(device, flows, groups)

    def update_flows_incrementally(self, device, flow_changes, group_changes):
        # Delegate to the adapter, like every other core-facing method
        # (calling self.update_flows_incrementally would recurse forever).
        return self.adapter.update_flows_incrementally(
            device, flow_changes, group_changes)

    # def update_pm_collection(self, device, pm_collection_config):
    #    return self.adapter.update_pm_collection(device, pm_collection_config)

    # ~~~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def get_device(self, device_id):
        """Return the device stored under '/devices/<device_id>'."""
        return self.root_proxy.get('/devices/{}'.format(device_id))

    def get_child_device(self, parent_device_id, **kwargs):
        """
        Retrieve a child device object belonging
        to the specified parent device based on some match
        criteria. The first child device that matches the
        provided criteria is returned.
        :param parent_device_id: parent's device id
        :param **kwargs: arbitrary list of match criteria
        :return: Child Device Object or None
        """
        # Get all arguments to be used for comparison
        # Note that for now we are only matching on the ONU ID
        # Other matching fields can be added as required in the future
        onu_id = kwargs.pop('onu_id', None)
        if onu_id is None:
            return None

        # Get all child devices with the same parent ID
        devices = self.root_proxy.get('/devices')
        children_ids = set(
            d.id for d in devices if d.parent_id == parent_device_id)

        # Return the first child whose ONU ID matches
        for child_id in children_ids:
            device = self.get_device(child_id)
            if device.proxy_address.onu_id == onu_id:
                return device

        return None

    def add_device(self, device):
        """Add (or update) the device and make sure device group '1' exists."""
        assert isinstance(device, Device)
        self._make_up_to_date('/devices', device.id, device)

        # Ultimately, assign devices to device groups.
        # see https://jira.opencord.org/browse/CORD-838

        dg = DeviceGroup(id='1')
        self._make_up_to_date('/device_groups', dg.id, dg)

        # add device to device group
        # see https://jira.opencord.org/browse/CORD-838

    def update_device(self, device):
        assert isinstance(device, Device)

        # we run the update through the device_agent so that the change
        # does not loop back to the adapter unnecessarily
        device_agent = self.core.get_device_agent(device.id)
        device_agent.update_device(device)

    def update_device_pm_config(self, device_pm_config, init=False):
        assert isinstance(device_pm_config, PmConfigs)

        # we run the update through the device_agent so that the change
        # does not loop back to the adapter unnecessarily
        device_agent = self.core.get_device_agent(device_pm_config.id)
        device_agent.update_device_pm_config(device_pm_config, init)

    def update_adapter_pm_config(self, device_id, device_pm_config):
        """Look up the device and pass the PM config on to the adapter."""
        device = self.get_device(device_id)
        self.adapter.update_pm_config(device, device_pm_config)

    def _add_peer_reference(self, device_id, port):
        """
        Stamp the port with its device id and make sure every peer port
        listed in port.peers refers back to this port.
        """
        # for referential integrity, add/augment references
        port.device_id = device_id
        me_as_peer = Port.PeerPort(device_id=device_id, port_no=port.port_no)
        for peer in port.peers:
            peer_port_path = '/devices/{}/ports/{}'.format(
                peer.device_id, peer.port_no)
            peer_port = self.root_proxy.get(peer_port_path)
            if me_as_peer not in peer_port.peers:
                new = peer_port.peers.add()
                new.CopyFrom(me_as_peer)
            self.root_proxy.update(peer_port_path, peer_port)

    def _del_peer_reference(self, device_id, port):
        """
        Remove the back-reference to this port from every peer port listed
        in port.peers.
        """
        me_as_peer = Port.PeerPort(device_id=device_id, port_no=port.port_no)
        for peer in port.peers:
            peer_port_path = '/devices/{}/ports/{}'.format(
                peer.device_id, peer.port_no)
            peer_port = self.root_proxy.get(peer_port_path)
            if me_as_peer in peer_port.peers:
                peer_port.peers.remove(me_as_peer)
            self.root_proxy.update(peer_port_path, peer_port)

    def add_port(self, device_id, port):
        assert isinstance(port, Port)

        # for referential integrity, add/augment references
        self._add_peer_reference(device_id, port)

        # Add port
        self._make_up_to_date('/devices/{}/ports'.format(device_id),
                              port.port_no, port)

    def _set_all_ports_state(self, device_id, admin_state, oper_status):
        """Set admin state and operational status on every port of a device."""
        ports_path = '/devices/{}/ports'.format(device_id)
        for port in self.root_proxy.get(ports_path):
            port.admin_state = admin_state
            port.oper_status = oper_status
            self._make_up_to_date(ports_path, port.port_no, port)

    def disable_all_ports(self, device_id):
        """
        Disable all ports on that device, i.e. change the admin status to
        disable and operational status to UNKNOWN.
        :param device_id: device id
        :return: None
        """
        self._set_all_ports_state(
            device_id, AdminState.DISABLED, OperStatus.UNKNOWN)

    def enable_all_ports(self, device_id):
        """
        Re-enable all ports on that device, i.e. change the admin status to
        enabled and operational status to ACTIVE
        :param device_id: device id
        :return: None
        """
        self._set_all_ports_state(
            device_id, AdminState.ENABLED, OperStatus.ACTIVE)

    def delete_all_peer_references(self, device_id):
        """
        Remove all peer port references for that device
        :param device_id: device_id of device
        :return: None
        """
        ports = self.root_proxy.get('/devices/{}/ports'.format(device_id))
        for port in ports:
            if not port.peers:
                continue
            port_path = '/devices/{}/ports/{}'.format(device_id, port.port_no)
            # Clear the repeated field in one go: removing entries while
            # iterating over the same field skips every other entry.
            port.ClearField('peers')
            self.root_proxy.update(port_path, port)

    def delete_port_reference_from_parent(self, device_id, port):
        """
        Delete the port reference from the parent device
        :param device_id: id of device containing the port
        :param port: port to remove
        :return: None
        """
        assert isinstance(port, Port)
        self.log.info('delete-port-reference', device_id=device_id, port=port)
        self._del_peer_reference(device_id, port)

    def add_port_reference_to_parent(self, device_id, port):
        """
        Add the port reference to the parent device
        :param device_id: id of device containing the port
        :param port: port to add
        :return: None
        """
        assert isinstance(port, Port)
        self.log.info('add-port-reference', device_id=device_id, port=port)
        self._add_peer_reference(device_id, port)

    def _find_first_available_id(self):
        """
        Return the smallest positive integer that is used neither as a
        datapath id nor (in string form) as an id of any existing logical
        device.
        """
        logical_devices = self.root_proxy.get('/logical_devices')
        existing_ids = set(ld.id for ld in logical_devices)
        existing_datapath_ids = set(ld.datapath_id for ld in logical_devices)
        i = 1
        while True:
            if i not in existing_datapath_ids and str(i) not in existing_ids:
                return i
            i += 1

    def get_logical_device(self, logical_device_id):
        return self.root_proxy.get('/logical_devices/{}'.format(
            logical_device_id))

    def get_logical_port(self, logical_device_id, port_id):
        return self.root_proxy.get('/logical_devices/{}/ports/{}'.format(
            logical_device_id, port_id))

    def create_logical_device(self, logical_device):
        """
        Store the logical device in the data model and subscribe to its
        packet-out topic. When the device has no id yet, an id and a
        matching datapath id are assigned first.
        :param logical_device: the LogicalDevice to create
        :return: the (possibly updated) logical device
        """
        assert isinstance(logical_device, LogicalDevice)

        if not logical_device.id:
            new_id = self._find_first_available_id()
            logical_device.id = str(new_id)
            logical_device.datapath_id = new_id

        self._make_up_to_date('/logical_devices',
                              logical_device.id, logical_device)

        # Keep a reference to the packet out subscription as it will be
        # referred during removal
        # NOTE(review): only the most recent subscription is kept; if one
        # agent creates several logical devices, earlier subscriptions can
        # no longer be removed by delete_logical_device() - confirm intent.
        self.packet_out_subscription = self.event_bus.subscribe(
            topic='packet-out:{}'.format(logical_device.id),
            callback=lambda _, p: self.receive_packet_out(logical_device.id, p)
        )

        return logical_device

    def delete_logical_device(self, logical_device):
        """
        This will remove the logical device as well as all logical ports
        associated with it
        :param logical_device: The logical device to remove
        :return: None
        """
        assert isinstance(logical_device, LogicalDevice)

        # Remove packet out subscription
        self.event_bus.unsubscribe(self.packet_out_subscription)

        # Remove node from the data model - this will trigger the logical
        # device 'remove callbacks' as well as logical ports 'remove
        # callbacks' if present
        self._remove_node('/logical_devices', logical_device.id)

    def receive_packet_out(self, logical_device_id, ofp_packet_out):
        """
        Forward a packet-out message to the adapter.
        :param logical_device_id: id of the logical device
        :param ofp_packet_out: packet-out message; its actions and data
            fields are read
        """

        def get_port_out(opo):
            # port of the first OUTPUT action; None when there is none
            for action in opo.actions:
                if action.type == OUTPUT:
                    return action.output.port

        out_port = get_port_out(ofp_packet_out)
        frame = ofp_packet_out.data
        self.adapter.receive_packet_out(logical_device_id, out_port, frame)

    def add_logical_port(self, logical_device_id, port):
        assert isinstance(port, LogicalPort)
        self._make_up_to_date(
            '/logical_devices/{}/ports'.format(logical_device_id),
            port.id, port)

    def delete_logical_port(self, logical_device_id, port):
        assert isinstance(port, LogicalPort)
        self._remove_node('/logical_devices/{}/ports'.format(
            logical_device_id), port.id)

    def update_logical_port(self, logical_device_id, port):
        assert isinstance(port, LogicalPort)
        self.log.debug('update-logical-port',
                       logical_device_id=logical_device_id,
                       port=port)

        self._make_up_to_date(
            '/logical_devices/{}/ports'.format(logical_device_id),
            port.id, port)

    def child_device_detected(self,
                              parent_device_id,
                              parent_port_no,
                              child_device_type,
                              proxy_address,
                              **kw):
        """
        Create an (enabled) child device object with a freshly generated
        id, store it in the data model and subscribe to the tx topic of
        its proxy address.
        :param parent_device_id: id of the parent device
        :param parent_port_no: port number on the parent device
        :param child_device_type: type of the child device
        :param proxy_address: proxy address of the child device
        :param **kw: additional Device fields
        """
        # we create new ONU device objects and insert them into the config
        # TODO should we auto-enable the freshly created device? Probably.
        device = Device(
            id=uuid4().hex[:12],
            type=child_device_type,
            parent_id=parent_device_id,
            parent_port_no=parent_port_no,
            admin_state=AdminState.ENABLED,
            proxy_address=proxy_address,
            **kw
        )
        self._make_up_to_date(
            '/devices', device.id, device)

        topic = self._gen_tx_proxy_address_topic(proxy_address)
        self._tx_event_subscriptions[topic] = self.event_bus.subscribe(
            topic, lambda t, m: self._send_proxied_message(proxy_address, m))

    def remove_all_logical_ports(self, logical_device_id):
        """ Remove all logical ports from a given logical device"""
        # The path template must actually be filled in with the device id
        ports_path = '/logical_devices/{}/ports'.format(logical_device_id)
        ports = self.root_proxy.get(ports_path)
        for port in ports:
            self._remove_node(ports_path, port.id)

    def delete_all_child_devices(self, parent_device_id):
        """ Remove all ONUs from a given OLT """
        devices = self.root_proxy.get('/devices')
        children_ids = set(
            d.id for d in devices if d.parent_id == parent_device_id)
        self.log.debug('devices-to-delete',
                       parent_id=parent_device_id,
                       children_ids=children_ids)
        for child_id in children_ids:
            self._remove_node('/devices', child_id)

    def update_child_devices_state(self,
                                   parent_device_id,
                                   oper_status=None,
                                   connect_status=None,
                                   admin_state=None):
        """
        Update status of all child devices
        :param parent_device_id: id of the parent device
        :param oper_status: new operational status, None to leave unchanged
        :param connect_status: new connect status, None to leave unchanged
        :param admin_state: new admin state, None to leave unchanged
        """
        devices = self.root_proxy.get('/devices')
        children_ids = set(
            d.id for d in devices if d.parent_id == parent_device_id)
        self.log.debug('update-devices',
                       parent_id=parent_device_id,
                       children_ids=children_ids,
                       oper_status=oper_status,
                       connect_status=connect_status,
                       admin_state=admin_state)

        for child_id in children_ids:
            device = self.get_device(child_id)
            # Compare against None rather than truthiness so that a
            # zero-valued state is still applied.
            if oper_status is not None:
                device.oper_status = oper_status
            if connect_status is not None:
                device.connect_status = connect_status
            if admin_state is not None:
                device.admin_state = admin_state
            self._make_up_to_date(
                '/devices', device.id, device)

    def _gen_rx_proxy_address_topic(self, proxy_address):
        """Generate unique topic name specific to this proxy address for rx"""
        return 'rx:' + MessageToJson(proxy_address)

    def _gen_tx_proxy_address_topic(self, proxy_address):
        """Generate unique topic name specific to this proxy address for tx"""
        return 'tx:' + MessageToJson(proxy_address)

    def register_for_proxied_messages(self, proxy_address):
        """Subscribe to the rx topic of the given proxy address."""
        topic = self._gen_rx_proxy_address_topic(proxy_address)
        self._rx_event_subscriptions[topic] = self.event_bus.subscribe(
            topic,
            lambda t, m: self._receive_proxied_message(proxy_address, m))

    def unregister_for_proxied_messages(self, proxy_address):
        """
        Drop the rx subscription of the given proxy address.
        :raises KeyError: if no subscription is recorded for that address
        """
        topic = self._gen_rx_proxy_address_topic(proxy_address)
        self.event_bus.unsubscribe(self._rx_event_subscriptions[topic])
        del self._rx_event_subscriptions[topic]

    def _receive_proxied_message(self, proxy_address, msg):
        self.adapter.receive_proxied_message(proxy_address, msg)

    def send_proxied_message(self, proxy_address, msg):
        """Publish msg on the tx topic of the given proxy address."""
        topic = self._gen_tx_proxy_address_topic(proxy_address)
        self.event_bus.publish(topic, msg)

    def _send_proxied_message(self, proxy_address, msg):
        self.adapter.send_proxied_message(proxy_address, msg)

    def receive_proxied_message(self, proxy_address, msg):
        """Publish msg on the rx topic of the given proxy address."""
        topic = self._gen_rx_proxy_address_topic(proxy_address)
        self.event_bus.publish(topic, msg)

    def register_for_inter_adapter_messages(self):
        """Subscribe to the topic named after this adapter."""
        # Keep the subscription: unsubscribe() is given subscription
        # objects elsewhere in this class, not topic names.
        self._inter_adapter_subscription = self.event_bus.subscribe(
            self.adapter_name,
            lambda t, m: self.adapter.receive_inter_adapter_message(m))

    def unregister_for_inter_adapter_messages(self):
        """Drop the inter-adapter subscription, if there is one."""
        subscription = self._inter_adapter_subscription
        if subscription is not None:
            self.event_bus.unsubscribe(subscription)
            self._inter_adapter_subscription = None

    def publish_inter_adapter_message(self, device_id, msg):
        """
        Publish msg on the topic named after the type of the given device.
        :param device_id: id of the device the message is about
        :param msg: message to publish
        """
        # Get the device from the device_id
        device = self.get_device(device_id)
        assert device is not None

        # Publish a message to the adapter that is responsible
        # for managing this device
        # NOTE(review): relies on device.type matching the adapter name
        # used as topic in register_for_inter_adapter_messages - confirm.
        self.event_bus.publish(device.type, msg)

    # ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~

    def send_packet_in(self, logical_device_id, logical_port_no, packet):
        """
        Publish a (logical_port_no, packet) tuple on the packet-in topic of
        the logical device; scapy Packet objects are converted with str()
        first.
        """
        self.log.debug('send-packet-in', logical_device_id=logical_device_id,
                       logical_port_no=logical_port_no, packet=hexify(packet))

        if isinstance(packet, Packet):
            packet = str(packet)

        topic = 'packet-in:' + logical_device_id
        self.event_bus.publish(topic, (logical_port_no, packet))

    # ~~~~~~~~~~~~~~~~~~~ Handling KPI metric submissions ~~~~~~~~~~~~~~~~~~~~~

    def submit_kpis(self, kpi_event_msg):
        """Publish a KpiEvent on the 'kpis' topic; failures are only logged."""
        try:
            assert isinstance(kpi_event_msg, KpiEvent)
            self.event_bus.publish('kpis', kpi_event_msg)
        except Exception:
            self.log.exception('failed-kpi-submission',
                               type=type(kpi_event_msg))

    # ~~~~~~~~~~~~~~~~~~~ Handle alarm submissions ~~~~~~~~~~~~~~~~~~~~~

    def create_alarm(self, id=None, resource_id=None, description=None,
                     raised_ts=0, changed_ts=0,
                     type=AlarmEventType.EQUIPMENT,
                     category=AlarmEventCategory.PON,
                     severity=AlarmEventSeverity.MINOR,
                     state=AlarmEventState.RAISED,
                     context=None):
        """
        Build an AlarmEvent; reported_ts is set to the current UTC time.
        :param id: alarm id; when None it is derived from the adapter name
            and the resource id
        :return: the AlarmEvent
        """
        # Construct the ID if it is not provided
        if id is None:
            id = 'voltha.{}.{}'.format(self.adapter_name, resource_id)

        return AlarmEvent(
            id=id,
            resource_id=resource_id,
            type=type,
            category=category,
            severity=severity,
            state=state,
            description=description,
            reported_ts=arrow.utcnow().timestamp,
            raised_ts=raised_ts,
            changed_ts=changed_ts,
            context=context
        )

    def filter_alarm(self, device_id, alarm_event):
        """
        Check the alarm against the filters stored under '/alarm_filters'.
        An alarm is filtered out when every rule of at least one filter
        matches it (case-insensitively); filters without rules are ignored.
        :param device_id: id of the device the alarm relates to
        :param alarm_event: the AlarmEvent to check
        :return: True if the alarm must be suppressed, False otherwise
        """
        alarm_filters = self.root_proxy.get('/alarm_filters')

        rule_values = {
            'id': alarm_event.id,
            'type': AlarmEventType.AlarmEventType.Name(alarm_event.type),
            'category': AlarmEventCategory.AlarmEventCategory.Name(
                alarm_event.category),
            'severity': AlarmEventSeverity.AlarmEventSeverity.Name(
                alarm_event.severity),
            'resource_id': alarm_event.resource_id,
            'device_id': device_id
        }

        for alarm_filter in alarm_filters:
            if not alarm_filter.rules:
                continue

            exclude = True
            for rule in alarm_filter.rules:
                key_name = AlarmFilterRuleKey.AlarmFilterRuleKey.Name(
                    rule.key)
                actual = rule_values[key_name.lower()]
                # The expected value is lower-cased, so the actual value
                # has to be as well or upper-case values never match.
                if hasattr(actual, 'lower'):
                    actual = actual.lower()
                expected = rule.value.lower()
                self.log.debug("compare-alarm-event",
                               key=key_name,
                               actual=actual,
                               expected=expected)
                if actual != expected:
                    exclude = False
                    break

            if exclude:
                self.log.info("filtered-alarm-event", alarm=alarm_event)
                return True

        return False

    def submit_alarm(self, device_id, alarm_event_msg):
        """
        Publish an AlarmEvent on the 'alarms' topic unless it is filtered
        out; failures are only logged.
        """
        try:
            assert isinstance(alarm_event_msg, AlarmEvent)
            if not self.filter_alarm(device_id, alarm_event_msg):
                self.event_bus.publish('alarms', alarm_event_msg)

        except Exception:
            self.log.exception('failed-alarm-submission',
                               type=type(alarm_event_msg))