Dockerizing ofagent

Change-Id: Ic2ead67cedd74463a72efca8c3f5d74ea2433af8
diff --git a/Dockerfile.ofagent b/Dockerfile.ofagent
new file mode 100644
index 0000000..4f23ad1
--- /dev/null
+++ b/Dockerfile.ofagent
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM cord/voltha-base
+
+MAINTAINER Zsolt Haraszti <zharaszt@ciena.com>
+MAINTAINER Ali Al-Shabibi <ali.al-shabibi@onlab.us>
+MAINTAINER Nathan Knuth   <nathan.knuth@tibitcom.com>
+
+# Install protoc version 3.0.0; this is not yet the supported
+# version on xenial, so we need to "backport" it
+RUN apt-get update && \
+    apt-get install -y zlib1g-dev wget && \
+    wget http://ftp.us.debian.org/debian/pool/main/p/protobuf/libprotoc10_3.0.0-7_amd64.deb && \
+    wget http://ftp.us.debian.org/debian/pool/main/p/protobuf/libprotobuf-lite10_3.0.0-7_amd64.deb && \
+    wget http://ftp.us.debian.org/debian/pool/main/p/protobuf/libprotobuf-dev_3.0.0-7_amd64.deb && \
+    wget http://ftp.us.debian.org/debian/pool/main/p/protobuf/libprotobuf10_3.0.0-7_amd64.deb && \
+    wget http://ftp.us.debian.org/debian/pool/main/p/protobuf/protobuf-compiler_3.0.0-7_amd64.deb && \
+    dpkg -i *.deb && \
+    protoc --version && \
+    rm -f *.deb
+
+# Bundle app source
+COPY ofagent /ofagent
+COPY common /common
+
+# Exposing process and default entry point
+# EXPOSE 8000
+
+CMD ["python", "ofagent/main.py"]
diff --git a/Makefile b/Makefile
index e43ec51..9ca87aa 100644
--- a/Makefile
+++ b/Makefile
@@ -76,6 +76,7 @@
 build: protos docker-base
 	docker build -t cord/voltha -f Dockerfile.voltha .
 	docker build -t cord/chameleon -f Dockerfile.chameleon .
+	docker build -t cord/ofagent -f Dockerfile.ofagent .
 
 docker-base: .docker-base-built
 
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index 5c261f8..a3e8ab1 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -126,3 +126,26 @@
       SERVICE_8881_NAME: "chameleon-rest"
     volumes:
     - "/var/run/docker.sock:/tmp/docker.sock"
