VOL-2518 - reconnect to voltha on disconnect
Change-Id: Ia497bb6a83312f15e54de0d7556753e1d9ef58b0
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
index 324d568..175a9c8 100644
--- a/internal/pkg/ofagent/packetIn.go
+++ b/internal/pkg/ofagent/packetIn.go
@@ -31,37 +31,50 @@
func (ofa *OFAgent) receivePacketsIn(ctx context.Context) {
logger.Debug("receive-packets-in-started")
+ // If we exit, assume disconnected
+ defer func() {
+ ofa.events <- ofaEventVolthaDisconnected
+ logger.Debug("receive-packets-in-finished")
+ }()
+ if ofa.volthaClient == nil {
+ logger.Error("no-voltha-connection")
+ return
+ }
opt := grpc.EmptyCallOption{}
streamCtx, streamDone := context.WithCancel(context.Background())
+ defer streamDone()
stream, err := ofa.volthaClient.ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
if err != nil {
logger.Errorw("Unable to establish Receive PacketIn Stream",
log.Fields{"error": err})
+ return
}
- defer streamDone()
+
+top:
for {
select {
case <-ctx.Done():
- return
+ break top
default:
- if pkt, err := stream.Recv(); err != nil {
+ pkt, err := stream.Recv()
+ if err != nil {
logger.Errorw("error receiving packet",
log.Fields{"error": err})
- ofa.events <- ofaEventVolthaDisconnected
- } else {
- ofa.packetInChannel <- pkt
+ break top
}
+ ofa.packetInChannel <- pkt
}
}
}
func (ofa *OFAgent) handlePacketsIn(ctx context.Context) {
logger.Debug("handle-packets-in-started")
+top:
for {
select {
case <-ctx.Done():
- return
+ break top
case packet := <-ofa.packetInChannel:
packetIn := packet.GetPacketIn()
@@ -157,4 +170,5 @@
}
}
+ logger.Debug("handle-packets-in-finished")
}