blob: ebda7de693318b2bc4e372bec398a70fe079ebd3 [file] [log] [blame]
khenaidoo0458db62019-06-20 08:50:36 -04001// +build integration
2
3/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package core
19
20import (
21 "context"
22 "fmt"
23 "github.com/google/uuid"
24 "github.com/opencord/voltha-go/common/log"
25 fu "github.com/opencord/voltha-go/rw_core/utils"
26 tu "github.com/opencord/voltha-go/tests/utils"
27 "github.com/opencord/voltha-protos/go/common"
28 ofp "github.com/opencord/voltha-protos/go/openflow_13"
29 "github.com/opencord/voltha-protos/go/voltha"
30 "github.com/stretchr/testify/assert"
31 "google.golang.org/grpc/metadata"
32 "math"
33 "os"
34 "testing"
35 "time"
36)
37
38var stub voltha.VolthaServiceClient
39var volthaSerialNumberKey string
40
/*
This local "integration" test uses one RW-Core, one simulated_olt adapter and one simulated_onu adapter to test flows
(add/delete) in a development environment. It uses docker-compose to set up the local environment. However, it can
easily be extended to run in a k8s environment.

The compose files used are located under $GOPATH/src/github.com/opencord/voltha-go/compose. If GOPATH is not set,
you can specify the location of the compose files by setting the COMPOSE_PATH environment variable.

To run this test: DOCKER_HOST_IP=<local IP> go test -v -tags integration

NOTE: Since this is an integration test that involves several containers and features (device creation, device
activation, validation of parent and discovered devices, validation of the logical device, as well as add/delete
flows), a failure can occur anywhere, not just when testing flows.
*/
56
57var allDevices map[string]*voltha.Device
58var allLogicalDevices map[string]*voltha.LogicalDevice
59
60var composePath string
61
// GRPC_PORT is the port used when connecting to the Voltha Core over gRPC.
const GRPC_PORT = 50057

// NUM_OLTS is the number of OLT devices the test expects.
const NUM_OLTS = 1

