Support connecting to multiple OpenFlow controllers
Change-Id: I0989d5031fb2d4f5aa78ba0e4576e465f826a419
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
index 1d225b2..419a988 100644
--- a/internal/pkg/ofagent/ofagent.go
+++ b/internal/pkg/ofagent/ofagent.go
@@ -44,7 +44,7 @@
type OFAgent struct {
VolthaApiEndPoint string
- OFControllerEndPoint string
+ OFControllerEndPoints []string
DeviceListRefreshInterval time.Duration
ConnectionMaxRetries int
ConnectionRetryDelay time.Duration
@@ -63,7 +63,7 @@
func NewOFAgent(config *OFAgent) (*OFAgent, error) {
ofa := OFAgent{
VolthaApiEndPoint: config.VolthaApiEndPoint,
- OFControllerEndPoint: config.OFControllerEndPoint,
+ OFControllerEndPoints: config.OFControllerEndPoints,
DeviceListRefreshInterval: config.DeviceListRefreshInterval,
ConnectionMaxRetries: config.ConnectionMaxRetries,
ConnectionRetryDelay: config.ConnectionRetryDelay,
@@ -99,7 +99,7 @@
logger.Debugw("Starting GRPC - VOLTHA client",
log.Fields{
"voltha-endpoint": ofa.VolthaApiEndPoint,
- "controller-endpoint": ofa.OFControllerEndPoint})
+ "controller-endpoint": ofa.OFControllerEndPoints})
// If the context contains a k8s probe then register services
p := probe.GetProbeFromContext(ctx)
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
index d50b3ec..835e551 100644
--- a/internal/pkg/ofagent/refresh.go
+++ b/internal/pkg/ofagent/refresh.go
@@ -90,14 +90,14 @@
ofc := ofa.clientMap[deviceID]
if ofc == nil {
ofc = openflow.NewOFClient(&openflow.OFClient{
- DeviceID: deviceID,
- OFControllerEndPoint: ofa.OFControllerEndPoint,
- VolthaClient: ofa.volthaClient,
- PacketOutChannel: ofa.packetOutChannel,
- ConnectionMaxRetries: ofa.ConnectionMaxRetries,
- ConnectionRetryDelay: ofa.ConnectionRetryDelay,
+ DeviceID: deviceID,
+ OFControllerEndPoints: ofa.OFControllerEndPoints,
+ VolthaClient: ofa.volthaClient,
+ PacketOutChannel: ofa.packetOutChannel,
+ ConnectionMaxRetries: ofa.ConnectionMaxRetries,
+ ConnectionRetryDelay: ofa.ConnectionRetryDelay,
})
- go ofc.Run(context.Background())
+ ofc.Run(context.Background())
ofa.clientMap[deviceID] = ofc
}
ofa.mapLock.Unlock()
diff --git a/internal/pkg/openflow/barrier.go b/internal/pkg/openflow/barrier.go
index 3db9e84..4775d2c 100644
--- a/internal/pkg/openflow/barrier.go
+++ b/internal/pkg/openflow/barrier.go
@@ -22,7 +22,7 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFClient) handleBarrierRequest(request *ofp.BarrierRequest) {
+func (ofc *OFConnection) handleBarrierRequest(request *ofp.BarrierRequest) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
index 94ad645..aeac406 100644
--- a/internal/pkg/openflow/client.go
+++ b/internal/pkg/openflow/client.go
@@ -17,17 +17,12 @@
package openflow
import (
- "bufio"
"context"
- "encoding/binary"
- "encoding/json"
"errors"
- "github.com/donNewtonAlpha/goloxi"
ofp "github.com/donNewtonAlpha/goloxi/of13"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/voltha"
- "io"
- "net"
+ "sync"
"time"
)
@@ -35,6 +30,7 @@
type ofcEvent byte
type ofcState byte
+type ofcRole byte
const (
ofcEventStart = ofcEvent(iota)
@@ -47,6 +43,11 @@
ofcStateConnected
ofcStateDisconnected
ofcStateStopped
+
+ ofcRoleNone = ofcRole(iota)
+ ofcRoleEqual
+ ofcRoleMaster
+ ofcRoleSlave
)
func (e ofcEvent) String() string {
@@ -84,19 +85,73 @@
// OFClient the configuration and operational state of a connection to an
// openflow controller
type OFClient struct {
- OFControllerEndPoint string
- Port uint16
- DeviceID string
- VolthaClient voltha.VolthaServiceClient
- PacketOutChannel chan *voltha.PacketOut
- ConnectionMaxRetries int
- ConnectionRetryDelay time.Duration
- conn net.Conn
+ OFControllerEndPoints []string
+ DeviceID string
+ VolthaClient voltha.VolthaServiceClient
+ PacketOutChannel chan *voltha.PacketOut
+ ConnectionMaxRetries int
+ ConnectionRetryDelay time.Duration
- // expirimental
- events chan ofcEvent
- sendChannel chan Message
- lastUnsentMessage Message
+ // map of endpoint to OF connection
+ connections map[string]*OFConnection
+
+ // global role state for device
+ generationIsDefined bool
+ generationID uint64
+ roleLock sync.Mutex
+}
+
+// RoleManager decides the outcome of an OpenFlow role request on behalf of a
+// connection. UpdateRoles receives the identifier of the requesting
+// connection in from (OFClient keys its connections by controller endpoint)
+// and the decoded request, and reports whether the request was accepted.
+type RoleManager interface {
+ UpdateRoles(from string, request *ofp.RoleRequest) bool
+}
+
+// distance returns a - b reinterpreted as a signed 64-bit integer. The
+// subtraction is performed on uint64 values, so it wraps around; a negative
+// result therefore means a is "behind" b even if the counter has wrapped.
+func distance(a uint64, b uint64) int64 {
+ return (int64)(a - b)
+}
+
+// UpdateRoles validates a role request received on the connection identified
+// by from and updates the role of every connection the request affects.
+//
+// It returns true when the request was applied and false when it was
+// rejected: the requesting connection is unknown, the generation ID is stale,
+// or the requested role is not one of equal/master/slave.
+//
+// All role state is modified while holding ofc.roleLock.
+func (ofc *OFClient) UpdateRoles(from string, request *ofp.RoleRequest) bool {
+	logger.Debugw("updating role", log.Fields{
+		"from": from,
+		"to":   request.Role,
+		"id":   request.GenerationId})
+
+	ofc.roleLock.Lock()
+	defer ofc.roleLock.Unlock()
+
+	// A request can only be honoured for a connection we actually track;
+	// dereferencing a missing map entry would panic, and a master request
+	// from an unknown endpoint must not demote the current master.
+	requester := ofc.connections[from]
+	if requester == nil {
+		return false
+	}
+
+	if request.Role == ofp.OFPCRRoleEqual {
+		// equal request doesn't care about generation ID and always succeeds
+		requester.role = ofcRoleEqual
+		return true
+	}
+
+	if ofc.generationIsDefined && distance(request.GenerationId, ofc.generationID) < 0 {
+		// generation ID is older than the last one accepted
+		return false
+	}
+	ofc.generationID = request.GenerationId
+	ofc.generationIsDefined = true
+
+	switch request.Role {
+	case ofp.OFPCRRoleMaster:
+		// master is potentially changing: demote any existing master to
+		// slave, then promote the requester
+		for _, connection := range ofc.connections {
+			if connection.role == ofcRoleMaster {
+				connection.role = ofcRoleSlave
+			}
+		}
+		requester.role = ofcRoleMaster
+		return true
+	case ofp.OFPCRRoleSlave:
+		requester.role = ofcRoleSlave
+		return true
+	}
+
+	return false
+}
// NewClient returns an initialized OFClient instance based on the configuration
@@ -104,14 +159,13 @@
func NewOFClient(config *OFClient) *OFClient {
ofc := OFClient{
- DeviceID: config.DeviceID,
- OFControllerEndPoint: config.OFControllerEndPoint,
- VolthaClient: config.VolthaClient,
- PacketOutChannel: config.PacketOutChannel,
- ConnectionMaxRetries: config.ConnectionMaxRetries,
- ConnectionRetryDelay: config.ConnectionRetryDelay,
- events: make(chan ofcEvent, 10),
- sendChannel: make(chan Message, 100),
+ DeviceID: config.DeviceID,
+ OFControllerEndPoints: config.OFControllerEndPoints,
+ VolthaClient: config.VolthaClient,
+ PacketOutChannel: config.PacketOutChannel,
+ ConnectionMaxRetries: config.ConnectionMaxRetries,
+ ConnectionRetryDelay: config.ConnectionRetryDelay,
+ connections: make(map[string]*OFConnection),
}
if ofc.ConnectionRetryDelay <= 0 {
@@ -127,417 +181,43 @@
// Stop initiates a shutdown of the OFClient
func (ofc *OFClient) Stop() {
- ofc.events <- ofcEventStop
-}
-
-func (ofc *OFClient) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
- header := ofp.Header{}
- header.Version = uint8(buf[0])
- header.Type = uint8(buf[1])
- header.Length = binary.BigEndian.Uint16(buf[2:4])
- header.Xid = binary.BigEndian.Uint32(buf[4:8])
-
- // TODO: add minimal validation of version and type
-
- return &header, nil
-}
-
-func (ofc *OFClient) establishConnectionToController() error {
- if ofc.conn != nil {
- logger.Debugw("closing-of-connection-to-reconnect",
- log.Fields{"device-id": ofc.DeviceID})
- ofc.conn.Close()
- ofc.conn = nil
+ for _, connection := range ofc.connections {
+ connection.events <- ofcEventStop
}
- try := 1
- for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
- if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
- logger.Debugw("openflow-client unable to resolve endpoint",
- log.Fields{
- "device-id": ofc.DeviceID,
- "endpoint": ofc.OFControllerEndPoint})
- } else {
- if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
- ofc.conn = connection
- ofc.sayHello()
- ofc.events <- ofcEventConnect
- return nil
- } else {
- logger.Warnw("openflow-client-connect-error",
- log.Fields{
- "device-id": ofc.DeviceID,
- "endpoint": ofc.OFControllerEndPoint})
- }
- }
- if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
- if ofc.ConnectionMaxRetries != 0 {
- try += 1
- }
- time.Sleep(ofc.ConnectionRetryDelay)
- }
- }
- return errors.New("failed-to-connect-to-of-controller")
}
-// Run implementes the state machine for the OF client reacting to state change
-// events and invoking actions as a reaction to those state changes
func (ofc *OFClient) Run(ctx context.Context) {
- var ofCtx context.Context
- var ofDone func()
- state := ofcStateCreated
- ofc.events <- ofcEventStart
-top:
- for {
- select {
- case <-ctx.Done():
- state = ofcStateStopped
- logger.Debugw("state-transition-context-done",
- log.Fields{"device-id": ofc.DeviceID})
- break top
- case event := <-ofc.events:
- previous := state
- switch event {
- case ofcEventStart:
- logger.Debugw("ofc-event-start",
- log.Fields{"device-id": ofc.DeviceID})
- if state == ofcStateCreated {
- state = ofcStateStarted
- logger.Debug("STARTED MORE THAN ONCE")
- go func() {
- if err := ofc.establishConnectionToController(); err != nil {
- logger.Errorw("controller-connection-failed", log.Fields{"error": err})
- panic(err)
- }
- }()
- } else {
- logger.Errorw("illegal-state-transition",
- log.Fields{
- "device-id": ofc.DeviceID,
- "current-state": state.String(),
- "event": event.String()})
- }
- case ofcEventConnect:
- logger.Debugw("ofc-event-connected",
- log.Fields{"device-id": ofc.DeviceID})
- if state == ofcStateStarted || state == ofcStateDisconnected {
- state = ofcStateConnected
- ofCtx, ofDone = context.WithCancel(context.Background())
- go ofc.messageSender(ofCtx)
- go ofc.processOFStream(ofCtx)
- } else {
- logger.Errorw("illegal-state-transition",
- log.Fields{
- "device-id": ofc.DeviceID,
- "current-state": state.String(),
- "event": event.String()})
- }
- case ofcEventDisconnect:
- logger.Debugw("ofc-event-disconnected",
- log.Fields{
- "device-id": ofc.DeviceID,
- "state": state.String()})
- if state == ofcStateConnected {
- state = ofcStateDisconnected
- if ofDone != nil {
- ofDone()
- ofDone = nil
- }
- go func() {
- if err := ofc.establishConnectionToController(); err != nil {
- logger.Errorw("controller-connection-failed", log.Fields{"error": err})
- panic(err)
- }
- }()
- } else {
- logger.Errorw("illegal-state-transition",
- log.Fields{
- "device-id": ofc.DeviceID,
- "current-state": state.String(),
- "event": event.String()})
- }
- case ofcEventStop:
- logger.Debugw("ofc-event-stop",
- log.Fields{"device-id": ofc.DeviceID})
- if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
- state = ofcStateStopped
- break top
- } else {
- logger.Errorw("illegal-state-transition",
- log.Fields{
- "device-id": ofc.DeviceID,
- "current-state": state.String(),
- "event": event.String()})
- }
- }
- logger.Debugw("state-transition",
- log.Fields{
- "device-id": ofc.DeviceID,
- "previous-state": previous.String(),
- "current-state": state.String(),
- "event": event.String()})
+ for _, endpoint := range ofc.OFControllerEndPoints {
+ connection := &OFConnection{
+ OFControllerEndPoint: endpoint,
+ DeviceID: ofc.DeviceID,
+ VolthaClient: ofc.VolthaClient,
+ PacketOutChannel: ofc.PacketOutChannel,
+ ConnectionMaxRetries: ofc.ConnectionMaxRetries,
+ ConnectionRetryDelay: ofc.ConnectionRetryDelay,
+ role: ofcRoleNone,
+ roleManager: ofc,
+ events: make(chan ofcEvent, 10),
+ sendChannel: make(chan Message, 100),
}
+
+ ofc.connections[endpoint] = connection
}
- // If the child context exists, then cancel it
- if ofDone != nil {
- logger.Debugw("closing-child-processes",
- log.Fields{"device-id": ofc.DeviceID})
- ofDone()
- }
-
- // If the connection is open, then close it
- if ofc.conn != nil {
- logger.Debugw("closing-of-connection",
- log.Fields{"device-id": ofc.DeviceID})
- ofc.conn.Close()
- ofc.conn = nil
- }
- logger.Debugw("state-machine-finished",
- log.Fields{"device-id": ofc.DeviceID})
-}
-
-// processOFStream processes the OF connection from the controller and invokes
-// the appropriate handler methods for each message.
-func (ofc *OFClient) processOFStream(ctx context.Context) {
- fromController := bufio.NewReader(ofc.conn)
-
- /*
- * We have a read buffer of a max size of 4096, so if we ever have
- * a message larger than this then we will have issues
- */
- headerBuf := make([]byte, 8)
-
-top:
- // Continue until we are told to stop
- for {
- select {
- case <-ctx.Done():
- logger.Error("of-loop-ending-context-done")
- break top
- default:
- // Read 8 bytes, the standard OF header
- read, err := io.ReadFull(fromController, headerBuf)
- if err != nil {
- logger.Errorw("bad-of-header",
- log.Fields{
- "byte-count": read,
- "device-id": ofc.DeviceID,
- "error": err})
- break top
- }
-
- // Decode the header
- peek, err := ofc.peekAtOFHeader(headerBuf)
- if err != nil {
- /*
- * Header is bad, assume stream is corrupted
- * and needs to be restarted
- */
- logger.Errorw("bad-of-packet",
- log.Fields{
- "device-id": ofc.DeviceID,
- "error": err})
- break top
- }
-
- // Calculate the size of the rest of the packet and read it
- need := int(peek.GetLength())
- messageBuf := make([]byte, need)
- copy(messageBuf, headerBuf)
- read, err = io.ReadFull(fromController, messageBuf[8:])
- if err != nil {
- logger.Errorw("bad-of-packet",
- log.Fields{
- "byte-count": read,
- "device-id": ofc.DeviceID,
- "error": err})
- break top
- }
-
- // Decode and process the packet
- decoder := goloxi.NewDecoder(messageBuf)
- msg, err := ofp.DecodeHeader(decoder)
- if err != nil {
- // nolint: staticcheck
- js, _ := json.Marshal(decoder)
- logger.Errorw("failed-to-decode",
- log.Fields{
- "device-id": ofc.DeviceID,
- "decoder": js,
- "error": err})
- break top
- }
- if logger.V(log.DebugLevel) {
- js, _ := json.Marshal(msg)
- logger.Debugw("packet-header",
- log.Fields{
- "device-id": ofc.DeviceID,
- "header": js})
- }
- ofc.parseHeader(msg)
- }
- }
- logger.Debugw("end-of-stream",
- log.Fields{"device-id": ofc.DeviceID})
- ofc.events <- ofcEventDisconnect
-}
-
-func (ofc *OFClient) sayHello() {
- hello := ofp.NewHello()
- hello.Xid = uint32(GetXid())
- elem := ofp.NewHelloElemVersionbitmap()
- elem.SetType(ofp.OFPHETVersionbitmap)
- elem.SetLength(8)
- elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
- hello.SetElements([]ofp.IHelloElem{elem})
- if logger.V(log.DebugLevel) {
- js, _ := json.Marshal(hello)
- logger.Debugw("sayHello Called",
- log.Fields{
- "device-id": ofc.DeviceID,
- "hello-message": js})
- }
- if err := ofc.SendMessage(hello); err != nil {
- logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
- log.Fields{
- "device-id": ofc.DeviceID,
- "error": err})
+ for _, connection := range ofc.connections {
+ go connection.Run(ctx)
}
}
-func (ofc *OFClient) parseHeader(header ofp.IHeader) {
- headerType := header.GetType()
- logger.Debugw("packet-header-type",
- log.Fields{
- "header-type": ofp.Type(headerType).String()})
- switch headerType {
- case ofp.OFPTHello:
- //x := header.(*ofp.Hello)
- case ofp.OFPTError:
- go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
- case ofp.OFPTEchoRequest:
- go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
- case ofp.OFPTEchoReply:
- case ofp.OFPTExperimenter:
- case ofp.OFPTFeaturesRequest:
- go func() {
- if err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest)); err != nil {
- logger.Errorw("handle-feature-request", log.Fields{"error": err})
- }
- }()
- case ofp.OFPTFeaturesReply:
- case ofp.OFPTGetConfigRequest:
- go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
- case ofp.OFPTGetConfigReply:
- case ofp.OFPTSetConfig:
- go ofc.handleSetConfig(header.(*ofp.SetConfig))
- case ofp.OFPTPacketIn:
- case ofp.OFPTFlowRemoved:
- case ofp.OFPTPortStatus:
- case ofp.OFPTPacketOut:
- go ofc.handlePacketOut(header.(*ofp.PacketOut))
- case ofp.OFPTFlowMod:
- /*
- * Not using go routine to handle flow* messages or barrier requests
- * onos typically issues barrier requests just before a flow* message.
- * by handling in this thread I ensure all flow* are handled when barrier
- * request is issued.
- */
- switch header.(ofp.IFlowMod).GetCommand() {
- case ofp.OFPFCAdd:
- ofc.handleFlowAdd(header.(*ofp.FlowAdd))
- case ofp.OFPFCModify:
- ofc.handleFlowMod(header.(*ofp.FlowMod))
- case ofp.OFPFCModifyStrict:
- ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
- case ofp.OFPFCDelete:
- ofc.handleFlowDelete(header.(*ofp.FlowDelete))
- case ofp.OFPFCDeleteStrict:
- ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
- }
- case ofp.OFPTStatsRequest:
- go func() {
- if err := ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
- logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
- }
- }()
- case ofp.OFPTBarrierRequest:
- /* See note above at case ofp.OFPTFlowMod:*/
- ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
- case ofp.OFPTRoleRequest:
- go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
- case ofp.OFPTMeterMod:
- ofc.handleMeterModRequest(header.(*ofp.MeterMod))
- }
-}
-
-// Message interface that represents an open flow message and enables for a
-// unified implementation of SendMessage
-type Message interface {
- Serialize(encoder *goloxi.Encoder) error
-}
-
-func (ofc *OFClient) doSend(msg Message) error {
- if ofc.conn == nil {
- return errors.New("no-connection")
- }
- enc := goloxi.NewEncoder()
- if err := msg.Serialize(enc); err != nil {
- return err
- }
-
- bytes := enc.Bytes()
- if _, err := ofc.conn.Write(bytes); err != nil {
- logger.Errorw("unable-to-send-message-to-controller",
- log.Fields{
- "device-id": ofc.DeviceID,
- "message": msg,
- "error": err})
- return err
- }
- return nil
-}
-
-func (ofc *OFClient) messageSender(ctx context.Context) {
- // first process last fail if it exists
- if ofc.lastUnsentMessage != nil {
- if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
- ofc.events <- ofcEventDisconnect
- return
- }
- ofc.lastUnsentMessage = nil
- }
-top:
- for {
- select {
- case <-ctx.Done():
- break top
- case msg := <-ofc.sendChannel:
- if err := ofc.doSend(msg); err != nil {
- ofc.lastUnsentMessage = msg
- ofc.events <- ofcEventDisconnect
- logger.Debugw("message-sender-error",
- log.Fields{
- "device-id": ofc.DeviceID,
- "error": err.Error()})
- break top
- }
- logger.Debugw("message-sender-send",
- log.Fields{
- "device-id": ofc.DeviceID})
- ofc.lastUnsentMessage = nil
- }
- }
-
- logger.Debugw("message-sender-finished",
- log.Fields{
- "device-id": ofc.DeviceID})
-}
-
-// SendMessage queues a message to be sent to the openflow controller
func (ofc *OFClient) SendMessage(message Message) error {
- logger.Debug("queuing-message")
- ofc.sendChannel <- message
+ for _, connection := range ofc.connections {
+ if connection.role == ofcRoleMaster || connection.role == ofcRoleEqual {
+ err := connection.SendMessage(message)
+ if err != nil {
+ return err
+ }
+ }
+ }
return nil
}
diff --git a/internal/pkg/openflow/connection.go b/internal/pkg/openflow/connection.go
new file mode 100644
index 0000000..7d57d34
--- /dev/null
+++ b/internal/pkg/openflow/connection.go
@@ -0,0 +1,480 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package openflow
+
+import (
+ "bufio"
+ "context"
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "github.com/donNewtonAlpha/goloxi"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "io"
+ "net"
+ "time"
+)
+
+// OFConnection holds the configuration and operational state of a single TCP
+// connection from one device to one OpenFlow controller endpoint.
+type OFConnection struct {
+ // address handed to net.ResolveTCPAddr("tcp", ...) when connecting
+ OFControllerEndPoint string
+ // device identifier; attached to log entries as "device-id"
+ DeviceID string
+ VolthaClient voltha.VolthaServiceClient
+ PacketOutChannel chan *voltha.PacketOut
+ // connection attempt limit; 0 means keep trying forever
+ ConnectionMaxRetries int
+ // pause between failed connection attempts
+ ConnectionRetryDelay time.Duration
+
+ // current socket to the controller, nil while not connected
+ conn net.Conn
+
+ // current role of this connection
+ role ofcRole
+ roleManager RoleManager
+
+ // events consumed by the state machine in Run
+ events chan ofcEvent
+ // messages queued by SendMessage and written out by messageSender
+ sendChannel chan Message
+ // message whose write failed; messageSender retries it first on restart
+ lastUnsentMessage Message
+}
+
+// peekAtOFHeader decodes the fixed 8-byte OpenFlow header found at the start
+// of buf: version (1 byte), type (1 byte), length (2 bytes, big endian) and
+// transaction id (4 bytes, big endian).
+//
+// It returns an error when buf holds fewer than 8 bytes or when the decoded
+// length is smaller than the header itself, since such a message cannot be
+// framed and the stream must be considered corrupted.
+func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
+	if len(buf) < 8 {
+		return nil, errors.New("of-header-too-short")
+	}
+
+	header := ofp.Header{
+		Version: uint8(buf[0]),
+		Type:    uint8(buf[1]),
+		Length:  binary.BigEndian.Uint16(buf[2:4]),
+		Xid:     binary.BigEndian.Uint32(buf[4:8]),
+	}
+
+	// The length field counts the whole message including this header, so
+	// anything below 8 is invalid.
+	if header.Length < 8 {
+		return nil, errors.New("of-header-invalid-length")
+	}
+
+	// TODO: add minimal validation of version and type
+
+	return &header, nil
+}
+
+// establishConnectionToController closes any existing socket and then tries
+// to open a new TCP connection to ofc.OFControllerEndPoint.
+//
+// When ConnectionMaxRetries is 0 it retries forever; otherwise it makes at
+// most ConnectionMaxRetries attempts. It sleeps ConnectionRetryDelay between
+// attempts. On success it stores the socket, queues the hello message, posts
+// ofcEventConnect and returns nil. It returns an error once all attempts
+// have failed.
+func (ofc *OFConnection) establishConnectionToController() error {
+	if ofc.conn != nil {
+		logger.Debugw("closing-of-connection-to-reconnect",
+			log.Fields{"device-id": ofc.DeviceID})
+		ofc.conn.Close()
+		ofc.conn = nil
+	}
+
+	for try := 1; ofc.ConnectionMaxRetries == 0 || try <= ofc.ConnectionMaxRetries; try++ {
+		raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint)
+		if err != nil {
+			logger.Debugw("openflow-client unable to resolve endpoint",
+				log.Fields{
+					"device-id": ofc.DeviceID,
+					"endpoint":  ofc.OFControllerEndPoint,
+					"error":     err})
+		} else {
+			connection, err := net.DialTCP("tcp", nil, raddr)
+			if err == nil {
+				ofc.conn = connection
+				ofc.sayHello()
+				ofc.events <- ofcEventConnect
+				return nil
+			}
+			logger.Warnw("openflow-client-connect-error",
+				log.Fields{
+					"device-id": ofc.DeviceID,
+					"endpoint":  ofc.OFControllerEndPoint,
+					"error":     err})
+		}
+
+		// No point sleeping after the final permitted attempt.
+		if ofc.ConnectionMaxRetries != 0 && try == ofc.ConnectionMaxRetries {
+			break
+		}
+		time.Sleep(ofc.ConnectionRetryDelay)
+	}
+	return errors.New("failed-to-connect-to-of-controller")
+}
+
+// Run implements the state machine for a single controller connection,
+// reacting to events posted on ofc.events and invoking actions as a reaction
+// to the resulting state changes.
+//
+// It blocks until ctx is done or a stop event is accepted. On exit it cancels
+// the sender/reader goroutines (if any) and closes the socket.
+func (ofc *OFConnection) Run(ctx context.Context) {
+	var ofCtx context.Context
+	var ofDone func()
+	state := ofcStateCreated
+
+	// illegalTransition reports an event that is not valid in the current
+	// state; the state is left unchanged.
+	illegalTransition := func(event ofcEvent) {
+		logger.Errorw("illegal-state-transition",
+			log.Fields{
+				"device-id":     ofc.DeviceID,
+				"current-state": state.String(),
+				"event":         event.String()})
+	}
+
+	// connect (re)establishes the controller connection; it is run in its
+	// own goroutine because it may block for a long time while retrying.
+	connect := func() {
+		if err := ofc.establishConnectionToController(); err != nil {
+			logger.Errorw("controller-connection-failed", log.Fields{"error": err})
+			panic(err)
+		}
+	}
+
+	ofc.events <- ofcEventStart
+top:
+	for {
+		select {
+		case <-ctx.Done():
+			state = ofcStateStopped
+			logger.Debugw("state-transition-context-done",
+				log.Fields{"device-id": ofc.DeviceID})
+			break top
+		case event := <-ofc.events:
+			previous := state
+			switch event {
+			case ofcEventStart:
+				logger.Debugw("ofc-event-start",
+					log.Fields{"device-id": ofc.DeviceID})
+				if state == ofcStateCreated {
+					state = ofcStateStarted
+					// NOTE(review): this message is emitted on the first,
+					// legal start; its text looks like a debugging leftover.
+					logger.Debug("STARTED MORE THAN ONCE")
+					go connect()
+				} else {
+					illegalTransition(event)
+				}
+			case ofcEventConnect:
+				logger.Debugw("ofc-event-connected",
+					log.Fields{"device-id": ofc.DeviceID})
+				if state == ofcStateStarted || state == ofcStateDisconnected {
+					state = ofcStateConnected
+					// The sender and reader share one cancellable context so
+					// a disconnect can stop both.
+					ofCtx, ofDone = context.WithCancel(context.Background())
+					go ofc.messageSender(ofCtx)
+					go ofc.processOFStream(ofCtx)
+				} else {
+					illegalTransition(event)
+				}
+			case ofcEventDisconnect:
+				logger.Debugw("ofc-event-disconnected",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"state":     state.String()})
+				if state == ofcStateConnected {
+					state = ofcStateDisconnected
+					if ofDone != nil {
+						ofDone()
+						ofDone = nil
+					}
+					go connect()
+				} else {
+					illegalTransition(event)
+				}
+			case ofcEventStop:
+				logger.Debugw("ofc-event-stop",
+					log.Fields{"device-id": ofc.DeviceID})
+				if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
+					state = ofcStateStopped
+					break top
+				} else {
+					illegalTransition(event)
+				}
+			}
+			logger.Debugw("state-transition",
+				log.Fields{
+					"device-id":      ofc.DeviceID,
+					"previous-state": previous.String(),
+					"current-state":  state.String(),
+					"event":          event.String()})
+		}
+	}
+
+	// If the child context exists, then cancel it
+	if ofDone != nil {
+		logger.Debugw("closing-child-processes",
+			log.Fields{"device-id": ofc.DeviceID})
+		ofDone()
+	}
+
+	// If the connection is open, then close it
+	if ofc.conn != nil {
+		logger.Debugw("closing-of-connection",
+			log.Fields{"device-id": ofc.DeviceID})
+		ofc.conn.Close()
+		ofc.conn = nil
+	}
+	logger.Debugw("state-machine-finished",
+		log.Fields{"device-id": ofc.DeviceID})
+}
+
+// processOFStream reads OpenFlow messages from the controller connection and
+// invokes the appropriate handler for each one via parseHeader.
+//
+// The loop ends when ctx is cancelled (checked between messages) or when a
+// message cannot be read, framed or decoded. In every case an
+// ofcEventDisconnect is posted before returning.
+func (ofc *OFConnection) processOFStream(ctx context.Context) {
+	fromController := bufio.NewReader(ofc.conn)
+
+	// Every OpenFlow message starts with a fixed 8-byte header; the body
+	// buffer is allocated per message from the length found in that header.
+	headerBuf := make([]byte, 8)
+
+top:
+	// Continue until we are told to stop
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Error("of-loop-ending-context-done")
+			break top
+		default:
+			// Read 8 bytes, the standard OF header
+			read, err := io.ReadFull(fromController, headerBuf)
+			if err != nil {
+				logger.Errorw("bad-of-header",
+					log.Fields{
+						"byte-count": read,
+						"device-id":  ofc.DeviceID,
+						"error":      err})
+				break top
+			}
+
+			// Decode the header
+			peek, err := ofc.peekAtOFHeader(headerBuf)
+			if err != nil {
+				/*
+				 * Header is bad, assume stream is corrupted
+				 * and needs to be restarted
+				 */
+				logger.Errorw("bad-of-packet",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"error":     err})
+				break top
+			}
+
+			// The length covers the whole message including the header. A
+			// smaller value cannot be framed and would make the slice
+			// expression below panic, so treat it as a corrupted stream.
+			need := int(peek.GetLength())
+			if need < len(headerBuf) {
+				logger.Errorw("bad-of-packet",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"length":    need,
+						"error":     "length-shorter-than-header"})
+				break top
+			}
+
+			// Read the rest of the packet behind a copy of the header
+			messageBuf := make([]byte, need)
+			copy(messageBuf, headerBuf)
+			read, err = io.ReadFull(fromController, messageBuf[8:])
+			if err != nil {
+				logger.Errorw("bad-of-packet",
+					log.Fields{
+						"byte-count": read,
+						"device-id":  ofc.DeviceID,
+						"error":      err})
+				break top
+			}
+
+			// Decode and process the packet
+			decoder := goloxi.NewDecoder(messageBuf)
+			msg, err := ofp.DecodeHeader(decoder)
+			if err != nil {
+				// nolint: staticcheck
+				js, _ := json.Marshal(decoder)
+				logger.Errorw("failed-to-decode",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"decoder":   js,
+						"error":     err})
+				break top
+			}
+			if logger.V(log.DebugLevel) {
+				js, _ := json.Marshal(msg)
+				logger.Debugw("packet-header",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"header":    js})
+			}
+			ofc.parseHeader(msg)
+		}
+	}
+	logger.Debugw("end-of-stream",
+		log.Fields{"device-id": ofc.DeviceID})
+	ofc.events <- ofcEventDisconnect
+}
+
+// sayHello builds an OpenFlow hello message carrying a version bitmap element
+// and queues it for transmission to the controller. The bitmap value 16 has
+// only bit 4 set. A failure to queue the message is logged as fatal.
+func (ofc *OFConnection) sayHello() {
+	msg := ofp.NewHello()
+	msg.Xid = uint32(GetXid())
+
+	versions := ofp.NewHelloElemVersionbitmap()
+	versions.SetType(ofp.OFPHETVersionbitmap)
+	versions.SetLength(8)
+	versions.SetBitmaps([]*ofp.Uint32{{Value: 16}})
+	msg.SetElements([]ofp.IHelloElem{versions})
+
+	if logger.V(log.DebugLevel) {
+		encoded, _ := json.Marshal(msg)
+		logger.Debugw("sayHello Called",
+			log.Fields{
+				"device-id":     ofc.DeviceID,
+				"hello-message": encoded})
+	}
+
+	err := ofc.SendMessage(msg)
+	if err == nil {
+		return
+	}
+	logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
+		log.Fields{
+			"device-id": ofc.DeviceID,
+			"error":     err})
+}
+
+// parseHeader dispatches a decoded OpenFlow message to its handler based on
+// the message type.
+//
+// Packet-out, flow-mod, meter-mod and group-mod messages are only processed
+// when this connection's role is master or equal; otherwise a role error is
+// sent back and the message is dropped. Flow-mod, barrier and meter-mod
+// messages are handled on the calling goroutine, the remaining handled types
+// in their own goroutine.
+func (ofc *OFConnection) parseHeader(header ofp.IHeader) {
+	msgType := header.GetType()
+	logger.Debugw("packet-header-type",
+		log.Fields{
+			"header-type": ofp.Type(msgType).String()})
+
+	// rejectedByRole sends the role error and reports true when the current
+	// role does not permit state-changing messages.
+	rejectedByRole := func() bool {
+		if ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual {
+			return false
+		}
+		ofc.sendRoleSlaveError(header)
+		return true
+	}
+
+	switch msgType {
+	case ofp.OFPTHello,
+		ofp.OFPTEchoReply,
+		ofp.OFPTExperimenter,
+		ofp.OFPTFeaturesReply,
+		ofp.OFPTGetConfigReply,
+		ofp.OFPTPacketIn,
+		ofp.OFPTFlowRemoved,
+		ofp.OFPTPortStatus:
+		// nothing to do for these message types
+	case ofp.OFPTError:
+		go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
+	case ofp.OFPTEchoRequest:
+		go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
+	case ofp.OFPTFeaturesRequest:
+		go func() {
+			err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest))
+			if err != nil {
+				logger.Errorw("handle-feature-request", log.Fields{"error": err})
+			}
+		}()
+	case ofp.OFPTGetConfigRequest:
+		go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
+	case ofp.OFPTSetConfig:
+		go ofc.handleSetConfig(header.(*ofp.SetConfig))
+	case ofp.OFPTPacketOut:
+		if rejectedByRole() {
+			return
+		}
+		go ofc.handlePacketOut(header.(*ofp.PacketOut))
+	case ofp.OFPTFlowMod:
+		if rejectedByRole() {
+			return
+		}
+		/*
+		 * Flow* messages and barrier requests are deliberately handled
+		 * without a goroutine: onos typically issues a barrier request
+		 * right around flow* messages, and processing them in order on
+		 * this thread ensures all flow* are handled when the barrier
+		 * request is processed.
+		 */
+		switch header.(ofp.IFlowMod).GetCommand() {
+		case ofp.OFPFCAdd:
+			ofc.handleFlowAdd(header.(*ofp.FlowAdd))
+		case ofp.OFPFCModify:
+			ofc.handleFlowMod(header.(*ofp.FlowMod))
+		case ofp.OFPFCModifyStrict:
+			ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
+		case ofp.OFPFCDelete:
+			ofc.handleFlowDelete(header.(*ofp.FlowDelete))
+		case ofp.OFPFCDeleteStrict:
+			ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
+		}
+	case ofp.OFPTStatsRequest:
+		go func() {
+			statsType := header.(ofp.IStatsRequest).GetStatsType()
+			if err := ofc.handleStatsRequest(header, statsType); err != nil {
+				logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
+			}
+		}()
+	case ofp.OFPTBarrierRequest:
+		/* Handled inline for the same ordering reason as flow-mod. */
+		ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
+	case ofp.OFPTRoleRequest:
+		go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
+	case ofp.OFPTMeterMod:
+		if rejectedByRole() {
+			return
+		}
+		ofc.handleMeterModRequest(header.(*ofp.MeterMod))
+	case ofp.OFPTGroupMod:
+		if rejectedByRole() {
+			return
+		}
+		// TODO handle group mods
+	}
+}
+
+// Message represents an OpenFlow message that can encode itself into a goloxi
+// encoder. It allows a single SendMessage/doSend implementation to transmit
+// any message type.
+type Message interface {
+ Serialize(encoder *goloxi.Encoder) error
+}
+
+// doSend serializes msg and writes the resulting bytes to the controller
+// socket. It returns an error when there is no socket, when serialization
+// fails, or when the write fails; only the write failure is logged here.
+func (ofc *OFConnection) doSend(msg Message) error {
+	if ofc.conn == nil {
+		return errors.New("no-connection")
+	}
+
+	encoder := goloxi.NewEncoder()
+	err := msg.Serialize(encoder)
+	if err != nil {
+		return err
+	}
+
+	_, err = ofc.conn.Write(encoder.Bytes())
+	if err == nil {
+		return nil
+	}
+
+	logger.Errorw("unable-to-send-message-to-controller",
+		log.Fields{
+			"device-id": ofc.DeviceID,
+			"message":   msg,
+			"error":     err})
+	return err
+}
+
+// messageSender drains ofc.sendChannel and writes each message to the
+// controller until ctx is cancelled or a write fails.
+//
+// A message whose write failed is remembered in ofc.lastUnsentMessage and is
+// retried first the next time messageSender starts. Every failed write posts
+// an ofcEventDisconnect and ends the sender.
+func (ofc *OFConnection) messageSender(ctx context.Context) {
+	// first retry the message left over from the previous failure, if any
+	if pending := ofc.lastUnsentMessage; pending != nil {
+		if err := ofc.doSend(pending); err != nil {
+			ofc.events <- ofcEventDisconnect
+			return
+		}
+		ofc.lastUnsentMessage = nil
+	}
+
+	for running := true; running; {
+		select {
+		case <-ctx.Done():
+			running = false
+		case next := <-ofc.sendChannel:
+			err := ofc.doSend(next)
+			if err == nil {
+				logger.Debugw("message-sender-send",
+					log.Fields{
+						"device-id": ofc.DeviceID})
+				ofc.lastUnsentMessage = nil
+				continue
+			}
+			// keep the message so it is retried after reconnecting
+			ofc.lastUnsentMessage = next
+			ofc.events <- ofcEventDisconnect
+			logger.Debugw("message-sender-error",
+				log.Fields{
+					"device-id": ofc.DeviceID,
+					"error":     err.Error()})
+			running = false
+		}
+	}
+
+	logger.Debugw("message-sender-finished",
+		log.Fields{
+			"device-id": ofc.DeviceID})
+}
+
+// SendMessage queues a message to be sent to the openflow controller by
+// placing it on ofc.sendChannel. The channel send blocks while the channel is
+// full. The returned error is always nil.
+func (ofc *OFConnection) SendMessage(message Message) error {
+ logger.Debug("queuing-message")
+ ofc.sendChannel <- message
+ return nil
+}
diff --git a/internal/pkg/openflow/echo.go b/internal/pkg/openflow/echo.go
index af850a4..7358a0f 100644
--- a/internal/pkg/openflow/echo.go
+++ b/internal/pkg/openflow/echo.go
@@ -22,7 +22,7 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFClient) handleEchoRequest(request *ofp.EchoRequest) {
+func (ofc *OFConnection) handleEchoRequest(request *ofp.EchoRequest) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw("handleEchoRequest called",
diff --git a/internal/pkg/openflow/error.go b/internal/pkg/openflow/error.go
index 3a70b93..0472ad8 100644
--- a/internal/pkg/openflow/error.go
+++ b/internal/pkg/openflow/error.go
@@ -22,7 +22,7 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFClient) handleErrMsg(message ofp.IErrorMsg) {
+func (ofc *OFConnection) handleErrMsg(message ofp.IErrorMsg) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(message)
logger.Debugw("handleErrMsg called",
diff --git a/internal/pkg/openflow/feature.go b/internal/pkg/openflow/feature.go
index 5eb8fa0..82353db 100644
--- a/internal/pkg/openflow/feature.go
+++ b/internal/pkg/openflow/feature.go
@@ -24,7 +24,7 @@
"github.com/opencord/voltha-protos/v3/go/common"
)
-func (ofc *OFClient) handleFeatureRequest(request *ofp.FeaturesRequest) error {
+func (ofc *OFConnection) handleFeatureRequest(request *ofp.FeaturesRequest) error {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw("handleFeatureRequest called",
diff --git a/internal/pkg/openflow/flowMod.go b/internal/pkg/openflow/flowMod.go
index cabcfc2..114a012 100644
--- a/internal/pkg/openflow/flowMod.go
+++ b/internal/pkg/openflow/flowMod.go
@@ -71,7 +71,7 @@
"vlan_vid_masked": 200, //made up
}
-func (ofc *OFClient) handleFlowAdd(flowAdd *ofp.FlowAdd) {
+func (ofc *OFConnection) handleFlowAdd(flowAdd *ofp.FlowAdd) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowAdd)
logger.Debugw("handleFlowAdd called",
@@ -265,7 +265,7 @@
}
}
-func (ofc *OFClient) handleFlowMod(flowMod *ofp.FlowMod) {
+func (ofc *OFConnection) handleFlowMod(flowMod *ofp.FlowMod) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowMod)
logger.Debugw("handleFlowMod called",
@@ -277,7 +277,7 @@
log.Fields{"device-id": ofc.DeviceID})
}
-func (ofc *OFClient) handleFlowModStrict(flowModStrict *ofp.FlowModifyStrict) {
+func (ofc *OFConnection) handleFlowModStrict(flowModStrict *ofp.FlowModifyStrict) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowModStrict)
logger.Debugw("handleFlowModStrict called",
@@ -289,7 +289,7 @@
log.Fields{"device-id": ofc.DeviceID})
}
-func (ofc *OFClient) handleFlowDelete(flowDelete *ofp.FlowDelete) {
+func (ofc *OFConnection) handleFlowDelete(flowDelete *ofp.FlowDelete) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowDelete)
logger.Debugw("handleFlowDelete called",
@@ -302,7 +302,7 @@
}
-func (ofc *OFClient) handleFlowDeleteStrict(flowDeleteStrict *ofp.FlowDeleteStrict) {
+func (ofc *OFConnection) handleFlowDeleteStrict(flowDeleteStrict *ofp.FlowDeleteStrict) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowDeleteStrict)
logger.Debugw("handleFlowDeleteStrict called",
diff --git a/internal/pkg/openflow/getConfig.go b/internal/pkg/openflow/getConfig.go
index 19efdf4..6425246 100644
--- a/internal/pkg/openflow/getConfig.go
+++ b/internal/pkg/openflow/getConfig.go
@@ -22,7 +22,7 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFClient) handleGetConfigRequest(request *ofp.GetConfigRequest) {
+func (ofc *OFConnection) handleGetConfigRequest(request *ofp.GetConfigRequest) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw("handleGetConfigRequest called",
diff --git a/internal/pkg/openflow/meter.go b/internal/pkg/openflow/meter.go
index 61ded75..c8ee22c 100644
--- a/internal/pkg/openflow/meter.go
+++ b/internal/pkg/openflow/meter.go
@@ -23,7 +23,7 @@
"golang.org/x/net/context"
)
-func (ofc *OFClient) handleMeterModRequest(request *ofp.MeterMod) {
+func (ofc *OFConnection) handleMeterModRequest(request *ofp.MeterMod) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw("handleMeterModRequest called",
diff --git a/internal/pkg/openflow/packet.go b/internal/pkg/openflow/packet.go
index 622ff0d..7596d4a 100644
--- a/internal/pkg/openflow/packet.go
+++ b/internal/pkg/openflow/packet.go
@@ -23,7 +23,7 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
)
-func (ofc *OFClient) handlePacketOut(packetOut *ofp.PacketOut) {
+func (ofc *OFConnection) handlePacketOut(packetOut *ofp.PacketOut) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(packetOut)
logger.Debugw("handlePacketOut called",
diff --git a/internal/pkg/openflow/role.go b/internal/pkg/openflow/role.go
index 3a123e8..6d86fe9 100644
--- a/internal/pkg/openflow/role.go
+++ b/internal/pkg/openflow/role.go
@@ -18,11 +18,12 @@
import (
"encoding/json"
+ "github.com/donNewtonAlpha/goloxi"
ofp "github.com/donNewtonAlpha/goloxi/of13"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFClient) handleRoleRequest(request *ofp.RoleRequest) {
+func (ofc *OFConnection) handleRoleRequest(request *ofp.RoleRequest) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw("handleRoleRequest called",
@@ -30,13 +31,78 @@
 				"device-id": ofc.DeviceID,
 				"request":   js})
 	}
-	reply := ofp.NewRoleReply()
+
+	// A NOCHANGE request only queries the role: answer with the role this
+	// connection currently holds and stop, so the request never reaches the
+	// role manager and the controller receives exactly one reply.
+	if request.Role == ofp.OFPCRRoleNochange {
+		reply := ofp.NewRoleReply()
+		reply.SetXid(request.GetXid())
+		reply.SetVersion(request.GetVersion())
+		reply.SetRole(ofp.ControllerRole(ofc.role))
+		reply.SetGenerationId(request.GetGenerationId())
+		if err := ofc.SendMessage(reply); err != nil {
+			logger.Errorw("handle-role-request-send-message", log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+		}
+		return
+	}
+
+	// Any other role is handed to the role manager, which accepts or rejects it.
+	ok := ofc.roleManager.UpdateRoles(ofc.OFControllerEndPoint, request)
+
+	if ok {
+		reply := ofp.NewRoleReply()
+		reply.SetXid(request.GetXid())
+		reply.SetVersion(request.GetVersion())
+		reply.SetRole(request.GetRole())
+		reply.SetGenerationId(request.GetGenerationId())
+		if err := ofc.SendMessage(reply); err != nil {
+			logger.Errorw("handle-role-request-send-message", log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+		}
+	} else {
+		// Rejected: reply with a role-request-failed error carrying the stale code.
+		reply := ofp.NewRoleRequestFailedErrorMsg()
+		reply.SetXid(request.GetXid())
+		reply.SetVersion(request.GetVersion())
+		reply.Code = ofp.OFPRRFCStale
+
+		enc := goloxi.NewEncoder()
+		err := request.Serialize(enc)
+		if err == nil {
+			reply.Data = enc.Bytes()
+		}
+
+		if err := ofc.SendMessage(reply); err != nil {
+			logger.Errorw("handle-role-request-send-message", log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+		}
+	}
+}
+
+// sendRoleSlaveError answers request with a bad-request error whose code is
+// OFPBRCIsSlave, attaching the serialized request as the error data when it
+// can be encoded.
+func (ofc *OFConnection) sendRoleSlaveError(request ofp.IHeader) {
+	reply := ofp.NewBadRequestErrorMsg()
 	reply.SetXid(request.GetXid())
 	reply.SetVersion(request.GetVersion())
-	reply.SetRole(request.GetRole())
-	reply.SetGenerationId(request.GetGenerationId())
+	// OFPBRCIsSlave is the bad-request code for "denied because the controller
+	// is a slave"; OFPCRRoleSlave is a controller-role value, not an error code.
+	reply.Code = ofp.OFPBRCIsSlave
+
+	enc := goloxi.NewEncoder()
+	err := request.Serialize(enc)
+	if err == nil {
+		reply.Data = enc.Bytes()
+	}
+
 	if err := ofc.SendMessage(reply); err != nil {
-		logger.Errorw("handle-role-request-send-message", log.Fields{
+		logger.Errorw("send-role-slave-error", log.Fields{
 			"device-id": ofc.DeviceID,
 			"error":     err})
 	}
diff --git a/internal/pkg/openflow/role_test.go b/internal/pkg/openflow/role_test.go
new file mode 100644
index 0000000..5eb3ba5
--- /dev/null
+++ b/internal/pkg/openflow/role_test.go
@@ -0,0 +1,160 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// testRoleManager is a roleManager test double that records the last UpdateRoles call.
+type testRoleManager struct {
+	from          string
+	role          ofp.ControllerRole
+	generationId  uint64
+	generateError bool // when true, UpdateRoles returns false
+}
+
+// UpdateRoles records the caller plus the request's role and generation id,
+// then reports success unless generateError is set.
+func (trm *testRoleManager) UpdateRoles(from string, request *ofp.RoleRequest) bool {
+	trm.from = from
+	trm.role, trm.generationId = request.Role, request.GenerationId
+	return !trm.generateError
+}
+
+// createOFClient returns an OFClient configured with endpoints "e1", "e2" and
+// "e3", each mapped to an OFConnection whose role is ofcRoleNone.
+func createOFClient() *OFClient {
+	client := &OFClient{
+		OFControllerEndPoints: []string{"e1", "e2", "e3"},
+		connections:           make(map[string]*OFConnection),
+	}
+
+	// Register one role-less connection per configured endpoint.
+	for _, endpoint := range client.OFControllerEndPoints {
+		client.connections[endpoint] = &OFConnection{
+			OFControllerEndPoint: endpoint,
+			role:                 ofcRoleNone,
+		}
+	}
+	return client
+}
+
+// createRoleRequest builds a role request carrying the given role and generation id.
+func createRoleRequest(role ofp.ControllerRole, genId uint64) *ofp.RoleRequest {
+	request := ofp.NewRoleRequest()
+	request.Role, request.GenerationId = role, genId
+	return request
+}
+
+// TestRoleChange walks UpdateRoles through a sequence of role changes: e1
+// becomes master, e2 then takes over as master (which must demote e1 to
+// slave), and finally e2 steps down to slave. Every request must be accepted.
+func TestRoleChange(t *testing.T) {
+	// createOFClient supplies endpoints e1, e2 and e3
+	ofclient := createOFClient()
+
+	// every connection starts without a role
+	for _, ep := range ofclient.OFControllerEndPoints {
+		assert.Equal(t, ofcRoleNone, ofclient.connections[ep].role)
+	}
+
+	// change role of e1 to master
+	rr := createRoleRequest(ofp.OFPCRRoleMaster, 1)
+
+	ok := ofclient.UpdateRoles("e1", rr)
+	assert.True(t, ok)
+	assert.Equal(t, ofcRoleMaster, ofclient.connections["e1"].role)
+
+	// change role of e2 to master
+	ok = ofclient.UpdateRoles("e2", rr)
+	assert.True(t, ok)
+	assert.Equal(t, ofcRoleMaster, ofclient.connections["e2"].role)
+
+	// e1 should now have reverted to slave
+	assert.Equal(t, ofcRoleSlave, ofclient.connections["e1"].role)
+
+	// change role of e2 to slave
+	rr = createRoleRequest(ofp.OFPCRRoleSlave, 1)
+
+	ok = ofclient.UpdateRoles("e2", rr)
+	assert.True(t, ok)
+	assert.Equal(t, ofcRoleSlave, ofclient.connections["e2"].role)
+}
+
+// TestStaleRoleRequest checks that a role request with an older generation id
+// than the one already accepted is refused and leaves the role unchanged.
+func TestStaleRoleRequest(t *testing.T) {
+	ofclient := createOFClient()
+
+	// Generation 2 is accepted and makes e1 master.
+	current := createRoleRequest(ofp.OFPCRRoleMaster, 2)
+	ok := ofclient.UpdateRoles("e1", current)
+	assert.True(t, ok)
+	assert.Equal(t, ofcRoleMaster, ofclient.connections["e1"].role)
+
+	// Generation 1 is older than the accepted one, so this request is stale.
+	stale := createRoleRequest(ofp.OFPCRRoleSlave, 1)
+	ok = ofclient.UpdateRoles("e1", stale)
+	assert.False(t, ok)
+	// The refused request must leave e1 as master.
+	assert.Equal(t, ofcRoleMaster, ofclient.connections["e1"].role)
+}
+
+// TestHandleRoleRequest checks that an accepted role request reaches the role
+// manager unchanged and is answered with a RoleReply carrying the same role.
+func TestHandleRoleRequest(t *testing.T) {
+	trm := &testRoleManager{}
+	connection := &OFConnection{
+		OFControllerEndPoint: "e1",
+		roleManager:          trm,
+		sendChannel:          make(chan Message, 10),
+	}
+
+	connection.handleRoleRequest(createRoleRequest(ofp.OFPCRRoleMaster, 1))
+
+	assert.Equal(t, "e1", trm.from)
+	assert.EqualValues(t, ofp.OFPCRRoleMaster, trm.role)
+	assert.EqualValues(t, 1, trm.generationId)
+
+	// Checked assertion: a wrong reply type fails the test rather than panicking.
+	resp, ok := (<-connection.sendChannel).(*ofp.RoleReply)
+	if assert.True(t, ok, "expected a RoleReply") {
+		assert.EqualValues(t, ofp.OFPCRRoleMaster, resp.Role)
+	}
+}
+
+// TestHandleRoleRequestError checks that a request rejected by the role manager
+// is answered with a RoleRequestFailedErrorMsg whose code is OFPRRFCStale.
+func TestHandleRoleRequestError(t *testing.T) {
+	connection := &OFConnection{
+		OFControllerEndPoint: "e1",
+		roleManager:          &testRoleManager{generateError: true},
+		sendChannel:          make(chan Message, 10),
+	}
+
+	connection.handleRoleRequest(createRoleRequest(ofp.OFPCRRoleMaster, 1))
+
+	// Checked assertion: a wrong reply type fails the test rather than panicking.
+	resp, ok := (<-connection.sendChannel).(*ofp.RoleRequestFailedErrorMsg)
+
+	if assert.True(t, ok, "expected a RoleRequestFailedErrorMsg") {
+		assert.EqualValues(t, ofp.OFPRRFCStale, resp.Code)
+	}
+}
diff --git a/internal/pkg/openflow/setConfig.go b/internal/pkg/openflow/setConfig.go
index df48f5d..64be03f 100644
--- a/internal/pkg/openflow/setConfig.go
+++ b/internal/pkg/openflow/setConfig.go
@@ -22,7 +22,7 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFClient) handleSetConfig(request *ofp.SetConfig) {
+func (ofc *OFConnection) handleSetConfig(request *ofp.SetConfig) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw("handleSetConfig called",
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
index 520b5a2..86c06d6 100644
--- a/internal/pkg/openflow/stats.go
+++ b/internal/pkg/openflow/stats.go
@@ -28,7 +28,7 @@
"unsafe"
)
-func (ofc *OFClient) handleStatsRequest(request ofp.IHeader, statType uint16) error {
+func (ofc *OFConnection) handleStatsRequest(request ofp.IHeader, statType uint16) error {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
logger.Debugw("handleStatsRequest called",
@@ -288,7 +288,7 @@
return nil
}
-func (ofc *OFClient) handleDescStatsRequest(request *ofp.DescStatsRequest) (*ofp.DescStatsReply, error) {
+func (ofc *OFConnection) handleDescStatsRequest(request *ofp.DescStatsRequest) (*ofp.DescStatsReply, error) {
if ofc.VolthaClient == nil {
return nil, NoVolthaConnectionError
}
@@ -312,7 +312,7 @@
return response, nil
}
-func (ofc *OFClient) handleFlowStatsRequest(request *ofp.FlowStatsRequest) (*ofp.FlowStatsReply, error) {
+func (ofc *OFConnection) handleFlowStatsRequest(request *ofp.FlowStatsRequest) (*ofp.FlowStatsReply, error) {
if ofc.VolthaClient == nil {
return nil, NoVolthaConnectionError
}
@@ -377,7 +377,7 @@
return response, nil
}
-func (ofc *OFClient) handleAggregateStatsRequest(request *ofp.AggregateStatsRequest) (*ofp.AggregateStatsReply, error) {
+func (ofc *OFConnection) handleAggregateStatsRequest(request *ofp.AggregateStatsRequest) (*ofp.AggregateStatsReply, error) {
response := ofp.NewAggregateStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
@@ -387,7 +387,7 @@
return response, nil
}
-func (ofc *OFClient) handleGroupStatsRequest(request *ofp.GroupStatsRequest) (*ofp.GroupStatsReply, error) {
+func (ofc *OFConnection) handleGroupStatsRequest(request *ofp.GroupStatsRequest) (*ofp.GroupStatsReply, error) {
if ofc.VolthaClient == nil {
return nil, NoVolthaConnectionError
}
@@ -425,7 +425,7 @@
return response, nil
}
-func (ofc *OFClient) handleGroupStatsDescRequest(request *ofp.GroupDescStatsRequest) (*ofp.GroupDescStatsReply, error) {
+func (ofc *OFConnection) handleGroupStatsDescRequest(request *ofp.GroupDescStatsRequest) (*ofp.GroupDescStatsReply, error) {
if ofc.VolthaClient == nil {
return nil, NoVolthaConnectionError
}
@@ -458,7 +458,7 @@
return response, nil
}
-func (ofc *OFClient) handleGroupFeatureStatsRequest(request *ofp.GroupFeaturesStatsRequest) (*ofp.GroupFeaturesStatsReply, error) {
+func (ofc *OFConnection) handleGroupFeatureStatsRequest(request *ofp.GroupFeaturesStatsRequest) (*ofp.GroupFeaturesStatsReply, error) {
response := ofp.NewGroupFeaturesStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
@@ -467,7 +467,7 @@
return response, nil
}
-func (ofc *OFClient) handleMeterStatsRequest(request *ofp.MeterStatsRequest) (*ofp.MeterStatsReply, error) {
+func (ofc *OFConnection) handleMeterStatsRequest(request *ofp.MeterStatsRequest) (*ofp.MeterStatsReply, error) {
if ofc.VolthaClient == nil {
return nil, NoVolthaConnectionError
}
@@ -509,7 +509,7 @@
return response, nil
}
-func (ofc *OFClient) handleMeterConfigStatsRequest(request *ofp.MeterConfigStatsRequest) (*ofp.MeterConfigStatsReply, error) {
+func (ofc *OFConnection) handleMeterConfigStatsRequest(request *ofp.MeterConfigStatsRequest) (*ofp.MeterConfigStatsReply, error) {
response := ofp.NewMeterConfigStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
@@ -518,7 +518,7 @@
return response, nil
}
-func (ofc *OFClient) handleTableFeaturesStatsRequest(request *ofp.TableFeaturesStatsRequest) (*ofp.TableFeaturesStatsReply, error) {
+func (ofc *OFConnection) handleTableFeaturesStatsRequest(request *ofp.TableFeaturesStatsRequest) (*ofp.TableFeaturesStatsReply, error) {
response := ofp.NewTableFeaturesStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
@@ -527,7 +527,7 @@
return response, nil
}
-func (ofc *OFClient) handleTableStatsRequest(request *ofp.TableStatsRequest) (*ofp.TableStatsReply, error) {
+func (ofc *OFConnection) handleTableStatsRequest(request *ofp.TableStatsRequest) (*ofp.TableStatsReply, error) {
var response = ofp.NewTableStatsReply()
response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
response.SetVersion(request.GetVersion())
@@ -536,7 +536,7 @@
return response, nil
}
-func (ofc *OFClient) handleQueueStatsRequest(request *ofp.QueueStatsRequest) (*ofp.QueueStatsReply, error) {
+func (ofc *OFConnection) handleQueueStatsRequest(request *ofp.QueueStatsRequest) (*ofp.QueueStatsReply, error) {
response := ofp.NewQueueStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
@@ -545,7 +545,7 @@
return response, nil
}
-func (ofc *OFClient) handlePortStatsRequest(request *ofp.PortStatsRequest) (*ofp.PortStatsReply, error) {
+func (ofc *OFConnection) handlePortStatsRequest(request *ofp.PortStatsRequest) (*ofp.PortStatsReply, error) {
if ofc.VolthaClient == nil {
return nil, NoVolthaConnectionError
}
@@ -574,7 +574,7 @@
return response, nil
}
-func (ofc *OFClient) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) (*ofp.PortDescStatsReply, error) {
+func (ofc *OFConnection) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) (*ofp.PortDescStatsReply, error) {
if ofc.VolthaClient == nil {
return nil, NoVolthaConnectionError
}
@@ -618,7 +618,7 @@
}
-func (ofc *OFClient) handleMeterFeatureStatsRequest(request *ofp.MeterFeaturesStatsRequest) (*ofp.MeterFeaturesStatsReply, error) {
+func (ofc *OFConnection) handleMeterFeatureStatsRequest(request *ofp.MeterFeaturesStatsRequest) (*ofp.MeterFeaturesStatsReply, error) {
response := ofp.NewMeterFeaturesStatsReply()
response.SetXid(request.GetXid())
response.SetVersion(request.GetVersion())
@@ -633,7 +633,7 @@
return response, nil
}
-func (ofc *OFClient) handleExperimenterStatsRequest(request *ofp.ExperimenterStatsRequest) (*ofp.ExperimenterStatsReply, error) {
+func (ofc *OFConnection) handleExperimenterStatsRequest(request *ofp.ExperimenterStatsRequest) (*ofp.ExperimenterStatsReply, error) {
response := ofp.NewExperimenterStatsReply(request.GetExperimenter())
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())