Girish Gowdra | 9602eb4 | 2020-09-09 15:50:39 -0700 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2020-present Open Networking Foundation |
| 3 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | * you may not use this file except in compliance with the License. |
| 5 | * You may obtain a copy of the License at |
| 6 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 7 | * Unless required by applicable law or agreed to in writing, software |
| 8 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 9 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 10 | * See the License for the specific language governing permissions and |
| 11 | * limitations under the License. |
| 12 | */ |
| 13 | |
| 14 | //Package core provides the utility for olt devices, flows, groups and statistics |
| 15 | package core |
| 16 | |
| 17 | import ( |
| 18 | "context" |
khenaidoo | 106c61a | 2021-08-11 18:05:46 -0400 | [diff] [blame] | 19 | "sync" |
| 20 | |
| 21 | "github.com/opencord/voltha-lib-go/v7/pkg/flows" |
| 22 | "github.com/opencord/voltha-lib-go/v7/pkg/log" |
Mahir Gunyel | 85f61c1 | 2021-10-06 11:53:45 -0700 | [diff] [blame] | 23 | plt "github.com/opencord/voltha-lib-go/v7/pkg/platform" |
Girish Gowdra | 9602eb4 | 2020-09-09 15:50:39 -0700 | [diff] [blame] | 24 | "github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors" |
| 25 | rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager" |
khenaidoo | 106c61a | 2021-08-11 18:05:46 -0400 | [diff] [blame] | 26 | ofp "github.com/opencord/voltha-protos/v5/go/openflow_13" |
| 27 | openoltpb2 "github.com/opencord/voltha-protos/v5/go/openolt" |
Girish Gowdra | 9602eb4 | 2020-09-09 15:50:39 -0700 | [diff] [blame] | 28 | "google.golang.org/grpc/codes" |
| 29 | "google.golang.org/grpc/status" |
Girish Gowdra | 9602eb4 | 2020-09-09 15:50:39 -0700 | [diff] [blame] | 30 | ) |
| 31 | |
| 32 | //QueueInfoBrief has information about gemPortID and service priority associated with Mcast group |
| 33 | type QueueInfoBrief struct { |
| 34 | gemPortID uint32 |
| 35 | servicePriority uint32 |
| 36 | } |
| 37 | |
| 38 | //OpenOltGroupMgr creates the Structure of OpenOltGroupMgr obj |
| 39 | type OpenOltGroupMgr struct { |
| 40 | deviceHandler *DeviceHandler |
| 41 | resourceMgr *rsrcMgr.OpenOltResourceMgr |
| 42 | interfaceToMcastQueueMap map[uint32]*QueueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/ |
| 43 | interfaceToMcastQueueMapLock sync.RWMutex |
| 44 | } |
| 45 | |
| 46 | ////////////////////////////////////////////// |
| 47 | // EXPORTED FUNCTIONS // |
| 48 | ////////////////////////////////////////////// |
| 49 | |
| 50 | //NewGroupManager creates OpenOltGroupMgr object and initializes the parameters |
| 51 | func NewGroupManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltGroupMgr { |
Girish Gowdra | 4736e5c | 2021-08-25 15:19:10 -0700 | [diff] [blame] | 52 | logger.Infow(ctx, "initializing-group-manager", log.Fields{"device-id": dh.device.Id}) |
Girish Gowdra | 9602eb4 | 2020-09-09 15:50:39 -0700 | [diff] [blame] | 53 | var grpMgr OpenOltGroupMgr |
| 54 | grpMgr.deviceHandler = dh |
| 55 | grpMgr.resourceMgr = rMgr |
| 56 | grpMgr.interfaceToMcastQueueMap = make(map[uint32]*QueueInfoBrief) |
| 57 | grpMgr.interfaceToMcastQueueMapLock = sync.RWMutex{} |
| 58 | logger.Info(ctx, "initialization-of-group-manager-success") |
| 59 | return &grpMgr |
| 60 | } |
| 61 | |
| 62 | // AddGroup add or update the group |
| 63 | func (g *OpenOltGroupMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error { |
| 64 | logger.Infow(ctx, "add-group", log.Fields{"group": group}) |
| 65 | if group == nil { |
| 66 | return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil) |
| 67 | } |
| 68 | groupToOlt := openoltpb2.Group{ |
| 69 | GroupId: group.Desc.GroupId, |
| 70 | Command: openoltpb2.Group_SET_MEMBERS, |
| 71 | Action: g.buildGroupAction(), |
| 72 | } |
| 73 | logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt}) |
| 74 | _, err := g.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt) |
| 75 | if err != nil { |
| 76 | return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err) |
| 77 | } |
| 78 | // group members not created yet. So let's store the group |
| 79 | if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil { |
Girish Gowdra | a09aeab | 2020-09-14 16:30:52 -0700 | [diff] [blame] | 80 | return olterrors.NewErrPersistence("add", "flow-group", uint64(group.Desc.GroupId), log.Fields{"group": group}, err) |
Girish Gowdra | 9602eb4 | 2020-09-09 15:50:39 -0700 | [diff] [blame] | 81 | } |
| 82 | logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt}) |
| 83 | return nil |
| 84 | } |
| 85 | |
| 86 | // DeleteGroup deletes a group from the device |
| 87 | func (g *OpenOltGroupMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error { |
| 88 | logger.Debugw(ctx, "delete-group", log.Fields{"group": group}) |
| 89 | if group == nil { |
| 90 | logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil") |
| 91 | return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil) |
| 92 | } |
| 93 | groupToOlt := openoltpb2.Group{ |
| 94 | GroupId: group.Desc.GroupId, |
| 95 | } |
| 96 | logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt}) |
| 97 | _, err := g.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt) |
| 98 | if err != nil { |
| 99 | logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err}) |
| 100 | return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err) |
| 101 | } |
| 102 | //remove group from the store |
| 103 | if err := g.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil { |
Girish Gowdra | a09aeab | 2020-09-14 16:30:52 -0700 | [diff] [blame] | 104 | return olterrors.NewErrPersistence("delete", "flow-group", uint64(group.Desc.GroupId), log.Fields{"group": group}, err) |
Girish Gowdra | 9602eb4 | 2020-09-09 15:50:39 -0700 | [diff] [blame] | 105 | } |
| 106 | logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt}) |
| 107 | return nil |
| 108 | } |
| 109 | |
| 110 | // ModifyGroup updates the group |
| 111 | func (g *OpenOltGroupMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error { |
| 112 | logger.Infow(ctx, "modify-group", log.Fields{"group": group}) |
| 113 | if group == nil || group.Desc == nil { |
| 114 | return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil) |
| 115 | } |
| 116 | newGroup := g.buildGroup(ctx, group.Desc.GroupId, group.Desc.Buckets) |
| 117 | //get existing members of the group |
| 118 | val, groupExists, err := g.getFlowGroupFromKVStore(ctx, group.Desc.GroupId, false) |
| 119 | if err != nil { |
| 120 | return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": group.Desc.GroupId}, err) |
| 121 | } |
| 122 | var current *openoltpb2.Group // represents the group on the device |
| 123 | if groupExists { |
| 124 | // group already exists |
| 125 | current = g.buildGroup(ctx, group.Desc.GroupId, val.Desc.GetBuckets()) |
| 126 | logger.Debugw(ctx, "modify-group--group exists", |
| 127 | log.Fields{ |
| 128 | "group on the device": val, |
| 129 | "new": group}) |
| 130 | } else { |
| 131 | current = g.buildGroup(ctx, group.Desc.GroupId, nil) |
| 132 | } |
| 133 | logger.Debugw(ctx, "modify-group--comparing-current-and-new", |
| 134 | log.Fields{ |
| 135 | "group on the device": current, |
| 136 | "new": newGroup}) |
| 137 | // get members to be added |
| 138 | membersToBeAdded := g.findDiff(current, newGroup) |
| 139 | // get members to be removed |
| 140 | membersToBeRemoved := g.findDiff(newGroup, current) |
| 141 | logger.Infow(ctx, "modify-group--differences found", log.Fields{ |
| 142 | "membersToBeAdded": membersToBeAdded, |
| 143 | "membersToBeRemoved": membersToBeRemoved, |
| 144 | "groupId": group.Desc.GroupId}) |
| 145 | groupToOlt := openoltpb2.Group{ |
| 146 | GroupId: group.Desc.GroupId, |
| 147 | } |
| 148 | var errAdd, errRemoved error |
| 149 | if len(membersToBeAdded) > 0 { |
| 150 | groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS |
| 151 | groupToOlt.Members = membersToBeAdded |
| 152 | //execute addMembers |
| 153 | errAdd = g.callGroupAddRemove(ctx, &groupToOlt) |
| 154 | } |
| 155 | if len(membersToBeRemoved) > 0 { |
| 156 | groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS |
| 157 | groupToOlt.Members = membersToBeRemoved |
| 158 | //execute removeMembers |
| 159 | errRemoved = g.callGroupAddRemove(ctx, &groupToOlt) |
| 160 | } |
| 161 | //save the modified group |
| 162 | if errAdd == nil && errRemoved == nil { |
| 163 | if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil { |
Girish Gowdra | a09aeab | 2020-09-14 16:30:52 -0700 | [diff] [blame] | 164 | return olterrors.NewErrPersistence("add", "flow-group", uint64(group.Desc.GroupId), log.Fields{"group": group}, err) |
Girish Gowdra | 9602eb4 | 2020-09-09 15:50:39 -0700 | [diff] [blame] | 165 | } |
| 166 | logger.Infow(ctx, "modify-group-was-success--storing-group", |
| 167 | log.Fields{ |
| 168 | "group": group, |
| 169 | "existingGroup": current}) |
| 170 | } else { |
| 171 | logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications", |
| 172 | log.Fields{"group": group}) |
| 173 | if errAdd != nil { |
| 174 | return errAdd |
| 175 | } |
| 176 | return errRemoved |
| 177 | } |
| 178 | return nil |
| 179 | } |
| 180 | |
| 181 | //LoadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store |
| 182 | //and put them into interfaceToMcastQueueMap. |
| 183 | func (g *OpenOltGroupMgr) LoadInterfaceToMulticastQueueMap(ctx context.Context) { |
| 184 | storedMulticastQueueMap, err := g.resourceMgr.GetMcastQueuePerInterfaceMap(ctx) |
| 185 | if err != nil { |
| 186 | logger.Error(ctx, "failed-to-get-pon-interface-to-multicast-queue-map") |
| 187 | return |
| 188 | } |
| 189 | for intf, queueInfo := range storedMulticastQueueMap { |
| 190 | q := QueueInfoBrief{ |
| 191 | gemPortID: queueInfo[0], |
| 192 | servicePriority: queueInfo[1], |
| 193 | } |
| 194 | g.interfaceToMcastQueueMap[intf] = &q |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | //GetInterfaceToMcastQueueMap gets the mcast queue mapped to to the PON interface |
| 199 | func (g *OpenOltGroupMgr) GetInterfaceToMcastQueueMap(intfID uint32) (*QueueInfoBrief, bool) { |
| 200 | g.interfaceToMcastQueueMapLock.RLock() |
| 201 | defer g.interfaceToMcastQueueMapLock.RUnlock() |
| 202 | val, present := g.interfaceToMcastQueueMap[intfID] |
| 203 | return val, present |
| 204 | } |
| 205 | |
| 206 | //UpdateInterfaceToMcastQueueMap updates the mcast queue information mapped to a given PON interface |
| 207 | func (g *OpenOltGroupMgr) UpdateInterfaceToMcastQueueMap(intfID uint32, val *QueueInfoBrief) { |
| 208 | g.interfaceToMcastQueueMapLock.Lock() |
| 209 | defer g.interfaceToMcastQueueMapLock.Unlock() |
| 210 | g.interfaceToMcastQueueMap[intfID] = val |
| 211 | } |
| 212 | |
| 213 | //////////////////////////////////////////////// |
| 214 | // INTERNAL or UNEXPORTED FUNCTIONS // |
| 215 | //////////////////////////////////////////////// |
| 216 | //getFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during |
| 217 | //fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully. |
| 218 | //Returns (nil, false, nil) if the group does not exists in the KV store. |
| 219 | func (g *OpenOltGroupMgr) getFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) { |
| 220 | exists, groupInfo, err := g.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached) |
| 221 | if err != nil { |
| 222 | return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err) |
| 223 | } |
| 224 | if exists { |
| 225 | return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil |
| 226 | } |
| 227 | return nil, exists, nil |
| 228 | } |
| 229 | func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry { |
| 230 | groupDesc := ofp.OfpGroupDesc{ |
| 231 | Type: ofp.OfpGroupType_OFPGT_ALL, |
| 232 | GroupId: groupID, |
| 233 | } |
| 234 | groupEntry := ofp.OfpGroupEntry{ |
| 235 | Desc: &groupDesc, |
| 236 | } |
| 237 | for i := 0; i < len(outPorts); i++ { |
| 238 | var acts []*ofp.OfpAction |
| 239 | acts = append(acts, flows.Output(outPorts[i])) |
| 240 | bucket := ofp.OfpBucket{ |
| 241 | Actions: acts, |
| 242 | } |
| 243 | groupDesc.Buckets = append(groupDesc.Buckets, &bucket) |
| 244 | } |
| 245 | return &groupEntry |
| 246 | } |
| 247 | |
| 248 | //buildGroupAction creates and returns a group action |
| 249 | func (g *OpenOltGroupMgr) buildGroupAction() *openoltpb2.Action { |
| 250 | var actionCmd openoltpb2.ActionCmd |
| 251 | var action openoltpb2.Action |
| 252 | action.Cmd = &actionCmd |
| 253 | //pop outer vlan |
| 254 | action.Cmd.RemoveOuterTag = true |
| 255 | return &action |
| 256 | } |
| 257 | |
| 258 | //callGroupAddRemove performs add/remove buckets operation for the indicated group |
| 259 | func (g *OpenOltGroupMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error { |
| 260 | if err := g.performGroupOperation(ctx, group); err != nil { |
| 261 | st, _ := status.FromError(err) |
| 262 | //ignore already exists error code |
| 263 | if st.Code() != codes.AlreadyExists { |
| 264 | return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err) |
| 265 | } |
| 266 | } |
| 267 | return nil |
| 268 | } |
| 269 | |
| 270 | //findDiff compares group members and finds members which only exists in groups2 |
| 271 | func (g *OpenOltGroupMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember { |
| 272 | var members []*openoltpb2.GroupMember |
| 273 | for _, bucket := range group2.Members { |
| 274 | if !g.contains(group1.Members, bucket) { |
| 275 | // bucket does not exist and must be added |
| 276 | members = append(members, bucket) |
| 277 | } |
| 278 | } |
| 279 | return members |
| 280 | } |
| 281 | |
| 282 | //contains returns true if the members list contains the given member; false otherwise |
| 283 | func (g *OpenOltGroupMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool { |
| 284 | for _, groupMember := range members { |
| 285 | if groupMember.InterfaceId == member.InterfaceId { |
| 286 | return true |
| 287 | } |
| 288 | } |
| 289 | return false |
| 290 | } |
| 291 | |
| 292 | //performGroupOperation call performGroupOperation operation of openolt proto |
| 293 | func (g *OpenOltGroupMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error { |
| 294 | logger.Debugw(ctx, "sending-group-to-device", |
| 295 | log.Fields{ |
| 296 | "groupToOlt": group, |
| 297 | "command": group.Command}) |
| 298 | _, err := g.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group) |
| 299 | if err != nil { |
| 300 | return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err) |
| 301 | } |
| 302 | return nil |
| 303 | } |
| 304 | |
| 305 | //buildGroup build openoltpb2.Group from given group id and bucket list |
| 306 | func (g *OpenOltGroupMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group { |
| 307 | group := openoltpb2.Group{ |
| 308 | GroupId: groupID} |
| 309 | // create members of the group |
| 310 | for _, ofBucket := range buckets { |
| 311 | member := g.buildMember(ctx, ofBucket) |
| 312 | if member != nil && !g.contains(group.Members, member) { |
| 313 | group.Members = append(group.Members, member) |
| 314 | } |
| 315 | } |
| 316 | return &group |
| 317 | } |
| 318 | |
| 319 | //buildMember builds openoltpb2.GroupMember from an OpenFlow bucket |
| 320 | func (g *OpenOltGroupMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember { |
| 321 | var outPort uint32 |
| 322 | outPortFound := false |
| 323 | for _, ofAction := range ofBucket.Actions { |
| 324 | if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT { |
| 325 | outPort = ofAction.GetOutput().Port |
| 326 | outPortFound = true |
| 327 | } |
| 328 | } |
| 329 | if !outPortFound { |
| 330 | logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket}) |
| 331 | return nil |
| 332 | } |
Mahir Gunyel | 85f61c1 | 2021-10-06 11:53:45 -0700 | [diff] [blame] | 333 | interfaceID := plt.IntfIDFromUniPortNum(outPort) |
Girish Gowdra | 9602eb4 | 2020-09-09 15:50:39 -0700 | [diff] [blame] | 334 | logger.Debugw(ctx, "got-associated-interface-id-of-the-port", |
| 335 | log.Fields{ |
| 336 | "portNumber:": outPort, |
| 337 | "interfaceId:": interfaceID}) |
| 338 | g.interfaceToMcastQueueMapLock.RLock() |
| 339 | defer g.interfaceToMcastQueueMapLock.RUnlock() |
| 340 | if groupInfo, ok := g.interfaceToMcastQueueMap[interfaceID]; ok { |
| 341 | member := openoltpb2.GroupMember{ |
| 342 | InterfaceId: interfaceID, |
| 343 | InterfaceType: openoltpb2.GroupMember_PON, |
| 344 | GemPortId: groupInfo.gemPortID, |
| 345 | Priority: groupInfo.servicePriority, |
| 346 | } |
| 347 | //add member to the group |
| 348 | return &member |
| 349 | } |
| 350 | logger.Warnf(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket}) |
| 351 | return nil |
| 352 | } |