[VOL-1588] Improve Flow Add performance
This update consists of the following:
1) Update the performance when adding a flow to a logical device,
decomposing the flow into parent and child device and sending the
flow to the adapters.
2) Format a number of files as per GO fmt.
3) Ensure the device graph cache gets updated when a new port is
added to the graph that belongs to an existing device in cache.
The flow update/deletion performance will be addressed in a separate
commit.
Change-Id: I2eb663cc73eef9fc6172203ed88a35726f5fe008
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index add1900..e920a83 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -52,7 +52,7 @@
producer sarama.AsyncProducer
consumer sarama.Consumer
groupConsumers map[string]*scc.Consumer
- lockOfGroupConsumers sync.RWMutex
+ lockOfGroupConsumers sync.RWMutex
consumerGroupPrefix string
consumerType int
consumerGroupName string
@@ -454,7 +454,6 @@
// Send message to kafka
sc.producer.Input() <- kafkaMsg
-
// Wait for result
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
@@ -920,7 +919,6 @@
return channels
}
-
func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
sc.lockOfGroupConsumers.Lock()
defer sc.lockOfGroupConsumers.Unlock()
@@ -935,7 +933,7 @@
if _, exist := sc.groupConsumers[topic]; exist {
consumer := sc.groupConsumers[topic]
delete(sc.groupConsumers, topic)
- if err := consumer.Close(); err!= nil {
+ if err := consumer.Close(); err != nil {
log.Errorw("failure-closing-consumer", log.Fields{"error": err})
return err
}