blob: a4f8a8da7ead5162c24a6da885b50354fb922d6e [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package controller
17
18import (
19 "context"
vinokuma926cb3e2023-03-29 11:41:06 +053020 "time"
Naveen Sampath04696f72022-06-13 15:19:14 +053021 infraerror "voltha-go-controller/internal/pkg/errorcodes"
22 infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
Naveen Sampath04696f72022-06-13 15:19:14 +053023
24 "voltha-go-controller/internal/pkg/of"
Tinoj Joseph1d108322022-07-13 10:07:39 +053025 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053026)
27
28const (
vinokuma926cb3e2023-03-29 11:41:06 +053029 // MaxRetryCount - Maximum retry attempts on failure
Naveen Sampath04696f72022-06-13 15:19:14 +053030 MaxRetryCount int = 1
31)
32
33// AddFlowsTask structure
34type AddFlowsTask struct {
Naveen Sampath04696f72022-06-13 15:19:14 +053035 ctx context.Context
36 flow *of.VoltFlow
37 device *Device
38 timestamp string
vinokuma926cb3e2023-03-29 11:41:06 +053039 taskID uint8
Naveen Sampath04696f72022-06-13 15:19:14 +053040}
41
42// NewAddFlowsTask is constructor for AddFlowsTask
43func NewAddFlowsTask(ctx context.Context, flow *of.VoltFlow, device *Device) *AddFlowsTask {
44 var aft AddFlowsTask
45 aft.device = device
46 aft.flow = flow
47 aft.ctx = ctx
48 tstamp := (time.Now()).Format(time.RFC3339Nano)
49 aft.timestamp = tstamp
50 return &aft
51}
52
53// Name to add flow task
54func (aft *AddFlowsTask) Name() string {
55 for _, flow := range aft.flow.SubFlows {
Akash Sonie863fe42023-11-30 14:35:01 +053056 logger.Debugw(ctx, "Flow Cookies", log.Fields{"Cookie": flow.Cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +053057 }
58 return "Add Flows Task"
59}
60
61// TaskID to return task ID
62func (aft *AddFlowsTask) TaskID() uint8 {
63 return aft.taskID
64}
65
66// Timestamp to return timestamp
67func (aft *AddFlowsTask) Timestamp() string {
68 return aft.timestamp
69}
70
71// Stop to stop the add flow task
72func (aft *AddFlowsTask) Stop() {
73}
74
75// Start to start adding flow task
76func (aft *AddFlowsTask) Start(ctx context.Context, taskID uint8) error {
77 var err error
78 aft.taskID = taskID
79 aft.ctx = ctx
80 flowsToProcess := make(map[uint64]*of.VoltSubFlow)
81 flowsPresent := 0
82 // First add/delete the flows first locally before passing them to actual device
83 for _, flow := range aft.flow.SubFlows {
Akash Sonie863fe42023-11-30 14:35:01 +053084 logger.Debugw(ctx, "Flow Mod Request", log.Fields{"Cookie": flow.Cookie, "Oper": aft.flow.Command, "Port": aft.flow.PortID})
Naveen Sampath04696f72022-06-13 15:19:14 +053085 if aft.flow.Command == of.CommandAdd {
86 flow.State = of.FlowAddPending
vinokuma926cb3e2023-03-29 11:41:06 +053087 if err = aft.device.AddFlow(ctx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +053088 logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
89
90 // If flow already exists in cache, check for flow state
91 // If Success: Trigger success FLow Indication
92 // if Failure: Continue process, so add-retry happens
93 if err.Error() == ErrDuplicateFlow {
94 dbFlow, _ := aft.device.GetFlow(flow.Cookie)
95 if dbFlow.State == of.FlowAddSuccess {
Akash Sonief452f12024-12-12 18:20:28 +053096 aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
Naveen Sampath04696f72022-06-13 15:19:14 +053097 flowsPresent++
Akash Soni38f0c242024-01-24 11:37:34 +053098 continue
Naveen Sampath04696f72022-06-13 15:19:14 +053099 }
100 }
101 }
102 flowsToProcess[flow.Cookie] = flow
103 } else {
104 dbFlow, ok := aft.device.GetFlow(flow.Cookie)
105 if !ok {
106 logger.Warnw(ctx, "Delete Flow Error: Flow Does not Exist", log.Fields{"Cookie": flow.Cookie, "Device": aft.device.ID})
107 } else {
108 // dbFlow.State = of.FlowDelPending
109 // aft.device.AddFlowToDb(dbFlow)
110 flowsToProcess[flow.Cookie] = dbFlow
111 }
Akash Sonief452f12024-12-12 18:20:28 +0530112 // Below call will delete flow from DB and will not allow to maintain flow count and state. Hence commenting the below call.
113 //aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, false)
Naveen Sampath04696f72022-06-13 15:19:14 +0530114 }
115 }
116
117 if flowsPresent == len(aft.flow.SubFlows) {
118 logger.Warn(ctx, "All Flows already present in database. Skipping Flow Push to SB")
Akash Soni38f0c242024-01-24 11:37:34 +0530119 return nil
Naveen Sampath04696f72022-06-13 15:19:14 +0530120 }
121
122 // PortName and PortID are used for validation of PortID, whether it is still valid and associated with old PortName or
123 // PortID got assigned to another PortName. If the condition met, skip these flow update to voltha core
124 if aft.flow.PortName != "" && aft.flow.PortID != 0 {
125 portName, _ := aft.device.GetPortName(aft.flow.PortID)
126 if aft.flow.PortName != portName && portName != "" {
127 for _, flow := range aft.flow.SubFlows {
Akash Soni6168f312023-05-18 20:57:33 +0530128 logger.Warnw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
Naveen Sampath04696f72022-06-13 15:19:14 +0530129 if aft.flow.Command == of.CommandDel {
Akash Sonief452f12024-12-12 18:20:28 +0530130 aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
Naveen Sampath04696f72022-06-13 15:19:14 +0530131 }
132 }
133 return nil
134 }
135 }
136
137 if !aft.device.isSBOperAllowed(aft.flow.ForceAction) {
138 for _, flow := range aft.flow.SubFlows {
Akash Soni6168f312023-05-18 20:57:33 +0530139 logger.Warnw(ctx, "Skipping Flow Table Update", log.Fields{"Reason": "Device State not UP", "State": aft.device.State, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
Naveen Sampath04696f72022-06-13 15:19:14 +0530140 }
141 return nil
142 }
143
144 flows := of.ProcessVoltFlow(aft.device.ID, aft.flow.Command, flowsToProcess)
145 for _, flow := range flows {
146 attempt := 0
147 if vc := aft.device.VolthaClient(); vc != nil {
148 for {
149 if _, err = vc.UpdateLogicalDeviceFlowTable(aft.ctx, flow); err != nil {
150 logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Reason": err.Error(), "Operation": aft.flow.Command})
151 statusCode, _ := infraerror.GetErrorInfo(err)
152
153 // Retry on flow delete failure once.
154 // Do NOT retry incase of failure with reason: Entry Not Found
155 if aft.flow.Command == of.CommandDel && statusCode != uint32(infraerrorcode.ErrNotExists) {
156 if attempt != MaxRetryCount {
Akash Soni6168f312023-05-18 20:57:33 +0530157 logger.Warnw(ctx, "Retrying Flow Delete", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Attempt": attempt})
Naveen Sampath04696f72022-06-13 15:19:14 +0530158 attempt++
159 continue
160 }
161 logger.Errorw(ctx, "Flow Delete failed even aft max retries", log.Fields{"Flow": flow, "Attempt": attempt})
162 }
163 }
164 break
165 }
Akash Sonief452f12024-12-12 18:20:28 +0530166 aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, err)
Naveen Sampath04696f72022-06-13 15:19:14 +0530167 } else {
168 logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
169 }
170 }
171 return nil
172}
173
174func isFlowOperSuccess(statusCode uint32, oper of.Command) bool {
175 volthaErrorCode := infraerrorcode.ErrorCode(statusCode)
176
177 if volthaErrorCode == infraerrorcode.ErrOk {
178 return true
179 }
180
181 if oper == of.CommandAdd && volthaErrorCode == infraerrorcode.ErrAlreadyExists {
182 return true
Naveen Sampath04696f72022-06-13 15:19:14 +0530183 } else if oper == of.CommandDel && volthaErrorCode == infraerrorcode.ErrNotExists {
184 return true
185 }
186 return false
187}
188
189// func getBwAvailInfo(bwAvailInfo []*voltha.ResponseMsg) of.BwAvailDetails {
190// var bwInfo of.BwAvailDetails
191// // convert the bw details sent from olt to a struct
192// // received msg format:
193// // additional_data[Data{ResponseMsg
194// //{"key":"prevBW","value":"111111"},
195// //{"key":"presentBW","value":"10000"}]
196// if len(bwAvailInfo) > 1 {
197// prevBwResp := bwAvailInfo[0]
198// if prevBwResp.Key == of.PrevBwInfo {
199// _, err := strconv.Atoi(prevBwResp.Val)
200// if err == nil {
201// bwInfo.PrevBw = prevBwResp.Val
202// }
203// }
204
205// presentBwResp := bwAvailInfo[1]
206// if presentBwResp.Key == of.PresentBwInfo {
207// _, err := strconv.Atoi(prevBwResp.Val)
208// if err == nil {
209// bwInfo.PresentBw = presentBwResp.Val
210// }
211// }
212// if bwInfo.PresentBw == bwInfo.PrevBw {
213// return of.BwAvailDetails{}
214// }
215// logger.Infow(ctx, "Bandwidth-consumed-info", log.Fields{"BwConsumed": bwInfo})
216// }
217// return bwInfo
218// }