VOL-3507 Implement the device update queries in rw-core

Change-Id: I2e9de4935c12981ddb7f10924d629bcd0ec09ef5
diff --git a/Makefile b/Makefile
index 545043b..abf615f 100644
--- a/Makefile
+++ b/Makefile
@@ -20,6 +20,9 @@
 # Variables
 VERSION                    ?= $(shell cat ./VERSION)
 
+# Packages
+PACKAGES                   = $(shell go list ./...)
+
 DOCKER_LABEL_VCS_DIRTY     = false
 ifneq ($(shell git ls-files --others --modified --exclude-standard 2>/dev/null | wc -l | sed -e 's/ //g'),0)
     DOCKER_LABEL_VCS_DIRTY = true
@@ -168,3 +171,6 @@
 mod-update:
 	${GO} mod tidy
 	${GO} mod vendor
+
+fmt:
+	@go fmt ${PACKAGES}
diff --git a/VERSION b/VERSION
index dc3c5b9..9b361bb 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.7.1-dev
+2.7.2-dev
diff --git a/go.sum b/go.sum
index feaa7c9..45c7749 100644
--- a/go.sum
+++ b/go.sum
@@ -143,7 +143,6 @@
 github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
 github.com/opencord/voltha-lib-go/v4 v4.1.0 h1:Ba6w5bv36oYrzqfK96f42+hSEMkukwIjpdBWPztLY1g=
 github.com/opencord/voltha-lib-go/v4 v4.1.0/go.mod h1:K7lDkSkJ97EyfvX8fQtBmBvpj7n6MmwnAtD8Jz79HcQ=
-github.com/opencord/voltha-protos/v4 v4.0.12 h1:x8drb8inaUByjVLFbXSiQwRTU//dfde0MKIHyKb1JMw=
 github.com/opencord/voltha-protos/v4 v4.0.12/go.mod h1:W/OIFIyvFh/C0vchRUuarIsMylEhzCRM9pNxLvkPtKc=
 github.com/opencord/voltha-protos/v4 v4.0.13 h1:4D6jZLrNDwWC3dhRxNtTGeXMv3GKzRUaSkm0aDQwINQ=
 github.com/opencord/voltha-protos/v4 v4.0.13/go.mod h1:W/OIFIyvFh/C0vchRUuarIsMylEhzCRM9pNxLvkPtKc=
@@ -164,7 +163,6 @@
 github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
-github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -221,7 +219,6 @@
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
-golang.org/x/crypto v0.0.0-20191001170739-f9e2070545dc h1:KyTYo8xkh/2WdbFLUyQwBS0Jfn3qfZ9QmuPbok2oENE=
 golang.org/x/crypto v0.0.0-20191001170739-f9e2070545dc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
@@ -242,7 +239,6 @@
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20190930134127-c5a3c61f89f3 h1:6KET3Sqa7fkVfD63QnAM81ZeYg5n4HwApOJkufONnHA=
 golang.org/x/net v0.0.0-20190930134127-c5a3c61f89f3/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
@@ -251,7 +247,6 @@
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -264,12 +259,10 @@
 golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24 h1:R8bzl0244nw47n1xKs1MUMAaTNgjavKcN/aX2Ss3+Fo=
 golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -289,7 +282,6 @@
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
 google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -301,7 +293,6 @@
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
 google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
 google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
-google.golang.org/grpc v1.24.0 h1:vb/1TCsVn3DcJlQ0Gs1yB1pKI6Do2/QNwxdKqmc/b0s=
 google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
 google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
 google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
diff --git a/rw_core/core/api/adapter_request_handler.go b/rw_core/core/api/adapter_request_handler.go
index 503d294..1c2d2e0 100644
--- a/rw_core/core/api/adapter_request_handler.go
+++ b/rw_core/core/api/adapter_request_handler.go
@@ -19,7 +19,6 @@
 import (
 	"context"
 	"errors"
-
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -303,6 +302,7 @@
 	serialNumber := &ic.StrType{}
 	vendorID := &ic.StrType{}
 	onuID := &ic.IntType{}
+	fromTopic := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "parent_device_id":
@@ -345,6 +345,11 @@
 				logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
 				return nil, err
 			}
