[VOL-1547] Add port to logical device when device is active
This commit consists of the following changes:
1) Fix the issue where flows were received when the logical
device flow graph was not ready.
2) Update the default Kafka config for improved performance.
3) Add a lock to the device ownership logic to ensure the
lock map does not get corrupted.
Change-Id: I840d572e06ed5acf0f3bc1ce423a0ada8f335543
diff --git a/kafka/client.go b/kafka/client.go
index 4eb3e5a..a4c49ca 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -40,14 +40,14 @@
DefaultKafkaPort = 9092
DefaultGroupName = "voltha"
DefaultSleepOnError = 1
- DefaultProducerFlushFrequency = 1
- DefaultProducerFlushMessages = 1
- DefaultProducerFlushMaxmessages = 1
+ DefaultProducerFlushFrequency = 10
+ DefaultProducerFlushMessages = 10
+ DefaultProducerFlushMaxmessages = 100
DefaultProducerReturnSuccess = true
DefaultProducerReturnErrors = true
DefaultProducerRetryMax = 3
DefaultProducerRetryBackoff = time.Millisecond * 100
- DefaultConsumerMaxwait = 10
+ DefaultConsumerMaxwait = 100
DefaultMaxProcessingTime = 100
DefaultConsumerType = PartitionConsumer
DefaultNumberPartitions = 3
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 42e5a02..b9c03e6 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -257,7 +257,7 @@
// subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
//key := GetDeviceIdFromTopic(*toTopic)
log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
- kp.kafkaClient.Send(protoRequest, toTopic, key)
+ go kp.kafkaClient.Send(protoRequest, toTopic, key)
if waitForResponse {
// Create a child context based on the parent context, if any
@@ -550,7 +550,7 @@
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
KeyTopic: request.Header.KeyTopic,
- Timestamp: time.Now().Unix(),
+ Timestamp: time.Now().UnixNano(),
}
// Go over all returned values
@@ -635,7 +635,6 @@
// First extract the header to know whether this is a request - responses are handled by a different handler
if msg.Header.Type == ic.MessageType_REQUEST {
-
var out []reflect.Value
var err error
@@ -707,7 +706,7 @@
key := msg.Header.KeyTopic
log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
// TODO: handle error response.
- kp.kafkaClient.Send(icm, replyTopic, key)
+ go kp.kafkaClient.Send(icm, replyTopic, key)
}
} else if msg.Header.Type == ic.MessageType_RESPONSE {
log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
@@ -765,7 +764,7 @@
FromTopic: replyTopic.Name,
ToTopic: toTopic.Name,
KeyTopic: key,
- Timestamp: time.Now().Unix(),
+ Timestamp: time.Now().UnixNano(),
}
requestBody := &ic.InterContainerRequestBody{
Rpc: rpc,