VOL-2518 - reconnect to VOLTHA on disconnect

Change-Id: Ia497bb6a83312f15e54de0d7556753e1d9ef58b0
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
index 324d568..175a9c8 100644
--- a/internal/pkg/ofagent/packetIn.go
+++ b/internal/pkg/ofagent/packetIn.go
@@ -31,37 +31,50 @@
 
 func (ofa *OFAgent) receivePacketsIn(ctx context.Context) {
 	logger.Debug("receive-packets-in-started")
+	// If we exit, assume disconnected
+	defer func() {
+		ofa.events <- ofaEventVolthaDisconnected
+		logger.Debug("receive-packets-in-finished")
+	}()
+	if ofa.volthaClient == nil {
+		logger.Error("no-voltha-connection")
+		return
+	}
 	opt := grpc.EmptyCallOption{}
 	streamCtx, streamDone := context.WithCancel(context.Background())
+	defer streamDone()
 	stream, err := ofa.volthaClient.ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
 	if err != nil {
 		logger.Errorw("Unable to establish Receive PacketIn Stream",
 			log.Fields{"error": err})
+		return
 	}
-	defer streamDone()
+
+top:
 
 	for {
 		select {
 		case <-ctx.Done():
-			return
+			break top
 		default:
-			if pkt, err := stream.Recv(); err != nil {
+			pkt, err := stream.Recv()
+			if err != nil {
 				logger.Errorw("error receiving packet",
 					log.Fields{"error": err})
-				ofa.events <- ofaEventVolthaDisconnected
-			} else {
-				ofa.packetInChannel <- pkt
+				break top
 			}
+			ofa.packetInChannel <- pkt
 		}
 	}
 }
 
 func (ofa *OFAgent) handlePacketsIn(ctx context.Context) {
 	logger.Debug("handle-packets-in-started")
+top:
 	for {
 		select {
 		case <-ctx.Done():
-			return
+			break top
 		case packet := <-ofa.packetInChannel:
 			packetIn := packet.GetPacketIn()
 
@@ -157,4 +170,5 @@
 
 		}
 	}
+	logger.Debug("handle-packets-in-finished")
 }