blob: 98ba6806efb9426a253ccdf7609dfc998748a1a6 [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18This facade handles kafka-formatted messages from the Core, extracts the kafka
19formatting and forwards the request to the concrete handler.
20"""
21import structlog
22from twisted.internet.defer import inlineCallbacks
23from zope.interface import implementer
24from twisted.internet import reactor
25
26from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
27from pyvoltha.adapters.interface import IAdapterInterface
William Kurkianede82e92019-03-05 13:02:57 -050028from voltha_protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
29from voltha_protos.device_pb2 import Device, ImageDownload
30from voltha_protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
Chip Boling67b674a2019-02-08 11:42:18 -060031 FlowGroupChanges, ofp_packet_out
32from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
khenaidoo944aee72019-02-28 11:00:24 -050033 get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST, ARG_FROM_TOPIC
Chip Boling67b674a2019-02-08 11:42:18 -060034
35log = structlog.get_logger()
36
class MacAddressError(BaseException):
    """
    Exception that carries an ``error`` payload.

    Judging by its name it is meant for MAC address failures; nothing in the
    visible part of this module raises it.

    NOTE(review): derives from BaseException, so ``except Exception`` handlers
    will not catch it -- confirm that this is intended.
    """

    def __init__(self, error):
        # Forward the payload to the base class as well, so ``args`` and
        # ``str()`` keep reflecting it, then expose it under the name used by
        # callers.
        super(MacAddressError, self).__init__(error)
        self.error = error
40
41
class IDError(BaseException):
    """
    Exception that carries an ``error`` payload.

    Judging by its name it is meant for identifier failures; nothing in the
    visible part of this module raises it.

    NOTE(review): derives from BaseException, so ``except Exception`` handlers
    will not catch it -- confirm that this is intended.
    """

    def __init__(self, error):
        # Forward the payload to the base class as well, so ``args`` and
        # ``str()`` keep reflecting it, then expose it under the name used by
        # callers.
        super(IDError, self).__init__(error)
        self.error = error
45
46
47@implementer(IAdapterInterface)
48class AdapterRequestFacade(object):
49 """
50 Gate-keeper between CORE and device adapters.
51
52 On one side it interacts with Core's internal model and update/dispatch
53 mechanisms.
54
    On the other side, it interacts with the adapter's standard interface as
    defined in IAdapterInterface.
57 """
58
khenaidoo944aee72019-02-28 11:00:24 -050059 def __init__(self, adapter, core_proxy):
Chip Boling67b674a2019-02-08 11:42:18 -060060 self.adapter = adapter
khenaidoo944aee72019-02-28 11:00:24 -050061 self.core_proxy = core_proxy
Chip Boling67b674a2019-02-08 11:42:18 -060062
def start(self):
    """
    Start the facade; currently this only logs.

    The method used to be wrapped in ``@inlineCallbacks`` although its body
    contains no ``yield``. Twisted rejects that combination with a TypeError
    at call time, so the decorator is dropped and an already-fired Deferred
    is returned instead. Callers that ``yield`` the result keep working.

    :return: a Deferred that has already fired with ``None``
    """
    # Local import: the module-level import block only pulls in
    # ``inlineCallbacks`` from twisted.internet.defer.
    from twisted.internet.defer import succeed

    log.debug('starting')
    return succeed(None)
66
def stop(self):
    """
    Stop the facade; currently this only logs.

    The method used to be wrapped in ``@inlineCallbacks`` although its body
    contains no ``yield``. Twisted rejects that combination with a TypeError
    at call time, so the decorator is dropped and an already-fired Deferred
    is returned instead. Callers that ``yield`` the result keep working.

    :return: a Deferred that has already fired with ``None``
    """
    # Local import: the module-level import block only pulls in
    # ``inlineCallbacks`` from twisted.internet.defer.
    from twisted.internet.defer import succeed

    log.debug('stopping')
    return succeed(None)
70
khenaidoo944aee72019-02-28 11:00:24 -050071 # @inlineCallbacks
72 # def createKafkaDeviceTopic(self, deviceId):
73 # log.debug("subscribing-to-topic", device_id=deviceId)
74 # kafka_proxy = get_messaging_proxy()
75 # device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
76 # # yield kafka_proxy.create_topic(topic=device_topic)
77 # yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
78 # log.debug("subscribed-to-topic", topic=device_topic)
Chip Boling67b674a2019-02-08 11:42:18 -060079
def adopt_device(self, device, **kwargs):
    """
    Unpack ``device`` and hand it to the adapter for adoption.

    When the caller supplies the ``ARG_FROM_TOPIC`` keyword, its value is
    unpacked into a ``StrType`` and registered with the core proxy against
    the device id before the adapter is invoked.

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param kwargs: optional ``ARG_FROM_TOPIC`` entry, other keys are ignored
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" when ``device`` is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")

    unpacked = Device()
    device.Unpack(unpacked)

    # Record the core reference for this device; the adapter uses it to
    # send async messages to the Core.
    if ARG_FROM_TOPIC in kwargs:
        core_topic = StrType()
        kwargs[ARG_FROM_TOPIC].Unpack(core_topic)
        self.core_proxy.update_device_core_reference(unpacked.id,
                                                     core_topic.val)

    return True, self.adapter.adopt_device(unpacked)
