VOL-1405 : First submission for read-only core
- Most of the logic was copied from the read-write implementation
- Added missing Get/List calls
- Added necessary targets in Makefile
- Added docker and k8s manifests
Amendments:
- Removed more unnecessary code.
- Removed refs to kafka
- Adjustments to reflect comments
- Removed refs to kafka in manifests
Change-Id: Ife2ca13d3ae428923825f7c19d42359d60406839
diff --git a/ro_core/core/core.go b/ro_core/core/core.go
new file mode 100644
index 0000000..0ba9b1b
--- /dev/null
+++ b/ro_core/core/core.go
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ grpcserver "github.com/opencord/voltha-go/common/grpc"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "github.com/opencord/voltha-go/ro_core/config"
+ "google.golang.org/grpc"
+)
+
+type Core struct {
+ instanceId string // identifier of this core instance, supplied to NewCore
+ genericMgr *ModelProxyManager // created in Start from clusterDataProxy
+ deviceMgr *DeviceManager // created in Start
+ logicalDeviceMgr *LogicalDeviceManager // created in Start
+ grpcServer *grpcserver.GrpcServer // created in startGRPCService (nil until that goroutine runs)
+ grpcNBIAPIHandler *APIHandler // created in startGRPCService
+ config *config.ROCoreFlags // flags supplied to NewCore
+ clusterDataRoot model.Root // model root built over voltha.Voltha
+ localDataRoot model.Root // model root built over voltha.CoreInstance
+ clusterDataProxy *model.Proxy // proxy on "/" of clusterDataRoot
+ localDataProxy *model.Proxy // proxy on "/" of localDataRoot
+ exitChannel chan int // buffered with capacity 1; Stop writes to it
+ kvClient kvstore.Client // KV client supplied to NewCore
+}
+
+func init() {
+ log.AddPackage(log.JSON, log.DebugLevel, nil) // presumably registers this package with the logger (JSON, debug level) — confirm in common/log
+}
+
+func NewCore(id string, cf *config.ROCoreFlags, kvClient kvstore.Client) *Core {
+ var core Core
+ core.instanceId = id
+ core.exitChannel = make(chan int, 1)
+ core.config = cf
+ core.kvClient = kvClient
+
+ // Setup the KV store
+ // Do not call NewBackend constructor; it creates its own KV client
+ // Commented the backend for now until the issue between the model and the KV store
+ // is resolved.
+ backend := model.Backend{
+ Client: kvClient,
+ StoreType: cf.KVStoreType,
+ Host: cf.KVStoreHost,
+ Port: cf.KVStorePort,
+ Timeout: cf.KVStoreTimeout,
+ PathPrefix: "service/voltha"}
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
+ core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
+ core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
+ return &core
+}
+
+func (core *Core) Start(ctx context.Context) {
+	log.Infow("starting-adaptercore", log.Fields{"coreId": core.instanceId})
+	core.genericMgr = newModelProxyManager(core.clusterDataProxy)
+	core.deviceMgr = newDeviceManager(core.clusterDataProxy, core.instanceId)
+	core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.clusterDataProxy)
+	go core.startDeviceManager(ctx)
+	go core.startLogicalDeviceManager(ctx)
+	go core.startGRPCService(ctx)
+	// The managers are created synchronously above; the three services start in their own goroutines.
+	log.Info("adaptercore-started")
+}
+
+func (core *Core) Stop(ctx context.Context) {
+ log.Info("stopping-adaptercore")
+ core.exitChannel <- 1 // exitChannel has capacity 1, so a second Stop with no reader would block here
+ // Stop the started services in order: gRPC server, logical device manager, device manager
+ core.grpcServer.Stop() // NOTE(review): grpcServer is assigned in the startGRPCService goroutine; nil if Stop runs before it
+ core.logicalDeviceMgr.stop(ctx)
+ core.deviceMgr.stop(ctx)
+ log.Info("adaptercore-stopped")
+}
+
+//startGRPCService creates the gRPC server and the northbound API handler, registers the handler
+// with the server and starts the server
+func (core *Core) startGRPCService(ctx context.Context) {
+ // create the gRPC server; nil and false are passed as the last two arguments (presumably no TLS — confirm)
+ core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
+ log.Info("grpc-server-created")
+
+ core.grpcNBIAPIHandler = NewAPIHandler(core.genericMgr, core.deviceMgr, core.logicalDeviceMgr)
+ core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
+ // Create a function to register the core gRPC service with the gRPC server
+ f := func(gs *grpc.Server) {
+ voltha.RegisterVolthaServiceServer(
+ gs,
+ core.grpcNBIAPIHandler,
+ )
+ }
+
+ core.grpcServer.AddService(f)
+ log.Info("grpc-service-added")
+
+ // Start the server. NOTE(review): a fresh background context is passed here rather than ctx
+ core.grpcServer.Start(context.Background())
+ log.Info("grpc-server-started")
+}
+
+func (core *Core) startDeviceManager(ctx context.Context) {
+ // TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
+ // callbacks. For now, until the model is ready, devicemanager will keep a reference to the
+ // logicaldevicemanager (it is handed over through start below)
+ log.Info("starting-DeviceManager")
+ core.deviceMgr.start(ctx, core.logicalDeviceMgr)
+ log.Info("started-DeviceManager")
+}
+
+func (core *Core) startLogicalDeviceManager(ctx context.Context) {
+ log.Info("starting-Logical-DeviceManager")
+ core.logicalDeviceMgr.start(ctx) // run from its own goroutine by Core.Start
+ log.Info("started-Logical-DeviceManager")
+}
diff --git a/ro_core/core/device_agent.go b/ro_core/core/device_agent.go
new file mode 100644
index 0000000..1ae1275
--- /dev/null
+++ b/ro_core/core/device_agent.go
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+type DeviceAgent struct {
+ deviceId string // ID of the device, copied from device.Id at construction
+ deviceType string // copied from device.Type at construction
+ lastData *voltha.Device // device value the agent was created with
+ deviceMgr *DeviceManager // owning manager; used to look the device up again
+ clusterDataProxy *model.Proxy // proxy queried under "/devices/<deviceId>"
+ exitChannel chan int // buffered with capacity 1; stop writes to it
+ lockDevice sync.RWMutex // held by start, stop and getDevice
+}
+
+//newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
+//preprovisioning
+func newDeviceAgent(device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
+ var agent DeviceAgent
+ agent.deviceId = device.Id
+ agent.deviceType = device.Type
+ agent.lastData = device
+ agent.deviceMgr = deviceMgr
+ agent.exitChannel = make(chan int, 1)
+ agent.clusterDataProxy = cdProxy
+ agent.lockDevice = sync.RWMutex{}
+ return &agent
+}
+
+// start only logs while taking and releasing the device lock; nothing is written to the data model here
+func (agent *DeviceAgent) start(ctx context.Context) {
+ log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debug("device-agent-started")
+}
+
+// stop stops the device agent: under the device lock it writes to exitChannel. Not much else to do for now
+func (agent *DeviceAgent) stop(ctx context.Context) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debug("stopping-device-agent")
+ agent.exitChannel <- 1 // capacity 1: a second stop with no reader would block while holding the lock
+ log.Debug("device-agent-stopped")
+}
+
+// getDevice retrieves the latest device information from the data model.
+//
+// It returns a clone of the stored device, or a NotFound status error when the
+// model holds no *voltha.Device under "/devices/<deviceId>". Only a read lock
+// is taken: the lookup does not modify the agent, so concurrent readers need
+// not be serialized. The lookup itself is delegated to getDeviceWithoutLock so
+// the locked and unlocked variants cannot drift apart.
+func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
+	agent.lockDevice.RLock()
+	defer agent.lockDevice.RUnlock()
+	return agent.getDeviceWithoutLock()
+}
+
+// getDeviceWithoutLock is a helper function to be used ONLY by device agent functions AFTER they have acquired the device lock.
+// It exists so the lookup is not duplicated across the device agent functions. It returns a clone of the
+// stored device, or a NotFound status error when the model holds no *voltha.Device for this agent's ID.
+func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
+	entry := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, "")
+	// A comma-ok assertion on a nil interface yields ok == false, so nil needs no separate test.
+	if found, ok := entry.(*voltha.Device); ok {
+		return proto.Clone(found).(*voltha.Device), nil
+	}
+	return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// getPorts returns the ports of this agent's device whose type equals portType (empty when none match or no device is found).
+func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+	log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
+	matching := &voltha.Ports{}
+	// The lookup error is dropped on purpose: a nil device simply contributes no ports (GetPorts is nil-safe).
+	device, _ := agent.deviceMgr.GetDevice(agent.deviceId)
+	for _, candidate := range device.GetPorts() {
+		if candidate.Type == portType {
+			matching.Items = append(matching.Items, candidate)
+		}
+	}
+	return matching
+}
+
+// ListDevicePorts returns every port of this agent's device, or a NotFound status error when the device cannot be retrieved.
+func (agent *DeviceAgent) ListDevicePorts(ctx context.Context) (*voltha.Ports, error) {
+	log.Debugw("ListDevicePorts", log.Fields{"id": agent.deviceId})
+	device, _ := agent.deviceMgr.GetDevice(agent.deviceId)
+	if device == nil {
+		return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+	}
+	// Copy the port pointers into a fresh slice instead of exposing the device's own slice.
+	listed := &voltha.Ports{}
+	listed.Items = append(listed.Items, device.GetPorts()...)
+	return listed, nil
+}
+
+// ListDevicePmConfigs retrieves the PM configs of this agent's device, or a NotFound error if the device cannot be retrieved.
+func (agent *DeviceAgent) ListDevicePmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
+ log.Debugw("ListDevicePmConfigs", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ return device.GetPmConfigs(), nil
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// ListDeviceFlows retrieves the flows of this agent's device, or a NotFound error if the device cannot be retrieved.
+func (agent *DeviceAgent) ListDeviceFlows(ctx context.Context) (*voltha.Flows, error) {
+ log.Debugw("ListDeviceFlows", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ return device.GetFlows(), nil
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// ListDeviceFlowGroups retrieves the flow groups of this agent's device, or a NotFound error if the device cannot be retrieved.
+func (agent *DeviceAgent) ListDeviceFlowGroups(ctx context.Context) (*voltha.FlowGroups, error) {
+ log.Debugw("ListDeviceFlowGroups", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ return device.GetFlowGroups(), nil
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// GetImageDownloadStatus returns the image download entry of this device whose name equals imageName.
+func (agent *DeviceAgent) GetImageDownloadStatus(ctx context.Context, imageName string) (*voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownloadStatus", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ for _, img := range device.GetImageDownloads() {
+ if img.GetName() == imageName {
+ return img, nil // first entry with a matching name wins
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s, image-%s", agent.deviceId, imageName) // device found, image not
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId) // device itself not found
+}
+
+// GetImageDownload returns the image download entry of this device whose name equals imageName.
+func (agent *DeviceAgent) GetImageDownload(ctx context.Context, imageName string) (*voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownload", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ for _, img := range device.GetImageDownloads() {
+ if img.GetName() == imageName {
+ return img, nil // first entry with a matching name wins
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s, image-%s", agent.deviceId, imageName) // device found, image not
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId) // device itself not found
+}
+
+// ListImageDownloads wraps the device's image downloads in an ImageDownloads message, or returns NotFound if the device is missing.
+func (agent *DeviceAgent) ListImageDownloads(ctx context.Context) (*voltha.ImageDownloads, error) {
+ log.Debugw("ListImageDownloads", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ return &voltha.ImageDownloads{Items: device.GetImageDownloads()}, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// GetImages retrieves the images of this agent's device, or a NotFound error if the device cannot be retrieved.
+func (agent *DeviceAgent) GetImages(ctx context.Context) (*voltha.Images, error) {
+ log.Debugw("GetImages", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ return device.GetImages(), nil
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
\ No newline at end of file
diff --git a/ro_core/core/device_manager.go b/ro_core/core/device_manager.go
new file mode 100644
index 0000000..1851e27
--- /dev/null
+++ b/ro_core/core/device_manager.go
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+type DeviceManager struct {
+ deviceAgents map[string]*DeviceAgent // agents keyed by device ID; guarded by lockDeviceAgentsMap
+ logicalDeviceMgr *LogicalDeviceManager // set in start
+ clusterDataProxy *model.Proxy // proxy queried under "/devices"
+ coreInstanceId string // core instance ID supplied at construction
+ exitChannel chan int // buffered with capacity 1; stop writes to it
+ lockDeviceAgentsMap sync.RWMutex // protects deviceAgents
+}
+
+func newDeviceManager(cdProxy *model.Proxy, coreInstanceId string) *DeviceManager {
+ var deviceMgr DeviceManager
+ deviceMgr.exitChannel = make(chan int, 1)
+ deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
+ deviceMgr.coreInstanceId = coreInstanceId
+ deviceMgr.clusterDataProxy = cdProxy
+ deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
+ return &deviceMgr
+}
+
+func (dMgr *DeviceManager) start(ctx context.Context, logicalDeviceMgr *LogicalDeviceManager) {
+ log.Info("starting-device-manager")
+ dMgr.logicalDeviceMgr = logicalDeviceMgr // the only state change: remember the logical device manager
+ log.Info("device-manager-started")
+}
+
+func (dMgr *DeviceManager) stop(ctx context.Context) {
+ log.Info("stopping-device-manager")
+ dMgr.exitChannel <- 1 // capacity 1: a second stop with no reader would block
+ log.Info("device-manager-stopped")
+}
+
+func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {
+	// Deliver result only while ctx is still live; otherwise just log why it was dropped.
+	if err := ctx.Err(); err != nil {
+		// Should the transaction be reverted back?
+		log.Debugw("sendResponse-context-error", log.Fields{"context-error": err})
+		return
+	}
+	// NOTE(review): on an unbuffered ch this send waits until the receiver reads it.
+	ch <- result
+	log.Debugw("sendResponse", log.Fields{"result": result})
+}
+
+func (dMgr *DeviceManager) addDeviceAgentToMap(agent *DeviceAgent) {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ if _, exist := dMgr.deviceAgents[agent.deviceId]; !exist { // an existing agent for the same ID is kept, not replaced
+ dMgr.deviceAgents[agent.deviceId] = agent
+ }
+}
+
+func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ delete(dMgr.deviceAgents, agent.deviceId) // removes the entry keyed by the agent's device ID; no-op if absent
+}
+
+func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
+	// TODO If the device is not in memory it needs to be loaded first
+	//
+	// A read lock is enough: the map is only inspected here, and taking the
+	// write lock would serialize every API call that resolves an agent.
+	dMgr.lockDeviceAgentsMap.RLock()
+	defer dMgr.lockDeviceAgentsMap.RUnlock()
+	return dMgr.deviceAgents[deviceId] // nil when no agent is registered for deviceId
+}
+
+func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
+	dMgr.lockDeviceAgentsMap.RLock() // read-only walk of the agent map, so a read lock suffices
+	defer dMgr.lockDeviceAgentsMap.RUnlock()
+	result := &voltha.IDs{Items: make([]*voltha.ID, 0, len(dMgr.deviceAgents))}
+	for id := range dMgr.deviceAgents {
+		result.Items = append(result.Items, &voltha.ID{Id: id})
+	}
+	return result // Items is empty but non-nil when no agents exist; order follows map iteration
+}
+
+func (dMgr *DeviceManager) GetDevice(id string) (*voltha.Device, error) {
+ log.Debugw("GetDevice", log.Fields{"deviceid": id})
+ if agent := dMgr.getDeviceAgent(id); agent != nil {
+ return agent.getDevice() // only devices that already have an agent in the map can be returned
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", id)
+}
+
+func (dMgr *DeviceManager) IsRootDevice(id string) (bool, error) {
+ device, err := dMgr.GetDevice(id)
+ if err != nil {
+ return false, err // the lookup error is passed through unchanged
+ }
+ return device.Root, nil // reports the device's Root flag
+}
+
+// ListDevices retrieves all devices from the data model, creating and starting an agent for each one not yet tracked
+func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
+	log.Debug("ListDevices")
+	result := &voltha.Devices{}
+	entries, _ := dMgr.clusterDataProxy.Get("/devices", 0, false, "").([]interface{}) // nil or a non-list result means no devices, not a panic
+	for _, entry := range entries {
+		device := entry.(*voltha.Device)
+		if dMgr.getDeviceAgent(device.Id) == nil {
+			agent := newDeviceAgent(device, dMgr, dMgr.clusterDataProxy)
+			dMgr.addDeviceAgentToMap(agent)
+			agent.start(context.TODO()) // was nil; a Context must never be nil
+		}
+		result.Items = append(result.Items, device)
+	}
+	return result, nil
+}
+
+// ListDeviceIds returns the IDs of the devices that currently have an agent in memory; the data model is not queried
+func (dMgr *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
+ log.Debug("ListDeviceIDs")
+ // Report only device IDs that are in the device agent map
+ return dMgr.listDeviceIdsFromMap(), nil
+}
+
+//ReconcileDevices is a request to a voltha core to manage a list of devices based on their IDs.
+//The outcome (nil on success, otherwise a status error) is handed to sendResponse for delivery on ch.
+func (dMgr *DeviceManager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
+	log.Debug("ReconcileDevices")
+	if ids == nil {
+		sendResponse(ctx, ch, status.Errorf(codes.InvalidArgument, "empty-list-of-ids"))
+		return
+	}
+	var res interface{}
+	reconciled := 0
+	for _, id := range ids.Items {
+		// Act on the device only if it is not already present in the agent map
+		if dMgr.getDeviceAgent(id.Id) != nil {
+			reconciled++
+			continue
+		}
+		log.Debugw("reconciling-device", log.Fields{"id": id.Id})
+		// Load device from model; the checked assertion treats nil or a non-device value as missing
+		device, ok := dMgr.clusterDataProxy.Get("/devices/"+id.Id, 0, false, "").(*voltha.Device)
+		if !ok {
+			log.Warnw("device-inexistent", log.Fields{"id": id.Id})
+			continue
+		}
+		agent := newDeviceAgent(device, dMgr, dMgr.clusterDataProxy)
+		dMgr.addDeviceAgentToMap(agent)
+		agent.start(context.TODO()) // was nil; the agent outlives this request, so ctx is not reused
+		reconciled++
+	}
+	if toReconcile := len(ids.Items); toReconcile != reconciled {
+		res = status.Errorf(codes.DataLoss, "less-device-reconciled:%d/%d", reconciled, toReconcile)
+	}
+	sendResponse(ctx, ch, res)
+}
+
+func (dMgr *DeviceManager) getPorts(ctx context.Context, deviceId string, portType voltha.Port_PortType) (*voltha.Ports, error) {
+	log.Debugw("getPorts", log.Fields{"deviceid": deviceId, "portType": portType})
+	agent := dMgr.getDeviceAgent(deviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", deviceId) // no agent in the map for this ID
+	}
+	return agent.getPorts(ctx, portType), nil // filtering by port type happens in the agent
+}
+
+func (dMgr *DeviceManager) ListDevicePorts(ctx context.Context, deviceId string) (*voltha.Ports, error) {
+	log.Debugw("ListDevicePorts", log.Fields{"deviceid": deviceId})
+	agent := dMgr.getDeviceAgent(deviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", deviceId) // no agent in the map for this ID
+	}
+	return agent.ListDevicePorts(ctx) // the agent produces the port list
+}
+
+func (dMgr *DeviceManager) ListDevicePmConfigs(ctx context.Context, deviceId string) (*voltha.PmConfigs, error) {
+	log.Debugw("ListDevicePmConfigs", log.Fields{"deviceid": deviceId})
+	agent := dMgr.getDeviceAgent(deviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", deviceId) // no agent in the map for this ID
+	}
+	return agent.ListDevicePmConfigs(ctx) // the agent reads the PM configs off the device
+}
+
+func (dMgr *DeviceManager) ListDeviceFlows(ctx context.Context, deviceId string) (*voltha.Flows, error) {
+	log.Debugw("ListDeviceFlows", log.Fields{"deviceid": deviceId})
+	agent := dMgr.getDeviceAgent(deviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", deviceId) // no agent in the map for this ID
+	}
+	return agent.ListDeviceFlows(ctx) // the agent reads the flows off the device
+}
+
+func (dMgr *DeviceManager) ListDeviceFlowGroups(ctx context.Context, deviceId string) (*voltha.FlowGroups, error) {
+	log.Debugw("ListDeviceFlowGroups", log.Fields{"deviceid": deviceId})
+	agent := dMgr.getDeviceAgent(deviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", deviceId) // no agent in the map for this ID
+	}
+	return agent.ListDeviceFlowGroups(ctx) // the agent reads the flow groups off the device
+}
+
+func (dMgr *DeviceManager) GetImageDownloadStatus(ctx context.Context, deviceId string, imageName string) (*voltha.ImageDownload, error) {
+	log.Debugw("GetImageDownloadStatus", log.Fields{"deviceid": deviceId, "imagename": imageName})
+	agent := dMgr.getDeviceAgent(deviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", deviceId) // no agent in the map for this ID
+	}
+	return agent.GetImageDownloadStatus(ctx, imageName) // the agent searches the device's image downloads by name
+}
+
+func (dMgr *DeviceManager) GetImageDownload(ctx context.Context, deviceId string, imageName string) (*voltha.ImageDownload, error) {
+	log.Debugw("GetImageDownload", log.Fields{"deviceid": deviceId, "imagename": imageName})
+	agent := dMgr.getDeviceAgent(deviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", deviceId) // no agent in the map for this ID
+	}
+	return agent.GetImageDownload(ctx, imageName) // the agent searches the device's image downloads by name
+}
+
+func (dMgr *DeviceManager) ListImageDownloads(ctx context.Context, deviceId string) (*voltha.ImageDownloads, error) {
+	log.Debugw("ListImageDownloads", log.Fields{"deviceid": deviceId})
+	agent := dMgr.getDeviceAgent(deviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", deviceId) // no agent in the map for this ID
+	}
+	return agent.ListImageDownloads(ctx) // the agent wraps the device's image downloads
+}
+
+func (dMgr *DeviceManager) GetImages(ctx context.Context, deviceId string) (*voltha.Images, error) {
+	log.Debugw("GetImages", log.Fields{"deviceid": deviceId})
+	agent := dMgr.getDeviceAgent(deviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", deviceId) // no agent in the map for this ID
+	}
+	return agent.GetImages(ctx) // the agent reads the images off the device
+}
+
+func (dMgr *DeviceManager) getParentDevice(childDevice *voltha.Device) *voltha.Device {
+	// A non-root device is resolved through its ParentId; a root device is its own parent.
+	if !childDevice.Root {
+		// The lookup error is dropped: callers receive nil when the parent cannot be retrieved.
+		parent, _ := dMgr.GetDevice(childDevice.ParentId)
+		return parent
+	}
+	return childDevice
+}
diff --git a/ro_core/core/grpc_nbi_api_handler.go b/ro_core/core/grpc_nbi_api_handler.go
new file mode 100644
index 0000000..5727983
--- /dev/null
+++ b/ro_core/core/grpc_nbi_api_handler.go
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "errors"
+ "github.com/golang/protobuf/ptypes/empty"
+ da "github.com/opencord/voltha-go/common/core/northbound/grpc"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/protos/common"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+type APIHandler struct {
+ commonMgr *ModelProxyManager // serves the generic model queries (voltha, adapters, device types, ...)
+ deviceMgr *DeviceManager // serves the device queries
+ logicalDeviceMgr *LogicalDeviceManager // serves the logical device queries
+ da.DefaultAPIHandler // embedded handler from common/core/northbound/grpc
+}
+
+func NewAPIHandler(generalMgr *ModelProxyManager, deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager) *APIHandler {
+	// The embedded DefaultAPIHandler is left at its zero value.
+	return &APIHandler{
+		commonMgr:        generalMgr,
+		deviceMgr:        deviceMgr,
+		logicalDeviceMgr: lDeviceMgr,
+	}
+}
+
+// isTestMode reports whether the incoming gRPC metadata of ctx contains the api_test key
+func isTestMode(ctx context.Context) bool {
+ md, _ := metadata.FromIncomingContext(ctx) // the ok result is ignored; md is only indexed below
+ _, exist := md[common.TestModeKeys_api_test.String()]
+ return exist
+}
+
+// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
+// response is expected in a successful scenario
+func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
+ select {
+ case res := <-ch:
+ if res == nil {
+ return new(empty.Empty), nil
+ } else if err, ok := res.(error); ok {
+ return new(empty.Empty), err
+ } else {
+ log.Warnw("unexpected-return-type", log.Fields{"result": res})
+ err = status.Errorf(codes.Internal, "%s", res)
+ return new(empty.Empty), err
+ }
+ case <-ctx.Done():
+ log.Debug("client-timeout")
+ return nil, ctx.Err()
+ }
+}
+
+// GetVoltha returns the contents of all components (i.e. devices, logical_devices, ...)
+func (handler *APIHandler) GetVoltha(ctx context.Context, empty *empty.Empty) (*voltha.Voltha, error) {
+ log.Debug("GetVoltha")
+ return handler.commonMgr.GetVoltha(ctx) // delegated to the model proxy manager
+}
+
+// ListCoreInstances returns details on the running core containers
+func (handler *APIHandler) ListCoreInstances(ctx context.Context, empty *empty.Empty) (*voltha.CoreInstances, error) {
+ log.Debug("ListCoreInstances")
+ return handler.commonMgr.ListCoreInstances(ctx) // delegated to the model proxy manager
+}
+
+// GetCoreInstance returns the details of the core container identified by id
+func (handler *APIHandler) GetCoreInstance(ctx context.Context, id *voltha.ID) (*voltha.CoreInstance, error) {
+ log.Debugw("GetCoreInstance", log.Fields{"id": id})
+ return handler.commonMgr.GetCoreInstance(ctx, id.Id) // delegated to the model proxy manager
+}
+
+// ListAdapters returns the contents of all adapters known to the system
+func (handler *APIHandler) ListAdapters(ctx context.Context, empty *empty.Empty) (*voltha.Adapters, error) {
+	log.Debug("ListAdapters") // previously logged "ListDevices", which mislabelled this call in traces
+	return handler.commonMgr.ListAdapters(ctx)
+}
+
+// GetDevice returns the details of the device identified by id
+func (handler *APIHandler) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
+ log.Debugw("GetDevice-request", log.Fields{"id": id})
+ return handler.deviceMgr.GetDevice(id.Id) // delegated to the device manager
+}
+
+// ListDevices returns the contents of all devices known to the system
+func (handler *APIHandler) ListDevices(ctx context.Context, empty *empty.Empty) (*voltha.Devices, error) {
+ log.Debug("ListDevices")
+ return handler.deviceMgr.ListDevices() // delegated to the device manager
+}
+
+// ListDeviceIds returns the list of device ids managed by a voltha core; in test mode it returns an empty list
+func (handler *APIHandler) ListDeviceIds(ctx context.Context, empty *empty.Empty) (*voltha.IDs, error) {
+ log.Debug("ListDeviceIDs")
+ if isTestMode(ctx) {
+ out := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+ return out, nil
+ }
+ return handler.deviceMgr.ListDeviceIds() // delegated to the device manager
+}
+
+// ListDevicePorts returns the port details of the device identified by id
+func (handler *APIHandler) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
+ log.Debugw("ListDevicePorts", log.Fields{"deviceid": id})
+ return handler.deviceMgr.ListDevicePorts(ctx, id.Id) // delegated to the device manager
+}
+
+// ListDevicePmConfigs returns the PM config details of the device identified by id
+func (handler *APIHandler) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+ log.Debugw("ListDevicePmConfigs", log.Fields{"deviceid": id})
+ return handler.deviceMgr.ListDevicePmConfigs(ctx, id.Id) // delegated to the device manager
+}
+
+// ListDeviceFlows returns the flow details of the device identified by id
+func (handler *APIHandler) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*voltha.Flows, error) {
+ log.Debugw("ListDeviceFlows", log.Fields{"deviceid": id})
+ return handler.deviceMgr.ListDeviceFlows(ctx, id.Id) // delegated to the device manager
+}
+
+// ListDeviceFlowGroups returns the flow group details of the device identified by id
+func (handler *APIHandler) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
+ log.Debugw("ListDeviceFlowGroups", log.Fields{"deviceid": id})
+ return handler.deviceMgr.ListDeviceFlowGroups(ctx, id.Id) // delegated to the device manager
+}
+
+// ListDeviceTypes returns all the device types known to the system
+func (handler *APIHandler) ListDeviceTypes(ctx context.Context, empty *empty.Empty) (*voltha.DeviceTypes, error) {
+ log.Debug("ListDeviceTypes")
+ return handler.commonMgr.ListDeviceTypes(ctx) // delegated to the model proxy manager
+}
+
+// GetDeviceType returns the device type entry identified by id
+func (handler *APIHandler) GetDeviceType(ctx context.Context, id *voltha.ID) (*voltha.DeviceType, error) {
+ log.Debugw("GetDeviceType", log.Fields{"typeid": id})
+ return handler.commonMgr.GetDeviceType(ctx, id.Id) // delegated to the model proxy manager
+}
+
+// ListDeviceGroups returns all the device groups known to the system
+func (handler *APIHandler) ListDeviceGroups(ctx context.Context, empty *empty.Empty) (*voltha.DeviceGroups, error) {
+ log.Debug("ListDeviceGroups")
+ return handler.commonMgr.ListDeviceGroups(ctx) // delegated to the model proxy manager
+}
+
+// GetDeviceGroup returns the device group entry identified by id
+func (handler *APIHandler) GetDeviceGroup(ctx context.Context, id *voltha.ID) (*voltha.DeviceGroup, error) {
+ log.Debugw("GetDeviceGroup", log.Fields{"groupid": id})
+ return handler.commonMgr.GetDeviceGroup(ctx, id.Id) // delegated to the model proxy manager
+}
+
+// GetImageDownloadStatus returns the download status for the image entry named by img (device ID and image name)
+func (handler *APIHandler) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownloadStatus", log.Fields{"deviceid": img.GetId(), "imagename": img.GetName()})
+ return handler.deviceMgr.GetImageDownloadStatus(ctx, img.GetId(), img.GetName()) // delegated to the device manager
+}
+
+// GetImageDownload returns the download details for the image entry named by img (device ID and image name)
+func (handler *APIHandler) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownload", log.Fields{"deviceid": img.GetId(), "imagename": img.GetName()})
+ return handler.deviceMgr.GetImageDownload(ctx, img.GetId(), img.GetName()) // delegated to the device manager
+}
+
+// ListImageDownloads returns all image downloads of the device identified by id
+func (handler *APIHandler) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
+	log.Debugw("ListImageDownloads", log.Fields{"deviceid": id}) // previously logged "GetImageDownload"
+	return handler.deviceMgr.ListImageDownloads(ctx, id.Id)
+}
+
+// GetImages returns all images of the device identified by id
+func (handler *APIHandler) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
+ log.Debugw("GetImages", log.Fields{"deviceid": id})
+ return handler.deviceMgr.GetImages(ctx, id.Id) // delegated to the device manager
+}
+
+// ListAlarmFilters returns all alarm filters known to the system
+func (handler *APIHandler) ListAlarmFilters(ctx context.Context, empty *empty.Empty) (*voltha.AlarmFilters, error) {
+ log.Debug("ListAlarmFilters")
+ return handler.commonMgr.ListAlarmFilters(ctx) // delegated to the model proxy manager
+}
+
+// GetAlarmFilter returns the alarm filter entry identified by id
+func (handler *APIHandler) GetAlarmFilter(ctx context.Context, id *voltha.ID) (*voltha.AlarmFilter, error) {
+ log.Debugw("GetAlarmFilter", log.Fields{"alarmid": id})
+ return handler.commonMgr.GetAlarmFilter(ctx, id.Id) // delegated to the model proxy manager
+}
+
+//ReconcileDevices is a request to a voltha core to manage a list of devices based on their IDs
+func (handler *APIHandler) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
+	log.Debug("ReconcileDevices")
+	if isTestMode(ctx) {
+		return new(empty.Empty), nil
+	}
+	// Buffered and deliberately never closed: if ctx expires first, this handler returns while the worker may
+	// still send its result; closing the channel here would turn that late send into a panic.
+	ch := make(chan interface{}, 1)
+	go handler.deviceMgr.ReconcileDevices(ctx, ids, ch)
+	return waitForNilResponseOnSuccess(ctx, ch)
+}
+
+// GetLogicalDevice returns the details of the logical device identified by id
+func (handler *APIHandler) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
+ log.Debugw("GetLogicalDevice-request", log.Fields{"id": id})
+ return handler.logicalDeviceMgr.getLogicalDevice(id.Id) // delegated to the logical device manager
+}
+
+// ListLogicalDevices returns all logical devices known to the system
+func (handler *APIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
+ log.Debug("ListLogicalDevices")
+ return handler.logicalDeviceMgr.listLogicalDevices() // delegated to the logical device manager
+}
+
+// ListLogicalDevicePorts returns the port details of the logical device identified by id
+func (handler *APIHandler) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
+ log.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
+ return handler.logicalDeviceMgr.ListLogicalDevicePorts(ctx, id.Id) // delegated to the logical device manager
+}
+
+// ListLogicalDeviceFlows returns the flow details of the logical device identified by id
+func (handler *APIHandler) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*voltha.Flows, error) {
+ log.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id})
+ return handler.logicalDeviceMgr.ListLogicalDeviceFlows(ctx, id.Id) // delegated to the logical device manager
+}
+
+// ListLogicalDeviceFlowGroups returns the flow group details of the logical device identified by id
+func (handler *APIHandler) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
+	log.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id}) // previously logged "ListLogicalDeviceFlows"
+	return handler.logicalDeviceMgr.ListLogicalDeviceFlowGroups(ctx, id.Id)
+}
+
+func (handler *APIHandler) SelfTest(ctx context.Context, id *voltha.ID) (*voltha.SelfTestResponse, error) {
+ log.Debugw("SelfTest-request", log.Fields{"id": id})
+ if isTestMode(ctx) { // in test mode a canned SUCCESS response is returned
+ resp := &voltha.SelfTestResponse{Result: voltha.SelfTestResponse_SUCCESS}
+ return resp, nil
+ }
+ return nil, errors.New("UnImplemented") // plain error, not a gRPC status; outside test mode the call always fails
+}
diff --git a/ro_core/core/logical_device_agent.go b/ro_core/core/logical_device_agent.go
new file mode 100644
index 0000000..9cb6655
--- /dev/null
+++ b/ro_core/core/logical_device_agent.go
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+// LogicalDeviceAgent answers read requests for one logical device by querying
+// the cluster data proxy.
+type LogicalDeviceAgent struct {
+	// logicalDeviceId is appended to "/logical_devices/" to build lookup paths.
+	logicalDeviceId string
+	// NOTE(review): lastData appears unused by the agent's own methods.
+	lastData *voltha.LogicalDevice
+	// rootDeviceId is the device ID supplied to newLogicalDeviceAgent.
+	rootDeviceId string
+	deviceMgr    *DeviceManager
+	ldeviceMgr   *LogicalDeviceManager
+	// clusterDataProxy is the data source for every Get/List method.
+	clusterDataProxy *model.Proxy
+	// exitChannel receives a value when stop is called.
+	exitChannel chan int
+	// lockLogicalDevice is held (exclusively) by every agent method.
+	lockLogicalDevice sync.RWMutex
+}
+
+// newLogicalDeviceAgent builds the agent for logical device id, recording the
+// root device ID, both managers and the cluster data proxy it will read from.
+func newLogicalDeviceAgent(id string, deviceId string, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager, cdProxy *model.Proxy) *LogicalDeviceAgent {
+	return &LogicalDeviceAgent{
+		logicalDeviceId:  id,
+		rootDeviceId:     deviceId,
+		deviceMgr:        deviceMgr,
+		ldeviceMgr:       ldeviceMgr,
+		clusterDataProxy: cdProxy,
+		// One slot of buffering lets a single stop() signal without a receiver.
+		exitChannel: make(chan int, 1),
+		// lockLogicalDevice keeps its zero value, which is an unlocked RWMutex.
+	}
+}
+
+// start marks the agent as started. It only emits the start log entries while
+// briefly holding the agent lock; ctx is not used and the result is always nil.
+func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
+	startFields := log.Fields{"logicaldeviceId": agent.logicalDeviceId}
+	log.Infow("starting-logical_device-agent", startFields)
+
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+
+	log.Info("logical_device-agent-started")
+	return nil
+}
+
+// stop signals termination of the logical device agent by sending on its exit
+// channel while holding the agent lock. ctx is not used.
+func (agent *LogicalDeviceAgent) stop(ctx context.Context) {
+	log.Info("stopping-logical_device-agent")
+
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+
+	// The channel is created with a buffer of one, so this send only blocks
+	// when an earlier signal has not been consumed yet.
+	agent.exitChannel <- 1
+
+	log.Info("logical_device-agent-stopped")
+}
+
+// GetLogicalDevice takes the agent lock and reads this agent's logical device
+// from the cluster data proxy. A NotFound status error is returned when the
+// proxy does not yield a *voltha.LogicalDevice.
+func (agent *LogicalDeviceAgent) GetLogicalDevice() (*voltha.LogicalDevice, error) {
+	log.Debug("GetLogicalDevice")
+
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+
+	devicePath := "/logical_devices/" + agent.logicalDeviceId
+	lDevice, found := agent.clusterDataProxy.Get(devicePath, 1, false, "").(*voltha.LogicalDevice)
+	if !found {
+		return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+	}
+	return lDevice, nil
+}
+
+// ListLogicalDevicePorts takes the agent lock, reads this agent's logical
+// device from the cluster data proxy and returns its ports in a new slice.
+// A NotFound status error is returned when the proxy does not yield a
+// *voltha.LogicalDevice.
+func (agent *LogicalDeviceAgent) ListLogicalDevicePorts() (*voltha.LogicalPorts, error) {
+	log.Debug("ListLogicalDevicePorts")
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+		// Copy the port pointers so the response does not alias the slice
+		// held by the device; the result is never nil, even without ports.
+		lPorts := make([]*voltha.LogicalPort, 0, len(lDevice.Ports))
+		lPorts = append(lPorts, lDevice.Ports...)
+		return &voltha.LogicalPorts{Items: lPorts}, nil
+	}
+	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
+// ListLogicalDeviceFlows takes the agent lock, reads this agent's logical
+// device from the cluster data proxy and returns its Flows field. A NotFound
+// status error is returned when the proxy does not yield a
+// *voltha.LogicalDevice.
+func (agent *LogicalDeviceAgent) ListLogicalDeviceFlows() (*voltha.Flows, error) {
+	log.Debug("listFlows")
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	// Address the logical device node itself, as the other accessors of this
+	// agent do: the result is asserted to *voltha.LogicalDevice, which a
+	// "/flows" sub-path would not be expected to produce.
+	// NOTE(review): assumes Proxy.Get returns the node addressed by the path - confirm.
+	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+		return lDevice.Flows, nil
+	}
+	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
+// ListLogicalDeviceFlowGroups takes the agent lock, reads this agent's logical
+// device from the cluster data proxy and returns its FlowGroups field. A
+// NotFound status error is returned when the proxy does not yield a
+// *voltha.LogicalDevice.
+func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups() (*voltha.FlowGroups, error) {
+	log.Debug("listFlowGroups")
+
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+
+	devicePath := "/logical_devices/" + agent.logicalDeviceId
+	lDevice, found := agent.clusterDataProxy.Get(devicePath, 1, false, "").(*voltha.LogicalDevice)
+	if !found {
+		return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+	}
+	return lDevice.FlowGroups, nil
+}
diff --git a/ro_core/core/logical_device_manager.go b/ro_core/core/logical_device_manager.go
new file mode 100644
index 0000000..64ccf28
--- /dev/null
+++ b/ro_core/core/logical_device_manager.go
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+// LogicalDeviceManager keeps one LogicalDeviceAgent per known logical device
+// and routes read requests to the matching agent.
+type LogicalDeviceManager struct {
+	// logicalDeviceAgents maps a logical device ID to its agent.
+	logicalDeviceAgents map[string]*LogicalDeviceAgent
+	// deviceMgr is used to resolve the parent of a child device.
+	deviceMgr *DeviceManager
+	// grpcNbiHdlr is assigned through setGrpcNbiHandler.
+	grpcNbiHdlr *APIHandler
+	// clusterDataProxy is the data source used to list logical devices.
+	clusterDataProxy *model.Proxy
+	// exitChannel receives a value when stop is called.
+	exitChannel chan int
+	// lockLogicalDeviceAgentsMap guards logicalDeviceAgents.
+	lockLogicalDeviceAgentsMap sync.RWMutex
+}
+
+// newLogicalDeviceManager builds a manager with an empty agent map, wired to
+// the given device manager and cluster data proxy.
+func newLogicalDeviceManager(deviceMgr *DeviceManager, cdProxy *model.Proxy) *LogicalDeviceManager {
+	return &LogicalDeviceManager{
+		logicalDeviceAgents: make(map[string]*LogicalDeviceAgent),
+		deviceMgr:           deviceMgr,
+		clusterDataProxy:    cdProxy,
+		// One slot of buffering lets a single stop() signal without a receiver.
+		exitChannel: make(chan int, 1),
+		// lockLogicalDeviceAgentsMap keeps its zero value, an unlocked RWMutex.
+	}
+}
+
+// setGrpcNbiHandler records the gRPC northbound API handler on the manager.
+// The assignment is not protected by a lock.
+func (ldMgr *LogicalDeviceManager) setGrpcNbiHandler(grpcNbiHandler *APIHandler) {
+	ldMgr.grpcNbiHdlr = grpcNbiHandler
+}
+
+// start only logs that the logical device manager is starting and has
+// started; it performs no other work and does not use ctx.
+func (ldMgr *LogicalDeviceManager) start(ctx context.Context) {
+	for _, event := range []string{"starting-logical-device-manager", "logical-device-manager-started"} {
+		log.Info(event)
+	}
+}
+
+// stop signals termination of the manager by sending on its exit channel.
+// ctx is not used.
+func (ldMgr *LogicalDeviceManager) stop(ctx context.Context) {
+	log.Info("stopping-logical-device-manager")
+
+	// The channel is created with a buffer of one, so this send only blocks
+	// when an earlier signal has not been consumed yet.
+	ldMgr.exitChannel <- 1
+
+	log.Info("logical-device-manager-stopped")
+}
+
+// addLogicalDeviceAgentToMap registers agent under its logical device ID.
+// An agent that is already registered for that ID is left in place.
+func (ldMgr *LogicalDeviceManager) addLogicalDeviceAgentToMap(agent *LogicalDeviceAgent) {
+	ldMgr.lockLogicalDeviceAgentsMap.Lock()
+	defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+
+	if _, alreadyKnown := ldMgr.logicalDeviceAgents[agent.logicalDeviceId]; alreadyKnown {
+		return
+	}
+	ldMgr.logicalDeviceAgents[agent.logicalDeviceId] = agent
+}
+
+// getLogicalDeviceAgent returns the agent registered for logicalDeviceId, or
+// nil when no agent is registered under that ID.
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceAgent(logicalDeviceId string) *LogicalDeviceAgent {
+	ldMgr.lockLogicalDeviceAgentsMap.Lock()
+	defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+
+	// A missing key yields the zero value of the map's element type: nil.
+	return ldMgr.logicalDeviceAgents[logicalDeviceId]
+}
+
+// deleteLogicalDeviceAgent removes the agent registered for logicalDeviceId.
+// Removing an ID that is not registered is a no-op.
+func (ldMgr *LogicalDeviceManager) deleteLogicalDeviceAgent(logicalDeviceId string) {
+	agentsLock := &ldMgr.lockLogicalDeviceAgentsMap
+	agentsLock.Lock()
+	defer agentsLock.Unlock()
+
+	delete(ldMgr.logicalDeviceAgents, logicalDeviceId)
+}
+
+// getLogicalDevice returns the logical device id as reported by its agent.
+// A NotFound status error is returned when no agent is registered for id.
+func (ldMgr *LogicalDeviceManager) getLogicalDevice(id string) (*voltha.LogicalDevice, error) {
+	log.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id})
+
+	agent := ldMgr.getLogicalDeviceAgent(id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id)
+	}
+	return agent.GetLogicalDevice()
+}
+
+// listLogicalDevices returns every logical device found under
+// "/logical_devices" in the cluster data proxy. As a side effect, an agent is
+// created, registered and started for each listed device that has none yet.
+//
+// An empty list is returned when the proxy has no data. An Internal status
+// error is returned when the data is not a list; individual entries that are
+// not logical devices are logged and skipped.
+func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
+	log.Debug("ListAllLogicalDevices")
+	result := &voltha.LogicalDevices{}
+
+	data := ldMgr.clusterDataProxy.Get("/logical_devices", 0, false, "")
+	if data == nil {
+		return result, nil
+	}
+	entries, ok := data.([]interface{})
+	if !ok {
+		return nil, status.Errorf(codes.Internal, "unexpected-logical-devices-type-%T", data)
+	}
+
+	for _, entry := range entries {
+		lDevice, ok := entry.(*voltha.LogicalDevice)
+		if !ok || lDevice == nil {
+			log.Warnw("skipping-unexpected-logical-device-entry", log.Fields{"entry": entry})
+			continue
+		}
+		if ldMgr.getLogicalDeviceAgent(lDevice.Id) == nil {
+			agent := newLogicalDeviceAgent(
+				lDevice.Id,
+				lDevice.RootDeviceId,
+				ldMgr,
+				ldMgr.deviceMgr,
+				ldMgr.clusterDataProxy,
+			)
+			ldMgr.addLogicalDeviceAgentToMap(agent)
+			// No request context reaches this method; pass a placeholder
+			// rather than a nil Context.
+			go agent.start(context.TODO())
+		}
+		result.Items = append(result.Items, lDevice)
+	}
+	return result, nil
+}
+
+// getLogicalDeviceId resolves the logical device ID that device belongs to.
+// For a root device this is its own ParentId; for a child device it is the
+// ParentId of the parent returned by the device manager. A NotFound status
+// error is returned when the parent of a child device cannot be found.
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceId(device *voltha.Device) (*string, error) {
+	// Root (parent) device: its ParentId field holds the logical device ID.
+	if device.Root {
+		return &device.ParentId, nil
+	}
+
+	// Child device: go through its parent device.
+	parentDevice := ldMgr.deviceMgr.getParentDevice(device)
+	if parentDevice == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", device.Id)
+	}
+	return &parentDevice.ParentId, nil
+}
+
+// getLogicalPortId finds the logical port attached to device. It resolves the
+// logical device the device belongs to, then returns the ID of the first port
+// whose DeviceId matches. Lookup errors are passed through; a NotFound status
+// error is returned when no port matches.
+func (ldMgr *LogicalDeviceManager) getLogicalPortId(device *voltha.Device) (*voltha.LogicalPortId, error) {
+	lDeviceId, err := ldMgr.getLogicalDeviceId(device)
+	if err != nil {
+		return nil, err
+	}
+
+	lDevice, err := ldMgr.getLogicalDevice(*lDeviceId)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, candidate := range lDevice.Ports {
+		if candidate.DeviceId != device.Id {
+			continue
+		}
+		return &voltha.LogicalPortId{Id: *lDeviceId, PortId: candidate.Id}, nil
+	}
+	return nil, status.Errorf(codes.NotFound, "%s", device.Id)
+}
+
+// ListLogicalDevicePorts returns the ports of logical device id as reported by
+// its agent. A NotFound status error is returned when no agent is registered
+// for id. ctx is not used.
+func (ldMgr *LogicalDeviceManager) ListLogicalDevicePorts(ctx context.Context, id string) (*voltha.LogicalPorts, error) {
+	log.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
+
+	agent := ldMgr.getLogicalDeviceAgent(id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id)
+	}
+	return agent.ListLogicalDevicePorts()
+}
+
+// ListLogicalDeviceFlows returns the flows of logical device id as reported by
+// its agent. A NotFound status error is returned when no agent is registered
+// for id. ctx is not used.
+func (ldMgr *LogicalDeviceManager) ListLogicalDeviceFlows(ctx context.Context, id string) (*voltha.Flows, error) {
+	log.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id})
+
+	agent := ldMgr.getLogicalDeviceAgent(id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id)
+	}
+	return agent.ListLogicalDeviceFlows()
+}
+
+// ListLogicalDeviceFlowGroups returns the flow groups of logical device id as
+// reported by its agent. A NotFound status error is returned when no agent is
+// registered for id. ctx is not used.
+func (ldMgr *LogicalDeviceManager) ListLogicalDeviceFlowGroups(ctx context.Context, id string) (*voltha.FlowGroups, error) {
+	log.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id})
+
+	agent := ldMgr.getLogicalDeviceAgent(id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id)
+	}
+	return agent.ListLogicalDeviceFlowGroups()
+}
+
+// getLogicalPort returns the port lPortId.PortId of logical device lPortId.Id.
+// Errors from the logical device lookup are passed through. A NotFound status
+// error naming both the device and the port is returned when the device has no
+// port with that ID.
+func (ldMgr *LogicalDeviceManager) getLogicalPort(lPortId *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+	// Get the logical device that owns the requested port
+	lDevice, err := ldMgr.getLogicalDevice(lPortId.Id)
+	if err != nil {
+		return nil, err
+	}
+	// Go over list of ports
+	for _, port := range lDevice.Ports {
+		if port.Id == lPortId.PortId {
+			return port, nil
+		}
+	}
+	// Both identifiers need a real %s verb; "$s" would be printed literally
+	// and leave the port ID as an unformatted extra argument.
+	return nil, status.Errorf(codes.NotFound, "%s-%s", lPortId.Id, lPortId.PortId)
+}
diff --git a/ro_core/core/model_proxy.go b/ro_core/core/model_proxy.go
new file mode 100644
index 0000000..f5e6c3b
--- /dev/null
+++ b/ro_core/core/model_proxy.go
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "strings"
+ "sync"
+)
+
+// ModelProxy controls requests made to the data model below a fixed base path.
+type ModelProxy struct {
+	// rootProxy is the proxy that actually serves the reads.
+	rootProxy *model.Proxy
+	// basePath is prefixed to every requested path; it always starts with "/".
+	basePath string
+	// mutex serializes calls to Get.
+	mutex sync.RWMutex
+}
+
+// newModelProxy builds a ModelProxy that reads through rootProxy. A leading
+// "/" is added to basePath when it is missing, so an empty basePath becomes "/".
+func newModelProxy(basePath string, rootProxy *model.Proxy) *ModelProxy {
+	if !strings.HasPrefix(basePath, "/") {
+		basePath = "/" + basePath
+	}
+	return &ModelProxy{
+		rootProxy: rootProxy,
+		basePath:  basePath,
+	}
+}
+
+// Get retrieves the data stored at the base path extended by parts, all joined
+// with "/". A NotFound status error is returned when the root proxy yields nil.
+func (mp *ModelProxy) Get(parts ...string) (interface{}, error) {
+	mp.mutex.Lock()
+	defer mp.mutex.Unlock()
+
+	segments := make([]string, 0, len(parts)+1)
+	segments = append(segments, mp.basePath)
+	segments = append(segments, parts...)
+	path := strings.Join(segments, "/")
+
+	log.Debugw("get-data", log.Fields{"path": path})
+
+	data := mp.rootProxy.Get(path, 1, false, "")
+	if data == nil {
+		return nil, status.Errorf(codes.NotFound, "data-path: %s", path)
+	}
+	return data, nil
+}
diff --git a/ro_core/core/model_proxy_manager.go b/ro_core/core/model_proxy_manager.go
new file mode 100644
index 0000000..d5f7ace
--- /dev/null
+++ b/ro_core/core/model_proxy_manager.go
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// DataModelType is the enumerated type that identifies the miscellaneous data
+// path agents.
+type DataModelType int
+
+// Enumerated list of data path agents. Values start at 1, so the zero value of
+// DataModelType does not name any agent.
+const (
+	Adapters DataModelType = 1 + iota
+	AlarmFilters
+	CoreInstances
+	DeviceTypes
+	DeviceGroups
+	Voltha
+)
+
+// commonTypes holds the string equivalent of each data path agent, indexed by
+// the enumerated value minus one. It must list the constants in declaration
+// order.
+var commonTypes = []string{
+	"Adapters",
+	"AlarmFilters",
+	"CoreInstances",
+	"DeviceTypes",
+	"DeviceGroups",
+	"Voltha",
+}
+
+// String converts the enumerated data path agent value to its string
+// equivalent. Values outside the enumerated range, including the zero value,
+// yield "Unknown" instead of indexing out of bounds.
+func (t DataModelType) String() string {
+	index := int(t) - 1
+	if index < 0 || index >= len(commonTypes) {
+		return "Unknown"
+	}
+	return commonTypes[index]
+}
+
+// ModelProxyManager controls requests made to the miscellaneous data path
+// agents, creating one ModelProxy per data type on first use.
+type ModelProxyManager struct {
+	// modelProxy caches the proxies, keyed by DataModelType.String().
+	// NOTE(review): the map is read and written without a lock - confirm that
+	// callers never use the manager concurrently.
+	modelProxy map[string]*ModelProxy
+	// clusterDataProxy is handed to every ModelProxy as its root proxy.
+	clusterDataProxy *model.Proxy
+}
+
+// newModelProxyManager builds a manager with an empty proxy cache that reads
+// through cdProxy.
+func newModelProxyManager(cdProxy *model.Proxy) *ModelProxyManager {
+	return &ModelProxyManager{
+		modelProxy:       make(map[string]*ModelProxy),
+		clusterDataProxy: cdProxy,
+	}
+}
+
+// GetVoltha returns the Voltha instance stored at the root of the data model.
+// The proxy for the root path is created and cached on first use (the cache is
+// accessed without locking). An empty Voltha and a NotFound status error are
+// returned when the root holds no *voltha.Voltha. ctx is not used.
+func (mpMgr *ModelProxyManager) GetVoltha(ctx context.Context) (*voltha.Voltha, error) {
+	log.Debug("GetVoltha")
+
+	agent, exists := mpMgr.modelProxy[Voltha.String()]
+	if !exists {
+		agent = newModelProxy("", mpMgr.clusterDataProxy)
+		mpMgr.modelProxy[Voltha.String()] = agent
+	}
+
+	// The lookup error is not inspected: a failed lookup yields a nil result,
+	// which the checked assertion below rejects as well.
+	instance, _ := agent.Get()
+	if root, ok := instance.(*voltha.Voltha); ok {
+		return root, nil
+	}
+
+	return &voltha.Voltha{}, status.Errorf(codes.NotFound, "no-voltha-instance")
+}
+
+// ListCoreInstances is meant to return all the core instances known to the
+// system. It is not implemented yet: it always returns an empty CoreInstances
+// together with a NotFound status error. ctx is not used.
+func (mpMgr *ModelProxyManager) ListCoreInstances(ctx context.Context) (*voltha.CoreInstances, error) {
+	log.Debug("ListCoreInstances")
+
+	// TODO: Need to retrieve the list of registered cores
+	noInstances := &voltha.CoreInstances{}
+	return noInstances, status.Errorf(codes.NotFound, "no-core-instances")
+}
+
+// GetCoreInstance is meant to return the core instance associated to id. It is
+// not implemented yet: it always returns an empty CoreInstance together with a
+// NotFound status error naming id. ctx is not used.
+func (mpMgr *ModelProxyManager) GetCoreInstance(ctx context.Context, id string) (*voltha.CoreInstance, error) {
+	log.Debugw("GetCoreInstance", log.Fields{"id": id})
+
+	// TODO: Need to retrieve the list of registered cores
+	noInstance := &voltha.CoreInstance{}
+	return noInstance, status.Errorf(codes.NotFound, "core-instance-%s", id)
+}
+
+// ListAdapters returns all the adapters stored under "/adapters". The proxy
+// for that path is created and cached on first use (the cache is accessed
+// without locking). An empty Adapters and a NotFound status error are returned
+// when the path holds no list; list entries that are not adapters are skipped.
+// ctx is not used.
+func (mpMgr *ModelProxyManager) ListAdapters(ctx context.Context) (*voltha.Adapters, error) {
+	log.Debug("ListAdapters")
+
+	agent, exists := mpMgr.modelProxy[Adapters.String()]
+	if !exists {
+		agent = newModelProxy("adapters", mpMgr.clusterDataProxy)
+		mpMgr.modelProxy[Adapters.String()] = agent
+	}
+
+	adapters := &voltha.Adapters{}
+	// The lookup error is not inspected: a failed lookup yields a nil result,
+	// which the checked assertion below rejects as well.
+	items, _ := agent.Get()
+	list, ok := items.([]interface{})
+	if !ok {
+		return adapters, status.Errorf(codes.NotFound, "no-adapters")
+	}
+	for _, item := range list {
+		if adapter, ok := item.(*voltha.Adapter); ok {
+			adapters.Items = append(adapters.Items, adapter)
+		}
+	}
+	log.Debugw("retrieved-adapters", log.Fields{"adapters": adapters})
+	return adapters, nil
+}
+
+// ListDeviceTypes returns all the device types stored under "/device_types".
+// The proxy for that path is created and cached on first use (the cache is
+// accessed without locking). An empty DeviceTypes and a NotFound status error
+// are returned when the path holds no list; list entries that are not device
+// types are skipped. ctx is not used.
+func (mpMgr *ModelProxyManager) ListDeviceTypes(ctx context.Context) (*voltha.DeviceTypes, error) {
+	log.Debug("ListDeviceTypes")
+
+	agent, exists := mpMgr.modelProxy[DeviceTypes.String()]
+	if !exists {
+		agent = newModelProxy("device_types", mpMgr.clusterDataProxy)
+		mpMgr.modelProxy[DeviceTypes.String()] = agent
+	}
+
+	deviceTypes := &voltha.DeviceTypes{}
+	// The lookup error is not inspected: a failed lookup yields a nil result,
+	// which the checked assertion below rejects as well.
+	items, _ := agent.Get()
+	list, ok := items.([]interface{})
+	if !ok {
+		return deviceTypes, status.Errorf(codes.NotFound, "no-device-types")
+	}
+	for _, item := range list {
+		if deviceType, ok := item.(*voltha.DeviceType); ok {
+			deviceTypes.Items = append(deviceTypes.Items, deviceType)
+		}
+	}
+	return deviceTypes, nil
+}
+
+// GetDeviceType returns the device type stored under "/device_types/<id>".
+// The proxy for "/device_types" is created and cached on first use (the cache
+// is accessed without locking). An empty DeviceType and a NotFound status
+// error naming id are returned when the path holds no *voltha.DeviceType.
+// ctx is not used.
+func (mpMgr *ModelProxyManager) GetDeviceType(ctx context.Context, id string) (*voltha.DeviceType, error) {
+	log.Debugw("GetDeviceType", log.Fields{"id": id})
+
+	agent, exists := mpMgr.modelProxy[DeviceTypes.String()]
+	if !exists {
+		agent = newModelProxy("device_types", mpMgr.clusterDataProxy)
+		mpMgr.modelProxy[DeviceTypes.String()] = agent
+	}
+
+	// The lookup error is not inspected: a failed lookup yields a nil result,
+	// which the checked assertion below rejects as well.
+	item, _ := agent.Get(id)
+	if deviceType, ok := item.(*voltha.DeviceType); ok {
+		return deviceType, nil
+	}
+
+	return &voltha.DeviceType{}, status.Errorf(codes.NotFound, "device-type-%s", id)
+}
+
+// ListDeviceGroups returns all the device groups stored under
+// "/device_groups". The proxy for that path is created and cached on first use
+// (the cache is accessed without locking). An empty DeviceGroups and a
+// NotFound status error are returned when the path holds no list; list entries
+// that are not device groups are skipped. ctx is not used.
+func (mpMgr *ModelProxyManager) ListDeviceGroups(ctx context.Context) (*voltha.DeviceGroups, error) {
+	log.Debug("ListDeviceGroups")
+
+	agent, exists := mpMgr.modelProxy[DeviceGroups.String()]
+	if !exists {
+		agent = newModelProxy("device_groups", mpMgr.clusterDataProxy)
+		mpMgr.modelProxy[DeviceGroups.String()] = agent
+	}
+
+	deviceGroups := &voltha.DeviceGroups{}
+	// The lookup error is not inspected: a failed lookup yields a nil result,
+	// which the checked assertion below rejects as well.
+	items, _ := agent.Get()
+	list, ok := items.([]interface{})
+	if !ok {
+		return deviceGroups, status.Errorf(codes.NotFound, "no-device-groups")
+	}
+	for _, item := range list {
+		if deviceGroup, ok := item.(*voltha.DeviceGroup); ok {
+			deviceGroups.Items = append(deviceGroups.Items, deviceGroup)
+		}
+	}
+	return deviceGroups, nil
+}
+
+// GetDeviceGroup returns the device group stored under "/device_groups/<id>".
+// The proxy for "/device_groups" is created and cached on first use (the cache
+// is accessed without locking). An empty DeviceGroup and a NotFound status
+// error naming id are returned when the path holds no *voltha.DeviceGroup.
+// ctx is not used.
+func (mpMgr *ModelProxyManager) GetDeviceGroup(ctx context.Context, id string) (*voltha.DeviceGroup, error) {
+	log.Debugw("GetDeviceGroup", log.Fields{"id": id})
+
+	agent, exists := mpMgr.modelProxy[DeviceGroups.String()]
+	if !exists {
+		agent = newModelProxy("device_groups", mpMgr.clusterDataProxy)
+		mpMgr.modelProxy[DeviceGroups.String()] = agent
+	}
+
+	// The lookup error is not inspected: a failed lookup yields a nil result,
+	// which the checked assertion below rejects as well.
+	item, _ := agent.Get(id)
+	if deviceGroup, ok := item.(*voltha.DeviceGroup); ok {
+		return deviceGroup, nil
+	}
+
+	return &voltha.DeviceGroup{}, status.Errorf(codes.NotFound, "device-group-%s", id)
+}
+
+// ListAlarmFilters returns all the alarm filters stored under
+// "/alarm_filters". The proxy for that path is created and cached on first use
+// (the cache is accessed without locking). An empty AlarmFilters and a
+// NotFound status error are returned when the path holds no list; list entries
+// that are not alarm filters are skipped. ctx is not used.
+func (mpMgr *ModelProxyManager) ListAlarmFilters(ctx context.Context) (*voltha.AlarmFilters, error) {
+	log.Debug("ListAlarmFilters")
+
+	agent, exists := mpMgr.modelProxy[AlarmFilters.String()]
+	if !exists {
+		agent = newModelProxy("alarm_filters", mpMgr.clusterDataProxy)
+		mpMgr.modelProxy[AlarmFilters.String()] = agent
+	}
+
+	alarmFilters := &voltha.AlarmFilters{}
+	// The lookup error is not inspected: a failed lookup yields a nil result,
+	// which the checked assertion below rejects as well.
+	items, _ := agent.Get()
+	list, ok := items.([]interface{})
+	if !ok {
+		return alarmFilters, status.Errorf(codes.NotFound, "no-alarm-filters")
+	}
+	for _, item := range list {
+		if alarmFilter, ok := item.(*voltha.AlarmFilter); ok {
+			alarmFilters.Filters = append(alarmFilters.Filters, alarmFilter)
+		}
+	}
+	return alarmFilters, nil
+}
+
+// GetAlarmFilter returns the alarm filter stored under "/alarm_filters/<id>".
+// The proxy for "/alarm_filters" is created and cached on first use (the cache
+// is accessed without locking). An empty AlarmFilter and a NotFound status
+// error naming id are returned when the path holds no *voltha.AlarmFilter.
+// ctx is not used.
+func (mpMgr *ModelProxyManager) GetAlarmFilter(ctx context.Context, id string) (*voltha.AlarmFilter, error) {
+	log.Debugw("GetAlarmFilter", log.Fields{"id": id})
+
+	agent, exists := mpMgr.modelProxy[AlarmFilters.String()]
+	if !exists {
+		agent = newModelProxy("alarm_filters", mpMgr.clusterDataProxy)
+		mpMgr.modelProxy[AlarmFilters.String()] = agent
+	}
+
+	// The lookup error is not inspected: a failed lookup yields a nil result,
+	// which the checked assertion below rejects as well.
+	item, _ := agent.Get(id)
+	if alarmFilter, ok := item.(*voltha.AlarmFilter); ok {
+		return alarmFilter, nil
+	}
+	return &voltha.AlarmFilter{}, status.Errorf(codes.NotFound, "alarm-filter-%s", id)
+}