blob: 3af728a09d9cdbdded50822e47e92865634f5b56 [file] [log] [blame]
Matteo Scandolo2ba00d32020-01-16 17:33:03 -08001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package mocks
18
19import (
20 "context"
21 "github.com/golang/protobuf/ptypes"
22 "github.com/golang/protobuf/ptypes/any"
23 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
24 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
25 "github.com/opencord/voltha-protos/v3/go/voltha"
26)
27
28type InvokeRpcArgs struct {
29 Rpc string
30 ToTopic *kafka.Topic
31 ReplyToTopic *kafka.Topic
32 WaitForResponse bool
33 Key string
34 ParentDeviceId string
35 KvArgs map[int]interface{}
36}
37
// FailReason selects the failure mode simulated by MockKafkaICProxy.InvokeRPC.
// The zero value means that no failure is simulated.
type FailReason int

const (
	// Timeout makes InvokeRPC report failure with a "context deadline
	// exceeded" error payload.
	Timeout FailReason = iota + 1
	// UnmarshalError makes InvokeRPC succeed but return a payload of a
	// different message type than the default response.
	UnmarshalError
)

// String returns the name of the failure reason.
//
// The constants start at 1, so the name is selected with a switch instead of
// indexing a zero-based array: the previous array lookup returned
// "UnmarshalError" for Timeout and panicked for UnmarshalError. Values that do
// not match a declared constant (including the zero value) yield "Unknown".
func (r FailReason) String() string {
	switch r {
	case Timeout:
		return "Timeout"
	case UnmarshalError:
		return "UnmarshalError"
	default:
		return "Unknown"
	}
}
48
49type InvokeRpcSpy struct {
50 CallCount int
51 Calls map[int]InvokeRpcArgs
52 Fail FailReason // timeout, error
53}
54
55type MockKafkaICProxy struct {
56 InvokeRpcSpy InvokeRpcSpy
57}
58
59func (s *MockKafkaICProxy) Start() error { return nil }
60func (s *MockKafkaICProxy) DeleteTopic(topic kafka.Topic) error { return nil }
61func (s *MockKafkaICProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
62 return nil
63}
64func (s *MockKafkaICProxy) Stop() {}
65func (s *MockKafkaICProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic, waitForResponse bool, key string, kvArgs ...*kafka.KVArg) (bool, *any.Any) {
66 s.InvokeRpcSpy.CallCount++
67
68 success := true
69
70 args := make(map[int]interface{}, 4)
71 for k, v := range kvArgs {
72 args[k] = v
73 }
74
75 s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
76 Rpc: rpc,
77 ToTopic: toTopic,
78 ReplyToTopic: replyToTopic,
79 WaitForResponse: waitForResponse,
80 Key: key,
81 KvArgs: args,
82 }
83
84 device := &voltha.Device{
85 Id: "testDevice",
86 }
87 response, _ := ptypes.MarshalAny(device)
88
89 if s.InvokeRpcSpy.Fail == Timeout {
90
91 success = false
92
93 // TODO once InvokeRPC is fixed to return an error code, add it here
94 err := &ic.Error{Reason: "context deadline exceeded"}
95 response, _ = ptypes.MarshalAny(err)
96 } else if s.InvokeRpcSpy.Fail == UnmarshalError {
97 res := &voltha.LogicalDevice{
98 Id: "testLogicalDevice",
99 }
100 response, _ = ptypes.MarshalAny(res)
101 }
102
103 return success, response
104}
105func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(topic kafka.Topic, handler interface{}) error {
106 return nil
107}
108func (s *MockKafkaICProxy) SubscribeWithDefaultRequestHandler(topic kafka.Topic, initialOffset int64) error {
109 return nil
110}
111func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(topic kafka.Topic) error { return nil }