+		case "fromTopic":
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-fromTopic", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
 	logger.Debugw(ctx, "child-device-detected", log.Fields{"parent-device-id": pID.Id, "parent-port-no": portNo.Val,
@@ -352,7 +357,8 @@
 		"vendor-id": vendorID.Val, "onu-id": onuID.Val, "transaction-id": transactionID.Val})
 
 	rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "ChildDeviceDetected")
-	device, err := rhp.deviceMgr.ChildDeviceDetected(rpcCtx, pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
+	fromTopicContext := utils.WithFromTopicMetadataContext(rpcCtx, fromTopic.Val)
+	device, err := rhp.deviceMgr.ChildDeviceDetected(fromTopicContext, pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
 	if err != nil {
 		logger.Debugw(ctx, "child-detection-failed", log.Fields{"parent-device-id": pID.Id, "onu-id": onuID.Val, "error": err})
 	}
@@ -729,6 +735,7 @@
 	deviceID := &voltha.ID{}
 	port := &voltha.Port{}
 	transactionID := &ic.StrType{}
+	fromTopic := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -746,12 +753,19 @@
 				logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
 				return nil, err
 			}
+		case "fromTopic":
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-fromTopic", log.Fields{"error": err})
+				return nil, err
+			}
+			//log.EnrichSpan(ctx,log.Fields{"fromTopic": fromTopic})
 		}
 	}
 	logger.Debugw(ctx, "port-created", log.Fields{"device-id": deviceID.Id, "port": port, "transaction-id": transactionID.Val})
 	rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "PortCreated")
+	fromTopicContext := utils.WithFromTopicMetadataContext(rpcCtx, fromTopic.Val)
 
-	if err := rhp.deviceMgr.AddPort(rpcCtx, deviceID.Id, port); err != nil {
+	if err := rhp.deviceMgr.AddPort(fromTopicContext, deviceID.Id, port); err != nil {
 		logger.Debugw(ctx, "unable-to-add-port", log.Fields{"error": err})
 		return nil, err
 	}
@@ -925,6 +939,7 @@
 	deviceID := &voltha.ID{}
 	reason := &ic.StrType{}
 	transactionID := &ic.StrType{}
+	fromTopic := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -942,14 +957,20 @@
 				logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
 				return nil, err
 			}
+		case "fromTopic":
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-fromTopic", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
 	logger.Debugw(ctx, "device-reason-update", log.Fields{"device-id": deviceID.Id, "reason": reason.Val,
 		"transaction-id": transactionID.Val})
 
 	rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "DeviceReasonUpdate")
+	fromTopicContext := utils.WithFromTopicMetadataContext(rpcCtx, fromTopic.Val)
 
