#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import structlog
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue

import loxi.of13 as ofp
from converter import to_loxi, pb2dict, to_grpc
21
# Module-level structlog logger used by every handler in this file.
log = structlog.get_logger()
23
24
class OpenFlowProtocolError(Exception):
    """Signals an OpenFlow request that this agent cannot process.

    Raised by the stats dispatcher when a stats request carries a type for
    which no handler is registered.
    """
26
27
class OpenFlowProtocolHandler(object):
    """Upper half of the OpenFlow protocol for one controller connection.

    After the hello/version handshake the handler receives requests from
    the connection object, looks them up in ``main_handlers`` (and, for
    stats requests, in ``stats_handlers``) and answers them, calling the
    RPC stub where the request requires it.
    """

    ofp_version = [4]  # OFAgent supported versions

    def __init__(self, datapath_id, device_id, agent, cxn, rpc):
        """
        The upper half of the OpenFlow protocol, focusing on message
        exchanges.
        :param datapath_id: Datapath id reported in the features reply.
        :param device_id: Device id passed to every RPC call.
        :param agent: Reference to the Agent() instance, can be used to
          indicate critical errors to break the connection. It also holds
          the generation id state used by role requests.
        :param cxn: The lower level message serdes part of the OF protocol.
        :param rpc: The application level stub on which RPC calls
          are made as result of processing incoming OpenFlow request messages.
        """
        self.datapath_id = datapath_id
        self.device_id = device_id
        self.agent = agent
        self.cxn = cxn
        self.rpc = rpc
        # No role until the controller sends a role request; while it is
        # None the role-gated handlers neither act nor reply.
        self.role = None

    @inlineCallbacks
    def start(self):
        """A new call is made after a fresh reconnect.

        Sends our hello, waits for the peer's hello and, when the peer
        supports one of ``ofp_version``, processes requests in a loop.
        Any exception escaping the handshake or the receive call is logged
        and ends the loop. The deferred result is this handler.
        """

        log.debug('starting')

        try:
            support = False
            # send initial hello message
            self.cxn.send(ofp.message.hello(
                elements=[ofp.common.hello_elem_versionbitmap(
                    bitmaps=[ofp.common.hello_elem_bitmap(
                        self.ofp_version)])]))
            # expect to receive a hello message
            msg = yield self.cxn.recv_class(ofp.message.hello)
            # supports only ofp_versions till 31 and single bitmap.
            if msg:
                support = ofp.util.verify_version_support(
                    msg, self.ofp_version)
                if not support:
                    self.cxn.send(ofp.message.hello_failed_error_msg(
                        xid=msg.xid, code=ofp.OFPHFC_INCOMPATIBLE,
                        data='i support only 1.3'))
                    # structlog takes context as keywords, not positionals
                    log.error('peer-do-not-support-OpenFlow-version',
                              supported_versions=self.ofp_version)

            while support:
                req = yield self.cxn.recv_any()
                self._dispatch(req)

        except Exception as e:
            log.exception('exception', e=e)

        log.info('started')
        returnValue(self)

    def _dispatch(self, req):
        """Run the handler registered for ``req.type``, isolating failures.

        A handler that raises (several are NotImplementedError stubs) is
        logged and skipped so a single unsupported request does not end
        message processing. Deferreds returned by handlers get an errback
        so asynchronous failures are logged as well.
        """
        handler = self.main_handlers.get(req.type, None)
        if not handler:
            log.error('cannot-handle',
                      request=req, xid=req.xid, type=req.type)
            return

        try:
            result = handler(self, req)
        except Exception as e:
            log.exception('handler-failed',
                          e=e, xid=req.xid, type=req.type)
            return

        if isinstance(result, Deferred):
            result.addErrback(self._log_handler_failure, req)

    def _log_handler_failure(self, failure, req):
        """Errback that logs (and thereby consumes) an async handler failure."""
        log.error('handler-failed',
                  e=failure.value, xid=req.xid, type=req.type)

    def _has_write_access(self):
        """Return True when the current role is MASTER or EQUAL."""
        return self.role in (ofp.OFPCR_ROLE_MASTER, ofp.OFPCR_ROLE_EQUAL)

    def _send_is_slave_error(self, req):
        """Reject ``req`` with OFPBRC_IS_SLAVE, echoing the request xid."""
        self.cxn.send(ofp.message.bad_request_error_msg(
            xid=req.xid, code=ofp.OFPBRC_IS_SLAVE))

    @staticmethod
    def _generation_delta(generation_id, cached_generation_id):
        """Return ``generation_id - cached_generation_id`` as a signed
        64-bit value, i.e. the ``(int64_t)`` distance the OpenFlow
        specification uses to detect stale role requests.
        """
        delta = (generation_id - cached_generation_id) & 0xffffffffffffffff
        if delta >= 0x8000000000000000:
            delta -= 0x10000000000000000
        return delta

    def stop(self):
        """Log the stop; there is nothing to tear down yet."""
        log.debug('stopping')
        log.info('stopped')

    def handle_echo_request(self, req):
        """Answer an echo request with an echo reply carrying the same xid."""
        self.cxn.send(ofp.message.echo_reply(xid=req.xid))

    @inlineCallbacks
    def handle_feature_request(self, req):
        """Reply with the switch features obtained from the RPC stub."""
        device_info = yield self.rpc.get_device_info(self.device_id)
        kw = pb2dict(device_info.switch_features)
        self.cxn.send(ofp.message.features_reply(
            xid=req.xid,
            datapath_id=self.datapath_id,
            **kw))

    def handle_stats_request(self, req):
        """Dispatch a stats request on ``req.stats_type``.

        :return: whatever the selected stats handler returns (a Deferred
          for the asynchronous ones), so the caller can observe failures.
        :raises OpenFlowProtocolError: if no handler is registered for the
          stats type.
        """
        handler = self.stats_handlers.get(req.stats_type, None)
        if handler:
            return handler(self, req)
        raise OpenFlowProtocolError(
            'Cannot handle stats request type "{}"'.format(req.stats_type))

    def handle_barrier_request(self, req):
        """Acknowledge a barrier request."""
        # not really doing barrier yet, but we respond
        # see https://jira.opencord.org/browse/CORD-823
        self.cxn.send(ofp.message.barrier_reply(xid=req.xid))

    def handle_experimenter_request(self, req):
        raise NotImplementedError()

    def handle_flow_mod_request(self, req):
        """Forward a flow mod to the RPC stub when the role allows it.

        A SLAVE role is answered with an IS_SLAVE error. A request that
        cannot be converted is logged and dropped.
        :return: the result of ``rpc.update_flow_table`` when the request
          is forwarded, otherwise None.
        """
        if self._has_write_access():
            try:
                grpc_req = to_grpc(req)
            except Exception as e:
                log.exception('failed-to-convert', e=e)
            else:
                return self.rpc.update_flow_table(self.device_id, grpc_req)

        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._send_is_slave_error(req)

    def handle_get_async_request(self, req):
        raise NotImplementedError()

    def handle_get_config_request(self, req):
        """Reply with a config whose miss_send_len is OFPCML_NO_BUFFER."""
        self.cxn.send(ofp.message.get_config_reply(
            xid=req.xid,
            miss_send_len=ofp.OFPCML_NO_BUFFER
        ))

    @inlineCallbacks
    def handle_group_mod_request(self, req):
        """Forward a group mod to the RPC stub when the role allows it."""
        if self._has_write_access():
            yield self.rpc.update_group_table(self.device_id, to_grpc(req))
        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._send_is_slave_error(req)

    def handle_meter_mod_request(self, req):
        raise NotImplementedError()

    def handle_role_request(self, req):
        """Apply a controller role change.

        MASTER and SLAVE requests are checked against the generation id
        cached on the agent. A stale request is rejected with
        OFPRRFC_STALE, otherwise the generation id is cached, the role is
        recorded and a role reply is sent. EQUAL requests skip the
        generation check. Other role values are ignored.
        """
        if req.role in (ofp.OFPCR_ROLE_MASTER, ofp.OFPCR_ROLE_SLAVE):
            stale = self.agent.generation_is_defined and \
                self._generation_delta(
                    req.generation_id,
                    self.agent.cached_generation_id) < 0
            if stale:
                # OFPRRFC_STALE belongs to the role-request-failed error
                # type, not to the bad-request type.
                self.cxn.send(ofp.message.role_request_failed_error_msg(
                    xid=req.xid, code=ofp.OFPRRFC_STALE))
            else:
                self.agent.generation_is_defined = True
                self.agent.cached_generation_id = req.generation_id
                self.role = req.role
                self.cxn.send(ofp.message.role_reply(
                    xid=req.xid, role=req.role,
                    generation_id=req.generation_id))
        elif req.role == ofp.OFPCR_ROLE_EQUAL:
            self.role = req.role
            self.cxn.send(ofp.message.role_reply(
                xid=req.xid, role=req.role))

    def handle_packet_out_request(self, req):
        """Forward a packet-out to the RPC stub when the role allows it.

        :return: the result of ``rpc.send_packet_out`` when forwarded,
          otherwise None.
        """
        if self._has_write_access():
            return self.rpc.send_packet_out(self.device_id, to_grpc(req))

        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._send_is_slave_error(req)

    def handle_set_config_request(self, req):
        # Handle set config appropriately
        # https://jira.opencord.org/browse/CORD-826
        pass

    @inlineCallbacks
    def handle_port_mod_request(self, req):
        """Enable or disable a port when its PORT_DOWN bit changes.

        The port is fetched through the RPC stub; nothing is done when the
        requested PORT_DOWN bit equals the current one.
        """
        if self._has_write_access():
            port = yield self.rpc.get_port(self.device_id, str(req.port_no))

            wanted_down = req.config & ofp.OFPPC_PORT_DOWN
            if port.ofp_port.config & ofp.OFPPC_PORT_DOWN != wanted_down:
                # Yield the RPC so a failure surfaces on this handler's
                # Deferred instead of being lost.
                if wanted_down:
                    yield self.rpc.disable_port(self.device_id, port.id)
                else:
                    yield self.rpc.enable_port(self.device_id, port.id)

        elif self.role == ofp.OFPCR_ROLE_SLAVE:
            self._send_is_slave_error(req)

    def handle_table_mod_request(self, req):
        raise NotImplementedError()

    def handle_queue_get_config_request(self, req):
        raise NotImplementedError()

    def handle_set_async_request(self, req):
        raise NotImplementedError()

    def handle_aggregate_request(self, req):
        raise NotImplementedError()

    @inlineCallbacks
    def handle_device_description_request(self, req):
        """Reply with the device description obtained from the RPC stub."""
        device_info = yield self.rpc.get_device_info(self.device_id)
        kw = pb2dict(device_info.desc)
        self.cxn.send(ofp.message.desc_stats_reply(xid=req.xid, **kw))

    def handle_experimenter_stats_request(self, req):
        raise NotImplementedError()

    @inlineCallbacks
    def handle_flow_stats_request(self, req):
        """Reply with all flows of the device; failures are only logged."""
        try:
            flow_stats = yield self.rpc.list_flows(self.device_id)
            self.cxn.send(ofp.message.flow_stats_reply(
                xid=req.xid, entries=[to_loxi(f) for f in flow_stats]))
        except Exception:
            log.exception('failed-flow-stats-request', req=req)

    @inlineCallbacks
    def handle_group_stats_request(self, req):
        """Reply with the stats part of every group of the device."""
        group_stats = yield self.rpc.list_groups(self.device_id)
        self.cxn.send(ofp.message.group_stats_reply(
            xid=req.xid, entries=[to_loxi(g.stats) for g in group_stats]))

    @inlineCallbacks
    def handle_group_descriptor_request(self, req):
        """Reply with the desc part of every group of the device."""
        group_stats = yield self.rpc.list_groups(self.device_id)
        self.cxn.send(ofp.message.group_desc_stats_reply(
            xid=req.xid, entries=[to_loxi(g.desc) for g in group_stats]))

    def handle_group_features_request(self, req):
        raise NotImplementedError()

    def handle_meter_stats_request(self, req):
        """Reply with an empty meter stats list."""
        meter_stats = []  # see https://jira.opencord.org/browse/CORD-825
        self.cxn.send(ofp.message.meter_stats_reply(
            xid=req.xid, entries=meter_stats))

    def handle_meter_config_request(self, req):
        raise NotImplementedError()

    def handle_meter_features_request(self, req):
        """Answer a meter features request with a bad-request error."""
        self.cxn.send(ofp.message.bad_request_error_msg(xid=req.xid))

    @inlineCallbacks
    def handle_port_stats_request(self, req):
        """Reply with the stats of every port; failures are only logged."""
        try:
            ports = yield self.rpc.list_ports(self.device_id)
            port_stats = [to_loxi(p.ofp_port_stats) for p in ports]
            of_message = ofp.message.port_stats_reply(
                xid=req.xid, entries=port_stats)
            self.cxn.send(of_message)
        except Exception:
            log.exception('failed-port_stats-request', req=req)

    @inlineCallbacks
    def handle_port_desc_request(self, req):
        """Reply with the description of every port in the port list."""
        port_list = yield self.rpc.get_port_list(self.device_id)
        self.cxn.send(ofp.message.port_desc_stats_reply(
            xid=req.xid,
            entries=[to_loxi(port.ofp_port) for port in port_list]
        ))

    def handle_queue_stats_request(self, req):
        raise NotImplementedError()

    def handle_table_stats_request(self, req):
        """Reply with an empty table stats list."""
        table_stats = []  # see https://jira.opencord.org/browse/CORD-825
        self.cxn.send(ofp.message.table_stats_reply(
            xid=req.xid, entries=table_stats))

    def handle_table_features_request(self, req):
        raise NotImplementedError()

    # Stats request type -> handler; the values are plain functions and are
    # called as handler(self, req).
    stats_handlers = {
        ofp.OFPST_AGGREGATE: handle_aggregate_request,
        ofp.OFPST_DESC: handle_device_description_request,
        ofp.OFPST_EXPERIMENTER: handle_experimenter_stats_request,
        ofp.OFPST_FLOW: handle_flow_stats_request,
        ofp.OFPST_GROUP: handle_group_stats_request,
        ofp.OFPST_GROUP_DESC: handle_group_descriptor_request,
        ofp.OFPST_GROUP_FEATURES: handle_group_features_request,
        ofp.OFPST_METER: handle_meter_stats_request,
        ofp.OFPST_METER_CONFIG: handle_meter_config_request,
        ofp.OFPST_METER_FEATURES: handle_meter_features_request,
        ofp.OFPST_PORT: handle_port_stats_request,
        ofp.OFPST_PORT_DESC: handle_port_desc_request,
        ofp.OFPST_QUEUE: handle_queue_stats_request,
        ofp.OFPST_TABLE: handle_table_stats_request,
        ofp.OFPST_TABLE_FEATURES: handle_table_features_request
    }

    # Message type -> handler; the values are plain functions and are
    # called as handler(self, req).
    main_handlers = {
        ofp.OFPT_BARRIER_REQUEST: handle_barrier_request,
        ofp.OFPT_ECHO_REQUEST: handle_echo_request,
        ofp.OFPT_FEATURES_REQUEST: handle_feature_request,
        ofp.OFPT_EXPERIMENTER: handle_experimenter_request,
        ofp.OFPT_FLOW_MOD: handle_flow_mod_request,
        ofp.OFPT_GET_ASYNC_REQUEST: handle_get_async_request,
        ofp.OFPT_GET_CONFIG_REQUEST: handle_get_config_request,
        ofp.OFPT_GROUP_MOD: handle_group_mod_request,
        ofp.OFPT_METER_MOD: handle_meter_mod_request,
        ofp.OFPT_PACKET_OUT: handle_packet_out_request,
        ofp.OFPT_PORT_MOD: handle_port_mod_request,
        ofp.OFPT_QUEUE_GET_CONFIG_REQUEST: handle_queue_get_config_request,
        ofp.OFPT_ROLE_REQUEST: handle_role_request,
        ofp.OFPT_SET_ASYNC: handle_set_async_request,
        ofp.OFPT_SET_CONFIG: handle_set_config_request,
        ofp.OFPT_STATS_REQUEST: handle_stats_request,
        ofp.OFPT_TABLE_MOD: handle_table_mod_request,
    }

    def forward_packet_in(self, ofp_packet_in):
        """Send a packet-in to the controller when the role is MASTER or
        EQUAL; otherwise the packet is dropped."""
        if self._has_write_access():
            log.info('sending-packet-in', ofp_packet_in=ofp_packet_in)
            self.cxn.send(to_loxi(ofp_packet_in))

    def forward_port_status(self, ofp_port_status):
        """Send a port status message to the controller, whatever the role."""
        self.cxn.send(to_loxi(ofp_port_status))