blob: e06ad7bc68e1649dcdd2f313cf001398238a0971 [file] [log] [blame]
khenaidoo26721882021-08-11 17:42:52 -04001/*
2 * Copyright 2021-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package grpc
17
18import (
19 "context"
20 "fmt"
21 "os"
22 "strconv"
23 "sync"
24 "testing"
25 "time"
26
27 "github.com/golang/protobuf/ptypes/empty"
28 "github.com/opencord/voltha-lib-go/v7/pkg/log"
29 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
30 "github.com/opencord/voltha-protos/v5/go/common"
khenaidooa5feb8e2021-10-19 17:29:22 -040031 "github.com/opencord/voltha-protos/v5/go/core_service"
32 "github.com/opencord/voltha-protos/v5/go/health"
khenaidoo26721882021-08-11 17:42:52 -040033 "github.com/opencord/voltha-protos/v5/go/voltha"
34 "github.com/phayes/freeport"
35 "github.com/stretchr/testify/assert"
36 "google.golang.org/grpc"
37)
38
39const (
40 testGrpcServer = "test-grpc-server"
41 initialInterval = 100 * time.Millisecond
42 maxInterval = 5000 * time.Millisecond
43 maxElapsedTime = 0 * time.Millisecond
44 monitorInterval = 2 * time.Second
45 timeout = 10 * time.Second
46)
47
48var testForNoActivityCh = make(chan time.Time, 10)
49
50type testCoreServer struct {
51 apiEndPoint string
52 server *GrpcServer
53 probe *probe.Probe
54}
55
56func newTestCoreServer(apiEndpoint string) *testCoreServer {
57 return &testCoreServer{
58 apiEndPoint: apiEndpoint,
59 probe: &probe.Probe{},
60 }
61}
62
63func (s *testCoreServer) registerService(ctx context.Context, t *testing.T) {
64 assert.NotEqual(t, "", s.apiEndPoint)
65
66 probePort, err := freeport.GetFreePort()
67 assert.Nil(t, err)
68 probeEndpoint := "127.0.0.1:" + strconv.Itoa(probePort)
69 go s.probe.ListenAndServe(ctx, probeEndpoint)
70 s.probe.RegisterService(ctx, testGrpcServer)
71
72 s.server = NewGrpcServer(s.apiEndPoint, nil, false, s.probe)
73
74 s.server.AddService(func(server *grpc.Server) {
khenaidooa5feb8e2021-10-19 17:29:22 -040075 core_service.RegisterCoreServiceServer(server, &MockCoreServiceHandler{})
khenaidoo26721882021-08-11 17:42:52 -040076 })
77}
78
79func (s *testCoreServer) start(ctx context.Context, t *testing.T) {
80 assert.NotNil(t, s.server)
81 assert.NotEqual(t, "", s.apiEndPoint)
82
83 s.probe.UpdateStatus(ctx, testGrpcServer, probe.ServiceStatusRunning)
84 s.server.Start(ctx)
85 s.probe.UpdateStatus(ctx, testGrpcServer, probe.ServiceStatusStopped)
86}
87
88func (s *testCoreServer) stop() {
89 if s.server != nil {
90 s.server.Stop()
91 }
92}
93
94type testClient struct {
95 apiEndPoint string
96 probe *probe.Probe
97 client *Client
98}
99
100func serverRestarted(ctx context.Context, endPoint string) error {
101 logger.Infow(ctx, "remote-restarted", log.Fields{"endpoint": endPoint})
102 return nil
103}
104
105func newTestClient(apiEndpoint string, handler RestartedHandler) *testClient {
106 tc := &testClient{
107 apiEndPoint: apiEndpoint,
108 probe: &probe.Probe{},
109 }
110 // Set the environment variables that this client will use
111 var err error
112 err = os.Setenv(grpcBackoffInitialInterval, initialInterval.String())
113 if err != nil {
114 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcBackoffInitialInterval})
115 return nil
116 }
117 err = os.Setenv(grpcBackoffInitialInterval, maxInterval.String())
118 if err != nil {
119 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcBackoffInitialInterval})
120 return nil
121 }
122 err = os.Setenv(grpcBackoffMaxElapsedTime, maxElapsedTime.String())
123 if err != nil {
124 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcBackoffMaxElapsedTime})
125 return nil
126 }
127
128 err = os.Setenv(grpcMonitorInterval, monitorInterval.String())
129 if err != nil {
130 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcMonitorInterval})
131 return nil
132 }
133
134 tc.client, err = NewClient(apiEndpoint,
135 handler,
136 ActivityCheck(true))
137 if err != nil {
138 return nil
139 }
140 return tc
141}
142
143func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
144 if conn == nil {
145 return nil
146 }
khenaidooa5feb8e2021-10-19 17:29:22 -0400147 svc := core_service.NewCoreServiceClient(conn)
148 if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != health.HealthStatus_HEALTHY {
khenaidoo26721882021-08-11 17:42:52 -0400149 return nil
150 }
151 return svc
152}
153
154func idleConnectionTest(ctx context.Context, conn *grpc.ClientConn) interface{} {
155 if conn == nil {
156 return nil
157 }
khenaidooa5feb8e2021-10-19 17:29:22 -0400158 svc := core_service.NewCoreServiceClient(conn)
159 if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != health.HealthStatus_HEALTHY {
khenaidoo26721882021-08-11 17:42:52 -0400160 return nil
161 }
162 testForNoActivityCh <- time.Now()
163 return svc
164}
165
166func (c *testClient) start(ctx context.Context, t *testing.T, handler SetAndTestServiceHandler) {
167 assert.NotNil(t, c.client)
168
169 probePort, err := freeport.GetFreePort()
170 assert.Nil(t, err)
171 probeEndpoint := "127.0.0.1:" + strconv.Itoa(probePort)
172 go c.probe.ListenAndServe(ctx, probeEndpoint)
173
174 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, c.probe)
175 c.client.Start(probeCtx, handler)
176}
177
khenaidooa5feb8e2021-10-19 17:29:22 -0400178func (c *testClient) getClient(t *testing.T) core_service.CoreServiceClient {
khenaidoo26721882021-08-11 17:42:52 -0400179 gc, err := c.client.GetClient()
180 assert.Nil(t, err)
khenaidooa5feb8e2021-10-19 17:29:22 -0400181 coreClient, ok := gc.(core_service.CoreServiceClient)
khenaidoo26721882021-08-11 17:42:52 -0400182 assert.True(t, ok)
183 return coreClient
184}
185
186func serverStartsFirstTest(t *testing.T) {
187 // Setup
188 ctx, cancel := context.WithCancel(context.Background())
189 defer cancel()
190
191 // Create and start the test server
192 grpcPort, err := freeport.GetFreePort()
193 assert.Nil(t, err)
194 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
195 ts := newTestCoreServer(apiEndpoint)
196 ts.registerService(ctx, t)
197 go ts.start(ctx, t)
198
199 // Create the test client and start it
200 tc := newTestClient(apiEndpoint, serverRestarted)
201 assert.NotNil(t, tc)
202 go tc.start(ctx, t, setAndTestCoreServiceHandler)
203
204 // Test 1: Verify that probe status shows ready eventually
205 var servicesReady isConditionSatisfied = func() bool {
206 return ts.probe.IsReady() && tc.probe.IsReady()
207 }
208 err = waitUntilCondition(timeout, servicesReady)
209 assert.Nil(t, err)
210
211 // Test 2: Verify we get a valid client and can make grpc requests with it
212 coreClient := tc.getClient(t)
213 assert.NotNil(t, coreClient)
214
215 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
216 assert.Nil(t, err)
217 assert.NotNil(t, device)
218 assert.Equal(t, "test-1234", device.Type)
219}
220
221func clientStartsFirstTest(t *testing.T) {
222 ctx, cancel := context.WithCancel(context.Background())
223 defer cancel()
224
225 // Create a grpc endpoint for the server
226 grpcPort, err := freeport.GetFreePort()
227 assert.Nil(t, err)
228 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
229
230 // Create the test client and start it
231 tc := newTestClient(apiEndpoint, serverRestarted)
232 assert.NotNil(t, tc)
233 go tc.start(ctx, t, setAndTestCoreServiceHandler)
234
235 // Verify client is not ready
236 var clientNotReady isConditionSatisfied = func() bool {
237 serviceStatus := tc.probe.GetStatus(apiEndpoint)
238 return serviceStatus == probe.ServiceStatusNotReady ||
239 serviceStatus == probe.ServiceStatusPreparing ||
240 serviceStatus == probe.ServiceStatusFailed
241 }
242 err = waitUntilCondition(timeout, clientNotReady)
243 assert.Nil(t, err)
244
245 // Create and start the test server
246 ts := newTestCoreServer(apiEndpoint)
247 ts.registerService(ctx, t)
248 go ts.start(ctx, t)
249
250 // Test 1: Verify that probe status shows ready eventually
251 var servicesReady isConditionSatisfied = func() bool {
252 return ts.probe.IsReady() && tc.probe.IsReady()
253 }
254 err = waitUntilCondition(timeout, servicesReady)
255 assert.Nil(t, err)
256
257 // Test 2: Verify we get a valid client and can make grpc requests with it
258 coreClient := tc.getClient(t)
259 assert.NotNil(t, coreClient)
260
261 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
262 assert.Nil(t, err)
263 assert.NotNil(t, device)
264 assert.Equal(t, "test-1234", device.Type)
265}
266
267// Liveness function
268func livessness(timestamp time.Time) {
269 logger.Debugw(context.Background(), "received-liveness", log.Fields{"timestamp": timestamp})
270}
271
272func serverRestarts(t *testing.T, numRestartRuns int) {
273 // Setup
274 ctx, cancel := context.WithCancel(context.Background())
275 defer cancel()
276
277 // Create and start the test server
278 grpcPort, err := freeport.GetFreePort()
279 assert.Nil(t, err)
280 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
281 ts := newTestCoreServer(apiEndpoint)
282 ts.registerService(ctx, t)
283 go ts.start(ctx, t)
284
285 // Create the test client and start it
286 tc := newTestClient(apiEndpoint, serverRestarted)
287 assert.NotNil(t, tc)
288
289 // Subscribe for liveness
290 tc.client.SubscribeForLiveness(livessness)
291 go tc.start(ctx, t, setAndTestCoreServiceHandler)
292
293 // Test 1: Verify that probe status shows ready eventually
294 var servicesReady isConditionSatisfied = func() bool {
295 return ts.probe.IsReady() && tc.probe.IsReady()
296 }
297 err = waitUntilCondition(timeout, servicesReady)
298 assert.Nil(t, err)
299
300 // Test 2: Verify we get a valid client and can make grpc requests with it
301 coreClient := tc.getClient(t)
302 assert.NotNil(t, coreClient)
303
304 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
305 assert.Nil(t, err)
306 assert.NotNil(t, device)
307 assert.Equal(t, "test-1234", device.Type)
308
309 for i := 1; i <= numRestartRuns; i++ {
310 //Test 3: Stop server and verify server status
311 ts.stop()
312 var serverDown isConditionSatisfied = func() bool {
313 return ts.probe.GetStatus(testGrpcServer) == probe.ServiceStatusStopped
314 }
315 err = waitUntilCondition(timeout, serverDown)
316 assert.Nil(t, err)
317
318 // Make a grpc request - this will detect the server being down and automatically trigger the grpc client
319 // to reconnect
320 _, err = coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
321 assert.NotNil(t, err)
322
323 // Wait until the client service shows as not ready. A wait is not needed. It's just to verify that the
324 // client changes connection state.
325 var clientNotReady isConditionSatisfied = func() bool {
326 serviceStatus := tc.probe.GetStatus(apiEndpoint)
327 return serviceStatus == probe.ServiceStatusNotReady ||
328 serviceStatus == probe.ServiceStatusPreparing ||
329 serviceStatus == probe.ServiceStatusFailed
330 }
331 err = waitUntilCondition(timeout, clientNotReady)
332
333 assert.Nil(t, err)
334
335 // Keep the server down for 1/2 second
336 time.Sleep(500 * time.Millisecond)
337
338 // Test 4: Restart the server and verify the server is back online
339 go ts.start(ctx, t)
340 err = waitUntilCondition(timeout, servicesReady)
341 assert.Nil(t, err)
342
343 // Test 5: verify we can pull new device with a new client instance
344 coreClient = tc.getClient(t)
345 assert.NotNil(t, coreClient)
346 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
347 assert.Nil(t, err)
348 assert.Equal(t, "test-1234", device.Type)
349 }
350 // Stop the server
351 ts.stop()
352}
353
354func testNoActivity(t *testing.T) {
355 ctx, cancel := context.WithCancel(context.Background())
356 defer cancel()
357
358 // Create a grpc endpoint for the server
359 grpcPort, err := freeport.GetFreePort()
360 assert.Nil(t, err)
361 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
362
363 // Create the test client and start it
364 tc := newTestClient(apiEndpoint, serverRestarted)
365 assert.NotNil(t, tc)
366 go tc.start(ctx, t, idleConnectionTest)
367
368 // Create and start the test server
369 ts := newTestCoreServer(apiEndpoint)
370 ts.registerService(ctx, t)
371 go ts.start(ctx, t)
372
373 // Test 1: Verify that probe status shows ready eventually
374 var servicesReady isConditionSatisfied = func() bool {
375 return ts.probe.IsReady() && tc.probe.IsReady()
376 }
377 err = waitUntilCondition(timeout, servicesReady)
378 assert.Nil(t, err)
379
380 // Test 2: Verify we get a valid client and can make grpc requests with it
381 coreClient := tc.getClient(t)
382 assert.NotNil(t, coreClient)
383
384 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
385 assert.Nil(t, err)
386 assert.NotNil(t, device)
387 assert.Equal(t, "test-1234", device.Type)
388
389 start := time.Now()
390 numChecks := 3 // Test for 3 checks
391 // Wait on the the idle channel - on no activity a connection probe will be attempted by the client
392 timer := time.NewTimer((monitorInterval + 1*time.Second) * time.Duration(numChecks))
393 defer timer.Stop()
394 count := 0
395loop:
396 for {
397 select {
398 case timestamp := <-testForNoActivityCh:
399 if timestamp.After(start) {
400 count += 1
401 if count > numChecks {
402 break loop
403 }
404 }
405 case <-timer.C:
406 t.Fatal("no activity on the idle channel")
407 }
408 }
409}
410
411func testServerLimit(t *testing.T) {
412 t.Skip() // Not needed for regular unit tests
413
414 ctx, cancel := context.WithCancel(context.Background())
415 defer cancel()
416
417 // Create a grpc endpoint for the server
418 grpcPort, err := freeport.GetFreePort()
419 assert.Nil(t, err)
420 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
421
422 // Create the test client and start it
423 tc := newTestClient(apiEndpoint, serverRestarted)
424 assert.NotNil(t, tc)
425 go tc.start(ctx, t, idleConnectionTest)
426
427 // Create and start the test server
428 ts := newTestCoreServer(apiEndpoint)
429 ts.registerService(ctx, t)
430 go ts.start(ctx, t)
431
432 // Test 1: Verify that probe status shows ready eventually
433 var servicesReady isConditionSatisfied = func() bool {
434 return ts.probe.IsReady() && tc.probe.IsReady()
435 }
436 err = waitUntilCondition(timeout, servicesReady)
437 assert.Nil(t, err)
438
439 // Test 2: Verify we get a valid client and can make grpc requests with it
440 coreClient := tc.getClient(t)
441 assert.NotNil(t, coreClient)
442
443 var lock sync.RWMutex
444 bad := []time.Duration{}
445 bad_err := []string{}
446 good := []time.Duration{}
447 var wg sync.WaitGroup
448 numRPCs := 10
449 total_good := time.Duration(0)
450 max_good := time.Duration(0)
451 total_bad := time.Duration(0)
452 max_bad := time.Duration(0)
453 order_received := []uint32{}
454 for i := 1; i <= numRPCs; i++ {
455 wg.Add(1)
456 go func(seq int) {
457 local := time.Now()
458 ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Millisecond)
459 defer cancel()
460 var err error
461 var d *voltha.Device
462 d, err = coreClient.GetDevice(ctx, &common.ID{Id: strconv.Itoa(seq)})
463 if err != nil {
464 lock.Lock()
465 bad = append(bad, time.Since(local))
466 bad_err = append(bad_err, err.Error())
467 total_bad += time.Since(local)
468 if time.Since(local) > max_bad {
469 max_bad = time.Since(local)
470 }
471 logger.Errorw(ctx, "error produced", log.Fields{"error": err})
472 lock.Unlock()
473 } else {
474 lock.Lock()
475 good = append(good, time.Since(local))
476 total_good += time.Since(local)
477 if time.Since(local) > max_good {
478 max_good = time.Since(local)
479 }
480 if d != nil {
481 order_received = append(order_received, d.Vlan)
482 }
483 lock.Unlock()
484 }
485 wg.Done()
486 }(i)
487 }
488 wg.Wait()
489 assert.Equal(t, 0, len(bad))
490 assert.Equal(t, numRPCs, len(good))
491 //fmt.Println("Bad:", bad[:10])
492 if len(bad_err) > 0 {
493 fmt.Println("Bad Err Last:", bad_err[len(bad_err)-1:])
494 fmt.Println("Bad Err First:", bad_err[:1])
495 }
496 fmt.Println("Good:", good[len(good)-10:])
497 fmt.Println("Good average time:", total_good.Milliseconds()/int64(numRPCs))
498 fmt.Println("Bad average time:", total_bad.Milliseconds()/int64(numRPCs))
499 fmt.Println("Bad Max:", max_bad)
500 fmt.Println("Good Max:", max_good)
501 //fmt.Println("Order received:", order_received)
502
503 prev := order_received[0]
504
505 for i := 1; i < len(order_received); i++ {
506 if order_received[i] < prev {
507 fmt.Println("Prev:", prev, " curr:", order_received[i])
508 }
509 prev = order_received[i]
510 }
511}
512
513func TestSuiteClient3(t *testing.T) {
514 // Setup
515 log.SetAllLogLevel(volthaTestLogLevel)
516
517 // Test starting server before client
518 serverStartsFirstTest(t)
519
520 // Test starting client before server
521 clientStartsFirstTest(t)
522
523 // Test server restarts
524 serverRestarts(t, 1)
525
526 //Test that the client test the grpc connection on no activity
527 testNoActivity(t)
528
529 // Test client queueing with server limit
530 testServerLimit(t)
531}