blob: 07c2f825a76ae9a06d45386daf21b9d35d17ee1f [file] [log] [blame]
Tinoj Josephcf161be2022-07-07 19:47:47 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package application
17
18import (
19 "encoding/json"
20 "net"
21 "sync"
22 "time"
23
24 "github.com/google/gopacket/layers"
25
26 cntlr "voltha-go-controller/internal/pkg/controller"
27 "voltha-go-controller/internal/pkg/of"
28 "voltha-go-controller/internal/pkg/util"
29 "voltha-go-controller/log"
30)
31
32// IgmpGroupDevice : IGMP Group Device manages the IGMP group for all listerns on
33// a single OLT. It aggregates reports received on a single group
34// and performs the count. It is responsible for sending upstream
35// report when the first listener joins and is responsible for
36// sending responses to upstream queries
37type IgmpGroupDevice struct {
38 Device string
39 SerialNo string
40 GroupID uint32
41 GroupName string
42 GroupAddr net.IP
43 RecvVersion uint8
44 ServVersion *uint8
45 RecvVersionExpiry time.Time
46 ServVersionExpiry time.Time
47 Mvlan of.VlanType
48 PonVlan of.VlanType
49 IsPonVlanPresent bool
50 GroupInstalled bool
51 GroupChannels sync.Map `json:"-"` // [ipAddr]*IgmpGroupChannel
52 PortChannelMap sync.Map `json:"-"` // [portName][]net.IP
53 PonPortChannelMap *util.ConcurrentMap `json:"-"` // [ponPortId]*PonPortChannels
54 proxyCfg *IgmpProfile // IgmpSrcIp from IgmpProfile is not used, it is kept for backward compatibility
55 IgmpProxyIP *net.IP `json:"-"`
56 NextQueryTime time.Time
57 QueryExpiryTime time.Time
58}
59
60// NewIgmpGroupDevice is constructor for a device. The default IGMP version is set to 3
61// as the protocol defines the way to manage backward compatibility
62// The implementation handles simultaneous presense of lower versioned
63// receivers
64func NewIgmpGroupDevice(name string, ig *IgmpGroup, id uint32, version uint8) *IgmpGroupDevice {
65 var igd IgmpGroupDevice
66 igd.Device = name
67 igd.GroupID = id
68 igd.GroupName = ig.GroupName
69 igd.GroupAddr = ig.GroupAddr
70 igd.Mvlan = ig.Mvlan
71 igd.PonVlan = ig.PonVlan
72 igd.IsPonVlanPresent = ig.IsPonVlanPresent
73 igd.GroupInstalled = false
74 igd.RecvVersion = version
75 igd.RecvVersionExpiry = time.Now()
76 igd.ServVersionExpiry = time.Now()
77 igd.PonPortChannelMap = util.NewConcurrentMap()
78
79 va := GetApplication()
80 if vd := va.GetDevice(igd.Device); vd != nil {
81 igd.SerialNo = vd.SerialNum
82 } else {
83 logger.Errorw(ctx, "Volt Device not found. log.Fields", log.Fields{"igd.Device": igd.Device})
84 return nil
85 }
86 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
87 igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
88
89 var mcastCfg *McastConfig
90 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
91
92 // mvlan profile id + olt serial number---igmp group id
93 //igmpgroup id
94 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
95 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
96
97 if mcastCfg != nil {
98 mcastCfg.IgmpGroupDevices.Store(id, &igd)
99 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": id})
100 }
101 return &igd
102}
103
104// IgmpGroupDeviceReInit is re-initializer for a device. The default IGMP version is set to 3
105// as the protocol defines the way to manage backward compatibility
106func (igd *IgmpGroupDevice) IgmpGroupDeviceReInit(ig *IgmpGroup) {
107
108 logger.Infow(ctx, "Reinitialize Igmp Group Device", log.Fields{"Device": igd.Device, "GroupID": ig.GroupID, "OldName": igd.GroupName, "Name": ig.GroupName, "OldAddr": igd.GroupAddr.String(), "GroupAddr": ig.GroupAddr.String()})
109
110 if (igd.GroupName != ig.GroupName) || !igd.GroupAddr.Equal(ig.GroupAddr) {
111 _ = db.DelIgmpDevice(igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
112 igd.GroupName = ig.GroupName
113 igd.GroupAddr = ig.GroupAddr
114 }
115 igd.RecvVersionExpiry = time.Now()
116 igd.ServVersionExpiry = time.Now()
117 igd.PonPortChannelMap = util.NewConcurrentMap()
118
119 var mcastCfg *McastConfig
120 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
121
122 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
123 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
124
125 if mcastCfg != nil {
126 mcastCfg.IgmpGroupDevices.Store(ig.GroupID, igd)
127 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": ig.GroupID})
128 }
129 if err := igd.WriteToDb(); err != nil {
130 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
131 }
132}
133
134func getIgmpProxyCfgAndIP(mvlan of.VlanType, serialNo string) (*IgmpProfile, *net.IP, *McastConfig) {
135 va := GetApplication()
136 mVLANProfileID := va.GetMvlanProfileByTag(mvlan).Name
137 var mcastCfg *McastConfig
138 if mcastCfg = va.GetMcastConfig(serialNo, mVLANProfileID); mcastCfg == nil || (mcastCfg != nil && mcastCfg.IgmpProfileID == "") {
139 logger.Debugw(ctx, "Default IGMP config to be used", log.Fields{"mVLANProfileID": mVLANProfileID, "OltSerialNo": serialNo})
140 igmpProf := va.getIgmpProfileMap(DefaultIgmpProfID)
141 return igmpProf, &igmpProf.IgmpSourceIP, mcastCfg
142 }
143 return va.getIgmpProfileMap(mcastCfg.IgmpProfileID), &mcastCfg.IgmpProxyIP, mcastCfg
144}
145
146// updateGroupName to update the group name
147func (igd *IgmpGroupDevice) updateGroupName(newGroupName string) {
148
149 oldName := igd.GroupName
150 igd.GroupName = newGroupName
151 updateGroupName := func(key, value interface{}) bool {
152 igc := value.(*IgmpGroupChannel)
153 igc.GroupName = newGroupName
154 if err := igc.WriteToDb(); err != nil {
155 logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
156 }
157 _ = db.DelIgmpChannel(igc.Mvlan, oldName, igc.Device, igc.GroupAddr)
158 return true
159 }
160 igd.GroupChannels.Range(updateGroupName)
161 if err := igd.WriteToDb(); err != nil {
162 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
163 }
164 _ = db.DelIgmpDevice(igd.Mvlan, oldName, igd.GroupAddr, igd.Device)
165}
166
167// NewIgmpGroupDeviceFromBytes is to create the IGMP group port from a byte slice
168func NewIgmpGroupDeviceFromBytes(b []byte) (*IgmpGroupDevice, error) {
169 var igd IgmpGroupDevice
170 if err := json.Unmarshal(b, &igd); err != nil {
171 return nil, err
172 }
173 return &igd, nil
174}
175
176// GetKey to get group name as key
177func (igd *IgmpGroupDevice) GetKey() string {
178
179 if !net.ParseIP("0.0.0.0").Equal(igd.GroupAddr) {
180 return igd.GroupName + "_" + igd.GroupAddr.String()
181 }
182 return igd.GroupName
183
184}
185
186// RestoreChannel to restore channel
187func (igd *IgmpGroupDevice) RestoreChannel(igmpGroupChannel []byte) {
188
189 if igc, err := NewIgmpGroupChannelFromBytes(igmpGroupChannel); err == nil {
190 igc.ServVersion = igd.ServVersion
191 igc.IgmpProxyIP = &igd.IgmpProxyIP
192 igc.proxyCfg = &igd.proxyCfg
193 igd.GroupChannels.Store(igc.GroupAddr.String(), igc)
194 igc.RestorePorts()
195
196 for port, igp := range igc.NewReceivers {
197 ipsList := []net.IP{}
198 ipsIntf, _ := igd.PortChannelMap.Load(port)
199 if ipsIntf != nil {
200 ipsList = ipsIntf.([]net.IP)
201 }
202
203 ipsList = append(ipsList, igc.GroupAddr)
204 igd.PortChannelMap.Store(port, ipsList)
205 logger.Infow(ctx, "Group Channels Restored", log.Fields{"IGC": igc})
206 igd.AddChannelToChannelsPerPon(port, igc.GroupAddr, igp.PonPortID)
207 }
208 } else {
209 logger.Warnw(ctx, "Failed to decode port from DB", log.Fields{"err": err})
210 }
211 logger.Info(ctx, "Group Device & Channels Restored")
212 igd.PortChannelMap.Range(printPortChannel)
213 igd.GroupChannels.Range(printChannel)
214
215}
216
217// RestoreChannels to restore channels
218func (igd *IgmpGroupDevice) RestoreChannels() {
219
220 igd.migrateIgmpChannels()
221 channels, _ := db.GetIgmpChannels(igd.Mvlan, igd.GroupName, igd.Device)
222 for _, channel := range channels {
223
224 b, ok := channel.Value.([]byte)
225 if !ok {
226 logger.Warn(ctx, "The value type is not []byte")
227 continue
228 }
229 igd.RestoreChannel(b)
230 }
231
232}
233
234
235// WriteToDb is utility to write IGMP Group Device Info to the database
236func (igd *IgmpGroupDevice) WriteToDb() error {
237 b, err := json.Marshal(igd)
238 if err != nil {
239 return err
240 }
241 if err1 := db.PutIgmpDevice(igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device, string(b)); err1 != nil {
242 return err1
243 }
244 logger.Info(ctx, "IGD Updated")
245 return nil
246}
247
248// Tick processes timing tick used to run timers within the device
249func (igd *IgmpGroupDevice) Tick() uint8 {
250 /* Not using RecvVersionExpiry as it is not used anywhere
251 if time.Now().After(igd.RecvVersionExpiry) {
252 igd.RecvVersion = IgmpVersion3
253 return true
254 }
255 */
256 return 0
257}
258
259// GetSubscriberCountForChannelAndPonPort Gets the active subscriber count
260// for the given channel for one particular PON port
261func (igd *IgmpGroupDevice) GetSubscriberCountForChannelAndPonPort(ponPortID uint32, channelIP net.IP) uint64 {
262 if portMapIntf, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
263 portChannelMap := portMapIntf.(*PonPortChannels)
264
265 if channel, present := portChannelMap.ChannelList.Get(channelIP.String()); present {
266 return channel.(*UniPortList).UNIList.Length()
267 }
268 } else {
269 logger.Warnw(ctx, "PON port not found in PortChannelMap", log.Fields{"PON": ponPortID, "channel": channelIP})
270 }
271 return 0
272}
273
274// AddChannelToChannelsPerPon Adds the new channel into the per Pon channel list
275func (igd *IgmpGroupDevice) AddChannelToChannelsPerPon(uniPort string, channelIP net.IP, ponPortID uint32) bool {
276 logger.Debugw(ctx, "Adding channel to ActiveChannelsPerPon list", log.Fields{"PonPort": ponPortID, "channelIP": channelIP})
277
278 isNewChannel := bool(false)
279 isNewReceiver := false
280 if port, ok := igd.PonPortChannelMap.Get(ponPortID); !ok {
281 // PON port not exists in igd. adding it.
282 isNewReceiver = true
283 ponPortChannels := NewPonPortChannels()
284 isNewChannel = ponPortChannels.AddChannelToMap(uniPort, channelIP.String())
285 igd.PonPortChannelMap.Set(ponPortID, ponPortChannels)
286 } else {
287 // PON port exists in igd. Appending the channel list
288 // in the PON port.
289 isNewChannel = port.(*PonPortChannels).AddChannelToMap(uniPort, channelIP.String())
290 igd.PonPortChannelMap.Set(ponPortID, port)
291 count := port.(*PonPortChannels).GetActiveChannelCount()
292
293 logger.Debugw(ctx, "activeChannelCount", log.Fields{"count": count})
294 }
295 GetApplication().UpdateActiveChannelCountForPonPort(igd.Device, uniPort, ponPortID, true, isNewChannel, igd)
296 return isNewReceiver
297}
298
299// RemoveChannelFromChannelsPerPon removes the channel from the per pon channel list.
300func (igd *IgmpGroupDevice) RemoveChannelFromChannelsPerPon(uniPort string, channelIP net.IP, ponPortID uint32) bool {
301 logger.Debugw(ctx, "Removing channel from ActiveChannelsPerPon list", log.Fields{"PonPort": ponPortID, "channelIP": channelIP})
302 var deleted bool
303 ponRemoved := false
304
305 if port, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
306 channelPortMap := port.(*PonPortChannels)
307 deleted = channelPortMap.RemoveChannelFromMap(uniPort, channelIP.String())
308 if deleted && channelPortMap.ChannelList.Length() == 0 {
309 igd.PonPortChannelMap.Remove(ponPortID)
310 ponRemoved = true
311 }
312 GetApplication().UpdateActiveChannelCountForPonPort(igd.Device, uniPort, ponPortID, false, deleted, igd)
313 } else {
314 logger.Warnw(ctx, "PON port doesn't exists in the igd", log.Fields{"PonPortID": ponPortID})
315 }
316 return ponRemoved
317}
318
319// printChannel to print channel info
320func printChannel(key interface{}, value interface{}) bool {
321 logger.Infow(ctx, "ChannelMap", log.Fields{"Channel": key.(string), "Igc": value.(*IgmpGroupChannel)})
322 return true
323}
324
325// printPortChannel to print port channel
326func printPortChannel(key interface{}, value interface{}) bool {
327 logger.Infow(ctx, "PortChannelMap", log.Fields{"Port": key.(string), "List": value.([]net.IP)})
328 return true
329}
330
331
332// AddReceiver add the receiver to the device and perform other actions such as adding the group
333// to the physical device, add members, add flows to point the MC packets to the
334// group. Also, send a IGMP report upstream if there is a change in the group
335func (igd *IgmpGroupDevice) AddReceiver(port string, groupAddr net.IP,
336 group *layers.IGMPv3GroupRecord, version uint8, cvlan uint16, pbit uint8, ponPortID uint32) {
337
338 var igc *IgmpGroupChannel
339 logger.Debugw(ctx, "Processing receiver for device", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
340
341 igcIntf, ok := igd.GroupChannels.Load(groupAddr.String())
342 if !ok {
343 igc = NewIgmpGroupChannel(igd, groupAddr, version)
344 igd.GroupChannels.Store(groupAddr.String(), igc)
345 } else {
346 igc = igcIntf.(*IgmpGroupChannel)
347 }
348
349 if !igd.GroupInstalled {
350 igd.AddNewReceiver(port, groupAddr, group, cvlan, pbit, ponPortID)
351 return
352 }
353
354 isNewReceiver := igc.AddReceiver(port, group, cvlan, pbit)
355 if isNewReceiver {
356 ipsList := []net.IP{}
357 ipsIntf, _ := igd.PortChannelMap.Load(port)
358 if ipsIntf != nil {
359 ipsList = ipsIntf.([]net.IP)
360 }
361 ipsList = append(ipsList, groupAddr)
362 igd.PortChannelMap.Store(port, ipsList)
363 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "AddedChannelList": ipsList, "Addr": groupAddr})
364
365 isNewPonReceiver := igd.AddChannelToChannelsPerPon(port, groupAddr, ponPortID)
366 //Modify group only if this is the first time the port is subscribing for the group
367 if isNewPonReceiver {
368 igd.ModMcGroup()
369 }
370 }
371 if err := igd.WriteToDb(); err != nil {
372 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
373 }
374}
375
376// AddNewReceiver to add new receiver
377func (igd *IgmpGroupDevice) AddNewReceiver(port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, cvlan uint16, pbit uint8, ponPortID uint32) {
378
379 logger.Debugw(ctx, "Adding New Device Receiver", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
380 igcIntf, _ := igd.GroupChannels.Load(groupAddr.String())
381 if igcIntf == nil {
382 logger.Warnw(ctx, "No Group Channel present for given channel", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
383 return
384 }
385
386 igc := igcIntf.(*IgmpGroupChannel)
387 ipsList := []net.IP{}
388 ipsIntf, _ := igd.PortChannelMap.Load(port)
389 if ipsIntf != nil {
390 ipsList = ipsIntf.([]net.IP)
391 }
392 ipsList = append(ipsList, groupAddr)
393 igd.PortChannelMap.Store(port, ipsList)
394 igd.AddChannelToChannelsPerPon(port, groupAddr, ponPortID)
395 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "NewChannelList": ipsList, "Addr": groupAddr})
396
397 igd.AddMcGroup()
398 igc.AddReceiver(port, group, cvlan, pbit)
399 if err := igd.WriteToDb(); err != nil {
400 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
401 }
402}
403
404
405// NumReceivers to get number of receivers
406func (igd *IgmpGroupDevice) NumReceivers() int {
407 var numReceivers int
408 len := func(key interface{}, value interface{}) bool {
409 numReceivers++
410 return true
411 }
412 igd.PortChannelMap.Range(len)
413 return numReceivers
414}
415
416// DelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
417// the group
418func (igd *IgmpGroupDevice) DelReceiver(groupAddr net.IP, port string, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
419
420 logger.Debugw(ctx, "Deleting Receiver for Device", log.Fields{"port": port, "GroupIP": groupAddr.String()})
421 var igc *IgmpGroupChannel
422 var igcIntf interface{}
423 var ok bool
424 var srcList []net.IP
425 incl := false
426 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
427
428 if _, ok := mvp.Proxy[igd.GroupName]; ok {
429 incl = true
430 } else if group != nil {
431 srcList = group.SourceAddresses
432 incl = isIncl(group.Type)
433 }
434
435 if igcIntf, ok = igd.GroupChannels.Load(groupAddr.String()); !ok {
436 logger.Warnw(ctx, "Igmp Channel for group IP doesnt exist", log.Fields{"GroupAddr": groupAddr.String()})
437 return
438 }
439 igc = igcIntf.(*IgmpGroupChannel)
440 if ok := igc.DelReceiver(port, incl, srcList); !ok {
441 return
442 }
443
444 if igc.NumReceivers() == 0 {
445 igd.DelIgmpGroupChannel(igc)
446 }
447 igd.DelPortFromChannel(port, groupAddr)
448 isGroupModified := igd.RemoveChannelFromChannelsPerPon(port, groupAddr, ponPortID)
449
450 //Remove port from receiver if port has no subscription to any of the group channels
451 if isGroupModified {
452 igd.ModMcGroup()
453 }
454 if err := igd.WriteToDb(); err != nil {
455 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
456 }
457}
458
459// DelChannelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
460// the group
461func (igd *IgmpGroupDevice) DelChannelReceiver(groupAddr net.IP) map[string]*IgmpGroupPort {
462
463 portsRemoved := make(map[string]*IgmpGroupPort)
464 groupModified := false
465 // ifEmpty := true
466 igcIntf, _ := igd.GroupChannels.Load(groupAddr.String())
467
468 if igcIntf == nil {
469 return portsRemoved
470 }
471 igc := igcIntf.(*IgmpGroupChannel)
472
473 for port, igp := range igc.NewReceivers {
474 _ = db.DelIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device, port) //TODO: Y not here
475 igd.DelPortFromChannel(port, igc.GroupAddr)
476 ponPortID := GetApplication().GetPonPortID(igd.Device, port)
477 groupModified = igd.RemoveChannelFromChannelsPerPon(port, igc.GroupAddr, ponPortID)
478 delete(igc.NewReceivers, port)
479 portsRemoved[port] = igp
480 }
481 for port, igp := range igc.CurReceivers {
482 _ = db.DelIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device, port)
483 igd.DelPortFromChannel(port, igc.GroupAddr)
484 ponPortID := GetApplication().GetPonPortID(igd.Device, port)
485 groupModified = igd.RemoveChannelFromChannelsPerPon(port, igc.GroupAddr, ponPortID)
486 delete(igc.CurReceivers, port)
487 portsRemoved[port] = igp
488 }
489
490 igc.DelMcFlow()
491 igd.DelIgmpGroupChannel(igc)
492 igc.Exclude = 0
493 igc.SendLeaveToServer()
494
495 if groupModified {
496 igd.ModMcGroup()
497 }
498 if err := igd.WriteToDb(); err != nil {
499 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
500 }
501 logger.Debugw(ctx, "Deleted the receiver Flow", log.Fields{"Num Receivers": igc.NumReceivers()})
502 return portsRemoved
503}
504
505// DelIgmpGroupChannel to delete igmp group channel
506func (igd *IgmpGroupDevice) DelIgmpGroupChannel(igc *IgmpGroupChannel) {
507
508 if igc.NumReceivers() != 0 {
509 igc.DelAllReceivers()
510 }
511 _ = db.DelIgmpChannel(igc.Mvlan, igc.GroupName, igc.Device, igc.GroupAddr)
512 igd.GroupChannels.Delete(igc.GroupAddr.String())
513 logger.Infow(ctx, "Deleted the Channel from Device", log.Fields{"Channel": igc.GroupAddr.String()})
514 isLenZero := true
515 checkIfEmpty := func(key interface{}, value interface{}) bool {
516 isLenZero = false
517 return false
518 }
519 igd.GroupChannels.Range(checkIfEmpty)
520 if isLenZero {
521 logger.Infow(ctx, "No more active channels. Deleting MC Group", log.Fields{"Device": igd.Device, "Group": igd.GroupName})
522 igd.DelMcGroup(false)
523 }
524}
525
526// func (igd *IgmpGroupDevice) DelIgmpChannel(igc *IgmpGroupChannel) {
527// db.DelIgmpChannel(igc.GroupName, igc.Device, igc.GroupAddr)
528// delete(igd.GroupChannels, igc.GroupAddr.String())
529// logger.Debugw(ctx, "Deleted the Channel", log.Fields{"Num Receivers": igc.NumReceivers()})
530// }
531
532// DelPortFromChannel to delete port from channel
533func (igd *IgmpGroupDevice) DelPortFromChannel(port string, groupAddr net.IP) bool {
534 ipsList := []net.IP{}
535 ipsListIntf, _ := igd.PortChannelMap.Load(port)
536 if ipsListIntf != nil {
537 ipsList = ipsListIntf.([]net.IP)
538 }
539 for i, addr := range ipsList {
540 if addr.Equal(groupAddr) {
541 ipsList = append(ipsList[:i], ipsList[i+1:]...)
542 //Remove port from receiver if port has no subscription to any of the group channels
543 if len(ipsList) == 0 {
544 igd.PortChannelMap.Delete(port)
545 } else {
546 //Update the map with modified ips list
547 igd.PortChannelMap.Store(port, ipsList)
548 }
549 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "DelChannelList": ipsList, "Addr": groupAddr.String()})
550 return true
551 }
552 }
553 return false
554}
555
556// DelAllChannels deletes all receiver for the provided igmp device
557func (igd *IgmpGroupDevice) DelAllChannels() {
558 logger.Infow(ctx, "Deleting All Channel for Device", log.Fields{"Device": igd.Device, "Group": igd.GroupName})
559 delGroupChannels := func(key interface{}, value interface{}) bool {
560 igc := value.(*IgmpGroupChannel)
561 igd.DelIgmpGroupChannel(igc)
562 return true
563 }
564 igd.GroupChannels.Range(delGroupChannels)
565}
566
567// ProcessQuery process query received from the upstream IGMP server
568func (igd *IgmpGroupDevice) ProcessQuery(groupAddr net.IP, ver uint8) {
569 logger.Debugw(ctx, "Received Query From Server", log.Fields{"Version": ver})
570 if ver != *igd.ServVersion {
571 igd.ServVersionExpiry = time.Now().Add(time.Duration(2*igd.proxyCfg.KeepAliveInterval) * time.Second)
572 *igd.ServVersion = ver
573 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
574 if err := mvp.WriteToDb(); err != nil {
575 logger.Errorw(ctx, "Mvlan profile write to DB failed", log.Fields{"ProfileName": mvp.Name})
576 }
577 }
578 if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
579 igc.(*IgmpGroupChannel).SendReport(true)
580 return
581 }
582 logger.Infow(ctx, "No Members for Channel. Dropping Igmp Query", log.Fields{"Group": igd.GroupName, "Channel": groupAddr.String()})
583}
584
585// AddMcGroup add the new group on the device when a receiver joins the group
586func (igd *IgmpGroupDevice) AddMcGroup() {
587 if !igd.GroupInstalled {
588 group := &of.Group{}
589 group.Command = of.GroupCommandAdd
590 group.GroupID = igd.GroupID
591 group.Device = igd.Device
592 group.SetVlan = igd.PonVlan
593 group.IsPonVlanPresent = igd.IsPonVlanPresent
594
595 addbuckets := func(key interface{}, value interface{}) bool {
596 port := key.(string)
597 var portID uint32
598 if d := GetApplication().GetDevice(group.Device); d != nil {
599 GetApplication().portLock.Lock()
600 p := d.GetPort(port)
601 GetApplication().portLock.Unlock()
602 portID = p.ID
603 }
604 //ponPortID := key.(uint32)
605 if portID != 0xFF {
606 group.Buckets = append(group.Buckets, portID)
607 }
608 return true
609 }
610 igd.PortChannelMap.Range(addbuckets)
611
612 port, _ := GetApplication().GetNniPort(igd.Device)
613 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
614 igd.GroupInstalled = true
615 }
616}
617
618// ModMcGroup updates the group on the device when either a receiver leaves
619// or joins the group
620func (igd *IgmpGroupDevice) ModMcGroup() {
621 if igd.GroupInstalled {
622 group := &of.Group{}
623 group.Command = of.GroupCommandMod
624 group.GroupID = igd.GroupID
625 group.Device = igd.Device
626 group.SetVlan = igd.PonVlan
627 group.IsPonVlanPresent = igd.IsPonVlanPresent
628
629 addbuckets := func(key interface{}, value interface{}) bool {
630 port := key.(string)
631 var portID uint32
632 if d := GetApplication().GetDevice(group.Device); d != nil {
633 GetApplication().portLock.Lock()
634 p := d.GetPort(port)
635 GetApplication().portLock.Unlock()
636 portID = p.ID
637 }
638 //ponPortID := key.(uint32)
639 if portID != 0xFF {
640 group.Buckets = append(group.Buckets, portID)
641 }
642 return true
643 }
644 igd.PortChannelMap.Range(addbuckets)
645
646 port, _ := GetApplication().GetNniPort(igd.Device)
647 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
648 } else {
649 logger.Warnw(ctx, "Update Group Failed. Group not yet created", log.Fields{"Igd": igd.Device})
650 }
651}
652
653// DelMcGroup : The group is deleted when the last receiver leaves the group
654func (igd *IgmpGroupDevice) DelMcGroup(forceDelete bool) {
655
656 logger.Infow(ctx, "Delete Mc Group Request", log.Fields{"Device": igd.Device, "GroupID": igd.GroupID, "ForceFlag": forceDelete, "GroupInstalled": igd.GroupInstalled})
657 /*
658 if !forceDelete && !checkIfForceGroupRemove(igd.Device) {
659 if success := AddToPendingPool(igd.Device, igd.getKey()); success {
660 return
661 }
662 }*/
663 if igd.GroupInstalled {
664 logger.Debugw(ctx, "Deleting Group", log.Fields{"Device": igd.Device, "Id": igd.GroupID})
665 group := &of.Group{}
666 group.Command = of.GroupCommandDel
667 group.GroupID = igd.GroupID
668 group.Device = igd.Device
669 group.ForceAction = true
670
671 port, _ := GetApplication().GetNniPort(igd.Device)
672 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
673 igd.GroupInstalled = false
674 }
675}
676
677// QueryExpiry processes query expiry. Upon expiry, take stock of the situation
678// add either retain/release the group based on number of receivers left
679func (igd *IgmpGroupDevice) QueryExpiry() {
680 logger.Debugw(ctx, "Query Expiry", log.Fields{"Device": igd.Device})
681
682
683 // Delete the IGMP flow added for this port if port state is down or query count exceeded
684 handleQueryExp := func(key interface{}, value interface{}) bool {
685 igc := value.(*IgmpGroupChannel)
686 for portKey, port := range igc.CurReceivers {
687
688 if portKey == StaticPort {
689 continue
690 }
691
692 logger.Warnw(ctx, "Expired Receiver Port", log.Fields{"PortKey": portKey, "IGP": port, "GroupAddr": igc.GroupAddr,
693 "Count": port.QueryTimeoutCount})
694 state, err := cntlr.GetController().GetPortState(igc.Device, portKey)
695 logger.Debugw(ctx, "Expired Member Port State", log.Fields{"state": state})
696 ponPortID := GetApplication().GetPonPortID(igd.Device, portKey)
697 if err == nil && state == cntlr.PortStateDown {
698 igd.DelReceiver(igc.GroupAddr, portKey, nil, ponPortID)
699 }
700
701 port.QueryTimeoutCount++
702 logger.Debugw(ctx, "Expired Port TimeoutCount", log.Fields{"count": port.QueryTimeoutCount})
703 if port.QueryTimeoutCount >= (*igc.proxyCfg).KeepAliveCount {
704 logger.Errorw(ctx, "Expiry Timeout count exceeded. Trigger delete receiver", log.Fields{"PortKey": portKey,
705 "GroupAddr": igc.GroupAddr, "Count": port.QueryTimeoutCount})
706 igd.DelReceiver(igc.GroupAddr, portKey, nil, ponPortID)
707 SendQueryExpiredEventGroupSpecific(portKey, igd, igc)
708 } else {
709 _ = port.WriteToDb(igc.Mvlan, igc.GroupAddr, igc.Device)
710 }
711 }
712 return true
713 }
714 igd.GroupChannels.Range(handleQueryExp)
715}