-	if err := rhp.deviceMgr.UpdateDeviceReason(rpcCtx, deviceID.Id, reason.Val); err != nil {
+	if err := rhp.deviceMgr.UpdateDeviceReason(fromTopicContext, deviceID.Id, reason.Val); err != nil {
 		logger.Debugw(ctx, "unable-to-update-device-reason", log.Fields{"error": err})
 		return nil, err
 
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index bc5fc1a..9358d8a 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -21,13 +21,15 @@
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"github.com/gogo/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/empty"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"reflect"
 	"sync"
 	"time"
 
-	"github.com/gogo/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
-	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
 	"github.com/opencord/voltha-go/rw_core/core/device/flow"
@@ -38,12 +40,11 @@
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/common"
 	"github.com/opencord/voltha-protos/v4/go/extension"
 	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
 	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 )
 
 // Agent represents device agent attributes
@@ -136,12 +137,20 @@
 		logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
 	} else {
 		// Create a new device
+		var desc string
+		prevState := common.AdminState_UNKNOWN
+		currState := common.AdminState_UNKNOWN
+		operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+		defer agent.logDeviceUpdate(ctx, "createDevice", &prevState, &currState, operStatus, &desc)
+
 		// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
 		// is a new device, so populate them here before passing the device to ldProxy.Set.
 		// agent.deviceId will also have been set during newAgent().
 		device = (proto.Clone(deviceToCreate)).(*voltha.Device)
 		device.Id = agent.deviceID
 		device.AdminState = voltha.AdminState_PREPROVISIONED
+		currState = device.AdminState
 		if !deviceToCreate.GetRoot() && deviceToCreate.ProxyAddress != nil {
 			// Set the default vlan ID to the one specified by the parent adapter.  It can be
 			// overwritten by the child adapter during a device update request
@@ -150,8 +159,10 @@
 
 		// Add the initial device to the local model
 		if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
+			desc = fmt.Sprintf("failed-adding-device-%s: %s", agent.deviceID, err.Error())
 			return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
 		}
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		agent.device = device
 	}
 	startSucceeded = true
@@ -312,6 +323,42 @@
 	}
 }
 
+func (agent *Agent) waitForAdapterResponseAndLogDeviceUpdate(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, prevState *common.AdminState_Types, reqArgs ...interface{}) {
+	defer cancel()
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer agent.logDeviceUpdate(ctx, rpc, prevState, &agent.device.AdminState, operStatus, &desc)
+	var rpce *voltha.RPCEvent
+	defer func() {
+		if rpce != nil {
+			go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+		}
+	}()
+
+	select {
+	case rpcResponse, ok := <-ch:
+		if !ok {
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
+			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+			//add failure
+		} else if rpcResponse.Err != nil {
+			desc = rpcResponse.Err.Error()
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
+			onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
+			//add failure
+		} else {
+			operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
+		}
+	case <-ctx.Done():
+		desc = ctx.Err().Error()
+		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
+		onFailure(ctx, rpc, ctx.Err(), reqArgs)
+	}
+}
+
 // getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever.
 func (agent *Agent) getDeviceReadOnly(ctx context.Context) (*voltha.Device, error) {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
@@ -335,20 +382,34 @@
 
 // enableDevice activates a preprovisioned or a disable device
 func (agent *Agent) enableDevice(ctx context.Context) error {
+	//To preserve and use oldDevice state as prev state in new device
+	prevDeviceState := agent.device.AdminState
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	defer agent.logDeviceUpdate(ctx, "enableDevice", nil, nil, operStatus, &desc)
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})
 
 	oldDevice := agent.getDeviceReadOnlyWithoutLock()
+
 	if oldDevice.AdminState == voltha.AdminState_ENABLED {
 		logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
 		agent.requestQueue.RequestComplete()
-		return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id))
+		desc = fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
+		return status.Error(codes.FailedPrecondition, desc)
 	}
 	if agent.isDeletionInProgress() {
 		agent.requestQueue.RequestComplete()
-		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
+
+		operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
+		desc = fmt.Sprintf("deviceId:%s, Device deletion is in progress.", agent.deviceID)
+		return status.Error(codes.FailedPrecondition, desc)
+
 	}
 	// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
 	// pre-provisioned with the required adapter not registered.   At this stage, since we need to communicate
@@ -356,6 +417,7 @@
 	adapterName, err := agent.adapterMgr.GetAdapterType(oldDevice.Type)
 	if err != nil {
 		agent.requestQueue.RequestComplete()
+		desc = err.Error()
 		return err
 	}
 
@@ -365,7 +427,9 @@
 	// Update the Admin State and set the operational state to activating before sending the request to the Adapters
 	newDevice.AdminState = voltha.AdminState_ENABLED
 	newDevice.OperStatus = voltha.OperStatus_ACTIVATING
+
 	if err := agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
+		desc = err.Error()
 		return err
 	}
 
