blob: e7a26f8985ef5624d565d91059431c139d115dae [file] [log] [blame]
Zsolt Haraszti66862032016-11-28 14:28:39 -08001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti66862032016-11-28 14:28:39 -08003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Agent to play gateway between CORE and an individual adapter.
19"""
20from uuid import uuid4
Stephane Barbarie52198b92017-03-02 13:44:46 -050021import arrow
22import re
Zsolt Haraszti66862032016-11-28 14:28:39 -080023
24import structlog
Zsolt Harasztief05ad22017-01-07 22:08:06 -080025from google.protobuf.json_format import MessageToJson
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -080026from scapy.packet import Packet
Zsolt Haraszti66862032016-11-28 14:28:39 -080027from twisted.internet.defer import inlineCallbacks, returnValue
28from zope.interface import implementer
29
Zsolt Haraszti89a27302016-12-08 16:53:06 -080030from common.event_bus import EventBusClient
Zsolt Harasztief05ad22017-01-07 22:08:06 -080031from common.frameio.frameio import hexify
Zsolt Haraszti66862032016-11-28 14:28:39 -080032from voltha.adapters.interface import IAdapterAgent
33from voltha.protos import third_party
34from voltha.protos.device_pb2 import Device, Port
Stephane Barbariecc6b2e62017-03-02 14:35:55 -050035from voltha.protos.events_pb2 import KpiEvent, AlarmEvent, AlarmEventType, \
36 AlarmEventSeverity, AlarmEventState, AlarmEventCategory
Zsolt Haraszti66862032016-11-28 14:28:39 -080037from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
38 LogicalPort, AdminState
39from voltha.registry import registry
Zsolt Haraszti656ecc62016-12-28 15:08:23 -080040from voltha.core.flow_decomposer import OUTPUT
Zsolt Haraszti66862032016-11-28 14:28:39 -080041
42
Zsolt Haraszti66862032016-11-28 14:28:39 -080043@implementer(IAdapterAgent)
44class AdapterAgent(object):
45 """
46 Gate-keeper between CORE and device adapters.
47
48 On one side it interacts with Core's internal model and update/dispatch
49 mechanisms.
50
51 On the other side, it interacts with the adapters standard interface as
52 defined in
53 """
54
55 def __init__(self, adapter_name, adapter_cls):
56 self.adapter_name = adapter_name
57 self.adapter_cls = adapter_cls
58 self.core = registry('core')
59 self.adapter = None
60 self.adapter_node_proxy = None
61 self.root_proxy = self.core.get_proxy('/')
Zsolt Haraszti89a27302016-12-08 16:53:06 -080062 self._rx_event_subscriptions = {}
63 self._tx_event_subscriptions = {}
64 self.event_bus = EventBusClient()
65 self.log = structlog.get_logger(adapter_name=adapter_name)
Zsolt Haraszti66862032016-11-28 14:28:39 -080066
67 @inlineCallbacks
68 def start(self):
Zsolt Haraszti89a27302016-12-08 16:53:06 -080069 self.log.debug('starting')
Zsolt Haraszti66862032016-11-28 14:28:39 -080070 config = self._get_adapter_config() # this may be None
Zsolt Haraszti89a27302016-12-08 16:53:06 -080071 try:
72 adapter = self.adapter_cls(self, config)
73 yield adapter.start()
Zsolt Haraszti656ecc62016-12-28 15:08:23 -080074 self.adapter = adapter
75 self.adapter_node_proxy = self._update_adapter_node()
76 self._update_device_types()
Zsolt Haraszti89a27302016-12-08 16:53:06 -080077 except Exception, e:
78 self.log.exception(e)
Zsolt Haraszti89a27302016-12-08 16:53:06 -080079 self.log.info('started')
Zsolt Haraszti66862032016-11-28 14:28:39 -080080 returnValue(self)
81
82 @inlineCallbacks
83 def stop(self):
Zsolt Haraszti89a27302016-12-08 16:53:06 -080084 self.log.debug('stopping')
Zsolt Haraszti66862032016-11-28 14:28:39 -080085 if self.adapter is not None:
86 yield self.adapter.stop()
87 self.adapter = None
Zsolt Haraszti89a27302016-12-08 16:53:06 -080088 self.log.info('stopped')
Zsolt Haraszti66862032016-11-28 14:28:39 -080089
90 def _get_adapter_config(self):
91 """
92 Opportunistically load persisted adapter configuration.
93 Return None if no configuration exists yet.
94 """
95 proxy = self.core.get_proxy('/')
96 try:
97 config = proxy.get('/adapters/' + self.adapter_name)
98 return config
99 except KeyError:
100 return None
101
102 def _update_adapter_node(self):
103 """
104 Creates or updates the adapter node object based on self
105 description from the adapter.
106 """
107
108 adapter_desc = self.adapter.adapter_descriptor()
109 assert adapter_desc.id == self.adapter_name
110 path = self._make_up_to_date(
111 '/adapters', self.adapter_name, adapter_desc)
112 return self.core.get_proxy(path)
113
114 def _update_device_types(self):
115 """
116 Make sure device types are registered in Core
117 """
118 device_types = self.adapter.device_types()
119 for device_type in device_types.items:
120 key = device_type.id
121 self._make_up_to_date('/device_types', key, device_type)
122
123 def _make_up_to_date(self, container_path, key, data):
124 full_path = container_path + '/' + str(key)
125 root_proxy = self.core.get_proxy('/')
126 try:
127 root_proxy.get(full_path)
128 root_proxy.update(full_path, data)
129 except KeyError:
130 root_proxy.add(container_path, data)
131 return full_path
132
133 # ~~~~~~~~~~~~~~~~~~~~~ Core-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
134
135 def adopt_device(self, device):
136 return self.adapter.adopt_device(device)
137
138 def abandon_device(self, device):
139 return self.adapter.abandon_device(device)
140
141 def deactivate_device(self, device):
142 return self.adapter.deactivate_device(device)
143
Zsolt Harasztic5c5d102016-12-07 21:12:27 -0800144 def update_flows_bulk(self, device, flows, groups):
145 return self.adapter.update_flows_bulk(device, flows, groups)
146
147 def update_flows_incrementally(self, device, flow_changes, group_changes):
148 return self.update_flows_incrementally(
149 device, flow_changes, group_changes)
150
Zsolt Haraszti66862032016-11-28 14:28:39 -0800151 # ~~~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
152
153 def get_device(self, device_id):
154 return self.root_proxy.get('/devices/{}'.format(device_id))
155
156 def add_device(self, device):
157 assert isinstance(device, Device)
158 self._make_up_to_date('/devices', device.id, device)
159
alshabibbe8ca2e2017-02-01 18:28:57 -0800160 # Ultimately, assign devices to device grpups.
161 # see https://jira.opencord.org/browse/CORD-838
Zsolt Haraszti66862032016-11-28 14:28:39 -0800162
163 dg = DeviceGroup(id='1')
164 self._make_up_to_date('/device_groups', dg.id, dg)
165
166 # add device to device group
alshabibbe8ca2e2017-02-01 18:28:57 -0800167 # see https://jira.opencord.org/browse/CORD-838
Zsolt Haraszti66862032016-11-28 14:28:39 -0800168
169 def update_device(self, device):
170 assert isinstance(device, Device)
171
172 # we run the update through the device_agent so that the change
173 # does not loop back to the adapter unnecessarily
174 device_agent = self.core.get_device_agent(device.id)
175 device_agent.update_device(device)
176
177 def remove_device(self, device_id):
178 device_agent = self.core.get_device_agent(device_id)
179 device_agent.remove_device(device_id)
180
181 def add_port(self, device_id, port):
182 assert isinstance(port, Port)
183
184 # for referential integrity, add/augment references
185 port.device_id = device_id
186 me_as_peer = Port.PeerPort(device_id=device_id, port_no=port.port_no)
187 for peer in port.peers:
188 peer_port_path = '/devices/{}/ports/{}'.format(
189 peer.device_id, peer.port_no)
190 peer_port = self.root_proxy.get(peer_port_path)
191 if me_as_peer not in peer_port.peers:
192 new = peer_port.peers.add()
193 new.CopyFrom(me_as_peer)
194 self.root_proxy.update(peer_port_path, peer_port)
195
196 self._make_up_to_date('/devices/{}/ports'.format(device_id),
197 port.port_no, port)
198
Zsolt Harasztid036b7e2016-12-23 15:36:01 -0800199 def _find_first_available_id(self):
200 logical_devices = self.root_proxy.get('/logical_devices')
201 existing_ids = set(ld.id for ld in logical_devices)
202 existing_datapath_ids = set(ld.datapath_id for ld in logical_devices)
203 i = 1
204 while True:
205 if i not in existing_datapath_ids and str(i) not in existing_ids:
206 return i
207 i += 1
208
Zsolt Haraszti656ecc62016-12-28 15:08:23 -0800209 def get_logical_device(self, logical_device_id):
210 return self.root_proxy.get('/logical_devices/{}'.format(
211 logical_device_id))
212
Zsolt Haraszti66862032016-11-28 14:28:39 -0800213 def create_logical_device(self, logical_device):
214 assert isinstance(logical_device, LogicalDevice)
Zsolt Harasztid036b7e2016-12-23 15:36:01 -0800215
216 if not logical_device.id:
217 id = self._find_first_available_id()
218 logical_device.id = str(id)
219 logical_device.datapath_id = id
220
Zsolt Haraszti66862032016-11-28 14:28:39 -0800221 self._make_up_to_date('/logical_devices',
222 logical_device.id, logical_device)
223
Zsolt Haraszti656ecc62016-12-28 15:08:23 -0800224 self.event_bus.subscribe(
225 topic='packet-out:{}'.format(logical_device.id),
226 callback=lambda _, p: self.receive_packet_out(logical_device.id, p)
227 )
228
Zsolt Harasztid036b7e2016-12-23 15:36:01 -0800229 return logical_device
230
Zsolt Haraszti656ecc62016-12-28 15:08:23 -0800231 def receive_packet_out(self, logical_device_id, ofp_packet_out):
232
233 def get_port_out(opo):
234 for action in opo.actions:
235 if action.type == OUTPUT:
236 return action.output.port
237
238 out_port = get_port_out(ofp_packet_out)
239 frame = ofp_packet_out.data
240 self.adapter.receive_packet_out(logical_device_id, out_port, frame)
241
Zsolt Haraszti66862032016-11-28 14:28:39 -0800242 def add_logical_port(self, logical_device_id, port):
243 assert isinstance(port, LogicalPort)
244 self._make_up_to_date(
245 '/logical_devices/{}/ports'.format(logical_device_id),
246 port.id, port)
247
248 def child_device_detected(self,
249 parent_device_id,
250 parent_port_no,
251 child_device_type,
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800252 proxy_address,
253 **kw):
Zsolt Haraszti66862032016-11-28 14:28:39 -0800254 # we create new ONU device objects and insert them into the config
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800255 # TODO should we auto-enable the freshly created device? Probably.
Zsolt Haraszti66862032016-11-28 14:28:39 -0800256 device = Device(
257 id=uuid4().hex[:12],
258 type=child_device_type,
259 parent_id=parent_device_id,
260 parent_port_no=parent_port_no,
261 admin_state=AdminState.ENABLED,
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800262 proxy_address=proxy_address,
263 **kw
Zsolt Haraszti66862032016-11-28 14:28:39 -0800264 )
265 self._make_up_to_date(
266 '/devices', device.id, device)
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800267
268 topic = self._gen_tx_proxy_address_topic(proxy_address)
269 self._tx_event_subscriptions[topic] = self.event_bus.subscribe(
270 topic, lambda t, m: self._send_proxied_message(proxy_address, m))
271
272 def _gen_rx_proxy_address_topic(self, proxy_address):
273 """Generate unique topic name specific to this proxy address for rx"""
Zsolt Harasztief05ad22017-01-07 22:08:06 -0800274 topic = 'rx:' + MessageToJson(proxy_address)
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800275 return topic
276
277 def _gen_tx_proxy_address_topic(self, proxy_address):
278 """Generate unique topic name specific to this proxy address for tx"""
Zsolt Harasztief05ad22017-01-07 22:08:06 -0800279 topic = 'tx:' + MessageToJson(proxy_address)
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800280 return topic
281
282 def register_for_proxied_messages(self, proxy_address):
283 topic = self._gen_rx_proxy_address_topic(proxy_address)
284 self._rx_event_subscriptions[topic] = self.event_bus.subscribe(
Stephane Barbariecc6b2e62017-03-02 14:35:55 -0500285 topic,
286 lambda t, m: self._receive_proxied_message(proxy_address, m))
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800287
288 def _receive_proxied_message(self, proxy_address, msg):
289 self.adapter.receive_proxied_message(proxy_address, msg)
290
291 def send_proxied_message(self, proxy_address, msg):
292 topic = self._gen_tx_proxy_address_topic(proxy_address)
293 self.event_bus.publish(topic, msg)
294
295 def _send_proxied_message(self, proxy_address, msg):
296 self.adapter.send_proxied_message(proxy_address, msg)
297
298 def receive_proxied_message(self, proxy_address, msg):
299 topic = self._gen_rx_proxy_address_topic(proxy_address)
300 self.event_bus.publish(topic, msg)
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -0800301
302 # ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~
303
304 def send_packet_in(self, logical_device_id, logical_port_no, packet):
305 self.log.debug('send-packet-in', logical_device_id=logical_device_id,
Zsolt Harasztief05ad22017-01-07 22:08:06 -0800306 logical_port_no=logical_port_no, packet=hexify(packet))
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -0800307
308 if isinstance(packet, Packet):
309 packet = str(packet)
310
311 topic = 'packet-in:' + logical_device_id
312 self.event_bus.publish(topic, (logical_port_no, packet))
Zsolt Haraszti749b0952017-01-18 09:02:35 -0800313
314 # ~~~~~~~~~~~~~~~~~~~ Handling KPI metric submissions ~~~~~~~~~~~~~~~~~~~~~
Zsolt Harasztic5f740b2017-01-18 09:53:17 -0800315
Zsolt Haraszti749b0952017-01-18 09:02:35 -0800316 def submit_kpis(self, kpi_event_msg):
317 try:
318 assert isinstance(kpi_event_msg, KpiEvent)
319 self.event_bus.publish('kpis', kpi_event_msg)
320 except Exception as e:
321 self.log.exception('failed-kpi-submission',
322 type=type(kpi_event_msg))
Stephane Barbarie52198b92017-03-02 13:44:46 -0500323
324 # ~~~~~~~~~~~~~~~~~~~ Handle alarm submissions ~~~~~~~~~~~~~~~~~~~~~
325
Stephane Barbariecc6b2e62017-03-02 14:35:55 -0500326 def create_alarm(self, id=None, resource_id=None, description=None,
327 raised_ts=0, changed_ts=0,
328 type=AlarmEventType.EQUIPMENT,
Stephane Barbariebf3e10c2017-03-03 10:15:58 -0500329 category=AlarmEventCategory.PON,
Stephane Barbariecc6b2e62017-03-02 14:35:55 -0500330 severity=AlarmEventSeverity.MINOR,
331 state=AlarmEventState.RAISED,
Stephane Barbarie52198b92017-03-02 13:44:46 -0500332 context=None):
333
334 # Construct the ID if it is not provided
335 if id == None:
336 id = 'voltha.{}.{}'.format(self.adapter_name, resource_id)
337
338 return AlarmEvent(
339 id=id,
340 resource_id=resource_id,
341 type=type,
342 category=category,
343 severity=severity,
344 state=state,
345 description=description,
346 reported_ts=arrow.utcnow().timestamp,
347 raised_ts=raised_ts,
348 changed_ts=changed_ts,
349 context=context
350 )
351
352 def submit_alarm(self, alarm_event_msg):
353 try:
354 assert isinstance(alarm_event_msg, AlarmEvent)
355 self.event_bus.publish('alarms', alarm_event_msg)
356
357 except Exception as e:
358 self.log.exception('failed-alarm-submission',
359 type=type(alarm_event_msg))