Support connecting to multiple OpenFlow controllers

Change-Id: I0989d5031fb2d4f5aa78ba0e4576e465f826a419
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
index 1d225b2..419a988 100644
--- a/internal/pkg/ofagent/ofagent.go
+++ b/internal/pkg/ofagent/ofagent.go
@@ -44,7 +44,7 @@
 
 type OFAgent struct {
 	VolthaApiEndPoint         string
-	OFControllerEndPoint      string
+	OFControllerEndPoints     []string
 	DeviceListRefreshInterval time.Duration
 	ConnectionMaxRetries      int
 	ConnectionRetryDelay      time.Duration
@@ -63,7 +63,7 @@
 func NewOFAgent(config *OFAgent) (*OFAgent, error) {
 	ofa := OFAgent{
 		VolthaApiEndPoint:         config.VolthaApiEndPoint,
-		OFControllerEndPoint:      config.OFControllerEndPoint,
+		OFControllerEndPoints:     config.OFControllerEndPoints,
 		DeviceListRefreshInterval: config.DeviceListRefreshInterval,
 		ConnectionMaxRetries:      config.ConnectionMaxRetries,
 		ConnectionRetryDelay:      config.ConnectionRetryDelay,
@@ -99,7 +99,7 @@
 	logger.Debugw("Starting GRPC - VOLTHA client",
 		log.Fields{
 			"voltha-endpoint":     ofa.VolthaApiEndPoint,
-			"controller-endpoint": ofa.OFControllerEndPoint})
+			"controller-endpoint": ofa.OFControllerEndPoints})
 
 	// If the context contains a k8s probe then register services
 	p := probe.GetProbeFromContext(ctx)
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
index d50b3ec..835e551 100644
--- a/internal/pkg/ofagent/refresh.go
+++ b/internal/pkg/ofagent/refresh.go
@@ -90,14 +90,14 @@
 	ofc := ofa.clientMap[deviceID]
 	if ofc == nil {
 		ofc = openflow.NewOFClient(&openflow.OFClient{
-			DeviceID:             deviceID,
-			OFControllerEndPoint: ofa.OFControllerEndPoint,
-			VolthaClient:         ofa.volthaClient,
-			PacketOutChannel:     ofa.packetOutChannel,
-			ConnectionMaxRetries: ofa.ConnectionMaxRetries,
-			ConnectionRetryDelay: ofa.ConnectionRetryDelay,
+			DeviceID:              deviceID,
+			OFControllerEndPoints: ofa.OFControllerEndPoints,
+			VolthaClient:          ofa.volthaClient,
+			PacketOutChannel:      ofa.packetOutChannel,
+			ConnectionMaxRetries:  ofa.ConnectionMaxRetries,
+			ConnectionRetryDelay:  ofa.ConnectionRetryDelay,
 		})
-		go ofc.Run(context.Background())
+		ofc.Run(context.Background())
 		ofa.clientMap[deviceID] = ofc
 	}
 	ofa.mapLock.Unlock()
diff --git a/internal/pkg/openflow/barrier.go b/internal/pkg/openflow/barrier.go
index 3db9e84..4775d2c 100644
--- a/internal/pkg/openflow/barrier.go
+++ b/internal/pkg/openflow/barrier.go
@@ -22,7 +22,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 )
 
-func (ofc *OFClient) handleBarrierRequest(request *ofp.BarrierRequest) {
+func (ofc *OFConnection) handleBarrierRequest(request *ofp.BarrierRequest) {
 
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(request)
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
index 94ad645..aeac406 100644
--- a/internal/pkg/openflow/client.go
+++ b/internal/pkg/openflow/client.go
@@ -17,17 +17,12 @@
 package openflow
 
 import (
-	"bufio"
 	"context"
-	"encoding/binary"
-	"encoding/json"
 	"errors"
-	"github.com/donNewtonAlpha/goloxi"
 	ofp "github.com/donNewtonAlpha/goloxi/of13"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"io"
-	"net"
+	"sync"
 	"time"
 )
 
@@ -35,6 +30,7 @@
 
 type ofcEvent byte
 type ofcState byte
+type ofcRole byte // OpenFlow controller role held by one OFConnection (see the ofcRole* constants)
 
 const (
 	ofcEventStart = ofcEvent(iota)
@@ -47,6 +43,11 @@
 	ofcStateConnected
 	ofcStateDisconnected
 	ofcStateStopped
+
+	ofcRoleNone = ofcRole(iota)
+	ofcRoleEqual
+	ofcRoleMaster
+	ofcRoleSlave
 )
 
 func (e ofcEvent) String() string {
@@ -84,19 +85,73 @@
 // OFClient the configuration and operational state of a connection to an
 // openflow controller
 type OFClient struct {
-	OFControllerEndPoint string
-	Port                 uint16
-	DeviceID             string
-	VolthaClient         voltha.VolthaServiceClient
-	PacketOutChannel     chan *voltha.PacketOut
-	ConnectionMaxRetries int
-	ConnectionRetryDelay time.Duration
-	conn                 net.Conn
+	OFControllerEndPoints []string
+	DeviceID              string
+	VolthaClient          voltha.VolthaServiceClient
+	PacketOutChannel      chan *voltha.PacketOut
+	ConnectionMaxRetries  int
+	ConnectionRetryDelay  time.Duration
 
-	// expirimental
-	events            chan ofcEvent
-	sendChannel       chan Message
-	lastUnsentMessage Message
+	// map of endpoint to OF connection
+	connections map[string]*OFConnection
+
+	// global role state for device
+	generationIsDefined bool
+	generationID        uint64
+	roleLock            sync.Mutex
+}
+// RoleManager arbitrates OpenFlow role requests across the connections of a device; UpdateRoles applies a request received on the connection identified by from and reports whether it was accepted.
+type RoleManager interface {
+	UpdateRoles(from string, request *ofp.RoleRequest) bool
+}
+
+func distance(a, b uint64) int64 {
+	return int64(a - b) // unsigned subtraction wraps mod 2^64, so a negative result means a is behind b even across wraparound
+}
+
+// UpdateRoles applies a role request received on the connection keyed by from and reports whether it was accepted
+func (ofc *OFClient) UpdateRoles(from string, request *ofp.RoleRequest) bool {
+	logger.Debugw("updating role", log.Fields{
+		"from": from,
+		"to":   request.Role,
+		"id":   request.GenerationId})
+
+	ofc.roleLock.Lock()
+	defer ofc.roleLock.Unlock()
+
+	requester, ok := ofc.connections[from]
+	if !ok {
+		// unknown endpoint: reject instead of dereferencing a nil connection
+		return false
+	}
+
+	if request.Role == ofp.OFPCRRoleEqual {
+		// equal request doesn't care about generation ID and always succeeds
+		requester.role = ofcRoleEqual
+		return true
+	}
+
+	if ofc.generationIsDefined && distance(request.GenerationId, ofc.generationID) < 0 {
+		// generation ID is stale: older than the last accepted one
+		return false
+	}
+	ofc.generationID = request.GenerationId
+	ofc.generationIsDefined = true
+	switch request.Role {
+	case ofp.OFPCRRoleMaster:
+		// master is potentially changing: demote any other master to slave
+		for endpoint, connection := range ofc.connections {
+			if endpoint != from && connection.role == ofcRoleMaster {
+				connection.role = ofcRoleSlave
+			}
+		}
+		requester.role = ofcRoleMaster
+		return true
+	case ofp.OFPCRRoleSlave:
+		requester.role = ofcRoleSlave
+		return true
+	}
+	return false
 }
 
 // NewClient returns an initialized OFClient instance based on the configuration
@@ -104,14 +159,13 @@
 func NewOFClient(config *OFClient) *OFClient {
 
 	ofc := OFClient{
-		DeviceID:             config.DeviceID,
-		OFControllerEndPoint: config.OFControllerEndPoint,
-		VolthaClient:         config.VolthaClient,
-		PacketOutChannel:     config.PacketOutChannel,
-		ConnectionMaxRetries: config.ConnectionMaxRetries,
-		ConnectionRetryDelay: config.ConnectionRetryDelay,
-		events:               make(chan ofcEvent, 10),
-		sendChannel:          make(chan Message, 100),
+		DeviceID:              config.DeviceID,
+		OFControllerEndPoints: config.OFControllerEndPoints,
+		VolthaClient:          config.VolthaClient,
+		PacketOutChannel:      config.PacketOutChannel,
+		ConnectionMaxRetries:  config.ConnectionMaxRetries,
+		ConnectionRetryDelay:  config.ConnectionRetryDelay,
+		connections:           make(map[string]*OFConnection),
 	}
 
 	if ofc.ConnectionRetryDelay <= 0 {
@@ -127,417 +181,43 @@
 
 // Stop initiates a shutdown of the OFClient
 func (ofc *OFClient) Stop() {
-	ofc.events <- ofcEventStop
-}
-
-func (ofc *OFClient) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
-	header := ofp.Header{}
-	header.Version = uint8(buf[0])
-	header.Type = uint8(buf[1])
-	header.Length = binary.BigEndian.Uint16(buf[2:4])
-	header.Xid = binary.BigEndian.Uint32(buf[4:8])
-
-	// TODO: add minimal validation of version and type
-
-	return &header, nil
-}
-
-func (ofc *OFClient) establishConnectionToController() error {
-	if ofc.conn != nil {
-		logger.Debugw("closing-of-connection-to-reconnect",
-			log.Fields{"device-id": ofc.DeviceID})
-		ofc.conn.Close()
-		ofc.conn = nil
+	for _, connection := range ofc.connections {
+		connection.events <- ofcEventStop // NOTE(review): OFConnection.Run logs and ignores stop while still in the started (first-connect) state
 	}
-	try := 1
-	for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
-		if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
-			logger.Debugw("openflow-client unable to resolve endpoint",
-				log.Fields{
-					"device-id": ofc.DeviceID,
-					"endpoint":  ofc.OFControllerEndPoint})
-		} else {
-			if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
-				ofc.conn = connection
-				ofc.sayHello()
-				ofc.events <- ofcEventConnect
-				return nil
-			} else {
-				logger.Warnw("openflow-client-connect-error",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"endpoint":  ofc.OFControllerEndPoint})
-			}
-		}
-		if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
-			if ofc.ConnectionMaxRetries != 0 {
-				try += 1
-			}
-			time.Sleep(ofc.ConnectionRetryDelay)
-		}
-	}
-	return errors.New("failed-to-connect-to-of-controller")
 }
 
