blob: 32eed0dc601287e08f84e8375dc17f2b46100e4f [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package application
17
18import (
19 "encoding/json"
20 "errors"
21 "net"
22 "reflect"
23 "voltha-go-controller/internal/pkg/types"
Naveen Sampath04696f72022-06-13 15:19:14 +053024 "strings"
25 "sync"
26 "time"
27
28 "github.com/google/gopacket"
29 "github.com/google/gopacket/layers"
30
31 cntlr "voltha-go-controller/internal/pkg/controller"
32 "voltha-go-controller/database"
33 "voltha-go-controller/internal/pkg/of"
Tinoj Joseph1d108322022-07-13 10:07:39 +053034 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053035)
36
37const (
38 // IgmpVersion0 constant (Default init value)
39 IgmpVersion0 uint8 = 0
40 // IgmpVersion1 constant
41 IgmpVersion1 uint8 = 1
42 // IgmpVersion2 constant
43 IgmpVersion2 uint8 = 2
44 // IgmpVersion3 constant
45 IgmpVersion3 uint8 = 3
46 // MinKeepAliveInterval constant
47 MinKeepAliveInterval uint32 = 10
48 // MaxDiffKAIntervalResp constant
49 MaxDiffKAIntervalResp uint32 = 5
50 // StaticGroup constant
51 StaticGroup string = "static"
52 // DynamicGroup constant
53 DynamicGroup string = "dynamic"
54 // StaticPort constant
55 StaticPort string = "static_port"
56 // DefaultIgmpProfID constant
57 DefaultIgmpProfID = ""
58 //GroupExpiryTime - group expiry time in minutes
59 GroupExpiryTime uint32 = 15
60)
61
62const (
63 // JoinUnsuccessful constant
64 JoinUnsuccessful string = "JOIN-UNSUCCESSFUL"
65 // JoinUnsuccessfulExceededIGMPChanel constant
66 JoinUnsuccessfulExceededIGMPChanel string = "Exceeded subscriber or PON port IGMP channels threshold"
67 // JoinUnsuccessfulAddFlowGroupFailed constant
68 JoinUnsuccessfulAddFlowGroupFailed string = "Failed to add flow or group for a channel"
69 // JoinUnsuccessfulGroupNotConfigured constant
70 JoinUnsuccessfulGroupNotConfigured string = "Join received from a subscriber on non-configured group"
71 // JoinUnsuccessfulVlanDisabled constant
72 JoinUnsuccessfulVlanDisabled string = "Vlan is disabled"
73 // JoinUnsuccessfulDescription constant
74 JoinUnsuccessfulDescription string = "igmp join unsuccessful"
75 // QueryExpired constant
76 QueryExpired string = "QUERY-EXPIRED"
77 // QueryExpiredGroupSpecific constant
78 QueryExpiredGroupSpecific string = "Group specific multicast query expired"
79 // QueryExpiredDescription constant
80 QueryExpiredDescription string = "igmp query expired"
81)
82
Naveen Sampath04696f72022-06-13 15:19:14 +053083// McastConfig structure
84type McastConfig struct {
85 OltSerialNum string
86 MvlanProfileID string
87 IgmpProfileID string
88 IgmpProxyIP net.IP
89 OperState OperInProgress
90 Version string
91 // This map will help in updating the igds whenever there is a igmp profile id update
92 IgmpGroupDevices sync.Map `json:"-"` // Key is group id
93}
94
95var (
96 // NullIPAddr is null ip address var
97 NullIPAddr = net.ParseIP("0.0.0.0")
Tinoj Josephcf161be2022-07-07 19:47:47 +053098 // AllSystemsMulticastGroupIP
99 AllSystemsMulticastGroupIP = net.ParseIP("224.0.0.1")
Naveen Sampath04696f72022-06-13 15:19:14 +0530100 // igmpSrcMac for the proxy
101 igmpSrcMac string
102)
103
104func init() {
105 RegisterPacketHandler(IGMP, ProcessIgmpPacket)
106}
107
108// ProcessIgmpPacket : CallBack function registered with application to handle IGMP packetIn
109func ProcessIgmpPacket(device string, port string, pkt gopacket.Packet) {
110 GetApplication().IgmpPacketInd(device, port, pkt)
111}
112
113func ipv4ToUint(ip net.IP) uint32 {
114 result := uint32(0)
115 addr := ip.To4()
116 if addr == nil {
117 logger.Warnw(ctx, "Invalid Group Addr", log.Fields{"IP": ip})
118 return 0
119 }
120 result = result + uint32(addr[0])<<24
121 result = result + uint32(addr[1])<<16
122 result = result + uint32(addr[2])<<8
123 result = result + uint32(addr[3])
124 return result
125}
126
127func getPodMacAddr() (string, error) {
128 ifas, err := net.Interfaces()
129 if err != nil {
130 return "", err
131 }
132 var ipv4Addr net.IP
133 for _, ifa := range ifas {
134 addrs, err := ifa.Addrs()
135 if err != nil {
136 return "", err
137 }
138 for _, addr := range addrs {
139 if ipv4Addr = addr.(*net.IPNet).IP.To4(); ipv4Addr != nil {
140 if ipv4Addr.IsGlobalUnicast() {
141 logger.Infow(ctx, "Igmp Static config", log.Fields{"MacAddr": ifa.HardwareAddr.String(), "ipAddr": ipv4Addr})
142 return ifa.HardwareAddr.String(), nil
143 }
144 }
145 }
146
147 }
148 return "", errors.New("MAC Address not found,Setting default")
149}
150
151// IgmpUsEthLayer : Layers defined for upstream communication
152// Ethernet layer for upstream communication
153func IgmpUsEthLayer(mcip net.IP) *layers.Ethernet {
154 eth := &layers.Ethernet{}
155 // TODO: Set the source MAC properly and remove hardcoding
156 eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac)
157 eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00")
158 eth.DstMAC[3] = mcip[1] & 0x7f
159 eth.DstMAC[4] = mcip[2]
160 eth.DstMAC[5] = mcip[3]
161 eth.EthernetType = layers.EthernetTypeDot1Q
162 return eth
163}
164
165// IgmpUsDot1qLayer set US VLAN layer
166func IgmpUsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q {
167 dot1q := &layers.Dot1Q{}
168 dot1q.Priority = priority
169 dot1q.DropEligible = false
170 dot1q.VLANIdentifier = uint16(vlan)
171 dot1q.Type = layers.EthernetTypeIPv4
172 return dot1q
173}
174
175// Igmpv2UsIpv4Layer : Set the IP layer for IGMPv2
176// TODO - Identify correct way of obtaining source IP
177// This should be the configured IGMP proxy address which should be per OLT
178// We should probably be able to have a single function for both
179// upstream and downstream
180func Igmpv2UsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 {
181 ip := &layers.IPv4{}
182 ip.Version = 4
183 ip.Protocol = layers.IPProtocolIGMP
184 ip.TTL = 1
185 ip.SrcIP = src
186 ip.DstIP = mcip
187 return ip
188}
189
190// Igmpv3UsIpv4Layer : Set the IP layer for IGMPv3
191// TODO - Identify correct way of obtaining source IP
192// This should be the configured IGMP proxy address which should be per OLT
193// We should probably be able to have a single function for both
194// upstream and downstream
195func Igmpv3UsIpv4Layer(src net.IP) *layers.IPv4 {
196 ip := &layers.IPv4{}
197 ip.Version = 4
198 ip.Protocol = layers.IPProtocolIGMP
199 ip.TTL = 1
200 ip.SrcIP = src
201 ip.DstIP = net.ParseIP("224.0.0.22")
202 return ip
203}
204
205// IgmpDsEthLayer : Layers defined for downstream communication
206// Ethernet layer for downstream communication
207func IgmpDsEthLayer(mcip net.IP) *layers.Ethernet {
208 eth := &layers.Ethernet{}
209 // TODO: Set the source and dest MAC properly and remove hardcoding
210 eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac)
211 eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00")
212 eth.DstMAC[3] = mcip[1] & 0x7f
213 eth.DstMAC[4] = mcip[2]
214 eth.DstMAC[5] = mcip[3]
215 eth.EthernetType = layers.EthernetTypeDot1Q
216 return eth
217}
218
219// IgmpDsDot1qLayer set the DS VLAN layer
220func IgmpDsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q {
221 dot1q := &layers.Dot1Q{}
222 dot1q.Priority = priority
223 dot1q.DropEligible = false
224 dot1q.VLANIdentifier = uint16(vlan)
225 dot1q.Type = layers.EthernetTypeIPv4
226 return dot1q
227}
228
229// IgmpDsIpv4Layer set the IP layer
230func IgmpDsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 {
231 ip := &layers.IPv4{}
232 ip.Version = 4
233 ip.Protocol = layers.IPProtocolIGMP
234 ip.TTL = 1
235 ip.SrcIP = src
236 if mcip.Equal(net.ParseIP("0.0.0.0")) {
237 mcip = net.ParseIP("224.0.0.1")
238 }
239 ip.DstIP = mcip
240 return ip
241}
242
243// IgmpQueryv2Layer : IGMP Query Layer
244func IgmpQueryv2Layer(mcip net.IP, resptime time.Duration) *layers.IGMPv1or2 {
245 igmp := &layers.IGMPv1or2{}
246 igmp.Type = layers.IGMPMembershipQuery
247 igmp.GroupAddress = mcip
248 igmp.MaxResponseTime = resptime
249 return igmp
250}
251
252// IgmpQueryv3Layer : IGMP v3 Query Layer
253func IgmpQueryv3Layer(mcip net.IP, resptime time.Duration) *layers.IGMP {
254 igmp := &layers.IGMP{}
255 igmp.Type = layers.IGMPMembershipQuery
256 igmp.GroupAddress = mcip
257 igmp.MaxResponseTime = resptime
258 return igmp
259}
260
261// IgmpReportv2Layer : IGMP Layer
262func IgmpReportv2Layer(mcip net.IP) *layers.IGMPv1or2 {
263 igmp := &layers.IGMPv1or2{}
264 igmp.Type = layers.IGMPMembershipReportV2
265 igmp.GroupAddress = mcip
266 return igmp
267}
268
269// IgmpLeavev2Layer : IGMP Leave Layer
270func IgmpLeavev2Layer(mcip net.IP) *layers.IGMPv1or2 {
271 igmp := &layers.IGMPv1or2{}
272 igmp.Type = layers.IGMPLeaveGroup
273 igmp.GroupAddress = mcip
274 return igmp
275}
276
277// IgmpReportv3Layer : IGMP v3 Report Layer
278func IgmpReportv3Layer(mcip net.IP, incl bool, srclist []net.IP) *layers.IGMP {
279 // IGMP base
280 igmp := &layers.IGMP{}
281 igmp.Type = layers.IGMPMembershipReportV3
282 igmp.NumberOfGroupRecords = 1
283
284 // IGMP Group
285 group := layers.IGMPv3GroupRecord{}
286 if incl {
287 group.Type = layers.IGMPIsIn
288 } else {
289 group.Type = layers.IGMPIsEx
290 }
291 group.MulticastAddress = mcip
292 group.NumberOfSources = uint16(len(srclist))
293 group.SourceAddresses = srclist
294 igmp.GroupRecords = append(igmp.GroupRecords, group)
295
296 return igmp
297}
298
299// Igmpv2QueryPacket : IGMP Query in Downstream
300func Igmpv2QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) {
301 // Construct the layers that form the packet
302 eth := IgmpDsEthLayer(mcip)
303 dot1q := IgmpDsDot1qLayer(vlan, pbit)
304 ip := IgmpDsIpv4Layer(selfip, mcip)
305 igmp := IgmpQueryv2Layer(mcip, time.Duration(maxResp)*time.Second)
306
307 // Now prepare the buffer into which the layers are to be serialized
308 buff := gopacket.NewSerializeBuffer()
309 opts := gopacket.SerializeOptions{
310 FixLengths: true,
311 ComputeChecksums: true,
312 }
313 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
314 logger.Error(ctx, "Error in serializing layers")
315 return nil, err
316 }
317 return buff.Bytes(), nil
318}
319
320// Igmpv3QueryPacket : IGMPv3 Query in Downstream
321func Igmpv3QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) {
322 // Construct the layers that form the packet
323 eth := IgmpDsEthLayer(mcip)
324 dot1q := IgmpDsDot1qLayer(vlan, pbit)
325 ip := IgmpDsIpv4Layer(selfip, mcip)
326 igmp := IgmpQueryv3Layer(mcip, time.Duration(maxResp)*time.Second)
327
328 // Now prepare the buffer into which the layers are to be serialized
329 buff := gopacket.NewSerializeBuffer()
330 opts := gopacket.SerializeOptions{
331 FixLengths: true,
332 ComputeChecksums: true,
333 }
334 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
335 logger.Error(ctx, "Error in serializing layers")
336 return nil, err
337 }
338 return buff.Bytes(), nil
339}
340
341// IgmpReportv2Packet : Packet - IGMP v2 report in upstream
342func IgmpReportv2Packet(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) {
343 // Construct the layers that form the packet
344 eth := IgmpUsEthLayer(mcip)
345 dot1q := IgmpUsDot1qLayer(vlan, priority)
346 ip := Igmpv2UsIpv4Layer(selfip, mcip)
347 igmp := IgmpReportv2Layer(mcip)
348
349 // Now prepare the buffer into which the layers are to be serialized
350 buff := gopacket.NewSerializeBuffer()
351 opts := gopacket.SerializeOptions{
352 FixLengths: true,
353 ComputeChecksums: true,
354 }
355 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
356 logger.Error(ctx, "Error in serializing layers")
357 return nil, err
358 }
359 return buff.Bytes(), nil
360}
361
362// Igmpv3ReportPacket : Packet - IGMP v3 report in upstream
363func Igmpv3ReportPacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP, incl bool, srclist []net.IP) ([]byte, error) {
364 // Construct the layers that form the packet
365 eth := IgmpUsEthLayer(net.ParseIP("224.0.0.22").To4())
366 dot1q := IgmpUsDot1qLayer(vlan, priority)
367 ip := Igmpv3UsIpv4Layer(selfip)
368 igmp := IgmpReportv3Layer(mcip, incl, srclist)
369
370 // Now prepare the buffer into which the layers are to be serialized
371 buff := gopacket.NewSerializeBuffer()
372 opts := gopacket.SerializeOptions{
373 FixLengths: true,
374 ComputeChecksums: true,
375 }
376 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
377 logger.Error(ctx, "Error in serializing layers")
378 return nil, err
379 }
380 return buff.Bytes(), nil
381}
382
383// IgmpLeavePacket : Packet- IGMP Leave in upstream
384func IgmpLeavePacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) {
385 // Construct the layers that form the packet
386 eth := IgmpUsEthLayer(mcip)
387 dot1q := IgmpUsDot1qLayer(vlan, priority)
388 ip := Igmpv2UsIpv4Layer(selfip, mcip)
389 igmp := IgmpLeavev2Layer(mcip)
390
391 // Now prepare the buffer into which the layers are to be serialized
392 buff := gopacket.NewSerializeBuffer()
393 opts := gopacket.SerializeOptions{
394 FixLengths: true,
395 ComputeChecksums: true,
396 }
397 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
398 logger.Error(ctx, "Error in serializing layers")
399 return nil, err
400 }
401 return buff.Bytes(), nil
402}
403
404// getVersion to get igmp version type
405func getVersion(ver string) uint8 {
406 if ver == "2" || ver == "v2" {
407 return IgmpVersion2
408 }
409 return IgmpVersion3
410}
411
412// IsIPPresent is Utility to check if an IP address is in a list
413func IsIPPresent(i net.IP, ips []net.IP) bool {
414 for _, ip := range ips {
415 if i.Equal(ip) {
416 return true
417 }
418 }
419 return false
420}
421
Naveen Sampath04696f72022-06-13 15:19:14 +0530422//AddToPendingPool - adds Igmp Device obj to pending pool
423func AddToPendingPool(device string, groupKey string) bool {
424
425 logger.Infow(ctx, "Add Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupKey": groupKey})
426 if grp, ok := GetApplication().IgmpGroups.Load(groupKey); ok {
427 ig := grp.(*IgmpGroup)
428 ig.PendingPoolLock.Lock()
429 logger.Infow(ctx, "Adding Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
430 ig.PendingGroupForDevice[device] = time.Now().Add(time.Duration(GroupExpiryTime) * time.Minute)
431 ig.PendingPoolLock.Unlock()
432 if err := ig.WriteToDb(); err != nil {
433 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
434 }
435 return true
436 }
437 return false
438}
439
440/*
441func checkIfForceGroupRemove(device string) bool {
442 if d := GetApplication().GetDevice(device); d != nil {
443 if d.State == cntlr.DeviceStateREBOOTED || d.State == cntlr.DeviceStateDOWN {
444 return true
445 }
446 }
447 return false
448}*/
449
Naveen Sampath04696f72022-06-13 15:19:14 +0530450// SendQueryExpiredEventGroupSpecific to send group specific query expired event.
451func SendQueryExpiredEventGroupSpecific(portKey string, igd *IgmpGroupDevice, igc *IgmpGroupChannel) {
452
453 logger.Info(ctx, "Processing-SendQueryExpiredEventGroupSpecific-Event")
454 va := GetApplication()
455 mvpName := va.GetMvlanProfileByTag(igd.Mvlan).Name
456
457 sendEvent := func(key interface{}, value interface{}) bool {
458 if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName {
459 logger.Debugw(ctx, "sending-query-expired-group-specific-event", log.Fields{"EventType": QueryExpiredGroupSpecific, "ServiceName": value.(*VoltService).Name})
460 }
461 return false
462 }
463
464 // Fetching service name to send with query expired event.
465 vpvs, _ := va.VnetsByPort.Load(portKey)
466 if vpvs == nil {
467 logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs})
468 return
469 }
470
471 for _, vpv := range vpvs.([]*VoltPortVnet) {
472 vpv.services.Range(sendEvent)
473 }
474}
475
476// GetMcastServiceForSubAlarm to get mcast service name for subscriber alarm.
477func GetMcastServiceForSubAlarm(uniPort *VoltPort, mvp *MvlanProfile) string {
478
479 var serviceName string
480 mvpName := mvp.Name
481
482 va := GetApplication()
483
484 sendAlm := func(key interface{}, value interface{}) bool {
485 if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName {
486 serviceName = value.(*VoltService).Name
487 }
488 return true
489 }
490
491 // Fetching service name to send with active channels exceeded per subscriber alarm.
492 vpvs, _ := va.VnetsByPort.Load(uniPort.Name)
493 if vpvs == nil {
494 logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs})
495 return serviceName
496 }
497
498 for _, vpv := range vpvs.([]*VoltPortVnet) {
499 vpv.services.Range(sendAlm)
500 }
501
502 return serviceName
503
504}
505
Naveen Sampath04696f72022-06-13 15:19:14 +0530506// RestoreIgmpGroupsFromDb to restore igmp groups from database
507func (va *VoltApplication) RestoreIgmpGroupsFromDb() {
508
509 groups, _ := db.GetIgmpGroups()
510 for _, group := range groups {
511 b, ok := group.Value.([]byte)
512 if !ok {
513 logger.Warn(ctx, "The value type is not []byte")
514 continue
515 }
516 var ig IgmpGroup
517 err := json.Unmarshal(b, &ig)
518 if err != nil {
519 logger.Warn(ctx, "Unmarshal of IGMP Group failed")
520 continue
521 }
522 ig.Devices = make(map[string]*IgmpGroupDevice)
523
524 //For Upgrade Case
525 if len(ig.PendingGroupForDevice) == 0 {
526 ig.PendingGroupForDevice = make(map[string]time.Time)
527 }
528 logger.Infow(ctx, "Restoring Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan})
529 grpKey := ig.getKey()
530 va.IgmpGroups.Store(grpKey, &ig)
531 // Just delete and lose the IGMP group with the same group Id
532 if _, err := va.GetIgmpGroupID(ig.GroupID); err != nil {
533 logger.Warnw(ctx, "GetIgmpGroupID Failed", log.Fields{"igGroupID": ig.GroupID, "Error": err})
534 }
535 ig.RestoreDevices()
536
537 if ig.NumDevicesActive() == 0 {
538 va.AddGroupToPendingPool(&ig)
539 }
540 logger.Infow(ctx, "Restored Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan})
541 }
542}
543
544// AddIgmpGroup : When the first IGMP packet is received, the MVLAN profile is identified
545// for the IGMP group and grp obj is obtained from the available pending pool of groups.
546// If not, new group obj will be created based on available group IDs
547func (va *VoltApplication) AddIgmpGroup(mvpName string, gip net.IP, device string) *IgmpGroup {
548
549 var ig *IgmpGroup
550 if mvp, grpName := va.GetMvlanProfileForMcIP(mvpName, gip); mvp != nil {
551 if ig = va.GetGroupFromPendingPool(mvp.Mvlan, device); ig != nil {
552 logger.Infow(ctx, "Igmp Group obtained from global pending pool", log.Fields{"MvlanProfile": mvpName, "GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
553 oldKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
554 ig.IgmpGroupReInit(grpName, gip)
555 ig.IsGroupStatic = mvp.Groups[grpName].IsStatic
556 ig.UpdateIgmpGroup(oldKey, ig.getKey())
557 } else {
558 logger.Infow(ctx, "No Igmp Group available in global pending pool. Creating new Igmp Group", log.Fields{"MvlanProfile": mvpName, "Device": device, "GroupAddr": gip.String()})
559 if ig = va.GetAvailIgmpGroupID(); ig == nil {
560 logger.Error(ctx, "Igmp Group Creation Failed: Group Id Unavailable")
561 return nil
562 }
563 ig.IgmpGroupInit(grpName, gip, mvp)
564 grpKey := ig.getKey()
565 va.IgmpGroups.Store(grpKey, ig)
566 }
567 if err := ig.WriteToDb(); err != nil {
568 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
569 }
570 return ig
571 }
Tinoj Joseph1d108322022-07-13 10:07:39 +0530572 logger.Errorw(ctx, "GetMvlan Pro failed", log.Fields{"Group": gip})
Naveen Sampath04696f72022-06-13 15:19:14 +0530573 return nil
574}
575
576// GetIgmpGroup helps search for the IGMP group from the list of
577// active IGMP groups. For now, the assumption is that a group
578// cannot belong to more than on MVLAN. If we change that definition,
579// we have to take a relook at this implementation. The key will include
580// both MVLAN and the group IP.
581func (va *VoltApplication) GetIgmpGroup(mvlan of.VlanType, gip net.IP) *IgmpGroup {
582
583 profile, _ := va.MvlanProfilesByTag.Load(mvlan)
584 if profile == nil {
585 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": mvlan, "GroupAddr": gip.String()})
586 return nil
587 }
588 mvp := profile.(*MvlanProfile)
589 _, gName := va.GetMvlanProfileForMcIP(mvp.Name, gip)
590 grpKey := mvp.generateGroupKey(gName, gip.String())
591 logger.Debugw(ctx, "Get IGMP Group", log.Fields{"Group": grpKey})
592 igIntf, ok := va.IgmpGroups.Load(grpKey)
593 if ok {
594 logger.Debugw(ctx, "Get IGMP Group Success", log.Fields{"Group": grpKey})
595 ig := igIntf.(*IgmpGroup)
596
597 //Case: Group was part of pending and Join came with same channel or different channel from same group
598 // (from same or different device)
599 // In that case, the same group will be allocated since the group is still part of va.IgmpGroups
600 // So, the groups needs to be removed from global pending pool
601 va.RemoveGroupDevicesFromPendingPool(ig)
602 return ig
603 }
604 return nil
605}
606
Naveen Sampath04696f72022-06-13 15:19:14 +0530607// DelIgmpGroup : When the last subscriber leaves the IGMP group across all the devices
608// the IGMP group is removed.
609func (va *VoltApplication) DelIgmpGroup(ig *IgmpGroup) {
610
611 profile, found := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
612 if found {
613 mvp := profile.(*MvlanProfile)
614
615 grpKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
616
617 if igIntf, ok := va.IgmpGroups.Load(grpKey); ok {
618 ig := igIntf.(*IgmpGroup)
619 ig.IgmpGroupLock.Lock()
620 if ig.NumDevicesAll() == 0 {
621 logger.Debugw(ctx, "Deleting IGMP Group", log.Fields{"Group": grpKey})
622 va.PutIgmpGroupID(ig)
623 va.IgmpGroups.Delete(grpKey)
624 _ = db.DelIgmpGroup(grpKey)
625 } else {
626 logger.Infow(ctx, "Skipping IgmpGroup Device. Pending Igmp Group Devices present", log.Fields{"GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
627 va.AddGroupToPendingPool(ig)
628 if err := ig.WriteToDb(); err != nil {
629 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
630 }
631 }
632 ig.IgmpGroupLock.Unlock()
633 }
634
635 }
636}
637
638// GetPonPortID Gets the PON port ID from uniPortID
639func (va *VoltApplication) GetPonPortID(device, uniPortID string) uint32 {
640
641 isNNI := strings.Contains(uniPortID, "nni")
642 if isNNI || uniPortID == StaticPort {
643 logger.Debugw(ctx, "Cannot get pon port from UNI port", log.Fields{"port": uniPortID})
644 return 0xFF
645 }
646 dIntf, ok := va.DevicesDisc.Load(device)
647 if !ok {
648 return 0xFF
649 }
650 d := dIntf.(*VoltDevice)
651
652 uniPort := d.GetPort(uniPortID)
653 if uniPort == nil {
654 return 0xFF
655 }
656 return GetPonPortIDFromUNIPort(uniPort.ID)
657}
658
659// AggActiveChannelsCountPerSub aggregates the active channel count for given uni port.
660// It will iterate over all the groups and store the sum of active channels in VoltPort.
661func (va *VoltApplication) AggActiveChannelsCountPerSub(device, uniPort string, port *VoltPort) {
662 var activeChannelCount uint32
663
664 collectActiveChannelCount := func(key interface{}, value interface{}) bool {
665 ig := value.(*IgmpGroup)
666 igd := ig.Devices[device]
667 if igd == nil {
668 return true
669 }
670 if portChannels, ok := igd.PortChannelMap.Load(uniPort); ok {
671 channelList := portChannels.([]net.IP)
672 activeChannelCount += uint32(len(channelList))
673 }
674 return true
675 }
676 va.IgmpGroups.Range(collectActiveChannelCount)
677
678 logger.Debugw(ctx, "AggrActiveChannelCount for Subscriber",
679 log.Fields{"UniPortID": uniPort, "count": activeChannelCount})
680
681 port.ActiveChannels = activeChannelCount
682}
683
684// AggActiveChannelsCountForPonPort Aggregates the active channel count for given pon port.
685// It will iterate over all the groups and store the sum of active channels in VoltDevice.
686func (va *VoltApplication) AggActiveChannelsCountForPonPort(device string, ponPortID uint32, port *PonPortCfg) {
687
688 var activeChannelCount uint32
689
690 collectActiveChannelCount := func(key interface{}, value interface{}) bool {
691 ig := value.(*IgmpGroup)
692 igd := ig.Devices[device]
693 if igd == nil {
694 return true
695 }
696 if ponPortChannels, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
697 activeChannelCount += ponPortChannels.(*PonPortChannels).GetActiveChannelCount()
698 }
699 return true
700 }
701 va.IgmpGroups.Range(collectActiveChannelCount)
702
703 logger.Debugw(ctx, "AggrActiveChannelCount for Pon Port",
704 log.Fields{"PonPortID": ponPortID, "count": activeChannelCount})
705
706 port.ActiveIGMPChannels = activeChannelCount
707}
708
709// UpdateActiveChannelCountForPonPort increments the global counter for active
710// channel count per pon port.
711func (va *VoltApplication) UpdateActiveChannelCountForPonPort(device, uniPortID string, ponPortID uint32, isAdd, isChannel bool, igd *IgmpGroupDevice) {
712 incrDecr := func(value uint32) uint32 {
713 if isAdd {
714 return value + 1
715 }
716 return value - 1
717 }
718 if d, exists := va.DevicesDisc.Load(device); exists {
719 voltDevice := d.(*VoltDevice)
720
721 if isChannel {
722 voltDevice.ActiveChannelCountLock.Lock()
723 // If New channel is added/deleted, then only update the ActiveChannelsPerPon
724 if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok {
725 port := value.(*PonPortCfg)
726 port.ActiveIGMPChannels = incrDecr(port.ActiveIGMPChannels)
727 voltDevice.ActiveChannelsPerPon.Store(ponPortID, port)
728 logger.Debugw(ctx, "+++ActiveChannelsPerPon", log.Fields{"count": port.ActiveIGMPChannels}) // TODO: remove me
729 }
730 voltDevice.ActiveChannelCountLock.Unlock()
731 }
732 if uPort, ok := voltDevice.Ports.Load(uniPortID); ok {
733 uniPort := uPort.(*VoltPort)
734 uniPort.ActiveChannels = incrDecr(uniPort.ActiveChannels)
735 voltDevice.Ports.Store(uniPortID, uniPort)
736 logger.Debugw(ctx, "+++ActiveChannelsPerSub", log.Fields{"count": uniPort.ActiveChannels}) // TODO: remove me
737 }
738 }
739}
740
741// IsMaxChannelsCountExceeded checks if the PON port active channel
742// capacity and subscriber level channel capacity is reached to max allowed
743// channel per pon threshold. If Exceeds, return true else return false.
744func (va *VoltApplication) IsMaxChannelsCountExceeded(device, uniPortID string,
745 ponPortID uint32, ig *IgmpGroup, channelIP net.IP, mvp *MvlanProfile) bool {
746
747 // New receiver check is required to identify the IgmpReportMsg received
748 // in response to the IGMP Query sent from VGC.
749 if newReceiver := ig.IsNewReceiver(device, uniPortID, channelIP); !newReceiver {
750 logger.Debugw(ctx, "Not a new receiver. It is a response to IGMP Query",
751 log.Fields{"port": uniPortID, "channel": channelIP})
752 return false
753 }
754
755 if vDev, exists := va.DevicesDisc.Load(device); exists {
756 voltDevice := vDev.(*VoltDevice)
757
758 // Checking subscriber active channel count with maxChannelsAllowedPerSub
759 if uniPort, present := voltDevice.Ports.Load(uniPortID); present {
760 if uniPort.(*VoltPort).ActiveChannels >= mvp.MaxActiveChannels {
761 logger.Errorw(ctx, "Max allowed channels per subscriber is exceeded",
762 log.Fields{"activeCount": uniPort.(*VoltPort).ActiveChannels, "channel": channelIP, "UNI": uniPort.(*VoltPort).Name})
763 if !(uniPort.(*VoltPort).ChannelPerSubAlarmRaised) {
764 serviceName := GetMcastServiceForSubAlarm(uniPort.(*VoltPort), mvp)
765 logger.Debugw(ctx, "Raising-SendActiveChannelPerSubscriberAlarm-Initiated", log.Fields{"ActiveChannels": uniPort.(*VoltPort).ActiveChannels, "ServiceName": serviceName})
766 uniPort.(*VoltPort).ChannelPerSubAlarmRaised = true
767 }
768 return true
769 }
770 } else {
771 logger.Errorw(ctx, "UNI port not found in VoltDevice", log.Fields{"uniPortID": uniPortID})
772 }
773 if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok {
774 ponPort := value.(*PonPortCfg)
775
776 logger.Debugw(ctx, "----Active channels count for PON port",
777 log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels,
778 "maxAllowedChannelsPerPon": ponPort.MaxActiveChannels})
779
780 if ponPort.ActiveIGMPChannels < ponPort.MaxActiveChannels {
781 // PON port active channel capacity is not yet reached to max allowed channels per pon.
782 // So allowing to add receiver.
783 return false
784 } else if ponPort.ActiveIGMPChannels >= ponPort.MaxActiveChannels && ig != nil {
785 // PON port active channel capacity is reached to max allowed channels per pon.
786 // Check if same channel is already configured on that PON port.
787 // If that channel is present, then allow AddReceiver else it will be rejected.
788 igd, isPresent := ig.Devices[device]
789 if isPresent {
790 if channelListForPonPort, _ := igd.PonPortChannelMap.Get(ponPortID); channelListForPonPort != nil {
791 if _, isExists := channelListForPonPort.(*PonPortChannels).ChannelList.Get(channelIP.String()); isExists {
792 return false
793 }
794 }
795 }
796 }
797 logger.Errorw(ctx, "Active channels count for PON port exceeded",
798 log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels, "channel": channelIP, "UNI": uniPortID})
799 } else {
800 logger.Warnw(ctx, "PON port level active channel count does not exists",
801 log.Fields{"ponPortID": ponPortID})
802 return false
803 }
804 }
805 logger.Warnw(ctx, "Max allowed channels per pon threshold is reached", log.Fields{"PonPortID": ponPortID})
806 return true
807}
808
809// ProcessIgmpv2Pkt : This is IGMPv2 packet.
810func (va *VoltApplication) ProcessIgmpv2Pkt(device string, port string, pkt gopacket.Packet) {
811 // First get the layers of interest
812 dot1Q := pkt.Layer(layers.LayerTypeDot1Q).(*layers.Dot1Q)
813 pktVlan := of.VlanType(dot1Q.VLANIdentifier)
814 igmpv2 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMPv1or2)
815
816 ponPortID := va.GetPonPortID(device, port)
817
818 var vpv *VoltPortVnet
819
820 logger.Debugw(ctx, "Received IGMPv2 Type", log.Fields{"Type": igmpv2.Type})
821
822 if igmpv2.Type == layers.IGMPMembershipReportV2 || igmpv2.Type == layers.IGMPMembershipReportV1 {
823
824 logger.Infow(ctx, "IGMP Join received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port})
825
826 // This is a report coming from the PON. We must be able to first find the
827 // subscriber from the VLAN tag and port and verify if the IGMP proxy is
828 // enabled for the subscriber
829 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
830
831 if vpv == nil {
832 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
833 return
834 } else if !vpv.IgmpEnabled {
835 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
836 return
837 }
838
839 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
840 if mvp == nil {
841 logger.Errorw(ctx, "Igmp Packet Received for Subscriber with Missing Mvlan Profile",
842 log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName})
843 return
844 }
845 mvlan := mvp.Mvlan
846
847 mvp.mvpLock.RLock()
848 defer mvp.mvpLock.RUnlock()
849 // The subscriber is validated and now process the IGMP report
850 ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress)
851
852 if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, igmpv2.GroupAddress, mvp); yes {
853 logger.Warnw(ctx, "Dropping IGMP Join v2: Active channel threshold exceeded",
854 log.Fields{"PonPortID": ponPortID, "Addr": igmpv2.GroupAddress, "MvlanProfile": vpv.MvlanProfileName})
855 return
856 }
857 if ig != nil {
858 logger.Infow(ctx, "IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices})
859 // If the IGMP group is already created. just add the receiver
860 ig.IgmpGroupLock.Lock()
861 // Check for port state to avoid race condition where PortDown event
862 // acquired lock before packet processing
863 vd := GetApplication().GetDevice(device)
864 vp := vd.GetPort(port)
865 if vp == nil || vp.State != PortStateUp {
866 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
867 log.Fields{"Port": port})
868 ig.IgmpGroupLock.Unlock()
869 return
870 }
871 ig.AddReceiver(device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
872 ig.IgmpGroupLock.Unlock()
873 } else {
874 // Create the IGMP group and then add the receiver to the group
875 if ig := va.AddIgmpGroup(vpv.MvlanProfileName, igmpv2.GroupAddress, device); ig != nil {
876 logger.Infow(ctx, "New IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices})
877 ig.IgmpGroupLock.Lock()
878 // Check for port state to avoid race condition where PortDown event
879 // acquired lock before packet processing
880 vd := GetApplication().GetDevice(device)
881 vp := vd.GetPort(port)
882 if vp == nil || vp.State != PortStateUp {
883 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
884 log.Fields{"Port": port})
885 ig.IgmpGroupLock.Unlock()
886 return
887 }
888 ig.AddReceiver(device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
889 ig.IgmpGroupLock.Unlock()
890 } else {
891 logger.Errorw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": igmpv2.GroupAddress})
892 return
893 }
894 }
895 } else if igmpv2.Type == layers.IGMPLeaveGroup {
896 // This is a IGMP leave coming from one of the receivers. We essentially remove the
897 // the receiver.
898 logger.Infow(ctx, "IGMP Leave received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port})
899
900 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
901 if vpv == nil {
902 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
903 return
904 } else if !vpv.IgmpEnabled {
905 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
906 return
907 }
908
909 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
910 mvp.mvpLock.RLock()
911 defer mvp.mvpLock.RUnlock()
912 mvlan := mvp.Mvlan
913 // The subscriber is validated and now process the IGMP report
914 if ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress); ig != nil {
915 ig.IgmpGroupLock.Lock()
916 // Delete the receiver once the IgmpGroup is identified
917 ig.DelReceiver(device, port, igmpv2.GroupAddress, nil, ponPortID)
918 ig.IgmpGroupLock.Unlock()
919 if ig.NumDevicesActive() == 0 {
920 va.DelIgmpGroup(ig)
921 }
922 }
923 } else {
924 // This must be a query on the NNI port. However, we dont make that assumption.
925 // Need to look for the IGMP group based on the VLAN in the packet as
926 // the MVLAN
927
928 //Check if mvlan profile exist for the incoming pkt vlan
929 profile, _ := va.MvlanProfilesByTag.Load(pktVlan)
930 if profile == nil {
931 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan})
932 return
933 }
934 mvp := profile.(*MvlanProfile)
935 mvp.mvpLock.RLock()
936 defer mvp.mvpLock.RUnlock()
937
938 if net.ParseIP("0.0.0.0").Equal(igmpv2.GroupAddress) {
939 va.processIgmpQueries(device, pktVlan, IgmpVersion2)
940 } else {
941 if ig := va.GetIgmpGroup(pktVlan, igmpv2.GroupAddress); ig != nil {
942 ig.IgmpGroupLock.Lock()
943 igd, ok := ig.Devices[device]
944 if ok {
945 igd.ProcessQuery(igmpv2.GroupAddress, IgmpVersion2)
946 } else {
947 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv2.GroupAddress})
948 }
949 ig.IgmpGroupLock.Unlock()
950 }
951 }
952 }
953}
954
955// ProcessIgmpv3Pkt : Process IGMPv3 packet
956func (va *VoltApplication) ProcessIgmpv3Pkt(device string, port string, pkt gopacket.Packet) {
957 // First get the layers of interest
958 dot1QLayer := pkt.Layer(layers.LayerTypeDot1Q)
959
960 if dot1QLayer == nil {
961 logger.Error(ctx, "Igmp Packet Received without Vlan - Dropping pkt")
962 return
963 }
964 dot1Q := dot1QLayer.(*layers.Dot1Q)
965 pktVlan := of.VlanType(dot1Q.VLANIdentifier)
966 igmpv3 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMP)
967
968 ponPortID := va.GetPonPortID(device, port)
969
970 var vpv *VoltPortVnet
971 logger.Debugw(ctx, "Received IGMPv3 Type", log.Fields{"Type": igmpv3.Type})
972
973 if igmpv3.Type == layers.IGMPMembershipReportV3 {
974 // This is a report coming from the PON. We must be able to first find the
975 // subscriber from the VLAN tag and port and verify if the IGMP proxy is
976 // enabled for the subscriber
977 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
978 if vpv == nil {
979 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
980 return
981 } else if !vpv.IgmpEnabled {
982 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
983 return
984 }
985 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
986 if mvp == nil {
987 logger.Errorw(ctx, "Igmp Packet received for Subscriber with Missing Mvlan Profile",
988 log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName})
989 return
990 }
991 mvp.mvpLock.RLock()
992 defer mvp.mvpLock.RUnlock()
993 mvlan := mvp.Mvlan
994
995 for _, group := range igmpv3.GroupRecords {
996
997 isJoin := isIgmpJoin(group.Type, group.SourceAddresses)
998 // The subscriber is validated and now process the IGMP report
999 ig := va.GetIgmpGroup(mvlan, group.MulticastAddress)
1000 if isJoin {
1001 if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, group.MulticastAddress, mvp); yes {
1002 logger.Warnw(ctx, "Dropping IGMP Join v3: Active channel threshold exceeded",
1003 log.Fields{"PonPortID": ponPortID, "Addr": group.MulticastAddress, "MvlanProfile": vpv.MvlanProfileName})
1004
1005 return
1006 }
1007 if ig != nil {
1008 // If the IGMP group is already created. just add the receiver
1009 logger.Infow(ctx, "IGMP Join received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
1010 ig.IgmpGroupLock.Lock()
1011 // Check for port state to avoid race condition where PortDown event
1012 // acquired lock before packet processing
1013 vd := GetApplication().GetDevice(device)
1014 vp := vd.GetPort(port)
1015 if vp == nil || vp.State != PortStateUp {
1016 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
1017 log.Fields{"Port": port})
1018 ig.IgmpGroupLock.Unlock()
1019 return
1020 }
1021 ig.AddReceiver(device, port, group.MulticastAddress, &group, IgmpVersion3,
1022 dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
1023 ig.IgmpGroupLock.Unlock()
1024 } else {
1025 // Create the IGMP group and then add the receiver to the group
1026 logger.Infow(ctx, "IGMP Join received for new group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
1027 if ig := va.AddIgmpGroup(vpv.MvlanProfileName, group.MulticastAddress, device); ig != nil {
1028 ig.IgmpGroupLock.Lock()
1029 // Check for port state to avoid race condition where PortDown event
1030 // acquired lock before packet processing
1031 vd := GetApplication().GetDevice(device)
1032 vp := vd.GetPort(port)
1033 if vp == nil || vp.State != PortStateUp {
1034 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
1035 log.Fields{"Port": port})
1036 ig.IgmpGroupLock.Unlock()
1037 return
1038 }
1039 ig.AddReceiver(device, port, group.MulticastAddress, &group, IgmpVersion3,
1040 dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
1041 ig.IgmpGroupLock.Unlock()
1042 } else {
1043 logger.Warnw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": group.MulticastAddress})
1044 }
1045 }
1046 } else if ig != nil {
1047 logger.Infow(ctx, "IGMP Leave received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
1048 ig.IgmpGroupLock.Lock()
1049 ig.DelReceiver(device, port, group.MulticastAddress, &group, ponPortID)
1050 ig.IgmpGroupLock.Unlock()
1051 if ig.NumDevicesActive() == 0 {
1052 va.DelIgmpGroup(ig)
1053 }
1054 } else {
1055 logger.Warnw(ctx, "IGMP Leave received for unknown group", log.Fields{"Addr": group.MulticastAddress})
1056 }
1057 }
1058 } else {
1059 // This must be a query on the NNI port. However, we dont make that assumption.
1060 // Need to look for the IGMP group based on the VLAN in the packet as
1061 // the MVLAN
1062
1063 //Check if mvlan profile exist for the incoming pkt vlan
1064 profile, _ := va.MvlanProfilesByTag.Load(pktVlan)
1065 if profile == nil {
1066 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan})
1067 return
1068 }
1069 mvp := profile.(*MvlanProfile)
1070 mvp.mvpLock.RLock()
1071 defer mvp.mvpLock.RUnlock()
1072
1073 if net.ParseIP("0.0.0.0").Equal(igmpv3.GroupAddress) {
1074 va.processIgmpQueries(device, pktVlan, IgmpVersion3)
1075 } else {
1076 if ig := va.GetIgmpGroup(pktVlan, igmpv3.GroupAddress); ig != nil {
1077 ig.IgmpGroupLock.Lock()
1078 igd, ok := ig.Devices[device]
1079 if ok {
1080 igd.ProcessQuery(igmpv3.GroupAddress, IgmpVersion3)
1081 } else {
1082 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv3.GroupAddress})
1083 }
1084 ig.IgmpGroupLock.Unlock()
1085 }
1086 }
1087 }
1088}
1089
1090// processIgmpQueries to process the igmp queries
1091func (va *VoltApplication) processIgmpQueries(device string, pktVlan of.VlanType, version uint8) {
1092 // This is a generic query and respond with all the groups channels in currently being viewed.
1093 processquery := func(key interface{}, value interface{}) bool {
1094 ig := value.(*IgmpGroup)
1095 ig.IgmpGroupLock.Lock()
1096 if ig.Mvlan != pktVlan {
1097 ig.IgmpGroupLock.Unlock()
1098 return true
1099 }
1100 igd, ok := ig.Devices[device]
1101 if !ok {
1102 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device})
1103 ig.IgmpGroupLock.Unlock()
1104 return true
1105 }
1106 processQueryForEachChannel := func(key interface{}, value interface{}) bool {
1107 groupAddr := key.(string)
1108 igd.ProcessQuery(net.ParseIP(groupAddr), version)
1109 return true
1110 }
1111 igd.GroupChannels.Range(processQueryForEachChannel)
1112 ig.IgmpGroupLock.Unlock()
1113 return true
1114 }
1115 va.IgmpGroups.Range(processquery)
1116}
1117
1118// isIgmpJoin to check if it is igmp join
1119func isIgmpJoin(recordType layers.IGMPv3GroupRecordType, sourceAddr []net.IP) bool {
1120 var join = false
1121
1122 if (layers.IGMPToEx == recordType) || (layers.IGMPIsEx == recordType) {
1123 join = true
1124 } else if layers.IGMPBlock == recordType {
1125 if len(sourceAddr) == 0 {
1126 join = true
1127 }
1128 } else if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) {
1129 if len(sourceAddr) != 0 {
1130 join = true
1131 }
1132 }
1133 return join
1134}
1135
1136func isIncl(recordType layers.IGMPv3GroupRecordType) bool {
1137
1138 if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) {
1139 return true
1140 }
1141 return false
1142}
1143
1144// IgmpProcessPkt to process the IGMP packet received. The packet received brings along with it
1145// the port on which the packet is received and the device the port is in.
1146func (va *VoltApplication) IgmpProcessPkt(device string, port string, pkt gopacket.Packet) {
1147 igmpl := pkt.Layer(layers.LayerTypeIGMP)
1148 if igmpl == nil {
1149 logger.Error(ctx, "Invalid IGMP packet arrived as IGMP packet")
1150 return
1151 }
1152 if igmp, ok := igmpl.(*layers.IGMPv1or2); ok {
1153 // This is an IGMPv2 packet.
1154 logger.Debugw(ctx, "IGMPv2 Packet Received", log.Fields{"IPAddr": igmp.GroupAddress})
1155 va.ProcessIgmpv2Pkt(device, port, pkt)
1156 return
1157 }
1158 if igmpv3, ok := igmpl.(*layers.IGMP); ok {
1159 logger.Debugw(ctx, "IGMPv3 Packet Received", log.Fields{"NumOfGroups": igmpv3.NumberOfGroupRecords})
1160 va.ProcessIgmpv3Pkt(device, port, pkt)
1161 }
1162}
1163
1164// IgmpPacketInd for igmp packet indication
1165func (va *VoltApplication) IgmpPacketInd(device string, port string, pkt gopacket.Packet) {
1166 pt := NewIgmpPacketTask(device, port, pkt)
1167 va.IgmpTasks.AddTask(pt)
1168}
1169
Naveen Sampath04696f72022-06-13 15:19:14 +05301170// storeMvlansMap to store mvlan map
1171func (va *VoltApplication) storeMvlansMap(mvlan of.VlanType, name string, mvp *MvlanProfile) {
1172 va.MvlanProfilesByTag.Store(mvlan, mvp)
1173 va.MvlanProfilesByName.Store(name, mvp)
1174}
1175
1176// deleteMvlansMap to delete mvlan map
1177func (va *VoltApplication) deleteMvlansMap(mvlan of.VlanType, name string) {
1178 va.MvlanProfilesByTag.Delete(mvlan)
1179 va.MvlanProfilesByName.Delete(name)
1180}
1181
1182// RestoreMvlansFromDb to read from the DB and restore all the MVLANs
1183func (va *VoltApplication) RestoreMvlansFromDb() {
1184 mvlans, _ := db.GetMvlans()
1185 for _, mvlan := range mvlans {
1186 b, ok := mvlan.Value.([]byte)
1187 if !ok {
1188 logger.Warn(ctx, "The value type is not []byte")
1189 continue
1190 }
1191 var mvp MvlanProfile
1192 err := json.Unmarshal(b, &mvp)
1193 if err != nil {
1194 logger.Warn(ctx, "Unmarshal of MVLAN failed")
1195 continue
1196 }
1197 va.storeMvlansMap(mvp.Mvlan, mvp.Name, &mvp)
1198
1199 for srNo := range mvp.DevicesList {
1200 if mvp.IgmpServVersion[srNo] == nil {
1201 servVersion := IgmpVersion0
1202 mvp.IgmpServVersion[srNo] = &servVersion
1203 }
1204 }
1205 logger.Infow(ctx, "Restored Mvlan Profile", log.Fields{"MVPName": mvp.Name})
1206 }
1207}
1208
1209// GetMvlanProfileByTag fetches MVLAN profile based on the MC VLAN
1210func (va *VoltApplication) GetMvlanProfileByTag(vlan of.VlanType) *MvlanProfile {
1211 if mvp, ok := va.MvlanProfilesByTag.Load(vlan); ok {
1212 return mvp.(*MvlanProfile)
1213 }
1214 return nil
1215}
1216
1217// GetMvlanProfileByName fetches MVLAN profile based on the profile name.
1218func (va *VoltApplication) GetMvlanProfileByName(name string) *MvlanProfile {
1219 if mvp, ok := va.MvlanProfilesByName.Load(name); ok {
1220 return mvp.(*MvlanProfile)
1221 }
1222 return nil
1223}
1224
1225//UpdateMvlanProfile - only channel groups be updated
1226func (va *VoltApplication) UpdateMvlanProfile(name string, vlan of.VlanType, groups map[string][]string, activeChannelCount int, proxy map[string]common.MulticastGroupProxy) error {
1227
1228 mvpIntf, ok := va.MvlanProfilesByName.Load(name)
1229 if !ok {
1230 logger.Error(ctx, "Update Mvlan Failed: Profile does not exist")
1231 return errors.New("MVLAN profile not found")
1232 }
1233 mvp := mvpIntf.(*MvlanProfile)
1234 // check if groups are same then just update the OLTSerial numbers, push the config on new serial numbers
1235
1236 existingGroup := mvp.Groups
1237 existingProxy := mvp.Proxy
1238 mvp.Groups = make(map[string]*MvlanGroup)
1239 mvp.Proxy = make(map[string]*MCGroupProxy)
1240
1241 /* Need to protect groups and proxy write lock */
1242 mvp.mvpLock.Lock()
1243 for grpName, grpIPList := range groups {
1244 mvp.AddMvlanGroup(grpName, grpIPList)
1245 }
1246 for grpName, proxyInfo := range proxy {
1247 mvp.AddMvlanProxy(grpName, proxyInfo)
1248 }
1249 if _, ok := mvp.Groups[common.StaticGroup]; ok {
1250 if _, yes := mvp.Proxy[common.StaticGroup]; !yes {
1251 mvp.Groups[common.StaticGroup].IsStatic = true
1252 }
1253 }
1254 prevMaxActiveChannels := mvp.MaxActiveChannels
1255 if reflect.DeepEqual(mvp.Groups, existingGroup) && reflect.DeepEqual(mvp.Proxy, existingProxy) {
1256 logger.Info(ctx, "No change in groups config")
1257 if uint32(activeChannelCount) != mvp.MaxActiveChannels {
1258 mvp.MaxActiveChannels = uint32(activeChannelCount)
1259 if err := mvp.WriteToDb(); err != nil {
1260 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1261 }
1262 if prevMaxActiveChannels != mvp.MaxActiveChannels {
1263 mvp.UpdateActiveChannelSubscriberAlarm()
1264 }
1265 }
1266 mvp.mvpLock.Unlock()
1267 return nil
1268 }
1269 mvp.mvpLock.Unlock()
1270 mvp.MaxActiveChannels = uint32(activeChannelCount)
1271
1272 // Status is maintained so that in the event of any crash or reboot during update,
1273 // the recovery is possible once the pod is UP again
1274 mvp.SetUpdateStatus("", UpdateInProgress)
1275 mvp.oldGroups = existingGroup
1276 mvp.oldProxy = existingProxy
1277 va.storeMvlansMap(vlan, name, mvp)
1278 if err := mvp.WriteToDb(); err != nil {
1279 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1280 }
1281 if prevMaxActiveChannels != mvp.MaxActiveChannels {
1282 mvp.UpdateActiveChannelSubscriberAlarm()
1283 }
1284
1285 // The update task is added as part of Igm p task list, so that any parallel igmp pkt processing is avoided
1286 // Until, the update operation is completed, the igmp pkt processing will be enqueued
1287 updateTask := NewUpdateMvlanTask(mvp, "")
1288 va.IgmpTasks.AddTask(updateTask)
1289 return nil
1290}
1291
1292// isDeviceInList to check if device is the list
1293func isDeviceInList(serialNum string, OLTSerialNums []string) bool {
1294 for _, oltSerialNum := range OLTSerialNums {
1295 if serialNum == oltSerialNum {
1296 return true
1297 }
1298 }
1299 return false
1300}
1301
1302// McastConfigKey creates the key using the olt serial number and mvlan profile id
1303func McastConfigKey(oltSerialNum string, mvlanProfID string) string {
1304 return oltSerialNum + "_" + mvlanProfID
1305}
1306
1307// GetMcastConfig to get McastConfig Information by OLT and Mvlan Profile ID
1308func (va *VoltApplication) GetMcastConfig(oltSerialNum string, mvlanProfID string) *McastConfig {
1309 if mc, ok := va.McastConfigMap.Load(McastConfigKey(oltSerialNum, mvlanProfID)); ok {
1310 return mc.(*McastConfig)
1311 }
1312 return nil
1313}
1314
1315func (va *VoltApplication) storeMcastConfig(oltSerialNum string, mvlanProfID string, mcastConfig *McastConfig) {
1316 va.McastConfigMap.Store(McastConfigKey(oltSerialNum, mvlanProfID), mcastConfig)
1317}
1318
1319func (va *VoltApplication) deleteMcastConfig(oltSerialNum string, mvlanProfID string) {
1320 va.McastConfigMap.Delete(McastConfigKey(oltSerialNum, mvlanProfID))
1321}
1322
1323// AddMcastConfig for addition of a MVLAN profile
1324func (va *VoltApplication) AddMcastConfig(MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error {
1325 var mcastCfg *McastConfig
1326
1327 mcastCfg = va.GetMcastConfig(OltSerialNum, MvlanProfileID)
1328 if mcastCfg == nil {
1329 mcastCfg = &McastConfig{}
1330 } else {
1331 logger.Debugw(ctx, "Mcast Config already exists", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum,
1332 "MVLAN Profile ID": mcastCfg.MvlanProfileID})
1333 }
1334
1335 // Update all igds available
1336 mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID)
1337 if !ok {
1338 return errors.New("MVLAN profile not found during add mcast config")
1339 }
1340 mvlan := mvpIntf.(*MvlanProfile).Mvlan
1341
1342 mcastCfg.OltSerialNum = OltSerialNum
1343 mcastCfg.MvlanProfileID = MvlanProfileID
1344 mcastCfg.IgmpProfileID = IgmpProfileID
1345 mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP)
1346
1347 proxyCfg := va.getIgmpProfileMap(IgmpProfileID)
1348
1349 iterIgmpGroups := func(key interface{}, value interface{}) bool {
1350 ig := value.(*IgmpGroup)
1351 if ig.Mvlan != mvlan {
1352 return true
1353 }
1354
1355 for _, igd := range ig.Devices {
1356 if igd.SerialNo != OltSerialNum {
1357 continue
1358 }
1359 igd.proxyCfg = proxyCfg
1360 if IgmpProfileID == "" {
1361 igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP
1362 } else {
1363 igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP
1364 }
1365 mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
1366 logger.Debugw(ctx, "Igd updated with proxyCfg and proxyIP", log.Fields{"name": igd.GroupName,
1367 "IgmpProfileID": IgmpProfileID, "ProxyIP": mcastCfg.IgmpProxyIP})
1368 }
1369 return true
1370 }
1371 va.IgmpGroups.Range(iterIgmpGroups)
1372
1373 va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg)
1374 if err := mcastCfg.WriteToDb(); err != nil {
1375 logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID})
1376 }
1377 va.addOltToMvlan(MvlanProfileID, OltSerialNum)
1378
1379 return nil
1380}
1381
1382func (va *VoltApplication) addOltToMvlan(MvlanProfileID string, OltSerialNum string) {
1383 var mvp *MvlanProfile
1384 if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok {
1385 servVersion := IgmpVersion0
1386 mvp = mvpIntf.(*MvlanProfile)
1387 mvp.DevicesList[OltSerialNum] = NoOp
1388 mvp.IgmpServVersion[OltSerialNum] = &servVersion
1389 if err := mvp.WriteToDb(); err != nil {
1390 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1391 }
1392 mvp.pushIgmpMcastFlows(OltSerialNum)
1393 }
1394}
1395
1396func (va *VoltApplication) delOltFromMvlan(MvlanProfileID string, OltSerialNum string) {
1397 var mvp *MvlanProfile
1398 if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok {
1399 mvp = mvpIntf.(*MvlanProfile)
1400 //Delete from mvp list
1401 mvp.removeIgmpMcastFlows(OltSerialNum)
1402 delete(mvp.DevicesList, OltSerialNum)
1403 if err := mvp.WriteToDb(); err != nil {
1404 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1405 }
1406 }
1407}
1408
1409// DelMcastConfig for addition of a MVLAN profile
1410func (va *VoltApplication) DelMcastConfig(MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) {
1411
1412 va.delOltFromMvlan(MvlanProfileID, OltSerialNum)
1413 va.deleteMcastConfig(OltSerialNum, MvlanProfileID)
1414 _ = db.DelMcastConfig(McastConfigKey(OltSerialNum, MvlanProfileID))
1415 if d := va.GetDeviceBySerialNo(OltSerialNum); d != nil {
1416 if mvp := va.GetMvlanProfileByName(MvlanProfileID); mvp != nil {
1417 va.RemoveGroupsFromPendingPool(d.Name, mvp.Mvlan)
1418 }
1419 }
1420}
1421
1422// DelAllMcastConfig for deletion of all mcast config
1423func (va *VoltApplication) DelAllMcastConfig(OltSerialNum string) error {
1424
1425 deleteIndividualMcastConfig := func(key interface{}, value interface{}) bool {
1426 mcastCfg := value.(*McastConfig)
1427 if mcastCfg.OltSerialNum == OltSerialNum {
1428 va.DelMcastConfig(mcastCfg.MvlanProfileID, mcastCfg.IgmpProfileID, mcastCfg.IgmpProxyIP.String(), mcastCfg.OltSerialNum)
1429 }
1430 return true
1431 }
1432 va.McastConfigMap.Range(deleteIndividualMcastConfig)
1433 return nil
1434}
1435
1436// UpdateMcastConfig for addition of a MVLAN profile
1437func (va *VoltApplication) UpdateMcastConfig(MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error {
1438
1439 mcastCfg := va.GetMcastConfig(OltSerialNum, MvlanProfileID)
1440 if mcastCfg == nil {
1441 logger.Warnw(ctx, "Mcast Config not found. Unable to update", log.Fields{"Mvlan Profile ID": MvlanProfileID, "OltSerialNum": OltSerialNum})
1442 return nil
1443 }
1444
1445 oldProfID := mcastCfg.IgmpProfileID
1446 mcastCfg.IgmpProfileID = IgmpProfileID
1447 mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP)
1448
1449 va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg)
1450
1451 // Update all igds
1452 if oldProfID != mcastCfg.IgmpProfileID {
1453 updateIgdProxyCfg := func(key interface{}, value interface{}) bool {
1454 igd := value.(*IgmpGroupDevice)
1455 igd.proxyCfg = va.getIgmpProfileMap(mcastCfg.IgmpProfileID)
1456 if IgmpProfileID == "" {
1457 igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP
1458 } else {
1459 igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP
1460 }
1461 return true
1462 }
1463 mcastCfg.IgmpGroupDevices.Range(updateIgdProxyCfg)
1464 }
1465
1466 if err := mcastCfg.WriteToDb(); err != nil {
1467 logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID})
1468 }
1469
1470 return nil
1471}
1472
1473// WriteToDb is utility to write Mcast config Info to database
1474func (mc *McastConfig) WriteToDb() error {
1475 mc.Version = database.PresentVersionMap[database.McastConfigPath]
1476 b, err := json.Marshal(mc)
1477 if err != nil {
1478 return err
1479 }
1480 if err1 := db.PutMcastConfig(McastConfigKey(mc.OltSerialNum, mc.MvlanProfileID), string(b)); err1 != nil {
1481 return err1
1482 }
1483 return nil
1484}
1485
1486// RestoreMcastConfigsFromDb to read from the DB and restore Mcast configs
1487func (va *VoltApplication) RestoreMcastConfigsFromDb() {
1488 mcastConfigs, _ := db.GetMcastConfigs()
1489 for hash, mcastConfig := range mcastConfigs {
1490 b, ok := mcastConfig.Value.([]byte)
1491 if !ok {
1492 logger.Warn(ctx, "The value type is not []byte")
1493 continue
1494 }
1495 var mc McastConfig
1496 err := json.Unmarshal(b, &mc)
1497 if err != nil {
1498 logger.Warn(ctx, "Unmarshal of Mcast config failed")
1499 continue
1500 }
1501 va.storeMcastConfig(mc.OltSerialNum, mc.MvlanProfileID, &mc)
1502 logger.Infow(ctx, "Restored Mcast config", log.Fields{"OltSerialNum": mc.OltSerialNum, "MvlanProfileID": mc.MvlanProfileID, "hash": hash})
1503 }
1504}
1505
1506// AddMvlanProfile for addition of a MVLAN profile
1507func (va *VoltApplication) AddMvlanProfile(name string, mvlan of.VlanType, ponVlan of.VlanType,
1508 groups map[string][]string, isChannelBasedGroup bool, OLTSerialNum []string, activeChannelsPerPon int, proxy map[string]common.MulticastGroupProxy) error {
1509 var mvp *MvlanProfile
1510
1511 if mvp = va.GetMvlanProfileByTag(mvlan); mvp != nil {
1512 logger.Errorw(ctx, "Duplicate MVLAN ID configured", log.Fields{"mvlan": mvlan})
1513 return errors.New("MVLAN profile with same VLANID exists")
1514 }
1515 if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok {
1516 mvp = mvpIntf.(*MvlanProfile)
1517 for _, serialNum := range OLTSerialNum {
1518 if mvp.DevicesList[serialNum] != Nil {
1519 //This is backup restore scenario, just update the profile
1520 logger.Info(ctx, "Add Mvlan : Profile Name already exists, update-the-profile")
1521 return va.UpdateMvlanProfile(name, mvlan, groups, activeChannelsPerPon, proxy)
1522 }
1523 }
1524 }
1525
1526 if mvp == nil {
1527 mvp = NewMvlanProfile(name, mvlan, ponVlan, isChannelBasedGroup, OLTSerialNum, uint32(activeChannelsPerPon))
1528 }
1529
1530 va.storeMvlansMap(mvlan, name, mvp)
1531
1532 /* Need to protect groups and proxy write lock */
1533 mvp.mvpLock.Lock()
1534 for grpName, grpInfo := range groups {
1535 mvp.AddMvlanGroup(grpName, grpInfo)
1536 }
1537 for grpName, proxyInfo := range proxy {
1538 mvp.AddMvlanProxy(grpName, proxyInfo)
1539 }
1540 if _, ok := mvp.Groups[common.StaticGroup]; ok {
1541 if _, yes := mvp.Proxy[common.StaticGroup]; !yes {
1542 mvp.Groups[common.StaticGroup].IsStatic = true
1543 }
1544 }
1545
1546 logger.Debugw(ctx, "Added MVLAN Profile", log.Fields{"MVLAN": mvp.Mvlan, "PonVlan": mvp.PonVlan, "Name": mvp.Name, "Grp IPs": mvp.Groups, "IsPonVlanPresent": mvp.IsPonVlanPresent})
1547 mvp.mvpLock.Unlock()
1548
1549 if err := mvp.WriteToDb(); err != nil {
1550 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1551 }
1552
1553 return nil
1554}
1555
Naveen Sampath04696f72022-06-13 15:19:14 +05301556// GetMvlanProfileForMcIP - Get an MVLAN profile for a given MC IP. This is used when an
1557// IGMP report is received from the PON port. The MVLAN profile
1558// located is used to idnetify the MC VLAN used in upstream for
1559// join/leave
1560func (va *VoltApplication) GetMvlanProfileForMcIP(profileName string, ip net.IP) (*MvlanProfile, string) {
1561 if mvpIntf, ok := va.MvlanProfilesByName.Load(profileName); ok {
1562 mvp := mvpIntf.(*MvlanProfile)
1563 if grpName := mvp.GetMvlanGroup(ip); grpName != "" {
1564 return mvp, grpName
1565 }
1566 } else {
1567 logger.Warnw(ctx, "Mvlan Profile not found for given profile name", log.Fields{"Profile": profileName})
1568 }
1569 return nil, ""
1570}
1571
Naveen Sampath04696f72022-06-13 15:19:14 +05301572// IgmpTick for igmp tick info
1573func (va *VoltApplication) IgmpTick() {
1574 tickCount++
1575 if (tickCount % 1000) == 0 {
1576 logger.Debugw(ctx, "Time @ Tick", log.Fields{"Tick": tickCount, "Time": time.Now()})
1577 }
1578 igmptick := func(key interface{}, value interface{}) bool {
1579 ig := value.(*IgmpGroup)
1580 if ig.NumDevicesActive() != 0 {
1581 if tickCount%10 == ig.Hash()%10 {
1582 ig.IgmpGroupLock.Lock()
1583 ig.Tick()
1584 ig.IgmpGroupLock.Unlock()
1585 if ig.NumDevicesActive() == 0 {
1586 va.DelIgmpGroup(ig)
1587 }
1588 }
1589 }
1590 return true
1591 }
1592 va.IgmpGroups.Range(igmptick)
1593}
1594
1595// Tick to add Tick Task
1596func (va *VoltApplication) Tick() {
1597 tt := NewTickTask()
1598 va.IgmpTasks.AddTask(tt)
1599 // va.IgmpTick()
1600}
1601
1602//AddIgmpProfile for addition of IGMP Profile
1603func (va *VoltApplication) AddIgmpProfile(igmpProfileConfig *common.IGMPConfig) error {
1604 var igmpProfile *IgmpProfile
1605
1606 if igmpProfileConfig.ProfileID == DefaultIgmpProfID {
1607 logger.Info(ctx, "Updating default IGMP profile")
1608 return va.UpdateIgmpProfile(igmpProfileConfig)
1609 }
1610
1611 igmpProfile = va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
1612 if igmpProfile == nil {
1613 igmpProfile = newIgmpProfile(igmpProfileConfig)
1614 } else {
1615 logger.Errorw(ctx, "IGMP profile already exists", log.Fields{"IgmpProfile": igmpProfileConfig.ProfileID})
1616 return errors.New("IGMP Profile already exists")
1617 }
1618
1619 va.storeIgmpProfileMap(igmpProfileConfig.ProfileID, igmpProfile)
1620
1621 if err := igmpProfile.WriteToDb(); err != nil {
1622 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID})
1623 }
1624
1625 return nil
1626}
1627
Naveen Sampath04696f72022-06-13 15:19:14 +05301628// checkIgmpProfileMap to get Igmp Profile. If not found return nil
1629func (va *VoltApplication) checkIgmpProfileMap(name string) *IgmpProfile {
1630 if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok {
1631 return igmpProfileIntf.(*IgmpProfile)
1632 }
1633 return nil
1634}
1635
Naveen Sampath04696f72022-06-13 15:19:14 +05301636func (va *VoltApplication) resetIgmpProfileToDefault() {
1637 igmpProf := va.getIgmpProfileMap(DefaultIgmpProfID)
1638 defIgmpProf := newDefaultIgmpProfile()
1639
1640 igmpProf.UnsolicitedTimeOut = defIgmpProf.UnsolicitedTimeOut
1641 igmpProf.MaxResp = defIgmpProf.MaxResp
1642 igmpProf.KeepAliveInterval = defIgmpProf.KeepAliveInterval
1643 igmpProf.KeepAliveCount = defIgmpProf.KeepAliveCount
1644 igmpProf.LastQueryInterval = defIgmpProf.LastQueryInterval
1645 igmpProf.LastQueryCount = defIgmpProf.LastQueryCount
1646 igmpProf.FastLeave = defIgmpProf.FastLeave
1647 igmpProf.PeriodicQuery = defIgmpProf.PeriodicQuery
1648 igmpProf.IgmpCos = defIgmpProf.IgmpCos
1649 igmpProf.WithRAUpLink = defIgmpProf.WithRAUpLink
1650 igmpProf.WithRADownLink = defIgmpProf.WithRADownLink
1651 igmpProf.IgmpVerToServer = defIgmpProf.IgmpVerToServer
1652 igmpProf.IgmpSourceIP = defIgmpProf.IgmpSourceIP
1653
1654 if err := igmpProf.WriteToDb(); err != nil {
1655 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProf.ProfileID})
1656 }
1657}
1658
1659// getIgmpProfileMap to get Igmp Profile. If not found return default IGMP config
1660func (va *VoltApplication) getIgmpProfileMap(name string) *IgmpProfile {
1661 if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok {
1662 return igmpProfileIntf.(*IgmpProfile)
1663 }
1664
1665 // There will be always a default igmp profile.
1666 defaultIgmpProfileIntf, _ := va.IgmpProfilesByName.Load(DefaultIgmpProfID)
1667 return defaultIgmpProfileIntf.(*IgmpProfile)
1668}
1669
1670// storeIgmpProfileMap to store Igmp Profile
1671func (va *VoltApplication) storeIgmpProfileMap(name string, igmpProfile *IgmpProfile) {
1672 va.IgmpProfilesByName.Store(name, igmpProfile)
1673}
1674
1675// deleteIgmpProfileMap to delete Igmp Profile
1676func (va *VoltApplication) deleteIgmpProfileMap(name string) {
1677 va.IgmpProfilesByName.Delete(name)
1678}
1679
Naveen Sampath04696f72022-06-13 15:19:14 +05301680//DelIgmpProfile for addition of IGMP Profile
1681func (va *VoltApplication) DelIgmpProfile(igmpProfileConfig *common.IGMPConfig) error {
1682 // Deletion of default igmp profile is blocked from submgr. Keeping additional check for safety.
1683 if igmpProfileConfig.ProfileID == DefaultIgmpProfID {
1684 logger.Info(ctx, "Resetting default IGMP profile")
1685 va.resetIgmpProfileToDefault()
1686 return nil
1687 }
1688 igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
1689 if igmpProfile == nil {
1690 logger.Warnw(ctx, "Igmp Profile not found. Unable to delete", log.Fields{"Profile ID": igmpProfileConfig.ProfileID})
1691 return nil
1692 }
1693
1694 va.deleteIgmpProfileMap(igmpProfileConfig.ProfileID)
1695
1696 _ = db.DelIgmpProfile(igmpProfileConfig.ProfileID)
1697
1698 return nil
1699}
1700
1701//UpdateIgmpProfile for addition of IGMP Profile
1702func (va *VoltApplication) UpdateIgmpProfile(igmpProfileConfig *common.IGMPConfig) error {
1703 igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
1704 if igmpProfile == nil {
1705 logger.Errorw(ctx, "Igmp Profile not found. Unable to update", log.Fields{"Profile ID": igmpProfileConfig.ProfileID})
1706 return errors.New("IGMP Profile not found")
1707 }
1708
1709 igmpProfile.ProfileID = igmpProfileConfig.ProfileID
1710 igmpProfile.UnsolicitedTimeOut = uint32(igmpProfileConfig.UnsolicitedTimeOut)
1711 igmpProfile.MaxResp = uint32(igmpProfileConfig.MaxResp)
1712
1713 keepAliveInterval := uint32(igmpProfileConfig.KeepAliveInterval)
1714
1715 //KeepAliveInterval should have a min of 10 seconds
1716 if keepAliveInterval < MinKeepAliveInterval {
1717 keepAliveInterval = MinKeepAliveInterval
1718 logger.Infow(ctx, "Auto adjust keepAliveInterval - Value < 10", log.Fields{"Received": igmpProfileConfig.KeepAliveInterval, "Configured": keepAliveInterval})
1719 }
1720 igmpProfile.KeepAliveInterval = keepAliveInterval
1721
1722 igmpProfile.KeepAliveCount = uint32(igmpProfileConfig.KeepAliveCount)
1723 igmpProfile.LastQueryInterval = uint32(igmpProfileConfig.LastQueryInterval)
1724 igmpProfile.LastQueryCount = uint32(igmpProfileConfig.LastQueryCount)
1725 igmpProfile.FastLeave = *igmpProfileConfig.FastLeave
1726 igmpProfile.PeriodicQuery = *igmpProfileConfig.PeriodicQuery
1727 igmpProfile.IgmpCos = uint8(igmpProfileConfig.IgmpCos)
1728 igmpProfile.WithRAUpLink = *igmpProfileConfig.WithRAUpLink
1729 igmpProfile.WithRADownLink = *igmpProfileConfig.WithRADownLink
1730
1731 if igmpProfileConfig.IgmpVerToServer == "2" || igmpProfileConfig.IgmpVerToServer == "v2" {
1732 igmpProfile.IgmpVerToServer = "2"
1733 } else {
1734 igmpProfile.IgmpVerToServer = "3"
1735 }
1736
1737 if igmpProfileConfig.IgmpSourceIP != "" {
1738 igmpProfile.IgmpSourceIP = net.ParseIP(igmpProfileConfig.IgmpSourceIP)
1739 }
1740
1741 if err := igmpProfile.WriteToDb(); err != nil {
1742 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID})
1743 }
1744
1745 return nil
1746}
1747
1748// RestoreIGMPProfilesFromDb to read from the DB and restore IGMP Profiles
1749func (va *VoltApplication) RestoreIGMPProfilesFromDb() {
1750 // Loading IGMP profiles
1751 igmpProfiles, _ := db.GetIgmpProfiles()
1752 for _, igmpProfile := range igmpProfiles {
1753 b, ok := igmpProfile.Value.([]byte)
1754 if !ok {
1755 logger.Warn(ctx, "The value type is not []byte")
1756 continue
1757 }
1758 var igmpProf IgmpProfile
1759 err := json.Unmarshal(b, &igmpProf)
1760 if err != nil {
1761 logger.Warn(ctx, "Unmarshal of IGMP Profile failed")
1762 continue
1763 }
1764 va.storeIgmpProfileMap(igmpProf.ProfileID, &igmpProf)
1765 logger.Infow(ctx, "Restored Igmp Profile", log.Fields{"Conf": igmpProf})
1766 }
1767}
1768
1769// InitIgmpSrcMac for initialization of igmp source mac
1770func (va *VoltApplication) InitIgmpSrcMac() {
1771 srcMac, err := getPodMacAddr()
1772 if err != nil {
1773 igmpSrcMac = "00:11:11:11:11:11"
1774 return
1775 }
1776 igmpSrcMac = srcMac
1777}
1778
Naveen Sampath04696f72022-06-13 15:19:14 +05301779// DelMvlanProfile for deletion of a MVLAN group
1780func (va *VoltApplication) DelMvlanProfile(name string) error {
1781 if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok {
1782 mvp := mvpIntf.(*MvlanProfile)
1783
1784 if len(mvp.DevicesList) == 0 {
1785 mvp.DeleteInProgress = true
1786 mvp.DelFromDb()
1787 va.deleteMvlansMap(mvp.Mvlan, name)
1788 logger.Debugw(ctx, "Deleted MVLAN Profile", log.Fields{"Name": mvp.Name})
1789 } else {
1790 logger.Errorw(ctx, "Unable to delete Mvlan Profile as there is still an OLT attached to it", log.Fields{"Name": mvp.Name,
1791 "Device List": mvp.DevicesList})
1792 return errors.New("MVLAN attached to devices")
1793 }
1794
1795 return nil
1796 }
1797 logger.Errorw(ctx, "MVLAN Profile not found", log.Fields{"MvlanProfile Name": name})
1798 return nil
1799}
1800
1801// ReceiverUpInd for receiver up indication
1802func (va *VoltApplication) ReceiverUpInd(device string, port string, mvpName string, vlan of.VlanType, pbits []of.PbitType) {
1803 logger.Infow(ctx, "Receiver Indication: UP", log.Fields{"device": device, "port": port, "MVP": mvpName, "vlan": vlan, "pbits": pbits})
1804 if mvpIntf, ok := va.MvlanProfilesByName.Load(mvpName); ok {
1805 mvp := mvpIntf.(*MvlanProfile)
1806 if devIntf, ok := va.DevicesDisc.Load(device); ok {
1807 dev := devIntf.(*VoltDevice)
1808 proxyCfg, proxyIP, _ := getIgmpProxyCfgAndIP(mvp.Mvlan, dev.SerialNum)
1809 for _, pbit := range pbits {
1810 sendGeneralQuery(device, port, vlan, uint8(pbit), proxyCfg, proxyIP)
1811 }
1812 } else {
1813 logger.Warnw(ctx, "Device not found for given port", log.Fields{"device": device, "port": port})
1814 }
1815 } else {
1816 logger.Warnw(ctx, "Mvlan Profile not found for given profileName", log.Fields{"MVP": mvpName, "vlan": vlan})
1817 }
1818}
1819
1820// sendGeneralQuery to send general query
1821func sendGeneralQuery(device string, port string, cVlan of.VlanType, pbit uint8, proxyCfg *IgmpProfile, proxyIP *net.IP) {
1822
Tinoj Josephcf161be2022-07-07 19:47:47 +05301823 if queryPkt, err := Igmpv2QueryPacket(AllSystemsMulticastGroupIP, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301824 if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil {
1825 logger.Warnw(ctx, "General Igmpv2 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1826 } else {
1827 logger.Debugw(ctx, "General Igmpv2 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1828 }
1829 }
1830 if getVersion(proxyCfg.IgmpVerToServer) == IgmpVersion3 {
Tinoj Josephcf161be2022-07-07 19:47:47 +05301831 if queryPkt, err := Igmpv3QueryPacket(AllSystemsMulticastGroupIP, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301832 if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil {
1833 logger.Warnw(ctx, "General Igmpv3 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1834 } else {
1835 logger.Debugw(ctx, "General Igmpv3 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1836 }
1837 }
1838 }
1839}
1840
1841// ReceiverDownInd to send receiver down indication
1842func (va *VoltApplication) ReceiverDownInd(device string, port string) {
1843 logger.Infow(ctx, " Receiver Indication: DOWN", log.Fields{"device": device, "port": port})
1844
1845 ponPortID := va.GetPonPortID(device, port)
1846
1847 del := func(key interface{}, value interface{}) bool {
1848 ig := value.(*IgmpGroup)
1849 ig.IgmpGroupLock.Lock()
1850 ig.DelReceiveronDownInd(device, port, ponPortID)
1851 ig.IgmpGroupLock.Unlock()
1852 if ig.NumDevicesActive() == 0 {
1853 va.DelIgmpGroup(ig)
1854 }
1855 return true
1856 }
1857 va.IgmpGroups.Range(del)
1858}