blob: 991df6d4bd5ca3ed132077140e3f122eeef41359 [file] [log] [blame]
Zack Williams41513bf2018-07-07 20:08:35 -07001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
Nathan Knuth418fdc82016-09-16 22:51:15 -070014import logging
15import loxi.of13 as ofp
16import socket
17import sys
18import time
19
20from loxi.connection import Connection
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070021from ofagent.utils import pp
Nathan Knuth418fdc82016-09-16 22:51:15 -070022
23
24class Agent(object):
25
Nathan Knuth950dff22016-09-17 16:12:34 -070026 def __init__(self, controller, datapath_id,
27 store, backend, retry_interval=1):
Nathan Knuth418fdc82016-09-16 22:51:15 -070028 self.ip = controller.split(':')[0]
29 self.port = int(controller.split(':')[1])
30 self.datapath_id = datapath_id
31 self.store = store
32 self.backend = backend
33 self.exiting = False
34 self.retry_interval = retry_interval
35 self.cxn = None
36 self.soc = None
37
38 def run(self):
39 self.connect()
40
41 def connect(self):
42 """
43 Connect to a controller
44 """
45 while not self.exiting:
46 self.cxn = None
47 self.soc = soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
48 try:
49 soc.connect((self.ip, self.port))
50 except socket.error, e:
Nathan Knuth950dff22016-09-17 16:12:34 -070051 logging.info(
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070052 "Cannot connect to controller (errno=%d), "
53 "retrying in %s secs" %
Nathan Knuth950dff22016-09-17 16:12:34 -070054 (e.errno, self.retry_interval))
Nathan Knuth418fdc82016-09-16 22:51:15 -070055 else:
56 logging.info("Connected to controller")
57 soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
58 self.cxn = cxn = Connection(self.soc)
59 cxn.daemon = False
60 cxn.start()
61 try:
62 self.handle_protocol()
63 except Exception, e:
Nathan Knuth950dff22016-09-17 16:12:34 -070064 logging.info(
65 "Connection was lost (%s), will retry in %s secs" %
66 (e, self.retry_interval))
Nathan Knuth418fdc82016-09-16 22:51:15 -070067 time.sleep(self.retry_interval)
68
69 def stop(self):
70 if self.cxn is not None:
71 self.cxn.stop()
72 if self.soc is not None:
73 self.soc.close()
74
75 def signal_flow_mod_error(self, code, data):
76 msg = ofp.message.flow_mod_failed_error_msg(code=code, data=data)
77 self.cxn.send(msg)
78
79 def signal_group_mod_error(self, code, data):
80 msg = ofp.message.group_mod_failed_error_msg(code=code, data=data)
81 self.cxn.send(msg)
82
83 def signal_flow_removal(self, flow):
84 assert isinstance(flow, ofp.common.flow_stats_entry)
85 msg = ofp.message.flow_removed(
86 cookie=flow.cookie,
87 priority=flow.priority,
88 reason=None, # TODO
89 table_id=flow.table_id,
90 duration_sec=flow.duration_sec,
91 duration_nsec=flow.duration_nsec,
92 idle_timeout=flow.idle_timeout,
93 hard_timeout=flow.hard_timeout,
94 packet_count=flow.packet_count,
95 byte_count=flow.byte_count,
96 match=flow.match)
97 self.cxn.send(msg)
98
99 def send_packet_in(self, data, in_port):
100 match = ofp.match()
101 match.oxm_list.append(ofp.oxm.in_port(in_port))
102 msg = ofp.message.packet_in(
103 reason=ofp.OFPR_ACTION,
104 match=match,
105 data=data)
106 self.cxn.send(msg)
107
108 def handle_protocol(self):
109
110 cxn = self.cxn
111
112 # Send initial hello
113 cxn.send(ofp.message.hello())
114
115 if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
116 raise Exception("Did not receive initial HELLO")
117
118 while True:
119
120 try:
121 req = cxn.recv(lambda msg: True)
122 except AssertionError, e:
123 raise Exception("Connection is no longer alive")
124
125 print(pp(req))
126
127 if req is None:
128 # this simply means we timed out
129 # later we can use this to do other stuff
130 # for now we simply ignore this and loop back
131 pass
132
133 elif req.type == ofp.OFPT_FEATURES_REQUEST:
134 msg = ofp.message.features_reply(
135 xid=req.xid,
136 datapath_id=self.datapath_id,
137 n_buffers=256,
138 n_tables=2,
139 capabilities= (
140 ofp.OFPC_FLOW_STATS
141 | ofp.OFPC_TABLE_STATS
142 | ofp.OFPC_PORT_STATS
143 | ofp.OFPC_GROUP_STATS
144 )
145 )
146 cxn.send(msg)
147
148 elif req.type == ofp.OFPT_STATS_REQUEST:
149
150 if req.stats_type == ofp.OFPST_PORT_DESC:
151 # port stats request
152 msg = ofp.message.port_desc_stats_reply(
153 xid=req.xid,
154 #flags=None,
155 entries=self.store.port_list())
156 cxn.send(msg)
157
158 elif req.stats_type == ofp.OFPST_DESC:
159 # device description
160 msg = ofp.message.desc_stats_reply(
161 xid=req.xid,
162 flags=None,
163 mfr_desc=self.backend.mfr_desc,
164 hw_desc=self.backend.hw_desc,
165 sw_desc="pyofagent",
166 serial_num=self.backend.get_serial_num(),
167 dp_desc=self.backend.get_dp_desc())
168 cxn.send(msg)
169
170 elif req.stats_type == ofp.OFPST_FLOW:
171 # flow stats requested
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700172 msg = ofp.message.flow_stats_reply(
173 xid=req.xid, entries=self.store.flow_list())
Nathan Knuth418fdc82016-09-16 22:51:15 -0700174 cxn.send(msg)
175
176 elif req.stats_type == ofp.OFPST_TABLE:
177 # table stats requested
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700178 msg = ofp.message.table_stats_reply(
179 xid=req.xid, entries=self.store.table_stats())
Nathan Knuth418fdc82016-09-16 22:51:15 -0700180 cxn.send(msg)
181
182 elif req.stats_type == ofp.OFPST_PORT:
183 # port list
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700184 msg = ofp.message.port_stats_reply(
185 xid=req.xid, entries=self.store.port_stats())
Nathan Knuth418fdc82016-09-16 22:51:15 -0700186 cxn.send(msg)
187
188 elif req.stats_type == ofp.OFPST_GROUP:
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700189 msg = ofp.message.group_stats_reply(
190 xid=req.xid, entries=self.store.group_stats())
Nathan Knuth418fdc82016-09-16 22:51:15 -0700191 cxn.send(msg)
192
193 elif req.stats_type == ofp.OFPST_GROUP_DESC:
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700194 msg = ofp.message.group_desc_stats_reply(
195 xid=req.xid, entries=self.store.group_list())
Nathan Knuth418fdc82016-09-16 22:51:15 -0700196 cxn.send(msg)
197
198 elif req.stats_type == ofp.OFPST_METER:
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700199 msg = ofp.message.meter_stats_reply(
200 xid=req.xid, entries=[])
Nathan Knuth418fdc82016-09-16 22:51:15 -0700201 cxn.send(msg)
202
203 else:
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700204 logging.error("Unhandled stats type: %d in request:"
205 % req.stats_type)
Nathan Knuth418fdc82016-09-16 22:51:15 -0700206 logging.error(pp(req))
207
Nathan Knuth418fdc82016-09-16 22:51:15 -0700208 elif req.type == ofp.OFPT_SET_CONFIG:
209 # TODO ignored for now
210 pass
211
212 elif req.type == ofp.OFPT_BARRIER_REQUEST:
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700213 # TODO this will be the place to commit all changes before
214 # replying
Nathan Knuth418fdc82016-09-16 22:51:15 -0700215 # but now we send a reply right away
216 msg = ofp.message.barrier_reply(xid=req.xid)
217 cxn.send(msg)
218
219 elif req.type == ofp.OFPT_GET_CONFIG_REQUEST:
220 # send back configuration reply
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700221 msg = ofp.message.get_config_reply(
222 xid=req.xid, miss_send_len=ofp.OFPCML_NO_BUFFER)
Nathan Knuth418fdc82016-09-16 22:51:15 -0700223 cxn.send(msg)
224
225 elif req.type == ofp.OFPT_ROLE_REQUEST:
226 # TODO this is where we shall manage which connection is active
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700227 # now we simply verify that the role request is for active and
228 # reply
Nathan Knuth418fdc82016-09-16 22:51:15 -0700229 if req.role != ofp.OFPCR_ROLE_MASTER:
230 self.stop()
231 sys.exit(1)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700232 msg = ofp.message.role_reply(
233 xid=req.xid, role=req.role,
234 generation_id=req.generation_id)
Nathan Knuth418fdc82016-09-16 22:51:15 -0700235 cxn.send(msg)
236
237 elif req.type == ofp.OFPT_PACKET_OUT:
238 in_port = req.in_port
239 data = req.data
240 for action in req.actions:
241 if action.type == ofp.OFPAT_OUTPUT:
242 port = action.port
243 self.backend.packet_out(in_port, port, data)
244 else:
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700245 logging.warn("Unhandled packet out action type %s"
246 % action.type)
Nathan Knuth418fdc82016-09-16 22:51:15 -0700247
248 elif req.type == ofp.OFPT_FLOW_MOD:
249
250 command = req._command
251
252 if command == ofp.OFPFC_ADD:
253 self.store.flow_add(req)
254
255 elif command == ofp.OFPFC_DELETE:
256 self.store.flow_delete(req)
257
258 elif command == ofp.OFPFC_DELETE_STRICT:
259 self.store.flow_delete_strict(req)
260
261 elif command == ofp.OFPFC_MODIFY:
262 self.store.flow_modify(req)
263
264 elif command == ofp.OFPFC_MODIFY_STRICT:
265 self.store.flow_modify_strict(req)
266
267 else:
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700268 logging.warn("Unhandled flow mod command %s in message:"
269 % command)
Nathan Knuth418fdc82016-09-16 22:51:15 -0700270 logging.warn(pp(req))
271
272 elif req.type == ofp.OFPT_GROUP_MOD:
273
274 command = req.command
275
276 if command == ofp.OFPGC_DELETE:
277 self.store.group_delete(req)
278
279 elif command == ofp.OFPGC_ADD:
280 self.store.group_add(req)
281
282 elif command == ofp.OFPGC_MODIFY:
283 self.store.group_modify(req)
284
285 else:
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700286 logging.warn("Unhandled group command %s in message:"
287 % command)
Nathan Knuth418fdc82016-09-16 22:51:15 -0700288 logging.warn(pp(req))
289
290 else:
291 logging.warn("Unhandled message from controller:")
292 logging.warn(pp(req))
293