Support connecting to multiple OpenFlow controllers

Change-Id: I0989d5031fb2d4f5aa78ba0e4576e465f826a419
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
index 94ad645..aeac406 100644
--- a/internal/pkg/openflow/client.go
+++ b/internal/pkg/openflow/client.go
@@ -17,17 +17,12 @@
 package openflow
 
 import (
-	"bufio"
 	"context"
-	"encoding/binary"
-	"encoding/json"
 	"errors"
-	"github.com/donNewtonAlpha/goloxi"
 	ofp "github.com/donNewtonAlpha/goloxi/of13"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"io"
-	"net"
+	"sync"
 	"time"
 )
 
@@ -35,6 +30,7 @@
 
 type ofcEvent byte
 type ofcState byte
+type ofcRole byte
 
 const (
 	ofcEventStart = ofcEvent(iota)
@@ -47,6 +43,11 @@
 	ofcStateConnected
 	ofcStateDisconnected
 	ofcStateStopped
+
+	ofcRoleNone = ofcRole(iota)
+	ofcRoleEqual
+	ofcRoleMaster
+	ofcRoleSlave
 )
 
 func (e ofcEvent) String() string {
@@ -84,19 +85,73 @@
 // OFClient the configuration and operational state of a connection to an
 // openflow controller
 type OFClient struct {
-	OFControllerEndPoint string
-	Port                 uint16
-	DeviceID             string
-	VolthaClient         voltha.VolthaServiceClient
-	PacketOutChannel     chan *voltha.PacketOut
-	ConnectionMaxRetries int
-	ConnectionRetryDelay time.Duration
-	conn                 net.Conn
+	OFControllerEndPoints []string
+	DeviceID              string
+	VolthaClient          voltha.VolthaServiceClient
+	PacketOutChannel      chan *voltha.PacketOut
+	ConnectionMaxRetries  int
+	ConnectionRetryDelay  time.Duration
 
-	// expirimental
-	events            chan ofcEvent
-	sendChannel       chan Message
-	lastUnsentMessage Message
+	// map of endpoint to OF connection
+	connections map[string]*OFConnection
+
+	// global role state for device
+	generationIsDefined bool
+	generationID        uint64
+	roleLock            sync.Mutex
+}
+
+type RoleManager interface {
+	UpdateRoles(from string, request *ofp.RoleRequest) bool
+}
+
+func distance(a, b uint64) int64 { // signed difference a-b; stays correct across uint64 wraparound
+	return int64(a - b)
+}
+
+// UpdateRoles applies the role request received from the controller at endpoint
+// from. Equal requests always succeed; master and slave requests are rejected
+// when their generation ID is older than the last accepted one, and a new
+// master demotes the previous master to slave. Returns false, changing nothing,
+// when from has no registered connection or the requested role is unsupported.
+func (ofc *OFClient) UpdateRoles(from string, request *ofp.RoleRequest) bool {
+	log.Debug("updating role", log.Fields{
+		"from": from,
+		"to":   request.Role,
+		"id":   request.GenerationId})
+
+	ofc.roleLock.Lock()
+	defer ofc.roleLock.Unlock()
+
+	// an unknown endpoint yields a nil *OFConnection; reject instead of panicking
+	requester := ofc.connections[from]
+	if requester == nil {
+		return false
+	}
+	switch request.Role {
+	case ofp.OFPCRRoleEqual:
+		// equal request doesn't care about generation ID and always succeeds
+		requester.role = ofcRoleEqual
+		return true
+	case ofp.OFPCRRoleMaster, ofp.OFPCRRoleSlave:
+		if ofc.generationIsDefined && distance(request.GenerationId, ofc.generationID) < 0 {
+			return false // stale generation ID
+		}
+		ofc.generationID = request.GenerationId
+		ofc.generationIsDefined = true
+		if request.Role == ofp.OFPCRRoleSlave {
+			requester.role = ofcRoleSlave
+			return true
+		}
+		for _, connection := range ofc.connections {
+			if connection.role == ofcRoleMaster {
+				connection.role = ofcRoleSlave // only one master: demote the old one
+			}
+		}
+		requester.role = ofcRoleMaster
+		return true
+	}
+	return false
+}
 
 // NewClient returns an initialized OFClient instance based on the configuration
@@ -104,14 +159,13 @@
 func NewOFClient(config *OFClient) *OFClient {
 
 	ofc := OFClient{
-		DeviceID:             config.DeviceID,
-		OFControllerEndPoint: config.OFControllerEndPoint,
-		VolthaClient:         config.VolthaClient,
-		PacketOutChannel:     config.PacketOutChannel,
-		ConnectionMaxRetries: config.ConnectionMaxRetries,
-		ConnectionRetryDelay: config.ConnectionRetryDelay,
-		events:               make(chan ofcEvent, 10),
-		sendChannel:          make(chan Message, 100),
+		DeviceID:              config.DeviceID,
+		OFControllerEndPoints: config.OFControllerEndPoints,
+		VolthaClient:          config.VolthaClient,
+		PacketOutChannel:      config.PacketOutChannel,
+		ConnectionMaxRetries:  config.ConnectionMaxRetries,
+		ConnectionRetryDelay:  config.ConnectionRetryDelay,
+		connections:           make(map[string]*OFConnection),
 	}
 
 	if ofc.ConnectionRetryDelay <= 0 {
@@ -127,417 +181,43 @@
 
 // Stop initiates a shutdown of the OFClient
 func (ofc *OFClient) Stop() {
-	ofc.events <- ofcEventStop
-}
-
-func (ofc *OFClient) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
-	header := ofp.Header{}
-	header.Version = uint8(buf[0])
-	header.Type = uint8(buf[1])
-	header.Length = binary.BigEndian.Uint16(buf[2:4])
-	header.Xid = binary.BigEndian.Uint32(buf[4:8])
-
-	// TODO: add minimal validation of version and type
-
-	return &header, nil
-}
-
-func (ofc *OFClient) establishConnectionToController() error {
-	if ofc.conn != nil {
-		logger.Debugw("closing-of-connection-to-reconnect",
-			log.Fields{"device-id": ofc.DeviceID})
-		ofc.conn.Close()
-		ofc.conn = nil
+	for _, connection := range ofc.connections {
+		connection.events <- ofcEventStop
 	}
-	try := 1
-	for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
-		if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
-			logger.Debugw("openflow-client unable to resolve endpoint",
-				log.Fields{
-					"device-id": ofc.DeviceID,
-					"endpoint":  ofc.OFControllerEndPoint})
-		} else {
-			if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
-				ofc.conn = connection
-				ofc.sayHello()
-				ofc.events <- ofcEventConnect
-				return nil
-			} else {
-				logger.Warnw("openflow-client-connect-error",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"endpoint":  ofc.OFControllerEndPoint})
-			}
-		}
-		if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
-			if ofc.ConnectionMaxRetries != 0 {
-				try += 1
-			}
-			time.Sleep(ofc.ConnectionRetryDelay)
-		}
-	}
-	return errors.New("failed-to-connect-to-of-controller")
 }
 
