Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 1 | import logging |
| 2 | import loxi.of13 as ofp |
| 3 | import socket |
| 4 | import sys |
| 5 | import time |
| 6 | |
| 7 | from loxi.connection import Connection |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 8 | from ofagent.utils import pp |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 9 | |
| 10 | |
| 11 | class Agent(object): |
| 12 | |
Nathan Knuth | 950dff2 | 2016-09-17 16:12:34 -0700 | [diff] [blame] | 13 | def __init__(self, controller, datapath_id, |
| 14 | store, backend, retry_interval=1): |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 15 | self.ip = controller.split(':')[0] |
| 16 | self.port = int(controller.split(':')[1]) |
| 17 | self.datapath_id = datapath_id |
| 18 | self.store = store |
| 19 | self.backend = backend |
| 20 | self.exiting = False |
| 21 | self.retry_interval = retry_interval |
| 22 | self.cxn = None |
| 23 | self.soc = None |
| 24 | |
| 25 | def run(self): |
| 26 | self.connect() |
| 27 | |
| 28 | def connect(self): |
| 29 | """ |
| 30 | Connect to a controller |
| 31 | """ |
| 32 | while not self.exiting: |
| 33 | self.cxn = None |
| 34 | self.soc = soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 35 | try: |
| 36 | soc.connect((self.ip, self.port)) |
| 37 | except socket.error, e: |
Nathan Knuth | 950dff2 | 2016-09-17 16:12:34 -0700 | [diff] [blame] | 38 | logging.info( |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 39 | "Cannot connect to controller (errno=%d), " |
| 40 | "retrying in %s secs" % |
Nathan Knuth | 950dff2 | 2016-09-17 16:12:34 -0700 | [diff] [blame] | 41 | (e.errno, self.retry_interval)) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 42 | else: |
| 43 | logging.info("Connected to controller") |
| 44 | soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) |
| 45 | self.cxn = cxn = Connection(self.soc) |
| 46 | cxn.daemon = False |
| 47 | cxn.start() |
| 48 | try: |
| 49 | self.handle_protocol() |
| 50 | except Exception, e: |
Nathan Knuth | 950dff2 | 2016-09-17 16:12:34 -0700 | [diff] [blame] | 51 | logging.info( |
| 52 | "Connection was lost (%s), will retry in %s secs" % |
| 53 | (e, self.retry_interval)) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 54 | time.sleep(self.retry_interval) |
| 55 | |
| 56 | def stop(self): |
| 57 | if self.cxn is not None: |
| 58 | self.cxn.stop() |
| 59 | if self.soc is not None: |
| 60 | self.soc.close() |
| 61 | |
| 62 | def signal_flow_mod_error(self, code, data): |
| 63 | msg = ofp.message.flow_mod_failed_error_msg(code=code, data=data) |
| 64 | self.cxn.send(msg) |
| 65 | |
| 66 | def signal_group_mod_error(self, code, data): |
| 67 | msg = ofp.message.group_mod_failed_error_msg(code=code, data=data) |
| 68 | self.cxn.send(msg) |
| 69 | |
| 70 | def signal_flow_removal(self, flow): |
| 71 | assert isinstance(flow, ofp.common.flow_stats_entry) |
| 72 | msg = ofp.message.flow_removed( |
| 73 | cookie=flow.cookie, |
| 74 | priority=flow.priority, |
| 75 | reason=None, # TODO |
| 76 | table_id=flow.table_id, |
| 77 | duration_sec=flow.duration_sec, |
| 78 | duration_nsec=flow.duration_nsec, |
| 79 | idle_timeout=flow.idle_timeout, |
| 80 | hard_timeout=flow.hard_timeout, |
| 81 | packet_count=flow.packet_count, |
| 82 | byte_count=flow.byte_count, |
| 83 | match=flow.match) |
| 84 | self.cxn.send(msg) |
| 85 | |
| 86 | def send_packet_in(self, data, in_port): |
| 87 | match = ofp.match() |
| 88 | match.oxm_list.append(ofp.oxm.in_port(in_port)) |
| 89 | msg = ofp.message.packet_in( |
| 90 | reason=ofp.OFPR_ACTION, |
| 91 | match=match, |
| 92 | data=data) |
| 93 | self.cxn.send(msg) |
| 94 | |
| 95 | def handle_protocol(self): |
| 96 | |
| 97 | cxn = self.cxn |
| 98 | |
| 99 | # Send initial hello |
| 100 | cxn.send(ofp.message.hello()) |
| 101 | |
| 102 | if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO): |
| 103 | raise Exception("Did not receive initial HELLO") |
| 104 | |
| 105 | while True: |
| 106 | |
| 107 | try: |
| 108 | req = cxn.recv(lambda msg: True) |
| 109 | except AssertionError, e: |
| 110 | raise Exception("Connection is no longer alive") |
| 111 | |
| 112 | print(pp(req)) |
| 113 | |
| 114 | if req is None: |
| 115 | # this simply means we timed out |
| 116 | # later we can use this to do other stuff |
| 117 | # for now we simply ignore this and loop back |
| 118 | pass |
| 119 | |
| 120 | elif req.type == ofp.OFPT_FEATURES_REQUEST: |
| 121 | msg = ofp.message.features_reply( |
| 122 | xid=req.xid, |
| 123 | datapath_id=self.datapath_id, |
| 124 | n_buffers=256, |
| 125 | n_tables=2, |
| 126 | capabilities= ( |
| 127 | ofp.OFPC_FLOW_STATS |
| 128 | | ofp.OFPC_TABLE_STATS |
| 129 | | ofp.OFPC_PORT_STATS |
| 130 | | ofp.OFPC_GROUP_STATS |
| 131 | ) |
| 132 | ) |
| 133 | cxn.send(msg) |
| 134 | |
| 135 | elif req.type == ofp.OFPT_STATS_REQUEST: |
| 136 | |
| 137 | if req.stats_type == ofp.OFPST_PORT_DESC: |
| 138 | # port stats request |
| 139 | msg = ofp.message.port_desc_stats_reply( |
| 140 | xid=req.xid, |
| 141 | #flags=None, |
| 142 | entries=self.store.port_list()) |
| 143 | cxn.send(msg) |
| 144 | |
| 145 | elif req.stats_type == ofp.OFPST_DESC: |
| 146 | # device description |
| 147 | msg = ofp.message.desc_stats_reply( |
| 148 | xid=req.xid, |
| 149 | flags=None, |
| 150 | mfr_desc=self.backend.mfr_desc, |
| 151 | hw_desc=self.backend.hw_desc, |
| 152 | sw_desc="pyofagent", |
| 153 | serial_num=self.backend.get_serial_num(), |
| 154 | dp_desc=self.backend.get_dp_desc()) |
| 155 | cxn.send(msg) |
| 156 | |
| 157 | elif req.stats_type == ofp.OFPST_FLOW: |
| 158 | # flow stats requested |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 159 | msg = ofp.message.flow_stats_reply( |
| 160 | xid=req.xid, entries=self.store.flow_list()) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 161 | cxn.send(msg) |
| 162 | |
| 163 | elif req.stats_type == ofp.OFPST_TABLE: |
| 164 | # table stats requested |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 165 | msg = ofp.message.table_stats_reply( |
| 166 | xid=req.xid, entries=self.store.table_stats()) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 167 | cxn.send(msg) |
| 168 | |
| 169 | elif req.stats_type == ofp.OFPST_PORT: |
| 170 | # port list |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 171 | msg = ofp.message.port_stats_reply( |
| 172 | xid=req.xid, entries=self.store.port_stats()) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 173 | cxn.send(msg) |
| 174 | |
| 175 | elif req.stats_type == ofp.OFPST_GROUP: |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 176 | msg = ofp.message.group_stats_reply( |
| 177 | xid=req.xid, entries=self.store.group_stats()) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 178 | cxn.send(msg) |
| 179 | |
| 180 | elif req.stats_type == ofp.OFPST_GROUP_DESC: |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 181 | msg = ofp.message.group_desc_stats_reply( |
| 182 | xid=req.xid, entries=self.store.group_list()) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 183 | cxn.send(msg) |
| 184 | |
| 185 | elif req.stats_type == ofp.OFPST_METER: |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 186 | msg = ofp.message.meter_stats_reply( |
| 187 | xid=req.xid, entries=[]) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 188 | cxn.send(msg) |
| 189 | |
| 190 | else: |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 191 | logging.error("Unhandled stats type: %d in request:" |
| 192 | % req.stats_type) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 193 | logging.error(pp(req)) |
| 194 | |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 195 | elif req.type == ofp.OFPT_SET_CONFIG: |
| 196 | # TODO ignored for now |
| 197 | pass |
| 198 | |
| 199 | elif req.type == ofp.OFPT_BARRIER_REQUEST: |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 200 | # TODO this will be the place to commit all changes before |
| 201 | # replying |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 202 | # but now we send a reply right away |
| 203 | msg = ofp.message.barrier_reply(xid=req.xid) |
| 204 | cxn.send(msg) |
| 205 | |
| 206 | elif req.type == ofp.OFPT_GET_CONFIG_REQUEST: |
| 207 | # send back configuration reply |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 208 | msg = ofp.message.get_config_reply( |
| 209 | xid=req.xid, miss_send_len=ofp.OFPCML_NO_BUFFER) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 210 | cxn.send(msg) |
| 211 | |
| 212 | elif req.type == ofp.OFPT_ROLE_REQUEST: |
| 213 | # TODO this is where we shall manage which connection is active |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 214 | # now we simply verify that the role request is for active and |
| 215 | # reply |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 216 | if req.role != ofp.OFPCR_ROLE_MASTER: |
| 217 | self.stop() |
| 218 | sys.exit(1) |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 219 | msg = ofp.message.role_reply( |
| 220 | xid=req.xid, role=req.role, |
| 221 | generation_id=req.generation_id) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 222 | cxn.send(msg) |
| 223 | |
| 224 | elif req.type == ofp.OFPT_PACKET_OUT: |
| 225 | in_port = req.in_port |
| 226 | data = req.data |
| 227 | for action in req.actions: |
| 228 | if action.type == ofp.OFPAT_OUTPUT: |
| 229 | port = action.port |
| 230 | self.backend.packet_out(in_port, port, data) |
| 231 | else: |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 232 | logging.warn("Unhandled packet out action type %s" |
| 233 | % action.type) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 234 | |
| 235 | elif req.type == ofp.OFPT_FLOW_MOD: |
| 236 | |
| 237 | command = req._command |
| 238 | |
| 239 | if command == ofp.OFPFC_ADD: |
| 240 | self.store.flow_add(req) |
| 241 | |
| 242 | elif command == ofp.OFPFC_DELETE: |
| 243 | self.store.flow_delete(req) |
| 244 | |
| 245 | elif command == ofp.OFPFC_DELETE_STRICT: |
| 246 | self.store.flow_delete_strict(req) |
| 247 | |
| 248 | elif command == ofp.OFPFC_MODIFY: |
| 249 | self.store.flow_modify(req) |
| 250 | |
| 251 | elif command == ofp.OFPFC_MODIFY_STRICT: |
| 252 | self.store.flow_modify_strict(req) |
| 253 | |
| 254 | else: |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 255 | logging.warn("Unhandled flow mod command %s in message:" |
| 256 | % command) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 257 | logging.warn(pp(req)) |
| 258 | |
| 259 | elif req.type == ofp.OFPT_GROUP_MOD: |
| 260 | |
| 261 | command = req.command |
| 262 | |
| 263 | if command == ofp.OFPGC_DELETE: |
| 264 | self.store.group_delete(req) |
| 265 | |
| 266 | elif command == ofp.OFPGC_ADD: |
| 267 | self.store.group_add(req) |
| 268 | |
| 269 | elif command == ofp.OFPGC_MODIFY: |
| 270 | self.store.group_modify(req) |
| 271 | |
| 272 | else: |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 273 | logging.warn("Unhandled group command %s in message:" |
| 274 | % command) |
Nathan Knuth | 418fdc8 | 2016-09-16 22:51:15 -0700 | [diff] [blame] | 275 | logging.warn(pp(req)) |
| 276 | |
| 277 | else: |
| 278 | logging.warn("Unhandled message from controller:") |
| 279 | logging.warn(pp(req)) |
| 280 | |