| /* |
| * Copyright 2022-present Open Networking Foundation |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package application |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "net" |
| "reflect" |
| "strings" |
| "sync" |
| "time" |
| common "voltha-go-controller/internal/pkg/types" |
| |
| "github.com/google/gopacket" |
| "github.com/google/gopacket/layers" |
| |
| "voltha-go-controller/database" |
| cntlr "voltha-go-controller/internal/pkg/controller" |
| "voltha-go-controller/internal/pkg/of" |
| "voltha-go-controller/log" |
| ) |
| |
| const ( |
| // IgmpVersion0 constant (Default init value) |
| IgmpVersion0 uint8 = 0 |
| // IgmpVersion1 constant |
| IgmpVersion1 uint8 = 1 |
| // IgmpVersion2 constant |
| IgmpVersion2 uint8 = 2 |
| // IgmpVersion3 constant |
| IgmpVersion3 uint8 = 3 |
| // MinKeepAliveInterval constant |
| MinKeepAliveInterval uint32 = 10 |
| // MaxDiffKAIntervalResp constant |
| MaxDiffKAIntervalResp uint32 = 5 |
| // StaticGroup constant |
| StaticGroup string = "static" |
| // DynamicGroup constant |
| DynamicGroup string = "dynamic" |
| // StaticPort constant |
| StaticPort string = "static_port" |
| // DefaultIgmpProfID constant |
| DefaultIgmpProfID = "" |
| //GroupExpiryTime - group expiry time in minutes |
| GroupExpiryTime uint32 = 15 |
| ) |
| |
| const ( |
| // JoinUnsuccessful constant |
| JoinUnsuccessful string = "JOIN-UNSUCCESSFUL" |
| // JoinUnsuccessfulExceededIGMPChanel constant |
| JoinUnsuccessfulExceededIGMPChanel string = "Exceeded subscriber or PON port IGMP channels threshold" |
| // JoinUnsuccessfulAddFlowGroupFailed constant |
| JoinUnsuccessfulAddFlowGroupFailed string = "Failed to add flow or group for a channel" |
| // JoinUnsuccessfulGroupNotConfigured constant |
| JoinUnsuccessfulGroupNotConfigured string = "Join received from a subscriber on non-configured group" |
| // JoinUnsuccessfulVlanDisabled constant |
| JoinUnsuccessfulVlanDisabled string = "Vlan is disabled" |
| // JoinUnsuccessfulDescription constant |
| JoinUnsuccessfulDescription string = "igmp join unsuccessful" |
| // QueryExpired constant |
| QueryExpired string = "QUERY-EXPIRED" |
| // QueryExpiredGroupSpecific constant |
| QueryExpiredGroupSpecific string = "Group specific multicast query expired" |
| // QueryExpiredDescription constant |
| QueryExpiredDescription string = "igmp query expired" |
| ) |
| |
| // McastConfig structure |
| type McastConfig struct { |
| OltSerialNum string |
| MvlanProfileID string |
| IgmpProfileID string |
| IgmpProxyIP net.IP |
| OperState OperInProgress |
| Version string |
| // This map will help in updating the igds whenever there is a igmp profile id update |
| IgmpGroupDevices sync.Map `json:"-"` // Key is group id |
| } |
| |
| var ( |
| // NullIPAddr is null ip address var |
| NullIPAddr = net.ParseIP("0.0.0.0") |
| // AllSystemsMulticastGroupIP |
| AllSystemsMulticastGroupIP = net.ParseIP("224.0.0.1") |
| // igmpSrcMac for the proxy |
| igmpSrcMac string |
| ) |
| |
| func init() { |
| RegisterPacketHandler(IGMP, ProcessIgmpPacket) |
| } |
| |
| // ProcessIgmpPacket : CallBack function registered with application to handle IGMP packetIn |
| func ProcessIgmpPacket(cntx context.Context, device string, port string, pkt gopacket.Packet) { |
| GetApplication().IgmpPacketInd(device, port, pkt) |
| } |
| |
| func ipv4ToUint(ip net.IP) uint32 { |
| result := uint32(0) |
| addr := ip.To4() |
| if addr == nil { |
| logger.Warnw(ctx, "Invalid Group Addr", log.Fields{"IP": ip}) |
| return 0 |
| } |
| result = result + uint32(addr[0])<<24 |
| result = result + uint32(addr[1])<<16 |
| result = result + uint32(addr[2])<<8 |
| result = result + uint32(addr[3]) |
| return result |
| } |
| |
| func getPodMacAddr() (string, error) { |
| ifas, err := net.Interfaces() |
| if err != nil { |
| return "", err |
| } |
| var ipv4Addr net.IP |
| for _, ifa := range ifas { |
| addrs, err := ifa.Addrs() |
| if err != nil { |
| return "", err |
| } |
| for _, addr := range addrs { |
| if ipv4Addr = addr.(*net.IPNet).IP.To4(); ipv4Addr != nil { |
| if ipv4Addr.IsGlobalUnicast() { |
| logger.Infow(ctx, "Igmp Static config", log.Fields{"MacAddr": ifa.HardwareAddr.String(), "ipAddr": ipv4Addr}) |
| return ifa.HardwareAddr.String(), nil |
| } |
| } |
| } |
| |
| } |
| return "", errors.New("MAC Address not found,Setting default") |
| } |
| |
| // IgmpUsEthLayer : Layers defined for upstream communication |
| // Ethernet layer for upstream communication |
| func IgmpUsEthLayer(mcip net.IP) *layers.Ethernet { |
| eth := &layers.Ethernet{} |
| // TODO: Set the source MAC properly and remove hardcoding |
| eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac) |
| eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00") |
| eth.DstMAC[3] = mcip[1] & 0x7f |
| eth.DstMAC[4] = mcip[2] |
| eth.DstMAC[5] = mcip[3] |
| eth.EthernetType = layers.EthernetTypeDot1Q |
| return eth |
| } |
| |
| // IgmpUsDot1qLayer set US VLAN layer |
| func IgmpUsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q { |
| dot1q := &layers.Dot1Q{} |
| dot1q.Priority = priority |
| dot1q.DropEligible = false |
| dot1q.VLANIdentifier = uint16(vlan) |
| dot1q.Type = layers.EthernetTypeIPv4 |
| return dot1q |
| } |
| |
| // Igmpv2UsIpv4Layer : Set the IP layer for IGMPv2 |
| // TODO - Identify correct way of obtaining source IP |
| // This should be the configured IGMP proxy address which should be per OLT |
| // We should probably be able to have a single function for both |
| // upstream and downstream |
| func Igmpv2UsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 { |
| ip := &layers.IPv4{} |
| ip.Version = 4 |
| ip.Protocol = layers.IPProtocolIGMP |
| ip.TTL = 1 |
| ip.SrcIP = src |
| ip.DstIP = mcip |
| return ip |
| } |
| |
| // Igmpv3UsIpv4Layer : Set the IP layer for IGMPv3 |
| // TODO - Identify correct way of obtaining source IP |
| // This should be the configured IGMP proxy address which should be per OLT |
| // We should probably be able to have a single function for both |
| // upstream and downstream |
| func Igmpv3UsIpv4Layer(src net.IP) *layers.IPv4 { |
| ip := &layers.IPv4{} |
| ip.Version = 4 |
| ip.Protocol = layers.IPProtocolIGMP |
| ip.TTL = 1 |
| ip.SrcIP = src |
| ip.DstIP = net.ParseIP("224.0.0.22") |
| return ip |
| } |
| |
| // IgmpDsEthLayer : Layers defined for downstream communication |
| // Ethernet layer for downstream communication |
| func IgmpDsEthLayer(mcip net.IP) *layers.Ethernet { |
| eth := &layers.Ethernet{} |
| // TODO: Set the source and dest MAC properly and remove hardcoding |
| eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac) |
| eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00") |
| eth.DstMAC[3] = mcip[1] & 0x7f |
| eth.DstMAC[4] = mcip[2] |
| eth.DstMAC[5] = mcip[3] |
| eth.EthernetType = layers.EthernetTypeDot1Q |
| return eth |
| } |
| |
| // IgmpDsDot1qLayer set the DS VLAN layer |
| func IgmpDsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q { |
| dot1q := &layers.Dot1Q{} |
| dot1q.Priority = priority |
| dot1q.DropEligible = false |
| dot1q.VLANIdentifier = uint16(vlan) |
| dot1q.Type = layers.EthernetTypeIPv4 |
| return dot1q |
| } |
| |
| // IgmpDsIpv4Layer set the IP layer |
| func IgmpDsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 { |
| ip := &layers.IPv4{} |
| ip.Version = 4 |
| ip.Protocol = layers.IPProtocolIGMP |
| ip.TTL = 1 |
| ip.SrcIP = src |
| if mcip.Equal(net.ParseIP("0.0.0.0")) { |
| mcip = net.ParseIP("224.0.0.1") |
| } |
| ip.DstIP = mcip |
| return ip |
| } |
| |
| // IgmpQueryv2Layer : IGMP Query Layer |
| func IgmpQueryv2Layer(mcip net.IP, resptime time.Duration) *layers.IGMPv1or2 { |
| igmp := &layers.IGMPv1or2{} |
| igmp.Type = layers.IGMPMembershipQuery |
| igmp.GroupAddress = mcip |
| igmp.MaxResponseTime = resptime |
| return igmp |
| } |
| |
| // IgmpQueryv3Layer : IGMP v3 Query Layer |
| func IgmpQueryv3Layer(mcip net.IP, resptime time.Duration) *layers.IGMP { |
| igmp := &layers.IGMP{} |
| igmp.Type = layers.IGMPMembershipQuery |
| igmp.GroupAddress = mcip |
| igmp.MaxResponseTime = resptime |
| return igmp |
| } |
| |
| // IgmpReportv2Layer : IGMP Layer |
| func IgmpReportv2Layer(mcip net.IP) *layers.IGMPv1or2 { |
| igmp := &layers.IGMPv1or2{} |
| igmp.Type = layers.IGMPMembershipReportV2 |
| igmp.GroupAddress = mcip |
| return igmp |
| } |
| |
| // IgmpLeavev2Layer : IGMP Leave Layer |
| func IgmpLeavev2Layer(mcip net.IP) *layers.IGMPv1or2 { |
| igmp := &layers.IGMPv1or2{} |
| igmp.Type = layers.IGMPLeaveGroup |
| igmp.GroupAddress = mcip |
| return igmp |
| } |
| |
| // IgmpReportv3Layer : IGMP v3 Report Layer |
| func IgmpReportv3Layer(mcip net.IP, incl bool, srclist []net.IP) *layers.IGMP { |
| // IGMP base |
| igmp := &layers.IGMP{} |
| igmp.Type = layers.IGMPMembershipReportV3 |
| igmp.NumberOfGroupRecords = 1 |
| |
| // IGMP Group |
| group := layers.IGMPv3GroupRecord{} |
| if incl { |
| group.Type = layers.IGMPIsIn |
| } else { |
| group.Type = layers.IGMPIsEx |
| } |
| group.MulticastAddress = mcip |
| group.NumberOfSources = uint16(len(srclist)) |
| group.SourceAddresses = srclist |
| igmp.GroupRecords = append(igmp.GroupRecords, group) |
| |
| return igmp |
| } |
| |
| // Igmpv2QueryPacket : IGMP Query in Downstream |
| func Igmpv2QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) { |
| // Construct the layers that form the packet |
| eth := IgmpDsEthLayer(mcip) |
| dot1q := IgmpDsDot1qLayer(vlan, pbit) |
| ip := IgmpDsIpv4Layer(selfip, mcip) |
| igmp := IgmpQueryv2Layer(mcip, time.Duration(maxResp)*time.Second) |
| |
| // Now prepare the buffer into which the layers are to be serialized |
| buff := gopacket.NewSerializeBuffer() |
| opts := gopacket.SerializeOptions{ |
| FixLengths: true, |
| ComputeChecksums: true, |
| } |
| if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil { |
| logger.Error(ctx, "Error in serializing layers") |
| return nil, err |
| } |
| return buff.Bytes(), nil |
| } |
| |
| // Igmpv3QueryPacket : IGMPv3 Query in Downstream |
| func Igmpv3QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) { |
| // Construct the layers that form the packet |
| eth := IgmpDsEthLayer(mcip) |
| dot1q := IgmpDsDot1qLayer(vlan, pbit) |
| ip := IgmpDsIpv4Layer(selfip, mcip) |
| igmp := IgmpQueryv3Layer(mcip, time.Duration(maxResp)*time.Second) |
| |
| // Now prepare the buffer into which the layers are to be serialized |
| buff := gopacket.NewSerializeBuffer() |
| opts := gopacket.SerializeOptions{ |
| FixLengths: true, |
| ComputeChecksums: true, |
| } |
| if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil { |
| logger.Error(ctx, "Error in serializing layers") |
| return nil, err |
| } |
| return buff.Bytes(), nil |
| } |
| |
| // IgmpReportv2Packet : Packet - IGMP v2 report in upstream |
| func IgmpReportv2Packet(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) { |
| // Construct the layers that form the packet |
| eth := IgmpUsEthLayer(mcip) |
| dot1q := IgmpUsDot1qLayer(vlan, priority) |
| ip := Igmpv2UsIpv4Layer(selfip, mcip) |
| igmp := IgmpReportv2Layer(mcip) |
| |
| // Now prepare the buffer into which the layers are to be serialized |
| buff := gopacket.NewSerializeBuffer() |
| opts := gopacket.SerializeOptions{ |
| FixLengths: true, |
| ComputeChecksums: true, |
| } |
| if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil { |
| logger.Error(ctx, "Error in serializing layers") |
| return nil, err |
| } |
| return buff.Bytes(), nil |
| } |
| |
| // Igmpv3ReportPacket : Packet - IGMP v3 report in upstream |
| func Igmpv3ReportPacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP, incl bool, srclist []net.IP) ([]byte, error) { |
| // Construct the layers that form the packet |
| eth := IgmpUsEthLayer(net.ParseIP("224.0.0.22").To4()) |
| dot1q := IgmpUsDot1qLayer(vlan, priority) |
| ip := Igmpv3UsIpv4Layer(selfip) |
| igmp := IgmpReportv3Layer(mcip, incl, srclist) |
| |
| // Now prepare the buffer into which the layers are to be serialized |
| buff := gopacket.NewSerializeBuffer() |
| opts := gopacket.SerializeOptions{ |
| FixLengths: true, |
| ComputeChecksums: true, |
| } |
| if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil { |
| logger.Error(ctx, "Error in serializing layers") |
| return nil, err |
| } |
| return buff.Bytes(), nil |
| } |
| |
| // IgmpLeavePacket : Packet- IGMP Leave in upstream |
| func IgmpLeavePacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) { |
| // Construct the layers that form the packet |
| eth := IgmpUsEthLayer(mcip) |
| dot1q := IgmpUsDot1qLayer(vlan, priority) |
| ip := Igmpv2UsIpv4Layer(selfip, mcip) |
| igmp := IgmpLeavev2Layer(mcip) |
| |
| // Now prepare the buffer into which the layers are to be serialized |
| buff := gopacket.NewSerializeBuffer() |
| opts := gopacket.SerializeOptions{ |
| FixLengths: true, |
| ComputeChecksums: true, |
| } |
| if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil { |
| logger.Error(ctx, "Error in serializing layers") |
| return nil, err |
| } |
| return buff.Bytes(), nil |
| } |
| |
| // getVersion to get igmp version type |
| func getVersion(ver string) uint8 { |
| if ver == "2" || ver == "v2" { |
| return IgmpVersion2 |
| } |
| return IgmpVersion3 |
| } |
| |
| // IsIPPresent is Utility to check if an IP address is in a list |
| func IsIPPresent(i net.IP, ips []net.IP) bool { |
| for _, ip := range ips { |
| if i.Equal(ip) { |
| return true |
| } |
| } |
| return false |
| } |
| |
| //AddToPendingPool - adds Igmp Device obj to pending pool |
| func AddToPendingPool(cntx context.Context, device string, groupKey string) bool { |
| |
| logger.Infow(ctx, "Add Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupKey": groupKey}) |
| if grp, ok := GetApplication().IgmpGroups.Load(groupKey); ok { |
| ig := grp.(*IgmpGroup) |
| ig.PendingPoolLock.Lock() |
| logger.Infow(ctx, "Adding Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()}) |
| ig.PendingGroupForDevice[device] = time.Now().Add(time.Duration(GroupExpiryTime) * time.Minute) |
| ig.PendingPoolLock.Unlock() |
| if err := ig.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName}) |
| } |
| return true |
| } |
| return false |
| } |
| |
| /* |
| func checkIfForceGroupRemove(device string) bool { |
| if d := GetApplication().GetDevice(device); d != nil { |
| if d.State == cntlr.DeviceStateREBOOTED || d.State == cntlr.DeviceStateDOWN { |
| return true |
| } |
| } |
| return false |
| }*/ |
| |
| // SendQueryExpiredEventGroupSpecific to send group specific query expired event. |
| func SendQueryExpiredEventGroupSpecific(portKey string, igd *IgmpGroupDevice, igc *IgmpGroupChannel) { |
| |
| logger.Info(ctx, "Processing-SendQueryExpiredEventGroupSpecific-Event") |
| va := GetApplication() |
| mvpName := va.GetMvlanProfileByTag(igd.Mvlan).Name |
| |
| sendEvent := func(key interface{}, value interface{}) bool { |
| if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName { |
| logger.Debugw(ctx, "sending-query-expired-group-specific-event", log.Fields{"EventType": QueryExpiredGroupSpecific, "ServiceName": value.(*VoltService).Name}) |
| } |
| return false |
| } |
| |
| // Fetching service name to send with query expired event. |
| vpvs, _ := va.VnetsByPort.Load(portKey) |
| if vpvs == nil { |
| logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs}) |
| return |
| } |
| |
| for _, vpv := range vpvs.([]*VoltPortVnet) { |
| vpv.services.Range(sendEvent) |
| } |
| } |
| |
| // GetMcastServiceForSubAlarm to get mcast service name for subscriber alarm. |
| func GetMcastServiceForSubAlarm(uniPort *VoltPort, mvp *MvlanProfile) string { |
| |
| var serviceName string |
| mvpName := mvp.Name |
| |
| va := GetApplication() |
| |
| sendAlm := func(key interface{}, value interface{}) bool { |
| if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName { |
| serviceName = value.(*VoltService).Name |
| } |
| return true |
| } |
| |
| // Fetching service name to send with active channels exceeded per subscriber alarm. |
| vpvs, _ := va.VnetsByPort.Load(uniPort.Name) |
| if vpvs == nil { |
| logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs}) |
| return serviceName |
| } |
| |
| for _, vpv := range vpvs.([]*VoltPortVnet) { |
| vpv.services.Range(sendAlm) |
| } |
| |
| return serviceName |
| |
| } |
| |
| // RestoreIgmpGroupsFromDb to restore igmp groups from database |
| func (va *VoltApplication) RestoreIgmpGroupsFromDb(cntx context.Context) { |
| |
| groups, _ := db.GetIgmpGroups(cntx) |
| for _, group := range groups { |
| b, ok := group.Value.([]byte) |
| if !ok { |
| logger.Warn(ctx, "The value type is not []byte") |
| continue |
| } |
| var ig IgmpGroup |
| err := json.Unmarshal(b, &ig) |
| if err != nil { |
| logger.Warn(ctx, "Unmarshal of IGMP Group failed") |
| continue |
| } |
| ig.Devices = make(map[string]*IgmpGroupDevice) |
| |
| //For Upgrade Case |
| if len(ig.PendingGroupForDevice) == 0 { |
| ig.PendingGroupForDevice = make(map[string]time.Time) |
| } |
| logger.Infow(ctx, "Restoring Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan}) |
| grpKey := ig.getKey() |
| va.IgmpGroups.Store(grpKey, &ig) |
| // Just delete and lose the IGMP group with the same group Id |
| if _, err := va.GetIgmpGroupID(ig.GroupID); err != nil { |
| logger.Warnw(ctx, "GetIgmpGroupID Failed", log.Fields{"igGroupID": ig.GroupID, "Error": err}) |
| } |
| ig.RestoreDevices(cntx) |
| |
| if ig.NumDevicesActive() == 0 { |
| va.AddGroupToPendingPool(&ig) |
| } |
| logger.Infow(ctx, "Restored Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan}) |
| } |
| } |
| |
| // AddIgmpGroup : When the first IGMP packet is received, the MVLAN profile is identified |
| // for the IGMP group and grp obj is obtained from the available pending pool of groups. |
| // If not, new group obj will be created based on available group IDs |
| func (va *VoltApplication) AddIgmpGroup(cntx context.Context, mvpName string, gip net.IP, device string) *IgmpGroup { |
| |
| var ig *IgmpGroup |
| if mvp, grpName := va.GetMvlanProfileForMcIP(mvpName, gip); mvp != nil { |
| if ig = va.GetGroupFromPendingPool(mvp.Mvlan, device); ig != nil { |
| logger.Infow(ctx, "Igmp Group obtained from global pending pool", log.Fields{"MvlanProfile": mvpName, "GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()}) |
| oldKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String()) |
| ig.IgmpGroupReInit(cntx, grpName, gip) |
| ig.IsGroupStatic = mvp.Groups[grpName].IsStatic |
| ig.UpdateIgmpGroup(cntx, oldKey, ig.getKey()) |
| } else { |
| logger.Infow(ctx, "No Igmp Group available in global pending pool. Creating new Igmp Group", log.Fields{"MvlanProfile": mvpName, "Device": device, "GroupAddr": gip.String()}) |
| if ig = va.GetAvailIgmpGroupID(); ig == nil { |
| logger.Error(ctx, "Igmp Group Creation Failed: Group Id Unavailable") |
| return nil |
| } |
| ig.IgmpGroupInit(grpName, gip, mvp) |
| grpKey := ig.getKey() |
| va.IgmpGroups.Store(grpKey, ig) |
| } |
| if err := ig.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName}) |
| } |
| return ig |
| } |
| logger.Errorw(ctx, "GetMvlan Pro failed", log.Fields{"Group": gip}) |
| return nil |
| } |
| |
| // GetIgmpGroup helps search for the IGMP group from the list of |
| // active IGMP groups. For now, the assumption is that a group |
| // cannot belong to more than on MVLAN. If we change that definition, |
| // we have to take a relook at this implementation. The key will include |
| // both MVLAN and the group IP. |
| func (va *VoltApplication) GetIgmpGroup(mvlan of.VlanType, gip net.IP) *IgmpGroup { |
| |
| profile, _ := va.MvlanProfilesByTag.Load(mvlan) |
| if profile == nil { |
| logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": mvlan, "GroupAddr": gip.String()}) |
| return nil |
| } |
| mvp := profile.(*MvlanProfile) |
| _, gName := va.GetMvlanProfileForMcIP(mvp.Name, gip) |
| grpKey := mvp.generateGroupKey(gName, gip.String()) |
| logger.Debugw(ctx, "Get IGMP Group", log.Fields{"Group": grpKey}) |
| igIntf, ok := va.IgmpGroups.Load(grpKey) |
| if ok { |
| logger.Debugw(ctx, "Get IGMP Group Success", log.Fields{"Group": grpKey}) |
| ig := igIntf.(*IgmpGroup) |
| |
| //Case: Group was part of pending and Join came with same channel or different channel from same group |
| // (from same or different device) |
| // In that case, the same group will be allocated since the group is still part of va.IgmpGroups |
| // So, the groups needs to be removed from global pending pool |
| va.RemoveGroupDevicesFromPendingPool(ig) |
| return ig |
| } |
| return nil |
| } |
| |
| // DelIgmpGroup : When the last subscriber leaves the IGMP group across all the devices |
| // the IGMP group is removed. |
| func (va *VoltApplication) DelIgmpGroup(cntx context.Context, ig *IgmpGroup) { |
| |
| profile, found := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan) |
| if found { |
| mvp := profile.(*MvlanProfile) |
| |
| grpKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String()) |
| |
| if igIntf, ok := va.IgmpGroups.Load(grpKey); ok { |
| ig := igIntf.(*IgmpGroup) |
| ig.IgmpGroupLock.Lock() |
| if ig.NumDevicesAll() == 0 { |
| logger.Debugw(ctx, "Deleting IGMP Group", log.Fields{"Group": grpKey}) |
| va.PutIgmpGroupID(ig) |
| va.IgmpGroups.Delete(grpKey) |
| _ = db.DelIgmpGroup(cntx, grpKey) |
| } else { |
| logger.Infow(ctx, "Skipping IgmpGroup Device. Pending Igmp Group Devices present", log.Fields{"GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)}) |
| va.AddGroupToPendingPool(ig) |
| if err := ig.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName}) |
| } |
| } |
| ig.IgmpGroupLock.Unlock() |
| } |
| |
| } |
| } |
| |
| // GetPonPortID Gets the PON port ID from uniPortID |
| func (va *VoltApplication) GetPonPortID(device, uniPortID string) uint32 { |
| |
| isNNI := strings.Contains(uniPortID, "nni") |
| if isNNI || uniPortID == StaticPort { |
| logger.Debugw(ctx, "Cannot get pon port from UNI port", log.Fields{"port": uniPortID}) |
| return 0xFF |
| } |
| dIntf, ok := va.DevicesDisc.Load(device) |
| if !ok { |
| return 0xFF |
| } |
| d := dIntf.(*VoltDevice) |
| |
| uniPort := d.GetPort(uniPortID) |
| if uniPort == nil { |
| return 0xFF |
| } |
| return GetPonPortIDFromUNIPort(uniPort.ID) |
| } |
| |
| // AggActiveChannelsCountPerSub aggregates the active channel count for given uni port. |
| // It will iterate over all the groups and store the sum of active channels in VoltPort. |
| func (va *VoltApplication) AggActiveChannelsCountPerSub(device, uniPort string, port *VoltPort) { |
| var activeChannelCount uint32 |
| |
| collectActiveChannelCount := func(key interface{}, value interface{}) bool { |
| ig := value.(*IgmpGroup) |
| igd := ig.Devices[device] |
| if igd == nil { |
| return true |
| } |
| if portChannels, ok := igd.PortChannelMap.Load(uniPort); ok { |
| channelList := portChannels.([]net.IP) |
| activeChannelCount += uint32(len(channelList)) |
| } |
| return true |
| } |
| va.IgmpGroups.Range(collectActiveChannelCount) |
| |
| logger.Debugw(ctx, "AggrActiveChannelCount for Subscriber", |
| log.Fields{"UniPortID": uniPort, "count": activeChannelCount}) |
| |
| port.ActiveChannels = activeChannelCount |
| } |
| |
| // AggActiveChannelsCountForPonPort Aggregates the active channel count for given pon port. |
| // It will iterate over all the groups and store the sum of active channels in VoltDevice. |
| func (va *VoltApplication) AggActiveChannelsCountForPonPort(device string, ponPortID uint32, port *PonPortCfg) { |
| |
| var activeChannelCount uint32 |
| |
| collectActiveChannelCount := func(key interface{}, value interface{}) bool { |
| ig := value.(*IgmpGroup) |
| igd := ig.Devices[device] |
| if igd == nil { |
| return true |
| } |
| if ponPortChannels, ok := igd.PonPortChannelMap.Get(ponPortID); ok { |
| activeChannelCount += ponPortChannels.(*PonPortChannels).GetActiveChannelCount() |
| } |
| return true |
| } |
| va.IgmpGroups.Range(collectActiveChannelCount) |
| |
| logger.Debugw(ctx, "AggrActiveChannelCount for Pon Port", |
| log.Fields{"PonPortID": ponPortID, "count": activeChannelCount}) |
| |
| port.ActiveIGMPChannels = activeChannelCount |
| } |
| |
| // UpdateActiveChannelCountForPonPort increments the global counter for active |
| // channel count per pon port. |
| func (va *VoltApplication) UpdateActiveChannelCountForPonPort(device, uniPortID string, ponPortID uint32, isAdd, isChannel bool, igd *IgmpGroupDevice) { |
| incrDecr := func(value uint32) uint32 { |
| if isAdd { |
| return value + 1 |
| } |
| return value - 1 |
| } |
| if d, exists := va.DevicesDisc.Load(device); exists { |
| voltDevice := d.(*VoltDevice) |
| |
| if isChannel { |
| voltDevice.ActiveChannelCountLock.Lock() |
| // If New channel is added/deleted, then only update the ActiveChannelsPerPon |
| if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok { |
| port := value.(*PonPortCfg) |
| port.ActiveIGMPChannels = incrDecr(port.ActiveIGMPChannels) |
| voltDevice.ActiveChannelsPerPon.Store(ponPortID, port) |
| logger.Debugw(ctx, "+++ActiveChannelsPerPon", log.Fields{"count": port.ActiveIGMPChannels}) // TODO: remove me |
| } |
| voltDevice.ActiveChannelCountLock.Unlock() |
| } |
| if uPort, ok := voltDevice.Ports.Load(uniPortID); ok { |
| uniPort := uPort.(*VoltPort) |
| uniPort.ActiveChannels = incrDecr(uniPort.ActiveChannels) |
| voltDevice.Ports.Store(uniPortID, uniPort) |
| logger.Debugw(ctx, "+++ActiveChannelsPerSub", log.Fields{"count": uniPort.ActiveChannels}) // TODO: remove me |
| } |
| } |
| } |
| |
| // IsMaxChannelsCountExceeded checks if the PON port active channel |
| // capacity and subscriber level channel capacity is reached to max allowed |
| // channel per pon threshold. If Exceeds, return true else return false. |
| func (va *VoltApplication) IsMaxChannelsCountExceeded(device, uniPortID string, |
| ponPortID uint32, ig *IgmpGroup, channelIP net.IP, mvp *MvlanProfile) bool { |
| |
| // New receiver check is required to identify the IgmpReportMsg received |
| // in response to the IGMP Query sent from VGC. |
| if newReceiver := ig.IsNewReceiver(device, uniPortID, channelIP); !newReceiver { |
| logger.Debugw(ctx, "Not a new receiver. It is a response to IGMP Query", |
| log.Fields{"port": uniPortID, "channel": channelIP}) |
| return false |
| } |
| |
| if vDev, exists := va.DevicesDisc.Load(device); exists { |
| voltDevice := vDev.(*VoltDevice) |
| |
| // Checking subscriber active channel count with maxChannelsAllowedPerSub |
| if uniPort, present := voltDevice.Ports.Load(uniPortID); present { |
| if uniPort.(*VoltPort).ActiveChannels >= mvp.MaxActiveChannels { |
| logger.Errorw(ctx, "Max allowed channels per subscriber is exceeded", |
| log.Fields{"activeCount": uniPort.(*VoltPort).ActiveChannels, "channel": channelIP, "UNI": uniPort.(*VoltPort).Name}) |
| if !(uniPort.(*VoltPort).ChannelPerSubAlarmRaised) { |
| serviceName := GetMcastServiceForSubAlarm(uniPort.(*VoltPort), mvp) |
| logger.Debugw(ctx, "Raising-SendActiveChannelPerSubscriberAlarm-Initiated", log.Fields{"ActiveChannels": uniPort.(*VoltPort).ActiveChannels, "ServiceName": serviceName}) |
| uniPort.(*VoltPort).ChannelPerSubAlarmRaised = true |
| } |
| return true |
| } |
| } else { |
| logger.Errorw(ctx, "UNI port not found in VoltDevice", log.Fields{"uniPortID": uniPortID}) |
| } |
| if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok { |
| ponPort := value.(*PonPortCfg) |
| |
| logger.Debugw(ctx, "----Active channels count for PON port", |
| log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels, |
| "maxAllowedChannelsPerPon": ponPort.MaxActiveChannels}) |
| |
| if ponPort.ActiveIGMPChannels < ponPort.MaxActiveChannels { |
| // PON port active channel capacity is not yet reached to max allowed channels per pon. |
| // So allowing to add receiver. |
| return false |
| } else if ponPort.ActiveIGMPChannels >= ponPort.MaxActiveChannels && ig != nil { |
| // PON port active channel capacity is reached to max allowed channels per pon. |
| // Check if same channel is already configured on that PON port. |
| // If that channel is present, then allow AddReceiver else it will be rejected. |
| igd, isPresent := ig.Devices[device] |
| if isPresent { |
| if channelListForPonPort, _ := igd.PonPortChannelMap.Get(ponPortID); channelListForPonPort != nil { |
| if _, isExists := channelListForPonPort.(*PonPortChannels).ChannelList.Get(channelIP.String()); isExists { |
| return false |
| } |
| } |
| } |
| } |
| logger.Errorw(ctx, "Active channels count for PON port exceeded", |
| log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels, "channel": channelIP, "UNI": uniPortID}) |
| } else { |
| logger.Warnw(ctx, "PON port level active channel count does not exists", |
| log.Fields{"ponPortID": ponPortID}) |
| return false |
| } |
| } |
| logger.Warnw(ctx, "Max allowed channels per pon threshold is reached", log.Fields{"PonPortID": ponPortID}) |
| return true |
| } |
| |
| // ProcessIgmpv2Pkt : This is IGMPv2 packet. |
| func (va *VoltApplication) ProcessIgmpv2Pkt(cntx context.Context, device string, port string, pkt gopacket.Packet) { |
| // First get the layers of interest |
| dot1Q := pkt.Layer(layers.LayerTypeDot1Q).(*layers.Dot1Q) |
| pktVlan := of.VlanType(dot1Q.VLANIdentifier) |
| igmpv2 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMPv1or2) |
| |
| ponPortID := va.GetPonPortID(device, port) |
| |
| var vpv *VoltPortVnet |
| |
| logger.Debugw(ctx, "Received IGMPv2 Type", log.Fields{"Type": igmpv2.Type}) |
| |
| if igmpv2.Type == layers.IGMPMembershipReportV2 || igmpv2.Type == layers.IGMPMembershipReportV1 { |
| |
| logger.Infow(ctx, "IGMP Join received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port}) |
| |
| // This is a report coming from the PON. We must be able to first find the |
| // subscriber from the VLAN tag and port and verify if the IGMP proxy is |
| // enabled for the subscriber |
| vpv, _ = va.GetVnetFromPkt(device, port, pkt) |
| |
| if vpv == nil { |
| logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port}) |
| return |
| } else if !vpv.IgmpEnabled { |
| logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port}) |
| return |
| } |
| |
| mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName) |
| if mvp == nil { |
| logger.Errorw(ctx, "Igmp Packet Received for Subscriber with Missing Mvlan Profile", |
| log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName}) |
| return |
| } |
| mvlan := mvp.Mvlan |
| |
| mvp.mvpLock.RLock() |
| defer mvp.mvpLock.RUnlock() |
| // The subscriber is validated and now process the IGMP report |
| ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress) |
| |
| if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, igmpv2.GroupAddress, mvp); yes { |
| logger.Warnw(ctx, "Dropping IGMP Join v2: Active channel threshold exceeded", |
| log.Fields{"PonPortID": ponPortID, "Addr": igmpv2.GroupAddress, "MvlanProfile": vpv.MvlanProfileName}) |
| return |
| } |
| if ig != nil { |
| logger.Infow(ctx, "IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices}) |
| // If the IGMP group is already created. just add the receiver |
| ig.IgmpGroupLock.Lock() |
| // Check for port state to avoid race condition where PortDown event |
| // acquired lock before packet processing |
| vd := GetApplication().GetDevice(device) |
| vp := vd.GetPort(port) |
| if vp == nil || vp.State != PortStateUp { |
| logger.Warnw(ctx, "Join received from a Port that is DOWN or not present", |
| log.Fields{"Port": port}) |
| ig.IgmpGroupLock.Unlock() |
| return |
| } |
| ig.AddReceiver(cntx, device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID) |
| ig.IgmpGroupLock.Unlock() |
| } else { |
| // Create the IGMP group and then add the receiver to the group |
| if ig := va.AddIgmpGroup(cntx, vpv.MvlanProfileName, igmpv2.GroupAddress, device); ig != nil { |
| logger.Infow(ctx, "New IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices}) |
| ig.IgmpGroupLock.Lock() |
| // Check for port state to avoid race condition where PortDown event |
| // acquired lock before packet processing |
| vd := GetApplication().GetDevice(device) |
| vp := vd.GetPort(port) |
| if vp == nil || vp.State != PortStateUp { |
| logger.Warnw(ctx, "Join received from a Port that is DOWN or not present", |
| log.Fields{"Port": port}) |
| ig.IgmpGroupLock.Unlock() |
| return |
| } |
| ig.AddReceiver(cntx, device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID) |
| ig.IgmpGroupLock.Unlock() |
| } else { |
| logger.Errorw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": igmpv2.GroupAddress}) |
| return |
| } |
| } |
| } else if igmpv2.Type == layers.IGMPLeaveGroup { |
| // This is a IGMP leave coming from one of the receivers. We essentially remove the |
| // the receiver. |
| logger.Infow(ctx, "IGMP Leave received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port}) |
| |
| vpv, _ = va.GetVnetFromPkt(device, port, pkt) |
| if vpv == nil { |
| logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port}) |
| return |
| } else if !vpv.IgmpEnabled { |
| logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port}) |
| return |
| } |
| |
| mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName) |
| mvp.mvpLock.RLock() |
| defer mvp.mvpLock.RUnlock() |
| mvlan := mvp.Mvlan |
| // The subscriber is validated and now process the IGMP report |
| if ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress); ig != nil { |
| ig.IgmpGroupLock.Lock() |
| // Delete the receiver once the IgmpGroup is identified |
| ig.DelReceiver(cntx, device, port, igmpv2.GroupAddress, nil, ponPortID) |
| ig.IgmpGroupLock.Unlock() |
| if ig.NumDevicesActive() == 0 { |
| va.DelIgmpGroup(cntx, ig) |
| } |
| } |
| } else { |
| // This must be a query on the NNI port. However, we dont make that assumption. |
| // Need to look for the IGMP group based on the VLAN in the packet as |
| // the MVLAN |
| |
| //Check if mvlan profile exist for the incoming pkt vlan |
| profile, _ := va.MvlanProfilesByTag.Load(pktVlan) |
| if profile == nil { |
| logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan}) |
| return |
| } |
| mvp := profile.(*MvlanProfile) |
| mvp.mvpLock.RLock() |
| defer mvp.mvpLock.RUnlock() |
| |
| if net.ParseIP("0.0.0.0").Equal(igmpv2.GroupAddress) { |
| va.processIgmpQueries(cntx, device, pktVlan, IgmpVersion2) |
| } else { |
| if ig := va.GetIgmpGroup(pktVlan, igmpv2.GroupAddress); ig != nil { |
| ig.IgmpGroupLock.Lock() |
| igd, ok := ig.Devices[device] |
| if ok { |
| igd.ProcessQuery(cntx, igmpv2.GroupAddress, IgmpVersion2) |
| } else { |
| logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv2.GroupAddress}) |
| } |
| ig.IgmpGroupLock.Unlock() |
| } |
| } |
| } |
| } |
| |
| // ProcessIgmpv3Pkt : Process IGMPv3 packet |
| func (va *VoltApplication) ProcessIgmpv3Pkt(cntx context.Context, device string, port string, pkt gopacket.Packet) { |
| // First get the layers of interest |
| dot1QLayer := pkt.Layer(layers.LayerTypeDot1Q) |
| |
| if dot1QLayer == nil { |
| logger.Error(ctx, "Igmp Packet Received without Vlan - Dropping pkt") |
| return |
| } |
| dot1Q := dot1QLayer.(*layers.Dot1Q) |
| pktVlan := of.VlanType(dot1Q.VLANIdentifier) |
| igmpv3 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMP) |
| |
| ponPortID := va.GetPonPortID(device, port) |
| |
| var vpv *VoltPortVnet |
| logger.Debugw(ctx, "Received IGMPv3 Type", log.Fields{"Type": igmpv3.Type}) |
| |
| if igmpv3.Type == layers.IGMPMembershipReportV3 { |
| // This is a report coming from the PON. We must be able to first find the |
| // subscriber from the VLAN tag and port and verify if the IGMP proxy is |
| // enabled for the subscriber |
| vpv, _ = va.GetVnetFromPkt(device, port, pkt) |
| if vpv == nil { |
| logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port}) |
| return |
| } else if !vpv.IgmpEnabled { |
| logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port}) |
| return |
| } |
| mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName) |
| if mvp == nil { |
| logger.Errorw(ctx, "Igmp Packet received for Subscriber with Missing Mvlan Profile", |
| log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName}) |
| return |
| } |
| mvp.mvpLock.RLock() |
| defer mvp.mvpLock.RUnlock() |
| mvlan := mvp.Mvlan |
| |
| for _, group := range igmpv3.GroupRecords { |
| |
| isJoin := isIgmpJoin(group.Type, group.SourceAddresses) |
| // The subscriber is validated and now process the IGMP report |
| ig := va.GetIgmpGroup(mvlan, group.MulticastAddress) |
| if isJoin { |
| if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, group.MulticastAddress, mvp); yes { |
| logger.Warnw(ctx, "Dropping IGMP Join v3: Active channel threshold exceeded", |
| log.Fields{"PonPortID": ponPortID, "Addr": group.MulticastAddress, "MvlanProfile": vpv.MvlanProfileName}) |
| |
| return |
| } |
| if ig != nil { |
| // If the IGMP group is already created. just add the receiver |
| logger.Infow(ctx, "IGMP Join received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port}) |
| ig.IgmpGroupLock.Lock() |
| // Check for port state to avoid race condition where PortDown event |
| // acquired lock before packet processing |
| vd := GetApplication().GetDevice(device) |
| vp := vd.GetPort(port) |
| if vp == nil || vp.State != PortStateUp { |
| logger.Warnw(ctx, "Join received from a Port that is DOWN or not present", |
| log.Fields{"Port": port}) |
| ig.IgmpGroupLock.Unlock() |
| return |
| } |
| ig.AddReceiver(cntx, device, port, group.MulticastAddress, &group, IgmpVersion3, |
| dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID) |
| ig.IgmpGroupLock.Unlock() |
| } else { |
| // Create the IGMP group and then add the receiver to the group |
| logger.Infow(ctx, "IGMP Join received for new group", log.Fields{"Addr": group.MulticastAddress, "Port": port}) |
| if ig := va.AddIgmpGroup(cntx, vpv.MvlanProfileName, group.MulticastAddress, device); ig != nil { |
| ig.IgmpGroupLock.Lock() |
| // Check for port state to avoid race condition where PortDown event |
| // acquired lock before packet processing |
| vd := GetApplication().GetDevice(device) |
| vp := vd.GetPort(port) |
| if vp == nil || vp.State != PortStateUp { |
| logger.Warnw(ctx, "Join received from a Port that is DOWN or not present", |
| log.Fields{"Port": port}) |
| ig.IgmpGroupLock.Unlock() |
| return |
| } |
| ig.AddReceiver(cntx, device, port, group.MulticastAddress, &group, IgmpVersion3, |
| dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID) |
| ig.IgmpGroupLock.Unlock() |
| } else { |
| logger.Warnw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": group.MulticastAddress}) |
| } |
| } |
| } else if ig != nil { |
| logger.Infow(ctx, "IGMP Leave received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port}) |
| ig.IgmpGroupLock.Lock() |
| ig.DelReceiver(cntx, device, port, group.MulticastAddress, &group, ponPortID) |
| ig.IgmpGroupLock.Unlock() |
| if ig.NumDevicesActive() == 0 { |
| va.DelIgmpGroup(cntx, ig) |
| } |
| } else { |
| logger.Warnw(ctx, "IGMP Leave received for unknown group", log.Fields{"Addr": group.MulticastAddress}) |
| } |
| } |
| } else { |
| // This must be a query on the NNI port. However, we dont make that assumption. |
| // Need to look for the IGMP group based on the VLAN in the packet as |
| // the MVLAN |
| |
| //Check if mvlan profile exist for the incoming pkt vlan |
| profile, _ := va.MvlanProfilesByTag.Load(pktVlan) |
| if profile == nil { |
| logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan}) |
| return |
| } |
| mvp := profile.(*MvlanProfile) |
| mvp.mvpLock.RLock() |
| defer mvp.mvpLock.RUnlock() |
| |
| if net.ParseIP("0.0.0.0").Equal(igmpv3.GroupAddress) { |
| va.processIgmpQueries(cntx, device, pktVlan, IgmpVersion3) |
| } else { |
| if ig := va.GetIgmpGroup(pktVlan, igmpv3.GroupAddress); ig != nil { |
| ig.IgmpGroupLock.Lock() |
| igd, ok := ig.Devices[device] |
| if ok { |
| igd.ProcessQuery(cntx, igmpv3.GroupAddress, IgmpVersion3) |
| } else { |
| logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv3.GroupAddress}) |
| } |
| ig.IgmpGroupLock.Unlock() |
| } |
| } |
| } |
| } |
| |
| // processIgmpQueries to process the igmp queries |
| func (va *VoltApplication) processIgmpQueries(cntx context.Context, device string, pktVlan of.VlanType, version uint8) { |
| // This is a generic query and respond with all the groups channels in currently being viewed. |
| processquery := func(key interface{}, value interface{}) bool { |
| ig := value.(*IgmpGroup) |
| ig.IgmpGroupLock.Lock() |
| if ig.Mvlan != pktVlan { |
| ig.IgmpGroupLock.Unlock() |
| return true |
| } |
| igd, ok := ig.Devices[device] |
| if !ok { |
| logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device}) |
| ig.IgmpGroupLock.Unlock() |
| return true |
| } |
| processQueryForEachChannel := func(key interface{}, value interface{}) bool { |
| groupAddr := key.(string) |
| igd.ProcessQuery(cntx, net.ParseIP(groupAddr), version) |
| return true |
| } |
| igd.GroupChannels.Range(processQueryForEachChannel) |
| ig.IgmpGroupLock.Unlock() |
| return true |
| } |
| va.IgmpGroups.Range(processquery) |
| } |
| |
| // isIgmpJoin to check if it is igmp join |
| func isIgmpJoin(recordType layers.IGMPv3GroupRecordType, sourceAddr []net.IP) bool { |
| var join = false |
| |
| if (layers.IGMPToEx == recordType) || (layers.IGMPIsEx == recordType) { |
| join = true |
| } else if layers.IGMPBlock == recordType { |
| if len(sourceAddr) == 0 { |
| join = true |
| } |
| } else if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) { |
| if len(sourceAddr) != 0 { |
| join = true |
| } |
| } |
| return join |
| } |
| |
| func isIncl(recordType layers.IGMPv3GroupRecordType) bool { |
| |
| if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) { |
| return true |
| } |
| return false |
| } |
| |
| // IgmpProcessPkt to process the IGMP packet received. The packet received brings along with it |
| // the port on which the packet is received and the device the port is in. |
| func (va *VoltApplication) IgmpProcessPkt(cntx context.Context, device string, port string, pkt gopacket.Packet) { |
| igmpl := pkt.Layer(layers.LayerTypeIGMP) |
| if igmpl == nil { |
| logger.Error(ctx, "Invalid IGMP packet arrived as IGMP packet") |
| return |
| } |
| if igmp, ok := igmpl.(*layers.IGMPv1or2); ok { |
| // This is an IGMPv2 packet. |
| logger.Debugw(ctx, "IGMPv2 Packet Received", log.Fields{"IPAddr": igmp.GroupAddress}) |
| va.ProcessIgmpv2Pkt(cntx, device, port, pkt) |
| return |
| } |
| if igmpv3, ok := igmpl.(*layers.IGMP); ok { |
| logger.Debugw(ctx, "IGMPv3 Packet Received", log.Fields{"NumOfGroups": igmpv3.NumberOfGroupRecords}) |
| va.ProcessIgmpv3Pkt(cntx, device, port, pkt) |
| } |
| } |
| |
| // IgmpPacketInd for igmp packet indication |
| func (va *VoltApplication) IgmpPacketInd(device string, port string, pkt gopacket.Packet) { |
| pt := NewIgmpPacketTask(device, port, pkt) |
| va.IgmpTasks.AddTask(pt) |
| } |
| |
| // storeMvlansMap to store mvlan map |
| func (va *VoltApplication) storeMvlansMap(mvlan of.VlanType, name string, mvp *MvlanProfile) { |
| va.MvlanProfilesByTag.Store(mvlan, mvp) |
| va.MvlanProfilesByName.Store(name, mvp) |
| } |
| |
| // deleteMvlansMap to delete mvlan map |
| func (va *VoltApplication) deleteMvlansMap(mvlan of.VlanType, name string) { |
| va.MvlanProfilesByTag.Delete(mvlan) |
| va.MvlanProfilesByName.Delete(name) |
| } |
| |
| // RestoreMvlansFromDb to read from the DB and restore all the MVLANs |
| func (va *VoltApplication) RestoreMvlansFromDb(cntx context.Context) { |
| mvlans, _ := db.GetMvlans(cntx) |
| for _, mvlan := range mvlans { |
| b, ok := mvlan.Value.([]byte) |
| if !ok { |
| logger.Warn(ctx, "The value type is not []byte") |
| continue |
| } |
| var mvp MvlanProfile |
| err := json.Unmarshal(b, &mvp) |
| if err != nil { |
| logger.Warn(ctx, "Unmarshal of MVLAN failed") |
| continue |
| } |
| va.storeMvlansMap(mvp.Mvlan, mvp.Name, &mvp) |
| |
| for srNo := range mvp.DevicesList { |
| if mvp.IgmpServVersion[srNo] == nil { |
| servVersion := IgmpVersion0 |
| mvp.IgmpServVersion[srNo] = &servVersion |
| } |
| } |
| logger.Infow(ctx, "Restored Mvlan Profile", log.Fields{"MVPName": mvp.Name}) |
| } |
| } |
| |
| // GetMvlanProfileByTag fetches MVLAN profile based on the MC VLAN |
| func (va *VoltApplication) GetMvlanProfileByTag(vlan of.VlanType) *MvlanProfile { |
| if mvp, ok := va.MvlanProfilesByTag.Load(vlan); ok { |
| return mvp.(*MvlanProfile) |
| } |
| return nil |
| } |
| |
| // GetMvlanProfileByName fetches MVLAN profile based on the profile name. |
| func (va *VoltApplication) GetMvlanProfileByName(name string) *MvlanProfile { |
| if mvp, ok := va.MvlanProfilesByName.Load(name); ok { |
| return mvp.(*MvlanProfile) |
| } |
| return nil |
| } |
| |
| //UpdateMvlanProfile - only channel groups be updated |
| func (va *VoltApplication) UpdateMvlanProfile(cntx context.Context, name string, vlan of.VlanType, groups map[string][]string, activeChannelCount int, proxy map[string]common.MulticastGroupProxy) error { |
| |
| mvpIntf, ok := va.MvlanProfilesByName.Load(name) |
| if !ok { |
| logger.Error(ctx, "Update Mvlan Failed: Profile does not exist") |
| return errors.New("MVLAN profile not found") |
| } |
| mvp := mvpIntf.(*MvlanProfile) |
| // check if groups are same then just update the OLTSerial numbers, push the config on new serial numbers |
| |
| existingGroup := mvp.Groups |
| existingProxy := mvp.Proxy |
| mvp.Groups = make(map[string]*MvlanGroup) |
| mvp.Proxy = make(map[string]*MCGroupProxy) |
| |
| /* Need to protect groups and proxy write lock */ |
| mvp.mvpLock.Lock() |
| for grpName, grpIPList := range groups { |
| mvp.AddMvlanGroup(grpName, grpIPList) |
| } |
| for grpName, proxyInfo := range proxy { |
| mvp.AddMvlanProxy(grpName, proxyInfo) |
| } |
| if _, ok := mvp.Groups[common.StaticGroup]; ok { |
| if _, yes := mvp.Proxy[common.StaticGroup]; !yes { |
| mvp.Groups[common.StaticGroup].IsStatic = true |
| } |
| } |
| prevMaxActiveChannels := mvp.MaxActiveChannels |
| if reflect.DeepEqual(mvp.Groups, existingGroup) && reflect.DeepEqual(mvp.Proxy, existingProxy) { |
| logger.Info(ctx, "No change in groups config") |
| if uint32(activeChannelCount) != mvp.MaxActiveChannels { |
| mvp.MaxActiveChannels = uint32(activeChannelCount) |
| if err := mvp.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name}) |
| } |
| if prevMaxActiveChannels != mvp.MaxActiveChannels { |
| mvp.UpdateActiveChannelSubscriberAlarm() |
| } |
| } |
| mvp.mvpLock.Unlock() |
| return nil |
| } |
| mvp.mvpLock.Unlock() |
| mvp.MaxActiveChannels = uint32(activeChannelCount) |
| |
| // Status is maintained so that in the event of any crash or reboot during update, |
| // the recovery is possible once the pod is UP again |
| mvp.SetUpdateStatus("", UpdateInProgress) |
| mvp.oldGroups = existingGroup |
| mvp.oldProxy = existingProxy |
| va.storeMvlansMap(vlan, name, mvp) |
| if err := mvp.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name}) |
| } |
| if prevMaxActiveChannels != mvp.MaxActiveChannels { |
| mvp.UpdateActiveChannelSubscriberAlarm() |
| } |
| |
| // The update task is added as part of Igm p task list, so that any parallel igmp pkt processing is avoided |
| // Until, the update operation is completed, the igmp pkt processing will be enqueued |
| updateTask := NewUpdateMvlanTask(mvp, "") |
| va.IgmpTasks.AddTask(updateTask) |
| return nil |
| } |
| |
| // isDeviceInList to check if device is the list |
| func isDeviceInList(serialNum string, OLTSerialNums []string) bool { |
| for _, oltSerialNum := range OLTSerialNums { |
| if serialNum == oltSerialNum { |
| return true |
| } |
| } |
| return false |
| } |
| |
| // McastConfigKey creates the key using the olt serial number and mvlan profile id |
| func McastConfigKey(oltSerialNum string, mvlanProfID string) string { |
| return oltSerialNum + "_" + mvlanProfID |
| } |
| |
| // GetMcastConfig to get McastConfig Information by OLT and Mvlan Profile ID |
| func (va *VoltApplication) GetMcastConfig(oltSerialNum string, mvlanProfID string) *McastConfig { |
| if mc, ok := va.McastConfigMap.Load(McastConfigKey(oltSerialNum, mvlanProfID)); ok { |
| return mc.(*McastConfig) |
| } |
| return nil |
| } |
| |
| func (va *VoltApplication) storeMcastConfig(oltSerialNum string, mvlanProfID string, mcastConfig *McastConfig) { |
| va.McastConfigMap.Store(McastConfigKey(oltSerialNum, mvlanProfID), mcastConfig) |
| } |
| |
| func (va *VoltApplication) deleteMcastConfig(oltSerialNum string, mvlanProfID string) { |
| va.McastConfigMap.Delete(McastConfigKey(oltSerialNum, mvlanProfID)) |
| } |
| |
| // AddMcastConfig for addition of a MVLAN profile |
| func (va *VoltApplication) AddMcastConfig(cntx context.Context, MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error { |
| var mcastCfg *McastConfig |
| |
| mcastCfg = va.GetMcastConfig(OltSerialNum, MvlanProfileID) |
| if mcastCfg == nil { |
| mcastCfg = &McastConfig{} |
| } else { |
| logger.Debugw(ctx, "Mcast Config already exists", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, |
| "MVLAN Profile ID": mcastCfg.MvlanProfileID}) |
| } |
| |
| // Update all igds available |
| mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID) |
| if !ok { |
| return errors.New("MVLAN profile not found during add mcast config") |
| } |
| mvlan := mvpIntf.(*MvlanProfile).Mvlan |
| |
| mcastCfg.OltSerialNum = OltSerialNum |
| mcastCfg.MvlanProfileID = MvlanProfileID |
| mcastCfg.IgmpProfileID = IgmpProfileID |
| mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP) |
| |
| proxyCfg := va.getIgmpProfileMap(IgmpProfileID) |
| |
| iterIgmpGroups := func(key interface{}, value interface{}) bool { |
| ig := value.(*IgmpGroup) |
| if ig.Mvlan != mvlan { |
| return true |
| } |
| |
| for _, igd := range ig.Devices { |
| if igd.SerialNo != OltSerialNum { |
| continue |
| } |
| igd.proxyCfg = proxyCfg |
| if IgmpProfileID == "" { |
| igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP |
| } else { |
| igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP |
| } |
| mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd) |
| logger.Debugw(ctx, "Igd updated with proxyCfg and proxyIP", log.Fields{"name": igd.GroupName, |
| "IgmpProfileID": IgmpProfileID, "ProxyIP": mcastCfg.IgmpProxyIP}) |
| } |
| return true |
| } |
| va.IgmpGroups.Range(iterIgmpGroups) |
| |
| va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg) |
| if err := mcastCfg.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID}) |
| } |
| va.addOltToMvlan(cntx, MvlanProfileID, OltSerialNum) |
| |
| return nil |
| } |
| |
| func (va *VoltApplication) addOltToMvlan(cntx context.Context, MvlanProfileID string, OltSerialNum string) { |
| var mvp *MvlanProfile |
| if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok { |
| servVersion := IgmpVersion0 |
| mvp = mvpIntf.(*MvlanProfile) |
| mvp.DevicesList[OltSerialNum] = NoOp |
| mvp.IgmpServVersion[OltSerialNum] = &servVersion |
| if err := mvp.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name}) |
| } |
| mvp.pushIgmpMcastFlows(cntx, OltSerialNum) |
| } |
| } |
| |
| func (va *VoltApplication) delOltFromMvlan(cntx context.Context, MvlanProfileID string, OltSerialNum string) { |
| var mvp *MvlanProfile |
| if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok { |
| mvp = mvpIntf.(*MvlanProfile) |
| //Delete from mvp list |
| mvp.removeIgmpMcastFlows(cntx, OltSerialNum) |
| delete(mvp.DevicesList, OltSerialNum) |
| if err := mvp.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name}) |
| } |
| } |
| } |
| |
| // DelMcastConfig for addition of a MVLAN profile |
| func (va *VoltApplication) DelMcastConfig(cntx context.Context, MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) { |
| |
| va.delOltFromMvlan(cntx, MvlanProfileID, OltSerialNum) |
| va.deleteMcastConfig(OltSerialNum, MvlanProfileID) |
| _ = db.DelMcastConfig(cntx, McastConfigKey(OltSerialNum, MvlanProfileID)) |
| if d, _ := va.GetDeviceBySerialNo(OltSerialNum); d != nil { |
| if mvp := va.GetMvlanProfileByName(MvlanProfileID); mvp != nil { |
| va.RemoveGroupsFromPendingPool(cntx, d.Name, mvp.Mvlan) |
| } |
| } |
| } |
| |
| // DelAllMcastConfig for deletion of all mcast config |
| func (va *VoltApplication) DelAllMcastConfig(cntx context.Context, OltSerialNum string) error { |
| |
| deleteIndividualMcastConfig := func(key interface{}, value interface{}) bool { |
| mcastCfg := value.(*McastConfig) |
| if mcastCfg.OltSerialNum == OltSerialNum { |
| va.DelMcastConfig(cntx, mcastCfg.MvlanProfileID, mcastCfg.IgmpProfileID, mcastCfg.IgmpProxyIP.String(), mcastCfg.OltSerialNum) |
| } |
| return true |
| } |
| va.McastConfigMap.Range(deleteIndividualMcastConfig) |
| return nil |
| } |
| |
| // UpdateMcastConfig for addition of a MVLAN profile |
| func (va *VoltApplication) UpdateMcastConfig(cntx context.Context, MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error { |
| |
| mcastCfg := va.GetMcastConfig(OltSerialNum, MvlanProfileID) |
| if mcastCfg == nil { |
| logger.Warnw(ctx, "Mcast Config not found. Unable to update", log.Fields{"Mvlan Profile ID": MvlanProfileID, "OltSerialNum": OltSerialNum}) |
| return nil |
| } |
| |
| oldProfID := mcastCfg.IgmpProfileID |
| mcastCfg.IgmpProfileID = IgmpProfileID |
| mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP) |
| |
| va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg) |
| |
| // Update all igds |
| if oldProfID != mcastCfg.IgmpProfileID { |
| updateIgdProxyCfg := func(key interface{}, value interface{}) bool { |
| igd := value.(*IgmpGroupDevice) |
| igd.proxyCfg = va.getIgmpProfileMap(mcastCfg.IgmpProfileID) |
| if IgmpProfileID == "" { |
| igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP |
| } else { |
| igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP |
| } |
| return true |
| } |
| mcastCfg.IgmpGroupDevices.Range(updateIgdProxyCfg) |
| } |
| |
| if err := mcastCfg.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID}) |
| } |
| |
| return nil |
| } |
| |
| // WriteToDb is utility to write Mcast config Info to database |
| func (mc *McastConfig) WriteToDb(cntx context.Context) error { |
| mc.Version = database.PresentVersionMap[database.McastConfigPath] |
| b, err := json.Marshal(mc) |
| if err != nil { |
| return err |
| } |
| if err1 := db.PutMcastConfig(cntx, McastConfigKey(mc.OltSerialNum, mc.MvlanProfileID), string(b)); err1 != nil { |
| return err1 |
| } |
| return nil |
| } |
| |
| // RestoreMcastConfigsFromDb to read from the DB and restore Mcast configs |
| func (va *VoltApplication) RestoreMcastConfigsFromDb(cntx context.Context) { |
| mcastConfigs, _ := db.GetMcastConfigs(cntx) |
| for hash, mcastConfig := range mcastConfigs { |
| b, ok := mcastConfig.Value.([]byte) |
| if !ok { |
| logger.Warn(ctx, "The value type is not []byte") |
| continue |
| } |
| var mc McastConfig |
| err := json.Unmarshal(b, &mc) |
| if err != nil { |
| logger.Warn(ctx, "Unmarshal of Mcast config failed") |
| continue |
| } |
| va.storeMcastConfig(mc.OltSerialNum, mc.MvlanProfileID, &mc) |
| logger.Infow(ctx, "Restored Mcast config", log.Fields{"OltSerialNum": mc.OltSerialNum, "MvlanProfileID": mc.MvlanProfileID, "hash": hash}) |
| } |
| } |
| |
| // AddMvlanProfile for addition of a MVLAN profile |
| func (va *VoltApplication) AddMvlanProfile(cntx context.Context, name string, mvlan of.VlanType, ponVlan of.VlanType, |
| groups map[string][]string, isChannelBasedGroup bool, OLTSerialNum []string, activeChannelsPerPon int, proxy map[string]common.MulticastGroupProxy) error { |
| var mvp *MvlanProfile |
| |
| if mvp = va.GetMvlanProfileByTag(mvlan); mvp != nil { |
| logger.Errorw(ctx, "Duplicate MVLAN ID configured", log.Fields{"mvlan": mvlan}) |
| return errors.New("MVLAN profile with same VLANID exists") |
| } |
| if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok { |
| mvp = mvpIntf.(*MvlanProfile) |
| for _, serialNum := range OLTSerialNum { |
| if mvp.DevicesList[serialNum] != Nil { |
| //This is backup restore scenario, just update the profile |
| logger.Info(ctx, "Add Mvlan : Profile Name already exists, update-the-profile") |
| return va.UpdateMvlanProfile(cntx, name, mvlan, groups, activeChannelsPerPon, proxy) |
| } |
| } |
| } |
| |
| if mvp == nil { |
| mvp = NewMvlanProfile(name, mvlan, ponVlan, isChannelBasedGroup, OLTSerialNum, uint32(activeChannelsPerPon)) |
| } |
| |
| va.storeMvlansMap(mvlan, name, mvp) |
| |
| /* Need to protect groups and proxy write lock */ |
| mvp.mvpLock.Lock() |
| for grpName, grpInfo := range groups { |
| mvp.AddMvlanGroup(grpName, grpInfo) |
| } |
| for grpName, proxyInfo := range proxy { |
| mvp.AddMvlanProxy(grpName, proxyInfo) |
| } |
| if _, ok := mvp.Groups[common.StaticGroup]; ok { |
| if _, yes := mvp.Proxy[common.StaticGroup]; !yes { |
| mvp.Groups[common.StaticGroup].IsStatic = true |
| } |
| } |
| |
| logger.Debugw(ctx, "Added MVLAN Profile", log.Fields{"MVLAN": mvp.Mvlan, "PonVlan": mvp.PonVlan, "Name": mvp.Name, "Grp IPs": mvp.Groups, "IsPonVlanPresent": mvp.IsPonVlanPresent}) |
| mvp.mvpLock.Unlock() |
| |
| if err := mvp.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name}) |
| } |
| |
| return nil |
| } |
| |
| // GetMvlanProfileForMcIP - Get an MVLAN profile for a given MC IP. This is used when an |
| // IGMP report is received from the PON port. The MVLAN profile |
| // located is used to idnetify the MC VLAN used in upstream for |
| // join/leave |
| func (va *VoltApplication) GetMvlanProfileForMcIP(profileName string, ip net.IP) (*MvlanProfile, string) { |
| if mvpIntf, ok := va.MvlanProfilesByName.Load(profileName); ok { |
| mvp := mvpIntf.(*MvlanProfile) |
| if grpName := mvp.GetMvlanGroup(ip); grpName != "" { |
| return mvp, grpName |
| } |
| } else { |
| logger.Warnw(ctx, "Mvlan Profile not found for given profile name", log.Fields{"Profile": profileName}) |
| } |
| return nil, "" |
| } |
| |
| // IgmpTick for igmp tick info |
| func (va *VoltApplication) IgmpTick(cntx context.Context) { |
| tickCount++ |
| if (tickCount % 1000) == 0 { |
| logger.Debugw(ctx, "Time @ Tick", log.Fields{"Tick": tickCount, "Time": time.Now()}) |
| } |
| igmptick := func(key interface{}, value interface{}) bool { |
| ig := value.(*IgmpGroup) |
| if ig.NumDevicesActive() != 0 { |
| if tickCount%10 == ig.Hash()%10 { |
| ig.IgmpGroupLock.Lock() |
| ig.Tick(cntx) |
| ig.IgmpGroupLock.Unlock() |
| if ig.NumDevicesActive() == 0 { |
| va.DelIgmpGroup(cntx, ig) |
| } |
| } |
| } |
| return true |
| } |
| va.IgmpGroups.Range(igmptick) |
| } |
| |
| // Tick to add Tick Task |
| func (va *VoltApplication) Tick() { |
| tt := NewTickTask() |
| va.IgmpTasks.AddTask(tt) |
| // va.IgmpTick() |
| } |
| |
| //AddIgmpProfile for addition of IGMP Profile |
| func (va *VoltApplication) AddIgmpProfile(cntx context.Context, igmpProfileConfig *common.IGMPConfig) error { |
| var igmpProfile *IgmpProfile |
| |
| if igmpProfileConfig.ProfileID == DefaultIgmpProfID { |
| logger.Info(ctx, "Updating default IGMP profile") |
| return va.UpdateIgmpProfile(cntx, igmpProfileConfig) |
| } |
| |
| igmpProfile = va.checkIgmpProfileMap(igmpProfileConfig.ProfileID) |
| if igmpProfile == nil { |
| igmpProfile = newIgmpProfile(igmpProfileConfig) |
| } else { |
| logger.Errorw(ctx, "IGMP profile already exists", log.Fields{"IgmpProfile": igmpProfileConfig.ProfileID}) |
| return errors.New("IGMP Profile already exists") |
| } |
| |
| va.storeIgmpProfileMap(igmpProfileConfig.ProfileID, igmpProfile) |
| |
| if err := igmpProfile.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID}) |
| } |
| |
| return nil |
| } |
| |
| // checkIgmpProfileMap to get Igmp Profile. If not found return nil |
| func (va *VoltApplication) checkIgmpProfileMap(name string) *IgmpProfile { |
| if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok { |
| return igmpProfileIntf.(*IgmpProfile) |
| } |
| return nil |
| } |
| |
| func (va *VoltApplication) resetIgmpProfileToDefault(cntx context.Context) { |
| igmpProf := va.getIgmpProfileMap(DefaultIgmpProfID) |
| defIgmpProf := newDefaultIgmpProfile() |
| |
| igmpProf.UnsolicitedTimeOut = defIgmpProf.UnsolicitedTimeOut |
| igmpProf.MaxResp = defIgmpProf.MaxResp |
| igmpProf.KeepAliveInterval = defIgmpProf.KeepAliveInterval |
| igmpProf.KeepAliveCount = defIgmpProf.KeepAliveCount |
| igmpProf.LastQueryInterval = defIgmpProf.LastQueryInterval |
| igmpProf.LastQueryCount = defIgmpProf.LastQueryCount |
| igmpProf.FastLeave = defIgmpProf.FastLeave |
| igmpProf.PeriodicQuery = defIgmpProf.PeriodicQuery |
| igmpProf.IgmpCos = defIgmpProf.IgmpCos |
| igmpProf.WithRAUpLink = defIgmpProf.WithRAUpLink |
| igmpProf.WithRADownLink = defIgmpProf.WithRADownLink |
| igmpProf.IgmpVerToServer = defIgmpProf.IgmpVerToServer |
| igmpProf.IgmpSourceIP = defIgmpProf.IgmpSourceIP |
| |
| if err := igmpProf.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProf.ProfileID}) |
| } |
| } |
| |
| // getIgmpProfileMap to get Igmp Profile. If not found return default IGMP config |
| func (va *VoltApplication) getIgmpProfileMap(name string) *IgmpProfile { |
| if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok { |
| return igmpProfileIntf.(*IgmpProfile) |
| } |
| |
| // There will be always a default igmp profile. |
| defaultIgmpProfileIntf, _ := va.IgmpProfilesByName.Load(DefaultIgmpProfID) |
| return defaultIgmpProfileIntf.(*IgmpProfile) |
| } |
| |
| // storeIgmpProfileMap to store Igmp Profile |
| func (va *VoltApplication) storeIgmpProfileMap(name string, igmpProfile *IgmpProfile) { |
| va.IgmpProfilesByName.Store(name, igmpProfile) |
| } |
| |
| // deleteIgmpProfileMap to delete Igmp Profile |
| func (va *VoltApplication) deleteIgmpProfileMap(name string) { |
| va.IgmpProfilesByName.Delete(name) |
| } |
| |
| // TODO - DelIgmpProfile for deleting IGMP Profile based on profile Id |
| // func (va *VoltApplication) DelIgmpProfile(cntx context.Context, igmpProfileConfig *common.IGMPConfig) error { |
| // // Deletion of default igmp profile is blocked from submgr. Keeping additional check for safety. |
| // if igmpProfileConfig.ProfileID == DefaultIgmpProfID { |
| // logger.Info(ctx, "Resetting default IGMP profile") |
| // va.resetIgmpProfileToDefault(cntx) |
| // return nil |
| // } |
| // igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID) |
| // if igmpProfile == nil { |
| // logger.Warnw(ctx, "Igmp Profile not found. Unable to delete", log.Fields{"Profile ID": igmpProfileConfig.ProfileID}) |
| // return nil |
| // } |
| |
| // va.deleteIgmpProfileMap(igmpProfileConfig.ProfileID) |
| |
| // _ = db.DelIgmpProfile(cntx, igmpProfileConfig.ProfileID) |
| |
| // return nil |
| // } |
| |
| // DelIgmpProfile for deleting IGMP Profile based on profile Id |
| func (va *VoltApplication) DelIgmpProfile(cntx context.Context, profileID string) error { |
| // Deletion of default igmp profile is blocked from submgr. Keeping additional check for safety. |
| if profileID == DefaultIgmpProfID { |
| logger.Info(ctx, "Resetting default IGMP profile") |
| va.resetIgmpProfileToDefault(cntx) |
| return nil |
| } |
| igmpProfile := va.checkIgmpProfileMap(profileID) |
| if igmpProfile == nil { |
| logger.Warnw(ctx, "Igmp Profile not found. Unable to delete", log.Fields{"Profile ID": profileID}) |
| return nil |
| } |
| |
| va.deleteIgmpProfileMap(profileID) |
| |
| err := db.DelIgmpProfile(cntx, profileID) |
| if err != nil { |
| logger.Errorw(ctx, "Failed to delete Igmp profile from DB", log.Fields{"Error": err}) |
| return err |
| } |
| |
| return nil |
| } |
| |
| //UpdateIgmpProfile for addition of IGMP Profile |
| func (va *VoltApplication) UpdateIgmpProfile(cntx context.Context, igmpProfileConfig *common.IGMPConfig) error { |
| igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID) |
| if igmpProfile == nil { |
| logger.Errorw(ctx, "Igmp Profile not found. Unable to update", log.Fields{"Profile ID": igmpProfileConfig.ProfileID}) |
| return errors.New("IGMP Profile not found") |
| } |
| |
| igmpProfile.ProfileID = igmpProfileConfig.ProfileID |
| igmpProfile.UnsolicitedTimeOut = uint32(igmpProfileConfig.UnsolicitedTimeOut) |
| igmpProfile.MaxResp = uint32(igmpProfileConfig.MaxResp) |
| |
| keepAliveInterval := uint32(igmpProfileConfig.KeepAliveInterval) |
| |
| //KeepAliveInterval should have a min of 10 seconds |
| if keepAliveInterval < MinKeepAliveInterval { |
| keepAliveInterval = MinKeepAliveInterval |
| logger.Infow(ctx, "Auto adjust keepAliveInterval - Value < 10", log.Fields{"Received": igmpProfileConfig.KeepAliveInterval, "Configured": keepAliveInterval}) |
| } |
| igmpProfile.KeepAliveInterval = keepAliveInterval |
| |
| igmpProfile.KeepAliveCount = uint32(igmpProfileConfig.KeepAliveCount) |
| igmpProfile.LastQueryInterval = uint32(igmpProfileConfig.LastQueryInterval) |
| igmpProfile.LastQueryCount = uint32(igmpProfileConfig.LastQueryCount) |
| igmpProfile.FastLeave = *igmpProfileConfig.FastLeave |
| igmpProfile.PeriodicQuery = *igmpProfileConfig.PeriodicQuery |
| igmpProfile.IgmpCos = uint8(igmpProfileConfig.IgmpCos) |
| igmpProfile.WithRAUpLink = *igmpProfileConfig.WithRAUpLink |
| igmpProfile.WithRADownLink = *igmpProfileConfig.WithRADownLink |
| |
| if igmpProfileConfig.IgmpVerToServer == "2" || igmpProfileConfig.IgmpVerToServer == "v2" { |
| igmpProfile.IgmpVerToServer = "2" |
| } else { |
| igmpProfile.IgmpVerToServer = "3" |
| } |
| |
| if igmpProfileConfig.IgmpSourceIP != "" { |
| igmpProfile.IgmpSourceIP = net.ParseIP(igmpProfileConfig.IgmpSourceIP) |
| } |
| |
| if err := igmpProfile.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID}) |
| } |
| |
| return nil |
| } |
| |
| // RestoreIGMPProfilesFromDb to read from the DB and restore IGMP Profiles |
| func (va *VoltApplication) RestoreIGMPProfilesFromDb(cntx context.Context) { |
| // Loading IGMP profiles |
| igmpProfiles, _ := db.GetIgmpProfiles(cntx) |
| for _, igmpProfile := range igmpProfiles { |
| b, ok := igmpProfile.Value.([]byte) |
| if !ok { |
| logger.Warn(ctx, "The value type is not []byte") |
| continue |
| } |
| var igmpProf IgmpProfile |
| err := json.Unmarshal(b, &igmpProf) |
| if err != nil { |
| logger.Warn(ctx, "Unmarshal of IGMP Profile failed") |
| continue |
| } |
| va.storeIgmpProfileMap(igmpProf.ProfileID, &igmpProf) |
| logger.Infow(ctx, "Restored Igmp Profile", log.Fields{"Conf": igmpProf}) |
| } |
| } |
| |
| // InitIgmpSrcMac for initialization of igmp source mac |
| func (va *VoltApplication) InitIgmpSrcMac() { |
| srcMac, err := getPodMacAddr() |
| if err != nil { |
| igmpSrcMac = "00:11:11:11:11:11" |
| return |
| } |
| igmpSrcMac = srcMac |
| } |
| |
| // DelMvlanProfile for deletion of a MVLAN group |
| func (va *VoltApplication) DelMvlanProfile(cntx context.Context, name string) error { |
| if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok { |
| mvp := mvpIntf.(*MvlanProfile) |
| |
| if len(mvp.DevicesList) == 0 { |
| mvp.DeleteInProgress = true |
| mvp.DelFromDb(cntx) |
| va.deleteMvlansMap(mvp.Mvlan, name) |
| logger.Debugw(ctx, "Deleted MVLAN Profile", log.Fields{"Name": mvp.Name}) |
| } else { |
| logger.Errorw(ctx, "Unable to delete Mvlan Profile as there is still an OLT attached to it", log.Fields{"Name": mvp.Name, |
| "Device List": mvp.DevicesList}) |
| return errors.New("MVLAN attached to devices") |
| } |
| |
| return nil |
| } |
| logger.Errorw(ctx, "MVLAN Profile not found", log.Fields{"MvlanProfile Name": name}) |
| return nil |
| } |
| |
| // ReceiverUpInd for receiver up indication |
| func (va *VoltApplication) ReceiverUpInd(device string, port string, mvpName string, vlan of.VlanType, pbits []of.PbitType) { |
| logger.Infow(ctx, "Receiver Indication: UP", log.Fields{"device": device, "port": port, "MVP": mvpName, "vlan": vlan, "pbits": pbits}) |
| if mvpIntf, ok := va.MvlanProfilesByName.Load(mvpName); ok { |
| mvp := mvpIntf.(*MvlanProfile) |
| if devIntf, ok := va.DevicesDisc.Load(device); ok { |
| dev := devIntf.(*VoltDevice) |
| proxyCfg, proxyIP, _ := getIgmpProxyCfgAndIP(mvp.Mvlan, dev.SerialNum) |
| for _, pbit := range pbits { |
| sendGeneralQuery(device, port, vlan, uint8(pbit), proxyCfg, proxyIP) |
| } |
| } else { |
| logger.Warnw(ctx, "Device not found for given port", log.Fields{"device": device, "port": port}) |
| } |
| } else { |
| logger.Warnw(ctx, "Mvlan Profile not found for given profileName", log.Fields{"MVP": mvpName, "vlan": vlan}) |
| } |
| } |
| |
| // sendGeneralQuery to send general query |
| func sendGeneralQuery(device string, port string, cVlan of.VlanType, pbit uint8, proxyCfg *IgmpProfile, proxyIP *net.IP) { |
| |
| if queryPkt, err := Igmpv2QueryPacket(AllSystemsMulticastGroupIP, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil { |
| if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil { |
| logger.Warnw(ctx, "General Igmpv2 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit}) |
| } else { |
| logger.Debugw(ctx, "General Igmpv2 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit}) |
| } |
| } |
| if getVersion(proxyCfg.IgmpVerToServer) == IgmpVersion3 { |
| if queryPkt, err := Igmpv3QueryPacket(AllSystemsMulticastGroupIP, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil { |
| if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil { |
| logger.Warnw(ctx, "General Igmpv3 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit}) |
| } else { |
| logger.Debugw(ctx, "General Igmpv3 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit}) |
| } |
| } |
| } |
| } |
| |
| // ReceiverDownInd to send receiver down indication |
| func (va *VoltApplication) ReceiverDownInd(cntx context.Context, device string, port string) { |
| logger.Infow(ctx, " Receiver Indication: DOWN", log.Fields{"device": device, "port": port}) |
| |
| ponPortID := va.GetPonPortID(device, port) |
| |
| del := func(key interface{}, value interface{}) bool { |
| ig := value.(*IgmpGroup) |
| ig.IgmpGroupLock.Lock() |
| ig.DelReceiveronDownInd(cntx, device, port, ponPortID) |
| ig.IgmpGroupLock.Unlock() |
| if ig.NumDevicesActive() == 0 { |
| va.DelIgmpGroup(cntx, ig) |
| } |
| return true |
| } |
| va.IgmpGroups.Range(del) |
| } |