[VOL-2193] Create mocks for Kafka Client and Etcd

This commit consists of:
1) A Kafka client mock that implements the Kafka client interface
defined in voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and stands in for a
real Etcd server during tests.

Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/pkg/mocks/common_test.go b/pkg/mocks/common_test.go
new file mode 100644
index 0000000..1ff5700
--- /dev/null
+++ b/pkg/mocks/common_test.go
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mocks
+
+import (
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+const (
+	logLevel = log.FatalLevel // log level handed to log.AddPackage in init()
+)
+
+// Unit test initialization. Go runs this init() once for the test binary, so
+// it applies to all unit tests in the current directory.
+func init() {
+	// Set up this package so that its log level can be modified at run time;
+	// a failure here aborts the whole test run.
+	if _, err := log.AddPackage(log.JSON, logLevel, log.Fields{"instanceId": "mocks"}); err != nil {
+		panic(err)
+	}
+}
diff --git a/pkg/mocks/etcd_server.go b/pkg/mocks/etcd_server.go
new file mode 100644
index 0000000..9b9dec4
--- /dev/null
+++ b/pkg/mocks/etcd_server.go
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mocks
+
+import (
+	"go.etcd.io/etcd/embed"
+	"log"
+	"os"
+	"time"
+)
+
+const (
+	serverStartUpTimeout   = 10 * time.Second    // Maximum time allowed to wait for the Etcd server to be ready
+	localPersistentStorage = "voltha.embed.etcd" // Data directory used by the default configuration
+)
+
+//EtcdServer represents an embedded (in-process) Etcd server.  It is used for testing only.
+type EtcdServer struct {
+	server *embed.Etcd // handle returned by embed.StartEtcd
+}
+
+//getDefaultCfg builds the configuration used when StartEtcdServer is called without one
+func getDefaultCfg() *embed.Config {
+	defaults := embed.NewConfig()
+	defaults.LogLevel = "error"
+	defaults.Logger = "zap"
+	defaults.Dir = localPersistentStorage
+	return defaults
+}
+
+//StartEtcdServer creates and starts an embedded Etcd server.  A nil cfg selects the default config, under which the
+//server runs at localhost:2379.  The data directory cfg.Dir is wiped before start; any failure exits the process.
+func StartEtcdServer(cfg *embed.Config) *EtcdServer {
+	// Fall back to the default configuration when the caller does not supply one
+	if cfg == nil {
+		cfg = getDefaultCfg()
+	}
+	// Remove the data directory this server is configured to use, as
+	// a safeguard for the case where a prior test failed
+	if err := os.RemoveAll(cfg.Dir); err != nil {
+		log.Fatalf("Failure removing local directory %s", cfg.Dir)
+	}
+	e, err := embed.StartEtcd(cfg)
+	if err != nil {
+		log.Fatal(err)
+	}
+	select {
+	case <-e.Server.ReadyNotify():
+		log.Printf("Embedded Etcd server is ready!")
+	case <-time.After(serverStartUpTimeout):
+		e.Server.HardStop() // trigger a shutdown
+		e.Close()
+		log.Fatal("Embedded Etcd server took too long to start!")
+	case err := <-e.Err():
+		e.Server.HardStop() // trigger a shutdown
+		e.Close()
+		log.Fatalf("Embedded Etcd server errored out - %s", err)
+	}
+	return &EtcdServer{server: e}
+}
+
+//Stop closes the embedded Etcd server and removes the data directory it was configured with
+func (es *EtcdServer) Stop() {
+	if es != nil {
+		es.server.Server.HardStop()
+		es.server.Close()
+		if err := os.RemoveAll(es.server.Config().Dir); err != nil {
+			log.Fatalf("Failure removing local directory %s", es.server.Config().Dir)
+		}
+	}
+}
diff --git a/pkg/mocks/etcd_server_test.go b/pkg/mocks/etcd_server_test.go
new file mode 100644
index 0000000..8230869
--- /dev/null
+++ b/pkg/mocks/etcd_server_test.go
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mocks
+
+import (
+	"github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
+	"github.com/stretchr/testify/assert"
+	"log"
+	"os"
+	"testing"
+)
+
+var etcdServer *EtcdServer     // embedded server shared by the tests; started in setup()
+var client *kvstore.EtcdClient // client connected to the embedded server; created in setup()
+
+// setup starts the embedded Etcd server with the default configuration and
+// connects the shared kvstore client to it.  Any failure aborts the test binary.
+func setup() {
+	if etcdServer = StartEtcdServer(nil); etcdServer == nil {
+		log.Fatal("Embedded server failed to start")
+	}
+	var err error
+	if client, err = kvstore.NewEtcdClient("localhost:2379", 10); err != nil || client == nil {
+		etcdServer.Stop()
+		log.Fatal("Failed to create an Etcd client")
+	}
+}
+
+func TestEtcdServerRW(t *testing.T) {
+	key, value := "myKey-1", "myVal-1"
+	assert.Nil(t, client.Put(key, value, 10))
+	kvp, err := client.Get(key, 10)
+	assert.Nil(t, err)
+	if !assert.NotNil(t, kvp) {
+		return // assert does not stop the test; bail out rather than dereference a nil result
+	}
+	assert.Equal(t, key, kvp.Key) // testify expects (expected, actual)
+	val, err := kvstore.ToString(kvp.Value)
+	assert.Nil(t, err)
+	assert.Equal(t, value, val)
+}
+
+func TestEtcdServerReserve(t *testing.T) {
+	assert.NotNil(t, client)
+	txId := "tnxId-1"
+	val, err := client.Reserve("transactions", txId, 10)
+	assert.Nil(t, err)
+	assert.NotNil(t, val)
+	assert.Equal(t, txId, val) // testify expects (expected, actual); the reserved value is compared to the one requested
+}
+
+func shutdown() {
+	if client != nil {
+		client.Close() // close the client before the server it talks to goes away
+	}
+	if etcdServer != nil {
+		etcdServer.Stop() // also removes the server's local data directory
+	}
+}
+
+func TestMain(m *testing.M) {
+	setup()
+	code := m.Run()
+	shutdown() // called explicitly (not deferred) because os.Exit does not run deferred functions
+	os.Exit(code)
+}
diff --git a/pkg/mocks/kafka_client.go b/pkg/mocks/kafka_client.go
new file mode 100644
index 0000000..79cdfbc
--- /dev/null
+++ b/pkg/mocks/kafka_client.go
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mocks
+
+import (
+	"fmt"
+	"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	ic "github.com/opencord/voltha-protos/v2/go/inter_container"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"sync"
+)
+
+type KafkaClient struct {
+	topicsChannelMap map[string][]chan *ic.InterContainerMessage // channels registered per topic name
+	lock             sync.RWMutex                                // guards topicsChannelMap
+}
+
+// NewKafkaClient returns a mock Kafka client with no topics registered.
+func NewKafkaClient() *KafkaClient {
+	kc := &KafkaClient{}
+	kc.topicsChannelMap = make(map[string][]chan *ic.InterContainerMessage)
+	return kc
+}
+
+func (kc *KafkaClient) Start() error {
+	log.Debug("kafka-client-started")
+	return nil // the mock has nothing to start, so this always succeeds
+}
+
+func (kc *KafkaClient) Stop() {
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	for topic, chnls := range kc.topicsChannelMap { // close every registered channel, then forget its topic
+		for _, c := range chnls {
+			close(c)
+		}
+		delete(kc.topicsChannelMap, topic) // deleting the current key while ranging over a map is allowed in Go
+	}
+	log.Debug("kafka-client-stopped")
+}
+
+// CreateTopic records the topic name and fails if it is already known.  Channels are only added by Subscribe.
+func (kc *KafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
+	log.Debugw("CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	if _, ok := kc.topicsChannelMap[topic.Name]; ok {
+		return fmt.Errorf("Topic %s already exist", topic.Name)
+	}
+	kc.topicsChannelMap[topic.Name] = nil // no channel here: nothing could ever receive from it, so Send would block forever
+	return nil
+}
+
+func (kc *KafkaClient) DeleteTopic(topic *kafka.Topic) error {
+	log.Debugw("DeleteTopic", log.Fields{"topic": topic.Name})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	delete(kc.topicsChannelMap, topic.Name) // channels are dropped without being closed; an unknown topic is a no-op
+	return nil
+}
+
+func (kc *KafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
+	log.Debugw("Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	ch := make(chan *ic.InterContainerMessage) // unbuffered: a Send blocks until this subscriber receives
+	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
+	return ch, nil // every call returns a new channel; the topic entry is created if it does not exist yet
+}
+
+func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
+	s[i] = s[len(s)-1] // overwrite slot i with the last element; order is not preserved and s is modified in place
+	return s[:len(s)-1]
+}
+
+// UnSubscribe closes ch and removes it from the topic's channel list.  An unknown topic or channel is ignored.
+func (kc *KafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
+	log.Debugw("UnSubscribe", log.Fields{"topic": topic.Name})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	subscribers := kc.topicsChannelMap[topic.Name]
+	match := -1
+	for pos := range subscribers {
+		if subscribers[pos] == ch {
+			close(subscribers[pos])
+			match = pos
+		}
+	}
+	if match >= 0 {
+		kc.topicsChannelMap[topic.Name] = removeChannel(subscribers, match)
+	}
+	return nil
+}
+
+func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
+	req, ok := msg.(*ic.InterContainerMessage)
+	if !ok {
+		return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
+	}
+	if req == nil {
+		return status.Error(codes.InvalidArgument, "msg-nil")
+	}
+	kc.lock.RLock()
+	defer kc.lock.RUnlock()
+	for _, ch := range kc.topicsChannelMap[topic.Name] {
+		log.Debugw("Publishing", log.Fields{"fromTopic": req.GetHeader().GetFromTopic(), "toTopic": topic.Name, "id": req.GetHeader().GetId()})
+		ch <- req // delivers to each channel in turn; generated getters above keep a header-less message from panicking
+	}
+	return nil
+}
+
+func (kc *KafkaClient) SendLiveness() error {
+	return status.Error(codes.Unimplemented, "SendLiveness") // liveness is not implemented by the mock
+}
+
+func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
+	return nil // the enable flag is ignored and no liveness channel is provided
+}
diff --git a/pkg/mocks/kafka_client_test.go b/pkg/mocks/kafka_client_test.go
new file mode 100644
index 0000000..f4707f6
--- /dev/null
+++ b/pkg/mocks/kafka_client_test.go
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mocks
+
+import (
+	"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
+	ic "github.com/opencord/voltha-protos/v2/go/inter_container"
+	"github.com/stretchr/testify/assert"
+	"testing"
+	"time"
+)
+
+func TestKafkaClientImplementsKafkaClientIf(t *testing.T) {
+	client := NewKafkaClient()
+
+	if _, ok := interface{}(client).(kafka.Client); !ok {
+		t.Error("mock kafka client does not implement voltha-lib-go/v2/pkg/kafka/Client interface")
+	}
+}
+
+// TestKafkaClientCreateTopic checks that a topic can be created once and
+// that creating the same topic a second time is reported as an error.
+func TestKafkaClientCreateTopic(t *testing.T) {
+	cTkc := NewKafkaClient()
+	topic := kafka.Topic{Name: "myTopic"}
+	assert.Nil(t, cTkc.CreateTopic(&topic, 1, 1))
+	assert.NotNil(t, cTkc.CreateTopic(&topic, 1, 1))
+}
+
+// TestKafkaClientDeleteTopic checks that deleting a topic that was never created is not an error.
+func TestKafkaClientDeleteTopic(t *testing.T) {
+	cTkc := NewKafkaClient()
+	topic := kafka.Topic{Name: "myTopic"}
+	assert.Nil(t, cTkc.DeleteTopic(&topic))
+}
+
+// TestKafkaClientSubscribeSend checks that a message sent on a topic is delivered to its subscriber.
+func TestKafkaClientSubscribeSend(t *testing.T) {
+	cTkc := NewKafkaClient()
+	topic := kafka.Topic{Name: "myTopic"}
+	ch, err := cTkc.Subscribe(&topic)
+	assert.Nil(t, err)
+	assert.NotNil(t, ch)
+	// Generous bound: it is only reached when delivery is broken, and a tight one is flaky on loaded machines.
+	maxWait := time.Second
+	msg := &ic.InterContainerMessage{
+		Header: &ic.Header{Id: "1234", ToTopic: topic.Name},
+		Body:   nil,
+	}
+	// Send blocks until the subscriber receives, so it runs in its own goroutine while
+	// the test goroutine plays the subscriber and bounds the wait.  The result channel
+	// is buffered so the sender never blocks on reporting its result.
+	sendErr := make(chan error, 1)
+	go func() {
+		sendErr <- cTkc.Send(msg, &topic)
+	}()
+	select {
+	case val, ok := <-ch:
+		assert.True(t, ok)
+		assert.Equal(t, msg, val)
+		assert.Nil(t, <-sendErr)
+	case <-time.After(maxWait):
+		t.Fatal("timed out waiting for the message to be delivered")
+	}
+}
+
+// TestKafkaClientUnSubscribe checks that a channel obtained from Subscribe can be unsubscribed.
+func TestKafkaClientUnSubscribe(t *testing.T) {
+	cTkc := NewKafkaClient()
+	topic := kafka.Topic{Name: "myTopic"}
+	ch, err := cTkc.Subscribe(&topic)
+	assert.Nil(t, err)
+	assert.NotNil(t, ch)
+	assert.Nil(t, cTkc.UnSubscribe(&topic, ch))
+}
+
+// TestKafkaClientStop checks that Stop can be called after the only subscriber has unsubscribed.
+func TestKafkaClientStop(t *testing.T) {
+	cTkc := NewKafkaClient()
+	topic := kafka.Topic{Name: "myTopic"}
+	ch, err := cTkc.Subscribe(&topic)
+	assert.Nil(t, err)
+	assert.NotNil(t, ch)
+	assert.Nil(t, cTkc.UnSubscribe(&topic, ch))
+	cTkc.Stop()
+}