// NUM_ONUS_PER_OLT is the number of ONUs expected under each OLT. This should
// coincide with the number of onus per olt in adapters-simulated.yml file.
const NUM_ONUS_PER_OLT = 4
67
68func setup() {
69 var err error
70
71 if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
72 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
73 }
74 log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
75 log.SetAllLogLevel(log.ErrorLevel)
76
77 volthaSerialNumberKey = "voltha_serial_number"
78 allDevices = make(map[string]*voltha.Device)
79 allLogicalDevices = make(map[string]*voltha.LogicalDevice)
80
81 grpcHostIP := os.Getenv("DOCKER_HOST_IP")
82 goPath := os.Getenv("GOPATH")
83 if goPath != "" {
84 composePath = fmt.Sprintf("%s/src/github.com/opencord/voltha-go/compose", goPath)
85 } else {
86 composePath = os.Getenv("COMPOSE_PATH")
87 }
88
89 fmt.Println("Using compose path:", composePath)
90
91 //Start the simulated environment
92 if err = tu.StartSimulatedEnv(composePath); err != nil {
93 fmt.Println("Failure starting simulated environment:", err)
94 os.Exit(10)
95 }
96
97 stub, err = tu.SetupGrpcConnectionToCore(grpcHostIP, GRPC_PORT)
98 if err != nil {
99 fmt.Println("Failure connecting to Voltha Core:", err)
100 os.Exit(11)
101 }
102
103 // Wait for the simulated devices to be registered in the Voltha Core
104 adapters := []string{"simulated_olt", "simulated_onu"}
105 if _, err = tu.WaitForAdapterRegistration(stub, adapters, 20); err != nil {
106 fmt.Println("Failure retrieving adapters:", err)
107 os.Exit(12)
108 }
109}
110
111func shutdown() {
112 err := tu.StopSimulatedEnv(composePath)
113 if err != nil {
114 fmt.Println("Failure stop simulated environment:", err)
115 }
116}
117
118func refreshLocalDeviceCache(stub voltha.VolthaServiceClient) error {
119 retrievedDevices, err := tu.ListDevices(stub)
120 if err != nil {
121 return err
122 }
123 for _, d := range retrievedDevices.Items {
124 allDevices[d.Id] = d
125 }
126
127 retrievedLogicalDevices, err := tu.ListLogicalDevices(stub)
128 if err != nil {
129 return err
130 }
131
132 for _, ld := range retrievedLogicalDevices.Items {
133 allLogicalDevices[ld.Id] = ld
134 }
135 return nil
136}
137
138func makeSimpleFlowMod(fa *fu.FlowArgs) *ofp.OfpFlowMod {
139 matchFields := make([]*ofp.OfpOxmField, 0)
140 for _, val := range fa.MatchFields {
141 matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
142 }
143 return fu.MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV)
144}
145
146func addEAPOLFlow(stub voltha.VolthaServiceClient, ld *voltha.LogicalDevice, port *voltha.LogicalPort, ch chan interface{}) {
147 var fa *fu.FlowArgs
148 fa = &fu.FlowArgs{
149 KV: fu.OfpFlowModArgs{"priority": 2000},
150 MatchFields: []*ofp.OfpOxmOfbField{
151 fu.InPort(port.OfpPort.PortNo),
152 fu.EthType(0x888e),
153 },
154 Actions: []*ofp.OfpAction{
155 fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
156 },
157 }
158 matchFields := make([]*ofp.OfpOxmField, 0)
159 for _, val := range fa.MatchFields {
160 matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
161 }
162 f := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: ld.Id}
163
164 ui := uuid.New()
165 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
166 if response, err := stub.UpdateLogicalDeviceFlowTable(ctx, &f); err != nil {
167 ch <- err
168 } else {
169 ch <- response
170 }
171}
172
173func getNumUniPort(ld *voltha.LogicalDevice, lPortNos ...uint32) int {
174 num := 0
175 if len(lPortNos) > 0 {
176 for _, pNo := range lPortNos {
177 for _, lPort := range ld.Ports {
178 if !lPort.RootPort && lPort.OfpPort.PortNo == pNo {
179 num += 1
180 }
181 }
182 }
183 } else {
184 for _, port := range ld.Ports {
185 if !port.RootPort {
186 num += 1
187 }
188 }
189 }
190 return num
191}
192
193func filterOutPort(lPort *voltha.LogicalPort, lPortNos ...uint32) bool {
194 if len(lPortNos) == 0 {
195 return false
196 }
197 for _, pNo := range lPortNos {
198 if lPort.OfpPort.PortNo == pNo {
199 return false
200 }
201 }
202 return true
203}
204
205func verifyEAPOLFlows(t *testing.T, ld *voltha.LogicalDevice, lPortNos ...uint32) {
206 // First get the flows from the logical device
207 lFlows := ld.Flows
208 assert.Equal(t, getNumUniPort(ld, lPortNos...), len(lFlows.Items))
209
210 onuDeviceId := ""
211
212 // Verify that the flows in the logical device is what was pushed
213 for _, lPort := range ld.Ports {
214 if lPort.RootPort {
215 continue
216 }
217 if filterOutPort(lPort, lPortNos...) {
218 continue
219 }
220 onuDeviceId = lPort.DeviceId
221 var fa *fu.FlowArgs
222 fa = &fu.FlowArgs{
223 KV: fu.OfpFlowModArgs{"priority": 2000},
224 MatchFields: []*ofp.OfpOxmOfbField{
225 fu.InPort(lPort.OfpPort.PortNo),
226 fu.EthType(0x888e),
227 },
228 Actions: []*ofp.OfpAction{
229 fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
230 },
231 }
232 expectedLdFlow := fu.MkFlowStat(fa)
233 assert.Equal(t, true, tu.IsFlowPresent(expectedLdFlow, lFlows.Items))
234 }
235
236 // Verify the OLT flows
237 retrievedOltFlows := allDevices[ld.RootDeviceId].Flows.Items
238 assert.Equal(t, NUM_OLTS*getNumUniPort(ld, lPortNos...)*2, len(retrievedOltFlows))
239 for _, lPort := range ld.Ports {
240 if lPort.RootPort {
241 continue
242 }
243 if filterOutPort(lPort, lPortNos...) {
244 continue
245 }
246
247 fa := &fu.FlowArgs{
248 KV: fu.OfpFlowModArgs{"priority": 2000},
249 MatchFields: []*ofp.OfpOxmOfbField{
250 fu.InPort(1),
251 fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | lPort.OfpPort.PortNo),
252 fu.TunnelId(uint64(lPort.OfpPort.PortNo)),
253 fu.EthType(0x888e),
254 },
255 Actions: []*ofp.OfpAction{
256 fu.PushVlan(0x8100),
257 fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
258 fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
259 },
260 }
261 expectedOltFlow := fu.MkFlowStat(fa)
262 assert.Equal(t, true, tu.IsFlowPresent(expectedOltFlow, retrievedOltFlows))
263
264 fa = &fu.FlowArgs{
265 KV: fu.OfpFlowModArgs{"priority": 2000},
266 MatchFields: []*ofp.OfpOxmOfbField{
267 fu.InPort(2),
268 fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
269 fu.VlanPcp(0),
270 fu.Metadata_ofp(uint64(lPort.OfpPort.PortNo)),
271 fu.TunnelId(uint64(lPort.OfpPort.PortNo)),
272 },
273 Actions: []*ofp.OfpAction{
274 fu.PopVlan(),
275 fu.Output(1),
276 },
277 }
278 expectedOltFlow = fu.MkFlowStat(fa)
279 assert.Equal(t, true, tu.IsFlowPresent(expectedOltFlow, retrievedOltFlows))
280 }
281 // Verify the ONU flows
282 retrievedOnuFlows := allDevices[onuDeviceId].Flows.Items
283 assert.Equal(t, 0, len(retrievedOnuFlows))
284}
285
286func verifyNOFlows(t *testing.T, ld *voltha.LogicalDevice, lPortNos ...uint32) {
287 if len(lPortNos) == 0 {
288 assert.Equal(t, 0, len(ld.Flows.Items))
289 for _, d := range allDevices {
290 if d.ParentId == ld.Id {
291 assert.Equal(t, 0, len(d.Flows.Items))
292 }
293 }
294 return
295 }
296 for _, p := range lPortNos {
297 // Check absence of flows in logical device for that port
298 for _, f := range ld.Flows.Items {
299 assert.NotEqual(t, p, fu.GetInPort(f))
300 }
301 // Check absence of flows in the parent device for that port
302 for _, d := range allDevices {
303 if d.ParentId == ld.Id {
304 for _, f := range d.Flows.Items {
305 assert.NotEqual(t, p, fu.GetTunnelId(f))
306 }
307 }
308 }
309 // TODO: check flows in child device. Not required for the use cases being tested
310 }
311
312}
313
314func installEapolFlows(stub voltha.VolthaServiceClient, lDevice *voltha.LogicalDevice, lPortNos ...uint32) error {
315 requestNum := 0
316 combineCh := make(chan interface{})
317 if len(lPortNos) > 0 {
318 fmt.Println("Installing EAPOL flows on ports:", lPortNos)
319 for _, p := range lPortNos {
320 for _, lport := range lDevice.Ports {
321 if !lport.RootPort && lport.OfpPort.PortNo == p {
322 go addEAPOLFlow(stub, lDevice, lport, combineCh)
323 requestNum += 1
324 }
325 }
326 }
327 } else {
328 fmt.Println("Installing EAPOL flows on logical device ", lDevice.Id)
329 for _, lport := range lDevice.Ports {
330 if !lport.RootPort {
331 go addEAPOLFlow(stub, lDevice, lport, combineCh)
332 requestNum += 1
333 }
334 }
335
336 }
337 receivedResponse := 0
338 var err error
339 for {
340 select {
341 case res, ok := <-combineCh:
342 receivedResponse += 1
343 if !ok {
344 } else if er, ok := res.(error); ok {
345 err = er
346 }
347 }
348 if receivedResponse == requestNum {
349 break
350 }
351 }
352 return err
353}
354
355func deleteAllFlows(stub voltha.VolthaServiceClient, lDevice *voltha.LogicalDevice) error {
356 fmt.Println("Deleting all flows for logical device:", lDevice.Id)
357 ui := uuid.New()
358 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
359 ch := make(chan interface{})
360 defer close(ch)
361 fa := &fu.FlowArgs{
362 KV: fu.OfpFlowModArgs{"table_id": uint64(ofp.OfpTable_OFPTT_ALL),
363 "cookie_mask": 0,
364 "out_port": uint64(ofp.OfpPortNo_OFPP_ANY),
365 "out_group": uint64(ofp.OfpGroup_OFPG_ANY),
366 },
367 }
368 cmd := ofp.OfpFlowModCommand_OFPFC_DELETE
369 fa.Command = &cmd
370 flowMod := fu.MkSimpleFlowMod(fu.ToOfpOxmField(fa.MatchFields), fa.Actions, fa.Command, fa.KV)
371 f := ofp.FlowTableUpdate{FlowMod: flowMod, Id: lDevice.Id}
372 _, err := stub.UpdateLogicalDeviceFlowTable(ctx, &f)
373 return err
374}
375
376func deleteEapolFlow(stub voltha.VolthaServiceClient, lDevice *voltha.LogicalDevice, lPortNo uint32) error {
377 fmt.Println("Deleting flows from port ", lPortNo, " of logical device ", lDevice.Id)
378 ui := uuid.New()
379 var fa *fu.FlowArgs
380 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
381 fa = &fu.FlowArgs{
382 KV: fu.OfpFlowModArgs{"priority": 2000},
383 MatchFields: []*ofp.OfpOxmOfbField{
384 fu.InPort(lPortNo),
385 fu.EthType(0x888e),
386 },
387 Actions: []*ofp.OfpAction{
388 fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
389 },
390 }
391 matchFields := make([]*ofp.OfpOxmField, 0)
392 for _, val := range fa.MatchFields {
393 matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
394 }
395 cmd := ofp.OfpFlowModCommand_OFPFC_DELETE
396 fa.Command = &cmd
397 f := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: lDevice.Id}
398 _, err := stub.UpdateLogicalDeviceFlowTable(ctx, &f)
399 return err
400}
401
402func runInstallEapolFlows(t *testing.T, stub voltha.VolthaServiceClient, lPortNos ...uint32) {
403 err := refreshLocalDeviceCache(stub)
404 assert.Nil(t, err)
405
406 for _, ld := range allLogicalDevices {
407 err = installEapolFlows(stub, ld, lPortNos...)
408 assert.Nil(t, err)
409 }
410
411 err = refreshLocalDeviceCache(stub)
412 assert.Nil(t, err)
413
414 for _, ld := range allLogicalDevices {
415 verifyEAPOLFlows(t, ld, lPortNos...)
416 }
417}
418
419func runDeleteAllFlows(t *testing.T, stub voltha.VolthaServiceClient) {
420 fmt.Println("Removing ALL flows ...")
421 err := refreshLocalDeviceCache(stub)
422 assert.Nil(t, err)
423
424 for _, ld := range allLogicalDevices {
425 err = deleteAllFlows(stub, ld)
426 assert.Nil(t, err)
427 }
428
429 err = refreshLocalDeviceCache(stub)
430 assert.Nil(t, err)
431
432 for _, ld := range allLogicalDevices {
433 verifyNOFlows(t, ld)
434 }
435}
436
437func runDeleteEapolFlows(t *testing.T, stub voltha.VolthaServiceClient, ld *voltha.LogicalDevice, lPortNos ...uint32) {
438 err := refreshLocalDeviceCache(stub)
439 assert.Nil(t, err)
440
441 if len(lPortNos) == 0 {
442 err = deleteAllFlows(stub, ld)
443 assert.Nil(t, err)
444 } else {
445 for _, lPortNo := range lPortNos {
446 err = deleteEapolFlow(stub, ld, lPortNo)
447 assert.Nil(t, err)
448 }
449 }
450
451 err = refreshLocalDeviceCache(stub)
452 assert.Nil(t, err)
453
454 for _, lde := range allLogicalDevices {
455 if lde.Id == ld.Id {
456 verifyNOFlows(t, lde, lPortNos...)
457 break
458 }
459 }
460}
461
462func createAndEnableDevices(t *testing.T) {
463 err := tu.SetAllLogLevel(stub, voltha.Logging{Level: common.LogLevel_WARNING})
464 assert.Nil(t, err)
465
466 err = tu.SetLogLevel(stub, voltha.Logging{Level: common.LogLevel_DEBUG, PackageName: "github.com/opencord/voltha-go/rw_core/core"})
467 assert.Nil(t, err)
468
469 startTime := time.Now()
470
471 //Pre-provision the parent device
472 oltDevice, err := tu.PreProvisionDevice(stub)
473 assert.Nil(t, err)
474
475 fmt.Println("Creation of ", NUM_OLTS, " OLT devices took:", time.Since(startTime))
476
477 startTime = time.Now()
478
479 //Enable all parent device - this will enable the child devices as well as validate the child devices
480 err = tu.EnableDevice(stub, oltDevice, NUM_ONUS_PER_OLT)
481 assert.Nil(t, err)
482
483 fmt.Println("Enabling of OLT device took:", time.Since(startTime))
484
485 // Wait until the core and adapters sync up after an enabled
486 time.Sleep(time.Duration(math.Max(10, float64(NUM_OLTS*NUM_ONUS_PER_OLT)/2)) * time.Second)
487
488 err = tu.VerifyDevices(stub, NUM_ONUS_PER_OLT)
489 assert.Nil(t, err)
490
491 lds, err := tu.VerifyLogicalDevices(stub, oltDevice, NUM_ONUS_PER_OLT)
492 assert.Nil(t, err)
493 assert.Equal(t, 1, len(lds.Items))
494}
495
496func TestFlowManagement(t *testing.T) {
497 //1. Test creation and activation of the devices. This will validate the devices as well as the logical device created/
498 createAndEnableDevices(t)
499
500 //2. Test installation of EAPOL flows
501 runInstallEapolFlows(t, stub)
502
503 //3. Test deletion of all EAPOL flows
504 runDeleteAllFlows(t, stub)
505
506 //4. Test installation of EAPOL flows on specific ports
507 runInstallEapolFlows(t, stub, 101, 102)
508
509 lds, err := tu.ListLogicalDevices(stub)
510 assert.Nil(t, err)
511
512 //5. Test deletion of EAPOL on a specific port for a given logical device
513 runDeleteEapolFlows(t, stub, lds.Items[0], 101)
514}
515
516func TestMain(m *testing.M) {
517 setup()
518 code := m.Run()
519 shutdown()
520 os.Exit(code)
521}