VOL-2293, VOL-2456 improve error handling

Change-Id: I4be5f12719a31b40363758cd47cc02968f180c75
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index c817455..5a4225a 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -22,7 +22,6 @@
 	"crypto/md5"
 	"encoding/hex"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"math/big"
 	"sync"
@@ -268,8 +267,7 @@
 		log.Debug("multicast flow, shifting id")
 		return 0x2<<15 | uint64(flowID), nil
 	} else {
-		log.Debug("Unrecognized direction")
-		return 0, fmt.Errorf("unrecognized direction %s", direction)
+		return 0, NewErrInvalidValue(log.Fields{"direction": direction}, nil).Log()
 	}
 }
 
@@ -362,8 +360,10 @@
 			log.Debug("Scheduler already created for upstream")
 			return nil
 		}
-		log.Errorw("Dynamic meter update not supported", log.Fields{"KvStoreMeterId": KvStoreMeter.MeterId, "MeterID-in-flow": sq.meterID})
-		return errors.New("invalid-meter-id-in-flow")
+		return NewErrInvalidValue(log.Fields{
+			"unsupported":       "meter-id",
+			"kv-store-meter-id": KvStoreMeter.MeterId,
+			"meter-id-in-flow":  sq.meterID}, nil).Log()
 	}
 
 	log.Debugw("Meter-does-not-exist-Creating-new", log.Fields{"MeterID": sq.meterID, "Direction": Direction})
@@ -392,12 +392,17 @@
 		log.Error("Flow-metadata-is-not-present-in-flow")
 	}
 	if meterConfig == nil {
-		log.Errorw("Could-not-get-meterbands-from-flowMetadata", log.Fields{"flowMetadata": sq.flowMetadata,
-			"MeterID": sq.meterID})
-		return errors.New("failed-to-get-meter-from-flowMetadata")
+		return NewErrNotFound("meterbands", log.Fields{
+			"reason":        "Could-not-get-meterbands-from-flowMetadata",
+			"flow-metadata": sq.flowMetadata,
+			"meter-id":      sq.meterID}, nil).Log()
 	} else if len(meterConfig.Bands) < MaxMeterBand {
 		log.Errorw("Invalid-number-of-bands-in-meter", log.Fields{"Bands": meterConfig.Bands, "MeterID": sq.meterID})
-		return errors.New("invalid-number-of-bands-in-meter")
+		return NewErrInvalidValue(log.Fields{
+			"reason":          "Invalid-number-of-bands-in-meter",
+			"meterband-count": len(meterConfig.Bands),
+			"metabands":       meterConfig.Bands,
+			"meter-id":        sq.meterID}, nil).Log()
 	}
 	cir := meterConfig.Bands[0].Rate
 	cbs := meterConfig.Bands[0].BurstSize
@@ -652,9 +657,10 @@
 	}
 	//Make sure we have as many tech_profiles as there are pon ports on the device
 	if tpCount != int(f.resourceMgr.DevInfo.GetPonPorts()) {
-		log.Errorw("Error while populating techprofile",
-			log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
-		return errors.New("error while populating techprofile mgrs")
+		return NewErrInvalidValue(log.Fields{
+			"reason":             "TP count does not match number of PON ports",
+			"tech-profile-count": tpCount,
+			"pon-port-count":     f.resourceMgr.DevInfo.GetPonPorts()}, nil).Log()
 	}
 	log.Infow("Populated techprofile for ponports successfully",
 		log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
@@ -664,10 +670,10 @@
 func (f *OpenOltFlowMgr) addUpstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
 	portNo uint32, uplinkClassifier map[string]interface{},
 	uplinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
-	allocID uint32, gemportID uint32) {
+	allocID uint32, gemportID uint32) error {
 	uplinkClassifier[PacketTagType] = SingleTag
 	log.Debugw("Adding upstream data flow", log.Fields{"uplinkClassifier": uplinkClassifier, "uplinkAction": uplinkAction})
-	f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
+	return f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
 		Upstream, logicalFlow, allocID, gemportID)
 	/* TODO: Install Secondary EAP on the subscriber vlan */
 }
@@ -675,7 +681,7 @@
 func (f *OpenOltFlowMgr) addDownstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
 	portNo uint32, downlinkClassifier map[string]interface{},
 	downlinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
-	allocID uint32, gemportID uint32) {
+	allocID uint32, gemportID uint32) error {
 	downlinkClassifier[PacketTagType] = DoubleTag
 	log.Debugw("Adding downstream data flow", log.Fields{"downlinkClassifier": downlinkClassifier,
 		"downlinkAction": downlinkAction})
@@ -685,7 +691,7 @@
 			if metadata, exists := downlinkClassifier[Metadata]; exists { // inport is filled in metadata by core
 				if uint32(metadata.(uint64)) == MkUniPortNum(intfID, onuID, uniID) {
 					log.Infow("Ignoring DL trap device flow from core", log.Fields{"flow": logicalFlow})
-					return
+					return nil
 				}
 			}
 		}
@@ -698,18 +704,18 @@
 	if ok {
 		downlinkAction[VlanVid] = dlClVid & 0xfff
 	} else {
-		log.Error("dl-classifier-vid-type-conversion-failed")
-		return
+		return NewErrInvalidValue(log.Fields{
+			"reason":  "failed to convert VLANID classifier",
+			"vlan-id": VlanVid}, nil).Log()
 	}
 
-	f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
+	return f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
 		Downstream, logicalFlow, allocID, gemportID)
 }
 
 func (f *OpenOltFlowMgr) addHSIAFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
 	action map[string]interface{}, direction string, logicalFlow *ofp.OfpFlowStats,
