VOL-3501 Code changes to support RPC events
Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 1a22267..4cd1e85 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -26,6 +26,7 @@
"github.com/opencord/voltha-go/rw_core/core/api"
"github.com/opencord/voltha-go/rw_core/core/device"
conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
grpcserver "github.com/opencord/voltha-lib-go/v4/pkg/grpc"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
@@ -70,7 +71,7 @@
defer close(core.stopped)
defer core.shutdown()
- logger.Info(ctx, "Starting RW Core components")
+ logger.Info(ctx, "starting-rw-core-components")
// setup kv client
logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
@@ -90,7 +91,7 @@
// wait until connection to KV Store is up
if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval); err != nil {
- logger.Fatal(ctx, "Unable-to-connect-to-KV-store")
+ logger.Fatal(ctx, "unable-to-connect-to-kv-store")
}
go monitorKVStoreLiveness(ctx, backend, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
@@ -109,7 +110,25 @@
kafka.ProducerRetryBackoff(time.Millisecond*30),
kafka.LivenessChannelInterval(cf.LiveProbeInterval/2),
)
- // defer kafkaClient.Stop()
+
+ // create kafka client for events
+ kafkaClientEvent := kafka.NewSaramaClient(
+ kafka.Address(cf.KafkaClusterAddress),
+ kafka.ProducerReturnOnErrors(true),
+ kafka.ProducerReturnOnSuccess(true),
+ kafka.ProducerMaxRetries(6),
+ kafka.ProducerRetryBackoff(time.Millisecond*30),
+ kafka.AutoCreateTopic(true),
+ kafka.MetadatMaxRetries(15),
+ )
+ // create event proxy
+ eventProxy := events.NewEventProxy(events.MsgClient(kafkaClientEvent), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
+ if err := kafkaClientEvent.Start(ctx); err != nil {
+ logger.Warn(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address")
+ return
+ }
+
+ defer kafkaClientEvent.Stop(ctx)
// create kv path
dbPath := model.NewDBPath(backend)
@@ -122,7 +141,7 @@
// core.kmp must be created before deviceMgr and adapterMgr
kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
if err != nil {
- logger.Warn(ctx, "Failed to setup kafka connection")
+ logger.Warn(ctx, "failed-to-setup-kafka-connection")
return
}
defer kmp.Stop(ctx)
@@ -130,7 +149,7 @@
// create the core of the system, the device managers
endpointMgr := kafka.NewEndpointManager(backend)
- deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout)
+ deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout, eventProxy)
// register kafka RPC handler
registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf.CoreTopic)