[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index cd879c6..1a83cb2 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -25,9 +25,11 @@
"sync"
"time"
+ "github.com/opencord/voltha-protos/v5/go/adapter_services"
+ "github.com/opencord/voltha-protos/v5/go/core"
+
"github.com/cenkalti/backoff/v3"
"github.com/gogo/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/rw_core/config"
"google.golang.org/grpc/codes"
@@ -38,32 +40,34 @@
"github.com/opencord/voltha-go/rw_core/core/device/flow"
"github.com/opencord/voltha-go/rw_core/core/device/group"
"github.com/opencord/voltha-go/rw_core/core/device/port"
- "github.com/opencord/voltha-go/rw_core/core/device/remote"
"github.com/opencord/voltha-go/rw_core/core/device/transientstate"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
- "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-protos/v4/go/common"
- "github.com/opencord/voltha-protos/v4/go/extension"
- ic "github.com/opencord/voltha-protos/v4/go/inter_container"
- ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ "github.com/opencord/voltha-protos/v5/go/extension"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
+var errReconcileAborted = errors.New("reconcile aborted")
+var errContextExpired = errors.New("context expired")
+
// Agent represents device agent attributes
type Agent struct {
deviceID string
parentID string
deviceType string
+ adapterEndpoint string
isRootDevice bool
- adapterProxy *remote.AdapterProxy
adapterMgr *adapter.Manager
deviceMgr *Manager
dbProxy *model.Proxy
exitChannel chan int
device *voltha.Device
requestQueue *coreutils.RequestQueue
- defaultTimeout time.Duration
+ internalTimeout time.Duration
+ rpcTimeout time.Duration
startOnce sync.Once
stopOnce sync.Once
stopped bool
@@ -78,7 +82,7 @@
}
//newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
+func newAgent(device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, internalTimeout, rpcTimeout time.Duration) *Agent {
deviceID := device.Id
if deviceID == "" {
deviceID = coreutils.CreateDeviceID()
@@ -86,15 +90,16 @@
return &Agent{
deviceID: deviceID,
- adapterProxy: ap,
isRootDevice: device.Root,
parentID: device.ParentId,
deviceType: device.Type,
+ adapterEndpoint: device.AdapterEndpoint,
deviceMgr: deviceMgr,
adapterMgr: deviceMgr.adapterMgr,
exitChannel: make(chan int, 1),
dbProxy: deviceProxy,
- defaultTimeout: timeout,
+ internalTimeout: internalTimeout,
+ rpcTimeout: rpcTimeout,
device: proto.Clone(device).(*voltha.Device),
requestQueue: coreutils.NewRequestQueue(),
config: deviceMgr.config,
@@ -132,22 +137,23 @@
} else if !have {
return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
}
+ logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID, "adapter-endpoint": device.AdapterEndpoint, "type": device.Type})
}
- agent.deviceType = device.Adapter
+ agent.deviceType = device.Type
+ agent.adapterEndpoint = device.AdapterEndpoint
agent.device = proto.Clone(device).(*voltha.Device)
// load the ports from KV to cache
agent.portLoader.Load(ctx)
agent.transientStateLoader.Load(ctx)
-
- logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
} else {
// Create a new device
var desc string
+ var err error
prevState := common.AdminState_UNKNOWN
currState := common.AdminState_UNKNOWN
- operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
- defer agent.logDeviceUpdate(ctx, "createDevice", &prevState, &currState, operStatus, &desc)
+ defer func() { agent.logDeviceUpdate(ctx, &prevState, &currState, requestStatus, err, desc) }()
// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
// is a new device, so populate them here before passing the device to ldProxy.Set.
@@ -162,13 +168,13 @@
device.Vlan = deviceToCreate.ProxyAddress.ChannelId
}
- // Add the initial device to the local model
- if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
- desc = fmt.Sprintf("failed-adding-device-%s: %s", agent.deviceID, err.Error())
- return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
+ // Save the device to the model
+ if err = agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
+ err = status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
+ return nil, err
}
_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, device.OperStatus, device.ConnectStatus, prevState, device, time.Now().Unix())
- operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+ requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
agent.device = device
}
startSucceeded = true
@@ -225,8 +231,9 @@
return // not found in kv
}
- agent.deviceType = device.Adapter
+ agent.deviceType = device.Type
agent.device = device
+ agent.adapterEndpoint = device.AdapterEndpoint
agent.portLoader.Load(ctx)
agent.transientStateLoader.Load(ctx)
@@ -234,139 +241,68 @@
}
// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
-// and the only action required is to publish a successful result on kafka
-func (agent *Agent) onSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
- logger.Debugw(ctx, "response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
- // TODO: Post success message onto kafka
+func (agent *Agent) onSuccess(ctx context.Context, prevState, currState *common.AdminState_Types, deviceUpdateLog bool) {
+ if deviceUpdateLog {
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
+ desc := "adapter-response"
+ agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, nil, desc)
+ return
+ }
+ logger.Debugw(ctx, "successful-operation", log.Fields{"device-id": agent.deviceID, "rpc": coreutils.GetRPCMetadataFromContext(ctx)})
}
// onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
// and the only action required is to publish the failed result on kafka
-func (agent *Agent) onFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
- if res, ok := response.(error); ok {
- logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
- } else {
- logger.Errorw(ctx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
- }
- // TODO: Post failure message onto kafka
-}
+func (agent *Agent) onFailure(ctx context.Context, err error, prevState, currState *common.AdminState_Types, deviceUpdateLog bool) {
+ // Send an event on kafka
+ rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
-func (agent *Agent) waitForAdapterForceDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
- onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
- defer cancel()
- select {
- case rpcResponse, ok := <-ch:
- if !ok {
- onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
- } else if rpcResponse.Err != nil {
- onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
- } else {
- onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
- }
- case <-ctx.Done():
- onFailure(ctx, rpc, ctx.Err(), reqArgs)
+ // Log the device update event
+ if deviceUpdateLog {
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ desc := "adapter-response"
+ agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, err, desc)
+ return
}
+ logger.Errorw(ctx, "failed-operation", log.Fields{"error": err, "device-id": agent.deviceID, "rpc": coreutils.GetRPCMetadataFromContext(ctx)})
}
// onDeleteSuccess is a common callback for scenarios where we receive a nil response following a delete request
// to an adapter.
-func (agent *Agent) onDeleteSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
- logger.Debugw(ctx, "response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+func (agent *Agent) onDeleteSuccess(ctx context.Context, prevState, currState *common.AdminState_Types) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
+ logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err})
}
previousDeviceTransientState := agent.getTransientState()
newDevice := agent.cloneDeviceWithoutLock()
if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, newDevice,
- voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, previousDeviceTransientState); err != nil {
- logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
+ core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, previousDeviceTransientState); err != nil {
+ logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err})
}
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
+ desc := "adapter-response"
+ agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, nil, desc)
}
// onDeleteFailure is a common callback for scenarios where we receive an error response following a delete request
// to an adapter and the only action required is to return the error response.
-func (agent *Agent) onDeleteFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
- if res, ok := response.(error); ok {
- logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
- } else {
- logger.Errorw(ctx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
- }
+func (agent *Agent) onDeleteFailure(ctx context.Context, err error, prevState, currState *common.AdminState_Types) {
+ logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": coreutils.GetRPCMetadataFromContext(ctx), "device-id": agent.deviceID, "error": err})
+
//Only updating of transient state is required, no transition.
- if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
- logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID})
+ if er := agent.updateTransientState(ctx, core.DeviceTransientState_DELETE_FAILED); er != nil {
+ logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID, "error": er})
}
+ rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
-}
-
-func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
- onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
- defer cancel()
- var rpce *voltha.RPCEvent
- defer func() {
- if rpce != nil {
- agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
- voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
- }
- }()
- select {
- case rpcResponse, ok := <-ch:
- if !ok {
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
- onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
- //add failure
- } else if rpcResponse.Err != nil {
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
- onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
- //add failure
- } else {
- onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
- }
- case <-ctx.Done():
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
- onFailure(ctx, rpc, ctx.Err(), reqArgs)
- }
-}
-
-func (agent *Agent) waitForAdapterResponseAndLogDeviceUpdate(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
- onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, prevState *common.AdminState_Types, reqArgs ...interface{}) {
- defer cancel()
- var desc string
- operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
- defer func() {
- currAdminState := prevState
- if d, _ := agent.getDeviceReadOnly(ctx); d != nil {
- currAdminState = &d.AdminState
- }
- agent.logDeviceUpdate(ctx, rpc, prevState, currAdminState, operStatus, &desc)
- }()
- var rpce *voltha.RPCEvent
- defer func() {
- if rpce != nil {
- agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
- voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
- }
- }()
-
- select {
- case rpcResponse, ok := <-ch:
- if !ok {
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
- onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
- //add failure
- } else if rpcResponse.Err != nil {
- desc = rpcResponse.Err.Error()
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
- onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
- //add failure
- } else {
- operStatus.Code = common.OperationResp_OPERATION_SUCCESS
- onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
- }
- case <-ctx.Done():
- desc = ctx.Err().Error()
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
- onFailure(ctx, rpc, ctx.Err(), reqArgs)
- }
+ // Log the device update event
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ desc := "adapter-response"
+ agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, err, desc)
}
// getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever.
@@ -390,117 +326,128 @@
return proto.Clone(agent.device).(*voltha.Device)
}
+func (agent *Agent) updateDeviceTypeAndEndpoint(ctx context.Context) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ changed := false
+ cloned := agent.cloneDeviceWithoutLock()
+ if cloned.Type == "" {
+ adapterType, err := agent.adapterMgr.GetAdapterType(cloned.Type)
+ if err != nil {
+ agent.requestQueue.RequestComplete()
+ return err
+ }
+ cloned.Type = adapterType
+ changed = true
+ }
+
+ if cloned.AdapterEndpoint == "" {
+ var err error
+ if cloned.AdapterEndpoint, err = agent.adapterMgr.GetAdapterEndpoint(ctx, cloned.Id, cloned.Type); err != nil {
+ agent.requestQueue.RequestComplete()
+ return err
+ }
+ agent.adapterEndpoint = cloned.AdapterEndpoint
+ changed = true
+ }
+
+ if changed {
+ return agent.updateDeviceAndReleaseLock(ctx, cloned)
+ }
+ agent.requestQueue.RequestComplete()
+ return nil
+}
+
// enableDevice activates a preprovisioned or a disable device
func (agent *Agent) enableDevice(ctx context.Context) error {
//To preserve and use oldDevice state as prev state in new device
+ var err error
var desc string
- operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ var prevAdminState, currAdminState common.AdminState_Types
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
- defer agent.logDeviceUpdate(ctx, "enableDevice", nil, nil, operStatus, &desc)
+ defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, requestStatus, err, desc) }()
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})
- prevDeviceState := agent.device.AdminState
-
oldDevice := agent.getDeviceReadOnlyWithoutLock()
+ prevAdminState = oldDevice.AdminState
if !agent.proceedWithRequest(oldDevice) {
agent.requestQueue.RequestComplete()
-
- desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconcile is in progress/failed.", agent.deviceID)
- return status.Error(codes.FailedPrecondition, desc)
+ err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed: %s", agent.deviceID)
+ return err
}
if oldDevice.AdminState == voltha.AdminState_ENABLED {
logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
- return status.Error(codes.FailedPrecondition, desc)
- }
-
- // First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
- // pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
- // with the adapter then we need to know the adapter that will handle this request
- adapterName, err := agent.adapterMgr.GetAdapterType(oldDevice.Type)
- if err != nil {
- agent.requestQueue.RequestComplete()
- desc = err.Error()
+ err = status.Errorf(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id))
return err
}
+ // Verify that an adapter type can be resolved for this device type; the resolved value itself is not needed here
+ _, err = agent.adapterMgr.GetAdapterType(oldDevice.Type)
+ if err != nil {
+ agent.requestQueue.RequestComplete()
+ return err
+ }
+
+ // Update the device adapter endpoint if not set. It is set once by the Core and used as is by the adapters. E.g., if this is a
+ // child device then the parent adapter will use this device's adapter endpoint (set here) to communicate with it.
newDevice := agent.cloneDeviceWithoutLock()
- newDevice.Adapter = adapterName
+ if newDevice.AdapterEndpoint == "" {
+ if newDevice.AdapterEndpoint, err = agent.adapterMgr.GetAdapterEndpoint(ctx, newDevice.Id, newDevice.Type); err != nil {
+ agent.requestQueue.RequestComplete()
+ return err
+ }
+ agent.adapterEndpoint = newDevice.AdapterEndpoint
+ }
// Update the Admin State and set the operational state to activating before sending the request to the Adapters
newDevice.AdminState = voltha.AdminState_ENABLED
newDevice.OperStatus = voltha.OperStatus_ACTIVATING
- if err := agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
- desc = err.Error()
- return err
- }
-
// Adopt the device if it was in pre-provision state. In all other cases, try to re-enable it.
- var ch chan *kafka.RpcResponse
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
- subCtx = coreutils.WithFromTopicMetadataFromContext(subCtx, ctx)
-
- if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
- ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
- } else {
- ch, err = agent.adapterProxy.ReEnableDevice(subCtx, newDevice)
- }
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
- cancel()
- desc = err.Error()
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": newDevice.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
return err
}
-
- operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
-
- // Wait for response
- go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
- return nil
-}
-
-func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse, response coreutils.Response) {
- defer cancel()
- var desc string
- operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
- defer agent.logDeviceUpdate(ctx, rpc, nil, nil, operStatus, &desc)
-
- var rpce *voltha.RPCEvent
- defer func() {
- if rpce != nil {
- agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
- voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
+ subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+ requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go func() {
+ defer cancel()
+ var err error
+ if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
+ _, err = client.AdoptDevice(subCtx, newDevice)
+ } else {
+ _, err = client.ReEnableDevice(subCtx, newDevice)
+ }
+ if err == nil {
+ agent.onSuccess(subCtx, nil, nil, true)
+ } else {
+ agent.onFailure(subCtx, err, nil, nil, true)
}
}()
- select {
- case rpcResponse, ok := <-ch:
- if !ok {
- //add failure
- desc = "Response Channel Closed"
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
- response.Error(status.Errorf(codes.Aborted, "channel-closed"))
- } else if rpcResponse.Err != nil {
- //add failure
- desc = rpcResponse.Err.Error()
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
- response.Error(rpcResponse.Err)
- } else {
- operStatus.Code = common.OperationResp_OPERATION_SUCCESS
- response.Done()
- }
- case <-ctx.Done():
- desc = ctx.Err().Error()
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
- response.Error(ctx.Err())
+
+ // Update device
+ if err = agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
+ return err
}
+ currAdminState = newDevice.AdminState
+ return nil
}
//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
@@ -516,8 +463,8 @@
if grpResponse, err = agent.addGroupsToAdapter(ctx, newGroups, flowMetadata); err != nil {
return err
}
- if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); errs != nil {
- logger.Warnw(ctx, "no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
+ if errs := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); errs != nil {
+ logger.Warnw(ctx, "adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
}
return nil
@@ -535,7 +482,7 @@
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -553,7 +500,7 @@
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -561,73 +508,81 @@
//disableDevice disable a device
func (agent *Agent) disableDevice(ctx context.Context) error {
+ var err error
var desc string
- operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ var prevAdminState, currAdminState common.AdminState_Types
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, requestStatus, err, desc) }()
- prevDeviceState := agent.device.AdminState
-
- defer agent.logDeviceUpdate(ctx, "disableDevice", nil, nil, operStatus, &desc)
-
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- desc = err.Error()
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
cloned := agent.cloneDeviceWithoutLock()
+ prevAdminState = agent.device.AdminState
if !agent.proceedWithRequest(cloned) {
+ err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed: %s", agent.deviceID)
agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("deviceId:%s,Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.", agent.deviceID)
- return status.Errorf(codes.FailedPrecondition, desc)
+ return err
}
if cloned.AdminState == voltha.AdminState_DISABLED {
desc = "device-already-disabled"
- logger.Debugw(ctx, "device-already-disabled", log.Fields{"device-id": agent.deviceID})
agent.requestQueue.RequestComplete()
return nil
}
if cloned.AdminState == voltha.AdminState_PREPROVISIONED {
agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
- return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
+ err = status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
+ return err
}
// Update the Admin State and operational state before sending the request out
cloned.AdminState = voltha.AdminState_DISABLED
cloned.OperStatus = voltha.OperStatus_UNKNOWN
- if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
- return err
- }
-
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
- ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
- cancel()
- desc = err.Error()
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
return err
}
- operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+ requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go func() {
+ defer cancel()
+ _, err := client.DisableDevice(subCtx, cloned)
+ if err == nil {
+ agent.onSuccess(subCtx, nil, nil, true)
+ } else {
+ agent.onFailure(subCtx, err, nil, nil, true)
+ }
+ }()
- // Wait for response
- go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
+ // Update device
+ if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+ return err
+ }
+ currAdminState = cloned.AdminState
return nil
}
func (agent *Agent) rebootDevice(ctx context.Context) error {
var desc string
- operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ var err error
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
- prevDeviceState := agent.device.AdminState
-
- defer agent.logDeviceUpdate(ctx, "rebootDevice", nil, nil, operStatus, &desc)
-
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
desc = err.Error()
return err
}
@@ -637,22 +592,32 @@
device := agent.getDeviceReadOnlyWithoutLock()
if !agent.proceedWithRequest(device) {
- desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.", agent.deviceID)
- return status.Errorf(codes.FailedPrecondition, desc)
- }
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
- ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
- if err != nil {
- cancel()
- desc = err.Error()
+ err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed:%s", agent.deviceID)
return err
}
- operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
- // Wait for response
- go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": device.AdapterEndpoint,
+ })
+ return err
+ }
+ subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+ requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go func() {
+ defer cancel()
+ _, err := client.RebootDevice(subCtx, device)
+ if err == nil {
+ agent.onSuccess(subCtx, nil, nil, true)
+ } else {
+ agent.onFailure(subCtx, err, nil, nil, true)
+ }
+ }()
return nil
}
@@ -660,50 +625,54 @@
logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
var desc string
- operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ var err error
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
// Get the device Transient state, return err if it is DELETING
previousDeviceTransientState := agent.getTransientState()
-
- if agent.isStateDeleting(previousDeviceTransientState) {
- agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("deviceId:%s, Device Deletion is in progress",
- agent.deviceID)
- agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
- return status.Error(codes.FailedPrecondition, desc)
- }
-
- //Send stop Reconcile if in progress
- agent.stopReconcile()
-
device := agent.cloneDeviceWithoutLock()
- if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
- voltha.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
+ if !agent.isForceDeletingAllowed(previousDeviceTransientState, device) {
+ agent.requestQueue.RequestComplete()
+ err = status.Error(codes.FailedPrecondition, fmt.Sprintf("deviceId:%s, force deletion is in progress", agent.deviceID))
return err
}
- previousAdminState := device.AdminState
- if previousAdminState != ic.AdminState_PREPROVISIONED {
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
- ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
+ previousAdminState := device.AdminState
+ if previousAdminState != common.AdminState_PREPROVISIONED {
+ var client adapter_services.AdapterServiceClient
+ client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
- cancel()
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": device.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
return err
}
- // As force delete will not be dependent over the response of adapter, marking this operation as success
- operStatus.Code = common.OperationResp_OPERATION_SUCCESS
- agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
- // Since it is a case of force delete, nothing needs to be done on adapter responses.
- go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
- agent.onFailure)
+ subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+ requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go func() {
+ defer cancel()
+ _, err := client.DeleteDevice(subCtx, device)
+ if err == nil {
+ agent.onSuccess(subCtx, nil, nil, true)
+ } else {
+ agent.onFailure(subCtx, err, nil, nil, true)
+ }
+ }()
+ }
+
+ // Update device
+ if err = agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+ core.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
+ return err
}
return nil
}
@@ -712,12 +681,11 @@
logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
var desc string
- operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
- prevState := agent.device.AdminState
+ var err error
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
- defer agent.logDeviceUpdate(ctx, "deleteDevice", nil, nil, operStatus, &desc)
-
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
desc = err.Error()
return err
}
@@ -726,8 +694,8 @@
if !agent.proceedWithRequest(device) {
agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", agent.deviceID)
- return status.Error(codes.FailedPrecondition, desc)
+ err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed: %s", agent.deviceID)
+ return err
}
// Get the device Transient state, return err if it is DELETING
@@ -735,38 +703,48 @@
previousAdminState := device.AdminState
// Change the device transient state to DELETING_FROM_ADAPTER state till the device is removed from adapters.
- currentDeviceTransientState := voltha.DeviceTransientState_DELETING_FROM_ADAPTER
+ currentDeviceTransientState := core.DeviceTransientState_DELETING_FROM_ADAPTER
- if previousAdminState == ic.AdminState_PREPROVISIONED {
+ if previousAdminState == common.AdminState_PREPROVISIONED {
// Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device.
- currentDeviceTransientState = voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
+ currentDeviceTransientState = core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
}
- if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+ // If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
+ // adapter
+ if previousAdminState != common.AdminState_PREPROVISIONED {
+ var client adapter_services.AdapterServiceClient
+ client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": device.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
+ return err
+ }
+ subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+ requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go func() {
+ defer cancel()
+ _, err := client.DeleteDevice(subCtx, device)
+ if err == nil {
+ agent.onDeleteSuccess(subCtx, nil, nil)
+ } else {
+ agent.onDeleteFailure(subCtx, err, nil, nil)
+ }
+ }()
+ }
+
+ // Update device and release lock
+ if err = agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
currentDeviceTransientState, previousDeviceTransientState); err != nil {
desc = err.Error()
return err
}
- // If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
- // adapter
- if previousAdminState != ic.AdminState_PREPROVISIONED {
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
- ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
- if err != nil {
- cancel()
- //updating of transient state is required in error
- if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
- logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID})
- }
- desc = err.Error()
- return err
- }
-
- operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
- go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
- agent.onDeleteFailure, &prevState)
- }
return nil
}
@@ -789,67 +767,66 @@
if err != nil {
return nil, err
}
- ch, err := agent.adapterProxy.GetOfpDeviceInfo(ctx, device)
+
+ // Get the gRPC client
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
return nil, err
}
- // Wait for adapter response
- rpcResponse, ok := <-ch
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed")
- }
- if rpcResponse.Err != nil {
- return nil, rpcResponse.Err
- }
- // Successful response
- switchCap := &ic.SwitchCapability{}
- if err := ptypes.UnmarshalAny(rpcResponse.Reply, switchCap); err != nil {
- return nil, err
- }
- return switchCap, nil
+ return client.GetOfpDeviceInfo(ctx, device)
}
-func (agent *Agent) onPacketFailure(ctx context.Context, rpc string, response interface{}, args ...interface{}) {
- // packet data is encoded in the args param as the first parameter
- var packet []byte
- if len(args) >= 1 {
- if pkt, ok := args[0].([]byte); ok {
- packet = pkt
- }
- }
- var errResp error
- if err, ok := response.(error); ok {
- errResp = err
- }
- logger.Warnw(ctx, "packet-out-error", log.Fields{
+func (agent *Agent) onPacketFailure(ctx context.Context, err error, packet *ofp.OfpPacketOut) {
+ logger.Errorw(ctx, "packet-out-error", log.Fields{
"device-id": agent.deviceID,
- "error": errResp,
- "packet": hex.EncodeToString(packet),
+ "error": err.Error(),
+ "packet": hex.EncodeToString(packet.Data),
})
+ rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
}
func (agent *Agent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
- // If deviceType=="" then we must have taken ownership of this device.
- // Fixes VOL-2226 where a core would take ownership and have stale data
if agent.deviceType == "" {
agent.reconcileWithKVStore(ctx)
}
// Send packet to adapter
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
- ch, err := agent.adapterProxy.PacketOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
- cancel()
- return nil
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ })
+ return err
}
- go agent.waitForAdapterResponse(subCtx, cancel, "packetOut", ch, agent.onSuccess, agent.onPacketFailure, packet.Data)
+ subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+ go func() {
+ defer cancel()
+ _, err := client.SendPacketOut(subCtx, &ic.PacketOut{
+ DeviceId: agent.deviceID,
+ EgressPortNo: outPort,
+ Packet: packet,
+ })
+ if err == nil {
+ agent.onSuccess(subCtx, nil, nil, false)
+ } else {
+ agent.onPacketFailure(subCtx, err, packet)
+ }
+ }()
return nil
}
func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ var err error
+ var desc string
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id})
@@ -863,27 +840,40 @@
cloned.Vlan = device.Vlan
cloned.Reason = device.Reason
cloned.ImageDownloads = device.ImageDownloads
- return agent.updateDeviceAndReleaseLock(ctx, cloned)
+ cloned.OperStatus = device.OperStatus
+ cloned.ConnectStatus = device.ConnectStatus
+ if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err == nil {
+ requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
+ }
+ return err
}
func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ var err error
+ var desc string
+ opStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, opStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
cloned := agent.cloneDeviceWithoutLock()
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
- logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
+ logger.Debugw(ctx, "update-device-conn-status", log.Fields{"ok": ok, "val": s})
cloned.ConnectStatus = connStatus
}
if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
- logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
+ logger.Debugw(ctx, "update-device-oper-status", log.Fields{"ok": ok, "val": s})
cloned.OperStatus = operStatus
}
logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": cloned.Id, "oper-status": cloned.OperStatus, "connect-status": cloned.ConnectStatus})
// Store the device
- return agent.updateDeviceAndReleaseLock(ctx, cloned)
+ if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err == nil {
+ opStatus.Code = common.OperationResp_OPERATION_SUCCESS
+ }
+ return err
}
// TODO: A generic device update by attribute
@@ -926,7 +916,12 @@
}
func (agent *Agent) simulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ var err error
+ var desc string
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
defer agent.requestQueue.RequestComplete()
@@ -934,15 +929,28 @@
device := agent.getDeviceReadOnlyWithoutLock()
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
- ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
- cancel()
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": device.AdapterEndpoint,
+ })
return err
}
- go agent.waitForAdapterResponse(subCtx, cancel, "simulateAlarm", ch, agent.onSuccess, agent.onFailure)
+ subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+ requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go func() {
+ defer cancel()
+ _, err := client.SimulateAlarm(subCtx, &ic.SimulateAlarmMessage{Device: device, Request: simulateReq})
+ if err == nil {
+ agent.onSuccess(subCtx, nil, nil, false)
+ } else {
+ agent.onFailure(subCtx, err, nil, nil, false)
+ }
+ }()
return nil
}
@@ -990,7 +998,7 @@
// This function updates the device transient in the DB through loader, releases the device lock, and runs any state transitions.
// The calling function MUST hold the device lock. The caller MUST NOT modify the device after this is called.
func (agent *Agent) updateDeviceWithTransientStateAndReleaseLock(ctx context.Context, device *voltha.Device,
- transientState, prevTransientState voltha.DeviceTransientState_Types) error {
+ transientState, prevTransientState core.DeviceTransientState_Types) error {
// fail early if this agent is no longer valid
if agent.stopped {
agent.requestQueue.RequestComplete()
@@ -1004,9 +1012,10 @@
// update in db
if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
//Reverting TransientState update
- err := agent.updateTransientState(ctx, prevTransientState)
- logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
- "previous-transient-state": prevTransientState, "current-transient-state": transientState})
+ if errTransient := agent.updateTransientState(ctx, prevTransientState); errTransient != nil {
+ logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
+ "previous-transient-state": prevTransientState, "current-transient-state": transientState, "error": errTransient})
+ }
agent.requestQueue.RequestComplete()
return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
@@ -1023,44 +1032,47 @@
// release lock before processing transition
agent.requestQueue.RequestComplete()
- subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
- device, prevDevice, transientState, prevTransientState); err != nil {
- logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
- // Sending RPC EVENT here
- rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
- agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
- nil, time.Now().Unix())
- }
+ go func() {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
+ device, prevDevice, transientState, prevTransientState); err != nil {
+ logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+ // Sending RPC EVENT here
+ rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().Unix())
+ }
+ }()
return nil
}
func (agent *Agent) updateDeviceReason(ctx context.Context, reason string) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
+
+ var err error
+ var desc string
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
-
- var desc string
- operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
-
- defer agent.logDeviceUpdate(ctx, "updateDeviceReason", nil, nil, operStatus, &desc)
-
cloned := agent.cloneDeviceWithoutLock()
cloned.Reason = reason
- retErr := agent.updateDeviceAndReleaseLock(ctx, cloned)
- if retErr != nil {
- desc = retErr.Error()
- } else {
- operStatus.Code = common.OperationResp_OPERATION_SUCCESS
- desc = reason
+ if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err == nil {
+ requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
}
- return retErr
+ return err
}
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
+ var err error
+ var desc string
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
// Remove the associated peer ports on the parent device
for portID := range agent.portLoader.ListIDs() {
if portHandle, have := agent.portLoader.Lock(portID); have {
@@ -1082,154 +1094,199 @@
}
//send request to adapter
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
- ch, err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, device)
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
- cancel()
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": agent.adapterEndpoint,
+ })
return err
}
- go agent.waitForAdapterResponse(subCtx, cancel, "childDeviceLost", ch, agent.onSuccess, agent.onFailure)
+ subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+ requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go func() {
+ defer cancel()
+ _, err := client.ChildDeviceLost(subCtx, device)
+ if err == nil {
+ agent.onSuccess(subCtx, nil, nil, true)
+ } else {
+ agent.onFailure(subCtx, err, nil, nil, true)
+ }
+ }()
return nil
}
func (agent *Agent) startOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return nil, err
- }
+ var err error
+ var desc string
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
- cloned := agent.cloneDeviceWithoutLock()
-
- if cloned.Adapter == "" {
- adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
- if err != nil {
- agent.requestQueue.RequestComplete()
- return nil, err
- }
- cloned.Adapter = adapterName
- }
-
- // Send request to the adapter
- ch, err := agent.adapterProxy.StartOmciTest(ctx, cloned, omcitestrequest)
- agent.requestQueue.RequestComplete()
+ // OMCI test may be performed on a pre-provisioned device. If a device is in that state both its device type and endpoint
+ // may not have been set yet.
+ // First check if we need to update the type or endpoint
+ cloned, err := agent.getDeviceReadOnly(ctx)
if err != nil {
return nil, err
}
-
- // Wait for the adapter response
- rpcResponse, ok := <-ch
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
- }
- if rpcResponse.Err != nil {
- return nil, rpcResponse.Err
+ if cloned.Type == "" || cloned.AdapterEndpoint == "" {
+ if err = agent.updateDeviceTypeAndEndpoint(ctx); err != nil {
+ return nil, err
+ }
+ cloned, err = agent.getDeviceReadOnly(ctx)
+ if err != nil {
+ return nil, err
+ }
}
- // Unmarshal and return the response
- testResp := &voltha.TestResponse{}
- if err := ptypes.UnmarshalAny(rpcResponse.Reply, testResp); err != nil {
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ // Send request to the adapter
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ return nil, err
}
- logger.Debugw(ctx, "omci_test_request-success-device-agent", log.Fields{"test-resp": testResp})
- return testResp, nil
+
+ res, err := client.StartOmciTest(ctx, &ic.OMCITest{
+ Device: cloned,
+ Request: omcitestrequest,
+ })
+ if err == nil {
+ requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
+ }
+ return res, err
}
func (agent *Agent) getExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, valueparam *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
logger.Debugw(ctx, "get-ext-value", log.Fields{"device-id": agent.deviceID, "onu-id": valueparam.Id, "value-type": valueparam.Value})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ var err error
+ var desc string
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- //send request to adapter
- ch, err := agent.adapterProxy.GetExtValue(ctx, pdevice, cdevice, valueparam.Id, valueparam.Value)
- agent.requestQueue.RequestComplete()
+ //send request to adapter synchronously
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, pdevice.AdapterEndpoint)
if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": pdevice.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
return nil, err
}
- // Wait for the adapter response
- rpcResponse, ok := <-ch
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
- }
- if rpcResponse.Err != nil {
- return nil, rpcResponse.Err
- }
+ // Release lock before sending to adapter
+ agent.requestQueue.RequestComplete()
- // Unmarshal and return the response
- Resp := &voltha.ReturnValues{}
- if err := ptypes.UnmarshalAny(rpcResponse.Reply, Resp); err != nil {
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ retVal, err := client.GetExtValue(ctx, &ic.GetExtValueMessage{
+ ParentDevice: pdevice,
+ ChildDevice: cdevice,
+ ValueType: valueparam.Value,
+ })
+ if err == nil {
+ requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
}
- logger.Debugw(ctx, "get-ext-value-success-device-agent", log.Fields{"Resp": Resp})
- return Resp, nil
+ return retVal, err
}
func (agent *Agent) setExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (*empty.Empty, error) {
logger.Debugw(ctx, "set-ext-value", log.Fields{"device-id": value.Id})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+
+ var err error
+ var desc string
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
//send request to adapter
- ch, err := agent.adapterProxy.SetExtValue(ctx, device, value)
- agent.requestQueue.RequestComplete()
+ //send request to adapter synchronously
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": device.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
return nil, err
}
+ // Release lock before sending request to adapter
+ agent.requestQueue.RequestComplete()
- // Wait for the adapter response
- rpcResponse, ok := <-ch
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+ retVal, err := client.SetExtValue(ctx, &ic.SetExtValueMessage{
+ Device: device,
+ Value: value,
+ })
+ if err == nil {
+ requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
}
- if rpcResponse.Err != nil {
- return nil, rpcResponse.Err
- }
-
- // Unmarshal and return the response
- logger.Debug(ctx, "set-ext-value-success-device-agent")
- return &empty.Empty{}, nil
+ return retVal, err
}
func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
logger.Debugw(ctx, "get-single-value", log.Fields{"device-id": request.TargetId})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ var err error
+ var desc string
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
cloned := agent.cloneDeviceWithoutLock()
//send request to adapter
- ch, err := agent.adapterProxy.GetSingleValue(ctx, cloned.Adapter, request)
- agent.requestQueue.RequestComplete()
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": cloned.Id,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
return nil, err
}
+ // Release lock before sending request to adapter
+ agent.requestQueue.RequestComplete()
- // Wait for the adapter response
- rpcResponse, ok := <-ch
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+ resp, err := client.GetSingleValue(ctx, request)
+ if err == nil {
+ requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
}
-
- if rpcResponse.Err != nil {
- return nil, rpcResponse.Err
- }
-
- resp := &extension.SingleGetValueResponse{}
- if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
-
- return resp, nil
+ return resp, err
}
func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
logger.Debugw(ctx, "set-single-value", log.Fields{"device-id": request.TargetId})
+ var err error
+ var desc string
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
+
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -1237,28 +1294,26 @@
cloned := agent.cloneDeviceWithoutLock()
//send request to adapter
- ch, err := agent.adapterProxy.SetSingleValue(ctx, cloned.Adapter, request)
- agent.requestQueue.RequestComplete()
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
return nil, err
}
+ // Release lock before sending request to adapter
+ agent.requestQueue.RequestComplete()
- // Wait for the adapter response
- rpcResponse, ok := <-ch
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed-cloned-id-%s", agent.deviceID)
+ resp, err := client.SetSingleValue(ctx, request)
+ if err == nil {
+ requestStatus.Code = common.OperationResp_OPERATION_SUCCESS
}
-
- if rpcResponse.Err != nil {
- return nil, rpcResponse.Err
- }
-
- resp := &extension.SingleSetValueResponse{}
- if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
-
- return resp, nil
+ return resp, err
}
func (agent *Agent) proceedWithRequest(device *voltha.Device) bool {
@@ -1273,32 +1328,66 @@
agent.stopReconcilingMutex.Unlock()
}
-func (agent *Agent) ReconcileDevice(ctx context.Context, device *voltha.Device) {
+// abortAllProcessing is invoked when an adapter managing this device is restarted.
+// Once the request queue gives the green light it stops any reconcile in progress and
+// resets the transient state: an in-flight delete becomes DELETE_FAILED, any other
+// state becomes NONE. It returns the queue or transient-state update error, if any.
+func (agent *Agent) abortAllProcessing(ctx context.Context) error {
+ logger.Infow(ctx, "aborting-current-running-requests", log.Fields{"device-id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ // If any reconciling is in progress just abort it. The adapter is gone.
+ agent.stopReconcile()
+
+ // Update the Core device transient state accordingly
+ var updatedState core.DeviceTransientState_Types
+ switch agent.getTransientState() {
+ case core.DeviceTransientState_FORCE_DELETING, core.DeviceTransientState_DELETING_FROM_ADAPTER:
+ updatedState = core.DeviceTransientState_DELETE_FAILED
+ default:
+ // covers RECONCILE_IN_PROGRESS as well as every other state
+ updatedState = core.DeviceTransientState_NONE
+ }
+ if err := agent.updateTransientState(ctx, updatedState); err != nil {
+ logger.Errorw(ctx, "transient-state-update-failed", log.Fields{"error": err})
+ return err
+ }
+ return nil
+}
+
+func (agent *Agent) ReconcileDevice(ctx context.Context) {
+ requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
var desc string
- operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+ agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
+ return
+ }
+
+ device := agent.getDeviceReadOnlyWithoutLock()
+ if device.AdminState == voltha.AdminState_PREPROVISIONED {
+ agent.requestQueue.RequestComplete()
+ logger.Debugw(ctx, "device-in-preprovisioning-state-reconcile-not-needed", log.Fields{"device-id": device.Id})
return
}
if !agent.proceedWithRequest(device) {
agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
- logger.Errorf(ctx, desc)
- agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+ err := fmt.Errorf("cannot complete operation as device deletion/reconciling is in progress or reconcile failed for device : %s", device.Id)
+ logger.Errorw(ctx, "reconcile-failed", log.Fields{"error": err})
+ agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
return
}
//set transient state to RECONCILE IN PROGRESS
- err := agent.updateTransientState(ctx, voltha.DeviceTransientState_RECONCILE_IN_PROGRESS)
+ err := agent.updateTransientState(ctx, core.DeviceTransientState_RECONCILE_IN_PROGRESS)
if err != nil {
agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("Not able to set device transient state to Reconcile in progress."+
- "Err: %s", err.Error())
- logger.Errorf(ctx, desc)
- agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+ logger.Errorw(ctx, "setting-transient-state-failed", log.Fields{"error": err})
+ agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
return
}
@@ -1343,43 +1432,35 @@
backoffTimer = time.NewTimer(duration)
- logger.Debugw(ctx, "retrying-reconciling", log.Fields{"deviceID": device.Id})
- // Send a reconcile request to the adapter.
- ch, err := agent.adapterProxy.ReconcileDevice(ctx, agent.device)
- //release lock before moving further
+ logger.Debugw(ctx, "retrying-reconciling", log.Fields{"deviceID": device.Id, "endpoint": device.AdapterEndpoint})
+ // Release lock before sending request to adapter
agent.requestQueue.RequestComplete()
+
+ // Send a reconcile request to the adapter.
+ err := agent.sendReconcileRequestToAdapter(ctx, device)
+ if errors.Is(err, errContextExpired) || errors.Is(err, errReconcileAborted) {
+ logger.Errorw(ctx, "reconcile-aborted", log.Fields{"error": err})
+ requestStatus = &common.OperationResp{Code: common.OperationResp_OperationReturnCode(common.OperStatus_FAILED)}
+ desc = "aborted"
+ agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
+ break retry
+ }
if err != nil {
- desc := fmt.Sprintf("Failed reconciling from adapter side. Err: %s", err.Error())
- agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+ agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
<-backoffTimer.C
// backoffTimer expired continue
// Take lock back before retrying
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+ agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
break retry
}
continue
}
-
- // if return err retry if not then break loop and quit retrying reconcile
- if err = agent.waitForReconcileResponse(backoffTimer, ch); err != nil {
- desc = err.Error()
- logger.Errorf(ctx, desc)
- agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
- <-backoffTimer.C
- } else {
- operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_IN_PROGRESS}
- agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
- break retry
- }
-
- // Take lock back before retrying
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
- break retry
- }
+ // Success
+ requestStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
+ desc = "adapter-response"
+ agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
+ break retry
}
// Retry loop is broken, so stop any timers and drain the channel
@@ -1400,18 +1481,23 @@
}
}
-func (agent *Agent) waitForReconcileResponse(backoffTimer *time.Timer, ch chan *kafka.RpcResponse) error {
+// sendReconcileRequestToAdapter looks up the client for this agent's adapter
+// endpoint, issues the ReconcileDevice RPC in a separate goroutine and waits for
+// whichever comes first: the adapter's answer, a stop-reconcile notification, or
+// ctx being done. It returns the client-lookup or adapter error as is, an error
+// wrapping errReconcileAborted when the reconcile is stopped, or an error
+// wrapping errContextExpired when ctx is done.
+func (agent *Agent) sendReconcileRequestToAdapter(ctx context.Context, device *voltha.Device) error {
+ logger.Debugw(ctx, "sending-reconcile-to-adapter", log.Fields{"device-id": device.Id, "endpoint": agent.adapterEndpoint})
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ return err
+ }
+ adapterResponse := make(chan error, 1) // buffered: lets the goroutine finish its send even when another select case already returned
+ go func() {
+ _, err := client.ReconcileDevice(ctx, device)
+ adapterResponse <- err
+ }()
select {
// wait for response
- case resp, ok := <-ch:
- if !ok {
- //channel-closed
- return errors.New("channel on which reconcile response is awaited is closed")
- } else if resp.Err != nil {
- //error encountered
- return fmt.Errorf("error encountered while retrying reconcile. Err: %s", resp.Err.Error())
+ case err := <-adapterResponse:
+ if err != nil {
+ return err
}
-
//In case of success quit retrying and wait for adapter to reset operation state of device
agent.stopReconcilingMutex.Lock()
agent.stopReconciling = nil
@@ -1425,33 +1511,47 @@
agent.stopReconcilingMutex.Unlock()
if !ok {
//channel-closed
- return errors.New("channel used to notify to stop reconcile is closed")
+ return fmt.Errorf("reconcile channel closed:%w", errReconcileAborted)
}
- return nil
- //continue if timer expired
- case <-backoffTimer.C:
+ return fmt.Errorf("reconciling aborted:%w", errReconcileAborted)
+ // Context expired
+ case <-ctx.Done():
+ return fmt.Errorf("context expired:%s :%w", ctx.Err(), errContextExpired)
}
- return nil
}
+// reconcilingCleanup takes a turn on the request queue and resets the device
+// transient state to NONE. The outcome (status, error and description) is
+// reported once, on return, through the deferred logDeviceUpdate call.
func (agent *Agent) reconcilingCleanup(ctx context.Context) error {
var desc string
+ var err error
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ desc = "reconcile-cleanup-failed"
return err
}
defer agent.requestQueue.RequestComplete()
- err := agent.updateTransientState(ctx, voltha.DeviceTransientState_NONE)
+ err = agent.updateTransientState(ctx, core.DeviceTransientState_NONE)
if err != nil {
- desc = fmt.Sprintf("Not able to clear device transient state from Reconcile in progress."+
- "Err: %s", err.Error())
- logger.Errorf(ctx, desc)
- agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+ logger.Errorw(ctx, "transient-state-update-failed", log.Fields{"error": err}) // Errorw: message has no format verbs and carries structured fields
return err
}
- operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
- agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
return nil
}
+
+// isAdapterConnectionUp reports whether the adapter manager returns a non-nil
+// client, without error, for this agent's adapter endpoint.
+func (agent *Agent) isAdapterConnectionUp(ctx context.Context) bool {
+ client, lookupErr := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if lookupErr != nil {
+ return false
+ }
+ return client != nil
+}
+
+// canDeviceRequestProceed takes a turn on the request queue and asks
+// proceedWithRequest whether the agent's device can accept a request.
+// It returns nil when the request may proceed, the queue error when the wait
+// fails, and a "device-cannot-process-request" error otherwise.
+func (agent *Agent) canDeviceRequestProceed(ctx context.Context) error {
+ if queueErr := agent.requestQueue.WaitForGreenLight(ctx); queueErr != nil {
+ return queueErr
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ if !agent.proceedWithRequest(agent.device) {
+ return fmt.Errorf("device-cannot-process-request-%s", agent.deviceID)
+ }
+ return nil
+}