[VOL-1505]  This update enables the core to add a key when
publishing an event onto kafka.   The corresponding update is
done in the adapter GO components.   Similar changes remain to
be done in pyvoltha.

Change-Id: I0bb1e3cb8c2fa9e0214f96d863819755d34a0bb9
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index b3ba10d..5d21838 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -93,7 +93,7 @@
 	//	}
 	//}
 	ap.deviceTopicRegistered = true
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -114,7 +114,7 @@
 	// Use a device specific topic as we are the only core handling requests for this device
 	//replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -130,7 +130,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -146,7 +146,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -162,7 +162,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 
 	// We no longer need to have this device topic as we won't receive any unsolicited messages on it
@@ -184,7 +184,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
 	if success {
 		unpackResult := &ic.SwitchCapability{}
@@ -220,7 +220,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
 	if success {
 		unpackResult := &ic.PortCapability{}
@@ -288,7 +288,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
 
 	return unPackResponse(rpc, device.Id, success, result)
@@ -309,7 +309,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
 
 	if success {
@@ -346,7 +346,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
 
 	return unPackResponse(rpc, device.Id, success, result)
@@ -367,7 +367,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 
 	return unPackResponse(rpc, device.Id, success, result)
@@ -388,7 +388,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 
 	return unPackResponse(rpc, device.Id, success, result)
@@ -422,7 +422,7 @@
 	// TODO:  Do we need to wait for an ACK on a packet Out?
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
 	log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
@@ -447,7 +447,7 @@
 
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -472,7 +472,7 @@
 
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }