VOL-2999 - Reworked how Proxies are created & used.

Added DB Paths to separate location-specification logic from
entry-access logic. Also merged Update() and AddWithID() into a
single Set(); a sketch of the new usage follows below.
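
For illustration, a minimal sketch of the new pattern (backend, ctx,
device and logicalDeviceID are assumed to be in scope; the calls
mirror those introduced in this change):

    path := model.NewDBPath(backend)    // Path: location specification
    devProxy := path.Proxy("devices")   // Proxy: entry access for one type
    flowProxy := path.SubPath("logical_flows").Proxy(logicalDeviceID)

    // Set() replaces both AddWithID() (create) and Update() (modify):
    if err := devProxy.Set(ctx, device.Id, device); err != nil {
        return err
    }
    // flowProxy is already scoped to one logical device, so callers pass
    // only the entry ID, e.g. flowProxy.Set(ctx, fmt.Sprint(flow.Id), flow).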

Change-Id: I9ed5eafd63c180dddc5845a166554f89bda12325
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index c8f03e7..918432f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -44,25 +44,25 @@
 
 // Agent represents device agent attributes
 type Agent struct {
-	deviceID         string
-	parentID         string
-	deviceType       string
-	isRootdevice     bool
-	adapterProxy     *remote.AdapterProxy
-	adapterMgr       *adapter.Manager
-	deviceMgr        *Manager
-	clusterDataProxy *model.Proxy
-	exitChannel      chan int
-	device           *voltha.Device
-	requestQueue     *coreutils.RequestQueue
-	defaultTimeout   time.Duration
-	startOnce        sync.Once
-	stopOnce         sync.Once
-	stopped          bool
+	deviceID       string
+	parentID       string
+	deviceType     string
+	isRootdevice   bool
+	adapterProxy   *remote.AdapterProxy
+	adapterMgr     *adapter.Manager
+	deviceMgr      *Manager
+	dbProxy        *model.Proxy
+	exitChannel    chan int
+	device         *voltha.Device
+	requestQueue   *coreutils.RequestQueue
+	defaultTimeout time.Duration
+	startOnce      sync.Once
+	stopOnce       sync.Once
+	stopped        bool
 }
 
 //newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *Agent {
+func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
 	var agent Agent
 	agent.adapterProxy = ap
 	if device.Id == "" {
@@ -77,7 +77,7 @@
 	agent.deviceMgr = deviceMgr
 	agent.adapterMgr = deviceMgr.adapterMgr
 	agent.exitChannel = make(chan int, 1)
-	agent.clusterDataProxy = cdProxy
+	agent.dbProxy = deviceProxy
 	agent.defaultTimeout = timeout
 	agent.device = proto.Clone(device).(*voltha.Device)
 	agent.requestQueue = coreutils.NewRequestQueue()
@@ -105,7 +105,7 @@
 	if deviceToCreate == nil {
 		// Load the existing device
 		device := &voltha.Device{}
-		have, err := agent.clusterDataProxy.Get(ctx, "devices/"+agent.deviceID, device)
+		have, err := agent.dbProxy.Get(ctx, agent.deviceID, device)
 		if err != nil {
 			return nil, err
 		} else if !have {
@@ -118,8 +118,8 @@
 		logger.Infow("device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
 	} else {
 		// Create a new device
-		// Assumption is that AdminState, FlowGroups, and Flows are unitialized since this
-		// is a new device, so populate them here before passing the device to clusterDataProxy.AddWithId.
+		// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
+		// is a new device, so populate them here before passing the device to dbProxy.Set.
 		// agent.deviceId will also have been set during newAgent().
 		device = (proto.Clone(deviceToCreate)).(*voltha.Device)
 		device.Id = agent.deviceID
@@ -133,7 +133,7 @@
 		}
 
 		// Add the initial device to the local model
-		if err := agent.clusterDataProxy.AddWithID(ctx, "devices", agent.deviceID, device); err != nil {
+		if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
 			return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
 		}
 		agent.device = device
@@ -159,7 +159,7 @@
 	logger.Infow("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
 
 	//	Remove the device from the KV store
-	if err := agent.clusterDataProxy.Remove(ctx, "devices/"+agent.deviceID); err != nil {
+	if err := agent.dbProxy.Remove(ctx, agent.deviceID); err != nil {
 		return err
 	}
 
@@ -182,7 +182,7 @@
 	logger.Debug("reconciling-device-agent-devicetype")
 	// TODO: context timeout
 	device := &voltha.Device{}
-	if have, err := agent.clusterDataProxy.Get(ctx, "devices/"+agent.deviceID, device); err != nil {
+	if have, err := agent.dbProxy.Get(ctx, agent.deviceID, device); err != nil {
 		logger.Errorw("kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err})
 		return
 	} else if !have {
@@ -1553,7 +1553,7 @@
 		return errors.New("device agent stopped")
 	}
 
-	if err := agent.clusterDataProxy.Update(ctx, "devices/"+agent.deviceID, device); err != nil {
+	if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
 		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
 	}
 	logger.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 2abfdeb..ffc2a3b 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -137,7 +137,7 @@
 		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
 
 	endpointMgr := kafka.NewEndpointManager(backend)
-	proxy := model.NewProxy(backend, "/")
+	proxy := model.NewDBPath(backend)
 	adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
 
 	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
@@ -194,7 +194,7 @@
 func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
 	deviceMgr := dat.deviceMgr
 	clonedDevice := proto.Clone(dat.device).(*voltha.Device)
-	deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.clusterDataProxy, deviceMgr.defaultTimeout)
+	deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dProxy, deviceMgr.defaultTimeout)
 	d, err := deviceAgent.start(context.TODO(), clonedDevice)
 	assert.Nil(t, err)
 	assert.NotNil(t, d)
diff --git a/rw_core/core/device/flow/loader.go b/rw_core/core/device/flow/loader.go
index 741a45f..b407a3b 100644
--- a/rw_core/core/device/flow/loader.go
+++ b/rw_core/core/device/flow/loader.go
@@ -19,7 +19,6 @@
 import (
 	"context"
 	"fmt"
-	"strconv"
 	"sync"
 
 	"github.com/opencord/voltha-go/db/model"
@@ -35,8 +34,7 @@
 	lock  sync.RWMutex
 	flows map[uint64]*chunk
 
-	dbProxy         *model.Proxy
-	logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+	dbProxy *model.Proxy
 }
 
 // chunk keeps a flow and the lock for this flow
@@ -48,11 +46,10 @@
 	flow *ofp.OfpFlowStats
 }
 
-func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+func NewLoader(dbProxy *model.Proxy) *Loader {
 	return &Loader{
-		flows:           make(map[uint64]*chunk),
-		dbProxy:         dataProxy,
-		logicalDeviceID: logicalDeviceID,
+		flows:   make(map[uint64]*chunk),
+		dbProxy: dbProxy,
 	}
 }
 
@@ -63,7 +60,7 @@
 	defer loader.lock.Unlock()
 
 	var flows []*ofp.OfpFlowStats
-	if err := loader.dbProxy.List(ctx, "logical_flows/"+loader.logicalDeviceID, &flows); err != nil {
+	if err := loader.dbProxy.List(ctx, &flows); err != nil {
 		logger.Errorw("failed-to-list-flows-from-cluster-data-proxy", log.Fields{"error": err})
 		return
 	}
@@ -88,9 +85,8 @@
 		entry.lock.Lock()
 		loader.lock.Unlock()
 
-		flowID := strconv.FormatUint(uint64(flow.Id), 10)
-		if err := loader.dbProxy.AddWithID(ctx, "logical_flows/"+loader.logicalDeviceID, flowID, flow); err != nil {
-			logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "flowID": flowID, "err": err})
+		if err := loader.dbProxy.Set(ctx, fmt.Sprint(flow.Id), flow); err != nil {
+			logger.Errorw("failed-adding-flow-to-db", log.Fields{"flowID": flow.Id, "err": err})
 
 			// revert the map
 			loader.lock.Lock()
@@ -147,9 +143,8 @@
 // Update updates an existing flow in the kv.
 // The provided "flow" must not be modified afterwards.
 func (h *Handle) Update(ctx context.Context, flow *ofp.OfpFlowStats) error {
-	path := fmt.Sprintf("logical_flows/%s/%d", h.loader.logicalDeviceID, flow.Id)
-	if err := h.loader.dbProxy.Update(ctx, path, flow); err != nil {
-		return status.Errorf(codes.Internal, "failed-update-flow:%s:%d %s", h.loader.logicalDeviceID, flow.Id, err)
+	if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(flow.Id), flow); err != nil {
+		return status.Errorf(codes.Internal, "failed-update-flow-%v: %s", flow.Id, err)
 	}
 	h.chunk.flow = flow
 	return nil
@@ -157,9 +152,8 @@
 
 // Delete removes the device from the kv
 func (h *Handle) Delete(ctx context.Context) error {
-	path := fmt.Sprintf("logical_flows/%s/%d", h.loader.logicalDeviceID, h.chunk.flow.Id)
-	if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
-		return fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
+	if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.flow.Id)); err != nil {
+		return fmt.Errorf("couldnt-delete-flow-from-store-%v", h.chunk.flow.Id)
 	}
 	h.chunk.deleted = true
 
diff --git a/rw_core/core/device/group/loader.go b/rw_core/core/device/group/loader.go
index 0e3f078..5b2890a 100644
--- a/rw_core/core/device/group/loader.go
+++ b/rw_core/core/device/group/loader.go
@@ -19,7 +19,6 @@
 import (
 	"context"
 	"fmt"
-	"strconv"
 	"sync"
 
 	"github.com/opencord/voltha-go/db/model"
@@ -35,8 +34,7 @@
 	lock   sync.RWMutex
 	groups map[uint32]*chunk
 
-	dbProxy         *model.Proxy
-	logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+	dbProxy *model.Proxy
 }
 
 // chunk keeps a group and the lock for this group
@@ -48,11 +46,10 @@
 	group *ofp.OfpGroupEntry
 }
 
-func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+func NewLoader(dbProxy *model.Proxy) *Loader {
 	return &Loader{
-		groups:          make(map[uint32]*chunk),
-		dbProxy:         dataProxy,
-		logicalDeviceID: logicalDeviceID,
+		groups:  make(map[uint32]*chunk),
+		dbProxy: dbProxy,
 	}
 }
 
@@ -63,7 +60,7 @@
 	defer loader.lock.Unlock()
 
 	var groups []*ofp.OfpGroupEntry
-	if err := loader.dbProxy.List(ctx, "logical_groups/"+loader.logicalDeviceID, &groups); err != nil {
+	if err := loader.dbProxy.List(ctx, &groups); err != nil {
 		logger.Errorw("failed-to-list-groups-from-cluster-data-proxy", log.Fields{"error": err})
 		return
 	}
@@ -88,9 +85,8 @@
 		entry.lock.Lock()
 		loader.lock.Unlock()
 
-		groupID := strconv.FormatUint(uint64(group.Desc.GroupId), 10)
-		if err := loader.dbProxy.AddWithID(ctx, "logical_groups/"+loader.logicalDeviceID, groupID, group); err != nil {
-			logger.Errorw("failed-adding-group-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "groupID": groupID, "err": err})
+		if err := loader.dbProxy.Set(ctx, fmt.Sprint(group.Desc.GroupId), group); err != nil {
+			logger.Errorw("failed-adding-group-to-db", log.Fields{"groupID": group.Desc.GroupId, "err": err})
 
 			// revert the map
 			loader.lock.Lock()
@@ -147,9 +143,8 @@
 // Update updates an existing group in the kv.
 // The provided "group" must not be modified afterwards.
 func (h *Handle) Update(ctx context.Context, group *ofp.OfpGroupEntry) error {
-	path := fmt.Sprintf("logical_groups/%s/%d", h.loader.logicalDeviceID, group.Desc.GroupId)
-	if err := h.loader.dbProxy.Update(ctx, path, group); err != nil {
-		return status.Errorf(codes.Internal, "failed-update-group:%s:%d %s", h.loader.logicalDeviceID, group.Desc.GroupId, err)
+	if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(group.Desc.GroupId), group); err != nil {
+		return status.Errorf(codes.Internal, "failed-update-group-%v: %s", group.Desc.GroupId, err)
 	}
 	h.chunk.group = group
 	return nil
@@ -157,9 +152,8 @@
 
 // Delete removes the device from the kv
 func (h *Handle) Delete(ctx context.Context) error {
-	path := fmt.Sprintf("logical_groups/%s/%d", h.loader.logicalDeviceID, h.chunk.group.Desc.GroupId)
-	if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
-		return fmt.Errorf("couldnt-delete-group-from-store-%s", path)
+	if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.group.Desc.GroupId)); err != nil {
+		return fmt.Errorf("couldnt-delete-group-from-store-%v", h.chunk.group.Desc.GroupId)
 	}
 	h.chunk.deleted = true
 
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index dbb2da5..f943106 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -47,7 +47,7 @@
 	rootDeviceID       string
 	deviceMgr          *Manager
 	ldeviceMgr         *LogicalManager
-	clusterDataProxy   *model.Proxy
+	ldProxy            *model.Proxy
 	stopped            bool
 	deviceRoutes       *route.DeviceRoutes
 	logicalPortsNo     map[uint32]bool //value is true for NNI port
@@ -64,23 +64,23 @@
 	groupLoader *group.Loader
 }
 
-func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
-	deviceMgr *Manager, cdProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
+func newLogicalAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
+	deviceMgr *Manager, dbProxy *model.Path, ldProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
 	agent := &LogicalAgent{
-		logicalDeviceID:  id,
-		serialNumber:     sn,
-		rootDeviceID:     deviceID,
-		deviceMgr:        deviceMgr,
-		clusterDataProxy: cdProxy,
-		ldeviceMgr:       ldeviceMgr,
-		flowDecomposer:   fd.NewFlowDecomposer(deviceMgr),
-		logicalPortsNo:   make(map[uint32]bool),
-		defaultTimeout:   defaultTimeout,
-		requestQueue:     coreutils.NewRequestQueue(),
+		logicalDeviceID: id,
+		serialNumber:    sn,
+		rootDeviceID:    deviceID,
+		deviceMgr:       deviceMgr,
+		ldProxy:         ldProxy,
+		ldeviceMgr:      ldeviceMgr,
+		flowDecomposer:  fd.NewFlowDecomposer(deviceMgr),
+		logicalPortsNo:  make(map[uint32]bool),
+		defaultTimeout:  defaultTimeout,
+		requestQueue:    coreutils.NewRequestQueue(),
 
-		flowLoader:  flow.NewLoader(cdProxy, id),
-		meterLoader: meter.NewLoader(cdProxy, id),
-		groupLoader: group.NewLoader(cdProxy, id),
+		flowLoader:  flow.NewLoader(dbProxy.SubPath("logical_flows").Proxy(id)),
+		groupLoader: group.NewLoader(dbProxy.SubPath("logical_groups").Proxy(id)),
+		meterLoader: meter.NewLoader(dbProxy.SubPath("logical_meters").Proxy(id)),
 	}
 	agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
 	return agent
@@ -128,7 +128,7 @@
 		ld.Ports = []*voltha.LogicalPort{}
 
 		// Save the logical device
-		if err := agent.clusterDataProxy.AddWithID(ctx, "logical_devices", ld.Id, ld); err != nil {
+		if err := agent.ldProxy.Set(ctx, ld.Id, ld); err != nil {
 			logger.Errorw("failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
 			return err
 		}
@@ -147,7 +147,7 @@
 		//	load from dB - the logical may not exist at this time.  On error, just return and the calling function
 		// will destroy this agent.
 		ld := &voltha.LogicalDevice{}
-		have, err := agent.clusterDataProxy.Get(ctx, "logical_devices/"+agent.logicalDeviceID, ld)
+		have, err := agent.ldProxy.Get(ctx, agent.logicalDeviceID, ld)
 		if err != nil {
 			return err
 		} else if !have {
@@ -195,7 +195,7 @@
 		defer agent.requestQueue.RequestComplete()
 
 		//Remove the logical device from the model
-		if err := agent.clusterDataProxy.Remove(ctx, "logical_devices/"+agent.logicalDeviceID); err != nil {
+		if err := agent.ldProxy.Remove(ctx, agent.logicalDeviceID); err != nil {
 			returnErr = err
 		} else {
 			logger.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
@@ -230,7 +230,7 @@
 	}
 
 	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
-	if err := agent.clusterDataProxy.Update(updateCtx, "logical_devices/"+agent.logicalDeviceID, logicalDevice); err != nil {
+	if err := agent.ldProxy.Set(updateCtx, agent.logicalDeviceID, logicalDevice); err != nil {
 		logger.Errorw("failed-to-update-logical-devices-to-cluster-proxy", log.Fields{"error": err})
 		return err
 	}
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index f5d404e..eb65673 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -22,14 +22,13 @@
 	"testing"
 	"time"
 
+	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-go/rw_core/config"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
-
-	"github.com/gogo/protobuf/proto"
-	"github.com/opencord/voltha-go/rw_core/config"
-	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
 	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
@@ -481,7 +480,7 @@
 		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
 
 	endpointMgr := kafka.NewEndpointManager(backend)
-	proxy := model.NewProxy(backend, "/")
+	proxy := model.NewDBPath(backend)
 	adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)
 
 	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
@@ -509,9 +508,9 @@
 	clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
 	clonedLD.Id = com.GetRandomString(10)
 	clonedLD.DatapathId = rand.Uint64()
-	lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
+	lDeviceAgent := newLogicalAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbProxy, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
 	lDeviceAgent.logicalDevice = clonedLD
-	err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "logical_devices", clonedLD.Id, clonedLD)
+	err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
 	assert.Nil(t, err)
 	lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
 	return lDeviceAgent
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index cd7bce1..f9bff21 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -42,7 +42,8 @@
 	logicalDeviceAgents            sync.Map
 	deviceMgr                      *Manager
 	kafkaICProxy                   kafka.InterContainerProxy
-	clusterDataProxy               *model.Proxy
+	dbProxy                        *model.Path
+	ldProxy                        *model.Proxy
 	defaultTimeout                 time.Duration
 	logicalDevicesLoadingLock      sync.RWMutex
 	logicalDeviceLoadingInProgress map[string][]chan int
@@ -98,7 +99,7 @@
 	logger.Debug("ListAllLogicalDevices")
 
 	var logicalDevices []*voltha.LogicalDevice
-	if err := ldMgr.clusterDataProxy.List(ctx, "logical_devices", &logicalDevices); err != nil {
+	if err := ldMgr.ldProxy.List(ctx, &logicalDevices); err != nil {
 		logger.Errorw("failed-to-list-logical-devices-from-cluster-proxy", log.Fields{"error": err})
 		return nil, err
 	}
@@ -125,7 +126,7 @@
 
 	logger.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
 
-	agent := newLogicalDeviceAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+	agent := newLogicalAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.dbProxy, ldMgr.ldProxy, ldMgr.defaultTimeout)
 	ldMgr.addLogicalDeviceAgentToMap(agent)
 
 	// Update the root device with the logical device Id reference
@@ -135,7 +136,8 @@
 	}
 
 	go func() {
-		//agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+		//TODO: either wait for the agent to be started before returning, or
+		//      implement locks in the agent to ensure requests are not processed before start() is complete
 		err := agent.start(context.Background(), false)
 		if err != nil {
 			logger.Errorw("unable-to-create-the-logical-device", log.Fields{"error": err})
@@ -173,7 +175,7 @@
 //getLogicalDeviceFromModel retrieves the logical device data from the model.
 func (ldMgr *LogicalManager) getLogicalDeviceFromModel(ctx context.Context, lDeviceID string) (*voltha.LogicalDevice, error) {
 	logicalDevice := &voltha.LogicalDevice{}
-	if have, err := ldMgr.clusterDataProxy.Get(ctx, "logical_devices/"+lDeviceID, logicalDevice); err != nil {
+	if have, err := ldMgr.ldProxy.Get(ctx, lDeviceID, logicalDevice); err != nil {
 		logger.Errorw("failed-to-get-logical-devices-from-cluster-proxy", log.Fields{"error": err})
 		return nil, err
 	} else if !have {
@@ -196,7 +198,7 @@
 			ldMgr.logicalDevicesLoadingLock.Unlock()
 			if _, err := ldMgr.getLogicalDeviceFromModel(ctx, lDeviceID); err == nil {
 				logger.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
-				agent := newLogicalDeviceAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+				agent := newLogicalAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbProxy, ldMgr.ldProxy, ldMgr.defaultTimeout)
 				if err := agent.start(ctx, true); err != nil {
 					return err
 				}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index cf1301f..357c49a 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -19,7 +19,6 @@
 import (
 	"context"
 	"errors"
-	"github.com/opencord/voltha-go/rw_core/core/device/event"
 	"reflect"
 	"runtime"
 	"sync"
@@ -28,6 +27,7 @@
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	"github.com/opencord/voltha-go/rw_core/core/device/event"
 	"github.com/opencord/voltha-go/rw_core/core/device/remote"
 	"github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -50,20 +50,20 @@
 	logicalDeviceMgr        *LogicalManager
 	kafkaICProxy            kafka.InterContainerProxy
 	stateTransitions        *TransitionMap
-	clusterDataProxy        *model.Proxy
+	dProxy                  *model.Proxy
 	coreInstanceID          string
 	defaultTimeout          time.Duration
 	devicesLoadingLock      sync.RWMutex
 	deviceLoadingInProgress map[string][]chan int
 }
 
-func NewManagers(proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+func NewManagers(dbProxy *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
 	deviceMgr := &Manager{
 		rootDevices:             make(map[string]bool),
 		kafkaICProxy:            kmp,
 		adapterProxy:            remote.NewAdapterProxy(kmp, corePairTopic, endpointMgr),
 		coreInstanceID:          coreInstanceID,
-		clusterDataProxy:        proxy,
+		dProxy:                  dbProxy.Proxy("devices"),
 		adapterMgr:              adapterMgr,
 		defaultTimeout:          defaultCoreTimeout * time.Millisecond,
 		deviceLoadingInProgress: make(map[string][]chan int),
@@ -74,7 +74,8 @@
 		Manager:                        event.NewManager(),
 		deviceMgr:                      deviceMgr,
 		kafkaICProxy:                   kmp,
-		clusterDataProxy:               proxy,
+		dbProxy:                        dbProxy,
+		ldProxy:                        dbProxy.Proxy("logical_devices"),
 		defaultTimeout:                 defaultCoreTimeout,
 		logicalDeviceLoadingInProgress: make(map[string][]chan int),
 	}
@@ -156,7 +157,7 @@
 	// Ensure this device is set as root
 	device.Root = true
 	// Create and start a device agent for that device
-	agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+	agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
 	device, err = agent.start(ctx, device)
 	if err != nil {
 		logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
@@ -398,7 +399,7 @@
 	result := &voltha.Devices{}
 
 	var devices []*voltha.Device
-	if err := dMgr.clusterDataProxy.List(ctx, "devices", &devices); err != nil {
+	if err := dMgr.dProxy.List(ctx, &devices); err != nil {
 		logger.Errorw("failed-to-list-devices-from-cluster-proxy", log.Fields{"error": err})
 		return nil, err
 	}
@@ -407,7 +408,7 @@
 		// If device is not in memory then set it up
 		if !dMgr.IsDeviceInCache(device.Id) {
 			logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
-			agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+			agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
 			if _, err := agent.start(ctx, nil); err != nil {
 				logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
 			} else {
@@ -424,7 +425,7 @@
 func (dMgr *Manager) isParentDeviceExist(ctx context.Context, newDevice *voltha.Device) (bool, error) {
 	hostPort := newDevice.GetHostAndPort()
 	var devices []*voltha.Device
-	if err := dMgr.clusterDataProxy.List(ctx, "devices", &devices); err != nil {
+	if err := dMgr.dProxy.List(ctx, &devices); err != nil {
 		logger.Errorw("Failed to list devices from cluster data proxy", log.Fields{"error": err})
 		return false, err
 	}
@@ -445,7 +446,7 @@
 //getDeviceFromModel retrieves the device data from the model.
 func (dMgr *Manager) getDeviceFromModel(ctx context.Context, deviceID string) (*voltha.Device, error) {
 	device := &voltha.Device{}
-	if have, err := dMgr.clusterDataProxy.Get(ctx, "devices/"+deviceID, device); err != nil {
+	if have, err := dMgr.dProxy.Get(ctx, deviceID, device); err != nil {
 		logger.Errorw("failed-to-get-device-info-from-cluster-proxy", log.Fields{"error": err})
 		return nil, err
 	} else if !have {
@@ -470,7 +471,7 @@
 			// Proceed with the loading only if the device exist in the Model (could have been deleted)
 			if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
 				logger.Debugw("loading-device", log.Fields{"deviceId": deviceID})
-				agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+				agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
 				if _, err = agent.start(ctx, nil); err != nil {
 					logger.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
 				} else {
@@ -1028,7 +1029,7 @@
 	childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
 
 	// Create and start a device agent for that device
-	agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+	agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
 	childDevice, err := agent.start(ctx, childDevice)
 	if err != nil {
 		logger.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
diff --git a/rw_core/core/device/meter/loader.go b/rw_core/core/device/meter/loader.go
index ae6c957..daae9ae 100644
--- a/rw_core/core/device/meter/loader.go
+++ b/rw_core/core/device/meter/loader.go
@@ -19,7 +19,6 @@
 import (
 	"context"
 	"fmt"
-	"strconv"
 	"sync"
 
 	"github.com/opencord/voltha-go/db/model"
@@ -35,8 +34,7 @@
 	lock   sync.RWMutex
 	meters map[uint32]*chunk
 
-	dbProxy         *model.Proxy
-	logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+	dbProxy *model.Proxy
 }
 
 // chunk keeps a meter and the lock for this meter
@@ -48,11 +46,10 @@
 	meter *ofp.OfpMeterEntry
 }
 
-func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+func NewLoader(dbProxy *model.Proxy) *Loader {
 	return &Loader{
-		meters:          make(map[uint32]*chunk),
-		dbProxy:         dataProxy,
-		logicalDeviceID: logicalDeviceID,
+		meters:  make(map[uint32]*chunk),
+		dbProxy: dbProxy,
 	}
 }
 
@@ -63,7 +60,7 @@
 	defer loader.lock.Unlock()
 
 	var meters []*ofp.OfpMeterEntry
-	if err := loader.dbProxy.List(ctx, "logical_meters/"+loader.logicalDeviceID, &meters); err != nil {
+	if err := loader.dbProxy.List(ctx, &meters); err != nil {
 		logger.Errorw("failed-to-list-meters-from-cluster-data-proxy", log.Fields{"error": err})
 		return
 	}
@@ -88,9 +85,8 @@
 		entry.lock.Lock()
 		loader.lock.Unlock()
 
-		meterID := strconv.FormatUint(uint64(meter.Config.MeterId), 10)
-		if err := loader.dbProxy.AddWithID(ctx, "logical_meters/"+loader.logicalDeviceID, meterID, meter); err != nil {
-			logger.Errorw("failed-adding-meter-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "meterID": meterID, "err": err})
+		if err := loader.dbProxy.Set(ctx, fmt.Sprint(meter.Config.MeterId), meter); err != nil {
+			logger.Errorw("failed-adding-meter-to-db", log.Fields{"meterID": meter.Config.MeterId, "err": err})
 
 			// revert the map
 			loader.lock.Lock()
@@ -147,9 +143,8 @@
 // Update updates an existing meter in the kv.
 // The provided "meter" must not be modified afterwards.
 func (h *Handle) Update(ctx context.Context, meter *ofp.OfpMeterEntry) error {
-	path := fmt.Sprintf("logical_meters/%s/%d", h.loader.logicalDeviceID, meter.Config.MeterId)
-	if err := h.loader.dbProxy.Update(ctx, path, meter); err != nil {
-		return status.Errorf(codes.Internal, "failed-update-meter:%s:%d %s", h.loader.logicalDeviceID, meter.Config.MeterId, err)
+	if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(meter.Config.MeterId), meter); err != nil {
+		return status.Errorf(codes.Internal, "failed-update-meter-%v: %s", meter.Config.MeterId, err)
 	}
 	h.chunk.meter = meter
 	return nil
@@ -157,9 +152,8 @@
 
 // Delete removes the device from the kv
 func (h *Handle) Delete(ctx context.Context) error {
-	path := fmt.Sprintf("logical_meters/%s/%d", h.loader.logicalDeviceID, h.chunk.meter.Config.MeterId)
-	if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
-		return fmt.Errorf("couldnt-delete-meter-from-store-%s", path)
+	if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.meter.Config.MeterId)); err != nil {
+		return fmt.Errorf("couldnt-delete-meter-from-store-%v", h.chunk.meter.Config.MeterId)
 	}
 	h.chunk.deleted = true