blob: cd879c644268568fe5901821bed7cfb8dcabf84a [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package device
import (
"context"
"encoding/hex"
"errors"
"fmt"
"reflect"
"sync"
"time"
"github.com/cenkalti/backoff/v3"
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/rw_core/config"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/device/flow"
"github.com/opencord/voltha-go/rw_core/core/device/group"
"github.com/opencord/voltha-go/rw_core/core/device/port"
"github.com/opencord/voltha-go/rw_core/core/device/remote"
"github.com/opencord/voltha-go/rw_core/core/device/transientstate"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
"github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-protos/v4/go/common"
"github.com/opencord/voltha-protos/v4/go/extension"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
)
// Agent represents device agent attributes.
//
// An Agent holds the in-memory copy of a single device. Access to that copy is serialized through
// requestQueue: methods acquire it with WaitForGreenLight and release it with RequestComplete
// (or through one of the *AndReleaseLock helpers).
type Agent struct {
	deviceID string
	parentID string
	// deviceType starts as device.Type (newAgent) and is replaced by device.Adapter whenever the
	// device is loaded from the KV store (start, reconcileWithKVStore). packetOut uses it to pick
	// the adapter.
	deviceType   string
	isRootDevice bool
	adapterProxy *remote.AdapterProxy
	adapterMgr   *adapter.Manager
	deviceMgr    *Manager
	// dbProxy is the KV-store proxy used to Get/Set/Remove this device's entry.
	dbProxy *model.Proxy
	// exitChannel is closed by stop().
	exitChannel chan int
	// device is the current device snapshot. Treat it as read-only; updates replace it with a
	// modified clone (see cloneDeviceWithoutLock).
	device *voltha.Device
	// requestQueue acts as the per-device lock.
	requestQueue *coreutils.RequestQueue
	// defaultTimeout bounds adapter requests and flow/group response waits.
	defaultTimeout time.Duration
	startOnce      sync.Once
	stopOnce       sync.Once
	// stopped is set to true once stop() has completed successfully.
	stopped bool
	// NOTE(review): stopReconciling/stopReconcilingMutex are presumably used by stopReconcile()
	// to interrupt an in-progress reconcile; their usage is outside this view — confirm.
	stopReconciling      chan int
	stopReconcilingMutex sync.RWMutex
	config               *config.RWCoreFlags
	flowCache            *flow.Cache
	groupCache           *group.Cache
	portLoader           *port.Loader
	transientStateLoader *transientstate.Loader
}
// newAgent builds a device agent for the given device without touching the KV store; the agent is
// initialised when start() is called. When the device has no ID yet, a fresh one is generated and
// used for the agent (the stored snapshot keeps the caller's original fields).
func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
	id := device.Id
	if len(id) == 0 {
		id = coreutils.CreateDeviceID()
	}

	// Keep a private copy so later changes by the caller do not leak into the agent.
	snapshot := proto.Clone(device).(*voltha.Device)
	portsProxy := dbPath.SubPath("ports").Proxy(id)
	transientProxy := dbPath.SubPath("core").Proxy("transientstate")

	return &Agent{
		deviceID:             id,
		parentID:             device.ParentId,
		deviceType:           device.Type,
		isRootDevice:         device.Root,
		adapterProxy:         ap,
		adapterMgr:           deviceMgr.adapterMgr,
		deviceMgr:            deviceMgr,
		dbProxy:              deviceProxy,
		exitChannel:          make(chan int, 1),
		device:               snapshot,
		requestQueue:         coreutils.NewRequestQueue(),
		defaultTimeout:       timeout,
		config:               deviceMgr.config,
		flowCache:            flow.NewCache(),
		groupCache:           group.NewCache(),
		portLoader:           port.NewLoader(portsProxy),
		transientStateLoader: transientstate.NewLoader(transientProxy, id),
	}
}
// start initialises the agent's device and returns it. The body runs at most once per agent
// (guarded by startOnce); any later call simply returns the current device.
//
//   - deviceExist == true: use deviceToCreate if it is non-nil, otherwise load the device from the
//     KV store (codes.NotFound if it is absent). Then load its ports and transient state into the
//     caches.
//   - deviceExist == false: clone deviceToCreate, assign the agent's device ID, set the admin state
//     to PREPROVISIONED, persist it to the KV store, and emit a device state-change event. For a
//     non-root device with a proxy address, the VLAN is also set from the proxy's channel ID.
//
// If start fails, the deferred cleanup calls stop(), which deletes the transient state and removes
// the device entry from the KV store.
// NOTE(review): this cleanup also runs when loading from the KV store fails with a non-NotFound
// error, which would remove an existing device entry — confirm this is intended.
func (agent *Agent) start(ctx context.Context, deviceExist bool, deviceToCreate *voltha.Device) (*voltha.Device, error) {
	needToStart := false
	if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
		return agent.getDeviceReadOnly(ctx)
	}
	var startSucceeded bool
	defer func() {
		if !startSucceeded {
			if err := agent.stop(ctx); err != nil {
				logger.Errorw(ctx, "failed-to-cleanup-after-unsuccessful-start", log.Fields{"device-id": agent.deviceID, "error": err})
			}
		}
	}()
	if deviceExist {
		device := deviceToCreate
		if device == nil {
			// Load from dB
			device = &voltha.Device{}
			have, err := agent.dbProxy.Get(ctx, agent.deviceID, device)
			if err != nil {
				return nil, err
			} else if !have {
				return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
			}
		}
		agent.deviceType = device.Adapter
		agent.device = proto.Clone(device).(*voltha.Device)
		// load the ports from KV to cache
		agent.portLoader.Load(ctx)
		agent.transientStateLoader.Load(ctx)
		logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
	} else {
		// Create a new device
		var desc string
		prevState := common.AdminState_UNKNOWN
		currState := common.AdminState_UNKNOWN
		operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
		// Runs when start returns (not at the end of this branch), so it sees the final
		// currState/operStatus/desc values.
		defer agent.logDeviceUpdate(ctx, "createDevice", &prevState, &currState, operStatus, &desc)
		// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
		// is a new device, so populate them here before passing the device to ldProxy.Set.
		// agent.deviceId will also have been set during newAgent().
		device := (proto.Clone(deviceToCreate)).(*voltha.Device)
		device.Id = agent.deviceID
		device.AdminState = voltha.AdminState_PREPROVISIONED
		currState = device.AdminState
		if !deviceToCreate.GetRoot() && deviceToCreate.ProxyAddress != nil {
			// Set the default vlan ID to the one specified by the parent adapter. It can be
			// overwritten by the child adapter during a device update request
			device.Vlan = deviceToCreate.ProxyAddress.ChannelId
		}
		// Add the initial device to the local model
		if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
			desc = fmt.Sprintf("failed-adding-device-%s: %s", agent.deviceID, err.Error())
			return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
		}
		// The event's error result is discarded; device creation does not depend on it.
		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, device.OperStatus, device.ConnectStatus, prevState, device, time.Now().Unix())
		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
		agent.device = device
	}
	startSucceeded = true
	log.EnrichSpan(ctx, log.Fields{"device-id": agent.deviceID})
	logger.Debugw(ctx, "device-agent-started", log.Fields{"device-id": agent.deviceID})
	return agent.getDeviceReadOnly(ctx)
}
// stop shuts the device agent down. Only the first call does any work (guarded by stopOnce);
// every later call returns nil immediately, even if the first call failed part-way.
//
// While holding the device lock it deletes the device's transient state, removes the device
// entry from the KV store, closes exitChannel and marks the agent as stopped. If either removal
// fails, that error is returned and the remaining steps are skipped.
func (agent *Agent) stop(ctx context.Context) error {
	firstCall := false
	agent.stopOnce.Do(func() { firstCall = true })
	if !firstCall {
		return nil
	}

	if lockErr := agent.requestQueue.WaitForGreenLight(ctx); lockErr != nil {
		return lockErr
	}
	defer agent.requestQueue.RequestComplete()

	logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})

	// Clean up persisted state: the transient state first, then the device entry itself.
	if cleanupErr := agent.deleteTransientState(ctx); cleanupErr != nil {
		return cleanupErr
	}
	if cleanupErr := agent.dbProxy.Remove(ctx, agent.deviceID); cleanupErr != nil {
		return cleanupErr
	}

	close(agent.exitChannel)
	agent.stopped = true
	logger.Infow(ctx, "device-agent-stopped", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
	return nil
}
// reconcileWithKVStore refreshes the agent's in-memory device from the most recent copy in the KV
// store, updates deviceType from it, and reloads the port and transient-state caches.
// It is best-effort: if the lock cannot be taken, the KV read fails, or the device is not in the
// KV store, the outcome is logged and the current in-memory state is kept.
func (agent *Agent) reconcileWithKVStore(ctx context.Context) {
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "error": err})
		return
	}
	defer agent.requestQueue.RequestComplete()
	logger.Debugw(ctx, "reconciling-device-agent-devicetype", log.Fields{"device-id": agent.deviceID})
	// TODO: context timeout
	device := &voltha.Device{}
	have, err := agent.dbProxy.Get(ctx, agent.deviceID, device)
	if err != nil {
		logger.Errorw(ctx, "kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err})
		return
	}
	if !have {
		// Previously a silent return; record it so a missing entry is visible when debugging.
		logger.Warnw(ctx, "device-not-found-in-kv", log.Fields{"device-id": agent.deviceID})
		return
	}
	agent.deviceType = device.Adapter
	agent.device = device
	agent.portLoader.Load(ctx)
	agent.transientStateLoader.Load(ctx)
	logger.Debugw(ctx, "reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
}
// onSuccess is the shared callback for adapter requests whose successful reply needs no further
// processing. It only logs the outcome; the reply and reqArgs are ignored.
func (agent *Agent) onSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
	fields := log.Fields{"rpc": rpc, "device-id": agent.deviceID}
	logger.Debugw(ctx, "response-successful", fields)
	// TODO: Post success message onto kafka
}
// onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
// and the only action required is to publish the failed result on kafka
func (agent *Agent) onFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
if res, ok := response.(error); ok {
logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
} else {
logger.Errorw(ctx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
}
// TODO: Post failure message onto kafka
}
// waitForAdapterForceDeleteResponse waits for the adapter's reply to a force-delete request, or for
// ctx to end, and dispatches to onSuccess or onFailure. Unlike waitForAdapterResponse, it raises no
// RPC event. cancel is always invoked on return to release the request context.
func (agent *Agent) waitForAdapterForceDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
	defer cancel()
	// reqArgs is spread with "..." so the callbacks receive the original arguments, not a single
	// []interface{} element wrapping them.
	select {
	case rpcResponse, ok := <-ch:
		switch {
		case !ok:
			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs...)
		case rpcResponse.Err != nil:
			onFailure(ctx, rpc, rpcResponse.Err, reqArgs...)
		default:
			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs...)
		}
	case <-ctx.Done():
		onFailure(ctx, rpc, ctx.Err(), reqArgs...)
	}
}
// onDeleteSuccess is the callback for a successful adapter reply to a delete request. It moves the
// device's transient state to DELETING_POST_ADAPTER_RESPONSE.
// Failures are logged only, because this runs as a callback with no caller to return to.
func (agent *Agent) onDeleteSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
	logger.Debugw(ctx, "response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
		// The device lock was not acquired: the device must not be read or updated here, and the
		// *AndReleaseLock helper below must not release a lock this goroutine does not hold.
		return
	}
	previousDeviceTransientState := agent.getTransientState()
	newDevice := agent.cloneDeviceWithoutLock()
	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, newDevice,
		voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, previousDeviceTransientState); err != nil {
		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
	}
}
// onDeleteFailure is the callback for a failed adapter reply to a delete request. It logs the
// failure and sets the device's transient state to DELETE_FAILED; no other state transition is
// made.
func (agent *Agent) onDeleteFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
	if res, ok := response.(error); ok {
		logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
	} else {
		logger.Errorw(ctx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
	}
	//Only updating of transient state is required, no transition.
	if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
		// Include the cause; it was previously dropped from this log entry.
		logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID, "error": err})
	}
}
// waitForAdapterResponse waits for the adapter's reply to an asynchronous request, or for ctx to end,
// and dispatches to onSuccess or onFailure. Every failure path also raises an RPC_ERROR_RAISE_EVENT
// RPC event. cancel is always invoked on return.
//
// reqArgs is forwarded to the callbacks with "..." so each callback receives the original
// arguments. Passing the slice without spreading would deliver one []interface{} element instead,
// which broke callbacks that inspect args[0] (e.g. the packet bytes for packet-out failures).
func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
	defer cancel()
	var rpce *voltha.RPCEvent
	defer func() {
		if rpce != nil {
			agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
		}
	}()
	select {
	case rpcResponse, ok := <-ch:
		if !ok {
			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs...)
		} else if rpcResponse.Err != nil {
			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
			onFailure(ctx, rpc, rpcResponse.Err, reqArgs...)
		} else {
			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs...)
		}
	case <-ctx.Done():
		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
		onFailure(ctx, rpc, ctx.Err(), reqArgs...)
	}
}
// waitForAdapterResponseAndLogDeviceUpdate behaves like waitForAdapterResponse and, on return, also
// records the outcome via logDeviceUpdate. It logs prevState as the previous admin state and uses
// the device's admin state at that moment (falling back to prevState if the device cannot be read)
// as the current state. cancel is always invoked on return, after both deferred reports have run.
func (agent *Agent) waitForAdapterResponseAndLogDeviceUpdate(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, prevState *common.AdminState_Types, reqArgs ...interface{}) {
	defer cancel()
	var desc string
	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() {
		currAdminState := prevState
		if d, _ := agent.getDeviceReadOnly(ctx); d != nil {
			currAdminState = &d.AdminState
		}
		agent.logDeviceUpdate(ctx, rpc, prevState, currAdminState, operStatus, &desc)
	}()
	var rpce *voltha.RPCEvent
	defer func() {
		if rpce != nil {
			agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
		}
	}()
	// reqArgs is spread with "..." so callbacks receive the original arguments rather than a
	// single nested []interface{}.
	select {
	case rpcResponse, ok := <-ch:
		if !ok {
			// Record a description so the device-update log does not carry an empty reason.
			desc = "Response Channel Closed"
			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs...)
		} else if rpcResponse.Err != nil {
			desc = rpcResponse.Err.Error()
			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
			onFailure(ctx, rpc, rpcResponse.Err, reqArgs...)
		} else {
			operStatus.Code = common.OperationResp_OPERATION_SUCCESS
			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs...)
		}
	case <-ctx.Done():
		desc = ctx.Err().Error()
		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
		onFailure(ctx, rpc, ctx.Err(), reqArgs...)
	}
}
// getDeviceReadOnly takes the device lock just long enough to read the current device snapshot and
// returns it. The result MUST NOT be modified, but it is safe to keep indefinitely. The error is
// non-nil only if the lock could not be acquired.
func (agent *Agent) getDeviceReadOnly(ctx context.Context) (*voltha.Device, error) {
	err := agent.requestQueue.WaitForGreenLight(ctx)
	if err != nil {
		return nil, err
	}
	snapshot := agent.device
	agent.requestQueue.RequestComplete()
	return snapshot, nil
}
// getDeviceReadOnlyWithoutLock returns the current device snapshot without copying it, which makes
// it very cheap. The caller MUST already hold the device lock and MUST NOT modify the result; the
// snapshot itself is safe to keep indefinitely.
func (agent *Agent) getDeviceReadOnlyWithoutLock() *voltha.Device {
	current := agent.device
	return current
}
// cloneDeviceWithoutLock returns a deep copy of the current device that the caller may freely
// modify, e.g. before handing it to one of the update*AndReleaseLock helpers.
// The caller MUST already hold the device lock.
func (agent *Agent) cloneDeviceWithoutLock() *voltha.Device {
	duplicate := proto.Clone(agent.device)
	return duplicate.(*voltha.Device)
}
// enableDevice activates a pre-provisioned or disabled device.
//
// Under the device lock it resolves the adapter for the device type, marks the device ENABLED and
// ACTIVATING, and persists it. It then sends AdoptDevice (if the device was PREPROVISIONED) or
// ReEnableDevice to the adapter. The adapter reply is handled asynchronously by a background
// goroutine, so a nil return means only that the request was sent.
// It fails with FailedPrecondition if the device is already enabled, or if deletion or
// reconciliation is in progress or has failed.
func (agent *Agent) enableDevice(ctx context.Context) error {
	//To preserve and use oldDevice state as prev state in new device
	var desc string
	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer agent.logDeviceUpdate(ctx, "enableDevice", nil, nil, operStatus, &desc)
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		// Record the reason, as the sibling RPCs do, so the device-update log is not blank.
		desc = err.Error()
		return err
	}
	logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})
	prevDeviceState := agent.device.AdminState
	oldDevice := agent.getDeviceReadOnlyWithoutLock()
	if !agent.proceedWithRequest(oldDevice) {
		agent.requestQueue.RequestComplete()
		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconcile is in progress/failed.", agent.deviceID)
		return status.Error(codes.FailedPrecondition, desc)
	}
	if oldDevice.AdminState == voltha.AdminState_ENABLED {
		logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
		agent.requestQueue.RequestComplete()
		desc = fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
		return status.Error(codes.FailedPrecondition, desc)
	}
	// First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
	// pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
	// with the adapter then we need to know the adapter that will handle this request
	adapterName, err := agent.adapterMgr.GetAdapterType(oldDevice.Type)
	if err != nil {
		agent.requestQueue.RequestComplete()
		desc = err.Error()
		return err
	}
	newDevice := agent.cloneDeviceWithoutLock()
	newDevice.Adapter = adapterName
	// Update the Admin State and set the operational state to activating before sending the request to the Adapters
	newDevice.AdminState = voltha.AdminState_ENABLED
	newDevice.OperStatus = voltha.OperStatus_ACTIVATING
	if err := agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
		desc = err.Error()
		return err
	}
	// Adopt the device if it was in pre-provision state. In all other cases, try to re-enable it.
	// oldDevice is a read-only snapshot, so reading it after the lock is released is safe.
	var ch chan *kafka.RpcResponse
	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
	subCtx = coreutils.WithFromTopicMetadataFromContext(subCtx, ctx)
	if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
		ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
	} else {
		ch, err = agent.adapterProxy.ReEnableDevice(subCtx, newDevice)
	}
	if err != nil {
		cancel()
		desc = err.Error()
		return err
	}
	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
	// Wait for response
	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
	return nil
}
// waitForAdapterFlowResponse waits for the adapter to answer a flow/group request, or for ctx to
// end, and completes response accordingly. The outcome is always recorded via logDeviceUpdate;
// on failure an RPC_ERROR_RAISE_EVENT RPC event is also raised. cancel is always invoked on return.
func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse, response coreutils.Response) {
	defer cancel()

	var desc string
	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer agent.logDeviceUpdate(ctx, rpc, nil, nil, operStatus, &desc)

	var rpce *voltha.RPCEvent
	defer func() {
		if rpce == nil {
			return
		}
		agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
			voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
	}()

	// fail records the failure reason, prepares the RPC error event and completes response with cause.
	fail := func(reason string, cause error) {
		desc = reason
		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, reason, nil)
		response.Error(cause)
	}

	select {
	case reply, open := <-ch:
		switch {
		case !open:
			fail("Response Channel Closed", status.Errorf(codes.Aborted, "channel-closed"))
		case reply.Err != nil:
			fail(reply.Err.Error(), reply.Err)
		default:
			operStatus.Code = common.OperationResp_OPERATION_SUCCESS
			response.Done()
		}
	case <-ctx.Done():
		fail(ctx.Err().Error(), ctx.Err())
	}
}
//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
//adapters
func (agent *Agent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
var flwResponse, grpResponse coreutils.Response
var err error
//if new flow list is empty then the called function returns quickly
if flwResponse, err = agent.addFlowsToAdapter(ctx, newFlows, flowMetadata); err != nil {
return err
}
//if new group list is empty then the called function returns quickly
if grpResponse, err = agent.addGroupsToAdapter(ctx, newGroups, flowMetadata); err != nil {
return err
}
if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); errs != nil {
logger.Warnw(ctx, "no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
}
return nil
}
//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
//adapters
func (agent *Agent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
var flwResponse, grpResponse coreutils.Response
var err error
if flwResponse, err = agent.deleteFlowsFromAdapter(ctx, flowsToDel, flowMetadata); err != nil {
return err
}
if grpResponse, err = agent.deleteGroupsFromAdapter(ctx, groupsToDel, flowMetadata); err != nil {
return err
}
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
}
//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
//also sends the updates to the adapters
func (agent *Agent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
var flwResponse, grpResponse coreutils.Response
var err error
if flwResponse, err = agent.updateFlowsToAdapter(ctx, updatedFlows, flowMetadata); err != nil {
return err
}
if grpResponse, err = agent.updateGroupsToAdapter(ctx, updatedGroups, flowMetadata); err != nil {
return err
}
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
}
// disableDevice disables a device.
//
// Under the device lock it marks the device DISABLED with oper status UNKNOWN and persists it.
// It then sends DisableDevice to the adapter; the reply is handled by a background goroutine.
// Disabling an already-disabled device is a no-op that returns nil. It fails with
// FailedPrecondition for a PREPROVISIONED device, or while deletion or reconciliation is in
// progress or has failed.
func (agent *Agent) disableDevice(ctx context.Context) error {
	var desc string
	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer agent.logDeviceUpdate(ctx, "disableDevice", nil, nil, operStatus, &desc)
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		desc = err.Error()
		return err
	}
	logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
	cloned := agent.cloneDeviceWithoutLock()
	// Capture the previous admin state while holding the lock; reading agent.device before
	// WaitForGreenLight raced with concurrent updates.
	prevDeviceState := cloned.AdminState
	if !agent.proceedWithRequest(cloned) {
		agent.requestQueue.RequestComplete()
		desc = fmt.Sprintf("deviceId:%s,Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.", agent.deviceID)
		// desc is not a format string; use status.Error so '%' in IDs cannot corrupt the message.
		return status.Error(codes.FailedPrecondition, desc)
	}
	if cloned.AdminState == voltha.AdminState_DISABLED {
		desc = "device-already-disabled"
		logger.Debugw(ctx, "device-already-disabled", log.Fields{"device-id": agent.deviceID})
		agent.requestQueue.RequestComplete()
		return nil
	}
	if cloned.AdminState == voltha.AdminState_PREPROVISIONED {
		agent.requestQueue.RequestComplete()
		desc = fmt.Sprintf("deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
	}
	// Update the Admin State and operational state before sending the request out
	cloned.AdminState = voltha.AdminState_DISABLED
	cloned.OperStatus = voltha.OperStatus_UNKNOWN
	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
		desc = err.Error()
		return err
	}
	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
	ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
	if err != nil {
		cancel()
		desc = err.Error()
		return err
	}
	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
	// Wait for response
	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
	return nil
}
// rebootDevice asks the device's adapter to reboot the device. The request is sent while the device
// lock is held and the adapter's reply is handled by a background goroutine, so a nil return means
// only that the request was sent. It fails with FailedPrecondition while deletion or reconciliation
// is in progress or has failed.
func (agent *Agent) rebootDevice(ctx context.Context) error {
	var desc string
	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer agent.logDeviceUpdate(ctx, "rebootDevice", nil, nil, operStatus, &desc)
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		desc = err.Error()
		return err
	}
	defer agent.requestQueue.RequestComplete()
	logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID})
	device := agent.getDeviceReadOnlyWithoutLock()
	// Snapshot the admin state under the lock (it was previously read before WaitForGreenLight).
	prevDeviceState := device.AdminState
	if !agent.proceedWithRequest(device) {
		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.", agent.deviceID)
		// desc is not a format string; status.Error avoids misinterpreting '%' in IDs.
		return status.Error(codes.FailedPrecondition, desc)
	}
	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
	ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
	if err != nil {
		cancel()
		desc = err.Error()
		return err
	}
	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
	// Wait for response
	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
	return nil
}
// deleteDeviceForce force-deletes a device. It stops any in-progress reconcile and sets the
// transient state to FORCE_DELETING. Unless the device is PREPROVISIONED, it also sends
// DeleteDevice to the adapter without waiting for the result. The operation is recorded as
// successful once the request has been sent, or immediately for a PREPROVISIONED device.
// It fails with FailedPrecondition if a deletion is already in progress.
func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
	logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
	var desc string
	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	// Log the outcome on every exit path. The transient-state update failure and the
	// pre-provisioned path previously returned without recording anything.
	defer agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		desc = err.Error()
		return err
	}
	// Get the device Transient state, return err if it is DELETING
	previousDeviceTransientState := agent.getTransientState()
	if agent.isStateDeleting(previousDeviceTransientState) {
		agent.requestQueue.RequestComplete()
		desc = fmt.Sprintf("deviceId:%s, Device Deletion is in progress", agent.deviceID)
		return status.Error(codes.FailedPrecondition, desc)
	}
	//Send stop Reconcile if in progress
	agent.stopReconcile()
	device := agent.cloneDeviceWithoutLock()
	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
		voltha.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
		desc = err.Error()
		return err
	}
	if device.AdminState == ic.AdminState_PREPROVISIONED {
		// Adapters have no info about a pre-provisioned device, so nothing is sent to them.
		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
		return nil
	}
	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
	ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
	if err != nil {
		cancel()
		desc = err.Error()
		return err
	}
	// As force delete will not be dependent over the response of adapter, marking this operation as success
	operStatus.Code = common.OperationResp_OPERATION_SUCCESS
	// Since it is a case of force delete, nothing needs to be done on adapter responses.
	go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
		agent.onFailure)
	return nil
}
// deleteDevice starts deleting a device. For a PREPROVISIONED device, the adapters have no info
// about it, so the transient state goes straight to DELETING_POST_ADAPTER_RESPONSE and the call
// completes successfully. Otherwise the transient state is set to DELETING_FROM_ADAPTER and
// DeleteDevice is sent to the adapter; the reply is handled asynchronously by
// onDeleteSuccess/onDeleteFailure. It fails with FailedPrecondition while deletion or
// reconciliation is in progress or has failed.
func (agent *Agent) deleteDevice(ctx context.Context) error {
	logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
	var desc string
	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer agent.logDeviceUpdate(ctx, "deleteDevice", nil, nil, operStatus, &desc)
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		desc = err.Error()
		return err
	}
	device := agent.cloneDeviceWithoutLock()
	if !agent.proceedWithRequest(device) {
		agent.requestQueue.RequestComplete()
		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", agent.deviceID)
		return status.Error(codes.FailedPrecondition, desc)
	}
	// Snapshot the current transient and admin states under the lock. The admin state was
	// previously read before WaitForGreenLight, which raced with concurrent updates.
	previousDeviceTransientState := agent.getTransientState()
	previousAdminState := device.AdminState
	// Change the device transient state to DELETING_FROM_ADAPTER state till the device is removed from adapters.
	currentDeviceTransientState := voltha.DeviceTransientState_DELETING_FROM_ADAPTER
	if previousAdminState == ic.AdminState_PREPROVISIONED {
		// Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device.
		currentDeviceTransientState = voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
	}
	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
		currentDeviceTransientState, previousDeviceTransientState); err != nil {
		desc = err.Error()
		return err
	}
	// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
	// adapter. The delete has succeeded at this point, so record it as such instead of the default FAILURE.
	if previousAdminState == ic.AdminState_PREPROVISIONED {
		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
		return nil
	}
	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
	ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
	if err != nil {
		cancel()
		//updating of transient state is required in error
		if stateErr := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); stateErr != nil {
			logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID, "error": stateErr})
		}
		desc = err.Error()
		return err
	}
	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
		agent.onDeleteFailure, &previousAdminState)
	return nil
}
// setParentID sets the stored device's ParentId to parentID and persists the change.
// The device argument is only used for logging. Note that the agent's own parentID field is not
// modified here.
func (agent *Agent) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
	err := agent.requestQueue.WaitForGreenLight(ctx)
	if err != nil {
		return err
	}
	logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})

	updated := agent.cloneDeviceWithoutLock()
	updated.ParentId = parentID
	return agent.updateDeviceAndReleaseLock(ctx, updated)
}
// getSwitchCapability retrieves the switch capability of a parent device
func (agent *Agent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
return nil, err
}
ch, err := agent.adapterProxy.GetOfpDeviceInfo(ctx, device)
if err != nil {
return nil, err
}
// Wait for adapter response
rpcResponse, ok := <-ch
if !ok {
return nil, status.Errorf(codes.Aborted, "channel-closed")
}
if rpcResponse.Err != nil {
return nil, rpcResponse.Err
}
// Successful response
switchCap := &ic.SwitchCapability{}
if err := ptypes.UnmarshalAny(rpcResponse.Reply, switchCap); err != nil {
return nil, err
}
return switchCap, nil
}
// onPacketFailure is the failure callback for packet-out requests. It logs a warning containing the
// error (if response is one) and the hex-encoded packet. The packet is taken from args[0] when that
// argument is a []byte, and is empty otherwise.
func (agent *Agent) onPacketFailure(ctx context.Context, rpc string, response interface{}, args ...interface{}) {
	var payload []byte
	if len(args) > 0 {
		// A failed assertion leaves payload nil, i.e. an empty hex string in the log.
		payload, _ = args[0].([]byte)
	}
	cause, _ := response.(error)

	logger.Warnw(ctx, "packet-out-error", log.Fields{
		"device-id": agent.deviceID,
		"error":     cause,
		"packet":    hex.EncodeToString(payload),
	})
}
// packetOut sends packet out of outPort through the device's adapter. The send error, if any, is
// returned; the adapter's eventual reply is handled asynchronously, and failures are logged with
// the packet contents. If deviceType is still empty (this core has just taken ownership of the
// device), the agent first refreshes its state from the KV store.
func (agent *Agent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
	// If deviceType=="" then we must have taken ownership of this device.
	// Fixes VOL-2226 where a core would take ownership and have stale data
	if agent.deviceType == "" {
		agent.reconcileWithKVStore(ctx)
	}
	// Send packet to adapter
	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
	ch, err := agent.adapterProxy.PacketOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
	if err != nil {
		cancel()
		// Previously this returned nil, silently dropping the packet with no indication to the caller.
		return err
	}
	go agent.waitForAdapterResponse(subCtx, cancel, "packetOut", ch, agent.onSuccess, agent.onPacketFailure, packet.Data)
	return nil
}
// updateDeviceUsingAdapterData copies the adapter-reported attributes from
// device onto a clone of the agent's device and persists the clone. The copied
// attributes are Root, Vendor, Model, SerialNumber, MacAddress, Vlan, Reason
// and ImageDownloads. The request queue lock is acquired here and released by
// updateDeviceAndReleaseLock.
func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return err
	}
	logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id})

	updated := agent.cloneDeviceWithoutLock()
	updated.Root, updated.Vendor, updated.Model = device.Root, device.Vendor, device.Model
	updated.SerialNumber, updated.MacAddress = device.SerialNumber, device.MacAddress
	updated.Vlan, updated.Reason = device.Vlan, device.Reason
	updated.ImageDownloads = device.ImageDownloads
	return agent.updateDeviceAndReleaseLock(ctx, updated)
}
// updateDeviceStatus sets the device's operational and connection status and
// persists the change.
//
// Each value is applied only if it appears in the generated *_name lookup map
// for its enum. Values not found there leave the current status untouched.
// The request queue lock is acquired here and released by
// updateDeviceAndReleaseLock.
func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return err
	}
	cloned := agent.cloneDeviceWithoutLock()
	// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
	if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
		logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
		cloned.ConnectStatus = connStatus
	}
	if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
		// Distinct event name so oper-status updates are not logged as
		// connect-status ones.
		logger.Debugw(ctx, "update-device-status-oper", log.Fields{"ok": ok, "val": s})
		cloned.OperStatus = operStatus
	}
	logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": cloned.Id, "oper-status": cloned.OperStatus, "connect-status": cloned.ConnectStatus})
	// Store the device
	return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
