[VOL-1505] This update enables the core to add a key when
publishing an event onto kafka. The corresponding update is
done in the adapter GO components. Similar changes remain to
be done in pyvoltha.
Change-Id: I0bb1e3cb8c2fa9e0214f96d863819755d34a0bb9
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index a503c97..3198111 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -108,7 +108,7 @@
Value: deviceTypes,
}
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
return unPackResponse(rpc, "", success, result)
}
@@ -124,7 +124,7 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -148,7 +148,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -179,13 +179,13 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
- childDeviceType string, channelId int) error {
+ childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64 ) error {
log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
rpc := "ChildDeviceDetected"
// Use a device specific topic to send the request. The adapter handling the device creates a device
@@ -193,7 +193,7 @@
toTopic := ap.getCoreTopic(parentDeviceId)
replyToTopic := ap.getAdapterTopic()
- args := make([]*kafka.KVArg, 4)
+ args := make([]*kafka.KVArg, 7)
id := &voltha.ID{Id: parentDeviceId}
args[0] = &kafka.KVArg{
Key: "parent_device_id",
@@ -214,8 +214,23 @@
Key: "channel_id",
Value: channel,
}
+ vId := &ic.StrType{Val: vendorId}
+ args[4] = &kafka.KVArg{
+ Key: "vendor_id",
+ Value: vId,
+ }
+ sNo := &ic.StrType{Val: serialNumber}
+ args[5] = &kafka.KVArg{
+ Key: "serial_number",
+ Value: sNo,
+ }
+ oId := &ic.IntType{Val: int64(onuId)}
+ args[6] = &kafka.KVArg{
+ Key: "onu_id",
+ Value: oId,
+ }
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
return unPackResponse(rpc, parentDeviceId, success, result)
}