blob: 800f80bf700e48afecd9998aae281bf00a99222e [file] [log] [blame]
/*
 * Copyright 2020-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package common
18
19import (
20 "context"
21 "github.com/opencord/voltha-lib-go/v3/pkg/db"
22 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
23 mocks "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
24 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
25 "github.com/opencord/voltha-protos/v3/go/voltha"
26 "github.com/phayes/freeport"
27 "github.com/stretchr/testify/assert"
28 "testing"
29)
30
// Fixed arguments that every test in this file hands to db.NewBackend.
const (
	// embedEtcdServerHost is the host argument given to db.NewBackend.
	embedEtcdServerHost = "localhost"
	// defaultTimeout is the timeout argument given to db.NewBackend
	// (its unit is defined by db.NewBackend and is not visible here).
	defaultTimeout = 1
	// defaultPathPrefix is the path-prefix argument given to db.NewBackend.
	defaultPathPrefix = "Prefix"
)

// embedEtcdServerPort is the port argument given to db.NewBackend. It is
// filled in once by init with a port obtained from freeport.GetFreePort.
var embedEtcdServerPort int
38
39func init() {
40
41 var err error
42 embedEtcdServerPort, err = freeport.GetFreePort()
43 if err != nil {
44 logger.Fatal("Cannot get freeport for KvClient")
45 }
46}
47
48func TestNewAdapterProxy(t *testing.T) {
49
50 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
51 InvokeRpcSpy: mocks.InvokeRpcSpy{
52 Calls: make(map[int]mocks.InvokeRpcArgs),
53 Response: &voltha.Device{Id: "testDeviceId"},
54 },
55 }
56 backend := db.NewBackend("etcd", embedEtcdServerHost, embedEtcdServerPort, defaultTimeout, defaultPathPrefix)
57 adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
58
59 assert.NotNil(t, adapter)
60}
61
62func TestSendInterAdapterMessage(t *testing.T) {
63
64 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
65 InvokeRpcSpy: mocks.InvokeRpcSpy{
66 Calls: make(map[int]mocks.InvokeRpcArgs),
67 Response: &voltha.Device{Id: "testDeviceId"},
68 },
69 }
70
71 backend := db.NewBackend("etcd", embedEtcdServerHost, embedEtcdServerPort, defaultTimeout, defaultPathPrefix)
72
73 adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
74
75 adapter.endpointMgr = mocks.NewEndpointManager()
76
77 delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpPath: "tpPath", GemPortId: 2}
78
79 err := adapter.SendInterAdapterMessage(context.TODO(), delGemPortMsg, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "testMessage")
80
81 assert.Nil(t, err)
82
83 assert.Equal(t, mockKafkaIcProxy.InvokeRpcSpy.CallCount, 1)
84
85 call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
86
87 assert.Equal(t, call.Rpc, "process_inter_adapter_message")
88 assert.Equal(t, *call.ToTopic, kafka.Topic{Name: "Adapter2"})
89 assert.Equal(t, *call.ReplyToTopic, kafka.Topic{Name: "Adapter1"})
90 assert.Equal(t, call.WaitForResponse, true)
91 assert.Equal(t, call.Key, "testProxyDeviceId")
92
93 kvArgs := call.KvArgs[0].(*kafka.KVArg)
94
95 adapterMessage := kvArgs.Value.(*ic.InterAdapterMessage)
96
97 assert.Equal(t, adapterMessage.Header.Id, "testMessage")
98 assert.Equal(t, adapterMessage.Header.Type, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST)
99 assert.Equal(t, adapterMessage.Header.FromTopic, "Adapter1")
100 assert.Equal(t, adapterMessage.Header.ToTopic, "Adapter2")
101 assert.Equal(t, adapterMessage.Header.ToDeviceId, "testDeviceId")
102 assert.Equal(t, adapterMessage.Header.ProxyDeviceId, "testProxyDeviceId")
103
104 assert.Equal(t, kvArgs.Key, "msg")
105}
106
107func TestHeaderId(t *testing.T) {
108
109 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
110 InvokeRpcSpy: mocks.InvokeRpcSpy{
111 Calls: make(map[int]mocks.InvokeRpcArgs),
112 Response: &voltha.Device{Id: "testDeviceId"},
113 },
114 }
115
116 backend := db.NewBackend("etcd", embedEtcdServerHost, embedEtcdServerPort, defaultTimeout, defaultPathPrefix)
117
118 adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
119
120 adapter.endpointMgr = mocks.NewEndpointManager()
121
122 delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpPath: "tpPath", GemPortId: 2}
123
124 err := adapter.SendInterAdapterMessage(context.TODO(), delGemPortMsg, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "")
125 call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
126
127 kvArgs := call.KvArgs[0].(*kafka.KVArg)
128
129 adapterMessage := kvArgs.Value.(*ic.InterAdapterMessage)
130
131 assert.Nil(t, err)
132 assert.Equal(t, mockKafkaIcProxy.InvokeRpcSpy.CallCount, 1)
133 assert.Len(t, adapterMessage.Header.Id, 36)
134}
135
136func TestInvalidProtoMessage(t *testing.T) {
137
138 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
139 InvokeRpcSpy: mocks.InvokeRpcSpy{
140 Calls: make(map[int]mocks.InvokeRpcArgs),
141 Response: &voltha.Device{Id: "testDeviceId"},
142 },
143 }
144
145 backend := db.NewBackend("etcd", embedEtcdServerHost, embedEtcdServerPort, defaultTimeout, defaultPathPrefix)
146
147 adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
148
149 adapter.endpointMgr = mocks.NewEndpointManager()
150
151 err := adapter.SendInterAdapterMessage(context.TODO(), nil, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "testMessage")
152
153 assert.NotNil(t, err)
154}