VOL-1904 openolt adapter reconcile.
cache flowsUsedByGemPort is also stored and kvstore kept in sync.
on adapter restart flow ids per gemport will be loaded to flowsUsedByGemPort from kvstore
Change-Id: Ife58fd9cb790e032402e77c630cf9069d0db5bcb
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index e3f0ccb..67d2a2a 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -200,6 +200,8 @@
if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(idx); err != nil {
log.Error("Failed to load onu gem info cache")
}
+ //Load flowID list per gem map per interface from the kvstore.
+ flowMgr.loadFlowIDlistForGem(idx)
}
flowMgr.lockCache = sync.RWMutex{}
log.Info("Initialization of flow manager success!!")
@@ -229,6 +231,8 @@
}
flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
f.flowsUsedByGemPort[gemPK] = flowIDList
+ // update the flowids for a gem to the KVstore
+ f.resourceMgr.UpdateFlowIDsForGem(uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
}
func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32,
@@ -1256,7 +1260,10 @@
for i, flowIDinMap := range flowIDs {
if flowIDinMap == flowID {
flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
+ // everytime flowsUsedByGemPort cache is updated the same should be updated
+ // in kv store by calling UpdateFlowIDsForGem
f.flowsUsedByGemPort[gemPK] = flowIDs
+ f.resourceMgr.UpdateFlowIDsForGem(Intf, uint32(gemPortID), flowIDs)
break
}
}
@@ -1269,8 +1276,10 @@
// But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(uint32(gemPortID), Intf)
f.onuIdsLock.Lock()
+ //everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
+ // by calling DeleteFlowIDsForGem
delete(f.flowsUsedByGemPort, gemPK)
- //delete(f.onuGemPortIds, gemPK)
+ f.resourceMgr.DeleteFlowIDsForGem(Intf, uint32(gemPortID))
f.resourceMgr.FreeGemPortID(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
f.onuIdsLock.Unlock()
@@ -2060,3 +2069,16 @@
}
f.resourceMgr.AddUniPortToOnuInfo(intfID, onuID, portNum)
}
+
+func (f *OpenOltFlowMgr) loadFlowIDlistForGem(intf uint32) {
+ flowIDsList, err := f.resourceMgr.GetFlowIDsGemMapForInterface(intf)
+ if err != nil {
+ log.Error("Failed to get flowid list per gem", log.Fields{"intf": intf})
+ return
+ }
+ for gem, FlowIDs := range flowIDsList {
+ gemPK := gemPortKey{intf, uint32(gem)}
+ f.flowsUsedByGemPort[gemPK] = FlowIDs
+ }
+ return
+}
diff --git a/adaptercore/resourcemanager/resourcemanager.go b/adaptercore/resourcemanager/resourcemanager.go
index 3c01117..a12d5f6 100755
--- a/adaptercore/resourcemanager/resourcemanager.go
+++ b/adaptercore/resourcemanager/resourcemanager.go
@@ -46,6 +46,8 @@
// OnuPacketINPath path on the kvstore to store packetin gemport,which will be used for packetin, pcketout
//format: onu_packetin/<intfid>,<onuid>,<logicalport>
OnuPacketINPath = "onu_packetin/{%d,%d,%d}"
+ //FlowIDsForGem flowids_per_gem/<intfid>
+ FlowIDsForGem = "flowids_per_gem/{%d}"
)
// FlowInfo holds the flow information
@@ -1207,3 +1209,83 @@
}
return nil
}
+
+//UpdateFlowIDsForGem updates flow id per gemport
+func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(intf uint32, gem uint32, flowIDs []uint32) error {
+ var val []byte
+ path := fmt.Sprintf(FlowIDsForGem, intf)
+
+ flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(intf)
+ if err != nil {
+ log.Error("Failed to ger flowids for interface", log.Fields{"error": err, "intf": intf})
+ return err
+ }
+ if flowsForGem == nil {
+ flowsForGem = make(map[uint32][]uint32)
+ }
+ flowsForGem[gem] = flowIDs
+ val, err = json.Marshal(flowsForGem)
+ if err != nil {
+ log.Error("Failed to marshal data", log.Fields{"error": err})
+ return err
+ }
+ if err = RsrcMgr.KVStore.Put(path, val); err != nil {
+ log.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
+ return err
+ }
+ log.Debugw("added flowid list for gem to kv successfully", log.Fields{"path": path, "flowidlist": flowsForGem[gem]})
+ return nil
+}
+
+//DeleteFlowIDsForGem deletes the flowID list entry per gem from kvstore.
+func (RsrcMgr *OpenOltResourceMgr) DeleteFlowIDsForGem(intf uint32, gem uint32) {
+ path := fmt.Sprintf(FlowIDsForGem, intf)
+ var val []byte
+
+ flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(intf)
+ if err != nil {
+ log.Error("Failed to ger flowids for interface", log.Fields{"error": err, "intf": intf})
+ return
+ }
+ if flowsForGem == nil {
+ log.Error("No flowids found ", log.Fields{"intf": intf, "gemport": gem})
+ return
+ }
+ // once we get the flows per gem map from kv , just delete the gem entry from the map
+ delete(flowsForGem, gem)
+ // once gem entry is deleted update the kv store.
+ val, err = json.Marshal(flowsForGem)
+ if err != nil {
+ log.Error("Failed to marshal data", log.Fields{"error": err})
+ return
+ }
+ if err = RsrcMgr.KVStore.Put(path, val); err != nil {
+ log.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
+ return
+ }
+ return
+}
+
+//GetFlowIDsGemMapForInterface gets flowids per gemport and interface
+func (RsrcMgr *OpenOltResourceMgr) GetFlowIDsGemMapForInterface(intf uint32) (map[uint32][]uint32, error) {
+ path := fmt.Sprintf(FlowIDsForGem, intf)
+ var flowsForGem map[uint32][]uint32
+ var val []byte
+
+ value, err := RsrcMgr.KVStore.Get(path)
+ if err != nil {
+ log.Error("failed to get data from kv store")
+ return nil, err
+ }
+ if value != nil {
+ if val, err = kvstore.ToByte(value.Value); err != nil {
+ log.Error("Failed to convert to byte array", log.Fields{"error": err})
+ return nil, err
+ }
+ if err = json.Unmarshal(val, &flowsForGem); err != nil {
+ log.Error("Failed to unmarshall", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ return flowsForGem, nil
+}