https://jira.opencord.org/browse/CORD-824

Support for connecting multiple controllers in voltha provided.

Addressed review comments patch 7

Change-Id: I2d2375c7754014743c305a0f3841debe5eb3e795
diff --git a/compose/docker-compose-ofagent-test.yml b/compose/docker-compose-ofagent-test.yml
new file mode 100644
index 0000000..836bfc6
--- /dev/null
+++ b/compose/docker-compose-ofagent-test.yml
@@ -0,0 +1,90 @@
+version: '2'
+services:
+
+  voltha:
+    image: cord/voltha
+    command: [
+      "/voltha/voltha/main.py",
+      "--rest-port=8880",
+      "--grpc-port=50555",
+      "--instance-id-is-container-name"
+    ]
+    ports:
+    - 8880:8880
+    - 50055:50555
+    - 18880
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+  #
+  # Chameleon server instance(s)
+  #
+  chameleon:
+    image: cord/chameleon
+    command: [
+      "/chameleon/chameleon/main.py",
+      "--rest-port=8881",
+      "--grpc-endpoint=${DOCKER_HOST_IP}:50055",
+      "--instance-id-is-container-name"
+    ]
+    ports:
+    - 8881:8881
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+  #
+  # onos-1
+  #
+  onos1:
+    image: onosproject/onos:1.8
+    container_name: onos1
+    ports:
+    - 6633:6653
+    - 8101:8101
+    - 8181:8181
+    environment:
+      ONOS_APPS: drivers,openflow
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+    restart: unless-stopped
+
+  #
+  # onos-2
+  #
+  onos2:
+    image: onosproject/onos:1.8
+    container_name: onos2
+    ports:
+    - 6644:6653
+    - 8102:8101
+    - 8182:8181
+    environment:
+      ONOS_APPS: drivers,openflow
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+    restart: unless-stopped
+
+  #
+  # onos-3
+  #
+  onos3:
+    image: onosproject/onos:1.8
+    container_name: onos3
+    ports:
+    - 6655:6653
+    - 8103:8101
+    - 8183:8181
+    environment:
+      ONOS_APPS: drivers,openflow
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+    restart: unless-stopped
+  #
+  # ofagent server instance
+  #
+  ofagent:
+    image: cord/ofagent
+    command: /ofagent/ofagent/main.py --grpc-endpoint=${DOCKER_HOST_IP}:50055 --controller ${DOCKER_HOST_IP}:6633 ${DOCKER_HOST_IP}:6644 ${DOCKER_HOST_IP}:6655
+    depends_on:
+    - voltha
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+    restart: unless-stopped
\ No newline at end of file
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 11bc0c0..8a4081c 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -39,18 +39,19 @@
 
 class ConnectionManager(object):
 
-    def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
+    def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoints,
                  voltha_retry_interval=0.5, devices_refresh_interval=5):
 
         log.info('init-connection-manager')
-        self.controller_endpoint = controller_endpoint
+        log.info('list-of-controllers',controller_endpoints=controller_endpoints)
+        self.controller_endpoints = controller_endpoints
         self.consul_endpoint = consul_endpoint
         self.voltha_endpoint = voltha_endpoint
 
         self.channel = None
         self.grpc_client = None  # single, shared gRPC client to Voltha
 
-        self.agent_map = {}  # datapath_id -> Agent()
+        self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
         self.device_id_to_datapath_id_map = {}
 
         self.voltha_retry_interval = voltha_retry_interval
@@ -156,7 +157,7 @@
 
         # Use datapath ids for deciding what's new and what's obsolete
         desired_datapath_ids = set(d.datapath_id for d in devices)
-        current_datapath_ids = set(self.agent_map.iterkeys())
+        current_datapath_ids = set(datapath_ids[0] for datapath_ids in self.agent_map.iterkeys())
 
         # if identical, nothing to do
         if desired_datapath_ids == current_datapath_ids:
