VOL-1564: Incoporate Delete Flow Functionality and subsequent cleanup in stores
Change-Id: I429a380da3ed0c951cb5f01ee763eb318693d3b3
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 7ccf97d..570fff6 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -31,6 +31,7 @@
openolt_pb2 "github.com/opencord/voltha-protos/go/openolt"
voltha "github.com/opencord/voltha-protos/go/voltha"
"math/big"
+ //deepcopy "github.com/getlantern/deepcopy"
)
const (
@@ -101,13 +102,14 @@
}
type OpenOltFlowMgr struct {
- techprofile []*tp.TechProfileMgr
- deviceHandler *DeviceHandler
- resourceMgr *rsrcMgr.OpenOltResourceMgr
- onuIds map[onuIdKey]onuInfo //OnuId -> OnuInfo
- onuSerialNumbers map[string]onuInfo //onu serial_number (string) -> OnuInfo
- onuGemPortIds map[gemPortKey]onuInfo //GemPortId -> OnuInfo
- packetInGemPort map[packetInInfoKey]uint32 //packet in gem port
+ techprofile []*tp.TechProfileMgr
+ deviceHandler *DeviceHandler
+ resourceMgr *rsrcMgr.OpenOltResourceMgr
+ onuIds map[onuIdKey]onuInfo //OnuId -> OnuInfo
+ onuSerialNumbers map[string]onuInfo //onu serial_number (string) -> OnuInfo
+ onuGemPortIds map[gemPortKey]onuInfo //GemPortId -> OnuInfo
+ packetInGemPort map[packetInInfoKey]uint32 //packet in gem port
+ storedDeviceFlows []ofp.OfpFlowStats /* Required during deletion to obtain device flows from logical flows */
}
func NewFlowManager(dh *DeviceHandler, rsrcMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
@@ -127,6 +129,32 @@
return &flowMgr
}
+func (f *OpenOltFlowMgr) generateStoredFlowId(flowId uint32, direction string) (uint64, error) {
+ if direction == UPSTREAM {
+ log.Debug("upstream flow, shifting id")
+ return 0x1<<15 | uint64(flowId), nil
+ } else if direction == DOWNSTREAM {
+ log.Debug("downstream flow, not shifting id")
+ return uint64(flowId), nil
+ } else {
+ log.Debug("Unrecognized direction")
+ return 0, errors.New(fmt.Sprintf("Unrecognized direction %s", direction))
+ }
+}
+
+func (f *OpenOltFlowMgr) registerFlow(flowFromCore *ofp.OfpFlowStats, deviceFlow *openolt_pb2.Flow) {
+ log.Debug("Registering Flow for Device ", log.Fields{"flow": flowFromCore},
+ log.Fields{"device": f.deviceHandler.deviceId})
+
+ var storedFlow ofp.OfpFlowStats
+ storedFlow.Id, _ = f.generateStoredFlowId(deviceFlow.FlowId, deviceFlow.FlowType)
+ log.Debug(fmt.Sprintf("Generated stored device flow. id = %d, flowId = %d, direction = %s", storedFlow.Id,
+ deviceFlow.FlowId, deviceFlow.FlowType))
+ storedFlow.Cookie = flowFromCore.Id
+ f.storedDeviceFlows = append(f.storedDeviceFlows, storedFlow)
+ log.Debugw("updated Stored flow info", log.Fields{"storedDeviceFlows": f.storedDeviceFlows})
+}
+
func (f *OpenOltFlowMgr) divideAndAddFlow(intfId uint32, onuId uint32, uniId uint32, portNo uint32, classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
var allocId []uint32
var gemPorts []uint32
@@ -333,8 +361,9 @@
log.Debugw("Adding HSIA flow", log.Fields{"intfId": intfId, "onuId": onuId, "uniId": uniId, "classifier": classifier,
"action": action, "direction": direction, "allocId": allocId, "gemPortId": gemPortId,
"logicalFlow": *logicalFlow})
+ flowCategory := "HSIA"
flowStoreCookie := getFlowStoreCookie(classifier, gemPortId)
- flowId, err := f.resourceMgr.GetFlowID(intfId, onuId, uniId, flowStoreCookie, "HSIA")
+ flowId, err := f.resourceMgr.GetFlowID(intfId, onuId, uniId, flowStoreCookie, flowCategory)
if err != nil {
log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
return
@@ -369,13 +398,13 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(&flow); ok {
+ if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
- flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, "HSIA")
+ flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, "HSIA", flowId)
if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
flow.OnuId,
flow.UniId,
- flow.FlowId, flowsToKVStore); err != nil {
+ flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
log.Errorw("Error uploading HSIA flow into KV store", log.Fields{"flow": flow, "direction": direction, "error": err})
return
}
@@ -438,9 +467,9 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(&dhcpFlow); ok {
+ if ok := f.addFlowToDevice(logicalFlow, &dhcpFlow); ok {
log.Debug("DHCP UL flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&dhcpFlow, flowStoreCookie, "DHCP")
+ flowsToKVStore := f.getUpdatedFlowInfo(&dhcpFlow, flowStoreCookie, "DHCP", flowID)
if err := f.updateFlowInfoToKVStore(dhcpFlow.AccessIntfId,
dhcpFlow.OnuId,
dhcpFlow.UniId,
@@ -509,13 +538,16 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(&upstreamFlow); ok {
+ if ok := f.addFlowToDevice(logicalFlow, &upstreamFlow); ok {
log.Debug("EAPOL UL flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&upstreamFlow, flowStoreCookie, "EAPOL")
+ flowCategory := "EAPOL"
+ flowsToKVStore := f.getUpdatedFlowInfo(&upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowId)
if err := f.updateFlowInfoToKVStore(upstreamFlow.AccessIntfId,
upstreamFlow.OnuId,
upstreamFlow.UniId,
- upstreamFlow.FlowId, flowsToKVStore); err != nil {
+ upstreamFlow.FlowId,
+ /* lowCategory, */
+ flowsToKVStore); err != nil {
log.Errorw("Error uploading EAPOL UL flow into KV store", log.Fields{"flow": upstreamFlow, "error": err})
return
}
@@ -576,13 +608,16 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(&downstreamFlow); ok {
+ if ok := f.addFlowToDevice(logicalFlow, &downstreamFlow); ok {
log.Debug("EAPOL DL flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, "")
+ flowCategory := ""
+ flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowId)
if err := f.updateFlowInfoToKVStore(downstreamFlow.AccessIntfId,
downstreamFlow.OnuId,
downstreamFlow.UniId,
- downstreamFlow.FlowId, flowsToKVStore); err != nil {
+ downstreamFlow.FlowId,
+ /* flowCategory, */
+ flowsToKVStore); err != nil {
log.Errorw("Error uploading EAPOL DL flow into KV store", log.Fields{"flow": upstreamFlow, "error": err})
return
}
@@ -691,7 +726,7 @@
return hash.Uint64()
}
-func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openolt_pb2.Flow, flowStoreCookie uint64, flowCategory string) *[]rsrcMgr.FlowInfo {
+func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openolt_pb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowId uint32) *[]rsrcMgr.FlowInfo {
var flows []rsrcMgr.FlowInfo = []rsrcMgr.FlowInfo{rsrcMgr.FlowInfo{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie}}
var intfId uint32
/* For flows which trap out of the NNI, the AccessIntfId is invalid
@@ -702,7 +737,6 @@
} else {
intfId = uint32(flow.NetworkIntfId)
}
- // Get existing flows matching flowid for given subscriber from KV store
existingFlows := f.resourceMgr.GetFlowIDInfo(intfId, uint32(flow.OnuId), uint32(flow.UniId), flow.FlowId)
if existingFlows != nil {
log.Debugw("Flow exists for given flowID, appending it to current flow", log.Fields{"flowID": flow.FlowId})
@@ -714,6 +748,29 @@
return &flows
}
+//func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openolt_pb2.Flow, flowStoreCookie uint64, flowCategory string) *[]rsrcMgr.FlowInfo {
+// var flows []rsrcMgr.FlowInfo = []rsrcMgr.FlowInfo{rsrcMgr.FlowInfo{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie}}
+// var intfId uint32
+// /* For flows which trap out of the NNI, the AccessIntfId is invalid
+// (set to -1). In such cases, we need to refer to the NetworkIntfId .
+// */
+// if flow.AccessIntfId != -1 {
+// intfId = uint32(flow.AccessIntfId)
+// } else {
+// intfId = uint32(flow.NetworkIntfId)
+// }
+// // Get existing flows matching flowid for given subscriber from KV store
+// existingFlows := f.resourceMgr.GetFlowIDInfo(intfId, uint32(flow.OnuId), uint32(flow.UniId), flow.FlowId)
+// if existingFlows != nil {
+// log.Debugw("Flow exists for given flowID, appending it to current flow", log.Fields{"flowID": flow.FlowId})
+// for _, f := range *existingFlows {
+// flows = append(flows, f)
+// }
+// }
+// log.Debugw("Updated flows for given flowID and onuid", log.Fields{"updatedflow": flows, "flowid": flow.FlowId, "onu": flow.OnuId})
+// return &flows
+//}
+
func (f *OpenOltFlowMgr) updateFlowInfoToKVStore(intfId int32, onuId int32, uniId int32, flowId uint32, flows *[]rsrcMgr.FlowInfo) error {
log.Debugw("Storing flow(s) into KV store", log.Fields{"flows": *flows})
if err := f.resourceMgr.UpdateFlowIDInfo(intfId, onuId, uniId, flowId, flows); err != nil {
@@ -724,7 +781,7 @@
return nil
}
-func (f *OpenOltFlowMgr) addFlowToDevice(deviceFlow *openolt_pb2.Flow) bool {
+func (f *OpenOltFlowMgr) addFlowToDevice(logicalFlow *ofp.OfpFlowStats, deviceFlow *openolt_pb2.Flow) bool {
log.Debugw("Sending flow to device via grpc", log.Fields{"flow": *deviceFlow})
_, err := f.deviceHandler.Client.FlowAdd(context.Background(), deviceFlow)
if err != nil {
@@ -732,6 +789,18 @@
return false
}
log.Debugw("Flow added to device successfuly ", log.Fields{"flow": *deviceFlow})
+ f.registerFlow(logicalFlow, deviceFlow)
+ return true
+}
+
+func (f *OpenOltFlowMgr) removeFlowFromDevice(deviceFlow *openolt_pb2.Flow) bool {
+ log.Debugw("Sending flow to device via grpc", log.Fields{"flow": *deviceFlow})
+ _, err := f.deviceHandler.Client.FlowRemove(context.Background(), deviceFlow)
+ if err != nil {
+ log.Errorw("Failed to Remove flow from device", log.Fields{"err": err, "deviceFlow": deviceFlow})
+ return false
+ }
+ log.Debugw("Flow removed from device successfuly ", log.Fields{"flow": *deviceFlow})
return true
}
@@ -789,6 +858,96 @@
log.Info("Unimplemented")
}
+func (f *OpenOltFlowMgr) decodeStoredId(id uint64) (uint64, string) {
+ if id>>15 == 0x1 {
+ return id & 0x7fff, UPSTREAM
+ }
+ return id, DOWNSTREAM
+}
+
+func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowId uint32, flowDirection string) {
+ log.Debugw("clearFlowFromResourceManager", log.Fields{"flowId": flowId, "flowDirection": flowDirection, "flow": *flow})
+ ponIntf, onuId, uniId, err := FlowExtractInfo(flow, flowDirection)
+ if err != nil {
+ log.Error(err)
+ return
+ }
+ log.Debugw("Extracted access info from flow to be deleted",
+ log.Fields{"ponIntf": ponIntf, "onuId": onuId, "uniId": uniId, "flowId": flowId})
+
+ flowsInfo := f.resourceMgr.GetFlowIDInfo(ponIntf, onuId, uniId, flowId)
+ if flowsInfo == nil {
+ log.Debugw("No FlowInfo found found in KV store",
+ log.Fields{"ponIntf": ponIntf, "onuId": onuId, "uniId": uniId, "flowId": flowId})
+ return
+ }
+ var updatedFlows []rsrcMgr.FlowInfo
+
+ for _, flow := range *flowsInfo {
+ updatedFlows = append(updatedFlows, flow)
+ }
+
+ for i, storedFlow := range updatedFlows {
+ if flowDirection == storedFlow.Flow.FlowType {
+ //Remove the Flow from FlowInfo
+ log.Debugw("Removing flow to be deleted", log.Fields{"flow": storedFlow})
+ updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+ break
+ }
+ }
+
+ if len(updatedFlows) >= 0 {
+ // There are still flows referencing the same flow_id.
+ // So the flow should not be freed yet.
+ // For ex: Case of HSIA where same flow is shared
+ // between DS and US.
+ f.updateFlowInfoToKVStore(int32(ponIntf), int32(onuId), int32(uniId), flowId, &updatedFlows)
+ return
+ }
+ log.Debugw("Releasing flow Id to resource manager", log.Fields{"ponIntf": ponIntf, "onuId": onuId, "uniId": uniId, "flowId": flowId})
+ f.resourceMgr.FreeFlowID(ponIntf, onuId, uniId, flowId)
+ flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ponIntf, onuId, uniId)
+ if len(flowIds) == 0 {
+ /* TODO: Remove Upstream and Downstream Schedulers */
+ }
+}
+
+func (f *OpenOltFlowMgr) RemoveFlow(flow *ofp.OfpFlowStats) {
+ log.Debugw("Removing Flow", log.Fields{"flow": flow})
+ var deviceFlowsToRemove []ofp.OfpFlowStats
+ var deletedFlowsIdx []int
+ for _, curFlow := range f.storedDeviceFlows {
+ if curFlow.Cookie == flow.Id {
+ log.Debugw("Found found matching flow-cookie", log.Fields{"curFlow": curFlow})
+ deviceFlowsToRemove = append(deviceFlowsToRemove, curFlow)
+ }
+ }
+ log.Debugw("Flows to be deleted", log.Fields{"deviceFlowsToRemove": deviceFlowsToRemove})
+ for index, curFlow := range deviceFlowsToRemove {
+ id, direction := f.decodeStoredId(curFlow.GetId())
+ removeFlowMessage := openolt_pb2.Flow{FlowId: uint32(id), FlowType: direction}
+ if ok := f.removeFlowFromDevice(&removeFlowMessage); ok {
+ log.Debug("Flow removed from device successfully")
+ deletedFlowsIdx = append(deletedFlowsIdx, index)
+ f.clearFlowFromResourceManager(flow, uint32(id), direction) //TODO: Take care of the limitations
+ }
+
+ }
+ // Can be done in separate go routine as it takes time ?
+ for _, flowToRemove := range deletedFlowsIdx {
+ for index, storedFlow := range f.storedDeviceFlows {
+ if deviceFlowsToRemove[flowToRemove].Cookie == storedFlow.Cookie {
+ log.Debugw("Removing flow from local Store", log.Fields{"flow": storedFlow})
+ f.storedDeviceFlows = append(f.storedDeviceFlows[:index], f.storedDeviceFlows[index+1:]...)
+ break
+ }
+ }
+ }
+ log.Debugw("Flows removed from the data store",
+ log.Fields{"number_of_flows_removed": len(deviceFlowsToRemove), "updated_stored_flows": f.storedDeviceFlows})
+ return
+}
+
func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats) {
classifierInfo := make(map[string]interface{}, 0)
actionInfo := make(map[string]interface{}, 0)
@@ -1105,9 +1264,9 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(&downstreamflow); ok {
+ if ok := f.addFlowToDevice(logicalFlow, &downstreamflow); ok {
log.Debug("DHCP trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "")
+ flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowId)
if err := f.updateFlowInfoToKVStore(int32(networkInterfaceId),
int32(onuId),
int32(uniId),