This commit made some modifications in the way messages over
kafka are consumed, mostly around the initial offset to use.
Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 4af4fb0..3ce59a7 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -77,16 +77,20 @@
}
// Use a device topic for the response as we are the only core handling requests for this device
replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+ log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+ return err
+ }
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
- if success {
- // From now on, any unsolicited requests from the adapters for this device will come over the device topic.
- // We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
- if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
- log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
- return err
- }
- }
+ //if success {
+ // // From now on, any unsolicited requests from the adapters for this device will come over the device topic.
+ // // We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
+ // if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+ // log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+ // return err
+ // }
+ //}
return unPackResponse(rpc, device.Id, success, result)
}