[VOL-4754] Igmp code re-organization
Change-Id: Ib34ea7af0b445e0091593bca6882412fea220fb1
diff --git a/internal/pkg/application/igmpgroupchannel.go b/internal/pkg/application/igmpgroupchannel.go
new file mode 100644
index 0000000..ed39d23
--- /dev/null
+++ b/internal/pkg/application/igmpgroupchannel.go
@@ -0,0 +1,735 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package application
+
+import (
+ "encoding/json"
+ "net"
+
+ "github.com/google/gopacket/layers"
+
+ cntlr "voltha-go-controller/internal/pkg/controller"
+ "voltha-go-controller/internal/pkg/types"
+ "voltha-go-controller/internal/pkg/of"
+ "voltha-go-controller/log"
+)
+
+// IgmpGroupChannel structure
+type IgmpGroupChannel struct {
+ Device string
+ GroupID uint32
+ GroupName string
+ GroupAddr net.IP
+ Mvlan of.VlanType
+ Exclude int
+ ExcludeList []net.IP
+ IncludeList []net.IP
+ Version uint8
+ ServVersion *uint8 `json:"-"`
+ CurReceivers map[string]*IgmpGroupPort `json:"-"`
+ NewReceivers map[string]*IgmpGroupPort `json:"-"`
+ proxyCfg **IgmpProfile
+ IgmpProxyIP **net.IP `json:"-"`
+}
+
+// NewIgmpGroupChannel is constructor for a channel. The default IGMP version is set to 3
+// as the protocol defines the way to manage backward compatibility
+// The implementation handles simultaneous presense of lower versioned
+// receivers
+func NewIgmpGroupChannel(igd *IgmpGroupDevice, groupAddr net.IP, version uint8) *IgmpGroupChannel {
+ var igc IgmpGroupChannel
+ igc.Device = igd.Device
+ igc.GroupID = igd.GroupID
+ igc.GroupName = igd.GroupName
+ igc.GroupAddr = groupAddr
+ igc.Mvlan = igd.Mvlan
+ igc.Version = version
+ igc.CurReceivers = make(map[string]*IgmpGroupPort)
+ igc.NewReceivers = make(map[string]*IgmpGroupPort)
+ igc.proxyCfg = &igd.proxyCfg
+ igc.IgmpProxyIP = &igd.IgmpProxyIP
+ igc.ServVersion = igd.ServVersion
+ return &igc
+}
+
+// NewIgmpGroupChannelFromBytes create the IGMP group channel from a byte slice
+func NewIgmpGroupChannelFromBytes(b []byte) (*IgmpGroupChannel, error) {
+ var igc IgmpGroupChannel
+ if err := json.Unmarshal(b, &igc); err != nil {
+ return nil, err
+ }
+ igc.CurReceivers = make(map[string]*IgmpGroupPort)
+ igc.NewReceivers = make(map[string]*IgmpGroupPort)
+ return &igc, nil
+}
+
+// RestorePorts to restore ports
+func (igc *IgmpGroupChannel) RestorePorts() {
+
+ igc.migrateIgmpPorts()
+ ports, _ := db.GetIgmpRcvrs(igc.Mvlan, igc.GroupAddr, igc.Device)
+ for _, port := range ports {
+ b, ok := port.Value.([]byte)
+ if !ok {
+ logger.Warn(ctx, "The value type is not []byte")
+ continue
+ }
+ if igp, err := NewIgmpGroupPortFromBytes(b); err == nil {
+ igc.NewReceivers[igp.Port] = igp
+ logger.Infow(ctx, "Group Port Restored", log.Fields{"IGP": igp})
+ } else {
+ logger.Warn(ctx, "Failed to decode port from DB")
+ }
+ }
+ if err := igc.WriteToDb(); err != nil {
+ logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
+ }
+}
+
+// WriteToDb is utility to write IGMPGroupChannel Info to database
+func (igc *IgmpGroupChannel) WriteToDb() error {
+ b, err := json.Marshal(igc)
+ if err != nil {
+ return err
+ }
+ if err1 := db.PutIgmpChannel(igc.Mvlan, igc.GroupName, igc.Device, igc.GroupAddr, string(b)); err1 != nil {
+ return err1
+ }
+ logger.Info(ctx, "IGC Updated")
+ return nil
+}
+
+
+// InclSourceIsIn checks if a source is in include list
+func (igc *IgmpGroupChannel) InclSourceIsIn(src net.IP) bool {
+ return IsIPPresent(src, igc.IncludeList)
+}
+
+// ExclSourceIsIn checks if a source is in exclude list
+func (igc *IgmpGroupChannel) ExclSourceIsIn(src net.IP) bool {
+ return IsIPPresent(src, igc.ExcludeList)
+}
+
+// AddInclSource adds a source is in include list
+func (igc *IgmpGroupChannel) AddInclSource(src net.IP) {
+ logger.Debugw(ctx, "Adding Include Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
+ igc.IncludeList = append(igc.IncludeList, src)
+}
+
+// AddExclSource adds a source is in exclude list
+func (igc *IgmpGroupChannel) AddExclSource(src net.IP) {
+ logger.Debugw(ctx, "Adding Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
+ igc.ExcludeList = append(igc.ExcludeList, src)
+}
+
+// UpdateExclSource update excl source list for the given channel
+func (igc *IgmpGroupChannel) UpdateExclSource(srcList []net.IP) bool {
+
+ logger.Debugw(ctx, "Updating Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Current List": igc.ExcludeList, "Incoming List": srcList})
+ if !igc.IsExclListChanged(srcList) {
+ return false
+ }
+
+ if igc.NumReceivers() == 1 {
+ igc.ExcludeList = srcList
+ } else {
+ igc.ExcludeList = igc.computeExclList(srcList)
+ }
+
+ logger.Debugw(ctx, "Updated Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Updated Excl List": igc.ExcludeList})
+ return true
+}
+
+// computeExclList computes intersection of pervious & current src list
+func (igc *IgmpGroupChannel) computeExclList(srcList []net.IP) []net.IP {
+
+ updatedSrcList := []net.IP{}
+ for _, src := range srcList {
+ for _, excl := range igc.ExcludeList {
+ if src.Equal(excl) {
+ updatedSrcList = append(updatedSrcList, src)
+ }
+ }
+ }
+ return updatedSrcList
+}
+
+// IsExclListChanged checks if excl list has been updated
+func (igc *IgmpGroupChannel) IsExclListChanged(srcList []net.IP) bool {
+
+ srcPresent := false
+ if len(igc.ExcludeList) != len(srcList) {
+ return true
+ }
+
+ for _, src := range srcList {
+ for _, excl := range igc.ExcludeList {
+ srcPresent = false
+ if src.Equal(excl) {
+ srcPresent = true
+ break
+ }
+ }
+ if !srcPresent {
+ return true
+ }
+ }
+ return false
+}
+
+// DelInclSource deletes a source is in include list
+func (igc *IgmpGroupChannel) DelInclSource(src net.IP) {
+ mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
+ /* If the SSM proxy is configured, then we can del the src ip from igc as whatever is in proxy that is final list */
+ if _, ok := mvp.Proxy[igc.GroupName]; !ok {
+ logger.Debugw(ctx, "Deleting Include Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
+ for _, igp := range igc.CurReceivers {
+ if igp.InclSourceIsIn(src) {
+ logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
+ return
+ }
+ }
+ for _, igp := range igc.NewReceivers {
+ if igp.InclSourceIsIn(src) {
+ logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
+ return
+ }
+ }
+ } else {
+ logger.Debug(ctx, "Proxy configured, not Deleting Include Source for Channel")
+ }
+ for i, addr := range igc.IncludeList {
+ if addr.Equal(src) {
+ igc.IncludeList = append(igc.IncludeList[:i], igc.IncludeList[i+1:]...)
+ return
+ }
+ }
+}
+
+// DelExclSource deletes a source is in exclude list
+func (igc *IgmpGroupChannel) DelExclSource(src net.IP) {
+ logger.Debugw(ctx, "Deleting Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
+
+ for _, igp := range igc.CurReceivers {
+ if igp.ExclSourceIsIn(src) {
+ logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
+ return
+ }
+ }
+ for _, igp := range igc.NewReceivers {
+ if igp.ExclSourceIsIn(src) {
+ logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
+ return
+ }
+ }
+ for i, addr := range igc.ExcludeList {
+ if addr.Equal(src) {
+ igc.ExcludeList = append(igc.ExcludeList[:i], igc.ExcludeList[i+1:]...)
+ return
+ }
+ }
+}
+
+// ProcessSources process the received list of either included sources or the excluded sources
+// The return value indicate sif the group is modified and needs to be informed
+// to the upstream multicast servers
+func (igc *IgmpGroupChannel) ProcessSources(port string, ip []net.IP, incl bool) (bool, bool) {
+ groupChanged := false
+ groupExclUpdated := false
+ receiverSrcListEmpty := false
+ // If the version type is 2, there isn't anything to process here
+ if igc.Version == IgmpVersion2 && *igc.ServVersion == IgmpVersion2 {
+ return false, false
+ }
+
+ igp := igc.GetReceiver(port)
+ if igp == nil {
+ logger.Warnw(ctx, "Receiver not found", log.Fields{"Port": port})
+ return false, false
+ }
+ mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
+ if incl {
+ for _, src := range ip {
+
+ if igp.ExclSourceIsIn(src) {
+ igp.DelExclSource(src)
+ if igc.ExclSourceIsIn(src) {
+ igc.DelExclSource(src)
+ groupChanged = true
+ }
+ }
+
+ // If the source is not in the list of include sources for the port
+ // add it. If so, check also if it is in list of include sources
+ // at the device level.
+ if !igp.InclSourceIsIn(src) {
+ igp.AddInclSource(src)
+ if !igc.InclSourceIsIn(src) {
+ igc.AddInclSource(src)
+ groupChanged = true
+ }
+ }
+ }
+ /* If any of the existing ip in the source list is removed we need to remove from the list in igp and igc */
+ if _, ok := mvp.Proxy[igc.GroupName]; ok {
+ /* If we get leave message from any subscriber, we do not have to delete the entries in the src list
+ Only if ther is any modification in the src list by proxy config update only then we need to update */
+ if len(ip) != 0 && len(ip) != len(igc.IncludeList) {
+ for i := len(igc.IncludeList) - 1; i >= 0; i-- {
+ src := igc.IncludeList[i]
+ if !IsIPPresent(src, ip) {
+ igp.DelInclSource(src)
+ igc.DelInclSource(src)
+ groupChanged = true
+ }
+ }
+ }
+ }
+ } else {
+ for _, src := range ip {
+
+ if igp.InclSourceIsIn(src) {
+ igp.DelInclSource(src)
+ if igc.InclSourceIsIn(src) {
+ igc.DelInclSource(src)
+ groupChanged = true
+ }
+ if len(igp.IncludeList) == 0 {
+ receiverSrcListEmpty = true
+ }
+ }
+
+ // If the source is not in the list of exclude sources for the port
+ // add it. If so, check also if it is in list of include sources
+ // at the device level.
+ if !igp.ExclSourceIsIn(src) {
+ igp.AddExclSource(src)
+ /* If there is any update in the src list of proxy we need to update the igc */
+ if _, ok := mvp.Proxy[igc.GroupName]; ok {
+ if !igc.ExclSourceIsIn(src) {
+ igc.AddExclSource(src)
+ groupChanged = true
+ }
+ }
+ }
+ }
+ /* If any of the existing ip in the source list is removed we need to remove from the list in igp and igc */
+ if _, ok := mvp.Proxy[igc.GroupName]; ok {
+ if len(ip) != len(igc.ExcludeList) {
+ for i := len(igc.ExcludeList) - 1; i >= 0; i-- {
+ src := igc.ExcludeList[i]
+ if !IsIPPresent(src, ip) {
+ igp.DelExclSource(src)
+ igc.DelExclSource(src)
+ groupChanged = true
+ }
+ }
+ }
+ }
+ groupExclUpdated = igc.UpdateExclSource(ip)
+ }
+ if err := igp.WriteToDb(igc.Mvlan, igc.GroupAddr, igc.Device); err != nil {
+ logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
+ }
+ return (groupChanged || groupExclUpdated), receiverSrcListEmpty
+}
+
+// GetReceiver to get receiver info
+func (igc *IgmpGroupChannel) GetReceiver(port string) *IgmpGroupPort {
+ igp := igc.NewReceivers[port]
+ if igp == nil {
+ igp = igc.CurReceivers[port]
+ }
+ return igp
+}
+
+// AddReceiver add the receiver to the device and perform other actions such as adding the group
+// to the physical device, add members, add flows to point the MC packets to the
+// group. Also, send a IGMP report upstream if there is a change in the group
+func (igc *IgmpGroupChannel) AddReceiver(port string, group *layers.IGMPv3GroupRecord, cvlan uint16, pbit uint8) bool {
+
+ var igp *IgmpGroupPort
+ var groupModified = false
+ var isNewReceiver = false
+
+ var ip []net.IP
+ incl := false
+ mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
+ if _, ok := mvp.Proxy[igc.GroupName]; ok {
+ if mvp.Proxy[igc.GroupName].Mode == common.Include {
+ incl = true
+ }
+ ip = mvp.Proxy[igc.GroupName].SourceList
+ } else if group != nil {
+ incl = isIncl(group.Type)
+ ip = group.SourceAddresses
+ }
+ logger.Debugw(ctx, "Attempting to add receiver", log.Fields{"Version": igc.Version, "Port": port, "Incl": incl, "srcIp": ip})
+
+ //logger.Infow(ctx, "Receivers", log.Fields{"New": igc.NewReceivers, "Current": igc.CurReceivers})
+ logger.Debugw(ctx, "Receiver Group", log.Fields{"Igd GId": igc.GroupID})
+ logger.Debugw(ctx, "Receiver Channel", log.Fields{"Igd addr": igc.GroupAddr})
+ logger.Debugw(ctx, "Receiver Mvlan", log.Fields{"Igd mvlan": igc.Mvlan})
+ logger.Debugw(ctx, "Receiver Sources", log.Fields{"Igd addr": ip})
+
+ ponPortID := GetApplication().GetPonPortID(igc.Device, port)
+
+ // Process the IGMP receiver. If it is already in, we should only process the changes
+ // to source list.
+ var newRcvExists bool
+ igp, newRcvExists = igc.NewReceivers[port]
+ if !newRcvExists {
+ // Add the receiver to the list of receivers and make the necessary group modification
+ // if this is the first time the receiver is added
+ var curRcvExists bool
+ if igp, curRcvExists = igc.CurReceivers[port]; curRcvExists {
+ logger.Debugw(ctx, "Existing IGMP receiver", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
+ delete(igc.CurReceivers, port)
+ igp.QueryTimeoutCount = 0
+ igc.NewReceivers[port] = igp
+ } else {
+ // New receiver who wasn't part of earlier list
+ // Need to send out IGMP group modification for this port
+ igp = NewIgmpGroupPort(port, cvlan, pbit, igc.Version, incl, uint32(ponPortID))
+ igc.NewReceivers[port] = igp
+ isNewReceiver = true
+ logger.Debugw(ctx, "New IGMP receiver", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
+ if len(igc.NewReceivers) == 1 && len(igc.CurReceivers) == 0 {
+ groupModified = true
+ igc.AddMcFlow()
+ logger.Debugw(ctx, "Added New Flow", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
+ }
+ if !incl {
+ igc.Exclude++
+ }
+ }
+ }
+
+ // Process the include/exclude list which may end up modifying the group
+ if change, _ := igc.ProcessSources(port, ip, incl); change {
+ groupModified = true
+ }
+ igc.ProcessMode(port, incl)
+
+ // If the group is modified as this is the first receiver or due to include/exclude list modification
+ // send a report to the upstream multicast servers
+ if groupModified {
+ logger.Debug(ctx, "Group Modified and IGMP report sent to the upstream server")
+ igc.SendReport(false)
+ } else if newRcvExists {
+ return false
+ }
+
+ logger.Debugw(ctx, "Channel Receiver Added", log.Fields{"Group Channel": igc.GroupAddr, "Group Port": igp})
+
+ if err := igc.WriteToDb(); err != nil {
+ logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
+ }
+ if err := igp.WriteToDb(igc.Mvlan, igc.GroupAddr, igc.Device); err != nil {
+ logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
+ }
+ return isNewReceiver
+}
+
+// DelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
+// the group
+func (igc *IgmpGroupChannel) DelReceiver(port string, incl bool, srcList []net.IP) bool {
+ // The receiver may exist either in NewReceiver list or
+ // the CurReceivers list. Find and remove it from either
+ // of the lists.
+ logger.Debugw(ctx, "Deleting Receiver from Channel", log.Fields{"Port": port, "SrcList": srcList, "Incl": incl})
+ logger.Debugw(ctx, "New Receivers", log.Fields{"New": igc.NewReceivers})
+ logger.Debugw(ctx, "Current Receivers", log.Fields{"Current": igc.CurReceivers})
+
+ receiversUpdated := false
+ groupModified, receiverSrcListEmpty := igc.ProcessSources(port, srcList, incl)
+
+ if len(srcList) == 0 || len(igc.IncludeList) == 0 || receiverSrcListEmpty {
+ if igp, ok := igc.NewReceivers[port]; ok {
+ logger.Debug(ctx, "Deleting from NewReceivers")
+ delete(igc.NewReceivers, port)
+ receiversUpdated = true
+ if igp.Exclude {
+ igc.Exclude--
+ }
+ } else {
+ if igp, ok1 := igc.CurReceivers[port]; ok1 {
+ logger.Debug(ctx, "Deleting from CurReceivers")
+ delete(igc.CurReceivers, port)
+ receiversUpdated = true
+ if igp.Exclude {
+ igc.Exclude--
+ }
+ } else {
+ logger.Debug(ctx, "Receiver doesnot exist. Dropping Igmp leave")
+ return false
+ }
+ }
+ _ = db.DelIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device, port)
+ }
+
+ if igc.NumReceivers() == 0 {
+ igc.DelMcFlow()
+ mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
+ /* If proxy is configured and NumReceivers is 0, then we can reset the igc src list so that we send leave */
+ if _, ok := mvp.Proxy[igc.GroupName]; ok {
+ igc.IncludeList = []net.IP{}
+ }
+ igc.SendLeaveToServer()
+ logger.Debugw(ctx, "Deleted the receiver Flow", log.Fields{"Num Receivers": igc.NumReceivers()})
+ return true
+ }
+ if groupModified {
+ igc.SendReport(false)
+ logger.Infow(ctx, "Updated SourceList for Channel", log.Fields{"Current": igc.CurReceivers, "New": igc.NewReceivers})
+ }
+ if err := igc.WriteToDb(); err != nil {
+ logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
+ }
+ logger.Infow(ctx, "Updated Receiver info for Channel", log.Fields{"Current": igc.CurReceivers, "New": igc.NewReceivers})
+
+ return receiversUpdated
+}
+
+// DelAllReceivers deletes all receiver for the provided igmp device
+func (igc *IgmpGroupChannel) DelAllReceivers() {
+ logger.Infow(ctx, "Deleting All Receiver for Channel", log.Fields{"Device": igc.Device, "Channel": igc.GroupAddr.String()})
+ _ = db.DelAllIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device)
+ igc.Exclude = 0
+ igc.DelMcFlow()
+ igc.SendLeaveToServer()
+ logger.Infow(ctx, "MC Flow deleted and Leave sent", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device})
+}
+
+// Igmpv2ReportPacket build an IGMPv2 Report for the upstream servers
+func (igc *IgmpGroupChannel) Igmpv2ReportPacket() ([]byte, error) {
+ logger.Debugw(ctx, "Buidling IGMP version 2 Report", log.Fields{"Device": igc.Device})
+ return IgmpReportv2Packet(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP)
+}
+
+// Igmpv3ReportPacket build an IGMPv3 Report for the upstream servers
+func (igc *IgmpGroupChannel) Igmpv3ReportPacket() ([]byte, error) {
+ logger.Debugw(ctx, "Buidling IGMP version 3 Report", log.Fields{"Device": igc.Device, "Exclude": igc.Exclude})
+ if igc.Exclude > 0 {
+ return Igmpv3ReportPacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP, false, igc.ExcludeList)
+ }
+ return Igmpv3ReportPacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP, true, igc.IncludeList)
+}
+
+// SendReport send a consolidated report to the server
+func (igc *IgmpGroupChannel) SendReport(isQuery bool) {
+ var report []byte
+ var err error
+ logger.Debugw(ctx, "Checking Version", log.Fields{"IGC Version": igc.Version, "Proxy Version": (*igc.proxyCfg).IgmpVerToServer,
+ "Result": (getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2)})
+
+ /**
+ +------------------------------------------------------------------------+
+ | IGMP version(towards BNG) Configured at VGC |
+ +-------------------------------+----------------------------------------+
+ | v2 | v3 |
+ +===================+==========+===============================+========================================+
+ | Received From RG | V2 Join | Process and Send as V2 to BNG | Process, Convert to v3 and Send to BNG |
+ | | | | Process, Send as v2, if the BNG is v2 |
+ +===================+----------+-------------------------------+----------------------------------------+
+ | V3 Join | Process and Send as V2 to BNG | Process, Send v3 to BNG |
+ | | | Process, Convert, Send as v2, if the |
+ | | | BNG is v2 |
+ +===================+==========+===============================+========================================+
+ | Received From BNG | V2 Query | V2 response to BNG | V2 response to BNG |
+ +===================+----------+-------------------------------+----------------------------------------+
+ | V3 Query | Discard | V3 response to BNG |
+ +==========+===============================+========================================+
+ */
+ // igc.Version: igmp version received from RG.
+ // igc.ServVersion: igmp version received from BNG or IgmpVerToServer present in proxy igmp conf.
+
+ if isQuery && *igc.ServVersion == IgmpVersion3 && getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 {
+ // This is the last scenario where we must discard the query processing.
+ logger.Debug(ctx, "Dropping query packet since the server verion is v3 but igmp proxy version is v2")
+ return
+ }
+
+ if *igc.ServVersion == IgmpVersion2 || getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 {
+ report, err = igc.Igmpv2ReportPacket()
+ } else {
+ report, err = igc.Igmpv3ReportPacket()
+ }
+ if err != nil {
+ logger.Warnw(ctx, "Error Preparing Report", log.Fields{"Device": igc.Device, "Ver": igc.Version, "Reason": err.Error()})
+ return
+ }
+ nni, err := GetApplication().GetNniPort(igc.Device)
+ if err == nil {
+ _ = cntlr.GetController().PacketOutReq(igc.Device, nni, nni, report, false)
+ } else {
+ logger.Warnw(ctx, "Didn't find NNI port", log.Fields{"Device": igc.Device})
+ }
+}
+
+// AddMcFlow adds flow to the device when the first receiver joins
+func (igc *IgmpGroupChannel) AddMcFlow() {
+ flow, err := igc.BuildMcFlow()
+ if err != nil {
+ logger.Warnw(ctx, "MC Flow Build Failed", log.Fields{"Reason": err.Error()})
+ return
+ }
+ port, _ := GetApplication().GetNniPort(igc.Device)
+ _ = cntlr.GetController().AddFlows(port, igc.Device, flow)
+}
+
+// DelMcFlow deletes flow from the device when the last receiver leaves
+func (igc *IgmpGroupChannel) DelMcFlow() {
+ flow, err := igc.BuildMcFlow()
+ if err != nil {
+ logger.Warnw(ctx, "MC Flow Build Failed", log.Fields{"Reason": err.Error()})
+ return
+ }
+ flow.ForceAction = true
+ device := GetApplication().GetDevice(igc.Device)
+
+ if mvpIntf, _ := GetApplication().MvlanProfilesByTag.Load(igc.Mvlan); mvpIntf != nil {
+ mvp := mvpIntf.(*MvlanProfile)
+ err := mvp.DelFlows(device, flow)
+ if err != nil {
+ logger.Warnw(ctx, "Delering IGMP Flow for device failed ", log.Fields{"Device": device, "err": err})
+ }
+ }
+}
+
+// BuildMcFlow builds the flow using which it is added/deleted
+func (igc *IgmpGroupChannel) BuildMcFlow() (*of.VoltFlow, error) {
+ flow := &of.VoltFlow{}
+ flow.SubFlows = make(map[uint64]*of.VoltSubFlow)
+ //va := GetApplication()
+ logger.Infow(ctx, "Building Mcast flow", log.Fields{"Mcast Group": igc.GroupAddr.String(), "Mvlan": igc.Mvlan.String()})
+ uintGroupAddr := ipv4ToUint(igc.GroupAddr)
+ subFlow := of.NewVoltSubFlow()
+ subFlow.SetMatchVlan(igc.Mvlan)
+ subFlow.SetIpv4Match()
+ subFlow.SetMatchDstIpv4(igc.GroupAddr)
+ mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
+ //nni, err := va.GetNniPort(igc.Device)
+ //if err != nil {
+ // return nil, err
+ //}
+ //inport, err := va.GetPortID(nni)
+ //if err != nil {
+ // return nil, err
+ //}
+ //subFlow.SetInPort(inport)
+ subFlow.SetOutGroup(igc.GroupID)
+ cookiePort := uintGroupAddr
+ subFlow.Cookie = uint64(cookiePort)<<32 | uint64(igc.Mvlan)
+ subFlow.Priority = of.McFlowPriority
+ metadata := uint64(mvp.PonVlan)
+ subFlow.SetTableMetadata(metadata)
+
+ flow.SubFlows[subFlow.Cookie] = subFlow
+ logger.Infow(ctx, "Built Mcast flow", log.Fields{"cookie": subFlow.Cookie, "subflow": subFlow})
+ return flow, nil
+}
+
+// IgmpLeaveToServer sends IGMP leave to server. Called when the last receiver leaves the group
+func (igc *IgmpGroupChannel) IgmpLeaveToServer() {
+ if leave, err := IgmpLeavePacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP); err == nil {
+ nni, err1 := GetApplication().GetNniPort(igc.Device)
+ if err1 == nil {
+ _ = cntlr.GetController().PacketOutReq(igc.Device, nni, nni, leave, false)
+ }
+ }
+}
+
+// SendLeaveToServer delete the group when the last receiver leaves the group
+func (igc *IgmpGroupChannel) SendLeaveToServer() {
+ /**
+ +-------------------------------------------------------------------------+
+ | IGMP version(towards BNG) Configured at VGC |
+ +-------------------------------+-----------------------------------------+
+ | v2 | v3 |
+ +===================+==========+===============================+=========================================+
+ | Received From RG | V2 Leave | Process and Send as V2 to BNG | Process, Convert to V3 and Send to BNG/ |
+ | | | | Process, Send as V2, if the BNG is V2 |
+ +===================+----------+-------------------------------+-----------------------------------------+
+ | V3 Leave | Process and Send as V2 to BNG | Process, Send V3 to BNG |
+ | | | Process, Convert, Send as V2, if the |
+ | | | BNG is v2 |
+ +==========+===============================+=========================================+
+ */
+ // igc.Version: igmp version received from RG.
+ // igc.ServVersion: igmp version received from BNG or IgmpVerToServer present in proxy igmp conf.
+
+ logger.Debugw(ctx, "Sending IGMP leave upstream", log.Fields{"Device": igc.Device})
+ if *igc.ServVersion == IgmpVersion2 || getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 {
+ igc.IgmpLeaveToServer()
+ } else {
+ igc.SendReport(false)
+ }
+}
+
+// NumReceivers returns total number of receivers left on the group
+func (igc *IgmpGroupChannel) NumReceivers() uint32 {
+ return uint32(len(igc.CurReceivers) + len(igc.NewReceivers))
+}
+
+// SendQuery sends query to the receivers for counting purpose
+func (igc *IgmpGroupChannel) SendQuery() {
+ //var b []byte
+ //var err error
+ for portKey, port := range igc.NewReceivers {
+ igc.CurReceivers[portKey] = port
+ }
+
+ igc.NewReceivers = make(map[string]*IgmpGroupPort)
+
+ logger.Debugw(ctx, "Sending Query to receivers", log.Fields{"Receivers": igc.CurReceivers})
+ for port, groupPort := range igc.CurReceivers {
+ if port == StaticPort {
+ continue
+ }
+ if queryPkt, err := igc.buildQuery(igc.GroupAddr, of.VlanType(groupPort.CVlan), groupPort.Pbit); err == nil {
+ _ = cntlr.GetController().PacketOutReq(igc.Device, port, port, queryPkt, false)
+ logger.Debugw(ctx, "Query Sent", log.Fields{"Device": igc.Device, "Port": port, "Packet": queryPkt})
+ } else {
+ logger.Warnw(ctx, "Query Creation Failed", log.Fields{"Reason": err.Error()})
+ }
+ }
+
+}
+
+// buildQuery to build query packet
+func (igc *IgmpGroupChannel) buildQuery(groupAddr net.IP, cVlan of.VlanType, pbit uint8) ([]byte, error) {
+ if igc.Version == IgmpVersion2 {
+ return Igmpv2QueryPacket(igc.GroupAddr, cVlan, **igc.IgmpProxyIP, pbit, (*igc.proxyCfg).MaxResp)
+ }
+ return Igmpv3QueryPacket(igc.GroupAddr, cVlan, **igc.IgmpProxyIP, pbit, (*igc.proxyCfg).MaxResp)
+}
+
+// ProcessMode process the received mode and updated the igp
+func (igc *IgmpGroupChannel) ProcessMode(port string, incl bool) {
+ /* Update the mode in igp if the mode has changed */
+ igp := igc.GetReceiver(port)
+ if igp.Exclude && incl {
+ igp.Exclude = !incl
+ if igc.Exclude > 0 {
+ igc.Exclude--
+ }
+ } else if !incl && !igp.Exclude {
+ igp.Exclude = !incl
+ igc.Exclude++
+ }
+}
+