VOL-3501 Code changes to support rpc event
Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 3d06a68..6947447 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -35,6 +35,7 @@
tst "github.com/opencord/voltha-go/rw_core/test"
com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
mock_etcd "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
@@ -52,6 +53,7 @@
adapterMgr *adapter.Manager
kmp kafka.InterContainerProxy
kClient kafka.Client
+ kEventClient kafka.Client
kvClientPort int
oltAdapter *cm.OLTAdapter
onuAdapter *cm.ONUAdapter
@@ -75,6 +77,7 @@
}
// Create the kafka client
test.kClient = mock_kafka.NewKafkaClient()
+ test.kEventClient = mock_kafka.NewKafkaClient()
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-da-test"
@@ -116,6 +119,7 @@
func (dat *DATest) startCore(ctx context.Context) {
cfg := config.NewRWCoreFlags()
cfg.CoreTopic = "rw_core"
+ cfg.EventTopic = "voltha.events"
cfg.DefaultRequestTimeout = dat.defaultTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
grpcPort, err := freeport.GetFreePort()
@@ -138,8 +142,8 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
-
- dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+ eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
dat.adapterMgr.Start(context.Background())
if err = dat.kmp.Start(ctx); err != nil {
logger.Fatal(ctx, "Cannot start InterContainerProxy")
@@ -161,6 +165,9 @@
if dat.etcdServer != nil {
tst.StopEmbeddedEtcdServer(ctx, dat.etcdServer)
}
+ if dat.kEventClient != nil {
+ dat.kEventClient.Stop(ctx)
+ }
}
func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {