VOL-281: OfAgent integration in swarm cluster
- Automatically connect/reconnect the agent to a voltha instance
- Survives and cleans up after a voltha disconnect
- Added new compose files to start ofagent and onos in the swarm cluster
Amendments:
- Fixed logs to have uniform format
- Removed instructions to start/stop ONOS service in cluster scripts
- Added a missing change in the local handler so that streaming RPC calls
exit after ofagent termination
- Renamed references from voltha to vcore where necessary
Change-Id: Icb4611d92be35b48e557e6b12f7d2074282175ea
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 8a4081c..0d75be0 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -28,34 +28,38 @@
from grpc._channel import _Rendezvous
from ofagent.protos import third_party
from protos import voltha_pb2
+from protos.voltha_pb2 import OfAgentSubscriber
from grpc_client import GrpcClient
from agent import Agent
from google.protobuf.empty_pb2 import Empty
+from common.utils.dockerhelpers import get_my_containers_name
log = get_logger()
# _ = third_party
class ConnectionManager(object):
-
- def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoints,
- voltha_retry_interval=0.5, devices_refresh_interval=5):
+ def __init__(self, consul_endpoint, vcore_endpoint, controller_endpoints,
+ vcore_retry_interval=0.5, devices_refresh_interval=5,
+ subscription_refresh_interval=5):
+ """
+ :param consul_endpoint: consul address used by resolve_endpoint
+ :param vcore_endpoint: vcore gRPC endpoint, resolved via resolve_endpoint
+ :param controller_endpoints: controller endpoints; one Agent is created
+ per (datapath_id, controller_endpoint) pair
+ :param vcore_retry_interval: seconds to wait before retrying after a
+ vcore communication/subscription failure
+ :param devices_refresh_interval: seconds between logical device polls
+ :param subscription_refresh_interval: seconds between Subscribe renewals
+ """
log.info('init-connection-manager')
- log.info('list-of-controllers',controller_endpoints=controller_endpoints)
+ log.info('list-of-controllers', controller_endpoints=controller_endpoints)
self.controller_endpoints = controller_endpoints
self.consul_endpoint = consul_endpoint
- self.voltha_endpoint = voltha_endpoint
+ self.vcore_endpoint = vcore_endpoint
self.channel = None
- self.grpc_client = None # single, shared gRPC client to Voltha
+ self.grpc_client = None # single, shared gRPC client to vcore
self.agent_map = {} # (datapath_id, controller_endpoint) -> Agent()
self.device_id_to_datapath_id_map = {}
- self.voltha_retry_interval = voltha_retry_interval
+ self.vcore_retry_interval = vcore_retry_interval
self.devices_refresh_interval = devices_refresh_interval
+ self.subscription_refresh_interval = subscription_refresh_interval
+ self.subscription = None
+ # True while a vcore gRPC channel is assigned; set by
+ # _assign_grpc_attributes and cleared by _reset_grpc_attributes.
+ # Initialised here so it exists before the channel monitor first runs.
+ self.is_alive = False
self.running = False
@@ -68,11 +72,8 @@
self.running = True
- # Get voltha grpc endpoint
- self.channel = self.get_grpc_channel_with_voltha()
-
- # Create shared gRPC API object
- self.grpc_client = GrpcClient(self, self.channel).start()
+ # Start monitoring the vcore grpc channel.
+ # Scheduled on the reactor thread (like monitor_logical_devices) rather
+ # than with callInThread: it is an @inlineCallbacks coroutine that shares
+ # self.channel / self.grpc_client with monitor_logical_devices, and
+ # Twisted APIs (timers, Deferreds) are not thread-safe.
+ reactor.callLater(0, self.monitor_vcore_grpc_channel)
# Start monitoring logical devices and manage agents accordingly
reactor.callLater(0, self.monitor_logical_devices)
@@ -87,8 +88,9 @@
for agent in self.agent_map.itervalues():
agent.stop()
self.running = False
- self.grpc_client.stop()
- del self.channel
+
+ # Clearing self.running above lets the monitor loops exit on their next
+ # iteration; the reset stops the gRPC client (if any) and clears the
+ # channel, subscription and liveness state.
+ self._reset_grpc_attributes()
+
log.info('stopped')
def resolve_endpoint(self, endpoint):
@@ -98,52 +100,121 @@
ip_port_endpoint = get_endpoint_from_consul(
self.consul_endpoint, endpoint[1:])
log.info(
- 'Found endpoint {} service at {}'.format(endpoint,
- ip_port_endpoint))
+ '{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
except Exception as e:
- log.error('Failure to locate {} service from '
- 'consul {}:'.format(endpoint, repr(e)))
- log.error('Committing suicide...')
+ log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
+ log.error('committing-suicide')
# Committing suicide in order to let docker restart ofagent
os.system("kill -15 {}".format(os.getpid()))
if ip_port_endpoint:
host, port = ip_port_endpoint.split(':', 2)
return host, int(port)
- def get_grpc_channel_with_voltha(self):
- log.info('Resolving voltha endpoint {} from consul'.format(
- self.voltha_endpoint))
- host, port = self.resolve_endpoint(self.voltha_endpoint)
+ def _reset_grpc_attributes(self):
+ """
+ Drop the current vcore gRPC session: stop the shared GrpcClient (if
+ any) and clear the channel, subscription and liveness flag, so the next
+ monitor_vcore_grpc_channel iteration opens a fresh connection.
+ """
+ log.debug('start-reset-grpc-attributes')
+
+ if self.grpc_client is not None:
+ self.grpc_client.stop()
+
+ self.is_alive = False
+ self.channel = None
+ self.subscription = None
+ self.grpc_client = None
+
+ log.debug('stop-reset-grpc-attributes')
+
+ def _assign_grpc_attributes(self):
+ """
+ Resolve the vcore endpoint and open a new insecure gRPC channel to it.
+ """
+ log.debug('start-assign-grpc-attributes')
+
+ host, port = self.resolve_endpoint(self.vcore_endpoint)
+ log.info('resolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)
+
assert host is not None
assert port is not None
- # Create grpc channel to Voltha
- channel = grpc.insecure_channel('{}:{}'.format(host, port))
- log.info('Acquired a grpc channel to voltha')
- return channel
+
+ # Establish a connection to the vcore GRPC server
+ self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
+ self.is_alive = True
+
+ log.debug('stop-assign-grpc-attributes')
+
+ @inlineCallbacks
+ def monitor_vcore_grpc_channel(self):
+ """
+ Keep this ofagent instance subscribed to a vcore instance.
+
+ While running: open a channel when no subscription is held, send a
+ Subscribe request every subscription_refresh_interval seconds, and
+ create the shared GrpcClient on the first successful subscription.
+ A failure or a mismatched subscriber id resets the gRPC state and the
+ loop retries after vcore_retry_interval seconds.
+ """
+ log.debug('start-monitor-vcore-grpc-channel')
+
+ while self.running:
+ try:
+ # If a subscription is not yet assigned then establish new GRPC connection
+ # ... otherwise keep using existing connection details
+ if self.subscription is None:
+ self._assign_grpc_attributes()
+
+ # Send subscription request to register the current ofagent instance
+ container_name = get_my_containers_name()
+ stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
+ subscription = stub.Subscribe(OfAgentSubscriber(ofagent_id=container_name))
+
+ # If the subscriber id matches the current instance
+ # ... then the subscription has succeeded
+ if subscription is not None and subscription.ofagent_id == container_name:
+ if self.subscription is None:
+ # Keep details on the current GRPC session and subscription
+ log.debug('subscription-with-vcore-successful', subscription=subscription)
+ self.subscription = subscription
+ self.grpc_client = GrpcClient(self, self.channel).start()
+
+ # Sleep a bit in between each subscribe
+ yield asleep(self.subscription_refresh_interval)
+
+ # Move on to next subscribe request
+ continue
+
+ # The subscription did not succeed, reset and move on
+ else:
+ log.info('subscription-with-vcore-unavailable', subscription=subscription)
+
+ except _Rendezvous, e:
+ log.error('subscription-with-vcore-terminated', exception=e, status=e.code())
+
+ except Exception as e:
+ log.exception('unexpected-subscription-termination-with-vcore', e=e)
+
+ # Reset grpc details
+ # The vcore instance is either not available for subscription
+ # or a failure occurred with the existing communication.
+ self._reset_grpc_attributes()
+
+ # Sleep for a short period and retry
+ yield asleep(self.vcore_retry_interval)
+
+ log.debug('stop-monitor-vcore-grpc-channel')
@inlineCallbacks
def get_list_of_logical_devices_from_voltha(self):
- while True:
- log.info('Retrieve devices from voltha')
+ """
+ Poll vcore until the logical device list is retrieved.
+
+ Failures are logged and retried every vcore_retry_interval seconds.
+ A vcore outage is handled by monitor_vcore_grpc_channel (reset and
+ re-subscribe), so this loop simply retries. If the manager stops
+ first, the loop ends and the Deferred fires with None.
+ """
+ while self.running:
+ log.info('retrieve-logical-device-list')
try:
stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
devices = stub.ListLogicalDevices(Empty()).items
for device in devices:
- log.info("Devices {} -> {}".format(device.id,
- device.datapath_id))
+ log.info("logical-device-entry", id=device.id, datapath_id=device.datapath_id)
+
returnValue(devices)
except _Rendezvous, e:
+ log.error('vcore-communication-failure', exception=e, status=e.code())
- if e.code() == StatusCode.UNAVAILABLE:
- os.system("kill -15 {}".format(os.getpid()))
+ if e.code() == StatusCode.UNAVAILABLE:
+ # Do not kill the process: the channel monitor detects the
+ # outage, resets the gRPC state and re-subscribes; just retry.
+ log.info('vcore-unavailable-waiting-for-reconnect')
except Exception as e:
- log.error('Failure to retrieve devices from '
- 'voltha: {}'.format(repr(e)))
+ log.exception('logical-devices-retrieval-failure', exception=e)
- log.info('reconnect', after_delay=self.voltha_retry_interval)
- yield asleep(self.voltha_retry_interval)
+ log.info('reconnect', after_delay=self.vcore_retry_interval)
+ yield asleep(self.vcore_retry_interval)
def refresh_agent_connections(self, devices):
"""
@@ -185,7 +256,7 @@
device_id = device.id
for controller_endpoint in self.controller_endpoints:
+ # NOTE(review): each Agent is handed self.grpc_client as it is at
+ # creation time; _reset_grpc_attributes later stops that client and a
+ # new one is created on re-subscription -- confirm existing agents
+ # pick up the new client (or are recreated) after a vcore reconnect.
agent = Agent(controller_endpoint, datapath_id,
- device_id, self.grpc_client)
+ device_id, self.grpc_client)
agent.start()
self.agent_map[(datapath_id,controller_endpoint)] = agent
self.device_id_to_datapath_id_map[device_id] = datapath_id
@@ -200,30 +271,45 @@
@inlineCallbacks
def monitor_logical_devices(self):
- while True:
+ """
+ Periodically poll vcore for logical devices and reconcile the agents.
+
+ Runs until the manager stops. Polls are skipped while no vcore channel
+ or GrpcClient is available. Every iteration, successful or not, waits
+ devices_refresh_interval seconds before the next poll.
+ """
+ log.debug('start-monitor-logical-devices')
+
+ while self.running:
+ log.info('monitoring-logical-devices')
+
# should change to a gRPC streaming call
# see https://jira.opencord.org/browse/CORD-821
- # get current list from Voltha
- devices = yield self.get_list_of_logical_devices_from_voltha()
+ try:
+ if self.channel is not None and self.grpc_client is not None:
+ # get current list from Voltha
+ devices = yield self.get_list_of_logical_devices_from_voltha()
- # update agent list and mapping tables as needed
- self.refresh_agent_connections(devices)
+ # update agent list and mapping tables as needed
+ # (devices is None when polling ended because the manager stopped)
+ if devices is not None:
+ self.refresh_agent_connections(devices)
+ else:
+ log.info('vcore-communication-unavailable')
- # wait before next poll
- yield asleep(self.devices_refresh_interval)
- log.info('Monitor connections')
+
+ except _Rendezvous, e:
+ log.error('vcore-communication-failure', exception=repr(e), status=e.code())
+
+ except Exception as e:
+ log.exception('unexpected-vcore-communication-failure', exception=repr(e))
+
+ # wait before next poll; kept outside the try so that failures are
+ # throttled too instead of being retried in a tight loop
+ yield asleep(self.devices_refresh_interval)
+
+ log.debug('stop-monitor-logical-devices')
def forward_packet_in(self, device_id, ofp_packet_in):
datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
if datapath_id:
- for controller_endpoint in self.controller_endpoints:
- agent = self.agent_map[(datapath_id,controller_endpoint)]
- agent.forward_packet_in(ofp_packet_in)
+ for controller_endpoint in self.controller_endpoints:
+ agent = self.agent_map[(datapath_id, controller_endpoint)]
+ agent.forward_packet_in(ofp_packet_in)
def forward_change_event(self, device_id, event):
datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
if datapath_id:
- for controller_endpoint in self.controller_endpoints:
- agent = self.agent_map[(datapath_id,controller_endpoint)]
- agent.forward_change_event(event)
+ for controller_endpoint in self.controller_endpoints:
+ agent = self.agent_map[(datapath_id, controller_endpoint)]
+ agent.forward_change_event(event)