Fix for timestamps not getting added in the kafka record
Change-Id: Iad3d3230034bf5cc498a4566a5b779e651a2ab11
diff --git a/VERSION b/VERSION
index fae6e3d..af8c8ec 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-4.2.1
+4.2.2
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 1e4efae..cd6d27b 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -833,6 +833,7 @@
func (sc *SaramaClient) createPublisher(ctx context.Context) error {
// This Creates the publisher
config := sarama.NewConfig()
+ config.Version = sarama.V1_0_0_0
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
config.Producer.Flush.Messages = sc.producerFlushMessages
@@ -856,6 +857,7 @@
func (sc *SaramaClient) createConsumer(ctx context.Context) error {
config := sarama.NewConfig()
+ config.Version = sarama.V1_0_0_0
config.Consumer.Return.Errors = true
config.Consumer.Fetch.Min = 1
config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
@@ -877,6 +879,7 @@
// createGroupConsumer creates a consumers group
func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
config := scc.NewConfig()
+ config.Version = sarama.V1_0_0_0
config.ClientID = uuid.New().String()
config.Group.Mode = scc.ConsumerModeMultiplex
config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")