blob: da2b7c570e7768436b40d07c26ab2288d3541150 [file] [log] [blame]
Mahir Gunyel03de0d32020-06-03 01:36:59 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070020 "context"
21 "strconv"
22
23 "github.com/gogo/protobuf/proto"
24 coreutils "github.com/opencord/voltha-go/rw_core/utils"
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070025 "github.com/opencord/voltha-lib-go/v3/pkg/log"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070026 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070027 "github.com/opencord/voltha-protos/v3/go/voltha"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070030)
31
32// listDeviceGroups returns logical device flow groups
33func (agent *Agent) listDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
Kent Hagermanfa9d6d42020-05-25 11:49:40 -040034 groupIDs := agent.groupLoader.ListIDs()
Mahir Gunyel03de0d32020-06-03 01:36:59 -070035 groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
36 for groupID := range groupIDs {
37 if groupHandle, have := agent.groupLoader.Lock(groupID); have {
38 groups[groupID] = groupHandle.GetReadOnly()
39 groupHandle.Unlock()
40 }
41 }
42 return groups
43}
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070044
45func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000046 logger.Debugw(ctx, "add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070047
48 if (len(newGroups)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000049 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070050 return coreutils.DoneResponse(), nil
51 }
52
Kent Hagermanf6db9f12020-07-22 17:16:19 -040053 device := agent.getDeviceReadOnly()
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070054 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
55 if err != nil {
56 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
57 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070058
59 groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
60 groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
61 for _, group := range newGroups {
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070062 groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
63 if err != nil {
64 return coreutils.DoneResponse(), err
65 }
66
67 if created {
68 groupsToAdd = append(groupsToAdd, group)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070069 } else {
70 groupToChange := groupHandle.GetReadOnly()
71 if !proto.Equal(groupToChange, group) {
72 //Group needs to be updated.
73 if err := groupHandle.Update(ctx, group); err != nil {
74 groupHandle.Unlock()
75 return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
76 }
77 groupsToDelete = append(groupsToDelete, groupToChange)
78 groupsToAdd = append(groupsToAdd, group)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070079 } else {
80 //No need to change the group. It is already exist.
Rohan Agrawal31f21802020-06-12 05:38:46 +000081 logger.Debugw(ctx, "No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070082 }
83 }
84
85 groupHandle.Unlock()
86 }
87 // Sanity check
88 if (len(groupsToAdd)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +000089 logger.Debugw(ctx, "no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070090 return coreutils.DoneResponse(), nil
91 }
92
93 // Send update to adapters
94 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
95 response := coreutils.NewResponse()
96 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -040097 updatedAllGroups := agent.listDeviceGroups()
98 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -070099 if err != nil {
100 cancel()
101 return coreutils.DoneResponse(), err
102 }
103 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
104 } else {
105 flowChanges := &ofp.FlowChanges{
106 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
107 ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
108 }
109 groupChanges := &ofp.FlowGroupChanges{
110 ToAdd: &voltha.FlowGroups{Items: groupsToAdd},
111 ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
112 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
113 }
114 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
115 if err != nil {
116 cancel()
117 return coreutils.DoneResponse(), err
118 }
119 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
120 }
121 return response, nil
122}
123
124func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000125 logger.Debugw(ctx, "delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700126
127 if (len(groupsToDel)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000128 logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700129 return coreutils.DoneResponse(), nil
130 }
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400131 device := agent.getDeviceReadOnly()
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700132 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
133 if err != nil {
134 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
135 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700136
137 for _, group := range groupsToDel {
138 if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
139 // Update the store and cache
140 if err := groupHandle.Delete(ctx); err != nil {
141 groupHandle.Unlock()
142 return coreutils.DoneResponse(), err
143 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700144 groupHandle.Unlock()
145 }
146 }
147
148 // Send update to adapters
149 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
150 response := coreutils.NewResponse()
151 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400152 updatedAllGroups := agent.listDeviceGroups()
153 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700154 if err != nil {
155 cancel()
156 return coreutils.DoneResponse(), err
157 }
158 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
159 } else {
160 flowChanges := &ofp.FlowChanges{
161 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
162 ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
163 }
164 groupChanges := &ofp.FlowGroupChanges{
165 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
166 ToRemove: &voltha.FlowGroups{Items: groupsToDel},
167 ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
168 }
169 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
170 if err != nil {
171 cancel()
172 return coreutils.DoneResponse(), err
173 }
174 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
175 }
176 return response, nil
177}
178
179func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000180 logger.Debugw(ctx, "updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700181
182 if (len(updatedGroups)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000183 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700184 return coreutils.DoneResponse(), nil
185 }
186
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400187 device := agent.getDeviceReadOnly()
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700188 if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
189 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
190 }
191 dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
192 if err != nil {
193 return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
194 }
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700195
Kent Hagermana7c9d792020-07-16 17:39:01 -0400196 groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700197 for _, group := range updatedGroups {
198 if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
199 // Update the store and cache
200 if err := groupHandle.Update(ctx, group); err != nil {
201 groupHandle.Unlock()
202 return coreutils.DoneResponse(), err
203 }
204 groupsToUpdate = append(groupsToUpdate, group)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700205 groupHandle.Unlock()
206 }
207 }
208
209 subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
210 response := coreutils.NewResponse()
211 // Process bulk flow update differently than incremental update
212 if !dType.AcceptsAddRemoveFlowUpdates {
Kent Hagermana7c9d792020-07-16 17:39:01 -0400213 updatedAllGroups := agent.listDeviceGroups()
214 rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, nil)
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700215 if err != nil {
216 cancel()
217 return coreutils.DoneResponse(), err
218 }
219 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
220 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000221 logger.Debugw(ctx, "updating-groups",
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700222 log.Fields{
223 "device-id": agent.deviceID,
224 "groups-to-update": groupsToUpdate,
225 })
226
227 // Sanity check
228 if (len(groupsToUpdate)) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000229 logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
Mahir Gunyelfa6ea272020-06-10 17:03:51 -0700230 cancel()
231 return coreutils.DoneResponse(), nil
232 }
233
234 flowChanges := &ofp.FlowChanges{
235 ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
236 ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
237 }
238 groupChanges := &ofp.FlowGroupChanges{
239 ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
240 ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
241 ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
242 }
243 rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
244 if err != nil {
245 cancel()
246 return coreutils.DoneResponse(), err
247 }
248 go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
249 }
250
251 return response, nil
252}