Make Voltha/OFAgent terminable with Ctrl-C
Change-Id: I8d64b126d8d8d6f368d6cc236b2293fbcd108416
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 479bf4e..71318af 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -115,7 +115,6 @@
log.info('Acquired a grpc channel to voltha')
return channel
-
@inlineCallbacks
def get_list_of_logical_devices_from_voltha(self):
@@ -126,7 +125,7 @@
devices = stub.ListLogicalDevices(Empty()).items
for device in devices:
log.info("Devices {} -> {}".format(device.id,
- device.datapath_id))
+ device.datapath_id))
returnValue(devices)
except Exception as e:
@@ -136,7 +135,6 @@
log.info('reconnect', after_delay=self.voltha_retry_interval)
yield asleep(self.voltha_retry_interval)
-
def refresh_agent_connections(self, devices):
"""
Based on the new device list, update the following state in the class:
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index d1b78bd..75c48ce 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -85,13 +85,19 @@
def receive_packet_in_stream():
streaming_rpc_method = self.local_stub.ReceivePacketsIn
- iterator = streaming_rpc_method(empty_pb2.Empty())
- for packet_in in iterator:
- reactor.callFromThread(self.packet_in_queue.put,
- packet_in)
- log.debug('enqued-packet-in',
- packet_in=packet_in,
- queue_len=len(self.packet_in_queue.pending))
+ iterator = streaming_rpc_method(empty_pb2.Empty(), timeout=1.0)
+ while not self.stopped:
+ try:
+ for packet_in in iterator:
+ reactor.callFromThread(self.packet_in_queue.put,
+ packet_in)
+ log.debug('enqued-packet-in',
+ packet_in=packet_in,
+ queue_len=len(self.packet_in_queue.pending))
+ except _Rendezvous, e:
+ if e.code() == StatusCode.DEADLINE_EXCEEDED:
+ continue
+ raise
reactor.callInThread(receive_packet_in_stream)
@@ -99,12 +105,18 @@
def receive_change_events():
streaming_rpc_method = self.local_stub.ReceiveChangeEvents
- iterator = streaming_rpc_method(empty_pb2.Empty())
- for event in iterator:
- reactor.callFromThread(self.change_event_queue.put, event)
- log.debug('enqued-change-event',
- change_event=event,
- queue_len=len(self.change_event_queue.pending))
+ iterator = streaming_rpc_method(empty_pb2.Empty(), timeout=1.0)
+ while not self.stopped:
+ try:
+ for event in iterator:
+ reactor.callFromThread(self.change_event_queue.put, event)
+ log.debug('enqued-change-event',
+ change_event=event,
+ queue_len=len(self.change_event_queue.pending))
+ except _Rendezvous, e:
+ if e.code() == StatusCode.DEADLINE_EXCEEDED:
+ continue
+ raise
reactor.callInThread(receive_change_events)