blob: e7bb371165eabaf0ad8164b0801e9de71837ccec [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package device
import (
"context"
"fmt"
"github.com/opencord/voltha-protos/v5/go/common"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/core/device/port"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// listDevicePorts returns device ports
func (agent *Agent) listDevicePorts() map[uint32]*voltha.Port {
portIDs := agent.portLoader.ListIDs()
ports := make(map[uint32]*voltha.Port, len(portIDs))
for portID := range portIDs {
if portHandle, have := agent.portLoader.Lock(portID); have {
ports[portID] = portHandle.GetReadOnly()
portHandle.Unlock()
}
}
return ports
}
// getPorts retrieves the ports information of the device based on the port type.
func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
logger.Debugw(ctx, "get-ports", log.Fields{"device-id": agent.deviceID, "port-type": portType})
ports := &voltha.Ports{}
for _, port := range agent.listDevicePorts() {
if port.Type == portType {
ports.Items = append(ports.Items, port)
}
}
return ports
}
func (agent *Agent) getDevicePort(portID uint32) (*voltha.Port, error) {
portHandle, have := agent.portLoader.Lock(portID)
if !have {
return nil, status.Errorf(codes.NotFound, "port-%d", portID)
}
defer portHandle.Unlock()
return portHandle.GetReadOnly(), nil
}
func (agent *Agent) updatePortsOperState(ctx context.Context, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
logger.Debugw(ctx, "update-ports-oper-state", log.Fields{"device-id": agent.deviceID})
for portID := range agent.portLoader.ListIDs() {
if portHandle, have := agent.portLoader.Lock(portID); have {
if oldPort := portHandle.GetReadOnly(); (1<<oldPort.Type)&portTypeFilter == 0 { // only update port types not included in the mask
// clone top-level port struct
newPort := *oldPort
newPort.OperStatus = operStatus
if err := portHandle.Update(ctx, &newPort); err != nil {
portHandle.Unlock()
return err
}
// Notify the logical device manager to change the port state
// Do this for NNI and UNIs only. PON ports are not known by logical device
if newPort.Type == voltha.Port_ETHERNET_NNI || newPort.Type == voltha.Port_ETHERNET_UNI {
subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
go func(portID uint32, ctx context.Context) {
if err := agent.deviceMgr.logicalDeviceMgr.updatePortState(ctx, agent.deviceID, portID, operStatus); err != nil {
// TODO: VOL-2707
logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
}
}(portID, subCtx)
}
}
portHandle.Unlock()
}
}
return nil
}
func (agent *Agent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
return status.Errorf(codes.InvalidArgument, "%s", portType)
}
portHandle, have := agent.portLoader.Lock(portNo)
if !have {
return nil
}
defer portHandle.Unlock()
port := portHandle.GetReadOnly()
if port.Type != portType {
return nil
}
newPort := *port // clone top-level port struct
newPort.OperStatus = operStatus
return portHandle.Update(ctx, &newPort)
}
func (agent *Agent) deleteAllPorts(ctx context.Context) error {
logger.Debugw(ctx, "delete-all-ports", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
return err
}
if device.AdminState != voltha.AdminState_DISABLED && !agent.isDeletionInProgress() {
err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-admin-state-%v",
device.AdminState))
logger.Warnw(ctx, "invalid-state-removing-ports", log.Fields{"state": device.AdminState, "error": err})
return err
}
for portID := range agent.portLoader.ListIDs() {
if portHandle, have := agent.portLoader.Lock(portID); have {
if err := portHandle.Delete(ctx); err != nil {
portHandle.Unlock()
return err
}
portHandle.Unlock()
}
}
return nil
}
func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
logger.Debugw(ctx, "addPort", log.Fields{"device-id": agent.deviceID})
var desc string
var err error
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
port.AdminState = voltha.AdminState_ENABLED
portHandle, created, err := agent.portLoader.LockOrCreate(ctx, port)
if err != nil {
desc = err.Error()
return err
}
defer portHandle.Unlock()
if created {
operStatus.Code = common.OperationResp_OPERATION_SUCCESS
return nil
}
oldPort := portHandle.GetReadOnly()
if oldPort.Label != "" || oldPort.Type != voltha.Port_PON_OLT {
logger.Debugw(ctx, "port-already-exists", log.Fields{"port": port})
desc = fmt.Sprintf("port already exists, port : %s", port)
operStatus.Code = common.OperationResp_OPERATION_SUCCESS
return nil
}
// Creation of OLT PON port is being processed after a default PON port was created. Just update it.
logger.Infow(ctx, "update-pon-port-created-by-default", log.Fields{"default-port": oldPort, "port-to-add": port})
newPort := *oldPort // clone top-level port struct
newPort.Label = port.Label
newPort.OperStatus = port.OperStatus
err = portHandle.Update(ctx, &newPort)
if err != nil {
desc = err.Error()
return err
}
operStatus.Code = common.OperationResp_OPERATION_SUCCESS
return err
}
func (agent *Agent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
logger.Debugw(ctx, "adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
var portHandle *port.Handle
if agent.isRootDevice {
// If an ONU PON port needs to be referenced before the corresponding creation of the OLT PON port, then create the OLT PON port
// with default values, and update it later when the OLT PON port creation is processed.
ponPort := &voltha.Port{
PortNo: peerPort.PortNo,
Type: voltha.Port_PON_OLT,
AdminState: voltha.AdminState_ENABLED,
DeviceId: agent.deviceID,
Peers: []*voltha.Port_PeerPort{peerPort},
}
h, created, err := agent.portLoader.LockOrCreate(ctx, ponPort)
if err != nil {
return err
}
defer h.Unlock()
if created {
logger.Infow(ctx, "added-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
return nil
}
portHandle = h
} else {
h, have := agent.portLoader.Lock(peerPort.PortNo)
if !have {
return nil
}
defer h.Unlock()
portHandle = h
}
logger.Debugw(ctx, "found-peer", log.Fields{"device-id": agent.deviceID, "portNo": peerPort.PortNo, "deviceId": agent.deviceID})
newPort := proto.Clone(portHandle.GetReadOnly()).(*voltha.Port)
newPort.Peers = append(newPort.Peers, peerPort)
return portHandle.Update(ctx, newPort)
}
func (agent *Agent) disablePort(ctx context.Context, portID uint32) error {
logger.Debugw(ctx, "disable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
var err error
var desc string
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
portHandle, have := agent.portLoader.Lock(portID)
if !have {
err = status.Errorf(codes.InvalidArgument, "%v", portID)
return err
}
defer portHandle.Unlock()
oldPort := portHandle.GetReadOnly()
if oldPort.Type != voltha.Port_PON_OLT {
err = status.Errorf(codes.Unimplemented, "disabling of Port Type %v unimplemented", oldPort.Type)
return err
}
newPort := *oldPort
newPort.AdminState = voltha.AdminState_DISABLED
if err := portHandle.Update(ctx, &newPort); err != nil {
return err
}
//send request to adapter
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
return err
}
// Send the request to the adapter
client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
logger.Errorw(ctx, "grpc-client-nil",
log.Fields{
"error": err,
"device-id": agent.deviceID,
"device-type": agent.deviceType,
"adapter-endpoint": device.AdapterEndpoint,
})
return err
}
subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
go func() {
defer cancel()
_, err := client.DisablePort(subCtx, &newPort)
if err == nil {
agent.onSuccess(subCtx, nil, nil, true)
} else {
agent.onFailure(subCtx, err, nil, nil, true)
}
}()
return nil
}
func (agent *Agent) enablePort(ctx context.Context, portID uint32) error {
logger.Debugw(ctx, "enable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
var err error
var desc string
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
portHandle, have := agent.portLoader.Lock(portID)
if !have {
err = status.Errorf(codes.InvalidArgument, "%v", portID)
return err
}
defer portHandle.Unlock()
oldPort := portHandle.GetReadOnly()
if oldPort.Type != voltha.Port_PON_OLT {
err = status.Errorf(codes.Unimplemented, "enabling of Port Type %v unimplemented", oldPort.Type)
return err
}
newPort := *oldPort
newPort.AdminState = voltha.AdminState_ENABLED
if err = portHandle.Update(ctx, &newPort); err != nil {
return err
}
//send request to adapter
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
return err
}
client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
logger.Errorw(ctx, "grpc-client-nil",
log.Fields{
"error": err,
"device-id": agent.deviceID,
"device-type": agent.deviceType,
"adapter-endpoint": device.AdapterEndpoint,
})
return err
}
subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
go func() {
defer cancel()
_, err := client.EnablePort(subCtx, &newPort)
if err == nil {
agent.onSuccess(subCtx, nil, nil, true)
} else {
agent.onFailure(subCtx, err, nil, nil, true)
}
}()
return nil
}