[VOL-1499] Use precreated topic
This commit migrates from dynamically created Kafka topics to
pre-created topics. The changes are made in the rw_core, the simulated
ONU and OLT adapters, and the ponsim OLT and ONU adapters.
TODO: move the python shared library changes into the pyvoltha
repo.
Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index f076910..a503c97 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -25,12 +25,16 @@
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "sync"
)
type CoreProxy struct {
kafkaICProxy *kafka.InterContainerProxy
adapterTopic string
coreTopic string
+ deviceIdCoreMap map[string]string
+ lockDeviceIdCoreMap sync.RWMutex
+
}
func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
@@ -38,6 +42,8 @@
proxy.kafkaICProxy = kafkaProxy
proxy.adapterTopic = adapterTopic
proxy.coreTopic = coreTopic
+ proxy.deviceIdCoreMap = make(map[string]string)
+ proxy.lockDeviceIdCoreMap = sync.RWMutex{}
log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
return &proxy
@@ -58,11 +64,40 @@
}
}
+// UpdateCoreReference adds or updates the core reference (really the topic name) for a given device Id
+func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
+ ap.lockDeviceIdCoreMap.Lock()
+ defer ap.lockDeviceIdCoreMap.Unlock()
+ ap.deviceIdCoreMap[deviceId] = coreReference
+}
+
+// DeleteCoreReference removes a core reference (really the topic name) for a given device Id.
+// Deleting a device Id that has no entry is a no-op.
+func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
+ ap.lockDeviceIdCoreMap.Lock()
+ defer ap.lockDeviceIdCoreMap.Unlock()
+ delete(ap.deviceIdCoreMap, deviceId)
+}
+
+// getCoreTopic returns the topic used to reach the core for the given device Id: the core
+// reference stored in deviceIdCoreMap when one exists, otherwise the default coreTopic.
+func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
+ // Read-only lookup: take the shared lock so concurrent lookups do not serialize each other.
+ ap.lockDeviceIdCoreMap.RLock()
+ defer ap.lockDeviceIdCoreMap.RUnlock()
+
+ if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
+ return kafka.Topic{Name: t}
+ }
+
+ // No core reference recorded for this device: fall back to the default core topic.
+ return kafka.Topic{Name: ap.coreTopic}
+}
+
+// getAdapterTopic returns the adapter topic. The variadic args are accepted but not used.
+func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
+ return kafka.Topic{Name: ap.adapterTopic}
+}
+
func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
rpc := "Register"
topic := kafka.Topic{Name: ap.coreTopic}
- replyToTopic := kafka.Topic{Name: ap.adapterTopic}
+ replyToTopic := ap.getAdapterTopic()
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "adapter",
@@ -81,16 +116,14 @@
func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
rpc := "DeviceUpdate"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := kafka.CreateSubTopic(ap.coreTopic, device.Id)
+ toTopic := ap.getCoreTopic(device.Id)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, device.Id)
+ replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -101,7 +134,7 @@
rpc := "PortCreated"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
- toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+ toTopic := ap.getCoreTopic(deviceId)
args := make([]*kafka.KVArg, 2)
id := &voltha.ID{Id: deviceId}
args[0] = &kafka.KVArg{
@@ -114,7 +147,7 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+ replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
@@ -126,7 +159,7 @@
rpc := "DeviceStateUpdate"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
- toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+ toTopic := ap.getCoreTopic(deviceId)
args := make([]*kafka.KVArg, 3)
id := &voltha.ID{Id: deviceId}
oStatus := &ic.IntType{Val: int64(operStatus)}
@@ -145,7 +178,7 @@
Value: cStatus,
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+ replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
@@ -157,8 +190,8 @@
rpc := "ChildDeviceDetected"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
- toTopic := kafka.CreateSubTopic(ap.coreTopic, parentDeviceId)
- replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, parentDeviceId)
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
args := make([]*kafka.KVArg, 4)
id := &voltha.ID{Id: parentDeviceId}
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index b3606b0..4d65d89 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -32,12 +32,14 @@
TestMode bool
coreInstanceId string
adapter adapters.IAdapter
+ coreProxy *CoreProxy
}
-func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter) *RequestHandlerProxy {
+func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter, cProxy *CoreProxy) *RequestHandlerProxy {
var proxy RequestHandlerProxy
proxy.coreInstanceId = coreInstanceId
proxy.adapter = iadapter
+ proxy.coreProxy = cProxy
return &proxy
}
@@ -54,13 +56,14 @@
}
func (rhp *RequestHandlerProxy) Adopt_device(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 2 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
device := &voltha.Device{}
transactionID := &ic.StrType{}
+ fromTopic := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device":
@@ -73,11 +76,19 @@
log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
+ case kafka.FromTopic:
+ if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+ log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ return nil, err
+ }
}
}
log.Debugw("Adopt_device", log.Fields{"deviceId": device.Id})
+ //Update the core reference for that device
+ rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+
//Invoke the adopt device on the adapter
if err := rhp.adapter.Adopt_device(device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
@@ -143,7 +154,7 @@
}
func (rhp *RequestHandlerProxy) Get_ofp_device_info(args []*ic.Argument) (*ic.SwitchCapability, error) {
- if len(args) != 2 {
+ if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -177,7 +188,7 @@
}
func (rhp *RequestHandlerProxy) Get_ofp_port_info(args []*ic.Argument) (*ic.PortCapability, error) {
- if len(args) != 3 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -214,7 +225,7 @@
}
func (rhp *RequestHandlerProxy) Process_inter_adapter_message(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 2 {
+ if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index edf3135..529b89d 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -96,16 +96,6 @@
return nil
}
-func (so *SimulatedOLT) createDeviceTopic(device *voltha.Device) error {
- log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
- deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
- if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, kafka.OffsetOldest); err != nil {
- log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
- return err
- }
- return nil
-}
-
func (so *SimulatedOLT) Adopt_device(device *voltha.Device) error {
if device == nil {
log.Warn("device-is-nil")
@@ -117,8 +107,6 @@
handler := NewDeviceHandler(so.coreProxy, device, so)
so.addDeviceHandlerToMap(handler)
go handler.AdoptDevice(device)
- // Launch the creation of the device topic
- go so.createDeviceTopic(device)
}
return nil
}
diff --git a/adapters/simulated_olt/main.go b/adapters/simulated_olt/main.go
index 01234a9..ea6fb6c 100644
--- a/adapters/simulated_olt/main.go
+++ b/adapters/simulated_olt/main.go
@@ -91,7 +91,7 @@
}
// Register the core request handler
- if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+ if err = a.setupRequestHandler(a.instanceId, a.iAdapter, a.coreProxy); err != nil {
log.Fatal("error-setting-core-request-handler")
}
@@ -218,9 +218,9 @@
return sOLT, nil
}
-func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
log.Info("setting-request-handler")
- requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
+ requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
return err
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index 2d43944..c10b844 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -94,16 +94,6 @@
return nil
}
-func (so *SimulatedONU) createDeviceTopic(device *voltha.Device) error {
- log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
- deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
- if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, kafka.OffsetOldest); err != nil {
- log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
- return err
- }
- return nil
-}
-
func (so *SimulatedONU) Adopt_device(device *voltha.Device) error {
if device == nil {
log.Warn("device-is-nil")
@@ -115,8 +105,6 @@
handler := NewDeviceHandler(so.coreProxy, device, so)
so.addDeviceHandlerToMap(handler)
go handler.AdoptDevice(device)
- // Launch the creation of the device topic
- go so.createDeviceTopic(device)
}
return nil
}
diff --git a/adapters/simulated_onu/main.go b/adapters/simulated_onu/main.go
index 62cf2db..a5c18aa 100644
--- a/adapters/simulated_onu/main.go
+++ b/adapters/simulated_onu/main.go
@@ -91,7 +91,7 @@
}
// Register the core request handler
- if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+ if err = a.setupRequestHandler(a.instanceId, a.iAdapter, a.coreProxy); err != nil {
log.Fatal("error-setting-core-request-handler")
}
@@ -218,9 +218,9 @@
return sOLT, nil
}
-func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
log.Info("setting-request-handler")
- requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
+ requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
log.Errorw("adaptercore-request-handler-setup-failed", log.Fields{"error": err})
return err