-	allocID uint32, gemPortID uint32) {
-	var networkIntfID uint32
+	allocID uint32, gemPortID uint32) error {
 	/* One of the OLT platform (Broadcom BAL) requires that symmetric
 	   flows require the same flow_id to be used across UL and DL.
 	   Since HSIA flow is the only symmetric flow currently, we need to
@@ -724,33 +730,35 @@
 	if _, ok := classifier[VlanPcp]; ok {
 		vlanPbit = classifier[VlanPcp].(uint32)
 		log.Debugw("Found pbit in the flow", log.Fields{"VlanPbit": vlanPbit})
+	} else {
+		log.Debugw("bpit-not-found-in-flow", log.Fields{"vlan-pcp": VlanPcp})
 	}
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
 	if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
-		log.Debug("Flow-exists--not-re-adding")
-		return
+		log.Debug("flow-already-exists")
+		return nil
 	}
 	flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
 	if err != nil {
-		log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
-		return
+		return NewErrNotFound("hsia-flow-id", log.Fields{"direction": direction}, err).Log()
 	}
-	var classifierProto *openoltpb2.Classifier
-	var actionProto *openoltpb2.Action
-	if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
-		log.Error("Error in making classifier protobuf for hsia flow")
-		return
+	classifierProto, err := makeOpenOltClassifierField(classifier)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
 	}
 	log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
-	if actionProto = makeOpenOltActionField(action); actionProto == nil {
-		log.Errorw("Error in making action protobuf for hsia flow", log.Fields{"direction": direction})
-		return
+	actionProto, err := makeOpenOltActionField(action)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
 	}
 	log.Debugw("Created action proto", log.Fields{"action": *actionProto})
-	networkIntfID, err = getNniIntfID(classifier, action)
+	networkIntfID, err := getNniIntfID(classifier, action)
 	if err != nil {
-		log.Error("Failed to get nniIntf ID")
-		return
+		return NewErrNotFound("nni-interface-id",
+			log.Fields{
+				"classifier": classifier,
+				"action":     action,
+			}, err).Log()
 	}
 	flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
@@ -765,29 +773,28 @@
 		Priority:      int32(logicalFlow.Priority),
 		Cookie:        logicalFlow.Cookie,
 		PortNo:        portNo}
-	if ok := f.addFlowToDevice(ctx, logicalFlow, &flow); ok {
-		log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
-		flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
-		if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
-			flow.OnuId,
-			flow.UniId,
-			flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
-			log.Errorw("Error uploading HSIA  flow into KV store", log.Fields{"flow": flow, "direction": direction, "error": err})
-			return
-		}
+	if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
+		return NewErrFlowOp("add", flowID, nil, err).Log()
 	}
+	log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
+	flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
+	if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
+		flow.OnuId,
+		flow.UniId,
+		flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
+		return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": flow}, err).Log()
+	}
+	return nil
 }
 
-func (f *OpenOltFlowMgr) addDHCPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
+func (f *OpenOltFlowMgr) addDHCPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) error {
 
-	var dhcpFlow openoltpb2.Flow
-	var actionProto *openoltpb2.Action
-	var classifierProto *openoltpb2.Classifier
-	var flowID uint32
 	networkIntfID, err := getNniIntfID(classifier, action)
 	if err != nil {
-		log.Error("Failed to get nniIntf ID")
-		return
+		return NewErrNotFound("nni-interface-id", log.Fields{
+			"classifier": classifier,
+			"action":     action},
+			err).Log()
 	}
 
 	// Clear the action map
@@ -804,29 +811,32 @@
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
 	if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
 		log.Debug("Flow-exists--not-re-adding")
-		return
+		return nil
 	}
 
-	flowID, err = f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
+	flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
 
 	if err != nil {
-		log.Errorw("flowId unavailable for UL DHCP", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
-		return
+		return NewErrNotFound("flow", log.Fields{
+			"interface-id": intfID,
+			"gem-port":     gemPortID,
+			"cookie":       flowStoreCookie},
+			err).Log()
 	}
 
 	log.Debugw("Creating UL DHCP flow", log.Fields{"ul_classifier": classifier, "ul_action": action, "uplinkFlowId": flowID})
 
-	if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
-		log.Error("Error in making classifier protobuf for ul flow")
-		return
+	classifierProto, err := makeOpenOltClassifierField(classifier)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
 	}
 	log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
-	if actionProto = makeOpenOltActionField(action); actionProto == nil {
-		log.Error("Error in making action protobuf for ul flow")
-		return
+	actionProto, err := makeOpenOltActionField(action)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
 	}
 
-	dhcpFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
+	dhcpFlow := openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
 		UniId:         int32(uniID),
 		FlowId:        flowID,
@@ -840,39 +850,37 @@
 		Cookie:        logicalFlow.Cookie,
 		PortNo:        portNo}
 
-	if ok := f.addFlowToDevice(ctx, logicalFlow, &dhcpFlow); ok {
-		log.Debug("DHCP UL flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(ctx, &dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
-		if err := f.updateFlowInfoToKVStore(ctx, dhcpFlow.AccessIntfId,
-			dhcpFlow.OnuId,
-			dhcpFlow.UniId,
-			dhcpFlow.FlowId, flowsToKVStore); err != nil {
-			log.Errorw("Error uploading DHCP UL flow into KV store", log.Fields{"flow": dhcpFlow, "error": err})
-			return
-		}
+	if err := f.addFlowToDevice(ctx, logicalFlow, &dhcpFlow); err != nil {
+		return NewErrFlowOp("add", flowID, log.Fields{"dhcp-flow": dhcpFlow}, err).Log()
+	}
+	log.Debug("DHCP UL flow added to device successfully")
+	flowsToKVStore := f.getUpdatedFlowInfo(ctx, &dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
+	if err := f.updateFlowInfoToKVStore(ctx, dhcpFlow.AccessIntfId,
+		dhcpFlow.OnuId,
+		dhcpFlow.UniId,
+		dhcpFlow.FlowId, flowsToKVStore); err != nil {
+		return NewErrPersistence("update", "flow", dhcpFlow.FlowId, log.Fields{"flow": dhcpFlow}, err).Log()
 	}
 
-	return
+	return nil
 }
 
 //addIGMPTrapFlow creates IGMP trap-to-host flow
 func (f *OpenOltFlowMgr) addIGMPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
-	action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
-	f.addUpstreamTrapFlow(ctx, intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow)
+	action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) error {
+	return f.addUpstreamTrapFlow(ctx, intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow)
 }
 
 //addUpstreamTrapFlow creates a trap-to-host flow
 func (f *OpenOltFlowMgr) addUpstreamTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
-	action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, flowType string) {
-
-	var flow openoltpb2.Flow
-	var actionProto *openoltpb2.Action
-	var classifierProto *openoltpb2.Classifier
+	action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, flowType string) error {
 
 	networkIntfID, err := getNniIntfID(classifier, action)
 	if err != nil {
-		log.Error("Failed to get nniIntf ID")
-		return
+		return NewErrNotFound("nni-interface-id", log.Fields{
+			"classifier": classifier,
+			"action":     action},
+			err).Log()
 	}
 
 	// Clear the action map
@@ -886,30 +894,34 @@
 
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
 	if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkIntfID), int32(onuID), int32(uniID), flowStoreCookie); present {
-		log.Debug("Flow-exists--not-re-adding")
-		return
+		log.Debug("Flow-exists-not-re-adding")
+		return nil
 	}
 
 	flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, flowType, 0, 0 /*classifier[VLAN_PCP].(uint32)*/)
 
 	if err != nil {
-		log.Errorw("flowId unavailable for upstream trap flow", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie, "flowType": flowType})
-		return
+		return NewErrNotFound("flow-id", log.Fields{
+			"interface-id": intfID,
+			"oni-id":       onuID,
+			"cookie":       flowStoreCookie,
+			"flow-type":    flowType},
+			err).Log()
 	}
 
 	log.Debugw("Creating upstream trap flow", log.Fields{"ul_classifier": classifier, "ul_action": action, "uplinkFlowId": flowID, "flowType": flowType})
 
