[VOL-3624] Parametrizing the KV-Store path
Change-Id: I44d8d72e12f83ee6c79020dd98ace4d2f1155def
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index a19f2ef..e720890 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -60,7 +60,8 @@
// AdapterFlags represents the set of configurations used by the read-write adaptercore service
type AdapterFlags struct {
// Command line parameters
- InstanceID string
+ AdapterName string
+ InstanceID string // NOTE what am I used for? why not cli but only ENV? TODO expose in the chart
KafkaAdapterAddress string
KafkaClusterAddress string
KVStoreType string
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 95b0a74..66ca4bb 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -34,10 +34,13 @@
"github.com/golang/protobuf/ptypes"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
+
"github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v4/pkg/config"
"github.com/opencord/voltha-lib-go/v4/pkg/flows"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-lib-go/v4/pkg/pmmetrics"
+
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
"github.com/opencord/voltha-protos/v4/go/common"
@@ -57,6 +60,7 @@
//DeviceHandler will interact with the OLT device.
type DeviceHandler struct {
+ cm *config.ConfigManager
device *voltha.Device
coreProxy adapterif.CoreProxy
AdapterProxy adapterif.AdapterProxy
@@ -122,8 +126,9 @@
}
//NewDeviceHandler creates a new device handler
-func NewDeviceHandler(cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
+func NewDeviceHandler(cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy, device *voltha.Device, adapter *OpenOLT, cm *config.ConfigManager) *DeviceHandler {
var dh DeviceHandler
+ dh.cm = cm
dh.coreProxy = cp
dh.AdapterProxy = ap
dh.EventProxy = ep
@@ -764,7 +769,7 @@
dh.totalPonPorts = deviceInfo.GetPonPorts()
// Instantiate resource manager
- if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, deviceInfo); dh.resourceMgr == nil {
+ if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, deviceInfo, dh.cm.Backend.PathPrefix); dh.resourceMgr == nil {
return olterrors.ErrResourceManagerInstantiating
}
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 68309ad..bba8601 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -19,6 +19,7 @@
import (
"context"
+ conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
"net"
"reflect"
"sync"
@@ -156,9 +157,10 @@
cp := newMockCoreProxy()
ap := &mocks.MockAdapterProxy{}
ep := &mocks.MockEventProxy{}
+ cm := &conf.ConfigManager{}
cfg := &config.AdapterFlags{OmccEncryption: true}
openOLT := &OpenOLT{coreProxy: cp, adapterProxy: ap, eventProxy: ep, config: cfg}
- dh := NewDeviceHandler(cp, ap, ep, device, openOLT)
+ dh := NewDeviceHandler(cp, ap, ep, device, openOLT, cm)
oopRanges := []*oop.DeviceInfo_DeviceResourceRanges{{
IntfIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
Technology: "xgs-pon",
diff --git a/internal/pkg/core/openolt.go b/internal/pkg/core/openolt.go
index d1f94ce..11317b9 100644
--- a/internal/pkg/core/openolt.go
+++ b/internal/pkg/core/openolt.go
@@ -23,6 +23,7 @@
"time"
"github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
+ conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
@@ -34,6 +35,7 @@
//OpenOLT structure holds the OLT information
type OpenOLT struct {
+ configManager *conf.ConfigManager
deviceHandlers map[string]*DeviceHandler
coreProxy adapterif.CoreProxy
adapterProxy adapterif.AdapterProxy
@@ -53,7 +55,7 @@
//NewOpenOLT returns a new instance of OpenOLT
func NewOpenOLT(ctx context.Context, kafkaICProxy kafka.InterContainerProxy,
coreProxy adapterif.CoreProxy, adapterProxy adapterif.AdapterProxy,
- eventProxy adapterif.EventProxy, cfg *config.AdapterFlags) *OpenOLT {
+ eventProxy adapterif.EventProxy, cfg *config.AdapterFlags, cm *conf.ConfigManager) *OpenOLT {
var openOLT OpenOLT
openOLT.exitChannel = make(chan int, 1)
openOLT.deviceHandlers = make(map[string]*DeviceHandler)
@@ -69,6 +71,7 @@
openOLT.HeartbeatFailReportInterval = cfg.HeartbeatFailReportInterval
openOLT.GrpcTimeoutInterval = cfg.GrpcTimeoutInterval
openOLT.lockDeviceHandlersMap = sync.RWMutex{}
+ openOLT.configManager = cm
return &openOLT
}
@@ -118,7 +121,7 @@
logger.Infow(ctx, "adopt-device", log.Fields{"device-id": device.Id})
var handler *DeviceHandler
if handler = oo.getDeviceHandler(device.Id); handler == nil {
- handler := NewDeviceHandler(oo.coreProxy, oo.adapterProxy, oo.eventProxy, device, oo)
+ handler := NewDeviceHandler(oo.coreProxy, oo.adapterProxy, oo.eventProxy, device, oo, oo.configManager)
oo.addDeviceHandlerToMap(handler)
go handler.AdoptDevice(ctx, device)
// Launch the creation of the device topic
@@ -173,7 +176,7 @@
logger.Infow(ctx, "reconcile-device", log.Fields{"device-id": device.Id})
var handler *DeviceHandler
if handler = oo.getDeviceHandler(device.Id); handler == nil {
- handler := NewDeviceHandler(oo.coreProxy, oo.adapterProxy, oo.eventProxy, device, oo)
+ handler := NewDeviceHandler(oo.coreProxy, oo.adapterProxy, oo.eventProxy, device, oo, oo.configManager)
oo.addDeviceHandlerToMap(handler)
handler.transitionMap = NewTransitionMap(handler)
handler.transitionMap.Handle(ctx, DeviceInit)
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index d8caa38..a7c671d 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -64,7 +64,7 @@
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- rsrMgr := resourcemanager.NewResourceMgr(ctx, "olt", "127.0.0.1:2379", "etcd", "olt", deviceinfo)
+ rsrMgr := resourcemanager.NewResourceMgr(ctx, "olt", "127.0.0.1:2379", "etcd", "olt", deviceinfo, "service/voltha")
for key := range rsrMgr.ResourceMgrs {
rsrMgr.ResourceMgrs[key].KVStore = &db.Backend{}
rsrMgr.ResourceMgrs[key].KVStore.Client = &mocks.MockKVClient{}
diff --git a/internal/pkg/core/openolt_test.go b/internal/pkg/core/openolt_test.go
index cf08161..8f83f61 100644
--- a/internal/pkg/core/openolt_test.go
+++ b/internal/pkg/core/openolt_test.go
@@ -25,6 +25,7 @@
import (
"context"
"errors"
+ conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
"reflect"
"testing"
@@ -99,19 +100,20 @@
name string
fields *fields
configFlags *config.AdapterFlags
+ cm *conf.ConfigManager
want *OpenOLT
}{
- {"newopenolt-1", &fields{}, &config.AdapterFlags{OnuNumber: 1, KVStoreAddress: "1.1.1.1:1", KVStoreType: "consul"},
+ {"newopenolt-1", &fields{}, &config.AdapterFlags{OnuNumber: 1, KVStoreAddress: "1.1.1.1:1", KVStoreType: "consul"}, &conf.ConfigManager{},
&OpenOLT{numOnus: 1, KVStoreAddress: "1.1.1.1:1", KVStoreType: "consul"}},
- {"newopenolt-2", &fields{}, &config.AdapterFlags{OnuNumber: 2, KVStoreAddress: "2.2.2.2:2", KVStoreType: "etcd"},
+ {"newopenolt-2", &fields{}, &config.AdapterFlags{OnuNumber: 2, KVStoreAddress: "2.2.2.2:2", KVStoreType: "etcd"}, &conf.ConfigManager{},
&OpenOLT{numOnus: 2, KVStoreAddress: "2.2.2.2:2", KVStoreType: "etcd"}},
- {"newopenolt-3", &fields{}, &config.AdapterFlags{OnuNumber: 3, KVStoreAddress: "3.3.3.3:3", KVStoreType: "consul"},
+ {"newopenolt-3", &fields{}, &config.AdapterFlags{OnuNumber: 3, KVStoreAddress: "3.3.3.3:3", KVStoreType: "consul"}, &conf.ConfigManager{},
&OpenOLT{numOnus: 3, KVStoreAddress: "3.3.3.3:3", KVStoreType: "consul"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewOpenOLT(tt.fields.ctx, tt.fields.kafkaICProxy, tt.fields.coreProxy, tt.fields.adapterProxy,
- tt.fields.eventProxy, tt.configFlags); reflect.TypeOf(got) != reflect.TypeOf(tt.want) && got != nil {
+ tt.fields.eventProxy, tt.configFlags, tt.cm); reflect.TypeOf(got) != reflect.TypeOf(tt.want) && got != nil {
t.Errorf("NewOpenOLT() error = %v, wantErr %v", got, tt.want)
}
})
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index abfb73d..612ee8c 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -40,8 +40,8 @@
const (
// KvstoreTimeout specifies the time out for KV Store Connection
KvstoreTimeout = 5 * time.Second
- // BasePathKvStore - service/voltha/openolt/<device_id>
- BasePathKvStore = "service/voltha/openolt/{%s}"
+ // BasePathKvStore - <pathPrefix>/openolt/<device_id>
+ BasePathKvStore = "%s/openolt/{%s}"
// TpIDPathSuffix - <(pon_id, onu_id, uni_id)>/tp_id
TpIDPathSuffix = "{%d,%d,%d}/tp_id"
//MeterIDPathSuffix - <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
@@ -140,7 +140,7 @@
}
// SetKVClient sets the KV client and return a kv backend
-func SetKVClient(ctx context.Context, backend string, addr string, DeviceID string) *db.Backend {
+func SetKVClient(ctx context.Context, backend string, addr string, DeviceID string, basePathKvStore string) *db.Backend {
// TODO : Make sure direct call to NewBackend is working fine with backend , currently there is some
// issue between kv store and backend , core is not calling NewBackend directly
kvClient, err := newKVClient(ctx, backend, addr, KvstoreTimeout)
@@ -154,7 +154,7 @@
StoreType: backend,
Address: addr,
Timeout: KvstoreTimeout,
- PathPrefix: fmt.Sprintf(BasePathKvStore, DeviceID)}
+ PathPrefix: fmt.Sprintf(BasePathKvStore, basePathKvStore, DeviceID)}
return kvbackend
}
@@ -162,7 +162,7 @@
// NewResourceMgr init a New resource manager instance which in turn instantiates pon resource manager
// instances according to technology. Initializes the default resource ranges for all
// the resources.
-func NewResourceMgr(ctx context.Context, deviceID string, KVStoreAddress string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo) *OpenOltResourceMgr {
+func NewResourceMgr(ctx context.Context, deviceID string, KVStoreAddress string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo, basePathKvStore string) *OpenOltResourceMgr {
var ResourceMgr OpenOltResourceMgr
logger.Debugf(ctx, "Init new resource manager , address: %s, device-id: %s", KVStoreAddress, deviceID)
ResourceMgr.DeviceID = deviceID
@@ -172,7 +172,7 @@
NumPONPorts := devInfo.GetPonPorts()
Backend := kvStoreType
- ResourceMgr.KVStore = SetKVClient(ctx, Backend, ResourceMgr.Address, deviceID)
+ ResourceMgr.KVStore = SetKVClient(ctx, Backend, ResourceMgr.Address, deviceID, basePathKvStore)
if ResourceMgr.KVStore == nil {
logger.Error(ctx, "Failed to setup KV store")
}
@@ -241,7 +241,7 @@
Ranges[technology] = TechRange
RsrcMgrsByTech[technology], err = ponrmgr.NewPONResourceManager(ctx, technology, deviceType, deviceID,
- Backend, ResourceMgr.Address)
+ Backend, ResourceMgr.Address, basePathKvStore)
if err != nil {
logger.Errorf(ctx, "Failed to create pon resource manager instance for technology %s", technology)
return nil
@@ -1213,7 +1213,7 @@
//remove them one by one
for key := range value {
// Formulate the right key path suffix ti be delete
- stringToBeReplaced := fmt.Sprintf(BasePathKvStore, RsrcMgr.DeviceID) + "/"
+ stringToBeReplaced := fmt.Sprintf(BasePathKvStore, RsrcMgr.KVStore.PathPrefix, RsrcMgr.DeviceID) + "/"
replacedWith := ""
key = strings.Replace(key, stringToBeReplaced, replacedWith, 1)
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index 5901c7c..50fcf4a 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -274,6 +274,7 @@
kvStoreType string
deviceType string
devInfo *openolt.DeviceInfo
+ kvStorePrefix string
}
tests := []struct {
name string
@@ -281,15 +282,15 @@
want *OpenOltResourceMgr
}{
{"NewResourceMgr-1", args{"olt1", "1:2", "consul",
- "onu", &openolt.DeviceInfo{OnuIdStart: 1, OnuIdEnd: 1}}, &OpenOltResourceMgr{}},
+ "onu", &openolt.DeviceInfo{OnuIdStart: 1, OnuIdEnd: 1}, "service/voltha"}, &OpenOltResourceMgr{}},
{"NewResourceMgr-2", args{"olt2", "3:4", "etcd",
- "onu", &openolt.DeviceInfo{OnuIdStart: 1, OnuIdEnd: 1}}, &OpenOltResourceMgr{}},
+ "onu", &openolt.DeviceInfo{OnuIdStart: 1, OnuIdEnd: 1}, "service/voltha"}, &OpenOltResourceMgr{}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- if got := NewResourceMgr(ctx, tt.args.deviceID, tt.args.KVStoreAddress, tt.args.kvStoreType, tt.args.deviceType, tt.args.devInfo); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ if got := NewResourceMgr(ctx, tt.args.deviceID, tt.args.KVStoreAddress, tt.args.kvStoreType, tt.args.deviceType, tt.args.devInfo, tt.args.kvStorePrefix); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("NewResourceMgr() = %v, want %v", got, tt.want)
}
})
@@ -827,21 +828,22 @@
func TestSetKVClient(t *testing.T) {
type args struct {
- backend string
- address string
- DeviceID string
+ backend string
+ address string
+ DeviceID string
+ kvStorePrefix string
}
tests := []struct {
name string
args args
want *db.Backend
}{
- {"setKVClient-1", args{"consul", "1.1.1.1:1", "olt1"}, &db.Backend{}},
- {"setKVClient-1", args{"etcd", "2.2.2.2:2", "olt2"}, &db.Backend{}},
+ {"setKVClient-1", args{"consul", "1.1.1.1:1", "olt1", "service/voltha"}, &db.Backend{}},
+ {"setKVClient-1", args{"etcd", "2.2.2.2:2", "olt2", "service/voltha"}, &db.Backend{}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if got := SetKVClient(context.Background(), tt.args.backend, tt.args.address, tt.args.DeviceID); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ if got := SetKVClient(context.Background(), tt.args.backend, tt.args.address, tt.args.DeviceID, tt.args.kvStorePrefix); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("SetKVClient() = %v, want %v", got, tt.want)
}
})