blob: e90ca5b1fdda9fdaf8fcfd8db4d987b28a285156 [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
17from twisted.internet.defer import inlineCallbacks, returnValue
18
19import loxi.of13 as ofp
20from converter import to_loxi, pb2dict, to_grpc
21
22log = structlog.get_logger()
23
24
class OpenFlowProtocolError(Exception):
    """
    Error raised by the protocol handler for an OpenFlow request it cannot
    process (for example a stats request of an unsupported type).
    """
26
27
class OpenFlowProtocolHandler(object):
    """
    Upper half of the OpenFlow protocol: receives requests from the
    connection, dispatches them to the per-message handlers below and
    translates them into calls on the RPC stub.
    """

    ofp_version = [4]  # OFAgent supported versions

    MAX_METER_IDS = 4294967295
    MAX_METER_BANDS = 255
    MAX_METER_COLORS = 255

    def __init__(self, datapath_id, device_id, agent, cxn, rpc):
        """
        The upper half of the OpenFlow protocol, focusing on message
        exchanges.
        :param datapath_id: Datapath id reported in the features reply.
        :param device_id: Device identifier passed to every RPC call.
        :param agent: Reference to the Agent() instance, can be used to
          indicate critical errors to break the connection.
        :param cxn: The lower level message serdes part of the OF protocol.
        :param rpc: The application level stub on which RPC calls
          are made as result of processing incoming OpenFlow request messages.
        """
        self.datapath_id = datapath_id
        self.device_id = device_id
        self.agent = agent
        self.cxn = cxn
        self.rpc = rpc
        self.role = None  # controller role; set by handle_role_request

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ private helpers ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def _controller_may_modify(self):
        """Return True when the current role is MASTER or EQUAL."""
        return self.role in (ofp.OFPCR_ROLE_MASTER, ofp.OFPCR_ROLE_EQUAL)

    def _reject_slave_request(self, req):
        """Answer `req` with an OFPBRC_IS_SLAVE bad-request error."""
        # Echo the request xid so the controller can correlate the error.
        self.cxn.send(ofp.message.bad_request_error_msg(
            xid=req.xid, code=ofp.OFPBRC_IS_SLAVE))

    @staticmethod
    def _generation_distance(generation_id, cached_generation_id):
        """
        Return (generation_id - cached_generation_id) interpreted as a
        signed 64-bit integer, i.e. with wrap-around of the 64-bit counter.
        A negative result means generation_id is older than the cached one.
        """
        distance = (generation_id - cached_generation_id) & 0xffffffffffffffff
        if distance > 0x7fffffffffffffff:
            distance -= 0x10000000000000000
        return distance

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ life cycle ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @inlineCallbacks
    def start(self):
        """
        A new call is made after a fresh reconnect.

        Exchanges hello messages with the peer and, if a common version is
        supported, keeps receiving requests and dispatching them through
        `main_handlers`. Returns (via returnValue) this handler instance.
        """

        log.debug('starting')

        try:
            support = False
            # send initial hello message
            self.cxn.send(ofp.message.hello(
                elements=[ofp.common.hello_elem_versionbitmap(
                    bitmaps=[ofp.common.hello_elem_bitmap(
                        self.ofp_version)])]))
            # expect to receive a hello message
            msg = yield self.cxn.recv_class(ofp.message.hello)
            # supports only ofp_versions till 31 and single bitmap.
            if msg:
                support = ofp.util.verify_version_support(
                    msg, self.ofp_version)
            if not support:
                # msg may be missing here, so do not dereference it blindly
                self.cxn.send(ofp.message.hello_failed_error_msg(
                    xid=getattr(msg, 'xid', None),
                    code=ofp.OFPHFC_INCOMPATIBLE,
                    data='i support only 1.3'))
                log.error('peer-do-not-support-OpenFlow-version',
                          supported_versions=self.ofp_version)

            while support:
                req = yield self.cxn.recv_any()
                handler = self.main_handlers.get(req.type, None)
                if handler:
                    # A handler failing synchronously (several raise
                    # NotImplementedError) must not end the receive loop.
                    try:
                        handler(self, req)
                    except Exception as e:
                        log.exception('failed-to-handle-request',
                                      xid=req.xid, type=req.type, e=e)
                else:
                    log.error('cannot-handle',
                              request=req, xid=req.xid, type=req.type)

        except Exception as e:
            log.exception('exception', e=e)

        log.info('started')
        returnValue(self)

    def stop(self):
        """Stop the handler; currently only logs."""
        log.debug('stopping')
        # nothing to do yet
        log.info('stopped')

    # ~~~~~~~~~~~~~~~~~~~~~~~~~ main message handlers ~~~~~~~~~~~~~~~~~~~~~~~~

    def handle_echo_request(self, req):
        """Reply to an echo request with an echo reply of the same xid."""
        self.cxn.send(ofp.message.echo_reply(xid=req.xid))

    @inlineCallbacks
    def handle_feature_request(self, req):
        """Reply with the switch features fetched over RPC for the device."""
        device_info = yield self.rpc.get_device_info(self.device_id)
        kw = pb2dict(device_info.switch_features)
        self.cxn.send(ofp.message.features_reply(
            xid=req.xid,
            datapath_id=self.datapath_id,
            **kw))

    def handle_stats_request(self, req):
        """
        Dispatch a stats request through `stats_handlers`.
        :raises OpenFlowProtocolError: if the stats type has no handler.
        """
        handler = self.stats_handlers.get(req.stats_type, None)
        if handler:
            handler(self, req)
        else:
            raise OpenFlowProtocolError(
                'Cannot handle stats request type "{}"'.format(req.stats_type))

    def handle_barrier_request(self, req):
        """Acknowledge a barrier request."""
        # not really doing barrier yet, but we respond
        # see https://jira.opencord.org/browse/CORD-823
        self.cxn.send(ofp.message.barrier_reply(xid=req.xid))

    def handle_experimenter_request(self, req):
        """Not implemented."""
        raise NotImplementedError()

    def handle_flow_mod_request(self, req):
        """
        Forward a flow-mod to the RPC stub when the role is MASTER or EQUAL;
        reject it with OFPBRC_IS_SLAVE when the role is SLAVE.
        :return: the result of rpc.update_flow_table() when forwarded,
                 otherwise None.
        """
        if self._controller_may_modify():
            try:
                grpc_req = to_grpc(req)
            except Exception as e:
                log.exception('failed-to-convert', e=e)
            else:
                return self.rpc.update_flow_table(self.device_id, grpc_req)

        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._reject_slave_request(req)

    def handle_meter_mod_request(self, req):
        """
        Forward a meter-mod to the RPC stub when the role is MASTER or EQUAL;
        reject it with OFPBRC_IS_SLAVE when the role is SLAVE.
        :return: the result of rpc.update_meter_mod_table() when forwarded,
                 otherwise None.
        """
        log.info('Received handle_meter_mod_request', request=req)
        if self._controller_may_modify():
            try:
                grpc_req = to_grpc(req)
            except Exception as e:
                log.exception('failed-to-convert-meter-mod-request', e=e)
            else:
                return self.rpc.update_meter_mod_table(
                    self.device_id, grpc_req)

        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._reject_slave_request(req)

    @inlineCallbacks
    def handle_meter_stats_request(self, req):
        """Reply with the stats of every meter listed over RPC."""
        log.info('Received handle_meter_stats_request', request=req)
        try:
            meters = yield self.rpc.list_meters(self.device_id)
            self.cxn.send(ofp.message.meter_stats_reply(
                xid=req.xid, entries=[to_loxi(m.stats) for m in meters]))
        except Exception as e:
            log.exception("failed-meter-stats-request", req=req, e=e)

    def handle_get_async_request(self, req):
        """Not implemented."""
        raise NotImplementedError()

    def handle_get_config_request(self, req):
        """Reply with a config whose miss_send_len is OFPCML_NO_BUFFER."""
        self.cxn.send(ofp.message.get_config_reply(
            xid=req.xid,
            miss_send_len=ofp.OFPCML_NO_BUFFER
        ))

    @inlineCallbacks
    def handle_group_mod_request(self, req):
        """
        Forward a group-mod to the RPC stub when the role is MASTER or EQUAL;
        reject it with OFPBRC_IS_SLAVE when the role is SLAVE.
        """
        if self._controller_may_modify():
            yield self.rpc.update_group_table(self.device_id, to_grpc(req))
        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._reject_slave_request(req)

    def handle_role_request(self, req):
        """
        Apply a controller role request.

        MASTER/SLAVE requests are checked against the generation id cached
        on the agent: a request whose generation id is older (negative
        signed 64-bit distance) is refused with OFPRRFC_STALE; otherwise the
        generation id is cached, the role stored and a role reply sent.
        EQUAL requests just store the role and are acknowledged.
        Any other role value is ignored.
        """
        if req.role in (ofp.OFPCR_ROLE_MASTER, ofp.OFPCR_ROLE_SLAVE):
            stale = self.agent.generation_is_defined and \
                self._generation_distance(
                    req.generation_id,
                    self.agent.cached_generation_id) < 0
            if stale:
                # OFPRRFC_STALE is a role-request-failed code, so it must be
                # carried by the matching error message type.
                self.cxn.send(ofp.message.role_request_failed_error_msg(
                    xid=req.xid, code=ofp.OFPRRFC_STALE))
            else:
                self.agent.generation_is_defined = True
                self.agent.cached_generation_id = req.generation_id
                self.role = req.role
                self.cxn.send(ofp.message.role_reply(
                    xid=req.xid, role=req.role,
                    generation_id=req.generation_id))
        elif req.role == ofp.OFPCR_ROLE_EQUAL:
            self.role = req.role
            self.cxn.send(ofp.message.role_reply(
                xid=req.xid, role=req.role))

    def handle_packet_out_request(self, req):
        """
        Forward a packet-out to the RPC stub when the role is MASTER or
        EQUAL; reject it with OFPBRC_IS_SLAVE when the role is SLAVE.
        """
        if self._controller_may_modify():
            self.rpc.send_packet_out(self.device_id, to_grpc(req))

        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._reject_slave_request(req)

    def handle_set_config_request(self, req):
        """Accepted but ignored for now."""
        # Handle set config appropriately
        # https://jira.opencord.org/browse/CORD-826
        pass

    @inlineCallbacks
    def handle_port_mod_request(self, req):
        """
        Enable or disable a port over RPC when the requested
        OFPPC_PORT_DOWN bit differs from the port's current one (role MASTER
        or EQUAL); reject with OFPBRC_IS_SLAVE when the role is SLAVE.
        """
        if self._controller_may_modify():
            port = yield self.rpc.get_port(self.device_id, str(req.port_no))

            current_down = port.ofp_port.config & ofp.OFPPC_PORT_DOWN
            requested_down = req.config & ofp.OFPPC_PORT_DOWN
            if current_down != requested_down:
                if requested_down:
                    self.rpc.disable_port(self.device_id, port.id)
                else:
                    self.rpc.enable_port(self.device_id, port.id)

        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._reject_slave_request(req)

    def handle_table_mod_request(self, req):
        """Not implemented."""
        raise NotImplementedError()

    def handle_queue_get_config_request(self, req):
        """Not implemented."""
        raise NotImplementedError()

    def handle_set_async_request(self, req):
        """Not implemented."""
        raise NotImplementedError()

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ stats handlers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def handle_aggregate_request(self, req):
        """Not implemented."""
        raise NotImplementedError()

    @inlineCallbacks
    def handle_device_description_request(self, req):
        """Reply with the device description fetched over RPC."""
        device_info = yield self.rpc.get_device_info(self.device_id)
        kw = pb2dict(device_info.desc)
        self.cxn.send(ofp.message.desc_stats_reply(xid=req.xid, **kw))

    def handle_experimenter_stats_request(self, req):
        """Not implemented."""
        raise NotImplementedError()

    @inlineCallbacks
    def handle_flow_stats_request(self, req):
        """Reply with every flow listed over RPC for the device."""
        try:
            flow_stats = yield self.rpc.list_flows(self.device_id)
            self.cxn.send(ofp.message.flow_stats_reply(
                xid=req.xid, entries=[to_loxi(f) for f in flow_stats]))
        except Exception as e:
            log.exception('failed-flow-stats-request', req=req, e=e)

    @inlineCallbacks
    def handle_group_stats_request(self, req):
        """Reply with the stats of every group listed over RPC."""
        group_stats = yield self.rpc.list_groups(self.device_id)
        self.cxn.send(ofp.message.group_stats_reply(
            xid=req.xid, entries=[to_loxi(g.stats) for g in group_stats]))

    @inlineCallbacks
    def handle_group_descriptor_request(self, req):
        """Reply with the description of every group listed over RPC."""
        group_stats = yield self.rpc.list_groups(self.device_id)
        self.cxn.send(ofp.message.group_desc_stats_reply(
            xid=req.xid, entries=[to_loxi(g.desc) for g in group_stats]))

    def handle_group_features_request(self, req):
        """Not implemented."""
        raise NotImplementedError()

    def handle_meter_config_request(self, req):
        """Not implemented."""
        raise NotImplementedError()

    def handle_meter_features_request(self, req):
        """Reply with the fixed meter capabilities declared on this class."""
        feature = ofp.meter_features(
            max_meter=OpenFlowProtocolHandler.MAX_METER_IDS,
            band_types=ofp.OFPMBT_DROP,
            capabilities=ofp.OFPMF_KBPS,
            max_bands=OpenFlowProtocolHandler.MAX_METER_BANDS,
            max_color=OpenFlowProtocolHandler.MAX_METER_COLORS)
        self.cxn.send(ofp.message.meter_features_stats_reply(
            xid=req.xid, flags=None, features=feature))

    @inlineCallbacks
    def handle_port_stats_request(self, req):
        """Reply with the stats of every port listed over RPC."""
        try:
            ports = yield self.rpc.list_ports(self.device_id)
            port_stats = [to_loxi(p.ofp_port_stats) for p in ports]
            of_message = ofp.message.port_stats_reply(
                xid=req.xid, entries=port_stats)
            self.cxn.send(of_message)
        except Exception as e:
            log.exception('failed-port_stats-request', req=req, e=e)

    @inlineCallbacks
    def handle_port_desc_request(self, req):
        """Reply with the description of every port returned over RPC."""
        try:
            port_list = yield self.rpc.get_port_list(self.device_id)
            self.cxn.send(ofp.message.port_desc_stats_reply(
                xid=req.xid,
                entries=[to_loxi(port.ofp_port) for port in port_list]
            ))
        except Exception as err:
            log.exception('failed-port-desc-reply', err=err)

    def handle_queue_stats_request(self, req):
        """Not implemented."""
        raise NotImplementedError()

    def handle_table_stats_request(self, req):
        """Reply with an empty table stats list."""
        table_stats = []  # see https://jira.opencord.org/browse/CORD-825
        self.cxn.send(ofp.message.table_stats_reply(
            xid=req.xid, entries=table_stats))

    def handle_table_features_request(self, req):
        """Not implemented."""
        raise NotImplementedError()

    # Stats request type -> handler (plain functions, called as f(self, req))
    stats_handlers = {
        ofp.OFPST_AGGREGATE: handle_aggregate_request,
        ofp.OFPST_DESC: handle_device_description_request,
        ofp.OFPST_EXPERIMENTER: handle_experimenter_stats_request,
        ofp.OFPST_FLOW: handle_flow_stats_request,
        ofp.OFPST_GROUP: handle_group_stats_request,
        ofp.OFPST_GROUP_DESC: handle_group_descriptor_request,
        ofp.OFPST_GROUP_FEATURES: handle_group_features_request,
        ofp.OFPST_METER: handle_meter_stats_request,
        ofp.OFPST_METER_CONFIG: handle_meter_config_request,
        ofp.OFPST_METER_FEATURES: handle_meter_features_request,
        ofp.OFPST_PORT: handle_port_stats_request,
        ofp.OFPST_PORT_DESC: handle_port_desc_request,
        ofp.OFPST_QUEUE: handle_queue_stats_request,
        ofp.OFPST_TABLE: handle_table_stats_request,
        ofp.OFPST_TABLE_FEATURES: handle_table_features_request
    }

    # Message type -> handler (plain functions, called as f(self, req))
    main_handlers = {
        ofp.OFPT_BARRIER_REQUEST: handle_barrier_request,
        ofp.OFPT_ECHO_REQUEST: handle_echo_request,
        ofp.OFPT_FEATURES_REQUEST: handle_feature_request,
        ofp.OFPT_EXPERIMENTER: handle_experimenter_request,
        ofp.OFPT_FLOW_MOD: handle_flow_mod_request,
        ofp.OFPT_GET_ASYNC_REQUEST: handle_get_async_request,
        ofp.OFPT_GET_CONFIG_REQUEST: handle_get_config_request,
        ofp.OFPT_GROUP_MOD: handle_group_mod_request,
        ofp.OFPT_METER_MOD: handle_meter_mod_request,
        ofp.OFPT_PACKET_OUT: handle_packet_out_request,
        ofp.OFPT_PORT_MOD: handle_port_mod_request,
        ofp.OFPT_QUEUE_GET_CONFIG_REQUEST: handle_queue_get_config_request,
        ofp.OFPT_ROLE_REQUEST: handle_role_request,
        ofp.OFPT_SET_ASYNC: handle_set_async_request,
        ofp.OFPT_SET_CONFIG: handle_set_config_request,
        ofp.OFPT_STATS_REQUEST: handle_stats_request,
        ofp.OFPT_TABLE_MOD: handle_table_mod_request,
    }

    # ~~~~~~~~~~~~~~~~~~~~~~~ switch-initiated messages ~~~~~~~~~~~~~~~~~~~~~~

    def forward_packet_in(self, ofp_packet_in):
        """Send a packet-in to the controller if the role is MASTER/EQUAL."""
        if self._controller_may_modify():
            log.info('sending-packet-in', ofp_packet_in=ofp_packet_in)
            self.cxn.send(to_loxi(ofp_packet_in))

    def forward_port_status(self, ofp_port_status):
        """Send a port status message to the controller (any role)."""
        self.cxn.send(to_loxi(ofp_port_status))