[VOL-1512] Set device ownership

This commit consists of the following:
1) Set device ownership per Core in a Core-pair such that only 1
Core actively processes a device (i.e. handles all the requests for
that device) while the other Core in the pair passively watches for
updates on that device and will take over in case the owner Core
fails to process the transaction.
2) Clean up the lock mechanisms to ensure we use a read lock when
only read access is needed instead of an exclusive lock.
3) Update logical port additions such that ports are added only when
the device is enabled.
4) Update the port Ids for the logical ports.
5) Update some sarama client configs for performance - this is an
ongoing tuning effort.
6) Update the adapter request handler in the Core to send back an
ACK immediately for an adapter request instead of processing the
request fully and then sending an ACK. This reduces the latency
over kafka and therefore reduces the likelihood of timeouts.

Change-Id: I9149bf3ba6fbad38e3a29c76ea8dba2f9f731d29
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 1229e7a..e5e9606 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -256,7 +256,7 @@
 	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
 	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
 	//key := GetDeviceIdFromTopic(*toTopic)
-	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
+	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
 	kp.kafkaClient.Send(protoRequest, toTopic, key)
 
 	if waitForResponse {
@@ -370,8 +370,8 @@
 }
 
 func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
-	kp.lockTopicResponseChannelMap.Lock()
-	defer kp.lockTopicResponseChannelMap.Unlock()
+	kp.lockTopicResponseChannelMap.RLock()
+	defer kp.lockTopicResponseChannelMap.RUnlock()
 	_, exist := kp.topicToResponseChannelMap[topic]
 	return exist
 }
@@ -710,10 +710,10 @@
 			kp.kafkaClient.Send(icm, replyTopic, key)
 		}
 	} else if msg.Header.Type == ic.MessageType_RESPONSE {
-		log.Debugw("response-received", log.Fields{"msg": msg})
+		log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
 		go kp.dispatchResponse(msg)
 	} else {
-		log.Warnw("unsupported-message-received", log.Fields{"msg": msg})
+		log.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
 	}
 }
 
@@ -726,8 +726,8 @@
 }
 
 func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
-	kp.lockTransactionIdToChannelMap.Lock()
-	defer kp.lockTransactionIdToChannelMap.Unlock()
+	kp.lockTransactionIdToChannelMap.RLock()
+	defer kp.lockTransactionIdToChannelMap.RUnlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
 		log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
 		return