Propagate port change to SDN controller via OF13
This change adds plumbing to propagate new logical port
additions up to the SDN controller via OpenFlow.
Change-Id: Ic02586b096144c3697a0974ebbc8a7be8760bef8
diff --git a/ofagent/agent.py b/ofagent/agent.py
index cabcc6c..a60b7f7 100644
--- a/ofagent/agent.py
+++ b/ofagent/agent.py
@@ -25,7 +25,7 @@
from common.utils.asleep import asleep
from of_connection import OpenFlowConnection
from of_protocol_handler import OpenFlowProtocolHandler
-
+from ofagent.protos.openflow_13_pb2 import ChangeEvent
log = structlog.get_logger()
@@ -121,6 +121,14 @@
def forward_packet_in(self, ofp_packet_in):
self.proto_handler.forward_packet_in(ofp_packet_in)
+ def forward_change_event(self, event):
+ # assert isinstance(event, ChangeEvent)
+ log.info('got-change-event', change_event=event)
+ if event.HasField("port_status"):
+ self.proto_handler.forward_port_status(event.port_status)
+ else:
+ log.error('unknown-change-event', change_event=event)
+
if __name__ == '__main__':
"""Run this to test the agent for N concurrent sessions:
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 8d25891..1aa7031 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -209,3 +209,9 @@
if datapath_id:
agent = self.agent_map[datapath_id]
agent.forward_packet_in(ofp_packet_in)
+
+ def forward_change_event(self, device_id, event):
+ datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
+ if datapath_id:
+ agent = self.agent_map[datapath_id]
+ agent.forward_change_event(event)
diff --git a/ofagent/converter.py b/ofagent/converter.py
index 9ab9636..7252c43 100644
--- a/ofagent/converter.py
+++ b/ofagent/converter.py
@@ -55,6 +55,12 @@
kw = pb2dict(pb)
return of13.common.port_desc(**kw)
+def ofp_port_status_to_loxi_port_status(pb):
+ return of13.message.port_status(
+ reason=pb.reason,
+ desc=ofp_port_to_loxi_port_desc(pb.desc)
+ )
+
def make_loxi_match(match):
assert match.get('type', pb2.OFPMT_STANDARD) == pb2.OFPMT_OXM
loxi_match_fields = []
@@ -120,6 +126,7 @@
to_loxi_converters = {
pb2.ofp_port: ofp_port_to_loxi_port_desc,
+ pb2.ofp_port_status: ofp_port_status_to_loxi_port_status,
pb2.ofp_flow_stats: ofp_flow_stats_to_loxi_flow_stats,
pb2.ofp_packet_in: ofp_packet_in_to_loxi_packet_in,
pb2.ofp_group_entry: ofp_group_entry_to_loxi_group_entry,
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index 5d42566..13b49b9 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -46,12 +46,15 @@
self.packet_out_queue = Queue() # queue to send out PacketOut msgs
self.packet_in_queue = DeferredQueue() # queue to receive PacketIn
+ self.change_event_queue = DeferredQueue() # queue change events
def start(self):
log.debug('starting')
self.start_packet_out_stream()
self.start_packet_in_stream()
+ self.start_change_event_in_stream()
reactor.callLater(0, self.packet_in_forwarder_loop)
+ reactor.callLater(0, self.change_event_processing_loop)
log.info('started')
return self
@@ -92,6 +95,31 @@
reactor.callInThread(receive_packet_in_stream)
+ def start_change_event_in_stream(self):
+
+ def receive_change_events():
+ streaming_rpc_method = self.local_stub.ReceiveChangeEvents
+ iterator = streaming_rpc_method(empty_pb2.Empty())
+ for event in iterator:
+ reactor.callFromThread(self.change_event_queue.put, event)
+ log.debug('enqued-change-event',
+ change_event=event,
+ queue_len=len(self.change_event_queue.pending))
+
+ reactor.callInThread(receive_change_events)
+
+ @inlineCallbacks
+ def change_event_processing_loop(self):
+ while True:  # keep draining change events until self.stopped is set
+ try:
+ event = yield self.change_event_queue.get()
+ device_id = event.id
+ self.connection_manager.forward_change_event(device_id, event)
+ except Exception, e:
+ log.exception('failed-in-change-event-handler', e=e)
+ if self.stopped:
+ break
+
+
@inlineCallbacks
def packet_in_forwarder_loop(self):
while True:
diff --git a/ofagent/of_protocol_handler.py b/ofagent/of_protocol_handler.py
index b79786f..5ac8712 100644
--- a/ofagent/of_protocol_handler.py
+++ b/ofagent/of_protocol_handler.py
@@ -258,3 +258,6 @@
def forward_packet_in(self, ofp_packet_in):
self.cxn.send(to_loxi(ofp_packet_in))
+
+ def forward_port_status(self, ofp_port_status):
+ self.cxn.send(to_loxi(ofp_port_status))