VOL-1405 : First submission for read-only core
- Most of the logic was copied from the read-write implementation
- Added missing Get/List calls
- Added necessary targets in Makefile
- Added docker and k8s manifests
Amendments:
- Removed more unecessary code.
- Removed refs to kafka
- Adjustements to reflect comments
- Removed refs to kafka in manifests
Change-Id: Ife2ca13d3ae428923825f7c19d42359d60406839
diff --git a/ro_core/config/config.go b/ro_core/config/config.go
new file mode 100644
index 0000000..b7906f2
--- /dev/null
+++ b/ro_core/config/config.go
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package config
+
+import (
+ "flag"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "os"
+)
+
+// RO Core service default constants
+const (
+ ConsulStoreName = "consul"
+ EtcdStoreName = "etcd"
+ default_InstanceID = "rocore001"
+ default_GrpcPort = 50057
+ default_GrpcHost = ""
+ default_KVStoreType = EtcdStoreName
+ default_KVStoreTimeout = 5 //in seconds
+ default_KVStoreHost = "127.0.0.1"
+ default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
+ default_KVTxnKeyDelTime = 60
+ default_LogLevel = 0
+ default_Banner = false
+ default_CoreTopic = "rocore"
+ default_ROCoreEndpoint = "rocore"
+ default_ROCoreKey = "pki/voltha.key"
+ default_ROCoreCert = "pki/voltha.crt"
+ default_ROCoreCA = "pki/voltha-CA.pem"
+ default_Affinity_Router_Topic = "affinityRouter"
+)
+
+// ROCoreFlags represents the set of configurations used by the read-only core service
+type ROCoreFlags struct {
+ // Command line parameters
+ InstanceID string
+ ROCoreEndpoint string
+ GrpcHost string
+ GrpcPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ KVTxnKeyDelTime int
+ CoreTopic string
+ LogLevel int
+ Banner bool
+ ROCoreKey string
+ ROCoreCert string
+ ROCoreCA string
+ AffinityRouterTopic string
+}
+
+func init() {
+ log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
+// NewROCoreFlags returns a new ROCore config
+func NewROCoreFlags() *ROCoreFlags {
+ var roCoreFlag = ROCoreFlags{ // Default values
+ InstanceID: default_InstanceID,
+ ROCoreEndpoint: default_ROCoreEndpoint,
+ GrpcHost: default_GrpcHost,
+ GrpcPort: default_GrpcPort,
+ KVStoreType: default_KVStoreType,
+ KVStoreTimeout: default_KVStoreTimeout,
+ KVStoreHost: default_KVStoreHost,
+ KVStorePort: default_KVStorePort,
+ KVTxnKeyDelTime: default_KVTxnKeyDelTime,
+ CoreTopic: default_CoreTopic,
+ LogLevel: default_LogLevel,
+ Banner: default_Banner,
+ ROCoreKey: default_ROCoreKey,
+ ROCoreCert: default_ROCoreCert,
+ ROCoreCA: default_ROCoreCA,
+ AffinityRouterTopic: default_Affinity_Router_Topic,
+ }
+ return &roCoreFlag
+}
+
+// ParseCommandArguments parses the arguments when running read-only core service
+func (cf *ROCoreFlags) ParseCommandArguments() {
+
+ var help string
+
+ help = fmt.Sprintf("RO core endpoint address")
+ flag.StringVar(&(cf.ROCoreEndpoint), "vcore-endpoint", default_ROCoreEndpoint, help)
+
+ help = fmt.Sprintf("GRPC server - host")
+ flag.StringVar(&(cf.GrpcHost), "grpc_host", default_GrpcHost, help)
+
+ help = fmt.Sprintf("GRPC server - port")
+ flag.IntVar(&(cf.GrpcPort), "grpc_port", default_GrpcPort, help)
+
+ help = fmt.Sprintf("RO Core topic")
+ flag.StringVar(&(cf.CoreTopic), "ro_core_topic", default_CoreTopic, help)
+
+ help = fmt.Sprintf("Affinity Router topic")
+ flag.StringVar(&(cf.AffinityRouterTopic), "affinity_router_topic", default_Affinity_Router_Topic, help)
+
+ help = fmt.Sprintf("KV store type")
+ flag.StringVar(&(cf.KVStoreType), "kv_store_type", default_KVStoreType, help)
+
+ help = fmt.Sprintf("The default timeout when making a kv store request")
+ flag.IntVar(&(cf.KVStoreTimeout), "kv_store_request_timeout", default_KVStoreTimeout, help)
+
+ help = fmt.Sprintf("KV store host")
+ flag.StringVar(&(cf.KVStoreHost), "kv_store_host", default_KVStoreHost, help)
+
+ help = fmt.Sprintf("KV store port")
+ flag.IntVar(&(cf.KVStorePort), "kv_store_port", default_KVStorePort, help)
+
+ help = fmt.Sprintf("The time to wait before deleting a completed transaction key")
+ flag.IntVar(&(cf.KVTxnKeyDelTime), "kv_txn_delete_time", default_KVTxnKeyDelTime, help)
+
+ help = fmt.Sprintf("Log level")
+ flag.IntVar(&(cf.LogLevel), "log_level", default_LogLevel, help)
+
+ help = fmt.Sprintf("Show startup banner log lines")
+ flag.BoolVar(&cf.Banner, "banner", default_Banner, help)
+
+ flag.Parse()
+
+ containerName := getContainerInfo()
+ if len(containerName) > 0 {
+ cf.InstanceID = containerName
+ }
+
+}
+
+func getContainerInfo() string {
+ return os.Getenv("HOSTNAME")
+}
diff --git a/ro_core/core/core.go b/ro_core/core/core.go
new file mode 100644
index 0000000..0ba9b1b
--- /dev/null
+++ b/ro_core/core/core.go
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ grpcserver "github.com/opencord/voltha-go/common/grpc"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "github.com/opencord/voltha-go/ro_core/config"
+ "google.golang.org/grpc"
+)
+
+type Core struct {
+ instanceId string
+ genericMgr *ModelProxyManager
+ deviceMgr *DeviceManager
+ logicalDeviceMgr *LogicalDeviceManager
+ grpcServer *grpcserver.GrpcServer
+ grpcNBIAPIHandler *APIHandler
+ config *config.ROCoreFlags
+ clusterDataRoot model.Root
+ localDataRoot model.Root
+ clusterDataProxy *model.Proxy
+ localDataProxy *model.Proxy
+ exitChannel chan int
+ kvClient kvstore.Client
+}
+
+func init() {
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+func NewCore(id string, cf *config.ROCoreFlags, kvClient kvstore.Client) *Core {
+ var core Core
+ core.instanceId = id
+ core.exitChannel = make(chan int, 1)
+ core.config = cf
+ core.kvClient = kvClient
+
+ // Setup the KV store
+ // Do not call NewBackend constructor; it creates its own KV client
+ // Commented the backend for now until the issue between the model and the KV store
+ // is resolved.
+ backend := model.Backend{
+ Client: kvClient,
+ StoreType: cf.KVStoreType,
+ Host: cf.KVStoreHost,
+ Port: cf.KVStorePort,
+ Timeout: cf.KVStoreTimeout,
+ PathPrefix: "service/voltha"}
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
+ core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
+ core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
+ return &core
+}
+
+func (core *Core) Start(ctx context.Context) {
+ log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceId})
+ core.genericMgr = newModelProxyManager(core.clusterDataProxy)
+ core.deviceMgr = newDeviceManager(core.clusterDataProxy, core.instanceId)
+ core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.clusterDataProxy)
+ go core.startDeviceManager(ctx)
+ go core.startLogicalDeviceManager(ctx)
+ go core.startGRPCService(ctx)
+
+ log.Info("adaptercore-started")
+}
+
+func (core *Core) Stop(ctx context.Context) {
+ log.Info("stopping-adaptercore")
+ core.exitChannel <- 1
+ // Stop all the started services
+ core.grpcServer.Stop()
+ core.logicalDeviceMgr.stop(ctx)
+ core.deviceMgr.stop(ctx)
+ log.Info("adaptercore-stopped")
+}
+
+//startGRPCService creates the grpc service handlers, registers it to the grpc server
+// and starts the server
+func (core *Core) startGRPCService(ctx context.Context) {
+ // create an insecure gserver server
+ core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
+ log.Info("grpc-server-created")
+
+ core.grpcNBIAPIHandler = NewAPIHandler(core.genericMgr, core.deviceMgr, core.logicalDeviceMgr)
+ core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
+ // Create a function to register the core GRPC service with the GRPC server
+ f := func(gs *grpc.Server) {
+ voltha.RegisterVolthaServiceServer(
+ gs,
+ core.grpcNBIAPIHandler,
+ )
+ }
+
+ core.grpcServer.AddService(f)
+ log.Info("grpc-service-added")
+
+ // Start the server
+ core.grpcServer.Start(context.Background())
+ log.Info("grpc-server-started")
+}
+
+func (core *Core) startDeviceManager(ctx context.Context) {
+ // TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
+ // callbacks. For now, until the model is ready, devicemanager will keep a reference to the
+ // logicaldevicemanager to initiate the creation of logical devices
+ log.Info("starting-DeviceManager")
+ core.deviceMgr.start(ctx, core.logicalDeviceMgr)
+ log.Info("started-DeviceManager")
+}
+
+func (core *Core) startLogicalDeviceManager(ctx context.Context) {
+ log.Info("starting-Logical-DeviceManager")
+ core.logicalDeviceMgr.start(ctx)
+ log.Info("started-Logical-DeviceManager")
+}
diff --git a/ro_core/core/device_agent.go b/ro_core/core/device_agent.go
new file mode 100644
index 0000000..1ae1275
--- /dev/null
+++ b/ro_core/core/device_agent.go
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+type DeviceAgent struct {
+ deviceId string
+ deviceType string
+ lastData *voltha.Device
+ deviceMgr *DeviceManager
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ lockDevice sync.RWMutex
+}
+
+//newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
+//preprovisioning
+func newDeviceAgent(device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
+ var agent DeviceAgent
+ agent.deviceId = device.Id
+ agent.deviceType = device.Type
+ agent.lastData = device
+ agent.deviceMgr = deviceMgr
+ agent.exitChannel = make(chan int, 1)
+ agent.clusterDataProxy = cdProxy
+ agent.lockDevice = sync.RWMutex{}
+ return &agent
+}
+
+// start save the device to the data model and registers for callbacks on that device
+func (agent *DeviceAgent) start(ctx context.Context) {
+ log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debug("device-agent-started")
+}
+
+// stop stops the device agent. Not much to do for now
+func (agent *DeviceAgent) stop(ctx context.Context) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debug("stopping-device-agent")
+ agent.exitChannel <- 1
+ log.Debug("device-agent-stopped")
+}
+
+// GetDevice retrieves the latest device information from the data model
+func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if d, ok := device.(*voltha.Device); ok {
+ cloned := proto.Clone(d).(*voltha.Device)
+ return cloned, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
+// This function is meant so that we do not have duplicate code all over the device agent functions
+func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if d, ok := device.(*voltha.Device); ok {
+ cloned := proto.Clone(d).(*voltha.Device)
+ return cloned, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// getPorts retrieves the ports information of the device based on the port type.
+func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+ log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
+ ports := &voltha.Ports{}
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ for _, port := range device.Ports {
+ if port.Type == portType {
+ ports.Items = append(ports.Items, port)
+ }
+ }
+ }
+ return ports
+}
+
+// ListDevicePorts retrieves the ports information for a particular device.
+func (agent *DeviceAgent) ListDevicePorts(ctx context.Context) (*voltha.Ports, error) {
+ log.Debugw("ListDevicePorts", log.Fields{"id": agent.deviceId})
+ ports := &voltha.Ports{}
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ for _, entry := range device.GetPorts() {
+ ports.Items = append(ports.Items, entry)
+ }
+ return ports, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// ListDevicePmConfigs retrieves the ports information for a particular device.
+func (agent *DeviceAgent) ListDevicePmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
+ log.Debugw("ListDevicePmConfigs", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ return device.GetPmConfigs(), nil
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// ListDeviceFlows retrieves the ports information for a particular device.
+func (agent *DeviceAgent) ListDeviceFlows(ctx context.Context) (*voltha.Flows, error) {
+ log.Debugw("ListDeviceFlows", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ return device.GetFlows(), nil
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// ListDeviceFlows retrieves the ports information for a particular device.
+func (agent *DeviceAgent) ListDeviceFlowGroups(ctx context.Context) (*voltha.FlowGroups, error) {
+ log.Debugw("ListDeviceFlowGroups", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ return device.GetFlowGroups(), nil
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// GetImageDownloadStatus retrieves the download status of an image of a particular device.
+func (agent *DeviceAgent) GetImageDownloadStatus(ctx context.Context, imageName string) (*voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownloadStatus", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ for _, img := range device.GetImageDownloads() {
+ if img.GetName() == imageName {
+ return img, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s, image-%s", agent.deviceId, imageName)
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// GetImageDownload retrieves the image download for a particular device.
+func (agent *DeviceAgent) GetImageDownload(ctx context.Context, imageName string) (*voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownload", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ for _, img := range device.GetImageDownloads() {
+ if img.GetName() == imageName {
+ return img, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s, image-%s", agent.deviceId, imageName)
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// ListImageDownloads retrieves the image downloads for a particular device.
+func (agent *DeviceAgent) ListImageDownloads(ctx context.Context) (*voltha.ImageDownloads, error) {
+ log.Debugw("ListImageDownloads", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ return &voltha.ImageDownloads{Items: device.GetImageDownloads()}, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+// GetImages retrieves the list of images for a particular device.
+func (agent *DeviceAgent) GetImages(ctx context.Context) (*voltha.Images, error) {
+ log.Debugw("GetImages", log.Fields{"id": agent.deviceId})
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
+ return device.GetImages(), nil
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
\ No newline at end of file
diff --git a/ro_core/core/device_manager.go b/ro_core/core/device_manager.go
new file mode 100644
index 0000000..1851e27
--- /dev/null
+++ b/ro_core/core/device_manager.go
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+type DeviceManager struct {
+ deviceAgents map[string]*DeviceAgent
+ logicalDeviceMgr *LogicalDeviceManager
+ clusterDataProxy *model.Proxy
+ coreInstanceId string
+ exitChannel chan int
+ lockDeviceAgentsMap sync.RWMutex
+}
+
+func newDeviceManager(cdProxy *model.Proxy, coreInstanceId string) *DeviceManager {
+ var deviceMgr DeviceManager
+ deviceMgr.exitChannel = make(chan int, 1)
+ deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
+ deviceMgr.coreInstanceId = coreInstanceId
+ deviceMgr.clusterDataProxy = cdProxy
+ deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
+ return &deviceMgr
+}
+
+func (dMgr *DeviceManager) start(ctx context.Context, logicalDeviceMgr *LogicalDeviceManager) {
+ log.Info("starting-device-manager")
+ dMgr.logicalDeviceMgr = logicalDeviceMgr
+ log.Info("device-manager-started")
+}
+
+func (dMgr *DeviceManager) stop(ctx context.Context) {
+ log.Info("stopping-device-manager")
+ dMgr.exitChannel <- 1
+ log.Info("device-manager-stopped")
+}
+
+func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {
+ if ctx.Err() == nil {
+ // Returned response only of the ctx has not been cancelled/timeout/etc
+ // Channel is automatically closed when a context is Done
+ ch <- result
+ log.Debugw("sendResponse", log.Fields{"result": result})
+ } else {
+ // Should the transaction be reverted back?
+ log.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
+ }
+}
+
+func (dMgr *DeviceManager) addDeviceAgentToMap(agent *DeviceAgent) {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ if _, exist := dMgr.deviceAgents[agent.deviceId]; !exist {
+ dMgr.deviceAgents[agent.deviceId] = agent
+ }
+}
+
+func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ delete(dMgr.deviceAgents, agent.deviceId)
+}
+
+func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
+ // TODO If the device is not in memory it needs to be loaded first
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ if agent, ok := dMgr.deviceAgents[deviceId]; ok {
+ return agent
+ }
+ return nil
+}
+
+func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+ for key, _ := range dMgr.deviceAgents {
+ result.Items = append(result.Items, &voltha.ID{Id: key})
+ }
+ return result
+}
+
+func (dMgr *DeviceManager) GetDevice(id string) (*voltha.Device, error) {
+ log.Debugw("GetDevice", log.Fields{"deviceid": id})
+ if agent := dMgr.getDeviceAgent(id); agent != nil {
+ return agent.getDevice()
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", id)
+}
+
+func (dMgr *DeviceManager) IsRootDevice(id string) (bool, error) {
+ device, err := dMgr.GetDevice(id)
+ if err != nil {
+ return false, err
+ }
+ return device.Root, nil
+}
+
+// GetDevice retrieves the latest device information from the data model
+func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
+ log.Debug("ListDevices")
+ result := &voltha.Devices{}
+ if devices := dMgr.clusterDataProxy.Get("/devices", 0, false, ""); devices != nil {
+ for _, device := range devices.([]interface{}) {
+ if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
+ agent = newDeviceAgent(device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+ dMgr.addDeviceAgentToMap(agent)
+ agent.start(nil)
+ }
+ result.Items = append(result.Items, device.(*voltha.Device))
+ }
+ }
+ return result, nil
+}
+
+// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
+func (dMgr *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
+ log.Debug("ListDeviceIDs")
+ // Report only device IDs that are in the device agent map
+ return dMgr.listDeviceIdsFromMap(), nil
+}
+
+//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
+func (dMgr *DeviceManager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
+ log.Debug("ReconcileDevices")
+ var res interface{}
+ if ids != nil {
+ toReconcile := len(ids.Items)
+ reconciled := 0
+ for _, id := range ids.Items {
+ // Act on the device only if its not present in the agent map
+ if agent := dMgr.getDeviceAgent(id.Id); agent == nil {
+ // Device Id not in memory
+ log.Debugw("reconciling-device", log.Fields{"id": id.Id})
+ // Load device from model
+ if device := dMgr.clusterDataProxy.Get("/devices/"+id.Id, 0, false, ""); device != nil {
+ agent = newDeviceAgent(device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+ dMgr.addDeviceAgentToMap(agent)
+ agent.start(nil)
+ reconciled += 1
+ } else {
+ log.Warnw("device-inexistent", log.Fields{"id": id.Id})
+ }
+ } else {
+ reconciled += 1
+ }
+ }
+ if toReconcile != reconciled {
+ res = status.Errorf(codes.DataLoss, "less-device-reconciled:%d/%d", reconciled, toReconcile)
+ }
+ } else {
+ res = status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
+ }
+ sendResponse(ctx, ch, res)
+}
+
+func (dMgr *DeviceManager) getPorts(ctx context.Context, deviceId string, portType voltha.Port_PortType) (*voltha.Ports, error) {
+ log.Debugw("getPorts", log.Fields{"deviceid": deviceId, "portType": portType})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.getPorts(ctx, portType), nil
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
+}
+
+func (dMgr *DeviceManager) ListDevicePorts(ctx context.Context, deviceId string) (*voltha.Ports, error) {
+ log.Debugw("ListDevicePorts", log.Fields{"deviceid": deviceId})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.ListDevicePorts(ctx)
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
+}
+
+func (dMgr *DeviceManager) ListDevicePmConfigs(ctx context.Context, deviceId string) (*voltha.PmConfigs, error) {
+ log.Debugw("ListDevicePmConfigs", log.Fields{"deviceid": deviceId})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.ListDevicePmConfigs(ctx)
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
+}
+
+func (dMgr *DeviceManager) ListDeviceFlows(ctx context.Context, deviceId string) (*voltha.Flows, error) {
+ log.Debugw("ListDeviceFlows", log.Fields{"deviceid": deviceId})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.ListDeviceFlows(ctx)
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
+}
+
+func (dMgr *DeviceManager) ListDeviceFlowGroups(ctx context.Context, deviceId string) (*voltha.FlowGroups, error) {
+ log.Debugw("ListDeviceFlowGroups", log.Fields{"deviceid": deviceId})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.ListDeviceFlowGroups(ctx)
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
+}
+
+func (dMgr *DeviceManager) GetImageDownloadStatus(ctx context.Context, deviceId string, imageName string) (*voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownloadStatus", log.Fields{"deviceid": deviceId, "imagename": imageName})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.GetImageDownloadStatus(ctx, imageName)
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
+}
+
+func (dMgr *DeviceManager) GetImageDownload(ctx context.Context, deviceId string, imageName string) ( *voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownload", log.Fields{"deviceid": deviceId, "imagename": imageName})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.GetImageDownload(ctx, imageName)
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
+}
+
+func (dMgr *DeviceManager) ListImageDownloads(ctx context.Context, deviceId string) ( *voltha.ImageDownloads, error) {
+ log.Debugw("ListImageDownloads", log.Fields{"deviceid": deviceId})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.ListImageDownloads(ctx)
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
+}
+
+func (dMgr *DeviceManager) GetImages(ctx context.Context, deviceId string) ( *voltha.Images, error) {
+ log.Debugw("GetImages", log.Fields{"deviceid": deviceId})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.GetImages(ctx)
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
+}
+
+func (dMgr *DeviceManager) getParentDevice(childDevice *voltha.Device) *voltha.Device {
+ // Sanity check
+ if childDevice.Root {
+ // childDevice is the parent device
+ return childDevice
+ }
+ parentDevice, _ := dMgr.GetDevice(childDevice.ParentId)
+ return parentDevice
+}
diff --git a/ro_core/core/grpc_nbi_api_handler.go b/ro_core/core/grpc_nbi_api_handler.go
new file mode 100644
index 0000000..5727983
--- /dev/null
+++ b/ro_core/core/grpc_nbi_api_handler.go
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "errors"
+ "github.com/golang/protobuf/ptypes/empty"
+ da "github.com/opencord/voltha-go/common/core/northbound/grpc"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/protos/common"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+type APIHandler struct {
+ commonMgr *ModelProxyManager
+ deviceMgr *DeviceManager
+ logicalDeviceMgr *LogicalDeviceManager
+ da.DefaultAPIHandler
+}
+
+func NewAPIHandler(generalMgr *ModelProxyManager, deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager) *APIHandler {
+ handler := &APIHandler{
+ commonMgr: generalMgr,
+ deviceMgr: deviceMgr,
+ logicalDeviceMgr: lDeviceMgr,
+ }
+ return handler
+}
+
+// isTestMode is a helper function to determine a function is invoked for testing only
+func isTestMode(ctx context.Context) bool {
+ md, _ := metadata.FromIncomingContext(ctx)
+ _, exist := md[common.TestModeKeys_api_test.String()]
+ return exist
+}
+
+// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
+// response is expected in a successful scenario
+func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
+ select {
+ case res := <-ch:
+ if res == nil {
+ return new(empty.Empty), nil
+ } else if err, ok := res.(error); ok {
+ return new(empty.Empty), err
+ } else {
+ log.Warnw("unexpected-return-type", log.Fields{"result": res})
+ err = status.Errorf(codes.Internal, "%s", res)
+ return new(empty.Empty), err
+ }
+ case <-ctx.Done():
+ log.Debug("client-timeout")
+ return nil, ctx.Err()
+ }
+}
+
+// GetVoltha returns the contents of all components (i.e. devices, logical_devices, ...)
+func (handler *APIHandler) GetVoltha(ctx context.Context, empty *empty.Empty) (*voltha.Voltha, error) {
+ log.Debug("GetVoltha")
+ return handler.commonMgr.GetVoltha(ctx)
+}
+
+// ListCoreInstances returns details on the running core containers
+func (handler *APIHandler) ListCoreInstances(ctx context.Context, empty *empty.Empty) (*voltha.CoreInstances, error) {
+ log.Debug("ListCoreInstances")
+ return handler.commonMgr.ListCoreInstances(ctx)
+}
+
+// GetCoreInstance returns the details of a specific core container
+func (handler *APIHandler) GetCoreInstance(ctx context.Context, id *voltha.ID) (*voltha.CoreInstance, error) {
+ log.Debugw("GetCoreInstance", log.Fields{"id": id})
+ return handler.commonMgr.GetCoreInstance(ctx, id.Id)
+}
+
+// ListAdapters returns the contents of all adapters known to the system
+func (handler *APIHandler) ListAdapters(ctx context.Context, empty *empty.Empty) (*voltha.Adapters, error) {
+ log.Debug("ListDevices")
+ return handler.commonMgr.ListAdapters(ctx)
+}
+
+// GetDevice returns the details a specific device
+func (handler *APIHandler) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
+ log.Debugw("GetDevice-request", log.Fields{"id": id})
+ return handler.deviceMgr.GetDevice(id.Id)
+}
+
+// ListDevices returns the contents of all devices known to the system
+func (handler *APIHandler) ListDevices(ctx context.Context, empty *empty.Empty) (*voltha.Devices, error) {
+ log.Debug("ListDevices")
+ return handler.deviceMgr.ListDevices()
+}
+
+// ListDeviceIds returns the list of device ids managed by a voltha core
+func (handler *APIHandler) ListDeviceIds(ctx context.Context, empty *empty.Empty) (*voltha.IDs, error) {
+ log.Debug("ListDeviceIDs")
+ if isTestMode(ctx) {
+ out := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+ return out, nil
+ }
+ return handler.deviceMgr.ListDeviceIds()
+}
+
+// ListDevicePorts returns the ports details for a specific device entry
+func (handler *APIHandler) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
+ log.Debugw("ListDevicePorts", log.Fields{"deviceid": id})
+ return handler.deviceMgr.ListDevicePorts(ctx, id.Id)
+}
+
+// ListDevicePmConfigs returns the PM config details for a specific device entry
+func (handler *APIHandler) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+ log.Debugw("ListDevicePmConfigs", log.Fields{"deviceid": id})
+ return handler.deviceMgr.ListDevicePmConfigs(ctx, id.Id)
+}
+
+// ListDeviceFlows returns the flow details for a specific device entry
+func (handler *APIHandler) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*voltha.Flows, error) {
+ log.Debugw("ListDeviceFlows", log.Fields{"deviceid": id})
+ return handler.deviceMgr.ListDeviceFlows(ctx, id.Id)
+}
+
+// ListDeviceFlowGroups returns the flow group details for a specific device entry
+func (handler *APIHandler) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
+ log.Debugw("ListDeviceFlowGroups", log.Fields{"deviceid": id})
+ return handler.deviceMgr.ListDeviceFlowGroups(ctx, id.Id)
+}
+
+// ListDeviceTypes returns all the device types known to the system
+func (handler *APIHandler) ListDeviceTypes(ctx context.Context, empty *empty.Empty) (*voltha.DeviceTypes, error) {
+ log.Debug("ListDeviceTypes")
+ return handler.commonMgr.ListDeviceTypes(ctx)
+}
+
+// GetDeviceType returns the device type for a specific device entry
+func (handler *APIHandler) GetDeviceType(ctx context.Context, id *voltha.ID) (*voltha.DeviceType, error) {
+ log.Debugw("GetDeviceType", log.Fields{"typeid": id})
+ return handler.commonMgr.GetDeviceType(ctx, id.Id)
+}
+
+// ListDeviceGroups returns all the device groups known to the system
+func (handler *APIHandler) ListDeviceGroups(ctx context.Context, empty *empty.Empty) (*voltha.DeviceGroups, error) {
+ log.Debug("ListDeviceGroups")
+ return handler.commonMgr.ListDeviceGroups(ctx)
+}
+
+// GetDeviceGroup returns a specific device group entry
+func (handler *APIHandler) GetDeviceGroup(ctx context.Context, id *voltha.ID) (*voltha.DeviceGroup, error) {
+ log.Debugw("GetDeviceGroup", log.Fields{"groupid": id})
+ return handler.commonMgr.GetDeviceGroup(ctx, id.Id)
+}
+
+// GetImageDownloadStatus returns the download status for a specific image entry
+func (handler *APIHandler) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownloadStatus", log.Fields{"deviceid": img.GetId(), "imagename": img.GetName()})
+ return handler.deviceMgr.GetImageDownloadStatus(ctx, img.GetId(), img.GetName())
+}
+
+// GetImageDownload return the download details for a specific image entry
+func (handler *APIHandler) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownload", log.Fields{"deviceid": img.GetId(), "imagename": img.GetName()})
+ return handler.deviceMgr.GetImageDownload(ctx, img.GetId(), img.GetName())
+}
+
+// ListImageDownloads returns all image downloads known to the system
+func (handler *APIHandler) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
+ log.Debugw("GetImageDownload", log.Fields{"deviceid": id})
+ return handler.deviceMgr.ListImageDownloads(ctx, id.Id)
+}
+
+// GetImages returns all images for a specific device entry
+func (handler *APIHandler) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
+ log.Debugw("GetImages", log.Fields{"deviceid": id})
+ return handler.deviceMgr.GetImages(ctx, id.Id)
+}
+
+// ListAlarmFilters return all alarm filters known to the system
+func (handler *APIHandler) ListAlarmFilters(ctx context.Context, empty *empty.Empty) (*voltha.AlarmFilters, error) {
+ log.Debug("ListAlarmFilters")
+ return handler.commonMgr.ListAlarmFilters(ctx)
+}
+
+// GetAlarmFilter returns a specific alarm filter entry
+func (handler *APIHandler) GetAlarmFilter(ctx context.Context, id *voltha.ID) (*voltha.AlarmFilter, error) {
+ log.Debugw("GetAlarmFilter", log.Fields{"alarmid": id})
+ return handler.commonMgr.GetAlarmFilter(ctx, id.Id)
+}
+
+//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
+func (handler *APIHandler) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
+ log.Debug("ReconcileDevices")
+ if isTestMode(ctx) {
+ out := new(empty.Empty)
+ return out, nil
+ }
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.ReconcileDevices(ctx, ids, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
+}
+
+// GetLogicalDevice returns the details for a specific logical device entry
+func (handler *APIHandler) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
+ log.Debugw("GetLogicalDevice-request", log.Fields{"id": id})
+ return handler.logicalDeviceMgr.getLogicalDevice(id.Id)
+}
+
+// ListLogicalDevices returns all logical devices known to the system
+func (handler *APIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
+ log.Debug("ListLogicalDevices")
+ return handler.logicalDeviceMgr.listLogicalDevices()
+}
+
+// ListLogicalDevicePorts returns port details for a specific logical device entry
+func (handler *APIHandler) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
+ log.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
+ return handler.logicalDeviceMgr.ListLogicalDevicePorts(ctx, id.Id)
+}
+
+// ListLogicalDeviceFlows returns flow details for a specific logical device entry
+func (handler *APIHandler) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*voltha.Flows, error) {
+ log.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id})
+ return handler.logicalDeviceMgr.ListLogicalDeviceFlows(ctx, id.Id)
+}
+
+// ListLogicalDeviceFlowGroups returns flow group details for a specific logical device entry
+func (handler *APIHandler) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
+ log.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id})
+ return handler.logicalDeviceMgr.ListLogicalDeviceFlowGroups(ctx, id.Id)
+}
+
+func (handler *APIHandler) SelfTest(ctx context.Context, id *voltha.ID) (*voltha.SelfTestResponse, error) {
+ log.Debugw("SelfTest-request", log.Fields{"id": id})
+ if isTestMode(ctx) {
+ resp := &voltha.SelfTestResponse{Result: voltha.SelfTestResponse_SUCCESS}
+ return resp, nil
+ }
+ return nil, errors.New("UnImplemented")
+}
diff --git a/ro_core/core/logical_device_agent.go b/ro_core/core/logical_device_agent.go
new file mode 100644
index 0000000..9cb6655
--- /dev/null
+++ b/ro_core/core/logical_device_agent.go
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+type LogicalDeviceAgent struct {
+ logicalDeviceId string
+ lastData *voltha.LogicalDevice
+ rootDeviceId string
+ deviceMgr *DeviceManager
+ ldeviceMgr *LogicalDeviceManager
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ lockLogicalDevice sync.RWMutex
+}
+
+func newLogicalDeviceAgent(id string, deviceId string, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager, cdProxy *model.Proxy) *LogicalDeviceAgent {
+ var agent LogicalDeviceAgent
+ agent.exitChannel = make(chan int, 1)
+ agent.logicalDeviceId = id
+ agent.rootDeviceId = deviceId
+ agent.deviceMgr = deviceMgr
+ agent.clusterDataProxy = cdProxy
+ agent.ldeviceMgr = ldeviceMgr
+ agent.lockLogicalDevice = sync.RWMutex{}
+ return &agent
+}
+
+// start creates the logical device and add it to the data model
+func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
+ log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ log.Info("logical_device-agent-started")
+ return nil
+}
+
+// stop terminates the logical device agent.
+func (agent *LogicalDeviceAgent) stop(ctx context.Context) {
+ log.Info("stopping-logical_device-agent")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ //Remove the logical device from the model
+ agent.exitChannel <- 1
+ log.Info("logical_device-agent-stopped")
+}
+
+// GetLogicalDevice locks the logical device model and then retrieves the latest logical device information
+func (agent *LogicalDeviceAgent) GetLogicalDevice() (*voltha.LogicalDevice, error) {
+ log.Debug("GetLogicalDevice")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+ if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ return lDevice, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
+func (agent *LogicalDeviceAgent) ListLogicalDevicePorts() (*voltha.LogicalPorts, error) {
+ log.Debug("!!!!!ListLogicalDevicePorts")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+ if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ lPorts := make([]*voltha.LogicalPort, 0)
+ for _, port := range lDevice.Ports {
+ lPorts = append(lPorts, port)
+ }
+ return &voltha.LogicalPorts{Items: lPorts}, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
+// listFlows locks the logical device model and then retrieves the latest flow information
+func (agent *LogicalDeviceAgent) ListLogicalDeviceFlows() (*voltha.Flows, error) {
+ log.Debug("listFlows")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId+"/flows", 1, false, "")
+ if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ return lDevice.Flows, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
+// listFlowGroups locks the logical device model and then retrieves the latest flow groups information
+func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups() (*voltha.FlowGroups, error) {
+ log.Debug("listFlowGroups")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+ if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ return lDevice.FlowGroups, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
diff --git a/ro_core/core/logical_device_manager.go b/ro_core/core/logical_device_manager.go
new file mode 100644
index 0000000..64ccf28
--- /dev/null
+++ b/ro_core/core/logical_device_manager.go
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+type LogicalDeviceManager struct {
+ logicalDeviceAgents map[string]*LogicalDeviceAgent
+ deviceMgr *DeviceManager
+ grpcNbiHdlr *APIHandler
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ lockLogicalDeviceAgentsMap sync.RWMutex
+}
+
+func newLogicalDeviceManager(deviceMgr *DeviceManager, cdProxy *model.Proxy) *LogicalDeviceManager {
+ var logicalDeviceMgr LogicalDeviceManager
+ logicalDeviceMgr.exitChannel = make(chan int, 1)
+ logicalDeviceMgr.logicalDeviceAgents = make(map[string]*LogicalDeviceAgent)
+ logicalDeviceMgr.deviceMgr = deviceMgr
+ logicalDeviceMgr.clusterDataProxy = cdProxy
+ logicalDeviceMgr.lockLogicalDeviceAgentsMap = sync.RWMutex{}
+ return &logicalDeviceMgr
+}
+
+func (ldMgr *LogicalDeviceManager) setGrpcNbiHandler(grpcNbiHandler *APIHandler) {
+ ldMgr.grpcNbiHdlr = grpcNbiHandler
+}
+
+func (ldMgr *LogicalDeviceManager) start(ctx context.Context) {
+ log.Info("starting-logical-device-manager")
+ log.Info("logical-device-manager-started")
+}
+
+func (ldMgr *LogicalDeviceManager) stop(ctx context.Context) {
+ log.Info("stopping-logical-device-manager")
+ ldMgr.exitChannel <- 1
+ log.Info("logical-device-manager-stopped")
+}
+
+func (ldMgr *LogicalDeviceManager) addLogicalDeviceAgentToMap(agent *LogicalDeviceAgent) {
+ ldMgr.lockLogicalDeviceAgentsMap.Lock()
+ defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ if _, exist := ldMgr.logicalDeviceAgents[agent.logicalDeviceId]; !exist {
+ ldMgr.logicalDeviceAgents[agent.logicalDeviceId] = agent
+ }
+}
+
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceAgent(logicalDeviceId string) *LogicalDeviceAgent {
+ ldMgr.lockLogicalDeviceAgentsMap.Lock()
+ defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ if agent, ok := ldMgr.logicalDeviceAgents[logicalDeviceId]; ok {
+ return agent
+ }
+ return nil
+}
+
+func (ldMgr *LogicalDeviceManager) deleteLogicalDeviceAgent(logicalDeviceId string) {
+ ldMgr.lockLogicalDeviceAgentsMap.Lock()
+ defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ delete(ldMgr.logicalDeviceAgents, logicalDeviceId)
+}
+
+// GetLogicalDevice provides a cloned most up to date logical device
+func (ldMgr *LogicalDeviceManager) getLogicalDevice(id string) (*voltha.LogicalDevice, error) {
+ log.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id})
+ if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ return agent.GetLogicalDevice()
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", id)
+}
+
+func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
+ log.Debug("ListAllLogicalDevices")
+ result := &voltha.LogicalDevices{}
+ if logicalDevices := ldMgr.clusterDataProxy.Get("/logical_devices", 0, false, ""); logicalDevices != nil {
+ for _, logicalDevice := range logicalDevices.([]interface{}) {
+ if agent := ldMgr.getLogicalDeviceAgent(logicalDevice.(*voltha.LogicalDevice).Id); agent == nil {
+ agent = newLogicalDeviceAgent(
+ logicalDevice.(*voltha.LogicalDevice).Id,
+ logicalDevice.(*voltha.LogicalDevice).RootDeviceId,
+ ldMgr,
+ ldMgr.deviceMgr,
+ ldMgr.clusterDataProxy,
+ )
+ ldMgr.addLogicalDeviceAgentToMap(agent)
+ go agent.start(nil)
+ }
+ result.Items = append(result.Items, logicalDevice.(*voltha.LogicalDevice))
+ }
+ }
+ return result, nil
+}
+
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceId(device *voltha.Device) (*string, error) {
+ // Device can either be a parent or a child device
+ if device.Root {
+ // Parent device. The ID of a parent device is the logical device ID
+ return &device.ParentId, nil
+ }
+ // Device is child device
+ // retrieve parent device using child device ID
+ if parentDevice := ldMgr.deviceMgr.getParentDevice(device); parentDevice != nil {
+ return &parentDevice.ParentId, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", device.Id)
+}
+
+func (ldMgr *LogicalDeviceManager) getLogicalPortId(device *voltha.Device) (*voltha.LogicalPortId, error) {
+ // Get the logical device where this device is attached
+ var lDeviceId *string
+ var err error
+ if lDeviceId, err = ldMgr.getLogicalDeviceId(device); err != nil {
+ return nil, err
+ }
+ var lDevice *voltha.LogicalDevice
+ if lDevice, err = ldMgr.getLogicalDevice(*lDeviceId); err != nil {
+ return nil, err
+ }
+ // Go over list of ports
+ for _, port := range lDevice.Ports {
+ if port.DeviceId == device.Id {
+ return &voltha.LogicalPortId{Id: *lDeviceId, PortId: port.Id}, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", device.Id)
+}
+
+func (ldMgr *LogicalDeviceManager) ListLogicalDevicePorts(ctx context.Context, id string) (*voltha.LogicalPorts, error) {
+ log.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
+ if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ return agent.ListLogicalDevicePorts()
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", id)
+
+}
+
+func (ldMgr *LogicalDeviceManager) ListLogicalDeviceFlows(ctx context.Context, id string) (*voltha.Flows, error) {
+ log.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id})
+ if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ return agent.ListLogicalDeviceFlows()
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", id)
+
+}
+
+func (ldMgr *LogicalDeviceManager) ListLogicalDeviceFlowGroups(ctx context.Context, id string) (*voltha.FlowGroups, error) {
+ log.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id})
+ if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ return agent.ListLogicalDeviceFlowGroups()
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", id)
+
+}
+
+func (ldMgr *LogicalDeviceManager) getLogicalPort(lPortId *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+ // Get the logical device where this device is attached
+ var err error
+ var lDevice *voltha.LogicalDevice
+ if lDevice, err = ldMgr.getLogicalDevice(lPortId.Id); err != nil {
+ return nil, err
+ }
+ // Go over list of ports
+ for _, port := range lDevice.Ports {
+ if port.Id == lPortId.PortId {
+ return port, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "%s-$s", lPortId.Id, lPortId.PortId)
+}
diff --git a/ro_core/core/model_proxy.go b/ro_core/core/model_proxy.go
new file mode 100644
index 0000000..f5e6c3b
--- /dev/null
+++ b/ro_core/core/model_proxy.go
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "strings"
+ "sync"
+)
+
+// ModelProxy controls requests made to the data model
+type ModelProxy struct {
+ rootProxy *model.Proxy
+ basePath string
+ mutex sync.RWMutex
+}
+
+func newModelProxy(basePath string, rootProxy *model.Proxy) *ModelProxy {
+ ga := &ModelProxy{}
+ ga.rootProxy = rootProxy
+
+ if strings.HasPrefix(basePath, "/") {
+ ga.basePath = basePath
+ } else {
+ ga.basePath = "/" + basePath
+ }
+
+ return ga
+}
+
+// Get retrieves information at the provided path
+func (mp *ModelProxy) Get(parts ...string) (interface{}, error) {
+ mp.mutex.Lock()
+ defer mp.mutex.Unlock()
+
+ rawPath := []string{mp.basePath}
+ rawPath = append(rawPath, parts...)
+ path := strings.Join(rawPath, "/")
+
+ log.Debugw("get-data", log.Fields{"path": path})
+
+ if data := mp.rootProxy.Get(path, 1, false, ""); data != nil {
+ return data, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "data-path: %s", path)
+}
diff --git a/ro_core/core/model_proxy_manager.go b/ro_core/core/model_proxy_manager.go
new file mode 100644
index 0000000..d5f7ace
--- /dev/null
+++ b/ro_core/core/model_proxy_manager.go
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// Enumerated type to keep track of miscellaneous data path agents
+type DataModelType int
+
+// Enumerated list of data path agents
+const (
+ Adapters DataModelType = 1 + iota
+ AlarmFilters
+ CoreInstances
+ DeviceTypes
+ DeviceGroups
+ Voltha
+)
+
+// String equivalent for data path agents
+var commonTypes = []string {
+ "Adapters",
+ "AlarmFilters",
+ "CoreInstances",
+ "DeviceTypes",
+ "DeviceGroups",
+ "Voltha",
+}
+
+// String converts the enumerated data path agent value to its string equivalent
+func (t DataModelType) String() string {
+ return commonTypes[t-1]
+}
+
+// ModelProxyManager controls requests made to the miscellaneous data path agents
+type ModelProxyManager struct {
+ modelProxy map[string]*ModelProxy
+ clusterDataProxy *model.Proxy
+}
+
+func newModelProxyManager(cdProxy *model.Proxy) *ModelProxyManager {
+ var mgr ModelProxyManager
+ mgr.modelProxy = make(map[string]*ModelProxy)
+ mgr.clusterDataProxy = cdProxy
+ return &mgr
+}
+
+// GetDeviceType returns the device type associated to the provided id
+func (mpMgr *ModelProxyManager) GetVoltha(ctx context.Context) (*voltha.Voltha, error) {
+ log.Debug("GetVoltha")
+
+ var agent *ModelProxy
+ var exists bool
+
+ if agent, exists = mpMgr.modelProxy[Voltha.String()]; !exists {
+ agent = newModelProxy("", mpMgr.clusterDataProxy)
+ mpMgr.modelProxy[Voltha.String()] = agent
+ }
+
+ if instance, _ := agent.Get(); instance != nil {
+ return instance.(*voltha.Voltha), nil
+ }
+
+ return &voltha.Voltha{}, status.Errorf(codes.NotFound, "no-voltha-instance")
+}
+
+// ListCoreInstances returns all the core instances known to the system
+func (mpMgr *ModelProxyManager) ListCoreInstances(ctx context.Context) (*voltha.CoreInstances, error) {
+ log.Debug("ListCoreInstances")
+
+ // TODO: Need to retrieve the list of registered cores
+
+ return &voltha.CoreInstances{}, status.Errorf(codes.NotFound, "no-core-instances")
+}
+
+// GetCoreInstance returns the core instance associated to the provided id
+func (mpMgr *ModelProxyManager) GetCoreInstance(ctx context.Context, id string) (*voltha.CoreInstance, error) {
+ log.Debugw("GetCoreInstance", log.Fields{"id": id})
+
+ // TODO: Need to retrieve the list of registered cores
+
+ return &voltha.CoreInstance{}, status.Errorf(codes.NotFound, "core-instance-%s", id)
+}
+
+// ListAdapters returns all the device types known to the system
+func (mpMgr *ModelProxyManager) ListAdapters(ctx context.Context) (*voltha.Adapters, error) {
+ log.Debug("ListAdapters")
+
+ var agent *ModelProxy
+ var exists bool
+
+ if agent, exists = mpMgr.modelProxy[Adapters.String()]; !exists {
+ agent = newModelProxy("adapters", mpMgr.clusterDataProxy)
+ mpMgr.modelProxy[Adapters.String()] = agent
+ }
+
+ adapters := &voltha.Adapters{}
+ if items, _ := agent.Get(); items != nil {
+ for _, item := range items.([]interface{}) {
+ adapters.Items = append(adapters.Items, item.(*voltha.Adapter))
+ }
+ log.Debugw("retrieved-adapters", log.Fields{"adapters": adapters})
+ return adapters, nil
+ }
+
+ return adapters, status.Errorf(codes.NotFound, "no-adapters")
+}
+
+// ListDeviceTypes returns all the device types known to the system
+func (mpMgr *ModelProxyManager) ListDeviceTypes(ctx context.Context) (*voltha.DeviceTypes, error) {
+ log.Debug("ListDeviceTypes")
+
+ var agent *ModelProxy
+ var exists bool
+
+ if agent, exists = mpMgr.modelProxy[DeviceTypes.String()]; !exists {
+ agent = newModelProxy("device_types", mpMgr.clusterDataProxy)
+ mpMgr.modelProxy[DeviceTypes.String()] = agent
+ }
+
+ deviceTypes := &voltha.DeviceTypes{}
+ if items, _ := agent.Get(); items != nil {
+ for _, item := range items.([]interface{}) {
+ deviceTypes.Items = append(deviceTypes.Items, item.(*voltha.DeviceType))
+ }
+ return deviceTypes, nil
+ }
+
+ return deviceTypes, status.Errorf(codes.NotFound, "no-device-types")
+}
+
+// GetDeviceType returns the device type associated to the provided id
+func (mpMgr *ModelProxyManager) GetDeviceType(ctx context.Context, id string) (*voltha.DeviceType, error) {
+ log.Debugw("GetDeviceType", log.Fields{"id": id})
+
+ var agent *ModelProxy
+ var exists bool
+
+ if agent, exists = mpMgr.modelProxy[DeviceTypes.String()]; !exists {
+ agent = newModelProxy("device_types", mpMgr.clusterDataProxy)
+ mpMgr.modelProxy[DeviceTypes.String()] = agent
+ }
+
+ if deviceType, _ := agent.Get(id); deviceType != nil {
+ return deviceType.(*voltha.DeviceType), nil
+ }
+
+ return &voltha.DeviceType{}, status.Errorf(codes.NotFound, "device-type-%s", id)
+}
+
+// ListDeviceGroups returns all the device groups known to the system
+func (mpMgr *ModelProxyManager) ListDeviceGroups(ctx context.Context) (*voltha.DeviceGroups, error) {
+ log.Debug("ListDeviceGroups")
+
+ var agent *ModelProxy
+ var exists bool
+
+ if agent, exists = mpMgr.modelProxy[DeviceGroups.String()]; !exists {
+ agent = newModelProxy("device_groups", mpMgr.clusterDataProxy)
+ mpMgr.modelProxy[DeviceGroups.String()] = agent
+ }
+
+ deviceGroups := &voltha.DeviceGroups{}
+ if items, _ := agent.Get(); items != nil {
+ for _, item := range items.([]interface{}) {
+ deviceGroups.Items = append(deviceGroups.Items, item.(*voltha.DeviceGroup))
+ }
+ return deviceGroups, nil
+ }
+
+ return deviceGroups, status.Errorf(codes.NotFound, "no-device-groups")
+}
+
+// GetDeviceGroup returns the device group associated to the provided id
+func (mpMgr *ModelProxyManager) GetDeviceGroup(ctx context.Context, id string) (*voltha.DeviceGroup, error) {
+ log.Debugw("GetDeviceGroup", log.Fields{"id": id})
+
+ var agent *ModelProxy
+ var exists bool
+
+ if agent, exists = mpMgr.modelProxy[DeviceGroups.String()]; !exists {
+ agent = newModelProxy("device_groups", mpMgr.clusterDataProxy)
+ mpMgr.modelProxy[DeviceGroups.String()] = agent
+ }
+
+ if deviceGroup, _ := agent.Get(id); deviceGroup != nil {
+ return deviceGroup.(*voltha.DeviceGroup), nil
+ }
+
+ return &voltha.DeviceGroup{}, status.Errorf(codes.NotFound, "device-group-%s", id)
+}
+
+// ListAlarmFilters returns all the alarm filters known to the system
+func (mpMgr *ModelProxyManager) ListAlarmFilters(ctx context.Context) (*voltha.AlarmFilters, error) {
+ log.Debug("ListAlarmFilters")
+
+ var agent *ModelProxy
+ var exists bool
+
+ if agent, exists = mpMgr.modelProxy[AlarmFilters.String()]; !exists {
+ agent = newModelProxy("alarm_filters", mpMgr.clusterDataProxy)
+ mpMgr.modelProxy[AlarmFilters.String()] = agent
+ }
+
+ alarmFilters := &voltha.AlarmFilters{}
+ if items, _ := agent.Get(); items != nil {
+ for _, item := range items.([]interface{}) {
+ alarmFilters.Filters = append(alarmFilters.Filters, item.(*voltha.AlarmFilter))
+ }
+ return alarmFilters, nil
+ }
+
+ return alarmFilters, status.Errorf(codes.NotFound, "no-alarm-filters")
+}
+
+// GetAlarmFilter returns the alarm filter associated to the provided id
+func (mpMgr *ModelProxyManager) GetAlarmFilter(ctx context.Context, id string) (*voltha.AlarmFilter, error) {
+ log.Debugw("GetAlarmFilter", log.Fields{"id": id})
+
+ var agent *ModelProxy
+ var exists bool
+
+ if agent, exists = mpMgr.modelProxy[AlarmFilters.String()]; !exists {
+ agent = newModelProxy("alarm_filters", mpMgr.clusterDataProxy)
+ mpMgr.modelProxy[AlarmFilters.String()] = agent
+ }
+
+ if alarmFilter, _ := agent.Get(id); alarmFilter != nil {
+ return alarmFilter.(*voltha.AlarmFilter), nil
+ }
+ return &voltha.AlarmFilter{}, status.Errorf(codes.NotFound, "alarm-filter-%s", id)
+}
diff --git a/ro_core/main.go b/ro_core/main.go
new file mode 100644
index 0000000..c86cd45
--- /dev/null
+++ b/ro_core/main.go
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ grpcserver "github.com/opencord/voltha-go/common/grpc"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/ro_core/config"
+ c "github.com/opencord/voltha-go/ro_core/core"
+ "os"
+ "os/signal"
+ "strconv"
+ "syscall"
+ "time"
+)
+
+type roCore struct {
+ kvClient kvstore.Client
+ config *config.ROCoreFlags
+ halted bool
+ exitChannel chan int
+ grpcServer *grpcserver.GrpcServer
+ core *c.Core
+ //For test
+ receiverChannels []<-chan *ic.InterContainerMessage
+}
+
+func init() {
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
+
+ log.Infow("kv-store-type", log.Fields{"store": storeType})
+ switch storeType {
+ case "consul":
+ return kvstore.NewConsulClient(address, timeout)
+ case "etcd":
+ return kvstore.NewEtcdClient(address, timeout)
+ }
+ return nil, errors.New("unsupported-kv-store")
+}
+
+func newROCore(cf *config.ROCoreFlags) *roCore {
+ var roCore roCore
+ roCore.config = cf
+ roCore.halted = false
+ roCore.exitChannel = make(chan int, 1)
+ roCore.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
+ return &roCore
+}
+
+func (ro *roCore) setKVClient() error {
+ addr := ro.config.KVStoreHost + ":" + strconv.Itoa(ro.config.KVStorePort)
+ client, err := newKVClient(ro.config.KVStoreType, addr, ro.config.KVStoreTimeout)
+ if err != nil {
+ ro.kvClient = nil
+ log.Error(err)
+ return err
+ }
+ ro.kvClient = client
+ return nil
+}
+
+func toString(value interface{}) (string, error) {
+ switch t := value.(type) {
+ case []byte:
+ return string(value.([]byte)), nil
+ case string:
+ return value.(string), nil
+ default:
+ return "", fmt.Errorf("unexpected-type-%T", t)
+ }
+}
+
+func (ro *roCore) start(ctx context.Context) {
+ log.Info("Starting RW Core components")
+
+ // Setup KV Client
+ log.Debugw("create-kv-client", log.Fields{"kvstore": ro.config.KVStoreType})
+ ro.setKVClient()
+
+ // Create the core service
+ ro.core = c.NewCore(ro.config.InstanceID, ro.config, ro.kvClient)
+
+ // start the core
+ ro.core.Start(ctx)
+}
+
+func (ro *roCore) stop() {
+ // Stop leadership tracking
+ ro.halted = true
+
+ // send exit signal
+ ro.exitChannel <- 0
+
+ // Cleanup - applies only if we had a kvClient
+ if ro.kvClient != nil {
+ // Release all reservations
+ if err := ro.kvClient.ReleaseAllReservations(); err != nil {
+ log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
+ }
+ // Close the DB connection
+ ro.kvClient.Close()
+ }
+
+ ro.core.Stop(nil)
+}
+
+func waitForExit() int {
+ signalChannel := make(chan os.Signal, 1)
+ signal.Notify(signalChannel,
+ syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT)
+
+ exitChannel := make(chan int)
+
+ go func() {
+ s := <-signalChannel
+ switch s {
+ case syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT:
+ log.Infow("closing-signal-received", log.Fields{"signal": s})
+ exitChannel <- 0
+ default:
+ log.Infow("unexpected-signal-received", log.Fields{"signal": s})
+ exitChannel <- 1
+ }
+ }()
+
+ code := <-exitChannel
+ return code
+}
+
+func printBanner() {
+ fmt.Println(" ")
+ fmt.Println(" ______ ______ ")
+ fmt.Println("| _ \\ \\ / / ___|___ _ __ ___ ")
+ fmt.Println("| |_) \\ \\ /\\ / / | / _ \\| '__/ _ \\ ")
+ fmt.Println("| _ < \\ V V /| |__| (_) | | | __/ ")
+ fmt.Println("|_| \\_\\ \\_/\\_/ \\____\\___/|_| \\___| ")
+ fmt.Println(" ")
+}
+
+func main() {
+ start := time.Now()
+
+ cf := config.NewROCoreFlags()
+ cf.ParseCommandArguments()
+
+ //// Setup logging
+
+ //Setup default logger - applies for packages that do not have specific logger set
+ if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+
+ // Update all loggers (provisionned via init) with a common field
+ if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/ro_core/core", log.DebugLevel)
+
+ defer log.CleanUp()
+
+ // Print banner if specified
+ if cf.Banner {
+ printBanner()
+ }
+
+ log.Infow("ro-core-config", log.Fields{"config": *cf})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ ro := newROCore(cf)
+ go ro.start(ctx)
+
+ code := waitForExit()
+ log.Infow("received-a-closing-signal", log.Fields{"code": code})
+
+ // Cleanup before leaving
+ ro.stop()
+
+ elapsed := time.Since(start)
+ log.Infow("ro-core-run-time", log.Fields{"core": ro.config.InstanceID, "time": elapsed / time.Second})
+}