VOL-281: OfAgent integration in swarm cluster
- Auto connect/reconnect of the agent to a voltha instance
- Survives and cleans up after a voltha disconnect
- Added new compose files to start ofagent and onos in a swarm cluster
Amendments:
- Fixed logs to have uniform format
- Removed instructions to start/stop ONOS service in cluster scripts
- Added missing change in the local handler to exit streaming RPC calls
  after ofagent termination
- Renamed references from voltha to vcore where necessary
Change-Id: Icb4611d92be35b48e557e6b12f7d2074282175ea
diff --git a/compose/docker-compose-ofagent-swarm.yml b/compose/docker-compose-ofagent-swarm.yml
new file mode 100644
index 0000000..dee18cc
--- /dev/null
+++ b/compose/docker-compose-ofagent-swarm.yml
@@ -0,0 +1,31 @@
+#
+# This Docker stackfile deploys an ofagent container on each swarm instance.
+#
+# The stackfile assumes that overlay network 'voltha_net' has already been
+# created. To deploy the stack, issue the command:
+#
+# docker stack deploy -c docker-compose-ofagent-swarm.yml ofagent
+#
+version: "3.2"
+services:
+ ofagent:
+ image: cord/ofagent
+ deploy:
+ replicas: 3
+ entrypoint:
+ - /ofagent/ofagent/main.py
+ - -v
+ - --consul=consul:8500
+ - --fluentd=fluentd:24224
+ - --controller=onos:6653
+ - --grpc-endpoint=vcore:50556
+ - --instance-id-is-container-name
+ volumes:
+ - /var/run/docker.sock:/tmp/docker.sock
+ networks:
+ - voltha-net
+
+networks:
+ voltha-net:
+ external:
+ name: voltha_net
diff --git a/compose/docker-compose-onos-swarm.yml b/compose/docker-compose-onos-swarm.yml
new file mode 100644
index 0000000..54d43ed
--- /dev/null
+++ b/compose/docker-compose-onos-swarm.yml
@@ -0,0 +1,54 @@
+#
+# This Docker stackfile deploys a single onos container and a freeradius container.
+#
+# The stackfile assumes that overlay network 'voltha_net' has already been
+# created. To deploy the stack, issue the command:
+#
+# docker stack deploy -c docker-compose-onos-swarm.yml onos
+#
+version: "3.2"
+services:
+ freeradius:
+ deploy:
+ replicas: 1
+ image: "marcelmaatkamp/freeradius"
+ ports:
+ - "1812:1812/udp"
+ - "1813:1813"
+ - "18120:18120"
+ volumes:
+ - /cord/incubator/voltha/compose/data/clients.conf:/etc/raddb/clients.conf
+ - /cord/incubator/voltha/compose/data/users:/etc/raddb/users
+ networks:
+ - net
+ - voltha-net
+
+ onos:
+ deploy:
+ # Single instance for now since a cluster
+ # needs to be configured through onos
+ replicas: 1
+ image: "cord/onos"
+ ports:
+ - 8101:8101 # ssh
+ - 6653:6653 # OF
+ - 8181:8181 # UI
+ environment:
+ ONOS_APPS: 'drivers,openflow-base'
+ networks:
+ - net
+ - voltha-net
+
+networks:
+ net:
+ driver: overlay
+ driver_opts:
+ encrypted: "true"
+ ipam:
+ driver: default
+ config:
+ - subnet: 172.25.0.0/24
+
+ voltha-net:
+ external:
+ name: voltha_net
diff --git a/install/voltha-swarm-start.sh b/install/voltha-swarm-start.sh
index ad5bdd0..4b40887 100755
--- a/install/voltha-swarm-start.sh
+++ b/install/voltha-swarm-start.sh
@@ -35,6 +35,7 @@
docker stack deploy -c ${voltha_base_dir}/compose/docker-compose-voltha-swarm.yml vcore
+docker stack deploy -c ${voltha_base_dir}/compose/docker-compose-ofagent-swarm.yml ofagent
docker stack deploy -c ${voltha_base_dir}/compose/docker-compose-envoy-swarm.yml voltha
docker stack deploy -c ${voltha_base_dir}/compose/docker-compose-vcli.yml cli
docker stack deploy -c ${voltha_base_dir}/compose/docker-compose-chameleon-swarm.yml chameleon
diff --git a/install/voltha-swarm-stop.sh b/install/voltha-swarm-stop.sh
old mode 100644
new mode 100755
index 8cdf9db..2633cd9
--- a/install/voltha-swarm-stop.sh
+++ b/install/voltha-swarm-stop.sh
@@ -4,6 +4,7 @@
docker service rm netconf_netconf
docker service rm cli_cli
docker service rm voltha_voltha
+docker service rm ofagent_ofagent
docker service rm vcore_vcore
docker service rm tools
docker stack rm consul
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 8a4081c..0d75be0 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -28,34 +28,38 @@
from grpc._channel import _Rendezvous
from ofagent.protos import third_party
from protos import voltha_pb2
+from protos.voltha_pb2 import OfAgentSubscriber
from grpc_client import GrpcClient
from agent import Agent
from google.protobuf.empty_pb2 import Empty
+from common.utils.dockerhelpers import get_my_containers_name
log = get_logger()
# _ = third_party
class ConnectionManager(object):
-
- def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoints,
- voltha_retry_interval=0.5, devices_refresh_interval=5):
+ def __init__(self, consul_endpoint, vcore_endpoint, controller_endpoints,
+ vcore_retry_interval=0.5, devices_refresh_interval=5,
+ subscription_refresh_interval=5):
log.info('init-connection-manager')
- log.info('list-of-controllers',controller_endpoints=controller_endpoints)
+ log.info('list-of-controllers', controller_endpoints=controller_endpoints)
self.controller_endpoints = controller_endpoints
self.consul_endpoint = consul_endpoint
- self.voltha_endpoint = voltha_endpoint
+ self.vcore_endpoint = vcore_endpoint
self.channel = None
- self.grpc_client = None # single, shared gRPC client to Voltha
+ self.grpc_client = None # single, shared gRPC client to vcore
self.agent_map = {} # (datapath_id, controller_endpoint) -> Agent()
self.device_id_to_datapath_id_map = {}
- self.voltha_retry_interval = voltha_retry_interval
+ self.vcore_retry_interval = vcore_retry_interval
self.devices_refresh_interval = devices_refresh_interval
+ self.subscription_refresh_interval = subscription_refresh_interval
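+ # Current subscription to vcore; remains None until a Subscribe request succeeds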
+ self.subscription = None
self.running = False
@@ -68,11 +72,8 @@
self.running = True
- # Get voltha grpc endpoint
- self.channel = self.get_grpc_channel_with_voltha()
-
- # Create shared gRPC API object
- self.grpc_client = GrpcClient(self, self.channel).start()
+ # Start monitoring the vcore grpc channel
+ reactor.callInThread(self.monitor_vcore_grpc_channel)
# Start monitoring logical devices and manage agents accordingly
reactor.callLater(0, self.monitor_logical_devices)
@@ -87,8 +88,9 @@
for agent in self.agent_map.itervalues():
agent.stop()
self.running = False
- self.grpc_client.stop()
- del self.channel
+
+ self._reset_grpc_attributes()
+
log.info('stopped')
def resolve_endpoint(self, endpoint):
@@ -98,52 +100,121 @@
ip_port_endpoint = get_endpoint_from_consul(
self.consul_endpoint, endpoint[1:])
log.info(
- 'Found endpoint {} service at {}'.format(endpoint,
- ip_port_endpoint))
+ '{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
except Exception as e:
- log.error('Failure to locate {} service from '
- 'consul {}:'.format(endpoint, repr(e)))
- log.error('Committing suicide...')
+ log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
+ log.error('committing-suicide')
# Committing suicide in order to let docker restart ofagent
os.system("kill -15 {}".format(os.getpid()))
if ip_port_endpoint:
host, port = ip_port_endpoint.split(':', 2)
return host, int(port)
- def get_grpc_channel_with_voltha(self):
- log.info('Resolving voltha endpoint {} from consul'.format(
- self.voltha_endpoint))
- host, port = self.resolve_endpoint(self.voltha_endpoint)
+ def _reset_grpc_attributes(self):
+ log.debug('start-reset-grpc-attributes')
+
+ if self.grpc_client is not None:
+ self.grpc_client.stop()
+
+ if self.channel is not None:
+ del self.channel
+
+ self.is_alive = False
+ self.channel = None
+ self.subscription = None
+ self.grpc_client = None
+
+ log.debug('stop-reset-grpc-attributes')
+
+ def _assign_grpc_attributes(self):
+ log.debug('start-assign-grpc-attributes')
+
+ host, port = self.resolve_endpoint(self.vcore_endpoint)
+ log.info('resolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)
+
assert host is not None
assert port is not None
- # Create grpc channel to Voltha
- channel = grpc.insecure_channel('{}:{}'.format(host, port))
- log.info('Acquired a grpc channel to voltha')
- return channel
+
+ # Establish a connection to the vcore GRPC server
+ self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
+ self.is_alive = True
+
+ log.debug('stop-assign-grpc-attributes')
+
+ @inlineCallbacks
+ def monitor_vcore_grpc_channel(self):
+ log.debug('start-monitor-vcore-grpc-channel')
+
+ while self.running:
+ try:
+ # If a subscription is not yet assigned then establish new GRPC connection
+ # ... otherwise keep using existing connection details
+ if self.subscription is None:
+ self._assign_grpc_attributes()
+
+ # Send subscription request to register the current ofagent instance
+ container_name = get_my_containers_name()
+ stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
+ subscription = stub.Subscribe(OfAgentSubscriber(ofagent_id=container_name))
+
+ # If the subscriber id matches the current instance
+ # ... then the subscription has succeeded
+ if subscription is not None and subscription.ofagent_id == container_name:
+ if self.subscription is None:
+ # Keep details on the current GRPC session and subscription
+ log.debug('subscription-with-vcore-successful', subscription=subscription)
+ self.subscription = subscription
+ self.grpc_client = GrpcClient(self, self.channel).start()
+
+ # Sleep a bit in between each subscribe
+ yield asleep(self.subscription_refresh_interval)
+
+ # Move on to next subscribe request
+ continue
+
+ # The subscription did not succeed, reset and move on
+ else:
+ log.info('subscription-with-vcore-unavailable', subscription=subscription)
+
+ except _Rendezvous, e:
+ log.error('subscription-with-vcore-terminated', exception=e, status=e.code())
+
+ except Exception as e:
+ log.exception('unexpected-subscription-termination-with-vcore', e=e)
+
+ # Reset grpc details
+ # The vcore instance is either not available for subscription
+ # or a failure occurred with the existing communication.
+ self._reset_grpc_attributes()
+
+ # Sleep for a short period and retry
+ yield asleep(self.vcore_retry_interval)
+
+ log.debug('stop-monitor-vcore-grpc-channel')
@inlineCallbacks
def get_list_of_logical_devices_from_voltha(self):
- while True:
- log.info('Retrieve devices from voltha')
+ while self.running:
+ log.info('retrieve-logical-device-list')
try:
stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
devices = stub.ListLogicalDevices(Empty()).items
for device in devices:
- log.info("Devices {} -> {}".format(device.id,
- device.datapath_id))
+ log.info("logical-device-entry", id=device.id, datapath_id=device.datapath_id)
+
returnValue(devices)
except _Rendezvous, e:
+ log.error('vcore-communication-failure', exception=e, status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
except Exception as e:
- log.error('Failure to retrieve devices from '
- 'voltha: {}'.format(repr(e)))
+ log.exception('logical-devices-retrieval-failure', exception=e)
- log.info('reconnect', after_delay=self.voltha_retry_interval)
- yield asleep(self.voltha_retry_interval)
+ log.info('reconnect', after_delay=self.vcore_retry_interval)
+ yield asleep(self.vcore_retry_interval)
def refresh_agent_connections(self, devices):
"""
@@ -185,7 +256,7 @@
device_id = device.id
for controller_endpoint in self.controller_endpoints:
agent = Agent(controller_endpoint, datapath_id,
- device_id, self.grpc_client)
+ device_id, self.grpc_client)
agent.start()
self.agent_map[(datapath_id,controller_endpoint)] = agent
self.device_id_to_datapath_id_map[device_id] = datapath_id
@@ -200,30 +271,45 @@
@inlineCallbacks
def monitor_logical_devices(self):
- while True:
+ log.debug('start-monitor-logical-devices')
+
+ while self.running:
+ log.info('monitoring-logical-devices')
+
# should change to a gRPC streaming call
# see https://jira.opencord.org/browse/CORD-821
- # get current list from Voltha
- devices = yield self.get_list_of_logical_devices_from_voltha()
+ try:
+ if self.channel is not None and self.grpc_client is not None:
+ # get current list from Voltha
+ devices = yield self.get_list_of_logical_devices_from_voltha()
- # update agent list and mapping tables as needed
- self.refresh_agent_connections(devices)
+ # update agent list and mapping tables as needed
+ self.refresh_agent_connections(devices)
+ else:
+ log.info('vcore-communication-unavailable')
- # wait before next poll
- yield asleep(self.devices_refresh_interval)
- log.info('Monitor connections')
+ # wait before next poll
+ yield asleep(self.devices_refresh_interval)
+
+ except _Rendezvous, e:
+ log.error('vcore-communication-failure', exception=repr(e), status=e.code())
+
+ except Exception as e:
+ log.exception('unexpected-vcore-communication-failure', exception=repr(e))
+
+ log.debug('stop-monitor-logical-devices')
def forward_packet_in(self, device_id, ofp_packet_in):
datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
if datapath_id:
- for controller_endpoint in self.controller_endpoints:
- agent = self.agent_map[(datapath_id,controller_endpoint)]
- agent.forward_packet_in(ofp_packet_in)
+ for controller_endpoint in self.controller_endpoints:
+ agent = self.agent_map[(datapath_id, controller_endpoint)]
+ agent.forward_packet_in(ofp_packet_in)
def forward_change_event(self, device_id, event):
datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
if datapath_id:
- for controller_endpoint in self.controller_endpoints:
- agent = self.agent_map[(datapath_id,controller_endpoint)]
- agent.forward_change_event(event)
+ for controller_endpoint in self.controller_endpoints:
+ agent = self.agent_map[(datapath_id, controller_endpoint)]
+ agent.forward_change_event(event)
diff --git a/tests/itests/README.md b/tests/itests/README.md
index 644b4ee..4aa9572 100644
--- a/tests/itests/README.md
+++ b/tests/itests/README.md
@@ -41,6 +41,23 @@
. ./env.sh
nosetests -s tests/itests/ofagent/test_ofagent_multicontroller_failover.py
```
+* **Ofagent_recovery**: This tests the OFAgent's ability
+to recover connectivity with Voltha after a component failure.
+Note that this test takes a while to run (approximately 6 minutes).
+The steps it follows are:
+ * Spawns three ONOS controllers and clusters them.
+ * Spawns the required Voltha components.
+ * OFAgent establishes connections with the three spawned controllers.
+ * Adds a simulated OLT and enables it.
+ * Stops/starts the OFAgent and VOLTHA processes (two separate tests).
+ * Ensures that the OLT created prior to stopping the process is still present.
+ * Adds another simulated OLT to confirm connectivity.
+
+```
+cd /cord/incubator/voltha
+. ./env.sh
+nosetests -s tests/itests/ofagent/test_ofagent_recovery.py
+```
* **Frameio**: This tests the packet send/receive/filter capabilities of the
FrameIOManager. This test needs to run as root.
```
diff --git a/tests/itests/ofagent/test_ofagent_recovery.py b/tests/itests/ofagent/test_ofagent_recovery.py
new file mode 100644
index 0000000..38ea471
--- /dev/null
+++ b/tests/itests/ofagent/test_ofagent_recovery.py
@@ -0,0 +1,238 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import os
+import time
+import json
+
+from tests.itests.voltha.rest_base import RestBase
+
+this_dir = os.path.abspath(os.path.dirname(__file__))
+
+from tests.itests.docutests.test_utils import run_command_to_completion_with_raw_stdout
+
+log = logging.getLogger(__name__)
+
+DOCKER_COMPOSE_FILE = "compose/docker-compose-ofagent-test.yml"
+
+command_defs = dict(
+ docker_stop="docker stop {}",
+ docker_start="docker start {}",
+ docker_compose_start_all="docker-compose -f {} up -d "
+ .format(DOCKER_COMPOSE_FILE),
+ docker_compose_stop="docker-compose -f {} stop"
+ .format(DOCKER_COMPOSE_FILE),
+ docker_compose_rm_f="docker-compose -f {} rm -f"
+ .format(DOCKER_COMPOSE_FILE),
+ onos_form_cluster="./tests/itests/ofagent/onos-form-cluster",
+ onos1_ip="docker inspect --format '{{ .NetworkSettings.Networks.compose_default.IPAddress }}' onos1",
+ onos2_ip="docker inspect --format '{{ .NetworkSettings.Networks.compose_default.IPAddress }}' onos2",
+ onos3_ip="docker inspect --format '{{ .NetworkSettings.Networks.compose_default.IPAddress }}' onos3",
+ add_olt='''curl -k -s -X POST -d '{"type": "simulated_olt"}' \
+ https://localhost:8881/api/v1/local/devices''',
+ enable_olt="curl -k -s -X POST https://localhost:8881/api/v1/local/devices/{}/enable",
+ get_onos_devices="curl -u karaf:karaf http://localhost:8181/onos/v1/devices")
+
+
+class OfagentRecoveryTest(RestBase):
+ def setUp(self):
+ # Run Voltha, OFAgent and 3 ONOS containers, then form the ONOS cluster.
+ print "Starting all containers ..."
+ cmd = command_defs['docker_compose_start_all']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+ print "Waiting for all containers to be ready ..."
+ time.sleep(60)
+ cmd = command_defs['onos1_ip']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+ onos1_ip = out
+ print "ONOS1 IP is {}".format(onos1_ip)
+ cmd = command_defs['onos2_ip']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+ onos2_ip = out
+ print "ONOS2 IP is {}".format(onos2_ip)
+ cmd = command_defs['onos3_ip']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+ onos3_ip = out
+ print "ONOS3 IP is {}".format(onos3_ip)
+ cmd = command_defs['onos_form_cluster'] + ' {} {} {}'.format(onos1_ip.strip(),
+ onos2_ip.strip(),
+ onos3_ip.strip())
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+ print "Cluster Output :{} ".format(out)
+
+ def tearDown(self):
+ # Stop and remove Voltha, OFAgent and the 3 ONOS containers.
+ print "Stopping and removing all containers ..."
+ cmd = command_defs['docker_compose_stop']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+ print "Waiting for all containers to be stopped ..."
+ time.sleep(1)
+ cmd = command_defs['docker_compose_rm_f']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ def add_device(self):
+ print "Adding device"
+
+ cmd = command_defs['add_olt']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+ device = json.loads(out)
+
+ print "Added device - id:{}, type:{}".format(device['id'], device['type'])
+ time.sleep(5)
+
+ return device
+
+ def enable_device(self, device_id):
+ print "Enabling device - id:{}".format(device_id)
+
+ cmd = command_defs['enable_olt'].format(device_id)
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ time.sleep(30)
+ print "Enabled device - id:{}".format(device_id)
+
+ def get_device(self, device_id, expected_code=200):
+ print "Getting device - id:{}".format(device_id)
+
+ device = self.get('/api/v1/local/devices/{}'.format(device_id),
+ expected_code=expected_code)
+
+ if device is not None:
+ print "Got device - id:{}, type:{}".format(device['id'], device['type'])
+ else:
+ print "Unable to get device - id:{}".format(device_id)
+
+ return device
+
+ def get_onos_devices(self):
+ print "Getting ONOS devices ..."
+ cmd = command_defs['get_onos_devices']
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ if out is not None:
+ onos_devices = json.loads(out)
+ print "Got ONOS devices"
+ else:
+ onos_devices = None
+ print "Unable to get ONOS devices"
+
+ return onos_devices
+
+ def stop_container(self, container):
+ print "Stopping {} ...".format(container)
+
+ cmd = command_defs['docker_stop'].format(container)
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ time.sleep(10)
+ print "Stopped {}".format(container)
+
+ def start_container(self, container):
+ print "Starting {} ...".format(container)
+
+ cmd = command_defs['docker_start'].format(container)
+ out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+ self.assertEqual(rc, 0)
+
+ time.sleep(10)
+ print "Started {}".format(container)
+
+ def test_01_recovery_after_voltha_restart(self):
+ # Add and enable a new OLT device
+ device_1 = self.add_device()
+ self.enable_device(device_1['id'])
+
+ # Verify that the device was propagated in ONOS
+ onos_devices = self.get_onos_devices()
+
+ self.assertEqual(len(onos_devices['devices']), 1)
+
+ # Restart voltha
+ self.stop_container('compose_voltha_1')
+ self.assertEqual(self.get_device(device_1['id'], 503), None)
+ self.start_container('compose_voltha_1')
+
+ # Get the device from VOLTHA after restart
+ device_1_after = self.get_device(device_1['id'])
+ self.assertEqual(device_1_after['id'], device_1['id'])
+
+ # Get the device from ONOS after restart
+ onos_devices = self.get_onos_devices()
+
+ self.assertEqual(len(onos_devices['devices']), 1)
+
+ # Add a new device
+ device_2 = self.add_device()
+ self.enable_device(device_2['id'])
+
+ # Ensure that ONOS has picked up the new device
+ onos_devices = self.get_onos_devices()
+
+ self.assertEqual(len(onos_devices['devices']), 2)
+
+ def test_02_recovery_after_ofagent_restart(self):
+ # Add and enable a new OLT device
+ device_1 = self.add_device()
+ self.enable_device(device_1['id'])
+
+ # Verify that the device was propagated in ONOS
+ onos_devices = self.get_onos_devices()
+
+ self.assertEqual(len(onos_devices['devices']), 1)
+
+ # Restart ofagent
+ self.stop_container('compose_ofagent_1')
+
+ # Try to create a device while ofagent is down
+ # this will succeed from a voltha point of view
+ # but it will not be propagated to ONOS until ofagent is back up
+ device_fail = self.add_device()
+ self.enable_device(device_fail['id'])
+ onos_devices = self.get_onos_devices()
+
+ # ONOS should still only have 1 device
+ self.assertNotEqual(len(onos_devices['devices']), 2)
+
+ self.start_container('compose_ofagent_1')
+
+ # Get the device from VOLTHA after restart
+ device_1_after = self.get_device(device_1['id'])
+ self.assertEqual(device_1_after['id'], device_1['id'])
+
+ # Get the device from ONOS after restart
+ onos_devices = self.get_onos_devices()
+ self.assertEqual(len(onos_devices['devices']), 2)
+
+ # Add a new device
+ device_2 = self.add_device()
+ self.enable_device(device_2['id'])
+
+ # Ensure that ONOS has picked up the new device
+ onos_devices = self.get_onos_devices()
+
+ self.assertEqual(len(onos_devices['devices']), 3)
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index f87830d..c598911 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -18,8 +18,10 @@
import structlog
from google.protobuf.empty_pb2 import Empty
from grpc import StatusCode
+from grpc._channel import _Rendezvous
from common.utils.grpc_utils import twisted_async
+from twisted.internet import task
from common.utils.id_generation import create_cluster_device_id
from voltha.core.config.config_root import ConfigRoot
from voltha.protos.openflow_13_pb2 import PacketIn, Flows, FlowGroups, \
@@ -29,11 +31,13 @@
VolthaInstance, Adapters, LogicalDevices, LogicalDevice, Ports, \
LogicalPorts, Devices, Device, DeviceType, \
DeviceTypes, DeviceGroups, DeviceGroup, AdminState, OperStatus, ChangeEvent, \
- AlarmFilter, AlarmFilters, SelfTestResponse
+ AlarmFilter, AlarmFilters, SelfTestResponse, OfAgentSubscriber
from voltha.protos.device_pb2 import PmConfigs, Images, ImageDownload, ImageDownloads
from voltha.protos.common_pb2 import OperationResp
+from voltha.protos.bbf_fiber_base_pb2 import AllMulticastDistributionSetData, AllMulticastGemportsConfigData
from voltha.registry import registry
from requests.api import request
+from common.utils.asleep import asleep
log = structlog.get_logger()
@@ -48,6 +52,14 @@
self.started_with_existing_data = False
self.stopped = False
+ self.restart_delay = 2
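+ # OFAgent subscription/heartbeat state: a single subscriber is tracked at a
+ # time and its session is terminated by the heartbeat loop if no Subscribe
+ # request refreshes it within a few heartbeat intervals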
+ self.subscriber = None
+ self.ofagent_heartbeat_count = 0
+ self.ofagent_heartbeat_max_count = 3
+ self.ofagent_heartbeat_delay = 5
+ self.ofagent_heartbeat_lc = None
+ self.ofagent_is_alive = True
+
def start(self, config_backend=None):
log.debug('starting')
if config_backend:
@@ -72,12 +84,19 @@
registry('grpc_server').register(
add_VolthaLocalServiceServicer_to_server, self)
+
log.info('started')
return self
def stop(self):
log.debug('stopping')
self.stopped = True
+
+ if self.ofagent_heartbeat_lc is not None:
+ self.ofagent_heartbeat_lc.stop()
+
+ self._ofagent_session_termination()
+
log.info('stopped')
def get_proxy(self, path, exclusive=False):
@@ -986,6 +1005,7 @@
# bbf_fiber rpcs end
def StreamPacketsOut(self, request_iterator, context):
+ log.debug('start-stream-packets-out')
@twisted_async
def forward_packet_out(packet_out):
@@ -995,16 +1015,20 @@
for request in request_iterator:
forward_packet_out(packet_out=request)
+ log.debug('stop-stream-packets-out')
+
return Empty()
def ReceivePacketsIn(self, request, context):
- while 1:
+ log.debug('start-receive-packets-in')
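+ # Stream packet-in messages until the ofagent session terminates
+ # (ofagent_is_alive is cleared by _ofagent_session_termination)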
+ while self.ofagent_is_alive:
try:
packet_in = self.core.packet_in_queue.get(timeout=1)
yield packet_in
except QueueEmpty:
if self.stopped:
break
+ log.debug('stop-receive-packets-in')
def send_packet_in(self, device_id, ofp_packet_in):
"""Must be called on the twisted thread"""
@@ -1012,13 +1036,15 @@
self.core.packet_in_queue.put(packet_in)
def ReceiveChangeEvents(self, request, context):
- while 1:
+ log.debug('start-receive-change-events')
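+ # Stream change events until the ofagent session terminates
+ # (ofagent_is_alive is cleared by _ofagent_session_termination)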
+ while self.ofagent_is_alive:
try:
event = self.core.change_event_queue.get(timeout=1)
yield event
except QueueEmpty:
if self.stopped:
break
+ log.debug('stop-receive-change-events')
def send_port_change_event(self, device_id, port_status):
"""Must be called on the twisted thread"""
@@ -1151,3 +1177,71 @@
'Device \'{}\' not found'.format(request.id))
context.set_code(StatusCode.NOT_FOUND)
return SelfTestResponse()
+
+ def _ofagent_session_termination(self):
+ log.debug('start-ofagent-session-termination')
+
+ # Stop ofagent heartbeat
+ if self.ofagent_heartbeat_lc is not None:
+ self.ofagent_heartbeat_lc.stop()
+
+ # Reset flags and assignments
+ self.ofagent_is_alive = False
+ self.subscriber = None
+ self.ofagent_heartbeat_count = 0
+
+ # Some local streaming services (packet-in/change-events) will stop,
+ # so they need to be re-registered
+ registry('grpc_server').register(
+ add_VolthaLocalServiceServicer_to_server, self)
+
+ log.debug('stop-ofagent-session-termination')
+
+ def _ofagent_session_heartbeat(self):
+ log.debug('start-ofagent-heartbeat')
+ if self.ofagent_heartbeat_count > self.ofagent_heartbeat_max_count:
+ self._ofagent_session_termination()
+ else:
+ self.ofagent_heartbeat_count += 1
+
+ log.debug('stop-ofagent-heartbeat')
+
+ @twisted_async
+ def Subscribe(self, request, context):
+ log.info('grpc-request', request=request)
+
+ # Check if an ofagent subscriber is assigned
+ if self.subscriber is None:
+ log.debug('ofagent-subscriber-request')
+
+ try:
+ # Assign the request as the active subscriber
+ self.subscriber = OfAgentSubscriber(
+ ofagent_id=request.ofagent_id,
+ voltha_id=self.instance_id
+ )
+
+ # Start the heartbeat
+ self.ofagent_heartbeat_count = 0
+ self.ofagent_heartbeat_lc = task.LoopingCall(self._ofagent_session_heartbeat)
+ self.ofagent_heartbeat_lc.start(self.ofagent_heartbeat_delay)
+
+ log.debug('ofagent-subscriber-connected', subscriber=self.subscriber)
+
+ except _Rendezvous, e:
+ log.error('ofagent-subscriber-failure', exception=repr(e), status=e.code())
+
+ except Exception as e:
+ log.exception('ofagent-subscriber-unexpected-failure', exception=repr(e))
+
+ elif self.subscriber.ofagent_id == request.ofagent_id:
+ log.debug('ofagent-subscriber-matches-assigned',
+ current=self.subscriber)
+ # reset counter
+ self.ofagent_heartbeat_count = 0
+
+ else:
+ log.debug('ofagent-subscriber-not-matching-assigned',
+ current=self.subscriber)
+
+ return self.subscriber
diff --git a/voltha/protos/voltha.proto b/voltha/protos/voltha.proto
index 30e1834..15cef20 100644
--- a/voltha/protos/voltha.proto
+++ b/voltha/protos/voltha.proto
@@ -207,6 +207,14 @@
SelfTestResult result = 1;
}
+message OfAgentSubscriber {
+ // ID of ofagent instance
+ string ofagent_id = 1;
+
+ // ID of voltha instance to which the ofagent is subscribed
+ string voltha_id = 2;
+}
+
/*
* Cluster-wide Voltha APIs
*
@@ -1612,4 +1620,6 @@
post: "/api/v1/local/devices/{id}/self_test"
};
}
+
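+ // OFAgent registers with this voltha instance; the agent re-sends the
+ // request periodically to keep its subscription alive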
+ rpc Subscribe(OfAgentSubscriber) returns (OfAgentSubscriber) {}
}