VOL-1902: Stop kafka client only after device monitoring exits since consumer kafka channels are closed on Stop.
Also exit when context is done instead of exiting device monitor loop on receiving first kafka message.
Change-Id: I4938dfeb8335327d1625250db761e1d1b8887bbb
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 538d4c8..d53891a 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -257,13 +257,15 @@
}
}
-func monitorDiscovery(ctx context.Context, client pb.ConfigurationClient, ch <-chan *ic.InterContainerMessage, doneCh chan<- struct{}) {
+func monitorDiscovery(kc kafka.Client, ctx context.Context, client pb.ConfigurationClient, ch <-chan *ic.InterContainerMessage, doneCh chan<- struct{}) {
defer close(doneCh)
+ defer kc.Stop()
monitorLoop:
for {
select {
case <-ctx.Done():
+ break monitorLoop
case msg := <-ch:
log.Debug("Received a device discovery notification")
device := &ic.DeviceDiscovered{}
@@ -279,7 +281,6 @@
log.Error("backend is unknown")
}
}
- break monitorLoop
}
}
}
@@ -291,17 +292,21 @@
if err != nil {
panic(err)
}
- kc.Start()
- defer kc.Stop()
+ if err := kc.Start(); err != nil {
+ log.Error("Could not connect to kafka, discovery disabled")
+ close(doneCh)
+ return doneCh, err
+ }
ch, err := kc.Subscribe(&kafka.Topic{Name: kafkaTopic})
if err != nil {
log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
close(doneCh)
+ kc.Stop()
return doneCh, err
}
- go monitorDiscovery(ctx, client, ch, doneCh)
+ go monitorDiscovery(kc, ctx, client, ch, doneCh)
return doneCh, nil
}