blob: 3ba82906864414844cdc88d4b8e8ef5523b68ce1 [file] [log] [blame]
/*
 * Copyright 2020-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package common
18
19import (
20 "context"
21 "github.com/opencord/voltha-lib-go/v3/pkg/db"
22 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
23 mocks "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
24 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
25 "github.com/opencord/voltha-protos/v3/go/voltha"
26 "github.com/phayes/freeport"
27 "github.com/stretchr/testify/assert"
Neha Sharmadd9af392020-04-28 09:03:57 +000028 "strconv"
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000029 "testing"
30)
31
// Fixed arguments the tests in this file pass to db.NewBackend.
const (
	// embedEtcdServerHost is the host half of the "host:port" address string
	// the tests build for the backend.
	embedEtcdServerHost = "localhost"
	// defaultTimeout is handed to db.NewBackend as its timeout argument.
	// NOTE(review): the unit is defined by db.NewBackend — confirm there.
	defaultTimeout = 1
	// defaultPathPrefix is handed to db.NewBackend as its path-prefix argument.
	defaultPathPrefix = "Prefix"
)

// embedEtcdServerPort is the port half of the backend address. It is filled
// in by init with the value returned from freeport.GetFreePort.
var embedEtcdServerPort int
39
40func init() {
41
42 var err error
43 embedEtcdServerPort, err = freeport.GetFreePort()
44 if err != nil {
Scott Bakere6685952020-06-23 04:05:39 +000045 logger.Fatal("Cannot get freeport for KvClient")
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000046 }
47}
48
49func TestNewAdapterProxy(t *testing.T) {
50
51 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
52 InvokeRpcSpy: mocks.InvokeRpcSpy{
53 Calls: make(map[int]mocks.InvokeRpcArgs),
54 Response: &voltha.Device{Id: "testDeviceId"},
55 },
56 }
Scott Bakere6685952020-06-23 04:05:39 +000057 backend := db.NewBackend("etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
58 adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000059
60 assert.NotNil(t, adapter)
61}
62
63func TestSendInterAdapterMessage(t *testing.T) {
64
65 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
66 InvokeRpcSpy: mocks.InvokeRpcSpy{
67 Calls: make(map[int]mocks.InvokeRpcArgs),
68 Response: &voltha.Device{Id: "testDeviceId"},
69 },
70 }
Scott Bakere6685952020-06-23 04:05:39 +000071 backend := db.NewBackend("etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000072
Scott Bakere6685952020-06-23 04:05:39 +000073 adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +000074
75 adapter.endpointMgr = mocks.NewEndpointManager()
76
77 delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpPath: "tpPath", GemPortId: 2}
78
79 err := adapter.SendInterAdapterMessage(context.TODO(), delGemPortMsg, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "testMessage")
80
81 assert.Nil(t, err)
82
83 assert.Equal(t, mockKafkaIcProxy.InvokeRpcSpy.CallCount, 1)
84
85 call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
86
87 assert.Equal(t, call.Rpc, "process_inter_adapter_message")
88 assert.Equal(t, *call.ToTopic, kafka.Topic{Name: "Adapter2"})
89 assert.Equal(t, *call.ReplyToTopic, kafka.Topic{Name: "Adapter1"})
90 assert.Equal(t, call.WaitForResponse, true)
91 assert.Equal(t, call.Key, "testProxyDeviceId")
92
93 kvArgs := call.KvArgs[0].(*kafka.KVArg)
94
95 adapterMessage := kvArgs.Value.(*ic.InterAdapterMessage)
96
97 assert.Equal(t, adapterMessage.Header.Id, "testMessage")
98 assert.Equal(t, adapterMessage.Header.Type, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST)
99 assert.Equal(t, adapterMessage.Header.FromTopic, "Adapter1")
100 assert.Equal(t, adapterMessage.Header.ToTopic, "Adapter2")
101 assert.Equal(t, adapterMessage.Header.ToDeviceId, "testDeviceId")
102 assert.Equal(t, adapterMessage.Header.ProxyDeviceId, "testProxyDeviceId")
103
104 assert.Equal(t, kvArgs.Key, "msg")
105}
106
107func TestHeaderId(t *testing.T) {
108
109 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
110 InvokeRpcSpy: mocks.InvokeRpcSpy{
111 Calls: make(map[int]mocks.InvokeRpcArgs),
112 Response: &voltha.Device{Id: "testDeviceId"},
113 },
114 }
Scott Bakere6685952020-06-23 04:05:39 +0000115 backend := db.NewBackend("etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +0000116
Scott Bakere6685952020-06-23 04:05:39 +0000117 adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +0000118
119 adapter.endpointMgr = mocks.NewEndpointManager()
120
121 delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpPath: "tpPath", GemPortId: 2}
122
123 err := adapter.SendInterAdapterMessage(context.TODO(), delGemPortMsg, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "")
124 call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
125
126 kvArgs := call.KvArgs[0].(*kafka.KVArg)
127
128 adapterMessage := kvArgs.Value.(*ic.InterAdapterMessage)
129
130 assert.Nil(t, err)
131 assert.Equal(t, mockKafkaIcProxy.InvokeRpcSpy.CallCount, 1)
132 assert.Len(t, adapterMessage.Header.Id, 36)
133}
134
135func TestInvalidProtoMessage(t *testing.T) {
136
137 var mockKafkaIcProxy = &mocks.MockKafkaICProxy{
138 InvokeRpcSpy: mocks.InvokeRpcSpy{
139 Calls: make(map[int]mocks.InvokeRpcArgs),
140 Response: &voltha.Device{Id: "testDeviceId"},
141 },
142 }
Scott Bakere6685952020-06-23 04:05:39 +0000143 backend := db.NewBackend("etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +0000144
Scott Bakere6685952020-06-23 04:05:39 +0000145 adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
Rohan Agrawal8d4d6c92020-04-23 12:59:35 +0000146
147 adapter.endpointMgr = mocks.NewEndpointManager()
148
149 err := adapter.SendInterAdapterMessage(context.TODO(), nil, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "testMessage")
150
151 assert.NotNil(t, err)
152}