+  #
+  # ofagent server instance
+  #
+  ofagent:
+    image: cord/ofagent
+    command: [
+      "/ofagent/main.py",
+      "-v",
+      "--consul=${DOCKER_HOST_IP}:8500",
+      "--fluentd=fluentd:24224",
+      "--controller=localhost:6633",
+      "--grpc-endpoint=@voltha-grpc",
+      "--instance-id-is-container-name",
+      "-v"
+    ]
+    depends_on:
+    - consul
+    - voltha
+    links:
+    - consul
+    - fluentd
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
new file mode 100644
index 0000000..9a17121
--- /dev/null
+++ b/ofagent/connection_mgr.py
@@ -0,0 +1,231 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
+
+from common.utils.asleep import asleep
+from common.utils.consulhelpers import get_endpoint_from_consul
+from structlog import get_logger
+import grpc
+from protos import voltha_pb2
+from grpc_client import GrpcClient
+
+from agent import Agent
+
+class ConnectionManager(object):
+
+    log = get_logger()
+
+    def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
+                 voltha_retry_interval=0.5, devices_refresh_interval=60):
+
+        self.log.info('Initializing connection manager')
+        self.controller_endpoint = controller_endpoint
+        self.consul_endpoint = consul_endpoint
+        self.voltha_endpoint = voltha_endpoint
+
+        self.channel = None
+        self.connected_devices = None
+        self.unprocessed_devices = None
+        self.agent_map = {}
+        self.grpc_client = None
+        self.device_id_map = None
+
+        self.voltha_retry_interval = voltha_retry_interval
+        self.devices_refresh_interval = devices_refresh_interval
+
+        self.running = False
+
+    @inlineCallbacks
+    def run(self):
+        if self.running:
+            return
+
+        self.log.info('Running connection manager')
+
+        self.running = True
+
+        # Get voltha grpc endpoint
+        self.channel = self.get_grpc_channel_with_voltha()
+
+        # Connect to voltha using grpc and fetch the list of logical devices
+        yield self.get_list_of_logical_devices_from_voltha()
+
+        # Create shared gRPC API object
+        self.grpc_client = GrpcClient(self.channel, self.device_id_map)
+
+        # Instantiate an OpenFlow agent for each logical device
+        self.refresh_openflow_agent_connections()
+
+        reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
+        reactor.callLater(0, self.monitor_connections)
+
+        returnValue(self)
+
+
+    def shutdown(self):
+        # clean up all controller connections
+        for datapath_id, agent in self.agent_map.items():
+            agent.stop()
+        self.running = False
+        # TODO: close grpc connection to voltha
+
+    def resolve_endpoint(self, endpoint):
+        ip_port_endpoint = endpoint
+        if endpoint.startswith('@'):
+            try:
+                ip_port_endpoint = get_endpoint_from_consul(
+                    self.consul_endpoint, endpoint[1:])
+                self.log.info(
+                    'Found endpoint {} service at {}'.format(endpoint,
+                                                             ip_port_endpoint))
+            except Exception as e:
+                self.log.error('Failure to locate {} service from '
+                               'consul {}:'.format(endpoint, repr(e)))
+                return
+        if ip_port_endpoint:
+            host, port = ip_port_endpoint.split(':', 1)
+            return host, int(port)
+
+    def get_grpc_channel_with_voltha(self):
+        self.log.info('Resolving voltha endpoint {} from consul'.format(
+            self.voltha_endpoint))
+        host, port = self.resolve_endpoint(self.voltha_endpoint)
+        assert host is not None
+        assert port is not None
+        # Create grpc channel to Voltha
+        channel = grpc.insecure_channel('{}:{}'.format(host, port))
+        self.log.info('Acquired a grpc channel to voltha')
+        return channel
+
+
+    @inlineCallbacks
+    def get_list_of_logical_devices_from_voltha(self):
+        while True:
+            self.log.info('Retrieve devices from voltha')
+            try:
+                stub = voltha_pb2.VolthaLogicalLayerStub(self.channel)
+                devices = stub.ListLogicalDevices(
+                    voltha_pb2.NullMessage()).items
+                for device in devices:
+                    self.log.info("Devices {} -> {}".format(device.id,
+                                                            device.datapath_id))
+                self.unprocessed_devices = devices
+                self.device_id_map = dict(
+                    (device.datapath_id, device.id) for device in devices)
+                return
+            except Exception as e:
+                self.log.error('Failure to retrieve devices from '
+                               'voltha: {}'.format(repr(e)))
+
+            self.log.info('reconnect', after_delay=self.voltha_retry_interval)
+            yield asleep(self.voltha_retry_interval)
+
+
+    def refresh_openflow_agent_connections(self):
+        # Compare the new device list against the previous
+        # For any new device, an agent connection will be created.  For
+        # existing device that are no longer part of the list then that
+        # agent connection will be stopped
+
+        # If the ofagent has no previous devices then just add them
+        if self.connected_devices is None:
+            datapath_ids_to_add = [device.datapath_id for device in self.unprocessed_devices]
+        else:
+            previous_datapath_ids = [device.datapath_id for device in self.connected_devices]
+            current_datapath_ids = [device.datapath_id for device in self.unprocessed_devices]
+            datapath_ids_to_add = [d for d in current_datapath_ids if
+                                 d not in previous_datapath_ids]
+            datapath_ids_to_remove = [d for d in previous_datapath_ids if
+                                 d not in current_datapath_ids]
+
+            # Check for no change
+            if not datapath_ids_to_add and not datapath_ids_to_remove:
+                self.log.info('No new devices found.  No OF agent update '
+                              'required')
+                return
+
+            self.log.info('Updating OF agent connections.')
+            self.log.debug('current-agent-map', agent_map=self.agent_map)
+
+            # Stop previous agents
+            for datapath_id in datapath_ids_to_remove:
+                if datapath_id in self.agent_map:
+                    self.agent_map[datapath_id].stop()
+                    del self.agent_map[datapath_id]
+                    self.log.info('Removed OF agent with datapath id {'
+                                  '}'.format(datapath_id))
+
+        # Add the new agents
+        for datapath_id in datapath_ids_to_add:
+            self.agent_map[datapath_id] = Agent(self.controller_endpoint,
+                                                datapath_id,
+                                                self.grpc_client)
+            self.agent_map[datapath_id].run()
+            self.log.info('Launched OF agent with datapath id {}'.format(
+                datapath_id))
+
+        # replace the old device list with the new ones
+        self.connected_devices = self.unprocessed_devices
+        self.unprocessed_devices = None
+
+    @inlineCallbacks
+    def monitor_connections(self):
+        while True:
+            # sleep first
+            yield asleep(self.devices_refresh_interval)
+            self.log.info('Monitor connections')
+            yield self.get_list_of_logical_devices_from_voltha()
+            self.refresh_openflow_agent_connections()
+
+# class Model(object):
+#     def __init__(self, id, path):
+#         self.id=id
+#         self.datapath_id=path,
+
+
+# if __name__ == '__main__':
+#     conn = ConnectionManager("10.0.2.15:3181", "localhost:50555",
+#                              "10.100.198.150:6633")
+#
+#     conn.connected_devices = None
+#     model1 = Model('12311', 'wdadsa1')
+#     model2 = Model('12312', 'wdadsa2')
+#     model3 = Model('12313', 'wdadsa3')
+#     model4 = Model('12314', 'wdadsa4')
+#
+#     conn.unprocessed_devices = [model1, model2, model3]
+#
+#     conn.refresh_openflow_agent_connections()
+#
+#
+#     # val = [device.datapath_id for device in conn.connected_devices]
+#     # print val
+#     #
+#     # for (id,n) in enumerate(val):
+#     #     print n
+#
+#
+#     conn.unprocessed_devices = [model1, model2, model3]
+#
+#     conn.refresh_openflow_agent_connections()
+#
+#     conn.unprocessed_devices = [model1, model2, model4]
+#
+#     conn.refresh_openflow_agent_connections()
\ No newline at end of file
diff --git a/ofagent/main.py b/ofagent/main.py
index 0d8c8d4..7d0cdf7 100755
--- a/ofagent/main.py
+++ b/ofagent/main.py
@@ -14,22 +14,154 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-"""TODO This is a POC placeholder """
+import argparse
 import os
