Nomralize component start()/stop()
Also fixed the /schema swagger/rest entry. It did not work
because the 3rdparty protobuf_to_dict library cannot handle
Map fields. Changed the two map fields to a single list
entry.
Change-Id: Ib25a528701b67d58d32451687724c8247da6efa5
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index efbc038..5b6b0a8 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -17,8 +17,10 @@
"""
The gRPC client layer for the OpenFlow agent
"""
-from Queue import Queue
+from Queue import Queue, Empty
+from grpc import StatusCode
+from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet import threads
@@ -39,18 +41,35 @@
self.channel = channel
self.logical_stub = VolthaLogicalLayerStub(channel)
+ self.stopped = False
+
self.packet_out_queue = Queue() # queue to send out PacketOut msgs
self.packet_in_queue = DeferredQueue() # queue to receive PacketIn
+
+ def start(self):
+ log.debug('starting')
self.start_packet_out_stream()
self.start_packet_in_stream()
reactor.callLater(0, self.packet_in_forwarder_loop)
+ log.info('started')
+ return self
+
+ def stop(self):
+ log.debug('stopping')
+ self.stopped = True
+ log.info('stopped')
def start_packet_out_stream(self):
def packet_generator():
while 1:
- packet = self.packet_out_queue.get(block=True)
- yield packet
+ try:
+ packet = self.packet_out_queue.get(block=True, timeout=1.0)
+ except Empty:
+ if self.stopped:
+ return
+ else:
+ yield packet
def stream_packets_out():
generator = packet_generator()
@@ -61,8 +80,11 @@
def start_packet_in_stream(self):
def receive_packet_in_stream():
- for packet_in in self.logical_stub.ReceivePacketsIn(NullMessage()):
- reactor.callFromThread(self.packet_in_queue.put, packet_in)
+ streaming_rpc_method = self.logical_stub.ReceivePacketsIn
+ iterator = streaming_rpc_method(NullMessage())
+ for packet_in in iterator:
+ reactor.callFromThread(self.packet_in_queue.put,
+ packet_in)
log.debug('enqued-packet-in',
packet_in=packet_in,
queue_len=len(self.packet_in_queue.pending))
@@ -76,6 +98,8 @@
device_id = packet_in.id
ofp_packet_in = packet_in.packet_in
self.connection_manager.forward_packet_in(device_id, ofp_packet_in)
+ if self.stopped:
+ break
def send_packet_out(self, device_id, packet_out):
packet_out = PacketOut(id=device_id, packet_out=packet_out)