blob: a4b3f706888134520a66055a40e21bfffe497385 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16import structlog
17from twisted.internet.defer import inlineCallbacks, returnValue
18
19import loxi.of13 as ofp
20from converter import to_loxi, pb2dict, to_grpc
Matteo Scandolo360605d2019-11-05 18:29:17 -080021from binascii import hexlify
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050022
# Module-level structlog logger used by every handler in this file.
log = structlog.get_logger()
24
25
class OpenFlowProtocolError(Exception):
    """Signals an OpenFlow request that the protocol handler cannot process.

    Raised, for example, when a stats request carries a stats type for
    which no handler is registered.
    """
27
28
class OpenFlowProtocolHandler(object):
    """
    Upper half of the OpenFlow protocol: receives decoded OpenFlow
    messages from the connection, dispatches them to per-message handlers
    and answers either directly or after calling the RPC stub.

    Dispatch is table driven: ``main_handlers`` maps the OpenFlow message
    type to a handler, and ``stats_handlers`` maps the stats (multipart)
    type to a handler. Both tables hold plain functions, so they are
    invoked as ``handler(self, req)``.
    """

    ofp_version = [4]  # OFAgent supported versions

    MAX_METER_IDS = 4294967295
    MAX_METER_BANDS = 255
    MAX_METER_COLORS = 255

    def __init__(self, datapath_id, device_id, agent, cxn, rpc):
        """
        The upper half of the OpenFlow protocol, focusing on message
        exchanges.
        :param datapath_id: Datapath id reported in the features reply.
        :param device_id: Device id passed to every RPC call.
        :param agent: Reference to the Agent() instance, can be used to
          indicate critical errors to break the connection. It also holds
          the generation id state used by role requests.
        :param cxn: The lower level message serdes part of the OF protocol.
        :param rpc: The application level stub on which RPC calls
          are made as result of processing incoming OpenFlow request messages.
        """
        self.datapath_id = datapath_id
        self.device_id = device_id
        self.agent = agent
        self.cxn = cxn
        self.rpc = rpc
        # Controller role for this connection; None until a role request
        # has been accepted.
        self.role = None

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~ private helpers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def _has_write_access(self):
        """Return True when the current role is MASTER or EQUAL."""
        return self.role in (ofp.OFPCR_ROLE_MASTER, ofp.OFPCR_ROLE_EQUAL)

    def _send_is_slave_error(self, req):
        """Reject `req` with OFPBRC_IS_SLAVE, echoing the request xid."""
        self.cxn.send(ofp.message.bad_request_error_msg(
            xid=req.xid, code=ofp.OFPBRC_IS_SLAVE))

    @staticmethod
    def _generation_distance(generation_id, cached_generation_id):
        """
        Wrapping sequence-number distance between two 64-bit generation
        ids, i.e. ``(int64_t)(generation_id - cached_generation_id)``.
        :return: a value in [-2**63, 2**63 - 1]; negative means that
          `generation_id` is older than `cached_generation_id`.
        """
        diff = (generation_id - cached_generation_id) & 0xffffffffffffffff
        if diff > 0x7fffffffffffffff:
            # sign bit set: reinterpret the unsigned value as signed 64-bit
            diff -= 0x10000000000000000
        return diff

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ life cycle ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @inlineCallbacks
    def start(self):
        """
        A new call is made after a fresh reconnect.

        Exchanges hello messages with the peer and, when the peer supports
        one of `ofp_version`, keeps receiving messages and dispatching them
        through `main_handlers`. Any exception raised inside is logged and
        ends the loop.
        :return: (via the Deferred) this handler instance.
        """

        log.debug('starting')

        try:
            support = False
            # send initial hello message
            self.cxn.send(ofp.message.hello(
                elements=[ofp.common.hello_elem_versionbitmap(
                    bitmaps=[ofp.common.hello_elem_bitmap(
                        self.ofp_version)])]))
            # expect to receive a hello message
            msg = yield self.cxn.recv_class(ofp.message.hello)
            # supports only ofp_versions till 31 and single bitmap.
            if msg:
                support = ofp.util.verify_version_support(
                    msg, self.ofp_version)
                if not support:
                    self.cxn.send(ofp.message.hello_failed_error_msg(
                        xid=msg.xid, code=ofp.OFPHFC_INCOMPATIBLE,
                        data='i support only 1.3'))
                    # the version goes in as a keyword: the event string
                    # has no %-placeholder for a positional argument
                    log.error('peer-do-not-support-OpenFlow-version',
                              ofp_version=self.ofp_version)

            while support:
                req = yield self.cxn.recv_any()
                handler = self.main_handlers.get(req.type, None)
                if handler:
                    # NOTE(review): an exception raised synchronously by a
                    # handler (e.g. NotImplementedError) propagates to the
                    # except below and terminates this loop - confirm that
                    # this is the intended behaviour.
                    handler(self, req)
                else:
                    log.error('cannot-handle',
                              request=req, xid=req.xid, type=req.type)

        except Exception as e:
            log.exception('exception', e=e)

        log.info('started')
        returnValue(self)

    def stop(self):
        """Log the stop; there is nothing to tear down yet."""
        log.debug('stopping')
        pass  # nothing to do yet
        log.info('stopped')

    # ~~~~~~~~~~~~~~~~~~~~~~~~~ main message handlers ~~~~~~~~~~~~~~~~~~~~~~~~

    def handle_echo_request(self, req):
        """Answer an echo request with an echo reply carrying the same xid."""
        self.cxn.send(ofp.message.echo_reply(xid=req.xid))

    @inlineCallbacks
    def handle_feature_request(self, req):
        """Reply with the switch features obtained from the RPC stub."""
        device_info = yield self.rpc.get_device_info(self.device_id)
        kw = pb2dict(device_info.switch_features)
        self.cxn.send(ofp.message.features_reply(
            xid=req.xid,
            datapath_id=self.datapath_id,
            **kw))

    def handle_stats_request(self, req):
        """
        Dispatch a stats request through `stats_handlers`.
        :raises OpenFlowProtocolError: when no handler is registered for
          `req.stats_type`.
        """
        handler = self.stats_handlers.get(req.stats_type, None)
        if handler:
            handler(self, req)
        else:
            raise OpenFlowProtocolError(
                'Cannot handle stats request type "{}"'.format(req.stats_type))

    def handle_barrier_request(self, req):
        """Acknowledge a barrier request."""
        # not really doing barrier yet, but we respond
        # see https://jira.opencord.org/browse/CORD-823
        self.cxn.send(ofp.message.barrier_reply(xid=req.xid))

    def handle_experimenter_request(self, req):
        """Not supported."""
        raise NotImplementedError()

    def handle_flow_mod_request(self, req):
        """
        Forward a flow mod to the RPC stub when the role is MASTER or
        EQUAL; reject it with OFPBRC_IS_SLAVE when the role is SLAVE.
        :return: whatever `rpc.update_flow_table` returns, or None when the
          request was not forwarded.
        """
        if self._has_write_access():
            try:
                grpc_req = to_grpc(req)
            except Exception as e:
                log.exception('failed-to-convert', e=e)
            else:
                return self.rpc.update_flow_table(self.device_id, grpc_req)

        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._send_is_slave_error(req)

    def handle_meter_mod_request(self, req):
        """
        Forward a meter mod to the RPC stub when the role is MASTER or
        EQUAL; reject it with OFPBRC_IS_SLAVE when the role is SLAVE.
        :return: whatever `rpc.update_meter_mod_table` returns, or None
          when the request was not forwarded.
        """
        log.info('Received handle_meter_mod_request', request=req)
        if self._has_write_access():
            try:
                grpc_req = to_grpc(req)
            except Exception as e:
                log.exception('failed-to-convert-meter-mod-request', e=e)
            else:
                return self.rpc.update_meter_mod_table(self.device_id, grpc_req)

        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._send_is_slave_error(req)

    @inlineCallbacks
    def handle_meter_stats_request(self, req):
        """Reply with the stats of every meter listed by the RPC stub."""
        log.info('Received handle_meter_stats_request', request=req)
        try:
            meters = yield self.rpc.list_meters(self.device_id)
            self.cxn.send(ofp.message.meter_stats_reply(
                xid=req.xid, entries=[to_loxi(m.stats) for m in meters]))
        except Exception as e:
            log.exception("failed-meter-stats-request", req=req, e=e)

    def handle_get_async_request(self, req):
        """Not supported."""
        raise NotImplementedError()

    def handle_get_config_request(self, req):
        """Reply with a config whose miss_send_len is OFPCML_NO_BUFFER."""
        self.cxn.send(ofp.message.get_config_reply(
            xid=req.xid,
            miss_send_len=ofp.OFPCML_NO_BUFFER
        ))

    @inlineCallbacks
    def handle_group_mod_request(self, req):
        """
        Forward a group mod to the RPC stub when the role is MASTER or
        EQUAL; reject it with OFPBRC_IS_SLAVE when the role is SLAVE.
        """
        if self._has_write_access():
            yield self.rpc.update_group_table(self.device_id, to_grpc(req))
        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._send_is_slave_error(req)

    def handle_role_request(self, req):
        """
        Process a controller role request.

        MASTER / SLAVE requests are checked against the generation id
        cached on the agent: a request whose generation id is older
        (negative signed 64-bit distance) is refused with OFPRRFC_STALE;
        otherwise the generation id is cached, the role is adopted and a
        role reply is sent. EQUAL requests are accepted without a
        generation id check. Other roles are ignored.
        """
        if req.role in (ofp.OFPCR_ROLE_MASTER, ofp.OFPCR_ROLE_SLAVE):
            stale = self.agent.generation_is_defined and \
                self._generation_distance(
                    req.generation_id, self.agent.cached_generation_id) < 0
            if stale:
                # OFPRRFC_STALE is a role-request-failed code, so it has to
                # travel in the matching error message type.
                self.cxn.send(ofp.message.role_request_failed_error_msg(
                    xid=req.xid, code=ofp.OFPRRFC_STALE))
            else:
                self.agent.generation_is_defined = True
                self.agent.cached_generation_id = req.generation_id
                self.role = req.role
                self.cxn.send(ofp.message.role_reply(
                    xid=req.xid, role=req.role,
                    generation_id=req.generation_id))
        elif req.role == ofp.OFPCR_ROLE_EQUAL:
            self.role = req.role
            self.cxn.send(ofp.message.role_reply(
                xid=req.xid, role=req.role))

    def handle_packet_out_request(self, req):
        """
        Forward a packet out to the RPC stub when the role is MASTER or
        EQUAL; reject it with OFPBRC_IS_SLAVE when the role is SLAVE.
        """
        if self._has_write_access():
            self.rpc.send_packet_out(self.device_id, to_grpc(req))

        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._send_is_slave_error(req)

    def handle_set_config_request(self, req):
        """Accepted but currently ignored."""
        # Handle set config appropriately
        # https://jira.opencord.org/browse/CORD-826
        pass

    @inlineCallbacks
    def handle_port_mod_request(self, req):
        """
        Enable or disable a port when the OFPPC_PORT_DOWN bit requested in
        `req.config` differs from the port's current config. Only done for
        the MASTER and EQUAL roles; SLAVE gets OFPBRC_IS_SLAVE.
        """
        if self._has_write_access():
            port = yield self.rpc.get_port(self.device_id, str(req.port_no))

            current_down = port.ofp_port.config & ofp.OFPPC_PORT_DOWN
            wanted_down = req.config & ofp.OFPPC_PORT_DOWN
            if current_down != wanted_down:
                if wanted_down:
                    self.rpc.disable_port(self.device_id, port.id)
                else:
                    self.rpc.enable_port(self.device_id, port.id)

        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._send_is_slave_error(req)

    def handle_table_mod_request(self, req):
        """Not supported."""
        raise NotImplementedError()

    def handle_queue_get_config_request(self, req):
        """Not supported."""
        raise NotImplementedError()

    def handle_set_async_request(self, req):
        """Not supported."""
        raise NotImplementedError()

    # ~~~~~~~~~~~~~~~~~~~~~~~~ stats message handlers ~~~~~~~~~~~~~~~~~~~~~~~~

    def handle_aggregate_request(self, req):
        """Not supported."""
        raise NotImplementedError()

    @inlineCallbacks
    def handle_device_description_request(self, req):
        """Reply with the device description obtained from the RPC stub."""
        device_info = yield self.rpc.get_device_info(self.device_id)
        kw = pb2dict(device_info.desc)
        self.cxn.send(ofp.message.desc_stats_reply(xid=req.xid, **kw))

    def handle_experimenter_stats_request(self, req):
        """Not supported."""
        raise NotImplementedError()

    @inlineCallbacks
    def handle_flow_stats_request(self, req):
        """Reply with every flow listed by the RPC stub; failures are logged."""
        try:
            flow_stats = yield self.rpc.list_flows(self.device_id)
            self.cxn.send(ofp.message.flow_stats_reply(
                xid=req.xid, entries=[to_loxi(f) for f in flow_stats]))
        except Exception as e:
            log.exception('failed-flow-stats-request', req=req, e=e)

    @inlineCallbacks
    def handle_group_stats_request(self, req):
        """Reply with the stats of every group listed by the RPC stub."""
        group_stats = yield self.rpc.list_groups(self.device_id)
        self.cxn.send(ofp.message.group_stats_reply(
            xid=req.xid, entries=[to_loxi(g.stats) for g in group_stats]))

    @inlineCallbacks
    def handle_group_descriptor_request(self, req):
        """Reply with the description of every group listed by the RPC stub."""
        group_stats = yield self.rpc.list_groups(self.device_id)
        self.cxn.send(ofp.message.group_desc_stats_reply(
            xid=req.xid, entries=[to_loxi(g.desc) for g in group_stats]))

    def handle_group_features_request(self, req):
        """Not supported."""
        raise NotImplementedError()

    def handle_meter_config_request(self, req):
        """Not supported."""
        raise NotImplementedError()

    def handle_meter_features_request(self, req):
        """
        Reply with fixed meter features built from the MAX_METER_*
        class constants, the DROP band type and the KBPS capability.
        """
        feature = ofp.meter_features(
            max_meter=OpenFlowProtocolHandler.MAX_METER_IDS,
            band_types=ofp.OFPMBT_DROP,
            capabilities=ofp.OFPMF_KBPS,
            max_bands=OpenFlowProtocolHandler.MAX_METER_BANDS,
            max_color=OpenFlowProtocolHandler.MAX_METER_COLORS)
        self.cxn.send(ofp.message.meter_features_stats_reply(
            xid=req.xid, flags=None, features=feature))

    @inlineCallbacks
    def handle_port_stats_request(self, req):
        """Reply with the stats of every port listed by the RPC stub."""
        try:
            ports = yield self.rpc.list_ports(self.device_id)
            port_stats = [to_loxi(p.ofp_port_stats) for p in ports]
            of_message = ofp.message.port_stats_reply(
                xid=req.xid, entries=port_stats)
            self.cxn.send(of_message)
        except Exception as e:
            log.exception('failed-port_stats-request', req=req, e=e)

    @inlineCallbacks
    def handle_port_desc_request(self, req):
        """Reply with the description of every port in the RPC port list."""
        try:
            # the RPC call sits inside the try so that a failing lookup is
            # logged the same way as a failing reply
            port_list = yield self.rpc.get_port_list(self.device_id)
            self.cxn.send(ofp.message.port_desc_stats_reply(
                xid=req.xid,
                entries=[to_loxi(port.ofp_port) for port in port_list]
            ))
        except Exception as err:
            log.exception('failed-port-desc-reply', err=err)

    def handle_queue_stats_request(self, req):
        """Not supported."""
        raise NotImplementedError()

    def handle_table_stats_request(self, req):
        """Reply with an empty table stats list."""
        table_stats = []  # see https://jira.opencord.org/browse/CORD-825
        self.cxn.send(ofp.message.table_stats_reply(
            xid=req.xid, entries=table_stats))

    def handle_table_features_request(self, req):
        """Not supported."""
        raise NotImplementedError()

    # Stats (multipart) type -> handler; used by handle_stats_request.
    stats_handlers = {
        ofp.OFPST_AGGREGATE: handle_aggregate_request,
        ofp.OFPST_DESC: handle_device_description_request,
        ofp.OFPST_EXPERIMENTER: handle_experimenter_stats_request,
        ofp.OFPST_FLOW: handle_flow_stats_request,
        ofp.OFPST_GROUP: handle_group_stats_request,
        ofp.OFPST_GROUP_DESC: handle_group_descriptor_request,
        ofp.OFPST_GROUP_FEATURES: handle_group_features_request,
        ofp.OFPST_METER: handle_meter_stats_request,
        ofp.OFPST_METER_CONFIG: handle_meter_config_request,
        ofp.OFPST_METER_FEATURES: handle_meter_features_request,
        ofp.OFPST_PORT: handle_port_stats_request,
        ofp.OFPST_PORT_DESC: handle_port_desc_request,
        ofp.OFPST_QUEUE: handle_queue_stats_request,
        ofp.OFPST_TABLE: handle_table_stats_request,
        ofp.OFPST_TABLE_FEATURES: handle_table_features_request
    }

    # OpenFlow message type -> handler; used by the receive loop in start().
    main_handlers = {
        ofp.OFPT_BARRIER_REQUEST: handle_barrier_request,
        ofp.OFPT_ECHO_REQUEST: handle_echo_request,
        ofp.OFPT_FEATURES_REQUEST: handle_feature_request,
        ofp.OFPT_EXPERIMENTER: handle_experimenter_request,
        ofp.OFPT_FLOW_MOD: handle_flow_mod_request,
        ofp.OFPT_GET_ASYNC_REQUEST: handle_get_async_request,
        ofp.OFPT_GET_CONFIG_REQUEST: handle_get_config_request,
        ofp.OFPT_GROUP_MOD: handle_group_mod_request,
        ofp.OFPT_METER_MOD: handle_meter_mod_request,
        ofp.OFPT_PACKET_OUT: handle_packet_out_request,
        ofp.OFPT_PORT_MOD: handle_port_mod_request,
        ofp.OFPT_QUEUE_GET_CONFIG_REQUEST: handle_queue_get_config_request,
        ofp.OFPT_ROLE_REQUEST: handle_role_request,
        ofp.OFPT_SET_ASYNC: handle_set_async_request,
        ofp.OFPT_SET_CONFIG: handle_set_config_request,
        ofp.OFPT_STATS_REQUEST: handle_stats_request,
        ofp.OFPT_TABLE_MOD: handle_table_mod_request,
    }

    # ~~~~~~~~~~~~~~~~~~~~~~~ device -> controller path ~~~~~~~~~~~~~~~~~~~~~~

    def forward_packet_in(self, ofp_packet_in):
        """Send a packet-in to the peer, only in the MASTER or EQUAL role."""
        if self._has_write_access():
            log.info('sending-packet-in', ofp_packet_in=ofp_packet_in,
                     packet=hexlify(ofp_packet_in.data))
            self.cxn.send(to_loxi(ofp_packet_in))

    def forward_port_status(self, ofp_port_status):
        """Send a port status message to the peer, whatever the role."""
        self.cxn.send(to_loxi(ofp_port_status))