[VOL-4754] Igmp code re-organization
Change-Id: Ib34ea7af0b445e0091593bca6882412fea220fb1
diff --git a/internal/pkg/application/igmpgroup.go b/internal/pkg/application/igmpgroup.go
new file mode 100644
index 0000000..629d92c
--- /dev/null
+++ b/internal/pkg/application/igmpgroup.go
@@ -0,0 +1,650 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package application
+
+import (
+ "encoding/json"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/google/gopacket/layers"
+
+ "voltha-go-controller/database"
+ "voltha-go-controller/internal/pkg/of"
+ "voltha-go-controller/internal/pkg/util"
+ "voltha-go-controller/log"
+)
+
+// IgmpGroup implements a single MCIP that may have multiple receivers
+// connected via multiple devices (OLTs). The IGMP group is stored on the
+// VOLT application.
+type IgmpGroup struct {
+ GroupID uint32
+ Mvlan of.VlanType
+ PonVlan of.VlanType
+ GroupName string
+ GroupAddr net.IP
+ Devices map[string]*IgmpGroupDevice `json:"-"`
+ PendingGroupForDevice map[string]time.Time //map [deviceId, timestamp] (ExpiryTime = leave time + 15mins)
+ Version string
+ IsPonVlanPresent bool
+ IsChannelBasedGroup bool
+ PendingPoolLock sync.RWMutex
+ IsGroupStatic bool
+ IgmpGroupLock sync.RWMutex
+}
+
+// NewIgmpGroup is constructor for an IGMP group
+func NewIgmpGroup(name string, vlan of.VlanType) *IgmpGroup {
+ ig := IgmpGroup{}
+ ig.GroupName = name
+ ig.Mvlan = vlan
+ ig.Devices = make(map[string]*IgmpGroupDevice)
+ ig.PendingGroupForDevice = make(map[string]time.Time)
+ return &ig
+}
+
+// IgmpGroupInit to initialize igmp group members
+func (ig *IgmpGroup) IgmpGroupInit(name string, gip net.IP, mvp *MvlanProfile) {
+ ig.GroupName = name
+ ig.Mvlan = mvp.Mvlan
+ ig.PonVlan = mvp.PonVlan
+ ig.IsPonVlanPresent = mvp.IsPonVlanPresent
+ ig.Devices = make(map[string]*IgmpGroupDevice)
+ ig.PendingGroupForDevice = make(map[string]time.Time)
+ ig.IsChannelBasedGroup = mvp.IsChannelBasedGroup
+ ig.IsGroupStatic = mvp.Groups[name].IsStatic
+ if ig.IsChannelBasedGroup {
+ ig.GroupAddr = gip
+ } else {
+ ig.GroupAddr = net.ParseIP("0.0.0.0")
+ }
+}
+
+// IgmpGroupReInit to re-initialize igmp group members
+func (ig *IgmpGroup) IgmpGroupReInit(name string, gip net.IP) {
+
+ logger.Infow(ctx, "Reinitialize Igmp Group", log.Fields{"GroupID": ig.GroupID, "OldName": ig.GroupName, "Name": name, "OldAddr": ig.GroupAddr.String(), "GroupAddr": gip.String()})
+
+ ig.GroupName = name
+ if ig.IsChannelBasedGroup {
+ ig.GroupAddr = gip
+ } else {
+ ig.GroupAddr = net.ParseIP("0.0.0.0")
+ }
+
+ for _, igd := range ig.Devices {
+ igd.IgmpGroupDeviceReInit(ig)
+ }
+}
+
+// updateGroupName to update group name
+func (ig *IgmpGroup) updateGroupName(newGroupName string) {
+ if !ig.IsChannelBasedGroup {
+ logger.Errorw(ctx, "Group name update not supported for GroupChannel based group", log.Fields{"Ig": ig})
+ return
+ }
+ oldKey := ig.getKey()
+ ig.GroupName = newGroupName
+ for _, igd := range ig.Devices {
+ igd.updateGroupName(newGroupName)
+ }
+ if err := ig.WriteToDb(); err != nil {
+ logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
+ }
+ if !ig.IsChannelBasedGroup {
+ _ = db.DelIgmpGroup(oldKey)
+ }
+}
+
+//HandleGroupMigration - handles migration of group members between static & dynamic
+func (ig *IgmpGroup) HandleGroupMigration(deviceID string, groupAddr net.IP) {
+
+ var group *layers.IGMPv3GroupRecord
+ app := GetApplication()
+ if deviceID == "" {
+ logger.Infow(ctx, "Handle Group Migration Request for all devices", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
+ for device := range ig.Devices {
+ ig.HandleGroupMigration(device, groupAddr)
+ }
+ } else {
+ logger.Infow(ctx, "Handle Group Migration Request", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName})
+ var newIg *IgmpGroup
+ receivers := ig.DelIgmpChannel(deviceID, groupAddr)
+ if ig.NumDevicesActive() == 0 {
+ app.DelIgmpGroup(ig)
+ }
+ if newIg = app.GetIgmpGroup(ig.Mvlan, groupAddr); newIg == nil {
+ logger.Infow(ctx, "IG Group doesn't exist, creating new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
+ if newIg = app.AddIgmpGroup(app.GetMvlanProfileByTag(ig.Mvlan).Name, groupAddr, deviceID); newIg == nil {
+ logger.Errorw(ctx, "Group Creation failed during group migration", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr})
+ return
+ }
+ }
+ mvp := app.GetMvlanProfileByTag(ig.Mvlan)
+ isStaticGroup := mvp.IsStaticGroup(ig.GroupName)
+ logger.Infow(ctx, "Existing receivers for old group", log.Fields{"Receivers": receivers})
+ newIg.IgmpGroupLock.Lock()
+ for port, igp := range receivers {
+ if !isStaticGroup && port == StaticPort {
+ continue
+ }
+ group = nil
+ var reqType layers.IGMPv3GroupRecordType
+ srcAddresses := []net.IP{}
+ if igp.Version == IgmpVersion3 {
+ if igp.Exclude {
+ srcAddresses = append(srcAddresses, igp.ExcludeList...)
+ reqType = layers.IGMPIsEx
+ } else {
+ srcAddresses = append(srcAddresses, igp.IncludeList...)
+ reqType = layers.IGMPIsIn
+ }
+ group = &layers.IGMPv3GroupRecord{
+ SourceAddresses: srcAddresses,
+ Type: reqType,
+ }
+ }
+ logger.Infow(ctx, "Adding receiver to new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "newIg": newIg.GroupName, "IGP": igp})
+ ponPort := GetApplication().GetPonPortID(deviceID, port)
+ newIg.AddReceiver(deviceID, port, groupAddr, group, igp.Version, igp.CVlan, igp.Pbit, ponPort)
+ }
+ newIg.IgmpGroupLock.Unlock()
+ }
+}
+
+// AddIgmpGroupDevice add a device to the group which happens when the first receiver of the device
+// is added to the IGMP group.
+func (ig *IgmpGroup) AddIgmpGroupDevice(device string, id uint32, version uint8) *IgmpGroupDevice {
+ logger.Infow(ctx, "Adding Device to IGMP group", log.Fields{"Device": device, "GroupName": ig.GroupName})
+ igd := NewIgmpGroupDevice(device, ig, id, version)
+ ig.Devices[device] = igd
+ if err := igd.WriteToDb(); err != nil {
+ logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
+ }
+ return igd
+}
+
+// DelIgmpGroupDevice delete the device from the group which happens when we receive a leave or when
+// there is not response for IGMP query from the receiver
+func (ig *IgmpGroup) DelIgmpGroupDevice(igd *IgmpGroupDevice) {
+ logger.Infow(ctx, "Deleting Device from IGMP group", log.Fields{"Device": igd.Device, "Name": ig.GroupName})
+ va := GetApplication()
+ countersToBeUpdated := false
+ if igd.NumReceivers() != 0 {
+ countersToBeUpdated = true
+ }
+ igd.DelAllChannels()
+
+ //Clear all internal maps so that the groups can be reused
+ igd.PortChannelMap.Range(func(key, value interface{}) bool {
+
+ //Update the counters only if not already updated
+ //(i.e) 1. In case of channel remove during Mvlan Update
+ if countersToBeUpdated {
+ port := key.(string)
+ channelList := value.([]net.IP)
+ ponPortID := va.GetPonPortID(igd.Device, port)
+
+ for _, channel := range channelList {
+ igd.RemoveChannelFromChannelsPerPon(port, channel, ponPortID)
+ }
+ }
+
+ igd.PortChannelMap.Delete(key)
+ return true
+ })
+ igd.PonPortChannelMap = util.NewConcurrentMap()
+
+ if mcastCfg := va.GetMcastConfig(igd.SerialNo, va.GetMvlanProfileByTag(igd.Mvlan).Name); mcastCfg != nil {
+ mcastCfg.IgmpGroupDevices.Delete(igd.GroupID)
+ logger.Debugw(ctx, "Igd deleted from mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
+ }
+ if !igd.GroupInstalled {
+ _ = db.DelIgmpDevice(igd.Mvlan, ig.GroupName, ig.GroupAddr, igd.Device)
+ delete(ig.Devices, igd.Device)
+ }
+}
+
+// AddReceiver delete the device from the group which happens when we receive a leave or when
+// there is not response for IGMP query from the receiver
+func (ig *IgmpGroup) AddReceiver(device string, port string, groupIP net.IP,
+ group *layers.IGMPv3GroupRecord, ver uint8, cvlan uint16, pbit uint8, ponPort uint32) {
+
+ logger.Debugw(ctx, "Adding Receiver", log.Fields{"Port": port})
+ if igd, ok := ig.getIgmpGroupDevice(device); !ok {
+ igd = ig.AddIgmpGroupDevice(device, ig.GroupID, ver)
+ igd.AddReceiver(port, groupIP, group, ver, cvlan, pbit, ponPort)
+ } else {
+ logger.Infow(ctx, "IGMP Group Receiver", log.Fields{"IGD": igd.Device})
+ igd.AddReceiver(port, groupIP, group, ver, cvlan, pbit, ponPort)
+ }
+}
+
+func (ig *IgmpGroup) getIgmpGroupDevice(device string) (*IgmpGroupDevice, bool) {
+ ig.PendingPoolLock.Lock()
+ defer ig.PendingPoolLock.Unlock()
+
+ if _, ok := ig.PendingGroupForDevice[device]; ok {
+ logger.Infow(ctx, "Removing the IgmpGroupDevice from pending pool", log.Fields{"GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
+ delete(ig.PendingGroupForDevice, device)
+ if err := ig.WriteToDb(); err != nil {
+ logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
+ }
+ }
+ igd, ok := ig.Devices[device]
+ return igd, ok
+}
+
+// DelReceiveronDownInd deletes a receiver which is the combination of device (OLT)
+// and port on Port Down event
+func (ig *IgmpGroup) DelReceiveronDownInd(device string, port string, ponPortID uint32) {
+ logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port})
+
+ mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
+ mvp.mvpLock.RLock()
+ defer mvp.mvpLock.RUnlock()
+ igd, ok := ig.Devices[device]
+ if !ok {
+ logger.Infow(ctx, "IGMP Group device was not found for ", log.Fields{"Device": device})
+ return
+ }
+ ipsList := []net.IP{}
+ ipsListIntf, ok := igd.PortChannelMap.Load(port)
+ if ok {
+ ipsList = append(ipsList, ipsListIntf.([]net.IP)...)
+ }
+ logger.Infow(ctx, "Port Channel List", log.Fields{"Port": port, "IPsList": ipsList})
+ igd.PortChannelMap.Range(printPortChannel)
+
+
+ for _, groupAddr := range ipsList {
+ logger.Debugw(ctx, "Port Channels", log.Fields{"Port": port, "IPsList": ipsList, "GroupAddr": groupAddr, "Len": len(ipsList)})
+ igd.DelReceiver(groupAddr, port, nil, ponPortID)
+ }
+
+ if igd.NumReceivers() == 0 {
+ ig.DelIgmpGroupDevice(igd)
+ }
+}
+
+// DelReceiver deletes a receiver which is the combination of device (OLT)
+// and port
+func (ig *IgmpGroup) DelReceiver(device string, port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
+ logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port, "GroupIP": groupAddr.String()})
+ if igd, ok := ig.Devices[device]; ok {
+ //igd.DelReceiverForGroupAddr(groupAddr, port)
+ igd.DelReceiver(groupAddr, port, group, ponPortID)
+ if igd.NumReceivers() == 0 {
+ ig.DelIgmpGroupDevice(igd)
+ }
+ }
+}
+
+// GetAllIgmpChannelForDevice - Returns all channels with active members associated to the Igmp Group for the given device
+func (ig *IgmpGroup) GetAllIgmpChannelForDevice(deviceID string) map[string]string {
+
+ if deviceID == "" {
+ return ig.GetAllIgmpChannel()
+ }
+
+ allChannels := make(map[string]string)
+ igd := ig.Devices[deviceID]
+ getAllChannels := func(key interface{}, value interface{}) bool {
+ channels := key.(string)
+ allChannels[channels] = channels //same value as only key is required
+ return true
+ }
+ igd.GroupChannels.Range(getAllChannels)
+
+ return allChannels
+}
+
+// GetAllIgmpChannel - Returns all channels with active members associated to the Igmp Group
+func (ig *IgmpGroup) GetAllIgmpChannel() map[string]string {
+ allChannels := make(map[string]string)
+ for _, igd := range ig.Devices {
+ getAllChannels := func(key interface{}, value interface{}) bool {
+ channels := key.(string)
+ allChannels[channels] = channels
+ return true
+ }
+ igd.GroupChannels.Range(getAllChannels)
+ }
+ return allChannels
+}
+
+// DelIgmpChannel deletes all receivers for the provided igmp group channel for the given device
+func (ig *IgmpGroup) DelIgmpChannel(deviceID string, groupAddr net.IP) map[string]*IgmpGroupPort {
+ logger.Infow(ctx, "Deleting Channel from devices", log.Fields{"Device": deviceID, "Group": ig.GroupName, "Channel": groupAddr.String()})
+ if deviceID == "" {
+ for device := range ig.Devices {
+ ig.DelIgmpChannel(device, groupAddr)
+ }
+ return nil
+ }
+ igd := ig.Devices[deviceID]
+ receivers := igd.DelChannelReceiver(groupAddr)
+ if igd.NumReceivers() == 0 {
+ ig.DelIgmpGroupDevice(igd)
+ }
+ return receivers
+}
+
+// IsNewReceiver checks if the received port is new receiver or existing one.
+// Returns true if new receiver.
+func (ig *IgmpGroup) IsNewReceiver(device, uniPortID string, groupAddr net.IP) bool {
+ if ig == nil {
+ // IGMP group does not exists. So considering it as new receiver.
+ return true
+ }
+ logger.Debugw(ctx, "IGMP Group", log.Fields{"channel": groupAddr, "groupName": ig.GroupName}) // TODO: Remove me
+ igd, exists := ig.Devices[device]
+ if !exists || !igd.GroupInstalled {
+ // IGMP group not exists OR Group is not created in the device.
+ // So this is a new receiver.
+ logger.Debugw(ctx, "igd not exists or group is not created in device", log.Fields{"exists": exists}) // TODO: Remove me
+ return true
+ }
+ if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
+ logger.Debugw(ctx, "IGMP Channel receivers", log.Fields{"igc-receivers": igc.(*IgmpGroupChannel).CurReceivers}) // TODO: Remove me
+ _, rcvrExistCur := igc.(*IgmpGroupChannel).CurReceivers[uniPortID]
+ _, rcvrExistNew := igc.(*IgmpGroupChannel).NewReceivers[uniPortID]
+ if rcvrExistCur || rcvrExistNew {
+ // Existing receiver
+ return false
+ }
+ }
+ return true
+}
+
+// Tick for Addition of groups to an MVLAN profile
+func (ig *IgmpGroup) Tick() {
+ now := time.Now()
+ for _, igd := range ig.Devices {
+ var igdChangeCnt uint8
+
+ if _, ok := GetApplication().DevicesDisc.Load(igd.Device); !ok {
+ logger.Info(ctx, "Skipping Query and Expiry check since Device is unavailable")
+ continue
+ }
+ if now.After(igd.NextQueryTime) {
+ // Set the next query time and the query expiry time to
+ // KeepAliveInterval and MaxResp seconds after current time
+ igd.NextQueryTime = now.Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
+ igd.QueryExpiryTime = now.Add(time.Duration(igd.proxyCfg.MaxResp) * time.Second)
+ logger.Debugw(ctx, "Query Start", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
+ igdChangeCnt++
+ logger.Debugw(ctx, "Sending Query to device", log.Fields{"Device": igd.Device})
+ sendQueryForAllChannels := func(key interface{}, value interface{}) bool {
+ igc := value.(*IgmpGroupChannel)
+ //TODO - Do generic query to avoid multiple msgs
+ igc.SendQuery()
+ return true
+ }
+ igd.GroupChannels.Range(sendQueryForAllChannels)
+ }
+ if now.After(igd.QueryExpiryTime) {
+ igd.QueryExpiry()
+ // This will keep it quiet till the next query time and then
+ // it will be reset to a value after the query initiation time
+ igd.QueryExpiryTime = igd.NextQueryTime
+ logger.Debugw(ctx, "Expiry", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
+ igdChangeCnt++
+ if igd.NumReceivers() == 0 {
+ ig.DelIgmpGroupDevice(igd)
+ continue
+ }
+ }
+
+ igdChangeCnt += igd.Tick()
+
+ if igdChangeCnt > 0 {
+ if err := igd.WriteToDb(); err != nil {
+ logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
+ "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
+ }
+ }
+ }
+}
+
+// QueryExpiry processes expiry of query sent to the receivers. Up on
+// expiry, process the consolidated response for each of the devices participating
+// in the MC stream. When a device has no receivers, the device is deleted
+// from the group.
+func (ig *IgmpGroup) QueryExpiry() {
+ for _, igd := range ig.Devices {
+ if _, ok := GetApplication().DevicesDisc.Load(igd.Device); ok {
+ igd.QueryExpiry()
+ if igd.NumReceivers() == 0 {
+ ig.DelIgmpGroupDevice(igd)
+ }
+
+ } else {
+ logger.Info(ctx, "Skipping Expiry since Device is unavailable")
+ }
+ }
+}
+
+// Hash : The IGMP group hash is used to distribute the processing of timers so that
+// the processing is spread across doesn't spike at one instant. This also
+// ensures that there is sufficient responsiveness to other requests happening
+// simultaneously.
+func (ig *IgmpGroup) Hash() uint16 {
+ mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
+
+ if mvp == nil {
+ return 0
+ }
+
+ mvp.mvpLock.RLock()
+ defer mvp.mvpLock.RUnlock()
+ group := mvp.Groups[ig.GroupName]
+
+ //Case where mvlan update in-progress
+ if group == nil || len(group.McIPs) == 0 {
+ return 0
+ }
+ groupIP := group.McIPs[0]
+ return uint16(groupIP[2])<<8 + uint16(groupIP[3])
+}
+
+// NumDevicesAll returns the number of devices (OLT) active on the IGMP group. When
+// the last device leaves the IGMP group is removed. If this is not done,
+// the number of IGMP groups only keep increasing and can impact CPU when
+// the system runs for a very long duration
+func (ig *IgmpGroup) NumDevicesAll() int {
+ return len(ig.Devices)
+}
+
+// NumDevicesActive returns the number of devices (OLT) active on the IGMP group. When
+// the last device leaves the IGMP group is removed. If this is not done,
+// the number of IGMP groups only keep increasing and can impact CPU when
+// the system runs for a very long duration
+func (ig *IgmpGroup) NumDevicesActive() int {
+ count := 0
+ for _, igd := range ig.Devices {
+ if igd.NumReceivers() == 0 && igd.GroupInstalled {
+ continue
+ }
+ count++
+ }
+ return count
+}
+
+// NumReceivers to return receiver list
+func (ig *IgmpGroup) NumReceivers() map[string]int {
+ receiverList := make(map[string]int)
+ for device, igd := range ig.Devices {
+ receiverList[device] = igd.NumReceivers()
+ }
+ return receiverList
+}
+
+// RestoreDevices : IGMP group write to DB
+func (ig *IgmpGroup) RestoreDevices() {
+
+ ig.migrateIgmpDevices()
+ devices, _ := db.GetIgmpDevices(ig.Mvlan, ig.GroupName, ig.GroupAddr)
+ for _, device := range devices {
+ b, ok := device.Value.([]byte)
+ if !ok {
+ logger.Warn(ctx, "The value type is not []byte")
+ continue
+ }
+ if igd, err := NewIgmpGroupDeviceFromBytes(b); err == nil {
+ igd.PonPortChannelMap = util.NewConcurrentMap()
+ // Update the proxy config pointers.
+ var mcastCfg *McastConfig
+ igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
+ if mcastCfg != nil {
+ mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
+ logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
+ }
+
+ mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
+ igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
+
+ // During vgc upgrade from old version, igd.NextQueryTime and igd.QueryExpiryTime will not be present in db.
+ // hence they are initialized with current time offset.
+ emptyTime := time.Time{}
+ if emptyTime == igd.NextQueryTime {
+ logger.Debugw(ctx, "VGC igd upgrade", log.Fields{"igd grp name": igd.GroupName})
+ igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
+ igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
+ if err := igd.WriteToDb(); err != nil {
+ logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
+ "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
+ }
+ }
+
+ ig.Devices[igd.Device] = igd
+ if ig.IsChannelBasedGroup {
+ channel, _ := db.GetIgmpChannel(igd.Mvlan, igd.GroupName, igd.Device, igd.GroupAddr)
+ igd.RestoreChannel([]byte(channel))
+ } else {
+ igd.RestoreChannels()
+ }
+ igd.PortChannelMap.Range(printPortChannel)
+ logger.Infow(ctx, "Group Device Restored", log.Fields{"IGD": igd})
+ } else {
+ logger.Warnw(ctx, "Unable to decode device from database", log.Fields{"str": string(b)})
+ }
+ }
+}
+
+// getKey to return group key
+func (ig *IgmpGroup) getKey() string {
+ profile, ok := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
+ if ok {
+ mvp := profile.(*MvlanProfile)
+ return mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
+ }
+ return ""
+}
+
+// WriteToDb is utility to write Igmp Group Info to database
+func (ig *IgmpGroup) WriteToDb() error {
+ ig.Version = database.PresentVersionMap[database.IgmpGroupPath]
+ b, err := json.Marshal(ig)
+ if err != nil {
+ return err
+ }
+ if err1 := db.PutIgmpGroup(ig.getKey(), string(b)); err1 != nil {
+ return err1
+ }
+ return nil
+}
+
+// UpdateIgmpGroup : When the pending group is allocated to new
+func (ig *IgmpGroup) UpdateIgmpGroup(oldKey, newKey string) {
+
+ //If the group is allocated to same McastGroup, no need to update the
+ //IgmpGroups map
+ if oldKey == newKey {
+ return
+ }
+ logger.Infow(ctx, "Updating Igmp Group with new MVP Group Info", log.Fields{"OldKey": oldKey, "NewKey": newKey, "GroupID": ig.GroupID})
+
+ GetApplication().IgmpGroups.Delete(oldKey)
+ _ = db.DelIgmpGroup(oldKey)
+
+ GetApplication().IgmpGroups.Store(newKey, ig)
+ if err := ig.WriteToDb(); err != nil {
+ logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
+ }
+}
+
+func (ig *IgmpGroup) removeExpiredGroupFromDevice() {
+ ig.PendingPoolLock.Lock()
+ defer ig.PendingPoolLock.Unlock()
+
+ for device, timer := range ig.PendingGroupForDevice {
+
+ // To ensure no race-condition between the expiry time and the new Join,
+ // ensure the group exists in pending pool before deletion
+ groupExistsInPendingPool := true
+
+ if !time.Now().After(timer) {
+ continue
+ }
+
+ // Check if the IgmpGroup obj has no active member across any device
+ // If Yes, then this group is part of global pending pool (IgmpPendingPool), hence if expired,
+ // Remove only the IgmpGroup obj referenced to this device from global pool also.
+ if ig.NumDevicesActive() == 0 {
+ groupExistsInPendingPool = GetApplication().RemoveGroupFromPendingPool(device, ig)
+ }
+
+ // Remove the group entry from device and remove the IgmpDev Obj
+ // from IgmpGrp Pending pool
+ if groupExistsInPendingPool {
+ ig.DeleteIgmpGroupDevice(device)
+ }
+ }
+}
+
+//DeleteIgmpGroupDevice - removes the IgmpGroupDevice obj from IgmpGroup and database
+func (ig *IgmpGroup) DeleteIgmpGroupDevice(device string) {
+
+ logger.Infow(ctx, "Deleting IgmpGroupDevice from IG Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
+
+ igd := ig.Devices[device]
+ igd.DelMcGroup(true)
+ delete(ig.Devices, device)
+ delete(ig.PendingGroupForDevice, device)
+ _ = db.DelIgmpDevice(igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
+
+ //If the group is not associated to any other device, then the entire Igmp Group obj itself can be removed
+ if ig.NumDevicesAll() == 0 {
+ logger.Infow(ctx, "Deleting IgmpGroup as all pending groups has expired", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
+ GetApplication().DelIgmpGroup(ig)
+ return
+ }
+ if err := ig.WriteToDb(); err != nil {
+ logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
+ }
+}
+
+// DelIgmpGroup deletes all devices for the provided igmp group
+func (ig *IgmpGroup) DelIgmpGroup() {
+ logger.Infow(ctx, "Deleting All Device for Group", log.Fields{"Group": ig.GroupName})
+ for _, igd := range ig.Devices {
+ ig.DelIgmpGroupDevice(igd)
+ }
+ GetApplication().DelIgmpGroup(ig)
+}