-// Run implementes the state machine for the OF client reacting to state change
-// events and invoking actions as a reaction to those state changes
 func (ofc *OFClient) Run(ctx context.Context) {
 
-	var ofCtx context.Context
-	var ofDone func()
-	state := ofcStateCreated
-	ofc.events <- ofcEventStart
-top:
-	for {
-		select {
-		case <-ctx.Done():
-			state = ofcStateStopped
-			logger.Debugw("state-transition-context-done",
-				log.Fields{"device-id": ofc.DeviceID})
-			break top
-		case event := <-ofc.events:
-			previous := state
-			switch event {
-			case ofcEventStart:
-				logger.Debugw("ofc-event-start",
-					log.Fields{"device-id": ofc.DeviceID})
-				if state == ofcStateCreated {
-					state = ofcStateStarted
-					logger.Debug("STARTED MORE THAN ONCE")
-					go func() {
-						if err := ofc.establishConnectionToController(); err != nil {
-							logger.Errorw("controller-connection-failed", log.Fields{"error": err})
-							panic(err)
-						}
-					}()
-				} else {
-					logger.Errorw("illegal-state-transition",
-						log.Fields{
-							"device-id":     ofc.DeviceID,
-							"current-state": state.String(),
-							"event":         event.String()})
-				}
-			case ofcEventConnect:
-				logger.Debugw("ofc-event-connected",
-					log.Fields{"device-id": ofc.DeviceID})
-				if state == ofcStateStarted || state == ofcStateDisconnected {
-					state = ofcStateConnected
-					ofCtx, ofDone = context.WithCancel(context.Background())
-					go ofc.messageSender(ofCtx)
-					go ofc.processOFStream(ofCtx)
-				} else {
-					logger.Errorw("illegal-state-transition",
-						log.Fields{
-							"device-id":     ofc.DeviceID,
-							"current-state": state.String(),
-							"event":         event.String()})
-				}
-			case ofcEventDisconnect:
-				logger.Debugw("ofc-event-disconnected",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"state":     state.String()})
-				if state == ofcStateConnected {
-					state = ofcStateDisconnected
-					if ofDone != nil {
-						ofDone()
-						ofDone = nil
-					}
-					go func() {
-						if err := ofc.establishConnectionToController(); err != nil {
-							logger.Errorw("controller-connection-failed", log.Fields{"error": err})
-							panic(err)
-						}
-					}()
-				} else {
-					logger.Errorw("illegal-state-transition",
-						log.Fields{
-							"device-id":     ofc.DeviceID,
-							"current-state": state.String(),
-							"event":         event.String()})
-				}
-			case ofcEventStop:
-				logger.Debugw("ofc-event-stop",
-					log.Fields{"device-id": ofc.DeviceID})
-				if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
-					state = ofcStateStopped
-					break top
-				} else {
-					logger.Errorw("illegal-state-transition",
-						log.Fields{
-							"device-id":     ofc.DeviceID,
-							"current-state": state.String(),
-							"event":         event.String()})
-				}
-			}
-			logger.Debugw("state-transition",
-				log.Fields{
-					"device-id":      ofc.DeviceID,
-					"previous-state": previous.String(),
-					"current-state":  state.String(),
-					"event":          event.String()})
+	for _, endpoint := range ofc.OFControllerEndPoints {
+		connection := &OFConnection{
+			OFControllerEndPoint: endpoint,
+			DeviceID:             ofc.DeviceID,
+			VolthaClient:         ofc.VolthaClient,
+			PacketOutChannel:     ofc.PacketOutChannel,
+			ConnectionMaxRetries: ofc.ConnectionMaxRetries,
+			ConnectionRetryDelay: ofc.ConnectionRetryDelay,
+			role:                 ofcRoleNone,
+			roleManager:          ofc,
+			events:               make(chan ofcEvent, 10),
+			sendChannel:          make(chan Message, 100),
 		}
+
+		ofc.connections[endpoint] = connection
 	}
 
-	// If the child context exists, then cancel it
-	if ofDone != nil {
-		logger.Debugw("closing-child-processes",
-			log.Fields{"device-id": ofc.DeviceID})
-		ofDone()
-	}
-
-	// If the connection is open, then close it
-	if ofc.conn != nil {
-		logger.Debugw("closing-of-connection",
-			log.Fields{"device-id": ofc.DeviceID})
-		ofc.conn.Close()
-		ofc.conn = nil
-	}
-	logger.Debugw("state-machine-finished",
-		log.Fields{"device-id": ofc.DeviceID})
-}
-
-// processOFStream processes the OF connection from the controller and invokes
-// the appropriate handler methods for each message.
-func (ofc *OFClient) processOFStream(ctx context.Context) {
-	fromController := bufio.NewReader(ofc.conn)
-
-	/*
-	 * We have a read buffer of a max size of 4096, so if we ever have
-	 * a message larger than this then we will have issues
-	 */
-	headerBuf := make([]byte, 8)
-
-top:
-	// Continue until we are told to stop
-	for {
-		select {
-		case <-ctx.Done():
-			logger.Error("of-loop-ending-context-done")
-			break top
-		default:
-			// Read 8 bytes, the standard OF header
-			read, err := io.ReadFull(fromController, headerBuf)
-			if err != nil {
-				logger.Errorw("bad-of-header",
-					log.Fields{
-						"byte-count": read,
-						"device-id":  ofc.DeviceID,
-						"error":      err})
-				break top
-			}
-
-			// Decode the header
-			peek, err := ofc.peekAtOFHeader(headerBuf)
-			if err != nil {
-				/*
-				 * Header is bad, assume stream is corrupted
-				 * and needs to be restarted
-				 */
-				logger.Errorw("bad-of-packet",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"error":     err})
-				break top
-			}
-
-			// Calculate the size of the rest of the packet and read it
-			need := int(peek.GetLength())
-			messageBuf := make([]byte, need)
-			copy(messageBuf, headerBuf)
-			read, err = io.ReadFull(fromController, messageBuf[8:])
-			if err != nil {
-				logger.Errorw("bad-of-packet",
-					log.Fields{
-						"byte-count": read,
-						"device-id":  ofc.DeviceID,
-						"error":      err})
-				break top
-			}
-
-			// Decode and process the packet
-			decoder := goloxi.NewDecoder(messageBuf)
-			msg, err := ofp.DecodeHeader(decoder)
-			if err != nil {
-				// nolint: staticcheck
-				js, _ := json.Marshal(decoder)
-				logger.Errorw("failed-to-decode",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"decoder":   js,
-						"error":     err})
-				break top
-			}
-			if logger.V(log.DebugLevel) {
-				js, _ := json.Marshal(msg)
-				logger.Debugw("packet-header",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"header":    js})
-			}
-			ofc.parseHeader(msg)
-		}
-	}
-	logger.Debugw("end-of-stream",
-		log.Fields{"device-id": ofc.DeviceID})
-	ofc.events <- ofcEventDisconnect
-}
-
-func (ofc *OFClient) sayHello() {
-	hello := ofp.NewHello()
-	hello.Xid = uint32(GetXid())
-	elem := ofp.NewHelloElemVersionbitmap()
-	elem.SetType(ofp.OFPHETVersionbitmap)
-	elem.SetLength(8)
-	elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
-	hello.SetElements([]ofp.IHelloElem{elem})
-	if logger.V(log.DebugLevel) {
-		js, _ := json.Marshal(hello)
-		logger.Debugw("sayHello Called",
-			log.Fields{
-				"device-id":     ofc.DeviceID,
-				"hello-message": js})
-	}
-	if err := ofc.SendMessage(hello); err != nil {
-		logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
-			log.Fields{
-				"device-id": ofc.DeviceID,
-				"error":     err})
+	for _, connection := range ofc.connections {
+		go connection.Run(ctx)
 	}
 }
 
