blob: ff51041163ccde77ea1920b582b4aae89c0372de [file] [log] [blame]
Girish Gowdra9602eb42020-09-09 15:50:39 -07001/*
2 * Copyright 2020-present Open Networking Foundation
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 * http://www.apache.org/licenses/LICENSE-2.0
7 * Unless required by applicable law or agreed to in writing, software
8 * distributed under the License is distributed on an "AS IS" BASIS,
9 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 * See the License for the specific language governing permissions and
11 * limitations under the License.
12 */
13
14//Package core provides the utility for olt devices, flows, groups and statistics
15package core
16
17import (
18 "context"
khenaidoo106c61a2021-08-11 18:05:46 -040019 "sync"
20
21 "github.com/opencord/voltha-lib-go/v7/pkg/flows"
22 "github.com/opencord/voltha-lib-go/v7/pkg/log"
Girish Gowdra9602eb42020-09-09 15:50:39 -070023 "github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
24 rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
khenaidoo106c61a2021-08-11 18:05:46 -040025 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
26 openoltpb2 "github.com/opencord/voltha-protos/v5/go/openolt"
Girish Gowdra9602eb42020-09-09 15:50:39 -070027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
Girish Gowdra9602eb42020-09-09 15:50:39 -070029)
30
31//QueueInfoBrief has information about gemPortID and service priority associated with Mcast group
32type QueueInfoBrief struct {
33 gemPortID uint32
34 servicePriority uint32
35}
36
37//OpenOltGroupMgr creates the Structure of OpenOltGroupMgr obj
38type OpenOltGroupMgr struct {
39 deviceHandler *DeviceHandler
40 resourceMgr *rsrcMgr.OpenOltResourceMgr
41 interfaceToMcastQueueMap map[uint32]*QueueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
42 interfaceToMcastQueueMapLock sync.RWMutex
43}
44
45//////////////////////////////////////////////
46// EXPORTED FUNCTIONS //
47//////////////////////////////////////////////
48
49//NewGroupManager creates OpenOltGroupMgr object and initializes the parameters
50func NewGroupManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltGroupMgr {
Girish Gowdra4736e5c2021-08-25 15:19:10 -070051 logger.Infow(ctx, "initializing-group-manager", log.Fields{"device-id": dh.device.Id})
Girish Gowdra9602eb42020-09-09 15:50:39 -070052 var grpMgr OpenOltGroupMgr
53 grpMgr.deviceHandler = dh
54 grpMgr.resourceMgr = rMgr
55 grpMgr.interfaceToMcastQueueMap = make(map[uint32]*QueueInfoBrief)
56 grpMgr.interfaceToMcastQueueMapLock = sync.RWMutex{}
57 logger.Info(ctx, "initialization-of-group-manager-success")
58 return &grpMgr
59}
60
61// AddGroup add or update the group
62func (g *OpenOltGroupMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
63 logger.Infow(ctx, "add-group", log.Fields{"group": group})
64 if group == nil {
65 return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
66 }
67 groupToOlt := openoltpb2.Group{
68 GroupId: group.Desc.GroupId,
69 Command: openoltpb2.Group_SET_MEMBERS,
70 Action: g.buildGroupAction(),
71 }
72 logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
73 _, err := g.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
74 if err != nil {
75 return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
76 }
77 // group members not created yet. So let's store the group
78 if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
Girish Gowdraa09aeab2020-09-14 16:30:52 -070079 return olterrors.NewErrPersistence("add", "flow-group", uint64(group.Desc.GroupId), log.Fields{"group": group}, err)
Girish Gowdra9602eb42020-09-09 15:50:39 -070080 }
81 logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
82 return nil
83}
84
85// DeleteGroup deletes a group from the device
86func (g *OpenOltGroupMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
87 logger.Debugw(ctx, "delete-group", log.Fields{"group": group})
88 if group == nil {
89 logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil")
90 return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
91 }
92 groupToOlt := openoltpb2.Group{
93 GroupId: group.Desc.GroupId,
94 }
95 logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
96 _, err := g.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt)
97 if err != nil {
98 logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
99 return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
100 }
101 //remove group from the store
102 if err := g.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil {
Girish Gowdraa09aeab2020-09-14 16:30:52 -0700103 return olterrors.NewErrPersistence("delete", "flow-group", uint64(group.Desc.GroupId), log.Fields{"group": group}, err)
Girish Gowdra9602eb42020-09-09 15:50:39 -0700104 }
105 logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
106 return nil
107}
108
109// ModifyGroup updates the group
110func (g *OpenOltGroupMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
111 logger.Infow(ctx, "modify-group", log.Fields{"group": group})
112 if group == nil || group.Desc == nil {
113 return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
114 }
115 newGroup := g.buildGroup(ctx, group.Desc.GroupId, group.Desc.Buckets)
116 //get existing members of the group
117 val, groupExists, err := g.getFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
118 if err != nil {
119 return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": group.Desc.GroupId}, err)
120 }
121 var current *openoltpb2.Group // represents the group on the device
122 if groupExists {
123 // group already exists
124 current = g.buildGroup(ctx, group.Desc.GroupId, val.Desc.GetBuckets())
125 logger.Debugw(ctx, "modify-group--group exists",
126 log.Fields{
127 "group on the device": val,
128 "new": group})
129 } else {
130 current = g.buildGroup(ctx, group.Desc.GroupId, nil)
131 }
132 logger.Debugw(ctx, "modify-group--comparing-current-and-new",
133 log.Fields{
134 "group on the device": current,
135 "new": newGroup})
136 // get members to be added
137 membersToBeAdded := g.findDiff(current, newGroup)
138 // get members to be removed
139 membersToBeRemoved := g.findDiff(newGroup, current)
140 logger.Infow(ctx, "modify-group--differences found", log.Fields{
141 "membersToBeAdded": membersToBeAdded,
142 "membersToBeRemoved": membersToBeRemoved,
143 "groupId": group.Desc.GroupId})
144 groupToOlt := openoltpb2.Group{
145 GroupId: group.Desc.GroupId,
146 }
147 var errAdd, errRemoved error
148 if len(membersToBeAdded) > 0 {
149 groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
150 groupToOlt.Members = membersToBeAdded
151 //execute addMembers
152 errAdd = g.callGroupAddRemove(ctx, &groupToOlt)
153 }
154 if len(membersToBeRemoved) > 0 {
155 groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
156 groupToOlt.Members = membersToBeRemoved
157 //execute removeMembers
158 errRemoved = g.callGroupAddRemove(ctx, &groupToOlt)
159 }
160 //save the modified group
161 if errAdd == nil && errRemoved == nil {
162 if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
Girish Gowdraa09aeab2020-09-14 16:30:52 -0700163 return olterrors.NewErrPersistence("add", "flow-group", uint64(group.Desc.GroupId), log.Fields{"group": group}, err)
Girish Gowdra9602eb42020-09-09 15:50:39 -0700164 }
165 logger.Infow(ctx, "modify-group-was-success--storing-group",
166 log.Fields{
167 "group": group,
168 "existingGroup": current})
169 } else {
170 logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications",
171 log.Fields{"group": group})
172 if errAdd != nil {
173 return errAdd
174 }
175 return errRemoved
176 }
177 return nil
178}
179
180//LoadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
181//and put them into interfaceToMcastQueueMap.
182func (g *OpenOltGroupMgr) LoadInterfaceToMulticastQueueMap(ctx context.Context) {
183 storedMulticastQueueMap, err := g.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
184 if err != nil {
185 logger.Error(ctx, "failed-to-get-pon-interface-to-multicast-queue-map")
186 return
187 }
188 for intf, queueInfo := range storedMulticastQueueMap {
189 q := QueueInfoBrief{
190 gemPortID: queueInfo[0],
191 servicePriority: queueInfo[1],
192 }
193 g.interfaceToMcastQueueMap[intf] = &q
194 }
195}
196
197//GetInterfaceToMcastQueueMap gets the mcast queue mapped to to the PON interface
198func (g *OpenOltGroupMgr) GetInterfaceToMcastQueueMap(intfID uint32) (*QueueInfoBrief, bool) {
199 g.interfaceToMcastQueueMapLock.RLock()
200 defer g.interfaceToMcastQueueMapLock.RUnlock()
201 val, present := g.interfaceToMcastQueueMap[intfID]
202 return val, present
203}
204
205//UpdateInterfaceToMcastQueueMap updates the mcast queue information mapped to a given PON interface
206func (g *OpenOltGroupMgr) UpdateInterfaceToMcastQueueMap(intfID uint32, val *QueueInfoBrief) {
207 g.interfaceToMcastQueueMapLock.Lock()
208 defer g.interfaceToMcastQueueMapLock.Unlock()
209 g.interfaceToMcastQueueMap[intfID] = val
210}
211
212////////////////////////////////////////////////
213// INTERNAL or UNEXPORTED FUNCTIONS //
214////////////////////////////////////////////////
215//getFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
216//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
217//Returns (nil, false, nil) if the group does not exists in the KV store.
218func (g *OpenOltGroupMgr) getFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
219 exists, groupInfo, err := g.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
220 if err != nil {
221 return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err)
222 }
223 if exists {
224 return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
225 }
226 return nil, exists, nil
227}
228func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
229 groupDesc := ofp.OfpGroupDesc{
230 Type: ofp.OfpGroupType_OFPGT_ALL,
231 GroupId: groupID,
232 }
233 groupEntry := ofp.OfpGroupEntry{
234 Desc: &groupDesc,
235 }
236 for i := 0; i < len(outPorts); i++ {
237 var acts []*ofp.OfpAction
238 acts = append(acts, flows.Output(outPorts[i]))
239 bucket := ofp.OfpBucket{
240 Actions: acts,
241 }
242 groupDesc.Buckets = append(groupDesc.Buckets, &bucket)
243 }
244 return &groupEntry
245}
246
247//buildGroupAction creates and returns a group action
248func (g *OpenOltGroupMgr) buildGroupAction() *openoltpb2.Action {
249 var actionCmd openoltpb2.ActionCmd
250 var action openoltpb2.Action
251 action.Cmd = &actionCmd
252 //pop outer vlan
253 action.Cmd.RemoveOuterTag = true
254 return &action
255}
256
257//callGroupAddRemove performs add/remove buckets operation for the indicated group
258func (g *OpenOltGroupMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error {
259 if err := g.performGroupOperation(ctx, group); err != nil {
260 st, _ := status.FromError(err)
261 //ignore already exists error code
262 if st.Code() != codes.AlreadyExists {
263 return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
264 }
265 }
266 return nil
267}
268
269//findDiff compares group members and finds members which only exists in groups2
270func (g *OpenOltGroupMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
271 var members []*openoltpb2.GroupMember
272 for _, bucket := range group2.Members {
273 if !g.contains(group1.Members, bucket) {
274 // bucket does not exist and must be added
275 members = append(members, bucket)
276 }
277 }
278 return members
279}
280
281//contains returns true if the members list contains the given member; false otherwise
282func (g *OpenOltGroupMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
283 for _, groupMember := range members {
284 if groupMember.InterfaceId == member.InterfaceId {
285 return true
286 }
287 }
288 return false
289}
290
291//performGroupOperation call performGroupOperation operation of openolt proto
292func (g *OpenOltGroupMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error {
293 logger.Debugw(ctx, "sending-group-to-device",
294 log.Fields{
295 "groupToOlt": group,
296 "command": group.Command})
297 _, err := g.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group)
298 if err != nil {
299 return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
300 }
301 return nil
302}
303
304//buildGroup build openoltpb2.Group from given group id and bucket list
305func (g *OpenOltGroupMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
306 group := openoltpb2.Group{
307 GroupId: groupID}
308 // create members of the group
309 for _, ofBucket := range buckets {
310 member := g.buildMember(ctx, ofBucket)
311 if member != nil && !g.contains(group.Members, member) {
312 group.Members = append(group.Members, member)
313 }
314 }
315 return &group
316}
317
318//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket
319func (g *OpenOltGroupMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
320 var outPort uint32
321 outPortFound := false
322 for _, ofAction := range ofBucket.Actions {
323 if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
324 outPort = ofAction.GetOutput().Port
325 outPortFound = true
326 }
327 }
328 if !outPortFound {
329 logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket})
330 return nil
331 }
332 interfaceID := IntfIDFromUniPortNum(outPort)
333 logger.Debugw(ctx, "got-associated-interface-id-of-the-port",
334 log.Fields{
335 "portNumber:": outPort,
336 "interfaceId:": interfaceID})
337 g.interfaceToMcastQueueMapLock.RLock()
338 defer g.interfaceToMcastQueueMapLock.RUnlock()
339 if groupInfo, ok := g.interfaceToMcastQueueMap[interfaceID]; ok {
340 member := openoltpb2.GroupMember{
341 InterfaceId: interfaceID,
342 InterfaceType: openoltpb2.GroupMember_PON,
343 GemPortId: groupInfo.gemPortID,
344 Priority: groupInfo.servicePriority,
345 }
346 //add member to the group
347 return &member
348 }
349 logger.Warnf(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket})
350 return nil
351}