blob: 629d92cb8c276d9e16bb068c015b0289ac43f2f5 [file] [log] [blame]
Tinoj Josephcf161be2022-07-07 19:47:47 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package application
17
18import (
19 "encoding/json"
20 "net"
21 "sync"
22 "time"
23
24 "github.com/google/gopacket/layers"
25
26 "voltha-go-controller/database"
27 "voltha-go-controller/internal/pkg/of"
28 "voltha-go-controller/internal/pkg/util"
29 "voltha-go-controller/log"
30)
31
32// IgmpGroup implements a single MCIP that may have multiple receivers
33// connected via multiple devices (OLTs). The IGMP group is stored on the
34// VOLT application.
35type IgmpGroup struct {
36 GroupID uint32
37 Mvlan of.VlanType
38 PonVlan of.VlanType
39 GroupName string
40 GroupAddr net.IP
41 Devices map[string]*IgmpGroupDevice `json:"-"`
42 PendingGroupForDevice map[string]time.Time //map [deviceId, timestamp] (ExpiryTime = leave time + 15mins)
43 Version string
44 IsPonVlanPresent bool
45 IsChannelBasedGroup bool
46 PendingPoolLock sync.RWMutex
47 IsGroupStatic bool
48 IgmpGroupLock sync.RWMutex
49}
50
51// NewIgmpGroup is constructor for an IGMP group
52func NewIgmpGroup(name string, vlan of.VlanType) *IgmpGroup {
53 ig := IgmpGroup{}
54 ig.GroupName = name
55 ig.Mvlan = vlan
56 ig.Devices = make(map[string]*IgmpGroupDevice)
57 ig.PendingGroupForDevice = make(map[string]time.Time)
58 return &ig
59}
60
61// IgmpGroupInit to initialize igmp group members
62func (ig *IgmpGroup) IgmpGroupInit(name string, gip net.IP, mvp *MvlanProfile) {
63 ig.GroupName = name
64 ig.Mvlan = mvp.Mvlan
65 ig.PonVlan = mvp.PonVlan
66 ig.IsPonVlanPresent = mvp.IsPonVlanPresent
67 ig.Devices = make(map[string]*IgmpGroupDevice)
68 ig.PendingGroupForDevice = make(map[string]time.Time)
69 ig.IsChannelBasedGroup = mvp.IsChannelBasedGroup
70 ig.IsGroupStatic = mvp.Groups[name].IsStatic
71 if ig.IsChannelBasedGroup {
72 ig.GroupAddr = gip
73 } else {
74 ig.GroupAddr = net.ParseIP("0.0.0.0")
75 }
76}
77
78// IgmpGroupReInit to re-initialize igmp group members
79func (ig *IgmpGroup) IgmpGroupReInit(name string, gip net.IP) {
80
81 logger.Infow(ctx, "Reinitialize Igmp Group", log.Fields{"GroupID": ig.GroupID, "OldName": ig.GroupName, "Name": name, "OldAddr": ig.GroupAddr.String(), "GroupAddr": gip.String()})
82
83 ig.GroupName = name
84 if ig.IsChannelBasedGroup {
85 ig.GroupAddr = gip
86 } else {
87 ig.GroupAddr = net.ParseIP("0.0.0.0")
88 }
89
90 for _, igd := range ig.Devices {
91 igd.IgmpGroupDeviceReInit(ig)
92 }
93}
94
95// updateGroupName to update group name
96func (ig *IgmpGroup) updateGroupName(newGroupName string) {
97 if !ig.IsChannelBasedGroup {
98 logger.Errorw(ctx, "Group name update not supported for GroupChannel based group", log.Fields{"Ig": ig})
99 return
100 }
101 oldKey := ig.getKey()
102 ig.GroupName = newGroupName
103 for _, igd := range ig.Devices {
104 igd.updateGroupName(newGroupName)
105 }
106 if err := ig.WriteToDb(); err != nil {
107 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
108 }
109 if !ig.IsChannelBasedGroup {
110 _ = db.DelIgmpGroup(oldKey)
111 }
112}
113
114//HandleGroupMigration - handles migration of group members between static & dynamic
115func (ig *IgmpGroup) HandleGroupMigration(deviceID string, groupAddr net.IP) {
116
117 var group *layers.IGMPv3GroupRecord
118 app := GetApplication()
119 if deviceID == "" {
120 logger.Infow(ctx, "Handle Group Migration Request for all devices", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
121 for device := range ig.Devices {
122 ig.HandleGroupMigration(device, groupAddr)
123 }
124 } else {
125 logger.Infow(ctx, "Handle Group Migration Request", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName})
126 var newIg *IgmpGroup
127 receivers := ig.DelIgmpChannel(deviceID, groupAddr)
128 if ig.NumDevicesActive() == 0 {
129 app.DelIgmpGroup(ig)
130 }
131 if newIg = app.GetIgmpGroup(ig.Mvlan, groupAddr); newIg == nil {
132 logger.Infow(ctx, "IG Group doesn't exist, creating new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
133 if newIg = app.AddIgmpGroup(app.GetMvlanProfileByTag(ig.Mvlan).Name, groupAddr, deviceID); newIg == nil {
134 logger.Errorw(ctx, "Group Creation failed during group migration", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr})
135 return
136 }
137 }
138 mvp := app.GetMvlanProfileByTag(ig.Mvlan)
139 isStaticGroup := mvp.IsStaticGroup(ig.GroupName)
140 logger.Infow(ctx, "Existing receivers for old group", log.Fields{"Receivers": receivers})
141 newIg.IgmpGroupLock.Lock()
142 for port, igp := range receivers {
143 if !isStaticGroup && port == StaticPort {
144 continue
145 }
146 group = nil
147 var reqType layers.IGMPv3GroupRecordType
148 srcAddresses := []net.IP{}
149 if igp.Version == IgmpVersion3 {
150 if igp.Exclude {
151 srcAddresses = append(srcAddresses, igp.ExcludeList...)
152 reqType = layers.IGMPIsEx
153 } else {
154 srcAddresses = append(srcAddresses, igp.IncludeList...)
155 reqType = layers.IGMPIsIn
156 }
157 group = &layers.IGMPv3GroupRecord{
158 SourceAddresses: srcAddresses,
159 Type: reqType,
160 }
161 }
162 logger.Infow(ctx, "Adding receiver to new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "newIg": newIg.GroupName, "IGP": igp})
163 ponPort := GetApplication().GetPonPortID(deviceID, port)
164 newIg.AddReceiver(deviceID, port, groupAddr, group, igp.Version, igp.CVlan, igp.Pbit, ponPort)
165 }
166 newIg.IgmpGroupLock.Unlock()
167 }
168}
169
170// AddIgmpGroupDevice add a device to the group which happens when the first receiver of the device
171// is added to the IGMP group.
172func (ig *IgmpGroup) AddIgmpGroupDevice(device string, id uint32, version uint8) *IgmpGroupDevice {
173 logger.Infow(ctx, "Adding Device to IGMP group", log.Fields{"Device": device, "GroupName": ig.GroupName})
174 igd := NewIgmpGroupDevice(device, ig, id, version)
175 ig.Devices[device] = igd
176 if err := igd.WriteToDb(); err != nil {
177 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
178 }
179 return igd
180}
181
182// DelIgmpGroupDevice delete the device from the group which happens when we receive a leave or when
183// there is not response for IGMP query from the receiver
184func (ig *IgmpGroup) DelIgmpGroupDevice(igd *IgmpGroupDevice) {
185 logger.Infow(ctx, "Deleting Device from IGMP group", log.Fields{"Device": igd.Device, "Name": ig.GroupName})
186 va := GetApplication()
187 countersToBeUpdated := false
188 if igd.NumReceivers() != 0 {
189 countersToBeUpdated = true
190 }
191 igd.DelAllChannels()
192
193 //Clear all internal maps so that the groups can be reused
194 igd.PortChannelMap.Range(func(key, value interface{}) bool {
195
196 //Update the counters only if not already updated
197 //(i.e) 1. In case of channel remove during Mvlan Update
198 if countersToBeUpdated {
199 port := key.(string)
200 channelList := value.([]net.IP)
201 ponPortID := va.GetPonPortID(igd.Device, port)
202
203 for _, channel := range channelList {
204 igd.RemoveChannelFromChannelsPerPon(port, channel, ponPortID)
205 }
206 }
207
208 igd.PortChannelMap.Delete(key)
209 return true
210 })
211 igd.PonPortChannelMap = util.NewConcurrentMap()
212
213 if mcastCfg := va.GetMcastConfig(igd.SerialNo, va.GetMvlanProfileByTag(igd.Mvlan).Name); mcastCfg != nil {
214 mcastCfg.IgmpGroupDevices.Delete(igd.GroupID)
215 logger.Debugw(ctx, "Igd deleted from mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
216 }
217 if !igd.GroupInstalled {
218 _ = db.DelIgmpDevice(igd.Mvlan, ig.GroupName, ig.GroupAddr, igd.Device)
219 delete(ig.Devices, igd.Device)
220 }
221}
222
223// AddReceiver delete the device from the group which happens when we receive a leave or when
224// there is not response for IGMP query from the receiver
225func (ig *IgmpGroup) AddReceiver(device string, port string, groupIP net.IP,
226 group *layers.IGMPv3GroupRecord, ver uint8, cvlan uint16, pbit uint8, ponPort uint32) {
227
228 logger.Debugw(ctx, "Adding Receiver", log.Fields{"Port": port})
229 if igd, ok := ig.getIgmpGroupDevice(device); !ok {
230 igd = ig.AddIgmpGroupDevice(device, ig.GroupID, ver)
231 igd.AddReceiver(port, groupIP, group, ver, cvlan, pbit, ponPort)
232 } else {
233 logger.Infow(ctx, "IGMP Group Receiver", log.Fields{"IGD": igd.Device})
234 igd.AddReceiver(port, groupIP, group, ver, cvlan, pbit, ponPort)
235 }
236}
237
238func (ig *IgmpGroup) getIgmpGroupDevice(device string) (*IgmpGroupDevice, bool) {
239 ig.PendingPoolLock.Lock()
240 defer ig.PendingPoolLock.Unlock()
241
242 if _, ok := ig.PendingGroupForDevice[device]; ok {
243 logger.Infow(ctx, "Removing the IgmpGroupDevice from pending pool", log.Fields{"GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
244 delete(ig.PendingGroupForDevice, device)
245 if err := ig.WriteToDb(); err != nil {
246 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
247 }
248 }
249 igd, ok := ig.Devices[device]
250 return igd, ok
251}
252
253// DelReceiveronDownInd deletes a receiver which is the combination of device (OLT)
254// and port on Port Down event
255func (ig *IgmpGroup) DelReceiveronDownInd(device string, port string, ponPortID uint32) {
256 logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port})
257
258 mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
259 mvp.mvpLock.RLock()
260 defer mvp.mvpLock.RUnlock()
261 igd, ok := ig.Devices[device]
262 if !ok {
263 logger.Infow(ctx, "IGMP Group device was not found for ", log.Fields{"Device": device})
264 return
265 }
266 ipsList := []net.IP{}
267 ipsListIntf, ok := igd.PortChannelMap.Load(port)
268 if ok {
269 ipsList = append(ipsList, ipsListIntf.([]net.IP)...)
270 }
271 logger.Infow(ctx, "Port Channel List", log.Fields{"Port": port, "IPsList": ipsList})
272 igd.PortChannelMap.Range(printPortChannel)
273
274
275 for _, groupAddr := range ipsList {
276 logger.Debugw(ctx, "Port Channels", log.Fields{"Port": port, "IPsList": ipsList, "GroupAddr": groupAddr, "Len": len(ipsList)})
277 igd.DelReceiver(groupAddr, port, nil, ponPortID)
278 }
279
280 if igd.NumReceivers() == 0 {
281 ig.DelIgmpGroupDevice(igd)
282 }
283}
284
285// DelReceiver deletes a receiver which is the combination of device (OLT)
286// and port
287func (ig *IgmpGroup) DelReceiver(device string, port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
288 logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port, "GroupIP": groupAddr.String()})
289 if igd, ok := ig.Devices[device]; ok {
290 //igd.DelReceiverForGroupAddr(groupAddr, port)
291 igd.DelReceiver(groupAddr, port, group, ponPortID)
292 if igd.NumReceivers() == 0 {
293 ig.DelIgmpGroupDevice(igd)
294 }
295 }
296}
297
298// GetAllIgmpChannelForDevice - Returns all channels with active members associated to the Igmp Group for the given device
299func (ig *IgmpGroup) GetAllIgmpChannelForDevice(deviceID string) map[string]string {
300
301 if deviceID == "" {
302 return ig.GetAllIgmpChannel()
303 }
304
305 allChannels := make(map[string]string)
306 igd := ig.Devices[deviceID]
307 getAllChannels := func(key interface{}, value interface{}) bool {
308 channels := key.(string)
309 allChannels[channels] = channels //same value as only key is required
310 return true
311 }
312 igd.GroupChannels.Range(getAllChannels)
313
314 return allChannels
315}
316
317// GetAllIgmpChannel - Returns all channels with active members associated to the Igmp Group
318func (ig *IgmpGroup) GetAllIgmpChannel() map[string]string {
319 allChannels := make(map[string]string)
320 for _, igd := range ig.Devices {
321 getAllChannels := func(key interface{}, value interface{}) bool {
322 channels := key.(string)
323 allChannels[channels] = channels
324 return true
325 }
326 igd.GroupChannels.Range(getAllChannels)
327 }
328 return allChannels
329}
330
331// DelIgmpChannel deletes all receivers for the provided igmp group channel for the given device
332func (ig *IgmpGroup) DelIgmpChannel(deviceID string, groupAddr net.IP) map[string]*IgmpGroupPort {
333 logger.Infow(ctx, "Deleting Channel from devices", log.Fields{"Device": deviceID, "Group": ig.GroupName, "Channel": groupAddr.String()})
334 if deviceID == "" {
335 for device := range ig.Devices {
336 ig.DelIgmpChannel(device, groupAddr)
337 }
338 return nil
339 }
340 igd := ig.Devices[deviceID]
341 receivers := igd.DelChannelReceiver(groupAddr)
342 if igd.NumReceivers() == 0 {
343 ig.DelIgmpGroupDevice(igd)
344 }
345 return receivers
346}
347
348// IsNewReceiver checks if the received port is new receiver or existing one.
349// Returns true if new receiver.
350func (ig *IgmpGroup) IsNewReceiver(device, uniPortID string, groupAddr net.IP) bool {
351 if ig == nil {
352 // IGMP group does not exists. So considering it as new receiver.
353 return true
354 }
355 logger.Debugw(ctx, "IGMP Group", log.Fields{"channel": groupAddr, "groupName": ig.GroupName}) // TODO: Remove me
356 igd, exists := ig.Devices[device]
357 if !exists || !igd.GroupInstalled {
358 // IGMP group not exists OR Group is not created in the device.
359 // So this is a new receiver.
360 logger.Debugw(ctx, "igd not exists or group is not created in device", log.Fields{"exists": exists}) // TODO: Remove me
361 return true
362 }
363 if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
364 logger.Debugw(ctx, "IGMP Channel receivers", log.Fields{"igc-receivers": igc.(*IgmpGroupChannel).CurReceivers}) // TODO: Remove me
365 _, rcvrExistCur := igc.(*IgmpGroupChannel).CurReceivers[uniPortID]
366 _, rcvrExistNew := igc.(*IgmpGroupChannel).NewReceivers[uniPortID]
367 if rcvrExistCur || rcvrExistNew {
368 // Existing receiver
369 return false
370 }
371 }
372 return true
373}
374
375// Tick for Addition of groups to an MVLAN profile
376func (ig *IgmpGroup) Tick() {
377 now := time.Now()
378 for _, igd := range ig.Devices {
379 var igdChangeCnt uint8
380
381 if _, ok := GetApplication().DevicesDisc.Load(igd.Device); !ok {
382 logger.Info(ctx, "Skipping Query and Expiry check since Device is unavailable")
383 continue
384 }
385 if now.After(igd.NextQueryTime) {
386 // Set the next query time and the query expiry time to
387 // KeepAliveInterval and MaxResp seconds after current time
388 igd.NextQueryTime = now.Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
389 igd.QueryExpiryTime = now.Add(time.Duration(igd.proxyCfg.MaxResp) * time.Second)
390 logger.Debugw(ctx, "Query Start", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
391 igdChangeCnt++
392 logger.Debugw(ctx, "Sending Query to device", log.Fields{"Device": igd.Device})
393 sendQueryForAllChannels := func(key interface{}, value interface{}) bool {
394 igc := value.(*IgmpGroupChannel)
395 //TODO - Do generic query to avoid multiple msgs
396 igc.SendQuery()
397 return true
398 }
399 igd.GroupChannels.Range(sendQueryForAllChannels)
400 }
401 if now.After(igd.QueryExpiryTime) {
402 igd.QueryExpiry()
403 // This will keep it quiet till the next query time and then
404 // it will be reset to a value after the query initiation time
405 igd.QueryExpiryTime = igd.NextQueryTime
406 logger.Debugw(ctx, "Expiry", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
407 igdChangeCnt++
408 if igd.NumReceivers() == 0 {
409 ig.DelIgmpGroupDevice(igd)
410 continue
411 }
412 }
413
414 igdChangeCnt += igd.Tick()
415
416 if igdChangeCnt > 0 {
417 if err := igd.WriteToDb(); err != nil {
418 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
419 "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
420 }
421 }
422 }
423}
424
425// QueryExpiry processes expiry of query sent to the receivers. Up on
426// expiry, process the consolidated response for each of the devices participating
427// in the MC stream. When a device has no receivers, the device is deleted
428// from the group.
429func (ig *IgmpGroup) QueryExpiry() {
430 for _, igd := range ig.Devices {
431 if _, ok := GetApplication().DevicesDisc.Load(igd.Device); ok {
432 igd.QueryExpiry()
433 if igd.NumReceivers() == 0 {
434 ig.DelIgmpGroupDevice(igd)
435 }
436
437 } else {
438 logger.Info(ctx, "Skipping Expiry since Device is unavailable")
439 }
440 }
441}
442
443// Hash : The IGMP group hash is used to distribute the processing of timers so that
444// the processing is spread across doesn't spike at one instant. This also
445// ensures that there is sufficient responsiveness to other requests happening
446// simultaneously.
447func (ig *IgmpGroup) Hash() uint16 {
448 mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
449
450 if mvp == nil {
451 return 0
452 }
453
454 mvp.mvpLock.RLock()
455 defer mvp.mvpLock.RUnlock()
456 group := mvp.Groups[ig.GroupName]
457
458 //Case where mvlan update in-progress
459 if group == nil || len(group.McIPs) == 0 {
460 return 0
461 }
462 groupIP := group.McIPs[0]
463 return uint16(groupIP[2])<<8 + uint16(groupIP[3])
464}
465
466// NumDevicesAll returns the number of devices (OLT) active on the IGMP group. When
467// the last device leaves the IGMP group is removed. If this is not done,
468// the number of IGMP groups only keep increasing and can impact CPU when
469// the system runs for a very long duration
470func (ig *IgmpGroup) NumDevicesAll() int {
471 return len(ig.Devices)
472}
473
474// NumDevicesActive returns the number of devices (OLT) active on the IGMP group. When
475// the last device leaves the IGMP group is removed. If this is not done,
476// the number of IGMP groups only keep increasing and can impact CPU when
477// the system runs for a very long duration
478func (ig *IgmpGroup) NumDevicesActive() int {
479 count := 0
480 for _, igd := range ig.Devices {
481 if igd.NumReceivers() == 0 && igd.GroupInstalled {
482 continue
483 }
484 count++
485 }
486 return count
487}
488
489// NumReceivers to return receiver list
490func (ig *IgmpGroup) NumReceivers() map[string]int {
491 receiverList := make(map[string]int)
492 for device, igd := range ig.Devices {
493 receiverList[device] = igd.NumReceivers()
494 }
495 return receiverList
496}
497
498// RestoreDevices : IGMP group write to DB
499func (ig *IgmpGroup) RestoreDevices() {
500
501 ig.migrateIgmpDevices()
502 devices, _ := db.GetIgmpDevices(ig.Mvlan, ig.GroupName, ig.GroupAddr)
503 for _, device := range devices {
504 b, ok := device.Value.([]byte)
505 if !ok {
506 logger.Warn(ctx, "The value type is not []byte")
507 continue
508 }
509 if igd, err := NewIgmpGroupDeviceFromBytes(b); err == nil {
510 igd.PonPortChannelMap = util.NewConcurrentMap()
511 // Update the proxy config pointers.
512 var mcastCfg *McastConfig
513 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
514 if mcastCfg != nil {
515 mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
516 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
517 }
518
519 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
520 igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
521
522 // During vgc upgrade from old version, igd.NextQueryTime and igd.QueryExpiryTime will not be present in db.
523 // hence they are initialized with current time offset.
524 emptyTime := time.Time{}
525 if emptyTime == igd.NextQueryTime {
526 logger.Debugw(ctx, "VGC igd upgrade", log.Fields{"igd grp name": igd.GroupName})
527 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
528 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
529 if err := igd.WriteToDb(); err != nil {
530 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
531 "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
532 }
533 }
534
535 ig.Devices[igd.Device] = igd
536 if ig.IsChannelBasedGroup {
537 channel, _ := db.GetIgmpChannel(igd.Mvlan, igd.GroupName, igd.Device, igd.GroupAddr)
538 igd.RestoreChannel([]byte(channel))
539 } else {
540 igd.RestoreChannels()
541 }
542 igd.PortChannelMap.Range(printPortChannel)
543 logger.Infow(ctx, "Group Device Restored", log.Fields{"IGD": igd})
544 } else {
545 logger.Warnw(ctx, "Unable to decode device from database", log.Fields{"str": string(b)})
546 }
547 }
548}
549
550// getKey to return group key
551func (ig *IgmpGroup) getKey() string {
552 profile, ok := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
553 if ok {
554 mvp := profile.(*MvlanProfile)
555 return mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
556 }
557 return ""
558}
559
560// WriteToDb is utility to write Igmp Group Info to database
561func (ig *IgmpGroup) WriteToDb() error {
562 ig.Version = database.PresentVersionMap[database.IgmpGroupPath]
563 b, err := json.Marshal(ig)
564 if err != nil {
565 return err
566 }
567 if err1 := db.PutIgmpGroup(ig.getKey(), string(b)); err1 != nil {
568 return err1
569 }
570 return nil
571}
572
573// UpdateIgmpGroup : When the pending group is allocated to new
574func (ig *IgmpGroup) UpdateIgmpGroup(oldKey, newKey string) {
575
576 //If the group is allocated to same McastGroup, no need to update the
577 //IgmpGroups map
578 if oldKey == newKey {
579 return
580 }
581 logger.Infow(ctx, "Updating Igmp Group with new MVP Group Info", log.Fields{"OldKey": oldKey, "NewKey": newKey, "GroupID": ig.GroupID})
582
583 GetApplication().IgmpGroups.Delete(oldKey)
584 _ = db.DelIgmpGroup(oldKey)
585
586 GetApplication().IgmpGroups.Store(newKey, ig)
587 if err := ig.WriteToDb(); err != nil {
588 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
589 }
590}
591
592func (ig *IgmpGroup) removeExpiredGroupFromDevice() {
593 ig.PendingPoolLock.Lock()
594 defer ig.PendingPoolLock.Unlock()
595
596 for device, timer := range ig.PendingGroupForDevice {
597
598 // To ensure no race-condition between the expiry time and the new Join,
599 // ensure the group exists in pending pool before deletion
600 groupExistsInPendingPool := true
601
602 if !time.Now().After(timer) {
603 continue
604 }
605
606 // Check if the IgmpGroup obj has no active member across any device
607 // If Yes, then this group is part of global pending pool (IgmpPendingPool), hence if expired,
608 // Remove only the IgmpGroup obj referenced to this device from global pool also.
609 if ig.NumDevicesActive() == 0 {
610 groupExistsInPendingPool = GetApplication().RemoveGroupFromPendingPool(device, ig)
611 }
612
613 // Remove the group entry from device and remove the IgmpDev Obj
614 // from IgmpGrp Pending pool
615 if groupExistsInPendingPool {
616 ig.DeleteIgmpGroupDevice(device)
617 }
618 }
619}
620
621//DeleteIgmpGroupDevice - removes the IgmpGroupDevice obj from IgmpGroup and database
622func (ig *IgmpGroup) DeleteIgmpGroupDevice(device string) {
623
624 logger.Infow(ctx, "Deleting IgmpGroupDevice from IG Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
625
626 igd := ig.Devices[device]
627 igd.DelMcGroup(true)
628 delete(ig.Devices, device)
629 delete(ig.PendingGroupForDevice, device)
630 _ = db.DelIgmpDevice(igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
631
632 //If the group is not associated to any other device, then the entire Igmp Group obj itself can be removed
633 if ig.NumDevicesAll() == 0 {
634 logger.Infow(ctx, "Deleting IgmpGroup as all pending groups has expired", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
635 GetApplication().DelIgmpGroup(ig)
636 return
637 }
638 if err := ig.WriteToDb(); err != nil {
639 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
640 }
641}
642
643// DelIgmpGroup deletes all devices for the provided igmp group
644func (ig *IgmpGroup) DelIgmpGroup() {
645 logger.Infow(ctx, "Deleting All Device for Group", log.Fields{"Group": ig.GroupName})
646 for _, igd := range ig.Devices {
647 ig.DelIgmpGroupDevice(igd)
648 }
649 GetApplication().DelIgmpGroup(ig)
650}