blob: b7d42cdd5221c2c22ea1331d5d2459e242b724da [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package controller
17
18import (
19 "context"
vinokuma926cb3e2023-03-29 11:41:06 +053020 "time"
Naveen Sampath04696f72022-06-13 15:19:14 +053021 infraerror "voltha-go-controller/internal/pkg/errorcodes"
22 infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
Naveen Sampath04696f72022-06-13 15:19:14 +053023
24 "voltha-go-controller/internal/pkg/of"
Tinoj Joseph1d108322022-07-13 10:07:39 +053025 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053026)
27
// MaxRetryCount is the maximum number of retry attempts made after a
// failed operation before the failure is treated as final.
const MaxRetryCount int = 1
32
33// AddFlowsTask structure
34type AddFlowsTask struct {
Naveen Sampath04696f72022-06-13 15:19:14 +053035 ctx context.Context
36 flow *of.VoltFlow
37 device *Device
38 timestamp string
vinokuma926cb3e2023-03-29 11:41:06 +053039 taskID uint8
Naveen Sampath04696f72022-06-13 15:19:14 +053040}
41
42// NewAddFlowsTask is constructor for AddFlowsTask
43func NewAddFlowsTask(ctx context.Context, flow *of.VoltFlow, device *Device) *AddFlowsTask {
44 var aft AddFlowsTask
45 aft.device = device
46 aft.flow = flow
47 aft.ctx = ctx
48 tstamp := (time.Now()).Format(time.RFC3339Nano)
49 aft.timestamp = tstamp
50 return &aft
51}
52
53// Name to add flow task
54func (aft *AddFlowsTask) Name() string {
55 for _, flow := range aft.flow.SubFlows {
Akash Sonie863fe42023-11-30 14:35:01 +053056 logger.Debugw(ctx, "Flow Cookies", log.Fields{"Cookie": flow.Cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +053057 }
58 return "Add Flows Task"
59}
60
61// TaskID to return task ID
62func (aft *AddFlowsTask) TaskID() uint8 {
63 return aft.taskID
64}
65
66// Timestamp to return timestamp
67func (aft *AddFlowsTask) Timestamp() string {
68 return aft.timestamp
69}
70
71// Stop to stop the add flow task
72func (aft *AddFlowsTask) Stop() {
73}
74
75// Start to start adding flow task
76func (aft *AddFlowsTask) Start(ctx context.Context, taskID uint8) error {
77 var err error
78 aft.taskID = taskID
79 aft.ctx = ctx
80 flowsToProcess := make(map[uint64]*of.VoltSubFlow)
81 flowsPresent := 0
82 // First add/delete the flows first locally before passing them to actual device
83 for _, flow := range aft.flow.SubFlows {
Akash Sonie863fe42023-11-30 14:35:01 +053084 logger.Debugw(ctx, "Flow Mod Request", log.Fields{"Cookie": flow.Cookie, "Oper": aft.flow.Command, "Port": aft.flow.PortID})
Naveen Sampath04696f72022-06-13 15:19:14 +053085 if aft.flow.Command == of.CommandAdd {
86 flow.State = of.FlowAddPending
vinokuma926cb3e2023-03-29 11:41:06 +053087 if err = aft.device.AddFlow(ctx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +053088 logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
89
90 // If flow already exists in cache, check for flow state
91 // If Success: Trigger success FLow Indication
92 // if Failure: Continue process, so add-retry happens
93 if err.Error() == ErrDuplicateFlow {
94 dbFlow, _ := aft.device.GetFlow(flow.Cookie)
95 if dbFlow.State == of.FlowAddSuccess {
Sridhar Ravindra3ec14232024-01-01 19:11:48 +053096 aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
Naveen Sampath04696f72022-06-13 15:19:14 +053097 flowsPresent++
Akash Soni38f0c242024-01-24 11:37:34 +053098 continue
Naveen Sampath04696f72022-06-13 15:19:14 +053099 }
100 }
101 }
102 flowsToProcess[flow.Cookie] = flow
103 } else {
104 dbFlow, ok := aft.device.GetFlow(flow.Cookie)
105 if !ok {
106 logger.Warnw(ctx, "Delete Flow Error: Flow Does not Exist", log.Fields{"Cookie": flow.Cookie, "Device": aft.device.ID})
107 } else {
108 // dbFlow.State = of.FlowDelPending
109 // aft.device.AddFlowToDb(dbFlow)
110 flowsToProcess[flow.Cookie] = dbFlow
111 }
Sridhar Ravindra3ec14232024-01-01 19:11:48 +0530112 aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, false)
Naveen Sampath04696f72022-06-13 15:19:14 +0530113 }
114 }
115
116 if flowsPresent == len(aft.flow.SubFlows) {
117 logger.Warn(ctx, "All Flows already present in database. Skipping Flow Push to SB")
Akash Soni38f0c242024-01-24 11:37:34 +0530118 return nil
Naveen Sampath04696f72022-06-13 15:19:14 +0530119 }
120
121 // PortName and PortID are used for validation of PortID, whether it is still valid and associated with old PortName or
122 // PortID got assigned to another PortName. If the condition met, skip these flow update to voltha core
123 if aft.flow.PortName != "" && aft.flow.PortID != 0 {
124 portName, _ := aft.device.GetPortName(aft.flow.PortID)
125 if aft.flow.PortName != portName && portName != "" {
126 for _, flow := range aft.flow.SubFlows {
Akash Soni6168f312023-05-18 20:57:33 +0530127 logger.Warnw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
Naveen Sampath04696f72022-06-13 15:19:14 +0530128 if aft.flow.Command == of.CommandDel {
Sridhar Ravindra3ec14232024-01-01 19:11:48 +0530129 aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
Naveen Sampath04696f72022-06-13 15:19:14 +0530130 }
131 }
132 return nil
133 }
134 }
135
136 if !aft.device.isSBOperAllowed(aft.flow.ForceAction) {
137 for _, flow := range aft.flow.SubFlows {
Akash Soni6168f312023-05-18 20:57:33 +0530138 logger.Warnw(ctx, "Skipping Flow Table Update", log.Fields{"Reason": "Device State not UP", "State": aft.device.State, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
Naveen Sampath04696f72022-06-13 15:19:14 +0530139 }
140 return nil
141 }
142
143 flows := of.ProcessVoltFlow(aft.device.ID, aft.flow.Command, flowsToProcess)
144 for _, flow := range flows {
145 attempt := 0
146 if vc := aft.device.VolthaClient(); vc != nil {
147 for {
148 if _, err = vc.UpdateLogicalDeviceFlowTable(aft.ctx, flow); err != nil {
149 logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Reason": err.Error(), "Operation": aft.flow.Command})
150 statusCode, _ := infraerror.GetErrorInfo(err)
151
152 // Retry on flow delete failure once.
153 // Do NOT retry incase of failure with reason: Entry Not Found
154 if aft.flow.Command == of.CommandDel && statusCode != uint32(infraerrorcode.ErrNotExists) {
155 if attempt != MaxRetryCount {
Akash Soni6168f312023-05-18 20:57:33 +0530156 logger.Warnw(ctx, "Retrying Flow Delete", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Attempt": attempt})
Naveen Sampath04696f72022-06-13 15:19:14 +0530157 attempt++
158 continue
159 }
160 logger.Errorw(ctx, "Flow Delete failed even aft max retries", log.Fields{"Flow": flow, "Attempt": attempt})
161 }
162 }
163 break
164 }
Sridhar Ravindra3ec14232024-01-01 19:11:48 +0530165 aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, err, true)
Naveen Sampath04696f72022-06-13 15:19:14 +0530166 } else {
167 logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
168 }
169 }
170 return nil
171}
172
173func isFlowOperSuccess(statusCode uint32, oper of.Command) bool {
174 volthaErrorCode := infraerrorcode.ErrorCode(statusCode)
175
176 if volthaErrorCode == infraerrorcode.ErrOk {
177 return true
178 }
179
180 if oper == of.CommandAdd && volthaErrorCode == infraerrorcode.ErrAlreadyExists {
181 return true
Naveen Sampath04696f72022-06-13 15:19:14 +0530182 } else if oper == of.CommandDel && volthaErrorCode == infraerrorcode.ErrNotExists {
183 return true
184 }
185 return false
186}
187
188// func getBwAvailInfo(bwAvailInfo []*voltha.ResponseMsg) of.BwAvailDetails {
189// var bwInfo of.BwAvailDetails
190// // convert the bw details sent from olt to a struct
191// // received msg format:
192// // additional_data[Data{ResponseMsg
193// //{"key":"prevBW","value":"111111"},
194// //{"key":"presentBW","value":"10000"}]
195// if len(bwAvailInfo) > 1 {
196// prevBwResp := bwAvailInfo[0]
197// if prevBwResp.Key == of.PrevBwInfo {
198// _, err := strconv.Atoi(prevBwResp.Val)
199// if err == nil {
200// bwInfo.PrevBw = prevBwResp.Val
201// }
202// }
203
204// presentBwResp := bwAvailInfo[1]
205// if presentBwResp.Key == of.PresentBwInfo {
// _, err := strconv.Atoi(presentBwResp.Val)
207// if err == nil {
208// bwInfo.PresentBw = presentBwResp.Val
209// }
210// }
211// if bwInfo.PresentBw == bwInfo.PrevBw {
212// return of.BwAvailDetails{}
213// }
214// logger.Infow(ctx, "Bandwidth-consumed-info", log.Fields{"BwConsumed": bwInfo})
215// }
216// return bwInfo
217// }