@@ -182,18 +183,20 @@
     def create_agent(self, device):
         datapath_id = device.datapath_id
         device_id = device.id
-        agent = Agent(self.controller_endpoint, datapath_id,
+        for controller_endpoint in self.controller_endpoints:
+            agent = Agent(controller_endpoint, datapath_id,
                       device_id, self.grpc_client)
-        agent.start()
-        self.agent_map[datapath_id] = agent
-        self.device_id_to_datapath_id_map[device_id] = datapath_id
+            agent.start()
+            self.agent_map[(datapath_id,controller_endpoint)] = agent
+            self.device_id_to_datapath_id_map[device_id] = datapath_id
 
     def delete_agent(self, datapath_id):
-        agent = self.agent_map[datapath_id]
-        device_id = agent.get_device_id()
-        agent.stop()
-        del self.agent_map[datapath_id]
-        del self.device_id_to_datapath_id_map[device_id]
+        for controller_endpoint in self.controller_endpoints:
+            agent = self.agent_map[(datapath_id,controller_endpoint)]
+            device_id = agent.get_device_id()
+            agent.stop()
+            del self.agent_map[(datapath_id,controller_endpoint)]
+            del self.device_id_to_datapath_id_map[device_id]
 
     @inlineCallbacks
     def monitor_logical_devices(self):
@@ -214,11 +217,13 @@
     def forward_packet_in(self, device_id, ofp_packet_in):
         datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
         if datapath_id:
-            agent = self.agent_map[datapath_id]
-            agent.forward_packet_in(ofp_packet_in)
+           for controller_endpoint in self.controller_endpoints:
+               agent = self.agent_map[(datapath_id,controller_endpoint)]
+               agent.forward_packet_in(ofp_packet_in)
 
     def forward_change_event(self, device_id, event):
         datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
         if datapath_id:
-            agent = self.agent_map[datapath_id]
-            agent.forward_change_event(event)
+           for controller_endpoint in self.controller_endpoints:
+               agent = self.agent_map[(datapath_id,controller_endpoint)]
+               agent.forward_change_event(event)
diff --git a/ofagent/main.py b/ofagent/main.py
index f2e8122..4811380 100755
--- a/ofagent/main.py
+++ b/ofagent/main.py
@@ -60,10 +60,10 @@
         default=defs['consul'],
         help=_help)
 
-    _help = '<hostname>:<port> to openflow controller (default: %s)' % \
+    _help = '<hostname1>:<port1> <hostname2>:<port2> <hostname3>:<port3> ... <hostnamen>:<portn>   to openflow controller (default: %s)' % \
             defs['controller']
     parser.add_argument(
-        '-O', '--controller', dest='controller', action='store',
+        '-O', '--controller',nargs = '*', dest='controller', action='store',
         default=defs['controller'],
         help=_help)
 
diff --git a/ofagent/of_protocol_handler.py b/ofagent/of_protocol_handler.py
index a91a4a1..f4af2b9 100644
--- a/ofagent/of_protocol_handler.py
+++ b/ofagent/of_protocol_handler.py
@@ -42,6 +42,7 @@
         self.agent = agent
         self.cxn = cxn
         self.rpc = rpc
+        self.role = None
 
     @inlineCallbacks
     def start(self):
@@ -108,12 +109,16 @@
         raise NotImplementedError()
 
     def handle_flow_mod_request(self, req):
-        try:
-            grpc_req = to_grpc(req)
-        except Exception, e:
-            log.exception('failed-to-convert', e=e)
-        else:
-            return self.rpc.update_flow_table(self.device_id, grpc_req)
+        if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+           try:
+              grpc_req = to_grpc(req)
+           except Exception, e:
+              log.exception('failed-to-convert', e=e)
+           else:
+              return self.rpc.update_flow_table(self.device_id, grpc_req)
+
+        elif self.role == ofp.OFPCR_ROLE_SLAVE:
+           self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
 
     def handle_get_async_request(self, req):
         raise NotImplementedError()
