| import logging |
| import loxi.of13 as ofp |
| import socket |
| import sys |
| import time |
| |
| from loxi.connection import Connection |
| from ofagent.utils import pp |
| |
| |
class Agent(object):
    """OpenFlow 1.3 agent that serves a single controller connection.

    The agent connects to the controller over TCP, exchanges HELLO
    messages and then answers controller requests. Replies are built from
    ``store`` (flow, group, port and table data) and ``backend`` (device
    description and packet output).
    """

    def __init__(self, controller, datapath_id,
                 store, backend, retry_interval=1):
        """
        :param controller: controller address as a ``"host:port"`` string
        :param datapath_id: datapath id reported in the features reply
        :param store: object queried for stats replies and updated by
            flow and group mods
        :param backend: object providing the device description fields
            and ``packet_out()``
        :param retry_interval: seconds to wait between connection attempts
        """
        address = controller.split(':')
        self.ip = address[0]
        self.port = int(address[1])
        self.datapath_id = datapath_id
        self.store = store
        self.backend = backend
        self.exiting = False
        self.retry_interval = retry_interval
        self.cxn = None
        self.soc = None

    def run(self):
        """Connect to the controller and serve it until stop() is called."""
        # Clear the flag left by an earlier stop() so the agent can be
        # started again.
        self.exiting = False
        self.connect()

    def connect(self):
        """
        Connect to a controller

        Keeps (re)connecting every ``retry_interval`` seconds until
        ``self.exiting`` is set. The socket and the connection thread of
        each attempt are released before the next attempt is made.
        """
        while not self.exiting:
            self.cxn = None
            self.soc = soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                try:
                    soc.connect((self.ip, self.port))
                except socket.error as e:
                    # %s rather than %d: errno may be None
                    logging.info(
                        "Cannot connect to controller (errno=%s), "
                        "retrying in %s secs",
                        e.errno, self.retry_interval)
                else:
                    logging.info("Connected to controller")
                    soc.setsockopt(
                        socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
                    self.cxn = cxn = Connection(soc)
                    cxn.daemon = False
                    cxn.start()
                    try:
                        self.handle_protocol()
                    except Exception as e:
                        logging.info(
                            "Connection was lost (%s), "
                            "will retry in %s secs",
                            e, self.retry_interval)
            finally:
                # Never carry a socket or connection thread over to the
                # next attempt (or leave one behind on exit).
                self._close_connection()
            if not self.exiting:
                time.sleep(self.retry_interval)

    def stop(self):
        """Stop the agent: end the reconnect loop and drop the connection."""
        self.exiting = True
        self._close_connection()

    def _close_connection(self):
        """Release the current connection thread and socket, if any."""
        cxn, self.cxn = self.cxn, None
        soc, self.soc = self.soc, None
        if cxn is not None:
            try:
                cxn.stop()
            except Exception as e:
                # Best effort: the socket below must still be closed.
                logging.warning("Error while stopping connection: %s", e)
        if soc is not None:
            try:
                soc.close()
            except socket.error as e:
                logging.warning("Error while closing socket: %s", e)

    def signal_flow_mod_error(self, code, data):
        """Send a flow-mod-failed error message to the controller."""
        msg = ofp.message.flow_mod_failed_error_msg(code=code, data=data)
        self.cxn.send(msg)

    def signal_group_mod_error(self, code, data):
        """Send a group-mod-failed error message to the controller."""
        msg = ofp.message.group_mod_failed_error_msg(code=code, data=data)
        self.cxn.send(msg)

    def signal_flow_removal(self, flow, reason=None):
        """Send a flow-removed message built from a flow stats entry.

        :param flow: ``ofp.common.flow_stats_entry`` of the removed flow
        :param reason: value for the ``reason`` field of the message;
            defaults to ``None``
        """
        assert isinstance(flow, ofp.common.flow_stats_entry)
        msg = ofp.message.flow_removed(
            cookie=flow.cookie,
            priority=flow.priority,
            reason=reason,  # TODO derive a default reason
            table_id=flow.table_id,
            duration_sec=flow.duration_sec,
            duration_nsec=flow.duration_nsec,
            idle_timeout=flow.idle_timeout,
            hard_timeout=flow.hard_timeout,
            packet_count=flow.packet_count,
            byte_count=flow.byte_count,
            match=flow.match)
        self.cxn.send(msg)

    def send_packet_in(self, data, in_port):
        """Send ``data`` to the controller as a packet-in from ``in_port``."""
        match = ofp.match()
        match.oxm_list.append(ofp.oxm.in_port(in_port))
        msg = ofp.message.packet_in(
            reason=ofp.OFPR_ACTION,
            match=match,
            data=data)
        self.cxn.send(msg)

    def handle_protocol(self):
        """Run the session on ``self.cxn``: HELLO exchange, then serve
        controller requests in an endless loop.

        :raises Exception: if no HELLO is received or the connection is
            no longer alive
        """
        cxn = self.cxn

        # Send initial hello
        cxn.send(ofp.message.hello())

        if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
            raise Exception("Did not receive initial HELLO")

        while True:
            try:
                req = cxn.recv(lambda msg: True)
            except AssertionError:
                raise Exception("Connection is no longer alive")

            if req is None:
                # this simply means we timed out
                # later we can use this to do other stuff
                # for now we simply ignore this and loop back
                continue

            # Pretty-printing every message is only worth it when debugging.
            if logging.getLogger().isEnabledFor(logging.DEBUG):
                logging.debug(pp(req))

            self._handle_request(cxn, req)

    def _handle_request(self, cxn, req):
        """Answer or apply a single message received from the controller."""
        if req.type == ofp.OFPT_FEATURES_REQUEST:
            cxn.send(ofp.message.features_reply(
                xid=req.xid,
                datapath_id=self.datapath_id,
                n_buffers=256,
                n_tables=2,
                capabilities=(
                    ofp.OFPC_FLOW_STATS
                    | ofp.OFPC_TABLE_STATS
                    | ofp.OFPC_PORT_STATS
                    | ofp.OFPC_GROUP_STATS
                )
            ))

        elif req.type == ofp.OFPT_STATS_REQUEST:
            self._handle_stats_request(cxn, req)

        elif req.type == ofp.OFPT_SET_CONFIG:
            # TODO ignored for now
            pass

        elif req.type == ofp.OFPT_BARRIER_REQUEST:
            # TODO this will be the place to commit all changes before
            # replying
            # but now we send a reply right away
            cxn.send(ofp.message.barrier_reply(xid=req.xid))

        elif req.type == ofp.OFPT_GET_CONFIG_REQUEST:
            # send back configuration reply
            cxn.send(ofp.message.get_config_reply(
                xid=req.xid, miss_send_len=ofp.OFPCML_NO_BUFFER))

        elif req.type == ofp.OFPT_ROLE_REQUEST:
            # TODO this is where we shall manage which connection is active
            # now we simply verify that the role request is for active and
            # reply
            if req.role != ofp.OFPCR_ROLE_MASTER:
                self.stop()
                sys.exit(1)
            cxn.send(ofp.message.role_reply(
                xid=req.xid, role=req.role,
                generation_id=req.generation_id))

        elif req.type == ofp.OFPT_PACKET_OUT:
            self._handle_packet_out(req)

        elif req.type == ofp.OFPT_FLOW_MOD:
            self._handle_flow_mod(req)

        elif req.type == ofp.OFPT_GROUP_MOD:
            self._handle_group_mod(req)

        else:
            logging.warning("Unhandled message from controller:")
            logging.warning(pp(req))

    def _handle_stats_request(self, cxn, req):
        """Reply to a stats request; unknown stats types are only logged."""
        stats_type = req.stats_type

        if stats_type == ofp.OFPST_PORT_DESC:
            # port descriptions
            reply = ofp.message.port_desc_stats_reply(
                xid=req.xid, entries=self.store.port_list())

        elif stats_type == ofp.OFPST_DESC:
            # device description
            reply = ofp.message.desc_stats_reply(
                xid=req.xid,
                flags=None,
                mfr_desc=self.backend.mfr_desc,
                hw_desc=self.backend.hw_desc,
                sw_desc="pyofagent",
                serial_num=self.backend.get_serial_num(),
                dp_desc=self.backend.get_dp_desc())

        elif stats_type == ofp.OFPST_FLOW:
            # flow stats requested
            reply = ofp.message.flow_stats_reply(
                xid=req.xid, entries=self.store.flow_list())

        elif stats_type == ofp.OFPST_TABLE:
            # table stats requested
            reply = ofp.message.table_stats_reply(
                xid=req.xid, entries=self.store.table_stats())

        elif stats_type == ofp.OFPST_PORT:
            # port stats requested
            reply = ofp.message.port_stats_reply(
                xid=req.xid, entries=self.store.port_stats())

        elif stats_type == ofp.OFPST_GROUP:
            reply = ofp.message.group_stats_reply(
                xid=req.xid, entries=self.store.group_stats())

        elif stats_type == ofp.OFPST_GROUP_DESC:
            reply = ofp.message.group_desc_stats_reply(
                xid=req.xid, entries=self.store.group_list())

        elif stats_type == ofp.OFPST_METER:
            # meters are always reported as an empty list
            reply = ofp.message.meter_stats_reply(
                xid=req.xid, entries=[])

        else:
            logging.error("Unhandled stats type: %d in request:", stats_type)
            logging.error(pp(req))
            return

        cxn.send(reply)

    def _handle_packet_out(self, req):
        """Pass every output action of a packet-out on to the backend."""
        in_port = req.in_port
        data = req.data
        for action in req.actions:
            if action.type == ofp.OFPAT_OUTPUT:
                self.backend.packet_out(in_port, action.port, data)
            else:
                logging.warning(
                    "Unhandled packet out action type %s", action.type)

    def _handle_flow_mod(self, req):
        """Apply a flow mod to the store according to its command."""
        # NOTE(review): flow mods are read via ``_command`` while group
        # mods use ``command`` -- presumably how loxi exposes it; confirm.
        command = req._command

        if command == ofp.OFPFC_ADD:
            self.store.flow_add(req)
        elif command == ofp.OFPFC_DELETE:
            self.store.flow_delete(req)
        elif command == ofp.OFPFC_DELETE_STRICT:
            self.store.flow_delete_strict(req)
        elif command == ofp.OFPFC_MODIFY:
            self.store.flow_modify(req)
        elif command == ofp.OFPFC_MODIFY_STRICT:
            self.store.flow_modify_strict(req)
        else:
            logging.warning(
                "Unhandled flow mod command %s in message:", command)
            logging.warning(pp(req))

    def _handle_group_mod(self, req):
        """Apply a group mod to the store according to its command."""
        command = req.command

        if command == ofp.OFPGC_DELETE:
            self.store.group_delete(req)
        elif command == ofp.OFPGC_ADD:
            self.store.group_add(req)
        elif command == ofp.OFPGC_MODIFY:
            self.store.group_modify(req)
        else:
            logging.warning(
                "Unhandled group command %s in message:", command)
            logging.warning(pp(req))
| |