blob: bef4fb2edb3e5018c7013db7db1ba0c2d5d6844d [file] [log] [blame]
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package common
18
19import (
20 "context"
Girish Gowdra4c60c672021-07-26 13:30:57 -070021 "github.com/opencord/voltha-lib-go/v6/pkg/db"
22 "github.com/opencord/voltha-lib-go/v6/pkg/kafka"
23 mocks "github.com/opencord/voltha-lib-go/v6/pkg/mocks/kafka"
Girish Gowdra89c985b2020-10-14 15:02:09 -070024 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
25 "github.com/opencord/voltha-protos/v4/go/voltha"
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000026 "github.com/phayes/freeport"
27 "github.com/stretchr/testify/assert"
Neha Sharmadd9af392020-04-28 09:03:57 +000028 "strconv"
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000029 "testing"
30)
31
// Fixed inputs shared by every db.NewBackend call in this test file.
const (
	// embedEtcdServerHost is the host half of the KV store address.
	embedEtcdServerHost = "localhost"
	// defaultPathPrefix is handed to db.NewBackend as its path prefix.
	defaultPathPrefix = "Prefix"
	// defaultTimeout is handed to db.NewBackend as its timeout argument.
	// NOTE(review): the unit depends on db.NewBackend's signature — confirm.
	defaultTimeout = 1
)

// embedEtcdServerPort is the port half of the KV store address; init assigns
// it once before any test runs.
var embedEtcdServerPort int
39
40func init() {
41
Neha Sharma94f16a92020-06-26 04:17:55 +000042 ctx := context.Background()
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000043 var err error
44 embedEtcdServerPort, err = freeport.GetFreePort()
45 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +000046 logger.Fatal(ctx, "Cannot get freeport for KvClient")
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000047 }
48}
49
50func TestNewAdapterProxy(t *testing.T) {
51
52 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
53 InvokeRpcSpy: mocks.InvokeRpcSpy{
54 Calls: make(map[int]mocks.InvokeRpcArgs),
55 Response: &voltha.Device{Id: "testDeviceId"},
56 },
57 }
Neha Sharma94f16a92020-06-26 04:17:55 +000058 backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
Rohan Agrawal00e5efc2020-07-24 11:43:02 +000059 adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testCoreTopic", backend)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000060
61 assert.NotNil(t, adapter)
62}
63
64func TestSendInterAdapterMessage(t *testing.T) {
65
66 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
67 InvokeRpcSpy: mocks.InvokeRpcSpy{
68 Calls: make(map[int]mocks.InvokeRpcArgs),
69 Response: &voltha.Device{Id: "testDeviceId"},
70 },
71 }
Neha Sharma94f16a92020-06-26 04:17:55 +000072 backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000073
Rohan Agrawal00e5efc2020-07-24 11:43:02 +000074 adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testCoreTopic", backend)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000075
76 adapter.endpointMgr = mocks.NewEndpointManager()
77
Girish Gowdra248971a2021-06-01 15:14:15 -070078 delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpInstancePath: "tpPath", GemPortId: 2}
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000079
80 err := adapter.SendInterAdapterMessage(context.TODO(), delGemPortMsg, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "testMessage")
81
82 assert.Nil(t, err)
83
84 assert.Equal(t, mockKafkaIcProxy.InvokeRpcSpy.CallCount, 1)
85
86 call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
87
88 assert.Equal(t, call.Rpc, "process_inter_adapter_message")
89 assert.Equal(t, *call.ToTopic, kafka.Topic{Name: "Adapter2"})
90 assert.Equal(t, *call.ReplyToTopic, kafka.Topic{Name: "Adapter1"})
91 assert.Equal(t, call.WaitForResponse, true)
92 assert.Equal(t, call.Key, "testProxyDeviceId")
93
94 kvArgs := call.KvArgs[0].(*kafka.KVArg)
95
96 adapterMessage := kvArgs.Value.(*ic.InterAdapterMessage)
97
98 assert.Equal(t, adapterMessage.Header.Id, "testMessage")
99 assert.Equal(t, adapterMessage.Header.Type, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST)
100 assert.Equal(t, adapterMessage.Header.FromTopic, "Adapter1")
101 assert.Equal(t, adapterMessage.Header.ToTopic, "Adapter2")
102 assert.Equal(t, adapterMessage.Header.ToDeviceId, "testDeviceId")
103 assert.Equal(t, adapterMessage.Header.ProxyDeviceId, "testProxyDeviceId")
104
105 assert.Equal(t, kvArgs.Key, "msg")
106}
107
108func TestHeaderId(t *testing.T) {
109
110 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
111 InvokeRpcSpy: mocks.InvokeRpcSpy{
112 Calls: make(map[int]mocks.InvokeRpcArgs),
113 Response: &voltha.Device{Id: "testDeviceId"},
114 },
115 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000116 backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +0000117
Rohan Agrawal00e5efc2020-07-24 11:43:02 +0000118 adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testCoreTopic", backend)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +0000119
120 adapter.endpointMgr = mocks.NewEndpointManager()
121
Girish Gowdra248971a2021-06-01 15:14:15 -0700122 delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpInstancePath: "tpPath", GemPortId: 2}
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +0000123
124 err := adapter.SendInterAdapterMessage(context.TODO(), delGemPortMsg, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "")
125 call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
126
127 kvArgs := call.KvArgs[0].(*kafka.KVArg)
128
129 adapterMessage := kvArgs.Value.(*ic.InterAdapterMessage)
130
131 assert.Nil(t, err)
132 assert.Equal(t, mockKafkaIcProxy.InvokeRpcSpy.CallCount, 1)
133 assert.Len(t, adapterMessage.Header.Id, 36)
134}
135
136func TestInvalidProtoMessage(t *testing.T) {
137
138 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
139 InvokeRpcSpy: mocks.InvokeRpcSpy{
140 Calls: make(map[int]mocks.InvokeRpcArgs),
141 Response: &voltha.Device{Id: "testDeviceId"},
142 },
143 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000144 backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +0000145
Rohan Agrawal00e5efc2020-07-24 11:43:02 +0000146 adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testCoreTopic", backend)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +0000147
148 adapter.endpointMgr = mocks.NewEndpointManager()
149
150 err := adapter.SendInterAdapterMessage(context.TODO(), nil, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "testMessage")
151
152 assert.NotNil(t, err)
153}