| /* |
| * Copyright 2018-2023 Open Networking Foundation (ONF) and the ONF Contributors |
| |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package device |
| |
| import ( |
| "context" |
| "encoding/hex" |
| "errors" |
| "fmt" |
| "reflect" |
| "sync" |
| "time" |
| |
| "github.com/opencord/voltha-protos/v5/go/adapter_service" |
| "github.com/opencord/voltha-protos/v5/go/core" |
| "github.com/opencord/voltha-protos/v5/go/omci" |
| |
| "github.com/cenkalti/backoff/v3" |
| "github.com/gogo/protobuf/proto" |
| "github.com/golang/protobuf/ptypes/empty" |
| "github.com/opencord/voltha-go/rw_core/config" |
| "github.com/opencord/voltha-go/rw_core/utils" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| |
| "github.com/opencord/voltha-go/db/model" |
| "github.com/opencord/voltha-go/rw_core/core/adapter" |
| "github.com/opencord/voltha-go/rw_core/core/device/flow" |
| "github.com/opencord/voltha-go/rw_core/core/device/group" |
| "github.com/opencord/voltha-go/rw_core/core/device/port" |
| "github.com/opencord/voltha-go/rw_core/core/device/transientstate" |
| coreutils "github.com/opencord/voltha-go/rw_core/utils" |
| "github.com/opencord/voltha-lib-go/v7/pkg/log" |
| "github.com/opencord/voltha-protos/v5/go/common" |
| ca "github.com/opencord/voltha-protos/v5/go/core_adapter" |
| "github.com/opencord/voltha-protos/v5/go/extension" |
| ofp "github.com/opencord/voltha-protos/v5/go/openflow_13" |
| "github.com/opencord/voltha-protos/v5/go/voltha" |
| ) |
| |
// Sentinel errors declared for the device agent.
var (
	errReconcileAborted = errors.New("reconcile aborted")
	errContextExpired   = errors.New("context expired")
	errNoConnection     = errors.New("no connection")
)
| |
| // Agent represents device agent attributes |
| type Agent struct { |
| deviceID string |
| parentID string |
| deviceType string |
| adapterEndpoint string |
| isRootDevice bool |
| adapterMgr *adapter.Manager |
| deviceMgr *Manager |
| dbProxy *model.Proxy |
| exitChannel chan int |
| device *voltha.Device |
| requestQueue *coreutils.RequestQueue |
| internalTimeout time.Duration |
| rpcTimeout time.Duration |
| flowTimeout time.Duration |
| startOnce sync.Once |
| stopOnce sync.Once |
| stopped bool |
| stopReconciling chan int |
| stopReconcilingMutex sync.RWMutex |
| config *config.RWCoreFlags |
| |
| flowCache *flow.Cache |
| groupCache *group.Cache |
| portLoader *port.Loader |
| transientStateLoader *transientstate.Loader |
| } |
| |
| // newAgent creates a new device agent. The device will be initialized when start() is called. |
| func newAgent(device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, internalTimeout, rpcTimeout, flowTimeout time.Duration) *Agent { |
| deviceID := device.Id |
| if deviceID == "" { |
| deviceID = coreutils.CreateDeviceID() |
| } |
| |
| return &Agent{ |
| deviceID: deviceID, |
| isRootDevice: device.Root, |
| parentID: device.ParentId, |
| deviceType: device.Type, |
| adapterEndpoint: device.AdapterEndpoint, |
| deviceMgr: deviceMgr, |
| adapterMgr: deviceMgr.adapterMgr, |
| exitChannel: make(chan int, 1), |
| dbProxy: deviceProxy, |
| internalTimeout: internalTimeout, |
| rpcTimeout: rpcTimeout, |
| flowTimeout: flowTimeout, |
| device: proto.Clone(device).(*voltha.Device), |
| requestQueue: coreutils.NewRequestQueue(), |
| config: deviceMgr.config, |
| flowCache: flow.NewCache(), |
| groupCache: group.NewCache(), |
| portLoader: port.NewLoader(dbPath.SubPath("ports").Proxy(deviceID)), |
| transientStateLoader: transientstate.NewLoader(dbPath.SubPath("core").Proxy("transientstate"), deviceID), |
| } |
| } |
| |
| // start() saves the device to the data model and registers for callbacks on that device if deviceToCreate!=nil. |
| // Otherwise, it will load the data from the dB and setup the necessary callbacks and proxies. Returns the device that |
| // was started. |
| func (agent *Agent) start(ctx context.Context, deviceExist bool, deviceToCreate *voltha.Device) (*voltha.Device, error) { |
| needToStart := false |
| if agent.startOnce.Do(func() { needToStart = true }); !needToStart { |
| return agent.getDeviceReadOnly(ctx) |
| } |
| var startSucceeded bool |
| defer func() { |
| if !startSucceeded { |
| if err := agent.stop(ctx); err != nil { |
| logger.Errorw(ctx, "failed-to-cleanup-after-unsuccessful-start", log.Fields{"device-id": agent.deviceID, "error": err}) |
| } |
| } |
| }() |
| if deviceExist { |
| device := deviceToCreate |
| if device == nil { |
| // Load from dB |
| device = &voltha.Device{} |
| have, err := agent.dbProxy.Get(ctx, agent.deviceID, device) |
| if err != nil { |
| return nil, err |
| } else if !have { |
| return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID) |
| } |
| logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID, "adapter-endpoint": device.AdapterEndpoint, "type": device.Type}) |
| } |
| agent.deviceType = device.Type |
| agent.adapterEndpoint = device.AdapterEndpoint |
| agent.device = proto.Clone(device).(*voltha.Device) |
| // load the ports from KV to cache |
| agent.portLoader.Load(ctx) |
| agent.transientStateLoader.Load(ctx) |
| } else { |
| // Create a new device |
| var desc string |
| var err error |
| prevState := common.AdminState_UNKNOWN |
| currState := common.AdminState_UNKNOWN |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| |
| defer func() { agent.logDeviceUpdate(ctx, &prevState, &currState, requestStatus, err, desc) }() |
| |
| // Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this |
| // is a new device, so populate them here before passing the device to ldProxy.Set. |
| // agent.deviceId will also have been set during newAgent(). |
| device := (proto.Clone(deviceToCreate)).(*voltha.Device) |
| device.Id = agent.deviceID |
| device.AdminState = voltha.AdminState_PREPROVISIONED |
| currState = device.AdminState |
| if !deviceToCreate.GetRoot() && deviceToCreate.ProxyAddress != nil { |
| // Set the default vlan ID to the one specified by the parent adapter. It can be |
| // overwritten by the child adapter during a device update request |
| device.Vlan = deviceToCreate.ProxyAddress.ChannelId |
| } |
| |
| // Save the device to the model |
| if err = agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil { |
| err = status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err) |
| return nil, err |
| } |
| _ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, device.OperStatus, device.ConnectStatus, prevState, device, time.Now().Unix()) |
| requestStatus.Code = common.OperationResp_OPERATION_SUCCESS |
| agent.device = device |
| } |
| startSucceeded = true |
| log.EnrichSpan(ctx, log.Fields{"device-id": agent.deviceID}) |
| logger.Debugw(ctx, "device-agent-started", log.Fields{"device-id": agent.deviceID}) |
| |
| return agent.getDeviceReadOnly(ctx) |
| } |
| |
| // stop stops the device agent. Not much to do for now |
| func (agent *Agent) stop(ctx context.Context) error { |
| needToStop := false |
| if agent.stopOnce.Do(func() { needToStop = true }); !needToStop { |
| return nil |
| } |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| defer agent.requestQueue.RequestComplete() |
| |
| logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID}) |
| // Remove the device transient loader |
| if err := agent.deleteTransientState(ctx); err != nil { |
| return err |
| } |
| // Remove the device from the KV store |
| if err := agent.dbProxy.Remove(ctx, agent.deviceID); err != nil { |
| return err |
| } |
| //send the device event to the message bus |
| _ = agent.deviceMgr.Agent.SendDeviceDeletedEvent(ctx, agent.device, time.Now().Unix()) |
| |
| close(agent.exitChannel) |
| |
| agent.stopped = true |
| |
| logger.Infow(ctx, "device-agent-stopped", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID}) |
| |
| return nil |
| } |
| |
| // Load the most recent state from the KVStore for the device. |
| func (agent *Agent) reconcileWithKVStore(ctx context.Context) { |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "error": err}) |
| return |
| } |
| defer agent.requestQueue.RequestComplete() |
| logger.Debug(ctx, "reconciling-device-agent-devicetype") |
| // TODO: context timeout |
| device := &voltha.Device{} |
| if have, err := agent.dbProxy.Get(ctx, agent.deviceID, device); err != nil { |
| logger.Errorw(ctx, "kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err}) |
| return |
| } else if !have { |
| return // not found in kv |
| } |
| |
| agent.deviceType = device.Type |
| agent.device = device |
| agent.adapterEndpoint = device.AdapterEndpoint |
| agent.portLoader.Load(ctx) |
| agent.transientStateLoader.Load(ctx) |
| |
| logger.Debugw(ctx, "reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType}) |
| } |
| |
| // onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter |
| func (agent *Agent) onSuccess(ctx context.Context, prevState, currState *common.AdminState_Types, deviceUpdateLog bool) { |
| if deviceUpdateLog { |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS} |
| desc := "adapter-response" |
| agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, nil, desc) |
| return |
| } |
| logger.Debugw(ctx, "successful-operation", log.Fields{"device-id": agent.deviceID, "rpc": coreutils.GetRPCMetadataFromContext(ctx)}) |
| } |
| |
| // onFailure is a common callback for scenarios where we receive an error response following a request to an adapter |
| // and the only action required is to publish the failed result on kafka |
| func (agent *Agent) onFailure(ctx context.Context, err error, prevState, currState *common.AdminState_Types, deviceUpdateLog bool) { |
| // Send an event on kafka |
| rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil) |
| go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, |
| voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix()) |
| |
| // Log the device update event |
| if deviceUpdateLog { |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| desc := "adapter-response" |
| agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, err, desc) |
| return |
| } |
| logger.Errorw(ctx, "failed-operation", log.Fields{"error": err, "device-id": agent.deviceID, "rpc": coreutils.GetRPCMetadataFromContext(ctx)}) |
| } |
| |
| // onForceDeleteResponse is invoked following a force delete request to an adapter. |
| func (agent *Agent) onForceDeleteResponse(ctx context.Context, prevState, currState *common.AdminState_Types, dErr error) { |
| // Log the status |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS} |
| if dErr != nil { |
| requestStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| } |
| agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, dErr, "adapter-force-delete-response") |
| |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| logger.Errorw(ctx, "failed-getting-device-request-lock", log.Fields{"device-id": agent.deviceID, "error": err}) |
| } |
| previousDeviceTransientState := agent.getTransientState() |
| newDevice := agent.cloneDeviceWithoutLock() |
| |
| // Even on a delete error response, cleaup the device in the core |
| requestStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS} |
| err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, newDevice, |
| core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, previousDeviceTransientState) |
| if err != nil { |
| requestStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| } |
| agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, err, "transient-state-update") |
| } |
| |
| // onDeleteSuccess is a common callback for scenarios where we receive a nil response following a delete request |
| // to an adapter. |
| func (agent *Agent) onDeleteSuccess(ctx context.Context, prevState, currState *common.AdminState_Types) { |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err}) |
| } |
| previousDeviceTransientState := agent.getTransientState() |
| newDevice := agent.cloneDeviceWithoutLock() |
| if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, newDevice, |
| core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, previousDeviceTransientState); err != nil { |
| logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err}) |
| } |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS} |
| desc := "adapter-response" |
| agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, nil, desc) |
| } |
| |
| // onDeleteFailure is a common callback for scenarios where we receive an error response following a delete request |
| // |
| // to an adapter and the only action required is to return the error response. |
| func (agent *Agent) onDeleteFailure(ctx context.Context, err error, prevState, currState *common.AdminState_Types) { |
| logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": coreutils.GetRPCMetadataFromContext(ctx), "device-id": agent.deviceID, "error": err}) |
| |
| //Only updating of transient state is required, no transition. |
| if er := agent.updateTransientState(ctx, core.DeviceTransientState_DELETE_FAILED); er != nil { |
| logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID, "error": er}) |
| } |
| rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil) |
| go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, |
| voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix()) |
| |
| // Log the device update event |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| desc := "adapter-response" |
| agent.logDeviceUpdate(ctx, prevState, currState, requestStatus, err, desc) |
| } |
| |
| // getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever. |
| func (agent *Agent) getDeviceReadOnly(ctx context.Context) (*voltha.Device, error) { |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return nil, err |
| } |
| defer agent.requestQueue.RequestComplete() |
| return agent.device, nil |
| } |
| |
| // getDeviceReadOnlyWithoutLock returns a device which MUST NOT be modified, but is safe to keep forever. This is very efficient. |
| // The device lock MUST be held by the caller. |
| func (agent *Agent) getDeviceReadOnlyWithoutLock() *voltha.Device { |
| return agent.device |
| } |
| |
| // cloneDeviceWithoutLock returns a copy of the device which is safe to modify. |
| // The device lock MUST be held by the caller. |
| func (agent *Agent) cloneDeviceWithoutLock() *voltha.Device { |
| return proto.Clone(agent.device).(*voltha.Device) |
| } |
| |
| func (agent *Agent) updateDeviceTypeAndEndpoint(ctx context.Context) error { |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| changed := false |
| cloned := agent.cloneDeviceWithoutLock() |
| if cloned.Type == "" { |
| adapterType, err := agent.adapterMgr.GetAdapterType(cloned.Type) |
| if err != nil { |
| agent.requestQueue.RequestComplete() |
| return err |
| } |
| cloned.Type = adapterType |
| changed = true |
| } |
| |
| if cloned.AdapterEndpoint == "" { |
| var err error |
| if cloned.AdapterEndpoint, err = agent.adapterMgr.GetAdapterEndpoint(ctx, cloned.Id, cloned.Type); err != nil { |
| agent.requestQueue.RequestComplete() |
| return err |
| } |
| agent.adapterEndpoint = cloned.AdapterEndpoint |
| changed = true |
| } |
| |
| if changed { |
| return agent.updateDeviceAndReleaseLock(ctx, cloned) |
| } |
| agent.requestQueue.RequestComplete() |
| return nil |
| } |
| |
| // enableDevice activates a preprovisioned or a disable device |
| func (agent *Agent) enableDevice(ctx context.Context) error { |
| //To preserve and use oldDevice state as prev state in new device |
| var err error |
| var desc string |
| var prevAdminState, currAdminState common.AdminState_Types |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| |
| defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, requestStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID}) |
| |
| oldDevice := agent.getDeviceReadOnlyWithoutLock() |
| prevAdminState = oldDevice.AdminState |
| |
| if !agent.proceedWithRequest(oldDevice) { |
| agent.requestQueue.RequestComplete() |
| err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed: %s", agent.deviceID) |
| return err |
| } |
| //vol-4275 TST meeting 08/04/2021: Let EnableDevice to be called again if device is in FAILED operational state, |
| //even the admin state is ENABLED. |
| if oldDevice.AdminState == voltha.AdminState_ENABLED && oldDevice.OperStatus != voltha.OperStatus_FAILED { |
| logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID}) |
| agent.requestQueue.RequestComplete() |
| err = status.Errorf(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id)) |
| return err |
| } |
| |
| // Verify whether there is a device type that supports this device type |
| _, err = agent.adapterMgr.GetAdapterType(oldDevice.Type) |
| if err != nil { |
| agent.requestQueue.RequestComplete() |
| return err |
| } |
| |
| // Update device adapter endpoint if not set. This is set once by the Core and use as is by the adapters. E.g if this is a |
| // child device then the parent adapter will use this device's adapter endpoint (set here) to communicate with it. |
| newDevice := agent.cloneDeviceWithoutLock() |
| if newDevice.AdapterEndpoint == "" { |
| if newDevice.AdapterEndpoint, err = agent.adapterMgr.GetAdapterEndpoint(ctx, newDevice.Id, newDevice.Type); err != nil { |
| agent.requestQueue.RequestComplete() |
| return err |
| } |
| agent.adapterEndpoint = newDevice.AdapterEndpoint |
| } |
| |
| // Update the Admin State and set the operational state to activating before sending the request to the Adapters |
| newDevice.AdminState = voltha.AdminState_ENABLED |
| newDevice.OperStatus = voltha.OperStatus_ACTIVATING |
| |
| // Adopt the device if it was in pre-provision state. In all other cases, try to re-enable it. |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| "adapter-endpoint": newDevice.AdapterEndpoint, |
| }) |
| agent.requestQueue.RequestComplete() |
| return err |
| } |
| subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout) |
| requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS |
| go func() { |
| defer cancel() |
| var err error |
| if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED { |
| _, err = client.AdoptDevice(subCtx, newDevice) |
| } else { |
| _, err = client.ReEnableDevice(subCtx, newDevice) |
| } |
| if err == nil { |
| agent.onSuccess(subCtx, nil, nil, true) |
| } else { |
| agent.onFailure(subCtx, err, nil, nil, true) |
| } |
| }() |
| |
| // Update device |
| if err = agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil { |
| return err |
| } |
| currAdminState = newDevice.AdminState |
| return nil |
| } |
| |
| // addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the |
| // adapters |
| func (agent *Agent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *ofp.FlowMetadata) error { |
| var flwResponse, grpResponse coreutils.Response |
| var err error |
| //if new flow list is empty then the called function returns quickly |
| if flwResponse, err = agent.addFlowsToAdapter(ctx, newFlows, flowMetadata); err != nil { |
| return err |
| } |
| //if new group list is empty then the called function returns quickly |
| if grpResponse, err = agent.addGroupsToAdapter(ctx, newGroups, flowMetadata); err != nil { |
| return err |
| } |
| if errs := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flwResponse, grpResponse); errs != nil { |
| logger.Warnw(ctx, "adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs}) |
| return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID) |
| } |
| return nil |
| } |
| |
| // deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the |
| // adapters |
| func (agent *Agent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *ofp.FlowMetadata) error { |
| var flwResponse, grpResponse coreutils.Response |
| var err error |
| if flwResponse, err = agent.deleteFlowsFromAdapter(ctx, flowsToDel, flowMetadata); err != nil { |
| return err |
| } |
| if grpResponse, err = agent.deleteGroupsFromAdapter(ctx, groupsToDel, flowMetadata); err != nil { |
| return err |
| } |
| |
| if res := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flwResponse, grpResponse); res != nil { |
| return status.Errorf(codes.Aborted, "errors-%s", res) |
| } |
| return nil |
| } |
| |
| // updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It |
| // also sends the updates to the adapters |
| func (agent *Agent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *ofp.FlowMetadata) error { |
| var flwResponse, grpResponse coreutils.Response |
| var err error |
| if flwResponse, err = agent.updateFlowsToAdapter(ctx, updatedFlows, flowMetadata); err != nil { |
| return err |
| } |
| if grpResponse, err = agent.updateGroupsToAdapter(ctx, updatedGroups, flowMetadata); err != nil { |
| return err |
| } |
| |
| if res := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flwResponse, grpResponse); res != nil { |
| return status.Errorf(codes.Aborted, "errors-%s", res) |
| } |
| return nil |
| } |
| |
| // disableDevice disable a device |
| func (agent *Agent) disableDevice(ctx context.Context) error { |
| var err error |
| var desc string |
| var prevAdminState, currAdminState common.AdminState_Types |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, requestStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID}) |
| |
| cloned := agent.cloneDeviceWithoutLock() |
| prevAdminState = agent.device.AdminState |
| |
| if !agent.proceedWithRequest(cloned) { |
| err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress/failed: %s", agent.deviceID) |
| agent.requestQueue.RequestComplete() |
| return err |
| } |
| |
| if cloned.AdminState == voltha.AdminState_DISABLED { |
| desc = "device-already-disabled" |
| agent.requestQueue.RequestComplete() |
| return nil |
| } |
| if cloned.AdminState == voltha.AdminState_PREPROVISIONED { |
| agent.requestQueue.RequestComplete() |
| err = status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState) |
| return err |
| } |
| |
| // Update the Admin State and operational state before sending the request out |
| cloned.AdminState = voltha.AdminState_DISABLED |
| cloned.OperStatus = voltha.OperStatus_UNKNOWN |
| |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| "adapter-endpoint": cloned.AdapterEndpoint, |
| }) |
| agent.requestQueue.RequestComplete() |
| return err |
| } |
| subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout) |
| requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS |
| go func() { |
| defer cancel() |
| _, err := client.DisableDevice(subCtx, cloned) |
| if err == nil { |
| agent.onSuccess(subCtx, nil, nil, true) |
| } else { |
| agent.onFailure(subCtx, err, nil, nil, true) |
| } |
| }() |
| |
| // Update device |
| if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil { |
| return err |
| } |
| currAdminState = cloned.AdminState |
| |
| return nil |
| } |
| |
| func (agent *Agent) rebootDevice(ctx context.Context) error { |
| var desc string |
| var err error |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| desc = err.Error() |
| return err |
| } |
| defer agent.requestQueue.RequestComplete() |
| logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID}) |
| |
| device := agent.getDeviceReadOnlyWithoutLock() |
| |
| if !agent.proceedWithRequest(device) { |
| err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed:%s", agent.deviceID) |
| return err |
| } |
| |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| "adapter-endpoint": device.AdapterEndpoint, |
| }) |
| return err |
| } |
| subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout) |
| requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS |
| go func() { |
| defer cancel() |
| _, err := client.RebootDevice(subCtx, device) |
| if err == nil { |
| agent.onSuccess(subCtx, nil, nil, true) |
| } else { |
| agent.onFailure(subCtx, err, nil, nil, true) |
| } |
| }() |
| return nil |
| } |
| |
| func (agent *Agent) deleteDeviceForce(ctx context.Context) error { |
| logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID}) |
| |
| var desc string |
| var err error |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| // Get the device Transient state, return err if it is DELETING |
| previousDeviceTransientState := agent.getTransientState() |
| device := agent.cloneDeviceWithoutLock() |
| if !agent.isForceDeletingAllowed(previousDeviceTransientState, device) { |
| agent.requestQueue.RequestComplete() |
| err = status.Error(codes.FailedPrecondition, fmt.Sprintf("deviceId:%s, force deletion is in progress", agent.deviceID)) |
| return err |
| } |
| |
| previousAdminState := device.AdminState |
| if previousAdminState != common.AdminState_PREPROVISIONED { |
| var client adapter_service.AdapterServiceClient |
| client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| "adapter-endpoint": device.AdapterEndpoint, |
| }) |
| agent.requestQueue.RequestComplete() |
| return fmt.Errorf("remote-not-reachable %w", errNoConnection) |
| } |
| subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout) |
| requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS |
| go func() { |
| defer cancel() |
| _, err := client.DeleteDevice(subCtx, device) |
| if err == nil { |
| agent.onSuccess(subCtx, nil, nil, true) |
| } else { |
| agent.onFailure(subCtx, err, nil, nil, true) |
| } |
| }() |
| } |
| |
| // Update device |
| if err = agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, |
| core.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func (agent *Agent) deleteDevice(ctx context.Context) error { |
| logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID}) |
| |
| var desc string |
| var err error |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| desc = err.Error() |
| return err |
| } |
| |
| device := agent.cloneDeviceWithoutLock() |
| |
| if !agent.proceedWithRequest(device) { |
| agent.requestQueue.RequestComplete() |
| err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed: %s", agent.deviceID) |
| return err |
| } |
| |
| // Get the device Transient state, return err if it is DELETING |
| previousDeviceTransientState := agent.getTransientState() |
| |
| previousAdminState := device.AdminState |
| // Change the device transient state to DELETING_FROM_ADAPTER state till the device is removed from adapters. |
| currentDeviceTransientState := core.DeviceTransientState_DELETING_FROM_ADAPTER |
| |
| if previousAdminState == common.AdminState_PREPROVISIONED { |
| // Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device. |
| currentDeviceTransientState = core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE |
| } |
| // Update device and release lock |
| if err = agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, |
| currentDeviceTransientState, previousDeviceTransientState); err != nil { |
| desc = err.Error() |
| return err |
| } |
| // If the device was in pre-prov state (only parent device are in that state) then do not send the request to the |
| // adapter |
| if previousAdminState != common.AdminState_PREPROVISIONED { |
| var client adapter_service.AdapterServiceClient |
| client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| "adapter-endpoint": device.AdapterEndpoint, |
| }) |
| agent.requestQueue.RequestComplete() |
| return err |
| } |
| subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout) |
| requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS |
| if _, err = client.DeleteDevice(subCtx, device); err != nil { |
| agent.onDeleteFailure(subCtx, err, &previousAdminState, &agent.device.AdminState) |
| } else { |
| agent.onDeleteSuccess(subCtx, &previousAdminState, &agent.device.AdminState) |
| } |
| cancel() |
| } |
| return err |
| } |
| |
| func (agent *Agent) setParentID(ctx context.Context, device *voltha.Device, parentID string) error { |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID}) |
| |
| cloned := agent.cloneDeviceWithoutLock() |
| cloned.ParentId = parentID |
| return agent.updateDeviceAndReleaseLock(ctx, cloned) |
| } |
| |
| // getSwitchCapability retrieves the switch capability of a parent device |
| func (agent *Agent) getSwitchCapability(ctx context.Context) (*ca.SwitchCapability, error) { |
| logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": agent.deviceID}) |
| |
| device, err := agent.getDeviceReadOnly(ctx) |
| if err != nil { |
| return nil, err |
| } |
| |
| // Get the gRPC client |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| return nil, err |
| } |
| |
| return client.GetOfpDeviceInfo(ctx, device) |
| } |
| |
| func (agent *Agent) onPacketFailure(ctx context.Context, err error, packet *ofp.OfpPacketOut) { |
| logger.Errorw(ctx, "packet-out-error", log.Fields{ |
| "device-id": agent.deviceID, |
| "error": err.Error(), |
| "packet": hex.EncodeToString(packet.Data), |
| }) |
| rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil) |
| go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, |
| voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix()) |
| } |
| |
| func (agent *Agent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error { |
| if agent.deviceType == "" { |
| agent.reconcileWithKVStore(ctx) |
| } |
| // Send packet to adapter |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| }) |
| return err |
| } |
| subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout) |
| go func() { |
| defer cancel() |
| _, err := client.SendPacketOut(subCtx, &ca.PacketOut{ |
| DeviceId: agent.deviceID, |
| EgressPortNo: outPort, |
| Packet: packet, |
| }) |
| if err == nil { |
| agent.onSuccess(subCtx, nil, nil, false) |
| } else { |
| agent.onPacketFailure(subCtx, err, packet) |
| } |
| }() |
| return nil |
| } |
| |
| func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error { |
| var err error |
| var desc string |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id}) |
| |
| cloned := agent.cloneDeviceWithoutLock() |
| cloned.Root = device.Root |
| cloned.Vendor = device.Vendor |
| cloned.Model = device.Model |
| cloned.SerialNumber = device.SerialNumber |
| cloned.MacAddress = device.MacAddress |
| cloned.Vlan = device.Vlan |
| cloned.Reason = device.Reason |
| cloned.ImageDownloads = device.ImageDownloads |
| cloned.OperStatus = device.OperStatus |
| cloned.ConnectStatus = device.ConnectStatus |
| if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err == nil { |
| requestStatus.Code = common.OperationResp_OPERATION_SUCCESS |
| } |
| return err |
| } |
| |
| func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error { |
| var err error |
| var desc string |
| opStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, opStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| |
| cloned := agent.cloneDeviceWithoutLock() |
| // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked |
| if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok { |
| logger.Debugw(ctx, "update-device-conn-status", log.Fields{"ok": ok, "val": s}) |
| cloned.ConnectStatus = connStatus |
| } |
| if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok { |
| logger.Debugw(ctx, "update-device-oper-status", log.Fields{"ok": ok, "val": s}) |
| cloned.OperStatus = operStatus |
| } |
| logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": cloned.Id, "oper-status": cloned.OperStatus, "connect-status": cloned.ConnectStatus}) |
| // Store the device |
| if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err == nil { |
| opStatus.Code = common.OperationResp_OPERATION_SUCCESS |
| } |
| return err |
| } |
| |
| // TODO: A generic device update by attribute |
| func (agent *Agent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) { |
| if value == nil { |
| return |
| } |
| |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err}) |
| return |
| } |
| |
| cloned := agent.cloneDeviceWithoutLock() |
| updated := false |
| s := reflect.ValueOf(cloned).Elem() |
| if s.Kind() == reflect.Struct { |
| // exported field |
| f := s.FieldByName(name) |
| if f.IsValid() && f.CanSet() { |
| switch f.Kind() { |
| case reflect.String: |
| f.SetString(value.(string)) |
| updated = true |
| case reflect.Uint32: |
| f.SetUint(uint64(value.(uint32))) |
| updated = true |
| case reflect.Bool: |
| f.SetBool(value.(bool)) |
| updated = true |
| } |
| } |
| } |
| logger.Debugw(ctx, "update-field-status", log.Fields{"device-id": cloned.Id, "name": name, "updated": updated}) |
| // Save the data |
| |
| if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil { |
| logger.Warnw(ctx, "attribute-update-failed", log.Fields{"attribute": name, "value": value}) |
| } |
| } |
| |
| func (agent *Agent) simulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) error { |
| var err error |
| var desc string |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| defer agent.requestQueue.RequestComplete() |
| logger.Debugw(ctx, "simulate-alarm", log.Fields{"device-id": agent.deviceID}) |
| |
| device := agent.getDeviceReadOnlyWithoutLock() |
| |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| "adapter-endpoint": device.AdapterEndpoint, |
| }) |
| return err |
| } |
| subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout) |
| requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS |
| go func() { |
| defer cancel() |
| _, err := client.SimulateAlarm(subCtx, &ca.SimulateAlarmMessage{Device: device, Request: simulateReq}) |
| if err == nil { |
| agent.onSuccess(subCtx, nil, nil, false) |
| } else { |
| agent.onFailure(subCtx, err, nil, nil, false) |
| } |
| }() |
| return nil |
| } |
| |
| // This function updates the device in the DB, releases the device lock, and runs any state transitions. |
| // The calling function MUST hold the device lock. The caller MUST NOT modify the device after this is called. |
| func (agent *Agent) updateDeviceAndReleaseLock(ctx context.Context, device *voltha.Device) error { |
| // fail early if this agent is no longer valid |
| if agent.stopped { |
| agent.requestQueue.RequestComplete() |
| return errors.New("device-agent-stopped") |
| } |
| |
| // update in db |
| if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil { |
| agent.requestQueue.RequestComplete() |
| return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err) |
| } |
| logger.Debugw(ctx, "updated-device-in-store", log.Fields{"device-id: ": agent.deviceID}) |
| |
| prevDevice := agent.device |
| // update the device |
| agent.device = device |
| //If any of the states has chenged, send the change event. |
| if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState { |
| _ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix()) |
| } |
| deviceTransientState := agent.getTransientState() |
| |
| // release lock before processing transition |
| agent.requestQueue.RequestComplete() |
| subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx) |
| |
| if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx, |
| device, prevDevice, deviceTransientState, deviceTransientState); err != nil { |
| logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState}) |
| // Sending RPC EVENT here |
| rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil) |
| agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION, |
| nil, time.Now().Unix()) |
| |
| } |
| return nil |
| } |
| |
| // This function updates the device transient in the DB through loader, releases the device lock, and runs any state transitions. |
| // The calling function MUST hold the device lock. The caller MUST NOT modify the device after this is called. |
| func (agent *Agent) updateDeviceWithTransientStateAndReleaseLock(ctx context.Context, device *voltha.Device, |
| transientState, prevTransientState core.DeviceTransientState_Types) error { |
| // fail early if this agent is no longer valid |
| if agent.stopped { |
| agent.requestQueue.RequestComplete() |
| return errors.New("device-agent-stopped") |
| } |
| //update device TransientState |
| if err := agent.updateTransientState(ctx, transientState); err != nil { |
| agent.requestQueue.RequestComplete() |
| return err |
| } |
| // update in db |
| if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil { |
| //Reverting TransientState update |
| if errTransient := agent.updateTransientState(ctx, prevTransientState); errTransient != nil { |
| logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id, |
| "previous-transient-state": prevTransientState, "current-transient-state": transientState, "error": errTransient}) |
| } |
| agent.requestQueue.RequestComplete() |
| return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err) |
| } |
| |
| logger.Debugw(ctx, "updated-device-in-store", log.Fields{"device-id: ": agent.deviceID}) |
| |
| prevDevice := agent.device |
| // update the device |
| agent.device = device |
| //If any of the states has chenged, send the change event. |
| if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState { |
| _ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix()) |
| } |
| |
| // release lock before processing transition |
| agent.requestQueue.RequestComplete() |
| |
| if err := agent.deviceMgr.stateTransitions.ProcessTransition(ctx, |
| device, prevDevice, transientState, prevTransientState); err != nil { |
| logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState}) |
| // Sending RPC EVENT here |
| rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil) |
| go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION, |
| nil, time.Now().Unix()) |
| } |
| |
| return nil |
| } |
| func (agent *Agent) updateDeviceReason(ctx context.Context, reason string) error { |
| logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason}) |
| |
| var err error |
| var desc string |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| |
| cloned := agent.cloneDeviceWithoutLock() |
| cloned.Reason = reason |
| if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err == nil { |
| requestStatus.Code = common.OperationResp_OPERATION_SUCCESS |
| } |
| return err |
| } |
| |
| func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error { |
| logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID}) |
| |
| var err error |
| var desc string |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| // Remove the associated peer ports on the parent device |
| for portID := range agent.portLoader.ListIDs() { |
| if portHandle, have := agent.portLoader.Lock(portID); have { |
| oldPort := portHandle.GetReadOnly() |
| updatedPeers := make([]*voltha.Port_PeerPort, 0) |
| for _, peerPort := range oldPort.Peers { |
| if peerPort.DeviceId != device.Id { |
| updatedPeers = append(updatedPeers, peerPort) |
| } |
| } |
| newPort := *oldPort |
| newPort.Peers = updatedPeers |
| if err := portHandle.Update(ctx, &newPort); err != nil { |
| portHandle.Unlock() |
| return nil |
| } |
| portHandle.Unlock() |
| } |
| } |
| |
| //send request to adapter |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| "adapter-endpoint": device.AdapterEndpoint, |
| }) |
| return err |
| } |
| subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout) |
| requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS |
| |
| defer cancel() |
| _, err = client.ChildDeviceLost(subCtx, device) |
| if err == nil { |
| agent.onSuccess(subCtx, nil, nil, true) |
| } else { |
| agent.onFailure(subCtx, err, nil, nil, true) |
| } |
| |
| return nil |
| } |
| |
| func (agent *Agent) startOmciTest(ctx context.Context, omcitestrequest *omci.OmciTestRequest) (*omci.TestResponse, error) { |
| var err error |
| var desc string |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| // OMCI test may be performed on a pre-provisioned device. If a device is in that state both its device type and endpoint |
| // may not have been set yet. |
| // First check if we need to update the type or endpoint |
| cloned, err := agent.getDeviceReadOnly(ctx) |
| if err != nil { |
| return nil, err |
| } |
| if cloned.Type == "" || cloned.AdapterEndpoint == "" { |
| if err = agent.updateDeviceTypeAndEndpoint(ctx); err != nil { |
| return nil, err |
| } |
| cloned, err = agent.getDeviceReadOnly(ctx) |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| // Send request to the adapter |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| "adapter-endpoint": cloned.AdapterEndpoint, |
| }) |
| return nil, err |
| } |
| |
| res, err := client.StartOmciTest(ctx, &ca.OMCITest{ |
| Device: cloned, |
| Request: omcitestrequest, |
| }) |
| if err == nil { |
| requestStatus.Code = common.OperationResp_OPERATION_SUCCESS |
| } |
| return res, err |
| } |
| |
| func (agent *Agent) getExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, valueparam *extension.ValueSpecifier) (*extension.ReturnValues, error) { |
| logger.Debugw(ctx, "get-ext-value", log.Fields{"device-id": agent.deviceID, "onu-id": valueparam.Id, "value-type": valueparam.Value}) |
| var err error |
| var desc string |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return nil, err |
| } |
| |
| //send request to adapter synchronously |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, pdevice.AdapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| "adapter-endpoint": pdevice.AdapterEndpoint, |
| }) |
| agent.requestQueue.RequestComplete() |
| return nil, err |
| } |
| |
| // Release lock before sending to adapter |
| agent.requestQueue.RequestComplete() |
| |
| retVal, err := client.GetExtValue(ctx, &ca.GetExtValueMessage{ |
| ParentDevice: pdevice, |
| ChildDevice: cdevice, |
| ValueType: valueparam.Value, |
| }) |
| if err == nil { |
| requestStatus.Code = common.OperationResp_OPERATION_SUCCESS |
| } |
| return retVal, err |
| } |
| |
| func (agent *Agent) setExtValue(ctx context.Context, device *voltha.Device, value *extension.ValueSet) (*empty.Empty, error) { |
| logger.Debugw(ctx, "set-ext-value", log.Fields{"device-id": value.Id}) |
| |
| var err error |
| var desc string |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return nil, err |
| } |
| |
| //send request to adapter |
| //send request to adapter synchronously |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| "adapter-endpoint": device.AdapterEndpoint, |
| }) |
| agent.requestQueue.RequestComplete() |
| return nil, err |
| } |
| // Release lock before sending request to adapter |
| agent.requestQueue.RequestComplete() |
| |
| retVal, err := client.SetExtValue(ctx, &ca.SetExtValueMessage{ |
| Device: device, |
| Value: value, |
| }) |
| if err == nil { |
| requestStatus.Code = common.OperationResp_OPERATION_SUCCESS |
| } |
| return retVal, err |
| } |
| |
| func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) { |
| logger.Debugw(ctx, "get-single-value", log.Fields{"device-id": request.TargetId}) |
| |
| var err error |
| var desc string |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return nil, err |
| } |
| |
| cloned := agent.cloneDeviceWithoutLock() |
| |
| //send request to adapter |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": cloned.Id, |
| "adapter-endpoint": cloned.AdapterEndpoint, |
| }) |
| agent.requestQueue.RequestComplete() |
| return nil, err |
| } |
| // Release lock before sending request to adapter |
| agent.requestQueue.RequestComplete() |
| |
| resp, err := client.GetSingleValue(ctx, request) |
| if err == nil { |
| requestStatus.Code = common.OperationResp_OPERATION_SUCCESS |
| } |
| return resp, err |
| } |
| |
| func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) { |
| logger.Debugw(ctx, "set-single-value", log.Fields{"device-id": request.TargetId}) |
| |
| var err error |
| var desc string |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }() |
| |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return nil, err |
| } |
| |
| cloned := agent.cloneDeviceWithoutLock() |
| |
| //send request to adapter |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| logger.Errorw(ctx, "grpc-client-nil", |
| log.Fields{ |
| "error": err, |
| "device-id": agent.deviceID, |
| "device-type": agent.deviceType, |
| "adapter-endpoint": cloned.AdapterEndpoint, |
| }) |
| agent.requestQueue.RequestComplete() |
| return nil, err |
| } |
| // Release lock before sending request to adapter |
| agent.requestQueue.RequestComplete() |
| |
| resp, err := client.SetSingleValue(ctx, request) |
| if err == nil { |
| requestStatus.Code = common.OperationResp_OPERATION_SUCCESS |
| } |
| return resp, err |
| } |
| |
| func (agent *Agent) proceedWithRequest(device *voltha.Device) bool { |
| return !agent.isDeletionInProgress() && !agent.isInReconcileState(device) |
| } |
| |
| func (agent *Agent) stopReconcile() { |
| agent.stopReconcilingMutex.Lock() |
| if agent.stopReconciling != nil { |
| agent.stopReconciling <- 0 |
| } |
| agent.stopReconcilingMutex.Unlock() |
| } |
| |
| // abortAllProcessing is invoked when an adapter managing this device is restarted |
| func (agent *Agent) abortAllProcessing(ctx context.Context) error { |
| logger.Infow(ctx, "aborting-current-running-requests", log.Fields{"device-id": agent.deviceID}) |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| logger.Infow(ctx, "aborting-current-running-requests-after-wait", log.Fields{"device-id": agent.deviceID}) |
| |
| defer agent.requestQueue.RequestComplete() |
| |
| // If any reconciling is in progress just abort it. The adapter is gone. |
| agent.stopReconcile() |
| |
| logger.Infow(ctx, "aborting-current-running-requests-after-sendstop", log.Fields{"device-id": agent.deviceID}) |
| |
| // Update the Core device transient state accordingly |
| var updatedState core.DeviceTransientState_Types |
| switch agent.getTransientState() { |
| case core.DeviceTransientState_RECONCILE_IN_PROGRESS: |
| updatedState = core.DeviceTransientState_NONE |
| case core.DeviceTransientState_FORCE_DELETING: |
| updatedState = core.DeviceTransientState_DELETE_FAILED |
| case core.DeviceTransientState_DELETING_FROM_ADAPTER: |
| updatedState = core.DeviceTransientState_DELETE_FAILED |
| case core.DeviceTransientState_DELETE_FAILED: |
| // do not change state |
| return nil |
| default: |
| updatedState = core.DeviceTransientState_NONE |
| } |
| if err := agent.updateTransientState(ctx, updatedState); err != nil { |
| logger.Errorf(ctx, "transient-state-update-failed", log.Fields{"error": err}) |
| return err |
| } |
| return nil |
| } |
| |
| func (agent *Agent) DeleteDevicePostAdapterRestart(ctx context.Context) error { |
| logger.Debugw(ctx, "delete-post-restart", log.Fields{"device-id": agent.deviceID}) |
| ctx = utils.WithNewSpanAndRPCMetadataContext(ctx, "DelteDevicePostAdapterRestart") |
| |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| |
| device := agent.getDeviceReadOnlyWithoutLock() |
| if device.AdminState == voltha.AdminState_PREPROVISIONED { |
| logger.Debugw(ctx, "device-in-preprovisioning-state-reconcile-not-needed", log.Fields{"device-id": device.Id}) |
| agent.requestQueue.RequestComplete() |
| return nil |
| } |
| // Change device transient state to FORCE_DELETING |
| if err := agent.updateTransientState(ctx, core.DeviceTransientState_FORCE_DELETING); err != nil { |
| logger.Errorw(ctx, "failure-updating-transient-state", log.Fields{"error": err, "device-id": agent.deviceID}) |
| agent.requestQueue.RequestComplete() |
| return err |
| } |
| |
| // Ensure we have a valid grpc client available as we have just restarted |
| deleteBackoff := backoff.NewExponentialBackOff() |
| deleteBackoff.InitialInterval = agent.config.BackoffRetryInitialInterval |
| deleteBackoff.MaxElapsedTime = agent.config.BackoffRetryMaxElapsedTime |
| deleteBackoff.MaxInterval = agent.config.BackoffRetryMaxInterval |
| var backoffTimer *time.Timer |
| var err error |
| var client adapter_service.AdapterServiceClient |
| retry: |
| for { |
| client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err == nil { |
| break retry |
| } |
| duration := deleteBackoff.NextBackOff() |
| if duration == backoff.Stop { |
| deleteBackoff.Reset() |
| duration = deleteBackoff.NextBackOff() |
| } |
| backoffTimer = time.NewTimer(duration) |
| select { |
| case <-backoffTimer.C: |
| logger.Debugw(ctx, "backoff-timer-expires", log.Fields{"device-id": agent.deviceID}) |
| case <-ctx.Done(): |
| err = ctx.Err() |
| break retry |
| } |
| } |
| if backoffTimer != nil && !backoffTimer.Stop() { |
| select { |
| case <-backoffTimer.C: |
| default: |
| } |
| } |
| if err != nil || client == nil { |
| agent.requestQueue.RequestComplete() |
| return err |
| } |
| |
| // Release the device lock to allow for device state update, if any |
| agent.requestQueue.RequestComplete() |
| |
| // Send the delete request to the adapter |
| subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout) |
| defer cancel() |
| _, err = client.DeleteDevice(subCtx, device) |
| agent.onForceDeleteResponse(subCtx, nil, nil, err) |
| return err |
| } |
| |
| func (agent *Agent) ReconcileDevice(ctx context.Context) { |
| // Do not reconcile if the device was in DELETE_FAILED transient state. Just invoke the force delete on that device. |
| state := agent.getTransientState() |
| logger.Debugw(ctx, "starting-reconcile", log.Fields{"device-id": agent.deviceID, "state": state}) |
| if agent.getTransientState() == core.DeviceTransientState_DELETE_FAILED { |
| if err := agent.DeleteDevicePostAdapterRestart(ctx); err != nil { |
| logger.Errorw(ctx, "delete-post-restart-failed", log.Fields{"error": err, "device-id": agent.deviceID}) |
| } |
| return |
| } |
| |
| requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| var desc string |
| |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) |
| return |
| } |
| |
| device := agent.getDeviceReadOnlyWithoutLock() |
| if device.AdminState == voltha.AdminState_PREPROVISIONED { |
| agent.requestQueue.RequestComplete() |
| logger.Debugw(ctx, "device-in-preprovisioning-state-reconcile-not-needed", log.Fields{"device-id": device.Id}) |
| return |
| } |
| |
| if agent.isDeletionInProgress() { |
| agent.requestQueue.RequestComplete() |
| err := fmt.Errorf("cannot complete operation as device deletion is in progress device : %s", device.Id) |
| logger.Errorw(ctx, "reconcile-failed", log.Fields{"error": err}) |
| agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) |
| return |
| } |
| |
| //set transient state to RECONCILE IN PROGRESS |
| err := agent.updateTransientState(ctx, core.DeviceTransientState_RECONCILE_IN_PROGRESS) |
| if err != nil { |
| agent.requestQueue.RequestComplete() |
| logger.Errorw(ctx, "setting-transient-state-failed", log.Fields{"error": err}) |
| agent.logDeviceUpdate(ctx, nil, nil, requestStatus, nil, desc) |
| return |
| } |
| |
| reconcilingBackoff := backoff.NewExponentialBackOff() |
| reconcilingBackoff.InitialInterval = agent.config.BackoffRetryInitialInterval |
| reconcilingBackoff.MaxElapsedTime = agent.config.BackoffRetryMaxElapsedTime |
| reconcilingBackoff.MaxInterval = agent.config.BackoffRetryMaxInterval |
| |
| //making here to keep lifecycle of this channel within the scope of retryReconcile |
| agent.stopReconcilingMutex.Lock() |
| agent.stopReconciling = make(chan int, 1) |
| agent.stopReconcilingMutex.Unlock() |
| |
| // defined outside the retry loop so it can be cleaned |
| // up when the loop breaks |
| var backoffTimer *time.Timer |
| |
| retry: |
| for { |
| // If the operations state of the device is RECONCILING_FAILED then we do not |
| // want to continue to attempt reconciliation. |
| deviceRef := agent.getDeviceReadOnlyWithoutLock() |
| if deviceRef.OperStatus == common.OperStatus_RECONCILING_FAILED { |
| logger.Warnw(ctx, "reconciling-failed-halting-retries", |
| log.Fields{"device-id": device.Id}) |
| agent.requestQueue.RequestComplete() |
| break retry |
| } |
| |
| // Use an exponential back off to prevent getting into a tight loop |
| duration := reconcilingBackoff.NextBackOff() |
| //This case should never occur in default case as max elapsed time for backoff is 0(by default) , so it will never return stop |
| if duration == backoff.Stop { |
| // If we reach a maximum then warn and reset the backoff |
| // timer and keep attempting. |
| logger.Warnw(ctx, "maximum-reconciling-backoff-reached--resetting-backoff-timer", |
| log.Fields{"max-reconciling-backoff": reconcilingBackoff.MaxElapsedTime, |
| "device-id": device.Id}) |
| reconcilingBackoff.Reset() |
| duration = reconcilingBackoff.NextBackOff() |
| } |
| |
| backoffTimer = time.NewTimer(duration) |
| |
| logger.Debugw(ctx, "retrying-reconciling", log.Fields{"deviceID": device.Id, "endpoint": device.AdapterEndpoint}) |
| // Release lock before sending request to adapter |
| agent.requestQueue.RequestComplete() |
| |
| // Send a reconcile request to the adapter. |
| err := agent.sendReconcileRequestToAdapter(ctx, device) |
| |
| // Check the transient state after a response from the adapter. If a device delete |
| // request was issued due to a callback during that time and failed then just delete |
| // the device and stop the reconcile loop and invoke the device deletion |
| if agent.getTransientState() == core.DeviceTransientState_DELETE_FAILED { |
| if dErr := agent.DeleteDevicePostAdapterRestart(ctx); dErr != nil { |
| logger.Errorw(ctx, "delete-post-restart-failed", log.Fields{"error": dErr, "device-id": agent.deviceID}) |
| } |
| break retry |
| } |
| |
| if errors.Is(err, errContextExpired) || errors.Is(err, errReconcileAborted) { |
| logger.Errorw(ctx, "reconcile-aborted", log.Fields{"error": err}) |
| requestStatus = &common.OperationResp{Code: common.OperationResp_OperationReturnCode(common.OperStatus_FAILED)} |
| desc = "aborted" |
| agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) |
| break retry |
| } |
| st, ok := status.FromError(err) |
| if ok { |
| // Decode the error code and error message |
| errorCode := st.Code() |
| if errorCode == codes.AlreadyExists { |
| logger.Warnw(ctx, "device already reconciled", log.Fields{"error": err}) |
| err := agent.reconcilingCleanup(ctx) |
| if err != nil { |
| logger.Errorf(ctx, "error during reconcile cleanup", err.Error()) |
| } |
| break retry |
| |
| } |
| |
| } |
| if err != nil { |
| agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) |
| <-backoffTimer.C |
| // backoffTimer expired continue |
| // Take lock back before retrying |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) |
| break retry |
| } |
| continue |
| } |
| // Success |
| requestStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS} |
| desc = "adapter-response" |
| agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) |
| break retry |
| } |
| |
| logger.Debugw(ctx, "reconcile-retry-ends", log.Fields{"adapter-endpoint": agent.adapterEndpoint}) |
| |
| // Retry loop is broken, so stop any timers and drain the channel |
| if backoffTimer != nil && !backoffTimer.Stop() { |
| |
| // As per documentation and stack overflow when a timer is stopped its |
| // channel should be drained. The issue is that Stop returns false |
| // either if the timer has already been fired "OR" if the timer can be |
| // stopped before being fired. This means that in some cases the |
| // channel has already be emptied so attempting to read from it means |
| // a blocked thread. To get around this use a select so if the |
| // channel is already empty the default case hits and we are not |
| // blocked. |
| select { |
| case <-backoffTimer.C: |
| default: |
| } |
| } |
| } |
| |
| func (agent *Agent) sendReconcileRequestToAdapter(ctx context.Context, device *voltha.Device) error { |
| logger.Debugw(ctx, "sending-reconcile-to-adapter", log.Fields{"device-id": device.Id, "endpoint": agent.adapterEndpoint}) |
| client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| if err != nil { |
| return err |
| } |
| adapterResponse := make(chan error) |
| go func() { |
| _, err := client.ReconcileDevice(ctx, device) |
| adapterResponse <- err |
| }() |
| select { |
| // wait for response |
| case err := <-adapterResponse: |
| if err != nil { |
| return err |
| } |
| //In case of success quit retrying and wait for adapter to reset operation state of device |
| agent.stopReconcilingMutex.Lock() |
| agent.stopReconciling = nil |
| agent.stopReconcilingMutex.Unlock() |
| return nil |
| |
| //if reconciling need to be stopped |
| case _, ok := <-agent.stopReconciling: |
| agent.stopReconcilingMutex.Lock() |
| agent.stopReconciling = nil |
| agent.stopReconcilingMutex.Unlock() |
| if !ok { |
| //channel-closed |
| return fmt.Errorf("reconcile channel closed:%w", errReconcileAborted) |
| } |
| return fmt.Errorf("reconciling aborted:%w", errReconcileAborted) |
| // Context expired |
| case <-ctx.Done(): |
| return fmt.Errorf("context expired:%s :%w", ctx.Err(), errContextExpired) |
| } |
| } |
| |
| func (agent *Agent) reconcilingCleanup(ctx context.Context) error { |
| var desc string |
| var err error |
| operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} |
| defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }() |
| |
| if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| desc = "reconcile-cleanup-failed" |
| return err |
| } |
| defer agent.requestQueue.RequestComplete() |
| err = agent.updateTransientState(ctx, core.DeviceTransientState_NONE) |
| if err != nil { |
| logger.Errorf(ctx, "transient-state-update-failed", log.Fields{"error": err}) |
| return err |
| } |
| operStatus.Code = common.OperationResp_OPERATION_SUCCESS |
| return nil |
| } |
| |
| func (agent *Agent) isAdapterConnectionUp(ctx context.Context) bool { |
| c, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint) |
| return c != nil && err == nil |
| } |
| |
| func (agent *Agent) canDeviceRequestProceed(ctx context.Context) error { |
| if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil { |
| return err |
| } |
| defer agent.requestQueue.RequestComplete() |
| if agent.proceedWithRequest(agent.device) { |
| return nil |
| } |
| return fmt.Errorf("device-cannot-process-request-%s", agent.deviceID) |
| } |