[VOL-1435] Initial submission for device management integration
tests. This update also includes some fixes to the Kafka
consumer and to random MAC address generation.
Change-Id: I4f8081752af646c3ed218ab17a541abb1b70cf5c
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 35ede44..55c68a3 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -372,7 +372,7 @@
return nil, err
}
}
- if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, sarama.OffsetNewest); err != nil {
+ if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
@@ -394,7 +394,7 @@
// Need to use a unique group Id per topic
groupId = sc.consumerGroupPrefix + topic.Name
}
- if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
+ if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
@@ -477,6 +477,16 @@
return ""
}
+// getOffset returns the initial consumer offset carried in kvArgs under the
+// Offset key. When no usable Offset argument is present it returns
+// sarama.OffsetNewest.
+//
+// Nil entries are skipped, and an Offset entry whose value is not an int64 is
+// ignored with a warning rather than panicking on the type assertion; callers
+// must therefore pass the offset as an int64 (e.g. sarama.OffsetOldest).
+func getOffset(kvArgs ...*KVArg) int64 {
+	for _, arg := range kvArgs {
+		if arg == nil || arg.Key != Offset {
+			continue
+		}
+		// Checked assertion: a mistyped value must not crash the subscriber.
+		if offset, ok := arg.Value.(int64); ok {
+			return offset
+		}
+		log.Warnw("invalid-offset-type", log.Fields{"value": arg.Value})
+	}
+	return sarama.OffsetNewest
+}
+
func (sc *SaramaClient) createClusterAdmin() error {
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
config := sarama.NewConfig()
@@ -684,7 +694,7 @@
}
// createGroupConsumer creates a consumers group
-func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
config := scc.NewConfig()
config.ClientID = uuid.New().String()
config.Group.Mode = scc.ConsumerModeMultiplex
@@ -692,7 +702,7 @@
//config.Group.Return.Notifications = false
//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
- config.Consumer.Offsets.Initial = sarama.OffsetNewest
+ config.Consumer.Offsets.Initial = initialOffset
//config.Consumer.Offsets.Initial = sarama.OffsetOldest
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
brokers := []string{kafkaFullAddr}
@@ -713,7 +723,7 @@
}
// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
-// topic via the unique channel each subsciber received during subscription
+// topic via the unique channel each subscriber received during subscription
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
// Need to go over all channels and publish messages to them - do we need to copy msg?
sc.lockTopicToConsumerChannelMap.Lock()
@@ -851,11 +861,11 @@
// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
// TODO: Replace this development partition consumers with a group consumers
var pConsumer *scc.Consumer
var err error
- if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
+ if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}