-	if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
-		log.Error("Error in making classifier protobuf for ul flow")
-		return
+	classifierProto, err := makeOpenOltClassifierField(classifier)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
 	}
 	log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
-	if actionProto = makeOpenOltActionField(action); actionProto == nil {
-		log.Error("Error in making action protobuf for ul flow")
-		return
+	actionProto, err := makeOpenOltActionField(action)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
 	}
 
-	flow = openoltpb2.Flow{AccessIntfId: int32(intfID),
+	flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
 		UniId:         int32(uniID),
 		FlowId:        flowID,
@@ -923,32 +935,29 @@
 		Cookie:        logicalFlow.Cookie,
 		PortNo:        portNo}
 
-	if ok := f.addFlowToDevice(ctx, logicalFlow, &flow); ok {
-		log.Debugf("%s UL flow added to device successfully", flowType)
+	if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
+		return NewErrFlowOp("add", flowID, log.Fields{"flow": flow}, err).Log()
+	}
+	log.Debugf("%s UL flow added to device successfully", flowType)
 
-		flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
-		if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
-			flow.OnuId,
-			flow.UniId,
-			flow.FlowId, flowsToKVStore); err != nil {
-			log.Errorw("Error uploading UL flow into KV store", log.Fields{"flow": flow, "error": err})
-			return
-		}
+	flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
+	if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
+		flow.OnuId,
+		flow.UniId,
+		flow.FlowId, flowsToKVStore); err != nil {
+		return NewErrPersistence("update", "flow", flow.FlowId, log.Fields{"flow": flow}, err).Log()
 	}
 
-	return
+	return nil
 }
 
 // Add EAPOL flow to  device with mac, vlanId as classifier for upstream and downstream
-func (f *OpenOltFlowMgr) addEAPOLFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) {
+func (f *OpenOltFlowMgr) addEAPOLFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) error {
 	log.Debugw("Adding EAPOL to device", log.Fields{"intfId": intfID, "onuId": onuID, "portNo": portNo, "allocId": allocID, "gemPortId": gemPortID, "vlanId": vlanID, "flow": logicalFlow})
 
 	uplinkClassifier := make(map[string]interface{})
 	uplinkAction := make(map[string]interface{})
 
-	var upstreamFlow openoltpb2.Flow
-	var networkIntfID uint32
-
 	// Fill Classfier
 	uplinkClassifier[EthType] = uint32(EapEthType)
 	uplinkClassifier[PacketTagType] = SingleTag
@@ -957,36 +966,39 @@
 	uplinkAction[TrapToHost] = true
 	flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
 	if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
-		log.Debug("Flow-exists--not-re-adding")
-		return
+		log.Debug("Flow-exists-not-re-adding")
+		return nil
 	}
 	//Add Uplink EAPOL Flow
 	uplinkFlowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
 	if err != nil {
-		log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
-		return
+		return NewErrNotFound("flow-id", log.Fields{
+			"interface-id": intfID,
+			"onu-id":       onuID,
+			"coookie":      flowStoreCookie},
+			err).Log()
 	}
-	var classifierProto *openoltpb2.Classifier
-	var actionProto *openoltpb2.Action
 	log.Debugw("Creating UL EAPOL flow", log.Fields{"ul_classifier": uplinkClassifier, "ul_action": uplinkAction, "uplinkFlowId": uplinkFlowID})
 
-	if classifierProto = makeOpenOltClassifierField(uplinkClassifier); classifierProto == nil {
-		log.Error("Error in making classifier protobuf for ul flow")
-		return
+	classifierProto, err := makeOpenOltClassifierField(uplinkClassifier)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"classifier": uplinkClassifier}, err).Log()
 	}
 	log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
-	if actionProto = makeOpenOltActionField(uplinkAction); actionProto == nil {
-		log.Error("Error in making action protobuf for ul flow")
-		return
+	actionProto, err := makeOpenOltActionField(uplinkAction)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"action": uplinkAction}, err).Log()
 	}
 	log.Debugw("Created action proto", log.Fields{"action": *actionProto})
-	networkIntfID, err = getNniIntfID(classifier, action)
+	networkIntfID, err := getNniIntfID(classifier, action)
 	if err != nil {
-		log.Error("Failed to get nniIntf ID")
-		return
+		return NewErrNotFound("nni-interface-id", log.Fields{
+			"classifier": classifier,
+			"action":     action},
+			err).Log()
 	}
 
-	upstreamFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
+	upstreamFlow := openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
 		UniId:         int32(uniID),
 		FlowId:        uplinkFlowID,
@@ -999,25 +1011,26 @@
 		Priority:      int32(logicalFlow.Priority),
 		Cookie:        logicalFlow.Cookie,
 		PortNo:        portNo}
