blob: c461f7e2c115664dd548fcbee5fc7c5f86d2af85 [file] [log] [blame]
// "build integration
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package core
import (
"context"
"fmt"
"os/exec"
"strings"
"time"
"github.com/golang/protobuf/ptypes/empty"
"github.com/google/uuid"
com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"github.com/opencord/voltha-protos/v2/go/common"
ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
"github.com/opencord/voltha-protos/v2/go/voltha"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// constant represents voltha serial number
const (
VolthaSerialNumberKey = "voltha_serial_number"
)
func startKafka(composePath string) error {
fmt.Println("Starting Kafka and Etcd ...")
command := "docker-compose"
fileName := fmt.Sprintf("%s/docker-compose-zk-kafka-test.yml", composePath)
cmd := exec.Command(command, "-f", fileName, "up", "-d")
if err := cmd.Run(); err != nil {
return err
}
return nil
}
func startEtcd(composePath string) error {
fmt.Println("Starting Etcd ...")
command := "docker-compose"
fileName := fmt.Sprintf("%s/docker-compose-etcd.yml", composePath)
cmd := exec.Command(command, "-f", fileName, "up", "-d")
if err := cmd.Run(); err != nil {
return err
}
return nil
}
func stopKafka(composePath string) error {
fmt.Println("Stopping Kafka and Etcd ...")
command := "docker-compose"
fileName := fmt.Sprintf("%s/docker-compose-zk-kafka-test.yml", composePath)
cmd := exec.Command(command, "-f", fileName, "down")
if err := cmd.Run(); err != nil {
return err
}
return nil
}
func stopEtcd(composePath string) error {
fmt.Println("Stopping Etcd ...")
command := "docker-compose"
fileName := fmt.Sprintf("%s/docker-compose-etcd.yml", composePath)
cmd := exec.Command(command, "-f", fileName, "down")
if err := cmd.Run(); err != nil {
return err
}
return nil
}
func startCore(composePath string) error {
fmt.Println("Starting voltha core ...")
command := "docker-compose"
fileName := fmt.Sprintf("%s/rw_core.yml", composePath)
cmd := exec.Command(command, "-f", fileName, "up", "-d")
if err := cmd.Run(); err != nil {
return err
}
return nil
}
func stopCore(composePath string) error {
fmt.Println("Stopping voltha core ...")
command := "docker-compose"
fileName := fmt.Sprintf("%s/rw_core.yml", composePath)
cmd := exec.Command(command, "-f", fileName, "down")
if err := cmd.Run(); err != nil {
return err
}
return nil
}
func startSimulatedOLTAndONUAdapters(composePath string) error {
fmt.Println("Starting simulated OLT and ONU adapters ...")
command := "docker-compose"
fileName := fmt.Sprintf("%s/adapters-simulated.yml", composePath)
cmd := exec.Command(command, "-f", fileName, "up", "-d")
if err := cmd.Run(); err != nil {
return err
}
return nil
}
func stopSimulatedOLTAndONUAdapters(composePath string) error {
fmt.Println("Stopping simulated OLT and ONU adapters ...")
command := "docker-compose"
fileName := fmt.Sprintf("%s/adapters-simulated.yml", composePath)
cmd := exec.Command(command, "-f", fileName, "down")
if err := cmd.Run(); err != nil {
return err
}
return nil
}
// ListLogicalDevices lists all logical devices managed by the Voltha cluster
func ListLogicalDevices(stub voltha.VolthaServiceClient) (*voltha.LogicalDevices, error) {
ui := uuid.New()
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
return stub.ListLogicalDevices(ctx, &empty.Empty{})
}
func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{}) {
if response, err := stub.CreateDevice(ctx, device); err != nil {
ch <- err
} else {
ch <- response
}
}
func sendListAdapters(ctx context.Context, stub voltha.VolthaServiceClient, ch chan interface{}) {
if response, err := stub.ListAdapters(ctx, &empty.Empty{}); err != nil {
ch <- err
} else {
ch <- response
}
}
func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceID string, ch chan interface{}) {
if response, err := stub.EnableDevice(ctx, &common.ID{Id: deviceID}); err != nil {
ch <- err
} else {
ch <- response
}
}
func getDevices(ctx context.Context, stub voltha.VolthaServiceClient) (*voltha.Devices, error) {
return stub.ListDevices(ctx, &empty.Empty{})
}
func getLogicalDevices(ctx context.Context, stub voltha.VolthaServiceClient) (*voltha.LogicalDevices, error) {
return stub.ListLogicalDevices(ctx, &empty.Empty{})
}
// IsFlowPresent returns true if flow is present
func IsFlowPresent(lookingFor *voltha.OfpFlowStats, flows []*voltha.OfpFlowStats) bool {
for _, f := range flows {
if f.String() == lookingFor.String() {
return true
}
}
return false
}
// ListDevices lists all physical devices controlled by the Voltha cluster
func ListDevices(stub voltha.VolthaServiceClient) (*voltha.Devices, error) {
ui := uuid.New()
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
return getDevices(ctx, stub)
}
func sendFlow(ctx context.Context, stub voltha.VolthaServiceClient, flow *ofp.FlowTableUpdate, ch chan interface{}) {
if response, err := stub.UpdateLogicalDeviceFlowTable(ctx, flow); err != nil {
ch <- err
} else {
ch <- response
}
}
// SetLogLevel sets log level to the given level
func SetLogLevel(stub voltha.VolthaServiceClient, l voltha.Logging) error {
ui := uuid.New()
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
_, err := stub.UpdateLogLevel(ctx, &l)
return err
}
// SetAllLogLevel sets log level of all service to the given level
func SetAllLogLevel(stub voltha.VolthaServiceClient, l voltha.Logging) error {
ui := uuid.New()
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
_, err := stub.UpdateLogLevel(ctx, &l)
return err
}
// SetupGrpcConnectionToCore sets up client connection to an RPC server.
func SetupGrpcConnectionToCore(grpcHostIP string, grpcPort int) (voltha.VolthaServiceClient, error) {
grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, grpcPort)
fmt.Println("Connecting to voltha using:", grpcHost)
conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
if err != nil {
return nil, err
}
return voltha.NewVolthaServiceClient(conn), nil
}
// VerifyLogicalDevices verifies all logical devices managed by the Voltha cluster
func VerifyLogicalDevices(stub voltha.VolthaServiceClient, parentDevice *voltha.Device, numONUsPerOLT int) (*voltha.LogicalDevices, error) {
ui := uuid.New()
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
retrievedLogicalDevices, err := getLogicalDevices(ctx, stub)
if err != nil {
return nil, err
}
if len(retrievedLogicalDevices.Items) != 1 {
return nil, status.Errorf(codes.Internal, "Logical device number incorrect. Expected:{%d}, Created:{%d}", 1, len(retrievedLogicalDevices.Items))
}
// Verify that each device has two ports
for _, ld := range retrievedLogicalDevices.Items {
if ld.Id == "" ||
ld.DatapathId == uint64(0) ||
ld.Desc.HwDesc != "simulated_pon" ||
ld.Desc.SwDesc != "simulated_pon" ||
ld.RootDeviceId == "" ||
ld.Desc.SerialNum == "" ||
ld.SwitchFeatures.NBuffers != uint32(256) ||
ld.SwitchFeatures.NTables != uint32(2) ||
ld.SwitchFeatures.Capabilities != uint32(15) ||
len(ld.Ports) != 1+numONUsPerOLT ||
ld.RootDeviceId != parentDevice.Id {
return nil, status.Errorf(codes.Internal, "incorrect logical device status:{%v}", ld)
}
for _, p := range ld.Ports {
if p.DevicePortNo != p.OfpPort.PortNo ||
p.OfpPort.State != uint32(4) {
return nil, status.Errorf(codes.Internal, "incorrect logical ports status:{%v}", p)
}
if strings.HasPrefix(p.Id, "nni") {
if !p.RootPort || fmt.Sprintf("nni-%d", p.DevicePortNo) != p.Id {
return nil, status.Errorf(codes.Internal, "incorrect nni port status:{%v}", p)
}
} else {
if p.RootPort || fmt.Sprintf("uni-%d", p.DevicePortNo) != p.Id {
return nil, status.Errorf(codes.Internal, "incorrect uni port status:{%v}", p)
}
}
}
}
return retrievedLogicalDevices, nil
}
// VerifyDevices verifies all physical devices controlled by the Voltha cluster
func VerifyDevices(stub voltha.VolthaServiceClient, numONUsPerOLT int) error {
ui := uuid.New()
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
retrievedDevices, err := getDevices(ctx, stub)
if err != nil {
return err
}
if len(retrievedDevices.Items) != 1+numONUsPerOLT {
return status.Errorf(codes.Internal, "Device number incorrect. Expected:{%d}, Created:{%d}", 1, len(retrievedDevices.Items))
}
// Verify that each device has two ports
for _, d := range retrievedDevices.Items {
if d.AdminState != voltha.AdminState_ENABLED ||
d.ConnectStatus != voltha.ConnectStatus_REACHABLE ||
d.OperStatus != voltha.OperStatus_ACTIVE ||
d.Type != d.Adapter ||
d.Id == "" ||
d.MacAddress == "" ||
d.SerialNumber == "" {
return status.Errorf(codes.Internal, "incorrect device state - %s", d.Id)
}
if d.Type == "simulated_olt" && (!d.Root || d.ProxyAddress != nil) {
return status.Errorf(codes.Internal, "invalid olt status:{%v}", d)
} else if d.Type == "simulated_onu" && (d.Root ||
d.Vlan == uint32(0) ||
d.ParentId == "" ||
d.ProxyAddress.DeviceId == "" ||
d.ProxyAddress.DeviceType != "simulated_olt") {
return status.Errorf(codes.Internal, "invalid onu status:{%s}", d.Id)
}
if len(d.Ports) != 2 {
return status.Errorf(codes.Internal, "invalid number of ports:{%s, %v}", d.Id, d.Ports)
}
for _, p := range d.Ports {
if p.AdminState != voltha.AdminState_ENABLED ||
p.OperStatus != voltha.OperStatus_ACTIVE {
return status.Errorf(codes.Internal, "invalid port state:{%s, %v}", d.Id, p)
}
if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
if len(p.Peers) != 0 {
return status.Errorf(codes.Internal, "invalid length of peers:{%s, %d}", d.Id, p.Type)
}
} else if p.Type == voltha.Port_PON_OLT {
if len(p.Peers) != numONUsPerOLT ||
p.PortNo != uint32(1) {
return status.Errorf(codes.Internal, "invalid length of peers for PON OLT port:{%s, %v}", d.Id, p)
}
} else if p.Type == voltha.Port_PON_ONU {
if len(p.Peers) != 1 ||
p.PortNo != uint32(1) {
return status.Errorf(codes.Internal, "invalid length of peers for PON ONU port:{%s, %v}", d.Id, p)
}
}
}
}
return nil
}
func areAdaptersPresent(requiredAdapterNames []string, retrievedAdapters *voltha.Adapters) bool {
if len(requiredAdapterNames) == 0 {
return true
}
for _, nAName := range requiredAdapterNames {
found := false
for _, rA := range retrievedAdapters.Items {
if nAName == rA.Id {
found = true
break
}
}
if !found {
return false
}
}
return true
}
// WaitForAdapterRegistration waits on channel for response of list adapters
func WaitForAdapterRegistration(stub voltha.VolthaServiceClient, requiredAdapterNames []string, timeout int) (*voltha.Adapters, error) {
fmt.Println("Waiting for adapter registration ...")
ui := uuid.New()
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
ch := make(chan interface{})
defer close(ch)
for {
go sendListAdapters(ctx, stub, ch)
select {
case res, ok := <-ch:
if !ok {
return nil, status.Error(codes.Aborted, "channel closed")
} else if er, ok := res.(error); ok {
return nil, er
} else if a, ok := res.(*voltha.Adapters); ok {
if areAdaptersPresent(requiredAdapterNames, a) {
fmt.Println("All adapters registered:", a.Items)
return a, nil
}
}
case <-time.After(time.Duration(timeout) * time.Second):
return nil, status.Error(codes.Aborted, "timeout while waiting for adapter registration")
}
time.Sleep(1 * time.Second)
}
}
// PreProvisionDevice pre-provisions a new physical device
func PreProvisionDevice(stub voltha.VolthaServiceClient) (*voltha.Device, error) {
ui := uuid.New()
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
device := &voltha.Device{Type: "simulated_olt", MacAddress: randomMacAddress}
ch := make(chan interface{})
defer close(ch)
go sendCreateDeviceRequest(ctx, stub, device, ch)
res, ok := <-ch
if !ok {
return nil, status.Error(codes.Aborted, "channel closed")
} else if er, ok := res.(error); ok {
return nil, er
} else if d, ok := res.(*voltha.Device); ok {
return d, nil
}
return nil, status.Errorf(codes.Unknown, "cannot provision device:{%v}", device)
}
// EnableDevice - if the device was in pre-provisioned state then it
// will transition to ENABLED state. If it was is DISABLED state then it
// will transition to ENABLED state as well.
func EnableDevice(stub voltha.VolthaServiceClient, device *voltha.Device, numONUs int) error {
if device.AdminState == voltha.AdminState_PREPROVISIONED {
ui := uuid.New()
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
ch := make(chan interface{})
defer close(ch)
go sendEnableDeviceRequest(ctx, stub, device.Id, ch)
res, ok := <-ch
if !ok {
return status.Error(codes.Aborted, "channel closed")
} else if er, ok := res.(error); ok {
return er
} else if _, ok := res.(*empty.Empty); ok {
return nil
}
}
return status.Errorf(codes.Unknown, "cannot enable device:{%s}", device.Id)
}
// UpdateFlow updates flow table for logical device
func UpdateFlow(stub voltha.VolthaServiceClient, flow *ofp.FlowTableUpdate) error {
ui := uuid.New()
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VolthaSerialNumberKey, ui.String()))
ch := make(chan interface{})
defer close(ch)
go sendFlow(ctx, stub, flow, ch)
res, ok := <-ch
if !ok {
return status.Error(codes.Aborted, "channel closed")
} else if er, ok := res.(error); ok {
return er
} else if _, ok := res.(*empty.Empty); ok {
return nil
}
return status.Errorf(codes.Unknown, "cannot add flow:{%v}", flow)
}
// StartSimulatedEnv starts kafka, etcd, olt, onu adapters, core
func StartSimulatedEnv(composePath string) error {
fmt.Println("Starting simulated environment ...")
// Start kafka and Etcd
if err := startKafka(composePath); err != nil {
return err
}
if err := startEtcd(composePath); err != nil {
return err
}
time.Sleep(5 * time.Second)
//Start the simulated adapters
if err := startSimulatedOLTAndONUAdapters(composePath); err != nil {
return err
}
//Start the core
if err := startCore(composePath); err != nil {
return err
}
time.Sleep(10 * time.Second)
fmt.Println("Simulated environment started.")
return nil
}
// StopSimulatedEnv stops kafka, etcd, olt, onu adapters, core
func StopSimulatedEnv(composePath string) error {
err := stopSimulatedOLTAndONUAdapters(composePath)
if err != nil {
log.Errorw("unable-to-stop-simulated-olt-onu-adapters", log.Fields{"error": err})
}
err = stopCore(composePath)
if err != nil {
log.Errorw("unable-to-stop-core", log.Fields{"error": err})
}
err = stopKafka(composePath)
if err != nil {
log.Errorw("unable-to-stop-kafka", log.Fields{"error": err})
}
err = stopEtcd(composePath)
if err != nil {
log.Errorw("unable-to-stop-etcd", log.Fields{"error": err})
}
return nil
}