blob: 1a12bc4ed02a91a62aa4e270d0419571bce57529 [file] [log] [blame]
Tinoj Josephcf161be2022-07-07 19:47:47 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Tinoj Josephcf161be2022-07-07 19:47:47 +053015
16package application
17
18import (
Tinoj Joseph07cc5372022-07-18 22:53:51 +053019 "context"
Tinoj Josephcf161be2022-07-07 19:47:47 +053020 "encoding/json"
21 "net"
22 "sync"
23 "time"
24
25 "github.com/google/gopacket/layers"
26
27 cntlr "voltha-go-controller/internal/pkg/controller"
28 "voltha-go-controller/internal/pkg/of"
29 "voltha-go-controller/internal/pkg/util"
30 "voltha-go-controller/log"
31)
32
33// IgmpGroupDevice : IGMP Group Device manages the IGMP group for all listerns on
34// a single OLT. It aggregates reports received on a single group
35// and performs the count. It is responsible for sending upstream
36// report when the first listener joins and is responsible for
37// sending responses to upstream queries
38type IgmpGroupDevice struct {
vinokuma926cb3e2023-03-29 11:41:06 +053039 PonPortChannelMap *util.ConcurrentMap `json:"-"` // [ponPortId]*PonPortChannels
40 proxyCfg *IgmpProfile // IgmpSrcIp from IgmpProfile is not used, it is kept for backward compatibility
41 IgmpProxyIP *net.IP `json:"-"`
42 ServVersion *uint8
43 Device string
44 SerialNo string
45 GroupName string
46 GroupChannels sync.Map `json:"-"` // [ipAddr]*IgmpGroupChannel
47 PortChannelMap sync.Map `json:"-"` // [portName][]net.IP
48 NextQueryTime time.Time
49 QueryExpiryTime time.Time
50 RecvVersionExpiry time.Time
51 ServVersionExpiry time.Time
52 GroupAddr net.IP
53 GroupID uint32
54 Mvlan of.VlanType
55 PonVlan of.VlanType
56 RecvVersion uint8
57 IsPonVlanPresent bool
58 GroupInstalled bool
Tinoj Josephcf161be2022-07-07 19:47:47 +053059}
60
61// NewIgmpGroupDevice is constructor for a device. The default IGMP version is set to 3
62// as the protocol defines the way to manage backward compatibility
vinokuma926cb3e2023-03-29 11:41:06 +053063// The implementation handles simultaneous presence of lower versioned
Tinoj Josephcf161be2022-07-07 19:47:47 +053064// receivers
65func NewIgmpGroupDevice(name string, ig *IgmpGroup, id uint32, version uint8) *IgmpGroupDevice {
vinokuma926cb3e2023-03-29 11:41:06 +053066 var igd IgmpGroupDevice
67 igd.Device = name
68 igd.GroupID = id
69 igd.GroupName = ig.GroupName
70 igd.GroupAddr = ig.GroupAddr
71 igd.Mvlan = ig.Mvlan
72 igd.PonVlan = ig.PonVlan
73 igd.IsPonVlanPresent = ig.IsPonVlanPresent
74 igd.GroupInstalled = false
75 igd.RecvVersion = version
76 igd.RecvVersionExpiry = time.Now()
77 igd.ServVersionExpiry = time.Now()
78 igd.PonPortChannelMap = util.NewConcurrentMap()
Tinoj Josephcf161be2022-07-07 19:47:47 +053079
vinokuma926cb3e2023-03-29 11:41:06 +053080 va := GetApplication()
81 if vd := va.GetDevice(igd.Device); vd != nil {
82 igd.SerialNo = vd.SerialNum
83 } else {
84 logger.Errorw(ctx, "Volt Device not found. log.Fields", log.Fields{"igd.Device": igd.Device})
85 return nil
86 }
87 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
88 igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
Tinoj Josephcf161be2022-07-07 19:47:47 +053089
vinokuma926cb3e2023-03-29 11:41:06 +053090 var mcastCfg *McastConfig
91 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
Tinoj Josephcf161be2022-07-07 19:47:47 +053092
vinokuma926cb3e2023-03-29 11:41:06 +053093 // mvlan profile id + olt serial number---igmp group id
94 //igmpgroup id
95 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
96 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
Tinoj Josephcf161be2022-07-07 19:47:47 +053097
vinokuma926cb3e2023-03-29 11:41:06 +053098 if mcastCfg != nil {
99 mcastCfg.IgmpGroupDevices.Store(id, &igd)
100 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": id})
101 }
102 return &igd
Tinoj Josephcf161be2022-07-07 19:47:47 +0530103}
104
105// IgmpGroupDeviceReInit is re-initializer for a device. The default IGMP version is set to 3
106// as the protocol defines the way to manage backward compatibility
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530107func (igd *IgmpGroupDevice) IgmpGroupDeviceReInit(cntx context.Context, ig *IgmpGroup) {
vinokuma926cb3e2023-03-29 11:41:06 +0530108 logger.Infow(ctx, "Reinitialize Igmp Group Device", log.Fields{"Device": igd.Device, "GroupID": ig.GroupID, "OldName": igd.GroupName, "Name": ig.GroupName, "OldAddr": igd.GroupAddr.String(), "GroupAddr": ig.GroupAddr.String()})
Tinoj Josephcf161be2022-07-07 19:47:47 +0530109
vinokuma926cb3e2023-03-29 11:41:06 +0530110 if (igd.GroupName != ig.GroupName) || !igd.GroupAddr.Equal(ig.GroupAddr) {
111 _ = db.DelIgmpDevice(cntx, igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
112 igd.GroupName = ig.GroupName
113 igd.GroupAddr = ig.GroupAddr
114 }
115 igd.RecvVersionExpiry = time.Now()
116 igd.ServVersionExpiry = time.Now()
117 igd.PonPortChannelMap = util.NewConcurrentMap()
Tinoj Josephcf161be2022-07-07 19:47:47 +0530118
vinokuma926cb3e2023-03-29 11:41:06 +0530119 var mcastCfg *McastConfig
120 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530121
vinokuma926cb3e2023-03-29 11:41:06 +0530122 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
123 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530124
vinokuma926cb3e2023-03-29 11:41:06 +0530125 if mcastCfg != nil {
126 mcastCfg.IgmpGroupDevices.Store(ig.GroupID, igd)
127 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": ig.GroupID})
128 }
129 if err := igd.WriteToDb(cntx); err != nil {
130 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
131 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530132}
133
134func getIgmpProxyCfgAndIP(mvlan of.VlanType, serialNo string) (*IgmpProfile, *net.IP, *McastConfig) {
vinokuma926cb3e2023-03-29 11:41:06 +0530135 va := GetApplication()
136 mVLANProfileID := va.GetMvlanProfileByTag(mvlan).Name
137 var mcastCfg *McastConfig
138 if mcastCfg = va.GetMcastConfig(serialNo, mVLANProfileID); mcastCfg == nil || (mcastCfg != nil && mcastCfg.IgmpProfileID == "") {
139 logger.Debugw(ctx, "Default IGMP config to be used", log.Fields{"mVLANProfileID": mVLANProfileID, "OltSerialNo": serialNo})
140 igmpProf := va.getIgmpProfileMap(DefaultIgmpProfID)
141 return igmpProf, &igmpProf.IgmpSourceIP, mcastCfg
142 }
143 return va.getIgmpProfileMap(mcastCfg.IgmpProfileID), &mcastCfg.IgmpProxyIP, mcastCfg
Tinoj Josephcf161be2022-07-07 19:47:47 +0530144}
145
146// updateGroupName to update the group name
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530147func (igd *IgmpGroupDevice) updateGroupName(cntx context.Context, newGroupName string) {
vinokuma926cb3e2023-03-29 11:41:06 +0530148 oldName := igd.GroupName
149 igd.GroupName = newGroupName
150 updateGroupName := func(key, value interface{}) bool {
151 igc := value.(*IgmpGroupChannel)
152 igc.GroupName = newGroupName
153 if err := igc.WriteToDb(cntx); err != nil {
154 logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
155 }
156 _ = db.DelIgmpChannel(cntx, igc.Mvlan, oldName, igc.Device, igc.GroupAddr)
157 return true
158 }
159 igd.GroupChannels.Range(updateGroupName)
160 if err := igd.WriteToDb(cntx); err != nil {
161 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
162 }
163 _ = db.DelIgmpDevice(cntx, igd.Mvlan, oldName, igd.GroupAddr, igd.Device)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530164}
165
166// NewIgmpGroupDeviceFromBytes is to create the IGMP group port from a byte slice
167func NewIgmpGroupDeviceFromBytes(b []byte) (*IgmpGroupDevice, error) {
vinokuma926cb3e2023-03-29 11:41:06 +0530168 var igd IgmpGroupDevice
169 if err := json.Unmarshal(b, &igd); err != nil {
170 return nil, err
171 }
172 return &igd, nil
Tinoj Josephcf161be2022-07-07 19:47:47 +0530173}
174
175// GetKey to get group name as key
176func (igd *IgmpGroupDevice) GetKey() string {
vinokuma926cb3e2023-03-29 11:41:06 +0530177 if !net.ParseIP("0.0.0.0").Equal(igd.GroupAddr) {
178 return igd.GroupName + "_" + igd.GroupAddr.String()
179 }
180 return igd.GroupName
Tinoj Josephcf161be2022-07-07 19:47:47 +0530181}
182
183// RestoreChannel to restore channel
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530184func (igd *IgmpGroupDevice) RestoreChannel(cntx context.Context, igmpGroupChannel []byte) {
vinokuma926cb3e2023-03-29 11:41:06 +0530185 if igc, err := NewIgmpGroupChannelFromBytes(igmpGroupChannel); err == nil {
186 igc.ServVersion = igd.ServVersion
187 igc.IgmpProxyIP = &igd.IgmpProxyIP
188 igc.proxyCfg = &igd.proxyCfg
189 igd.GroupChannels.Store(igc.GroupAddr.String(), igc)
190 igc.RestorePorts(cntx)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530191
vinokuma926cb3e2023-03-29 11:41:06 +0530192 for port, igp := range igc.NewReceivers {
193 ipsList := []net.IP{}
194 ipsIntf, _ := igd.PortChannelMap.Load(port)
195 if ipsIntf != nil {
196 ipsList = ipsIntf.([]net.IP)
197 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530198
vinokuma926cb3e2023-03-29 11:41:06 +0530199 ipsList = append(ipsList, igc.GroupAddr)
200 igd.PortChannelMap.Store(port, ipsList)
201 logger.Infow(ctx, "Group Channels Restored", log.Fields{"IGC": igc})
202 igd.AddChannelToChannelsPerPon(port, igc.GroupAddr, igp.PonPortID)
203 }
204 } else {
205 logger.Warnw(ctx, "Failed to decode port from DB", log.Fields{"err": err})
206 }
207 logger.Info(ctx, "Group Device & Channels Restored")
208 igd.PortChannelMap.Range(printPortChannel)
209 igd.GroupChannels.Range(printChannel)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530210}
211
212// RestoreChannels to restore channels
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530213func (igd *IgmpGroupDevice) RestoreChannels(cntx context.Context) {
vinokuma926cb3e2023-03-29 11:41:06 +0530214 igd.migrateIgmpChannels(cntx)
215 channels, _ := db.GetIgmpChannels(cntx, igd.Mvlan, igd.GroupName, igd.Device)
216 for _, channel := range channels {
217 b, ok := channel.Value.([]byte)
218 if !ok {
219 logger.Warn(ctx, "The value type is not []byte")
220 continue
221 }
222 igd.RestoreChannel(cntx, b)
223 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530224}
225
Tinoj Josephcf161be2022-07-07 19:47:47 +0530226// WriteToDb is utility to write IGMP Group Device Info to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530227func (igd *IgmpGroupDevice) WriteToDb(cntx context.Context) error {
vinokuma926cb3e2023-03-29 11:41:06 +0530228 b, err := json.Marshal(igd)
229 if err != nil {
230 return err
231 }
232 if err1 := db.PutIgmpDevice(cntx, igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device, string(b)); err1 != nil {
233 return err1
234 }
235 logger.Info(ctx, "IGD Updated")
236 return nil
Tinoj Josephcf161be2022-07-07 19:47:47 +0530237}
238
239// Tick processes timing tick used to run timers within the device
240func (igd *IgmpGroupDevice) Tick() uint8 {
vinokuma926cb3e2023-03-29 11:41:06 +0530241 /* Not using RecvVersionExpiry as it is not used anywhere
242 if time.Now().After(igd.RecvVersionExpiry) {
243 igd.RecvVersion = IgmpVersion3
244 return true
245 }
246 */
247 return 0
Tinoj Josephcf161be2022-07-07 19:47:47 +0530248}
249
250// GetSubscriberCountForChannelAndPonPort Gets the active subscriber count
251// for the given channel for one particular PON port
252func (igd *IgmpGroupDevice) GetSubscriberCountForChannelAndPonPort(ponPortID uint32, channelIP net.IP) uint64 {
vinokuma926cb3e2023-03-29 11:41:06 +0530253 if portMapIntf, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
254 portChannelMap := portMapIntf.(*PonPortChannels)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530255
vinokuma926cb3e2023-03-29 11:41:06 +0530256 if channel, present := portChannelMap.ChannelList.Get(channelIP.String()); present {
257 return channel.(*UniPortList).UNIList.Length()
258 }
259 } else {
260 logger.Warnw(ctx, "PON port not found in PortChannelMap", log.Fields{"PON": ponPortID, "channel": channelIP})
261 }
262 return 0
Tinoj Josephcf161be2022-07-07 19:47:47 +0530263}
264
265// AddChannelToChannelsPerPon Adds the new channel into the per Pon channel list
266func (igd *IgmpGroupDevice) AddChannelToChannelsPerPon(uniPort string, channelIP net.IP, ponPortID uint32) bool {
vinokuma926cb3e2023-03-29 11:41:06 +0530267 logger.Debugw(ctx, "Adding channel to ActiveChannelsPerPon list", log.Fields{"PonPort": ponPortID, "channelIP": channelIP})
Tinoj Josephcf161be2022-07-07 19:47:47 +0530268
vinokuma926cb3e2023-03-29 11:41:06 +0530269 isNewChannel := bool(false)
270 isNewReceiver := false
271 if port, ok := igd.PonPortChannelMap.Get(ponPortID); !ok {
272 // PON port not exists in igd. adding it.
273 isNewReceiver = true
274 ponPortChannels := NewPonPortChannels()
275 isNewChannel = ponPortChannels.AddChannelToMap(uniPort, channelIP.String())
276 igd.PonPortChannelMap.Set(ponPortID, ponPortChannels)
277 } else {
278 // PON port exists in igd. Appending the channel list
279 // in the PON port.
280 isNewChannel = port.(*PonPortChannels).AddChannelToMap(uniPort, channelIP.String())
281 igd.PonPortChannelMap.Set(ponPortID, port)
282 count := port.(*PonPortChannels).GetActiveChannelCount()
Tinoj Josephcf161be2022-07-07 19:47:47 +0530283
vinokuma926cb3e2023-03-29 11:41:06 +0530284 logger.Debugw(ctx, "activeChannelCount", log.Fields{"count": count})
285 }
286 GetApplication().UpdateActiveChannelCountForPonPort(igd.Device, uniPort, ponPortID, true, isNewChannel, igd)
287 return isNewReceiver
Tinoj Josephcf161be2022-07-07 19:47:47 +0530288}
289
290// RemoveChannelFromChannelsPerPon removes the channel from the per pon channel list.
291func (igd *IgmpGroupDevice) RemoveChannelFromChannelsPerPon(uniPort string, channelIP net.IP, ponPortID uint32) bool {
vinokuma926cb3e2023-03-29 11:41:06 +0530292 logger.Debugw(ctx, "Removing channel from ActiveChannelsPerPon list", log.Fields{"PonPort": ponPortID, "channelIP": channelIP})
293 var deleted bool
294 ponRemoved := false
Tinoj Josephcf161be2022-07-07 19:47:47 +0530295
vinokuma926cb3e2023-03-29 11:41:06 +0530296 if port, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
297 channelPortMap := port.(*PonPortChannels)
298 deleted = channelPortMap.RemoveChannelFromMap(uniPort, channelIP.String())
299 if deleted && channelPortMap.ChannelList.Length() == 0 {
300 igd.PonPortChannelMap.Remove(ponPortID)
301 ponRemoved = true
302 }
303 GetApplication().UpdateActiveChannelCountForPonPort(igd.Device, uniPort, ponPortID, false, deleted, igd)
304 } else {
305 logger.Warnw(ctx, "PON port doesn't exists in the igd", log.Fields{"PonPortID": ponPortID})
306 }
307 return ponRemoved
Tinoj Josephcf161be2022-07-07 19:47:47 +0530308}
309
310// printChannel to print channel info
311func printChannel(key interface{}, value interface{}) bool {
vinokuma926cb3e2023-03-29 11:41:06 +0530312 logger.Infow(ctx, "ChannelMap", log.Fields{"Channel": key.(string), "Igc": value.(*IgmpGroupChannel)})
313 return true
Tinoj Josephcf161be2022-07-07 19:47:47 +0530314}
315
316// printPortChannel to print port channel
317func printPortChannel(key interface{}, value interface{}) bool {
vinokuma926cb3e2023-03-29 11:41:06 +0530318 logger.Infow(ctx, "PortChannelMap", log.Fields{"Port": key.(string), "List": value.([]net.IP)})
319 return true
Tinoj Josephcf161be2022-07-07 19:47:47 +0530320}
321
Tinoj Josephcf161be2022-07-07 19:47:47 +0530322// AddReceiver add the receiver to the device and perform other actions such as adding the group
323// to the physical device, add members, add flows to point the MC packets to the
324// group. Also, send a IGMP report upstream if there is a change in the group
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530325func (igd *IgmpGroupDevice) AddReceiver(cntx context.Context, port string, groupAddr net.IP,
vinokuma926cb3e2023-03-29 11:41:06 +0530326 group *layers.IGMPv3GroupRecord, version uint8, cvlan uint16, pbit uint8, ponPortID uint32) {
327 var igc *IgmpGroupChannel
328 logger.Debugw(ctx, "Processing receiver for device", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
Tinoj Josephcf161be2022-07-07 19:47:47 +0530329
vinokuma926cb3e2023-03-29 11:41:06 +0530330 igcIntf, ok := igd.GroupChannels.Load(groupAddr.String())
331 if !ok {
332 igc = NewIgmpGroupChannel(igd, groupAddr, version)
333 igd.GroupChannels.Store(groupAddr.String(), igc)
334 } else {
335 igc = igcIntf.(*IgmpGroupChannel)
336 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530337
vinokuma926cb3e2023-03-29 11:41:06 +0530338 if !igd.GroupInstalled {
339 igd.AddNewReceiver(cntx, port, groupAddr, group, cvlan, pbit, ponPortID)
340 return
341 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530342
vinokuma926cb3e2023-03-29 11:41:06 +0530343 isNewReceiver := igc.AddReceiver(cntx, port, group, cvlan, pbit)
344 if isNewReceiver {
345 ipsList := []net.IP{}
346 ipsIntf, _ := igd.PortChannelMap.Load(port)
347 if ipsIntf != nil {
348 ipsList = ipsIntf.([]net.IP)
349 }
350 ipsList = append(ipsList, groupAddr)
351 igd.PortChannelMap.Store(port, ipsList)
352 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "AddedChannelList": ipsList, "Addr": groupAddr})
Tinoj Josephcf161be2022-07-07 19:47:47 +0530353
vinokuma926cb3e2023-03-29 11:41:06 +0530354 isNewPonReceiver := igd.AddChannelToChannelsPerPon(port, groupAddr, ponPortID)
355 //Modify group only if this is the first time the port is subscribing for the group
356 if isNewPonReceiver {
357 igd.ModMcGroup()
358 }
359 }
360 if err := igd.WriteToDb(cntx); err != nil {
361 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
362 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530363}
364
365// AddNewReceiver to add new receiver
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530366func (igd *IgmpGroupDevice) AddNewReceiver(cntx context.Context, port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, cvlan uint16, pbit uint8, ponPortID uint32) {
vinokuma926cb3e2023-03-29 11:41:06 +0530367 logger.Debugw(ctx, "Adding New Device Receiver", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
368 igcIntf, _ := igd.GroupChannels.Load(groupAddr.String())
369 if igcIntf == nil {
370 logger.Warnw(ctx, "No Group Channel present for given channel", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
371 return
372 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530373
vinokuma926cb3e2023-03-29 11:41:06 +0530374 igc := igcIntf.(*IgmpGroupChannel)
375 ipsList := []net.IP{}
376 ipsIntf, _ := igd.PortChannelMap.Load(port)
377 if ipsIntf != nil {
378 ipsList = ipsIntf.([]net.IP)
379 }
380 ipsList = append(ipsList, groupAddr)
381 igd.PortChannelMap.Store(port, ipsList)
382 igd.AddChannelToChannelsPerPon(port, groupAddr, ponPortID)
383 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "NewChannelList": ipsList, "Addr": groupAddr})
Tinoj Josephcf161be2022-07-07 19:47:47 +0530384
vinokuma926cb3e2023-03-29 11:41:06 +0530385 igd.AddMcGroup()
386 igc.AddReceiver(cntx, port, group, cvlan, pbit)
387 if err := igd.WriteToDb(cntx); err != nil {
388 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
389 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530390}
391
Tinoj Josephcf161be2022-07-07 19:47:47 +0530392// NumReceivers to get number of receivers
393func (igd *IgmpGroupDevice) NumReceivers() int {
vinokuma926cb3e2023-03-29 11:41:06 +0530394 var numReceivers int
395 len := func(key interface{}, value interface{}) bool {
396 numReceivers++
397 return true
398 }
399 igd.PortChannelMap.Range(len)
400 return numReceivers
Tinoj Josephcf161be2022-07-07 19:47:47 +0530401}
402
403// DelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
404// the group
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530405func (igd *IgmpGroupDevice) DelReceiver(cntx context.Context, groupAddr net.IP, port string, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
vinokuma926cb3e2023-03-29 11:41:06 +0530406 logger.Debugw(ctx, "Deleting Receiver for Device", log.Fields{"port": port, "GroupIP": groupAddr.String()})
407 var igc *IgmpGroupChannel
408 var igcIntf interface{}
409 var ok bool
410 var srcList []net.IP
411 incl := false
412 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530413
vinokuma926cb3e2023-03-29 11:41:06 +0530414 if _, ok = mvp.Proxy[igd.GroupName]; ok {
415 incl = true
416 } else if group != nil {
417 srcList = group.SourceAddresses
418 incl = isIncl(group.Type)
419 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530420
vinokuma926cb3e2023-03-29 11:41:06 +0530421 if igcIntf, ok = igd.GroupChannels.Load(groupAddr.String()); !ok {
422 logger.Warnw(ctx, "Igmp Channel for group IP doesnt exist", log.Fields{"GroupAddr": groupAddr.String()})
423 return
424 }
425 igc = igcIntf.(*IgmpGroupChannel)
426 if ok := igc.DelReceiver(cntx, port, incl, srcList); !ok {
427 return
428 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530429
vinokuma926cb3e2023-03-29 11:41:06 +0530430 if igc.NumReceivers() == 0 {
431 igd.DelIgmpGroupChannel(cntx, igc)
432 }
433 igd.DelPortFromChannel(port, groupAddr)
434 isGroupModified := igd.RemoveChannelFromChannelsPerPon(port, groupAddr, ponPortID)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530435
vinokuma926cb3e2023-03-29 11:41:06 +0530436 //Remove port from receiver if port has no subscription to any of the group channels
437 if isGroupModified {
438 igd.ModMcGroup()
439 }
440 if err := igd.WriteToDb(cntx); err != nil {
441 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
442 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530443}
444
445// DelChannelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
446// the group
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530447func (igd *IgmpGroupDevice) DelChannelReceiver(cntx context.Context, groupAddr net.IP) map[string]*IgmpGroupPort {
vinokuma926cb3e2023-03-29 11:41:06 +0530448 portsRemoved := make(map[string]*IgmpGroupPort)
449 groupModified := false
450 // ifEmpty := true
451 igcIntf, _ := igd.GroupChannels.Load(groupAddr.String())
Tinoj Josephcf161be2022-07-07 19:47:47 +0530452
vinokuma926cb3e2023-03-29 11:41:06 +0530453 if igcIntf == nil {
454 return portsRemoved
455 }
456 igc := igcIntf.(*IgmpGroupChannel)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530457
vinokuma926cb3e2023-03-29 11:41:06 +0530458 for port, igp := range igc.NewReceivers {
459 _ = db.DelIgmpRcvr(cntx, igc.Mvlan, igc.GroupAddr, igc.Device, port) //TODO: Y not here
460 igd.DelPortFromChannel(port, igc.GroupAddr)
461 ponPortID := GetApplication().GetPonPortID(igd.Device, port)
462 groupModified = igd.RemoveChannelFromChannelsPerPon(port, igc.GroupAddr, ponPortID)
463 delete(igc.NewReceivers, port)
464 portsRemoved[port] = igp
465 }
466 for port, igp := range igc.CurReceivers {
467 _ = db.DelIgmpRcvr(cntx, igc.Mvlan, igc.GroupAddr, igc.Device, port)
468 igd.DelPortFromChannel(port, igc.GroupAddr)
469 ponPortID := GetApplication().GetPonPortID(igd.Device, port)
470 groupModified = igd.RemoveChannelFromChannelsPerPon(port, igc.GroupAddr, ponPortID)
471 delete(igc.CurReceivers, port)
472 portsRemoved[port] = igp
473 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530474
vinokuma926cb3e2023-03-29 11:41:06 +0530475 igc.DelMcFlow(cntx)
476 igd.DelIgmpGroupChannel(cntx, igc)
477 igc.Exclude = 0
478 igc.SendLeaveToServer()
Tinoj Josephcf161be2022-07-07 19:47:47 +0530479
vinokuma926cb3e2023-03-29 11:41:06 +0530480 if groupModified {
481 igd.ModMcGroup()
482 }
483 if err := igd.WriteToDb(cntx); err != nil {
484 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
485 }
486 logger.Debugw(ctx, "Deleted the receiver Flow", log.Fields{"Num Receivers": igc.NumReceivers()})
487 return portsRemoved
Tinoj Josephcf161be2022-07-07 19:47:47 +0530488}
489
490// DelIgmpGroupChannel to delete igmp group channel
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530491func (igd *IgmpGroupDevice) DelIgmpGroupChannel(cntx context.Context, igc *IgmpGroupChannel) {
vinokuma926cb3e2023-03-29 11:41:06 +0530492 if igc.NumReceivers() != 0 {
493 igc.DelAllReceivers(cntx)
494 }
495 _ = db.DelIgmpChannel(cntx, igc.Mvlan, igc.GroupName, igc.Device, igc.GroupAddr)
496 igd.GroupChannels.Delete(igc.GroupAddr.String())
497 logger.Infow(ctx, "Deleted the Channel from Device", log.Fields{"Channel": igc.GroupAddr.String()})
498 isLenZero := true
499 checkIfEmpty := func(key interface{}, value interface{}) bool {
500 isLenZero = false
501 return false
502 }
503 igd.GroupChannels.Range(checkIfEmpty)
504 if isLenZero {
505 logger.Infow(ctx, "No more active channels. Deleting MC Group", log.Fields{"Device": igd.Device, "Group": igd.GroupName})
506 igd.DelMcGroup(false)
507 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530508}
509
510// func (igd *IgmpGroupDevice) DelIgmpChannel(igc *IgmpGroupChannel) {
511// db.DelIgmpChannel(igc.GroupName, igc.Device, igc.GroupAddr)
512// delete(igd.GroupChannels, igc.GroupAddr.String())
513// logger.Debugw(ctx, "Deleted the Channel", log.Fields{"Num Receivers": igc.NumReceivers()})
514// }
515
516// DelPortFromChannel to delete port from channel
517func (igd *IgmpGroupDevice) DelPortFromChannel(port string, groupAddr net.IP) bool {
vinokuma926cb3e2023-03-29 11:41:06 +0530518 ipsList := []net.IP{}
519 ipsListIntf, _ := igd.PortChannelMap.Load(port)
520 if ipsListIntf != nil {
521 ipsList = ipsListIntf.([]net.IP)
522 }
523 for i, addr := range ipsList {
524 if addr.Equal(groupAddr) {
525 ipsList = append(ipsList[:i], ipsList[i+1:]...)
526 //Remove port from receiver if port has no subscription to any of the group channels
527 if len(ipsList) == 0 {
528 igd.PortChannelMap.Delete(port)
529 } else {
530 //Update the map with modified ips list
531 igd.PortChannelMap.Store(port, ipsList)
532 }
533 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "DelChannelList": ipsList, "Addr": groupAddr.String()})
534 return true
535 }
536 }
537 return false
Tinoj Josephcf161be2022-07-07 19:47:47 +0530538}
539
540// DelAllChannels deletes all receiver for the provided igmp device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530541func (igd *IgmpGroupDevice) DelAllChannels(cntx context.Context) {
vinokuma926cb3e2023-03-29 11:41:06 +0530542 logger.Infow(ctx, "Deleting All Channel for Device", log.Fields{"Device": igd.Device, "Group": igd.GroupName})
543 delGroupChannels := func(key interface{}, value interface{}) bool {
544 igc := value.(*IgmpGroupChannel)
545 igd.DelIgmpGroupChannel(cntx, igc)
546 return true
547 }
548 igd.GroupChannels.Range(delGroupChannels)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530549}
550
551// ProcessQuery process query received from the upstream IGMP server
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530552func (igd *IgmpGroupDevice) ProcessQuery(cntx context.Context, groupAddr net.IP, ver uint8) {
vinokuma926cb3e2023-03-29 11:41:06 +0530553 logger.Debugw(ctx, "Received Query From Server", log.Fields{"Version": ver})
554 if ver != *igd.ServVersion {
555 igd.ServVersionExpiry = time.Now().Add(time.Duration(2*igd.proxyCfg.KeepAliveInterval) * time.Second)
556 *igd.ServVersion = ver
557 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
558 if err := mvp.WriteToDb(cntx); err != nil {
559 logger.Errorw(ctx, "Mvlan profile write to DB failed", log.Fields{"ProfileName": mvp.Name})
560 }
561 }
562 if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
563 igc.(*IgmpGroupChannel).SendReport(true)
564 return
565 }
566 logger.Infow(ctx, "No Members for Channel. Dropping Igmp Query", log.Fields{"Group": igd.GroupName, "Channel": groupAddr.String()})
Tinoj Josephcf161be2022-07-07 19:47:47 +0530567}
568
569// AddMcGroup add the new group on the device when a receiver joins the group
570func (igd *IgmpGroupDevice) AddMcGroup() {
vinokuma926cb3e2023-03-29 11:41:06 +0530571 if !igd.GroupInstalled {
572 group := &of.Group{}
573 group.Command = of.GroupCommandAdd
574 group.GroupID = igd.GroupID
575 group.Device = igd.Device
576 group.SetVlan = igd.PonVlan
577 group.IsPonVlanPresent = igd.IsPonVlanPresent
Tinoj Josephcf161be2022-07-07 19:47:47 +0530578
vinokuma926cb3e2023-03-29 11:41:06 +0530579 addbuckets := func(key interface{}, value interface{}) bool {
580 port := key.(string)
581 var portID uint32
582 if d := GetApplication().GetDevice(group.Device); d != nil {
583 GetApplication().portLock.Lock()
584 p := d.GetPort(port)
585 GetApplication().portLock.Unlock()
586 portID = p.ID
587 }
588 //ponPortID := key.(uint32)
589 if portID != 0xFF {
590 group.Buckets = append(group.Buckets, portID)
591 }
592 return true
593 }
594 igd.PortChannelMap.Range(addbuckets)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530595
vinokuma926cb3e2023-03-29 11:41:06 +0530596 port, _ := GetApplication().GetNniPort(igd.Device)
597 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
598 igd.GroupInstalled = true
599 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530600}
601
602// ModMcGroup updates the group on the device when either a receiver leaves
603// or joins the group
604func (igd *IgmpGroupDevice) ModMcGroup() {
vinokuma926cb3e2023-03-29 11:41:06 +0530605 if igd.GroupInstalled {
606 group := &of.Group{}
607 group.Command = of.GroupCommandMod
608 group.GroupID = igd.GroupID
609 group.Device = igd.Device
610 group.SetVlan = igd.PonVlan
611 group.IsPonVlanPresent = igd.IsPonVlanPresent
Tinoj Josephcf161be2022-07-07 19:47:47 +0530612
vinokuma926cb3e2023-03-29 11:41:06 +0530613 addbuckets := func(key interface{}, value interface{}) bool {
614 port := key.(string)
615 var portID uint32
616 if d := GetApplication().GetDevice(group.Device); d != nil {
617 GetApplication().portLock.Lock()
618 p := d.GetPort(port)
619 GetApplication().portLock.Unlock()
620 portID = p.ID
621 }
622 //ponPortID := key.(uint32)
623 if portID != 0xFF {
624 group.Buckets = append(group.Buckets, portID)
625 }
626 return true
627 }
628 igd.PortChannelMap.Range(addbuckets)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530629
vinokuma926cb3e2023-03-29 11:41:06 +0530630 port, _ := GetApplication().GetNniPort(igd.Device)
631 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
632 } else {
633 logger.Warnw(ctx, "Update Group Failed. Group not yet created", log.Fields{"Igd": igd.Device})
634 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530635}
636
637// DelMcGroup : The group is deleted when the last receiver leaves the group
638func (igd *IgmpGroupDevice) DelMcGroup(forceDelete bool) {
vinokuma926cb3e2023-03-29 11:41:06 +0530639 logger.Infow(ctx, "Delete Mc Group Request", log.Fields{"Device": igd.Device, "GroupID": igd.GroupID, "ForceFlag": forceDelete, "GroupInstalled": igd.GroupInstalled})
640 /*
641 if !forceDelete && !checkIfForceGroupRemove(igd.Device) {
642 if success := AddToPendingPool(igd.Device, igd.getKey()); success {
643 return
644 }
645 }*/
646 if igd.GroupInstalled {
647 logger.Debugw(ctx, "Deleting Group", log.Fields{"Device": igd.Device, "Id": igd.GroupID})
648 group := &of.Group{}
649 group.Command = of.GroupCommandDel
650 group.GroupID = igd.GroupID
651 group.Device = igd.Device
652 group.ForceAction = true
Tinoj Josephcf161be2022-07-07 19:47:47 +0530653
vinokuma926cb3e2023-03-29 11:41:06 +0530654 port, _ := GetApplication().GetNniPort(igd.Device)
655 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
656 igd.GroupInstalled = false
657 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530658}
659
660// QueryExpiry processes query expiry. Upon expiry, take stock of the situation
661// add either retain/release the group based on number of receivers left
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530662func (igd *IgmpGroupDevice) QueryExpiry(cntx context.Context) {
vinokuma926cb3e2023-03-29 11:41:06 +0530663 logger.Debugw(ctx, "Query Expiry", log.Fields{"Device": igd.Device})
Tinoj Josephcf161be2022-07-07 19:47:47 +0530664
vinokuma926cb3e2023-03-29 11:41:06 +0530665 // Delete the IGMP flow added for this port if port state is down or query count exceeded
666 handleQueryExp := func(key interface{}, value interface{}) bool {
667 igc := value.(*IgmpGroupChannel)
668 for portKey, port := range igc.CurReceivers {
669 if portKey == StaticPort {
670 continue
671 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530672
vinokuma926cb3e2023-03-29 11:41:06 +0530673 logger.Warnw(ctx, "Expired Receiver Port", log.Fields{"PortKey": portKey, "IGP": port, "GroupAddr": igc.GroupAddr,
674 "Count": port.QueryTimeoutCount})
675 state, err := cntlr.GetController().GetPortState(igc.Device, portKey)
676 logger.Debugw(ctx, "Expired Member Port State", log.Fields{"state": state})
677 ponPortID := GetApplication().GetPonPortID(igd.Device, portKey)
678 if err == nil && state == cntlr.PortStateDown {
679 igd.DelReceiver(cntx, igc.GroupAddr, portKey, nil, ponPortID)
680 }
Tinoj Josephcf161be2022-07-07 19:47:47 +0530681
vinokuma926cb3e2023-03-29 11:41:06 +0530682 port.QueryTimeoutCount++
683 logger.Debugw(ctx, "Expired Port TimeoutCount", log.Fields{"count": port.QueryTimeoutCount})
684 if port.QueryTimeoutCount >= (*igc.proxyCfg).KeepAliveCount {
685 logger.Errorw(ctx, "Expiry Timeout count exceeded. Trigger delete receiver", log.Fields{"PortKey": portKey,
686 "GroupAddr": igc.GroupAddr, "Count": port.QueryTimeoutCount})
687 igd.DelReceiver(cntx, igc.GroupAddr, portKey, nil, ponPortID)
688 SendQueryExpiredEventGroupSpecific(portKey, igd, igc)
689 } else {
690 _ = port.WriteToDb(cntx, igc.Mvlan, igc.GroupAddr, igc.Device)
691 }
692 }
693 return true
694 }
695 igd.GroupChannels.Range(handleQueryExp)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530696}