This commit fixes a few issues:
1) The number of arguments to decode in a request to the simulated
OLT was incorrect
2) Adapter type was not set properly when a device is loaded from
DB
Change-Id: I7aa9a5314bd167565372138b0819df9aa744c41b
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 4359f7d..099d0d8 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -38,7 +38,7 @@
const (
DefaultMaxRetries = 3
- DefaultRequestTimeout = 3000 // 3000 milliseconds - to handle a wider latency range
+ DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
)
const (
@@ -256,7 +256,7 @@
// subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
key := GetDeviceIdFromTopic(*toTopic)
log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
- go kp.kafkaClient.Send(protoRequest, toTopic, key)
+ kp.kafkaClient.Send(protoRequest, toTopic, key)
if waitForResponse {
// Create a child context based on the parent context, if any
@@ -756,6 +756,12 @@
}
kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
go kp.waitForResponseLoop(subscribedCh, &topic)
+
+ // Wait until topic is ready - it takes on average 300 ms for a topic to be created. This is a one time
+ // delay everything a device is created.
+ // TODO: Implement a mechanism to determine when a topic is ready instead of relying on a timeout
+ //kp.kafkaClient.WaitForTopicToBeReady
+ time.Sleep(400 * time.Millisecond)
}
// Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will