[VOL-1505] This update enables the core to add a key when
publishing an event onto kafka. The corresponding update is
done in the adapter GO components. Similar changes remain to
be done in pyvoltha.
Change-Id: I0bb1e3cb8c2fa9e0214f96d863819755d34a0bb9
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index c5e0772..1229e7a 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -227,7 +227,7 @@
// InvokeRPC is used to send a request to a given topic
func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
- waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
+ waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
// If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
// typically the device ID.
@@ -237,7 +237,7 @@
}
// Encode the request
- protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
+ protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
if err != nil {
log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
return false, nil
@@ -255,7 +255,7 @@
// Send request - if the topic is formatted with a device Id then we will send the request using a
// specific key, hence ensuring a single partition is used to publish the request. This ensures that the
// subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
- key := GetDeviceIdFromTopic(*toTopic)
+ //key := GetDeviceIdFromTopic(*toTopic)
log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
kp.kafkaClient.Send(protoRequest, toTopic, key)
@@ -549,6 +549,7 @@
Type: ic.MessageType_RESPONSE,
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
+ KeyTopic: request.Header.KeyTopic,
Timestamp: time.Now().Unix(),
}
@@ -703,7 +704,7 @@
// present then the key will be empty, hence all messages for a given topic will be sent to all
// partitions.
replyTopic := &Topic{Name: msg.Header.FromTopic}
- key := GetDeviceIdFromTopic(*replyTopic)
+ key := msg.Header.KeyTopic
log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
// TODO: handle error response.
kp.kafkaClient.Send(icm, replyTopic, key)
@@ -757,12 +758,13 @@
//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
//or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
requestHeader := &ic.Header{
Id: uuid.New().String(),
Type: ic.MessageType_REQUEST,
FromTopic: replyTopic.Name,
ToTopic: toTopic.Name,
+ KeyTopic: key,
Timestamp: time.Now().Unix(),
}
requestBody := &ic.InterContainerRequestBody{