[VOL-1499] Use precreated topic
This commit migrate from dynamically created kafka topic to
pre-created topic. The changes are made in the rw_core, simulated
onu and olt adapters, and ponsim olt and onu adapters.
TODO: move the python shared library changes into the pyvoltha
repo.
Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index f076910..a503c97 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -25,12 +25,16 @@
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "sync"
)
type CoreProxy struct {
kafkaICProxy *kafka.InterContainerProxy
adapterTopic string
coreTopic string
+ deviceIdCoreMap map[string]string
+ lockDeviceIdCoreMap sync.RWMutex
+
}
func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
@@ -38,6 +42,8 @@
proxy.kafkaICProxy = kafkaProxy
proxy.adapterTopic = adapterTopic
proxy.coreTopic = coreTopic
+ proxy.deviceIdCoreMap = make(map[string]string)
+ proxy.lockDeviceIdCoreMap = sync.RWMutex{}
log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
return &proxy
@@ -58,11 +64,40 @@
}
}
+// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
+func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
+ ap.lockDeviceIdCoreMap.Lock()
+ defer ap.lockDeviceIdCoreMap.Unlock()
+ ap.deviceIdCoreMap[deviceId] = coreReference
+}
+
+// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
+func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
+ ap.lockDeviceIdCoreMap.Lock()
+ defer ap.lockDeviceIdCoreMap.Unlock()
+ delete(ap.deviceIdCoreMap, deviceId)
+}
+
+func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
+ ap.lockDeviceIdCoreMap.Lock()
+ defer ap.lockDeviceIdCoreMap.Unlock()
+
+ if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
+ return kafka.Topic{Name: t}
+ }
+
+ return kafka.Topic{Name: ap.coreTopic}
+}
+
+func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
+ return kafka.Topic{Name: ap.adapterTopic}
+}
+
func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
rpc := "Register"
topic := kafka.Topic{Name: ap.coreTopic}
- replyToTopic := kafka.Topic{Name: ap.adapterTopic}
+ replyToTopic := ap.getAdapterTopic()
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "adapter",
@@ -81,16 +116,14 @@
func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
rpc := "DeviceUpdate"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := kafka.CreateSubTopic(ap.coreTopic, device.Id)
+ toTopic := ap.getCoreTopic(device.Id)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, device.Id)
+ replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -101,7 +134,7 @@
rpc := "PortCreated"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
- toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+ toTopic := ap.getCoreTopic(deviceId)
args := make([]*kafka.KVArg, 2)
id := &voltha.ID{Id: deviceId}
args[0] = &kafka.KVArg{
@@ -114,7 +147,7 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+ replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
@@ -126,7 +159,7 @@
rpc := "DeviceStateUpdate"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
- toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+ toTopic := ap.getCoreTopic(deviceId)
args := make([]*kafka.KVArg, 3)
id := &voltha.ID{Id: deviceId}
oStatus := &ic.IntType{Val: int64(operStatus)}
@@ -145,7 +178,7 @@
Value: cStatus,
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+ replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
@@ -157,8 +190,8 @@
rpc := "ChildDeviceDetected"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
- toTopic := kafka.CreateSubTopic(ap.coreTopic, parentDeviceId)
- replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, parentDeviceId)
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
args := make([]*kafka.KVArg, 4)
id := &voltha.ID{Id: parentDeviceId}