-// Run implementes the state machine for the OF client reacting to state change
-// events and invoking actions as a reaction to those state changes
 func (ofc *OFClient) Run(ctx context.Context) {
 
-	var ofCtx context.Context
-	var ofDone func()
-	state := ofcStateCreated
-	ofc.events <- ofcEventStart
-top:
-	for {
-		select {
-		case <-ctx.Done():
-			state = ofcStateStopped
-			logger.Debugw("state-transition-context-done",
-				log.Fields{"device-id": ofc.DeviceID})
-			break top
-		case event := <-ofc.events:
-			previous := state
-			switch event {
-			case ofcEventStart:
-				logger.Debugw("ofc-event-start",
-					log.Fields{"device-id": ofc.DeviceID})
-				if state == ofcStateCreated {
-					state = ofcStateStarted
-					logger.Debug("STARTED MORE THAN ONCE")
-					go func() {
-						if err := ofc.establishConnectionToController(); err != nil {
-							logger.Errorw("controller-connection-failed", log.Fields{"error": err})
-							panic(err)
-						}
-					}()
-				} else {
-					logger.Errorw("illegal-state-transition",
-						log.Fields{
-							"device-id":     ofc.DeviceID,
-							"current-state": state.String(),
-							"event":         event.String()})
-				}
-			case ofcEventConnect:
-				logger.Debugw("ofc-event-connected",
-					log.Fields{"device-id": ofc.DeviceID})
-				if state == ofcStateStarted || state == ofcStateDisconnected {
-					state = ofcStateConnected
-					ofCtx, ofDone = context.WithCancel(context.Background())
-					go ofc.messageSender(ofCtx)
-					go ofc.processOFStream(ofCtx)
-				} else {
-					logger.Errorw("illegal-state-transition",
-						log.Fields{
-							"device-id":     ofc.DeviceID,
-							"current-state": state.String(),
-							"event":         event.String()})
-				}
-			case ofcEventDisconnect:
-				logger.Debugw("ofc-event-disconnected",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"state":     state.String()})
-				if state == ofcStateConnected {
-					state = ofcStateDisconnected
-					if ofDone != nil {
-						ofDone()
-						ofDone = nil
-					}
-					go func() {
-						if err := ofc.establishConnectionToController(); err != nil {
-							logger.Errorw("controller-connection-failed", log.Fields{"error": err})
-							panic(err)
-						}
-					}()
-				} else {
-					logger.Errorw("illegal-state-transition",
-						log.Fields{
-							"device-id":     ofc.DeviceID,
-							"current-state": state.String(),
-							"event":         event.String()})
-				}
-			case ofcEventStop:
-				logger.Debugw("ofc-event-stop",
-					log.Fields{"device-id": ofc.DeviceID})
-				if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
-					state = ofcStateStopped
-					break top
-				} else {
-					logger.Errorw("illegal-state-transition",
-						log.Fields{
-							"device-id":     ofc.DeviceID,
-							"current-state": state.String(),
-							"event":         event.String()})
-				}
-			}
-			logger.Debugw("state-transition",
-				log.Fields{
-					"device-id":      ofc.DeviceID,
-					"previous-state": previous.String(),
-					"current-state":  state.String(),
-					"event":          event.String()})
+	for _, endpoint := range ofc.OFControllerEndPoints {
+		connection := &OFConnection{
+			OFControllerEndPoint: endpoint,
+			DeviceID:             ofc.DeviceID,
+			VolthaClient:         ofc.VolthaClient,
+			PacketOutChannel:     ofc.PacketOutChannel,
+			ConnectionMaxRetries: ofc.ConnectionMaxRetries,
+			ConnectionRetryDelay: ofc.ConnectionRetryDelay,
+			role:                 ofcRoleNone,
+			roleManager:          ofc,
+			events:               make(chan ofcEvent, 10),
+			sendChannel:          make(chan Message, 100),
 		}
+
+		ofc.connections[endpoint] = connection
 	}
 
-	// If the child context exists, then cancel it
-	if ofDone != nil {
-		logger.Debugw("closing-child-processes",
-			log.Fields{"device-id": ofc.DeviceID})
-		ofDone()
-	}
-
-	// If the connection is open, then close it
-	if ofc.conn != nil {
-		logger.Debugw("closing-of-connection",
-			log.Fields{"device-id": ofc.DeviceID})
-		ofc.conn.Close()
-		ofc.conn = nil
-	}
-	logger.Debugw("state-machine-finished",
-		log.Fields{"device-id": ofc.DeviceID})
-}
-
-// processOFStream processes the OF connection from the controller and invokes
-// the appropriate handler methods for each message.
-func (ofc *OFClient) processOFStream(ctx context.Context) {
-	fromController := bufio.NewReader(ofc.conn)
-
-	/*
-	 * We have a read buffer of a max size of 4096, so if we ever have
-	 * a message larger than this then we will have issues
-	 */
-	headerBuf := make([]byte, 8)
-
-top:
-	// Continue until we are told to stop
-	for {
-		select {
-		case <-ctx.Done():
-			logger.Error("of-loop-ending-context-done")
-			break top
-		default:
-			// Read 8 bytes, the standard OF header
-			read, err := io.ReadFull(fromController, headerBuf)
-			if err != nil {
-				logger.Errorw("bad-of-header",
-					log.Fields{
-						"byte-count": read,
-						"device-id":  ofc.DeviceID,
-						"error":      err})
-				break top
-			}
-
-			// Decode the header
-			peek, err := ofc.peekAtOFHeader(headerBuf)
-			if err != nil {
-				/*
-				 * Header is bad, assume stream is corrupted
-				 * and needs to be restarted
-				 */
-				logger.Errorw("bad-of-packet",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"error":     err})
-				break top
-			}
-
-			// Calculate the size of the rest of the packet and read it
-			need := int(peek.GetLength())
-			messageBuf := make([]byte, need)
-			copy(messageBuf, headerBuf)
-			read, err = io.ReadFull(fromController, messageBuf[8:])
-			if err != nil {
-				logger.Errorw("bad-of-packet",
-					log.Fields{
-						"byte-count": read,
-						"device-id":  ofc.DeviceID,
-						"error":      err})
-				break top
-			}
-
-			// Decode and process the packet
-			decoder := goloxi.NewDecoder(messageBuf)
-			msg, err := ofp.DecodeHeader(decoder)
-			if err != nil {
-				// nolint: staticcheck
-				js, _ := json.Marshal(decoder)
-				logger.Errorw("failed-to-decode",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"decoder":   js,
-						"error":     err})
-				break top
-			}
-			if logger.V(log.DebugLevel) {
-				js, _ := json.Marshal(msg)
-				logger.Debugw("packet-header",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"header":    js})
-			}
-			ofc.parseHeader(msg)
-		}
-	}
-	logger.Debugw("end-of-stream",
-		log.Fields{"device-id": ofc.DeviceID})
-	ofc.events <- ofcEventDisconnect
-}
-
-func (ofc *OFClient) sayHello() {
-	hello := ofp.NewHello()
-	hello.Xid = uint32(GetXid())
-	elem := ofp.NewHelloElemVersionbitmap()
-	elem.SetType(ofp.OFPHETVersionbitmap)
-	elem.SetLength(8)
-	elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
-	hello.SetElements([]ofp.IHelloElem{elem})
-	if logger.V(log.DebugLevel) {
-		js, _ := json.Marshal(hello)
-		logger.Debugw("sayHello Called",
-			log.Fields{
-				"device-id":     ofc.DeviceID,
-				"hello-message": js})
-	}
-	if err := ofc.SendMessage(hello); err != nil {
-		logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
-			log.Fields{
-				"device-id": ofc.DeviceID,
-				"error":     err})
+	for _, connection := range ofc.connections {
+		go connection.Run(ctx)
 	}
 }
 
