blob: cb32eff8d159da49cbecf11f287231cf9ac24e21 [file] [log] [blame]
Mahir Gunyel03de0d32020-06-03 01:36:59 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070020 "context"
Maninder9a1bc0d2020-10-26 11:34:02 +053021 "fmt"
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070022
khenaidoo9beaaf12021-10-19 17:32:01 -040023 ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
khenaidood948f772021-08-11 17:49:24 -040024
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070025 "github.com/gogo/protobuf/proto"
26 coreutils "github.com/opencord/voltha-go/rw_core/utils"
khenaidood948f772021-08-11 17:49:24 -040027 fu "github.com/opencord/voltha-lib-go/v7/pkg/flows"
28 "github.com/opencord/voltha-lib-go/v7/pkg/log"
29 "github.com/opencord/voltha-protos/v5/go/common"
30 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
31 "github.com/opencord/voltha-protos/v5/go/voltha"
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070032 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/status"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070034)
35
36// listDeviceFlows returns device flows
37func (agent *Agent) listDeviceFlows() map[uint64]*ofp.OfpFlowStats {
khenaidoo7585a962021-06-10 16:15:38 -040038 flowIDs := agent.flowCache.ListIDs()
Mahir Gunyel03de0d32020-06-03 01:36:59 -070039 flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
40 for flowID := range flowIDs {
khenaidoo7585a962021-06-10 16:15:38 -040041 if flowHandle, have := agent.flowCache.Lock(flowID); have {
Mahir Gunyel03de0d32020-06-03 01:36:59 -070042 flows[flowID] = flowHandle.GetReadOnly()
43 flowHandle.Unlock()
44 }
45 }
46 return flows
47}
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070048
khenaidoo9beaaf12021-10-19 17:32:01 -040049func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *ofp.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000050 logger.Debugw(ctx, "add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070051
khenaidood948f772021-08-11 17:49:24 -040052 var err error
Maninder9a1bc0d2020-10-26 11:34:02 +053053 var desc string
54 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
khenaidood948f772021-08-11 17:49:24 -040055 defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
Maninder9a1bc0d2020-10-26 11:34:02 +053056
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070057 if (len(newFlows)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000058 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070059 return coreutils.DoneResponse(), nil
60 }
Kent Hagermancba2f302020-07-28 13:37:36 -040061 device, err := agent.getDeviceReadOnly(ctx)
62 if err != nil {
63 return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
64 }
Maninder2195ccc2021-06-23 20:23:01 +053065
66 if !agent.proceedWithRequest(device) {
khenaidood948f772021-08-11 17:49:24 -040067 err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
68 return coreutils.DoneResponse(), err
Maninder2195ccc2021-06-23 20:23:01 +053069 }
70
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070071 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
72 if err != nil {
khenaidood948f772021-08-11 17:49:24 -040073 return coreutils.DoneResponse(), err
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070074 }
Maninder2195ccc2021-06-23 20:23:01 +053075
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070076 flowsToAdd := make([]*ofp.OfpFlowStats, 0)
77 flowsToDelete := make([]*ofp.OfpFlowStats, 0)
78 for _, flow := range newFlows {
khenaidoo7585a962021-06-10 16:15:38 -040079 flowHandle, created, err := agent.flowCache.LockOrCreate(ctx, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070080 if err != nil {
Maninder9a1bc0d2020-10-26 11:34:02 +053081 desc = err.Error()
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070082 return coreutils.DoneResponse(), err
83 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070084 if created {
85 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070086 } else {
87 flowToReplace := flowHandle.GetReadOnly()
88 if !proto.Equal(flowToReplace, flow) {
89 //Flow needs to be updated.
90 if err := flowHandle.Update(ctx, flow); err != nil {
91 flowHandle.Unlock()
khenaidood948f772021-08-11 17:49:24 -040092 return coreutils.DoneResponse(), err
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070093 }
94 flowsToDelete = append(flowsToDelete, flowToReplace)
95 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070096 } else {
97 //No need to change the flow. It is already exist.
Himani Chawlab4c25912020-11-12 17:16:38 +053098 logger.Debugw(ctx, "no-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070099 }
100 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700101 flowHandle.Unlock()
102 }
103
104 // Sanity check
105 if (len(flowsToAdd)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000106 logger.Debugw(ctx, "no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
khenaidood948f772021-08-11 17:49:24 -0400107 operStatus.Code = common.OperationResp_OPERATION_SUCCESS
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700108 return coreutils.DoneResponse(), nil
109 }
110
111 // Send update to adapters
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700112 response := coreutils.NewResponse()
khenaidood948f772021-08-11 17:49:24 -0400113 subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700114 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400115 updatedAllFlows := agent.listDeviceFlows()
khenaidood948f772021-08-11 17:49:24 -0400116 ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(updatedAllFlows))
117 for _, flow := range updatedAllFlows {
118 flowSlice[ctr] = flow
119 ctr++
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700120 }
khenaidoo9beaaf12021-10-19 17:32:01 -0400121 go agent.sendBulkFlows(subCtx, device, &ofp.Flows{Items: flowSlice}, nil, flowMetadata, response)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700122 } else {
123 flowChanges := &ofp.FlowChanges{
khenaidoo9beaaf12021-10-19 17:32:01 -0400124 ToAdd: &ofp.Flows{Items: flowsToAdd},
125 ToRemove: &ofp.Flows{Items: flowsToDelete},
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700126 }
127 groupChanges := &ofp.FlowGroupChanges{
khenaidoo9beaaf12021-10-19 17:32:01 -0400128 ToAdd: &ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
129 ToRemove: &ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
130 ToUpdate: &ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700131 }
khenaidood948f772021-08-11 17:49:24 -0400132 go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700133 }
Maninder9a1bc0d2020-10-26 11:34:02 +0530134 operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700135 return response, nil
136}
137
khenaidood948f772021-08-11 17:49:24 -0400138func (agent *Agent) sendBulkFlows(
139 ctx context.Context,
140 device *voltha.Device,
khenaidoo9beaaf12021-10-19 17:32:01 -0400141 flows *ofp.Flows,
142 groups *ofp.FlowGroups,
143 flowMetadata *ofp.FlowMetadata,
khenaidood948f772021-08-11 17:49:24 -0400144 response coreutils.Response,
145) {
146 var err error
147 var desc string
148 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
149 defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
150
151 // Get a grpc client
152 client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
153 if err != nil {
154 logger.Errorw(ctx, "grpc-client-nil",
155 log.Fields{
156 "error": err,
157 "device-id": agent.deviceID,
158 "device-type": agent.deviceType,
159 "adapter-endpoint": device.AdapterEndpoint,
160 })
161 response.Error(err)
162 return
163 }
Himani Chawla4b4bd252021-11-08 15:59:40 +0530164 subCtx, cancel := context.WithTimeout(ctx, agent.flowTimeout)
khenaidood948f772021-08-11 17:49:24 -0400165 defer cancel()
166
khenaidoo9beaaf12021-10-19 17:32:01 -0400167 if _, err = client.UpdateFlowsBulk(subCtx, &ca.BulkFlows{
khenaidood948f772021-08-11 17:49:24 -0400168 Device: device,
169 Flows: flows,
170 Groups: groups,
171 FlowMetadata: flowMetadata,
172 }); err != nil {
173 response.Error(err)
174 } else {
175 response.Done()
176 operStatus.Code = common.OperationResp_OPERATION_SUCCESS
177 }
178}
179
180func (agent *Agent) sendIncrementalFlows(
181 ctx context.Context,
182 device *voltha.Device,
183 flowChanges *ofp.FlowChanges,
184 groupChanges *ofp.FlowGroupChanges,
khenaidoo9beaaf12021-10-19 17:32:01 -0400185 flowMetadata *ofp.FlowMetadata,
khenaidood948f772021-08-11 17:49:24 -0400186 response coreutils.Response,
187) {
188 var err error
189 var desc string
190 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
191 defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
192
193 // Get a grpc client
194 client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
195 if err != nil {
196 logger.Errorw(ctx, "grpc-client-nil",
197 log.Fields{
198 "error": err,
199 "device-id": agent.deviceID,
200 "device-type": agent.deviceType,
201 "adapter-endpoint": device.AdapterEndpoint,
202 })
203 response.Error(err)
204 return
205 }
Himani Chawla4b4bd252021-11-08 15:59:40 +0530206 subCtx, cancel := context.WithTimeout(ctx, agent.flowTimeout)
khenaidood948f772021-08-11 17:49:24 -0400207 defer cancel()
khenaidoo9beaaf12021-10-19 17:32:01 -0400208 if _, err = client.UpdateFlowsIncrementally(subCtx, &ca.IncrementalFlows{
khenaidood948f772021-08-11 17:49:24 -0400209 Device: device,
210 Flows: flowChanges,
211 Groups: groupChanges,
212 FlowMetadata: flowMetadata,
213 }); err != nil {
214 response.Error(err)
215 } else {
216 response.Done()
217 operStatus.Code = common.OperationResp_OPERATION_SUCCESS
218 }
219}
220
khenaidoo9beaaf12021-10-19 17:32:01 -0400221func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *ofp.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000222 logger.Debugw(ctx, "delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700223
Maninder9a1bc0d2020-10-26 11:34:02 +0530224 var desc string
khenaidood948f772021-08-11 17:49:24 -0400225 var err error
Maninder9a1bc0d2020-10-26 11:34:02 +0530226 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
khenaidood948f772021-08-11 17:49:24 -0400227 defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
Maninder9a1bc0d2020-10-26 11:34:02 +0530228
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700229 if (len(flowsToDel)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000230 logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
khenaidood948f772021-08-11 17:49:24 -0400231 operStatus.Code = common.OperationResp_OPERATION_SUCCESS
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700232 return coreutils.DoneResponse(), nil
233 }
234
Kent Hagermancba2f302020-07-28 13:37:36 -0400235 device, err := agent.getDeviceReadOnly(ctx)
236 if err != nil {
khenaidood948f772021-08-11 17:49:24 -0400237 return coreutils.DoneResponse(), err
Kent Hagermancba2f302020-07-28 13:37:36 -0400238 }
Maninder2195ccc2021-06-23 20:23:01 +0530239
240 if !agent.proceedWithRequest(device) {
khenaidood948f772021-08-11 17:49:24 -0400241 err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
242 return coreutils.DoneResponse(), err
Maninder2195ccc2021-06-23 20:23:01 +0530243 }
244
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700245 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
246 if err != nil {
khenaidood948f772021-08-11 17:49:24 -0400247 return coreutils.DoneResponse(), err
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700248 }
Maninder2195ccc2021-06-23 20:23:01 +0530249
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700250 for _, flow := range flowsToDel {
khenaidoo7585a962021-06-10 16:15:38 -0400251 if flowHandle, have := agent.flowCache.Lock(flow.Id); have {
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700252 // Update the store and cache
khenaidood948f772021-08-11 17:49:24 -0400253 if err = flowHandle.Delete(ctx); err != nil {
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700254 flowHandle.Unlock()
Maninder9a1bc0d2020-10-26 11:34:02 +0530255 desc = err.Error()
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700256 return coreutils.DoneResponse(), err
257 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700258 flowHandle.Unlock()
259 }
260 }
261
262 // Send update to adapters
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700263 response := coreutils.NewResponse()
khenaidood948f772021-08-11 17:49:24 -0400264 subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700265 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400266 updatedAllFlows := agent.listDeviceFlows()
khenaidood948f772021-08-11 17:49:24 -0400267 ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(updatedAllFlows))
268 for _, flow := range updatedAllFlows {
269 flowSlice[ctr] = flow
270 ctr++
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700271 }
khenaidoo9beaaf12021-10-19 17:32:01 -0400272 go agent.sendBulkFlows(subCtx, device, &ofp.Flows{Items: flowSlice}, nil, flowMetadata, response)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700273 } else {
274 flowChanges := &ofp.FlowChanges{
khenaidoo9beaaf12021-10-19 17:32:01 -0400275 ToAdd: &ofp.Flows{Items: []*ofp.OfpFlowStats{}},
276 ToRemove: &ofp.Flows{Items: flowsToDel},
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700277 }
278 groupChanges := &ofp.FlowGroupChanges{
khenaidoo9beaaf12021-10-19 17:32:01 -0400279 ToAdd: &ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
280 ToRemove: &ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
281 ToUpdate: &ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700282 }
khenaidood948f772021-08-11 17:49:24 -0400283 go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700284 }
Maninder9a1bc0d2020-10-26 11:34:02 +0530285 operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700286 return response, nil
287}
288
khenaidoo9beaaf12021-10-19 17:32:01 -0400289func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *ofp.FlowMetadata) (coreutils.Response, error) {
Himani Chawlab4c25912020-11-12 17:16:38 +0530290 logger.Debugw(ctx, "update-flows-to-adapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700291
khenaidood948f772021-08-11 17:49:24 -0400292 var err error
Maninder9a1bc0d2020-10-26 11:34:02 +0530293 var desc string
294 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
khenaidood948f772021-08-11 17:49:24 -0400295 defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
Maninder9a1bc0d2020-10-26 11:34:02 +0530296
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700297 if (len(updatedFlows)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000298 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
khenaidood948f772021-08-11 17:49:24 -0400299 operStatus.Code = common.OperationResp_OPERATION_SUCCESS
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700300 return coreutils.DoneResponse(), nil
301 }
302
Kent Hagermancba2f302020-07-28 13:37:36 -0400303 device, err := agent.getDeviceReadOnly(ctx)
304 if err != nil {
khenaidood948f772021-08-11 17:49:24 -0400305 return coreutils.DoneResponse(), err
Kent Hagermancba2f302020-07-28 13:37:36 -0400306 }
Maninder2195ccc2021-06-23 20:23:01 +0530307
308 if !agent.proceedWithRequest(device) {
khenaidood948f772021-08-11 17:49:24 -0400309 err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
310 return coreutils.DoneResponse(), err
Maninder2195ccc2021-06-23 20:23:01 +0530311 }
312
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700313 if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
khenaidood948f772021-08-11 17:49:24 -0400314 err = status.Errorf(codes.FailedPrecondition, "invalid device states")
315 return coreutils.DoneResponse(), err
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700316 }
Maninder2195ccc2021-06-23 20:23:01 +0530317
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700318 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
319 if err != nil {
khenaidood948f772021-08-11 17:49:24 -0400320 return coreutils.DoneResponse(), err
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700321 }
Maninder2195ccc2021-06-23 20:23:01 +0530322
Kent Hagermana7c9d792020-07-16 17:39:01 -0400323 flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
324 flowsToDelete := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700325 for _, flow := range updatedFlows {
khenaidoo7585a962021-06-10 16:15:38 -0400326 if flowHandle, have := agent.flowCache.Lock(flow.Id); have {
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700327 flowToDelete := flowHandle.GetReadOnly()
328 // Update the store and cache
khenaidood948f772021-08-11 17:49:24 -0400329 if err = flowHandle.Update(ctx, flow); err != nil {
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700330 flowHandle.Unlock()
331 return coreutils.DoneResponse(), err
332 }
333
334 flowsToDelete = append(flowsToDelete, flowToDelete)
335 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700336 flowHandle.Unlock()
337 }
338 }
339
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700340 response := coreutils.NewResponse()
khenaidood948f772021-08-11 17:49:24 -0400341 subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700342 // Process bulk flow update differently than incremental update
343 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400344 updatedAllFlows := agent.listDeviceFlows()
khenaidood948f772021-08-11 17:49:24 -0400345 ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(updatedAllFlows))
346 for _, flow := range updatedAllFlows {
347 flowSlice[ctr] = flow
348 ctr++
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700349 }
khenaidoo9beaaf12021-10-19 17:32:01 -0400350 go agent.sendBulkFlows(subCtx, device, &ofp.Flows{Items: flowSlice}, nil, flowMetadata, response)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700351 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000352 logger.Debugw(ctx, "updating-flows-and-groups",
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700353 log.Fields{
354 "device-id": agent.deviceID,
355 "flows-to-add": flowsToAdd,
356 "flows-to-delete": flowsToDelete,
357 })
358 // Sanity check
359 if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000360 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
khenaidood948f772021-08-11 17:49:24 -0400361 operStatus.Code = common.OperationResp_OPERATION_SUCCESS
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700362 return coreutils.DoneResponse(), nil
363 }
364
365 flowChanges := &ofp.FlowChanges{
khenaidoo9beaaf12021-10-19 17:32:01 -0400366 ToAdd: &ofp.Flows{Items: flowsToAdd},
367 ToRemove: &ofp.Flows{Items: flowsToDelete},
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700368 }
369 groupChanges := &ofp.FlowGroupChanges{
khenaidoo9beaaf12021-10-19 17:32:01 -0400370 ToAdd: &ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
371 ToRemove: &ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
372 ToUpdate: &ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700373 }
khenaidood948f772021-08-11 17:49:24 -0400374 go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700375 }
Maninder9a1bc0d2020-10-26 11:34:02 +0530376 operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700377 return response, nil
378}
379
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700380//filterOutFlows removes flows from a device using the uni-port as filter
khenaidoo9beaaf12021-10-19 17:32:01 -0400381func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *ofp.FlowMetadata) error {
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700382 var flowsToDelete []*ofp.OfpFlowStats
383 // If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
khenaidoo7585a962021-06-10 16:15:38 -0400384 for flowID := range agent.flowCache.ListIDs() {
385 if flowHandle, have := agent.flowCache.Lock(flowID); have {
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700386 flow := flowHandle.GetReadOnly()
387 if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
388 flowsToDelete = append(flowsToDelete, flow)
389 }
390 flowHandle.Unlock()
391 }
392 }
393
Rohan Agrawal31f21802020-06-12 05:38:46 +0000394 logger.Debugw(ctx, "flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700395 if len(flowsToDelete) == 0 {
396 return nil
397 }
398
399 response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
400 if err != nil {
401 return err
402 }
Himani Chawla4b4bd252021-11-08 15:59:40 +0530403 if res := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, response); res != nil {
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700404 return status.Errorf(codes.Aborted, "errors-%s", res)
405 }
406 return nil
407}
408
409//deleteAllFlows deletes all flows in the device table
410func (agent *Agent) deleteAllFlows(ctx context.Context) error {
divyadesaicb8b59d2020-08-18 09:55:47 +0000411 logger.Debugw(ctx, "deleteAllFlows", log.Fields{"device-id": agent.deviceID})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700412
khenaidood948f772021-08-11 17:49:24 -0400413 var err error
414 var errFlows string
Maninder9a1bc0d2020-10-26 11:34:02 +0530415 var desc string
416 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
khenaidood948f772021-08-11 17:49:24 -0400417 defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
Maninder9a1bc0d2020-10-26 11:34:02 +0530418
khenaidoo7585a962021-06-10 16:15:38 -0400419 for flowID := range agent.flowCache.ListIDs() {
420 if flowHandle, have := agent.flowCache.Lock(flowID); have {
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700421 // Update the store and cache
khenaidood948f772021-08-11 17:49:24 -0400422 if err = flowHandle.Delete(ctx); err != nil {
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700423 flowHandle.Unlock()
khenaidood948f772021-08-11 17:49:24 -0400424 errFlows += fmt.Sprintf("%v ", flowID)
425 logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID, "error": err})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700426 continue
427 }
428 flowHandle.Unlock()
429 }
430 }
Maninder9a1bc0d2020-10-26 11:34:02 +0530431
khenaidood948f772021-08-11 17:49:24 -0400432 if errFlows != "" {
433 err = fmt.Errorf("unable to delete flows : %s", errFlows)
Maninder9a1bc0d2020-10-26 11:34:02 +0530434 } else {
435 operStatus.Code = common.OperationResp_OPERATION_SUCCESS
436 }
khenaidood948f772021-08-11 17:49:24 -0400437 return err
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700438}