[VOL-1499] Use precreated topic
This commit migrates from dynamically created Kafka topics to
pre-created topics. The changes are made in the rw_core, the simulated
onu and olt adapters, and the ponsim olt and onu adapters.
TODO: Move the Python shared library changes into the pyvoltha
repo.
Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index f72d615..46382c2 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -60,18 +60,34 @@
longRunningRequestTimeout int64
defaultRequestTimeout int64
da.DefaultAPIHandler
+ core *Core
}
-func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, adapterMgr *AdapterManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
+//func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, adapterMgr *AdapterManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
+// handler := &APIHandler{
+// deviceMgr: deviceMgr,
+// logicalDeviceMgr: lDeviceMgr,
+// adapterMgr:adapterMgr,
+// coreInCompetingMode:inCompetingMode,
+// longRunningRequestTimeout:longRunningRequestTimeout,
+// defaultRequestTimeout:defaultRequestTimeout,
+// // TODO: Figure out what the 'hint' parameter to queue.New does
+// packetInQueue: queue.New(10),
+// }
+// return handler
+//}
+
+func NewAPIHandler(core *Core) *APIHandler {
handler := &APIHandler{
- deviceMgr: deviceMgr,
- logicalDeviceMgr: lDeviceMgr,
- adapterMgr:adapterMgr,
- coreInCompetingMode:inCompetingMode,
- longRunningRequestTimeout:longRunningRequestTimeout,
- defaultRequestTimeout:defaultRequestTimeout,
+ deviceMgr: core.deviceMgr,
+ logicalDeviceMgr: core.logicalDeviceMgr,
+ adapterMgr: core.adapterMgr,
+ coreInCompetingMode: core.config.InCompetingMode,
+ longRunningRequestTimeout:core.config.LongRunningRequestTimeout,
+ defaultRequestTimeout:core.config.DefaultRequestTimeout,
// TODO: Figure out what the 'hint' parameter to queue.New does
packetInQueue: queue.New(10),
+ core: core,
}
return handler
}
@@ -186,6 +202,16 @@
return out, nil
}
+
+func (handler *APIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
+ log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
+ out := new(empty.Empty)
+ if err := handler.core.registerCoreTopic(ctx, membership.GroupName); err != nil {
+ return out, err
+ }
+ return out, nil
+}
+
func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
log.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
if isTestMode(ctx) {