This commit adds a complete partition consumer as well as a
group consumer to the sarama client library. It also upgrades
the kafka running version.
Change-Id: Idca3eb1aa31d668afa86d12b39d6a1b0ab1965bc
diff --git a/compose/docker-compose-zk-kafka-test.yml b/compose/docker-compose-zk-kafka-test.yml
index e339aa4..dac9216 100644
--- a/compose/docker-compose-zk-kafka-test.yml
+++ b/compose/docker-compose-zk-kafka-test.yml
@@ -28,7 +28,8 @@
# Single-node kafka service
#
kafka:
- image: "wurstmeister/kafka:1.1.0"
+# image: "wurstmeister/kafka:1.1.0"
+ image: "wurstmeister/kafka:2.11-2.0.1"
ports:
- 9092:9092
environment:
diff --git a/kafka/client.go b/kafka/client.go
index cb33a35..ad8f01a 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -17,29 +17,41 @@
import (
ca "github.com/opencord/voltha-go/protos/core_adapter"
+ "time"
)
const (
- DefaultKafkaHost = "127.0.0.1"
- DefaultKafkaPort = 9092
- DefaultGroupName = "rw_core"
- DefaultSleepOnError = 1
- DefaultFlushFrequency = 1
- DefaultFlushMessages = 1
- DefaultFlushMaxmessages = 1
- DefaultReturnSuccess = false
- DefaultReturnErrors = true
- DefaultConsumerMaxwait = 10
- DefaultMaxProcessingTime = 100
+ PartitionConsumer = iota
+ GroupCustomer = iota
+)
+
+const (
+ DefaultKafkaHost = "127.0.0.1"
+ DefaultKafkaPort = 9092
+ DefaultGroupName = "rw_core"
+ DefaultSleepOnError = 1
+ DefaultProducerFlushFrequency = 5
+ DefaultProducerFlushMessages = 1
+ DefaultProducerFlushMaxmessages = 5
+ DefaultProducerReturnSuccess = false
+ DefaultProducerReturnErrors = true
+ DefaultProducerRetryMax = 3
+ DefaultProducerRetryBackoff = time.Millisecond * 100
+ DefaultConsumerMaxwait = 10
+ DefaultMaxProcessingTime = 100
+ DefaultConsumerType = PartitionConsumer
+ DefaultNumberPartitions = 3
+ DefaultNumberReplicas = 1
+ DefaultAutoCreateTopic = false
)
// MsgClient represents the set of APIs a Kafka MsgClient must implement
type Client interface {
- Start(retries int) error
+ Start() error
Stop()
- CreateTopic(topic *Topic, numPartition int, repFactor int, retries int) error
+ CreateTopic(topic *Topic, numPartition int, repFactor int) error
DeleteTopic(topic *Topic) error
- Subscribe(topic *Topic, retries int) (<-chan *ca.InterContainerMessage, error)
+ Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error)
UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
- Send(msg interface{}, topic *Topic, keys ...string)
+ Send(msg interface{}, topic *Topic, keys ...string) error
}
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 25fc1b7..e2210c4 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -70,12 +70,12 @@
lockTopicRequestHandlerChannelMap sync.RWMutex
// This map is used to map a channel to a response topic. This channel handles all responses on that
- // channel for that topic and forward them to the appropriate consumer channel, using the
+ // channel for that topic and forward them to the appropriate consumers channel, using the
// transactionIdToChannelMap.
topicToResponseChannelMap map[string]<-chan *ca.InterContainerMessage
lockTopicResponseChannelMap sync.RWMutex
- // This map is used to map a transaction to a consumer channel. This is used whenever a request has been
+ // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
// sent out and we are waiting for a response.
transactionIdToChannelMap map[string]*transactionChannel
lockTransactionIdToChannelMap sync.RWMutex
@@ -143,7 +143,7 @@
kp.doneCh = make(chan int, 1)
// Start the kafka client
- if err := kp.kafkaClient.Start(DefaultMaxRetries); err != nil {
+ if err := kp.kafkaClient.Start(); err != nil {
log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
return err
}
@@ -260,7 +260,7 @@
// Subscribe to receive messages for that topic
var ch <-chan *ca.InterContainerMessage
var err error
- if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+ if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
//if ch, err = kp.Subscribe(topic); err != nil {
log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
}
@@ -279,7 +279,7 @@
// Subscribe to receive messages for that topic
var ch <-chan *ca.InterContainerMessage
var err error
- if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+ if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
}
kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
@@ -294,7 +294,7 @@
return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
}
-// setupTopicResponseChannelMap sets up single consumer channel that will act as a broadcast channel for all
+// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
// responses from that topic.
func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
kp.lockTopicResponseChannelMap.Lock()
@@ -616,7 +616,7 @@
}
// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
-// and then dispatches to the consumer
+// and then dispatches to the consumers
func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
startloop:
@@ -649,7 +649,7 @@
if !kp.isTopicSubscribedForResponse(topic.Name) {
var subscribedCh <-chan *ca.InterContainerMessage
var err error
- if subscribedCh, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+ if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
return nil, err
}
@@ -657,7 +657,7 @@
go kp.waitForResponseLoop(subscribedCh, &topic)
}
- // Create a specific channel for this consumer. We cannot use the channel from the kafkaclient as it will
+ // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
// broadcast any message for this topic to all channels waiting on it.
ch := make(chan *ca.InterContainerMessage)
kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index eed0588..f2de01a 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -18,32 +18,54 @@
import (
"errors"
"fmt"
- "github.com/Shopify/sarama"
scc "github.com/bsm/sarama-cluster"
"github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
ca "github.com/opencord/voltha-go/protos/core_adapter"
+ "gopkg.in/Shopify/sarama.v1"
+ "strings"
"sync"
"time"
)
-// consumerChannels represents a consumer listening on a kafka topic. Once it receives a message on that
-// topic it broadcasts the message to all the listening channels
+func init() {
+ log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
+type returnErrorFunction func() error
+
+// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
+// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
+//consumer or a group consumer
type consumerChannels struct {
- consumer sarama.PartitionConsumer
- //consumer *sc.Consumer
- channels []chan *ca.InterContainerMessage
+ consumers []interface{}
+ channels []chan *ca.InterContainerMessage
}
// SaramaClient represents the messaging proxy
type SaramaClient struct {
- broker *sarama.Broker
+ cAdmin sarama.ClusterAdmin
client sarama.Client
KafkaHost string
KafkaPort int
producer sarama.AsyncProducer
consumer sarama.Consumer
groupConsumer *scc.Consumer
+ consumerType int
+ groupName string
+ producerFlushFrequency int
+ producerFlushMessages int
+ producerFlushMaxmessages int
+ producerRetryMax int
+ producerRetryBackOff time.Duration
+ producerReturnSuccess bool
+ producerReturnErrors bool
+ consumerMaxwait int
+ maxProcessingTime int
+ numPartitions int
+ numReplicas int
+ autoCreateTopic bool
doneCh chan int
topicToConsumerChannelMap map[string]*consumerChannels
lockTopicToConsumerChannelMap sync.RWMutex
@@ -63,11 +85,90 @@
}
}
+func ConsumerType(consumer int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerType = consumer
+ }
+}
+
+func ProducerFlushFrequency(frequency int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerFlushFrequency = frequency
+ }
+}
+
+func ProducerFlushMessages(num int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerFlushMessages = num
+ }
+}
+
+func ProducerFlushMaxMessages(num int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerFlushMaxmessages = num
+ }
+}
+
+func ReturnOnErrors(opt bool) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerReturnErrors = opt
+ }
+}
+
+func ReturnOnSuccess(opt bool) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerReturnSuccess = opt
+ }
+}
+
+func ConsumerMaxWait(wait int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerMaxwait = wait
+ }
+}
+
+func MaxProcessingTime(pTime int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.maxProcessingTime = pTime
+ }
+}
+
+func NumPartitions(number int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.numPartitions = number
+ }
+}
+
+func NumReplicas(number int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.numReplicas = number
+ }
+}
+
+func AutoCreateTopic(opt bool) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.autoCreateTopic = opt
+ }
+}
+
func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
client := &SaramaClient{
KafkaHost: DefaultKafkaHost,
KafkaPort: DefaultKafkaPort,
}
+ client.consumerType = DefaultConsumerType
+ client.producerFlushFrequency = DefaultProducerFlushFrequency
+ client.producerFlushMessages = DefaultProducerFlushMessages
+ client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
+ client.producerReturnErrors = DefaultProducerReturnErrors
+ client.producerReturnSuccess = DefaultProducerReturnSuccess
+ client.producerRetryMax = DefaultProducerRetryMax
+ client.producerRetryBackOff = DefaultProducerRetryBackoff
+ client.consumerMaxwait = DefaultConsumerMaxwait
+ client.maxProcessingTime = DefaultMaxProcessingTime
+ client.numPartitions = DefaultNumberPartitions
+ client.numReplicas = DefaultNumberReplicas
+ client.autoCreateTopic = DefaultAutoCreateTopic
for _, option := range opts {
option(client)
@@ -78,25 +179,33 @@
return client
}
-func (sc *SaramaClient) Start(retries int) error {
- log.Info("Starting-Proxy")
+func (sc *SaramaClient) Start() error {
+ log.Info("Starting-kafka-sarama-client")
// Create the Done channel
sc.doneCh = make(chan int, 1)
+ var err error
+
+ // Create the Cluster Admin
+ if err = sc.createClusterAdmin(); err != nil {
+ log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
+ return err
+ }
+
// Create the Publisher
- if err := sc.createPublisher(retries); err != nil {
+ if err := sc.createPublisher(); err != nil {
log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
return err
}
- // Create the master consumer
- if err := sc.createConsumer(retries); err != nil {
- log.Errorw("Cannot-create-kafka-consumer", log.Fields{"error": err})
+ // Create the master consumers
+ if err := sc.createConsumer(); err != nil {
+ log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
return err
}
- // Create the topic to consumer/channel map
+ // Create the topic to consumers/channel map
sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
return nil
@@ -108,19 +217,182 @@
//Send a message over the done channel to close all long running routines
sc.doneCh <- 1
- // Clear the consumer map
- //sc.clearConsumerChannelMap()
-
if sc.producer != nil {
if err := sc.producer.Close(); err != nil {
panic(err)
}
}
+
if sc.consumer != nil {
if err := sc.consumer.Close(); err != nil {
panic(err)
}
}
+
+ if sc.groupConsumer != nil {
+ if err := sc.groupConsumer.Close(); err != nil {
+ panic(err)
+ }
+ }
+
+ if sc.cAdmin != nil {
+ if err := sc.cAdmin.Close(); err != nil {
+ panic(err)
+ }
+ }
+
+ //TODO: Clear the consumers map
+ sc.clearConsumerChannelMap()
+
+ log.Info("sarama-client-stopped")
+}
+
+//CreateTopic creates a topic on the Kafka Broker.
+func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+ // Set the topic details
+ topicDetail := &sarama.TopicDetail{}
+ topicDetail.NumPartitions = int32(numPartition)
+ topicDetail.ReplicationFactor = int16(repFactor)
+ topicDetail.ConfigEntries = make(map[string]*string)
+ topicDetails := make(map[string]*sarama.TopicDetail)
+ topicDetails[topic.Name] = topicDetail
+
+ if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
+ if err == sarama.ErrTopicAlreadyExists {
+ // Not an error
+ log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
+ return nil
+ }
+ log.Errorw("create-topic-failure", log.Fields{"error": err})
+ return err
+ }
+ // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
+ // do so.
+ log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
+ return nil
+}
+
+//DeleteTopic removes a topic from the kafka Broker
+func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
+ // Remove the topic from the broker
+ if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
+ if err == sarama.ErrUnknownTopicOrPartition {
+ // Not an error as does not exist
+ log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
+ return nil
+ }
+ log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
+ return err
+ }
+
+ // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
+ if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
+ log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
+ return err
+ }
+ return nil
+}
+
+// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
+// messages from that topic
+func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error) {
+ log.Debugw("subscribe", log.Fields{"topic": topic.Name})
+
+ // If a consumers already exist for that topic then resuse it
+ if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
+ log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
+ // Create a channel specific for that consumers and add it to the consumers channel map
+ ch := make(chan *ca.InterContainerMessage)
+ sc.addChannelToConsumerChannelMap(topic, ch)
+ return ch, nil
+ }
+
+ // Register for the topic and set it up
+ var consumerListeningChannel chan *ca.InterContainerMessage
+ var err error
+
+ // Use the consumerType option to figure out the type of consumer to launch
+ if sc.consumerType == PartitionConsumer {
+ if sc.autoCreateTopic {
+ if err = sc.CreateTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+ log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+ return nil, err
+ }
+ }
+ if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, sarama.OffsetNewest); err != nil {
+ log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+ return nil, err
+ }
+ } else if sc.consumerType == GroupCustomer {
+ // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
+ // does not consume from a precreated topic in some scenarios
+ if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, "mytest"); err != nil {
+ log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+ return nil, err
+ }
+ } else {
+ log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
+ return nil, errors.New("unknown-consumer-type")
+ }
+
+ return consumerListeningChannel, nil
+}
+
+//UnSubscribe unsubscribe a consumer from a given topic
+func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
+ log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
+ err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
+ return err
+}
+
+// send formats and sends the request onto the kafka messaging bus.
+func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
+
+ // Assert message is a proto message
+ var protoMsg proto.Message
+ var ok bool
+ // ascertain the value interface type is a proto.Message
+ if protoMsg, ok = msg.(proto.Message); !ok {
+ log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
+ return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
+ }
+
+ var marshalled []byte
+ var err error
+ // Create the Sarama producer message
+ if marshalled, err = proto.Marshal(protoMsg); err != nil {
+ log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
+ return err
+ }
+ key := ""
+ if len(keys) > 0 {
+ key = keys[0] // Only the first key is relevant
+ }
+ kafkaMsg := &sarama.ProducerMessage{
+ Topic: topic.Name,
+ Key: sarama.StringEncoder(key),
+ Value: sarama.ByteEncoder(marshalled),
+ }
+
+ // Send message to kafka
+ sc.producer.Input() <- kafkaMsg
+ return nil
+}
+
+func (sc *SaramaClient) createClusterAdmin() error {
+ kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
+ config := sarama.NewConfig()
+ config.Version = sarama.V1_0_0_0
+
+ // Create a cluster Admin
+ var cAdmin sarama.ClusterAdmin
+ var err error
+ if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
+ log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
+ return err
+ }
+ sc.cAdmin = cAdmin
+ return nil
}
func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
@@ -139,7 +411,7 @@
}
}
-func (sc *SaramaClient) getConsumerChannel(topic Topic) *consumerChannels {
+func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
@@ -149,14 +421,43 @@
return nil
}
-func (sc *SaramaClient) addChannelToConsumerChannelMap(topic Topic, ch chan *ca.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ca.InterContainerMessage) {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
consumerCh.channels = append(consumerCh.channels, ch)
return
}
- log.Warnw("consumer-channel-not-exist", log.Fields{"topic": topic.Name})
+ log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
+}
+
+//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
+func closeConsumers(consumers []interface{}) error {
+ var err error
+ for _, consumer := range consumers {
+ // Is it a partition consumers?
+ if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
+ if errTemp := partionConsumer.Close(); errTemp != nil {
+ log.Debugw("partition!!!", log.Fields{"err": errTemp})
+ if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
+ // This can occur on race condition
+ err = nil
+ } else {
+ err = errTemp
+ }
+ }
+ } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
+ if errTemp := groupConsumer.Close(); errTemp != nil {
+ if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
+ // This can occur on race condition
+ err = nil
+ } else {
+ err = errTemp
+ }
+ }
+ }
+ }
+ return err
}
func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
@@ -165,10 +466,11 @@
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
// Channel will be closed in the removeChannel method
consumerCh.channels = removeChannel(consumerCh.channels, ch)
- // If there are no more channels then we can close the consumer itself
+ // If there are no more channels then we can close the consumers itself
if len(consumerCh.channels) == 0 {
- log.Debugw("closing-consumer", log.Fields{"topic": topic})
- err := consumerCh.consumer.Close()
+ log.Debugw("closing-consumers", log.Fields{"topic": topic})
+ err := closeConsumers(consumerCh.consumers)
+ //err := consumerCh.consumers.Close()
delete(sc.topicToConsumerChannelMap, topic.Name)
return err
}
@@ -186,12 +488,17 @@
// Channel will be closed in the removeChannel method
removeChannel(consumerCh.channels, ch)
}
- err := consumerCh.consumer.Close()
+ err := closeConsumers(consumerCh.consumers)
+ //if err == sarama.ErrUnknownTopicOrPartition {
+ // // Not an error
+ // err = nil
+ //}
+ //err := consumerCh.consumers.Close()
delete(sc.topicToConsumerChannelMap, topic.Name)
return err
}
- log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
- return errors.New("topic-does-not-exist")
+ log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+ return nil
}
func (sc *SaramaClient) clearConsumerChannelMap() error {
@@ -203,139 +510,70 @@
// Channel will be closed in the removeChannel method
removeChannel(consumerCh.channels, ch)
}
- err = consumerCh.consumer.Close()
+ if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
+ err = errTemp
+ }
+ //err = consumerCh.consumers.Close()
delete(sc.topicToConsumerChannelMap, topic)
}
return err
}
-func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int, retries int) error {
- // This Creates the kafka topic
- // Set broker configuration
- kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
- broker := sarama.NewBroker(kafkaFullAddr)
-
- // Additional configurations. Check sarama doc for more info
- config := sarama.NewConfig()
- config.Version = sarama.V1_0_0_0
-
- // Open broker connection with configs defined above
- broker.Open(config)
-
- // check if the connection was OK
- _, err := broker.Connected()
- if err != nil {
- return err
- }
-
- topicDetail := &sarama.TopicDetail{}
- topicDetail.NumPartitions = int32(numPartition)
- topicDetail.ReplicationFactor = int16(repFactor)
- topicDetail.ConfigEntries = make(map[string]*string)
-
- topicDetails := make(map[string]*sarama.TopicDetail)
- topicDetails[topic.Name] = topicDetail
-
- request := sarama.CreateTopicsRequest{
- Timeout: time.Second * 1,
- TopicDetails: topicDetails,
- }
-
- for {
- // Send request to Broker
- if response, err := broker.CreateTopics(&request); err != nil {
- if retries == 0 {
- log.Errorw("error-creating-topic", log.Fields{"error": err})
- return err
- } else {
- // If retries is -ve then we will retry indefinitely
- retries--
- }
- log.Debug("retrying-after-a-second-delay")
- time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
- } else {
- log.Debug("topic-response", log.Fields{"response": response})
- break
- }
- }
-
- log.Debug("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
- return nil
-}
-
-func (sc *SaramaClient) createPublisher(retries int) error {
+//createPublisher creates the publisher which is used to send a message onto kafka
+func (sc *SaramaClient) createPublisher() error {
// This Creates the publisher
config := sarama.NewConfig()
config.Producer.Partitioner = sarama.NewRandomPartitioner
- config.Producer.Flush.Frequency = time.Duration(DefaultFlushFrequency)
- config.Producer.Flush.Messages = DefaultFlushMessages
- config.Producer.Flush.MaxMessages = DefaultFlushMaxmessages
- config.Producer.Return.Errors = DefaultReturnErrors
- config.Producer.Return.Successes = DefaultReturnSuccess
- config.Producer.RequiredAcks = sarama.WaitForAll
+ config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
+ config.Producer.Flush.Messages = sc.producerFlushMessages
+ config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
+ config.Producer.Return.Errors = sc.producerReturnErrors
+ config.Producer.Return.Successes = sc.producerReturnSuccess
+ //config.Producer.RequiredAcks = sarama.WaitForAll
+ config.Producer.RequiredAcks = sarama.WaitForLocal
+
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
brokers := []string{kafkaFullAddr}
- for {
- producer, err := sarama.NewAsyncProducer(brokers, config)
- if err != nil {
- if retries == 0 {
- log.Errorw("error-starting-publisher", log.Fields{"error": err})
- return err
- } else {
- // If retries is -ve then we will retry indefinitely
- retries--
- }
- log.Info("retrying-after-a-second-delay")
- time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
- } else {
- sc.producer = producer
- break
- }
+ if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
+ log.Errorw("error-starting-publisher", log.Fields{"error": err})
+ return err
+ } else {
+ sc.producer = producer
}
log.Info("Kafka-publisher-created")
return nil
}
-func (sc *SaramaClient) createConsumer(retries int) error {
+func (sc *SaramaClient) createConsumer() error {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Fetch.Min = 1
- config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
- config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
+ config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
+ config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = sarama.OffsetNewest
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
brokers := []string{kafkaFullAddr}
- for {
- consumer, err := sarama.NewConsumer(brokers, config)
- if err != nil {
- if retries == 0 {
- log.Errorw("error-starting-consumer", log.Fields{"error": err})
- return err
- } else {
- // If retries is -ve then we will retry indefinitely
- retries--
- }
- log.Info("retrying-after-a-second-delay")
- time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
- } else {
- sc.consumer = consumer
- break
- }
+ if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
+ log.Errorw("error-starting-consumers", log.Fields{"error": err})
+ return err
+ } else {
+ sc.consumer = consumer
}
- log.Info("Kafka-consumer-created")
+ log.Info("Kafka-consumers-created")
return nil
}
-// createGroupConsumer creates a consumer group
+// createGroupConsumer creates a consumers group
func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
config := scc.NewConfig()
+ config.ClientID = uuid.New().String()
config.Group.Mode = scc.ConsumerModeMultiplex
- config.Consumer.Return.Errors = true
- config.Group.Return.Notifications = true
- config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
- config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
+ //config.Consumer.Return.Errors = true
+ //config.Group.Return.Notifications = false
+ //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
+ //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = sarama.OffsetNewest
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
brokers := []string{kafkaFullAddr}
@@ -348,73 +586,16 @@
var consumer *scc.Consumer
var err error
- // Create the topic with default attributes
- // TODO: needs to be revisited
- //sc.CreateTopic(&Topic{Name:topic.Name}, 3, 1, 1)
-
if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
- log.Errorw("create-consumer-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ log.Errorw("create-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
- log.Debugw("create-consumer-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+ log.Debugw("create-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
//time.Sleep(10*time.Second)
sc.groupConsumer = consumer
return consumer, nil
}
-// send formats and sends the request onto the kafka messaging bus.
-func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) {
-
- // Assert message is a proto message
- var protoMsg proto.Message
- var ok bool
- // ascertain the value interface type is a proto.Message
- if protoMsg, ok = msg.(proto.Message); !ok {
- log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
- return
- }
-
- // Create the Sarama producer message
- marshalled, _ := proto.Marshal(protoMsg)
- key := ""
- if len(keys) > 0 {
- key = keys[0] // Only the first key is relevant
- }
- kafkaMsg := &sarama.ProducerMessage{
- Topic: topic.Name,
- Key: sarama.StringEncoder(key),
- Value: sarama.ByteEncoder(marshalled),
- }
-
- // Send message to kafka
- sc.producer.Input() <- kafkaMsg
-}
-
-// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
-// messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic, retries int) (<-chan *ca.InterContainerMessage, error) {
- log.Debugw("subscribe", log.Fields{"topic": topic.Name})
-
- // If a consumer already exist for that topic then resuse it
- if consumerCh := sc.getConsumerChannel(*topic); consumerCh != nil {
- log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
- // Create a channel specific for that consumer and add it to the consumer channel map
- ch := make(chan *ca.InterContainerMessage)
- sc.addChannelToConsumerChannelMap(*topic, ch)
- return ch, nil
- }
-
- // Register for the topic and set it up
- var consumerListeningChannel chan *ca.InterContainerMessage
- var err error
- if consumerListeningChannel, err = sc.setupConsumerChannel(topic); err != nil {
- log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
- return nil, err
- }
-
- return consumerListeningChannel, nil
-}
-
// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
// topic via the unique channel each subsciber received during subscription
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
@@ -428,91 +609,179 @@
}
}
-func (sc *SaramaClient) consumeMessagesLoop(topic Topic) {
- log.Debugw("starting-consuming-messages", log.Fields{"topic": topic.Name})
- var consumerCh *consumerChannels
- if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
- log.Errorw("consumer-not-exist", log.Fields{"topic": topic.Name})
- return
- }
+func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
+ log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
for {
select {
- case err := <-consumerCh.consumer.Errors():
- log.Warnw("consumer-error", log.Fields{"error": err})
- case msg := <-consumerCh.consumer.Messages():
+ case err := <-consumer.Errors():
+ if err != nil {
+ log.Warnw("partition-consumers-error", log.Fields{"error": err})
+ } else {
+ // There is a race condition when this loop is stopped and the consumer is closed where
+ // the actual error comes as nil
+ log.Warn("partition-consumers-error")
+ }
+ case msg := <-consumer.Messages():
//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
- // Since the only expected message is a proto intercontainermessage then extract it right away
- // instead of dispatching it to the consumers
+ if msg == nil {
+ // There is a race condition when this loop is stopped and the consumer is closed where
+ // the actual msg comes as nil
+ break startloop
+ }
+ msgBody := msg.Value
+ icm := &ca.InterContainerMessage{}
+ if err := proto.Unmarshal(msgBody, icm); err != nil {
+ log.Warnw("partition-invalid-message", log.Fields{"error": err})
+ continue
+ }
+ go sc.dispatchToConsumers(consumerChnls, icm)
+ case <-sc.doneCh:
+ log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
+ break startloop
+ }
+ }
+ log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
+}
+
+func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
+ log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+
+startloop:
+ for {
+ select {
+ case err := <-consumer.Errors():
+ if err != nil {
+ log.Warnw("group-consumers-error", log.Fields{"error": err})
+ } else {
+ // There is a race condition when this loop is stopped and the consumer is closed where
+ // the actual error comes as nil
+ log.Warn("group-consumers-error")
+ }
+ case msg := <-consumer.Messages():
+ //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
+ if msg == nil {
+ // There is a race condition when this loop is stopped and the consumer is closed where
+ // the actual msg comes as nil
+ break startloop
+ }
msgBody := msg.Value
icm := &ca.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
log.Warnw("invalid-message", log.Fields{"error": err})
continue
}
- go sc.dispatchToConsumers(consumerCh, icm)
-
- //consumerCh.consumer.MarkOffset(msg, "")
- //// TODO: Dispatch requests and responses separately
+ go sc.dispatchToConsumers(consumerChnls, icm)
+ consumer.MarkOffset(msg, "")
+ case ntf := <-consumer.Notifications():
+ log.Debugw("group-received-notification", log.Fields{"notification": ntf})
case <-sc.doneCh:
- log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
+ log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
break startloop
}
}
- log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
+ log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
}
-// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
-// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupConsumerChannel(topic *Topic) (chan *ca.InterContainerMessage, error) {
- // TODO: Replace this development partition consumer with a group consumer
- var pConsumer *sarama.PartitionConsumer
+func (sc *SaramaClient) startConsumers(topic *Topic) error {
+ log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
+ var consumerCh *consumerChannels
+ if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
+ log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
+ return errors.New("consumers-not-exist")
+ }
+ // For each consumer listening for that topic, start a consumption loop
+ for _, consumer := range consumerCh.consumers {
+ if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
+ go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
+ } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
+ go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
+ } else {
+ log.Errorw("invalid-consumer", log.Fields{"topic": topic})
+ return errors.New("invalid-consumer")
+ }
+ }
+ return nil
+}
+
+//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
+//// for that topic. It also starts the routine that listens for messages on that topic.
+func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ca.InterContainerMessage, error) {
+ var pConsumers []sarama.PartitionConsumer
var err error
- if pConsumer, err = sc.CreatePartionConsumer(topic, DefaultMaxRetries); err != nil {
- log.Errorw("creating-partition-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
+
+ if pConsumers, err = sc.createPartionConsumers(topic, initialOffset); err != nil {
+ log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
- // Create the consumer/channel structure and set the consumer and create a channel on that topic - for now
+ consumersIf := make([]interface{}, 0)
+ for _, pConsumer := range pConsumers {
+ consumersIf = append(consumersIf, pConsumer)
+ }
+
+ // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
// unbuffered to verify race conditions.
consumerListeningChannel := make(chan *ca.InterContainerMessage)
cc := &consumerChannels{
- consumer: *pConsumer,
- channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
+ consumers: consumersIf,
+ channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
}
- // Add the consumer channel to the map
+ // Add the consumers channel to the map
sc.addTopicToConsumerChannelMap(topic.Name, cc)
- //Start a consumer to listen on that specific topic
- go sc.consumeMessagesLoop(*topic)
+ //Start a consumers to listen on that specific topic
+ go sc.startConsumers(topic)
return consumerListeningChannel, nil
}
-func (sc *SaramaClient) CreatePartionConsumer(topic *Topic, retries int) (*sarama.PartitionConsumer, error) {
- log.Debugw("creating-partition-consumer", log.Fields{"topic": topic.Name})
+// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
+// for that topic. It also starts the routine that listens for messages on that topic.
+func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ca.InterContainerMessage, error) {
+ // TODO: Replace this development partition consumers with a group consumers
+ var pConsumer *scc.Consumer
+ var err error
+ if pConsumer, err = sc.createGroupConsumer(topic, &groupId, DefaultMaxRetries); err != nil {
+ log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+ return nil, err
+ }
+ // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
+ // unbuffered to verify race conditions.
+ consumerListeningChannel := make(chan *ca.InterContainerMessage)
+ cc := &consumerChannels{
+ consumers: []interface{}{pConsumer},
+ channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
+ }
+
+ // Add the consumers channel to the map
+ sc.addTopicToConsumerChannelMap(topic.Name, cc)
+
+ //Start a consumers to listen on that specific topic
+ go sc.startConsumers(topic)
+
+ return consumerListeningChannel, nil
+}
+
+func (sc *SaramaClient) createPartionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
+ log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
partitionList, err := sc.consumer.Partitions(topic.Name)
if err != nil {
log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
- log.Debugw("partitions", log.Fields{"topic": topic.Name, "partitionList": partitionList, "first": partitionList[0]})
- // Create a partition consumer for that topic - for now just use one partition
- var pConsumer sarama.PartitionConsumer
- if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partitionList[0], sarama.OffsetNewest); err != nil {
- log.Warnw("consumer-partition-failure", log.Fields{"error": err, "topic": topic.Name})
- return nil, err
+ pConsumers := make([]sarama.PartitionConsumer, 0)
+ for _, partition := range partitionList {
+ var pConsumer sarama.PartitionConsumer
+ if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
+ log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+ return nil, err
+ }
+ pConsumers = append(pConsumers, pConsumer)
}
- log.Debugw("partition-consumer-created", log.Fields{"topic": topic.Name})
- return &pConsumer, nil
-}
-
-func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
- log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
- err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
- return err
+ return pConsumers, nil
}
func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
@@ -527,11 +796,3 @@
}
return channels
}
-
-func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
- if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
- log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
- return err
- }
- return nil
-}
diff --git a/tests/kafka/kafka_client_test.go b/tests/kafka/kafka_client_test.go
new file mode 100644
index 0000000..76d63c6
--- /dev/null
+++ b/tests/kafka/kafka_client_test.go
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "fmt"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-go/common/log"
+ kk "github.com/opencord/voltha-go/kafka"
+ ca "github.com/opencord/voltha-go/protos/core_adapter"
+ "github.com/stretchr/testify/assert"
+ "os"
+ "testing"
+ "time"
+)
+
+/*
+Prerequite: Start the kafka/zookeeper containers.
+*/
+
+var partionClient kk.Client
+var groupClient kk.Client
+var totalTime int64
+var numMessageToSend int
+var totalMessageReceived int
+
+type sendToKafka func(interface{}, *kk.Topic, ...string) error
+
+func init() {
+ log.AddPackage(log.JSON, log.ErrorLevel, nil)
+ hostIP := os.Getenv("DOCKER_HOST_IP")
+ log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+ log.SetAllLogLevel(log.ErrorLevel)
+ partionClient = kk.NewSaramaClient(
+ kk.ConsumerType(kk.PartitionConsumer),
+ kk.Host(hostIP),
+ kk.Port(9092),
+ kk.AutoCreateTopic(true),
+ kk.ProducerFlushFrequency(5))
+ partionClient.Start()
+ groupClient = kk.NewSaramaClient(
+ kk.ConsumerType(kk.GroupCustomer),
+ kk.Host(hostIP),
+ kk.Port(9092),
+ kk.AutoCreateTopic(false),
+ kk.ProducerFlushFrequency(5))
+ groupClient.Start()
+ numMessageToSend = 1
+}
+
+func waitForMessage(ch <-chan *ca.InterContainerMessage, doneCh chan string, maxMessages int) {
+ totalTime = 0
+ totalMessageReceived = 0
+ mytime := time.Now()
+startloop:
+ for {
+ select {
+ case msg := <-ch:
+ if totalMessageReceived == 0 {
+ mytime = time.Now()
+ }
+ totalTime = totalTime + (time.Now().UnixNano()-msg.Header.Timestamp)/int64(time.Millisecond)
+ //log.Debugw("msg-received", log.Fields{"msg":msg})
+ totalMessageReceived = totalMessageReceived + 1
+ if totalMessageReceived == maxMessages {
+ doneCh <- "All received"
+ break startloop
+ }
+ if totalMessageReceived%10000 == 0 {
+ fmt.Println("received-so-far", totalMessageReceived, totalTime, totalTime/int64(totalMessageReceived))
+ }
+ }
+ }
+ log.Infow("Received all messages", log.Fields{"total": time.Since(mytime)})
+}
+
+func sendMessages(topic *kk.Topic, numMessages int, fn sendToKafka) error {
+ // Loop for numMessages
+ for i := 0; i < numMessages; i++ {
+ msg := &ca.InterContainerMessage{}
+ msg.Header = &ca.Header{
+ Id: uuid.New().String(),
+ Type: ca.MessageType_REQUEST,
+ FromTopic: topic.Name,
+ ToTopic: topic.Name,
+ Timestamp: time.Now().UnixNano(),
+ }
+ var marshalledArg *any.Any
+ var err error
+ body := &ca.InterContainerRequestBody{Rpc: "testRPC", Args: []*ca.Argument{}}
+ if marshalledArg, err = ptypes.MarshalAny(body); err != nil {
+ log.Warnw("cannot-marshal-request", log.Fields{"error": err})
+ return err
+ }
+ msg.Body = marshalledArg
+ msg.Header.Timestamp = time.Now().UnixNano()
+ go fn(msg, topic)
+ //go partionClient.Send(msg, topic)
+ }
+ return nil
+}
+
+func runWithPartionConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
+ var ch <-chan *ca.InterContainerMessage
+ var err error
+ if ch, err = partionClient.Subscribe(topic); err != nil {
+ return nil
+ }
+ go waitForMessage(ch, doneCh, numMessages)
+
+ //Now create a routine to send messages
+ go sendMessages(topic, numMessages, partionClient.Send)
+
+ return nil
+}
+
+func runWithGroupConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
+ var ch <-chan *ca.InterContainerMessage
+ var err error
+ if ch, err = groupClient.Subscribe(topic); err != nil {
+ return nil
+ }
+ go waitForMessage(ch, doneCh, numMessages)
+
+ //Now create a routine to send messages
+ go sendMessages(topic, numMessages, groupClient.Send)
+
+ return nil
+}
+
+func TestPartitionConsumer(t *testing.T) {
+ done := make(chan string)
+ topic := &kk.Topic{Name: "CoreTest1"}
+ runWithPartionConsumer(topic, numMessageToSend, done)
+ start := time.Now()
+ // Wait for done
+ val := <-done
+ err := partionClient.DeleteTopic(topic)
+ assert.Nil(t, err)
+ partionClient.Stop()
+ assert.Equal(t, numMessageToSend, totalMessageReceived)
+ log.Infow("Partition consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
+}
+
+func TestGroupConsumer(t *testing.T) {
+ done := make(chan string)
+ topic := &kk.Topic{Name: "CoreTest2"}
+ runWithGroupConsumer(topic, numMessageToSend, done)
+ start := time.Now()
+ // Wait for done
+ val := <-done
+ err := groupClient.DeleteTopic(topic)
+ assert.Nil(t, err)
+ groupClient.Stop()
+ assert.Equal(t, numMessageToSend, totalMessageReceived)
+ log.Infow("Group consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
+
+}
+
+func TestCreateDeleteTopic(t *testing.T) {
+ hostIP := os.Getenv("DOCKER_HOST_IP")
+ client := kk.NewSaramaClient(
+ kk.ConsumerType(kk.PartitionConsumer),
+ kk.Host(hostIP),
+ kk.Port(9092),
+ kk.AutoCreateTopic(true),
+ kk.ProducerFlushFrequency(5))
+ client.Start()
+ topic := &kk.Topic{Name: "CoreTest20"}
+ err := client.CreateTopic(topic, 3, 1)
+ assert.Nil(t, err)
+ err = client.DeleteTopic(topic)
+ assert.Nil(t, err)
+}