[VOL-3187]Pass Context down the execution call hierarchy across voltha-go codebase
Change-Id: I6bc2a0f7226c1beed4ae01a15d7b5c4dc04358d8
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index dbaeb2f..2b18296 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -103,7 +103,7 @@
defer func() {
if !startSucceeded {
if err := agent.stop(ctx); err != nil {
- logger.Errorw("failed-to-cleanup-after-unsuccessful-start", log.Fields{"device-id": agent.deviceID, "error": err})
+ logger.Errorw(ctx, "failed-to-cleanup-after-unsuccessful-start", log.Fields{"device-id": agent.deviceID, "error": err})
}
}
}()
@@ -125,7 +125,7 @@
agent.flowLoader.Load(ctx)
agent.groupLoader.Load(ctx)
- logger.Infow("device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
+ logger.Infow(ctx, "device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
} else {
// Create a new device
// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
@@ -149,7 +149,7 @@
agent.device = device
}
startSucceeded = true
- logger.Debugw("device-agent-started", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "device-agent-started", log.Fields{"device-id": agent.deviceID})
return agent.getDevice(ctx)
}
@@ -165,7 +165,7 @@
}
defer agent.requestQueue.RequestComplete()
- logger.Infow("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
+ logger.Infow(ctx, "stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
// Remove the device from the KV store
if err := agent.dbProxy.Remove(ctx, agent.deviceID); err != nil {
@@ -176,7 +176,7 @@
agent.stopped = true
- logger.Infow("device-agent-stopped", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
+ logger.Infow(ctx, "device-agent-stopped", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
return nil
}
@@ -184,15 +184,15 @@
// Load the most recent state from the KVStore for the device.
func (agent *Agent) reconcileWithKVStore(ctx context.Context) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- logger.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "error": err})
+ logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "error": err})
return
}
defer agent.requestQueue.RequestComplete()
- logger.Debug("reconciling-device-agent-devicetype")
+ logger.Debug(ctx, "reconciling-device-agent-devicetype")
// TODO: context timeout
device := &voltha.Device{}
if have, err := agent.dbProxy.Get(ctx, agent.deviceID, device); err != nil {
- logger.Errorw("kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err})
+ logger.Errorw(ctx, "kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err})
return
} else if !have {
return // not found in kv
@@ -202,23 +202,23 @@
agent.device = device
agent.flowLoader.Load(ctx)
agent.groupLoader.Load(ctx)
- logger.Debugw("reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
+ logger.Debugw(ctx, "reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
}
// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
// and the only action required is to publish a successful result on kafka
-func (agent *Agent) onSuccess(rpc string, response interface{}, reqArgs ...interface{}) {
- logger.Debugw("response successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+func (agent *Agent) onSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
+ logger.Debugw(ctx, "response successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
// TODO: Post success message onto kafka
}
// onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
// and the only action required is to publish the failed result on kafka
-func (agent *Agent) onFailure(rpc string, response interface{}, reqArgs ...interface{}) {
+func (agent *Agent) onFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
if res, ok := response.(error); ok {
- logger.Errorw("rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
+ logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
} else {
- logger.Errorw("rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+ logger.Errorw(ctx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
}
// TODO: Post failure message onto kafka
}
@@ -229,14 +229,14 @@
select {
case rpcResponse, ok := <-ch:
if !ok {
- onFailure(rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+ onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
} else if rpcResponse.Err != nil {
- onFailure(rpc, rpcResponse.Err, reqArgs)
+ onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
} else {
- onSuccess(rpc, rpcResponse.Reply, reqArgs)
+ onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
}
case <-ctx.Done():
- onFailure(rpc, ctx.Err(), reqArgs)
+ onFailure(ctx, rpc, ctx.Err(), reqArgs)
}
}
@@ -261,7 +261,7 @@
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw("enableDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "enableDevice", log.Fields{"device-id": agent.deviceID})
cloned := agent.getDeviceWithoutLock()
@@ -275,7 +275,7 @@
cloned.Adapter = adapterName
if cloned.AdminState == voltha.AdminState_ENABLED {
- logger.Warnw("device-already-enabled", log.Fields{"device-id": agent.deviceID})
+ logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s ", cloned.Id))
return err
}
@@ -342,7 +342,7 @@
return err
}
if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); errs != nil {
- logger.Warnw("no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
+ logger.Warnw(ctx, "no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
}
return nil
@@ -390,12 +390,12 @@
return err
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw("disableDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "disableDevice", log.Fields{"device-id": agent.deviceID})
cloned := agent.getDeviceWithoutLock()
if cloned.AdminState == voltha.AdminState_DISABLED {
- logger.Debugw("device-already-disabled", log.Fields{"id": agent.deviceID})
+ logger.Debugw(ctx, "device-already-disabled", log.Fields{"id": agent.deviceID})
return nil
}
if cloned.AdminState == voltha.AdminState_PREPROVISIONED ||
@@ -424,7 +424,7 @@
return err
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw("rebootDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceWithoutLock()
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
@@ -438,7 +438,7 @@
}
func (agent *Agent) deleteDevice(ctx context.Context) error {
- logger.Debugw("deleteDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "deleteDevice", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -474,7 +474,7 @@
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw("setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
+ logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
cloned := agent.getDeviceWithoutLock()
cloned.ParentId = parentID
@@ -488,7 +488,7 @@
// getSwitchCapability retrieves the switch capability of a parent device
func (agent *Agent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
- logger.Debugw("getSwitchCapability", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "getSwitchCapability", log.Fields{"device-id": agent.deviceID})
cloned, err := agent.getDevice(ctx)
if err != nil {
@@ -515,7 +515,7 @@
return switchCap, nil
}
-func (agent *Agent) onPacketFailure(rpc string, response interface{}, args ...interface{}) {
+func (agent *Agent) onPacketFailure(ctx context.Context, rpc string, response interface{}, args ...interface{}) {
// packet data is encoded in the args param as the first parameter
var packet []byte
if len(args) >= 1 {
@@ -527,7 +527,7 @@
if err, ok := response.(error); ok {
errResp = err
}
- logger.Warnw("packet-out-error", log.Fields{
+ logger.Warnw(ctx, "packet-out-error", log.Fields{
"device-id": agent.deviceID,
"error": errResp,
"packet": hex.EncodeToString(packet),
@@ -570,7 +570,7 @@
return err
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw("updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+ logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
updatedDevice, err := agent.mergeDeviceInfoFromAdapter(device)
if err != nil {
@@ -591,14 +591,14 @@
newConnStatus, newOperStatus := cloned.ConnectStatus, cloned.OperStatus
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
if s, ok := voltha.ConnectStatus_Types_value[connStatus.String()]; ok {
- logger.Debugw("updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
+ logger.Debugw(ctx, "updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
newConnStatus = connStatus
}
if s, ok := voltha.OperStatus_Types_value[operStatus.String()]; ok {
- logger.Debugw("updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
+ logger.Debugw(ctx, "updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
newOperStatus = operStatus
}
- logger.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
+ logger.Debugw(ctx, "updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
// Store the device
return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, cloned.AdminState, newConnStatus, newOperStatus)
}
@@ -606,7 +606,7 @@
// TODO: A generic device update by attribute
func (agent *Agent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- logger.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
+ logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
return
}
defer agent.requestQueue.RequestComplete()
@@ -634,11 +634,11 @@
}
}
}
- logger.Debugw("update-field-status", log.Fields{"deviceId": cloned.Id, "name": name, "updated": updated})
+ logger.Debugw(ctx, "update-field-status", log.Fields{"deviceId": cloned.Id, "name": name, "updated": updated})
// Save the data
if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
- logger.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
+ logger.Warnw(ctx, "attribute-update-failed", log.Fields{"attribute": name, "value": value})
}
}
@@ -647,7 +647,7 @@
return err
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw("simulateAlarm", log.Fields{"id": agent.deviceID})
+ logger.Debugw(ctx, "simulateAlarm", log.Fields{"id": agent.deviceID})
cloned := agent.getDeviceWithoutLock()
@@ -694,7 +694,7 @@
if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
- logger.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
+ logger.Debugw(ctx, "updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
agent.device = device
return nil
@@ -708,7 +708,7 @@
cloned := agent.getDeviceWithoutLock()
cloned.Reason = reason
- logger.Debugw("updateDeviceReason", log.Fields{"deviceId": cloned.Id, "reason": cloned.Reason})
+ logger.Debugw(ctx, "updateDeviceReason", log.Fields{"deviceId": cloned.Id, "reason": cloned.Reason})
// Store the device
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
@@ -719,7 +719,7 @@
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw("childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-ud": agent.deviceID})
+ logger.Debugw(ctx, "childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-ud": agent.deviceID})
//Remove the associated peer ports on the parent device
parentDevice := agent.getDeviceWithoutLock()
@@ -785,7 +785,7 @@
if err := ptypes.UnmarshalAny(rpcResponse.Reply, testResp); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
- logger.Debugw("Omci_test_Request-Success-device-agent", log.Fields{"testResp": testResp})
+ logger.Debugw(ctx, "Omci_test_Request-Success-device-agent", log.Fields{"testResp": testResp})
return testResp, nil
}
@@ -816,6 +816,6 @@
if err := ptypes.UnmarshalAny(rpcResponse.Reply, Resp); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
- logger.Debugw("getExtValue-Success-device-agent", log.Fields{"Resp": Resp})
+ logger.Debugw(ctx, "getExtValue-Success-device-agent", log.Fields{"Resp": Resp})
return Resp, nil
}