-
-import grpc
+import sys
 import yaml
-from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks
 
-from agent import Agent
+base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+sys.path.append(base_dir)
+sys.path.append(os.path.join(base_dir, 'ofagent/protos/third_party'))
+
+from common.utils.dockerhelpers import get_my_containers_name
+from common.utils.nethelpers import get_my_primary_local_ipv4
 from common.utils.structlog_setup import setup_logging
-from protos import voltha_pb2
+from connection_mgr import ConnectionManager
 
-from grpc_client import GrpcClient
+defs = dict(
+    config=os.environ.get('CONFIG', './ofagent.yml'),
+    consul=os.environ.get('CONSUL', 'localhost:8500'),
+    controller=os.environ.get('CONTROLLER', 'localhost:6633'),
+    external_host_address=os.environ.get('EXTERNAL_HOST_ADDRESS',
+                                         get_my_primary_local_ipv4()),
+    grpc_endpoint=os.environ.get('GRPC_ENDPOINT', 'localhost:50055'),
+    fluentd=os.environ.get('FLUENTD', None),
+    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
+    internal_host_address=os.environ.get('INTERNAL_HOST_ADDRESS',
+                                         get_my_primary_local_ipv4()),
+    work_dir=os.environ.get('WORK_DIR', '/tmp/ofagent')
+)
 
 
-def load_config(path):
+def parse_args():
+
+    parser = argparse.ArgumentParser()
+
+    _help = ('Path to ofagent.yml config file (default: %s). '
+             'If relative, it is relative to main.py of ofagent.'
+             % defs['config'])
+    parser.add_argument('-c', '--config',
+                        dest='config',
+                        action='store',
+                        default=defs['config'],
+                        help=_help)
+
+    _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
+    parser.add_argument(
+        '-C', '--consul', dest='consul', action='store',
+        default=defs['consul'],
+        help=_help)
+
+    _help = '<hostname>:<port> to openflow controller (default: %s)' % \
+            defs['controller']
+    parser.add_argument(
+        '-O', '--controller', dest='controller', action='store',
+        default=defs['controller'],
+        help=_help)
+
+    _help = ('<hostname> or <ip> at which ofagent is reachable from outside '
+             'the cluster (default: %s)' % defs['external_host_address'])
+    parser.add_argument('-E', '--external-host-address',
+                        dest='external_host_address',
+                        action='store',
+                        default=defs['external_host_address'],
+                        help=_help)
+
+    _help = ('<hostname>:<port> to fluentd server (default: %s). (If not '
+             'specified (None), the address from the config file is used)'
+             % defs['fluentd'])
+    parser.add_argument('-F', '--fluentd',
+                        dest='fluentd',
+                        action='store',
+                        default=defs['fluentd'],
+                        help=_help)
+
+    _help = ('gRPC end-point to connect to. It can either be a direct '
+             'definition in the form of <hostname>:<port>, or it can be an '
+             'indirect definition in the form of @<service-name> where '
+             '<service-name> is the name of the grpc service as registered '
+             'in consul (example: @voltha-grpc). (default: %s)'
+             % defs['grpc_endpoint'])
+    parser.add_argument('-G', '--grpc-endpoint',
+                        dest='grpc_endpoint',
+                        action='store',
+                        default=defs['grpc_endpoint'],
+                        help=_help)
+
+    _help = ('<hostname> or <ip> at which ofagent is reachable from inside '
+             'the cluster (default: %s)' % defs['internal_host_address'])
+    parser.add_argument('-H', '--internal-host-address',
+                        dest='internal_host_address',
+                        action='store',
+                        default=defs['internal_host_address'],
+                        help=_help)
+
+    _help = ('unique string id of this ofagent instance (default: %s)'
+             % defs['instance_id'])
+    parser.add_argument('-i', '--instance-id',
+                        dest='instance_id',
+                        action='store',
+                        default=defs['instance_id'],
+                        help=_help)
+
+    _help = 'omit startup banner log lines'
+    parser.add_argument('-n', '--no-banner',
+                        dest='no_banner',
+                        action='store_true',
+                        default=False,
+                        help=_help)
+
+    _help = "suppress debug and info logs"
+    parser.add_argument('-q', '--quiet',
+                        dest='quiet',
+                        action='count',
+                        help=_help)
+
+    _help = 'enable verbose logging'
+    parser.add_argument('-v', '--verbose',
+                        dest='verbose',
+                        action='count',
+                        help=_help)
+
+    _help = ('work dir to compile and assemble generated files (default=%s)'
+             % defs['work_dir'])
+    parser.add_argument('-w', '--work-dir',
+                        dest='work_dir',
+                        action='store',
+                        default=defs['work_dir'],
+                        help=_help)
+
+    _help = ('use docker container name as ofagent instance id'
+             ' (overrides -i/--instance-id option)')
+    parser.add_argument('--instance-id-is-container-name',
+                        dest='instance_id_is_container_name',
+                        action='store_true',
+                        default=False,
+                        help=_help)
+
+    args = parser.parse_args()
+
+    # post-processing
+
+    if args.instance_id_is_container_name:
+        args.instance_id = get_my_containers_name()
+
+    return args
+
+
+def load_config(args):
+    path = args.config
     if path.startswith('.'):
         dir = os.path.dirname(os.path.abspath(__file__))
         path = os.path.join(dir, path)
