blob: d8ea31df4c838901f3d5b940dc187948b5a7ff80 [file] [log] [blame]
khenaidoo26721882021-08-11 17:42:52 -04001/*
2 * Copyright 2021-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package grpc
17
18import (
19 "context"
20 "fmt"
21 "os"
22 "strconv"
23 "sync"
24 "testing"
25 "time"
26
27 "github.com/golang/protobuf/ptypes/empty"
28 "github.com/opencord/voltha-lib-go/v7/pkg/log"
29 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
30 "github.com/opencord/voltha-protos/v5/go/common"
31 "github.com/opencord/voltha-protos/v5/go/core"
32 "github.com/opencord/voltha-protos/v5/go/voltha"
33 "github.com/phayes/freeport"
34 "github.com/stretchr/testify/assert"
35 "google.golang.org/grpc"
36)
37
38const (
39 testGrpcServer = "test-grpc-server"
40 initialInterval = 100 * time.Millisecond
41 maxInterval = 5000 * time.Millisecond
42 maxElapsedTime = 0 * time.Millisecond
43 monitorInterval = 2 * time.Second
44 timeout = 10 * time.Second
45)
46
47var testForNoActivityCh = make(chan time.Time, 10)
48
49type testCoreServer struct {
50 apiEndPoint string
51 server *GrpcServer
52 probe *probe.Probe
53}
54
55func newTestCoreServer(apiEndpoint string) *testCoreServer {
56 return &testCoreServer{
57 apiEndPoint: apiEndpoint,
58 probe: &probe.Probe{},
59 }
60}
61
62func (s *testCoreServer) registerService(ctx context.Context, t *testing.T) {
63 assert.NotEqual(t, "", s.apiEndPoint)
64
65 probePort, err := freeport.GetFreePort()
66 assert.Nil(t, err)
67 probeEndpoint := "127.0.0.1:" + strconv.Itoa(probePort)
68 go s.probe.ListenAndServe(ctx, probeEndpoint)
69 s.probe.RegisterService(ctx, testGrpcServer)
70
71 s.server = NewGrpcServer(s.apiEndPoint, nil, false, s.probe)
72
73 s.server.AddService(func(server *grpc.Server) {
74 core.RegisterCoreServiceServer(server, &MockCoreServiceHandler{})
75 })
76}
77
78func (s *testCoreServer) start(ctx context.Context, t *testing.T) {
79 assert.NotNil(t, s.server)
80 assert.NotEqual(t, "", s.apiEndPoint)
81
82 s.probe.UpdateStatus(ctx, testGrpcServer, probe.ServiceStatusRunning)
83 s.server.Start(ctx)
84 s.probe.UpdateStatus(ctx, testGrpcServer, probe.ServiceStatusStopped)
85}
86
87func (s *testCoreServer) stop() {
88 if s.server != nil {
89 s.server.Stop()
90 }
91}
92
93type testClient struct {
94 apiEndPoint string
95 probe *probe.Probe
96 client *Client
97}
98
99func serverRestarted(ctx context.Context, endPoint string) error {
100 logger.Infow(ctx, "remote-restarted", log.Fields{"endpoint": endPoint})
101 return nil
102}
103
104func newTestClient(apiEndpoint string, handler RestartedHandler) *testClient {
105 tc := &testClient{
106 apiEndPoint: apiEndpoint,
107 probe: &probe.Probe{},
108 }
109 // Set the environment variables that this client will use
110 var err error
111 err = os.Setenv(grpcBackoffInitialInterval, initialInterval.String())
112 if err != nil {
113 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcBackoffInitialInterval})
114 return nil
115 }
116 err = os.Setenv(grpcBackoffInitialInterval, maxInterval.String())
117 if err != nil {
118 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcBackoffInitialInterval})
119 return nil
120 }
121 err = os.Setenv(grpcBackoffMaxElapsedTime, maxElapsedTime.String())
122 if err != nil {
123 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcBackoffMaxElapsedTime})
124 return nil
125 }
126
127 err = os.Setenv(grpcMonitorInterval, monitorInterval.String())
128 if err != nil {
129 logger.Warnw(context.Background(), "setting-env-variable-failed", log.Fields{"error": err, "variable": grpcMonitorInterval})
130 return nil
131 }
132
133 tc.client, err = NewClient(apiEndpoint,
134 handler,
135 ActivityCheck(true))
136 if err != nil {
137 return nil
138 }
139 return tc
140}
141
142func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
143 if conn == nil {
144 return nil
145 }
146 svc := core.NewCoreServiceClient(conn)
147 if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != voltha.HealthStatus_HEALTHY {
148 return nil
149 }
150 return svc
151}
152
153func idleConnectionTest(ctx context.Context, conn *grpc.ClientConn) interface{} {
154 if conn == nil {
155 return nil
156 }
157 svc := core.NewCoreServiceClient(conn)
158 if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != voltha.HealthStatus_HEALTHY {
159 return nil
160 }
161 testForNoActivityCh <- time.Now()
162 return svc
163}
164
165func (c *testClient) start(ctx context.Context, t *testing.T, handler SetAndTestServiceHandler) {
166 assert.NotNil(t, c.client)
167
168 probePort, err := freeport.GetFreePort()
169 assert.Nil(t, err)
170 probeEndpoint := "127.0.0.1:" + strconv.Itoa(probePort)
171 go c.probe.ListenAndServe(ctx, probeEndpoint)
172
173 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, c.probe)
174 c.client.Start(probeCtx, handler)
175}
176
177func (c *testClient) getClient(t *testing.T) core.CoreServiceClient {
178 gc, err := c.client.GetClient()
179 assert.Nil(t, err)
180 coreClient, ok := gc.(core.CoreServiceClient)
181 assert.True(t, ok)
182 return coreClient
183}
184
185func serverStartsFirstTest(t *testing.T) {
186 // Setup
187 ctx, cancel := context.WithCancel(context.Background())
188 defer cancel()
189
190 // Create and start the test server
191 grpcPort, err := freeport.GetFreePort()
192 assert.Nil(t, err)
193 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
194 ts := newTestCoreServer(apiEndpoint)
195 ts.registerService(ctx, t)
196 go ts.start(ctx, t)
197
198 // Create the test client and start it
199 tc := newTestClient(apiEndpoint, serverRestarted)
200 assert.NotNil(t, tc)
201 go tc.start(ctx, t, setAndTestCoreServiceHandler)
202
203 // Test 1: Verify that probe status shows ready eventually
204 var servicesReady isConditionSatisfied = func() bool {
205 return ts.probe.IsReady() && tc.probe.IsReady()
206 }
207 err = waitUntilCondition(timeout, servicesReady)
208 assert.Nil(t, err)
209
210 // Test 2: Verify we get a valid client and can make grpc requests with it
211 coreClient := tc.getClient(t)
212 assert.NotNil(t, coreClient)
213
214 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
215 assert.Nil(t, err)
216 assert.NotNil(t, device)
217 assert.Equal(t, "test-1234", device.Type)
218}
219
220func clientStartsFirstTest(t *testing.T) {
221 ctx, cancel := context.WithCancel(context.Background())
222 defer cancel()
223
224 // Create a grpc endpoint for the server
225 grpcPort, err := freeport.GetFreePort()
226 assert.Nil(t, err)
227 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
228
229 // Create the test client and start it
230 tc := newTestClient(apiEndpoint, serverRestarted)
231 assert.NotNil(t, tc)
232 go tc.start(ctx, t, setAndTestCoreServiceHandler)
233
234 // Verify client is not ready
235 var clientNotReady isConditionSatisfied = func() bool {
236 serviceStatus := tc.probe.GetStatus(apiEndpoint)
237 return serviceStatus == probe.ServiceStatusNotReady ||
238 serviceStatus == probe.ServiceStatusPreparing ||
239 serviceStatus == probe.ServiceStatusFailed
240 }
241 err = waitUntilCondition(timeout, clientNotReady)
242 assert.Nil(t, err)
243
244 // Create and start the test server
245 ts := newTestCoreServer(apiEndpoint)
246 ts.registerService(ctx, t)
247 go ts.start(ctx, t)
248
249 // Test 1: Verify that probe status shows ready eventually
250 var servicesReady isConditionSatisfied = func() bool {
251 return ts.probe.IsReady() && tc.probe.IsReady()
252 }
253 err = waitUntilCondition(timeout, servicesReady)
254 assert.Nil(t, err)
255
256 // Test 2: Verify we get a valid client and can make grpc requests with it
257 coreClient := tc.getClient(t)
258 assert.NotNil(t, coreClient)
259
260 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
261 assert.Nil(t, err)
262 assert.NotNil(t, device)
263 assert.Equal(t, "test-1234", device.Type)
264}
265
266// Liveness function
267func livessness(timestamp time.Time) {
268 logger.Debugw(context.Background(), "received-liveness", log.Fields{"timestamp": timestamp})
269}
270
271func serverRestarts(t *testing.T, numRestartRuns int) {
272 // Setup
273 ctx, cancel := context.WithCancel(context.Background())
274 defer cancel()
275
276 // Create and start the test server
277 grpcPort, err := freeport.GetFreePort()
278 assert.Nil(t, err)
279 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
280 ts := newTestCoreServer(apiEndpoint)
281 ts.registerService(ctx, t)
282 go ts.start(ctx, t)
283
284 // Create the test client and start it
285 tc := newTestClient(apiEndpoint, serverRestarted)
286 assert.NotNil(t, tc)
287
288 // Subscribe for liveness
289 tc.client.SubscribeForLiveness(livessness)
290 go tc.start(ctx, t, setAndTestCoreServiceHandler)
291
292 // Test 1: Verify that probe status shows ready eventually
293 var servicesReady isConditionSatisfied = func() bool {
294 return ts.probe.IsReady() && tc.probe.IsReady()
295 }
296 err = waitUntilCondition(timeout, servicesReady)
297 assert.Nil(t, err)
298
299 // Test 2: Verify we get a valid client and can make grpc requests with it
300 coreClient := tc.getClient(t)
301 assert.NotNil(t, coreClient)
302
303 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
304 assert.Nil(t, err)
305 assert.NotNil(t, device)
306 assert.Equal(t, "test-1234", device.Type)
307
308 for i := 1; i <= numRestartRuns; i++ {
309 //Test 3: Stop server and verify server status
310 ts.stop()
311 var serverDown isConditionSatisfied = func() bool {
312 return ts.probe.GetStatus(testGrpcServer) == probe.ServiceStatusStopped
313 }
314 err = waitUntilCondition(timeout, serverDown)
315 assert.Nil(t, err)
316
317 // Make a grpc request - this will detect the server being down and automatically trigger the grpc client
318 // to reconnect
319 _, err = coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
320 assert.NotNil(t, err)
321
322 // Wait until the client service shows as not ready. A wait is not needed. It's just to verify that the
323 // client changes connection state.
324 var clientNotReady isConditionSatisfied = func() bool {
325 serviceStatus := tc.probe.GetStatus(apiEndpoint)
326 return serviceStatus == probe.ServiceStatusNotReady ||
327 serviceStatus == probe.ServiceStatusPreparing ||
328 serviceStatus == probe.ServiceStatusFailed
329 }
330 err = waitUntilCondition(timeout, clientNotReady)
331
332 assert.Nil(t, err)
333
334 // Keep the server down for 1/2 second
335 time.Sleep(500 * time.Millisecond)
336
337 // Test 4: Restart the server and verify the server is back online
338 go ts.start(ctx, t)
339 err = waitUntilCondition(timeout, servicesReady)
340 assert.Nil(t, err)
341
342 // Test 5: verify we can pull new device with a new client instance
343 coreClient = tc.getClient(t)
344 assert.NotNil(t, coreClient)
345 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
346 assert.Nil(t, err)
347 assert.Equal(t, "test-1234", device.Type)
348 }
349 // Stop the server
350 ts.stop()
351}
352
353func testNoActivity(t *testing.T) {
354 ctx, cancel := context.WithCancel(context.Background())
355 defer cancel()
356
357 // Create a grpc endpoint for the server
358 grpcPort, err := freeport.GetFreePort()
359 assert.Nil(t, err)
360 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
361
362 // Create the test client and start it
363 tc := newTestClient(apiEndpoint, serverRestarted)
364 assert.NotNil(t, tc)
365 go tc.start(ctx, t, idleConnectionTest)
366
367 // Create and start the test server
368 ts := newTestCoreServer(apiEndpoint)
369 ts.registerService(ctx, t)
370 go ts.start(ctx, t)
371
372 // Test 1: Verify that probe status shows ready eventually
373 var servicesReady isConditionSatisfied = func() bool {
374 return ts.probe.IsReady() && tc.probe.IsReady()
375 }
376 err = waitUntilCondition(timeout, servicesReady)
377 assert.Nil(t, err)
378
379 // Test 2: Verify we get a valid client and can make grpc requests with it
380 coreClient := tc.getClient(t)
381 assert.NotNil(t, coreClient)
382
383 device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
384 assert.Nil(t, err)
385 assert.NotNil(t, device)
386 assert.Equal(t, "test-1234", device.Type)
387
388 start := time.Now()
389 numChecks := 3 // Test for 3 checks
390 // Wait on the the idle channel - on no activity a connection probe will be attempted by the client
391 timer := time.NewTimer((monitorInterval + 1*time.Second) * time.Duration(numChecks))
392 defer timer.Stop()
393 count := 0
394loop:
395 for {
396 select {
397 case timestamp := <-testForNoActivityCh:
398 if timestamp.After(start) {
399 count += 1
400 if count > numChecks {
401 break loop
402 }
403 }
404 case <-timer.C:
405 t.Fatal("no activity on the idle channel")
406 }
407 }
408}
409
410func testServerLimit(t *testing.T) {
411 t.Skip() // Not needed for regular unit tests
412
413 ctx, cancel := context.WithCancel(context.Background())
414 defer cancel()
415
416 // Create a grpc endpoint for the server
417 grpcPort, err := freeport.GetFreePort()
418 assert.Nil(t, err)
419 apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
420
421 // Create the test client and start it
422 tc := newTestClient(apiEndpoint, serverRestarted)
423 assert.NotNil(t, tc)
424 go tc.start(ctx, t, idleConnectionTest)
425
426 // Create and start the test server
427 ts := newTestCoreServer(apiEndpoint)
428 ts.registerService(ctx, t)
429 go ts.start(ctx, t)
430
431 // Test 1: Verify that probe status shows ready eventually
432 var servicesReady isConditionSatisfied = func() bool {
433 return ts.probe.IsReady() && tc.probe.IsReady()
434 }
435 err = waitUntilCondition(timeout, servicesReady)
436 assert.Nil(t, err)
437
438 // Test 2: Verify we get a valid client and can make grpc requests with it
439 coreClient := tc.getClient(t)
440 assert.NotNil(t, coreClient)
441
442 var lock sync.RWMutex
443 bad := []time.Duration{}
444 bad_err := []string{}
445 good := []time.Duration{}
446 var wg sync.WaitGroup
447 numRPCs := 10
448 total_good := time.Duration(0)
449 max_good := time.Duration(0)
450 total_bad := time.Duration(0)
451 max_bad := time.Duration(0)
452 order_received := []uint32{}
453 for i := 1; i <= numRPCs; i++ {
454 wg.Add(1)
455 go func(seq int) {
456 local := time.Now()
457 ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Millisecond)
458 defer cancel()
459 var err error
460 var d *voltha.Device
461 d, err = coreClient.GetDevice(ctx, &common.ID{Id: strconv.Itoa(seq)})
462 if err != nil {
463 lock.Lock()
464 bad = append(bad, time.Since(local))
465 bad_err = append(bad_err, err.Error())
466 total_bad += time.Since(local)
467 if time.Since(local) > max_bad {
468 max_bad = time.Since(local)
469 }
470 logger.Errorw(ctx, "error produced", log.Fields{"error": err})
471 lock.Unlock()
472 } else {
473 lock.Lock()
474 good = append(good, time.Since(local))
475 total_good += time.Since(local)
476 if time.Since(local) > max_good {
477 max_good = time.Since(local)
478 }
479 if d != nil {
480 order_received = append(order_received, d.Vlan)
481 }
482 lock.Unlock()
483 }
484 wg.Done()
485 }(i)
486 }
487 wg.Wait()
488 assert.Equal(t, 0, len(bad))
489 assert.Equal(t, numRPCs, len(good))
490 //fmt.Println("Bad:", bad[:10])
491 if len(bad_err) > 0 {
492 fmt.Println("Bad Err Last:", bad_err[len(bad_err)-1:])
493 fmt.Println("Bad Err First:", bad_err[:1])
494 }
495 fmt.Println("Good:", good[len(good)-10:])
496 fmt.Println("Good average time:", total_good.Milliseconds()/int64(numRPCs))
497 fmt.Println("Bad average time:", total_bad.Milliseconds()/int64(numRPCs))
498 fmt.Println("Bad Max:", max_bad)
499 fmt.Println("Good Max:", max_good)
500 //fmt.Println("Order received:", order_received)
501
502 prev := order_received[0]
503
504 for i := 1; i < len(order_received); i++ {
505 if order_received[i] < prev {
506 fmt.Println("Prev:", prev, " curr:", order_received[i])
507 }
508 prev = order_received[i]
509 }
510}
511
512func TestSuiteClient3(t *testing.T) {
513 // Setup
514 log.SetAllLogLevel(volthaTestLogLevel)
515
516 // Test starting server before client
517 serverStartsFirstTest(t)
518
519 // Test starting client before server
520 clientStartsFirstTest(t)
521
522 // Test server restarts
523 serverRestarts(t, 1)
524
525 //Test that the client test the grpc connection on no activity
526 testNoActivity(t)
527
528 // Test client queueing with server limit
529 testServerLimit(t)
530}