-	if ok := f.addFlowToDevice(ctx, logicalFlow, &upstreamFlow); ok {
-		log.Debug("EAPOL UL flow added to device successfully")
-		flowCategory := "EAPOL"
-		flowsToKVStore := f.getUpdatedFlowInfo(ctx, &upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
-		if err := f.updateFlowInfoToKVStore(ctx, upstreamFlow.AccessIntfId,
-			upstreamFlow.OnuId,
-			upstreamFlow.UniId,
-			upstreamFlow.FlowId,
-			/* lowCategory, */
-			flowsToKVStore); err != nil {
-			log.Errorw("Error uploading EAPOL UL flow into KV store", log.Fields{"flow": upstreamFlow, "error": err})
-			return
-		}
+	if err := f.addFlowToDevice(ctx, logicalFlow, &upstreamFlow); err != nil {
+		return NewErrFlowOp("add", uplinkFlowID, log.Fields{"flow": upstreamFlow}, err).Log()
+	}
+	log.Debug("EAPOL UL flow added to device successfully")
+	flowCategory := "EAPOL"
+	flowsToKVStore := f.getUpdatedFlowInfo(ctx, &upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
+	if err := f.updateFlowInfoToKVStore(ctx, upstreamFlow.AccessIntfId,
+		upstreamFlow.OnuId,
+		upstreamFlow.UniId,
+		upstreamFlow.FlowId,
+		/* lowCategory, */
+		flowsToKVStore); err != nil {
+		return NewErrPersistence("update", "flow", upstreamFlow.FlowId, log.Fields{"flow": upstreamFlow}, err).Log()
 	}
 
 	log.Debugw("Added EAPOL flows to device successfully", log.Fields{"flow": logicalFlow})
+	return nil
 }
 
-func makeOpenOltClassifierField(classifierInfo map[string]interface{}) *openoltpb2.Classifier {
+func makeOpenOltClassifierField(classifierInfo map[string]interface{}) (*openoltpb2.Classifier, error) {
 	var classifier openoltpb2.Classifier
 
 	classifier.EthType, _ = classifierInfo[EthType].(uint32)
@@ -1054,14 +1067,13 @@
 		case DoubleTag:
 		case Untagged:
 		default:
-			log.Error("Invalid tag type in classifier") // should not hit
-			return nil
+			return nil, NewErrInvalidValue(log.Fields{"packet-tag-type": pktTagType}, nil).Log()
 		}
 	}
-	return &classifier
+	return &classifier, nil
 }
 
-func makeOpenOltActionField(actionInfo map[string]interface{}) *openoltpb2.Action {
+func makeOpenOltActionField(actionInfo map[string]interface{}) (*openoltpb2.Action, error) {
 	var actionCmd openoltpb2.ActionCmd
 	var action openoltpb2.Action
 	action.Cmd = &actionCmd
@@ -1074,10 +1086,9 @@
 	} else if _, ok := actionInfo[TrapToHost]; ok {
 		action.Cmd.TrapToHost = actionInfo[TrapToHost].(bool)
 	} else {
-		log.Errorw("Invalid-action-field", log.Fields{"action": actionInfo})
-		return nil
+		return nil, NewErrInvalidValue(log.Fields{"action-command": actionInfo}, nil).Log()
 	}
-	return &action
+	return &action, nil
 }
 
 func (f *OpenOltFlowMgr) getTPpath(intfID uint32, uni string, TpID uint32) string {
@@ -1194,7 +1205,7 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) addFlowToDevice(ctx context.Context, logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) bool {
+func (f *OpenOltFlowMgr) addFlowToDevice(ctx context.Context, logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
 
 	var intfID uint32
 	/* For flows which trap out of the NNI, the AccessIntfId is invalid
@@ -1213,37 +1224,37 @@
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
 		log.Debug("Flow already exists", log.Fields{"err": err, "deviceFlow": deviceFlow})
-		return true
+		return nil
 	}
 
 	if err != nil {
 		log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": deviceFlow})
 		f.resourceMgr.FreeFlowID(ctx, intfID, deviceFlow.OnuId, deviceFlow.UniId, deviceFlow.FlowId)
-		return false
+		return err
 	}
 	if deviceFlow.GemportId != -1 {
 		// No need to register the flow if it is a trap on nni flow.
 		f.registerFlow(ctx, logicalFlow, deviceFlow)
 	}
 	log.Debugw("Flow added to device successfully ", log.Fields{"flow": *deviceFlow})
-	return true
+	return nil
 }
 
-func (f *OpenOltFlowMgr) removeFlowFromDevice(deviceFlow *openoltpb2.Flow) bool {
+func (f *OpenOltFlowMgr) removeFlowFromDevice(deviceFlow *openoltpb2.Flow) error {
 	log.Debugw("Sending flow to device via grpc", log.Fields{"flow": *deviceFlow})
 	_, err := f.deviceHandler.Client.FlowRemove(context.Background(), deviceFlow)
 	if err != nil {
 		if f.deviceHandler.device.ConnectStatus == common.ConnectStatus_UNREACHABLE {
 			log.Warnw("Can not remove flow from device since it's unreachable", log.Fields{"err": err, "deviceFlow": deviceFlow})
 			//Assume the flow is removed
-			return true
+			return nil
 		}
 		log.Errorw("Failed to Remove flow from device", log.Fields{"err": err, "deviceFlow": deviceFlow})
-		return false
+		return err
 
 	}
 	log.Debugw("Flow removed from device successfully ", log.Fields{"flow": *deviceFlow})
-	return true
+	return nil
 }
 
 /*func register_flow(deviceFlow *openolt_pb2.Flow, logicalFlow *ofp.OfpFlowStats){
@@ -1266,7 +1277,7 @@
 
 */
 
-func (f *OpenOltFlowMgr) addLLDPFlow(ctx context.Context, flow *ofp.OfpFlowStats, portNo uint32) {
+func (f *OpenOltFlowMgr) addLLDPFlow(ctx context.Context, flow *ofp.OfpFlowStats, portNo uint32) error {
 
 	classifierInfo := make(map[string]interface{})
 	actionInfo := make(map[string]interface{})
@@ -1291,28 +1302,34 @@
 	var uniID = -1
 	var gemPortID = -1
 
-	var networkInterfaceID = IntfIDFromNniPortNum(portNo)
+	networkInterfaceID, err := IntfIDFromNniPortNum(portNo)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"nni-port-number": portNo}, err).Log()
+	}
 	var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
 	if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
 		log.Debug("Flow-exists--not-re-adding")
-		return
+		return nil
 	}
 	flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
 
 	if err != nil {
-		log.Errorw("Flow id unavailable for LLDP traponNNI flow", log.Fields{"error": err})
-		return
+		return NewErrNotFound("flow-id", log.Fields{
+			"interface-id": networkInterfaceID,
+			"onu-id":       onuID,
+			"uni-id":       uniID,
+			"gem-port-id":  gemPortID,
+			"cookie":       flowStoreCookie},
+			err).Log()
 	}
-	var classifierProto *openoltpb2.Classifier
-	var actionProto *openoltpb2.Action
-	if classifierProto = makeOpenOltClassifierField(classifierInfo); classifierProto == nil {
-		log.Error("Error in making classifier protobuf for  LLDP trap on nni flow")
-		return
+	classifierProto, err := makeOpenOltClassifierField(classifierInfo)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err).Log()
 	}
 	log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
