| /* |
| * Copyright 2022-present Open Networking Foundation |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package application |
| |
| import ( |
| "context" |
| "encoding/json" |
| "net" |
| |
| "github.com/google/gopacket/layers" |
| |
| cntlr "voltha-go-controller/internal/pkg/controller" |
| "voltha-go-controller/internal/pkg/types" |
| "voltha-go-controller/internal/pkg/of" |
| "voltha-go-controller/log" |
| ) |
| |
| // IgmpGroupChannel structure |
| type IgmpGroupChannel struct { |
| Device string |
| GroupID uint32 |
| GroupName string |
| GroupAddr net.IP |
| Mvlan of.VlanType |
| Exclude int |
| ExcludeList []net.IP |
| IncludeList []net.IP |
| Version uint8 |
| ServVersion *uint8 `json:"-"` |
| CurReceivers map[string]*IgmpGroupPort `json:"-"` |
| NewReceivers map[string]*IgmpGroupPort `json:"-"` |
| proxyCfg **IgmpProfile |
| IgmpProxyIP **net.IP `json:"-"` |
| } |
| |
| // NewIgmpGroupChannel is constructor for a channel. The default IGMP version is set to 3 |
| // as the protocol defines the way to manage backward compatibility |
| // The implementation handles simultaneous presense of lower versioned |
| // receivers |
| func NewIgmpGroupChannel(igd *IgmpGroupDevice, groupAddr net.IP, version uint8) *IgmpGroupChannel { |
| var igc IgmpGroupChannel |
| igc.Device = igd.Device |
| igc.GroupID = igd.GroupID |
| igc.GroupName = igd.GroupName |
| igc.GroupAddr = groupAddr |
| igc.Mvlan = igd.Mvlan |
| igc.Version = version |
| igc.CurReceivers = make(map[string]*IgmpGroupPort) |
| igc.NewReceivers = make(map[string]*IgmpGroupPort) |
| igc.proxyCfg = &igd.proxyCfg |
| igc.IgmpProxyIP = &igd.IgmpProxyIP |
| igc.ServVersion = igd.ServVersion |
| return &igc |
| } |
| |
| // NewIgmpGroupChannelFromBytes create the IGMP group channel from a byte slice |
| func NewIgmpGroupChannelFromBytes(b []byte) (*IgmpGroupChannel, error) { |
| var igc IgmpGroupChannel |
| if err := json.Unmarshal(b, &igc); err != nil { |
| return nil, err |
| } |
| igc.CurReceivers = make(map[string]*IgmpGroupPort) |
| igc.NewReceivers = make(map[string]*IgmpGroupPort) |
| return &igc, nil |
| } |
| |
| // RestorePorts to restore ports |
| func (igc *IgmpGroupChannel) RestorePorts(cntx context.Context) { |
| |
| igc.migrateIgmpPorts(cntx) |
| ports, _ := db.GetIgmpRcvrs(cntx, igc.Mvlan, igc.GroupAddr, igc.Device) |
| for _, port := range ports { |
| b, ok := port.Value.([]byte) |
| if !ok { |
| logger.Warn(ctx, "The value type is not []byte") |
| continue |
| } |
| if igp, err := NewIgmpGroupPortFromBytes(b); err == nil { |
| igc.NewReceivers[igp.Port] = igp |
| logger.Infow(ctx, "Group Port Restored", log.Fields{"IGP": igp}) |
| } else { |
| logger.Warn(ctx, "Failed to decode port from DB") |
| } |
| } |
| if err := igc.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr}) |
| } |
| } |
| |
| // WriteToDb is utility to write IGMPGroupChannel Info to database |
| func (igc *IgmpGroupChannel) WriteToDb(cntx context.Context) error { |
| b, err := json.Marshal(igc) |
| if err != nil { |
| return err |
| } |
| if err1 := db.PutIgmpChannel(cntx, igc.Mvlan, igc.GroupName, igc.Device, igc.GroupAddr, string(b)); err1 != nil { |
| return err1 |
| } |
| logger.Info(ctx, "IGC Updated") |
| return nil |
| } |
| |
| |
| // InclSourceIsIn checks if a source is in include list |
| func (igc *IgmpGroupChannel) InclSourceIsIn(src net.IP) bool { |
| return IsIPPresent(src, igc.IncludeList) |
| } |
| |
| // ExclSourceIsIn checks if a source is in exclude list |
| func (igc *IgmpGroupChannel) ExclSourceIsIn(src net.IP) bool { |
| return IsIPPresent(src, igc.ExcludeList) |
| } |
| |
| // AddInclSource adds a source is in include list |
| func (igc *IgmpGroupChannel) AddInclSource(src net.IP) { |
| logger.Debugw(ctx, "Adding Include Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src}) |
| igc.IncludeList = append(igc.IncludeList, src) |
| } |
| |
| // AddExclSource adds a source is in exclude list |
| func (igc *IgmpGroupChannel) AddExclSource(src net.IP) { |
| logger.Debugw(ctx, "Adding Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src}) |
| igc.ExcludeList = append(igc.ExcludeList, src) |
| } |
| |
| // UpdateExclSource update excl source list for the given channel |
| func (igc *IgmpGroupChannel) UpdateExclSource(srcList []net.IP) bool { |
| |
| logger.Debugw(ctx, "Updating Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Current List": igc.ExcludeList, "Incoming List": srcList}) |
| if !igc.IsExclListChanged(srcList) { |
| return false |
| } |
| |
| if igc.NumReceivers() == 1 { |
| igc.ExcludeList = srcList |
| } else { |
| igc.ExcludeList = igc.computeExclList(srcList) |
| } |
| |
| logger.Debugw(ctx, "Updated Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Updated Excl List": igc.ExcludeList}) |
| return true |
| } |
| |
| // computeExclList computes intersection of pervious & current src list |
| func (igc *IgmpGroupChannel) computeExclList(srcList []net.IP) []net.IP { |
| |
| updatedSrcList := []net.IP{} |
| for _, src := range srcList { |
| for _, excl := range igc.ExcludeList { |
| if src.Equal(excl) { |
| updatedSrcList = append(updatedSrcList, src) |
| } |
| } |
| } |
| return updatedSrcList |
| } |
| |
| // IsExclListChanged checks if excl list has been updated |
| func (igc *IgmpGroupChannel) IsExclListChanged(srcList []net.IP) bool { |
| |
| srcPresent := false |
| if len(igc.ExcludeList) != len(srcList) { |
| return true |
| } |
| |
| for _, src := range srcList { |
| for _, excl := range igc.ExcludeList { |
| srcPresent = false |
| if src.Equal(excl) { |
| srcPresent = true |
| break |
| } |
| } |
| if !srcPresent { |
| return true |
| } |
| } |
| return false |
| } |
| |
| // DelInclSource deletes a source is in include list |
| func (igc *IgmpGroupChannel) DelInclSource(src net.IP) { |
| mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan) |
| /* If the SSM proxy is configured, then we can del the src ip from igc as whatever is in proxy that is final list */ |
| if _, ok := mvp.Proxy[igc.GroupName]; !ok { |
| logger.Debugw(ctx, "Deleting Include Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src}) |
| for _, igp := range igc.CurReceivers { |
| if igp.InclSourceIsIn(src) { |
| logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port}) |
| return |
| } |
| } |
| for _, igp := range igc.NewReceivers { |
| if igp.InclSourceIsIn(src) { |
| logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port}) |
| return |
| } |
| } |
| } else { |
| logger.Debug(ctx, "Proxy configured, not Deleting Include Source for Channel") |
| } |
| for i, addr := range igc.IncludeList { |
| if addr.Equal(src) { |
| igc.IncludeList = append(igc.IncludeList[:i], igc.IncludeList[i+1:]...) |
| return |
| } |
| } |
| } |
| |
| // DelExclSource deletes a source is in exclude list |
| func (igc *IgmpGroupChannel) DelExclSource(src net.IP) { |
| logger.Debugw(ctx, "Deleting Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src}) |
| |
| for _, igp := range igc.CurReceivers { |
| if igp.ExclSourceIsIn(src) { |
| logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port}) |
| return |
| } |
| } |
| for _, igp := range igc.NewReceivers { |
| if igp.ExclSourceIsIn(src) { |
| logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port}) |
| return |
| } |
| } |
| for i, addr := range igc.ExcludeList { |
| if addr.Equal(src) { |
| igc.ExcludeList = append(igc.ExcludeList[:i], igc.ExcludeList[i+1:]...) |
| return |
| } |
| } |
| } |
| |
| // ProcessSources process the received list of either included sources or the excluded sources |
| // The return value indicate sif the group is modified and needs to be informed |
| // to the upstream multicast servers |
| func (igc *IgmpGroupChannel) ProcessSources(cntx context.Context, port string, ip []net.IP, incl bool) (bool, bool) { |
| groupChanged := false |
| groupExclUpdated := false |
| receiverSrcListEmpty := false |
| // If the version type is 2, there isn't anything to process here |
| if igc.Version == IgmpVersion2 && *igc.ServVersion == IgmpVersion2 { |
| return false, false |
| } |
| |
| igp := igc.GetReceiver(port) |
| if igp == nil { |
| logger.Warnw(ctx, "Receiver not found", log.Fields{"Port": port}) |
| return false, false |
| } |
| mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan) |
| if incl { |
| for _, src := range ip { |
| |
| if igp.ExclSourceIsIn(src) { |
| igp.DelExclSource(src) |
| if igc.ExclSourceIsIn(src) { |
| igc.DelExclSource(src) |
| groupChanged = true |
| } |
| } |
| |
| // If the source is not in the list of include sources for the port |
| // add it. If so, check also if it is in list of include sources |
| // at the device level. |
| if !igp.InclSourceIsIn(src) { |
| igp.AddInclSource(src) |
| if !igc.InclSourceIsIn(src) { |
| igc.AddInclSource(src) |
| groupChanged = true |
| } |
| } |
| } |
| /* If any of the existing ip in the source list is removed we need to remove from the list in igp and igc */ |
| if _, ok := mvp.Proxy[igc.GroupName]; ok { |
| /* If we get leave message from any subscriber, we do not have to delete the entries in the src list |
| Only if ther is any modification in the src list by proxy config update only then we need to update */ |
| if len(ip) != 0 && len(ip) != len(igc.IncludeList) { |
| for i := len(igc.IncludeList) - 1; i >= 0; i-- { |
| src := igc.IncludeList[i] |
| if !IsIPPresent(src, ip) { |
| igp.DelInclSource(src) |
| igc.DelInclSource(src) |
| groupChanged = true |
| } |
| } |
| } |
| } |
| } else { |
| for _, src := range ip { |
| |
| if igp.InclSourceIsIn(src) { |
| igp.DelInclSource(src) |
| if igc.InclSourceIsIn(src) { |
| igc.DelInclSource(src) |
| groupChanged = true |
| } |
| if len(igp.IncludeList) == 0 { |
| receiverSrcListEmpty = true |
| } |
| } |
| |
| // If the source is not in the list of exclude sources for the port |
| // add it. If so, check also if it is in list of include sources |
| // at the device level. |
| if !igp.ExclSourceIsIn(src) { |
| igp.AddExclSource(src) |
| /* If there is any update in the src list of proxy we need to update the igc */ |
| if _, ok := mvp.Proxy[igc.GroupName]; ok { |
| if !igc.ExclSourceIsIn(src) { |
| igc.AddExclSource(src) |
| groupChanged = true |
| } |
| } |
| } |
| } |
| /* If any of the existing ip in the source list is removed we need to remove from the list in igp and igc */ |
| if _, ok := mvp.Proxy[igc.GroupName]; ok { |
| if len(ip) != len(igc.ExcludeList) { |
| for i := len(igc.ExcludeList) - 1; i >= 0; i-- { |
| src := igc.ExcludeList[i] |
| if !IsIPPresent(src, ip) { |
| igp.DelExclSource(src) |
| igc.DelExclSource(src) |
| groupChanged = true |
| } |
| } |
| } |
| } |
| groupExclUpdated = igc.UpdateExclSource(ip) |
| } |
| if err := igp.WriteToDb(cntx, igc.Mvlan, igc.GroupAddr, igc.Device); err != nil { |
| logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr}) |
| } |
| return (groupChanged || groupExclUpdated), receiverSrcListEmpty |
| } |
| |
| // GetReceiver to get receiver info |
| func (igc *IgmpGroupChannel) GetReceiver(port string) *IgmpGroupPort { |
| igp := igc.NewReceivers[port] |
| if igp == nil { |
| igp = igc.CurReceivers[port] |
| } |
| return igp |
| } |
| |
| // AddReceiver add the receiver to the device and perform other actions such as adding the group |
| // to the physical device, add members, add flows to point the MC packets to the |
| // group. Also, send a IGMP report upstream if there is a change in the group |
| func (igc *IgmpGroupChannel) AddReceiver(cntx context.Context, port string, group *layers.IGMPv3GroupRecord, cvlan uint16, pbit uint8) bool { |
| |
| var igp *IgmpGroupPort |
| var groupModified = false |
| var isNewReceiver = false |
| |
| var ip []net.IP |
| incl := false |
| mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan) |
| if _, ok := mvp.Proxy[igc.GroupName]; ok { |
| if mvp.Proxy[igc.GroupName].Mode == common.Include { |
| incl = true |
| } |
| ip = mvp.Proxy[igc.GroupName].SourceList |
| } else if group != nil { |
| incl = isIncl(group.Type) |
| ip = group.SourceAddresses |
| } |
| logger.Debugw(ctx, "Attempting to add receiver", log.Fields{"Version": igc.Version, "Port": port, "Incl": incl, "srcIp": ip}) |
| |
| //logger.Infow(ctx, "Receivers", log.Fields{"New": igc.NewReceivers, "Current": igc.CurReceivers}) |
| logger.Debugw(ctx, "Receiver Group", log.Fields{"Igd GId": igc.GroupID}) |
| logger.Debugw(ctx, "Receiver Channel", log.Fields{"Igd addr": igc.GroupAddr}) |
| logger.Debugw(ctx, "Receiver Mvlan", log.Fields{"Igd mvlan": igc.Mvlan}) |
| logger.Debugw(ctx, "Receiver Sources", log.Fields{"Igd addr": ip}) |
| |
| ponPortID := GetApplication().GetPonPortID(igc.Device, port) |
| |
| // Process the IGMP receiver. If it is already in, we should only process the changes |
| // to source list. |
| var newRcvExists bool |
| igp, newRcvExists = igc.NewReceivers[port] |
| if !newRcvExists { |
| // Add the receiver to the list of receivers and make the necessary group modification |
| // if this is the first time the receiver is added |
| var curRcvExists bool |
| if igp, curRcvExists = igc.CurReceivers[port]; curRcvExists { |
| logger.Debugw(ctx, "Existing IGMP receiver", log.Fields{"Group": igc.GroupAddr.String(), "Port": port}) |
| delete(igc.CurReceivers, port) |
| igp.QueryTimeoutCount = 0 |
| igc.NewReceivers[port] = igp |
| } else { |
| // New receiver who wasn't part of earlier list |
| // Need to send out IGMP group modification for this port |
| igp = NewIgmpGroupPort(port, cvlan, pbit, igc.Version, incl, uint32(ponPortID)) |
| igc.NewReceivers[port] = igp |
| isNewReceiver = true |
| logger.Debugw(ctx, "New IGMP receiver", log.Fields{"Group": igc.GroupAddr.String(), "Port": port}) |
| if len(igc.NewReceivers) == 1 && len(igc.CurReceivers) == 0 { |
| groupModified = true |
| igc.AddMcFlow(cntx) |
| logger.Debugw(ctx, "Added New Flow", log.Fields{"Group": igc.GroupAddr.String(), "Port": port}) |
| } |
| if !incl { |
| igc.Exclude++ |
| } |
| } |
| } |
| |
| // Process the include/exclude list which may end up modifying the group |
| if change, _ := igc.ProcessSources(cntx, port, ip, incl); change { |
| groupModified = true |
| } |
| igc.ProcessMode(port, incl) |
| |
| // If the group is modified as this is the first receiver or due to include/exclude list modification |
| // send a report to the upstream multicast servers |
| if groupModified { |
| logger.Debug(ctx, "Group Modified and IGMP report sent to the upstream server") |
| igc.SendReport(false) |
| } else if newRcvExists { |
| return false |
| } |
| |
| logger.Debugw(ctx, "Channel Receiver Added", log.Fields{"Group Channel": igc.GroupAddr, "Group Port": igp}) |
| |
| if err := igc.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr}) |
| } |
| if err := igp.WriteToDb(cntx, igc.Mvlan, igc.GroupAddr, igc.Device); err != nil { |
| logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr}) |
| } |
| return isNewReceiver |
| } |
| |
| // DelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the |
| // the group |
| func (igc *IgmpGroupChannel) DelReceiver(cntx context.Context, port string, incl bool, srcList []net.IP) bool { |
| // The receiver may exist either in NewReceiver list or |
| // the CurReceivers list. Find and remove it from either |
| // of the lists. |
| logger.Debugw(ctx, "Deleting Receiver from Channel", log.Fields{"Port": port, "SrcList": srcList, "Incl": incl}) |
| logger.Debugw(ctx, "New Receivers", log.Fields{"New": igc.NewReceivers}) |
| logger.Debugw(ctx, "Current Receivers", log.Fields{"Current": igc.CurReceivers}) |
| |
| receiversUpdated := false |
| groupModified, receiverSrcListEmpty := igc.ProcessSources(cntx, port, srcList, incl) |
| |
| if len(srcList) == 0 || len(igc.IncludeList) == 0 || receiverSrcListEmpty { |
| if igp, ok := igc.NewReceivers[port]; ok { |
| logger.Debug(ctx, "Deleting from NewReceivers") |
| delete(igc.NewReceivers, port) |
| receiversUpdated = true |
| if igp.Exclude { |
| igc.Exclude-- |
| } |
| } else { |
| if igp, ok1 := igc.CurReceivers[port]; ok1 { |
| logger.Debug(ctx, "Deleting from CurReceivers") |
| delete(igc.CurReceivers, port) |
| receiversUpdated = true |
| if igp.Exclude { |
| igc.Exclude-- |
| } |
| } else { |
| logger.Debug(ctx, "Receiver doesnot exist. Dropping Igmp leave") |
| return false |
| } |
| } |
| _ = db.DelIgmpRcvr(cntx, igc.Mvlan, igc.GroupAddr, igc.Device, port) |
| } |
| |
| if igc.NumReceivers() == 0 { |
| igc.DelMcFlow(cntx) |
| mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan) |
| /* If proxy is configured and NumReceivers is 0, then we can reset the igc src list so that we send leave */ |
| if _, ok := mvp.Proxy[igc.GroupName]; ok { |
| igc.IncludeList = []net.IP{} |
| } |
| igc.SendLeaveToServer() |
| logger.Debugw(ctx, "Deleted the receiver Flow", log.Fields{"Num Receivers": igc.NumReceivers()}) |
| return true |
| } |
| if groupModified { |
| igc.SendReport(false) |
| logger.Infow(ctx, "Updated SourceList for Channel", log.Fields{"Current": igc.CurReceivers, "New": igc.NewReceivers}) |
| } |
| if err := igc.WriteToDb(cntx); err != nil { |
| logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr}) |
| } |
| logger.Infow(ctx, "Updated Receiver info for Channel", log.Fields{"Current": igc.CurReceivers, "New": igc.NewReceivers}) |
| |
| return receiversUpdated |
| } |
| |
| // DelAllReceivers deletes all receiver for the provided igmp device |
| func (igc *IgmpGroupChannel) DelAllReceivers(cntx context.Context) { |
| logger.Infow(ctx, "Deleting All Receiver for Channel", log.Fields{"Device": igc.Device, "Channel": igc.GroupAddr.String()}) |
| _ = db.DelAllIgmpRcvr(cntx, igc.Mvlan, igc.GroupAddr, igc.Device) |
| igc.Exclude = 0 |
| igc.DelMcFlow(cntx) |
| igc.SendLeaveToServer() |
| logger.Infow(ctx, "MC Flow deleted and Leave sent", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device}) |
| } |
| |
| // Igmpv2ReportPacket build an IGMPv2 Report for the upstream servers |
| func (igc *IgmpGroupChannel) Igmpv2ReportPacket() ([]byte, error) { |
| logger.Debugw(ctx, "Buidling IGMP version 2 Report", log.Fields{"Device": igc.Device}) |
| return IgmpReportv2Packet(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP) |
| } |
| |
| // Igmpv3ReportPacket build an IGMPv3 Report for the upstream servers |
| func (igc *IgmpGroupChannel) Igmpv3ReportPacket() ([]byte, error) { |
| logger.Debugw(ctx, "Buidling IGMP version 3 Report", log.Fields{"Device": igc.Device, "Exclude": igc.Exclude}) |
| if igc.Exclude > 0 { |
| return Igmpv3ReportPacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP, false, igc.ExcludeList) |
| } |
| return Igmpv3ReportPacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP, true, igc.IncludeList) |
| } |
| |
| // SendReport send a consolidated report to the server |
| func (igc *IgmpGroupChannel) SendReport(isQuery bool) { |
| var report []byte |
| var err error |
| logger.Debugw(ctx, "Checking Version", log.Fields{"IGC Version": igc.Version, "Proxy Version": (*igc.proxyCfg).IgmpVerToServer, |
| "Result": (getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2)}) |
| |
| /** |
| +------------------------------------------------------------------------+ |
| | IGMP version(towards BNG) Configured at VGC | |
| +-------------------------------+----------------------------------------+ |
| | v2 | v3 | |
| +===================+==========+===============================+========================================+ |
| | Received From RG | V2 Join | Process and Send as V2 to BNG | Process, Convert to v3 and Send to BNG | |
| | | | | Process, Send as v2, if the BNG is v2 | |
| +===================+----------+-------------------------------+----------------------------------------+ |
| | V3 Join | Process and Send as V2 to BNG | Process, Send v3 to BNG | |
| | | | Process, Convert, Send as v2, if the | |
| | | | BNG is v2 | |
| +===================+==========+===============================+========================================+ |
| | Received From BNG | V2 Query | V2 response to BNG | V2 response to BNG | |
| +===================+----------+-------------------------------+----------------------------------------+ |
| | V3 Query | Discard | V3 response to BNG | |
| +==========+===============================+========================================+ |
| */ |
| // igc.Version: igmp version received from RG. |
| // igc.ServVersion: igmp version received from BNG or IgmpVerToServer present in proxy igmp conf. |
| |
| if isQuery && *igc.ServVersion == IgmpVersion3 && getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 { |
| // This is the last scenario where we must discard the query processing. |
| logger.Debug(ctx, "Dropping query packet since the server verion is v3 but igmp proxy version is v2") |
| return |
| } |
| |
| if *igc.ServVersion == IgmpVersion2 || getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 { |
| report, err = igc.Igmpv2ReportPacket() |
| } else { |
| report, err = igc.Igmpv3ReportPacket() |
| } |
| if err != nil { |
| logger.Warnw(ctx, "Error Preparing Report", log.Fields{"Device": igc.Device, "Ver": igc.Version, "Reason": err.Error()}) |
| return |
| } |
| nni, err := GetApplication().GetNniPort(igc.Device) |
| if err == nil { |
| _ = cntlr.GetController().PacketOutReq(igc.Device, nni, nni, report, false) |
| } else { |
| logger.Warnw(ctx, "Didn't find NNI port", log.Fields{"Device": igc.Device}) |
| } |
| } |
| |
| // AddMcFlow adds flow to the device when the first receiver joins |
| func (igc *IgmpGroupChannel) AddMcFlow(cntx context.Context) { |
| flow, err := igc.BuildMcFlow() |
| if err != nil { |
| logger.Warnw(ctx, "MC Flow Build Failed", log.Fields{"Reason": err.Error()}) |
| return |
| } |
| port, _ := GetApplication().GetNniPort(igc.Device) |
| _ = cntlr.GetController().AddFlows(cntx, port, igc.Device, flow) |
| } |
| |
| // DelMcFlow deletes flow from the device when the last receiver leaves |
| func (igc *IgmpGroupChannel) DelMcFlow(cntx context.Context) { |
| flow, err := igc.BuildMcFlow() |
| if err != nil { |
| logger.Warnw(ctx, "MC Flow Build Failed", log.Fields{"Reason": err.Error()}) |
| return |
| } |
| flow.ForceAction = true |
| device := GetApplication().GetDevice(igc.Device) |
| |
| if mvpIntf, _ := GetApplication().MvlanProfilesByTag.Load(igc.Mvlan); mvpIntf != nil { |
| mvp := mvpIntf.(*MvlanProfile) |
| err := mvp.DelFlows(cntx, device, flow) |
| if err != nil { |
| logger.Warnw(ctx, "Delering IGMP Flow for device failed ", log.Fields{"Device": device, "err": err}) |
| } |
| } |
| } |
| |
| // BuildMcFlow builds the flow using which it is added/deleted |
| func (igc *IgmpGroupChannel) BuildMcFlow() (*of.VoltFlow, error) { |
| flow := &of.VoltFlow{} |
| flow.SubFlows = make(map[uint64]*of.VoltSubFlow) |
| //va := GetApplication() |
| logger.Infow(ctx, "Building Mcast flow", log.Fields{"Mcast Group": igc.GroupAddr.String(), "Mvlan": igc.Mvlan.String()}) |
| uintGroupAddr := ipv4ToUint(igc.GroupAddr) |
| subFlow := of.NewVoltSubFlow() |
| subFlow.SetMatchVlan(igc.Mvlan) |
| subFlow.SetIpv4Match() |
| subFlow.SetMatchDstIpv4(igc.GroupAddr) |
| mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan) |
| //nni, err := va.GetNniPort(igc.Device) |
| //if err != nil { |
| // return nil, err |
| //} |
| //inport, err := va.GetPortID(nni) |
| //if err != nil { |
| // return nil, err |
| //} |
| //subFlow.SetInPort(inport) |
| subFlow.SetOutGroup(igc.GroupID) |
| cookiePort := uintGroupAddr |
| subFlow.Cookie = uint64(cookiePort)<<32 | uint64(igc.Mvlan) |
| subFlow.Priority = of.McFlowPriority |
| metadata := uint64(mvp.PonVlan) |
| subFlow.SetTableMetadata(metadata) |
| |
| flow.SubFlows[subFlow.Cookie] = subFlow |
| logger.Infow(ctx, "Built Mcast flow", log.Fields{"cookie": subFlow.Cookie, "subflow": subFlow}) |
| return flow, nil |
| } |
| |
| // IgmpLeaveToServer sends IGMP leave to server. Called when the last receiver leaves the group |
| func (igc *IgmpGroupChannel) IgmpLeaveToServer() { |
| if leave, err := IgmpLeavePacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP); err == nil { |
| nni, err1 := GetApplication().GetNniPort(igc.Device) |
| if err1 == nil { |
| _ = cntlr.GetController().PacketOutReq(igc.Device, nni, nni, leave, false) |
| } |
| } |
| } |
| |
| // SendLeaveToServer delete the group when the last receiver leaves the group |
| func (igc *IgmpGroupChannel) SendLeaveToServer() { |
| /** |
| +-------------------------------------------------------------------------+ |
| | IGMP version(towards BNG) Configured at VGC | |
| +-------------------------------+-----------------------------------------+ |
| | v2 | v3 | |
| +===================+==========+===============================+=========================================+ |
| | Received From RG | V2 Leave | Process and Send as V2 to BNG | Process, Convert to V3 and Send to BNG/ | |
| | | | | Process, Send as V2, if the BNG is V2 | |
| +===================+----------+-------------------------------+-----------------------------------------+ |
| | V3 Leave | Process and Send as V2 to BNG | Process, Send V3 to BNG | |
| | | | Process, Convert, Send as V2, if the | |
| | | | BNG is v2 | |
| +==========+===============================+=========================================+ |
| */ |
| // igc.Version: igmp version received from RG. |
| // igc.ServVersion: igmp version received from BNG or IgmpVerToServer present in proxy igmp conf. |
| |
| logger.Debugw(ctx, "Sending IGMP leave upstream", log.Fields{"Device": igc.Device}) |
| if *igc.ServVersion == IgmpVersion2 || getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 { |
| igc.IgmpLeaveToServer() |
| } else { |
| igc.SendReport(false) |
| } |
| } |
| |
| // NumReceivers returns total number of receivers left on the group |
| func (igc *IgmpGroupChannel) NumReceivers() uint32 { |
| return uint32(len(igc.CurReceivers) + len(igc.NewReceivers)) |
| } |
| |
| // SendQuery sends query to the receivers for counting purpose |
| func (igc *IgmpGroupChannel) SendQuery() { |
| //var b []byte |
| //var err error |
| for portKey, port := range igc.NewReceivers { |
| igc.CurReceivers[portKey] = port |
| } |
| |
| igc.NewReceivers = make(map[string]*IgmpGroupPort) |
| |
| logger.Debugw(ctx, "Sending Query to receivers", log.Fields{"Receivers": igc.CurReceivers}) |
| for port, groupPort := range igc.CurReceivers { |
| if port == StaticPort { |
| continue |
| } |
| if queryPkt, err := igc.buildQuery(igc.GroupAddr, of.VlanType(groupPort.CVlan), groupPort.Pbit); err == nil { |
| _ = cntlr.GetController().PacketOutReq(igc.Device, port, port, queryPkt, false) |
| logger.Debugw(ctx, "Query Sent", log.Fields{"Device": igc.Device, "Port": port, "Packet": queryPkt}) |
| } else { |
| logger.Warnw(ctx, "Query Creation Failed", log.Fields{"Reason": err.Error()}) |
| } |
| } |
| |
| } |
| |
| // buildQuery to build query packet |
| func (igc *IgmpGroupChannel) buildQuery(groupAddr net.IP, cVlan of.VlanType, pbit uint8) ([]byte, error) { |
| if igc.Version == IgmpVersion2 { |
| return Igmpv2QueryPacket(igc.GroupAddr, cVlan, **igc.IgmpProxyIP, pbit, (*igc.proxyCfg).MaxResp) |
| } |
| return Igmpv3QueryPacket(igc.GroupAddr, cVlan, **igc.IgmpProxyIP, pbit, (*igc.proxyCfg).MaxResp) |
| } |
| |
| // ProcessMode process the received mode and updated the igp |
| func (igc *IgmpGroupChannel) ProcessMode(port string, incl bool) { |
| /* Update the mode in igp if the mode has changed */ |
| igp := igc.GetReceiver(port) |
| if igp.Exclude && incl { |
| igp.Exclude = !incl |
| if igc.Exclude > 0 { |
| igc.Exclude-- |
| } |
| } else if !incl && !igp.Exclude { |
| igp.Exclude = !incl |
| igc.Exclude++ |
| } |
| } |
| |