blob: 0364341e2ed7af63fd503bc91fddf990aec61a95 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package controller
17
18import (
19 "context"
20 infraerror "voltha-go-controller/internal/pkg/errorcodes"
21 infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
22 "time"
23
24 "voltha-go-controller/internal/pkg/of"
25 "github.com/opencord/voltha-lib-go/v7/pkg/log"
26)
27
28const (
29 //MaxRetryCount - Maximum retry attempts on failure
30 MaxRetryCount int = 1
31)
32
33// AddFlowsTask structure
34type AddFlowsTask struct {
35 taskID uint8
36 ctx context.Context
37 flow *of.VoltFlow
38 device *Device
39 timestamp string
40}
41
42// NewAddFlowsTask is constructor for AddFlowsTask
43func NewAddFlowsTask(ctx context.Context, flow *of.VoltFlow, device *Device) *AddFlowsTask {
44 var aft AddFlowsTask
45 aft.device = device
46 aft.flow = flow
47 aft.ctx = ctx
48 tstamp := (time.Now()).Format(time.RFC3339Nano)
49 aft.timestamp = tstamp
50 return &aft
51}
52
53// Name to add flow task
54func (aft *AddFlowsTask) Name() string {
55 for _, flow := range aft.flow.SubFlows {
56 logger.Infow(ctx, "Flow Cookies", log.Fields{"Cookie": flow.Cookie})
57 }
58 return "Add Flows Task"
59}
60
61// TaskID to return task ID
62func (aft *AddFlowsTask) TaskID() uint8 {
63 return aft.taskID
64}
65
66// Timestamp to return timestamp
67func (aft *AddFlowsTask) Timestamp() string {
68 return aft.timestamp
69}
70
71// Stop to stop the add flow task
72func (aft *AddFlowsTask) Stop() {
73}
74
75// Start to start adding flow task
76func (aft *AddFlowsTask) Start(ctx context.Context, taskID uint8) error {
77 var err error
78 aft.taskID = taskID
79 aft.ctx = ctx
80 flowsToProcess := make(map[uint64]*of.VoltSubFlow)
81 flowsPresent := 0
82 // First add/delete the flows first locally before passing them to actual device
83 for _, flow := range aft.flow.SubFlows {
84 logger.Infow(ctx, "Flow Mod Request", log.Fields{"Cookie": flow.Cookie, "Oper": aft.flow.Command, "Port": aft.flow.PortID})
85 if aft.flow.Command == of.CommandAdd {
86 flow.State = of.FlowAddPending
87 if err := aft.device.AddFlow(flow); err != nil {
88 logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
89
90 // If flow already exists in cache, check for flow state
91 // If Success: Trigger success FLow Indication
92 // if Failure: Continue process, so add-retry happens
93 if err.Error() == ErrDuplicateFlow {
94 dbFlow, _ := aft.device.GetFlow(flow.Cookie)
95 if dbFlow.State == of.FlowAddSuccess {
96 aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
97 flowsPresent++
98 }
99 }
100 }
101 flowsToProcess[flow.Cookie] = flow
102 } else {
103 dbFlow, ok := aft.device.GetFlow(flow.Cookie)
104 if !ok {
105 logger.Warnw(ctx, "Delete Flow Error: Flow Does not Exist", log.Fields{"Cookie": flow.Cookie, "Device": aft.device.ID})
106 } else {
107 // dbFlow.State = of.FlowDelPending
108 // aft.device.AddFlowToDb(dbFlow)
109 flowsToProcess[flow.Cookie] = dbFlow
110 }
111 aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
112 }
113 }
114
115 if flowsPresent == len(aft.flow.SubFlows) {
116 logger.Warn(ctx, "All Flows already present in database. Skipping Flow Push to SB")
117 }
118
119 // PortName and PortID are used for validation of PortID, whether it is still valid and associated with old PortName or
120 // PortID got assigned to another PortName. If the condition met, skip these flow update to voltha core
121 if aft.flow.PortName != "" && aft.flow.PortID != 0 {
122 portName, _ := aft.device.GetPortName(aft.flow.PortID)
123 if aft.flow.PortName != portName && portName != "" {
124 for _, flow := range aft.flow.SubFlows {
125 logger.Errorw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
126 if aft.flow.Command == of.CommandDel {
127 aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
128 }
129 }
130 return nil
131 }
132 }
133
134 if !aft.device.isSBOperAllowed(aft.flow.ForceAction) {
135 for _, flow := range aft.flow.SubFlows {
136 logger.Errorw(ctx, "Skipping Flow Table Update", log.Fields{"Reason": "Device State not UP", "State": aft.device.State, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
137 }
138 return nil
139 }
140
141 flows := of.ProcessVoltFlow(aft.device.ID, aft.flow.Command, flowsToProcess)
142 for _, flow := range flows {
143 attempt := 0
144 if vc := aft.device.VolthaClient(); vc != nil {
145 for {
146 if _, err = vc.UpdateLogicalDeviceFlowTable(aft.ctx, flow); err != nil {
147 logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Reason": err.Error(), "Operation": aft.flow.Command})
148 statusCode, _ := infraerror.GetErrorInfo(err)
149
150 // Retry on flow delete failure once.
151 // Do NOT retry incase of failure with reason: Entry Not Found
152 if aft.flow.Command == of.CommandDel && statusCode != uint32(infraerrorcode.ErrNotExists) {
153 if attempt != MaxRetryCount {
154 logger.Errorw(ctx, "Retrying Flow Delete", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Attempt": attempt})
155 attempt++
156 continue
157 }
158 logger.Errorw(ctx, "Flow Delete failed even aft max retries", log.Fields{"Flow": flow, "Attempt": attempt})
159 }
160 }
161 break
162 }
163 aft.device.triggerFlowNotification(flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
164
165 } else {
166 logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
167 }
168 }
169 return nil
170}
171
172func isFlowOperSuccess(statusCode uint32, oper of.Command) bool {
173 volthaErrorCode := infraerrorcode.ErrorCode(statusCode)
174
175 if volthaErrorCode == infraerrorcode.ErrOk {
176 return true
177 }
178
179 if oper == of.CommandAdd && volthaErrorCode == infraerrorcode.ErrAlreadyExists {
180 return true
181
182 } else if oper == of.CommandDel && volthaErrorCode == infraerrorcode.ErrNotExists {
183 return true
184 }
185 return false
186}
187
188// func getBwAvailInfo(bwAvailInfo []*voltha.ResponseMsg) of.BwAvailDetails {
189// var bwInfo of.BwAvailDetails
190// // convert the bw details sent from olt to a struct
191// // received msg format:
192// // additional_data[Data{ResponseMsg
193// //{"key":"prevBW","value":"111111"},
194// //{"key":"presentBW","value":"10000"}]
195// if len(bwAvailInfo) > 1 {
196// prevBwResp := bwAvailInfo[0]
197// if prevBwResp.Key == of.PrevBwInfo {
198// _, err := strconv.Atoi(prevBwResp.Val)
199// if err == nil {
200// bwInfo.PrevBw = prevBwResp.Val
201// }
202// }
203
204// presentBwResp := bwAvailInfo[1]
205// if presentBwResp.Key == of.PresentBwInfo {
206// _, err := strconv.Atoi(prevBwResp.Val)
207// if err == nil {
208// bwInfo.PresentBw = presentBwResp.Val
209// }
210// }
211// if bwInfo.PresentBw == bwInfo.PrevBw {
212// return of.BwAvailDetails{}
213// }
214// logger.Infow(ctx, "Bandwidth-consumed-info", log.Fields{"BwConsumed": bwInfo})
215// }
216// return bwInfo
217// }