105
def get_ofp_device_info(self, device, **kwargs):
    """
    Unpack ``device`` and ask the adapter for its OpenFlow device info.

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" when ``device`` is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")

    unpacked = Device()
    device.Unpack(unpacked)
    return True, self.adapter.get_ofp_device_info(unpacked)
114
def get_ofp_port_info(self, device, port_no, **kwargs):
    """
    Unpack ``device`` and ``port_no`` and ask the adapter for port info.

    The device is validated and unpacked before the port number is looked
    at, so a bad device is always reported first.

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param port_no: packed message; unpacked into an ``IntType`` whose
                    ``val`` is handed to the adapter
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" / "port-no-invalid" when the respective
             argument is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")
    unpacked_device = Device()
    device.Unpack(unpacked_device)

    if not port_no:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="port-no-invalid")
    unpacked_port = IntType()
    port_no.Unpack(unpacked_port)

    return True, self.adapter.get_ofp_port_info(unpacked_device,
                                                unpacked_port.val)
130
def reconcile_device(self, device, **kwargs):
    """
    Forward a reconcile request straight to the adapter.

    ``device`` is handed over exactly as received (it is not unpacked) and
    the adapter's return value is returned without a success flag.

    :param device: passed through unchanged to ``adapter.reconcile_device``
    :param kwargs: accepted but not used here
    :return: whatever ``adapter.reconcile_device`` returns
    """
    outcome = self.adapter.reconcile_device(device)
    return outcome
133
def abandon_device(self, device, **kwargs):
    """
    Forward an abandon request straight to the adapter.

    ``device`` is handed over exactly as received (it is not unpacked) and
    the adapter's return value is returned without a success flag.

    :param device: passed through unchanged to ``adapter.abandon_device``
    :param kwargs: accepted but not used here
    :return: whatever ``adapter.abandon_device`` returns
    """
    outcome = self.adapter.abandon_device(device)
    return outcome
136
def disable_device(self, device, **kwargs):
    """
    Unpack ``device`` and ask the adapter to disable it.

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" when ``device`` is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")

    unpacked = Device()
    device.Unpack(unpacked)
    return True, self.adapter.disable_device(unpacked)
145
def reenable_device(self, device, **kwargs):
    """
    Unpack ``device`` and ask the adapter to re-enable it.

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" when ``device`` is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")

    unpacked = Device()
    device.Unpack(unpacked)
    return True, self.adapter.reenable_device(unpacked)
154
def reboot_device(self, device, **kwargs):
    """
    Unpack ``device`` and ask the adapter to reboot it.

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" when ``device`` is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")

    unpacked = Device()
    device.Unpack(unpacked)
    return True, self.adapter.reboot_device(unpacked)
163
def download_image(self, device, request, **kwargs):
    """
    Unpack ``device`` and ``request`` and ask the adapter to download an image.

    Fixes relative to the previous version:
    * the adapter now receives the unpacked ``Device`` / ``ImageDownload``
      objects; before, both were unpacked and then discarded while the
      still-packed arguments were forwarded;
    * a missing request is reported as "request-invalid" instead of the
      copy-pasted "port-no-invalid".

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param request: packed message; unpacked into an ``ImageDownload``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" / "request-invalid" when the respective
             argument is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")
    d = Device()
    device.Unpack(d)

    if not request:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="request-invalid")
    img = ImageDownload()
    request.Unpack(img)

    return True, self.adapter.download_image(d, img)
