VOL-1564: Incorporate Delete Flow Functionality and subsequent cleanup in stores

Change-Id: I429a380da3ed0c951cb5f01ee763eb318693d3b3
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 7ccf97d..570fff6 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -31,6 +31,7 @@
 	openolt_pb2 "github.com/opencord/voltha-protos/go/openolt"
 	voltha "github.com/opencord/voltha-protos/go/voltha"
 	"math/big"
+	//deepcopy "github.com/getlantern/deepcopy"
 )
 
 const (
@@ -101,13 +102,14 @@
 }
 
 type OpenOltFlowMgr struct {
-	techprofile      []*tp.TechProfileMgr
-	deviceHandler    *DeviceHandler
-	resourceMgr      *rsrcMgr.OpenOltResourceMgr
-	onuIds           map[onuIdKey]onuInfo       //OnuId -> OnuInfo
-	onuSerialNumbers map[string]onuInfo         //onu serial_number (string) -> OnuInfo
-	onuGemPortIds    map[gemPortKey]onuInfo     //GemPortId -> OnuInfo
-	packetInGemPort  map[packetInInfoKey]uint32 //packet in gem port
+	techprofile       []*tp.TechProfileMgr
+	deviceHandler     *DeviceHandler
+	resourceMgr       *rsrcMgr.OpenOltResourceMgr
+	onuIds            map[onuIdKey]onuInfo       //OnuId -> OnuInfo
+	onuSerialNumbers  map[string]onuInfo         //onu serial_number (string) -> OnuInfo
+	onuGemPortIds     map[gemPortKey]onuInfo     //GemPortId -> OnuInfo
+	packetInGemPort   map[packetInInfoKey]uint32 //packet in gem port
+	storedDeviceFlows []ofp.OfpFlowStats         // device flows added to the OLT (Id = device flow id + direction, Cookie = logical flow id); needed on deletion to find the device flows of a logical flow
 }
 
 func NewFlowManager(dh *DeviceHandler, rsrcMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
@@ -127,6 +129,32 @@
 	return &flowMgr
 }
 
+func (f *OpenOltFlowMgr) generateStoredFlowId(flowId uint32, direction string) (uint64, error) {
+	switch direction { // the stored id is the device flow id, with bit 15 set for upstream flows
+	case UPSTREAM:
+		log.Debug("upstream flow, shifting id")
+		return uint64(flowId) | 0x1<<15, nil
+	case DOWNSTREAM:
+		log.Debug("downstream flow, not shifting id")
+		return uint64(flowId), nil
+	}
+	log.Debug("Unrecognized direction")
+	return 0, fmt.Errorf("Unrecognized direction %s", direction)
+}
+
+// registerFlow records deviceFlow in storedDeviceFlows (Id = flow id + direction, Cookie = logical flow id).
+func (f *OpenOltFlowMgr) registerFlow(flowFromCore *ofp.OfpFlowStats, deviceFlow *openolt_pb2.Flow) {
+	log.Debugw("Registering Flow for Device ", log.Fields{"flow": flowFromCore, "device": f.deviceHandler.deviceId})
+	storedId, err := f.generateStoredFlowId(deviceFlow.FlowId, deviceFlow.FlowType)
+	if err != nil {
+		log.Errorw("Not registering flow", log.Fields{"flowId": deviceFlow.FlowId, "direction": deviceFlow.FlowType, "error": err})
+		return // a zero Id would later be decoded as downstream flow 0 and removed from the device by mistake
+	}
+	log.Debugw("Generated stored device flow", log.Fields{"id": storedId, "flowId": deviceFlow.FlowId, "direction": deviceFlow.FlowType})
+	f.storedDeviceFlows = append(f.storedDeviceFlows, ofp.OfpFlowStats{Id: storedId, Cookie: flowFromCore.Id})
+	log.Debugw("updated Stored flow info", log.Fields{"storedDeviceFlows": f.storedDeviceFlows})
+}
+
 func (f *OpenOltFlowMgr) divideAndAddFlow(intfId uint32, onuId uint32, uniId uint32, portNo uint32, classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
 	var allocId []uint32
 	var gemPorts []uint32
@@ -333,8 +361,9 @@
 	log.Debugw("Adding HSIA flow", log.Fields{"intfId": intfId, "onuId": onuId, "uniId": uniId, "classifier": classifier,
 		"action": action, "direction": direction, "allocId": allocId, "gemPortId": gemPortId,
 		"logicalFlow": *logicalFlow})
+	flowCategory := "HSIA"
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortId)
-	flowId, err := f.resourceMgr.GetFlowID(intfId, onuId, uniId, flowStoreCookie, "HSIA")
+	flowId, err := f.resourceMgr.GetFlowID(intfId, onuId, uniId, flowStoreCookie, flowCategory)
 	if err != nil {
 		log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
 		return
@@ -369,13 +398,13 @@
 		Priority:      int32(logicalFlow.Priority),
 		Cookie:        logicalFlow.Cookie,
 		PortNo:        portNo}
