blob: dd64ab90a8b00cc9249200271796df607bf3de33 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package device
import (
"context"
"errors"
"sync"
"time"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/device/event"
"github.com/opencord/voltha-go/rw_core/core/device/remote"
"github.com/opencord/voltha-go/rw_core/core/device/state"
"github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/events"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/common"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/opencord/voltha-protos/v4/go/openflow_13"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Manager represent device manager attributes
type Manager struct {
deviceAgents sync.Map
rootDevices map[string]bool
lockRootDeviceMap sync.RWMutex
adapterProxy *remote.AdapterProxy
*event.Agent
adapterMgr *adapter.Manager
logicalDeviceMgr *LogicalManager
kafkaICProxy kafka.InterContainerProxy
stateTransitions *state.TransitionMap
dbPath *model.Path
dProxy *model.Proxy
coreInstanceID string
defaultTimeout time.Duration
devicesLoadingLock sync.RWMutex
deviceLoadingInProgress map[string][]chan int
}
//NewManagers creates the Manager and the Logical Manager.
func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration, eventProxy *events.EventProxy, stackID string) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
rootDevices: make(map[string]bool),
kafkaICProxy: kmp,
adapterProxy: remote.NewAdapterProxy(kmp, coreTopic, endpointMgr),
coreInstanceID: coreInstanceID,
dbPath: dbPath,
dProxy: dbPath.Proxy("devices"),
adapterMgr: adapterMgr,
defaultTimeout: defaultCoreTimeout,
Agent: event.NewAgent(eventProxy, coreInstanceID, stackID),
deviceLoadingInProgress: make(map[string][]chan int),
}
deviceMgr.stateTransitions = state.NewTransitionMap(deviceMgr)
logicalDeviceMgr := &LogicalManager{
Manager: event.NewManager(eventProxy, coreInstanceID, stackID),
deviceMgr: deviceMgr,
kafkaICProxy: kmp,
dbPath: dbPath,
ldProxy: dbPath.Proxy("logical_devices"),
defaultTimeout: defaultCoreTimeout,
logicalDeviceLoadingInProgress: make(map[string][]chan int),
}
deviceMgr.logicalDeviceMgr = logicalDeviceMgr
adapterMgr.SetAdapterRestartedCallback(deviceMgr.adapterRestarted)
return deviceMgr, logicalDeviceMgr
}
func (dMgr *Manager) addDeviceAgentToMap(agent *Agent) {
if _, exist := dMgr.deviceAgents.Load(agent.deviceID); !exist {
dMgr.deviceAgents.Store(agent.deviceID, agent)
}
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
dMgr.rootDevices[agent.deviceID] = agent.isRootDevice
}
func (dMgr *Manager) deleteDeviceAgentFromMap(agent *Agent) {
dMgr.deviceAgents.Delete(agent.deviceID)
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
delete(dMgr.rootDevices, agent.deviceID)
}
// getDeviceAgent returns the agent managing the device. If the device is not in memory, it will loads it, if it exists
func (dMgr *Manager) getDeviceAgent(ctx context.Context, deviceID string) *Agent {
agent, ok := dMgr.deviceAgents.Load(deviceID)
if ok {
return agent.(*Agent)
}
// Try to load into memory - loading will also create the device agent and set the device ownership
err := dMgr.load(ctx, deviceID)
if err == nil {
agent, ok = dMgr.deviceAgents.Load(deviceID)
if !ok {
return nil
}
return agent.(*Agent)
}
//TODO: Change the return params to return an error as well
logger.Errorw(ctx, "loading-device-failed", log.Fields{"device-id": deviceID, "error": err})
return nil
}
// listDeviceIdsFromMap returns the list of device IDs that are in memory
func (dMgr *Manager) listDeviceIdsFromMap() *voltha.IDs {
result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
dMgr.deviceAgents.Range(func(key, value interface{}) bool {
result.Items = append(result.Items, &voltha.ID{Id: key.(string)})
return true
})
return result
}
// CreateDevice creates a new parent device in the data model
func (dMgr *Manager) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
if device.MacAddress == "" && device.GetHostAndPort() == "" {
logger.Errorf(ctx, "no-device-info-present")
return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
}
ctx = utils.WithRPCMetadataContext(ctx, "CreateDevice")
logger.Debugw(ctx, "create-device", log.Fields{"device": *device})
deviceExist, err := dMgr.isParentDeviceExist(ctx, device)
if err != nil {
logger.Errorf(ctx, "failed-to-fetch-parent-device-info")
return nil, err
}
if deviceExist {
logger.Errorf(ctx, "device-is-pre-provisioned-already-with-same-ip-port-or-mac-address")
return nil, errors.New("device is already pre-provisioned")
}
logger.Debugw(ctx, "create-device", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
// Ensure this device is set as root
device.Root = true
// Create and start a device agent for that device
agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
device, err = agent.start(ctx, device)
if err != nil {
logger.Errorw(ctx, "fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
return nil, err
}
dMgr.addDeviceAgentToMap(agent)
return device, nil
}
// EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
func (dMgr *Manager) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
ctx = utils.WithRPCMetadataContext(ctx, "EnableDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
logger.Debugw(ctx, "enable-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
return &empty.Empty{}, agent.enableDevice(ctx)
}
// DisableDevice disables a device along with any child device it may have
func (dMgr *Manager) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
ctx = utils.WithRPCMetadataContext(ctx, "DisableDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
logger.Debugw(ctx, "disable-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
return &empty.Empty{}, agent.disableDevice(ctx)
}
//RebootDevice invoked the reboot API to the corresponding adapter
func (dMgr *Manager) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
ctx = utils.WithRPCMetadataContext(ctx, "RebootDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
return &empty.Empty{}, agent.rebootDevice(ctx)
}
// DeleteDevice removes a device from the data model
func (dMgr *Manager) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
ctx = utils.WithRPCMetadataContext(ctx, "DeleteDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
logger.Debugw(ctx, "delete-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
return &empty.Empty{}, agent.deleteDevice(ctx)
}
// ForceDeleteDevice removes a device from the data model forcefully without successfully waiting for the adapters.
func (dMgr *Manager) ForceDeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
ctx = utils.WithRPCMetadataContext(ctx, "ForceDeleteDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
logger.Debugw(ctx, "force-delete-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
return &empty.Empty{}, agent.deleteDeviceForce(ctx)
}
// GetDevicePort returns the port details for a specific device port entry
func (dMgr *Manager) GetDevicePort(ctx context.Context, deviceID string, portID uint32) (*voltha.Port, error) {
logger.Debugw(ctx, "get-device-port", log.Fields{"device-id": deviceID})
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", deviceID)
}
return agent.getDevicePort(portID)
}
// ListDevicePorts returns the ports details for a specific device entry
func (dMgr *Manager) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
ctx = utils.WithRPCMetadataContext(ctx, "ListDevicePorts")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
logger.Debugw(ctx, "list-device-ports", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
}
ports := agent.listDevicePorts()
ctr, ret := 0, make([]*voltha.Port, len(ports))
for _, port := range ports {
ret[ctr] = port
ctr++
}
return &voltha.Ports{Items: ret}, nil
}
// ListDeviceFlows returns the flow details for a specific device entry
func (dMgr *Manager) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*ofp.Flows, error) {
ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceFlows")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
logger.Debugw(ctx, "list-device-flows", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
}
flows := agent.listDeviceFlows()
ctr, ret := 0, make([]*ofp.OfpFlowStats, len(flows))
for _, flow := range flows {
ret[ctr] = flow
ctr++
}
return &openflow_13.Flows{Items: ret}, nil
}
// ListDeviceFlowGroups returns the flow group details for a specific device entry
func (dMgr *Manager) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceFlowGroups")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
logger.Debugw(ctx, "list-device-flow-groups", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
}
groups := agent.listDeviceGroups()
ctr, ret := 0, make([]*openflow_13.OfpGroupEntry, len(groups))
for _, group := range groups {
ret[ctr] = group
ctr++
}
return &voltha.FlowGroups{Items: ret}, nil
}
// stopManagingDevice stops the management of the device as well as any of its reference device and logical device.
// This function is called only in the Core that does not own this device. In the Core that owns this device then a
// deletion deletion also includes removal of any reference of this device.
func (dMgr *Manager) stopManagingDevice(ctx context.Context, id string) {
logger.Infow(ctx, "stop-managing-device", log.Fields{"device-id": id})
if dMgr.IsDeviceInCache(id) { // Proceed only if an agent is present for this device
if device, err := dMgr.getDeviceReadOnly(ctx, id); err == nil && device.Root {
// stop managing the logical device
_ = dMgr.logicalDeviceMgr.stopManagingLogicalDeviceWithDeviceID(ctx, id)
}
if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
if err := agent.stop(ctx); err != nil {
logger.Warnw(ctx, "unable-to-stop-device-agent", log.Fields{"device-id": agent.deviceID, "error": err})
}
dMgr.deleteDeviceAgentFromMap(agent)
}
}
}
// RunPostDeviceDelete removes any reference of this device
func (dMgr *Manager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
logger.Infow(ctx, "run-post-device-delete", log.Fields{"device-id": cDevice.Id})
dMgr.stopManagingDevice(ctx, cDevice.Id)
return nil
}
// GetDevice exists primarily to implement the gRPC interface.
// Internal functions should call getDeviceReadOnly instead.
func (dMgr *Manager) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
ctx = utils.WithRPCMetadataContext(ctx, "GetDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
return dMgr.getDeviceReadOnly(ctx, id.Id)
}
// getDeviceReadOnly will returns a device, either from memory or from the dB, if present
func (dMgr *Manager) getDeviceReadOnly(ctx context.Context, id string) (*voltha.Device, error) {
logger.Debugw(ctx, "get-device-read-only", log.Fields{"device-id": id})
if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
return agent.getDeviceReadOnly(ctx)
}
return nil, status.Errorf(codes.NotFound, "%s", id)
}
func (dMgr *Manager) listDevicePorts(ctx context.Context, id string) (map[uint32]*voltha.Port, error) {
logger.Debugw(ctx, "list-device-ports", log.Fields{"device-id": id})
agent := dMgr.getDeviceAgent(ctx, id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id)
}
return agent.listDevicePorts(), nil
}
// GetChildDevice will return a device, either from memory or from the dB, if present
func (dMgr *Manager) GetChildDevice(ctx context.Context, parentDeviceID string, serialNumber string, onuID int64, parentPortNo int64) (*voltha.Device, error) {
logger.Debugw(ctx, "get-child-device", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber,
"parent-port-no": parentPortNo, "onu-id": onuID})
parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
childDeviceIds := dMgr.getAllChildDeviceIds(ctx, parentDevicePorts)
if len(childDeviceIds) == 0 {
logger.Debugw(ctx, "no-child-devices", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber, "onu-id": onuID})
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
var foundChildDevice *voltha.Device
for childDeviceID := range childDeviceIds {
var found bool
if searchDevice, err := dMgr.getDeviceReadOnly(ctx, childDeviceID); err == nil {
foundOnuID := false
if searchDevice.ProxyAddress.OnuId == uint32(onuID) {
if searchDevice.ParentPortNo == uint32(parentPortNo) {
logger.Debugw(ctx, "found-child-by-onuid", log.Fields{"parent-device-id": parentDeviceID, "onu-id": onuID})
foundOnuID = true
}
}
foundSerialNumber := false
if searchDevice.SerialNumber == serialNumber {
logger.Debugw(ctx, "found-child-by-serial-number", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber})
foundSerialNumber = true
}
// if both onuId and serialNumber are provided both must be true for the device to be found
// otherwise whichever one found a match is good enough
if onuID > 0 && serialNumber != "" {
found = foundOnuID && foundSerialNumber
} else {
found = foundOnuID || foundSerialNumber
}
if found {
foundChildDevice = searchDevice
break
}
}
}
if foundChildDevice != nil {
logger.Debugw(ctx, "child-device-found", log.Fields{"parent-device-id": parentDeviceID, "found-child-device": foundChildDevice})
return foundChildDevice, nil
}
logger.Debugw(ctx, "child-device-not-found", log.Fields{"parent-device-id": parentDeviceID,
"serial-number": serialNumber, "onu-id": onuID, "parent-port-no": parentPortNo})
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
// GetChildDeviceWithProxyAddress will return a device based on proxy address
func (dMgr *Manager) GetChildDeviceWithProxyAddress(ctx context.Context, proxyAddress *voltha.Device_ProxyAddress) (*voltha.Device, error) {
logger.Debugw(ctx, "get-child-device-with-proxy-address", log.Fields{"proxy-address": proxyAddress})
parentDevicePorts, err := dMgr.listDevicePorts(ctx, proxyAddress.DeviceId)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
childDeviceIds := dMgr.getAllChildDeviceIds(ctx, parentDevicePorts)
if len(childDeviceIds) == 0 {
logger.Debugw(ctx, "no-child-devices", log.Fields{"parent-device-id": proxyAddress.DeviceId})
return nil, status.Errorf(codes.NotFound, "%s", proxyAddress)
}
var foundChildDevice *voltha.Device
for childDeviceID := range childDeviceIds {
if searchDevice, err := dMgr.getDeviceReadOnly(ctx, childDeviceID); err == nil {
if searchDevice.ProxyAddress == proxyAddress {
foundChildDevice = searchDevice
break
}
}
}
if foundChildDevice != nil {
logger.Debugw(ctx, "child-device-found", log.Fields{"proxy-address": proxyAddress})
return foundChildDevice, nil
}
logger.Warnw(ctx, "child-device-not-found", log.Fields{"proxy-address": proxyAddress})
return nil, status.Errorf(codes.NotFound, "%s", proxyAddress)
}
// IsDeviceInCache returns true if device is found in the map
func (dMgr *Manager) IsDeviceInCache(id string) bool {
_, exist := dMgr.deviceAgents.Load(id)
return exist
}
// ListDevices retrieves the latest devices from the data model
func (dMgr *Manager) ListDevices(ctx context.Context, _ *empty.Empty) (*voltha.Devices, error) {
ctx = utils.WithRPCMetadataContext(ctx, "ListDevices")
logger.Debug(ctx, "list-devices")
result := &voltha.Devices{}
dMgr.deviceAgents.Range(func(key, value interface{}) bool {
result.Items = append(result.Items, value.(*Agent).device)
return true
})
logger.Debugw(ctx, "list-devices-end", log.Fields{"len": len(result.Items)})
return result, nil
}
//isParentDeviceExist checks whether device is already preprovisioned.
func (dMgr *Manager) isParentDeviceExist(ctx context.Context, newDevice *voltha.Device) (bool, error) {
hostPort := newDevice.GetHostAndPort()
var devices []*voltha.Device
if err := dMgr.dProxy.List(ctx, &devices); err != nil {
logger.Errorw(ctx, "failed-to-list-devices-from-cluster-data-proxy", log.Fields{"error": err})
return false, err
}
for _, device := range devices {
if !device.Root {
continue
}
if hostPort != "" && hostPort == device.GetHostAndPort() {
return true, nil
}
if newDevice.MacAddress != "" && newDevice.MacAddress == device.MacAddress {
return true, nil
}
}
return false, nil
}
//getDeviceFromModelretrieves the device data from the model.
func (dMgr *Manager) getDeviceFromModel(ctx context.Context, deviceID string) (*voltha.Device, error) {
device := &voltha.Device{}
if have, err := dMgr.dProxy.Get(ctx, deviceID, device); err != nil {
logger.Errorw(ctx, "failed-to-get-device-info-from-cluster-proxy", log.Fields{"error": err})
return nil, err
} else if !have {
return nil, status.Error(codes.NotFound, deviceID)
}
return device, nil
}
// loadDevice loads the deviceID in memory, if not present
func (dMgr *Manager) loadDevice(ctx context.Context, deviceID string) (*Agent, error) {
if deviceID == "" {
return nil, status.Error(codes.InvalidArgument, "deviceId empty")
}
var err error
var device *voltha.Device
dMgr.devicesLoadingLock.Lock()
if _, exist := dMgr.deviceLoadingInProgress[deviceID]; !exist {
if !dMgr.IsDeviceInCache(deviceID) {
dMgr.deviceLoadingInProgress[deviceID] = []chan int{make(chan int, 1)}
dMgr.devicesLoadingLock.Unlock()
// Proceed with the loading only if the device exist in the Model (could have been deleted)
if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
logger.Debugw(ctx, "loading-device", log.Fields{"device-id": deviceID})
agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
if _, err = agent.start(ctx, nil); err != nil {
logger.Warnw(ctx, "failure-loading-device", log.Fields{"device-id": deviceID, "error": err})
} else {
dMgr.addDeviceAgentToMap(agent)
}
} else {
logger.Debugw(ctx, "device-is-not-in-model", log.Fields{"device-id": deviceID})
}
// announce completion of task to any number of waiting channels
dMgr.devicesLoadingLock.Lock()
if v, ok := dMgr.deviceLoadingInProgress[deviceID]; ok {
for _, ch := range v {
close(ch)
}
delete(dMgr.deviceLoadingInProgress, deviceID)
}
dMgr.devicesLoadingLock.Unlock()
} else {
dMgr.devicesLoadingLock.Unlock()
}
} else {
ch := make(chan int, 1)
dMgr.deviceLoadingInProgress[deviceID] = append(dMgr.deviceLoadingInProgress[deviceID], ch)
dMgr.devicesLoadingLock.Unlock()
// Wait for the channel to be closed, implying the process loading this device is done.
<-ch
}
if agent, ok := dMgr.deviceAgents.Load(deviceID); ok {
return agent.(*Agent), nil
}
return nil, status.Errorf(codes.Aborted, "Error loading device %s", deviceID)
}
// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
func (dMgr *Manager) loadRootDeviceParentAndChildren(ctx context.Context, device *voltha.Device, devicePorts map[uint32]*voltha.Port) error {
logger.Debugw(ctx, "loading-parent-and-children", log.Fields{"device-id": device.Id})
if device.Root {
// Scenario A
if device.ParentId != "" {
// Load logical device if needed.
if err := dMgr.logicalDeviceMgr.load(ctx, device.ParentId); err != nil {
logger.Warnw(ctx, "failure-loading-logical-device", log.Fields{"logical-device-id": device.ParentId})
}
} else {
logger.Debugw(ctx, "no-parent-to-load", log.Fields{"device-id": device.Id})
}
// Load all child devices, if needed
childDeviceIds := dMgr.getAllChildDeviceIds(ctx, devicePorts)
for childDeviceID := range childDeviceIds {
if _, err := dMgr.loadDevice(ctx, childDeviceID); err != nil {
logger.Warnw(ctx, "failure-loading-device", log.Fields{"device-id": childDeviceID, "error": err})
return err
}
}
logger.Debugw(ctx, "loaded-children", log.Fields{"device-id": device.Id, "num-children": len(childDeviceIds)})
}
return nil
}
// load loads the deviceId in memory, if not present, and also loads its accompanying parents and children. Loading
// in memory is for improved performance. It is not imperative that a device needs to be in memory when a request
// acting on the device is received by the core. In such a scenario, the Core will load the device in memory first
// and the proceed with the request.
func (dMgr *Manager) load(ctx context.Context, deviceID string) error {
logger.Debug(ctx, "load...")
// First load the device - this may fail in case the device was deleted intentionally by the other core
var dAgent *Agent
var err error
if dAgent, err = dMgr.loadDevice(ctx, deviceID); err != nil {
return err
}
// Get the loaded device details
device, err := dAgent.getDeviceReadOnly(ctx)
if err != nil {
return err
}
// If the device is in Pre-provisioning or getting deleted state stop here
if device.AdminState == voltha.AdminState_PREPROVISIONED || dAgent.isDeletionInProgress() {
return nil
}
// Now we face two scenarios
if device.Root {
devicePorts := dAgent.listDevicePorts()
// Load all children as well as the parent of this device (logical_device)
if err := dMgr.loadRootDeviceParentAndChildren(ctx, device, devicePorts); err != nil {
logger.Warnw(ctx, "failure-loading-device-parent-and-children", log.Fields{"device-id": deviceID})
return err
}
logger.Debugw(ctx, "successfully-loaded-parent-and-children", log.Fields{"device-id": deviceID})
} else {
// Scenario B - use the parentId of that device (root device) to trigger the loading
if device.ParentId != "" {
return dMgr.load(ctx, device.ParentId)
}
}
return nil
}
// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
func (dMgr *Manager) ListDeviceIds(ctx context.Context, _ *empty.Empty) (*voltha.IDs, error) {
ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceIds")
logger.Debug(ctx, "list-device-ids")
// Report only device IDs that are in the device agent map
return dMgr.listDeviceIdsFromMap(), nil
}
// ReconcileDevices is a request to a voltha core to update its list of managed devices. This will
// trigger loading the devices along with their children and parent in memory
func (dMgr *Manager) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
ctx = utils.WithRPCMetadataContext(ctx, "ReconcileDevices")
logger.Debugw(ctx, "reconcile-devices", log.Fields{"num-devices": len(ids.Items)})
if ids != nil && len(ids.Items) != 0 {
toReconcile := len(ids.Items)
reconciled := 0
var err error
for _, id := range ids.Items {
if err = dMgr.load(ctx, id.Id); err != nil {
logger.Warnw(ctx, "failure-reconciling-device", log.Fields{"device-id": id.Id, "error": err})
} else {
reconciled++
}
}
if toReconcile != reconciled {
return nil, status.Errorf(codes.DataLoss, "less-device-reconciled-than-requested:%d/%d", reconciled, toReconcile)
}
} else {
return nil, status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
}
return &empty.Empty{}, nil
}
// isOkToReconcile validates whether a device is in the correct status to be reconciled
func (dMgr *Manager) isOkToReconcile(ctx context.Context, device *voltha.Device) bool {
if device == nil {
return false
}
if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
return device.AdminState != voltha.AdminState_PREPROVISIONED && (!agent.isDeletionInProgress())
}
return false
}
// adapterRestarted is invoked whenever an adapter is restarted
func (dMgr *Manager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapter-id": adapter.Id, "vendor": adapter.Vendor,
"current-replica": adapter.CurrentReplica, "total-replicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
// Let's reconcile the device managed by this Core only
if len(dMgr.rootDevices) == 0 {
logger.Debugw(ctx, "nothing-to-reconcile", log.Fields{"adapter-id": adapter.Id})
return nil
}
responses := make([]utils.Response, 0)
for rootDeviceID := range dMgr.rootDevices {
dAgent := dMgr.getDeviceAgent(ctx, rootDeviceID)
if dAgent == nil {
continue
}
logger.Debugw(ctx, "checking-adapter-type", log.Fields{"agentType": dAgent.deviceType, "adapter-type": adapter.Type})
if dAgent.deviceType == adapter.Type {
rootDevice, _ := dAgent.getDeviceReadOnly(ctx)
if rootDevice == nil {
continue
}
isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, rootDeviceID, adapter.Type, adapter.CurrentReplica)
if err != nil {
logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "root-device-id": rootDeviceID, "adapter-type": adapter.Type, "replica-number": adapter.CurrentReplica})
continue
}
if isDeviceOwnedByService {
if dMgr.isOkToReconcile(ctx, rootDevice) {
logger.Debugw(ctx, "reconciling-root-device", log.Fields{"rootId": rootDevice.Id})
responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, rootDevice))
} else {
logger.Debugw(ctx, "not-reconciling-root-device", log.Fields{"rootId": rootDevice.Id, "state": rootDevice.AdminState})
}
} else { // Should we be reconciling the root's children instead?
rootDevicePorts, _ := dMgr.listDevicePorts(ctx, rootDeviceID)
childManagedByAdapter:
for _, port := range rootDevicePorts {
for _, peer := range port.Peers {
if childDevice, _ := dMgr.getDeviceFromModel(ctx, peer.DeviceId); childDevice != nil {
isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, childDevice.Id, adapter.Type, adapter.CurrentReplica)
if err != nil {
logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapter-type": adapter.Type, "replica-number": adapter.CurrentReplica})
}
if isDeviceOwnedByService {
if dMgr.isOkToReconcile(ctx, childDevice) {
logger.Debugw(ctx, "reconciling-child-device", log.Fields{"child-device-id": childDevice.Id})
responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, childDevice))
} else {
logger.Debugw(ctx, "not-reconciling-child-device", log.Fields{"child-device-id": childDevice.Id, "state": childDevice.AdminState})
}
} else {
// All child devices under a parent device are typically managed by the same adapter type.
// Therefore we only need to check whether the first device we retrieved is managed by that adapter
break childManagedByAdapter
}
}
}
}
}
}
}
if len(responses) > 0 {
// Wait for completion
if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
} else {
logger.Debugw(ctx, "no-managed-device-to-reconcile", log.Fields{"adapter-id": adapter.Id})
}
return nil
}
func (dMgr *Manager) sendReconcileDeviceRequest(ctx context.Context, device *voltha.Device) utils.Response {
// Send a reconcile request to the adapter. Since this Core may not be managing this device then there is no
// point of creating a device agent (if the device is not being managed by this Core) before sending the request
// to the adapter. We will therefore bypass the adapter adapter and send the request directly to the adapter via
// the adapter proxy.
response := utils.NewResponse()
ch, err := dMgr.adapterProxy.ReconcileDevice(ctx, device)
if err != nil {
response.Error(err)
}
// Wait for adapter response in its own routine
go func() {
resp, ok := <-ch
if !ok {
response.Error(status.Errorf(codes.Aborted, "channel-closed-device: %s", device.Id))
} else if resp.Err != nil {
response.Error(resp.Err)
}
response.Done()
}()
return response
}
func (dMgr *Manager) ReconcileChildDevices(ctx context.Context, parentDeviceID string) error {
if parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID); err == nil {
responses := make([]utils.Response, 0)
for _, port := range parentDevicePorts {
for _, peer := range port.Peers {
if childDevice, err := dMgr.getDeviceFromModel(ctx, peer.DeviceId); err == nil {
responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, childDevice))
}
}
}
// Wait for completion
if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
}
return nil
}
func (dMgr *Manager) UpdateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id, "device": device})
if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
return agent.updateDeviceUsingAdapterData(ctx, device)
}
return status.Errorf(codes.NotFound, "%s", device.Id)
}
func (dMgr *Manager) addPeerPort(ctx context.Context, deviceID string, port *voltha.Port) error {
meAsPeer := &voltha.Port_PeerPort{DeviceId: deviceID, PortNo: port.PortNo}
for _, peerPort := range port.Peers {
if agent := dMgr.getDeviceAgent(ctx, peerPort.DeviceId); agent != nil {
if err := agent.addPeerPort(ctx, meAsPeer); err != nil {
return err
}
}
}
// Notify the logical device manager to setup a logical port, if needed. If the added port is an NNI or UNI
// then a logical port will be added to the logical device and the device route generated. If the port is a
// PON port then only the device graph will be generated.
device, err := dMgr.getDeviceReadOnly(ctx, deviceID)
if err != nil {
return err
}
ports, err := dMgr.listDevicePorts(ctx, deviceID)
if err != nil {
return err
}
subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
if err = dMgr.logicalDeviceMgr.updateLogicalPort(subCtx, device, ports, port); err != nil {
return err
}
return nil
}
func (dMgr *Manager) AddPort(ctx context.Context, deviceID string, port *voltha.Port) error {
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent != nil {
if err := agent.addPort(ctx, port); err != nil {
return err
}
// Setup peer ports in its own routine
go func() {
subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
if err := dMgr.addPeerPort(subCtx, deviceID, port); err != nil {
logger.Errorw(ctx, "unable-to-add-peer-port", log.Fields{"error": err, "device-id": deviceID})
}
}()
return nil
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
func (dMgr *Manager) addFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
logger.Debugw(ctx, "add-flows-and-groups", log.Fields{"device-id": deviceID, "groups:": groups, "flow-metadata": flowMetadata})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.addFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
// deleteParentFlows removes flows from the parent device based on specific attributes
func (dMgr *Manager) deleteParentFlows(ctx context.Context, deviceID string, uniPort uint32, metadata *voltha.FlowMetadata) error {
logger.Debugw(ctx, "delete-parent-flows", log.Fields{"device-id": deviceID, "uni-port": uniPort, "metadata": metadata})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if !agent.isRootDevice {
return status.Errorf(codes.FailedPrecondition, "not-a-parent-device-%s", deviceID)
}
return agent.filterOutFlows(ctx, uniPort, metadata)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
func (dMgr *Manager) deleteFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
logger.Debugw(ctx, "delete-flows-and-groups", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.deleteFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
func (dMgr *Manager) updateFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
logger.Debugw(ctx, "update-flows-and-groups", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
// UpdateDevicePmConfigs updates the PM configs. This is executed when the northbound gRPC API is invoked, typically
// following a user action
func (dMgr *Manager) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
ctx = utils.WithRPCMetadataContext(ctx, "UpdateDevicePmConfigs")
log.EnrichSpan(ctx, log.Fields{"device-id": configs.Id})
if configs.Id == "" {
return nil, status.Error(codes.FailedPrecondition, "invalid-device-Id")
}
agent := dMgr.getDeviceAgent(ctx, configs.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", configs.Id)
}
return &empty.Empty{}, agent.updatePmConfigs(ctx, configs)
}
// InitPmConfigs initialize the pm configs as defined by the adapter.
func (dMgr *Manager) InitPmConfigs(ctx context.Context, deviceID string, pmConfigs *voltha.PmConfigs) error {
if pmConfigs.Id == "" {
return status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
}
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.initPmConfigs(ctx, pmConfigs)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
// ListDevicePmConfigs returns pm configs of device
func (dMgr *Manager) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
ctx = utils.WithRPCMetadataContext(ctx, "ListDevicePmConfigs")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
return agent.listPmConfigs(ctx)
}
func (dMgr *Manager) getSwitchCapability(ctx context.Context, deviceID string) (*ic.SwitchCapability, error) {
logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.getSwitchCapability(ctx)
}
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
func (dMgr *Manager) GetPorts(ctx context.Context, deviceID string, portType voltha.Port_PortType) (*voltha.Ports, error) {
logger.Debugw(ctx, "get-ports", log.Fields{"device-id": deviceID, "port-type": portType})
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
return agent.getPorts(ctx, portType), nil
}
func (dMgr *Manager) UpdateDeviceStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": deviceID, "oper-status": operStatus, "conn-status": connStatus})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateDeviceStatus(ctx, operStatus, connStatus)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
func (dMgr *Manager) UpdateChildrenStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
logger.Debugw(ctx, "update-children-status", log.Fields{"parent-device-id": deviceID, "oper-status": operStatus, "conn-status": connStatus})
parentDevicePorts, err := dMgr.listDevicePorts(ctx, deviceID)
if err != nil {
return status.Errorf(codes.Aborted, "%s", err.Error())
}
for childDeviceID := range dMgr.getAllChildDeviceIds(ctx, parentDevicePorts) {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
if err = agent.updateDeviceStatus(ctx, operStatus, connStatus); err != nil {
return status.Errorf(codes.Aborted, "childDevice:%s, error:%s", childDeviceID, err.Error())
}
}
}
return nil
}
func (dMgr *Manager) UpdatePortState(ctx context.Context, deviceID string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
logger.Debugw(ctx, "update-port-state", log.Fields{"device-id": deviceID, "port-type": portType, "port-no": portNo, "oper-status": operStatus})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.updatePortState(ctx, portType, portNo, operStatus); err != nil {
logger.Errorw(ctx, "updating-port-state-failed", log.Fields{"device-id": deviceID, "port-no": portNo, "error": err})
return err
}
// Notify the logical device manager to change the port state
// Do this for NNI and UNIs only. PON ports are not known by logical device
if portType == voltha.Port_ETHERNET_NNI || portType == voltha.Port_ETHERNET_UNI {
go func() {
subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
err := dMgr.logicalDeviceMgr.updatePortState(subCtx, deviceID, portNo, operStatus)
if err != nil {
// While we want to handle (catch) and log when
// an update to a port was not able to be
// propagated to the logical port, we can report
// it as a warning and not an error because it
// doesn't stop or modify processing.
// TODO: VOL-2707
logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
}
}()
}
return nil
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
func (dMgr *Manager) DeleteAllPorts(ctx context.Context, deviceID string) error {
logger.Debugw(ctx, "delete-all-ports", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.deleteAllPorts(ctx); err != nil {
return err
}
// Notify the logical device manager to remove all logical ports, if needed.
// At this stage the device itself may gave been deleted already at a DeleteAllPorts
// typically is part of a device deletion phase.
if device, err := dMgr.getDeviceReadOnly(ctx, deviceID); err == nil {
go func() {
subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(subCtx, device); err != nil {
logger.Errorw(ctx, "unable-to-delete-logical-ports", log.Fields{"error": err})
}
}()
} else {
logger.Warnw(ctx, "failed-to-retrieve-device", log.Fields{"device-id": deviceID})
return err
}
return nil
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
//UpdatePortsState updates all ports on the device
func (dMgr *Manager) UpdatePortsState(ctx context.Context, deviceID string, portTypeFilter uint32, state voltha.OperStatus_Types) error {
logger.Debugw(ctx, "update-ports-state", log.Fields{"device-id": deviceID})
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
return status.Errorf(codes.NotFound, "%s", deviceID)
}
if state != voltha.OperStatus_ACTIVE && state != voltha.OperStatus_UNKNOWN {
return status.Error(codes.Unimplemented, "state-change-not-implemented")
}
if err := agent.updatePortsOperState(ctx, portTypeFilter, state); err != nil {
logger.Warnw(ctx, "update-ports-state-failed", log.Fields{"device-id": deviceID, "error": err})
return err
}
return nil
}
func (dMgr *Manager) ChildDeviceDetected(ctx context.Context, parentDeviceID string, parentPortNo int64, deviceType string,
channelID int64, vendorID string, serialNumber string, onuID int64) (*voltha.Device, error) {
logger.Debugw(ctx, "child-device-detected", log.Fields{"parent-device-id": parentDeviceID, "parent-port-no": parentPortNo, "device-type": deviceType, "channel-id": channelID, "vendor-id": vendorID, "serial-number": serialNumber, "onu-id": onuID})
if deviceType == "" && vendorID != "" {
logger.Debug(ctx, "device-type-is-nil-fetching-device-type")
deviceTypes, err := dMgr.adapterMgr.ListDeviceTypes(ctx, nil)
if err != nil {
return nil, err
}
OLoop:
for _, dType := range deviceTypes.Items {
for _, v := range dType.VendorIds {
if v == vendorID {
deviceType = dType.Adapter
break OLoop
}
}
}
}
//if no match found for the vendorid,report adapter with the custom error message
if deviceType == "" {
logger.Errorw(ctx, "failed-to-fetch-adapter-name ", log.Fields{"vendor-id": vendorID})
return nil, status.Errorf(codes.NotFound, "%s", vendorID)
}
// Create the ONU device
childDevice := &voltha.Device{}
childDevice.Type = deviceType
childDevice.ParentId = parentDeviceID
childDevice.ParentPortNo = uint32(parentPortNo)
childDevice.VendorId = vendorID
childDevice.SerialNumber = serialNumber
childDevice.Root = false
// Get parent device type
pAgent := dMgr.getDeviceAgent(ctx, parentDeviceID)
if pAgent == nil {
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
if pAgent.deviceType == "" {
return nil, status.Errorf(codes.FailedPrecondition, "device Type not set %s", parentDeviceID)
}
if device, err := dMgr.GetChildDevice(ctx, parentDeviceID, serialNumber, onuID, parentPortNo); err == nil {
logger.Warnw(ctx, "child-device-exists", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber})
return device, status.Errorf(codes.AlreadyExists, "%s", serialNumber)
}
childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
// Create and start a device agent for that device
agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
insertedChildDevice, err := agent.start(ctx, childDevice)
if err != nil {
logger.Errorw(ctx, "error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
return nil, err
}
dMgr.addDeviceAgentToMap(agent)
// Activate the child device
if agent = dMgr.getDeviceAgent(ctx, agent.deviceID); agent != nil {
go func() {
subCtx := utils.WithFromTopicMetadataFromContext(utils.WithSpanAndRPCMetadataFromContext(ctx), ctx)
err := agent.enableDevice(subCtx)
if err != nil {
logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
}
}()
}
return insertedChildDevice, nil
}
func (dMgr *Manager) packetOut(ctx context.Context, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) error {
logger.Debugw(ctx, "packet-out", log.Fields{"device-id": deviceID, "out-port": outPort})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.packetOut(ctx, outPort, packet)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
// PacketIn receives packet from adapter
func (dMgr *Manager) PacketIn(ctx context.Context, deviceID string, port uint32, transactionID string, packet []byte) error {
logger.Debugw(ctx, "packet-in", log.Fields{"device-id": deviceID, "port": port})
// Get the logical device Id based on the deviceId
var device *voltha.Device
var err error
if device, err = dMgr.getDeviceReadOnly(ctx, deviceID); err != nil {
logger.Errorw(ctx, "device-not-found", log.Fields{"device-id": deviceID})
return err
}
if !device.Root {
logger.Errorw(ctx, "device-not-root", log.Fields{"device-id": deviceID})
return status.Errorf(codes.FailedPrecondition, "%s", deviceID)
}
if err := dMgr.logicalDeviceMgr.packetIn(ctx, device.ParentId, port, transactionID, packet); err != nil {
return err
}
return nil
}
func (dMgr *Manager) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})
if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
return agent.setParentID(ctx, device, parentID)
}
return status.Errorf(codes.NotFound, "%s", device.Id)
}
//
//CreateLogicalDevice creates logical device in core
func (dMgr *Manager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
logger.Info(ctx, "create-logical-device")
// Verify whether the logical device has already been created
if cDevice.ParentId != "" {
logger.Debugw(ctx, "parent-device-already-exist", log.Fields{"device-id": cDevice.Id, "logical-device-id": cDevice.Id})
return nil
}
var err error
if _, err = dMgr.logicalDeviceMgr.createLogicalDevice(ctx, cDevice); err != nil {
logger.Warnw(ctx, "create-logical-device-error", log.Fields{"device": cDevice})
return err
}
return nil
}
// DeleteLogicalDevice deletes logical device from core
func (dMgr *Manager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
logger.Info(ctx, "delete-logical-device")
var err error
if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(ctx, cDevice); err != nil {
logger.Warnw(ctx, "delete-logical-device-error", log.Fields{"device-id": cDevice.Id})
return err
}
// Remove the logical device Id from the parent device
logicalID := ""
dMgr.UpdateDeviceAttribute(ctx, cDevice.Id, "ParentId", logicalID)
return nil
}
// DeleteLogicalPorts removes the logical ports associated with that deviceId
func (dMgr *Manager) DeleteLogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
logger.Debugw(ctx, "delete-all-logical-ports", log.Fields{"device-id": cDevice.Id})
if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, cDevice.Id); err != nil {
// Just log the error. The logical device or port may already have been deleted before this callback is invoked.
logger.Warnw(ctx, "delete-logical-ports-error", log.Fields{"device-id": cDevice.Id, "error": err})
}
return nil
}
func (dMgr *Manager) getParentDevice(ctx context.Context, childDevice *voltha.Device) *voltha.Device {
// Sanity check
if childDevice.Root {
// childDevice is the parent device
return childDevice
}
parentDevice, _ := dMgr.getDeviceReadOnly(ctx, childDevice.ParentId)
return parentDevice
}
//ChildDevicesLost is invoked by an adapter to indicate that a parent device is in a state (Disabled) where it
//cannot manage the child devices. This will trigger the Core to disable all the child devices.
func (dMgr *Manager) ChildDevicesLost(ctx context.Context, parentDeviceID string) error {
logger.Debug(ctx, "child-devices-lost")
parentDevice, err := dMgr.getDeviceReadOnly(ctx, parentDeviceID)
if err != nil {
logger.Warnw(ctx, "failed-getting-device", log.Fields{"parent-device-id": parentDeviceID, "error": err})
return err
}
return dMgr.DisableAllChildDevices(ctx, parentDevice)
}
//ChildDevicesDetected is invoked by an adapter when child devices are found, typically after after a
// disable/enable sequence. This will trigger the Core to Enable all the child devices of that parent.
func (dMgr *Manager) ChildDevicesDetected(ctx context.Context, parentDeviceID string) error {
logger.Debug(ctx, "child-devices-detected")
parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID)
if err != nil {
logger.Warnw(ctx, "failed-getting-device", log.Fields{"device-id": parentDeviceID, "error": err})
return err
}
childDeviceIds := dMgr.getAllChildDeviceIds(ctx, parentDevicePorts)
if len(childDeviceIds) == 0 {
logger.Debugw(ctx, "no-child-device", log.Fields{"parent-device-id": parentDeviceID})
}
allChildEnableRequestSent := true
for childDeviceID := range childDeviceIds {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
// Run the children re-registration in its own routine
go func(ctx context.Context) {
err = agent.enableDevice(ctx)
if err != nil {
logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
}
}(subCtx)
} else {
err = status.Errorf(codes.Unavailable, "no agent for child device %s", childDeviceID)
logger.Errorw(ctx, "no-child-device-agent", log.Fields{"parent-device-id": parentDeviceID, "child-id": childDeviceID})
allChildEnableRequestSent = false
}
}
if !allChildEnableRequestSent {
return err
}
return nil
}
/*
All the functions below are callback functions where they are invoked with the latest and previous data. We can
therefore use the data as is without trying to get the latest from the model.
*/
//DisableAllChildDevices is invoked as a callback when the parent device is disabled
func (dMgr *Manager) DisableAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
logger.Debug(ctx, "disable-all-child-devices")
ports, _ := dMgr.listDevicePorts(ctx, parentCurrDevice.Id)
for childDeviceID := range dMgr.getAllChildDeviceIds(ctx, ports) {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
if err := agent.disableDevice(ctx); err != nil {
// Just log the error - this error happens only if the child device was already in deleted state.
logger.Errorw(ctx, "failure-disable-device", log.Fields{"device-id": childDeviceID, "error": err.Error()})
}
}
}
return nil
}
//DeleteAllChildDevices is invoked as a callback when the parent device is deleted
func (dMgr *Manager) DeleteAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
logger.Debug(ctx, "delete-all-child-devices")
force := false
// Get the parent device Transient state, if its FORCE_DELETED(go for force delete for child devices)
// So in cases when this handler is getting called other than DELETE operation, no force option would be used.
agent := dMgr.getDeviceAgent(ctx, parentCurrDevice.Id)
if agent == nil {
return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
}
force = agent.getTransientState() == voltha.DeviceTransientState_FORCE_DELETING
ports, _ := dMgr.listDevicePorts(ctx, parentCurrDevice.Id)
for childDeviceID := range dMgr.getAllChildDeviceIds(ctx, ports) {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
if force {
if err := agent.deleteDeviceForce(ctx); err != nil {
logger.Warnw(ctx, "failure-delete-device-force", log.Fields{"device-id": childDeviceID,
"error": err.Error()})
}
} else {
if err := agent.deleteDevice(ctx); err != nil {
logger.Warnw(ctx, "failure-delete-device", log.Fields{"device-id": childDeviceID,
"error": err.Error()})
}
}
// No further action is required here. The deleteDevice will change the device state where the resulting
// callback will take care of cleaning the child device agent.
}
}
return nil
}
//DeleteAllLogicalPorts is invoked as a callback when the parent device's connection status moves to UNREACHABLE
func (dMgr *Manager) DeleteAllLogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
logger.Debugw(ctx, "delete-all-logical-ports", log.Fields{"parent-device-id": parentDevice.Id})
if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(ctx, parentDevice); err != nil {
// Just log error as logical device may already have been deleted
logger.Warnw(ctx, "delete-all-logical-ports-fail", log.Fields{"parent-device-id": parentDevice.Id, "error": err})
}
return nil
}
//DeleteAllDeviceFlows is invoked as a callback when the parent device's connection status moves to UNREACHABLE
func (dMgr *Manager) DeleteAllDeviceFlows(ctx context.Context, parentDevice *voltha.Device) error {
logger.Debugw(ctx, "delete-all-device-flows", log.Fields{"parent-device-id": parentDevice.Id})
if agent := dMgr.getDeviceAgent(ctx, parentDevice.Id); agent != nil {
if err := agent.deleteAllFlows(ctx); err != nil {
logger.Errorw(ctx, "error-deleting-all-device-flows", log.Fields{"parent-device-id": parentDevice.Id})
return err
}
return nil
}
return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
}
//getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
func (dMgr *Manager) getAllChildDeviceIds(ctx context.Context, parentDevicePorts map[uint32]*voltha.Port) map[string]struct{} {
logger.Debug(ctx, "get-all-child-device-ids")
childDeviceIds := make(map[string]struct{}, len(parentDevicePorts))
for _, port := range parentDevicePorts {
for _, peer := range port.Peers {
childDeviceIds[peer.DeviceId] = struct{}{}
}
}
logger.Debugw(ctx, "returning-getAllChildDeviceIds", log.Fields{"childDeviceIds": childDeviceIds})
return childDeviceIds
}
//GetAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
func (dMgr *Manager) GetAllChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error) {
logger.Debugw(ctx, "get-all-child-devices", log.Fields{"parent-device-id": parentDeviceID})
if parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID); err == nil {
childDevices := make([]*voltha.Device, 0)
for deviceID := range dMgr.getAllChildDeviceIds(ctx, parentDevicePorts) {
if d, e := dMgr.getDeviceReadOnly(ctx, deviceID); e == nil && d != nil {
childDevices = append(childDevices, d)
}
}
return &voltha.Devices{Items: childDevices}, nil
}
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
// SetupUNILogicalPorts creates UNI ports on the logical device that represents a child UNI interface
func (dMgr *Manager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
logger.Info(ctx, "setup-uni-logical-ports")
cDevicePorts, err := dMgr.listDevicePorts(ctx, cDevice.Id)
if err != nil {
return err
}
if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice, cDevicePorts); err != nil {
logger.Warnw(ctx, "setup-uni-logical-ports-error", log.Fields{"device": cDevice, "err": err})
return err
}
return nil
}
// convenience to avoid redefining
var operationFailureResp = &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
// DownloadImage execute an image download request
func (dMgr *Manager) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
ctx = utils.WithRPCMetadataContext(ctx, "DownloadImage")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
logger.Debugw(ctx, "download-image", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
resp, err := agent.downloadImage(ctx, img)
if err != nil {
return operationFailureResp, err
}
return resp, nil
}
// CancelImageDownload cancels image download request
func (dMgr *Manager) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
ctx = utils.WithRPCMetadataContext(ctx, "CancelImageDownload")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
logger.Debugw(ctx, "cancel-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
resp, err := agent.cancelImageDownload(ctx, img)
if err != nil {
return operationFailureResp, err
}
return resp, nil
}
// ActivateImageUpdate activates image update request
func (dMgr *Manager) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
ctx = utils.WithRPCMetadataContext(ctx, "ActivateImageUpdate")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
logger.Debugw(ctx, "activate-image-update", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
resp, err := agent.activateImage(ctx, img)
if err != nil {
return operationFailureResp, err
}
return resp, nil
}
// RevertImageUpdate reverts image update
func (dMgr *Manager) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
ctx = utils.WithRPCMetadataContext(ctx, "RevertImageUpdate")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
logger.Debugw(ctx, "rever-image-update", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
resp, err := agent.revertImage(ctx, img)
if err != nil {
return operationFailureResp, err
}
return resp, nil
}
// convenience to avoid redefining
var imageDownloadFailureResp = &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
// GetImageDownloadStatus returns status of image download
func (dMgr *Manager) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
ctx = utils.WithRPCMetadataContext(ctx, "GetImageDownloadStatus")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
logger.Debugw(ctx, "get-image-download-status", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
resp, err := agent.getImageDownloadStatus(ctx, img)
if err != nil {
return imageDownloadFailureResp, err
}
return resp, nil
}
func (dMgr *Manager) UpdateImageDownload(ctx context.Context, deviceID string, img *voltha.ImageDownload) error {
ctx = utils.WithRPCMetadataContext(ctx, "UpdateImageDownload")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
logger.Debugw(ctx, "update-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.updateImageDownload(ctx, img); err != nil {
logger.Debugw(ctx, "update-image-download-failed", log.Fields{"err": err, "image-name": img.Name})
return err
}
} else {
return status.Errorf(codes.NotFound, "%s", img.Id)
}
return nil
}
// GetImageDownload returns image download
func (dMgr *Manager) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
ctx = utils.WithRPCMetadataContext(ctx, "GetImageDownload")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
logger.Debugw(ctx, "get-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
resp, err := agent.getImageDownload(ctx, img)
if err != nil {
return imageDownloadFailureResp, err
}
return resp, nil
}
// ListImageDownloads returns image downloads
func (dMgr *Manager) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
ctx = utils.WithRPCMetadataContext(ctx, "ListImageDownloads")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
logger.Debugw(ctx, "list-image-downloads", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return &voltha.ImageDownloads{Items: []*voltha.ImageDownload{imageDownloadFailureResp}}, status.Errorf(codes.NotFound, "%s", id.Id)
}
resp, err := agent.listImageDownloads(ctx, id.Id)
if err != nil {
return &voltha.ImageDownloads{Items: []*voltha.ImageDownload{imageDownloadFailureResp}}, err
}
return resp, nil
}
// GetImages returns all images for a specific device entry
func (dMgr *Manager) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
ctx = utils.WithRPCMetadataContext(ctx, "GetImages")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
logger.Debugw(ctx, "get-images", log.Fields{"device-id": id.Id})
device, err := dMgr.getDeviceReadOnly(ctx, id.Id)
if err != nil {
return nil, err
}
return device.Images, nil
}
func (dMgr *Manager) NotifyInvalidTransition(ctx context.Context, device *voltha.Device) error {
logger.Errorw(ctx, "notify-invalid-transition", log.Fields{
"device": device.Id,
"curr-admin-state": device.AdminState,
"curr-oper-state": device.OperStatus,
"curr-conn-state": device.ConnectStatus,
})
//TODO: notify over kafka?
return nil
}
// UpdateDeviceAttribute updates value of particular device attribute
func (dMgr *Manager) UpdateDeviceAttribute(ctx context.Context, deviceID string, attribute string, value interface{}) {
if agent, ok := dMgr.deviceAgents.Load(deviceID); ok {
agent.(*Agent).updateDeviceAttribute(ctx, attribute, value)
}
}
// GetParentDeviceID returns parent device id, either from memory or from the dB, if present
func (dMgr *Manager) GetParentDeviceID(ctx context.Context, deviceID string) string {
if device, _ := dMgr.getDeviceReadOnly(ctx, deviceID); device != nil {
logger.Infow(ctx, "get-parent-device-id", log.Fields{"device-id": device.Id, "parent-id": device.ParentId})
return device.ParentId
}
return ""
}
func (dMgr *Manager) SimulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) (*common.OperationResp, error) {
ctx = utils.WithRPCMetadataContext(ctx, "SimulateAlarm")
logger.Debugw(ctx, "simulate-alarm", log.Fields{"id": simulateReq.Id, "indicator": simulateReq.Indicator, "intf-id": simulateReq.IntfId,
"port-type-name": simulateReq.PortTypeName, "onu-device-id": simulateReq.OnuDeviceId, "inverse-bit-error-rate": simulateReq.InverseBitErrorRate,
"drift": simulateReq.Drift, "new-eqd": simulateReq.NewEqd, "onu-serial-number": simulateReq.OnuSerialNumber, "operation": simulateReq.Operation})
agent := dMgr.getDeviceAgent(ctx, simulateReq.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", simulateReq.Id)
}
if err := agent.simulateAlarm(ctx, simulateReq); err != nil {
return nil, err
}
return &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}, nil
}
func (dMgr *Manager) UpdateDeviceReason(ctx context.Context, deviceID string, reason string) error {
logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": deviceID, "reason": reason})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateDeviceReason(ctx, reason)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
func (dMgr *Manager) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
ctx = utils.WithRPCMetadataContext(ctx, "EnablePort")
log.EnrichSpan(ctx, log.Fields{"device-id": port.DeviceId})
logger.Debugw(ctx, "enable-port", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
}
return &empty.Empty{}, agent.enablePort(ctx, port.PortNo)
}
func (dMgr *Manager) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
ctx = utils.WithRPCMetadataContext(ctx, "DisablePort")
log.EnrichSpan(ctx, log.Fields{"device-id": port.DeviceId})
logger.Debugw(ctx, "disable-port", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
}
return &empty.Empty{}, agent.disablePort(ctx, port.PortNo)
}
// ChildDeviceLost calls parent adapter to delete child device and all its references
func (dMgr *Manager) ChildDeviceLost(ctx context.Context, curr *voltha.Device) error {
logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": curr.Id, "parent-device-id": curr.ParentId})
if parentAgent := dMgr.getDeviceAgent(ctx, curr.ParentId); parentAgent != nil {
if err := parentAgent.ChildDeviceLost(ctx, curr); err != nil {
// Just log the message and let the remaining pipeline proceed.
logger.Warnw(ctx, "childDeviceLost", log.Fields{"child-device-id": curr.Id, "parent-device-id": curr.ParentId, "error": err})
}
}
// Do not return an error as parent device may also have been deleted. Let the remaining pipeline proceed.
return nil
}
func (dMgr *Manager) StartOmciTestAction(ctx context.Context, request *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
ctx = utils.WithRPCMetadataContext(ctx, "StartOmciTestAction")
log.EnrichSpan(ctx, log.Fields{"device-id": request.Id})
logger.Debugw(ctx, "start-omci-test-action", log.Fields{"device-id": request.Id, "uuid": request.Uuid})
agent := dMgr.getDeviceAgent(ctx, request.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", request.Id)
}
return agent.startOmciTest(ctx, request)
}
func (dMgr *Manager) GetExtValue(ctx context.Context, value *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
ctx = utils.WithRPCMetadataContext(ctx, "GetExtValue")
log.EnrichSpan(ctx, log.Fields{"device-id": value.Id})
logger.Debugw(ctx, "get-ext-value", log.Fields{"onu-id": value.Id})
cDevice, err := dMgr.getDeviceReadOnly(ctx, value.Id)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
pDevice, err := dMgr.getDeviceReadOnly(ctx, cDevice.ParentId)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
if agent := dMgr.getDeviceAgent(ctx, cDevice.ParentId); agent != nil {
resp, err := agent.getExtValue(ctx, pDevice, cDevice, value)
if err != nil {
return nil, err
}
logger.Debugw(ctx, "get-ext-value-result", log.Fields{"result": resp})
return resp, nil
}
return nil, status.Errorf(codes.NotFound, "%s", value.Id)
}
// SetExtValue set some given configs or value
func (dMgr *Manager) SetExtValue(ctx context.Context, value *voltha.ValueSet) (*empty.Empty, error) {
ctx = utils.WithRPCMetadataContext(ctx, "SetExtValue")
logger.Debugw(ctx, "set-ext-value", log.Fields{"onu-id": value.Id})
device, err := dMgr.getDeviceReadOnly(ctx, value.Id)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
resp, err := agent.setExtValue(ctx, device, value)
if err != nil {
return nil, err
}
logger.Debugw(ctx, "set-ext-value-result", log.Fields{"result": resp})
return resp, nil
}
return nil, status.Errorf(codes.NotFound, "%s", value.Id)
}
func (dMgr *Manager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent,
category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
//TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
dMgr.Agent.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
}
func (dMgr *Manager) GetTransientState(ctx context.Context, id string) (voltha.DeviceTransientState_Types, error) {
agent := dMgr.getDeviceAgent(ctx, id)
if agent == nil {
return voltha.DeviceTransientState_NONE, status.Errorf(codes.NotFound, "%s", id)
}
return agent.getTransientState(), nil
}
func (dMgr *Manager) DownloadImageToDevice(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) {
if err := dMgr.validateImageDownloadRequest(request); err != nil {
return nil, err
}
ctx = utils.WithRPCMetadataContext(ctx, "DownloadImageToDevice")
respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
downloadReq := &voltha.DeviceImageDownloadRequest{
Image: request.Image,
ActivateOnSuccess: request.ActivateOnSuccess,
CommitOnSuccess: request.CommitOnSuccess,
}
for index, deviceID := range request.DeviceId {
//slice-out only single deviceID from the request
downloadReq.DeviceId = request.DeviceId[index : index+1]
go func(deviceID string, req *voltha.DeviceImageDownloadRequest, ch chan []*voltha.DeviceImageState) {
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
ch <- nil
return
}
resp, err := agent.downloadImageToDevice(ctx, req)
if err != nil {
logger.Errorw(ctx, "download-image-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
ch <- nil
return
}
err = dMgr.validateDeviceImageResponse(resp)
if err != nil {
logger.Errorw(ctx, "download-image-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
ch <- nil
return
}
ch <- resp.GetDeviceImageStates()
}(deviceID.GetId(), downloadReq, respCh)
}
return dMgr.waitForAllResponses(ctx, "download-image-to-device", respCh, len(request.GetDeviceId()))
}
func (dMgr *Manager) GetImageStatus(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
if err := dMgr.validateImageRequest(request); err != nil {
return nil, err
}
ctx = utils.WithRPCMetadataContext(ctx, "GetImageStatus")
imageStatusReq := &voltha.DeviceImageRequest{
Version: request.Version,
CommitOnSuccess: request.CommitOnSuccess,
}
respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
for index, deviceID := range request.DeviceId {
//slice-out only single deviceID from the request
imageStatusReq.DeviceId = request.DeviceId[index : index+1]
go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
ch <- nil
return
}
resp, err := agent.getImageStatus(ctx, req)
if err != nil {
logger.Errorw(ctx, "get-image-status-failed", log.Fields{"device-id": deviceID, "error": err})
ch <- nil
return
}
err = dMgr.validateDeviceImageResponse(resp)
if err != nil {
logger.Errorw(ctx, "get-image-status-failed", log.Fields{"device-id": deviceID, "error": err})
ch <- nil
return
}
ch <- resp.GetDeviceImageStates()
}(deviceID.GetId(), imageStatusReq, respCh)
}
return dMgr.waitForAllResponses(ctx, "get-image-status", respCh, len(request.GetDeviceId()))
}
func (dMgr *Manager) AbortImageUpgradeToDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
if err := dMgr.validateImageRequest(request); err != nil {
return nil, err
}
ctx = utils.WithRPCMetadataContext(ctx, "AbortImageUpgradeToDevice")
respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
abortImageReq := &voltha.DeviceImageRequest{
Version: request.Version,
CommitOnSuccess: request.CommitOnSuccess,
}
for index, deviceID := range request.DeviceId {
//slice-out only single deviceID from the request
abortImageReq.DeviceId = request.DeviceId[index : index+1]
go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
ch <- nil
return
}
resp, err := agent.abortImageUpgradeToDevice(ctx, req)
if err != nil {
logger.Errorw(ctx, "abort-image-upgrade-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
ch <- nil
return
}
err = dMgr.validateDeviceImageResponse(resp)
if err != nil {
logger.Errorw(ctx, "abort-image-upgrade-to-device-failed", log.Fields{"device-id": deviceID, "error": err})
ch <- nil
return
}
ch <- resp.GetDeviceImageStates()
}(deviceID.GetId(), abortImageReq, respCh)
}
return dMgr.waitForAllResponses(ctx, "abort-image-upgrade-to-device", respCh, len(request.GetDeviceId()))
}
func (dMgr *Manager) GetOnuImages(ctx context.Context, id *common.ID) (*voltha.OnuImages, error) {
if id == nil || id.Id == "" {
return nil, status.Errorf(codes.InvalidArgument, "empty device id")
}
ctx = utils.WithRPCMetadataContext(ctx, "GetOnuImages")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
resp, err := agent.getOnuImages(ctx, id)
if err != nil {
return nil, err
}
logger.Debugw(ctx, "get-onu-images-result", log.Fields{"onu-image": resp.Items})
return resp, nil
}
func (dMgr *Manager) ActivateImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
if err := dMgr.validateImageRequest(request); err != nil {
return nil, err
}
ctx = utils.WithRPCMetadataContext(ctx, "ActivateImage")
respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
activateImageReq := &voltha.DeviceImageRequest{
Version: request.Version,
CommitOnSuccess: request.CommitOnSuccess,
}
for index, deviceID := range request.DeviceId {
//slice-out only single deviceID from the request
activateImageReq.DeviceId = request.DeviceId[index : index+1]
go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
ch <- nil
return
}
resp, err := agent.activateImageOnDevice(ctx, req)
if err != nil {
logger.Errorw(ctx, "activate-image-failed", log.Fields{"device-id": deviceID, "error": err})
ch <- nil
return
}
err = dMgr.validateDeviceImageResponse(resp)
if err != nil {
logger.Errorw(ctx, "activate-image-failed", log.Fields{"device-id": deviceID, "error": err})
ch <- nil
return
}
ch <- resp.GetDeviceImageStates()
}(deviceID.GetId(), activateImageReq, respCh)
}
return dMgr.waitForAllResponses(ctx, "activate-image", respCh, len(request.GetDeviceId()))
}
func (dMgr *Manager) CommitImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
if err := dMgr.validateImageRequest(request); err != nil {
return nil, err
}
ctx = utils.WithRPCMetadataContext(ctx, "CommitImage")
respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
commitImageReq := &voltha.DeviceImageRequest{
Version: request.Version,
CommitOnSuccess: request.CommitOnSuccess,
}
for index, deviceID := range request.DeviceId {
//slice-out only single deviceID from the request
commitImageReq.DeviceId = request.DeviceId[index : index+1]
go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
logger.Errorw(ctx, "Not-found", log.Fields{"device-id": deviceID})
ch <- nil
return
}
resp, err := agent.commitImage(ctx, req)
if err != nil {
logger.Errorw(ctx, "commit-image-failed", log.Fields{"device-id": deviceID, "error": err})
ch <- nil
return
}
err = dMgr.validateDeviceImageResponse(resp)
if err != nil {
logger.Errorf(ctx, "commit-image-failed", log.Fields{"device-id": deviceID, "error": err})
ch <- nil
return
}
ch <- resp.GetDeviceImageStates()
}(deviceID.GetId(), commitImageReq, respCh)
}
return dMgr.waitForAllResponses(ctx, "commit-image", respCh, len(request.GetDeviceId()))
}
func (dMgr *Manager) validateImageDownloadRequest(request *voltha.DeviceImageDownloadRequest) error {
if request == nil || request.Image == nil || len(request.DeviceId) == 0 {
return status.Errorf(codes.InvalidArgument, "invalid argument")
}
for _, deviceID := range request.DeviceId {
if deviceID == nil {
return status.Errorf(codes.InvalidArgument, "id is nil")
}
}
return nil
}
func (dMgr *Manager) validateImageRequest(request *voltha.DeviceImageRequest) error {
if request == nil || len(request.DeviceId) == 0 || request.DeviceId[0] == nil {
return status.Errorf(codes.InvalidArgument, "invalid argument")
}
for _, deviceID := range request.DeviceId {
if deviceID == nil {
return status.Errorf(codes.InvalidArgument, "id is nil")
}
}
return nil
}
func (dMgr *Manager) validateDeviceImageResponse(response *voltha.DeviceImageResponse) error {
if response == nil || len(response.GetDeviceImageStates()) == 0 || response.GetDeviceImageStates()[0] == nil {
return status.Errorf(codes.Internal, "invalid-response-from-adapter")
}
return nil
}
func (dMgr *Manager) waitForAllResponses(ctx context.Context, opName string, respCh chan []*voltha.DeviceImageState, expectedResps int) (*voltha.DeviceImageResponse, error) {
response := &voltha.DeviceImageResponse{}
respCount := 0
for {
select {
case resp, ok := <-respCh:
if !ok {
logger.Errorw(ctx, opName+"-failed", log.Fields{"error": "channel-closed"})
return response, status.Errorf(codes.Aborted, "channel-closed")
}
if resp != nil {
logger.Debugw(ctx, opName+"-result", log.Fields{"image-state": resp[0].GetImageState(), "device-id": resp[0].GetDeviceId()})
response.DeviceImageStates = append(response.DeviceImageStates, resp...)
}
respCount++
//check whether all responses received, if so, sent back the collated response
if respCount == expectedResps {
return response, nil
}
continue
case <-ctx.Done():
return nil, status.Errorf(codes.Aborted, opName+"-failed-%s", ctx.Err())
}
}
}