blob: 5d35251e211725c94840786d8100aa46f1983d16 [file] [log] [blame]
Kent Hagerman3136fbd2020-05-14 10:30:45 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
20 "context"
21 "errors"
22 "fmt"
23 "strconv"
24
25 "github.com/gogo/protobuf/proto"
26 "github.com/opencord/voltha-go/rw_core/route"
27 coreutils "github.com/opencord/voltha-go/rw_core/utils"
28 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
29 "github.com/opencord/voltha-lib-go/v3/pkg/log"
30 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
31 "github.com/opencord/voltha-protos/v3/go/voltha"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/status"
34)
35
36//updateFlowTable updates the flow table of that logical device
37func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
38 logger.Debug("UpdateFlowTable")
39 if flow == nil {
40 return nil
41 }
42
43 if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
44 return err
45 }
46 switch flow.GetCommand() {
47 case ofp.OfpFlowModCommand_OFPFC_ADD:
48 return agent.flowAdd(ctx, flow)
49 case ofp.OfpFlowModCommand_OFPFC_DELETE:
50 return agent.flowDelete(ctx, flow)
51 case ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT:
52 return agent.flowDeleteStrict(ctx, flow)
53 case ofp.OfpFlowModCommand_OFPFC_MODIFY:
54 return agent.flowModify(flow)
55 case ofp.OfpFlowModCommand_OFPFC_MODIFY_STRICT:
56 return agent.flowModifyStrict(flow)
57 }
58 return status.Errorf(codes.Internal,
59 "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.GetCommand())
60}
61
62//flowAdd adds a flow to the flow table of that logical device
63func (agent *LogicalAgent) flowAdd(ctx context.Context, mod *ofp.OfpFlowMod) error {
64 logger.Debugw("flowAdd", log.Fields{"flow": mod})
65 if mod == nil {
66 return nil
67 }
68 flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
69 if err != nil {
70 logger.Errorw("flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
71 return err
72 }
73 var updated bool
74 var changed bool
75 if changed, updated, err = agent.decomposeAndAdd(ctx, flow, mod); err != nil {
76 logger.Errorw("flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
77 return err
78 }
79 if changed && !updated {
80 if dbupdated := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !dbupdated {
81 return fmt.Errorf("couldnt-updated-flow-stats-%s", strconv.FormatUint(flow.Id, 10))
82 }
83 }
84 return nil
85
86}
87
88func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, mod *ofp.OfpFlowMod) (bool, bool, error) {
89 changed := false
90 updated := false
91 alreadyExist := true
92 var flowToReplace *ofp.OfpFlowStats
93
94 //if flow is not found in the map, create a new entry, otherwise get the existing one.
95 agent.flowLock.Lock()
96 flowChunk, ok := agent.flows[flow.Id]
97 if !ok {
98 flowChunk = &FlowChunk{
99 flow: flow,
100 }
101 agent.flows[flow.Id] = flowChunk
102 alreadyExist = false
103 flowChunk.lock.Lock() //acquire chunk lock before releasing map lock
104 defer flowChunk.lock.Unlock()
105 agent.flowLock.Unlock()
106 } else {
107 agent.flowLock.Unlock() //release map lock before acquiring chunk lock
108 flowChunk.lock.Lock()
109 defer flowChunk.lock.Unlock()
110 }
111
112 if !alreadyExist {
113 flowID := strconv.FormatUint(flow.Id, 10)
114 if err := agent.clusterDataProxy.AddWithID(ctx, "logical_flows/"+agent.logicalDeviceID, flowID, flow); err != nil {
115 logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": agent.logicalDeviceID, "flowID": flowID, "err": err})
116 //Revert the map
117 //TODO: Solve the condition:If we have two flow Adds of the same flow (at least same priority and match) in quick succession
118 //then if the first one fails while the second one was waiting on the flowchunk, we will end up with an instance of flowChunk that is no longer in the map.
119 agent.flowLock.Lock()
120 delete(agent.flows, flow.Id)
121 agent.flowLock.Unlock()
122 return changed, updated, err
123 }
124 }
125 flows := make([]*ofp.OfpFlowStats, 0)
126 updatedFlows := make([]*ofp.OfpFlowStats, 0)
127 checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
128 if checkOverlap {
129 if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
130 // TODO: should this error be notified other than being logged?
131 logger.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
132 } else {
133 // Add flow
134 changed = true
135 }
136 } else {
137 if alreadyExist {
138 flowToReplace = flowChunk.flow
139 if (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_RESET_COUNTS)) != 0 {
140 flow.ByteCount = flowToReplace.ByteCount
141 flow.PacketCount = flowToReplace.PacketCount
142 }
143 if !proto.Equal(flowToReplace, flow) {
144 changed = true
145 updated = true
146 }
147 } else {
148 changed = true
149 }
150 }
151 logger.Debugw("flowAdd-changed", log.Fields{"changed": changed, "updated": updated})
152 if changed {
153 updatedFlows = append(updatedFlows, flow)
154 var flowMetadata voltha.FlowMetadata
155 lMeters, _ := agent.ListLogicalDeviceMeters(ctx)
156 if err := agent.GetMeterConfig(updatedFlows, lMeters.Items, &flowMetadata); err != nil {
157 logger.Error("Meter-referred-in-flow-not-present")
158 return changed, updated, err
159 }
160 flowGroups, _ := agent.ListLogicalDeviceFlowGroups(ctx)
161 deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *flowGroups)
162 if err != nil {
163 return changed, updated, err
164 }
165
166 logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
167 // Update store and cache
168 if updated {
169 if err := agent.updateLogicalDeviceFlow(ctx, flow, flowChunk); err != nil {
170 return changed, updated, err
171 }
172 }
173 respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &flowMetadata)
174 // Create the go routines to wait
175 go func() {
176 // Wait for completion
177 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
178 logger.Infow("failed-to-add-flows-will-attempt-deletion", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
179 // Revert added flows
180 if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, &flowMetadata); err != nil {
181 logger.Errorw("failure-to-delete-flows-after-failed-addition", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
182 }
183 }
184 }()
185 }
186 return changed, updated, nil
187}
188
189// revertAddedFlows reverts flows after the flowAdd request has failed. All flows corresponding to that flowAdd request
190// will be reverted, both from the logical devices and the devices.
191func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
192 logger.Debugw("revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
193
194 agent.flowLock.RLock()
195 flowChunk, ok := agent.flows[addedFlow.Id]
196 agent.flowLock.RUnlock()
197 if !ok {
198 // Not found - do nothing
199 log.Debugw("flow-not-found", log.Fields{"added-flow": addedFlow})
200 return nil
201 }
202 //Leave the map lock and syncronize per flow
203 flowChunk.lock.Lock()
204 defer flowChunk.lock.Unlock()
205
206 if replacedFlow != nil {
207 if err := agent.updateLogicalDeviceFlow(ctx, replacedFlow, flowChunk); err != nil {
208 return err
209 }
210 } else {
211 if err := agent.removeLogicalDeviceFlow(ctx, addedFlow.Id); err != nil {
212 return err
213 }
214 }
215 // Revert meters
216 if changedMeterStats := agent.updateFlowCountOfMeterStats(ctx, mod, addedFlow, true); !changedMeterStats {
217 return fmt.Errorf("Unable-to-revert-meterstats-for-flow-%s", strconv.FormatUint(addedFlow.Id, 10))
218 }
219
220 // Update the devices
221 respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, metadata)
222
223 // Wait for the responses
224 go func() {
225 // Since this action is taken following an add failure, we may also receive a failure for the revert
226 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
227 logger.Warnw("failure-reverting-added-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
228 }
229 }()
230
231 return nil
232}
233
234//flowDelete deletes a flow from the flow table of that logical device
235func (agent *LogicalAgent) flowDelete(ctx context.Context, mod *ofp.OfpFlowMod) error {
236 logger.Debug("flowDelete")
237 if mod == nil {
238 return nil
239 }
240
241 fs, err := fu.FlowStatsEntryFromFlowModMessage(mod)
242 if err != nil {
243 return err
244 }
245
246 //build a list of what to delete
247 toDelete := make([]*ofp.OfpFlowStats, 0)
248 toDeleteChunks := make([]*FlowChunk, 0)
249 //Lock the map to search the matched flows
250 agent.flowLock.RLock()
251 for _, f := range agent.flows {
252 if fu.FlowMatch(f.flow, fs) {
253 toDelete = append(toDelete, f.flow)
254 toDeleteChunks = append(toDeleteChunks, f)
255 continue
256 }
257 // Check wild card match
258 if fu.FlowMatchesMod(f.flow, mod) {
259 toDelete = append(toDelete, f.flow)
260 toDeleteChunks = append(toDeleteChunks, f)
261 }
262 }
263 agent.flowLock.RUnlock()
264 //Delete the matched flows
265 if len(toDelete) > 0 {
266 logger.Debugw("flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
267 var meters []*ofp.OfpMeterEntry
268 var flowGroups []*ofp.OfpGroupEntry
269 if ofpMeters, err := agent.ListLogicalDeviceMeters(ctx); err != nil {
270 meters = ofpMeters.Items
271 }
272
273 if groups, err := agent.ListLogicalDeviceFlowGroups(ctx); err != nil {
274 flowGroups = groups.Items
275 }
276
277 for _, fc := range toDeleteChunks {
278 if err := agent.deleteFlowAndUpdateMeterStats(ctx, mod, fc); err != nil {
279 return err
280 }
281 }
282 var flowMetadata voltha.FlowMetadata
283 if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
284 logger.Error("Meter-referred-in-flows-not-present")
285 return err
286 }
287 var respChnls []coreutils.Response
288 var partialRoute bool
289 var deviceRules *fu.DeviceRules
290 deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
291 if err != nil {
292 // A no route error means no route exists between the ports specified in the flow. This can happen when the
293 // child device is deleted and a request to delete flows from the parent device is received
294 if !errors.Is(err, route.ErrNoRoute) {
295 logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": toDelete, "error": err})
296 return err
297 }
298 partialRoute = true
299 }
300
301 // Update the devices
302 if partialRoute {
303 respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: toDelete}, &flowMetadata)
304 } else {
305 respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
306 }
307
308 // Wait for the responses
309 go func() {
310 // Wait for completion
311 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
312 logger.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
313 // TODO: Revert the flow deletion
314 }
315 }()
316 }
317 //TODO: send announcement on delete
318 return nil
319}
320
321//flowDeleteStrict deletes a flow from the flow table of that logical device
322func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
323 logger.Debugw("flowDeleteStrict", log.Fields{"mod": mod})
324 if mod == nil {
325 return nil
326 }
327
328 flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
329 if err != nil {
330 return err
331 }
332 logger.Debugw("flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
333 agent.flowLock.RLock()
334 flowChunk, ok := agent.flows[flow.Id]
335 agent.flowLock.RUnlock()
336 if !ok {
337 logger.Debugw("Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
338 return nil
339 }
340 //Release the map lock and syncronize per flow
341 flowChunk.lock.Lock()
342 defer flowChunk.lock.Unlock()
343
344 var meters []*ofp.OfpMeterEntry
345 var flowGroups []*ofp.OfpGroupEntry
346 if ofMeters, er := agent.ListLogicalDeviceMeters(ctx); er == nil {
347 meters = ofMeters.Items
348 }
349 if ofGroups, er := agent.ListLogicalDeviceFlowGroups(ctx); er == nil {
350 flowGroups = ofGroups.Items
351 }
352 if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !changedMeter {
353 return fmt.Errorf("Cannot delete flow - %s. Meter update failed", flow)
354 }
355
356 var flowMetadata voltha.FlowMetadata
357 flowsToDelete := []*ofp.OfpFlowStats{flowChunk.flow}
358 if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
359 logger.Error("meter-referred-in-flows-not-present")
360 return err
361 }
362 var respChnls []coreutils.Response
363 var partialRoute bool
364 deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
365 if err != nil {
366 // A no route error means no route exists between the ports specified in the flow. This can happen when the
367 // child device is deleted and a request to delete flows from the parent device is received
368 if !errors.Is(err, route.ErrNoRoute) {
369 logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": flowsToDelete, "error": err})
370 return err
371 }
372 partialRoute = true
373 }
374
375 // Update the model
376 if err := agent.removeLogicalDeviceFlow(ctx, flow.Id); err != nil {
377 return err
378 }
379 // Update the devices
380 if partialRoute {
381 respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: flowsToDelete}, &flowMetadata)
382 } else {
383 respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
384 }
385
386 // Wait for completion
387 go func() {
388 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
389 logger.Warnw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
390 //TODO: Revert flow changes
391 }
392 }()
393
394 return nil
395}
396
// flowModify is the handler for modifying a flow in the flow table of this
// logical device. It is not implemented: mod is ignored and an error is always
// returned.
func (agent *LogicalAgent) flowModify(mod *ofp.OfpFlowMod) error {
	return errors.New("flowModify not implemented")
}
401
// flowModifyStrict is the handler for strictly modifying a flow in the flow
// table of this logical device. It is not implemented: mod is ignored and an
// error is always returned.
func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
	return errors.New("flowModifyStrict not implemented")
}