[VOL-1034] This commit consists of:
1) Implement PM collections from the ONU
2) Update the Registration method to include for the adapter type
and its supported device types.
Change-Id: Id984468546328b6ebf2ca47578675c69b2b66f01
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index e7f69f5..570b445 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -30,14 +30,16 @@
type AdapterRequestHandlerProxy struct {
TestMode bool
+ coreInstanceId string
deviceMgr *DeviceManager
lDeviceMgr *LogicalDeviceManager
localDataProxy *model.Proxy
clusterDataProxy *model.Proxy
}
-func NewAdapterRequestHandlerProxy(dMgr *DeviceManager, ldMgr *LogicalDeviceManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
+func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
var proxy AdapterRequestHandlerProxy
+ proxy.coreInstanceId = coreInstanceId
proxy.deviceMgr = dMgr
proxy.lDeviceMgr = ldMgr
proxy.clusterDataProxy = cdProxy
@@ -46,23 +48,34 @@
}
func (rhp *AdapterRequestHandlerProxy) Register(args []*ca.Argument) (*voltha.CoreInstance, error) {
- if len(args) != 1 {
+ if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
adapter := &voltha.Adapter{}
- if err := ptypes.UnmarshalAny(args[0].Value, adapter); err != nil {
- log.Warnw("cannot-unmarshal-adapter", log.Fields{"error": err})
- return nil, err
+ deviceTypes := &voltha.DeviceTypes{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "adapter":
+ if err := ptypes.UnmarshalAny(arg.Value, adapter); err != nil {
+ log.Warnw("cannot-unmarshal-adapter", log.Fields{"error": err})
+ return nil, err
+ }
+ case "deviceTypes":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceTypes); err != nil {
+ log.Warnw("cannot-unmarshal-device-types", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
- log.Debugw("Register", log.Fields{"Adapter": *adapter})
+ log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "coreId": rhp.coreInstanceId})
// TODO process the request and store the data in the KV store
if rhp.TestMode { // Execute only for test cases
return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
}
- return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
+ return &voltha.CoreInstance{InstanceId: rhp.coreInstanceId}, nil
}
func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ca.Argument) (*voltha.Device, error) {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 8449ccb..6015e7f 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -56,27 +56,29 @@
// Setup the KV store
// Do not call NewBackend constructor; it creates its own KV client
- backend := model.Backend{
- Client: kvClient,
- StoreType: cf.KVStoreType,
- Host: cf.KVStoreHost,
- Port: cf.KVStorePort,
- Timeout: cf.KVStoreTimeout,
- PathPrefix: "service/voltha"}
- core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
- core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
+ // Commented the backend for now until the issue between the model and the KV store
+ // is resolved.
+ //backend := model.Backend{
+ // Client: kvClient,
+ // StoreType: cf.KVStoreType,
+ // Host: cf.KVStoreHost,
+ // Port: cf.KVStorePort,
+ // Timeout: cf.KVStoreTimeout,
+ // PathPrefix: "service/voltha"}
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
core.clusterDataProxy = core.clusterDataRoot.GetProxy("/", false)
core.localDataProxy = core.localDataRoot.GetProxy("/", false)
return &core
}
func (core *Core) Start(ctx context.Context) {
- log.Info("starting-core")
+ log.Info("starting-core", log.Fields{"coreId": core.instanceId})
core.startKafkaMessagingProxy(ctx)
log.Info("values", log.Fields{"kmp": core.kmp})
core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy)
core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
- core.registerAdapterRequestHandler(ctx, core.deviceMgr, core.logicalDeviceMgr, core.localDataProxy, core.clusterDataProxy)
+ core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.localDataProxy, core.clusterDataProxy)
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
go core.startGRPCService(ctx)
@@ -135,9 +137,9 @@
return nil
}
-func (core *Core) registerAdapterRequestHandler(ctx context.Context, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
+func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
cdProxy *model.Proxy, ldProxy *model.Proxy) error {
- requestProxy := NewAdapterRequestHandlerProxy(dMgr, ldMgr, cdProxy, ldProxy)
+ requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, cdProxy, ldProxy)
core.kmp.SubscribeWithTarget(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
log.Info("request-handlers")