[VOL-1499] Use precreated topic
This commit migrates from dynamically created Kafka topics to
pre-created topics. The changes are made in the rw_core, the simulated
ONU and OLT adapters, and the ponsim OLT and ONU adapters.
TODO: Move the Python shared library changes into the pyvoltha
repo.
Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index ed09c9d..b3ba10d 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -30,12 +30,15 @@
type AdapterProxy struct {
TestMode bool
+ deviceTopicRegistered bool
+ coreTopic *kafka.Topic
kafkaICProxy *kafka.InterContainerProxy
}
func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
var proxy AdapterProxy
proxy.kafkaICProxy = kafkaProxy
+ proxy.deviceTopicRegistered = false
return &proxy
}
@@ -55,22 +58,42 @@
}
}
+func (ap *AdapterProxy) updateCoreTopic(coreTopic *kafka.Topic) {
+ ap.coreTopic = coreTopic
+}
+
+func (ap *AdapterProxy) getCoreTopic() kafka.Topic{
+ if ap.coreTopic != nil {
+ return *ap.coreTopic
+ }
+ return kafka.Topic{Name:ap.kafkaICProxy.DefaultTopic.Name}
+}
+
+func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic{
+ return kafka.Topic{Name: adapterName}
+}
+
+
func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("AdoptDevice", log.Fields{"device": device})
rpc := "adopt_device"
- topic := kafka.Topic{Name: device.Adapter}
+ toTopic := ap.getAdapterTopic(device.Adapter)
+ //topic := kafka.Topic{Name: device.Adapter}
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device topic for the response as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
- if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
- log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
- return err
- }
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+ replyToTopic := ap.getCoreTopic()
+ //if !ap.deviceTopicRegistered {
+ // if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
+ // log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+ // return err
+ // }
+ //}
+ ap.deviceTopicRegistered = true
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -78,16 +101,19 @@
func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
rpc := "disable_device"
+ toTopic := ap.getAdapterTopic(device.Adapter)
+
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ //toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ //replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -96,14 +122,14 @@
func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
rpc := "reenable_device"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -112,14 +138,14 @@
func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
rpc := "reboot_device"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -128,14 +154,14 @@
func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
rpc := "delete_device"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
@@ -150,14 +176,14 @@
func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
if success {
@@ -181,7 +207,7 @@
func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -193,7 +219,7 @@
Value: pNo,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
if success {
@@ -250,7 +276,7 @@
func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "download_image"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -261,7 +287,7 @@
Value: download,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
@@ -271,7 +297,7 @@
func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "get_image_download_status"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -282,7 +308,7 @@
Value: download,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
@@ -308,7 +334,7 @@
func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "cancel_image_download"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -319,7 +345,7 @@
Value: download,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
@@ -329,7 +355,7 @@
func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "activate_image_update"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -340,7 +366,7 @@
Value: download,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
@@ -350,7 +376,7 @@
func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "revert_image_update"
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -361,7 +387,7 @@
Value: download,
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
@@ -375,7 +401,7 @@
func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
- toTopic := kafka.CreateSubTopic(deviceType, deviceId)
+ toTopic := ap.getAdapterTopic(deviceType)
rpc := "receive_packet_out"
dId := &ic.StrType{Val: deviceId}
args := make([]*kafka.KVArg, 3)
@@ -395,15 +421,15 @@
// TODO: Do we need to wait for an ACK on a packet Out?
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
+ replyToTopic := ap.getCoreTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
rpc := "update_flows_bulk"
args := make([]*kafka.KVArg, 3)
args[0] = &kafka.KVArg{
@@ -420,7 +446,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -428,7 +454,7 @@
func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+ toTopic := ap.getAdapterTopic(device.Adapter)
rpc := "update_flows_bulk"
args := make([]*kafka.KVArg, 3)
args[0] = &kafka.KVArg{
@@ -445,7 +471,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 88b622e..1be0cff 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -81,7 +81,7 @@
}
func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
- if len(args) != 3 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -130,7 +130,7 @@
}
func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
- if len(args) != 2 {
+ if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -198,7 +198,7 @@
}
func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 2 {
+ if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -292,7 +292,7 @@
}
func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
- if len(args) != 3 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -330,7 +330,7 @@
}
func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Devices, error) {
- if len(args) != 2 {
+ if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -623,7 +623,7 @@
}
func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 3 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -674,7 +674,7 @@
}
func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 3 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 838235d..d576a6e 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -17,6 +17,8 @@
import (
"context"
+ "errors"
+ "fmt"
grpcserver "github.com/opencord/voltha-go/common/grpc"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
@@ -43,6 +45,7 @@
exitChannel chan int
kvClient kvstore.Client
kafkaClient kafka.Client
+ coreMembershipRegistered bool
}
func init() {
@@ -72,6 +75,7 @@
core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
+ core.coreMembershipRegistered = false
return &core
}
@@ -84,7 +88,8 @@
core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
- if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+
+ if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
log.Fatal("Failure-registering-adapterRequestHandler")
}
go core.startDeviceManager(ctx)
@@ -113,7 +118,8 @@
core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
log.Info("grpc-server-created")
- core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+ //core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+ core.grpcNBIAPIHandler = NewAPIHandler(core)
core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
// Create a function to register the core GRPC service with the GRPC server
f := func(gs *grpc.Server) {
@@ -154,17 +160,57 @@
return nil
}
-func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+//func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+// ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
+// ) error {
+// requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+// core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+// core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
+//
+// log.Info("request-handlers")
+// return nil
+//}
+
+func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
- ) error {
+) error {
requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
- core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
- log.Info("request-handlers")
+ // Register the broadcast topic to handle any core-bound broadcast requests
+ if err := core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy); err != nil {
+ log.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": core.config.CoreTopic})
+ return err
+ }
+
+ log.Info("request-handler-registered")
return nil
}
+
+func (core *Core) registerCoreTopic(ctx context.Context, coreTopicSuffix string) error {
+ // Sanity check - can only register once
+ if core.coreMembershipRegistered {
+ return errors.New("Can-only-set-once")
+ }
+
+ go func(coreTopicSuffix string) {
+ // Register the core-pair topic to handle core-bound requests aimed to the core pair
+ pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, coreTopicSuffix)}
+ // TODO: Set a retry here to ensure this subscription happens
+ if err := core.kmp.SubscribeWithDefaultRequestHandler(pTopic, kafka.OffsetNewest); err != nil {
+ log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CoreTopic})
+ }
+ // Update the CoreTopic to use by the adapter proxy
+ core.deviceMgr.adapterProxy.updateCoreTopic(&pTopic)
+ }(coreTopicSuffix)
+
+ core.coreMembershipRegistered = true
+ log.Info("request-handlers-registered")
+ return nil
+}
+
+
func (core *Core) startDeviceManager(ctx context.Context) {
// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
// callbacks. For now, until the model is ready, devicemanager will keep a reference to the
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index f72d615..46382c2 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -60,18 +60,34 @@
longRunningRequestTimeout int64
defaultRequestTimeout int64
da.DefaultAPIHandler
+ core *Core
}
-func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, adapterMgr *AdapterManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
+//func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, adapterMgr *AdapterManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
+// handler := &APIHandler{
+// deviceMgr: deviceMgr,
+// logicalDeviceMgr: lDeviceMgr,
+// adapterMgr:adapterMgr,
+// coreInCompetingMode:inCompetingMode,
+// longRunningRequestTimeout:longRunningRequestTimeout,
+// defaultRequestTimeout:defaultRequestTimeout,
+// // TODO: Figure out what the 'hint' parameter to queue.New does
+// packetInQueue: queue.New(10),
+// }
+// return handler
+//}
+
+func NewAPIHandler(core *Core) *APIHandler {
handler := &APIHandler{
- deviceMgr: deviceMgr,
- logicalDeviceMgr: lDeviceMgr,
- adapterMgr:adapterMgr,
- coreInCompetingMode:inCompetingMode,
- longRunningRequestTimeout:longRunningRequestTimeout,
- defaultRequestTimeout:defaultRequestTimeout,
+ deviceMgr: core.deviceMgr,
+ logicalDeviceMgr: core.logicalDeviceMgr,
+ adapterMgr: core.adapterMgr,
+ coreInCompetingMode: core.config.InCompetingMode,
+ longRunningRequestTimeout:core.config.LongRunningRequestTimeout,
+ defaultRequestTimeout:core.config.DefaultRequestTimeout,
// TODO: Figure out what the 'hint' parameter to queue.New does
packetInQueue: queue.New(10),
+ core: core,
}
return handler
}
@@ -186,6 +202,16 @@
return out, nil
}
+
+func (handler *APIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
+ log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
+ out := new(empty.Empty)
+ if err := handler.core.registerCoreTopic(ctx, membership.GroupName); err != nil {
+ return out, err
+ }
+ return out, nil
+}
+
func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
log.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
if isTestMode(ctx) {
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 1b2fb1e..e313b77 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -306,7 +306,7 @@
}
func (ldMgr *LogicalDeviceManager) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
- log.Debugw("AddUNILogicalPort", log.Fields{"deviceId": childDevice.Id})
+ log.Debugw("AddUNILogicalPort", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId})
// Sanity check
if childDevice.Root {
return errors.New("Device-root")