blob: f59f213e424dfbf179ebd123471ec1c019686473 [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18This facade handles kafka-formatted messages from the Core, extracts the kafka
19formatting and forwards the request to the concrete handler.
20"""
Zack Williams84a71e92019-11-15 09:00:19 -070021from __future__ import absolute_import
Chip Boling67b674a2019-02-08 11:42:18 -060022import structlog
amit.ghosh1b7b4542020-11-19 09:19:21 +010023from twisted.internet.defer import inlineCallbacks, returnValue
Chip Boling67b674a2019-02-08 11:42:18 -060024from zope.interface import implementer
25from twisted.internet import reactor
26
27from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
28from pyvoltha.adapters.interface import IAdapterInterface
William Kurkianede82e92019-03-05 13:02:57 -050029from voltha_protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
Rohan Agrawal9a028682020-06-01 09:08:14 +000030from voltha_protos.device_pb2 import Device, Port, ImageDownload, SimulateAlarmRequest, PmConfigs
William Kurkianede82e92019-03-05 13:02:57 -050031from voltha_protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
Chip Boling67b674a2019-02-08 11:42:18 -060032 FlowGroupChanges, ofp_packet_out
amit.ghosh1b7b4542020-11-19 09:19:21 +010033from voltha_protos.extensions_pb2 import SingleGetValueRequest
onkarkundargiea01eb82020-01-27 12:15:40 +053034from voltha_protos.voltha_pb2 import OmciTestRequest
Chip Boling67b674a2019-02-08 11:42:18 -060035from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
khenaidoo944aee72019-02-28 11:00:24 -050036 get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST, ARG_FROM_TOPIC
Chip Boling67b674a2019-02-08 11:42:18 -060037
onkarkundargiea01eb82020-01-27 12:15:40 +053038
Chip Boling67b674a2019-02-08 11:42:18 -060039log = structlog.get_logger()
40
41class MacAddressError(BaseException):
42 def __init__(self, error):
43 self.error = error
44
45
46class IDError(BaseException):
47 def __init__(self, error):
48 self.error = error
49
50
51@implementer(IAdapterInterface)
52class AdapterRequestFacade(object):
53 """
54 Gate-keeper between CORE and device adapters.
55
56 On one side it interacts with Core's internal model and update/dispatch
57 mechanisms.
58
59 On the other side, it interacts with the adapters standard interface as
60 defined in
61 """
62
khenaidoo944aee72019-02-28 11:00:24 -050063 def __init__(self, adapter, core_proxy):
Chip Boling67b674a2019-02-08 11:42:18 -060064 self.adapter = adapter
khenaidoo944aee72019-02-28 11:00:24 -050065 self.core_proxy = core_proxy
Chip Boling67b674a2019-02-08 11:42:18 -060066
67 @inlineCallbacks
68 def start(self):
69 log.debug('starting')
70
71 @inlineCallbacks
72 def stop(self):
73 log.debug('stopping')
74
khenaidoo944aee72019-02-28 11:00:24 -050075 # @inlineCallbacks
76 # def createKafkaDeviceTopic(self, deviceId):
77 # log.debug("subscribing-to-topic", device_id=deviceId)
78 # kafka_proxy = get_messaging_proxy()
79 # device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
80 # # yield kafka_proxy.create_topic(topic=device_topic)
81 # yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
82 # log.debug("subscribed-to-topic", topic=device_topic)
Chip Boling67b674a2019-02-08 11:42:18 -060083
onkarkundargiea01eb82020-01-27 12:15:40 +053084 def start_omci_test(self, device, omcitestrequest, **kwargs):
85 if not device:
86 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
87 reason="device-invalid")
88 if not omcitestrequest:
89 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
90 reason="omcitestrequest-invalid")
91
92 d = Device()
93 device.Unpack(d)
94 omci_test = OmciTestRequest()
95 omcitestrequest.Unpack(omci_test)
96 result = self.adapter.start_omci_test(d, omci_test.uuid)
97 return True, result
98
khenaidoo944aee72019-02-28 11:00:24 -050099 def adopt_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600100 d = Device()
101 if device:
102 device.Unpack(d)
103
khenaidoo944aee72019-02-28 11:00:24 -0500104 # Update the core reference for that device as it will be used
105 # by the adapter to send async messages to the Core.
106 if ARG_FROM_TOPIC in kwargs:
107 t = StrType()
108 kwargs[ARG_FROM_TOPIC].Unpack(t)
109 # Update the core reference for that device
110 self.core_proxy.update_device_core_reference(d.id, t.val)
111
112 # # Start the creation of a device specific topic to handle all
113 # # subsequent requests from the Core. This adapter instance will
114 # # handle all requests for that device.
115 # reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
Chip Boling67b674a2019-02-08 11:42:18 -0600116
117 result = self.adapter.adopt_device(d)
118 # return True, self.adapter.adopt_device(d)
119
120 return True, result
121 else:
122 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
123 reason="device-invalid")
124
khenaidoo944aee72019-02-28 11:00:24 -0500125 def get_ofp_device_info(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600126 d = Device()
127 if device:
128 device.Unpack(d)
129 return True, self.adapter.get_ofp_device_info(d)
130 else:
131 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
132 reason="device-invalid")
133
khenaidoo944aee72019-02-28 11:00:24 -0500134 def reconcile_device(self, device, **kwargs):
Matt Jeanneretec797262020-01-09 11:55:36 -0500135 d = Device()
136 if device:
137 device.Unpack(d)
138 return True, self.adapter.reconcile_device(d)
139 else:
140 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
141 reason="device-invalid")
Chip Boling67b674a2019-02-08 11:42:18 -0600142
khenaidoo944aee72019-02-28 11:00:24 -0500143 def abandon_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600144 return self.adapter.abandon_device(device)
145
khenaidoo944aee72019-02-28 11:00:24 -0500146 def disable_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600147 d = Device()
148 if device:
149 device.Unpack(d)
150 return True, self.adapter.disable_device(d)
151 else:
152 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
153 reason="device-invalid")
154
khenaidoo944aee72019-02-28 11:00:24 -0500155 def reenable_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600156 d = Device()
157 if device:
158 device.Unpack(d)
159 return True, self.adapter.reenable_device(d)
160 else:
161 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
162 reason="device-invalid")
163
khenaidoo944aee72019-02-28 11:00:24 -0500164 def reboot_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600165 d = Device()
166 if device:
167 device.Unpack(d)
168 return (True, self.adapter.reboot_device(d))
169 else:
170 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
171 reason="device-invalid")
172
Rohan Agrawal9a028682020-06-01 09:08:14 +0000173 def update_pm_config(self, device, pm_configs, **kwargs):
174 d = Device()
175 if device:
176 device.Unpack(d)
177 else:
178 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
179 reason="device-invalid")
180 pm = PmConfigs()
181 if pm_configs:
182 pm_configs.Unpack(pm)
183
184 return (True, self.adapter.update_pm_config(d, pm))
185
khenaidoo944aee72019-02-28 11:00:24 -0500186 def download_image(self, device, request, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600187 d = Device()
188 if device:
189 device.Unpack(d)
190 else:
191 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
192 reason="device-invalid")
193 img = ImageDownload()
194 if request:
195 request.Unpack(img)
196 else:
197 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
198 reason="port-no-invalid")
199
200 return True, self.adapter.download_image(device, request)
201
khenaidoo944aee72019-02-28 11:00:24 -0500202 def get_image_download_status(self, device, request, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600203 d = Device()
204 if device:
205 device.Unpack(d)
206 else:
207 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
208 reason="device-invalid")
209 img = ImageDownload()
210 if request:
211 request.Unpack(img)
212 else:
213 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
214 reason="port-no-invalid")
215
216 return True, self.adapter.get_image_download_status(device, request)
217
khenaidoo944aee72019-02-28 11:00:24 -0500218 def cancel_image_download(self, device, request, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600219 d = Device()
220 if device:
221 device.Unpack(d)
222 else:
223 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
224 reason="device-invalid")
225 img = ImageDownload()
226 if request:
227 request.Unpack(img)
228 else:
229 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
230 reason="port-no-invalid")
231
232 return True, self.adapter.cancel_image_download(device, request)
233
khenaidoo944aee72019-02-28 11:00:24 -0500234 def activate_image_update(self, device, request, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600235 d = Device()
236 if device:
237 device.Unpack(d)
238 else:
239 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
240 reason="device-invalid")
241 img = ImageDownload()
242 if request:
243 request.Unpack(img)
244 else:
245 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
246 reason="port-no-invalid")
247
248 return True, self.adapter.activate_image_update(device, request)
249
khenaidoo944aee72019-02-28 11:00:24 -0500250 def revert_image_update(self, device, request, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600251 d = Device()
252 if device:
253 device.Unpack(d)
254 else:
255 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
256 reason="device-invalid")
257 img = ImageDownload()
258 if request:
259 request.Unpack(img)
260 else:
261 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
262 reason="port-no-invalid")
263
264 return True, self.adapter.revert_image_update(device, request)
265
kesavandd9ef7fe2020-01-29 20:54:13 -0500266 def enable_port(self, device_id, port, **kwargs):
267 if not device_id:
268 return False, Error(code=ErrorCode.INVALID_PARAMETER,
269 reason="device")
270 p = Port()
271 if port:
272 port.Unpack(p)
273 else:
274 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
275 reason="port-invalid")
276
277 return (True, self.adapter.enable_port(device_id, port))
278
279 def disable_port(self, device_id, port, **kwargs):
280 if not device_id:
281 return False, Error(code=ErrorCode.INVALID_PARAMETER,
282 reason="device")
283 p = Port()
284 if port:
285 port.Unpack(p)
286 else:
287 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
288 reason="port-invalid")
289
290 return (True, self.adapter.disable_port(device_id, port))
Chip Boling67b674a2019-02-08 11:42:18 -0600291
khenaidoo944aee72019-02-28 11:00:24 -0500292 def self_test(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600293 return self.adapter.self_test_device(device)
294
khenaidoo944aee72019-02-28 11:00:24 -0500295 def delete_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600296 d = Device()
297 if device:
298 device.Unpack(d)
299 result = self.adapter.delete_device(d)
300 # return (True, self.adapter.delete_device(d))
301
302 # Before we return, delete the device specific topic as we will no
303 # longer receive requests from the Core for that device
304 kafka_proxy = get_messaging_proxy()
305 device_topic = kafka_proxy.get_default_topic() + "/" + d.id
306 kafka_proxy.unsubscribe(topic=device_topic)
307
308 return (True, result)
309 else:
310 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
311 reason="device-invalid")
312
khenaidoo944aee72019-02-28 11:00:24 -0500313 def get_device_details(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600314 return self.adapter.get_device_details(device)
315
khenaidoo944aee72019-02-28 11:00:24 -0500316 def update_flows_bulk(self, device, flows, groups, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600317 d = Device()
318 if device:
319 device.Unpack(d)
320 else:
321 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
322 reason="device-invalid")
323 f = Flows()
324 if flows:
325 flows.Unpack(f)
326
327 g = FlowGroups()
328 if groups:
329 groups.Unpack(g)
330
331 return (True, self.adapter.update_flows_bulk(d, f, g))
332
khenaidoo944aee72019-02-28 11:00:24 -0500333 def update_flows_incrementally(self, device, flow_changes, group_changes, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600334 d = Device()
335 if device:
336 device.Unpack(d)
337 else:
338 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
339 reason="device-invalid")
340 f = FlowChanges()
341 if flow_changes:
342 flow_changes.Unpack(f)
343
344 g = FlowGroupChanges()
345 if group_changes:
346 group_changes.Unpack(g)
347
348 return (True, self.adapter.update_flows_incrementally(d, f, g))
349
khenaidoo944aee72019-02-28 11:00:24 -0500350 def suppress_alarm(self, filter, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600351 return self.adapter.suppress_alarm(filter)
352
khenaidoo944aee72019-02-28 11:00:24 -0500353 def unsuppress_alarm(self, filter, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600354 return self.adapter.unsuppress_alarm(filter)
355
khenaidoo944aee72019-02-28 11:00:24 -0500356 def process_inter_adapter_message(self, msg, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600357 m = InterAdapterMessage()
358 if msg:
359 msg.Unpack(m)
360 else:
361 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
362 reason="msg-invalid")
363
serkant.uluderya6371af52020-05-29 23:26:57 -0700364 max_retry = 0
365 # NOTE as per VOL-3223 a race condition on ONU_IND_REQUEST may occur,
366 # so if that's the message retry up to 10 times
367 if m.header.type == 6:
368 max_retry = 10
369 return (True, self.adapter.process_inter_adapter_message(m, max_retry=max_retry))
Chip Boling67b674a2019-02-08 11:42:18 -0600370
371
khenaidoo944aee72019-02-28 11:00:24 -0500372 def receive_packet_out(self, deviceId, outPort, packet, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600373 try:
374 d_id = StrType()
375 if deviceId:
376 deviceId.Unpack(d_id)
377 else:
378 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
379 reason="deviceid-invalid")
380
381 op = IntType()
382 if outPort:
383 outPort.Unpack(op)
384 else:
385 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
386 reason="outport-invalid")
387
388 p = ofp_packet_out()
389 if packet:
390 packet.Unpack(p)
391 else:
392 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
393 reason="packet-invalid")
394
395 return (True, self.adapter.receive_packet_out(d_id.val, op.val, p))
396 except Exception as e:
397 log.exception("error-processing-receive_packet_out", e=e)
serkant.uluderyaece067c2019-04-10 09:13:48 -0700398
399 def simulate_alarm(self, device, request, **kwargs):
400 d = Device()
401 if device:
402 device.Unpack(d)
403 else:
404 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
405 reason="device-invalid")
406 req = SimulateAlarmRequest()
407 if request:
408 request.Unpack(req)
409 else:
410 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
411 reason="simulate-alarm-request-invalid")
412
amit.ghosh1b7b4542020-11-19 09:19:21 +0100413 return True, self.adapter.simulate_alarm(d, req)
Chip Boling67b674a2019-02-08 11:42:18 -0600414
amit.ghosh1b7b4542020-11-19 09:19:21 +0100415 @inlineCallbacks
416 def single_get_value_request(self, request, **kwargs):
417 req = SingleGetValueRequest()
418 if request:
419 request.Unpack(req)
420 else:
421 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
422 reason="request-invalid")
423 result = yield self.adapter.single_get_value_request(req)
424 res = yield result
425 return (True, res)