blob: 0e811e761571488bbd6be45fa8eab42e2834afae [file] [log] [blame]
Kent Hagerman3136fbd2020-05-14 10:30:45 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
20 "context"
21 "errors"
22 "fmt"
Kent Hagerman3136fbd2020-05-14 10:30:45 -040023 "github.com/gogo/protobuf/proto"
24 "github.com/opencord/voltha-go/rw_core/route"
25 coreutils "github.com/opencord/voltha-go/rw_core/utils"
26 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
27 "github.com/opencord/voltha-lib-go/v3/pkg/log"
28 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
29 "github.com/opencord/voltha-protos/v3/go/voltha"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/status"
khenaidoo0db4c812020-05-27 15:27:30 -040032 "strconv"
Kent Hagerman3136fbd2020-05-14 10:30:45 -040033)
34
35//updateFlowTable updates the flow table of that logical device
36func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
37 logger.Debug("UpdateFlowTable")
38 if flow == nil {
39 return nil
40 }
41
Kent Hagerman3136fbd2020-05-14 10:30:45 -040042 switch flow.GetCommand() {
43 case ofp.OfpFlowModCommand_OFPFC_ADD:
44 return agent.flowAdd(ctx, flow)
45 case ofp.OfpFlowModCommand_OFPFC_DELETE:
46 return agent.flowDelete(ctx, flow)
47 case ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT:
48 return agent.flowDeleteStrict(ctx, flow)
49 case ofp.OfpFlowModCommand_OFPFC_MODIFY:
50 return agent.flowModify(flow)
51 case ofp.OfpFlowModCommand_OFPFC_MODIFY_STRICT:
52 return agent.flowModifyStrict(flow)
53 }
54 return status.Errorf(codes.Internal,
55 "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.GetCommand())
56}
57
58//flowAdd adds a flow to the flow table of that logical device
59func (agent *LogicalAgent) flowAdd(ctx context.Context, mod *ofp.OfpFlowMod) error {
60 logger.Debugw("flowAdd", log.Fields{"flow": mod})
61 if mod == nil {
62 return nil
63 }
64 flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
65 if err != nil {
66 logger.Errorw("flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
67 return err
68 }
69 var updated bool
70 var changed bool
71 if changed, updated, err = agent.decomposeAndAdd(ctx, flow, mod); err != nil {
72 logger.Errorw("flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
73 return err
74 }
75 if changed && !updated {
76 if dbupdated := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !dbupdated {
77 return fmt.Errorf("couldnt-updated-flow-stats-%s", strconv.FormatUint(flow.Id, 10))
78 }
79 }
80 return nil
81
82}
83
84func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, mod *ofp.OfpFlowMod) (bool, bool, error) {
85 changed := false
86 updated := false
87 alreadyExist := true
88 var flowToReplace *ofp.OfpFlowStats
89
90 //if flow is not found in the map, create a new entry, otherwise get the existing one.
91 agent.flowLock.Lock()
92 flowChunk, ok := agent.flows[flow.Id]
93 if !ok {
94 flowChunk = &FlowChunk{
95 flow: flow,
96 }
97 agent.flows[flow.Id] = flowChunk
98 alreadyExist = false
99 flowChunk.lock.Lock() //acquire chunk lock before releasing map lock
100 defer flowChunk.lock.Unlock()
101 agent.flowLock.Unlock()
102 } else {
103 agent.flowLock.Unlock() //release map lock before acquiring chunk lock
104 flowChunk.lock.Lock()
105 defer flowChunk.lock.Unlock()
106 }
107
108 if !alreadyExist {
109 flowID := strconv.FormatUint(flow.Id, 10)
110 if err := agent.clusterDataProxy.AddWithID(ctx, "logical_flows/"+agent.logicalDeviceID, flowID, flow); err != nil {
111 logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": agent.logicalDeviceID, "flowID": flowID, "err": err})
112 //Revert the map
113 //TODO: Solve the condition:If we have two flow Adds of the same flow (at least same priority and match) in quick succession
114 //then if the first one fails while the second one was waiting on the flowchunk, we will end up with an instance of flowChunk that is no longer in the map.
115 agent.flowLock.Lock()
116 delete(agent.flows, flow.Id)
117 agent.flowLock.Unlock()
118 return changed, updated, err
119 }
120 }
121 flows := make([]*ofp.OfpFlowStats, 0)
122 updatedFlows := make([]*ofp.OfpFlowStats, 0)
123 checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
124 if checkOverlap {
125 if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
126 // TODO: should this error be notified other than being logged?
127 logger.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
128 } else {
129 // Add flow
130 changed = true
131 }
132 } else {
133 if alreadyExist {
134 flowToReplace = flowChunk.flow
135 if (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_RESET_COUNTS)) != 0 {
136 flow.ByteCount = flowToReplace.ByteCount
137 flow.PacketCount = flowToReplace.PacketCount
138 }
139 if !proto.Equal(flowToReplace, flow) {
140 changed = true
141 updated = true
142 }
143 } else {
144 changed = true
145 }
146 }
147 logger.Debugw("flowAdd-changed", log.Fields{"changed": changed, "updated": updated})
148 if changed {
149 updatedFlows = append(updatedFlows, flow)
150 var flowMetadata voltha.FlowMetadata
151 lMeters, _ := agent.ListLogicalDeviceMeters(ctx)
152 if err := agent.GetMeterConfig(updatedFlows, lMeters.Items, &flowMetadata); err != nil {
153 logger.Error("Meter-referred-in-flow-not-present")
154 return changed, updated, err
155 }
156 flowGroups, _ := agent.ListLogicalDeviceFlowGroups(ctx)
157 deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *flowGroups)
158 if err != nil {
159 return changed, updated, err
160 }
161
162 logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
163 // Update store and cache
164 if updated {
165 if err := agent.updateLogicalDeviceFlow(ctx, flow, flowChunk); err != nil {
166 return changed, updated, err
167 }
168 }
khenaidoo0db4c812020-05-27 15:27:30 -0400169
170 respChannels := agent.addFlowsAndGroupsToDevices(deviceRules, &flowMetadata)
Kent Hagerman3136fbd2020-05-14 10:30:45 -0400171 // Create the go routines to wait
172 go func() {
173 // Wait for completion
174 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
175 logger.Infow("failed-to-add-flows-will-attempt-deletion", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
176 // Revert added flows
177 if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, &flowMetadata); err != nil {
178 logger.Errorw("failure-to-delete-flows-after-failed-addition", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
179 }
180 }
181 }()
182 }
183 return changed, updated, nil
184}
185
186// revertAddedFlows reverts flows after the flowAdd request has failed. All flows corresponding to that flowAdd request
187// will be reverted, both from the logical devices and the devices.
188func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
189 logger.Debugw("revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
190
191 agent.flowLock.RLock()
192 flowChunk, ok := agent.flows[addedFlow.Id]
193 agent.flowLock.RUnlock()
194 if !ok {
195 // Not found - do nothing
196 log.Debugw("flow-not-found", log.Fields{"added-flow": addedFlow})
197 return nil
198 }
199 //Leave the map lock and syncronize per flow
200 flowChunk.lock.Lock()
201 defer flowChunk.lock.Unlock()
202
203 if replacedFlow != nil {
204 if err := agent.updateLogicalDeviceFlow(ctx, replacedFlow, flowChunk); err != nil {
205 return err
206 }
207 } else {
208 if err := agent.removeLogicalDeviceFlow(ctx, addedFlow.Id); err != nil {
209 return err
210 }
211 }
212 // Revert meters
213 if changedMeterStats := agent.updateFlowCountOfMeterStats(ctx, mod, addedFlow, true); !changedMeterStats {
214 return fmt.Errorf("Unable-to-revert-meterstats-for-flow-%s", strconv.FormatUint(addedFlow.Id, 10))
215 }
216
217 // Update the devices
khenaidoo0db4c812020-05-27 15:27:30 -0400218 respChnls := agent.deleteFlowsAndGroupsFromDevices(deviceRules, metadata)
Kent Hagerman3136fbd2020-05-14 10:30:45 -0400219
220 // Wait for the responses
221 go func() {
222 // Since this action is taken following an add failure, we may also receive a failure for the revert
223 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
224 logger.Warnw("failure-reverting-added-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
225 }
226 }()
227
228 return nil
229}
230
231//flowDelete deletes a flow from the flow table of that logical device
232func (agent *LogicalAgent) flowDelete(ctx context.Context, mod *ofp.OfpFlowMod) error {
233 logger.Debug("flowDelete")
234 if mod == nil {
235 return nil
236 }
237
238 fs, err := fu.FlowStatsEntryFromFlowModMessage(mod)
239 if err != nil {
240 return err
241 }
242
243 //build a list of what to delete
244 toDelete := make([]*ofp.OfpFlowStats, 0)
245 toDeleteChunks := make([]*FlowChunk, 0)
246 //Lock the map to search the matched flows
247 agent.flowLock.RLock()
248 for _, f := range agent.flows {
249 if fu.FlowMatch(f.flow, fs) {
250 toDelete = append(toDelete, f.flow)
251 toDeleteChunks = append(toDeleteChunks, f)
252 continue
253 }
254 // Check wild card match
255 if fu.FlowMatchesMod(f.flow, mod) {
256 toDelete = append(toDelete, f.flow)
257 toDeleteChunks = append(toDeleteChunks, f)
258 }
259 }
260 agent.flowLock.RUnlock()
261 //Delete the matched flows
262 if len(toDelete) > 0 {
263 logger.Debugw("flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
264 var meters []*ofp.OfpMeterEntry
265 var flowGroups []*ofp.OfpGroupEntry
266 if ofpMeters, err := agent.ListLogicalDeviceMeters(ctx); err != nil {
267 meters = ofpMeters.Items
268 }
269
270 if groups, err := agent.ListLogicalDeviceFlowGroups(ctx); err != nil {
271 flowGroups = groups.Items
272 }
273
274 for _, fc := range toDeleteChunks {
275 if err := agent.deleteFlowAndUpdateMeterStats(ctx, mod, fc); err != nil {
276 return err
277 }
278 }
279 var flowMetadata voltha.FlowMetadata
280 if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
281 logger.Error("Meter-referred-in-flows-not-present")
282 return err
283 }
284 var respChnls []coreutils.Response
285 var partialRoute bool
286 var deviceRules *fu.DeviceRules
287 deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
288 if err != nil {
289 // A no route error means no route exists between the ports specified in the flow. This can happen when the
290 // child device is deleted and a request to delete flows from the parent device is received
291 if !errors.Is(err, route.ErrNoRoute) {
292 logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": toDelete, "error": err})
293 return err
294 }
295 partialRoute = true
296 }
297
298 // Update the devices
299 if partialRoute {
khenaidoo0db4c812020-05-27 15:27:30 -0400300 respChnls = agent.deleteFlowsFromParentDevice(ofp.Flows{Items: toDelete}, &flowMetadata)
Kent Hagerman3136fbd2020-05-14 10:30:45 -0400301 } else {
khenaidoo0db4c812020-05-27 15:27:30 -0400302 respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, &flowMetadata)
Kent Hagerman3136fbd2020-05-14 10:30:45 -0400303 }
304
305 // Wait for the responses
306 go func() {
307 // Wait for completion
308 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
309 logger.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
310 // TODO: Revert the flow deletion
311 }
312 }()
313 }
314 //TODO: send announcement on delete
315 return nil
316}
317
318//flowDeleteStrict deletes a flow from the flow table of that logical device
319func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
320 logger.Debugw("flowDeleteStrict", log.Fields{"mod": mod})
321 if mod == nil {
322 return nil
323 }
324
325 flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
326 if err != nil {
327 return err
328 }
329 logger.Debugw("flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
330 agent.flowLock.RLock()
331 flowChunk, ok := agent.flows[flow.Id]
332 agent.flowLock.RUnlock()
333 if !ok {
334 logger.Debugw("Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
335 return nil
336 }
337 //Release the map lock and syncronize per flow
338 flowChunk.lock.Lock()
339 defer flowChunk.lock.Unlock()
340
341 var meters []*ofp.OfpMeterEntry
342 var flowGroups []*ofp.OfpGroupEntry
343 if ofMeters, er := agent.ListLogicalDeviceMeters(ctx); er == nil {
344 meters = ofMeters.Items
345 }
346 if ofGroups, er := agent.ListLogicalDeviceFlowGroups(ctx); er == nil {
347 flowGroups = ofGroups.Items
348 }
349 if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !changedMeter {
350 return fmt.Errorf("Cannot delete flow - %s. Meter update failed", flow)
351 }
352
353 var flowMetadata voltha.FlowMetadata
354 flowsToDelete := []*ofp.OfpFlowStats{flowChunk.flow}
355 if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
356 logger.Error("meter-referred-in-flows-not-present")
357 return err
358 }
359 var respChnls []coreutils.Response
360 var partialRoute bool
361 deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
362 if err != nil {
363 // A no route error means no route exists between the ports specified in the flow. This can happen when the
364 // child device is deleted and a request to delete flows from the parent device is received
365 if !errors.Is(err, route.ErrNoRoute) {
366 logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": flowsToDelete, "error": err})
367 return err
368 }
369 partialRoute = true
370 }
371
372 // Update the model
373 if err := agent.removeLogicalDeviceFlow(ctx, flow.Id); err != nil {
374 return err
375 }
376 // Update the devices
377 if partialRoute {
khenaidoo0db4c812020-05-27 15:27:30 -0400378 respChnls = agent.deleteFlowsFromParentDevice(ofp.Flows{Items: flowsToDelete}, &flowMetadata)
Kent Hagerman3136fbd2020-05-14 10:30:45 -0400379 } else {
khenaidoo0db4c812020-05-27 15:27:30 -0400380 respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, &flowMetadata)
Kent Hagerman3136fbd2020-05-14 10:30:45 -0400381 }
382
383 // Wait for completion
384 go func() {
385 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
386 logger.Warnw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
387 //TODO: Revert flow changes
388 }
389 }()
390
391 return nil
392}
393
394//flowModify modifies a flow from the flow table of that logical device
395func (agent *LogicalAgent) flowModify(mod *ofp.OfpFlowMod) error {
396 return errors.New("flowModify not implemented")
397}
398
399//flowModifyStrict deletes a flow from the flow table of that logical device
400func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
401 return errors.New("flowModifyStrict not implemented")
402}