Make Voltha/OFAgent Ctrl-C terminatable
Change-Id: I8d64b126d8d8d6f368d6cc236b2293fbcd108416
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 479bf4e..71318af 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -115,7 +115,6 @@
log.info('Acquired a grpc channel to voltha')
return channel
-
@inlineCallbacks
def get_list_of_logical_devices_from_voltha(self):
@@ -126,7 +125,7 @@
devices = stub.ListLogicalDevices(Empty()).items
for device in devices:
log.info("Devices {} -> {}".format(device.id,
- device.datapath_id))
+ device.datapath_id))
returnValue(devices)
except Exception as e:
@@ -136,7 +135,6 @@
log.info('reconnect', after_delay=self.voltha_retry_interval)
yield asleep(self.voltha_retry_interval)
-
def refresh_agent_connections(self, devices):
"""
Based on the new device list, update the following state in the class:
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index d1b78bd..75c48ce 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -85,13 +85,19 @@
def receive_packet_in_stream():
streaming_rpc_method = self.local_stub.ReceivePacketsIn
- iterator = streaming_rpc_method(empty_pb2.Empty())
- for packet_in in iterator:
- reactor.callFromThread(self.packet_in_queue.put,
- packet_in)
- log.debug('enqued-packet-in',
- packet_in=packet_in,
- queue_len=len(self.packet_in_queue.pending))
+ iterator = streaming_rpc_method(empty_pb2.Empty(), timeout=1.0)
+ while not self.stopped:
+ try:
+ for packet_in in iterator:
+ reactor.callFromThread(self.packet_in_queue.put,
+ packet_in)
+ log.debug('enqued-packet-in',
+ packet_in=packet_in,
+ queue_len=len(self.packet_in_queue.pending))
+ except _Rendezvous, e:
+ if e.code() == StatusCode.DEADLINE_EXCEEDED:
+ continue
+ raise
reactor.callInThread(receive_packet_in_stream)
@@ -99,12 +105,18 @@
def receive_change_events():
streaming_rpc_method = self.local_stub.ReceiveChangeEvents
- iterator = streaming_rpc_method(empty_pb2.Empty())
- for event in iterator:
- reactor.callFromThread(self.change_event_queue.put, event)
- log.debug('enqued-change-event',
- change_event=event,
- queue_len=len(self.change_event_queue.pending))
+ iterator = streaming_rpc_method(empty_pb2.Empty(), timeout=1.0)
+ while not self.stopped:
+ try:
+ for event in iterator:
+ reactor.callFromThread(self.change_event_queue.put, event)
+ log.debug('enqued-change-event',
+ change_event=event,
+ queue_len=len(self.change_event_queue.pending))
+ except _Rendezvous, e:
+ if e.code() == StatusCode.DEADLINE_EXCEEDED:
+ continue
+ raise
reactor.callInThread(receive_change_events)
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index b38d2a7..a2b94a6 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -94,6 +94,7 @@
self.leader_id = None # will be the instance id of the current leader
self.shutting_down = False
self.leader = None
+ self.session_renew_timer = None
self.worker = Worker(self.instance_id, self)
@@ -115,6 +116,7 @@
def stop(self):
log.debug('stopping')
self.shutting_down = True
+ self.session_renew_timer.stop()
yield self._delete_session() # this will delete the leader lock too
yield self.worker.stop()
if self.leader is not None:
@@ -198,8 +200,8 @@
log.info('created-consul-session', session_id=self.session_id)
# start renewing session it 3 times within the ttl
- lc = LoopingCall(_renew_session)
- lc.start(3)
+ self.session_renew_timer = LoopingCall(_renew_session)
+ self.session_renew_timer.start(3)
yield self._retry(_create_session)
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index ff60dbb..61a6e2c 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -12,18 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+from Queue import Empty as QueueEmpty
from uuid import uuid4
import structlog
+from google.protobuf.empty_pb2 import Empty
from grpc import StatusCode
from common.utils.grpc_utils import twisted_async
from voltha.core.config.config_root import ConfigRoot
from voltha.protos.openflow_13_pb2 import PacketIn, Flows, FlowGroups, \
ofp_port_status
-
-from google.protobuf.empty_pb2 import Empty
-
from voltha.protos.voltha_pb2 import \
add_VolthaLocalServiceServicer_to_server, VolthaLocalServiceServicer, \
VolthaInstance, Adapters, LogicalDevices, LogicalDevice, Ports, \
@@ -416,8 +415,12 @@
def ReceivePacketsIn(self, request, context):
while 1:
- packet_in = self.core.packet_in_queue.get()
- yield packet_in
+ try:
+ packet_in = self.core.packet_in_queue.get(timeout=1)
+ yield packet_in
+ except QueueEmpty:
+ if self.stopped:
+ break
def send_packet_in(self, device_id, ofp_packet_in):
"""Must be called on the twisted thread"""
@@ -426,8 +429,12 @@
def ReceiveChangeEvents(self, request, context):
while 1:
- event = self.core.change_event_queue.get()
- yield event
+ try:
+ event = self.core.change_event_queue.get(timeout=1)
+ yield event
+ except QueueEmpty:
+ if self.stopped:
+ break
def send_port_change_event(self, device_id, port_status):
"""Must be called on the twisted thread"""
diff --git a/voltha/main.py b/voltha/main.py
index 1fb5d04..2262c94 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -343,6 +343,18 @@
for component in reversed(registry.iterate()):
yield component.stop()
+ import threading
+ self.log.info('THREADS:')
+ main_thread = threading.current_thread()
+ for t in threading.enumerate():
+ if t is main_thread:
+ continue
+ if not t.isDaemon():
+ continue
+ self.log.info('joining thread {} {}'.format(
+ t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
+ t.join()
+
def start_reactor(self):
from twisted.internet import reactor
reactor.callWhenRunning(
diff --git a/voltha/northbound/grpc/grpc_server.py b/voltha/northbound/grpc/grpc_server.py
index c6f0fbc..ded5de4 100644
--- a/voltha/northbound/grpc/grpc_server.py
+++ b/voltha/northbound/grpc/grpc_server.py
@@ -226,6 +226,7 @@
for service in self.services:
service.stop()
self.server.stop(grace)
+ self.thread_pool.shutdown(False)
log.debug('stopped')
def register(self, activator_func, service):