-	if actionProto = makeOpenOltActionField(actionInfo); actionProto == nil {
-		log.Error("Error in making action protobuf for LLDP trap on nni flow")
-		return
+	actionProto, err := makeOpenOltActionField(actionInfo)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"action": actionInfo}, err).Log()
 	}
 	log.Debugw("Created action proto", log.Fields{"action": *actionProto})
 
@@ -1328,17 +1345,18 @@
 		Priority:      int32(flow.Priority),
 		Cookie:        flow.Cookie,
 		PortNo:        portNo}
-	if ok := f.addFlowToDevice(ctx, flow, &downstreamflow); ok {
-		log.Debug("LLDP trap on NNI flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, flow.Id)
-		if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
-			int32(onuID),
-			int32(uniID),
-			flowID, flowsToKVStore); err != nil {
-			log.Errorw("Error uploading LLDP flow into KV store", log.Fields{"flow": downstreamflow, "error": err})
-		}
+	if err := f.addFlowToDevice(ctx, flow, &downstreamflow); err != nil {
+		return NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err).Log()
 	}
-	return
+	log.Debug("LLDP trap on NNI flow added to device successfully")
+	flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, flow.Id)
+	if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+		int32(onuID),
+		int32(uniID),
+		flowID, flowsToKVStore); err != nil {
+		return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err).Log()
+	}
+	return nil
 }
 
 func getUniPortPath(intfID uint32, onuID int32, uniID int32) string {
@@ -1349,10 +1367,12 @@
 func (f *OpenOltFlowMgr) getOnuChildDevice(intfID uint32, onuID uint32) (*voltha.Device, error) {
 	log.Debugw("GetChildDevice", log.Fields{"pon port": intfID, "onuId": onuID})
 	parentPortNo := IntfIDToPortNo(intfID, voltha.Port_PON_OLT)
-	onuDevice := f.deviceHandler.GetChildDevice(parentPortNo, onuID)
-	if onuDevice == nil {
-		log.Errorw("onu not found", log.Fields{"intfId": parentPortNo, "onuId": onuID})
-		return nil, errors.New("onu not found")
+	onuDevice, err := f.deviceHandler.GetChildDevice(parentPortNo, onuID)
+	if err != nil {
+		return nil, NewErrNotFound("onu", log.Fields{
+			"interface-id": parentPortNo,
+			"onu-id":       onuID},
+			err).Log()
 	}
 	log.Debugw("Successfully received child device from core", log.Fields{"child_device": *onuDevice})
 	return onuDevice, nil
@@ -1575,6 +1595,7 @@
 	return nil
 }
 
+// nolint: gocyclo
 func (f *OpenOltFlowMgr) clearFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) {
 
 	log.Debugw("clearFlowFromResourceManager", log.Fields{"flowDirection": flowDirection, "flow": *flow})
@@ -1585,8 +1606,6 @@
 	}
 
 	var updatedFlows []rsrcMgr.FlowInfo
-	var flowID uint32
-	var onuID, uniID int32
 	classifierInfo := make(map[string]interface{})
 
 	portNum, Intf, onu, uni, inPort, ethType, err := FlowExtractInfo(flow, flowDirection)
@@ -1595,8 +1614,8 @@
 		return
 	}
 
-	onuID = int32(onu)
-	uniID = int32(uni)
+	onuID := int32(onu)
+	uniID := int32(uni)
 
 	for _, field := range flows.GetOfbFields(flow) {
 		if field.Type == flows.IP_PROTO {
@@ -1611,10 +1630,17 @@
 		onuID = -1
 		uniID = -1
 		log.Debug("Trap on nni flow set oni, uni to -1")
-		Intf = IntfIDFromNniPortNum(inPort)
+		Intf, err = IntfIDFromNniPortNum(inPort)
+		if err != nil {
+			log.Errorw("invalid-in-port-number",
+				log.Fields{
+					"port-number": inPort,
+					"error":       err})
+			return
+		}
 	}
 	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, Intf, onuID, uniID)
-	for _, flowID = range flowIds {
+	for _, flowID := range flowIds {
 		flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flowID)
 		if flowInfo == nil {
 			log.Debugw("No FlowInfo found found in KV store",
@@ -1630,18 +1656,17 @@
 			if flow.Id == storedFlow.LogicalFlowID {
 				removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
 				log.Debugw("Flow to be deleted", log.Fields{"flow": storedFlow})
-				if ok := f.removeFlowFromDevice(&removeFlowMessage); ok {
-					log.Debug("Flow removed from device successfully")
-					//Remove the Flow from FlowInfo
-					updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
-					err = f.clearResources(ctx, flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
-						flowID, flowDirection, portNum, updatedFlows)
-					if err != nil {
-						log.Error("Failed to clear resources for flow", log.Fields{"flow": storedFlow})
-						return
-					}
-				} else {
-					log.Error("Failed to remove flow from device")
+				// Remove the flow from the device first; on failure, return before updatedFlows or any resources are touched.
+				if err = f.removeFlowFromDevice(&removeFlowMessage); err != nil {
+					log.Errorw("failed-to-remove-flow", log.Fields{"error": err})
+					return
+				}
+				log.Debug("Flow removed from device successfully")
+				//Remove the Flow from FlowInfo
+				updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+				if err = f.clearResources(ctx, flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
+					flowID, flowDirection, portNum, updatedFlows); err != nil {
+					log.Errorw("Failed to clear resources for flow", log.Fields{"flow": storedFlow, "error": err})
 					return
 				}
 			}
@@ -1661,7 +1686,15 @@
 		return
 	}
 
-	networkInterfaceID := IntfIDFromNniPortNum(inPort)
+	networkInterfaceID, err := IntfIDFromNniPortNum(inPort)
+	if err != nil {
+		// The in-port could not be mapped to an NNI interface ID; log and give up.
+		log.Errorw("invalid-in-port-number",
+			log.Fields{
+				"port-number": inPort,
+				"error":       err})
+		return
+	}
 	var onuID = int32(NoneOnuID)
 	var uniID = int32(NoneUniID)
 	var flowID uint32
@@ -1685,8 +1718,12 @@
 				removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
 				log.Debugw("Multicast flow to be deleted", log.Fields{"flow": storedFlow})
 				//remove from device
-				if ok := f.removeFlowFromDevice(&removeFlowMessage); !ok {
-					log.Errorw("Failed to remove multicast flow from device", log.Fields{"flowId": flow.Id})
+				if err := f.removeFlowFromDevice(&removeFlowMessage); err != nil {
+					// Removal from the device failed; log the flow id and error, then give up.
+					log.Errorw("failed-to-remove-multicast-flow",
+						log.Fields{
+							"flow-id": flow.Id,
+							"error":   err})
 					return
 				}
 				log.Debugw("Multicast flow removed from device successfully", log.Fields{"flowId": flow.Id})
@@ -1863,14 +1900,13 @@
 }
 
 // handleFlowWithGroup adds multicast flow to the device.
-func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
 	classifierInfo[PacketTagType] = DoubleTag
 	log.Debugw("add-multicast-flow", log.Fields{"classifierInfo": classifierInfo, "actionInfo": actionInfo})
 
 	inPort, err := f.getInPortOfMulticastFlow(ctx, classifierInfo)
 	if err != nil {
-		log.Warnw("No inPort found. Ignoring multicast flow.", log.Fields{"flowId:": flow.Id})
-		return
+		return NewErrNotFound("multicast-in-port", log.Fields{"classifier": classifierInfo}, err).Log()
 	}
 	//replace ipDst with ethDst
 	if ipv4Dst, ok := classifierInfo[Ipv4Dst]; ok &&
@@ -1883,26 +1919,33 @@
 		log.Debugw("multicast-ip-to-mac-conversion-success", log.Fields{"ip:": ipv4Dst.(uint32), "mac:": multicastMac})
 	}
 
-	var onuID = NoneOnuID
-	var uniID = NoneUniID
-	var gemPortID = NoneGemPortID
+	onuID := NoneOnuID
+	uniID := NoneUniID
+	gemPortID := NoneGemPortID
 
-	networkInterfaceID := IntfIDFromNniPortNum(inPort)
+	networkInterfaceID, err := IntfIDFromNniPortNum(inPort)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"nni-in-port-number": inPort}, err).Log()
+	}
 