179
def get_image_download_status(self, device, request, **kwargs):
    """
    Unpack ``device`` and ``request`` and query the adapter for the status
    of an image download.

    Fixes relative to the previous version:
    * the adapter now receives the unpacked ``Device`` / ``ImageDownload``
      objects; before, both were unpacked and then discarded while the
      still-packed arguments were forwarded;
    * a missing request is reported as "request-invalid" instead of the
      copy-pasted "port-no-invalid".

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param request: packed message; unpacked into an ``ImageDownload``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" / "request-invalid" when the respective
             argument is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")
    d = Device()
    device.Unpack(d)

    if not request:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="request-invalid")
    img = ImageDownload()
    request.Unpack(img)

    return True, self.adapter.get_image_download_status(d, img)
195
def cancel_image_download(self, device, request, **kwargs):
    """
    Unpack ``device`` and ``request`` and ask the adapter to cancel an image
    download.

    Fixes relative to the previous version:
    * the adapter now receives the unpacked ``Device`` / ``ImageDownload``
      objects; before, both were unpacked and then discarded while the
      still-packed arguments were forwarded;
    * a missing request is reported as "request-invalid" instead of the
      copy-pasted "port-no-invalid".

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param request: packed message; unpacked into an ``ImageDownload``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" / "request-invalid" when the respective
             argument is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")
    d = Device()
    device.Unpack(d)

    if not request:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="request-invalid")
    img = ImageDownload()
    request.Unpack(img)

    return True, self.adapter.cancel_image_download(d, img)
211
def activate_image_update(self, device, request, **kwargs):
    """
    Unpack ``device`` and ``request`` and ask the adapter to activate an
    image update.

    Fixes relative to the previous version:
    * the adapter now receives the unpacked ``Device`` / ``ImageDownload``
      objects; before, both were unpacked and then discarded while the
      still-packed arguments were forwarded;
    * a missing request is reported as "request-invalid" instead of the
      copy-pasted "port-no-invalid".

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param request: packed message; unpacked into an ``ImageDownload``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" / "request-invalid" when the respective
             argument is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")
    d = Device()
    device.Unpack(d)

    if not request:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="request-invalid")
    img = ImageDownload()
    request.Unpack(img)

    return True, self.adapter.activate_image_update(d, img)
227
def revert_image_update(self, device, request, **kwargs):
    """
    Unpack ``device`` and ``request`` and ask the adapter to revert an image
    update.

    Fixes relative to the previous version:
    * the adapter now receives the unpacked ``Device`` / ``ImageDownload``
      objects; before, both were unpacked and then discarded while the
      still-packed arguments were forwarded;
    * a missing request is reported as "request-invalid" instead of the
      copy-pasted "port-no-invalid".

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param request: packed message; unpacked into an ``ImageDownload``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" / "request-invalid" when the respective
             argument is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")
    d = Device()
    device.Unpack(d)

    if not request:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="request-invalid")
    img = ImageDownload()
    request.Unpack(img)

    return True, self.adapter.revert_image_update(d, img)
243
244
def self_test(self, device, **kwargs):
    """
    Forward a self-test request to the adapter's ``self_test_device``.

    ``device`` is handed over exactly as received (it is not unpacked) and
    the adapter's return value is returned without a success flag.

    :param device: passed through unchanged to ``adapter.self_test_device``
    :param kwargs: accepted but not used here
    :return: whatever ``adapter.self_test_device`` returns
    """
    outcome = self.adapter.self_test_device(device)
    return outcome
247
def delete_device(self, device, **kwargs):
    """
    Unpack ``device``, ask the adapter to delete it, then unsubscribe from
    the device-specific kafka topic.

    The topic name is built as ``<default topic>/<device id>``.
    NOTE(review): the commented-out ``createKafkaDeviceTopic`` earlier in
    this file joins the same parts with "_" -- confirm which separator the
    subscription side really uses.

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" when ``device`` is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")

    target = Device()
    device.Unpack(target)
    outcome = self.adapter.delete_device(target)

    # The Core will send no further requests for this device, so drop the
    # device-specific topic before reporting back.
    proxy = get_messaging_proxy()
    proxy.unsubscribe(topic=proxy.get_default_topic() + "/" + target.id)

    return (True, outcome)
265
def get_device_details(self, device, **kwargs):
    """
    Forward a device-details request straight to the adapter.

    ``device`` is handed over exactly as received (it is not unpacked) and
    the adapter's return value is returned without a success flag.

    :param device: passed through unchanged to ``adapter.get_device_details``
    :param kwargs: accepted but not used here
    :return: whatever ``adapter.get_device_details`` returns
    """
    outcome = self.adapter.get_device_details(device)
    return outcome
268
def update_flows_bulk(self, device, flows, groups, **kwargs):
    """
    Unpack the device, flows and groups and pass them to the adapter.

    Only ``device`` is mandatory. When ``flows`` or ``groups`` is falsy the
    adapter receives a default-constructed ``Flows`` / ``FlowGroups``.

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param flows: optional packed message; unpacked into a ``Flows``
    :param groups: optional packed message; unpacked into a ``FlowGroups``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" when ``device`` is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")
    unpacked_device = Device()
    device.Unpack(unpacked_device)

    unpacked_flows = Flows()
    if flows:
        flows.Unpack(unpacked_flows)

    unpacked_groups = FlowGroups()
    if groups:
        groups.Unpack(unpacked_groups)

    outcome = self.adapter.update_flows_bulk(unpacked_device,
                                             unpacked_flows,
                                             unpacked_groups)
    return (True, outcome)
