blob: a74f20555ce6110316135e585f39d610653f816f [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
17from twisted.internet.defer import inlineCallbacks
18
19import loxi.of13 as ofp
20from converter import to_loxi, pb2dict, to_grpc
21
22log = structlog.get_logger()
23
24
25class OpenFlowProtocolError(Exception): pass
26
27
28class OpenFlowProtocolHandler(object):
29
30 def __init__(self, datapath_id, agent, cxn, rpc):
31 """
32 The upper half of the OpenFlow protocol, focusing on message
33 exchanges.
34 :param agent: Reference to the Agent() instance, can be used to
35 indicate critical errors to break the connection.
36 :param cxn: The lower level message serdes part of the OF protocol.
37 :param rpc: The application level stub on which RPC calls
38 are made as result of processing incoming OpenFlow request messages.
39 """
40 self.datapath_id = datapath_id
41 self.agent = agent
42 self.cxn = cxn
43 self.rpc = rpc
44
45 @inlineCallbacks
46 def run(self):
47 """A new call is made after a fresh reconnect"""
48
49 try:
50 # send initial hello message
51 self.cxn.send(ofp.message.hello())
52
53 # expect to receive a hello message
54 msg = yield self.cxn.recv_class(ofp.message.hello)
55 # TODO verify version compatibility (must list version 1.3)
56
57 while True:
58 req = yield self.cxn.recv_any()
59 handler = self.main_handlers.get(req.type, None)
60 if handler:
61 handler(self, req)
62 else:
63 log.error('cannot-handle',
64 request=req, xid=req.xid, type=req.type)
65
66 except Exception, e:
67 log.exception('exception', e=e)
68
69 def handle_echo_request(self, req):
70 self.cxn.send(ofp.message.echo_reply(xid=req.xid))
71
72 @inlineCallbacks
73 def handle_feature_request(self, req):
74 device_info = yield self.rpc.get_device_info(self.datapath_id)
75 kw = pb2dict(device_info.switch_features)
76 self.cxn.send(ofp.message.features_reply(
77 xid=req.xid,
78 datapath_id=self.datapath_id,
79 **kw))
80
81 def handle_stats_request(self, req):
82 handler = self.stats_handlers.get(req.stats_type, None)
83 if handler:
84 handler(self, req)
85 else:
86 raise OpenFlowProtocolError(
87 'Cannot handle stats request type "{}"'.format(req.stats_type))
88
89 def handle_barrier_request(self, req):
90 # TODO not really doing barrier yet, but we respond
91 self.cxn.send(ofp.message.barrier_reply(xid=req.xid))
92
93 def handle_experimenter_request(self, req):
94 raise NotImplementedError()
95
96 @inlineCallbacks
97 def handle_flow_mod_request(self, req):
98 yield self.rpc.update_flow_table(self.datapath_id, to_grpc(req))
99
100 def handle_get_async_request(self, req):
101 raise NotImplementedError()
102
103 def handle_get_config_request(self, req):
104 self.cxn.send(ofp.message.get_config_reply(
105 xid=req.xid,
106 miss_send_len=ofp.OFPCML_NO_BUFFER
107 ))
108
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700109 @inlineCallbacks
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700110 def handle_group_mod_request(self, req):
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700111 yield self.rpc.update_group_table(self.datapath_id, to_grpc(req))
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700112
113 def handle_meter_mod_request(self, req):
114 raise NotImplementedError()
115
116 def handle_role_request(self, req):
117 # TODO this is where we need to manage which connection is active
118 if req.role != ofp.OFPCR_ROLE_MASTER:
119 raise NotImplementedError()
120 self.cxn.send(ofp.message.role_reply(
121 xid=req.xid, role=req.role, generation_id=req.generation_id))
122
123 def handle_packet_out_request(self, req):
124 # TODO send packet out
125 pass
126
127 def handle_set_config_request(self, req):
128 # TODO ignore for now
129 pass
130
131 def handle_port_mod_request(self, req):
132 raise NotImplementedError()
133
134 def handle_table_mod_request(self, req):
135 raise NotImplementedError()
136
137 def handle_queue_get_config_request(self, req):
138 raise NotImplementedError()
139
140 def handle_set_async_request(self, req):
141 raise NotImplementedError()
142
143 def handle_aggregate_request(self, req):
144 raise NotImplementedError
145
146 @inlineCallbacks
147 def handle_device_description_request(self, req):
148 device_info = yield self.rpc.get_device_info(self.datapath_id)
149 kw = pb2dict(device_info.desc)
150 self.cxn.send(ofp.message.desc_stats_reply(xid=req.xid, **kw))
151
152 def handle_experimenter_stats_request(self, req):
153 raise NotImplementedError()
154
155 @inlineCallbacks
156 def handle_flow_stats_request(self, req):
157 flow_stats = yield self.rpc.list_flows(self.datapath_id)
158 self.cxn.send(ofp.message.flow_stats_reply(
159 xid=req.xid, entries=[to_loxi(f) for f in flow_stats]))
160
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700161 @inlineCallbacks
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700162 def handle_group_stats_request(self, req):
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700163 group_stats = yield self.rpc.list_groups(self.datapath_id)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700164 self.cxn.send(ofp.message.group_stats_reply(
Zsolt Haraszti8a774382016-10-24 18:25:54 -0700165 xid=req.xid, entries=[to_loxi(g) for g in group_stats]))
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700166
167 def handle_group_descriptor_request(self, req):
168 group_list = [] # TODO
169 self.cxn.send(ofp.message.group_desc_stats_reply(
170 xid=req.xid, entries=group_list))
171
172 def handle_group_features_request(self, req):
173 raise NotImplementedError()
174
175 def handle_meter_stats_request(self, req):
176 meter_stats = [] # TODO
177 self.cxn.send(ofp.message.meter_stats_reply(
178 xid=req.xid, entries=meter_stats))
179
180 def handle_meter_config_request(self, req):
181 raise NotImplementedError()
182
183 def handle_meter_features_request(self, req):
184 raise NotImplementedError()
185
186 def handle_port_stats_request(self, req):
187 port_stats = [] # TODO
188 self.cxn.send(ofp.message.port_stats_reply(
189 xid=req.xid,entries=port_stats))
190
191 @inlineCallbacks
192 def handle_port_desc_request(self, req):
193 port_list = yield self.rpc.get_port_list(self.datapath_id)
194 self.cxn.send(ofp.message.port_desc_stats_reply(
195 xid=req.xid,
196 #flags=None,
197 entries=[to_loxi(port) for port in port_list]
198 ))
199
200 def handle_queue_stats_request(self, req):
201 raise NotImplementedError()
202
203 def handle_table_stats_request(self, req):
204 table_stats = [] # TODO
205 self.cxn.send(ofp.message.table_stats_reply(
206 xid=req.xid, entries=table_stats))
207
208 def handle_table_features_request(self, req):
209 raise NotImplementedError()
210
211 stats_handlers = {
212 ofp.OFPST_AGGREGATE: handle_aggregate_request,
213 ofp.OFPST_DESC: handle_device_description_request,
214 ofp.OFPST_EXPERIMENTER: handle_experimenter_stats_request,
215 ofp.OFPST_FLOW: handle_flow_stats_request,
216 ofp.OFPST_GROUP: handle_group_stats_request,
217 ofp.OFPST_GROUP_DESC: handle_group_descriptor_request,
218 ofp.OFPST_GROUP_FEATURES: handle_group_features_request,
219 ofp.OFPST_METER: handle_meter_stats_request,
220 ofp.OFPST_METER_CONFIG: handle_meter_config_request,
221 ofp.OFPST_METER_FEATURES: handle_meter_features_request,
222 ofp.OFPST_PORT: handle_port_stats_request,
223 ofp.OFPST_PORT_DESC: handle_port_desc_request,
224 ofp.OFPST_QUEUE: handle_queue_stats_request,
225 ofp.OFPST_TABLE: handle_table_stats_request,
226 ofp.OFPST_TABLE_FEATURES: handle_table_features_request
227 }
228
229 main_handlers = {
230 ofp.OFPT_BARRIER_REQUEST: handle_barrier_request,
231 ofp.OFPT_ECHO_REQUEST: handle_echo_request,
232 ofp.OFPT_FEATURES_REQUEST: handle_feature_request,
233 ofp.OFPT_EXPERIMENTER: handle_experimenter_request,
234 ofp.OFPT_FLOW_MOD: handle_flow_mod_request,
235 ofp.OFPT_GET_ASYNC_REQUEST: handle_get_async_request,
236 ofp.OFPT_GET_CONFIG_REQUEST: handle_get_config_request,
237 ofp.OFPT_GROUP_MOD: handle_group_mod_request,
238 ofp.OFPT_METER_MOD: handle_meter_mod_request,
239 ofp.OFPT_PACKET_OUT: handle_packet_out_request,
240 ofp.OFPT_PORT_MOD: handle_port_mod_request,
241 ofp.OFPT_QUEUE_GET_CONFIG_REQUEST: handle_queue_get_config_request,
242 ofp.OFPT_ROLE_REQUEST: handle_role_request,
243 ofp.OFPT_SET_ASYNC: handle_set_async_request,
244 ofp.OFPT_SET_CONFIG: handle_set_config_request,
245 ofp.OFPT_STATS_REQUEST: handle_stats_request,
246 ofp.OFPT_TABLE_MOD: handle_table_mod_request,
247 }
248