[VOL-1588] Improve Flow Add performance

This update consists of the following:
1) Update the performance when adding a flow to a logical device,
decomposing the flow into parent and child device and sending the
flow to the adapters.
2) Format a number of files as per GO fmt.
3) Ensure the device graph cache gets updated when a new port is
added to the graph that belongs to an existing device in cache.

The flow update/deletion performance will be addressed in a separate
commit.

Change-Id: I2eb663cc73eef9fc6172203ed88a35726f5fe008
diff --git a/kafka/client.go b/kafka/client.go
index a4c49ca..3d37f6e 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -32,7 +32,7 @@
 
 const (
 	GroupIdKey = "groupId"
-	Offset = "offset"
+	Offset     = "offset"
 )
 
 const (
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index b9c03e6..afad2ac 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -549,7 +549,7 @@
 		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
-		KeyTopic: request.Header.KeyTopic,
+		KeyTopic:  request.Header.KeyTopic,
 		Timestamp: time.Now().UnixNano(),
 	}
 
@@ -706,7 +706,7 @@
 			key := msg.Header.KeyTopic
 			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
 			// TODO: handle error response.
-			 go kp.kafkaClient.Send(icm, replyTopic, key)
+			go kp.kafkaClient.Send(icm, replyTopic, key)
 		}
 	} else if msg.Header.Type == ic.MessageType_RESPONSE {
 		log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
@@ -763,7 +763,7 @@
 		Type:      ic.MessageType_REQUEST,
 		FromTopic: replyTopic.Name,
 		ToTopic:   toTopic.Name,
-		KeyTopic: key,
+		KeyTopic:  key,
 		Timestamp: time.Now().UnixNano(),
 	}
 	requestBody := &ic.InterContainerRequestBody{
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index add1900..e920a83 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -52,7 +52,7 @@
 	producer                      sarama.AsyncProducer
 	consumer                      sarama.Consumer
 	groupConsumers                map[string]*scc.Consumer
-	lockOfGroupConsumers            sync.RWMutex
+	lockOfGroupConsumers          sync.RWMutex
 	consumerGroupPrefix           string
 	consumerType                  int
 	consumerGroupName             string
@@ -454,7 +454,6 @@
 
 	// Send message to kafka
 	sc.producer.Input() <- kafkaMsg
-
 	// Wait for result
 	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
 	select {
@@ -920,7 +919,6 @@
 	return channels
 }
 
-
 func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
 	sc.lockOfGroupConsumers.Lock()
 	defer sc.lockOfGroupConsumers.Unlock()
@@ -935,7 +933,7 @@
 	if _, exist := sc.groupConsumers[topic]; exist {
 		consumer := sc.groupConsumers[topic]
 		delete(sc.groupConsumers, topic)
-		if err := consumer.Close(); err!= nil {
+		if err := consumer.Close(); err != nil {
 			log.Errorw("failure-closing-consumer", log.Fields{"error": err})
 			return err
 		}