First Commit of Voltha-Go-Controller from Radisys

Change-Id: I8e2e908e7ab09a4fe3d86849da18b6d69dcf4ab0
diff --git a/internal/pkg/controller/addflows.go b/internal/pkg/controller/addflows.go
new file mode 100644
index 0000000..0364341
--- /dev/null
+++ b/internal/pkg/controller/addflows.go
@@ -0,0 +1,217 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	infraerror "voltha-go-controller/internal/pkg/errorcodes"
+	infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
+	"time"
+
+	"voltha-go-controller/internal/pkg/of"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+const (
+	//MaxRetryCount - Maximum retry attempts on failure
+	MaxRetryCount int = 1
+)
+
+// AddFlowsTask structure
+type AddFlowsTask struct {
+	taskID    uint8
+	ctx       context.Context
+	flow      *of.VoltFlow
+	device    *Device
+	timestamp string
+}
+
+// NewAddFlowsTask is constructor for AddFlowsTask
+func NewAddFlowsTask(ctx context.Context, flow *of.VoltFlow, device *Device) *AddFlowsTask {
+	var aft AddFlowsTask
+	aft.device = device
+	aft.flow = flow
+	aft.ctx = ctx
+	tstamp := (time.Now()).Format(time.RFC3339Nano)
+	aft.timestamp = tstamp
+	return &aft
+}
+
+// Name to add flow task
+func (aft *AddFlowsTask) Name() string {
+	for _, flow := range aft.flow.SubFlows {
+		logger.Infow(ctx, "Flow Cookies", log.Fields{"Cookie": flow.Cookie})
+	}
+	return "Add Flows Task"
+}
+
+// TaskID to return task ID
+func (aft *AddFlowsTask) TaskID() uint8 {
+	return aft.taskID
+}
+
+// Timestamp to return timestamp
+func (aft *AddFlowsTask) Timestamp() string {
+	return aft.timestamp
+}
+
+// Stop to stop the add flow task
+func (aft *AddFlowsTask) Stop() {
+}
+
+// Start to start adding flow task
+func (aft *AddFlowsTask) Start(ctx context.Context, taskID uint8) error {
+	var err error
+	aft.taskID = taskID
+	aft.ctx = ctx
+	flowsToProcess := make(map[uint64]*of.VoltSubFlow)
+	flowsPresent := 0
+	// First add/delete the flows first locally before passing them to actual device
+	for _, flow := range aft.flow.SubFlows {
+		logger.Infow(ctx, "Flow Mod Request", log.Fields{"Cookie": flow.Cookie, "Oper": aft.flow.Command, "Port": aft.flow.PortID})
+		if aft.flow.Command == of.CommandAdd {
+			flow.State = of.FlowAddPending
+			if err := aft.device.AddFlow(flow); err != nil {
+				logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
+
+				// If flow already exists in cache, check for flow state
+				// If Success: Trigger success FLow Indication
+				// if Failure: Continue process, so add-retry happens
+				if err.Error() == ErrDuplicateFlow {
+					dbFlow, _ := aft.device.GetFlow(flow.Cookie)
+					if dbFlow.State == of.FlowAddSuccess {
+						aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+						flowsPresent++
+					}
+				}
+			}
+			flowsToProcess[flow.Cookie] = flow
+		} else {
+			dbFlow, ok := aft.device.GetFlow(flow.Cookie)
+			if !ok {
+				logger.Warnw(ctx, "Delete Flow Error: Flow Does not Exist", log.Fields{"Cookie": flow.Cookie, "Device": aft.device.ID})
+			} else {
+				// dbFlow.State = of.FlowDelPending
+				// aft.device.AddFlowToDb(dbFlow)
+				flowsToProcess[flow.Cookie] = dbFlow
+			}
+			aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+		}
+	}
+
+	if flowsPresent == len(aft.flow.SubFlows) {
+		logger.Warn(ctx, "All Flows already present in database. Skipping Flow Push to SB")
+	}
+
+	// PortName and PortID are used for validation of PortID, whether it is still valid and associated with old PortName or
+	// PortID got assigned to another PortName. If the condition met, skip these flow update to voltha core
+	if aft.flow.PortName != "" && aft.flow.PortID != 0 {
+		portName, _ := aft.device.GetPortName(aft.flow.PortID)
+		if aft.flow.PortName != portName && portName != "" {
+			for _, flow := range aft.flow.SubFlows {
+				logger.Errorw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
+				if aft.flow.Command == of.CommandDel {
+					aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+				}
+			}
+			return nil
+		}
+	}
+
+	if !aft.device.isSBOperAllowed(aft.flow.ForceAction) {
+		for _, flow := range aft.flow.SubFlows {
+			logger.Errorw(ctx, "Skipping Flow Table Update", log.Fields{"Reason": "Device State not UP", "State": aft.device.State, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
+		}
+		return nil
+	}
+
+	flows := of.ProcessVoltFlow(aft.device.ID, aft.flow.Command, flowsToProcess)
+	for _, flow := range flows {
+		attempt := 0
+		if vc := aft.device.VolthaClient(); vc != nil {
+			for {
+				if _, err = vc.UpdateLogicalDeviceFlowTable(aft.ctx, flow); err != nil {
+					logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Reason": err.Error(), "Operation": aft.flow.Command})
+					statusCode, _ := infraerror.GetErrorInfo(err)
+
+					// Retry on flow delete failure once.
+					// Do NOT retry incase of failure with reason: Entry Not Found
+					if aft.flow.Command == of.CommandDel && statusCode != uint32(infraerrorcode.ErrNotExists) {
+						if attempt != MaxRetryCount {
+							logger.Errorw(ctx, "Retrying Flow Delete", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Attempt": attempt})
+							attempt++
+							continue
+						}
+						logger.Errorw(ctx, "Flow Delete failed even aft max retries", log.Fields{"Flow": flow, "Attempt": attempt})
+					}
+				}
+				break
+			}
+			aft.device.triggerFlowNotification(flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+
+		} else {
+			logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
+		}
+	}
+	return nil
+}
+
+func isFlowOperSuccess(statusCode uint32, oper of.Command) bool {
+	volthaErrorCode := infraerrorcode.ErrorCode(statusCode)
+
+	if volthaErrorCode == infraerrorcode.ErrOk {
+		return true
+	}
+
+	if oper == of.CommandAdd && volthaErrorCode == infraerrorcode.ErrAlreadyExists {
+		return true
+
+	} else if oper == of.CommandDel && volthaErrorCode == infraerrorcode.ErrNotExists {
+		return true
+	}
+	return false
+}
+
+// func getBwAvailInfo(bwAvailInfo []*voltha.ResponseMsg) of.BwAvailDetails {
+// 	var bwInfo of.BwAvailDetails
+// 	// convert the bw details sent from olt to a struct
+// 	// received msg format:
+// 	// additional_data[Data{ResponseMsg
+// 	//{"key":"prevBW","value":"111111"},
+// 	//{"key":"presentBW","value":"10000"}]
+// 	if len(bwAvailInfo) > 1 {
+// 		prevBwResp := bwAvailInfo[0]
+// 		if prevBwResp.Key == of.PrevBwInfo {
+// 			_, err := strconv.Atoi(prevBwResp.Val)
+// 			if err == nil {
+// 				bwInfo.PrevBw = prevBwResp.Val
+// 			}
+// 		}
+
+// 		presentBwResp := bwAvailInfo[1]
+// 		if presentBwResp.Key == of.PresentBwInfo {
+// 			_, err := strconv.Atoi(prevBwResp.Val)
+// 			if err == nil {
+// 				bwInfo.PresentBw = presentBwResp.Val
+// 			}
+// 		}
+// 		if bwInfo.PresentBw == bwInfo.PrevBw {
+// 			return of.BwAvailDetails{}
+// 		}
+// 		logger.Infow(ctx, "Bandwidth-consumed-info", log.Fields{"BwConsumed": bwInfo})
+// 	}
+// 	return bwInfo
+// }