VOL-1097 : Ofagent integration for voltha 2.0

- Created a common location for python based components
- Adjusted the ofagent component to interact with voltha 2.0
- Added streaming rpc methods for rcv/send of packets to voltha api
- Adjusted voltha.proto

Change-Id: I47fb7b80878ead060b4b42bd16cb4f8aa384fdb6
diff --git a/python/ofagent/connection_mgr.py b/python/ofagent/connection_mgr.py
new file mode 100755
index 0000000..b960b69
--- /dev/null
+++ b/python/ofagent/connection_mgr.py
@@ -0,0 +1,329 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import os
+
+import sys
+
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
+
+from common.utils.asleep import asleep
+from common.utils.consulhelpers import get_endpoint_from_consul
+from structlog import get_logger
+import grpc
+from grpc import StatusCode
+from grpc._channel import _Rendezvous
+from ofagent.protos import third_party
+from protos import voltha_pb2
+from protos.voltha_pb2 import OfAgentSubscriber
+from grpc_client import GrpcClient
+
+from agent import Agent
+from common.utils.dockerhelpers import get_my_containers_name
+
+
+log = get_logger()
+# _ = third_party
+
+class ConnectionManager(object):
+    def __init__(self, consul_endpoint, vcore_endpoint, vcore_grpc_timeout,
+                 controller_endpoints, instance_id,
+                 enable_tls=False, key_file=None, cert_file=None,
+                 vcore_retry_interval=0.5, devices_refresh_interval=5,
+                 subscription_refresh_interval=5):
+
+        log.info('init-connection-manager')
+        log.info('list-of-controllers', controller_endpoints=controller_endpoints)
+        self.controller_endpoints = controller_endpoints
+        self.consul_endpoint = consul_endpoint
+        self.vcore_endpoint = vcore_endpoint
+        self.grpc_timeout = vcore_grpc_timeout
+        self.instance_id = instance_id
+        self.enable_tls = enable_tls
+        self.key_file = key_file
+        self.cert_file = cert_file
+
+        self.channel = None
+        self.grpc_client = None  # single, shared gRPC client to vcore
+
+        self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
+        self.device_id_to_datapath_id_map = {}
+
+        self.vcore_retry_interval = vcore_retry_interval
+        self.devices_refresh_interval = devices_refresh_interval
+        self.subscription_refresh_interval = subscription_refresh_interval
+        self.subscription = None
+
+        self.running = False
+
+    def start(self):
+
+        if self.running:
+            return
+
+        log.debug('starting')
+
+        self.running = True
+
+        # Get a subscription to vcore
+        reactor.callInThread(self.get_vcore_subscription)
+
+        # Start monitoring logical devices and manage agents accordingly
+        reactor.callLater(0, self.monitor_logical_devices)
+
+        log.info('started')
+
+        return self
+
+    def stop(self):
+        log.debug('stopping')
+        # clean up all controller connections
+        for agent in self.agent_map.itervalues():
+            agent.stop()
+        self.running = False
+
+        self._reset_grpc_attributes()
+
+        log.info('stopped')
+
+    def resolve_endpoint(self, endpoint):
+        ip_port_endpoint = endpoint
+        if endpoint.startswith('@'):
+            try:
+                ip_port_endpoint = get_endpoint_from_consul(
+                    self.consul_endpoint, endpoint[1:])
+                log.info(
+                    '{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
+            except Exception as e:
+                log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
+                log.error('committing-suicide')
+                # Committing suicide in order to let docker restart ofagent
+                os.system("kill -15 {}".format(os.getpid()))
+        if ip_port_endpoint:
+            host, port = ip_port_endpoint.split(':', 2)
+            return host, int(port)
+
+    def _reset_grpc_attributes(self):
+        log.debug('start-reset-grpc-attributes')
+
+        if self.grpc_client is not None:
+            self.grpc_client.stop()
+
+        if self.channel is not None:
+            del self.channel
+
+        self.is_alive = False
+        self.channel = None
+        self.subscription = None
+        self.grpc_client = None
+
+        log.debug('stop-reset-grpc-attributes')
+
+    def _assign_grpc_attributes(self):
+        log.debug('start-assign-grpc-attributes')
+
+        host, port = self.resolve_endpoint(self.vcore_endpoint)
+        log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)
+
+        assert host is not None
+        assert port is not None
+
+        # Establish a connection to the vcore GRPC server
+        self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
+        self.is_alive = True
+
+        log.debug('stop-assign-grpc-attributes')
+
+    @inlineCallbacks
+    def get_vcore_subscription(self):
+        log.debug('start-get-vcore-subscription')
+
+        while self.running and self.subscription is None:
+            try:
+                # If a subscription is not yet assigned then establish new GRPC connection
+                # ... otherwise keep using existing connection details
+                if self.subscription is None:
+                    self._assign_grpc_attributes()
+
+                # Send subscription request to register the current ofagent instance
+                container_name = self.instance_id
+                if self.grpc_client is None:
+                    self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout)
+                subscription = yield self.grpc_client.subscribe(
+                    OfAgentSubscriber(ofagent_id=container_name))
+
+                # If the subscriber id matches the current instance
+                # ... then the subscription has succeeded
+                if subscription is not None and subscription.ofagent_id == container_name:
+                    if self.subscription is None:
+                        # Keep details on the current GRPC session and subscription
+                        log.debug('subscription-with-vcore-successful', subscription=subscription)
+                        self.subscription = subscription
+                        self.grpc_client.start()
+
+                    # Sleep a bit in between each subscribe
+                    yield asleep(self.subscription_refresh_interval)
+
+                    # Move on to next subscribe request
+                    continue
+
+                # The subscription did not succeed, reset and move on
+                else:
+                    log.info('subscription-with-vcore-unavailable', subscription=subscription)
+
+            except _Rendezvous, e:
+                log.error('subscription-with-vcore-terminated',exception=e, status=e.code())
+
+            except Exception as e:
+                log.exception('unexpected-subscription-termination-with-vcore', e=e)
+
+            # Reset grpc details
+            # The vcore instance is either not available for subscription
+            # or a failure occurred with the existing communication.
+            self._reset_grpc_attributes()
+
+            # Sleep for a short period and retry
+            yield asleep(self.vcore_retry_interval)
+
+        log.debug('stop-get-vcore-subscription')
+
+    @inlineCallbacks
+    def get_list_of_logical_devices_from_voltha(self):
+
+        while self.running:
+            log.info('retrieve-logical-device-list')
+            try:
+                devices = yield \
+                    self.grpc_client.list_logical_devices()
+
+                for device in devices:
+                    log.info("logical-device-entry", id=device.id,
+                             datapath_id=device.datapath_id)
+
+                returnValue(devices)
+
+            except _Rendezvous, e:
+                status = e.code()
+                log.error('vcore-communication-failure', exception=e, status=status)
+                if status == StatusCode.UNAVAILABLE or status == StatusCode.DEADLINE_EXCEEDED:
+                    os.system("kill -15 {}".format(os.getpid()))
+
+            except Exception as e:
+                log.exception('logical-devices-retrieval-failure', exception=e)
+
+            log.info('reconnect', after_delay=self.vcore_retry_interval)
+            yield asleep(self.vcore_retry_interval)
+
+    def refresh_agent_connections(self, devices):
+        """
+        Based on the new device list, update the following state in the class:
+        * agent_map
+        * datapath_map
+        * device_id_map
+        :param devices: full device list freshly received from Voltha
+        :return: None
+        """
+
+        # Use datapath ids for deciding what's new and what's obsolete
+        desired_datapath_ids = set(d.datapath_id for d in devices)
+        current_datapath_ids = set(datapath_ids[0] for datapath_ids in self.agent_map.iterkeys())
+
+        # if identical, nothing to do
+        if desired_datapath_ids == current_datapath_ids:
+            return
+
+        # ... otherwise calculate differences
+        to_add = desired_datapath_ids.difference(current_datapath_ids)
+        to_del = current_datapath_ids.difference(desired_datapath_ids)
+
+        # remove what we don't need
+        for datapath_id in to_del:
+            self.delete_agent(datapath_id)
+
+        # start new agents as needed
+        for device in devices:
+            if device.datapath_id in to_add:
+                self.create_agent(device)
+
+        log.debug('updated-agent-list', count=len(self.agent_map))
+        log.debug('updated-device-id-to-datapath-id-map',
+                  map=str(self.device_id_to_datapath_id_map))
+
+    def create_agent(self, device):
+        datapath_id = device.datapath_id
+        device_id = device.id
+        for controller_endpoint in self.controller_endpoints:
+            agent = Agent(controller_endpoint, datapath_id,
+                          device_id, self.grpc_client, self.enable_tls,
+                          self.key_file, self.cert_file)
+            agent.start()
+            self.agent_map[(datapath_id,controller_endpoint)] = agent
+            self.device_id_to_datapath_id_map[device_id] = datapath_id
+
+    def delete_agent(self, datapath_id):
+        for controller_endpoint in self.controller_endpoints:
+            agent = self.agent_map[(datapath_id,controller_endpoint)]
+            device_id = agent.get_device_id()
+            agent.stop()
+            del self.agent_map[(datapath_id,controller_endpoint)]
+            del self.device_id_to_datapath_id_map[device_id]
+
+    @inlineCallbacks
+    def monitor_logical_devices(self):
+        log.debug('start-monitor-logical-devices')
+
+        while self.running:
+            log.info('monitoring-logical-devices')
+
+            # should change to a gRPC streaming call
+            # see https://jira.opencord.org/browse/CORD-821
+
+            try:
+                if self.channel is not None and self.grpc_client is not None and \
+                                self.subscription is not None:
+                    # get current list from Voltha
+                    devices = yield \
+                        self.get_list_of_logical_devices_from_voltha()
+
+                    # update agent list and mapping tables as needed
+                    self.refresh_agent_connections(devices)
+                else:
+                    log.info('vcore-communication-unavailable')
+
+                # wait before next poll
+                yield asleep(self.devices_refresh_interval)
+
+            except _Rendezvous, e:
+                log.error('vcore-communication-failure', exception=repr(e), status=e.code())
+
+            except Exception as e:
+                log.exception('unexpected-vcore-communication-failure', exception=repr(e))
+
+        log.debug('stop-monitor-logical-devices')
+
+    def forward_packet_in(self, device_id, ofp_packet_in):
+        datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
+        if datapath_id:
+            for controller_endpoint in self.controller_endpoints:
+                agent = self.agent_map[(datapath_id, controller_endpoint)]
+                agent.forward_packet_in(ofp_packet_in)
+
+    def forward_change_event(self, device_id, event):
+        datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
+        if datapath_id:
+            for controller_endpoint in self.controller_endpoints:
+                agent = self.agent_map[(datapath_id, controller_endpoint)]
+                agent.forward_change_event(event)