-	var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
+	flowStoreCookie := getFlowStoreCookie(classifierInfo, uint32(0))
 	if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
-		log.Debugw("multicast-flow-exists--not-re-adding", log.Fields{"classifierInfo": classifierInfo})
-		return
+		log.Debugw("multicast-flow-exists-not-re-adding", log.Fields{"classifierInfo": classifierInfo})
+		return nil
 	}
 	flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
 	if err != nil {
-		log.Errorw("Flow id unavailable for multicast flow", log.Fields{"error": err})
-		return
+		return NewErrNotFound("multicast-flow-id", log.Fields{
+			"interface-id": networkInterfaceID,
+			"onu-id":       onuID,
+			"uni-id":       uniID,
+			"gem-port-id":  gemPortID,
+			"cookie":       flowStoreCookie},
+			err).Log()
 	}
-	var classifierProto *openoltpb2.Classifier
-	if classifierProto = makeOpenOltClassifierField(classifierInfo); classifierProto == nil {
-		log.Error("Error in making classifier protobuf for multicast flow")
-		return
+	classifierProto, err := makeOpenOltClassifierField(classifierInfo)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err).Log()
 	}
 	groupID := actionInfo[GroupID].(uint32)
 	multicastFlow := openoltpb2.Flow{
@@ -1914,27 +1957,28 @@
 		Priority:      int32(flow.Priority),
 		Cookie:        flow.Cookie}
 
-	if ok := f.addFlowToDevice(ctx, flow, &multicastFlow); ok {
-		log.Debug("multicast flow added to device successfully")
-		//get cached group
-		group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true)
-		if err == nil {
-			//calling groupAdd to set group members after multicast flow creation
-			if f.ModifyGroup(ctx, group) {
-				//cached group can be removed now
-				f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true)
-			}
-		}
-
-		flowsToKVStore := f.getUpdatedFlowInfo(ctx, &multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
-		if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
-			int32(onuID),
-			int32(uniID),
-			flowID, flowsToKVStore); err != nil {
-			log.Errorw("Error uploading multicast flow into KV store", log.Fields{"flow": multicastFlow, "error": err})
+	if err = f.addFlowToDevice(ctx, flow, &multicastFlow); err != nil {
+		return NewErrFlowOp("add", flowID, log.Fields{"flow": multicastFlow}, err).Log()
+	}
+	log.Debug("multicast flow added to device successfully")
+	//get cached group
+	group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true)
+	if err == nil {
+		//calling groupAdd to set group members after multicast flow creation
+		if f.ModifyGroup(ctx, group) {
+			//cached group can be removed now
+			f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true)
 		}
 	}
-	return
+
+	flowsToKVStore := f.getUpdatedFlowInfo(ctx, &multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
+	if err = f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+		int32(onuID),
+		int32(uniID),
+		flowID, flowsToKVStore); err != nil {
+		return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": multicastFlow}, err).Log()
+	}
+	return nil
 }
 
 //getInPortOfMulticastFlow return inPort criterion if exists; returns NNI interface of the device otherwise
@@ -1947,7 +1991,7 @@
 	if e == nil && len(nniPorts) > 0 {
 		return nniPorts[0], nil
 	}
-	return 0, errors.New("cannot find NNI port of device")
+	return 0, NewErrNotFound("nni-port", nil, e).Log()
 }
 
 // AddGroup add or update the group
@@ -2236,8 +2280,11 @@
 			}
 		}
 	}
-	log.Errorw("onuid is not found", log.Fields{"serialNumber": serialNumber, "intfId": intfID, "gemPort": gemPortID})
-	return uint32(0), errors.New("key error, onuid is not found") // ONU ID 0 is not a valid one
+	return uint32(0), NewErrNotFound("onu-id", log.Fields{
+		"serial-number": serialNumber,
+		"interface-id":  intfID,
+		"gem-port-id":   gemPortID},
+		nil).Log()
 }
 
 //GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
@@ -2302,10 +2349,10 @@
 func installFlowOnAllGemports(ctx context.Context,
 	f1 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32,
 		portNo uint32, classifier map[string]interface{}, action map[string]interface{},
-		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32),
+		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32) error,
 	f2 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32, portNo uint32,
 		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32,
-		classifier map[string]interface{}, action map[string]interface{}),
+		classifier map[string]interface{}, action map[string]interface{}) error,
 	args map[string]uint32,
 	classifier map[string]interface{}, action map[string]interface{},
 	logicalFlow *ofp.OfpFlowStats,
@@ -2325,13 +2372,11 @@
 	}
 }
 
