[VOL-4183] Multi UNI

voltha-lib-go version changed

Unused constant removed

Before sending trap flows to the OLT device, rw-core sets the correct meterId on the flow.

Change-Id: Id16afc685161dee560b62cc6c045c44f0bc4a427
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/common.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/common.go
new file mode 100644
index 0000000..a874bd1
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package etcd
+
+import (
+	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+)
+
+var logger log.CLogger
+
+func init() {
+	// Set up this package so that its log level can be modified at run time
+	var err error
+	logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+	if err != nil {
+		panic(err) // a failed logger registration aborts package initialisation
+	}
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/etcd_server.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/etcd_server.go
new file mode 100644
index 0000000..6113b3a
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/etcd_server.go
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package etcd
+
+import (
+	"context"
+	"fmt"
+	"go.etcd.io/etcd/embed"
+	"net/url"
+	"os"
+	"time"
+)
+
+const (
+	serverStartUpTimeout          = 10 * time.Second // Maximum time allowed to wait for the Etcd server to be ready
+	defaultLocalPersistentStorage = "voltha.test.embed.etcd" // Data directory used by getDefaultCfg
+)
+
+// EtcdServer represents an embedded Etcd server.  It is used for testing only.
+type EtcdServer struct {
+	server *embed.Etcd // handle returned by embed.StartEtcd; Stop shuts it down
+}
+
+func islogLevelValid(logLevel string) bool {
+	// Exact, case-sensitive match against the accepted level names.
+	switch logLevel {
+	case "debug", "info", "warn", "error", "panic", "fatal":
+		return true
+	default:
+		return false
+	}
+}
+
+/*
+* MKConfig creates an embedded Etcd config for a new single-member cluster (ctx is only used for logging)
+* :param configName: A name for this config; it is also the member name in the initial cluster
+* :param clientPort:  The port the etcd client will connect to (do not use 2379 for unit test)
+* :param peerPort:  The port the etcd server will listen for its peers (do not use 2380 for unit test)
+* :param localPersistentStorageDir: The name of a local directory which will hold the Etcd server data
+* :param logLevel: One of debug, info, warn, error, panic, or fatal. Any other value is reported via logger.Fatalf.
+ */
+func MKConfig(ctx context.Context, configName string, clientPort, peerPort int, localPersistentStorageDir string, logLevel string) *embed.Config {
+	cfg := embed.NewConfig()
+	cfg.Name = configName
+	cfg.Dir = localPersistentStorageDir
+	cfg.Logger = "zap"
+	if !islogLevelValid(logLevel) {
+		logger.Fatalf(ctx, "Invalid log level -%s", logLevel)
+	}
+	cfg.LogLevel = logLevel
+	acurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", clientPort))
+	if err != nil {
+		logger.Fatalf(ctx, "Invalid client port -%d", clientPort)
+	}
+	cfg.ACUrls = []url.URL{*acurl}
+	cfg.LCUrls = []url.URL{*acurl}
+
+	apurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", peerPort))
+	if err != nil {
+		logger.Fatalf(ctx, "Invalid peer port -%d", peerPort)
+	}
+	cfg.LPUrls = []url.URL{*apurl}
+	cfg.APUrls = []url.URL{*apurl}
+
+	cfg.ClusterState = embed.ClusterStateFlagNew
+	cfg.InitialCluster = cfg.Name + "=" + apurl.String()
+
+	return cfg
+}
+
+// getDefaultCfg returns the config StartEtcdServer falls back to when it is given a nil cfg
+func getDefaultCfg() *embed.Config {
+	cfg := embed.NewConfig()
+	cfg.Dir = defaultLocalPersistentStorage
+	cfg.Logger = "zap"
+	cfg.LogLevel = "error"
+	return cfg
+}
+
+// StartEtcdServer creates and starts an embedded Etcd server.  A local directory to store data is created for the
+// embedded server lifetime (for the duration of a unit test).  A nil cfg selects the defaults from getDefaultCfg.
+func StartEtcdServer(ctx context.Context, cfg *embed.Config) *EtcdServer {
+	// Fall back to the default config when the caller did not supply one
+	if cfg == nil {
+		cfg = getDefaultCfg()
+	}
+	// Remove the local directory as
+	// a safeguard for the case where a prior test failed
+	if err := os.RemoveAll(cfg.Dir); err != nil {
+		logger.Fatalf(ctx, "Failure removing local directory %s - %s", cfg.Dir, err)
+	}
+	e, err := embed.StartEtcd(cfg)
+	if err != nil {
+		logger.Fatal(ctx, err)
+	}
+	select {
+	case <-e.Server.ReadyNotify():
+		logger.Debug(ctx, "Embedded Etcd server is ready!")
+	case <-time.After(serverStartUpTimeout):
+		e.Server.HardStop() // trigger a shutdown
+		e.Close()
+		logger.Fatal(ctx, "Embedded Etcd server took too long to start!")
+	case err := <-e.Err():
+		e.Server.HardStop() // trigger a shutdown
+		e.Close()
+		logger.Fatalf(ctx, "Embedded Etcd server errored out - %s", err)
+	}
+	return &EtcdServer{server: e}
+}
+
+// Stop closes the embedded Etcd server and removes the local data directory as well. A nil or never-started receiver is a no-op.
+func (es *EtcdServer) Stop(ctx context.Context) {
+	if es != nil && es.server != nil {
+		storage := es.server.Config().Dir // captured before shutdown so the cleanup and its log use the same path
+		es.server.Server.HardStop()
+		es.server.Close()
+		if err := os.RemoveAll(storage); err != nil {
+			logger.Fatalf(ctx, "Failure removing local directory %s - %s", storage, err)
+		}
+	}
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/common.go
new file mode 100644
index 0000000..313ff9e
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+)
+
+var logger log.CLogger
+
+func init() {
+	// Set up this package so that its log level can be modified at run time
+	var err error
+	logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+	if err != nil {
+		panic(err) // a failed logger registration aborts package initialisation
+	}
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/endpoint_manager.go
new file mode 100644
index 0000000..5acec69
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/endpoint_manager.go
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+	"context"
+	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+)
+
+type EndpointManager struct{} // EndpointManager is a stateless mock; its methods return fixed answers
+
+// NewEndpointManager returns the mock as a kafka.EndpointManager.
+func NewEndpointManager() kafka.EndpointManager {
+	return &EndpointManager{}
+}
+
+func (em *EndpointManager) GetEndpoint(ctx context.Context, deviceID string, serviceType string) (kafka.Endpoint, error) {
+	// TODO add mocks call and args; until then the service type itself is returned as the endpoint
+	return kafka.Endpoint(serviceType), nil
+}
+
+func (em *EndpointManager) IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+	// TODO add mocks call and args; until then every device is reported as owned, whatever the arguments
+	return true, nil
+}
+
+func (em *EndpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (kafka.ReplicaID, error) {
+	return kafka.ReplicaID(1), nil // the mock always reports replica 1; the arguments are ignored
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_client.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_client.go
new file mode 100644
index 0000000..92966a7
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_client.go
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"github.com/golang/protobuf/ptypes"
+	"sync"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// static check to ensure KafkaClient implements kafka.Client
+var _ kafka.Client = &KafkaClient{}
+
+type KafkaClient struct {
+	topicsChannelMap map[string][]chan *ic.InterContainerMessage // topic name -> channels registered for that topic
+	lock             sync.RWMutex // guards topicsChannelMap
+}
+
+// NewKafkaClient returns a mock client with an empty topic registry; the zero-value lock is ready to use.
+func NewKafkaClient() *KafkaClient {
+	return &KafkaClient{
+		topicsChannelMap: map[string][]chan *ic.InterContainerMessage{},
+	}
+}
+
+func (kc *KafkaClient) Start(ctx context.Context) error {
+	logger.Debug(ctx, "kafka-client-started")
+	return nil // starting the mock only logs; it always succeeds
+}
+
+func (kc *KafkaClient) Stop(ctx context.Context) {
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	for name, subscribers := range kc.topicsChannelMap { // close every registered channel, then forget the topic
+		for i := range subscribers {
+			close(subscribers[i])
+		}
+		delete(kc.topicsChannelMap, name)
+	}
+	logger.Debug(ctx, "kafka-client-stopped")
+}
+
+func (kc *KafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
+	logger.Debugw(ctx, "CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	name := topic.Name
+	if _, exists := kc.topicsChannelMap[name]; exists {
+		return fmt.Errorf("Topic %s already exist", name)
+	}
+	kc.topicsChannelMap[name] = []chan *ic.InterContainerMessage{make(chan *ic.InterContainerMessage)} // a new topic starts with one unbuffered channel
+	return nil
+}
+
+func (kc *KafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
+	logger.Debugw(ctx, "DeleteTopic", log.Fields{"topic": topic.Name})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	delete(kc.topicsChannelMap, topic.Name) // NOTE(review): channels are dropped without close(), unlike Stop/UnSubscribe - confirm intent
+	return nil // deleting an unknown topic is not an error
+}
+
+func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
+	logger.Debugw(ctx, "Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
+	sub := make(chan *ic.InterContainerMessage) // unbuffered: a Send on this topic blocks until this subscriber receives
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], sub)
+	return sub, nil
+}
+
+func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
+	s[i], s[len(s)-1] = s[len(s)-1], nil // move the last entry into slot i (order is not preserved) and clear the vacated tail
+	return s[:len(s)-1]
+}
+
+func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
+	logger.Debugw(ctx, "UnSubscribe", log.Fields{"topic": topic.Name})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	chnls, ok := kc.topicsChannelMap[topic.Name]
+	if !ok {
+		return nil // unknown topic: nothing to unsubscribe
+	}
+	for i, c := range chnls {
+		if c == ch {
+			// Close exactly once and stop scanning, so a channel can never be closed twice.
+			close(c)
+			kc.topicsChannelMap[topic.Name] = removeChannel(chnls, i)
+			break
+		}
+	}
+	return nil
+}
+
+func (kc *KafkaClient) SubscribeForMetadata(ctx context.Context, _ func(fromTopic string, timestamp time.Time)) {
+	logger.Debug(ctx, "SubscribeForMetadata - unimplemented") // the callback is discarded and never invoked
+}
+
+// toIntercontainerMessage wraps event in a REQUEST message; the generated getters make a missing event header safe.
+func toIntercontainerMessage(event *voltha.Event) *ic.InterContainerMessage {
+	msg := &ic.InterContainerMessage{
+		Header: &ic.Header{
+			Id:        event.GetHeader().GetId(),
+			Type:      ic.MessageType_REQUEST,
+			Timestamp: event.GetHeader().GetRaisedTs(),
+		},
+	}
+	if eventBody, err := ptypes.MarshalAny(event); err == nil {
+		msg.Body = eventBody // best effort: the body stays unset when marshalling fails
+	}
+	return msg
+}
+
+func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
+	// Send delivers msg to every channel registered for topic. msg must be a proto message:
+	// either an *ic.InterContainerMessage or a *voltha.Event (which is wrapped first).
+	if _, ok := msg.(proto.Message); !ok {
+		logger.Warnw(ctx, "message-not-a-proto-message", log.Fields{"msg": msg})
+		return status.Error(codes.InvalidArgument, "msg-not-a-proto-msg")
+	}
+	req, ok := msg.(*ic.InterContainerMessage)
+	if !ok {
+		event, ok := msg.(*voltha.Event) //This is required as event message will be of type voltha.Event
+		if !ok {
+			return status.Error(codes.InvalidArgument, "unexpected-message-type")
+		}
+		req = toIntercontainerMessage(event)
+	}
+	if req == nil {
+		return status.Error(codes.InvalidArgument, "msg-nil")
+	}
+	kc.lock.RLock()
+	defer kc.lock.RUnlock()
+	for _, ch := range kc.topicsChannelMap[topic.Name] {
+		logger.Debugw(ctx, "Publishing", log.Fields{"fromTopic": req.GetHeader().GetFromTopic(), "toTopic": topic.Name, "id": req.GetHeader().GetId()})
+		ch <- req // unbuffered channel: blocks, with the read lock held, until the subscriber receives
+	}
+	return nil
+}
+
+func (kc *KafkaClient) SendLiveness(ctx context.Context) error {
+	return status.Error(codes.Unimplemented, "SendLiveness") // not supported by the mock
+}
+
+func (kc *KafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+	logger.Debug(ctx, "EnableLivenessChannel - unimplemented")
+	return nil // no channel is provided; enable is ignored
+}
+
+func (kc *KafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
+	logger.Debug(ctx, "EnableHealthinessChannel - unimplemented")
+	return nil // no channel is provided; enable is ignored
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_inter_container_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_inter_container_proxy.go
new file mode 100644
index 0000000..a028aa9
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_inter_container_proxy.go
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+	"context"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/any"
+	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+)
+
+type InvokeRpcArgs struct {
+	Rpc             string
+	ToTopic         *kafka.Topic
+	ReplyToTopic    *kafka.Topic
+	WaitForResponse bool
+	Key             string
+	ParentDeviceId  string // not populated by the mock methods in this file
+	KvArgs          map[int]interface{} // variadic kvArgs keyed by their position
+}
+
+type InvokeRpcSpy struct {
+	CallCount int // incremented by InvokeRPC before it records the call
+	Calls     map[int]InvokeRpcArgs // recorded arguments, keyed by CallCount at recording time
+	Timeout   bool // when true, InvokeRPC reports failure with a DEADLINE_EXCEEDED error body
+	Response  proto.Message // marshalled and returned by InvokeRPC when Timeout is false
+}
+
+type InvokeAsyncRpcSpy struct { // NOTE(review): not referenced by MockKafkaICProxy in this file - confirm whether it is still needed
+	CallCount int
+	Calls     map[int]InvokeRpcArgs
+	Timeout   bool
+	Response  proto.Message
+}
+
+type MockKafkaICProxy struct {
+	InvokeRpcSpy InvokeRpcSpy // shared by InvokeRPC and InvokeAsyncRPC
+}
+
+func (s *MockKafkaICProxy) Start(ctx context.Context) error { return nil } // no-op
+func (s *MockKafkaICProxy) GetDefaultTopic() *kafka.Topic {
+	t := kafka.Topic{
+		Name: "test-topic",
+	}
+	return &t // a fresh Topic value on every call
+}
+
+func (s *MockKafkaICProxy) DeleteTopic(ctx context.Context, topic kafka.Topic) error { return nil } // no-op
+
+func (s *MockKafkaICProxy) Stop(ctx context.Context) {} // no-op
+
+func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
+	waitForResponse bool, key string, kvArgs ...*kafka.KVArg) chan *kafka.RpcResponse {
+	args := make(map[int]interface{}, len(kvArgs))
+	for k, v := range kvArgs {
+		args[k] = v
+	}
+	if s.InvokeRpcSpy.Calls == nil { // a zero-value spy must not panic on the write below
+		s.InvokeRpcSpy.Calls = make(map[int]InvokeRpcArgs)
+	}
+	// NOTE(review): CallCount is not incremented here (InvokeRPC does), so repeated async calls share one slot - confirm intent.
+	s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
+		Rpc:             rpc,
+		ToTopic:         toTopic,
+		ReplyToTopic:    replyToTopic,
+		WaitForResponse: waitForResponse,
+		Key:             key,
+		KvArgs:          args,
+	}
+	// The mock never sends on or closes the returned channel.
+	return make(chan *kafka.RpcResponse)
+}
+
+func (s *MockKafkaICProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic, waitForResponse bool, key string, kvArgs ...*kafka.KVArg) (bool, *any.Any) {
+	s.InvokeRpcSpy.CallCount++
+	args := make(map[int]interface{}, len(kvArgs))
+	for k, v := range kvArgs {
+		args[k] = v
+	}
+
+	// A zero-value spy has a nil Calls map; writing to it would panic.
+	if s.InvokeRpcSpy.Calls == nil {
+		s.InvokeRpcSpy.Calls = make(map[int]InvokeRpcArgs)
+	}
+	s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
+		Rpc:             rpc,
+		ToTopic:         toTopic,
+		ReplyToTopic:    replyToTopic,
+		WaitForResponse: waitForResponse,
+		Key:             key,
+		KvArgs:          args,
+	}
+	// Pick the payload: a DEADLINE_EXCEEDED error when a timeout is simulated, else the canned response.
+	success := !s.InvokeRpcSpy.Timeout
+	var payload proto.Message = s.InvokeRpcSpy.Response
+	if !success {
+		payload = &ic.Error{Reason: "context deadline exceeded", Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+	}
+	if payload == nil {
+		return success, &any.Any{} // no canned response configured
+	}
+	res, err := ptypes.MarshalAny(payload)
+	if err != nil || res == nil {
+		return success, &any.Any{} // marshalling failed: return an empty Any rather than dereference nil
+	}
+	return success, res
+}
+func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic kafka.Topic, handler interface{}) error {
+	return nil // no-op: the handler is not stored
+}
+func (s *MockKafkaICProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic kafka.Topic, initialOffset int64) error {
+	return nil // no-op
+}
+func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic kafka.Topic) error {
+	return nil // no-op
+}
+func (s *MockKafkaICProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+	return nil // no liveness channel is provided by the mock
+}
+func (s *MockKafkaICProxy) SendLiveness(ctx context.Context) error { return nil } // no-op