[VOL-1499] Use precreated topic
This commit migrates from dynamically created Kafka topics to
pre-created topics. The changes are made in the rw_core, the simulated
ONU and OLT adapters, and the ponsim OLT and ONU adapters.
TODO: Move the Python shared library changes into the pyvoltha
repo.
Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index edf3135..529b89d 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -96,16 +96,6 @@
return nil
}
-func (so *SimulatedOLT) createDeviceTopic(device *voltha.Device) error {
- log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
- deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
- if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, kafka.OffsetOldest); err != nil {
- log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
- return err
- }
- return nil
-}
-
func (so *SimulatedOLT) Adopt_device(device *voltha.Device) error {
if device == nil {
log.Warn("device-is-nil")
@@ -117,8 +107,6 @@
handler := NewDeviceHandler(so.coreProxy, device, so)
so.addDeviceHandlerToMap(handler)
go handler.AdoptDevice(device)
- // Launch the creation of the device topic
- go so.createDeviceTopic(device)
}
return nil
}
diff --git a/adapters/simulated_olt/main.go b/adapters/simulated_olt/main.go
index 01234a9..ea6fb6c 100644
--- a/adapters/simulated_olt/main.go
+++ b/adapters/simulated_olt/main.go
@@ -91,7 +91,7 @@
}
// Register the core request handler
- if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+ if err = a.setupRequestHandler(a.instanceId, a.iAdapter, a.coreProxy); err != nil {
log.Fatal("error-setting-core-request-handler")
}
@@ -218,9 +218,9 @@
return sOLT, nil
}
-func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
log.Info("setting-request-handler")
- requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
+ requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
return err