VOL-2999 - Reworked how Proxies are created & used.
Added DB Paths to separate location specification logic from entry access logic.
Also merged Update() and AddWithID() and renamed to Set().
Change-Id: I9ed5eafd63c180dddc5845a166554f89bda12325
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index dbb2da5..f943106 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -47,7 +47,7 @@
rootDeviceID string
deviceMgr *Manager
ldeviceMgr *LogicalManager
- clusterDataProxy *model.Proxy
+ ldProxy *model.Proxy
stopped bool
deviceRoutes *route.DeviceRoutes
logicalPortsNo map[uint32]bool //value is true for NNI port
@@ -64,23 +64,23 @@
groupLoader *group.Loader
}
-func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
- deviceMgr *Manager, cdProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
+func newLogicalAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
+ deviceMgr *Manager, dbProxy *model.Path, ldProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
agent := &LogicalAgent{
- logicalDeviceID: id,
- serialNumber: sn,
- rootDeviceID: deviceID,
- deviceMgr: deviceMgr,
- clusterDataProxy: cdProxy,
- ldeviceMgr: ldeviceMgr,
- flowDecomposer: fd.NewFlowDecomposer(deviceMgr),
- logicalPortsNo: make(map[uint32]bool),
- defaultTimeout: defaultTimeout,
- requestQueue: coreutils.NewRequestQueue(),
+ logicalDeviceID: id,
+ serialNumber: sn,
+ rootDeviceID: deviceID,
+ deviceMgr: deviceMgr,
+ ldProxy: ldProxy,
+ ldeviceMgr: ldeviceMgr,
+ flowDecomposer: fd.NewFlowDecomposer(deviceMgr),
+ logicalPortsNo: make(map[uint32]bool),
+ defaultTimeout: defaultTimeout,
+ requestQueue: coreutils.NewRequestQueue(),
- flowLoader: flow.NewLoader(cdProxy, id),
- meterLoader: meter.NewLoader(cdProxy, id),
- groupLoader: group.NewLoader(cdProxy, id),
+ flowLoader: flow.NewLoader(dbProxy.SubPath("logical_flows").Proxy(id)),
+ groupLoader: group.NewLoader(dbProxy.SubPath("logical_groups").Proxy(id)),
+ meterLoader: meter.NewLoader(dbProxy.SubPath("logical_meters").Proxy(id)),
}
agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
return agent
@@ -128,7 +128,7 @@
ld.Ports = []*voltha.LogicalPort{}
// Save the logical device
- if err := agent.clusterDataProxy.AddWithID(ctx, "logical_devices", ld.Id, ld); err != nil {
+ if err := agent.ldProxy.Set(ctx, ld.Id, ld); err != nil {
logger.Errorw("failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
}
@@ -147,7 +147,7 @@
// load from dB - the logical may not exist at this time. On error, just return and the calling function
// will destroy this agent.
ld := &voltha.LogicalDevice{}
- have, err := agent.clusterDataProxy.Get(ctx, "logical_devices/"+agent.logicalDeviceID, ld)
+ have, err := agent.ldProxy.Get(ctx, agent.logicalDeviceID, ld)
if err != nil {
return err
} else if !have {
@@ -195,7 +195,7 @@
defer agent.requestQueue.RequestComplete()
//Remove the logical device from the model
- if err := agent.clusterDataProxy.Remove(ctx, "logical_devices/"+agent.logicalDeviceID); err != nil {
+ if err := agent.ldProxy.Remove(ctx, agent.logicalDeviceID); err != nil {
returnErr = err
} else {
logger.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
@@ -230,7 +230,7 @@
}
updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if err := agent.clusterDataProxy.Update(updateCtx, "logical_devices/"+agent.logicalDeviceID, logicalDevice); err != nil {
+ if err := agent.ldProxy.Set(updateCtx, agent.logicalDeviceID, logicalDevice); err != nil {
logger.Errorw("failed-to-update-logical-devices-to-cluster-proxy", log.Fields{"error": err})
return err
}