blob: 0d597474f2b8b72e6ca1c6ac8f68cfff87f6d919 [file] [log] [blame]
/*
* Copyright 2022-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
15
16package application
17
18import (
Tinoj Joseph07cc5372022-07-18 22:53:51 +053019 "context"
Tinoj Josephcf161be2022-07-07 19:47:47 +053020 "encoding/json"
21 "net"
22 "sync"
23 "time"
24
25 "github.com/google/gopacket/layers"
26
27 cntlr "voltha-go-controller/internal/pkg/controller"
28 "voltha-go-controller/internal/pkg/of"
29 "voltha-go-controller/internal/pkg/util"
30 "voltha-go-controller/log"
31)
32
33// IgmpGroupDevice : IGMP Group Device manages the IGMP group for all listerns on
34// a single OLT. It aggregates reports received on a single group
35// and performs the count. It is responsible for sending upstream
36// report when the first listener joins and is responsible for
37// sending responses to upstream queries
38type IgmpGroupDevice struct {
39 Device string
40 SerialNo string
41 GroupID uint32
42 GroupName string
43 GroupAddr net.IP
44 RecvVersion uint8
45 ServVersion *uint8
46 RecvVersionExpiry time.Time
47 ServVersionExpiry time.Time
48 Mvlan of.VlanType
49 PonVlan of.VlanType
50 IsPonVlanPresent bool
51 GroupInstalled bool
52 GroupChannels sync.Map `json:"-"` // [ipAddr]*IgmpGroupChannel
53 PortChannelMap sync.Map `json:"-"` // [portName][]net.IP
54 PonPortChannelMap *util.ConcurrentMap `json:"-"` // [ponPortId]*PonPortChannels
55 proxyCfg *IgmpProfile // IgmpSrcIp from IgmpProfile is not used, it is kept for backward compatibility
56 IgmpProxyIP *net.IP `json:"-"`
57 NextQueryTime time.Time
58 QueryExpiryTime time.Time
59}
60
61// NewIgmpGroupDevice is constructor for a device. The default IGMP version is set to 3
62// as the protocol defines the way to manage backward compatibility
63// The implementation handles simultaneous presense of lower versioned
64// receivers
65func NewIgmpGroupDevice(name string, ig *IgmpGroup, id uint32, version uint8) *IgmpGroupDevice {
66 var igd IgmpGroupDevice
67 igd.Device = name
68 igd.GroupID = id
69 igd.GroupName = ig.GroupName
70 igd.GroupAddr = ig.GroupAddr
71 igd.Mvlan = ig.Mvlan
72 igd.PonVlan = ig.PonVlan
73 igd.IsPonVlanPresent = ig.IsPonVlanPresent
74 igd.GroupInstalled = false
75 igd.RecvVersion = version
76 igd.RecvVersionExpiry = time.Now()
77 igd.ServVersionExpiry = time.Now()
78 igd.PonPortChannelMap = util.NewConcurrentMap()
79
80 va := GetApplication()
81 if vd := va.GetDevice(igd.Device); vd != nil {
82 igd.SerialNo = vd.SerialNum
83 } else {
84 logger.Errorw(ctx, "Volt Device not found. log.Fields", log.Fields{"igd.Device": igd.Device})
85 return nil
86 }
87 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
88 igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
89
90 var mcastCfg *McastConfig
91 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
92
93 // mvlan profile id + olt serial number---igmp group id
94 //igmpgroup id
95 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
96 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
97
98 if mcastCfg != nil {
99 mcastCfg.IgmpGroupDevices.Store(id, &igd)
100 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": id})
101 }
102 return &igd
103}
104
105// IgmpGroupDeviceReInit is re-initializer for a device. The default IGMP version is set to 3
106// as the protocol defines the way to manage backward compatibility
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530107func (igd *IgmpGroupDevice) IgmpGroupDeviceReInit(cntx context.Context, ig *IgmpGroup) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530108
109 logger.Infow(ctx, "Reinitialize Igmp Group Device", log.Fields{"Device": igd.Device, "GroupID": ig.GroupID, "OldName": igd.GroupName, "Name": ig.GroupName, "OldAddr": igd.GroupAddr.String(), "GroupAddr": ig.GroupAddr.String()})
110
111 if (igd.GroupName != ig.GroupName) || !igd.GroupAddr.Equal(ig.GroupAddr) {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530112 _ = db.DelIgmpDevice(cntx, igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530113 igd.GroupName = ig.GroupName
114 igd.GroupAddr = ig.GroupAddr
115 }
116 igd.RecvVersionExpiry = time.Now()
117 igd.ServVersionExpiry = time.Now()
118 igd.PonPortChannelMap = util.NewConcurrentMap()
119
120 var mcastCfg *McastConfig
121 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
122
123 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
124 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
125
126 if mcastCfg != nil {
127 mcastCfg.IgmpGroupDevices.Store(ig.GroupID, igd)
128 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": ig.GroupID})
129 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530130 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530131 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
132 }
133}
134
135func getIgmpProxyCfgAndIP(mvlan of.VlanType, serialNo string) (*IgmpProfile, *net.IP, *McastConfig) {
136 va := GetApplication()
137 mVLANProfileID := va.GetMvlanProfileByTag(mvlan).Name
138 var mcastCfg *McastConfig
139 if mcastCfg = va.GetMcastConfig(serialNo, mVLANProfileID); mcastCfg == nil || (mcastCfg != nil && mcastCfg.IgmpProfileID == "") {
140 logger.Debugw(ctx, "Default IGMP config to be used", log.Fields{"mVLANProfileID": mVLANProfileID, "OltSerialNo": serialNo})
141 igmpProf := va.getIgmpProfileMap(DefaultIgmpProfID)
142 return igmpProf, &igmpProf.IgmpSourceIP, mcastCfg
143 }
144 return va.getIgmpProfileMap(mcastCfg.IgmpProfileID), &mcastCfg.IgmpProxyIP, mcastCfg
145}
146
147// updateGroupName to update the group name
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530148func (igd *IgmpGroupDevice) updateGroupName(cntx context.Context, newGroupName string) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530149
150 oldName := igd.GroupName
151 igd.GroupName = newGroupName
152 updateGroupName := func(key, value interface{}) bool {
153 igc := value.(*IgmpGroupChannel)
154 igc.GroupName = newGroupName
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530155 if err := igc.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530156 logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
157 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530158 _ = db.DelIgmpChannel(cntx, igc.Mvlan, oldName, igc.Device, igc.GroupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530159 return true
160 }
161 igd.GroupChannels.Range(updateGroupName)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530162 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530163 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
164 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530165 _ = db.DelIgmpDevice(cntx, igd.Mvlan, oldName, igd.GroupAddr, igd.Device)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530166}
167
168// NewIgmpGroupDeviceFromBytes is to create the IGMP group port from a byte slice
169func NewIgmpGroupDeviceFromBytes(b []byte) (*IgmpGroupDevice, error) {
170 var igd IgmpGroupDevice
171 if err := json.Unmarshal(b, &igd); err != nil {
172 return nil, err
173 }
174 return &igd, nil
175}
176
177// GetKey to get group name as key
178func (igd *IgmpGroupDevice) GetKey() string {
179
180 if !net.ParseIP("0.0.0.0").Equal(igd.GroupAddr) {
181 return igd.GroupName + "_" + igd.GroupAddr.String()
182 }
183 return igd.GroupName
184
185}
186
187// RestoreChannel to restore channel
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530188func (igd *IgmpGroupDevice) RestoreChannel(cntx context.Context, igmpGroupChannel []byte) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530189
190 if igc, err := NewIgmpGroupChannelFromBytes(igmpGroupChannel); err == nil {
191 igc.ServVersion = igd.ServVersion
192 igc.IgmpProxyIP = &igd.IgmpProxyIP
193 igc.proxyCfg = &igd.proxyCfg
194 igd.GroupChannels.Store(igc.GroupAddr.String(), igc)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530195 igc.RestorePorts(cntx)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530196
197 for port, igp := range igc.NewReceivers {
198 ipsList := []net.IP{}
199 ipsIntf, _ := igd.PortChannelMap.Load(port)
200 if ipsIntf != nil {
201 ipsList = ipsIntf.([]net.IP)
202 }
203
204 ipsList = append(ipsList, igc.GroupAddr)
205 igd.PortChannelMap.Store(port, ipsList)
206 logger.Infow(ctx, "Group Channels Restored", log.Fields{"IGC": igc})
207 igd.AddChannelToChannelsPerPon(port, igc.GroupAddr, igp.PonPortID)
208 }
209 } else {
210 logger.Warnw(ctx, "Failed to decode port from DB", log.Fields{"err": err})
211 }
212 logger.Info(ctx, "Group Device & Channels Restored")
213 igd.PortChannelMap.Range(printPortChannel)
214 igd.GroupChannels.Range(printChannel)
215
216}
217
218// RestoreChannels to restore channels
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530219func (igd *IgmpGroupDevice) RestoreChannels(cntx context.Context) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530220
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530221 igd.migrateIgmpChannels(cntx)
222 channels, _ := db.GetIgmpChannels(cntx, igd.Mvlan, igd.GroupName, igd.Device)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530223 for _, channel := range channels {
224
225 b, ok := channel.Value.([]byte)
226 if !ok {
227 logger.Warn(ctx, "The value type is not []byte")
228 continue
229 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530230 igd.RestoreChannel(cntx, b)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530231 }
232
233}
234
235
236// WriteToDb is utility to write IGMP Group Device Info to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530237func (igd *IgmpGroupDevice) WriteToDb(cntx context.Context) error {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530238 b, err := json.Marshal(igd)
239 if err != nil {
240 return err
241 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530242 if err1 := db.PutIgmpDevice(cntx, igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device, string(b)); err1 != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530243 return err1
244 }
245 logger.Info(ctx, "IGD Updated")
246 return nil
247}
248
249// Tick processes timing tick used to run timers within the device
250func (igd *IgmpGroupDevice) Tick() uint8 {
251 /* Not using RecvVersionExpiry as it is not used anywhere
252 if time.Now().After(igd.RecvVersionExpiry) {
253 igd.RecvVersion = IgmpVersion3
254 return true
255 }
256 */
257 return 0
258}
259
260// GetSubscriberCountForChannelAndPonPort Gets the active subscriber count
261// for the given channel for one particular PON port
262func (igd *IgmpGroupDevice) GetSubscriberCountForChannelAndPonPort(ponPortID uint32, channelIP net.IP) uint64 {
263 if portMapIntf, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
264 portChannelMap := portMapIntf.(*PonPortChannels)
265
266 if channel, present := portChannelMap.ChannelList.Get(channelIP.String()); present {
267 return channel.(*UniPortList).UNIList.Length()
268 }
269 } else {
270 logger.Warnw(ctx, "PON port not found in PortChannelMap", log.Fields{"PON": ponPortID, "channel": channelIP})
271 }
272 return 0
273}
274
275// AddChannelToChannelsPerPon Adds the new channel into the per Pon channel list
276func (igd *IgmpGroupDevice) AddChannelToChannelsPerPon(uniPort string, channelIP net.IP, ponPortID uint32) bool {
277 logger.Debugw(ctx, "Adding channel to ActiveChannelsPerPon list", log.Fields{"PonPort": ponPortID, "channelIP": channelIP})
278
279 isNewChannel := bool(false)
280 isNewReceiver := false
281 if port, ok := igd.PonPortChannelMap.Get(ponPortID); !ok {
282 // PON port not exists in igd. adding it.
283 isNewReceiver = true
284 ponPortChannels := NewPonPortChannels()
285 isNewChannel = ponPortChannels.AddChannelToMap(uniPort, channelIP.String())
286 igd.PonPortChannelMap.Set(ponPortID, ponPortChannels)
287 } else {
288 // PON port exists in igd. Appending the channel list
289 // in the PON port.
290 isNewChannel = port.(*PonPortChannels).AddChannelToMap(uniPort, channelIP.String())
291 igd.PonPortChannelMap.Set(ponPortID, port)
292 count := port.(*PonPortChannels).GetActiveChannelCount()
293
294 logger.Debugw(ctx, "activeChannelCount", log.Fields{"count": count})
295 }
296 GetApplication().UpdateActiveChannelCountForPonPort(igd.Device, uniPort, ponPortID, true, isNewChannel, igd)
297 return isNewReceiver
298}
299
300// RemoveChannelFromChannelsPerPon removes the channel from the per pon channel list.
301func (igd *IgmpGroupDevice) RemoveChannelFromChannelsPerPon(uniPort string, channelIP net.IP, ponPortID uint32) bool {
302 logger.Debugw(ctx, "Removing channel from ActiveChannelsPerPon list", log.Fields{"PonPort": ponPortID, "channelIP": channelIP})
303 var deleted bool
304 ponRemoved := false
305
306 if port, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
307 channelPortMap := port.(*PonPortChannels)
308 deleted = channelPortMap.RemoveChannelFromMap(uniPort, channelIP.String())
309 if deleted && channelPortMap.ChannelList.Length() == 0 {
310 igd.PonPortChannelMap.Remove(ponPortID)
311 ponRemoved = true
312 }
313 GetApplication().UpdateActiveChannelCountForPonPort(igd.Device, uniPort, ponPortID, false, deleted, igd)
314 } else {
315 logger.Warnw(ctx, "PON port doesn't exists in the igd", log.Fields{"PonPortID": ponPortID})
316 }
317 return ponRemoved
318}
319
320// printChannel to print channel info
321func printChannel(key interface{}, value interface{}) bool {
322 logger.Infow(ctx, "ChannelMap", log.Fields{"Channel": key.(string), "Igc": value.(*IgmpGroupChannel)})
323 return true
324}
325
326// printPortChannel to print port channel
327func printPortChannel(key interface{}, value interface{}) bool {
328 logger.Infow(ctx, "PortChannelMap", log.Fields{"Port": key.(string), "List": value.([]net.IP)})
329 return true
330}
331
332
333// AddReceiver add the receiver to the device and perform other actions such as adding the group
334// to the physical device, add members, add flows to point the MC packets to the
335// group. Also, send a IGMP report upstream if there is a change in the group
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530336func (igd *IgmpGroupDevice) AddReceiver(cntx context.Context, port string, groupAddr net.IP,
Tinoj Josephcf161be2022-07-07 19:47:47 +0530337 group *layers.IGMPv3GroupRecord, version uint8, cvlan uint16, pbit uint8, ponPortID uint32) {
338
339 var igc *IgmpGroupChannel
340 logger.Debugw(ctx, "Processing receiver for device", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
341
342 igcIntf, ok := igd.GroupChannels.Load(groupAddr.String())
343 if !ok {
344 igc = NewIgmpGroupChannel(igd, groupAddr, version)
345 igd.GroupChannels.Store(groupAddr.String(), igc)
346 } else {
347 igc = igcIntf.(*IgmpGroupChannel)
348 }
349
350 if !igd.GroupInstalled {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530351 igd.AddNewReceiver(cntx, port, groupAddr, group, cvlan, pbit, ponPortID)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530352 return
353 }
354
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530355 isNewReceiver := igc.AddReceiver(cntx, port, group, cvlan, pbit)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530356 if isNewReceiver {
357 ipsList := []net.IP{}
358 ipsIntf, _ := igd.PortChannelMap.Load(port)
359 if ipsIntf != nil {
360 ipsList = ipsIntf.([]net.IP)
361 }
362 ipsList = append(ipsList, groupAddr)
363 igd.PortChannelMap.Store(port, ipsList)
364 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "AddedChannelList": ipsList, "Addr": groupAddr})
365
366 isNewPonReceiver := igd.AddChannelToChannelsPerPon(port, groupAddr, ponPortID)
367 //Modify group only if this is the first time the port is subscribing for the group
368 if isNewPonReceiver {
369 igd.ModMcGroup()
370 }
371 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530372 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530373 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
374 }
375}
376
377// AddNewReceiver to add new receiver
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530378func (igd *IgmpGroupDevice) AddNewReceiver(cntx context.Context, port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, cvlan uint16, pbit uint8, ponPortID uint32) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530379
380 logger.Debugw(ctx, "Adding New Device Receiver", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
381 igcIntf, _ := igd.GroupChannels.Load(groupAddr.String())
382 if igcIntf == nil {
383 logger.Warnw(ctx, "No Group Channel present for given channel", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
384 return
385 }
386
387 igc := igcIntf.(*IgmpGroupChannel)
388 ipsList := []net.IP{}
389 ipsIntf, _ := igd.PortChannelMap.Load(port)
390 if ipsIntf != nil {
391 ipsList = ipsIntf.([]net.IP)
392 }
393 ipsList = append(ipsList, groupAddr)
394 igd.PortChannelMap.Store(port, ipsList)
395 igd.AddChannelToChannelsPerPon(port, groupAddr, ponPortID)
396 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "NewChannelList": ipsList, "Addr": groupAddr})
397
398 igd.AddMcGroup()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530399 igc.AddReceiver(cntx, port, group, cvlan, pbit)
400 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530401 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
402 }
403}
404
405
406// NumReceivers to get number of receivers
407func (igd *IgmpGroupDevice) NumReceivers() int {
408 var numReceivers int
409 len := func(key interface{}, value interface{}) bool {
410 numReceivers++
411 return true
412 }
413 igd.PortChannelMap.Range(len)
414 return numReceivers
415}
416
417// DelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
418// the group
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530419func (igd *IgmpGroupDevice) DelReceiver(cntx context.Context, groupAddr net.IP, port string, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530420
421 logger.Debugw(ctx, "Deleting Receiver for Device", log.Fields{"port": port, "GroupIP": groupAddr.String()})
422 var igc *IgmpGroupChannel
423 var igcIntf interface{}
424 var ok bool
425 var srcList []net.IP
426 incl := false
427 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
428
429 if _, ok := mvp.Proxy[igd.GroupName]; ok {
430 incl = true
431 } else if group != nil {
432 srcList = group.SourceAddresses
433 incl = isIncl(group.Type)
434 }
435
436 if igcIntf, ok = igd.GroupChannels.Load(groupAddr.String()); !ok {
437 logger.Warnw(ctx, "Igmp Channel for group IP doesnt exist", log.Fields{"GroupAddr": groupAddr.String()})
438 return
439 }
440 igc = igcIntf.(*IgmpGroupChannel)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530441 if ok := igc.DelReceiver(cntx, port, incl, srcList); !ok {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530442 return
443 }
444
445 if igc.NumReceivers() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530446 igd.DelIgmpGroupChannel(cntx, igc)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530447 }
448 igd.DelPortFromChannel(port, groupAddr)
449 isGroupModified := igd.RemoveChannelFromChannelsPerPon(port, groupAddr, ponPortID)
450
451 //Remove port from receiver if port has no subscription to any of the group channels
452 if isGroupModified {
453 igd.ModMcGroup()
454 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530455 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530456 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
457 }
458}
459
460// DelChannelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
461// the group
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530462func (igd *IgmpGroupDevice) DelChannelReceiver(cntx context.Context, groupAddr net.IP) map[string]*IgmpGroupPort {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530463
464 portsRemoved := make(map[string]*IgmpGroupPort)
465 groupModified := false
466 // ifEmpty := true
467 igcIntf, _ := igd.GroupChannels.Load(groupAddr.String())
468
469 if igcIntf == nil {
470 return portsRemoved
471 }
472 igc := igcIntf.(*IgmpGroupChannel)
473
474 for port, igp := range igc.NewReceivers {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530475 _ = db.DelIgmpRcvr(cntx, igc.Mvlan, igc.GroupAddr, igc.Device, port) //TODO: Y not here
Tinoj Josephcf161be2022-07-07 19:47:47 +0530476 igd.DelPortFromChannel(port, igc.GroupAddr)
477 ponPortID := GetApplication().GetPonPortID(igd.Device, port)
478 groupModified = igd.RemoveChannelFromChannelsPerPon(port, igc.GroupAddr, ponPortID)
479 delete(igc.NewReceivers, port)
480 portsRemoved[port] = igp
481 }
482 for port, igp := range igc.CurReceivers {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530483 _ = db.DelIgmpRcvr(cntx, igc.Mvlan, igc.GroupAddr, igc.Device, port)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530484 igd.DelPortFromChannel(port, igc.GroupAddr)
485 ponPortID := GetApplication().GetPonPortID(igd.Device, port)
486 groupModified = igd.RemoveChannelFromChannelsPerPon(port, igc.GroupAddr, ponPortID)
487 delete(igc.CurReceivers, port)
488 portsRemoved[port] = igp
489 }
490
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530491 igc.DelMcFlow(cntx)
492 igd.DelIgmpGroupChannel(cntx, igc)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530493 igc.Exclude = 0
494 igc.SendLeaveToServer()
495
496 if groupModified {
497 igd.ModMcGroup()
498 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530499 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530500 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
501 }
502 logger.Debugw(ctx, "Deleted the receiver Flow", log.Fields{"Num Receivers": igc.NumReceivers()})
503 return portsRemoved
504}
505
506// DelIgmpGroupChannel to delete igmp group channel
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530507func (igd *IgmpGroupDevice) DelIgmpGroupChannel(cntx context.Context, igc *IgmpGroupChannel) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530508
509 if igc.NumReceivers() != 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530510 igc.DelAllReceivers(cntx)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530511 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530512 _ = db.DelIgmpChannel(cntx, igc.Mvlan, igc.GroupName, igc.Device, igc.GroupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530513 igd.GroupChannels.Delete(igc.GroupAddr.String())
514 logger.Infow(ctx, "Deleted the Channel from Device", log.Fields{"Channel": igc.GroupAddr.String()})
515 isLenZero := true
516 checkIfEmpty := func(key interface{}, value interface{}) bool {
517 isLenZero = false
518 return false
519 }
520 igd.GroupChannels.Range(checkIfEmpty)
521 if isLenZero {
522 logger.Infow(ctx, "No more active channels. Deleting MC Group", log.Fields{"Device": igd.Device, "Group": igd.GroupName})
523 igd.DelMcGroup(false)
524 }
525}
526
527// func (igd *IgmpGroupDevice) DelIgmpChannel(igc *IgmpGroupChannel) {
528// db.DelIgmpChannel(igc.GroupName, igc.Device, igc.GroupAddr)
529// delete(igd.GroupChannels, igc.GroupAddr.String())
530// logger.Debugw(ctx, "Deleted the Channel", log.Fields{"Num Receivers": igc.NumReceivers()})
531// }
532
533// DelPortFromChannel to delete port from channel
534func (igd *IgmpGroupDevice) DelPortFromChannel(port string, groupAddr net.IP) bool {
535 ipsList := []net.IP{}
536 ipsListIntf, _ := igd.PortChannelMap.Load(port)
537 if ipsListIntf != nil {
538 ipsList = ipsListIntf.([]net.IP)
539 }
540 for i, addr := range ipsList {
541 if addr.Equal(groupAddr) {
542 ipsList = append(ipsList[:i], ipsList[i+1:]...)
543 //Remove port from receiver if port has no subscription to any of the group channels
544 if len(ipsList) == 0 {
545 igd.PortChannelMap.Delete(port)
546 } else {
547 //Update the map with modified ips list
548 igd.PortChannelMap.Store(port, ipsList)
549 }
550 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "DelChannelList": ipsList, "Addr": groupAddr.String()})
551 return true
552 }
553 }
554 return false
555}
556
557// DelAllChannels deletes all receiver for the provided igmp device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530558func (igd *IgmpGroupDevice) DelAllChannels(cntx context.Context) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530559 logger.Infow(ctx, "Deleting All Channel for Device", log.Fields{"Device": igd.Device, "Group": igd.GroupName})
560 delGroupChannels := func(key interface{}, value interface{}) bool {
561 igc := value.(*IgmpGroupChannel)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530562 igd.DelIgmpGroupChannel(cntx, igc)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530563 return true
564 }
565 igd.GroupChannels.Range(delGroupChannels)
566}
567
568// ProcessQuery process query received from the upstream IGMP server
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530569func (igd *IgmpGroupDevice) ProcessQuery(cntx context.Context, groupAddr net.IP, ver uint8) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530570 logger.Debugw(ctx, "Received Query From Server", log.Fields{"Version": ver})
571 if ver != *igd.ServVersion {
572 igd.ServVersionExpiry = time.Now().Add(time.Duration(2*igd.proxyCfg.KeepAliveInterval) * time.Second)
573 *igd.ServVersion = ver
574 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530575 if err := mvp.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530576 logger.Errorw(ctx, "Mvlan profile write to DB failed", log.Fields{"ProfileName": mvp.Name})
577 }
578 }
579 if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
580 igc.(*IgmpGroupChannel).SendReport(true)
581 return
582 }
583 logger.Infow(ctx, "No Members for Channel. Dropping Igmp Query", log.Fields{"Group": igd.GroupName, "Channel": groupAddr.String()})
584}
585
586// AddMcGroup add the new group on the device when a receiver joins the group
587func (igd *IgmpGroupDevice) AddMcGroup() {
588 if !igd.GroupInstalled {
589 group := &of.Group{}
590 group.Command = of.GroupCommandAdd
591 group.GroupID = igd.GroupID
592 group.Device = igd.Device
593 group.SetVlan = igd.PonVlan
594 group.IsPonVlanPresent = igd.IsPonVlanPresent
595
596 addbuckets := func(key interface{}, value interface{}) bool {
597 port := key.(string)
598 var portID uint32
599 if d := GetApplication().GetDevice(group.Device); d != nil {
600 GetApplication().portLock.Lock()
601 p := d.GetPort(port)
602 GetApplication().portLock.Unlock()
603 portID = p.ID
604 }
605 //ponPortID := key.(uint32)
606 if portID != 0xFF {
607 group.Buckets = append(group.Buckets, portID)
608 }
609 return true
610 }
611 igd.PortChannelMap.Range(addbuckets)
612
613 port, _ := GetApplication().GetNniPort(igd.Device)
614 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
615 igd.GroupInstalled = true
616 }
617}
618
619// ModMcGroup updates the group on the device when either a receiver leaves
620// or joins the group
621func (igd *IgmpGroupDevice) ModMcGroup() {
622 if igd.GroupInstalled {
623 group := &of.Group{}
624 group.Command = of.GroupCommandMod
625 group.GroupID = igd.GroupID
626 group.Device = igd.Device
627 group.SetVlan = igd.PonVlan
628 group.IsPonVlanPresent = igd.IsPonVlanPresent
629
630 addbuckets := func(key interface{}, value interface{}) bool {
631 port := key.(string)
632 var portID uint32
633 if d := GetApplication().GetDevice(group.Device); d != nil {
634 GetApplication().portLock.Lock()
635 p := d.GetPort(port)
636 GetApplication().portLock.Unlock()
637 portID = p.ID
638 }
639 //ponPortID := key.(uint32)
640 if portID != 0xFF {
641 group.Buckets = append(group.Buckets, portID)
642 }
643 return true
644 }
645 igd.PortChannelMap.Range(addbuckets)
646
647 port, _ := GetApplication().GetNniPort(igd.Device)
648 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
649 } else {
650 logger.Warnw(ctx, "Update Group Failed. Group not yet created", log.Fields{"Igd": igd.Device})
651 }
652}
653
654// DelMcGroup : The group is deleted when the last receiver leaves the group
655func (igd *IgmpGroupDevice) DelMcGroup(forceDelete bool) {
656
657 logger.Infow(ctx, "Delete Mc Group Request", log.Fields{"Device": igd.Device, "GroupID": igd.GroupID, "ForceFlag": forceDelete, "GroupInstalled": igd.GroupInstalled})
658 /*
659 if !forceDelete && !checkIfForceGroupRemove(igd.Device) {
660 if success := AddToPendingPool(igd.Device, igd.getKey()); success {
661 return
662 }
663 }*/
664 if igd.GroupInstalled {
665 logger.Debugw(ctx, "Deleting Group", log.Fields{"Device": igd.Device, "Id": igd.GroupID})
666 group := &of.Group{}
667 group.Command = of.GroupCommandDel
668 group.GroupID = igd.GroupID
669 group.Device = igd.Device
670 group.ForceAction = true
671
672 port, _ := GetApplication().GetNniPort(igd.Device)
673 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
674 igd.GroupInstalled = false
675 }
676}
677
678// QueryExpiry processes query expiry. Upon expiry, take stock of the situation
679// add either retain/release the group based on number of receivers left
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530680func (igd *IgmpGroupDevice) QueryExpiry(cntx context.Context) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530681 logger.Debugw(ctx, "Query Expiry", log.Fields{"Device": igd.Device})
682
683
684 // Delete the IGMP flow added for this port if port state is down or query count exceeded
685 handleQueryExp := func(key interface{}, value interface{}) bool {
686 igc := value.(*IgmpGroupChannel)
687 for portKey, port := range igc.CurReceivers {
688
689 if portKey == StaticPort {
690 continue
691 }
692
693 logger.Warnw(ctx, "Expired Receiver Port", log.Fields{"PortKey": portKey, "IGP": port, "GroupAddr": igc.GroupAddr,
694 "Count": port.QueryTimeoutCount})
695 state, err := cntlr.GetController().GetPortState(igc.Device, portKey)
696 logger.Debugw(ctx, "Expired Member Port State", log.Fields{"state": state})
697 ponPortID := GetApplication().GetPonPortID(igd.Device, portKey)
698 if err == nil && state == cntlr.PortStateDown {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530699 igd.DelReceiver(cntx, igc.GroupAddr, portKey, nil, ponPortID)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530700 }
701
702 port.QueryTimeoutCount++
703 logger.Debugw(ctx, "Expired Port TimeoutCount", log.Fields{"count": port.QueryTimeoutCount})
704 if port.QueryTimeoutCount >= (*igc.proxyCfg).KeepAliveCount {
705 logger.Errorw(ctx, "Expiry Timeout count exceeded. Trigger delete receiver", log.Fields{"PortKey": portKey,
706 "GroupAddr": igc.GroupAddr, "Count": port.QueryTimeoutCount})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530707 igd.DelReceiver(cntx, igc.GroupAddr, portKey, nil, ponPortID)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530708 SendQueryExpiredEventGroupSpecific(portKey, igd, igc)
709 } else {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530710 _ = port.WriteToDb(cntx, igc.Mvlan, igc.GroupAddr, igc.Device)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530711 }
712 }
713 return true
714 }
715 igd.GroupChannels.Range(handleQueryExp)
716}