blob: a0ebe56faf99dca8d9db2efb36c5f8dd099786aa [file] [log] [blame]
// +build integration
2
3/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package core
19
20import (
21 "context"
22 "fmt"
23 "os/exec"
24 "strings"
25 "time"
26
27 "github.com/golang/protobuf/ptypes/empty"
28 "github.com/google/uuid"
Scott Baker807addd2019-10-24 15:16:21 -070029 com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
Scott Baker555307d2019-11-04 08:58:01 -080030 "github.com/opencord/voltha-protos/v2/go/common"
31 ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
32 "github.com/opencord/voltha-protos/v2/go/voltha"
khenaidoo0458db62019-06-20 08:50:36 -040033 "google.golang.org/grpc"
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/metadata"
36 "google.golang.org/grpc/status"
37)
38
// VOLTHA_SERIAL_NUMBER_KEY is the outgoing gRPC metadata key under which the
// helpers in this file attach a freshly generated UUID to each request.
const VOLTHA_SERIAL_NUMBER_KEY = "voltha_serial_number"
42
// startKafka runs "docker-compose -f <composePath>/docker-compose-zk-kafka-test.yml up -d"
// and returns the error reported by the command, or nil when it succeeds.
func startKafka(composePath string) error {
	fmt.Println("Starting Kafka and Etcd ...")
	composeFile := composePath + "/docker-compose-zk-kafka-test.yml"
	return exec.Command("docker-compose", "-f", composeFile, "up", "-d").Run()
}
53
// startEtcd runs "docker-compose -f <composePath>/docker-compose-etcd.yml up -d"
// and returns the error reported by the command, or nil when it succeeds.
func startEtcd(composePath string) error {
	fmt.Println("Starting Etcd ...")
	composeFile := composePath + "/docker-compose-etcd.yml"
	return exec.Command("docker-compose", "-f", composeFile, "up", "-d").Run()
}
64
// stopKafka runs "docker-compose -f <composePath>/docker-compose-zk-kafka-test.yml down"
// and returns the error reported by the command, or nil when it succeeds.
func stopKafka(composePath string) error {
	fmt.Println("Stopping Kafka and Etcd ...")
	composeFile := composePath + "/docker-compose-zk-kafka-test.yml"
	return exec.Command("docker-compose", "-f", composeFile, "down").Run()
}
75
// stopEtcd runs "docker-compose -f <composePath>/docker-compose-etcd.yml down"
// and returns the error reported by the command, or nil when it succeeds.
func stopEtcd(composePath string) error {
	fmt.Println("Stopping Etcd ...")
	composeFile := composePath + "/docker-compose-etcd.yml"
	return exec.Command("docker-compose", "-f", composeFile, "down").Run()
}
86
// startCore runs "docker-compose -f <composePath>/rw_core.yml up -d" and
// returns the error reported by the command, or nil when it succeeds.
func startCore(composePath string) error {
	fmt.Println("Starting voltha core ...")
	composeFile := composePath + "/rw_core.yml"
	return exec.Command("docker-compose", "-f", composeFile, "up", "-d").Run()
}
97
// stopCore runs "docker-compose -f <composePath>/rw_core.yml down" and
// returns the error reported by the command, or nil when it succeeds.
func stopCore(composePath string) error {
	fmt.Println("Stopping voltha core ...")
	composeFile := composePath + "/rw_core.yml"
	return exec.Command("docker-compose", "-f", composeFile, "down").Run()
}
108
// startSimulatedOLTAndONUAdapters runs
// "docker-compose -f <composePath>/adapters-simulated.yml up -d" and returns
// the error reported by the command, or nil when it succeeds.
func startSimulatedOLTAndONUAdapters(composePath string) error {
	fmt.Println("Starting simulated OLT and ONU adapters ...")
	composeFile := composePath + "/adapters-simulated.yml"
	return exec.Command("docker-compose", "-f", composeFile, "up", "-d").Run()
}
119
// stopSimulatedOLTAndONUAdapters runs
// "docker-compose -f <composePath>/adapters-simulated.yml down" and returns
// the error reported by the command, or nil when it succeeds.
func stopSimulatedOLTAndONUAdapters(composePath string) error {
	fmt.Println("Stopping simulated OLT and ONU adapters ...")
	composeFile := composePath + "/adapters-simulated.yml"
	return exec.Command("docker-compose", "-f", composeFile, "down").Run()
}
130
131func ListLogicalDevices(stub voltha.VolthaServiceClient) (*voltha.LogicalDevices, error) {
132 ui := uuid.New()
133 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
134 if response, err := stub.ListLogicalDevices(ctx, &empty.Empty{}); err != nil {
135 return nil, err
136 } else {
137 return response, nil
138 }
139}
140
141func getNumUniPort(ld *voltha.LogicalDevice) int {
142 num := 0
143 for _, port := range ld.Ports {
144 if !port.RootPort {
145 num += 1
146 }
147 }
148 return num
149}
150
151func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{}) {
152 if response, err := stub.CreateDevice(ctx, device); err != nil {
153 ch <- err
154 } else {
155 ch <- response
156 }
157}
158
159func sendListAdapters(ctx context.Context, stub voltha.VolthaServiceClient, ch chan interface{}) {
160 if response, err := stub.ListAdapters(ctx, &empty.Empty{}); err != nil {
161 ch <- err
162 } else {
163 ch <- response
164 }
165}
166
167func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{}) {
168 if response, err := stub.EnableDevice(ctx, &common.ID{Id: deviceId}); err != nil {
169 ch <- err
170 } else {
171 ch <- response
172 }
173}
174
175func sendDisableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{}) {
176 if response, err := stub.DisableDevice(ctx, &common.ID{Id: deviceId}); err != nil {
177 ch <- err
178 } else {
179 ch <- response
180 }
181}
182
183func sendDeleteDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{}) {
184 if response, err := stub.DeleteDevice(ctx, &common.ID{Id: deviceId}); err != nil {
185 ch <- err
186 } else {
187 ch <- response
188 }
189}
190
191func getDevices(ctx context.Context, stub voltha.VolthaServiceClient) (*voltha.Devices, error) {
192 if response, err := stub.ListDevices(ctx, &empty.Empty{}); err != nil {
193 return nil, err
194 } else {
195 return response, nil
196 }
197}
198
199func getLogicalDevices(ctx context.Context, stub voltha.VolthaServiceClient) (*voltha.LogicalDevices, error) {
200 if response, err := stub.ListLogicalDevices(ctx, &empty.Empty{}); err != nil {
201 return nil, err
202 } else {
203 return response, nil
204 }
205}
206
207func IsFlowPresent(lookingFor *voltha.OfpFlowStats, flows []*voltha.OfpFlowStats) bool {
208 for _, f := range flows {
209 if f.String() == lookingFor.String() {
210 return true
211 }
212 }
213 return false
214}
215
216func ListDevices(stub voltha.VolthaServiceClient) (*voltha.Devices, error) {
217 ui := uuid.New()
218 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
219 if devices, err := getDevices(ctx, stub); err == nil {
220 return devices, nil
221 } else {
222 return nil, err
223 }
224}
225
226func sendFlow(ctx context.Context, stub voltha.VolthaServiceClient, flow *ofp.FlowTableUpdate, ch chan interface{}) {
227 if response, err := stub.UpdateLogicalDeviceFlowTable(ctx, flow); err != nil {
228 ch <- err
229 } else {
230 ch <- response
231 }
232}
233
234func SetLogLevel(stub voltha.VolthaServiceClient, l voltha.Logging) error {
235 ui := uuid.New()
236 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
237 _, err := stub.UpdateLogLevel(ctx, &l)
238 return err
239}
240
241func SetAllLogLevel(stub voltha.VolthaServiceClient, l voltha.Logging) error {
242 ui := uuid.New()
243 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
244 _, err := stub.UpdateLogLevel(ctx, &l)
245 return err
246}
247
248func SetupGrpcConnectionToCore(grpcHostIP string, grpcPort int) (voltha.VolthaServiceClient, error) {
249 grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, grpcPort)
250 fmt.Println("Connecting to voltha using:", grpcHost)
251 conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
252 if err != nil {
253 return nil, err
254 }
255 return voltha.NewVolthaServiceClient(conn), nil
256}
257
258func VerifyLogicalDevices(stub voltha.VolthaServiceClient, parentDevice *voltha.Device, numONUsPerOLT int) (*voltha.LogicalDevices, error) {
259 ui := uuid.New()
260 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
261 retrievedLogicalDevices, err := getLogicalDevices(ctx, stub)
262 if err != nil {
263 return nil, err
264 }
265 if len(retrievedLogicalDevices.Items) != 1 {
266 return nil, status.Errorf(codes.Internal, "Logical device number incorrect. Expected:{%d}, Created:{%d}", 1, len(retrievedLogicalDevices.Items))
267 }
268
269 // Verify that each device has two ports
270 for _, ld := range retrievedLogicalDevices.Items {
271 if ld.Id == "" ||
272 ld.DatapathId == uint64(0) ||
273 ld.Desc.HwDesc != "simulated_pon" ||
274 ld.Desc.SwDesc != "simulated_pon" ||
275 ld.RootDeviceId == "" ||
276 ld.Desc.SerialNum == "" ||
277 ld.SwitchFeatures.NBuffers != uint32(256) ||
278 ld.SwitchFeatures.NTables != uint32(2) ||
279 ld.SwitchFeatures.Capabilities != uint32(15) ||
280 len(ld.Ports) != 1+numONUsPerOLT ||
281 ld.RootDeviceId != parentDevice.Id {
282 return nil, status.Errorf(codes.Internal, "incorrect logical device status:{%v}", ld)
283 }
284 for _, p := range ld.Ports {
285 if p.DevicePortNo != p.OfpPort.PortNo ||
286 p.OfpPort.State != uint32(4) {
287 return nil, status.Errorf(codes.Internal, "incorrect logical ports status:{%v}", p)
288 }
289 if strings.HasPrefix(p.Id, "nni") {
290 if !p.RootPort || fmt.Sprintf("nni-%d", p.DevicePortNo) != p.Id {
291 return nil, status.Errorf(codes.Internal, "incorrect nni port status:{%v}", p)
292 }
293 } else {
294 if p.RootPort || fmt.Sprintf("uni-%d", p.DevicePortNo) != p.Id {
295 return nil, status.Errorf(codes.Internal, "incorrect uni port status:{%v}", p)
296 }
297 }
298 }
299 }
300 return retrievedLogicalDevices, nil
301}
302
303func VerifyDevices(stub voltha.VolthaServiceClient, numONUsPerOLT int) error {
304 ui := uuid.New()
305 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
306 retrievedDevices, err := getDevices(ctx, stub)
307 if err != nil {
308 return err
309 }
310 if len(retrievedDevices.Items) != 1+numONUsPerOLT {
311 return status.Errorf(codes.Internal, "Device number incorrect. Expected:{%d}, Created:{%d}", 1, len(retrievedDevices.Items))
312 }
313 // Verify that each device has two ports
314 for _, d := range retrievedDevices.Items {
315 if d.AdminState != voltha.AdminState_ENABLED ||
316 d.ConnectStatus != voltha.ConnectStatus_REACHABLE ||
317 d.OperStatus != voltha.OperStatus_ACTIVE ||
318 d.Type != d.Adapter ||
319 d.Id == "" ||
320 d.MacAddress == "" ||
321 d.SerialNumber == "" {
322 return status.Errorf(codes.Internal, "incorrect device state - %s", d.Id)
323 }
324
325 if d.Type == "simulated_olt" && (!d.Root || d.ProxyAddress != nil) {
326 return status.Errorf(codes.Internal, "invalid olt status:{%v}", d)
327 } else if d.Type == "simulated_onu" && (d.Root ||
328 d.Vlan == uint32(0) ||
329 d.ParentId == "" ||
330 d.ProxyAddress.DeviceId == "" ||
331 d.ProxyAddress.DeviceType != "simulated_olt") {
332 return status.Errorf(codes.Internal, "invalid onu status:{%s}", d.Id)
333 }
334
335 if len(d.Ports) != 2 {
336 return status.Errorf(codes.Internal, "invalid number of ports:{%s, %v}", d.Id, d.Ports)
337 }
338
339 for _, p := range d.Ports {
340 if p.AdminState != voltha.AdminState_ENABLED ||
341 p.OperStatus != voltha.OperStatus_ACTIVE {
342 return status.Errorf(codes.Internal, "invalid port state:{%s, %v}", d.Id, p)
343 }
344
345 if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
346 if len(p.Peers) != 0 {
347 return status.Errorf(codes.Internal, "invalid length of peers:{%s, %d}", d.Id, p.Type)
348 }
349 } else if p.Type == voltha.Port_PON_OLT {
350 if len(p.Peers) != numONUsPerOLT ||
351 p.PortNo != uint32(1) {
352 return status.Errorf(codes.Internal, "invalid length of peers for PON OLT port:{%s, %v}", d.Id, p)
353 }
354 } else if p.Type == voltha.Port_PON_ONU {
355 if len(p.Peers) != 1 ||
356 p.PortNo != uint32(1) {
357 return status.Errorf(codes.Internal, "invalid length of peers for PON ONU port:{%s, %v}", d.Id, p)
358 }
359 }
360 }
361 }
362 return nil
363}
364
365func areAdaptersPresent(requiredAdapterNames []string, retrievedAdapters *voltha.Adapters) bool {
366 if len(requiredAdapterNames) == 0 {
367 return true
368 }
369 for _, nAName := range requiredAdapterNames {
370 found := false
371 for _, rA := range retrievedAdapters.Items {
372 if nAName == rA.Id {
373 found = true
374 break
375 }
376 }
377 if !found {
378 return false
379 }
380 }
381 return true
382}
383
384func WaitForAdapterRegistration(stub voltha.VolthaServiceClient, requiredAdapterNames []string, timeout int) (*voltha.Adapters, error) {
385 fmt.Println("Waiting for adapter registration ...")
386 ui := uuid.New()
387 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
388 ch := make(chan interface{})
389 defer close(ch)
390 for {
391 go sendListAdapters(ctx, stub, ch)
392 select {
393 case res, ok := <-ch:
394 if !ok {
395 return nil, status.Error(codes.Aborted, "channel closed")
396 } else if er, ok := res.(error); ok {
397 return nil, er
398 } else if a, ok := res.(*voltha.Adapters); ok {
399 if areAdaptersPresent(requiredAdapterNames, a) {
400 fmt.Println("All adapters registered:", a.Items)
401 return a, nil
402 }
403 }
404 case <-time.After(time.Duration(timeout) * time.Second):
405 return nil, status.Error(codes.Aborted, "timeout while waiting for adapter registration")
406 }
407 time.Sleep(1 * time.Second)
408 }
409}
410
411func PreProvisionDevice(stub voltha.VolthaServiceClient) (*voltha.Device, error) {
412 ui := uuid.New()
413 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
414 randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
415 device := &voltha.Device{Type: "simulated_olt", MacAddress: randomMacAddress}
416 ch := make(chan interface{})
417 defer close(ch)
418 go sendCreateDeviceRequest(ctx, stub, device, ch)
419 res, ok := <-ch
420 if !ok {
421 return nil, status.Error(codes.Aborted, "channel closed")
422 } else if er, ok := res.(error); ok {
423 return nil, er
424 } else if d, ok := res.(*voltha.Device); ok {
425 return d, nil
426 }
427 return nil, status.Errorf(codes.Unknown, "cannot provision device:{%v}", device)
428}
429
430func EnableDevice(stub voltha.VolthaServiceClient, device *voltha.Device, numONUs int) error {
431 if device.AdminState == voltha.AdminState_PREPROVISIONED {
432 ui := uuid.New()
433 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
434 ch := make(chan interface{})
435 defer close(ch)
436 go sendEnableDeviceRequest(ctx, stub, device.Id, ch)
437 res, ok := <-ch
438 if !ok {
439 return status.Error(codes.Aborted, "channel closed")
440 } else if er, ok := res.(error); ok {
441 return er
442 } else if _, ok := res.(*empty.Empty); ok {
443 return nil
444 }
445 }
446 return status.Errorf(codes.Unknown, "cannot enable device:{%s}", device.Id)
447}
448
449func UpdateFlow(stub voltha.VolthaServiceClient, flow *ofp.FlowTableUpdate) error {
450 ui := uuid.New()
451 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
452 ch := make(chan interface{})
453 defer close(ch)
454 go sendFlow(ctx, stub, flow, ch)
455 res, ok := <-ch
456 if !ok {
457 return status.Error(codes.Aborted, "channel closed")
458 } else if er, ok := res.(error); ok {
459 return er
460 } else if _, ok := res.(*empty.Empty); ok {
461 return nil
462 }
463 return status.Errorf(codes.Unknown, "cannot add flow:{%v}", flow)
464}
465
466func StartSimulatedEnv(composePath string) error {
467 fmt.Println("Starting simulated environment ...")
468 // Start kafka and Etcd
469 if err := startKafka(composePath); err != nil {
470 return err
471 }
472 if err := startEtcd(composePath); err != nil {
473 return err
474 }
475 time.Sleep(5 * time.Second)
476
477 //Start the simulated adapters
478 if err := startSimulatedOLTAndONUAdapters(composePath); err != nil {
479 return err
480 }
481
482 //Start the core
483 if err := startCore(composePath); err != nil {
484 return err
485 }
486
487 time.Sleep(10 * time.Second)
488
489 fmt.Println("Simulated environment started.")
490 return nil
491}
492
493func StopSimulatedEnv(composePath string) error {
494 stopSimulatedOLTAndONUAdapters(composePath)
495 stopCore(composePath)
496 stopKafka(composePath)
497 stopEtcd(composePath)
498 return nil
499}