@@ -373,6 +437,7 @@
 	var ch chan *kafka.RpcResponse
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
 	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+	subCtx = coreutils.WithFromTopicMetadataFromContext(subCtx, ctx)
 
 	if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
 		ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
@@ -381,15 +446,23 @@
 	}
 	if err != nil {
 		cancel()
+		desc = err.Error()
 		return err
 	}
+
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
 	// Wait for response
-	go agent.waitForAdapterResponse(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure)
+	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
 	return nil
 }
 
-func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
+func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse, response coreutils.Response) {
 	defer cancel()
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer agent.logDeviceUpdate(ctx, rpc, nil, nil, operStatus, &desc)
+
 	var rpce *voltha.RPCEvent
 	defer func() {
 		if rpce != nil {
@@ -401,17 +474,21 @@
 	case rpcResponse, ok := <-ch:
 		if !ok {
 			//add failure
+			desc = "Response Channel Closed"
 			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
 			response.Error(status.Errorf(codes.Aborted, "channel-closed"))
 		} else if rpcResponse.Err != nil {
 			//add failure
-			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
+			desc = rpcResponse.Err.Error()
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
 			response.Error(rpcResponse.Err)
 		} else {
+			operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 			response.Done()
 		}
 	case <-ctx.Done():
-		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
+		desc = ctx.Err().Error()
+		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
 		response.Error(ctx.Err())
 	}
 }
@@ -474,7 +551,15 @@
 
 //disableDevice disable a device
 func (agent *Agent) disableDevice(ctx context.Context) error {
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	prevDeviceState := agent.device.AdminState
+
+	defer agent.logDeviceUpdate(ctx, "disableDevice", nil, nil, operStatus, &desc)
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
 		return err
 	}
 	logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
@@ -482,12 +567,14 @@
 	cloned := agent.cloneDeviceWithoutLock()
 
 	if cloned.AdminState == voltha.AdminState_DISABLED {
+		desc = "device-already-disabled"
 		logger.Debugw(ctx, "device-already-disabled", log.Fields{"device-id": agent.deviceID})
 		agent.requestQueue.RequestComplete()
 		return nil
 	}
 	if cloned.AdminState == voltha.AdminState_PREPROVISIONED {
 		agent.requestQueue.RequestComplete()
+		desc = fmt.Sprintf("deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
 		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
 	}
 	if agent.isDeletionInProgress() {
@@ -497,6 +584,7 @@
 	// Update the Admin State and operational state before sending the request out
 	cloned.AdminState = voltha.AdminState_DISABLED
 	cloned.OperStatus = voltha.OperStatus_UNKNOWN
+
 	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
 		return err
 	}
@@ -507,15 +595,27 @@
 	ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
 	if err != nil {
 		cancel()
+		desc = err.Error()
 		return err
 	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure)
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
+	// Wait for response
+	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
 
 	return nil
 }
 
 func (agent *Agent) rebootDevice(ctx context.Context) error {
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	prevDeviceState := agent.device.AdminState
+
+	defer agent.logDeviceUpdate(ctx, "rebootDevice", nil, nil, operStatus, &desc)
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
 		return err
 	}
 	defer agent.requestQueue.RequestComplete()
@@ -531,15 +631,25 @@
 	ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
 	if err != nil {
 		cancel()
+		desc = err.Error()
 		return err
 	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure)
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
+	// Wait for response
+	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
 	return nil
 }
 
 func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
 	logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
+
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
 		return err
 	}
 	// Get the device Transient state, return err if it is DELETING
@@ -547,8 +657,10 @@
 
 	if agent.isStateDeleting(previousDeviceTransientState) {
 		agent.requestQueue.RequestComplete()
-		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device Deletion is in progress",
+		desc = fmt.Sprintf("deviceId:%s, Device Deletion is in progress",
 			agent.deviceID)
+		agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
+		return status.Error(codes.FailedPrecondition, desc)
 	}
 	device := agent.cloneDeviceWithoutLock()
 	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
