blob: de780a0e4b76253c990660c0f64b2ef160b29c0b [file] [log] [blame]
/*
Copyright 2020 the original author or authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openflow
import (
"bufio"
"context"
"encoding/binary"
"encoding/json"
"errors"
"io"
"net"
"sync"
"time"
"github.com/opencord/goloxi"
ofp "github.com/opencord/goloxi/of13"
"github.com/opencord/ofagent-go/internal/pkg/holder"
"github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
)
type OFConnection struct {
OFControllerEndPoint string
DeviceID string
VolthaClient *holder.VolthaServiceClientHolder
PacketOutChannel chan *voltha.PacketOut
ConnectionMaxRetries int
ConnectionRetryDelay time.Duration
conn net.Conn
// current role of this connection
role ofcRole
roleManager RoleManager
events chan ofcEvent
sendChannel chan Message
lastUnsentMessage Message
flowsChunkSize int
portsChunkSize int
portsDescChunkSize int
}
func (ofc *OFConnection) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
header := ofp.Header{}
header.Version = uint8(buf[0])
header.Type = uint8(buf[1])
header.Length = binary.BigEndian.Uint16(buf[2:4])
header.Xid = binary.BigEndian.Uint32(buf[4:8])
// TODO: add minimal validation of version and type
return &header, nil
}
func (ofc *OFConnection) establishConnectionToController(ctx context.Context) error {
if ofc.conn != nil {
logger.Debugw(ctx, "closing-of-connection-to-reconnect",
log.Fields{"device-id": ofc.DeviceID})
err := ofc.conn.Close()
if err != nil {
logger.Errorw(ctx, "failed-connection-close-proceeding-setting-to-nil", log.Fields{"error": err})
}
ofc.conn = nil
}
try := 1
for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
logger.Debugw(ctx, "openflow-client unable to resolve endpoint",
log.Fields{
"device-id": ofc.DeviceID,
"endpoint": ofc.OFControllerEndPoint})
} else {
if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
ofc.conn = connection
ofc.sayHello(ctx)
ofc.events <- ofcEventConnect
return nil
} else {
logger.Warnw(ctx, "openflow-client-connect-error",
log.Fields{
"device-id": ofc.DeviceID,
"endpoint": ofc.OFControllerEndPoint})
}
}
if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
if ofc.ConnectionMaxRetries != 0 {
try += 1
}
time.Sleep(ofc.ConnectionRetryDelay)
}
}
return errors.New("failed-to-connect-to-of-controller")
}
// Run implements the state machine for the OF client reacting to state change
// events and invoking actions as a reaction to those state changes
func (ofc *OFConnection) Run(ctx context.Context) {
var ofCtx context.Context
var ofDone func()
state := ofcStateCreated
ofc.events <- ofcEventStart
top:
for {
select {
case <-ctx.Done():
state = ofcStateStopped
logger.Debugw(ctx, "state-transition-context-done",
log.Fields{"device-id": ofc.DeviceID})
break top
case event := <-ofc.events:
previous := state
switch event {
case ofcEventStart:
logger.Debugw(ctx, "ofc-event-start",
log.Fields{"device-id": ofc.DeviceID})
if state == ofcStateCreated {
state = ofcStateStarted
logger.Debug(ctx, "STARTED MORE THAN ONCE")
go func() {
if err := ofc.establishConnectionToController(ctx); err != nil {
logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
panic(err)
}
}()
} else {
logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
case ofcEventConnect:
logger.Debugw(ctx, "ofc-event-connected",
log.Fields{"device-id": ofc.DeviceID})
if state == ofcStateStarted || state == ofcStateDisconnected {
state = ofcStateConnected
ofCtx, ofDone = context.WithCancel(log.WithSpanFromContext(context.Background(), ctx))
go ofc.messageSender(ofCtx)
go ofc.processOFStream(ofCtx)
} else {
logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
case ofcEventDisconnect:
logger.Debugw(ctx, "ofc-event-disconnected",
log.Fields{
"device-id": ofc.DeviceID,
"state": state.String()})
if state == ofcStateConnected {
state = ofcStateDisconnected
if ofDone != nil {
ofDone()
ofDone = nil
}
go func() {
if err := ofc.establishConnectionToController(ctx); err != nil {
logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
panic(err)
}
}()
} else {
logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
case ofcEventStop:
logger.Debugw(ctx, "ofc-event-stop",
log.Fields{"device-id": ofc.DeviceID})
if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
state = ofcStateStopped
break top
} else {
logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
}
logger.Debugw(ctx, "state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"previous-state": previous.String(),
"current-state": state.String(),
"event": event.String()})
}
}
// If the child context exists, then cancel it
if ofDone != nil {
logger.Debugw(ctx, "closing-child-processes",
log.Fields{"device-id": ofc.DeviceID})
ofDone()
}
// If the connection is open, then close it
if ofc.conn != nil {
logger.Debugw(ctx, "closing-of-connection",
log.Fields{"device-id": ofc.DeviceID})
err := ofc.conn.Close()
if err != nil {
logger.Errorw(ctx, "closing-of-connection", log.Fields{"error": err})
}
ofc.conn = nil
}
logger.Debugw(ctx, "state-machine-finished",
log.Fields{"device-id": ofc.DeviceID})
}
// processOFStream processes the OF connection from the controller and invokes
// the appropriate handler methods for each message.
func (ofc *OFConnection) processOFStream(ctx context.Context) {
fromController := bufio.NewReader(ofc.conn)
/*
* We have a read buffer of a max size of 4096, so if we ever have
* a message larger than this then we will have issues
*/
headerBuf := make([]byte, 8)
wg := sync.WaitGroup{}
top:
// Continue until we are told to stop
for {
select {
case <-ctx.Done():
logger.Error(ctx, "of-loop-ending-context-done")
break top
default:
// Read 8 bytes, the standard OF header
read, err := io.ReadFull(fromController, headerBuf)
if err != nil {
if err == io.EOF {
logger.Infow(ctx, "controller-disconnected",
log.Fields{
"device-id": ofc.DeviceID,
"controller": ofc.OFControllerEndPoint,
})
} else {
logger.Errorw(ctx, "bad-of-header",
log.Fields{
"byte-count": read,
"device-id": ofc.DeviceID,
"controller": ofc.OFControllerEndPoint,
"error": err})
}
break top
}
// Decode the header
peek, err := ofc.peekAtOFHeader(headerBuf)
if err != nil {
/*
* Header is bad, assume stream is corrupted
* and needs to be restarted
*/
logger.Errorw(ctx, "bad-of-packet",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
break top
}
// Calculate the size of the rest of the packet and read it
need := int(peek.GetLength())
messageBuf := make([]byte, need)
copy(messageBuf, headerBuf)
read, err = io.ReadFull(fromController, messageBuf[8:])
if err != nil {
logger.Errorw(ctx, "bad-of-packet",
log.Fields{
"byte-count": read,
"device-id": ofc.DeviceID,
"error": err})
break top
}
// Decode and process the packet
decoder := goloxi.NewDecoder(messageBuf)
msg, err := ofp.DecodeHeader(decoder)
if err != nil {
// nolint: staticcheck
js, _ := json.Marshal(decoder)
logger.Errorw(ctx, "failed-to-decode",
log.Fields{
"device-id": ofc.DeviceID,
"decoder": js,
"error": err})
break top
}
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(msg)
logger.Debugw(ctx, "packet-header",
log.Fields{
"device-id": ofc.DeviceID,
"header": js})
}
// We can parallelize the processing of all the operations
// that we get before a BarrieRequest, then we need to wait.
// What we are doing is:
// - spawn threads until we get a Barrier
// - when we get a barrier wait for the threads to complete before continuing
msgType := msg.GetType()
if msgType == ofp.OFPTBarrierRequest {
logger.Debug(ctx, "received-barrier-request-waiting-for-pending-requests")
wg.Wait()
logger.Debug(ctx, "restarting-requests-processing")
}
wg.Add(1)
go ofc.parseHeader(ctx, msg, &wg)
}
}
logger.Debugw(ctx, "end-of-stream",
log.Fields{"device-id": ofc.DeviceID})
ofc.events <- ofcEventDisconnect
}
func (ofc *OFConnection) sayHello(ctx context.Context) {
hello := ofp.NewHello()
hello.Xid = uint32(GetXid())
elem := ofp.NewHelloElemVersionbitmap()
elem.SetType(ofp.OFPHETVersionbitmap)
elem.SetLength(8)
elem.SetBitmaps([]*ofp.Uint32{{Value: 16}})
hello.SetElements([]ofp.IHelloElem{elem})
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(hello)
logger.Debugw(ctx, "sayHello Called",
log.Fields{
"device-id": ofc.DeviceID,
"hello-message": js})
}
if err := ofc.SendMessage(ctx, hello); err != nil {
logger.Fatalw(ctx, "Failed saying hello to Openflow Server, unable to proceed",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
}
}
func (ofc *OFConnection) parseHeader(ctx context.Context, header ofp.IHeader, wg *sync.WaitGroup) {
defer wg.Done()
headerType := header.GetType()
logger.Debugw(ctx, "packet-header-type",
log.Fields{
"header-type": ofp.Type(headerType).String()})
switch headerType {
case ofp.OFPTHello:
//x := header.(*ofp.Hello)
case ofp.OFPTError:
ofc.handleErrMsg(ctx, header.(*ofp.ErrorMsg))
case ofp.OFPTEchoRequest:
ofc.handleEchoRequest(ctx, header.(*ofp.EchoRequest))
case ofp.OFPTEchoReply:
case ofp.OFPTExperimenter:
case ofp.OFPTFeaturesRequest:
if err := ofc.handleFeatureRequest(ctx, header.(*ofp.FeaturesRequest)); err != nil {
logger.Errorw(ctx, "handle-feature-request", log.Fields{"error": err})
}
case ofp.OFPTFeaturesReply:
case ofp.OFPTGetConfigRequest:
ofc.handleGetConfigRequest(ctx, header.(*ofp.GetConfigRequest))
case ofp.OFPTGetConfigReply:
case ofp.OFPTSetConfig:
ofc.handleSetConfig(ctx, header.(*ofp.SetConfig))
case ofp.OFPTPacketIn:
case ofp.OFPTFlowRemoved:
case ofp.OFPTPortStatus:
case ofp.OFPTPacketOut:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
ofc.sendRoleSlaveError(ctx, header)
return
}
ofc.handlePacketOut(ctx, header.(*ofp.PacketOut))
case ofp.OFPTFlowMod:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
ofc.sendRoleSlaveError(ctx, header)
return
}
switch header.(ofp.IFlowMod).GetCommand() {
case ofp.OFPFCAdd:
ofc.handleFlowAdd(ctx, header.(*ofp.FlowAdd))
case ofp.OFPFCModify:
ofc.handleFlowMod(ctx, header.(*ofp.FlowMod))
case ofp.OFPFCModifyStrict:
ofc.handleFlowModStrict(ctx, header.(*ofp.FlowModifyStrict))
case ofp.OFPFCDelete:
ofc.handleFlowDelete(ctx, header.(*ofp.FlowDelete))
case ofp.OFPFCDeleteStrict:
ofc.handleFlowDeleteStrict(ctx, header.(*ofp.FlowDeleteStrict))
}
case ofp.OFPTStatsRequest:
if err := ofc.handleStatsRequest(ctx, header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
logger.Errorw(ctx, "ofpt-stats-request", log.Fields{"error": err})
}
case ofp.OFPTBarrierRequest:
/* See note above at case ofp.OFPTFlowMod:*/
ofc.handleBarrierRequest(ctx, header.(*ofp.BarrierRequest))
case ofp.OFPTRoleRequest:
ofc.handleRoleRequest(ctx, header.(*ofp.RoleRequest))
case ofp.OFPTMeterMod:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
ofc.sendRoleSlaveError(ctx, header)
return
}
ofc.handleMeterModRequest(ctx, header.(*ofp.MeterMod))
case ofp.OFPTGroupMod:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
ofc.sendRoleSlaveError(ctx, header)
return
}
ofc.handleGroupMod(ctx, header.(ofp.IGroupMod))
}
}
// Message interface that represents an open flow message and enables for a
// unified implementation of SendMessage
type Message interface {
Serialize(encoder *goloxi.Encoder) error
}
func (ofc *OFConnection) doSend(ctx context.Context, msg Message) error {
if ofc.conn == nil {
return errors.New("no-connection")
}
enc := goloxi.NewEncoder()
if err := msg.Serialize(enc); err != nil {
return err
}
bytes := enc.Bytes()
if _, err := ofc.conn.Write(bytes); err != nil {
logger.Errorw(ctx, "unable-to-send-message-to-controller",
log.Fields{
"device-id": ofc.DeviceID,
"message": msg,
"error": err})
return err
}
return nil
}
func (ofc *OFConnection) messageSender(ctx context.Context) {
// first process last fail if it exists
if ofc.lastUnsentMessage != nil {
if err := ofc.doSend(ctx, ofc.lastUnsentMessage); err != nil {
ofc.events <- ofcEventDisconnect
return
}
ofc.lastUnsentMessage = nil
}
top:
for {
select {
case <-ctx.Done():
break top
case msg := <-ofc.sendChannel:
if err := ofc.doSend(ctx, msg); err != nil {
ofc.lastUnsentMessage = msg
ofc.events <- ofcEventDisconnect
logger.Debugw(ctx, "message-sender-error",
log.Fields{
"device-id": ofc.DeviceID,
"error": err.Error()})
break top
}
logger.Debugw(ctx, "message-sender-send",
log.Fields{
"device-id": ofc.DeviceID})
ofc.lastUnsentMessage = nil
}
}
logger.Debugw(ctx, "message-sender-finished",
log.Fields{
"device-id": ofc.DeviceID})
}
// SendMessage queues a message to be sent to the openflow controller
func (ofc *OFConnection) SendMessage(ctx context.Context, message Message) error {
// Removing this very chatty DEBUG log, it's the same as line 273 of client.go
//which has been extended to include the role.
//logger.Debugw(ctx, "queuing-message", log.Fields{
// "endpoint": ofc.OFControllerEndPoint,
// "role": ofc.role,
//})
ofc.sendChannel <- message
return nil
}