[VOL-2641] Returning errors for flows and groups from the open olt adapter to the core
Change-Id: I513c2d391ddfa99d82589ab974956b1f4359bb68
diff --git a/VERSION b/VERSION
index 0e80d41..be14458 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.3.14
+2.3.15
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 1fe5770..67b852a 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1199,6 +1199,9 @@
//UpdateFlowsIncrementally updates the device flow
func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("Received-incremental-flowupdate-in-device-handler", log.Fields{"deviceID": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
+
+ var errorsList []error
+
if flows != nil {
for _, flow := range flows.ToRemove.Items {
log.Debug("Removing flow", log.Fields{"deviceId": device.Id, "flowToRemove": flow})
@@ -1207,7 +1210,10 @@
for _, flow := range flows.ToAdd.Items {
log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
- dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
+ err := dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
+ if err != nil {
+ errorsList = append(errorsList, err)
+ }
}
}
if groups != nil && flows != nil {
@@ -1219,15 +1225,24 @@
if groups != nil {
for _, group := range groups.ToAdd.Items {
- dh.flowMgr.AddGroup(ctx, group)
+ err := dh.flowMgr.AddGroup(ctx, group)
+ if err != nil {
+ errorsList = append(errorsList, err)
+ }
}
for _, group := range groups.ToUpdate.Items {
- dh.flowMgr.ModifyGroup(ctx, group)
+ err := dh.flowMgr.ModifyGroup(ctx, group)
+ if err != nil {
+ errorsList = append(errorsList, err)
+ }
}
if len(groups.ToRemove.Items) != 0 {
log.Debug("Group delete operation is not supported for now")
}
}
+ if len(errorsList) > 0 {
+ return fmt.Errorf("errors-installing-flows-groups, errors:%v", errorsList)
+ }
log.Debug("UpdateFlowsIncrementally done successfully")
return nil
}
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index e29eb0e..dd33760 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -22,6 +22,7 @@
"crypto/md5"
"encoding/hex"
"encoding/json"
+ "errors"
"fmt"
"math/big"
"strings"
@@ -1812,7 +1813,7 @@
// AddFlow add flow to device
// nolint: gocyclo
-func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
+func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {
classifierInfo := make(map[string]interface{})
actionInfo := make(map[string]interface{})
var UsMeterID uint32
@@ -1825,20 +1826,19 @@
if err != nil {
// Error logging is already done in the called function
// So just return in case of error
- return
+ return err
}
if flows.HasGroup(flow) {
// handle multicast flow
- f.handleFlowWithGroup(ctx, actionInfo, classifierInfo, flow)
- return
+ return f.handleFlowWithGroup(ctx, actionInfo, classifierInfo, flow)
}
/* Controller bound trap flows */
err = formulateControllerBoundTrapFlowInfo(actionInfo, classifierInfo, flow)
if err != nil {
// error if any, already logged in the called function
- return
+ return err
}
log.Infow("Flow ports", log.Fields{"classifierInfo_inport": classifierInfo[InPort], "action_output": actionInfo[Output]})
@@ -1847,8 +1847,7 @@
if ethType, ok := classifierInfo[EthType]; ok {
if ethType.(uint32) == LldpEthType {
log.Info("Adding LLDP flow")
- f.addLLDPFlow(ctx, flow, portNo)
- return
+ return f.addLLDPFlow(ctx, flow, portNo)
}
}
if ipProto, ok := classifierInfo[IPProto]; ok {
@@ -1856,16 +1855,14 @@
if udpSrc, ok := classifierInfo[UDPSrc]; ok {
if udpSrc.(uint32) == uint32(67) || udpSrc.(uint32) == uint32(546) {
log.Debug("trap-dhcp-from-nni-flow")
- f.addDHCPTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
- return
+ return f.addDHCPTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
}
}
}
}
if isIgmpTrapDownstreamFlow(classifierInfo) {
log.Debug("trap-igmp-from-nni-flow")
- f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
- return
+ return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
}
f.deviceHandler.AddUniPortToOnu(intfID, onuID, portNo)
@@ -1874,7 +1871,7 @@
TpID, err := getTpIDFromFlow(flow)
if err != nil {
log.Error("metadata-is-not-present-invalid-flow-to-process", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
- return
+ return fmt.Errorf("metadata-is-not-present-invalid-flow-to-process, pon:%v, onuID:%v, uniID:%v", intfID, onuID, uniID)
}
log.Debugw("TPID for this subcriber", log.Fields{"TpId": TpID, "pon": intfID, "onuID": onuID, "uniID": uniID})
if IsUpstream(actionInfo[Output].(uint32)) {
@@ -1899,9 +1896,11 @@
f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
case <-time.After(10 * time.Second):
- log.Errorw("pending flow deletes not completed after timeout", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+ log.Errorw("pending-flow-deletes-not-completed-after-timeout", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+ return fmt.Errorf("pending-flow-deletes-not-completed-after-timeout, pon:%v, onuID:%v, uniID:%v", intfID, onuID, uniID)
}
}
+ return nil
}
// handleFlowWithGroup adds multicast flow to the device.
@@ -1977,9 +1976,11 @@
group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true)
if err == nil {
//calling groupAdd to set group members after multicast flow creation
- if f.ModifyGroup(ctx, group) {
+ if err = f.ModifyGroup(ctx, group); err == nil {
//cached group can be removed now
f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true)
+ } else {
+ return olterrors.NewErrGroupOp("modify", groupID, log.Fields{"group": group}, err).Log()
}
}
@@ -2007,11 +2008,11 @@
}
// AddGroup add or update the group
-func (f *OpenOltFlowMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) {
+func (f *OpenOltFlowMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
log.Infow("add-group", log.Fields{"group": group})
if group == nil {
log.Warn("skipping nil group")
- return
+ return errors.New("group is nil")
}
groupToOlt := openoltpb2.Group{
@@ -2024,14 +2025,15 @@
_, err := f.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
if err != nil {
log.Errorw("add-group operation failed", log.Fields{"err": err, "groupToOlt": groupToOlt})
- return
+ return fmt.Errorf("add-group operation failed, err %v, groupToOlt %v", err, groupToOlt)
}
// group members not created yet. So let's store the group
if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
log.Errorw("Group cannot be stored in KV store", log.Fields{"groupId": group.Desc.GroupId, "err": err})
- } else {
- log.Debugw("add-group operation performed on the device successfully ", log.Fields{"groupToOlt": groupToOlt})
+ return fmt.Errorf("group cannot be stored in KV store, groupId %v, err %v", group.Desc.GroupId, err)
}
+ log.Debugw("add-group operation performed on the device successfully ", log.Fields{"groupToOlt": groupToOlt})
+ return nil
}
//buildGroupAction creates and returns a group action
@@ -2045,21 +2047,21 @@
}
// ModifyGroup updates the group
-func (f *OpenOltFlowMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) bool {
+func (f *OpenOltFlowMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
log.Infow("modify-group", log.Fields{"group": group})
if group == nil || group.Desc == nil {
log.Warn("cannot modify group; group is nil")
- return false
+ return errors.New("cannot modify group; group is nil")
}
- new := f.buildGroup(group.Desc.GroupId, group.Desc.Buckets)
+ newGroup := f.buildGroup(group.Desc.GroupId, group.Desc.Buckets)
//get existing members of the group
val, groupExists, err := f.GetFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
if err != nil {
log.Errorw("Failed to retrieve the group from the store. Cannot modify group.",
log.Fields{"groupId": group.Desc.GroupId, "err": err})
- return false
+ return fmt.Errorf("failed to retrieve the group from the store. Cannot modify group. groupId:%v, err:%v", group.Desc.GroupId, err)
}
var current *openoltpb2.Group // represents the group on the device
@@ -2071,11 +2073,11 @@
current = f.buildGroup(group.Desc.GroupId, nil)
}
- log.Debugw("modify-group: comparing current and new.", log.Fields{"group on the device": current, "new": new})
+ log.Debugw("modify-group: comparing current and new.", log.Fields{"group on the device": current, "new": newGroup})
// get members to be added
- membersToBeAdded := f.findDiff(current, new)
+ membersToBeAdded := f.findDiff(current, newGroup)
// get members to be removed
- membersToBeRemoved := f.findDiff(new, current)
+ membersToBeRemoved := f.findDiff(newGroup, current)
log.Infow("modify-group -> differences found", log.Fields{"membersToBeAdded": membersToBeAdded,
"membersToBeRemoved": membersToBeRemoved, "groupId": group.Desc.GroupId})
@@ -2083,43 +2085,48 @@
groupToOlt := openoltpb2.Group{
GroupId: group.Desc.GroupId,
}
- var added, removed = true, true
+ var errAdd, errRemoved error
if membersToBeAdded != nil && len(membersToBeAdded) > 0 {
groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
groupToOlt.Members = membersToBeAdded
//execute addMembers
- added = f.callGroupAddRemove(&groupToOlt)
+ errAdd = f.callGroupAddRemove(&groupToOlt)
}
if membersToBeRemoved != nil && len(membersToBeRemoved) > 0 {
groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
groupToOlt.Members = membersToBeRemoved
//execute removeMembers
- removed = f.callGroupAddRemove(&groupToOlt)
+ errRemoved = f.callGroupAddRemove(&groupToOlt)
}
//save the modified group
- if added && removed {
+ if errAdd == nil && errRemoved == nil {
if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
- log.Errorw("Failed to save the group into kv store", log.Fields{"groupId": group.Desc.GroupId})
+ log.Errorw("Failed to save the group into kv store", log.Fields{"groupId": group.Desc.GroupId, "error": err})
+ return fmt.Errorf("failed to save the group into kv store. groupId:%v, err:%v", group.Desc.GroupId, err)
}
log.Debugw("modify-group was success. Storing the group", log.Fields{"group": group, "existingGroup": current})
} else {
log.Warnw("One of the group add/remove operations has failed. Cannot save group modifications",
log.Fields{"group": group})
+ if errAdd != nil {
+ return errAdd
+ }
+ return errRemoved
}
- return added && removed
+ return nil
}
//callGroupAddRemove performs add/remove buckets operation for the indicated group
-func (f *OpenOltFlowMgr) callGroupAddRemove(group *openoltpb2.Group) bool {
+func (f *OpenOltFlowMgr) callGroupAddRemove(group *openoltpb2.Group) error {
if err := f.performGroupOperation(group); err != nil {
st, _ := status.FromError(err)
//ignore already exists error code
if st.Code() != codes.AlreadyExists {
- return false
+ return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
}
}
- return true
+ return nil
}
//findDiff compares group members and finds members which only exists in groups2
diff --git a/internal/pkg/olterrors/olterrors.go b/internal/pkg/olterrors/olterrors.go
index 175b948..33ef71e 100644
--- a/internal/pkg/olterrors/olterrors.go
+++ b/internal/pkg/olterrors/olterrors.go
@@ -281,6 +281,37 @@
return e
}
+// ErrGroupOp represents an error condition when a group operation to a device did
+// not succeed
+type ErrGroupOp struct {
+ ErrAdapter
+}
+
+// NewErrGroupOp constructs a new error based on the given values
+func NewErrGroupOp(operation string, ID uint32, fields log.Fields, wrapped error) LoggableError {
+ return &ErrPersistence{
+ ErrAdapter{
+ name: "unable-to-perform-group-operation",
+ fields: merge(fields, log.Fields{
+ "operation": operation,
+ "id": fmt.Sprintf("0x%x", ID)}),
+ wrapped: wrapped,
+ },
+ }
+}
+
+// Log logs the error at the default level for log and return
+func (e *ErrGroupOp) Log() error {
+ _ = e.ErrAdapter.Log()
+ return e
+}
+
+// LogAt logs the error at the specified level and then returns the error
+func (e *ErrGroupOp) LogAt(level log.LogLevel) error {
+ _ = e.ErrAdapter.LogAt(level)
+ return e
+}
+
// ErrTimeout represents an error condition when the deadline for performing an
// operation has been exceeded
type ErrTimeout struct {