blob: 0efb811d37b0282654bb953f6f1bcbf839eba75c [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
khenaidoo6fdf0ba2018-11-02 14:38:33 -040018This facade handles kafka-formatted messages from the Core, extracts the kafka
19formatting and forwards the request to the concrete handler.
khenaidoob9203542018-09-17 22:56:37 -040020"""
khenaidoo90847922018-12-03 14:47:51 -050021import structlog
khenaidoo6fdf0ba2018-11-02 14:38:33 -040022from twisted.internet.defer import inlineCallbacks
khenaidoob9203542018-09-17 22:56:37 -040023from zope.interface import implementer
khenaidoo43c82122018-11-22 18:38:28 -050024from twisted.internet import reactor
khenaidoo90847922018-12-03 14:47:51 -050025from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
khenaidoofdbad6e2018-11-06 22:26:38 -050026from python.adapters.interface import IAdapterInterface
khenaidoo79232702018-12-04 11:00:41 -050027from python.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
khenaidoofdbad6e2018-11-06 22:26:38 -050028from python.protos.device_pb2 import Device
29from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
30 FlowGroupChanges, ofp_packet_out
khenaidoo43c82122018-11-22 18:38:28 -050031from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
32 get_messaging_proxy
khenaidoob9203542018-09-17 22:56:37 -040033
# Module-level structlog logger used by the facade defined below.
log = structlog.get_logger()
khenaidoob9203542018-09-17 22:56:37 -040035
class MacAddressError(Exception):
    """
    Error describing a MAC-address related failure.

    Derives from ``Exception`` (not ``BaseException``) so that ordinary
    ``except Exception`` handlers see it, and forwards the description to
    the base class so ``str(err)`` and ``err.args`` are populated.

    :param error: description of the failure; also kept on ``self.error``
    """

    def __init__(self, error):
        super(MacAddressError, self).__init__(error)
        self.error = error
39
40
class IDError(Exception):
    """
    Error describing an ID related failure.

    Derives from ``Exception`` (not ``BaseException``) so that ordinary
    ``except Exception`` handlers see it, and forwards the description to
    the base class so ``str(err)`` and ``err.args`` are populated.

    :param error: description of the failure; also kept on ``self.error``
    """

    def __init__(self, error):
        super(IDError, self).__init__(error)
        self.error = error
44
45
@implementer(IAdapterInterface)
class AdapterRequestFacade(object):
    """
    Gate-keeper between CORE and device adapters.

    On one side it interacts with Core's internal model and update/dispatch
    mechanisms.

    On the other side, it interacts with the adapters standard interface as
    defined in IAdapterInterface.

    Two calling conventions coexist in this class:

    * Methods that unpack their arguments (``adopt_device``,
      ``disable_device``, ``update_flows_bulk``, ...) expect packed messages
      exposing ``Unpack()`` (presumably protobuf ``Any`` -- confirm against
      the caller) and return a ``(success, payload)`` tuple: ``(True,
      <adapter result>)`` on success, or ``(False, Error)`` with
      ``ErrorCode.INVALID_PARAMETERS`` when a required argument is missing.
    * The remaining methods hand their arguments to the adapter untouched and
      return the adapter's result as-is.
    """

    def __init__(self, adapter):
        """
        :param adapter: the concrete adapter every request is forwarded to
        """
        self.adapter = adapter

    @staticmethod
    def _invalid_parameters(reason):
        """Build the ``(False, Error)`` pair returned for a bad argument."""
        return False, Error(code=ErrorCode.INVALID_PARAMETERS, reason=reason)

    @staticmethod
    def _device_topic(kafka_proxy, device_id):
        """Return the per-device topic name: ``<default topic>_<device id>``."""
        return kafka_proxy.get_default_topic() + "_" + device_id

    @inlineCallbacks
    def start(self):
        """Log the start of the facade; returns a Deferred."""
        log.debug('starting')
        # inlineCallbacks requires the decorated function to be a generator;
        # without a yield the call fails with a TypeError.
        yield

    @inlineCallbacks
    def stop(self):
        """Log the stop of the facade; returns a Deferred."""
        log.debug('stopping')
        # See start(): the yield keeps this a generator for inlineCallbacks.
        yield

    @inlineCallbacks
    def createKafkaDeviceTopic(self, deviceId):
        """
        Subscribe this facade to the device specific topic.

        The topic is not created explicitly here; the facade only subscribes
        to it, reading from the earliest offset, with itself as the target
        of the incoming requests.

        :param deviceId: id of the device, used as the topic suffix
        """
        log.debug("subscribing-to-topic", device_id=deviceId)
        kafka_proxy = get_messaging_proxy()
        device_topic = self._device_topic(kafka_proxy, deviceId)
        yield kafka_proxy.subscribe(topic=device_topic,
                                    target_cls=self,
                                    offset=OFFSET_EARLIEST)
        log.debug("subscribed-to-topic", topic=device_topic)

    def adopt_device(self, device):
        """
        Unpack the device and ask the adapter to adopt it.

        :param device: packed Device message
        :return: (True, adapter result) or (False, Error) if device is missing
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = Device()
        device.Unpack(d)

        # Start the creation of a device specific topic to handle all
        # subsequent requests from the Core. This adapter instance will
        # handle all requests for that device.
        reactor.callLater(0, self.createKafkaDeviceTopic, d.id)

        return True, self.adapter.adopt_device(d)

    def get_ofp_device_info(self, device):
        """
        Unpack the device and fetch its OpenFlow device info from the adapter.

        :param device: packed Device message
        :return: (True, adapter result) or (False, Error) if device is missing
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = Device()
        device.Unpack(d)
        return True, self.adapter.get_ofp_device_info(d)

    def get_ofp_port_info(self, device, port_no):
        """
        Unpack the device and port number and fetch the OpenFlow port info.

        :param device: packed Device message
        :param port_no: packed IntType message; its ``val`` is forwarded
        :return: (True, adapter result) or (False, Error) if an argument is
                 missing
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = Device()
        device.Unpack(d)

        if not port_no:
            return self._invalid_parameters("port-no-invalid")
        p = IntType()
        port_no.Unpack(p)

        return True, self.adapter.get_ofp_port_info(d, p.val)

    def reconcile_device(self, device):
        """Forward to the adapter unchanged; returns the adapter's result."""
        return self.adapter.reconcile_device(device)

    def abandon_device(self, device):
        """Forward to the adapter unchanged; returns the adapter's result."""
        return self.adapter.abandon_device(device)

    def disable_device(self, device):
        """
        Unpack the device and ask the adapter to disable it.

        :param device: packed Device message
        :return: (True, adapter result) or (False, Error) if device is missing
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = Device()
        device.Unpack(d)
        return True, self.adapter.disable_device(d)

    def reenable_device(self, device):
        """
        Unpack the device and ask the adapter to re-enable it.

        :param device: packed Device message
        :return: (True, adapter result) or (False, Error) if device is missing
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = Device()
        device.Unpack(d)
        return True, self.adapter.reenable_device(d)

    def reboot_device(self, device):
        """
        Unpack the device and ask the adapter to reboot it.

        :param device: packed Device message
        :return: (True, adapter result) or (False, Error) if device is missing
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = Device()
        device.Unpack(d)
        return True, self.adapter.reboot_device(d)

    def download_image(self, device, request):
        """Forward to the adapter unchanged; returns the adapter's result."""
        return self.adapter.download_image(device, request)

    def get_image_download_status(self, device, request):
        """Forward to the adapter unchanged; returns the adapter's result."""
        return self.adapter.get_image_download_status(device, request)

    def cancel_image_download(self, device, request):
        """Forward to the adapter unchanged; returns the adapter's result."""
        return self.adapter.cancel_image_download(device, request)

    def activate_image_update(self, device, request):
        """Forward to the adapter unchanged; returns the adapter's result."""
        return self.adapter.activate_image_update(device, request)

    def revert_image_update(self, device, request):
        """Forward to the adapter unchanged; returns the adapter's result."""
        return self.adapter.revert_image_update(device, request)

    def self_test(self, device):
        """Forward to the adapter's ``self_test_device``; returns its result."""
        return self.adapter.self_test_device(device)

    def delete_device(self, device):
        """
        Unpack the device, ask the adapter to delete it, then unsubscribe
        from the device specific topic.

        :param device: packed Device message
        :return: (True, adapter result) or (False, Error) if device is missing
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = Device()
        device.Unpack(d)
        result = self.adapter.delete_device(d)

        # Before we return, delete the device specific topic as we will no
        # longer receive requests from the Core for that device. The name
        # must match the one subscribed to in createKafkaDeviceTopic().
        kafka_proxy = get_messaging_proxy()
        device_topic = self._device_topic(kafka_proxy, d.id)
        # NOTE(review): the result of unsubscribe() is not awaited here --
        # confirm whether it returns a Deferred that should be handled.
        kafka_proxy.unsubscribe(topic=device_topic)

        return True, result

    def get_device_details(self, device):
        """Forward to the adapter unchanged; returns the adapter's result."""
        return self.adapter.get_device_details(device)

    def update_flows_bulk(self, device, flows, groups):
        """
        Unpack the device, flows and groups and push them to the adapter.

        :param device: packed Device message (required)
        :param flows: packed Flows message; an empty Flows is used if falsy
        :param groups: packed FlowGroups message; an empty FlowGroups is
                       used if falsy
        :return: (True, adapter result) or (False, Error) if device is missing
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = Device()
        device.Unpack(d)

        f = Flows()
        if flows:
            flows.Unpack(f)

        g = FlowGroups()
        if groups:
            groups.Unpack(g)

        return True, self.adapter.update_flows_bulk(d, f, g)

    def update_flows_incrementally(self, device, flow_changes, group_changes):
        """
        Unpack the device and the flow/group changes and push them to the
        adapter.

        :param device: packed Device message (required)
        :param flow_changes: packed FlowChanges message; an empty one is
                             used if falsy
        :param group_changes: packed FlowGroupChanges message; an empty one
                              is used if falsy
        :return: (True, adapter result) or (False, Error) if device is missing
        """
        if not device:
            return self._invalid_parameters("device-invalid")
        d = Device()
        device.Unpack(d)

        f = FlowChanges()
        if flow_changes:
            flow_changes.Unpack(f)

        g = FlowGroupChanges()
        if group_changes:
            group_changes.Unpack(g)

        return True, self.adapter.update_flows_incrementally(d, f, g)

    def suppress_alarm(self, filter):
        """Forward to the adapter unchanged; returns the adapter's result."""
        return self.adapter.suppress_alarm(filter)

    def unsuppress_alarm(self, filter):
        """Forward to the adapter unchanged; returns the adapter's result."""
        return self.adapter.unsuppress_alarm(filter)

    def process_inter_adapter_message(self, msg):
        """
        Unpack an inter-adapter message and hand it to the adapter.

        :param msg: packed InterAdapterMessage
        :return: (True, adapter result) or (False, Error) if msg is missing
        """
        if not msg:
            return self._invalid_parameters("msg-invalid")
        m = InterAdapterMessage()
        msg.Unpack(m)

        return True, self.adapter.process_inter_adapter_message(m)

    def receive_packet_out(self, deviceId, outPort, packet):
        """
        Unpack a packet-out request and hand it to the adapter.

        :param deviceId: packed StrType message
        :param outPort: packed IntType message
        :param packet: packed ofp_packet_out message
        :return: (True, adapter result) or (False, Error) if an argument is
                 missing
        """
        if not deviceId:
            return self._invalid_parameters("deviceid-invalid")
        d_id = StrType()
        deviceId.Unpack(d_id)

        if not outPort:
            return self._invalid_parameters("outport-invalid")
        # Must be an instance: Unpack() fills in a message object, it cannot
        # populate the IntType class itself.
        op = IntType()
        outPort.Unpack(op)

        if not packet:
            return self._invalid_parameters("packet-invalid")
        p = ofp_packet_out()
        packet.Unpack(p)

        # NOTE(review): the StrType/IntType wrappers are forwarded as-is,
        # whereas get_ofp_port_info() forwards ``p.val`` -- confirm which
        # form the adapter's receive_packet_out() expects.
        return True, self.adapter.receive_packet_out(d_id, op, p)
267