[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/common.go
new file mode 100644
index 0000000..c893312
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package etcd
+
+import (
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+var logger log.CLogger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/etcd_server.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/etcd_server.go
new file mode 100644
index 0000000..951baa9
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/etcd_server.go
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package etcd
+
+import (
+ "context"
+ "fmt"
+ "net/url"
+ "os"
+ "strings"
+ "time"
+
+ "go.etcd.io/etcd/embed"
+)
+
+const (
+ serverStartUpTimeout = 10 * time.Second // Maximum time allowed to wait for the Etcd server to be ready
+ defaultLocalPersistentStorage = "voltha.test.embed.etcd"
+)
+
+//EtcdServer represents an embedded Etcd server. It is used for testing only.
+type EtcdServer struct {
+ server *embed.Etcd
+}
+
+func islogLevelValid(logLevel string) bool {
+ valid := []string{"debug", "info", "warn", "error", "panic", "fatal"}
+ for _, l := range valid {
+ if l == logLevel {
+ return true
+ }
+ }
+ return false
+}
+
+/*
+* MKConfig creates an embedded Etcd config
+* :param configName: A name for this config
+* :param clientPort: The port the etcd client will connect to (do not use 2379 for unit test)
+* :param peerPort: The port the etcd server will listen for its peers (do not use 2380 for unit test)
+* :param localPersistentStorageDir: The name of a local directory which will hold the Etcd server data
+* :param logLevel: One of debug, info, warn, error, panic, or fatal. Default 'info'.
+ */
+func MKConfig(ctx context.Context, configName string, clientPort, peerPort int, localPersistentStorageDir string, logLevel string) *embed.Config {
+ cfg := embed.NewConfig()
+ cfg.Name = configName
+ cfg.Dir = localPersistentStorageDir
+ // cfg.Logger = "zap"
+ if !islogLevelValid(logLevel) {
+ logger.Fatalf(ctx, "Invalid log level -%s", logLevel)
+ }
+ // cfg.LogLevel = logLevel
+ cfg.Debug = strings.EqualFold(logLevel, "debug")
+ cfg.LogPkgLevels = "*=C"
+ cfg.SetupLogging()
+
+ acurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", clientPort))
+ if err != nil {
+ logger.Fatalf(ctx, "Invalid client port -%d", clientPort)
+ }
+ cfg.ACUrls = []url.URL{*acurl}
+ cfg.LCUrls = []url.URL{*acurl}
+
+ apurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", peerPort))
+ if err != nil {
+ logger.Fatalf(ctx, "Invalid peer port -%d", peerPort)
+ }
+ cfg.LPUrls = []url.URL{*apurl}
+ cfg.APUrls = []url.URL{*apurl}
+
+ cfg.ClusterState = embed.ClusterStateFlagNew
+ cfg.InitialCluster = cfg.Name + "=" + apurl.String()
+
+ return cfg
+}
+
+//getDefaultCfg specifies the default config
+func getDefaultCfg() *embed.Config {
+ cfg := embed.NewConfig()
+ cfg.Debug = false
+ cfg.LogPkgLevels = "*=C"
+ cfg.SetupLogging()
+ cfg.Dir = defaultLocalPersistentStorage
+ return cfg
+}
+
+//StartEtcdServer creates and starts an embedded Etcd server. A local directory to store data is created for the
+//embedded server lifetime (for the duration of a unit test. The server runs at localhost:2379.
+func StartEtcdServer(ctx context.Context, cfg *embed.Config) *EtcdServer {
+ // If the server is already running, just return
+ if cfg == nil {
+ cfg = getDefaultCfg()
+ }
+ // Remove the local directory as
+ // a safeguard for the case where a prior test failed
+ if err := os.RemoveAll(cfg.Dir); err != nil {
+ logger.Fatalf(ctx, "Failure removing local directory %s", cfg.Dir)
+ }
+ e, err := embed.StartEtcd(cfg)
+ if err != nil {
+ logger.Fatal(ctx, err)
+ }
+ select {
+ case <-e.Server.ReadyNotify():
+ logger.Debug(ctx, "Embedded Etcd server is ready!")
+ case <-time.After(serverStartUpTimeout):
+ e.Server.HardStop() // trigger a shutdown
+ e.Close()
+ logger.Fatal(ctx, "Embedded Etcd server took too long to start!")
+ case err := <-e.Err():
+ e.Server.HardStop() // trigger a shutdown
+ e.Close()
+ logger.Fatalf(ctx, "Embedded Etcd server errored out - %s", err)
+ }
+ return &EtcdServer{server: e}
+}
+
+//Stop closes the embedded Etcd server and removes the local data directory as well
+func (es *EtcdServer) Stop(ctx context.Context) {
+ if es != nil {
+ storage := es.server.Config().Dir
+ es.server.Server.HardStop()
+ es.server.Close()
+ if err := os.RemoveAll(storage); err != nil {
+ logger.Fatalf(ctx, "Failure removing local directory %s", es.server.Config().Dir)
+ }
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/common.go
new file mode 100644
index 0000000..ff4aec9
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+var logger log.CLogger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/kafka_client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/kafka_client.go
new file mode 100644
index 0000000..ea410ac
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/kafka_client.go
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ maxConcurrentMessage = 100
+)
+
+// static check to ensure KafkaClient implements kafka.Client
+var _ kafka.Client = &KafkaClient{}
+
+type KafkaClient struct {
+ topicsChannelMap map[string][]chan proto.Message
+ lock sync.RWMutex
+ alive bool
+ livenessMutex sync.Mutex
+ liveness chan bool
+}
+
+func NewKafkaClient() *KafkaClient {
+ return &KafkaClient{
+ topicsChannelMap: make(map[string][]chan proto.Message),
+ lock: sync.RWMutex{},
+ }
+}
+
+func (kc *KafkaClient) Start(ctx context.Context) error {
+ logger.Debug(ctx, "kafka-client-started")
+ return nil
+}
+
+func (kc *KafkaClient) Stop(ctx context.Context) {
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ for topic, chnls := range kc.topicsChannelMap {
+ for _, c := range chnls {
+ close(c)
+ }
+ delete(kc.topicsChannelMap, topic)
+ }
+ logger.Debug(ctx, "kafka-client-stopped")
+}
+
+func (kc *KafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
+ logger.Debugw(ctx, "CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ if _, ok := kc.topicsChannelMap[topic.Name]; ok {
+ return fmt.Errorf("Topic %s already exist", topic.Name)
+ }
+ ch := make(chan proto.Message)
+ kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
+ return nil
+}
+
+func (kc *KafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
+ logger.Debugw(ctx, "DeleteTopic", log.Fields{"topic": topic.Name})
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ delete(kc.topicsChannelMap, topic.Name)
+ return nil
+}
+
+func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan proto.Message, error) {
+ logger.Debugw(ctx, "Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ ch := make(chan proto.Message, maxConcurrentMessage)
+ kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
+ return ch, nil
+}
+
+func removeChannel(s []chan proto.Message, i int) []chan proto.Message {
+ s[i] = s[len(s)-1]
+ return s[:len(s)-1]
+}
+
+func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan proto.Message) error {
+ logger.Debugw(ctx, "UnSubscribe", log.Fields{"topic": topic.Name})
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
+ idx := -1
+ for i, c := range chnls {
+ if c == ch {
+ close(c)
+ idx = i
+ }
+ }
+ if idx >= 0 {
+ kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
+ }
+ }
+ return nil
+}
+
+func (kc *KafkaClient) SubscribeForMetadata(ctx context.Context, _ func(fromTopic string, timestamp time.Time)) {
+ logger.Debug(ctx, "SubscribeForMetadata - unimplemented")
+}
+
+func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
+ // Assert message is a proto message
+ protoMsg, ok := msg.(proto.Message)
+ if !ok {
+ logger.Warnw(ctx, "message-not-a-proto-message", log.Fields{"msg": msg})
+ return status.Error(codes.InvalidArgument, "msg-not-a-proto-msg")
+ }
+ kc.lock.RLock()
+ defer kc.lock.RUnlock()
+ for _, ch := range kc.topicsChannelMap[topic.Name] {
+ select {
+ case ch <- protoMsg:
+ logger.Debugw(ctx, "publishing", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
+ default:
+ logger.Debugw(ctx, "ignoring-event-channel-busy", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
+ }
+ }
+ return nil
+}
+
+func (kc *KafkaClient) SendLiveness(ctx context.Context) error {
+ kc.livenessMutex.Lock()
+ defer kc.livenessMutex.Unlock()
+ if kc.liveness != nil {
+ kc.liveness <- true // I am a mock
+ }
+ return nil
+}
+
+func (kc *KafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+ logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
+ if enable {
+ kc.livenessMutex.Lock()
+ defer kc.livenessMutex.Unlock()
+ if kc.liveness == nil {
+ logger.Info(ctx, "kafka-create-liveness-channel")
+ kc.liveness = make(chan bool, 10)
+ // post intial state to the channel
+ kc.liveness <- kc.alive
+ }
+ } else {
+ panic("Turning off liveness reporting is not supported")
+ }
+ return kc.liveness
+}
+
+func (kc *KafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
+ logger.Debug(ctx, "EnableHealthinessChannel - unimplemented")
+ return nil
+}