This commit made some modifications in the way messages over
kafka are consumed, mostly around the initial offset to use.
Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/rw_core/main.go b/rw_core/main.go
index 77ce304..472072a 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -69,7 +69,11 @@
case "sarama":
return kafka.NewSaramaClient(
kafka.Host(host),
- kafka.Port(port)), nil
+ kafka.Port(port),
+ kafka.ProducerReturnOnErrors(true),
+ kafka.ProducerReturnOnSuccess(true),
+ kafka.ProducerMaxRetries(6),
+ kafka.ProducerRetryBackoff(time.Millisecond * 30)), nil
}
return nil, errors.New("unsupported-client-type")
}
@@ -109,7 +113,7 @@
func (rw *rwCore) start(ctx context.Context) {
log.Info("Starting RW Core components")
- // Setup KV MsgClient
+ // Setup KV Client
log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
err := rw.setKVClient()
if err == nil {
@@ -121,6 +125,7 @@
rw.config.KVTxnKeyDelTime)
}
+ // Setup Kafka Client
if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort); err != nil {
log.Fatal("Unsupported-kafka-client")
}
@@ -214,7 +219,7 @@
}
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
- log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.WarnLevel)
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
defer log.CleanUp()