blob: 33d531d99e67ba39bbe2b5c1ef9dce1c31f01faa [file] [log] [blame]
/*
Copyright 2020 the original author or authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openflow
import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"github.com/donNewtonAlpha/goloxi"
ofp "github.com/donNewtonAlpha/goloxi/of13"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"github.com/opencord/voltha-protos/v2/go/voltha"
"io"
"net"
"time"
)
var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
type ofcEvent byte
type ofcState byte
const (
ofcEventStart = ofcEvent(iota)
ofcEventConnected
ofcEventDisconnected
ofcStateConnected = ofcState(iota)
ofcStateDisconnected
)
//Client structure to hold fields of Openflow Client
type OFClient struct {
OFControllerEndPoint string
Port uint16
DeviceID string
KeepRunning bool
VolthaClient voltha.VolthaServiceClient
PacketOutChannel chan *voltha.PacketOut
ConnectionMaxRetries int
ConnectionRetryDelay time.Duration
conn net.Conn
// expirimental
events chan ofcEvent
sendChannel chan Message
lastUnsentMessage Message
}
//NewClient contstructs a new Openflow Client and then starts up
func NewOFClient(config *OFClient) *OFClient {
ofc := OFClient{
DeviceID: config.DeviceID,
OFControllerEndPoint: config.OFControllerEndPoint,
VolthaClient: config.VolthaClient,
PacketOutChannel: config.PacketOutChannel,
KeepRunning: config.KeepRunning,
ConnectionMaxRetries: config.ConnectionMaxRetries,
ConnectionRetryDelay: config.ConnectionRetryDelay,
events: make(chan ofcEvent, 10),
sendChannel: make(chan Message, 100),
}
if ofc.ConnectionRetryDelay <= 0 {
logger.Warnw("connection retry delay not valid, setting to default",
log.Fields{
"device-id": ofc.DeviceID,
"value": ofc.ConnectionRetryDelay.String(),
"default": (3 * time.Second).String()})
ofc.ConnectionRetryDelay = 3 * time.Second
}
return &ofc
}
//End - set keepRunning to false so start loop exits
func (ofc *OFClient) Stop() {
ofc.KeepRunning = false
}
func (ofc *OFClient) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
header := ofp.Header{}
header.Version = uint8(buf[0])
header.Type = uint8(buf[1])
header.Length = binary.BigEndian.Uint16(buf[2:4])
header.Xid = binary.BigEndian.Uint32(buf[4:8])
// TODO: add minimal validation of version and type
return &header, nil
}
func (ofc *OFClient) establishConnectionToController() error {
if ofc.conn != nil {
ofc.conn.Close()
ofc.conn = nil
}
try := 1
for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
logger.Debugw("openflow-client unable to resolve endpoint",
log.Fields{
"device-id": ofc.DeviceID,
"endpoint": ofc.OFControllerEndPoint})
} else {
if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
ofc.conn = connection
ofc.sayHello()
ofc.events <- ofcEventConnected
return nil
} else {
logger.Warnw("openflow-client-connect-error",
log.Fields{
"device-id": ofc.DeviceID,
"endpoint": ofc.OFControllerEndPoint})
}
}
if ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
if ofc.ConnectionMaxRetries != 0 {
try += 1
}
time.Sleep(ofc.ConnectionRetryDelay)
}
}
return errors.New("failed-to-connect-to-of-controller")
}
func (ofc *OFClient) Run(ctx context.Context) {
var ofCtx context.Context
var ofDone func()
ofc.events <- ofcEventStart
state := ofcStateDisconnected
top:
for {
select {
case <-ctx.Done():
break top
case event := <-ofc.events:
switch event {
case ofcEventStart:
logger.Debugw("ofc-event-star",
log.Fields{"device-id": ofc.DeviceID})
go ofc.establishConnectionToController()
case ofcEventConnected:
if state == ofcStateDisconnected {
state = ofcStateConnected
logger.Debugw("ofc-event-connected",
log.Fields{"device-id": ofc.DeviceID})
ofCtx, ofDone = context.WithCancel(context.Background())
go ofc.messageSender(ofCtx)
go ofc.processOFStream(ofCtx)
}
case ofcEventDisconnected:
if state == ofcStateConnected {
state = ofcStateDisconnected
logger.Debugw("ofc-event-disconnected",
log.Fields{"device-id": ofc.DeviceID})
if ofDone != nil {
ofDone()
ofDone = nil
}
go ofc.establishConnectionToController()
}
}
}
}
if ofDone != nil {
ofDone()
ofDone = nil
}
}
// Run run loop for the openflow client
func (ofc *OFClient) processOFStream(ctx context.Context) {
buf := make([]byte, 1500)
var need, have int
/*
* EXPLANATION
*
* The below loops reuses a byte array to read messages from the TCP
* connection to the OF controller. It reads messages into a large
* buffer in an attempt to optimize the read performance from the
* TCP connection. This means that on any given read there may be more
* than a single message in the byte array read.
*
* As the minimal size for an OF message is 8 bytes (because that is
* the size of the basic header) we know that if we have not read
* 8 bytes we need to read more before we can process a message.
*
* Once the mninium header is read, the complete length of the
* message is retrieved from the header and bytes are repeatedly read
* until we know the byte array contains at least one message.
*
* Once it is known that the buffer has at least a single message
* a slice (msg) is moved through the read bytes looking to process
* each message util the length of read data is < the length required
* i.e., the minimum size or the size of the next message.
*
* When no more message can be proessed from the byte array any unused
* bytes are moved to the front of the source array and more data is
* read from the TCP connection.
*/
/*
* First thing we are looking for is an openflow header, so we need at
* least 8 bytes
*/
need = 8
top:
// Continue until we are told to stop
for ofc.KeepRunning {
logger.Debugw("before-read-from-controller",
log.Fields{
"device-id": ofc.DeviceID,
"have": have,
"need": need,
"buf-length": len(buf[have:])})
read, err := ofc.conn.Read(buf[have:])
have += read
logger.Debugw("read-from-controller",
log.Fields{
"device-id": ofc.DeviceID,
"byte-count": read,
"error": err})
/*
* If we have less than we need and there is no
* error, then continue to attempt to read more data
*/
if have < need && err == nil {
// No bytes available, just continue
logger.Debugw("continue-to-read",
log.Fields{
"device-id": ofc.DeviceID,
"have": have,
"need": need,
"error": err})
continue
}
/*
* Single out EOF here, because if we have bytes
* but have an EOF we still want to process the
* the last meesage. A read of 0 bytes and EOF is
* a terminated connection.
*/
if err != nil && (err != io.EOF || read == 0) {
logger.Errorw("voltha-connection-dead",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
break
}
/*
* We should have at least 1 message at this point so
* create a slice (msg) that points to the start of the
* buffer
*/
msg := buf[0:]
for need <= have {
logger.Debugw("process-of-message-stream",
log.Fields{
"device-id": ofc.DeviceID,
"have": have,
"need": need})
/*
* If we get here, we have at least the 8 bytes of the
* header, if not enough for the complete message. So
* take a peek at the OF header to do simple validation
* and be able to get the full expected length of the
* packet
*/
peek, err := ofc.peekAtOFHeader(msg)
if err != nil {
/*
* Header is bad, assume stream is corrupted
* and needs to be restarted
*/
logger.Errorw("bad-of-packet",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
break top
}
/*
* If we don't have the full packet, then back around
* the outer loop to get more bytes
*/
need = int(peek.GetLength())
logger.Debugw("processed-header-need-message",
log.Fields{
"device-id": ofc.DeviceID,
"have": have,
"need": need})
if have < need {
logger.Debugw("end-processing:continue-to-read",
log.Fields{
"device-id": ofc.DeviceID,
"have": have,
"need": need})
break
}
// Decode and process the packet
decoder := goloxi.NewDecoder(msg)
header, err := ofp.DecodeHeader(decoder)
if err != nil {
js, _ := json.Marshal(decoder)
logger.Errorw("failed-to-decode",
log.Fields{
"device-id": ofc.DeviceID,
"decoder": js,
"error": err})
break top
}
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(header)
logger.Debugw("packet-header",
log.Fields{
"device-id": ofc.DeviceID,
"header": js})
}
ofc.parseHeader(header)
/*
* Move the msg slice to the start of the next
* message, which is the current message plus the
* used bytes (need)
*/
msg = msg[need:]
have -= need
// Finished process method, need header again
need = 8
logger.Debugw("message-process-complete",
log.Fields{
"device-id": ofc.DeviceID,
"have": have,
"need": need,
"read-length": len(buf[have:])})
}
/*
* If we have any left over bytes move them to the front
* of the byte array to be appended to bny the next read
*/
if have > 0 {
copy(buf, msg)
}
}
ofc.events <- ofcEventDisconnected
}
func (ofc *OFClient) sayHello() {
hello := ofp.NewHello()
hello.Xid = uint32(GetXid())
elem := ofp.NewHelloElemVersionbitmap()
elem.SetType(ofp.OFPHETVersionbitmap)
elem.SetLength(8)
elem.SetBitmaps([]*ofp.Uint32{&ofp.Uint32{Value: 16}})
hello.SetElements([]ofp.IHelloElem{elem})
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(hello)
logger.Debugw("sayHello Called",
log.Fields{
"device-id": ofc.DeviceID,
"hello-message": js})
}
if err := ofc.SendMessage(hello); err != nil {
logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
}
}
func (ofc *OFClient) parseHeader(header ofp.IHeader) {
switch header.GetType() {
case ofp.OFPTHello:
//x := header.(*ofp.Hello)
case ofp.OFPTError:
go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
case ofp.OFPTEchoRequest:
go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
case ofp.OFPTEchoReply:
case ofp.OFPTExperimenter:
case ofp.OFPTFeaturesRequest:
go ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest))
case ofp.OFPTFeaturesReply:
case ofp.OFPTGetConfigRequest:
go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
case ofp.OFPTGetConfigReply:
case ofp.OFPTSetConfig:
go ofc.handleSetConfig(header.(*ofp.SetConfig))
case ofp.OFPTPacketIn:
case ofp.OFPTFlowRemoved:
case ofp.OFPTPortStatus:
case ofp.OFPTPacketOut:
go ofc.handlePacketOut(header.(*ofp.PacketOut))
case ofp.OFPTFlowMod:
/*
* Not using go routine to handle flow* messages or barrier requests
* onos typically issues barrier requests just before a flow* message.
* by handling in this thread I ensure all flow* are handled when barrier
* request is issued.
*/
switch header.(ofp.IFlowMod).GetCommand() {
case ofp.OFPFCAdd:
ofc.handleFlowAdd(header.(*ofp.FlowAdd))
case ofp.OFPFCModify:
ofc.handleFlowMod(header.(*ofp.FlowMod))
case ofp.OFPFCModifyStrict:
ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
case ofp.OFPFCDelete:
ofc.handleFlowDelete(header.(*ofp.FlowDelete))
case ofp.OFPFCDeleteStrict:
ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
}
case ofp.OFPTStatsRequest:
go ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType())
case ofp.OFPTBarrierRequest:
/* See note above at case ofp.OFPTFlowMod:*/
ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
case ofp.OFPTRoleRequest:
go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
case ofp.OFPTMeterMod:
go ofc.handleMeterModRequest(header.(*ofp.MeterMod))
}
}
//Message created to allow for a single SendMessage
type Message interface {
Serialize(encoder *goloxi.Encoder) error
}
func (ofc *OFClient) doSend(msg Message) error {
if ofc.conn == nil {
return errors.New("no-connection")
}
enc := goloxi.NewEncoder()
msg.Serialize(enc)
bytes := enc.Bytes()
if _, err := ofc.conn.Write(bytes); err != nil {
logger.Warnw("unable-to-send-message-to-controller",
log.Fields{
"device-id": ofc.DeviceID,
"message": msg,
"error": err})
return err
}
return nil
}
func (ofc *OFClient) messageSender(ctx context.Context) {
// first process last fail if it exists
if ofc.lastUnsentMessage != nil {
if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
ofc.events <- ofcEventDisconnected
return
}
ofc.lastUnsentMessage = nil
}
top:
for {
select {
case <-ctx.Done():
break top
case msg := <-ofc.sendChannel:
if ofc.doSend(msg) != nil {
ofc.lastUnsentMessage = msg
ofc.events <- ofcEventDisconnected
return
}
ofc.lastUnsentMessage = nil
}
}
}
func (ofc *OFClient) SendMessage(message Message) error {
ofc.sendChannel <- message
return nil
}
//SendMessage sends message to openflow server
func (ofc *OFClient) SendMessageOrig(message Message) error {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(message)
logger.Debugw("SendMessage called",
log.Fields{
"device-id": ofc.DeviceID,
"message": js})
}
enc := goloxi.NewEncoder()
message.Serialize(enc)
for {
if ofc.conn == nil {
logger.Warnln("SendMessage Connection is Nil sleeping for 10 milliseconds")
time.Sleep(10 * time.Millisecond)
} else {
break
}
}
bytes := enc.Bytes()
if _, err := ofc.conn.Write(bytes); err != nil {
jMessage, _ := json.Marshal(message)
logger.Errorw("SendMessage failed sending message",
log.Fields{
"device-id": ofc.DeviceID,
"error": err,
"message": jMessage})
return err
}
return nil
}