blob: 1d9b57faca433fbcf74dea142ff3d71c96fb89ba [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18This facade handles kafka-formatted messages from the Core, extracts the kafka
19formatting and forwards the request to the concrete handler.
20"""
Zack Williams84a71e92019-11-15 09:00:19 -070021from __future__ import absolute_import
Chip Boling67b674a2019-02-08 11:42:18 -060022import structlog
23from twisted.internet.defer import inlineCallbacks
24from zope.interface import implementer
25from twisted.internet import reactor
26
27from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
28from pyvoltha.adapters.interface import IAdapterInterface
William Kurkianede82e92019-03-05 13:02:57 -050029from voltha_protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
Rohan Agrawal9a028682020-06-01 09:08:14 +000030from voltha_protos.device_pb2 import Device, Port, ImageDownload, SimulateAlarmRequest, PmConfigs
William Kurkianede82e92019-03-05 13:02:57 -050031from voltha_protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
Chip Boling67b674a2019-02-08 11:42:18 -060032 FlowGroupChanges, ofp_packet_out
onkarkundargiea01eb82020-01-27 12:15:40 +053033from voltha_protos.voltha_pb2 import OmciTestRequest
Chip Boling67b674a2019-02-08 11:42:18 -060034from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
khenaidoo944aee72019-02-28 11:00:24 -050035 get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST, ARG_FROM_TOPIC
Chip Boling67b674a2019-02-08 11:42:18 -060036
onkarkundargiea01eb82020-01-27 12:15:40 +053037
Chip Boling67b674a2019-02-08 11:42:18 -060038log = structlog.get_logger()
39
40class MacAddressError(BaseException):
41 def __init__(self, error):
42 self.error = error
43
44
45class IDError(BaseException):
46 def __init__(self, error):
47 self.error = error
48
49
50@implementer(IAdapterInterface)
51class AdapterRequestFacade(object):
52 """
53 Gate-keeper between CORE and device adapters.
54
55 On one side it interacts with Core's internal model and update/dispatch
56 mechanisms.
57
58 On the other side, it interacts with the adapters standard interface as
59 defined in
60 """
61
khenaidoo944aee72019-02-28 11:00:24 -050062 def __init__(self, adapter, core_proxy):
Chip Boling67b674a2019-02-08 11:42:18 -060063 self.adapter = adapter
khenaidoo944aee72019-02-28 11:00:24 -050064 self.core_proxy = core_proxy
Chip Boling67b674a2019-02-08 11:42:18 -060065
66 @inlineCallbacks
67 def start(self):
68 log.debug('starting')
69
70 @inlineCallbacks
71 def stop(self):
72 log.debug('stopping')
73
khenaidoo944aee72019-02-28 11:00:24 -050074 # @inlineCallbacks
75 # def createKafkaDeviceTopic(self, deviceId):
76 # log.debug("subscribing-to-topic", device_id=deviceId)
77 # kafka_proxy = get_messaging_proxy()
78 # device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
79 # # yield kafka_proxy.create_topic(topic=device_topic)
80 # yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
81 # log.debug("subscribed-to-topic", topic=device_topic)
Chip Boling67b674a2019-02-08 11:42:18 -060082
onkarkundargiea01eb82020-01-27 12:15:40 +053083 def start_omci_test(self, device, omcitestrequest, **kwargs):
84 if not device:
85 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
86 reason="device-invalid")
87 if not omcitestrequest:
88 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
89 reason="omcitestrequest-invalid")
90
91 d = Device()
92 device.Unpack(d)
93 omci_test = OmciTestRequest()
94 omcitestrequest.Unpack(omci_test)
95 result = self.adapter.start_omci_test(d, omci_test.uuid)
96 return True, result
97
khenaidoo944aee72019-02-28 11:00:24 -050098 def adopt_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -060099 d = Device()
100 if device:
101 device.Unpack(d)
102
khenaidoo944aee72019-02-28 11:00:24 -0500103 # Update the core reference for that device as it will be used
104 # by the adapter to send async messages to the Core.
105 if ARG_FROM_TOPIC in kwargs:
106 t = StrType()
107 kwargs[ARG_FROM_TOPIC].Unpack(t)
108 # Update the core reference for that device
109 self.core_proxy.update_device_core_reference(d.id, t.val)
110
111 # # Start the creation of a device specific topic to handle all
112 # # subsequent requests from the Core. This adapter instance will
113 # # handle all requests for that device.
114 # reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
Chip Boling67b674a2019-02-08 11:42:18 -0600115
116 result = self.adapter.adopt_device(d)
117 # return True, self.adapter.adopt_device(d)
118
119 return True, result
120 else:
121 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
122 reason="device-invalid")
123
khenaidoo944aee72019-02-28 11:00:24 -0500124 def get_ofp_device_info(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600125 d = Device()
126 if device:
127 device.Unpack(d)
128 return True, self.adapter.get_ofp_device_info(d)
129 else:
130 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
131 reason="device-invalid")
132
khenaidoo944aee72019-02-28 11:00:24 -0500133 def reconcile_device(self, device, **kwargs):
Matt Jeanneretec797262020-01-09 11:55:36 -0500134 d = Device()
135 if device:
136 device.Unpack(d)
137 return True, self.adapter.reconcile_device(d)
138 else:
139 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
140 reason="device-invalid")
Chip Boling67b674a2019-02-08 11:42:18 -0600141
khenaidoo944aee72019-02-28 11:00:24 -0500142 def abandon_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600143 return self.adapter.abandon_device(device)
144
khenaidoo944aee72019-02-28 11:00:24 -0500145 def disable_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600146 d = Device()
147 if device:
148 device.Unpack(d)
149 return True, self.adapter.disable_device(d)
150 else:
151 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
152 reason="device-invalid")
153
khenaidoo944aee72019-02-28 11:00:24 -0500154 def reenable_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600155 d = Device()
156 if device:
157 device.Unpack(d)
158 return True, self.adapter.reenable_device(d)
159 else:
160 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
161 reason="device-invalid")
162
khenaidoo944aee72019-02-28 11:00:24 -0500163 def reboot_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600164 d = Device()
165 if device:
166 device.Unpack(d)
167 return (True, self.adapter.reboot_device(d))
168 else:
169 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
170 reason="device-invalid")
171
Rohan Agrawal9a028682020-06-01 09:08:14 +0000172 def update_pm_config(self, device, pm_configs, **kwargs):
173 d = Device()
174 if device:
175 device.Unpack(d)
176 else:
177 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
178 reason="device-invalid")
179 pm = PmConfigs()
180 if pm_configs:
181 pm_configs.Unpack(pm)
182
183 return (True, self.adapter.update_pm_config(d, pm))
184
khenaidoo944aee72019-02-28 11:00:24 -0500185 def download_image(self, device, request, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600186 d = Device()
187 if device:
188 device.Unpack(d)
189 else:
190 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
191 reason="device-invalid")
192 img = ImageDownload()
193 if request:
194 request.Unpack(img)
195 else:
196 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
197 reason="port-no-invalid")
198
199 return True, self.adapter.download_image(device, request)
200
khenaidoo944aee72019-02-28 11:00:24 -0500201 def get_image_download_status(self, device, request, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600202 d = Device()
203 if device:
204 device.Unpack(d)
205 else:
206 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
207 reason="device-invalid")
208 img = ImageDownload()
209 if request:
210 request.Unpack(img)
211 else:
212 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
213 reason="port-no-invalid")
214
215 return True, self.adapter.get_image_download_status(device, request)
216
khenaidoo944aee72019-02-28 11:00:24 -0500217 def cancel_image_download(self, device, request, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600218 d = Device()
219 if device:
220 device.Unpack(d)
221 else:
222 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
223 reason="device-invalid")
224 img = ImageDownload()
225 if request:
226 request.Unpack(img)
227 else:
228 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
229 reason="port-no-invalid")
230
231 return True, self.adapter.cancel_image_download(device, request)
232
khenaidoo944aee72019-02-28 11:00:24 -0500233 def activate_image_update(self, device, request, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600234 d = Device()
235 if device:
236 device.Unpack(d)
237 else:
238 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
239 reason="device-invalid")
240 img = ImageDownload()
241 if request:
242 request.Unpack(img)
243 else:
244 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
245 reason="port-no-invalid")
246
247 return True, self.adapter.activate_image_update(device, request)
248
khenaidoo944aee72019-02-28 11:00:24 -0500249 def revert_image_update(self, device, request, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600250 d = Device()
251 if device:
252 device.Unpack(d)
253 else:
254 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
255 reason="device-invalid")
256 img = ImageDownload()
257 if request:
258 request.Unpack(img)
259 else:
260 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
261 reason="port-no-invalid")
262
263 return True, self.adapter.revert_image_update(device, request)
264
kesavandd9ef7fe2020-01-29 20:54:13 -0500265 def enable_port(self, device_id, port, **kwargs):
266 if not device_id:
267 return False, Error(code=ErrorCode.INVALID_PARAMETER,
268 reason="device")
269 p = Port()
270 if port:
271 port.Unpack(p)
272 else:
273 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
274 reason="port-invalid")
275
276 return (True, self.adapter.enable_port(device_id, port))
277
278 def disable_port(self, device_id, port, **kwargs):
279 if not device_id:
280 return False, Error(code=ErrorCode.INVALID_PARAMETER,
281 reason="device")
282 p = Port()
283 if port:
284 port.Unpack(p)
285 else:
286 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
287 reason="port-invalid")
288
289 return (True, self.adapter.disable_port(device_id, port))
Chip Boling67b674a2019-02-08 11:42:18 -0600290
khenaidoo944aee72019-02-28 11:00:24 -0500291 def self_test(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600292 return self.adapter.self_test_device(device)
293
khenaidoo944aee72019-02-28 11:00:24 -0500294 def delete_device(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600295 d = Device()
296 if device:
297 device.Unpack(d)
298 result = self.adapter.delete_device(d)
299 # return (True, self.adapter.delete_device(d))
300
301 # Before we return, delete the device specific topic as we will no
302 # longer receive requests from the Core for that device
303 kafka_proxy = get_messaging_proxy()
304 device_topic = kafka_proxy.get_default_topic() + "/" + d.id
305 kafka_proxy.unsubscribe(topic=device_topic)
306
307 return (True, result)
308 else:
309 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
310 reason="device-invalid")
311
khenaidoo944aee72019-02-28 11:00:24 -0500312 def get_device_details(self, device, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600313 return self.adapter.get_device_details(device)
314
khenaidoo944aee72019-02-28 11:00:24 -0500315 def update_flows_bulk(self, device, flows, groups, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600316 d = Device()
317 if device:
318 device.Unpack(d)
319 else:
320 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
321 reason="device-invalid")
322 f = Flows()
323 if flows:
324 flows.Unpack(f)
325
326 g = FlowGroups()
327 if groups:
328 groups.Unpack(g)
329
330 return (True, self.adapter.update_flows_bulk(d, f, g))
331
khenaidoo944aee72019-02-28 11:00:24 -0500332 def update_flows_incrementally(self, device, flow_changes, group_changes, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600333 d = Device()
334 if device:
335 device.Unpack(d)
336 else:
337 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
338 reason="device-invalid")
339 f = FlowChanges()
340 if flow_changes:
341 flow_changes.Unpack(f)
342
343 g = FlowGroupChanges()
344 if group_changes:
345 group_changes.Unpack(g)
346
347 return (True, self.adapter.update_flows_incrementally(d, f, g))
348
khenaidoo944aee72019-02-28 11:00:24 -0500349 def suppress_alarm(self, filter, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600350 return self.adapter.suppress_alarm(filter)
351
khenaidoo944aee72019-02-28 11:00:24 -0500352 def unsuppress_alarm(self, filter, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600353 return self.adapter.unsuppress_alarm(filter)
354
khenaidoo944aee72019-02-28 11:00:24 -0500355 def process_inter_adapter_message(self, msg, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600356 m = InterAdapterMessage()
357 if msg:
358 msg.Unpack(m)
359 else:
360 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
361 reason="msg-invalid")
362
serkant.uluderya6371af52020-05-29 23:26:57 -0700363 max_retry = 0
364 # NOTE as per VOL-3223 a race condition on ONU_IND_REQUEST may occur,
365 # so if that's the message retry up to 10 times
366 if m.header.type == 6:
367 max_retry = 10
368 return (True, self.adapter.process_inter_adapter_message(m, max_retry=max_retry))
Chip Boling67b674a2019-02-08 11:42:18 -0600369
370
khenaidoo944aee72019-02-28 11:00:24 -0500371 def receive_packet_out(self, deviceId, outPort, packet, **kwargs):
Chip Boling67b674a2019-02-08 11:42:18 -0600372 try:
373 d_id = StrType()
374 if deviceId:
375 deviceId.Unpack(d_id)
376 else:
377 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
378 reason="deviceid-invalid")
379
380 op = IntType()
381 if outPort:
382 outPort.Unpack(op)
383 else:
384 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
385 reason="outport-invalid")
386
387 p = ofp_packet_out()
388 if packet:
389 packet.Unpack(p)
390 else:
391 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
392 reason="packet-invalid")
393
394 return (True, self.adapter.receive_packet_out(d_id.val, op.val, p))
395 except Exception as e:
396 log.exception("error-processing-receive_packet_out", e=e)
serkant.uluderyaece067c2019-04-10 09:13:48 -0700397
398 def simulate_alarm(self, device, request, **kwargs):
399 d = Device()
400 if device:
401 device.Unpack(d)
402 else:
403 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
404 reason="device-invalid")
405 req = SimulateAlarmRequest()
406 if request:
407 request.Unpack(req)
408 else:
409 return False, Error(code=ErrorCode.INVALID_PARAMETERS,
410 reason="simulate-alarm-request-invalid")
411
412 return True, self.adapter.simulate_alarm(d, req)
Chip Boling67b674a2019-02-08 11:42:18 -0600413