VOL-2487 - correct meter state size calculation
Also:
- modified OF read loop to use bufio as opposed to custome buffered io
- cleaned up the OF client state machine processing, including more
logging
Change-Id: If16f57f6f30a6fe6fecab85b5bf638402f2b7693
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
index 8f68ef0..9baed59 100644
--- a/internal/pkg/ofagent/refresh.go
+++ b/internal/pkg/ofagent/refresh.go
@@ -67,8 +67,7 @@
}
logger.Debugw("GrpcClient refreshDeviceList", log.Fields{"ToAdd": toAdd, "ToDel": toDel})
for i := 0; i < len(toAdd); i++ {
- var client = ofa.addOFClient(toAdd[i])
- go client.Run(context.Background())
+ ofa.addOFClient(toAdd[i]) // client is started in addOFClient
}
for i := 0; i < len(toDel); i++ {
ofa.clientMap[toDel[i]].Stop()
@@ -90,7 +89,6 @@
PacketOutChannel: ofa.packetOutChannel,
ConnectionMaxRetries: ofa.ConnectionMaxRetries,
ConnectionRetryDelay: ofa.ConnectionRetryDelay,
- KeepRunning: true,
})
go ofc.Run(context.Background())
ofa.clientMap[deviceID] = ofc
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
index 33d531d..19182ca 100644
--- a/internal/pkg/openflow/client.go
+++ b/internal/pkg/openflow/client.go
@@ -17,6 +17,7 @@
package openflow
import (
+ "bufio"
"context"
"encoding/binary"
"encoding/json"
@@ -37,19 +38,55 @@
const (
ofcEventStart = ofcEvent(iota)
- ofcEventConnected
- ofcEventDisconnected
+ ofcEventConnect
+ ofcEventDisconnect
+ ofcEventStop
- ofcStateConnected = ofcState(iota)
+ ofcStateCreated = ofcState(iota)
+ ofcStateStarted
+ ofcStateConnected
ofcStateDisconnected
+ ofcStateStopped
)
-//Client structure to hold fields of Openflow Client
+func (e ofcEvent) String() string {
+ switch e {
+ case ofcEventStart:
+ return "ofc-event-start"
+ case ofcEventConnect:
+ return "ofc-event-connected"
+ case ofcEventDisconnect:
+ return "ofc-event-disconnected"
+ case ofcEventStop:
+ return "ofc-event-stop"
+ default:
+ return "ofc-event-unknown"
+ }
+}
+
+func (s ofcState) String() string {
+ switch s {
+ case ofcStateCreated:
+ return "ofc-state-created"
+ case ofcStateStarted:
+ return "ofc-state-started"
+ case ofcStateConnected:
+ return "ofc-state-connected"
+ case ofcStateDisconnected:
+ return "ofc-state-disconnected"
+ case ofcStateStopped:
+ return "ofc-state-stopped"
+ default:
+ return "ofc-state-unknown"
+ }
+}
+
+// OFClient the configuration and operational state of a connection to an
+// openflow controller
type OFClient struct {
OFControllerEndPoint string
Port uint16
DeviceID string
- KeepRunning bool
VolthaClient voltha.VolthaServiceClient
PacketOutChannel chan *voltha.PacketOut
ConnectionMaxRetries int
@@ -62,7 +99,8 @@
lastUnsentMessage Message
}
-//NewClient contstructs a new Openflow Client and then starts up
+// NewClient returns an initialized OFClient instance based on the configuration
+// specified
func NewOFClient(config *OFClient) *OFClient {
ofc := OFClient{
@@ -70,7 +108,6 @@
OFControllerEndPoint: config.OFControllerEndPoint,
VolthaClient: config.VolthaClient,
PacketOutChannel: config.PacketOutChannel,
- KeepRunning: config.KeepRunning,
ConnectionMaxRetries: config.ConnectionMaxRetries,
ConnectionRetryDelay: config.ConnectionRetryDelay,
events: make(chan ofcEvent, 10),
@@ -88,9 +125,9 @@
return &ofc
}
-//End - set keepRunning to false so start loop exits
+// Stop initiates a shutdown of the OFClient
func (ofc *OFClient) Stop() {
- ofc.KeepRunning = false
+ ofc.events <- ofcEventStop
}
func (ofc *OFClient) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
@@ -107,6 +144,8 @@
func (ofc *OFClient) establishConnectionToController() error {
if ofc.conn != nil {
+ logger.Debugw("closing-of-connection-to-reconnect",
+ log.Fields{"device-id": ofc.DeviceID})
ofc.conn.Close()
ofc.conn = nil
}
@@ -121,7 +160,7 @@
if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
ofc.conn = connection
ofc.sayHello()
- ofc.events <- ofcEventConnected
+ ofc.events <- ofcEventConnect
return nil
} else {
logger.Warnw("openflow-client-connect-error",
@@ -140,157 +179,147 @@
return errors.New("failed-to-connect-to-of-controller")
}
+// Run implementes the state machine for the OF client reacting to state change
+// events and invoking actions as a reaction to those state changes
func (ofc *OFClient) Run(ctx context.Context) {
var ofCtx context.Context
var ofDone func()
+ state := ofcStateCreated
ofc.events <- ofcEventStart
- state := ofcStateDisconnected
top:
for {
select {
case <-ctx.Done():
+ state = ofcStateStopped
+ logger.Debugw("state-transition-context-done",
+ log.Fields{"device-id": ofc.DeviceID})
break top
case event := <-ofc.events:
+ previous := state
switch event {
case ofcEventStart:
- logger.Debugw("ofc-event-star",
+ logger.Debugw("ofc-event-start",
log.Fields{"device-id": ofc.DeviceID})
- go ofc.establishConnectionToController()
- case ofcEventConnected:
- if state == ofcStateDisconnected {
+ if state == ofcStateCreated {
+ state = ofcStateStarted
+ logger.Debug("STARTED MORE THAN ONCE")
+ go ofc.establishConnectionToController()
+ } else {
+ logger.Errorw("illegal-state-transition",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "current-state": state.String(),
+ "event": event.String()})
+ }
+ case ofcEventConnect:
+ logger.Debugw("ofc-event-connected",
+ log.Fields{"device-id": ofc.DeviceID})
+ if state == ofcStateStarted || state == ofcStateDisconnected {
state = ofcStateConnected
- logger.Debugw("ofc-event-connected",
- log.Fields{"device-id": ofc.DeviceID})
ofCtx, ofDone = context.WithCancel(context.Background())
go ofc.messageSender(ofCtx)
go ofc.processOFStream(ofCtx)
+ } else {
+ logger.Errorw("illegal-state-transition",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "current-state": state.String(),
+ "event": event.String()})
}
- case ofcEventDisconnected:
+ case ofcEventDisconnect:
+ logger.Debugw("ofc-event-disconnected",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "state": state.String()})
if state == ofcStateConnected {
state = ofcStateDisconnected
- logger.Debugw("ofc-event-disconnected",
- log.Fields{"device-id": ofc.DeviceID})
if ofDone != nil {
ofDone()
ofDone = nil
}
go ofc.establishConnectionToController()
+ } else {
+ logger.Errorw("illegal-state-transition",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "current-state": state.String(),
+ "event": event.String()})
+ }
+ case ofcEventStop:
+ logger.Debugw("ofc-event-stop",
+ log.Fields{"device-id": ofc.DeviceID})
+ if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
+ state = ofcStateStopped
+ break top
+ } else {
+ logger.Errorw("illegal-state-transition",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "current-state": state.String(),
+ "event": event.String()})
}
}
+ logger.Debugw("state-transition",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "previous-state": previous.String(),
+ "current-state": state.String(),
+ "event": event.String()})
}
}
+ // If the child context exists, then cancel it
if ofDone != nil {
+ log.Debugw("closing-child-processes",
+ log.Fields{"device-id": ofc.DeviceID})
ofDone()
ofDone = nil
}
+ // If the connection is open, then close it
+ if ofc.conn != nil {
+ log.Debugw("closing-of-connection",
+ log.Fields{"device-id": ofc.DeviceID})
+ ofc.conn.Close()
+ ofc.conn = nil
+ }
+ log.Debugw("state-machine-finished",
+ log.Fields{"device-id": ofc.DeviceID})
}
-// Run run loop for the openflow client
+// processOFStream processes the OF connection from the controller and invokes
+// the appropriate handler methods for each message.
func (ofc *OFClient) processOFStream(ctx context.Context) {
- buf := make([]byte, 1500)
- var need, have int
- /*
- * EXPLANATION
- *
- * The below loops reuses a byte array to read messages from the TCP
- * connection to the OF controller. It reads messages into a large
- * buffer in an attempt to optimize the read performance from the
- * TCP connection. This means that on any given read there may be more
- * than a single message in the byte array read.
- *
- * As the minimal size for an OF message is 8 bytes (because that is
- * the size of the basic header) we know that if we have not read
- * 8 bytes we need to read more before we can process a message.
- *
- * Once the mninium header is read, the complete length of the
- * message is retrieved from the header and bytes are repeatedly read
- * until we know the byte array contains at least one message.
- *
- * Once it is known that the buffer has at least a single message
- * a slice (msg) is moved through the read bytes looking to process
- * each message util the length of read data is < the length required
- * i.e., the minimum size or the size of the next message.
- *
- * When no more message can be proessed from the byte array any unused
- * bytes are moved to the front of the source array and more data is
- * read from the TCP connection.
- */
+ fromController := bufio.NewReader(ofc.conn)
/*
- * First thing we are looking for is an openflow header, so we need at
- * least 8 bytes
+ * We have a read buffer of a max size of 4096, so if we ever have
+ * a message larger than this then we will have issues
*/
- need = 8
+ buf := make([]byte, 4096)
top:
// Continue until we are told to stop
- for ofc.KeepRunning {
- logger.Debugw("before-read-from-controller",
- log.Fields{
- "device-id": ofc.DeviceID,
- "have": have,
- "need": need,
- "buf-length": len(buf[have:])})
- read, err := ofc.conn.Read(buf[have:])
- have += read
- logger.Debugw("read-from-controller",
- log.Fields{
- "device-id": ofc.DeviceID,
- "byte-count": read,
- "error": err})
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Error("of-loop-ending-context-done")
+ break top
+ default:
+ // Read 8 bytes, the standard OF header
+ read, err := io.ReadFull(fromController, buf[:8])
+ if err != nil {
+ logger.Errorw("bad-of-header",
+ log.Fields{
+ "byte-count": read,
+ "device-id": ofc.DeviceID,
+ "error": err})
+ break top
+ }
- /*
- * If we have less than we need and there is no
- * error, then continue to attempt to read more data
- */
- if have < need && err == nil {
- // No bytes available, just continue
- logger.Debugw("continue-to-read",
- log.Fields{
- "device-id": ofc.DeviceID,
- "have": have,
- "need": need,
- "error": err})
- continue
- }
-
- /*
- * Single out EOF here, because if we have bytes
- * but have an EOF we still want to process the
- * the last meesage. A read of 0 bytes and EOF is
- * a terminated connection.
- */
- if err != nil && (err != io.EOF || read == 0) {
- logger.Errorw("voltha-connection-dead",
- log.Fields{
- "device-id": ofc.DeviceID,
- "error": err})
- break
- }
-
- /*
- * We should have at least 1 message at this point so
- * create a slice (msg) that points to the start of the
- * buffer
- */
- msg := buf[0:]
- for need <= have {
- logger.Debugw("process-of-message-stream",
- log.Fields{
- "device-id": ofc.DeviceID,
- "have": have,
- "need": need})
- /*
- * If we get here, we have at least the 8 bytes of the
- * header, if not enough for the complete message. So
- * take a peek at the OF header to do simple validation
- * and be able to get the full expected length of the
- * packet
- */
- peek, err := ofc.peekAtOFHeader(msg)
+ // Decode the header
+ peek, err := ofc.peekAtOFHeader(buf[:8])
if err != nil {
/*
* Header is bad, assume stream is corrupted
@@ -303,29 +332,20 @@
break top
}
- /*
- * If we don't have the full packet, then back around
- * the outer loop to get more bytes
- */
- need = int(peek.GetLength())
-
- logger.Debugw("processed-header-need-message",
- log.Fields{
- "device-id": ofc.DeviceID,
- "have": have,
- "need": need})
-
- if have < need {
- logger.Debugw("end-processing:continue-to-read",
+ // Calculate the size of the rest of the packet and read it
+ need := int(peek.GetLength())
+ read, err = io.ReadFull(fromController, buf[8:need])
+ if err != nil {
+ logger.Errorw("bad-of-packet",
log.Fields{
- "device-id": ofc.DeviceID,
- "have": have,
- "need": need})
- break
+ "byte-count": read,
+ "device-id": ofc.DeviceID,
+ "error": err})
+ break top
}
// Decode and process the packet
- decoder := goloxi.NewDecoder(msg)
+ decoder := goloxi.NewDecoder(buf[:need])
header, err := ofp.DecodeHeader(decoder)
if err != nil {
js, _ := json.Marshal(decoder)
@@ -344,34 +364,11 @@
"header": js})
}
ofc.parseHeader(header)
-
- /*
- * Move the msg slice to the start of the next
- * message, which is the current message plus the
- * used bytes (need)
- */
- msg = msg[need:]
- have -= need
-
- // Finished process method, need header again
- need = 8
-
- logger.Debugw("message-process-complete",
- log.Fields{
- "device-id": ofc.DeviceID,
- "have": have,
- "need": need,
- "read-length": len(buf[have:])})
- }
- /*
- * If we have any left over bytes move them to the front
- * of the byte array to be appended to bny the next read
- */
- if have > 0 {
- copy(buf, msg)
}
}
- ofc.events <- ofcEventDisconnected
+ logger.Debugw("end-of-stream",
+ log.Fields{"device-id": ofc.DeviceID})
+ ofc.events <- ofcEventDisconnect
}
func (ofc *OFClient) sayHello() {
@@ -451,7 +448,8 @@
}
}
-//Message created to allow for a single SendMessage
+// Message interface that represents an open flow message and enables for a
+// unified implementation of SendMessage
type Message interface {
Serialize(encoder *goloxi.Encoder) error
}
@@ -464,7 +462,7 @@
msg.Serialize(enc)
bytes := enc.Bytes()
if _, err := ofc.conn.Write(bytes); err != nil {
- logger.Warnw("unable-to-send-message-to-controller",
+ logger.Errorw("unable-to-send-message-to-controller",
log.Fields{
"device-id": ofc.DeviceID,
"message": msg,
@@ -475,11 +473,10 @@
}
func (ofc *OFClient) messageSender(ctx context.Context) {
-
// first process last fail if it exists
if ofc.lastUnsentMessage != nil {
if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
- ofc.events <- ofcEventDisconnected
+ ofc.events <- ofcEventDisconnect
return
}
ofc.lastUnsentMessage = nil
@@ -490,9 +487,9 @@
case <-ctx.Done():
break top
case msg := <-ofc.sendChannel:
- if ofc.doSend(msg) != nil {
+ if err := ofc.doSend(msg); err != nil {
ofc.lastUnsentMessage = msg
- ofc.events <- ofcEventDisconnected
+ ofc.events <- ofcEventDisconnect
return
}
ofc.lastUnsentMessage = nil
@@ -500,39 +497,8 @@
}
}
+// SendMessage queues a message to be sent to the openflow controller
func (ofc *OFClient) SendMessage(message Message) error {
ofc.sendChannel <- message
return nil
}
-
-//SendMessage sends message to openflow server
-func (ofc *OFClient) SendMessageOrig(message Message) error {
- if logger.V(log.DebugLevel) {
- js, _ := json.Marshal(message)
- logger.Debugw("SendMessage called",
- log.Fields{
- "device-id": ofc.DeviceID,
- "message": js})
- }
- enc := goloxi.NewEncoder()
- message.Serialize(enc)
- for {
- if ofc.conn == nil {
- logger.Warnln("SendMessage Connection is Nil sleeping for 10 milliseconds")
- time.Sleep(10 * time.Millisecond)
- } else {
- break
- }
- }
- bytes := enc.Bytes()
- if _, err := ofc.conn.Write(bytes); err != nil {
- jMessage, _ := json.Marshal(message)
- logger.Errorw("SendMessage failed sending message",
- log.Fields{
- "device-id": ofc.DeviceID,
- "error": err,
- "message": jMessage})
- return err
- }
- return nil
-}
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
index 68d2625..17db448 100644
--- a/internal/pkg/openflow/stats.go
+++ b/internal/pkg/openflow/stats.go
@@ -48,7 +48,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-desc",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -65,7 +65,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-flow",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -82,7 +82,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-aggregate",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -95,7 +95,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-table",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -114,7 +114,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-port",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -130,7 +130,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-queue",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -146,7 +146,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-group",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -162,7 +162,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-group-desc",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -179,7 +179,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-group-features",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -195,7 +195,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-meter",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -211,7 +211,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-meter-config",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -227,7 +227,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-meter-features",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -243,7 +243,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-table-features",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -259,7 +259,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-port-desc",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -276,7 +276,7 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request",
+ logger.Debugw("handle-stats-request-experimenter",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -465,9 +465,10 @@
if err != nil {
return nil, err
}
- size := uint16(40)
+ size := uint16(5) // size of stats header
var meterStats []*ofp.MeterStats
for _, item := range resp.Items {
+ entrySize := uint16(40) // size of entry header
meterStat := ofp.NewMeterStats()
stats := item.Stats
meterStat.DurationNsec = stats.DurationNsec
@@ -481,13 +482,15 @@
bandStat.ByteBandCount = bStat.ByteBandCount
bandStat.PacketBandCount = bStat.PacketBandCount
bandStats = append(bandStats, bandStat)
- size += 16
+ entrySize += uint16(16) // size of each band stat
}
meterStat.SetBandStats(bandStats)
- meterStat.Len = size
+ meterStat.Len = entrySize
meterStats = append(meterStats, meterStat)
+ size += entrySize
}
response.SetEntries(meterStats)
+ response.SetLength(size)
return response, nil
}