[VOL-1499] Use pre-created topics
This commit migrates from dynamically created Kafka topics to
pre-created topics. The changes are made in the rw_core, the simulated
ONU and OLT adapters, and the ponsim OLT and ONU adapters.
TODO: Move the Python shared-library changes into the pyvoltha
repo.
Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index ed09c9d..b3ba10d 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -30,12 +30,15 @@
type AdapterProxy struct {
TestMode bool
+ deviceTopicRegistered bool
+ coreTopic *kafka.Topic
kafkaICProxy *kafka.InterContainerProxy
}
func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
var proxy AdapterProxy
proxy.kafkaICProxy = kafkaProxy
+ proxy.deviceTopicRegistered = false
return &proxy
}
@@ -55,22 +58,42 @@
}
}
+func (ap *AdapterProxy) updateCoreTopic(coreTopic *kafka.Topic) {
+ ap.coreTopic = coreTopic
+}
+
+func (ap *AdapterProxy) getCoreTopic() kafka.Topic{
+ if ap.coreTopic != nil {
+ return *ap.coreTopic
+ }
+ return kafka.Topic{Name:ap.kafkaICProxy.DefaultTopic.Name}
+}
+
+func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic{
+ return kafka.Topic{Name: adapterName}
+}
+
+
func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("AdoptDevice", log.Fields{"device": device})
rpc := "adopt_device"
- topic := kafka.Topic{Name: device.Adapter}
+ toTopic := ap.getAdapterTopic(device.Adapter)
+ //topic := kafka.Topic{Name: device.Adapter}
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device topic for the response as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
- if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
- log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
- return err
- }
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+ replyToTopic := ap.getCoreTopic()
+ //if !ap.deviceTopicRegistered {
+ // if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
+ // log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+ // return err
+ // }
+ //}
+ ap.deviceTopicRegistered = true
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -78,16 +101,19 @@
func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
rpc := "disable_device"
+ toTopic := ap.getAdapterTopic(device.Adapter)
+
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ //toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ //replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -96,14 +122,14 @@
func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
rpc := "reenable_device"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -112,14 +138,14 @@
func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
rpc := "reboot_device"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -128,14 +154,14 @@
func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
rpc := "delete_device"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
@@ -150,14 +176,14 @@
func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
if success {
@@ -181,7 +207,7 @@
func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -193,7 +219,7 @@
Value: pNo,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
if success {
@@ -250,7 +276,7 @@
func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "download_image"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -261,7 +287,7 @@
Value: download,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
@@ -271,7 +297,7 @@
func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "get_image_download_status"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -282,7 +308,7 @@
Value: download,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
@@ -308,7 +334,7 @@
func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "cancel_image_download"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -319,7 +345,7 @@
Value: download,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
@@ -329,7 +355,7 @@
func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "activate_image_update"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -340,7 +366,7 @@
Value: download,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
@@ -350,7 +376,7 @@
func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "revert_image_update"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -361,7 +387,7 @@
Value: download,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
@@ -375,7 +401,7 @@
func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
- toTopic := kafka.CreateSubTopic(deviceType, deviceId)
+ toTopic := ap.getAdapterTopic(deviceType)
rpc := "receive_packet_out"
dId := &ic.StrType{Val: deviceId}
args := make([]*kafka.KVArg, 3)
@@ -395,15 +421,15 @@
// TODO: Do we need to wait for an ACK on a packet Out?
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
+ replyToTopic := ap.getCoreTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
rpc := "update_flows_bulk"
args := make([]*kafka.KVArg, 3)
args[0] = &kafka.KVArg{
@@ -420,7 +446,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -428,7 +454,7 @@
func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
rpc := "update_flows_bulk"
args := make([]*kafka.KVArg, 3)
args[0] = &kafka.KVArg{
@@ -445,7 +471,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)