blob: 2dab42421f9936cf2beeb00f2a684dc75dcfc940 [file] [log] [blame]
Tinoj Josephcf161be2022-07-07 19:47:47 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Tinoj Josephcf161be2022-07-07 19:47:47 +053015
16package application
17
18import (
Tinoj Joseph07cc5372022-07-18 22:53:51 +053019 "context"
Tinoj Josephcf161be2022-07-07 19:47:47 +053020 "encoding/json"
21 "net"
22 "sync"
23 "time"
24
25 "github.com/google/gopacket/layers"
26
27 "voltha-go-controller/database"
28 "voltha-go-controller/internal/pkg/of"
29 "voltha-go-controller/internal/pkg/util"
30 "voltha-go-controller/log"
31)
32
33// IgmpGroup implements a single MCIP that may have multiple receivers
34// connected via multiple devices (OLTs). The IGMP group is stored on the
35// VOLT application.
36type IgmpGroup struct {
Tinoj Josephcf161be2022-07-07 19:47:47 +053037 Devices map[string]*IgmpGroupDevice `json:"-"`
38 PendingGroupForDevice map[string]time.Time //map [deviceId, timestamp] (ExpiryTime = leave time + 15mins)
39 Version string
vinokuma926cb3e2023-03-29 11:41:06 +053040 GroupName string
41 GroupAddr net.IP
42 PendingPoolLock sync.RWMutex
43 IgmpGroupLock sync.RWMutex
44 GroupID uint32
45 Mvlan of.VlanType
46 PonVlan of.VlanType
Tinoj Josephcf161be2022-07-07 19:47:47 +053047 IsPonVlanPresent bool
48 IsChannelBasedGroup bool
Tinoj Josephcf161be2022-07-07 19:47:47 +053049 IsGroupStatic bool
Tinoj Josephcf161be2022-07-07 19:47:47 +053050}
51
52// NewIgmpGroup is constructor for an IGMP group
53func NewIgmpGroup(name string, vlan of.VlanType) *IgmpGroup {
54 ig := IgmpGroup{}
55 ig.GroupName = name
56 ig.Mvlan = vlan
57 ig.Devices = make(map[string]*IgmpGroupDevice)
58 ig.PendingGroupForDevice = make(map[string]time.Time)
59 return &ig
60}
61
62// IgmpGroupInit to initialize igmp group members
63func (ig *IgmpGroup) IgmpGroupInit(name string, gip net.IP, mvp *MvlanProfile) {
64 ig.GroupName = name
65 ig.Mvlan = mvp.Mvlan
66 ig.PonVlan = mvp.PonVlan
67 ig.IsPonVlanPresent = mvp.IsPonVlanPresent
68 ig.Devices = make(map[string]*IgmpGroupDevice)
69 ig.PendingGroupForDevice = make(map[string]time.Time)
70 ig.IsChannelBasedGroup = mvp.IsChannelBasedGroup
71 ig.IsGroupStatic = mvp.Groups[name].IsStatic
72 if ig.IsChannelBasedGroup {
73 ig.GroupAddr = gip
74 } else {
75 ig.GroupAddr = net.ParseIP("0.0.0.0")
76 }
77}
78
79// IgmpGroupReInit to re-initialize igmp group members
Tinoj Joseph07cc5372022-07-18 22:53:51 +053080func (ig *IgmpGroup) IgmpGroupReInit(cntx context.Context, name string, gip net.IP) {
Tinoj Josephcf161be2022-07-07 19:47:47 +053081 logger.Infow(ctx, "Reinitialize Igmp Group", log.Fields{"GroupID": ig.GroupID, "OldName": ig.GroupName, "Name": name, "OldAddr": ig.GroupAddr.String(), "GroupAddr": gip.String()})
82
83 ig.GroupName = name
84 if ig.IsChannelBasedGroup {
85 ig.GroupAddr = gip
86 } else {
87 ig.GroupAddr = net.ParseIP("0.0.0.0")
88 }
89
90 for _, igd := range ig.Devices {
Tinoj Joseph07cc5372022-07-18 22:53:51 +053091 igd.IgmpGroupDeviceReInit(cntx, ig)
Tinoj Josephcf161be2022-07-07 19:47:47 +053092 }
93}
94
95// updateGroupName to update group name
Tinoj Joseph07cc5372022-07-18 22:53:51 +053096func (ig *IgmpGroup) updateGroupName(cntx context.Context, newGroupName string) {
Tinoj Josephcf161be2022-07-07 19:47:47 +053097 if !ig.IsChannelBasedGroup {
98 logger.Errorw(ctx, "Group name update not supported for GroupChannel based group", log.Fields{"Ig": ig})
99 return
100 }
101 oldKey := ig.getKey()
102 ig.GroupName = newGroupName
103 for _, igd := range ig.Devices {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530104 igd.updateGroupName(cntx, newGroupName)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530105 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530106 if err := ig.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530107 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
108 }
109 if !ig.IsChannelBasedGroup {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530110 _ = db.DelIgmpGroup(cntx, oldKey)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530111 }
112}
113
vinokuma926cb3e2023-03-29 11:41:06 +0530114// HandleGroupMigration - handles migration of group members between static & dynamic
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530115func (ig *IgmpGroup) HandleGroupMigration(cntx context.Context, deviceID string, groupAddr net.IP) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530116 var group *layers.IGMPv3GroupRecord
117 app := GetApplication()
118 if deviceID == "" {
119 logger.Infow(ctx, "Handle Group Migration Request for all devices", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
120 for device := range ig.Devices {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530121 ig.HandleGroupMigration(cntx, device, groupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530122 }
123 } else {
124 logger.Infow(ctx, "Handle Group Migration Request", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName})
125 var newIg *IgmpGroup
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530126 receivers := ig.DelIgmpChannel(cntx, deviceID, groupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530127 if ig.NumDevicesActive() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530128 app.DelIgmpGroup(cntx, ig)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530129 }
130 if newIg = app.GetIgmpGroup(ig.Mvlan, groupAddr); newIg == nil {
131 logger.Infow(ctx, "IG Group doesn't exist, creating new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530132 if newIg = app.AddIgmpGroup(cntx, app.GetMvlanProfileByTag(ig.Mvlan).Name, groupAddr, deviceID); newIg == nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530133 logger.Errorw(ctx, "Group Creation failed during group migration", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr})
134 return
135 }
136 }
137 mvp := app.GetMvlanProfileByTag(ig.Mvlan)
138 isStaticGroup := mvp.IsStaticGroup(ig.GroupName)
139 logger.Infow(ctx, "Existing receivers for old group", log.Fields{"Receivers": receivers})
140 newIg.IgmpGroupLock.Lock()
141 for port, igp := range receivers {
142 if !isStaticGroup && port == StaticPort {
143 continue
144 }
145 group = nil
146 var reqType layers.IGMPv3GroupRecordType
147 srcAddresses := []net.IP{}
148 if igp.Version == IgmpVersion3 {
149 if igp.Exclude {
150 srcAddresses = append(srcAddresses, igp.ExcludeList...)
151 reqType = layers.IGMPIsEx
152 } else {
153 srcAddresses = append(srcAddresses, igp.IncludeList...)
154 reqType = layers.IGMPIsIn
155 }
156 group = &layers.IGMPv3GroupRecord{
157 SourceAddresses: srcAddresses,
158 Type: reqType,
159 }
160 }
161 logger.Infow(ctx, "Adding receiver to new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "newIg": newIg.GroupName, "IGP": igp})
162 ponPort := GetApplication().GetPonPortID(deviceID, port)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530163 newIg.AddReceiver(cntx, deviceID, port, groupAddr, group, igp.Version, igp.CVlan, igp.Pbit, ponPort)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530164 }
165 newIg.IgmpGroupLock.Unlock()
166 }
167}
168
169// AddIgmpGroupDevice add a device to the group which happens when the first receiver of the device
170// is added to the IGMP group.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530171func (ig *IgmpGroup) AddIgmpGroupDevice(cntx context.Context, device string, id uint32, version uint8) *IgmpGroupDevice {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530172 logger.Infow(ctx, "Adding Device to IGMP group", log.Fields{"Device": device, "GroupName": ig.GroupName})
173 igd := NewIgmpGroupDevice(device, ig, id, version)
174 ig.Devices[device] = igd
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530175 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530176 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
177 }
178 return igd
179}
180
181// DelIgmpGroupDevice delete the device from the group which happens when we receive a leave or when
182// there is not response for IGMP query from the receiver
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530183func (ig *IgmpGroup) DelIgmpGroupDevice(cntx context.Context, igd *IgmpGroupDevice) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530184 logger.Infow(ctx, "Deleting Device from IGMP group", log.Fields{"Device": igd.Device, "Name": ig.GroupName})
185 va := GetApplication()
186 countersToBeUpdated := false
187 if igd.NumReceivers() != 0 {
188 countersToBeUpdated = true
189 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530190 igd.DelAllChannels(cntx)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530191
vinokuma926cb3e2023-03-29 11:41:06 +0530192 // Clear all internal maps so that the groups can be reused
Tinoj Josephcf161be2022-07-07 19:47:47 +0530193 igd.PortChannelMap.Range(func(key, value interface{}) bool {
vinokuma926cb3e2023-03-29 11:41:06 +0530194 // Update the counters only if not already updated
195 // (i.e) 1. In case of channel remove during Mvlan Update
Tinoj Josephcf161be2022-07-07 19:47:47 +0530196 if countersToBeUpdated {
197 port := key.(string)
198 channelList := value.([]net.IP)
199 ponPortID := va.GetPonPortID(igd.Device, port)
200
201 for _, channel := range channelList {
202 igd.RemoveChannelFromChannelsPerPon(port, channel, ponPortID)
203 }
204 }
205
206 igd.PortChannelMap.Delete(key)
207 return true
208 })
209 igd.PonPortChannelMap = util.NewConcurrentMap()
210
211 if mcastCfg := va.GetMcastConfig(igd.SerialNo, va.GetMvlanProfileByTag(igd.Mvlan).Name); mcastCfg != nil {
212 mcastCfg.IgmpGroupDevices.Delete(igd.GroupID)
213 logger.Debugw(ctx, "Igd deleted from mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
214 }
215 if !igd.GroupInstalled {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530216 _ = db.DelIgmpDevice(cntx, igd.Mvlan, ig.GroupName, ig.GroupAddr, igd.Device)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530217 delete(ig.Devices, igd.Device)
218 }
219}
220
221// AddReceiver delete the device from the group which happens when we receive a leave or when
222// there is not response for IGMP query from the receiver
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530223func (ig *IgmpGroup) AddReceiver(cntx context.Context, device string, port string, groupIP net.IP,
Tinoj Josephcf161be2022-07-07 19:47:47 +0530224 group *layers.IGMPv3GroupRecord, ver uint8, cvlan uint16, pbit uint8, ponPort uint32) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530225 logger.Debugw(ctx, "Adding Receiver", log.Fields{"Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530226 if igd, ok := ig.getIgmpGroupDevice(cntx, device); !ok {
227 igd = ig.AddIgmpGroupDevice(cntx, device, ig.GroupID, ver)
228 igd.AddReceiver(cntx, port, groupIP, group, ver, cvlan, pbit, ponPort)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530229 } else {
230 logger.Infow(ctx, "IGMP Group Receiver", log.Fields{"IGD": igd.Device})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530231 igd.AddReceiver(cntx, port, groupIP, group, ver, cvlan, pbit, ponPort)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530232 }
233}
234
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530235func (ig *IgmpGroup) getIgmpGroupDevice(cntx context.Context, device string) (*IgmpGroupDevice, bool) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530236 ig.PendingPoolLock.Lock()
237 defer ig.PendingPoolLock.Unlock()
238
239 if _, ok := ig.PendingGroupForDevice[device]; ok {
240 logger.Infow(ctx, "Removing the IgmpGroupDevice from pending pool", log.Fields{"GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
241 delete(ig.PendingGroupForDevice, device)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530242 if err := ig.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530243 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
244 }
245 }
246 igd, ok := ig.Devices[device]
247 return igd, ok
248}
249
250// DelReceiveronDownInd deletes a receiver which is the combination of device (OLT)
251// and port on Port Down event
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530252func (ig *IgmpGroup) DelReceiveronDownInd(cntx context.Context, device string, port string, ponPortID uint32) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530253 logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port})
254
255 mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
256 mvp.mvpLock.RLock()
257 defer mvp.mvpLock.RUnlock()
258 igd, ok := ig.Devices[device]
259 if !ok {
260 logger.Infow(ctx, "IGMP Group device was not found for ", log.Fields{"Device": device})
261 return
262 }
263 ipsList := []net.IP{}
264 ipsListIntf, ok := igd.PortChannelMap.Load(port)
265 if ok {
266 ipsList = append(ipsList, ipsListIntf.([]net.IP)...)
267 }
268 logger.Infow(ctx, "Port Channel List", log.Fields{"Port": port, "IPsList": ipsList})
269 igd.PortChannelMap.Range(printPortChannel)
270
Tinoj Josephcf161be2022-07-07 19:47:47 +0530271 for _, groupAddr := range ipsList {
272 logger.Debugw(ctx, "Port Channels", log.Fields{"Port": port, "IPsList": ipsList, "GroupAddr": groupAddr, "Len": len(ipsList)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530273 igd.DelReceiver(cntx, groupAddr, port, nil, ponPortID)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530274 }
275
276 if igd.NumReceivers() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530277 ig.DelIgmpGroupDevice(cntx, igd)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530278 }
279}
280
281// DelReceiver deletes a receiver which is the combination of device (OLT)
282// and port
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530283func (ig *IgmpGroup) DelReceiver(cntx context.Context, device string, port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530284 logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port, "GroupIP": groupAddr.String()})
285 if igd, ok := ig.Devices[device]; ok {
286 //igd.DelReceiverForGroupAddr(groupAddr, port)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530287 igd.DelReceiver(cntx, groupAddr, port, group, ponPortID)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530288 if igd.NumReceivers() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530289 ig.DelIgmpGroupDevice(cntx, igd)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530290 }
291 }
292}
293
294// GetAllIgmpChannelForDevice - Returns all channels with active members associated to the Igmp Group for the given device
295func (ig *IgmpGroup) GetAllIgmpChannelForDevice(deviceID string) map[string]string {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530296 if deviceID == "" {
297 return ig.GetAllIgmpChannel()
298 }
299
300 allChannels := make(map[string]string)
301 igd := ig.Devices[deviceID]
302 getAllChannels := func(key interface{}, value interface{}) bool {
303 channels := key.(string)
304 allChannels[channels] = channels //same value as only key is required
305 return true
306 }
307 igd.GroupChannels.Range(getAllChannels)
308
309 return allChannels
310}
311
312// GetAllIgmpChannel - Returns all channels with active members associated to the Igmp Group
313func (ig *IgmpGroup) GetAllIgmpChannel() map[string]string {
314 allChannels := make(map[string]string)
315 for _, igd := range ig.Devices {
316 getAllChannels := func(key interface{}, value interface{}) bool {
317 channels := key.(string)
318 allChannels[channels] = channels
319 return true
320 }
321 igd.GroupChannels.Range(getAllChannels)
322 }
323 return allChannels
324}
325
326// DelIgmpChannel deletes all receivers for the provided igmp group channel for the given device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530327func (ig *IgmpGroup) DelIgmpChannel(cntx context.Context, deviceID string, groupAddr net.IP) map[string]*IgmpGroupPort {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530328 logger.Infow(ctx, "Deleting Channel from devices", log.Fields{"Device": deviceID, "Group": ig.GroupName, "Channel": groupAddr.String()})
329 if deviceID == "" {
330 for device := range ig.Devices {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530331 ig.DelIgmpChannel(cntx, device, groupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530332 }
333 return nil
334 }
335 igd := ig.Devices[deviceID]
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530336 receivers := igd.DelChannelReceiver(cntx, groupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530337 if igd.NumReceivers() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530338 ig.DelIgmpGroupDevice(cntx, igd)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530339 }
340 return receivers
341}
342
343// IsNewReceiver checks if the received port is new receiver or existing one.
344// Returns true if new receiver.
345func (ig *IgmpGroup) IsNewReceiver(device, uniPortID string, groupAddr net.IP) bool {
346 if ig == nil {
347 // IGMP group does not exists. So considering it as new receiver.
348 return true
349 }
350 logger.Debugw(ctx, "IGMP Group", log.Fields{"channel": groupAddr, "groupName": ig.GroupName}) // TODO: Remove me
351 igd, exists := ig.Devices[device]
352 if !exists || !igd.GroupInstalled {
353 // IGMP group not exists OR Group is not created in the device.
354 // So this is a new receiver.
355 logger.Debugw(ctx, "igd not exists or group is not created in device", log.Fields{"exists": exists}) // TODO: Remove me
356 return true
357 }
358 if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
359 logger.Debugw(ctx, "IGMP Channel receivers", log.Fields{"igc-receivers": igc.(*IgmpGroupChannel).CurReceivers}) // TODO: Remove me
360 _, rcvrExistCur := igc.(*IgmpGroupChannel).CurReceivers[uniPortID]
361 _, rcvrExistNew := igc.(*IgmpGroupChannel).NewReceivers[uniPortID]
362 if rcvrExistCur || rcvrExistNew {
363 // Existing receiver
364 return false
365 }
366 }
367 return true
368}
369
370// Tick for Addition of groups to an MVLAN profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530371func (ig *IgmpGroup) Tick(cntx context.Context) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530372 now := time.Now()
373 for _, igd := range ig.Devices {
374 var igdChangeCnt uint8
375
376 if _, ok := GetApplication().DevicesDisc.Load(igd.Device); !ok {
377 logger.Info(ctx, "Skipping Query and Expiry check since Device is unavailable")
378 continue
379 }
380 if now.After(igd.NextQueryTime) {
381 // Set the next query time and the query expiry time to
382 // KeepAliveInterval and MaxResp seconds after current time
383 igd.NextQueryTime = now.Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
384 igd.QueryExpiryTime = now.Add(time.Duration(igd.proxyCfg.MaxResp) * time.Second)
385 logger.Debugw(ctx, "Query Start", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
386 igdChangeCnt++
387 logger.Debugw(ctx, "Sending Query to device", log.Fields{"Device": igd.Device})
388 sendQueryForAllChannels := func(key interface{}, value interface{}) bool {
389 igc := value.(*IgmpGroupChannel)
390 //TODO - Do generic query to avoid multiple msgs
391 igc.SendQuery()
392 return true
393 }
394 igd.GroupChannels.Range(sendQueryForAllChannels)
395 }
396 if now.After(igd.QueryExpiryTime) {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530397 igd.QueryExpiry(cntx)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530398 // This will keep it quiet till the next query time and then
399 // it will be reset to a value after the query initiation time
400 igd.QueryExpiryTime = igd.NextQueryTime
401 logger.Debugw(ctx, "Expiry", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
402 igdChangeCnt++
403 if igd.NumReceivers() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530404 ig.DelIgmpGroupDevice(cntx, igd)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530405 continue
406 }
407 }
408
409 igdChangeCnt += igd.Tick()
410
411 if igdChangeCnt > 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530412 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530413 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
vinokuma926cb3e2023-03-29 11:41:06 +0530414 "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
Tinoj Josephcf161be2022-07-07 19:47:47 +0530415 }
416 }
417 }
418}
419
420// QueryExpiry processes expiry of query sent to the receivers. Up on
421// expiry, process the consolidated response for each of the devices participating
422// in the MC stream. When a device has no receivers, the device is deleted
423// from the group.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530424func (ig *IgmpGroup) QueryExpiry(cntx context.Context) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530425 for _, igd := range ig.Devices {
426 if _, ok := GetApplication().DevicesDisc.Load(igd.Device); ok {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530427 igd.QueryExpiry(cntx)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530428 if igd.NumReceivers() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530429 ig.DelIgmpGroupDevice(cntx, igd)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530430 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530431 } else {
432 logger.Info(ctx, "Skipping Expiry since Device is unavailable")
433 }
434 }
435}
436
437// Hash : The IGMP group hash is used to distribute the processing of timers so that
438// the processing is spread across doesn't spike at one instant. This also
439// ensures that there is sufficient responsiveness to other requests happening
440// simultaneously.
441func (ig *IgmpGroup) Hash() uint16 {
442 mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
443
444 if mvp == nil {
445 return 0
446 }
447
448 mvp.mvpLock.RLock()
449 defer mvp.mvpLock.RUnlock()
450 group := mvp.Groups[ig.GroupName]
451
vinokuma926cb3e2023-03-29 11:41:06 +0530452 // Case where mvlan update in-progress
Tinoj Josephcf161be2022-07-07 19:47:47 +0530453 if group == nil || len(group.McIPs) == 0 {
454 return 0
455 }
456 groupIP := group.McIPs[0]
457 return uint16(groupIP[2])<<8 + uint16(groupIP[3])
458}
459
460// NumDevicesAll returns the number of devices (OLT) active on the IGMP group. When
461// the last device leaves the IGMP group is removed. If this is not done,
462// the number of IGMP groups only keep increasing and can impact CPU when
463// the system runs for a very long duration
464func (ig *IgmpGroup) NumDevicesAll() int {
465 return len(ig.Devices)
466}
467
468// NumDevicesActive returns the number of devices (OLT) active on the IGMP group. When
469// the last device leaves the IGMP group is removed. If this is not done,
470// the number of IGMP groups only keep increasing and can impact CPU when
471// the system runs for a very long duration
472func (ig *IgmpGroup) NumDevicesActive() int {
473 count := 0
474 for _, igd := range ig.Devices {
475 if igd.NumReceivers() == 0 && igd.GroupInstalled {
476 continue
477 }
478 count++
479 }
480 return count
481}
482
483// NumReceivers to return receiver list
484func (ig *IgmpGroup) NumReceivers() map[string]int {
485 receiverList := make(map[string]int)
486 for device, igd := range ig.Devices {
487 receiverList[device] = igd.NumReceivers()
488 }
489 return receiverList
490}
491
492// RestoreDevices : IGMP group write to DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530493func (ig *IgmpGroup) RestoreDevices(cntx context.Context) {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530494 ig.migrateIgmpDevices(cntx)
495 devices, _ := db.GetIgmpDevices(cntx, ig.Mvlan, ig.GroupName, ig.GroupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530496 for _, device := range devices {
497 b, ok := device.Value.([]byte)
498 if !ok {
499 logger.Warn(ctx, "The value type is not []byte")
500 continue
501 }
502 if igd, err := NewIgmpGroupDeviceFromBytes(b); err == nil {
503 igd.PonPortChannelMap = util.NewConcurrentMap()
504 // Update the proxy config pointers.
505 var mcastCfg *McastConfig
506 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
507 if mcastCfg != nil {
508 mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
509 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
510 }
511
512 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
513 igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
514
515 // During vgc upgrade from old version, igd.NextQueryTime and igd.QueryExpiryTime will not be present in db.
516 // hence they are initialized with current time offset.
517 emptyTime := time.Time{}
518 if emptyTime == igd.NextQueryTime {
519 logger.Debugw(ctx, "VGC igd upgrade", log.Fields{"igd grp name": igd.GroupName})
520 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
521 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530522 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530523 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
vinokuma926cb3e2023-03-29 11:41:06 +0530524 "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
Tinoj Josephcf161be2022-07-07 19:47:47 +0530525 }
526 }
527
528 ig.Devices[igd.Device] = igd
529 if ig.IsChannelBasedGroup {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530530 channel, _ := db.GetIgmpChannel(cntx, igd.Mvlan, igd.GroupName, igd.Device, igd.GroupAddr)
531 igd.RestoreChannel(cntx, []byte(channel))
Tinoj Josephcf161be2022-07-07 19:47:47 +0530532 } else {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530533 igd.RestoreChannels(cntx)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530534 }
535 igd.PortChannelMap.Range(printPortChannel)
536 logger.Infow(ctx, "Group Device Restored", log.Fields{"IGD": igd})
537 } else {
538 logger.Warnw(ctx, "Unable to decode device from database", log.Fields{"str": string(b)})
539 }
540 }
541}
542
543// getKey to return group key
544func (ig *IgmpGroup) getKey() string {
545 profile, ok := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
546 if ok {
547 mvp := profile.(*MvlanProfile)
548 return mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
549 }
550 return ""
551}
552
553// WriteToDb is utility to write Igmp Group Info to database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530554func (ig *IgmpGroup) WriteToDb(cntx context.Context) error {
vinokuma926cb3e2023-03-29 11:41:06 +0530555 ig.Version = database.PresentVersionMap[database.IgmpGroupPath]
556 b, err := json.Marshal(ig)
557 if err != nil {
558 return err
559 }
560 if err1 := db.PutIgmpGroup(cntx, ig.getKey(), string(b)); err1 != nil {
561 return err1
562 }
563 return nil
Tinoj Josephcf161be2022-07-07 19:47:47 +0530564}
565
566// UpdateIgmpGroup : When the pending group is allocated to new
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530567func (ig *IgmpGroup) UpdateIgmpGroup(cntx context.Context, oldKey, newKey string) {
vinokuma926cb3e2023-03-29 11:41:06 +0530568 // If the group is allocated to same McastGroup, no need to update the
569 // IgmpGroups map
570 if oldKey == newKey {
571 return
572 }
573 logger.Infow(ctx, "Updating Igmp Group with new MVP Group Info", log.Fields{"OldKey": oldKey, "NewKey": newKey, "GroupID": ig.GroupID})
Tinoj Josephcf161be2022-07-07 19:47:47 +0530574
vinokuma926cb3e2023-03-29 11:41:06 +0530575 GetApplication().IgmpGroups.Delete(oldKey)
576 _ = db.DelIgmpGroup(cntx, oldKey)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530577
vinokuma926cb3e2023-03-29 11:41:06 +0530578 GetApplication().IgmpGroups.Store(newKey, ig)
579 if err := ig.WriteToDb(cntx); err != nil {
580 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
581 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530582}
583
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530584func (ig *IgmpGroup) removeExpiredGroupFromDevice(cntx context.Context) {
vinokuma926cb3e2023-03-29 11:41:06 +0530585 ig.PendingPoolLock.Lock()
586 defer ig.PendingPoolLock.Unlock()
Tinoj Josephcf161be2022-07-07 19:47:47 +0530587
vinokuma926cb3e2023-03-29 11:41:06 +0530588 for device, timer := range ig.PendingGroupForDevice {
589 // To ensure no race-condition between the expiry time and the new Join,
590 // ensure the group exists in pending pool before deletion
591 groupExistsInPendingPool := true
Tinoj Josephcf161be2022-07-07 19:47:47 +0530592
vinokuma926cb3e2023-03-29 11:41:06 +0530593 if !time.Now().After(timer) {
594 continue
595 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530596
vinokuma926cb3e2023-03-29 11:41:06 +0530597 // Check if the IgmpGroup obj has no active member across any device
598 // If Yes, then this group is part of global pending pool (IgmpPendingPool), hence if expired,
599 // Remove only the IgmpGroup obj referenced to this device from global pool also.
600 if ig.NumDevicesActive() == 0 {
601 groupExistsInPendingPool = GetApplication().RemoveGroupFromPendingPool(device, ig)
602 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530603
vinokuma926cb3e2023-03-29 11:41:06 +0530604 // Remove the group entry from device and remove the IgmpDev Obj
605 // from IgmpGrp Pending pool
606 if groupExistsInPendingPool {
607 ig.DeleteIgmpGroupDevice(cntx, device)
608 }
609 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530610}
611
vinokuma926cb3e2023-03-29 11:41:06 +0530612// DeleteIgmpGroupDevice - removes the IgmpGroupDevice obj from IgmpGroup and database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530613func (ig *IgmpGroup) DeleteIgmpGroupDevice(cntx context.Context, device string) {
vinokuma926cb3e2023-03-29 11:41:06 +0530614 logger.Infow(ctx, "Deleting IgmpGroupDevice from IG Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
Tinoj Josephcf161be2022-07-07 19:47:47 +0530615
vinokuma926cb3e2023-03-29 11:41:06 +0530616 igd := ig.Devices[device]
617 igd.DelMcGroup(true)
618 delete(ig.Devices, device)
619 delete(ig.PendingGroupForDevice, device)
620 _ = db.DelIgmpDevice(cntx, igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530621
vinokuma926cb3e2023-03-29 11:41:06 +0530622 // If the group is not associated to any other device, then the entire Igmp Group obj itself can be removed
623 if ig.NumDevicesAll() == 0 {
624 logger.Infow(ctx, "Deleting IgmpGroup as all pending groups has expired", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
625 GetApplication().DelIgmpGroup(cntx, ig)
626 return
627 }
628 if err := ig.WriteToDb(cntx); err != nil {
629 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
630 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530631}
632
633// DelIgmpGroup deletes all devices for the provided igmp group
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530634func (ig *IgmpGroup) DelIgmpGroup(cntx context.Context) {
vinokuma926cb3e2023-03-29 11:41:06 +0530635 logger.Infow(ctx, "Deleting All Device for Group", log.Fields{"Group": ig.GroupName})
636 for _, igd := range ig.Devices {
637 ig.DelIgmpGroupDevice(cntx, igd)
638 }
639 GetApplication().DelIgmpGroup(cntx, ig)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530640}