blob: 2e49a41d51a970b621a54118faa2c6077aa5c54a [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package application
17
18import (
Tinoj Joseph07cc5372022-07-18 22:53:51 +053019 "context"
Naveen Sampath04696f72022-06-13 15:19:14 +053020 "encoding/json"
21 "errors"
22 "net"
23 "reflect"
24 "voltha-go-controller/internal/pkg/types"
Naveen Sampath04696f72022-06-13 15:19:14 +053025 "strings"
26 "sync"
27 "time"
28
29 "github.com/google/gopacket"
30 "github.com/google/gopacket/layers"
31
32 cntlr "voltha-go-controller/internal/pkg/controller"
33 "voltha-go-controller/database"
34 "voltha-go-controller/internal/pkg/of"
Tinoj Joseph1d108322022-07-13 10:07:39 +053035 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053036)
37
// IGMP protocol versions, timing bounds and group/port classification
// values used by the IGMP proxy.
const (
	// IgmpVersion0 is the default (uninitialised) IGMP version.
	IgmpVersion0 uint8 = 0
	// IgmpVersion1 identifies IGMP version 1.
	IgmpVersion1 uint8 = 1
	// IgmpVersion2 identifies IGMP version 2.
	IgmpVersion2 uint8 = 2
	// IgmpVersion3 identifies IGMP version 3.
	IgmpVersion3 uint8 = 3
	// MinKeepAliveInterval is the minimum keep-alive interval.
	// NOTE(review): unit is not visible here (presumably seconds) - confirm.
	MinKeepAliveInterval uint32 = 10
	// MaxDiffKAIntervalResp is the maximum allowed difference between the
	// keep-alive interval and the response time.
	// NOTE(review): unit is not visible here - confirm.
	MaxDiffKAIntervalResp uint32 = 5
	// StaticGroup is the label for a static group.
	StaticGroup string = "static"
	// DynamicGroup is the label for a dynamic group.
	DynamicGroup string = "dynamic"
	// StaticPort is the pseudo port name used for static receivers.
	StaticPort string = "static_port"
	// DefaultIgmpProfID is the ID of the default IGMP profile.
	DefaultIgmpProfID = ""
	// GroupExpiryTime is the group expiry time in minutes.
	GroupExpiryTime uint32 = 15
)
62
// Event names, reasons and descriptions reported for IGMP join failures and
// query expiry.
const (
	// JoinUnsuccessful is the event name for a failed IGMP join.
	JoinUnsuccessful string = "JOIN-UNSUCCESSFUL"
	// JoinUnsuccessfulExceededIGMPChanel is the reason used when the
	// subscriber or PON port channel threshold is exceeded.
	JoinUnsuccessfulExceededIGMPChanel string = "Exceeded subscriber or PON port IGMP channels threshold"
	// JoinUnsuccessfulAddFlowGroupFailed is the reason used when adding a
	// flow or group for a channel fails.
	JoinUnsuccessfulAddFlowGroupFailed string = "Failed to add flow or group for a channel"
	// JoinUnsuccessfulGroupNotConfigured is the reason used when a join
	// arrives for a group that is not configured.
	JoinUnsuccessfulGroupNotConfigured string = "Join received from a subscriber on non-configured group"
	// JoinUnsuccessfulVlanDisabled is the reason used when the VLAN is disabled.
	JoinUnsuccessfulVlanDisabled string = "Vlan is disabled"
	// JoinUnsuccessfulDescription is the description of a failed join event.
	JoinUnsuccessfulDescription string = "igmp join unsuccessful"
	// QueryExpired is the event name for an expired IGMP query.
	QueryExpired string = "QUERY-EXPIRED"
	// QueryExpiredGroupSpecific is the reason used when a group specific
	// query expires.
	QueryExpiredGroupSpecific string = "Group specific multicast query expired"
	// QueryExpiredDescription is the description of a query expired event.
	QueryExpiredDescription string = "igmp query expired"
)
83
Naveen Sampath04696f72022-06-13 15:19:14 +053084// McastConfig structure
85type McastConfig struct {
86 OltSerialNum string
87 MvlanProfileID string
88 IgmpProfileID string
89 IgmpProxyIP net.IP
90 OperState OperInProgress
91 Version string
92 // This map will help in updating the igds whenever there is a igmp profile id update
93 IgmpGroupDevices sync.Map `json:"-"` // Key is group id
94}
95
var (
	// NullIPAddr is the all-zero IPv4 address (0.0.0.0).
	NullIPAddr = net.IPv4(0, 0, 0, 0)
	// AllSystemsMulticastGroupIP is the all-systems multicast group (224.0.0.1).
	AllSystemsMulticastGroupIP = net.IPv4(224, 0, 0, 1)
	// igmpSrcMac is the source MAC address used by the IGMP proxy.
	igmpSrcMac string
)
104
105func init() {
106 RegisterPacketHandler(IGMP, ProcessIgmpPacket)
107}
108
109// ProcessIgmpPacket : CallBack function registered with application to handle IGMP packetIn
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530110func ProcessIgmpPacket(cntx context.Context, device string, port string, pkt gopacket.Packet) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530111 GetApplication().IgmpPacketInd(device, port, pkt)
112}
113
114func ipv4ToUint(ip net.IP) uint32 {
115 result := uint32(0)
116 addr := ip.To4()
117 if addr == nil {
118 logger.Warnw(ctx, "Invalid Group Addr", log.Fields{"IP": ip})
119 return 0
120 }
121 result = result + uint32(addr[0])<<24
122 result = result + uint32(addr[1])<<16
123 result = result + uint32(addr[2])<<8
124 result = result + uint32(addr[3])
125 return result
126}
127
128func getPodMacAddr() (string, error) {
129 ifas, err := net.Interfaces()
130 if err != nil {
131 return "", err
132 }
133 var ipv4Addr net.IP
134 for _, ifa := range ifas {
135 addrs, err := ifa.Addrs()
136 if err != nil {
137 return "", err
138 }
139 for _, addr := range addrs {
140 if ipv4Addr = addr.(*net.IPNet).IP.To4(); ipv4Addr != nil {
141 if ipv4Addr.IsGlobalUnicast() {
142 logger.Infow(ctx, "Igmp Static config", log.Fields{"MacAddr": ifa.HardwareAddr.String(), "ipAddr": ipv4Addr})
143 return ifa.HardwareAddr.String(), nil
144 }
145 }
146 }
147
148 }
149 return "", errors.New("MAC Address not found,Setting default")
150}
151
152// IgmpUsEthLayer : Layers defined for upstream communication
153// Ethernet layer for upstream communication
154func IgmpUsEthLayer(mcip net.IP) *layers.Ethernet {
155 eth := &layers.Ethernet{}
156 // TODO: Set the source MAC properly and remove hardcoding
157 eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac)
158 eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00")
159 eth.DstMAC[3] = mcip[1] & 0x7f
160 eth.DstMAC[4] = mcip[2]
161 eth.DstMAC[5] = mcip[3]
162 eth.EthernetType = layers.EthernetTypeDot1Q
163 return eth
164}
165
166// IgmpUsDot1qLayer set US VLAN layer
167func IgmpUsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q {
168 dot1q := &layers.Dot1Q{}
169 dot1q.Priority = priority
170 dot1q.DropEligible = false
171 dot1q.VLANIdentifier = uint16(vlan)
172 dot1q.Type = layers.EthernetTypeIPv4
173 return dot1q
174}
175
176// Igmpv2UsIpv4Layer : Set the IP layer for IGMPv2
177// TODO - Identify correct way of obtaining source IP
178// This should be the configured IGMP proxy address which should be per OLT
179// We should probably be able to have a single function for both
180// upstream and downstream
181func Igmpv2UsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 {
182 ip := &layers.IPv4{}
183 ip.Version = 4
184 ip.Protocol = layers.IPProtocolIGMP
185 ip.TTL = 1
186 ip.SrcIP = src
187 ip.DstIP = mcip
188 return ip
189}
190
191// Igmpv3UsIpv4Layer : Set the IP layer for IGMPv3
192// TODO - Identify correct way of obtaining source IP
193// This should be the configured IGMP proxy address which should be per OLT
194// We should probably be able to have a single function for both
195// upstream and downstream
196func Igmpv3UsIpv4Layer(src net.IP) *layers.IPv4 {
197 ip := &layers.IPv4{}
198 ip.Version = 4
199 ip.Protocol = layers.IPProtocolIGMP
200 ip.TTL = 1
201 ip.SrcIP = src
202 ip.DstIP = net.ParseIP("224.0.0.22")
203 return ip
204}
205
206// IgmpDsEthLayer : Layers defined for downstream communication
207// Ethernet layer for downstream communication
208func IgmpDsEthLayer(mcip net.IP) *layers.Ethernet {
209 eth := &layers.Ethernet{}
210 // TODO: Set the source and dest MAC properly and remove hardcoding
211 eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac)
212 eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00")
213 eth.DstMAC[3] = mcip[1] & 0x7f
214 eth.DstMAC[4] = mcip[2]
215 eth.DstMAC[5] = mcip[3]
216 eth.EthernetType = layers.EthernetTypeDot1Q
217 return eth
218}
219
220// IgmpDsDot1qLayer set the DS VLAN layer
221func IgmpDsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q {
222 dot1q := &layers.Dot1Q{}
223 dot1q.Priority = priority
224 dot1q.DropEligible = false
225 dot1q.VLANIdentifier = uint16(vlan)
226 dot1q.Type = layers.EthernetTypeIPv4
227 return dot1q
228}
229
230// IgmpDsIpv4Layer set the IP layer
231func IgmpDsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 {
232 ip := &layers.IPv4{}
233 ip.Version = 4
234 ip.Protocol = layers.IPProtocolIGMP
235 ip.TTL = 1
236 ip.SrcIP = src
237 if mcip.Equal(net.ParseIP("0.0.0.0")) {
238 mcip = net.ParseIP("224.0.0.1")
239 }
240 ip.DstIP = mcip
241 return ip
242}
243
244// IgmpQueryv2Layer : IGMP Query Layer
245func IgmpQueryv2Layer(mcip net.IP, resptime time.Duration) *layers.IGMPv1or2 {
246 igmp := &layers.IGMPv1or2{}
247 igmp.Type = layers.IGMPMembershipQuery
248 igmp.GroupAddress = mcip
249 igmp.MaxResponseTime = resptime
250 return igmp
251}
252
253// IgmpQueryv3Layer : IGMP v3 Query Layer
254func IgmpQueryv3Layer(mcip net.IP, resptime time.Duration) *layers.IGMP {
255 igmp := &layers.IGMP{}
256 igmp.Type = layers.IGMPMembershipQuery
257 igmp.GroupAddress = mcip
258 igmp.MaxResponseTime = resptime
259 return igmp
260}
261
262// IgmpReportv2Layer : IGMP Layer
263func IgmpReportv2Layer(mcip net.IP) *layers.IGMPv1or2 {
264 igmp := &layers.IGMPv1or2{}
265 igmp.Type = layers.IGMPMembershipReportV2
266 igmp.GroupAddress = mcip
267 return igmp
268}
269
270// IgmpLeavev2Layer : IGMP Leave Layer
271func IgmpLeavev2Layer(mcip net.IP) *layers.IGMPv1or2 {
272 igmp := &layers.IGMPv1or2{}
273 igmp.Type = layers.IGMPLeaveGroup
274 igmp.GroupAddress = mcip
275 return igmp
276}
277
278// IgmpReportv3Layer : IGMP v3 Report Layer
279func IgmpReportv3Layer(mcip net.IP, incl bool, srclist []net.IP) *layers.IGMP {
280 // IGMP base
281 igmp := &layers.IGMP{}
282 igmp.Type = layers.IGMPMembershipReportV3
283 igmp.NumberOfGroupRecords = 1
284
285 // IGMP Group
286 group := layers.IGMPv3GroupRecord{}
287 if incl {
288 group.Type = layers.IGMPIsIn
289 } else {
290 group.Type = layers.IGMPIsEx
291 }
292 group.MulticastAddress = mcip
293 group.NumberOfSources = uint16(len(srclist))
294 group.SourceAddresses = srclist
295 igmp.GroupRecords = append(igmp.GroupRecords, group)
296
297 return igmp
298}
299
300// Igmpv2QueryPacket : IGMP Query in Downstream
301func Igmpv2QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) {
302 // Construct the layers that form the packet
303 eth := IgmpDsEthLayer(mcip)
304 dot1q := IgmpDsDot1qLayer(vlan, pbit)
305 ip := IgmpDsIpv4Layer(selfip, mcip)
306 igmp := IgmpQueryv2Layer(mcip, time.Duration(maxResp)*time.Second)
307
308 // Now prepare the buffer into which the layers are to be serialized
309 buff := gopacket.NewSerializeBuffer()
310 opts := gopacket.SerializeOptions{
311 FixLengths: true,
312 ComputeChecksums: true,
313 }
314 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
315 logger.Error(ctx, "Error in serializing layers")
316 return nil, err
317 }
318 return buff.Bytes(), nil
319}
320
321// Igmpv3QueryPacket : IGMPv3 Query in Downstream
322func Igmpv3QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) {
323 // Construct the layers that form the packet
324 eth := IgmpDsEthLayer(mcip)
325 dot1q := IgmpDsDot1qLayer(vlan, pbit)
326 ip := IgmpDsIpv4Layer(selfip, mcip)
327 igmp := IgmpQueryv3Layer(mcip, time.Duration(maxResp)*time.Second)
328
329 // Now prepare the buffer into which the layers are to be serialized
330 buff := gopacket.NewSerializeBuffer()
331 opts := gopacket.SerializeOptions{
332 FixLengths: true,
333 ComputeChecksums: true,
334 }
335 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
336 logger.Error(ctx, "Error in serializing layers")
337 return nil, err
338 }
339 return buff.Bytes(), nil
340}
341
342// IgmpReportv2Packet : Packet - IGMP v2 report in upstream
343func IgmpReportv2Packet(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) {
344 // Construct the layers that form the packet
345 eth := IgmpUsEthLayer(mcip)
346 dot1q := IgmpUsDot1qLayer(vlan, priority)
347 ip := Igmpv2UsIpv4Layer(selfip, mcip)
348 igmp := IgmpReportv2Layer(mcip)
349
350 // Now prepare the buffer into which the layers are to be serialized
351 buff := gopacket.NewSerializeBuffer()
352 opts := gopacket.SerializeOptions{
353 FixLengths: true,
354 ComputeChecksums: true,
355 }
356 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
357 logger.Error(ctx, "Error in serializing layers")
358 return nil, err
359 }
360 return buff.Bytes(), nil
361}
362
363// Igmpv3ReportPacket : Packet - IGMP v3 report in upstream
364func Igmpv3ReportPacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP, incl bool, srclist []net.IP) ([]byte, error) {
365 // Construct the layers that form the packet
366 eth := IgmpUsEthLayer(net.ParseIP("224.0.0.22").To4())
367 dot1q := IgmpUsDot1qLayer(vlan, priority)
368 ip := Igmpv3UsIpv4Layer(selfip)
369 igmp := IgmpReportv3Layer(mcip, incl, srclist)
370
371 // Now prepare the buffer into which the layers are to be serialized
372 buff := gopacket.NewSerializeBuffer()
373 opts := gopacket.SerializeOptions{
374 FixLengths: true,
375 ComputeChecksums: true,
376 }
377 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
378 logger.Error(ctx, "Error in serializing layers")
379 return nil, err
380 }
381 return buff.Bytes(), nil
382}
383
384// IgmpLeavePacket : Packet- IGMP Leave in upstream
385func IgmpLeavePacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) {
386 // Construct the layers that form the packet
387 eth := IgmpUsEthLayer(mcip)
388 dot1q := IgmpUsDot1qLayer(vlan, priority)
389 ip := Igmpv2UsIpv4Layer(selfip, mcip)
390 igmp := IgmpLeavev2Layer(mcip)
391
392 // Now prepare the buffer into which the layers are to be serialized
393 buff := gopacket.NewSerializeBuffer()
394 opts := gopacket.SerializeOptions{
395 FixLengths: true,
396 ComputeChecksums: true,
397 }
398 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
399 logger.Error(ctx, "Error in serializing layers")
400 return nil, err
401 }
402 return buff.Bytes(), nil
403}
404
405// getVersion to get igmp version type
406func getVersion(ver string) uint8 {
407 if ver == "2" || ver == "v2" {
408 return IgmpVersion2
409 }
410 return IgmpVersion3
411}
412
// IsIPPresent reports whether i equals (per net.IP.Equal) any address in ips.
func IsIPPresent(i net.IP, ips []net.IP) bool {
	for idx := 0; idx < len(ips); idx++ {
		if !i.Equal(ips[idx]) {
			continue
		}
		return true
	}
	return false
}
422
Naveen Sampath04696f72022-06-13 15:19:14 +0530423//AddToPendingPool - adds Igmp Device obj to pending pool
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530424func AddToPendingPool(cntx context.Context, device string, groupKey string) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530425
426 logger.Infow(ctx, "Add Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupKey": groupKey})
427 if grp, ok := GetApplication().IgmpGroups.Load(groupKey); ok {
428 ig := grp.(*IgmpGroup)
429 ig.PendingPoolLock.Lock()
430 logger.Infow(ctx, "Adding Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
431 ig.PendingGroupForDevice[device] = time.Now().Add(time.Duration(GroupExpiryTime) * time.Minute)
432 ig.PendingPoolLock.Unlock()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530433 if err := ig.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530434 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
435 }
436 return true
437 }
438 return false
439}
440
441/*
442func checkIfForceGroupRemove(device string) bool {
443 if d := GetApplication().GetDevice(device); d != nil {
444 if d.State == cntlr.DeviceStateREBOOTED || d.State == cntlr.DeviceStateDOWN {
445 return true
446 }
447 }
448 return false
449}*/
450
Naveen Sampath04696f72022-06-13 15:19:14 +0530451// SendQueryExpiredEventGroupSpecific to send group specific query expired event.
452func SendQueryExpiredEventGroupSpecific(portKey string, igd *IgmpGroupDevice, igc *IgmpGroupChannel) {
453
454 logger.Info(ctx, "Processing-SendQueryExpiredEventGroupSpecific-Event")
455 va := GetApplication()
456 mvpName := va.GetMvlanProfileByTag(igd.Mvlan).Name
457
458 sendEvent := func(key interface{}, value interface{}) bool {
459 if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName {
460 logger.Debugw(ctx, "sending-query-expired-group-specific-event", log.Fields{"EventType": QueryExpiredGroupSpecific, "ServiceName": value.(*VoltService).Name})
461 }
462 return false
463 }
464
465 // Fetching service name to send with query expired event.
466 vpvs, _ := va.VnetsByPort.Load(portKey)
467 if vpvs == nil {
468 logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs})
469 return
470 }
471
472 for _, vpv := range vpvs.([]*VoltPortVnet) {
473 vpv.services.Range(sendEvent)
474 }
475}
476
477// GetMcastServiceForSubAlarm to get mcast service name for subscriber alarm.
478func GetMcastServiceForSubAlarm(uniPort *VoltPort, mvp *MvlanProfile) string {
479
480 var serviceName string
481 mvpName := mvp.Name
482
483 va := GetApplication()
484
485 sendAlm := func(key interface{}, value interface{}) bool {
486 if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName {
487 serviceName = value.(*VoltService).Name
488 }
489 return true
490 }
491
492 // Fetching service name to send with active channels exceeded per subscriber alarm.
493 vpvs, _ := va.VnetsByPort.Load(uniPort.Name)
494 if vpvs == nil {
495 logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs})
496 return serviceName
497 }
498
499 for _, vpv := range vpvs.([]*VoltPortVnet) {
500 vpv.services.Range(sendAlm)
501 }
502
503 return serviceName
504
505}
506
Naveen Sampath04696f72022-06-13 15:19:14 +0530507// RestoreIgmpGroupsFromDb to restore igmp groups from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530508func (va *VoltApplication) RestoreIgmpGroupsFromDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530509
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530510 groups, _ := db.GetIgmpGroups(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530511 for _, group := range groups {
512 b, ok := group.Value.([]byte)
513 if !ok {
514 logger.Warn(ctx, "The value type is not []byte")
515 continue
516 }
517 var ig IgmpGroup
518 err := json.Unmarshal(b, &ig)
519 if err != nil {
520 logger.Warn(ctx, "Unmarshal of IGMP Group failed")
521 continue
522 }
523 ig.Devices = make(map[string]*IgmpGroupDevice)
524
525 //For Upgrade Case
526 if len(ig.PendingGroupForDevice) == 0 {
527 ig.PendingGroupForDevice = make(map[string]time.Time)
528 }
529 logger.Infow(ctx, "Restoring Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan})
530 grpKey := ig.getKey()
531 va.IgmpGroups.Store(grpKey, &ig)
532 // Just delete and lose the IGMP group with the same group Id
533 if _, err := va.GetIgmpGroupID(ig.GroupID); err != nil {
534 logger.Warnw(ctx, "GetIgmpGroupID Failed", log.Fields{"igGroupID": ig.GroupID, "Error": err})
535 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530536 ig.RestoreDevices(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530537
538 if ig.NumDevicesActive() == 0 {
539 va.AddGroupToPendingPool(&ig)
540 }
541 logger.Infow(ctx, "Restored Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan})
542 }
543}
544
545// AddIgmpGroup : When the first IGMP packet is received, the MVLAN profile is identified
546// for the IGMP group and grp obj is obtained from the available pending pool of groups.
547// If not, new group obj will be created based on available group IDs
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530548func (va *VoltApplication) AddIgmpGroup(cntx context.Context, mvpName string, gip net.IP, device string) *IgmpGroup {
Naveen Sampath04696f72022-06-13 15:19:14 +0530549
550 var ig *IgmpGroup
551 if mvp, grpName := va.GetMvlanProfileForMcIP(mvpName, gip); mvp != nil {
552 if ig = va.GetGroupFromPendingPool(mvp.Mvlan, device); ig != nil {
553 logger.Infow(ctx, "Igmp Group obtained from global pending pool", log.Fields{"MvlanProfile": mvpName, "GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
554 oldKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530555 ig.IgmpGroupReInit(cntx, grpName, gip)
Naveen Sampath04696f72022-06-13 15:19:14 +0530556 ig.IsGroupStatic = mvp.Groups[grpName].IsStatic
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530557 ig.UpdateIgmpGroup(cntx, oldKey, ig.getKey())
Naveen Sampath04696f72022-06-13 15:19:14 +0530558 } else {
559 logger.Infow(ctx, "No Igmp Group available in global pending pool. Creating new Igmp Group", log.Fields{"MvlanProfile": mvpName, "Device": device, "GroupAddr": gip.String()})
560 if ig = va.GetAvailIgmpGroupID(); ig == nil {
561 logger.Error(ctx, "Igmp Group Creation Failed: Group Id Unavailable")
562 return nil
563 }
564 ig.IgmpGroupInit(grpName, gip, mvp)
565 grpKey := ig.getKey()
566 va.IgmpGroups.Store(grpKey, ig)
567 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530568 if err := ig.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530569 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
570 }
571 return ig
572 }
Tinoj Joseph1d108322022-07-13 10:07:39 +0530573 logger.Errorw(ctx, "GetMvlan Pro failed", log.Fields{"Group": gip})
Naveen Sampath04696f72022-06-13 15:19:14 +0530574 return nil
575}
576
577// GetIgmpGroup helps search for the IGMP group from the list of
578// active IGMP groups. For now, the assumption is that a group
579// cannot belong to more than on MVLAN. If we change that definition,
580// we have to take a relook at this implementation. The key will include
581// both MVLAN and the group IP.
582func (va *VoltApplication) GetIgmpGroup(mvlan of.VlanType, gip net.IP) *IgmpGroup {
583
584 profile, _ := va.MvlanProfilesByTag.Load(mvlan)
585 if profile == nil {
586 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": mvlan, "GroupAddr": gip.String()})
587 return nil
588 }
589 mvp := profile.(*MvlanProfile)
590 _, gName := va.GetMvlanProfileForMcIP(mvp.Name, gip)
591 grpKey := mvp.generateGroupKey(gName, gip.String())
592 logger.Debugw(ctx, "Get IGMP Group", log.Fields{"Group": grpKey})
593 igIntf, ok := va.IgmpGroups.Load(grpKey)
594 if ok {
595 logger.Debugw(ctx, "Get IGMP Group Success", log.Fields{"Group": grpKey})
596 ig := igIntf.(*IgmpGroup)
597
598 //Case: Group was part of pending and Join came with same channel or different channel from same group
599 // (from same or different device)
600 // In that case, the same group will be allocated since the group is still part of va.IgmpGroups
601 // So, the groups needs to be removed from global pending pool
602 va.RemoveGroupDevicesFromPendingPool(ig)
603 return ig
604 }
605 return nil
606}
607
Naveen Sampath04696f72022-06-13 15:19:14 +0530608// DelIgmpGroup : When the last subscriber leaves the IGMP group across all the devices
609// the IGMP group is removed.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530610func (va *VoltApplication) DelIgmpGroup(cntx context.Context, ig *IgmpGroup) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530611
612 profile, found := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
613 if found {
614 mvp := profile.(*MvlanProfile)
615
616 grpKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
617
618 if igIntf, ok := va.IgmpGroups.Load(grpKey); ok {
619 ig := igIntf.(*IgmpGroup)
620 ig.IgmpGroupLock.Lock()
621 if ig.NumDevicesAll() == 0 {
622 logger.Debugw(ctx, "Deleting IGMP Group", log.Fields{"Group": grpKey})
623 va.PutIgmpGroupID(ig)
624 va.IgmpGroups.Delete(grpKey)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530625 _ = db.DelIgmpGroup(cntx, grpKey)
Naveen Sampath04696f72022-06-13 15:19:14 +0530626 } else {
627 logger.Infow(ctx, "Skipping IgmpGroup Device. Pending Igmp Group Devices present", log.Fields{"GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
628 va.AddGroupToPendingPool(ig)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530629 if err := ig.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530630 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
631 }
632 }
633 ig.IgmpGroupLock.Unlock()
634 }
635
636 }
637}
638
639// GetPonPortID Gets the PON port ID from uniPortID
640func (va *VoltApplication) GetPonPortID(device, uniPortID string) uint32 {
641
642 isNNI := strings.Contains(uniPortID, "nni")
643 if isNNI || uniPortID == StaticPort {
644 logger.Debugw(ctx, "Cannot get pon port from UNI port", log.Fields{"port": uniPortID})
645 return 0xFF
646 }
647 dIntf, ok := va.DevicesDisc.Load(device)
648 if !ok {
649 return 0xFF
650 }
651 d := dIntf.(*VoltDevice)
652
653 uniPort := d.GetPort(uniPortID)
654 if uniPort == nil {
655 return 0xFF
656 }
657 return GetPonPortIDFromUNIPort(uniPort.ID)
658}
659
660// AggActiveChannelsCountPerSub aggregates the active channel count for given uni port.
661// It will iterate over all the groups and store the sum of active channels in VoltPort.
662func (va *VoltApplication) AggActiveChannelsCountPerSub(device, uniPort string, port *VoltPort) {
663 var activeChannelCount uint32
664
665 collectActiveChannelCount := func(key interface{}, value interface{}) bool {
666 ig := value.(*IgmpGroup)
667 igd := ig.Devices[device]
668 if igd == nil {
669 return true
670 }
671 if portChannels, ok := igd.PortChannelMap.Load(uniPort); ok {
672 channelList := portChannels.([]net.IP)
673 activeChannelCount += uint32(len(channelList))
674 }
675 return true
676 }
677 va.IgmpGroups.Range(collectActiveChannelCount)
678
679 logger.Debugw(ctx, "AggrActiveChannelCount for Subscriber",
680 log.Fields{"UniPortID": uniPort, "count": activeChannelCount})
681
682 port.ActiveChannels = activeChannelCount
683}
684
685// AggActiveChannelsCountForPonPort Aggregates the active channel count for given pon port.
686// It will iterate over all the groups and store the sum of active channels in VoltDevice.
687func (va *VoltApplication) AggActiveChannelsCountForPonPort(device string, ponPortID uint32, port *PonPortCfg) {
688
689 var activeChannelCount uint32
690
691 collectActiveChannelCount := func(key interface{}, value interface{}) bool {
692 ig := value.(*IgmpGroup)
693 igd := ig.Devices[device]
694 if igd == nil {
695 return true
696 }
697 if ponPortChannels, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
698 activeChannelCount += ponPortChannels.(*PonPortChannels).GetActiveChannelCount()
699 }
700 return true
701 }
702 va.IgmpGroups.Range(collectActiveChannelCount)
703
704 logger.Debugw(ctx, "AggrActiveChannelCount for Pon Port",
705 log.Fields{"PonPortID": ponPortID, "count": activeChannelCount})
706
707 port.ActiveIGMPChannels = activeChannelCount
708}
709
710// UpdateActiveChannelCountForPonPort increments the global counter for active
711// channel count per pon port.
712func (va *VoltApplication) UpdateActiveChannelCountForPonPort(device, uniPortID string, ponPortID uint32, isAdd, isChannel bool, igd *IgmpGroupDevice) {
713 incrDecr := func(value uint32) uint32 {
714 if isAdd {
715 return value + 1
716 }
717 return value - 1
718 }
719 if d, exists := va.DevicesDisc.Load(device); exists {
720 voltDevice := d.(*VoltDevice)
721
722 if isChannel {
723 voltDevice.ActiveChannelCountLock.Lock()
724 // If New channel is added/deleted, then only update the ActiveChannelsPerPon
725 if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok {
726 port := value.(*PonPortCfg)
727 port.ActiveIGMPChannels = incrDecr(port.ActiveIGMPChannels)
728 voltDevice.ActiveChannelsPerPon.Store(ponPortID, port)
729 logger.Debugw(ctx, "+++ActiveChannelsPerPon", log.Fields{"count": port.ActiveIGMPChannels}) // TODO: remove me
730 }
731 voltDevice.ActiveChannelCountLock.Unlock()
732 }
733 if uPort, ok := voltDevice.Ports.Load(uniPortID); ok {
734 uniPort := uPort.(*VoltPort)
735 uniPort.ActiveChannels = incrDecr(uniPort.ActiveChannels)
736 voltDevice.Ports.Store(uniPortID, uniPort)
737 logger.Debugw(ctx, "+++ActiveChannelsPerSub", log.Fields{"count": uniPort.ActiveChannels}) // TODO: remove me
738 }
739 }
740}
741
742// IsMaxChannelsCountExceeded checks if the PON port active channel
743// capacity and subscriber level channel capacity is reached to max allowed
744// channel per pon threshold. If Exceeds, return true else return false.
745func (va *VoltApplication) IsMaxChannelsCountExceeded(device, uniPortID string,
746 ponPortID uint32, ig *IgmpGroup, channelIP net.IP, mvp *MvlanProfile) bool {
747
748 // New receiver check is required to identify the IgmpReportMsg received
749 // in response to the IGMP Query sent from VGC.
750 if newReceiver := ig.IsNewReceiver(device, uniPortID, channelIP); !newReceiver {
751 logger.Debugw(ctx, "Not a new receiver. It is a response to IGMP Query",
752 log.Fields{"port": uniPortID, "channel": channelIP})
753 return false
754 }
755
756 if vDev, exists := va.DevicesDisc.Load(device); exists {
757 voltDevice := vDev.(*VoltDevice)
758
759 // Checking subscriber active channel count with maxChannelsAllowedPerSub
760 if uniPort, present := voltDevice.Ports.Load(uniPortID); present {
761 if uniPort.(*VoltPort).ActiveChannels >= mvp.MaxActiveChannels {
762 logger.Errorw(ctx, "Max allowed channels per subscriber is exceeded",
763 log.Fields{"activeCount": uniPort.(*VoltPort).ActiveChannels, "channel": channelIP, "UNI": uniPort.(*VoltPort).Name})
764 if !(uniPort.(*VoltPort).ChannelPerSubAlarmRaised) {
765 serviceName := GetMcastServiceForSubAlarm(uniPort.(*VoltPort), mvp)
766 logger.Debugw(ctx, "Raising-SendActiveChannelPerSubscriberAlarm-Initiated", log.Fields{"ActiveChannels": uniPort.(*VoltPort).ActiveChannels, "ServiceName": serviceName})
767 uniPort.(*VoltPort).ChannelPerSubAlarmRaised = true
768 }
769 return true
770 }
771 } else {
772 logger.Errorw(ctx, "UNI port not found in VoltDevice", log.Fields{"uniPortID": uniPortID})
773 }
774 if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok {
775 ponPort := value.(*PonPortCfg)
776
777 logger.Debugw(ctx, "----Active channels count for PON port",
778 log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels,
779 "maxAllowedChannelsPerPon": ponPort.MaxActiveChannels})
780
781 if ponPort.ActiveIGMPChannels < ponPort.MaxActiveChannels {
782 // PON port active channel capacity is not yet reached to max allowed channels per pon.
783 // So allowing to add receiver.
784 return false
785 } else if ponPort.ActiveIGMPChannels >= ponPort.MaxActiveChannels && ig != nil {
786 // PON port active channel capacity is reached to max allowed channels per pon.
787 // Check if same channel is already configured on that PON port.
788 // If that channel is present, then allow AddReceiver else it will be rejected.
789 igd, isPresent := ig.Devices[device]
790 if isPresent {
791 if channelListForPonPort, _ := igd.PonPortChannelMap.Get(ponPortID); channelListForPonPort != nil {
792 if _, isExists := channelListForPonPort.(*PonPortChannels).ChannelList.Get(channelIP.String()); isExists {
793 return false
794 }
795 }
796 }
797 }
798 logger.Errorw(ctx, "Active channels count for PON port exceeded",
799 log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels, "channel": channelIP, "UNI": uniPortID})
800 } else {
801 logger.Warnw(ctx, "PON port level active channel count does not exists",
802 log.Fields{"ponPortID": ponPortID})
803 return false
804 }
805 }
806 logger.Warnw(ctx, "Max allowed channels per pon threshold is reached", log.Fields{"PonPortID": ponPortID})
807 return true
808}
809
810// ProcessIgmpv2Pkt : This is IGMPv2 packet.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530811func (va *VoltApplication) ProcessIgmpv2Pkt(cntx context.Context, device string, port string, pkt gopacket.Packet) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530812 // First get the layers of interest
813 dot1Q := pkt.Layer(layers.LayerTypeDot1Q).(*layers.Dot1Q)
814 pktVlan := of.VlanType(dot1Q.VLANIdentifier)
815 igmpv2 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMPv1or2)
816
817 ponPortID := va.GetPonPortID(device, port)
818
819 var vpv *VoltPortVnet
820
821 logger.Debugw(ctx, "Received IGMPv2 Type", log.Fields{"Type": igmpv2.Type})
822
823 if igmpv2.Type == layers.IGMPMembershipReportV2 || igmpv2.Type == layers.IGMPMembershipReportV1 {
824
825 logger.Infow(ctx, "IGMP Join received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port})
826
827 // This is a report coming from the PON. We must be able to first find the
828 // subscriber from the VLAN tag and port and verify if the IGMP proxy is
829 // enabled for the subscriber
830 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
831
832 if vpv == nil {
833 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
834 return
835 } else if !vpv.IgmpEnabled {
836 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
837 return
838 }
839
840 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
841 if mvp == nil {
842 logger.Errorw(ctx, "Igmp Packet Received for Subscriber with Missing Mvlan Profile",
843 log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName})
844 return
845 }
846 mvlan := mvp.Mvlan
847
848 mvp.mvpLock.RLock()
849 defer mvp.mvpLock.RUnlock()
850 // The subscriber is validated and now process the IGMP report
851 ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress)
852
853 if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, igmpv2.GroupAddress, mvp); yes {
854 logger.Warnw(ctx, "Dropping IGMP Join v2: Active channel threshold exceeded",
855 log.Fields{"PonPortID": ponPortID, "Addr": igmpv2.GroupAddress, "MvlanProfile": vpv.MvlanProfileName})
856 return
857 }
858 if ig != nil {
859 logger.Infow(ctx, "IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices})
860 // If the IGMP group is already created. just add the receiver
861 ig.IgmpGroupLock.Lock()
862 // Check for port state to avoid race condition where PortDown event
863 // acquired lock before packet processing
864 vd := GetApplication().GetDevice(device)
865 vp := vd.GetPort(port)
866 if vp == nil || vp.State != PortStateUp {
867 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
868 log.Fields{"Port": port})
869 ig.IgmpGroupLock.Unlock()
870 return
871 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530872 ig.AddReceiver(cntx, device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530873 ig.IgmpGroupLock.Unlock()
874 } else {
875 // Create the IGMP group and then add the receiver to the group
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530876 if ig := va.AddIgmpGroup(cntx, vpv.MvlanProfileName, igmpv2.GroupAddress, device); ig != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530877 logger.Infow(ctx, "New IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices})
878 ig.IgmpGroupLock.Lock()
879 // Check for port state to avoid race condition where PortDown event
880 // acquired lock before packet processing
881 vd := GetApplication().GetDevice(device)
882 vp := vd.GetPort(port)
883 if vp == nil || vp.State != PortStateUp {
884 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
885 log.Fields{"Port": port})
886 ig.IgmpGroupLock.Unlock()
887 return
888 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530889 ig.AddReceiver(cntx, device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530890 ig.IgmpGroupLock.Unlock()
891 } else {
892 logger.Errorw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": igmpv2.GroupAddress})
893 return
894 }
895 }
896 } else if igmpv2.Type == layers.IGMPLeaveGroup {
897 // This is a IGMP leave coming from one of the receivers. We essentially remove the
898 // the receiver.
899 logger.Infow(ctx, "IGMP Leave received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port})
900
901 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
902 if vpv == nil {
903 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
904 return
905 } else if !vpv.IgmpEnabled {
906 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
907 return
908 }
909
910 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
911 mvp.mvpLock.RLock()
912 defer mvp.mvpLock.RUnlock()
913 mvlan := mvp.Mvlan
914 // The subscriber is validated and now process the IGMP report
915 if ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress); ig != nil {
916 ig.IgmpGroupLock.Lock()
917 // Delete the receiver once the IgmpGroup is identified
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530918 ig.DelReceiver(cntx, device, port, igmpv2.GroupAddress, nil, ponPortID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530919 ig.IgmpGroupLock.Unlock()
920 if ig.NumDevicesActive() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530921 va.DelIgmpGroup(cntx, ig)
Naveen Sampath04696f72022-06-13 15:19:14 +0530922 }
923 }
924 } else {
925 // This must be a query on the NNI port. However, we dont make that assumption.
926 // Need to look for the IGMP group based on the VLAN in the packet as
927 // the MVLAN
928
929 //Check if mvlan profile exist for the incoming pkt vlan
930 profile, _ := va.MvlanProfilesByTag.Load(pktVlan)
931 if profile == nil {
932 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan})
933 return
934 }
935 mvp := profile.(*MvlanProfile)
936 mvp.mvpLock.RLock()
937 defer mvp.mvpLock.RUnlock()
938
939 if net.ParseIP("0.0.0.0").Equal(igmpv2.GroupAddress) {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530940 va.processIgmpQueries(cntx, device, pktVlan, IgmpVersion2)
Naveen Sampath04696f72022-06-13 15:19:14 +0530941 } else {
942 if ig := va.GetIgmpGroup(pktVlan, igmpv2.GroupAddress); ig != nil {
943 ig.IgmpGroupLock.Lock()
944 igd, ok := ig.Devices[device]
945 if ok {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530946 igd.ProcessQuery(cntx, igmpv2.GroupAddress, IgmpVersion2)
Naveen Sampath04696f72022-06-13 15:19:14 +0530947 } else {
948 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv2.GroupAddress})
949 }
950 ig.IgmpGroupLock.Unlock()
951 }
952 }
953 }
954}
955
956// ProcessIgmpv3Pkt : Process IGMPv3 packet
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530957func (va *VoltApplication) ProcessIgmpv3Pkt(cntx context.Context, device string, port string, pkt gopacket.Packet) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530958 // First get the layers of interest
959 dot1QLayer := pkt.Layer(layers.LayerTypeDot1Q)
960
961 if dot1QLayer == nil {
962 logger.Error(ctx, "Igmp Packet Received without Vlan - Dropping pkt")
963 return
964 }
965 dot1Q := dot1QLayer.(*layers.Dot1Q)
966 pktVlan := of.VlanType(dot1Q.VLANIdentifier)
967 igmpv3 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMP)
968
969 ponPortID := va.GetPonPortID(device, port)
970
971 var vpv *VoltPortVnet
972 logger.Debugw(ctx, "Received IGMPv3 Type", log.Fields{"Type": igmpv3.Type})
973
974 if igmpv3.Type == layers.IGMPMembershipReportV3 {
975 // This is a report coming from the PON. We must be able to first find the
976 // subscriber from the VLAN tag and port and verify if the IGMP proxy is
977 // enabled for the subscriber
978 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
979 if vpv == nil {
980 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
981 return
982 } else if !vpv.IgmpEnabled {
983 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
984 return
985 }
986 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
987 if mvp == nil {
988 logger.Errorw(ctx, "Igmp Packet received for Subscriber with Missing Mvlan Profile",
989 log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName})
990 return
991 }
992 mvp.mvpLock.RLock()
993 defer mvp.mvpLock.RUnlock()
994 mvlan := mvp.Mvlan
995
996 for _, group := range igmpv3.GroupRecords {
997
998 isJoin := isIgmpJoin(group.Type, group.SourceAddresses)
999 // The subscriber is validated and now process the IGMP report
1000 ig := va.GetIgmpGroup(mvlan, group.MulticastAddress)
1001 if isJoin {
1002 if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, group.MulticastAddress, mvp); yes {
1003 logger.Warnw(ctx, "Dropping IGMP Join v3: Active channel threshold exceeded",
1004 log.Fields{"PonPortID": ponPortID, "Addr": group.MulticastAddress, "MvlanProfile": vpv.MvlanProfileName})
1005
1006 return
1007 }
1008 if ig != nil {
1009 // If the IGMP group is already created. just add the receiver
1010 logger.Infow(ctx, "IGMP Join received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
1011 ig.IgmpGroupLock.Lock()
1012 // Check for port state to avoid race condition where PortDown event
1013 // acquired lock before packet processing
1014 vd := GetApplication().GetDevice(device)
1015 vp := vd.GetPort(port)
1016 if vp == nil || vp.State != PortStateUp {
1017 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
1018 log.Fields{"Port": port})
1019 ig.IgmpGroupLock.Unlock()
1020 return
1021 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301022 ig.AddReceiver(cntx, device, port, group.MulticastAddress, &group, IgmpVersion3,
Naveen Sampath04696f72022-06-13 15:19:14 +05301023 dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
1024 ig.IgmpGroupLock.Unlock()
1025 } else {
1026 // Create the IGMP group and then add the receiver to the group
1027 logger.Infow(ctx, "IGMP Join received for new group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301028 if ig := va.AddIgmpGroup(cntx, vpv.MvlanProfileName, group.MulticastAddress, device); ig != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301029 ig.IgmpGroupLock.Lock()
1030 // Check for port state to avoid race condition where PortDown event
1031 // acquired lock before packet processing
1032 vd := GetApplication().GetDevice(device)
1033 vp := vd.GetPort(port)
1034 if vp == nil || vp.State != PortStateUp {
1035 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
1036 log.Fields{"Port": port})
1037 ig.IgmpGroupLock.Unlock()
1038 return
1039 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301040 ig.AddReceiver(cntx, device, port, group.MulticastAddress, &group, IgmpVersion3,
Naveen Sampath04696f72022-06-13 15:19:14 +05301041 dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
1042 ig.IgmpGroupLock.Unlock()
1043 } else {
1044 logger.Warnw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": group.MulticastAddress})
1045 }
1046 }
1047 } else if ig != nil {
1048 logger.Infow(ctx, "IGMP Leave received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
1049 ig.IgmpGroupLock.Lock()
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301050 ig.DelReceiver(cntx, device, port, group.MulticastAddress, &group, ponPortID)
Naveen Sampath04696f72022-06-13 15:19:14 +05301051 ig.IgmpGroupLock.Unlock()
1052 if ig.NumDevicesActive() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301053 va.DelIgmpGroup(cntx, ig)
Naveen Sampath04696f72022-06-13 15:19:14 +05301054 }
1055 } else {
1056 logger.Warnw(ctx, "IGMP Leave received for unknown group", log.Fields{"Addr": group.MulticastAddress})
1057 }
1058 }
1059 } else {
1060 // This must be a query on the NNI port. However, we dont make that assumption.
1061 // Need to look for the IGMP group based on the VLAN in the packet as
1062 // the MVLAN
1063
1064 //Check if mvlan profile exist for the incoming pkt vlan
1065 profile, _ := va.MvlanProfilesByTag.Load(pktVlan)
1066 if profile == nil {
1067 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan})
1068 return
1069 }
1070 mvp := profile.(*MvlanProfile)
1071 mvp.mvpLock.RLock()
1072 defer mvp.mvpLock.RUnlock()
1073
1074 if net.ParseIP("0.0.0.0").Equal(igmpv3.GroupAddress) {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301075 va.processIgmpQueries(cntx, device, pktVlan, IgmpVersion3)
Naveen Sampath04696f72022-06-13 15:19:14 +05301076 } else {
1077 if ig := va.GetIgmpGroup(pktVlan, igmpv3.GroupAddress); ig != nil {
1078 ig.IgmpGroupLock.Lock()
1079 igd, ok := ig.Devices[device]
1080 if ok {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301081 igd.ProcessQuery(cntx, igmpv3.GroupAddress, IgmpVersion3)
Naveen Sampath04696f72022-06-13 15:19:14 +05301082 } else {
1083 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv3.GroupAddress})
1084 }
1085 ig.IgmpGroupLock.Unlock()
1086 }
1087 }
1088 }
1089}
1090
1091// processIgmpQueries to process the igmp queries
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301092func (va *VoltApplication) processIgmpQueries(cntx context.Context, device string, pktVlan of.VlanType, version uint8) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301093 // This is a generic query and respond with all the groups channels in currently being viewed.
1094 processquery := func(key interface{}, value interface{}) bool {
1095 ig := value.(*IgmpGroup)
1096 ig.IgmpGroupLock.Lock()
1097 if ig.Mvlan != pktVlan {
1098 ig.IgmpGroupLock.Unlock()
1099 return true
1100 }
1101 igd, ok := ig.Devices[device]
1102 if !ok {
1103 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device})
1104 ig.IgmpGroupLock.Unlock()
1105 return true
1106 }
1107 processQueryForEachChannel := func(key interface{}, value interface{}) bool {
1108 groupAddr := key.(string)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301109 igd.ProcessQuery(cntx, net.ParseIP(groupAddr), version)
Naveen Sampath04696f72022-06-13 15:19:14 +05301110 return true
1111 }
1112 igd.GroupChannels.Range(processQueryForEachChannel)
1113 ig.IgmpGroupLock.Unlock()
1114 return true
1115 }
1116 va.IgmpGroups.Range(processquery)
1117}
1118
1119// isIgmpJoin to check if it is igmp join
1120func isIgmpJoin(recordType layers.IGMPv3GroupRecordType, sourceAddr []net.IP) bool {
1121 var join = false
1122
1123 if (layers.IGMPToEx == recordType) || (layers.IGMPIsEx == recordType) {
1124 join = true
1125 } else if layers.IGMPBlock == recordType {
1126 if len(sourceAddr) == 0 {
1127 join = true
1128 }
1129 } else if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) {
1130 if len(sourceAddr) != 0 {
1131 join = true
1132 }
1133 }
1134 return join
1135}
1136
1137func isIncl(recordType layers.IGMPv3GroupRecordType) bool {
1138
1139 if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) {
1140 return true
1141 }
1142 return false
1143}
1144
1145// IgmpProcessPkt to process the IGMP packet received. The packet received brings along with it
1146// the port on which the packet is received and the device the port is in.
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301147func (va *VoltApplication) IgmpProcessPkt(cntx context.Context, device string, port string, pkt gopacket.Packet) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301148 igmpl := pkt.Layer(layers.LayerTypeIGMP)
1149 if igmpl == nil {
1150 logger.Error(ctx, "Invalid IGMP packet arrived as IGMP packet")
1151 return
1152 }
1153 if igmp, ok := igmpl.(*layers.IGMPv1or2); ok {
1154 // This is an IGMPv2 packet.
1155 logger.Debugw(ctx, "IGMPv2 Packet Received", log.Fields{"IPAddr": igmp.GroupAddress})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301156 va.ProcessIgmpv2Pkt(cntx, device, port, pkt)
Naveen Sampath04696f72022-06-13 15:19:14 +05301157 return
1158 }
1159 if igmpv3, ok := igmpl.(*layers.IGMP); ok {
1160 logger.Debugw(ctx, "IGMPv3 Packet Received", log.Fields{"NumOfGroups": igmpv3.NumberOfGroupRecords})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301161 va.ProcessIgmpv3Pkt(cntx, device, port, pkt)
Naveen Sampath04696f72022-06-13 15:19:14 +05301162 }
1163}
1164
1165// IgmpPacketInd for igmp packet indication
1166func (va *VoltApplication) IgmpPacketInd(device string, port string, pkt gopacket.Packet) {
1167 pt := NewIgmpPacketTask(device, port, pkt)
1168 va.IgmpTasks.AddTask(pt)
1169}
1170
Naveen Sampath04696f72022-06-13 15:19:14 +05301171// storeMvlansMap to store mvlan map
1172func (va *VoltApplication) storeMvlansMap(mvlan of.VlanType, name string, mvp *MvlanProfile) {
1173 va.MvlanProfilesByTag.Store(mvlan, mvp)
1174 va.MvlanProfilesByName.Store(name, mvp)
1175}
1176
1177// deleteMvlansMap to delete mvlan map
1178func (va *VoltApplication) deleteMvlansMap(mvlan of.VlanType, name string) {
1179 va.MvlanProfilesByTag.Delete(mvlan)
1180 va.MvlanProfilesByName.Delete(name)
1181}
1182
1183// RestoreMvlansFromDb to read from the DB and restore all the MVLANs
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301184func (va *VoltApplication) RestoreMvlansFromDb(cntx context.Context) {
1185 mvlans, _ := db.GetMvlans(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301186 for _, mvlan := range mvlans {
1187 b, ok := mvlan.Value.([]byte)
1188 if !ok {
1189 logger.Warn(ctx, "The value type is not []byte")
1190 continue
1191 }
1192 var mvp MvlanProfile
1193 err := json.Unmarshal(b, &mvp)
1194 if err != nil {
1195 logger.Warn(ctx, "Unmarshal of MVLAN failed")
1196 continue
1197 }
1198 va.storeMvlansMap(mvp.Mvlan, mvp.Name, &mvp)
1199
1200 for srNo := range mvp.DevicesList {
1201 if mvp.IgmpServVersion[srNo] == nil {
1202 servVersion := IgmpVersion0
1203 mvp.IgmpServVersion[srNo] = &servVersion
1204 }
1205 }
1206 logger.Infow(ctx, "Restored Mvlan Profile", log.Fields{"MVPName": mvp.Name})
1207 }
1208}
1209
1210// GetMvlanProfileByTag fetches MVLAN profile based on the MC VLAN
1211func (va *VoltApplication) GetMvlanProfileByTag(vlan of.VlanType) *MvlanProfile {
1212 if mvp, ok := va.MvlanProfilesByTag.Load(vlan); ok {
1213 return mvp.(*MvlanProfile)
1214 }
1215 return nil
1216}
1217
1218// GetMvlanProfileByName fetches MVLAN profile based on the profile name.
1219func (va *VoltApplication) GetMvlanProfileByName(name string) *MvlanProfile {
1220 if mvp, ok := va.MvlanProfilesByName.Load(name); ok {
1221 return mvp.(*MvlanProfile)
1222 }
1223 return nil
1224}
1225
1226//UpdateMvlanProfile - only channel groups be updated
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301227func (va *VoltApplication) UpdateMvlanProfile(cntx context.Context, name string, vlan of.VlanType, groups map[string][]string, activeChannelCount int, proxy map[string]common.MulticastGroupProxy) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301228
1229 mvpIntf, ok := va.MvlanProfilesByName.Load(name)
1230 if !ok {
1231 logger.Error(ctx, "Update Mvlan Failed: Profile does not exist")
1232 return errors.New("MVLAN profile not found")
1233 }
1234 mvp := mvpIntf.(*MvlanProfile)
1235 // check if groups are same then just update the OLTSerial numbers, push the config on new serial numbers
1236
1237 existingGroup := mvp.Groups
1238 existingProxy := mvp.Proxy
1239 mvp.Groups = make(map[string]*MvlanGroup)
1240 mvp.Proxy = make(map[string]*MCGroupProxy)
1241
1242 /* Need to protect groups and proxy write lock */
1243 mvp.mvpLock.Lock()
1244 for grpName, grpIPList := range groups {
1245 mvp.AddMvlanGroup(grpName, grpIPList)
1246 }
1247 for grpName, proxyInfo := range proxy {
1248 mvp.AddMvlanProxy(grpName, proxyInfo)
1249 }
1250 if _, ok := mvp.Groups[common.StaticGroup]; ok {
1251 if _, yes := mvp.Proxy[common.StaticGroup]; !yes {
1252 mvp.Groups[common.StaticGroup].IsStatic = true
1253 }
1254 }
1255 prevMaxActiveChannels := mvp.MaxActiveChannels
1256 if reflect.DeepEqual(mvp.Groups, existingGroup) && reflect.DeepEqual(mvp.Proxy, existingProxy) {
1257 logger.Info(ctx, "No change in groups config")
1258 if uint32(activeChannelCount) != mvp.MaxActiveChannels {
1259 mvp.MaxActiveChannels = uint32(activeChannelCount)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301260 if err := mvp.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301261 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1262 }
1263 if prevMaxActiveChannels != mvp.MaxActiveChannels {
1264 mvp.UpdateActiveChannelSubscriberAlarm()
1265 }
1266 }
1267 mvp.mvpLock.Unlock()
1268 return nil
1269 }
1270 mvp.mvpLock.Unlock()
1271 mvp.MaxActiveChannels = uint32(activeChannelCount)
1272
1273 // Status is maintained so that in the event of any crash or reboot during update,
1274 // the recovery is possible once the pod is UP again
1275 mvp.SetUpdateStatus("", UpdateInProgress)
1276 mvp.oldGroups = existingGroup
1277 mvp.oldProxy = existingProxy
1278 va.storeMvlansMap(vlan, name, mvp)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301279 if err := mvp.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301280 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1281 }
1282 if prevMaxActiveChannels != mvp.MaxActiveChannels {
1283 mvp.UpdateActiveChannelSubscriberAlarm()
1284 }
1285
1286 // The update task is added as part of Igm p task list, so that any parallel igmp pkt processing is avoided
1287 // Until, the update operation is completed, the igmp pkt processing will be enqueued
1288 updateTask := NewUpdateMvlanTask(mvp, "")
1289 va.IgmpTasks.AddTask(updateTask)
1290 return nil
1291}
1292
// isDeviceInList reports whether serialNum is one of the serial numbers in
// OLTSerialNums. A nil or empty list never contains the serial number.
func isDeviceInList(serialNum string, OLTSerialNums []string) bool {
	for idx := 0; idx < len(OLTSerialNums); idx++ {
		if OLTSerialNums[idx] == serialNum {
			return true
		}
	}
	return false
}
1302
// McastConfigKey builds the key of a multicast config entry by joining the OLT
// serial number and the MVLAN profile ID with an underscore.
func McastConfigKey(oltSerialNum string, mvlanProfID string) string {
	const separator = "_"
	return oltSerialNum + separator + mvlanProfID
}
1307
1308// GetMcastConfig to get McastConfig Information by OLT and Mvlan Profile ID
1309func (va *VoltApplication) GetMcastConfig(oltSerialNum string, mvlanProfID string) *McastConfig {
1310 if mc, ok := va.McastConfigMap.Load(McastConfigKey(oltSerialNum, mvlanProfID)); ok {
1311 return mc.(*McastConfig)
1312 }
1313 return nil
1314}
1315
1316func (va *VoltApplication) storeMcastConfig(oltSerialNum string, mvlanProfID string, mcastConfig *McastConfig) {
1317 va.McastConfigMap.Store(McastConfigKey(oltSerialNum, mvlanProfID), mcastConfig)
1318}
1319
1320func (va *VoltApplication) deleteMcastConfig(oltSerialNum string, mvlanProfID string) {
1321 va.McastConfigMap.Delete(McastConfigKey(oltSerialNum, mvlanProfID))
1322}
1323
1324// AddMcastConfig for addition of a MVLAN profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301325func (va *VoltApplication) AddMcastConfig(cntx context.Context, MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301326 var mcastCfg *McastConfig
1327
1328 mcastCfg = va.GetMcastConfig(OltSerialNum, MvlanProfileID)
1329 if mcastCfg == nil {
1330 mcastCfg = &McastConfig{}
1331 } else {
1332 logger.Debugw(ctx, "Mcast Config already exists", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum,
1333 "MVLAN Profile ID": mcastCfg.MvlanProfileID})
1334 }
1335
1336 // Update all igds available
1337 mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID)
1338 if !ok {
1339 return errors.New("MVLAN profile not found during add mcast config")
1340 }
1341 mvlan := mvpIntf.(*MvlanProfile).Mvlan
1342
1343 mcastCfg.OltSerialNum = OltSerialNum
1344 mcastCfg.MvlanProfileID = MvlanProfileID
1345 mcastCfg.IgmpProfileID = IgmpProfileID
1346 mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP)
1347
1348 proxyCfg := va.getIgmpProfileMap(IgmpProfileID)
1349
1350 iterIgmpGroups := func(key interface{}, value interface{}) bool {
1351 ig := value.(*IgmpGroup)
1352 if ig.Mvlan != mvlan {
1353 return true
1354 }
1355
1356 for _, igd := range ig.Devices {
1357 if igd.SerialNo != OltSerialNum {
1358 continue
1359 }
1360 igd.proxyCfg = proxyCfg
1361 if IgmpProfileID == "" {
1362 igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP
1363 } else {
1364 igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP
1365 }
1366 mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
1367 logger.Debugw(ctx, "Igd updated with proxyCfg and proxyIP", log.Fields{"name": igd.GroupName,
1368 "IgmpProfileID": IgmpProfileID, "ProxyIP": mcastCfg.IgmpProxyIP})
1369 }
1370 return true
1371 }
1372 va.IgmpGroups.Range(iterIgmpGroups)
1373
1374 va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301375 if err := mcastCfg.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301376 logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID})
1377 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301378 va.addOltToMvlan(cntx, MvlanProfileID, OltSerialNum)
Naveen Sampath04696f72022-06-13 15:19:14 +05301379
1380 return nil
1381}
1382
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301383func (va *VoltApplication) addOltToMvlan(cntx context.Context, MvlanProfileID string, OltSerialNum string) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301384 var mvp *MvlanProfile
1385 if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok {
1386 servVersion := IgmpVersion0
1387 mvp = mvpIntf.(*MvlanProfile)
1388 mvp.DevicesList[OltSerialNum] = NoOp
1389 mvp.IgmpServVersion[OltSerialNum] = &servVersion
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301390 if err := mvp.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301391 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1392 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301393 mvp.pushIgmpMcastFlows(cntx, OltSerialNum)
Naveen Sampath04696f72022-06-13 15:19:14 +05301394 }
1395}
1396
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301397func (va *VoltApplication) delOltFromMvlan(cntx context.Context, MvlanProfileID string, OltSerialNum string) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301398 var mvp *MvlanProfile
1399 if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok {
1400 mvp = mvpIntf.(*MvlanProfile)
1401 //Delete from mvp list
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301402 mvp.removeIgmpMcastFlows(cntx, OltSerialNum)
Naveen Sampath04696f72022-06-13 15:19:14 +05301403 delete(mvp.DevicesList, OltSerialNum)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301404 if err := mvp.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301405 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1406 }
1407 }
1408}
1409
1410// DelMcastConfig for addition of a MVLAN profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301411func (va *VoltApplication) DelMcastConfig(cntx context.Context, MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301412
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301413 va.delOltFromMvlan(cntx, MvlanProfileID, OltSerialNum)
Naveen Sampath04696f72022-06-13 15:19:14 +05301414 va.deleteMcastConfig(OltSerialNum, MvlanProfileID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301415 _ = db.DelMcastConfig(cntx, McastConfigKey(OltSerialNum, MvlanProfileID))
Naveen Sampath04696f72022-06-13 15:19:14 +05301416 if d := va.GetDeviceBySerialNo(OltSerialNum); d != nil {
1417 if mvp := va.GetMvlanProfileByName(MvlanProfileID); mvp != nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301418 va.RemoveGroupsFromPendingPool(cntx, d.Name, mvp.Mvlan)
Naveen Sampath04696f72022-06-13 15:19:14 +05301419 }
1420 }
1421}
1422
1423// DelAllMcastConfig for deletion of all mcast config
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301424func (va *VoltApplication) DelAllMcastConfig(cntx context.Context, OltSerialNum string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301425
1426 deleteIndividualMcastConfig := func(key interface{}, value interface{}) bool {
1427 mcastCfg := value.(*McastConfig)
1428 if mcastCfg.OltSerialNum == OltSerialNum {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301429 va.DelMcastConfig(cntx, mcastCfg.MvlanProfileID, mcastCfg.IgmpProfileID, mcastCfg.IgmpProxyIP.String(), mcastCfg.OltSerialNum)
Naveen Sampath04696f72022-06-13 15:19:14 +05301430 }
1431 return true
1432 }
1433 va.McastConfigMap.Range(deleteIndividualMcastConfig)
1434 return nil
1435}
1436
1437// UpdateMcastConfig for addition of a MVLAN profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301438func (va *VoltApplication) UpdateMcastConfig(cntx context.Context, MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301439
1440 mcastCfg := va.GetMcastConfig(OltSerialNum, MvlanProfileID)
1441 if mcastCfg == nil {
1442 logger.Warnw(ctx, "Mcast Config not found. Unable to update", log.Fields{"Mvlan Profile ID": MvlanProfileID, "OltSerialNum": OltSerialNum})
1443 return nil
1444 }
1445
1446 oldProfID := mcastCfg.IgmpProfileID
1447 mcastCfg.IgmpProfileID = IgmpProfileID
1448 mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP)
1449
1450 va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg)
1451
1452 // Update all igds
1453 if oldProfID != mcastCfg.IgmpProfileID {
1454 updateIgdProxyCfg := func(key interface{}, value interface{}) bool {
1455 igd := value.(*IgmpGroupDevice)
1456 igd.proxyCfg = va.getIgmpProfileMap(mcastCfg.IgmpProfileID)
1457 if IgmpProfileID == "" {
1458 igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP
1459 } else {
1460 igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP
1461 }
1462 return true
1463 }
1464 mcastCfg.IgmpGroupDevices.Range(updateIgdProxyCfg)
1465 }
1466
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301467 if err := mcastCfg.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301468 logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID})
1469 }
1470
1471 return nil
1472}
1473
1474// WriteToDb is utility to write Mcast config Info to database
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301475func (mc *McastConfig) WriteToDb(cntx context.Context) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301476 mc.Version = database.PresentVersionMap[database.McastConfigPath]
1477 b, err := json.Marshal(mc)
1478 if err != nil {
1479 return err
1480 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301481 if err1 := db.PutMcastConfig(cntx, McastConfigKey(mc.OltSerialNum, mc.MvlanProfileID), string(b)); err1 != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301482 return err1
1483 }
1484 return nil
1485}
1486
1487// RestoreMcastConfigsFromDb to read from the DB and restore Mcast configs
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301488func (va *VoltApplication) RestoreMcastConfigsFromDb(cntx context.Context) {
1489 mcastConfigs, _ := db.GetMcastConfigs(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301490 for hash, mcastConfig := range mcastConfigs {
1491 b, ok := mcastConfig.Value.([]byte)
1492 if !ok {
1493 logger.Warn(ctx, "The value type is not []byte")
1494 continue
1495 }
1496 var mc McastConfig
1497 err := json.Unmarshal(b, &mc)
1498 if err != nil {
1499 logger.Warn(ctx, "Unmarshal of Mcast config failed")
1500 continue
1501 }
1502 va.storeMcastConfig(mc.OltSerialNum, mc.MvlanProfileID, &mc)
1503 logger.Infow(ctx, "Restored Mcast config", log.Fields{"OltSerialNum": mc.OltSerialNum, "MvlanProfileID": mc.MvlanProfileID, "hash": hash})
1504 }
1505}
1506
1507// AddMvlanProfile for addition of a MVLAN profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301508func (va *VoltApplication) AddMvlanProfile(cntx context.Context, name string, mvlan of.VlanType, ponVlan of.VlanType,
Naveen Sampath04696f72022-06-13 15:19:14 +05301509 groups map[string][]string, isChannelBasedGroup bool, OLTSerialNum []string, activeChannelsPerPon int, proxy map[string]common.MulticastGroupProxy) error {
1510 var mvp *MvlanProfile
1511
1512 if mvp = va.GetMvlanProfileByTag(mvlan); mvp != nil {
1513 logger.Errorw(ctx, "Duplicate MVLAN ID configured", log.Fields{"mvlan": mvlan})
1514 return errors.New("MVLAN profile with same VLANID exists")
1515 }
1516 if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok {
1517 mvp = mvpIntf.(*MvlanProfile)
1518 for _, serialNum := range OLTSerialNum {
1519 if mvp.DevicesList[serialNum] != Nil {
1520 //This is backup restore scenario, just update the profile
1521 logger.Info(ctx, "Add Mvlan : Profile Name already exists, update-the-profile")
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301522 return va.UpdateMvlanProfile(cntx, name, mvlan, groups, activeChannelsPerPon, proxy)
Naveen Sampath04696f72022-06-13 15:19:14 +05301523 }
1524 }
1525 }
1526
1527 if mvp == nil {
1528 mvp = NewMvlanProfile(name, mvlan, ponVlan, isChannelBasedGroup, OLTSerialNum, uint32(activeChannelsPerPon))
1529 }
1530
1531 va.storeMvlansMap(mvlan, name, mvp)
1532
1533 /* Need to protect groups and proxy write lock */
1534 mvp.mvpLock.Lock()
1535 for grpName, grpInfo := range groups {
1536 mvp.AddMvlanGroup(grpName, grpInfo)
1537 }
1538 for grpName, proxyInfo := range proxy {
1539 mvp.AddMvlanProxy(grpName, proxyInfo)
1540 }
1541 if _, ok := mvp.Groups[common.StaticGroup]; ok {
1542 if _, yes := mvp.Proxy[common.StaticGroup]; !yes {
1543 mvp.Groups[common.StaticGroup].IsStatic = true
1544 }
1545 }
1546
1547 logger.Debugw(ctx, "Added MVLAN Profile", log.Fields{"MVLAN": mvp.Mvlan, "PonVlan": mvp.PonVlan, "Name": mvp.Name, "Grp IPs": mvp.Groups, "IsPonVlanPresent": mvp.IsPonVlanPresent})
1548 mvp.mvpLock.Unlock()
1549
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301550 if err := mvp.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301551 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1552 }
1553
1554 return nil
1555}
1556
Naveen Sampath04696f72022-06-13 15:19:14 +05301557// GetMvlanProfileForMcIP - Get an MVLAN profile for a given MC IP. This is used when an
1558// IGMP report is received from the PON port. The MVLAN profile
1559// located is used to idnetify the MC VLAN used in upstream for
1560// join/leave
1561func (va *VoltApplication) GetMvlanProfileForMcIP(profileName string, ip net.IP) (*MvlanProfile, string) {
1562 if mvpIntf, ok := va.MvlanProfilesByName.Load(profileName); ok {
1563 mvp := mvpIntf.(*MvlanProfile)
1564 if grpName := mvp.GetMvlanGroup(ip); grpName != "" {
1565 return mvp, grpName
1566 }
1567 } else {
1568 logger.Warnw(ctx, "Mvlan Profile not found for given profile name", log.Fields{"Profile": profileName})
1569 }
1570 return nil, ""
1571}
1572
Naveen Sampath04696f72022-06-13 15:19:14 +05301573// IgmpTick for igmp tick info
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301574func (va *VoltApplication) IgmpTick(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301575 tickCount++
1576 if (tickCount % 1000) == 0 {
1577 logger.Debugw(ctx, "Time @ Tick", log.Fields{"Tick": tickCount, "Time": time.Now()})
1578 }
1579 igmptick := func(key interface{}, value interface{}) bool {
1580 ig := value.(*IgmpGroup)
1581 if ig.NumDevicesActive() != 0 {
1582 if tickCount%10 == ig.Hash()%10 {
1583 ig.IgmpGroupLock.Lock()
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301584 ig.Tick(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301585 ig.IgmpGroupLock.Unlock()
1586 if ig.NumDevicesActive() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301587 va.DelIgmpGroup(cntx, ig)
Naveen Sampath04696f72022-06-13 15:19:14 +05301588 }
1589 }
1590 }
1591 return true
1592 }
1593 va.IgmpGroups.Range(igmptick)
1594}
1595
1596// Tick to add Tick Task
1597func (va *VoltApplication) Tick() {
1598 tt := NewTickTask()
1599 va.IgmpTasks.AddTask(tt)
1600 // va.IgmpTick()
1601}
1602
1603//AddIgmpProfile for addition of IGMP Profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301604func (va *VoltApplication) AddIgmpProfile(cntx context.Context, igmpProfileConfig *common.IGMPConfig) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301605 var igmpProfile *IgmpProfile
1606
1607 if igmpProfileConfig.ProfileID == DefaultIgmpProfID {
1608 logger.Info(ctx, "Updating default IGMP profile")
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301609 return va.UpdateIgmpProfile(cntx, igmpProfileConfig)
Naveen Sampath04696f72022-06-13 15:19:14 +05301610 }
1611
1612 igmpProfile = va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
1613 if igmpProfile == nil {
1614 igmpProfile = newIgmpProfile(igmpProfileConfig)
1615 } else {
1616 logger.Errorw(ctx, "IGMP profile already exists", log.Fields{"IgmpProfile": igmpProfileConfig.ProfileID})
1617 return errors.New("IGMP Profile already exists")
1618 }
1619
1620 va.storeIgmpProfileMap(igmpProfileConfig.ProfileID, igmpProfile)
1621
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301622 if err := igmpProfile.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301623 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID})
1624 }
1625
1626 return nil
1627}
1628
Naveen Sampath04696f72022-06-13 15:19:14 +05301629// checkIgmpProfileMap to get Igmp Profile. If not found return nil
1630func (va *VoltApplication) checkIgmpProfileMap(name string) *IgmpProfile {
1631 if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok {
1632 return igmpProfileIntf.(*IgmpProfile)
1633 }
1634 return nil
1635}
1636
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301637func (va *VoltApplication) resetIgmpProfileToDefault(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301638 igmpProf := va.getIgmpProfileMap(DefaultIgmpProfID)
1639 defIgmpProf := newDefaultIgmpProfile()
1640
1641 igmpProf.UnsolicitedTimeOut = defIgmpProf.UnsolicitedTimeOut
1642 igmpProf.MaxResp = defIgmpProf.MaxResp
1643 igmpProf.KeepAliveInterval = defIgmpProf.KeepAliveInterval
1644 igmpProf.KeepAliveCount = defIgmpProf.KeepAliveCount
1645 igmpProf.LastQueryInterval = defIgmpProf.LastQueryInterval
1646 igmpProf.LastQueryCount = defIgmpProf.LastQueryCount
1647 igmpProf.FastLeave = defIgmpProf.FastLeave
1648 igmpProf.PeriodicQuery = defIgmpProf.PeriodicQuery
1649 igmpProf.IgmpCos = defIgmpProf.IgmpCos
1650 igmpProf.WithRAUpLink = defIgmpProf.WithRAUpLink
1651 igmpProf.WithRADownLink = defIgmpProf.WithRADownLink
1652 igmpProf.IgmpVerToServer = defIgmpProf.IgmpVerToServer
1653 igmpProf.IgmpSourceIP = defIgmpProf.IgmpSourceIP
1654
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301655 if err := igmpProf.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301656 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProf.ProfileID})
1657 }
1658}
1659
1660// getIgmpProfileMap to get Igmp Profile. If not found return default IGMP config
1661func (va *VoltApplication) getIgmpProfileMap(name string) *IgmpProfile {
1662 if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok {
1663 return igmpProfileIntf.(*IgmpProfile)
1664 }
1665
1666 // There will be always a default igmp profile.
1667 defaultIgmpProfileIntf, _ := va.IgmpProfilesByName.Load(DefaultIgmpProfID)
1668 return defaultIgmpProfileIntf.(*IgmpProfile)
1669}
1670
1671// storeIgmpProfileMap to store Igmp Profile
1672func (va *VoltApplication) storeIgmpProfileMap(name string, igmpProfile *IgmpProfile) {
1673 va.IgmpProfilesByName.Store(name, igmpProfile)
1674}
1675
1676// deleteIgmpProfileMap to delete Igmp Profile
1677func (va *VoltApplication) deleteIgmpProfileMap(name string) {
1678 va.IgmpProfilesByName.Delete(name)
1679}
1680
Naveen Sampath04696f72022-06-13 15:19:14 +05301681//DelIgmpProfile for addition of IGMP Profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301682func (va *VoltApplication) DelIgmpProfile(cntx context.Context, igmpProfileConfig *common.IGMPConfig) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301683 // Deletion of default igmp profile is blocked from submgr. Keeping additional check for safety.
1684 if igmpProfileConfig.ProfileID == DefaultIgmpProfID {
1685 logger.Info(ctx, "Resetting default IGMP profile")
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301686 va.resetIgmpProfileToDefault(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301687 return nil
1688 }
1689 igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
1690 if igmpProfile == nil {
1691 logger.Warnw(ctx, "Igmp Profile not found. Unable to delete", log.Fields{"Profile ID": igmpProfileConfig.ProfileID})
1692 return nil
1693 }
1694
1695 va.deleteIgmpProfileMap(igmpProfileConfig.ProfileID)
1696
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301697 _ = db.DelIgmpProfile(cntx, igmpProfileConfig.ProfileID)
Naveen Sampath04696f72022-06-13 15:19:14 +05301698
1699 return nil
1700}
1701
1702//UpdateIgmpProfile for addition of IGMP Profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301703func (va *VoltApplication) UpdateIgmpProfile(cntx context.Context, igmpProfileConfig *common.IGMPConfig) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301704 igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
1705 if igmpProfile == nil {
1706 logger.Errorw(ctx, "Igmp Profile not found. Unable to update", log.Fields{"Profile ID": igmpProfileConfig.ProfileID})
1707 return errors.New("IGMP Profile not found")
1708 }
1709
1710 igmpProfile.ProfileID = igmpProfileConfig.ProfileID
1711 igmpProfile.UnsolicitedTimeOut = uint32(igmpProfileConfig.UnsolicitedTimeOut)
1712 igmpProfile.MaxResp = uint32(igmpProfileConfig.MaxResp)
1713
1714 keepAliveInterval := uint32(igmpProfileConfig.KeepAliveInterval)
1715
1716 //KeepAliveInterval should have a min of 10 seconds
1717 if keepAliveInterval < MinKeepAliveInterval {
1718 keepAliveInterval = MinKeepAliveInterval
1719 logger.Infow(ctx, "Auto adjust keepAliveInterval - Value < 10", log.Fields{"Received": igmpProfileConfig.KeepAliveInterval, "Configured": keepAliveInterval})
1720 }
1721 igmpProfile.KeepAliveInterval = keepAliveInterval
1722
1723 igmpProfile.KeepAliveCount = uint32(igmpProfileConfig.KeepAliveCount)
1724 igmpProfile.LastQueryInterval = uint32(igmpProfileConfig.LastQueryInterval)
1725 igmpProfile.LastQueryCount = uint32(igmpProfileConfig.LastQueryCount)
1726 igmpProfile.FastLeave = *igmpProfileConfig.FastLeave
1727 igmpProfile.PeriodicQuery = *igmpProfileConfig.PeriodicQuery
1728 igmpProfile.IgmpCos = uint8(igmpProfileConfig.IgmpCos)
1729 igmpProfile.WithRAUpLink = *igmpProfileConfig.WithRAUpLink
1730 igmpProfile.WithRADownLink = *igmpProfileConfig.WithRADownLink
1731
1732 if igmpProfileConfig.IgmpVerToServer == "2" || igmpProfileConfig.IgmpVerToServer == "v2" {
1733 igmpProfile.IgmpVerToServer = "2"
1734 } else {
1735 igmpProfile.IgmpVerToServer = "3"
1736 }
1737
1738 if igmpProfileConfig.IgmpSourceIP != "" {
1739 igmpProfile.IgmpSourceIP = net.ParseIP(igmpProfileConfig.IgmpSourceIP)
1740 }
1741
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301742 if err := igmpProfile.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301743 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID})
1744 }
1745
1746 return nil
1747}
1748
1749// RestoreIGMPProfilesFromDb to read from the DB and restore IGMP Profiles
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301750func (va *VoltApplication) RestoreIGMPProfilesFromDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301751 // Loading IGMP profiles
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301752 igmpProfiles, _ := db.GetIgmpProfiles(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301753 for _, igmpProfile := range igmpProfiles {
1754 b, ok := igmpProfile.Value.([]byte)
1755 if !ok {
1756 logger.Warn(ctx, "The value type is not []byte")
1757 continue
1758 }
1759 var igmpProf IgmpProfile
1760 err := json.Unmarshal(b, &igmpProf)
1761 if err != nil {
1762 logger.Warn(ctx, "Unmarshal of IGMP Profile failed")
1763 continue
1764 }
1765 va.storeIgmpProfileMap(igmpProf.ProfileID, &igmpProf)
1766 logger.Infow(ctx, "Restored Igmp Profile", log.Fields{"Conf": igmpProf})
1767 }
1768}
1769
1770// InitIgmpSrcMac for initialization of igmp source mac
1771func (va *VoltApplication) InitIgmpSrcMac() {
1772 srcMac, err := getPodMacAddr()
1773 if err != nil {
1774 igmpSrcMac = "00:11:11:11:11:11"
1775 return
1776 }
1777 igmpSrcMac = srcMac
1778}
1779
Naveen Sampath04696f72022-06-13 15:19:14 +05301780// DelMvlanProfile for deletion of a MVLAN group
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301781func (va *VoltApplication) DelMvlanProfile(cntx context.Context, name string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301782 if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok {
1783 mvp := mvpIntf.(*MvlanProfile)
1784
1785 if len(mvp.DevicesList) == 0 {
1786 mvp.DeleteInProgress = true
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301787 mvp.DelFromDb(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301788 va.deleteMvlansMap(mvp.Mvlan, name)
1789 logger.Debugw(ctx, "Deleted MVLAN Profile", log.Fields{"Name": mvp.Name})
1790 } else {
1791 logger.Errorw(ctx, "Unable to delete Mvlan Profile as there is still an OLT attached to it", log.Fields{"Name": mvp.Name,
1792 "Device List": mvp.DevicesList})
1793 return errors.New("MVLAN attached to devices")
1794 }
1795
1796 return nil
1797 }
1798 logger.Errorw(ctx, "MVLAN Profile not found", log.Fields{"MvlanProfile Name": name})
1799 return nil
1800}
1801
1802// ReceiverUpInd for receiver up indication
1803func (va *VoltApplication) ReceiverUpInd(device string, port string, mvpName string, vlan of.VlanType, pbits []of.PbitType) {
1804 logger.Infow(ctx, "Receiver Indication: UP", log.Fields{"device": device, "port": port, "MVP": mvpName, "vlan": vlan, "pbits": pbits})
1805 if mvpIntf, ok := va.MvlanProfilesByName.Load(mvpName); ok {
1806 mvp := mvpIntf.(*MvlanProfile)
1807 if devIntf, ok := va.DevicesDisc.Load(device); ok {
1808 dev := devIntf.(*VoltDevice)
1809 proxyCfg, proxyIP, _ := getIgmpProxyCfgAndIP(mvp.Mvlan, dev.SerialNum)
1810 for _, pbit := range pbits {
1811 sendGeneralQuery(device, port, vlan, uint8(pbit), proxyCfg, proxyIP)
1812 }
1813 } else {
1814 logger.Warnw(ctx, "Device not found for given port", log.Fields{"device": device, "port": port})
1815 }
1816 } else {
1817 logger.Warnw(ctx, "Mvlan Profile not found for given profileName", log.Fields{"MVP": mvpName, "vlan": vlan})
1818 }
1819}
1820
1821// sendGeneralQuery to send general query
1822func sendGeneralQuery(device string, port string, cVlan of.VlanType, pbit uint8, proxyCfg *IgmpProfile, proxyIP *net.IP) {
1823
Tinoj Josephcf161be2022-07-07 19:47:47 +05301824 if queryPkt, err := Igmpv2QueryPacket(AllSystemsMulticastGroupIP, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301825 if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil {
1826 logger.Warnw(ctx, "General Igmpv2 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1827 } else {
1828 logger.Debugw(ctx, "General Igmpv2 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1829 }
1830 }
1831 if getVersion(proxyCfg.IgmpVerToServer) == IgmpVersion3 {
Tinoj Josephcf161be2022-07-07 19:47:47 +05301832 if queryPkt, err := Igmpv3QueryPacket(AllSystemsMulticastGroupIP, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301833 if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil {
1834 logger.Warnw(ctx, "General Igmpv3 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1835 } else {
1836 logger.Debugw(ctx, "General Igmpv3 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1837 }
1838 }
1839 }
1840}
1841
1842// ReceiverDownInd to send receiver down indication
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301843func (va *VoltApplication) ReceiverDownInd(cntx context.Context, device string, port string) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301844 logger.Infow(ctx, " Receiver Indication: DOWN", log.Fields{"device": device, "port": port})
1845
1846 ponPortID := va.GetPonPortID(device, port)
1847
1848 del := func(key interface{}, value interface{}) bool {
1849 ig := value.(*IgmpGroup)
1850 ig.IgmpGroupLock.Lock()
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301851 ig.DelReceiveronDownInd(cntx, device, port, ponPortID)
Naveen Sampath04696f72022-06-13 15:19:14 +05301852 ig.IgmpGroupLock.Unlock()
1853 if ig.NumDevicesActive() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301854 va.DelIgmpGroup(cntx, ig)
Naveen Sampath04696f72022-06-13 15:19:14 +05301855 }
1856 return true
1857 }
1858 va.IgmpGroups.Range(del)
1859}