-func (ofc *OFClient) parseHeader(header ofp.IHeader) {
-	headerType := header.GetType()
-	logger.Debugw("packet-header-type",
-		log.Fields{
-			"header-type": ofp.Type(headerType).String()})
-	switch headerType {
-	case ofp.OFPTHello:
-		//x := header.(*ofp.Hello)
-	case ofp.OFPTError:
-		go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
-	case ofp.OFPTEchoRequest:
-		go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
-	case ofp.OFPTEchoReply:
-	case ofp.OFPTExperimenter:
-	case ofp.OFPTFeaturesRequest:
-		go func() {
-			if err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest)); err != nil {
-				logger.Errorw("handle-feature-request", log.Fields{"error": err})
-			}
-		}()
-	case ofp.OFPTFeaturesReply:
-	case ofp.OFPTGetConfigRequest:
-		go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
-	case ofp.OFPTGetConfigReply:
-	case ofp.OFPTSetConfig:
-		go ofc.handleSetConfig(header.(*ofp.SetConfig))
-	case ofp.OFPTPacketIn:
-	case ofp.OFPTFlowRemoved:
-	case ofp.OFPTPortStatus:
-	case ofp.OFPTPacketOut:
-		go ofc.handlePacketOut(header.(*ofp.PacketOut))
-	case ofp.OFPTFlowMod:
-		/*
-		 * Not using go routine to handle flow* messages or barrier requests
-		 * onos typically issues barrier requests just before a flow* message.
-		 * by handling in this thread I ensure all flow* are handled when barrier
-		 * request is issued.
-		 */
-		switch header.(ofp.IFlowMod).GetCommand() {
-		case ofp.OFPFCAdd:
-			ofc.handleFlowAdd(header.(*ofp.FlowAdd))
-		case ofp.OFPFCModify:
-			ofc.handleFlowMod(header.(*ofp.FlowMod))
-		case ofp.OFPFCModifyStrict:
-			ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
-		case ofp.OFPFCDelete:
-			ofc.handleFlowDelete(header.(*ofp.FlowDelete))
-		case ofp.OFPFCDeleteStrict:
-			ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
-		}
-	case ofp.OFPTStatsRequest:
-		go func() {
-			if err := ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
-				logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
-			}
-		}()
-	case ofp.OFPTBarrierRequest:
-		/* See note above at case ofp.OFPTFlowMod:*/
-		ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
-	case ofp.OFPTRoleRequest:
-		go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
-	case ofp.OFPTMeterMod:
-		ofc.handleMeterModRequest(header.(*ofp.MeterMod))
-	}
-}
-
-// Message interface that represents an open flow message and enables for a
-// unified implementation of SendMessage
-type Message interface {
-	Serialize(encoder *goloxi.Encoder) error
-}
-
-func (ofc *OFClient) doSend(msg Message) error {
-	if ofc.conn == nil {
-		return errors.New("no-connection")
-	}
-	enc := goloxi.NewEncoder()
-	if err := msg.Serialize(enc); err != nil {
-		return err
-	}
-
-	bytes := enc.Bytes()
-	if _, err := ofc.conn.Write(bytes); err != nil {
-		logger.Errorw("unable-to-send-message-to-controller",
-			log.Fields{
-				"device-id": ofc.DeviceID,
-				"message":   msg,
-				"error":     err})
-		return err
-	}
-	return nil
-}
-
-func (ofc *OFClient) messageSender(ctx context.Context) {
-	// first process last fail if it exists
-	if ofc.lastUnsentMessage != nil {
-		if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
-			ofc.events <- ofcEventDisconnect
-			return
-		}
-		ofc.lastUnsentMessage = nil
-	}
-top:
-	for {
-		select {
-		case <-ctx.Done():
-			break top
-		case msg := <-ofc.sendChannel:
-			if err := ofc.doSend(msg); err != nil {
-				ofc.lastUnsentMessage = msg
-				ofc.events <- ofcEventDisconnect
-				logger.Debugw("message-sender-error",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"error":     err.Error()})
-				break top
-			}
-			logger.Debugw("message-sender-send",
-				log.Fields{
-					"device-id": ofc.DeviceID})
-			ofc.lastUnsentMessage = nil
-		}
-	}
-
-	logger.Debugw("message-sender-finished",
-		log.Fields{
-			"device-id": ofc.DeviceID})
-}
-
-// SendMessage queues a message to be sent to the openflow controller
 func (ofc *OFClient) SendMessage(message Message) error {
-	logger.Debug("queuing-message")
-	ofc.sendChannel <- message
+	for _, connection := range ofc.connections {
+		if connection.role == ofcRoleMaster || connection.role == ofcRoleEqual {
+			err := connection.SendMessage(message)
+			if err != nil {
+				return err
+			}
+		}
+	}
 	return nil
 }
