blob: 2af224c91748b157087c2ff79caeac4c6a4acd5e [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package device
18
19import (
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070020 "context"
21
22 "github.com/gogo/protobuf/proto"
23 coreutils "github.com/opencord/voltha-go/rw_core/utils"
24 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
25 "github.com/opencord/voltha-lib-go/v3/pkg/log"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070026 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070027 "github.com/opencord/voltha-protos/v3/go/voltha"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070030)
31
32// listDeviceFlows returns device flows
33func (agent *Agent) listDeviceFlows() map[uint64]*ofp.OfpFlowStats {
Kent Hagermanfa9d6d42020-05-25 11:49:40 -040034 flowIDs := agent.flowLoader.ListIDs()
Mahir Gunyel03de0d32020-06-03 01:36:59 -070035 flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
36 for flowID := range flowIDs {
37 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
38 flows[flowID] = flowHandle.GetReadOnly()
39 flowHandle.Unlock()
40 }
41 }
42 return flows
43}
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070044
45func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
46 logger.Debugw("add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
47
48 if (len(newFlows)) == 0 {
49 logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
50 return coreutils.DoneResponse(), nil
51 }
52 device := agent.getDeviceWithoutLock()
53 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
54 if err != nil {
55 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
56 }
57 updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
58 if !dType.AcceptsAddRemoveFlowUpdates {
59 flowIDs := agent.flowLoader.ListIDs()
60 for flowID := range flowIDs {
61 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
62 updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
63 flowHandle.Unlock()
64 }
65 }
66 }
67 flowsToAdd := make([]*ofp.OfpFlowStats, 0)
68 flowsToDelete := make([]*ofp.OfpFlowStats, 0)
69 for _, flow := range newFlows {
70 flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
71 if err != nil {
72 return coreutils.DoneResponse(), err
73 }
74
75 if created {
76 flowsToAdd = append(flowsToAdd, flow)
77 updatedAllFlows = append(updatedAllFlows, flow)
78 } else {
79 flowToReplace := flowHandle.GetReadOnly()
80 if !proto.Equal(flowToReplace, flow) {
81 //Flow needs to be updated.
82 if err := flowHandle.Update(ctx, flow); err != nil {
83 flowHandle.Unlock()
84 return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
85 }
86 flowsToDelete = append(flowsToDelete, flowToReplace)
87 flowsToAdd = append(flowsToAdd, flow)
88 updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToReplace, flow)
89 } else {
90 //No need to change the flow. It is already exist.
91 logger.Debugw("No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
92 }
93 }
94
95 flowHandle.Unlock()
96 }
97
98 // Sanity check
99 if (len(flowsToAdd)) == 0 {
100 logger.Debugw("no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
101 return coreutils.DoneResponse(), nil
102 }
103
104 // Send update to adapters
105 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
106 response := coreutils.NewResponse()
107 if !dType.AcceptsAddRemoveFlowUpdates {
108 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &ofp.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
109 if err != nil {
110 cancel()
111 return coreutils.DoneResponse(), err
112 }
113 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
114 } else {
115 flowChanges := &ofp.FlowChanges{
116 ToAdd: &voltha.Flows{Items: flowsToAdd},
117 ToRemove: &voltha.Flows{Items: flowsToDelete},
118 }
119 groupChanges := &ofp.FlowGroupChanges{
120 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
121 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
122 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
123 }
124 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
125 if err != nil {
126 cancel()
127 return coreutils.DoneResponse(), err
128 }
129 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
130 }
131 return response, nil
132}
133
134func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
135 logger.Debugw("delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
136
137 if (len(flowsToDel)) == 0 {
138 logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
139 return coreutils.DoneResponse(), nil
140 }
141
142 device := agent.getDeviceWithoutLock()
143 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
144 if err != nil {
145 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
146 }
147 updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
148 if !dType.AcceptsAddRemoveFlowUpdates {
149 flowIDs := agent.flowLoader.ListIDs()
150 for flowID := range flowIDs {
151 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
152 updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
153 flowHandle.Unlock()
154 }
155 }
156 }
157 for _, flow := range flowsToDel {
158 if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
159 // Update the store and cache
160 flowToDelete := flowHandle.GetReadOnly()
161 if err := flowHandle.Delete(ctx); err != nil {
162 flowHandle.Unlock()
163 return coreutils.DoneResponse(), err
164 }
165 if idx := fu.FindFlows(updatedAllFlows, flowToDelete); idx != -1 {
166 updatedAllFlows = deleteFlowWithoutPreservingOrder(updatedAllFlows, idx)
167 }
168 flowHandle.Unlock()
169 }
170 }
171
172 // Send update to adapters
173 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
174 response := coreutils.NewResponse()
175 if !dType.AcceptsAddRemoveFlowUpdates {
176 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
177 if err != nil {
178 cancel()
179 return coreutils.DoneResponse(), err
180 }
181 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
182 } else {
183 flowChanges := &ofp.FlowChanges{
184 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
185 ToRemove: &voltha.Flows{Items: flowsToDel},
186 }
187 groupChanges := &ofp.FlowGroupChanges{
188 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
189 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
190 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
191 }
192 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
193 if err != nil {
194 cancel()
195 return coreutils.DoneResponse(), err
196 }
197 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
198 }
199 return response, nil
200}
201
202func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
203 logger.Debugw("updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
204
205 if (len(updatedFlows)) == 0 {
206 logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
207 return coreutils.DoneResponse(), nil
208 }
209
210 device := agent.getDeviceWithoutLock()
211 if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
212 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
213 }
214 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
215 if err != nil {
216 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
217 }
218 updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
219 if !dType.AcceptsAddRemoveFlowUpdates {
220 flowIDs := agent.flowLoader.ListIDs()
221 for flowID := range flowIDs {
222 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
223 updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
224 flowHandle.Unlock()
225 }
226 }
227 }
228 flowsToAdd := make([]*ofp.OfpFlowStats, 0)
229 flowsToDelete := make([]*ofp.OfpFlowStats, 0)
230
231 for _, flow := range updatedFlows {
232 if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
233 flowToDelete := flowHandle.GetReadOnly()
234 // Update the store and cache
235 if err := flowHandle.Update(ctx, flow); err != nil {
236 flowHandle.Unlock()
237 return coreutils.DoneResponse(), err
238 }
239
240 flowsToDelete = append(flowsToDelete, flowToDelete)
241 flowsToAdd = append(flowsToAdd, flow)
242 updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToDelete, flow)
243 flowHandle.Unlock()
244 }
245 }
246
247 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
248 response := coreutils.NewResponse()
249 // Process bulk flow update differently than incremental update
250 if !dType.AcceptsAddRemoveFlowUpdates {
251 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, nil)
252 if err != nil {
253 cancel()
254 return coreutils.DoneResponse(), err
255 }
256 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
257 } else {
258 logger.Debugw("updating-flows-and-groups",
259 log.Fields{
260 "device-id": agent.deviceID,
261 "flows-to-add": flowsToAdd,
262 "flows-to-delete": flowsToDelete,
263 })
264 // Sanity check
265 if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
266 logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
267 cancel()
268 return coreutils.DoneResponse(), nil
269 }
270
271 flowChanges := &ofp.FlowChanges{
272 ToAdd: &voltha.Flows{Items: flowsToAdd},
273 ToRemove: &voltha.Flows{Items: flowsToDelete},
274 }
275 groupChanges := &ofp.FlowGroupChanges{
276 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
277 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
278 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
279 }
280 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
281 if err != nil {
282 cancel()
283 return coreutils.DoneResponse(), err
284 }
285 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
286 }
287
288 return response, nil
289}
290
291//replaceFlowInList removes the old flow from list and adds the new one.
292func replaceFlowInList(flowList []*ofp.OfpFlowStats, oldFlow *ofp.OfpFlowStats, newFlow *ofp.OfpFlowStats) []*ofp.OfpFlowStats {
293 if idx := fu.FindFlows(flowList, oldFlow); idx != -1 {
294 flowList = deleteFlowWithoutPreservingOrder(flowList, idx)
295 }
296 flowList = append(flowList, newFlow)
297 return flowList
298}
299
300//deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice. This function will
301//panic if the index is out of range.
302func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
303 flows[index] = flows[len(flows)-1]
304 flows[len(flows)-1] = nil
305 return flows[:len(flows)-1]
306}
307
308//filterOutFlows removes flows from a device using the uni-port as filter
309func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
310 var flowsToDelete []*ofp.OfpFlowStats
311 // If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
312 for flowID := range agent.flowLoader.ListIDs() {
313 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
314 flow := flowHandle.GetReadOnly()
315 if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
316 flowsToDelete = append(flowsToDelete, flow)
317 }
318 flowHandle.Unlock()
319 }
320 }
321
322 logger.Debugw("flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
323 if len(flowsToDelete) == 0 {
324 return nil
325 }
326
327 response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
328 if err != nil {
329 return err
330 }
331 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
332 return status.Errorf(codes.Aborted, "errors-%s", res)
333 }
334 return nil
335}
336
337//deleteAllFlows deletes all flows in the device table
338func (agent *Agent) deleteAllFlows(ctx context.Context) error {
339 logger.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
340
341 for flowID := range agent.flowLoader.ListIDs() {
342 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
343 // Update the store and cache
344 if err := flowHandle.Delete(ctx); err != nil {
345 flowHandle.Unlock()
346 logger.Errorw("unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
347 continue
348 }
349 flowHandle.Unlock()
350 }
351 }
352 return nil
353}