@@ -563,8 +675,13 @@
 		ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
 			return err
 		}
+		// As force delete will not be dependent over the response of adapter, marking this operation as success
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+		agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
 		// Since it is a case of force delete, nothing needs to be done on adapter responses.
 		go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
 			agent.onFailure)
@@ -574,7 +691,15 @@
 
 func (agent *Agent) deleteDevice(ctx context.Context) error {
 	logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
+
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	prevState := agent.device.AdminState
+
+	defer agent.logDeviceUpdate(ctx, "deleteDevice", nil, nil, operStatus, &desc)
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
 		return err
 	}
 	// Get the device Transient state, return err if it is DELETING
@@ -582,7 +707,8 @@
 
 	if agent.isStateDeleting(previousDeviceTransientState) {
 		agent.requestQueue.RequestComplete()
-		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device Deletion is in progress", agent.deviceID)
+		desc = fmt.Sprintf("deviceId:%s, Device Deletion is in progress", agent.deviceID)
+		return status.Error(codes.FailedPrecondition, desc)
 	}
 	device := agent.cloneDeviceWithoutLock()
 	previousAdminState := device.AdminState
@@ -595,6 +721,7 @@
 	}
 	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
 		currentDeviceTransientState, previousDeviceTransientState); err != nil {
+		desc = err.Error()
 		return err
 	}
 	// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
@@ -610,10 +737,13 @@
 			if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
 				logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID})
 			}
+			desc = err.Error()
 			return err
 		}
-		go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
-			agent.onDeleteFailure)
+
+		operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+		go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
+			agent.onDeleteFailure, &prevState)
 	}
 	return nil
 }
@@ -877,11 +1007,24 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
+
 	logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	defer agent.logDeviceUpdate(ctx, "updateDeviceReason", nil, nil, operStatus, &desc)
+
 	cloned := agent.cloneDeviceWithoutLock()
 	cloned.Reason = reason
-	return agent.updateDeviceAndReleaseLock(ctx, cloned)
+	retErr := agent.updateDeviceAndReleaseLock(ctx, cloned)
+	if retErr != nil {
+		desc = retErr.Error()
+	} else {
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+		desc = reason
+	}
+	return retErr
 }
 
 func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
diff --git a/rw_core/core/device/agent_device_update.go b/rw_core/core/device/agent_device_update.go
new file mode 100644
index 0000000..9e5013a
--- /dev/null
+++ b/rw_core/core/device/agent_device_update.go
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+	"context"
+	"fmt"
+	"github.com/opencord/voltha-go/rw_core/utils"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/common"
+)
+
+func (agent *Agent) logDeviceUpdate(ctx context.Context, operation string, prevState *common.AdminState_Types, currState *common.AdminState_Types, status *common.OperationResp, desc *string) {
+	logger.Debugw(ctx, "addDeviceUpdate", log.Fields{"device-id": agent.deviceID})
+
+	requestedBy := utils.GetFromTopicMetadataFromContext(ctx)
+
+	if requestedBy == "" {
+		requestedBy = "NB"
+	}
+
+	logger.Infow(ctx, "logDeviceUpdate", log.Fields{"device-update": operation, "device-update-id": agent.deviceID,
+		"requested-by": requestedBy, "state-change": agent.stateChangeString(prevState, currState),
+		"status": status.GetCode().String(), "description": desc})
+}
+
+func (agent *Agent) stateChangeString(prevState *common.AdminState_Types, currState *common.AdminState_Types) string {
+	device := agent.getDeviceReadOnlyWithoutLock()
+	if prevState != nil && currState != nil && *prevState != *currState {
+		return fmt.Sprintf("%s->%s", *prevState, device.AdminState)
+	}
+	return ""
+}
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index f2fd10a..4589b91 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -18,11 +18,13 @@
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/gogo/protobuf/proto"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/common"
 	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
 	"google.golang.org/grpc/codes"
