VOL-3507 Implement the device update queries in rw-core

Change-Id: I2e9de4935c12981ddb7f10924d629bcd0ec09ef5
diff --git a/rw_core/core/api/adapter_request_handler.go b/rw_core/core/api/adapter_request_handler.go
index 503d294..1c2d2e0 100644
--- a/rw_core/core/api/adapter_request_handler.go
+++ b/rw_core/core/api/adapter_request_handler.go
@@ -19,7 +19,6 @@
 import (
 	"context"
 	"errors"
-
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -303,6 +302,7 @@
 	serialNumber := &ic.StrType{}
 	vendorID := &ic.StrType{}
 	onuID := &ic.IntType{}
+	fromTopic := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "parent_device_id":
@@ -345,6 +345,11 @@
 				logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
 				return nil, err
 			}
+		case "fromTopic":
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-fromTopic", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
 	logger.Debugw(ctx, "child-device-detected", log.Fields{"parent-device-id": pID.Id, "parent-port-no": portNo.Val,
@@ -352,7 +357,8 @@
 		"vendor-id": vendorID.Val, "onu-id": onuID.Val, "transaction-id": transactionID.Val})
 
 	rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "ChildDeviceDetected")
-	device, err := rhp.deviceMgr.ChildDeviceDetected(rpcCtx, pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
+	fromTopicContext := utils.WithFromTopicMetadataContext(rpcCtx, fromTopic.Val)
+	device, err := rhp.deviceMgr.ChildDeviceDetected(fromTopicContext, pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
 	if err != nil {
 		logger.Debugw(ctx, "child-detection-failed", log.Fields{"parent-device-id": pID.Id, "onu-id": onuID.Val, "error": err})
 	}
@@ -729,6 +735,7 @@
 	deviceID := &voltha.ID{}
 	port := &voltha.Port{}
 	transactionID := &ic.StrType{}
+	fromTopic := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -746,12 +753,19 @@
 				logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
 				return nil, err
 			}
+		case "fromTopic":
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-fromTopic", log.Fields{"error": err})
+				return nil, err
+			}
+			//log.EnrichSpan(ctx,log.Fields{"fromTopic": fromTopic})
 		}
 	}
 	logger.Debugw(ctx, "port-created", log.Fields{"device-id": deviceID.Id, "port": port, "transaction-id": transactionID.Val})
 	rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "PortCreated")
+	fromTopicContext := utils.WithFromTopicMetadataContext(rpcCtx, fromTopic.Val)
 
-	if err := rhp.deviceMgr.AddPort(rpcCtx, deviceID.Id, port); err != nil {
+	if err := rhp.deviceMgr.AddPort(fromTopicContext, deviceID.Id, port); err != nil {
 		logger.Debugw(ctx, "unable-to-add-port", log.Fields{"error": err})
 		return nil, err
 	}
@@ -925,6 +939,7 @@
 	deviceID := &voltha.ID{}
 	reason := &ic.StrType{}
 	transactionID := &ic.StrType{}
+	fromTopic := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -942,14 +957,20 @@
 				logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
 				return nil, err
 			}
+		case "fromTopic":
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-fromTopic", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
 	logger.Debugw(ctx, "device-reason-update", log.Fields{"device-id": deviceID.Id, "reason": reason.Val,
 		"transaction-id": transactionID.Val})
 
 	rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "DeviceReasonUpdate")
+	fromTopicContext := utils.WithFromTopicMetadataContext(rpcCtx, fromTopic.Val)
 
-	if err := rhp.deviceMgr.UpdateDeviceReason(rpcCtx, deviceID.Id, reason.Val); err != nil {
+	if err := rhp.deviceMgr.UpdateDeviceReason(fromTopicContext, deviceID.Id, reason.Val); err != nil {
 		logger.Debugw(ctx, "unable-to-update-device-reason", log.Fields{"error": err})
 		return nil, err