-func (ofc *OFClient) parseHeader(header ofp.IHeader) {
-	headerType := header.GetType()
-	logger.Debugw("packet-header-type",
-		log.Fields{
-			"header-type": ofp.Type(headerType).String()})
-	switch headerType {
-	case ofp.OFPTHello:
-		//x := header.(*ofp.Hello)
-	case ofp.OFPTError:
-		go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
-	case ofp.OFPTEchoRequest:
-		go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
-	case ofp.OFPTEchoReply:
-	case ofp.OFPTExperimenter:
-	case ofp.OFPTFeaturesRequest:
-		go func() {
-			if err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest)); err != nil {
-				logger.Errorw("handle-feature-request", log.Fields{"error": err})
-			}
-		}()
-	case ofp.OFPTFeaturesReply:
-	case ofp.OFPTGetConfigRequest:
-		go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
-	case ofp.OFPTGetConfigReply:
-	case ofp.OFPTSetConfig:
-		go ofc.handleSetConfig(header.(*ofp.SetConfig))
-	case ofp.OFPTPacketIn:
-	case ofp.OFPTFlowRemoved:
-	case ofp.OFPTPortStatus:
-	case ofp.OFPTPacketOut:
-		go ofc.handlePacketOut(header.(*ofp.PacketOut))
-	case ofp.OFPTFlowMod:
-		/*
-		 * Not using go routine to handle flow* messages or barrier requests
-		 * onos typically issues barrier requests just before a flow* message.
-		 * by handling in this thread I ensure all flow* are handled when barrier
-		 * request is issued.
-		 */
-		switch header.(ofp.IFlowMod).GetCommand() {
-		case ofp.OFPFCAdd:
-			ofc.handleFlowAdd(header.(*ofp.FlowAdd))
-		case ofp.OFPFCModify:
-			ofc.handleFlowMod(header.(*ofp.FlowMod))
-		case ofp.OFPFCModifyStrict:
-			ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
-		case ofp.OFPFCDelete:
-			ofc.handleFlowDelete(header.(*ofp.FlowDelete))
-		case ofp.OFPFCDeleteStrict:
-			ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
-		}
-	case ofp.OFPTStatsRequest:
-		go func() {
-			if err := ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
-				logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
-			}
-		}()
-	case ofp.OFPTBarrierRequest:
-		/* See note above at case ofp.OFPTFlowMod:*/
-		ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
-	case ofp.OFPTRoleRequest:
-		go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
-	case ofp.OFPTMeterMod:
-		ofc.handleMeterModRequest(header.(*ofp.MeterMod))
-	}
-}
-
-// Message interface that represents an open flow message and enables for a
-// unified implementation of SendMessage
-type Message interface {
-	Serialize(encoder *goloxi.Encoder) error
-}
-
-func (ofc *OFClient) doSend(msg Message) error {
-	if ofc.conn == nil {
-		return errors.New("no-connection")
-	}
-	enc := goloxi.NewEncoder()
-	if err := msg.Serialize(enc); err != nil {
-		return err
-	}
-
-	bytes := enc.Bytes()
-	if _, err := ofc.conn.Write(bytes); err != nil {
-		logger.Errorw("unable-to-send-message-to-controller",
-			log.Fields{
-				"device-id": ofc.DeviceID,
-				"message":   msg,
-				"error":     err})
-		return err
-	}
-	return nil
-}
-
-func (ofc *OFClient) messageSender(ctx context.Context) {
-	// first process last fail if it exists
-	if ofc.lastUnsentMessage != nil {
-		if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
-			ofc.events <- ofcEventDisconnect
-			return
-		}
-		ofc.lastUnsentMessage = nil
-	}
-top:
-	for {
-		select {
-		case <-ctx.Done():
-			break top
-		case msg := <-ofc.sendChannel:
-			if err := ofc.doSend(msg); err != nil {
-				ofc.lastUnsentMessage = msg
-				ofc.events <- ofcEventDisconnect
-				logger.Debugw("message-sender-error",
-					log.Fields{
-						"device-id": ofc.DeviceID,
-						"error":     err.Error()})
-				break top
-			}
-			logger.Debugw("message-sender-send",
-				log.Fields{
-					"device-id": ofc.DeviceID})
-			ofc.lastUnsentMessage = nil
-		}
-	}
-
-	logger.Debugw("message-sender-finished",
-		log.Fields{
-			"device-id": ofc.DeviceID})
-}
-
-// SendMessage queues a message to be sent to the openflow controller
 func (ofc *OFClient) SendMessage(message Message) error {
-	logger.Debug("queuing-message")
-	ofc.sendChannel <- message
+	for _, connection := range ofc.connections {
+		if connection.role == ofcRoleMaster || connection.role == ofcRoleEqual {
+			err := connection.SendMessage(message)
+			if err != nil {
+				return err
+			}
+		}
+	}
 	return nil
 }