@@ -45,16 +47,23 @@
 func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if (len(newFlows)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
 		return coreutils.DoneResponse(), nil
 	}
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+		agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
@@ -62,6 +71,8 @@
 	for _, flow := range newFlows {
 		flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
 		if err != nil {
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
 		if created {
@@ -72,6 +83,8 @@
 				//Flow needs to be updated.
 				if err := flowHandle.Update(ctx, flow); err != nil {
 					flowHandle.Unlock()
+					desc = fmt.Sprintf("failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
+					agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
 				}
 				flowsToDelete = append(flowsToDelete, flowToReplace)
@@ -101,9 +114,11 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: flowsToAdd},
@@ -117,27 +132,38 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response)
 	}
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }
 
 func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if (len(flowsToDel)) == 0 {
 		logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
 		return coreutils.DoneResponse(), nil
 	}
 
+	defer agent.logDeviceUpdate(ctx, "deleteFlowsFromAdapter", nil, nil, operStatus, &desc)
+
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 	for _, flow := range flowsToDel {
@@ -145,6 +171,7 @@
 			// Update the store and cache
 			if err := flowHandle.Delete(ctx); err != nil {
 				flowHandle.Unlock()
+				desc = err.Error()
 				return coreutils.DoneResponse(), err
 			}
 			flowHandle.Unlock()
@@ -162,9 +189,10 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowToAdapter", rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -178,16 +206,21 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowToAdapter", rpcResponse, response)
 	}
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
 	return response, nil
 }
 
 func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "update-flows-to-adapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if (len(updatedFlows)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 		return coreutils.DoneResponse(), nil
@@ -198,10 +231,15 @@
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		desc = fmt.Sprint("invalid device states")
+		agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+		agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
+
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 	flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
@@ -212,6 +250,8 @@
 			// Update the store and cache
 			if err := flowHandle.Update(ctx, flow); err != nil {
 				flowHandle.Unlock()
+				desc = err.Error()
+				agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 				return coreutils.DoneResponse(), err
 			}
 
@@ -231,9 +271,11 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, nil)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowToAdapter", rpcResponse, response)
 	} else {
 		logger.Debugw(ctx, "updating-flows-and-groups",
 			log.Fields{
@@ -260,11 +302,15 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowToAdapter", rpcResponse, response)
 	}
 
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }
 
@@ -301,16 +347,30 @@
 func (agent *Agent) deleteAllFlows(ctx context.Context) error {
 	logger.Debugw(ctx, "deleteAllFlows", log.Fields{"device-id": agent.deviceID})
 
+	var error string
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	defer agent.logDeviceUpdate(ctx, "deleteAllFlows", nil, nil, operStatus, &desc)
+
 	for flowID := range agent.flowLoader.ListIDs() {
 		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
 			// Update the store and cache
 			if err := flowHandle.Delete(ctx); err != nil {
 				flowHandle.Unlock()
+				error += fmt.Sprintf("%v ", flowID)
 				logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
 				continue
 			}
 			flowHandle.Unlock()
 		}
 	}
+
+	if error != "" {
+		desc = fmt.Sprintf("Unable to delete flows : %s", error)
+	} else {
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+	}
+
 	return nil
 }
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index 9552b78..5db0ea5 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -18,11 +18,13 @@
 
 import (
 	"context"
+	"fmt"
 	"strconv"
 
 	"github.com/gogo/protobuf/proto"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/common"
 	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
 	"google.golang.org/grpc/codes"
@@ -45,6 +47,9 @@
 func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if (len(newGroups)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
 		return coreutils.DoneResponse(), nil
@@ -52,10 +57,14 @@
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+		agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
@@ -64,6 +73,8 @@
 	for _, group := range newGroups {
 		groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
 		if err != nil {
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
 
@@ -75,6 +86,8 @@
 				//Group needs to be updated.
 				if err := groupHandle.Update(ctx, group); err != nil {
 					groupHandle.Unlock()
+					desc = fmt.Sprintf("failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
+					agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
 				}
 				groupsToDelete = append(groupsToDelete, groupToChange)
@@ -103,9 +116,11 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addGroupsToAdapter", rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -119,10 +134,14 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addGroupsToAdapter", rpcResponse, response)
 	}
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }
 
@@ -133,12 +152,20 @@
 		logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID})
 		return coreutils.DoneResponse(), nil
 	}
