blob: 2dab42421f9936cf2beeb00f2a684dc75dcfc940 [file] [log] [blame]
/*
* Copyright 2022-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package application
import (
"context"
"encoding/json"
"net"
"sync"
"time"
"github.com/google/gopacket/layers"
"voltha-go-controller/database"
"voltha-go-controller/internal/pkg/of"
"voltha-go-controller/internal/pkg/util"
"voltha-go-controller/log"
)
// IgmpGroup implements a single MCIP that may have multiple receivers
// connected via multiple devices (OLTs). The IGMP group is stored on the
// VOLT application.
type IgmpGroup struct {
Devices map[string]*IgmpGroupDevice `json:"-"`
PendingGroupForDevice map[string]time.Time //map [deviceId, timestamp] (ExpiryTime = leave time + 15mins)
Version string
GroupName string
GroupAddr net.IP
PendingPoolLock sync.RWMutex
IgmpGroupLock sync.RWMutex
GroupID uint32
Mvlan of.VlanType
PonVlan of.VlanType
IsPonVlanPresent bool
IsChannelBasedGroup bool
IsGroupStatic bool
}
// NewIgmpGroup is constructor for an IGMP group
func NewIgmpGroup(name string, vlan of.VlanType) *IgmpGroup {
ig := IgmpGroup{}
ig.GroupName = name
ig.Mvlan = vlan
ig.Devices = make(map[string]*IgmpGroupDevice)
ig.PendingGroupForDevice = make(map[string]time.Time)
return &ig
}
// IgmpGroupInit to initialize igmp group members
func (ig *IgmpGroup) IgmpGroupInit(name string, gip net.IP, mvp *MvlanProfile) {
ig.GroupName = name
ig.Mvlan = mvp.Mvlan
ig.PonVlan = mvp.PonVlan
ig.IsPonVlanPresent = mvp.IsPonVlanPresent
ig.Devices = make(map[string]*IgmpGroupDevice)
ig.PendingGroupForDevice = make(map[string]time.Time)
ig.IsChannelBasedGroup = mvp.IsChannelBasedGroup
ig.IsGroupStatic = mvp.Groups[name].IsStatic
if ig.IsChannelBasedGroup {
ig.GroupAddr = gip
} else {
ig.GroupAddr = net.ParseIP("0.0.0.0")
}
}
// IgmpGroupReInit to re-initialize igmp group members
func (ig *IgmpGroup) IgmpGroupReInit(cntx context.Context, name string, gip net.IP) {
logger.Infow(ctx, "Reinitialize Igmp Group", log.Fields{"GroupID": ig.GroupID, "OldName": ig.GroupName, "Name": name, "OldAddr": ig.GroupAddr.String(), "GroupAddr": gip.String()})
ig.GroupName = name
if ig.IsChannelBasedGroup {
ig.GroupAddr = gip
} else {
ig.GroupAddr = net.ParseIP("0.0.0.0")
}
for _, igd := range ig.Devices {
igd.IgmpGroupDeviceReInit(cntx, ig)
}
}
// updateGroupName to update group name
func (ig *IgmpGroup) updateGroupName(cntx context.Context, newGroupName string) {
if !ig.IsChannelBasedGroup {
logger.Errorw(ctx, "Group name update not supported for GroupChannel based group", log.Fields{"Ig": ig})
return
}
oldKey := ig.getKey()
ig.GroupName = newGroupName
for _, igd := range ig.Devices {
igd.updateGroupName(cntx, newGroupName)
}
if err := ig.WriteToDb(cntx); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
if !ig.IsChannelBasedGroup {
_ = db.DelIgmpGroup(cntx, oldKey)
}
}
// HandleGroupMigration - handles migration of group members between static & dynamic
func (ig *IgmpGroup) HandleGroupMigration(cntx context.Context, deviceID string, groupAddr net.IP) {
var group *layers.IGMPv3GroupRecord
app := GetApplication()
if deviceID == "" {
logger.Infow(ctx, "Handle Group Migration Request for all devices", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
for device := range ig.Devices {
ig.HandleGroupMigration(cntx, device, groupAddr)
}
} else {
logger.Infow(ctx, "Handle Group Migration Request", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName})
var newIg *IgmpGroup
receivers := ig.DelIgmpChannel(cntx, deviceID, groupAddr)
if ig.NumDevicesActive() == 0 {
app.DelIgmpGroup(cntx, ig)
}
if newIg = app.GetIgmpGroup(ig.Mvlan, groupAddr); newIg == nil {
logger.Infow(ctx, "IG Group doesn't exist, creating new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
if newIg = app.AddIgmpGroup(cntx, app.GetMvlanProfileByTag(ig.Mvlan).Name, groupAddr, deviceID); newIg == nil {
logger.Errorw(ctx, "Group Creation failed during group migration", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr})
return
}
}
mvp := app.GetMvlanProfileByTag(ig.Mvlan)
isStaticGroup := mvp.IsStaticGroup(ig.GroupName)
logger.Infow(ctx, "Existing receivers for old group", log.Fields{"Receivers": receivers})
newIg.IgmpGroupLock.Lock()
for port, igp := range receivers {
if !isStaticGroup && port == StaticPort {
continue
}
group = nil
var reqType layers.IGMPv3GroupRecordType
srcAddresses := []net.IP{}
if igp.Version == IgmpVersion3 {
if igp.Exclude {
srcAddresses = append(srcAddresses, igp.ExcludeList...)
reqType = layers.IGMPIsEx
} else {
srcAddresses = append(srcAddresses, igp.IncludeList...)
reqType = layers.IGMPIsIn
}
group = &layers.IGMPv3GroupRecord{
SourceAddresses: srcAddresses,
Type: reqType,
}
}
logger.Infow(ctx, "Adding receiver to new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "newIg": newIg.GroupName, "IGP": igp})
ponPort := GetApplication().GetPonPortID(deviceID, port)
newIg.AddReceiver(cntx, deviceID, port, groupAddr, group, igp.Version, igp.CVlan, igp.Pbit, ponPort)
}
newIg.IgmpGroupLock.Unlock()
}
}
// AddIgmpGroupDevice add a device to the group which happens when the first receiver of the device
// is added to the IGMP group.
func (ig *IgmpGroup) AddIgmpGroupDevice(cntx context.Context, device string, id uint32, version uint8) *IgmpGroupDevice {
logger.Infow(ctx, "Adding Device to IGMP group", log.Fields{"Device": device, "GroupName": ig.GroupName})
igd := NewIgmpGroupDevice(device, ig, id, version)
ig.Devices[device] = igd
if err := igd.WriteToDb(cntx); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
return igd
}
// DelIgmpGroupDevice delete the device from the group which happens when we receive a leave or when
// there is not response for IGMP query from the receiver
func (ig *IgmpGroup) DelIgmpGroupDevice(cntx context.Context, igd *IgmpGroupDevice) {
logger.Infow(ctx, "Deleting Device from IGMP group", log.Fields{"Device": igd.Device, "Name": ig.GroupName})
va := GetApplication()
countersToBeUpdated := false
if igd.NumReceivers() != 0 {
countersToBeUpdated = true
}
igd.DelAllChannels(cntx)
// Clear all internal maps so that the groups can be reused
igd.PortChannelMap.Range(func(key, value interface{}) bool {
// Update the counters only if not already updated
// (i.e) 1. In case of channel remove during Mvlan Update
if countersToBeUpdated {
port := key.(string)
channelList := value.([]net.IP)
ponPortID := va.GetPonPortID(igd.Device, port)
for _, channel := range channelList {
igd.RemoveChannelFromChannelsPerPon(port, channel, ponPortID)
}
}
igd.PortChannelMap.Delete(key)
return true
})
igd.PonPortChannelMap = util.NewConcurrentMap()
if mcastCfg := va.GetMcastConfig(igd.SerialNo, va.GetMvlanProfileByTag(igd.Mvlan).Name); mcastCfg != nil {
mcastCfg.IgmpGroupDevices.Delete(igd.GroupID)
logger.Debugw(ctx, "Igd deleted from mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
}
if !igd.GroupInstalled {
_ = db.DelIgmpDevice(cntx, igd.Mvlan, ig.GroupName, ig.GroupAddr, igd.Device)
delete(ig.Devices, igd.Device)
}
}
// AddReceiver delete the device from the group which happens when we receive a leave or when
// there is not response for IGMP query from the receiver
func (ig *IgmpGroup) AddReceiver(cntx context.Context, device string, port string, groupIP net.IP,
group *layers.IGMPv3GroupRecord, ver uint8, cvlan uint16, pbit uint8, ponPort uint32) {
logger.Debugw(ctx, "Adding Receiver", log.Fields{"Port": port})
if igd, ok := ig.getIgmpGroupDevice(cntx, device); !ok {
igd = ig.AddIgmpGroupDevice(cntx, device, ig.GroupID, ver)
igd.AddReceiver(cntx, port, groupIP, group, ver, cvlan, pbit, ponPort)
} else {
logger.Infow(ctx, "IGMP Group Receiver", log.Fields{"IGD": igd.Device})
igd.AddReceiver(cntx, port, groupIP, group, ver, cvlan, pbit, ponPort)
}
}
func (ig *IgmpGroup) getIgmpGroupDevice(cntx context.Context, device string) (*IgmpGroupDevice, bool) {
ig.PendingPoolLock.Lock()
defer ig.PendingPoolLock.Unlock()
if _, ok := ig.PendingGroupForDevice[device]; ok {
logger.Infow(ctx, "Removing the IgmpGroupDevice from pending pool", log.Fields{"GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
delete(ig.PendingGroupForDevice, device)
if err := ig.WriteToDb(cntx); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
}
igd, ok := ig.Devices[device]
return igd, ok
}
// DelReceiveronDownInd deletes a receiver which is the combination of device (OLT)
// and port on Port Down event
func (ig *IgmpGroup) DelReceiveronDownInd(cntx context.Context, device string, port string, ponPortID uint32) {
logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port})
mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
igd, ok := ig.Devices[device]
if !ok {
logger.Infow(ctx, "IGMP Group device was not found for ", log.Fields{"Device": device})
return
}
ipsList := []net.IP{}
ipsListIntf, ok := igd.PortChannelMap.Load(port)
if ok {
ipsList = append(ipsList, ipsListIntf.([]net.IP)...)
}
logger.Infow(ctx, "Port Channel List", log.Fields{"Port": port, "IPsList": ipsList})
igd.PortChannelMap.Range(printPortChannel)
for _, groupAddr := range ipsList {
logger.Debugw(ctx, "Port Channels", log.Fields{"Port": port, "IPsList": ipsList, "GroupAddr": groupAddr, "Len": len(ipsList)})
igd.DelReceiver(cntx, groupAddr, port, nil, ponPortID)
}
if igd.NumReceivers() == 0 {
ig.DelIgmpGroupDevice(cntx, igd)
}
}
// DelReceiver deletes a receiver which is the combination of device (OLT)
// and port
func (ig *IgmpGroup) DelReceiver(cntx context.Context, device string, port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port, "GroupIP": groupAddr.String()})
if igd, ok := ig.Devices[device]; ok {
//igd.DelReceiverForGroupAddr(groupAddr, port)
igd.DelReceiver(cntx, groupAddr, port, group, ponPortID)
if igd.NumReceivers() == 0 {
ig.DelIgmpGroupDevice(cntx, igd)
}
}
}
// GetAllIgmpChannelForDevice - Returns all channels with active members associated to the Igmp Group for the given device
func (ig *IgmpGroup) GetAllIgmpChannelForDevice(deviceID string) map[string]string {
if deviceID == "" {
return ig.GetAllIgmpChannel()
}
allChannels := make(map[string]string)
igd := ig.Devices[deviceID]
getAllChannels := func(key interface{}, value interface{}) bool {
channels := key.(string)
allChannels[channels] = channels //same value as only key is required
return true
}
igd.GroupChannels.Range(getAllChannels)
return allChannels
}
// GetAllIgmpChannel - Returns all channels with active members associated to the Igmp Group
func (ig *IgmpGroup) GetAllIgmpChannel() map[string]string {
allChannels := make(map[string]string)
for _, igd := range ig.Devices {
getAllChannels := func(key interface{}, value interface{}) bool {
channels := key.(string)
allChannels[channels] = channels
return true
}
igd.GroupChannels.Range(getAllChannels)
}
return allChannels
}
// DelIgmpChannel deletes all receivers for the provided igmp group channel for the given device
func (ig *IgmpGroup) DelIgmpChannel(cntx context.Context, deviceID string, groupAddr net.IP) map[string]*IgmpGroupPort {
logger.Infow(ctx, "Deleting Channel from devices", log.Fields{"Device": deviceID, "Group": ig.GroupName, "Channel": groupAddr.String()})
if deviceID == "" {
for device := range ig.Devices {
ig.DelIgmpChannel(cntx, device, groupAddr)
}
return nil
}
igd := ig.Devices[deviceID]
receivers := igd.DelChannelReceiver(cntx, groupAddr)
if igd.NumReceivers() == 0 {
ig.DelIgmpGroupDevice(cntx, igd)
}
return receivers
}
// IsNewReceiver checks if the received port is new receiver or existing one.
// Returns true if new receiver.
func (ig *IgmpGroup) IsNewReceiver(device, uniPortID string, groupAddr net.IP) bool {
if ig == nil {
// IGMP group does not exists. So considering it as new receiver.
return true
}
logger.Debugw(ctx, "IGMP Group", log.Fields{"channel": groupAddr, "groupName": ig.GroupName}) // TODO: Remove me
igd, exists := ig.Devices[device]
if !exists || !igd.GroupInstalled {
// IGMP group not exists OR Group is not created in the device.
// So this is a new receiver.
logger.Debugw(ctx, "igd not exists or group is not created in device", log.Fields{"exists": exists}) // TODO: Remove me
return true
}
if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
logger.Debugw(ctx, "IGMP Channel receivers", log.Fields{"igc-receivers": igc.(*IgmpGroupChannel).CurReceivers}) // TODO: Remove me
_, rcvrExistCur := igc.(*IgmpGroupChannel).CurReceivers[uniPortID]
_, rcvrExistNew := igc.(*IgmpGroupChannel).NewReceivers[uniPortID]
if rcvrExistCur || rcvrExistNew {
// Existing receiver
return false
}
}
return true
}
// Tick for Addition of groups to an MVLAN profile
func (ig *IgmpGroup) Tick(cntx context.Context) {
now := time.Now()
for _, igd := range ig.Devices {
var igdChangeCnt uint8
if _, ok := GetApplication().DevicesDisc.Load(igd.Device); !ok {
logger.Info(ctx, "Skipping Query and Expiry check since Device is unavailable")
continue
}
if now.After(igd.NextQueryTime) {
// Set the next query time and the query expiry time to
// KeepAliveInterval and MaxResp seconds after current time
igd.NextQueryTime = now.Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
igd.QueryExpiryTime = now.Add(time.Duration(igd.proxyCfg.MaxResp) * time.Second)
logger.Debugw(ctx, "Query Start", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
igdChangeCnt++
logger.Debugw(ctx, "Sending Query to device", log.Fields{"Device": igd.Device})
sendQueryForAllChannels := func(key interface{}, value interface{}) bool {
igc := value.(*IgmpGroupChannel)
//TODO - Do generic query to avoid multiple msgs
igc.SendQuery()
return true
}
igd.GroupChannels.Range(sendQueryForAllChannels)
}
if now.After(igd.QueryExpiryTime) {
igd.QueryExpiry(cntx)
// This will keep it quiet till the next query time and then
// it will be reset to a value after the query initiation time
igd.QueryExpiryTime = igd.NextQueryTime
logger.Debugw(ctx, "Expiry", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
igdChangeCnt++
if igd.NumReceivers() == 0 {
ig.DelIgmpGroupDevice(cntx, igd)
continue
}
}
igdChangeCnt += igd.Tick()
if igdChangeCnt > 0 {
if err := igd.WriteToDb(cntx); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
"GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
}
}
}
// QueryExpiry processes expiry of query sent to the receivers. Up on
// expiry, process the consolidated response for each of the devices participating
// in the MC stream. When a device has no receivers, the device is deleted
// from the group.
func (ig *IgmpGroup) QueryExpiry(cntx context.Context) {
for _, igd := range ig.Devices {
if _, ok := GetApplication().DevicesDisc.Load(igd.Device); ok {
igd.QueryExpiry(cntx)
if igd.NumReceivers() == 0 {
ig.DelIgmpGroupDevice(cntx, igd)
}
} else {
logger.Info(ctx, "Skipping Expiry since Device is unavailable")
}
}
}
// Hash : The IGMP group hash is used to distribute the processing of timers so that
// the processing is spread across doesn't spike at one instant. This also
// ensures that there is sufficient responsiveness to other requests happening
// simultaneously.
func (ig *IgmpGroup) Hash() uint16 {
mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
if mvp == nil {
return 0
}
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
group := mvp.Groups[ig.GroupName]
// Case where mvlan update in-progress
if group == nil || len(group.McIPs) == 0 {
return 0
}
groupIP := group.McIPs[0]
return uint16(groupIP[2])<<8 + uint16(groupIP[3])
}
// NumDevicesAll returns the number of devices (OLT) active on the IGMP group. When
// the last device leaves the IGMP group is removed. If this is not done,
// the number of IGMP groups only keep increasing and can impact CPU when
// the system runs for a very long duration
func (ig *IgmpGroup) NumDevicesAll() int {
return len(ig.Devices)
}
// NumDevicesActive returns the number of devices (OLT) active on the IGMP group. When
// the last device leaves the IGMP group is removed. If this is not done,
// the number of IGMP groups only keep increasing and can impact CPU when
// the system runs for a very long duration
func (ig *IgmpGroup) NumDevicesActive() int {
count := 0
for _, igd := range ig.Devices {
if igd.NumReceivers() == 0 && igd.GroupInstalled {
continue
}
count++
}
return count
}
// NumReceivers to return receiver list
func (ig *IgmpGroup) NumReceivers() map[string]int {
receiverList := make(map[string]int)
for device, igd := range ig.Devices {
receiverList[device] = igd.NumReceivers()
}
return receiverList
}
// RestoreDevices : IGMP group write to DB
func (ig *IgmpGroup) RestoreDevices(cntx context.Context) {
ig.migrateIgmpDevices(cntx)
devices, _ := db.GetIgmpDevices(cntx, ig.Mvlan, ig.GroupName, ig.GroupAddr)
for _, device := range devices {
b, ok := device.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
if igd, err := NewIgmpGroupDeviceFromBytes(b); err == nil {
igd.PonPortChannelMap = util.NewConcurrentMap()
// Update the proxy config pointers.
var mcastCfg *McastConfig
igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
if mcastCfg != nil {
mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
}
mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
// During vgc upgrade from old version, igd.NextQueryTime and igd.QueryExpiryTime will not be present in db.
// hence they are initialized with current time offset.
emptyTime := time.Time{}
if emptyTime == igd.NextQueryTime {
logger.Debugw(ctx, "VGC igd upgrade", log.Fields{"igd grp name": igd.GroupName})
igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
if err := igd.WriteToDb(cntx); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
"GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
}
ig.Devices[igd.Device] = igd
if ig.IsChannelBasedGroup {
channel, _ := db.GetIgmpChannel(cntx, igd.Mvlan, igd.GroupName, igd.Device, igd.GroupAddr)
igd.RestoreChannel(cntx, []byte(channel))
} else {
igd.RestoreChannels(cntx)
}
igd.PortChannelMap.Range(printPortChannel)
logger.Infow(ctx, "Group Device Restored", log.Fields{"IGD": igd})
} else {
logger.Warnw(ctx, "Unable to decode device from database", log.Fields{"str": string(b)})
}
}
}
// getKey to return group key
func (ig *IgmpGroup) getKey() string {
profile, ok := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
if ok {
mvp := profile.(*MvlanProfile)
return mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
}
return ""
}
// WriteToDb is utility to write Igmp Group Info to database
func (ig *IgmpGroup) WriteToDb(cntx context.Context) error {
ig.Version = database.PresentVersionMap[database.IgmpGroupPath]
b, err := json.Marshal(ig)
if err != nil {
return err
}
if err1 := db.PutIgmpGroup(cntx, ig.getKey(), string(b)); err1 != nil {
return err1
}
return nil
}
// UpdateIgmpGroup : When the pending group is allocated to new
func (ig *IgmpGroup) UpdateIgmpGroup(cntx context.Context, oldKey, newKey string) {
// If the group is allocated to same McastGroup, no need to update the
// IgmpGroups map
if oldKey == newKey {
return
}
logger.Infow(ctx, "Updating Igmp Group with new MVP Group Info", log.Fields{"OldKey": oldKey, "NewKey": newKey, "GroupID": ig.GroupID})
GetApplication().IgmpGroups.Delete(oldKey)
_ = db.DelIgmpGroup(cntx, oldKey)
GetApplication().IgmpGroups.Store(newKey, ig)
if err := ig.WriteToDb(cntx); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
}
func (ig *IgmpGroup) removeExpiredGroupFromDevice(cntx context.Context) {
ig.PendingPoolLock.Lock()
defer ig.PendingPoolLock.Unlock()
for device, timer := range ig.PendingGroupForDevice {
// To ensure no race-condition between the expiry time and the new Join,
// ensure the group exists in pending pool before deletion
groupExistsInPendingPool := true
if !time.Now().After(timer) {
continue
}
// Check if the IgmpGroup obj has no active member across any device
// If Yes, then this group is part of global pending pool (IgmpPendingPool), hence if expired,
// Remove only the IgmpGroup obj referenced to this device from global pool also.
if ig.NumDevicesActive() == 0 {
groupExistsInPendingPool = GetApplication().RemoveGroupFromPendingPool(device, ig)
}
// Remove the group entry from device and remove the IgmpDev Obj
// from IgmpGrp Pending pool
if groupExistsInPendingPool {
ig.DeleteIgmpGroupDevice(cntx, device)
}
}
}
// DeleteIgmpGroupDevice - removes the IgmpGroupDevice obj from IgmpGroup and database
func (ig *IgmpGroup) DeleteIgmpGroupDevice(cntx context.Context, device string) {
logger.Infow(ctx, "Deleting IgmpGroupDevice from IG Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
igd := ig.Devices[device]
igd.DelMcGroup(true)
delete(ig.Devices, device)
delete(ig.PendingGroupForDevice, device)
_ = db.DelIgmpDevice(cntx, igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
// If the group is not associated to any other device, then the entire Igmp Group obj itself can be removed
if ig.NumDevicesAll() == 0 {
logger.Infow(ctx, "Deleting IgmpGroup as all pending groups has expired", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
GetApplication().DelIgmpGroup(cntx, ig)
return
}
if err := ig.WriteToDb(cntx); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
}
// DelIgmpGroup deletes all devices for the provided igmp group
func (ig *IgmpGroup) DelIgmpGroup(cntx context.Context) {
logger.Infow(ctx, "Deleting All Device for Group", log.Fields{"Group": ig.GroupName})
for _, igd := range ig.Devices {
ig.DelIgmpGroupDevice(cntx, igd)
}
GetApplication().DelIgmpGroup(cntx, ig)
}