[VOL-1512] Set device ownership
This commit consists of the following:
1) Set device ownership per Core in a Core-pair such that only 1
Core actively processes a device (i.e. handles all the requests for
that device) while the other Core in the pair passively watches for
updates on that device and takes over in case the owner Core
fails to process the transaction.
2) Clean up the lock mechanisms to ensure we use a read lock where
read-only access suffices instead of always taking an exclusive lock.
3) Update logical port additions such that ports are added only when
the device is enabled.
4) Update the port IDs for the logical ports.
5) Update some sarama client configs for performance - this is an
ongoing tuning effort.
6) Update the adapter request handler in the Core to send back an
ACK immediately to the adapter request instead of processing the
request fully and then sending an ACK. This reduces the latency
over kafka and therefore reduces the likelihood of timeouts.
Change-Id: I9149bf3ba6fbad38e3a29c76ea8dba2f9f731d29
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 698fefa..b0ce502 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -540,8 +540,8 @@
}
func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
+ sc.lockTopicToConsumerChannelMap.RLock()
+ defer sc.lockTopicToConsumerChannelMap.RUnlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
return consumerCh
@@ -726,8 +726,8 @@
// topic via the unique channel each subscriber received during subscription
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
// Need to go over all channels and publish messages to them - do we need to copy msg?
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
+ sc.lockTopicToConsumerChannelMap.RLock()
+ defer sc.lockTopicToConsumerChannelMap.RUnlock()
for _, ch := range consumerCh.channels {
go func(c chan *ic.InterContainerMessage) {
c <- protoMessage