-func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
+func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) error {
 	log.Debug("Adding trap-dhcp-of-nni-flow")
 	action := make(map[string]interface{})
 	classifier[PacketTagType] = DoubleTag
 	action[TrapToHost] = true
-	var err error
-	var networkInterfaceID uint32
 	/* We manage flowId resource pool on per PON port basis.
 	   Since this situation is tricky, as a hack, we pass the NNI port
 	   index (network_intf_id) as PON port Index for the flowId resource
@@ -2347,32 +2392,37 @@
 	uniID := -1
 	gemPortID := -1
 	allocID := -1
-	networkInterfaceID, err = getNniIntfID(classifier, action)
+	networkInterfaceID, err := getNniIntfID(classifier, action)
 	if err != nil {
-		log.Error("Failed to get nniIntf ID")
-		return
+		return NewErrNotFound("nni-interface-id", log.Fields{
+			"classifier": classifier,
+			"action":     action},
+			err).Log()
 	}
 
 	flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
 	if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
-		log.Debug("Flow-exists--not-re-adding")
-		return
+		log.Debug("Flow-exists-not-re-adding")
+		return nil
 	}
 	flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
 	if err != nil {
-		log.Errorw("Flow id unavailable for DHCP traponNNI flow", log.Fields{"error": err})
-		return
+		return NewErrNotFound("dhcp-trap-nni-flow-id", log.Fields{
+			"interface-id": networkInterfaceID,
+			"onu-id":       onuID,
+			"uni-id":       uniID,
+			"gem-port-id":  gemPortID,
+			"cookie":       flowStoreCookie},
+			err).Log()
 	}
-	var classifierProto *openoltpb2.Classifier
-	var actionProto *openoltpb2.Action
-	if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
-		log.Error("Error in making classifier protobuf for  dhcp trap on nni flow")
-		return
+	classifierProto, err := makeOpenOltClassifierField(classifier)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
 	}
 	log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
-	if actionProto = makeOpenOltActionField(action); actionProto == nil {
-		log.Error("Error in making action protobuf for dhcp trap on nni flow")
-		return
+	actionProto, err := makeOpenOltActionField(action)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
 	}
 	log.Debugw("Created action proto", log.Fields{"action": *actionProto})
 	downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
@@ -2388,17 +2438,18 @@
 		Priority:      int32(logicalFlow.Priority),
 		Cookie:        logicalFlow.Cookie,
 		PortNo:        portNo}
-	if ok := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); ok {
-		log.Debug("DHCP trap on NNI flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
-		if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
-			int32(onuID),
-			int32(uniID),
-			flowID, flowsToKVStore); err != nil {
-			log.Errorw("Error uploading DHCP DL  flow into KV store", log.Fields{"flow": downstreamflow, "error": err})
-		}
+	if err := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); err != nil {
+		return NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err).Log()
 	}
-	return
+	log.Debug("DHCP trap on NNI flow added to device successfully")
+	flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
+	if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+		int32(onuID),
+		int32(uniID),
+		flowID, flowsToKVStore); err != nil {
+		return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err).Log()
+	}
+	return nil
 }
 
 //getPacketTypeFromClassifiers finds and returns packet type of a flow by checking flow classifiers
@@ -2428,7 +2479,7 @@
 }
 
 //addIgmpTrapFlowOnNNI adds a trap-to-host flow on NNI
-func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
+func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) error {
 	log.Debugw("Adding igmp-trap-of-nni-flow", log.Fields{"classifierInfo": classifier})
 	action := make(map[string]interface{})
 	classifier[PacketTagType] = getPacketTypeFromClassifiers(classifier)
@@ -2450,29 +2501,34 @@
 	allocID := -1
 	networkInterfaceID, err := getNniIntfID(classifier, action)
 	if err != nil {
-		log.Error("Failed to get nniIntf ID")
-		return
+		return NewErrNotFound("nni-interface-id", log.Fields{
+			"classifier": classifier,
+			"action":     action},
+			err).Log()
 	}
 	flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
 	if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
-		log.Debug("igmp-flow-exists--not-re-adding")
-		return
+		log.Debug("igmp-flow-exists-not-re-adding")
+		return nil
 	}
 	flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
 	if err != nil {
-		log.Errorw("IGMP flow id unavailable for trap-on-NNI flow", log.Fields{"error": err})
-		return
+		return NewErrNotFound("igmp-flow-id", log.Fields{
+			"interface-id": networkInterfaceID,
+			"onu-id":       onuID,
+			"uni-id":       uniID,
+			"gem-port-id":  gemPortID,
+			"cookie":       flowStoreCookie},
+			err).Log()
 	}
-	var classifierProto *openoltpb2.Classifier
-	var actionProto *openoltpb2.Action
-	if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
-		log.Error("Error in making classifier protobuf for igmp trap on nni flow")
-		return
+	classifierProto, err := makeOpenOltClassifierField(classifier)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
 	}
 	log.Debugw("Created classifier proto for the IGMP flow", log.Fields{"classifier": *classifierProto})
-	if actionProto = makeOpenOltActionField(action); actionProto == nil {
-		log.Error("Error in making action protobuf for IGMP trap on nni flow")
-		return
+	actionProto, err := makeOpenOltActionField(action)
+	if err != nil {
+		return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
 	}
 	log.Debugw("Created action proto for the IGMP flow", log.Fields{"action": *actionProto})
 	downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
@@ -2488,23 +2544,23 @@
 		Priority:      int32(logicalFlow.Priority),
 		Cookie:        logicalFlow.Cookie,
 		PortNo:        portNo}
-	if ok := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); ok {
-		log.Debug("IGMP Trap on NNI flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
-		if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
-			int32(onuID),
-			int32(uniID),
-			flowID, flowsToKVStore); err != nil {
-			log.Errorw("Error uploading igmp-trap-on-nni flow into KV store", log.Fields{"flow": downstreamflow, "error": err})
-		}
+	if err := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); err != nil {
+		return NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err).Log()
 	}
-	return
+	log.Debug("IGMP Trap on NNI flow added to device successfully")
+	flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
+	if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+		int32(onuID),
+		int32(uniID),
+		flowID, flowsToKVStore); err != nil {
+		return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err).Log()
+	}
+	return nil
 }
 
 func verifyMeterIDAndGetDirection(MeterID uint32, Dir tp_pb.Direction) (string, error) {
 	if MeterID == 0 { // This should never happen
-		log.Error("Invalid meter id")
-		return "", errors.New("invalid meter id")
+		return "", NewErrInvalidValue(log.Fields{"meter-id": MeterID}, nil).Log()
 	}
 	if Dir == tp_pb.Direction_UPSTREAM {
 		return "upstream", nil
@@ -2699,8 +2755,7 @@
 				actionInfo[Output] = out.GetPort()
 				log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[Output].(uint32)})
 			} else {
-				log.Error("Invalid output port in action")
-				return errors.New("invalid output port in action")
+				return NewErrInvalidValue(log.Fields{"output-port": nil}, nil).Log()
 			}
 		} else if action.Type == flows.POP_VLAN {
 			actionInfo[PopVlan] = true
@@ -2720,8 +2775,7 @@
 			if out := action.GetSetField(); out != nil {
 				if field := out.GetField(); field != nil {
 					if ofClass := field.GetOxmClass(); ofClass != ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
-						log.Errorw("Invalid openflow class", log.Fields{"class": ofClass})
-						return errors.New("invalid openflow class")
+						return NewErrInvalidValue(log.Fields{"openflow-class": ofClass}, nil).Log()
 					}
 					/*log.Debugw("action-type-set-field",log.Fields{"field": field, "in_port": classifierInfo[IN_PORT].(uint32)})*/
 					formulateSetFieldActionInfoFromFlow(field, actionInfo)
@@ -2730,8 +2784,7 @@
 		} else if action.Type == flows.GROUP {
 			formulateGroupActionInfoFromFlow(action, actionInfo)
 		} else {
-			log.Errorw("Un supported action type", log.Fields{"type": action.Type})
-			return errors.New("un supported action type")
+			return NewErrInvalidValue(log.Fields{"action-type": action.Type}, nil).Log()
 		}
 	}
 	return nil
