VOL-3501 Code changes to support RPC events

Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index dcd6691..71ecf26 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -30,6 +30,7 @@
 	tst "github.com/opencord/voltha-go/rw_core/test"
 	com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v4/pkg/db"
+	"github.com/opencord/voltha-lib-go/v4/pkg/events"
 	fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	mock_etcd "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
@@ -46,6 +47,7 @@
 	kmp              kafka.InterContainerProxy
 	logicalDeviceMgr *LogicalManager
 	kClient          kafka.Client
+	kEventClient     kafka.Client
 	kvClientPort     int
 	oltAdapterName   string
 	onuAdapterName   string
@@ -68,6 +70,7 @@
 	}
 	// Create the kafka client
 	test.kClient = mock_kafka.NewKafkaClient()
+	test.kEventClient = mock_kafka.NewKafkaClient()
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-da-test"
@@ -135,6 +138,7 @@
 func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
 	cfg := config.NewRWCoreFlags()
 	cfg.CoreTopic = "rw_core"
+	cfg.EventTopic = "voltha.events"
 	cfg.DefaultRequestTimeout = lda.defaultTimeout
 	cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
 	grpcPort, err := freeport.GetFreePort()
@@ -157,8 +161,8 @@
 	endpointMgr := kafka.NewEndpointManager(backend)
 	proxy := model.NewDBPath(backend)
 	adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
-
-	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
+	eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
 	if err = lda.kmp.Start(ctx); err != nil {
 		logger.Fatal(ctx, "Cannot start InterContainerProxy")
 	}
@@ -175,6 +179,9 @@
 	if lda.etcdServer != nil {
 		tst.StopEmbeddedEtcdServer(ctx, lda.etcdServer)
 	}
+	if lda.kEventClient != nil {
+		lda.kEventClient.Stop(ctx)
+	}
 }
 
 func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {