[VOL-2835] Using different topic per ONU device
Change-Id: I3e55064292f28f9bf39ad6bc75fd5758f5313317
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go
new file mode 100644
index 0000000..05bc5f9
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+// logger is the package-level logger. init registers this package with the
+// log library so that its log level can be modified at run time.
+var logger log.Logger
+
+func init() {
+	var err error
+	if logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "mocks"}); err != nil {
+		panic(err)
+	}
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/endpoint_manager.go
new file mode 100644
index 0000000..fedbebf
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/endpoint_manager.go
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+)
+
+type EndpointManager struct{} // field-less mock; NewEndpointManager returns it as a kafka.EndpointManager
+
+// NewEndpointManager builds the mock and hands it back as a kafka.EndpointManager.
+func NewEndpointManager() kafka.EndpointManager {
+	return &EndpointManager{}
+}
+
+// GetEndpoint ignores deviceID and returns serviceType converted to a kafka.Endpoint, with a nil error.
+func (em *EndpointManager) GetEndpoint(deviceID string, serviceType string) (kafka.Endpoint, error) {
+ return kafka.Endpoint(serviceType), nil // TODO add mocks call and args
+}
+
+// IsDeviceOwnedByService ignores all of its arguments and always returns true with a nil error.
+func (em *EndpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+ return true, nil // TODO add mocks call and args
+}
+
+func (em *EndpointManager) GetReplicaAssignment(deviceID string, serviceType string) (kafka.ReplicaID, error) {
+ return kafka.ReplicaID(1), nil // fixed answer: replica 1 and a nil error, whatever the arguments are
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_client.go
new file mode 100644
index 0000000..5922ce2
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_client.go
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// compile-time assertion that *KafkaClient satisfies kafka.Client
+var _ kafka.Client = (*KafkaClient)(nil)
+
+type KafkaClient struct { // in-memory mock implementation of kafka.Client
+ topicsChannelMap map[string][]chan *ic.InterContainerMessage // channels registered per topic name
+ lock sync.RWMutex // taken by every method that reads or writes topicsChannelMap
+}
+
+// NewKafkaClient returns a mock client with an empty topic map; its lock is
+// left at the zero value, which is an unlocked sync.RWMutex.
+func NewKafkaClient() *KafkaClient {
+	topics := make(map[string][]chan *ic.InterContainerMessage)
+	return &KafkaClient{topicsChannelMap: topics}
+}
+
+func (kc *KafkaClient) Start() error { // Start only logs a debug message
+ logger.Debug("kafka-client-started")
+ return nil // always nil
+}
+
+func (kc *KafkaClient) Stop() { // Stop closes every registered channel, empties the topic map, then logs
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	for name, subscribers := range kc.topicsChannelMap {
+		for i := range subscribers {
+			close(subscribers[i])
+		}
+		delete(kc.topicsChannelMap, name) // removing the current key while ranging over a map is allowed
+	}
+	logger.Debug("kafka-client-stopped")
+}
+
+// CreateTopic registers topic.Name and fails if it already exists. No channel is created: one without a reader would block Send.
+func (kc *KafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
+	logger.Debugw("CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	if _, ok := kc.topicsChannelMap[topic.Name]; ok {
+		return fmt.Errorf("Topic %s already exist", topic.Name)
+	}
+	kc.topicsChannelMap[topic.Name] = nil // key present, no subscriber channels yet; Subscribe appends to it
+	return nil
+}
+
+func (kc *KafkaClient) DeleteTopic(topic *kafka.Topic) error { // DeleteTopic drops the map entry for topic.Name
+ logger.Debugw("DeleteTopic", log.Fields{"topic": topic.Name})
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ delete(kc.topicsChannelMap, topic.Name) // channels registered under the topic are not closed here
+ return nil // always nil, also when the topic was unknown
+}
+
+func (kc *KafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) { // adds a channel for topic.Name
+	logger.Debugw("Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
+	subscriber := make(chan *ic.InterContainerMessage) // unbuffered
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], subscriber)
+	return subscriber, nil // receive end of the new channel; the error is always nil
+}
+
+func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage { // swap-remove of s[i]; order is not preserved
+	s[i], s[len(s)-1] = s[len(s)-1], nil // move the last element into slot i and clear the vacated tail so it holds no stale reference
+	return s[:len(s)-1]
+}
+
+// UnSubscribe closes ch and removes it from topic.Name's channels; unknown topics or channels are ignored and nil is returned.
+func (kc *KafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
+	logger.Debugw("UnSubscribe", log.Fields{"topic": topic.Name})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	subscribers := kc.topicsChannelMap[topic.Name] // nil for an unknown topic, so the loop below is skipped
+	found := -1
+	for pos := range subscribers {
+		if subscribers[pos] == ch {
+			close(subscribers[pos])
+			found = pos
+		}
+	}
+	if found >= 0 {
+		kc.topicsChannelMap[topic.Name] = removeChannel(subscribers, found)
+	}
+	return nil
+}
+
+func (kc *KafkaClient) SubscribeForMetadata(_ func(fromTopic string, timestamp int64)) { // the callback is discarded
+ logger.Debug("SubscribeForMetadata - unimplemented")
+}
+
+func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error { // Send delivers msg to every channel of topic.Name
+	req, ok := msg.(*ic.InterContainerMessage)
+	if !ok {
+		return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
+	}
+	if req == nil { // a typed nil pointer passes the type assertion above
+		return status.Error(codes.InvalidArgument, "msg-nil")
+	}
+	kc.lock.RLock() // held across the sends below; each send blocks until that channel is received from
+	defer kc.lock.RUnlock()
+	for _, ch := range kc.topicsChannelMap[topic.Name] { // nil-safe getters: a message without a Header must not panic
+		logger.Debugw("Publishing", log.Fields{"fromTopic": req.GetHeader().GetFromTopic(), "toTopic": topic.Name, "id": req.GetHeader().GetId()})
+		ch <- req
+	}
+	return nil
+}
+
+func (kc *KafkaClient) SendLiveness() error { // not implemented by this mock
+ return status.Error(codes.Unimplemented, "SendLiveness") // always an Unimplemented status error
+}
+
+func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool { // enable is ignored
+ logger.Debug("EnableLivenessChannel - unimplemented")
+ return nil // callers always get a nil channel
+}
+
+func (kc *KafkaClient) EnableHealthinessChannel(enable bool) chan bool { // enable is ignored
+ logger.Debug("EnableHealthinessChannel - unimplemented")
+ return nil // callers always get a nil channel
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go
new file mode 100644
index 0000000..34aec95
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+ "context"
+ "github.com/gogo/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+)
+
+type InvokeRpcArgs struct { // arguments captured from one InvokeRPC or InvokeAsyncRPC call
+ Rpc string // the rpc argument
+ ToTopic *kafka.Topic
+ ReplyToTopic *kafka.Topic
+ WaitForResponse bool
+ Key string
+ ParentDeviceId string // not populated by the recording code shown in this file
+ KvArgs map[int]interface{} // the variadic kvArgs, keyed by their position in the call
+}
+
+type InvokeRpcSpy struct { // call recorder and canned-result holder; type of MockKafkaICProxy.InvokeRpcSpy
+ CallCount int // incremented by InvokeRPC before it records the call
+ Calls map[int]InvokeRpcArgs // recorded calls, keyed by the CallCount value at recording time
+ Timeout bool // when true, InvokeRPC returns false and a DEADLINE_EXCEEDED ic.Error
+ Response proto.Message // payload InvokeRPC marshals and returns when Timeout is false
+}
+
+type InvokeAsyncRpcSpy struct { // same fields as InvokeRpcSpy; not referenced by the code shown in this file
+ CallCount int
+ Calls map[int]InvokeRpcArgs
+ Timeout bool
+ Response proto.Message
+}
+
+type MockKafkaICProxy struct { // mock inter-container proxy; InvokeRPC and InvokeAsyncRPC record into InvokeRpcSpy
+ InvokeRpcSpy InvokeRpcSpy
+}
+
+func (s *MockKafkaICProxy) Start() error { return nil } // no-op; always returns nil
+// GetDefaultTopic returns a pointer to a newly created kafka.Topic whose Name
+// is "test-topic". Each call yields its own Topic value, so callers do not
+// share state through the result.
+func (s *MockKafkaICProxy) GetDefaultTopic() *kafka.Topic {
+	return &kafka.Topic{Name: "test-topic"}
+}
+func (s *MockKafkaICProxy) DeleteTopic(topic kafka.Topic) error { return nil } // no-op; topic is ignored
+func (s *MockKafkaICProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
+ return nil // no-op; all arguments are ignored
+}
+func (s *MockKafkaICProxy) Stop() {} // no-op
+
+// InvokeAsyncRPC records the call arguments in InvokeRpcSpy.Calls under the current CallCount (which it does not
+// advance) and returns a new unbuffered channel; nothing in this mock writes to or closes that channel.
+func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
+	waitForResponse bool, key string, kvArgs ...*kafka.KVArg) chan *kafka.RpcResponse {
+	if s.InvokeRpcSpy.Calls == nil { // a zero-value spy has a nil map; writing to it would panic
+		s.InvokeRpcSpy.Calls = make(map[int]InvokeRpcArgs)
+	}
+	args := make(map[int]interface{}, len(kvArgs))
+	for k, v := range kvArgs {
+		args[k] = v
+	}
+	s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
+		Rpc:             rpc,
+		ToTopic:         toTopic,
+		ReplyToTopic:    replyToTopic,
+		WaitForResponse: waitForResponse,
+		Key:             key,
+		KvArgs:          args,
+	}
+	return make(chan *kafka.RpcResponse)
+}
+
+func (s *MockKafkaICProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic, waitForResponse bool, key string, kvArgs ...*kafka.KVArg) (bool, *any.Any) {
+ s.InvokeRpcSpy.CallCount++
+
+ success := true
+
+ args := make(map[int]interface{}, 4)
+ for k, v := range kvArgs {
+ args[k] = v
+ }
+
+ s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
+ Rpc: rpc,
+ ToTopic: toTopic,
+ ReplyToTopic: replyToTopic,
+ WaitForResponse: waitForResponse,
+ Key: key,
+ KvArgs: args,
+ }
+
+ var response any.Any
+ if s.InvokeRpcSpy.Timeout {
+
+ success = false
+
+ err := &ic.Error{Reason: "context deadline exceeded", Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+ res, _ := ptypes.MarshalAny(err)
+ response = *res
+ } else {
+ res, _ := ptypes.MarshalAny(s.InvokeRpcSpy.Response)
+ response = *res
+ }
+
+ return success, &response
+}
+func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(topic kafka.Topic, handler interface{}) error {
+ return nil // no-op; topic and handler are ignored
+}
+func (s *MockKafkaICProxy) SubscribeWithDefaultRequestHandler(topic kafka.Topic, initialOffset int64) error {
+ return nil // no-op; topic and initialOffset are ignored
+}
+func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(topic kafka.Topic) error { return nil } // no-op; topic is ignored
+func (s *MockKafkaICProxy) EnableLivenessChannel(enable bool) chan bool { return nil } // always a nil channel; enable is ignored
+func (s *MockKafkaICProxy) SendLiveness() error { return nil } // no-op; always returns nil