[VOL-4448] Fix panic issue in grpc client on channel close

Change-Id: I08f30d01f50399806c839cfa01471e2b596894d3
diff --git a/VERSION b/VERSION
index 3769235..ef09838 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-7.1.0
\ No newline at end of file
+7.1.1
\ No newline at end of file
diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go
index bbec5a3..add2b28 100644
--- a/pkg/grpc/client.go
+++ b/pkg/grpc/client.go
@@ -378,9 +378,17 @@
 		select {
 		case <-ctx.Done():
 			logger.Debugw(ctx, "context-closing", log.Fields{"endpoint": c.apiEndPoint})
-			return
+			break loop
 		case event := <-c.events:
 			logger.Debugw(ctx, "received-event", log.Fields{"event": event, "endpoint": c.apiEndPoint})
+			c.connectionLock.RLock()
+			// On a client stopped, just allow the stop event to go through
+			if c.done && event != eventStopped {
+				c.connectionLock.RUnlock()
+				logger.Debugw(ctx, "ignoring-event-on-client-stop", log.Fields{"event": event, "endpoint": c.apiEndPoint})
+				continue
+			}
+			c.connectionLock.RUnlock()
 			switch event {
 			case eventConnecting:
 				c.stateLock.Lock()
@@ -404,7 +412,11 @@
 								return
 							}
 							attempt += 1
-							c.events <- eventConnecting
+							c.connectionLock.RLock()
+							if !c.done {
+								c.events <- eventConnecting
+							}
+							c.connectionLock.RUnlock()
 						} else {
 							backoff.Reset()
 						}
@@ -544,11 +556,14 @@
 }
 
 func (c *Client) Stop(ctx context.Context) {
+	c.connectionLock.Lock() // Exclusive lock: senders check c.done under RLock before writing to c.events.
+	defer c.connectionLock.Unlock() // NOTE(review): lock is held across the send below; confirm c.events cannot block here.
 	if !c.done {
+		c.done = true // Set before the send/close below so guarded senders stop writing to c.events.
 		c.events <- eventStopped
 		close(c.events)
-		c.done = true
 	}
+	logger.Infow(ctx, "client-stopped", log.Fields{"endpoint": c.apiEndPoint}) // Logged on every call, even if already stopped.
 }
 
 // SetService is used for testing only
diff --git a/pkg/grpc/client_test.go b/pkg/grpc/client_test.go
index e06ad7b..0b880f1 100644
--- a/pkg/grpc/client_test.go
+++ b/pkg/grpc/client_test.go
@@ -408,6 +408,65 @@
 	}
 }
 
+func testClientFailure(t *testing.T, numClientRestarts int) { // Stops and recreates the client numClientRestarts times against one server.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	// Get a port from freeport and build the 127.0.0.1 endpoint used by both the test client and the test server
+	grpcPort, err := freeport.GetFreePort()
+	assert.Nil(t, err)
+	apiEndpoint := "127.0.0.1:" + strconv.Itoa(grpcPort)
+	// Create the test client and start it in its own goroutine
+	tc := newTestClient(apiEndpoint, serverRestarted)
+	assert.NotNil(t, tc)
+	go tc.start(ctx, t, idleConnectionTest)
+	// Create and start the test server; it is stopped when this function returns
+	ts := newTestCoreServer(apiEndpoint)
+	ts.registerService(ctx, t)
+	go ts.start(ctx, t)
+	defer ts.stop()
+	// Test 1: Verify that both the server probe and the client probe report ready eventually
+	var servicesReady isConditionSatisfied = func() bool {
+		return ts.probe.IsReady() && tc.probe.IsReady()
+	}
+	err = waitUntilCondition(timeout, servicesReady)
+	assert.Nil(t, err)
+	// Test 2: Verify we get a valid client and can make grpc requests with it
+	coreClient := tc.getClient(t)
+	assert.NotNil(t, coreClient)
+	device, err := coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
+	assert.Nil(t, err)
+	assert.NotNil(t, device)
+	assert.Equal(t, "test-1234", device.Type)
+	for i := 1; i <= numClientRestarts; i++ {
+		// Stop the current grpc client and wait until tc.probe no longer reports ready
+		tc.client.Stop(context.Background())
+		var clientNotReady isConditionSatisfied = func() bool {
+			return !tc.probe.IsReady()
+		}
+		err = waitUntilCondition(timeout, clientNotReady)
+		assert.Nil(t, err)
+		// Create a replacement client on the same endpoint and start it with tc.probe in its context
+		tc.client, err = NewClient(
+			apiEndpoint,
+			serverRestarted,
+			ActivityCheck(true))
+		assert.Nil(t, err)
+		probeCtx := context.WithValue(ctx, probe.ProbeContextKey, tc.probe)
+		go tc.client.Start(probeCtx, idleConnectionTest)
+		// Verify that both probes report ready again eventually
+		err = waitUntilCondition(timeout, servicesReady)
+		assert.Nil(t, err)
+		// Verify we get a valid client and can make grpc requests with it
+		coreClient = tc.getClient(t)
+		assert.NotNil(t, coreClient)
+		device, err = coreClient.GetDevice(context.Background(), &common.ID{Id: "1234"})
+		assert.Nil(t, err)
+		assert.NotNil(t, device)
+		assert.Equal(t, "test-1234", device.Type)
+	}
+	tc.client.Stop(context.Background()) // Stops the last client from the loop, or the initial one when numClientRestarts is 0.
+}
+
 func testServerLimit(t *testing.T) {
 	t.Skip() // Not needed for regular unit tests
 
@@ -528,4 +587,7 @@
 
 	// Test client queueing with server limit
 	testServerLimit(t)
+
+	// Test the scenario where a client restarts
+	testClientFailure(t, 10)
 }