blob: a7107cd2871ba4d9d4a31ba33de789572514d41a [file] [log] [blame]
/*
* Copyright 2022-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
15
16package application
17
18import (
Tinoj Joseph07cc5372022-07-18 22:53:51 +053019 "context"
Tinoj Josephcf161be2022-07-07 19:47:47 +053020 "encoding/json"
21 "net"
22 "sync"
23 "time"
24
25 "github.com/google/gopacket/layers"
26
27 "voltha-go-controller/database"
28 "voltha-go-controller/internal/pkg/of"
29 "voltha-go-controller/internal/pkg/util"
30 "voltha-go-controller/log"
31)
32
33// IgmpGroup implements a single MCIP that may have multiple receivers
34// connected via multiple devices (OLTs). The IGMP group is stored on the
35// VOLT application.
36type IgmpGroup struct {
37 GroupID uint32
38 Mvlan of.VlanType
39 PonVlan of.VlanType
40 GroupName string
41 GroupAddr net.IP
42 Devices map[string]*IgmpGroupDevice `json:"-"`
43 PendingGroupForDevice map[string]time.Time //map [deviceId, timestamp] (ExpiryTime = leave time + 15mins)
44 Version string
45 IsPonVlanPresent bool
46 IsChannelBasedGroup bool
47 PendingPoolLock sync.RWMutex
48 IsGroupStatic bool
49 IgmpGroupLock sync.RWMutex
50}
51
52// NewIgmpGroup is constructor for an IGMP group
53func NewIgmpGroup(name string, vlan of.VlanType) *IgmpGroup {
54 ig := IgmpGroup{}
55 ig.GroupName = name
56 ig.Mvlan = vlan
57 ig.Devices = make(map[string]*IgmpGroupDevice)
58 ig.PendingGroupForDevice = make(map[string]time.Time)
59 return &ig
60}
61
62// IgmpGroupInit to initialize igmp group members
63func (ig *IgmpGroup) IgmpGroupInit(name string, gip net.IP, mvp *MvlanProfile) {
64 ig.GroupName = name
65 ig.Mvlan = mvp.Mvlan
66 ig.PonVlan = mvp.PonVlan
67 ig.IsPonVlanPresent = mvp.IsPonVlanPresent
68 ig.Devices = make(map[string]*IgmpGroupDevice)
69 ig.PendingGroupForDevice = make(map[string]time.Time)
70 ig.IsChannelBasedGroup = mvp.IsChannelBasedGroup
71 ig.IsGroupStatic = mvp.Groups[name].IsStatic
72 if ig.IsChannelBasedGroup {
73 ig.GroupAddr = gip
74 } else {
75 ig.GroupAddr = net.ParseIP("0.0.0.0")
76 }
77}
78
79// IgmpGroupReInit to re-initialize igmp group members
Tinoj Joseph07cc5372022-07-18 22:53:51 +053080func (ig *IgmpGroup) IgmpGroupReInit(cntx context.Context, name string, gip net.IP) {
Tinoj Josephcf161be2022-07-07 19:47:47 +053081
82 logger.Infow(ctx, "Reinitialize Igmp Group", log.Fields{"GroupID": ig.GroupID, "OldName": ig.GroupName, "Name": name, "OldAddr": ig.GroupAddr.String(), "GroupAddr": gip.String()})
83
84 ig.GroupName = name
85 if ig.IsChannelBasedGroup {
86 ig.GroupAddr = gip
87 } else {
88 ig.GroupAddr = net.ParseIP("0.0.0.0")
89 }
90
91 for _, igd := range ig.Devices {
Tinoj Joseph07cc5372022-07-18 22:53:51 +053092 igd.IgmpGroupDeviceReInit(cntx, ig)
Tinoj Josephcf161be2022-07-07 19:47:47 +053093 }
94}
95
96// updateGroupName to update group name
Tinoj Joseph07cc5372022-07-18 22:53:51 +053097func (ig *IgmpGroup) updateGroupName(cntx context.Context, newGroupName string) {
Tinoj Josephcf161be2022-07-07 19:47:47 +053098 if !ig.IsChannelBasedGroup {
99 logger.Errorw(ctx, "Group name update not supported for GroupChannel based group", log.Fields{"Ig": ig})
100 return
101 }
102 oldKey := ig.getKey()
103 ig.GroupName = newGroupName
104 for _, igd := range ig.Devices {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530105 igd.updateGroupName(cntx, newGroupName)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530106 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530107 if err := ig.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530108 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
109 }
110 if !ig.IsChannelBasedGroup {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530111 _ = db.DelIgmpGroup(cntx, oldKey)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530112 }
113}
114
115//HandleGroupMigration - handles migration of group members between static & dynamic
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530116func (ig *IgmpGroup) HandleGroupMigration(cntx context.Context, deviceID string, groupAddr net.IP) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530117
118 var group *layers.IGMPv3GroupRecord
119 app := GetApplication()
120 if deviceID == "" {
121 logger.Infow(ctx, "Handle Group Migration Request for all devices", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
122 for device := range ig.Devices {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530123 ig.HandleGroupMigration(cntx, device, groupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530124 }
125 } else {
126 logger.Infow(ctx, "Handle Group Migration Request", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName})
127 var newIg *IgmpGroup
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530128 receivers := ig.DelIgmpChannel(cntx, deviceID, groupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530129 if ig.NumDevicesActive() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530130 app.DelIgmpGroup(cntx, ig)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530131 }
132 if newIg = app.GetIgmpGroup(ig.Mvlan, groupAddr); newIg == nil {
133 logger.Infow(ctx, "IG Group doesn't exist, creating new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530134 if newIg = app.AddIgmpGroup(cntx, app.GetMvlanProfileByTag(ig.Mvlan).Name, groupAddr, deviceID); newIg == nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530135 logger.Errorw(ctx, "Group Creation failed during group migration", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr})
136 return
137 }
138 }
139 mvp := app.GetMvlanProfileByTag(ig.Mvlan)
140 isStaticGroup := mvp.IsStaticGroup(ig.GroupName)
141 logger.Infow(ctx, "Existing receivers for old group", log.Fields{"Receivers": receivers})
142 newIg.IgmpGroupLock.Lock()
143 for port, igp := range receivers {
144 if !isStaticGroup && port == StaticPort {
145 continue
146 }
147 group = nil
148 var reqType layers.IGMPv3GroupRecordType
149 srcAddresses := []net.IP{}
150 if igp.Version == IgmpVersion3 {
151 if igp.Exclude {
152 srcAddresses = append(srcAddresses, igp.ExcludeList...)
153 reqType = layers.IGMPIsEx
154 } else {
155 srcAddresses = append(srcAddresses, igp.IncludeList...)
156 reqType = layers.IGMPIsIn
157 }
158 group = &layers.IGMPv3GroupRecord{
159 SourceAddresses: srcAddresses,
160 Type: reqType,
161 }
162 }
163 logger.Infow(ctx, "Adding receiver to new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "newIg": newIg.GroupName, "IGP": igp})
164 ponPort := GetApplication().GetPonPortID(deviceID, port)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530165 newIg.AddReceiver(cntx, deviceID, port, groupAddr, group, igp.Version, igp.CVlan, igp.Pbit, ponPort)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530166 }
167 newIg.IgmpGroupLock.Unlock()
168 }
169}
170
171// AddIgmpGroupDevice add a device to the group which happens when the first receiver of the device
172// is added to the IGMP group.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530173func (ig *IgmpGroup) AddIgmpGroupDevice(cntx context.Context, device string, id uint32, version uint8) *IgmpGroupDevice {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530174 logger.Infow(ctx, "Adding Device to IGMP group", log.Fields{"Device": device, "GroupName": ig.GroupName})
175 igd := NewIgmpGroupDevice(device, ig, id, version)
176 ig.Devices[device] = igd
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530177 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530178 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
179 }
180 return igd
181}
182
183// DelIgmpGroupDevice delete the device from the group which happens when we receive a leave or when
184// there is not response for IGMP query from the receiver
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530185func (ig *IgmpGroup) DelIgmpGroupDevice(cntx context.Context, igd *IgmpGroupDevice) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530186 logger.Infow(ctx, "Deleting Device from IGMP group", log.Fields{"Device": igd.Device, "Name": ig.GroupName})
187 va := GetApplication()
188 countersToBeUpdated := false
189 if igd.NumReceivers() != 0 {
190 countersToBeUpdated = true
191 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530192 igd.DelAllChannels(cntx)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530193
194 //Clear all internal maps so that the groups can be reused
195 igd.PortChannelMap.Range(func(key, value interface{}) bool {
196
197 //Update the counters only if not already updated
198 //(i.e) 1. In case of channel remove during Mvlan Update
199 if countersToBeUpdated {
200 port := key.(string)
201 channelList := value.([]net.IP)
202 ponPortID := va.GetPonPortID(igd.Device, port)
203
204 for _, channel := range channelList {
205 igd.RemoveChannelFromChannelsPerPon(port, channel, ponPortID)
206 }
207 }
208
209 igd.PortChannelMap.Delete(key)
210 return true
211 })
212 igd.PonPortChannelMap = util.NewConcurrentMap()
213
214 if mcastCfg := va.GetMcastConfig(igd.SerialNo, va.GetMvlanProfileByTag(igd.Mvlan).Name); mcastCfg != nil {
215 mcastCfg.IgmpGroupDevices.Delete(igd.GroupID)
216 logger.Debugw(ctx, "Igd deleted from mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
217 }
218 if !igd.GroupInstalled {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530219 _ = db.DelIgmpDevice(cntx, igd.Mvlan, ig.GroupName, ig.GroupAddr, igd.Device)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530220 delete(ig.Devices, igd.Device)
221 }
222}
223
224// AddReceiver delete the device from the group which happens when we receive a leave or when
225// there is not response for IGMP query from the receiver
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530226func (ig *IgmpGroup) AddReceiver(cntx context.Context, device string, port string, groupIP net.IP,
Tinoj Josephcf161be2022-07-07 19:47:47 +0530227 group *layers.IGMPv3GroupRecord, ver uint8, cvlan uint16, pbit uint8, ponPort uint32) {
228
229 logger.Debugw(ctx, "Adding Receiver", log.Fields{"Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530230 if igd, ok := ig.getIgmpGroupDevice(cntx, device); !ok {
231 igd = ig.AddIgmpGroupDevice(cntx, device, ig.GroupID, ver)
232 igd.AddReceiver(cntx, port, groupIP, group, ver, cvlan, pbit, ponPort)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530233 } else {
234 logger.Infow(ctx, "IGMP Group Receiver", log.Fields{"IGD": igd.Device})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530235 igd.AddReceiver(cntx, port, groupIP, group, ver, cvlan, pbit, ponPort)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530236 }
237}
238
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530239func (ig *IgmpGroup) getIgmpGroupDevice(cntx context.Context, device string) (*IgmpGroupDevice, bool) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530240 ig.PendingPoolLock.Lock()
241 defer ig.PendingPoolLock.Unlock()
242
243 if _, ok := ig.PendingGroupForDevice[device]; ok {
244 logger.Infow(ctx, "Removing the IgmpGroupDevice from pending pool", log.Fields{"GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
245 delete(ig.PendingGroupForDevice, device)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530246 if err := ig.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530247 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
248 }
249 }
250 igd, ok := ig.Devices[device]
251 return igd, ok
252}
253
254// DelReceiveronDownInd deletes a receiver which is the combination of device (OLT)
255// and port on Port Down event
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530256func (ig *IgmpGroup) DelReceiveronDownInd(cntx context.Context, device string, port string, ponPortID uint32) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530257 logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port})
258
259 mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
260 mvp.mvpLock.RLock()
261 defer mvp.mvpLock.RUnlock()
262 igd, ok := ig.Devices[device]
263 if !ok {
264 logger.Infow(ctx, "IGMP Group device was not found for ", log.Fields{"Device": device})
265 return
266 }
267 ipsList := []net.IP{}
268 ipsListIntf, ok := igd.PortChannelMap.Load(port)
269 if ok {
270 ipsList = append(ipsList, ipsListIntf.([]net.IP)...)
271 }
272 logger.Infow(ctx, "Port Channel List", log.Fields{"Port": port, "IPsList": ipsList})
273 igd.PortChannelMap.Range(printPortChannel)
274
275
276 for _, groupAddr := range ipsList {
277 logger.Debugw(ctx, "Port Channels", log.Fields{"Port": port, "IPsList": ipsList, "GroupAddr": groupAddr, "Len": len(ipsList)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530278 igd.DelReceiver(cntx, groupAddr, port, nil, ponPortID)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530279 }
280
281 if igd.NumReceivers() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530282 ig.DelIgmpGroupDevice(cntx, igd)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530283 }
284}
285
286// DelReceiver deletes a receiver which is the combination of device (OLT)
287// and port
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530288func (ig *IgmpGroup) DelReceiver(cntx context.Context, device string, port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530289 logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port, "GroupIP": groupAddr.String()})
290 if igd, ok := ig.Devices[device]; ok {
291 //igd.DelReceiverForGroupAddr(groupAddr, port)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530292 igd.DelReceiver(cntx, groupAddr, port, group, ponPortID)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530293 if igd.NumReceivers() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530294 ig.DelIgmpGroupDevice(cntx, igd)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530295 }
296 }
297}
298
299// GetAllIgmpChannelForDevice - Returns all channels with active members associated to the Igmp Group for the given device
300func (ig *IgmpGroup) GetAllIgmpChannelForDevice(deviceID string) map[string]string {
301
302 if deviceID == "" {
303 return ig.GetAllIgmpChannel()
304 }
305
306 allChannels := make(map[string]string)
307 igd := ig.Devices[deviceID]
308 getAllChannels := func(key interface{}, value interface{}) bool {
309 channels := key.(string)
310 allChannels[channels] = channels //same value as only key is required
311 return true
312 }
313 igd.GroupChannels.Range(getAllChannels)
314
315 return allChannels
316}
317
318// GetAllIgmpChannel - Returns all channels with active members associated to the Igmp Group
319func (ig *IgmpGroup) GetAllIgmpChannel() map[string]string {
320 allChannels := make(map[string]string)
321 for _, igd := range ig.Devices {
322 getAllChannels := func(key interface{}, value interface{}) bool {
323 channels := key.(string)
324 allChannels[channels] = channels
325 return true
326 }
327 igd.GroupChannels.Range(getAllChannels)
328 }
329 return allChannels
330}
331
332// DelIgmpChannel deletes all receivers for the provided igmp group channel for the given device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530333func (ig *IgmpGroup) DelIgmpChannel(cntx context.Context, deviceID string, groupAddr net.IP) map[string]*IgmpGroupPort {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530334 logger.Infow(ctx, "Deleting Channel from devices", log.Fields{"Device": deviceID, "Group": ig.GroupName, "Channel": groupAddr.String()})
335 if deviceID == "" {
336 for device := range ig.Devices {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530337 ig.DelIgmpChannel(cntx, device, groupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530338 }
339 return nil
340 }
341 igd := ig.Devices[deviceID]
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530342 receivers := igd.DelChannelReceiver(cntx, groupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530343 if igd.NumReceivers() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530344 ig.DelIgmpGroupDevice(cntx, igd)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530345 }
346 return receivers
347}
348
349// IsNewReceiver checks if the received port is new receiver or existing one.
350// Returns true if new receiver.
351func (ig *IgmpGroup) IsNewReceiver(device, uniPortID string, groupAddr net.IP) bool {
352 if ig == nil {
353 // IGMP group does not exists. So considering it as new receiver.
354 return true
355 }
356 logger.Debugw(ctx, "IGMP Group", log.Fields{"channel": groupAddr, "groupName": ig.GroupName}) // TODO: Remove me
357 igd, exists := ig.Devices[device]
358 if !exists || !igd.GroupInstalled {
359 // IGMP group not exists OR Group is not created in the device.
360 // So this is a new receiver.
361 logger.Debugw(ctx, "igd not exists or group is not created in device", log.Fields{"exists": exists}) // TODO: Remove me
362 return true
363 }
364 if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
365 logger.Debugw(ctx, "IGMP Channel receivers", log.Fields{"igc-receivers": igc.(*IgmpGroupChannel).CurReceivers}) // TODO: Remove me
366 _, rcvrExistCur := igc.(*IgmpGroupChannel).CurReceivers[uniPortID]
367 _, rcvrExistNew := igc.(*IgmpGroupChannel).NewReceivers[uniPortID]
368 if rcvrExistCur || rcvrExistNew {
369 // Existing receiver
370 return false
371 }
372 }
373 return true
374}
375
376// Tick for Addition of groups to an MVLAN profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530377func (ig *IgmpGroup) Tick(cntx context.Context) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530378 now := time.Now()
379 for _, igd := range ig.Devices {
380 var igdChangeCnt uint8
381
382 if _, ok := GetApplication().DevicesDisc.Load(igd.Device); !ok {
383 logger.Info(ctx, "Skipping Query and Expiry check since Device is unavailable")
384 continue
385 }
386 if now.After(igd.NextQueryTime) {
387 // Set the next query time and the query expiry time to
388 // KeepAliveInterval and MaxResp seconds after current time
389 igd.NextQueryTime = now.Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
390 igd.QueryExpiryTime = now.Add(time.Duration(igd.proxyCfg.MaxResp) * time.Second)
391 logger.Debugw(ctx, "Query Start", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
392 igdChangeCnt++
393 logger.Debugw(ctx, "Sending Query to device", log.Fields{"Device": igd.Device})
394 sendQueryForAllChannels := func(key interface{}, value interface{}) bool {
395 igc := value.(*IgmpGroupChannel)
396 //TODO - Do generic query to avoid multiple msgs
397 igc.SendQuery()
398 return true
399 }
400 igd.GroupChannels.Range(sendQueryForAllChannels)
401 }
402 if now.After(igd.QueryExpiryTime) {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530403 igd.QueryExpiry(cntx)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530404 // This will keep it quiet till the next query time and then
405 // it will be reset to a value after the query initiation time
406 igd.QueryExpiryTime = igd.NextQueryTime
407 logger.Debugw(ctx, "Expiry", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
408 igdChangeCnt++
409 if igd.NumReceivers() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530410 ig.DelIgmpGroupDevice(cntx, igd)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530411 continue
412 }
413 }
414
415 igdChangeCnt += igd.Tick()
416
417 if igdChangeCnt > 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530418 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530419 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
420 "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
421 }
422 }
423 }
424}
425
426// QueryExpiry processes expiry of query sent to the receivers. Up on
427// expiry, process the consolidated response for each of the devices participating
428// in the MC stream. When a device has no receivers, the device is deleted
429// from the group.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530430func (ig *IgmpGroup) QueryExpiry(cntx context.Context) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530431 for _, igd := range ig.Devices {
432 if _, ok := GetApplication().DevicesDisc.Load(igd.Device); ok {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530433 igd.QueryExpiry(cntx)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530434 if igd.NumReceivers() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530435 ig.DelIgmpGroupDevice(cntx, igd)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530436 }
437
438 } else {
439 logger.Info(ctx, "Skipping Expiry since Device is unavailable")
440 }
441 }
442}
443
444// Hash : The IGMP group hash is used to distribute the processing of timers so that
445// the processing is spread across doesn't spike at one instant. This also
446// ensures that there is sufficient responsiveness to other requests happening
447// simultaneously.
448func (ig *IgmpGroup) Hash() uint16 {
449 mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
450
451 if mvp == nil {
452 return 0
453 }
454
455 mvp.mvpLock.RLock()
456 defer mvp.mvpLock.RUnlock()
457 group := mvp.Groups[ig.GroupName]
458
459 //Case where mvlan update in-progress
460 if group == nil || len(group.McIPs) == 0 {
461 return 0
462 }
463 groupIP := group.McIPs[0]
464 return uint16(groupIP[2])<<8 + uint16(groupIP[3])
465}
466
467// NumDevicesAll returns the number of devices (OLT) active on the IGMP group. When
468// the last device leaves the IGMP group is removed. If this is not done,
469// the number of IGMP groups only keep increasing and can impact CPU when
470// the system runs for a very long duration
471func (ig *IgmpGroup) NumDevicesAll() int {
472 return len(ig.Devices)
473}
474
475// NumDevicesActive returns the number of devices (OLT) active on the IGMP group. When
476// the last device leaves the IGMP group is removed. If this is not done,
477// the number of IGMP groups only keep increasing and can impact CPU when
478// the system runs for a very long duration
479func (ig *IgmpGroup) NumDevicesActive() int {
480 count := 0
481 for _, igd := range ig.Devices {
482 if igd.NumReceivers() == 0 && igd.GroupInstalled {
483 continue
484 }
485 count++
486 }
487 return count
488}
489
490// NumReceivers to return receiver list
491func (ig *IgmpGroup) NumReceivers() map[string]int {
492 receiverList := make(map[string]int)
493 for device, igd := range ig.Devices {
494 receiverList[device] = igd.NumReceivers()
495 }
496 return receiverList
497}
498
499// RestoreDevices : IGMP group write to DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530500func (ig *IgmpGroup) RestoreDevices(cntx context.Context) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530501
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530502 ig.migrateIgmpDevices(cntx)
503 devices, _ := db.GetIgmpDevices(cntx, ig.Mvlan, ig.GroupName, ig.GroupAddr)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530504 for _, device := range devices {
505 b, ok := device.Value.([]byte)
506 if !ok {
507 logger.Warn(ctx, "The value type is not []byte")
508 continue
509 }
510 if igd, err := NewIgmpGroupDeviceFromBytes(b); err == nil {
511 igd.PonPortChannelMap = util.NewConcurrentMap()
512 // Update the proxy config pointers.
513 var mcastCfg *McastConfig
514 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
515 if mcastCfg != nil {
516 mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
517 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
518 }
519
520 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
521 igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
522
523 // During vgc upgrade from old version, igd.NextQueryTime and igd.QueryExpiryTime will not be present in db.
524 // hence they are initialized with current time offset.
525 emptyTime := time.Time{}
526 if emptyTime == igd.NextQueryTime {
527 logger.Debugw(ctx, "VGC igd upgrade", log.Fields{"igd grp name": igd.GroupName})
528 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
529 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530530 if err := igd.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530531 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
532 "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
533 }
534 }
535
536 ig.Devices[igd.Device] = igd
537 if ig.IsChannelBasedGroup {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530538 channel, _ := db.GetIgmpChannel(cntx, igd.Mvlan, igd.GroupName, igd.Device, igd.GroupAddr)
539 igd.RestoreChannel(cntx, []byte(channel))
Tinoj Josephcf161be2022-07-07 19:47:47 +0530540 } else {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530541 igd.RestoreChannels(cntx)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530542 }
543 igd.PortChannelMap.Range(printPortChannel)
544 logger.Infow(ctx, "Group Device Restored", log.Fields{"IGD": igd})
545 } else {
546 logger.Warnw(ctx, "Unable to decode device from database", log.Fields{"str": string(b)})
547 }
548 }
549}
550
551// getKey to return group key
552func (ig *IgmpGroup) getKey() string {
553 profile, ok := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
554 if ok {
555 mvp := profile.(*MvlanProfile)
556 return mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
557 }
558 return ""
559}
560
561// WriteToDb is utility to write Igmp Group Info to database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530562func (ig *IgmpGroup) WriteToDb(cntx context.Context) error {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530563 ig.Version = database.PresentVersionMap[database.IgmpGroupPath]
564 b, err := json.Marshal(ig)
565 if err != nil {
566 return err
567 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530568 if err1 := db.PutIgmpGroup(cntx, ig.getKey(), string(b)); err1 != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530569 return err1
570 }
571 return nil
572}
573
574// UpdateIgmpGroup : When the pending group is allocated to new
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530575func (ig *IgmpGroup) UpdateIgmpGroup(cntx context.Context, oldKey, newKey string) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530576
577 //If the group is allocated to same McastGroup, no need to update the
578 //IgmpGroups map
579 if oldKey == newKey {
580 return
581 }
582 logger.Infow(ctx, "Updating Igmp Group with new MVP Group Info", log.Fields{"OldKey": oldKey, "NewKey": newKey, "GroupID": ig.GroupID})
583
584 GetApplication().IgmpGroups.Delete(oldKey)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530585 _ = db.DelIgmpGroup(cntx, oldKey)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530586
587 GetApplication().IgmpGroups.Store(newKey, ig)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530588 if err := ig.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530589 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
590 }
591}
592
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530593func (ig *IgmpGroup) removeExpiredGroupFromDevice(cntx context.Context) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530594 ig.PendingPoolLock.Lock()
595 defer ig.PendingPoolLock.Unlock()
596
597 for device, timer := range ig.PendingGroupForDevice {
598
599 // To ensure no race-condition between the expiry time and the new Join,
600 // ensure the group exists in pending pool before deletion
601 groupExistsInPendingPool := true
602
603 if !time.Now().After(timer) {
604 continue
605 }
606
607 // Check if the IgmpGroup obj has no active member across any device
608 // If Yes, then this group is part of global pending pool (IgmpPendingPool), hence if expired,
609 // Remove only the IgmpGroup obj referenced to this device from global pool also.
610 if ig.NumDevicesActive() == 0 {
611 groupExistsInPendingPool = GetApplication().RemoveGroupFromPendingPool(device, ig)
612 }
613
614 // Remove the group entry from device and remove the IgmpDev Obj
615 // from IgmpGrp Pending pool
616 if groupExistsInPendingPool {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530617 ig.DeleteIgmpGroupDevice(cntx, device)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530618 }
619 }
620}
621
622//DeleteIgmpGroupDevice - removes the IgmpGroupDevice obj from IgmpGroup and database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530623func (ig *IgmpGroup) DeleteIgmpGroupDevice(cntx context.Context, device string) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530624
625 logger.Infow(ctx, "Deleting IgmpGroupDevice from IG Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
626
627 igd := ig.Devices[device]
628 igd.DelMcGroup(true)
629 delete(ig.Devices, device)
630 delete(ig.PendingGroupForDevice, device)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530631 _ = db.DelIgmpDevice(cntx, igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530632
633 //If the group is not associated to any other device, then the entire Igmp Group obj itself can be removed
634 if ig.NumDevicesAll() == 0 {
635 logger.Infow(ctx, "Deleting IgmpGroup as all pending groups has expired", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530636 GetApplication().DelIgmpGroup(cntx, ig)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530637 return
638 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530639 if err := ig.WriteToDb(cntx); err != nil {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530640 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
641 }
642}
643
644// DelIgmpGroup deletes all devices for the provided igmp group
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530645func (ig *IgmpGroup) DelIgmpGroup(cntx context.Context) {
Tinoj Josephcf161be2022-07-07 19:47:47 +0530646 logger.Infow(ctx, "Deleting All Device for Group", log.Fields{"Group": ig.GroupName})
647 for _, igd := range ig.Devices {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530648 ig.DelIgmpGroupDevice(cntx, igd)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530649 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530650 GetApplication().DelIgmpGroup(cntx, ig)
Tinoj Josephcf161be2022-07-07 19:47:47 +0530651}