VOL-2999 - Reworked how Proxies are created & used.
Added DB Paths to separate location specification logic from entry access logic.
Also merged Update() and AddWithID() into a single method, renamed to Set().
Change-Id: I9ed5eafd63c180dddc5845a166554f89bda12325
diff --git a/rw_core/core/device/flow/loader.go b/rw_core/core/device/flow/loader.go
index 741a45f..b407a3b 100644
--- a/rw_core/core/device/flow/loader.go
+++ b/rw_core/core/device/flow/loader.go
@@ -19,7 +19,6 @@
import (
"context"
"fmt"
- "strconv"
"sync"
"github.com/opencord/voltha-go/db/model"
@@ -35,8 +34,7 @@
lock sync.RWMutex
flows map[uint64]*chunk
- dbProxy *model.Proxy
- logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+ dbProxy *model.Proxy
}
// chunk keeps a flow and the lock for this flow
@@ -48,11 +46,10 @@
flow *ofp.OfpFlowStats
}
-func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+func NewLoader(dbProxy *model.Proxy) *Loader {
return &Loader{
- flows: make(map[uint64]*chunk),
- dbProxy: dataProxy,
- logicalDeviceID: logicalDeviceID,
+ flows: make(map[uint64]*chunk),
+ dbProxy: dbProxy,
}
}
@@ -63,7 +60,7 @@
defer loader.lock.Unlock()
var flows []*ofp.OfpFlowStats
- if err := loader.dbProxy.List(ctx, "logical_flows/"+loader.logicalDeviceID, &flows); err != nil {
+ if err := loader.dbProxy.List(ctx, &flows); err != nil {
logger.Errorw("failed-to-list-flows-from-cluster-data-proxy", log.Fields{"error": err})
return
}
@@ -88,9 +85,8 @@
entry.lock.Lock()
loader.lock.Unlock()
- flowID := strconv.FormatUint(uint64(flow.Id), 10)
- if err := loader.dbProxy.AddWithID(ctx, "logical_flows/"+loader.logicalDeviceID, flowID, flow); err != nil {
- logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "flowID": flowID, "err": err})
+ if err := loader.dbProxy.Set(ctx, fmt.Sprint(flow.Id), flow); err != nil {
+ logger.Errorw("failed-adding-flow-to-db", log.Fields{"flowID": flow.Id, "err": err})
// revert the map
loader.lock.Lock()
@@ -147,9 +143,8 @@
// Update updates an existing flow in the kv.
// The provided "flow" must not be modified afterwards.
func (h *Handle) Update(ctx context.Context, flow *ofp.OfpFlowStats) error {
- path := fmt.Sprintf("logical_flows/%s/%d", h.loader.logicalDeviceID, flow.Id)
- if err := h.loader.dbProxy.Update(ctx, path, flow); err != nil {
- return status.Errorf(codes.Internal, "failed-update-flow:%s:%d %s", h.loader.logicalDeviceID, flow.Id, err)
+ if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(flow.Id), flow); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-flow-%v: %s", flow.Id, err)
}
h.chunk.flow = flow
return nil
@@ -157,9 +152,8 @@
// Delete removes the device from the kv
func (h *Handle) Delete(ctx context.Context) error {
- path := fmt.Sprintf("logical_flows/%s/%d", h.loader.logicalDeviceID, h.chunk.flow.Id)
- if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
+ if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.flow.Id)); err != nil {
+ return fmt.Errorf("couldnt-delete-flow-from-store-%v", h.chunk.flow.Id)
}
h.chunk.deleted = true