+
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	defer agent.logDeviceUpdate(ctx, "deleteGroupsFromAdapter", nil, nil, operStatus, &desc)
+
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
@@ -147,6 +174,7 @@
 			// Update the store and cache
 			if err := groupHandle.Delete(ctx); err != nil {
 				groupHandle.Unlock()
+				desc = err.Error()
 				return coreutils.DoneResponse(), err
 			}
 			groupHandle.Unlock()
@@ -163,9 +191,10 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteGroupsFromAdapter", rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -179,16 +208,21 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteGroupsFromAdapter", rpcResponse, response)
 	}
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
 	return response, nil
 }
 
 func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "update-groups-to-adapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if (len(updatedGroups)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
 		return coreutils.DoneResponse(), nil
@@ -196,13 +230,19 @@
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		desc = fmt.Sprintf("invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
+		agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+		agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
@@ -212,6 +252,8 @@
 			// Update the store and cache
 			if err := groupHandle.Update(ctx, group); err != nil {
 				groupHandle.Unlock()
+				desc = err.Error()
+				agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 				return coreutils.DoneResponse(), err
 			}
 			groupsToUpdate = append(groupsToUpdate, group)
@@ -229,9 +271,11 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, nil)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateGroupsToAdapter", rpcResponse, response)
 	} else {
 		logger.Debugw(ctx, "updating-groups",
 			log.Fields{
@@ -258,10 +302,14 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateGroupsToAdapter", rpcResponse, response)
 	}
 
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index 6e53d16..f99cc5d 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"fmt"
+	"github.com/opencord/voltha-protos/v4/go/common"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/rw_core/core/device/port"
@@ -147,22 +148,29 @@
 
 func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
 	logger.Debugw(ctx, "addPort", log.Fields{"device-id": agent.deviceID})
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	defer agent.logDeviceUpdate(ctx, "addPort", nil, nil, operStatus, &desc)
 
 	port.AdminState = voltha.AdminState_ENABLED
 
 	portHandle, created, err := agent.portLoader.LockOrCreate(ctx, port)
 	if err != nil {
+		desc = err.Error()
 		return err
 	}
 	defer portHandle.Unlock()
 
 	if created {
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		return nil
 	}
 
 	oldPort := portHandle.GetReadOnly()
 	if oldPort.Label != "" || oldPort.Type != voltha.Port_PON_OLT {
 		logger.Debugw(ctx, "port-already-exists", log.Fields{"port": port})
+		desc = fmt.Sprintf("port already exists, port : %s", port)
 		return nil
 	}
 
@@ -172,7 +180,13 @@
 	newPort.Label = port.Label
 	newPort.OperStatus = port.OperStatus
 
-	return portHandle.Update(ctx, &newPort)
+	err = portHandle.Update(ctx, &newPort)
+	if err != nil {
+		desc = err.Error()
+		return err
+	}
+	operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+	return err
 }
 
 func (agent *Agent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
@@ -223,8 +237,14 @@
 func (agent *Agent) disablePort(ctx context.Context, portID uint32) error {
 	logger.Debugw(ctx, "disable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	defer agent.logDeviceUpdate(ctx, "disablePort", nil, nil, operStatus, &desc)
+
 	portHandle, have := agent.portLoader.Lock(portID)
 	if !have {
+		desc = fmt.Sprintf("Invalid argument portID: %v", portID)
 		return status.Errorf(codes.InvalidArgument, "%v", portID)
 	}
 	defer portHandle.Unlock()
@@ -232,18 +252,21 @@
 	oldPort := portHandle.GetReadOnly()
 
 	if oldPort.Type != voltha.Port_PON_OLT {
+		desc = fmt.Sprintf("Disabling of Port Type %v unimplemented", oldPort.Type)
 		return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", oldPort.Type)
 	}
 
 	newPort := *oldPort
 	newPort.AdminState = voltha.AdminState_DISABLED
 	if err := portHandle.Update(ctx, &newPort); err != nil {
+		desc = err.Error()
 		return err
 	}
 
 	//send request to adapter
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
 		return err
 	}
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
@@ -251,18 +274,26 @@
 
 	ch, err := agent.adapterProxy.DisablePort(ctx, device, &newPort)
 	if err != nil {
+		desc = err.Error()
 		cancel()
 		return err
 	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure)
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure, nil)
 	return nil
 }
 
 func (agent *Agent) enablePort(ctx context.Context, portID uint32) error {
 	logger.Debugw(ctx, "enable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	defer agent.logDeviceUpdate(ctx, "enablePort", nil, nil, operStatus, &desc)
+
 	portHandle, have := agent.portLoader.Lock(portID)
 	if !have {
+		desc = fmt.Sprintf("Invalid Argument portID: %v", portID)
 		return status.Errorf(codes.InvalidArgument, "%v", portID)
 	}
 	defer portHandle.Unlock()
@@ -270,18 +301,21 @@
 	oldPort := portHandle.GetReadOnly()
 
 	if oldPort.Type != voltha.Port_PON_OLT {
+		desc = fmt.Sprintf("Enabling of Port Type %v unimplemented", oldPort.Type)
 		return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", oldPort.Type)
 	}
 
 	newPort := *oldPort
 	newPort.AdminState = voltha.AdminState_ENABLED
 	if err := portHandle.Update(ctx, &newPort); err != nil {
+		desc = err.Error()
 		return err
 	}
 
 	//send request to adapter
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
 		return err
 	}
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
@@ -289,9 +323,11 @@
 
 	ch, err := agent.adapterProxy.EnablePort(ctx, device, &newPort)
 	if err != nil {
+		desc = err.Error()
 		cancel()
 		return err
 	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure)
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure, nil)
 	return nil
 }
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index a33f778..02eaf11 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -1079,7 +1079,7 @@
 	// Activate the child device
 	if agent = dMgr.getDeviceAgent(ctx, agent.deviceID); agent != nil {
 		go func() {
-			subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+			subCtx := utils.WithFromTopicMetadataFromContext(utils.WithSpanAndRPCMetadataFromContext(ctx), ctx)
 			err := agent.enableDevice(subCtx)
 			if err != nil {
 				logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index 71d5acd..c080dc6 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -34,6 +34,9 @@
 var (
 	// RPCContextKey for keeping rpc name as metadata
 	rpcContextKey = contextKey("rpc")
+
+	// fromTopicContextKey for keeping entity from which operation is requested as metadata
+	fromTopicContextKey = contextKey("fromTopic")
 )
 
 // ResponseCallback is the function signature for callbacks to execute after a response is received.
@@ -169,3 +172,26 @@
 	}
 	return targetCtx
 }
+
+func WithFromTopicMetadataContext(ctx context.Context, fromTopic string) context.Context {
+	ctx = context.WithValue(ctx, fromTopicContextKey, fromTopic)
+	return ctx
+}
+
+func WithFromTopicMetadataFromContext(targetCtx, sourceCtx context.Context) context.Context {
+	if sourceCtx != nil {
+		if val, ok := sourceCtx.Value(fromTopicContextKey).(string); ok {
+			targetCtx = context.WithValue(targetCtx, fromTopicContextKey, val)
+		}
+	}
+	return targetCtx
+}
+
+func GetFromTopicMetadataFromContext(ctx context.Context) string {
+	if ctx != nil {
+		if val, ok := ctx.Value(fromTopicContextKey).(string); ok {
+			return val
+		}
+	}
+	return ""
+}