[VOL-1547] Add port to logical device when device is active

This commit consists of the following changes:
1) Fix the issue where flows were received when the logical
device flow graph was not ready.
2) Update the default Kafka config for improved performance.
3) Add a lock to the device ownership logic to ensure the
lock map does not get corrupted.

Change-Id: I840d572e06ed5acf0f3bc1ce423a0ada8f335543
diff --git a/kafka/client.go b/kafka/client.go
index 4eb3e5a..a4c49ca 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -40,14 +40,14 @@
 	DefaultKafkaPort                = 9092
 	DefaultGroupName                = "voltha"
 	DefaultSleepOnError             = 1
-	DefaultProducerFlushFrequency   = 1
-	DefaultProducerFlushMessages    = 1
-	DefaultProducerFlushMaxmessages = 1
+	DefaultProducerFlushFrequency   = 10
+	DefaultProducerFlushMessages    = 10
+	DefaultProducerFlushMaxmessages = 100
 	DefaultProducerReturnSuccess    = true
 	DefaultProducerReturnErrors     = true
 	DefaultProducerRetryMax         = 3
 	DefaultProducerRetryBackoff     = time.Millisecond * 100
-	DefaultConsumerMaxwait          = 10
+	DefaultConsumerMaxwait          = 100
 	DefaultMaxProcessingTime        = 100
 	DefaultConsumerType             = PartitionConsumer
 	DefaultNumberPartitions         = 3
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 42e5a02..b9c03e6 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -257,7 +257,7 @@
 	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
 	//key := GetDeviceIdFromTopic(*toTopic)
 	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
-	kp.kafkaClient.Send(protoRequest, toTopic, key)
+	go kp.kafkaClient.Send(protoRequest, toTopic, key)
 
 	if waitForResponse {
 		// Create a child context based on the parent context, if any
@@ -550,7 +550,7 @@
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
 		KeyTopic: request.Header.KeyTopic,
-		Timestamp: time.Now().Unix(),
+		Timestamp: time.Now().UnixNano(),
 	}
 
 	// Go over all returned values
@@ -635,7 +635,6 @@
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
 	if msg.Header.Type == ic.MessageType_REQUEST {
-
 		var out []reflect.Value
 		var err error
 
@@ -707,7 +706,7 @@
 			key := msg.Header.KeyTopic
 			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
 			// TODO: handle error response.
-			kp.kafkaClient.Send(icm, replyTopic, key)
+			 go kp.kafkaClient.Send(icm, replyTopic, key)
 		}
 	} else if msg.Header.Type == ic.MessageType_RESPONSE {
 		log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
@@ -765,7 +764,7 @@
 		FromTopic: replyTopic.Name,
 		ToTopic:   toTopic.Name,
 		KeyTopic: key,
-		Timestamp: time.Now().Unix(),
+		Timestamp: time.Now().UnixNano(),
 	}
 	requestBody := &ic.InterContainerRequestBody{
 		Rpc:              rpc,