@@ -2770,8 +2823,9 @@
 				classifierInfo[InPort] = uniPort
 				log.Debugw("upstream pon-to-controller-flow,inport-in-tunnelid", log.Fields{"newInPort": classifierInfo[InPort].(uint32), "outPort": actionInfo[Output].(uint32)})
 			} else {
-				log.Error("upstream pon-to-controller-flow, NO-inport-in-tunnelid")
-				return errors.New("upstream pon-to-controller-flow, NO-inport-in-tunnelid")
+				return NewErrNotFound("child-in-port", log.Fields{
+					"reason": "upstream pon-to-controller-flow, NO-inport-in-tunnelid",
+					"flow":   flow}, nil).Log()
 			}
 		}
 	} else {
@@ -2782,8 +2836,9 @@
 				actionInfo[Output] = uniPort
 				log.Debugw("downstream-nni-to-pon-port-flow, outport-in-tunnelid", log.Fields{"newOutPort": actionInfo[Output].(uint32), "outPort": actionInfo[Output].(uint32)})
 			} else {
-				log.Debug("downstream-nni-to-pon-port-flow, no-outport-in-tunnelid", log.Fields{"InPort": classifierInfo[InPort].(uint32), "outPort": actionInfo[Output].(uint32)})
-				return errors.New("downstream-nni-to-pon-port-flow, no-outport-in-tunnelid")
+				return NewErrNotFound("out-port", log.Fields{
+					"reason": "downstream-nni-to-pon-port-flow, no-outport-in-tunnelid",
+					"flow":   flow}, nil).Log()
 			}
 			// Upstream flow from PON to NNI port , Use tunnel ID as new IN port / UNI port
 		} else if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_PON_OLT {
@@ -2792,9 +2847,11 @@
 				log.Debugw("upstream-pon-to-nni-port-flow, inport-in-tunnelid", log.Fields{"newInPort": actionInfo[Output].(uint32),
 					"outport": actionInfo[Output].(uint32)})
 			} else {
-				log.Debug("upstream-pon-to-nni-port-flow, no-inport-in-tunnelid", log.Fields{"InPort": classifierInfo[InPort].(uint32),
-					"outPort": actionInfo[Output].(uint32)})
-				return errors.New("upstream-pon-to-nni-port-flow, no-inport-in-tunnelid")
+				return NewErrNotFound("nni-port", log.Fields{
+					"reason":   "upstream-pon-to-nni-port-flow, no-inport-in-tunnelid",
+					"in-port":  classifierInfo[InPort].(uint32),
+					"out-port": actionInfo[Output].(uint32),
+					"flow":     flow}, nil).Log()
 			}
 		}
 	}
@@ -2811,8 +2868,7 @@
 	*/
 	metadata := flows.GetMetadataFromWriteMetadataAction(flow)
 	if metadata == 0 {
-		log.Error("metadata-is-not-present-in-flow-which-is-mandatory")
-		return 0, errors.New("metadata-is-not-present-in-flow-which-is-mandatory")
+		return 0, NewErrNotFound("metadata", log.Fields{"flow": flow}, nil).Log()
 	}
 	TpID := flows.GetTechProfileIDFromWriteMetaData(metadata)
 	return uint32(TpID), nil
@@ -2832,11 +2888,25 @@
 
 	portType := IntfIDToPortTypeName(classifier[InPort].(uint32))
 	if portType == voltha.Port_PON_OLT {
-		intfID := IntfIDFromNniPortNum(action[Output].(uint32))
+		intfID, err := IntfIDFromNniPortNum(action[Output].(uint32))
+		if err != nil {
+			log.Debugw("invalid-action-port-number",
+				log.Fields{
+					"port-number": action[Output].(uint32),
+					"error":       err})
+			return uint32(0), err
+		}
 		log.Debugw("output Nni IntfID is", log.Fields{"intfid": intfID})
 		return intfID, nil
 	} else if portType == voltha.Port_ETHERNET_NNI {
-		intfID := IntfIDFromNniPortNum(classifier[InPort].(uint32))
+		intfID, err := IntfIDFromNniPortNum(classifier[InPort].(uint32))
+		if err != nil { // report the classifier in-port that failed to convert, not the action output
+			log.Debugw("invalid-classifier-port-number",
+				log.Fields{
+					"port-number": classifier[InPort].(uint32),
+					"error":       err})
+			return uint32(0), err
+		}
 		log.Debugw("input Nni IntfID is", log.Fields{"intfid": intfID})
 		return intfID, nil
 	}
@@ -2922,8 +2992,7 @@
 func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
 	exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
 	if err != nil {
-		log.Errorw("Failed to get the flow group from KV store", log.Fields{"groupId": groupID, "err": err})
-		return nil, false, errors.New("failed to retrieve the flow group")
+		return nil, false, NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err).Log()
 	}
 	if exists {
 		return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil