[VOL-1505] This update enables the core to add a key when
publishing an event onto kafka. The corresponding update is
done in the adapter GO components. Similar changes remain to
be done in pyvoltha.
Change-Id: I0bb1e3cb8c2fa9e0214f96d863819755d34a0bb9
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index b3ba10d..5d21838 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -93,7 +93,7 @@
// }
//}
ap.deviceTopicRegistered = true
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -114,7 +114,7 @@
// Use a device specific topic as we are the only core handling requests for this device
//replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -130,7 +130,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -146,7 +146,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -162,7 +162,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
// We no longer need to have this device topic as we won't receive any unsolicited messages on it
@@ -184,7 +184,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
if success {
unpackResult := &ic.SwitchCapability{}
@@ -220,7 +220,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
if success {
unpackResult := &ic.PortCapability{}
@@ -288,7 +288,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -309,7 +309,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
if success {
@@ -346,7 +346,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -367,7 +367,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -388,7 +388,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -422,7 +422,7 @@
// TODO: Do we need to wait for an ACK on a packet Out?
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -447,7 +447,7 @@
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -472,7 +472,7 @@
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}