diff --git a/internal/pkg/openflow/connection.go b/internal/pkg/openflow/connection.go
new file mode 100644
index 0000000..7d57d34
--- /dev/null
+++ b/internal/pkg/openflow/connection.go
@@ -0,0 +1,480 @@
+/*
+   Copyright 2020 the original author or authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+package openflow
+
+import (
+	"bufio"
+	"context"
+	"encoding/binary"
+	"encoding/json"
+	"errors"
+	"github.com/donNewtonAlpha/goloxi"
+	ofp "github.com/donNewtonAlpha/goloxi/of13"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"io"
+	"net"
+	"time"
+)
+// OFConnection is one device's TCP session with a single OpenFlow controller (OFControllerEndPoint) plus the role that controller holds.
+type OFConnection struct {
+	OFControllerEndPoint string
+	DeviceID             string
+	VolthaClient         voltha.VolthaServiceClient
+	PacketOutChannel     chan *voltha.PacketOut
+	ConnectionMaxRetries int
+	ConnectionRetryDelay time.Duration
+
+	conn net.Conn
+
+	// current role of this connection; changed by the roleManager when role requests arrive
+	role        ofcRole
+	roleManager RoleManager
+
+	events            chan ofcEvent
+	sendChannel       chan Message
+	lastUnsentMessage Message
+}
+
+func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
+	header := ofp.Header{}
+	header.Version = uint8(buf[0])
+	header.Type = uint8(buf[1])
+	header.Length = binary.BigEndian.Uint16(buf[2:4])
+	header.Xid = binary.BigEndian.Uint32(buf[4:8]) // TODO: add minimal validation of version and type
+	if header.Length < 8 { // shorter than the header itself: stream is corrupt and the caller would slice out of range
+		return nil, errors.New("of-header-length-too-short")
+	}
+	return &header, nil
+}
+
+func (ofc *OFConnection) establishConnectionToController() error {
+	// drop any previous socket before dialing again
+	if ofc.conn != nil {
+		logger.Debugw("closing-of-connection-to-reconnect",
+			log.Fields{"device-id": ofc.DeviceID})
+		ofc.conn.Close()
+		ofc.conn = nil
+	}
+	// ConnectionMaxRetries == 0 retries forever; otherwise it bounds the number of attempts
+	for try := 1; ofc.ConnectionMaxRetries == 0 || try <= ofc.ConnectionMaxRetries; try++ {
+		if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
+			logger.Debugw("openflow-client unable to resolve endpoint",
+				log.Fields{
+					"device-id": ofc.DeviceID,
+					"endpoint":  ofc.OFControllerEndPoint,
+					"error":     err})
+		} else if connection, err := net.DialTCP("tcp", nil, raddr); err != nil {
+			logger.Warnw("openflow-client-connect-error",
+				log.Fields{
+					"device-id": ofc.DeviceID,
+					"endpoint":  ofc.OFControllerEndPoint,
+					"error":     err})
+		} else {
+			// queue the OpenFlow hello, then tell the state machine we are connected
+			ofc.conn = connection
+			ofc.sayHello()
+			ofc.events <- ofcEventConnect
+			return nil
+		}
+		// wait between attempts, but not after the final one
+		if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
+			time.Sleep(ofc.ConnectionRetryDelay)
+		}
+	}
+	return errors.New("failed-to-connect-to-of-controller")
+}
+
+// Run implements the state machine for this controller connection, reacting to
+// state change events and invoking actions as a reaction to those state changes
+func (ofc *OFConnection) Run(ctx context.Context) {
+
+	var ofCtx context.Context
+	var ofDone func()
+	state := ofcStateCreated
+	ofc.events <- ofcEventStart
+top:
+	for {
+		select {
+		case <-ctx.Done():
+			state = ofcStateStopped
+			logger.Debugw("state-transition-context-done",
+				log.Fields{"device-id": ofc.DeviceID})
+			break top
+		case event := <-ofc.events:
+			previous := state
+			switch event {
+			case ofcEventStart:
+				logger.Debugw("ofc-event-start",
+					log.Fields{"device-id": ofc.DeviceID})
+				if state == ofcStateCreated {
+					state = ofcStateStarted
+					logger.Debug("STARTED MORE THAN ONCE")
+					go func() {
+						if err := ofc.establishConnectionToController(); err != nil {
+							logger.Errorw("controller-connection-failed", log.Fields{"device-id": ofc.DeviceID, "endpoint": ofc.OFControllerEndPoint, "error": err})
+							panic(err) // NOTE(review): terminates the whole process, including connections to the other controllers
+						}
+					}()
+				} else {
+					logger.Errorw("illegal-state-transition",
+						log.Fields{
+							"device-id":     ofc.DeviceID,
+							"current-state": state.String(),
+							"event":         event.String()})
+				}
+			case ofcEventConnect:
+				logger.Debugw("ofc-event-connected",
+					log.Fields{"device-id": ofc.DeviceID})
+				if state == ofcStateStarted || state == ofcStateDisconnected {
+					state = ofcStateConnected
+					ofCtx, ofDone = context.WithCancel(context.Background())
+					go ofc.messageSender(ofCtx)
+					go ofc.processOFStream(ofCtx)
+				} else {
+					logger.Errorw("illegal-state-transition",
+						log.Fields{
+							"device-id":     ofc.DeviceID,
+							"current-state": state.String(),
+							"event":         event.String()})
+				}
+			case ofcEventDisconnect:
+				logger.Debugw("ofc-event-disconnected",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"state":     state.String()})
+				if state == ofcStateConnected {
+					state = ofcStateDisconnected
+					if ofDone != nil {
+						ofDone()
+						ofDone = nil
+					}
+					go func() {
+						if err := ofc.establishConnectionToController(); err != nil {
+							logger.Errorw("controller-connection-failed", log.Fields{"device-id": ofc.DeviceID, "endpoint": ofc.OFControllerEndPoint, "error": err})
+							panic(err) // NOTE(review): terminates the whole process, including connections to the other controllers
+						}
+					}()
+				} else {
+					logger.Errorw("illegal-state-transition",
+						log.Fields{
+							"device-id":     ofc.DeviceID,
+							"current-state": state.String(),
+							"event":         event.String()})
+				}
+			case ofcEventStop:
+				logger.Debugw("ofc-event-stop",
+					log.Fields{"device-id": ofc.DeviceID})
+				if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
+					state = ofcStateStopped
+					break top
+				} else {
+					logger.Errorw("illegal-state-transition",
+						log.Fields{
+							"device-id":     ofc.DeviceID,
+							"current-state": state.String(),
+							"event":         event.String()})
+				}
+			}
+			logger.Debugw("state-transition",
+				log.Fields{
+					"device-id":      ofc.DeviceID,
+					"previous-state": previous.String(),
+					"current-state":  state.String(),
+					"event":          event.String()})
+		}
+	}
+
+	// If the child context exists, then cancel it
+	if ofDone != nil {
+		logger.Debugw("closing-child-processes",
+			log.Fields{"device-id": ofc.DeviceID})
+		ofDone()
+	}
+
+	// If the connection is open, then close it
+	if ofc.conn != nil {
+		logger.Debugw("closing-of-connection",
+			log.Fields{"device-id": ofc.DeviceID})
+		ofc.conn.Close()
+		ofc.conn = nil
+	}
+	logger.Debugw("state-machine-finished",
+		log.Fields{"device-id": ofc.DeviceID})
+}
+
+// processOFStream processes the OF connection from the controller and invokes
+// the appropriate handler methods for each message.
+func (ofc *OFConnection) processOFStream(ctx context.Context) {
+	fromController := bufio.NewReader(ofc.conn)
+
+	// Each message is read as its fixed 8-byte OpenFlow header followed by
+	// the remainder, whose size is taken from the header's length field.
+	headerBuf := make([]byte, 8)
+
+top:
+	// Continue until we are told to stop
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Error("of-loop-ending-context-done")
+			break top
+		default:
+			// Read 8 bytes, the standard OF header
+			read, err := io.ReadFull(fromController, headerBuf)
+			if err != nil {
+				logger.Errorw("bad-of-header",
+					log.Fields{
+						"byte-count": read,
+						"device-id":  ofc.DeviceID,
+						"error":      err})
+				break top
+			}
+
+			// Decode the header
+			peek, err := ofc.peekAtOFHeader(headerBuf)
+			if err != nil {
+				/*
+				 * Header is bad, assume stream is corrupted
+				 * and needs to be restarted
+				 */
+				logger.Errorw("bad-of-packet",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"error":     err})
+				break top
+			}
+
+			// Calculate the size of the rest of the packet and read it
+			need := int(peek.GetLength())
+			messageBuf := make([]byte, need)
+			copy(messageBuf, headerBuf)
+			read, err = io.ReadFull(fromController, messageBuf[8:])
+			if err != nil {
+				logger.Errorw("bad-of-packet",
+					log.Fields{
+						"byte-count": read,
+						"device-id":  ofc.DeviceID,
+						"error":      err})
+				break top
+			}
+
+			// Decode and process the packet
+			decoder := goloxi.NewDecoder(messageBuf)
+			msg, err := ofp.DecodeHeader(decoder)
+			if err != nil {
+				// nolint: staticcheck
+				js, _ := json.Marshal(decoder)
+				logger.Errorw("failed-to-decode",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"decoder":   js,
+						"error":     err})
+				break top
+			}
+			if logger.V(log.DebugLevel) {
+				js, _ := json.Marshal(msg)
+				logger.Debugw("packet-header",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"header":    js})
+			}
+			ofc.parseHeader(msg)
+		}
+	}
+	logger.Debugw("end-of-stream",
+		log.Fields{"device-id": ofc.DeviceID})
+	if ctx.Err() == nil { // cancelled ctx: Run already left the connected state; another event could tear down a replacement connection
+		ofc.events <- ofcEventDisconnect
+	}
+}
+
+func (ofc *OFConnection) sayHello() {
+	versions := ofp.NewHelloElemVersionbitmap()
+	versions.SetType(ofp.OFPHETVersionbitmap)
+	versions.SetLength(8)
+	versions.SetBitmaps([]*ofp.Uint32{{Value: 16}}) // 1<<4: advertise only OpenFlow wire version 0x04 (1.3)
+	hello := ofp.NewHello()
+	hello.Xid = uint32(GetXid())
+	hello.SetElements([]ofp.IHelloElem{versions})
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(hello)
+		logger.Debugw("sayHello Called",
+			log.Fields{
+				"device-id":     ofc.DeviceID,
+				"hello-message": js})
+	}
+	if err := ofc.SendMessage(hello); err != nil {
+		logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+	}
+}
+
+func (ofc *OFConnection) parseHeader(header ofp.IHeader) {
+	headerType := header.GetType()
+	logger.Debugw("packet-header-type",
+		log.Fields{
+			"header-type": ofp.Type(headerType).String()})
+
+	// Messages that change switch state are only honoured from a controller
+	// whose role is master or equal; from any other role they are answered
+	// via sendRoleSlaveError and dropped.
+	switch headerType {
+	case ofp.OFPTPacketOut, ofp.OFPTFlowMod, ofp.OFPTMeterMod, ofp.OFPTGroupMod:
+		if ofc.role != ofcRoleMaster && ofc.role != ofcRoleEqual {
+			ofc.sendRoleSlaveError(header)
+			return
+		}
+	}
+
+	// Dispatch on the message type. Types listed without a body are accepted
+	// and deliberately ignored; most handlers run in their own goroutine, while
+	// flow mods, barriers and meter mods are handled inline (see flow-mod note).
+	switch headerType {
+	case ofp.OFPTHello:
+		//x := header.(*ofp.Hello)
+	case ofp.OFPTError:
+		go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
+	case ofp.OFPTEchoRequest:
+		go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
+	case ofp.OFPTEchoReply:
+	case ofp.OFPTExperimenter:
+	case ofp.OFPTFeaturesRequest:
+		go func() {
+			if err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest)); err != nil {
+				logger.Errorw("handle-feature-request", log.Fields{"error": err})
+			}
+		}()
+	case ofp.OFPTFeaturesReply:
+	case ofp.OFPTGetConfigRequest:
+		go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
+	case ofp.OFPTGetConfigReply:
+	case ofp.OFPTSetConfig:
+		go ofc.handleSetConfig(header.(*ofp.SetConfig))
+	case ofp.OFPTPacketIn:
+	case ofp.OFPTFlowRemoved:
+	case ofp.OFPTPortStatus:
+	case ofp.OFPTPacketOut:
+		go ofc.handlePacketOut(header.(*ofp.PacketOut))
+	case ofp.OFPTFlowMod:
+		/*
+		 * Not using go routine to handle flow* messages or barrier requests
+		 * onos typically issues barrier requests just before a flow* message.
+		 * by handling in this thread I ensure all flow* are handled when barrier
+		 * request is issued.
+		 */
+		switch header.(ofp.IFlowMod).GetCommand() {
+		case ofp.OFPFCAdd:
+			ofc.handleFlowAdd(header.(*ofp.FlowAdd))
+		case ofp.OFPFCModify:
+			ofc.handleFlowMod(header.(*ofp.FlowMod))
+		case ofp.OFPFCModifyStrict:
+			ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
+		case ofp.OFPFCDelete:
+			ofc.handleFlowDelete(header.(*ofp.FlowDelete))
+		case ofp.OFPFCDeleteStrict:
+			ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
+		}
+	case ofp.OFPTStatsRequest:
+		go func() {
+			if err := ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
+				logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
+			}
+		}()
+	case ofp.OFPTBarrierRequest:
+		/* See note above at case ofp.OFPTFlowMod:*/
+		ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
+	case ofp.OFPTRoleRequest:
+		go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
+	case ofp.OFPTMeterMod:
+		ofc.handleMeterModRequest(header.(*ofp.MeterMod))
+	case ofp.OFPTGroupMod:
+		// TODO handle group mods
+		// (group mods that pass the role check above are accepted without any action)
+	}
+}
+
+// Message is any OpenFlow message that can serialize itself with a goloxi
+// encoder, which lets SendMessage and doSend handle every message type uniformly.
+type Message interface {
+	Serialize(encoder *goloxi.Encoder) error
+}
+
+func (ofc *OFConnection) doSend(msg Message) error {
+	if ofc.conn == nil {
+		return errors.New("no-connection")
+	}
+	encoder := goloxi.NewEncoder()
+	if err := msg.Serialize(encoder); err != nil {
+		return err
+	}
+
+	// write the whole serialized message in a single call; a failure is logged here and returned
+	_, err := ofc.conn.Write(encoder.Bytes())
+	if err != nil {
+		logger.Errorw("unable-to-send-message-to-controller",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"message":   msg,
+				"error":     err})
+	}
+	return err
+}
+
+func (ofc *OFConnection) messageSender(ctx context.Context) {
+	// resend the message whose write failed on the previous connection, if any
+	if ofc.lastUnsentMessage != nil {
+		if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
+			ofc.events <- ofcEventDisconnect
+			return
+		}
+		ofc.lastUnsentMessage = nil
+	}
+top:
+	for { // drain sendChannel onto the socket until ctx is cancelled or a write fails
+		select {
+		case <-ctx.Done():
+			break top
+		case msg := <-ofc.sendChannel:
+			if err := ofc.doSend(msg); err != nil {
+				ofc.lastUnsentMessage = msg // kept so the sender started after reconnecting retries it first
+				ofc.events <- ofcEventDisconnect
+				logger.Debugw("message-sender-error",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"error":     err.Error()})
+				break top
+			}
+			logger.Debugw("message-sender-send",
+				log.Fields{
+					"device-id": ofc.DeviceID})
+			ofc.lastUnsentMessage = nil
+		}
+	}
+
+	logger.Debugw("message-sender-finished",
+		log.Fields{
+			"device-id": ofc.DeviceID})
+}
+
+// SendMessage queues message for this connection's messageSender to write to the controller; it blocks while sendChannel is full and always returns nil
+func (ofc *OFConnection) SendMessage(message Message) error {
+	logger.Debug("queuing-message")
+	ofc.sendChannel <- message
+	return nil
+}
diff --git a/internal/pkg/openflow/echo.go b/internal/pkg/openflow/echo.go
index af850a4..7358a0f 100644
--- a/internal/pkg/openflow/echo.go
+++ b/internal/pkg/openflow/echo.go
@@ -22,7 +22,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 )
 
