blob: 2a1b5a1ca8b5fc3d71f165f82bc9b0533c41453d [file] [log] [blame]
Matteo Scandolo2ba00d32020-01-16 17:33:03 -08001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
khenaidoob6238b32020-04-07 12:07:36 -040017package kafka
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080018
19import (
20 "context"
Matteo Scandolob45cf592020-01-21 16:10:56 -080021 "github.com/gogo/protobuf/proto"
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080022 "github.com/golang/protobuf/ptypes"
23 "github.com/golang/protobuf/ptypes/any"
24 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
25 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080026)
27
28type InvokeRpcArgs struct {
29 Rpc string
30 ToTopic *kafka.Topic
31 ReplyToTopic *kafka.Topic
32 WaitForResponse bool
33 Key string
34 ParentDeviceId string
35 KvArgs map[int]interface{}
36}
37
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080038type InvokeRpcSpy struct {
39 CallCount int
40 Calls map[int]InvokeRpcArgs
Matteo Scandolob45cf592020-01-21 16:10:56 -080041 Timeout bool
42 Response proto.Message
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080043}
44
Matteo Scandoloed128822020-02-10 15:52:35 -080045type InvokeAsyncRpcSpy struct {
46 CallCount int
47 Calls map[int]InvokeRpcArgs
48 Timeout bool
49 Response proto.Message
50}
51
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080052type MockKafkaICProxy struct {
53 InvokeRpcSpy InvokeRpcSpy
54}
55
Neha Sharma3c425fb2020-06-08 16:42:32 +000056func (s *MockKafkaICProxy) Start(ctx context.Context) error { return nil }
Matteo Scandolof346a2d2020-01-24 13:14:54 -080057func (s *MockKafkaICProxy) GetDefaultTopic() *kafka.Topic {
58 t := kafka.Topic{
59 Name: "test-topic",
60 }
61 return &t
62}
Neha Sharma3c425fb2020-06-08 16:42:32 +000063func (s *MockKafkaICProxy) DeleteTopic(ctx context.Context, topic kafka.Topic) error { return nil }
64func (s *MockKafkaICProxy) DeviceDiscovered(ctx context.Context, deviceId string, deviceType string, parentId string, publisher string) error {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080065 return nil
66}
Neha Sharma3c425fb2020-06-08 16:42:32 +000067func (s *MockKafkaICProxy) Stop(ctx context.Context) {}
Matteo Scandoloed128822020-02-10 15:52:35 -080068
69func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
70 waitForResponse bool, key string, kvArgs ...*kafka.KVArg) chan *kafka.RpcResponse {
71
72 args := make(map[int]interface{}, 4)
73 for k, v := range kvArgs {
74 args[k] = v
75 }
76
77 s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
78 Rpc: rpc,
79 ToTopic: toTopic,
80 ReplyToTopic: replyToTopic,
81 WaitForResponse: waitForResponse,
82 Key: key,
83 KvArgs: args,
84 }
85
86 chnl := make(chan *kafka.RpcResponse)
87
88 return chnl
89}
90
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080091func (s *MockKafkaICProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic, waitForResponse bool, key string, kvArgs ...*kafka.KVArg) (bool, *any.Any) {
92 s.InvokeRpcSpy.CallCount++
93
94 success := true
95
96 args := make(map[int]interface{}, 4)
97 for k, v := range kvArgs {
98 args[k] = v
99 }
100
101 s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
102 Rpc: rpc,
103 ToTopic: toTopic,
104 ReplyToTopic: replyToTopic,
105 WaitForResponse: waitForResponse,
106 Key: key,
107 KvArgs: args,
108 }
109
Matteo Scandolob45cf592020-01-21 16:10:56 -0800110 var response any.Any
111 if s.InvokeRpcSpy.Timeout {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800112
113 success = false
114
Matteo Scandolob45cf592020-01-21 16:10:56 -0800115 err := &ic.Error{Reason: "context deadline exceeded", Code: ic.ErrorCode_DEADLINE_EXCEEDED}
116 res, _ := ptypes.MarshalAny(err)
117 response = *res
118 } else {
119 res, _ := ptypes.MarshalAny(s.InvokeRpcSpy.Response)
120 response = *res
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800121 }
122
Matteo Scandolob45cf592020-01-21 16:10:56 -0800123 return success, &response
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800124}
Neha Sharma3c425fb2020-06-08 16:42:32 +0000125func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic kafka.Topic, handler interface{}) error {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800126 return nil
127}
Neha Sharma3c425fb2020-06-08 16:42:32 +0000128func (s *MockKafkaICProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic kafka.Topic, initialOffset int64) error {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800129 return nil
130}
Neha Sharma3c425fb2020-06-08 16:42:32 +0000131func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic kafka.Topic) error {
132 return nil
133}
134func (s *MockKafkaICProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
135 return nil
136}
137func (s *MockKafkaICProxy) SendLiveness(ctx context.Context) error { return nil }