blob: 655e942e0bd2f789168cb334cf3bc0c4af90f437 [file] [log] [blame]
khenaidoo0458db62019-06-20 08:50:36 -04001// "build integration
2
3/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
npujar12732342019-11-14 17:28:40 +053018
khenaidoo0458db62019-06-20 08:50:36 -040019package core
20
21import (
22 "context"
23 "fmt"
24 "os/exec"
25 "strings"
26 "time"
27
28 "github.com/golang/protobuf/ptypes/empty"
29 "github.com/google/uuid"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080030 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 "github.com/opencord/voltha-protos/v3/go/common"
33 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
34 "github.com/opencord/voltha-protos/v3/go/voltha"
npujar12732342019-11-14 17:28:40 +053035
khenaidoo0458db62019-06-20 08:50:36 -040036 "google.golang.org/grpc"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/metadata"
39 "google.golang.org/grpc/status"
40)
41
npujar12732342019-11-14 17:28:40 +053042// constant represents voltha serial number
khenaidoo0458db62019-06-20 08:50:36 -040043const (
npujar12732342019-11-14 17:28:40 +053044 VolthaSerialNumberKey = "voltha_serial_number"
khenaidoo0458db62019-06-20 08:50:36 -040045)
46
47func startKafka(composePath string) error {
48 fmt.Println("Starting Kafka and Etcd ...")
49 command := "docker-compose"
50 fileName := fmt.Sprintf("%s/docker-compose-zk-kafka-test.yml", composePath)
51 cmd := exec.Command(command, "-f", fileName, "up", "-d")
52 if err := cmd.Run(); err != nil {
53 return err
54 }
55 return nil
56}
57
58func startEtcd(composePath string) error {
59 fmt.Println("Starting Etcd ...")
60 command := "docker-compose"
61 fileName := fmt.Sprintf("%s/docker-compose-etcd.yml", composePath)
62 cmd := exec.Command(command, "-f", fileName, "up", "-d")
63 if err := cmd.Run(); err != nil {
64 return err
65 }
66 return nil
67}
68
69func stopKafka(composePath string) error {
70 fmt.Println("Stopping Kafka and Etcd ...")
71 command := "docker-compose"
72 fileName := fmt.Sprintf("%s/docker-compose-zk-kafka-test.yml", composePath)
73 cmd := exec.Command(command, "-f", fileName, "down")
74 if err := cmd.Run(); err != nil {
75 return err
76 }
77 return nil
78}
79
80func stopEtcd(composePath string) error {
81 fmt.Println("Stopping Etcd ...")
82 command := "docker-compose"
83 fileName := fmt.Sprintf("%s/docker-compose-etcd.yml", composePath)
84 cmd := exec.Command(command, "-f", fileName, "down")
85 if err := cmd.Run(); err != nil {
86 return err
87 }
88 return nil
89}
90
91func startCore(composePath string) error {
92 fmt.Println("Starting voltha core ...")
93 command := "docker-compose"
94 fileName := fmt.Sprintf("%s/rw_core.yml", composePath)
95 cmd := exec.Command(command, "-f", fileName, "up", "-d")
96 if err := cmd.Run(); err != nil {
97 return err
98 }
99 return nil
100}
101
102func stopCore(composePath string) error {
103 fmt.Println("Stopping voltha core ...")
104 command := "docker-compose"
105 fileName := fmt.Sprintf("%s/rw_core.yml", composePath)
106 cmd := exec.Command(command, "-f", fileName, "down")
107 if err := cmd.Run(); err != nil {
108 return err
109 }
110 return nil
111}
112
113func startSimulatedOLTAndONUAdapters(composePath string) error {
114 fmt.Println("Starting simulated OLT and ONU adapters ...")
115 command := "docker-compose"
116 fileName := fmt.Sprintf("%s/adapters-simulated.yml", composePath)
117 cmd := exec.Command(command, "-f", fileName, "up", "-d")
118 if err := cmd.Run(); err != nil {
119 return err
120 }
121 return nil
122}
123
124func stopSimulatedOLTAndONUAdapters(composePath string) error {
125 fmt.Println("Stopping simulated OLT and ONU adapters ...")
126 command := "docker-compose"
127 fileName := fmt.Sprintf("%s/adapters-simulated.yml", composePath)
128 cmd := exec.Command(command, "-f", fileName, "down")
129 if err := cmd.Run(); err != nil {
130 return err
131 }
132 return nil
133}
134
npujar12732342019-11-14 17:28:40 +0530135// ListLogicalDevices lists all logical devices managed by the Voltha cluster
khenaidoo0458db62019-06-20 08:50:36 -0400136func ListLogicalDevices(stub voltha.VolthaServiceClient) (*voltha.LogicalDevices, error) {
137 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530138 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
139 return stub.ListLogicalDevices(ctx, &empty.Empty{})
khenaidoo0458db62019-06-20 08:50:36 -0400140}
141
142func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{}) {
143 if response, err := stub.CreateDevice(ctx, device); err != nil {
144 ch <- err
145 } else {
146 ch <- response
147 }
148}
149
150func sendListAdapters(ctx context.Context, stub voltha.VolthaServiceClient, ch chan interface{}) {
151 if response, err := stub.ListAdapters(ctx, &empty.Empty{}); err != nil {
152 ch <- err
153 } else {
154 ch <- response
155 }
156}
157
npujar12732342019-11-14 17:28:40 +0530158func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceID string, ch chan interface{}) {
159 if response, err := stub.EnableDevice(ctx, &common.ID{Id: deviceID}); err != nil {
khenaidoo0458db62019-06-20 08:50:36 -0400160 ch <- err
161 } else {
162 ch <- response
163 }
164}
165
166func getDevices(ctx context.Context, stub voltha.VolthaServiceClient) (*voltha.Devices, error) {
npujar12732342019-11-14 17:28:40 +0530167 return stub.ListDevices(ctx, &empty.Empty{})
khenaidoo0458db62019-06-20 08:50:36 -0400168}
169
170func getLogicalDevices(ctx context.Context, stub voltha.VolthaServiceClient) (*voltha.LogicalDevices, error) {
npujar12732342019-11-14 17:28:40 +0530171 return stub.ListLogicalDevices(ctx, &empty.Empty{})
khenaidoo0458db62019-06-20 08:50:36 -0400172}
173
npujar12732342019-11-14 17:28:40 +0530174// IsFlowPresent returns true if flow is present
khenaidoo0458db62019-06-20 08:50:36 -0400175func IsFlowPresent(lookingFor *voltha.OfpFlowStats, flows []*voltha.OfpFlowStats) bool {
176 for _, f := range flows {
177 if f.String() == lookingFor.String() {
178 return true
179 }
180 }
181 return false
182}
183
npujar12732342019-11-14 17:28:40 +0530184// ListDevices lists all physical devices controlled by the Voltha cluster
khenaidoo0458db62019-06-20 08:50:36 -0400185func ListDevices(stub voltha.VolthaServiceClient) (*voltha.Devices, error) {
186 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530187 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
188 return getDevices(ctx, stub)
khenaidoo0458db62019-06-20 08:50:36 -0400189}
190
191func sendFlow(ctx context.Context, stub voltha.VolthaServiceClient, flow *ofp.FlowTableUpdate, ch chan interface{}) {
192 if response, err := stub.UpdateLogicalDeviceFlowTable(ctx, flow); err != nil {
193 ch <- err
194 } else {
195 ch <- response
196 }
197}
198
npujar12732342019-11-14 17:28:40 +0530199// SetupGrpcConnectionToCore sets up client connection to an RPC server.
khenaidoo0458db62019-06-20 08:50:36 -0400200func SetupGrpcConnectionToCore(grpcHostIP string, grpcPort int) (voltha.VolthaServiceClient, error) {
201 grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, grpcPort)
202 fmt.Println("Connecting to voltha using:", grpcHost)
203 conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
204 if err != nil {
205 return nil, err
206 }
207 return voltha.NewVolthaServiceClient(conn), nil
208}
209
npujar12732342019-11-14 17:28:40 +0530210// VerifyLogicalDevices verifies all logical devices managed by the Voltha cluster
khenaidoo0458db62019-06-20 08:50:36 -0400211func VerifyLogicalDevices(stub voltha.VolthaServiceClient, parentDevice *voltha.Device, numONUsPerOLT int) (*voltha.LogicalDevices, error) {
212 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530213 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400214 retrievedLogicalDevices, err := getLogicalDevices(ctx, stub)
215 if err != nil {
216 return nil, err
217 }
218 if len(retrievedLogicalDevices.Items) != 1 {
219 return nil, status.Errorf(codes.Internal, "Logical device number incorrect. Expected:{%d}, Created:{%d}", 1, len(retrievedLogicalDevices.Items))
220 }
221
222 // Verify that each device has two ports
223 for _, ld := range retrievedLogicalDevices.Items {
224 if ld.Id == "" ||
225 ld.DatapathId == uint64(0) ||
226 ld.Desc.HwDesc != "simulated_pon" ||
227 ld.Desc.SwDesc != "simulated_pon" ||
228 ld.RootDeviceId == "" ||
229 ld.Desc.SerialNum == "" ||
230 ld.SwitchFeatures.NBuffers != uint32(256) ||
231 ld.SwitchFeatures.NTables != uint32(2) ||
232 ld.SwitchFeatures.Capabilities != uint32(15) ||
233 len(ld.Ports) != 1+numONUsPerOLT ||
234 ld.RootDeviceId != parentDevice.Id {
235 return nil, status.Errorf(codes.Internal, "incorrect logical device status:{%v}", ld)
236 }
237 for _, p := range ld.Ports {
238 if p.DevicePortNo != p.OfpPort.PortNo ||
239 p.OfpPort.State != uint32(4) {
240 return nil, status.Errorf(codes.Internal, "incorrect logical ports status:{%v}", p)
241 }
242 if strings.HasPrefix(p.Id, "nni") {
243 if !p.RootPort || fmt.Sprintf("nni-%d", p.DevicePortNo) != p.Id {
244 return nil, status.Errorf(codes.Internal, "incorrect nni port status:{%v}", p)
245 }
246 } else {
247 if p.RootPort || fmt.Sprintf("uni-%d", p.DevicePortNo) != p.Id {
248 return nil, status.Errorf(codes.Internal, "incorrect uni port status:{%v}", p)
249 }
250 }
251 }
252 }
253 return retrievedLogicalDevices, nil
254}
255
npujar12732342019-11-14 17:28:40 +0530256// VerifyDevices verifies all physical devices controlled by the Voltha cluster
khenaidoo0458db62019-06-20 08:50:36 -0400257func VerifyDevices(stub voltha.VolthaServiceClient, numONUsPerOLT int) error {
258 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530259 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400260 retrievedDevices, err := getDevices(ctx, stub)
261 if err != nil {
262 return err
263 }
264 if len(retrievedDevices.Items) != 1+numONUsPerOLT {
265 return status.Errorf(codes.Internal, "Device number incorrect. Expected:{%d}, Created:{%d}", 1, len(retrievedDevices.Items))
266 }
267 // Verify that each device has two ports
268 for _, d := range retrievedDevices.Items {
269 if d.AdminState != voltha.AdminState_ENABLED ||
270 d.ConnectStatus != voltha.ConnectStatus_REACHABLE ||
271 d.OperStatus != voltha.OperStatus_ACTIVE ||
272 d.Type != d.Adapter ||
273 d.Id == "" ||
274 d.MacAddress == "" ||
275 d.SerialNumber == "" {
276 return status.Errorf(codes.Internal, "incorrect device state - %s", d.Id)
277 }
278
279 if d.Type == "simulated_olt" && (!d.Root || d.ProxyAddress != nil) {
280 return status.Errorf(codes.Internal, "invalid olt status:{%v}", d)
281 } else if d.Type == "simulated_onu" && (d.Root ||
282 d.Vlan == uint32(0) ||
283 d.ParentId == "" ||
284 d.ProxyAddress.DeviceId == "" ||
285 d.ProxyAddress.DeviceType != "simulated_olt") {
286 return status.Errorf(codes.Internal, "invalid onu status:{%s}", d.Id)
287 }
288
289 if len(d.Ports) != 2 {
290 return status.Errorf(codes.Internal, "invalid number of ports:{%s, %v}", d.Id, d.Ports)
291 }
292
293 for _, p := range d.Ports {
294 if p.AdminState != voltha.AdminState_ENABLED ||
295 p.OperStatus != voltha.OperStatus_ACTIVE {
296 return status.Errorf(codes.Internal, "invalid port state:{%s, %v}", d.Id, p)
297 }
298
299 if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
300 if len(p.Peers) != 0 {
301 return status.Errorf(codes.Internal, "invalid length of peers:{%s, %d}", d.Id, p.Type)
302 }
303 } else if p.Type == voltha.Port_PON_OLT {
304 if len(p.Peers) != numONUsPerOLT ||
305 p.PortNo != uint32(1) {
306 return status.Errorf(codes.Internal, "invalid length of peers for PON OLT port:{%s, %v}", d.Id, p)
307 }
308 } else if p.Type == voltha.Port_PON_ONU {
309 if len(p.Peers) != 1 ||
310 p.PortNo != uint32(1) {
311 return status.Errorf(codes.Internal, "invalid length of peers for PON ONU port:{%s, %v}", d.Id, p)
312 }
313 }
314 }
315 }
316 return nil
317}
318
319func areAdaptersPresent(requiredAdapterNames []string, retrievedAdapters *voltha.Adapters) bool {
320 if len(requiredAdapterNames) == 0 {
321 return true
322 }
323 for _, nAName := range requiredAdapterNames {
324 found := false
325 for _, rA := range retrievedAdapters.Items {
326 if nAName == rA.Id {
327 found = true
328 break
329 }
330 }
331 if !found {
332 return false
333 }
334 }
335 return true
336}
337
npujar12732342019-11-14 17:28:40 +0530338// WaitForAdapterRegistration waits on channel for response of list adapters
khenaidoo0458db62019-06-20 08:50:36 -0400339func WaitForAdapterRegistration(stub voltha.VolthaServiceClient, requiredAdapterNames []string, timeout int) (*voltha.Adapters, error) {
340 fmt.Println("Waiting for adapter registration ...")
341 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530342 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400343 ch := make(chan interface{})
344 defer close(ch)
345 for {
346 go sendListAdapters(ctx, stub, ch)
347 select {
348 case res, ok := <-ch:
349 if !ok {
350 return nil, status.Error(codes.Aborted, "channel closed")
351 } else if er, ok := res.(error); ok {
352 return nil, er
353 } else if a, ok := res.(*voltha.Adapters); ok {
354 if areAdaptersPresent(requiredAdapterNames, a) {
355 fmt.Println("All adapters registered:", a.Items)
356 return a, nil
357 }
358 }
359 case <-time.After(time.Duration(timeout) * time.Second):
360 return nil, status.Error(codes.Aborted, "timeout while waiting for adapter registration")
361 }
362 time.Sleep(1 * time.Second)
363 }
364}
365
npujar12732342019-11-14 17:28:40 +0530366// PreProvisionDevice pre-provisions a new physical device
khenaidoo0458db62019-06-20 08:50:36 -0400367func PreProvisionDevice(stub voltha.VolthaServiceClient) (*voltha.Device, error) {
368 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530369 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400370 randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
371 device := &voltha.Device{Type: "simulated_olt", MacAddress: randomMacAddress}
372 ch := make(chan interface{})
373 defer close(ch)
374 go sendCreateDeviceRequest(ctx, stub, device, ch)
375 res, ok := <-ch
376 if !ok {
377 return nil, status.Error(codes.Aborted, "channel closed")
378 } else if er, ok := res.(error); ok {
379 return nil, er
380 } else if d, ok := res.(*voltha.Device); ok {
381 return d, nil
382 }
383 return nil, status.Errorf(codes.Unknown, "cannot provision device:{%v}", device)
384}
385
npujar12732342019-11-14 17:28:40 +0530386// EnableDevice - if the device was in pre-provisioned state then it
387// will transition to ENABLED state. If it was is DISABLED state then it
388// will transition to ENABLED state as well.
khenaidoo0458db62019-06-20 08:50:36 -0400389func EnableDevice(stub voltha.VolthaServiceClient, device *voltha.Device, numONUs int) error {
390 if device.AdminState == voltha.AdminState_PREPROVISIONED {
391 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530392 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400393 ch := make(chan interface{})
394 defer close(ch)
395 go sendEnableDeviceRequest(ctx, stub, device.Id, ch)
396 res, ok := <-ch
397 if !ok {
398 return status.Error(codes.Aborted, "channel closed")
399 } else if er, ok := res.(error); ok {
400 return er
401 } else if _, ok := res.(*empty.Empty); ok {
402 return nil
403 }
404 }
405 return status.Errorf(codes.Unknown, "cannot enable device:{%s}", device.Id)
406}
407
npujar12732342019-11-14 17:28:40 +0530408// UpdateFlow updates flow table for logical device
khenaidoo0458db62019-06-20 08:50:36 -0400409func UpdateFlow(stub voltha.VolthaServiceClient, flow *ofp.FlowTableUpdate) error {
410 ui := uuid.New()
npujar12732342019-11-14 17:28:40 +0530411 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
khenaidoo0458db62019-06-20 08:50:36 -0400412 ch := make(chan interface{})
413 defer close(ch)
414 go sendFlow(ctx, stub, flow, ch)
415 res, ok := <-ch
416 if !ok {
417 return status.Error(codes.Aborted, "channel closed")
418 } else if er, ok := res.(error); ok {
419 return er
420 } else if _, ok := res.(*empty.Empty); ok {
421 return nil
422 }
423 return status.Errorf(codes.Unknown, "cannot add flow:{%v}", flow)
424}
425
npujar12732342019-11-14 17:28:40 +0530426// StartSimulatedEnv starts kafka, etcd, olt, onu adapters, core
khenaidoo0458db62019-06-20 08:50:36 -0400427func StartSimulatedEnv(composePath string) error {
428 fmt.Println("Starting simulated environment ...")
429 // Start kafka and Etcd
430 if err := startKafka(composePath); err != nil {
431 return err
432 }
433 if err := startEtcd(composePath); err != nil {
434 return err
435 }
436 time.Sleep(5 * time.Second)
437
438 //Start the simulated adapters
439 if err := startSimulatedOLTAndONUAdapters(composePath); err != nil {
440 return err
441 }
442
443 //Start the core
444 if err := startCore(composePath); err != nil {
445 return err
446 }
447
448 time.Sleep(10 * time.Second)
449
450 fmt.Println("Simulated environment started.")
451 return nil
452}
453
npujar12732342019-11-14 17:28:40 +0530454// StopSimulatedEnv stops kafka, etcd, olt, onu adapters, core
khenaidoo0458db62019-06-20 08:50:36 -0400455func StopSimulatedEnv(composePath string) error {
npujar12732342019-11-14 17:28:40 +0530456 err := stopSimulatedOLTAndONUAdapters(composePath)
457 if err != nil {
458 log.Errorw("unable-to-stop-simulated-olt-onu-adapters", log.Fields{"error": err})
459 }
460 err = stopCore(composePath)
461 if err != nil {
462 log.Errorw("unable-to-stop-core", log.Fields{"error": err})
463 }
464 err = stopKafka(composePath)
465 if err != nil {
466 log.Errorw("unable-to-stop-kafka", log.Fields{"error": err})
467 }
468 err = stopEtcd(composePath)
469 if err != nil {
470 log.Errorw("unable-to-stop-etcd", log.Fields{"error": err})
471 }
khenaidoo0458db62019-06-20 08:50:36 -0400472 return nil
473}