blob: 56b09da70f225df36b1ce9cfd24909d43423299c [file] [log] [blame]
Mahir Gunyel03de0d32020-06-03 01:36:59 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070020 "context"
21
22 "github.com/gogo/protobuf/proto"
23 coreutils "github.com/opencord/voltha-go/rw_core/utils"
24 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
25 "github.com/opencord/voltha-lib-go/v3/pkg/log"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070026 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070027 "github.com/opencord/voltha-protos/v3/go/voltha"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070030)
31
32// listDeviceFlows returns device flows
33func (agent *Agent) listDeviceFlows() map[uint64]*ofp.OfpFlowStats {
Kent Hagermanfa9d6d42020-05-25 11:49:40 -040034 flowIDs := agent.flowLoader.ListIDs()
Mahir Gunyel03de0d32020-06-03 01:36:59 -070035 flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
36 for flowID := range flowIDs {
37 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
38 flows[flowID] = flowHandle.GetReadOnly()
39 flowHandle.Unlock()
40 }
41 }
42 return flows
43}
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070044
45func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000046 logger.Debugw(ctx, "add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070047
48 if (len(newFlows)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000049 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070050 return coreutils.DoneResponse(), nil
51 }
Kent Hagermancba2f302020-07-28 13:37:36 -040052 device, err := agent.getDeviceReadOnly(ctx)
53 if err != nil {
54 return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
55 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070056 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
57 if err != nil {
58 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
59 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070060 flowsToAdd := make([]*ofp.OfpFlowStats, 0)
61 flowsToDelete := make([]*ofp.OfpFlowStats, 0)
62 for _, flow := range newFlows {
63 flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
64 if err != nil {
65 return coreutils.DoneResponse(), err
66 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070067 if created {
68 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070069 } else {
70 flowToReplace := flowHandle.GetReadOnly()
71 if !proto.Equal(flowToReplace, flow) {
72 //Flow needs to be updated.
73 if err := flowHandle.Update(ctx, flow); err != nil {
74 flowHandle.Unlock()
75 return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
76 }
77 flowsToDelete = append(flowsToDelete, flowToReplace)
78 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070079 } else {
80 //No need to change the flow. It is already exist.
Rohan Agrawal31f21802020-06-12 05:38:46 +000081 logger.Debugw(ctx, "No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070082 }
83 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070084 flowHandle.Unlock()
85 }
86
87 // Sanity check
88 if (len(flowsToAdd)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000089 logger.Debugw(ctx, "no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070090 return coreutils.DoneResponse(), nil
91 }
92
93 // Send update to adapters
Rohan Agrawalcf12f202020-08-03 04:42:01 +000094 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070095 response := coreutils.NewResponse()
96 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -040097
98 updatedAllFlows := agent.listDeviceFlows()
99 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700100 if err != nil {
101 cancel()
102 return coreutils.DoneResponse(), err
103 }
104 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
105 } else {
106 flowChanges := &ofp.FlowChanges{
107 ToAdd: &voltha.Flows{Items: flowsToAdd},
108 ToRemove: &voltha.Flows{Items: flowsToDelete},
109 }
110 groupChanges := &ofp.FlowGroupChanges{
111 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
112 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
113 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
114 }
115 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
116 if err != nil {
117 cancel()
118 return coreutils.DoneResponse(), err
119 }
120 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
121 }
122 return response, nil
123}
124
125func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000126 logger.Debugw(ctx, "delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700127
128 if (len(flowsToDel)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000129 logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700130 return coreutils.DoneResponse(), nil
131 }
132
Kent Hagermancba2f302020-07-28 13:37:36 -0400133 device, err := agent.getDeviceReadOnly(ctx)
134 if err != nil {
135 return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
136 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700137 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
138 if err != nil {
139 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
140 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700141 for _, flow := range flowsToDel {
142 if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
143 // Update the store and cache
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700144 if err := flowHandle.Delete(ctx); err != nil {
145 flowHandle.Unlock()
146 return coreutils.DoneResponse(), err
147 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700148 flowHandle.Unlock()
149 }
150 }
151
152 // Send update to adapters
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000153 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700154 response := coreutils.NewResponse()
155 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400156
157 updatedAllFlows := agent.listDeviceFlows()
158 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700159 if err != nil {
160 cancel()
161 return coreutils.DoneResponse(), err
162 }
163 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
164 } else {
165 flowChanges := &ofp.FlowChanges{
166 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
167 ToRemove: &voltha.Flows{Items: flowsToDel},
168 }
169 groupChanges := &ofp.FlowGroupChanges{
170 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
171 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
172 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
173 }
174 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
175 if err != nil {
176 cancel()
177 return coreutils.DoneResponse(), err
178 }
179 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
180 }
181 return response, nil
182}
183
184func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000185 logger.Debugw(ctx, "updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700186
187 if (len(updatedFlows)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000188 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700189 return coreutils.DoneResponse(), nil
190 }
191
Kent Hagermancba2f302020-07-28 13:37:36 -0400192 device, err := agent.getDeviceReadOnly(ctx)
193 if err != nil {
194 return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
195 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700196 if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
197 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
198 }
199 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
200 if err != nil {
201 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
202 }
Kent Hagermana7c9d792020-07-16 17:39:01 -0400203 flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
204 flowsToDelete := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700205 for _, flow := range updatedFlows {
206 if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
207 flowToDelete := flowHandle.GetReadOnly()
208 // Update the store and cache
209 if err := flowHandle.Update(ctx, flow); err != nil {
210 flowHandle.Unlock()
211 return coreutils.DoneResponse(), err
212 }
213
214 flowsToDelete = append(flowsToDelete, flowToDelete)
215 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700216 flowHandle.Unlock()
217 }
218 }
219
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000220 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700221 response := coreutils.NewResponse()
222 // Process bulk flow update differently than incremental update
223 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400224 updatedAllFlows := agent.listDeviceFlows()
225 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, nil)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700226 if err != nil {
227 cancel()
228 return coreutils.DoneResponse(), err
229 }
230 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
231 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000232 logger.Debugw(ctx, "updating-flows-and-groups",
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700233 log.Fields{
234 "device-id": agent.deviceID,
235 "flows-to-add": flowsToAdd,
236 "flows-to-delete": flowsToDelete,
237 })
238 // Sanity check
239 if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000240 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700241 cancel()
242 return coreutils.DoneResponse(), nil
243 }
244
245 flowChanges := &ofp.FlowChanges{
246 ToAdd: &voltha.Flows{Items: flowsToAdd},
247 ToRemove: &voltha.Flows{Items: flowsToDelete},
248 }
249 groupChanges := &ofp.FlowGroupChanges{
250 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
251 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
252 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
253 }
254 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
255 if err != nil {
256 cancel()
257 return coreutils.DoneResponse(), err
258 }
259 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
260 }
261
262 return response, nil
263}
264
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700265//filterOutFlows removes flows from a device using the uni-port as filter
266func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
267 var flowsToDelete []*ofp.OfpFlowStats
268 // If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
269 for flowID := range agent.flowLoader.ListIDs() {
270 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
271 flow := flowHandle.GetReadOnly()
272 if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
273 flowsToDelete = append(flowsToDelete, flow)
274 }
275 flowHandle.Unlock()
276 }
277 }
278
Rohan Agrawal31f21802020-06-12 05:38:46 +0000279 logger.Debugw(ctx, "flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700280 if len(flowsToDelete) == 0 {
281 return nil
282 }
283
284 response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
285 if err != nil {
286 return err
287 }
288 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
289 return status.Errorf(codes.Aborted, "errors-%s", res)
290 }
291 return nil
292}
293
294//deleteAllFlows deletes all flows in the device table
295func (agent *Agent) deleteAllFlows(ctx context.Context) error {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000296 logger.Debugw(ctx, "deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700297
298 for flowID := range agent.flowLoader.ListIDs() {
299 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
300 // Update the store and cache
301 if err := flowHandle.Delete(ctx); err != nil {
302 flowHandle.Unlock()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000303 logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700304 continue
305 }
306 flowHandle.Unlock()
307 }
308 }
309 return nil
310}