-func (ofc *OFClient) handleEchoRequest(request *ofp.EchoRequest) {
+func (ofc *OFConnection) handleEchoRequest(request *ofp.EchoRequest) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(request)
 		logger.Debugw("handleEchoRequest called",
diff --git a/internal/pkg/openflow/error.go b/internal/pkg/openflow/error.go
index 3a70b93..0472ad8 100644
--- a/internal/pkg/openflow/error.go
+++ b/internal/pkg/openflow/error.go
@@ -22,7 +22,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 )
 
-func (ofc *OFClient) handleErrMsg(message ofp.IErrorMsg) {
+func (ofc *OFConnection) handleErrMsg(message ofp.IErrorMsg) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(message)
 		logger.Debugw("handleErrMsg called",
diff --git a/internal/pkg/openflow/feature.go b/internal/pkg/openflow/feature.go
index 5eb8fa0..82353db 100644
--- a/internal/pkg/openflow/feature.go
+++ b/internal/pkg/openflow/feature.go
@@ -24,7 +24,7 @@
 	"github.com/opencord/voltha-protos/v3/go/common"
 )
 
-func (ofc *OFClient) handleFeatureRequest(request *ofp.FeaturesRequest) error {
+func (ofc *OFConnection) handleFeatureRequest(request *ofp.FeaturesRequest) error {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(request)
 		logger.Debugw("handleFeatureRequest called",
diff --git a/internal/pkg/openflow/flowMod.go b/internal/pkg/openflow/flowMod.go
index cabcfc2..114a012 100644
--- a/internal/pkg/openflow/flowMod.go
+++ b/internal/pkg/openflow/flowMod.go
@@ -71,7 +71,7 @@
 	"vlan_vid_masked": 200, //made up
 }
 
-func (ofc *OFClient) handleFlowAdd(flowAdd *ofp.FlowAdd) {
+func (ofc *OFConnection) handleFlowAdd(flowAdd *ofp.FlowAdd) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(flowAdd)
 		logger.Debugw("handleFlowAdd called",
@@ -265,7 +265,7 @@
 	}
 }
 
-func (ofc *OFClient) handleFlowMod(flowMod *ofp.FlowMod) {
+func (ofc *OFConnection) handleFlowMod(flowMod *ofp.FlowMod) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(flowMod)
 		logger.Debugw("handleFlowMod called",
@@ -277,7 +277,7 @@
 		log.Fields{"device-id": ofc.DeviceID})
 }
 
-func (ofc *OFClient) handleFlowModStrict(flowModStrict *ofp.FlowModifyStrict) {
+func (ofc *OFConnection) handleFlowModStrict(flowModStrict *ofp.FlowModifyStrict) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(flowModStrict)
 		logger.Debugw("handleFlowModStrict called",
@@ -289,7 +289,7 @@
 		log.Fields{"device-id": ofc.DeviceID})
 }
 
-func (ofc *OFClient) handleFlowDelete(flowDelete *ofp.FlowDelete) {
+func (ofc *OFConnection) handleFlowDelete(flowDelete *ofp.FlowDelete) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(flowDelete)
 		logger.Debugw("handleFlowDelete called",
@@ -302,7 +302,7 @@
 
 }
 
-func (ofc *OFClient) handleFlowDeleteStrict(flowDeleteStrict *ofp.FlowDeleteStrict) {
+func (ofc *OFConnection) handleFlowDeleteStrict(flowDeleteStrict *ofp.FlowDeleteStrict) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(flowDeleteStrict)
 		logger.Debugw("handleFlowDeleteStrict called",
diff --git a/internal/pkg/openflow/getConfig.go b/internal/pkg/openflow/getConfig.go
index 19efdf4..6425246 100644
--- a/internal/pkg/openflow/getConfig.go
+++ b/internal/pkg/openflow/getConfig.go
@@ -22,7 +22,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 )
 
-func (ofc *OFClient) handleGetConfigRequest(request *ofp.GetConfigRequest) {
+func (ofc *OFConnection) handleGetConfigRequest(request *ofp.GetConfigRequest) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(request)
 		logger.Debugw("handleGetConfigRequest called",
diff --git a/internal/pkg/openflow/meter.go b/internal/pkg/openflow/meter.go
index 61ded75..c8ee22c 100644
--- a/internal/pkg/openflow/meter.go
+++ b/internal/pkg/openflow/meter.go
@@ -23,7 +23,7 @@
 	"golang.org/x/net/context"
 )
 
-func (ofc *OFClient) handleMeterModRequest(request *ofp.MeterMod) {
+func (ofc *OFConnection) handleMeterModRequest(request *ofp.MeterMod) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(request)
 		logger.Debugw("handleMeterModRequest called",
diff --git a/internal/pkg/openflow/packet.go b/internal/pkg/openflow/packet.go
index 622ff0d..7596d4a 100644
--- a/internal/pkg/openflow/packet.go
+++ b/internal/pkg/openflow/packet.go
@@ -23,7 +23,7 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
 
-func (ofc *OFClient) handlePacketOut(packetOut *ofp.PacketOut) {
+func (ofc *OFConnection) handlePacketOut(packetOut *ofp.PacketOut) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(packetOut)
 		logger.Debugw("handlePacketOut called",
diff --git a/internal/pkg/openflow/role.go b/internal/pkg/openflow/role.go
index 3a123e8..6d86fe9 100644
--- a/internal/pkg/openflow/role.go
+++ b/internal/pkg/openflow/role.go
@@ -18,11 +18,12 @@
 
 import (
 	"encoding/json"
+	"github.com/donNewtonAlpha/goloxi"
 	ofp "github.com/donNewtonAlpha/goloxi/of13"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 )
 
-func (ofc *OFClient) handleRoleRequest(request *ofp.RoleRequest) {
+func (ofc *OFConnection) handleRoleRequest(request *ofp.RoleRequest) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(request)
 		logger.Debugw("handleRoleRequest called",
@@ -30,13 +31,67 @@
 				"device-id": ofc.DeviceID,
 				"request":   js})
 	}
-	reply := ofp.NewRoleReply()
+
+	// OFPCR_ROLE_NOCHANGE only queries the current role: answer and stop here.
+	if request.Role == ofp.OFPCRRoleNochange {
+		reply := ofp.NewRoleReply()
+		reply.SetXid(request.GetXid())
+		reply.SetVersion(request.GetVersion())
+		reply.SetRole(ofp.ControllerRole(ofc.role))
+		reply.SetGenerationId(request.GetGenerationId())
+		if err := ofc.SendMessage(reply); err != nil {
+			logger.Errorw("handle-role-request-send-message", log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+		}
+		return
+	}
+
+	// Accepted requests are echoed back as a role reply; rejected ones
+	// (UpdateRoles returned false) get OFPRRFC_STALE plus the serialized request.
+	if ofc.roleManager.UpdateRoles(ofc.OFControllerEndPoint, request) {
+		reply := ofp.NewRoleReply()
+		reply.SetXid(request.GetXid())
+		reply.SetVersion(request.GetVersion())
+		reply.SetRole(request.GetRole())
+		reply.SetGenerationId(request.GetGenerationId())
+		if err := ofc.SendMessage(reply); err != nil {
+			logger.Errorw("handle-role-request-send-message", log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+		}
+		return
+	}
+
+	reply := ofp.NewRoleRequestFailedErrorMsg()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	reply.Code = ofp.OFPRRFCStale
+	enc := goloxi.NewEncoder()
+	if err := request.Serialize(enc); err == nil {
+		reply.Data = enc.Bytes()
+	}
+	if err := ofc.SendMessage(reply); err != nil {
+		logger.Errorw("handle-role-request-send-message", log.Fields{
+			"device-id": ofc.DeviceID,
+			"error":     err})
+	}
+}
+
+func (ofc *OFConnection) sendRoleSlaveError(request ofp.IHeader) {
+	reply := ofp.NewBadRequestErrorMsg()
 	reply.SetXid(request.GetXid())
 	reply.SetVersion(request.GetVersion())
-	reply.SetRole(request.GetRole())
-	reply.SetGenerationId(request.GetGenerationId())
+	// bad-request code OFPBRC_IS_SLAVE, not the controller-role constant OFPCRRoleSlave
+	reply.Code = ofp.OFPBRCIsSlave
+
+	enc := goloxi.NewEncoder()
+	if err := request.Serialize(enc); err == nil {
+		reply.Data = enc.Bytes()
+	}
+
 	if err := ofc.SendMessage(reply); err != nil {
-		logger.Errorw("handle-role-request-send-message", log.Fields{
+		logger.Errorw("send-role-slave-error", log.Fields{
 			"device-id": ofc.DeviceID,
 			"error":     err})
 	}
diff --git a/internal/pkg/openflow/role_test.go b/internal/pkg/openflow/role_test.go
new file mode 100644
index 0000000..5eb3ba5
--- /dev/null
+++ b/internal/pkg/openflow/role_test.go
@@ -0,0 +1,160 @@
+/*
+   Copyright 2020 the original author or authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package openflow
+
+import (
+	ofp "github.com/donNewtonAlpha/goloxi/of13"
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+// testRoleManager stands in for OFConnection.roleManager: it records the
+// arguments of the last UpdateRoles call and fails when generateError is set.
+type testRoleManager struct {
+	role          ofp.ControllerRole
+	generationId  uint64
+	from          string
+	generateError bool
+}
+
+// UpdateRoles records its inputs and succeeds unless generateError is set.
+func (trm *testRoleManager) UpdateRoles(from string, request *ofp.RoleRequest) bool {
+	trm.from, trm.role, trm.generationId = from, request.Role, request.GenerationId
+
+	return !trm.generateError
+}
+
+// createOFClient builds an OFClient for the endpoints "e1", "e2" and "e3",
+// each backed by an OFConnection that starts with role ofcRoleNone.
+func createOFClient() *OFClient {
+	endpoints := []string{"e1", "e2", "e3"}
+	connections := make(map[string]*OFConnection, len(endpoints))
+	for _, endpoint := range endpoints {
+		connections[endpoint] = &OFConnection{
+			OFControllerEndPoint: endpoint,
+			role:                 ofcRoleNone,
+		}
+	}
+
+	return &OFClient{
+		OFControllerEndPoints: endpoints,
+		connections:           connections,
+	}
+}
+
+// createRoleRequest returns a RoleRequest for role with generation ID genId.
+func createRoleRequest(role ofp.ControllerRole, genId uint64) *ofp.RoleRequest {
+	request := ofp.NewRoleRequest()
+	request.Role, request.GenerationId = role, genId
+	return request
+}
+
+// TestRoleChange checks that a master request promotes its sender and demotes
+// the previous master, and that a slave request from the master is honoured.
+func TestRoleChange(t *testing.T) {
+	// shared fixture: connections "e1", "e2" and "e3", all with ofcRoleNone
+	ofclient := createOFClient()
+	assert.Len(t, ofclient.connections, len(ofclient.OFControllerEndPoints))
+
+	// no connection holds a role before the first role request
+	for _, ep := range ofclient.OFControllerEndPoints {
+		assert.Equal(t, ofcRoleNone, ofclient.connections[ep].role,
+			"initial role of %s", ep)
+	}
+
+	// change role of e1 to master
+	rr := createRoleRequest(ofp.OFPCRRoleMaster, 1)
+
+	ok := ofclient.UpdateRoles("e1", rr)
+	assert.True(t, ok)
+	assert.Equal(t, ofcRoleMaster, ofclient.connections["e1"].role)
+
+	// change role of e2 to master
+	ok = ofclient.UpdateRoles("e2", rr)
+	assert.True(t, ok)
+	assert.Equal(t, ofcRoleMaster, ofclient.connections["e2"].role)
+	// e1 should now have reverted to slave
+	assert.Equal(t, ofcRoleSlave, ofclient.connections["e1"].role)
+
+	// change role of e2 to slave
+	rr = createRoleRequest(ofp.OFPCRRoleSlave, 1)
+
+	ok = ofclient.UpdateRoles("e2", rr)
+	assert.True(t, ok)
+	assert.Equal(t, ofcRoleSlave, ofclient.connections["e2"].role)
+}
+
+// TestStaleRoleRequest checks that a request with an older generation ID is
+// rejected and leaves the current role untouched.
+func TestStaleRoleRequest(t *testing.T) {
+	ofclient := createOFClient()
+
+	// e1 becomes master at generation 2
+	rr1 := createRoleRequest(ofp.OFPCRRoleMaster, 2)
+	ok := ofclient.UpdateRoles("e1", rr1)
+	assert.True(t, ok)
+	assert.Equal(t, ofcRoleMaster, ofclient.connections["e1"].role)
+
+	// generation 1 predates 2, so this slave request is stale
+	rr2 := createRoleRequest(ofp.OFPCRRoleSlave, 1)
+	ok = ofclient.UpdateRoles("e1", rr2)
+	assert.False(t, ok, "stale role request should be rejected")
+	assert.Equal(t, ofcRoleMaster, ofclient.connections["e1"].role,
+		"role should remain master after a stale request")
+}
+
+func TestHandleRoleRequest(t *testing.T) {
+	trm := &testRoleManager{}
+	connection := &OFConnection{
+		OFControllerEndPoint: "e1",
+		roleManager:          trm,
+		sendChannel:          make(chan Message, 10),
+	}
+
+	connection.handleRoleRequest(createRoleRequest(ofp.OFPCRRoleMaster, 1))
+	// the request must reach the role manager unchanged
+	assert.Equal(t, "e1", trm.from)
+	assert.EqualValues(t, ofp.OFPCRRoleMaster, trm.role)
+	assert.EqualValues(t, 1, trm.generationId)
+
+	// and be acknowledged with exactly one role reply carrying the new role
+	if assert.Len(t, connection.sendChannel, 1) {
+		resp, isReply := (<-connection.sendChannel).(*ofp.RoleReply)
+		if assert.True(t, isReply, "expected *ofp.RoleReply") {
+			assert.EqualValues(t, ofp.OFPCRRoleMaster, resp.Role)
+		}
+	}
+}
+
+func TestHandleRoleRequestError(t *testing.T) {
+	// a role manager that rejects every request
+	trm := &testRoleManager{generateError: true}
+	connection := &OFConnection{
+		OFControllerEndPoint: "e1",
+		roleManager:          trm,
+		sendChannel:          make(chan Message, 10),
+	}
+
+	connection.handleRoleRequest(createRoleRequest(ofp.OFPCRRoleMaster, 1))
+	// the rejection is reported as a single OFPRRFC_STALE error message
+	if assert.Len(t, connection.sendChannel, 1) {
+		resp, isErr := (<-connection.sendChannel).(*ofp.RoleRequestFailedErrorMsg)
+		if assert.True(t, isErr, "expected *ofp.RoleRequestFailedErrorMsg") {
+			assert.EqualValues(t, ofp.OFPRRFCStale, resp.Code)
+		}
+	}
+}
diff --git a/internal/pkg/openflow/setConfig.go b/internal/pkg/openflow/setConfig.go
index df48f5d..64be03f 100644
--- a/internal/pkg/openflow/setConfig.go
+++ b/internal/pkg/openflow/setConfig.go
@@ -22,7 +22,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 )
 
-func (ofc *OFClient) handleSetConfig(request *ofp.SetConfig) {
+func (ofc *OFConnection) handleSetConfig(request *ofp.SetConfig) {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(request)
 		logger.Debugw("handleSetConfig called",
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
index 520b5a2..86c06d6 100644
--- a/internal/pkg/openflow/stats.go
+++ b/internal/pkg/openflow/stats.go
@@ -28,7 +28,7 @@
 	"unsafe"
 )
 
-func (ofc *OFClient) handleStatsRequest(request ofp.IHeader, statType uint16) error {
+func (ofc *OFConnection) handleStatsRequest(request ofp.IHeader, statType uint16) error {
 	if logger.V(log.DebugLevel) {
 		js, _ := json.Marshal(request)
 		logger.Debugw("handleStatsRequest called",
@@ -288,7 +288,7 @@
 	return nil
 }
 
-func (ofc *OFClient) handleDescStatsRequest(request *ofp.DescStatsRequest) (*ofp.DescStatsReply, error) {
+func (ofc *OFConnection) handleDescStatsRequest(request *ofp.DescStatsRequest) (*ofp.DescStatsReply, error) {
 	if ofc.VolthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
@@ -312,7 +312,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handleFlowStatsRequest(request *ofp.FlowStatsRequest) (*ofp.FlowStatsReply, error) {
+func (ofc *OFConnection) handleFlowStatsRequest(request *ofp.FlowStatsRequest) (*ofp.FlowStatsReply, error) {
 	if ofc.VolthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
@@ -377,7 +377,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handleAggregateStatsRequest(request *ofp.AggregateStatsRequest) (*ofp.AggregateStatsReply, error) {
+func (ofc *OFConnection) handleAggregateStatsRequest(request *ofp.AggregateStatsRequest) (*ofp.AggregateStatsReply, error) {
 	response := ofp.NewAggregateStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
@@ -387,7 +387,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handleGroupStatsRequest(request *ofp.GroupStatsRequest) (*ofp.GroupStatsReply, error) {
+func (ofc *OFConnection) handleGroupStatsRequest(request *ofp.GroupStatsRequest) (*ofp.GroupStatsReply, error) {
 	if ofc.VolthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
@@ -425,7 +425,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handleGroupStatsDescRequest(request *ofp.GroupDescStatsRequest) (*ofp.GroupDescStatsReply, error) {
+func (ofc *OFConnection) handleGroupStatsDescRequest(request *ofp.GroupDescStatsRequest) (*ofp.GroupDescStatsReply, error) {
 	if ofc.VolthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
@@ -458,7 +458,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handleGroupFeatureStatsRequest(request *ofp.GroupFeaturesStatsRequest) (*ofp.GroupFeaturesStatsReply, error) {
+func (ofc *OFConnection) handleGroupFeatureStatsRequest(request *ofp.GroupFeaturesStatsRequest) (*ofp.GroupFeaturesStatsReply, error) {
 	response := ofp.NewGroupFeaturesStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
@@ -467,7 +467,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handleMeterStatsRequest(request *ofp.MeterStatsRequest) (*ofp.MeterStatsReply, error) {
+func (ofc *OFConnection) handleMeterStatsRequest(request *ofp.MeterStatsRequest) (*ofp.MeterStatsReply, error) {
 	if ofc.VolthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
@@ -509,7 +509,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handleMeterConfigStatsRequest(request *ofp.MeterConfigStatsRequest) (*ofp.MeterConfigStatsReply, error) {
+func (ofc *OFConnection) handleMeterConfigStatsRequest(request *ofp.MeterConfigStatsRequest) (*ofp.MeterConfigStatsReply, error) {
 	response := ofp.NewMeterConfigStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
@@ -518,7 +518,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handleTableFeaturesStatsRequest(request *ofp.TableFeaturesStatsRequest) (*ofp.TableFeaturesStatsReply, error) {
+func (ofc *OFConnection) handleTableFeaturesStatsRequest(request *ofp.TableFeaturesStatsRequest) (*ofp.TableFeaturesStatsReply, error) {
 	response := ofp.NewTableFeaturesStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
@@ -527,7 +527,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handleTableStatsRequest(request *ofp.TableStatsRequest) (*ofp.TableStatsReply, error) {
+func (ofc *OFConnection) handleTableStatsRequest(request *ofp.TableStatsRequest) (*ofp.TableStatsReply, error) {
 	var response = ofp.NewTableStatsReply()
 	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
 	response.SetVersion(request.GetVersion())
@@ -536,7 +536,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handleQueueStatsRequest(request *ofp.QueueStatsRequest) (*ofp.QueueStatsReply, error) {
+func (ofc *OFConnection) handleQueueStatsRequest(request *ofp.QueueStatsRequest) (*ofp.QueueStatsReply, error) {
 	response := ofp.NewQueueStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
@@ -545,7 +545,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handlePortStatsRequest(request *ofp.PortStatsRequest) (*ofp.PortStatsReply, error) {
+func (ofc *OFConnection) handlePortStatsRequest(request *ofp.PortStatsRequest) (*ofp.PortStatsReply, error) {
 	if ofc.VolthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
@@ -574,7 +574,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) (*ofp.PortDescStatsReply, error) {
+func (ofc *OFConnection) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) (*ofp.PortDescStatsReply, error) {
 	if ofc.VolthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
@@ -618,7 +618,7 @@
 
 }
 
-func (ofc *OFClient) handleMeterFeatureStatsRequest(request *ofp.MeterFeaturesStatsRequest) (*ofp.MeterFeaturesStatsReply, error) {
+func (ofc *OFConnection) handleMeterFeatureStatsRequest(request *ofp.MeterFeaturesStatsRequest) (*ofp.MeterFeaturesStatsReply, error) {
 	response := ofp.NewMeterFeaturesStatsReply()
 	response.SetXid(request.GetXid())
 	response.SetVersion(request.GetVersion())
@@ -633,7 +633,7 @@
 	return response, nil
 }
 
-func (ofc *OFClient) handleExperimenterStatsRequest(request *ofp.ExperimenterStatsRequest) (*ofp.ExperimenterStatsReply, error) {
+func (ofc *OFConnection) handleExperimenterStatsRequest(request *ofp.ExperimenterStatsRequest) (*ofp.ExperimenterStatsReply, error) {
 	response := ofp.NewExperimenterStatsReply(request.GetExperimenter())
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())