VOL-4077: Improve storage usage on etcd
- Do away with unnecessary data storage on etcd if it can be
reconciled on adapter restart
- For data that needs storage, use lesser footprint if possible
- Use write-through-cache for all data stored on etcd via
resource manager module
- Use ResourceManager module per interface to localize lock
contention per PON port
Change-Id: I21d38216fab195d738a446b3f96a00251569e38b
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index 53f8898..443f418 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -27,19 +27,18 @@
"context"
"encoding/json"
"errors"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+ "github.com/opencord/voltha-openolt-adapter/pkg/mocks"
"reflect"
"strconv"
"strings"
- "sync"
"testing"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- ponrmgr "github.com/opencord/voltha-lib-go/v4/pkg/ponresourcemanager"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ ponrmgr "github.com/opencord/voltha-lib-go/v5/pkg/ponresourcemanager"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/openolt"
)
@@ -77,7 +76,7 @@
KVStore *db.Backend
DeviceType string
DevInfo *openolt.DeviceInfo
- ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+ PonRsrMgr *ponrmgr.PONResourceManager
NumOfPonPorts uint32
}
@@ -87,12 +86,11 @@
// getResMgr mocks OpenOltResourceMgr struct.
func getResMgr() *fields {
- ctx := context.TODO()
var resMgr fields
resMgr.KVStore = &db.Backend{
Client: &MockResKVClient{},
}
- resMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
+ resMgr.PonRsrMgr = &ponrmgr.PONResourceManager{}
ranges := make(map[string]interface{})
sharedIdxByType := make(map[string]string)
sharedIdxByType["ALLOC_ID"] = "ALLOC_ID"
@@ -108,25 +106,22 @@
ranges["gemport_id_shared"] = uint32(0)
ranges["flow_id_shared"] = uint32(0)
resMgr.NumOfPonPorts = 16
- ponMgr := &ponrmgr.PONResourceManager{}
- tpMgr, err := tp.NewTechProfile(ctx, ponMgr, "etcd", "127.0.0.1", "/")
- if err != nil {
- logger.Fatal(ctx, err.Error())
- }
-
- ponMgr.DeviceID = "onu-1"
- ponMgr.IntfIDs = []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
- ponMgr.KVStore = &db.Backend{
+ resMgr.PonRsrMgr.DeviceID = "onu-1"
+ resMgr.PonRsrMgr.IntfIDs = []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
+ resMgr.PonRsrMgr.KVStore = &db.Backend{
Client: &MockResKVClient{},
}
- ponMgr.PonResourceRanges = ranges
- ponMgr.SharedIdxByType = sharedIdxByType
- ponMgr.TechProfileMgr = tpMgr
+ resMgr.PonRsrMgr.Technology = "XGS-PON"
+ resMgr.PonRsrMgr.PonResourceRanges = ranges
+ resMgr.PonRsrMgr.SharedIdxByType = sharedIdxByType
+ /*
+ tpMgr, err := tp.NewTechProfile(ctx, resMgr.PonRsrMgr, "etcd", "127.0.0.1", "/")
+ if err != nil {
+ logger.Fatal(ctx, err.Error())
+ }
+ */
+ resMgr.PonRsrMgr.TechProfileMgr = &mocks.MockTechProfile{TpID: 64}
- var ponIntf uint32
- for ponIntf = 0; ponIntf < resMgr.NumOfPonPorts; ponIntf++ {
- resMgr.ResourceMgrs[ponIntf] = ponMgr
- }
return &resMgr
}
@@ -265,18 +260,15 @@
// testResMgrObject maps fields type to OpenOltResourceMgr type.
func testResMgrObject(testResMgr *fields) *OpenOltResourceMgr {
var rsrMgr = OpenOltResourceMgr{
- DeviceID: testResMgr.DeviceID,
- Args: testResMgr.Args,
- KVStore: testResMgr.KVStore,
- DeviceType: testResMgr.DeviceType,
- Address: testResMgr.Address,
- DevInfo: testResMgr.DevInfo,
- ResourceMgrs: testResMgr.ResourceMgrs,
+ DeviceID: testResMgr.DeviceID,
+ Args: testResMgr.Args,
+ KVStore: testResMgr.KVStore,
+ DeviceType: testResMgr.DeviceType,
+ Address: testResMgr.Address,
+ DevInfo: testResMgr.DevInfo,
+ PonRsrMgr: testResMgr.PonRsrMgr,
}
-
- rsrMgr.AllocIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
- rsrMgr.GemPortIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
- rsrMgr.OnuIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
+ rsrMgr.InitLocalCache()
return &rsrMgr
}
@@ -284,6 +276,7 @@
func TestNewResourceMgr(t *testing.T) {
type args struct {
deviceID string
+ intfID uint32
KVStoreAddress string
kvStoreType string
deviceType string
@@ -295,14 +288,14 @@
args args
want *OpenOltResourceMgr
}{
- {"NewResourceMgr-2", args{"olt1", "1:2", "etcd",
+ {"NewResourceMgr-2", args{"olt1", 0, "1:2", "etcd",
"onu", &openolt.DeviceInfo{OnuIdStart: 1, OnuIdEnd: 1}, "service/voltha"}, &OpenOltResourceMgr{}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- if got := NewResourceMgr(ctx, tt.args.deviceID, tt.args.KVStoreAddress, tt.args.kvStoreType, tt.args.deviceType, tt.args.devInfo, tt.args.kvStorePrefix); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ if got := NewResourceMgr(ctx, tt.args.intfID, tt.args.deviceID, tt.args.KVStoreAddress, tt.args.kvStoreType, tt.args.deviceType, tt.args.devInfo, tt.args.kvStorePrefix); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("NewResourceMgr() = %v, want %v", got, tt.want)
}
})
@@ -310,19 +303,23 @@
}
func TestOpenOltResourceMgr_Delete(t *testing.T) {
+ type args struct {
+ intfID uint32
+ }
tests := []struct {
name string
fields *fields
wantErr error
+ args args
}{
- {"Delete-1", getResMgr(), errors.New("failed to clear device resource pool")},
+ {"Delete-1", getResMgr(), errors.New("failed to clear device resource pool"), args{intfID: 0}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- if err := RsrcMgr.Delete(ctx); (err != nil) && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
+ if err := RsrcMgr.Delete(ctx, tt.args.intfID); (err != nil) && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
t.Errorf("Delete() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -374,33 +371,6 @@
}
}
-func TestOpenOltResourceMgr_GetAllocID(t *testing.T) {
-
- type args struct {
- intfID uint32
- onuID uint32
- uniID uint32
- }
- tests := []struct {
- name string
- fields *fields
- args args
- want uint32
- }{
- {"GetAllocID-1", getResMgr(), args{1, 2, 2}, 0},
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- RsrcMgr := testResMgrObject(tt.fields)
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if got := RsrcMgr.GetAllocID(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
- t.Errorf("GetAllocID() = %v, want %v", got, tt.want)
- }
- })
- }
-}
-
func TestOpenOltResourceMgr_GetCurrentAllocIDForOnu(t *testing.T) {
type args struct {
intfID uint32
@@ -420,8 +390,16 @@
RsrcMgr := testResMgrObject(tt.fields)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- if got := RsrcMgr.GetCurrentAllocIDsForOnu(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID); !reflect.DeepEqual(got, tt.want) {
+ got := RsrcMgr.GetCurrentAllocIDsForOnu(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID)
+ if len(got) != len(tt.want) {
t.Errorf("GetCurrentAllocIDsForOnu() = %v, want %v", got, tt.want)
+ } else {
+ for i := range tt.want {
+ if got[i] != tt.want[i] {
+ t.Errorf("GetCurrentAllocIDsForOnu() = %v, want %v", got, tt.want)
+ break
+ }
+ }
}
})
}
@@ -484,40 +462,6 @@
}
}
-func TestOpenOltResourceMgr_GetGEMPortID(t *testing.T) {
- type args struct {
- ponPort uint32
- onuID uint32
- uniID uint32
- NumOfPorts uint32
- }
- tests := []struct {
- name string
- fields *fields
- args args
- want []uint32
- wantErr error
- }{
- {"GetGEMPortID-1", getResMgr(), args{1, 2, 2, 2}, []uint32{},
- errors.New("failed to get gem port")},
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- RsrcMgr := testResMgrObject(tt.fields)
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- got, err := RsrcMgr.GetGEMPortID(ctx, tt.args.ponPort, tt.args.onuID, tt.args.uniID, tt.args.NumOfPorts)
- if reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
- t.Errorf("GetGEMPortID() error = %v, wantErr %v", err, tt.wantErr)
- return
- }
- if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
- t.Errorf("GetGEMPortID() got = %v, want %v", got, tt.want)
- }
- })
- }
-}
-
func TestOpenOltResourceMgr_GetMeterInfoForOnu(t *testing.T) {
type args struct {
Direction string
@@ -693,34 +637,6 @@
}
}
-func TestOpenOltResourceMgr_UpdateFlowIDInfo(t *testing.T) {
- type args struct {
- ponIntfID int32
- onuID int32
- uniID int32
- flowID uint64
- flowData FlowInfo
- }
- tests := []struct {
- name string
- fields *fields
- args args
- wantErr error
- }{
- {"UpdateFlowIDInfo-1", getResMgr(), args{1, 2, 2, 2, FlowInfo{}}, errors.New("")},
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- RsrcMgr := testResMgrObject(tt.fields)
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if err := RsrcMgr.UpdateFlowIDInfo(ctx, uint32(tt.args.ponIntfID), tt.args.onuID, tt.args.uniID, tt.args.flowID, tt.args.flowData); err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
- t.Errorf("UpdateFlowIDInfo() error = %v, wantErr %v", err, tt.wantErr)
- }
- })
- }
-}
-
func TestOpenOltResourceMgr_UpdateGEMPortIDsForOnu(t *testing.T) {
type args struct {
@@ -750,35 +666,6 @@
}
}
-func TestOpenOltResourceMgr_UpdateGEMportsPonportToOnuMapOnKVStore(t *testing.T) {
- type args struct {
- gemPorts []uint32
- PonPort uint32
- onuID uint32
- uniID uint32
- }
- tests := []struct {
- name string
- fields *fields
- args args
- wantErr error
- }{
- {"UpdateGEMportsPonportToOnuMapOnKVStore-1", getResMgr(), args{[]uint32{1, 2},
- 1, 2, 2}, errors.New("failed to update resource")},
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- RsrcMgr := testResMgrObject(tt.fields)
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if err := RsrcMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, tt.args.gemPorts, tt.args.PonPort,
- tt.args.onuID, tt.args.uniID); err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
- t.Errorf("UpdateGEMportsPonportToOnuMapOnKVStore() error = %v, wantErr %v", err, tt.wantErr)
- }
- })
- }
-}
-
func TestOpenOltResourceMgr_UpdateMeterIDForOnu(t *testing.T) {
type args struct {
Direction string