blob: 84d1a47bc199737ab37eb4aff7fb7efd61213926 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package device
import (
"context"
"fmt"
"sync"
"github.com/gogo/protobuf/proto"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
//FlowChunk keeps a flow and the lock for this flow. The lock in the struct is used to syncronize the
//modifications for the related flow.
type FlowChunk struct {
flow *ofp.OfpFlowStats
lock sync.Mutex
}
func (agent *LogicalAgent) loadFlows(ctx context.Context) {
agent.flowLock.Lock()
defer agent.flowLock.Unlock()
var flowList []*ofp.OfpFlowStats
if err := agent.clusterDataProxy.List(ctx, "logical_flows/"+agent.logicalDeviceID, &flowList); err != nil {
logger.Errorw("Failed-to-list-logicalflows-from-cluster-data-proxy", log.Fields{"error": err})
return
}
for _, flow := range flowList {
if flow != nil {
flowsChunk := FlowChunk{
flow: flow,
}
agent.flows[flow.Id] = &flowsChunk
}
}
}
//updateLogicalDeviceFlow updates flow in the store and cache
//It is assumed that the chunk lock has been acquired before this function is called
func (agent *LogicalAgent) updateLogicalDeviceFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowChunk *FlowChunk) error {
path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flow.Id)
if err := agent.clusterDataProxy.Update(ctx, path, flow); err != nil {
return status.Errorf(codes.Internal, "failed-update-flow:%s:%d %s", agent.logicalDeviceID, flow.Id, err)
}
flowChunk.flow = flow
return nil
}
//removeLogicalDeviceFlow deletes the flow from store and cache.
//It is assumed that the chunk lock has been acquired before this function is called
func (agent *LogicalAgent) removeLogicalDeviceFlow(ctx context.Context, flowID uint64) error {
path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
return fmt.Errorf("couldnt-delete-flow-from-the-store-%s", path)
}
agent.flowLock.Lock()
defer agent.flowLock.Unlock()
delete(agent.flows, flowID)
return nil
}
// ListLogicalDeviceFlows returns logical device flows
func (agent *LogicalAgent) ListLogicalDeviceFlows(ctx context.Context) (*ofp.Flows, error) {
logger.Debug("ListLogicalDeviceFlows")
var flowStats []*ofp.OfpFlowStats
agent.flowLock.RLock()
defer agent.flowLock.RUnlock()
for _, flowChunk := range agent.flows {
flowStats = append(flowStats, (proto.Clone(flowChunk.flow)).(*ofp.OfpFlowStats))
}
return &ofp.Flows{Items: flowStats}, nil
}
func (agent *LogicalAgent) deleteFlowsOfMeter(ctx context.Context, meterID uint32) error {
logger.Infow("Delete-flows-matching-meter", log.Fields{"meter": meterID})
agent.flowLock.Lock()
defer agent.flowLock.Unlock()
for flowID, flowChunk := range agent.flows {
if mID := fu.GetMeterIdFromFlow(flowChunk.flow); mID != 0 && mID == meterID {
logger.Debugw("Flow-to-be- deleted", log.Fields{"flow": flowChunk.flow})
path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
//TODO: Think on carrying on and deleting the remaining flows, instead of returning.
//Anyways this returns an error to controller which possibly results with a re-deletion.
//Then how can we handle the new deletion request(Same for group deletion)?
return fmt.Errorf("couldnt-deleted-flow-from-store-%s", path)
}
delete(agent.flows, flowID)
}
}
return nil
}
func (agent *LogicalAgent) deleteFlowsOfGroup(ctx context.Context, groupID uint32) ([]*ofp.OfpFlowStats, error) {
logger.Infow("Delete-flows-matching-group", log.Fields{"groupID": groupID})
var flowsRemoved []*ofp.OfpFlowStats
agent.flowLock.Lock()
defer agent.flowLock.Unlock()
for flowID, flowChunk := range agent.flows {
if fu.FlowHasOutGroup(flowChunk.flow, groupID) {
path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
return nil, fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
}
delete(agent.flows, flowID)
flowsRemoved = append(flowsRemoved, flowChunk.flow)
}
}
return flowsRemoved, nil
}