WIP - Suggesting changes (take2)

    This is not yet complete; work is still in progress. Eventually the plan
    is to provide the following changes:

    - restructure repo to be more aligned with https://github.com/golang-standards/project-layout
    - add k8s probes
    - modifications (golang range loops, etc.) to follow some golang
      practices

Change-Id: I6922cbc00b5ef17ceab183aba00a7fc59ab46480
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
new file mode 100644
index 0000000..33d531d
--- /dev/null
+++ b/internal/pkg/openflow/client.go
@@ -0,0 +1,538 @@
+/*
+   Copyright 2020 the original author or authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package openflow
+
+import (
+	"context"
+	"encoding/binary"
+	"encoding/json"
+	"errors"
+	"github.com/donNewtonAlpha/goloxi"
+	ofp "github.com/donNewtonAlpha/goloxi/of13"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-protos/v2/go/voltha"
+	"io"
+	"net"
+	"time"
+)
+
+// logger is the package-level logger, obtained from log.AddPackage with the
+// JSON format and debug level. The error returned by AddPackage is discarded.
+var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+
+// ofcEvent identifies an event delivered to the Run loop through the
+// OFClient events channel.
+type ofcEvent byte
+
+// ofcState is the connection state tracked by the Run loop.
+type ofcState byte
+
+// Events and states are declared in separate const blocks so that iota
+// restarts at zero for each enumeration. In a single shared block iota keeps
+// counting, which would silently number the states after the last event.
+const (
+	ofcEventStart ofcEvent = iota
+	ofcEventConnected
+	ofcEventDisconnected
+)
+
+const (
+	ofcStateConnected ofcState = iota
+	ofcStateDisconnected
+)
+
+// OFClient holds the configuration and runtime state of an OpenFlow client
+// that connects to an OpenFlow controller.
+//
+// Field usage visible in this file:
+//   - OFControllerEndPoint: address resolved with net.ResolveTCPAddr ("tcp")
+//     when connecting.
+//   - DeviceID: attached to log entries under the "device-id" key.
+//   - KeepRunning: processOFStream keeps reading while it is true; Stop
+//     clears it.
+//   - ConnectionMaxRetries: bounds the connection attempts; 0 retries forever.
+//   - ConnectionRetryDelay: pause between connection attempts; NewOFClient
+//     replaces values <= 0 with 3 seconds.
+//   - conn: current connection to the controller, nil when there is none.
+//   - events: carries connection events to the Run loop.
+//   - sendChannel: outbound messages queued by SendMessage and written by
+//     messageSender.
+//   - lastUnsentMessage: message whose write failed; the next messageSender
+//     sends it first.
+//
+// Port is neither set nor read in this file. VolthaClient and
+// PacketOutChannel are only copied by NewOFClient here; presumably they are
+// used by handlers defined elsewhere in the package (TODO confirm).
+type OFClient struct {
+	OFControllerEndPoint string
+	Port                 uint16
+	DeviceID             string
+	KeepRunning          bool
+	VolthaClient         voltha.VolthaServiceClient
+	PacketOutChannel     chan *voltha.PacketOut
+	ConnectionMaxRetries int
+	ConnectionRetryDelay time.Duration
+	conn                 net.Conn
+
+	// experimental
+	events            chan ofcEvent
+	sendChannel       chan Message
+	lastUnsentMessage Message
+}
+
+//NewClient  contstructs a new Openflow Client and then starts up
+func NewOFClient(config *OFClient) *OFClient {
+
+	ofc := OFClient{
+		DeviceID:             config.DeviceID,
+		OFControllerEndPoint: config.OFControllerEndPoint,
+		VolthaClient:         config.VolthaClient,
+		PacketOutChannel:     config.PacketOutChannel,
+		KeepRunning:          config.KeepRunning,
+		ConnectionMaxRetries: config.ConnectionMaxRetries,
+		ConnectionRetryDelay: config.ConnectionRetryDelay,
+		events:               make(chan ofcEvent, 10),
+		sendChannel:          make(chan Message, 100),
+	}
+
+	if ofc.ConnectionRetryDelay <= 0 {
+		logger.Warnw("connection retry delay not valid, setting to default",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"value":     ofc.ConnectionRetryDelay.String(),
+				"default":   (3 * time.Second).String()})
+		ofc.ConnectionRetryDelay = 3 * time.Second
+	}
+	return &ofc
+}
+
+// Stop sets KeepRunning to false so that the read loop in processOFStream
+// exits the next time it evaluates its loop condition.
+// NOTE(review): the flag is written here without synchronization while
+// processOFStream reads it from its own goroutine - confirm this is acceptable.
+func (ofc *OFClient) Stop() {
+	ofc.KeepRunning = false
+}
+
+func (ofc *OFClient) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
+	header := ofp.Header{}
+	header.Version = uint8(buf[0])
+	header.Type = uint8(buf[1])
+	header.Length = binary.BigEndian.Uint16(buf[2:4])
+	header.Xid = binary.BigEndian.Uint32(buf[4:8])
+
+	// TODO: add minimal validation of version and type
+
+	return &header, nil
+}
+
+// establishConnectionToController closes any existing connection and then
+// dials OFControllerEndPoint over TCP until a connection is established or
+// the attempt budget is spent.
+//
+// ConnectionMaxRetries is the maximum number of connection attempts; 0 means
+// keep trying forever. Attempts are separated by ConnectionRetryDelay. On
+// success the connection is stored in ofc.conn, a hello is queued through
+// sayHello and ofcEventConnected is posted to the events channel. An error is
+// returned once every allowed attempt has failed.
+func (ofc *OFClient) establishConnectionToController() error {
+	if ofc.conn != nil {
+		// Best effort: the old connection is being discarded either way.
+		ofc.conn.Close()
+		ofc.conn = nil
+	}
+
+	unlimited := ofc.ConnectionMaxRetries == 0
+	for attempt := 1; unlimited || attempt <= ofc.ConnectionMaxRetries; {
+		raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint)
+		if err != nil {
+			logger.Debugw("openflow-client unable to resolve endpoint",
+				log.Fields{
+					"device-id": ofc.DeviceID,
+					"endpoint":  ofc.OFControllerEndPoint,
+					"error":     err})
+		} else if connection, err := net.DialTCP("tcp", nil, raddr); err != nil {
+			logger.Warnw("openflow-client-connect-error",
+				log.Fields{
+					"device-id": ofc.DeviceID,
+					"endpoint":  ofc.OFControllerEndPoint,
+					"error":     err})
+		} else {
+			ofc.conn = connection
+			ofc.sayHello()
+			ofc.events <- ofcEventConnected
+			return nil
+		}
+
+		// The counter only advances for a bounded budget, and there is
+		// no point in sleeping after the final allowed attempt.
+		if !unlimited {
+			if attempt == ofc.ConnectionMaxRetries {
+				break
+			}
+			attempt++
+		}
+		time.Sleep(ofc.ConnectionRetryDelay)
+	}
+	return errors.New("failed-to-connect-to-of-controller")
+}
+
+// Run drives the client's connection state machine until ctx is cancelled.
+//
+// It posts ofcEventStart to itself and then reacts to events:
+//   - start: begin connecting to the controller in a new goroutine;
+//   - connected (while disconnected): start messageSender and
+//     processOFStream under a fresh cancellable context;
+//   - disconnected (while connected): cancel that context and begin
+//     reconnecting in a new goroutine.
+//
+// Events that do not match the current state are ignored. On return the
+// context of the active connection, if any, is cancelled.
+func (ofc *OFClient) Run(ctx context.Context) {
+	var (
+		streamCtx  context.Context
+		stopStream context.CancelFunc
+	)
+	// Whichever way the loop is left, stop the per-connection goroutines.
+	defer func() {
+		if stopStream != nil {
+			stopStream()
+			stopStream = nil
+		}
+	}()
+
+	ofc.events <- ofcEventStart
+	current := ofcStateDisconnected
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case ev := <-ofc.events:
+			switch {
+			case ev == ofcEventStart:
+				logger.Debugw("ofc-event-star",
+					log.Fields{"device-id": ofc.DeviceID})
+				go ofc.establishConnectionToController()
+
+			case ev == ofcEventConnected && current == ofcStateDisconnected:
+				current = ofcStateConnected
+				logger.Debugw("ofc-event-connected",
+					log.Fields{"device-id": ofc.DeviceID})
+				streamCtx, stopStream = context.WithCancel(context.Background())
+				go ofc.messageSender(streamCtx)
+				go ofc.processOFStream(streamCtx)
+
+			case ev == ofcEventDisconnected && current == ofcStateConnected:
+				current = ofcStateDisconnected
+				logger.Debugw("ofc-event-disconnected",
+					log.Fields{"device-id": ofc.DeviceID})
+				if stopStream != nil {
+					stopStream()
+					stopStream = nil
+				}
+				go ofc.establishConnectionToController()
+			}
+		}
+	}
+}
+
+// processOFStream reads OpenFlow messages from the controller connection,
+// decodes each complete message and passes it to parseHeader.
+//
+// The loop runs while KeepRunning is true and ctx has not been cancelled, and
+// it ends on a read error, a bad header or a decode failure. Unless ctx was
+// cancelled, ofcEventDisconnected is posted to the events channel on exit.
+func (ofc *OFClient) processOFStream(ctx context.Context) {
+	/*
+	 * Pin the connection this reader was started for.
+	 * establishConnectionToController closes ofc.conn, sets it to nil and
+	 * later replaces it; this goroutine must not follow the field onto a
+	 * nil or newer connection.
+	 */
+	conn := ofc.conn
+	buf := make([]byte, 1500)
+	var need, have int
+	/*
+	 * EXPLANATION
+	 *
+	 * The loop below reuses a byte array to read messages from the TCP
+	 * connection to the OF controller. It reads messages into a large
+	 * buffer in an attempt to optimize the read performance from the
+	 * TCP connection. This means that on any given read there may be more
+	 * than a single message in the byte array read.
+	 *
+	 * As the minimal size for an OF message is 8 bytes (because that is
+	 * the size of the basic header) we know that if we have not read
+	 * 8 bytes we need to read more before we can process a message.
+	 *
+	 * Once the minimum header is read, the complete length of the
+	 * message is retrieved from the header and bytes are repeatedly read
+	 * until we know the byte array contains at least one message.
+	 *
+	 * Once it is known that the buffer has at least a single message
+	 * a slice (msg) is moved through the read bytes looking to process
+	 * each message until the length of read data is < the length required
+	 * i.e., the minimum size or the size of the next message.
+	 *
+	 * When no more messages can be processed from the byte array any
+	 * unused bytes are moved to the front of the source array and more
+	 * data is read from the TCP connection.
+	 */
+
+	/*
+	 * First thing we are looking for is an openflow header, so we need at
+	 * least 8 bytes
+	 */
+	need = 8
+
+top:
+	// Continue until we are told to stop
+	for conn != nil && ofc.KeepRunning && ctx.Err() == nil {
+		/*
+		 * A message can be longer than the buffer (the 16 bit length
+		 * field allows up to 65535 bytes). Grow the buffer to fit,
+		 * otherwise the read below would be handed a zero-length
+		 * slice and could never make progress.
+		 */
+		if need > len(buf) {
+			grown := make([]byte, need)
+			copy(grown, buf[:have])
+			buf = grown
+		}
+
+		logger.Debugw("before-read-from-controller",
+			log.Fields{
+				"device-id":  ofc.DeviceID,
+				"have":       have,
+				"need":       need,
+				"buf-length": len(buf[have:])})
+		read, err := conn.Read(buf[have:])
+		have += read
+		logger.Debugw("read-from-controller",
+			log.Fields{
+				"device-id":  ofc.DeviceID,
+				"byte-count": read,
+				"error":      err})
+
+		/*
+		 * If we have less than we need and there is no
+		 * error, then continue to attempt to read more data
+		 */
+		if have < need && err == nil {
+			// No bytes available, just continue
+			logger.Debugw("continue-to-read",
+				log.Fields{
+					"device-id": ofc.DeviceID,
+					"have":      have,
+					"need":      need,
+					"error":     err})
+			continue
+		}
+
+		/*
+		 * Single out EOF here, because if we have bytes
+		 * but have an EOF we still want to process the
+		 * last message. A read of 0 bytes and EOF is
+		 * a terminated connection.
+		 */
+		if err != nil && (err != io.EOF || read == 0) {
+			logger.Errorw("voltha-connection-dead",
+				log.Fields{
+					"device-id": ofc.DeviceID,
+					"error":     err})
+			break
+		}
+
+		/*
+		 * We should have at least 1 message at this point so
+		 * create a slice (msg) that points to the start of the
+		 * buffer
+		 */
+		msg := buf[0:]
+		for need <= have {
+			logger.Debugw("process-of-message-stream",
+				log.Fields{
+					"device-id": ofc.DeviceID,
+					"have":      have,
+					"need":      need})
+			/*
+			 * If we get here, we have at least the 8 bytes of the
+			 * header, if not enough for the complete message. So
+			 * take a peek at the OF header to do simple validation
+			 * and be able to get the full expected length of the
+			 * packet
+			 */
+			peek, err := ofc.peekAtOFHeader(msg)
+			if err != nil {
+				/*
+				 * Header is bad, assume stream is corrupted
+				 * and needs to be restarted
+				 */
+				logger.Errorw("bad-of-packet",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"error":     err})
+				break top
+			}
+
+			/*
+			 * If we don't have the full packet, then back around
+			 * the outer loop to get more bytes
+			 */
+			need = int(peek.GetLength())
+
+			logger.Debugw("processed-header-need-message",
+				log.Fields{
+					"device-id": ofc.DeviceID,
+					"have":      have,
+					"need":      need})
+
+			if have < need {
+				logger.Debugw("end-processing:continue-to-read",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"have":      have,
+						"need":      need})
+				break
+			}
+
+			// Decode and process the packet
+			decoder := goloxi.NewDecoder(msg)
+			header, err := ofp.DecodeHeader(decoder)
+			if err != nil {
+				js, _ := json.Marshal(decoder)
+				logger.Errorw("failed-to-decode",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"decoder":   js,
+						"error":     err})
+				break top
+			}
+			if logger.V(log.DebugLevel) {
+				js, _ := json.Marshal(header)
+				logger.Debugw("packet-header",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"header":    js})
+			}
+			ofc.parseHeader(header)
+
+			/*
+			 * Move the msg slice to the start of the next
+			 * message, which is the current message plus the
+			 * used bytes (need)
+			 */
+			msg = msg[need:]
+			have -= need
+
+			// Finished process method, need header again
+			need = 8
+
+			logger.Debugw("message-process-complete",
+				log.Fields{
+					"device-id":   ofc.DeviceID,
+					"have":        have,
+					"need":        need,
+					"read-length": len(buf[have:])})
+		}
+		/*
+		 * If we have any left over bytes move them to the front
+		 * of the byte array to be appended to by the next read
+		 */
+		if have > 0 {
+			copy(buf, msg[:have])
+		}
+	}
+
+	/*
+	 * A cancelled context means the Run loop already tore this
+	 * connection down (or is shutting down). Reporting a disconnect then
+	 * would be stale and could be mistaken for a failure of the next
+	 * connection.
+	 */
+	if ctx.Err() == nil {
+		ofc.events <- ofcEventDisconnected
+	}
+}
+
+// sayHello builds an OpenFlow hello message carrying a single version bitmap
+// element and hands it to SendMessage. The message is logged as JSON when
+// debug logging is enabled. A send error is logged with Fatalw.
+func (ofc *OFClient) sayHello() {
+	// Bitmap value 16 has only bit 4 set, matching OpenFlow wire version
+	// 0x04 (1.3).
+	versions := ofp.NewHelloElemVersionbitmap()
+	versions.SetType(ofp.OFPHETVersionbitmap)
+	versions.SetLength(8)
+	versions.SetBitmaps([]*ofp.Uint32{{Value: 16}})
+
+	msg := ofp.NewHello()
+	msg.Xid = uint32(GetXid())
+	msg.SetElements([]ofp.IHelloElem{versions})
+
+	if logger.V(log.DebugLevel) {
+		encoded, _ := json.Marshal(msg)
+		logger.Debugw("sayHello Called",
+			log.Fields{
+				"device-id":     ofc.DeviceID,
+				"hello-message": encoded})
+	}
+
+	err := ofc.SendMessage(msg)
+	if err == nil {
+		return
+	}
+	logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
+		log.Fields{
+			"device-id": ofc.DeviceID,
+			"error":     err})
+}
+
+// parseHeader dispatches a decoded OpenFlow message to its handler based on
+// the message type.
+//
+// Flow modifications and barrier requests are handled on the calling
+// goroutine; every other handled type gets its own goroutine. Message types
+// without a handler are ignored. Each message is converted with an unchecked
+// type assertion, so it must have the concrete type matching its type code.
+func (ofc *OFClient) parseHeader(header ofp.IHeader) {
+	switch header.GetType() {
+	/*
+	 * Flow* messages and barrier requests are not handed to a go routine.
+	 * onos typically issues barrier requests just before a flow* message,
+	 * so handling both in this thread ensures all flow* messages have
+	 * been handled by the time a barrier request is processed.
+	 */
+	case ofp.OFPTFlowMod:
+		flowMod := header.(ofp.IFlowMod)
+		switch flowMod.GetCommand() {
+		case ofp.OFPFCAdd:
+			ofc.handleFlowAdd(header.(*ofp.FlowAdd))
+		case ofp.OFPFCModify:
+			ofc.handleFlowMod(header.(*ofp.FlowMod))
+		case ofp.OFPFCModifyStrict:
+			ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
+		case ofp.OFPFCDelete:
+			ofc.handleFlowDelete(header.(*ofp.FlowDelete))
+		case ofp.OFPFCDeleteStrict:
+			ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
+		}
+	case ofp.OFPTBarrierRequest:
+		ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
+
+	// The remaining handled message types are processed concurrently.
+	case ofp.OFPTError:
+		go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
+	case ofp.OFPTEchoRequest:
+		go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
+	case ofp.OFPTFeaturesRequest:
+		go ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest))
+	case ofp.OFPTGetConfigRequest:
+		go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
+	case ofp.OFPTSetConfig:
+		go ofc.handleSetConfig(header.(*ofp.SetConfig))
+	case ofp.OFPTPacketOut:
+		go ofc.handlePacketOut(header.(*ofp.PacketOut))
+	case ofp.OFPTStatsRequest:
+		go ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType())
+	case ofp.OFPTRoleRequest:
+		go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
+	case ofp.OFPTMeterMod:
+		go ofc.handleMeterModRequest(header.(*ofp.MeterMod))
+
+	// No action is taken for these message types.
+	case ofp.OFPTHello,
+		ofp.OFPTEchoReply,
+		ofp.OFPTExperimenter,
+		ofp.OFPTFeaturesReply,
+		ofp.OFPTGetConfigReply,
+		ofp.OFPTPacketIn,
+		ofp.OFPTFlowRemoved,
+		ofp.OFPTPortStatus:
+	}
+}
+
+// Message is implemented by any message that can serialize itself into a
+// goloxi encoder. It is the single type accepted by SendMessage and doSend.
+type Message interface {
+	Serialize(encoder *goloxi.Encoder) error
+}
+
+// doSend serializes msg and writes the resulting bytes to the current
+// controller connection.
+//
+// The returned error reports connection problems only: there is no
+// connection, or the write failed. messageSender answers such an error by
+// signalling a disconnect and retrying the message later. A message that
+// cannot be serialized is therefore logged and dropped (nil is returned)
+// rather than written partially or retried forever.
+func (ofc *OFClient) doSend(msg Message) error {
+	// Read the field once so the nil check and the write use the same
+	// connection even if ofc.conn is replaced in between.
+	conn := ofc.conn
+	if conn == nil {
+		return errors.New("no-connection")
+	}
+
+	enc := goloxi.NewEncoder()
+	if err := msg.Serialize(enc); err != nil {
+		logger.Errorw("unable-to-serialize-message-for-controller",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"message":   msg,
+				"error":     err})
+		return nil
+	}
+
+	if _, err := conn.Write(enc.Bytes()); err != nil {
+		logger.Warnw("unable-to-send-message-to-controller",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"message":   msg,
+				"error":     err})
+		return err
+	}
+	return nil
+}
+
+// messageSender writes queued messages to the controller until ctx is
+// cancelled or a send fails.
+//
+// A message left over from a previous failed send is sent first. When doSend
+// fails, the message is remembered in lastUnsentMessage (if it came from the
+// queue), ofcEventDisconnected is posted to the events channel and the
+// function returns.
+func (ofc *OFClient) messageSender(ctx context.Context) {
+	// Retry the message that could not be sent last time, if any.
+	if pending := ofc.lastUnsentMessage; pending != nil {
+		if err := ofc.doSend(pending); err != nil {
+			ofc.events <- ofcEventDisconnected
+			return
+		}
+		ofc.lastUnsentMessage = nil
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case next := <-ofc.sendChannel:
+			if err := ofc.doSend(next); err != nil {
+				ofc.lastUnsentMessage = next
+				ofc.events <- ofcEventDisconnected
+				return
+			}
+			ofc.lastUnsentMessage = nil
+		}
+	}
+}
+
+// SendMessage queues message on sendChannel, from which messageSender takes
+// it and writes it to the controller. The call blocks while the channel is
+// full. The returned error is always nil.
+func (ofc *OFClient) SendMessage(message Message) error {
+	ofc.sendChannel <- message
+	return nil
+}
+
+// SendMessageOrig serializes message and writes it directly to the controller
+// connection, without going through the queue used by SendMessage.
+//
+// While there is no connection it polls every 10 milliseconds until one
+// exists. It returns the serialization error or the write error, if any.
+func (ofc *OFClient) SendMessageOrig(message Message) error {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(message)
+		logger.Debugw("SendMessage called",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"message":   js})
+	}
+
+	enc := goloxi.NewEncoder()
+	if err := message.Serialize(enc); err != nil {
+		// Never put a partially encoded message on the wire.
+		logger.Errorw("SendMessage failed serializing message",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+		return err
+	}
+
+	// Keep the connection in a local so the value that passed the nil
+	// check is the one written to, even if ofc.conn changes meanwhile.
+	conn := ofc.conn
+	for conn == nil {
+		logger.Warnln("SendMessage Connection is Nil sleeping for 10 milliseconds")
+		time.Sleep(10 * time.Millisecond)
+		conn = ofc.conn
+	}
+
+	if _, err := conn.Write(enc.Bytes()); err != nil {
+		jMessage, _ := json.Marshal(message)
+		logger.Errorw("SendMessage failed sending message",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err,
+				"message":   jMessage})
+		return err
+	}
+	return nil
+}