[VOL-4178, VOL-3952] This commit removes flows/groups/meters persistency
This commit removes flows/groups/meters persistency from rw-core.
As part of this change, it also fixes a bug where devices were not
being loaded on an rw-core restart. This is a necessary condition
to allow the non-persistency of flows/groups/meters to work.
This commit also renames "loader" to "cache" for the flows/groups/
meters to differentiate between data that is loaded from the KV
store and the one in cache.
Change-Id: Ib14e1450021abe30b17673c2910768fb740dba51
diff --git a/VERSION b/VERSION
index 5d9ade1..c95a470 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.9.2
+2.9.3-dev
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 9d9f5da..43e9903 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -55,6 +55,8 @@
adapterMessageBus,
"kv-store",
"adapter-manager",
+ "device-manager",
+ "logical-device-manager",
"grpc-service",
"adapter-request-handler",
)
@@ -169,6 +171,13 @@
endpointMgr := kafka.NewEndpointManager(backend)
deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf, id, eventProxy)
+ // Start the device manager to load the devices. Wait until it is completed to prevent multiple loading happening
+ // triggered by logicalDeviceMgr.Start(Ctx)
+ deviceMgr.Start(ctx)
+
+ // Start the logical device manager to load the logical devices.
+ logicalDeviceMgr.Start(ctx)
+
// register kafka RPC handler
registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf, "adapter-request-handler")
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 6e32bfe..22ed4c4 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -71,8 +71,8 @@
stopReconcilingMutex sync.RWMutex
config *config.RWCoreFlags
- flowLoader *flow.Loader
- groupLoader *group.Loader
+ flowCache *flow.Cache
+ groupCache *group.Cache
portLoader *port.Loader
transientStateLoader *transientstate.Loader
}
@@ -98,8 +98,8 @@
device: proto.Clone(device).(*voltha.Device),
requestQueue: coreutils.NewRequestQueue(),
config: deviceMgr.config,
- flowLoader: flow.NewLoader(dbPath.SubPath("flows").Proxy(deviceID)),
- groupLoader: group.NewLoader(dbPath.SubPath("groups").Proxy(deviceID)),
+ flowCache: flow.NewCache(),
+ groupCache: group.NewCache(),
portLoader: port.NewLoader(dbPath.SubPath("ports").Proxy(deviceID)),
transientStateLoader: transientstate.NewLoader(dbPath.SubPath("core").Proxy("transientstate"), deviceID),
}
@@ -108,7 +108,7 @@
// start() saves the device to the data model and registers for callbacks on that device if deviceToCreate!=nil.
// Otherwise, it will load the data from the dB and setup the necessary callbacks and proxies. Returns the device that
// was started.
-func (agent *Agent) start(ctx context.Context, deviceToCreate *voltha.Device) (*voltha.Device, error) {
+func (agent *Agent) start(ctx context.Context, deviceExist bool, deviceToCreate *voltha.Device) (*voltha.Device, error) {
needToStart := false
if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
return agent.getDeviceReadOnly(ctx)
@@ -121,23 +121,21 @@
}
}
}()
-
- var device *voltha.Device
- if deviceToCreate == nil {
- // Load the existing device
- device := &voltha.Device{}
- have, err := agent.dbProxy.Get(ctx, agent.deviceID, device)
- if err != nil {
- return nil, err
- } else if !have {
- return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
+ if deviceExist {
+ device := deviceToCreate
+ if device == nil {
+ // Load from dB
+ device = &voltha.Device{}
+ have, err := agent.dbProxy.Get(ctx, agent.deviceID, device)
+ if err != nil {
+ return nil, err
+ } else if !have {
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
+ }
}
-
agent.deviceType = device.Adapter
agent.device = proto.Clone(device).(*voltha.Device)
- // load the flows and groups from KV to cache
- agent.flowLoader.Load(ctx)
- agent.groupLoader.Load(ctx)
+ // load the ports from KV to cache
agent.portLoader.Load(ctx)
agent.transientStateLoader.Load(ctx)
@@ -154,7 +152,7 @@
// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
// is a new device, so populate them here before passing the device to ldProxy.Set.
// agent.deviceId will also have been set during newAgent().
- device = (proto.Clone(deviceToCreate)).(*voltha.Device)
+ device := (proto.Clone(deviceToCreate)).(*voltha.Device)
device.Id = agent.deviceID
device.AdminState = voltha.AdminState_PREPROVISIONED
currState = device.AdminState
@@ -229,8 +227,6 @@
agent.deviceType = device.Adapter
agent.device = device
- agent.flowLoader.Load(ctx)
- agent.groupLoader.Load(ctx)
agent.portLoader.Load(ctx)
agent.transientStateLoader.Load(ctx)
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 3b04c02..2ac9bd7 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -33,10 +33,10 @@
// listDeviceFlows returns device flows
func (agent *Agent) listDeviceFlows() map[uint64]*ofp.OfpFlowStats {
- flowIDs := agent.flowLoader.ListIDs()
+ flowIDs := agent.flowCache.ListIDs()
flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
for flowID := range flowIDs {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ if flowHandle, have := agent.flowCache.Lock(flowID); have {
flows[flowID] = flowHandle.GetReadOnly()
flowHandle.Unlock()
}
@@ -69,7 +69,7 @@
flowsToAdd := make([]*ofp.OfpFlowStats, 0)
flowsToDelete := make([]*ofp.OfpFlowStats, 0)
for _, flow := range newFlows {
- flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
+ flowHandle, created, err := agent.flowCache.LockOrCreate(ctx, flow)
if err != nil {
desc = err.Error()
agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
@@ -167,7 +167,7 @@
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
for _, flow := range flowsToDel {
- if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+ if flowHandle, have := agent.flowCache.Lock(flow.Id); have {
// Update the store and cache
if err := flowHandle.Delete(ctx); err != nil {
flowHandle.Unlock()
@@ -245,7 +245,7 @@
flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
flowsToDelete := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
for _, flow := range updatedFlows {
- if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+ if flowHandle, have := agent.flowCache.Lock(flow.Id); have {
flowToDelete := flowHandle.GetReadOnly()
// Update the store and cache
if err := flowHandle.Update(ctx, flow); err != nil {
@@ -318,8 +318,8 @@
func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
var flowsToDelete []*ofp.OfpFlowStats
// If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
- for flowID := range agent.flowLoader.ListIDs() {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ for flowID := range agent.flowCache.ListIDs() {
+ if flowHandle, have := agent.flowCache.Lock(flowID); have {
flow := flowHandle.GetReadOnly()
if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
flowsToDelete = append(flowsToDelete, flow)
@@ -353,8 +353,8 @@
defer agent.logDeviceUpdate(ctx, "deleteAllFlows", nil, nil, operStatus, &desc)
- for flowID := range agent.flowLoader.ListIDs() {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ for flowID := range agent.flowCache.ListIDs() {
+ if flowHandle, have := agent.flowCache.Lock(flowID); have {
// Update the store and cache
if err := flowHandle.Delete(ctx); err != nil {
flowHandle.Unlock()
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index 5db0ea5..67fa74c 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -33,10 +33,10 @@
// listDeviceGroups returns logical device flow groups
func (agent *Agent) listDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
- groupIDs := agent.groupLoader.ListIDs()
+ groupIDs := agent.groupCache.ListIDs()
groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
for groupID := range groupIDs {
- if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ if groupHandle, have := agent.groupCache.Lock(groupID); have {
groups[groupID] = groupHandle.GetReadOnly()
groupHandle.Unlock()
}
@@ -71,7 +71,7 @@
groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
for _, group := range newGroups {
- groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
+ groupHandle, created, err := agent.groupCache.LockOrCreate(ctx, group)
if err != nil {
desc = err.Error()
agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
@@ -170,7 +170,7 @@
}
for _, group := range groupsToDel {
- if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+ if groupHandle, have := agent.groupCache.Lock(group.Desc.GroupId); have {
// Update the store and cache
if err := groupHandle.Delete(ctx); err != nil {
groupHandle.Unlock()
@@ -248,7 +248,7 @@
groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
for _, group := range updatedGroups {
- if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+ if groupHandle, have := agent.groupCache.Lock(group.Desc.GroupId); have {
// Update the store and cache
if err := groupHandle.Update(ctx, group); err != nil {
groupHandle.Unlock()
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index daf2508..88d901d 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -174,7 +174,7 @@
deviceMgr := dat.deviceMgr
clonedDevice := proto.Clone(dat.device).(*voltha.Device)
deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.defaultTimeout)
- d, err := deviceAgent.start(context.TODO(), clonedDevice)
+ d, err := deviceAgent.start(context.TODO(), false, clonedDevice)
assert.Nil(t, err)
assert.NotNil(t, d)
for _, port := range dat.devicePorts {
diff --git a/rw_core/core/device/flow/cache.go b/rw_core/core/device/flow/cache.go
new file mode 100644
index 0000000..b4e358e
--- /dev/null
+++ b/rw_core/core/device/flow/cache.go
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flow
+
+import (
+ "context"
+ "sync"
+
+ ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
+)
+
+// Cache hides all low-level locking & synchronization related to flow state updates
+type Cache struct {
+ // this lock protects the flows map, it does not protect individual flows
+ lock sync.RWMutex
+ flows map[uint64]*chunk
+}
+
+// chunk keeps a flow and the lock for this flow
+type chunk struct {
+ // this lock is used to synchronize all access to the flow, and also to the "deleted" variable
+ lock sync.Mutex
+ deleted bool
+
+ flow *ofp.OfpFlowStats
+}
+
+func NewCache() *Cache {
+ return &Cache{
+ flows: make(map[uint64]*chunk),
+ }
+}
+
+// LockOrCreate locks this flow if it exists, or creates a new flow if it does not.
+// In the case of flow creation, the provided "flow" must not be modified afterwards.
+func (cache *Cache) LockOrCreate(ctx context.Context, flow *ofp.OfpFlowStats) (*Handle, bool, error) {
+ // try to use read lock instead of full lock if possible
+ if handle, have := cache.Lock(flow.Id); have {
+ return handle, false, nil
+ }
+
+ cache.lock.Lock()
+ entry, have := cache.flows[flow.Id]
+ if !have {
+ entry := &chunk{flow: flow}
+ cache.flows[flow.Id] = entry
+ entry.lock.Lock()
+ cache.lock.Unlock()
+
+ return &Handle{loader: cache, chunk: entry}, true, nil
+ }
+ cache.lock.Unlock()
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return cache.LockOrCreate(ctx, flow)
+ }
+ return &Handle{loader: cache, chunk: entry}, false, nil
+}
+
+// Lock acquires the lock for this flow, and returns a handle which can be used to access the flow until it's unlocked.
+// This handle ensures that the flow cannot be accessed if the lock is not held.
+// Returns false if the flow is not present.
+// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
+func (cache *Cache) Lock(id uint64) (*Handle, bool) {
+ cache.lock.RLock()
+ entry, have := cache.flows[id]
+ cache.lock.RUnlock()
+
+ if !have {
+ return nil, false
+ }
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return cache.Lock(id)
+ }
+ return &Handle{loader: cache, chunk: entry}, true
+}
+
+// Handle is allocated for each Lock() call, all modifications are made using it, and it is invalidated by Unlock()
+// This enforces correct Lock()-Usage()-Unlock() ordering.
+type Handle struct {
+ loader *Cache
+ chunk *chunk
+}
+
+// GetReadOnly returns an *ofp.OfpFlowStats which MUST NOT be modified externally, but which is safe to keep indefinitely
+func (h *Handle) GetReadOnly() *ofp.OfpFlowStats {
+ return h.chunk.flow
+}
+
+// Update updates an existing flow in cache.
+// The provided "flow" must not be modified afterwards.
+func (h *Handle) Update(ctx context.Context, flow *ofp.OfpFlowStats) error {
+ h.chunk.flow = flow
+ return nil
+}
+
+// Delete removes the flow from the cache
+func (h *Handle) Delete(ctx context.Context) error {
+ h.chunk.deleted = true
+
+ h.loader.lock.Lock()
+ delete(h.loader.flows, h.chunk.flow.Id)
+ h.loader.lock.Unlock()
+
+ h.Unlock()
+ return nil
+}
+
+// Unlock releases the lock on the flow
+func (h *Handle) Unlock() {
+ if h.chunk != nil {
+ h.chunk.lock.Unlock()
+ h.chunk = nil // attempting to access the flow through this handle in future will panic
+ }
+}
+
+// ListIDs returns a snapshot of all the managed flow IDs
+// TODO: iterating through flows safely is expensive now, since all flows are stored & locked separately
+// should avoid this where possible
+func (cache *Cache) ListIDs() map[uint64]struct{} {
+ cache.lock.RLock()
+ defer cache.lock.RUnlock()
+ // copy the IDs so caller can safely iterate
+ ret := make(map[uint64]struct{}, len(cache.flows))
+ for id := range cache.flows {
+ ret[id] = struct{}{}
+ }
+ return ret
+}
diff --git a/rw_core/core/device/flow/loader_test.go b/rw_core/core/device/flow/cache_test.go
similarity index 90%
rename from rw_core/core/device/flow/loader_test.go
rename to rw_core/core/device/flow/cache_test.go
index 536a447..a418904 100644
--- a/rw_core/core/device/flow/loader_test.go
+++ b/rw_core/core/device/flow/cache_test.go
@@ -29,14 +29,14 @@
// TestLoadersIdentical ensures that the group, flow, and meter loaders always have an identical implementation.
func TestLoadersIdentical(t *testing.T) {
- types := []string{"flow", "group", "meter", "port", "logical_port"}
+ types := []string{"flow", "group", "meter"}
identical := [][]string{
- {`ofp\.OfpFlowStats`, `ofp\.OfpGroupEntry`, `ofp\.OfpMeterEntry`, `voltha\.Port`, `voltha\.LogicalPort`},
- {`\.Id`, `\.Desc\.GroupId`, `\.Config\.MeterId`, `\.PortNo`, `\.OfpPort\.PortNo`},
- {`uint64`, `uint32`, `uint32`, `uint32`, `uint32`},
- {`Flow`, `Group`, `Meter`, `Port`, `Port`},
- {`flow`, `group`, `meter`, `port`, `port|logical_port`},
+ {`ofp\.OfpFlowStats`, `ofp\.OfpGroupEntry`, `ofp\.OfpMeterEntry`},
+ {`\.Id`, `\.Desc\.GroupId`, `\.Config\.MeterId`},
+ {`uint64`, `uint32`, `uint32`},
+ {`Flow`, `Group`, `Meter`},
+ {`flow`, `group`, `meter`},
}
regexes := make([][]*regexp.Regexp, len(identical[0]))
@@ -52,8 +52,8 @@
for i := 1; i < len(types); i++ {
if err := compare(regexes[0], regexes[i],
- "../"+types[0]+"/loader.go",
- "../"+types[i]+"/loader.go"); err != nil {
+ "../"+types[0]+"/cache.go",
+ "../"+types[i]+"/cache.go"); err != nil {
t.Error(err)
return
}
diff --git a/rw_core/core/device/flow/loader.go b/rw_core/core/device/flow/loader.go
deleted file mode 100644
index 6455aed..0000000
--- a/rw_core/core/device/flow/loader.go
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package flow
-
-import (
- "context"
- "fmt"
- "sync"
-
- "github.com/opencord/voltha-go/db/model"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-)
-
-// Loader hides all low-level locking & synchronization related to flow state updates
-type Loader struct {
- dbProxy *model.Proxy
- // this lock protects the flows map, it does not protect individual flows
- lock sync.RWMutex
- flows map[uint64]*chunk
-}
-
-// chunk keeps a flow and the lock for this flow
-type chunk struct {
- // this lock is used to synchronize all access to the flow, and also to the "deleted" variable
- lock sync.Mutex
- deleted bool
-
- flow *ofp.OfpFlowStats
-}
-
-func NewLoader(dbProxy *model.Proxy) *Loader {
- return &Loader{
- dbProxy: dbProxy,
- flows: make(map[uint64]*chunk),
- }
-}
-
-// Load queries existing flows from the kv,
-// and should only be called once when first created.
-func (loader *Loader) Load(ctx context.Context) {
- loader.lock.Lock()
- defer loader.lock.Unlock()
-
- var flows []*ofp.OfpFlowStats
- if err := loader.dbProxy.List(ctx, &flows); err != nil {
- logger.Errorw(ctx, "failed-to-list-flows-from-cluster-data-proxy", log.Fields{"error": err})
- return
- }
- for _, flow := range flows {
- loader.flows[flow.Id] = &chunk{flow: flow}
- }
-}
-
-// LockOrCreate locks this flow if it exists, or creates a new flow if it does not.
-// In the case of flow creation, the provided "flow" must not be modified afterwards.
-func (loader *Loader) LockOrCreate(ctx context.Context, flow *ofp.OfpFlowStats) (*Handle, bool, error) {
- // try to use read lock instead of full lock if possible
- if handle, have := loader.Lock(flow.Id); have {
- return handle, false, nil
- }
-
- loader.lock.Lock()
- entry, have := loader.flows[flow.Id]
- if !have {
- entry := &chunk{flow: flow}
- loader.flows[flow.Id] = entry
- entry.lock.Lock()
- loader.lock.Unlock()
-
- if err := loader.dbProxy.Set(ctx, fmt.Sprint(flow.Id), flow); err != nil {
- // revert the map
- loader.lock.Lock()
- delete(loader.flows, flow.Id)
- loader.lock.Unlock()
-
- entry.deleted = true
- entry.lock.Unlock()
- return nil, false, err
- }
- return &Handle{loader: loader, chunk: entry}, true, nil
- }
- loader.lock.Unlock()
-
- entry.lock.Lock()
- if entry.deleted {
- entry.lock.Unlock()
- return loader.LockOrCreate(ctx, flow)
- }
- return &Handle{loader: loader, chunk: entry}, false, nil
-}
-
-// Lock acquires the lock for this flow, and returns a handle which can be used to access the flow until it's unlocked.
-// This handle ensures that the flow cannot be accessed if the lock is not held.
-// Returns false if the flow is not present.
-// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
-func (loader *Loader) Lock(id uint64) (*Handle, bool) {
- loader.lock.RLock()
- entry, have := loader.flows[id]
- loader.lock.RUnlock()
-
- if !have {
- return nil, false
- }
-
- entry.lock.Lock()
- if entry.deleted {
- entry.lock.Unlock()
- return loader.Lock(id)
- }
- return &Handle{loader: loader, chunk: entry}, true
-}
-
-// Handle is allocated for each Lock() call, all modifications are made using it, and it is invalidated by Unlock()
-// This enforces correct Lock()-Usage()-Unlock() ordering.
-type Handle struct {
- loader *Loader
- chunk *chunk
-}
-
-// GetReadOnly returns an *ofp.OfpFlowStats which MUST NOT be modified externally, but which is safe to keep indefinitely
-func (h *Handle) GetReadOnly() *ofp.OfpFlowStats {
- return h.chunk.flow
-}
-
-// Update updates an existing flow in the kv.
-// The provided "flow" must not be modified afterwards.
-func (h *Handle) Update(ctx context.Context, flow *ofp.OfpFlowStats) error {
- if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(flow.Id), flow); err != nil {
- return status.Errorf(codes.Internal, "failed-update-flow-%v: %s", flow.Id, err)
- }
- h.chunk.flow = flow
- return nil
-}
-
-// Delete removes the device from the kv
-func (h *Handle) Delete(ctx context.Context) error {
- if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.flow.Id)); err != nil {
- return fmt.Errorf("couldnt-delete-flow-from-store-%v", h.chunk.flow.Id)
- }
- h.chunk.deleted = true
-
- h.loader.lock.Lock()
- delete(h.loader.flows, h.chunk.flow.Id)
- h.loader.lock.Unlock()
-
- h.Unlock()
- return nil
-}
-
-// Unlock releases the lock on the flow
-func (h *Handle) Unlock() {
- if h.chunk != nil {
- h.chunk.lock.Unlock()
- h.chunk = nil // attempting to access the flow through this handle in future will panic
- }
-}
-
-// ListIDs returns a snapshot of all the managed flow IDs
-// TODO: iterating through flows safely is expensive now, since all flows are stored & locked separately
-// should avoid this where possible
-func (loader *Loader) ListIDs() map[uint64]struct{} {
- loader.lock.RLock()
- defer loader.lock.RUnlock()
- // copy the IDs so caller can safely iterate
- ret := make(map[uint64]struct{}, len(loader.flows))
- for id := range loader.flows {
- ret[id] = struct{}{}
- }
- return ret
-}
diff --git a/rw_core/core/device/group/cache.go b/rw_core/core/device/group/cache.go
new file mode 100644
index 0000000..eb6a5a3
--- /dev/null
+++ b/rw_core/core/device/group/cache.go
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package group
+
+import (
+ "context"
+ "sync"
+
+ ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
+)
+
+// Cache hides all low-level locking & synchronization related to group state updates
+type Cache struct {
+ // this lock protects the groups map, it does not protect individual groups
+ lock sync.RWMutex
+ groups map[uint32]*chunk
+}
+
+// chunk keeps a group and the lock for this group
+type chunk struct {
+ // this lock is used to synchronize all access to the group, and also to the "deleted" variable
+ lock sync.Mutex
+ deleted bool
+
+ group *ofp.OfpGroupEntry
+}
+
+func NewCache() *Cache {
+ return &Cache{
+ groups: make(map[uint32]*chunk),
+ }
+}
+
+// LockOrCreate locks this group if it exists, or creates a new group if it does not.
+// In the case of group creation, the provided "group" must not be modified afterwards.
+func (cache *Cache) LockOrCreate(ctx context.Context, group *ofp.OfpGroupEntry) (*Handle, bool, error) {
+ // try to use read lock instead of full lock if possible
+ if handle, have := cache.Lock(group.Desc.GroupId); have {
+ return handle, false, nil
+ }
+
+ cache.lock.Lock()
+ entry, have := cache.groups[group.Desc.GroupId]
+ if !have {
+ entry := &chunk{group: group}
+ cache.groups[group.Desc.GroupId] = entry
+ entry.lock.Lock()
+ cache.lock.Unlock()
+
+ return &Handle{loader: cache, chunk: entry}, true, nil
+ }
+ cache.lock.Unlock()
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return cache.LockOrCreate(ctx, group)
+ }
+ return &Handle{loader: cache, chunk: entry}, false, nil
+}
+
+// Lock acquires the lock for this group, and returns a handle which can be used to access the group until it's unlocked.
+// This handle ensures that the group cannot be accessed if the lock is not held.
+// Returns false if the group is not present.
+// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
+func (cache *Cache) Lock(id uint32) (*Handle, bool) {
+ cache.lock.RLock()
+ entry, have := cache.groups[id]
+ cache.lock.RUnlock()
+
+ if !have {
+ return nil, false
+ }
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return cache.Lock(id)
+ }
+ return &Handle{loader: cache, chunk: entry}, true
+}
+
+// Handle is allocated for each Lock() call, all modifications are made using it, and it is invalidated by Unlock()
+// This enforces correct Lock()-Usage()-Unlock() ordering.
+type Handle struct {
+ loader *Cache
+ chunk *chunk
+}
+
+// GetReadOnly returns an *ofp.OfpGroupEntry which MUST NOT be modified externally, but which is safe to keep indefinitely
+func (h *Handle) GetReadOnly() *ofp.OfpGroupEntry {
+ return h.chunk.group
+}
+
+// Update updates an existing group in cache.
+// The provided "group" must not be modified afterwards.
+func (h *Handle) Update(ctx context.Context, group *ofp.OfpGroupEntry) error {
+ h.chunk.group = group
+ return nil
+}
+
+// Delete removes the group from the cache
+func (h *Handle) Delete(ctx context.Context) error {
+ h.chunk.deleted = true
+
+ h.loader.lock.Lock()
+ delete(h.loader.groups, h.chunk.group.Desc.GroupId)
+ h.loader.lock.Unlock()
+
+ h.Unlock()
+ return nil
+}
+
+// Unlock releases the lock on the group
+func (h *Handle) Unlock() {
+ if h.chunk != nil {
+ h.chunk.lock.Unlock()
+ h.chunk = nil // attempting to access the group through this handle in future will panic
+ }
+}
+
+// ListIDs returns a snapshot of all the managed group IDs
+// TODO: iterating through groups safely is expensive now, since all groups are stored & locked separately
+// should avoid this where possible
+func (cache *Cache) ListIDs() map[uint32]struct{} {
+ cache.lock.RLock()
+ defer cache.lock.RUnlock()
+ // copy the IDs so caller can safely iterate
+ ret := make(map[uint32]struct{}, len(cache.groups))
+ for id := range cache.groups {
+ ret[id] = struct{}{}
+ }
+ return ret
+}
diff --git a/rw_core/core/device/group/common.go b/rw_core/core/device/group/common.go
deleted file mode 100644
index d56cb28..0000000
--- a/rw_core/core/device/group/common.go
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Package core Common Logger initialization
-package group
-
-import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
-)
-
-var logger log.CLogger
-
-func init() {
- // Setup this package so that it's log level can be modified at run time
- var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
- if err != nil {
- panic(err)
- }
-}
diff --git a/rw_core/core/device/group/loader.go b/rw_core/core/device/group/loader.go
deleted file mode 100644
index 973663f..0000000
--- a/rw_core/core/device/group/loader.go
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package group
-
-import (
- "context"
- "fmt"
- "sync"
-
- "github.com/opencord/voltha-go/db/model"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-)
-
-// Loader hides all low-level locking & synchronization related to group state updates
-type Loader struct {
- dbProxy *model.Proxy
- // this lock protects the groups map, it does not protect individual groups
- lock sync.RWMutex
- groups map[uint32]*chunk
-}
-
-// chunk keeps a group and the lock for this group
-type chunk struct {
- // this lock is used to synchronize all access to the group, and also to the "deleted" variable
- lock sync.Mutex
- deleted bool
-
- group *ofp.OfpGroupEntry
-}
-
-func NewLoader(dbProxy *model.Proxy) *Loader {
- return &Loader{
- dbProxy: dbProxy,
- groups: make(map[uint32]*chunk),
- }
-}
-
-// Load queries existing groups from the kv,
-// and should only be called once when first created.
-func (loader *Loader) Load(ctx context.Context) {
- loader.lock.Lock()
- defer loader.lock.Unlock()
-
- var groups []*ofp.OfpGroupEntry
- if err := loader.dbProxy.List(ctx, &groups); err != nil {
- logger.Errorw(ctx, "failed-to-list-groups-from-cluster-data-proxy", log.Fields{"error": err})
- return
- }
- for _, group := range groups {
- loader.groups[group.Desc.GroupId] = &chunk{group: group}
- }
-}
-
-// LockOrCreate locks this group if it exists, or creates a new group if it does not.
-// In the case of group creation, the provided "group" must not be modified afterwards.
-func (loader *Loader) LockOrCreate(ctx context.Context, group *ofp.OfpGroupEntry) (*Handle, bool, error) {
- // try to use read lock instead of full lock if possible
- if handle, have := loader.Lock(group.Desc.GroupId); have {
- return handle, false, nil
- }
-
- loader.lock.Lock()
- entry, have := loader.groups[group.Desc.GroupId]
- if !have {
- entry := &chunk{group: group}
- loader.groups[group.Desc.GroupId] = entry
- entry.lock.Lock()
- loader.lock.Unlock()
-
- if err := loader.dbProxy.Set(ctx, fmt.Sprint(group.Desc.GroupId), group); err != nil {
- // revert the map
- loader.lock.Lock()
- delete(loader.groups, group.Desc.GroupId)
- loader.lock.Unlock()
-
- entry.deleted = true
- entry.lock.Unlock()
- return nil, false, err
- }
- return &Handle{loader: loader, chunk: entry}, true, nil
- }
- loader.lock.Unlock()
-
- entry.lock.Lock()
- if entry.deleted {
- entry.lock.Unlock()
- return loader.LockOrCreate(ctx, group)
- }
- return &Handle{loader: loader, chunk: entry}, false, nil
-}
-
-// Lock acquires the lock for this group, and returns a handle which can be used to access the group until it's unlocked.
-// This handle ensures that the group cannot be accessed if the lock is not held.
-// Returns false if the group is not present.
-// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
-func (loader *Loader) Lock(id uint32) (*Handle, bool) {
- loader.lock.RLock()
- entry, have := loader.groups[id]
- loader.lock.RUnlock()
-
- if !have {
- return nil, false
- }
-
- entry.lock.Lock()
- if entry.deleted {
- entry.lock.Unlock()
- return loader.Lock(id)
- }
- return &Handle{loader: loader, chunk: entry}, true
-}
-
-// Handle is allocated for each Lock() call, all modifications are made using it, and it is invalidated by Unlock()
-// This enforces correct Lock()-Usage()-Unlock() ordering.
-type Handle struct {
- loader *Loader
- chunk *chunk
-}
-
-// GetReadOnly returns an *ofp.OfpGroupEntry which MUST NOT be modified externally, but which is safe to keep indefinitely
-func (h *Handle) GetReadOnly() *ofp.OfpGroupEntry {
- return h.chunk.group
-}
-
-// Update updates an existing group in the kv.
-// The provided "group" must not be modified afterwards.
-func (h *Handle) Update(ctx context.Context, group *ofp.OfpGroupEntry) error {
- if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(group.Desc.GroupId), group); err != nil {
- return status.Errorf(codes.Internal, "failed-update-group-%v: %s", group.Desc.GroupId, err)
- }
- h.chunk.group = group
- return nil
-}
-
-// Delete removes the device from the kv
-func (h *Handle) Delete(ctx context.Context) error {
- if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.group.Desc.GroupId)); err != nil {
- return fmt.Errorf("couldnt-delete-group-from-store-%v", h.chunk.group.Desc.GroupId)
- }
- h.chunk.deleted = true
-
- h.loader.lock.Lock()
- delete(h.loader.groups, h.chunk.group.Desc.GroupId)
- h.loader.lock.Unlock()
-
- h.Unlock()
- return nil
-}
-
-// Unlock releases the lock on the group
-func (h *Handle) Unlock() {
- if h.chunk != nil {
- h.chunk.lock.Unlock()
- h.chunk = nil // attempting to access the group through this handle in future will panic
- }
-}
-
-// ListIDs returns a snapshot of all the managed group IDs
-// TODO: iterating through groups safely is expensive now, since all groups are stored & locked separately
-// should avoid this where possible
-func (loader *Loader) ListIDs() map[uint32]struct{} {
- loader.lock.RLock()
- defer loader.lock.RUnlock()
- // copy the IDs so caller can safely iterate
- ret := make(map[uint32]struct{}, len(loader.groups))
- for id := range loader.groups {
- ret[id] = struct{}{}
- }
- return ret
-}
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 917313e..a1579e8 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -58,10 +58,10 @@
startOnce sync.Once
stopOnce sync.Once
- flowLoader *flow.Loader
- meterLoader *meter.Loader
- groupLoader *group.Loader
- portLoader *port.Loader
+ flowCache *flow.Cache
+ meterCache *meter.Cache
+ groupCache *group.Cache
+ portLoader *port.Loader
}
func newLogicalAgent(ctx context.Context, id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
@@ -78,21 +78,21 @@
defaultTimeout: defaultTimeout,
requestQueue: coreutils.NewRequestQueue(),
- flowLoader: flow.NewLoader(dbProxy.SubPath("logical_flows").Proxy(id)),
- groupLoader: group.NewLoader(dbProxy.SubPath("logical_groups").Proxy(id)),
- meterLoader: meter.NewLoader(dbProxy.SubPath("logical_meters").Proxy(id)),
- portLoader: port.NewLoader(dbProxy.SubPath("logical_ports").Proxy(id)),
+ flowCache: flow.NewCache(),
+ groupCache: group.NewCache(),
+ meterCache: meter.NewCache(),
+ portLoader: port.NewLoader(dbProxy.SubPath("logical_ports").Proxy(id)),
}
}
// start creates the logical device and add it to the data model
-func (agent *LogicalAgent) start(ctx context.Context, loadFromDB bool) error {
+func (agent *LogicalAgent) start(ctx context.Context, logicalDeviceExist bool, logicalDevice *voltha.LogicalDevice) error {
needToStart := false
if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
return nil
}
- logger.Infow(ctx, "starting-logical-device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
+ logger.Infow(ctx, "starting-logical-device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": logicalDeviceExist})
var startSucceeded bool
defer func() {
@@ -104,7 +104,7 @@
}()
var ld *voltha.LogicalDevice
- if !loadFromDB {
+ if !logicalDeviceExist {
//Build the logical device based on information retrieved from the device adapter
var switchCap *ic.SwitchCapability
var err error
@@ -141,14 +141,17 @@
}
}()
} else {
- // load from dB - the logical may not exist at this time. On error, just return and the calling function
- // will destroy this agent.
- ld := &voltha.LogicalDevice{}
- have, err := agent.ldProxy.Get(ctx, agent.logicalDeviceID, ld)
- if err != nil {
- return err
- } else if !have {
- return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
+ // Check to see if we need to load from dB
+ ld = logicalDevice
+ if logicalDevice == nil {
+ // load from dB
+ ld = &voltha.LogicalDevice{}
+ have, err := agent.ldProxy.Get(ctx, agent.logicalDeviceID, ld)
+ if err != nil {
+ return err
+ } else if !have {
+ return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
+ }
}
// Update the root device Id
@@ -160,15 +163,12 @@
// now that the root device is known, create DeviceRoutes with it
agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.rootDeviceID, agent.deviceMgr.listDevicePorts)
- // load the flows, meters and groups from KV to cache
- agent.flowLoader.Load(ctx)
- agent.meterLoader.Load(ctx)
- agent.groupLoader.Load(ctx)
+ // load the logical ports from KV to cache
agent.portLoader.Load(ctx)
}
// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
- if loadFromDB {
+ if logicalDeviceExist {
go func() {
subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
if err := agent.buildRoutes(subCtx); err != nil {
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 46d4ffb..808dfd3 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -36,10 +36,10 @@
// listLogicalDeviceFlows returns logical device flows
func (agent *LogicalAgent) listLogicalDeviceFlows() map[uint64]*ofp.OfpFlowStats {
- flowIDs := agent.flowLoader.ListIDs()
+ flowIDs := agent.flowCache.ListIDs()
flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
for flowID := range flowIDs {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ if flowHandle, have := agent.flowCache.Lock(flowID); have {
flows[flowID] = flowHandle.GetReadOnly()
flowHandle.Unlock()
}
@@ -104,7 +104,7 @@
var flowToReplace *ofp.OfpFlowStats
//if flow is not found in the map, create a new entry, otherwise get the existing one.
- flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
+ flowHandle, created, err := agent.flowCache.LockOrCreate(ctx, flow)
if err != nil {
return changed, updated, err
}
@@ -146,10 +146,10 @@
return changed, updated, err
}
- groupIDs := agent.groupLoader.ListIDs()
+ groupIDs := agent.groupCache.ListIDs()
groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
for groupID := range groupIDs {
- if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ if groupHandle, have := agent.groupCache.Lock(groupID); have {
groups[groupID] = groupHandle.GetReadOnly()
groupHandle.Unlock()
}
@@ -214,7 +214,7 @@
func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
logger.Debugw(ctx, "revert-flow-add", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
- flowHandle, have := agent.flowLoader.Lock(addedFlow.Id)
+ flowHandle, have := agent.flowCache.Lock(addedFlow.Id)
if !have {
// Not found - do nothing
logger.Debugw(ctx, "flow-not-found", log.Fields{"added-flow": addedFlow})
@@ -271,14 +271,14 @@
if err != nil {
return err
}
- if handle, have := agent.flowLoader.Lock(fs.Id); have {
+ if handle, have := agent.flowCache.Lock(fs.Id); have {
toDelete[fs.Id] = handle.GetReadOnly()
handle.Unlock()
}
// search through all the flows
- for flowID := range agent.flowLoader.ListIDs() {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ for flowID := range agent.flowCache.ListIDs() {
+ if flowHandle, have := agent.flowCache.Lock(flowID); have {
if flow := flowHandle.GetReadOnly(); fu.FlowMatchesMod(flow, mod) {
toDelete[flow.Id] = flow
}
@@ -291,7 +291,7 @@
logger.Debugw(ctx, "flow-delete", log.Fields{"logical-device-id": agent.logicalDeviceID, "to-delete": len(toDelete)})
for _, flow := range toDelete {
- if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+ if flowHandle, have := agent.flowCache.Lock(flow.Id); have {
// TODO: Flow should only be updated if meter is updated, and meter should only be updated if flow is updated
// currently an error while performing the second operation will leave an inconsistent state in kv.
// This should be a single atomic operation down to the kv.
@@ -318,8 +318,8 @@
}
groups := make(map[uint32]*ofp.OfpGroupEntry)
- for groupID := range agent.groupLoader.ListIDs() {
- if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ for groupID := range agent.groupCache.ListIDs() {
+ if groupHandle, have := agent.groupCache.Lock(groupID); have {
groups[groupID] = groupHandle.GetReadOnly()
groupHandle.Unlock()
}
@@ -386,7 +386,7 @@
return err
}
logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flow-id": flow.Id})
- flowHandle, have := agent.flowLoader.Lock(flow.Id)
+ flowHandle, have := agent.flowCache.Lock(flow.Id)
if !have {
logger.Debugw(ctx, "skipping-flow-delete-strict-request-no-flow-found", log.Fields{"flow-mod": mod})
return nil
@@ -394,8 +394,8 @@
defer flowHandle.Unlock()
groups := make(map[uint32]*ofp.OfpGroupEntry)
- for groupID := range agent.groupLoader.ListIDs() {
- if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ for groupID := range agent.groupCache.ListIDs() {
+ if groupHandle, have := agent.groupCache.Lock(groupID); have {
groups[groupID] = groupHandle.GetReadOnly()
groupHandle.Unlock()
}
@@ -487,8 +487,8 @@
func (agent *LogicalAgent) deleteFlowsHavingMeter(ctx context.Context, meterID uint32) error {
logger.Infow(ctx, "delete-flows-matching-meter", log.Fields{"meter": meterID})
- for flowID := range agent.flowLoader.ListIDs() {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ for flowID := range agent.flowCache.ListIDs() {
+ if flowHandle, have := agent.flowCache.Lock(flowID); have {
if flowMeterID := fu.GetMeterIdFromFlow(flowHandle.GetReadOnly()); flowMeterID != 0 && flowMeterID == meterID {
if err := flowHandle.Delete(ctx); err != nil {
//TODO: Think on carrying on and deleting the remaining flows, instead of returning.
@@ -506,8 +506,8 @@
func (agent *LogicalAgent) deleteFlowsHavingGroup(ctx context.Context, groupID uint32) (map[uint64]*ofp.OfpFlowStats, error) {
logger.Infow(ctx, "delete-flows-matching-group", log.Fields{"group-id": groupID})
flowsRemoved := make(map[uint64]*ofp.OfpFlowStats)
- for flowID := range agent.flowLoader.ListIDs() {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ for flowID := range agent.flowCache.ListIDs() {
+ if flowHandle, have := agent.flowCache.Lock(flowID); have {
if flow := flowHandle.GetReadOnly(); fu.FlowHasOutGroup(flow, groupID) {
if err := flowHandle.Delete(ctx); err != nil {
return nil, err
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index df324d5..780eec3 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -32,10 +32,10 @@
// listLogicalDeviceGroups returns logical device flow groups
func (agent *LogicalAgent) listLogicalDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
- groupIDs := agent.groupLoader.ListIDs()
+ groupIDs := agent.groupCache.ListIDs()
groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
for groupID := range groupIDs {
- if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ if groupHandle, have := agent.groupCache.Lock(groupID); have {
groups[groupID] = groupHandle.GetReadOnly()
groupHandle.Unlock()
}
@@ -69,7 +69,7 @@
groupEntry := fu.GroupEntryFromGroupMod(groupMod)
- groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, groupEntry)
+ groupHandle, created, err := agent.groupCache.LockOrCreate(ctx, groupEntry)
if err != nil {
return err
}
@@ -120,11 +120,11 @@
toDelete := map[uint32]struct{}{groupMod.GroupId: {}}
if groupMod.GroupId == uint32(ofp.OfpGroup_OFPG_ALL) {
- toDelete = agent.groupLoader.ListIDs()
+ toDelete = agent.groupCache.ListIDs()
}
for groupID := range toDelete {
- if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ if groupHandle, have := agent.groupCache.Lock(groupID); have {
affectedGroups[groupID] = groupHandle.GetReadOnly()
if err := groupHandle.Delete(ctx); err != nil {
return err
@@ -202,7 +202,7 @@
groupID := groupMod.GroupId
- groupHandle, have := agent.groupLoader.Lock(groupID)
+ groupHandle, have := agent.groupCache.Lock(groupID)
if !have {
return fmt.Errorf("group-absent:%d", groupID)
}
diff --git a/rw_core/core/device/logical_agent_meter.go b/rw_core/core/device/logical_agent_meter.go
index f3a1c89..d67b2df 100644
--- a/rw_core/core/device/logical_agent_meter.go
+++ b/rw_core/core/device/logical_agent_meter.go
@@ -29,10 +29,10 @@
// listLogicalDeviceMeters returns logical device meters
func (agent *LogicalAgent) listLogicalDeviceMeters() map[uint32]*ofp.OfpMeterEntry {
- meterIDs := agent.meterLoader.ListIDs()
+ meterIDs := agent.meterCache.ListIDs()
meters := make(map[uint32]*ofp.OfpMeterEntry, len(meterIDs))
for meterID := range meterIDs {
- if meterHandle, have := agent.meterLoader.Lock(meterID); have {
+ if meterHandle, have := agent.meterCache.Lock(meterID); have {
meters[meterID] = meterHandle.GetReadOnly()
meterHandle.Unlock()
}
@@ -67,7 +67,7 @@
meterEntry := fu.MeterEntryFromMeterMod(ctx, meterMod)
- meterHandle, created, err := agent.meterLoader.LockOrCreate(ctx, meterEntry)
+ meterHandle, created, err := agent.meterCache.LockOrCreate(ctx, meterEntry)
if err != nil {
return err
}
@@ -88,7 +88,7 @@
}
logger.Debug(ctx, "meterDelete", log.Fields{"meterMod": *meterMod, "logical-device-id": agent.logicalDeviceID})
- meterHandle, have := agent.meterLoader.Lock(meterMod.MeterId)
+ meterHandle, have := agent.meterCache.Lock(meterMod.MeterId)
if !have {
logger.Warnw(ctx, "meter-not-found", log.Fields{"meterID": meterMod.MeterId, "logical-device-id": agent.logicalDeviceID})
return nil
@@ -116,7 +116,7 @@
return nil
}
- meterHandle, have := agent.meterLoader.Lock(meterMod.MeterId)
+ meterHandle, have := agent.meterCache.Lock(meterMod.MeterId)
if !have {
return fmt.Errorf("no-meter-to-modify: %d, logical-device-id: %s", meterMod.MeterId, agent.logicalDeviceID)
}
diff --git a/rw_core/core/device/logical_agent_meter_helpers.go b/rw_core/core/device/logical_agent_meter_helpers.go
index 8bffba9..654392e 100644
--- a/rw_core/core/device/logical_agent_meter_helpers.go
+++ b/rw_core/core/device/logical_agent_meter_helpers.go
@@ -32,7 +32,7 @@
if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 {
if _, have := metersConfig[flowMeterID]; !have {
// Meter is present in the flow, Get from logical device
- meterHandle, have := agent.meterLoader.Lock(flowMeterID)
+ meterHandle, have := agent.meterCache.Lock(flowMeterID)
if !have {
logger.Errorw(ctx, "Meter-referred-by-flow-is-not-found-in-logicaldevice",
log.Fields{"meterID": flowMeterID, "Available-meters": metersConfig, "flow": *flow})
@@ -65,7 +65,7 @@
return true
}
- meterHandle, have := agent.meterLoader.Lock(meterID)
+ meterHandle, have := agent.meterCache.Lock(meterID)
if !have {
logger.Debugw(ctx, "Meter-is-not-present-in-logical-device", log.Fields{"meterID": meterID})
return true
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index c82ada0..8a6e726 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -32,8 +32,8 @@
// listLogicalDevicePorts returns logical device ports
func (agent *LogicalAgent) listLogicalDevicePorts(ctx context.Context) map[uint32]*voltha.LogicalPort {
- logger.Debug(ctx, "list-logical-device-ports")
portIDs := agent.portLoader.ListIDs()
+ logger.Debugw(ctx, "list-logical-device-ports", log.Fields{"num-ports": len(portIDs)})
ret := make(map[uint32]*voltha.LogicalPort, len(portIDs))
for portID := range portIDs {
if portHandle, have := agent.portLoader.Lock(portID); have {
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 94f336c..011cacb 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -264,7 +264,7 @@
localWG.Wait()
meterEntry := fu.MeterEntryFromMeterMod(ctx, meterMod)
- meterHandle, have := ldAgent.meterLoader.Lock(meterMod.MeterId)
+ meterHandle, have := ldAgent.meterCache.Lock(meterMod.MeterId)
assert.Equal(t, have, true)
if have {
assert.True(t, proto.Equal(meterEntry, meterHandle.GetReadOnly()))
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 360d83b..9a38d9c 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -19,6 +19,7 @@
import (
"context"
"errors"
+ "github.com/opencord/voltha-lib-go/v4/pkg/probe"
"io"
"strconv"
"strings"
@@ -50,6 +51,29 @@
logicalDeviceLoadingInProgress map[string][]chan int
}
+func (ldMgr *LogicalManager) Start(ctx context.Context) {
+ logger.Info(ctx, "starting-logical-device-manager")
+ probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusPreparing)
+
+ // Load all the logical devices from the dB
+ var logicalDevices []*voltha.LogicalDevice
+ if err := ldMgr.ldProxy.List(ctx, &logicalDevices); err != nil {
+ logger.Fatalw(ctx, "failed-to-list-logical-devices-from-cluster-proxy", log.Fields{"error": err})
+ }
+ for _, lDevice := range logicalDevices {
+ // Create an agent for each device
+ agent := newLogicalAgent(ctx, lDevice.Id, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.defaultTimeout)
+ if err := agent.start(ctx, true, lDevice); err != nil {
+ logger.Warnw(ctx, "failure-starting-logical-agent", log.Fields{"logical-device-id": lDevice.Id})
+ } else {
+ ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
+ }
+ }
+
+ probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusRunning)
+ logger.Info(ctx, "logical-device-manager-started")
+}
+
func (ldMgr *LogicalManager) addLogicalDeviceAgentToMap(agent *LogicalAgent) {
if _, exist := ldMgr.logicalDeviceAgents.Load(agent.logicalDeviceID); !exist {
ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
@@ -110,6 +134,8 @@
}
return true
})
+ logger.Debugw(ctx, "list-all-logical-devices", log.Fields{"num-logical-devices": len(logicalDevices)})
+
return &voltha.LogicalDevices{Items: logicalDevices}, nil
}
@@ -146,7 +172,7 @@
//TODO: either wait for the agent to be started before returning, or
// implement locks in the agent to ensure request are not processed before start() is complete
ldCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
- err := agent.start(ldCtx, false)
+ err := agent.start(ldCtx, false, nil)
if err != nil {
logger.Errorw(ctx, "unable-to-create-the-logical-device", log.Fields{"error": err})
ldMgr.deleteLogicalDeviceAgent(id)
@@ -207,7 +233,7 @@
if _, err := ldMgr.getLogicalDeviceFromModel(ctx, lDeviceID); err == nil {
logger.Debugw(ctx, "loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
agent := newLogicalAgent(ctx, lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.defaultTimeout)
- if err := agent.start(ctx, true); err != nil {
+ if err := agent.start(ctx, true, nil); err != nil {
return err
}
ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
@@ -301,6 +327,7 @@
ret[ctr] = flow
ctr++
}
+ logger.Debugw(ctx, "list-logical-device-flows", log.Fields{"logical-device-id": id.Id, "num-flows": len(flows)})
return &openflow_13.Flows{Items: ret}, nil
}
@@ -319,6 +346,7 @@
ret[ctr] = group
ctr++
}
+ logger.Debugw(ctx, "list-logical-device-flow-groups", log.Fields{"logical-device-id": id.Id, "num-groups": len(groups)})
return &openflow_13.FlowGroups{Items: ret}, nil
}
@@ -337,6 +365,7 @@
ret[ctr] = port
ctr++
}
+ logger.Debugw(ctx, "list-logical-device-ports", log.Fields{"logical-device-id": id.Id, "num-ports": len(ports)})
return &voltha.LogicalPorts{Items: ret}, nil
}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index b3fa7b7..3533046 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -20,6 +20,7 @@
"context"
"errors"
"github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-lib-go/v4/pkg/probe"
"sync"
"time"
@@ -95,6 +96,33 @@
return deviceMgr, logicalDeviceMgr
}
+func (dMgr *Manager) Start(ctx context.Context) {
+ logger.Info(ctx, "starting-device-manager")
+ probe.UpdateStatusFromContext(ctx, "device-manager", probe.ServiceStatusPreparing)
+
+ // Load all the devices from the dB
+ var devices []*voltha.Device
+ if err := dMgr.dProxy.List(ctx, &devices); err != nil {
+ // Any error from the dB means if we proceed we may end up with corrupted data
+ logger.Fatalw(ctx, "failed-to-list-devices-from-KV", log.Fields{"error": err})
+ }
+
+ for _, device := range devices {
+ // Create an agent for each device
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
+ if _, err := agent.start(ctx, true, device); err != nil {
+ logger.Warnw(ctx, "failure-starting-agent", log.Fields{"device-id": device.Id})
+ } else {
+ dMgr.addDeviceAgentToMap(agent)
+ }
+ }
+
+ // TODO: Need to trigger a reconcile at this point
+
+ probe.UpdateStatusFromContext(ctx, "device-manager", probe.ServiceStatusRunning)
+ logger.Info(ctx, "device-manager-started")
+}
+
func (dMgr *Manager) addDeviceAgentToMap(agent *Agent) {
if _, exist := dMgr.deviceAgents.Load(agent.deviceID); !exist {
dMgr.deviceAgents.Store(agent.deviceID, agent)
@@ -168,7 +196,7 @@
device.Root = true
// Create and start a device agent for that device
agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
- device, err = agent.start(ctx, device)
+ device, err = agent.start(ctx, false, device)
if err != nil {
logger.Errorw(ctx, "fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
return nil, err
@@ -520,7 +548,7 @@
if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
logger.Debugw(ctx, "loading-device", log.Fields{"device-id": deviceID})
agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
- if _, err = agent.start(ctx, nil); err != nil {
+ if _, err = agent.start(ctx, true, device); err != nil {
logger.Warnw(ctx, "failure-loading-device", log.Fields{"device-id": deviceID, "error": err})
} else {
dMgr.addDeviceAgentToMap(agent)
@@ -1041,7 +1069,7 @@
// Create and start a device agent for that device
agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
- insertedChildDevice, err := agent.start(ctx, childDevice)
+ insertedChildDevice, err := agent.start(ctx, false, childDevice)
if err != nil {
logger.Errorw(ctx, "error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
return nil, err
diff --git a/rw_core/core/device/meter/cache.go b/rw_core/core/device/meter/cache.go
new file mode 100644
index 0000000..8912bbe
--- /dev/null
+++ b/rw_core/core/device/meter/cache.go
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package meter
+
+import (
+ "context"
+ "sync"
+
+ ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
+)
+
+// Cache hides all low-level locking & synchronization related to meter state updates
+type Cache struct {
+ // this lock protects the meters map, it does not protect individual meters
+ lock sync.RWMutex
+ meters map[uint32]*chunk
+}
+
+// chunk keeps a meter and the lock for this meter
+type chunk struct {
+ // this lock is used to synchronize all access to the meter, and also to the "deleted" variable
+ lock sync.Mutex
+ deleted bool
+
+ meter *ofp.OfpMeterEntry
+}
+
+func NewCache() *Cache {
+ return &Cache{
+ meters: make(map[uint32]*chunk),
+ }
+}
+
+// LockOrCreate locks this meter if it exists, or creates a new meter if it does not.
+// In the case of meter creation, the provided "meter" must not be modified afterwards.
+func (cache *Cache) LockOrCreate(ctx context.Context, meter *ofp.OfpMeterEntry) (*Handle, bool, error) {
+ // try to use read lock instead of full lock if possible
+ if handle, have := cache.Lock(meter.Config.MeterId); have {
+ return handle, false, nil
+ }
+
+ cache.lock.Lock()
+ entry, have := cache.meters[meter.Config.MeterId]
+ if !have {
+ entry := &chunk{meter: meter}
+ cache.meters[meter.Config.MeterId] = entry
+ entry.lock.Lock()
+ cache.lock.Unlock()
+
+ return &Handle{loader: cache, chunk: entry}, true, nil
+ }
+ cache.lock.Unlock()
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return cache.LockOrCreate(ctx, meter)
+ }
+ return &Handle{loader: cache, chunk: entry}, false, nil
+}
+
+// Lock acquires the lock for this meter, and returns a handle which can be used to access the meter until it's unlocked.
+// This handle ensures that the meter cannot be accessed if the lock is not held.
+// Returns false if the meter is not present.
+// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
+func (cache *Cache) Lock(id uint32) (*Handle, bool) {
+ cache.lock.RLock()
+ entry, have := cache.meters[id]
+ cache.lock.RUnlock()
+
+ if !have {
+ return nil, false
+ }
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return cache.Lock(id)
+ }
+ return &Handle{loader: cache, chunk: entry}, true
+}
+
+// Handle is allocated for each Lock() call, all modifications are made using it, and it is invalidated by Unlock()
+// This enforces correct Lock()-Usage()-Unlock() ordering.
+type Handle struct {
+ loader *Cache
+ chunk *chunk
+}
+
+// GetReadOnly returns an *ofp.OfpMeterEntry which MUST NOT be modified externally, but which is safe to keep indefinitely
+func (h *Handle) GetReadOnly() *ofp.OfpMeterEntry {
+ return h.chunk.meter
+}
+
+// Update updates an existing meter in cache.
+// The provided "meter" must not be modified afterwards.
+func (h *Handle) Update(ctx context.Context, meter *ofp.OfpMeterEntry) error {
+ h.chunk.meter = meter
+ return nil
+}
+
+// Delete removes the meter from the cache
+func (h *Handle) Delete(ctx context.Context) error {
+ h.chunk.deleted = true
+
+ h.loader.lock.Lock()
+ delete(h.loader.meters, h.chunk.meter.Config.MeterId)
+ h.loader.lock.Unlock()
+
+ h.Unlock()
+ return nil
+}
+
+// Unlock releases the lock on the meter
+func (h *Handle) Unlock() {
+ if h.chunk != nil {
+ h.chunk.lock.Unlock()
+ h.chunk = nil // attempting to access the meter through this handle in future will panic
+ }
+}
+
+// ListIDs returns a snapshot of all the managed meter IDs
+// TODO: iterating through meters safely is expensive now, since all meters are stored & locked separately
+// should avoid this where possible
+func (cache *Cache) ListIDs() map[uint32]struct{} {
+ cache.lock.RLock()
+ defer cache.lock.RUnlock()
+ // copy the IDs so caller can safely iterate
+ ret := make(map[uint32]struct{}, len(cache.meters))
+ for id := range cache.meters {
+ ret[id] = struct{}{}
+ }
+ return ret
+}
diff --git a/rw_core/core/device/meter/common.go b/rw_core/core/device/meter/common.go
deleted file mode 100644
index fc77caa..0000000
--- a/rw_core/core/device/meter/common.go
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Package core Common Logger initialization
-package meter
-
-import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
-)
-
-var logger log.CLogger
-
-func init() {
- // Setup this package so that it's log level can be modified at run time
- var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
- if err != nil {
- panic(err)
- }
-}
diff --git a/rw_core/core/device/meter/loader.go b/rw_core/core/device/meter/loader.go
deleted file mode 100644
index 8f3046b..0000000
--- a/rw_core/core/device/meter/loader.go
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package meter
-
-import (
- "context"
- "fmt"
- "sync"
-
- "github.com/opencord/voltha-go/db/model"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-)
-
-// Loader hides all low-level locking & synchronization related to meter state updates
-type Loader struct {
- dbProxy *model.Proxy
- // this lock protects the meters map, it does not protect individual meters
- lock sync.RWMutex
- meters map[uint32]*chunk
-}
-
-// chunk keeps a meter and the lock for this meter
-type chunk struct {
- // this lock is used to synchronize all access to the meter, and also to the "deleted" variable
- lock sync.Mutex
- deleted bool
-
- meter *ofp.OfpMeterEntry
-}
-
-func NewLoader(dbProxy *model.Proxy) *Loader {
- return &Loader{
- dbProxy: dbProxy,
- meters: make(map[uint32]*chunk),
- }
-}
-
-// Load queries existing meters from the kv,
-// and should only be called once when first created.
-func (loader *Loader) Load(ctx context.Context) {
- loader.lock.Lock()
- defer loader.lock.Unlock()
-
- var meters []*ofp.OfpMeterEntry
- if err := loader.dbProxy.List(ctx, &meters); err != nil {
- logger.Errorw(ctx, "failed-to-list-meters-from-cluster-data-proxy", log.Fields{"error": err})
- return
- }
- for _, meter := range meters {
- loader.meters[meter.Config.MeterId] = &chunk{meter: meter}
- }
-}
-
-// LockOrCreate locks this meter if it exists, or creates a new meter if it does not.
-// In the case of meter creation, the provided "meter" must not be modified afterwards.
-func (loader *Loader) LockOrCreate(ctx context.Context, meter *ofp.OfpMeterEntry) (*Handle, bool, error) {
- // try to use read lock instead of full lock if possible
- if handle, have := loader.Lock(meter.Config.MeterId); have {
- return handle, false, nil
- }
-
- loader.lock.Lock()
- entry, have := loader.meters[meter.Config.MeterId]
- if !have {
- entry := &chunk{meter: meter}
- loader.meters[meter.Config.MeterId] = entry
- entry.lock.Lock()
- loader.lock.Unlock()
-
- if err := loader.dbProxy.Set(ctx, fmt.Sprint(meter.Config.MeterId), meter); err != nil {
- // revert the map
- loader.lock.Lock()
- delete(loader.meters, meter.Config.MeterId)
- loader.lock.Unlock()
-
- entry.deleted = true
- entry.lock.Unlock()
- return nil, false, err
- }
- return &Handle{loader: loader, chunk: entry}, true, nil
- }
- loader.lock.Unlock()
-
- entry.lock.Lock()
- if entry.deleted {
- entry.lock.Unlock()
- return loader.LockOrCreate(ctx, meter)
- }
- return &Handle{loader: loader, chunk: entry}, false, nil
-}
-
-// Lock acquires the lock for this meter, and returns a handle which can be used to access the meter until it's unlocked.
-// This handle ensures that the meter cannot be accessed if the lock is not held.
-// Returns false if the meter is not present.
-// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
-func (loader *Loader) Lock(id uint32) (*Handle, bool) {
- loader.lock.RLock()
- entry, have := loader.meters[id]
- loader.lock.RUnlock()
-
- if !have {
- return nil, false
- }
-
- entry.lock.Lock()
- if entry.deleted {
- entry.lock.Unlock()
- return loader.Lock(id)
- }
- return &Handle{loader: loader, chunk: entry}, true
-}
-
-// Handle is allocated for each Lock() call, all modifications are made using it, and it is invalidated by Unlock()
-// This enforces correct Lock()-Usage()-Unlock() ordering.
-type Handle struct {
- loader *Loader
- chunk *chunk
-}
-
-// GetReadOnly returns an *ofp.OfpMeterEntry which MUST NOT be modified externally, but which is safe to keep indefinitely
-func (h *Handle) GetReadOnly() *ofp.OfpMeterEntry {
- return h.chunk.meter
-}
-
-// Update updates an existing meter in the kv.
-// The provided "meter" must not be modified afterwards.
-func (h *Handle) Update(ctx context.Context, meter *ofp.OfpMeterEntry) error {
- if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(meter.Config.MeterId), meter); err != nil {
- return status.Errorf(codes.Internal, "failed-update-meter-%v: %s", meter.Config.MeterId, err)
- }
- h.chunk.meter = meter
- return nil
-}
-
-// Delete removes the device from the kv
-func (h *Handle) Delete(ctx context.Context) error {
- if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.meter.Config.MeterId)); err != nil {
- return fmt.Errorf("couldnt-delete-meter-from-store-%v", h.chunk.meter.Config.MeterId)
- }
- h.chunk.deleted = true
-
- h.loader.lock.Lock()
- delete(h.loader.meters, h.chunk.meter.Config.MeterId)
- h.loader.lock.Unlock()
-
- h.Unlock()
- return nil
-}
-
-// Unlock releases the lock on the meter
-func (h *Handle) Unlock() {
- if h.chunk != nil {
- h.chunk.lock.Unlock()
- h.chunk = nil // attempting to access the meter through this handle in future will panic
- }
-}
-
-// ListIDs returns a snapshot of all the managed meter IDs
-// TODO: iterating through meters safely is expensive now, since all meters are stored & locked separately
-// should avoid this where possible
-func (loader *Loader) ListIDs() map[uint32]struct{} {
- loader.lock.RLock()
- defer loader.lock.RUnlock()
- // copy the IDs so caller can safely iterate
- ret := make(map[uint32]struct{}, len(loader.meters))
- for id := range loader.meters {
- ret[id] = struct{}{}
- }
- return ret
-}
diff --git a/rw_core/core/device/transientstate/loader.go b/rw_core/core/device/transientstate/loader.go
index f8710cc..411515f 100644
--- a/rw_core/core/device/transientstate/loader.go
+++ b/rw_core/core/device/transientstate/loader.go
@@ -58,11 +58,16 @@
defer loader.lock.Unlock()
var deviceTransientState voltha.DeviceTransientState
- if have, err := loader.dbProxy.Get(ctx, loader.deviceTransientState.deviceID, &deviceTransientState); err != nil || !have {
+ have, err := loader.dbProxy.Get(ctx, loader.deviceTransientState.deviceID, &deviceTransientState)
+ if err != nil {
logger.Errorw(ctx, "failed-to-get-device-transient-state-from-cluster-data-proxy", log.Fields{"error": err})
return
}
- loader.deviceTransientState.transientState = deviceTransientState.TransientState
+ if have {
+ loader.deviceTransientState.transientState = deviceTransientState.TransientState
+ return
+ }
+ loader.deviceTransientState.transientState = voltha.DeviceTransientState_NONE
}
// Lock acquires the lock for deviceTransientStateLoader, and returns a handle