blob: e11aee8f079229eeb8db211010961b3d56807a04 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package device
18
19import (
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070020 "context"
21 "strconv"
22
23 "github.com/gogo/protobuf/proto"
24 coreutils "github.com/opencord/voltha-go/rw_core/utils"
25 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
26 "github.com/opencord/voltha-lib-go/v3/pkg/log"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070027 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070028 "github.com/opencord/voltha-protos/v3/go/voltha"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070031)
32
33// listDeviceGroups returns logical device flow groups
34func (agent *Agent) listDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
Kent Hagermanfa9d6d42020-05-25 11:49:40 -040035 groupIDs := agent.groupLoader.ListIDs()
Mahir Gunyel03de0d32020-06-03 01:36:59 -070036 groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
37 for groupID := range groupIDs {
38 if groupHandle, have := agent.groupLoader.Lock(groupID); have {
39 groups[groupID] = groupHandle.GetReadOnly()
40 groupHandle.Unlock()
41 }
42 }
43 return groups
44}
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070045
46func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000047 logger.Debugw(ctx, "add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070048
49 if (len(newGroups)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000050 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070051 return coreutils.DoneResponse(), nil
52 }
53
54 device := agent.getDeviceWithoutLock()
55 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
56 if err != nil {
57 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
58 }
59 updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
60 if !dType.AcceptsAddRemoveFlowUpdates {
61 groupIDs := agent.groupLoader.ListIDs()
62 for groupID := range groupIDs {
63 if grpHandle, have := agent.groupLoader.Lock(groupID); have {
64 updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
65 grpHandle.Unlock()
66 }
67 }
68 }
69
70 groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
71 groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
72 for _, group := range newGroups {
73
74 groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
75 if err != nil {
76 return coreutils.DoneResponse(), err
77 }
78
79 if created {
80 groupsToAdd = append(groupsToAdd, group)
81 updatedAllGroups = append(updatedAllGroups, group)
82 } else {
83 groupToChange := groupHandle.GetReadOnly()
84 if !proto.Equal(groupToChange, group) {
85 //Group needs to be updated.
86 if err := groupHandle.Update(ctx, group); err != nil {
87 groupHandle.Unlock()
88 return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
89 }
90 groupsToDelete = append(groupsToDelete, groupToChange)
91 groupsToAdd = append(groupsToAdd, group)
92 updatedAllGroups = replaceGroupInList(updatedAllGroups, groupToChange, group)
93 } else {
94 //No need to change the group. It is already exist.
Rohan Agrawal31f21802020-06-12 05:38:46 +000095 logger.Debugw(ctx, "No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070096 }
97 }
98
99 groupHandle.Unlock()
100 }
101 // Sanity check
102 if (len(groupsToAdd)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000103 logger.Debugw(ctx, "no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700104 return coreutils.DoneResponse(), nil
105 }
106
107 // Send update to adapters
108 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
109 response := coreutils.NewResponse()
110 if !dType.AcceptsAddRemoveFlowUpdates {
111 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
112 if err != nil {
113 cancel()
114 return coreutils.DoneResponse(), err
115 }
116 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
117 } else {
118 flowChanges := &ofp.FlowChanges{
119 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
120 ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
121 }
122 groupChanges := &ofp.FlowGroupChanges{
123 ToAdd: &voltha.FlowGroups{Items: groupsToAdd},
124 ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
125 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
126 }
127 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
128 if err != nil {
129 cancel()
130 return coreutils.DoneResponse(), err
131 }
132 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
133 }
134 return response, nil
135}
136
137func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000138 logger.Debugw(ctx, "delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700139
140 if (len(groupsToDel)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000141 logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700142 return coreutils.DoneResponse(), nil
143 }
144 device := agent.getDeviceWithoutLock()
145 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
146 if err != nil {
147 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
148 }
149 updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
150 if !dType.AcceptsAddRemoveFlowUpdates {
151 groupIDs := agent.groupLoader.ListIDs()
152 for groupID := range groupIDs {
153 if grpHandle, have := agent.groupLoader.Lock(groupID); have {
154 updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
155 grpHandle.Unlock()
156 }
157 }
158 }
159
160 for _, group := range groupsToDel {
161 if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
162 // Update the store and cache
163 if err := groupHandle.Delete(ctx); err != nil {
164 groupHandle.Unlock()
165 return coreutils.DoneResponse(), err
166 }
167 if idx := fu.FindGroup(updatedAllGroups, group.Desc.GroupId); idx != -1 {
168 updatedAllGroups = deleteGroupWithoutPreservingOrder(updatedAllGroups, idx)
169 }
170 groupHandle.Unlock()
171 }
172 }
173
174 // Send update to adapters
175 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
176 response := coreutils.NewResponse()
177 if !dType.AcceptsAddRemoveFlowUpdates {
178 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
179 if err != nil {
180 cancel()
181 return coreutils.DoneResponse(), err
182 }
183 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
184 } else {
185 flowChanges := &ofp.FlowChanges{
186 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
187 ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
188 }
189 groupChanges := &ofp.FlowGroupChanges{
190 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
191 ToRemove: &voltha.FlowGroups{Items: groupsToDel},
192 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
193 }
194 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
195 if err != nil {
196 cancel()
197 return coreutils.DoneResponse(), err
198 }
199 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
200 }
201 return response, nil
202}
203
204func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000205 logger.Debugw(ctx, "updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700206
207 if (len(updatedGroups)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000208 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700209 return coreutils.DoneResponse(), nil
210 }
211
212 device := agent.getDeviceWithoutLock()
213 if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
214 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
215 }
216 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
217 if err != nil {
218 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
219 }
220 updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
221 if !dType.AcceptsAddRemoveFlowUpdates {
222 groupIDs := agent.groupLoader.ListIDs()
223 for groupID := range groupIDs {
224 if grpHandle, have := agent.groupLoader.Lock(groupID); have {
225 updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
226 grpHandle.Unlock()
227 }
228 }
229 }
230 groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
231
232 for _, group := range updatedGroups {
233 if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
234 // Update the store and cache
235 if err := groupHandle.Update(ctx, group); err != nil {
236 groupHandle.Unlock()
237 return coreutils.DoneResponse(), err
238 }
239 groupsToUpdate = append(groupsToUpdate, group)
240 updatedAllGroups = replaceGroupInList(updatedAllGroups, groupHandle.GetReadOnly(), group)
241 groupHandle.Unlock()
242 }
243 }
244
245 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
246 response := coreutils.NewResponse()
247 // Process bulk flow update differently than incremental update
248 if !dType.AcceptsAddRemoveFlowUpdates {
249 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, nil)
250 if err != nil {
251 cancel()
252 return coreutils.DoneResponse(), err
253 }
254 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
255 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000256 logger.Debugw(ctx, "updating-groups",
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700257 log.Fields{
258 "device-id": agent.deviceID,
259 "groups-to-update": groupsToUpdate,
260 })
261
262 // Sanity check
263 if (len(groupsToUpdate)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000264 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700265 cancel()
266 return coreutils.DoneResponse(), nil
267 }
268
269 flowChanges := &ofp.FlowChanges{
270 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
271 ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
272 }
273 groupChanges := &ofp.FlowGroupChanges{
274 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
275 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
276 ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
277 }
278 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
279 if err != nil {
280 cancel()
281 return coreutils.DoneResponse(), err
282 }
283 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
284 }
285
286 return response, nil
287}
288
289//replaceGroupInList removes the old group from list and adds the new one.
290func replaceGroupInList(groupList []*ofp.OfpGroupEntry, oldGroup *ofp.OfpGroupEntry, newGroup *ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
291 if idx := fu.FindGroup(groupList, oldGroup.Desc.GroupId); idx != -1 {
292 groupList = deleteGroupWithoutPreservingOrder(groupList, idx)
293 }
294 groupList = append(groupList, newGroup)
295 return groupList
296}
297
298//deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice. This function will
299//panic if the index is out of range.
300func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
301 groups[index] = groups[len(groups)-1]
302 groups[len(groups)-1] = nil
303 return groups[:len(groups)-1]
304}