blob: 09ca7ed8c5ccfc13df2f8bb38acd15bc8ddb9415 [file] [log] [blame]
Mahir Gunyel03de0d32020-06-03 01:36:59 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070020 "context"
21
22 "github.com/gogo/protobuf/proto"
23 coreutils "github.com/opencord/voltha-go/rw_core/utils"
24 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
25 "github.com/opencord/voltha-lib-go/v3/pkg/log"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070026 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070027 "github.com/opencord/voltha-protos/v3/go/voltha"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070030)
31
32// listDeviceFlows returns device flows
33func (agent *Agent) listDeviceFlows() map[uint64]*ofp.OfpFlowStats {
Kent Hagermanfa9d6d42020-05-25 11:49:40 -040034 flowIDs := agent.flowLoader.ListIDs()
Mahir Gunyel03de0d32020-06-03 01:36:59 -070035 flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
36 for flowID := range flowIDs {
37 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
38 flows[flowID] = flowHandle.GetReadOnly()
39 flowHandle.Unlock()
40 }
41 }
42 return flows
43}
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070044
45func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000046 logger.Debugw(ctx, "add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070047
48 if (len(newFlows)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000049 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070050 return coreutils.DoneResponse(), nil
51 }
Kent Hagermanf6db9f12020-07-22 17:16:19 -040052 device := agent.getDeviceReadOnly()
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070053 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
54 if err != nil {
55 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
56 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070057 flowsToAdd := make([]*ofp.OfpFlowStats, 0)
58 flowsToDelete := make([]*ofp.OfpFlowStats, 0)
59 for _, flow := range newFlows {
60 flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
61 if err != nil {
62 return coreutils.DoneResponse(), err
63 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070064 if created {
65 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070066 } else {
67 flowToReplace := flowHandle.GetReadOnly()
68 if !proto.Equal(flowToReplace, flow) {
69 //Flow needs to be updated.
70 if err := flowHandle.Update(ctx, flow); err != nil {
71 flowHandle.Unlock()
72 return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
73 }
74 flowsToDelete = append(flowsToDelete, flowToReplace)
75 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070076 } else {
77 //No need to change the flow. It is already exist.
Rohan Agrawal31f21802020-06-12 05:38:46 +000078 logger.Debugw(ctx, "No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070079 }
80 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070081 flowHandle.Unlock()
82 }
83
84 // Sanity check
85 if (len(flowsToAdd)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000086 logger.Debugw(ctx, "no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070087 return coreutils.DoneResponse(), nil
88 }
89
90 // Send update to adapters
91 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
92 response := coreutils.NewResponse()
93 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -040094
95 updatedAllFlows := agent.listDeviceFlows()
96 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070097 if err != nil {
98 cancel()
99 return coreutils.DoneResponse(), err
100 }
101 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
102 } else {
103 flowChanges := &ofp.FlowChanges{
104 ToAdd: &voltha.Flows{Items: flowsToAdd},
105 ToRemove: &voltha.Flows{Items: flowsToDelete},
106 }
107 groupChanges := &ofp.FlowGroupChanges{
108 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
109 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
110 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
111 }
112 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
113 if err != nil {
114 cancel()
115 return coreutils.DoneResponse(), err
116 }
117 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
118 }
119 return response, nil
120}
121
122func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000123 logger.Debugw(ctx, "delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700124
125 if (len(flowsToDel)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000126 logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700127 return coreutils.DoneResponse(), nil
128 }
129
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400130 device := agent.getDeviceReadOnly()
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700131 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
132 if err != nil {
133 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
134 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700135 for _, flow := range flowsToDel {
136 if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
137 // Update the store and cache
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700138 if err := flowHandle.Delete(ctx); err != nil {
139 flowHandle.Unlock()
140 return coreutils.DoneResponse(), err
141 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700142 flowHandle.Unlock()
143 }
144 }
145
146 // Send update to adapters
147 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
148 response := coreutils.NewResponse()
149 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400150
151 updatedAllFlows := agent.listDeviceFlows()
152 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700153 if err != nil {
154 cancel()
155 return coreutils.DoneResponse(), err
156 }
157 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
158 } else {
159 flowChanges := &ofp.FlowChanges{
160 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
161 ToRemove: &voltha.Flows{Items: flowsToDel},
162 }
163 groupChanges := &ofp.FlowGroupChanges{
164 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
165 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
166 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
167 }
168 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
169 if err != nil {
170 cancel()
171 return coreutils.DoneResponse(), err
172 }
173 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
174 }
175 return response, nil
176}
177
178func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000179 logger.Debugw(ctx, "updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700180
181 if (len(updatedFlows)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000182 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700183 return coreutils.DoneResponse(), nil
184 }
185
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400186 device := agent.getDeviceReadOnly()
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700187 if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
188 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
189 }
190 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
191 if err != nil {
192 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
193 }
Kent Hagermana7c9d792020-07-16 17:39:01 -0400194 flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
195 flowsToDelete := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700196 for _, flow := range updatedFlows {
197 if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
198 flowToDelete := flowHandle.GetReadOnly()
199 // Update the store and cache
200 if err := flowHandle.Update(ctx, flow); err != nil {
201 flowHandle.Unlock()
202 return coreutils.DoneResponse(), err
203 }
204
205 flowsToDelete = append(flowsToDelete, flowToDelete)
206 flowsToAdd = append(flowsToAdd, flow)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700207 flowHandle.Unlock()
208 }
209 }
210
211 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
212 response := coreutils.NewResponse()
213 // Process bulk flow update differently than incremental update
214 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400215 updatedAllFlows := agent.listDeviceFlows()
216 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, nil)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700217 if err != nil {
218 cancel()
219 return coreutils.DoneResponse(), err
220 }
221 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
222 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000223 logger.Debugw(ctx, "updating-flows-and-groups",
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700224 log.Fields{
225 "device-id": agent.deviceID,
226 "flows-to-add": flowsToAdd,
227 "flows-to-delete": flowsToDelete,
228 })
229 // Sanity check
230 if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000231 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700232 cancel()
233 return coreutils.DoneResponse(), nil
234 }
235
236 flowChanges := &ofp.FlowChanges{
237 ToAdd: &voltha.Flows{Items: flowsToAdd},
238 ToRemove: &voltha.Flows{Items: flowsToDelete},
239 }
240 groupChanges := &ofp.FlowGroupChanges{
241 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
242 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
243 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
244 }
245 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
246 if err != nil {
247 cancel()
248 return coreutils.DoneResponse(), err
249 }
250 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
251 }
252
253 return response, nil
254}
255
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700256//filterOutFlows removes flows from a device using the uni-port as filter
257func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
258 var flowsToDelete []*ofp.OfpFlowStats
259 // If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
260 for flowID := range agent.flowLoader.ListIDs() {
261 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
262 flow := flowHandle.GetReadOnly()
263 if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
264 flowsToDelete = append(flowsToDelete, flow)
265 }
266 flowHandle.Unlock()
267 }
268 }
269
Rohan Agrawal31f21802020-06-12 05:38:46 +0000270 logger.Debugw(ctx, "flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700271 if len(flowsToDelete) == 0 {
272 return nil
273 }
274
275 response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
276 if err != nil {
277 return err
278 }
279 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
280 return status.Errorf(codes.Aborted, "errors-%s", res)
281 }
282 return nil
283}
284
285//deleteAllFlows deletes all flows in the device table
286func (agent *Agent) deleteAllFlows(ctx context.Context) error {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000287 logger.Debugw(ctx, "deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700288
289 for flowID := range agent.flowLoader.ListIDs() {
290 if flowHandle, have := agent.flowLoader.Lock(flowID); have {
291 // Update the store and cache
292 if err := flowHandle.Delete(ctx); err != nil {
293 flowHandle.Unlock()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000294 logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700295 continue
296 }
297 flowHandle.Unlock()
298 }
299 }
300 return nil
301}