blob: f2fd10a6fe4d6ac9b5ef85af5e7fcf91c35adea5 [file] [log] [blame]
Mahir Gunyel03de0d32020-06-03 01:36:59 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070020 "context"
21
22 "github.com/gogo/protobuf/proto"
23 coreutils "github.com/opencord/voltha-go/rw_core/utils"
Maninderdfadc982020-10-28 14:04:33 +053024 fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
25 "github.com/opencord/voltha-lib-go/v4/pkg/log"
26 ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
27 "github.com/opencord/voltha-protos/v4/go/voltha"
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070030)
31
32// listDeviceFlows returns device flows
33func (agent *Agent) listDeviceFlows() map[uint64]*ofp.OfpFlowStats {
Kent Hagermanfa9d6d42020-05-25 11:49:40 -040034 flowIDs := agent.flowLoader.ListIDs()
Mahir Gunyel03de0d32020-06-03 01:36:59 -070035 flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
36 for flowID := range flowIDs {
37 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
38 flows[flowID] = flowHandle.GetReadOnly()
39 flowHandle.Unlock()
40 }
41 }
42 return flows
43}
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070044
45func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000046 logger.Debugw(ctx, "add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070047
48 if (len(newFlows)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000049 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070050 return coreutils.DoneResponse(), nil
51 }
Kent Hagermancba2f302020-07-28 13:37:36 -040052 device, err := agent.getDeviceReadOnly(ctx)
53 if err != nil {
54 return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
55 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070056 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
57 if err != nil {
58 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
59 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070060 flowsToAdd := make([]*ofp.OfpFlowStats, 0)
61 flowsToDelete := make([]*ofp.OfpFlowStats, 0)
62 for _, flow := range newFlows {
63 flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
64 if err != nil {
65 return coreutils.DoneResponse(), err
66 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070067 if created {
68 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070069 } else {
70 flowToReplace := flowHandle.GetReadOnly()
71 if !proto.Equal(flowToReplace, flow) {
72 //Flow needs to be updated.
73 if err := flowHandle.Update(ctx, flow); err != nil {
74 flowHandle.Unlock()
75 return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
76 }
77 flowsToDelete = append(flowsToDelete, flowToReplace)
78 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070079 } else {
80 //No need to change the flow. It is already exist.
Himani Chawlab4c25912020-11-12 17:16:38 +053081 logger.Debugw(ctx, "no-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070082 }
83 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070084 flowHandle.Unlock()
85 }
86
87 // Sanity check
88 if (len(flowsToAdd)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000089 logger.Debugw(ctx, "no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070090 return coreutils.DoneResponse(), nil
91 }
92
93 // Send update to adapters
Rohan Agrawalcf12f202020-08-03 04:42:01 +000094 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Himani Chawlab4c25912020-11-12 17:16:38 +053095 subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
96
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070097 response := coreutils.NewResponse()
98 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -040099
100 updatedAllFlows := agent.listDeviceFlows()
101 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700102 if err != nil {
103 cancel()
104 return coreutils.DoneResponse(), err
105 }
106 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
107 } else {
108 flowChanges := &ofp.FlowChanges{
109 ToAdd: &voltha.Flows{Items: flowsToAdd},
110 ToRemove: &voltha.Flows{Items: flowsToDelete},
111 }
112 groupChanges := &ofp.FlowGroupChanges{
113 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
114 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
115 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
116 }
117 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
118 if err != nil {
119 cancel()
120 return coreutils.DoneResponse(), err
121 }
122 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
123 }
124 return response, nil
125}
126
127func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000128 logger.Debugw(ctx, "delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700129
130 if (len(flowsToDel)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000131 logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700132 return coreutils.DoneResponse(), nil
133 }
134
Kent Hagermancba2f302020-07-28 13:37:36 -0400135 device, err := agent.getDeviceReadOnly(ctx)
136 if err != nil {
137 return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
138 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700139 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
140 if err != nil {
141 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
142 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700143 for _, flow := range flowsToDel {
144 if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
145 // Update the store and cache
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700146 if err := flowHandle.Delete(ctx); err != nil {
147 flowHandle.Unlock()
148 return coreutils.DoneResponse(), err
149 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700150 flowHandle.Unlock()
151 }
152 }
153
154 // Send update to adapters
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000155 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Himani Chawlab4c25912020-11-12 17:16:38 +0530156 subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
157
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700158 response := coreutils.NewResponse()
159 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400160
161 updatedAllFlows := agent.listDeviceFlows()
162 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700163 if err != nil {
164 cancel()
165 return coreutils.DoneResponse(), err
166 }
167 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
168 } else {
169 flowChanges := &ofp.FlowChanges{
170 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
171 ToRemove: &voltha.Flows{Items: flowsToDel},
172 }
173 groupChanges := &ofp.FlowGroupChanges{
174 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
175 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
176 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
177 }
178 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
179 if err != nil {
180 cancel()
181 return coreutils.DoneResponse(), err
182 }
183 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
184 }
185 return response, nil
186}
187
188func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Himani Chawlab4c25912020-11-12 17:16:38 +0530189 logger.Debugw(ctx, "update-flows-to-adapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700190
191 if (len(updatedFlows)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000192 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700193 return coreutils.DoneResponse(), nil
194 }
195
Kent Hagermancba2f302020-07-28 13:37:36 -0400196 device, err := agent.getDeviceReadOnly(ctx)
197 if err != nil {
198 return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
199 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700200 if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
201 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
202 }
203 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
204 if err != nil {
205 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
206 }
Kent Hagermana7c9d792020-07-16 17:39:01 -0400207 flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
208 flowsToDelete := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700209 for _, flow := range updatedFlows {
210 if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
211 flowToDelete := flowHandle.GetReadOnly()
212 // Update the store and cache
213 if err := flowHandle.Update(ctx, flow); err != nil {
214 flowHandle.Unlock()
215 return coreutils.DoneResponse(), err
216 }
217
218 flowsToDelete = append(flowsToDelete, flowToDelete)
219 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700220 flowHandle.Unlock()
221 }
222 }
223
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000224 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Himani Chawlab4c25912020-11-12 17:16:38 +0530225 subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
226
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700227 response := coreutils.NewResponse()
228 // Process bulk flow update differently than incremental update
229 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400230 updatedAllFlows := agent.listDeviceFlows()
231 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, nil)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700232 if err != nil {
233 cancel()
234 return coreutils.DoneResponse(), err
235 }
236 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
237 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000238 logger.Debugw(ctx, "updating-flows-and-groups",
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700239 log.Fields{
240 "device-id": agent.deviceID,
241 "flows-to-add": flowsToAdd,
242 "flows-to-delete": flowsToDelete,
243 })
244 // Sanity check
245 if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000246 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700247 cancel()
248 return coreutils.DoneResponse(), nil
249 }
250
251 flowChanges := &ofp.FlowChanges{
252 ToAdd: &voltha.Flows{Items: flowsToAdd},
253 ToRemove: &voltha.Flows{Items: flowsToDelete},
254 }
255 groupChanges := &ofp.FlowGroupChanges{
256 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
257 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
258 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
259 }
260 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
261 if err != nil {
262 cancel()
263 return coreutils.DoneResponse(), err
264 }
265 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
266 }
267
268 return response, nil
269}
270
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700271//filterOutFlows removes flows from a device using the uni-port as filter
272func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
273 var flowsToDelete []*ofp.OfpFlowStats
274 // If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
275 for flowID := range agent.flowLoader.ListIDs() {
276 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
277 flow := flowHandle.GetReadOnly()
278 if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
279 flowsToDelete = append(flowsToDelete, flow)
280 }
281 flowHandle.Unlock()
282 }
283 }
284
Rohan Agrawal31f21802020-06-12 05:38:46 +0000285 logger.Debugw(ctx, "flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700286 if len(flowsToDelete) == 0 {
287 return nil
288 }
289
290 response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
291 if err != nil {
292 return err
293 }
294 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
295 return status.Errorf(codes.Aborted, "errors-%s", res)
296 }
297 return nil
298}
299
300//deleteAllFlows deletes all flows in the device table
301func (agent *Agent) deleteAllFlows(ctx context.Context) error {
divyadesaicb8b59d2020-08-18 09:55:47 +0000302 logger.Debugw(ctx, "deleteAllFlows", log.Fields{"device-id": agent.deviceID})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700303
304 for flowID := range agent.flowLoader.ListIDs() {
305 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
306 // Update the store and cache
307 if err := flowHandle.Delete(ctx); err != nil {
308 flowHandle.Unlock()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000309 logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700310 continue
311 }
312 flowHandle.Unlock()
313 }
314 }
315 return nil
316}