VOL-1995: Enable device monitoring in arouterd.
It was skipped when kafka wasn't ready to listen to device notifications during startup.
Bumping version to dev for jenkins tag-collision test.
Keep trying to connect to kafka forever on failure.
Try connecting to kafka as soon as possible without delaying even for first time.
This will ensure faster restarts when api-server pod is restarted with kafka pod already up.
Change-Id: I7433f3bd7d038e41388c84003b7db26a8eda17e6
diff --git a/VERSION b/VERSION
index eca07e4..7783fad 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.1.2
+2.1.3-dev
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index d53891a..2c6b31e 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -274,7 +274,7 @@
} else {
// somewhat hackish solution, backend is known from the first digit found in the publisher name
group := regexp.MustCompile(`\d`).FindString(device.Publisher)
- if group == "" {
+ if group != "" {
// set the affinity of the discovered device
setAffinity(ctx, client, device.Id, afrouterRWClusterName+group)
} else {
@@ -292,12 +292,21 @@
if err != nil {
panic(err)
}
- if err := kc.Start(); err != nil {
- log.Error("Could not connect to kafka, discovery disabled")
- close(doneCh)
- return doneCh, err
- }
+ for {
+ if err := kc.Start(); err != nil {
+ log.Error("Could not connect to kafka")
+ } else {
+ break
+ }
+ select {
+ case <-ctx.Done():
+ close(doneCh)
+ return doneCh, errors.New("GRPC context done")
+
+ case <-time.After(5 * time.Second):
+ }
+ }
ch, err := kc.Subscribe(&kafka.Topic{Name: kafkaTopic})
if err != nil {
log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)