/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package core
17
18import (
19 "context"
20 "crypto/rand"
21 cm "github.com/opencord/voltha-go/rw_core/mocks"
22 com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
23 "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
24 "github.com/opencord/voltha-lib-go/v2/pkg/log"
25 lm "github.com/opencord/voltha-lib-go/v2/pkg/mocks"
26 of "github.com/opencord/voltha-protos/v2/go/openflow_13"
27 "github.com/opencord/voltha-protos/v2/go/voltha"
28 "github.com/stretchr/testify/assert"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
31 "testing"
32 "time"
33)
34
// Identifiers shared by every test in this file.
const (
	// coreInstanceId is attached to the logger as the "instanceId" field and
	// handed to the adapter request handler proxy.
	coreInstanceId = "1000"

	// coreName is the topic name used by the core side of the
	// inter-container proxy.
	coreName = "rw_core"

	// adapterName is the topic name used by the mock adapter, and the value
	// placed in Device.Adapter for requests that are expected to succeed.
	adapterName = "adapter_mock"
)
40
41var (
42 coreKafkaICProxy *kafka.InterContainerProxy
43 adapterKafkaICProxy *kafka.InterContainerProxy
44 kc kafka.Client
45 adapterReqHandler *com.RequestHandlerProxy
46 adapter *cm.Adapter
47)
48
49func init() {
50 if _, err := log.SetDefaultLogger(log.JSON, 0, log.Fields{"instanceId": coreInstanceId}); err != nil {
51 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
52 }
53 // Set the log level to Warning
54 log.SetAllLogLevel(2)
55
56 var err error
57
58 // Create the KV client
59 kc = lm.NewKafkaClient()
60
61 // Setup core inter-container proxy and core request handler
62 if coreKafkaICProxy, err = kafka.NewInterContainerProxy(
63 kafka.MsgClient(kc),
64 kafka.DefaultTopic(&kafka.Topic{Name: coreName})); err != nil || coreKafkaICProxy == nil {
65 log.Fatalw("Failure-creating-core-intercontainerProxy", log.Fields{"error": err})
66
67 }
68 if err := coreKafkaICProxy.Start(); err != nil {
69 log.Fatalw("Failure-starting-core-kafka-intercontainerProxy", log.Fields{"error": err})
70 }
71 if err := coreKafkaICProxy.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: coreName}, 0); err != nil {
72 log.Fatalw("Failure-subscribing-core-request-handler", log.Fields{"error": err})
73 }
74
75 // Setup adapter inter-container proxy and adapter request handler
76 adapterCoreProxy := com.NewCoreProxy(nil, adapterName, coreName)
77 adapter = cm.NewAdapter(adapterCoreProxy)
78 adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceId, adapter, adapterCoreProxy)
79 if adapterKafkaICProxy, err = kafka.NewInterContainerProxy(
80 kafka.MsgClient(kc),
81 kafka.DefaultTopic(&kafka.Topic{Name: adapterName}),
82 kafka.RequestHandlerInterface(adapterReqHandler)); err != nil || adapterKafkaICProxy == nil {
83 log.Fatalw("Failure-creating-adapter-intercontainerProxy", log.Fields{"error": err})
84 }
85 if err = adapterKafkaICProxy.Start(); err != nil {
86 log.Fatalw("Failure-starting-adapter-kafka-intercontainerProxy", log.Fields{"error": err})
87 }
88 if err = adapterKafkaICProxy.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: adapterName}, 0); err != nil {
89 log.Fatalw("Failure-subscribing-adapter-request-handler", log.Fields{"error": err})
90 }
91}
92
// getRandomBytes allocates a slice of the requested size and fills it from
// crypto/rand. The slice is returned together with any error reported by
// rand.Read.
func getRandomBytes(size int) ([]byte, error) {
	buf := make([]byte, size)
	if _, err := rand.Read(buf); err != nil {
		return buf, err
	}
	return buf, nil
}
98
99func TestCreateAdapterProxy(t *testing.T) {
100 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
101 assert.NotNil(t, ap)
102}
103
104func testSimpleRequests(t *testing.T) {
105 type simpleRequest func(context.Context, *voltha.Device) error
106 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
107 simpleRequests := []simpleRequest{
108 ap.AdoptDevice,
109 ap.DisableDevice,
110 ap.RebootDevice,
111 ap.DeleteDevice,
112 ap.ReconcileDevice,
113 ap.ReEnableDevice,
114 }
115 for _, f := range simpleRequests {
116 //Success
117 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
118 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
119 err := f(ctx, d)
120 cancel()
121 assert.Nil(t, err)
122
123 // Failure - invalid adapter
124 expectedError := status.Error(codes.Canceled, "context deadline exceeded")
125 d = &voltha.Device{Id: "deviceId", Adapter: "adapter_mock_1"}
126 ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond)
127 err = f(ctx, d)
128 cancel()
129 assert.NotNil(t, err)
130 assert.Equal(t, expectedError.Error(), err.Error())
131
132 // Failure - short timeout
133 expectedError = status.Error(codes.Canceled, "context deadline exceeded")
134 d = &voltha.Device{Id: "deviceId", Adapter: adapterName}
135 ctx, cancel = context.WithTimeout(context.Background(), 100*time.Nanosecond)
136 err = f(ctx, d)
137 cancel()
138 assert.NotNil(t, err)
139 assert.Equal(t, expectedError.Error(), err.Error())
140 }
141}
142
143func testGetSwitchCapabilityFromAdapter(t *testing.T) {
144 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
145 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
146 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
147 switchCap, err := ap.GetOfpDeviceInfo(ctx, d)
148 cancel()
149 assert.Nil(t, err)
150 assert.NotNil(t, switchCap)
151 expectedCap, _ := adapter.Get_ofp_device_info(d)
152 assert.Equal(t, switchCap.String(), expectedCap.String())
153}
154
155func testGetPortInfoFromAdapter(t *testing.T) {
156 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
157 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
158 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
159 portNo := uint32(1)
160 portInfo, err := ap.GetOfpPortInfo(ctx, d, portNo)
161 cancel()
162 assert.Nil(t, err)
163 assert.NotNil(t, portInfo)
164 expectedPortInfo, _ := adapter.Get_ofp_port_info(d, int64(portNo))
165 assert.Equal(t, portInfo.String(), expectedPortInfo.String())
166}
167
168func testPacketOut(t *testing.T) {
169 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
170 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
171 outPort := uint32(1)
172 packet, err := getRandomBytes(50)
173 assert.Nil(t, err)
174 err = ap.packetOut(adapterName, d.Id, outPort, &of.OfpPacketOut{Data: packet})
175 assert.Nil(t, err)
176}
177
178func testFlowUpdates(t *testing.T) {
179 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
180 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
181 err := ap.UpdateFlowsBulk(d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
182 assert.Nil(t, err)
183 flowChanges := &voltha.FlowChanges{ToAdd: &voltha.Flows{Items: nil}, ToRemove: &voltha.Flows{Items: nil}}
184 groupChanges := &voltha.FlowGroupChanges{ToAdd: &voltha.FlowGroups{Items: nil}, ToRemove: &voltha.FlowGroups{Items: nil}, ToUpdate: &voltha.FlowGroups{Items: nil}}
185 err = ap.UpdateFlowsIncremental(d, flowChanges, groupChanges, &voltha.FlowMetadata{})
186 assert.Nil(t, err)
187}
188
189func testPmUpdates(t *testing.T) {
190 ap := NewAdapterProxy(coreKafkaICProxy, coreName)
191 d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
192 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
193 err := ap.UpdatePmConfigs(ctx, d, &voltha.PmConfigs{})
194 cancel()
195 assert.Nil(t, err)
196}
197
198func TestSuite(t *testing.T) {
199 //1. Test the simple requests first
200 testSimpleRequests(t)
201
202 //2. Test get switch capability
203 testGetSwitchCapabilityFromAdapter(t)
204
205 //3. Test get port info
206 testGetPortInfoFromAdapter(t)
207
208 //4. Test PacketOut
209 testPacketOut(t)
210
211 // 5. Test flow updates
212 testFlowUpdates(t)
213
214 //6. Pm configs
215 testPmUpdates(t)
216}