blob: ba3f8fe5726b86e92f0bc7c74bdefe04f77d67d6 [file] [log] [blame]
/*
* Copyright 2022-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package application
import (
"encoding/json"
"errors"
"net"
"reflect"
"voltha-go-controller/internal/pkg/types"
"strconv"
"strings"
"sync"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
cntlr "voltha-go-controller/internal/pkg/controller"
"voltha-go-controller/database"
"voltha-go-controller/internal/pkg/of"
"voltha-go-controller/internal/pkg/util"
"voltha-go-controller/log"
)
const (
// IgmpVersion0 constant (Default init value)
IgmpVersion0 uint8 = 0
// IgmpVersion1 constant
IgmpVersion1 uint8 = 1
// IgmpVersion2 constant
IgmpVersion2 uint8 = 2
// IgmpVersion3 constant
IgmpVersion3 uint8 = 3
// MinKeepAliveInterval constant
MinKeepAliveInterval uint32 = 10
// MaxDiffKAIntervalResp constant
MaxDiffKAIntervalResp uint32 = 5
// StaticGroup constant
StaticGroup string = "static"
// DynamicGroup constant
DynamicGroup string = "dynamic"
// StaticPort constant
StaticPort string = "static_port"
// DefaultIgmpProfID constant
DefaultIgmpProfID = ""
//GroupExpiryTime - group expiry time in minutes
GroupExpiryTime uint32 = 15
)
const (
// JoinUnsuccessful constant
JoinUnsuccessful string = "JOIN-UNSUCCESSFUL"
// JoinUnsuccessfulExceededIGMPChanel constant
JoinUnsuccessfulExceededIGMPChanel string = "Exceeded subscriber or PON port IGMP channels threshold"
// JoinUnsuccessfulAddFlowGroupFailed constant
JoinUnsuccessfulAddFlowGroupFailed string = "Failed to add flow or group for a channel"
// JoinUnsuccessfulGroupNotConfigured constant
JoinUnsuccessfulGroupNotConfigured string = "Join received from a subscriber on non-configured group"
// JoinUnsuccessfulVlanDisabled constant
JoinUnsuccessfulVlanDisabled string = "Vlan is disabled"
// JoinUnsuccessfulDescription constant
JoinUnsuccessfulDescription string = "igmp join unsuccessful"
// QueryExpired constant
QueryExpired string = "QUERY-EXPIRED"
// QueryExpiredGroupSpecific constant
QueryExpiredGroupSpecific string = "Group specific multicast query expired"
// QueryExpiredDescription constant
QueryExpiredDescription string = "igmp query expired"
)
// IgmpProfile structure
type IgmpProfile struct {
ProfileID string
UnsolicitedTimeOut uint32 //In seconds
MaxResp uint32
KeepAliveInterval uint32
KeepAliveCount uint32
LastQueryInterval uint32
LastQueryCount uint32
FastLeave bool
PeriodicQuery bool
IgmpCos uint8
WithRAUpLink bool
WithRADownLink bool
IgmpVerToServer string
IgmpSourceIP net.IP
Version string
}
// McastConfig structure
type McastConfig struct {
OltSerialNum string
MvlanProfileID string
IgmpProfileID string
IgmpProxyIP net.IP
OperState OperInProgress
Version string
// This map will help in updating the igds whenever there is a igmp profile id update
IgmpGroupDevices sync.Map `json:"-"` // Key is group id
}
var (
// NullIPAddr is null ip address var
NullIPAddr = net.ParseIP("0.0.0.0")
// igmpSrcMac for the proxy
igmpSrcMac string
)
func init() {
RegisterPacketHandler(IGMP, ProcessIgmpPacket)
}
// ProcessIgmpPacket : CallBack function registered with application to handle IGMP packetIn
func ProcessIgmpPacket(device string, port string, pkt gopacket.Packet) {
GetApplication().IgmpPacketInd(device, port, pkt)
}
func ipv4ToUint(ip net.IP) uint32 {
result := uint32(0)
addr := ip.To4()
if addr == nil {
logger.Warnw(ctx, "Invalid Group Addr", log.Fields{"IP": ip})
return 0
}
result = result + uint32(addr[0])<<24
result = result + uint32(addr[1])<<16
result = result + uint32(addr[2])<<8
result = result + uint32(addr[3])
return result
}
func getPodMacAddr() (string, error) {
ifas, err := net.Interfaces()
if err != nil {
return "", err
}
var ipv4Addr net.IP
for _, ifa := range ifas {
addrs, err := ifa.Addrs()
if err != nil {
return "", err
}
for _, addr := range addrs {
if ipv4Addr = addr.(*net.IPNet).IP.To4(); ipv4Addr != nil {
if ipv4Addr.IsGlobalUnicast() {
logger.Infow(ctx, "Igmp Static config", log.Fields{"MacAddr": ifa.HardwareAddr.String(), "ipAddr": ipv4Addr})
return ifa.HardwareAddr.String(), nil
}
}
}
}
return "", errors.New("MAC Address not found,Setting default")
}
// IgmpUsEthLayer : Layers defined for upstream communication
// Ethernet layer for upstream communication
func IgmpUsEthLayer(mcip net.IP) *layers.Ethernet {
eth := &layers.Ethernet{}
// TODO: Set the source MAC properly and remove hardcoding
eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac)
eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00")
eth.DstMAC[3] = mcip[1] & 0x7f
eth.DstMAC[4] = mcip[2]
eth.DstMAC[5] = mcip[3]
eth.EthernetType = layers.EthernetTypeDot1Q
return eth
}
// IgmpUsDot1qLayer set US VLAN layer
func IgmpUsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q {
dot1q := &layers.Dot1Q{}
dot1q.Priority = priority
dot1q.DropEligible = false
dot1q.VLANIdentifier = uint16(vlan)
dot1q.Type = layers.EthernetTypeIPv4
return dot1q
}
// Igmpv2UsIpv4Layer : Set the IP layer for IGMPv2
// TODO - Identify correct way of obtaining source IP
// This should be the configured IGMP proxy address which should be per OLT
// We should probably be able to have a single function for both
// upstream and downstream
func Igmpv2UsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 {
ip := &layers.IPv4{}
ip.Version = 4
ip.Protocol = layers.IPProtocolIGMP
ip.TTL = 1
ip.SrcIP = src
ip.DstIP = mcip
return ip
}
// Igmpv3UsIpv4Layer : Set the IP layer for IGMPv3
// TODO - Identify correct way of obtaining source IP
// This should be the configured IGMP proxy address which should be per OLT
// We should probably be able to have a single function for both
// upstream and downstream
func Igmpv3UsIpv4Layer(src net.IP) *layers.IPv4 {
ip := &layers.IPv4{}
ip.Version = 4
ip.Protocol = layers.IPProtocolIGMP
ip.TTL = 1
ip.SrcIP = src
ip.DstIP = net.ParseIP("224.0.0.22")
return ip
}
// IgmpDsEthLayer : Layers defined for downstream communication
// Ethernet layer for downstream communication
func IgmpDsEthLayer(mcip net.IP) *layers.Ethernet {
eth := &layers.Ethernet{}
// TODO: Set the source and dest MAC properly and remove hardcoding
eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac)
eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00")
eth.DstMAC[3] = mcip[1] & 0x7f
eth.DstMAC[4] = mcip[2]
eth.DstMAC[5] = mcip[3]
eth.EthernetType = layers.EthernetTypeDot1Q
return eth
}
// IgmpDsDot1qLayer set the DS VLAN layer
func IgmpDsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q {
dot1q := &layers.Dot1Q{}
dot1q.Priority = priority
dot1q.DropEligible = false
dot1q.VLANIdentifier = uint16(vlan)
dot1q.Type = layers.EthernetTypeIPv4
return dot1q
}
// IgmpDsIpv4Layer set the IP layer
func IgmpDsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 {
ip := &layers.IPv4{}
ip.Version = 4
ip.Protocol = layers.IPProtocolIGMP
ip.TTL = 1
ip.SrcIP = src
if mcip.Equal(net.ParseIP("0.0.0.0")) {
mcip = net.ParseIP("224.0.0.1")
}
ip.DstIP = mcip
return ip
}
// IgmpQueryv2Layer : IGMP Query Layer
func IgmpQueryv2Layer(mcip net.IP, resptime time.Duration) *layers.IGMPv1or2 {
igmp := &layers.IGMPv1or2{}
igmp.Type = layers.IGMPMembershipQuery
igmp.GroupAddress = mcip
igmp.MaxResponseTime = resptime
return igmp
}
// IgmpQueryv3Layer : IGMP v3 Query Layer
func IgmpQueryv3Layer(mcip net.IP, resptime time.Duration) *layers.IGMP {
igmp := &layers.IGMP{}
igmp.Type = layers.IGMPMembershipQuery
igmp.GroupAddress = mcip
igmp.MaxResponseTime = resptime
return igmp
}
// IgmpReportv2Layer : IGMP Layer
func IgmpReportv2Layer(mcip net.IP) *layers.IGMPv1or2 {
igmp := &layers.IGMPv1or2{}
igmp.Type = layers.IGMPMembershipReportV2
igmp.GroupAddress = mcip
return igmp
}
// IgmpLeavev2Layer : IGMP Leave Layer
func IgmpLeavev2Layer(mcip net.IP) *layers.IGMPv1or2 {
igmp := &layers.IGMPv1or2{}
igmp.Type = layers.IGMPLeaveGroup
igmp.GroupAddress = mcip
return igmp
}
// IgmpReportv3Layer : IGMP v3 Report Layer
func IgmpReportv3Layer(mcip net.IP, incl bool, srclist []net.IP) *layers.IGMP {
// IGMP base
igmp := &layers.IGMP{}
igmp.Type = layers.IGMPMembershipReportV3
igmp.NumberOfGroupRecords = 1
// IGMP Group
group := layers.IGMPv3GroupRecord{}
if incl {
group.Type = layers.IGMPIsIn
} else {
group.Type = layers.IGMPIsEx
}
group.MulticastAddress = mcip
group.NumberOfSources = uint16(len(srclist))
group.SourceAddresses = srclist
igmp.GroupRecords = append(igmp.GroupRecords, group)
return igmp
}
// Igmpv2QueryPacket : IGMP Query in Downstream
func Igmpv2QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) {
// Construct the layers that form the packet
eth := IgmpDsEthLayer(mcip)
dot1q := IgmpDsDot1qLayer(vlan, pbit)
ip := IgmpDsIpv4Layer(selfip, mcip)
igmp := IgmpQueryv2Layer(mcip, time.Duration(maxResp)*time.Second)
// Now prepare the buffer into which the layers are to be serialized
buff := gopacket.NewSerializeBuffer()
opts := gopacket.SerializeOptions{
FixLengths: true,
ComputeChecksums: true,
}
if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
logger.Error(ctx, "Error in serializing layers")
return nil, err
}
return buff.Bytes(), nil
}
// Igmpv3QueryPacket : IGMPv3 Query in Downstream
func Igmpv3QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) {
// Construct the layers that form the packet
eth := IgmpDsEthLayer(mcip)
dot1q := IgmpDsDot1qLayer(vlan, pbit)
ip := IgmpDsIpv4Layer(selfip, mcip)
igmp := IgmpQueryv3Layer(mcip, time.Duration(maxResp)*time.Second)
// Now prepare the buffer into which the layers are to be serialized
buff := gopacket.NewSerializeBuffer()
opts := gopacket.SerializeOptions{
FixLengths: true,
ComputeChecksums: true,
}
if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
logger.Error(ctx, "Error in serializing layers")
return nil, err
}
return buff.Bytes(), nil
}
// IgmpReportv2Packet : Packet - IGMP v2 report in upstream
func IgmpReportv2Packet(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) {
// Construct the layers that form the packet
eth := IgmpUsEthLayer(mcip)
dot1q := IgmpUsDot1qLayer(vlan, priority)
ip := Igmpv2UsIpv4Layer(selfip, mcip)
igmp := IgmpReportv2Layer(mcip)
// Now prepare the buffer into which the layers are to be serialized
buff := gopacket.NewSerializeBuffer()
opts := gopacket.SerializeOptions{
FixLengths: true,
ComputeChecksums: true,
}
if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
logger.Error(ctx, "Error in serializing layers")
return nil, err
}
return buff.Bytes(), nil
}
// Igmpv3ReportPacket : Packet - IGMP v3 report in upstream
func Igmpv3ReportPacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP, incl bool, srclist []net.IP) ([]byte, error) {
// Construct the layers that form the packet
eth := IgmpUsEthLayer(net.ParseIP("224.0.0.22").To4())
dot1q := IgmpUsDot1qLayer(vlan, priority)
ip := Igmpv3UsIpv4Layer(selfip)
igmp := IgmpReportv3Layer(mcip, incl, srclist)
// Now prepare the buffer into which the layers are to be serialized
buff := gopacket.NewSerializeBuffer()
opts := gopacket.SerializeOptions{
FixLengths: true,
ComputeChecksums: true,
}
if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
logger.Error(ctx, "Error in serializing layers")
return nil, err
}
return buff.Bytes(), nil
}
// IgmpLeavePacket : Packet- IGMP Leave in upstream
func IgmpLeavePacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) {
// Construct the layers that form the packet
eth := IgmpUsEthLayer(mcip)
dot1q := IgmpUsDot1qLayer(vlan, priority)
ip := Igmpv2UsIpv4Layer(selfip, mcip)
igmp := IgmpLeavev2Layer(mcip)
// Now prepare the buffer into which the layers are to be serialized
buff := gopacket.NewSerializeBuffer()
opts := gopacket.SerializeOptions{
FixLengths: true,
ComputeChecksums: true,
}
if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
logger.Error(ctx, "Error in serializing layers")
return nil, err
}
return buff.Bytes(), nil
}
// getVersion to get igmp version type
func getVersion(ver string) uint8 {
if ver == "2" || ver == "v2" {
return IgmpVersion2
}
return IgmpVersion3
}
// IsIPPresent is Utility to check if an IP address is in a list
func IsIPPresent(i net.IP, ips []net.IP) bool {
for _, ip := range ips {
if i.Equal(ip) {
return true
}
}
return false
}
// IgmpGroupPort : IGMP port implements a port which is associated with an IGMP
// version and the list of sources it implements for a given IGMP
// channel. We may improve this to have all IGMP channels so that
// we can implement per subscriber IGMP channel registration limits
// As a rule a single port cannot have both include and exclude
// lists. If we receive a include list we should purge the other
// list which is TODO
type IgmpGroupPort struct {
Port string
CVlan uint16
Pbit uint8
Version uint8
Exclude bool
ExcludeList []net.IP
IncludeList []net.IP
QueryTimeoutCount uint32
PonPortID uint32
}
// NewIgmpGroupPort is constructor for a port
func NewIgmpGroupPort(port string, cvlan uint16, pbit uint8, version uint8, incl bool, ponPortID uint32) *IgmpGroupPort {
var igp IgmpGroupPort
igp.Port = port
igp.CVlan = cvlan
igp.Pbit = pbit
igp.Version = version
igp.Exclude = !incl
igp.QueryTimeoutCount = 0
igp.PonPortID = ponPortID
return &igp
}
// InclSourceIsIn checks if a source is in include list
func (igp *IgmpGroupPort) InclSourceIsIn(src net.IP) bool {
return IsIPPresent(src, igp.IncludeList)
}
// ExclSourceIsIn checks if a source is in exclude list
func (igp *IgmpGroupPort) ExclSourceIsIn(src net.IP) bool {
return IsIPPresent(src, igp.ExcludeList)
}
// AddInclSource adds a source is in include list
func (igp *IgmpGroupPort) AddInclSource(src net.IP) {
logger.Debugw(ctx, "Adding Include Source", log.Fields{"Port": igp.Port, "Src": src})
igp.IncludeList = append(igp.IncludeList, src)
}
// AddExclSource adds a source is in exclude list
func (igp *IgmpGroupPort) AddExclSource(src net.IP) {
logger.Debugw(ctx, "Adding Exclude Source", log.Fields{"Port": igp.Port, "Src": src})
igp.ExcludeList = append(igp.ExcludeList, src)
}
// DelInclSource deletes a source is in include list
func (igp *IgmpGroupPort) DelInclSource(src net.IP) {
logger.Debugw(ctx, "Deleting Include Source", log.Fields{"Port": igp.Port, "Src": src})
for i, addr := range igp.IncludeList {
if addr.Equal(src) {
igp.IncludeList = append(igp.IncludeList[:i], igp.IncludeList[i+1:]...)
return
}
}
}
// DelExclSource deletes a source is in exclude list
func (igp *IgmpGroupPort) DelExclSource(src net.IP) {
logger.Debugw(ctx, "Deleting Exclude Source", log.Fields{"Port": igp.Port, "Src": src})
for i, addr := range igp.ExcludeList {
if addr.Equal(src) {
igp.ExcludeList = append(igp.ExcludeList[:i], igp.ExcludeList[i+1:]...)
return
}
}
}
// WriteToDb is utility to write IGMP Group Port Info to database
func (igp *IgmpGroupPort) WriteToDb(mvlan of.VlanType, gip net.IP, device string) error {
b, err := json.Marshal(igp)
if err != nil {
return err
}
if err1 := db.PutIgmpRcvr(mvlan, gip, device, igp.Port, string(b)); err1 != nil {
return err1
}
return nil
}
// NewIgmpGroupPortFromBytes create the IGMP group port from a byte slice
func NewIgmpGroupPortFromBytes(b []byte) (*IgmpGroupPort, error) {
var igp IgmpGroupPort
if err := json.Unmarshal(b, &igp); err != nil {
logger.Warnw(ctx, "Decode of port failed", log.Fields{"str": string(b)})
return nil, err
}
return &igp, nil
}
// IgmpGroupChannel structure
type IgmpGroupChannel struct {
Device string
GroupID uint32
GroupName string
GroupAddr net.IP
Mvlan of.VlanType
Exclude int
ExcludeList []net.IP
IncludeList []net.IP
Version uint8
ServVersion *uint8 `json:"-"`
CurReceivers map[string]*IgmpGroupPort `json:"-"`
NewReceivers map[string]*IgmpGroupPort `json:"-"`
proxyCfg **IgmpProfile
IgmpProxyIP **net.IP `json:"-"`
}
// NewIgmpGroupChannel is constructor for a channel. The default IGMP version is set to 3
// as the protocol defines the way to manage backward compatibility
// The implementation handles simultaneous presense of lower versioned
// receivers
func NewIgmpGroupChannel(igd *IgmpGroupDevice, groupAddr net.IP, version uint8) *IgmpGroupChannel {
var igc IgmpGroupChannel
igc.Device = igd.Device
igc.GroupID = igd.GroupID
igc.GroupName = igd.GroupName
igc.GroupAddr = groupAddr
igc.Mvlan = igd.Mvlan
igc.Version = version
igc.CurReceivers = make(map[string]*IgmpGroupPort)
igc.NewReceivers = make(map[string]*IgmpGroupPort)
igc.proxyCfg = &igd.proxyCfg
igc.IgmpProxyIP = &igd.IgmpProxyIP
igc.ServVersion = igd.ServVersion
return &igc
}
// NewIgmpGroupChannelFromBytes create the IGMP group channel from a byte slice
func NewIgmpGroupChannelFromBytes(b []byte) (*IgmpGroupChannel, error) {
var igc IgmpGroupChannel
if err := json.Unmarshal(b, &igc); err != nil {
return nil, err
}
igc.CurReceivers = make(map[string]*IgmpGroupPort)
igc.NewReceivers = make(map[string]*IgmpGroupPort)
return &igc, nil
}
// RestorePorts to restore ports
func (igc *IgmpGroupChannel) RestorePorts() {
igc.migrateIgmpPorts()
ports, _ := db.GetIgmpRcvrs(igc.Mvlan, igc.GroupAddr, igc.Device)
for _, port := range ports {
b, ok := port.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
if igp, err := NewIgmpGroupPortFromBytes(b); err == nil {
igc.NewReceivers[igp.Port] = igp
logger.Infow(ctx, "Group Port Restored", log.Fields{"IGP": igp})
} else {
logger.Warn(ctx, "Failed to decode port from DB")
}
}
if err := igc.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
}
}
// WriteToDb is utility to write IGMPGroupChannel Info to database
func (igc *IgmpGroupChannel) WriteToDb() error {
b, err := json.Marshal(igc)
if err != nil {
return err
}
if err1 := db.PutIgmpChannel(igc.Mvlan, igc.GroupName, igc.Device, igc.GroupAddr, string(b)); err1 != nil {
return err1
}
logger.Info(ctx, "IGC Updated")
return nil
}
// UniPortList : UNI Port list per channle has stores the UNI port list for this
// channel.
type UniPortList struct {
UNIList *util.ConcurrentMap // [UNIPort] UNIPort
}
// NewUniPortsList is Constructor for UniPortList structure
func NewUniPortsList() *UniPortList {
var uniPortsList UniPortList
uniPortsList.UNIList = util.NewConcurrentMap()
return &uniPortsList
}
// GetUniPortCount returns the number of UNI ports subscribed to
// current channel.
func (uniPortsList *UniPortList) GetUniPortCount() uint64 {
return uniPortsList.UNIList.Length()
}
// PonPortChannels : PON port channel map keeps the active channel list and its
// count for this group.
type PonPortChannels struct {
ChannelList *util.ConcurrentMap // [channelIP]*UniPortList
}
// NewPonPortChannels is constructor for PonPortChannel.
func NewPonPortChannels() *PonPortChannels {
var ponPortChannel PonPortChannels
ponPortChannel.ChannelList = util.NewConcurrentMap()
return &ponPortChannel
}
// GetActiveChannelCount returns the number of active channel count
// for this pon port in the current group.
func (ponPortChannels *PonPortChannels) GetActiveChannelCount() uint32 {
return uint32(ponPortChannels.ChannelList.Length())
}
// AddChannelToMap Adds new channel to the pon port map
func (ponPortChannels *PonPortChannels) AddChannelToMap(uniPort, channel string) bool {
isNewChannel := bool(false)
uniList, ok := ponPortChannels.ChannelList.Get(channel)
if !ok {
// Channel doesn't exists. Adding new channel.
uniList = NewUniPortsList()
isNewChannel = true
}
uniList.(*UniPortList).UNIList.Set(uniPort, uniPort)
ponPortChannels.ChannelList.Set(channel, uniList)
return isNewChannel
}
// RemoveChannelFromMap Removed channel from the pon port map
func (ponPortChannels *PonPortChannels) RemoveChannelFromMap(uniPort, channel string) bool {
isDeleted := bool(false)
uniList, ok := ponPortChannels.ChannelList.Get(channel)
if ok {
uniList.(*UniPortList).UNIList.Remove(uniPort)
if uniList.(*UniPortList).UNIList.Length() == 0 {
// Last port from the channel is removed.
// Removing channel from PON port map.
ponPortChannels.ChannelList.Remove(channel)
isDeleted = true
} else {
ponPortChannels.ChannelList.Set(channel, uniList)
}
} else {
logger.Warnw(ctx, "Channel doesn't exists in the active channels list", log.Fields{"Channel": channel})
return isDeleted
}
return isDeleted
}
// IgmpGroupDevice : IGMP Group Device manages the IGMP group for all listerns on
// a single OLT. It aggregates reports received on a single group
// and performs the count. It is responsible for sending upstream
// report when the first listener joins and is responsible for
// sending responses to upstream queries
type IgmpGroupDevice struct {
Device string
SerialNo string
GroupID uint32
GroupName string
GroupAddr net.IP
RecvVersion uint8
ServVersion *uint8
RecvVersionExpiry time.Time
ServVersionExpiry time.Time
Mvlan of.VlanType
PonVlan of.VlanType
IsPonVlanPresent bool
GroupInstalled bool
GroupChannels sync.Map `json:"-"` // [ipAddr]*IgmpGroupChannel
PortChannelMap sync.Map `json:"-"` // [portName][]net.IP
PonPortChannelMap *util.ConcurrentMap `json:"-"` // [ponPortId]*PonPortChannels
proxyCfg *IgmpProfile // IgmpSrcIp from IgmpProfile is not used, it is kept for backward compatibility
IgmpProxyIP *net.IP `json:"-"`
NextQueryTime time.Time
QueryExpiryTime time.Time
}
// NewIgmpGroupDevice is constructor for a device. The default IGMP version is set to 3
// as the protocol defines the way to manage backward compatibility
// The implementation handles simultaneous presense of lower versioned
// receivers
func NewIgmpGroupDevice(name string, ig *IgmpGroup, id uint32, version uint8) *IgmpGroupDevice {
var igd IgmpGroupDevice
igd.Device = name
igd.GroupID = id
igd.GroupName = ig.GroupName
igd.GroupAddr = ig.GroupAddr
igd.Mvlan = ig.Mvlan
igd.PonVlan = ig.PonVlan
igd.IsPonVlanPresent = ig.IsPonVlanPresent
igd.GroupInstalled = false
igd.RecvVersion = version
igd.RecvVersionExpiry = time.Now()
igd.ServVersionExpiry = time.Now()
igd.PonPortChannelMap = util.NewConcurrentMap()
va := GetApplication()
if vd := va.GetDevice(igd.Device); vd != nil {
igd.SerialNo = vd.SerialNum
} else {
logger.Errorw(ctx, "Volt Device not found. log.Fields", log.Fields{"igd.Device": igd.Device})
return nil
}
mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
var mcastCfg *McastConfig
igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
// mvlan profile id + olt serial number---igmp group id
//igmpgroup id
igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
if mcastCfg != nil {
mcastCfg.IgmpGroupDevices.Store(id, &igd)
logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": id})
}
return &igd
}
// IgmpGroupDeviceReInit is re-initializer for a device. The default IGMP version is set to 3
// as the protocol defines the way to manage backward compatibility
func (igd *IgmpGroupDevice) IgmpGroupDeviceReInit(ig *IgmpGroup) {
logger.Infow(ctx, "Reinitialize Igmp Group Device", log.Fields{"Device": igd.Device, "GroupID": ig.GroupID, "OldName": igd.GroupName, "Name": ig.GroupName, "OldAddr": igd.GroupAddr.String(), "GroupAddr": ig.GroupAddr.String()})
if (igd.GroupName != ig.GroupName) || !igd.GroupAddr.Equal(ig.GroupAddr) {
_ = db.DelIgmpDevice(igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
igd.GroupName = ig.GroupName
igd.GroupAddr = ig.GroupAddr
}
igd.RecvVersionExpiry = time.Now()
igd.ServVersionExpiry = time.Now()
igd.PonPortChannelMap = util.NewConcurrentMap()
var mcastCfg *McastConfig
igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
if mcastCfg != nil {
mcastCfg.IgmpGroupDevices.Store(ig.GroupID, igd)
logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": ig.GroupID})
}
if err := igd.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
}
func getIgmpProxyCfgAndIP(mvlan of.VlanType, serialNo string) (*IgmpProfile, *net.IP, *McastConfig) {
va := GetApplication()
mVLANProfileID := va.GetMvlanProfileByTag(mvlan).Name
var mcastCfg *McastConfig
if mcastCfg = va.GetMcastConfig(serialNo, mVLANProfileID); mcastCfg == nil || (mcastCfg != nil && mcastCfg.IgmpProfileID == "") {
logger.Debugw(ctx, "Default IGMP config to be used", log.Fields{"mVLANProfileID": mVLANProfileID, "OltSerialNo": serialNo})
igmpProf := va.getIgmpProfileMap(DefaultIgmpProfID)
return igmpProf, &igmpProf.IgmpSourceIP, mcastCfg
}
return va.getIgmpProfileMap(mcastCfg.IgmpProfileID), &mcastCfg.IgmpProxyIP, mcastCfg
}
// updateGroupName to update the group name
func (igd *IgmpGroupDevice) updateGroupName(newGroupName string) {
oldName := igd.GroupName
igd.GroupName = newGroupName
updateGroupName := func(key, value interface{}) bool {
igc := value.(*IgmpGroupChannel)
igc.GroupName = newGroupName
if err := igc.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
}
_ = db.DelIgmpChannel(igc.Mvlan, oldName, igc.Device, igc.GroupAddr)
return true
}
igd.GroupChannels.Range(updateGroupName)
if err := igd.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
_ = db.DelIgmpDevice(igd.Mvlan, oldName, igd.GroupAddr, igd.Device)
}
// NewIgmpGroupDeviceFromBytes is to create the IGMP group port from a byte slice
func NewIgmpGroupDeviceFromBytes(b []byte) (*IgmpGroupDevice, error) {
var igd IgmpGroupDevice
if err := json.Unmarshal(b, &igd); err != nil {
return nil, err
}
return &igd, nil
}
// GetKey to get group name as key
func (igd *IgmpGroupDevice) GetKey() string {
if !net.ParseIP("0.0.0.0").Equal(igd.GroupAddr) {
return igd.GroupName + "_" + igd.GroupAddr.String()
}
return igd.GroupName
}
// RestoreChannel to restore channel
func (igd *IgmpGroupDevice) RestoreChannel(igmpGroupChannel []byte) {
if igc, err := NewIgmpGroupChannelFromBytes(igmpGroupChannel); err == nil {
igc.ServVersion = igd.ServVersion
igc.IgmpProxyIP = &igd.IgmpProxyIP
igc.proxyCfg = &igd.proxyCfg
igd.GroupChannels.Store(igc.GroupAddr.String(), igc)
igc.RestorePorts()
for port, igp := range igc.NewReceivers {
ipsList := []net.IP{}
ipsIntf, _ := igd.PortChannelMap.Load(port)
if ipsIntf != nil {
ipsList = ipsIntf.([]net.IP)
}
ipsList = append(ipsList, igc.GroupAddr)
igd.PortChannelMap.Store(port, ipsList)
logger.Infow(ctx, "Group Channels Restored", log.Fields{"IGC": igc})
igd.AddChannelToChannelsPerPon(port, igc.GroupAddr, igp.PonPortID)
}
} else {
logger.Warnw(ctx, "Failed to decode port from DB", log.Fields{"err": err})
}
logger.Info(ctx, "Group Device & Channels Restored")
igd.PortChannelMap.Range(printPortChannel)
igd.GroupChannels.Range(printChannel)
}
// RestoreChannels to restore channels
func (igd *IgmpGroupDevice) RestoreChannels() {
igd.migrateIgmpChannels()
channels, _ := db.GetIgmpChannels(igd.Mvlan, igd.GroupName, igd.Device)
for _, channel := range channels {
b, ok := channel.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
igd.RestoreChannel(b)
}
}
// printChannel to print channel info
func printChannel(key interface{}, value interface{}) bool {
logger.Infow(ctx, "ChannelMap", log.Fields{"Channel": key.(string), "Igc": value.(*IgmpGroupChannel)})
return true
}
// printPortChannel to print port channel
func printPortChannel(key interface{}, value interface{}) bool {
logger.Infow(ctx, "PortChannelMap", log.Fields{"Port": key.(string), "List": value.([]net.IP)})
return true
}
// WriteToDb is utility to write IGMP Group Device Info to the database
func (igd *IgmpGroupDevice) WriteToDb() error {
b, err := json.Marshal(igd)
if err != nil {
return err
}
if err1 := db.PutIgmpDevice(igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device, string(b)); err1 != nil {
return err1
}
logger.Info(ctx, "IGD Updated")
return nil
}
// Tick processes timing tick used to run timers within the device
func (igd *IgmpGroupDevice) Tick() uint8 {
/* Not using RecvVersionExpiry as it is not used anywhere
if time.Now().After(igd.RecvVersionExpiry) {
igd.RecvVersion = IgmpVersion3
return true
}
*/
return 0
}
// GetSubscriberCountForChannelAndPonPort Gets the active subscriber count
// for the given channel for one particular PON port
func (igd *IgmpGroupDevice) GetSubscriberCountForChannelAndPonPort(ponPortID uint32, channelIP net.IP) uint64 {
if portMapIntf, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
portChannelMap := portMapIntf.(*PonPortChannels)
if channel, present := portChannelMap.ChannelList.Get(channelIP.String()); present {
return channel.(*UniPortList).UNIList.Length()
}
} else {
logger.Warnw(ctx, "PON port not found in PortChannelMap", log.Fields{"PON": ponPortID, "channel": channelIP})
}
return 0
}
// AddChannelToChannelsPerPon Adds the new channel into the per Pon channel list
func (igd *IgmpGroupDevice) AddChannelToChannelsPerPon(uniPort string, channelIP net.IP, ponPortID uint32) bool {
logger.Debugw(ctx, "Adding channel to ActiveChannelsPerPon list", log.Fields{"PonPort": ponPortID, "channelIP": channelIP})
isNewChannel := bool(false)
isNewReceiver := false
if port, ok := igd.PonPortChannelMap.Get(ponPortID); !ok {
// PON port not exists in igd. adding it.
isNewReceiver = true
ponPortChannels := NewPonPortChannels()
isNewChannel = ponPortChannels.AddChannelToMap(uniPort, channelIP.String())
igd.PonPortChannelMap.Set(ponPortID, ponPortChannels)
} else {
// PON port exists in igd. Appending the channel list
// in the PON port.
isNewChannel = port.(*PonPortChannels).AddChannelToMap(uniPort, channelIP.String())
igd.PonPortChannelMap.Set(ponPortID, port)
count := port.(*PonPortChannels).GetActiveChannelCount()
logger.Debugw(ctx, "activeChannelCount", log.Fields{"count": count})
}
GetApplication().UpdateActiveChannelCountForPonPort(igd.Device, uniPort, ponPortID, true, isNewChannel, igd)
return isNewReceiver
}
// RemoveChannelFromChannelsPerPon removes the channel from the per pon channel list.
func (igd *IgmpGroupDevice) RemoveChannelFromChannelsPerPon(uniPort string, channelIP net.IP, ponPortID uint32) bool {
logger.Debugw(ctx, "Removing channel from ActiveChannelsPerPon list", log.Fields{"PonPort": ponPortID, "channelIP": channelIP})
var deleted bool
ponRemoved := false
if port, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
channelPortMap := port.(*PonPortChannels)
deleted = channelPortMap.RemoveChannelFromMap(uniPort, channelIP.String())
if deleted && channelPortMap.ChannelList.Length() == 0 {
igd.PonPortChannelMap.Remove(ponPortID)
ponRemoved = true
}
GetApplication().UpdateActiveChannelCountForPonPort(igd.Device, uniPort, ponPortID, false, deleted, igd)
} else {
logger.Warnw(ctx, "PON port doesn't exists in the igd", log.Fields{"PonPortID": ponPortID})
}
return ponRemoved
}
// InclSourceIsIn checks if a source is in include list
func (igc *IgmpGroupChannel) InclSourceIsIn(src net.IP) bool {
return IsIPPresent(src, igc.IncludeList)
}
// ExclSourceIsIn checks if a source is in exclude list
func (igc *IgmpGroupChannel) ExclSourceIsIn(src net.IP) bool {
return IsIPPresent(src, igc.ExcludeList)
}
// AddInclSource adds a source is in include list
func (igc *IgmpGroupChannel) AddInclSource(src net.IP) {
logger.Debugw(ctx, "Adding Include Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
igc.IncludeList = append(igc.IncludeList, src)
}
// AddExclSource adds a source is in exclude list
func (igc *IgmpGroupChannel) AddExclSource(src net.IP) {
logger.Debugw(ctx, "Adding Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
igc.ExcludeList = append(igc.ExcludeList, src)
}
// UpdateExclSource update excl source list for the given channel
func (igc *IgmpGroupChannel) UpdateExclSource(srcList []net.IP) bool {
logger.Debugw(ctx, "Updating Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Current List": igc.ExcludeList, "Incoming List": srcList})
if !igc.IsExclListChanged(srcList) {
return false
}
if igc.NumReceivers() == 1 {
igc.ExcludeList = srcList
} else {
igc.ExcludeList = igc.computeExclList(srcList)
}
logger.Debugw(ctx, "Updated Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Updated Excl List": igc.ExcludeList})
return true
}
// computeExclList computes intersection of pervious & current src list
func (igc *IgmpGroupChannel) computeExclList(srcList []net.IP) []net.IP {
updatedSrcList := []net.IP{}
for _, src := range srcList {
for _, excl := range igc.ExcludeList {
if src.Equal(excl) {
updatedSrcList = append(updatedSrcList, src)
}
}
}
return updatedSrcList
}
// IsExclListChanged checks if excl list has been updated
func (igc *IgmpGroupChannel) IsExclListChanged(srcList []net.IP) bool {
srcPresent := false
if len(igc.ExcludeList) != len(srcList) {
return true
}
for _, src := range srcList {
for _, excl := range igc.ExcludeList {
srcPresent = false
if src.Equal(excl) {
srcPresent = true
break
}
}
if !srcPresent {
return true
}
}
return false
}
// DelInclSource deletes a source is in include list
func (igc *IgmpGroupChannel) DelInclSource(src net.IP) {
mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
/* If the SSM proxy is configured, then we can del the src ip from igc as whatever is in proxy that is final list */
if _, ok := mvp.Proxy[igc.GroupName]; !ok {
logger.Debugw(ctx, "Deleting Include Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
for _, igp := range igc.CurReceivers {
if igp.InclSourceIsIn(src) {
logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
return
}
}
for _, igp := range igc.NewReceivers {
if igp.InclSourceIsIn(src) {
logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
return
}
}
} else {
logger.Debug(ctx, "Proxy configured, not Deleting Include Source for Channel")
}
for i, addr := range igc.IncludeList {
if addr.Equal(src) {
igc.IncludeList = append(igc.IncludeList[:i], igc.IncludeList[i+1:]...)
return
}
}
}
// DelExclSource deletes a source is in exclude list
func (igc *IgmpGroupChannel) DelExclSource(src net.IP) {
logger.Debugw(ctx, "Deleting Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
for _, igp := range igc.CurReceivers {
if igp.ExclSourceIsIn(src) {
logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
return
}
}
for _, igp := range igc.NewReceivers {
if igp.ExclSourceIsIn(src) {
logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
return
}
}
for i, addr := range igc.ExcludeList {
if addr.Equal(src) {
igc.ExcludeList = append(igc.ExcludeList[:i], igc.ExcludeList[i+1:]...)
return
}
}
}
// ProcessSources process the received list of either included sources or the excluded sources
// The return value indicate sif the group is modified and needs to be informed
// to the upstream multicast servers
func (igc *IgmpGroupChannel) ProcessSources(port string, ip []net.IP, incl bool) (bool, bool) {
groupChanged := false
groupExclUpdated := false
receiverSrcListEmpty := false
// If the version type is 2, there isn't anything to process here
if igc.Version == IgmpVersion2 && *igc.ServVersion == IgmpVersion2 {
return false, false
}
igp := igc.GetReceiver(port)
if igp == nil {
logger.Warnw(ctx, "Receiver not found", log.Fields{"Port": port})
return false, false
}
mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
if incl {
for _, src := range ip {
if igp.ExclSourceIsIn(src) {
igp.DelExclSource(src)
if igc.ExclSourceIsIn(src) {
igc.DelExclSource(src)
groupChanged = true
}
}
// If the source is not in the list of include sources for the port
// add it. If so, check also if it is in list of include sources
// at the device level.
if !igp.InclSourceIsIn(src) {
igp.AddInclSource(src)
if !igc.InclSourceIsIn(src) {
igc.AddInclSource(src)
groupChanged = true
}
}
}
/* If any of the existing ip in the source list is removed we need to remove from the list in igp and igc */
if _, ok := mvp.Proxy[igc.GroupName]; ok {
/* If we get leave message from any subscriber, we do not have to delete the entries in the src list
Only if ther is any modification in the src list by proxy config update only then we need to update */
if len(ip) != 0 && len(ip) != len(igc.IncludeList) {
for i := len(igc.IncludeList) - 1; i >= 0; i-- {
src := igc.IncludeList[i]
if !IsIPPresent(src, ip) {
igp.DelInclSource(src)
igc.DelInclSource(src)
groupChanged = true
}
}
}
}
} else {
for _, src := range ip {
if igp.InclSourceIsIn(src) {
igp.DelInclSource(src)
if igc.InclSourceIsIn(src) {
igc.DelInclSource(src)
groupChanged = true
}
if len(igp.IncludeList) == 0 {
receiverSrcListEmpty = true
}
}
// If the source is not in the list of exclude sources for the port
// add it. If so, check also if it is in list of include sources
// at the device level.
if !igp.ExclSourceIsIn(src) {
igp.AddExclSource(src)
/* If there is any update in the src list of proxy we need to update the igc */
if _, ok := mvp.Proxy[igc.GroupName]; ok {
if !igc.ExclSourceIsIn(src) {
igc.AddExclSource(src)
groupChanged = true
}
}
}
}
/* If any of the existing ip in the source list is removed we need to remove from the list in igp and igc */
if _, ok := mvp.Proxy[igc.GroupName]; ok {
if len(ip) != len(igc.ExcludeList) {
for i := len(igc.ExcludeList) - 1; i >= 0; i-- {
src := igc.ExcludeList[i]
if !IsIPPresent(src, ip) {
igp.DelExclSource(src)
igc.DelExclSource(src)
groupChanged = true
}
}
}
}
groupExclUpdated = igc.UpdateExclSource(ip)
}
if err := igp.WriteToDb(igc.Mvlan, igc.GroupAddr, igc.Device); err != nil {
logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
}
return (groupChanged || groupExclUpdated), receiverSrcListEmpty
}
// GetReceiver to get receiver info
func (igc *IgmpGroupChannel) GetReceiver(port string) *IgmpGroupPort {
igp := igc.NewReceivers[port]
if igp == nil {
igp = igc.CurReceivers[port]
}
return igp
}
// AddReceiver add the receiver to the device and perform other actions such as adding the group
// to the physical device, add members, add flows to point the MC packets to the
// group. Also, send a IGMP report upstream if there is a change in the group
func (igd *IgmpGroupDevice) AddReceiver(port string, groupAddr net.IP,
group *layers.IGMPv3GroupRecord, version uint8, cvlan uint16, pbit uint8, ponPortID uint32) {
var igc *IgmpGroupChannel
logger.Debugw(ctx, "Processing receiver for device", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
igcIntf, ok := igd.GroupChannels.Load(groupAddr.String())
if !ok {
igc = NewIgmpGroupChannel(igd, groupAddr, version)
igd.GroupChannels.Store(groupAddr.String(), igc)
} else {
igc = igcIntf.(*IgmpGroupChannel)
}
if !igd.GroupInstalled {
igd.AddNewReceiver(port, groupAddr, group, cvlan, pbit, ponPortID)
return
}
isNewReceiver := igc.AddReceiver(port, group, cvlan, pbit)
if isNewReceiver {
ipsList := []net.IP{}
ipsIntf, _ := igd.PortChannelMap.Load(port)
if ipsIntf != nil {
ipsList = ipsIntf.([]net.IP)
}
ipsList = append(ipsList, groupAddr)
igd.PortChannelMap.Store(port, ipsList)
logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "AddedChannelList": ipsList, "Addr": groupAddr})
isNewPonReceiver := igd.AddChannelToChannelsPerPon(port, groupAddr, ponPortID)
//Modify group only if this is the first time the port is subscribing for the group
if isNewPonReceiver {
igd.ModMcGroup()
}
}
if err := igd.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
}
// AddNewReceiver to add new receiver
func (igd *IgmpGroupDevice) AddNewReceiver(port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, cvlan uint16, pbit uint8, ponPortID uint32) {
logger.Debugw(ctx, "Adding New Device Receiver", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
igcIntf, _ := igd.GroupChannels.Load(groupAddr.String())
if igcIntf == nil {
logger.Warnw(ctx, "No Group Channel present for given channel", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
return
}
igc := igcIntf.(*IgmpGroupChannel)
ipsList := []net.IP{}
ipsIntf, _ := igd.PortChannelMap.Load(port)
if ipsIntf != nil {
ipsList = ipsIntf.([]net.IP)
}
ipsList = append(ipsList, groupAddr)
igd.PortChannelMap.Store(port, ipsList)
igd.AddChannelToChannelsPerPon(port, groupAddr, ponPortID)
logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "NewChannelList": ipsList, "Addr": groupAddr})
igd.AddMcGroup()
igc.AddReceiver(port, group, cvlan, pbit)
if err := igd.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
}
// AddReceiver add the receiver to the device and perform other actions such as adding the group
// to the physical device, add members, add flows to point the MC packets to the
// group. Also, send a IGMP report upstream if there is a change in the group
func (igc *IgmpGroupChannel) AddReceiver(port string, group *layers.IGMPv3GroupRecord, cvlan uint16, pbit uint8) bool {
var igp *IgmpGroupPort
var groupModified = false
var isNewReceiver = false
var ip []net.IP
incl := false
mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
if _, ok := mvp.Proxy[igc.GroupName]; ok {
if mvp.Proxy[igc.GroupName].Mode == common.Include {
incl = true
}
ip = mvp.Proxy[igc.GroupName].SourceList
} else if group != nil {
incl = isIncl(group.Type)
ip = group.SourceAddresses
}
logger.Debugw(ctx, "Attempting to add receiver", log.Fields{"Version": igc.Version, "Port": port, "Incl": incl, "srcIp": ip})
//logger.Infow(ctx, "Receivers", log.Fields{"New": igc.NewReceivers, "Current": igc.CurReceivers})
logger.Debugw(ctx, "Receiver Group", log.Fields{"Igd GId": igc.GroupID})
logger.Debugw(ctx, "Receiver Channel", log.Fields{"Igd addr": igc.GroupAddr})
logger.Debugw(ctx, "Receiver Mvlan", log.Fields{"Igd mvlan": igc.Mvlan})
logger.Debugw(ctx, "Receiver Sources", log.Fields{"Igd addr": ip})
ponPortID := GetApplication().GetPonPortID(igc.Device, port)
// Process the IGMP receiver. If it is already in, we should only process the changes
// to source list.
var newRcvExists bool
igp, newRcvExists = igc.NewReceivers[port]
if !newRcvExists {
// Add the receiver to the list of receivers and make the necessary group modification
// if this is the first time the receiver is added
var curRcvExists bool
if igp, curRcvExists = igc.CurReceivers[port]; curRcvExists {
logger.Debugw(ctx, "Existing IGMP receiver", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
delete(igc.CurReceivers, port)
igp.QueryTimeoutCount = 0
igc.NewReceivers[port] = igp
} else {
// New receiver who wasn't part of earlier list
// Need to send out IGMP group modification for this port
igp = NewIgmpGroupPort(port, cvlan, pbit, igc.Version, incl, uint32(ponPortID))
igc.NewReceivers[port] = igp
isNewReceiver = true
logger.Debugw(ctx, "New IGMP receiver", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
if len(igc.NewReceivers) == 1 && len(igc.CurReceivers) == 0 {
groupModified = true
igc.AddMcFlow()
logger.Debugw(ctx, "Added New Flow", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
}
if !incl {
igc.Exclude++
}
}
}
// Process the include/exclude list which may end up modifying the group
if change, _ := igc.ProcessSources(port, ip, incl); change {
groupModified = true
}
igc.ProcessMode(port, incl)
// If the group is modified as this is the first receiver or due to include/exclude list modification
// send a report to the upstream multicast servers
if groupModified {
logger.Debug(ctx, "Group Modified and IGMP report sent to the upstream server")
igc.SendReport(false)
} else if newRcvExists {
return false
}
logger.Debugw(ctx, "Channel Receiver Added", log.Fields{"Group Channel": igc.GroupAddr, "Group Port": igp})
if err := igc.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
}
if err := igp.WriteToDb(igc.Mvlan, igc.GroupAddr, igc.Device); err != nil {
logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
}
return isNewReceiver
}
// DelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
// the group
func (igc *IgmpGroupChannel) DelReceiver(port string, incl bool, srcList []net.IP) bool {
// The receiver may exist either in NewReceiver list or
// the CurReceivers list. Find and remove it from either
// of the lists.
logger.Debugw(ctx, "Deleting Receiver from Channel", log.Fields{"Port": port, "SrcList": srcList, "Incl": incl})
logger.Debugw(ctx, "New Receivers", log.Fields{"New": igc.NewReceivers})
logger.Debugw(ctx, "Current Receivers", log.Fields{"Current": igc.CurReceivers})
receiversUpdated := false
groupModified, receiverSrcListEmpty := igc.ProcessSources(port, srcList, incl)
if len(srcList) == 0 || len(igc.IncludeList) == 0 || receiverSrcListEmpty {
if igp, ok := igc.NewReceivers[port]; ok {
logger.Debug(ctx, "Deleting from NewReceivers")
delete(igc.NewReceivers, port)
receiversUpdated = true
if igp.Exclude {
igc.Exclude--
}
} else {
if igp, ok1 := igc.CurReceivers[port]; ok1 {
logger.Debug(ctx, "Deleting from CurReceivers")
delete(igc.CurReceivers, port)
receiversUpdated = true
if igp.Exclude {
igc.Exclude--
}
} else {
logger.Debug(ctx, "Receiver doesnot exist. Dropping Igmp leave")
return false
}
}
_ = db.DelIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device, port)
}
if igc.NumReceivers() == 0 {
igc.DelMcFlow()
mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
/* If proxy is configured and NumReceivers is 0, then we can reset the igc src list so that we send leave */
if _, ok := mvp.Proxy[igc.GroupName]; ok {
igc.IncludeList = []net.IP{}
}
igc.SendLeaveToServer()
logger.Debugw(ctx, "Deleted the receiver Flow", log.Fields{"Num Receivers": igc.NumReceivers()})
return true
}
if groupModified {
igc.SendReport(false)
logger.Infow(ctx, "Updated SourceList for Channel", log.Fields{"Current": igc.CurReceivers, "New": igc.NewReceivers})
}
if err := igc.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
}
logger.Infow(ctx, "Updated Receiver info for Channel", log.Fields{"Current": igc.CurReceivers, "New": igc.NewReceivers})
return receiversUpdated
}
// NumReceivers to get number of receivers
func (igd *IgmpGroupDevice) NumReceivers() int {
var numReceivers int
len := func(key interface{}, value interface{}) bool {
numReceivers++
return true
}
igd.PortChannelMap.Range(len)
return numReceivers
}
// DelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
// the group
func (igd *IgmpGroupDevice) DelReceiver(groupAddr net.IP, port string, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
logger.Debugw(ctx, "Deleting Receiver for Device", log.Fields{"port": port, "GroupIP": groupAddr.String()})
var igc *IgmpGroupChannel
var igcIntf interface{}
var ok bool
var srcList []net.IP
incl := false
mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
if _, ok := mvp.Proxy[igd.GroupName]; ok {
incl = true
} else if group != nil {
srcList = group.SourceAddresses
incl = isIncl(group.Type)
}
if igcIntf, ok = igd.GroupChannels.Load(groupAddr.String()); !ok {
logger.Warnw(ctx, "Igmp Channel for group IP doesnt exist", log.Fields{"GroupAddr": groupAddr.String()})
return
}
igc = igcIntf.(*IgmpGroupChannel)
if ok := igc.DelReceiver(port, incl, srcList); !ok {
return
}
if igc.NumReceivers() == 0 {
igd.DelIgmpGroupChannel(igc)
}
igd.DelPortFromChannel(port, groupAddr)
isGroupModified := igd.RemoveChannelFromChannelsPerPon(port, groupAddr, ponPortID)
//Remove port from receiver if port has no subscription to any of the group channels
if isGroupModified {
igd.ModMcGroup()
}
if err := igd.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
}
// DelChannelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
// the group
func (igd *IgmpGroupDevice) DelChannelReceiver(groupAddr net.IP) map[string]*IgmpGroupPort {
portsRemoved := make(map[string]*IgmpGroupPort)
groupModified := false
// ifEmpty := true
igcIntf, _ := igd.GroupChannels.Load(groupAddr.String())
if igcIntf == nil {
return portsRemoved
}
igc := igcIntf.(*IgmpGroupChannel)
for port, igp := range igc.NewReceivers {
_ = db.DelIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device, port) //TODO: Y not here
igd.DelPortFromChannel(port, igc.GroupAddr)
ponPortID := GetApplication().GetPonPortID(igd.Device, port)
groupModified = igd.RemoveChannelFromChannelsPerPon(port, igc.GroupAddr, ponPortID)
delete(igc.NewReceivers, port)
portsRemoved[port] = igp
}
for port, igp := range igc.CurReceivers {
_ = db.DelIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device, port)
igd.DelPortFromChannel(port, igc.GroupAddr)
ponPortID := GetApplication().GetPonPortID(igd.Device, port)
groupModified = igd.RemoveChannelFromChannelsPerPon(port, igc.GroupAddr, ponPortID)
delete(igc.CurReceivers, port)
portsRemoved[port] = igp
}
igc.DelMcFlow()
igd.DelIgmpGroupChannel(igc)
igc.Exclude = 0
igc.SendLeaveToServer()
if groupModified {
igd.ModMcGroup()
}
if err := igd.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
logger.Debugw(ctx, "Deleted the receiver Flow", log.Fields{"Num Receivers": igc.NumReceivers()})
return portsRemoved
}
// DelIgmpGroupChannel to delete igmp group channel
func (igd *IgmpGroupDevice) DelIgmpGroupChannel(igc *IgmpGroupChannel) {
if igc.NumReceivers() != 0 {
igc.DelAllReceivers()
}
_ = db.DelIgmpChannel(igc.Mvlan, igc.GroupName, igc.Device, igc.GroupAddr)
igd.GroupChannels.Delete(igc.GroupAddr.String())
logger.Infow(ctx, "Deleted the Channel from Device", log.Fields{"Channel": igc.GroupAddr.String()})
isLenZero := true
checkIfEmpty := func(key interface{}, value interface{}) bool {
isLenZero = false
return false
}
igd.GroupChannels.Range(checkIfEmpty)
if isLenZero {
logger.Infow(ctx, "No more active channels. Deleting MC Group", log.Fields{"Device": igd.Device, "Group": igd.GroupName})
igd.DelMcGroup(false)
}
}
// func (igd *IgmpGroupDevice) DelIgmpChannel(igc *IgmpGroupChannel) {
// db.DelIgmpChannel(igc.GroupName, igc.Device, igc.GroupAddr)
// delete(igd.GroupChannels, igc.GroupAddr.String())
// logger.Debugw(ctx, "Deleted the Channel", log.Fields{"Num Receivers": igc.NumReceivers()})
// }
// DelPortFromChannel to delete port from channel
func (igd *IgmpGroupDevice) DelPortFromChannel(port string, groupAddr net.IP) bool {
ipsList := []net.IP{}
ipsListIntf, _ := igd.PortChannelMap.Load(port)
if ipsListIntf != nil {
ipsList = ipsListIntf.([]net.IP)
}
for i, addr := range ipsList {
if addr.Equal(groupAddr) {
ipsList = append(ipsList[:i], ipsList[i+1:]...)
//Remove port from receiver if port has no subscription to any of the group channels
if len(ipsList) == 0 {
igd.PortChannelMap.Delete(port)
} else {
//Update the map with modified ips list
igd.PortChannelMap.Store(port, ipsList)
}
logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "DelChannelList": ipsList, "Addr": groupAddr.String()})
return true
}
}
return false
}
// DelIgmpGroup deletes all devices for the provided igmp group
func (ig *IgmpGroup) DelIgmpGroup() {
logger.Infow(ctx, "Deleting All Device for Group", log.Fields{"Group": ig.GroupName})
for _, igd := range ig.Devices {
ig.DelIgmpGroupDevice(igd)
}
GetApplication().DelIgmpGroup(ig)
}
// DelAllChannels deletes all receiver for the provided igmp device
func (igd *IgmpGroupDevice) DelAllChannels() {
logger.Infow(ctx, "Deleting All Channel for Device", log.Fields{"Device": igd.Device, "Group": igd.GroupName})
delGroupChannels := func(key interface{}, value interface{}) bool {
igc := value.(*IgmpGroupChannel)
igd.DelIgmpGroupChannel(igc)
return true
}
igd.GroupChannels.Range(delGroupChannels)
}
// DelAllReceivers deletes all receiver for the provided igmp device
func (igc *IgmpGroupChannel) DelAllReceivers() {
logger.Infow(ctx, "Deleting All Receiver for Channel", log.Fields{"Device": igc.Device, "Channel": igc.GroupAddr.String()})
_ = db.DelAllIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device)
igc.Exclude = 0
igc.DelMcFlow()
igc.SendLeaveToServer()
logger.Infow(ctx, "MC Flow deleted and Leave sent", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device})
}
// ProcessQuery process query received from the upstream IGMP server
func (igd *IgmpGroupDevice) ProcessQuery(groupAddr net.IP, ver uint8) {
logger.Debugw(ctx, "Received Query From Server", log.Fields{"Version": ver})
if ver != *igd.ServVersion {
igd.ServVersionExpiry = time.Now().Add(time.Duration(2*igd.proxyCfg.KeepAliveInterval) * time.Second)
*igd.ServVersion = ver
mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
if err := mvp.WriteToDb(); err != nil {
logger.Errorw(ctx, "Mvlan profile write to DB failed", log.Fields{"ProfileName": mvp.Name})
}
}
if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
igc.(*IgmpGroupChannel).SendReport(true)
return
}
logger.Infow(ctx, "No Members for Channel. Dropping Igmp Query", log.Fields{"Group": igd.GroupName, "Channel": groupAddr.String()})
}
// Igmpv2ReportPacket build an IGMPv2 Report for the upstream servers
func (igc *IgmpGroupChannel) Igmpv2ReportPacket() ([]byte, error) {
logger.Debugw(ctx, "Buidling IGMP version 2 Report", log.Fields{"Device": igc.Device})
return IgmpReportv2Packet(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP)
}
// Igmpv3ReportPacket build an IGMPv3 Report for the upstream servers
func (igc *IgmpGroupChannel) Igmpv3ReportPacket() ([]byte, error) {
logger.Debugw(ctx, "Buidling IGMP version 3 Report", log.Fields{"Device": igc.Device, "Exclude": igc.Exclude})
if igc.Exclude > 0 {
return Igmpv3ReportPacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP, false, igc.ExcludeList)
}
return Igmpv3ReportPacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP, true, igc.IncludeList)
}
// SendReport send a consolidated report to the server
func (igc *IgmpGroupChannel) SendReport(isQuery bool) {
var report []byte
var err error
logger.Debugw(ctx, "Checking Version", log.Fields{"IGC Version": igc.Version, "Proxy Version": (*igc.proxyCfg).IgmpVerToServer,
"Result": (getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2)})
/**
+------------------------------------------------------------------------+
| IGMP version(towards BNG) Configured at VGC |
+-------------------------------+----------------------------------------+
| v2 | v3 |
+===================+==========+===============================+========================================+
| Received From RG | V2 Join | Process and Send as V2 to BNG | Process, Convert to v3 and Send to BNG |
| | | | Process, Send as v2, if the BNG is v2 |
+===================+----------+-------------------------------+----------------------------------------+
| V3 Join | Process and Send as V2 to BNG | Process, Send v3 to BNG |
| | | Process, Convert, Send as v2, if the |
| | | BNG is v2 |
+===================+==========+===============================+========================================+
| Received From BNG | V2 Query | V2 response to BNG | V2 response to BNG |
+===================+----------+-------------------------------+----------------------------------------+
| V3 Query | Discard | V3 response to BNG |
+==========+===============================+========================================+
*/
// igc.Version: igmp version received from RG.
// igc.ServVersion: igmp version received from BNG or IgmpVerToServer present in proxy igmp conf.
if isQuery && *igc.ServVersion == IgmpVersion3 && getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 {
// This is the last scenario where we must discard the query processing.
logger.Debug(ctx, "Dropping query packet since the server verion is v3 but igmp proxy version is v2")
return
}
if *igc.ServVersion == IgmpVersion2 || getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 {
report, err = igc.Igmpv2ReportPacket()
} else {
report, err = igc.Igmpv3ReportPacket()
}
if err != nil {
logger.Warnw(ctx, "Error Preparing Report", log.Fields{"Device": igc.Device, "Ver": igc.Version, "Reason": err.Error()})
return
}
nni, err := GetApplication().GetNniPort(igc.Device)
if err == nil {
_ = cntlr.GetController().PacketOutReq(igc.Device, nni, nni, report, false)
} else {
logger.Warnw(ctx, "Didn't find NNI port", log.Fields{"Device": igc.Device})
}
}
// AddMcFlow adds flow to the device when the first receiver joins
func (igc *IgmpGroupChannel) AddMcFlow() {
flow, err := igc.BuildMcFlow()
if err != nil {
logger.Warnw(ctx, "MC Flow Build Failed", log.Fields{"Reason": err.Error()})
return
}
port, _ := GetApplication().GetNniPort(igc.Device)
_ = cntlr.GetController().AddFlows(port, igc.Device, flow)
}
// DelMcFlow deletes flow from the device when the last receiver leaves
func (igc *IgmpGroupChannel) DelMcFlow() {
flow, err := igc.BuildMcFlow()
if err != nil {
logger.Warnw(ctx, "MC Flow Build Failed", log.Fields{"Reason": err.Error()})
return
}
flow.ForceAction = true
device := GetApplication().GetDevice(igc.Device)
if mvpIntf, _ := GetApplication().MvlanProfilesByTag.Load(igc.Mvlan); mvpIntf != nil {
mvp := mvpIntf.(*MvlanProfile)
err := mvp.DelFlows(device, flow)
if err != nil {
logger.Warnw(ctx, "Delering IGMP Flow for device failed ", log.Fields{"Device": device, "err": err})
}
}
}
// BuildMcFlow builds the flow using which it is added/deleted
func (igc *IgmpGroupChannel) BuildMcFlow() (*of.VoltFlow, error) {
flow := &of.VoltFlow{}
flow.SubFlows = make(map[uint64]*of.VoltSubFlow)
//va := GetApplication()
logger.Infow(ctx, "Building Mcast flow", log.Fields{"Mcast Group": igc.GroupAddr.String(), "Mvlan": igc.Mvlan.String()})
uintGroupAddr := ipv4ToUint(igc.GroupAddr)
subFlow := of.NewVoltSubFlow()
subFlow.SetMatchVlan(igc.Mvlan)
subFlow.SetIpv4Match()
subFlow.SetMatchDstIpv4(igc.GroupAddr)
mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
//nni, err := va.GetNniPort(igc.Device)
//if err != nil {
// return nil, err
//}
//inport, err := va.GetPortID(nni)
//if err != nil {
// return nil, err
//}
//subFlow.SetInPort(inport)
subFlow.SetOutGroup(igc.GroupID)
cookiePort := uintGroupAddr
subFlow.Cookie = uint64(cookiePort)<<32 | uint64(igc.Mvlan)
subFlow.Priority = of.McFlowPriority
metadata := uint64(mvp.PonVlan)
subFlow.SetTableMetadata(metadata)
flow.SubFlows[subFlow.Cookie] = subFlow
logger.Infow(ctx, "Built Mcast flow", log.Fields{"cookie": subFlow.Cookie, "subflow": subFlow})
return flow, nil
}
//DelFlows - Triggers flow deletion after registering for flow indication event
func (mvp *MvlanProfile) DelFlows(device *VoltDevice, flow *of.VoltFlow) error {
mvp.mvpFlowLock.Lock()
defer mvp.mvpFlowLock.Unlock()
var flowMap map[string]bool
var ok bool
for cookie := range flow.SubFlows {
cookie := strconv.FormatUint(cookie, 10)
fe := &FlowEvent{
eType: EventTypeMcastFlowRemoved,
device: device.Name,
cookie: cookie,
eventData: mvp,
}
device.RegisterFlowDelEvent(cookie, fe)
if flowMap, ok = mvp.PendingDeleteFlow[device.Name]; !ok {
flowMap = make(map[string]bool)
}
flowMap[cookie] = true
mvp.PendingDeleteFlow[device.Name] = flowMap
}
if err := mvp.WriteToDb(); err != nil {
logger.Errorw(ctx, "Mvlan profile write to DB failed", log.Fields{"ProfileName": mvp.Name})
}
return cntlr.GetController().DelFlows(device.NniPort, device.Name, flow)
}
//FlowRemoveSuccess - Process flow success indication
func (mvp *MvlanProfile) FlowRemoveSuccess(cookie string, device string) {
mvp.mvpFlowLock.Lock()
defer mvp.mvpFlowLock.Unlock()
logger.Infow(ctx, "Mvlan Flow Remove Success Notification", log.Fields{"MvlanProfile": mvp.Name, "Cookie": cookie, "Device": device})
if _, ok := mvp.PendingDeleteFlow[device]; ok {
delete(mvp.PendingDeleteFlow[device], cookie)
}
if err := mvp.WriteToDb(); err != nil {
logger.Errorw(ctx, "Mvlan profile write to DB failed", log.Fields{"ProfileName": mvp.Name})
}
}
//FlowRemoveFailure - Process flow failure indication
func (mvp *MvlanProfile) FlowRemoveFailure(cookie string, device string, errorCode uint32, errReason string) {
mvp.mvpFlowLock.Lock()
defer mvp.mvpFlowLock.Unlock()
if flowMap, ok := mvp.PendingDeleteFlow[device]; ok {
if _, ok := flowMap[cookie]; ok {
logger.Errorw(ctx, "Mvlan Flow Remove Failure Notification", log.Fields{"MvlanProfile": mvp.Name, "Cookie": cookie, "ErrorCode": errorCode, "ErrorReason": errReason, "Device": device})
return
}
}
logger.Errorw(ctx, "Mvlan Flow Del Failure Notification for Unknown cookie", log.Fields{"MvlanProfile": mvp.Name, "Cookie": cookie, "ErrorCode": errorCode, "ErrorReason": errReason})
}
// AddMcGroup add the new group on the device when a receiver joins the group
func (igd *IgmpGroupDevice) AddMcGroup() {
if !igd.GroupInstalled {
group := &of.Group{}
group.Command = of.GroupCommandAdd
group.GroupID = igd.GroupID
group.Device = igd.Device
group.SetVlan = igd.PonVlan
group.IsPonVlanPresent = igd.IsPonVlanPresent
addbuckets := func(key interface{}, value interface{}) bool {
port := key.(string)
var portID uint32
if d := GetApplication().GetDevice(group.Device); d != nil {
GetApplication().portLock.Lock()
p := d.GetPort(port)
GetApplication().portLock.Unlock()
portID = p.ID
}
//ponPortID := key.(uint32)
if portID != 0xFF {
group.Buckets = append(group.Buckets, portID)
}
return true
}
igd.PortChannelMap.Range(addbuckets)
port, _ := GetApplication().GetNniPort(igd.Device)
_ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
igd.GroupInstalled = true
}
}
// ModMcGroup updates the group on the device when either a receiver leaves
// or joins the group
func (igd *IgmpGroupDevice) ModMcGroup() {
if igd.GroupInstalled {
group := &of.Group{}
group.Command = of.GroupCommandMod
group.GroupID = igd.GroupID
group.Device = igd.Device
group.SetVlan = igd.PonVlan
group.IsPonVlanPresent = igd.IsPonVlanPresent
addbuckets := func(key interface{}, value interface{}) bool {
port := key.(string)
var portID uint32
if d := GetApplication().GetDevice(group.Device); d != nil {
GetApplication().portLock.Lock()
p := d.GetPort(port)
GetApplication().portLock.Unlock()
portID = p.ID
}
//ponPortID := key.(uint32)
if portID != 0xFF {
group.Buckets = append(group.Buckets, portID)
}
return true
}
igd.PortChannelMap.Range(addbuckets)
port, _ := GetApplication().GetNniPort(igd.Device)
_ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
} else {
logger.Warnw(ctx, "Update Group Failed. Group not yet created", log.Fields{"Igd": igd.Device})
}
}
// DelMcGroup : The group is deleted when the last receiver leaves the group
func (igd *IgmpGroupDevice) DelMcGroup(forceDelete bool) {
logger.Infow(ctx, "Delete Mc Group Request", log.Fields{"Device": igd.Device, "GroupID": igd.GroupID, "ForceFlag": forceDelete, "GroupInstalled": igd.GroupInstalled})
/*
if !forceDelete && !checkIfForceGroupRemove(igd.Device) {
if success := AddToPendingPool(igd.Device, igd.getKey()); success {
return
}
}*/
if igd.GroupInstalled {
logger.Debugw(ctx, "Deleting Group", log.Fields{"Device": igd.Device, "Id": igd.GroupID})
group := &of.Group{}
group.Command = of.GroupCommandDel
group.GroupID = igd.GroupID
group.Device = igd.Device
group.ForceAction = true
port, _ := GetApplication().GetNniPort(igd.Device)
_ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
igd.GroupInstalled = false
}
}
//AddToPendingPool - adds Igmp Device obj to pending pool
func AddToPendingPool(device string, groupKey string) bool {
logger.Infow(ctx, "Add Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupKey": groupKey})
if grp, ok := GetApplication().IgmpGroups.Load(groupKey); ok {
ig := grp.(*IgmpGroup)
ig.PendingPoolLock.Lock()
logger.Infow(ctx, "Adding Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
ig.PendingGroupForDevice[device] = time.Now().Add(time.Duration(GroupExpiryTime) * time.Minute)
ig.PendingPoolLock.Unlock()
if err := ig.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
return true
}
return false
}
/*
func checkIfForceGroupRemove(device string) bool {
if d := GetApplication().GetDevice(device); d != nil {
if d.State == cntlr.DeviceStateREBOOTED || d.State == cntlr.DeviceStateDOWN {
return true
}
}
return false
}*/
// IgmpLeaveToServer sends IGMP leave to server. Called when the last receiver leaves the group
func (igc *IgmpGroupChannel) IgmpLeaveToServer() {
if leave, err := IgmpLeavePacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP); err == nil {
nni, err1 := GetApplication().GetNniPort(igc.Device)
if err1 == nil {
_ = cntlr.GetController().PacketOutReq(igc.Device, nni, nni, leave, false)
}
}
}
// SendLeaveToServer delete the group when the last receiver leaves the group
func (igc *IgmpGroupChannel) SendLeaveToServer() {
/**
+-------------------------------------------------------------------------+
| IGMP version(towards BNG) Configured at VGC |
+-------------------------------+-----------------------------------------+
| v2 | v3 |
+===================+==========+===============================+=========================================+
| Received From RG | V2 Leave | Process and Send as V2 to BNG | Process, Convert to V3 and Send to BNG/ |
| | | | Process, Send as V2, if the BNG is V2 |
+===================+----------+-------------------------------+-----------------------------------------+
| V3 Leave | Process and Send as V2 to BNG | Process, Send V3 to BNG |
| | | Process, Convert, Send as V2, if the |
| | | BNG is v2 |
+==========+===============================+=========================================+
*/
// igc.Version: igmp version received from RG.
// igc.ServVersion: igmp version received from BNG or IgmpVerToServer present in proxy igmp conf.
logger.Debugw(ctx, "Sending IGMP leave upstream", log.Fields{"Device": igc.Device})
if *igc.ServVersion == IgmpVersion2 || getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 {
igc.IgmpLeaveToServer()
} else {
igc.SendReport(false)
}
}
// QueryExpiry processes query expiry. Upon expiry, take stock of the situation
// add either retain/release the group based on number of receivers left
func (igd *IgmpGroupDevice) QueryExpiry() {
logger.Debugw(ctx, "Query Expiry", log.Fields{"Device": igd.Device})
// Delete the IGMP flow added for this port if port state is down or query count exceeded
handleQueryExp := func(key interface{}, value interface{}) bool {
igc := value.(*IgmpGroupChannel)
for portKey, port := range igc.CurReceivers {
if portKey == StaticPort {
continue
}
logger.Warnw(ctx, "Expired Receiver Port", log.Fields{"PortKey": portKey, "IGP": port, "GroupAddr": igc.GroupAddr,
"Count": port.QueryTimeoutCount})
state, err := cntlr.GetController().GetPortState(igc.Device, portKey)
logger.Debugw(ctx, "Expired Member Port State", log.Fields{"state": state})
ponPortID := GetApplication().GetPonPortID(igd.Device, portKey)
if err == nil && state == cntlr.PortStateDown {
igd.DelReceiver(igc.GroupAddr, portKey, nil, ponPortID)
}
port.QueryTimeoutCount++
logger.Debugw(ctx, "Expired Port TimeoutCount", log.Fields{"count": port.QueryTimeoutCount})
if port.QueryTimeoutCount >= (*igc.proxyCfg).KeepAliveCount {
logger.Errorw(ctx, "Expiry Timeout count exceeded. Trigger delete receiver", log.Fields{"PortKey": portKey,
"GroupAddr": igc.GroupAddr, "Count": port.QueryTimeoutCount})
igd.DelReceiver(igc.GroupAddr, portKey, nil, ponPortID)
SendQueryExpiredEventGroupSpecific(portKey, igd, igc)
} else {
_ = port.WriteToDb(igc.Mvlan, igc.GroupAddr, igc.Device)
}
}
return true
}
igd.GroupChannels.Range(handleQueryExp)
}
// SendQueryExpiredEventGroupSpecific to send group specific query expired event.
func SendQueryExpiredEventGroupSpecific(portKey string, igd *IgmpGroupDevice, igc *IgmpGroupChannel) {
logger.Info(ctx, "Processing-SendQueryExpiredEventGroupSpecific-Event")
va := GetApplication()
mvpName := va.GetMvlanProfileByTag(igd.Mvlan).Name
sendEvent := func(key interface{}, value interface{}) bool {
if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName {
logger.Debugw(ctx, "sending-query-expired-group-specific-event", log.Fields{"EventType": QueryExpiredGroupSpecific, "ServiceName": value.(*VoltService).Name})
}
return false
}
// Fetching service name to send with query expired event.
vpvs, _ := va.VnetsByPort.Load(portKey)
if vpvs == nil {
logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs})
return
}
for _, vpv := range vpvs.([]*VoltPortVnet) {
vpv.services.Range(sendEvent)
}
}
// GetMcastServiceForSubAlarm to get mcast service name for subscriber alarm.
func GetMcastServiceForSubAlarm(uniPort *VoltPort, mvp *MvlanProfile) string {
var serviceName string
mvpName := mvp.Name
va := GetApplication()
sendAlm := func(key interface{}, value interface{}) bool {
if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName {
serviceName = value.(*VoltService).Name
}
return true
}
// Fetching service name to send with active channels exceeded per subscriber alarm.
vpvs, _ := va.VnetsByPort.Load(uniPort.Name)
if vpvs == nil {
logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs})
return serviceName
}
for _, vpv := range vpvs.([]*VoltPortVnet) {
vpv.services.Range(sendAlm)
}
return serviceName
}
// NumReceivers returns total number of receivers left on the group
func (igc *IgmpGroupChannel) NumReceivers() uint32 {
return uint32(len(igc.CurReceivers) + len(igc.NewReceivers))
}
// SendQuery sends query to the receivers for counting purpose
func (igc *IgmpGroupChannel) SendQuery() {
//var b []byte
//var err error
for portKey, port := range igc.NewReceivers {
igc.CurReceivers[portKey] = port
}
igc.NewReceivers = make(map[string]*IgmpGroupPort)
logger.Debugw(ctx, "Sending Query to receivers", log.Fields{"Receivers": igc.CurReceivers})
for port, groupPort := range igc.CurReceivers {
if port == StaticPort {
continue
}
if queryPkt, err := igc.buildQuery(igc.GroupAddr, of.VlanType(groupPort.CVlan), groupPort.Pbit); err == nil {
_ = cntlr.GetController().PacketOutReq(igc.Device, port, port, queryPkt, false)
logger.Debugw(ctx, "Query Sent", log.Fields{"Device": igc.Device, "Port": port, "Packet": queryPkt})
} else {
logger.Warnw(ctx, "Query Creation Failed", log.Fields{"Reason": err.Error()})
}
}
}
// buildQuery to build query packet
func (igc *IgmpGroupChannel) buildQuery(groupAddr net.IP, cVlan of.VlanType, pbit uint8) ([]byte, error) {
if igc.Version == IgmpVersion2 {
return Igmpv2QueryPacket(igc.GroupAddr, cVlan, **igc.IgmpProxyIP, pbit, (*igc.proxyCfg).MaxResp)
}
return Igmpv3QueryPacket(igc.GroupAddr, cVlan, **igc.IgmpProxyIP, pbit, (*igc.proxyCfg).MaxResp)
}
// IgmpGroup implements a single MCIP that may have multiple receivers
// connected via multiple devices (OLTs). The IGMP group is stored on the
// VOLT application.
type IgmpGroup struct {
GroupID uint32
Mvlan of.VlanType
PonVlan of.VlanType
GroupName string
GroupAddr net.IP
Devices map[string]*IgmpGroupDevice `json:"-"`
PendingGroupForDevice map[string]time.Time //map [deviceId, timestamp] (ExpiryTime = leave time + 15mins)
Version string
IsPonVlanPresent bool
IsChannelBasedGroup bool
PendingPoolLock sync.RWMutex
IsGroupStatic bool
IgmpGroupLock sync.RWMutex
}
// NewIgmpGroup is constructor for an IGMP group
func NewIgmpGroup(name string, vlan of.VlanType) *IgmpGroup {
ig := IgmpGroup{}
ig.GroupName = name
ig.Mvlan = vlan
ig.Devices = make(map[string]*IgmpGroupDevice)
ig.PendingGroupForDevice = make(map[string]time.Time)
return &ig
}
// IgmpGroupInit to initialize igmp group members
func (ig *IgmpGroup) IgmpGroupInit(name string, gip net.IP, mvp *MvlanProfile) {
ig.GroupName = name
ig.Mvlan = mvp.Mvlan
ig.PonVlan = mvp.PonVlan
ig.IsPonVlanPresent = mvp.IsPonVlanPresent
ig.Devices = make(map[string]*IgmpGroupDevice)
ig.PendingGroupForDevice = make(map[string]time.Time)
ig.IsChannelBasedGroup = mvp.IsChannelBasedGroup
ig.IsGroupStatic = mvp.Groups[name].IsStatic
if ig.IsChannelBasedGroup {
ig.GroupAddr = gip
} else {
ig.GroupAddr = net.ParseIP("0.0.0.0")
}
}
// IgmpGroupReInit to re-initialize igmp group members
func (ig *IgmpGroup) IgmpGroupReInit(name string, gip net.IP) {
logger.Infow(ctx, "Reinitialize Igmp Group", log.Fields{"GroupID": ig.GroupID, "OldName": ig.GroupName, "Name": name, "OldAddr": ig.GroupAddr.String(), "GroupAddr": gip.String()})
ig.GroupName = name
if ig.IsChannelBasedGroup {
ig.GroupAddr = gip
} else {
ig.GroupAddr = net.ParseIP("0.0.0.0")
}
for _, igd := range ig.Devices {
igd.IgmpGroupDeviceReInit(ig)
}
}
// IsStaticGroup to check if group is static
func (mvp *MvlanProfile) IsStaticGroup(groupName string) bool {
return mvp.Groups[groupName].IsStatic
}
// updateGroupName to update group name
func (ig *IgmpGroup) updateGroupName(newGroupName string) {
if !ig.IsChannelBasedGroup {
logger.Errorw(ctx, "Group name update not supported for GroupChannel based group", log.Fields{"Ig": ig})
return
}
oldKey := ig.getKey()
ig.GroupName = newGroupName
for _, igd := range ig.Devices {
igd.updateGroupName(newGroupName)
}
if err := ig.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
if !ig.IsChannelBasedGroup {
_ = db.DelIgmpGroup(oldKey)
}
}
//HandleGroupMigration - handles migration of group members between static & dynamic
func (ig *IgmpGroup) HandleGroupMigration(deviceID string, groupAddr net.IP) {
var group *layers.IGMPv3GroupRecord
app := GetApplication()
if deviceID == "" {
logger.Infow(ctx, "Handle Group Migration Request for all devices", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
for device := range ig.Devices {
ig.HandleGroupMigration(device, groupAddr)
}
} else {
logger.Infow(ctx, "Handle Group Migration Request", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName})
var newIg *IgmpGroup
receivers := ig.DelIgmpChannel(deviceID, groupAddr)
if ig.NumDevicesActive() == 0 {
app.DelIgmpGroup(ig)
}
if newIg = app.GetIgmpGroup(ig.Mvlan, groupAddr); newIg == nil {
logger.Infow(ctx, "IG Group doesn't exist, creating new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
if newIg = app.AddIgmpGroup(app.GetMvlanProfileByTag(ig.Mvlan).Name, groupAddr, deviceID); newIg == nil {
logger.Errorw(ctx, "Group Creation failed during group migration", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr})
return
}
}
mvp := app.GetMvlanProfileByTag(ig.Mvlan)
isStaticGroup := mvp.IsStaticGroup(ig.GroupName)
logger.Infow(ctx, "Existing receivers for old group", log.Fields{"Receivers": receivers})
newIg.IgmpGroupLock.Lock()
for port, igp := range receivers {
if !isStaticGroup && port == StaticPort {
continue
}
group = nil
var reqType layers.IGMPv3GroupRecordType
srcAddresses := []net.IP{}
if igp.Version == IgmpVersion3 {
if igp.Exclude {
srcAddresses = append(srcAddresses, igp.ExcludeList...)
reqType = layers.IGMPIsEx
} else {
srcAddresses = append(srcAddresses, igp.IncludeList...)
reqType = layers.IGMPIsIn
}
group = &layers.IGMPv3GroupRecord{
SourceAddresses: srcAddresses,
Type: reqType,
}
}
logger.Infow(ctx, "Adding receiver to new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "newIg": newIg.GroupName, "IGP": igp})
ponPort := GetApplication().GetPonPortID(deviceID, port)
newIg.AddReceiver(deviceID, port, groupAddr, group, igp.Version, igp.CVlan, igp.Pbit, ponPort)
}
newIg.IgmpGroupLock.Unlock()
}
}
// AddIgmpGroupDevice add a device to the group which happens when the first receiver of the device
// is added to the IGMP group.
func (ig *IgmpGroup) AddIgmpGroupDevice(device string, id uint32, version uint8) *IgmpGroupDevice {
logger.Infow(ctx, "Adding Device to IGMP group", log.Fields{"Device": device, "GroupName": ig.GroupName})
igd := NewIgmpGroupDevice(device, ig, id, version)
ig.Devices[device] = igd
if err := igd.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
return igd
}
// DelIgmpGroupDevice delete the device from the group which happens when we receive a leave or when
// there is not response for IGMP query from the receiver
func (ig *IgmpGroup) DelIgmpGroupDevice(igd *IgmpGroupDevice) {
logger.Infow(ctx, "Deleting Device from IGMP group", log.Fields{"Device": igd.Device, "Name": ig.GroupName})
va := GetApplication()
countersToBeUpdated := false
if igd.NumReceivers() != 0 {
countersToBeUpdated = true
}
igd.DelAllChannels()
//Clear all internal maps so that the groups can be reused
igd.PortChannelMap.Range(func(key, value interface{}) bool {
//Update the counters only if not already updated
//(i.e) 1. In case of channel remove during Mvlan Update
if countersToBeUpdated {
port := key.(string)
channelList := value.([]net.IP)
ponPortID := va.GetPonPortID(igd.Device, port)
for _, channel := range channelList {
igd.RemoveChannelFromChannelsPerPon(port, channel, ponPortID)
}
}
igd.PortChannelMap.Delete(key)
return true
})
igd.PonPortChannelMap = util.NewConcurrentMap()
if mcastCfg := va.GetMcastConfig(igd.SerialNo, va.GetMvlanProfileByTag(igd.Mvlan).Name); mcastCfg != nil {
mcastCfg.IgmpGroupDevices.Delete(igd.GroupID)
logger.Debugw(ctx, "Igd deleted from mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
}
if !igd.GroupInstalled {
_ = db.DelIgmpDevice(igd.Mvlan, ig.GroupName, ig.GroupAddr, igd.Device)
delete(ig.Devices, igd.Device)
}
}
// AddReceiver delete the device from the group which happens when we receive a leave or when
// there is not response for IGMP query from the receiver
func (ig *IgmpGroup) AddReceiver(device string, port string, groupIP net.IP,
group *layers.IGMPv3GroupRecord, ver uint8, cvlan uint16, pbit uint8, ponPort uint32) {
logger.Debugw(ctx, "Adding Receiver", log.Fields{"Port": port})
if igd, ok := ig.getIgmpGroupDevice(device); !ok {
igd = ig.AddIgmpGroupDevice(device, ig.GroupID, ver)
igd.AddReceiver(port, groupIP, group, ver, cvlan, pbit, ponPort)
} else {
logger.Infow(ctx, "IGMP Group Receiver", log.Fields{"IGD": igd.Device})
igd.AddReceiver(port, groupIP, group, ver, cvlan, pbit, ponPort)
}
}
func (ig *IgmpGroup) getIgmpGroupDevice(device string) (*IgmpGroupDevice, bool) {
ig.PendingPoolLock.Lock()
defer ig.PendingPoolLock.Unlock()
if _, ok := ig.PendingGroupForDevice[device]; ok {
logger.Infow(ctx, "Removing the IgmpGroupDevice from pending pool", log.Fields{"GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
delete(ig.PendingGroupForDevice, device)
if err := ig.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
}
igd, ok := ig.Devices[device]
return igd, ok
}
// DelReceiveronDownInd deletes a receiver which is the combination of device (OLT)
// and port on Port Down event
func (ig *IgmpGroup) DelReceiveronDownInd(device string, port string, ponPortID uint32) {
logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port})
mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
igd, ok := ig.Devices[device]
if !ok {
logger.Infow(ctx, "IGMP Group device was not found for ", log.Fields{"Device": device})
return
}
ipsList := []net.IP{}
ipsListIntf, ok := igd.PortChannelMap.Load(port)
if ok {
ipsList = append(ipsList, ipsListIntf.([]net.IP)...)
}
logger.Infow(ctx, "Port Channel List", log.Fields{"Port": port, "IPsList": ipsList})
igd.PortChannelMap.Range(printPortChannel)
for _, groupAddr := range ipsList {
logger.Debugw(ctx, "Port Channels", log.Fields{"Port": port, "IPsList": ipsList, "GroupAddr": groupAddr, "Len": len(ipsList)})
igd.DelReceiver(groupAddr, port, nil, ponPortID)
}
if igd.NumReceivers() == 0 {
ig.DelIgmpGroupDevice(igd)
}
}
// DelReceiver deletes a receiver which is the combination of device (OLT)
// and port
func (ig *IgmpGroup) DelReceiver(device string, port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port, "GroupIP": groupAddr.String()})
if igd, ok := ig.Devices[device]; ok {
//igd.DelReceiverForGroupAddr(groupAddr, port)
igd.DelReceiver(groupAddr, port, group, ponPortID)
if igd.NumReceivers() == 0 {
ig.DelIgmpGroupDevice(igd)
}
}
}
// GetAllIgmpChannelForDevice - Returns all channels with active members associated to the Igmp Group for the given device
func (ig *IgmpGroup) GetAllIgmpChannelForDevice(deviceID string) map[string]string {
if deviceID == "" {
return ig.GetAllIgmpChannel()
}
allChannels := make(map[string]string)
igd := ig.Devices[deviceID]
getAllChannels := func(key interface{}, value interface{}) bool {
channels := key.(string)
allChannels[channels] = channels //same value as only key is required
return true
}
igd.GroupChannels.Range(getAllChannels)
return allChannels
}
// GetAllIgmpChannel - Returns all channels with active members associated to the Igmp Group
func (ig *IgmpGroup) GetAllIgmpChannel() map[string]string {
allChannels := make(map[string]string)
for _, igd := range ig.Devices {
getAllChannels := func(key interface{}, value interface{}) bool {
channels := key.(string)
allChannels[channels] = channels
return true
}
igd.GroupChannels.Range(getAllChannels)
}
return allChannels
}
// DelIgmpChannel deletes all receivers for the provided igmp group channel for the given device
func (ig *IgmpGroup) DelIgmpChannel(deviceID string, groupAddr net.IP) map[string]*IgmpGroupPort {
logger.Infow(ctx, "Deleting Channel from devices", log.Fields{"Device": deviceID, "Group": ig.GroupName, "Channel": groupAddr.String()})
if deviceID == "" {
for device := range ig.Devices {
ig.DelIgmpChannel(device, groupAddr)
}
return nil
}
igd := ig.Devices[deviceID]
receivers := igd.DelChannelReceiver(groupAddr)
if igd.NumReceivers() == 0 {
ig.DelIgmpGroupDevice(igd)
}
return receivers
}
// IsNewReceiver checks if the received port is new receiver or existing one.
// Returns true if new receiver.
func (ig *IgmpGroup) IsNewReceiver(device, uniPortID string, groupAddr net.IP) bool {
if ig == nil {
// IGMP group does not exists. So considering it as new receiver.
return true
}
logger.Debugw(ctx, "IGMP Group", log.Fields{"channel": groupAddr, "groupName": ig.GroupName}) // TODO: Remove me
igd, exists := ig.Devices[device]
if !exists || !igd.GroupInstalled {
// IGMP group not exists OR Group is not created in the device.
// So this is a new receiver.
logger.Debugw(ctx, "igd not exists or group is not created in device", log.Fields{"exists": exists}) // TODO: Remove me
return true
}
if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
logger.Debugw(ctx, "IGMP Channel receivers", log.Fields{"igc-receivers": igc.(*IgmpGroupChannel).CurReceivers}) // TODO: Remove me
_, rcvrExistCur := igc.(*IgmpGroupChannel).CurReceivers[uniPortID]
_, rcvrExistNew := igc.(*IgmpGroupChannel).NewReceivers[uniPortID]
if rcvrExistCur || rcvrExistNew {
// Existing receiver
return false
}
}
return true
}
// Tick for Addition of groups to an MVLAN profile
func (ig *IgmpGroup) Tick() {
now := time.Now()
for _, igd := range ig.Devices {
var igdChangeCnt uint8
if _, ok := GetApplication().DevicesDisc.Load(igd.Device); !ok {
logger.Info(ctx, "Skipping Query and Expiry check since Device is unavailable")
continue
}
if now.After(igd.NextQueryTime) {
// Set the next query time and the query expiry time to
// KeepAliveInterval and MaxResp seconds after current time
igd.NextQueryTime = now.Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
igd.QueryExpiryTime = now.Add(time.Duration(igd.proxyCfg.MaxResp) * time.Second)
logger.Debugw(ctx, "Query Start", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
igdChangeCnt++
logger.Debugw(ctx, "Sending Query to device", log.Fields{"Device": igd.Device})
sendQueryForAllChannels := func(key interface{}, value interface{}) bool {
igc := value.(*IgmpGroupChannel)
//TODO - Do generic query to avoid multiple msgs
igc.SendQuery()
return true
}
igd.GroupChannels.Range(sendQueryForAllChannels)
}
if now.After(igd.QueryExpiryTime) {
igd.QueryExpiry()
// This will keep it quiet till the next query time and then
// it will be reset to a value after the query initiation time
igd.QueryExpiryTime = igd.NextQueryTime
logger.Debugw(ctx, "Expiry", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
igdChangeCnt++
if igd.NumReceivers() == 0 {
ig.DelIgmpGroupDevice(igd)
continue
}
}
igdChangeCnt += igd.Tick()
if igdChangeCnt > 0 {
if err := igd.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
"GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
}
}
}
// QueryExpiry processes expiry of query sent to the receivers. Up on
// expiry, process the consolidated response for each of the devices participating
// in the MC stream. When a device has no receivers, the device is deleted
// from the group.
func (ig *IgmpGroup) QueryExpiry() {
for _, igd := range ig.Devices {
if _, ok := GetApplication().DevicesDisc.Load(igd.Device); ok {
igd.QueryExpiry()
if igd.NumReceivers() == 0 {
ig.DelIgmpGroupDevice(igd)
}
} else {
logger.Info(ctx, "Skipping Expiry since Device is unavailable")
}
}
}
// Hash : The IGMP group hash is used to distribute the processing of timers so that
// the processing is spread across doesn't spike at one instant. This also
// ensures that there is sufficient responsiveness to other requests happening
// simultaneously.
func (ig *IgmpGroup) Hash() uint16 {
mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
if mvp == nil {
return 0
}
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
group := mvp.Groups[ig.GroupName]
//Case where mvlan update in-progress
if group == nil || len(group.McIPs) == 0 {
return 0
}
groupIP := group.McIPs[0]
return uint16(groupIP[2])<<8 + uint16(groupIP[3])
}
// NumDevicesAll returns the number of devices (OLT) active on the IGMP group. When
// the last device leaves the IGMP group is removed. If this is not done,
// the number of IGMP groups only keep increasing and can impact CPU when
// the system runs for a very long duration
func (ig *IgmpGroup) NumDevicesAll() int {
return len(ig.Devices)
}
// NumDevicesActive returns the number of devices (OLT) active on the IGMP group. When
// the last device leaves the IGMP group is removed. If this is not done,
// the number of IGMP groups only keep increasing and can impact CPU when
// the system runs for a very long duration
func (ig *IgmpGroup) NumDevicesActive() int {
count := 0
for _, igd := range ig.Devices {
if igd.NumReceivers() == 0 && igd.GroupInstalled {
continue
}
count++
}
return count
}
// NumReceivers to return receiver list
func (ig *IgmpGroup) NumReceivers() map[string]int {
receiverList := make(map[string]int)
for device, igd := range ig.Devices {
receiverList[device] = igd.NumReceivers()
}
return receiverList
}
// RestoreDevices : IGMP group write to DB
func (ig *IgmpGroup) RestoreDevices() {
ig.migrateIgmpDevices()
devices, _ := db.GetIgmpDevices(ig.Mvlan, ig.GroupName, ig.GroupAddr)
for _, device := range devices {
b, ok := device.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
if igd, err := NewIgmpGroupDeviceFromBytes(b); err == nil {
igd.PonPortChannelMap = util.NewConcurrentMap()
// Update the proxy config pointers.
var mcastCfg *McastConfig
igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
if mcastCfg != nil {
mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
}
mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
// During vgc upgrade from old version, igd.NextQueryTime and igd.QueryExpiryTime will not be present in db.
// hence they are initialized with current time offset.
emptyTime := time.Time{}
if emptyTime == igd.NextQueryTime {
logger.Debugw(ctx, "VGC igd upgrade", log.Fields{"igd grp name": igd.GroupName})
igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
if err := igd.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
"GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
}
}
ig.Devices[igd.Device] = igd
if ig.IsChannelBasedGroup {
channel, _ := db.GetIgmpChannel(igd.Mvlan, igd.GroupName, igd.Device, igd.GroupAddr)
igd.RestoreChannel([]byte(channel))
} else {
igd.RestoreChannels()
}
igd.PortChannelMap.Range(printPortChannel)
logger.Infow(ctx, "Group Device Restored", log.Fields{"IGD": igd})
} else {
logger.Warnw(ctx, "Unable to decode device from database", log.Fields{"str": string(b)})
}
}
}
// getKey to return group key
func (ig *IgmpGroup) getKey() string {
profile, ok := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
if ok {
mvp := profile.(*MvlanProfile)
return mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
}
return ""
}
/*
// getKey to return group key
func (igd *IgmpGroupDevice) getKey() string {
profile, ok := GetApplication().MvlanProfilesByTag.Load(igd.Mvlan)
if ok {
mvp := profile.(*MvlanProfile)
return mvp.generateGroupKey(igd.GroupName, igd.GroupAddr.String())
}
return ""
}*/
// generateGroupKey to generate group key
func (mvp *MvlanProfile) generateGroupKey(name string, ipAddr string) string {
if mvp.IsChannelBasedGroup {
return mvp.Mvlan.String() + "_" + ipAddr
}
return mvp.Mvlan.String() + "_" + name
}
// WriteToDb is utility to write Igmp Group Info to database
func (ig *IgmpGroup) WriteToDb() error {
ig.Version = database.PresentVersionMap[database.IgmpGroupPath]
b, err := json.Marshal(ig)
if err != nil {
return err
}
if err1 := db.PutIgmpGroup(ig.getKey(), string(b)); err1 != nil {
return err1
}
return nil
}
// RestoreIgmpGroupsFromDb to restore igmp groups from database
func (va *VoltApplication) RestoreIgmpGroupsFromDb() {
groups, _ := db.GetIgmpGroups()
for _, group := range groups {
b, ok := group.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
var ig IgmpGroup
err := json.Unmarshal(b, &ig)
if err != nil {
logger.Warn(ctx, "Unmarshal of IGMP Group failed")
continue
}
ig.Devices = make(map[string]*IgmpGroupDevice)
//For Upgrade Case
if len(ig.PendingGroupForDevice) == 0 {
ig.PendingGroupForDevice = make(map[string]time.Time)
}
logger.Infow(ctx, "Restoring Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan})
grpKey := ig.getKey()
va.IgmpGroups.Store(grpKey, &ig)
// Just delete and lose the IGMP group with the same group Id
if _, err := va.GetIgmpGroupID(ig.GroupID); err != nil {
logger.Warnw(ctx, "GetIgmpGroupID Failed", log.Fields{"igGroupID": ig.GroupID, "Error": err})
}
ig.RestoreDevices()
if ig.NumDevicesActive() == 0 {
va.AddGroupToPendingPool(&ig)
}
logger.Infow(ctx, "Restored Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan})
}
}
// AddIgmpGroup : When the first IGMP packet is received, the MVLAN profile is identified
// for the IGMP group and grp obj is obtained from the available pending pool of groups.
// If not, new group obj will be created based on available group IDs
func (va *VoltApplication) AddIgmpGroup(mvpName string, gip net.IP, device string) *IgmpGroup {
var ig *IgmpGroup
if mvp, grpName := va.GetMvlanProfileForMcIP(mvpName, gip); mvp != nil {
if ig = va.GetGroupFromPendingPool(mvp.Mvlan, device); ig != nil {
logger.Infow(ctx, "Igmp Group obtained from global pending pool", log.Fields{"MvlanProfile": mvpName, "GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
oldKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
ig.IgmpGroupReInit(grpName, gip)
ig.IsGroupStatic = mvp.Groups[grpName].IsStatic
ig.UpdateIgmpGroup(oldKey, ig.getKey())
} else {
logger.Infow(ctx, "No Igmp Group available in global pending pool. Creating new Igmp Group", log.Fields{"MvlanProfile": mvpName, "Device": device, "GroupAddr": gip.String()})
if ig = va.GetAvailIgmpGroupID(); ig == nil {
logger.Error(ctx, "Igmp Group Creation Failed: Group Id Unavailable")
return nil
}
ig.IgmpGroupInit(grpName, gip, mvp)
grpKey := ig.getKey()
va.IgmpGroups.Store(grpKey, ig)
}
if err := ig.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
return ig
}
logger.Errorw(ctx, "GetMvlan Pro failed", log.Fields{"Group": gip})
return nil
}
// GetIgmpGroup helps search for the IGMP group from the list of
// active IGMP groups. For now, the assumption is that a group
// cannot belong to more than on MVLAN. If we change that definition,
// we have to take a relook at this implementation. The key will include
// both MVLAN and the group IP.
func (va *VoltApplication) GetIgmpGroup(mvlan of.VlanType, gip net.IP) *IgmpGroup {
profile, _ := va.MvlanProfilesByTag.Load(mvlan)
if profile == nil {
logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": mvlan, "GroupAddr": gip.String()})
return nil
}
mvp := profile.(*MvlanProfile)
_, gName := va.GetMvlanProfileForMcIP(mvp.Name, gip)
grpKey := mvp.generateGroupKey(gName, gip.String())
logger.Debugw(ctx, "Get IGMP Group", log.Fields{"Group": grpKey})
igIntf, ok := va.IgmpGroups.Load(grpKey)
if ok {
logger.Debugw(ctx, "Get IGMP Group Success", log.Fields{"Group": grpKey})
ig := igIntf.(*IgmpGroup)
//Case: Group was part of pending and Join came with same channel or different channel from same group
// (from same or different device)
// In that case, the same group will be allocated since the group is still part of va.IgmpGroups
// So, the groups needs to be removed from global pending pool
va.RemoveGroupDevicesFromPendingPool(ig)
return ig
}
return nil
}
// GetStaticGroupName to get static igmp group
func (mvp *MvlanProfile) GetStaticGroupName(gip net.IP) string {
for _, mvg := range mvp.Groups {
if mvg.IsStatic {
if doesIPMatch(gip, mvg.McIPs) {
return mvg.Name
}
}
}
return ""
}
// GetStaticIgmpGroup to get static igmp group
func (mvp *MvlanProfile) GetStaticIgmpGroup(gip net.IP) *IgmpGroup {
staticGroupName := mvp.GetStaticGroupName(gip)
grpKey := mvp.generateGroupKey(staticGroupName, gip.String())
logger.Debugw(ctx, "Get Static IGMP Group", log.Fields{"Group": grpKey})
ig, ok := GetApplication().IgmpGroups.Load(grpKey)
if ok {
logger.Debugw(ctx, "Get Static IGMP Group Success", log.Fields{"Group": grpKey})
return ig.(*IgmpGroup)
}
return nil
}
// UpdateIgmpGroup : When the pending group is allocated to new
func (ig *IgmpGroup) UpdateIgmpGroup(oldKey, newKey string) {
//If the group is allocated to same McastGroup, no need to update the
//IgmpGroups map
if oldKey == newKey {
return
}
logger.Infow(ctx, "Updating Igmp Group with new MVP Group Info", log.Fields{"OldKey": oldKey, "NewKey": newKey, "GroupID": ig.GroupID})
GetApplication().IgmpGroups.Delete(oldKey)
_ = db.DelIgmpGroup(oldKey)
GetApplication().IgmpGroups.Store(newKey, ig)
if err := ig.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
}
// DelIgmpGroup : When the last subscriber leaves the IGMP group across all the devices
// the IGMP group is removed.
func (va *VoltApplication) DelIgmpGroup(ig *IgmpGroup) {
profile, found := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
if found {
mvp := profile.(*MvlanProfile)
grpKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
if igIntf, ok := va.IgmpGroups.Load(grpKey); ok {
ig := igIntf.(*IgmpGroup)
ig.IgmpGroupLock.Lock()
if ig.NumDevicesAll() == 0 {
logger.Debugw(ctx, "Deleting IGMP Group", log.Fields{"Group": grpKey})
va.PutIgmpGroupID(ig)
va.IgmpGroups.Delete(grpKey)
_ = db.DelIgmpGroup(grpKey)
} else {
logger.Infow(ctx, "Skipping IgmpGroup Device. Pending Igmp Group Devices present", log.Fields{"GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
va.AddGroupToPendingPool(ig)
if err := ig.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
}
ig.IgmpGroupLock.Unlock()
}
}
}
// GetPonPortID Gets the PON port ID from uniPortID
func (va *VoltApplication) GetPonPortID(device, uniPortID string) uint32 {
isNNI := strings.Contains(uniPortID, "nni")
if isNNI || uniPortID == StaticPort {
logger.Debugw(ctx, "Cannot get pon port from UNI port", log.Fields{"port": uniPortID})
return 0xFF
}
dIntf, ok := va.DevicesDisc.Load(device)
if !ok {
return 0xFF
}
d := dIntf.(*VoltDevice)
uniPort := d.GetPort(uniPortID)
if uniPort == nil {
return 0xFF
}
return GetPonPortIDFromUNIPort(uniPort.ID)
}
// AggActiveChannelsCountPerSub aggregates the active channel count for given uni port.
// It will iterate over all the groups and store the sum of active channels in VoltPort.
func (va *VoltApplication) AggActiveChannelsCountPerSub(device, uniPort string, port *VoltPort) {
var activeChannelCount uint32
collectActiveChannelCount := func(key interface{}, value interface{}) bool {
ig := value.(*IgmpGroup)
igd := ig.Devices[device]
if igd == nil {
return true
}
if portChannels, ok := igd.PortChannelMap.Load(uniPort); ok {
channelList := portChannels.([]net.IP)
activeChannelCount += uint32(len(channelList))
}
return true
}
va.IgmpGroups.Range(collectActiveChannelCount)
logger.Debugw(ctx, "AggrActiveChannelCount for Subscriber",
log.Fields{"UniPortID": uniPort, "count": activeChannelCount})
port.ActiveChannels = activeChannelCount
}
// AggActiveChannelsCountForPonPort Aggregates the active channel count for given pon port.
// It will iterate over all the groups and store the sum of active channels in VoltDevice.
func (va *VoltApplication) AggActiveChannelsCountForPonPort(device string, ponPortID uint32, port *PonPortCfg) {
var activeChannelCount uint32
collectActiveChannelCount := func(key interface{}, value interface{}) bool {
ig := value.(*IgmpGroup)
igd := ig.Devices[device]
if igd == nil {
return true
}
if ponPortChannels, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
activeChannelCount += ponPortChannels.(*PonPortChannels).GetActiveChannelCount()
}
return true
}
va.IgmpGroups.Range(collectActiveChannelCount)
logger.Debugw(ctx, "AggrActiveChannelCount for Pon Port",
log.Fields{"PonPortID": ponPortID, "count": activeChannelCount})
port.ActiveIGMPChannels = activeChannelCount
}
// UpdateActiveChannelCountForPonPort increments the global counter for active
// channel count per pon port.
func (va *VoltApplication) UpdateActiveChannelCountForPonPort(device, uniPortID string, ponPortID uint32, isAdd, isChannel bool, igd *IgmpGroupDevice) {
incrDecr := func(value uint32) uint32 {
if isAdd {
return value + 1
}
return value - 1
}
if d, exists := va.DevicesDisc.Load(device); exists {
voltDevice := d.(*VoltDevice)
if isChannel {
voltDevice.ActiveChannelCountLock.Lock()
// If New channel is added/deleted, then only update the ActiveChannelsPerPon
if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok {
port := value.(*PonPortCfg)
port.ActiveIGMPChannels = incrDecr(port.ActiveIGMPChannels)
voltDevice.ActiveChannelsPerPon.Store(ponPortID, port)
logger.Debugw(ctx, "+++ActiveChannelsPerPon", log.Fields{"count": port.ActiveIGMPChannels}) // TODO: remove me
}
voltDevice.ActiveChannelCountLock.Unlock()
}
if uPort, ok := voltDevice.Ports.Load(uniPortID); ok {
uniPort := uPort.(*VoltPort)
uniPort.ActiveChannels = incrDecr(uniPort.ActiveChannels)
voltDevice.Ports.Store(uniPortID, uniPort)
logger.Debugw(ctx, "+++ActiveChannelsPerSub", log.Fields{"count": uniPort.ActiveChannels}) // TODO: remove me
}
}
}
// IsMaxChannelsCountExceeded checks if the PON port active channel
// capacity and subscriber level channel capacity is reached to max allowed
// channel per pon threshold. If Exceeds, return true else return false.
func (va *VoltApplication) IsMaxChannelsCountExceeded(device, uniPortID string,
ponPortID uint32, ig *IgmpGroup, channelIP net.IP, mvp *MvlanProfile) bool {
// New receiver check is required to identify the IgmpReportMsg received
// in response to the IGMP Query sent from VGC.
if newReceiver := ig.IsNewReceiver(device, uniPortID, channelIP); !newReceiver {
logger.Debugw(ctx, "Not a new receiver. It is a response to IGMP Query",
log.Fields{"port": uniPortID, "channel": channelIP})
return false
}
if vDev, exists := va.DevicesDisc.Load(device); exists {
voltDevice := vDev.(*VoltDevice)
// Checking subscriber active channel count with maxChannelsAllowedPerSub
if uniPort, present := voltDevice.Ports.Load(uniPortID); present {
if uniPort.(*VoltPort).ActiveChannels >= mvp.MaxActiveChannels {
logger.Errorw(ctx, "Max allowed channels per subscriber is exceeded",
log.Fields{"activeCount": uniPort.(*VoltPort).ActiveChannels, "channel": channelIP, "UNI": uniPort.(*VoltPort).Name})
if !(uniPort.(*VoltPort).ChannelPerSubAlarmRaised) {
serviceName := GetMcastServiceForSubAlarm(uniPort.(*VoltPort), mvp)
logger.Debugw(ctx, "Raising-SendActiveChannelPerSubscriberAlarm-Initiated", log.Fields{"ActiveChannels": uniPort.(*VoltPort).ActiveChannels, "ServiceName": serviceName})
uniPort.(*VoltPort).ChannelPerSubAlarmRaised = true
}
return true
}
} else {
logger.Errorw(ctx, "UNI port not found in VoltDevice", log.Fields{"uniPortID": uniPortID})
}
if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok {
ponPort := value.(*PonPortCfg)
logger.Debugw(ctx, "----Active channels count for PON port",
log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels,
"maxAllowedChannelsPerPon": ponPort.MaxActiveChannels})
if ponPort.ActiveIGMPChannels < ponPort.MaxActiveChannels {
// PON port active channel capacity is not yet reached to max allowed channels per pon.
// So allowing to add receiver.
return false
} else if ponPort.ActiveIGMPChannels >= ponPort.MaxActiveChannels && ig != nil {
// PON port active channel capacity is reached to max allowed channels per pon.
// Check if same channel is already configured on that PON port.
// If that channel is present, then allow AddReceiver else it will be rejected.
igd, isPresent := ig.Devices[device]
if isPresent {
if channelListForPonPort, _ := igd.PonPortChannelMap.Get(ponPortID); channelListForPonPort != nil {
if _, isExists := channelListForPonPort.(*PonPortChannels).ChannelList.Get(channelIP.String()); isExists {
return false
}
}
}
}
logger.Errorw(ctx, "Active channels count for PON port exceeded",
log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels, "channel": channelIP, "UNI": uniPortID})
} else {
logger.Warnw(ctx, "PON port level active channel count does not exists",
log.Fields{"ponPortID": ponPortID})
return false
}
}
logger.Warnw(ctx, "Max allowed channels per pon threshold is reached", log.Fields{"PonPortID": ponPortID})
return true
}
// ProcessIgmpv2Pkt : This is IGMPv2 packet.
func (va *VoltApplication) ProcessIgmpv2Pkt(device string, port string, pkt gopacket.Packet) {
// First get the layers of interest
dot1Q := pkt.Layer(layers.LayerTypeDot1Q).(*layers.Dot1Q)
pktVlan := of.VlanType(dot1Q.VLANIdentifier)
igmpv2 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMPv1or2)
ponPortID := va.GetPonPortID(device, port)
var vpv *VoltPortVnet
logger.Debugw(ctx, "Received IGMPv2 Type", log.Fields{"Type": igmpv2.Type})
if igmpv2.Type == layers.IGMPMembershipReportV2 || igmpv2.Type == layers.IGMPMembershipReportV1 {
logger.Infow(ctx, "IGMP Join received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port})
// This is a report coming from the PON. We must be able to first find the
// subscriber from the VLAN tag and port and verify if the IGMP proxy is
// enabled for the subscriber
vpv, _ = va.GetVnetFromPkt(device, port, pkt)
if vpv == nil {
logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
return
} else if !vpv.IgmpEnabled {
logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
return
}
mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
if mvp == nil {
logger.Errorw(ctx, "Igmp Packet Received for Subscriber with Missing Mvlan Profile",
log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName})
return
}
mvlan := mvp.Mvlan
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
// The subscriber is validated and now process the IGMP report
ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress)
if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, igmpv2.GroupAddress, mvp); yes {
logger.Warnw(ctx, "Dropping IGMP Join v2: Active channel threshold exceeded",
log.Fields{"PonPortID": ponPortID, "Addr": igmpv2.GroupAddress, "MvlanProfile": vpv.MvlanProfileName})
return
}
if ig != nil {
logger.Infow(ctx, "IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices})
// If the IGMP group is already created. just add the receiver
ig.IgmpGroupLock.Lock()
// Check for port state to avoid race condition where PortDown event
// acquired lock before packet processing
vd := GetApplication().GetDevice(device)
vp := vd.GetPort(port)
if vp == nil || vp.State != PortStateUp {
logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
log.Fields{"Port": port})
ig.IgmpGroupLock.Unlock()
return
}
ig.AddReceiver(device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
ig.IgmpGroupLock.Unlock()
} else {
// Create the IGMP group and then add the receiver to the group
if ig := va.AddIgmpGroup(vpv.MvlanProfileName, igmpv2.GroupAddress, device); ig != nil {
logger.Infow(ctx, "New IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices})
ig.IgmpGroupLock.Lock()
// Check for port state to avoid race condition where PortDown event
// acquired lock before packet processing
vd := GetApplication().GetDevice(device)
vp := vd.GetPort(port)
if vp == nil || vp.State != PortStateUp {
logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
log.Fields{"Port": port})
ig.IgmpGroupLock.Unlock()
return
}
ig.AddReceiver(device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
ig.IgmpGroupLock.Unlock()
} else {
logger.Errorw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": igmpv2.GroupAddress})
return
}
}
} else if igmpv2.Type == layers.IGMPLeaveGroup {
// This is a IGMP leave coming from one of the receivers. We essentially remove the
// the receiver.
logger.Infow(ctx, "IGMP Leave received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port})
vpv, _ = va.GetVnetFromPkt(device, port, pkt)
if vpv == nil {
logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
return
} else if !vpv.IgmpEnabled {
logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
return
}
mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
mvlan := mvp.Mvlan
// The subscriber is validated and now process the IGMP report
if ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress); ig != nil {
ig.IgmpGroupLock.Lock()
// Delete the receiver once the IgmpGroup is identified
ig.DelReceiver(device, port, igmpv2.GroupAddress, nil, ponPortID)
ig.IgmpGroupLock.Unlock()
if ig.NumDevicesActive() == 0 {
va.DelIgmpGroup(ig)
}
}
} else {
// This must be a query on the NNI port. However, we dont make that assumption.
// Need to look for the IGMP group based on the VLAN in the packet as
// the MVLAN
//Check if mvlan profile exist for the incoming pkt vlan
profile, _ := va.MvlanProfilesByTag.Load(pktVlan)
if profile == nil {
logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan})
return
}
mvp := profile.(*MvlanProfile)
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
if net.ParseIP("0.0.0.0").Equal(igmpv2.GroupAddress) {
va.processIgmpQueries(device, pktVlan, IgmpVersion2)
} else {
if ig := va.GetIgmpGroup(pktVlan, igmpv2.GroupAddress); ig != nil {
ig.IgmpGroupLock.Lock()
igd, ok := ig.Devices[device]
if ok {
igd.ProcessQuery(igmpv2.GroupAddress, IgmpVersion2)
} else {
logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv2.GroupAddress})
}
ig.IgmpGroupLock.Unlock()
}
}
}
}
// ProcessIgmpv3Pkt : Process IGMPv3 packet
func (va *VoltApplication) ProcessIgmpv3Pkt(device string, port string, pkt gopacket.Packet) {
// First get the layers of interest
dot1QLayer := pkt.Layer(layers.LayerTypeDot1Q)
if dot1QLayer == nil {
logger.Error(ctx, "Igmp Packet Received without Vlan - Dropping pkt")
return
}
dot1Q := dot1QLayer.(*layers.Dot1Q)
pktVlan := of.VlanType(dot1Q.VLANIdentifier)
igmpv3 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMP)
ponPortID := va.GetPonPortID(device, port)
var vpv *VoltPortVnet
logger.Debugw(ctx, "Received IGMPv3 Type", log.Fields{"Type": igmpv3.Type})
if igmpv3.Type == layers.IGMPMembershipReportV3 {
// This is a report coming from the PON. We must be able to first find the
// subscriber from the VLAN tag and port and verify if the IGMP proxy is
// enabled for the subscriber
vpv, _ = va.GetVnetFromPkt(device, port, pkt)
if vpv == nil {
logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
return
} else if !vpv.IgmpEnabled {
logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
return
}
mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
if mvp == nil {
logger.Errorw(ctx, "Igmp Packet received for Subscriber with Missing Mvlan Profile",
log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName})
return
}
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
mvlan := mvp.Mvlan
for _, group := range igmpv3.GroupRecords {
isJoin := isIgmpJoin(group.Type, group.SourceAddresses)
// The subscriber is validated and now process the IGMP report
ig := va.GetIgmpGroup(mvlan, group.MulticastAddress)
if isJoin {
if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, group.MulticastAddress, mvp); yes {
logger.Warnw(ctx, "Dropping IGMP Join v3: Active channel threshold exceeded",
log.Fields{"PonPortID": ponPortID, "Addr": group.MulticastAddress, "MvlanProfile": vpv.MvlanProfileName})
return
}
if ig != nil {
// If the IGMP group is already created. just add the receiver
logger.Infow(ctx, "IGMP Join received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
ig.IgmpGroupLock.Lock()
// Check for port state to avoid race condition where PortDown event
// acquired lock before packet processing
vd := GetApplication().GetDevice(device)
vp := vd.GetPort(port)
if vp == nil || vp.State != PortStateUp {
logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
log.Fields{"Port": port})
ig.IgmpGroupLock.Unlock()
return
}
ig.AddReceiver(device, port, group.MulticastAddress, &group, IgmpVersion3,
dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
ig.IgmpGroupLock.Unlock()
} else {
// Create the IGMP group and then add the receiver to the group
logger.Infow(ctx, "IGMP Join received for new group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
if ig := va.AddIgmpGroup(vpv.MvlanProfileName, group.MulticastAddress, device); ig != nil {
ig.IgmpGroupLock.Lock()
// Check for port state to avoid race condition where PortDown event
// acquired lock before packet processing
vd := GetApplication().GetDevice(device)
vp := vd.GetPort(port)
if vp == nil || vp.State != PortStateUp {
logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
log.Fields{"Port": port})
ig.IgmpGroupLock.Unlock()
return
}
ig.AddReceiver(device, port, group.MulticastAddress, &group, IgmpVersion3,
dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
ig.IgmpGroupLock.Unlock()
} else {
logger.Warnw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": group.MulticastAddress})
}
}
} else if ig != nil {
logger.Infow(ctx, "IGMP Leave received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
ig.IgmpGroupLock.Lock()
ig.DelReceiver(device, port, group.MulticastAddress, &group, ponPortID)
ig.IgmpGroupLock.Unlock()
if ig.NumDevicesActive() == 0 {
va.DelIgmpGroup(ig)
}
} else {
logger.Warnw(ctx, "IGMP Leave received for unknown group", log.Fields{"Addr": group.MulticastAddress})
}
}
} else {
// This must be a query on the NNI port. However, we dont make that assumption.
// Need to look for the IGMP group based on the VLAN in the packet as
// the MVLAN
//Check if mvlan profile exist for the incoming pkt vlan
profile, _ := va.MvlanProfilesByTag.Load(pktVlan)
if profile == nil {
logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan})
return
}
mvp := profile.(*MvlanProfile)
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
if net.ParseIP("0.0.0.0").Equal(igmpv3.GroupAddress) {
va.processIgmpQueries(device, pktVlan, IgmpVersion3)
} else {
if ig := va.GetIgmpGroup(pktVlan, igmpv3.GroupAddress); ig != nil {
ig.IgmpGroupLock.Lock()
igd, ok := ig.Devices[device]
if ok {
igd.ProcessQuery(igmpv3.GroupAddress, IgmpVersion3)
} else {
logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv3.GroupAddress})
}
ig.IgmpGroupLock.Unlock()
}
}
}
}
// processIgmpQueries to process the igmp queries
func (va *VoltApplication) processIgmpQueries(device string, pktVlan of.VlanType, version uint8) {
// This is a generic query and respond with all the groups channels in currently being viewed.
processquery := func(key interface{}, value interface{}) bool {
ig := value.(*IgmpGroup)
ig.IgmpGroupLock.Lock()
if ig.Mvlan != pktVlan {
ig.IgmpGroupLock.Unlock()
return true
}
igd, ok := ig.Devices[device]
if !ok {
logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device})
ig.IgmpGroupLock.Unlock()
return true
}
processQueryForEachChannel := func(key interface{}, value interface{}) bool {
groupAddr := key.(string)
igd.ProcessQuery(net.ParseIP(groupAddr), version)
return true
}
igd.GroupChannels.Range(processQueryForEachChannel)
ig.IgmpGroupLock.Unlock()
return true
}
va.IgmpGroups.Range(processquery)
}
// isIgmpJoin to check if it is igmp join
func isIgmpJoin(recordType layers.IGMPv3GroupRecordType, sourceAddr []net.IP) bool {
var join = false
if (layers.IGMPToEx == recordType) || (layers.IGMPIsEx == recordType) {
join = true
} else if layers.IGMPBlock == recordType {
if len(sourceAddr) == 0 {
join = true
}
} else if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) {
if len(sourceAddr) != 0 {
join = true
}
}
return join
}
func isIncl(recordType layers.IGMPv3GroupRecordType) bool {
if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) {
return true
}
return false
}
// IgmpProcessPkt to process the IGMP packet received. The packet received brings along with it
// the port on which the packet is received and the device the port is in.
func (va *VoltApplication) IgmpProcessPkt(device string, port string, pkt gopacket.Packet) {
igmpl := pkt.Layer(layers.LayerTypeIGMP)
if igmpl == nil {
logger.Error(ctx, "Invalid IGMP packet arrived as IGMP packet")
return
}
if igmp, ok := igmpl.(*layers.IGMPv1or2); ok {
// This is an IGMPv2 packet.
logger.Debugw(ctx, "IGMPv2 Packet Received", log.Fields{"IPAddr": igmp.GroupAddress})
va.ProcessIgmpv2Pkt(device, port, pkt)
return
}
if igmpv3, ok := igmpl.(*layers.IGMP); ok {
logger.Debugw(ctx, "IGMPv3 Packet Received", log.Fields{"NumOfGroups": igmpv3.NumberOfGroupRecords})
va.ProcessIgmpv3Pkt(device, port, pkt)
}
}
// IgmpPacketInd for igmp packet indication
func (va *VoltApplication) IgmpPacketInd(device string, port string, pkt gopacket.Packet) {
pt := NewIgmpPacketTask(device, port, pkt)
va.IgmpTasks.AddTask(pt)
}
// ------------------------------------------------------------
// MVLAN related implemnetation
//
// Each MVLAN is configured with groups of multicast IPs. The idea of
// groups is to be able to group some multicast channels into an individual
// PON group and have a unique multicast GEM port for that set. However, in
// the current implementation, the concept of grouping is not fully utilized.
// MvlanGroup structure
// A set of MC IPs form a group
// MCGroupProxy identifies source specific multicast(SSM) config.
type MCGroupProxy struct {
// Mode represents source list include/exclude
Mode common.MulticastSrcListMode
// SourceList represents list of multicast server IP addresses.
SourceList []net.IP
}
// MvlanGroup identifies MC group info
type MvlanGroup struct {
Name string
Wildcard bool
McIPs []string
IsStatic bool
}
// OperInProgress type
type OperInProgress uint8
const (
// UpdateInProgress constant
UpdateInProgress OperInProgress = 2
// NoOp constant
NoOp OperInProgress = 1
// Nil constant
Nil OperInProgress = 0
)
// MvlanProfile : A set of groups of MC IPs for a MVLAN profile. It is assumed that
// the MVLAN IP is not repeated within multiples groups and across
// MVLAN profiles. The first match is used up on search to lcoate the
// MVLAN profile for an MC IP
type MvlanProfile struct {
Name string
Mvlan of.VlanType
PonVlan of.VlanType
Groups map[string]*MvlanGroup
Proxy map[string]*MCGroupProxy
Version string
IsPonVlanPresent bool
IsChannelBasedGroup bool
DevicesList map[string]OperInProgress //device serial number //here
oldGroups map[string]*MvlanGroup
oldProxy map[string]*MCGroupProxy
MaxActiveChannels uint32
PendingDeleteFlow map[string]map[string]bool
DeleteInProgress bool
IgmpServVersion map[string]*uint8
mvpLock sync.RWMutex
mvpFlowLock sync.RWMutex
}
// NewMvlanProfile is constructor for MVLAN profile.
func NewMvlanProfile(name string, mvlan of.VlanType, ponVlan of.VlanType, isChannelBasedGroup bool, OLTSerialNums []string, actChannelPerPon uint32) *MvlanProfile {
var mvp MvlanProfile
mvp.Name = name
mvp.Mvlan = mvlan
mvp.PonVlan = ponVlan
mvp.mvpLock = sync.RWMutex{}
mvp.Groups = make(map[string]*MvlanGroup)
mvp.Proxy = make(map[string]*MCGroupProxy)
mvp.DevicesList = make(map[string]OperInProgress)
mvp.PendingDeleteFlow = make(map[string]map[string]bool)
mvp.IsChannelBasedGroup = isChannelBasedGroup
mvp.MaxActiveChannels = actChannelPerPon
mvp.DeleteInProgress = false
mvp.IgmpServVersion = make(map[string]*uint8)
if (ponVlan != of.VlanNone) && (ponVlan != 0) {
mvp.IsPonVlanPresent = true
}
return &mvp
}
// AddMvlanProxy for addition of groups to an MVLAN profile
func (mvp *MvlanProfile) AddMvlanProxy(name string, proxyInfo common.MulticastGroupProxy) {
proxy := &MCGroupProxy{}
proxy.Mode = proxyInfo.Mode
proxy.SourceList = util.GetExpIPList(proxyInfo.SourceList)
if _, ok := mvp.Proxy[name]; !ok {
logger.Debugw(ctx, "Added MVLAN Proxy", log.Fields{"Name": name, "Proxy": proxy})
} else {
logger.Debugw(ctx, "Updated MVLAN Proxy", log.Fields{"Name": name, "Proxy": proxy})
}
if proxyInfo.IsStatic == common.IsStaticYes {
mvp.Groups[name].IsStatic = true
}
mvp.Proxy[name] = proxy
}
// AddMvlanGroup for addition of groups to an MVLAN profile
func (mvp *MvlanProfile) AddMvlanGroup(name string, ips []string) {
mvg := &MvlanGroup{}
mvg.Name = name
mvg.Wildcard = len(ips) == 0
mvg.McIPs = ips
mvg.IsStatic = false
if _, ok := mvp.Groups[name]; !ok {
logger.Debugw(ctx, "Added MVLAN Group", log.Fields{"VLAN": mvp.Mvlan, "Name": name, "mvg": mvg, "IPs": mvg.McIPs})
} else {
logger.Debugw(ctx, "Updated MVLAN Group", log.Fields{"VLAN": mvp.Mvlan, "Name": name})
}
mvp.Groups[name] = mvg
}
// GetUsMatchVlan provides mvlan for US Match parameter
func (mvp *MvlanProfile) GetUsMatchVlan() of.VlanType {
if mvp.IsPonVlanPresent {
return mvp.PonVlan
}
return mvp.Mvlan
}
// WriteToDb is utility to write Mvlan Profile Info to database
func (mvp *MvlanProfile) WriteToDb() error {
if mvp.DeleteInProgress {
logger.Warnw(ctx, "Skipping Redis Update for MvlanProfile, MvlanProfile delete in progress", log.Fields{"Mvlan": mvp.Mvlan})
return nil
}
mvp.Version = database.PresentVersionMap[database.MvlanPath]
b, err := json.Marshal(mvp)
if err != nil {
return err
}
if err1 := db.PutMvlan(uint16(mvp.Mvlan), string(b)); err1 != nil {
return err1
}
return nil
}
//isChannelStatic - Returns true if the given channel is part of static group in the Mvlan Profile
func (mvp *MvlanProfile) isChannelStatic(channel net.IP) bool {
for _, mvg := range mvp.Groups {
if mvg.IsStatic {
if isChannelStatic := doesIPMatch(channel, mvg.McIPs); isChannelStatic {
return true
}
}
}
return false
}
//containsStaticChannels - Returns if any static channels is part of the Mvlan Profile
func (mvp *MvlanProfile) containsStaticChannels() bool {
for _, mvg := range mvp.Groups {
if mvg.IsStatic && len(mvg.McIPs) != 0 {
return true
}
}
return false
}
//getAllStaticChannels - Returns all static channels in the Mvlan Profile
func (mvp *MvlanProfile) getAllStaticChannels() ([]net.IP, bool) {
channelList := []net.IP{}
containsStatic := false
for _, mvg := range mvp.Groups {
if mvg.IsStatic {
staticChannels, _ := mvg.getAllChannels()
channelList = append(channelList, staticChannels...)
}
}
if len(channelList) > 0 {
containsStatic = true
}
return channelList, containsStatic
}
//getAllOldGroupStaticChannels - Returns all static channels in the Mvlan Profile
func (mvp *MvlanProfile) getAllOldGroupStaticChannels() ([]net.IP, bool) {
channelList := []net.IP{}
containsStatic := false
for _, mvg := range mvp.oldGroups {
if mvg.IsStatic {
staticChannels, _ := mvg.getAllChannels()
channelList = append(channelList, staticChannels...)
}
}
if len(channelList) > 0 {
containsStatic = true
}
return channelList, containsStatic
}
//getAllChannels - Returns all channels in the Mvlan Profile
func (mvg *MvlanGroup) getAllChannels() ([]net.IP, bool) {
channelList := []net.IP{}
if mvg == nil || len(mvg.McIPs) == 0 {
return []net.IP{}, false
}
grpChannelOrRange := mvg.McIPs
for _, channelOrRange := range grpChannelOrRange {
if strings.Contains(channelOrRange, "-") {
var splits = strings.Split(channelOrRange, "-")
ipStart := util.IP2LongConv(net.ParseIP(splits[0]))
ipEnd := util.IP2LongConv(net.ParseIP(splits[1]))
for i := ipStart; i <= ipEnd; i++ {
channelList = append(channelList, util.Long2ipConv(i))
}
} else {
channelList = append(channelList, net.ParseIP(channelOrRange))
}
}
return channelList, true
}
//SetUpdateStatus - Sets profile update status for devices
func (mvp *MvlanProfile) SetUpdateStatus(serialNum string, status OperInProgress) {
if serialNum != "" {
mvp.DevicesList[serialNum] = status
return
}
for srNo := range mvp.DevicesList {
mvp.DevicesList[srNo] = status
}
}
//isUpdateInProgress - checking is update is in progress for the mvlan profile
func (mvp *MvlanProfile) isUpdateInProgress() bool {
for srNo := range mvp.DevicesList {
if mvp.DevicesList[srNo] == UpdateInProgress {
return true
}
}
return false
}
//IsUpdateInProgressForDevice - Checks is Mvlan Profile update is is progress for the given device
func (mvp *MvlanProfile) IsUpdateInProgressForDevice(device string) bool {
if vd := GetApplication().GetDevice(device); vd != nil {
if mvp.DevicesList[vd.SerialNum] == UpdateInProgress {
return true
}
}
return false
}
// DelFromDb to delere mvlan from database
func (mvp *MvlanProfile) DelFromDb() {
_ = db.DelMvlan(uint16(mvp.Mvlan))
}
// storeMvlansMap to store mvlan map
func (va *VoltApplication) storeMvlansMap(mvlan of.VlanType, name string, mvp *MvlanProfile) {
va.MvlanProfilesByTag.Store(mvlan, mvp)
va.MvlanProfilesByName.Store(name, mvp)
}
// deleteMvlansMap to delete mvlan map
func (va *VoltApplication) deleteMvlansMap(mvlan of.VlanType, name string) {
va.MvlanProfilesByTag.Delete(mvlan)
va.MvlanProfilesByName.Delete(name)
}
// RestoreMvlansFromDb to read from the DB and restore all the MVLANs
func (va *VoltApplication) RestoreMvlansFromDb() {
mvlans, _ := db.GetMvlans()
for _, mvlan := range mvlans {
b, ok := mvlan.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
var mvp MvlanProfile
err := json.Unmarshal(b, &mvp)
if err != nil {
logger.Warn(ctx, "Unmarshal of MVLAN failed")
continue
}
va.storeMvlansMap(mvp.Mvlan, mvp.Name, &mvp)
for srNo := range mvp.DevicesList {
if mvp.IgmpServVersion[srNo] == nil {
servVersion := IgmpVersion0
mvp.IgmpServVersion[srNo] = &servVersion
}
}
logger.Infow(ctx, "Restored Mvlan Profile", log.Fields{"MVPName": mvp.Name})
}
}
// GetMvlanProfileByTag fetches MVLAN profile based on the MC VLAN
func (va *VoltApplication) GetMvlanProfileByTag(vlan of.VlanType) *MvlanProfile {
if mvp, ok := va.MvlanProfilesByTag.Load(vlan); ok {
return mvp.(*MvlanProfile)
}
return nil
}
// GetMvlanProfileByName fetches MVLAN profile based on the profile name.
func (va *VoltApplication) GetMvlanProfileByName(name string) *MvlanProfile {
if mvp, ok := va.MvlanProfilesByName.Load(name); ok {
return mvp.(*MvlanProfile)
}
return nil
}
//UpdateMvlanProfile - only channel groups be updated
func (va *VoltApplication) UpdateMvlanProfile(name string, vlan of.VlanType, groups map[string][]string, activeChannelCount int, proxy map[string]common.MulticastGroupProxy) error {
mvpIntf, ok := va.MvlanProfilesByName.Load(name)
if !ok {
logger.Error(ctx, "Update Mvlan Failed: Profile does not exist")
return errors.New("MVLAN profile not found")
}
mvp := mvpIntf.(*MvlanProfile)
// check if groups are same then just update the OLTSerial numbers, push the config on new serial numbers
existingGroup := mvp.Groups
existingProxy := mvp.Proxy
mvp.Groups = make(map[string]*MvlanGroup)
mvp.Proxy = make(map[string]*MCGroupProxy)
/* Need to protect groups and proxy write lock */
mvp.mvpLock.Lock()
for grpName, grpIPList := range groups {
mvp.AddMvlanGroup(grpName, grpIPList)
}
for grpName, proxyInfo := range proxy {
mvp.AddMvlanProxy(grpName, proxyInfo)
}
if _, ok := mvp.Groups[common.StaticGroup]; ok {
if _, yes := mvp.Proxy[common.StaticGroup]; !yes {
mvp.Groups[common.StaticGroup].IsStatic = true
}
}
prevMaxActiveChannels := mvp.MaxActiveChannels
if reflect.DeepEqual(mvp.Groups, existingGroup) && reflect.DeepEqual(mvp.Proxy, existingProxy) {
logger.Info(ctx, "No change in groups config")
if uint32(activeChannelCount) != mvp.MaxActiveChannels {
mvp.MaxActiveChannels = uint32(activeChannelCount)
if err := mvp.WriteToDb(); err != nil {
logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
}
if prevMaxActiveChannels != mvp.MaxActiveChannels {
mvp.UpdateActiveChannelSubscriberAlarm()
}
}
mvp.mvpLock.Unlock()
return nil
}
mvp.mvpLock.Unlock()
mvp.MaxActiveChannels = uint32(activeChannelCount)
// Status is maintained so that in the event of any crash or reboot during update,
// the recovery is possible once the pod is UP again
mvp.SetUpdateStatus("", UpdateInProgress)
mvp.oldGroups = existingGroup
mvp.oldProxy = existingProxy
va.storeMvlansMap(vlan, name, mvp)
if err := mvp.WriteToDb(); err != nil {
logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
}
if prevMaxActiveChannels != mvp.MaxActiveChannels {
mvp.UpdateActiveChannelSubscriberAlarm()
}
// The update task is added as part of Igm p task list, so that any parallel igmp pkt processing is avoided
// Until, the update operation is completed, the igmp pkt processing will be enqueued
updateTask := NewUpdateMvlanTask(mvp, "")
va.IgmpTasks.AddTask(updateTask)
return nil
}
// isDeviceInList to check if device is the list
func isDeviceInList(serialNum string, OLTSerialNums []string) bool {
for _, oltSerialNum := range OLTSerialNums {
if serialNum == oltSerialNum {
return true
}
}
return false
}
// McastConfigKey creates the key using the olt serial number and mvlan profile id
func McastConfigKey(oltSerialNum string, mvlanProfID string) string {
return oltSerialNum + "_" + mvlanProfID
}
// GetMcastConfig to get McastConfig Information by OLT and Mvlan Profile ID
func (va *VoltApplication) GetMcastConfig(oltSerialNum string, mvlanProfID string) *McastConfig {
if mc, ok := va.McastConfigMap.Load(McastConfigKey(oltSerialNum, mvlanProfID)); ok {
return mc.(*McastConfig)
}
return nil
}
func (va *VoltApplication) storeMcastConfig(oltSerialNum string, mvlanProfID string, mcastConfig *McastConfig) {
va.McastConfigMap.Store(McastConfigKey(oltSerialNum, mvlanProfID), mcastConfig)
}
func (va *VoltApplication) deleteMcastConfig(oltSerialNum string, mvlanProfID string) {
va.McastConfigMap.Delete(McastConfigKey(oltSerialNum, mvlanProfID))
}
// AddMcastConfig for addition of a MVLAN profile
func (va *VoltApplication) AddMcastConfig(MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error {
var mcastCfg *McastConfig
mcastCfg = va.GetMcastConfig(OltSerialNum, MvlanProfileID)
if mcastCfg == nil {
mcastCfg = &McastConfig{}
} else {
logger.Debugw(ctx, "Mcast Config already exists", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum,
"MVLAN Profile ID": mcastCfg.MvlanProfileID})
}
// Update all igds available
mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID)
if !ok {
return errors.New("MVLAN profile not found during add mcast config")
}
mvlan := mvpIntf.(*MvlanProfile).Mvlan
mcastCfg.OltSerialNum = OltSerialNum
mcastCfg.MvlanProfileID = MvlanProfileID
mcastCfg.IgmpProfileID = IgmpProfileID
mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP)
proxyCfg := va.getIgmpProfileMap(IgmpProfileID)
iterIgmpGroups := func(key interface{}, value interface{}) bool {
ig := value.(*IgmpGroup)
if ig.Mvlan != mvlan {
return true
}
for _, igd := range ig.Devices {
if igd.SerialNo != OltSerialNum {
continue
}
igd.proxyCfg = proxyCfg
if IgmpProfileID == "" {
igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP
} else {
igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP
}
mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
logger.Debugw(ctx, "Igd updated with proxyCfg and proxyIP", log.Fields{"name": igd.GroupName,
"IgmpProfileID": IgmpProfileID, "ProxyIP": mcastCfg.IgmpProxyIP})
}
return true
}
va.IgmpGroups.Range(iterIgmpGroups)
va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg)
if err := mcastCfg.WriteToDb(); err != nil {
logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID})
}
va.addOltToMvlan(MvlanProfileID, OltSerialNum)
return nil
}
func (va *VoltApplication) addOltToMvlan(MvlanProfileID string, OltSerialNum string) {
var mvp *MvlanProfile
if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok {
servVersion := IgmpVersion0
mvp = mvpIntf.(*MvlanProfile)
mvp.DevicesList[OltSerialNum] = NoOp
mvp.IgmpServVersion[OltSerialNum] = &servVersion
if err := mvp.WriteToDb(); err != nil {
logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
}
mvp.pushIgmpMcastFlows(OltSerialNum)
}
}
func (va *VoltApplication) delOltFromMvlan(MvlanProfileID string, OltSerialNum string) {
var mvp *MvlanProfile
if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok {
mvp = mvpIntf.(*MvlanProfile)
//Delete from mvp list
mvp.removeIgmpMcastFlows(OltSerialNum)
delete(mvp.DevicesList, OltSerialNum)
if err := mvp.WriteToDb(); err != nil {
logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
}
}
}
// DelMcastConfig for addition of a MVLAN profile
func (va *VoltApplication) DelMcastConfig(MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) {
va.delOltFromMvlan(MvlanProfileID, OltSerialNum)
va.deleteMcastConfig(OltSerialNum, MvlanProfileID)
_ = db.DelMcastConfig(McastConfigKey(OltSerialNum, MvlanProfileID))
if d := va.GetDeviceBySerialNo(OltSerialNum); d != nil {
if mvp := va.GetMvlanProfileByName(MvlanProfileID); mvp != nil {
va.RemoveGroupsFromPendingPool(d.Name, mvp.Mvlan)
}
}
}
// DelAllMcastConfig for deletion of all mcast config
func (va *VoltApplication) DelAllMcastConfig(OltSerialNum string) error {
deleteIndividualMcastConfig := func(key interface{}, value interface{}) bool {
mcastCfg := value.(*McastConfig)
if mcastCfg.OltSerialNum == OltSerialNum {
va.DelMcastConfig(mcastCfg.MvlanProfileID, mcastCfg.IgmpProfileID, mcastCfg.IgmpProxyIP.String(), mcastCfg.OltSerialNum)
}
return true
}
va.McastConfigMap.Range(deleteIndividualMcastConfig)
return nil
}
// UpdateMcastConfig for addition of a MVLAN profile
func (va *VoltApplication) UpdateMcastConfig(MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error {
mcastCfg := va.GetMcastConfig(OltSerialNum, MvlanProfileID)
if mcastCfg == nil {
logger.Warnw(ctx, "Mcast Config not found. Unable to update", log.Fields{"Mvlan Profile ID": MvlanProfileID, "OltSerialNum": OltSerialNum})
return nil
}
oldProfID := mcastCfg.IgmpProfileID
mcastCfg.IgmpProfileID = IgmpProfileID
mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP)
va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg)
// Update all igds
if oldProfID != mcastCfg.IgmpProfileID {
updateIgdProxyCfg := func(key interface{}, value interface{}) bool {
igd := value.(*IgmpGroupDevice)
igd.proxyCfg = va.getIgmpProfileMap(mcastCfg.IgmpProfileID)
if IgmpProfileID == "" {
igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP
} else {
igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP
}
return true
}
mcastCfg.IgmpGroupDevices.Range(updateIgdProxyCfg)
}
if err := mcastCfg.WriteToDb(); err != nil {
logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID})
}
return nil
}
// WriteToDb is utility to write Mcast config Info to database
func (mc *McastConfig) WriteToDb() error {
mc.Version = database.PresentVersionMap[database.McastConfigPath]
b, err := json.Marshal(mc)
if err != nil {
return err
}
if err1 := db.PutMcastConfig(McastConfigKey(mc.OltSerialNum, mc.MvlanProfileID), string(b)); err1 != nil {
return err1
}
return nil
}
// RestoreMcastConfigsFromDb to read from the DB and restore Mcast configs
func (va *VoltApplication) RestoreMcastConfigsFromDb() {
mcastConfigs, _ := db.GetMcastConfigs()
for hash, mcastConfig := range mcastConfigs {
b, ok := mcastConfig.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
var mc McastConfig
err := json.Unmarshal(b, &mc)
if err != nil {
logger.Warn(ctx, "Unmarshal of Mcast config failed")
continue
}
va.storeMcastConfig(mc.OltSerialNum, mc.MvlanProfileID, &mc)
logger.Infow(ctx, "Restored Mcast config", log.Fields{"OltSerialNum": mc.OltSerialNum, "MvlanProfileID": mc.MvlanProfileID, "hash": hash})
}
}
// AddMvlanProfile for addition of a MVLAN profile
func (va *VoltApplication) AddMvlanProfile(name string, mvlan of.VlanType, ponVlan of.VlanType,
groups map[string][]string, isChannelBasedGroup bool, OLTSerialNum []string, activeChannelsPerPon int, proxy map[string]common.MulticastGroupProxy) error {
var mvp *MvlanProfile
if mvp = va.GetMvlanProfileByTag(mvlan); mvp != nil {
logger.Errorw(ctx, "Duplicate MVLAN ID configured", log.Fields{"mvlan": mvlan})
return errors.New("MVLAN profile with same VLANID exists")
}
if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok {
mvp = mvpIntf.(*MvlanProfile)
for _, serialNum := range OLTSerialNum {
if mvp.DevicesList[serialNum] != Nil {
//This is backup restore scenario, just update the profile
logger.Info(ctx, "Add Mvlan : Profile Name already exists, update-the-profile")
return va.UpdateMvlanProfile(name, mvlan, groups, activeChannelsPerPon, proxy)
}
}
}
if mvp == nil {
mvp = NewMvlanProfile(name, mvlan, ponVlan, isChannelBasedGroup, OLTSerialNum, uint32(activeChannelsPerPon))
}
va.storeMvlansMap(mvlan, name, mvp)
/* Need to protect groups and proxy write lock */
mvp.mvpLock.Lock()
for grpName, grpInfo := range groups {
mvp.AddMvlanGroup(grpName, grpInfo)
}
for grpName, proxyInfo := range proxy {
mvp.AddMvlanProxy(grpName, proxyInfo)
}
if _, ok := mvp.Groups[common.StaticGroup]; ok {
if _, yes := mvp.Proxy[common.StaticGroup]; !yes {
mvp.Groups[common.StaticGroup].IsStatic = true
}
}
logger.Debugw(ctx, "Added MVLAN Profile", log.Fields{"MVLAN": mvp.Mvlan, "PonVlan": mvp.PonVlan, "Name": mvp.Name, "Grp IPs": mvp.Groups, "IsPonVlanPresent": mvp.IsPonVlanPresent})
mvp.mvpLock.Unlock()
if err := mvp.WriteToDb(); err != nil {
logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
}
return nil
}
//pushIgmpMcastFlows - Adds all IGMP related flows (generic DS flow & static group flows)
func (mvp *MvlanProfile) pushIgmpMcastFlows(OLTSerialNum string) {
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
if mvp.DevicesList[OLTSerialNum] == Nil {
logger.Infow(ctx, "Mvlan Profile not configure for device", log.Fields{"Device": OLTSerialNum, "Mvlan": mvp.Mvlan})
return
}
d := GetApplication().GetDeviceBySerialNo(OLTSerialNum)
if d == nil {
logger.Warnw(ctx, "Skipping Igmp & Mcast Flow processing: Device Not Found", log.Fields{"Device_SrNo": OLTSerialNum, "Mvlan": mvp.Mvlan})
return
}
p := d.GetPort(d.NniPort)
if p != nil && p.State == PortStateUp {
logger.Infow(ctx, "NNI Port Status is: UP & Vlan Enabled", log.Fields{"Device": d, "port": p})
//Push Igmp DS Control Flows
err := mvp.ApplyIgmpDSFlowForMvp(d.Name)
if err != nil {
logger.Errorw(ctx, "DS IGMP Flow Add Failed for device",
log.Fields{"Reason": err.Error(), "device": d.Name})
}
//Trigger Join for static channels
if channelList, containsStatic := mvp.getAllStaticChannels(); containsStatic {
mvp.ProcessStaticGroup(d.Name, channelList, true)
} else {
logger.Infow(ctx, "No Static Channels Present", log.Fields{"mvp": mvp.Name, "Mvlan": mvp.Mvlan})
}
}
}
/*
//pushIgmpMcastFlowsToAllOlt - Adds all IGMP related flows (generic DS flow & static group flows) to all OLTs
func (mvp *MvlanProfile) pushIgmpMcastFlowsToAllOlt() {
//for all devices apply igmp DS trap flow rules
pushIgmpFlows := func(key interface{}, value interface{}) bool {
d := value.(*VoltDevice)
p := d.GetPort(d.NniPort)
if p != nil && p.State == PortStateUp {
logger.Infow(ctx, "NNI Port Status is: UP & Vlan Enabled", log.Fields{"Device": d, "port": p})
//Push Igmp DS Control Flows
err := mvp.ApplyIgmpDSFlowForMvp(d.Name)
if err != nil {
logger.Errorw(ctx, "DS IGMP Flow Add Failed for device",
log.Fields{"Reason": err.Error(), "device": d.Name})
}
//Trigger Join for static channels
if channelList, containsStatic := mvp.getAllStaticChannels(); containsStatic {
mvp.ProcessStaticGroup(d.Name, channelList, true)
} else {
logger.Infow(ctx, "No Static Channels Present", log.Fields{"mvp": mvp.Name, "Mvlan": mvp.Mvlan})
}
}
return true
}
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
GetApplication().DevicesDisc.Range(pushIgmpFlows)
}
//removeIgmpFlows - Removes all IGMP related flows (generic DS flow)
func (mvp *MvlanProfile) removeIgmpFlows(oltSerialNum string) {
if d := GetApplication().GetDeviceBySerialNo(oltSerialNum); d != nil {
p := d.GetPort(d.NniPort)
if p != nil {
logger.Infow(ctx, "NNI Port Status is: UP", log.Fields{"Device": d, "port": p})
err := mvp.RemoveIgmpDSFlowForMvp(d.Name)
if err != nil {
logger.Errorw(ctx, "DS IGMP Flow Del Failed", log.Fields{"Reason": err.Error(), "device": d.Name})
}
}
}
}*/
//removeIgmpMcastFlows - Removes all IGMP related flows (generic DS flow & static group flows)
func (mvp *MvlanProfile) removeIgmpMcastFlows(oltSerialNum string) {
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
if d := GetApplication().GetDeviceBySerialNo(oltSerialNum); d != nil {
p := d.GetPort(d.NniPort)
if p != nil {
logger.Infow(ctx, "NNI Port Status is: UP", log.Fields{"Device": d, "port": p})
// ***Do not change the order***
// When Vlan is disabled, the process end is determined by the DS Igmp flag in device
//Trigger Leave for static channels
if channelList, containsStatic := mvp.getAllStaticChannels(); containsStatic {
mvp.ProcessStaticGroup(d.Name, channelList, false)
} else {
logger.Infow(ctx, "No Static Channels Present", log.Fields{"mvp": mvp.Name, "Mvlan": mvp.Mvlan})
}
//Remove all dynamic members for the Mvlan Profile
GetApplication().IgmpGroups.Range(func(key, value interface{}) bool {
ig := value.(*IgmpGroup)
if ig.Mvlan == mvp.Mvlan {
igd := ig.Devices[d.Name]
ig.DelIgmpGroupDevice(igd)
if ig.NumDevicesActive() == 0 {
GetApplication().DelIgmpGroup(ig)
}
}
return true
})
//Remove DS Igmp trap flow
err := mvp.RemoveIgmpDSFlowForMvp(d.Name)
if err != nil {
logger.Errorw(ctx, "DS IGMP Flow Del Failed", log.Fields{"Reason": err.Error(), "device": d.Name})
}
}
}
}
// ApplyIgmpDSFlowForMvp to apply Igmp DS flow for mvlan.
func (mvp *MvlanProfile) ApplyIgmpDSFlowForMvp(device string) error {
va := GetApplication()
dIntf, ok := va.DevicesDisc.Load(device)
if !ok {
return errors.New("Device Doesn't Exist")
}
d := dIntf.(*VoltDevice)
mvlan := mvp.Mvlan
flowAlreadyApplied, ok := d.IgmpDsFlowAppliedForMvlan[uint16(mvlan)]
if !ok || !flowAlreadyApplied {
flows, err := mvp.BuildIgmpDSFlows(device)
if err == nil {
err = cntlr.GetController().AddFlows(d.NniPort, device, flows)
if err != nil {
logger.Warnw(ctx, "Configuring IGMP Flow for device failed ", log.Fields{"Device": device, "err": err})
return err
}
d.IgmpDsFlowAppliedForMvlan[uint16(mvlan)] = true
logger.Infow(ctx, "Updating voltDevice that IGMP DS flow as \"added\" for ",
log.Fields{"device": d.SerialNum, "mvlan": mvlan})
} else {
logger.Errorw(ctx, "DS IGMP Flow Add Failed", log.Fields{"Reason": err.Error(), "Mvlan": mvlan})
}
}
return nil
}
// RemoveIgmpDSFlowForMvp to remove Igmp DS flow for mvlan.
func (mvp *MvlanProfile) RemoveIgmpDSFlowForMvp(device string) error {
va := GetApplication()
mvlan := mvp.Mvlan
dIntf, ok := va.DevicesDisc.Load(device)
if !ok {
return errors.New("Device Doesn't Exist")
}
d := dIntf.(*VoltDevice)
/* No need of strict check during DS IGMP deletion
flowAlreadyApplied, ok := d.IgmpDsFlowAppliedForMvlan[uint16(mvlan)]
if ok && flowAlreadyApplied
*/
flows, err := mvp.BuildIgmpDSFlows(device)
if err == nil {
flows.ForceAction = true
err = mvp.DelFlows(d, flows)
if err != nil {
logger.Warnw(ctx, "De-Configuring IGMP Flow for device failed ", log.Fields{"Device": device, "err": err})
return err
}
d.IgmpDsFlowAppliedForMvlan[uint16(mvlan)] = false
logger.Infow(ctx, "Updating voltDevice that IGMP DS flow as \"removed\" for ",
log.Fields{"device": d.SerialNum, "mvlan": mvlan})
} else {
logger.Errorw(ctx, "DS IGMP Flow Del Failed", log.Fields{"Reason": err.Error()})
}
return nil
}
// BuildIgmpDSFlows to build Igmp DS flows for NNI port
func (mvp *MvlanProfile) BuildIgmpDSFlows(device string) (*of.VoltFlow, error) {
dIntf, ok := GetApplication().DevicesDisc.Load(device)
if !ok {
return nil, errors.New("Device Doesn't Exist")
}
d := dIntf.(*VoltDevice)
logger.Infow(ctx, "Building DS IGMP Flow for NNI port", log.Fields{"vs": d.NniPort, "Mvlan": mvp.Mvlan})
flow := &of.VoltFlow{}
flow.SubFlows = make(map[uint64]*of.VoltSubFlow)
subFlow := of.NewVoltSubFlow()
subFlow.SetTableID(0)
subFlow.SetMatchVlan(mvp.Mvlan)
nniPort, err := GetApplication().GetNniPort(device)
if err != nil {
return nil, err
}
nniPortID, err1 := GetApplication().GetPortID(nniPort)
if err1 != nil {
return nil, errors.New("Unknown NNI outport")
}
subFlow.SetInPort(nniPortID)
subFlow.SetIgmpMatch()
subFlow.SetReportToController()
subFlow.Cookie = uint64(nniPortID)<<32 | uint64(mvp.Mvlan)
subFlow.Priority = of.IgmpFlowPriority
flow.SubFlows[subFlow.Cookie] = subFlow
logger.Infow(ctx, "Built DS IGMP flow", log.Fields{"cookie": subFlow.Cookie, "subflow": subFlow})
return flow, nil
}
//updateStaticGroups - Generates static joins & leaves for newly added and removed static channels respectively
func (mvp *MvlanProfile) updateStaticGroups(deviceID string, added []net.IP, removed []net.IP) {
//Update static group configs for all associated devices
updateGroups := func(key interface{}, value interface{}) bool {
d := value.(*VoltDevice)
if mvp.DevicesList[d.SerialNum] == Nil {
logger.Infow(ctx, "Mvlan Profile not configure for device", log.Fields{"Device": d, "Profile Device List": mvp.DevicesList})
return true
}
//TODO if mvp.IsChannelBasedGroup {
mvp.ProcessStaticGroup(d.Name, added, true)
mvp.ProcessStaticGroup(d.Name, removed, false)
//}
return true
}
if deviceID != "" {
vd := GetApplication().GetDevice(deviceID)
updateGroups(deviceID, vd)
} else {
GetApplication().DevicesDisc.Range(updateGroups)
}
}
//updateDynamicGroups - Generates joins with updated sources for existing channels
func (mvp *MvlanProfile) updateDynamicGroups(deviceID string, added []net.IP, removed []net.IP) {
//mvlan := mvp.Mvlan
va := GetApplication()
updateGroups := func(key interface{}, value interface{}) bool {
d := value.(*VoltDevice)
if mvp.DevicesList[d.SerialNum] == Nil {
logger.Infow(ctx, "Mvlan Profile not configure for device", log.Fields{"Device": d, "Profile Device List": mvp.DevicesList})
return true
}
for _, groupAddr := range added {
_, gName := va.GetMvlanProfileForMcIP(mvp.Name, groupAddr)
grpKey := mvp.generateGroupKey(gName, groupAddr.String())
logger.Debugw(ctx, "IGMP Group", log.Fields{"Group": grpKey, "groupAddr": groupAddr})
if igIntf, ok := va.IgmpGroups.Load(grpKey); ok {
ig := igIntf.(*IgmpGroup)
if igd, ok := ig.getIgmpGroupDevice(d.Name); ok {
if igcIntf, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
igc := igcIntf.(*IgmpGroupChannel)
incl := false
var ip []net.IP
var groupModified = false
if _, ok := mvp.Proxy[igc.GroupName]; ok {
if mvp.Proxy[igc.GroupName].Mode == common.Include {
incl = true
}
ip = mvp.Proxy[igc.GroupName].SourceList
}
for port, igp := range igc.NewReceivers {
// Process the include/exclude list which may end up modifying the group
if change, _ := igc.ProcessSources(port, ip, incl); change {
groupModified = true
}
igc.ProcessMode(port, incl)
if err := igp.WriteToDb(igc.Mvlan, igc.GroupAddr, igc.Device); err != nil {
logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
}
}
// If the group is modified as this is the first receiver or due to include/exclude list modification
// send a report to the upstream multicast servers
if groupModified {
logger.Debug(ctx, "Group Modified and IGMP report sent to the upstream server")
igc.SendReport(false)
}
if err := igc.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
}
}
}
}
}
return true
}
if deviceID != "" {
vd := GetApplication().GetDevice(deviceID)
updateGroups(deviceID, vd)
} else {
GetApplication().DevicesDisc.Range(updateGroups)
}
}
//GroupsUpdated - Handles removing of Igmp Groups, flows & group table entries for
//channels removed as part of update
func (mvp *MvlanProfile) GroupsUpdated(deviceID string) {
deleteChannelIfRemoved := func(key interface{}, value interface{}) bool {
ig := value.(*IgmpGroup)
if ig.Mvlan != mvp.Mvlan {
return true
}
grpName := ig.GroupName
logger.Infow(ctx, "###Update Cycle", log.Fields{"IG": ig.GroupName, "Addr": ig.GroupAddr})
//Check if group exists and remove the entire group object otherwise
if currentChannels := mvp.Groups[grpName]; currentChannels != nil {
if mvp.IsChannelBasedGroup {
channelPresent := doesIPMatch(ig.GroupAddr, currentChannels.McIPs)
if channelPresent || mvp.isChannelStatic(ig.GroupAddr) {
return true
}
} else {
allExistingChannels := ig.GetAllIgmpChannelForDevice(deviceID)
for channel := range allExistingChannels {
channelIP := net.ParseIP(channel)
channelPresent := mvp.IsChannelPresent(channelIP, currentChannels.McIPs, mvp.IsStaticGroup(ig.GroupName))
if channelPresent {
staticChannel := mvp.isChannelStatic(channelIP)
logger.Infow(ctx, "###Channel Comparision", log.Fields{"staticChannel": staticChannel, "Group": mvp.IsStaticGroup(ig.GroupName), "Channel": channel})
// Logic:
// If channel is Static & existing Group is also static - No migration required
// If channel is not Static & existing Group is also not static - No migration required
// If channel is Static and existing Group is not static - Migrate (from dynamic to static)
// (Channel already part of dynamic, added to static)
// If channel is not Static but existing Group is static - Migrate (from static to dynamic)
// (Channel removed from satic but part of dynamic)
if (staticChannel != mvp.IsStaticGroup(ig.GroupName)) || (ig.IsGroupStatic != mvp.IsStaticGroup(ig.GroupName)) { // Equivalent of XOR
ig.HandleGroupMigration(deviceID, channelIP)
} else {
if (ig.IsGroupStatic) && mvp.IsStaticGroup(ig.GroupName) {
if ig.GroupName != mvp.GetStaticGroupName(channelIP) {
ig.HandleGroupMigration(deviceID, channelIP)
}
}
continue
}
} else {
logger.Debugw(ctx, "Channel Removed", log.Fields{"Channel": channel, "Group": grpName})
ig.DelIgmpChannel(deviceID, net.ParseIP(channel))
if ig.NumDevicesActive() == 0 {
GetApplication().DelIgmpGroup(ig)
}
}
}
ig.IsGroupStatic = mvp.IsStaticGroup(ig.GroupName)
if err := ig.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
return true
}
}
logger.Debugw(ctx, "Group Removed", log.Fields{"Channel": ig.GroupAddr, "Group": grpName, "ChannelBasedGroup": ig.IsChannelBasedGroup})
ig.DelIgmpGroup()
logger.Debugw(ctx, "Removed Igmp Group", log.Fields{"Channel": ig.GroupAddr, "Group": grpName})
return true
}
GetApplication().IgmpGroups.Range(deleteChannelIfRemoved)
}
// IsChannelPresent to check if channel is present
func (mvp *MvlanProfile) IsChannelPresent(channelIP net.IP, groupChannelList []string, IsStaticGroup bool) bool {
// Only in case of static group, migration need to be supported.
// Dynamic to dynamic group migration not supported currently
if doesIPMatch(channelIP, groupChannelList) || mvp.isChannelStatic(channelIP) {
return true
} else if IsStaticGroup {
return (mvp.GetMvlanGroup(channelIP) != "")
}
return false
}
// GetMvlanProfileForMcIP - Get an MVLAN profile for a given MC IP. This is used when an
// IGMP report is received from the PON port. The MVLAN profile
// located is used to idnetify the MC VLAN used in upstream for
// join/leave
func (va *VoltApplication) GetMvlanProfileForMcIP(profileName string, ip net.IP) (*MvlanProfile, string) {
if mvpIntf, ok := va.MvlanProfilesByName.Load(profileName); ok {
mvp := mvpIntf.(*MvlanProfile)
if grpName := mvp.GetMvlanGroup(ip); grpName != "" {
return mvp, grpName
}
} else {
logger.Warnw(ctx, "Mvlan Profile not found for given profile name", log.Fields{"Profile": profileName})
}
return nil, ""
}
// GetMvlanGroup to get mvlan group
func (mvp *MvlanProfile) GetMvlanGroup(ip net.IP) string {
//Check for Static Group First
if mvp.containsStaticChannels() {
grpName := mvp.GetStaticGroupName(ip)
if grpName != "" {
return grpName
}
}
for _, mvg := range mvp.Groups {
if mvg.Wildcard {
return mvg.Name
}
if doesIPMatch(ip, mvg.McIPs) {
return mvg.Name
}
}
return ""
}
// IgmpTick for igmp tick info
func (va *VoltApplication) IgmpTick() {
tickCount++
if (tickCount % 1000) == 0 {
logger.Debugw(ctx, "Time @ Tick", log.Fields{"Tick": tickCount, "Time": time.Now()})
}
igmptick := func(key interface{}, value interface{}) bool {
ig := value.(*IgmpGroup)
if ig.NumDevicesActive() != 0 {
if tickCount%10 == ig.Hash()%10 {
ig.IgmpGroupLock.Lock()
ig.Tick()
ig.IgmpGroupLock.Unlock()
if ig.NumDevicesActive() == 0 {
va.DelIgmpGroup(ig)
}
}
}
return true
}
va.IgmpGroups.Range(igmptick)
}
// Tick to add Tick Task
func (va *VoltApplication) Tick() {
tt := NewTickTask()
va.IgmpTasks.AddTask(tt)
// va.IgmpTick()
}
//AddIgmpProfile for addition of IGMP Profile
func (va *VoltApplication) AddIgmpProfile(igmpProfileConfig *common.IGMPConfig) error {
var igmpProfile *IgmpProfile
if igmpProfileConfig.ProfileID == DefaultIgmpProfID {
logger.Info(ctx, "Updating default IGMP profile")
return va.UpdateIgmpProfile(igmpProfileConfig)
}
igmpProfile = va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
if igmpProfile == nil {
igmpProfile = newIgmpProfile(igmpProfileConfig)
} else {
logger.Errorw(ctx, "IGMP profile already exists", log.Fields{"IgmpProfile": igmpProfileConfig.ProfileID})
return errors.New("IGMP Profile already exists")
}
va.storeIgmpProfileMap(igmpProfileConfig.ProfileID, igmpProfile)
if err := igmpProfile.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID})
}
return nil
}
func newIgmpProfile(igmpProfileConfig *common.IGMPConfig) *IgmpProfile {
var igmpProfile IgmpProfile
igmpProfile.ProfileID = igmpProfileConfig.ProfileID
igmpProfile.UnsolicitedTimeOut = uint32(igmpProfileConfig.UnsolicitedTimeOut)
igmpProfile.MaxResp = uint32(igmpProfileConfig.MaxResp)
keepAliveInterval := uint32(igmpProfileConfig.KeepAliveInterval)
//KeepAliveInterval should have a min of 10 seconds
if keepAliveInterval < MinKeepAliveInterval {
keepAliveInterval = MinKeepAliveInterval
logger.Infow(ctx, "Auto adjust keepAliveInterval - Value < 10", log.Fields{"Received": igmpProfileConfig.KeepAliveInterval, "Configured": keepAliveInterval})
}
igmpProfile.KeepAliveInterval = keepAliveInterval
igmpProfile.KeepAliveCount = uint32(igmpProfileConfig.KeepAliveCount)
igmpProfile.LastQueryInterval = uint32(igmpProfileConfig.LastQueryInterval)
igmpProfile.LastQueryCount = uint32(igmpProfileConfig.LastQueryCount)
igmpProfile.FastLeave = *igmpProfileConfig.FastLeave
igmpProfile.PeriodicQuery = *igmpProfileConfig.PeriodicQuery
igmpProfile.IgmpCos = uint8(igmpProfileConfig.IgmpCos)
igmpProfile.WithRAUpLink = *igmpProfileConfig.WithRAUpLink
igmpProfile.WithRADownLink = *igmpProfileConfig.WithRADownLink
if igmpProfileConfig.IgmpVerToServer == "2" || igmpProfileConfig.IgmpVerToServer == "v2" {
igmpProfile.IgmpVerToServer = "2"
} else {
igmpProfile.IgmpVerToServer = "3"
}
igmpProfile.IgmpSourceIP = net.ParseIP(igmpProfileConfig.IgmpSourceIP)
return &igmpProfile
}
// checkIgmpProfileMap to get Igmp Profile. If not found return nil
func (va *VoltApplication) checkIgmpProfileMap(name string) *IgmpProfile {
if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok {
return igmpProfileIntf.(*IgmpProfile)
}
return nil
}
// newDefaultIgmpProfile Igmp profiles with default values
func newDefaultIgmpProfile() *IgmpProfile {
return &IgmpProfile{
ProfileID: DefaultIgmpProfID,
UnsolicitedTimeOut: 60,
MaxResp: 10, // seconds
KeepAliveInterval: 60, // seconds
KeepAliveCount: 3, // TODO - May not be needed
LastQueryInterval: 0, // TODO - May not be needed
LastQueryCount: 0, // TODO - May not be needed
FastLeave: true,
PeriodicQuery: false, // TODO - May not be needed
IgmpCos: 7, //p-bit value included in the IGMP packet
WithRAUpLink: false, // TODO - May not be needed
WithRADownLink: false, // TODO - May not be needed
IgmpVerToServer: "3",
IgmpSourceIP: net.ParseIP("172.27.0.1"), // This will be replaced by configuration
}
}
func (va *VoltApplication) resetIgmpProfileToDefault() {
igmpProf := va.getIgmpProfileMap(DefaultIgmpProfID)
defIgmpProf := newDefaultIgmpProfile()
igmpProf.UnsolicitedTimeOut = defIgmpProf.UnsolicitedTimeOut
igmpProf.MaxResp = defIgmpProf.MaxResp
igmpProf.KeepAliveInterval = defIgmpProf.KeepAliveInterval
igmpProf.KeepAliveCount = defIgmpProf.KeepAliveCount
igmpProf.LastQueryInterval = defIgmpProf.LastQueryInterval
igmpProf.LastQueryCount = defIgmpProf.LastQueryCount
igmpProf.FastLeave = defIgmpProf.FastLeave
igmpProf.PeriodicQuery = defIgmpProf.PeriodicQuery
igmpProf.IgmpCos = defIgmpProf.IgmpCos
igmpProf.WithRAUpLink = defIgmpProf.WithRAUpLink
igmpProf.WithRADownLink = defIgmpProf.WithRADownLink
igmpProf.IgmpVerToServer = defIgmpProf.IgmpVerToServer
igmpProf.IgmpSourceIP = defIgmpProf.IgmpSourceIP
if err := igmpProf.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProf.ProfileID})
}
}
// getIgmpProfileMap to get Igmp Profile. If not found return default IGMP config
func (va *VoltApplication) getIgmpProfileMap(name string) *IgmpProfile {
if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok {
return igmpProfileIntf.(*IgmpProfile)
}
// There will be always a default igmp profile.
defaultIgmpProfileIntf, _ := va.IgmpProfilesByName.Load(DefaultIgmpProfID)
return defaultIgmpProfileIntf.(*IgmpProfile)
}
// storeIgmpProfileMap to store Igmp Profile
func (va *VoltApplication) storeIgmpProfileMap(name string, igmpProfile *IgmpProfile) {
va.IgmpProfilesByName.Store(name, igmpProfile)
}
// deleteIgmpProfileMap to delete Igmp Profile
func (va *VoltApplication) deleteIgmpProfileMap(name string) {
va.IgmpProfilesByName.Delete(name)
}
// WriteToDb is utility to write Igmp Config Info to database
func (igmpProfile *IgmpProfile) WriteToDb() error {
igmpProfile.Version = database.PresentVersionMap[database.IgmpProfPath]
b, err := json.Marshal(igmpProfile)
if err != nil {
return err
}
if err1 := db.PutIgmpProfile(igmpProfile.ProfileID, string(b)); err1 != nil {
return err1
}
return nil
}
//DelIgmpProfile for addition of IGMP Profile
func (va *VoltApplication) DelIgmpProfile(igmpProfileConfig *common.IGMPConfig) error {
// Deletion of default igmp profile is blocked from submgr. Keeping additional check for safety.
if igmpProfileConfig.ProfileID == DefaultIgmpProfID {
logger.Info(ctx, "Resetting default IGMP profile")
va.resetIgmpProfileToDefault()
return nil
}
igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
if igmpProfile == nil {
logger.Warnw(ctx, "Igmp Profile not found. Unable to delete", log.Fields{"Profile ID": igmpProfileConfig.ProfileID})
return nil
}
va.deleteIgmpProfileMap(igmpProfileConfig.ProfileID)
_ = db.DelIgmpProfile(igmpProfileConfig.ProfileID)
return nil
}
//UpdateIgmpProfile for addition of IGMP Profile
func (va *VoltApplication) UpdateIgmpProfile(igmpProfileConfig *common.IGMPConfig) error {
igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
if igmpProfile == nil {
logger.Errorw(ctx, "Igmp Profile not found. Unable to update", log.Fields{"Profile ID": igmpProfileConfig.ProfileID})
return errors.New("IGMP Profile not found")
}
igmpProfile.ProfileID = igmpProfileConfig.ProfileID
igmpProfile.UnsolicitedTimeOut = uint32(igmpProfileConfig.UnsolicitedTimeOut)
igmpProfile.MaxResp = uint32(igmpProfileConfig.MaxResp)
keepAliveInterval := uint32(igmpProfileConfig.KeepAliveInterval)
//KeepAliveInterval should have a min of 10 seconds
if keepAliveInterval < MinKeepAliveInterval {
keepAliveInterval = MinKeepAliveInterval
logger.Infow(ctx, "Auto adjust keepAliveInterval - Value < 10", log.Fields{"Received": igmpProfileConfig.KeepAliveInterval, "Configured": keepAliveInterval})
}
igmpProfile.KeepAliveInterval = keepAliveInterval
igmpProfile.KeepAliveCount = uint32(igmpProfileConfig.KeepAliveCount)
igmpProfile.LastQueryInterval = uint32(igmpProfileConfig.LastQueryInterval)
igmpProfile.LastQueryCount = uint32(igmpProfileConfig.LastQueryCount)
igmpProfile.FastLeave = *igmpProfileConfig.FastLeave
igmpProfile.PeriodicQuery = *igmpProfileConfig.PeriodicQuery
igmpProfile.IgmpCos = uint8(igmpProfileConfig.IgmpCos)
igmpProfile.WithRAUpLink = *igmpProfileConfig.WithRAUpLink
igmpProfile.WithRADownLink = *igmpProfileConfig.WithRADownLink
if igmpProfileConfig.IgmpVerToServer == "2" || igmpProfileConfig.IgmpVerToServer == "v2" {
igmpProfile.IgmpVerToServer = "2"
} else {
igmpProfile.IgmpVerToServer = "3"
}
if igmpProfileConfig.IgmpSourceIP != "" {
igmpProfile.IgmpSourceIP = net.ParseIP(igmpProfileConfig.IgmpSourceIP)
}
if err := igmpProfile.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID})
}
return nil
}
// RestoreIGMPProfilesFromDb to read from the DB and restore IGMP Profiles
func (va *VoltApplication) RestoreIGMPProfilesFromDb() {
// Loading IGMP profiles
igmpProfiles, _ := db.GetIgmpProfiles()
for _, igmpProfile := range igmpProfiles {
b, ok := igmpProfile.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
var igmpProf IgmpProfile
err := json.Unmarshal(b, &igmpProf)
if err != nil {
logger.Warn(ctx, "Unmarshal of IGMP Profile failed")
continue
}
va.storeIgmpProfileMap(igmpProf.ProfileID, &igmpProf)
logger.Infow(ctx, "Restored Igmp Profile", log.Fields{"Conf": igmpProf})
}
}
// InitIgmpSrcMac for initialization of igmp source mac
func (va *VoltApplication) InitIgmpSrcMac() {
srcMac, err := getPodMacAddr()
if err != nil {
igmpSrcMac = "00:11:11:11:11:11"
return
}
igmpSrcMac = srcMac
}
// removeIPFromList to remove ip from the list
func removeIPFromList(s []net.IP, value net.IP) []net.IP {
i := 0
for i = 0; i < len(s); i++ {
if s[i].Equal(value) {
break
}
}
if i != len(s) {
//It means value is found in the slice
return append(s[0:i], s[i+1:]...)
}
return s
}
// DelMvlanProfile for deletion of a MVLAN group
func (va *VoltApplication) DelMvlanProfile(name string) error {
if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok {
mvp := mvpIntf.(*MvlanProfile)
if len(mvp.DevicesList) == 0 {
mvp.DeleteInProgress = true
mvp.DelFromDb()
va.deleteMvlansMap(mvp.Mvlan, name)
logger.Debugw(ctx, "Deleted MVLAN Profile", log.Fields{"Name": mvp.Name})
} else {
logger.Errorw(ctx, "Unable to delete Mvlan Profile as there is still an OLT attached to it", log.Fields{"Name": mvp.Name,
"Device List": mvp.DevicesList})
return errors.New("MVLAN attached to devices")
}
return nil
}
logger.Errorw(ctx, "MVLAN Profile not found", log.Fields{"MvlanProfile Name": name})
return nil
}
// ReceiverUpInd for receiver up indication
func (va *VoltApplication) ReceiverUpInd(device string, port string, mvpName string, vlan of.VlanType, pbits []of.PbitType) {
logger.Infow(ctx, "Receiver Indication: UP", log.Fields{"device": device, "port": port, "MVP": mvpName, "vlan": vlan, "pbits": pbits})
if mvpIntf, ok := va.MvlanProfilesByName.Load(mvpName); ok {
mvp := mvpIntf.(*MvlanProfile)
if devIntf, ok := va.DevicesDisc.Load(device); ok {
dev := devIntf.(*VoltDevice)
proxyCfg, proxyIP, _ := getIgmpProxyCfgAndIP(mvp.Mvlan, dev.SerialNum)
for _, pbit := range pbits {
sendGeneralQuery(device, port, vlan, uint8(pbit), proxyCfg, proxyIP)
}
} else {
logger.Warnw(ctx, "Device not found for given port", log.Fields{"device": device, "port": port})
}
} else {
logger.Warnw(ctx, "Mvlan Profile not found for given profileName", log.Fields{"MVP": mvpName, "vlan": vlan})
}
}
// sendGeneralQuery to send general query
func sendGeneralQuery(device string, port string, cVlan of.VlanType, pbit uint8, proxyCfg *IgmpProfile, proxyIP *net.IP) {
if queryPkt, err := Igmpv2QueryPacket(NullIPAddr, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil {
if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil {
logger.Warnw(ctx, "General Igmpv2 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
} else {
logger.Debugw(ctx, "General Igmpv2 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
}
}
if getVersion(proxyCfg.IgmpVerToServer) == IgmpVersion3 {
if queryPkt, err := Igmpv3QueryPacket(NullIPAddr, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil {
if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil {
logger.Warnw(ctx, "General Igmpv3 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
} else {
logger.Debugw(ctx, "General Igmpv3 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
}
}
}
}
// ReceiverDownInd to send receiver down indication
func (va *VoltApplication) ReceiverDownInd(device string, port string) {
logger.Infow(ctx, " Receiver Indication: DOWN", log.Fields{"device": device, "port": port})
ponPortID := va.GetPonPortID(device, port)
del := func(key interface{}, value interface{}) bool {
ig := value.(*IgmpGroup)
ig.IgmpGroupLock.Lock()
ig.DelReceiveronDownInd(device, port, ponPortID)
ig.IgmpGroupLock.Unlock()
if ig.NumDevicesActive() == 0 {
va.DelIgmpGroup(ig)
}
return true
}
va.IgmpGroups.Range(del)
}
// doesIPMatch to check if ip match with any ip from the list
func doesIPMatch(ip net.IP, ipsOrRange []string) bool {
for _, ipOrRange := range ipsOrRange {
if strings.Contains(ipOrRange, "-") {
var splits = strings.Split(ipOrRange, "-")
ipStart := util.IP2LongConv(net.ParseIP(splits[0]))
ipEnd := util.IP2LongConv(net.ParseIP(splits[1]))
if ipEnd < ipStart {
return false
}
ipInt := util.IP2LongConv(ip)
if ipInt >= ipStart && ipInt <= ipEnd {
return true
}
} else if ip.Equal(net.ParseIP(ipOrRange)) {
return true
}
}
return false
}
// ProcessStaticGroup - Process Static Join/Leave Req for static channels
func (mvp *MvlanProfile) ProcessStaticGroup(device string, groupAddresses []net.IP, isJoin bool) {
logger.Debugw(ctx, "Received Static Group Request", log.Fields{"Device": device, "Join": isJoin, "Group Address List": groupAddresses})
mvlan := mvp.Mvlan
va := GetApplication()
//TODO - Handle bulk add of groupAddr
for _, groupAddr := range groupAddresses {
ig := mvp.GetStaticIgmpGroup(groupAddr)
if isJoin {
vd := va.GetDevice(device)
igmpProf, _, _ := getIgmpProxyCfgAndIP(mvlan, vd.SerialNum)
ver := igmpProf.IgmpVerToServer
if ig == nil {
// First time group Creation: Create the IGMP group and then add the receiver to the group
logger.Infow(ctx, "Static IGMP Add received for new group", log.Fields{"Addr": groupAddr, "Port": StaticPort})
if ig := GetApplication().AddIgmpGroup(mvp.Name, groupAddr, device); ig != nil {
ig.IgmpGroupLock.Lock()
ig.AddReceiver(device, StaticPort, groupAddr, nil, getVersion(ver),
0, 0, 0xFF)
ig.IgmpGroupLock.Unlock()
} else {
logger.Warnw(ctx, "Static IGMP Group Creation Failed", log.Fields{"Addr": groupAddr})
}
} else {
//Converting existing dynamic group to static group
if !mvp.IsStaticGroup(ig.GroupName) {
ig.updateGroupName(ig.GroupName)
}
// Update case: If the IGMP group is already created. just add the receiver
logger.Infow(ctx, "Static IGMP Add received for existing group", log.Fields{"Addr": groupAddr, "Port": StaticPort})
ig.IgmpGroupLock.Lock()
ig.AddReceiver(device, StaticPort, groupAddr, nil, getVersion(ver),
0, 0, 0xFF)
ig.IgmpGroupLock.Unlock()
}
} else if ig != nil {
logger.Infow(ctx, "Static IGMP Del received for existing group", log.Fields{"Addr": groupAddr, "Port": StaticPort})
if ig.IsChannelBasedGroup {
grpName := mvp.GetMvlanGroup(ig.GroupAddr)
if grpName != "" {
ig.IgmpGroupLock.Lock()
ig.DelReceiver(device, StaticPort, groupAddr, nil, 0xFF)
ig.IgmpGroupLock.Unlock()
ig.updateGroupName(grpName)
} else {
ig.DelIgmpGroup()
}
} else {
ig.IgmpGroupLock.Lock()
ig.DelReceiver(device, StaticPort, groupAddr, nil, 0xFF)
ig.IgmpGroupLock.Unlock()
}
if ig.NumDevicesActive() == 0 {
GetApplication().DelIgmpGroup(ig)
}
} else {
logger.Warnw(ctx, "Static IGMP Del received for unknown group", log.Fields{"Addr": groupAddr})
}
}
}
//getStaticChannelDiff - return the static channel newly added and removed from existing static group
func (mvp *MvlanProfile) getStaticChannelDiff() (newlyAdded []net.IP, removed []net.IP, common []net.IP) {
var commonChannels []net.IP
newChannelList, _ := mvp.getAllStaticChannels()
existingChannelList, _ := mvp.getAllOldGroupStaticChannels()
if len(existingChannelList) == 0 {
return newChannelList, []net.IP{}, []net.IP{}
}
for _, newChannel := range append([]net.IP{}, newChannelList...) {
for _, existChannel := range append([]net.IP{}, existingChannelList...) {
//Remove common channels between existing and new list
// The remaining in the below slices give the results
// Remaining in newChannelList: Newly added
// Remaining in existingChannelList: Removed channels
if existChannel.Equal(newChannel) {
existingChannelList = removeIPFromList(existingChannelList, existChannel)
newChannelList = removeIPFromList(newChannelList, newChannel)
commonChannels = append(commonChannels, newChannel)
logger.Infow(ctx, "#############Channel: "+existChannel.String()+" New: "+newChannel.String(), log.Fields{"Added": newChannelList, "Removed": existingChannelList})
break
}
}
}
return newChannelList, existingChannelList, commonChannels
}
//getGroupChannelDiff - return the channel newly added and removed from existing group
func (mvp *MvlanProfile) getGroupChannelDiff(newGroup *MvlanGroup, oldGroup *MvlanGroup) (newlyAdded []net.IP, removed []net.IP, common []net.IP) {
var commonChannels []net.IP
newChannelList, _ := newGroup.getAllChannels()
existingChannelList, _ := oldGroup.getAllChannels()
if len(existingChannelList) == 0 {
return newChannelList, []net.IP{}, []net.IP{}
}
for _, newChannel := range append([]net.IP{}, newChannelList...) {
for _, existChannel := range append([]net.IP{}, existingChannelList...) {
//Remove common channels between existing and new list
// The remaining in the below slices give the results
// Remaining in newChannelList: Newly added
// Remaining in existingChannelList: Removed channels
if existChannel.Equal(newChannel) {
existingChannelList = removeIPFromList(existingChannelList, existChannel)
newChannelList = removeIPFromList(newChannelList, newChannel)
commonChannels = append(commonChannels, newChannel)
logger.Infow(ctx, "#############Channel: "+existChannel.String()+" New: "+newChannel.String(), log.Fields{"Added": newChannelList, "Removed": existingChannelList})
break
}
}
}
return newChannelList, existingChannelList, commonChannels
}
// UpdateProfile - Updates the group & member info w.r.t the mvlan profile for the given device
func (mvp *MvlanProfile) UpdateProfile(deviceID string) {
logger.Infow(ctx, "Update Mvlan Profile task triggered", log.Fields{"Mvlan": mvp.Mvlan})
var removedStaticChannels []net.IP
addedStaticChannels := []net.IP{}
/* Taking mvpLock to protect the mvp groups and proxy */
mvp.mvpLock.RLock()
defer mvp.mvpLock.RUnlock()
serialNo := ""
if deviceID != "" {
if vd := GetApplication().GetDevice(deviceID); vd != nil {
serialNo = vd.SerialNum
if mvp.DevicesList[serialNo] != UpdateInProgress {
logger.Warnw(ctx, "Exiting Update Task since device not present in MvlanProfile", log.Fields{"Device": deviceID, "SerialNum": vd.SerialNum, "MvlanProfile": mvp})
return
}
} else {
logger.Errorw(ctx, "Volt Device not found. Stopping Update Mvlan Profile processing for device", log.Fields{"SerialNo": deviceID, "MvlanProfile": mvp})
return
}
}
//Update the groups based on static channels added & removed
if mvp.containsStaticChannels() {
addedStaticChannels, removedStaticChannels, _ = mvp.getStaticChannelDiff()
logger.Debugw(ctx, "Update Task - Static Group Changes", log.Fields{"Added": addedStaticChannels, "Removed": removedStaticChannels})
if len(addedStaticChannels) > 0 || len(removedStaticChannels) > 0 {
mvp.updateStaticGroups(deviceID, []net.IP{}, removedStaticChannels)
}
}
mvp.GroupsUpdated(deviceID)
if len(addedStaticChannels) > 0 {
mvp.updateStaticGroups(deviceID, addedStaticChannels, []net.IP{})
}
/* Need to handle if SSM params are modified for groups */
for key := range mvp.Groups {
_, _, commonChannels := mvp.getGroupChannelDiff(mvp.Groups[key], mvp.oldGroups[key])
if mvp.checkStaticGrpSSMProxyDiff(mvp.oldProxy[key], mvp.Proxy[key]) {
if mvp.Groups[key].IsStatic {
/* Static group proxy modified, need to trigger membership report with new mode/src-list for existing channels */
mvp.updateStaticGroups(deviceID, commonChannels, []net.IP{})
} else {
/* Dynamic group proxy modified, need to trigger membership report with new mode/src-list for existing channels */
mvp.updateDynamicGroups(deviceID, commonChannels, []net.IP{})
}
}
}
mvp.SetUpdateStatus(serialNo, NoOp)
if deviceID == "" || !mvp.isUpdateInProgress() {
mvp.oldGroups = nil
}
if err := mvp.WriteToDb(); err != nil {
logger.Errorw(ctx, "Mvlan profile write to DB failed", log.Fields{"ProfileName": mvp.Name})
}
logger.Debugw(ctx, "Updated MVLAN Profile", log.Fields{"VLAN": mvp.Mvlan, "Name": mvp.Name, "Grp IPs": mvp.Groups})
}
//checkStaticGrpSSMProxyDiff- return true if the proxy of oldGroup is modified in newGroup
func (mvp *MvlanProfile) checkStaticGrpSSMProxyDiff(oldProxy *MCGroupProxy, newProxy *MCGroupProxy) bool {
if oldProxy == nil && newProxy == nil {
return false
}
if (oldProxy == nil && newProxy != nil) ||
(oldProxy != nil && newProxy == nil) {
return true
}
if oldProxy.Mode != newProxy.Mode {
return true
}
oldSrcLst := oldProxy.SourceList
newSrcLst := newProxy.SourceList
oLen := len(oldSrcLst)
nLen := len(newSrcLst)
if oLen != nLen {
return true
}
visited := make([]bool, nLen)
/* check if any new IPs added in the src list, return true if present */
for i := 0; i < nLen; i++ {
found := false
element := newSrcLst[i]
for j := 0; j < oLen; j++ {
if visited[j] {
continue
}
if element.Equal(oldSrcLst[j]) {
visited[j] = true
found = true
break
}
}
if !found {
return true
}
}
visited = make([]bool, nLen)
/* check if any IPs removed from existing src list, return true if removed */
for i := 0; i < oLen; i++ {
found := false
element := oldSrcLst[i]
for j := 0; j < nLen; j++ {
if visited[j] {
continue
}
if element.Equal(newSrcLst[j]) {
visited[j] = true
found = true
break
}
}
if !found {
return true
}
}
return false
}
// ProcessMode process the received mode and updated the igp
func (igc *IgmpGroupChannel) ProcessMode(port string, incl bool) {
/* Update the mode in igp if the mode has changed */
igp := igc.GetReceiver(port)
if igp.Exclude && incl {
igp.Exclude = !incl
if igc.Exclude > 0 {
igc.Exclude--
}
} else if !incl && !igp.Exclude {
igp.Exclude = !incl
igc.Exclude++
}
}
func (ig *IgmpGroup) removeExpiredGroupFromDevice() {
ig.PendingPoolLock.Lock()
defer ig.PendingPoolLock.Unlock()
for device, timer := range ig.PendingGroupForDevice {
// To ensure no race-condition between the expiry time and the new Join,
// ensure the group exists in pending pool before deletion
groupExistsInPendingPool := true
if !time.Now().After(timer) {
continue
}
// Check if the IgmpGroup obj has no active member across any device
// If Yes, then this group is part of global pending pool (IgmpPendingPool), hence if expired,
// Remove only the IgmpGroup obj referenced to this device from global pool also.
if ig.NumDevicesActive() == 0 {
groupExistsInPendingPool = GetApplication().RemoveGroupFromPendingPool(device, ig)
}
// Remove the group entry from device and remove the IgmpDev Obj
// from IgmpGrp Pending pool
if groupExistsInPendingPool {
ig.DeleteIgmpGroupDevice(device)
}
}
}
//DeleteIgmpGroupDevice - removes the IgmpGroupDevice obj from IgmpGroup and database
func (ig *IgmpGroup) DeleteIgmpGroupDevice(device string) {
logger.Infow(ctx, "Deleting IgmpGroupDevice from IG Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
igd := ig.Devices[device]
igd.DelMcGroup(true)
delete(ig.Devices, device)
delete(ig.PendingGroupForDevice, device)
_ = db.DelIgmpDevice(igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
//If the group is not associated to any other device, then the entire Igmp Group obj itself can be removed
if ig.NumDevicesAll() == 0 {
logger.Infow(ctx, "Deleting IgmpGroup as all pending groups has expired", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
GetApplication().DelIgmpGroup(ig)
return
}
if err := ig.WriteToDb(); err != nil {
logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
}
}
//UpdateActiveChannelSubscriberAlarm - Updates the Active Channel Subscriber Alarm
func (mvp *MvlanProfile) UpdateActiveChannelSubscriberAlarm() {
va := GetApplication()
logger.Debugw(ctx, "Update of Active Channel Subscriber Alarm", log.Fields{"Mvlan": mvp.Mvlan})
for srNo := range mvp.DevicesList {
d := va.GetDeviceBySerialNo(srNo)
if d == nil {
logger.Warnw(ctx, "Device info not found", log.Fields{"Device_SrNo": srNo, "Mvlan": mvp.Mvlan})
return
}
d.Ports.Range(func(key, value interface{}) bool {
//port := key.(string)
vp := value.(*VoltPort)
if vp.Type != VoltPortTypeAccess {
return true
}
if mvp.MaxActiveChannels > vp.ActiveChannels && vp.ChannelPerSubAlarmRaised {
serviceName := GetMcastServiceForSubAlarm(vp, mvp)
logger.Debugw(ctx, "Clearing-SendActiveChannelPerSubscriberAlarm-due-to-update", log.Fields{"ActiveChannels": vp.ActiveChannels, "ServiceName": serviceName})
vp.ChannelPerSubAlarmRaised = false
} else if mvp.MaxActiveChannels < vp.ActiveChannels && !vp.ChannelPerSubAlarmRaised {
/* When the max active channel count is reduced via update, we raise an alarm.
But the previous excess channels still exist until a leave or expiry */
serviceName := GetMcastServiceForSubAlarm(vp, mvp)
logger.Debugw(ctx, "Raising-SendActiveChannelPerSubscriberAlarm-due-to-update", log.Fields{"ActiveChannels": vp.ActiveChannels, "ServiceName": serviceName})
vp.ChannelPerSubAlarmRaised = true
}
return true
})
}
}
//TriggerAssociatedFlowDelete - Re-trigger delete for pending delete flows
func (mvp *MvlanProfile) TriggerAssociatedFlowDelete(device string) bool {
mvp.mvpFlowLock.Lock()
cookieList := []uint64{}
flowMap := mvp.PendingDeleteFlow[device]
for cookie := range flowMap {
cookieList = append(cookieList, convertToUInt64(cookie))
}
mvp.mvpFlowLock.Unlock()
if len(cookieList) == 0 {
return false
}
for _, cookie := range cookieList {
if vd := GetApplication().GetDevice(device); vd != nil {
flow := &of.VoltFlow{}
flow.SubFlows = make(map[uint64]*of.VoltSubFlow)
subFlow := of.NewVoltSubFlow()
subFlow.Cookie = cookie
flow.SubFlows[cookie] = subFlow
logger.Infow(ctx, "Retriggering Vnet Delete Flow", log.Fields{"Device": device, "Mvlan": mvp.Mvlan.String(), "Cookie": cookie})
err := mvp.DelFlows(vd, flow)
if err != nil {
logger.Warnw(ctx, "De-Configuring IGMP Flow for device failed ", log.Fields{"Device": device, "err": err})
}
}
}
return true
}
// JsonMarshal wrapper function for json Marshal MvlanProfile
func (mvp *MvlanProfile) JsonMarshal() ([]byte, error) {
return json.Marshal(MvlanProfile{
Name: mvp.Name,
Mvlan: mvp.Mvlan,
PonVlan: mvp.PonVlan,
Groups: mvp.Groups,
Proxy: mvp.Proxy,
Version: mvp.Version,
IsPonVlanPresent: mvp.IsPonVlanPresent,
IsChannelBasedGroup: mvp.IsChannelBasedGroup,
DevicesList: mvp.DevicesList,
MaxActiveChannels: mvp.MaxActiveChannels,
PendingDeleteFlow: mvp.PendingDeleteFlow,
DeleteInProgress: mvp.DeleteInProgress,
IgmpServVersion: mvp.IgmpServVersion,
})
}