VOL-1902: Stop kafka client only after device monitoring exits since consumer kafka channels are closed on Stop.
Also don't Stop the client unless Start had succeeded.
The kafka client is now passed to device monitoring which stops it after its done.
Stop the kafka client before closing the done channel.
Change-Id: I68815a035154de55cfa61b5775c9506b2ce62675
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 538d4c8..19ebbdc 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -257,8 +257,9 @@
}
}
-func monitorDiscovery(ctx context.Context, client pb.ConfigurationClient, ch <-chan *ic.InterContainerMessage, doneCh chan<- struct{}) {
+func monitorDiscovery(kc kafka.Client, ctx context.Context, client pb.ConfigurationClient, ch <-chan *ic.InterContainerMessage, doneCh chan<- struct{}) {
defer close(doneCh)
+ defer kc.Stop()
monitorLoop:
for {
@@ -291,17 +292,21 @@
if err != nil {
panic(err)
}
- kc.Start()
- defer kc.Stop()
+ if err := kc.Start(); err != nil {
+ log.Error("Could not connect to kafka, discovery disabled")
+ close(doneCh)
+ return doneCh, err
+ }
ch, err := kc.Subscribe(&kafka.Topic{Name: kafkaTopic})
if err != nil {
log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
close(doneCh)
+ kc.Stop()
return doneCh, err
}
- go monitorDiscovery(ctx, client, ch, doneCh)
+ go monitorDiscovery(kc, ctx, client, ch, doneCh)
return doneCh, nil
}