// updateDeviceAttribute sets the exported device field called name to value
// on a clone of the device, then persists the clone.
//
// A nil value is ignored without taking the lock. Only string, uint32 and
// bool fields are supported. The field is left unchanged (logged as
// "updated": false) in any of these cases:
//   - the field does not exist or cannot be set;
//   - the field has an unsupported kind;
//   - value's dynamic type is not exactly string/uint32/bool to match the
//     field kind.
//
// The clone is written back in every case, which releases the request queue
// lock through updateDeviceAndReleaseLock.
//
// TODO: A generic device update by attribute
func (agent *Agent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
	if value == nil {
		return
	}
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
		return
	}
	cloned := agent.cloneDeviceWithoutLock()
	updated := false
	s := reflect.ValueOf(cloned).Elem()
	if s.Kind() == reflect.Struct {
		// exported field
		f := s.FieldByName(name)
		if f.IsValid() && f.CanSet() {
			// Checked assertions: a mismatched value type must not panic here,
			// since the request queue lock is held and would never be released.
			switch f.Kind() {
			case reflect.String:
				if v, ok := value.(string); ok {
					f.SetString(v)
					updated = true
				}
			case reflect.Uint32:
				if v, ok := value.(uint32); ok {
					f.SetUint(uint64(v))
					updated = true
				}
			case reflect.Bool:
				if v, ok := value.(bool); ok {
					f.SetBool(v)
					updated = true
				}
			}
		}
	}
	logger.Debugw(ctx, "update-field-status", log.Fields{"device-id": cloned.Id, "name": name, "updated": updated})
	// Save the data; this also releases the request queue lock.
	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
		logger.Warnw(ctx, "attribute-update-failed", log.Fields{"device-id": agent.deviceID, "attribute": name, "value": value, "error": err})
	}
}
// simulateAlarm forwards an alarm-simulation request for this device to its
// adapter. It returns once the request has been sent; the adapter's answer is
// handled in a background goroutine by onSuccess/onFailure. The request queue
// lock is held until this function returns.
func (agent *Agent) simulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) error {
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return err
	}
	defer agent.requestQueue.RequestComplete()

	logger.Debugw(ctx, "simulate-alarm", log.Fields{"device-id": agent.deviceID})
	dev := agent.getDeviceReadOnlyWithoutLock()

	rpcCtx, cancelRPC := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
	respCh, err := agent.adapterProxy.SimulateAlarm(rpcCtx, dev, simulateReq)
	if err == nil {
		go agent.waitForAdapterResponse(rpcCtx, cancelRPC, "simulateAlarm", respCh, agent.onSuccess, agent.onFailure)
		return nil
	}
	cancelRPC()
	return err
}
// updateDeviceAndReleaseLock persists device to the KV store and makes it the
// agent's current device. It then releases the request queue lock and runs
// any state transition triggered by the change.
// The calling function MUST hold the device lock. The caller MUST NOT modify the device after this is called.
//
// The lock is released on every return path. An error is returned only when
// the agent is stopped or the store write fails; in both cases agent.device is
// not replaced. A failed state transition is logged and reported as an
// RPC_ERROR_RAISE_EVENT, but the function still returns nil.
func (agent *Agent) updateDeviceAndReleaseLock(ctx context.Context, device *voltha.Device) error {
	// fail early if this agent is no longer valid
	if agent.stopped {
		agent.requestQueue.RequestComplete()
		return errors.New("device-agent-stopped")
	}
	// update in db
	if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
		agent.requestQueue.RequestComplete()
		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
	}
	logger.Debugw(ctx, "updated-device-in-store", log.Fields{"device-id: ": agent.deviceID})
	prevDevice := agent.device
	// update the device
	agent.device = device
	// If any of the oper/connect/admin states has changed, send the change event.
	// The result of SendDeviceStateChangeEvent is deliberately discarded.
	if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
	}
	// This path does not change the transient state. The current value is read
	// while still holding the lock and passed below as both new and previous
	// transient state.
	deviceTransientState := agent.getTransientState()
	// release lock before processing transition
	agent.requestQueue.RequestComplete()
	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
	if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
		device, prevDevice, deviceTransientState, deviceTransientState); err != nil {
		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
		// Sending RPC EVENT here
		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
		agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
			nil, time.Now().Unix())
	}
	return nil
}
// updateDeviceWithTransientStateAndReleaseLock sets the device's transient
// state to transientState and persists device to the KV store. It then makes
// device the agent's current device, releases the request queue lock, and
// runs any state transition with prevTransientState as the previous transient
// state.
// The calling function MUST hold the device lock. The caller MUST NOT modify the device after this is called.
//
// The lock is released on every return path. If the store write fails, the
// transient state is reverted to prevTransientState and the store error is
// returned wrapped in an Internal status. A failed state transition is logged
// and raised as an RPC_ERROR_RAISE_EVENT, but the function still returns nil.
func (agent *Agent) updateDeviceWithTransientStateAndReleaseLock(ctx context.Context, device *voltha.Device,
	transientState, prevTransientState voltha.DeviceTransientState_Types) error {
	// fail early if this agent is no longer valid
	if agent.stopped {
		agent.requestQueue.RequestComplete()
		return errors.New("device-agent-stopped")
	}
	//update device TransientState
	if err := agent.updateTransientState(ctx, transientState); err != nil {
		agent.requestQueue.RequestComplete()
		return err
	}
	// update in db
	if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
		// Roll back the transient state change made above. The revert result
		// gets its own variable so it cannot mask the store error. The revert
		// failure is logged only when it actually happens.
		if revertErr := agent.updateTransientState(ctx, prevTransientState); revertErr != nil {
			logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
				"previous-transient-state": prevTransientState, "current-transient-state": transientState, "error": revertErr})
		}
		agent.requestQueue.RequestComplete()
		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
	}
	logger.Debugw(ctx, "updated-device-in-store", log.Fields{"device-id: ": agent.deviceID})
	prevDevice := agent.device
	// update the device
	agent.device = device
	// If any of the oper/connect/admin states has changed, send the change event.
	// The result of SendDeviceStateChangeEvent is deliberately discarded.
	if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
	}
	// release lock before processing transition
	agent.requestQueue.RequestComplete()
	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
	if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
		device, prevDevice, transientState, prevTransientState); err != nil {
		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
		// Sending RPC EVENT here
		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
		agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
			nil, time.Now().Unix())
	}
	return nil
}
// updateDeviceReason stores reason as the device's current reason and
// persists the device. The request queue lock is acquired here and released
// by updateDeviceAndReleaseLock.
//
// After the WaitForGreenLight succeeds, the outcome is logged through
// logDeviceUpdate under "updateDeviceReason". On success the description is
// the reason; on failure it is the error text.
func (agent *Agent) updateDeviceReason(ctx context.Context, reason string) error {
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return err
	}
	logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})

	var desc string
	result := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	// Pointers are passed, so the code/description set below are what the
	// deferred call sees.
	defer agent.logDeviceUpdate(ctx, "updateDeviceReason", nil, nil, result, &desc)

	updated := agent.cloneDeviceWithoutLock()
	updated.Reason = reason
	err := agent.updateDeviceAndReleaseLock(ctx, updated)
	if err == nil {
		result.Code = common.OperationResp_OPERATION_SUCCESS
		desc = reason
		return nil
	}
	desc = err.Error()
	return err
}
// ChildDeviceLost handles the loss of the child device `device` of this
// (parent) device.
//
// It first removes every peer entry referring to the child from this device's
// ports. It then notifies the parent's adapter; the adapter's answer is
// handled in a background goroutine by onSuccess/onFailure. An error is
// returned if a port cannot be updated or the request cannot be sent to the
// adapter.
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
	logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
	// Remove the associated peer ports on the parent device
	for portID := range agent.portLoader.ListIDs() {
		portHandle, have := agent.portLoader.Lock(portID)
		if !have {
			continue
		}
		oldPort := portHandle.GetReadOnly()
		updatedPeers := make([]*voltha.Port_PeerPort, 0, len(oldPort.Peers))
		for _, peerPort := range oldPort.Peers {
			if peerPort.DeviceId != device.Id {
				updatedPeers = append(updatedPeers, peerPort)
			}
		}
		newPort := *oldPort
		newPort.Peers = updatedPeers
		err := portHandle.Update(ctx, &newPort)
		portHandle.Unlock()
		if err != nil {
			// Report the failure instead of claiming success with stale peers.
			return err
		}
	}
	//send request to adapter
	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
	// Use the timeout-bounded subCtx for the adapter call, matching the context
	// handed to waitForAdapterResponse (as packetOut and simulateAlarm do).
	ch, err := agent.adapterProxy.ChildDeviceLost(subCtx, agent.deviceType, device)
	if err != nil {
		cancel()
		return err
	}
	go agent.waitForAdapterResponse(subCtx, cancel, "childDeviceLost", ch, agent.onSuccess, agent.onFailure)
	return nil
}
// startOmciTest forwards an OMCI test request for this device to its adapter
// and waits synchronously for the TestResponse.
//
// If the device has no adapter recorded, the adapter type registered for the
// device type is filled in on the copy that is sent (the stored device is not
// modified). The request queue lock is released as soon as the request has
// been sent, before waiting for the reply.
func (agent *Agent) startOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return nil, err
	}
	target := agent.cloneDeviceWithoutLock()
	if target.Adapter == "" {
		name, lookupErr := agent.adapterMgr.GetAdapterType(target.Type)
		if lookupErr != nil {
			agent.requestQueue.RequestComplete()
			return nil, lookupErr
		}
		target.Adapter = name
	}
	respCh, sendErr := agent.adapterProxy.StartOmciTest(ctx, target, omcitestrequest)
	// Release the lock before blocking on the adapter's reply.
	agent.requestQueue.RequestComplete()
	if sendErr != nil {
		return nil, sendErr
	}
	reply, open := <-respCh
	switch {
	case !open:
		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
	case reply.Err != nil:
		return nil, reply.Err
	}
	result := new(voltha.TestResponse)
	if err := ptypes.UnmarshalAny(reply.Reply, result); err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
	}
	logger.Debugw(ctx, "omci_test_request-success-device-agent", log.Fields{"test-resp": result})
	return result, nil
}
// getExtValue asks the adapter for the extended value valueparam.Value and
// waits synchronously for the ReturnValues reply. pdevice and cdevice are
// presumably the parent and child devices (TODO confirm with callers). The
// request queue lock is released as soon as the request has been sent.
func (agent *Agent) getExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, valueparam *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
	logger.Debugw(ctx, "get-ext-value", log.Fields{"device-id": agent.deviceID, "onu-id": valueparam.Id, "value-type": valueparam.Value})
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return nil, err
	}
	respCh, sendErr := agent.adapterProxy.GetExtValue(ctx, pdevice, cdevice, valueparam.Id, valueparam.Value)
	agent.requestQueue.RequestComplete()
	if sendErr != nil {
		return nil, sendErr
	}
	reply, open := <-respCh
	switch {
	case !open:
		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
	case reply.Err != nil:
		return nil, reply.Err
	}
	resp := new(voltha.ReturnValues)
	if err := ptypes.UnmarshalAny(reply.Reply, resp); err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
	}
	logger.Debugw(ctx, "get-ext-value-success-device-agent", log.Fields{"Resp": resp})
	return resp, nil
}
// setExtValue sends value to the adapter for device and waits synchronously
// for the adapter to acknowledge it. The reply payload is not inspected; only
// a closed channel or an adapter-reported error is treated as failure. The
// request queue lock is released as soon as the request has been sent.
func (agent *Agent) setExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (*empty.Empty, error) {
	logger.Debugw(ctx, "set-ext-value", log.Fields{"device-id": value.Id})
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return nil, err
	}
	respCh, sendErr := agent.adapterProxy.SetExtValue(ctx, device, value)
	agent.requestQueue.RequestComplete()
	if sendErr != nil {
		return nil, sendErr
	}
	reply, open := <-respCh
	if !open {
		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
	}
	if reply.Err != nil {
		return nil, reply.Err
	}
	logger.Debug(ctx, "set-ext-value-success-device-agent")
	return &empty.Empty{}, nil
}
// getSingleValue sends an extension single-get request to the adapter named by
// the device's Adapter field. It waits synchronously for the
// SingleGetValueResponse. The request queue lock is released as soon as the
// request has been sent.
func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
	logger.Debugw(ctx, "get-single-value", log.Fields{"device-id": request.TargetId})
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return nil, err
	}
	adapterName := agent.cloneDeviceWithoutLock().Adapter
	respCh, sendErr := agent.adapterProxy.GetSingleValue(ctx, adapterName, request)
	agent.requestQueue.RequestComplete()
	if sendErr != nil {
		return nil, sendErr
	}
	reply, open := <-respCh
	switch {
	case !open:
		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
	case reply.Err != nil:
		return nil, reply.Err
	}
	out := new(extension.SingleGetValueResponse)
	if err := ptypes.UnmarshalAny(reply.Reply, out); err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
	}
	return out, nil
}
// setSingleValue sends an extension single-set request to the adapter named by
// the device's Adapter field. It waits synchronously for the
// SingleSetValueResponse. The request queue lock is released as soon as the
// request has been sent.
//
// It returns an Aborted status if the response channel is closed, the
// adapter's error if it reported one, or InvalidArgument if the reply cannot
// be decoded.
func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
	logger.Debugw(ctx, "set-single-value", log.Fields{"device-id": request.TargetId})
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return nil, err
	}
	cloned := agent.cloneDeviceWithoutLock()
	//send request to adapter
	ch, err := agent.adapterProxy.SetSingleValue(ctx, cloned.Adapter, request)
	agent.requestQueue.RequestComplete()
	if err != nil {
		return nil, err
	}
	// Wait for the adapter response
	rpcResponse, ok := <-ch
	if !ok {
		// Same message format as the sibling request methods.
		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
	}
	if rpcResponse.Err != nil {
		return nil, rpcResponse.Err
	}
	resp := &extension.SingleSetValueResponse{}
	if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
	}
	return resp, nil
}
// proceedWithRequest reports whether a new request may be processed for
// device. It returns false while a deletion is in progress, or while
// isInReconcileState reports the device as being in a reconcile state.
func (agent *Agent) proceedWithRequest(device *voltha.Device) bool {
	if agent.isDeletionInProgress() {
		return false
	}
	return !agent.isInReconcileState(device)
}
// stopReconcile signals a running ReconcileDevice retry loop to stop by
// sending on agent.stopReconciling, if that channel is set. The mutex is held
// for the duration of the send.
//
// NOTE(review): ReconcileDevice creates the channel unbuffered
// (make(chan int)), so this send blocks until waitForReconcileResponse
// receives it. If no receiver ever arrives, this call (and any other
// stopReconcilingMutex user) blocks.
func (agent *Agent) stopReconcile() {
	agent.stopReconcilingMutex.Lock()
	defer agent.stopReconcilingMutex.Unlock()
	if stopCh := agent.stopReconciling; stopCh != nil {
		stopCh <- 0
	}
}
// ReconcileDevice asks the device's adapter to reconcile the device, retrying
// with exponential backoff.
//
// The request is refused, with the refusal logged, if deletion or reconciling
// is already in progress. Otherwise the device's transient state is set to
// RECONCILE_IN_PROGRESS and the retry loop starts. The loop ends when:
//   - the device's oper status is RECONCILING_FAILED;
//   - the request queue lock cannot be re-acquired;
//   - waitForReconcileResponse returns nil (adapter success, a stopReconcile
//     signal, or backoff timer expiry).
//
// Outcomes are recorded via logDeviceUpdate under "Reconciling". The request
// queue lock is held while reading the device and sending to the adapter. It
// is released while waiting for the reply or the backoff timer.
//
// NOTE(review): on the RECONCILING_FAILED and lock-failure exits,
// agent.stopReconciling is left non-nil (it is only cleared in
// waitForReconcileResponse). A later stopReconcile() would then block on the
// unbuffered send; confirm whether that can happen.
func (agent *Agent) ReconcileDevice(ctx context.Context, device *voltha.Device) {
	var desc string
	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		desc = err.Error()
		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
		return
	}
	if !agent.proceedWithRequest(device) {
		agent.requestQueue.RequestComplete()
		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
		// desc is dynamic text, never a format string.
		logger.Errorf(ctx, "%s", desc)
		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
		return
	}
	//set transient state to RECONCILE IN PROGRESS
	err := agent.updateTransientState(ctx, voltha.DeviceTransientState_RECONCILE_IN_PROGRESS)
	if err != nil {
		agent.requestQueue.RequestComplete()
		desc = fmt.Sprintf("Not able to set device transient state to Reconcile in progress. "+
			"Err: %s", err.Error())
		logger.Errorf(ctx, "%s", desc)
		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
		return
	}

	reconcilingBackoff := backoff.NewExponentialBackOff()
	reconcilingBackoff.InitialInterval = agent.config.BackoffRetryInitialInterval
	reconcilingBackoff.MaxElapsedTime = agent.config.BackoffRetryMaxElapsedTime
	reconcilingBackoff.MaxInterval = agent.config.BackoffRetryMaxInterval

	//making here to keep lifecycle of this channel within the scope of retryReconcile
	agent.stopReconcilingMutex.Lock()
	agent.stopReconciling = make(chan int)
	agent.stopReconcilingMutex.Unlock()

	// defined outside the retry loop so it can be cleaned
	// up when the loop breaks
	var backoffTimer *time.Timer

