VOL-3507 Implement the device update queries in rw-core
Change-Id: I2e9de4935c12981ddb7f10924d629bcd0ec09ef5
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index bc5fc1a..9358d8a 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -21,13 +21,15 @@
"encoding/hex"
"errors"
"fmt"
+ "github.com/gogo/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/empty"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"reflect"
"sync"
"time"
- "github.com/gogo/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
- "github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/device/flow"
@@ -38,12 +40,11 @@
coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-protos/v4/go/common"
"github.com/opencord/voltha-protos/v4/go/extension"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
// Agent represents device agent attributes
@@ -136,12 +137,20 @@
logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
} else {
// Create a new device
+ var desc string
+ prevState := common.AdminState_UNKNOWN
+ currState := common.AdminState_UNKNOWN
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+ defer agent.logDeviceUpdate(ctx, "createDevice", &prevState, &currState, operStatus, &desc)
+
// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
// is a new device, so populate them here before passing the device to ldProxy.Set.
// agent.deviceId will also have been set during newAgent().
device = (proto.Clone(deviceToCreate)).(*voltha.Device)
device.Id = agent.deviceID
device.AdminState = voltha.AdminState_PREPROVISIONED
+ currState = device.AdminState
if !deviceToCreate.GetRoot() && deviceToCreate.ProxyAddress != nil {
// Set the default vlan ID to the one specified by the parent adapter. It can be
// overwritten by the child adapter during a device update request
@@ -150,8 +159,10 @@
// Add the initial device to the local model
if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
+ desc = fmt.Sprintf("failed-adding-device-%s: %s", agent.deviceID, err.Error())
return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
}
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
agent.device = device
}
startSucceeded = true
@@ -312,6 +323,42 @@
}
}
+// waitForAdapterResponseAndLogDeviceUpdate waits for the adapter reply to the request identified by
+// rpc and then reports the outcome in three ways: it invokes onSuccess or onFailure, it raises an RPC
+// error event for every failure path, and it writes a device-update log record via logDeviceUpdate.
+//
+// The wait ends when a response arrives on ch, when ch is closed, or when ctx is done. cancel is
+// always called before returning. prevState is forwarded to logDeviceUpdate as the admin state the
+// device had before the operation started. reqArgs is handed to the callbacks as a single slice
+// argument (it is not expanded), exactly as received.
+func (agent *Agent) waitForAdapterResponseAndLogDeviceUpdate(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, prevState *common.AdminState_Types, reqArgs ...interface{}) {
+	defer cancel()
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	// Deferred-call arguments are evaluated here, so pointers are passed: the description and the
+	// status code assigned further down are what the log record shows when the deferred call runs.
+	defer agent.logDeviceUpdate(ctx, rpc, prevState, &agent.device.AdminState, operStatus, &desc)
+	var rpce *voltha.RPCEvent
+	defer func() {
+		// Only failure paths populate rpce; the event is sent asynchronously.
+		if rpce != nil {
+			go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+		}
+	}()
+
+	select {
+	case rpcResponse, ok := <-ch:
+		switch {
+		case !ok:
+			// The channel was closed without a reply. Record a description so the device-update
+			// log is not left empty, matching waitForAdapterFlowResponse.
+			desc = "Response Channel Closed"
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
+			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+		case rpcResponse.Err != nil:
+			// The adapter answered with an error.
+			desc = rpcResponse.Err.Error()
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
+			onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
+		default:
+			operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
+		}
+	case <-ctx.Done():
+		// The context was cancelled or timed out before any reply arrived.
+		desc = ctx.Err().Error()
+		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
+		onFailure(ctx, rpc, ctx.Err(), reqArgs)
+	}
+}
+
+
// getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever.
func (agent *Agent) getDeviceReadOnly(ctx context.Context) (*voltha.Device, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
@@ -335,20 +382,34 @@
// enableDevice activates a preprovisioned or a disable device
func (agent *Agent) enableDevice(ctx context.Context) error {
+ //To preserve and use oldDevice state as prev state in new device
+ prevDeviceState := agent.device.AdminState
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+ defer agent.logDeviceUpdate(ctx, "enableDevice", nil, nil, operStatus, &desc)
+
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})
oldDevice := agent.getDeviceReadOnlyWithoutLock()
+
if oldDevice.AdminState == voltha.AdminState_ENABLED {
logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
agent.requestQueue.RequestComplete()
- return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id))
+ desc = fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
+ return status.Error(codes.FailedPrecondition, desc)
}
if agent.isDeletionInProgress() {
agent.requestQueue.RequestComplete()
- return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
+
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
+ desc = fmt.Sprintf("deviceId:%s, Device deletion is in progress.", agent.deviceID)
+ return status.Error(codes.FailedPrecondition, desc)
+
}
// First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
// pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
@@ -356,6 +417,7 @@
adapterName, err := agent.adapterMgr.GetAdapterType(oldDevice.Type)
if err != nil {
agent.requestQueue.RequestComplete()
+ desc = err.Error()
return err
}
@@ -365,7 +427,9 @@
// Update the Admin State and set the operational state to activating before sending the request to the Adapters
newDevice.AdminState = voltha.AdminState_ENABLED
newDevice.OperStatus = voltha.OperStatus_ACTIVATING
+
if err := agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
+ desc = err.Error()
return err
}
@@ -373,6 +437,7 @@
var ch chan *kafka.RpcResponse
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+ subCtx = coreutils.WithFromTopicMetadataFromContext(subCtx, ctx)
if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
@@ -381,15 +446,23 @@
}
if err != nil {
cancel()
+ desc = err.Error()
return err
}
+
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
// Wait for response
- go agent.waitForAdapterResponse(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure)
+ go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
return nil
}
-func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
+func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse, response coreutils.Response) {
defer cancel()
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer agent.logDeviceUpdate(ctx, rpc, nil, nil, operStatus, &desc)
+
var rpce *voltha.RPCEvent
defer func() {
if rpce != nil {
@@ -401,17 +474,21 @@
case rpcResponse, ok := <-ch:
if !ok {
//add failure
+ desc = "Response Channel Closed"
rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
response.Error(status.Errorf(codes.Aborted, "channel-closed"))
} else if rpcResponse.Err != nil {
//add failure
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
+ desc = rpcResponse.Err.Error()
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
response.Error(rpcResponse.Err)
} else {
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
response.Done()
}
case <-ctx.Done():
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
+ desc = ctx.Err().Error()
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
response.Error(ctx.Err())
}
}
@@ -474,7 +551,15 @@
//disableDevice disable a device
func (agent *Agent) disableDevice(ctx context.Context) error {
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+ prevDeviceState := agent.device.AdminState
+
+ defer agent.logDeviceUpdate(ctx, "disableDevice", nil, nil, operStatus, &desc)
+
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ desc = err.Error()
return err
}
logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
@@ -482,12 +567,14 @@
cloned := agent.cloneDeviceWithoutLock()
if cloned.AdminState == voltha.AdminState_DISABLED {
+ desc = "device-already-disabled"
logger.Debugw(ctx, "device-already-disabled", log.Fields{"device-id": agent.deviceID})
agent.requestQueue.RequestComplete()
return nil
}
if cloned.AdminState == voltha.AdminState_PREPROVISIONED {
agent.requestQueue.RequestComplete()
+ desc = fmt.Sprintf("deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
}
if agent.isDeletionInProgress() {
@@ -497,6 +584,7 @@
// Update the Admin State and operational state before sending the request out
cloned.AdminState = voltha.AdminState_DISABLED
cloned.OperStatus = voltha.OperStatus_UNKNOWN
+
if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return err
}
@@ -507,15 +595,27 @@
ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
if err != nil {
cancel()
+ desc = err.Error()
return err
}
- go agent.waitForAdapterResponse(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure)
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
+ // Wait for response
+ go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
return nil
}
func (agent *Agent) rebootDevice(ctx context.Context) error {
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+ prevDeviceState := agent.device.AdminState
+
+ defer agent.logDeviceUpdate(ctx, "rebootDevice", nil, nil, operStatus, &desc)
+
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ desc = err.Error()
return err
}
defer agent.requestQueue.RequestComplete()
@@ -531,15 +631,25 @@
ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
if err != nil {
cancel()
+ desc = err.Error()
return err
}
- go agent.waitForAdapterResponse(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure)
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
+ // Wait for response
+ go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
return nil
}
func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
+
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
return err
}
// Get the device Transient state, return err if it is DELETING
@@ -547,8 +657,10 @@
if agent.isStateDeleting(previousDeviceTransientState) {
agent.requestQueue.RequestComplete()
- return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device Deletion is in progress",
+ desc = fmt.Sprintf("deviceId:%s, Device Deletion is in progress",
agent.deviceID)
+ agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
+ return status.Error(codes.FailedPrecondition, desc)
}
device := agent.cloneDeviceWithoutLock()
if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
@@ -563,8 +675,13 @@
ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
return err
}
+ // As force delete will not be dependent over the response of adapter, marking this operation as success
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+ agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
// Since it is a case of force delete, nothing needs to be done on adapter responses.
go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
agent.onFailure)
@@ -574,7 +691,15 @@
func (agent *Agent) deleteDevice(ctx context.Context) error {
logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
+
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ prevState := agent.device.AdminState
+
+ defer agent.logDeviceUpdate(ctx, "deleteDevice", nil, nil, operStatus, &desc)
+
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ desc = err.Error()
return err
}
// Get the device Transient state, return err if it is DELETING
@@ -582,7 +707,8 @@
if agent.isStateDeleting(previousDeviceTransientState) {
agent.requestQueue.RequestComplete()
- return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device Deletion is in progress", agent.deviceID)
+ desc = fmt.Sprintf("deviceId:%s, Device Deletion is in progress", agent.deviceID)
+ return status.Error(codes.FailedPrecondition, desc)
}
device := agent.cloneDeviceWithoutLock()
previousAdminState := device.AdminState
@@ -595,6 +721,7 @@
}
if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
currentDeviceTransientState, previousDeviceTransientState); err != nil {
+ desc = err.Error()
return err
}
// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
@@ -610,10 +737,13 @@
if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID})
}
+ desc = err.Error()
return err
}
- go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
- agent.onDeleteFailure)
+
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
+ agent.onDeleteFailure, &prevState)
}
return nil
}
@@ -877,11 +1007,24 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
+
logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+ defer agent.logDeviceUpdate(ctx, "updateDeviceReason", nil, nil, operStatus, &desc)
+
cloned := agent.cloneDeviceWithoutLock()
cloned.Reason = reason
- return agent.updateDeviceAndReleaseLock(ctx, cloned)
+ retErr := agent.updateDeviceAndReleaseLock(ctx, cloned)
+ if retErr != nil {
+ desc = retErr.Error()
+ } else {
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+ desc = reason
+ }
+ return retErr
}
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
diff --git a/rw_core/core/device/agent_device_update.go b/rw_core/core/device/agent_device_update.go
new file mode 100644
index 0000000..9e5013a
--- /dev/null
+++ b/rw_core/core/device/agent_device_update.go
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "fmt"
+ "github.com/opencord/voltha-go/rw_core/utils"
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-protos/v4/go/common"
+)
+
+// logDeviceUpdate writes the device-update records for one operation on this agent's device: a
+// Debug-level "addDeviceUpdate" entry followed by an Info-level "logDeviceUpdate" entry carrying the
+// operation name, the device id, the requester, the admin-state transition, the status code and the
+// description.
+//
+// The requester is taken from the from-topic metadata in ctx and defaults to "NB" when that metadata
+// is empty (presumably "northbound" - confirm). prevState and currState may be nil, in which case
+// the state-change field is empty. desc is handed to the logger as the pointer it was given.
+func (agent *Agent) logDeviceUpdate(ctx context.Context, operation string, prevState *common.AdminState_Types, currState *common.AdminState_Types, status *common.OperationResp, desc *string) {
+	logger.Debugw(ctx, "addDeviceUpdate", log.Fields{"device-id": agent.deviceID})
+
+	origin := utils.GetFromTopicMetadataFromContext(ctx)
+	if origin == "" {
+		origin = "NB"
+	}
+
+	record := log.Fields{
+		"device-update":    operation,
+		"device-update-id": agent.deviceID,
+		"requested-by":     origin,
+		"state-change":     agent.stateChangeString(prevState, currState),
+		"status":           status.GetCode().String(),
+		"description":      desc,
+	}
+	logger.Infow(ctx, "logDeviceUpdate", record)
+}
+
+// stateChangeString renders an admin-state transition as "<previous>-><current>".
+//
+// It returns "" when either pointer is nil or when both point to the same state. When a transition
+// is reported, the "current" side is the admin state presently held by the agent's device, not
+// *currState. *currState is used only as a fallback if the agent has no device, so this helper,
+// which runs from deferred logging, cannot panic on a nil device.
+func (agent *Agent) stateChangeString(prevState *common.AdminState_Types, currState *common.AdminState_Types) string {
+	// Most callers pass nil for both states, so check before reading the device.
+	if prevState == nil || currState == nil || *prevState == *currState {
+		return ""
+	}
+	current := *currState
+	if device := agent.getDeviceReadOnlyWithoutLock(); device != nil {
+		current = device.AdminState
+	}
+	return fmt.Sprintf("%s->%s", *prevState, current)
+}
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index f2fd10a..4589b91 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -18,11 +18,13 @@
import (
"context"
+ "fmt"
"github.com/gogo/protobuf/proto"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-protos/v4/go/common"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
@@ -45,16 +47,23 @@
func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
if (len(newFlows)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
return coreutils.DoneResponse(), nil
}
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
flowsToAdd := make([]*ofp.OfpFlowStats, 0)
@@ -62,6 +71,8 @@
for _, flow := range newFlows {
flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
if err != nil {
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
if created {
@@ -72,6 +83,8 @@
//Flow needs to be updated.
if err := flowHandle.Update(ctx, flow); err != nil {
flowHandle.Unlock()
+ desc = fmt.Sprintf("failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
}
flowsToDelete = append(flowsToDelete, flowToReplace)
@@ -101,9 +114,11 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: flowsToAdd},
@@ -117,27 +132,38 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response)
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return response, nil
}
func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
if (len(flowsToDel)) == 0 {
logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
return coreutils.DoneResponse(), nil
}
+ defer agent.logDeviceUpdate(ctx, "deleteFlowsFromAdapter", nil, nil, operStatus, &desc)
+
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
for _, flow := range flowsToDel {
@@ -145,6 +171,7 @@
// Update the store and cache
if err := flowHandle.Delete(ctx); err != nil {
flowHandle.Unlock()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
flowHandle.Unlock()
@@ -162,9 +189,10 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowToAdapter", rpcResponse, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -178,16 +206,21 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowToAdapter", rpcResponse, response)
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
return response, nil
}
func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "update-flows-to-adapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
if (len(updatedFlows)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
return coreutils.DoneResponse(), nil
@@ -198,10 +231,15 @@
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+ desc = fmt.Sprint("invalid device states")
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
+
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
@@ -212,6 +250,8 @@
// Update the store and cache
if err := flowHandle.Update(ctx, flow); err != nil {
flowHandle.Unlock()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
@@ -231,9 +271,11 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, nil)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowToAdapter", rpcResponse, response)
} else {
logger.Debugw(ctx, "updating-flows-and-groups",
log.Fields{
@@ -260,11 +302,15 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowToAdapter", rpcResponse, response)
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return response, nil
}
@@ -301,16 +347,30 @@
+// deleteAllFlows deletes every flow listed by the agent's flow loader from the store and cache.
+//
+// Deletion is best effort: a flow that cannot be deleted is logged and skipped, and the function
+// still returns nil. The outcome is reported through the deferred logDeviceUpdate call instead:
+// OPERATION_SUCCESS when every deletion succeeded, otherwise OPERATION_FAILURE with a description
+// listing the ids of the flows that could not be deleted.
 func (agent *Agent) deleteAllFlows(ctx context.Context) error {
 	logger.Debugw(ctx, "deleteAllFlows", log.Fields{"device-id": agent.deviceID})
+	// Space-separated ids of the flows whose deletion failed. Deliberately not named "error",
+	// which would shadow the predeclared error type used by this function's signature.
+	var failedFlowIDs string
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	// desc and operStatus are passed by pointer so the values set below are what gets logged.
+	defer agent.logDeviceUpdate(ctx, "deleteAllFlows", nil, nil, operStatus, &desc)
+
 	for flowID := range agent.flowLoader.ListIDs() {
 		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
 			// Update the store and cache
 			if err := flowHandle.Delete(ctx); err != nil {
 				flowHandle.Unlock()
+				failedFlowIDs += fmt.Sprintf("%v ", flowID)
 				logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
 				continue
 			}
 			flowHandle.Unlock()
 		}
 	}
+
+	if failedFlowIDs != "" {
+		desc = fmt.Sprintf("Unable to delete flows : %s", failedFlowIDs)
+	} else {
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+	}
+
 	return nil
 }
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index 9552b78..5db0ea5 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -18,11 +18,13 @@
import (
"context"
+ "fmt"
"strconv"
"github.com/gogo/protobuf/proto"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-protos/v4/go/common"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
@@ -45,6 +47,9 @@
func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
if (len(newGroups)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
return coreutils.DoneResponse(), nil
@@ -52,10 +57,14 @@
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -64,6 +73,8 @@
for _, group := range newGroups {
groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
if err != nil {
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
@@ -75,6 +86,8 @@
//Group needs to be updated.
if err := groupHandle.Update(ctx, group); err != nil {
groupHandle.Unlock()
+ desc = fmt.Sprintf("failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
}
groupsToDelete = append(groupsToDelete, groupToChange)
@@ -103,9 +116,11 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "addGroupsToAdapter", rpcResponse, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -119,10 +134,14 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "addGroupsToAdapter", rpcResponse, response)
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return response, nil
}
@@ -133,12 +152,20 @@
logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID})
return coreutils.DoneResponse(), nil
}
+
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+ defer agent.logDeviceUpdate(ctx, "deleteGroupsFromAdapter", nil, nil, operStatus, &desc)
+
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -147,6 +174,7 @@
// Update the store and cache
if err := groupHandle.Delete(ctx); err != nil {
groupHandle.Unlock()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
groupHandle.Unlock()
@@ -163,9 +191,10 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteGroupsFromAdapter", rpcResponse, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -179,16 +208,21 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteGroupsFromAdapter", rpcResponse, response)
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
return response, nil
}
func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "update-groups-to-adapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+ var desc string // detail text passed (by pointer) to logDeviceUpdate; set only on the error paths below
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} // starts as FAILURE; changed to IN_PROGRESS just before the final return
+
if (len(updatedGroups)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
return coreutils.DoneResponse(), nil
@@ -196,13 +230,19 @@
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc) // explicit call before each error return in this function
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+ desc = fmt.Sprintf("invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState) // same text as the returned status error
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type) // mirrors the returned message; the GetDeviceType error itself is not included
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -212,6 +252,8 @@
// Update the store and cache
if err := groupHandle.Update(ctx, group); err != nil {
groupHandle.Unlock()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc) // runs after the group handle has been unlocked
return coreutils.DoneResponse(), err
}
groupsToUpdate = append(groupsToUpdate, group)
@@ -229,9 +271,11 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, nil)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateGroupsToAdapter", rpcResponse, response) // the RPC name is now passed to the waiter goroutine
} else {
logger.Debugw(ctx, "updating-groups",
log.Fields{
@@ -258,10 +302,14 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateGroupsToAdapter", rpcResponse, response) // the RPC name is now passed to the waiter goroutine
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS // reached only after one of the two adapter calls above returned without error
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc) // desc is still empty on this path
return response, nil
}
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index 6e53d16..f99cc5d 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -19,6 +19,7 @@
import (
"context"
"fmt"
+ "github.com/opencord/voltha-protos/v4/go/common"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/core/device/port"
@@ -147,22 +148,29 @@
func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
logger.Debugw(ctx, "addPort", log.Fields{"device-id": agent.deviceID})
+ var desc string // detail text for the deferred logDeviceUpdate call; set on the error and already-exists paths
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} // default outcome; switched to SUCCESS only when the port is created or updated
+
+ defer agent.logDeviceUpdate(ctx, "addPort", nil, nil, operStatus, &desc) // both arguments are pointers, so assignments made below are visible when the deferred call runs
port.AdminState = voltha.AdminState_ENABLED
portHandle, created, err := agent.portLoader.LockOrCreate(ctx, port)
if err != nil {
+ desc = err.Error()
return err
}
defer portHandle.Unlock()
if created {
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS // LockOrCreate reported that it created the port
return nil
}
oldPort := portHandle.GetReadOnly()
if oldPort.Label != "" || oldPort.Type != voltha.Port_PON_OLT {
logger.Debugw(ctx, "port-already-exists", log.Fields{"port": port})
+ desc = fmt.Sprintf("port already exists, port : %s", port) // NOTE(review): nil is returned here while operStatus stays OPERATION_FAILURE - confirm this is intended
return nil
}
@@ -172,7 +180,13 @@
newPort.Label = port.Label
newPort.OperStatus = port.OperStatus
- return portHandle.Update(ctx, &newPort)
+ err = portHandle.Update(ctx, &newPort)
+ if err != nil {
+ desc = err.Error()
+ return err
+ }
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+ return nil // err is known to be nil here; return the literal so the success path is explicit
}
func (agent *Agent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
@@ -223,8 +237,14 @@
func (agent *Agent) disablePort(ctx context.Context, portID uint32) error {
logger.Debugw(ctx, "disable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+ var desc string // detail text for the deferred logDeviceUpdate call; set on every error path below
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} // default outcome; changed to IN_PROGRESS once DisablePort has returned without error
+
+ defer agent.logDeviceUpdate(ctx, "disablePort", nil, nil, operStatus, &desc) // deferred before portHandle.Unlock, so it runs after the unlock (defers run last-in, first-out)
+
portHandle, have := agent.portLoader.Lock(portID)
if !have {
+ desc = fmt.Sprintf("Invalid argument portID: %v", portID) // Lock returned have == false for this port ID
return status.Errorf(codes.InvalidArgument, "%v", portID)
}
defer portHandle.Unlock()
@@ -232,18 +252,21 @@
oldPort := portHandle.GetReadOnly()
if oldPort.Type != voltha.Port_PON_OLT {
+ desc = fmt.Sprintf("Disabling of Port Type %v unimplemented", oldPort.Type) // same text as the returned status error
return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", oldPort.Type)
}
newPort := *oldPort
newPort.AdminState = voltha.AdminState_DISABLED
if err := portHandle.Update(ctx, &newPort); err != nil {
+ desc = err.Error()
return err
}
//send request to adapter
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
return err
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
@@ -251,18 +274,26 @@
ch, err := agent.adapterProxy.DisablePort(ctx, device, &newPort)
if err != nil {
+ desc = err.Error()
cancel()
return err
}
- go agent.waitForAdapterResponse(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure)
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS // DisablePort returned without error; the deferred log call reports IN_PROGRESS
+ go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure, nil) // replaces waitForAdapterResponse: same arguments plus a trailing nil
return nil
}
func (agent *Agent) enablePort(ctx context.Context, portID uint32) error {
logger.Debugw(ctx, "enable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+ var desc string // detail text for the deferred logDeviceUpdate call; set on every error path below
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} // default outcome; changed to IN_PROGRESS once EnablePort has returned without error
+
+ defer agent.logDeviceUpdate(ctx, "enablePort", nil, nil, operStatus, &desc) // deferred before portHandle.Unlock, so it runs after the unlock (defers run last-in, first-out)
+
portHandle, have := agent.portLoader.Lock(portID)
if !have {
+ desc = fmt.Sprintf("Invalid Argument portID: %v", portID) // Lock returned have == false for this port ID
return status.Errorf(codes.InvalidArgument, "%v", portID)
}
defer portHandle.Unlock()
@@ -270,18 +301,21 @@
oldPort := portHandle.GetReadOnly()
if oldPort.Type != voltha.Port_PON_OLT {
+ desc = fmt.Sprintf("Enabling of Port Type %v unimplemented", oldPort.Type) // same text as the returned status error
return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", oldPort.Type)
}
newPort := *oldPort
newPort.AdminState = voltha.AdminState_ENABLED
if err := portHandle.Update(ctx, &newPort); err != nil {
+ desc = err.Error()
return err
}
//send request to adapter
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
return err
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
@@ -289,9 +323,11 @@
ch, err := agent.adapterProxy.EnablePort(ctx, device, &newPort)
if err != nil {
+ desc = err.Error()
cancel()
return err
}
- go agent.waitForAdapterResponse(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure)
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS // EnablePort returned without error; the deferred log call reports IN_PROGRESS
+ go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure, nil) // replaces waitForAdapterResponse: same arguments plus a trailing nil
return nil
}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index a33f778..02eaf11 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -1079,7 +1079,7 @@
// Activate the child device
if agent = dMgr.getDeviceAgent(ctx, agent.deviceID); agent != nil {
go func() {
- subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ subCtx := utils.WithFromTopicMetadataFromContext(utils.WithSpanAndRPCMetadataFromContext(ctx), ctx)
err := agent.enableDevice(subCtx)
if err != nil {
logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})