blob: 440c1378d5caa86886355a96ec81baae1a614397 [file] [log] [blame]
khenaidoo26721882021-08-11 17:42:52 -04001/*
2 * Copyright 2021-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package grpc
17
18import (
19 "context"
20 "fmt"
21 "os"
22 "strconv"
23 "sync"
24 "testing"
25 "time"
26
khenaidoo26721882021-08-11 17:42:52 -040027 "github.com/opencord/voltha-lib-go/v7/pkg/log"
28 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
29 "github.com/opencord/voltha-protos/v5/go/common"
khenaidooa5feb8e2021-10-19 17:29:22 -040030 "github.com/opencord/voltha-protos/v5/go/core_service"
31 "github.com/opencord/voltha-protos/v5/go/health"
khenaidoo26721882021-08-11 17:42:52 -040032 "github.com/opencord/voltha-protos/v5/go/voltha"
33 "github.com/phayes/freeport"
34 "github.com/stretchr/testify/assert"
35 "google.golang.org/grpc"
36)
37
// Settings shared by the client/server scenarios in this file.
const (
	// testGrpcServer is the service name the test server registers with its probe.
	testGrpcServer = "test-grpc-server"

	// Values newTestClient exports through environment variables before
	// creating the client under test.
	initialInterval = 100 * time.Millisecond
	maxInterval     = 5 * time.Second
	maxElapsedTime  = time.Duration(0)
	monitorInterval = 2 * time.Second

	// timeout bounds every waitUntilCondition call made by the scenarios.
	timeout = 10 * time.Second
)

// testForNoActivityCh receives a timestamp from idleConnectionTest each time
// that handler sees a healthy connection; testNoActivity reads from it.
var testForNoActivityCh = make(chan time.Time, 10)
48
49type testCoreServer struct {
50 apiEndPoint string
51 server *GrpcServer
52 probe *probe.Probe
53}
54
55func newTestCoreServer(apiEndpoint string) *testCoreServer {
56 return &testCoreServer{
57 apiEndPoint: apiEndpoint,
58 probe: &probe.Probe{},
59 }
60}
61
62func (s *testCoreServer) registerService(ctx context.Context, t *testing.T) {
63 assert.NotEqual(t, "", s.apiEndPoint)
64
65 probePort, err := freeport.GetFreePort()
66 assert.Nil(t, err)
67 probeEndpoint := "127.0.0.1:" + strconv.Itoa(probePort)
68 go s.probe.ListenAndServe(ctx, probeEndpoint)
69 s.probe.RegisterService(ctx, testGrpcServer)
70
71 s.server = NewGrpcServer(s.apiEndPoint, nil, false, s.probe)
72
73 s.server.AddService(func(server *grpc.Server) {
khenaidooa5feb8e2021-10-19 17:29:22 -040074 core_service.RegisterCoreServiceServer(server, &MockCoreServiceHandler{})
khenaidoo26721882021-08-11 17:42:52 -040075 })
76}
77
78func (s *testCoreServer) start(ctx context.Context, t *testing.T) {
79 assert.NotNil(t, s.server)
80 assert.NotEqual(t, "", s.apiEndPoint)
81
82 s.probe.UpdateStatus(ctx, testGrpcServer, probe.ServiceStatusRunning)
83 s.server.Start(ctx)
84 s.probe.UpdateStatus(ctx, testGrpcServer, probe.ServiceStatusStopped)
85}
86
87func (s *testCoreServer) stop() {
88 if s.server != nil {
89 s.server.Stop()
90 }
91}
92
93type testClient struct {
94 apiEndPoint string
95 probe *probe.Probe
96 client *Client
97}
98
99func serverRestarted(ctx context.Context, endPoint string) error {
100 logger.Infow(ctx, "remote-restarted", log.Fields{"endpoint": endPoint})
101 return nil
102}
103
104func newTestClient(apiEndpoint string, handler RestartedHandler) *testClient {
105 tc := &testClient{
106 apiEndPoint: apiEndpoint,
107 probe: &probe.Probe{},
108 }
109 // Set the environment variables that this client will use
110 var err error
111 err = os.Setenv(grpcBackoffInitialInterval, initialInterval.String())
112 if err != nil {
113 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcBackoffInitialInterval})
114 return nil
115 }
116 err = os.Setenv(grpcBackoffInitialInterval, maxInterval.String())
117 if err != nil {
118 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcBackoffInitialInterval})
119 return nil
120 }
121 err = os.Setenv(grpcBackoffMaxElapsedTime, maxElapsedTime.String())
122 if err != nil {
123 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcBackoffMaxElapsedTime})
124 return nil
125 }
126
127 err = os.Setenv(grpcMonitorInterval, monitorInterval.String())
128 if err != nil {
129 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcMonitorInterval})
130 return nil
131 }
132
khenaidoob9503212021-12-08 14:22:21 -0500133 tc.client, err = NewClient(
134 "test-endpoint",
135 apiEndpoint,
136 handler)
khenaidoo26721882021-08-11 17:42:52 -0400137 if err != nil {
138 return nil
139 }
140 return tc
141}
142
khenaidoob9503212021-12-08 14:22:21 -0500143func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
khenaidoo26721882021-08-11 17:42:52 -0400144 if conn == nil {
145 return nil
146 }
khenaidooa5feb8e2021-10-19 17:29:22 -0400147 svc := core_service.NewCoreServiceClient(conn)
khenaidoob9503212021-12-08 14:22:21 -0500148 if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
khenaidoo26721882021-08-11 17:42:52 -0400149 return nil
150 }
151 return svc
152}
153
khenaidoob9503212021-12-08 14:22:21 -0500154func idleConnectionTest(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
khenaidoo26721882021-08-11 17:42:52 -0400155 if conn == nil {
156 return nil
157 }
khenaidooa5feb8e2021-10-19 17:29:22 -0400158 svc := core_service.NewCoreServiceClient(conn)
khenaidoob9503212021-12-08 14:22:21 -0500159 if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
khenaidoo26721882021-08-11 17:42:52 -0400160 return nil
161 }
162 testForNoActivityCh <- time.Now()
163 return svc
164}
165
166func (c *testClient) start(ctx context.Context, t *testing.T, handler SetAndTestServiceHandler) {
167 assert.NotNil(t, c.client)
168
169 probePort, err := freeport.GetFreePort()
170 assert.Nil(t, err)
171 probeEndpoint := "127.0.0.1:" + strconv.Itoa(probePort)
172 go c.probe.ListenAndServe(ctx, probeEndpoint)
173
174 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, c.probe)
175 c.client.Start(probeCtx, handler)
176}
177
khenaidooa5feb8e2021-10-19 17:29:22 -0400178func (c *testClient) getClient(t *testing.T) core_service.CoreServiceClient {
khenaidoo26721882021-08-11 17:42:52 -0400179 gc, err := c.client.GetClient()
180 assert.Nil(t, err)
khenaidooa5feb8e2021-10-19 17:29:22 -0400181 coreClient, ok := gc.(core_service.CoreServiceClient)
khenaidoo26721882021-08-11 17:42:52 -0400182 assert.True(t, ok)
183 return coreClient
184}
185
186func serverStartsFirstTest(t *testing.T) {
187 // Setup
188 ctx, cancel := context.WithCancel(context.Background())
189 defer cancel()
190
191 // Create and start the test server
192 grpcPort, err := freeport.GetFreePort()
193 assert.Nil(t, err)
194 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
195 ts := newTestCoreServer(apiEndpoint)
196 ts.registerService(ctx, t)
197 go ts.start(ctx, t)
198
199 // Create the test client and start it
200 tc := newTestClient(apiEndpoint, serverRestarted)
201 assert.NotNil(t, tc)
202 go tc.start(ctx, t, setAndTestCoreServiceHandler)
203
204 // Test 1: Verify that probe status shows ready eventually
205 var servicesReady isConditionSatisfied = func() bool {
206 return ts.probe.IsReady() && tc.probe.IsReady()
207 }
208 err = waitUntilCondition(timeout, servicesReady)
209 assert.Nil(t, err)
210
211 // Test 2: Verify we get a valid client and can make grpc requests with it
212 coreClient := tc.getClient(t)
213 assert.NotNil(t, coreClient)
214
215 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
216 assert.Nil(t, err)
217 assert.NotNil(t, device)
218 assert.Equal(t, "test-1234", device.Type)
219}
220
221func clientStartsFirstTest(t *testing.T) {
222 ctx, cancel := context.WithCancel(context.Background())
223 defer cancel()
224
225 // Create a grpc endpoint for the server
226 grpcPort, err := freeport.GetFreePort()
227 assert.Nil(t, err)
228 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
229
230 // Create the test client and start it
231 tc := newTestClient(apiEndpoint, serverRestarted)
232 assert.NotNil(t, tc)
233 go tc.start(ctx, t, setAndTestCoreServiceHandler)
234
235 // Verify client is not ready
236 var clientNotReady isConditionSatisfied = func() bool {
237 serviceStatus := tc.probe.GetStatus(apiEndpoint)
238 return serviceStatus == probe.ServiceStatusNotReady ||
239 serviceStatus == probe.ServiceStatusPreparing ||
240 serviceStatus == probe.ServiceStatusFailed
241 }
242 err = waitUntilCondition(timeout, clientNotReady)
243 assert.Nil(t, err)
244
245 // Create and start the test server
246 ts := newTestCoreServer(apiEndpoint)
247 ts.registerService(ctx, t)
248 go ts.start(ctx, t)
249
250 // Test 1: Verify that probe status shows ready eventually
251 var servicesReady isConditionSatisfied = func() bool {
252 return ts.probe.IsReady() && tc.probe.IsReady()
253 }
254 err = waitUntilCondition(timeout, servicesReady)
255 assert.Nil(t, err)
256
257 // Test 2: Verify we get a valid client and can make grpc requests with it
258 coreClient := tc.getClient(t)
259 assert.NotNil(t, coreClient)
260
261 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
262 assert.Nil(t, err)
263 assert.NotNil(t, device)
264 assert.Equal(t, "test-1234", device.Type)
265}
266
267// Liveness function
268func livessness(timestamp time.Time) {
269 logger.Debugw(context.Background(), "received-liveness", log.Fields{"timestamp": timestamp})
270}
271
272func serverRestarts(t *testing.T, numRestartRuns int) {
273 // Setup
274 ctx, cancel := context.WithCancel(context.Background())
275 defer cancel()
276
277 // Create and start the test server
278 grpcPort, err := freeport.GetFreePort()
279 assert.Nil(t, err)
280 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
281 ts := newTestCoreServer(apiEndpoint)
282 ts.registerService(ctx, t)
283 go ts.start(ctx, t)
284
285 // Create the test client and start it
286 tc := newTestClient(apiEndpoint, serverRestarted)
287 assert.NotNil(t, tc)
288
289 // Subscribe for liveness
290 tc.client.SubscribeForLiveness(livessness)
291 go tc.start(ctx, t, setAndTestCoreServiceHandler)
292
293 // Test 1: Verify that probe status shows ready eventually
294 var servicesReady isConditionSatisfied = func() bool {
295 return ts.probe.IsReady() && tc.probe.IsReady()
296 }
297 err = waitUntilCondition(timeout, servicesReady)
298 assert.Nil(t, err)
299
300 // Test 2: Verify we get a valid client and can make grpc requests with it
301 coreClient := tc.getClient(t)
302 assert.NotNil(t, coreClient)
303
304 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
305 assert.Nil(t, err)
306 assert.NotNil(t, device)
307 assert.Equal(t, "test-1234", device.Type)
308
309 for i := 1; i <= numRestartRuns; i++ {
310 //Test 3: Stop server and verify server status
311 ts.stop()
312 var serverDown isConditionSatisfied = func() bool {
313 return ts.probe.GetStatus(testGrpcServer) == probe.ServiceStatusStopped
314 }
315 err = waitUntilCondition(timeout, serverDown)
316 assert.Nil(t, err)
317
318 // Make a grpc request - this will detect the server being down and automatically trigger the grpc client
319 // to reconnect
320 _, err = coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
321 assert.NotNil(t, err)
322
323 // Wait until the client service shows as not ready. A wait is not needed. It's just to verify that the
324 // client changes connection state.
325 var clientNotReady isConditionSatisfied = func() bool {
326 serviceStatus := tc.probe.GetStatus(apiEndpoint)
327 return serviceStatus == probe.ServiceStatusNotReady ||
328 serviceStatus == probe.ServiceStatusPreparing ||
329 serviceStatus == probe.ServiceStatusFailed
330 }
331 err = waitUntilCondition(timeout, clientNotReady)
332
333 assert.Nil(t, err)
334
335 // Keep the server down for 1/2 second
336 time.Sleep(500 * time.Millisecond)
337
338 // Test 4: Restart the server and verify the server is back online
339 go ts.start(ctx, t)
340 err = waitUntilCondition(timeout, servicesReady)
341 assert.Nil(t, err)
342
343 // Test 5: verify we can pull new device with a new client instance
344 coreClient = tc.getClient(t)
345 assert.NotNil(t, coreClient)
346 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
347 assert.Nil(t, err)
348 assert.Equal(t, "test-1234", device.Type)
349 }
350 // Stop the server
351 ts.stop()
352}
353
354func testNoActivity(t *testing.T) {
355 ctx, cancel := context.WithCancel(context.Background())
356 defer cancel()
357
358 // Create a grpc endpoint for the server
359 grpcPort, err := freeport.GetFreePort()
360 assert.Nil(t, err)
361 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
362
363 // Create the test client and start it
364 tc := newTestClient(apiEndpoint, serverRestarted)
365 assert.NotNil(t, tc)
366 go tc.start(ctx, t, idleConnectionTest)
367
368 // Create and start the test server
369 ts := newTestCoreServer(apiEndpoint)
370 ts.registerService(ctx, t)
371 go ts.start(ctx, t)
372
373 // Test 1: Verify that probe status shows ready eventually
374 var servicesReady isConditionSatisfied = func() bool {
375 return ts.probe.IsReady() && tc.probe.IsReady()
376 }
377 err = waitUntilCondition(timeout, servicesReady)
378 assert.Nil(t, err)
379
380 // Test 2: Verify we get a valid client and can make grpc requests with it
381 coreClient := tc.getClient(t)
382 assert.NotNil(t, coreClient)
383
384 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
385 assert.Nil(t, err)
386 assert.NotNil(t, device)
387 assert.Equal(t, "test-1234", device.Type)
388
389 start := time.Now()
390 numChecks := 3 // Test for 3 checks
391 // Wait on the the idle channel - on no activity a connection probe will be attempted by the client
392 timer := time.NewTimer((monitorInterval + 1*time.Second) * time.Duration(numChecks))
393 defer timer.Stop()
394 count := 0
395loop:
396 for {
397 select {
398 case timestamp := <-testForNoActivityCh:
399 if timestamp.After(start) {
400 count += 1
401 if count > numChecks {
402 break loop
403 }
404 }
405 case <-timer.C:
406 t.Fatal("no activity on the idle channel")
407 }
408 }
409}
410
khenaidoofe90ac32021-11-08 18:17:32 -0500411func testClientFailure(t *testing.T, numClientRestarts int) {
412 ctx, cancel := context.WithCancel(context.Background())
413 defer cancel()
414 // Create a grpc endpoint for the server
415 grpcPort, err := freeport.GetFreePort()
416 assert.Nil(t, err)
417 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
418 // Create the test client and start it
419 tc := newTestClient(apiEndpoint, serverRestarted)
420 assert.NotNil(t, tc)
421 go tc.start(ctx, t, idleConnectionTest)
422 // Create and start the test server
423 ts := newTestCoreServer(apiEndpoint)
424 ts.registerService(ctx, t)
425 go ts.start(ctx, t)
426 defer ts.stop()
427 // Test 1: Verify that probe status shows ready eventually
428 var servicesReady isConditionSatisfied = func() bool {
429 return ts.probe.IsReady() && tc.probe.IsReady()
430 }
431 err = waitUntilCondition(timeout, servicesReady)
432 assert.Nil(t, err)
433 // Test 2: Verify we get a valid client and can make grpc requests with it
434 coreClient := tc.getClient(t)
435 assert.NotNil(t, coreClient)
436 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
437 assert.Nil(t, err)
438 assert.NotNil(t, device)
439 assert.Equal(t, "test-1234", device.Type)
440 for i := 1; i <= numClientRestarts; i++ {
441 // Kill grpc client
442 tc.client.Stop(context.Background())
443 var clientNotReady isConditionSatisfied = func() bool {
444 return !tc.probe.IsReady()
445 }
446 err = waitUntilCondition(timeout, clientNotReady)
447 assert.Nil(t, err)
448 // Create a new client
449 tc.client, err = NewClient(
khenaidoob9503212021-12-08 14:22:21 -0500450 "test-ednpoint",
khenaidoofe90ac32021-11-08 18:17:32 -0500451 apiEndpoint,
khenaidoob9503212021-12-08 14:22:21 -0500452 serverRestarted)
khenaidoofe90ac32021-11-08 18:17:32 -0500453 assert.Nil(t, err)
454 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, tc.probe)
455 go tc.client.Start(probeCtx, idleConnectionTest)
456 //Verify that probe status shows ready eventually
457 err = waitUntilCondition(timeout, servicesReady)
458 assert.Nil(t, err)
459 // Verify we get a valid client and can make grpc requests with it
460 coreClient = tc.getClient(t)
461 assert.NotNil(t, coreClient)
462 device, err = coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
463 assert.Nil(t, err)
464 assert.NotNil(t, device)
465 assert.Equal(t, "test-1234", device.Type)
466 }
467 tc.client.Stop(context.Background())
468}
469
khenaidoo26721882021-08-11 17:42:52 -0400470func testServerLimit(t *testing.T) {
471 t.Skip() // Not needed for regular unit tests
472
473 ctx, cancel := context.WithCancel(context.Background())
474 defer cancel()
475
476 // Create a grpc endpoint for the server
477 grpcPort, err := freeport.GetFreePort()
478 assert.Nil(t, err)
479 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
480
481 // Create the test client and start it
482 tc := newTestClient(apiEndpoint, serverRestarted)
483 assert.NotNil(t, tc)
484 go tc.start(ctx, t, idleConnectionTest)
485
486 // Create and start the test server
487 ts := newTestCoreServer(apiEndpoint)
488 ts.registerService(ctx, t)
489 go ts.start(ctx, t)
490
491 // Test 1: Verify that probe status shows ready eventually
492 var servicesReady isConditionSatisfied = func() bool {
493 return ts.probe.IsReady() && tc.probe.IsReady()
494 }
495 err = waitUntilCondition(timeout, servicesReady)
496 assert.Nil(t, err)
497
498 // Test 2: Verify we get a valid client and can make grpc requests with it
499 coreClient := tc.getClient(t)
500 assert.NotNil(t, coreClient)
501
502 var lock sync.RWMutex
503 bad := []time.Duration{}
504 bad_err := []string{}
505 good := []time.Duration{}
506 var wg sync.WaitGroup
507 numRPCs := 10
508 total_good := time.Duration(0)
509 max_good := time.Duration(0)
510 total_bad := time.Duration(0)
511 max_bad := time.Duration(0)
512 order_received := []uint32{}
513 for i := 1; i <= numRPCs; i++ {
514 wg.Add(1)
515 go func(seq int) {
516 local := time.Now()
517 ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Millisecond)
518 defer cancel()
519 var err error
520 var d *voltha.Device
521 d, err = coreClient.GetDevice(ctx, &common.ID{Id: strconv.Itoa(seq)})
522 if err != nil {
523 lock.Lock()
524 bad = append(bad, time.Since(local))
525 bad_err = append(bad_err, err.Error())
526 total_bad += time.Since(local)
527 if time.Since(local) > max_bad {
528 max_bad = time.Since(local)
529 }
530 logger.Errorw(ctx, "error produced", log.Fields{"error": err})
531 lock.Unlock()
532 } else {
533 lock.Lock()
534 good = append(good, time.Since(local))
535 total_good += time.Since(local)
536 if time.Since(local) > max_good {
537 max_good = time.Since(local)
538 }
539 if d != nil {
540 order_received = append(order_received, d.Vlan)
541 }
542 lock.Unlock()
543 }
544 wg.Done()
545 }(i)
546 }
547 wg.Wait()
548 assert.Equal(t, 0, len(bad))
549 assert.Equal(t, numRPCs, len(good))
550 //fmt.Println("Bad:", bad[:10])
551 if len(bad_err) > 0 {
552 fmt.Println("Bad Err Last:", bad_err[len(bad_err)-1:])
553 fmt.Println("Bad Err First:", bad_err[:1])
554 }
555 fmt.Println("Good:", good[len(good)-10:])
556 fmt.Println("Good average time:", total_good.Milliseconds()/int64(numRPCs))
557 fmt.Println("Bad average time:", total_bad.Milliseconds()/int64(numRPCs))
558 fmt.Println("Bad Max:", max_bad)
559 fmt.Println("Good Max:", max_good)
560 //fmt.Println("Order received:", order_received)
561
562 prev := order_received[0]
563
564 for i := 1; i < len(order_received); i++ {
565 if order_received[i] < prev {
566 fmt.Println("Prev:", prev, " curr:", order_received[i])
567 }
568 prev = order_received[i]
569 }
570}
571
572func TestSuiteClient3(t *testing.T) {
573 // Setup
574 log.SetAllLogLevel(volthaTestLogLevel)
575
576 // Test starting server before client
577 serverStartsFirstTest(t)
578
579 // Test starting client before server
580 clientStartsFirstTest(t)
581
582 // Test server restarts
583 serverRestarts(t, 1)
584
585 //Test that the client test the grpc connection on no activity
586 testNoActivity(t)
587
588 // Test client queueing with server limit
589 testServerLimit(t)
khenaidoofe90ac32021-11-08 18:17:32 -0500590
591 // Test the scenario where a client restarts
592 testClientFailure(t, 10)
khenaidoo26721882021-08-11 17:42:52 -0400593}