blob: fccb04962fe2e1bd87deb23c9a3eb22b6a9e259c [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
"""
This facade handles kafka-formatted messages from the Core, strips the kafka
formatting and forwards the request to the concrete handler.
"""
21import structlog
22from twisted.internet.defer import inlineCallbacks
23from zope.interface import implementer
24from twisted.internet import reactor
25
26from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
27from python.adapters.interface import IAdapterInterface
28from python.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
29from python.protos.device_pb2 import Device, ImageDownload
30from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
31 FlowGroupChanges, ofp_packet_out
32from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
33 get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST
34
35log = structlog.get_logger()
36
class MacAddressError(Exception):
    """
    Error carrying a description of a MAC address problem.

    Derives from ``Exception`` (rather than directly from ``BaseException``)
    so that generic ``except Exception`` handlers see it; it is still a
    ``BaseException`` subclass, so existing handlers keep working.

    :param error: description of the problem; kept on ``self.error`` and
                  also passed to the base class so that ``str(exc)`` and
                  ``exc.args`` are populated.
    """

    def __init__(self, error):
        super(MacAddressError, self).__init__(error)
        self.error = error
40
41
class IDError(Exception):
    """
    Error carrying a description of an identifier problem.

    Derives from ``Exception`` (rather than directly from ``BaseException``)
    so that generic ``except Exception`` handlers see it; it is still a
    ``BaseException`` subclass, so existing handlers keep working.

    :param error: description of the problem; kept on ``self.error`` and
                  also passed to the base class so that ``str(exc)`` and
                  ``exc.args`` are populated.
    """

    def __init__(self, error):
        super(IDError, self).__init__(error)
        self.error = error
45
46
@implementer(IAdapterInterface)
class AdapterRequestFacade(object):
    """
    Gate-keeper between CORE and device adapters.

    On one side it interacts with Core's internal model and update/dispatch
    mechanisms.

    On the other side, it interacts with the wrapped adapter; this class is
    declared to implement IAdapterInterface.

    Most handlers receive their arguments as packed objects exposing
    ``Unpack`` (presumably protobuf ``Any`` wrappers -- confirm against the
    caller), unpack them into concrete messages and return a
    ``(success, payload)`` tuple: ``(True, <adapter result>)`` on success or
    ``(False, Error(...))`` when a mandatory argument is missing.  A few
    handlers (noted individually) forward their arguments untouched and
    return the adapter's result as-is.
    """

    def __init__(self, adapter):
        """
        :param adapter: the concrete adapter every request is forwarded to.
        """
        self.adapter = adapter

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _invalid(reason):
        """Build the ``(False, Error)`` reply used to reject a request."""
        return False, Error(code=ErrorCode.INVALID_PARAMETERS, reason=reason)

    @staticmethod
    def _device_topic(kafka_proxy, device_id):
        """
        Return the name of the device specific topic.

        Shared by subscribe and unsubscribe so both always agree on the
        topic name.
        """
        return kafka_proxy.get_default_topic() + "_" + device_id

    def _unpack_image_request(self, device, request):
        """
        Unpack the two arguments shared by all image management requests.

        :param device: packed Device message
        :param request: packed ImageDownload message
        :return: ``(device, image, None)`` on success, otherwise
                 ``(None, None, failure)`` where ``failure`` is the
                 ``(False, Error)`` reply to hand back to the caller.
        """
        if not device:
            return None, None, self._invalid("device-invalid")
        d = Device()
        device.Unpack(d)

        if not request:
            return None, None, self._invalid("request-invalid")
        img = ImageDownload()
        request.Unpack(img)

        return d, img, None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    @inlineCallbacks
    def start(self):
        """Log that the facade is starting; returns a Deferred."""
        log.debug('starting')
        # inlineCallbacks requires the decorated function to be a generator;
        # without a yield Twisted raises TypeError when start() is called.
        yield

    @inlineCallbacks
    def stop(self):
        """Log that the facade is stopping; returns a Deferred."""
        log.debug('stopping')
        # See start(): the bare yield makes this a generator function.
        yield

    @inlineCallbacks
    def createKafkaDeviceTopic(self, deviceId):
        """
        Subscribe this facade to the device specific topic.

        The topic is not created explicitly here; only the subscription is
        made, with this facade as the target of incoming requests.

        :param deviceId: id of the device, appended to the default topic
        """
        log.debug("subscribing-to-topic", device_id=deviceId)
        kafka_proxy = get_messaging_proxy()
        device_topic = self._device_topic(kafka_proxy, deviceId)
        yield kafka_proxy.subscribe(topic=device_topic,
                                    group_id=device_topic,
                                    target_cls=self,
                                    offset=KAFKA_OFFSET_EARLIEST)
        log.debug("subscribed-to-topic", topic=device_topic)

    # ------------------------------------------------------------------
    # Device requests
    # ------------------------------------------------------------------

    def adopt_device(self, device):
        """
        Unpack ``device``, schedule the device topic subscription and
        forward the adoption to the adapter.

        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        if not device:
            return self._invalid("device-invalid")
        d = Device()
        device.Unpack(d)

        # Start the creation of a device specific topic to handle all
        # subsequent requests from the Core.  This adapter instance will
        # handle all requests for that device.
        reactor.callLater(0, self.createKafkaDeviceTopic, d.id)

        result = self.adapter.adopt_device(d)
        return True, result

    def get_ofp_device_info(self, device):
        """
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        if not device:
            return self._invalid("device-invalid")
        d = Device()
        device.Unpack(d)
        return True, self.adapter.get_ofp_device_info(d)

    def get_ofp_port_info(self, device, port_no):
        """
        :param device: packed Device message
        :param port_no: packed IntType message; its ``val`` is forwarded
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        if not device:
            return self._invalid("device-invalid")
        d = Device()
        device.Unpack(d)

        if not port_no:
            return self._invalid("port-no-invalid")
        p = IntType()
        port_no.Unpack(p)

        return True, self.adapter.get_ofp_port_info(d, p.val)

    def reconcile_device(self, device):
        """Forward ``device`` untouched; return the adapter result as-is."""
        return self.adapter.reconcile_device(device)

    def abandon_device(self, device):
        """Forward ``device`` untouched; return the adapter result as-is."""
        return self.adapter.abandon_device(device)

    def disable_device(self, device):
        """
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        if not device:
            return self._invalid("device-invalid")
        d = Device()
        device.Unpack(d)
        return True, self.adapter.disable_device(d)

    def reenable_device(self, device):
        """
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        if not device:
            return self._invalid("device-invalid")
        d = Device()
        device.Unpack(d)
        return True, self.adapter.reenable_device(d)

    def reboot_device(self, device):
        """
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        if not device:
            return self._invalid("device-invalid")
        d = Device()
        device.Unpack(d)
        return True, self.adapter.reboot_device(d)

    # ------------------------------------------------------------------
    # Image management requests
    #
    # All five handlers hand the *unpacked* Device and ImageDownload
    # messages to the adapter, like every other handler in this class.
    # ------------------------------------------------------------------

    def download_image(self, device, request):
        """
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        d, img, failure = self._unpack_image_request(device, request)
        if failure is not None:
            return failure
        return True, self.adapter.download_image(d, img)

    def get_image_download_status(self, device, request):
        """
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        d, img, failure = self._unpack_image_request(device, request)
        if failure is not None:
            return failure
        return True, self.adapter.get_image_download_status(d, img)

    def cancel_image_download(self, device, request):
        """
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        d, img, failure = self._unpack_image_request(device, request)
        if failure is not None:
            return failure
        return True, self.adapter.cancel_image_download(d, img)

    def activate_image_update(self, device, request):
        """
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        d, img, failure = self._unpack_image_request(device, request)
        if failure is not None:
            return failure
        return True, self.adapter.activate_image_update(d, img)

    def revert_image_update(self, device, request):
        """
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        d, img, failure = self._unpack_image_request(device, request)
        if failure is not None:
            return failure
        return True, self.adapter.revert_image_update(d, img)

    def self_test(self, device):
        """
        Forward ``device`` untouched to the adapter's ``self_test_device``;
        return the adapter result as-is.
        """
        return self.adapter.self_test_device(device)

    def delete_device(self, device):
        """
        Forward the deletion to the adapter, then unsubscribe from the
        device specific topic.

        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        if not device:
            return self._invalid("device-invalid")
        d = Device()
        device.Unpack(d)
        result = self.adapter.delete_device(d)

        # Before we return, drop the device specific topic as we will no
        # longer receive requests from the Core for that device.  The name
        # must match the one used in createKafkaDeviceTopic.
        kafka_proxy = get_messaging_proxy()
        device_topic = self._device_topic(kafka_proxy, d.id)
        kafka_proxy.unsubscribe(topic=device_topic)

        return True, result

    def get_device_details(self, device):
        """Forward ``device`` untouched; return the adapter result as-is."""
        return self.adapter.get_device_details(device)

    # ------------------------------------------------------------------
    # Flow requests
    # ------------------------------------------------------------------

    def update_flows_bulk(self, device, flows, groups):
        """
        ``flows`` and ``groups`` are optional: when either is missing an
        empty message of the matching type is forwarded instead.

        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        if not device:
            return self._invalid("device-invalid")
        d = Device()
        device.Unpack(d)

        f = Flows()
        if flows:
            flows.Unpack(f)

        g = FlowGroups()
        if groups:
            groups.Unpack(g)

        return True, self.adapter.update_flows_bulk(d, f, g)

    def update_flows_incrementally(self, device, flow_changes, group_changes):
        """
        ``flow_changes`` and ``group_changes`` are optional: when either is
        missing an empty message of the matching type is forwarded instead.

        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        if not device:
            return self._invalid("device-invalid")
        d = Device()
        device.Unpack(d)

        f = FlowChanges()
        if flow_changes:
            flow_changes.Unpack(f)

        g = FlowGroupChanges()
        if group_changes:
            group_changes.Unpack(g)

        return True, self.adapter.update_flows_incrementally(d, f, g)

    # ------------------------------------------------------------------
    # Alarms
    # ------------------------------------------------------------------

    def suppress_alarm(self, filter):
        """Forward ``filter`` untouched; return the adapter result as-is."""
        return self.adapter.suppress_alarm(filter)

    def unsuppress_alarm(self, filter):
        """Forward ``filter`` untouched; return the adapter result as-is."""
        return self.adapter.unsuppress_alarm(filter)

    # ------------------------------------------------------------------
    # Messaging
    # ------------------------------------------------------------------

    def process_inter_adapter_message(self, msg):
        """
        :param msg: packed InterAdapterMessage
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        if not msg:
            return self._invalid("msg-invalid")
        m = InterAdapterMessage()
        msg.Unpack(m)
        return True, self.adapter.process_inter_adapter_message(m)

    def receive_packet_out(self, deviceId, outPort, packet):
        """
        Unpack a packet-out request and forward it to the adapter.

        Any exception raised while unpacking or by the adapter is logged and
        reported as a failure reply rather than propagated.

        :param deviceId: packed StrType; its ``val`` is forwarded
        :param outPort: packed IntType; its ``val`` is forwarded
        :param packet: packed ofp_packet_out
        :return: ``(True, adapter result)`` or ``(False, Error)``
        """
        try:
            if not deviceId:
                return self._invalid("deviceid-invalid")
            d_id = StrType()
            deviceId.Unpack(d_id)

            if not outPort:
                return self._invalid("outport-invalid")
            op = IntType()
            outPort.Unpack(op)

            if not packet:
                return self._invalid("packet-invalid")
            p = ofp_packet_out()
            packet.Unpack(p)

            return True, self.adapter.receive_packet_out(d_id.val, op.val, p)
        except Exception as e:
            log.exception("error-processing-receive_packet_out", e=e)
            # Keep the (success, payload) contract instead of implicitly
            # returning None.  INVALID_PARAMETERS is the only error code
            # this module references, so it is reused here.
            return self._invalid("receive-packet-out-failed")
337