retry:
	for {
		// If the operations state of the device is RECONCILING_FAILED then we do not
		// want to continue to attempt reconciliation.
		deviceRef := agent.getDeviceReadOnlyWithoutLock()
		if deviceRef.OperStatus == common.OperStatus_RECONCILING_FAILED {
			logger.Warnw(ctx, "reconciling-failed-halting-retries",
				log.Fields{"device-id": device.Id})
			agent.requestQueue.RequestComplete()
			break retry
		}

		// Use an exponential back off to prevent getting into a tight loop
		duration := reconcilingBackoff.NextBackOff()
		//This case should never occur in default case as max elapsed time for backoff is 0(by default) , so it will never return stop
		if duration == backoff.Stop {
			// If we reach a maximum then warn and reset the backoff
			// timer and keep attempting.
			logger.Warnw(ctx, "maximum-reconciling-backoff-reached--resetting-backoff-timer",
				log.Fields{"max-reconciling-backoff": reconcilingBackoff.MaxElapsedTime,
					"device-id": device.Id})
			reconcilingBackoff.Reset()
			duration = reconcilingBackoff.NextBackOff()
		}

		backoffTimer = time.NewTimer(duration)

		logger.Debugw(ctx, "retrying-reconciling", log.Fields{"deviceID": device.Id})
		// Send a reconcile request to the adapter.
		ch, err := agent.adapterProxy.ReconcileDevice(ctx, agent.device)
		//release lock before moving further
		agent.requestQueue.RequestComplete()
		if err != nil {
			desc := fmt.Sprintf("Failed reconciling from adapter side. Err: %s", err.Error())
			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
			<-backoffTimer.C
			// backoffTimer expired continue
			// Take lock back before retrying
			if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
				desc = err.Error()
				agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
				break retry
			}
			continue
		}
		// if return err retry if not then break loop and quit retrying reconcile
		if err = agent.waitForReconcileResponse(backoffTimer, ch); err != nil {
			desc = err.Error()
			logger.Errorf(ctx, "%s", desc)
			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
			// Error returns come from the response channel, so the timer has
			// not been consumed yet; wait out the backoff.
			<-backoffTimer.C
		} else {
			operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_IN_PROGRESS}
			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
			break retry
		}

		// Take lock back before retrying
		if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
			desc = err.Error()
			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
			break retry
		}
	}

	// Retry loop is broken, so stop any timers and drain the channel
	if backoffTimer != nil && !backoffTimer.Stop() {
		// As per documentation and stack overflow when a timer is stopped its
		// channel should be drained. The issue is that Stop returns false
		// either if the timer has already been fired "OR" if the timer can be
		// stopped before being fired. This means that in some cases the
		// channel has already be emptied so attempting to read from it means
		// a blocked thread. To get around this use a select so if the
		// channel is already empty the default case hits and we are not
		// blocked.
		select {
		case <-backoffTimer.C:
		default:
		}
	}
}
// waitForReconcileResponse blocks until the first of three events and reports
// the outcome to the ReconcileDevice retry loop:
//   - the adapter response arrives on ch: an error is returned if ch was
//     closed or the response carries an error; on success
//     agent.stopReconciling is cleared and nil is returned;
//   - a stop signal arrives on agent.stopReconciling: the field is cleared
//     and nil is returned (an error if that channel was closed);
//   - backoffTimer fires: its value is consumed here and nil is returned.
//
// NOTE(review): the caller stops retrying on a nil return, so a timer expiry
// before the adapter answers is treated like success, despite the "continue"
// comment on that case. Confirm this is intended.
//
// agent.stopReconciling is read without stopReconcilingMutex when the select
// is entered. Simply locking around the select would deadlock against
// stopReconcile, which holds the mutex while sending.
func (agent *Agent) waitForReconcileResponse(backoffTimer *time.Timer, ch chan *kafka.RpcResponse) error {
	select {
	// wait for response
	case resp, ok := <-ch:
		if !ok {
			//channel-closed
			return errors.New("channel on which reconcile response is awaited is closed")
		} else if resp.Err != nil {
			//error encountered
			return fmt.Errorf("error encountered while retrying reconcile. Err: %s", resp.Err.Error())
		}

		//In case of success quit retrying and wait for adapter to reset operation state of device
		agent.stopReconcilingMutex.Lock()
		agent.stopReconciling = nil
		agent.stopReconcilingMutex.Unlock()
		return nil

	//if reconciling need to be stopped
	case _, ok := <-agent.stopReconciling:
		agent.stopReconcilingMutex.Lock()
		agent.stopReconciling = nil
		agent.stopReconcilingMutex.Unlock()
		if !ok {
			//channel-closed
			return errors.New("channel used to notify to stop reconcile is closed")
		}
		return nil
	//continue if timer expired
	case <-backoffTimer.C:
	}
	return nil
}
// reconcilingCleanup resets the device's transient state to NONE, presumably
// clearing the RECONCILE_IN_PROGRESS state set by ReconcileDevice. The result
// is recorded via logDeviceUpdate under "Reconciling". The request queue lock
// is held for the whole call.
//
// It returns the WaitForGreenLight error or the updateTransientState error, if
// any; otherwise nil.
func (agent *Agent) reconcilingCleanup(ctx context.Context) error {
	var desc string
	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		desc = err.Error()
		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
		return err
	}
	defer agent.requestQueue.RequestComplete()
	err := agent.updateTransientState(ctx, voltha.DeviceTransientState_NONE)
	if err != nil {
		desc = fmt.Sprintf("Not able to clear device transient state from Reconcile in progress. "+
			"Err: %s", err.Error())
		// desc embeds arbitrary error text, so it must not be used as a format string.
		logger.Errorf(ctx, "%s", desc)
		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
		return err
	}
	operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
	agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
	return nil
}