Packet in/out streaming from ofagent to core
Getting ready for packet streaming
Change-Id: I8d70d4d6ffbb23c0d7ab20582e9afac49f9f6461
Support flow_delete_strict
Change-Id: I5dab5f74a7daddcddfeb8691a3940347cb2fc11b
Packet-out path only partially plumbed
Change-Id: I799d3f59d42ac9de0563b5e6b9a0064fd895a6f6
Refactored async_twisted
Change-Id: I68f8d12ce6fdbb70cee398f581669529b567d94d
Packet in pipeline and ofagent refactoring
Change-Id: I31ecbf7d52fdd18c3884b8d1870f673488f808df
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index 69bf940..efbc038 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -17,39 +17,86 @@
"""
The gRPC client layer for the OpenFlow agent
"""
+from Queue import Queue
+
+from structlog import get_logger
+from twisted.internet import reactor
from twisted.internet import threads
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
from protos.voltha_pb2 import ID, VolthaLogicalLayerStub, FlowTableUpdate, \
- GroupTableUpdate
+ GroupTableUpdate, NullMessage, PacketOut
+
+
+log = get_logger()
class GrpcClient(object):
- def __init__(self, channel, device_id_map):
+ def __init__(self, connection_manager, channel):
+
+ self.connection_manager = connection_manager
self.channel = channel
- self.device_id_map = device_id_map
self.logical_stub = VolthaLogicalLayerStub(channel)
+ self.packet_out_queue = Queue() # queue to send out PacketOut msgs
+ self.packet_in_queue = DeferredQueue() # queue to receive PacketIn
+ self.start_packet_out_stream()
+ self.start_packet_in_stream()
+ reactor.callLater(0, self.packet_in_forwarder_loop)
+
+ def start_packet_out_stream(self):
+
+ def packet_generator():
+ while 1:
+ packet = self.packet_out_queue.get(block=True)
+ yield packet
+
+ def stream_packets_out():
+ generator = packet_generator()
+ self.logical_stub.StreamPacketsOut(generator)
+
+ reactor.callInThread(stream_packets_out)
+
+ def start_packet_in_stream(self):
+
+ def receive_packet_in_stream():
+ for packet_in in self.logical_stub.ReceivePacketsIn(NullMessage()):
+ reactor.callFromThread(self.packet_in_queue.put, packet_in)
+ log.debug('enqued-packet-in',
+ packet_in=packet_in,
+ queue_len=len(self.packet_in_queue.pending))
+
+ reactor.callInThread(receive_packet_in_stream)
+
@inlineCallbacks
- def get_port_list(self, datapath_id):
- device_id = self.device_id_map[datapath_id]
+ def packet_in_forwarder_loop(self):
+ while True:
+ packet_in = yield self.packet_in_queue.get()
+ device_id = packet_in.id
+ ofp_packet_in = packet_in.packet_in
+ self.connection_manager.forward_packet_in(device_id, ofp_packet_in)
+
+ def send_packet_out(self, device_id, packet_out):
+ packet_out = PacketOut(id=device_id, packet_out=packet_out)
+ self.packet_out_queue.put(packet_out)
+
+ @inlineCallbacks
+ def get_port_list(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
self.logical_stub.ListLogicalDevicePorts, req)
returnValue(res.items)
@inlineCallbacks
- def get_device_info(self, datapath_id):
- device_id = self.device_id_map[datapath_id]
+ def get_device_info(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
self.logical_stub.GetLogicalDevice, req)
returnValue(res)
@inlineCallbacks
- def update_flow_table(self, datapath_id, flow_mod):
- device_id = self.device_id_map[datapath_id]
+ def update_flow_table(self, device_id, flow_mod):
req = FlowTableUpdate(
id=device_id,
flow_mod=flow_mod
@@ -59,8 +106,7 @@
returnValue(res)
@inlineCallbacks
- def update_group_table(self, datapath_id, group_mod):
- device_id = self.device_id_map[datapath_id]
+ def update_group_table(self, device_id, group_mod):
req = GroupTableUpdate(
id=device_id,
group_mod=group_mod
@@ -70,16 +116,14 @@
returnValue(res)
@inlineCallbacks
- def list_flows(self, datapath_id):
- device_id = self.device_id_map[datapath_id]
+ def list_flows(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
self.logical_stub.ListDeviceFlows, req)
returnValue(res.items)
@inlineCallbacks
- def list_groups(self, datapath_id):
- device_id = self.device_id_map[datapath_id]
+ def list_groups(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
self.logical_stub.ListDeviceFlowGroups, req)