/*
 * Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package device

import (
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"reflect"
	"sync"
	"time"

	"github.com/opencord/voltha-protos/v5/go/adapter_service"
	"github.com/opencord/voltha-protos/v5/go/core"
	"github.com/opencord/voltha-protos/v5/go/omci"

	"github.com/cenkalti/backoff/v3"
	"github.com/gogo/protobuf/proto"
	"github.com/golang/protobuf/ptypes/empty"
	"github.com/opencord/voltha-go/rw_core/config"
	"github.com/opencord/voltha-go/rw_core/utils"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/opencord/voltha-go/db/model"
	"github.com/opencord/voltha-go/rw_core/core/adapter"
	"github.com/opencord/voltha-go/rw_core/core/device/flow"
	"github.com/opencord/voltha-go/rw_core/core/device/group"
	"github.com/opencord/voltha-go/rw_core/core/device/port"
	"github.com/opencord/voltha-go/rw_core/core/device/transientstate"
	coreutils "github.com/opencord/voltha-go/rw_core/utils"
	"github.com/opencord/voltha-lib-go/v7/pkg/log"
	"github.com/opencord/voltha-protos/v5/go/common"
	ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
	"github.com/opencord/voltha-protos/v5/go/extension"
	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
	"github.com/opencord/voltha-protos/v5/go/voltha"
)

// Sentinel errors used by the device agent.
var (
	errReconcileAborted = errors.New("reconcile aborted")
	errContextExpired   = errors.New("context expired")
	errNoConnection     = errors.New("no connection")
)

// Agent represents device agent attributes.
//
// deviceID, parentID, deviceType, adapterEndpoint and isRootDevice are seeded in newAgent from the
// device it is given (deviceID is generated when that device has no Id); device holds the agent's
// current copy of the device. requestQueue is the per-agent request lock: methods acquire it with
// WaitForGreenLight and release it with RequestComplete. startOnce and stopOnce make start() and
// stop() run their body at most once; stop() closes exitChannel and sets stopped when it completes.
type Agent struct {
	deviceID             string
	parentID             string
	deviceType           string
	adapterEndpoint      string
	isRootDevice         bool
	adapterMgr           *adapter.Manager
	deviceMgr            *Manager
	dbProxy              *model.Proxy
	exitChannel          chan int
	device               *voltha.Device
	requestQueue         *coreutils.RequestQueue
	internalTimeout      time.Duration
	rpcTimeout           time.Duration
	flowTimeout          time.Duration
	startOnce            sync.Once
	stopOnce             sync.Once
	stopped              bool
	stopReconciling      chan int
	stopReconcilingMutex sync.RWMutex
	config               *config.RWCoreFlags

	// Caches and loaders, all created in newAgent.
	flowCache            *flow.Cache
	groupCache           *group.Cache
	portLoader           *port.Loader
	transientStateLoader *transientstate.Loader
}

// newAgent builds a device agent for the given device without touching the dB; the agent only
// becomes usable once start() has been called. When the device carries no Id a new one is generated.
// The agent keeps its own clone of the device, so the caller's copy is not shared.
func newAgent(device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, internalTimeout, rpcTimeout, flowTimeout time.Duration) *Agent {
	id := device.Id
	if id == "" {
		id = coreutils.CreateDeviceID()
	}

	a := &Agent{
		deviceID:        id,
		isRootDevice:    device.Root,
		parentID:        device.ParentId,
		deviceType:      device.Type,
		adapterEndpoint: device.AdapterEndpoint,
		deviceMgr:       deviceMgr,
		adapterMgr:      deviceMgr.adapterMgr,
		dbProxy:         deviceProxy,
		internalTimeout: internalTimeout,
		rpcTimeout:      rpcTimeout,
		flowTimeout:     flowTimeout,
		config:          deviceMgr.config,
	}
	a.exitChannel = make(chan int, 1)
	a.device = proto.Clone(device).(*voltha.Device)
	a.requestQueue = coreutils.NewRequestQueue()
	a.flowCache = flow.NewCache()
	a.groupCache = group.NewCache()
	a.portLoader = port.NewLoader(dbPath.SubPath("ports").Proxy(id))
	a.transientStateLoader = transientstate.NewLoader(dbPath.SubPath("core").Proxy("transientstate"), id)
	return a
}

// start initializes the agent. Only the first call does any work (guarded by startOnce); later calls
// simply return the agent's current device.
//
// When deviceExist is true the device is taken from deviceToCreate or, if that is nil, loaded from the
// dB using the agent's device ID; the ports and the transient state are then loaded. When deviceExist
// is false a new device is built from deviceToCreate (expected to be non-nil), forced into the
// PREPROVISIONED admin state, saved to the dB, and a device state change event is sent.
//
// If the start does not complete successfully the agent is stopped before returning. On success the
// device that was started is returned.
func (agent *Agent) start(ctx context.Context, deviceExist bool, deviceToCreate *voltha.Device) (*voltha.Device, error) {
	// needToStart is only flipped by the very first caller to reach startOnce.
	needToStart := false
	if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
		return agent.getDeviceReadOnly(ctx)
	}
	// Any return before startSucceeded is set triggers a cleanup through stop().
	var startSucceeded bool
	defer func() {
		if !startSucceeded {
			if err := agent.stop(ctx); err != nil {
				logger.Errorw(ctx, "failed-to-cleanup-after-unsuccessful-start", log.Fields{"device-id": agent.deviceID, "error": err})
			}
		}
	}()
	if deviceExist {
		device := deviceToCreate
		if device == nil {
			// Load from dB
			device = &voltha.Device{}
			have, err := agent.dbProxy.Get(ctx, agent.deviceID, device)
			if err != nil {
				return nil, err
			} else if !have {
				return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
			}
			logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID, "adapter-endpoint": device.AdapterEndpoint, "type": device.Type})
		}
		agent.deviceType = device.Type
		agent.adapterEndpoint = device.AdapterEndpoint
		// Keep a private copy so later changes to the caller's device do not affect the agent.
		agent.device = proto.Clone(device).(*voltha.Device)
		// load the ports from KV to cache
		agent.portLoader.Load(ctx)
		agent.transientStateLoader.Load(ctx)
	} else {
		// Create a new device
		var desc string
		var err error
		prevState := common.AdminState_UNKNOWN
		currState := common.AdminState_UNKNOWN
		requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}

		// The device update is logged when start() returns, with whatever err/requestStatus hold then.
		defer func() { agent.logDeviceUpdate(ctx, &prevState, &currState, requestStatus, err, desc) }()

		// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
		// is a new device, so populate them here before passing the device to dbProxy.Set.
		// agent.deviceID will also have been set during newAgent().
		device := (proto.Clone(deviceToCreate)).(*voltha.Device)
		device.Id = agent.deviceID
		device.AdminState = voltha.AdminState_PREPROVISIONED
		currState = device.AdminState
		if !deviceToCreate.GetRoot() && deviceToCreate.ProxyAddress != nil {
			// Set the default vlan ID to the one specified by the parent adapter.  It can be
			// overwritten by the child adapter during a device update request
			device.Vlan = deviceToCreate.ProxyAddress.ChannelId
		}

		// Save the device to the model
		if err = agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
			err = status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
			return nil, err
		}
		// The result of publishing the state change event is deliberately ignored here.
		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, device.OperStatus, device.ConnectStatus, prevState, device, time.Now().Unix())
		requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
		agent.device = device
	}
	startSucceeded = true
	log.EnrichSpan(ctx, log.Fields{"device-id": agent.deviceID})
	logger.Debugw(ctx, "device-agent-started", log.Fields{"device-id": agent.deviceID})

	return agent.getDeviceReadOnly(ctx)
}

// stop tears down the device agent. Only the first call does any work; later calls return nil.
// Under the request lock it deletes the device's transient state, removes the device from the KV
// store, publishes a device-deleted event, closes exitChannel and marks the agent as stopped.
// The first failing step aborts the teardown and its error is returned.
func (agent *Agent) stop(ctx context.Context) error {
	firstCall := false
	agent.stopOnce.Do(func() { firstCall = true })
	if !firstCall {
		return nil
	}

	if lockErr := agent.requestQueue.WaitForGreenLight(ctx); lockErr != nil {
		return lockErr
	}
	defer agent.requestQueue.RequestComplete()

	logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})

	// Drop the transient state first, then the device itself from the KV store.
	if delErr := agent.deleteTransientState(ctx); delErr != nil {
		return delErr
	}
	if rmErr := agent.dbProxy.Remove(ctx, agent.deviceID); rmErr != nil {
		return rmErr
	}

	// Publish the deletion on the message bus; the result is deliberately ignored.
	deletedAt := time.Now().Unix()
	_ = agent.deviceMgr.Agent.SendDeviceDeletedEvent(ctx, agent.device, deletedAt)

	close(agent.exitChannel)
	agent.stopped = true

	logger.Infow(ctx, "device-agent-stopped", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
	return nil
}

// reconcileWithKVStore refreshes the agent from the KV store: under the request lock it reads the
// device by ID and, when found, replaces the agent's device, device type and adapter endpoint and
// reloads the ports and the transient state. Failures are logged and leave the agent unchanged.
func (agent *Agent) reconcileWithKVStore(ctx context.Context) {
	if lockErr := agent.requestQueue.WaitForGreenLight(ctx); lockErr != nil {
		logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "error": lockErr})
		return
	}
	defer agent.requestQueue.RequestComplete()
	logger.Debug(ctx, "reconciling-device-agent-devicetype")

	// TODO: context timeout
	stored := &voltha.Device{}
	found, getErr := agent.dbProxy.Get(ctx, agent.deviceID, stored)
	if getErr != nil {
		logger.Errorw(ctx, "kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": getErr})
		return
	}
	if !found {
		// Nothing stored for this device in the KV store.
		return
	}

	agent.deviceType = stored.Type
	agent.device = stored
	agent.adapterEndpoint = stored.AdapterEndpoint
	agent.portLoader.Load(ctx)
	agent.transientStateLoader.Load(ctx)

	logger.Debugw(ctx, "reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
}

// onSuccess is the shared callback for adapter requests that completed without error. When
// deviceUpdateLog is set a successful device update is recorded; otherwise the success is only
// written to the debug log.
func (agent *Agent) onSuccess(ctx context.Context, prevState, currState *common.AdminState_Types, deviceUpdateLog bool) {
	if !deviceUpdateLog {
		logger.Debugw(ctx, "successful-operation", log.Fields{"device-id": agent.deviceID, "rpc": coreutils.GetRPCMetadataFromContext(ctx)})
		return
	}
	success := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
	agent.logDeviceUpdate(ctx, prevState, currState, success, nil, "adapter-response")
}

// onFailure is the shared callback for adapter requests that returned an error when nothing more is
// needed than reporting it. An RPC error event is always published asynchronously; the failure is then
// either recorded as a device update (deviceUpdateLog set) or written to the error log.
func (agent *Agent) onFailure(ctx context.Context, err error, prevState, currState *common.AdminState_Types, deviceUpdateLog bool) {
	// Publish the failure as an event without blocking the caller.
	event := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
	go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", event,
		voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())

	if !deviceUpdateLog {
		logger.Errorw(ctx, "failed-operation", log.Fields{"error": err, "device-id": agent.deviceID, "rpc": coreutils.GetRPCMetadataFromContext(ctx)})
		return
	}
	failure := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	agent.logDeviceUpdate(ctx, prevState, currState, failure, err, "adapter-response")
}

// onForceDeleteResponse is invoked following a force delete request to an adapter. It records the
// adapter outcome (dErr, nil meaning success) and then, whatever that outcome was, moves the device to
// the DELETING_POST_ADAPTER_RESPONSE transient state and records the result of that update.
//
// NOTE(review): when the request lock cannot be acquired the error is only logged and the update
// still proceeds — confirm this best-effort behaviour is intended.
func (agent *Agent) onForceDeleteResponse(ctx context.Context, prevState, currState *common.AdminState_Types, dErr error) {
	adapterStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
	if dErr != nil {
		adapterStatus.Code = common.OperationResp_OPERATION_FAILURE
	}
	agent.logDeviceUpdate(ctx, prevState, currState, adapterStatus, dErr, "adapter-force-delete-response")

	if lockErr := agent.requestQueue.WaitForGreenLight(ctx); lockErr != nil {
		logger.Errorw(ctx, "failed-getting-device-request-lock", log.Fields{"device-id": agent.deviceID, "error": lockErr})
	}
	prevTransient := agent.getTransientState()
	snapshot := agent.cloneDeviceWithoutLock()

	// Even on a delete error response, clean up the device in the core.
	updateErr := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, snapshot,
		core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, prevTransient)
	updateStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
	if updateErr != nil {
		updateStatus.Code = common.OperationResp_OPERATION_FAILURE
	}
	agent.logDeviceUpdate(ctx, prevState, currState, updateStatus, updateErr, "transient-state-update")
}

// onDeleteSuccess is the shared callback for a delete request that the adapter answered without
// error. It moves the device to the DELETING_POST_ADAPTER_RESPONSE transient state (a failure to do so
// is only logged) and then records a successful device update.
//
// NOTE(review): when the request lock cannot be acquired the error is only logged and the update
// still proceeds — confirm this best-effort behaviour is intended.
func (agent *Agent) onDeleteSuccess(ctx context.Context, prevState, currState *common.AdminState_Types) {
	if lockErr := agent.requestQueue.WaitForGreenLight(ctx); lockErr != nil {
		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": lockErr})
	}
	prevTransient := agent.getTransientState()
	snapshot := agent.cloneDeviceWithoutLock()

	updateErr := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, snapshot,
		core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, prevTransient)
	if updateErr != nil {
		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": updateErr})
	}

	success := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
	agent.logDeviceUpdate(ctx, prevState, currState, success, nil, "adapter-response")
}

// onDeleteFailure is the shared callback for a delete request that the adapter answered with an
// error. It logs the error, marks the device transient state as DELETE_FAILED (no state transition),
// publishes an RPC error event asynchronously and records a failed device update.
func (agent *Agent) onDeleteFailure(ctx context.Context, err error, prevState, currState *common.AdminState_Types) {
	logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": coreutils.GetRPCMetadataFromContext(ctx), "device-id": agent.deviceID, "error": err})

	// Only the transient state changes here; a failure to persist it is logged and otherwise ignored.
	stateErr := agent.updateTransientState(ctx, core.DeviceTransientState_DELETE_FAILED)
	if stateErr != nil {
		logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID, "error": stateErr})
	}

	event := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
	go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", event,
		voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())

	failure := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	agent.logDeviceUpdate(ctx, prevState, currState, failure, err, "adapter-response")
}

// getDeviceReadOnly acquires the request lock and returns the agent's current device. The returned
// device MUST NOT be modified by the caller, but it is safe to keep forever.
func (agent *Agent) getDeviceReadOnly(ctx context.Context) (*voltha.Device, error) {
	lockErr := agent.requestQueue.WaitForGreenLight(ctx)
	if lockErr != nil {
		return nil, lockErr
	}
	defer agent.requestQueue.RequestComplete()

	current := agent.device
	return current, nil
}

// getDeviceReadOnlyWithoutLock hands back the agent's current device without any copying, which
// makes it very cheap. The result MUST NOT be modified, but is safe to keep forever.
// The caller MUST already hold the device lock.
func (agent *Agent) getDeviceReadOnlyWithoutLock() *voltha.Device {
	current := agent.device
	return current
}

// cloneDeviceWithoutLock produces a deep copy of the agent's device that the caller is free to modify.
// The caller MUST already hold the device lock.
func (agent *Agent) cloneDeviceWithoutLock() *voltha.Device {
	duplicate := proto.Clone(agent.device)
	return duplicate.(*voltha.Device)
}

// updateDeviceTypeAndEndpoint fills in the device type and the adapter endpoint when either is still
// empty, and persists the device if anything changed. The request lock is taken on entry and released
// on every exit path, either directly or by updateDeviceAndReleaseLock.
//
// NOTE(review): the adapter type lookup is made with the (empty) current type — confirm that
// GetAdapterType is expected to resolve an empty type.
func (agent *Agent) updateDeviceTypeAndEndpoint(ctx context.Context) error {
	if lockErr := agent.requestQueue.WaitForGreenLight(ctx); lockErr != nil {
		return lockErr
	}
	dev := agent.cloneDeviceWithoutLock()
	dirty := false

	if dev.Type == "" {
		resolvedType, typeErr := agent.adapterMgr.GetAdapterType(dev.Type)
		if typeErr != nil {
			agent.requestQueue.RequestComplete()
			return typeErr
		}
		dev.Type = resolvedType
		dirty = true
	}

	if dev.AdapterEndpoint == "" {
		endpoint, endpointErr := agent.adapterMgr.GetAdapterEndpoint(ctx, dev.Id, dev.Type)
		if endpointErr != nil {
			agent.requestQueue.RequestComplete()
			return endpointErr
		}
		dev.AdapterEndpoint = endpoint
		agent.adapterEndpoint = endpoint
		dirty = true
	}

	if !dirty {
		// Nothing to persist; just give the lock back.
		agent.requestQueue.RequestComplete()
		return nil
	}
	return agent.updateDeviceAndReleaseLock(ctx, dev)
}

// enableDevice activates a preprovisioned or a disabled device.
//
// Under the request lock it checks that the request may proceed, that the device is not already
// enabled (unless its operational status is FAILED) and that an adapter type exists for the device
// type. It then resolves the adapter endpoint if not yet set, asks the adapter asynchronously to adopt
// (PREPROVISIONED device) or re-enable the device, and persists the device with admin state ENABLED
// and operational status ACTIVATING. The lock is released on every exit path, either directly or by
// updateDeviceAndReleaseLock.
//
// Returns a FailedPrecondition status error when the request cannot proceed or the device is already
// enabled, or the error of whichever lookup or update failed.
func (agent *Agent) enableDevice(ctx context.Context) error {
	// err, desc, requestStatus and the admin states are read by the deferred logDeviceUpdate call,
	// so every exit path assigns them rather than shadowing them.
	var err error
	var desc string
	var prevAdminState, currAdminState common.AdminState_Types
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}

	defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, requestStatus, err, desc) }()

	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return err
	}
	logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})

	// Keep the pre-change device so its admin state can be reported as the previous state and used
	// to choose between adopt and re-enable below.
	oldDevice := agent.getDeviceReadOnlyWithoutLock()
	prevAdminState = oldDevice.AdminState

	if !agent.proceedWithRequest(oldDevice) {
		agent.requestQueue.RequestComplete()
		err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed: %s", agent.deviceID)
		return err
	}
	//vol-4275 TST meeting 08/04/2021: Let EnableDevice to be called again if device is in FAILED operational state,
	//even the admin state is ENABLED.
	if oldDevice.AdminState == voltha.AdminState_ENABLED && oldDevice.OperStatus != voltha.OperStatus_FAILED {
		logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
		agent.requestQueue.RequestComplete()
		// Pass the ID as an argument: a pre-formatted string must not be used as the format itself.
		err = status.Errorf(codes.FailedPrecondition, "cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
		return err
	}

	// Verify whether there is a device type that supports this device type
	_, err = agent.adapterMgr.GetAdapterType(oldDevice.Type)
	if err != nil {
		agent.requestQueue.RequestComplete()
		return err
	}

	// Update device adapter endpoint if not set.  This is set once by the Core and use as is by the adapters.  E.g if this is a
	// child device then the parent adapter will use this device's adapter endpoint (set here) to communicate with it.
	newDevice := agent.cloneDeviceWithoutLock()
	if newDevice.AdapterEndpoint == "" {
		if newDevice.AdapterEndpoint, err = agent.adapterMgr.GetAdapterEndpoint(ctx, newDevice.Id, newDevice.Type); err != nil {
			agent.requestQueue.RequestComplete()
			return err
		}
		agent.adapterEndpoint = newDevice.AdapterEndpoint
	}

	// Update the Admin State and set the operational state to activating before sending the request to the Adapters
	newDevice.AdminState = voltha.AdminState_ENABLED
	newDevice.OperStatus = voltha.OperStatus_ACTIVATING

	// Adopt the device if it was in pre-provision state.  In all other cases, try to re-enable it.
	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if err != nil {
		logger.Errorw(ctx, "grpc-client-nil",
			log.Fields{
				"error":            err,
				"device-id":        agent.deviceID,
				"device-type":      agent.deviceType,
				"adapter-endpoint": newDevice.AdapterEndpoint,
			})
		agent.requestQueue.RequestComplete()
		return err
	}
	subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
	requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
	// The adapter call runs in the background, bounded by rpcTimeout; its outcome is reported through
	// onSuccess/onFailure rather than through this function's return value.
	go func() {
		defer cancel()
		var err error
		if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
			_, err = client.AdoptDevice(subCtx, newDevice)
		} else {
			_, err = client.ReEnableDevice(subCtx, newDevice)
		}
		if err == nil {
			agent.onSuccess(subCtx, nil, nil, true)
		} else {
			agent.onFailure(subCtx, err, nil, nil, true)
		}
	}()

	// Update device
	if err = agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
		return err
	}
	currAdminState = newDevice.AdminState
	return nil
}

// addFlowsAndGroups sends "newFlows" and "newGroups" to the adapter and waits up to flowTimeout for
// both responses. An error from either send is returned as is; an error response from the adapter is
// logged and reported as an Aborted status error.
func (agent *Agent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *ofp.FlowMetadata) error {
	// An empty flow list makes the callee return quickly.
	flowResp, err := agent.addFlowsToAdapter(ctx, newFlows, flowMetadata)
	if err != nil {
		return err
	}
	// An empty group list makes the callee return quickly.
	groupResp, err := agent.addGroupsToAdapter(ctx, newGroups, flowMetadata)
	if err != nil {
		return err
	}

	failures := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flowResp, groupResp)
	if failures == nil {
		return nil
	}
	logger.Warnw(ctx, "adapter-response", log.Fields{"device-id": agent.deviceID, "result": failures})
	return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
}

// deleteFlowsAndGroups asks the adapter to remove "flowsToDel" and "groupsToDel" and waits up to
// flowTimeout for both responses. An error from either request is returned as is; error responses
// from the adapter are reported as an Aborted status error.
func (agent *Agent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *ofp.FlowMetadata) error {
	flowResp, err := agent.deleteFlowsFromAdapter(ctx, flowsToDel, flowMetadata)
	if err != nil {
		return err
	}
	groupResp, err := agent.deleteGroupsFromAdapter(ctx, groupsToDel, flowMetadata)
	if err != nil {
		return err
	}

	failures := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flowResp, groupResp)
	if failures == nil {
		return nil
	}
	return status.Errorf(codes.Aborted, "errors-%s", failures)
}

// updateFlowsAndGroups sends "updatedFlows" and "updatedGroups" to the adapter as updates and waits up
// to flowTimeout for both responses. An error from either request is returned as is; error responses
// from the adapter are reported as an Aborted status error.
func (agent *Agent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *ofp.FlowMetadata) error {
	flowResp, err := agent.updateFlowsToAdapter(ctx, updatedFlows, flowMetadata)
	if err != nil {
		return err
	}
	groupResp, err := agent.updateGroupsToAdapter(ctx, updatedGroups, flowMetadata)
	if err != nil {
		return err
	}

	failures := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flowResp, groupResp)
	if failures == nil {
		return nil
	}
	return status.Errorf(codes.Aborted, "errors-%s", failures)
}

// disableDevice disables a device. Under the request lock it rejects the request when it may not
// proceed or the device is PREPROVISIONED, and does nothing when the device is already DISABLED.
// Otherwise it asks the adapter asynchronously to disable the device and persists the device with
// admin state DISABLED and operational status UNKNOWN. The outcome is recorded by the deferred
// logDeviceUpdate call.
func (agent *Agent) disableDevice(ctx context.Context) error {
	var (
		err                            error
		desc                           string
		prevAdminState, currAdminState common.AdminState_Types
	)
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, requestStatus, err, desc) }()

	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return err
	}
	logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})

	target := agent.cloneDeviceWithoutLock()
	prevAdminState = agent.device.AdminState

	if !agent.proceedWithRequest(target) {
		err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress/failed: %s", agent.deviceID)
		agent.requestQueue.RequestComplete()
		return err
	}

	switch target.AdminState {
	case voltha.AdminState_DISABLED:
		// Nothing to do; report it through the logged description only.
		desc = "device-already-disabled"
		agent.requestQueue.RequestComplete()
		return nil
	case voltha.AdminState_PREPROVISIONED:
		agent.requestQueue.RequestComplete()
		err = status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, target.AdminState)
		return err
	}

	// Admin and operational state are changed before the request goes out.
	target.AdminState = voltha.AdminState_DISABLED
	target.OperStatus = voltha.OperStatus_UNKNOWN

	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if err != nil {
		logger.Errorw(ctx, "grpc-client-nil",
			log.Fields{
				"error":            err,
				"device-id":        agent.deviceID,
				"device-type":      agent.deviceType,
				"adapter-endpoint": target.AdapterEndpoint,
			})
		agent.requestQueue.RequestComplete()
		return err
	}

	subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
	requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
	go func() {
		defer cancel()
		if _, rpcErr := client.DisableDevice(subCtx, target); rpcErr != nil {
			agent.onFailure(subCtx, rpcErr, nil, nil, true)
			return
		}
		agent.onSuccess(subCtx, nil, nil, true)
	}()

	// Persist the change; this also releases the request lock.
	if err = agent.updateDeviceAndReleaseLock(ctx, target); err != nil {
		return err
	}
	currAdminState = target.AdminState

	return nil
}

// rebootDevice asks the adapter to reboot the device. The request lock is held for the duration of
// this call; the adapter RPC itself runs asynchronously, bounded by rpcTimeout, and reports through
// onSuccess/onFailure. Returns a FailedPrecondition status error when the request may not proceed, or
// the error from acquiring the lock or the adapter client.
func (agent *Agent) rebootDevice(ctx context.Context) error {
	var (
		desc string
		err  error
	)
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		desc = err.Error()
		return err
	}
	defer agent.requestQueue.RequestComplete()
	logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID})

	current := agent.getDeviceReadOnlyWithoutLock()
	if !agent.proceedWithRequest(current) {
		err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed:%s", agent.deviceID)
		return err
	}

	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if err != nil {
		logger.Errorw(ctx, "grpc-client-nil",
			log.Fields{
				"error":            err,
				"device-id":        agent.deviceID,
				"device-type":      agent.deviceType,
				"adapter-endpoint": current.AdapterEndpoint,
			})
		return err
	}

	subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
	requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
	go func() {
		defer cancel()
		if _, rpcErr := client.RebootDevice(subCtx, current); rpcErr != nil {
			agent.onFailure(subCtx, rpcErr, nil, nil, true)
			return
		}
		agent.onSuccess(subCtx, nil, nil, true)
	}()
	return nil
}

// deleteDeviceForce starts a forced deletion of the device. Under the request lock it checks that a
// force delete is allowed for the current transient state, asks the adapter asynchronously to delete
// the device (skipped for a PREPROVISIONED device) and moves the device to the FORCE_DELETING
// transient state, which also releases the lock. When no adapter client can be obtained the returned
// error wraps errNoConnection.
func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
	logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})

	var (
		desc string
		err  error
	)
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return err
	}

	// The current transient state decides whether a force delete may start.
	prevTransient := agent.getTransientState()
	target := agent.cloneDeviceWithoutLock()
	if !agent.isForceDeletingAllowed(prevTransient, target) {
		agent.requestQueue.RequestComplete()
		err = status.Errorf(codes.FailedPrecondition, "deviceId:%s, force deletion is in progress", agent.deviceID)
		return err
	}

	if target.AdminState != common.AdminState_PREPROVISIONED {
		var client adapter_service.AdapterServiceClient
		if client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint); err != nil {
			logger.Errorw(ctx, "grpc-client-nil",
				log.Fields{
					"error":            err,
					"device-id":        agent.deviceID,
					"device-type":      agent.deviceType,
					"adapter-endpoint": target.AdapterEndpoint,
				})
			agent.requestQueue.RequestComplete()
			return fmt.Errorf("remote-not-reachable %w", errNoConnection)
		}

		subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
		requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
		go func() {
			defer cancel()
			if _, rpcErr := client.DeleteDevice(subCtx, target); rpcErr != nil {
				agent.onFailure(subCtx, rpcErr, nil, nil, true)
				return
			}
			agent.onSuccess(subCtx, nil, nil, true)
		}()
	}

	// Record the new transient state; this also releases the request lock.
	err = agent.updateDeviceWithTransientStateAndReleaseLock(ctx, target,
		core.DeviceTransientState_FORCE_DELETING, prevTransient)
	return err
}

// deleteDevice starts the deletion of the device.
//
// Under the request lock it checks that the request may proceed and moves the device to the
// DELETING_FROM_ADAPTER transient state, or straight to DELETING_POST_ADAPTER_RESPONSE for a
// PREPROVISIONED device; that update also releases the lock. For a device that is not PREPROVISIONED
// it then asks the adapter, synchronously and bounded by rpcTimeout, to delete the device and hands
// the outcome to onDeleteSuccess/onDeleteFailure.
//
// Returns a FailedPrecondition status error when the request may not proceed, or the error from the
// lock, the transient state update, the adapter client lookup or the adapter delete call.
func (agent *Agent) deleteDevice(ctx context.Context) error {
	logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})

	// err, desc and requestStatus are read by the deferred logDeviceUpdate call, so every exit path
	// assigns them rather than shadowing them.
	var desc string
	var err error
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		desc = err.Error()
		return err
	}

	device := agent.cloneDeviceWithoutLock()

	if !agent.proceedWithRequest(device) {
		agent.requestQueue.RequestComplete()
		err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed: %s", agent.deviceID)
		return err
	}

	// Get the device Transient state, return err if it is DELETING
	previousDeviceTransientState := agent.getTransientState()

	previousAdminState := device.AdminState
	// Change the device transient state to DELETING_FROM_ADAPTER  state till the device is removed from adapters.
	currentDeviceTransientState := core.DeviceTransientState_DELETING_FROM_ADAPTER

	if previousAdminState == common.AdminState_PREPROVISIONED {
		// Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device.
		currentDeviceTransientState = core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
	}
	// Update device and release lock
	if err = agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
		currentDeviceTransientState, previousDeviceTransientState); err != nil {
		desc = err.Error()
		return err
	}
	// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
	// adapter
	if previousAdminState != common.AdminState_PREPROVISIONED {
		var client adapter_service.AdapterServiceClient
		client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
		if err != nil {
			logger.Errorw(ctx, "grpc-client-nil",
				log.Fields{
					"error":            err,
					"device-id":        agent.deviceID,
					"device-type":      agent.deviceType,
					"adapter-endpoint": device.AdapterEndpoint,
				})
			// The request lock was already released by the transient state update above, so it must
			// not be released a second time here.
			return err
		}
		subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
		requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
		// The callbacks re-acquire the request lock themselves where they need it.
		if _, err = client.DeleteDevice(subCtx, device); err != nil {
			agent.onDeleteFailure(subCtx, err, &previousAdminState, &agent.device.AdminState)
		} else {
			agent.onDeleteSuccess(subCtx, &previousAdminState, &agent.device.AdminState)
		}
		cancel()
	}
	return err
}

// setParentID sets parentID as the parent of the agent's device and persists the change. The request
// lock is taken here and released by updateDeviceAndReleaseLock. The device argument is only used for
// the debug log.
func (agent *Agent) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
	lockErr := agent.requestQueue.WaitForGreenLight(ctx)
	if lockErr != nil {
		return lockErr
	}
	logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})

	updated := agent.cloneDeviceWithoutLock()
	updated.ParentId = parentID
	return agent.updateDeviceAndReleaseLock(ctx, updated)
}

// getSwitchCapability asks the adapter for the switch capability of the agent's device. It returns
// the error from reading the device, obtaining the adapter client, or the adapter call itself.
func (agent *Agent) getSwitchCapability(ctx context.Context) (*ca.SwitchCapability, error) {
	logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": agent.deviceID})

	current, devErr := agent.getDeviceReadOnly(ctx)
	if devErr != nil {
		return nil, devErr
	}

	// The gRPC client is looked up by the agent's adapter endpoint.
	client, clientErr := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if clientErr != nil {
		return nil, clientErr
	}

	return client.GetOfpDeviceInfo(ctx, current)
}

// onPacketFailure reports a failed packet-out: it logs the error together with the
// hex-encoded packet payload and raises an RPC error event on a separate goroutine.
func (agent *Agent) onPacketFailure(ctx context.Context, err error, packet *ofp.OfpPacketOut) {
	fields := log.Fields{
		"device-id": agent.deviceID,
		"error":     err.Error(),
		"packet":    hex.EncodeToString(packet.Data),
	}
	logger.Errorw(ctx, "packet-out-error", fields)

	event := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
	// The timestamp is taken here, before the goroutine starts.
	raisedAt := time.Now().Unix()
	go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", event,
		voltha.EventCategory_COMMUNICATION, nil, raisedAt)
}

// packetOut sends an OpenFlow packet-out for egress port outPort to the adapter.
// The RPC runs on its own goroutine bounded by agent.rpcTimeout; only a failure to
// obtain the adapter client is returned to the caller.
func (agent *Agent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
	// An empty device type triggers a reconcile with the KV store first.
	if agent.deviceType == "" {
		agent.reconcileWithKVStore(ctx)
	}

	adapterClient, clientErr := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if clientErr != nil {
		logger.Errorw(ctx, "grpc-client-nil",
			log.Fields{
				"error":       clientErr,
				"device-id":   agent.deviceID,
				"device-type": agent.deviceType,
			})
		return clientErr
	}

	rpcCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
	go func() {
		defer cancel()
		if _, sendErr := adapterClient.SendPacketOut(rpcCtx, &ca.PacketOut{
			DeviceId:     agent.deviceID,
			EgressPortNo: outPort,
			Packet:       packet,
		}); sendErr != nil {
			agent.onPacketFailure(rpcCtx, sendErr, packet)
			return
		}
		agent.onSuccess(rpcCtx, nil, nil, false)
	}()
	return nil
}

// updateDeviceUsingAdapterData copies the adapter-owned attributes of device (root
// flag, vendor, model, serial number, MAC address, VLAN, reason, image downloads,
// operational and connection status) onto a clone of the stored device and persists
// the clone. The outcome is recorded through logDeviceUpdate on return.
func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
	var (
		err  error
		desc string
	)
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	err = agent.requestQueue.WaitForGreenLight(ctx)
	if err != nil {
		return err
	}
	logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id})

	merged := agent.cloneDeviceWithoutLock()
	// Identity attributes reported by the adapter.
	merged.Root = device.Root
	merged.Vendor = device.Vendor
	merged.Model = device.Model
	merged.SerialNumber = device.SerialNumber
	merged.MacAddress = device.MacAddress
	merged.Vlan = device.Vlan
	// Runtime attributes reported by the adapter.
	merged.Reason = device.Reason
	merged.ImageDownloads = device.ImageDownloads
	merged.OperStatus = device.OperStatus
	merged.ConnectStatus = device.ConnectStatus

	err = agent.updateDeviceAndReleaseLock(ctx, merged)
	if err != nil {
		return err
	}
	requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
	return nil
}

// updateDeviceStatus applies operStatus and connStatus to a clone of the device and
// persists it. A status whose numeric value is not a known enum member is ignored,
// leaving the corresponding field of the clone untouched.
func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
	var (
		err  error
		desc string
	)
	opStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, opStatus, err, desc) }()

	err = agent.requestQueue.WaitForGreenLight(ctx)
	if err != nil {
		return err
	}

	next := agent.cloneDeviceWithoutLock()

	// Only accept enum values present in the generated name tables; an unset argument
	// will not be found there.
	connName, connKnown := voltha.ConnectStatus_Types_name[int32(connStatus)]
	if connKnown {
		logger.Debugw(ctx, "update-device-conn-status", log.Fields{"ok": connKnown, "val": connName})
		next.ConnectStatus = connStatus
	}
	operName, operKnown := voltha.OperStatus_Types_name[int32(operStatus)]
	if operKnown {
		logger.Debugw(ctx, "update-device-oper-status", log.Fields{"ok": operKnown, "val": operName})
		next.OperStatus = operStatus
	}
	logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": next.Id, "oper-status": next.OperStatus, "connect-status": next.ConnectStatus})

	// Persist the clone; the lock is released by the callee.
	err = agent.updateDeviceAndReleaseLock(ctx, next)
	if err != nil {
		return err
	}
	opStatus.Code = common.OperationResp_OPERATION_SUCCESS
	return nil
}

// TODO: A generic device update by attribute
//
// updateDeviceAttribute sets the exported device field called name to value on a
// clone of the device and persists the clone. Only string, uint32 and bool fields
// are supported. A nil value is a no-op. If the field does not exist, is not
// settable, or value's dynamic type does not match the field kind, the field is left
// unchanged ("updated" is logged as false) and the clone is still persisted so that
// the device lock is released through the usual path.
func (agent *Agent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
	if value == nil {
		return
	}

	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
		return
	}

	cloned := agent.cloneDeviceWithoutLock()
	updated := false
	s := reflect.ValueOf(cloned).Elem()
	if s.Kind() == reflect.Struct {
		// exported field
		f := s.FieldByName(name)
		if f.IsValid() && f.CanSet() {
			// Use checked type assertions: a mismatched value must not panic while the
			// device lock is held.
			switch f.Kind() {
			case reflect.String:
				if v, ok := value.(string); ok {
					f.SetString(v)
					updated = true
				}
			case reflect.Uint32:
				if v, ok := value.(uint32); ok {
					f.SetUint(uint64(v))
					updated = true
				}
			case reflect.Bool:
				if v, ok := value.(bool); ok {
					f.SetBool(v)
					updated = true
				}
			}
		}
	}
	logger.Debugw(ctx, "update-field-status", log.Fields{"device-id": cloned.Id, "name": name, "updated": updated})

	// Save the data; this also releases the device lock.
	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
		logger.Warnw(ctx, "attribute-update-failed", log.Fields{"attribute": name, "value": value, "error": err})
	}
}

// simulateAlarm forwards simulateReq together with a read-only copy of the device to
// the adapter. The RPC runs on its own goroutine bounded by agent.rpcTimeout, so the
// function returns nil as soon as the request is dispatched; only a failure to
// acquire the lock or the adapter client is returned.
func (agent *Agent) simulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) error {
	var (
		err  error
		desc string
	)
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	err = agent.requestQueue.WaitForGreenLight(ctx)
	if err != nil {
		return err
	}
	// Deferred after the logging closure, so the lock is released before it runs.
	defer agent.requestQueue.RequestComplete()
	logger.Debugw(ctx, "simulate-alarm", log.Fields{"device-id": agent.deviceID})

	dev := agent.getDeviceReadOnlyWithoutLock()

	var adapterClient adapter_service.AdapterServiceClient
	adapterClient, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if err != nil {
		logger.Errorw(ctx, "grpc-client-nil",
			log.Fields{
				"error":            err,
				"device-id":        agent.deviceID,
				"device-type":      agent.deviceType,
				"adapter-endpoint": dev.AdapterEndpoint,
			})
		return err
	}

	rpcCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
	requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
	go func() {
		defer cancel()
		if _, simErr := adapterClient.SimulateAlarm(rpcCtx, &ca.SimulateAlarmMessage{Device: dev, Request: simulateReq}); simErr != nil {
			agent.onFailure(rpcCtx, simErr, nil, nil, false)
			return
		}
		agent.onSuccess(rpcCtx, nil, nil, false)
	}()
	return nil
}

// This function updates the device in the DB, releases the device lock, and runs any state transitions.
// The calling function MUST hold the device lock.  The caller MUST NOT modify the device after this is called.
//
// The lock is released on every path, including the early error returns. An error is
// returned when the agent has been stopped or the DB write fails. A failing state
// transition is logged and raised as an RPC error event but is not returned.
func (agent *Agent) updateDeviceAndReleaseLock(ctx context.Context, device *voltha.Device) error {
	// fail early if this agent is no longer valid
	if agent.stopped {
		agent.requestQueue.RequestComplete()
		return errors.New("device-agent-stopped")
	}

	// update in db
	if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
		agent.requestQueue.RequestComplete()
		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
	}
	logger.Debugw(ctx, "updated-device-in-store", log.Fields{"device-id": agent.deviceID})

	prevDevice := agent.device
	// update the device
	agent.device = device
	// If any of the states has changed, send the change event.
	if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
	}
	// Captured while the lock is still held; the transient state is not changed by
	// this function, so it is passed as both the current and the previous value below.
	deviceTransientState := agent.getTransientState()

	// release lock before processing transition
	agent.requestQueue.RequestComplete()
	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)

	if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
		device, prevDevice, deviceTransientState, deviceTransientState); err != nil {
		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState, "error": err})
		// Sending RPC EVENT here
		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
		agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
			nil, time.Now().Unix())
	}
	return nil
}

// This function updates the device transient in the DB through loader, releases the device lock, and runs any state transitions.
// The calling function MUST hold the device lock.  The caller MUST NOT modify the device after this is called.
//
// transientState is applied first; if the subsequent device write fails, the
// transient state is reverted to prevTransientState. The lock is released on every
// path. An error is returned when the agent has been stopped, the transient state
// update fails, or the DB write fails. A failing state transition is logged and
// raised as an RPC error event but is not returned.
func (agent *Agent) updateDeviceWithTransientStateAndReleaseLock(ctx context.Context, device *voltha.Device,
	transientState, prevTransientState core.DeviceTransientState_Types) error {
	// fail early if this agent is no longer valid
	if agent.stopped {
		agent.requestQueue.RequestComplete()
		return errors.New("device-agent-stopped")
	}
	// update device TransientState
	if err := agent.updateTransientState(ctx, transientState); err != nil {
		agent.requestQueue.RequestComplete()
		return err
	}
	// update in db
	if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
		// Reverting TransientState update
		if errTransient := agent.updateTransientState(ctx, prevTransientState); errTransient != nil {
			logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
				"previous-transient-state": prevTransientState, "current-transient-state": transientState, "error": errTransient})
		}
		agent.requestQueue.RequestComplete()
		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
	}

	logger.Debugw(ctx, "updated-device-in-store", log.Fields{"device-id": agent.deviceID})

	prevDevice := agent.device
	// update the device
	agent.device = device
	// If any of the states has changed, send the change event.
	if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
	}

	// release lock before processing transition
	agent.requestQueue.RequestComplete()

	if err := agent.deviceMgr.stateTransitions.ProcessTransition(ctx,
		device, prevDevice, transientState, prevTransientState); err != nil {
		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState, "error": err})
		// Sending RPC EVENT here
		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
		go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
			nil, time.Now().Unix())
	}

	return nil
}
// updateDeviceReason sets the Reason field on a clone of the device and persists the
// clone. The outcome is recorded through logDeviceUpdate on return.
func (agent *Agent) updateDeviceReason(ctx context.Context, reason string) error {
	logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})

	var (
		err  error
		desc string
	)
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	err = agent.requestQueue.WaitForGreenLight(ctx)
	if err != nil {
		return err
	}

	withReason := agent.cloneDeviceWithoutLock()
	withReason.Reason = reason

	// The callee releases the device lock on every path.
	err = agent.updateDeviceAndReleaseLock(ctx, withReason)
	if err != nil {
		return err
	}
	requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
	return nil
}

// ChildDeviceLost handles the loss of the child device on this (parent) agent. It
// removes every peer-port entry that references the child from the parent's ports
// and then notifies the adapter through its ChildDeviceLost RPC.
//
// An error is returned when a port update fails (the adapter is then not notified)
// or when the adapter client cannot be obtained. The result of the adapter RPC
// itself is handed to onSuccess/onFailure and is not returned to the caller.
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
	logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})

	var err error
	var desc string
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	// Remove the associated peer ports on the parent device
	for portID := range agent.portLoader.ListIDs() {
		if portHandle, have := agent.portLoader.Lock(portID); have {
			oldPort := portHandle.GetReadOnly()
			updatedPeers := make([]*voltha.Port_PeerPort, 0, len(oldPort.Peers))
			for _, peerPort := range oldPort.Peers {
				if peerPort.DeviceId != device.Id {
					updatedPeers = append(updatedPeers, peerPort)
				}
			}
			newPort := *oldPort
			newPort.Peers = updatedPeers
			// Assign to the outer err so the deferred log records the failure, and
			// propagate it instead of reporting success after a partial update.
			if err = portHandle.Update(ctx, &newPort); err != nil {
				portHandle.Unlock()
				return err
			}
			portHandle.Unlock()
		}
	}

	// send request to adapter
	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if err != nil {
		logger.Errorw(ctx, "grpc-client-nil",
			log.Fields{
				"error":            err,
				"device-id":        agent.deviceID,
				"device-type":      agent.deviceType,
				"adapter-endpoint": device.AdapterEndpoint,
			})
		return err
	}
	subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
	defer cancel()
	requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS

	_, err = client.ChildDeviceLost(subCtx, device)
	if err == nil {
		agent.onSuccess(subCtx, nil, nil, true)
	} else {
		agent.onFailure(subCtx, err, nil, nil, true)
	}

	return nil
}

// startOmciTest forwards an OMCI test request for this device to its adapter and
// returns the adapter's response. If the device has no type or adapter endpoint yet,
// both are resolved first via updateDeviceTypeAndEndpoint and the device is re-read.
func (agent *Agent) startOmciTest(ctx context.Context, omcitestrequest *omci.OmciTestRequest) (*omci.TestResponse, error) {
	var (
		err  error
		desc string
	)
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	// An OMCI test may target a pre-provisioned device, whose device type and
	// adapter endpoint may not have been set yet.
	dev, err := agent.getDeviceReadOnly(ctx)
	if err != nil {
		return nil, err
	}
	if missing := dev.Type == "" || dev.AdapterEndpoint == ""; missing {
		err = agent.updateDeviceTypeAndEndpoint(ctx)
		if err != nil {
			return nil, err
		}
		// Re-read so the request carries the freshly resolved values.
		dev, err = agent.getDeviceReadOnly(ctx)
		if err != nil {
			return nil, err
		}
	}

	// Send request to the adapter
	adapterClient, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if err != nil {
		logger.Errorw(ctx, "grpc-client-nil",
			log.Fields{
				"error":            err,
				"device-id":        agent.deviceID,
				"device-type":      agent.deviceType,
				"adapter-endpoint": dev.AdapterEndpoint,
			})
		return nil, err
	}

	testReq := &ca.OMCITest{
		Device:  dev,
		Request: omcitestrequest,
	}
	response, err := adapterClient.StartOmciTest(ctx, testReq)
	if err != nil {
		return response, err
	}
	requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
	return response, nil
}

// getExtValue asks the adapter serving the parent device (pdevice) for the extended
// value selected by valueparam on the child device (cdevice). The device lock is held
// only while the adapter client is resolved and is released before the RPC is issued.
func (agent *Agent) getExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, valueparam *extension.ValueSpecifier) (*extension.ReturnValues, error) {
	logger.Debugw(ctx, "get-ext-value", log.Fields{"device-id": agent.deviceID, "onu-id": valueparam.Id, "value-type": valueparam.Value})

	var (
		err  error
		desc string
	)
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	err = agent.requestQueue.WaitForGreenLight(ctx)
	if err != nil {
		return nil, err
	}

	// The request is sent to the adapter synchronously.
	var adapterClient adapter_service.AdapterServiceClient
	adapterClient, err = agent.adapterMgr.GetAdapterClient(ctx, pdevice.AdapterEndpoint)
	if err != nil {
		logger.Errorw(ctx, "grpc-client-nil",
			log.Fields{
				"error":            err,
				"device-id":        agent.deviceID,
				"device-type":      agent.deviceType,
				"adapter-endpoint": pdevice.AdapterEndpoint,
			})
		agent.requestQueue.RequestComplete()
		return nil, err
	}

	// Release lock before sending to adapter
	agent.requestQueue.RequestComplete()

	var values *extension.ReturnValues
	values, err = adapterClient.GetExtValue(ctx, &ca.GetExtValueMessage{
		ParentDevice: pdevice,
		ChildDevice:  cdevice,
		ValueType:    valueparam.Value,
	})
	if err != nil {
		return values, err
	}
	requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
	return values, nil
}

// setExtValue sends the extended value set for device to this agent's adapter. The
// device lock is held only while the adapter client is resolved and is released
// before the RPC is issued.
func (agent *Agent) setExtValue(ctx context.Context, device *voltha.Device, value *extension.ValueSet) (*empty.Empty, error) {
	logger.Debugw(ctx, "set-ext-value", log.Fields{"device-id": value.Id})

	var (
		err  error
		desc string
	)
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	err = agent.requestQueue.WaitForGreenLight(ctx)
	if err != nil {
		return nil, err
	}

	// The request is sent to the adapter synchronously.
	var adapterClient adapter_service.AdapterServiceClient
	adapterClient, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if err != nil {
		logger.Errorw(ctx, "grpc-client-nil",
			log.Fields{
				"error":            err,
				"device-id":        agent.deviceID,
				"device-type":      agent.deviceType,
				"adapter-endpoint": device.AdapterEndpoint,
			})
		agent.requestQueue.RequestComplete()
		return nil, err
	}

	// Release lock before sending request to adapter
	agent.requestQueue.RequestComplete()

	var ack *empty.Empty
	ack, err = adapterClient.SetExtValue(ctx, &ca.SetExtValueMessage{
		Device: device,
		Value:  value,
	})
	if err != nil {
		return ack, err
	}
	requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
	return ack, nil
}

// getSingleValue forwards a single-get-value request to this agent's adapter and
// returns its response. The device lock is held only while the adapter client is
// resolved and is released before the RPC is issued.
func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
	logger.Debugw(ctx, "get-single-value", log.Fields{"device-id": request.TargetId})

	var (
		err  error
		desc string
	)
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	err = agent.requestQueue.WaitForGreenLight(ctx)
	if err != nil {
		return nil, err
	}

	// Snapshot of the device; only used for the error log below.
	snapshot := agent.cloneDeviceWithoutLock()

	var adapterClient adapter_service.AdapterServiceClient
	adapterClient, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if err != nil {
		logger.Errorw(ctx, "grpc-client-nil",
			log.Fields{
				"error":            err,
				"device-id":        snapshot.Id,
				"adapter-endpoint": snapshot.AdapterEndpoint,
			})
		agent.requestQueue.RequestComplete()
		return nil, err
	}

	// Release lock before sending request to adapter
	agent.requestQueue.RequestComplete()

	var reply *extension.SingleGetValueResponse
	reply, err = adapterClient.GetSingleValue(ctx, request)
	if err != nil {
		return reply, err
	}
	requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
	return reply, nil
}

// setSingleValue forwards a single-set-value request to this agent's adapter and
// returns its response. The device lock is held only while the adapter client is
// resolved and is released before the RPC is issued. The outcome, including any
// error, is recorded through logDeviceUpdate on return.
func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
	logger.Debugw(ctx, "set-single-value", log.Fields{"device-id": request.TargetId})

	var err error
	var desc string
	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()

	// Assign to the outer err (no ":=") so the deferred log sees a lock failure.
	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return nil, err
	}

	cloned := agent.cloneDeviceWithoutLock()

	// send request to adapter
	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if err != nil {
		logger.Errorw(ctx, "grpc-client-nil",
			log.Fields{
				"error":            err,
				"device-id":        agent.deviceID,
				"device-type":      agent.deviceType,
				"adapter-endpoint": cloned.AdapterEndpoint,
			})
		agent.requestQueue.RequestComplete()
		return nil, err
	}
	// Release lock before sending request to adapter
	agent.requestQueue.RequestComplete()

	resp, err := client.SetSingleValue(ctx, request)
	if err == nil {
		requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
	}
	return resp, err
}

// proceedWithRequest reports whether a request may be processed for device: it is
// false while a deletion is in progress or while the device is in a reconcile state.
func (agent *Agent) proceedWithRequest(device *voltha.Device) bool {
	blocked := agent.isDeletionInProgress() || agent.isInReconcileState(device)
	return !blocked
}

// stopReconcile signals a running reconcile, if any, to stop. It is a no-op when no
// stop channel is installed. The send is non-blocking: if a stop signal is already
// pending on the channel, another one is redundant, and blocking here would stall
// the caller while the mutex is held.
func (agent *Agent) stopReconcile() {
	agent.stopReconcilingMutex.Lock()
	defer agent.stopReconcilingMutex.Unlock()

	if agent.stopReconciling == nil {
		return
	}
	select {
	case agent.stopReconciling <- 0:
	default:
		// A stop request is already queued; nothing more to do.
	}
}

// abortAllProcessing is invoked when an adapter managing this device is restarted.
// It takes the device lock, stops any reconcile in progress and resets the transient
// state: FORCE_DELETING and DELETING_FROM_ADAPTER become DELETE_FAILED, DELETE_FAILED
// is left untouched, and every other state becomes NONE. It returns an error if the
// lock cannot be acquired or the transient state cannot be updated.
func (agent *Agent) abortAllProcessing(ctx context.Context) error {
	logger.Infow(ctx, "aborting-current-running-requests", log.Fields{"device-id": agent.deviceID})
	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return err
	}
	logger.Infow(ctx, "aborting-current-running-requests-after-wait", log.Fields{"device-id": agent.deviceID})

	defer agent.requestQueue.RequestComplete()

	// If any reconciling is in progress just abort it. The adapter is gone.
	agent.stopReconcile()

	logger.Infow(ctx, "aborting-current-running-requests-after-sendstop", log.Fields{"device-id": agent.deviceID})

	// Update the Core device transient state accordingly
	var updatedState core.DeviceTransientState_Types
	switch agent.getTransientState() {
	case core.DeviceTransientState_RECONCILE_IN_PROGRESS:
		updatedState = core.DeviceTransientState_NONE
	case core.DeviceTransientState_FORCE_DELETING,
		core.DeviceTransientState_DELETING_FROM_ADAPTER:
		updatedState = core.DeviceTransientState_DELETE_FAILED
	case core.DeviceTransientState_DELETE_FAILED:
		// do not change state
		return nil
	default:
		updatedState = core.DeviceTransientState_NONE
	}
	if err := agent.updateTransientState(ctx, updatedState); err != nil {
		// Errorw takes structured fields; Errorf would treat the fields as a stray
		// format argument.
		logger.Errorw(ctx, "transient-state-update-failed", log.Fields{"error": err, "device-id": agent.deviceID})
		return err
	}
	return nil
}

// DeleteDevicePostAdapterRestart force-deletes this device on its adapter. Judging by
// its name and its use from ReconcileDevice, it is meant to run once the adapter is
// reachable again after a restart.
//
// Under the device lock it moves the transient state to FORCE_DELETING and waits,
// with exponential backoff, until an adapter client can be obtained or ctx is done.
// The lock is then released and DeleteDevice is sent to the adapter with an
// agent.rpcTimeout deadline; the RPC result is passed to onForceDeleteResponse and
// returned. A device in the PREPROVISIONED admin state is skipped and nil is returned.
func (agent *Agent) DeleteDevicePostAdapterRestart(ctx context.Context) error {
	logger.Debugw(ctx, "delete-post-restart", log.Fields{"device-id": agent.deviceID})
	// NOTE(review): the span name below is spelled "Delte..."; it is a runtime string
	// and is left untouched here.
	ctx = utils.WithNewSpanAndRPCMetadataContext(ctx, "DelteDevicePostAdapterRestart")

	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return err
	}

	device := agent.getDeviceReadOnlyWithoutLock()
	if device.AdminState == voltha.AdminState_PREPROVISIONED {
		logger.Debugw(ctx, "device-in-preprovisioning-state-reconcile-not-needed", log.Fields{"device-id": device.Id})
		agent.requestQueue.RequestComplete()
		return nil
	}
	// Change device transient state to FORCE_DELETING
	if err := agent.updateTransientState(ctx, core.DeviceTransientState_FORCE_DELETING); err != nil {
		logger.Errorw(ctx, "failure-updating-transient-state", log.Fields{"error": err, "device-id": agent.deviceID})
		agent.requestQueue.RequestComplete()
		return err
	}

	// Ensure we have a valid grpc client available as we have just restarted
	deleteBackoff := backoff.NewExponentialBackOff()
	deleteBackoff.InitialInterval = agent.config.BackoffRetryInitialInterval
	deleteBackoff.MaxElapsedTime = agent.config.BackoffRetryMaxElapsedTime
	deleteBackoff.MaxInterval = agent.config.BackoffRetryMaxInterval
	var backoffTimer *time.Timer
	var err error
	var client adapter_service.AdapterServiceClient
	// The device lock stays held for the whole of this loop.
retry:
	for {
		client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
		if err == nil {
			break retry
		}
		duration := deleteBackoff.NextBackOff()
		if duration == backoff.Stop {
			// Maximum elapsed time reached: restart the backoff sequence and keep trying.
			deleteBackoff.Reset()
			duration = deleteBackoff.NextBackOff()
		}
		backoffTimer = time.NewTimer(duration)
		select {
		case <-backoffTimer.C:
			logger.Debugw(ctx, "backoff-timer-expires", log.Fields{"device-id": agent.deviceID})
		case <-ctx.Done():
			// Give up waiting; err carries the context error out of the loop.
			err = ctx.Err()
			break retry
		}
	}
	// Stop the last timer; if it could not be stopped, drain its channel without
	// blocking in case the value was already consumed above.
	if backoffTimer != nil && !backoffTimer.Stop() {
		select {
		case <-backoffTimer.C:
		default:
		}
	}
	// NOTE(review): if GetAdapterClient can return (nil, nil), this path returns a nil
	// error without having sent the delete — confirm whether that can happen.
	if err != nil || client == nil {
		agent.requestQueue.RequestComplete()
		return err
	}

	// Release the device lock to allow for device state update, if any
	agent.requestQueue.RequestComplete()

	// Send the delete request to the adapter
	subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
	defer cancel()
	_, err = client.DeleteDevice(subCtx, device)
	agent.onForceDeleteResponse(subCtx, nil, nil, err)
	return err
}

// ReconcileDevice asks the adapter to reconcile this device, retrying with an
// exponential backoff until one of the following happens:
//   - the adapter accepts the request (success),
//   - the adapter answers AlreadyExists (the transient state is then cleaned up),
//   - the reconcile is aborted or ctx expires,
//   - the device's operational status is RECONCILING_FAILED,
//   - the transient state becomes DELETE_FAILED, in which case the device is
//     force-deleted instead.
//
// Nothing is done for a device in the PREPROVISIONED admin state or while a deletion
// is in progress. A device that is already in DELETE_FAILED on entry is force-deleted
// straight away. Outcomes are reported through logDeviceUpdate; nothing is returned.
func (agent *Agent) ReconcileDevice(ctx context.Context) {
	// Do not reconcile if the device was in DELETE_FAILED transient state.  Just invoke the force delete on that device.
	state := agent.getTransientState()
	logger.Debugw(ctx, "starting-reconcile", log.Fields{"device-id": agent.deviceID, "state": state})
	if state == core.DeviceTransientState_DELETE_FAILED {
		if err := agent.DeleteDevicePostAdapterRestart(ctx); err != nil {
			logger.Errorw(ctx, "delete-post-restart-failed", log.Fields{"error": err, "device-id": agent.deviceID})
		}
		return
	}

	requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	var desc string

	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
		return
	}

	device := agent.getDeviceReadOnlyWithoutLock()
	if device.AdminState == voltha.AdminState_PREPROVISIONED {
		agent.requestQueue.RequestComplete()
		logger.Debugw(ctx, "device-in-preprovisioning-state-reconcile-not-needed", log.Fields{"device-id": device.Id})
		return
	}

	if agent.isDeletionInProgress() {
		agent.requestQueue.RequestComplete()
		err := fmt.Errorf("cannot complete operation as device deletion is in progress device : %s", device.Id)
		logger.Errorw(ctx, "reconcile-failed", log.Fields{"error": err})
		agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
		return
	}

	// set transient state to RECONCILE IN PROGRESS
	err := agent.updateTransientState(ctx, core.DeviceTransientState_RECONCILE_IN_PROGRESS)
	if err != nil {
		agent.requestQueue.RequestComplete()
		logger.Errorw(ctx, "setting-transient-state-failed", log.Fields{"error": err})
		// Report the actual failure rather than a nil error.
		agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
		return
	}

	reconcilingBackoff := backoff.NewExponentialBackOff()
	reconcilingBackoff.InitialInterval = agent.config.BackoffRetryInitialInterval
	reconcilingBackoff.MaxElapsedTime = agent.config.BackoffRetryMaxElapsedTime
	reconcilingBackoff.MaxInterval = agent.config.BackoffRetryMaxInterval

	// making here to keep lifecycle of this channel within the scope of retryReconcile
	agent.stopReconcilingMutex.Lock()
	agent.stopReconciling = make(chan int, 1)
	agent.stopReconcilingMutex.Unlock()

	// defined outside the retry loop so it can be cleaned
	// up when the loop breaks
	var backoffTimer *time.Timer

	// The device lock is held at the top of every iteration and released before the
	// adapter is contacted.
retry:
	for {
		// If the operations state of the device is RECONCILING_FAILED then we do not
		// want to continue to attempt reconciliation.
		deviceRef := agent.getDeviceReadOnlyWithoutLock()
		if deviceRef.OperStatus == common.OperStatus_RECONCILING_FAILED {
			logger.Warnw(ctx, "reconciling-failed-halting-retries",
				log.Fields{"device-id": device.Id})
			agent.requestQueue.RequestComplete()
			break retry
		}

		// Use an exponential back off to prevent getting into a tight loop
		duration := reconcilingBackoff.NextBackOff()
		// This case should never occur in default case as max elapsed time for backoff is 0(by default) , so it will never return stop
		if duration == backoff.Stop {
			// If we reach a maximum then warn and reset the backoff
			// timer and keep attempting.
			logger.Warnw(ctx, "maximum-reconciling-backoff-reached--resetting-backoff-timer",
				log.Fields{"max-reconciling-backoff": reconcilingBackoff.MaxElapsedTime,
					"device-id": device.Id})
			reconcilingBackoff.Reset()
			duration = reconcilingBackoff.NextBackOff()
		}

		backoffTimer = time.NewTimer(duration)

		logger.Debugw(ctx, "retrying-reconciling", log.Fields{"deviceID": device.Id, "endpoint": device.AdapterEndpoint})
		// Release lock before sending request to adapter
		agent.requestQueue.RequestComplete()

		// Send a reconcile request to the adapter.
		err := agent.sendReconcileRequestToAdapter(ctx, device)

		// Check the transient state after a response from the adapter.   If a device delete
		// request was issued due to a callback during that time and failed then just delete
		// the device and stop the reconcile loop and invoke the device deletion
		if agent.getTransientState() == core.DeviceTransientState_DELETE_FAILED {
			if dErr := agent.DeleteDevicePostAdapterRestart(ctx); dErr != nil {
				logger.Errorw(ctx, "delete-post-restart-failed", log.Fields{"error": dErr, "device-id": agent.deviceID})
			}
			break retry
		}

		if errors.Is(err, errContextExpired) || errors.Is(err, errReconcileAborted) {
			logger.Errorw(ctx, "reconcile-aborted", log.Fields{"error": err})
			// Use the OperationResp failure code; OperStatus values belong to a
			// different enum and do not map onto operation return codes.
			requestStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
			desc = "aborted"
			agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
			break retry
		}
		if st, ok := status.FromError(err); ok && st.Code() == codes.AlreadyExists {
			// The adapter already knows the device: clear the transient state and stop.
			logger.Warnw(ctx, "device already reconciled", log.Fields{"error": err})
			if cleanupErr := agent.reconcilingCleanup(ctx); cleanupErr != nil {
				logger.Errorw(ctx, "error during reconcile cleanup", log.Fields{"error": cleanupErr, "device-id": agent.deviceID})
			}
			break retry
		}
		if err != nil {
			agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
			<-backoffTimer.C
			// backoffTimer expired continue
			// Take lock back before retrying
			if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
				agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
				break retry
			}
			continue
		}
		// Success
		requestStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
		desc = "adapter-response"
		agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
		break retry
	}

	logger.Debugw(ctx, "reconcile-retry-ends", log.Fields{"adapter-endpoint": agent.adapterEndpoint})

	// Retry loop is broken, so stop any timers and drain the channel
	if backoffTimer != nil && !backoffTimer.Stop() {
		// Stop returns false both when the timer has already fired and when its
		// value has already been consumed, so the channel may be empty here. Drain
		// it with a non-blocking receive so an empty channel cannot block us.
		select {
		case <-backoffTimer.C:
		default:
		}
	}
}

// sendReconcileRequestToAdapter issues ReconcileDevice on the adapter and waits for
// whichever comes first: the adapter's response, a stop signal on the agent's
// stopReconciling channel, or expiry of ctx.
//
// It returns nil when the adapter accepts the request, the adapter's error when it
// rejects it, an error wrapping errReconcileAborted when reconciling is stopped, and
// an error wrapping errContextExpired when ctx is done. The stop channel is cleared
// on adapter success and on a stop signal.
func (agent *Agent) sendReconcileRequestToAdapter(ctx context.Context, device *voltha.Device) error {
	logger.Debugw(ctx, "sending-reconcile-to-adapter", log.Fields{"device-id": device.Id, "endpoint": agent.adapterEndpoint})
	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if err != nil {
		return err
	}

	// Buffered so the RPC goroutine can always deliver its result and exit, even if
	// this function has already returned through the stop or context branch.
	adapterResponse := make(chan error, 1)
	go func() {
		_, err := client.ReconcileDevice(ctx, device)
		adapterResponse <- err
	}()

	// Read the stop channel under its mutex; a nil channel simply never fires in the
	// select below.
	agent.stopReconcilingMutex.Lock()
	stopCh := agent.stopReconciling
	agent.stopReconcilingMutex.Unlock()

	select {
	// wait for response
	case err := <-adapterResponse:
		if err != nil {
			return err
		}
		// In case of success quit retrying and wait for adapter to reset operation state of device
		agent.stopReconcilingMutex.Lock()
		agent.stopReconciling = nil
		agent.stopReconcilingMutex.Unlock()
		return nil

	// if reconciling need to be stopped
	case _, ok := <-stopCh:
		agent.stopReconcilingMutex.Lock()
		agent.stopReconciling = nil
		agent.stopReconcilingMutex.Unlock()
		if !ok {
			// channel-closed
			return fmt.Errorf("reconcile channel closed:%w", errReconcileAborted)
		}
		return fmt.Errorf("reconciling aborted:%w", errReconcileAborted)
	// Context expired
	case <-ctx.Done():
		return fmt.Errorf("context expired:%s :%w", ctx.Err(), errContextExpired)
	}
}

// reconcilingCleanup resets the device transient state to NONE under the device
// lock. It returns an error if the lock cannot be acquired or the transient state
// cannot be updated; the outcome is recorded through logDeviceUpdate on return.
func (agent *Agent) reconcilingCleanup(ctx context.Context) error {
	var desc string
	var err error
	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()

	if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		desc = "reconcile-cleanup-failed"
		return err
	}
	defer agent.requestQueue.RequestComplete()

	if err = agent.updateTransientState(ctx, core.DeviceTransientState_NONE); err != nil {
		// Errorw takes structured fields; Errorf would treat the fields as a stray
		// format argument.
		logger.Errorw(ctx, "transient-state-update-failed", log.Fields{"error": err, "device-id": agent.deviceID})
		return err
	}
	operStatus.Code = common.OperationResp_OPERATION_SUCCESS
	return nil
}

// isAdapterConnectionUp reports whether a non-nil adapter client can be obtained for
// this agent's adapter endpoint without error.
func (agent *Agent) isAdapterConnectionUp(ctx context.Context) bool {
	adapterClient, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
	if err != nil {
		return false
	}
	return adapterClient != nil
}

// canDeviceRequestProceed checks, under the device lock, whether the device may
// currently process a request. It returns nil if so, the lock-acquisition error if
// the lock cannot be taken, and a descriptive error otherwise.
func (agent *Agent) canDeviceRequestProceed(ctx context.Context) error {
	lockErr := agent.requestQueue.WaitForGreenLight(ctx)
	if lockErr != nil {
		return lockErr
	}
	defer agent.requestQueue.RequestComplete()

	if !agent.proceedWithRequest(agent.device) {
		return fmt.Errorf("device-cannot-process-request-%s", agent.deviceID)
	}
	return nil
}
