blob: 3989142c5d00e8f3d42c5d501e9ad542de03bc4b [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 "crypto/rand"
npujar1d86a522019-11-14 17:11:16 +053021 "testing"
22 "time"
23
khenaidooab1f7bd2019-11-14 14:00:27 -050024 cm "github.com/opencord/voltha-go/rw_core/mocks"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080025 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
26 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
27 "github.com/opencord/voltha-lib-go/v3/pkg/log"
28 lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
29 of "github.com/opencord/voltha-protos/v3/go/openflow_13"
30 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidooab1f7bd2019-11-14 14:00:27 -050031 "github.com/stretchr/testify/assert"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/status"
khenaidooab1f7bd2019-11-14 14:00:27 -050034)
35
36const (
37 coreName = "rw_core"
38 adapterName = "adapter_mock"
npujar1d86a522019-11-14 17:11:16 +053039 coreInstanceID = "1000"
khenaidooab1f7bd2019-11-14 14:00:27 -050040)
41
42var (
npujar467fe752020-01-16 20:17:45 +053043 coreKafkaICProxy kafka.InterContainerProxy
44 adapterKafkaICProxy kafka.InterContainerProxy
khenaidooab1f7bd2019-11-14 14:00:27 -050045 kc kafka.Client
46 adapterReqHandler *com.RequestHandlerProxy
47 adapter *cm.Adapter
48)
49
50func init() {
npujar1d86a522019-11-14 17:11:16 +053051 if _, err := log.SetDefaultLogger(log.JSON, 0, log.Fields{"instanceId": coreInstanceID}); err != nil {
khenaidooab1f7bd2019-11-14 14:00:27 -050052 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
53 }
54 // Set the log level to Warning
55 log.SetAllLogLevel(2)
56
57 var err error
58
59 // Create the KV client
60 kc = lm.NewKafkaClient()
61
62 // Setup core inter-container proxy and core request handler
npujar467fe752020-01-16 20:17:45 +053063 coreKafkaICProxy = kafka.NewInterContainerProxy(
khenaidooab1f7bd2019-11-14 14:00:27 -050064 kafka.MsgClient(kc),
npujar467fe752020-01-16 20:17:45 +053065 kafka.DefaultTopic(&kafka.Topic{Name: coreName}))
npujar1d86a522019-11-14 17:11:16 +053066 if err = coreKafkaICProxy.Start(); err != nil {
khenaidooab1f7bd2019-11-14 14:00:27 -050067 log.Fatalw("Failure-starting-core-kafka-intercontainerProxy", log.Fields{"error": err})
68 }
npujar1d86a522019-11-14 17:11:16 +053069 if err = coreKafkaICProxy.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: coreName}, 0); err != nil {
khenaidooab1f7bd2019-11-14 14:00:27 -050070 log.Fatalw("Failure-subscribing-core-request-handler", log.Fields{"error": err})
71 }
72
73 // Setup adapter inter-container proxy and adapter request handler
74 adapterCoreProxy := com.NewCoreProxy(nil, adapterName, coreName)
75 adapter = cm.NewAdapter(adapterCoreProxy)
npujar1d86a522019-11-14 17:11:16 +053076 adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
npujar467fe752020-01-16 20:17:45 +053077 adapterKafkaICProxy = kafka.NewInterContainerProxy(
khenaidooab1f7bd2019-11-14 14:00:27 -050078 kafka.MsgClient(kc),
79 kafka.DefaultTopic(&kafka.Topic{Name: adapterName}),
npujar467fe752020-01-16 20:17:45 +053080 kafka.RequestHandlerInterface(adapterReqHandler))
khenaidooab1f7bd2019-11-14 14:00:27 -050081 if err = adapterKafkaICProxy.Start(); err != nil {
82 log.Fatalw("Failure-starting-adapter-kafka-intercontainerProxy", log.Fields{"error": err})
83 }
84 if err = adapterKafkaICProxy.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: adapterName}, 0); err != nil {
85 log.Fatalw("Failure-subscribing-adapter-request-handler", log.Fields{"error": err})
86 }
87}
88
89func getRandomBytes(size int) (bytes []byte, err error) {
90 bytes = make([]byte, size)
91 _, err = rand.Read(bytes)
92 return
93}
94
95func TestCreateAdapterProxy(t *testing.T) {
96 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
97 assert.NotNil(t, ap)
98}
99
100func testSimpleRequests(t *testing.T) {
101 type simpleRequest func(context.Context, *voltha.Device) error
102 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
103 simpleRequests := []simpleRequest{
104 ap.AdoptDevice,
105 ap.DisableDevice,
106 ap.RebootDevice,
107 ap.DeleteDevice,
108 ap.ReconcileDevice,
109 ap.ReEnableDevice,
110 }
111 for _, f := range simpleRequests {
112 //Success
113 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
114 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
115 err := f(ctx, d)
116 cancel()
117 assert.Nil(t, err)
118
119 // Failure - invalid adapter
120 expectedError := status.Error(codes.Canceled, "context deadline exceeded")
121 d = &voltha.Device{Id: "deviceId", Adapter: "adapter_mock_1"}
122 ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond)
123 err = f(ctx, d)
124 cancel()
125 assert.NotNil(t, err)
126 assert.Equal(t, expectedError.Error(), err.Error())
127
128 // Failure - short timeout
129 expectedError = status.Error(codes.Canceled, "context deadline exceeded")
130 d = &voltha.Device{Id: "deviceId", Adapter: adapterName}
131 ctx, cancel = context.WithTimeout(context.Background(), 100*time.Nanosecond)
132 err = f(ctx, d)
133 cancel()
134 assert.NotNil(t, err)
135 assert.Equal(t, expectedError.Error(), err.Error())
136 }
137}
138
139func testGetSwitchCapabilityFromAdapter(t *testing.T) {
140 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
141 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
142 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
143 switchCap, err := ap.GetOfpDeviceInfo(ctx, d)
144 cancel()
145 assert.Nil(t, err)
146 assert.NotNil(t, switchCap)
147 expectedCap, _ := adapter.Get_ofp_device_info(d)
148 assert.Equal(t, switchCap.String(), expectedCap.String())
149}
150
151func testGetPortInfoFromAdapter(t *testing.T) {
152 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
153 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
154 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
155 portNo := uint32(1)
156 portInfo, err := ap.GetOfpPortInfo(ctx, d, portNo)
157 cancel()
158 assert.Nil(t, err)
159 assert.NotNil(t, portInfo)
160 expectedPortInfo, _ := adapter.Get_ofp_port_info(d, int64(portNo))
161 assert.Equal(t, portInfo.String(), expectedPortInfo.String())
162}
163
164func testPacketOut(t *testing.T) {
165 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
166 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
167 outPort := uint32(1)
168 packet, err := getRandomBytes(50)
169 assert.Nil(t, err)
npujar467fe752020-01-16 20:17:45 +0530170 err = ap.packetOut(context.Background(), adapterName, d.Id, outPort, &of.OfpPacketOut{Data: packet})
khenaidooab1f7bd2019-11-14 14:00:27 -0500171 assert.Nil(t, err)
172}
173
174func testFlowUpdates(t *testing.T) {
175 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
176 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
npujar467fe752020-01-16 20:17:45 +0530177 err := ap.UpdateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
khenaidooab1f7bd2019-11-14 14:00:27 -0500178 assert.Nil(t, err)
179 flowChanges := &voltha.FlowChanges{ToAdd: &voltha.Flows{Items: nil}, ToRemove: &voltha.Flows{Items: nil}}
180 groupChanges := &voltha.FlowGroupChanges{ToAdd: &voltha.FlowGroups{Items: nil}, ToRemove: &voltha.FlowGroups{Items: nil}, ToUpdate: &voltha.FlowGroups{Items: nil}}
npujar467fe752020-01-16 20:17:45 +0530181 err = ap.UpdateFlowsIncremental(context.Background(), d, flowChanges, groupChanges, &voltha.FlowMetadata{})
khenaidooab1f7bd2019-11-14 14:00:27 -0500182 assert.Nil(t, err)
183}
184
185func testPmUpdates(t *testing.T) {
186 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
187 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
188 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
189 err := ap.UpdatePmConfigs(ctx, d, &voltha.PmConfigs{})
190 cancel()
191 assert.Nil(t, err)
192}
193
194func TestSuite(t *testing.T) {
195 //1. Test the simple requests first
196 testSimpleRequests(t)
197
198 //2. Test get switch capability
199 testGetSwitchCapabilityFromAdapter(t)
200
201 //3. Test get port info
202 testGetPortInfoFromAdapter(t)
203
204 //4. Test PacketOut
205 testPacketOut(t)
206
207 // 5. Test flow updates
208 testFlowUpdates(t)
209
210 //6. Pm configs
211 testPmUpdates(t)
212}