/*
 * Copyright 2020-present Open Networking Foundation
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
13
14//Package core provides the utility for olt devices, flows, groups and statistics
15package core
16
17import (
18 "context"
Girish Gowdraa09aeab2020-09-14 16:30:52 -070019 "github.com/opencord/voltha-lib-go/v4/pkg/flows"
20 "github.com/opencord/voltha-lib-go/v4/pkg/log"
Girish Gowdra9602eb42020-09-09 15:50:39 -070021 "github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
22 rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
Girish Gowdraa09aeab2020-09-14 16:30:52 -070023 ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
24 openoltpb2 "github.com/opencord/voltha-protos/v4/go/openolt"
Girish Gowdra9602eb42020-09-09 15:50:39 -070025 "google.golang.org/grpc/codes"
26 "google.golang.org/grpc/status"
27 "sync"
28)
29
// QueueInfoBrief is the compact description of a multicast queue: the GEM port
// it uses and the service priority it is scheduled with.
type QueueInfoBrief struct {
	gemPortID, servicePriority uint32
}
35
36//OpenOltGroupMgr creates the Structure of OpenOltGroupMgr obj
37type OpenOltGroupMgr struct {
38 deviceHandler *DeviceHandler
39 resourceMgr *rsrcMgr.OpenOltResourceMgr
40 interfaceToMcastQueueMap map[uint32]*QueueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
41 interfaceToMcastQueueMapLock sync.RWMutex
42}
43
//////////////////////////////////////////////
//            EXPORTED FUNCTIONS            //
//////////////////////////////////////////////
47
48//NewGroupManager creates OpenOltGroupMgr object and initializes the parameters
49func NewGroupManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltGroupMgr {
50 logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
51 var grpMgr OpenOltGroupMgr
52 grpMgr.deviceHandler = dh
53 grpMgr.resourceMgr = rMgr
54 grpMgr.interfaceToMcastQueueMap = make(map[uint32]*QueueInfoBrief)
55 grpMgr.interfaceToMcastQueueMapLock = sync.RWMutex{}
56 logger.Info(ctx, "initialization-of-group-manager-success")
57 return &grpMgr
58}
59
60// AddGroup add or update the group
61func (g *OpenOltGroupMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
62 logger.Infow(ctx, "add-group", log.Fields{"group": group})
63 if group == nil {
64 return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
65 }
66 groupToOlt := openoltpb2.Group{
67 GroupId: group.Desc.GroupId,
68 Command: openoltpb2.Group_SET_MEMBERS,
69 Action: g.buildGroupAction(),
70 }
71 logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
72 _, err := g.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
73 if err != nil {
74 return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
75 }
76 // group members not created yet. So let's store the group
77 if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
Girish Gowdraa09aeab2020-09-14 16:30:52 -070078 return olterrors.NewErrPersistence("add", "flow-group", uint64(group.Desc.GroupId), log.Fields{"group": group}, err)
Girish Gowdra9602eb42020-09-09 15:50:39 -070079 }
80 logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
81 return nil
82}
83
84// DeleteGroup deletes a group from the device
85func (g *OpenOltGroupMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
86 logger.Debugw(ctx, "delete-group", log.Fields{"group": group})
87 if group == nil {
88 logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil")
89 return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
90 }
91 groupToOlt := openoltpb2.Group{
92 GroupId: group.Desc.GroupId,
93 }
94 logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
95 _, err := g.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt)
96 if err != nil {
97 logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
98 return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
99 }
100 //remove group from the store
101 if err := g.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil {
Girish Gowdraa09aeab2020-09-14 16:30:52 -0700102 return olterrors.NewErrPersistence("delete", "flow-group", uint64(group.Desc.GroupId), log.Fields{"group": group}, err)
Girish Gowdra9602eb42020-09-09 15:50:39 -0700103 }
104 logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
105 return nil
106}
107
108// ModifyGroup updates the group
109func (g *OpenOltGroupMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
110 logger.Infow(ctx, "modify-group", log.Fields{"group": group})
111 if group == nil || group.Desc == nil {
112 return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
113 }
114 newGroup := g.buildGroup(ctx, group.Desc.GroupId, group.Desc.Buckets)
115 //get existing members of the group
116 val, groupExists, err := g.getFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
117 if err != nil {
118 return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": group.Desc.GroupId}, err)
119 }
120 var current *openoltpb2.Group // represents the group on the device
121 if groupExists {
122 // group already exists
123 current = g.buildGroup(ctx, group.Desc.GroupId, val.Desc.GetBuckets())
124 logger.Debugw(ctx, "modify-group--group exists",
125 log.Fields{
126 "group on the device": val,
127 "new": group})
128 } else {
129 current = g.buildGroup(ctx, group.Desc.GroupId, nil)
130 }
131 logger.Debugw(ctx, "modify-group--comparing-current-and-new",
132 log.Fields{
133 "group on the device": current,
134 "new": newGroup})
135 // get members to be added
136 membersToBeAdded := g.findDiff(current, newGroup)
137 // get members to be removed
138 membersToBeRemoved := g.findDiff(newGroup, current)
139 logger.Infow(ctx, "modify-group--differences found", log.Fields{
140 "membersToBeAdded": membersToBeAdded,
141 "membersToBeRemoved": membersToBeRemoved,
142 "groupId": group.Desc.GroupId})
143 groupToOlt := openoltpb2.Group{
144 GroupId: group.Desc.GroupId,
145 }
146 var errAdd, errRemoved error
147 if len(membersToBeAdded) > 0 {
148 groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
149 groupToOlt.Members = membersToBeAdded
150 //execute addMembers
151 errAdd = g.callGroupAddRemove(ctx, &groupToOlt)
152 }
153 if len(membersToBeRemoved) > 0 {
154 groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
155 groupToOlt.Members = membersToBeRemoved
156 //execute removeMembers
157 errRemoved = g.callGroupAddRemove(ctx, &groupToOlt)
158 }
159 //save the modified group
160 if errAdd == nil && errRemoved == nil {
161 if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
Girish Gowdraa09aeab2020-09-14 16:30:52 -0700162 return olterrors.NewErrPersistence("add", "flow-group", uint64(group.Desc.GroupId), log.Fields{"group": group}, err)
Girish Gowdra9602eb42020-09-09 15:50:39 -0700163 }
164 logger.Infow(ctx, "modify-group-was-success--storing-group",
165 log.Fields{
166 "group": group,
167 "existingGroup": current})
168 } else {
169 logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications",
170 log.Fields{"group": group})
171 if errAdd != nil {
172 return errAdd
173 }
174 return errRemoved
175 }
176 return nil
177}
178
179//LoadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
180//and put them into interfaceToMcastQueueMap.
181func (g *OpenOltGroupMgr) LoadInterfaceToMulticastQueueMap(ctx context.Context) {
182 storedMulticastQueueMap, err := g.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
183 if err != nil {
184 logger.Error(ctx, "failed-to-get-pon-interface-to-multicast-queue-map")
185 return
186 }
187 for intf, queueInfo := range storedMulticastQueueMap {
188 q := QueueInfoBrief{
189 gemPortID: queueInfo[0],
190 servicePriority: queueInfo[1],
191 }
192 g.interfaceToMcastQueueMap[intf] = &q
193 }
194}
195
196//GetInterfaceToMcastQueueMap gets the mcast queue mapped to to the PON interface
197func (g *OpenOltGroupMgr) GetInterfaceToMcastQueueMap(intfID uint32) (*QueueInfoBrief, bool) {
198 g.interfaceToMcastQueueMapLock.RLock()
199 defer g.interfaceToMcastQueueMapLock.RUnlock()
200 val, present := g.interfaceToMcastQueueMap[intfID]
201 return val, present
202}
203
204//UpdateInterfaceToMcastQueueMap updates the mcast queue information mapped to a given PON interface
205func (g *OpenOltGroupMgr) UpdateInterfaceToMcastQueueMap(intfID uint32, val *QueueInfoBrief) {
206 g.interfaceToMcastQueueMapLock.Lock()
207 defer g.interfaceToMcastQueueMapLock.Unlock()
208 g.interfaceToMcastQueueMap[intfID] = val
209}
210
////////////////////////////////////////////////
//      INTERNAL or UNEXPORTED FUNCTIONS      //
////////////////////////////////////////////////
214//getFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
215//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
216//Returns (nil, false, nil) if the group does not exists in the KV store.
217func (g *OpenOltGroupMgr) getFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
218 exists, groupInfo, err := g.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
219 if err != nil {
220 return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err)
221 }
222 if exists {
223 return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
224 }
225 return nil, exists, nil
226}
227func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
228 groupDesc := ofp.OfpGroupDesc{
229 Type: ofp.OfpGroupType_OFPGT_ALL,
230 GroupId: groupID,
231 }
232 groupEntry := ofp.OfpGroupEntry{
233 Desc: &groupDesc,
234 }
235 for i := 0; i < len(outPorts); i++ {
236 var acts []*ofp.OfpAction
237 acts = append(acts, flows.Output(outPorts[i]))
238 bucket := ofp.OfpBucket{
239 Actions: acts,
240 }
241 groupDesc.Buckets = append(groupDesc.Buckets, &bucket)
242 }
243 return &groupEntry
244}
245
246//buildGroupAction creates and returns a group action
247func (g *OpenOltGroupMgr) buildGroupAction() *openoltpb2.Action {
248 var actionCmd openoltpb2.ActionCmd
249 var action openoltpb2.Action
250 action.Cmd = &actionCmd
251 //pop outer vlan
252 action.Cmd.RemoveOuterTag = true
253 return &action
254}
255
256//callGroupAddRemove performs add/remove buckets operation for the indicated group
257func (g *OpenOltGroupMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error {
258 if err := g.performGroupOperation(ctx, group); err != nil {
259 st, _ := status.FromError(err)
260 //ignore already exists error code
261 if st.Code() != codes.AlreadyExists {
262 return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
263 }
264 }
265 return nil
266}
267
268//findDiff compares group members and finds members which only exists in groups2
269func (g *OpenOltGroupMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
270 var members []*openoltpb2.GroupMember
271 for _, bucket := range group2.Members {
272 if !g.contains(group1.Members, bucket) {
273 // bucket does not exist and must be added
274 members = append(members, bucket)
275 }
276 }
277 return members
278}
279
280//contains returns true if the members list contains the given member; false otherwise
281func (g *OpenOltGroupMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
282 for _, groupMember := range members {
283 if groupMember.InterfaceId == member.InterfaceId {
284 return true
285 }
286 }
287 return false
288}
289
290//performGroupOperation call performGroupOperation operation of openolt proto
291func (g *OpenOltGroupMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error {
292 logger.Debugw(ctx, "sending-group-to-device",
293 log.Fields{
294 "groupToOlt": group,
295 "command": group.Command})
296 _, err := g.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group)
297 if err != nil {
298 return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
299 }
300 return nil
301}
302
303//buildGroup build openoltpb2.Group from given group id and bucket list
304func (g *OpenOltGroupMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
305 group := openoltpb2.Group{
306 GroupId: groupID}
307 // create members of the group
308 for _, ofBucket := range buckets {
309 member := g.buildMember(ctx, ofBucket)
310 if member != nil && !g.contains(group.Members, member) {
311 group.Members = append(group.Members, member)
312 }
313 }
314 return &group
315}
316
317//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket
318func (g *OpenOltGroupMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
319 var outPort uint32
320 outPortFound := false
321 for _, ofAction := range ofBucket.Actions {
322 if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
323 outPort = ofAction.GetOutput().Port
324 outPortFound = true
325 }
326 }
327 if !outPortFound {
328 logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket})
329 return nil
330 }
331 interfaceID := IntfIDFromUniPortNum(outPort)
332 logger.Debugw(ctx, "got-associated-interface-id-of-the-port",
333 log.Fields{
334 "portNumber:": outPort,
335 "interfaceId:": interfaceID})
336 g.interfaceToMcastQueueMapLock.RLock()
337 defer g.interfaceToMcastQueueMapLock.RUnlock()
338 if groupInfo, ok := g.interfaceToMcastQueueMap[interfaceID]; ok {
339 member := openoltpb2.GroupMember{
340 InterfaceId: interfaceID,
341 InterfaceType: openoltpb2.GroupMember_PON,
342 GemPortId: groupInfo.gemPortID,
343 Priority: groupInfo.servicePriority,
344 }
345 //add member to the group
346 return &member
347 }
348 logger.Warnf(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket})
349 return nil
350}