[VOL-2193] Create mocks for Kafka Client and Etcd
This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and stands in for a
real Etcd server in unit tests.
Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/pkg/mocks/common_test.go b/pkg/mocks/common_test.go
new file mode 100644
index 0000000..1ff5700
--- /dev/null
+++ b/pkg/mocks/common_test.go
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mocks
+
+import (
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+const (
+ logLevel = log.FatalLevel // Level passed to log.AddPackage when this package's logger is registered
+)
+
+// Unit test initialization. This init() function registers the logger used by
+// all unit tests in the current directory and panics if registration fails.
+func init() {
+ // Set up this package so that its log level can be modified at run time
+ _, err := log.AddPackage(log.JSON, logLevel, log.Fields{"instanceId": "mocks"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/pkg/mocks/etcd_server.go b/pkg/mocks/etcd_server.go
new file mode 100644
index 0000000..9b9dec4
--- /dev/null
+++ b/pkg/mocks/etcd_server.go
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mocks
+
+import (
+ "go.etcd.io/etcd/embed"
+ "log"
+ "os"
+ "time"
+)
+
+const (
+ serverStartUpTimeout = 10 * time.Second // Maximum time allowed to wait for the Etcd server to be ready
+ localPersistentStorage = "voltha.embed.etcd"
+)
+
+//EtcdServer represents an embedded Etcd server. It is used for testing only.
+type EtcdServer struct {
+ server *embed.Etcd
+}
+
+//getDefaultCfg specifies the default config
+func getDefaultCfg() *embed.Config {
+ cfg := embed.NewConfig()
+ cfg.Dir = localPersistentStorage
+ cfg.Logger = "zap"
+ cfg.LogLevel = "error"
+ return cfg
+}
+
+//StartEtcdServer creates and starts an embedded Etcd server. A local directory to store data is created for the
+//embedded server lifetime (for the duration of a unit test. The server runs at localhost:2379.
+func StartEtcdServer(cfg *embed.Config) *EtcdServer {
+ // If the server is already running, just return
+ if cfg == nil {
+ cfg = getDefaultCfg()
+ }
+ // Remove the local directory as
+ // a safeguard for the case where a prior test failed
+ if err := os.RemoveAll(localPersistentStorage); err != nil {
+ log.Fatalf("Failure removing local directory %s", localPersistentStorage)
+ }
+ e, err := embed.StartEtcd(cfg)
+ if err != nil {
+ log.Fatal(err)
+ }
+ select {
+ case <-e.Server.ReadyNotify():
+ log.Printf("Embedded Etcd server is ready!")
+ case <-time.After(serverStartUpTimeout):
+ e.Server.HardStop() // trigger a shutdown
+ e.Close()
+ log.Fatal("Embedded Etcd server took too long to start!")
+ case err := <-e.Err():
+ e.Server.HardStop() // trigger a shutdown
+ e.Close()
+ log.Fatalf("Embedded Etcd server errored out - %s", err)
+ }
+ return &EtcdServer{server: e}
+}
+
+//Stop closes the embedded Etcd server and removes the local data directory as well
+func (es *EtcdServer) Stop() {
+ if es != nil {
+ es.server.Server.HardStop()
+ es.server.Close()
+ if err := os.RemoveAll(localPersistentStorage); err != nil {
+ log.Fatalf("Failure removing local directory %s", es.server.Config().Dir)
+ }
+ }
+}
diff --git a/pkg/mocks/etcd_server_test.go b/pkg/mocks/etcd_server_test.go
new file mode 100644
index 0000000..8230869
--- /dev/null
+++ b/pkg/mocks/etcd_server_test.go
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mocks
+
+import (
+ "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
+ "github.com/stretchr/testify/assert"
+ "log"
+ "os"
+ "testing"
+)
+
+var etcdServer *EtcdServer
+var client *kvstore.EtcdClient
+
+func setup() {
+ etcdServer = StartEtcdServer(nil)
+ if etcdServer == nil {
+ log.Fatal("Embedded server failed to start")
+ }
+ var err error
+ client, err = kvstore.NewEtcdClient("localhost:2379", 10)
+ if err != nil || client == nil {
+ etcdServer.Stop()
+ log.Fatal("Failed to create an Etcd client")
+ }
+}
+
+func TestEtcdServerRW(t *testing.T) {
+ key := "myKey-1"
+ value := "myVal-1"
+ err := client.Put(key, value, 10)
+ assert.Nil(t, err)
+ kvp, err := client.Get(key, 10)
+ assert.Nil(t, err)
+ assert.NotNil(t, kvp)
+ assert.Equal(t, kvp.Key, key)
+ val, err := kvstore.ToString(kvp.Value)
+ assert.Nil(t, err)
+ assert.Equal(t, value, val)
+}
+
+func TestEtcdServerReserve(t *testing.T) {
+ assert.NotNil(t, client)
+ txId := "tnxId-1"
+ val, err := client.Reserve("transactions", txId, 10)
+ assert.Nil(t, err)
+ assert.NotNil(t, val)
+ assert.Equal(t, val, txId)
+}
+
+func shutdown() {
+ if client != nil {
+ client.Close()
+ }
+ if etcdServer != nil {
+ etcdServer.Stop()
+ }
+}
+
+func TestMain(m *testing.M) {
+ setup()
+ code := m.Run()
+ shutdown()
+ os.Exit(code)
+}
diff --git a/pkg/mocks/kafka_client.go b/pkg/mocks/kafka_client.go
new file mode 100644
index 0000000..79cdfbc
--- /dev/null
+++ b/pkg/mocks/kafka_client.go
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mocks
+
+import (
+ "fmt"
+ "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ ic "github.com/opencord/voltha-protos/v2/go/inter_container"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+type KafkaClient struct {
+ topicsChannelMap map[string][]chan *ic.InterContainerMessage
+ lock sync.RWMutex
+}
+
+func NewKafkaClient() *KafkaClient {
+ return &KafkaClient{
+ topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
+ lock: sync.RWMutex{},
+ }
+}
+
+func (kc *KafkaClient) Start() error {
+ log.Debug("kafka-client-started")
+ return nil
+}
+
+func (kc *KafkaClient) Stop() {
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ for topic, chnls := range kc.topicsChannelMap {
+ for _, c := range chnls {
+ close(c)
+ }
+ delete(kc.topicsChannelMap, topic)
+ }
+ log.Debug("kafka-client-stopped")
+}
+
+func (kc *KafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
+ log.Debugw("CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ if _, ok := kc.topicsChannelMap[topic.Name]; ok {
+ return fmt.Errorf("Topic %s already exist", topic.Name)
+ }
+ ch := make(chan *ic.InterContainerMessage)
+ kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
+ return nil
+}
+
+func (kc *KafkaClient) DeleteTopic(topic *kafka.Topic) error {
+ log.Debugw("DeleteTopic", log.Fields{"topic": topic.Name})
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ delete(kc.topicsChannelMap, topic.Name)
+ return nil
+}
+
+func (kc *KafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
+ log.Debugw("Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ ch := make(chan *ic.InterContainerMessage)
+ kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
+ return ch, nil
+}
+
+func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
+ s[i] = s[len(s)-1]
+ return s[:len(s)-1]
+}
+
+func (kc *KafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
+ log.Debugw("UnSubscribe", log.Fields{"topic": topic.Name})
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
+ idx := -1
+ for i, c := range chnls {
+ if c == ch {
+ close(c)
+ idx = i
+ }
+ }
+ if idx >= 0 {
+ kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
+ }
+ }
+ return nil
+}
+
+func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
+ req, ok := msg.(*ic.InterContainerMessage)
+ if !ok {
+ return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
+ }
+ if req == nil {
+ return status.Error(codes.InvalidArgument, "msg-nil")
+ }
+ kc.lock.RLock()
+ defer kc.lock.RUnlock()
+ for _, ch := range kc.topicsChannelMap[topic.Name] {
+ log.Debugw("Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
+ ch <- req
+ }
+ return nil
+}
+
+func (kc *KafkaClient) SendLiveness() error {
+ return status.Error(codes.Unimplemented, "SendLiveness")
+}
+
+func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
+ return nil
+}
diff --git a/pkg/mocks/kafka_client_test.go b/pkg/mocks/kafka_client_test.go
new file mode 100644
index 0000000..f4707f6
--- /dev/null
+++ b/pkg/mocks/kafka_client_test.go
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mocks
+
+import (
+ "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
+ ic "github.com/opencord/voltha-protos/v2/go/inter_container"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestKafkaClientImplementsKafkaClientIf(t *testing.T) {
+ client := NewKafkaClient()
+
+ if _, ok := interface{}(client).(kafka.Client); !ok {
+ t.Error("mock kafka client does not implement voltha-lib-go/v2/pkg/kafka/Client interface")
+ }
+}
+
+func TestKafkaClientCreateTopic(t *testing.T) {
+ cTkc := NewKafkaClient()
+ topic := kafka.Topic{Name: "myTopic"}
+ err := cTkc.CreateTopic(&topic, 1, 1)
+ assert.Nil(t, err)
+ err = cTkc.CreateTopic(&topic, 1, 1)
+ assert.NotNil(t, err)
+}
+
+func TestKafkaClientDeleteTopic(t *testing.T) {
+ cTkc := NewKafkaClient()
+ topic := kafka.Topic{Name: "myTopic"}
+ err := cTkc.DeleteTopic(&topic)
+ assert.Nil(t, err)
+}
+
+func TestKafkaClientSubscribeSend(t *testing.T) {
+ cTkc := NewKafkaClient()
+ topic := kafka.Topic{Name: "myTopic"}
+ ch, err := cTkc.Subscribe(&topic)
+ assert.Nil(t, err)
+ assert.NotNil(t, ch)
+ testCh := make(chan bool)
+ maxWait := 5 * time.Millisecond
+ msg := &ic.InterContainerMessage{
+ Header: &ic.Header{Id: "1234", ToTopic: topic.Name},
+ Body: nil,
+ }
+ timer := time.NewTimer(maxWait)
+ defer timer.Stop()
+ go func() {
+ select {
+ case val, ok := <-ch:
+ assert.True(t, ok)
+ assert.Equal(t, val, msg)
+ testCh <- true
+ case <-timer.C:
+ testCh <- false
+ }
+ }()
+ err = cTkc.Send(msg, &topic)
+ assert.Nil(t, err)
+ res := <-testCh
+ assert.True(t, res)
+}
+
+func TestKafkaClientUnSubscribe(t *testing.T) {
+ cTkc := NewKafkaClient()
+ topic := kafka.Topic{Name: "myTopic"}
+ ch, err := cTkc.Subscribe(&topic)
+ assert.Nil(t, err)
+ assert.NotNil(t, ch)
+ err = cTkc.UnSubscribe(&topic, ch)
+ assert.Nil(t, err)
+}
+
+func TestKafkaClientStop(t *testing.T) {
+ cTkc := NewKafkaClient()
+ topic := kafka.Topic{Name: "myTopic"}
+ ch, err := cTkc.Subscribe(&topic)
+ assert.Nil(t, err)
+ assert.NotNil(t, ch)
+ err = cTkc.UnSubscribe(&topic, ch)
+ assert.Nil(t, err)
+ cTkc.Stop()
+}