[VOL-4532] Remove duplicate maps in FlowManager and ResourceManager
Change-Id: I0a0fee7dbd3b3a25f2f0eee062bf565ba3212df3
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 476bbf5..cfc755e 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -244,7 +244,9 @@
}
ResourceMgr.InitLocalCache()
-
+ if err := ResourceMgr.LoadLocalCacheFromKVStore(ctx, PonIntfID); err != nil {
+ logger.Error(ctx, "failed-to-load-local-cache-from-kvstore")
+ }
logger.Info(ctx, "Initialization of resource manager success!")
return &ResourceMgr
}
@@ -263,6 +265,52 @@
rsrcMgr.groupInfo = make(map[string]*GroupInfo)
}
+// LoadLocalCacheFromKVStore fills the local onuGemInfo cache from the KV store
+// for the given PON interface.
+//
+// Every key listed under the OnuGemInfo prefix of PonIntfID is fetched,
+// JSON-decoded into an OnuGemInfo and stored in rsrcMgr.onuGemInfo, keyed by
+// the listed path with the KV store path prefix removed. Keys for which Get
+// returns nil are skipped. The first list, get, byte-conversion or unmarshal
+// error stops the load and is returned; entries cached before that point stay
+// in the cache.
+func (rsrcMgr *OpenOltResourceMgr) LoadLocalCacheFromKVStore(ctx context.Context, PonIntfID uint32) error {
+	logger.Debug(ctx, "load-local-cache-from-KV-store-started")
+
+	// List all the keys for OnuGemInfo
+	prefixPath := fmt.Sprintf(OnuGemInfoPathPathPrefix, PonIntfID)
+	keys, err := rsrcMgr.KVStore.List(ctx, prefixPath)
+	if err != nil {
+		// The error is logged here with its cause so that it is not lost if
+		// the caller only reports that loading failed.
+		logger.Errorw(ctx, "failed-to-list-keys-from-path", log.Fields{"path": prefixPath, "err": err})
+		return err
+	}
+
+	// Loop invariant: the prefix that is stripped from every listed key.
+	pathPrefix := rsrcMgr.KVStore.PathPrefix + "/"
+	for path := range keys {
+		// Declared per iteration so that each cache entry points at its own value.
+		var onugem OnuGemInfo
+
+		// Get rid of the path prefix; Get is called with the prefix-less path.
+		path = strings.Replace(path, pathPrefix, "", 1)
+
+		value, err := rsrcMgr.KVStore.Get(ctx, path)
+		if err != nil {
+			logger.Errorw(ctx, "failed-to-get-from-kv-store", log.Fields{"path": path, "err": err})
+			return err
+		}
+		if value == nil {
+			logger.Debugw(ctx, "no-onugeminfo-for-path", log.Fields{"path": path})
+			continue
+		}
+
+		val, err := kvstore.ToByte(value.Value)
+		if err != nil {
+			logger.Errorw(ctx, "failed-to-covert-to-byte-array", log.Fields{"path": path, "err": err})
+			return err
+		}
+		if err = json.Unmarshal(val, &onugem); err != nil {
+			logger.Errorw(ctx, "failed-to-unmarshall", log.Fields{"path": path, "err": err})
+			return err
+		}
+		logger.Debugw(ctx, "found-onugeminfo-from-path", log.Fields{"path": path, "onuGemInfo": onugem})
+
+		rsrcMgr.onuGemInfoLock.Lock()
+		rsrcMgr.onuGemInfo[path] = &onugem
+		rsrcMgr.onuGemInfoLock.Unlock()
+	}
+	logger.Debug(ctx, "load-local-cache-from-KV-store-finished")
+	return nil
+}
+
// InitializeDeviceResourceRangeAndPool initializes the resource range pool according to the sharing type, then apply
// device specific information. If KV doesn't exist
// or is broader than the device, the device's information will
@@ -948,6 +996,36 @@
return &onugem, nil
}
+// AddNewOnuGemInfoToCacheAndKvStore adds a new ONU gem info entry for the given
+// interface and ONU through AddOnuGemInfo, unless the local cache already holds
+// an entry for that ONU.
+//
+// It returns nil when the ONU is already cached or was added, and the error of
+// AddOnuGemInfo otherwise.
+//
+// The existence check and the add are not performed under a single lock
+// acquisition: the lock is released before AddOnuGemInfo is called, so two
+// concurrent calls for the same ONU may both reach AddOnuGemInfo.
+func (rsrcMgr *OpenOltResourceMgr) AddNewOnuGemInfoToCacheAndKvStore(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
+
+	Path := fmt.Sprintf(OnuGemInfoPath, intfID, onuID)
+
+	// Read-only lookup: a read lock is enough and does not block other readers.
+	rsrcMgr.onuGemInfoLock.RLock()
+	_, ok := rsrcMgr.onuGemInfo[Path]
+	rsrcMgr.onuGemInfoLock.RUnlock()
+
+	// If the ONU already exists in onuGemInfo list, nothing to do
+	if ok {
+		logger.Debugw(ctx, "onu-id-already-exists-in-cache", log.Fields{"onuID": onuID, "serialNum": serialNum})
+		return nil
+	}
+
+	onuGemInfo := OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
+
+	if err := rsrcMgr.AddOnuGemInfo(ctx, intfID, onuID, onuGemInfo); err != nil {
+		return err
+	}
+	logger.Infow(ctx, "added-onuinfo",
+		log.Fields{
+			"intf-id":    intfID,
+			"onu-id":     onuID,
+			"serial-num": serialNum,
+			"onu":        onuGemInfo,
+			"device-id":  rsrcMgr.DeviceID})
+	return nil
+}
+
// AddOnuGemInfo adds onu info on to the kvstore per interface
func (rsrcMgr *OpenOltResourceMgr) AddOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, onuGem OnuGemInfo) error {
@@ -965,7 +1043,7 @@
logger.Errorf(ctx, "Failed to update resource %s", Path)
return err
}
- logger.Debugw(ctx, "added onu gem info to store", log.Fields{"onuGemInfo": onuGem})
+ logger.Debugw(ctx, "added onu gem info to store", log.Fields{"onuGemInfo": onuGem, "Path": Path})
//update cache
rsrcMgr.onuGemInfoLock.Lock()
@@ -1173,6 +1251,34 @@
return flowIDs, nil
}
+// IsGemPortUsedByAnotherFlow reports whether the cached flow ID list of
+// gemPortID contains at least one flow ID that differs from flowID. It returns
+// false when the gem port has no cached flow IDs or when flowID is the only one.
+func (rsrcMgr *OpenOltResourceMgr) IsGemPortUsedByAnotherFlow(gemPortID uint32, flowID uint64) bool {
+	// Only the map lookup happens under the read lock; the scan runs on the
+	// slice value obtained from it.
+	rsrcMgr.flowIDsForGemLock.RLock()
+	cachedFlowIDs := rsrcMgr.flowIDsForGem[gemPortID]
+	rsrcMgr.flowIDsForGemLock.RUnlock()
+
+	for idx := range cachedFlowIDs {
+		if cachedFlowIDs[idx] != flowID {
+			return true
+		}
+	}
+	return false
+}
+
+// RegisterFlowIDForGem makes sure the ID of flowFromCore is part of the flow ID
+// list of gemPortID and hands the resulting list to UpdateFlowIDsForGem, whose
+// error is returned.
+//
+// The cached list is read under the read lock, which is released before the
+// update call is made.
+func (rsrcMgr *OpenOltResourceMgr) RegisterFlowIDForGem(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
+	// get from cache; a gem port without an entry yields a nil list
+	rsrcMgr.flowIDsForGemLock.RLock()
+	cached := rsrcMgr.flowIDsForGem[gemPortID]
+	rsrcMgr.flowIDsForGemLock.RUnlock()
+
+	// Appending to a nil list gives a one-element list, so the missing-entry
+	// case needs no separate branch.
+	updated := appendUnique64bit(cached, flowFromCore.Id)
+
+	// update the flowids for a gem to the KVstore
+	return rsrcMgr.UpdateFlowIDsForGem(ctx, accessIntfID, gemPortID, updated)
+}
+
//UpdateFlowIDsForGem updates flow id per gemport
func (rsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(ctx context.Context, intf uint32, gem uint32, flowIDs []uint64) error {
var val []byte
@@ -1424,6 +1530,17 @@
return false, groupInfo, nil
}
+// GetOnuGemInfoList returns all gems in the onuGemInfo map
+func (rsrcMgr *OpenOltResourceMgr) GetOnuGemInfoList(ctx context.Context) []OnuGemInfo {
+ var onuGemInfoLst []OnuGemInfo
+ rsrcMgr.onuGemInfoLock.RLock()
+ defer rsrcMgr.onuGemInfoLock.RUnlock()
+ for _, v := range rsrcMgr.onuGemInfo {
+ onuGemInfoLst = append(onuGemInfoLst, *v)
+ }
+ return onuGemInfoLst
+}
+
// toByte converts an interface value to a []byte. The interface should either be of
// a string type or []byte. Otherwise, an error is returned.
func toByte(value interface{}) ([]byte, error) {
@@ -1436,3 +1553,12 @@
return nil, fmt.Errorf("unexpected-type-%T", t)
}
}
+
+// appendUnique64bit returns slice unchanged when it already contains item;
+// otherwise it returns the result of appending item to slice.
+func appendUnique64bit(slice []uint64, item uint64) []uint64 {
+	for idx := 0; idx < len(slice); idx++ {
+		if slice[idx] == item {
+			return slice
+		}
+	}
+	slice = append(slice, item)
+	return slice
+}
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index 5392eb7..85a5811 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -27,6 +27,7 @@
"context"
"encoding/json"
"errors"
+ "fmt"
"reflect"
"strconv"
"strings"
@@ -493,6 +494,162 @@
}
}
+// TestOpenOltResourceMgr_deleteGemPort re-creates an ONU, adds gemPortIDs to it,
+// removes gemPortIDsToBeDeleted and then checks that the gem ports reported by
+// GetOnuGemInfo match gemPortIDsRemaining and finalLength.
+func TestOpenOltResourceMgr_deleteGemPort(t *testing.T) {
+
+	type args struct {
+		intfID                uint32
+		onuID                 uint32
+		gemPortIDs            []uint32
+		gemPortIDsToBeDeleted []uint32
+		gemPortIDsRemaining   []uint32
+		serialNum             string
+		finalLength           int
+	}
+	tests := []struct {
+		name   string
+		fields *fields
+		args   args
+	}{
+		// Add/Delete single gem port
+		{"DeleteGemPortFromLocalCache1", getResMgr(), args{0, 1, []uint32{1}, []uint32{1}, []uint32{}, "onu1", 0}},
+		// Delete all gemports
+		{"DeleteGemPortFromLocalCache2", getResMgr(), args{0, 1, []uint32{1, 2, 3, 4}, []uint32{1, 2, 3, 4}, []uint32{}, "onu1", 0}},
+		// Try to delete when there is no gem port
+		{"DeleteGemPortFromLocalCache3", getResMgr(), args{0, 1, []uint32{}, []uint32{1, 2}, nil, "onu1", 0}},
+		// Try to delete non-existent gem port
+		{"DeleteGemPortFromLocalCache4", getResMgr(), args{0, 1, []uint32{1}, []uint32{2}, []uint32{1}, "onu1", 1}},
+		// Try to delete two of the gem ports
+		{"DeleteGemPortFromLocalCache5", getResMgr(), args{0, 1, []uint32{1, 2, 3, 4}, []uint32{2, 4}, []uint32{1, 3}, "onu1", 2}},
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			RsrcMgr := testResMgrObject(tt.fields)
+			if err := RsrcMgr.DelOnuGemInfo(ctx, tt.args.intfID, tt.args.onuID); err != nil {
+				t.Errorf("failed to remove onu")
+			}
+			if err := RsrcMgr.AddNewOnuGemInfoToCacheAndKvStore(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum); err != nil {
+				t.Errorf("failed to add onu")
+			}
+			for _, gemPort := range tt.args.gemPortIDs {
+				if err := RsrcMgr.AddGemToOnuGemInfo(ctx, tt.args.intfID, tt.args.onuID, gemPort); err != nil {
+					t.Errorf("failed to add gem to onu")
+				}
+			}
+			for _, gemPortDeleted := range tt.args.gemPortIDsToBeDeleted {
+				if err := RsrcMgr.RemoveGemFromOnuGemInfo(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted); err != nil {
+					t.Errorf("failed to remove gem from onu")
+				}
+			}
+
+			gP, err := RsrcMgr.GetOnuGemInfo(ctx, tt.args.intfID, tt.args.onuID)
+			if err != nil || gP == nil {
+				// Fatalf, not Errorf: gP is dereferenced below, so carrying on
+				// with a nil gP would panic instead of failing the subtest.
+				t.Fatalf("failed to get onuGemInfo: err = %v, onuGemInfo = %v", err, gP)
+			}
+
+			gemPorts := gP.GemPorts
+			lenofGemPorts := len(gemPorts)
+
+			if lenofGemPorts != tt.args.finalLength {
+				t.Errorf("GemPorts length is not as expected len = %d, want %d", lenofGemPorts, tt.args.finalLength)
+			}
+
+			if !reflect.DeepEqual(tt.args.gemPortIDsRemaining, gemPorts) {
+				t.Errorf("GemPorts are not as expected = %v, want %v", gemPorts, tt.args.gemPortIDsRemaining)
+			}
+		})
+	}
+}
+
+// TestOpenOltResourceMgr_AddNewOnuGemInfo adds OnuCount ONUs (IDs 1..OnuCount)
+// to one PON interface from concurrent goroutines, waits for all of them and
+// compares the result of every call with the expected error.
+func TestOpenOltResourceMgr_AddNewOnuGemInfo(t *testing.T) {
+
+	type args struct {
+		PONIntfID uint32
+		OnuCount  uint32
+	}
+	tests := []struct {
+		name   string
+		fields *fields
+		args   args
+		want   error
+	}{
+		{"AddNewOnuGemInfoForIntf-0", getResMgr(), args{0, 32}, nil},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			RsrcMgr := testResMgrObject(tt.fields)
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			defer cancel()
+
+			// Buffered to OnuCount so that no goroutine blocks on its send,
+			// even if the subtest stops receiving early.
+			results := make(chan error, tt.args.OnuCount)
+			for j := uint32(1); j <= tt.args.OnuCount; j++ {
+				go func(intfID uint32, onuID uint32) {
+					results <- RsrcMgr.AddNewOnuGemInfoToCacheAndKvStore(ctx, intfID, onuID, fmt.Sprintf("onu-%d", onuID))
+				}(tt.args.PONIntfID, j)
+			}
+
+			// Collect one result per goroutine; this also keeps the subtest
+			// (and ctx) alive until every call has returned.
+			for n := uint32(0); n < tt.args.OnuCount; n++ {
+				if err := <-results; err != tt.want {
+					t.Errorf("AddNewOnuGemInfoToCacheAndKvStore() error = %v, want %v", err, tt.want)
+				}
+			}
+		})
+	}
+}
+
+// TestOpenOltFlowMgr_addGemPortToOnuInfoMap re-creates an ONU, adds the listed
+// gem ports to it and checks that GetOnuGemInfo reports exactly the expected
+// gem ports. When GetOnuGemInfo fails or returns nil, the ONU is treated as
+// having no gem ports.
+func TestOpenOltFlowMgr_addGemPortToOnuInfoMap(t *testing.T) {
+
+	type args struct {
+		intfID              uint32
+		onuID               uint32
+		gemPortIDs          []uint32
+		gemPortIDsRemaining []uint32
+		serialNum           string
+		finalLength         int
+	}
+	tests := []struct {
+		name   string
+		fields *fields
+		args   args
+	}{
+		{
+			// A single gem port is added.
+			name:   "addGemPortToOnuInfoMap1",
+			fields: getResMgr(),
+			args: args{
+				intfID:              0,
+				onuID:               1,
+				gemPortIDs:          []uint32{1},
+				gemPortIDsRemaining: []uint32{1},
+				serialNum:           "onu1",
+				finalLength:         1,
+			},
+		},
+		{
+			// Several gem ports are added.
+			name:   "addGemPortToOnuInfoMap2",
+			fields: getResMgr(),
+			args: args{
+				intfID:              0,
+				onuID:               1,
+				gemPortIDs:          []uint32{1, 2, 3, 4},
+				gemPortIDsRemaining: []uint32{1, 2, 3, 4},
+				serialNum:           "onu1",
+				finalLength:         4,
+			},
+		},
+		{
+			// No gem port is added at all.
+			name:   "addGemPortToOnuInfoMap3",
+			fields: getResMgr(),
+			args: args{
+				intfID:              0,
+				onuID:               1,
+				gemPortIDs:          []uint32{},
+				gemPortIDsRemaining: nil,
+				serialNum:           "onu1",
+				finalLength:         0,
+			},
+		},
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			RsrcMgr := testResMgrObject(tt.fields)
+			intf, onu := tt.args.intfID, tt.args.onuID
+
+			// Start from a clean slate: drop the ONU, then add it again.
+			if err := RsrcMgr.DelOnuGemInfo(ctx, intf, onu); err != nil {
+				t.Errorf("failed to remove onu")
+			}
+			if err := RsrcMgr.AddNewOnuGemInfoToCacheAndKvStore(ctx, intf, onu, tt.args.serialNum); err != nil {
+				t.Errorf("failed to add onu")
+			}
+			for idx := range tt.args.gemPortIDs {
+				if err := RsrcMgr.AddGemToOnuGemInfo(ctx, intf, onu, tt.args.gemPortIDs[idx]); err != nil {
+					t.Errorf("failed to add gem to onu")
+				}
+			}
+
+			// A failed or empty lookup leaves the observed gem ports nil.
+			var gotGemPorts []uint32
+			if info, err := RsrcMgr.GetOnuGemInfo(ctx, intf, onu); err == nil && info != nil {
+				gotGemPorts = info.GemPorts
+			}
+
+			if gotLen := len(gotGemPorts); gotLen != tt.args.finalLength {
+				t.Errorf("GemPorts length is not as expected len = %d, want %d", gotLen, tt.args.finalLength)
+			}
+			if !reflect.DeepEqual(tt.args.gemPortIDsRemaining, gotGemPorts) {
+				t.Errorf("GemPorts are not as expected = %v, want %v", gotGemPorts, tt.args.gemPortIDsRemaining)
+			}
+		})
+	}
+}
+
func TestOpenOltResourceMgr_GetCurrentGEMPortIDsForOnu(t *testing.T) {
type args struct {
intfID uint32