[VOL-1417]
Fix to the issue where discovery events were not being processed.
Also some minor tweaks to the affinity router's build system.
Change-Id: I73bd9ea5e747dcfacb2bc5c2c8e77a7edbf318a3
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 0ea0b49..06f4628 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -671,23 +671,44 @@
}
}
+func getBackendForCore(coreId string, coreGroups [][]*rwPod) string {
+ for _,v := range(coreGroups) {
+ for _,v2 := range(v) {
+ if v2.name == coreId {
+ return v2.backend
+ }
+ }
+ }
+ log.Errorf("No backend found for core %s\n", coreId)
+ return ""
+}
+
func monitorDiscovery(client pb.ConfigurationClient,
- ch <-chan *ic.InterContainerMessage) {
+ ch <-chan *ic.InterContainerMessage,
+ coreGroups [][]*rwPod) {
+ var id map[string]struct{} = make(map[string]struct{})
+
select {
case msg := <-ch:
log.Debugf("Received a device discovery notification")
- _ = msg
- requestBody := &ic.InterContainerRequestBody{}
- if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
+ device := &ic.DeviceDiscovered{}
+ if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
log.Errorf("Could not unmarshal received notification %v", msg)
} else {
- // Do something with the message here
+ // Set the affinity of the discovered device.
+ if be := getBackendForCore(device.Id, coreGroups); be != "" {
+ id[device.Id]=struct{}{}
+ setAffinity(client, id, be)
+ } else {
+ log.Error("Cant use an empty string as a backend name")
+ }
}
break
}
}
-func startDiscoveryMonitor(client pb.ConfigurationClient) error {
+func startDiscoveryMonitor(client pb.ConfigurationClient,
+ coreGroups [][]*rwPod) error {
var ch <-chan *ic.InterContainerMessage
// Connect to kafka for discovery events
topic := &kafka.Topic{Name: "AffinityRouter"}
@@ -698,7 +719,7 @@
log.Error("Could not subscribe to the 'AffinityRouter' channel, discovery disabled")
return err
}
- go monitorDiscovery(client, ch)
+ go monitorDiscovery(client, ch, coreGroups)
return nil
}
@@ -816,7 +837,7 @@
}
log.Debug("Starting discovery monitoring")
- startDiscoveryMonitor(client)
+ startDiscoveryMonitor(client, coreGroups)
log.Debugf("Starting core monitoring")
startCoreMonitor(client, clientset, coreFltr, coreGroups) // Never returns