@@ -126,21 +131,33 @@
 
     @inlineCallbacks
     def handle_group_mod_request(self, req):
-        yield self.rpc.update_group_table(self.device_id, to_grpc(req))
+        if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+           yield self.rpc.update_group_table(self.device_id, to_grpc(req))
+        elif self.role == ofp.OFPCR_ROLE_SLAVE:
+           self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
+
 
     def handle_meter_mod_request(self, req):
         raise NotImplementedError()
 
     def handle_role_request(self, req):
-        # Handle role messages appropriately to support multiple controllers
-        # see https://jira.opencord.org/browse/CORD-824
-        if req.role != ofp.OFPCR_ROLE_MASTER:
-            raise NotImplementedError()
-        self.cxn.send(ofp.message.role_reply(
+        # https://jira.opencord.org/browse/CORD-1174
+        # Need to handle generator_id
+        if req.role == ofp.OFPCR_ROLE_MASTER or req.role == ofp.OFPCR_ROLE_SLAVE:
+           self.role = req.role
+           self.cxn.send(ofp.message.role_reply(
             xid=req.xid, role=req.role, generation_id=req.generation_id))
+        elif req.role == ofp.OFPCR_ROLE_EQUAL:
+           self.role = req.role
+           self.cxn.send(ofp.message.role_reply(
+            xid=req.xid, role=req.role))
 
     def handle_packet_out_request(self, req):
-        self.rpc.send_packet_out(self.device_id, to_grpc(req))
+        if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+           self.rpc.send_packet_out(self.device_id, to_grpc(req))
+
+        elif self.role == ofp.OFPCR_ROLE_SLAVE:
+           self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
 
     def handle_set_config_request(self, req):
         # Handle set config appropriately
@@ -270,8 +287,9 @@
     }
 
     def forward_packet_in(self, ofp_packet_in):
-        log.info('sending-packet-in', ofp_packet_in=ofp_packet_in)
-        self.cxn.send(to_loxi(ofp_packet_in))
+        if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+           log.info('sending-packet-in', ofp_packet_in=ofp_packet_in)
+           self.cxn.send(to_loxi(ofp_packet_in))
 
     def forward_port_status(self, ofp_port_status):
         self.cxn.send(to_loxi(ofp_port_status))
diff --git a/tests/itests/ofagent/onos-form-cluster b/tests/itests/ofagent/onos-form-cluster
new file mode 100644
index 0000000..9286d0c
--- /dev/null
+++ b/tests/itests/ofagent/onos-form-cluster
@@ -0,0 +1,46 @@
+#!/bin/bash
+# -----------------------------------------------------------------------------
+# Forms ONOS cluster using REST API of each separate instance.
+# -----------------------------------------------------------------------------
+
+[ $# -lt 2 ] && echo "usage: $(basename $0) ip1 ip2..." && exit 1
+
+# Scan arguments for user/password or other options...
+while getopts u:p:s: o; do
+    case "$o" in
+        u) user=$OPTARG;;
+        p) password=$OPTARG;;
+        s) partitionsize=$OPTARG;;
+    esac
+done
+ONOS_WEB_USER=${ONOS_WEB_USER:-onos} # ONOS WEB User defaults to 'onos'
+ONOS_WEB_PASS=${ONOS_WEB_PASS:-rocks} # ONOS WEB Password defaults to 'rocks'
+user=${user:-$ONOS_WEB_USER}
+password=${password:-$ONOS_WEB_PASS}
+let OPC=$OPTIND-1
+shift $OPC
+
+ip=$1
+shift
+nodes=$*
+
+ipPrefix=${ip%.*}
+
+aux=/tmp/${ipPrefix}.cluster.json
+trap "rm -f $aux" EXIT
+
+echo "{ \"nodes\": [ { \"ip\": \"$ip\" }" > $aux
+for node in $nodes; do
+    echo ", { \"ip\": \"$node\" }" >> $aux
+done
+echo "], \"ipPrefix\": \"$ipPrefix.*\"" >> $aux
+if ! [ -z ${partitionsize} ]; then
+    echo ", \"partitionSize\": $partitionsize" >> $aux
+fi
+echo " }" >> $aux
+
+for node in $ip $nodes; do
+    echo "Forming cluster on $node..."
+    curl --user $user:$password -X POST \
+        http://$node:8181/onos/v1/cluster/configuration -d @$aux
+done
diff --git a/tests/itests/ofagent/test_ofagent_multicontroller_failover.py b/tests/itests/ofagent/test_ofagent_multicontroller_failover.py
new file mode 100644
index 0000000..89228a8
--- /dev/null
+++ b/tests/itests/ofagent/test_ofagent_multicontroller_failover.py
@@ -0,0 +1,189 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import time
+import logging
+import os
+import json
+from unittest import TestCase,main
+
+this_dir = os.path.abspath(os.path.dirname(__file__))
+
+from tests.itests.docutests.test_utils import run_command_to_completion_with_raw_stdout
+
+log = logging.getLogger(__name__)
+
+DOCKER_COMPOSE_FILE = "compose/docker-compose-ofagent-test.yml"
+
+command_defs = dict(
+    docker_images="docker images",
+    docker_stop="docker stop",
+    docker_rm="docker rm",
+    docker_voltha_logs="docker logs -f compose_voltha_1",
+    docker_compose_logs="docker-compose -f {} logs".format(
+        DOCKER_COMPOSE_FILE),
+    docker_stop_and_remove_all_containers="docker stop `docker ps -q` ; "
+                                          "docker rm `docker ps -a -q`",
+    docker_compose_start_all="docker-compose -f {} up -d "
+        .format(DOCKER_COMPOSE_FILE),
+    docker_compose_stop="docker-compose -f {} stop"
+        .format(DOCKER_COMPOSE_FILE),
+    docker_compose_rm_f="docker-compose -f {} rm -f"
+        .format(DOCKER_COMPOSE_FILE),
+    docker_compose_ps="docker-compose -f {} ps".format(DOCKER_COMPOSE_FILE),
+    docker_ps="docker ps",
+    onos_form_cluster="./tests/itests/ofagent/onos-form-cluster",
+    onos1_ip="docker inspect --format '{{ .NetworkSettings.Networks.compose_default.IPAddress }}' onos1",
+    onos2_ip ="docker inspect --format '{{ .NetworkSettings.Networks.compose_default.IPAddress }}' onos2",
+    onos3_ip="docker inspect --format '{{ .NetworkSettings.Networks.compose_default.IPAddress }}' onos3",
+    add_olt='''curl -s -X POST -d '{"type": "simulated_olt", "mac_address": "01:0c:e2:31:40:00"}' \
+               http://localhost:8881/api/v1/local/devices''',
+    enable_olt="curl -s -X POST http://localhost:8881/api/v1/local/devices/",
+    onos1_devices="curl -u karaf:karaf  http://localhost:8181/onos/v1/devices",
+    onos2_devices="curl -u karaf:karaf  http://localhost:8182/onos/v1/devices",
+    onos3_devices="curl -u karaf:karaf  http://localhost:8183/onos/v1/devices")
+
+class TestOFAGENT_MultiController(TestCase):
+    # Test OFAgent Support for Multiple controller
+    def setUp(self):
+        # Run Voltha,OFAgent,3 ONOS and form ONOS cluster.
+        print "Starting all containers ..."
+        cmd = command_defs['docker_compose_start_all']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        print "Waiting for all containers to be ready ..."
+        time.sleep(80)
+        cmd = command_defs['onos1_ip']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        onos1_ip = out
+        print "ONOS1 IP is {}".format(onos1_ip)
+        cmd = command_defs['onos2_ip']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        onos2_ip = out
+        print "ONOS2 IP is {}".format(onos2_ip)
+        cmd = command_defs['onos3_ip']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        onos3_ip = out
+        print "ONOS3 IP is {}".format(onos3_ip)
+        cmd = command_defs['onos_form_cluster'] + ' {} {} {}'.format(onos1_ip.strip(),onos2_ip.strip(),onos3_ip.strip())
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        print "Cluster Output :{} ".format(out)
+
+    def tearDown(self):
+        # Stopping and Removing Voltha,OFAgent,3 ONOS.
+        print "Stopping and removing all containers ..."
+        cmd = command_defs['docker_compose_stop']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        print "Waiting for all containers to be stopped ..."
+        time.sleep(1)
+        cmd = command_defs['docker_compose_rm_f']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+
+    def test_ofagent_controller_failover(self):
+        cmd = command_defs['add_olt']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        olt_device = json.loads(out)
+        print "Output of ADD OLT is {} {} {}".format(olt_device, type(olt_device), olt_device['id'])
+        time.sleep(5)
+        cmd = command_defs['enable_olt'] + '{}'.format(olt_device['id']) + '/enable'
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        print "output is {}".format(out)
+        print "Waiting for OLT device to be activated ..."
+        time.sleep(80)
+        cmd = command_defs['onos1_devices']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        onos1_devices = json.loads(out)
+        onos1_role = onos1_devices['devices'][0]['role']
+        print "Role of ONOS1 is {}".format(onos1_role)
+        cmd = command_defs['onos2_devices']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        onos2_devices = json.loads(out)
+        onos2_role = onos2_devices['devices'][0]['role']
+        print "Role of ONOS2 is {}".format(onos2_role)
+        cmd = command_defs['onos3_devices']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        onos3_devices = json.loads(out)
+        onos3_role = onos3_devices['devices'][0]['role']
+        print "Role of ONOS3 is {}".format(onos3_role)
+        if onos1_role == "MASTER":
+           cmd = command_defs['docker_stop']+ ' onos1'
+           out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+           self.assertEqual(rc, 0)
+           print "Waiting for ONOS to Elect New Master"
+           time.sleep(20)
+           cmd = command_defs['onos2_devices']
+           out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+           self.assertEqual(rc, 0)
+           onos2_devices = json.loads(out)
+           onos2_role = onos2_devices['devices'][0]['role']
+           print "Role of ONOS2 is {}".format(onos2_role)
+           cmd = command_defs['onos3_devices']
+           out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+           self.assertEqual(rc, 0)
+           onos3_devices = json.loads(out)
+           onos3_role = onos3_devices['devices'][0]['role']
+           print "Role of ONOS3 is {}".format(onos3_role)
+           assert (onos3_role == "MASTER" or onos2_role == "MASTER"), "Exception,New Master Election Failed"
+        elif onos2_role == "MASTER":
+           cmd = command_defs['docker_stop']+ ' onos2'
+           out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+           self.assertEqual(rc, 0)
+           print "Waiting for ONOS to Elect New Master"
+           time.sleep(20)
+           cmd = command_defs['onos1_devices']
+           out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+           self.assertEqual(rc, 0)
+           onos1_devices = json.loads(out)
+           onos1_role = onos1_devices['devices'][0]['role']
+           print "Role of ONOS1 is {}".format(onos1_role)
+           cmd = command_defs['onos3_devices']
+           out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+           self.assertEqual(rc, 0)
+           onos3_devices = json.loads(out)
+           onos3_role = onos3_devices['devices'][0]['role']
+           print "Role of ONOS3 is {}".format(onos3_role)
+           assert (onos3_role == "MASTER" or onos1_role == "MASTER"), "Exception,New Master Election Failed"
+        elif onos3_role == "MASTER":
+           cmd = command_defs['docker_stop']+ ' onos3'
+           out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+           self.assertEqual(rc, 0)
+           print "Waiting for ONOS to Elect New Master"
+           time.sleep(20)
+           cmd = command_defs['onos1_devices']
+           out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+           self.assertEqual(rc, 0)
+           onos1_devices = json.loads(out)
+           onos1_role = onos1_devices['devices'][0]['role']
+           print "Role of ONOS1 is {}".format(onos1_role)
+           cmd = command_defs['onos2_devices']
+           out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+           self.assertEqual(rc, 0)
+           onos2_devices = json.loads(out)
+           onos2_role = onos2_devices['devices'][0]['role']
+           print "Role of ONOS2 is {}".format(onos2_role)
+           assert (onos1_role == "MASTER" or onos2_role == "MASTER"), "Exception,New Master Election Failed"
+
diff --git a/tests/utests/ofagent/test_connection_mgr.py b/tests/utests/ofagent/test_connection_mgr.py
new file mode 100644
index 0000000..657a37a
--- /dev/null
+++ b/tests/utests/ofagent/test_connection_mgr.py
@@ -0,0 +1,72 @@
+from unittest import TestCase, main
+from connection_mgr import ConnectionManager
+
+class TestConection_mgr(TestCase):
+
+    def gen_endpoints(self):
+        consul_endpoint = "localhost:8500"
+        voltha_endpoint= "localhost:8880"
+        controller_endpoints = ["localhost:6633","localhost:6644","localhost:6655"]
+        return (consul_endpoint,voltha_endpoint,controller_endpoints)
+
+    def gen_devices(self):
+        device =lambda: None
+        device.id = "1"
+        device.datapath_id = 1
+        device.desc = '{mfr_desc: "cord porject" hw_desc: "simualted pon" sw_desc: "simualted pon"\
+                       serial_num: "2f150d56afa2405eba3ba24e33ce8df9"  dp_desc: "n/a"}'
+        device.switch_features = '{ n_buffers: 256 n_tables: 2 capabilities: 15 }'
+        device.root_device_id = "a245bd8bb8b8"
+        devices = [device]
+        return devices,device
+
+    def gen_packet_in(self):
+        packet_in = 1
+        return packet_in
+
+    def test_connection_mgr_init(self):
+        consul_endpoint,voltha_endpoint,controller_endpoints  = self.gen_endpoints()
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+        self.assertEqual(test_connection_init.consul_endpoint,consul_endpoint)
+        self.assertEqual(test_connection_init.voltha_endpoint, voltha_endpoint)
+        self.assertEqual(test_connection_init.controller_endpoints, controller_endpoints)
+
+    def test_resolve_endpoint(self):
+        consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+        host,port = test_connection_init.resolve_endpoint(endpoint=consul_endpoint)
+        assert isinstance(port, int)
+        assert isinstance(host, basestring)
+
+    def test_refresh_agent_connections(self):
+        consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
+        devices,device = self.gen_devices()
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+        test_connection_init.refresh_agent_connections(devices)
+
+    def test_create_agent(self):
+        consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
+        devices,device = self.gen_devices()
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+        test_connection_init.create_agent(device)
+
+    def test_delete_agent(self):
+        consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
+        devices,device = self.gen_devices()
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+        test_connection_init.create_agent(device)
+        with self.assertRaises(Exception) as context:
+            test_connection_init.delete_agent(device.datapath_id)
+        print context.exception
+        self.assertTrue('\'NoneType\' object has no attribute \'disconnect\'' in context.exception)
+
+    def test_forward_packet_in(self):
+        consul_endpoint, voltha_endpoint, controller_endpoints = self.gen_endpoints()
+        devices,device = self.gen_devices()
+        packet_in = self.gen_packet_in()
+        test_connection_init = ConnectionManager(consul_endpoint, voltha_endpoint, controller_endpoints)
+        test_connection_init.create_agent(device)
+        test_connection_init.forward_packet_in(device.id, packet_in)
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/tests/utests/ofagent/test_of_protocol_handler.py b/tests/utests/ofagent/test_of_protocol_handler.py
new file mode 100644
index 0000000..d72b0b2
--- /dev/null
+++ b/tests/utests/ofagent/test_of_protocol_handler.py
@@ -0,0 +1,77 @@
+from unittest import TestCase, main
+from of_protocol_handler import OpenFlowProtocolHandler
+import loxi.of13 as ofp
+
+class TestOF_Protocol_handler(TestCase):
+
+    def gen_packet_in(self):
+        packet_in = 1
+        return packet_in
+
+    def gen_device(self):
+        device =lambda: None
+        device.id = "1"
+        device.datapath_id = 1
+        device.desc = '{mfr_desc: "cord porject" hw_desc: "simualted pon" sw_desc: "simualted pon"\
+                       serial_num: "2f150d56afa2405eba3ba24e33ce8df9"  dp_desc: "n/a"}'
+        device.switch_features = '{ n_buffers: 256 n_tables: 2 capabilities: 15 }'
+        device.root_device_id = "a245bd8bb8b8"
+        return device
+
+    def gen_generic_obj(self):
+        generic_obj = lambda: None
+        return generic_obj
+
+    def gen_role_req(self):
+        req = self.gen_generic_obj()
+        req.role = ofp.OFPCR_ROLE_MASTER
+        return req
+
+    def test_handle_flow_mod_request_role_salve(self):
+        generic_obj = self.gen_generic_obj()
+        device = self.gen_device()
+        of_proto_handler = OpenFlowProtocolHandler(device.datapath_id, device.id, generic_obj, generic_obj, generic_obj)
+        of_proto_handler.role = ofp.OFPCR_ROLE_SLAVE
+        with self.assertRaises(Exception) as context:
+            of_proto_handler.handle_flow_mod_request(generic_obj)
+        print context.exception
+        self.assertTrue('\'function\' object has no attribute \'send\'' in context.exception)
+
+    def test_handle_flow_mod_request_role_master(self):
+        generic_obj = self.gen_generic_obj()
+        device = self.gen_device()
+        of_proto_handler = OpenFlowProtocolHandler(device.datapath_id, device.id, generic_obj, generic_obj, generic_obj)
+        of_proto_handler.role = ofp.OFPCR_ROLE_MASTER
+        of_proto_handler.handle_flow_mod_request(generic_obj)
+
+    def test_handle_role_request(self):
+        generic_obj = self.gen_generic_obj()
+        req = self.gen_role_req()
+        device = self.gen_device()
+        of_proto_handler = OpenFlowProtocolHandler(device.datapath_id, device.id, generic_obj, generic_obj, generic_obj)
+        with self.assertRaises(Exception) as context:
+            of_proto_handler.handle_role_request(req)
+            self.assertEqual(of_proto_handler.role,req.role)
+        print context.exception
+        self.assertTrue('\'function\' object has no attribute \'send\'' in context.exception)
+
+    def test_forward_packet_in_role_none(self):
+        packet_in = self.gen_packet_in()
+        generic_obj = self.gen_generic_obj()
+        device = self.gen_device()
+        of_proto_handler = OpenFlowProtocolHandler(device.datapath_id, device.id, generic_obj, generic_obj, generic_obj)
+        of_proto_handler.forward_packet_in(packet_in)
+
+    def test_forward_packet_in_role_master(self):
+        packet_in = self.gen_packet_in()
+        generic_obj = self.gen_generic_obj()
+        device = self.gen_device()
+        of_proto_handler = OpenFlowProtocolHandler(device.datapath_id, device.id, generic_obj, generic_obj, generic_obj)
+        of_proto_handler.role = ofp.OFPCR_ROLE_MASTER
+        with self.assertRaises(Exception) as context:
+            of_proto_handler.forward_packet_in(packet_in)
+        print context.exception
+        self.assertTrue('\'function\' object has no attribute \'send\'' in context.exception)
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file