VOL-2487 - correct meter state size calculation

Also:
- modified OF read loop to use bufio as opposed to custom buffered I/O
- cleaned up the OF client state machine processing, including more
  logging

Change-Id: If16f57f6f30a6fe6fecab85b5bf638402f2b7693
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
index 33d531d..19182ca 100644
--- a/internal/pkg/openflow/client.go
+++ b/internal/pkg/openflow/client.go
@@ -17,6 +17,7 @@
 package openflow
 
 import (
+	"bufio"
 	"context"
 	"encoding/binary"
 	"encoding/json"
@@ -37,19 +38,55 @@
 
 const (
 	ofcEventStart = ofcEvent(iota)
-	ofcEventConnected
-	ofcEventDisconnected
+	ofcEventConnect
+	ofcEventDisconnect
+	ofcEventStop
 
-	ofcStateConnected = ofcState(iota)
+	ofcStateCreated = ofcState(iota)
+	ofcStateStarted
+	ofcStateConnected
 	ofcStateDisconnected
+	ofcStateStopped
 )
 
-//Client structure to hold fields of Openflow Client
+func (e ofcEvent) String() string {
+	switch e {
+	case ofcEventStart:
+		return "ofc-event-start"
+	case ofcEventConnect:
+		return "ofc-event-connected"
+	case ofcEventDisconnect:
+		return "ofc-event-disconnected"
+	case ofcEventStop:
+		return "ofc-event-stop"
+	default:
+		return "ofc-event-unknown"
+	}
+}
+
+func (s ofcState) String() string {
+	switch s {
+	case ofcStateCreated:
+		return "ofc-state-created"
+	case ofcStateStarted:
+		return "ofc-state-started"
+	case ofcStateConnected:
+		return "ofc-state-connected"
+	case ofcStateDisconnected:
+		return "ofc-state-disconnected"
+	case ofcStateStopped:
+		return "ofc-state-stopped"
+	default:
+		return "ofc-state-unknown"
+	}
+}
+
+// OFClient holds the configuration and operational state of a connection to
+// an openflow controller
 type OFClient struct {
 	OFControllerEndPoint string
 	Port                 uint16
 	DeviceID             string
-	KeepRunning          bool
 	VolthaClient         voltha.VolthaServiceClient
 	PacketOutChannel     chan *voltha.PacketOut
 	ConnectionMaxRetries int
@@ -62,7 +99,8 @@
 	lastUnsentMessage Message
 }
 
-//NewClient  contstructs a new Openflow Client and then starts up
+// NewOFClient returns an initialized OFClient instance based on the
+// specified configuration
 func NewOFClient(config *OFClient) *OFClient {
 
 	ofc := OFClient{
@@ -70,7 +108,6 @@
 		OFControllerEndPoint: config.OFControllerEndPoint,
 		VolthaClient:         config.VolthaClient,
 		PacketOutChannel:     config.PacketOutChannel,
-		KeepRunning:          config.KeepRunning,
 		ConnectionMaxRetries: config.ConnectionMaxRetries,
 		ConnectionRetryDelay: config.ConnectionRetryDelay,
 		events:               make(chan ofcEvent, 10),
@@ -88,9 +125,9 @@
 	return &ofc
 }
 
-//End - set keepRunning to false so start loop exits
+// Stop initiates a shutdown of the OFClient
 func (ofc *OFClient) Stop() {
-	ofc.KeepRunning = false
+	ofc.events <- ofcEventStop
 }
 
 func (ofc *OFClient) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
@@ -107,6 +144,8 @@
 
 func (ofc *OFClient) establishConnectionToController() error {
 	if ofc.conn != nil {
+		logger.Debugw("closing-of-connection-to-reconnect",
+			log.Fields{"device-id": ofc.DeviceID})
 		ofc.conn.Close()
 		ofc.conn = nil
 	}
@@ -121,7 +160,7 @@
 			if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
 				ofc.conn = connection
 				ofc.sayHello()
-				ofc.events <- ofcEventConnected
+				ofc.events <- ofcEventConnect
 				return nil
 			} else {
 				logger.Warnw("openflow-client-connect-error",
@@ -140,157 +179,147 @@
 	return errors.New("failed-to-connect-to-of-controller")
 }
 
+// Run implements the state machine for the OF client, reacting to state change
+// events and invoking actions as a reaction to those state changes
 func (ofc *OFClient) Run(ctx context.Context) {
 
 	var ofCtx context.Context
 	var ofDone func()
+	state := ofcStateCreated
 	ofc.events <- ofcEventStart
-	state := ofcStateDisconnected
 top:
 	for {
 		select {
 		case <-ctx.Done():
+			state = ofcStateStopped
+			logger.Debugw("state-transition-context-done",
+				log.Fields{"device-id": ofc.DeviceID})
 			break top
 		case event := <-ofc.events:
+			previous := state
 			switch event {
 			case ofcEventStart:
-				logger.Debugw("ofc-event-star",
+				logger.Debugw("ofc-event-start",
 					log.Fields{"device-id": ofc.DeviceID})
-				go ofc.establishConnectionToController()
-			case ofcEventConnected:
-				if state == ofcStateDisconnected {
+				if state == ofcStateCreated {
+					state = ofcStateStarted
+					logger.Debug("STARTED MORE THAN ONCE")
+					go ofc.establishConnectionToController()
+				} else {
+					logger.Errorw("illegal-state-transition",
+						log.Fields{
+							"device-id":     ofc.DeviceID,
+							"current-state": state.String(),
+							"event":         event.String()})
+				}
+			case ofcEventConnect:
+				logger.Debugw("ofc-event-connected",
+					log.Fields{"device-id": ofc.DeviceID})
+				if state == ofcStateStarted || state == ofcStateDisconnected {
 					state = ofcStateConnected
-					logger.Debugw("ofc-event-connected",
-						log.Fields{"device-id": ofc.DeviceID})
 					ofCtx, ofDone = context.WithCancel(context.Background())
 					go ofc.messageSender(ofCtx)
 					go ofc.processOFStream(ofCtx)
+				} else {
+					logger.Errorw("illegal-state-transition",
+						log.Fields{
+							"device-id":     ofc.DeviceID,
+							"current-state": state.String(),
+							"event":         event.String()})
 				}
-			case ofcEventDisconnected:
+			case ofcEventDisconnect:
+				logger.Debugw("ofc-event-disconnected",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"state":     state.String()})
 				if state == ofcStateConnected {
 					state = ofcStateDisconnected
-					logger.Debugw("ofc-event-disconnected",
-						log.Fields{"device-id": ofc.DeviceID})
 					if ofDone != nil {
 						ofDone()
 						ofDone = nil
 					}
 					go ofc.establishConnectionToController()
+				} else {
+					logger.Errorw("illegal-state-transition",
+						log.Fields{
+							"device-id":     ofc.DeviceID,
+							"current-state": state.String(),
+							"event":         event.String()})
+				}
+			case ofcEventStop:
+				logger.Debugw("ofc-event-stop",
+					log.Fields{"device-id": ofc.DeviceID})
+				if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
+					state = ofcStateStopped
+					break top
+				} else {
+					logger.Errorw("illegal-state-transition",
+						log.Fields{
+							"device-id":     ofc.DeviceID,
+							"current-state": state.String(),
+							"event":         event.String()})
 				}
 			}
+			logger.Debugw("state-transition",
+				log.Fields{
+					"device-id":      ofc.DeviceID,
+					"previous-state": previous.String(),
+					"current-state":  state.String(),
+					"event":          event.String()})
 		}
 	}
 
+	// If the child context exists, then cancel it
 	if ofDone != nil {
+		log.Debugw("closing-child-processes",
+			log.Fields{"device-id": ofc.DeviceID})
 		ofDone()
 		ofDone = nil
 	}
 
+	// If the connection is open, then close it
+	if ofc.conn != nil {
+		log.Debugw("closing-of-connection",
+			log.Fields{"device-id": ofc.DeviceID})
+		ofc.conn.Close()
+		ofc.conn = nil
+	}
+	log.Debugw("state-machine-finished",
+		log.Fields{"device-id": ofc.DeviceID})
 }
 
-// Run run loop for the openflow client
+// processOFStream processes the OF connection from the controller and invokes
+// the appropriate handler methods for each message.
 func (ofc *OFClient) processOFStream(ctx context.Context) {
-	buf := make([]byte, 1500)
-	var need, have int
-	/*
-	 * EXPLANATION
-	 *
-	 * The below loops reuses a byte array to read messages from the TCP
-	 * connection to the OF controller. It reads messages into a large
-	 * buffer in an attempt to optimize the read performance from the
-	 * TCP connection. This means that on any given read there may be more
-	 * than a single message in the byte array read.
-	 *
-	 * As the minimal size for an OF message is 8 bytes (because that is
-	 * the size of the basic header) we know that if we have not read
-	 * 8 bytes we need to read more before we can process a message.
-	 *
-	 * Once the mninium header is read, the complete length of the
-	 * message is retrieved from the header and bytes are repeatedly read
-	 * until we know the byte array contains at least one message.
-	 *
-	 * Once it is known that the buffer has at least a single message
-	 * a slice (msg) is moved through the read bytes looking to process
-	 * each message util the length of read data is < the length required
-	 * i.e., the minimum size or the size of the next message.
-	 *
-	 * When no more message can be proessed from the byte array any unused
-	 * bytes are moved to the front of the source array and more data is
-	 * read from the TCP connection.
-	 */
+	fromController := bufio.NewReader(ofc.conn)
 
 	/*
-	 * First thing we are looking for is an openflow header, so we need at
-	 * least 8 bytes
+	 * We have a read buffer of a max size of 4096, so if we ever have
+	 * a message larger than this then we will have issues
 	 */
-	need = 8
+	buf := make([]byte, 4096)
 
 top:
 	// Continue until we are told to stop
-	for ofc.KeepRunning {
-		logger.Debugw("before-read-from-controller",
-			log.Fields{
-				"device-id":  ofc.DeviceID,
-				"have":       have,
-				"need":       need,
-				"buf-length": len(buf[have:])})
-		read, err := ofc.conn.Read(buf[have:])
-		have += read
-		logger.Debugw("read-from-controller",
-			log.Fields{
-				"device-id":  ofc.DeviceID,
-				"byte-count": read,
-				"error":      err})
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Error("of-loop-ending-context-done")
+			break top
+		default:
+			// Read 8 bytes, the standard OF header
+			read, err := io.ReadFull(fromController, buf[:8])
+			if err != nil {
+				logger.Errorw("bad-of-header",
+					log.Fields{
+						"byte-count": read,
+						"device-id":  ofc.DeviceID,
+						"error":      err})
+				break top
+			}
 
-		/*
-		 * If we have less than we need and there is no
-		 * error, then continue to attempt to read more data
-		 */
-		if have < need && err == nil {
-			// No bytes available, just continue
-			logger.Debugw("continue-to-read",
-				log.Fields{
-					"device-id": ofc.DeviceID,
-					"have":      have,
-					"need":      need,
-					"error":     err})
-			continue
-		}
-
-		/*
-		 * Single out EOF here, because if we have bytes
-		 * but have an EOF we still want to process the
-		 * the last meesage. A read of 0 bytes and EOF is
-		 * a terminated connection.
-		 */
-		if err != nil && (err != io.EOF || read == 0) {
-			logger.Errorw("voltha-connection-dead",
-				log.Fields{
-					"device-id": ofc.DeviceID,
-					"error":     err})
-			break
-		}
-
-		/*
-		 * We should have at least 1 message at this point so
-		 * create a slice (msg) that points to the start of the
-		 * buffer
-		 */
-		msg := buf[0:]
-		for need <= have {
-			logger.Debugw("process-of-message-stream",
-				log.Fields{
-					"device-id": ofc.DeviceID,
-					"have":      have,
-					"need":      need})
-			/*
-			 * If we get here, we have at least the 8 bytes of the
-			 * header, if not enough for the complete message. So
-			 * take a peek at the OF header to do simple validation
-			 * and be able to get the full expected length of the
-			 * packet
-			 */
-			peek, err := ofc.peekAtOFHeader(msg)
+			// Decode the header
+			peek, err := ofc.peekAtOFHeader(buf[:8])
 			if err != nil {
 				/*
 				 * Header is bad, assume stream is corrupted
@@ -303,29 +332,20 @@
 				break top
 			}
 
-			/*
-			 * If we don't have the full packet, then back around
-			 * the outer loop to get more bytes
-			 */
-			need = int(peek.GetLength())
-
-			logger.Debugw("processed-header-need-message",
-				log.Fields{
-					"device-id": ofc.DeviceID,
-					"have":      have,
-					"need":      need})
-
-			if have < need {
-				logger.Debugw("end-processing:continue-to-read",
+			// Calculate the size of the rest of the packet and read it
+			need := int(peek.GetLength())
+			read, err = io.ReadFull(fromController, buf[8:need])
+			if err != nil {
+				logger.Errorw("bad-of-packet",
 					log.Fields{
-						"device-id": ofc.DeviceID,
-						"have":      have,
-						"need":      need})
-				break
+						"byte-count": read,
+						"device-id":  ofc.DeviceID,
+						"error":      err})
+				break top
 			}
 
 			// Decode and process the packet
-			decoder := goloxi.NewDecoder(msg)
+			decoder := goloxi.NewDecoder(buf[:need])
 			header, err := ofp.DecodeHeader(decoder)
 			if err != nil {
 				js, _ := json.Marshal(decoder)
@@ -344,34 +364,11 @@
 						"header":    js})
 			}
 			ofc.parseHeader(header)
-
-			/*
-			 * Move the msg slice to the start of the next
-			 * message, which is the current message plus the
-			 * used bytes (need)
-			 */
-			msg = msg[need:]
-			have -= need
-
-			// Finished process method, need header again
-			need = 8
-
-			logger.Debugw("message-process-complete",
-				log.Fields{
-					"device-id":   ofc.DeviceID,
-					"have":        have,
-					"need":        need,
-					"read-length": len(buf[have:])})
-		}
-		/*
-		 * If we have any left over bytes move them to the front
-		 * of the byte array to be appended to bny the next read
-		 */
-		if have > 0 {
-			copy(buf, msg)
 		}
 	}
-	ofc.events <- ofcEventDisconnected
+	logger.Debugw("end-of-stream",
+		log.Fields{"device-id": ofc.DeviceID})
+	ofc.events <- ofcEventDisconnect
 }
 
 func (ofc *OFClient) sayHello() {
@@ -451,7 +448,8 @@
 	}
 }
 
-//Message created to allow for a single SendMessage
+// Message is the interface that represents an openflow message and enables a
+// unified implementation of SendMessage
 type Message interface {
 	Serialize(encoder *goloxi.Encoder) error
 }
@@ -464,7 +462,7 @@
 	msg.Serialize(enc)
 	bytes := enc.Bytes()
 	if _, err := ofc.conn.Write(bytes); err != nil {
-		logger.Warnw("unable-to-send-message-to-controller",
+		logger.Errorw("unable-to-send-message-to-controller",
 			log.Fields{
 				"device-id": ofc.DeviceID,
 				"message":   msg,
@@ -475,11 +473,10 @@
 }
 
 func (ofc *OFClient) messageSender(ctx context.Context) {
-
 	// first process last fail if it exists
 	if ofc.lastUnsentMessage != nil {
 		if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
-			ofc.events <- ofcEventDisconnected
+			ofc.events <- ofcEventDisconnect
 			return
 		}
 		ofc.lastUnsentMessage = nil
@@ -490,9 +487,9 @@
 		case <-ctx.Done():
 			break top
 		case msg := <-ofc.sendChannel:
-			if ofc.doSend(msg) != nil {
+			if err := ofc.doSend(msg); err != nil {
 				ofc.lastUnsentMessage = msg
-				ofc.events <- ofcEventDisconnected
+				ofc.events <- ofcEventDisconnect
 				return
 			}
 			ofc.lastUnsentMessage = nil
@@ -500,39 +497,8 @@
 	}
 }
 
+// SendMessage queues a message to be sent to the openflow controller
 func (ofc *OFClient) SendMessage(message Message) error {
 	ofc.sendChannel <- message
 	return nil
 }
-
-//SendMessage sends message to openflow server
-func (ofc *OFClient) SendMessageOrig(message Message) error {
-	if logger.V(log.DebugLevel) {
-		js, _ := json.Marshal(message)
-		logger.Debugw("SendMessage called",
-			log.Fields{
-				"device-id": ofc.DeviceID,
-				"message":   js})
-	}
-	enc := goloxi.NewEncoder()
-	message.Serialize(enc)
-	for {
-		if ofc.conn == nil {
-			logger.Warnln("SendMessage Connection is Nil sleeping for 10 milliseconds")
-			time.Sleep(10 * time.Millisecond)
-		} else {
-			break
-		}
-	}
-	bytes := enc.Bytes()
-	if _, err := ofc.conn.Write(bytes); err != nil {
-		jMessage, _ := json.Marshal(message)
-		logger.Errorw("SendMessage failed sending message",
-			log.Fields{
-				"device-id": ofc.DeviceID,
-				"error":     err,
-				"message":   jMessage})
-		return err
-	}
-	return nil
-}
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
index 68d2625..17db448 100644
--- a/internal/pkg/openflow/stats.go
+++ b/internal/pkg/openflow/stats.go
@@ -48,7 +48,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-desc",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -65,7 +65,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-flow",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -82,7 +82,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-aggregate",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -95,7 +95,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-table",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -114,7 +114,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-port",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -130,7 +130,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-queue",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -146,7 +146,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-group",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -162,7 +162,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-group-desc",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -179,7 +179,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-group-features",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -195,7 +195,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-meter",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -211,7 +211,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-meter-config",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -227,7 +227,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-meter-features",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -243,7 +243,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-table-features",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -259,7 +259,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-port-desc",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -276,7 +276,7 @@
 		if logger.V(log.DebugLevel) {
 			reqJs, _ := json.Marshal(statsReq)
 			resJs, _ := json.Marshal(response)
-			logger.Debugw("handle-stats-request",
+			logger.Debugw("handle-stats-request-experimenter",
 				log.Fields{
 					"device-id": ofc.DeviceID,
 					"request":   reqJs,
@@ -465,9 +465,10 @@
 	if err != nil {
 		return nil, err
 	}
-	size := uint16(40)
+	size := uint16(5) // size of stats header
 	var meterStats []*ofp.MeterStats
 	for _, item := range resp.Items {
+		entrySize := uint16(40) // size of entry header
 		meterStat := ofp.NewMeterStats()
 		stats := item.Stats
 		meterStat.DurationNsec = stats.DurationNsec
@@ -481,13 +482,15 @@
 			bandStat.ByteBandCount = bStat.ByteBandCount
 			bandStat.PacketBandCount = bStat.PacketBandCount
 			bandStats = append(bandStats, bandStat)
-			size += 16
+			entrySize += uint16(16) // size of each band stat
 		}
 		meterStat.SetBandStats(bandStats)
-		meterStat.Len = size
+		meterStat.Len = entrySize
 		meterStats = append(meterStats, meterStat)
+		size += entrySize
 	}
 	response.SetEntries(meterStats)
+	response.SetLength(size)
 	return response, nil
 }