VOL-3501 Code changes to support RPC events

Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 7436083..8766c7c 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -38,6 +38,7 @@
 	cm "github.com/opencord/voltha-go/rw_core/mocks"
 	tst "github.com/opencord/voltha-go/rw_core/test"
 	"github.com/opencord/voltha-lib-go/v4/pkg/db"
+	"github.com/opencord/voltha-lib-go/v4/pkg/events"
 	"github.com/opencord/voltha-lib-go/v4/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	mock_etcd "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
@@ -59,6 +60,7 @@
 	adapterMgr        *adapter.Manager
 	kmp               kafka.InterContainerProxy
 	kClient           kafka.Client
+	kEventClient      kafka.Client
 	kvClientPort      int
 	numONUPerOLT      int
 	startingUNIPortNo int
@@ -81,6 +83,7 @@
 	}
 	// Create the kafka client
 	test.kClient = mock_kafka.NewKafkaClient()
+	test.kEventClient = mock_kafka.NewKafkaClient()
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-nbi-test"
@@ -94,6 +97,7 @@
 	defer cancel()
 	cfg := config.NewRWCoreFlags()
 	cfg.CoreTopic = "rw_core"
+	cfg.EventTopic = "voltha.events"
 	cfg.DefaultRequestTimeout = nb.defaultTimeout
 	cfg.DefaultCoreTimeout = nb.defaultTimeout
 	cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(nb.kvClientPort)
@@ -118,7 +122,8 @@
 	endpointMgr := kafka.NewEndpointManager(backend)
 	proxy := model.NewDBPath(backend)
 	nb.adapterMgr = adapter.NewAdapterManager(ctx, proxy, nb.coreInstanceID, nb.kClient)
-	nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CoreTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
+	eventProxy := events.NewEventProxy(events.MsgClient(nb.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+	nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CoreTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
 	nb.adapterMgr.Start(ctx)
 
 	if err := nb.kmp.Start(ctx); err != nil {
@@ -140,6 +145,9 @@
 	if nb.etcdServer != nil {
 		tst.StopEmbeddedEtcdServer(ctx, nb.etcdServer)
 	}
+	if nb.kEventClient != nil {
+		nb.kEventClient.Stop(ctx)
+	}
 }
 
 func (nb *NBTest) verifyLogicalDevices(t *testing.T, oltDevice *voltha.Device, nbi *NBIHandler) {