blob: 96254e4fca730f54812f9e4a8081751d584f0af7 [file] [log] [blame]
Mahir Gunyel03de0d32020-06-03 01:36:59 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070020 "context"
21 "strconv"
22
23 "github.com/gogo/protobuf/proto"
24 coreutils "github.com/opencord/voltha-go/rw_core/utils"
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070025 "github.com/opencord/voltha-lib-go/v3/pkg/log"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070026 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070027 "github.com/opencord/voltha-protos/v3/go/voltha"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070030)
31
32// listDeviceGroups returns logical device flow groups
33func (agent *Agent) listDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
Kent Hagermanfa9d6d42020-05-25 11:49:40 -040034 groupIDs := agent.groupLoader.ListIDs()
Mahir Gunyel03de0d32020-06-03 01:36:59 -070035 groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
36 for groupID := range groupIDs {
37 if groupHandle, have := agent.groupLoader.Lock(groupID); have {
38 groups[groupID] = groupHandle.GetReadOnly()
39 groupHandle.Unlock()
40 }
41 }
42 return groups
43}
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070044
45func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000046 logger.Debugw(ctx, "add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070047
48 if (len(newGroups)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000049 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070050 return coreutils.DoneResponse(), nil
51 }
52
Kent Hagermancba2f302020-07-28 13:37:36 -040053 device, err := agent.getDeviceReadOnly(ctx)
54 if err != nil {
55 return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
56 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070057 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
58 if err != nil {
59 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
60 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070061
62 groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
63 groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
64 for _, group := range newGroups {
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070065 groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
66 if err != nil {
67 return coreutils.DoneResponse(), err
68 }
69
70 if created {
71 groupsToAdd = append(groupsToAdd, group)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070072 } else {
73 groupToChange := groupHandle.GetReadOnly()
74 if !proto.Equal(groupToChange, group) {
75 //Group needs to be updated.
76 if err := groupHandle.Update(ctx, group); err != nil {
77 groupHandle.Unlock()
78 return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
79 }
80 groupsToDelete = append(groupsToDelete, groupToChange)
81 groupsToAdd = append(groupsToAdd, group)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070082 } else {
83 //No need to change the group. It is already exist.
Rohan Agrawal31f21802020-06-12 05:38:46 +000084 logger.Debugw(ctx, "No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070085 }
86 }
87
88 groupHandle.Unlock()
89 }
90 // Sanity check
91 if (len(groupsToAdd)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000092 logger.Debugw(ctx, "no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070093 return coreutils.DoneResponse(), nil
94 }
95
96 // Send update to adapters
97 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
98 response := coreutils.NewResponse()
99 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400100 updatedAllGroups := agent.listDeviceGroups()
101 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700102 if err != nil {
103 cancel()
104 return coreutils.DoneResponse(), err
105 }
106 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
107 } else {
108 flowChanges := &ofp.FlowChanges{
109 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
110 ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
111 }
112 groupChanges := &ofp.FlowGroupChanges{
113 ToAdd: &voltha.FlowGroups{Items: groupsToAdd},
114 ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
115 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
116 }
117 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
118 if err != nil {
119 cancel()
120 return coreutils.DoneResponse(), err
121 }
122 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
123 }
124 return response, nil
125}
126
127func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000128 logger.Debugw(ctx, "delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700129
130 if (len(groupsToDel)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000131 logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700132 return coreutils.DoneResponse(), nil
133 }
Kent Hagermancba2f302020-07-28 13:37:36 -0400134 device, err := agent.getDeviceReadOnly(ctx)
135 if err != nil {
136 return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
137 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700138 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
139 if err != nil {
140 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
141 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700142
143 for _, group := range groupsToDel {
144 if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
145 // Update the store and cache
146 if err := groupHandle.Delete(ctx); err != nil {
147 groupHandle.Unlock()
148 return coreutils.DoneResponse(), err
149 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700150 groupHandle.Unlock()
151 }
152 }
153
154 // Send update to adapters
155 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
156 response := coreutils.NewResponse()
157 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400158 updatedAllGroups := agent.listDeviceGroups()
159 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700160 if err != nil {
161 cancel()
162 return coreutils.DoneResponse(), err
163 }
164 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
165 } else {
166 flowChanges := &ofp.FlowChanges{
167 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
168 ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
169 }
170 groupChanges := &ofp.FlowGroupChanges{
171 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
172 ToRemove: &voltha.FlowGroups{Items: groupsToDel},
173 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
174 }
175 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
176 if err != nil {
177 cancel()
178 return coreutils.DoneResponse(), err
179 }
180 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
181 }
182 return response, nil
183}
184
185func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000186 logger.Debugw(ctx, "updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700187
188 if (len(updatedGroups)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000189 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700190 return coreutils.DoneResponse(), nil
191 }
192
Kent Hagermancba2f302020-07-28 13:37:36 -0400193 device, err := agent.getDeviceReadOnly(ctx)
194 if err != nil {
195 return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
196 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700197 if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
198 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
199 }
200 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
201 if err != nil {
202 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
203 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700204
Kent Hagermana7c9d792020-07-16 17:39:01 -0400205 groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700206 for _, group := range updatedGroups {
207 if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
208 // Update the store and cache
209 if err := groupHandle.Update(ctx, group); err != nil {
210 groupHandle.Unlock()
211 return coreutils.DoneResponse(), err
212 }
213 groupsToUpdate = append(groupsToUpdate, group)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700214 groupHandle.Unlock()
215 }
216 }
217
218 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
219 response := coreutils.NewResponse()
220 // Process bulk flow update differently than incremental update
221 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400222 updatedAllGroups := agent.listDeviceGroups()
223 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, nil)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700224 if err != nil {
225 cancel()
226 return coreutils.DoneResponse(), err
227 }
228 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
229 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000230 logger.Debugw(ctx, "updating-groups",
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700231 log.Fields{
232 "device-id": agent.deviceID,
233 "groups-to-update": groupsToUpdate,
234 })
235
236 // Sanity check
237 if (len(groupsToUpdate)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000238 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700239 cancel()
240 return coreutils.DoneResponse(), nil
241 }
242
243 flowChanges := &ofp.FlowChanges{
244 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
245 ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
246 }
247 groupChanges := &ofp.FlowGroupChanges{
248 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
249 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
250 ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
251 }
252 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
253 if err != nil {
254 cancel()
255 return coreutils.DoneResponse(), err
256 }
257 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
258 }
259
260 return response, nil
261}