blob: b2356bace729ea98efca6ed44ad45b29df567208 [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18This facade handles kafka-formatted messages from the Core, extracts the kafka
19formatting and forwards the request to the concrete handler.
20"""
Zack Williams84a71e92019-11-15 09:00:19 -070021from __future__ import absolute_import
Chip Boling67b674a2019-02-08 11:42:18 -060022import structlog
23from twisted.internet.defer import inlineCallbacks
24from zope.interface import implementer
25from twisted.internet import reactor
26
27from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
28from pyvoltha.adapters.interface import IAdapterInterface
William Kurkianede82e92019-03-05 13:02:57 -050029from voltha_protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
serkant.uluderyaece067c2019-04-10 09:13:48 -070030from voltha_protos.device_pb2 import Device, ImageDownload, SimulateAlarmRequest
William Kurkianede82e92019-03-05 13:02:57 -050031from voltha_protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
Chip Boling67b674a2019-02-08 11:42:18 -060032 FlowGroupChanges, ofp_packet_out
33from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
khenaidoo944aee72019-02-28 11:00:24 -050034 get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST, ARG_FROM_TOPIC
Chip Boling67b674a2019-02-08 11:42:18 -060035
36log = structlog.get_logger()
37
class MacAddressError(BaseException):
    """Error carrying a description of a MAC address problem.

    NOTE(review): this derives from ``BaseException`` rather than
    ``Exception``, so a plain ``except Exception`` handler will not catch it.
    """

    def __init__(self, error):
        # Forward the payload to the base class (keeps ``args`` populated) and
        # expose it under the ``error`` attribute as well.
        super(MacAddressError, self).__init__(error)
        self.error = error
41
42
class IDError(BaseException):
    """Error carrying a description of an identifier problem.

    NOTE(review): this derives from ``BaseException`` rather than
    ``Exception``, so a plain ``except Exception`` handler will not catch it.
    """

    def __init__(self, error):
        # Forward the payload to the base class (keeps ``args`` populated) and
        # expose it under the ``error`` attribute as well.
        super(IDError, self).__init__(error)
        self.error = error
46
47
@implementer(IAdapterInterface)
class AdapterRequestFacade(object):
    """
    Gate-keeper between CORE and device adapters.

    On one side it interacts with Core's internal model and update/dispatch
    mechanisms.

    On the other side, it interacts with the adapters standard interface as
    defined in IAdapterInterface.

    Most request handlers receive their arguments as packed messages (objects
    exposing ``Unpack``), unpack them into the concrete protobuf type and
    return a two-element tuple:

    * ``(True, <adapter result>)`` when the request was forwarded, or
    * ``(False, Error)`` when a required argument was missing.

    ``abandon_device``, ``self_test``, ``get_device_details``,
    ``suppress_alarm`` and ``unsuppress_alarm`` are plain pass-throughs: they
    forward their argument untouched and return the adapter's result as is.
    """

    def __init__(self, adapter, core_proxy):
        """
        :param adapter: object the requests are forwarded to
        :param core_proxy: proxy used to record which Core instance owns a
                           device (see ``adopt_device``)
        """
        self.adapter = adapter
        self.core_proxy = core_proxy

    # ------------------------------------------------------------------
    # Private helpers

    @staticmethod
    def _invalid(reason):
        """Build the ``(False, Error)`` reply for an INVALID_PARAMETERS failure."""
        return False, Error(code=ErrorCode.INVALID_PARAMETERS, reason=reason)

    @staticmethod
    def _unpack(packed, message_cls):
        """
        Return a new ``message_cls`` instance, filled from ``packed`` when
        ``packed`` is truthy and left at its defaults otherwise.
        """
        message = message_cls()
        if packed:
            packed.Unpack(message)
        return message

    # ------------------------------------------------------------------
    # Life-cycle

    @inlineCallbacks
    def start(self):
        """Log the start-up; returns a Deferred that fires with None."""
        log.debug('starting')
        # inlineCallbacks requires a generator function; without a yield the
        # call raises TypeError instead of returning a Deferred.
        yield

    @inlineCallbacks
    def stop(self):
        """Log the shutdown; returns a Deferred that fires with None."""
        log.debug('stopping')
        # See start(): the bare yield makes this a generator.
        yield

    # Per-device topic subscription is currently disabled:
    #
    # @inlineCallbacks
    # def createKafkaDeviceTopic(self, deviceId):
    #     log.debug("subscribing-to-topic", device_id=deviceId)
    #     kafka_proxy = get_messaging_proxy()
    #     device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
    #     # yield kafka_proxy.create_topic(topic=device_topic)
    #     yield kafka_proxy.subscribe(topic=device_topic,
    #                                 group_id=device_topic,
    #                                 target_cls=self,
    #                                 offset=KAFKA_OFFSET_EARLIEST)
    #     log.debug("subscribed-to-topic", topic=device_topic)

    # ------------------------------------------------------------------
    # Device requests

    def adopt_device(self, device, **kwargs):
        """
        Unpack ``device`` and hand it to ``adapter.adopt_device``.

        When ``kwargs`` contains ``ARG_FROM_TOPIC`` its value is unpacked as a
        ``StrType`` and registered with the core proxy for this device id
        before the adapter is called.
        """
        if not device:
            return self._invalid("device-invalid")
        d = self._unpack(device, Device)

        # Update the core reference for that device as it will be used
        # by the adapter to send async messages to the Core.
        if ARG_FROM_TOPIC in kwargs:
            t = StrType()
            kwargs[ARG_FROM_TOPIC].Unpack(t)
            self.core_proxy.update_device_core_reference(d.id, t.val)

        # # Start the creation of a device specific topic to handle all
        # # subsequent requests from the Core.  This adapter instance will
        # # handle all requests for that device.
        # reactor.callLater(0, self.createKafkaDeviceTopic, d.id)

        return True, self.adapter.adopt_device(d)

    def get_ofp_device_info(self, device, **kwargs):
        """Unpack ``device`` and forward it to ``adapter.get_ofp_device_info``."""
        if not device:
            return self._invalid("device-invalid")
        return True, self.adapter.get_ofp_device_info(
            self._unpack(device, Device))

    def get_ofp_port_info(self, device, port_no, **kwargs):
        """
        Unpack ``device`` (Device) and ``port_no`` (IntType) and forward the
        device plus the integer value to ``adapter.get_ofp_port_info``.
        """
        if not device:
            return self._invalid("device-invalid")
        d = self._unpack(device, Device)

        if not port_no:
            return self._invalid("port-no-invalid")
        p = self._unpack(port_no, IntType)

        return True, self.adapter.get_ofp_port_info(d, p.val)

    def reconcile_device(self, device, **kwargs):
        """Unpack ``device`` and forward it to ``adapter.reconcile_device``."""
        if not device:
            return self._invalid("device-invalid")
        return True, self.adapter.reconcile_device(
            self._unpack(device, Device))

    def abandon_device(self, device, **kwargs):
        """Forward ``device`` unchanged; returns the adapter's result as is."""
        return self.adapter.abandon_device(device)

    def disable_device(self, device, **kwargs):
        """Unpack ``device`` and forward it to ``adapter.disable_device``."""
        if not device:
            return self._invalid("device-invalid")
        return True, self.adapter.disable_device(self._unpack(device, Device))

    def reenable_device(self, device, **kwargs):
        """Unpack ``device`` and forward it to ``adapter.reenable_device``."""
        if not device:
            return self._invalid("device-invalid")
        return True, self.adapter.reenable_device(self._unpack(device, Device))

    def reboot_device(self, device, **kwargs):
        """Unpack ``device`` and forward it to ``adapter.reboot_device``."""
        if not device:
            return self._invalid("device-invalid")
        return True, self.adapter.reboot_device(self._unpack(device, Device))

    # ------------------------------------------------------------------
    # Image management
    #
    # Each handler unpacks ``device`` (Device) and ``request``
    # (ImageDownload) and passes the *unpacked* messages to the adapter.

    def download_image(self, device, request, **kwargs):
        """Forward an image download request to ``adapter.download_image``."""
        if not device:
            return self._invalid("device-invalid")
        d = self._unpack(device, Device)

        if not request:
            return self._invalid("request-invalid")
        img = self._unpack(request, ImageDownload)

        return True, self.adapter.download_image(d, img)

    def get_image_download_status(self, device, request, **kwargs):
        """Forward to ``adapter.get_image_download_status``."""
        if not device:
            return self._invalid("device-invalid")
        d = self._unpack(device, Device)

        if not request:
            return self._invalid("request-invalid")
        img = self._unpack(request, ImageDownload)

        return True, self.adapter.get_image_download_status(d, img)

    def cancel_image_download(self, device, request, **kwargs):
        """Forward to ``adapter.cancel_image_download``."""
        if not device:
            return self._invalid("device-invalid")
        d = self._unpack(device, Device)

        if not request:
            return self._invalid("request-invalid")
        img = self._unpack(request, ImageDownload)

        return True, self.adapter.cancel_image_download(d, img)

    def activate_image_update(self, device, request, **kwargs):
        """Forward to ``adapter.activate_image_update``."""
        if not device:
            return self._invalid("device-invalid")
        d = self._unpack(device, Device)

        if not request:
            return self._invalid("request-invalid")
        img = self._unpack(request, ImageDownload)

        return True, self.adapter.activate_image_update(d, img)

    def revert_image_update(self, device, request, **kwargs):
        """Forward to ``adapter.revert_image_update``."""
        if not device:
            return self._invalid("device-invalid")
        d = self._unpack(device, Device)

        if not request:
            return self._invalid("request-invalid")
        img = self._unpack(request, ImageDownload)

        return True, self.adapter.revert_image_update(d, img)

    # ------------------------------------------------------------------

    def self_test(self, device, **kwargs):
        """
        Forward ``device`` unchanged to ``adapter.self_test_device``; returns
        the adapter's result as is.
        """
        return self.adapter.self_test_device(device)

    def delete_device(self, device, **kwargs):
        """
        Unpack ``device``, forward it to ``adapter.delete_device`` and then
        unsubscribe from the device specific topic.
        """
        if not device:
            return self._invalid("device-invalid")
        d = self._unpack(device, Device)
        result = self.adapter.delete_device(d)

        # Before we return, delete the device specific topic as we will no
        # longer receive requests from the Core for that device.
        # NOTE(review): the topic name is built with "/" here while the
        # disabled createKafkaDeviceTopic code uses "_" - confirm which
        # separator is correct.
        kafka_proxy = get_messaging_proxy()
        device_topic = kafka_proxy.get_default_topic() + "/" + d.id
        kafka_proxy.unsubscribe(topic=device_topic)

        return True, result

    def get_device_details(self, device, **kwargs):
        """Forward ``device`` unchanged; returns the adapter's result as is."""
        return self.adapter.get_device_details(device)

    # ------------------------------------------------------------------
    # Flows

    def update_flows_bulk(self, device, flows, groups, **kwargs):
        """
        Unpack ``device`` (required), ``flows`` (Flows) and ``groups``
        (FlowGroups) and forward them to ``adapter.update_flows_bulk``.

        A falsy ``flows`` or ``groups`` is replaced by an empty message.
        """
        if not device:
            return self._invalid("device-invalid")
        d = self._unpack(device, Device)
        f = self._unpack(flows, Flows)
        g = self._unpack(groups, FlowGroups)

        return True, self.adapter.update_flows_bulk(d, f, g)

    def update_flows_incrementally(self, device, flow_changes, group_changes,
                                   **kwargs):
        """
        Unpack ``device`` (required), ``flow_changes`` (FlowChanges) and
        ``group_changes`` (FlowGroupChanges) and forward them to
        ``adapter.update_flows_incrementally``.

        A falsy ``flow_changes`` or ``group_changes`` is replaced by an empty
        message.
        """
        if not device:
            return self._invalid("device-invalid")
        d = self._unpack(device, Device)
        f = self._unpack(flow_changes, FlowChanges)
        g = self._unpack(group_changes, FlowGroupChanges)

        return True, self.adapter.update_flows_incrementally(d, f, g)

    # ------------------------------------------------------------------
    # Alarms

    def suppress_alarm(self, filter, **kwargs):
        """Forward ``filter`` unchanged; returns the adapter's result as is."""
        return self.adapter.suppress_alarm(filter)

    def unsuppress_alarm(self, filter, **kwargs):
        """Forward ``filter`` unchanged; returns the adapter's result as is."""
        return self.adapter.unsuppress_alarm(filter)

    def simulate_alarm(self, device, request, **kwargs):
        """
        Unpack ``device`` (Device) and ``request`` (SimulateAlarmRequest) and
        forward them to ``adapter.simulate_alarm``.
        """
        if not device:
            return self._invalid("device-invalid")
        d = self._unpack(device, Device)

        if not request:
            return self._invalid("simulate-alarm-request-invalid")
        req = self._unpack(request, SimulateAlarmRequest)

        return True, self.adapter.simulate_alarm(d, req)

    # ------------------------------------------------------------------
    # Messaging

    def process_inter_adapter_message(self, msg, **kwargs):
        """
        Unpack ``msg`` (InterAdapterMessage) and forward it to
        ``adapter.process_inter_adapter_message``.
        """
        if not msg:
            return self._invalid("msg-invalid")
        m = self._unpack(msg, InterAdapterMessage)

        return True, self.adapter.process_inter_adapter_message(m)

    def receive_packet_out(self, deviceId, outPort, packet, **kwargs):
        """
        Unpack ``deviceId`` (StrType), ``outPort`` (IntType) and ``packet``
        (ofp_packet_out) and forward the device id string, port number and
        packet to ``adapter.receive_packet_out``.

        Any exception raised while unpacking or inside the adapter is logged
        and reported as a ``(False, Error)`` reply.
        """
        try:
            if not deviceId:
                return self._invalid("deviceid-invalid")
            d_id = self._unpack(deviceId, StrType)

            if not outPort:
                return self._invalid("outport-invalid")
            op = self._unpack(outPort, IntType)

            if not packet:
                return self._invalid("packet-invalid")
            p = self._unpack(packet, ofp_packet_out)

            return True, self.adapter.receive_packet_out(d_id.val, op.val, p)
        except Exception as e:
            log.exception("error-processing-receive_packet_out", e=e)
            # Reply with the usual (status, payload) pair instead of falling
            # through and implicitly returning None.
            # TODO(review): INVALID_PARAMETERS is the only ErrorCode member
            # referenced in this module; switch to a more specific code if
            # one is available.
            return self._invalid("receive-packet-out-failed")
368 return True, self.adapter.simulate_alarm(d, req)
Chip Boling67b674a2019-02-08 11:42:18 -0600369