VOL-1670 - close and reestablish connections
When gRPC connectivity to the core is broken, the ofagent will
break the connection to ONOS and then work to reconnect to
the core. After connecting to the core, the connection to
ONOS will be reestablished.
Change-Id: I75e645de3784a64ef4f9992df8baf37959cbbd86
diff --git a/python/ofagent/of_protocol_handler.py b/python/ofagent/of_protocol_handler.py
index a4b3f70..f8f817c 100755
--- a/python/ofagent/of_protocol_handler.py
+++ b/python/ofagent/of_protocol_handler.py
@@ -34,21 +34,18 @@
MAX_METER_BANDS = 255
MAX_METER_COLORS = 255
- def __init__(self, datapath_id, device_id, agent, cxn, rpc):
+ def __init__(self, datapath_id, device_id, agent, cxn):
"""
The upper half of the OpenFlow protocol, focusing on message
exchanges.
:param agent: Reference to the Agent() instance, can be used to
indicate critical errors to break the connection.
:param cxn: The lower level message serdes part of the OF protocol.
- :param rpc: The application level stub on which RPC calls
- are made as result of processing incoming OpenFlow request messages.
"""
self.datapath_id = datapath_id
self.device_id = device_id
self.agent = agent
self.cxn = cxn
- self.rpc = rpc
self.role = None
@inlineCallbacks
@@ -98,12 +95,14 @@
@inlineCallbacks
def handle_feature_request(self, req):
- device_info = yield self.rpc.get_device_info(self.device_id)
- kw = pb2dict(device_info.switch_features)
- self.cxn.send(ofp.message.features_reply(
- xid=req.xid,
- datapath_id=self.datapath_id,
- **kw))
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ device_info = yield rpc.get_device_info(self.device_id)
+ kw = pb2dict(device_info.switch_features)
+ self.cxn.send(ofp.message.features_reply(
+ xid=req.xid,
+ datapath_id=self.datapath_id,
+ **kw))
def handle_stats_request(self, req):
handler = self.stats_handlers.get(req.stats_type, None)
@@ -122,40 +121,47 @@
raise NotImplementedError()
def handle_flow_mod_request(self, req):
- if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
- try:
- grpc_req = to_grpc(req)
- except Exception, e:
- log.exception('failed-to-convert', e=e)
- else:
- return self.rpc.update_flow_table(self.device_id, grpc_req)
+ log.debug('flow mod request')
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+ try:
+ grpc_req = to_grpc(req)
+ except Exception, e:
+ log.exception('failed-to-convert', e=e)
+ else:
+ return rpc.update_flow_table(self.device_id, grpc_req)
- elif self.role == ofp.OFPCR_ROLE_SLAVE:
- self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
+ elif self.role == ofp.OFPCR_ROLE_SLAVE:
+ self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
def handle_meter_mod_request(self, req):
log.info('Received handle_meter_mod_request', request=req)
- if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
- try:
- grpc_req = to_grpc(req)
- except Exception, e:
- log.exception('failed-to-convert-meter-mod-request', e=e)
- else:
- return self.rpc.update_meter_mod_table(self.device_id, grpc_req)
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+ try:
+ grpc_req = to_grpc(req)
+ except Exception, e:
+ log.exception('failed-to-convert-meter-mod-request', e=e)
+ else:
+ return rpc.update_meter_mod_table(self.device_id, grpc_req)
- elif self.role == ofp.OFPCR_ROLE_SLAVE:
- self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
+ elif self.role == ofp.OFPCR_ROLE_SLAVE:
+ self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
@inlineCallbacks
def handle_meter_stats_request(self, req):
log.info('Received handle_meter_stats_request', request=req)
- try:
- meters = yield self.rpc.list_meters(self.device_id)
- self.cxn.send(ofp.message.meter_stats_reply(
- xid=req.xid, entries=[to_loxi(m.stats) for m in meters]))
- except Exception, e:
- log.exception("failed-meter-stats-request", req=req, e=e)
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ try:
+ meters = yield rpc.list_meters(self.device_id)
+ self.cxn.send(ofp.message.meter_stats_reply(
+ xid=req.xid, entries=[to_loxi(m.stats) for m in meters]))
+ except Exception, e:
+ log.exception("failed-meter-stats-request", req=req, e=e)
def handle_get_async_request(self, req):
raise NotImplementedError()
@@ -168,10 +174,12 @@
@inlineCallbacks
def handle_group_mod_request(self, req):
- if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
- yield self.rpc.update_group_table(self.device_id, to_grpc(req))
- elif self.role == ofp.OFPCR_ROLE_SLAVE:
- self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+ yield rpc.update_group_table(self.device_id, to_grpc(req))
+ elif self.role == ofp.OFPCR_ROLE_SLAVE:
+ self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
def handle_role_request(self, req):
if req.role == ofp.OFPCR_ROLE_MASTER or req.role == ofp.OFPCR_ROLE_SLAVE:
@@ -192,11 +200,13 @@
xid=req.xid, role=req.role))
def handle_packet_out_request(self, req):
- if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
- self.rpc.send_packet_out(self.device_id, to_grpc(req))
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+ rpc.send_packet_out(self.device_id, to_grpc(req))
- elif self.role == ofp.OFPCR_ROLE_SLAVE:
- self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
+ elif self.role == ofp.OFPCR_ROLE_SLAVE:
+ self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
def handle_set_config_request(self, req):
# Handle set config appropriately
@@ -205,18 +215,20 @@
@inlineCallbacks
def handle_port_mod_request(self, req):
- if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
- port = yield self.rpc.get_port(self.device_id, str(req.port_no))
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+ port = yield rpc.get_port(self.device_id, str(req.port_no))
- if port.ofp_port.config & ofp.OFPPC_PORT_DOWN != \
- req.config & ofp.OFPPC_PORT_DOWN:
- if req.config & ofp.OFPPC_PORT_DOWN:
- self.rpc.disable_port(self.device_id, port.id)
- else:
- self.rpc.enable_port(self.device_id, port.id)
+ if port.ofp_port.config & ofp.OFPPC_PORT_DOWN != \
+ req.config & ofp.OFPPC_PORT_DOWN:
+ if req.config & ofp.OFPPC_PORT_DOWN:
+ rpc.disable_port(self.device_id, port.id)
+ else:
+ rpc.enable_port(self.device_id, port.id)
- elif self.role == ofp.OFPCR_ROLE_SLAVE:
- self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
+ elif self.role == ofp.OFPCR_ROLE_SLAVE:
+ self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
def handle_table_mod_request(self, req):
raise NotImplementedError()
@@ -232,33 +244,41 @@
@inlineCallbacks
def handle_device_description_request(self, req):
- device_info = yield self.rpc.get_device_info(self.device_id)
- kw = pb2dict(device_info.desc)
- self.cxn.send(ofp.message.desc_stats_reply(xid=req.xid, **kw))
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ device_info = yield rpc.get_device_info(self.device_id)
+ kw = pb2dict(device_info.desc)
+ self.cxn.send(ofp.message.desc_stats_reply(xid=req.xid, **kw))
def handle_experimenter_stats_request(self, req):
raise NotImplementedError()
@inlineCallbacks
def handle_flow_stats_request(self, req):
- try:
- flow_stats = yield self.rpc.list_flows(self.device_id)
- self.cxn.send(ofp.message.flow_stats_reply(
- xid=req.xid, entries=[to_loxi(f) for f in flow_stats]))
- except Exception, e:
- log.exception('failed-flow-stats-request', req=req)
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ try:
+ flow_stats = yield rpc.list_flows(self.device_id)
+ self.cxn.send(ofp.message.flow_stats_reply(
+ xid=req.xid, entries=[to_loxi(f) for f in flow_stats]))
+ except Exception, e:
+ log.exception('failed-flow-stats-request', req=req)
@inlineCallbacks
def handle_group_stats_request(self, req):
- group_stats = yield self.rpc.list_groups(self.device_id)
- self.cxn.send(ofp.message.group_stats_reply(
- xid=req.xid, entries=[to_loxi(g.stats) for g in group_stats]))
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ group_stats = yield rpc.list_groups(self.device_id)
+ self.cxn.send(ofp.message.group_stats_reply(
+ xid=req.xid, entries=[to_loxi(g.stats) for g in group_stats]))
@inlineCallbacks
def handle_group_descriptor_request(self, req):
- group_stats = yield self.rpc.list_groups(self.device_id)
- self.cxn.send(ofp.message.group_desc_stats_reply(
- xid=req.xid, entries=[to_loxi(g.desc) for g in group_stats]))
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ group_stats = yield rpc.list_groups(self.device_id)
+ self.cxn.send(ofp.message.group_desc_stats_reply(
+ xid=req.xid, entries=[to_loxi(g.desc) for g in group_stats]))
def handle_group_features_request(self, req):
raise NotImplementedError()
@@ -277,26 +297,30 @@
@inlineCallbacks
def handle_port_stats_request(self, req):
- try:
- ports = yield self.rpc.list_ports(self.device_id)
- port_stats = [to_loxi(p.ofp_port_stats) for p in ports]
- of_message = ofp.message.port_stats_reply(
- xid=req.xid,entries=port_stats)
- self.cxn.send(of_message)
- except:
- log.exception('failed-port_stats-request', req=req)
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ try:
+ ports = yield rpc.list_ports(self.device_id)
+ port_stats = [to_loxi(p.ofp_port_stats) for p in ports]
+ of_message = ofp.message.port_stats_reply(
+ xid=req.xid,entries=port_stats)
+ self.cxn.send(of_message)
+ except:
+ log.exception('failed-port_stats-request', req=req)
@inlineCallbacks
def handle_port_desc_request(self, req):
- port_list = yield self.rpc.get_port_list(self.device_id)
- try:
- self.cxn.send(ofp.message.port_desc_stats_reply(
- xid=req.xid,
- #flags=None,
- entries=[to_loxi(port.ofp_port) for port in port_list]
- ))
- except Exception as err:
- log.exception('failed-port-desc-reply', err=err)
+ rpc = self.agent.connection_manager.get_rpc_client()
+ if rpc is not None:
+ port_list = yield rpc.get_port_list(self.device_id)
+ try:
+ self.cxn.send(ofp.message.port_desc_stats_reply(
+ xid=req.xid,
+ #flags=None,
+ entries=[to_loxi(port.ofp_port) for port in port_list]
+ ))
+ except Exception as err:
+ log.exception('failed-port-desc-reply', err=err)
def handle_queue_stats_request(self, req):
raise NotImplementedError()