[VOL-3069]Pass Context down the execution call hierarchy across voltha codebase

Change-Id: I97a2630d9a4fe5dc3161113539edda476534f186
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
index 2143a84..f2f0e8a 100644
--- a/rw_core/core/adapter/agent.go
+++ b/rw_core/core/adapter/agent.go
@@ -17,6 +17,7 @@
 package adapter
 
 import (
+	"context"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -30,13 +31,13 @@
 	lock    sync.RWMutex
 }
 
-func newAdapterAgent(adapter *voltha.Adapter) *agent {
+func newAdapterAgent(ctx context.Context, adapter *voltha.Adapter) *agent {
 	return &agent{
 		adapter: adapter,
 	}
 }
 
-func (aa *agent) getAdapter() *voltha.Adapter {
+func (aa *agent) getAdapter(ctx context.Context) *voltha.Adapter {
 	aa.lock.RLock()
 	defer aa.lock.RUnlock()
 	logger.Debugw("getAdapter", log.Fields{"adapter": aa.adapter})
@@ -45,7 +46,7 @@
 
 // updateCommunicationTime updates the message to the specified time.
 // No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
-func (aa *agent) updateCommunicationTime(new time.Time) {
+func (aa *agent) updateCommunicationTime(ctx context.Context, new time.Time) {
 	// only update if new time is not in the future, and either the old time is invalid or new time > old time
 	if last, err := ptypes.Timestamp(aa.adapter.LastCommunication); !new.After(time.Now()) && (err != nil || new.After(last)) {
 		timestamp, err := ptypes.TimestampProto(new)
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index b552d8f..99f445b 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -44,14 +44,14 @@
 	lockdDeviceTypeToAdapterMap sync.RWMutex
 }
 
-func NewAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client) *Manager {
+func NewAdapterManager(ctx context.Context, cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client) *Manager {
 	aMgr := &Manager{
 		coreInstanceID:   coreInstanceID,
 		clusterDataProxy: cdProxy,
 		deviceTypes:      make(map[string]*voltha.DeviceType),
 		adapterAgents:    make(map[string]*agent),
 	}
-	kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
+	kafkaClient.SubscribeForMetadata(ctx, aMgr.updateLastAdapterCommunication)
 	return aMgr
 }
 
@@ -59,7 +59,7 @@
 // if more than one callback is required, this should be converted to a proper interface
 type adapterRestartedHandler func(ctx context.Context, adapter *voltha.Adapter) error
 
-func (aMgr *Manager) SetAdapterRestartedCallback(onAdapterRestart adapterRestartedHandler) {
+func (aMgr *Manager) SetAdapterRestartedCallback(ctx context.Context, onAdapterRestart adapterRestartedHandler) {
 	aMgr.onAdapterRestart = onAdapterRestart
 }
 
@@ -69,7 +69,7 @@
 
 	// Load the existing adapterAgents and device types - this will also ensure the correct paths have been
 	// created if there are no data in the dB to start
-	err := aMgr.loadAdaptersAndDevicetypesInMemory()
+	err := aMgr.loadAdaptersAndDevicetypesInMemory(ctx)
 	if err != nil {
 		logger.Fatalf("failed-to-load-adapters-and-device-types-in-memory: %s", err)
 	}
@@ -79,7 +79,7 @@
 }
 
 //loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
-func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory() error {
+func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory(ctx context.Context) error {
 	// Load the adapters
 	var adapters []*voltha.Adapter
 	if err := aMgr.clusterDataProxy.List(context.Background(), "adapters", &adapters); err != nil {
@@ -88,7 +88,7 @@
 	}
 	if len(adapters) != 0 {
 		for _, adapter := range adapters {
-			if err := aMgr.addAdapter(adapter, false); err != nil {
+			if err := aMgr.addAdapter(ctx, adapter, false); err != nil {
 				logger.Errorw("failed to add adapter", log.Fields{"adapterId": adapter.Id})
 			} else {
 				logger.Debugw("adapter added successfully", log.Fields{"adapterId": adapter.Id})
@@ -108,7 +108,7 @@
 			logger.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
 			dTypes.Items = append(dTypes.Items, dType)
 		}
-		return aMgr.addDeviceTypes(dTypes, false)
+		return aMgr.addDeviceTypes(ctx, dTypes, false)
 	}
 
 	logger.Debug("no-existing-device-type-found")
@@ -116,17 +116,17 @@
 	return nil
 }
 
-func (aMgr *Manager) updateLastAdapterCommunication(adapterID string, timestamp time.Time) {
+func (aMgr *Manager) updateLastAdapterCommunication(ctx context.Context, adapterID string, timestamp time.Time) {
 	aMgr.lockAdaptersMap.RLock()
 	adapterAgent, have := aMgr.adapterAgents[adapterID]
 	aMgr.lockAdaptersMap.RUnlock()
 
 	if have {
-		adapterAgent.updateCommunicationTime(timestamp)
+		adapterAgent.updateCommunicationTime(ctx, timestamp)
 	}
 }
 
-func (aMgr *Manager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
+func (aMgr *Manager) addAdapter(ctx context.Context, adapter *voltha.Adapter, saveToDb bool) error {
 	aMgr.lockAdaptersMap.Lock()
 	defer aMgr.lockAdaptersMap.Unlock()
 	logger.Debugw("adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
@@ -153,12 +153,12 @@
 			}
 		}
 		clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
-		aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter)
+		aMgr.adapterAgents[adapter.Id] = newAdapterAgent(ctx, clonedAdapter)
 	}
 	return nil
 }
 
-func (aMgr *Manager) addDeviceTypes(deviceTypes *voltha.DeviceTypes, saveToDb bool) error {
+func (aMgr *Manager) addDeviceTypes(ctx context.Context, deviceTypes *voltha.DeviceTypes, saveToDb bool) error {
 	if deviceTypes == nil {
 		return fmt.Errorf("no-device-type")
 	}
@@ -195,28 +195,28 @@
 }
 
 // ListAdapters returns the contents of all adapters known to the system
-func (aMgr *Manager) ListAdapters(_ context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
+func (aMgr *Manager) ListAdapters(ctx context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
 	result := &voltha.Adapters{Items: []*voltha.Adapter{}}
 	aMgr.lockAdaptersMap.RLock()
 	defer aMgr.lockAdaptersMap.RUnlock()
 	for _, adapterAgent := range aMgr.adapterAgents {
-		if a := adapterAgent.getAdapter(); a != nil {
+		if a := adapterAgent.getAdapter(ctx); a != nil {
 			result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
 		}
 	}
 	return result, nil
 }
 
-func (aMgr *Manager) getAdapter(adapterID string) *voltha.Adapter {
+func (aMgr *Manager) getAdapter(ctx context.Context, adapterID string) *voltha.Adapter {
 	aMgr.lockAdaptersMap.RLock()
 	defer aMgr.lockAdaptersMap.RUnlock()
 	if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
-		return adapterAgent.getAdapter()
+		return adapterAgent.getAdapter(ctx)
 	}
 	return nil
 }
 
-func (aMgr *Manager) RegisterAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
+func (aMgr *Manager) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
 	logger.Debugw("RegisterAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
 		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
 
@@ -229,7 +229,7 @@
 		return nil, status.Error(codes.InvalidArgument, "adapter-not-specifying-type")
 	}
 
-	if aMgr.getAdapter(adapter.Id) != nil {
+	if aMgr.getAdapter(ctx, adapter.Id) != nil {
 		//	Already registered - Adapter may have restarted.  Trigger the reconcile process for that adapter
 		go func() {
 			err := aMgr.onAdapterRestart(context.Background(), adapter)
@@ -240,11 +240,11 @@
 		return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
 	}
 	// Save the adapter and the device types
-	if err := aMgr.addAdapter(adapter, true); err != nil {
+	if err := aMgr.addAdapter(ctx, adapter, true); err != nil {
 		logger.Errorw("failed-to-add-adapter", log.Fields{"error": err})
 		return nil, err
 	}
-	if err := aMgr.addDeviceTypes(deviceTypes, true); err != nil {
+	if err := aMgr.addDeviceTypes(ctx, deviceTypes, true); err != nil {
 		logger.Errorw("failed-to-add-device-types", log.Fields{"error": err})
 		return nil, err
 	}
@@ -256,7 +256,7 @@
 }
 
 // GetAdapterType returns the name of the device adapter that service this device type
-func (aMgr *Manager) GetAdapterType(deviceType string) (string, error) {
+func (aMgr *Manager) GetAdapterType(ctx context.Context, deviceType string) (string, error) {
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
 	for _, adapterAgent := range aMgr.adapterAgents {
diff --git a/rw_core/core/api/adapter_request_handler.go b/rw_core/core/api/adapter_request_handler.go
index 4deca75..f49f435 100644
--- a/rw_core/core/api/adapter_request_handler.go
+++ b/rw_core/core/api/adapter_request_handler.go
@@ -36,14 +36,14 @@
 }
 
 // NewAdapterRequestHandlerProxy assigns values for adapter request handler proxy attributes and returns the new instance
-func NewAdapterRequestHandlerProxy(dMgr *device.Manager, aMgr *adapter.Manager) *AdapterRequestHandlerProxy {
+func NewAdapterRequestHandlerProxy(ctx context.Context, dMgr *device.Manager, aMgr *adapter.Manager) *AdapterRequestHandlerProxy {
 	return &AdapterRequestHandlerProxy{
 		deviceMgr:  dMgr,
 		adapterMgr: aMgr,
 	}
 }
 
-func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
+func (rhp *AdapterRequestHandlerProxy) Register(ctx context.Context, args []*ic.Argument) (*voltha.CoreInstance, error) {
 	if len(args) < 3 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -73,11 +73,11 @@
 	}
 	logger.Debugw("Register", log.Fields{"adapter": *adapter, "device-types": deviceTypes, "transaction-id": transactionID.Val})
 
-	return rhp.adapterMgr.RegisterAdapter(adapter, deviceTypes)
+	return rhp.adapterMgr.RegisterAdapter(ctx, adapter, deviceTypes)
 }
 
 // GetDevice returns device info
-func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetDevice(ctx context.Context, args []*ic.Argument) (*voltha.Device, error) {
 	if len(args) < 2 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -111,7 +111,7 @@
 }
 
 // DeviceUpdate updates device using adapter data
-func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 2 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -144,7 +144,7 @@
 }
 
 // GetChildDevice returns details of child device
-func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ic.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevice(ctx context.Context, args []*ic.Argument) (*voltha.Device, error) {
 	if len(args) < 3 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -191,7 +191,7 @@
 }
 
 // GetChildDeviceWithProxyAddress returns details of child device with proxy address
-func (rhp *AdapterRequestHandlerProxy) GetChildDeviceWithProxyAddress(args []*ic.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDeviceWithProxyAddress(ctx context.Context, args []*ic.Argument) (*voltha.Device, error) {
 	if len(args) < 2 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -220,7 +220,7 @@
 }
 
 // GetPorts returns the ports information of the device based on the port type.
-func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
+func (rhp *AdapterRequestHandlerProxy) GetPorts(ctx context.Context, args []*ic.Argument) (*voltha.Ports, error) {
 	if len(args) < 3 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -254,7 +254,7 @@
 }
 
 // GetChildDevices gets all the child device IDs from the device passed as parameter
-func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Devices, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevices(ctx context.Context, args []*ic.Argument) (*voltha.Devices, error) {
 	if len(args) < 2 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -284,7 +284,7 @@
 
 // ChildDeviceDetected is invoked when a child device is detected.  The following parameters are expected:
 // {parent_device_id, parent_port_no, child_device_type, channel_id, vendor_id, serial_number)
-func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(ctx context.Context, args []*ic.Argument) (*voltha.Device, error) {
 	if len(args) < 5 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -355,7 +355,7 @@
 }
 
 // DeviceStateUpdate updates device status
-func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 3 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -401,7 +401,7 @@
 }
 
 // ChildrenStateUpdate updates child device status
-func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 3 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -448,7 +448,7 @@
 }
 
 // PortsStateUpdate updates the ports state related to the device
-func (rhp *AdapterRequestHandlerProxy) PortsStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortsStateUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 2 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -486,7 +486,7 @@
 }
 
 // PortStateUpdate updates the port state of the device
-func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 3 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -541,7 +541,7 @@
 }
 
 // DeleteAllPorts deletes all ports of device
-func (rhp *AdapterRequestHandlerProxy) DeleteAllPorts(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeleteAllPorts(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 3 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -574,7 +574,7 @@
 
 // ChildDevicesLost indicates that a parent device is in a state (Disabled) where it cannot manage the child devices.
 // This will trigger the Core to disable all the child devices.
-func (rhp *AdapterRequestHandlerProxy) ChildDevicesLost(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildDevicesLost(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 2 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -607,7 +607,7 @@
 
 // ChildDevicesDetected invoked by an adapter when child devices are found, typically after after a disable/enable sequence.
 // This will trigger the Core to Enable all the child devices of that parent.
-func (rhp *AdapterRequestHandlerProxy) ChildDevicesDetected(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildDevicesDetected(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 2 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -639,7 +639,7 @@
 }
 
 // PortCreated adds port to device
-func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortCreated(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 3 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -677,7 +677,7 @@
 }
 
 // DevicePMConfigUpdate initializes the pm configs as defined by the adapter.
-func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 2 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -710,7 +710,7 @@
 }
 
 // PacketIn sends the incoming packet of device
-func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PacketIn(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 4 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -756,7 +756,7 @@
 }
 
 // UpdateImageDownload updates image download
-func (rhp *AdapterRequestHandlerProxy) UpdateImageDownload(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) UpdateImageDownload(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 2 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -795,7 +795,7 @@
 }
 
 // ReconcileChildDevices reconciles child devices
-func (rhp *AdapterRequestHandlerProxy) ReconcileChildDevices(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ReconcileChildDevices(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 2 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -827,7 +827,7 @@
 }
 
 // DeviceReasonUpdate updates device reason
-func (rhp *AdapterRequestHandlerProxy) DeviceReasonUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceReasonUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 2 {
 		logger.Warn("DeviceReasonUpdate: invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("DeviceReasonUpdate: invalid-number-of-args")
diff --git a/rw_core/core/api/common_test.go b/rw_core/core/api/common_test.go
index f9bb047..d701984 100644
--- a/rw_core/core/api/common_test.go
+++ b/rw_core/core/api/common_test.go
@@ -84,7 +84,7 @@
 	if err != nil {
 		return nil, 0, err
 	}
-	etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+	etcdServer := mock_etcd.StartEtcdServer(context.Background(), mock_etcd.MKConfig(context.Background(), configName, kvClientPort, peerPort, storageDir, logLevel))
 	if etcdServer == nil {
 		return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
 	}
@@ -93,42 +93,43 @@
 
 func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
 	if server != nil {
-		server.Stop()
+		server.Stop(context.Background())
 	}
 }
 
 func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
 	addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
-	client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
+	client, err := kvstore.NewEtcdClient(context.Background(), addr, cf.KVStoreTimeout, log.FatalLevel)
 	if err != nil {
 		panic("no kv client")
 	}
 	return client
 }
 
-func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
+func createMockAdapter(ctx context.Context, adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
 	var err error
 	var adapter adapters.IAdapter
 	adapterKafkaICProxy := kafka.NewInterContainerProxy(
-		kafka.MsgClient(kafkaClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
-	adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
+		ctx,
+		kafka.MsgClient(ctx, kafkaClient),
+		kafka.DefaultTopic(ctx, &kafka.Topic{Name: adapterName}))
+	adapterCoreProxy := com.NewCoreProxy(ctx, adapterKafkaICProxy, adapterName, coreName)
 	var adapterReqHandler *com.RequestHandlerProxy
 	switch adapterType {
 	case OltAdapter:
-		adapter = cm.NewOLTAdapter(adapterCoreProxy)
+		adapter = cm.NewOLTAdapter(ctx, adapterCoreProxy)
 	case OnuAdapter:
-		adapter = cm.NewONUAdapter(adapterCoreProxy)
+		adapter = cm.NewONUAdapter(ctx, adapterCoreProxy)
 	default:
 		logger.Fatalf("invalid-adapter-type-%d", adapterType)
 	}
-	adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
+	adapterReqHandler = com.NewRequestHandlerProxy(ctx, coreInstanceID, adapter, adapterCoreProxy)
 
-	if err = adapterKafkaICProxy.Start(); err != nil {
+	if err = adapterKafkaICProxy.Start(ctx); err != nil {
 		logger.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
 		return nil, err
 	}
-	if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
+	if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
 		logger.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
 		return nil, err
 	}
diff --git a/rw_core/core/api/grpc_nbi_handler.go b/rw_core/core/api/grpc_nbi_handler.go
index 55117d2..0793994 100755
--- a/rw_core/core/api/grpc_nbi_handler.go
+++ b/rw_core/core/api/grpc_nbi_handler.go
@@ -40,7 +40,7 @@
 type adapterManager struct{ *adapter.Manager }
 
 // NewNBIHandler creates API handler instance
-func NewNBIHandler(deviceMgr *device.Manager, logicalDeviceMgr *device.LogicalManager, adapterMgr *adapter.Manager) *NBIHandler {
+func NewNBIHandler(ctx context.Context, deviceMgr *device.Manager, logicalDeviceMgr *device.LogicalManager, adapterMgr *adapter.Manager) *NBIHandler {
 	return &NBIHandler{
 		Manager:        deviceMgr,
 		LogicalManager: logicalDeviceMgr,
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index fa1f657..fae4c79 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -73,7 +73,7 @@
 	maxTimeout        time.Duration
 }
 
-func newNBTest() *NBTest {
+func newNBTest(ctx context.Context) *NBTest {
 	test := &NBTest{}
 	// Start the embedded etcd server
 	var err error
@@ -82,7 +82,7 @@
 		logger.Fatal(err)
 	}
 	// Create the kafka client
-	test.kClient = mock_kafka.NewKafkaClient()
+	test.kClient = mock_kafka.NewKafkaClient(ctx)
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-nbi-test"
@@ -117,39 +117,40 @@
 		LivenessChannelInterval: cfg.LiveProbeInterval / 2,
 		PathPrefix:              cfg.KVStoreDataPrefix}
 	nb.kmp = kafka.NewInterContainerProxy(
-		kafka.InterContainerHost(cfg.KafkaAdapterHost),
-		kafka.InterContainerPort(cfg.KafkaAdapterPort),
-		kafka.MsgClient(nb.kClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
-		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+		context.Background(),
+		kafka.InterContainerHost(context.Background(), cfg.KafkaAdapterHost),
+		kafka.InterContainerPort(context.Background(), cfg.KafkaAdapterPort),
+		kafka.MsgClient(context.Background(), nb.kClient),
+		kafka.DefaultTopic(context.Background(), &kafka.Topic{Name: cfg.CoreTopic}),
+		kafka.DeviceDiscoveryTopic(context.Background(), &kafka.Topic{Name: cfg.AffinityRouterTopic}))
 
-	endpointMgr := kafka.NewEndpointManager(backend)
-	proxy := model.NewProxy(backend, "/")
-	nb.adapterMgr = adapter.NewAdapterManager(proxy, nb.coreInstanceID, nb.kClient)
-	nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CorePairTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
+	endpointMgr := kafka.NewEndpointManager(context.Background(), backend)
+	proxy := model.NewProxy(context.Background(), backend, "/")
+	nb.adapterMgr = adapter.NewAdapterManager(context.Background(), proxy, nb.coreInstanceID, nb.kClient)
+	nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(context.Background(), proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CorePairTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
 	nb.adapterMgr.Start(ctx)
 
-	if err := nb.kmp.Start(); err != nil {
+	if err := nb.kmp.Start(context.Background()); err != nil {
 		logger.Fatalf("Cannot start InterContainerProxy: %s", err)
 	}
-	requestProxy := NewAdapterRequestHandlerProxy(nb.deviceMgr, nb.adapterMgr)
-	if err := nb.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: cfg.CoreTopic}, requestProxy); err != nil {
+	requestProxy := NewAdapterRequestHandlerProxy(context.Background(), nb.deviceMgr, nb.adapterMgr)
+	if err := nb.kmp.SubscribeWithRequestHandlerInterface(context.Background(), kafka.Topic{Name: cfg.CoreTopic}, requestProxy); err != nil {
 		logger.Fatalf("Cannot add request handler: %s", err)
 	}
-	if err := nb.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
+	if err := nb.kmp.SubscribeWithDefaultRequestHandler(context.Background(), kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
 		logger.Fatalf("Cannot add default request handler: %s", err)
 	}
 }
 
 func (nb *NBTest) createAndregisterAdapters(t *testing.T) {
 	// Setup the mock OLT adapter
-	oltAdapter, err := createMockAdapter(OltAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.oltAdapterName)
+	oltAdapter, err := createMockAdapter(context.Background(), OltAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.oltAdapterName)
 	if err != nil {
 		logger.Fatalw("setting-mock-olt-adapter-failed", log.Fields{"error": err})
 	}
 	nb.oltAdapter = (oltAdapter).(*cm.OLTAdapter)
-	nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
-	nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
+	nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT(context.Background())
+	nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo(context.Background())
 
 	//	Register the adapter
 	registrationData := &voltha.Adapter{
@@ -163,13 +164,13 @@
 	}
 	types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
 	deviceTypes := &voltha.DeviceTypes{Items: types}
-	if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+	if _, err := nb.adapterMgr.RegisterAdapter(context.Background(), registrationData, deviceTypes); err != nil {
 		logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
 		assert.NotNil(t, err)
 	}
 
 	// Setup the mock ONU adapter
-	onuAdapter, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
+	onuAdapter, err := createMockAdapter(context.Background(), OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
 	if err != nil {
 		logger.Fatalw("setting-mock-onu-adapter-failed", log.Fields{"error": err})
 	}
@@ -187,7 +188,7 @@
 	}
 	types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
 	deviceTypes = &voltha.DeviceTypes{Items: types}
-	if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+	if _, err := nb.adapterMgr.RegisterAdapter(context.Background(), registrationData, deviceTypes); err != nil {
 		logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
 		assert.NotNil(t, err)
 	}
@@ -195,10 +196,10 @@
 
 func (nb *NBTest) stopAll() {
 	if nb.kClient != nil {
-		nb.kClient.Stop()
+		nb.kClient.Stop(context.Background())
 	}
 	if nb.kmp != nil {
-		nb.kmp.Stop()
+		nb.kmp.Stop(context.Background())
 	}
 	if nb.etcdServer != nil {
 		stopEmbeddedEtcdServer(nb.etcdServer)
@@ -972,7 +973,7 @@
 	for _, val := range fa.MatchFields {
 		matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
 	}
-	return flows.MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV)
+	return flows.MkSimpleFlowMod(context.Background(), matchFields, fa.Actions, fa.Command, fa.KV)
 }
 
 func createMetadata(cTag int, techProfile int, port int) uint64 {
@@ -1001,6 +1002,7 @@
 	// Send flows for the parent device
 	var nniPorts []*voltha.LogicalPort
 	var uniPorts []*voltha.LogicalPort
+	ctx := context.Background()
 	for _, p := range logicalDevice.Ports {
 		if p.RootPort {
 			nniPorts = append(nniPorts, p)
@@ -1017,11 +1019,11 @@
 	fa = &flows.FlowArgs{
 		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			flows.InPort(nniPort),
-			flows.EthType(35020),
+			flows.InPort(ctx, nniPort),
+			flows.EthType(ctx, 35020),
 		},
 		Actions: []*ofp.OfpAction{
-			flows.Output(controllerPortMask),
+			flows.Output(ctx, controllerPortMask),
 		},
 	}
 	flowLLDP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
@@ -1031,14 +1033,14 @@
 	fa = &flows.FlowArgs{
 		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			flows.InPort(nniPort),
-			flows.EthType(2048),
-			flows.IpProto(17),
-			flows.UdpSrc(67),
-			flows.UdpDst(68),
+			flows.InPort(ctx, nniPort),
+			flows.EthType(ctx, 2048),
+			flows.IpProto(ctx, 17),
+			flows.UdpSrc(ctx, 67),
+			flows.UdpDst(ctx, 68),
 		},
 		Actions: []*ofp.OfpAction{
-			flows.Output(controllerPortMask),
+			flows.Output(ctx, controllerPortMask),
 		},
 	}
 	flowIPV4 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
@@ -1048,14 +1050,14 @@
 	fa = &flows.FlowArgs{
 		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			flows.InPort(nniPort),
-			flows.EthType(34525),
-			flows.IpProto(17),
-			flows.UdpSrc(546),
-			flows.UdpDst(547),
+			flows.InPort(ctx, nniPort),
+			flows.EthType(ctx, 34525),
+			flows.IpProto(ctx, 17),
+			flows.UdpSrc(ctx, 546),
+			flows.UdpDst(ctx, 547),
 		},
 		Actions: []*ofp.OfpAction{
-			flows.Output(controllerPortMask),
+			flows.Output(ctx, controllerPortMask),
 		},
 	}
 	flowIPV6 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
@@ -1066,17 +1068,18 @@
 }
 
 func (nb *NBTest) sendEAPFlows(t *testing.T, nbi *NBIHandler, logicalDeviceID string, port *ofp.OfpPort, vlan int, meterID uint64) {
+	ctx := context.Background()
 	maxInt32 := uint64(0xFFFFFFFF)
 	controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
 	fa := &flows.FlowArgs{
 		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1, "write_metadata": createMetadata(vlan, 64, 0), "meter_id": meterID},
 		MatchFields: []*ofp.OfpOxmOfbField{
-			flows.InPort(port.PortNo),
-			flows.EthType(34958),
-			flows.VlanVid(8187),
+			flows.InPort(ctx, port.PortNo),
+			flows.EthType(ctx, 34958),
+			flows.VlanVid(ctx, 8187),
 		},
 		Actions: []*ofp.OfpAction{
-			flows.Output(controllerPortMask),
+			flows.Output(ctx, controllerPortMask),
 		},
 	}
 	flowEAP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
@@ -1088,12 +1091,12 @@
 	defer wg.Done()
 
 	// Clear any existing flows on the adapters
-	nb.oltAdapter.ClearFlows()
-	nb.onuAdapter.ClearFlows()
+	nb.oltAdapter.ClearFlows(context.Background())
+	nb.onuAdapter.ClearFlows(context.Background())
 
 	// Set the adapter actions on flow addition/deletion
-	nb.oltAdapter.SetFlowAction(flowAddFail, flowDelete)
-	nb.onuAdapter.SetFlowAction(flowAddFail, flowDelete)
+	nb.oltAdapter.SetFlowAction(context.Background(), flowAddFail, flowDelete)
+	nb.onuAdapter.SetFlowAction(context.Background(), flowAddFail, flowDelete)
 
 	// Wait until a logical device is ready
 	var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
@@ -1149,7 +1152,7 @@
 	processedNniLogicalPorts := 0
 	processedUniLogicalPorts := 0
 
-	for event := range nbi.GetChangeEventsQueueForTest() {
+	for event := range nbi.GetChangeEventsQueueForTest(context.Background()) {
 		startingVlan++
 		if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
 			ps := portStatus.PortStatus
@@ -1177,7 +1180,7 @@
 		expectedFlowCount = 0
 	}
 	var oltVFunc isConditionSatisfied = func() bool {
-		return nb.oltAdapter.GetFlowCount() >= expectedFlowCount
+		return nb.oltAdapter.GetFlowCount(context.Background()) >= expectedFlowCount
 	}
 	err = waitUntilCondition(nb.maxTimeout, nbi, oltVFunc)
 	assert.Nil(t, err)
@@ -1188,7 +1191,7 @@
 		expectedFlowCount = 0
 	}
 	var onuVFunc isConditionSatisfied = func() bool {
-		return nb.onuAdapter.GetFlowCount() == expectedFlowCount
+		return nb.onuAdapter.GetFlowCount(context.Background()) == expectedFlowCount
 	}
 	err = waitUntilCondition(nb.maxTimeout, nbi, onuVFunc)
 	assert.Nil(t, err)
@@ -1252,7 +1255,7 @@
 
 	//log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
 
-	nb := newNBTest()
+	nb := newNBTest(context.Background())
 	assert.NotNil(t, nb)
 
 	defer nb.stopAll()
@@ -1261,7 +1264,7 @@
 	nb.startCore(false)
 
 	// Set the grpc API interface - no grpc server is running in unit test
-	nbi := NewNBIHandler(nb.deviceMgr, nb.logicalDeviceMgr, nb.adapterMgr)
+	nbi := NewNBIHandler(context.Background(), nb.deviceMgr, nb.logicalDeviceMgr, nb.adapterMgr)
 
 	// 1. Basic test with no data in Core
 	nb.testCoreWithoutData(t, nbi)
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0dbecc8..f33fba8 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -47,6 +47,7 @@
 	// If the context has a probe then fetch it and register our services
 	if p := probe.GetProbeFromContext(ctx); p != nil {
 		p.RegisterService(
+			ctx,
 			"message-bus",
 			"kv-store",
 			"adapter-manager",
@@ -74,15 +75,15 @@
 
 	// setup kv client
 	logger.Debugw("create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
-	kvClient, err := newKVClient(cf.KVStoreType, cf.KVStoreHost+":"+strconv.Itoa(cf.KVStorePort), cf.KVStoreTimeout)
+	kvClient, err := newKVClient(ctx, cf.KVStoreType, cf.KVStoreHost+":"+strconv.Itoa(cf.KVStorePort), cf.KVStoreTimeout)
 	if err != nil {
 		logger.Fatal(err)
 	}
 	defer stopKVClient(context.Background(), kvClient)
 
 	// sync logging config with kv store
-	cm := conf.NewConfigManager(kvClient, cf.KVStoreType, cf.KVStoreHost, cf.KVStorePort, cf.KVStoreTimeout)
-	go conf.StartLogLevelConfigProcessing(cm, ctx)
+	cm := conf.NewConfigManager(ctx, kvClient, cf.KVStoreType, cf.KVStoreHost, cf.KVStorePort, cf.KVStoreTimeout)
+	go conf.StartLogLevelConfigProcessing(ctx, cm)
 
 	backend := &db.Backend{
 		Client:    kvClient,
@@ -104,27 +105,28 @@
 
 	// create kafka client
 	kafkaClient := kafka.NewSaramaClient(
-		kafka.Host(cf.KafkaAdapterHost),
-		kafka.Port(cf.KafkaAdapterPort),
-		kafka.ConsumerType(kafka.GroupCustomer),
-		kafka.ProducerReturnOnErrors(true),
-		kafka.ProducerReturnOnSuccess(true),
-		kafka.ProducerMaxRetries(6),
-		kafka.NumPartitions(3),
-		kafka.ConsumerGroupName(id),
-		kafka.ConsumerGroupPrefix(id),
-		kafka.AutoCreateTopic(true),
-		kafka.ProducerFlushFrequency(5),
-		kafka.ProducerRetryBackoff(time.Millisecond*30),
-		kafka.LivenessChannelInterval(cf.LiveProbeInterval/2),
+		ctx,
+		kafka.Host(ctx, cf.KafkaAdapterHost),
+		kafka.Port(ctx, cf.KafkaAdapterPort),
+		kafka.ConsumerType(ctx, kafka.GroupCustomer),
+		kafka.ProducerReturnOnErrors(ctx, true),
+		kafka.ProducerReturnOnSuccess(ctx, true),
+		kafka.ProducerMaxRetries(ctx, 6),
+		kafka.NumPartitions(ctx, 3),
+		kafka.ConsumerGroupName(ctx, id),
+		kafka.ConsumerGroupPrefix(ctx, id),
+		kafka.AutoCreateTopic(ctx, true),
+		kafka.ProducerFlushFrequency(ctx, 5),
+		kafka.ProducerRetryBackoff(ctx, time.Millisecond*30),
+		kafka.LivenessChannelInterval(ctx, cf.LiveProbeInterval/2),
 	)
 	// defer kafkaClient.Stop()
 
 	// create kv proxy
-	proxy := model.NewProxy(backend, "/")
+	proxy := model.NewProxy(ctx, backend, "/")
 
 	// load adapters & device types while other things are starting
-	adapterMgr := adapter.NewAdapterManager(proxy, id, kafkaClient)
+	adapterMgr := adapter.NewAdapterManager(ctx, proxy, id, kafkaClient)
 	go adapterMgr.Start(ctx)
 
 	// connect to kafka, then wait until reachable and publisher/consumer created
@@ -134,27 +136,27 @@
 		logger.Warn("Failed to setup kafka connection")
 		return
 	}
-	defer kmp.Stop()
+	defer kmp.Stop(ctx)
 	go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
 
 	// create the core of the system, the device managers
-	endpointMgr := kafka.NewEndpointManager(backend)
-	deviceMgr, logicalDeviceMgr := device.NewManagers(proxy, adapterMgr, kmp, endpointMgr, cf.CorePairTopic, id, cf.DefaultCoreTimeout)
+	endpointMgr := kafka.NewEndpointManager(ctx, backend)
+	deviceMgr, logicalDeviceMgr := device.NewManagers(ctx, proxy, adapterMgr, kmp, endpointMgr, cf.CorePairTopic, id, cf.DefaultCoreTimeout)
 
 	// register kafka RPC handler
-	registerAdapterRequestHandlers(kmp, deviceMgr, adapterMgr, cf.CoreTopic, cf.CorePairTopic)
+	registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf.CoreTopic, cf.CorePairTopic)
 
 	// start gRPC handler
-	grpcServer := grpcserver.NewGrpcServer(cf.GrpcHost, cf.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
-	go startGRPCService(ctx, grpcServer, api.NewNBIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
-	defer grpcServer.Stop()
+	grpcServer := grpcserver.NewGrpcServer(ctx, cf.GrpcHost, cf.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
+	go startGRPCService(ctx, grpcServer, api.NewNBIHandler(ctx, deviceMgr, logicalDeviceMgr, adapterMgr))
+	defer grpcServer.Stop(ctx)
 
 	// wait for core to be stopped, via Stop() or context cancellation, before running deferred functions
 	<-ctx.Done()
 }
 
 // Stop brings down core services
-func (core *Core) Stop() {
+func (core *Core) Stop(ctx context.Context) {
 	core.shutdown()
 	<-core.stopped
 }
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 901b27f..6073dc4 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -62,11 +62,11 @@
 }
 
 //newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *Agent {
+func newAgent(ctx context.Context, ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *Agent {
 	var agent Agent
 	agent.adapterProxy = ap
 	if device.Id == "" {
-		agent.deviceID = coreutils.CreateDeviceID()
+		agent.deviceID = coreutils.CreateDeviceID(ctx)
 	} else {
 		agent.deviceID = device.Id
 	}
@@ -80,7 +80,7 @@
 	agent.clusterDataProxy = cdProxy
 	agent.defaultTimeout = timeout
 	agent.device = proto.Clone(device).(*voltha.Device)
-	agent.requestQueue = coreutils.NewRequestQueue()
+	agent.requestQueue = coreutils.NewRequestQueue(ctx)
 	return &agent
 }
 
@@ -154,7 +154,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
 	logger.Infow("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
 
@@ -178,7 +178,7 @@
 		logger.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "error": err})
 		return
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debug("reconciling-device-agent-devicetype")
 	// TODO: context timeout
 	device := &voltha.Device{}
@@ -196,14 +196,14 @@
 
 // onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
 // and the only action required is to publish a successful result on kafka
-func (agent *Agent) onSuccess(rpc string, response interface{}, reqArgs ...interface{}) {
+func (agent *Agent) onSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
 	logger.Debugw("response successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
 	// TODO: Post success message onto kafka
 }
 
 // onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
 // and the only action required is to publish the failed result on kafka
-func (agent *Agent) onFailure(rpc string, response interface{}, reqArgs ...interface{}) {
+func (agent *Agent) onFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
 	if res, ok := response.(error); ok {
 		logger.Errorw("rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
 	} else {
@@ -218,14 +218,14 @@
 	select {
 	case rpcResponse, ok := <-ch:
 		if !ok {
-			onFailure(rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
 		} else if rpcResponse.Err != nil {
-			onFailure(rpc, rpcResponse.Err, reqArgs)
+			onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
 		} else {
-			onSuccess(rpc, rpcResponse.Reply, reqArgs)
+			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
 		}
 	case <-ctx.Done():
-		onFailure(rpc, ctx.Err(), reqArgs)
+		onFailure(ctx, rpc, ctx.Err(), reqArgs)
 	}
 }
 
@@ -234,12 +234,12 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	return proto.Clone(agent.device).(*voltha.Device), nil
 }
 
 // getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
-func (agent *Agent) getDeviceWithoutLock() *voltha.Device {
+func (agent *Agent) getDeviceWithoutLock(ctx context.Context) *voltha.Device {
 	return proto.Clone(agent.device).(*voltha.Device)
 }
 
@@ -248,16 +248,16 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
 	logger.Debugw("enableDevice", log.Fields{"device-id": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 
 	// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
 	// pre-provisioned with the required adapter not registered.   At this stage, since we need to communicate
 	// with the adapter then we need to know the adapter that will handle this request
-	adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
+	adapterName, err := agent.adapterMgr.GetAdapterType(ctx, cloned.Type)
 	if err != nil {
 		return err
 	}
@@ -306,20 +306,20 @@
 	select {
 	case rpcResponse, ok := <-ch:
 		if !ok {
-			response.Error(status.Errorf(codes.Aborted, "channel-closed"))
+			response.Error(ctx, status.Errorf(codes.Aborted, "channel-closed"))
 		} else if rpcResponse.Err != nil {
-			response.Error(rpcResponse.Err)
+			response.Error(ctx, rpcResponse.Err)
 		} else {
-			response.Done()
+			response.Done(ctx)
 		}
 	case <-ctx.Done():
-		response.Error(ctx.Err())
+		response.Error(ctx, ctx.Err())
 	}
 }
 
 //deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice.  This function will
 //panic if the index is out of range.
-func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
+func deleteFlowWithoutPreservingOrder(ctx context.Context, flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
 	flows[index] = flows[len(flows)-1]
 	flows[len(flows)-1] = nil
 	return flows[:len(flows)-1]
@@ -327,16 +327,16 @@
 
 //deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice.  This function will
 //panic if the index is out of range.
-func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
+func deleteGroupWithoutPreservingOrder(ctx context.Context, groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
 	groups[index] = groups[len(groups)-1]
 	groups[len(groups)-1] = nil
 	return groups[:len(groups)-1]
 }
 
-func flowsToUpdateToDelete(newFlows, existingFlows []*ofp.OfpFlowStats) (updatedNewFlows, flowsToDelete, updatedAllFlows []*ofp.OfpFlowStats) {
+func flowsToUpdateToDelete(ctx context.Context, newFlows, existingFlows []*ofp.OfpFlowStats) (updatedNewFlows, flowsToDelete, updatedAllFlows []*ofp.OfpFlowStats) {
 	// Process flows
 	for _, flow := range existingFlows {
-		if idx := fu.FindFlows(newFlows, flow); idx == -1 {
+		if idx := fu.FindFlows(ctx, newFlows, flow); idx == -1 {
 			updatedAllFlows = append(updatedAllFlows, flow)
 		} else {
 			// We have a matching flow (i.e. the following field matches: "TableId", "Priority", "Flags", "Cookie",
@@ -344,7 +344,7 @@
 			// ignored.  Otherwise, the previous flow will be deleted and the new one added
 			if proto.Equal(newFlows[idx], flow) {
 				// Flow already exist, remove it from the new flows but keep it in the updated flows slice
-				newFlows = deleteFlowWithoutPreservingOrder(newFlows, idx)
+				newFlows = deleteFlowWithoutPreservingOrder(ctx, newFlows, idx)
 				updatedAllFlows = append(updatedAllFlows, flow)
 			} else {
 				// Minor change to flow, delete old and add new one
@@ -356,15 +356,15 @@
 	return newFlows, flowsToDelete, updatedAllFlows
 }
 
-func groupsToUpdateToDelete(newGroups, existingGroups []*ofp.OfpGroupEntry) (updatedNewGroups, groupsToDelete, updatedAllGroups []*ofp.OfpGroupEntry) {
+func groupsToUpdateToDelete(ctx context.Context, newGroups, existingGroups []*ofp.OfpGroupEntry) (updatedNewGroups, groupsToDelete, updatedAllGroups []*ofp.OfpGroupEntry) {
 	for _, group := range existingGroups {
-		if idx := fu.FindGroup(newGroups, group.Desc.GroupId); idx == -1 { // does not exist now
+		if idx := fu.FindGroup(ctx, newGroups, group.Desc.GroupId); idx == -1 { // does not exist now
 			updatedAllGroups = append(updatedAllGroups, group)
 		} else {
 			// Follow same logic as flows
 			if proto.Equal(newGroups[idx], group) {
 				// Group already exist, remove it from the new groups
-				newGroups = deleteGroupWithoutPreservingOrder(newGroups, idx)
+				newGroups = deleteGroupWithoutPreservingOrder(ctx, newGroups, idx)
 				updatedAllGroups = append(updatedAllGroups, group)
 			} else {
 				// Minor change to group, delete old and add new one
@@ -381,55 +381,55 @@
 
 	if (len(newFlows) | len(newGroups)) == 0 {
 		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
-		return coreutils.DoneResponse(), nil
+		return coreutils.DoneResponse(ctx), nil
 	}
 
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return coreutils.DoneResponse(), err
+		return coreutils.DoneResponse(ctx), err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceWithoutLock(ctx)
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+		return coreutils.DoneResponse(ctx), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
 	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
 	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
 
 	// Process flows
-	newFlows, flowsToDelete, updatedAllFlows := flowsToUpdateToDelete(newFlows, existingFlows.Items)
+	newFlows, flowsToDelete, updatedAllFlows := flowsToUpdateToDelete(ctx, newFlows, existingFlows.Items)
 
 	// Process groups
-	newGroups, groupsToDelete, updatedAllGroups := groupsToUpdateToDelete(newGroups, existingGroups.Items)
+	newGroups, groupsToDelete, updatedAllGroups := groupsToUpdateToDelete(ctx, newGroups, existingGroups.Items)
 
 	// Sanity check
 	if (len(updatedAllFlows) | len(flowsToDelete) | len(updatedAllGroups) | len(groupsToDelete)) == 0 {
 		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
-		return coreutils.DoneResponse(), nil
+		return coreutils.DoneResponse(ctx), nil
 	}
 
 	// store the changed data
 	device.Flows = &voltha.Flows{Items: updatedAllFlows}
 	device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
 	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-device-%s", agent.deviceID)
+		return coreutils.DoneResponse(ctx), status.Errorf(codes.Internal, "failure-updating-device-%s", agent.deviceID)
 	}
 
 	// Send update to adapters
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	response := coreutils.NewResponse()
+	response := coreutils.NewResponse(ctx)
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		if len(updatedAllGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedAllGroups) && len(updatedAllFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedAllFlows) {
 			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
 			cancel()
-			return coreutils.DoneResponse(), nil
+			return coreutils.DoneResponse(ctx), nil
 		}
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
 		if err != nil {
 			cancel()
-			return coreutils.DoneResponse(), err
+			return coreutils.DoneResponse(ctx), err
 		}
 		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
@@ -445,7 +445,7 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
-			return coreutils.DoneResponse(), err
+			return coreutils.DoneResponse(ctx), err
 		}
 		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	}
@@ -459,7 +459,7 @@
 	if err != nil {
 		return err
 	}
-	if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); errs != nil {
+	if errs := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, response); errs != nil {
 		logger.Warnw("no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
 		return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
 	}
@@ -471,18 +471,18 @@
 
 	if (len(flowsToDel) | len(groupsToDel)) == 0 {
 		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
-		return coreutils.DoneResponse(), nil
+		return coreutils.DoneResponse(ctx), nil
 	}
 
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return coreutils.DoneResponse(), err
+		return coreutils.DoneResponse(ctx), err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceWithoutLock(ctx)
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+		return coreutils.DoneResponse(ctx), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
 	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
@@ -493,14 +493,14 @@
 
 	// Process flows
 	for _, flow := range existingFlows.Items {
-		if idx := fu.FindFlows(flowsToDel, flow); idx == -1 {
+		if idx := fu.FindFlows(ctx, flowsToDel, flow); idx == -1 {
 			flowsToKeep = append(flowsToKeep, flow)
 		}
 	}
 
 	// Process groups
 	for _, group := range existingGroups.Items {
-		if fu.FindGroup(groupsToDel, group.Desc.GroupId) == -1 { // does not exist now
+		if fu.FindGroup(ctx, groupsToDel, group.Desc.GroupId) == -1 { // does not exist now
 			groupsToKeep = append(groupsToKeep, group)
 		}
 	}
@@ -517,29 +517,29 @@
 	// Sanity check
 	if (len(flowsToKeep) | len(flowsToDel) | len(groupsToKeep) | len(groupsToDel)) == 0 {
 		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows-to-del": flowsToDel, "groups-to-del": groupsToDel})
-		return coreutils.DoneResponse(), nil
+		return coreutils.DoneResponse(ctx), nil
 	}
 
 	// store the changed data
 	device.Flows = &voltha.Flows{Items: flowsToKeep}
 	device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
 	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+		return coreutils.DoneResponse(ctx), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
 	}
 
 	// Send update to adapters
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	response := coreutils.NewResponse()
+	response := coreutils.NewResponse(ctx)
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
 			logger.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
 			cancel()
-			return coreutils.DoneResponse(), nil
+			return coreutils.DoneResponse(ctx), nil
 		}
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata)
 		if err != nil {
 			cancel()
-			return coreutils.DoneResponse(), err
+			return coreutils.DoneResponse(ctx), err
 		}
 		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
@@ -555,7 +555,7 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
-			return coreutils.DoneResponse(), err
+			return coreutils.DoneResponse(ctx), err
 		}
 		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	}
@@ -569,7 +569,7 @@
 	if err != nil {
 		return err
 	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, response); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -586,7 +586,7 @@
 
 	// If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
 	for _, flow := range existingFlows.Items {
-		if fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort) {
+		if fu.GetInPort(ctx, flow) == uniPort || fu.GetOutPort(ctx, flow) == uniPort || fu.GetTunnelId(ctx, flow) == uint64(uniPort) {
 			flowsToDelete = append(flowsToDelete, flow)
 		}
 	}
@@ -599,7 +599,7 @@
 	if err != nil {
 		return err
 	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, response); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -610,21 +610,21 @@
 
 	if (len(updatedFlows) | len(updatedGroups)) == 0 {
 		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
-		return coreutils.DoneResponse(), nil
+		return coreutils.DoneResponse(ctx), nil
 	}
 
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return coreutils.DoneResponse(), err
+		return coreutils.DoneResponse(ctx), err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceWithoutLock(ctx)
 	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
+		return coreutils.DoneResponse(ctx), status.Errorf(codes.FailedPrecondition, "invalid device states")
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+		return coreutils.DoneResponse(ctx), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
 	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
@@ -632,7 +632,7 @@
 
 	if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
 		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
-		return coreutils.DoneResponse(), nil
+		return coreutils.DoneResponse(ctx), nil
 	}
 
 	logger.Debugw("updating-flows-and-groups",
@@ -646,17 +646,17 @@
 	device.Flows = &voltha.Flows{Items: updatedFlows}
 	device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
 	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+		return coreutils.DoneResponse(ctx), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
 	}
 
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	response := coreutils.NewResponse()
+	response := coreutils.NewResponse(ctx)
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil)
 		if err != nil {
 			cancel()
-			return coreutils.DoneResponse(), err
+			return coreutils.DoneResponse(ctx), err
 		}
 		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
@@ -667,24 +667,24 @@
 
 		// Process flows
 		for _, flow := range updatedFlows {
-			if idx := fu.FindFlows(existingFlows.Items, flow); idx == -1 {
+			if idx := fu.FindFlows(ctx, existingFlows.Items, flow); idx == -1 {
 				flowsToAdd = append(flowsToAdd, flow)
 			}
 		}
 		for _, flow := range existingFlows.Items {
-			if idx := fu.FindFlows(updatedFlows, flow); idx != -1 {
+			if idx := fu.FindFlows(ctx, updatedFlows, flow); idx != -1 {
 				flowsToDelete = append(flowsToDelete, flow)
 			}
 		}
 
 		// Process groups
 		for _, g := range updatedGroups {
-			if fu.FindGroup(existingGroups.Items, g.Desc.GroupId) == -1 { // does not exist now
+			if fu.FindGroup(ctx, existingGroups.Items, g.Desc.GroupId) == -1 { // does not exist now
 				groupsToAdd = append(groupsToAdd, g)
 			}
 		}
 		for _, group := range existingGroups.Items {
-			if fu.FindGroup(updatedGroups, group.Desc.GroupId) != -1 { // does not exist now
+			if fu.FindGroup(ctx, updatedGroups, group.Desc.GroupId) != -1 { // does not exist now
 				groupsToDelete = append(groupsToDelete, group)
 			}
 		}
@@ -702,7 +702,7 @@
 		if (len(flowsToAdd) | len(flowsToDelete) | len(groupsToAdd) | len(groupsToDelete) | len(updatedGroups)) == 0 {
 			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
 			cancel()
-			return coreutils.DoneResponse(), nil
+			return coreutils.DoneResponse(ctx), nil
 		}
 
 		flowChanges := &ofp.FlowChanges{
@@ -717,7 +717,7 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
-			return coreutils.DoneResponse(), err
+			return coreutils.DoneResponse(ctx), err
 		}
 		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	}
@@ -732,7 +732,7 @@
 	if err != nil {
 		return err
 	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, response); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -744,9 +744,9 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceWithoutLock(ctx)
 	// purge all flows on the device by setting it to nil
 	device.Flows = &ofp.Flows{Items: nil}
 	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
@@ -761,10 +761,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("disableDevice", log.Fields{"device-id": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 
 	if cloned.AdminState == voltha.AdminState_DISABLED {
 		logger.Debugw("device-already-disabled", log.Fields{"id": agent.deviceID})
@@ -795,10 +795,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("rebootDevice", log.Fields{"device-id": agent.deviceID})
 
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceWithoutLock(ctx)
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
 	if err != nil {
@@ -814,9 +814,9 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 
 	previousState := cloned.AdminState
 
@@ -844,11 +844,11 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
 	logger.Debugw("setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 	cloned.ParentId = parentID
 	// Store the device
 	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
@@ -862,10 +862,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
 	// Store the device
 	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
@@ -886,10 +886,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
 	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
 	return agent.updateDeviceInStoreWithoutLock(updateCtx, cloned, false, "")
@@ -899,21 +899,21 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("listPmConfigs", log.Fields{"device-id": agent.deviceID})
 
-	return agent.getDeviceWithoutLock().PmConfigs, nil
+	return agent.getDeviceWithoutLock(ctx).PmConfigs, nil
 }
 
 func (agent *Agent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
 	logger.Debugw("downloadImage", log.Fields{"device-id": agent.deviceID})
 
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceWithoutLock(ctx)
 
 	if device.AdminState != voltha.AdminState_ENABLED {
 		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
@@ -954,7 +954,7 @@
 }
 
 // isImageRegistered is a helper method to figure out if an image is already registered
-func isImageRegistered(img *voltha.ImageDownload, device *voltha.Device) bool {
+func isImageRegistered(ctx context.Context, img *voltha.ImageDownload, device *voltha.Device) bool {
 	for _, image := range device.ImageDownloads {
 		if image.Id == img.Id && image.Name == img.Name {
 			return true
@@ -967,14 +967,14 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
 	logger.Debugw("cancelImageDownload", log.Fields{"device-id": agent.deviceID})
 
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceWithoutLock(ctx)
 
 	// Verify whether the Image is in the list of image being downloaded
-	if !isImageRegistered(img, device) {
+	if !isImageRegistered(ctx, img, device) {
 		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
 	}
 
@@ -1005,12 +1005,12 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("activateImage", log.Fields{"device-id": agent.deviceID})
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 
 	// Verify whether the Image is in the list of image being downloaded
-	if !isImageRegistered(img, cloned) {
+	if !isImageRegistered(ctx, img, cloned) {
 		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
 	}
 
@@ -1045,13 +1045,13 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("revertImage", log.Fields{"device-id": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 
 	// Verify whether the Image is in the list of image being downloaded
-	if !isImageRegistered(img, cloned) {
+	if !isImageRegistered(ctx, img, cloned) {
 		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
 	}
 
@@ -1086,9 +1086,9 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceWithoutLock(ctx)
 	ch, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img)
-	agent.requestQueue.RequestComplete()
+	agent.requestQueue.RequestComplete(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -1112,10 +1112,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 
 	// Update the image as well as remove it if the download was cancelled
 	clonedImages := make([]*voltha.ImageDownload, len(cloned.ImageDownloads))
@@ -1140,10 +1140,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("getImageDownload", log.Fields{"device-id": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 	for _, image := range cloned.ImageDownloads {
 		if image.Id == img.Id && image.Name == img.Name {
 			return image, nil
@@ -1156,10 +1156,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("listImageDownloads", log.Fields{"device-id": agent.deviceID})
 
-	return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock().ImageDownloads}, nil
+	return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock(ctx).ImageDownloads}, nil
 }
 
 // getPorts retrieves the ports information of the device based on the port type.
@@ -1232,7 +1232,7 @@
 	return portCap, nil
 }
 
-func (agent *Agent) onPacketFailure(rpc string, response interface{}, args ...interface{}) {
+func (agent *Agent) onPacketFailure(ctx context.Context, rpc string, response interface{}, args ...interface{}) {
 	// packet data is encoded in the args param as the first parameter
 	var packet []byte
 	if len(args) >= 1 {
@@ -1270,8 +1270,8 @@
 
 // updatePartialDeviceData updates a subset of a device that an Adapter can update.
 // TODO:  May need a specific proto to handle only a subset of a device that can be changed by an adapter
-func (agent *Agent) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
-	cloned := agent.getDeviceWithoutLock()
+func (agent *Agent) mergeDeviceInfoFromAdapter(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
+	cloned := agent.getDeviceWithoutLock(ctx)
 	cloned.Root = device.Root
 	cloned.Vendor = device.Vendor
 	cloned.Model = device.Model
@@ -1286,10 +1286,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
 
-	updatedDevice, err := agent.mergeDeviceInfoFromAdapter(device)
+	updatedDevice, err := agent.mergeDeviceInfoFromAdapter(ctx, device)
 	if err != nil {
 		return status.Errorf(codes.Internal, "%s", err.Error())
 	}
@@ -1308,9 +1308,9 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 
 	newConnStatus, newOperStatus := cloned.ConnectStatus, cloned.OperStatus
 	// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
@@ -1332,8 +1332,8 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
-	cloned := agent.getDeviceWithoutLock()
+	defer agent.requestQueue.RequestComplete(ctx)
+	cloned := agent.getDeviceWithoutLock(ctx)
 	for _, port := range cloned.Ports {
 		port.OperStatus = operStatus
 	}
@@ -1345,10 +1345,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	// Work only on latest data
 	// TODO: Get list of ports from device directly instead of the entire device
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 
 	// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
 	if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
@@ -1369,9 +1369,9 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 
 	if cloned.AdminState != voltha.AdminState_DISABLED && cloned.AdminState != voltha.AdminState_DELETED {
 		err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", cloned.AdminState))
@@ -1393,10 +1393,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("addPort", log.Fields{"deviceId": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 	updatePort := false
 	if cloned.Ports == nil {
 		//	First port
@@ -1432,10 +1432,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 
 	// Get the peer port on the device based on the peerPort no
 	found := false
@@ -1471,12 +1471,12 @@
 		logger.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
 		return
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	if value == nil {
 		return
 	}
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 	updated := false
 	s := reflect.ValueOf(cloned).Elem()
 	if s.Kind() == reflect.Struct {
@@ -1508,10 +1508,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("simulateAlarm", log.Fields{"id": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	ch, err := agent.adapterProxy.SimulateAlarm(subCtx, cloned, simulateReq)
@@ -1530,7 +1530,7 @@
 	connectStatus voltha.ConnectStatus_Types,
 	operStatus voltha.OperStatus_Types,
 ) error {
-	previousState := getDeviceStates(device)
+	previousState := getDeviceStates(ctx, device)
 	device.AdminState, device.ConnectStatus, device.OperStatus = adminState, connectStatus, operStatus
 
 	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
@@ -1567,9 +1567,9 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.getDeviceWithoutLock(ctx)
 	cloned.Reason = reason
 	logger.Debugw("updateDeviceReason", log.Fields{"deviceId": cloned.Id, "reason": cloned.Reason})
 	// Store the device
@@ -1580,11 +1580,11 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
 	var cp *voltha.Port
 	// Get the most up to date the device info
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceWithoutLock(ctx)
 	for _, port := range device.Ports {
 		if port.PortNo == Port.PortNo {
 			port.AdminState = voltha.AdminState_DISABLED
@@ -1621,12 +1621,12 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	logger.Debugw("enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
 
 	var cp *voltha.Port
 	// Get the most up to date the device info
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceWithoutLock(ctx)
 	for _, port := range device.Ports {
 		if port.PortNo == Port.PortNo {
 			port.AdminState = voltha.AdminState_ENABLED
@@ -1662,12 +1662,12 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
 	logger.Debugw("childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-ud": agent.deviceID})
 
 	//Remove the associated peer ports on the parent device
-	parentDevice := agent.getDeviceWithoutLock()
+	parentDevice := agent.getDeviceWithoutLock(ctx)
 	var updatedPeers []*voltha.Port_PeerPort
 	for _, port := range parentDevice.Ports {
 		updatedPeers = make([]*voltha.Port_PeerPort, 0)
@@ -1698,12 +1698,12 @@
 		return nil, err
 	}
 
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceWithoutLock(ctx)
 
 	if device.Adapter == "" {
-		adapterName, err := agent.adapterMgr.GetAdapterType(device.Type)
+		adapterName, err := agent.adapterMgr.GetAdapterType(ctx, device.Type)
 		if err != nil {
-			agent.requestQueue.RequestComplete()
+			agent.requestQueue.RequestComplete(ctx)
 			return nil, err
 		}
 		device.Adapter = adapterName
@@ -1711,7 +1711,7 @@
 
 	// Send request to the adapter
 	ch, err := agent.adapterProxy.StartOmciTest(ctx, device, omcitestrequest)
-	agent.requestQueue.RequestComplete()
+	agent.requestQueue.RequestComplete(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -1742,7 +1742,7 @@
 
 	//send request to adapter
 	ch, err := agent.adapterProxy.GetExtValue(ctx, pdevice, cdevice, valueparam.Id, valueparam.Value)
-	agent.requestQueue.RequestComplete()
+	agent.requestQueue.RequestComplete(ctx)
 	if err != nil {
 		return nil, err
 	}
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 2abfdeb..78353a7 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -69,7 +69,7 @@
 		logger.Fatal(err)
 	}
 	// Create the kafka client
-	test.kClient = mock_kafka.NewKafkaClient()
+	test.kClient = mock_kafka.NewKafkaClient(context.Background())
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-da-test"
@@ -114,6 +114,7 @@
 	cfg.DefaultRequestTimeout = dat.defaultTimeout
 	cfg.KVStorePort = dat.kvClientPort
 	cfg.InCompetingMode = inCompeteMode
+	ctx := context.Background()
 	grpcPort, err := freeport.GetFreePort()
 	if err != nil {
 		logger.Fatal("Cannot get a freeport for grpc")
@@ -130,18 +131,19 @@
 		LivenessChannelInterval: cfg.LiveProbeInterval / 2,
 		PathPrefix:              cfg.KVStoreDataPrefix}
 	dat.kmp = kafka.NewInterContainerProxy(
-		kafka.InterContainerHost(cfg.KafkaAdapterHost),
-		kafka.InterContainerPort(cfg.KafkaAdapterPort),
-		kafka.MsgClient(dat.kClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
-		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+		ctx,
+		kafka.InterContainerHost(ctx, cfg.KafkaAdapterHost),
+		kafka.InterContainerPort(ctx, cfg.KafkaAdapterPort),
+		kafka.MsgClient(ctx, dat.kClient),
+		kafka.DefaultTopic(ctx, &kafka.Topic{Name: cfg.CoreTopic}),
+		kafka.DeviceDiscoveryTopic(ctx, &kafka.Topic{Name: cfg.AffinityRouterTopic}))
 
-	endpointMgr := kafka.NewEndpointManager(backend)
-	proxy := model.NewProxy(backend, "/")
-	adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
+	endpointMgr := kafka.NewEndpointManager(ctx, backend)
+	proxy := model.NewProxy(ctx, backend, "/")
+	adapterMgr := adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
 
-	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
-	if err = dat.kmp.Start(); err != nil {
+	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(ctx, proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+	if err = dat.kmp.Start(ctx); err != nil {
 		logger.Fatal("Cannot start InterContainerProxy")
 	}
 	adapterMgr.Start(context.Background())
@@ -149,10 +151,10 @@
 
 func (dat *DATest) stopAll() {
 	if dat.kClient != nil {
-		dat.kClient.Stop()
+		dat.kClient.Stop(context.Background())
 	}
 	if dat.kmp != nil {
-		dat.kmp.Stop()
+		dat.kmp.Stop(context.Background())
 	}
 	if dat.etcdServer != nil {
 		stopEmbeddedEtcdServer(dat.etcdServer)
@@ -169,7 +171,7 @@
 	if err != nil {
 		return nil, 0, err
 	}
-	etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+	etcdServer := mock_etcd.StartEtcdServer(context.Background(), mock_etcd.MKConfig(context.Background(), configName, kvClientPort, peerPort, storageDir, logLevel))
 	if etcdServer == nil {
 		return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
 	}
@@ -178,13 +180,13 @@
 
 func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
 	if server != nil {
-		server.Stop()
+		server.Stop(context.Background())
 	}
 }
 
 func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
 	addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
-	client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
+	client, err := kvstore.NewEtcdClient(context.Background(), addr, cf.KVStoreTimeout, log.FatalLevel)
 	if err != nil {
 		panic("no kv client")
 	}
@@ -194,11 +196,11 @@
 func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
 	deviceMgr := dat.deviceMgr
 	clonedDevice := proto.Clone(dat.device).(*voltha.Device)
-	deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.clusterDataProxy, deviceMgr.defaultTimeout)
+	deviceAgent := newAgent(context.Background(), deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.clusterDataProxy, deviceMgr.defaultTimeout)
 	d, err := deviceAgent.start(context.TODO(), clonedDevice)
 	assert.Nil(t, err)
 	assert.NotNil(t, d)
-	deviceMgr.addDeviceAgentToMap(deviceAgent)
+	deviceMgr.addDeviceAgentToMap(context.Background(), deviceAgent)
 	return deviceAgent
 }
 
@@ -336,7 +338,7 @@
 	expectedNewFlows := []*ofp.OfpFlowStats{}
 	expectedFlowsToDelete := []*ofp.OfpFlowStats{}
 	expectedUpdatedAllFlows := []*ofp.OfpFlowStats{}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+	uNF, fD, uAF := flowsToUpdateToDelete(context.Background(), newFlows, existingFlows)
 	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
 	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
 	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
@@ -360,7 +362,7 @@
 		{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
 		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
 	}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+	uNF, fD, uAF := flowsToUpdateToDelete(context.Background(), newFlows, existingFlows)
 	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
 	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
 	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
@@ -389,7 +391,7 @@
 		{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
 		{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
 	}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+	uNF, fD, uAF := flowsToUpdateToDelete(context.Background(), newFlows, existingFlows)
 	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
 	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
 	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
@@ -429,7 +431,7 @@
 		{Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
 		{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
 	}
-	uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+	uNF, fD, uAF := flowsToUpdateToDelete(context.Background(), newFlows, existingFlows)
 	assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
 	assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
 	assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
@@ -441,7 +443,7 @@
 	expectedNewGroups := []*ofp.OfpGroupEntry{}
 	expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
 	expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+	uNG, gD, uAG := groupsToUpdateToDelete(context.Background(), newGroups, existingGroups)
 	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
 	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
 	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
@@ -462,7 +464,7 @@
 		{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
 	}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+	uNG, gD, uAG := groupsToUpdateToDelete(context.Background(), newGroups, existingGroups)
 	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
 	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
 	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
@@ -488,7 +490,7 @@
 		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
 	}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+	uNG, gD, uAG := groupsToUpdateToDelete(context.Background(), newGroups, existingGroups)
 	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
 	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
 	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
@@ -517,7 +519,7 @@
 		{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
 		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
 	}
-	uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+	uNG, gD, uAG := groupsToUpdateToDelete(context.Background(), newGroups, existingGroups)
 	assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
 	assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
 	assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index c205564..829fc19 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -17,6 +17,7 @@
 package event
 
 import (
+	"context"
 	"encoding/hex"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -32,7 +33,7 @@
 	changeEventQueueDone chan bool
 }
 
-func NewManager() *Manager {
+func NewManager(ctx context.Context) *Manager {
 	return &Manager{
 		packetInQueue:        make(chan openflow_13.PacketIn, 100),
 		packetInQueueDone:    make(chan bool, 1),
@@ -41,7 +42,7 @@
 	}
 }
 
-func (q *Manager) SendPacketIn(deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
+func (q *Manager) SendPacketIn(ctx context.Context, deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
 	// TODO: Augment the OF PacketIn to include the transactionId
 	packetIn := openflow_13.PacketIn{Id: deviceID, PacketIn: packet}
 	logger.Debugw("SendPacketIn", log.Fields{"packetIn": packetIn})
@@ -58,7 +59,7 @@
 
 var streamingTracker = &streamTracker{calls: make(map[string]*callTracker)}
 
-func (q *Manager) getStreamingTracker(method string, done chan<- bool) *callTracker {
+func (q *Manager) getStreamingTracker(ctx context.Context, method string, done chan<- bool) *callTracker {
 	streamingTracker.Lock()
 	defer streamingTracker.Unlock()
 	if _, ok := streamingTracker.calls[method]; ok {
@@ -72,7 +73,7 @@
 	return streamingTracker.calls[method]
 }
 
-func (q *Manager) flushFailedPackets(tracker *callTracker) error {
+func (q *Manager) flushFailedPackets(ctx context.Context, tracker *callTracker) error {
 	if tracker.failedPacket != nil {
 		switch tracker.failedPacket.(type) {
 		case openflow_13.PacketIn:
@@ -88,10 +89,11 @@
 
 // ReceivePacketsIn receives packets from adapter
 func (q *Manager) ReceivePacketsIn(_ *empty.Empty, packetsIn voltha.VolthaService_ReceivePacketsInServer) error {
-	var streamingTracker = q.getStreamingTracker("ReceivePacketsIn", q.packetInQueueDone)
+	ctx := context.Background()
+	var streamingTracker = q.getStreamingTracker(ctx, "ReceivePacketsIn", q.packetInQueueDone)
 	logger.Debugw("ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
 
-	err := q.flushFailedPackets(streamingTracker)
+	err := q.flushFailedPackets(ctx, streamingTracker)
 	if err != nil {
 		logger.Errorw("unable-to-flush-failed-packets", log.Fields{"error": err})
 	}
@@ -123,7 +125,7 @@
 	return nil
 }
 
-func (q *Manager) SendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus) {
+func (q *Manager) SendChangeEvent(ctx context.Context, deviceID string, portStatus *openflow_13.OfpPortStatus) {
 	// TODO: validate the type of portStatus parameter
 	//if _, ok := portStatus.(*openflow_13.OfpPortStatus); ok {
 	//}
@@ -134,10 +136,11 @@
 
 // ReceiveChangeEvents receives change in events
 func (q *Manager) ReceiveChangeEvents(_ *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
-	var streamingTracker = q.getStreamingTracker("ReceiveChangeEvents", q.changeEventQueueDone)
+	ctx := context.Background()
+	var streamingTracker = q.getStreamingTracker(ctx, "ReceiveChangeEvents", q.changeEventQueueDone)
 	logger.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
 
-	err := q.flushFailedPackets(streamingTracker)
+	err := q.flushFailedPackets(ctx, streamingTracker)
 	if err != nil {
 		logger.Errorw("unable-to-flush-failed-packets", log.Fields{"error": err})
 	}
@@ -167,6 +170,6 @@
 	return nil
 }
 
-func (q *Manager) GetChangeEventsQueueForTest() <-chan openflow_13.ChangeEvent {
+func (q *Manager) GetChangeEventsQueueForTest(ctx context.Context) <-chan openflow_13.ChangeEvent {
 	return q.changeEventQueue
 }
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 15eb677..34bad6b 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -65,7 +65,7 @@
 	groupLock sync.RWMutex
 }
 
-func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
+func newLogicalDeviceAgent(ctx context.Context, id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
 	deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *LogicalAgent {
 	var agent LogicalAgent
 	agent.logicalDeviceID = id
@@ -74,10 +74,10 @@
 	agent.deviceMgr = deviceMgr
 	agent.clusterDataProxy = cdProxy
 	agent.ldeviceMgr = ldeviceMgr
-	agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
+	agent.flowDecomposer = fd.NewFlowDecomposer(ctx, agent.deviceMgr)
 	agent.logicalPortsNo = make(map[uint32]bool)
 	agent.defaultTimeout = timeout
-	agent.requestQueue = coreutils.NewRequestQueue()
+	agent.requestQueue = coreutils.NewRequestQueue(ctx)
 	agent.meters = make(map[uint32]*MeterChunk)
 	agent.flows = make(map[uint64]*FlowChunk)
 	agent.groups = make(map[uint32]*GroupChunk)
@@ -114,7 +114,7 @@
 
 		// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
 		var datapathID uint64
-		if datapathID, err = coreutils.CreateDataPathID(agent.serialNumber); err != nil {
+		if datapathID, err = coreutils.CreateDataPathID(ctx, agent.serialNumber); err != nil {
 			return err
 		}
 		ld.DatapathId = datapathID
@@ -159,7 +159,7 @@
 		agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
 
 		// Setup the local list of logical ports
-		agent.addLogicalPortsToMap(ld.Ports)
+		agent.addLogicalPortsToMap(ctx, ld.Ports)
 		// load the flows, meters and groups from KV to cache
 		agent.loadFlows(ctx)
 		agent.loadMeters(ctx)
@@ -190,7 +190,7 @@
 			returnErr = err
 			return
 		}
-		defer agent.requestQueue.RequestComplete()
+		defer agent.requestQueue.RequestComplete(ctx)
 
 		//Remove the logical device from the model
 		if err := agent.clusterDataProxy.Remove(ctx, "logical_devices/"+agent.logicalDeviceID); err != nil {
@@ -211,12 +211,12 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice), nil
 }
 
 // getLogicalDeviceWithoutLock returns a cloned logical device to a function that already holds the agent lock.
-func (agent *LogicalAgent) getLogicalDeviceWithoutLock() *voltha.LogicalDevice {
+func (agent *LogicalAgent) getLogicalDeviceWithoutLock(ctx context.Context) *voltha.LogicalDevice {
 	logger.Debug("getLogicalDeviceWithoutLock")
 	return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice)
 }
@@ -255,17 +255,17 @@
 	logger.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
 
 	responses := make([]coreutils.Response, 0)
-	for deviceID, value := range deviceRules.GetRules() {
-		response := coreutils.NewResponse()
+	for deviceID, value := range deviceRules.GetRules(ctx) {
+		response := coreutils.NewResponse(ctx)
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
 			ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 			defer cancel()
-			if err := agent.deviceMgr.addFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+			if err := agent.deviceMgr.addFlowsAndGroups(ctx, deviceId, value.ListFlows(ctx), value.ListGroups(ctx), flowMetadata); err != nil {
 				logger.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
-				response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
+				response.Error(ctx, status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
 			}
-			response.Done()
+			response.Done(ctx)
 		}(deviceID, value)
 	}
 	// Return responses (an array of channels) for the caller to wait for a response from the far end.
@@ -276,17 +276,17 @@
 	logger.Debugw("send-delete-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
 
 	responses := make([]coreutils.Response, 0)
-	for deviceID, value := range deviceRules.GetRules() {
-		response := coreutils.NewResponse()
+	for deviceID, value := range deviceRules.GetRules(ctx) {
+		response := coreutils.NewResponse(ctx)
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
 			ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 			defer cancel()
-			if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+			if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(ctx), value.ListGroups(ctx), flowMetadata); err != nil {
 				logger.Errorw("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
-				response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
+				response.Error(ctx, status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
 			}
-			response.Done()
+			response.Done(ctx)
 		}(deviceID, value)
 	}
 	return responses
@@ -296,17 +296,17 @@
 	logger.Debugw("send-update-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
 
 	responses := make([]coreutils.Response, 0)
-	for deviceID, value := range deviceRules.GetRules() {
-		response := coreutils.NewResponse()
+	for deviceID, value := range deviceRules.GetRules(ctx) {
+		response := coreutils.NewResponse(ctx)
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
 			ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 			defer cancel()
-			if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+			if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(ctx), value.ListGroups(ctx), flowMetadata); err != nil {
 				logger.Errorw("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
-				response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
+				response.Error(ctx, status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
 			}
-			response.Done()
+			response.Done(ctx)
 		}(deviceID, value)
 	}
 	return responses
@@ -316,13 +316,13 @@
 	logger.Debugw("deleting-flows-from-parent-device", log.Fields{"logical-device-id": agent.logicalDeviceID, "flows": flows})
 	responses := make([]coreutils.Response, 0)
 	for _, flow := range flows.Items {
-		response := coreutils.NewResponse()
+		response := coreutils.NewResponse(ctx)
 		responses = append(responses, response)
-		uniPort, err := agent.getUNILogicalPortNo(flow)
+		uniPort, err := agent.getUNILogicalPortNo(ctx, flow)
 		if err != nil {
 			logger.Error("no-uni-port-in-flow", log.Fields{"deviceID": agent.rootDeviceID, "flow": flow, "error": err})
-			response.Error(err)
-			response.Done()
+			response.Error(ctx, err)
+			response.Done(ctx)
 			continue
 		}
 		logger.Debugw("uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
@@ -331,9 +331,9 @@
 			defer cancel()
 			if err := agent.deviceMgr.deleteParentFlows(ctx, agent.rootDeviceID, uniPort, metadata); err != nil {
 				logger.Error("flow-delete-failed", log.Fields{"device-id": agent.rootDeviceID, "error": err})
-				response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s %v", agent.rootDeviceID, err))
+				response.Error(ctx, status.Errorf(codes.Internal, "flow-delete-failed: %s %v", agent.rootDeviceID, err))
 			}
-			response.Done()
+			response.Done(ctx)
 		}(uniPort, metadata)
 	}
 	return responses
@@ -344,7 +344,7 @@
 		"packet": hex.EncodeToString(packet.Data),
 		"inPort": packet.GetInPort(),
 	})
-	outPort := fu.GetPacketOutPort(packet)
+	outPort := fu.GetPacketOutPort(ctx, packet)
 	//frame := packet.GetData()
 	//TODO: Use a channel between the logical agent and the device agent
 	if err := agent.deviceMgr.packetOut(ctx, agent.rootDeviceID, outPort, packet); err != nil {
@@ -352,13 +352,13 @@
 	}
 }
 
-func (agent *LogicalAgent) packetIn(port uint32, transactionID string, packet []byte) {
+func (agent *LogicalAgent) packetIn(ctx context.Context, port uint32, transactionID string, packet []byte) {
 	logger.Debugw("packet-in", log.Fields{
 		"port":          port,
 		"packet":        hex.EncodeToString(packet),
 		"transactionId": transactionID,
 	})
-	packetIn := fu.MkPacketIn(port, packet)
-	agent.ldeviceMgr.SendPacketIn(agent.logicalDeviceID, transactionID, packetIn)
+	packetIn := fu.MkPacketIn(ctx, port, packet)
+	agent.ldeviceMgr.SendPacketIn(ctx, agent.logicalDeviceID, transactionID, packetIn)
 	logger.Debugw("sending-packet-in", log.Fields{"packet": hex.EncodeToString(packetIn.Data)})
 }
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 5d35251..03bbbdf 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -51,9 +51,9 @@
 	case ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT:
 		return agent.flowDeleteStrict(ctx, flow)
 	case ofp.OfpFlowModCommand_OFPFC_MODIFY:
-		return agent.flowModify(flow)
+		return agent.flowModify(ctx, flow)
 	case ofp.OfpFlowModCommand_OFPFC_MODIFY_STRICT:
-		return agent.flowModifyStrict(flow)
+		return agent.flowModifyStrict(ctx, flow)
 	}
 	return status.Errorf(codes.Internal,
 		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.GetCommand())
@@ -65,7 +65,7 @@
 	if mod == nil {
 		return nil
 	}
-	flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
+	flow, err := fu.FlowStatsEntryFromFlowModMessage(ctx, mod)
 	if err != nil {
 		logger.Errorw("flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
 		return err
@@ -126,7 +126,7 @@
 	updatedFlows := make([]*ofp.OfpFlowStats, 0)
 	checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
 	if checkOverlap {
-		if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
+		if overlapped := fu.FindOverlappingFlows(ctx, flows, mod); len(overlapped) != 0 {
 			//	TODO:  should this error be notified other than being logged?
 			logger.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
 		} else {
@@ -153,7 +153,7 @@
 		updatedFlows = append(updatedFlows, flow)
 		var flowMetadata voltha.FlowMetadata
 		lMeters, _ := agent.ListLogicalDeviceMeters(ctx)
-		if err := agent.GetMeterConfig(updatedFlows, lMeters.Items, &flowMetadata); err != nil {
+		if err := agent.GetMeterConfig(ctx, updatedFlows, lMeters.Items, &flowMetadata); err != nil {
 			logger.Error("Meter-referred-in-flow-not-present")
 			return changed, updated, err
 		}
@@ -163,7 +163,7 @@
 			return changed, updated, err
 		}
 
-		logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+		logger.Debugw("rules", log.Fields{"rules": deviceRules.String(ctx)})
 		//	Update store and cache
 		if updated {
 			if err := agent.updateLogicalDeviceFlow(ctx, flow, flowChunk); err != nil {
@@ -174,7 +174,7 @@
 		// Create the go routines to wait
 		go func() {
 			// Wait for completion
-			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
+			if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChannels...); res != nil {
 				logger.Infow("failed-to-add-flows-will-attempt-deletion", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
 				// Revert added flows
 				if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, &flowMetadata); err != nil {
@@ -223,7 +223,7 @@
 	// Wait for the responses
 	go func() {
 		// Since this action is taken following an add failure, we may also receive a failure for the revert
-		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+		if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
 			logger.Warnw("failure-reverting-added-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
 		}
 	}()
@@ -238,7 +238,7 @@
 		return nil
 	}
 
-	fs, err := fu.FlowStatsEntryFromFlowModMessage(mod)
+	fs, err := fu.FlowStatsEntryFromFlowModMessage(ctx, mod)
 	if err != nil {
 		return err
 	}
@@ -249,13 +249,13 @@
 	//Lock the map to search the matched flows
 	agent.flowLock.RLock()
 	for _, f := range agent.flows {
-		if fu.FlowMatch(f.flow, fs) {
+		if fu.FlowMatch(ctx, f.flow, fs) {
 			toDelete = append(toDelete, f.flow)
 			toDeleteChunks = append(toDeleteChunks, f)
 			continue
 		}
 		// Check wild card match
-		if fu.FlowMatchesMod(f.flow, mod) {
+		if fu.FlowMatchesMod(ctx, f.flow, mod) {
 			toDelete = append(toDelete, f.flow)
 			toDeleteChunks = append(toDeleteChunks, f)
 		}
@@ -280,7 +280,7 @@
 			}
 		}
 		var flowMetadata voltha.FlowMetadata
-		if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
+		if err := agent.GetMeterConfig(ctx, toDelete, meters, &flowMetadata); err != nil { // This should never happen
 			logger.Error("Meter-referred-in-flows-not-present")
 			return err
 		}
@@ -308,7 +308,7 @@
 		// Wait for the responses
 		go func() {
 			// Wait for completion
-			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+			if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
 				logger.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
 				// TODO: Revert the flow deletion
 			}
@@ -325,7 +325,7 @@
 		return nil
 	}
 
-	flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
+	flow, err := fu.FlowStatsEntryFromFlowModMessage(ctx, mod)
 	if err != nil {
 		return err
 	}
@@ -355,7 +355,7 @@
 
 	var flowMetadata voltha.FlowMetadata
 	flowsToDelete := []*ofp.OfpFlowStats{flowChunk.flow}
-	if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
+	if err := agent.GetMeterConfig(ctx, flowsToDelete, meters, &flowMetadata); err != nil {
 		logger.Error("meter-referred-in-flows-not-present")
 		return err
 	}
@@ -385,7 +385,7 @@
 
 	// Wait for completion
 	go func() {
-		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+		if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
 			logger.Warnw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
 			//TODO: Revert flow changes
 		}
@@ -395,11 +395,11 @@
 }
 
 //flowModify modifies a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowModify(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModify(ctx context.Context, mod *ofp.OfpFlowMod) error {
 	return errors.New("flowModify not implemented")
 }
 
 //flowModifyStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModifyStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
 	return errors.New("flowModifyStrict not implemented")
 }
diff --git a/rw_core/core/device/logical_agent_flow_loader.go b/rw_core/core/device/logical_agent_flow_loader.go
index 84d1a47..d5cc54e 100644
--- a/rw_core/core/device/logical_agent_flow_loader.go
+++ b/rw_core/core/device/logical_agent_flow_loader.go
@@ -96,7 +96,7 @@
 	agent.flowLock.Lock()
 	defer agent.flowLock.Unlock()
 	for flowID, flowChunk := range agent.flows {
-		if mID := fu.GetMeterIdFromFlow(flowChunk.flow); mID != 0 && mID == meterID {
+		if mID := fu.GetMeterIdFromFlow(ctx, flowChunk.flow); mID != 0 && mID == meterID {
 			logger.Debugw("Flow-to-be- deleted", log.Fields{"flow": flowChunk.flow})
 			path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
 			if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
@@ -117,7 +117,7 @@
 	agent.flowLock.Lock()
 	defer agent.flowLock.Unlock()
 	for flowID, flowChunk := range agent.flows {
-		if fu.FlowHasOutGroup(flowChunk.flow, groupID) {
+		if fu.FlowHasOutGroup(ctx, flowChunk.flow, groupID) {
 			path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
 			if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
 				return nil, fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index a0d6c4a..46855f4 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -64,7 +64,7 @@
 		return fmt.Errorf("Group %d already exists", groupMod.GroupId)
 	}
 
-	groupEntry := fu.GroupEntryFromGroupMod(groupMod)
+	groupEntry := fu.GroupEntryFromGroupMod(ctx, groupMod)
 	groupChunk := GroupChunk{
 		group: groupEntry,
 	}
@@ -83,20 +83,20 @@
 		agent.groupLock.Unlock()
 		return err
 	}
-	deviceRules := fu.NewDeviceRules()
-	deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
-	fg := fu.NewFlowsAndGroups()
-	fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
-	deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
+	deviceRules := fu.NewDeviceRules(ctx)
+	deviceRules.CreateEntryIfNotExist(ctx, agent.rootDeviceID)
+	fg := fu.NewFlowsAndGroups(ctx)
+	fg.AddGroup(ctx, fu.GroupEntryFromGroupMod(ctx, groupMod))
+	deviceRules.AddFlowsAndGroup(ctx, agent.rootDeviceID, fg)
 
-	logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
+	logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String(ctx)})
 
 	// Update the devices
 	respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
 
 	// Wait for completion
 	go func() {
-		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+		if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
 			logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
 			//TODO: Revert flow changes
 		}
@@ -169,14 +169,14 @@
 		if err != nil {
 			return err
 		}
-		logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+		logger.Debugw("rules", log.Fields{"rules": deviceRules.String(ctx)})
 
 		// Update the devices
 		respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, nil)
 
 		// Wait for completion
 		go func() {
-			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+			if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
 				logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
 				//TODO: Revert flow changes
 			}
@@ -202,14 +202,14 @@
 	groupChunk.lock.Lock()
 	defer groupChunk.lock.Unlock()
 	//replace existing group entry with new group definition
-	groupEntry := fu.GroupEntryFromGroupMod(groupMod)
-	deviceRules := fu.NewDeviceRules()
-	deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
-	fg := fu.NewFlowsAndGroups()
-	fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
-	deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
+	groupEntry := fu.GroupEntryFromGroupMod(ctx, groupMod)
+	deviceRules := fu.NewDeviceRules(ctx)
+	deviceRules.CreateEntryIfNotExist(ctx, agent.rootDeviceID)
+	fg := fu.NewFlowsAndGroups(ctx)
+	fg.AddGroup(ctx, fu.GroupEntryFromGroupMod(ctx, groupMod))
+	deviceRules.AddFlowsAndGroup(ctx, agent.rootDeviceID, fg)
 
-	logger.Debugw("rules", log.Fields{"rules-for-group-modify": deviceRules.String()})
+	logger.Debugw("rules", log.Fields{"rules-for-group-modify": deviceRules.String(ctx)})
 	//update KV
 	if err := agent.updateLogicalDeviceFlowGroup(ctx, groupEntry, groupChunk); err != nil {
 		logger.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
@@ -221,7 +221,7 @@
 
 	// Wait for completion
 	go func() {
-		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+		if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
 			logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
 			//TODO: Revert flow changes
 		}
diff --git a/rw_core/core/device/logical_agent_meter.go b/rw_core/core/device/logical_agent_meter.go
index c211f1e..497c35c 100644
--- a/rw_core/core/device/logical_agent_meter.go
+++ b/rw_core/core/device/logical_agent_meter.go
@@ -52,7 +52,7 @@
 		return nil
 	}
 
-	meterEntry := fu.MeterEntryFromMeterMod(meterMod)
+	meterEntry := fu.MeterEntryFromMeterMod(ctx, meterMod)
 	agent.meterLock.Lock()
 	//check if the meter already exists or not
 	_, ok := agent.meters[meterMod.MeterId]
@@ -116,7 +116,7 @@
 	if meterMod == nil {
 		return nil
 	}
-	newMeter := fu.MeterEntryFromMeterMod(meterMod)
+	newMeter := fu.MeterEntryFromMeterMod(ctx, meterMod)
 	agent.meterLock.RLock()
 	meterChunk, ok := agent.meters[newMeter.Config.MeterId]
 	agent.meterLock.RUnlock()
diff --git a/rw_core/core/device/logical_agent_meter_loader.go b/rw_core/core/device/logical_agent_meter_loader.go
index 4408bf1..b511b35 100644
--- a/rw_core/core/device/logical_agent_meter_loader.go
+++ b/rw_core/core/device/logical_agent_meter_loader.go
@@ -93,10 +93,10 @@
 }
 
 // GetMeterConfig returns meter config
-func (agent *LogicalAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
+func (agent *LogicalAgent) GetMeterConfig(ctx context.Context, flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
 	m := make(map[uint32]bool)
 	for _, flow := range flows {
-		if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 && !m[flowMeterID] {
+		if flowMeterID := fu.GetMeterIdFromFlow(ctx, flow); flowMeterID != 0 && !m[flowMeterID] {
 			foundMeter := false
 			// Meter is present in the flow , Get from logical device
 			for _, meter := range meters {
@@ -122,7 +122,7 @@
 
 func (agent *LogicalAgent) updateFlowCountOfMeterStats(ctx context.Context, modCommand *ofp.OfpFlowMod, flow *ofp.OfpFlowStats, revertUpdate bool) bool {
 	flowCommand := modCommand.GetCommand()
-	meterID := fu.GetMeterIdFromFlow(flow)
+	meterID := fu.GetMeterIdFromFlow(ctx, flow)
 	logger.Debugw("Meter-id-in-flow-mod", log.Fields{"meterId": meterID})
 	if meterID == 0 {
 		logger.Debugw("No-meter-present-in-the-flow", log.Fields{"flow": *flow})
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 7845ad5..7235791 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -53,12 +53,12 @@
 		if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
 			return err
 		}
-		agent.addLogicalPortToMap(port.PortNo, true)
+		agent.addLogicalPortToMap(ctx, port.PortNo, true)
 	} else if port.Type == voltha.Port_ETHERNET_UNI {
 		if _, err = agent.addUNILogicalPort(ctx, device, port); err != nil {
 			return err
 		}
-		agent.addLogicalPortToMap(port.PortNo, false)
+		agent.addLogicalPortToMap(ctx, port.PortNo, false)
 	} else {
 		// Update the device routes to ensure all routes on the logical device have been calculated
 		if err = agent.buildRoutes(ctx); err != nil {
@@ -88,18 +88,18 @@
 	}
 	responses := make([]coreutils.Response, 0)
 	for _, child := range children.Items {
-		response := coreutils.NewResponse()
+		response := coreutils.NewResponse(ctx)
 		responses = append(responses, response)
 		go func(child *voltha.Device) {
 			if err = agent.setupUNILogicalPorts(context.Background(), child); err != nil {
 				logger.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
-				response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
+				response.Error(ctx, status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
 			}
-			response.Done()
+			response.Done(ctx)
 		}(child)
 	}
 	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, responses...); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -123,7 +123,7 @@
 			if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
 				logger.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			}
-			agent.addLogicalPortToMap(port.PortNo, true)
+			agent.addLogicalPortToMap(ctx, port.PortNo, true)
 		}
 	}
 	return err
@@ -135,10 +135,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	// Get the latest logical device info
-	original := agent.getLogicalDeviceWithoutLock()
-	updatedPorts := clonePorts(original.Ports)
+	original := agent.getLogicalDeviceWithoutLock(ctx)
+	updatedPorts := clonePorts(ctx, original.Ports)
 	for _, port := range updatedPorts {
 		if port.DeviceId == deviceID && port.DevicePortNo == portNo {
 			if operStatus == voltha.OperStatus_ACTIVE {
@@ -165,10 +165,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	// Get the latest logical device info
-	original := agent.getLogicalDeviceWithoutLock()
-	updatedPorts := clonePorts(original.Ports)
+	original := agent.getLogicalDeviceWithoutLock(ctx)
+	updatedPorts := clonePorts(ctx, original.Ports)
 	for _, port := range updatedPorts {
 		if port.DeviceId == device.Id {
 			if state == voltha.OperStatus_ACTIVE {
@@ -201,7 +201,7 @@
 				logger.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			}
 			if added {
-				agent.addLogicalPortToMap(port.PortNo, false)
+				agent.addLogicalPortToMap(ctx, port.PortNo, false)
 			}
 		}
 	}
@@ -214,9 +214,9 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	// Get the latest logical device info
-	cloned := agent.getLogicalDeviceWithoutLock()
+	cloned := agent.getLogicalDeviceWithoutLock(ctx)
 
 	if err := agent.updateLogicalDevicePortsWithoutLock(ctx, cloned, []*voltha.LogicalPort{}); err != nil {
 		logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
@@ -230,9 +230,9 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
-	logicalDevice := agent.getLogicalDeviceWithoutLock()
+	logicalDevice := agent.getLogicalDeviceWithoutLock(ctx)
 
 	index := -1
 	for i, logicalPort := range logicalDevice.Ports {
@@ -242,7 +242,7 @@
 		}
 	}
 	if index >= 0 {
-		clonedPorts := clonePorts(logicalDevice.Ports)
+		clonedPorts := clonePorts(ctx, logicalDevice.Ports)
 		if index < len(clonedPorts)-1 {
 			copy(clonedPorts[index:], clonedPorts[index+1:])
 		}
@@ -255,7 +255,7 @@
 		}
 
 		// Remove the logical port from cache
-		agent.deleteLogicalPortsFromMap([]uint32{lPort.DevicePortNo})
+		agent.deleteLogicalPortsFromMap(ctx, []uint32{lPort.DevicePortNo})
 		// Reset the logical device routes
 		go func() {
 			if err := agent.buildRoutes(context.Background()); err != nil {
@@ -272,9 +272,9 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
-	logicalDevice := agent.getLogicalDeviceWithoutLock()
+	logicalDevice := agent.getLogicalDeviceWithoutLock(ctx)
 	lPortstoKeep := []*voltha.LogicalPort{}
 	lPortsNoToDelete := []uint32{}
 	for _, logicalPort := range logicalDevice.Ports {
@@ -290,7 +290,7 @@
 		return err
 	}
 	// Remove the port from the cached logical ports set
-	agent.deleteLogicalPortsFromMap(lPortsNoToDelete)
+	agent.deleteLogicalPortsFromMap(ctx, lPortsNoToDelete)
 
 	// Reset the logical device routes
 	go func() {
@@ -307,9 +307,9 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
-	logicalDevice := agent.getLogicalDeviceWithoutLock()
+	logicalDevice := agent.getLogicalDeviceWithoutLock(ctx)
 
 	index := -1
 	for i, logicalPort := range logicalDevice.Ports {
@@ -319,7 +319,7 @@
 		}
 	}
 	if index >= 0 {
-		clonedPorts := clonePorts(logicalDevice.Ports)
+		clonedPorts := clonePorts(ctx, logicalDevice.Ports)
 		clonedPorts[index].OfpPort.Config = clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
 		return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
 	}
@@ -331,10 +331,10 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
 	// Get the most up to date logical device
-	logicalDevice := agent.getLogicalDeviceWithoutLock()
+	logicalDevice := agent.getLogicalDeviceWithoutLock(ctx)
 	index := -1
 	for i, logicalPort := range logicalDevice.Ports {
 		if logicalPort.Id == lPortID {
@@ -343,7 +343,7 @@
 		}
 	}
 	if index >= 0 {
-		clonedPorts := clonePorts(logicalDevice.Ports)
+		clonedPorts := clonePorts(ctx, logicalDevice.Ports)
 		clonedPorts[index].OfpPort.Config = (clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
 		return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
 	}
@@ -360,12 +360,12 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return false, err
 	}
-	if agent.portExist(device, port) {
+	if agent.portExist(ctx, device, port) {
 		logger.Debugw("port-already-exist", log.Fields{"port": port})
-		agent.requestQueue.RequestComplete()
+		agent.requestQueue.RequestComplete(ctx)
 		return false, nil
 	}
-	agent.requestQueue.RequestComplete()
+	agent.requestQueue.RequestComplete(ctx)
 
 	var portCap *ic.PortCapability
 	var err error
@@ -379,9 +379,9 @@
 		return false, err
 	}
 
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	// Double check again if this port has been already added since the getPortCapability could have taken a long time
-	if agent.portExist(device, port) {
+	if agent.portExist(ctx, device, port) {
 		logger.Debugw("port-already-exist", log.Fields{"port": port})
 		return false, nil
 	}
@@ -394,9 +394,9 @@
 	lp.OfpPort.Name = lp.Id
 	lp.DevicePortNo = port.PortNo
 
-	ld := agent.getLogicalDeviceWithoutLock()
+	ld := agent.getLogicalDeviceWithoutLock(ctx)
 
-	clonedPorts := clonePorts(ld.Ports)
+	clonedPorts := clonePorts(ctx, ld.Ports)
 	if clonedPorts == nil {
 		clonedPorts = make([]*voltha.LogicalPort, 0)
 	}
@@ -418,8 +418,8 @@
 	return true, nil
 }
 
-func (agent *LogicalAgent) portExist(device *voltha.Device, port *voltha.Port) bool {
-	ldevice := agent.getLogicalDeviceWithoutLock()
+func (agent *LogicalAgent) portExist(ctx context.Context, device *voltha.Device, port *voltha.Port) bool {
+	ldevice := agent.getLogicalDeviceWithoutLock(ctx)
 	for _, lPort := range ldevice.Ports {
 		if lPort.DeviceId == device.Id && lPort.DevicePortNo == port.PortNo {
 			return true
@@ -442,12 +442,12 @@
 		return false, err
 	}
 
-	if agent.portExist(childDevice, port) {
+	if agent.portExist(ctx, childDevice, port) {
 		logger.Debugw("port-already-exist", log.Fields{"port": port})
-		agent.requestQueue.RequestComplete()
+		agent.requestQueue.RequestComplete(ctx)
 		return false, nil
 	}
-	agent.requestQueue.RequestComplete()
+	agent.requestQueue.RequestComplete(ctx)
 	var portCap *ic.PortCapability
 	var err error
 	// First get the port capability
@@ -458,14 +458,14 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return false, err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 	// Double check again if this port has been already added since the getPortCapability could have taken a long time
-	if agent.portExist(childDevice, port) {
+	if agent.portExist(ctx, childDevice, port) {
 		logger.Debugw("port-already-exist", log.Fields{"port": port})
 		return false, nil
 	}
 	// Get stored logical device
-	ldevice := agent.getLogicalDeviceWithoutLock()
+	ldevice := agent.getLogicalDeviceWithoutLock(ctx)
 
 	logger.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
 	portCap.Port.RootPort = false
@@ -473,7 +473,7 @@
 	portCap.Port.OfpPort.PortNo = port.PortNo
 	portCap.Port.DeviceId = childDevice.Id
 	portCap.Port.DevicePortNo = port.PortNo
-	clonedPorts := clonePorts(ldevice.Ports)
+	clonedPorts := clonePorts(ctx, ldevice.Ports)
 	if clonedPorts == nil {
 		clonedPorts = make([]*voltha.LogicalPort, 0)
 	}
@@ -493,7 +493,7 @@
 	return true, nil
 }
 
-func clonePorts(ports []*voltha.LogicalPort) []*voltha.LogicalPort {
+func clonePorts(ctx context.Context, ports []*voltha.LogicalPort) []*voltha.LogicalPort {
 	return proto.Clone(&voltha.LogicalPorts{Items: ports}).(*voltha.LogicalPorts).Items
 }
 
@@ -504,12 +504,12 @@
 	if err := agent.updateLogicalDeviceWithoutLock(ctx, device); err != nil {
 		return err
 	}
-	agent.portUpdated(oldPorts, newPorts)
+	agent.portUpdated(ctx, oldPorts, newPorts)
 	return nil
 }
 
 // diff go over two lists of logical ports and return what's new, what's changed and what's removed.
-func diff(oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts map[string]*voltha.LogicalPort) {
+func diff(ctx context.Context, oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts map[string]*voltha.LogicalPort) {
 	newPorts = make(map[string]*voltha.LogicalPort, len(newList))
 	changedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
 	deletedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
@@ -533,21 +533,21 @@
 }
 
 // portUpdated is invoked when a port is updated on the logical device
-func (agent *LogicalAgent) portUpdated(prevPorts, currPorts []*voltha.LogicalPort) interface{} {
+func (agent *LogicalAgent) portUpdated(ctx context.Context, prevPorts, currPorts []*voltha.LogicalPort) interface{} {
 	// Get the difference between the two list
-	newPorts, changedPorts, deletedPorts := diff(prevPorts, currPorts)
+	newPorts, changedPorts, deletedPorts := diff(ctx, prevPorts, currPorts)
 
 	// Send the port change events to the OF controller
 	for _, newP := range newPorts {
-		go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
+		go agent.ldeviceMgr.SendChangeEvent(ctx, agent.logicalDeviceID,
 			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
 	}
 	for _, change := range changedPorts {
-		go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
+		go agent.ldeviceMgr.SendChangeEvent(ctx, agent.logicalDeviceID,
 			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_MODIFY, Desc: change.OfpPort})
 	}
 	for _, del := range deletedPorts {
-		go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
+		go agent.ldeviceMgr.SendChangeEvent(ctx, agent.logicalDeviceID,
 			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: del.OfpPort})
 	}
 
@@ -557,13 +557,13 @@
 //GetWildcardInputPorts filters out the logical port number from the set of logical ports on the device and
 //returns their port numbers.  This function is invoked only during flow decomposition where the lock on the logical
 //device is already held.  Therefore it is safe to retrieve the logical device without lock.
-func (agent *LogicalAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
+func (agent *LogicalAgent) GetWildcardInputPorts(ctx context.Context, excludePort ...uint32) []uint32 {
 	lPorts := make([]uint32, 0)
 	var exclPort uint32
 	if len(excludePort) == 1 {
 		exclPort = excludePort[0]
 	}
-	lDevice := agent.getLogicalDeviceWithoutLock()
+	lDevice := agent.getLogicalDeviceWithoutLock(ctx)
 	for _, port := range lDevice.Ports {
 		if port.OfpPort.PortNo != exclPort {
 			lPorts = append(lPorts, port.OfpPort.PortNo)
@@ -574,7 +574,7 @@
 
 // helpers for agent.logicalPortsNo
 
-func (agent *LogicalAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
+func (agent *LogicalAgent) addLogicalPortToMap(ctx context.Context, portNo uint32, nniPort bool) {
 	agent.lockLogicalPortsNo.Lock()
 	defer agent.lockLogicalPortsNo.Unlock()
 	if exist := agent.logicalPortsNo[portNo]; !exist {
@@ -582,7 +582,7 @@
 	}
 }
 
-func (agent *LogicalAgent) addLogicalPortsToMap(lps []*voltha.LogicalPort) {
+func (agent *LogicalAgent) addLogicalPortsToMap(ctx context.Context, lps []*voltha.LogicalPort) {
 	agent.lockLogicalPortsNo.Lock()
 	defer agent.lockLogicalPortsNo.Unlock()
 	for _, lp := range lps {
@@ -592,7 +592,7 @@
 	}
 }
 
-func (agent *LogicalAgent) deleteLogicalPortsFromMap(portsNo []uint32) {
+func (agent *LogicalAgent) deleteLogicalPortsFromMap(ctx context.Context, portsNo []uint32) {
 	agent.lockLogicalPortsNo.Lock()
 	defer agent.lockLogicalPortsNo.Unlock()
 	for _, pNo := range portsNo {
@@ -600,7 +600,7 @@
 	}
 }
 
-func (agent *LogicalAgent) isNNIPort(portNo uint32) bool {
+func (agent *LogicalAgent) isNNIPort(ctx context.Context, portNo uint32) bool {
 	agent.lockLogicalPortsNo.RLock()
 	defer agent.lockLogicalPortsNo.RUnlock()
 	if exist := agent.logicalPortsNo[portNo]; exist {
@@ -609,7 +609,7 @@
 	return false
 }
 
-func (agent *LogicalAgent) getFirstNNIPort() (uint32, error) {
+func (agent *LogicalAgent) getFirstNNIPort(ctx context.Context) (uint32, error) {
 	agent.lockLogicalPortsNo.RLock()
 	defer agent.lockLogicalPortsNo.RUnlock()
 	for portNo, nni := range agent.logicalPortsNo {
@@ -621,7 +621,7 @@
 }
 
 //GetNNIPorts returns NNI ports.
-func (agent *LogicalAgent) GetNNIPorts() []uint32 {
+func (agent *LogicalAgent) GetNNIPorts(ctx context.Context) []uint32 {
 	agent.lockLogicalPortsNo.RLock()
 	defer agent.lockLogicalPortsNo.RUnlock()
 	nniPorts := make([]uint32, 0)
@@ -634,13 +634,13 @@
 }
 
 // getUNILogicalPortNo returns the UNI logical port number specified in the flow
-func (agent *LogicalAgent) getUNILogicalPortNo(flow *ofp.OfpFlowStats) (uint32, error) {
+func (agent *LogicalAgent) getUNILogicalPortNo(ctx context.Context, flow *ofp.OfpFlowStats) (uint32, error) {
 	var uniPort uint32
-	inPortNo := fu.GetInPort(flow)
-	outPortNo := fu.GetOutPort(flow)
-	if agent.isNNIPort(inPortNo) {
+	inPortNo := fu.GetInPort(ctx, flow)
+	outPortNo := fu.GetOutPort(ctx, flow)
+	if agent.isNNIPort(ctx, inPortNo) {
 		uniPort = outPortNo
-	} else if agent.isNNIPort(outPortNo) {
+	} else if agent.isNNIPort(ctx, outPortNo) {
 		uniPort = inPortNo
 	}
 	if uniPort != 0 {
diff --git a/rw_core/core/device/logical_agent_route.go b/rw_core/core/device/logical_agent_route.go
index 6736160..db1c65c 100644
--- a/rw_core/core/device/logical_agent_route.go
+++ b/rw_core/core/device/logical_agent_route.go
@@ -39,7 +39,7 @@
 	//	Consider different possibilities
 	if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
 		logger.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
-		if agent.isNNIPort(ingressPortNo) {
+		if agent.isNNIPort(ctx, ingressPortNo) {
 			//This is a trap on the NNI Port
 			if len(agent.deviceRoutes.Routes) == 0 {
 				// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
@@ -51,7 +51,7 @@
 			}
 			//Return a 'half' route to make the flow decomposer logic happy
 			for routeLink, path := range agent.deviceRoutes.Routes {
-				if agent.isNNIPort(routeLink.Egress) {
+				if agent.isNNIPort(ctx, routeLink.Egress) {
 					routes = append(routes, route.Hop{}) // first hop is set to empty
 					routes = append(routes, path[1])
 					return routes, nil
@@ -61,7 +61,7 @@
 		}
 		//treat it as if the output port is the first NNI of the OLT
 		var err error
-		if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
+		if egressPortNo, err = agent.getFirstNNIPort(ctx); err != nil {
 			logger.Warnw("no-nni-port", log.Fields{"error": err})
 			return nil, err
 		}
@@ -70,10 +70,10 @@
 	//route if egress port is OFPP_CONTROLLER or a nni logical port,
 	//in which case we need to create a half-route where only the egress
 	//hop is filled, the first hop is nil
-	if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
+	if ingressPortNo == 0 && agent.isNNIPort(ctx, egressPortNo) {
 		// We can use the 2nd hop of any upstream route, so just find the first upstream:
 		for routeLink, path := range agent.deviceRoutes.Routes {
-			if agent.isNNIPort(routeLink.Egress) {
+			if agent.isNNIPort(ctx, routeLink.Egress) {
 				routes = append(routes, route.Hop{}) // first hop is set to empty
 				routes = append(routes, path[1])
 				return routes, nil
@@ -93,10 +93,10 @@
 		return nil, fmt.Errorf("no downstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
 	}
 	//	Return the pre-calculated route
-	return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
+	return agent.getPreCalculatedRoute(ctx, ingressPortNo, egressPortNo)
 }
 
-func (agent *LogicalAgent) getPreCalculatedRoute(ingress, egress uint32) ([]route.Hop, error) {
+func (agent *LogicalAgent) getPreCalculatedRoute(ctx context.Context, ingress, egress uint32) ([]route.Hop, error) {
 	logger.Debugw("ROUTE", log.Fields{"len": len(agent.deviceRoutes.Routes)})
 	for routeLink, route := range agent.deviceRoutes.Routes {
 		logger.Debugw("ROUTELINKS", log.Fields{"ingress": ingress, "egress": egress, "routelink": routeLink})
@@ -108,7 +108,7 @@
 }
 
 // GetDeviceRoutes returns device graph
-func (agent *LogicalAgent) GetDeviceRoutes() *route.DeviceRoutes {
+func (agent *LogicalAgent) GetDeviceRoutes(ctx context.Context) *route.DeviceRoutes {
 	return agent.deviceRoutes
 }
 
@@ -123,7 +123,7 @@
 		return err
 	}
 
-	if agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ld) {
+	if agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ctx, ld) {
 		return nil
 	}
 	logger.Debug("Generation of device route required")
@@ -142,18 +142,18 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
 	if agent.deviceRoutes == nil {
-		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
+		agent.deviceRoutes = route.NewDeviceRoutes(ctx, agent.logicalDeviceID, agent.deviceMgr.getDevice)
 	}
 	// Get all the logical ports on that logical device
-	lDevice := agent.getLogicalDeviceWithoutLock()
+	lDevice := agent.getLogicalDeviceWithoutLock(ctx)
 
 	if err := agent.deviceRoutes.ComputeRoutes(ctx, lDevice.Ports); err != nil {
 		return err
 	}
-	if err := agent.deviceRoutes.Print(); err != nil {
+	if err := agent.deviceRoutes.Print(ctx); err != nil {
 		return err
 	}
 	return nil
@@ -165,15 +165,15 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	defer agent.requestQueue.RequestComplete(ctx)
 
 	if agent.deviceRoutes == nil {
-		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
+		agent.deviceRoutes = route.NewDeviceRoutes(ctx, agent.logicalDeviceID, agent.deviceMgr.getDevice)
 	}
 	if err := agent.deviceRoutes.AddPort(ctx, lp, agent.logicalDevice.Ports); err != nil {
 		return err
 	}
-	if err := agent.deviceRoutes.Print(); err != nil {
+	if err := agent.deviceRoutes.Print(ctx); err != nil {
 		return err
 	}
 	return nil
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 8ec5454..cad89bf 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -42,7 +42,7 @@
 func TestLogicalDeviceAgent_diff_nochange_1(t *testing.T) {
 	currentLogicalPorts := []*voltha.LogicalPort{}
 	updatedLogicalPorts := []*voltha.LogicalPort{}
-	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
 	assert.Equal(t, 0, len(newPorts))
 	assert.Equal(t, 0, len(changedPorts))
 	assert.Equal(t, 0, len(deletedPorts))
@@ -125,7 +125,7 @@
 			},
 		},
 	}
-	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
 	assert.Equal(t, 0, len(newPorts))
 	assert.Equal(t, 0, len(changedPorts))
 	assert.Equal(t, 0, len(deletedPorts))
@@ -159,7 +159,7 @@
 			},
 		},
 	}
-	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
 	assert.Equal(t, 2, len(newPorts))
 	assert.Equal(t, 0, len(changedPorts))
 	assert.Equal(t, 0, len(deletedPorts))
@@ -183,7 +183,7 @@
 		},
 	}
 	updatedLogicalPorts := []*voltha.LogicalPort{}
-	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
 	assert.Equal(t, 0, len(newPorts))
 	assert.Equal(t, 0, len(changedPorts))
 	assert.Equal(t, 1, len(deletedPorts))
@@ -267,7 +267,7 @@
 			},
 		},
 	}
-	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
 	assert.Equal(t, 0, len(newPorts))
 	assert.Equal(t, 2, len(changedPorts))
 	assert.Equal(t, 0, len(deletedPorts))
@@ -352,7 +352,7 @@
 			},
 		},
 	}
-	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
 	assert.Equal(t, 1, len(newPorts))
 	assert.Equal(t, 2, len(changedPorts))
 	assert.Equal(t, 1, len(deletedPorts))
@@ -387,7 +387,7 @@
 		logger.Fatal(err)
 	}
 	// Create the kafka client
-	test.kClient = mock_kafka.NewKafkaClient()
+	test.kClient = mock_kafka.NewKafkaClient(context.Background())
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-da-test"
@@ -458,6 +458,7 @@
 	cfg.DefaultRequestTimeout = lda.defaultTimeout
 	cfg.KVStorePort = lda.kvClientPort
 	cfg.InCompetingMode = inCompeteMode
+	ctx := context.Background()
 	grpcPort, err := freeport.GetFreePort()
 	if err != nil {
 		logger.Fatal("Cannot get a freeport for grpc")
@@ -474,29 +475,30 @@
 		LivenessChannelInterval: cfg.LiveProbeInterval / 2,
 		PathPrefix:              cfg.KVStoreDataPrefix}
 	lda.kmp = kafka.NewInterContainerProxy(
-		kafka.InterContainerHost(cfg.KafkaAdapterHost),
-		kafka.InterContainerPort(cfg.KafkaAdapterPort),
-		kafka.MsgClient(lda.kClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
-		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+		ctx,
+		kafka.InterContainerHost(ctx, cfg.KafkaAdapterHost),
+		kafka.InterContainerPort(ctx, cfg.KafkaAdapterPort),
+		kafka.MsgClient(ctx, lda.kClient),
+		kafka.DefaultTopic(ctx, &kafka.Topic{Name: cfg.CoreTopic}),
+		kafka.DeviceDiscoveryTopic(ctx, &kafka.Topic{Name: cfg.AffinityRouterTopic}))
 
-	endpointMgr := kafka.NewEndpointManager(backend)
-	proxy := model.NewProxy(backend, "/")
-	adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)
+	endpointMgr := kafka.NewEndpointManager(ctx, backend)
+	proxy := model.NewProxy(ctx, backend, "/")
+	adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
 
-	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
-	if err = lda.kmp.Start(); err != nil {
+	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(ctx, proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
+	if err = lda.kmp.Start(ctx); err != nil {
 		logger.Fatal("Cannot start InterContainerProxy")
 	}
-	adapterMgr.Start(context.Background())
+	adapterMgr.Start(ctx)
 }
 
 func (lda *LDATest) stopAll() {
 	if lda.kClient != nil {
-		lda.kClient.Stop()
+		lda.kClient.Stop(context.Background())
 	}
 	if lda.kmp != nil {
-		lda.kmp.Stop()
+		lda.kmp.Stop(context.Background())
 	}
 	if lda.etcdServer != nil {
 		stopEmbeddedEtcdServer(lda.etcdServer)
@@ -509,11 +511,11 @@
 	clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
 	clonedLD.Id = com.GetRandomString(10)
 	clonedLD.DatapathId = rand.Uint64()
-	lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
+	lDeviceAgent := newLogicalDeviceAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
 	lDeviceAgent.logicalDevice = clonedLD
 	err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "logical_devices", clonedLD.Id, clonedLD)
 	assert.Nil(t, err)
-	lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
+	lDeviceMgr.addLogicalDeviceAgentToMap(context.Background(), lDeviceAgent)
 	return lDeviceAgent
 }
 
@@ -569,7 +571,7 @@
 	}()
 	// wait for go routines to be done
 	localWG.Wait()
-	meterEntry := fu.MeterEntryFromMeterMod(meterMod)
+	meterEntry := fu.MeterEntryFromMeterMod(context.Background(), meterMod)
 
 	meterChunk, ok := ldAgent.meters[meterMod.MeterId]
 	assert.Equal(t, ok, true)
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index d9b6731..89fdf34 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -48,7 +48,7 @@
 	logicalDeviceLoadingInProgress map[string][]chan int
 }
 
-func (ldMgr *LogicalManager) addLogicalDeviceAgentToMap(agent *LogicalAgent) {
+func (ldMgr *LogicalManager) addLogicalDeviceAgentToMap(ctx context.Context, agent *LogicalAgent) {
 	if _, exist := ldMgr.logicalDeviceAgents.Load(agent.logicalDeviceID); !exist {
 		ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
 	}
@@ -79,7 +79,7 @@
 	return nil
 }
 
-func (ldMgr *LogicalManager) deleteLogicalDeviceAgent(logicalDeviceID string) {
+func (ldMgr *LogicalManager) deleteLogicalDeviceAgent(ctx context.Context, logicalDeviceID string) {
 	ldMgr.logicalDeviceAgents.Delete(logicalDeviceID)
 }
 
@@ -116,7 +116,7 @@
 	// For now use the serial number - it may contain any combination of alphabetic characters and numbers,
 	// with length varying from eight characters to a maximum of 14 characters.   Mac Address is part of oneof
 	// in the Device model.  May need to be moved out.
-	id := utils.CreateLogicalDeviceID()
+	id := utils.CreateLogicalDeviceID(ctx)
 	sn := strings.Replace(device.MacAddress, ":", "", -1)
 	if id == "" {
 		logger.Errorw("mac-address-not-set", log.Fields{"deviceId": device.Id, "serial-number": sn})
@@ -125,8 +125,8 @@
 
 	logger.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
 
-	agent := newLogicalDeviceAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
-	ldMgr.addLogicalDeviceAgentToMap(agent)
+	agent := newLogicalDeviceAgent(ctx, id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+	ldMgr.addLogicalDeviceAgentToMap(ctx, agent)
 
 	// Update the root device with the logical device Id reference
 	if err := ldMgr.deviceMgr.setParentID(ctx, device, id); err != nil {
@@ -139,7 +139,7 @@
 		err := agent.start(context.Background(), false)
 		if err != nil {
 			logger.Errorw("unable-to-create-the-logical-device", log.Fields{"error": err})
-			ldMgr.deleteLogicalDeviceAgent(id)
+			ldMgr.deleteLogicalDeviceAgent(ctx, id)
 		}
 	}()
 
@@ -196,7 +196,7 @@
 			ldMgr.logicalDevicesLoadingLock.Unlock()
 			if _, err := ldMgr.getLogicalDeviceFromModel(ctx, lDeviceID); err == nil {
 				logger.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
-				agent := newLogicalDeviceAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+				agent := newLogicalDeviceAgent(ctx, lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
 				if err := agent.start(ctx, true); err != nil {
 					return err
 				}
@@ -243,7 +243,7 @@
 			return err
 		}
 		//Remove the logical device agent from the Map
-		ldMgr.deleteLogicalDeviceAgent(logDeviceID)
+		ldMgr.deleteLogicalDeviceAgent(ctx, logDeviceID)
 	}
 
 	logger.Debug("deleting-logical-device-ends")
@@ -539,7 +539,7 @@
 func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
 	logger.Debugw("packetIn", log.Fields{"logicalDeviceId": logicalDeviceID, "port": port})
 	if agent := ldMgr.getLogicalDeviceAgent(ctx, logicalDeviceID); agent != nil {
-		agent.packetIn(port, transactionID, packet)
+		agent.packetIn(ctx, port, transactionID, packet)
 	} else {
 		logger.Error("logical-device-not-exist", log.Fields{"logicalDeviceId": logicalDeviceID})
 	}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 0c4448b..3f88094 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -57,21 +57,21 @@
 	deviceLoadingInProgress map[string][]chan int
 }
 
-func NewManagers(proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+func NewManagers(ctx context.Context, proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
 	deviceMgr := &Manager{
 		rootDevices:             make(map[string]bool),
 		kafkaICProxy:            kmp,
-		adapterProxy:            remote.NewAdapterProxy(kmp, corePairTopic, endpointMgr),
+		adapterProxy:            remote.NewAdapterProxy(ctx, kmp, corePairTopic, endpointMgr),
 		coreInstanceID:          coreInstanceID,
 		clusterDataProxy:        proxy,
 		adapterMgr:              adapterMgr,
 		defaultTimeout:          defaultCoreTimeout * time.Millisecond,
 		deviceLoadingInProgress: make(map[string][]chan int),
 	}
-	deviceMgr.stateTransitions = NewTransitionMap(deviceMgr)
+	deviceMgr.stateTransitions = NewTransitionMap(ctx, deviceMgr)
 
 	logicalDeviceMgr := &LogicalManager{
-		Manager:                        event.NewManager(),
+		Manager:                        event.NewManager(ctx),
 		deviceMgr:                      deviceMgr,
 		kafkaICProxy:                   kmp,
 		clusterDataProxy:               proxy,
@@ -80,12 +80,12 @@
 	}
 	deviceMgr.logicalDeviceMgr = logicalDeviceMgr
 
-	adapterMgr.SetAdapterRestartedCallback(deviceMgr.adapterRestarted)
+	adapterMgr.SetAdapterRestartedCallback(ctx, deviceMgr.adapterRestarted)
 
 	return deviceMgr, logicalDeviceMgr
 }
 
-func (dMgr *Manager) addDeviceAgentToMap(agent *Agent) {
+func (dMgr *Manager) addDeviceAgentToMap(ctx context.Context, agent *Agent) {
 	if _, exist := dMgr.deviceAgents.Load(agent.deviceID); !exist {
 		dMgr.deviceAgents.Store(agent.deviceID, agent)
 	}
@@ -95,7 +95,7 @@
 
 }
 
-func (dMgr *Manager) deleteDeviceAgentFromMap(agent *Agent) {
+func (dMgr *Manager) deleteDeviceAgentFromMap(ctx context.Context, agent *Agent) {
 	dMgr.deviceAgents.Delete(agent.deviceID)
 	dMgr.lockRootDeviceMap.Lock()
 	defer dMgr.lockRootDeviceMap.Unlock()
@@ -123,7 +123,7 @@
 }
 
 // listDeviceIdsFromMap returns the list of device IDs that are in memory
-func (dMgr *Manager) listDeviceIdsFromMap() *voltha.IDs {
+func (dMgr *Manager) listDeviceIdsFromMap(ctx context.Context) *voltha.IDs {
 	result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
 
 	dMgr.deviceAgents.Range(func(key, value interface{}) bool {
@@ -137,7 +137,6 @@
 // CreateDevice creates a new parent device in the data model
 func (dMgr *Manager) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
 	if device.MacAddress == "" && device.GetHostAndPort() == "" {
-		logger.Errorf("No Device Info Present")
 		return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
 	}
 	logger.Debugw("create-device", log.Fields{"device": *device})
@@ -156,13 +155,13 @@
 	// Ensure this device is set as root
 	device.Root = true
 	// Create and start a device agent for that device
-	agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+	agent := newAgent(ctx, dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 	device, err = agent.start(ctx, device)
 	if err != nil {
 		logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
 		return nil, err
 	}
-	dMgr.addDeviceAgentToMap(agent)
+	dMgr.addDeviceAgentToMap(ctx, agent)
 	return device, nil
 }
 
@@ -242,8 +241,8 @@
 // deletion deletion also includes removal of any reference of this device.
 func (dMgr *Manager) stopManagingDevice(ctx context.Context, id string) {
 	logger.Infow("stopManagingDevice", log.Fields{"deviceId": id})
-	if dMgr.IsDeviceInCache(id) { // Proceed only if an agent is present for this device
-		if root, _ := dMgr.IsRootDevice(id); root {
+	if dMgr.IsDeviceInCache(ctx, id) { // Proceed only if an agent is present for this device
+		if root, _ := dMgr.IsRootDevice(ctx, id); root {
 			// stop managing the logical device
 			_ = dMgr.logicalDeviceMgr.stopManagingLogicalDeviceWithDeviceID(ctx, id)
 		}
@@ -251,7 +250,7 @@
 			if err := agent.stop(ctx); err != nil {
 				logger.Warnw("unable-to-stop-device-agent", log.Fields{"device-id": agent.deviceID, "error": err})
 			}
-			dMgr.deleteDeviceAgentFromMap(agent)
+			dMgr.deleteDeviceAgentFromMap(ctx, agent)
 		}
 	}
 }
@@ -287,7 +286,7 @@
 		return nil, status.Errorf(codes.Aborted, "%s", err.Error())
 	}
 	var childDeviceIds []string
-	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+	if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
 		return nil, status.Errorf(codes.Aborted, "%s", err.Error())
 	}
 	if len(childDeviceIds) == 0 {
@@ -349,7 +348,7 @@
 		return nil, status.Errorf(codes.Aborted, "%s", err.Error())
 	}
 	var childDeviceIds []string
-	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+	if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
 		return nil, status.Errorf(codes.Aborted, "%s", err.Error())
 	}
 	if len(childDeviceIds) == 0 {
@@ -377,13 +376,13 @@
 }
 
 // IsDeviceInCache returns true if device is found in the map
-func (dMgr *Manager) IsDeviceInCache(id string) bool {
+func (dMgr *Manager) IsDeviceInCache(ctx context.Context, id string) bool {
 	_, exist := dMgr.deviceAgents.Load(id)
 	return exist
 }
 
 // IsRootDevice returns true if root device is found in the map
-func (dMgr *Manager) IsRootDevice(id string) (bool, error) {
+func (dMgr *Manager) IsRootDevice(ctx context.Context, id string) (bool, error) {
 	dMgr.lockRootDeviceMap.RLock()
 	defer dMgr.lockRootDeviceMap.RUnlock()
 	if exist := dMgr.rootDevices[id]; exist {
@@ -405,13 +404,13 @@
 
 	for _, device := range devices {
 		// If device is not in memory then set it up
-		if !dMgr.IsDeviceInCache(device.Id) {
+		if !dMgr.IsDeviceInCache(ctx, device.Id) {
 			logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
-			agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+			agent := newAgent(ctx, dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 			if _, err := agent.start(ctx, nil); err != nil {
 				logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
 			} else {
-				dMgr.addDeviceAgentToMap(agent)
+				dMgr.addDeviceAgentToMap(ctx, agent)
 			}
 		}
 		result.Items = append(result.Items, device)
@@ -464,17 +463,17 @@
 	var device *voltha.Device
 	dMgr.devicesLoadingLock.Lock()
 	if _, exist := dMgr.deviceLoadingInProgress[deviceID]; !exist {
-		if !dMgr.IsDeviceInCache(deviceID) {
+		if !dMgr.IsDeviceInCache(ctx, deviceID) {
 			dMgr.deviceLoadingInProgress[deviceID] = []chan int{make(chan int, 1)}
 			dMgr.devicesLoadingLock.Unlock()
 			// Proceed with the loading only if the device exist in the Model (could have been deleted)
 			if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
 				logger.Debugw("loading-device", log.Fields{"deviceId": deviceID})
-				agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+				agent := newAgent(ctx, dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 				if _, err = agent.start(ctx, nil); err != nil {
 					logger.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
 				} else {
-					dMgr.addDeviceAgentToMap(agent)
+					dMgr.addDeviceAgentToMap(ctx, agent)
 				}
 			} else {
 				logger.Debugw("Device not in model", log.Fields{"deviceId": deviceID})
@@ -518,7 +517,7 @@
 			logger.Debugw("no-parent-to-load", log.Fields{"deviceId": device.Id})
 		}
 		//	Load all child devices, if needed
-		if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
+		if childDeviceIds, err := dMgr.getAllChildDeviceIds(ctx, device); err == nil {
 			for _, childDeviceID := range childDeviceIds {
 				if _, err := dMgr.loadDevice(ctx, childDeviceID); err != nil {
 					logger.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceID, "error": err})
@@ -574,10 +573,10 @@
 }
 
 // ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
-func (dMgr *Manager) ListDeviceIds(_ context.Context, _ *empty.Empty) (*voltha.IDs, error) {
+func (dMgr *Manager) ListDeviceIds(ctx context.Context, _ *empty.Empty) (*voltha.IDs, error) {
 	logger.Debug("ListDeviceIDs")
 	// Report only device IDs that are in the device agent map
-	return dMgr.listDeviceIdsFromMap(), nil
+	return dMgr.listDeviceIdsFromMap(ctx), nil
 }
 
 // ReconcileDevices is a request to a voltha core to update its list of managed devices.  This will
@@ -605,7 +604,7 @@
 }
 
 // isOkToReconcile validates whether a device is in the correct status to be reconciled
-func isOkToReconcile(device *voltha.Device) bool {
+func isOkToReconcile(ctx context.Context, device *voltha.Device) bool {
 	if device == nil {
 		return false
 	}
@@ -626,13 +625,13 @@
 	responses := make([]utils.Response, 0)
 	for rootDeviceID := range dMgr.rootDevices {
 		if rootDevice, _ := dMgr.getDeviceFromModel(ctx, rootDeviceID); rootDevice != nil {
-			isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(rootDeviceID, adapter.Type, adapter.CurrentReplica)
+			isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, rootDeviceID, adapter.Type, adapter.CurrentReplica)
 			if err != nil {
 				logger.Warnw("is-device-owned-by-service", log.Fields{"error": err, "root-device-id": rootDeviceID, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
 				continue
 			}
 			if isDeviceOwnedByService {
-				if isOkToReconcile(rootDevice) {
+				if isOkToReconcile(ctx, rootDevice) {
 					logger.Debugw("reconciling-root-device", log.Fields{"rootId": rootDevice.Id})
 					responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, rootDevice))
 				} else {
@@ -643,12 +642,12 @@
 				for _, port := range rootDevice.Ports {
 					for _, peer := range port.Peers {
 						if childDevice, _ := dMgr.getDeviceFromModel(ctx, peer.DeviceId); childDevice != nil {
-							isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(childDevice.Id, adapter.Type, adapter.CurrentReplica)
+							isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, childDevice.Id, adapter.Type, adapter.CurrentReplica)
 							if err != nil {
 								logger.Warnw("is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
 							}
 							if isDeviceOwnedByService {
-								if isOkToReconcile(childDevice) {
+								if isOkToReconcile(ctx, childDevice) {
 									logger.Debugw("reconciling-child-device", log.Fields{"child-device-id": childDevice.Id})
 									responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, childDevice))
 								} else {
@@ -667,7 +666,7 @@
 	}
 	if len(responses) > 0 {
 		// Wait for completion
-		if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
+		if res := utils.WaitForNilOrErrorResponses(ctx, dMgr.defaultTimeout, responses...); res != nil {
 			return status.Errorf(codes.Aborted, "errors-%s", res)
 		}
 	} else {
@@ -681,20 +680,20 @@
 	// point of creating a device agent (if the device is not being managed by this Core) before sending the request
 	// to the adapter.   We will therefore bypass the adapter adapter and send the request directly to the adapter via
 	// the adapter proxy.
-	response := utils.NewResponse()
+	response := utils.NewResponse(ctx)
 	ch, err := dMgr.adapterProxy.ReconcileDevice(ctx, device)
 	if err != nil {
-		response.Error(err)
+		response.Error(ctx, err)
 	}
 	// Wait for adapter response in its own routine
 	go func() {
 		resp, ok := <-ch
 		if !ok {
-			response.Error(status.Errorf(codes.Aborted, "channel-closed-device: %s", device.Id))
+			response.Error(ctx, status.Errorf(codes.Aborted, "channel-closed-device: %s", device.Id))
 		} else if resp.Err != nil {
-			response.Error(resp.Err)
+			response.Error(ctx, resp.Err)
 		}
-		response.Done()
+		response.Done(ctx)
 	}()
 	return response
 }
@@ -710,7 +709,7 @@
 			}
 		}
 		// Wait for completion
-		if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
+		if res := utils.WaitForNilOrErrorResponses(ctx, dMgr.defaultTimeout, responses...); res != nil {
 			return status.Errorf(codes.Aborted, "errors-%s", res)
 		}
 	}
@@ -869,7 +868,7 @@
 		return status.Errorf(codes.Aborted, "%s", err.Error())
 	}
 	var childDeviceIds []string
-	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+	if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
 		return status.Errorf(codes.Aborted, "%s", err.Error())
 	}
 	if len(childDeviceIds) == 0 {
@@ -1024,13 +1023,13 @@
 	childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
 
 	// Create and start a device agent for that device
-	agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+	agent := newAgent(ctx, dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 	childDevice, err := agent.start(ctx, childDevice)
 	if err != nil {
 		logger.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
 		return nil, err
 	}
-	dMgr.addDeviceAgentToMap(agent)
+	dMgr.addDeviceAgentToMap(ctx, agent)
 
 	// Activate the child device
 	if agent = dMgr.getDeviceAgent(ctx, agent.deviceID); agent != nil {
@@ -1044,7 +1043,7 @@
 
 	// Publish on the messaging bus that we have discovered new devices
 	go func() {
-		err := dMgr.kafkaICProxy.DeviceDiscovered(agent.deviceID, deviceType, parentDeviceID, dMgr.coreInstanceID)
+		err := dMgr.kafkaICProxy.DeviceDiscovered(ctx, agent.deviceID, deviceType, parentDeviceID, dMgr.coreInstanceID)
 		if err != nil {
 			logger.Errorw("unable-to-discover-the-device", log.Fields{"error": err})
 		}
@@ -1064,16 +1063,16 @@
 		"curr-oper-state":  device.OperStatus,
 		"curr-conn-state":  device.ConnectStatus,
 	})
-	handlers := dMgr.stateTransitions.GetTransitionHandler(device, previousState)
+	handlers := dMgr.stateTransitions.GetTransitionHandler(ctx, device, previousState)
 	if handlers == nil {
 		logger.Debugw("no-op-transition", log.Fields{"deviceId": device.Id})
 		return nil
 	}
 	logger.Debugw("handler-found", log.Fields{"num-expectedHandlers": len(handlers), "isParent": device.Root, "current-data": device, "previous-state": previousState})
 	for _, handler := range handlers {
-		logger.Debugw("running-handler", log.Fields{"handler": funcName(handler)})
+		logger.Debugw("running-handler", log.Fields{"handler": funcName(ctx, handler)})
 		if err := handler(ctx, device); err != nil {
-			logger.Warnw("handler-failed", log.Fields{"handler": funcName(handler), "error": err})
+			logger.Warnw("handler-failed", log.Fields{"handler": funcName(ctx, handler), "error": err})
 			return err
 		}
 	}
@@ -1210,7 +1209,7 @@
 		return err
 	}
 
-	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+	if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
 		return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
 	}
 	if len(childDeviceIds) == 0 {
@@ -1248,7 +1247,7 @@
 	logger.Debug("DisableAllChildDevices")
 	var childDeviceIds []string
 	var err error
-	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentCurrDevice); err != nil {
+	if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentCurrDevice); err != nil {
 		return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
 	}
 	if len(childDeviceIds) == 0 {
@@ -1270,7 +1269,7 @@
 	logger.Debug("DeleteAllChildDevices")
 	var childDeviceIds []string
 	var err error
-	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentCurrDevice); err != nil {
+	if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentCurrDevice); err != nil {
 		return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
 	}
 	if len(childDeviceIds) == 0 {
@@ -1312,7 +1311,7 @@
 }
 
 //getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
-func (dMgr *Manager) getAllChildDeviceIds(parentDevice *voltha.Device) ([]string, error) {
+func (dMgr *Manager) getAllChildDeviceIds(ctx context.Context, parentDevice *voltha.Device) ([]string, error) {
 	logger.Debugw("getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id})
 	childDeviceIds := make([]string, 0)
 	if parentDevice != nil {
@@ -1331,7 +1330,7 @@
 	logger.Debugw("GetAllChildDevices", log.Fields{"parentDeviceId": parentDeviceID})
 	if parentDevice, err := dMgr.getDevice(ctx, parentDeviceID); err == nil {
 		childDevices := make([]*voltha.Device, 0)
-		if childDeviceIds, er := dMgr.getAllChildDeviceIds(parentDevice); er == nil {
+		if childDeviceIds, er := dMgr.getAllChildDeviceIds(ctx, parentDevice); er == nil {
 			for _, deviceID := range childDeviceIds {
 				if d, e := dMgr.getDevice(ctx, deviceID); e == nil && d != nil {
 					childDevices = append(childDevices, d)
@@ -1491,7 +1490,7 @@
 	return nil
 }
 
-func funcName(f interface{}) string {
+func funcName(ctx context.Context, f interface{}) string {
 	p := reflect.ValueOf(f).Pointer()
 	rf := runtime.FuncForPC(p)
 	return rf.Name()
diff --git a/rw_core/core/device/remote/adapter_proxy.go b/rw_core/core/device/remote/adapter_proxy.go
index f4579ef..eab5aa7 100755
--- a/rw_core/core/device/remote/adapter_proxy.go
+++ b/rw_core/core/device/remote/adapter_proxy.go
@@ -34,7 +34,7 @@
 }
 
 // NewAdapterProxy will return adapter proxy instance
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
+func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, corePairTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
 	return &AdapterProxy{
 		EndpointManager:       endpointManager,
 		kafkaICProxy:          kafkaProxy,
@@ -43,13 +43,12 @@
 	}
 }
 
-func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
+func (ap *AdapterProxy) getCoreTopic(ctx context.Context) kafka.Topic {
 	return kafka.Topic{Name: ap.corePairTopic}
 }
 
-func (ap *AdapterProxy) getAdapterTopic(deviceID string, adapterType string) (*kafka.Topic, error) {
-
-	endpoint, err := ap.GetEndpoint(deviceID, adapterType)
+func (ap *AdapterProxy) getAdapterTopic(ctx context.Context, deviceID string, adapterType string) (*kafka.Topic, error) {
+	endpoint, err := ap.GetEndpoint(ctx, deviceID, adapterType)
 	if err != nil {
 		return nil, err
 	}
@@ -59,7 +58,6 @@
 
 func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
 	waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
-
 	// Sent the request to kafka
 	respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
 
@@ -77,14 +75,14 @@
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("AdoptDevice", log.Fields{"device-id": device.Id})
 	rpc := "adopt_device"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	ap.deviceTopicRegistered = true
 	logger.Debugw("adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
@@ -94,14 +92,14 @@
 func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("DisableDevice", log.Fields{"device-id": device.Id})
 	rpc := "disable_device"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -109,14 +107,14 @@
 func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("ReEnableDevice", log.Fields{"device-id": device.Id})
 	rpc := "reenable_device"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -124,14 +122,14 @@
 func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("RebootDevice", log.Fields{"device-id": device.Id})
 	rpc := "reboot_device"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -139,14 +137,14 @@
 func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("DeleteDevice", log.Fields{"device-id": device.Id})
 	rpc := "delete_device"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -154,21 +152,21 @@
 func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("GetOfpDeviceInfo", log.Fields{"device-id": device.Id})
 	rpc := "get_ofp_device_info"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // GetOfpPortInfo invokes get ofp port info rpc
 func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("GetOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -176,7 +174,7 @@
 		{Key: "device", Value: device},
 		{Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, "get_ofp_port_info", toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -184,14 +182,14 @@
 func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("ReconcileDevice", log.Fields{"device-id": device.Id})
 	rpc := "reconcile_device"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -199,7 +197,7 @@
 func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("DownloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "download_image"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -207,7 +205,7 @@
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -215,7 +213,7 @@
 func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("GetImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "get_image_download_status"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -223,7 +221,7 @@
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -231,7 +229,7 @@
 func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("CancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "cancel_image_download"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -239,7 +237,7 @@
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -247,7 +245,7 @@
 func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("ActivateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "activate_image_update"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -255,7 +253,7 @@
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -263,7 +261,7 @@
 func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("RevertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "revert_image_update"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -271,13 +269,13 @@
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
-	toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+	toTopic, err := ap.getAdapterTopic(ctx, deviceID, deviceType)
 	if err != nil {
 		return nil, err
 	}
@@ -287,14 +285,14 @@
 		{Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
 		{Key: "packet", Value: packet},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
 }
 
 // UpdateFlowsBulk invokes update flows bulk rpc
 func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -305,8 +303,8 @@
 		{Key: "groups", Value: groups},
 		{Key: "flow_metadata", Value: flowMetadata},
 	}
-	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
+	replyToTopic := ap.getCoreTopic(ctx)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // UpdateFlowsIncremental invokes update flows incremental rpc
@@ -320,7 +318,7 @@
 			"group-to-delete-count": len(groupChanges.ToRemove.Items),
 			"group-to-update-count": len(groupChanges.ToUpdate.Items),
 		})
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -331,14 +329,14 @@
 		{Key: "group_changes", Value: groupChanges},
 		{Key: "flow_metadata", Value: flowMetadata},
 	}
-	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
+	replyToTopic := ap.getCoreTopic(ctx)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // UpdatePmConfigs invokes update pm configs rpc
 func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("UpdatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -347,7 +345,7 @@
 		{Key: "device", Value: device},
 		{Key: "pm_configs", Value: pmConfigs},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -355,7 +353,7 @@
 func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("SimulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
 	rpc := "simulate_alarm"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -363,7 +361,7 @@
 		{Key: "device", Value: device},
 		{Key: "request", Value: simulateReq},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	ap.deviceTopicRegistered = true
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
@@ -371,7 +369,7 @@
 func (ap *AdapterProxy) DisablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("DisablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "disable_port"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -379,14 +377,14 @@
 		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
 		{Key: "port", Value: port},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 func (ap *AdapterProxy) EnablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("EnablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "enable_port"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -394,7 +392,7 @@
 		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
 		{Key: "port", Value: port},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
@@ -402,7 +400,7 @@
 func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("ChildDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
 	rpc := "child_device_lost"
-	toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+	toTopic, err := ap.getAdapterTopic(ctx, deviceID, deviceType)
 	if err != nil {
 		return nil, err
 	}
@@ -411,19 +409,19 @@
 		{Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
 		{Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
 	}
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
 }
 
 func (ap *AdapterProxy) StartOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
 	rpc := "start_omci_test"
-	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
 	if err != nil {
 		return nil, err
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	// TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
 	//   than including the whole request, which is (deviceid, uuid)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
@@ -434,7 +432,7 @@
 func (ap *AdapterProxy) GetExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, id string, valuetype voltha.ValueType_Type) (chan *kafka.RpcResponse, error) {
 	log.Debugw("GetExtValue", log.Fields{"device-id": pdevice.Id, "onuid": id})
 	rpc := "get_ext_value"
-	toTopic, err := ap.getAdapterTopic(pdevice.Id, pdevice.Adapter)
+	toTopic, err := ap.getAdapterTopic(ctx, pdevice.Id, pdevice.Adapter)
 	if err != nil {
 		return nil, err
 	}
@@ -454,6 +452,6 @@
 			Value: &ic.IntType{Val: int64(valuetype)},
 		}}
 
-	replyToTopic := ap.getCoreTopic()
+	replyToTopic := ap.getCoreTopic(ctx)
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, pdevice.Id, args...)
 }
diff --git a/rw_core/core/device/remote/adapter_proxy_test.go b/rw_core/core/device/remote/adapter_proxy_test.go
index 17627dc..d3b86fa 100755
--- a/rw_core/core/device/remote/adapter_proxy_test.go
+++ b/rw_core/core/device/remote/adapter_proxy_test.go
@@ -58,35 +58,38 @@
 	log.SetAllLogLevel(log.WarnLevel)
 
 	var err error
+	ctx := context.Background()
 
 	// Create the KV client
-	kc = mock_kafka.NewKafkaClient()
+	kc = mock_kafka.NewKafkaClient(ctx)
 
 	// Setup core inter-container proxy and core request handler
 	coreKafkaICProxy = kafka.NewInterContainerProxy(
-		kafka.MsgClient(kc),
-		kafka.DefaultTopic(&kafka.Topic{Name: coreName}))
+		ctx,
+		kafka.MsgClient(ctx, kc),
+		kafka.DefaultTopic(ctx, &kafka.Topic{Name: coreName}))
 
-	if err = coreKafkaICProxy.Start(); err != nil {
+	if err = coreKafkaICProxy.Start(ctx); err != nil {
 		logger.Fatalw("Failure-starting-core-kafka-intercontainerProxy", log.Fields{"error": err})
 	}
-	if err = coreKafkaICProxy.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: coreName}, 0); err != nil {
+	if err = coreKafkaICProxy.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: coreName}, 0); err != nil {
 		logger.Fatalw("Failure-subscribing-core-request-handler", log.Fields{"error": err})
 	}
 
 	// Setup adapter inter-container proxy and adapter request handler
-	adapterCoreProxy := com.NewCoreProxy(nil, adapterName, coreName)
-	adapter = cm.NewAdapter(adapterCoreProxy)
-	adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
+	adapterCoreProxy := com.NewCoreProxy(ctx, nil, adapterName, coreName)
+	adapter = cm.NewAdapter(ctx, adapterCoreProxy)
+	adapterReqHandler = com.NewRequestHandlerProxy(ctx, coreInstanceID, adapter, adapterCoreProxy)
 	adapterKafkaICProxy = kafka.NewInterContainerProxy(
-		kafka.MsgClient(kc),
-		kafka.DefaultTopic(&kafka.Topic{Name: adapterName}),
-		kafka.RequestHandlerInterface(adapterReqHandler))
+		ctx,
+		kafka.MsgClient(ctx, kc),
+		kafka.DefaultTopic(ctx, &kafka.Topic{Name: adapterName}),
+		kafka.RequestHandlerInterface(ctx, adapterReqHandler))
 
-	if err = adapterKafkaICProxy.Start(); err != nil {
+	if err = adapterKafkaICProxy.Start(ctx); err != nil {
 		logger.Fatalw("Failure-starting-adapter-kafka-intercontainerProxy", log.Fields{"error": err})
 	}
-	if err = adapterKafkaICProxy.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: adapterName}, 0); err != nil {
+	if err = adapterKafkaICProxy.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: adapterName}, 0); err != nil {
 		logger.Fatalw("Failure-subscribing-adapter-request-handler", log.Fields{"error": err})
 	}
 }
@@ -98,7 +101,7 @@
 }
 
 func TestCreateAdapterProxy(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+	ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
 	assert.NotNil(t, ap)
 }
 
@@ -119,7 +122,7 @@
 
 func testSimpleRequests(t *testing.T) {
 	type simpleRequest func(context.Context, *voltha.Device) (chan *kafka.RpcResponse, error)
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+	ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
 	simpleRequests := []simpleRequest{
 		ap.AdoptDevice,
 		ap.DisableDevice,
@@ -162,7 +165,7 @@
 }
 
 func testGetSwitchCapabilityFromAdapter(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+	ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
@@ -174,12 +177,12 @@
 	err = ptypes.UnmarshalAny(response, switchCap)
 	assert.Nil(t, err)
 	assert.NotNil(t, switchCap)
-	expectedCap, _ := adapter.Get_ofp_device_info(d)
+	expectedCap, _ := adapter.Get_ofp_device_info(ctx, d)
 	assert.Equal(t, switchCap.String(), expectedCap.String())
 }
 
 func testGetPortInfoFromAdapter(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+	ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
@@ -192,12 +195,12 @@
 	err = ptypes.UnmarshalAny(response, portCap)
 	assert.Nil(t, err)
 	assert.NotNil(t, portCap)
-	expectedPortInfo, _ := adapter.Get_ofp_port_info(d, int64(portNo))
+	expectedPortInfo, _ := adapter.Get_ofp_port_info(context.Background(), d, int64(portNo))
 	assert.Equal(t, portCap.String(), expectedPortInfo.String())
 }
 
 func testPacketOut(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+	ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	outPort := uint32(1)
 	packet, err := getRandomBytes(50)
@@ -211,7 +214,7 @@
 }
 
 func testFlowUpdates(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+	ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	_, err := ap.UpdateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
 	assert.Nil(t, err)
@@ -226,7 +229,7 @@
 }
 
 func testPmUpdates(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+	ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
diff --git a/rw_core/core/device/state_transitions.go b/rw_core/core/device/state_transitions.go
index 15f4c1e..66e7d4d 100644
--- a/rw_core/core/device/state_transitions.go
+++ b/rw_core/core/device/state_transitions.go
@@ -46,18 +46,18 @@
 }
 
 // toInt returns an integer representing the matching level of the match (the larger the number the better)
-func (m *match) toInt() int {
+func (m *match) toInt(ctx context.Context) int {
 	return int(m.admin<<4 | m.oper<<2 | m.conn)
 }
 
 // isExactMatch returns true if match is an exact match
-func (m *match) isExactMatch() bool {
+func (m *match) isExactMatch(ctx context.Context) bool {
 	return m.admin == currPrevStateMatch && m.oper == currPrevStateMatch && m.conn == currPrevStateMatch
 }
 
 // isBetterMatch returns true if newMatch is a worse match
-func (m *match) isBetterMatch(newMatch *match) bool {
-	return m.toInt() > newMatch.toInt()
+func (m *match) isBetterMatch(ctx context.Context, newMatch *match) bool {
+	return m.toInt(ctx) > newMatch.toInt(ctx)
 }
 
 // deviceState has admin, operational and connection status of device
@@ -85,7 +85,7 @@
 }
 
 // NewTransitionMap creates transition map
-func NewTransitionMap(dMgr coreif.DeviceManager) *TransitionMap {
+func NewTransitionMap(ctx context.Context, dMgr coreif.DeviceManager) *TransitionMap {
 	var transitionMap TransitionMap
 	transitionMap.dMgr = dMgr
 	transitionMap.transitions = make([]Transition, 0)
@@ -238,12 +238,12 @@
 	return &transitionMap
 }
 
-func getDeviceStates(device *voltha.Device) *deviceState {
+func getDeviceStates(ctx context.Context, device *voltha.Device) *deviceState {
 	return &deviceState{Admin: device.AdminState, Connection: device.ConnectStatus, Operational: device.OperStatus}
 }
 
 // isMatched matches a state transition.  It returns whether there is a match and if there is whether it is an exact match
-func getHandler(previous *deviceState, current *deviceState, transition *Transition) ([]TransitionHandler, *match) {
+func getHandler(ctx context.Context, previous *deviceState, current *deviceState, transition *Transition) ([]TransitionHandler, *match) {
 	m := &match{}
 	// Do we have an exact match?
 	if *previous == transition.previousState && *current == transition.currentState {
@@ -297,9 +297,9 @@
 }
 
 // GetTransitionHandler returns transition handler & a flag that's set if the transition is invalid
-func (tMap *TransitionMap) GetTransitionHandler(device *voltha.Device, pState *deviceState) []TransitionHandler {
+func (tMap *TransitionMap) GetTransitionHandler(ctx context.Context, device *voltha.Device, pState *deviceState) []TransitionHandler {
 	//1. Get the previous and current set of states
-	cState := getDeviceStates(device)
+	cState := getDeviceStates(ctx, device)
 
 	// Do nothing is there are no states change
 	if *pState == *cState {
@@ -324,11 +324,11 @@
 		if aTransition.deviceType != deviceType && aTransition.deviceType != any {
 			continue
 		}
-		tempHandler, m = getHandler(pState, cState, &aTransition)
+		tempHandler, m = getHandler(ctx, pState, cState, &aTransition)
 		if tempHandler != nil {
-			if m.isExactMatch() && aTransition.deviceType == deviceType {
+			if m.isExactMatch(ctx) && aTransition.deviceType == deviceType {
 				return tempHandler
-			} else if m.isExactMatch() || m.isBetterMatch(bestMatch) {
+			} else if m.isExactMatch(ctx) || m.isBetterMatch(ctx, bestMatch) {
 				currentMatch = tempHandler
 				bestMatch = m
 			}
diff --git a/rw_core/core/device/state_transitions_test.go b/rw_core/core/device/state_transitions_test.go
index 41d77dd..1f86e17 100644
--- a/rw_core/core/device/state_transitions_test.go
+++ b/rw_core/core/device/state_transitions_test.go
@@ -16,14 +16,14 @@
 package device
 
 import (
+	"context"
 	"fmt"
-	"reflect"
-	"testing"
-
 	"github.com/opencord/voltha-go/rw_core/coreif"
 	"github.com/opencord/voltha-go/rw_core/mocks"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/stretchr/testify/assert"
+	"reflect"
+	"testing"
 )
 
 var transitionMap *TransitionMap
@@ -39,7 +39,7 @@
 
 func init() {
 	tdm = newTestDeviceManager()
-	transitionMap = NewTransitionMap(tdm)
+	transitionMap = NewTransitionMap(context.Background(), tdm)
 }
 
 func getDevice(root bool, admin voltha.AdminState_Types, conn voltha.ConnectStatus_Types, oper voltha.OperStatus_Types) *voltha.Device {
@@ -61,122 +61,122 @@
 }
 
 func assertInvalidTransition(t *testing.T, device *voltha.Device, previousState *deviceState) {
-	handlers := transitionMap.GetTransitionHandler(device, previousState)
+	handlers := transitionMap.GetTransitionHandler(context.Background(), device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.NotifyInvalidTransition).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 }
 
 func assertNoOpTransition(t *testing.T, device *voltha.Device, previousState *deviceState) {
-	handlers := transitionMap.GetTransitionHandler(device, previousState)
+	handlers := transitionMap.GetTransitionHandler(context.Background(), device, previousState)
 	assert.Equal(t, 0, len(handlers))
 }
 
 func TestValidTransitions(t *testing.T) {
+	ctx := context.Background()
 	previousState := getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
 	device := getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
-	handlers := transitionMap.GetTransitionHandler(device, previousState)
+	handlers := transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
 	device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
 	device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVATING)
 	device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVATING)
 	device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
-
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_DISCOVERED)
 	device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_DISCOVERED)
 	device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_DISCOVERED)
 	device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_DISCOVERED)
 	device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_DISCOVERED)
 	device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
 	device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
 	device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
 	device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVATING)
 	device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVATING)
 	device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_PREPROVISIONED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
 	device = getDevice(true, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_PREPROVISIONED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
 	device = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
 	device = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_FAILED)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 3, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.ChildDeviceLost).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
@@ -184,7 +184,7 @@
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
 	device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 4, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.DeleteAllLogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 	assert.True(t, reflect.ValueOf(tdm.DeleteAllChildDevices).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
@@ -193,13 +193,13 @@
 
 	previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
 	device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	previousState = getDeviceState(voltha.AdminState_DISABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN)
 	device = getDevice(true, voltha.AdminState_DISABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 4, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.DeleteAllLogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 	assert.True(t, reflect.ValueOf(tdm.DeleteAllChildDevices).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
@@ -208,7 +208,7 @@
 
 	previousState = getDeviceState(voltha.AdminState_DISABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
 	device = getDevice(true, voltha.AdminState_DISABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN)
-	handlers = transitionMap.GetTransitionHandler(device, previousState)
+	handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
@@ -250,7 +250,7 @@
 		for _, device := range deleteDeviceTest.devices {
 			device.Root = true
 			t.Run(testName, func(t *testing.T) {
-				handlers = transitionMap.GetTransitionHandler(device, previousState)
+				handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
 				assert.Equal(t, 4, len(handlers))
 				for idx, expHandler := range deleteDeviceTest.expectedParentHandlers {
 					assert.True(t, reflect.ValueOf(expHandler).Pointer() == reflect.ValueOf(handlers[idx]).Pointer())
@@ -264,7 +264,7 @@
 		for _, device := range deleteDeviceTest.devices {
 			device.Root = false
 			t.Run(testName, func(t *testing.T) {
-				handlers = transitionMap.GetTransitionHandler(device, deviceState)
+				handlers = transitionMap.GetTransitionHandler(ctx, device, deviceState)
 				assert.Equal(t, 3, len(handlers))
 				for idx, expHandler := range deleteDeviceTest.expectedChildHandlers {
 					assert.True(t, reflect.ValueOf(expHandler).Pointer() == reflect.ValueOf(handlers[idx]).Pointer())
@@ -333,5 +333,5 @@
 func TestMatch(t *testing.T) {
 	best := &match{admin: currPrevStateMatch, oper: currPrevStateMatch, conn: currPrevStateMatch}
 	m := &match{admin: currStateOnlyMatch, oper: currWildcardMatch, conn: currWildcardMatch}
-	fmt.Println(m.isBetterMatch(best), m.toInt(), best.toInt())
+	fmt.Println(m.isBetterMatch(context.Background(), best), m.toInt(context.Background()), best.toInt(context.Background()))
 }
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index fcdf340..d50274b 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -36,11 +36,12 @@
 
 	// create the kafka RPC proxy
 	kmp := kafka.NewInterContainerProxy(
-		kafka.InterContainerHost(host),
-		kafka.InterContainerPort(port),
-		kafka.MsgClient(kafkaClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}),
-		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: affinityRouterTopic}))
+		ctx,
+		kafka.InterContainerHost(ctx, host),
+		kafka.InterContainerPort(ctx, port),
+		kafka.MsgClient(ctx, kafkaClient),
+		kafka.DefaultTopic(ctx, &kafka.Topic{Name: coreTopic}),
+		kafka.DeviceDiscoveryTopic(ctx, &kafka.Topic{Name: affinityRouterTopic}))
 
 	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
 
@@ -51,7 +52,7 @@
 	for {
 		// If we haven't started yet, then try to start
 		logger.Infow("starting-kafka-proxy", log.Fields{})
-		if err := kmp.Start(); err != nil {
+		if err := kmp.Start(ctx); err != nil {
 			// We failed to start. Delay and then try again later.
 			// Don't worry about liveness, as we can't be live until we've started.
 			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
@@ -103,7 +104,7 @@
 func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
 	logger.Info("started-kafka-message-proxy")
 
-	livenessChannel := kmp.EnableLivenessChannel(true)
+	livenessChannel := kmp.EnableLivenessChannel(ctx, true)
 
 	logger.Info("enabled-kafka-liveness-channel")
 
@@ -135,7 +136,7 @@
 			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
 			// the liveness probe may wait (and block) writing to our channel.
 			go func() {
-				err := kmp.SendLiveness()
+				err := kmp.SendLiveness(ctx)
 				if err != nil {
 					// Catch possible error case if sending liveness after Sarama has been stopped.
 					logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
@@ -147,16 +148,16 @@
 	}
 }
 
-func registerAdapterRequestHandlers(kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic, corePairTopic string) {
-	requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
+func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic, corePairTopic string) {
+	requestProxy := api.NewAdapterRequestHandlerProxy(ctx, dMgr, aMgr)
 
 	// Register the broadcast topic to handle any core-bound broadcast requests
-	if err := kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
+	if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
 		logger.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
 	}
 
 	// Register the core-pair topic to handle core-bound requests destined to the core pair
-	if err := kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: corePairTopic}, kafka.OffsetNewest); err != nil {
+	if err := kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: corePairTopic}, kafka.OffsetNewest); err != nil {
 		logger.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": corePairTopic})
 	}
 
diff --git a/rw_core/core/kv.go b/rw_core/core/kv.go
index 53db264..1bba04c 100644
--- a/rw_core/core/kv.go
+++ b/rw_core/core/kv.go
@@ -29,13 +29,13 @@
 	"google.golang.org/grpc/status"
 )
 
-func newKVClient(storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
+func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
 	logger.Infow("kv-store-type", log.Fields{"store": storeType})
 	switch storeType {
 	case "consul":
-		return kvstore.NewConsulClient(address, timeout)
+		return kvstore.NewConsulClient(ctx, address, timeout)
 	case "etcd":
-		return kvstore.NewEtcdClient(address, timeout, log.FatalLevel)
+		return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
 	}
 	return nil, errors.New("unsupported-kv-store")
 }
@@ -46,7 +46,7 @@
 		logger.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
 	}
 	// Close the DB connection
-	kvClient.Close()
+	kvClient.Close(ctx)
 }
 
 // waitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
@@ -98,7 +98,7 @@
 	logger.Info("start-monitoring-kvstore-liveness")
 
 	// Instruct backend to create Liveness channel for transporting state updates
-	livenessChannel := backend.EnableLivenessChannel()
+	livenessChannel := backend.EnableLivenessChannel(ctx)
 
 	logger.Debug("enabled-kvstore-liveness-channel")