-	if ok := f.addFlowToDevice(&flow); ok {
+	if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
 		log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
-		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, "HSIA")
+		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, "HSIA", flowId)
 		if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
 			flow.OnuId,
 			flow.UniId,
-			flow.FlowId, flowsToKVStore); err != nil {
+			flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
 			log.Errorw("Error uploading HSIA  flow into KV store", log.Fields{"flow": flow, "direction": direction, "error": err})
 			return
 		}
@@ -438,9 +467,9 @@
 		Cookie:        logicalFlow.Cookie,
 		PortNo:        portNo}
 
-	if ok := f.addFlowToDevice(&dhcpFlow); ok {
+	if ok := f.addFlowToDevice(logicalFlow, &dhcpFlow); ok {
 		log.Debug("DHCP UL flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(&dhcpFlow, flowStoreCookie, "DHCP")
+		flowsToKVStore := f.getUpdatedFlowInfo(&dhcpFlow, flowStoreCookie, "DHCP", flowID)
 		if err := f.updateFlowInfoToKVStore(dhcpFlow.AccessIntfId,
 			dhcpFlow.OnuId,
 			dhcpFlow.UniId,
@@ -509,13 +538,16 @@
 		Priority:      int32(logicalFlow.Priority),
 		Cookie:        logicalFlow.Cookie,
 		PortNo:        portNo}
-	if ok := f.addFlowToDevice(&upstreamFlow); ok {
+	if ok := f.addFlowToDevice(logicalFlow, &upstreamFlow); ok {
 		log.Debug("EAPOL UL flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(&upstreamFlow, flowStoreCookie, "EAPOL")
+		flowCategory := "EAPOL"
+		flowsToKVStore := f.getUpdatedFlowInfo(&upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowId)
 		if err := f.updateFlowInfoToKVStore(upstreamFlow.AccessIntfId,
 			upstreamFlow.OnuId,
 			upstreamFlow.UniId,
-			upstreamFlow.FlowId, flowsToKVStore); err != nil {
+			upstreamFlow.FlowId,
+			/* lowCategory, */
+			flowsToKVStore); err != nil {
 			log.Errorw("Error uploading EAPOL UL flow into KV store", log.Fields{"flow": upstreamFlow, "error": err})
 			return
 		}
@@ -576,13 +608,16 @@
 			Priority:      int32(logicalFlow.Priority),
 			Cookie:        logicalFlow.Cookie,
 			PortNo:        portNo}
-		if ok := f.addFlowToDevice(&downstreamFlow); ok {
+		if ok := f.addFlowToDevice(logicalFlow, &downstreamFlow); ok {
 			log.Debug("EAPOL DL flow added to device successfully")
-			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, "")
+			flowCategory := ""
+			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowId)
 			if err := f.updateFlowInfoToKVStore(downstreamFlow.AccessIntfId,
 				downstreamFlow.OnuId,
 				downstreamFlow.UniId,
-				downstreamFlow.FlowId, flowsToKVStore); err != nil {
+				downstreamFlow.FlowId,
+				/* flowCategory, */
+				flowsToKVStore); err != nil {
 				log.Errorw("Error uploading EAPOL DL flow into KV store", log.Fields{"flow": upstreamFlow, "error": err})
 				return
 			}
@@ -691,7 +726,7 @@
 	return hash.Uint64()
 }
 
-func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openolt_pb2.Flow, flowStoreCookie uint64, flowCategory string) *[]rsrcMgr.FlowInfo {
+func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openolt_pb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowId uint32) *[]rsrcMgr.FlowInfo {
 	var flows []rsrcMgr.FlowInfo = []rsrcMgr.FlowInfo{rsrcMgr.FlowInfo{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie}}
 	var intfId uint32
 	/* For flows which trap out of the NNI, the AccessIntfId is invalid
@@ -702,7 +737,6 @@
 	} else {
 		intfId = uint32(flow.NetworkIntfId)
 	}
-	// Get existing flows matching flowid for given subscriber from KV store
 	existingFlows := f.resourceMgr.GetFlowIDInfo(intfId, uint32(flow.OnuId), uint32(flow.UniId), flow.FlowId)
 	if existingFlows != nil {
 		log.Debugw("Flow exists for given flowID, appending it to current flow", log.Fields{"flowID": flow.FlowId})
@@ -714,6 +748,29 @@
 	return &flows
 }
 
+//func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openolt_pb2.Flow, flowStoreCookie uint64, flowCategory string) *[]rsrcMgr.FlowInfo {
+//	var flows []rsrcMgr.FlowInfo = []rsrcMgr.FlowInfo{rsrcMgr.FlowInfo{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie}}
+//	var intfId uint32
+//	/* For flows which trap out of the NNI, the AccessIntfId is invalid
+//	   (set to -1). In such cases, we need to refer to the NetworkIntfId .
+//	*/
+//	if flow.AccessIntfId != -1 {
+//		intfId = uint32(flow.AccessIntfId)
+//	} else {
+//		intfId = uint32(flow.NetworkIntfId)
+//	}
+//	// Get existing flows matching flowid for given subscriber from KV store
+//	existingFlows := f.resourceMgr.GetFlowIDInfo(intfId, uint32(flow.OnuId), uint32(flow.UniId), flow.FlowId)
+//	if existingFlows != nil {
+//		log.Debugw("Flow exists for given flowID, appending it to current flow", log.Fields{"flowID": flow.FlowId})
+//		for _, f := range *existingFlows {
+//			flows = append(flows, f)
+//		}
+//	}
+//	log.Debugw("Updated flows for given flowID and onuid", log.Fields{"updatedflow": flows, "flowid": flow.FlowId, "onu": flow.OnuId})
+//	return &flows
+//}
+
 func (f *OpenOltFlowMgr) updateFlowInfoToKVStore(intfId int32, onuId int32, uniId int32, flowId uint32, flows *[]rsrcMgr.FlowInfo) error {
 	log.Debugw("Storing flow(s) into KV store", log.Fields{"flows": *flows})
 	if err := f.resourceMgr.UpdateFlowIDInfo(intfId, onuId, uniId, flowId, flows); err != nil {
@@ -724,7 +781,7 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) addFlowToDevice(deviceFlow *openolt_pb2.Flow) bool {
+func (f *OpenOltFlowMgr) addFlowToDevice(logicalFlow *ofp.OfpFlowStats, deviceFlow *openolt_pb2.Flow) bool {
 	log.Debugw("Sending flow to device via grpc", log.Fields{"flow": *deviceFlow})
 	_, err := f.deviceHandler.Client.FlowAdd(context.Background(), deviceFlow)
 	if err != nil {
@@ -732,6 +789,18 @@
 		return false
 	}
 	log.Debugw("Flow added to device successfuly ", log.Fields{"flow": *deviceFlow})
+	f.registerFlow(logicalFlow, deviceFlow)
+	return true
+}
+
+// removeFlowFromDevice calls FlowRemove on the device handler's client and reports whether it succeeded.
+func (f *OpenOltFlowMgr) removeFlowFromDevice(deviceFlow *openolt_pb2.Flow) bool {
+	log.Debugw("Sending flow to device via grpc", log.Fields{"flow": *deviceFlow})
+	if _, err := f.deviceHandler.Client.FlowRemove(context.Background(), deviceFlow); err != nil {
+		log.Errorw("Failed to Remove flow from device", log.Fields{"err": err, "deviceFlow": deviceFlow})
+		return false
+	}
+	log.Debugw("Flow removed from device successfuly ", log.Fields{"flow": *deviceFlow})
 	return true
 }
 
@@ -789,6 +858,96 @@
 	log.Info("Unimplemented")
 }
 
+func (f *OpenOltFlowMgr) decodeStoredId(id uint64) (uint64, string) {
+	if id>>15 != 0x1 { // only ids whose bits from 15 upwards equal exactly 1 are treated as upstream
+		return id, DOWNSTREAM
+	}
+	return id & 0x7fff, UPSTREAM // mask off the direction bit to recover the device flow id
+}
+
+// clearFlowFromResourceManager drops the flowDirection entry of device flow flowId from the KV store
+// and releases flowId to the resource manager once no stored flow references it any more.
+func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowId uint32, flowDirection string) {
+	log.Debugw("clearFlowFromResourceManager", log.Fields{"flowId": flowId, "flowDirection": flowDirection, "flow": *flow})
+	ponIntf, onuId, uniId, err := FlowExtractInfo(flow, flowDirection)
+	if err != nil {
+		log.Error(err)
+		return
+	}
+	log.Debugw("Extracted access info from flow to be deleted",
+		log.Fields{"ponIntf": ponIntf, "onuId": onuId, "uniId": uniId, "flowId": flowId})
+
+	flowsInfo := f.resourceMgr.GetFlowIDInfo(ponIntf, onuId, uniId, flowId)
+	if flowsInfo == nil {
+		log.Debugw("No FlowInfo found found in KV store",
+			log.Fields{"ponIntf": ponIntf, "onuId": onuId, "uniId": uniId, "flowId": flowId})
+		return
+	}
+
+	// Work on a copy so the slice returned by GetFlowIDInfo is not modified in place.
+	updatedFlows := append([]rsrcMgr.FlowInfo(nil), *flowsInfo...)
+	for i, storedFlow := range updatedFlows {
+		if flowDirection == storedFlow.Flow.FlowType {
+			//Remove the Flow from FlowInfo
+			log.Debugw("Removing flow to be deleted", log.Fields{"flow": storedFlow})
+			updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+			break
+		}
+	}
+
+	// Persist what is left first, so the KV store never keeps the entry that was just removed.
+	if err := f.updateFlowInfoToKVStore(int32(ponIntf), int32(onuId), int32(uniId), flowId, &updatedFlows); err != nil {
+		log.Errorw("Failed to update flow info in KV store", log.Fields{"flowId": flowId, "error": err})
+	}
+	if len(updatedFlows) > 0 {
+		// There are still flows referencing the same flow_id, so the flow should not be freed yet.
+		// For ex: Case of HSIA where same flow is shared between DS and US.
+		return
+	}
+	log.Debugw("Releasing flow Id to resource manager", log.Fields{"ponIntf": ponIntf, "onuId": onuId, "uniId": uniId, "flowId": flowId})
+	f.resourceMgr.FreeFlowID(ponIntf, onuId, uniId, flowId)
+	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ponIntf, onuId, uniId)
+	if len(flowIds) == 0 {
+		/* TODO: Remove Upstream and Downstream Schedulers */
+	}
+}
+
+// RemoveFlow removes from the device every stored device flow whose Cookie equals the Id of the
+// given logical flow. For each device flow the device agreed to remove, the resource manager is
+// cleaned up and the entry is dropped from storedDeviceFlows. Device flows whose removal failed
+// stay in storedDeviceFlows, so the local store keeps describing what is still on the device.
+func (f *OpenOltFlowMgr) RemoveFlow(flow *ofp.OfpFlowStats) {
+	log.Debugw("Removing Flow", log.Fields{"flow": flow})
+
+	removedCount := 0
+	// Rebuild the local store in one pass instead of searching it again for every removed flow.
+	remainingFlows := make([]ofp.OfpFlowStats, 0, len(f.storedDeviceFlows))
+	for _, curFlow := range f.storedDeviceFlows {
+		if curFlow.Cookie != flow.Id {
+			remainingFlows = append(remainingFlows, curFlow)
+			continue
+		}
+
+		log.Debugw("Found found matching flow-cookie", log.Fields{"curFlow": curFlow})
+		id, direction := f.decodeStoredId(curFlow.GetId())
+		removeFlowMessage := openolt_pb2.Flow{FlowId: uint32(id), FlowType: direction}
+		if ok := f.removeFlowFromDevice(&removeFlowMessage); !ok {
+			// Keep exactly the entry that failed: all device flows of one logical flow share
+			// the same Cookie, so matching on Cookie alone could drop the wrong entry.
+			remainingFlows = append(remainingFlows, curFlow)
+			continue
+		}
+		log.Debug("Flow removed from device successfully")
+		f.clearFlowFromResourceManager(flow, uint32(id), direction) //TODO: Take care of the limitations
+		log.Debugw("Removing flow from local Store", log.Fields{"flow": curFlow})
+		removedCount++
+	}
+
+	f.storedDeviceFlows = remainingFlows
+	log.Debugw("Flows removed from the data store",
+		log.Fields{"number_of_flows_removed": removedCount, "updated_stored_flows": f.storedDeviceFlows})
+}
+
 func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats) {
 	classifierInfo := make(map[string]interface{}, 0)
 	actionInfo := make(map[string]interface{}, 0)
@@ -1105,9 +1264,9 @@
 		Priority:      int32(logicalFlow.Priority),
 		Cookie:        logicalFlow.Cookie,
 		PortNo:        portNo}
-	if ok := f.addFlowToDevice(&downstreamflow); ok {
+	if ok := f.addFlowToDevice(logicalFlow, &downstreamflow); ok {
 		log.Debug("DHCP trap on NNI flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "")
+		flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowId)
 		if err := f.updateFlowInfoToKVStore(int32(networkInterfaceId),
 			int32(onuId),
 			int32(uniId),