blob: c461f7e2c115664dd548fcbee5fc7c5f86d2af85 [file] [log] [blame]
khenaidoo0458db62019-06-20 08:50:36 -04001// "build integration
2
3/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
npujar12732342019-11-14 17:28:40 +053018
khenaidoo0458db62019-06-20 08:50:36 -040019package core
20
21import (
22 "context"
23 "fmt"
24 "os/exec"
25 "strings"
26 "time"
27
28 "github.com/golang/protobuf/ptypes/empty"
29 "github.com/google/uuid"
Scott Baker807addd2019-10-24 15:16:21 -070030 com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
npujar12732342019-11-14 17:28:40 +053031 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Baker555307d2019-11-04 08:58:01 -080032 "github.com/opencord/voltha-protos/v2/go/common"
33 ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
34 "github.com/opencord/voltha-protos/v2/go/voltha"
npujar12732342019-11-14 17:28:40 +053035
khenaidoo0458db62019-06-20 08:50:36 -040036 "google.golang.org/grpc"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/metadata"
39 "google.golang.org/grpc/status"
40)
41
// VolthaSerialNumberKey is the gRPC metadata key under which the helpers in
// this file attach a freshly generated UUID to each outgoing request.
const VolthaSerialNumberKey = "voltha_serial_number"
46
// startKafka runs "docker-compose up -d" against
// docker-compose-zk-kafka-test.yml located under composePath.
// It returns the error reported by the docker-compose invocation, if any.
func startKafka(composePath string) error {
	fmt.Println("Starting Kafka and Etcd ...")
	composeFile := composePath + "/docker-compose-zk-kafka-test.yml"
	return exec.Command("docker-compose", "-f", composeFile, "up", "-d").Run()
}
57
// startEtcd runs "docker-compose up -d" against docker-compose-etcd.yml
// located under composePath.
// It returns the error reported by the docker-compose invocation, if any.
func startEtcd(composePath string) error {
	fmt.Println("Starting Etcd ...")
	composeFile := composePath + "/docker-compose-etcd.yml"
	return exec.Command("docker-compose", "-f", composeFile, "up", "-d").Run()
}
68
// stopKafka runs "docker-compose down" against
// docker-compose-zk-kafka-test.yml located under composePath.
// It returns the error reported by the docker-compose invocation, if any.
func stopKafka(composePath string) error {
	fmt.Println("Stopping Kafka and Etcd ...")
	composeFile := composePath + "/docker-compose-zk-kafka-test.yml"
	return exec.Command("docker-compose", "-f", composeFile, "down").Run()
}
79
// stopEtcd runs "docker-compose down" against docker-compose-etcd.yml
// located under composePath.
// It returns the error reported by the docker-compose invocation, if any.
func stopEtcd(composePath string) error {
	fmt.Println("Stopping Etcd ...")
	composeFile := composePath + "/docker-compose-etcd.yml"
	return exec.Command("docker-compose", "-f", composeFile, "down").Run()
}
90
// startCore runs "docker-compose up -d" against rw_core.yml located under
// composePath.
// It returns the error reported by the docker-compose invocation, if any.
func startCore(composePath string) error {
	fmt.Println("Starting voltha core ...")
	composeFile := composePath + "/rw_core.yml"
	return exec.Command("docker-compose", "-f", composeFile, "up", "-d").Run()
}
101
// stopCore runs "docker-compose down" against rw_core.yml located under
// composePath.
// It returns the error reported by the docker-compose invocation, if any.
func stopCore(composePath string) error {
	fmt.Println("Stopping voltha core ...")
	composeFile := composePath + "/rw_core.yml"
	return exec.Command("docker-compose", "-f", composeFile, "down").Run()
}
112
// startSimulatedOLTAndONUAdapters runs "docker-compose up -d" against
// adapters-simulated.yml located under composePath.
// It returns the error reported by the docker-compose invocation, if any.
func startSimulatedOLTAndONUAdapters(composePath string) error {
	fmt.Println("Starting simulated OLT and ONU adapters ...")
	composeFile := composePath + "/adapters-simulated.yml"
	return exec.Command("docker-compose", "-f", composeFile, "up", "-d").Run()
}
123
// stopSimulatedOLTAndONUAdapters runs "docker-compose down" against
// adapters-simulated.yml located under composePath.
// It returns the error reported by the docker-compose invocation, if any.
func stopSimulatedOLTAndONUAdapters(composePath string) error {
	fmt.Println("Stopping simulated OLT and ONU adapters ...")
	composeFile := composePath + "/adapters-simulated.yml"
	return exec.Command("docker-compose", "-f", composeFile, "down").Run()
}
134
npujar12732342019-11-14 17:28:40 +0530135// ListLogicalDevices lists all logical devices managed by the Voltha cluster
khenaidoo0458db62019-06-20 08:50:36 -0400136func ListLogicalDevices(stub voltha.VolthaServiceClient) (*voltha.LogicalDevices, error) {
137 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530138 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
139 return stub.ListLogicalDevices(ctx, &empty.Empty{})
khenaidoo0458db62019-06-20 08:50:36 -0400140}
141
142func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{}) {
143 if response, err := stub.CreateDevice(ctx, device); err != nil {
144 ch <- err
145 } else {
146 ch <- response
147 }
148}
149
150func sendListAdapters(ctx context.Context, stub voltha.VolthaServiceClient, ch chan interface{}) {
151 if response, err := stub.ListAdapters(ctx, &empty.Empty{}); err != nil {
152 ch <- err
153 } else {
154 ch <- response
155 }
156}
157
npujar12732342019-11-14 17:28:40 +0530158func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceID string, ch chan interface{}) {
159 if response, err := stub.EnableDevice(ctx, &common.ID{Id: deviceID}); err != nil {
khenaidoo0458db62019-06-20 08:50:36 -0400160 ch <- err
161 } else {
162 ch <- response
163 }
164}
165
166func getDevices(ctx context.Context, stub voltha.VolthaServiceClient) (*voltha.Devices, error) {
npujar12732342019-11-14 17:28:40 +0530167 return stub.ListDevices(ctx, &empty.Empty{})
khenaidoo0458db62019-06-20 08:50:36 -0400168}
169
170func getLogicalDevices(ctx context.Context, stub voltha.VolthaServiceClient) (*voltha.LogicalDevices, error) {
npujar12732342019-11-14 17:28:40 +0530171 return stub.ListLogicalDevices(ctx, &empty.Empty{})
khenaidoo0458db62019-06-20 08:50:36 -0400172}
173
npujar12732342019-11-14 17:28:40 +0530174// IsFlowPresent returns true if flow is present
khenaidoo0458db62019-06-20 08:50:36 -0400175func IsFlowPresent(lookingFor *voltha.OfpFlowStats, flows []*voltha.OfpFlowStats) bool {
176 for _, f := range flows {
177 if f.String() == lookingFor.String() {
178 return true
179 }
180 }
181 return false
182}
183
npujar12732342019-11-14 17:28:40 +0530184// ListDevices lists all physical devices controlled by the Voltha cluster
khenaidoo0458db62019-06-20 08:50:36 -0400185func ListDevices(stub voltha.VolthaServiceClient) (*voltha.Devices, error) {
186 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530187 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
188 return getDevices(ctx, stub)
khenaidoo0458db62019-06-20 08:50:36 -0400189}
190
191func sendFlow(ctx context.Context, stub voltha.VolthaServiceClient, flow *ofp.FlowTableUpdate, ch chan interface{}) {
192 if response, err := stub.UpdateLogicalDeviceFlowTable(ctx, flow); err != nil {
193 ch <- err
194 } else {
195 ch <- response
196 }
197}
198
npujar12732342019-11-14 17:28:40 +0530199// SetLogLevel sets log level to the given level
khenaidoo0458db62019-06-20 08:50:36 -0400200func SetLogLevel(stub voltha.VolthaServiceClient, l voltha.Logging) error {
201 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530202 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400203 _, err := stub.UpdateLogLevel(ctx, &l)
204 return err
205}
206
npujar12732342019-11-14 17:28:40 +0530207// SetAllLogLevel sets log level of all service to the given level
khenaidoo0458db62019-06-20 08:50:36 -0400208func SetAllLogLevel(stub voltha.VolthaServiceClient, l voltha.Logging) error {
209 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530210 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400211 _, err := stub.UpdateLogLevel(ctx, &l)
212 return err
213}
214
npujar12732342019-11-14 17:28:40 +0530215// SetupGrpcConnectionToCore sets up client connection to an RPC server.
khenaidoo0458db62019-06-20 08:50:36 -0400216func SetupGrpcConnectionToCore(grpcHostIP string, grpcPort int) (voltha.VolthaServiceClient, error) {
217 grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, grpcPort)
218 fmt.Println("Connecting to voltha using:", grpcHost)
219 conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
220 if err != nil {
221 return nil, err
222 }
223 return voltha.NewVolthaServiceClient(conn), nil
224}
225
npujar12732342019-11-14 17:28:40 +0530226// VerifyLogicalDevices verifies all logical devices managed by the Voltha cluster
khenaidoo0458db62019-06-20 08:50:36 -0400227func VerifyLogicalDevices(stub voltha.VolthaServiceClient, parentDevice *voltha.Device, numONUsPerOLT int) (*voltha.LogicalDevices, error) {
228 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530229 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400230 retrievedLogicalDevices, err := getLogicalDevices(ctx, stub)
231 if err != nil {
232 return nil, err
233 }
234 if len(retrievedLogicalDevices.Items) != 1 {
235 return nil, status.Errorf(codes.Internal, "Logical device number incorrect. Expected:{%d}, Created:{%d}", 1, len(retrievedLogicalDevices.Items))
236 }
237
238 // Verify that each device has two ports
239 for _, ld := range retrievedLogicalDevices.Items {
240 if ld.Id == "" ||
241 ld.DatapathId == uint64(0) ||
242 ld.Desc.HwDesc != "simulated_pon" ||
243 ld.Desc.SwDesc != "simulated_pon" ||
244 ld.RootDeviceId == "" ||
245 ld.Desc.SerialNum == "" ||
246 ld.SwitchFeatures.NBuffers != uint32(256) ||
247 ld.SwitchFeatures.NTables != uint32(2) ||
248 ld.SwitchFeatures.Capabilities != uint32(15) ||
249 len(ld.Ports) != 1+numONUsPerOLT ||
250 ld.RootDeviceId != parentDevice.Id {
251 return nil, status.Errorf(codes.Internal, "incorrect logical device status:{%v}", ld)
252 }
253 for _, p := range ld.Ports {
254 if p.DevicePortNo != p.OfpPort.PortNo ||
255 p.OfpPort.State != uint32(4) {
256 return nil, status.Errorf(codes.Internal, "incorrect logical ports status:{%v}", p)
257 }
258 if strings.HasPrefix(p.Id, "nni") {
259 if !p.RootPort || fmt.Sprintf("nni-%d", p.DevicePortNo) != p.Id {
260 return nil, status.Errorf(codes.Internal, "incorrect nni port status:{%v}", p)
261 }
262 } else {
263 if p.RootPort || fmt.Sprintf("uni-%d", p.DevicePortNo) != p.Id {
264 return nil, status.Errorf(codes.Internal, "incorrect uni port status:{%v}", p)
265 }
266 }
267 }
268 }
269 return retrievedLogicalDevices, nil
270}
271
npujar12732342019-11-14 17:28:40 +0530272// VerifyDevices verifies all physical devices controlled by the Voltha cluster
khenaidoo0458db62019-06-20 08:50:36 -0400273func VerifyDevices(stub voltha.VolthaServiceClient, numONUsPerOLT int) error {
274 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530275 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400276 retrievedDevices, err := getDevices(ctx, stub)
277 if err != nil {
278 return err
279 }
280 if len(retrievedDevices.Items) != 1+numONUsPerOLT {
281 return status.Errorf(codes.Internal, "Device number incorrect. Expected:{%d}, Created:{%d}", 1, len(retrievedDevices.Items))
282 }
283 // Verify that each device has two ports
284 for _, d := range retrievedDevices.Items {
285 if d.AdminState != voltha.AdminState_ENABLED ||
286 d.ConnectStatus != voltha.ConnectStatus_REACHABLE ||
287 d.OperStatus != voltha.OperStatus_ACTIVE ||
288 d.Type != d.Adapter ||
289 d.Id == "" ||
290 d.MacAddress == "" ||
291 d.SerialNumber == "" {
292 return status.Errorf(codes.Internal, "incorrect device state - %s", d.Id)
293 }
294
295 if d.Type == "simulated_olt" && (!d.Root || d.ProxyAddress != nil) {
296 return status.Errorf(codes.Internal, "invalid olt status:{%v}", d)
297 } else if d.Type == "simulated_onu" && (d.Root ||
298 d.Vlan == uint32(0) ||
299 d.ParentId == "" ||
300 d.ProxyAddress.DeviceId == "" ||
301 d.ProxyAddress.DeviceType != "simulated_olt") {
302 return status.Errorf(codes.Internal, "invalid onu status:{%s}", d.Id)
303 }
304
305 if len(d.Ports) != 2 {
306 return status.Errorf(codes.Internal, "invalid number of ports:{%s, %v}", d.Id, d.Ports)
307 }
308
309 for _, p := range d.Ports {
310 if p.AdminState != voltha.AdminState_ENABLED ||
311 p.OperStatus != voltha.OperStatus_ACTIVE {
312 return status.Errorf(codes.Internal, "invalid port state:{%s, %v}", d.Id, p)
313 }
314
315 if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
316 if len(p.Peers) != 0 {
317 return status.Errorf(codes.Internal, "invalid length of peers:{%s, %d}", d.Id, p.Type)
318 }
319 } else if p.Type == voltha.Port_PON_OLT {
320 if len(p.Peers) != numONUsPerOLT ||
321 p.PortNo != uint32(1) {
322 return status.Errorf(codes.Internal, "invalid length of peers for PON OLT port:{%s, %v}", d.Id, p)
323 }
324 } else if p.Type == voltha.Port_PON_ONU {
325 if len(p.Peers) != 1 ||
326 p.PortNo != uint32(1) {
327 return status.Errorf(codes.Internal, "invalid length of peers for PON ONU port:{%s, %v}", d.Id, p)
328 }
329 }
330 }
331 }
332 return nil
333}
334
335func areAdaptersPresent(requiredAdapterNames []string, retrievedAdapters *voltha.Adapters) bool {
336 if len(requiredAdapterNames) == 0 {
337 return true
338 }
339 for _, nAName := range requiredAdapterNames {
340 found := false
341 for _, rA := range retrievedAdapters.Items {
342 if nAName == rA.Id {
343 found = true
344 break
345 }
346 }
347 if !found {
348 return false
349 }
350 }
351 return true
352}
353
npujar12732342019-11-14 17:28:40 +0530354// WaitForAdapterRegistration waits on channel for response of list adapters
khenaidoo0458db62019-06-20 08:50:36 -0400355func WaitForAdapterRegistration(stub voltha.VolthaServiceClient, requiredAdapterNames []string, timeout int) (*voltha.Adapters, error) {
356 fmt.Println("Waiting for adapter registration ...")
357 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530358 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400359 ch := make(chan interface{})
360 defer close(ch)
361 for {
362 go sendListAdapters(ctx, stub, ch)
363 select {
364 case res, ok := <-ch:
365 if !ok {
366 return nil, status.Error(codes.Aborted, "channel closed")
367 } else if er, ok := res.(error); ok {
368 return nil, er
369 } else if a, ok := res.(*voltha.Adapters); ok {
370 if areAdaptersPresent(requiredAdapterNames, a) {
371 fmt.Println("All adapters registered:", a.Items)
372 return a, nil
373 }
374 }
375 case <-time.After(time.Duration(timeout) * time.Second):
376 return nil, status.Error(codes.Aborted, "timeout while waiting for adapter registration")
377 }
378 time.Sleep(1 * time.Second)
379 }
380}
381
npujar12732342019-11-14 17:28:40 +0530382// PreProvisionDevice pre-provisions a new physical device
khenaidoo0458db62019-06-20 08:50:36 -0400383func PreProvisionDevice(stub voltha.VolthaServiceClient) (*voltha.Device, error) {
384 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530385 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400386 randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
387 device := &voltha.Device{Type: "simulated_olt", MacAddress: randomMacAddress}
388 ch := make(chan interface{})
389 defer close(ch)
390 go sendCreateDeviceRequest(ctx, stub, device, ch)
391 res, ok := <-ch
392 if !ok {
393 return nil, status.Error(codes.Aborted, "channel closed")
394 } else if er, ok := res.(error); ok {
395 return nil, er
396 } else if d, ok := res.(*voltha.Device); ok {
397 return d, nil
398 }
399 return nil, status.Errorf(codes.Unknown, "cannot provision device:{%v}", device)
400}
401
npujar12732342019-11-14 17:28:40 +0530402// EnableDevice - if the device was in pre-provisioned state then it
403// will transition to ENABLED state. If it was is DISABLED state then it
404// will transition to ENABLED state as well.
khenaidoo0458db62019-06-20 08:50:36 -0400405func EnableDevice(stub voltha.VolthaServiceClient, device *voltha.Device, numONUs int) error {
406 if device.AdminState == voltha.AdminState_PREPROVISIONED {
407 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530408 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400409 ch := make(chan interface{})
410 defer close(ch)
411 go sendEnableDeviceRequest(ctx, stub, device.Id, ch)
412 res, ok := <-ch
413 if !ok {
414 return status.Error(codes.Aborted, "channel closed")
415 } else if er, ok := res.(error); ok {
416 return er
417 } else if _, ok := res.(*empty.Empty); ok {
418 return nil
419 }
420 }
421 return status.Errorf(codes.Unknown, "cannot enable device:{%s}", device.Id)
422}
423
npujar12732342019-11-14 17:28:40 +0530424// UpdateFlow updates flow table for logical device
khenaidoo0458db62019-06-20 08:50:36 -0400425func UpdateFlow(stub voltha.VolthaServiceClient, flow *ofp.FlowTableUpdate) error {
426 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530427 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400428 ch := make(chan interface{})
429 defer close(ch)
430 go sendFlow(ctx, stub, flow, ch)
431 res, ok := <-ch
432 if !ok {
433 return status.Error(codes.Aborted, "channel closed")
434 } else if er, ok := res.(error); ok {
435 return er
436 } else if _, ok := res.(*empty.Empty); ok {
437 return nil
438 }
439 return status.Errorf(codes.Unknown, "cannot add flow:{%v}", flow)
440}
441
npujar12732342019-11-14 17:28:40 +0530442// StartSimulatedEnv starts kafka, etcd, olt, onu adapters, core
khenaidoo0458db62019-06-20 08:50:36 -0400443func StartSimulatedEnv(composePath string) error {
444 fmt.Println("Starting simulated environment ...")
445 // Start kafka and Etcd
446 if err := startKafka(composePath); err != nil {
447 return err
448 }
449 if err := startEtcd(composePath); err != nil {
450 return err
451 }
452 time.Sleep(5 * time.Second)
453
454 //Start the simulated adapters
455 if err := startSimulatedOLTAndONUAdapters(composePath); err != nil {
456 return err
457 }
458
459 //Start the core
460 if err := startCore(composePath); err != nil {
461 return err
462 }
463
464 time.Sleep(10 * time.Second)
465
466 fmt.Println("Simulated environment started.")
467 return nil
468}
469
npujar12732342019-11-14 17:28:40 +0530470// StopSimulatedEnv stops kafka, etcd, olt, onu adapters, core
khenaidoo0458db62019-06-20 08:50:36 -0400471func StopSimulatedEnv(composePath string) error {
npujar12732342019-11-14 17:28:40 +0530472 err := stopSimulatedOLTAndONUAdapters(composePath)
473 if err != nil {
474 log.Errorw("unable-to-stop-simulated-olt-onu-adapters", log.Fields{"error": err})
475 }
476 err = stopCore(composePath)
477 if err != nil {
478 log.Errorw("unable-to-stop-core", log.Fields{"error": err})
479 }
480 err = stopKafka(composePath)
481 if err != nil {
482 log.Errorw("unable-to-stop-kafka", log.Fields{"error": err})
483 }
484 err = stopEtcd(composePath)
485 if err != nil {
486 log.Errorw("unable-to-stop-etcd", log.Fields{"error": err})
487 }
khenaidoo0458db62019-06-20 08:50:36 -0400488 return nil
489}