VOL-3501 Code changes to support rpc event

Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 1a22267..4cd1e85 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -26,6 +26,7 @@
 	"github.com/opencord/voltha-go/rw_core/core/api"
 	"github.com/opencord/voltha-go/rw_core/core/device"
 	conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
+	"github.com/opencord/voltha-lib-go/v4/pkg/events"
 	grpcserver "github.com/opencord/voltha-lib-go/v4/pkg/grpc"
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
@@ -70,7 +71,7 @@
 	defer close(core.stopped)
 	defer core.shutdown()
 
-	logger.Info(ctx, "Starting RW Core components")
+	logger.Info(ctx, "starting-rw-core-components")
 
 	// setup kv client
 	logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
@@ -90,7 +91,7 @@
 
 	// wait until connection to KV Store is up
 	if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval); err != nil {
-		logger.Fatal(ctx, "Unable-to-connect-to-KV-store")
+		logger.Fatal(ctx, "unable-to-connect-to-kv-store")
 	}
 	go monitorKVStoreLiveness(ctx, backend, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
 
@@ -109,7 +110,25 @@
 		kafka.ProducerRetryBackoff(time.Millisecond*30),
 		kafka.LivenessChannelInterval(cf.LiveProbeInterval/2),
 	)
-	// defer kafkaClient.Stop()
+
+	// create kafka client for events
+	kafkaClientEvent := kafka.NewSaramaClient(
+		kafka.Address(cf.KafkaClusterAddress),
+		kafka.ProducerReturnOnErrors(true),
+		kafka.ProducerReturnOnSuccess(true),
+		kafka.ProducerMaxRetries(6),
+		kafka.ProducerRetryBackoff(time.Millisecond*30),
+		kafka.AutoCreateTopic(true),
+		kafka.MetadatMaxRetries(15),
+	)
+	// create event proxy
+	eventProxy := events.NewEventProxy(events.MsgClient(kafkaClientEvent), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
+	if err := kafkaClientEvent.Start(ctx); err != nil {
+		logger.Warn(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address")
+		return
+	}
+
+	defer kafkaClientEvent.Stop(ctx)
 
 	// create kv path
 	dbPath := model.NewDBPath(backend)
@@ -122,7 +141,7 @@
 	// core.kmp must be created before deviceMgr and adapterMgr
 	kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
 	if err != nil {
-		logger.Warn(ctx, "Failed to setup kafka connection")
+		logger.Warn(ctx, "failed-to-setup-kafka-connection")
 		return
 	}
 	defer kmp.Stop(ctx)
@@ -130,7 +149,7 @@
 
 	// create the core of the system, the device managers
 	endpointMgr := kafka.NewEndpointManager(backend)
-	deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout)
+	deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout, eventProxy)
 
 	// register kafka RPC handler
 	registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf.CoreTopic)