blob: a0a14fc26ba8c8a7880ff1b878dc85e3f70dfc44 [file] [log] [blame]
Nathan Knuth418fdc82016-09-16 22:51:15 -07001import logging
2import loxi.of13 as ofp
3import socket
4import sys
5import time
6
7from loxi.connection import Connection
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07008from ofagent.utils import pp
Nathan Knuth418fdc82016-09-16 22:51:15 -07009
10
class Agent(object):
    """OpenFlow 1.3 agent: connects to a controller and answers its requests.

    The agent owns a TCP socket and a loxi ``Connection`` wrapped around it.
    Requests read from the connection are answered from ``store`` (flows,
    groups, ports, statistics) and ``backend`` (device description and
    packet-out), both of which are supplied by the caller.
    """

    def __init__(self, controller, datapath_id,
                 store, backend, retry_interval=1):
        """Record the configuration; no network activity happens here.

        :param controller: controller address as a ``"host:port"`` string.
        :param datapath_id: value reported as ``datapath_id`` in the
            features reply.
        :param store: object queried for port/flow/group/table data and
            updated on flow-mod and group-mod requests.
        :param backend: object providing ``mfr_desc``, ``hw_desc``,
            ``get_serial_num()``, ``get_dp_desc()`` and ``packet_out()``.
        :param retry_interval: seconds to wait between connection attempts.
        :raises IndexError: if ``controller`` contains no ``':'``.
        :raises ValueError: if the port part is not an integer.
        """
        address = controller.split(':')
        self.ip = address[0]
        self.port = int(address[1])
        self.datapath_id = datapath_id
        self.store = store
        self.backend = backend
        self.exiting = False  # set by stop(); ends the connect() loop
        self.retry_interval = retry_interval
        self.cxn = None  # loxi Connection, present only while connected
        self.soc = None  # most recently created socket

    def run(self):
        """Run the agent; blocks inside connect() until stop() is called."""
        self.connect()

    def connect(self):
        """
        Connect to a controller

        Keeps (re)connecting every ``retry_interval`` seconds until
        ``self.exiting`` becomes true. While connected, control stays in
        handle_protocol(); any ``Exception`` it raises is logged and treated
        as a lost connection.
        """
        while not self.exiting:
            self.cxn = None
            self.soc = soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                soc.connect((self.ip, self.port))
            except socket.error as e:
                # A fresh socket is created on every attempt, so the failed
                # one must be closed here or each retry leaks a descriptor.
                soc.close()
                # %s rather than %d: errno is None for some socket.error
                # subclasses (e.g. socket.timeout) and %d would raise.
                logging.info(
                    "Cannot connect to controller (errno=%s), "
                    "retrying in %s secs",
                    e.errno, self.retry_interval)
            else:
                logging.info("Connected to controller")
                soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
                self.cxn = cxn = Connection(self.soc)
                cxn.daemon = False
                cxn.start()
                try:
                    self.handle_protocol()
                except Exception as e:
                    logging.info(
                        "Connection was lost (%s), will retry in %s secs",
                        e, self.retry_interval)
            if self.exiting:
                # stop() was requested meanwhile; do not sleep or reconnect.
                break
            time.sleep(self.retry_interval)

    def stop(self):
        """Stop the agent: end the reconnect loop and close the connection."""
        # Without this flag connect() would see the closed connection as
        # "lost" and simply reconnect.
        self.exiting = True
        if self.cxn is not None:
            self.cxn.stop()
        if self.soc is not None:
            self.soc.close()

    def signal_flow_mod_error(self, code, data):
        """Send a flow-mod-failed error message to the controller.

        Requires an active connection (``self.cxn`` must not be None).
        """
        msg = ofp.message.flow_mod_failed_error_msg(code=code, data=data)
        self.cxn.send(msg)

    def signal_group_mod_error(self, code, data):
        """Send a group-mod-failed error message to the controller.

        Requires an active connection (``self.cxn`` must not be None).
        """
        msg = ofp.message.group_mod_failed_error_msg(code=code, data=data)
        self.cxn.send(msg)

    def signal_flow_removal(self, flow):
        """Send a flow-removed message built from a flow stats entry.

        :param flow: an ``ofp.common.flow_stats_entry`` (asserted).

        Requires an active connection (``self.cxn`` must not be None).
        """
        assert isinstance(flow, ofp.common.flow_stats_entry)
        msg = ofp.message.flow_removed(
            cookie=flow.cookie,
            priority=flow.priority,
            reason=None,  # TODO
            table_id=flow.table_id,
            duration_sec=flow.duration_sec,
            duration_nsec=flow.duration_nsec,
            idle_timeout=flow.idle_timeout,
            hard_timeout=flow.hard_timeout,
            packet_count=flow.packet_count,
            byte_count=flow.byte_count,
            match=flow.match)
        self.cxn.send(msg)

    def send_packet_in(self, data, in_port):
        """Send ``data`` to the controller as a packet-in from ``in_port``.

        The message carries reason ``OFPR_ACTION`` and a match holding only
        the ingress port. Requires an active connection.
        """
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(in_port))
        msg = ofp.message.packet_in(
            reason=ofp.OFPR_ACTION,
            match=match,
            data=data)
        self.cxn.send(msg)

    def handle_protocol(self):
        """Exchange HELLOs, then serve controller requests until failure.

        This method does not return normally; it ends only by raising.

        :raises Exception: if no HELLO is received, or when ``recv`` raises
            ``AssertionError`` (presumably loxi's signal that the connection
            thread is no longer alive -- confirm against loxi.connection).
        """
        cxn = self.cxn

        # Send initial hello
        cxn.send(ofp.message.hello())

        if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
            raise Exception("Did not receive initial HELLO")

        while True:
            try:
                req = cxn.recv(lambda msg: True)
            except AssertionError:
                raise Exception("Connection is no longer alive")

            if req is None:
                # this simply means we timed out
                # later we can use this to do other stuff
                # for now we simply ignore this and loop back
                continue

            logging.debug(pp(req))
            self._handle_request(req)

    def _handle_request(self, req):
        """Dispatch one controller message on its OpenFlow message type."""
        cxn = self.cxn

        if req.type == ofp.OFPT_FEATURES_REQUEST:
            msg = ofp.message.features_reply(
                xid=req.xid,
                datapath_id=self.datapath_id,
                n_buffers=256,
                n_tables=2,
                capabilities=(
                    ofp.OFPC_FLOW_STATS
                    | ofp.OFPC_TABLE_STATS
                    | ofp.OFPC_PORT_STATS
                    | ofp.OFPC_GROUP_STATS
                )
            )
            cxn.send(msg)

        elif req.type == ofp.OFPT_STATS_REQUEST:
            self._handle_stats_request(req)

        elif req.type == ofp.OFPT_SET_CONFIG:
            # TODO ignored for now
            pass

        elif req.type == ofp.OFPT_BARRIER_REQUEST:
            # TODO this will be the place to commit all changes before
            # replying
            # but now we send a reply right away
            cxn.send(ofp.message.barrier_reply(xid=req.xid))

        elif req.type == ofp.OFPT_GET_CONFIG_REQUEST:
            # send back configuration reply
            msg = ofp.message.get_config_reply(
                xid=req.xid, miss_send_len=ofp.OFPCML_NO_BUFFER)
            cxn.send(msg)

        elif req.type == ofp.OFPT_ROLE_REQUEST:
            # TODO this is where we shall manage which connection is active
            # now we simply verify that the role request is for active and
            # reply
            if req.role != ofp.OFPCR_ROLE_MASTER:
                # Any non-master role terminates the process.
                self.stop()
                sys.exit(1)
            msg = ofp.message.role_reply(
                xid=req.xid, role=req.role,
                generation_id=req.generation_id)
            cxn.send(msg)

        elif req.type == ofp.OFPT_PACKET_OUT:
            self._handle_packet_out(req)

        elif req.type == ofp.OFPT_FLOW_MOD:
            self._handle_flow_mod(req)

        elif req.type == ofp.OFPT_GROUP_MOD:
            self._handle_group_mod(req)

        else:
            logging.warning("Unhandled message from controller:")
            logging.warning(pp(req))

    def _handle_stats_request(self, req):
        """Answer a stats (multipart) request; log unknown stats types."""
        stats_type = req.stats_type

        if stats_type == ofp.OFPST_PORT_DESC:
            # port descriptions (sent without a flags field)
            msg = ofp.message.port_desc_stats_reply(
                xid=req.xid,
                entries=self.store.port_list())

        elif stats_type == ofp.OFPST_DESC:
            # device description
            msg = ofp.message.desc_stats_reply(
                xid=req.xid,
                flags=None,
                mfr_desc=self.backend.mfr_desc,
                hw_desc=self.backend.hw_desc,
                sw_desc="pyofagent",
                serial_num=self.backend.get_serial_num(),
                dp_desc=self.backend.get_dp_desc())

        elif stats_type == ofp.OFPST_FLOW:
            # flow stats requested
            msg = ofp.message.flow_stats_reply(
                xid=req.xid, entries=self.store.flow_list())

        elif stats_type == ofp.OFPST_TABLE:
            # table stats requested
            msg = ofp.message.table_stats_reply(
                xid=req.xid, entries=self.store.table_stats())

        elif stats_type == ofp.OFPST_PORT:
            # per-port statistics
            msg = ofp.message.port_stats_reply(
                xid=req.xid, entries=self.store.port_stats())

        elif stats_type == ofp.OFPST_GROUP:
            msg = ofp.message.group_stats_reply(
                xid=req.xid, entries=self.store.group_stats())

        elif stats_type == ofp.OFPST_GROUP_DESC:
            msg = ofp.message.group_desc_stats_reply(
                xid=req.xid, entries=self.store.group_list())

        elif stats_type == ofp.OFPST_METER:
            # meter stats are always reported as empty
            msg = ofp.message.meter_stats_reply(
                xid=req.xid, entries=[])

        else:
            logging.error("Unhandled stats type: %d in request:",
                          stats_type)
            logging.error(pp(req))
            return

        self.cxn.send(msg)

    def _handle_packet_out(self, req):
        """Pass each OUTPUT action of a packet-out to the backend."""
        in_port = req.in_port
        data = req.data
        for action in req.actions:
            if action.type == ofp.OFPAT_OUTPUT:
                self.backend.packet_out(in_port, action.port, data)
            else:
                logging.warning("Unhandled packet out action type %s",
                                action.type)

    def _handle_flow_mod(self, req):
        """Apply a flow-mod request to the store according to its command."""
        command = req._command

        if command == ofp.OFPFC_ADD:
            self.store.flow_add(req)

        elif command == ofp.OFPFC_DELETE:
            self.store.flow_delete(req)

        elif command == ofp.OFPFC_DELETE_STRICT:
            self.store.flow_delete_strict(req)

        elif command == ofp.OFPFC_MODIFY:
            self.store.flow_modify(req)

        elif command == ofp.OFPFC_MODIFY_STRICT:
            self.store.flow_modify_strict(req)

        else:
            logging.warning("Unhandled flow mod command %s in message:",
                            command)
            logging.warning(pp(req))

    def _handle_group_mod(self, req):
        """Apply a group-mod request to the store according to its command."""
        command = req.command

        if command == ofp.OFPGC_DELETE:
            self.store.group_delete(req)

        elif command == ofp.OFPGC_ADD:
            self.store.group_add(req)

        elif command == ofp.OFPGC_MODIFY:
            self.store.group_modify(req)

        else:
            logging.warning("Unhandled group command %s in message:",
                            command)
            logging.warning(pp(req))
280