blob: 84d1a47bc199737ab37eb4aff7fb7efd61213926 [file] [log] [blame]
Kent Hagerman3136fbd2020-05-14 10:30:45 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
20 "context"
21 "fmt"
22 "sync"
23
24 "github.com/gogo/protobuf/proto"
25 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
26 "github.com/opencord/voltha-lib-go/v3/pkg/log"
27 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
30)
31
32//FlowChunk keeps a flow and the lock for this flow. The lock in the struct is used to syncronize the
33//modifications for the related flow.
34type FlowChunk struct {
35 flow *ofp.OfpFlowStats
36 lock sync.Mutex
37}
38
39func (agent *LogicalAgent) loadFlows(ctx context.Context) {
40 agent.flowLock.Lock()
41 defer agent.flowLock.Unlock()
42
43 var flowList []*ofp.OfpFlowStats
44 if err := agent.clusterDataProxy.List(ctx, "logical_flows/"+agent.logicalDeviceID, &flowList); err != nil {
45 logger.Errorw("Failed-to-list-logicalflows-from-cluster-data-proxy", log.Fields{"error": err})
46 return
47 }
48 for _, flow := range flowList {
49 if flow != nil {
50 flowsChunk := FlowChunk{
51 flow: flow,
52 }
53 agent.flows[flow.Id] = &flowsChunk
54 }
55 }
56}
57
58//updateLogicalDeviceFlow updates flow in the store and cache
59//It is assumed that the chunk lock has been acquired before this function is called
60func (agent *LogicalAgent) updateLogicalDeviceFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowChunk *FlowChunk) error {
61 path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flow.Id)
62 if err := agent.clusterDataProxy.Update(ctx, path, flow); err != nil {
63 return status.Errorf(codes.Internal, "failed-update-flow:%s:%d %s", agent.logicalDeviceID, flow.Id, err)
64 }
65 flowChunk.flow = flow
66 return nil
67}
68
69//removeLogicalDeviceFlow deletes the flow from store and cache.
70//It is assumed that the chunk lock has been acquired before this function is called
71func (agent *LogicalAgent) removeLogicalDeviceFlow(ctx context.Context, flowID uint64) error {
72 path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
73 if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
74 return fmt.Errorf("couldnt-delete-flow-from-the-store-%s", path)
75 }
76 agent.flowLock.Lock()
77 defer agent.flowLock.Unlock()
78 delete(agent.flows, flowID)
79 return nil
80}
81
82// ListLogicalDeviceFlows returns logical device flows
83func (agent *LogicalAgent) ListLogicalDeviceFlows(ctx context.Context) (*ofp.Flows, error) {
84 logger.Debug("ListLogicalDeviceFlows")
85 var flowStats []*ofp.OfpFlowStats
86 agent.flowLock.RLock()
87 defer agent.flowLock.RUnlock()
88 for _, flowChunk := range agent.flows {
89 flowStats = append(flowStats, (proto.Clone(flowChunk.flow)).(*ofp.OfpFlowStats))
90 }
91 return &ofp.Flows{Items: flowStats}, nil
92}
93
94func (agent *LogicalAgent) deleteFlowsOfMeter(ctx context.Context, meterID uint32) error {
95 logger.Infow("Delete-flows-matching-meter", log.Fields{"meter": meterID})
96 agent.flowLock.Lock()
97 defer agent.flowLock.Unlock()
98 for flowID, flowChunk := range agent.flows {
99 if mID := fu.GetMeterIdFromFlow(flowChunk.flow); mID != 0 && mID == meterID {
100 logger.Debugw("Flow-to-be- deleted", log.Fields{"flow": flowChunk.flow})
101 path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
102 if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
103 //TODO: Think on carrying on and deleting the remaining flows, instead of returning.
104 //Anyways this returns an error to controller which possibly results with a re-deletion.
105 //Then how can we handle the new deletion request(Same for group deletion)?
106 return fmt.Errorf("couldnt-deleted-flow-from-store-%s", path)
107 }
108 delete(agent.flows, flowID)
109 }
110 }
111 return nil
112}
113
114func (agent *LogicalAgent) deleteFlowsOfGroup(ctx context.Context, groupID uint32) ([]*ofp.OfpFlowStats, error) {
115 logger.Infow("Delete-flows-matching-group", log.Fields{"groupID": groupID})
116 var flowsRemoved []*ofp.OfpFlowStats
117 agent.flowLock.Lock()
118 defer agent.flowLock.Unlock()
119 for flowID, flowChunk := range agent.flows {
120 if fu.FlowHasOutGroup(flowChunk.flow, groupID) {
121 path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
122 if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
123 return nil, fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
124 }
125 delete(agent.flows, flowID)
126 flowsRemoved = append(flowsRemoved, flowChunk.flow)
127 }
128 }
129 return flowsRemoved, nil
130}