[VOL-2641] Returning errors for flows and groups from the open olt adapter to the core

Change-Id: I513c2d391ddfa99d82589ab974956b1f4359bb68
diff --git a/VERSION b/VERSION
index 0e80d41..be14458 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.3.14
+2.3.15
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 1fe5770..67b852a 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1199,6 +1199,9 @@
 //UpdateFlowsIncrementally updates the device flow
 func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("Received-incremental-flowupdate-in-device-handler", log.Fields{"deviceID": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
+
+	var errorsList []error
+
 	if flows != nil {
 		for _, flow := range flows.ToRemove.Items {
 			log.Debug("Removing flow", log.Fields{"deviceId": device.Id, "flowToRemove": flow})
@@ -1207,7 +1210,10 @@
 
 		for _, flow := range flows.ToAdd.Items {
 			log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
-			dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
+			err := dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
+			if err != nil {
+				errorsList = append(errorsList, err)
+			}
 		}
 	}
 	if groups != nil && flows != nil {
@@ -1219,15 +1225,24 @@
 
 	if groups != nil {
 		for _, group := range groups.ToAdd.Items {
-			dh.flowMgr.AddGroup(ctx, group)
+			err := dh.flowMgr.AddGroup(ctx, group)
+			if err != nil {
+				errorsList = append(errorsList, err)
+			}
 		}
 		for _, group := range groups.ToUpdate.Items {
-			dh.flowMgr.ModifyGroup(ctx, group)
+			err := dh.flowMgr.ModifyGroup(ctx, group)
+			if err != nil {
+				errorsList = append(errorsList, err)
+			}
 		}
 		if len(groups.ToRemove.Items) != 0 {
 			log.Debug("Group delete operation is not supported for now")
 		}
 	}
+	if len(errorsList) > 0 {
+		return fmt.Errorf("errors-installing-flows-groups, errors:%v", errorsList)
+	}
 	log.Debug("UpdateFlowsIncrementally done successfully")
 	return nil
 }
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index e29eb0e..dd33760 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -22,6 +22,7 @@
 	"crypto/md5"
 	"encoding/hex"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"math/big"
 	"strings"
@@ -1812,7 +1813,7 @@
 
 // AddFlow add flow to device
 // nolint: gocyclo
-func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
+func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {
 	classifierInfo := make(map[string]interface{})
 	actionInfo := make(map[string]interface{})
 	var UsMeterID uint32
@@ -1825,20 +1826,19 @@
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
-		return
+		return err
 	}
 
 	if flows.HasGroup(flow) {
 		// handle multicast flow
-		f.handleFlowWithGroup(ctx, actionInfo, classifierInfo, flow)
-		return
+		return f.handleFlowWithGroup(ctx, actionInfo, classifierInfo, flow)
 	}
 
 	/* Controller bound trap flows */
 	err = formulateControllerBoundTrapFlowInfo(actionInfo, classifierInfo, flow)
 	if err != nil {
 		// error if any, already logged in the called function
-		return
+		return err
 	}
 
 	log.Infow("Flow ports", log.Fields{"classifierInfo_inport": classifierInfo[InPort], "action_output": actionInfo[Output]})
@@ -1847,8 +1847,7 @@
 	if ethType, ok := classifierInfo[EthType]; ok {
 		if ethType.(uint32) == LldpEthType {
 			log.Info("Adding LLDP flow")
-			f.addLLDPFlow(ctx, flow, portNo)
-			return
+			return f.addLLDPFlow(ctx, flow, portNo)
 		}
 	}
 	if ipProto, ok := classifierInfo[IPProto]; ok {
@@ -1856,16 +1855,14 @@
 			if udpSrc, ok := classifierInfo[UDPSrc]; ok {
 				if udpSrc.(uint32) == uint32(67) || udpSrc.(uint32) == uint32(546) {
 					log.Debug("trap-dhcp-from-nni-flow")
-					f.addDHCPTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
-					return
+					return f.addDHCPTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
 				}
 			}
 		}
 	}
 	if isIgmpTrapDownstreamFlow(classifierInfo) {
 		log.Debug("trap-igmp-from-nni-flow")
-		f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
-		return
+		return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
 	}
 
 	f.deviceHandler.AddUniPortToOnu(intfID, onuID, portNo)
@@ -1874,7 +1871,7 @@
 	TpID, err := getTpIDFromFlow(flow)
 	if err != nil {
 		log.Error("metadata-is-not-present-invalid-flow-to-process", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
-		return
+		return fmt.Errorf("metadata-is-not-present-invalid-flow-to-process, pon:%v, onuID:%v, uniID:%v", intfID, onuID, uniID)
 	}
 	log.Debugw("TPID for this subcriber", log.Fields{"TpId": TpID, "pon": intfID, "onuID": onuID, "uniID": uniID})
 	if IsUpstream(actionInfo[Output].(uint32)) {
@@ -1899,9 +1896,11 @@
 			f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
 
 		case <-time.After(10 * time.Second):
-			log.Errorw("pending flow deletes not completed after timeout", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+			log.Errorw("pending-flow-deletes-not-completed-after-timeout", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+			return fmt.Errorf("pending-flow-deletes-not-completed-after-timeout, pon:%v, onuID:%v, uniID:%v", intfID, onuID, uniID)
 		}
 	}
+	return nil
 }
 
 // handleFlowWithGroup adds multicast flow to the device.
@@ -1977,9 +1976,11 @@
 	group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true)
 	if err == nil {
 		//calling groupAdd to set group members after multicast flow creation
-		if f.ModifyGroup(ctx, group) {
+		if err = f.ModifyGroup(ctx, group); err == nil {
 			//cached group can be removed now
 			f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true)
+		} else {
+			return olterrors.NewErrGroupOp("modify", groupID, log.Fields{"group": group}, err).Log()
 		}
 	}
 
@@ -2007,11 +2008,11 @@
 }
 
 // AddGroup add or update the group
-func (f *OpenOltFlowMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) {
+func (f *OpenOltFlowMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
 	log.Infow("add-group", log.Fields{"group": group})
 	if group == nil {
 		log.Warn("skipping nil group")
-		return
+		return errors.New("group is nil")
 	}
 
 	groupToOlt := openoltpb2.Group{
@@ -2024,14 +2025,15 @@
 	_, err := f.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
 	if err != nil {
 		log.Errorw("add-group operation failed", log.Fields{"err": err, "groupToOlt": groupToOlt})
-		return
+		return fmt.Errorf("add-group operation failed, err %v, groupToOlt %v", err, groupToOlt)
 	}
 	// group members not created yet. So let's store the group
 	if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
 		log.Errorw("Group cannot be stored in KV store", log.Fields{"groupId": group.Desc.GroupId, "err": err})
-	} else {
-		log.Debugw("add-group operation performed on the device successfully ", log.Fields{"groupToOlt": groupToOlt})
+		return fmt.Errorf("group cannot be stored in KV store, groupId %v, err %v", group.Desc.GroupId, err)
 	}
+	log.Debugw("add-group operation performed on the device successfully ", log.Fields{"groupToOlt": groupToOlt})
+	return nil
 }
 
 //buildGroupAction creates and returns a group action
@@ -2045,21 +2047,21 @@
 }
 
 // ModifyGroup updates the group
-func (f *OpenOltFlowMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) bool {
+func (f *OpenOltFlowMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
 	log.Infow("modify-group", log.Fields{"group": group})
 	if group == nil || group.Desc == nil {
 		log.Warn("cannot modify group; group is nil")
-		return false
+		return errors.New("cannot modify group; group is nil")
 	}
 
-	new := f.buildGroup(group.Desc.GroupId, group.Desc.Buckets)
+	newGroup := f.buildGroup(group.Desc.GroupId, group.Desc.Buckets)
 	//get existing members of the group
 	val, groupExists, err := f.GetFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
 
 	if err != nil {
 		log.Errorw("Failed to retrieve the group from the store. Cannot modify group.",
 			log.Fields{"groupId": group.Desc.GroupId, "err": err})
-		return false
+		return fmt.Errorf("failed to retrieve the group from the store. Cannot modify group. groupId:%v, err:%v", group.Desc.GroupId, err)
 	}
 
 	var current *openoltpb2.Group // represents the group on the device
@@ -2071,11 +2073,11 @@
 		current = f.buildGroup(group.Desc.GroupId, nil)
 	}
 
-	log.Debugw("modify-group: comparing current and new.", log.Fields{"group on the device": current, "new": new})
+	log.Debugw("modify-group: comparing current and new.", log.Fields{"group on the device": current, "new": newGroup})
 	// get members to be added
-	membersToBeAdded := f.findDiff(current, new)
+	membersToBeAdded := f.findDiff(current, newGroup)
 	// get members to be removed
-	membersToBeRemoved := f.findDiff(new, current)
+	membersToBeRemoved := f.findDiff(newGroup, current)
 
 	log.Infow("modify-group -> differences found", log.Fields{"membersToBeAdded": membersToBeAdded,
 		"membersToBeRemoved": membersToBeRemoved, "groupId": group.Desc.GroupId})
@@ -2083,43 +2085,48 @@
 	groupToOlt := openoltpb2.Group{
 		GroupId: group.Desc.GroupId,
 	}
-	var added, removed = true, true
+	var errAdd, errRemoved error
 	if membersToBeAdded != nil && len(membersToBeAdded) > 0 {
 		groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
 		groupToOlt.Members = membersToBeAdded
 		//execute addMembers
-		added = f.callGroupAddRemove(&groupToOlt)
+		errAdd = f.callGroupAddRemove(&groupToOlt)
 	}
 	if membersToBeRemoved != nil && len(membersToBeRemoved) > 0 {
 		groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
 		groupToOlt.Members = membersToBeRemoved
 		//execute removeMembers
-		removed = f.callGroupAddRemove(&groupToOlt)
+		errRemoved = f.callGroupAddRemove(&groupToOlt)
 	}
 
 	//save the modified group
-	if added && removed {
+	if errAdd == nil && errRemoved == nil {
 		if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
-			log.Errorw("Failed to save the group into kv store", log.Fields{"groupId": group.Desc.GroupId})
+			log.Errorw("Failed to save the group into kv store", log.Fields{"groupId": group.Desc.GroupId, "error": err})
+			return fmt.Errorf("failed to save the group into kv store. groupId:%v, err:%v", group.Desc.GroupId, err)
 		}
 		log.Debugw("modify-group was success. Storing the group", log.Fields{"group": group, "existingGroup": current})
 	} else {
 		log.Warnw("One of the group add/remove operations has failed. Cannot save group modifications",
 			log.Fields{"group": group})
+		if errAdd != nil {
+			return errAdd
+		}
+		return errRemoved
 	}
-	return added && removed
+	return nil
 }
 
 //callGroupAddRemove performs add/remove buckets operation for the indicated group
-func (f *OpenOltFlowMgr) callGroupAddRemove(group *openoltpb2.Group) bool {
+func (f *OpenOltFlowMgr) callGroupAddRemove(group *openoltpb2.Group) error {
 	if err := f.performGroupOperation(group); err != nil {
 		st, _ := status.FromError(err)
 		//ignore already exists error code
 		if st.Code() != codes.AlreadyExists {
-			return false
+			return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
 		}
 	}
-	return true
+	return nil
 }
 
 //findDiff compares group members and finds members which only exists in groups2
diff --git a/internal/pkg/olterrors/olterrors.go b/internal/pkg/olterrors/olterrors.go
index 175b948..33ef71e 100644
--- a/internal/pkg/olterrors/olterrors.go
+++ b/internal/pkg/olterrors/olterrors.go
@@ -281,6 +281,37 @@
 	return e
 }
 
+// ErrGroupOp represents an error condition when a group operation to a device did
+// not succeed
+type ErrGroupOp struct {
+	ErrAdapter
+}
+
+// NewErrGroupOp constructs a new error based on the given values
+func NewErrGroupOp(operation string, ID uint32, fields log.Fields, wrapped error) LoggableError {
+	return &ErrPersistence{
+		ErrAdapter{
+			name: "unable-to-perform-group-operation",
+			fields: merge(fields, log.Fields{
+				"operation": operation,
+				"id":        fmt.Sprintf("0x%x", ID)}),
+			wrapped: wrapped,
+		},
+	}
+}
+
+// Log logs the error at the default level for log and return
+func (e *ErrGroupOp) Log() error {
+	_ = e.ErrAdapter.Log()
+	return e
+}
+
+// LogAt logs the error at the specified level and then returns the error
+func (e *ErrGroupOp) LogAt(level log.LogLevel) error {
+	_ = e.ErrAdapter.LogAt(level)
+	return e
+}
+
 // ErrTimeout represents an error condition when the deadline for performing an
 // operation has been exceeded
 type ErrTimeout struct {