| /* |
| * Copyright 2018-present Open Networking Foundation |
| |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package concurrency |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "github.com/golang/protobuf/ptypes/empty" |
| "github.com/google/uuid" |
| com "github.com/opencord/voltha-go/adapters/common" |
| "github.com/opencord/voltha-go/common/log" |
| "github.com/opencord/voltha-go/protos/common" |
| "github.com/opencord/voltha-go/protos/voltha" |
| "github.com/stretchr/testify/assert" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/metadata" |
| "os" |
| "os/exec" |
| "strings" |
| "testing" |
| ) |
| |
| var conns []*grpc.ClientConn |
| var stubs []voltha.VolthaServiceClient |
| var volthaSerialNumberKey string |
| var grpcPorts []int |
| |
| /* |
| This series of tests are executed with two RW_Cores |
| */ |
| |
| var devices map[string]*voltha.Device |
| |
| func setup() { |
| var err error |
| |
| if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil { |
| log.With(log.Fields{"error": err}).Fatal("Cannot setup logging") |
| } |
| log.UpdateAllLoggers(log.Fields{"instanceId": "testing"}) |
| log.SetAllLogLevel(log.ErrorLevel) |
| |
| grpcPorts = []int{50057, 50058} |
| stubs = make([]voltha.VolthaServiceClient, 0) |
| conns = make([]*grpc.ClientConn, 0) |
| |
| volthaSerialNumberKey = "voltha_serial_number" |
| devices = make(map[string]*voltha.Device) |
| } |
| |
| func connectToCore(port int) (voltha.VolthaServiceClient, error) { |
| grpcHostIP := os.Getenv("DOCKER_HOST_IP") |
| grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, port) |
| conn, err := grpc.Dial(grpcHost, grpc.WithInsecure()) |
| if err != nil { |
| log.Fatalf("did not connect: %s", err) |
| return nil, errors.New("failure-to-connect") |
| } |
| conns = append(conns, conn) |
| return voltha.NewVolthaServiceClient(conn), nil |
| } |
| |
| func setupGrpcConnection() []voltha.VolthaServiceClient { |
| // We have 2 concurrent cores. Connect to them |
| for _, port := range grpcPorts { |
| if client, err := connectToCore(port); err == nil { |
| stubs = append(stubs, client) |
| log.Infow("connected", log.Fields{"port": port}) |
| } |
| } |
| return stubs |
| } |
| |
| func clearAllDevices(clearMap bool) { |
| for key, _ := range devices { |
| ctx := context.Background() |
| response, err := stubs[1].DeleteDevice(ctx, &voltha.ID{Id: key}) |
| log.Infow("response", log.Fields{"res": response, "error": err}) |
| if clearMap { |
| delete(devices, key) |
| } |
| } |
| } |
| |
| // Verify if all ids are present in the global list of devices |
| func hasAllIds(ids *voltha.IDs) bool { |
| if ids == nil && len(devices) == 0 { |
| return true |
| } |
| if ids == nil { |
| return false |
| } |
| for _, id := range ids.Items { |
| if _, exist := devices[id.Id]; !exist { |
| return false |
| } |
| } |
| return true |
| } |
| |
| func startKafka() { |
| fmt.Println("Starting Kafka and Etcd ...") |
| command := "docker-compose" |
| cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "up", "-d") |
| if err := cmd.Run(); err != nil { |
| log.Fatal(err) |
| } |
| } |
| |
| func startEtcd() { |
| fmt.Println("Starting Etcd ...") |
| command := "docker-compose" |
| cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "up", "-d") |
| if err := cmd.Run(); err != nil { |
| log.Fatal(err) |
| } |
| } |
| |
| func stopKafka() { |
| fmt.Println("Stopping Kafka and Etcd ...") |
| command := "docker-compose" |
| cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "down") |
| if err := cmd.Run(); err != nil { |
| // ignore error - as this is mostly due network being left behind as its being used by other |
| // containers |
| log.Warn(err) |
| } |
| } |
| |
| func stopEtcd() { |
| fmt.Println("Stopping Etcd ...") |
| command := "docker-compose" |
| cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "down") |
| if err := cmd.Run(); err != nil { |
| // ignore error - as this is mostly due network being left behind as its being used by other |
| // containers |
| log.Warn(err) |
| } |
| } |
| |
| func startCores() { |
| fmt.Println("Starting voltha cores ...") |
| command := "docker-compose" |
| cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "up", "-d") |
| if err := cmd.Run(); err != nil { |
| log.Fatal(err) |
| } |
| } |
| |
| func stopCores() { |
| fmt.Println("Stopping voltha cores ...") |
| command := "docker-compose" |
| cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "down") |
| if err := cmd.Run(); err != nil { |
| // ignore error - as this is mostly due network being left behind as its being used by other |
| // containers |
| log.Warn(err) |
| } |
| } |
| |
| func startSimulatedOLTAndONUAdapters() { |
| fmt.Println("Starting simulated OLT and ONU adapters ...") |
| command := "docker-compose" |
| cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "up", "-d") |
| if err := cmd.Run(); err != nil { |
| log.Fatal(err) |
| } |
| } |
| |
| func stopSimulatedOLTAndONUAdapters() { |
| fmt.Println("Stopping simulated OLT and ONU adapters ...") |
| command := "docker-compose" |
| cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "down") |
| if err := cmd.Run(); err != nil { |
| // ignore error - as this is mostly due network being left behind as its being used by other |
| // containers |
| log.Warn(err) |
| } |
| } |
| |
| |
| func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{} ) { |
| fmt.Println("Sending create device ...") |
| if response, err := stub.CreateDevice(ctx, device); err != nil { |
| ch <- err |
| } else { |
| ch <- response |
| } |
| } |
| |
| func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{} ) { |
| fmt.Println("Sending enable device ...") |
| if response, err := stub.EnableDevice(ctx, &common.ID{Id:deviceId}); err != nil { |
| ch <- err |
| } else { |
| ch <- response |
| } |
| } |
| |
| //// createPonsimDevice sends two requests to each core and waits for both responses |
| //func createPonsimDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) { |
| // ui := uuid.New() |
| // ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String())) |
| // //preprovision_olt -t ponsim_olt -H 172.20.0.11:50060 |
| // device := &voltha.Device{Type: "ponsim_olt"} |
| // device.Address = &voltha.Device_HostAndPort{HostAndPort:"172.20.0.11:50060"} |
| // ch := make(chan interface{}) |
| // defer close(ch) |
| // requestNum := 0 |
| // for _, stub := range stubs { |
| // go sendCreateDeviceRequest(ctx, stub, device, ch) |
| // requestNum += 1 |
| // } |
| // fmt.Println("Waiting for create device response ...") |
| // receivedResponse := 0 |
| // var err error |
| // var returnedDevice *voltha.Device |
| // select { |
| // case res, ok := <-ch: |
| // receivedResponse += 1 |
| // if !ok { |
| // } else if er, ok := res.(error); ok { |
| // err = er |
| // } else if d, ok := res.(*voltha.Device); ok { |
| // returnedDevice = d |
| // } |
| // if receivedResponse == requestNum { |
| // break |
| // } |
| // } |
| // if returnedDevice != nil { |
| // return returnedDevice, nil |
| // } |
| // return nil, err |
| //} |
| |
| // createDevice sends two requests to each core and waits for both responses |
| func createDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) { |
| ui := uuid.New() |
| ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String())) |
| randomMacAddress := strings.ToUpper(com.GetRandomMacAddress()) |
| device := &voltha.Device{Type: "simulated_olt", MacAddress:randomMacAddress} |
| ch := make(chan interface{}) |
| defer close(ch) |
| requestNum := 0 |
| for _, stub := range stubs { |
| go sendCreateDeviceRequest(ctx, stub, device, ch) |
| requestNum += 1 |
| } |
| fmt.Println("Waiting for create device response ...") |
| receivedResponse := 0 |
| var err error |
| var returnedDevice *voltha.Device |
| select { |
| case res, ok := <-ch: |
| receivedResponse += 1 |
| if !ok { |
| } else if er, ok := res.(error); ok { |
| err = er |
| } else if d, ok := res.(*voltha.Device); ok { |
| returnedDevice = d |
| } |
| if receivedResponse == requestNum { |
| break |
| } |
| } |
| if returnedDevice != nil { |
| return returnedDevice, nil |
| } |
| return nil, err |
| } |
| |
| // enableDevices sends two requests to each core for each device and waits for both responses before sending another |
| // enable request for a different device. |
| func enableAllDevices(stubs []voltha.VolthaServiceClient) error { |
| for deviceId, val := range devices { |
| if val.AdminState == voltha.AdminState_PREPROVISIONED { |
| ui := uuid.New() |
| ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String())) |
| ch := make(chan interface{}) |
| defer close(ch) |
| requestNum := 0 |
| for _, stub := range stubs { |
| go sendEnableDeviceRequest(ctx, stub, deviceId, ch) |
| requestNum +=1 |
| } |
| receivedResponse := 0 |
| var err error |
| fmt.Println("Waiting for enable device response ...") |
| validResponseReceived := false |
| select { |
| case res, ok := <-ch: |
| receivedResponse += 1 |
| if !ok { |
| } else if er, ok := res.(error); ok { |
| err = er |
| } else if _ , ok := res.(*empty.Empty); ok { |
| validResponseReceived = true |
| } |
| if receivedResponse == requestNum { |
| break |
| } |
| } |
| if validResponseReceived { |
| return nil |
| } |
| return err |
| } |
| } |
| return nil |
| } |
| |
| |
| func TestConcurrentRequests(t *testing.T) { |
| fmt.Println("Testing Concurrent requests ...") |
| |
| ////0. Start kafka and Ectd |
| //startKafka() |
| //startEtcd() |
| // |
| ////1. Start the core |
| //startCores() |
| // |
| ////2. Start the simulated adapters |
| //startSimulatedOLTAndONUAdapters() |
| // |
| //// Wait until the core and adapters sync up |
| //time.Sleep(10 * time.Second) |
| |
| stubs = setupGrpcConnection() |
| |
| //3. Create the devices |
| response, err := createDevice(stubs) |
| log.Infow("response", log.Fields{"res": response, "error": err}) |
| assert.Nil(t, err) |
| devices[response.Id] = response |
| |
| //4. Enable all the devices |
| err = enableAllDevices(stubs) |
| assert.Nil(t, err) |
| |
| ////5. Store simulated adapters |
| //stopSimulatedOLTAndONUAdapters() |
| // |
| ////6. Store the core |
| //stopCores() |
| // |
| ////7. Stop Kafka and Etcd |
| //stopKafka() |
| //stopEtcd() |
| } |
| |
| |
| func shutdown() { |
| for _, conn := range conns { |
| conn.Close() |
| } |
| } |
| |
| func TestMain(m *testing.M) { |
| setup() |
| code := m.Run() |
| shutdown() |
| os.Exit(code) |
| } |