blob: bc6aa66d1d8cddb2d950951bea31c2e42eb83cf0 [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070017from twisted.internet.defer import inlineCallbacks, returnValue
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070018
19import loxi.of13 as ofp
20from converter import to_loxi, pb2dict, to_grpc
21
22log = structlog.get_logger()
23
24
25class OpenFlowProtocolError(Exception): pass
26
27
28class OpenFlowProtocolHandler(object):
29
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070030 def __init__(self, datapath_id, device_id, agent, cxn, rpc):
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070031 """
32 The upper half of the OpenFlow protocol, focusing on message
33 exchanges.
34 :param agent: Reference to the Agent() instance, can be used to
35 indicate critical errors to break the connection.
36 :param cxn: The lower level message serdes part of the OF protocol.
37 :param rpc: The application level stub on which RPC calls
38 are made as result of processing incoming OpenFlow request messages.
39 """
40 self.datapath_id = datapath_id
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070041 self.device_id = device_id
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070042 self.agent = agent
43 self.cxn = cxn
44 self.rpc = rpc
sgovindacc736782017-05-02 20:06:37 +053045 self.role = None
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070046
47 @inlineCallbacks
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070048 def start(self):
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070049 """A new call is made after a fresh reconnect"""
50
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070051 log.debug('starting')
52
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070053 try:
54 # send initial hello message
55 self.cxn.send(ofp.message.hello())
56
57 # expect to receive a hello message
58 msg = yield self.cxn.recv_class(ofp.message.hello)
alshabibc3fb4942017-01-26 15:34:24 -080059 # verify version compatibility (must list version 1.3)
60 # and negotiate if not.
61 # see https://jira.opencord.org/browse/CORD-822
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070062
63 while True:
64 req = yield self.cxn.recv_any()
65 handler = self.main_handlers.get(req.type, None)
66 if handler:
67 handler(self, req)
68 else:
69 log.error('cannot-handle',
70 request=req, xid=req.xid, type=req.type)
71
72 except Exception, e:
73 log.exception('exception', e=e)
74
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070075 log.info('started')
76 returnValue(self)
77
78 def stop(self):
79 log.debug('stopping')
80 pass # nothing to do yet
81 log.info('stopped')
82
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070083 def handle_echo_request(self, req):
84 self.cxn.send(ofp.message.echo_reply(xid=req.xid))
85
86 @inlineCallbacks
87 def handle_feature_request(self, req):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070088 device_info = yield self.rpc.get_device_info(self.device_id)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070089 kw = pb2dict(device_info.switch_features)
90 self.cxn.send(ofp.message.features_reply(
91 xid=req.xid,
92 datapath_id=self.datapath_id,
93 **kw))
94
95 def handle_stats_request(self, req):
96 handler = self.stats_handlers.get(req.stats_type, None)
97 if handler:
98 handler(self, req)
99 else:
100 raise OpenFlowProtocolError(
101 'Cannot handle stats request type "{}"'.format(req.stats_type))
102
103 def handle_barrier_request(self, req):
alshabibc3fb4942017-01-26 15:34:24 -0800104 # not really doing barrier yet, but we respond
105 # see https://jira.opencord.org/browse/CORD-823
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700106 self.cxn.send(ofp.message.barrier_reply(xid=req.xid))
107
108 def handle_experimenter_request(self, req):
109 raise NotImplementedError()
110
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700111 def handle_flow_mod_request(self, req):
sgovindacc736782017-05-02 20:06:37 +0530112 if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
113 try:
114 grpc_req = to_grpc(req)
115 except Exception, e:
116 log.exception('failed-to-convert', e=e)
117 else:
118 return self.rpc.update_flow_table(self.device_id, grpc_req)
119
120 elif self.role == ofp.OFPCR_ROLE_SLAVE:
121 self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700122
123 def handle_get_async_request(self, req):
124 raise NotImplementedError()
125
126 def handle_get_config_request(self, req):
127 self.cxn.send(ofp.message.get_config_reply(
128 xid=req.xid,
129 miss_send_len=ofp.OFPCML_NO_BUFFER
130 ))
131
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700132 @inlineCallbacks
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700133 def handle_group_mod_request(self, req):
sgovindacc736782017-05-02 20:06:37 +0530134 if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
135 yield self.rpc.update_group_table(self.device_id, to_grpc(req))
136 elif self.role == ofp.OFPCR_ROLE_SLAVE:
137 self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
138
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700139
140 def handle_meter_mod_request(self, req):
141 raise NotImplementedError()
142
143 def handle_role_request(self, req):
sgovindacc736782017-05-02 20:06:37 +0530144 if req.role == ofp.OFPCR_ROLE_MASTER or req.role == ofp.OFPCR_ROLE_SLAVE:
sathishg00433e52017-05-23 16:07:41 +0530145 if self.agent.generation_is_defined and (
146 ((req.generation_id - self.agent.cached_generation_id) & 0xffffffffffffffff) if abs(
147 req.generation_id - self.agent.cached_generation_id) > 0x7fffffffffffffff else (
148 req.generation_id - self.agent.cached_generation_id)) < 0:
149 self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPRRFC_STALE))
150 else:
151 self.agent.generation_is_defined = True
152 self.agent.cached_generation_id = req.generation_id
153 self.role = req.role
154 self.cxn.send(ofp.message.role_reply(
155 xid=req.xid, role=req.role, generation_id=req.generation_id))
sgovindacc736782017-05-02 20:06:37 +0530156 elif req.role == ofp.OFPCR_ROLE_EQUAL:
sathishg00433e52017-05-23 16:07:41 +0530157 self.role = req.role
158 self.cxn.send(ofp.message.role_reply(
159 xid=req.xid, role=req.role))
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700160
161 def handle_packet_out_request(self, req):
sgovindacc736782017-05-02 20:06:37 +0530162 if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
163 self.rpc.send_packet_out(self.device_id, to_grpc(req))
164
165 elif self.role == ofp.OFPCR_ROLE_SLAVE:
166 self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700167
168 def handle_set_config_request(self, req):
alshabibc3fb4942017-01-26 15:34:24 -0800169 # Handle set config appropriately
170 # https://jira.opencord.org/browse/CORD-826
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700171 pass
172
173 def handle_port_mod_request(self, req):
174 raise NotImplementedError()
175
176 def handle_table_mod_request(self, req):
177 raise NotImplementedError()
178
179 def handle_queue_get_config_request(self, req):
180 raise NotImplementedError()
181
182 def handle_set_async_request(self, req):
183 raise NotImplementedError()
184
185 def handle_aggregate_request(self, req):
186 raise NotImplementedError
187
188 @inlineCallbacks
189 def handle_device_description_request(self, req):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700190 device_info = yield self.rpc.get_device_info(self.device_id)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700191 kw = pb2dict(device_info.desc)
192 self.cxn.send(ofp.message.desc_stats_reply(xid=req.xid, **kw))
193
194 def handle_experimenter_stats_request(self, req):
195 raise NotImplementedError()
196
197 @inlineCallbacks
198 def handle_flow_stats_request(self, req):
Zsolt Haraszti3578a1c2017-01-10 15:29:02 -0800199 try:
200 flow_stats = yield self.rpc.list_flows(self.device_id)
201 self.cxn.send(ofp.message.flow_stats_reply(
202 xid=req.xid, entries=[to_loxi(f) for f in flow_stats]))
203 except Exception, e:
204 log.exception('failed-flow-stats-request', req=req)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700205
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700206 @inlineCallbacks
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700207 def handle_group_stats_request(self, req):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700208 group_stats = yield self.rpc.list_groups(self.device_id)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700209 self.cxn.send(ofp.message.group_stats_reply(
alshabibf4fb2682017-01-12 00:32:56 -0600210 xid=req.xid, entries=[to_loxi(g.stats) for g in group_stats]))
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700211
alshabibf4fb2682017-01-12 00:32:56 -0600212 @inlineCallbacks
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700213 def handle_group_descriptor_request(self, req):
alshabibf4fb2682017-01-12 00:32:56 -0600214 group_stats = yield self.rpc.list_groups(self.device_id)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700215 self.cxn.send(ofp.message.group_desc_stats_reply(
alshabibf4fb2682017-01-12 00:32:56 -0600216 xid=req.xid, entries=[to_loxi(g.desc) for g in group_stats]))
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700217
218 def handle_group_features_request(self, req):
219 raise NotImplementedError()
220
221 def handle_meter_stats_request(self, req):
alshabibc3fb4942017-01-26 15:34:24 -0800222 meter_stats = [] # see https://jira.opencord.org/browse/CORD-825
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700223 self.cxn.send(ofp.message.meter_stats_reply(
224 xid=req.xid, entries=meter_stats))
225
226 def handle_meter_config_request(self, req):
227 raise NotImplementedError()
228
229 def handle_meter_features_request(self, req):
alshabib81824e32016-12-21 21:43:45 -0800230 self.cxn.send(ofp.message.bad_request_error_msg())
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700231
232 def handle_port_stats_request(self, req):
alshabibc3fb4942017-01-26 15:34:24 -0800233 port_stats = [] # see https://jira.opencord.org/browse/CORD-825
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700234 self.cxn.send(ofp.message.port_stats_reply(
235 xid=req.xid,entries=port_stats))
236
237 @inlineCallbacks
238 def handle_port_desc_request(self, req):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700239 port_list = yield self.rpc.get_port_list(self.device_id)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700240 self.cxn.send(ofp.message.port_desc_stats_reply(
241 xid=req.xid,
242 #flags=None,
Zsolt Haraszti66862032016-11-28 14:28:39 -0800243 entries=[to_loxi(port.ofp_port) for port in port_list]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700244 ))
245
246 def handle_queue_stats_request(self, req):
247 raise NotImplementedError()
248
249 def handle_table_stats_request(self, req):
alshabibc3fb4942017-01-26 15:34:24 -0800250 table_stats = [] # see https://jira.opencord.org/browse/CORD-825
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700251 self.cxn.send(ofp.message.table_stats_reply(
252 xid=req.xid, entries=table_stats))
253
254 def handle_table_features_request(self, req):
255 raise NotImplementedError()
256
257 stats_handlers = {
258 ofp.OFPST_AGGREGATE: handle_aggregate_request,
259 ofp.OFPST_DESC: handle_device_description_request,
260 ofp.OFPST_EXPERIMENTER: handle_experimenter_stats_request,
261 ofp.OFPST_FLOW: handle_flow_stats_request,
262 ofp.OFPST_GROUP: handle_group_stats_request,
263 ofp.OFPST_GROUP_DESC: handle_group_descriptor_request,
264 ofp.OFPST_GROUP_FEATURES: handle_group_features_request,
265 ofp.OFPST_METER: handle_meter_stats_request,
266 ofp.OFPST_METER_CONFIG: handle_meter_config_request,
267 ofp.OFPST_METER_FEATURES: handle_meter_features_request,
268 ofp.OFPST_PORT: handle_port_stats_request,
269 ofp.OFPST_PORT_DESC: handle_port_desc_request,
270 ofp.OFPST_QUEUE: handle_queue_stats_request,
271 ofp.OFPST_TABLE: handle_table_stats_request,
272 ofp.OFPST_TABLE_FEATURES: handle_table_features_request
273 }
274
275 main_handlers = {
276 ofp.OFPT_BARRIER_REQUEST: handle_barrier_request,
277 ofp.OFPT_ECHO_REQUEST: handle_echo_request,
278 ofp.OFPT_FEATURES_REQUEST: handle_feature_request,
279 ofp.OFPT_EXPERIMENTER: handle_experimenter_request,
280 ofp.OFPT_FLOW_MOD: handle_flow_mod_request,
281 ofp.OFPT_GET_ASYNC_REQUEST: handle_get_async_request,
282 ofp.OFPT_GET_CONFIG_REQUEST: handle_get_config_request,
283 ofp.OFPT_GROUP_MOD: handle_group_mod_request,
284 ofp.OFPT_METER_MOD: handle_meter_mod_request,
285 ofp.OFPT_PACKET_OUT: handle_packet_out_request,
286 ofp.OFPT_PORT_MOD: handle_port_mod_request,
287 ofp.OFPT_QUEUE_GET_CONFIG_REQUEST: handle_queue_get_config_request,
288 ofp.OFPT_ROLE_REQUEST: handle_role_request,
289 ofp.OFPT_SET_ASYNC: handle_set_async_request,
290 ofp.OFPT_SET_CONFIG: handle_set_config_request,
291 ofp.OFPT_STATS_REQUEST: handle_stats_request,
292 ofp.OFPT_TABLE_MOD: handle_table_mod_request,
293 }
294
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700295 def forward_packet_in(self, ofp_packet_in):
sgovindacc736782017-05-02 20:06:37 +0530296 if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
297 log.info('sending-packet-in', ofp_packet_in=ofp_packet_in)
298 self.cxn.send(to_loxi(ofp_packet_in))
Zsolt Haraszti217a12e2016-12-19 16:37:55 -0800299
300 def forward_port_status(self, ofp_port_status):
301 self.cxn.send(to_loxi(ofp_port_status))