VOL-2518 - reconnect to voltha on disconnect
Change-Id: Ia497bb6a83312f15e54de0d7556753e1d9ef58b0
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
index 69b14ff..20c55de 100644
--- a/internal/pkg/ofagent/changeEvent.go
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -29,38 +29,51 @@
func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
logger.Debug("receive-change-events-started")
+ // If we exit, assume disconnected
+ defer func() {
+ ofa.events <- ofaEventVolthaDisconnected
+ logger.Debug("receive-change-events-finished")
+ }()
+ if ofa.volthaClient == nil {
+ logger.Error("no-voltha-connection")
+ return
+ }
opt := grpc.EmptyCallOption{}
streamCtx, streamDone := context.WithCancel(context.Background())
+ defer streamDone()
stream, err := ofa.volthaClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
if err != nil {
logger.Errorw("Unable to establish Receive Change Event Stream",
log.Fields{"error": err})
- ofa.events <- ofaEventVolthaDisconnected
+ return
}
- defer streamDone()
+top:
for {
select {
case <-ctx.Done():
- return
+ break top
default:
- if ce, err := stream.Recv(); err != nil {
+ ce, err := stream.Recv()
+ if err != nil {
logger.Errorw("error receiving change event",
log.Fields{"error": err})
- ofa.events <- ofaEventVolthaDisconnected
- } else {
- ofa.changeEventChannel <- ce
+ break top
}
+ ofa.changeEventChannel <- ce
+ logger.Debug("receive-change-event-queued")
}
}
}
func (ofa *OFAgent) handleChangeEvents(ctx context.Context) {
- logger.Debugln("handle-change-event-started")
+ logger.Debug("handle-change-event-started")
+
+top:
for {
select {
case <-ctx.Done():
- return
+ break top
case changeEvent := <-ofa.changeEventChannel:
deviceID := changeEvent.GetId()
portStatus := changeEvent.GetPortStatus()
@@ -107,4 +120,6 @@
ofa.getOFClient(deviceID).SendMessage(ofPortStatus)
}
}
+
+ logger.Debug("handle-change-event-finsihed")
}