/*
 * Copyright 2022-present Open Networking Foundation
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| 15 | |
| 16 | package controller |
| 17 | |
| 18 | import ( |
| 19 | "context" |
| 20 | infraerror "voltha-go-controller/internal/pkg/errorcodes" |
| 21 | infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service" |
| 22 | "time" |
| 23 | |
| 24 | "voltha-go-controller/internal/pkg/of" |
| 25 | "github.com/opencord/voltha-lib-go/v7/pkg/log" |
| 26 | ) |
| 27 | |
// MaxRetryCount is the maximum number of retry attempts made after a
// failed flow operation.
const MaxRetryCount int = 1
| 32 | |
| 33 | // AddFlowsTask structure |
| 34 | type AddFlowsTask struct { |
| 35 | taskID uint8 |
| 36 | ctx context.Context |
| 37 | flow *of.VoltFlow |
| 38 | device *Device |
| 39 | timestamp string |
| 40 | } |
| 41 | |
| 42 | // NewAddFlowsTask is constructor for AddFlowsTask |
| 43 | func NewAddFlowsTask(ctx context.Context, flow *of.VoltFlow, device *Device) *AddFlowsTask { |
| 44 | var aft AddFlowsTask |
| 45 | aft.device = device |
| 46 | aft.flow = flow |
| 47 | aft.ctx = ctx |
| 48 | tstamp := (time.Now()).Format(time.RFC3339Nano) |
| 49 | aft.timestamp = tstamp |
| 50 | return &aft |
| 51 | } |
| 52 | |
| 53 | // Name to add flow task |
| 54 | func (aft *AddFlowsTask) Name() string { |
| 55 | for _, flow := range aft.flow.SubFlows { |
| 56 | logger.Infow(ctx, "Flow Cookies", log.Fields{"Cookie": flow.Cookie}) |
| 57 | } |
| 58 | return "Add Flows Task" |
| 59 | } |
| 60 | |
| 61 | // TaskID to return task ID |
| 62 | func (aft *AddFlowsTask) TaskID() uint8 { |
| 63 | return aft.taskID |
| 64 | } |
| 65 | |
| 66 | // Timestamp to return timestamp |
| 67 | func (aft *AddFlowsTask) Timestamp() string { |
| 68 | return aft.timestamp |
| 69 | } |
| 70 | |
| 71 | // Stop to stop the add flow task |
| 72 | func (aft *AddFlowsTask) Stop() { |
| 73 | } |
| 74 | |
| 75 | // Start to start adding flow task |
| 76 | func (aft *AddFlowsTask) Start(ctx context.Context, taskID uint8) error { |
| 77 | var err error |
| 78 | aft.taskID = taskID |
| 79 | aft.ctx = ctx |
| 80 | flowsToProcess := make(map[uint64]*of.VoltSubFlow) |
| 81 | flowsPresent := 0 |
| 82 | // First add/delete the flows first locally before passing them to actual device |
| 83 | for _, flow := range aft.flow.SubFlows { |
| 84 | logger.Infow(ctx, "Flow Mod Request", log.Fields{"Cookie": flow.Cookie, "Oper": aft.flow.Command, "Port": aft.flow.PortID}) |
| 85 | if aft.flow.Command == of.CommandAdd { |
| 86 | flow.State = of.FlowAddPending |
| 87 | if err := aft.device.AddFlow(flow); err != nil { |
| 88 | logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()}) |
| 89 | |
| 90 | // If flow already exists in cache, check for flow state |
| 91 | // If Success: Trigger success FLow Indication |
| 92 | // if Failure: Continue process, so add-retry happens |
| 93 | if err.Error() == ErrDuplicateFlow { |
| 94 | dbFlow, _ := aft.device.GetFlow(flow.Cookie) |
| 95 | if dbFlow.State == of.FlowAddSuccess { |
| 96 | aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil) |
| 97 | flowsPresent++ |
| 98 | } |
| 99 | } |
| 100 | } |
| 101 | flowsToProcess[flow.Cookie] = flow |
| 102 | } else { |
| 103 | dbFlow, ok := aft.device.GetFlow(flow.Cookie) |
| 104 | if !ok { |
| 105 | logger.Warnw(ctx, "Delete Flow Error: Flow Does not Exist", log.Fields{"Cookie": flow.Cookie, "Device": aft.device.ID}) |
| 106 | } else { |
| 107 | // dbFlow.State = of.FlowDelPending |
| 108 | // aft.device.AddFlowToDb(dbFlow) |
| 109 | flowsToProcess[flow.Cookie] = dbFlow |
| 110 | } |
| 111 | aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil) |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | if flowsPresent == len(aft.flow.SubFlows) { |
| 116 | logger.Warn(ctx, "All Flows already present in database. Skipping Flow Push to SB") |
| 117 | } |
| 118 | |
| 119 | // PortName and PortID are used for validation of PortID, whether it is still valid and associated with old PortName or |
| 120 | // PortID got assigned to another PortName. If the condition met, skip these flow update to voltha core |
| 121 | if aft.flow.PortName != "" && aft.flow.PortID != 0 { |
| 122 | portName, _ := aft.device.GetPortName(aft.flow.PortID) |
| 123 | if aft.flow.PortName != portName && portName != "" { |
| 124 | for _, flow := range aft.flow.SubFlows { |
| 125 | logger.Errorw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command}) |
| 126 | if aft.flow.Command == of.CommandDel { |
| 127 | aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil) |
| 128 | } |
| 129 | } |
| 130 | return nil |
| 131 | } |
| 132 | } |
| 133 | |
| 134 | if !aft.device.isSBOperAllowed(aft.flow.ForceAction) { |
| 135 | for _, flow := range aft.flow.SubFlows { |
| 136 | logger.Errorw(ctx, "Skipping Flow Table Update", log.Fields{"Reason": "Device State not UP", "State": aft.device.State, "Cookie": flow.Cookie, "Operation": aft.flow.Command}) |
| 137 | } |
| 138 | return nil |
| 139 | } |
| 140 | |
| 141 | flows := of.ProcessVoltFlow(aft.device.ID, aft.flow.Command, flowsToProcess) |
| 142 | for _, flow := range flows { |
| 143 | attempt := 0 |
| 144 | if vc := aft.device.VolthaClient(); vc != nil { |
| 145 | for { |
| 146 | if _, err = vc.UpdateLogicalDeviceFlowTable(aft.ctx, flow); err != nil { |
| 147 | logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Reason": err.Error(), "Operation": aft.flow.Command}) |
| 148 | statusCode, _ := infraerror.GetErrorInfo(err) |
| 149 | |
| 150 | // Retry on flow delete failure once. |
| 151 | // Do NOT retry incase of failure with reason: Entry Not Found |
| 152 | if aft.flow.Command == of.CommandDel && statusCode != uint32(infraerrorcode.ErrNotExists) { |
| 153 | if attempt != MaxRetryCount { |
| 154 | logger.Errorw(ctx, "Retrying Flow Delete", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Attempt": attempt}) |
| 155 | attempt++ |
| 156 | continue |
| 157 | } |
| 158 | logger.Errorw(ctx, "Flow Delete failed even aft max retries", log.Fields{"Flow": flow, "Attempt": attempt}) |
| 159 | } |
| 160 | } |
| 161 | break |
| 162 | } |
| 163 | aft.device.triggerFlowNotification(flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil) |
| 164 | |
| 165 | } else { |
| 166 | logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow}) |
| 167 | } |
| 168 | } |
| 169 | return nil |
| 170 | } |
| 171 | |
| 172 | func isFlowOperSuccess(statusCode uint32, oper of.Command) bool { |
| 173 | volthaErrorCode := infraerrorcode.ErrorCode(statusCode) |
| 174 | |
| 175 | if volthaErrorCode == infraerrorcode.ErrOk { |
| 176 | return true |
| 177 | } |
| 178 | |
| 179 | if oper == of.CommandAdd && volthaErrorCode == infraerrorcode.ErrAlreadyExists { |
| 180 | return true |
| 181 | |
| 182 | } else if oper == of.CommandDel && volthaErrorCode == infraerrorcode.ErrNotExists { |
| 183 | return true |
| 184 | } |
| 185 | return false |
| 186 | } |
| 187 | |
| 188 | // func getBwAvailInfo(bwAvailInfo []*voltha.ResponseMsg) of.BwAvailDetails { |
| 189 | // var bwInfo of.BwAvailDetails |
| 190 | // // convert the bw details sent from olt to a struct |
| 191 | // // received msg format: |
| 192 | // // additional_data[Data{ResponseMsg |
| 193 | // //{"key":"prevBW","value":"111111"}, |
| 194 | // //{"key":"presentBW","value":"10000"}] |
| 195 | // if len(bwAvailInfo) > 1 { |
| 196 | // prevBwResp := bwAvailInfo[0] |
| 197 | // if prevBwResp.Key == of.PrevBwInfo { |
| 198 | // _, err := strconv.Atoi(prevBwResp.Val) |
| 199 | // if err == nil { |
| 200 | // bwInfo.PrevBw = prevBwResp.Val |
| 201 | // } |
| 202 | // } |
| 203 | |
| 204 | // presentBwResp := bwAvailInfo[1] |
| 205 | // if presentBwResp.Key == of.PresentBwInfo { |
| 206 | // _, err := strconv.Atoi(prevBwResp.Val) |
| 207 | // if err == nil { |
| 208 | // bwInfo.PresentBw = presentBwResp.Val |
| 209 | // } |
| 210 | // } |
| 211 | // if bwInfo.PresentBw == bwInfo.PrevBw { |
| 212 | // return of.BwAvailDetails{} |
| 213 | // } |
| 214 | // logger.Infow(ctx, "Bandwidth-consumed-info", log.Fields{"BwConsumed": bwInfo}) |
| 215 | // } |
| 216 | // return bwInfo |
| 217 | // } |