[VOL-4448] Fix grpc client panic caused by sending on the closed events channel
Change-Id: I08f30d01f50399806c839cfa01471e2b596894d3
diff --git a/VERSION b/VERSION
index 3769235..ef09838 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-7.1.0
\ No newline at end of file
+7.1.1
\ No newline at end of file
diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go
index bbec5a3..add2b28 100644
--- a/pkg/grpc/client.go
+++ b/pkg/grpc/client.go
@@ -378,9 +378,17 @@
select {
case <-ctx.Done():
logger.Debugw(ctx, "context-closing", log.Fields{"endpoint": c.apiEndPoint})
- return
+ break loop
case event := <-c.events:
logger.Debugw(ctx, "received-event", log.Fields{"event": event, "endpoint": c.apiEndPoint})
+ c.connectionLock.RLock()
+ // Once the client has been stopped, only let the stop event through
+ if c.done && event != eventStopped {
+ c.connectionLock.RUnlock()
+ logger.Debugw(ctx, "ignoring-event-on-client-stop", log.Fields{"event": event, "endpoint": c.apiEndPoint})
+ continue
+ }
+ c.connectionLock.RUnlock()
switch event {
case eventConnecting:
c.stateLock.Lock()
@@ -404,7 +412,11 @@
return
}
attempt += 1
- c.events <- eventConnecting
+ c.connectionLock.RLock()
+ if !c.done {
+ c.events <- eventConnecting
+ }
+ c.connectionLock.RUnlock()
} else {
backoff.Reset()
}
@@ -544,11 +556,14 @@
}
func (c *Client) Stop(ctx context.Context) {
+ c.connectionLock.Lock()
+ defer c.connectionLock.Unlock()
if !c.done {
+ c.done = true
c.events <- eventStopped
close(c.events)
- c.done = true
}
+ logger.Infow(ctx, "client-stopped", log.Fields{"endpoint": c.apiEndPoint})
}
// SetService is used for testing only
diff --git a/pkg/grpc/client_test.go b/pkg/grpc/client_test.go
index e06ad7b..0b880f1 100644
--- a/pkg/grpc/client_test.go
+++ b/pkg/grpc/client_test.go
@@ -408,6 +408,65 @@
}
}
+func testClientFailure(t *testing.T, numClientRestarts int) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ // Create a grpc endpoint for the server
+ grpcPort, err := freeport.GetFreePort()
+ assert.Nil(t, err)
+ apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
+ // Create the test client and start it
+ tc := newTestClient(apiEndpoint, serverRestarted)
+ assert.NotNil(t, tc)
+ go tc.start(ctx, t, idleConnectionTest)
+ // Create and start the test server
+ ts := newTestCoreServer(apiEndpoint)
+ ts.registerService(ctx, t)
+ go ts.start(ctx, t)
+ defer ts.stop()
+ // Test 1: Verify that probe status shows ready eventually
+ var servicesReady isConditionSatisfied = func() bool {
+ return ts.probe.IsReady() && tc.probe.IsReady()
+ }
+ err = waitUntilCondition(timeout, servicesReady)
+ assert.Nil(t, err)
+ // Test 2: Verify we get a valid client and can make grpc requests with it
+ coreClient := tc.getClient(t)
+ assert.NotNil(t, coreClient)
+ device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
+ assert.Nil(t, err)
+ assert.NotNil(t, device)
+ assert.Equal(t, "test-1234", device.Type)
+ for i := 1; i <= numClientRestarts; i++ {
+ // Kill grpc client
+ tc.client.Stop(context.Background())
+ var clientNotReady isConditionSatisfied = func() bool {
+ return !tc.probe.IsReady()
+ }
+ err = waitUntilCondition(timeout, clientNotReady)
+ assert.Nil(t, err)
+ // Create a new client
+ tc.client, err = NewClient(
+ apiEndpoint,
+ serverRestarted,
+ ActivityCheck(true))
+ assert.Nil(t, err)
+ probeCtx := context.WithValue(ctx, probe.ProbeContextKey, tc.probe)
+ go tc.client.Start(probeCtx, idleConnectionTest)
+ // Verify that probe status shows ready eventually
+ err = waitUntilCondition(timeout, servicesReady)
+ assert.Nil(t, err)
+ // Verify we get a valid client and can make grpc requests with it
+ coreClient = tc.getClient(t)
+ assert.NotNil(t, coreClient)
+ device, err = coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
+ assert.Nil(t, err)
+ assert.NotNil(t, device)
+ assert.Equal(t, "test-1234", device.Type)
+ }
+ tc.client.Stop(context.Background())
+}
+
func testServerLimit(t *testing.T) {
t.Skip() // Not needed for regular unit tests
@@ -528,4 +587,7 @@
// Test client queueing with server limit
testServerLimit(t)
+
+ // Test the scenario where a client restarts
+ testClientFailure(t, 10)
}