[VOL-1435] Initial submission for device management integration
tests.  This update also comprises of some fixes with kafka
consumer and random mac address generation.

Change-Id: I4f8081752af646c3ed218ab17a541abb1b70cf5c
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 35ede44..55c68a3 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -372,7 +372,7 @@
 				return nil, err
 			}
 		}
-		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, sarama.OffsetNewest); err != nil {
+		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
 			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
 			return nil, err
 		}
@@ -394,7 +394,7 @@
 			// Need to use a unique group Id per topic
 			groupId = sc.consumerGroupPrefix + topic.Name
 		}
-		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
+		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
 			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 			return nil, err
 		}
@@ -477,6 +477,16 @@
 	return ""
 }
 
+// getOffset returns the offset from the key-value args.
+func getOffset(kvArgs ...*KVArg) int64 {
+	for _, arg := range kvArgs {
+		if arg.Key == Offset {
+			return arg.Value.(int64)
+		}
+	}
+	return sarama.OffsetNewest
+}
+
 func (sc *SaramaClient) createClusterAdmin() error {
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	config := sarama.NewConfig()
@@ -684,7 +694,7 @@
 }
 
 // createGroupConsumer creates a consumers group
-func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
 	config := scc.NewConfig()
 	config.ClientID = uuid.New().String()
 	config.Group.Mode = scc.ConsumerModeMultiplex
@@ -692,7 +702,7 @@
 	//config.Group.Return.Notifications = false
 	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
 	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
-	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	config.Consumer.Offsets.Initial = initialOffset
 	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	brokers := []string{kafkaFullAddr}
@@ -713,7 +723,7 @@
 }
 
 // dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
-// topic via the unique channel each subsciber received during subscription
+// topic via the unique channel each subscriber received during subscription
 func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
 	// Need to go over all channels and publish messages to them - do we need to copy msg?
 	sc.lockTopicToConsumerChannelMap.Lock()
@@ -851,11 +861,11 @@
 
 // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 // for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
 	// TODO:  Replace this development partition consumers with a group consumers
 	var pConsumer *scc.Consumer
 	var err error
-	if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
+	if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
 		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}