blob: bf8582da4b2a372fcad3dfaf729e5a1a38f7e478 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
khenaidoob6238b32020-04-07 12:07:36 -040017package kafka
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080018
19import (
20 "context"
David Bainbridgece5e11a2020-06-23 12:41:16 -070021
Matteo Scandolob45cf592020-01-21 16:10:56 -080022 "github.com/gogo/protobuf/proto"
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080023 "github.com/golang/protobuf/ptypes"
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
26 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080027)
28
29type InvokeRpcArgs struct {
30 Rpc string
31 ToTopic *kafka.Topic
32 ReplyToTopic *kafka.Topic
33 WaitForResponse bool
34 Key string
35 ParentDeviceId string
36 KvArgs map[int]interface{}
37}
38
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080039type InvokeRpcSpy struct {
40 CallCount int
41 Calls map[int]InvokeRpcArgs
Matteo Scandolob45cf592020-01-21 16:10:56 -080042 Timeout bool
43 Response proto.Message
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080044}
45
Matteo Scandoloed128822020-02-10 15:52:35 -080046type InvokeAsyncRpcSpy struct {
47 CallCount int
48 Calls map[int]InvokeRpcArgs
49 Timeout bool
50 Response proto.Message
51}
52
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080053type MockKafkaICProxy struct {
54 InvokeRpcSpy InvokeRpcSpy
55}
56
Scott Bakere6685952020-06-23 04:05:39 +000057func (s *MockKafkaICProxy) Start() error { return nil }
Matteo Scandolof346a2d2020-01-24 13:14:54 -080058func (s *MockKafkaICProxy) GetDefaultTopic() *kafka.Topic {
59 t := kafka.Topic{
60 Name: "test-topic",
61 }
62 return &t
63}
Scott Bakere6685952020-06-23 04:05:39 +000064func (s *MockKafkaICProxy) DeleteTopic(topic kafka.Topic) error { return nil }
David Bainbridgece5e11a2020-06-23 12:41:16 -070065
Scott Bakere6685952020-06-23 04:05:39 +000066func (s *MockKafkaICProxy) Stop() {}
Matteo Scandoloed128822020-02-10 15:52:35 -080067
68func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
69 waitForResponse bool, key string, kvArgs ...*kafka.KVArg) chan *kafka.RpcResponse {
70
71 args := make(map[int]interface{}, 4)
72 for k, v := range kvArgs {
73 args[k] = v
74 }
75
76 s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
77 Rpc: rpc,
78 ToTopic: toTopic,
79 ReplyToTopic: replyToTopic,
80 WaitForResponse: waitForResponse,
81 Key: key,
82 KvArgs: args,
83 }
84
85 chnl := make(chan *kafka.RpcResponse)
86
87 return chnl
88}
89
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080090func (s *MockKafkaICProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic, waitForResponse bool, key string, kvArgs ...*kafka.KVArg) (bool, *any.Any) {
91 s.InvokeRpcSpy.CallCount++
92
93 success := true
94
95 args := make(map[int]interface{}, 4)
96 for k, v := range kvArgs {
97 args[k] = v
98 }
99
100 s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
101 Rpc: rpc,
102 ToTopic: toTopic,
103 ReplyToTopic: replyToTopic,
104 WaitForResponse: waitForResponse,
105 Key: key,
106 KvArgs: args,
107 }
108
Matteo Scandolob45cf592020-01-21 16:10:56 -0800109 var response any.Any
110 if s.InvokeRpcSpy.Timeout {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800111
112 success = false
113
Matteo Scandolob45cf592020-01-21 16:10:56 -0800114 err := &ic.Error{Reason: "context deadline exceeded", Code: ic.ErrorCode_DEADLINE_EXCEEDED}
115 res, _ := ptypes.MarshalAny(err)
116 response = *res
117 } else {
118 res, _ := ptypes.MarshalAny(s.InvokeRpcSpy.Response)
119 response = *res
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800120 }
121
Matteo Scandolob45cf592020-01-21 16:10:56 -0800122 return success, &response
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800123}
Scott Bakere6685952020-06-23 04:05:39 +0000124func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(topic kafka.Topic, handler interface{}) error {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800125 return nil
126}
Scott Bakere6685952020-06-23 04:05:39 +0000127func (s *MockKafkaICProxy) SubscribeWithDefaultRequestHandler(topic kafka.Topic, initialOffset int64) error {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -0800128 return nil
129}
Scott Bakere6685952020-06-23 04:05:39 +0000130func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(topic kafka.Topic) error { return nil }
131func (s *MockKafkaICProxy) EnableLivenessChannel(enable bool) chan bool { return nil }
132func (s *MockKafkaICProxy) SendLiveness() error { return nil }