[VOL-1505] This update enables the core to add a key when
publishing an event onto Kafka. The corresponding update is
done in the adapter Go components. Similar changes remain to
be done in pyvoltha.

Change-Id: I0bb1e3cb8c2fa9e0214f96d863819755d34a0bb9
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index c5e0772..1229e7a 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -227,7 +227,7 @@
 
 // InvokeRPC is used to send a request to a given topic
 func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
-	waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
+	waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
 
 	//	If a replyToTopic is provided then we use it, otherwise just use the  default toTopic.  The replyToTopic is
 	// typically the device ID.
@@ -237,7 +237,7 @@
 	}
 
 	// Encode the request
-	protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
+	protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
 	if err != nil {
 		log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
 		return false, nil
@@ -255,7 +255,7 @@
 	// Send request - if the topic is formatted with a device Id then we will send the request using a
 	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
 	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
-	key := GetDeviceIdFromTopic(*toTopic)
+	//key := GetDeviceIdFromTopic(*toTopic)
 	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
 	kp.kafkaClient.Send(protoRequest, toTopic, key)
 
@@ -549,6 +549,7 @@
 		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
+		KeyTopic: request.Header.KeyTopic,
 		Timestamp: time.Now().Unix(),
 	}
 
@@ -703,7 +704,7 @@
 			// present then the key will be empty, hence all messages for a given topic will be sent to all
 			// partitions.
 			replyTopic := &Topic{Name: msg.Header.FromTopic}
-			key := GetDeviceIdFromTopic(*replyTopic)
+			key := msg.Header.KeyTopic
 			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
 			// TODO: handle error response.
 			kp.kafkaClient.Send(icm, replyTopic, key)
@@ -757,12 +758,13 @@
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
 	requestHeader := &ic.Header{
 		Id:        uuid.New().String(),
 		Type:      ic.MessageType_REQUEST,
 		FromTopic: replyTopic.Name,
 		ToTopic:   toTopic.Name,
+		KeyTopic: key,
 		Timestamp: time.Now().Unix(),
 	}
 	requestBody := &ic.InterContainerRequestBody{