blob: 7524d00bf796f843e51223b00ec3ffb5b9410ced [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package controller
17
18import (
19 "context"
vinokuma926cb3e2023-03-29 11:41:06 +053020 "time"
Naveen Sampath04696f72022-06-13 15:19:14 +053021 infraerror "voltha-go-controller/internal/pkg/errorcodes"
22 infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
Naveen Sampath04696f72022-06-13 15:19:14 +053023
24 "voltha-go-controller/internal/pkg/of"
Tinoj Joseph1d108322022-07-13 10:07:39 +053025 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053026)
27
// MaxRetryCount is the number of times a failed flow-delete push is retried
// by AddFlowsTask.Start before the failure is logged as final.
const MaxRetryCount int = 1
32
33// AddFlowsTask structure
34type AddFlowsTask struct {
Naveen Sampath04696f72022-06-13 15:19:14 +053035 ctx context.Context
36 flow *of.VoltFlow
37 device *Device
38 timestamp string
vinokuma926cb3e2023-03-29 11:41:06 +053039 taskID uint8
Naveen Sampath04696f72022-06-13 15:19:14 +053040}
41
42// NewAddFlowsTask is constructor for AddFlowsTask
43func NewAddFlowsTask(ctx context.Context, flow *of.VoltFlow, device *Device) *AddFlowsTask {
44 var aft AddFlowsTask
45 aft.device = device
46 aft.flow = flow
47 aft.ctx = ctx
48 tstamp := (time.Now()).Format(time.RFC3339Nano)
49 aft.timestamp = tstamp
50 return &aft
51}
52
53// Name to add flow task
54func (aft *AddFlowsTask) Name() string {
55 for _, flow := range aft.flow.SubFlows {
56 logger.Infow(ctx, "Flow Cookies", log.Fields{"Cookie": flow.Cookie})
57 }
58 return "Add Flows Task"
59}
60
61// TaskID to return task ID
62func (aft *AddFlowsTask) TaskID() uint8 {
63 return aft.taskID
64}
65
66// Timestamp to return timestamp
67func (aft *AddFlowsTask) Timestamp() string {
68 return aft.timestamp
69}
70
71// Stop to stop the add flow task
72func (aft *AddFlowsTask) Stop() {
73}
74
75// Start to start adding flow task
76func (aft *AddFlowsTask) Start(ctx context.Context, taskID uint8) error {
77 var err error
78 aft.taskID = taskID
79 aft.ctx = ctx
80 flowsToProcess := make(map[uint64]*of.VoltSubFlow)
81 flowsPresent := 0
82 // First add/delete the flows first locally before passing them to actual device
83 for _, flow := range aft.flow.SubFlows {
84 logger.Infow(ctx, "Flow Mod Request", log.Fields{"Cookie": flow.Cookie, "Oper": aft.flow.Command, "Port": aft.flow.PortID})
85 if aft.flow.Command == of.CommandAdd {
86 flow.State = of.FlowAddPending
vinokuma926cb3e2023-03-29 11:41:06 +053087 if err = aft.device.AddFlow(ctx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +053088 logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
89
90 // If flow already exists in cache, check for flow state
91 // If Success: Trigger success FLow Indication
92 // if Failure: Continue process, so add-retry happens
93 if err.Error() == ErrDuplicateFlow {
94 dbFlow, _ := aft.device.GetFlow(flow.Cookie)
95 if dbFlow.State == of.FlowAddSuccess {
Tinoj Joseph07cc5372022-07-18 22:53:51 +053096 aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
Naveen Sampath04696f72022-06-13 15:19:14 +053097 flowsPresent++
98 }
99 }
100 }
101 flowsToProcess[flow.Cookie] = flow
102 } else {
103 dbFlow, ok := aft.device.GetFlow(flow.Cookie)
104 if !ok {
105 logger.Warnw(ctx, "Delete Flow Error: Flow Does not Exist", log.Fields{"Cookie": flow.Cookie, "Device": aft.device.ID})
106 } else {
107 // dbFlow.State = of.FlowDelPending
108 // aft.device.AddFlowToDb(dbFlow)
109 flowsToProcess[flow.Cookie] = dbFlow
110 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530111 aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
Naveen Sampath04696f72022-06-13 15:19:14 +0530112 }
113 }
114
115 if flowsPresent == len(aft.flow.SubFlows) {
116 logger.Warn(ctx, "All Flows already present in database. Skipping Flow Push to SB")
117 }
118
119 // PortName and PortID are used for validation of PortID, whether it is still valid and associated with old PortName or
120 // PortID got assigned to another PortName. If the condition met, skip these flow update to voltha core
121 if aft.flow.PortName != "" && aft.flow.PortID != 0 {
122 portName, _ := aft.device.GetPortName(aft.flow.PortID)
123 if aft.flow.PortName != portName && portName != "" {
124 for _, flow := range aft.flow.SubFlows {
Akash Soni6168f312023-05-18 20:57:33 +0530125 logger.Warnw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
Naveen Sampath04696f72022-06-13 15:19:14 +0530126 if aft.flow.Command == of.CommandDel {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530127 aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
Naveen Sampath04696f72022-06-13 15:19:14 +0530128 }
129 }
130 return nil
131 }
132 }
133
134 if !aft.device.isSBOperAllowed(aft.flow.ForceAction) {
135 for _, flow := range aft.flow.SubFlows {
Akash Soni6168f312023-05-18 20:57:33 +0530136 logger.Warnw(ctx, "Skipping Flow Table Update", log.Fields{"Reason": "Device State not UP", "State": aft.device.State, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
Naveen Sampath04696f72022-06-13 15:19:14 +0530137 }
138 return nil
139 }
140
141 flows := of.ProcessVoltFlow(aft.device.ID, aft.flow.Command, flowsToProcess)
142 for _, flow := range flows {
143 attempt := 0
144 if vc := aft.device.VolthaClient(); vc != nil {
145 for {
146 if _, err = vc.UpdateLogicalDeviceFlowTable(aft.ctx, flow); err != nil {
147 logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Reason": err.Error(), "Operation": aft.flow.Command})
148 statusCode, _ := infraerror.GetErrorInfo(err)
149
150 // Retry on flow delete failure once.
151 // Do NOT retry incase of failure with reason: Entry Not Found
152 if aft.flow.Command == of.CommandDel && statusCode != uint32(infraerrorcode.ErrNotExists) {
153 if attempt != MaxRetryCount {
Akash Soni6168f312023-05-18 20:57:33 +0530154 logger.Warnw(ctx, "Retrying Flow Delete", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Attempt": attempt})
Naveen Sampath04696f72022-06-13 15:19:14 +0530155 attempt++
156 continue
157 }
158 logger.Errorw(ctx, "Flow Delete failed even aft max retries", log.Fields{"Flow": flow, "Attempt": attempt})
159 }
160 }
161 break
162 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530163 aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
Naveen Sampath04696f72022-06-13 15:19:14 +0530164 } else {
165 logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
166 }
167 }
168 return nil
169}
170
171func isFlowOperSuccess(statusCode uint32, oper of.Command) bool {
172 volthaErrorCode := infraerrorcode.ErrorCode(statusCode)
173
174 if volthaErrorCode == infraerrorcode.ErrOk {
175 return true
176 }
177
178 if oper == of.CommandAdd && volthaErrorCode == infraerrorcode.ErrAlreadyExists {
179 return true
Naveen Sampath04696f72022-06-13 15:19:14 +0530180 } else if oper == of.CommandDel && volthaErrorCode == infraerrorcode.ErrNotExists {
181 return true
182 }
183 return false
184}
185
186// func getBwAvailInfo(bwAvailInfo []*voltha.ResponseMsg) of.BwAvailDetails {
187// var bwInfo of.BwAvailDetails
188// // convert the bw details sent from olt to a struct
189// // received msg format:
190// // additional_data[Data{ResponseMsg
191// //{"key":"prevBW","value":"111111"},
192// //{"key":"presentBW","value":"10000"}]
193// if len(bwAvailInfo) > 1 {
194// prevBwResp := bwAvailInfo[0]
195// if prevBwResp.Key == of.PrevBwInfo {
196// _, err := strconv.Atoi(prevBwResp.Val)
197// if err == nil {
198// bwInfo.PrevBw = prevBwResp.Val
199// }
200// }
201
202// presentBwResp := bwAvailInfo[1]
203// if presentBwResp.Key == of.PresentBwInfo {
// _, err := strconv.Atoi(presentBwResp.Val)
205// if err == nil {
206// bwInfo.PresentBw = presentBwResp.Val
207// }
208// }
209// if bwInfo.PresentBw == bwInfo.PrevBw {
210// return of.BwAvailDetails{}
211// }
212// logger.Infow(ctx, "Bandwidth-consumed-info", log.Fields{"BwConsumed": bwInfo})
213// }
214// return bwInfo
215// }