blob: f8f817cec60729725e82d2c7f69f0b057ac7315d [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
17from twisted.internet.defer import inlineCallbacks, returnValue
18
19import loxi.of13 as ofp
20from converter import to_loxi, pb2dict, to_grpc
Matteo Scandolo360605d2019-11-05 18:29:17 -080021from binascii import hexlify
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050022
23log = structlog.get_logger()
24
25
26class OpenFlowProtocolError(Exception): pass
27
28
29class OpenFlowProtocolHandler(object):
30
31 ofp_version = [4] # OFAgent supported versions
32
Manikkaraj kb1a10922019-07-29 12:10:34 -040033 MAX_METER_IDS = 4294967295
34 MAX_METER_BANDS = 255
35 MAX_METER_COLORS = 255
36
David Bainbridge006dc842019-11-22 02:05:32 +000037 def __init__(self, datapath_id, device_id, agent, cxn):
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050038 """
39 The upper half of the OpenFlow protocol, focusing on message
40 exchanges.
41 :param agent: Reference to the Agent() instance, can be used to
42 indicate critical errors to break the connection.
43 :param cxn: The lower level message serdes part of the OF protocol.
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050044 """
45 self.datapath_id = datapath_id
46 self.device_id = device_id
47 self.agent = agent
48 self.cxn = cxn
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050049 self.role = None
50
51 @inlineCallbacks
52 def start(self):
53 """A new call is made after a fresh reconnect"""
54
55 log.debug('starting')
56
57 try:
58 support = False
59 # send initial hello message
60 self.cxn.send(ofp.message.hello(elements=[ofp.common.hello_elem_versionbitmap(
61 bitmaps = [ofp.common.hello_elem_bitmap(self.ofp_version)])]))
62 # expect to receive a hello message
63 msg = yield self.cxn.recv_class(ofp.message.hello)
64 # supports only ofp_versions till 31 and single bitmap.
65 if msg:
66 support = ofp.util.verify_version_support(msg,self.ofp_version)
67 if not support:
68 self.cxn.send(ofp.message.hello_failed_error_msg(
69 xid=msg.xid, code=ofp.OFPHFC_INCOMPATIBLE,
70 data='i support only 1.3'))
71 log.error('peer-do-not-support-OpenFlow-version',self.ofp_version)
72
73 while support:
74 req = yield self.cxn.recv_any()
75 handler = self.main_handlers.get(req.type, None)
76 if handler:
77 handler(self, req)
78 else:
79 log.error('cannot-handle',
80 request=req, xid=req.xid, type=req.type)
81
82 except Exception, e:
83 log.exception('exception', e=e)
84
85 log.info('started')
86 returnValue(self)
87
88 def stop(self):
89 log.debug('stopping')
90 pass # nothing to do yet
91 log.info('stopped')
92
93 def handle_echo_request(self, req):
94 self.cxn.send(ofp.message.echo_reply(xid=req.xid))
95
96 @inlineCallbacks
97 def handle_feature_request(self, req):
David Bainbridge006dc842019-11-22 02:05:32 +000098 rpc = self.agent.connection_manager.get_rpc_client()
99 if rpc is not None:
100 device_info = yield rpc.get_device_info(self.device_id)
101 kw = pb2dict(device_info.switch_features)
102 self.cxn.send(ofp.message.features_reply(
103 xid=req.xid,
104 datapath_id=self.datapath_id,
105 **kw))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500106
107 def handle_stats_request(self, req):
108 handler = self.stats_handlers.get(req.stats_type, None)
109 if handler:
110 handler(self, req)
111 else:
112 raise OpenFlowProtocolError(
113 'Cannot handle stats request type "{}"'.format(req.stats_type))
114
115 def handle_barrier_request(self, req):
116 # not really doing barrier yet, but we respond
117 # see https://jira.opencord.org/browse/CORD-823
118 self.cxn.send(ofp.message.barrier_reply(xid=req.xid))
119
120 def handle_experimenter_request(self, req):
121 raise NotImplementedError()
122
123 def handle_flow_mod_request(self, req):
David Bainbridge006dc842019-11-22 02:05:32 +0000124 log.debug('flow mod request')
125 rpc = self.agent.connection_manager.get_rpc_client()
126 if rpc is not None:
127 if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
128 try:
129 grpc_req = to_grpc(req)
130 except Exception, e:
131 log.exception('failed-to-convert', e=e)
132 else:
133 return rpc.update_flow_table(self.device_id, grpc_req)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500134
David Bainbridge006dc842019-11-22 02:05:32 +0000135 elif self.role == ofp.OFPCR_ROLE_SLAVE:
136 self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500137
Manikkaraj kb1a10922019-07-29 12:10:34 -0400138
139 def handle_meter_mod_request(self, req):
140 log.info('Received handle_meter_mod_request', request=req)
David Bainbridge006dc842019-11-22 02:05:32 +0000141 rpc = self.agent.connection_manager.get_rpc_client()
142 if rpc is not None:
143 if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
144 try:
145 grpc_req = to_grpc(req)
146 except Exception, e:
147 log.exception('failed-to-convert-meter-mod-request', e=e)
148 else:
149 return rpc.update_meter_mod_table(self.device_id, grpc_req)
Manikkaraj kb1a10922019-07-29 12:10:34 -0400150
David Bainbridge006dc842019-11-22 02:05:32 +0000151 elif self.role == ofp.OFPCR_ROLE_SLAVE:
152 self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
Manikkaraj kb1a10922019-07-29 12:10:34 -0400153
154 @inlineCallbacks
155 def handle_meter_stats_request(self, req):
156 log.info('Received handle_meter_stats_request', request=req)
David Bainbridge006dc842019-11-22 02:05:32 +0000157 rpc = self.agent.connection_manager.get_rpc_client()
158 if rpc is not None:
159 try:
160 meters = yield rpc.list_meters(self.device_id)
161 self.cxn.send(ofp.message.meter_stats_reply(
162 xid=req.xid, entries=[to_loxi(m.stats) for m in meters]))
163 except Exception, e:
164 log.exception("failed-meter-stats-request", req=req, e=e)
Manikkaraj kb1a10922019-07-29 12:10:34 -0400165
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500166 def handle_get_async_request(self, req):
167 raise NotImplementedError()
168
169 def handle_get_config_request(self, req):
170 self.cxn.send(ofp.message.get_config_reply(
171 xid=req.xid,
172 miss_send_len=ofp.OFPCML_NO_BUFFER
173 ))
174
175 @inlineCallbacks
176 def handle_group_mod_request(self, req):
David Bainbridge006dc842019-11-22 02:05:32 +0000177 rpc = self.agent.connection_manager.get_rpc_client()
178 if rpc is not None:
179 if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
180 yield rpc.update_group_table(self.device_id, to_grpc(req))
181 elif self.role == ofp.OFPCR_ROLE_SLAVE:
182 self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500183
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500184 def handle_role_request(self, req):
185 if req.role == ofp.OFPCR_ROLE_MASTER or req.role == ofp.OFPCR_ROLE_SLAVE:
186 if self.agent.generation_is_defined and (
187 ((req.generation_id - self.agent.cached_generation_id) & 0xffffffffffffffff) if abs(
188 req.generation_id - self.agent.cached_generation_id) > 0x7fffffffffffffff else (
189 req.generation_id - self.agent.cached_generation_id)) < 0:
190 self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPRRFC_STALE))
191 else:
192 self.agent.generation_is_defined = True
193 self.agent.cached_generation_id = req.generation_id
194 self.role = req.role
195 self.cxn.send(ofp.message.role_reply(
196 xid=req.xid, role=req.role, generation_id=req.generation_id))
197 elif req.role == ofp.OFPCR_ROLE_EQUAL:
198 self.role = req.role
199 self.cxn.send(ofp.message.role_reply(
200 xid=req.xid, role=req.role))
201
202 def handle_packet_out_request(self, req):
David Bainbridge006dc842019-11-22 02:05:32 +0000203 rpc = self.agent.connection_manager.get_rpc_client()
204 if rpc is not None:
205 if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
206 rpc.send_packet_out(self.device_id, to_grpc(req))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500207
David Bainbridge006dc842019-11-22 02:05:32 +0000208 elif self.role == ofp.OFPCR_ROLE_SLAVE:
209 self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500210
211 def handle_set_config_request(self, req):
212 # Handle set config appropriately
213 # https://jira.opencord.org/browse/CORD-826
214 pass
215
216 @inlineCallbacks
217 def handle_port_mod_request(self, req):
David Bainbridge006dc842019-11-22 02:05:32 +0000218 rpc = self.agent.connection_manager.get_rpc_client()
219 if rpc is not None:
220 if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
221 port = yield rpc.get_port(self.device_id, str(req.port_no))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500222
David Bainbridge006dc842019-11-22 02:05:32 +0000223 if port.ofp_port.config & ofp.OFPPC_PORT_DOWN != \
224 req.config & ofp.OFPPC_PORT_DOWN:
225 if req.config & ofp.OFPPC_PORT_DOWN:
226 rpc.disable_port(self.device_id, port.id)
227 else:
228 rpc.enable_port(self.device_id, port.id)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500229
David Bainbridge006dc842019-11-22 02:05:32 +0000230 elif self.role == ofp.OFPCR_ROLE_SLAVE:
231 self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500232
233 def handle_table_mod_request(self, req):
234 raise NotImplementedError()
235
236 def handle_queue_get_config_request(self, req):
237 raise NotImplementedError()
238
239 def handle_set_async_request(self, req):
240 raise NotImplementedError()
241
242 def handle_aggregate_request(self, req):
243 raise NotImplementedError
244
245 @inlineCallbacks
246 def handle_device_description_request(self, req):
David Bainbridge006dc842019-11-22 02:05:32 +0000247 rpc = self.agent.connection_manager.get_rpc_client()
248 if rpc is not None:
249 device_info = yield rpc.get_device_info(self.device_id)
250 kw = pb2dict(device_info.desc)
251 self.cxn.send(ofp.message.desc_stats_reply(xid=req.xid, **kw))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500252
253 def handle_experimenter_stats_request(self, req):
254 raise NotImplementedError()
255
256 @inlineCallbacks
257 def handle_flow_stats_request(self, req):
David Bainbridge006dc842019-11-22 02:05:32 +0000258 rpc = self.agent.connection_manager.get_rpc_client()
259 if rpc is not None:
260 try:
261 flow_stats = yield rpc.list_flows(self.device_id)
262 self.cxn.send(ofp.message.flow_stats_reply(
263 xid=req.xid, entries=[to_loxi(f) for f in flow_stats]))
264 except Exception, e:
265 log.exception('failed-flow-stats-request', req=req)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500266
267 @inlineCallbacks
268 def handle_group_stats_request(self, req):
David Bainbridge006dc842019-11-22 02:05:32 +0000269 rpc = self.agent.connection_manager.get_rpc_client()
270 if rpc is not None:
271 group_stats = yield rpc.list_groups(self.device_id)
272 self.cxn.send(ofp.message.group_stats_reply(
273 xid=req.xid, entries=[to_loxi(g.stats) for g in group_stats]))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500274
275 @inlineCallbacks
276 def handle_group_descriptor_request(self, req):
David Bainbridge006dc842019-11-22 02:05:32 +0000277 rpc = self.agent.connection_manager.get_rpc_client()
278 if rpc is not None:
279 group_stats = yield rpc.list_groups(self.device_id)
280 self.cxn.send(ofp.message.group_desc_stats_reply(
281 xid=req.xid, entries=[to_loxi(g.desc) for g in group_stats]))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500282
283 def handle_group_features_request(self, req):
284 raise NotImplementedError()
285
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500286 def handle_meter_config_request(self, req):
287 raise NotImplementedError()
288
289 def handle_meter_features_request(self, req):
Manikkaraj kb1a10922019-07-29 12:10:34 -0400290 feature = ofp.meter_features(max_meter=OpenFlowProtocolHandler.MAX_METER_IDS,
291 band_types=ofp.OFPMBT_DROP,
292 capabilities=ofp.OFPMF_KBPS,
293 max_bands=OpenFlowProtocolHandler.MAX_METER_BANDS,
294 max_color=OpenFlowProtocolHandler.MAX_METER_COLORS)
295 self.cxn.send(ofp.message.meter_features_stats_reply(xid=req.xid, flags=None,
296 features=feature))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500297
298 @inlineCallbacks
299 def handle_port_stats_request(self, req):
David Bainbridge006dc842019-11-22 02:05:32 +0000300 rpc = self.agent.connection_manager.get_rpc_client()
301 if rpc is not None:
302 try:
303 ports = yield rpc.list_ports(self.device_id)
304 port_stats = [to_loxi(p.ofp_port_stats) for p in ports]
305 of_message = ofp.message.port_stats_reply(
306 xid=req.xid,entries=port_stats)
307 self.cxn.send(of_message)
308 except:
309 log.exception('failed-port_stats-request', req=req)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500310
311 @inlineCallbacks
312 def handle_port_desc_request(self, req):
David Bainbridge006dc842019-11-22 02:05:32 +0000313 rpc = self.agent.connection_manager.get_rpc_client()
314 if rpc is not None:
315 port_list = yield rpc.get_port_list(self.device_id)
316 try:
317 self.cxn.send(ofp.message.port_desc_stats_reply(
318 xid=req.xid,
319 #flags=None,
320 entries=[to_loxi(port.ofp_port) for port in port_list]
321 ))
322 except Exception as err:
323 log.exception('failed-port-desc-reply', err=err)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500324
325 def handle_queue_stats_request(self, req):
326 raise NotImplementedError()
327
328 def handle_table_stats_request(self, req):
329 table_stats = [] # see https://jira.opencord.org/browse/CORD-825
330 self.cxn.send(ofp.message.table_stats_reply(
331 xid=req.xid, entries=table_stats))
332
333 def handle_table_features_request(self, req):
334 raise NotImplementedError()
335
336 stats_handlers = {
337 ofp.OFPST_AGGREGATE: handle_aggregate_request,
338 ofp.OFPST_DESC: handle_device_description_request,
339 ofp.OFPST_EXPERIMENTER: handle_experimenter_stats_request,
340 ofp.OFPST_FLOW: handle_flow_stats_request,
341 ofp.OFPST_GROUP: handle_group_stats_request,
342 ofp.OFPST_GROUP_DESC: handle_group_descriptor_request,
343 ofp.OFPST_GROUP_FEATURES: handle_group_features_request,
344 ofp.OFPST_METER: handle_meter_stats_request,
345 ofp.OFPST_METER_CONFIG: handle_meter_config_request,
346 ofp.OFPST_METER_FEATURES: handle_meter_features_request,
347 ofp.OFPST_PORT: handle_port_stats_request,
348 ofp.OFPST_PORT_DESC: handle_port_desc_request,
349 ofp.OFPST_QUEUE: handle_queue_stats_request,
350 ofp.OFPST_TABLE: handle_table_stats_request,
351 ofp.OFPST_TABLE_FEATURES: handle_table_features_request
352 }
353
354 main_handlers = {
355 ofp.OFPT_BARRIER_REQUEST: handle_barrier_request,
356 ofp.OFPT_ECHO_REQUEST: handle_echo_request,
357 ofp.OFPT_FEATURES_REQUEST: handle_feature_request,
358 ofp.OFPT_EXPERIMENTER: handle_experimenter_request,
359 ofp.OFPT_FLOW_MOD: handle_flow_mod_request,
360 ofp.OFPT_GET_ASYNC_REQUEST: handle_get_async_request,
361 ofp.OFPT_GET_CONFIG_REQUEST: handle_get_config_request,
362 ofp.OFPT_GROUP_MOD: handle_group_mod_request,
363 ofp.OFPT_METER_MOD: handle_meter_mod_request,
364 ofp.OFPT_PACKET_OUT: handle_packet_out_request,
365 ofp.OFPT_PORT_MOD: handle_port_mod_request,
366 ofp.OFPT_QUEUE_GET_CONFIG_REQUEST: handle_queue_get_config_request,
367 ofp.OFPT_ROLE_REQUEST: handle_role_request,
368 ofp.OFPT_SET_ASYNC: handle_set_async_request,
369 ofp.OFPT_SET_CONFIG: handle_set_config_request,
370 ofp.OFPT_STATS_REQUEST: handle_stats_request,
371 ofp.OFPT_TABLE_MOD: handle_table_mod_request,
372 }
373
374 def forward_packet_in(self, ofp_packet_in):
375 if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
Matteo Scandolo360605d2019-11-05 18:29:17 -0800376 log.info('sending-packet-in', ofp_packet_in=ofp_packet_in, packet=hexlify(ofp_packet_in.data))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500377 self.cxn.send(to_loxi(ofp_packet_in))
378
379 def forward_port_status(self, ofp_port_status):
380 self.cxn.send(to_loxi(ofp_port_status))