blob: 68344fce3efe170355954d56085bb345f02d85c7 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
khenaidoo6fdf0ba2018-11-02 14:38:33 -040018This facade handles kafka-formatted messages from the Core, extracts the kafka
19formatting and forwards the request to the concrete handler.
khenaidoob9203542018-09-17 22:56:37 -040020"""
khenaidoo90847922018-12-03 14:47:51 -050021import structlog
khenaidoo6fdf0ba2018-11-02 14:38:33 -040022from twisted.internet.defer import inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -040023from zope.interface import implementer
khenaidoo43c82122018-11-22 18:38:28 -050024from twisted.internet import reactor
khenaidooca301322019-01-09 23:06:32 -050025
khenaidoo90847922018-12-03 14:47:51 -050026from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
khenaidoofdbad6e2018-11-06 22:26:38 -050027from python.adapters.interface import IAdapterInterface
khenaidoo79232702018-12-04 11:00:41 -050028from python.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
khenaidoof5a5bfa2019-01-23 22:20:29 -050029from python.protos.device_pb2 import Device, ImageDownload
khenaidoofdbad6e2018-11-06 22:26:38 -050030from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
31 FlowGroupChanges, ofp_packet_out
khenaidoo43c82122018-11-22 18:38:28 -050032from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
khenaidoo54e0ddf2019-02-27 16:21:33 -050033 get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST, ARG_FROM_TOPIC
khenaidoob9203542018-09-17 22:56:37 -040034
khenaidoo90847922018-12-03 14:47:51 -050035log = structlog.get_logger()
khenaidoob9203542018-09-17 22:56:37 -040036
class MacAddressError(Exception):
    """Exception type for MAC-address related errors.

    Derives from ``Exception`` (not ``BaseException``) so that ordinary
    ``except Exception`` handlers can catch it; ``BaseException`` is meant
    for interpreter-level exits such as ``SystemExit``.

    :param error: description of the failure; kept on ``self.error`` and
        also forwarded to the base class so it shows up in ``args``/``str()``.
    """

    def __init__(self, error):
        super(MacAddressError, self).__init__(error)
        self.error = error
40
41
class IDError(Exception):
    """Exception type for identifier related errors.

    Derives from ``Exception`` (not ``BaseException``) so that ordinary
    ``except Exception`` handlers can catch it; ``BaseException`` is meant
    for interpreter-level exits such as ``SystemExit``.

    :param error: description of the failure; kept on ``self.error`` and
        also forwarded to the base class so it shows up in ``args``/``str()``.
    """

    def __init__(self, error):
        super(IDError, self).__init__(error)
        self.error = error
45
46
@implementer(IAdapterInterface)
class AdapterRequestFacade(object):
    """
    Gate-keeper between CORE and device adapters.

    On one side it interacts with Core's internal model and update/dispatch
    mechanisms.

    On the other side, it interacts with the adapters standard interface as
    defined in IAdapterInterface.

    Most request handlers receive their arguments as packed messages (objects
    exposing ``Unpack``), unpack them into the concrete protobuf type and
    forward the call to the wrapped adapter.  Those handlers reply with a
    ``(success, result)`` tuple: ``(True, <adapter result>)`` on success or
    ``(False, Error)`` when a required argument is missing.  A few handlers
    (``reconcile_device``, ``abandon_device``, ``self_test``,
    ``get_device_details``, ``suppress_alarm``, ``unsuppress_alarm``) are
    plain pass-throughs that hand their argument to the adapter untouched and
    return whatever the adapter returns.

    Every handler accepts ``**kwargs`` so that callers can pass additional
    keyword arguments; only ``adopt_device`` inspects them.
    """

    def __init__(self, adapter, core_proxy):
        """
        :param adapter: the concrete adapter every request is forwarded to.
        :param core_proxy: proxy towards the Core; used by ``adopt_device``
            to record the Core reference of a device.
        """
        self.adapter = adapter
        self.core_proxy = core_proxy

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _invalid_parameters(reason):
        """Build the ``(False, Error)`` reply for a missing argument."""
        return False, Error(code=ErrorCode.INVALID_PARAMETERS, reason=reason)

    @staticmethod
    def _unpack(packed, message):
        """Unpack ``packed`` into ``message`` and return ``message``."""
        packed.Unpack(message)
        return message

    def _image_request(self, adapter_method, device, request):
        """Shared body of the five image-management handlers.

        Unpacks ``device`` into a ``Device`` and ``request`` into an
        ``ImageDownload`` and calls ``self.adapter.<adapter_method>`` with
        the unpacked messages.

        :param adapter_method: name of the adapter method to invoke.
        :return: ``(True, <adapter result>)`` or ``(False, Error)`` when
            ``device`` or ``request`` is missing.
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = self._unpack(device, Device())

        if not request:
            return self._invalid_parameters("request-invalid")
        img = self._unpack(request, ImageDownload())

        # Forward the unpacked messages, like every other unpacking handler
        # of this facade, rather than the packed arguments.
        return True, getattr(self.adapter, adapter_method)(d, img)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    @inlineCallbacks
    def start(self):
        """Log the start of the facade; returns a Deferred."""
        log.debug('starting')
        # inlineCallbacks requires a generator function; the bare yield
        # turns this method into one so that calling it succeeds.
        yield

    @inlineCallbacks
    def stop(self):
        """Log the stop of the facade; returns a Deferred."""
        log.debug('stopping')
        # See start(): the bare yield makes this a generator function.
        yield

    # @inlineCallbacks
    # def createKafkaDeviceTopic(self, deviceId):
    #     log.debug("subscribing-to-topic", device_id=deviceId)
    #     kafka_proxy = get_messaging_proxy()
    #     device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
    #     # yield kafka_proxy.create_topic(topic=device_topic)
    #     yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
    #     log.debug("subscribed-to-topic", topic=device_topic)

    # ------------------------------------------------------------------
    # Device lifecycle requests
    # ------------------------------------------------------------------

    def adopt_device(self, device, **kwargs):
        """Unpack ``device`` and ask the adapter to adopt it.

        When ``kwargs`` carries ``ARG_FROM_TOPIC`` (a packed ``StrType``),
        its value is registered with the core proxy as the Core reference
        for the device before the adapter is called.

        :return: ``(True, <adapter result>)`` or ``(False, Error)`` when
            ``device`` is missing.
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = self._unpack(device, Device())

        # Update the core reference for that device as it will be used
        # by the adapter to send async messages to the Core.
        if ARG_FROM_TOPIC in kwargs:
            t = self._unpack(kwargs[ARG_FROM_TOPIC], StrType())
            self.core_proxy.update_device_core_reference(d.id, t.val)

        # # Start the creation of a device specific topic to handle all
        # # subsequent requests from the Core.  This adapter instance will
        # # handle all requests for that device.
        # reactor.callLater(0, self.createKafkaDeviceTopic, d.id)

        return True, self.adapter.adopt_device(d)

    def get_ofp_device_info(self, device, **kwargs):
        """Unpack ``device`` and return the adapter's OpenFlow device info.

        :return: ``(True, <adapter result>)`` or ``(False, Error)`` when
            ``device`` is missing.
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = self._unpack(device, Device())
        return True, self.adapter.get_ofp_device_info(d)

    def get_ofp_port_info(self, device, port_no, **kwargs):
        """Unpack ``device`` and ``port_no`` and query the adapter.

        ``port_no`` is a packed ``IntType``; its ``val`` is what the adapter
        receives.

        :return: ``(True, <adapter result>)`` or ``(False, Error)`` when
            ``device`` or ``port_no`` is missing.
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = self._unpack(device, Device())

        if not port_no:
            return self._invalid_parameters("port-no-invalid")
        p = self._unpack(port_no, IntType())

        return True, self.adapter.get_ofp_port_info(d, p.val)

    def reconcile_device(self, device, **kwargs):
        """Pass ``device`` to the adapter as received; return its result."""
        return self.adapter.reconcile_device(device)

    def abandon_device(self, device, **kwargs):
        """Pass ``device`` to the adapter as received; return its result."""
        return self.adapter.abandon_device(device)

    def disable_device(self, device, **kwargs):
        """Unpack ``device`` and ask the adapter to disable it.

        :return: ``(True, <adapter result>)`` or ``(False, Error)`` when
            ``device`` is missing.
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = self._unpack(device, Device())
        return True, self.adapter.disable_device(d)

    def reenable_device(self, device, **kwargs):
        """Unpack ``device`` and ask the adapter to re-enable it.

        :return: ``(True, <adapter result>)`` or ``(False, Error)`` when
            ``device`` is missing.
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = self._unpack(device, Device())
        return True, self.adapter.reenable_device(d)

    def reboot_device(self, device, **kwargs):
        """Unpack ``device`` and ask the adapter to reboot it.

        :return: ``(True, <adapter result>)`` or ``(False, Error)`` when
            ``device`` is missing.
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = self._unpack(device, Device())
        return True, self.adapter.reboot_device(d)

    # ------------------------------------------------------------------
    # Image management requests
    # ------------------------------------------------------------------

    def download_image(self, device, request, **kwargs):
        """Forward an image download request; see ``_image_request``."""
        return self._image_request("download_image", device, request)

    def get_image_download_status(self, device, request, **kwargs):
        """Forward an image status query; see ``_image_request``."""
        return self._image_request("get_image_download_status",
                                   device, request)

    def cancel_image_download(self, device, request, **kwargs):
        """Forward an image download cancel; see ``_image_request``."""
        return self._image_request("cancel_image_download", device, request)

    def activate_image_update(self, device, request, **kwargs):
        """Forward an image activation; see ``_image_request``."""
        return self._image_request("activate_image_update", device, request)

    def revert_image_update(self, device, request, **kwargs):
        """Forward an image revert; see ``_image_request``."""
        return self._image_request("revert_image_update", device, request)

    def self_test(self, device, **kwargs):
        """Pass ``device`` as received to the adapter's ``self_test_device``."""
        return self.adapter.self_test_device(device)

    def delete_device(self, device, **kwargs):
        """Unpack ``device``, ask the adapter to delete it and unsubscribe
        from the device-specific kafka topic.

        :return: ``(True, <adapter result>)`` or ``(False, Error)`` when
            ``device`` is missing.
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = self._unpack(device, Device())
        result = self.adapter.delete_device(d)

        # Before we return, delete the device specific topic as we will no
        # longer receive requests from the Core for that device.
        # NOTE(review): the disabled createKafkaDeviceTopic code above joins
        # the topic name with "_" while this uses "/" - confirm which
        # separator the subscription side really uses.
        kafka_proxy = get_messaging_proxy()
        device_topic = kafka_proxy.get_default_topic() + "/" + d.id
        kafka_proxy.unsubscribe(topic=device_topic)

        return True, result

    def get_device_details(self, device, **kwargs):
        """Pass ``device`` to the adapter as received; return its result."""
        return self.adapter.get_device_details(device)

    # ------------------------------------------------------------------
    # Flow requests
    # ------------------------------------------------------------------

    def update_flows_bulk(self, device, flows, groups, **kwargs):
        """Unpack the arguments and push a bulk flow update to the adapter.

        ``flows`` and ``groups`` are optional: when one is missing the
        adapter receives an empty ``Flows`` / ``FlowGroups`` message.

        :return: ``(True, <adapter result>)`` or ``(False, Error)`` when
            ``device`` is missing.
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = self._unpack(device, Device())

        f = Flows()
        if flows:
            flows.Unpack(f)

        g = FlowGroups()
        if groups:
            groups.Unpack(g)

        return True, self.adapter.update_flows_bulk(d, f, g)

    def update_flows_incrementally(self, device, flow_changes, group_changes,
                                   **kwargs):
        """Unpack the arguments and push incremental flow changes.

        ``flow_changes`` and ``group_changes`` are optional: when one is
        missing the adapter receives an empty ``FlowChanges`` /
        ``FlowGroupChanges`` message.

        :return: ``(True, <adapter result>)`` or ``(False, Error)`` when
            ``device`` is missing.
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = self._unpack(device, Device())

        f = FlowChanges()
        if flow_changes:
            flow_changes.Unpack(f)

        g = FlowGroupChanges()
        if group_changes:
            group_changes.Unpack(g)

        return True, self.adapter.update_flows_incrementally(d, f, g)

    # ------------------------------------------------------------------
    # Alarm requests
    # ------------------------------------------------------------------

    def suppress_alarm(self, filter, **kwargs):
        """Pass ``filter`` to the adapter as received; return its result."""
        return self.adapter.suppress_alarm(filter)

    def unsuppress_alarm(self, filter, **kwargs):
        """Pass ``filter`` to the adapter as received; return its result."""
        return self.adapter.unsuppress_alarm(filter)

    # ------------------------------------------------------------------
    # Messaging requests
    # ------------------------------------------------------------------

    def process_inter_adapter_message(self, msg, **kwargs):
        """Unpack ``msg`` into an ``InterAdapterMessage`` and forward it.

        :return: ``(True, <adapter result>)`` or ``(False, Error)`` when
            ``msg`` is missing.
        """
        if not msg:
            return self._invalid_parameters("msg-invalid")
        m = self._unpack(msg, InterAdapterMessage())
        return True, self.adapter.process_inter_adapter_message(m)

    def receive_packet_out(self, deviceId, outPort, packet, **kwargs):
        """Unpack a packet-out request and hand it to the adapter.

        ``deviceId`` is a packed ``StrType``, ``outPort`` a packed
        ``IntType`` and ``packet`` a packed ``ofp_packet_out``.  The adapter
        is called with the device id value, the port value and the unpacked
        packet.

        :return: ``(True, <adapter result>)`` on success; ``(False, Error)``
            when an argument is missing or when unpacking / the adapter
            raises (the exception is logged).
        """
        try:
            if not deviceId:
                return self._invalid_parameters("deviceid-invalid")
            d_id = self._unpack(deviceId, StrType())

            if not outPort:
                return self._invalid_parameters("outport-invalid")
            op = self._unpack(outPort, IntType())

            if not packet:
                return self._invalid_parameters("packet-invalid")
            p = self._unpack(packet, ofp_packet_out())

            return True, self.adapter.receive_packet_out(d_id.val, op.val, p)
        except Exception as e:
            log.exception("error-processing-receive_packet_out", e=e)
            # Always answer with a (success, result) tuple like the other
            # handlers instead of implicitly returning None.
            # NOTE(review): INVALID_PARAMETERS is the only error code used
            # in this module; switch to a more specific code if one exists.
            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                reason="receive-packet-out-failed")
khenaidoofdbad6e2018-11-06 22:26:38 -0500346