[VOL-2235] Mocks and interfaces for rw-core
This update consists of mocks that are used by the rw-core
during unit testing. It also includes interfaces used for unit
tests.
Change-Id: I20ca1455c358113c3aa897acc6355e0ddbc614b7
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/etcd_server.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/etcd_server.go
new file mode 100644
index 0000000..9b9dec4
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/etcd_server.go
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mocks
+
+import (
+ "go.etcd.io/etcd/embed"
+ "log"
+ "os"
+ "time"
+)
+
+const (
+ serverStartUpTimeout = 10 * time.Second // Maximum time allowed to wait for the Etcd server to be ready
+ localPersistentStorage = "voltha.embed.etcd" // Data directory assigned to the default config by getDefaultCfg
+)
+
+//EtcdServer represents an embedded Etcd server. It is used for testing only.
+type EtcdServer struct {
+ server *embed.Etcd // running instance returned by embed.StartEtcd; shut down and closed by Stop
+}
+
+//getDefaultCfg returns the config used when StartEtcdServer receives nil: zap logging at error level, local data dir
+func getDefaultCfg() *embed.Config {
+	defaults := embed.NewConfig()
+	defaults.Logger = "zap"
+	defaults.LogLevel = "error"
+	defaults.Dir = localPersistentStorage
+	return defaults
+}
+
+//StartEtcdServer creates and starts an embedded Etcd server and blocks until it is ready. A nil cfg selects the
+//default config, with which the server runs at localhost:2379. The data directory (cfg.Dir) is wiped first and
+//lives for the server lifetime (the duration of a unit test). Any failure terminates the process via log.Fatal.
+func StartEtcdServer(cfg *embed.Config) *EtcdServer {
+	// Fall back to the default configuration when the caller supplies none
+	if cfg == nil {
+		cfg = getDefaultCfg()
+	}
+	// Wipe the directory this server will actually use, in case a prior test failed before cleaning up
+	if err := os.RemoveAll(cfg.Dir); err != nil {
+		log.Fatalf("Failure removing local directory %s: %v", cfg.Dir, err)
+	}
+	e, err := embed.StartEtcd(cfg)
+	if err != nil {
+		log.Fatal(err)
+	}
+	select {
+	case <-e.Server.ReadyNotify():
+		log.Printf("Embedded Etcd server is ready!")
+	case <-time.After(serverStartUpTimeout):
+		e.Server.HardStop() // trigger a shutdown
+		e.Close()
+		log.Fatal("Embedded Etcd server took too long to start!")
+	case err := <-e.Err():
+		e.Server.HardStop() // trigger a shutdown
+		e.Close()
+		log.Fatalf("Embedded Etcd server errored out - %s", err)
+	}
+	return &EtcdServer{server: e}
+}
+
+//Stop closes the embedded Etcd server and removes the data directory it was configured with. A nil receiver is a no-op.
+func (es *EtcdServer) Stop() {
+	if es != nil {
+		es.server.Server.HardStop()
+		es.server.Close()
+		if err := os.RemoveAll(es.server.Config().Dir); err != nil { // the configured dir, which may not be the default
+			log.Fatalf("Failure removing local directory %s: %v", es.server.Config().Dir, err)
+		}
+	}
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/kafka_client.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/kafka_client.go
new file mode 100644
index 0000000..79cdfbc
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/kafka_client.go
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mocks
+
+import (
+ "fmt"
+ "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ ic "github.com/opencord/voltha-protos/v2/go/inter_container"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+type KafkaClient struct { // KafkaClient is a mock Kafka client that passes messages over in-process Go channels
+ topicsChannelMap map[string][]chan *ic.InterContainerMessage // channels registered under each topic name
+ lock sync.RWMutex // guards topicsChannelMap
+}
+
+//NewKafkaClient returns a mock Kafka client that has no topics registered yet.
+func NewKafkaClient() *KafkaClient {
+	client := &KafkaClient{} // the zero-value lock is ready to use
+	client.topicsChannelMap = make(map[string][]chan *ic.InterContainerMessage)
+	return client
+}
+
+func (kc *KafkaClient) Start() error { // Start has nothing to connect to in this mock: it only logs and always returns nil
+ log.Debug("kafka-client-started")
+ return nil
+}
+
+func (kc *KafkaClient) Stop() { // Stop closes every registered channel and forgets all topics
+	kc.lock.Lock() // waits for in-flight Send calls, which hold the read lock while delivering
+	defer kc.lock.Unlock()
+	for name, subscribers := range kc.topicsChannelMap {
+		for i := range subscribers {
+			close(subscribers[i])
+		}
+		delete(kc.topicsChannelMap, name) // removing entries while ranging over a map is permitted in Go
+	}
+	log.Debug("kafka-client-stopped")
+}
+
+//CreateTopic registers topic with no subscribers. It fails if the topic exists; the other arguments are only logged.
+func (kc *KafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
+	log.Debugw("CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	if _, ok := kc.topicsChannelMap[topic.Name]; ok {
+		return fmt.Errorf("Topic %s already exist", topic.Name)
+	}
+	kc.topicsChannelMap[topic.Name] = nil // key only: a channel nobody receives from would make Send block forever
+	return nil
+}
+
+func (kc *KafkaClient) DeleteTopic(topic *kafka.Topic) error { // DeleteTopic forgets the topic; always returns nil
+ log.Debugw("DeleteTopic", log.Fields{"topic": topic.Name})
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ delete(kc.topicsChannelMap, topic.Name) // the topic's channels are dropped without being closed; a missing topic is a no-op
+ return nil
+}
+
+func (kc *KafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) { // Subscribe adds a new channel to topic
+	subscription := make(chan *ic.InterContainerMessage) // unbuffered: a Send blocks until the message is received
+	log.Debugw("Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs}) // kvArgs are only logged
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], subscription) // an unknown topic gets an entry here
+	return subscription, nil
+}
+
+func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage { // removeChannel drops s[i] without preserving order
+ s[i] = s[len(s)-1] // overwrite the removed slot with the last element (modifies the caller's backing array)
+ return s[:len(s)-1] // then shrink by one; panics if s is empty or i is out of range
+}
+
+//UnSubscribe closes ch and removes it from topic's subscriber list. An unknown topic, or a channel
+//that is not registered under topic, is silently ignored; the returned error is always nil.
+func (kc *KafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
+	log.Debugw("UnSubscribe", log.Fields{"topic": topic.Name})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	// Ranging over a missing topic's nil slice is a no-op, so no explicit existence check is needed
+	subscribers := kc.topicsChannelMap[topic.Name]
+	for i, c := range subscribers {
+		if c == ch {
+			close(c)
+			kc.topicsChannelMap[topic.Name] = removeChannel(subscribers, i)
+			// Stop at the first match: scanning on could close the same channel twice, which panics
+			break
+		}
+	}
+	return nil
+}
+
+//Send delivers msg, which must be a non-nil *ic.InterContainerMessage, to every channel registered for topic; keys is unused.
+func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
+	req, ok := msg.(*ic.InterContainerMessage)
+	if !ok {
+		return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
+	} else if req == nil {
+		return status.Error(codes.InvalidArgument, "msg-nil")
+	}
+	kc.lock.RLock() // held for the whole delivery, so Stop/Subscribe/UnSubscribe wait until every subscriber has received
+	defer kc.lock.RUnlock()
+	for _, subscriber := range kc.topicsChannelMap[topic.Name] { // nil-safe getters below: a header-less message must not panic the log
+		log.Debugw("Publishing", log.Fields{"fromTopic": req.GetHeader().GetFromTopic(), "toTopic": topic.Name, "id": req.GetHeader().GetId()})
+		subscriber <- req // unbuffered: blocks until this subscriber receives
+	}
+	return nil
+}
+
+func (kc *KafkaClient) SendLiveness() error { // SendLiveness is not supported by the mock
+ return status.Error(codes.Unimplemented, "SendLiveness") // always an Unimplemented status error
+}
+
+func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool { // enable is ignored by the mock
+ return nil // always a nil channel; NOTE(review): a receive on a nil channel blocks forever, so callers must not wait on it
+}