| /* |
| * Copyright 2021-2023 Open Networking Foundation (ONF) and the ONF Contributors |
| |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package grpc_test |
| |
| import ( |
| "context" |
| "fmt" |
| "os" |
| "strconv" |
| "sync" |
| "testing" |
| "time" |
| |
| vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc" |
| "github.com/opencord/voltha-lib-go/v7/pkg/log" |
| "github.com/opencord/voltha-lib-go/v7/pkg/probe" |
| "github.com/opencord/voltha-protos/v5/go/common" |
| "github.com/opencord/voltha-protos/v5/go/core_service" |
| "github.com/opencord/voltha-protos/v5/go/voltha" |
| "github.com/phayes/freeport" |
| "github.com/stretchr/testify/assert" |
| "google.golang.org/grpc" |
| ) |
| |
// Shared settings for the gRPC client/server tests in this file.
const (
	// testGrpcServer is the service name the test server registers with its probe.
	testGrpcServer = "test-grpc-server"

	// Durations exported by newTestClient through the GRPC_BACKOFF_* and
	// GRPC_MONITOR_INTERVAL environment variables.
	initialInterval = time.Millisecond * 100
	maxInterval     = time.Second * 5
	maxElapsedTime  = time.Duration(0)
	monitorInterval = time.Second * 2

	// timeout bounds every waitUntilCondition call made by the tests.
	timeout = time.Second * 10
)
| |
// Buffered channels used by the connection handlers and liveness callbacks in
// this file to report activity back to the tests.
var (
	// testForNoActivityCh receives a timestamp whenever idleConnectionTest
	// builds a service client.
	testForNoActivityCh = make(chan time.Time, 20)

	// testKeepAliveCh receives every timestamp handed to keepAliveMonitor.
	testKeepAliveCh = make(chan time.Time, 20)
)
| |
| type testCoreServer struct { |
| apiEndPoint string |
| server *vgrpc.GrpcServer |
| probe *probe.Probe |
| coreService *vgrpc.MockCoreServiceHandler |
| } |
| |
| func newTestCoreServer(apiEndpoint string) *testCoreServer { |
| return &testCoreServer{ |
| apiEndPoint: apiEndpoint, |
| probe: &probe.Probe{}, |
| } |
| } |
| |
| func (s *testCoreServer) registerService(ctx context.Context, t *testing.T) { |
| assert.NotEqual(t, "", s.apiEndPoint) |
| |
| probePort, err := freeport.GetFreePort() |
| assert.Nil(t, err) |
| probeEndpoint := ":" + strconv.Itoa(probePort) |
| go s.probe.ListenAndServe(ctx, probeEndpoint) |
| s.probe.RegisterService(ctx, testGrpcServer) |
| |
| s.server = vgrpc.NewGrpcServer(s.apiEndPoint, nil, false, s.probe) |
| s.coreService = vgrpc.NewMockCoreServiceHandler() |
| |
| s.server.AddService(func(server *grpc.Server) { |
| core_service.RegisterCoreServiceServer(server, s.coreService) |
| }) |
| } |
| |
| func (s *testCoreServer) start(ctx context.Context, t *testing.T) { |
| assert.NotNil(t, s.server) |
| assert.NotEqual(t, "", s.apiEndPoint) |
| |
| s.probe.UpdateStatus(ctx, testGrpcServer, probe.ServiceStatusRunning) |
| s.coreService.Start() |
| s.server.Start(ctx) |
| s.probe.UpdateStatus(ctx, testGrpcServer, probe.ServiceStatusStopped) |
| } |
| |
| func (s *testCoreServer) stop() { |
| s.coreService.Stop() |
| if s.server != nil { |
| s.server.Stop() |
| } |
| } |
| |
| type testClient struct { |
| apiEndPoint string |
| probe *probe.Probe |
| client *vgrpc.Client |
| } |
| |
| func serverRestarted(ctx context.Context, endPoint string) error { |
| logger.Infow(ctx, "remote-restarted", log.Fields{"endpoint": endPoint}) |
| return nil |
| } |
| |
| func newTestClient(apiEndpoint string, handler vgrpc.RestartedHandler) *testClient { |
| tc := &testClient{ |
| apiEndPoint: apiEndpoint, |
| probe: &probe.Probe{}, |
| } |
| // Set the environment variables that this client will use |
| var err error |
| err = os.Setenv("GRPC_BACKOFF_INITIAL_INTERVAL", initialInterval.String()) |
| if err != nil { |
| logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": "GRPC_BACKOFF_INITIAL_INTERVAL"}) |
| return nil |
| } |
| err = os.Setenv("GRPC_BACKOFF_MAX_INTERVAL", maxInterval.String()) |
| if err != nil { |
| logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": "GRPC_BACKOFF_MAX_INTERVAL"}) |
| return nil |
| } |
| err = os.Setenv("GRPC_BACKOFF_MAX_ELAPSED_TIME", maxElapsedTime.String()) |
| if err != nil { |
| logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": "GRPC_BACKOFF_MAX_ELAPSED_TIME"}) |
| return nil |
| } |
| |
| err = os.Setenv("GRPC_MONITOR_INTERVAL", monitorInterval.String()) |
| if err != nil { |
| logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": "GRPC_MONITOR_INTERVAL"}) |
| return nil |
| } |
| |
| tc.client, err = vgrpc.NewClient( |
| "test-endpoint", |
| apiEndpoint, |
| "core_service.CoreService", |
| handler) |
| if err != nil { |
| return nil |
| } |
| return tc |
| } |
| |
| func getCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} { |
| if conn == nil { |
| return nil |
| } |
| return core_service.NewCoreServiceClient(conn) |
| } |
| |
| func idleConnectionTest(ctx context.Context, conn *grpc.ClientConn) interface{} { |
| if conn == nil { |
| return nil |
| } |
| svc := core_service.NewCoreServiceClient(conn) |
| |
| testForNoActivityCh <- time.Now() |
| return svc |
| } |
| |
| func (c *testClient) start(ctx context.Context, t *testing.T, handler vgrpc.GetServiceClient) { |
| assert.NotNil(t, c.client) |
| |
| probePort, err := freeport.GetFreePort() |
| assert.Nil(t, err) |
| probeEndpoint := "127.0.0.1:" + strconv.Itoa(probePort) |
| go c.probe.ListenAndServe(ctx, probeEndpoint) |
| |
| probeCtx := context.WithValue(ctx, probe.ProbeContextKey, c.probe) |
| c.client.Start(probeCtx, handler) |
| } |
| |
| func (c *testClient) getClient(t *testing.T) core_service.CoreServiceClient { |
| gc, err := c.client.GetClient() |
| assert.Nil(t, err) |
| coreClient, ok := gc.(core_service.CoreServiceClient) |
| assert.True(t, ok) |
| return coreClient |
| } |
| |
| func serverStartsFirstTest(t *testing.T) { |
| // Setup |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| |
| // Create and start the test server |
| grpcPort, err := freeport.GetFreePort() |
| assert.Nil(t, err) |
| apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort) |
| ts := newTestCoreServer(apiEndpoint) |
| ts.registerService(ctx, t) |
| go ts.start(ctx, t) |
| |
| // Create the test client and start it |
| tc := newTestClient(apiEndpoint, serverRestarted) |
| assert.NotNil(t, tc) |
| go tc.start(ctx, t, getCoreServiceHandler) |
| |
| // Test 1: Verify that probe status shows ready eventually |
| var servicesReady isConditionSatisfied = func() bool { |
| return ts.probe.IsReady() && tc.probe.IsReady() |
| } |
| err = waitUntilCondition(timeout, servicesReady) |
| assert.Nil(t, err) |
| |
| // Test 2: Verify we get a valid client and can make grpc requests with it |
| coreClient := tc.getClient(t) |
| assert.NotNil(t, coreClient) |
| |
| device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"}) |
| assert.Nil(t, err) |
| assert.NotNil(t, device) |
| assert.Equal(t, "test-1234", device.Type) |
| } |
| |
| func clientStartsFirstTest(t *testing.T) { |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| |
| // Create a grpc endpoint for the server |
| grpcPort, err := freeport.GetFreePort() |
| assert.Nil(t, err) |
| apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort) |
| |
| // Create the test client and start it |
| tc := newTestClient(apiEndpoint, serverRestarted) |
| assert.NotNil(t, tc) |
| go tc.start(ctx, t, getCoreServiceHandler) |
| |
| // Verify client is not ready |
| var clientNotReady isConditionSatisfied = func() bool { |
| serviceStatus := tc.probe.GetStatus(apiEndpoint) |
| return serviceStatus == probe.ServiceStatusNotReady || |
| serviceStatus == probe.ServiceStatusPreparing || |
| serviceStatus == probe.ServiceStatusFailed |
| } |
| err = waitUntilCondition(timeout, clientNotReady) |
| assert.Nil(t, err) |
| |
| // Create and start the test server |
| ts := newTestCoreServer(apiEndpoint) |
| ts.registerService(ctx, t) |
| go ts.start(ctx, t) |
| |
| // Test 1: Verify that probe status shows ready eventually |
| var servicesReady isConditionSatisfied = func() bool { |
| return ts.probe.IsReady() && tc.probe.IsReady() |
| } |
| err = waitUntilCondition(timeout, servicesReady) |
| assert.Nil(t, err) |
| |
| // Test 2: Verify we get a valid client and can make grpc requests with it |
| coreClient := tc.getClient(t) |
| assert.NotNil(t, coreClient) |
| |
| device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"}) |
| assert.Nil(t, err) |
| assert.NotNil(t, device) |
| assert.Equal(t, "test-1234", device.Type) |
| } |
| |
| // Liveness function |
| func livessness(timestamp time.Time) { |
| logger.Debugw(context.Background(), "received-liveness", log.Fields{"timestamp": timestamp}) |
| } |
| |
| func serverRestarts(t *testing.T, numRestartRuns int) { |
| // Setup |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| |
| // Create and start the test server |
| grpcPort, err := freeport.GetFreePort() |
| assert.Nil(t, err) |
| apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort) |
| ts := newTestCoreServer(apiEndpoint) |
| ts.registerService(ctx, t) |
| go ts.start(ctx, t) |
| |
| // Create the test client and start it |
| tc := newTestClient(apiEndpoint, serverRestarted) |
| assert.NotNil(t, tc) |
| |
| // Subscribe for liveness |
| tc.client.SubscribeForLiveness(livessness) |
| go tc.start(ctx, t, getCoreServiceHandler) |
| |
| // Test 1: Verify that probe status shows ready eventually |
| var servicesReady isConditionSatisfied = func() bool { |
| return ts.probe.IsReady() && tc.probe.IsReady() |
| } |
| err = waitUntilCondition(timeout, servicesReady) |
| assert.Nil(t, err) |
| |
| // Test 2: Verify we get a valid client and can make grpc requests with it |
| coreClient := tc.getClient(t) |
| assert.NotNil(t, coreClient) |
| |
| device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"}) |
| assert.Nil(t, err) |
| assert.NotNil(t, device) |
| assert.Equal(t, "test-1234", device.Type) |
| |
| for i := 1; i <= numRestartRuns; i++ { |
| //Test 3: Stop server and verify server status |
| ts.stop() |
| var serverDown isConditionSatisfied = func() bool { |
| return ts.probe.GetStatus(testGrpcServer) == probe.ServiceStatusStopped |
| } |
| err = waitUntilCondition(timeout, serverDown) |
| assert.Nil(t, err) |
| |
| // Wait until the client service shows as not ready. A wait is not needed. It's just to verify that the |
| // client changes connection state. |
| var clientNotReady isConditionSatisfied = func() bool { |
| serviceStatus := tc.probe.GetStatus(apiEndpoint) |
| return serviceStatus == probe.ServiceStatusNotReady || |
| serviceStatus == probe.ServiceStatusPreparing || |
| serviceStatus == probe.ServiceStatusFailed |
| } |
| err = waitUntilCondition(timeout, clientNotReady) |
| |
| assert.Nil(t, err) |
| |
| // Test 4: Re-create the server and verify the server is back online |
| ts = newTestCoreServer(apiEndpoint) |
| ts.registerService(ctx, t) |
| go ts.start(ctx, t) |
| err = waitUntilCondition(timeout, servicesReady) |
| assert.Nil(t, err) |
| |
| // Test 5: verify we can pull new device with a new client instance |
| coreClient = tc.getClient(t) |
| assert.NotNil(t, coreClient) |
| device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"}) |
| assert.Nil(t, err) |
| assert.Equal(t, "test-1234", device.Type) |
| } |
| // Stop the server |
| ts.stop() |
| } |
| |
| // Liveness function |
| func keepAliveMonitor(timestamp time.Time) { |
| logger.Debugw(context.Background(), "received-liveness", log.Fields{"timestamp": timestamp}) |
| testKeepAliveCh <- timestamp |
| } |
| |
| func testKeepAlive(t *testing.T) { |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| |
| // Create a grpc endpoint for the server |
| grpcPort, err := freeport.GetFreePort() |
| assert.Nil(t, err) |
| apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort) |
| |
| // Create the test client and start it |
| tc := newTestClient(apiEndpoint, serverRestarted) |
| assert.NotNil(t, tc) |
| go tc.start(ctx, t, idleConnectionTest) |
| |
| // Create and start the test server |
| ts := newTestCoreServer(apiEndpoint) |
| tc.client.SubscribeForLiveness(keepAliveMonitor) |
| ts.registerService(ctx, t) |
| go ts.start(ctx, t) |
| defer tc.client.Stop(context.Background()) |
| |
| // Test 1: Verify that probe status shows ready eventually |
| var servicesReady isConditionSatisfied = func() bool { |
| return ts.probe.IsReady() && tc.probe.IsReady() |
| } |
| err = waitUntilCondition(timeout, servicesReady) |
| assert.Nil(t, err) |
| |
| // Test 2: Verify we get a valid client and can make grpc requests with it |
| coreClient := tc.getClient(t) |
| assert.NotNil(t, coreClient) |
| |
| device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"}) |
| assert.Nil(t, err) |
| assert.NotNil(t, device) |
| assert.Equal(t, "test-1234", device.Type) |
| |
| start := time.Now() |
| numChecks := 3 // Test for 3 checks |
| // Wait on the the idle channel - on no activity a connection probe will be attempted by the client |
| timer := time.NewTimer((monitorInterval + 1*time.Second) * time.Duration(numChecks)) |
| defer timer.Stop() |
| count := 0 |
| loop: |
| for { |
| select { |
| case timestamp := <-testKeepAliveCh: |
| if timestamp.After(start) { |
| count += 1 |
| if count > numChecks { |
| break loop |
| } |
| } |
| case <-timer.C: |
| t.Fatal("no activity on the idle channel") |
| } |
| } |
| } |
| |
| func testClientFailure(t *testing.T, numClientRestarts int) { |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| // Create a grpc endpoint for the server |
| grpcPort, err := freeport.GetFreePort() |
| assert.Nil(t, err) |
| apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort) |
| // Create the test client and start it |
| tc := newTestClient(apiEndpoint, serverRestarted) |
| assert.NotNil(t, tc) |
| go tc.start(ctx, t, idleConnectionTest) |
| // Create and start the test server |
| ts := newTestCoreServer(apiEndpoint) |
| ts.registerService(ctx, t) |
| go ts.start(ctx, t) |
| defer ts.stop() |
| // Test 1: Verify that probe status shows ready eventually |
| var servicesReady isConditionSatisfied = func() bool { |
| return ts.probe.IsReady() && tc.probe.IsReady() |
| } |
| err = waitUntilCondition(timeout, servicesReady) |
| assert.Nil(t, err) |
| // Test 2: Verify we get a valid client and can make grpc requests with it |
| coreClient := tc.getClient(t) |
| assert.NotNil(t, coreClient) |
| device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"}) |
| assert.Nil(t, err) |
| assert.NotNil(t, device) |
| assert.Equal(t, "test-1234", device.Type) |
| for i := 1; i <= numClientRestarts; i++ { |
| // Kill grpc client |
| tc.client.Stop(context.Background()) |
| var clientNotReady isConditionSatisfied = func() bool { |
| return !tc.probe.IsReady() |
| } |
| err = waitUntilCondition(timeout, clientNotReady) |
| assert.Nil(t, err) |
| |
| // Create a new client |
| tc.client, err = vgrpc.NewClient( |
| "test-endpoint", |
| apiEndpoint, |
| "core_service.CoreService", |
| serverRestarted) |
| assert.Nil(t, err) |
| probeCtx := context.WithValue(ctx, probe.ProbeContextKey, tc.probe) |
| go tc.client.Start(probeCtx, idleConnectionTest) |
| |
| //Verify that probe status shows ready eventually |
| err = waitUntilCondition(timeout, servicesReady) |
| assert.Nil(t, err) |
| |
| // Verify we get a valid client and can make grpc requests with it |
| coreClient = tc.getClient(t) |
| assert.NotNil(t, coreClient) |
| |
| device, err = coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"}) |
| assert.Nil(t, err) |
| assert.NotNil(t, device) |
| assert.Equal(t, "test-1234", device.Type) |
| } |
| tc.client.Stop(context.Background()) |
| } |
| |
| func testServerLimit(t *testing.T) { |
| t.Skip() // Not needed for regular unit tests |
| |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| |
| // Create a grpc endpoint for the server |
| grpcPort, err := freeport.GetFreePort() |
| assert.Nil(t, err) |
| apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort) |
| |
| // Create the test client and start it |
| tc := newTestClient(apiEndpoint, serverRestarted) |
| assert.NotNil(t, tc) |
| go tc.start(ctx, t, idleConnectionTest) |
| |
| // Create and start the test server |
| ts := newTestCoreServer(apiEndpoint) |
| ts.registerService(ctx, t) |
| go ts.start(ctx, t) |
| |
| // Test 1: Verify that probe status shows ready eventually |
| var servicesReady isConditionSatisfied = func() bool { |
| return ts.probe.IsReady() && tc.probe.IsReady() |
| } |
| err = waitUntilCondition(timeout, servicesReady) |
| assert.Nil(t, err) |
| |
| // Test 2: Verify we get a valid client and can make grpc requests with it |
| coreClient := tc.getClient(t) |
| assert.NotNil(t, coreClient) |
| |
| var lock sync.RWMutex |
| bad := []time.Duration{} |
| bad_err := []string{} |
| good := []time.Duration{} |
| var wg sync.WaitGroup |
| numRPCs := 10 |
| total_good := time.Duration(0) |
| max_good := time.Duration(0) |
| total_bad := time.Duration(0) |
| max_bad := time.Duration(0) |
| order_received := []uint32{} |
| for i := 1; i <= numRPCs; i++ { |
| wg.Add(1) |
| go func(seq int) { |
| local := time.Now() |
| ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Millisecond) |
| defer cancel() |
| var err error |
| var d *voltha.Device |
| d, err = coreClient.GetDevice(ctx, &common.ID{Id: strconv.Itoa(seq)}) |
| if err != nil { |
| lock.Lock() |
| bad = append(bad, time.Since(local)) |
| bad_err = append(bad_err, err.Error()) |
| total_bad += time.Since(local) |
| if time.Since(local) > max_bad { |
| max_bad = time.Since(local) |
| } |
| logger.Errorw(ctx, "error produced", log.Fields{"error": err}) |
| lock.Unlock() |
| } else { |
| lock.Lock() |
| good = append(good, time.Since(local)) |
| total_good += time.Since(local) |
| if time.Since(local) > max_good { |
| max_good = time.Since(local) |
| } |
| if d != nil { |
| order_received = append(order_received, d.Vlan) |
| } |
| lock.Unlock() |
| } |
| wg.Done() |
| }(i) |
| } |
| wg.Wait() |
| assert.Equal(t, 0, len(bad)) |
| assert.Equal(t, numRPCs, len(good)) |
| //fmt.Println("Bad:", bad[:10]) |
| if len(bad_err) > 0 { |
| fmt.Println("Bad Err Last:", bad_err[len(bad_err)-1:]) |
| fmt.Println("Bad Err First:", bad_err[:1]) |
| } |
| fmt.Println("Good:", good[len(good)-10:]) |
| fmt.Println("Good average time:", total_good.Milliseconds()/int64(numRPCs)) |
| fmt.Println("Bad average time:", total_bad.Milliseconds()/int64(numRPCs)) |
| fmt.Println("Bad Max:", max_bad) |
| fmt.Println("Good Max:", max_good) |
| //fmt.Println("Order received:", order_received) |
| |
| prev := order_received[0] |
| |
| for i := 1; i < len(order_received); i++ { |
| if order_received[i] < prev { |
| fmt.Println("Prev:", prev, " curr:", order_received[i]) |
| } |
| prev = order_received[i] |
| } |
| } |
| |
| func TestSuite(t *testing.T) { |
| // Setup |
| log.SetAllLogLevel(volthaTestLogLevel) |
| |
| // Test starting server before client |
| serverStartsFirstTest(t) |
| |
| // Test starting client before server |
| clientStartsFirstTest(t) |
| |
| // Test server restarts |
| serverRestarts(t, 10) |
| |
| // Test that the client test the grpc connection on no activity |
| testKeepAlive(t) |
| |
| // Test client queueing with server limit |
| testServerLimit(t) |
| |
| // // Test the scenario where a client restarts |
| testClientFailure(t, 10) |
| } |