This commit adds a complete partition consumer as well as a
group consumer to the sarama client library. It also upgrades
the version of Kafka that is run.

Change-Id: Idca3eb1aa31d668afa86d12b39d6a1b0ab1965bc
diff --git a/tests/kafka/kafka_client_test.go b/tests/kafka/kafka_client_test.go
new file mode 100644
index 0000000..76d63c6
--- /dev/null
+++ b/tests/kafka/kafka_client_test.go
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"fmt"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/any"
+	"github.com/google/uuid"
+	"github.com/opencord/voltha-go/common/log"
+	kk "github.com/opencord/voltha-go/kafka"
+	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	"github.com/stretchr/testify/assert"
+	"os"
+	"testing"
+	"time"
+)
+
+/*
+Prerequisite: start the Kafka/ZooKeeper containers before running these tests.
+*/
+
+var (
+	partionClient, groupClient             kk.Client // shared clients; created and started in init
+	totalTime                              int64     // summed receive latency in ms; reset by waitForMessage
+	numMessageToSend, totalMessageReceived int       // messages to send per test; messages counted by waitForMessage
+)
+
+type sendToKafka func(interface{}, *kk.Topic, ...string) error // publish callback; a client's Send method is passed as this type
+
+// init configures JSON logging at error level and starts the two package-level Kafka clients
+// (partition consumer and group consumer) against the broker at $DOCKER_HOST_IP, port 9092.
+func init() {
+	log.AddPackage(log.JSON, log.ErrorLevel, nil)
+	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+	log.SetAllLogLevel(log.ErrorLevel)
+	brokerHost := os.Getenv("DOCKER_HOST_IP")
+
+	// The partition client is created with AutoCreateTopic(true), the group client with false.
+	partionClient = kk.NewSaramaClient(
+		kk.ConsumerType(kk.PartitionConsumer),
+		kk.Host(brokerHost), kk.Port(9092),
+		kk.AutoCreateTopic(true), kk.ProducerFlushFrequency(5))
+	partionClient.Start()
+	groupClient = kk.NewSaramaClient(
+		kk.ConsumerType(kk.GroupCustomer),
+		kk.Host(brokerHost), kk.Port(9092),
+		kk.AutoCreateTopic(false), kk.ProducerFlushFrequency(5))
+	groupClient.Start()
+	numMessageToSend = 1
+}
+
+// waitForMessage consumes ch until maxMessages messages have arrived, adding each message's latency
+// (receive time minus Header.Timestamp, in milliseconds) to totalTime and counting it in
+// totalMessageReceived, then sends "All received" on doneCh. If ch is closed first it sends
+// "Channel closed" instead, so the waiting test is released rather than hitting a nil message.
+func waitForMessage(ch <-chan *ca.InterContainerMessage, doneCh chan string, maxMessages int) {
+	totalTime = 0
+	totalMessageReceived = 0
+	mytime := time.Now()
+	for msg := range ch {
+		if totalMessageReceived == 0 {
+			mytime = time.Now() // restart the clock when the first message arrives
+		}
+		totalTime += (time.Now().UnixNano() - msg.Header.Timestamp) / int64(time.Millisecond)
+		totalMessageReceived++
+		if totalMessageReceived == maxMessages {
+			doneCh <- "All received"
+			log.Infow("Received all messages", log.Fields{"total": time.Since(mytime)})
+			return
+		}
+		if totalMessageReceived%10000 == 0 {
+			fmt.Println("received-so-far", totalMessageReceived, totalTime, totalTime/int64(totalMessageReceived))
+		}
+	}
+	doneCh <- "Channel closed" // ch was closed before maxMessages arrived
+}
+
+// sendMessages builds numMessages request messages addressed from and to topic and hands each one
+// to fn on its own goroutine; send errors are logged there since the goroutines are not awaited.
+func sendMessages(topic *kk.Topic, numMessages int, fn sendToKafka) error {
+	for i := 0; i < numMessages; i++ {
+		var marshalledArg *any.Any
+		var err error
+		if marshalledArg, err = ptypes.MarshalAny(&ca.InterContainerRequestBody{Rpc: "testRPC", Args: []*ca.Argument{}}); err != nil {
+			log.Warnw("cannot-marshal-request", log.Fields{"error": err})
+			return err
+		}
+		msg := &ca.InterContainerMessage{Body: marshalledArg, Header: &ca.Header{
+			Id:        uuid.New().String(),
+			Type:      ca.MessageType_REQUEST,
+			FromTopic: topic.Name,
+			ToTopic:   topic.Name,
+			Timestamp: time.Now().UnixNano(), // taken after marshalling, right before the send
+		}}
+		go func() {
+			if err := fn(msg, topic); err != nil {
+				log.Warnw("cannot-send-message", log.Fields{"error": err})
+			}
+		}()
+	}
+	return nil
+}
+
+// runWithPartionConsumer subscribes partionClient to topic, then starts one goroutine that counts
+// numMessages incoming messages (signalling doneCh when finished) and another that publishes them.
+// A Subscribe failure is returned to the caller and nothing is started in that case.
+func runWithPartionConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
+	ch, err := partionClient.Subscribe(topic)
+	if err != nil {
+		return err
+	}
+	go waitForMessage(ch, doneCh, numMessages)
+	// Publish concurrently with the receiver started above.
+	go sendMessages(topic, numMessages, partionClient.Send)
+	return nil
+}
+
+// runWithGroupConsumer subscribes groupClient to topic, then starts one goroutine that counts
+// numMessages incoming messages (signalling doneCh when finished) and another that publishes them.
+// A Subscribe failure is returned to the caller and nothing is started in that case.
+func runWithGroupConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
+	ch, err := groupClient.Subscribe(topic)
+	if err != nil {
+		return err
+	}
+	go waitForMessage(ch, doneCh, numMessages)
+	// Publish concurrently with the receiver started above.
+	go sendMessages(topic, numMessages, groupClient.Send)
+	return nil
+}
+
+// TestPartitionConsumer round-trips numMessageToSend messages through partionClient on topic CoreTest1.
+func TestPartitionConsumer(t *testing.T) {
+	done, topic := make(chan string), &kk.Topic{Name: "CoreTest1"}
+	if err := runWithPartionConsumer(topic, numMessageToSend, done); err != nil {
+		t.Fatalf("cannot start partition consumer run: %v", err) // otherwise <-done would block forever
+	}
+	start := time.Now()
+	val := <-done // blocks until the receiver goroutine reports completion
+	assert.Nil(t, partionClient.DeleteTopic(topic))
+	partionClient.Stop()
+	assert.Equal(t, numMessageToSend, totalMessageReceived)
+	log.Infow("Partition consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
+}
+
+// TestGroupConsumer round-trips numMessageToSend messages through groupClient on topic CoreTest2,
+// then deletes the topic and stops the client.
+func TestGroupConsumer(t *testing.T) {
+	done, topic := make(chan string), &kk.Topic{Name: "CoreTest2"}
+	if err := runWithGroupConsumer(topic, numMessageToSend, done); err != nil {
+		t.Fatalf("cannot start group consumer run: %v", err) // otherwise <-done would block forever
+	}
+	start := time.Now()
+	val := <-done // blocks until the receiver goroutine reports completion
+	assert.Nil(t, groupClient.DeleteTopic(topic))
+	groupClient.Stop()
+	assert.Equal(t, numMessageToSend, totalMessageReceived)
+	log.Infow("Group consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
+}
+
+// TestCreateDeleteTopic starts its own partition-consumer client, creates topic CoreTest20 and
+// deletes it again. The client is stopped when the test returns so it does not outlive the test.
+func TestCreateDeleteTopic(t *testing.T) {
+	client := kk.NewSaramaClient(
+		kk.ConsumerType(kk.PartitionConsumer),
+		kk.Host(os.Getenv("DOCKER_HOST_IP")),
+		kk.Port(9092),
+		kk.AutoCreateTopic(true),
+		kk.ProducerFlushFrequency(5))
+	client.Start()
+	defer client.Stop() // the original never released this client
+	topic := &kk.Topic{Name: "CoreTest20"}
+	assert.Nil(t, client.CreateTopic(topic, 3, 1))
+	assert.Nil(t, client.DeleteTopic(topic))
+}