VOL-1097 : Ofagent integration for voltha 2.0
- Created a common location for Python-based components
- Adjusted the ofagent component to interact with VOLTHA 2.0
- Added streaming RPC methods for receiving/sending packets to the VOLTHA API
- Adjusted voltha.proto
Change-Id: I47fb7b80878ead060b4b42bd16cb4f8aa384fdb6
diff --git a/python/ofagent/connection_mgr.py b/python/ofagent/connection_mgr.py
new file mode 100755
index 0000000..b960b69
--- /dev/null
+++ b/python/ofagent/connection_mgr.py
@@ -0,0 +1,329 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import os
+
+import sys
+
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
+
+from common.utils.asleep import asleep
+from common.utils.consulhelpers import get_endpoint_from_consul
+from structlog import get_logger
+import grpc
+from grpc import StatusCode
+from grpc._channel import _Rendezvous
+from ofagent.protos import third_party
+from protos import voltha_pb2
+from protos.voltha_pb2 import OfAgentSubscriber
+from grpc_client import GrpcClient
+
+from agent import Agent
+from common.utils.dockerhelpers import get_my_containers_name
+
+
+# Module-level structlog logger used by ConnectionManager below.
+log = get_logger()
+# NOTE(review): `third_party` (imported above) is not otherwise referenced in
+# this file; this commented-out assignment looks like a leftover way of marking
+# that import as intentionally used (presumably imported for its side effects)
+# -- confirm and either restore or delete it.
+# _ = third_party
+
+class ConnectionManager(object):
+ def __init__(self, consul_endpoint, vcore_endpoint, vcore_grpc_timeout,
+ controller_endpoints, instance_id,
+ enable_tls=False, key_file=None, cert_file=None,
+ vcore_retry_interval=0.5, devices_refresh_interval=5,
+ subscription_refresh_interval=5):
+
+ log.info('init-connection-manager')
+ log.info('list-of-controllers', controller_endpoints=controller_endpoints)
+ self.controller_endpoints = controller_endpoints
+ self.consul_endpoint = consul_endpoint
+ self.vcore_endpoint = vcore_endpoint
+ self.grpc_timeout = vcore_grpc_timeout
+ self.instance_id = instance_id
+ self.enable_tls = enable_tls
+ self.key_file = key_file
+ self.cert_file = cert_file
+
+ self.channel = None
+ self.grpc_client = None # single, shared gRPC client to vcore
+
+ self.agent_map = {} # (datapath_id, controller_endpoint) -> Agent()
+ self.device_id_to_datapath_id_map = {}
+
+ self.vcore_retry_interval = vcore_retry_interval
+ self.devices_refresh_interval = devices_refresh_interval
+ self.subscription_refresh_interval = subscription_refresh_interval
+ self.subscription = None
+
+ self.running = False
+
+ def start(self):
+
+ if self.running:
+ return
+
+ log.debug('starting')
+
+ self.running = True
+
+ # Get a subscription to vcore
+ reactor.callInThread(self.get_vcore_subscription)
+
+ # Start monitoring logical devices and manage agents accordingly
+ reactor.callLater(0, self.monitor_logical_devices)
+
+ log.info('started')
+
+ return self
+
+ def stop(self):
+ log.debug('stopping')
+ # clean up all controller connections
+ for agent in self.agent_map.itervalues():
+ agent.stop()
+ self.running = False
+
+ self._reset_grpc_attributes()
+
+ log.info('stopped')
+
+ def resolve_endpoint(self, endpoint):
+ ip_port_endpoint = endpoint
+ if endpoint.startswith('@'):
+ try:
+ ip_port_endpoint = get_endpoint_from_consul(
+ self.consul_endpoint, endpoint[1:])
+ log.info(
+ '{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
+ except Exception as e:
+ log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
+ log.error('committing-suicide')
+ # Committing suicide in order to let docker restart ofagent
+ os.system("kill -15 {}".format(os.getpid()))
+ if ip_port_endpoint:
+ host, port = ip_port_endpoint.split(':', 2)
+ return host, int(port)
+
+ def _reset_grpc_attributes(self):
+ log.debug('start-reset-grpc-attributes')
+
+ if self.grpc_client is not None:
+ self.grpc_client.stop()
+
+ if self.channel is not None:
+ del self.channel
+
+ self.is_alive = False
+ self.channel = None
+ self.subscription = None
+ self.grpc_client = None
+
+ log.debug('stop-reset-grpc-attributes')
+
+ def _assign_grpc_attributes(self):
+ log.debug('start-assign-grpc-attributes')
+
+ host, port = self.resolve_endpoint(self.vcore_endpoint)
+ log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)
+
+ assert host is not None
+ assert port is not None
+
+ # Establish a connection to the vcore GRPC server
+ self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
+ self.is_alive = True
+
+ log.debug('stop-assign-grpc-attributes')
+
+ @inlineCallbacks
+ def get_vcore_subscription(self):
+ log.debug('start-get-vcore-subscription')
+
+ while self.running and self.subscription is None:
+ try:
+ # If a subscription is not yet assigned then establish new GRPC connection
+ # ... otherwise keep using existing connection details
+ if self.subscription is None:
+ self._assign_grpc_attributes()
+
+ # Send subscription request to register the current ofagent instance
+ container_name = self.instance_id
+ if self.grpc_client is None:
+ self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout)
+ subscription = yield self.grpc_client.subscribe(
+ OfAgentSubscriber(ofagent_id=container_name))
+
+ # If the subscriber id matches the current instance
+ # ... then the subscription has succeeded
+ if subscription is not None and subscription.ofagent_id == container_name:
+ if self.subscription is None:
+ # Keep details on the current GRPC session and subscription
+ log.debug('subscription-with-vcore-successful', subscription=subscription)
+ self.subscription = subscription
+ self.grpc_client.start()
+
+ # Sleep a bit in between each subscribe
+ yield asleep(self.subscription_refresh_interval)
+
+ # Move on to next subscribe request
+ continue
+
+ # The subscription did not succeed, reset and move on
+ else:
+ log.info('subscription-with-vcore-unavailable', subscription=subscription)
+
+ except _Rendezvous, e:
+ log.error('subscription-with-vcore-terminated',exception=e, status=e.code())
+
+ except Exception as e:
+ log.exception('unexpected-subscription-termination-with-vcore', e=e)
+
+ # Reset grpc details
+ # The vcore instance is either not available for subscription
+ # or a failure occurred with the existing communication.
+ self._reset_grpc_attributes()
+
+ # Sleep for a short period and retry
+ yield asleep(self.vcore_retry_interval)
+
+ log.debug('stop-get-vcore-subscription')
+
+ @inlineCallbacks
+ def get_list_of_logical_devices_from_voltha(self):
+
+ while self.running:
+ log.info('retrieve-logical-device-list')
+ try:
+ devices = yield \
+ self.grpc_client.list_logical_devices()
+
+ for device in devices:
+ log.info("logical-device-entry", id=device.id,
+ datapath_id=device.datapath_id)
+
+ returnValue(devices)
+
+ except _Rendezvous, e:
+ status = e.code()
+ log.error('vcore-communication-failure', exception=e, status=status)
+ if status == StatusCode.UNAVAILABLE or status == StatusCode.DEADLINE_EXCEEDED:
+ os.system("kill -15 {}".format(os.getpid()))
+
+ except Exception as e:
+ log.exception('logical-devices-retrieval-failure', exception=e)
+
+ log.info('reconnect', after_delay=self.vcore_retry_interval)
+ yield asleep(self.vcore_retry_interval)
+
+ def refresh_agent_connections(self, devices):
+ """
+ Based on the new device list, update the following state in the class:
+ * agent_map
+ * datapath_map
+ * device_id_map
+ :param devices: full device list freshly received from Voltha
+ :return: None
+ """
+
+ # Use datapath ids for deciding what's new and what's obsolete
+ desired_datapath_ids = set(d.datapath_id for d in devices)
+ current_datapath_ids = set(datapath_ids[0] for datapath_ids in self.agent_map.iterkeys())
+
+ # if identical, nothing to do
+ if desired_datapath_ids == current_datapath_ids:
+ return
+
+ # ... otherwise calculate differences
+ to_add = desired_datapath_ids.difference(current_datapath_ids)
+ to_del = current_datapath_ids.difference(desired_datapath_ids)
+
+ # remove what we don't need
+ for datapath_id in to_del:
+ self.delete_agent(datapath_id)
+
+ # start new agents as needed
+ for device in devices:
+ if device.datapath_id in to_add:
+ self.create_agent(device)
+
+ log.debug('updated-agent-list', count=len(self.agent_map))
+ log.debug('updated-device-id-to-datapath-id-map',
+ map=str(self.device_id_to_datapath_id_map))
+
+ def create_agent(self, device):
+ datapath_id = device.datapath_id
+ device_id = device.id
+ for controller_endpoint in self.controller_endpoints:
+ agent = Agent(controller_endpoint, datapath_id,
+ device_id, self.grpc_client, self.enable_tls,
+ self.key_file, self.cert_file)
+ agent.start()
+ self.agent_map[(datapath_id,controller_endpoint)] = agent
+ self.device_id_to_datapath_id_map[device_id] = datapath_id
+
+ def delete_agent(self, datapath_id):
+ for controller_endpoint in self.controller_endpoints:
+ agent = self.agent_map[(datapath_id,controller_endpoint)]
+ device_id = agent.get_device_id()
+ agent.stop()
+ del self.agent_map[(datapath_id,controller_endpoint)]
+ del self.device_id_to_datapath_id_map[device_id]
+
+ @inlineCallbacks
+ def monitor_logical_devices(self):
+ log.debug('start-monitor-logical-devices')
+
+ while self.running:
+ log.info('monitoring-logical-devices')
+
+ # should change to a gRPC streaming call
+ # see https://jira.opencord.org/browse/CORD-821
+
+ try:
+ if self.channel is not None and self.grpc_client is not None and \
+ self.subscription is not None:
+ # get current list from Voltha
+ devices = yield \
+ self.get_list_of_logical_devices_from_voltha()
+
+ # update agent list and mapping tables as needed
+ self.refresh_agent_connections(devices)
+ else:
+ log.info('vcore-communication-unavailable')
+
+ # wait before next poll
+ yield asleep(self.devices_refresh_interval)
+
+ except _Rendezvous, e:
+ log.error('vcore-communication-failure', exception=repr(e), status=e.code())
+
+ except Exception as e:
+ log.exception('unexpected-vcore-communication-failure', exception=repr(e))
+
+ log.debug('stop-monitor-logical-devices')
+
+ def forward_packet_in(self, device_id, ofp_packet_in):
+ datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
+ if datapath_id:
+ for controller_endpoint in self.controller_endpoints:
+ agent = self.agent_map[(datapath_id, controller_endpoint)]
+ agent.forward_packet_in(ofp_packet_in)
+
+ def forward_change_event(self, device_id, event):
+ datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
+ if datapath_id:
+ for controller_endpoint in self.controller_endpoints:
+ agent = self.agent_map[(datapath_id, controller_endpoint)]
+ agent.forward_change_event(event)