[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 8114dd0..e611019 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -28,15 +28,15 @@
"github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-go/rw_core/core/adapter"
tst "github.com/opencord/voltha-go/rw_core/test"
- com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
- "github.com/opencord/voltha-lib-go/v5/pkg/db"
- "github.com/opencord/voltha-lib-go/v5/pkg/events"
- fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
- "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
- mock_etcd "github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd"
- mock_kafka "github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka"
- ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ com "github.com/opencord/voltha-lib-go/v7/pkg/adapters/common"
+ "github.com/opencord/voltha-lib-go/v7/pkg/db"
+ "github.com/opencord/voltha-lib-go/v7/pkg/events"
+ fu "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+ mock_etcd "github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd"
+ mock_kafka "github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
)
@@ -44,7 +44,6 @@
type LDATest struct {
etcdServer *mock_etcd.EtcdServer
deviceMgr *Manager
- kmp kafka.InterContainerProxy
logicalDeviceMgr *LogicalManager
kClient kafka.Client
kEventClient kafka.Client
@@ -52,8 +51,8 @@
oltAdapterName string
onuAdapterName string
coreInstanceID string
- defaultTimeout time.Duration
- maxTimeout time.Duration
+ internalTimeout time.Duration
+ rpcTimeout time.Duration
logicalDevice *voltha.LogicalDevice
logicalPorts map[uint32]*voltha.LogicalPort
deviceIds []string
@@ -74,8 +73,8 @@
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-da-test"
- test.defaultTimeout = 5 * time.Second
- test.maxTimeout = 20 * time.Second
+ test.internalTimeout = 5 * time.Second
+ test.rpcTimeout = 20 * time.Second
test.done = make(chan int)
test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
test.logicalDevice = &voltha.LogicalDevice{
@@ -138,15 +137,14 @@
func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
cfg := &config.RWCoreFlags{}
cfg.ParseCommandArguments([]string{})
- cfg.CoreTopic = "rw_core"
cfg.EventTopic = "voltha.events"
- cfg.DefaultRequestTimeout = lda.defaultTimeout
+ cfg.InternalTimeout = lda.internalTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
grpcPort, err := freeport.GetFreePort()
if err != nil {
logger.Fatal(ctx, "Cannot get a freeport for grpc")
}
- cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
+ cfg.GrpcNBIAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
client := tst.SetupKVClient(ctx, cfg, lda.coreInstanceID)
backend := &db.Backend{
Client: client,
@@ -154,29 +152,18 @@
Address: cfg.KVStoreAddress,
Timeout: cfg.KVStoreTimeout,
LivenessChannelInterval: cfg.LiveProbeInterval / 2}
- lda.kmp = kafka.NewInterContainerProxy(
- kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
- kafka.MsgClient(lda.kClient),
- kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
- endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
- adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
+ adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, backend, 5)
eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
- lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg, lda.coreInstanceID, eventProxy)
- if err = lda.kmp.Start(ctx); err != nil {
- logger.Fatal(ctx, "Cannot start InterContainerProxy")
- }
- adapterMgr.Start(context.Background())
+ lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, cfg, lda.coreInstanceID, eventProxy)
+ adapterMgr.Start(context.Background(), "logical-test")
}
func (lda *LDATest) stopAll(ctx context.Context) {
if lda.kClient != nil {
lda.kClient.Stop(ctx)
}
- if lda.kmp != nil {
- lda.kmp.Stop(ctx)
- }
if lda.etcdServer != nil {
tst.StopEmbeddedEtcdServer(ctx, lda.etcdServer)
}
@@ -191,7 +178,7 @@
clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
clonedLD.Id = com.GetRandomString(10)
clonedLD.DatapathId = rand.Uint64()
- lDeviceAgent := newLogicalAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
+ lDeviceAgent := newLogicalAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.internalTimeout)
lDeviceAgent.logicalDevice = clonedLD
for _, port := range lda.logicalPorts {
clonedPort := proto.Clone(port).(*voltha.LogicalPort)