VOL-4077: Improve storage usage on etcd
- Do away with unnecessary data storage on etcd if it can be
reconciled on adapter restart
- For data that needs storage, use lesser footprint if possible
- Use write-through-cache for all data stored on etcd via
resource manager module
- Use ResourceManager module per interface to localize lock
contention per PON port
Change-Id: I21d38216fab195d738a446b3f96a00251569e38b
diff --git a/pkg/mocks/common.go b/pkg/mocks/common.go
index 7a67acf..360007a 100644
--- a/pkg/mocks/common.go
+++ b/pkg/mocks/common.go
@@ -18,7 +18,7 @@
package mocks
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/mocks/mockAdapterProxy.go b/pkg/mocks/mockAdapterProxy.go
index c410b48..20ea24d 100644
--- a/pkg/mocks/mockAdapterProxy.go
+++ b/pkg/mocks/mockAdapterProxy.go
@@ -43,3 +43,16 @@
}
return nil
}
+
+// TechProfileInstanceRequest mocks TechProfileInstanceRequest function
+func (ma *MockAdapterProxy) TechProfileInstanceRequest(ctx context.Context,
+ tpPath string,
+ parentPonPort uint32,
+ onuID uint32,
+ uniID uint32,
+ fromAdapter string,
+ toAdapter string,
+ toDeviceID string,
+ proxyDeviceID string) (*inter_container.InterAdapterTechProfileDownloadMessage, error) {
+ return nil, nil
+}
diff --git a/pkg/mocks/mockCoreProxy.go b/pkg/mocks/mockCoreProxy.go
index e143fbd..f572f09 100644
--- a/pkg/mocks/mockCoreProxy.go
+++ b/pkg/mocks/mockCoreProxy.go
@@ -22,7 +22,7 @@
"errors"
"fmt"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
"github.com/opencord/voltha-protos/v4/go/voltha"
)
@@ -132,6 +132,11 @@
if parentdeviceID == "" {
return nil, errors.New("no deviceID")
}
+ for k, v := range mcp.Devices {
+ if k == "olt" {
+ return v, nil
+ }
+ }
return nil, nil
}
diff --git a/pkg/mocks/mockKVClient.go b/pkg/mocks/mockKVClient.go
index 164e896..6884fc7 100644
--- a/pkg/mocks/mockKVClient.go
+++ b/pkg/mocks/mockKVClient.go
@@ -25,12 +25,9 @@
"strings"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
-
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- openolt "github.com/opencord/voltha-protos/v4/go/openolt"
)
const (
@@ -62,6 +59,21 @@
type MockKVClient struct {
}
+// OnuGemInfo holds onu information along with gem port list and uni port list
+type OnuGemInfo struct {
+ OnuID uint32
+ SerialNumber string
+ IntfID uint32
+ GemPorts []uint32
+ UniPorts []uint32
+}
+
+// GroupInfo holds group information
+type GroupInfo struct {
+ GroupID uint32
+ OutPorts []uint32
+}
+
// List mock function implementation for KVClient
func (kvclient *MockKVClient) List(ctx context.Context, key string) (map[string]*kvstore.KVPair, error) {
if key != "" {
@@ -136,24 +148,7 @@
str, _ := json.Marshal(data)
return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
}
- if strings.Contains(key, "/{olt}/{0,-1,-1}/flow_id_info/") {
- //multicast flow
- data := resourcemanager.FlowInfo{
- Flow: &openolt.Flow{FlowId: 1, OnuId: 0, UniId: 0, GemportId: 4000},
- }
- logger.Debug(ctx, "Error Error Error Key:", FlowIDs)
- str, _ := json.Marshal(data)
- return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
- }
- if strings.Contains(key, FlowIDInfo) {
- data := resourcemanager.FlowInfo{
- Flow: &openolt.Flow{FlowId: 1, OnuId: 1, UniId: 1, GemportId: 1},
- }
- logger.Debug(ctx, "Error Error Error Key:", FlowIDs)
- str, _ := json.Marshal(data)
- return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
- }
if strings.Contains(key, GemportIDs) {
logger.Debug(ctx, "Error Error Error Key:", GemportIDs)
data := []uint32{1}
@@ -168,7 +163,7 @@
}
if strings.Contains(key, FlowGroup) || strings.Contains(key, FlowGroupCached) {
logger.Debug(ctx, "Error Error Error Key:", FlowGroup)
- groupInfo := resourcemanager.GroupInfo{
+ groupInfo := GroupInfo{
GroupID: 2,
OutPorts: []uint32{1},
}
@@ -178,11 +173,13 @@
if strings.Contains(key, OnuPacketIn) {
return getPacketInGemPort(key)
}
+
if strings.Contains(key, OnuGemInfoPath) {
- var data []resourcemanager.OnuGemInfo
+ var data []OnuGemInfo
str, _ := json.Marshal(data)
return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
}
+
//Interface, GEM port path
if strings.Contains(key, "0,255") {
//return onuID, uniID associated with the given interface and GEM port
diff --git a/pkg/mocks/mockTechprofile.go b/pkg/mocks/mockTechprofile.go
index e51f44a..e06d016 100644
--- a/pkg/mocks/mockTechprofile.go
+++ b/pkg/mocks/mockTechprofile.go
@@ -20,8 +20,7 @@
import (
"context"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
)
@@ -35,103 +34,98 @@
return &db.Backend{Client: &MockKVClient{}}
}
-// GetTechProfileInstanceKVPath to mock techprofile GetTechProfileInstanceKVPath method
-func (m MockTechProfile) GetTechProfileInstanceKVPath(ctx context.Context, techProfiletblID uint32, uniPortName string) string {
- return ""
-
-}
-
-// GetTPInstanceFromKVStore to mock techprofile GetTPInstanceFromKVStore method
-func (m MockTechProfile) GetTPInstanceFromKVStore(ctx context.Context, techProfiletblID uint32, path string) (interface{}, error) {
+// GetTPInstance to mock techprofile GetTPInstance method
+func (m MockTechProfile) GetTPInstance(ctx context.Context, path string) (interface{}, error) {
logger.Debug(ctx, "GetTPInstanceFromKVStore")
- if techProfiletblID == 64 {
- return &tp.TechProfile{
- Name: "mock-tech-profile",
- SubscriberIdentifier: "257",
- ProfileType: "mock",
- Version: 0,
- NumGemPorts: 1,
- UsScheduler: tp.IScheduler{
- AllocID: 1,
- Direction: "upstream",
- AdditionalBw: "None",
- Priority: 0,
- Weight: 0,
- QSchedPolicy: "",
- },
- DsScheduler: tp.IScheduler{
- AllocID: 1,
- Direction: "downstream",
- AdditionalBw: "None",
- Priority: 0,
- Weight: 0,
- QSchedPolicy: "",
- },
- UpstreamGemPortAttributeList: []tp.IGemPortAttribute{{
- GemportID: 1,
- PbitMap: "0b11111111",
- },
- },
- DownstreamGemPortAttributeList: []tp.IGemPortAttribute{{
- GemportID: 1,
- PbitMap: "0b11111111",
- },
- },
- }, nil
- } else if techProfiletblID == 65 {
- return &tp.EponProfile{
- Name: "mock-epon-profile",
- SubscriberIdentifier: "257",
- ProfileType: "mock",
- Version: 0,
- NumGemPorts: 2,
- UpstreamQueueAttributeList: nil,
- DownstreamQueueAttributeList: nil,
- }, nil
- } else {
- return nil, nil
- }
+ var usGemPortAttributeList []*tp_pb.GemPortAttributes
+ var dsGemPortAttributeList []*tp_pb.GemPortAttributes
+ usGemPortAttributeList = append(usGemPortAttributeList, &tp_pb.GemPortAttributes{
+ GemportId: 1,
+ PbitMap: "0b11111111",
+ })
+ dsGemPortAttributeList = append(dsGemPortAttributeList, &tp_pb.GemPortAttributes{
+ GemportId: 1,
+ PbitMap: "0b11111111",
+ })
+ return &tp_pb.TechProfileInstance{
+ Name: "mock-tech-profile",
+ SubscriberIdentifier: "257",
+ ProfileType: "mock",
+ Version: 0,
+ NumGemPorts: 1,
+ InstanceControl: &tp_pb.InstanceControl{
+ Onu: "multi-instance",
+ Uni: "single-instance",
+ MaxGemPayloadSize: "",
+ },
+ UsScheduler: &tp_pb.SchedulerAttributes{
+ AllocId: 1,
+ Direction: tp_pb.Direction_UPSTREAM,
+ AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_None,
+ Priority: 0,
+ Weight: 0,
+ QSchedPolicy: tp_pb.SchedulingPolicy_WRR,
+ },
+ DsScheduler: &tp_pb.SchedulerAttributes{
+ AllocId: 1,
+ Direction: tp_pb.Direction_DOWNSTREAM,
+ AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_None,
+ Priority: 0,
+ Weight: 0,
+ QSchedPolicy: tp_pb.SchedulingPolicy_WRR,
+ },
+ UpstreamGemPortAttributeList: usGemPortAttributeList,
+ DownstreamGemPortAttributeList: dsGemPortAttributeList,
+ }, nil
+
}
-// CreateTechProfInstance to mock techprofile CreateTechProfInstance method
-func (m MockTechProfile) CreateTechProfInstance(ctx context.Context, techProfiletblID uint32, uniPortName string, intfID uint32) (interface{}, error) {
+// CreateTechProfileInstance to mock techprofile CreateTechProfileInstance method
+func (m MockTechProfile) CreateTechProfileInstance(ctx context.Context, techProfiletblID uint32, uniPortName string, intfID uint32) (interface{}, error) {
+ var usGemPortAttributeList []*tp_pb.GemPortAttributes
+ var dsGemPortAttributeList []*tp_pb.GemPortAttributes
if techProfiletblID == 64 {
- return &tp.TechProfile{
+ usGemPortAttributeList = append(usGemPortAttributeList, &tp_pb.GemPortAttributes{
+ GemportId: 1,
+ PbitMap: "0b11111111",
+ })
+ dsGemPortAttributeList = append(dsGemPortAttributeList, &tp_pb.GemPortAttributes{
+ GemportId: 1,
+ PbitMap: "0b11111111",
+ })
+ return &tp_pb.TechProfileInstance{
Name: "mock-tech-profile",
SubscriberIdentifier: "257",
ProfileType: "mock",
Version: 0,
NumGemPorts: 1,
- UsScheduler: tp.IScheduler{
- AllocID: 1,
- Direction: "upstream",
- AdditionalBw: "None",
+ InstanceControl: &tp_pb.InstanceControl{
+ Onu: "multi-instance",
+ Uni: "single-instance",
+ MaxGemPayloadSize: "",
+ },
+ UsScheduler: &tp_pb.SchedulerAttributes{
+ AllocId: 1,
+ Direction: tp_pb.Direction_UPSTREAM,
+ AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_None,
Priority: 0,
Weight: 0,
- QSchedPolicy: "",
+ QSchedPolicy: tp_pb.SchedulingPolicy_WRR,
},
- DsScheduler: tp.IScheduler{
- AllocID: 1,
- Direction: "downstream",
- AdditionalBw: "None",
+ DsScheduler: &tp_pb.SchedulerAttributes{
+ AllocId: 1,
+ Direction: tp_pb.Direction_DOWNSTREAM,
+ AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_None,
Priority: 0,
Weight: 0,
- QSchedPolicy: "",
+ QSchedPolicy: tp_pb.SchedulingPolicy_WRR,
},
- UpstreamGemPortAttributeList: []tp.IGemPortAttribute{{
- GemportID: 1,
- PbitMap: "0b11111111",
- },
- },
- DownstreamGemPortAttributeList: []tp.IGemPortAttribute{{
- GemportID: 1,
- PbitMap: "0b11111111",
- },
- },
+ UpstreamGemPortAttributeList: usGemPortAttributeList,
+ DownstreamGemPortAttributeList: dsGemPortAttributeList,
}, nil
} else if techProfiletblID == 65 {
- return &tp.EponProfile{
+ return &tp_pb.EponTechProfileInstance{
Name: "mock-epon-profile",
SubscriberIdentifier: "257",
ProfileType: "mock",
@@ -143,7 +137,6 @@
} else {
return nil, nil
}
-
}
// DeleteTechProfileInstance to mock techprofile DeleteTechProfileInstance method
@@ -158,37 +151,37 @@
}
// GetUsScheduler to mock techprofile GetUsScheduler method
-func (m MockTechProfile) GetUsScheduler(ctx context.Context, tpInstance *tp.TechProfile) (*tp_pb.SchedulerConfig, error) {
- return &tp_pb.SchedulerConfig{}, nil
+func (m MockTechProfile) GetUsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
+ return &tp_pb.SchedulerConfig{}
}
// GetDsScheduler to mock techprofile GetDsScheduler method
-func (m MockTechProfile) GetDsScheduler(ctx context.Context, tpInstance *tp.TechProfile) (*tp_pb.SchedulerConfig, error) {
- return &tp_pb.SchedulerConfig{}, nil
+func (m MockTechProfile) GetDsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
+ return &tp_pb.SchedulerConfig{}
}
// GetTrafficScheduler to mock techprofile GetTrafficScheduler method
-func (m MockTechProfile) GetTrafficScheduler(tpInstance *tp.TechProfile, SchedCfg *tp_pb.SchedulerConfig,
+func (m MockTechProfile) GetTrafficScheduler(tpInstance *tp_pb.TechProfileInstance, SchedCfg *tp_pb.SchedulerConfig,
ShapingCfg *tp_pb.TrafficShapingInfo) *tp_pb.TrafficScheduler {
return &tp_pb.TrafficScheduler{}
}
// GetTrafficQueues to mock techprofile GetTrafficQueues method
-func (m MockTechProfile) GetTrafficQueues(ctx context.Context, tp *tp.TechProfile, Dir tp_pb.Direction) ([]*tp_pb.TrafficQueue, error) {
+func (m MockTechProfile) GetTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance, Dir tp_pb.Direction) ([]*tp_pb.TrafficQueue, error) {
return []*tp_pb.TrafficQueue{{}}, nil
}
// GetMulticastTrafficQueues to mock techprofile GetMulticastTrafficQueues method
-func (m MockTechProfile) GetMulticastTrafficQueues(ctx context.Context, tp *tp.TechProfile) []*tp_pb.TrafficQueue {
+func (m MockTechProfile) GetMulticastTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance) []*tp_pb.TrafficQueue {
return []*tp_pb.TrafficQueue{{}}
}
// GetGemportForPbit to mock techprofile GetGemportForPbit method
func (m MockTechProfile) GetGemportForPbit(ctx context.Context, tpInst interface{}, Dir tp_pb.Direction, pbit uint32) interface{} {
- return tp.IGemPortAttribute{
- GemportID: 1,
+ return &tp_pb.GemPortAttributes{
+ GemportId: 1,
PbitMap: "0b11111111",
AesEncryption: "false",
}
@@ -196,7 +189,7 @@
// FindAllTpInstances to mock techprofile FindAllTpInstances method
func (m MockTechProfile) FindAllTpInstances(ctx context.Context, oltDeviceID string, tpID uint32, ponIntf uint32, onuID uint32) interface{} {
- return []tp.TechProfile{}
+ return []tp_pb.TechProfileInstance{}
}
// GetResourceID to mock techprofile GetResourceID method
@@ -208,3 +201,8 @@
func (m MockTechProfile) FreeResourceID(ctx context.Context, IntfID uint32, ResourceType string, ReleaseContent []uint32) error {
return nil
}
+
+// GetTechProfileInstanceKey to mock techprofile GetTechProfileInstanceKey method
+func (m MockTechProfile) GetTechProfileInstanceKey(ctx context.Context, tpID uint32, uniPortName string) string {
+ return ""
+}