VOL-2518 - reconnect to voltha on disconnect

Change-Id: Ia497bb6a83312f15e54de0d7556753e1d9ef58b0
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
index 69b14ff..20c55de 100644
--- a/internal/pkg/ofagent/changeEvent.go
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -29,38 +29,51 @@
 
 func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
 	logger.Debug("receive-change-events-started")
+	// If we exit, assume disconnected
+	defer func() {
+		ofa.events <- ofaEventVolthaDisconnected
+		logger.Debug("receive-change-events-finished")
+	}()
+	if ofa.volthaClient == nil {
+		logger.Error("no-voltha-connection")
+		return
+	}
 	opt := grpc.EmptyCallOption{}
 	streamCtx, streamDone := context.WithCancel(context.Background())
+	defer streamDone()
 	stream, err := ofa.volthaClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
 	if err != nil {
 		logger.Errorw("Unable to establish Receive Change Event Stream",
 			log.Fields{"error": err})
-		ofa.events <- ofaEventVolthaDisconnected
+		return
 	}
-	defer streamDone()
 
+top:
 	for {
 		select {
 		case <-ctx.Done():
-			return
+			break top
 		default:
-			if ce, err := stream.Recv(); err != nil {
+			ce, err := stream.Recv()
+			if err != nil {
 				logger.Errorw("error receiving change event",
 					log.Fields{"error": err})
-				ofa.events <- ofaEventVolthaDisconnected
-			} else {
-				ofa.changeEventChannel <- ce
+				break top
 			}
+			ofa.changeEventChannel <- ce
+			logger.Debug("receive-change-event-queued")
 		}
 	}
 }
 
 func (ofa *OFAgent) handleChangeEvents(ctx context.Context) {
-	logger.Debugln("handle-change-event-started")
+	logger.Debug("handle-change-event-started")
+
+top:
 	for {
 		select {
 		case <-ctx.Done():
-			return
+			break top
 		case changeEvent := <-ofa.changeEventChannel:
 			deviceID := changeEvent.GetId()
 			portStatus := changeEvent.GetPortStatus()
@@ -107,4 +120,6 @@
 			ofa.getOFClient(deviceID).SendMessage(ofPortStatus)
 		}
 	}
+
+	logger.Debug("handle-change-event-finished")
 }