@@ -38,39 +170,73 @@
         config = yaml.load(fd)
     return config
 
+banner = r'''
+  ___  _____ _                    _
+ / _ \|  ___/ \   __ _  ___ _ __ | |_
+| | | | |_ / _ \ / _` |/ _ \ '_ \| __|
+| |_| |  _/ ___ \ (_| |  __/ | | | |_
+ \___/|_|/_/   \_\__, |\___|_| |_|\__|
+                 |___/
+'''
+
+def print_banner(log):
+    for line in banner.strip('\n').splitlines():
+        log.info(line)
+    log.info('(to stop: press Ctrl-C)')
+
+
+class Main(object):
+
+    def __init__(self):
+
+        self.args = args = parse_args()
+        self.config = load_config(args)
+
+        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
+        self.log = setup_logging(self.config.get('logging', {}),
+                                 args.instance_id,
+                                 verbosity_adjust=verbosity_adjust,
+                                 fluentd=args.fluentd)
+
+        # components
+        self.connection_manager = None
+
+        self.exiting = False
+
+        if not args.no_banner:
+            print_banner(self.log)
+
+        self.startup_components()
+
+    def start(self):
+        self.start_reactor()  # will not return except Keyboard interrupt
+
+    @inlineCallbacks
+    def startup_components(self):
+        self.log.info('starting-internal-components')
+        args = self.args
+        self.connection_manager = yield ConnectionManager(args.consul,
+                                                          args.grpc_endpoint,
+                                                          args.controller).run()
+        self.log.info('started-internal-services')
+
+    @inlineCallbacks
+    def shutdown_components(self):
+        """Execute before the reactor is shut down"""
+        self.log.info('exiting-on-keyboard-interrupt')
+        self.exiting = True
+        if self.connection_manager is not None:
+            yield self.connection_manager.shutdown()
+
+    def start_reactor(self):
+        from twisted.internet import reactor
+        reactor.callWhenRunning(
+            lambda: self.log.info('twisted-reactor-started'))
+
+        reactor.addSystemEventTrigger('before', 'shutdown',
+                                      self.shutdown_components)
+        reactor.run()
+
 
 if __name__ == '__main__':
-
-    # Load config and setup logging
-    config = load_config('./ofagent.yml')
-    setup_logging(config.get('logging', {}), '1')
-
-
-    # Create grpc channel to Voltha and grab client stub
-    channel = grpc.insecure_channel('localhost:50055')
-
-    # Connect to voltha using grpc and fetch the list of logical devices
-    stub = voltha_pb2.VolthaLogicalLayerStub(channel)
-    devices = stub.ListLogicalDevices(voltha_pb2.NullMessage()).items
-    print 'device id and datapaht_id list:'
-    for device in devices:
-        print '\t{} -> {}'.format(device.id, device.datapath_id)
-
-    # make a device.datapath_id -> device.id map (this will need to be actively
-    # managed in the real agent based on devices coming and going
-    device_id_map = dict((device.datapath_id, device.id) for device in devices)
-
-    # Create shared gRPC API object
-    grpc_client = GrpcClient(channel, device_id_map)
-
-    # Instantiate an OpenFlow agent for each logical device
-    agents = [
-        Agent('localhost:6633', device.datapath_id, grpc_client).run()
-        for device in devices
-    ]
-
-    def shutdown():
-        [a.stop() for a in agents]
-
-    reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
-    reactor.run()
+    Main().start()