VOL-3121 - Separated out LogicalDevices' low-level flow/meter/group handling into separate packages.
The new implementation hides the compexity of locking, caching, and interacting with the db.
An attempt was made to ensure that locks are held while updates are made, by returning a "handle" object from each flow/group/meter lock() call, and only allowing access through this call.
An attempt was also made to remove proto.Clone-ing. flows/groups/meters which are returned are NOT cloned, and MUST NOT be modified by users of the flow/group/meter loaders. In addition, flows/groups/meters which are given to the loaders MUST NOT be modified afterward.
There remain many cases where errors during particular kv updates may cause inconsistent state. TODOs have been added for many of these cases. Resolving this may require exposing (through voltha-lib-go) the transaction mechanism from etcd.
There is also the issue that locking a flow/meter/group while another flow/meter/group is held could cause deadlocks. This can be avoided by acquiring locks in a consistent order. Something to keep in mind while fixing the previous issue.
Change-Id: I146eb319c3564635fdc461ec17be13e6f3968cf7
diff --git a/rw_core/core/device/flow/common.go b/rw_core/core/device/flow/common.go
new file mode 100644
index 0000000..ca8bb6a
--- /dev/null
+++ b/rw_core/core/device/flow/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package flow
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "flow"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/rw_core/core/device/flow/loader.go b/rw_core/core/device/flow/loader.go
new file mode 100644
index 0000000..741a45f
--- /dev/null
+++ b/rw_core/core/device/flow/loader.go
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flow
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "sync"
+
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// Loader hides all low-level locking & synchronization related to flow state updates
+type Loader struct {
+ // this lock protects the flows map, it does not protect individual flows
+ lock sync.RWMutex
+ flows map[uint64]*chunk
+
+ dbProxy *model.Proxy
+ logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+}
+
+// chunk keeps a flow and the lock for this flow
+type chunk struct {
+ // this lock is used to synchronize all access to the flow, and also to the "deleted" variable
+ lock sync.Mutex
+ deleted bool
+
+ flow *ofp.OfpFlowStats
+}
+
+func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+ return &Loader{
+ flows: make(map[uint64]*chunk),
+ dbProxy: dataProxy,
+ logicalDeviceID: logicalDeviceID,
+ }
+}
+
+// Load queries existing flows from the kv,
+// and should only be called once when first created.
+func (loader *Loader) Load(ctx context.Context) {
+ loader.lock.Lock()
+ defer loader.lock.Unlock()
+
+ var flows []*ofp.OfpFlowStats
+ if err := loader.dbProxy.List(ctx, "logical_flows/"+loader.logicalDeviceID, &flows); err != nil {
+ logger.Errorw("failed-to-list-flows-from-cluster-data-proxy", log.Fields{"error": err})
+ return
+ }
+ for _, flow := range flows {
+ loader.flows[flow.Id] = &chunk{flow: flow}
+ }
+}
+
+// LockOrCreate locks this flow if it exists, or creates a new flow if it does not.
+// In the case of flow creation, the provided "flow" must not be modified afterwards.
+func (loader *Loader) LockOrCreate(ctx context.Context, flow *ofp.OfpFlowStats) (*Handle, bool, error) {
+ // try to use read lock instead of full lock if possible
+ if handle, have := loader.Lock(flow.Id); have {
+ return handle, false, nil
+ }
+
+ loader.lock.Lock()
+ entry, have := loader.flows[flow.Id]
+ if !have {
+ entry := &chunk{flow: flow}
+ loader.flows[flow.Id] = entry
+ entry.lock.Lock()
+ loader.lock.Unlock()
+
+ flowID := strconv.FormatUint(uint64(flow.Id), 10)
+ if err := loader.dbProxy.AddWithID(ctx, "logical_flows/"+loader.logicalDeviceID, flowID, flow); err != nil {
+ logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "flowID": flowID, "err": err})
+
+ // revert the map
+ loader.lock.Lock()
+ delete(loader.flows, flow.Id)
+ loader.lock.Unlock()
+
+ entry.deleted = true
+ entry.lock.Unlock()
+ return nil, false, err
+ }
+ return &Handle{loader: loader, chunk: entry}, true, nil
+ }
+ loader.lock.Unlock()
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return loader.LockOrCreate(ctx, flow)
+ }
+ return &Handle{loader: loader, chunk: entry}, false, nil
+}
+
+// Lock acquires the lock for this flow, and returns a handle which can be used to access the meter until it's unlocked.
+// This handle ensures that the flow cannot be accessed if the lock is not held.
+// Returns false if the flow is not present.
+// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
+func (loader *Loader) Lock(id uint64) (*Handle, bool) {
+ loader.lock.RLock()
+ entry, have := loader.flows[id]
+ loader.lock.RUnlock()
+
+ if !have {
+ return nil, false
+ }
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return loader.Lock(id)
+ }
+ return &Handle{loader: loader, chunk: entry}, true
+}
+
+type Handle struct {
+ loader *Loader
+ chunk *chunk
+}
+
+// GetReadOnly returns an *ofp.OfpFlowStats which MUST NOT be modified externally, but which is safe to keep indefinitely
+func (h *Handle) GetReadOnly() *ofp.OfpFlowStats {
+ return h.chunk.flow
+}
+
+// Update updates an existing flow in the kv.
+// The provided "flow" must not be modified afterwards.
+func (h *Handle) Update(ctx context.Context, flow *ofp.OfpFlowStats) error {
+ path := fmt.Sprintf("logical_flows/%s/%d", h.loader.logicalDeviceID, flow.Id)
+ if err := h.loader.dbProxy.Update(ctx, path, flow); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-flow:%s:%d %s", h.loader.logicalDeviceID, flow.Id, err)
+ }
+ h.chunk.flow = flow
+ return nil
+}
+
+// Delete removes the device from the kv
+func (h *Handle) Delete(ctx context.Context) error {
+ path := fmt.Sprintf("logical_flows/%s/%d", h.loader.logicalDeviceID, h.chunk.flow.Id)
+ if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
+ return fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
+ }
+ h.chunk.deleted = true
+
+ h.loader.lock.Lock()
+ delete(h.loader.flows, h.chunk.flow.Id)
+ h.loader.lock.Unlock()
+
+ h.Unlock()
+ return nil
+}
+
+// Unlock releases the lock on the flow
+func (h *Handle) Unlock() {
+ if h.chunk != nil {
+ h.chunk.lock.Unlock()
+ h.chunk = nil // attempting to access the flow through this handle in future will panic
+ }
+}
+
+// List returns a snapshot of all the managed flow IDs
+// TODO: iterating through flows safely is expensive now, since all flows are stored & locked separately
+// should avoid this where possible
+func (loader *Loader) List() map[uint64]struct{} {
+ loader.lock.RLock()
+ defer loader.lock.RUnlock()
+ // copy the IDs so caller can safely iterate
+ ret := make(map[uint64]struct{}, len(loader.flows))
+ for id := range loader.flows {
+ ret[id] = struct{}{}
+ }
+ return ret
+}
diff --git a/rw_core/core/device/flow/loader_test.go b/rw_core/core/device/flow/loader_test.go
new file mode 100644
index 0000000..b739272
--- /dev/null
+++ b/rw_core/core/device/flow/loader_test.go
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flow
+
+import (
+ "bufio"
+ "fmt"
+ "os"
+ "regexp"
+ "strconv"
+ "strings"
+ "testing"
+)
+
+// TestLoadersIdentical ensures that the group, flow, and meter loaders always have an identical implementation.
+func TestLoadersIdentical(t *testing.T) {
+ identical := [][]string{
+ {"ofp\\.OfpFlowStats", "ofp\\.OfpGroupEntry", "ofp\\.OfpMeterEntry"},
+ {"\\.Id", "\\.Desc\\.GroupId", "\\.Config.MeterId"},
+ {"uint64", "uint32", "uint32"},
+ {"Flow", "Group", "Meter"},
+ {"flow", "group", "meter"},
+ }
+
+ regexes := make([]*regexp.Regexp, len(identical))
+ for i, group := range identical {
+ regexes[i] = regexp.MustCompile(strings.Join(group, "|"))
+ }
+
+ for i := 1; i < len(identical[0]); i++ {
+ if err := compare(regexes, "../"+identical[4][0]+"/loader.go", "../"+identical[4][i]+"/loader.go"); err != nil {
+ t.Error(err)
+ return
+ }
+ }
+}
+
+func compare(regexes []*regexp.Regexp, fileNameA, fileNameB string) error {
+ fileA, err := os.Open(fileNameA)
+ if err != nil {
+ return err
+ }
+ defer fileA.Close()
+
+ fileB, err := os.Open(fileNameB)
+ if err != nil {
+ return err
+ }
+ defer fileB.Close()
+
+ scannerA, scannerB := bufio.NewScanner(fileA), bufio.NewScanner(fileB)
+
+ spaceRegex := regexp.MustCompile(" +")
+
+ line := 1
+ for {
+ if continueA, continueB := scannerA.Scan(), scannerB.Scan(); continueA != continueB {
+ if !continueA && continueB {
+ if err := scannerA.Err(); err != nil {
+ return err
+ }
+ }
+ if continueA && !continueB {
+ if err := scannerB.Err(); err != nil {
+ return err
+ }
+ }
+ return fmt.Errorf("line %d: files are not the same length", line)
+ } else if !continueA {
+ // EOF from both files
+ break
+ }
+
+ textA, textB := scannerA.Text(), scannerB.Text()
+
+ replacedA, replacedB := textA, textB
+ for i, regex := range regexes {
+ replacement := "{{type" + strconv.Itoa(i) + "}}"
+ replacedA, replacedB = regex.ReplaceAllString(replacedA, replacement), regex.ReplaceAllString(replacedB, replacement)
+ }
+
+ // replace multiple spaces with single space
+ replacedA, replacedB = spaceRegex.ReplaceAllString(replacedA, " "), spaceRegex.ReplaceAllString(replacedB, " ")
+
+ if replacedA != replacedB {
+ return fmt.Errorf("line %d: files %s and %s do not match: \n\t%s\n\t%s\n\n\t%s\n\t%s", line, fileNameA, fileNameB, textA, textB, replacedA, replacedB)
+ }
+
+ line++
+ }
+
+ if err := scannerA.Err(); err != nil {
+ return err
+ }
+ if err := scannerB.Err(); err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/rw_core/core/device/group/common.go b/rw_core/core/device/group/common.go
new file mode 100644
index 0000000..016dfff
--- /dev/null
+++ b/rw_core/core/device/group/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package group
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "group"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/rw_core/core/device/group/loader.go b/rw_core/core/device/group/loader.go
new file mode 100644
index 0000000..0e3f078
--- /dev/null
+++ b/rw_core/core/device/group/loader.go
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package group
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "sync"
+
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// Loader hides all low-level locking & synchronization related to group state updates
+type Loader struct {
+ // this lock protects the groups map, it does not protect individual groups
+ lock sync.RWMutex
+ groups map[uint32]*chunk
+
+ dbProxy *model.Proxy
+ logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+}
+
+// chunk keeps a group and the lock for this group
+type chunk struct {
+ // this lock is used to synchronize all access to the group, and also to the "deleted" variable
+ lock sync.Mutex
+ deleted bool
+
+ group *ofp.OfpGroupEntry
+}
+
+func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+ return &Loader{
+ groups: make(map[uint32]*chunk),
+ dbProxy: dataProxy,
+ logicalDeviceID: logicalDeviceID,
+ }
+}
+
+// Load queries existing groups from the kv,
+// and should only be called once when first created.
+func (loader *Loader) Load(ctx context.Context) {
+ loader.lock.Lock()
+ defer loader.lock.Unlock()
+
+ var groups []*ofp.OfpGroupEntry
+ if err := loader.dbProxy.List(ctx, "logical_groups/"+loader.logicalDeviceID, &groups); err != nil {
+ logger.Errorw("failed-to-list-groups-from-cluster-data-proxy", log.Fields{"error": err})
+ return
+ }
+ for _, group := range groups {
+ loader.groups[group.Desc.GroupId] = &chunk{group: group}
+ }
+}
+
+// LockOrCreate locks this group if it exists, or creates a new group if it does not.
+// In the case of group creation, the provided "group" must not be modified afterwards.
+func (loader *Loader) LockOrCreate(ctx context.Context, group *ofp.OfpGroupEntry) (*Handle, bool, error) {
+ // try to use read lock instead of full lock if possible
+ if handle, have := loader.Lock(group.Desc.GroupId); have {
+ return handle, false, nil
+ }
+
+ loader.lock.Lock()
+ entry, have := loader.groups[group.Desc.GroupId]
+ if !have {
+ entry := &chunk{group: group}
+ loader.groups[group.Desc.GroupId] = entry
+ entry.lock.Lock()
+ loader.lock.Unlock()
+
+ groupID := strconv.FormatUint(uint64(group.Desc.GroupId), 10)
+ if err := loader.dbProxy.AddWithID(ctx, "logical_groups/"+loader.logicalDeviceID, groupID, group); err != nil {
+ logger.Errorw("failed-adding-group-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "groupID": groupID, "err": err})
+
+ // revert the map
+ loader.lock.Lock()
+ delete(loader.groups, group.Desc.GroupId)
+ loader.lock.Unlock()
+
+ entry.deleted = true
+ entry.lock.Unlock()
+ return nil, false, err
+ }
+ return &Handle{loader: loader, chunk: entry}, true, nil
+ }
+ loader.lock.Unlock()
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return loader.LockOrCreate(ctx, group)
+ }
+ return &Handle{loader: loader, chunk: entry}, false, nil
+}
+
+// Lock acquires the lock for this group, and returns a handle which can be used to access the group until it's unlocked.
+// This handle ensures that the group cannot be accessed if the lock is not held.
+// Returns false if the group is not present.
+// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
+func (loader *Loader) Lock(id uint32) (*Handle, bool) {
+ loader.lock.RLock()
+ entry, have := loader.groups[id]
+ loader.lock.RUnlock()
+
+ if !have {
+ return nil, false
+ }
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return loader.Lock(id)
+ }
+ return &Handle{loader: loader, chunk: entry}, true
+}
+
+type Handle struct {
+ loader *Loader
+ chunk *chunk
+}
+
+// GetReadOnly returns an *ofp.OfpGroupEntry which MUST NOT be modified externally, but which is safe to keep indefinitely
+func (h *Handle) GetReadOnly() *ofp.OfpGroupEntry {
+ return h.chunk.group
+}
+
+// Update updates an existing group in the kv.
+// The provided "group" must not be modified afterwards.
+func (h *Handle) Update(ctx context.Context, group *ofp.OfpGroupEntry) error {
+ path := fmt.Sprintf("logical_groups/%s/%d", h.loader.logicalDeviceID, group.Desc.GroupId)
+ if err := h.loader.dbProxy.Update(ctx, path, group); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-group:%s:%d %s", h.loader.logicalDeviceID, group.Desc.GroupId, err)
+ }
+ h.chunk.group = group
+ return nil
+}
+
+// Delete removes the device from the kv
+func (h *Handle) Delete(ctx context.Context) error {
+ path := fmt.Sprintf("logical_groups/%s/%d", h.loader.logicalDeviceID, h.chunk.group.Desc.GroupId)
+ if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
+ return fmt.Errorf("couldnt-delete-group-from-store-%s", path)
+ }
+ h.chunk.deleted = true
+
+ h.loader.lock.Lock()
+ delete(h.loader.groups, h.chunk.group.Desc.GroupId)
+ h.loader.lock.Unlock()
+
+ h.Unlock()
+ return nil
+}
+
+// Unlock releases the lock on the group
+func (h *Handle) Unlock() {
+ if h.chunk != nil {
+ h.chunk.lock.Unlock()
+ h.chunk = nil // attempting to access the group through this handle in future will panic
+ }
+}
+
+// List returns a snapshot of all the managed group IDs
+// TODO: iterating through groups safely is expensive now, since all groups are stored & locked separately
+// should avoid this where possible
+func (loader *Loader) List() map[uint32]struct{} {
+ loader.lock.RLock()
+ defer loader.lock.RUnlock()
+ // copy the IDs so caller can safely iterate
+ ret := make(map[uint32]struct{}, len(loader.groups))
+ for id := range loader.groups {
+ ret[id] = struct{}{}
+ }
+ return ret
+}
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 7723b74..dbb2da5 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -25,6 +25,9 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/rw_core/core/device/flow"
+ "github.com/opencord/voltha-go/rw_core/core/device/group"
+ "github.com/opencord/voltha-go/rw_core/core/device/meter"
fd "github.com/opencord/voltha-go/rw_core/flowdecomposition"
"github.com/opencord/voltha-go/rw_core/route"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
@@ -56,32 +59,31 @@
startOnce sync.Once
stopOnce sync.Once
- meters map[uint32]*MeterChunk
- meterLock sync.RWMutex
- flows map[uint64]*FlowChunk
- flowLock sync.RWMutex
- groups map[uint32]*GroupChunk
- groupLock sync.RWMutex
+ flowLoader *flow.Loader
+ meterLoader *meter.Loader
+ groupLoader *group.Loader
}
func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
deviceMgr *Manager, cdProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
- var agent LogicalAgent
- agent.logicalDeviceID = id
- agent.serialNumber = sn
- agent.rootDeviceID = deviceID
- agent.deviceMgr = deviceMgr
- agent.clusterDataProxy = cdProxy
- agent.ldeviceMgr = ldeviceMgr
- agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
- agent.logicalPortsNo = make(map[uint32]bool)
- agent.defaultTimeout = defaultTimeout
- agent.requestQueue = coreutils.NewRequestQueue()
- agent.meters = make(map[uint32]*MeterChunk)
- agent.flows = make(map[uint64]*FlowChunk)
- agent.groups = make(map[uint32]*GroupChunk)
+ agent := &LogicalAgent{
+ logicalDeviceID: id,
+ serialNumber: sn,
+ rootDeviceID: deviceID,
+ deviceMgr: deviceMgr,
+ clusterDataProxy: cdProxy,
+ ldeviceMgr: ldeviceMgr,
+ flowDecomposer: fd.NewFlowDecomposer(deviceMgr),
+ logicalPortsNo: make(map[uint32]bool),
+ defaultTimeout: defaultTimeout,
+ requestQueue: coreutils.NewRequestQueue(),
+
+ flowLoader: flow.NewLoader(cdProxy, id),
+ meterLoader: meter.NewLoader(cdProxy, id),
+ groupLoader: group.NewLoader(cdProxy, id),
+ }
agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
- return &agent
+ return agent
}
// start creates the logical device and add it to the data model
@@ -161,9 +163,9 @@
// Setup the local list of logical ports
agent.addLogicalPortsToMap(ld.Ports)
// load the flows, meters and groups from KV to cache
- agent.loadFlows(ctx)
- agent.loadMeters(ctx)
- agent.loadGroups(ctx)
+ agent.flowLoader.Load(ctx)
+ agent.meterLoader.Load(ctx)
+ agent.groupLoader.Load(ctx)
}
// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
@@ -234,20 +236,6 @@
}
agent.logicalDevice = logicalDevice
-
- return nil
-}
-
-func (agent *LogicalAgent) deleteFlowAndUpdateMeterStats(ctx context.Context, mod *ofp.OfpFlowMod, chunk *FlowChunk) error {
- chunk.lock.Lock()
- defer chunk.lock.Unlock()
- if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, chunk.flow, false); !changedMeter {
- return fmt.Errorf("Cannot-delete-flow-%s. Meter-update-failed", chunk.flow)
- }
- // Update store and cache
- if err := agent.removeLogicalDeviceFlow(ctx, chunk.flow.Id); err != nil {
- return fmt.Errorf("Cannot-delete-flows-%s. Delete-from-store-failed", chunk.flow)
- }
return nil
}
@@ -314,10 +302,10 @@
return responses
}
-func (agent *LogicalAgent) deleteFlowsFromParentDevice(flows ofp.Flows, metadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) deleteFlowsFromParentDevice(flows map[uint64]*ofp.OfpFlowStats, metadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("deleting-flows-from-parent-device", log.Fields{"logical-device-id": agent.logicalDeviceID, "flows": flows})
responses := make([]coreutils.Response, 0)
- for _, flow := range flows.Items {
+ for _, flow := range flows {
response := coreutils.NewResponse()
responses = append(responses, response)
uniPort, err := agent.getUNILogicalPortNo(flow)
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 0e811e7..b06b0f7 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -32,6 +32,19 @@
"strconv"
)
+// listLogicalDeviceFlows returns logical device flows
+func (agent *LogicalAgent) listLogicalDeviceFlows() map[uint64]*ofp.OfpFlowStats {
+ flowIDs := agent.flowLoader.List()
+ flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
+ for flowID := range flowIDs {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ flows[flowID] = flowHandle.GetReadOnly()
+ flowHandle.Unlock()
+ }
+ }
+ return flows
+}
+
//updateFlowTable updates the flow table of that logical device
func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
logger.Debug("UpdateFlowTable")
@@ -84,54 +97,29 @@
func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, mod *ofp.OfpFlowMod) (bool, bool, error) {
changed := false
updated := false
- alreadyExist := true
var flowToReplace *ofp.OfpFlowStats
//if flow is not found in the map, create a new entry, otherwise get the existing one.
- agent.flowLock.Lock()
- flowChunk, ok := agent.flows[flow.Id]
- if !ok {
- flowChunk = &FlowChunk{
- flow: flow,
- }
- agent.flows[flow.Id] = flowChunk
- alreadyExist = false
- flowChunk.lock.Lock() //acquire chunk lock before releasing map lock
- defer flowChunk.lock.Unlock()
- agent.flowLock.Unlock()
- } else {
- agent.flowLock.Unlock() //release map lock before acquiring chunk lock
- flowChunk.lock.Lock()
- defer flowChunk.lock.Unlock()
+ flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
+ if err != nil {
+ return changed, updated, err
}
+ defer flowHandle.Unlock()
- if !alreadyExist {
- flowID := strconv.FormatUint(flow.Id, 10)
- if err := agent.clusterDataProxy.AddWithID(ctx, "logical_flows/"+agent.logicalDeviceID, flowID, flow); err != nil {
- logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": agent.logicalDeviceID, "flowID": flowID, "err": err})
- //Revert the map
- //TODO: Solve the condition:If we have two flow Adds of the same flow (at least same priority and match) in quick succession
- //then if the first one fails while the second one was waiting on the flowchunk, we will end up with an instance of flowChunk that is no longer in the map.
- agent.flowLock.Lock()
- delete(agent.flows, flow.Id)
- agent.flowLock.Unlock()
- return changed, updated, err
- }
- }
flows := make([]*ofp.OfpFlowStats, 0)
- updatedFlows := make([]*ofp.OfpFlowStats, 0)
checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
if checkOverlap {
+ // TODO: this currently does nothing
if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
- // TODO: should this error be notified other than being logged?
+ // TODO: should this error be notified other than being logged?
logger.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
} else {
// Add flow
changed = true
}
} else {
- if alreadyExist {
- flowToReplace = flowChunk.flow
+ if !created {
+ flowToReplace = flowHandle.GetReadOnly()
if (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_RESET_COUNTS)) != 0 {
flow.ByteCount = flowToReplace.ByteCount
flow.PacketCount = flowToReplace.PacketCount
@@ -146,15 +134,24 @@
}
logger.Debugw("flowAdd-changed", log.Fields{"changed": changed, "updated": updated})
if changed {
- updatedFlows = append(updatedFlows, flow)
- var flowMetadata voltha.FlowMetadata
- lMeters, _ := agent.ListLogicalDeviceMeters(ctx)
- if err := agent.GetMeterConfig(updatedFlows, lMeters.Items, &flowMetadata); err != nil {
+ updatedFlows := map[uint64]*ofp.OfpFlowStats{flow.Id: flow}
+
+ flowMeterConfig, err := agent.GetMeterConfig(updatedFlows)
+ if err != nil {
logger.Error("Meter-referred-in-flow-not-present")
return changed, updated, err
}
- flowGroups, _ := agent.ListLogicalDeviceFlowGroups(ctx)
- deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *flowGroups)
+
+ groupIDs := agent.groupLoader.List()
+ groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
+ for groupID := range groupIDs {
+ if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ groups[groupID] = groupHandle.GetReadOnly()
+ groupHandle.Unlock()
+ }
+ }
+
+ deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, updatedFlows, groups)
if err != nil {
return changed, updated, err
}
@@ -162,19 +159,18 @@
logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
// Update store and cache
if updated {
- if err := agent.updateLogicalDeviceFlow(ctx, flow, flowChunk); err != nil {
+ if err := flowHandle.Update(ctx, flow); err != nil {
return changed, updated, err
}
}
-
- respChannels := agent.addFlowsAndGroupsToDevices(deviceRules, &flowMetadata)
+ respChannels := agent.addFlowsAndGroupsToDevices(deviceRules, toMetadata(flowMeterConfig))
// Create the go routines to wait
go func() {
// Wait for completion
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
logger.Infow("failed-to-add-flows-will-attempt-deletion", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
// Revert added flows
- if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, &flowMetadata); err != nil {
+ if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
logger.Errorw("failure-to-delete-flows-after-failed-addition", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}
@@ -188,27 +184,24 @@
func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
logger.Debugw("revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
- agent.flowLock.RLock()
- flowChunk, ok := agent.flows[addedFlow.Id]
- agent.flowLock.RUnlock()
- if !ok {
+ flowHandle, have := agent.flowLoader.Lock(addedFlow.Id)
+ if !have {
// Not found - do nothing
log.Debugw("flow-not-found", log.Fields{"added-flow": addedFlow})
return nil
}
- //Leave the map lock and syncronize per flow
- flowChunk.lock.Lock()
- defer flowChunk.lock.Unlock()
+ defer flowHandle.Unlock()
if replacedFlow != nil {
- if err := agent.updateLogicalDeviceFlow(ctx, replacedFlow, flowChunk); err != nil {
+ if err := flowHandle.Update(ctx, replacedFlow); err != nil {
return err
}
} else {
- if err := agent.removeLogicalDeviceFlow(ctx, addedFlow.Id); err != nil {
+ if err := flowHandle.Delete(ctx); err != nil {
return err
}
}
+
// Revert meters
if changedMeterStats := agent.updateFlowCountOfMeterStats(ctx, mod, addedFlow, true); !changedMeterStats {
return fmt.Errorf("Unable-to-revert-meterstats-for-flow-%s", strconv.FormatUint(addedFlow.Id, 10))
@@ -235,56 +228,72 @@
return nil
}
+ //build a list of what to delete
+ toDelete := make(map[uint64]*ofp.OfpFlowStats)
+
+ // add perfectly matching entry if exists
fs, err := fu.FlowStatsEntryFromFlowModMessage(mod)
if err != nil {
return err
}
+ if handle, have := agent.flowLoader.Lock(fs.Id); have {
+ toDelete[fs.Id] = handle.GetReadOnly()
+ handle.Unlock()
+ }
- //build a list of what to delete
- toDelete := make([]*ofp.OfpFlowStats, 0)
- toDeleteChunks := make([]*FlowChunk, 0)
- //Lock the map to search the matched flows
- agent.flowLock.RLock()
- for _, f := range agent.flows {
- if fu.FlowMatch(f.flow, fs) {
- toDelete = append(toDelete, f.flow)
- toDeleteChunks = append(toDeleteChunks, f)
- continue
- }
- // Check wild card match
- if fu.FlowMatchesMod(f.flow, mod) {
- toDelete = append(toDelete, f.flow)
- toDeleteChunks = append(toDeleteChunks, f)
+ // search through all the flows
+ for flowID := range agent.flowLoader.List() {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ if flow := flowHandle.GetReadOnly(); fu.FlowMatchesMod(flow, mod) {
+ toDelete[flow.Id] = flow
+ }
+ flowHandle.Unlock()
}
}
- agent.flowLock.RUnlock()
+
//Delete the matched flows
if len(toDelete) > 0 {
logger.Debugw("flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
- var meters []*ofp.OfpMeterEntry
- var flowGroups []*ofp.OfpGroupEntry
- if ofpMeters, err := agent.ListLogicalDeviceMeters(ctx); err != nil {
- meters = ofpMeters.Items
- }
- if groups, err := agent.ListLogicalDeviceFlowGroups(ctx); err != nil {
- flowGroups = groups.Items
- }
-
- for _, fc := range toDeleteChunks {
- if err := agent.deleteFlowAndUpdateMeterStats(ctx, mod, fc); err != nil {
- return err
+ for _, flow := range toDelete {
+ if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+ // TODO: Flow should only be updated if meter is updated, and meter should only be updated if flow is updated
+ // currently an error while performing the second operation will leave an inconsistent state in kv.
+ // This should be a single atomic operation down to the kv.
+ if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flowHandle.GetReadOnly(), false); !changedMeter {
+ flowHandle.Unlock()
+ return fmt.Errorf("cannot-delete-flow-%d. Meter-update-failed", flow.Id)
+ }
+ // Update store and cache
+ if err := flowHandle.Delete(ctx); err != nil {
+ flowHandle.Unlock()
+ return fmt.Errorf("cannot-delete-flows-%d. Delete-from-store-failed", flow.Id)
+ }
+ flowHandle.Unlock()
+ // TODO: since this is executed in a loop without also updating meter stats, and error part way through this
+ // operation will leave inconsistent state in the meter stats & flows on the devices.
+ // This & related meter updates should be a single atomic operation down to the kv.
}
}
- var flowMetadata voltha.FlowMetadata
- if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
+
+ metersConfig, err := agent.GetMeterConfig(toDelete)
+ if err != nil { // This should never happen
logger.Error("Meter-referred-in-flows-not-present")
return err
}
+
+ groups := make(map[uint32]*ofp.OfpGroupEntry)
+ for groupID := range agent.groupLoader.List() {
+ if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ groups[groupID] = groupHandle.GetReadOnly()
+ groupHandle.Unlock()
+ }
+ }
+
var respChnls []coreutils.Response
var partialRoute bool
var deviceRules *fu.DeviceRules
- deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
+ deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, toDelete, groups)
if err != nil {
// A no route error means no route exists between the ports specified in the flow. This can happen when the
// child device is deleted and a request to delete flows from the parent device is received
@@ -297,9 +306,9 @@
// Update the devices
if partialRoute {
- respChnls = agent.deleteFlowsFromParentDevice(ofp.Flows{Items: toDelete}, &flowMetadata)
+ respChnls = agent.deleteFlowsFromParentDevice(toDelete, toMetadata(metersConfig))
} else {
- respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, &flowMetadata)
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, toMetadata(metersConfig))
}
// Wait for the responses
@@ -327,38 +336,35 @@
return err
}
logger.Debugw("flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
- agent.flowLock.RLock()
- flowChunk, ok := agent.flows[flow.Id]
- agent.flowLock.RUnlock()
- if !ok {
+ flowHandle, have := agent.flowLoader.Lock(flow.Id)
+ if !have {
logger.Debugw("Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
return nil
}
- //Release the map lock and syncronize per flow
- flowChunk.lock.Lock()
- defer flowChunk.lock.Unlock()
+ defer flowHandle.Unlock()
- var meters []*ofp.OfpMeterEntry
- var flowGroups []*ofp.OfpGroupEntry
- if ofMeters, er := agent.ListLogicalDeviceMeters(ctx); er == nil {
- meters = ofMeters.Items
+ groups := make(map[uint32]*ofp.OfpGroupEntry)
+ for groupID := range agent.groupLoader.List() {
+ if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ groups[groupID] = groupHandle.GetReadOnly()
+ groupHandle.Unlock()
+ }
}
- if ofGroups, er := agent.ListLogicalDeviceFlowGroups(ctx); er == nil {
- flowGroups = ofGroups.Items
- }
+
if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !changedMeter {
return fmt.Errorf("Cannot delete flow - %s. Meter update failed", flow)
}
- var flowMetadata voltha.FlowMetadata
- flowsToDelete := []*ofp.OfpFlowStats{flowChunk.flow}
- if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
+ flowsToDelete := map[uint64]*ofp.OfpFlowStats{flow.Id: flowHandle.GetReadOnly()}
+
+ flowMetadata, err := agent.GetMeterConfig(flowsToDelete)
+ if err != nil {
logger.Error("meter-referred-in-flows-not-present")
return err
}
var respChnls []coreutils.Response
var partialRoute bool
- deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
+ deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, flowsToDelete, groups)
if err != nil {
// A no route error means no route exists between the ports specified in the flow. This can happen when the
// child device is deleted and a request to delete flows from the parent device is received
@@ -370,14 +376,14 @@
}
// Update the model
- if err := agent.removeLogicalDeviceFlow(ctx, flow.Id); err != nil {
+ if err := flowHandle.Delete(ctx); err != nil {
return err
}
// Update the devices
if partialRoute {
- respChnls = agent.deleteFlowsFromParentDevice(ofp.Flows{Items: flowsToDelete}, &flowMetadata)
+ respChnls = agent.deleteFlowsFromParentDevice(flowsToDelete, toMetadata(flowMetadata))
} else {
- respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, &flowMetadata)
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, toMetadata(flowMetadata))
}
// Wait for completion
@@ -400,3 +406,48 @@
func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
return errors.New("flowModifyStrict not implemented")
}
+
+// TODO: Remove this helper, just pass the map through to functions directly
+func toMetadata(meters map[uint32]*ofp.OfpMeterConfig) *voltha.FlowMetadata {
+ ctr, ret := 0, make([]*ofp.OfpMeterConfig, len(meters))
+ for _, meter := range meters {
+ ret[ctr] = meter
+ ctr++
+ }
+ return &voltha.FlowMetadata{Meters: ret}
+}
+
+func (agent *LogicalAgent) deleteFlowsHavingMeter(ctx context.Context, meterID uint32) error {
+ logger.Infow("Delete-flows-matching-meter", log.Fields{"meter": meterID})
+ for flowID := range agent.flowLoader.List() {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ if flowMeterID := fu.GetMeterIdFromFlow(flowHandle.GetReadOnly()); flowMeterID != 0 && flowMeterID == meterID {
+ if err := flowHandle.Delete(ctx); err != nil {
+ //TODO: Think on carrying on and deleting the remaining flows, instead of returning.
+ //Anyways this returns an error to controller which possibly results with a re-deletion.
+ //Then how can we handle the new deletion request(Same for group deletion)?
+ return err
+ }
+ }
+ flowHandle.Unlock()
+ }
+ }
+ return nil
+}
+
+func (agent *LogicalAgent) deleteFlowsHavingGroup(ctx context.Context, groupID uint32) (map[uint64]*ofp.OfpFlowStats, error) {
+ logger.Infow("Delete-flows-matching-group", log.Fields{"groupID": groupID})
+ flowsRemoved := make(map[uint64]*ofp.OfpFlowStats)
+ for flowID := range agent.flowLoader.List() {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ if flow := flowHandle.GetReadOnly(); fu.FlowHasOutGroup(flow, groupID) {
+ if err := flowHandle.Delete(ctx); err != nil {
+ return nil, err
+ }
+ flowsRemoved[flowID] = flow
+ }
+ flowHandle.Unlock()
+ }
+ }
+ return flowsRemoved, nil
+}
diff --git a/rw_core/core/device/logical_agent_flow_loader.go b/rw_core/core/device/logical_agent_flow_loader.go
deleted file mode 100644
index 84d1a47..0000000
--- a/rw_core/core/device/logical_agent_flow_loader.go
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package device
-
-import (
- "context"
- "fmt"
- "sync"
-
- "github.com/gogo/protobuf/proto"
- fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-)
-
-//FlowChunk keeps a flow and the lock for this flow. The lock in the struct is used to syncronize the
-//modifications for the related flow.
-type FlowChunk struct {
- flow *ofp.OfpFlowStats
- lock sync.Mutex
-}
-
-func (agent *LogicalAgent) loadFlows(ctx context.Context) {
- agent.flowLock.Lock()
- defer agent.flowLock.Unlock()
-
- var flowList []*ofp.OfpFlowStats
- if err := agent.clusterDataProxy.List(ctx, "logical_flows/"+agent.logicalDeviceID, &flowList); err != nil {
- logger.Errorw("Failed-to-list-logicalflows-from-cluster-data-proxy", log.Fields{"error": err})
- return
- }
- for _, flow := range flowList {
- if flow != nil {
- flowsChunk := FlowChunk{
- flow: flow,
- }
- agent.flows[flow.Id] = &flowsChunk
- }
- }
-}
-
-//updateLogicalDeviceFlow updates flow in the store and cache
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) updateLogicalDeviceFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowChunk *FlowChunk) error {
- path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flow.Id)
- if err := agent.clusterDataProxy.Update(ctx, path, flow); err != nil {
- return status.Errorf(codes.Internal, "failed-update-flow:%s:%d %s", agent.logicalDeviceID, flow.Id, err)
- }
- flowChunk.flow = flow
- return nil
-}
-
-//removeLogicalDeviceFlow deletes the flow from store and cache.
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) removeLogicalDeviceFlow(ctx context.Context, flowID uint64) error {
- path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-delete-flow-from-the-store-%s", path)
- }
- agent.flowLock.Lock()
- defer agent.flowLock.Unlock()
- delete(agent.flows, flowID)
- return nil
-}
-
-// ListLogicalDeviceFlows returns logical device flows
-func (agent *LogicalAgent) ListLogicalDeviceFlows(ctx context.Context) (*ofp.Flows, error) {
- logger.Debug("ListLogicalDeviceFlows")
- var flowStats []*ofp.OfpFlowStats
- agent.flowLock.RLock()
- defer agent.flowLock.RUnlock()
- for _, flowChunk := range agent.flows {
- flowStats = append(flowStats, (proto.Clone(flowChunk.flow)).(*ofp.OfpFlowStats))
- }
- return &ofp.Flows{Items: flowStats}, nil
-}
-
-func (agent *LogicalAgent) deleteFlowsOfMeter(ctx context.Context, meterID uint32) error {
- logger.Infow("Delete-flows-matching-meter", log.Fields{"meter": meterID})
- agent.flowLock.Lock()
- defer agent.flowLock.Unlock()
- for flowID, flowChunk := range agent.flows {
- if mID := fu.GetMeterIdFromFlow(flowChunk.flow); mID != 0 && mID == meterID {
- logger.Debugw("Flow-to-be- deleted", log.Fields{"flow": flowChunk.flow})
- path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- //TODO: Think on carrying on and deleting the remaining flows, instead of returning.
- //Anyways this returns an error to controller which possibly results with a re-deletion.
- //Then how can we handle the new deletion request(Same for group deletion)?
- return fmt.Errorf("couldnt-deleted-flow-from-store-%s", path)
- }
- delete(agent.flows, flowID)
- }
- }
- return nil
-}
-
-func (agent *LogicalAgent) deleteFlowsOfGroup(ctx context.Context, groupID uint32) ([]*ofp.OfpFlowStats, error) {
- logger.Infow("Delete-flows-matching-group", log.Fields{"groupID": groupID})
- var flowsRemoved []*ofp.OfpFlowStats
- agent.flowLock.Lock()
- defer agent.flowLock.Unlock()
- for flowID, flowChunk := range agent.flows {
- if fu.FlowHasOutGroup(flowChunk.flow, groupID) {
- path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- return nil, fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
- }
- delete(agent.flows, flowID)
- flowsRemoved = append(flowsRemoved, flowChunk.flow)
- }
- }
- return flowsRemoved, nil
-}
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index 73caa07..e126fdc 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -19,7 +19,6 @@
import (
"context"
"fmt"
- "strconv"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
@@ -30,6 +29,19 @@
"google.golang.org/grpc/status"
)
+// listLogicalDeviceGroups returns logical device flow groups
+func (agent *LogicalAgent) listLogicalDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
+ groupIDs := agent.groupLoader.List()
+ groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
+ for groupID := range groupIDs {
+ if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ groups[groupID] = groupHandle.GetReadOnly()
+ groupHandle.Unlock()
+ }
+ }
+ return groups
+}
+
//updateGroupTable updates the group table of that logical device
func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
logger.Debug("updateGroupTable")
@@ -53,36 +65,22 @@
return nil
}
logger.Debugw("groupAdd", log.Fields{"GroupId": groupMod.GroupId})
- agent.groupLock.Lock()
- _, ok := agent.groups[groupMod.GroupId]
- if ok {
- agent.groupLock.Unlock()
- return fmt.Errorf("Group %d already exists", groupMod.GroupId)
- }
groupEntry := fu.GroupEntryFromGroupMod(groupMod)
- groupChunk := GroupChunk{
- group: groupEntry,
- }
- //add to map
- agent.groups[groupMod.GroupId] = &groupChunk
- groupChunk.lock.Lock()
- defer groupChunk.lock.Unlock()
- agent.groupLock.Unlock()
- //add to the kv store
- path := fmt.Sprintf("groups/%s", agent.logicalDeviceID)
- groupID := strconv.Itoa(int(groupMod.GroupId))
- if err := agent.clusterDataProxy.AddWithID(ctx, path, groupID, groupEntry); err != nil {
- logger.Errorw("failed-adding-group", log.Fields{"deviceID": agent.logicalDeviceID, "groupID": groupID, "err": err})
- agent.groupLock.Lock()
- delete(agent.groups, groupMod.GroupId)
- agent.groupLock.Unlock()
+
+ groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, groupEntry)
+ if err != nil {
return err
}
- deviceRules := fu.NewDeviceRules()
- deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
+ groupHandle.Unlock()
+
+ if !created {
+ return fmt.Errorf("group %d already exists", groupMod.GroupId)
+ }
+
fg := fu.NewFlowsAndGroups()
- fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
+ fg.AddGroup(groupEntry)
+ deviceRules := fu.NewDeviceRules()
deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
@@ -105,63 +103,42 @@
if groupMod == nil {
return nil
}
- affectedFlows := make([]*ofp.OfpFlowStats, 0)
- affectedGroups := make([]*ofp.OfpGroupEntry, 0)
- var groupsChanged bool
- groupID := groupMod.GroupId
- var err error
- if groupID == uint32(ofp.OfpGroup_OFPG_ALL) {
- if err := func() error {
- agent.groupLock.Lock()
- defer agent.groupLock.Unlock()
- for key, groupChunk := range agent.groups {
- //Remove from store and cache. Do this in a one time lock allocation.
- path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, key)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-deleted-group-from-store-%s", path)
- }
- delete(agent.groups, groupID)
- var flows []*ofp.OfpFlowStats
- if flows, err = agent.deleteFlowsOfGroup(ctx, key); err != nil {
- logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": key})
- return err
- }
- affectedFlows = append(affectedFlows, flows...)
- affectedGroups = append(affectedGroups, groupChunk.group)
- }
- return nil
- }(); err != nil {
- return err
- }
- groupsChanged = true
- } else {
- agent.groupLock.RLock()
- groupChunk, ok := agent.groups[groupID]
- agent.groupLock.RUnlock()
- if !ok {
- logger.Warnw("group-not-found", log.Fields{"groupID": groupID})
- return nil
- }
- groupChunk.lock.Lock()
- defer groupChunk.lock.Unlock()
- var flows []*ofp.OfpFlowStats
- if flows, err = agent.deleteFlowsOfGroup(ctx, groupID); err != nil {
- logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": groupID})
- return err
- }
- //remove from store
- if err := agent.removeLogicalDeviceFlowGroup(ctx, groupID); err != nil {
- return err
- }
- affectedFlows = append(affectedFlows, flows...)
- affectedGroups = append(affectedGroups, groupChunk.group)
- groupsChanged = true
+ affectedFlows := make(map[uint64]*ofp.OfpFlowStats)
+ affectedGroups := make(map[uint32]*ofp.OfpGroupEntry)
+ var groupsChanged bool
+
+ toDelete := map[uint32]struct{}{groupMod.GroupId: {}}
+ if groupMod.GroupId == uint32(ofp.OfpGroup_OFPG_ALL) {
+ toDelete = agent.groupLoader.List()
}
- if err != nil || groupsChanged {
- var deviceRules *fu.DeviceRules
- deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: affectedFlows}, ofp.FlowGroups{Items: affectedGroups})
+ for groupID := range toDelete {
+ if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ if err := groupHandle.Delete(ctx); err != nil {
+ return err
+ }
+ affectedGroups[groupID] = groupHandle.GetReadOnly()
+ groupHandle.Unlock()
+
+ //TODO: this is another case where ordering guarantees are not being made,
+ // group deletion does not guarantee deletion of corresponding flows.
+ // an error while deleting flows can cause inconsistent state.
+ flows, err := agent.deleteFlowsHavingGroup(ctx, groupID)
+ if err != nil {
+ logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": groupID})
+ return err
+ }
+ for flowID, flow := range flows {
+ affectedFlows[flowID] = flow
+ }
+ }
+ }
+ groupsChanged = true
+
+ //TODO: groupsChanged is always true here? use `len(affectedFlows)!=0` or `len(affectedGroups)!=0` instead?
+ if groupsChanged {
+ deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, affectedFlows, affectedGroups)
if err != nil {
return err
}
@@ -188,15 +165,13 @@
}
groupID := groupMod.GroupId
- agent.groupLock.RLock()
- groupChunk, ok := agent.groups[groupID]
- agent.groupLock.RUnlock()
- if !ok {
+
+ groupHandle, have := agent.groupLoader.Lock(groupID)
+ if !have {
return fmt.Errorf("group-absent:%d", groupID)
}
- //Don't let any other thread to make modifications to this group till all done here.
- groupChunk.lock.Lock()
- defer groupChunk.lock.Unlock()
+ defer groupHandle.Unlock()
+
//replace existing group entry with new group definition
groupEntry := fu.GroupEntryFromGroupMod(groupMod)
deviceRules := fu.NewDeviceRules()
@@ -206,8 +181,9 @@
deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
logger.Debugw("rules", log.Fields{"rules-for-group-modify": deviceRules.String()})
+
//update KV
- if err := agent.updateLogicalDeviceFlowGroup(ctx, groupEntry, groupChunk); err != nil {
+ if err := groupHandle.Update(ctx, groupEntry); err != nil {
logger.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
diff --git a/rw_core/core/device/logical_agent_group_loader.go b/rw_core/core/device/logical_agent_group_loader.go
deleted file mode 100644
index 53e4076..0000000
--- a/rw_core/core/device/logical_agent_group_loader.go
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package device
-
-import (
- "context"
- "fmt"
- "sync"
-
- "github.com/gogo/protobuf/proto"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
-)
-
-//GroupChunk keeps a group entry and its lock. The lock in the struct is used to syncronize the
-//modifications for the related group.
-type GroupChunk struct {
- group *ofp.OfpGroupEntry
- lock sync.Mutex
-}
-
-func (agent *LogicalAgent) loadGroups(ctx context.Context) {
- agent.groupLock.Lock()
- defer agent.groupLock.Unlock()
-
- var groups []*ofp.OfpGroupEntry
- if err := agent.clusterDataProxy.List(ctx, "groups/"+agent.logicalDeviceID, &groups); err != nil {
- logger.Errorw("Failed-to-list-groups-from-proxy", log.Fields{"error": err})
- return
- }
- for _, group := range groups {
- if group.Desc != nil {
- groupChunk := GroupChunk{
- group: group,
- }
- agent.groups[group.Desc.GroupId] = &groupChunk
- }
- }
- logger.Infow("Groups-are-loaded-into-the-cache-from-store", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
-}
-
-//updateLogicalDeviceFlowGroup updates the flow groups in store and cache
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) updateLogicalDeviceFlowGroup(ctx context.Context, groupEntry *ofp.OfpGroupEntry, groupChunk *GroupChunk) error {
- path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, groupEntry.Desc.GroupId)
- if err := agent.clusterDataProxy.Update(ctx, path, groupEntry); err != nil {
- logger.Errorw("error-updating-logical-device-with-group", log.Fields{"error": err})
- return err
- }
- groupChunk.group = groupEntry
- return nil
-}
-
-//removeLogicalDeviceFlowGroup removes the flow groups in store and cache
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) removeLogicalDeviceFlowGroup(ctx context.Context, groupID uint32) error {
- path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, groupID)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-delete-group-from-store-%s", path)
- }
- agent.groupLock.Lock()
- defer agent.groupLock.Unlock()
- delete(agent.groups, groupID)
- return nil
-}
-
-// ListLogicalDeviceFlowGroups returns logical device flow groups
-func (agent *LogicalAgent) ListLogicalDeviceFlowGroups(ctx context.Context) (*ofp.FlowGroups, error) {
- logger.Debug("ListLogicalDeviceFlowGroups")
-
- var groupEntries []*ofp.OfpGroupEntry
- agent.groupLock.RLock()
- defer agent.groupLock.RUnlock()
- for _, value := range agent.groups {
- groupEntries = append(groupEntries, (proto.Clone(value.group)).(*ofp.OfpGroupEntry))
- }
- return &ofp.FlowGroups{Items: groupEntries}, nil
-}
diff --git a/rw_core/core/device/logical_agent_meter.go b/rw_core/core/device/logical_agent_meter.go
index c211f1e..dc44fda 100644
--- a/rw_core/core/device/logical_agent_meter.go
+++ b/rw_core/core/device/logical_agent_meter.go
@@ -19,7 +19,6 @@
import (
"context"
"fmt"
- "strconv"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -28,6 +27,19 @@
"google.golang.org/grpc/status"
)
+// listLogicalDeviceMeters returns logical device meters
+func (agent *LogicalAgent) listLogicalDeviceMeters() map[uint32]*ofp.OfpMeterEntry {
+ meterIDs := agent.meterLoader.List()
+ meters := make(map[uint32]*ofp.OfpMeterEntry, len(meterIDs))
+ for meterID := range meterIDs {
+ if meterHandle, have := agent.meterLoader.Lock(meterID); have {
+ meters[meterID] = meterHandle.GetReadOnly()
+ meterHandle.Unlock()
+ }
+ }
+ return meters
+}
+
// updateMeterTable updates the meter table of that logical device
func (agent *LogicalAgent) updateMeterTable(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
logger.Debug("updateMeterTable")
@@ -53,34 +65,18 @@
}
meterEntry := fu.MeterEntryFromMeterMod(meterMod)
- agent.meterLock.Lock()
- //check if the meter already exists or not
- _, ok := agent.meters[meterMod.MeterId]
- if ok {
- logger.Infow("Meter-already-exists", log.Fields{"meter": *meterMod})
- agent.meterLock.Unlock()
- return nil
- }
- mChunk := MeterChunk{
- meter: meterEntry,
- }
- //Add to map and acquire the per meter lock
- agent.meters[meterMod.MeterId] = &mChunk
- mChunk.lock.Lock()
- defer mChunk.lock.Unlock()
- agent.meterLock.Unlock()
- meterID := strconv.Itoa(int(meterMod.MeterId))
- if err := agent.clusterDataProxy.AddWithID(ctx, "meters/"+agent.logicalDeviceID, meterID, meterEntry); err != nil {
- logger.Errorw("failed-adding-meter", log.Fields{"deviceID": agent.logicalDeviceID, "meterID": meterID, "err": err})
- //Revert the map
- agent.meterLock.Lock()
- delete(agent.meters, meterMod.MeterId)
- agent.meterLock.Unlock()
+ meterHandle, created, err := agent.meterLoader.LockOrCreate(ctx, meterEntry)
+ if err != nil {
return err
}
+ defer meterHandle.Unlock()
- logger.Debugw("Meter-added-successfully", log.Fields{"Added-meter": meterEntry})
+ if created {
+ logger.Debugw("Meter-added-successfully", log.Fields{"Added-meter": meterEntry})
+ } else {
+ logger.Infow("Meter-already-exists", log.Fields{"meter": *meterMod})
+ }
return nil
}
@@ -89,25 +85,25 @@
if meterMod == nil {
return nil
}
- agent.meterLock.RLock()
- meterChunk, ok := agent.meters[meterMod.MeterId]
- agent.meterLock.RUnlock()
- if ok {
- //Dont let anyone to do any changes to this meter until this is done.
- //And wait if someone else is already making modifications. Do this with per meter lock.
- meterChunk.lock.Lock()
- defer meterChunk.lock.Unlock()
- if err := agent.deleteFlowsOfMeter(ctx, meterMod.MeterId); err != nil {
- return err
- }
- //remove from the store and cache
- if err := agent.removeLogicalDeviceMeter(ctx, meterMod.MeterId); err != nil {
- return err
- }
- logger.Debugw("meterDelete-success", log.Fields{"meterID": meterMod.MeterId})
- } else {
+
+ meterHandle, have := agent.meterLoader.Lock(meterMod.MeterId)
+ if !have {
logger.Warnw("meter-not-found", log.Fields{"meterID": meterMod.MeterId})
+ return nil
}
+ defer meterHandle.Unlock()
+
+ //TODO: A meter lock is held here while flow lock(s) are acquired, if this is done in opposite order anywhere
+ // there's potential for deadlock.
+ if err := agent.deleteFlowsHavingMeter(ctx, meterMod.MeterId); err != nil {
+ return err
+ }
+
+ if err := meterHandle.Delete(ctx); err != nil {
+ return err
+ }
+
+ logger.Debugw("meterDelete-success", log.Fields{"meterID": meterMod.MeterId})
return nil
}
@@ -116,21 +112,18 @@
if meterMod == nil {
return nil
}
- newMeter := fu.MeterEntryFromMeterMod(meterMod)
- agent.meterLock.RLock()
- meterChunk, ok := agent.meters[newMeter.Config.MeterId]
- agent.meterLock.RUnlock()
- if !ok {
- return fmt.Errorf("no-meter-to-modify:%d", newMeter.Config.MeterId)
+
+ meterHandle, have := agent.meterLoader.Lock(meterMod.MeterId)
+ if !have {
+ return fmt.Errorf("no-meter-to-modify: %d", meterMod.MeterId)
}
- //Release the map lock and syncronize per meter
- meterChunk.lock.Lock()
- defer meterChunk.lock.Unlock()
- oldMeter := meterChunk.meter
+ defer meterHandle.Unlock()
+
+ oldMeter := meterHandle.GetReadOnly()
+ newMeter := fu.MeterEntryFromMeterMod(meterMod)
newMeter.Stats.FlowCount = oldMeter.Stats.FlowCount
- if err := agent.updateLogicalDeviceMeter(ctx, newMeter, meterChunk); err != nil {
- logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "meterID": newMeter.Config.MeterId})
+ if err := meterHandle.Update(ctx, newMeter); err != nil {
return err
}
logger.Debugw("replaced-with-new-meter", log.Fields{"oldMeter": oldMeter, "newMeter": newMeter})
diff --git a/rw_core/core/device/logical_agent_meter_helpers.go b/rw_core/core/device/logical_agent_meter_helpers.go
new file mode 100644
index 0000000..fc09568
--- /dev/null
+++ b/rw_core/core/device/logical_agent_meter_helpers.go
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "fmt"
+
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+)
+
+// GetMeterConfig returns meters which which are used by the given flows
+func (agent *LogicalAgent) GetMeterConfig(flows map[uint64]*ofp.OfpFlowStats) (map[uint32]*ofp.OfpMeterConfig, error) {
+ metersConfig := make(map[uint32]*ofp.OfpMeterConfig)
+ for _, flow := range flows {
+ if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 {
+ if _, have := metersConfig[flowMeterID]; !have {
+ // Meter is present in the flow, Get from logical device
+ meterHandle, have := agent.meterLoader.Lock(flowMeterID)
+ if !have {
+ logger.Errorw("Meter-referred-by-flow-is-not-found-in-logicaldevice",
+ log.Fields{"meterID": flowMeterID, "Available-meters": metersConfig, "flow": *flow})
+ return nil, fmt.Errorf("Meter-referred-by-flow-is-not-found-in-logicaldevice.MeterId-%d", flowMeterID)
+ }
+
+ meter := meterHandle.GetReadOnly()
+ metersConfig[flowMeterID] = meter.Config
+ logger.Debugw("Found meter in logical device", log.Fields{"meterID": flowMeterID, "meter-band": meter.Config})
+
+ meterHandle.Unlock()
+ }
+ }
+ }
+ logger.Debugw("meter-bands-for-flows", log.Fields{"flows": len(flows), "meters": metersConfig})
+ return metersConfig, nil
+}
+
+// updateFlowCountOfMeterStats updates the number of flows associated with this meter
+func (agent *LogicalAgent) updateFlowCountOfMeterStats(ctx context.Context, modCommand *ofp.OfpFlowMod, flow *ofp.OfpFlowStats, revertUpdate bool) bool {
+ flowCommand := modCommand.GetCommand()
+ meterID := fu.GetMeterIdFromFlow(flow)
+ logger.Debugw("Meter-id-in-flow-mod", log.Fields{"meterId": meterID})
+ if meterID == 0 {
+ logger.Debugw("No-meter-present-in-the-flow", log.Fields{"flow": *flow})
+ return true
+ }
+
+ if flowCommand != ofp.OfpFlowModCommand_OFPFC_ADD && flowCommand != ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
+ return true
+ }
+
+ meterHandle, have := agent.meterLoader.Lock(meterID)
+ if !have {
+ logger.Debugw("Meter-is-not-present-in-logical-device", log.Fields{"meterID": meterID})
+ return true
+ }
+ defer meterHandle.Unlock()
+
+ oldMeter := meterHandle.GetReadOnly()
+ // avoiding using proto.Clone by only copying what have changed (this assumes that the oldMeter will never be modified)
+ newStats := *oldMeter.Stats
+ if flowCommand == ofp.OfpFlowModCommand_OFPFC_ADD {
+ if revertUpdate {
+ newStats.FlowCount--
+ } else {
+ newStats.FlowCount++
+ }
+ } else if flowCommand == ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
+ if revertUpdate {
+ newStats.FlowCount++
+ } else {
+ newStats.FlowCount--
+ }
+ }
+
+ newMeter := &ofp.OfpMeterEntry{
+ Config: oldMeter.Config,
+ Stats: &newStats,
+ }
+ if err := meterHandle.Update(ctx, newMeter); err != nil {
+ logger.Debugw("unable-to-update-meter-in-db", log.Fields{"logicalDevice": agent.logicalDeviceID, "meterID": meterID})
+ return false
+ }
+
+ logger.Debugw("updated-meter-flow-stats", log.Fields{"meterId": meterID})
+ return true
+}
diff --git a/rw_core/core/device/logical_agent_meter_loader.go b/rw_core/core/device/logical_agent_meter_loader.go
deleted file mode 100644
index 4408bf1..0000000
--- a/rw_core/core/device/logical_agent_meter_loader.go
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package device
-
-import (
- "context"
- "fmt"
- "sync"
-
- "github.com/gogo/protobuf/proto"
- fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
- "github.com/opencord/voltha-protos/v3/go/voltha"
-)
-
-//MeterChunk keeps a meter entry and its lock. The lock in the struct is used to syncronize the
-//modifications for the related meter.
-type MeterChunk struct {
- meter *ofp.OfpMeterEntry
- lock sync.Mutex
-}
-
-func (agent *LogicalAgent) loadMeters(ctx context.Context) {
- agent.meterLock.Lock()
- defer agent.meterLock.Unlock()
-
- var meters []*ofp.OfpMeterEntry
- if err := agent.clusterDataProxy.List(ctx, "meters/"+agent.logicalDeviceID, &meters); err != nil {
- logger.Errorw("Failed-to-list-meters-from-proxy", log.Fields{"error": err})
- return
- }
- for _, meter := range meters {
- if meter.Config != nil {
- meterChunk := MeterChunk{
- meter: meter,
- }
- agent.meters[meter.Config.MeterId] = &meterChunk
- }
- }
-}
-
-//updateLogicalDeviceMeter updates meter info in store and cache
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) updateLogicalDeviceMeter(ctx context.Context, meter *ofp.OfpMeterEntry, meterChunk *MeterChunk) error {
- path := fmt.Sprintf("meters/%s/%d", agent.logicalDeviceID, meter.Config.MeterId)
- if err := agent.clusterDataProxy.Update(ctx, path, meter); err != nil {
- logger.Errorw("error-updating-logical-device-with-meters", log.Fields{"error": err})
- return err
- }
- meterChunk.meter = meter
- return nil
-}
-
-//removeLogicalDeviceMeter deletes the meter from store and cache
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) removeLogicalDeviceMeter(ctx context.Context, meterID uint32) error {
- path := fmt.Sprintf("meters/%s/%d", agent.logicalDeviceID, meterID)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-delete-meter-from-store-%s", path)
- }
- agent.meterLock.Lock()
- defer agent.meterLock.Unlock()
- delete(agent.meters, meterID)
- return nil
-}
-
-// ListLogicalDeviceMeters returns logical device meters
-func (agent *LogicalAgent) ListLogicalDeviceMeters(ctx context.Context) (*ofp.Meters, error) {
- logger.Debug("ListLogicalDeviceMeters")
-
- var meterEntries []*ofp.OfpMeterEntry
- agent.meterLock.RLock()
- defer agent.meterLock.RUnlock()
- for _, meterChunk := range agent.meters {
- meterEntries = append(meterEntries, (proto.Clone(meterChunk.meter)).(*ofp.OfpMeterEntry))
- }
- return &ofp.Meters{Items: meterEntries}, nil
-}
-
-// GetMeterConfig returns meter config
-func (agent *LogicalAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
- m := make(map[uint32]bool)
- for _, flow := range flows {
- if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 && !m[flowMeterID] {
- foundMeter := false
- // Meter is present in the flow , Get from logical device
- for _, meter := range meters {
- if flowMeterID == meter.Config.MeterId {
- metadata.Meters = append(metadata.Meters, meter.Config)
- logger.Debugw("Found meter in logical device",
- log.Fields{"meterID": flowMeterID, "meter-band": meter.Config})
- m[flowMeterID] = true
- foundMeter = true
- break
- }
- }
- if !foundMeter {
- logger.Errorw("Meter-referred-by-flow-is-not-found-in-logicaldevice",
- log.Fields{"meterID": flowMeterID, "Available-meters": meters, "flow": *flow})
- return fmt.Errorf("Meter-referred-by-flow-is-not-found-in-logicaldevice.MeterId-%d", flowMeterID)
- }
- }
- }
- logger.Debugw("meter-bands-for-flows", log.Fields{"flows": len(flows), "metadata": metadata})
- return nil
-}
-
-func (agent *LogicalAgent) updateFlowCountOfMeterStats(ctx context.Context, modCommand *ofp.OfpFlowMod, flow *ofp.OfpFlowStats, revertUpdate bool) bool {
- flowCommand := modCommand.GetCommand()
- meterID := fu.GetMeterIdFromFlow(flow)
- logger.Debugw("Meter-id-in-flow-mod", log.Fields{"meterId": meterID})
- if meterID == 0 {
- logger.Debugw("No-meter-present-in-the-flow", log.Fields{"flow": *flow})
- return true
- }
-
- if flowCommand != ofp.OfpFlowModCommand_OFPFC_ADD && flowCommand != ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
- return true
- }
- agent.meterLock.RLock()
- meterChunk, ok := agent.meters[meterID]
- agent.meterLock.RUnlock()
- if !ok {
- logger.Debugw("Meter-is-not-present-in-logical-device", log.Fields{"meterID": meterID})
- return true
- }
-
- //acquire the meter lock
- meterChunk.lock.Lock()
- defer meterChunk.lock.Unlock()
-
- if flowCommand == ofp.OfpFlowModCommand_OFPFC_ADD {
- if revertUpdate {
- meterChunk.meter.Stats.FlowCount--
- } else {
- meterChunk.meter.Stats.FlowCount++
- }
- } else if flowCommand == ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
- if revertUpdate {
- meterChunk.meter.Stats.FlowCount++
- } else {
- meterChunk.meter.Stats.FlowCount--
- }
- }
-
- // Update store and cache
- if err := agent.updateLogicalDeviceMeter(ctx, meterChunk.meter, meterChunk); err != nil {
- logger.Debugw("unable-to-update-meter-in-db", log.Fields{"logicalDevice": agent.logicalDeviceID, "meterID": meterID})
- return false
- }
-
- logger.Debugw("updated-meter-flow-stats", log.Fields{"meterId": meterID})
- return true
-}
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 8ec5454..f5d404e 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -571,9 +571,12 @@
localWG.Wait()
meterEntry := fu.MeterEntryFromMeterMod(meterMod)
- meterChunk, ok := ldAgent.meters[meterMod.MeterId]
- assert.Equal(t, ok, true)
- assert.True(t, proto.Equal(meterEntry, meterChunk.meter))
+ meterHandle, have := ldAgent.meterLoader.Lock(meterMod.MeterId)
+ assert.Equal(t, have, true)
+ if have {
+ assert.True(t, proto.Equal(meterEntry, meterHandle.GetReadOnly()))
+ meterHandle.Unlock()
+ }
expectedChange := proto.Clone(originalLogicalDevice).(*voltha.LogicalDevice)
expectedChange.Ports[0].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index d9b6731..cd7bce1 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -19,15 +19,15 @@
import (
"context"
"errors"
- "github.com/golang/protobuf/ptypes/empty"
- "github.com/opencord/voltha-go/rw_core/core/device/event"
- "github.com/opencord/voltha-go/rw_core/utils"
"io"
"strings"
"sync"
"time"
+ "github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/rw_core/core/device/event"
+ "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/openflow_13"
@@ -297,19 +297,35 @@
// ListLogicalDeviceFlows returns the flows of logical device
func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
logger.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id.Id})
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
- return agent.ListLogicalDeviceFlows(ctx)
+ agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- return nil, status.Errorf(codes.NotFound, "%s", id.Id)
+
+ flows := agent.listLogicalDeviceFlows()
+ ctr, ret := 0, make([]*openflow_13.OfpFlowStats, len(flows))
+ for _, flow := range flows {
+ ret[ctr] = flow
+ ctr++
+ }
+ return &openflow_13.Flows{Items: ret}, nil
}
// ListLogicalDeviceFlowGroups returns logical device flow groups
func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
logger.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id.Id})
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
- return agent.ListLogicalDeviceFlowGroups(ctx)
+ agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- return nil, status.Errorf(codes.NotFound, "%s", id.Id)
+
+ groups := agent.listLogicalDeviceGroups()
+ ctr, ret := 0, make([]*openflow_13.OfpGroupEntry, len(groups))
+ for _, group := range groups {
+ ret[ctr] = group
+ ctr++
+ }
+ return &openflow_13.FlowGroups{Items: ret}, nil
}
// ListLogicalDevicePorts returns logical device ports
@@ -503,7 +519,13 @@
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- return agent.ListLogicalDeviceMeters(ctx)
+ meters := agent.listLogicalDeviceMeters()
+ ctr, ret := 0, make([]*openflow_13.OfpMeterEntry, len(meters))
+ for _, meter := range meters {
+ ret[ctr] = meter
+ ctr++
+ }
+ return &openflow_13.Meters{Items: ret}, nil
}
// UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
diff --git a/rw_core/core/device/meter/common.go b/rw_core/core/device/meter/common.go
new file mode 100644
index 0000000..81f7b14
--- /dev/null
+++ b/rw_core/core/device/meter/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package meter
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "meter"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/rw_core/core/device/meter/loader.go b/rw_core/core/device/meter/loader.go
new file mode 100644
index 0000000..ae6c957
--- /dev/null
+++ b/rw_core/core/device/meter/loader.go
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package meter
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "sync"
+
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// Loader hides all low-level locking & synchronization related to meter state updates
+type Loader struct {
+ // this lock protects the meters map, it does not protect individual meters
+ lock sync.RWMutex
+ meters map[uint32]*chunk
+
+ dbProxy *model.Proxy
+ logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+}
+
+// chunk keeps a meter and the lock for this meter
+type chunk struct {
+ // this lock is used to synchronize all access to the meter, and also to the "deleted" variable
+ lock sync.Mutex
+ deleted bool
+
+ meter *ofp.OfpMeterEntry
+}
+
+func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+ return &Loader{
+ meters: make(map[uint32]*chunk),
+ dbProxy: dataProxy,
+ logicalDeviceID: logicalDeviceID,
+ }
+}
+
+// Load queries existing meters from the kv,
+// and should only be called once when first created.
+func (loader *Loader) Load(ctx context.Context) {
+ loader.lock.Lock()
+ defer loader.lock.Unlock()
+
+ var meters []*ofp.OfpMeterEntry
+ if err := loader.dbProxy.List(ctx, "logical_meters/"+loader.logicalDeviceID, &meters); err != nil {
+ logger.Errorw("failed-to-list-meters-from-cluster-data-proxy", log.Fields{"error": err})
+ return
+ }
+ for _, meter := range meters {
+ loader.meters[meter.Config.MeterId] = &chunk{meter: meter}
+ }
+}
+
+// LockOrCreate locks this meter if it exists, or creates a new meter if it does not.
+// In the case of meter creation, the provided "meter" must not be modified afterwards.
+func (loader *Loader) LockOrCreate(ctx context.Context, meter *ofp.OfpMeterEntry) (*Handle, bool, error) {
+ // try to use read lock instead of full lock if possible
+ if handle, have := loader.Lock(meter.Config.MeterId); have {
+ return handle, false, nil
+ }
+
+ loader.lock.Lock()
+ entry, have := loader.meters[meter.Config.MeterId]
+ if !have {
+ entry := &chunk{meter: meter}
+ loader.meters[meter.Config.MeterId] = entry
+ entry.lock.Lock()
+ loader.lock.Unlock()
+
+ meterID := strconv.FormatUint(uint64(meter.Config.MeterId), 10)
+ if err := loader.dbProxy.AddWithID(ctx, "logical_meters/"+loader.logicalDeviceID, meterID, meter); err != nil {
+ logger.Errorw("failed-adding-meter-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "meterID": meterID, "err": err})
+
+ // revert the map
+ loader.lock.Lock()
+ delete(loader.meters, meter.Config.MeterId)
+ loader.lock.Unlock()
+
+ entry.deleted = true
+ entry.lock.Unlock()
+ return nil, false, err
+ }
+ return &Handle{loader: loader, chunk: entry}, true, nil
+ }
+ loader.lock.Unlock()
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return loader.LockOrCreate(ctx, meter)
+ }
+ return &Handle{loader: loader, chunk: entry}, false, nil
+}
+
+// Lock acquires the lock for this meter, and returns a handle which can be used to access the meter until it's unlocked.
+// This handle ensures that the meter cannot be accessed if the lock is not held.
+// Returns false if the meter is not present.
+// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
+func (loader *Loader) Lock(id uint32) (*Handle, bool) {
+ loader.lock.RLock()
+ entry, have := loader.meters[id]
+ loader.lock.RUnlock()
+
+ if !have {
+ return nil, false
+ }
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return loader.Lock(id)
+ }
+ return &Handle{loader: loader, chunk: entry}, true
+}
+
+type Handle struct {
+ loader *Loader
+ chunk *chunk
+}
+
+// GetReadOnly returns an *ofp.OfpMeterEntry which MUST NOT be modified externally, but which is safe to keep indefinitely
+func (h *Handle) GetReadOnly() *ofp.OfpMeterEntry {
+ return h.chunk.meter
+}
+
+// Update updates an existing meter in the kv.
+// The provided "meter" must not be modified afterwards.
+func (h *Handle) Update(ctx context.Context, meter *ofp.OfpMeterEntry) error {
+ path := fmt.Sprintf("logical_meters/%s/%d", h.loader.logicalDeviceID, meter.Config.MeterId)
+ if err := h.loader.dbProxy.Update(ctx, path, meter); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-meter:%s:%d %s", h.loader.logicalDeviceID, meter.Config.MeterId, err)
+ }
+ h.chunk.meter = meter
+ return nil
+}
+
+// Delete removes the device from the kv
+func (h *Handle) Delete(ctx context.Context) error {
+ path := fmt.Sprintf("logical_meters/%s/%d", h.loader.logicalDeviceID, h.chunk.meter.Config.MeterId)
+ if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
+ return fmt.Errorf("couldnt-delete-meter-from-store-%s", path)
+ }
+ h.chunk.deleted = true
+
+ h.loader.lock.Lock()
+ delete(h.loader.meters, h.chunk.meter.Config.MeterId)
+ h.loader.lock.Unlock()
+
+ h.Unlock()
+ return nil
+}
+
+// Unlock releases the lock on the meter
+func (h *Handle) Unlock() {
+ if h.chunk != nil {
+ h.chunk.lock.Unlock()
+ h.chunk = nil // attempting to access the meter through this handle in future will panic
+ }
+}
+
+// List returns a snapshot of all the managed meter IDs
+// TODO: iterating through meters safely is expensive now, since all meters are stored & locked separately
+// should avoid this where possible
+func (loader *Loader) List() map[uint32]struct{} {
+ loader.lock.RLock()
+ defer loader.lock.RUnlock()
+ // copy the IDs so caller can safely iterate
+ ret := make(map[uint32]struct{}, len(loader.meters))
+ for id := range loader.meters {
+ ret[id] = struct{}{}
+ }
+ return ret
+}