285
def update_flows_incrementally(self, device, flow_changes, group_changes, **kwargs):
    """
    Unpack the device and the flow/group change sets and pass them to the
    adapter.

    Only ``device`` is mandatory. When ``flow_changes`` or ``group_changes``
    is falsy the adapter receives a default-constructed ``FlowChanges`` /
    ``FlowGroupChanges``.

    :param device: packed message; unpacked into a ``Device`` via ``Unpack``
    :param flow_changes: optional packed message; unpacked into a
                         ``FlowChanges``
    :param group_changes: optional packed message; unpacked into a
                          ``FlowGroupChanges``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "device-invalid" when ``device`` is falsy
    """
    if not device:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="device-invalid")
    unpacked_device = Device()
    device.Unpack(unpacked_device)

    unpacked_flow_changes = FlowChanges()
    if flow_changes:
        flow_changes.Unpack(unpacked_flow_changes)

    unpacked_group_changes = FlowGroupChanges()
    if group_changes:
        group_changes.Unpack(unpacked_group_changes)

    outcome = self.adapter.update_flows_incrementally(unpacked_device,
                                                      unpacked_flow_changes,
                                                      unpacked_group_changes)
    return (True, outcome)
302
def suppress_alarm(self, filter, **kwargs):
    """
    Forward an alarm-suppression request straight to the adapter.

    ``filter`` is handed over exactly as received (it is not unpacked) and
    the adapter's return value is returned without a success flag.

    :param filter: passed through unchanged to ``adapter.suppress_alarm``
    :param kwargs: accepted but not used here
    :return: whatever ``adapter.suppress_alarm`` returns
    """
    outcome = self.adapter.suppress_alarm(filter)
    return outcome
305
def unsuppress_alarm(self, filter, **kwargs):
    """
    Forward an alarm-unsuppression request straight to the adapter.

    ``filter`` is handed over exactly as received (it is not unpacked) and
    the adapter's return value is returned without a success flag.

    :param filter: passed through unchanged to ``adapter.unsuppress_alarm``
    :param kwargs: accepted but not used here
    :return: whatever ``adapter.unsuppress_alarm`` returns
    """
    outcome = self.adapter.unsuppress_alarm(filter)
    return outcome
308
def process_inter_adapter_message(self, msg, **kwargs):
    """
    Unpack ``msg`` and hand it to the adapter.

    :param msg: packed message; unpacked into an ``InterAdapterMessage``
                via ``Unpack``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)``, or ``(False, Error)`` with reason
             "msg-invalid" when ``msg`` is falsy
    """
    if not msg:
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="msg-invalid")

    unpacked = InterAdapterMessage()
    msg.Unpack(unpacked)

    outcome = self.adapter.process_inter_adapter_message(unpacked)
    return (True, outcome)
318
319
def receive_packet_out(self, deviceId, outPort, packet, **kwargs):
    """
    Unpack a packet-out request and hand it to the adapter.

    Arguments are validated and unpacked in order (device id, out port,
    packet); the first falsy one produces the matching error.

    Fix relative to the previous version: when unpacking or the adapter call
    raised, the exception was logged and the method fell through, returning
    ``None`` although every other path returns a ``(bool, payload)`` tuple.
    The failure is still logged but is now reported as ``(False, Error)``.

    :param deviceId: packed message; unpacked into a ``StrType`` whose
                     ``val`` is handed to the adapter
    :param outPort: packed message; unpacked into an ``IntType`` whose
                    ``val`` is handed to the adapter
    :param packet: packed message; unpacked into an ``ofp_packet_out``
    :param kwargs: accepted but not used here
    :return: ``(True, adapter result)`` on success; ``(False, Error)`` with
             reason "deviceid-invalid", "outport-invalid" or
             "packet-invalid" for a falsy argument, or
             "receive-packet-out-failed" when an exception was caught
    """
    try:
        if not deviceId:
            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                reason="deviceid-invalid")
        d_id = StrType()
        deviceId.Unpack(d_id)

        if not outPort:
            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                reason="outport-invalid")
        op = IntType()
        outPort.Unpack(op)

        if not packet:
            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                reason="packet-invalid")
        p = ofp_packet_out()
        packet.Unpack(p)

        return (True, self.adapter.receive_packet_out(d_id.val, op.val, p))
    except Exception as e:
        log.exception("error-processing-receive_packet_out", e=e)
        # INVALID_PARAMETERS is the only error code this module uses.
        # NOTE(review): switch to a more specific code if the protos
        # define one for adapter-side failures.
        return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                            reason="receive-packet-out-failed")
346