blob: ba3f8fe5726b86e92f0bc7c74bdefe04f77d67d6 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package application
17
18import (
19 "encoding/json"
20 "errors"
21 "net"
22 "reflect"
23 "voltha-go-controller/internal/pkg/types"
24 "strconv"
25 "strings"
26 "sync"
27 "time"
28
29 "github.com/google/gopacket"
30 "github.com/google/gopacket/layers"
31
32 cntlr "voltha-go-controller/internal/pkg/controller"
33 "voltha-go-controller/database"
34 "voltha-go-controller/internal/pkg/of"
35 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053036 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053037)
38
39const (
40 // IgmpVersion0 constant (Default init value)
41 IgmpVersion0 uint8 = 0
42 // IgmpVersion1 constant
43 IgmpVersion1 uint8 = 1
44 // IgmpVersion2 constant
45 IgmpVersion2 uint8 = 2
46 // IgmpVersion3 constant
47 IgmpVersion3 uint8 = 3
48 // MinKeepAliveInterval constant
49 MinKeepAliveInterval uint32 = 10
50 // MaxDiffKAIntervalResp constant
51 MaxDiffKAIntervalResp uint32 = 5
52 // StaticGroup constant
53 StaticGroup string = "static"
54 // DynamicGroup constant
55 DynamicGroup string = "dynamic"
56 // StaticPort constant
57 StaticPort string = "static_port"
58 // DefaultIgmpProfID constant
59 DefaultIgmpProfID = ""
60 //GroupExpiryTime - group expiry time in minutes
61 GroupExpiryTime uint32 = 15
62)
63
64const (
65 // JoinUnsuccessful constant
66 JoinUnsuccessful string = "JOIN-UNSUCCESSFUL"
67 // JoinUnsuccessfulExceededIGMPChanel constant
68 JoinUnsuccessfulExceededIGMPChanel string = "Exceeded subscriber or PON port IGMP channels threshold"
69 // JoinUnsuccessfulAddFlowGroupFailed constant
70 JoinUnsuccessfulAddFlowGroupFailed string = "Failed to add flow or group for a channel"
71 // JoinUnsuccessfulGroupNotConfigured constant
72 JoinUnsuccessfulGroupNotConfigured string = "Join received from a subscriber on non-configured group"
73 // JoinUnsuccessfulVlanDisabled constant
74 JoinUnsuccessfulVlanDisabled string = "Vlan is disabled"
75 // JoinUnsuccessfulDescription constant
76 JoinUnsuccessfulDescription string = "igmp join unsuccessful"
77 // QueryExpired constant
78 QueryExpired string = "QUERY-EXPIRED"
79 // QueryExpiredGroupSpecific constant
80 QueryExpiredGroupSpecific string = "Group specific multicast query expired"
81 // QueryExpiredDescription constant
82 QueryExpiredDescription string = "igmp query expired"
83)
84
85// IgmpProfile structure
86type IgmpProfile struct {
87 ProfileID string
88 UnsolicitedTimeOut uint32 //In seconds
89 MaxResp uint32
90 KeepAliveInterval uint32
91 KeepAliveCount uint32
92 LastQueryInterval uint32
93 LastQueryCount uint32
94 FastLeave bool
95 PeriodicQuery bool
96 IgmpCos uint8
97 WithRAUpLink bool
98 WithRADownLink bool
99 IgmpVerToServer string
100 IgmpSourceIP net.IP
101 Version string
102}
103
104// McastConfig structure
105type McastConfig struct {
106 OltSerialNum string
107 MvlanProfileID string
108 IgmpProfileID string
109 IgmpProxyIP net.IP
110 OperState OperInProgress
111 Version string
112 // This map will help in updating the igds whenever there is a igmp profile id update
113 IgmpGroupDevices sync.Map `json:"-"` // Key is group id
114}
115
116var (
117 // NullIPAddr is null ip address var
118 NullIPAddr = net.ParseIP("0.0.0.0")
119 // igmpSrcMac for the proxy
120 igmpSrcMac string
121)
122
123func init() {
124 RegisterPacketHandler(IGMP, ProcessIgmpPacket)
125}
126
127// ProcessIgmpPacket : CallBack function registered with application to handle IGMP packetIn
128func ProcessIgmpPacket(device string, port string, pkt gopacket.Packet) {
129 GetApplication().IgmpPacketInd(device, port, pkt)
130}
131
132func ipv4ToUint(ip net.IP) uint32 {
133 result := uint32(0)
134 addr := ip.To4()
135 if addr == nil {
136 logger.Warnw(ctx, "Invalid Group Addr", log.Fields{"IP": ip})
137 return 0
138 }
139 result = result + uint32(addr[0])<<24
140 result = result + uint32(addr[1])<<16
141 result = result + uint32(addr[2])<<8
142 result = result + uint32(addr[3])
143 return result
144}
145
146func getPodMacAddr() (string, error) {
147 ifas, err := net.Interfaces()
148 if err != nil {
149 return "", err
150 }
151 var ipv4Addr net.IP
152 for _, ifa := range ifas {
153 addrs, err := ifa.Addrs()
154 if err != nil {
155 return "", err
156 }
157 for _, addr := range addrs {
158 if ipv4Addr = addr.(*net.IPNet).IP.To4(); ipv4Addr != nil {
159 if ipv4Addr.IsGlobalUnicast() {
160 logger.Infow(ctx, "Igmp Static config", log.Fields{"MacAddr": ifa.HardwareAddr.String(), "ipAddr": ipv4Addr})
161 return ifa.HardwareAddr.String(), nil
162 }
163 }
164 }
165
166 }
167 return "", errors.New("MAC Address not found,Setting default")
168}
169
170// IgmpUsEthLayer : Layers defined for upstream communication
171// Ethernet layer for upstream communication
172func IgmpUsEthLayer(mcip net.IP) *layers.Ethernet {
173 eth := &layers.Ethernet{}
174 // TODO: Set the source MAC properly and remove hardcoding
175 eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac)
176 eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00")
177 eth.DstMAC[3] = mcip[1] & 0x7f
178 eth.DstMAC[4] = mcip[2]
179 eth.DstMAC[5] = mcip[3]
180 eth.EthernetType = layers.EthernetTypeDot1Q
181 return eth
182}
183
184// IgmpUsDot1qLayer set US VLAN layer
185func IgmpUsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q {
186 dot1q := &layers.Dot1Q{}
187 dot1q.Priority = priority
188 dot1q.DropEligible = false
189 dot1q.VLANIdentifier = uint16(vlan)
190 dot1q.Type = layers.EthernetTypeIPv4
191 return dot1q
192}
193
194// Igmpv2UsIpv4Layer : Set the IP layer for IGMPv2
195// TODO - Identify correct way of obtaining source IP
196// This should be the configured IGMP proxy address which should be per OLT
197// We should probably be able to have a single function for both
198// upstream and downstream
199func Igmpv2UsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 {
200 ip := &layers.IPv4{}
201 ip.Version = 4
202 ip.Protocol = layers.IPProtocolIGMP
203 ip.TTL = 1
204 ip.SrcIP = src
205 ip.DstIP = mcip
206 return ip
207}
208
209// Igmpv3UsIpv4Layer : Set the IP layer for IGMPv3
210// TODO - Identify correct way of obtaining source IP
211// This should be the configured IGMP proxy address which should be per OLT
212// We should probably be able to have a single function for both
213// upstream and downstream
214func Igmpv3UsIpv4Layer(src net.IP) *layers.IPv4 {
215 ip := &layers.IPv4{}
216 ip.Version = 4
217 ip.Protocol = layers.IPProtocolIGMP
218 ip.TTL = 1
219 ip.SrcIP = src
220 ip.DstIP = net.ParseIP("224.0.0.22")
221 return ip
222}
223
224// IgmpDsEthLayer : Layers defined for downstream communication
225// Ethernet layer for downstream communication
226func IgmpDsEthLayer(mcip net.IP) *layers.Ethernet {
227 eth := &layers.Ethernet{}
228 // TODO: Set the source and dest MAC properly and remove hardcoding
229 eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac)
230 eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00")
231 eth.DstMAC[3] = mcip[1] & 0x7f
232 eth.DstMAC[4] = mcip[2]
233 eth.DstMAC[5] = mcip[3]
234 eth.EthernetType = layers.EthernetTypeDot1Q
235 return eth
236}
237
238// IgmpDsDot1qLayer set the DS VLAN layer
239func IgmpDsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q {
240 dot1q := &layers.Dot1Q{}
241 dot1q.Priority = priority
242 dot1q.DropEligible = false
243 dot1q.VLANIdentifier = uint16(vlan)
244 dot1q.Type = layers.EthernetTypeIPv4
245 return dot1q
246}
247
248// IgmpDsIpv4Layer set the IP layer
249func IgmpDsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 {
250 ip := &layers.IPv4{}
251 ip.Version = 4
252 ip.Protocol = layers.IPProtocolIGMP
253 ip.TTL = 1
254 ip.SrcIP = src
255 if mcip.Equal(net.ParseIP("0.0.0.0")) {
256 mcip = net.ParseIP("224.0.0.1")
257 }
258 ip.DstIP = mcip
259 return ip
260}
261
262// IgmpQueryv2Layer : IGMP Query Layer
263func IgmpQueryv2Layer(mcip net.IP, resptime time.Duration) *layers.IGMPv1or2 {
264 igmp := &layers.IGMPv1or2{}
265 igmp.Type = layers.IGMPMembershipQuery
266 igmp.GroupAddress = mcip
267 igmp.MaxResponseTime = resptime
268 return igmp
269}
270
271// IgmpQueryv3Layer : IGMP v3 Query Layer
272func IgmpQueryv3Layer(mcip net.IP, resptime time.Duration) *layers.IGMP {
273 igmp := &layers.IGMP{}
274 igmp.Type = layers.IGMPMembershipQuery
275 igmp.GroupAddress = mcip
276 igmp.MaxResponseTime = resptime
277 return igmp
278}
279
280// IgmpReportv2Layer : IGMP Layer
281func IgmpReportv2Layer(mcip net.IP) *layers.IGMPv1or2 {
282 igmp := &layers.IGMPv1or2{}
283 igmp.Type = layers.IGMPMembershipReportV2
284 igmp.GroupAddress = mcip
285 return igmp
286}
287
288// IgmpLeavev2Layer : IGMP Leave Layer
289func IgmpLeavev2Layer(mcip net.IP) *layers.IGMPv1or2 {
290 igmp := &layers.IGMPv1or2{}
291 igmp.Type = layers.IGMPLeaveGroup
292 igmp.GroupAddress = mcip
293 return igmp
294}
295
296// IgmpReportv3Layer : IGMP v3 Report Layer
297func IgmpReportv3Layer(mcip net.IP, incl bool, srclist []net.IP) *layers.IGMP {
298 // IGMP base
299 igmp := &layers.IGMP{}
300 igmp.Type = layers.IGMPMembershipReportV3
301 igmp.NumberOfGroupRecords = 1
302
303 // IGMP Group
304 group := layers.IGMPv3GroupRecord{}
305 if incl {
306 group.Type = layers.IGMPIsIn
307 } else {
308 group.Type = layers.IGMPIsEx
309 }
310 group.MulticastAddress = mcip
311 group.NumberOfSources = uint16(len(srclist))
312 group.SourceAddresses = srclist
313 igmp.GroupRecords = append(igmp.GroupRecords, group)
314
315 return igmp
316}
317
318// Igmpv2QueryPacket : IGMP Query in Downstream
319func Igmpv2QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) {
320 // Construct the layers that form the packet
321 eth := IgmpDsEthLayer(mcip)
322 dot1q := IgmpDsDot1qLayer(vlan, pbit)
323 ip := IgmpDsIpv4Layer(selfip, mcip)
324 igmp := IgmpQueryv2Layer(mcip, time.Duration(maxResp)*time.Second)
325
326 // Now prepare the buffer into which the layers are to be serialized
327 buff := gopacket.NewSerializeBuffer()
328 opts := gopacket.SerializeOptions{
329 FixLengths: true,
330 ComputeChecksums: true,
331 }
332 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
333 logger.Error(ctx, "Error in serializing layers")
334 return nil, err
335 }
336 return buff.Bytes(), nil
337}
338
339// Igmpv3QueryPacket : IGMPv3 Query in Downstream
340func Igmpv3QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) {
341 // Construct the layers that form the packet
342 eth := IgmpDsEthLayer(mcip)
343 dot1q := IgmpDsDot1qLayer(vlan, pbit)
344 ip := IgmpDsIpv4Layer(selfip, mcip)
345 igmp := IgmpQueryv3Layer(mcip, time.Duration(maxResp)*time.Second)
346
347 // Now prepare the buffer into which the layers are to be serialized
348 buff := gopacket.NewSerializeBuffer()
349 opts := gopacket.SerializeOptions{
350 FixLengths: true,
351 ComputeChecksums: true,
352 }
353 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
354 logger.Error(ctx, "Error in serializing layers")
355 return nil, err
356 }
357 return buff.Bytes(), nil
358}
359
360// IgmpReportv2Packet : Packet - IGMP v2 report in upstream
361func IgmpReportv2Packet(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) {
362 // Construct the layers that form the packet
363 eth := IgmpUsEthLayer(mcip)
364 dot1q := IgmpUsDot1qLayer(vlan, priority)
365 ip := Igmpv2UsIpv4Layer(selfip, mcip)
366 igmp := IgmpReportv2Layer(mcip)
367
368 // Now prepare the buffer into which the layers are to be serialized
369 buff := gopacket.NewSerializeBuffer()
370 opts := gopacket.SerializeOptions{
371 FixLengths: true,
372 ComputeChecksums: true,
373 }
374 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
375 logger.Error(ctx, "Error in serializing layers")
376 return nil, err
377 }
378 return buff.Bytes(), nil
379}
380
381// Igmpv3ReportPacket : Packet - IGMP v3 report in upstream
382func Igmpv3ReportPacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP, incl bool, srclist []net.IP) ([]byte, error) {
383 // Construct the layers that form the packet
384 eth := IgmpUsEthLayer(net.ParseIP("224.0.0.22").To4())
385 dot1q := IgmpUsDot1qLayer(vlan, priority)
386 ip := Igmpv3UsIpv4Layer(selfip)
387 igmp := IgmpReportv3Layer(mcip, incl, srclist)
388
389 // Now prepare the buffer into which the layers are to be serialized
390 buff := gopacket.NewSerializeBuffer()
391 opts := gopacket.SerializeOptions{
392 FixLengths: true,
393 ComputeChecksums: true,
394 }
395 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
396 logger.Error(ctx, "Error in serializing layers")
397 return nil, err
398 }
399 return buff.Bytes(), nil
400}
401
402// IgmpLeavePacket : Packet- IGMP Leave in upstream
403func IgmpLeavePacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) {
404 // Construct the layers that form the packet
405 eth := IgmpUsEthLayer(mcip)
406 dot1q := IgmpUsDot1qLayer(vlan, priority)
407 ip := Igmpv2UsIpv4Layer(selfip, mcip)
408 igmp := IgmpLeavev2Layer(mcip)
409
410 // Now prepare the buffer into which the layers are to be serialized
411 buff := gopacket.NewSerializeBuffer()
412 opts := gopacket.SerializeOptions{
413 FixLengths: true,
414 ComputeChecksums: true,
415 }
416 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
417 logger.Error(ctx, "Error in serializing layers")
418 return nil, err
419 }
420 return buff.Bytes(), nil
421}
422
423// getVersion to get igmp version type
424func getVersion(ver string) uint8 {
425 if ver == "2" || ver == "v2" {
426 return IgmpVersion2
427 }
428 return IgmpVersion3
429}
430
431// IsIPPresent is Utility to check if an IP address is in a list
432func IsIPPresent(i net.IP, ips []net.IP) bool {
433 for _, ip := range ips {
434 if i.Equal(ip) {
435 return true
436 }
437 }
438 return false
439}
440
441// IgmpGroupPort : IGMP port implements a port which is associated with an IGMP
442// version and the list of sources it implements for a given IGMP
443// channel. We may improve this to have all IGMP channels so that
444// we can implement per subscriber IGMP channel registration limits
445// As a rule a single port cannot have both include and exclude
446// lists. If we receive a include list we should purge the other
447// list which is TODO
448type IgmpGroupPort struct {
449 Port string
450 CVlan uint16
451 Pbit uint8
452 Version uint8
453 Exclude bool
454 ExcludeList []net.IP
455 IncludeList []net.IP
456 QueryTimeoutCount uint32
457 PonPortID uint32
458}
459
460// NewIgmpGroupPort is constructor for a port
461func NewIgmpGroupPort(port string, cvlan uint16, pbit uint8, version uint8, incl bool, ponPortID uint32) *IgmpGroupPort {
462 var igp IgmpGroupPort
463 igp.Port = port
464 igp.CVlan = cvlan
465 igp.Pbit = pbit
466 igp.Version = version
467 igp.Exclude = !incl
468 igp.QueryTimeoutCount = 0
469 igp.PonPortID = ponPortID
470 return &igp
471}
472
473// InclSourceIsIn checks if a source is in include list
474func (igp *IgmpGroupPort) InclSourceIsIn(src net.IP) bool {
475 return IsIPPresent(src, igp.IncludeList)
476}
477
478// ExclSourceIsIn checks if a source is in exclude list
479func (igp *IgmpGroupPort) ExclSourceIsIn(src net.IP) bool {
480 return IsIPPresent(src, igp.ExcludeList)
481}
482
483// AddInclSource adds a source is in include list
484func (igp *IgmpGroupPort) AddInclSource(src net.IP) {
485 logger.Debugw(ctx, "Adding Include Source", log.Fields{"Port": igp.Port, "Src": src})
486 igp.IncludeList = append(igp.IncludeList, src)
487}
488
489// AddExclSource adds a source is in exclude list
490func (igp *IgmpGroupPort) AddExclSource(src net.IP) {
491 logger.Debugw(ctx, "Adding Exclude Source", log.Fields{"Port": igp.Port, "Src": src})
492 igp.ExcludeList = append(igp.ExcludeList, src)
493}
494
495// DelInclSource deletes a source is in include list
496func (igp *IgmpGroupPort) DelInclSource(src net.IP) {
497 logger.Debugw(ctx, "Deleting Include Source", log.Fields{"Port": igp.Port, "Src": src})
498 for i, addr := range igp.IncludeList {
499 if addr.Equal(src) {
500 igp.IncludeList = append(igp.IncludeList[:i], igp.IncludeList[i+1:]...)
501 return
502 }
503 }
504}
505
506// DelExclSource deletes a source is in exclude list
507func (igp *IgmpGroupPort) DelExclSource(src net.IP) {
508 logger.Debugw(ctx, "Deleting Exclude Source", log.Fields{"Port": igp.Port, "Src": src})
509 for i, addr := range igp.ExcludeList {
510 if addr.Equal(src) {
511 igp.ExcludeList = append(igp.ExcludeList[:i], igp.ExcludeList[i+1:]...)
512 return
513 }
514 }
515}
516
517// WriteToDb is utility to write IGMP Group Port Info to database
518func (igp *IgmpGroupPort) WriteToDb(mvlan of.VlanType, gip net.IP, device string) error {
519 b, err := json.Marshal(igp)
520 if err != nil {
521 return err
522 }
523 if err1 := db.PutIgmpRcvr(mvlan, gip, device, igp.Port, string(b)); err1 != nil {
524 return err1
525 }
526 return nil
527}
528
529// NewIgmpGroupPortFromBytes create the IGMP group port from a byte slice
530func NewIgmpGroupPortFromBytes(b []byte) (*IgmpGroupPort, error) {
531 var igp IgmpGroupPort
532 if err := json.Unmarshal(b, &igp); err != nil {
533 logger.Warnw(ctx, "Decode of port failed", log.Fields{"str": string(b)})
534 return nil, err
535 }
536 return &igp, nil
537}
538
539// IgmpGroupChannel structure
540type IgmpGroupChannel struct {
541 Device string
542 GroupID uint32
543 GroupName string
544 GroupAddr net.IP
545 Mvlan of.VlanType
546 Exclude int
547 ExcludeList []net.IP
548 IncludeList []net.IP
549 Version uint8
550 ServVersion *uint8 `json:"-"`
551 CurReceivers map[string]*IgmpGroupPort `json:"-"`
552 NewReceivers map[string]*IgmpGroupPort `json:"-"`
553 proxyCfg **IgmpProfile
554 IgmpProxyIP **net.IP `json:"-"`
555}
556
557// NewIgmpGroupChannel is constructor for a channel. The default IGMP version is set to 3
558// as the protocol defines the way to manage backward compatibility
559// The implementation handles simultaneous presense of lower versioned
560// receivers
561func NewIgmpGroupChannel(igd *IgmpGroupDevice, groupAddr net.IP, version uint8) *IgmpGroupChannel {
562 var igc IgmpGroupChannel
563 igc.Device = igd.Device
564 igc.GroupID = igd.GroupID
565 igc.GroupName = igd.GroupName
566 igc.GroupAddr = groupAddr
567 igc.Mvlan = igd.Mvlan
568 igc.Version = version
569 igc.CurReceivers = make(map[string]*IgmpGroupPort)
570 igc.NewReceivers = make(map[string]*IgmpGroupPort)
571 igc.proxyCfg = &igd.proxyCfg
572 igc.IgmpProxyIP = &igd.IgmpProxyIP
573 igc.ServVersion = igd.ServVersion
574 return &igc
575}
576
577// NewIgmpGroupChannelFromBytes create the IGMP group channel from a byte slice
578func NewIgmpGroupChannelFromBytes(b []byte) (*IgmpGroupChannel, error) {
579 var igc IgmpGroupChannel
580 if err := json.Unmarshal(b, &igc); err != nil {
581 return nil, err
582 }
583 igc.CurReceivers = make(map[string]*IgmpGroupPort)
584 igc.NewReceivers = make(map[string]*IgmpGroupPort)
585 return &igc, nil
586}
587
588// RestorePorts to restore ports
589func (igc *IgmpGroupChannel) RestorePorts() {
590
591 igc.migrateIgmpPorts()
592 ports, _ := db.GetIgmpRcvrs(igc.Mvlan, igc.GroupAddr, igc.Device)
593 for _, port := range ports {
594 b, ok := port.Value.([]byte)
595 if !ok {
596 logger.Warn(ctx, "The value type is not []byte")
597 continue
598 }
599 if igp, err := NewIgmpGroupPortFromBytes(b); err == nil {
600 igc.NewReceivers[igp.Port] = igp
601 logger.Infow(ctx, "Group Port Restored", log.Fields{"IGP": igp})
602 } else {
603 logger.Warn(ctx, "Failed to decode port from DB")
604 }
605 }
606 if err := igc.WriteToDb(); err != nil {
607 logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
608 }
609}
610
611// WriteToDb is utility to write IGMPGroupChannel Info to database
612func (igc *IgmpGroupChannel) WriteToDb() error {
613 b, err := json.Marshal(igc)
614 if err != nil {
615 return err
616 }
617 if err1 := db.PutIgmpChannel(igc.Mvlan, igc.GroupName, igc.Device, igc.GroupAddr, string(b)); err1 != nil {
618 return err1
619 }
620 logger.Info(ctx, "IGC Updated")
621 return nil
622}
623
624// UniPortList : UNI Port list per channle has stores the UNI port list for this
625// channel.
626type UniPortList struct {
627 UNIList *util.ConcurrentMap // [UNIPort] UNIPort
628}
629
630// NewUniPortsList is Constructor for UniPortList structure
631func NewUniPortsList() *UniPortList {
632 var uniPortsList UniPortList
633
634 uniPortsList.UNIList = util.NewConcurrentMap()
635 return &uniPortsList
636}
637
638// GetUniPortCount returns the number of UNI ports subscribed to
639// current channel.
640func (uniPortsList *UniPortList) GetUniPortCount() uint64 {
641 return uniPortsList.UNIList.Length()
642}
643
644// PonPortChannels : PON port channel map keeps the active channel list and its
645// count for this group.
646type PonPortChannels struct {
647 ChannelList *util.ConcurrentMap // [channelIP]*UniPortList
648}
649
650// NewPonPortChannels is constructor for PonPortChannel.
651func NewPonPortChannels() *PonPortChannels {
652 var ponPortChannel PonPortChannels
653
654 ponPortChannel.ChannelList = util.NewConcurrentMap()
655 return &ponPortChannel
656}
657
658// GetActiveChannelCount returns the number of active channel count
659// for this pon port in the current group.
660func (ponPortChannels *PonPortChannels) GetActiveChannelCount() uint32 {
661 return uint32(ponPortChannels.ChannelList.Length())
662}
663
664// AddChannelToMap Adds new channel to the pon port map
665func (ponPortChannels *PonPortChannels) AddChannelToMap(uniPort, channel string) bool {
666
667 isNewChannel := bool(false)
668 uniList, ok := ponPortChannels.ChannelList.Get(channel)
669 if !ok {
670 // Channel doesn't exists. Adding new channel.
671 uniList = NewUniPortsList()
672 isNewChannel = true
673 }
674 uniList.(*UniPortList).UNIList.Set(uniPort, uniPort)
675 ponPortChannels.ChannelList.Set(channel, uniList)
676 return isNewChannel
677}
678
679// RemoveChannelFromMap Removed channel from the pon port map
680func (ponPortChannels *PonPortChannels) RemoveChannelFromMap(uniPort, channel string) bool {
681
682 isDeleted := bool(false)
683 uniList, ok := ponPortChannels.ChannelList.Get(channel)
684 if ok {
685 uniList.(*UniPortList).UNIList.Remove(uniPort)
686 if uniList.(*UniPortList).UNIList.Length() == 0 {
687 // Last port from the channel is removed.
688 // Removing channel from PON port map.
689 ponPortChannels.ChannelList.Remove(channel)
690 isDeleted = true
691 } else {
692 ponPortChannels.ChannelList.Set(channel, uniList)
693 }
694 } else {
695 logger.Warnw(ctx, "Channel doesn't exists in the active channels list", log.Fields{"Channel": channel})
696 return isDeleted
697 }
698 return isDeleted
699}
700
701// IgmpGroupDevice : IGMP Group Device manages the IGMP group for all listerns on
702// a single OLT. It aggregates reports received on a single group
703// and performs the count. It is responsible for sending upstream
704// report when the first listener joins and is responsible for
705// sending responses to upstream queries
706type IgmpGroupDevice struct {
707 Device string
708 SerialNo string
709 GroupID uint32
710 GroupName string
711 GroupAddr net.IP
712 RecvVersion uint8
713 ServVersion *uint8
714 RecvVersionExpiry time.Time
715 ServVersionExpiry time.Time
716 Mvlan of.VlanType
717 PonVlan of.VlanType
718 IsPonVlanPresent bool
719 GroupInstalled bool
720 GroupChannels sync.Map `json:"-"` // [ipAddr]*IgmpGroupChannel
721 PortChannelMap sync.Map `json:"-"` // [portName][]net.IP
722 PonPortChannelMap *util.ConcurrentMap `json:"-"` // [ponPortId]*PonPortChannels
723 proxyCfg *IgmpProfile // IgmpSrcIp from IgmpProfile is not used, it is kept for backward compatibility
724 IgmpProxyIP *net.IP `json:"-"`
725 NextQueryTime time.Time
726 QueryExpiryTime time.Time
727}
728
729// NewIgmpGroupDevice is constructor for a device. The default IGMP version is set to 3
730// as the protocol defines the way to manage backward compatibility
731// The implementation handles simultaneous presense of lower versioned
732// receivers
733func NewIgmpGroupDevice(name string, ig *IgmpGroup, id uint32, version uint8) *IgmpGroupDevice {
734 var igd IgmpGroupDevice
735 igd.Device = name
736 igd.GroupID = id
737 igd.GroupName = ig.GroupName
738 igd.GroupAddr = ig.GroupAddr
739 igd.Mvlan = ig.Mvlan
740 igd.PonVlan = ig.PonVlan
741 igd.IsPonVlanPresent = ig.IsPonVlanPresent
742 igd.GroupInstalled = false
743 igd.RecvVersion = version
744 igd.RecvVersionExpiry = time.Now()
745 igd.ServVersionExpiry = time.Now()
746 igd.PonPortChannelMap = util.NewConcurrentMap()
747
748 va := GetApplication()
749 if vd := va.GetDevice(igd.Device); vd != nil {
750 igd.SerialNo = vd.SerialNum
751 } else {
752 logger.Errorw(ctx, "Volt Device not found. log.Fields", log.Fields{"igd.Device": igd.Device})
753 return nil
754 }
755 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
756 igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
757
758 var mcastCfg *McastConfig
759 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
760
761 // mvlan profile id + olt serial number---igmp group id
762 //igmpgroup id
763 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
764 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
765
766 if mcastCfg != nil {
767 mcastCfg.IgmpGroupDevices.Store(id, &igd)
768 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": id})
769 }
770 return &igd
771}
772
773// IgmpGroupDeviceReInit is re-initializer for a device. The default IGMP version is set to 3
774// as the protocol defines the way to manage backward compatibility
775func (igd *IgmpGroupDevice) IgmpGroupDeviceReInit(ig *IgmpGroup) {
776
777 logger.Infow(ctx, "Reinitialize Igmp Group Device", log.Fields{"Device": igd.Device, "GroupID": ig.GroupID, "OldName": igd.GroupName, "Name": ig.GroupName, "OldAddr": igd.GroupAddr.String(), "GroupAddr": ig.GroupAddr.String()})
778
779 if (igd.GroupName != ig.GroupName) || !igd.GroupAddr.Equal(ig.GroupAddr) {
780 _ = db.DelIgmpDevice(igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
781 igd.GroupName = ig.GroupName
782 igd.GroupAddr = ig.GroupAddr
783 }
784 igd.RecvVersionExpiry = time.Now()
785 igd.ServVersionExpiry = time.Now()
786 igd.PonPortChannelMap = util.NewConcurrentMap()
787
788 var mcastCfg *McastConfig
789 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
790
791 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
792 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
793
794 if mcastCfg != nil {
795 mcastCfg.IgmpGroupDevices.Store(ig.GroupID, igd)
796 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": ig.GroupID})
797 }
798 if err := igd.WriteToDb(); err != nil {
799 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
800 }
801}
802
803func getIgmpProxyCfgAndIP(mvlan of.VlanType, serialNo string) (*IgmpProfile, *net.IP, *McastConfig) {
804 va := GetApplication()
805 mVLANProfileID := va.GetMvlanProfileByTag(mvlan).Name
806 var mcastCfg *McastConfig
807 if mcastCfg = va.GetMcastConfig(serialNo, mVLANProfileID); mcastCfg == nil || (mcastCfg != nil && mcastCfg.IgmpProfileID == "") {
808 logger.Debugw(ctx, "Default IGMP config to be used", log.Fields{"mVLANProfileID": mVLANProfileID, "OltSerialNo": serialNo})
809 igmpProf := va.getIgmpProfileMap(DefaultIgmpProfID)
810 return igmpProf, &igmpProf.IgmpSourceIP, mcastCfg
811 }
812 return va.getIgmpProfileMap(mcastCfg.IgmpProfileID), &mcastCfg.IgmpProxyIP, mcastCfg
813}
814
815// updateGroupName to update the group name
816func (igd *IgmpGroupDevice) updateGroupName(newGroupName string) {
817
818 oldName := igd.GroupName
819 igd.GroupName = newGroupName
820 updateGroupName := func(key, value interface{}) bool {
821 igc := value.(*IgmpGroupChannel)
822 igc.GroupName = newGroupName
823 if err := igc.WriteToDb(); err != nil {
824 logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
825 }
826 _ = db.DelIgmpChannel(igc.Mvlan, oldName, igc.Device, igc.GroupAddr)
827 return true
828 }
829 igd.GroupChannels.Range(updateGroupName)
830 if err := igd.WriteToDb(); err != nil {
831 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
832 }
833 _ = db.DelIgmpDevice(igd.Mvlan, oldName, igd.GroupAddr, igd.Device)
834}
835
836// NewIgmpGroupDeviceFromBytes is to create the IGMP group port from a byte slice
837func NewIgmpGroupDeviceFromBytes(b []byte) (*IgmpGroupDevice, error) {
838 var igd IgmpGroupDevice
839 if err := json.Unmarshal(b, &igd); err != nil {
840 return nil, err
841 }
842 return &igd, nil
843}
844
845// GetKey to get group name as key
846func (igd *IgmpGroupDevice) GetKey() string {
847
848 if !net.ParseIP("0.0.0.0").Equal(igd.GroupAddr) {
849 return igd.GroupName + "_" + igd.GroupAddr.String()
850 }
851 return igd.GroupName
852
853}
854
855// RestoreChannel to restore channel
856func (igd *IgmpGroupDevice) RestoreChannel(igmpGroupChannel []byte) {
857
858 if igc, err := NewIgmpGroupChannelFromBytes(igmpGroupChannel); err == nil {
859 igc.ServVersion = igd.ServVersion
860 igc.IgmpProxyIP = &igd.IgmpProxyIP
861 igc.proxyCfg = &igd.proxyCfg
862 igd.GroupChannels.Store(igc.GroupAddr.String(), igc)
863 igc.RestorePorts()
864
865 for port, igp := range igc.NewReceivers {
866 ipsList := []net.IP{}
867 ipsIntf, _ := igd.PortChannelMap.Load(port)
868 if ipsIntf != nil {
869 ipsList = ipsIntf.([]net.IP)
870 }
871
872 ipsList = append(ipsList, igc.GroupAddr)
873 igd.PortChannelMap.Store(port, ipsList)
874 logger.Infow(ctx, "Group Channels Restored", log.Fields{"IGC": igc})
875 igd.AddChannelToChannelsPerPon(port, igc.GroupAddr, igp.PonPortID)
876 }
877 } else {
878 logger.Warnw(ctx, "Failed to decode port from DB", log.Fields{"err": err})
879 }
880 logger.Info(ctx, "Group Device & Channels Restored")
881 igd.PortChannelMap.Range(printPortChannel)
882 igd.GroupChannels.Range(printChannel)
883
884}
885
886// RestoreChannels to restore channels
887func (igd *IgmpGroupDevice) RestoreChannels() {
888
889 igd.migrateIgmpChannels()
890 channels, _ := db.GetIgmpChannels(igd.Mvlan, igd.GroupName, igd.Device)
891 for _, channel := range channels {
892
893 b, ok := channel.Value.([]byte)
894 if !ok {
895 logger.Warn(ctx, "The value type is not []byte")
896 continue
897 }
898 igd.RestoreChannel(b)
899 }
900
901}
902
903// printChannel to print channel info
904func printChannel(key interface{}, value interface{}) bool {
905 logger.Infow(ctx, "ChannelMap", log.Fields{"Channel": key.(string), "Igc": value.(*IgmpGroupChannel)})
906 return true
907}
908
909// printPortChannel to print port channel
910func printPortChannel(key interface{}, value interface{}) bool {
911 logger.Infow(ctx, "PortChannelMap", log.Fields{"Port": key.(string), "List": value.([]net.IP)})
912 return true
913}
914
915// WriteToDb is utility to write IGMP Group Device Info to the database
916func (igd *IgmpGroupDevice) WriteToDb() error {
917 b, err := json.Marshal(igd)
918 if err != nil {
919 return err
920 }
921 if err1 := db.PutIgmpDevice(igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device, string(b)); err1 != nil {
922 return err1
923 }
924 logger.Info(ctx, "IGD Updated")
925 return nil
926}
927
928// Tick processes timing tick used to run timers within the device
929func (igd *IgmpGroupDevice) Tick() uint8 {
930 /* Not using RecvVersionExpiry as it is not used anywhere
931 if time.Now().After(igd.RecvVersionExpiry) {
932 igd.RecvVersion = IgmpVersion3
933 return true
934 }
935 */
936 return 0
937}
938
939// GetSubscriberCountForChannelAndPonPort Gets the active subscriber count
940// for the given channel for one particular PON port
941func (igd *IgmpGroupDevice) GetSubscriberCountForChannelAndPonPort(ponPortID uint32, channelIP net.IP) uint64 {
942 if portMapIntf, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
943 portChannelMap := portMapIntf.(*PonPortChannels)
944
945 if channel, present := portChannelMap.ChannelList.Get(channelIP.String()); present {
946 return channel.(*UniPortList).UNIList.Length()
947 }
948 } else {
949 logger.Warnw(ctx, "PON port not found in PortChannelMap", log.Fields{"PON": ponPortID, "channel": channelIP})
950 }
951 return 0
952}
953
954// AddChannelToChannelsPerPon Adds the new channel into the per Pon channel list
955func (igd *IgmpGroupDevice) AddChannelToChannelsPerPon(uniPort string, channelIP net.IP, ponPortID uint32) bool {
956 logger.Debugw(ctx, "Adding channel to ActiveChannelsPerPon list", log.Fields{"PonPort": ponPortID, "channelIP": channelIP})
957
958 isNewChannel := bool(false)
959 isNewReceiver := false
960 if port, ok := igd.PonPortChannelMap.Get(ponPortID); !ok {
961 // PON port not exists in igd. adding it.
962 isNewReceiver = true
963 ponPortChannels := NewPonPortChannels()
964 isNewChannel = ponPortChannels.AddChannelToMap(uniPort, channelIP.String())
965 igd.PonPortChannelMap.Set(ponPortID, ponPortChannels)
966 } else {
967 // PON port exists in igd. Appending the channel list
968 // in the PON port.
969 isNewChannel = port.(*PonPortChannels).AddChannelToMap(uniPort, channelIP.String())
970 igd.PonPortChannelMap.Set(ponPortID, port)
971 count := port.(*PonPortChannels).GetActiveChannelCount()
972
973 logger.Debugw(ctx, "activeChannelCount", log.Fields{"count": count})
974 }
975 GetApplication().UpdateActiveChannelCountForPonPort(igd.Device, uniPort, ponPortID, true, isNewChannel, igd)
976 return isNewReceiver
977}
978
979// RemoveChannelFromChannelsPerPon removes the channel from the per pon channel list.
980func (igd *IgmpGroupDevice) RemoveChannelFromChannelsPerPon(uniPort string, channelIP net.IP, ponPortID uint32) bool {
981 logger.Debugw(ctx, "Removing channel from ActiveChannelsPerPon list", log.Fields{"PonPort": ponPortID, "channelIP": channelIP})
982 var deleted bool
983 ponRemoved := false
984
985 if port, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
986 channelPortMap := port.(*PonPortChannels)
987 deleted = channelPortMap.RemoveChannelFromMap(uniPort, channelIP.String())
988 if deleted && channelPortMap.ChannelList.Length() == 0 {
989 igd.PonPortChannelMap.Remove(ponPortID)
990 ponRemoved = true
991 }
992 GetApplication().UpdateActiveChannelCountForPonPort(igd.Device, uniPort, ponPortID, false, deleted, igd)
993 } else {
994 logger.Warnw(ctx, "PON port doesn't exists in the igd", log.Fields{"PonPortID": ponPortID})
995 }
996 return ponRemoved
997}
998
999// InclSourceIsIn checks if a source is in include list
1000func (igc *IgmpGroupChannel) InclSourceIsIn(src net.IP) bool {
1001 return IsIPPresent(src, igc.IncludeList)
1002}
1003
1004// ExclSourceIsIn checks if a source is in exclude list
1005func (igc *IgmpGroupChannel) ExclSourceIsIn(src net.IP) bool {
1006 return IsIPPresent(src, igc.ExcludeList)
1007}
1008
1009// AddInclSource adds a source is in include list
1010func (igc *IgmpGroupChannel) AddInclSource(src net.IP) {
1011 logger.Debugw(ctx, "Adding Include Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
1012 igc.IncludeList = append(igc.IncludeList, src)
1013}
1014
1015// AddExclSource adds a source is in exclude list
1016func (igc *IgmpGroupChannel) AddExclSource(src net.IP) {
1017 logger.Debugw(ctx, "Adding Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
1018 igc.ExcludeList = append(igc.ExcludeList, src)
1019}
1020
1021// UpdateExclSource update excl source list for the given channel
1022func (igc *IgmpGroupChannel) UpdateExclSource(srcList []net.IP) bool {
1023
1024 logger.Debugw(ctx, "Updating Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Current List": igc.ExcludeList, "Incoming List": srcList})
1025 if !igc.IsExclListChanged(srcList) {
1026 return false
1027 }
1028
1029 if igc.NumReceivers() == 1 {
1030 igc.ExcludeList = srcList
1031 } else {
1032 igc.ExcludeList = igc.computeExclList(srcList)
1033 }
1034
1035 logger.Debugw(ctx, "Updated Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Updated Excl List": igc.ExcludeList})
1036 return true
1037}
1038
1039// computeExclList computes intersection of pervious & current src list
1040func (igc *IgmpGroupChannel) computeExclList(srcList []net.IP) []net.IP {
1041
1042 updatedSrcList := []net.IP{}
1043 for _, src := range srcList {
1044 for _, excl := range igc.ExcludeList {
1045 if src.Equal(excl) {
1046 updatedSrcList = append(updatedSrcList, src)
1047 }
1048 }
1049 }
1050 return updatedSrcList
1051}
1052
1053// IsExclListChanged checks if excl list has been updated
1054func (igc *IgmpGroupChannel) IsExclListChanged(srcList []net.IP) bool {
1055
1056 srcPresent := false
1057 if len(igc.ExcludeList) != len(srcList) {
1058 return true
1059 }
1060
1061 for _, src := range srcList {
1062 for _, excl := range igc.ExcludeList {
1063 srcPresent = false
1064 if src.Equal(excl) {
1065 srcPresent = true
1066 break
1067 }
1068 }
1069 if !srcPresent {
1070 return true
1071 }
1072 }
1073 return false
1074}
1075
1076// DelInclSource deletes a source is in include list
1077func (igc *IgmpGroupChannel) DelInclSource(src net.IP) {
1078 mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
1079 /* If the SSM proxy is configured, then we can del the src ip from igc as whatever is in proxy that is final list */
1080 if _, ok := mvp.Proxy[igc.GroupName]; !ok {
1081 logger.Debugw(ctx, "Deleting Include Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
1082 for _, igp := range igc.CurReceivers {
1083 if igp.InclSourceIsIn(src) {
1084 logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
1085 return
1086 }
1087 }
1088 for _, igp := range igc.NewReceivers {
1089 if igp.InclSourceIsIn(src) {
1090 logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
1091 return
1092 }
1093 }
1094 } else {
1095 logger.Debug(ctx, "Proxy configured, not Deleting Include Source for Channel")
1096 }
1097 for i, addr := range igc.IncludeList {
1098 if addr.Equal(src) {
1099 igc.IncludeList = append(igc.IncludeList[:i], igc.IncludeList[i+1:]...)
1100 return
1101 }
1102 }
1103}
1104
1105// DelExclSource deletes a source is in exclude list
1106func (igc *IgmpGroupChannel) DelExclSource(src net.IP) {
1107 logger.Debugw(ctx, "Deleting Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src})
1108
1109 for _, igp := range igc.CurReceivers {
1110 if igp.ExclSourceIsIn(src) {
1111 logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
1112 return
1113 }
1114 }
1115 for _, igp := range igc.NewReceivers {
1116 if igp.ExclSourceIsIn(src) {
1117 logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port})
1118 return
1119 }
1120 }
1121 for i, addr := range igc.ExcludeList {
1122 if addr.Equal(src) {
1123 igc.ExcludeList = append(igc.ExcludeList[:i], igc.ExcludeList[i+1:]...)
1124 return
1125 }
1126 }
1127}
1128
1129// ProcessSources process the received list of either included sources or the excluded sources
1130// The return value indicate sif the group is modified and needs to be informed
1131// to the upstream multicast servers
1132func (igc *IgmpGroupChannel) ProcessSources(port string, ip []net.IP, incl bool) (bool, bool) {
1133 groupChanged := false
1134 groupExclUpdated := false
1135 receiverSrcListEmpty := false
1136 // If the version type is 2, there isn't anything to process here
1137 if igc.Version == IgmpVersion2 && *igc.ServVersion == IgmpVersion2 {
1138 return false, false
1139 }
1140
1141 igp := igc.GetReceiver(port)
1142 if igp == nil {
1143 logger.Warnw(ctx, "Receiver not found", log.Fields{"Port": port})
1144 return false, false
1145 }
1146 mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
1147 if incl {
1148 for _, src := range ip {
1149
1150 if igp.ExclSourceIsIn(src) {
1151 igp.DelExclSource(src)
1152 if igc.ExclSourceIsIn(src) {
1153 igc.DelExclSource(src)
1154 groupChanged = true
1155 }
1156 }
1157
1158 // If the source is not in the list of include sources for the port
1159 // add it. If so, check also if it is in list of include sources
1160 // at the device level.
1161 if !igp.InclSourceIsIn(src) {
1162 igp.AddInclSource(src)
1163 if !igc.InclSourceIsIn(src) {
1164 igc.AddInclSource(src)
1165 groupChanged = true
1166 }
1167 }
1168 }
1169 /* If any of the existing ip in the source list is removed we need to remove from the list in igp and igc */
1170 if _, ok := mvp.Proxy[igc.GroupName]; ok {
1171 /* If we get leave message from any subscriber, we do not have to delete the entries in the src list
1172 Only if ther is any modification in the src list by proxy config update only then we need to update */
1173 if len(ip) != 0 && len(ip) != len(igc.IncludeList) {
1174 for i := len(igc.IncludeList) - 1; i >= 0; i-- {
1175 src := igc.IncludeList[i]
1176 if !IsIPPresent(src, ip) {
1177 igp.DelInclSource(src)
1178 igc.DelInclSource(src)
1179 groupChanged = true
1180 }
1181 }
1182 }
1183 }
1184 } else {
1185 for _, src := range ip {
1186
1187 if igp.InclSourceIsIn(src) {
1188 igp.DelInclSource(src)
1189 if igc.InclSourceIsIn(src) {
1190 igc.DelInclSource(src)
1191 groupChanged = true
1192 }
1193 if len(igp.IncludeList) == 0 {
1194 receiverSrcListEmpty = true
1195 }
1196 }
1197
1198 // If the source is not in the list of exclude sources for the port
1199 // add it. If so, check also if it is in list of include sources
1200 // at the device level.
1201 if !igp.ExclSourceIsIn(src) {
1202 igp.AddExclSource(src)
1203 /* If there is any update in the src list of proxy we need to update the igc */
1204 if _, ok := mvp.Proxy[igc.GroupName]; ok {
1205 if !igc.ExclSourceIsIn(src) {
1206 igc.AddExclSource(src)
1207 groupChanged = true
1208 }
1209 }
1210 }
1211 }
1212 /* If any of the existing ip in the source list is removed we need to remove from the list in igp and igc */
1213 if _, ok := mvp.Proxy[igc.GroupName]; ok {
1214 if len(ip) != len(igc.ExcludeList) {
1215 for i := len(igc.ExcludeList) - 1; i >= 0; i-- {
1216 src := igc.ExcludeList[i]
1217 if !IsIPPresent(src, ip) {
1218 igp.DelExclSource(src)
1219 igc.DelExclSource(src)
1220 groupChanged = true
1221 }
1222 }
1223 }
1224 }
1225 groupExclUpdated = igc.UpdateExclSource(ip)
1226 }
1227 if err := igp.WriteToDb(igc.Mvlan, igc.GroupAddr, igc.Device); err != nil {
1228 logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
1229 }
1230 return (groupChanged || groupExclUpdated), receiverSrcListEmpty
1231}
1232
1233// GetReceiver to get receiver info
1234func (igc *IgmpGroupChannel) GetReceiver(port string) *IgmpGroupPort {
1235 igp := igc.NewReceivers[port]
1236 if igp == nil {
1237 igp = igc.CurReceivers[port]
1238 }
1239 return igp
1240}
1241
1242// AddReceiver add the receiver to the device and perform other actions such as adding the group
1243// to the physical device, add members, add flows to point the MC packets to the
1244// group. Also, send a IGMP report upstream if there is a change in the group
1245func (igd *IgmpGroupDevice) AddReceiver(port string, groupAddr net.IP,
1246 group *layers.IGMPv3GroupRecord, version uint8, cvlan uint16, pbit uint8, ponPortID uint32) {
1247
1248 var igc *IgmpGroupChannel
1249 logger.Debugw(ctx, "Processing receiver for device", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
1250
1251 igcIntf, ok := igd.GroupChannels.Load(groupAddr.String())
1252 if !ok {
1253 igc = NewIgmpGroupChannel(igd, groupAddr, version)
1254 igd.GroupChannels.Store(groupAddr.String(), igc)
1255 } else {
1256 igc = igcIntf.(*IgmpGroupChannel)
1257 }
1258
1259 if !igd.GroupInstalled {
1260 igd.AddNewReceiver(port, groupAddr, group, cvlan, pbit, ponPortID)
1261 return
1262 }
1263
1264 isNewReceiver := igc.AddReceiver(port, group, cvlan, pbit)
1265 if isNewReceiver {
1266 ipsList := []net.IP{}
1267 ipsIntf, _ := igd.PortChannelMap.Load(port)
1268 if ipsIntf != nil {
1269 ipsList = ipsIntf.([]net.IP)
1270 }
1271 ipsList = append(ipsList, groupAddr)
1272 igd.PortChannelMap.Store(port, ipsList)
1273 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "AddedChannelList": ipsList, "Addr": groupAddr})
1274
1275 isNewPonReceiver := igd.AddChannelToChannelsPerPon(port, groupAddr, ponPortID)
1276 //Modify group only if this is the first time the port is subscribing for the group
1277 if isNewPonReceiver {
1278 igd.ModMcGroup()
1279 }
1280 }
1281 if err := igd.WriteToDb(); err != nil {
1282 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
1283 }
1284}
1285
1286// AddNewReceiver to add new receiver
1287func (igd *IgmpGroupDevice) AddNewReceiver(port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, cvlan uint16, pbit uint8, ponPortID uint32) {
1288
1289 logger.Debugw(ctx, "Adding New Device Receiver", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
1290 igcIntf, _ := igd.GroupChannels.Load(groupAddr.String())
1291 if igcIntf == nil {
1292 logger.Warnw(ctx, "No Group Channel present for given channel", log.Fields{"Channel": groupAddr, "Port": port, "Device": igd.Device})
1293 return
1294 }
1295
1296 igc := igcIntf.(*IgmpGroupChannel)
1297 ipsList := []net.IP{}
1298 ipsIntf, _ := igd.PortChannelMap.Load(port)
1299 if ipsIntf != nil {
1300 ipsList = ipsIntf.([]net.IP)
1301 }
1302 ipsList = append(ipsList, groupAddr)
1303 igd.PortChannelMap.Store(port, ipsList)
1304 igd.AddChannelToChannelsPerPon(port, groupAddr, ponPortID)
1305 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "NewChannelList": ipsList, "Addr": groupAddr})
1306
1307 igd.AddMcGroup()
1308 igc.AddReceiver(port, group, cvlan, pbit)
1309 if err := igd.WriteToDb(); err != nil {
1310 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
1311 }
1312}
1313
1314// AddReceiver add the receiver to the device and perform other actions such as adding the group
1315// to the physical device, add members, add flows to point the MC packets to the
1316// group. Also, send a IGMP report upstream if there is a change in the group
1317func (igc *IgmpGroupChannel) AddReceiver(port string, group *layers.IGMPv3GroupRecord, cvlan uint16, pbit uint8) bool {
1318
1319 var igp *IgmpGroupPort
1320 var groupModified = false
1321 var isNewReceiver = false
1322
1323 var ip []net.IP
1324 incl := false
1325 mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
1326 if _, ok := mvp.Proxy[igc.GroupName]; ok {
1327 if mvp.Proxy[igc.GroupName].Mode == common.Include {
1328 incl = true
1329 }
1330 ip = mvp.Proxy[igc.GroupName].SourceList
1331 } else if group != nil {
1332 incl = isIncl(group.Type)
1333 ip = group.SourceAddresses
1334 }
1335 logger.Debugw(ctx, "Attempting to add receiver", log.Fields{"Version": igc.Version, "Port": port, "Incl": incl, "srcIp": ip})
1336
1337 //logger.Infow(ctx, "Receivers", log.Fields{"New": igc.NewReceivers, "Current": igc.CurReceivers})
1338 logger.Debugw(ctx, "Receiver Group", log.Fields{"Igd GId": igc.GroupID})
1339 logger.Debugw(ctx, "Receiver Channel", log.Fields{"Igd addr": igc.GroupAddr})
1340 logger.Debugw(ctx, "Receiver Mvlan", log.Fields{"Igd mvlan": igc.Mvlan})
1341 logger.Debugw(ctx, "Receiver Sources", log.Fields{"Igd addr": ip})
1342
1343 ponPortID := GetApplication().GetPonPortID(igc.Device, port)
1344
1345 // Process the IGMP receiver. If it is already in, we should only process the changes
1346 // to source list.
1347 var newRcvExists bool
1348 igp, newRcvExists = igc.NewReceivers[port]
1349 if !newRcvExists {
1350 // Add the receiver to the list of receivers and make the necessary group modification
1351 // if this is the first time the receiver is added
1352 var curRcvExists bool
1353 if igp, curRcvExists = igc.CurReceivers[port]; curRcvExists {
1354 logger.Debugw(ctx, "Existing IGMP receiver", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
1355 delete(igc.CurReceivers, port)
1356 igp.QueryTimeoutCount = 0
1357 igc.NewReceivers[port] = igp
1358 } else {
1359 // New receiver who wasn't part of earlier list
1360 // Need to send out IGMP group modification for this port
1361 igp = NewIgmpGroupPort(port, cvlan, pbit, igc.Version, incl, uint32(ponPortID))
1362 igc.NewReceivers[port] = igp
1363 isNewReceiver = true
1364 logger.Debugw(ctx, "New IGMP receiver", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
1365 if len(igc.NewReceivers) == 1 && len(igc.CurReceivers) == 0 {
1366 groupModified = true
1367 igc.AddMcFlow()
1368 logger.Debugw(ctx, "Added New Flow", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
1369 }
1370 if !incl {
1371 igc.Exclude++
1372 }
1373 }
1374 }
1375
1376 // Process the include/exclude list which may end up modifying the group
1377 if change, _ := igc.ProcessSources(port, ip, incl); change {
1378 groupModified = true
1379 }
1380 igc.ProcessMode(port, incl)
1381
1382 // If the group is modified as this is the first receiver or due to include/exclude list modification
1383 // send a report to the upstream multicast servers
1384 if groupModified {
1385 logger.Debug(ctx, "Group Modified and IGMP report sent to the upstream server")
1386 igc.SendReport(false)
1387 } else if newRcvExists {
1388 return false
1389 }
1390
1391 logger.Debugw(ctx, "Channel Receiver Added", log.Fields{"Group Channel": igc.GroupAddr, "Group Port": igp})
1392
1393 if err := igc.WriteToDb(); err != nil {
1394 logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
1395 }
1396 if err := igp.WriteToDb(igc.Mvlan, igc.GroupAddr, igc.Device); err != nil {
1397 logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
1398 }
1399 return isNewReceiver
1400}
1401
1402// DelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
1403// the group
1404func (igc *IgmpGroupChannel) DelReceiver(port string, incl bool, srcList []net.IP) bool {
1405 // The receiver may exist either in NewReceiver list or
1406 // the CurReceivers list. Find and remove it from either
1407 // of the lists.
1408 logger.Debugw(ctx, "Deleting Receiver from Channel", log.Fields{"Port": port, "SrcList": srcList, "Incl": incl})
1409 logger.Debugw(ctx, "New Receivers", log.Fields{"New": igc.NewReceivers})
1410 logger.Debugw(ctx, "Current Receivers", log.Fields{"Current": igc.CurReceivers})
1411
1412 receiversUpdated := false
1413 groupModified, receiverSrcListEmpty := igc.ProcessSources(port, srcList, incl)
1414
1415 if len(srcList) == 0 || len(igc.IncludeList) == 0 || receiverSrcListEmpty {
1416 if igp, ok := igc.NewReceivers[port]; ok {
1417 logger.Debug(ctx, "Deleting from NewReceivers")
1418 delete(igc.NewReceivers, port)
1419 receiversUpdated = true
1420 if igp.Exclude {
1421 igc.Exclude--
1422 }
1423 } else {
1424 if igp, ok1 := igc.CurReceivers[port]; ok1 {
1425 logger.Debug(ctx, "Deleting from CurReceivers")
1426 delete(igc.CurReceivers, port)
1427 receiversUpdated = true
1428 if igp.Exclude {
1429 igc.Exclude--
1430 }
1431 } else {
1432 logger.Debug(ctx, "Receiver doesnot exist. Dropping Igmp leave")
1433 return false
1434 }
1435 }
1436 _ = db.DelIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device, port)
1437 }
1438
1439 if igc.NumReceivers() == 0 {
1440 igc.DelMcFlow()
1441 mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
1442 /* If proxy is configured and NumReceivers is 0, then we can reset the igc src list so that we send leave */
1443 if _, ok := mvp.Proxy[igc.GroupName]; ok {
1444 igc.IncludeList = []net.IP{}
1445 }
1446 igc.SendLeaveToServer()
1447 logger.Debugw(ctx, "Deleted the receiver Flow", log.Fields{"Num Receivers": igc.NumReceivers()})
1448 return true
1449 }
1450 if groupModified {
1451 igc.SendReport(false)
1452 logger.Infow(ctx, "Updated SourceList for Channel", log.Fields{"Current": igc.CurReceivers, "New": igc.NewReceivers})
1453 }
1454 if err := igc.WriteToDb(); err != nil {
1455 logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
1456 }
1457 logger.Infow(ctx, "Updated Receiver info for Channel", log.Fields{"Current": igc.CurReceivers, "New": igc.NewReceivers})
1458
1459 return receiversUpdated
1460}
1461
1462// NumReceivers to get number of receivers
1463func (igd *IgmpGroupDevice) NumReceivers() int {
1464 var numReceivers int
1465 len := func(key interface{}, value interface{}) bool {
1466 numReceivers++
1467 return true
1468 }
1469 igd.PortChannelMap.Range(len)
1470 return numReceivers
1471}
1472
1473// DelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
1474// the group
1475func (igd *IgmpGroupDevice) DelReceiver(groupAddr net.IP, port string, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
1476
1477 logger.Debugw(ctx, "Deleting Receiver for Device", log.Fields{"port": port, "GroupIP": groupAddr.String()})
1478 var igc *IgmpGroupChannel
1479 var igcIntf interface{}
1480 var ok bool
1481 var srcList []net.IP
1482 incl := false
1483 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
1484
1485 if _, ok := mvp.Proxy[igd.GroupName]; ok {
1486 incl = true
1487 } else if group != nil {
1488 srcList = group.SourceAddresses
1489 incl = isIncl(group.Type)
1490 }
1491
1492 if igcIntf, ok = igd.GroupChannels.Load(groupAddr.String()); !ok {
1493 logger.Warnw(ctx, "Igmp Channel for group IP doesnt exist", log.Fields{"GroupAddr": groupAddr.String()})
1494 return
1495 }
1496 igc = igcIntf.(*IgmpGroupChannel)
1497 if ok := igc.DelReceiver(port, incl, srcList); !ok {
1498 return
1499 }
1500
1501 if igc.NumReceivers() == 0 {
1502 igd.DelIgmpGroupChannel(igc)
1503 }
1504 igd.DelPortFromChannel(port, groupAddr)
1505 isGroupModified := igd.RemoveChannelFromChannelsPerPon(port, groupAddr, ponPortID)
1506
1507 //Remove port from receiver if port has no subscription to any of the group channels
1508 if isGroupModified {
1509 igd.ModMcGroup()
1510 }
1511 if err := igd.WriteToDb(); err != nil {
1512 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
1513 }
1514}
1515
1516// DelChannelReceiver is called when Query expiry happened for a receiver. This removes the receiver from the
1517// the group
1518func (igd *IgmpGroupDevice) DelChannelReceiver(groupAddr net.IP) map[string]*IgmpGroupPort {
1519
1520 portsRemoved := make(map[string]*IgmpGroupPort)
1521 groupModified := false
1522 // ifEmpty := true
1523 igcIntf, _ := igd.GroupChannels.Load(groupAddr.String())
1524
1525 if igcIntf == nil {
1526 return portsRemoved
1527 }
1528 igc := igcIntf.(*IgmpGroupChannel)
1529
1530 for port, igp := range igc.NewReceivers {
1531 _ = db.DelIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device, port) //TODO: Y not here
1532 igd.DelPortFromChannel(port, igc.GroupAddr)
1533 ponPortID := GetApplication().GetPonPortID(igd.Device, port)
1534 groupModified = igd.RemoveChannelFromChannelsPerPon(port, igc.GroupAddr, ponPortID)
1535 delete(igc.NewReceivers, port)
1536 portsRemoved[port] = igp
1537 }
1538 for port, igp := range igc.CurReceivers {
1539 _ = db.DelIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device, port)
1540 igd.DelPortFromChannel(port, igc.GroupAddr)
1541 ponPortID := GetApplication().GetPonPortID(igd.Device, port)
1542 groupModified = igd.RemoveChannelFromChannelsPerPon(port, igc.GroupAddr, ponPortID)
1543 delete(igc.CurReceivers, port)
1544 portsRemoved[port] = igp
1545 }
1546
1547 igc.DelMcFlow()
1548 igd.DelIgmpGroupChannel(igc)
1549 igc.Exclude = 0
1550 igc.SendLeaveToServer()
1551
1552 if groupModified {
1553 igd.ModMcGroup()
1554 }
1555 if err := igd.WriteToDb(); err != nil {
1556 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
1557 }
1558 logger.Debugw(ctx, "Deleted the receiver Flow", log.Fields{"Num Receivers": igc.NumReceivers()})
1559 return portsRemoved
1560}
1561
1562// DelIgmpGroupChannel to delete igmp group channel
1563func (igd *IgmpGroupDevice) DelIgmpGroupChannel(igc *IgmpGroupChannel) {
1564
1565 if igc.NumReceivers() != 0 {
1566 igc.DelAllReceivers()
1567 }
1568 _ = db.DelIgmpChannel(igc.Mvlan, igc.GroupName, igc.Device, igc.GroupAddr)
1569 igd.GroupChannels.Delete(igc.GroupAddr.String())
1570 logger.Infow(ctx, "Deleted the Channel from Device", log.Fields{"Channel": igc.GroupAddr.String()})
1571 isLenZero := true
1572 checkIfEmpty := func(key interface{}, value interface{}) bool {
1573 isLenZero = false
1574 return false
1575 }
1576 igd.GroupChannels.Range(checkIfEmpty)
1577 if isLenZero {
1578 logger.Infow(ctx, "No more active channels. Deleting MC Group", log.Fields{"Device": igd.Device, "Group": igd.GroupName})
1579 igd.DelMcGroup(false)
1580 }
1581}
1582
1583// func (igd *IgmpGroupDevice) DelIgmpChannel(igc *IgmpGroupChannel) {
1584// db.DelIgmpChannel(igc.GroupName, igc.Device, igc.GroupAddr)
1585// delete(igd.GroupChannels, igc.GroupAddr.String())
1586// logger.Debugw(ctx, "Deleted the Channel", log.Fields{"Num Receivers": igc.NumReceivers()})
1587// }
1588
1589// DelPortFromChannel to delete port from channel
1590func (igd *IgmpGroupDevice) DelPortFromChannel(port string, groupAddr net.IP) bool {
1591 ipsList := []net.IP{}
1592 ipsListIntf, _ := igd.PortChannelMap.Load(port)
1593 if ipsListIntf != nil {
1594 ipsList = ipsListIntf.([]net.IP)
1595 }
1596 for i, addr := range ipsList {
1597 if addr.Equal(groupAddr) {
1598 ipsList = append(ipsList[:i], ipsList[i+1:]...)
1599 //Remove port from receiver if port has no subscription to any of the group channels
1600 if len(ipsList) == 0 {
1601 igd.PortChannelMap.Delete(port)
1602 } else {
1603 //Update the map with modified ips list
1604 igd.PortChannelMap.Store(port, ipsList)
1605 }
1606 logger.Debugw(ctx, "Port Channel Updated", log.Fields{"Port": port, "DelChannelList": ipsList, "Addr": groupAddr.String()})
1607 return true
1608 }
1609 }
1610 return false
1611}
1612
1613// DelIgmpGroup deletes all devices for the provided igmp group
1614func (ig *IgmpGroup) DelIgmpGroup() {
1615 logger.Infow(ctx, "Deleting All Device for Group", log.Fields{"Group": ig.GroupName})
1616 for _, igd := range ig.Devices {
1617 ig.DelIgmpGroupDevice(igd)
1618 }
1619 GetApplication().DelIgmpGroup(ig)
1620}
1621
1622// DelAllChannels deletes all receiver for the provided igmp device
1623func (igd *IgmpGroupDevice) DelAllChannels() {
1624 logger.Infow(ctx, "Deleting All Channel for Device", log.Fields{"Device": igd.Device, "Group": igd.GroupName})
1625 delGroupChannels := func(key interface{}, value interface{}) bool {
1626 igc := value.(*IgmpGroupChannel)
1627 igd.DelIgmpGroupChannel(igc)
1628 return true
1629 }
1630 igd.GroupChannels.Range(delGroupChannels)
1631}
1632
1633// DelAllReceivers deletes all receiver for the provided igmp device
1634func (igc *IgmpGroupChannel) DelAllReceivers() {
1635 logger.Infow(ctx, "Deleting All Receiver for Channel", log.Fields{"Device": igc.Device, "Channel": igc.GroupAddr.String()})
1636 _ = db.DelAllIgmpRcvr(igc.Mvlan, igc.GroupAddr, igc.Device)
1637 igc.Exclude = 0
1638 igc.DelMcFlow()
1639 igc.SendLeaveToServer()
1640 logger.Infow(ctx, "MC Flow deleted and Leave sent", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device})
1641}
1642
1643// ProcessQuery process query received from the upstream IGMP server
1644func (igd *IgmpGroupDevice) ProcessQuery(groupAddr net.IP, ver uint8) {
1645 logger.Debugw(ctx, "Received Query From Server", log.Fields{"Version": ver})
1646 if ver != *igd.ServVersion {
1647 igd.ServVersionExpiry = time.Now().Add(time.Duration(2*igd.proxyCfg.KeepAliveInterval) * time.Second)
1648 *igd.ServVersion = ver
1649 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
1650 if err := mvp.WriteToDb(); err != nil {
1651 logger.Errorw(ctx, "Mvlan profile write to DB failed", log.Fields{"ProfileName": mvp.Name})
1652 }
1653 }
1654 if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
1655 igc.(*IgmpGroupChannel).SendReport(true)
1656 return
1657 }
1658 logger.Infow(ctx, "No Members for Channel. Dropping Igmp Query", log.Fields{"Group": igd.GroupName, "Channel": groupAddr.String()})
1659}
1660
1661// Igmpv2ReportPacket build an IGMPv2 Report for the upstream servers
1662func (igc *IgmpGroupChannel) Igmpv2ReportPacket() ([]byte, error) {
1663 logger.Debugw(ctx, "Buidling IGMP version 2 Report", log.Fields{"Device": igc.Device})
1664 return IgmpReportv2Packet(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP)
1665}
1666
1667// Igmpv3ReportPacket build an IGMPv3 Report for the upstream servers
1668func (igc *IgmpGroupChannel) Igmpv3ReportPacket() ([]byte, error) {
1669 logger.Debugw(ctx, "Buidling IGMP version 3 Report", log.Fields{"Device": igc.Device, "Exclude": igc.Exclude})
1670 if igc.Exclude > 0 {
1671 return Igmpv3ReportPacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP, false, igc.ExcludeList)
1672 }
1673 return Igmpv3ReportPacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP, true, igc.IncludeList)
1674}
1675
1676// SendReport send a consolidated report to the server
1677func (igc *IgmpGroupChannel) SendReport(isQuery bool) {
1678 var report []byte
1679 var err error
1680 logger.Debugw(ctx, "Checking Version", log.Fields{"IGC Version": igc.Version, "Proxy Version": (*igc.proxyCfg).IgmpVerToServer,
1681 "Result": (getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2)})
1682
1683 /**
1684 +------------------------------------------------------------------------+
1685 | IGMP version(towards BNG) Configured at VGC |
1686 +-------------------------------+----------------------------------------+
1687 | v2 | v3 |
1688 +===================+==========+===============================+========================================+
1689 | Received From RG | V2 Join | Process and Send as V2 to BNG | Process, Convert to v3 and Send to BNG |
1690 | | | | Process, Send as v2, if the BNG is v2 |
1691 +===================+----------+-------------------------------+----------------------------------------+
1692 | V3 Join | Process and Send as V2 to BNG | Process, Send v3 to BNG |
1693 | | | Process, Convert, Send as v2, if the |
1694 | | | BNG is v2 |
1695 +===================+==========+===============================+========================================+
1696 | Received From BNG | V2 Query | V2 response to BNG | V2 response to BNG |
1697 +===================+----------+-------------------------------+----------------------------------------+
1698 | V3 Query | Discard | V3 response to BNG |
1699 +==========+===============================+========================================+
1700 */
1701 // igc.Version: igmp version received from RG.
1702 // igc.ServVersion: igmp version received from BNG or IgmpVerToServer present in proxy igmp conf.
1703
1704 if isQuery && *igc.ServVersion == IgmpVersion3 && getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 {
1705 // This is the last scenario where we must discard the query processing.
1706 logger.Debug(ctx, "Dropping query packet since the server verion is v3 but igmp proxy version is v2")
1707 return
1708 }
1709
1710 if *igc.ServVersion == IgmpVersion2 || getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 {
1711 report, err = igc.Igmpv2ReportPacket()
1712 } else {
1713 report, err = igc.Igmpv3ReportPacket()
1714 }
1715 if err != nil {
1716 logger.Warnw(ctx, "Error Preparing Report", log.Fields{"Device": igc.Device, "Ver": igc.Version, "Reason": err.Error()})
1717 return
1718 }
1719 nni, err := GetApplication().GetNniPort(igc.Device)
1720 if err == nil {
1721 _ = cntlr.GetController().PacketOutReq(igc.Device, nni, nni, report, false)
1722 } else {
1723 logger.Warnw(ctx, "Didn't find NNI port", log.Fields{"Device": igc.Device})
1724 }
1725}
1726
1727// AddMcFlow adds flow to the device when the first receiver joins
1728func (igc *IgmpGroupChannel) AddMcFlow() {
1729 flow, err := igc.BuildMcFlow()
1730 if err != nil {
1731 logger.Warnw(ctx, "MC Flow Build Failed", log.Fields{"Reason": err.Error()})
1732 return
1733 }
1734 port, _ := GetApplication().GetNniPort(igc.Device)
1735 _ = cntlr.GetController().AddFlows(port, igc.Device, flow)
1736}
1737
1738// DelMcFlow deletes flow from the device when the last receiver leaves
1739func (igc *IgmpGroupChannel) DelMcFlow() {
1740 flow, err := igc.BuildMcFlow()
1741 if err != nil {
1742 logger.Warnw(ctx, "MC Flow Build Failed", log.Fields{"Reason": err.Error()})
1743 return
1744 }
1745 flow.ForceAction = true
1746 device := GetApplication().GetDevice(igc.Device)
1747
1748 if mvpIntf, _ := GetApplication().MvlanProfilesByTag.Load(igc.Mvlan); mvpIntf != nil {
1749 mvp := mvpIntf.(*MvlanProfile)
1750 err := mvp.DelFlows(device, flow)
1751 if err != nil {
1752 logger.Warnw(ctx, "Delering IGMP Flow for device failed ", log.Fields{"Device": device, "err": err})
1753 }
1754 }
1755}
1756
1757// BuildMcFlow builds the flow using which it is added/deleted
1758func (igc *IgmpGroupChannel) BuildMcFlow() (*of.VoltFlow, error) {
1759 flow := &of.VoltFlow{}
1760 flow.SubFlows = make(map[uint64]*of.VoltSubFlow)
1761 //va := GetApplication()
1762 logger.Infow(ctx, "Building Mcast flow", log.Fields{"Mcast Group": igc.GroupAddr.String(), "Mvlan": igc.Mvlan.String()})
1763 uintGroupAddr := ipv4ToUint(igc.GroupAddr)
1764 subFlow := of.NewVoltSubFlow()
1765 subFlow.SetMatchVlan(igc.Mvlan)
1766 subFlow.SetIpv4Match()
1767 subFlow.SetMatchDstIpv4(igc.GroupAddr)
1768 mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
1769 //nni, err := va.GetNniPort(igc.Device)
1770 //if err != nil {
1771 // return nil, err
1772 //}
1773 //inport, err := va.GetPortID(nni)
1774 //if err != nil {
1775 // return nil, err
1776 //}
1777 //subFlow.SetInPort(inport)
1778 subFlow.SetOutGroup(igc.GroupID)
1779 cookiePort := uintGroupAddr
1780 subFlow.Cookie = uint64(cookiePort)<<32 | uint64(igc.Mvlan)
1781 subFlow.Priority = of.McFlowPriority
1782 metadata := uint64(mvp.PonVlan)
1783 subFlow.SetTableMetadata(metadata)
1784
1785 flow.SubFlows[subFlow.Cookie] = subFlow
1786 logger.Infow(ctx, "Built Mcast flow", log.Fields{"cookie": subFlow.Cookie, "subflow": subFlow})
1787 return flow, nil
1788}
1789
1790//DelFlows - Triggers flow deletion after registering for flow indication event
1791func (mvp *MvlanProfile) DelFlows(device *VoltDevice, flow *of.VoltFlow) error {
1792 mvp.mvpFlowLock.Lock()
1793 defer mvp.mvpFlowLock.Unlock()
1794
1795 var flowMap map[string]bool
1796 var ok bool
1797
1798 for cookie := range flow.SubFlows {
1799 cookie := strconv.FormatUint(cookie, 10)
1800 fe := &FlowEvent{
1801 eType: EventTypeMcastFlowRemoved,
1802 device: device.Name,
1803 cookie: cookie,
1804 eventData: mvp,
1805 }
1806 device.RegisterFlowDelEvent(cookie, fe)
1807
1808 if flowMap, ok = mvp.PendingDeleteFlow[device.Name]; !ok {
1809 flowMap = make(map[string]bool)
1810 }
1811 flowMap[cookie] = true
1812 mvp.PendingDeleteFlow[device.Name] = flowMap
1813 }
1814 if err := mvp.WriteToDb(); err != nil {
1815 logger.Errorw(ctx, "Mvlan profile write to DB failed", log.Fields{"ProfileName": mvp.Name})
1816 }
1817 return cntlr.GetController().DelFlows(device.NniPort, device.Name, flow)
1818}
1819
1820//FlowRemoveSuccess - Process flow success indication
1821func (mvp *MvlanProfile) FlowRemoveSuccess(cookie string, device string) {
1822 mvp.mvpFlowLock.Lock()
1823 defer mvp.mvpFlowLock.Unlock()
1824
1825 logger.Infow(ctx, "Mvlan Flow Remove Success Notification", log.Fields{"MvlanProfile": mvp.Name, "Cookie": cookie, "Device": device})
1826
1827 if _, ok := mvp.PendingDeleteFlow[device]; ok {
1828 delete(mvp.PendingDeleteFlow[device], cookie)
1829 }
1830
1831 if err := mvp.WriteToDb(); err != nil {
1832 logger.Errorw(ctx, "Mvlan profile write to DB failed", log.Fields{"ProfileName": mvp.Name})
1833 }
1834}
1835
1836//FlowRemoveFailure - Process flow failure indication
1837func (mvp *MvlanProfile) FlowRemoveFailure(cookie string, device string, errorCode uint32, errReason string) {
1838
1839 mvp.mvpFlowLock.Lock()
1840 defer mvp.mvpFlowLock.Unlock()
1841
1842 if flowMap, ok := mvp.PendingDeleteFlow[device]; ok {
1843 if _, ok := flowMap[cookie]; ok {
1844 logger.Errorw(ctx, "Mvlan Flow Remove Failure Notification", log.Fields{"MvlanProfile": mvp.Name, "Cookie": cookie, "ErrorCode": errorCode, "ErrorReason": errReason, "Device": device})
1845 return
1846 }
1847 }
1848 logger.Errorw(ctx, "Mvlan Flow Del Failure Notification for Unknown cookie", log.Fields{"MvlanProfile": mvp.Name, "Cookie": cookie, "ErrorCode": errorCode, "ErrorReason": errReason})
1849
1850}
1851
1852// AddMcGroup add the new group on the device when a receiver joins the group
1853func (igd *IgmpGroupDevice) AddMcGroup() {
1854 if !igd.GroupInstalled {
1855 group := &of.Group{}
1856 group.Command = of.GroupCommandAdd
1857 group.GroupID = igd.GroupID
1858 group.Device = igd.Device
1859 group.SetVlan = igd.PonVlan
1860 group.IsPonVlanPresent = igd.IsPonVlanPresent
1861
1862 addbuckets := func(key interface{}, value interface{}) bool {
1863 port := key.(string)
1864 var portID uint32
1865 if d := GetApplication().GetDevice(group.Device); d != nil {
1866 GetApplication().portLock.Lock()
1867 p := d.GetPort(port)
1868 GetApplication().portLock.Unlock()
1869 portID = p.ID
1870 }
1871 //ponPortID := key.(uint32)
1872 if portID != 0xFF {
1873 group.Buckets = append(group.Buckets, portID)
1874 }
1875 return true
1876 }
1877 igd.PortChannelMap.Range(addbuckets)
1878
1879 port, _ := GetApplication().GetNniPort(igd.Device)
1880 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
1881 igd.GroupInstalled = true
1882 }
1883}
1884
1885// ModMcGroup updates the group on the device when either a receiver leaves
1886// or joins the group
1887func (igd *IgmpGroupDevice) ModMcGroup() {
1888 if igd.GroupInstalled {
1889 group := &of.Group{}
1890 group.Command = of.GroupCommandMod
1891 group.GroupID = igd.GroupID
1892 group.Device = igd.Device
1893 group.SetVlan = igd.PonVlan
1894 group.IsPonVlanPresent = igd.IsPonVlanPresent
1895
1896 addbuckets := func(key interface{}, value interface{}) bool {
1897 port := key.(string)
1898 var portID uint32
1899 if d := GetApplication().GetDevice(group.Device); d != nil {
1900 GetApplication().portLock.Lock()
1901 p := d.GetPort(port)
1902 GetApplication().portLock.Unlock()
1903 portID = p.ID
1904 }
1905 //ponPortID := key.(uint32)
1906 if portID != 0xFF {
1907 group.Buckets = append(group.Buckets, portID)
1908 }
1909 return true
1910 }
1911 igd.PortChannelMap.Range(addbuckets)
1912
1913 port, _ := GetApplication().GetNniPort(igd.Device)
1914 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
1915 } else {
1916 logger.Warnw(ctx, "Update Group Failed. Group not yet created", log.Fields{"Igd": igd.Device})
1917 }
1918}
1919
1920// DelMcGroup : The group is deleted when the last receiver leaves the group
1921func (igd *IgmpGroupDevice) DelMcGroup(forceDelete bool) {
1922
1923 logger.Infow(ctx, "Delete Mc Group Request", log.Fields{"Device": igd.Device, "GroupID": igd.GroupID, "ForceFlag": forceDelete, "GroupInstalled": igd.GroupInstalled})
1924 /*
1925 if !forceDelete && !checkIfForceGroupRemove(igd.Device) {
1926 if success := AddToPendingPool(igd.Device, igd.getKey()); success {
1927 return
1928 }
1929 }*/
1930 if igd.GroupInstalled {
1931 logger.Debugw(ctx, "Deleting Group", log.Fields{"Device": igd.Device, "Id": igd.GroupID})
1932 group := &of.Group{}
1933 group.Command = of.GroupCommandDel
1934 group.GroupID = igd.GroupID
1935 group.Device = igd.Device
1936 group.ForceAction = true
1937
1938 port, _ := GetApplication().GetNniPort(igd.Device)
1939 _ = cntlr.GetController().GroupUpdate(port, igd.Device, group)
1940 igd.GroupInstalled = false
1941 }
1942}
1943
1944//AddToPendingPool - adds Igmp Device obj to pending pool
1945func AddToPendingPool(device string, groupKey string) bool {
1946
1947 logger.Infow(ctx, "Add Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupKey": groupKey})
1948 if grp, ok := GetApplication().IgmpGroups.Load(groupKey); ok {
1949 ig := grp.(*IgmpGroup)
1950 ig.PendingPoolLock.Lock()
1951 logger.Infow(ctx, "Adding Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
1952 ig.PendingGroupForDevice[device] = time.Now().Add(time.Duration(GroupExpiryTime) * time.Minute)
1953 ig.PendingPoolLock.Unlock()
1954 if err := ig.WriteToDb(); err != nil {
1955 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
1956 }
1957 return true
1958 }
1959 return false
1960}
1961
1962/*
1963func checkIfForceGroupRemove(device string) bool {
1964 if d := GetApplication().GetDevice(device); d != nil {
1965 if d.State == cntlr.DeviceStateREBOOTED || d.State == cntlr.DeviceStateDOWN {
1966 return true
1967 }
1968 }
1969 return false
1970}*/
1971
1972// IgmpLeaveToServer sends IGMP leave to server. Called when the last receiver leaves the group
1973func (igc *IgmpGroupChannel) IgmpLeaveToServer() {
1974 if leave, err := IgmpLeavePacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP); err == nil {
1975 nni, err1 := GetApplication().GetNniPort(igc.Device)
1976 if err1 == nil {
1977 _ = cntlr.GetController().PacketOutReq(igc.Device, nni, nni, leave, false)
1978 }
1979 }
1980}
1981
1982// SendLeaveToServer delete the group when the last receiver leaves the group
1983func (igc *IgmpGroupChannel) SendLeaveToServer() {
1984 /**
1985 +-------------------------------------------------------------------------+
1986 | IGMP version(towards BNG) Configured at VGC |
1987 +-------------------------------+-----------------------------------------+
1988 | v2 | v3 |
1989 +===================+==========+===============================+=========================================+
1990 | Received From RG | V2 Leave | Process and Send as V2 to BNG | Process, Convert to V3 and Send to BNG/ |
1991 | | | | Process, Send as V2, if the BNG is V2 |
1992 +===================+----------+-------------------------------+-----------------------------------------+
1993 | V3 Leave | Process and Send as V2 to BNG | Process, Send V3 to BNG |
1994 | | | Process, Convert, Send as V2, if the |
1995 | | | BNG is v2 |
1996 +==========+===============================+=========================================+
1997 */
1998 // igc.Version: igmp version received from RG.
1999 // igc.ServVersion: igmp version received from BNG or IgmpVerToServer present in proxy igmp conf.
2000
2001 logger.Debugw(ctx, "Sending IGMP leave upstream", log.Fields{"Device": igc.Device})
2002 if *igc.ServVersion == IgmpVersion2 || getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 {
2003 igc.IgmpLeaveToServer()
2004 } else {
2005 igc.SendReport(false)
2006 }
2007}
2008
2009// QueryExpiry processes query expiry. Upon expiry, take stock of the situation
2010// add either retain/release the group based on number of receivers left
2011func (igd *IgmpGroupDevice) QueryExpiry() {
2012 logger.Debugw(ctx, "Query Expiry", log.Fields{"Device": igd.Device})
2013
2014
2015 // Delete the IGMP flow added for this port if port state is down or query count exceeded
2016 handleQueryExp := func(key interface{}, value interface{}) bool {
2017 igc := value.(*IgmpGroupChannel)
2018 for portKey, port := range igc.CurReceivers {
2019
2020 if portKey == StaticPort {
2021 continue
2022 }
2023
2024 logger.Warnw(ctx, "Expired Receiver Port", log.Fields{"PortKey": portKey, "IGP": port, "GroupAddr": igc.GroupAddr,
2025 "Count": port.QueryTimeoutCount})
2026 state, err := cntlr.GetController().GetPortState(igc.Device, portKey)
2027 logger.Debugw(ctx, "Expired Member Port State", log.Fields{"state": state})
2028 ponPortID := GetApplication().GetPonPortID(igd.Device, portKey)
2029 if err == nil && state == cntlr.PortStateDown {
2030 igd.DelReceiver(igc.GroupAddr, portKey, nil, ponPortID)
2031 }
2032
2033 port.QueryTimeoutCount++
2034 logger.Debugw(ctx, "Expired Port TimeoutCount", log.Fields{"count": port.QueryTimeoutCount})
2035 if port.QueryTimeoutCount >= (*igc.proxyCfg).KeepAliveCount {
2036 logger.Errorw(ctx, "Expiry Timeout count exceeded. Trigger delete receiver", log.Fields{"PortKey": portKey,
2037 "GroupAddr": igc.GroupAddr, "Count": port.QueryTimeoutCount})
2038 igd.DelReceiver(igc.GroupAddr, portKey, nil, ponPortID)
2039 SendQueryExpiredEventGroupSpecific(portKey, igd, igc)
2040 } else {
2041 _ = port.WriteToDb(igc.Mvlan, igc.GroupAddr, igc.Device)
2042 }
2043 }
2044 return true
2045 }
2046 igd.GroupChannels.Range(handleQueryExp)
2047}
2048
2049// SendQueryExpiredEventGroupSpecific to send group specific query expired event.
2050func SendQueryExpiredEventGroupSpecific(portKey string, igd *IgmpGroupDevice, igc *IgmpGroupChannel) {
2051
2052 logger.Info(ctx, "Processing-SendQueryExpiredEventGroupSpecific-Event")
2053 va := GetApplication()
2054 mvpName := va.GetMvlanProfileByTag(igd.Mvlan).Name
2055
2056 sendEvent := func(key interface{}, value interface{}) bool {
2057 if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName {
2058 logger.Debugw(ctx, "sending-query-expired-group-specific-event", log.Fields{"EventType": QueryExpiredGroupSpecific, "ServiceName": value.(*VoltService).Name})
2059 }
2060 return false
2061 }
2062
2063 // Fetching service name to send with query expired event.
2064 vpvs, _ := va.VnetsByPort.Load(portKey)
2065 if vpvs == nil {
2066 logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs})
2067 return
2068 }
2069
2070 for _, vpv := range vpvs.([]*VoltPortVnet) {
2071 vpv.services.Range(sendEvent)
2072 }
2073}
2074
2075// GetMcastServiceForSubAlarm to get mcast service name for subscriber alarm.
2076func GetMcastServiceForSubAlarm(uniPort *VoltPort, mvp *MvlanProfile) string {
2077
2078 var serviceName string
2079 mvpName := mvp.Name
2080
2081 va := GetApplication()
2082
2083 sendAlm := func(key interface{}, value interface{}) bool {
2084 if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName {
2085 serviceName = value.(*VoltService).Name
2086 }
2087 return true
2088 }
2089
2090 // Fetching service name to send with active channels exceeded per subscriber alarm.
2091 vpvs, _ := va.VnetsByPort.Load(uniPort.Name)
2092 if vpvs == nil {
2093 logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs})
2094 return serviceName
2095 }
2096
2097 for _, vpv := range vpvs.([]*VoltPortVnet) {
2098 vpv.services.Range(sendAlm)
2099 }
2100
2101 return serviceName
2102
2103}
2104
2105// NumReceivers returns total number of receivers left on the group
2106func (igc *IgmpGroupChannel) NumReceivers() uint32 {
2107 return uint32(len(igc.CurReceivers) + len(igc.NewReceivers))
2108}
2109
2110// SendQuery sends query to the receivers for counting purpose
2111func (igc *IgmpGroupChannel) SendQuery() {
2112 //var b []byte
2113 //var err error
2114 for portKey, port := range igc.NewReceivers {
2115 igc.CurReceivers[portKey] = port
2116 }
2117
2118 igc.NewReceivers = make(map[string]*IgmpGroupPort)
2119
2120 logger.Debugw(ctx, "Sending Query to receivers", log.Fields{"Receivers": igc.CurReceivers})
2121 for port, groupPort := range igc.CurReceivers {
2122 if port == StaticPort {
2123 continue
2124 }
2125 if queryPkt, err := igc.buildQuery(igc.GroupAddr, of.VlanType(groupPort.CVlan), groupPort.Pbit); err == nil {
2126 _ = cntlr.GetController().PacketOutReq(igc.Device, port, port, queryPkt, false)
2127 logger.Debugw(ctx, "Query Sent", log.Fields{"Device": igc.Device, "Port": port, "Packet": queryPkt})
2128 } else {
2129 logger.Warnw(ctx, "Query Creation Failed", log.Fields{"Reason": err.Error()})
2130 }
2131 }
2132
2133}
2134
2135// buildQuery to build query packet
2136func (igc *IgmpGroupChannel) buildQuery(groupAddr net.IP, cVlan of.VlanType, pbit uint8) ([]byte, error) {
2137 if igc.Version == IgmpVersion2 {
2138 return Igmpv2QueryPacket(igc.GroupAddr, cVlan, **igc.IgmpProxyIP, pbit, (*igc.proxyCfg).MaxResp)
2139 }
2140 return Igmpv3QueryPacket(igc.GroupAddr, cVlan, **igc.IgmpProxyIP, pbit, (*igc.proxyCfg).MaxResp)
2141}
2142
2143// IgmpGroup implements a single MCIP that may have multiple receivers
2144// connected via multiple devices (OLTs). The IGMP group is stored on the
2145// VOLT application.
2146type IgmpGroup struct {
2147 GroupID uint32
2148 Mvlan of.VlanType
2149 PonVlan of.VlanType
2150 GroupName string
2151 GroupAddr net.IP
2152 Devices map[string]*IgmpGroupDevice `json:"-"`
2153 PendingGroupForDevice map[string]time.Time //map [deviceId, timestamp] (ExpiryTime = leave time + 15mins)
2154 Version string
2155 IsPonVlanPresent bool
2156 IsChannelBasedGroup bool
2157 PendingPoolLock sync.RWMutex
2158 IsGroupStatic bool
2159 IgmpGroupLock sync.RWMutex
2160}
2161
2162// NewIgmpGroup is constructor for an IGMP group
2163func NewIgmpGroup(name string, vlan of.VlanType) *IgmpGroup {
2164 ig := IgmpGroup{}
2165 ig.GroupName = name
2166 ig.Mvlan = vlan
2167 ig.Devices = make(map[string]*IgmpGroupDevice)
2168 ig.PendingGroupForDevice = make(map[string]time.Time)
2169 return &ig
2170}
2171
2172// IgmpGroupInit to initialize igmp group members
2173func (ig *IgmpGroup) IgmpGroupInit(name string, gip net.IP, mvp *MvlanProfile) {
2174 ig.GroupName = name
2175 ig.Mvlan = mvp.Mvlan
2176 ig.PonVlan = mvp.PonVlan
2177 ig.IsPonVlanPresent = mvp.IsPonVlanPresent
2178 ig.Devices = make(map[string]*IgmpGroupDevice)
2179 ig.PendingGroupForDevice = make(map[string]time.Time)
2180 ig.IsChannelBasedGroup = mvp.IsChannelBasedGroup
2181 ig.IsGroupStatic = mvp.Groups[name].IsStatic
2182 if ig.IsChannelBasedGroup {
2183 ig.GroupAddr = gip
2184 } else {
2185 ig.GroupAddr = net.ParseIP("0.0.0.0")
2186 }
2187}
2188
2189// IgmpGroupReInit to re-initialize igmp group members
2190func (ig *IgmpGroup) IgmpGroupReInit(name string, gip net.IP) {
2191
2192 logger.Infow(ctx, "Reinitialize Igmp Group", log.Fields{"GroupID": ig.GroupID, "OldName": ig.GroupName, "Name": name, "OldAddr": ig.GroupAddr.String(), "GroupAddr": gip.String()})
2193
2194 ig.GroupName = name
2195 if ig.IsChannelBasedGroup {
2196 ig.GroupAddr = gip
2197 } else {
2198 ig.GroupAddr = net.ParseIP("0.0.0.0")
2199 }
2200
2201 for _, igd := range ig.Devices {
2202 igd.IgmpGroupDeviceReInit(ig)
2203 }
2204}
2205
2206// IsStaticGroup to check if group is static
2207func (mvp *MvlanProfile) IsStaticGroup(groupName string) bool {
2208 return mvp.Groups[groupName].IsStatic
2209}
2210
2211// updateGroupName to update group name
2212func (ig *IgmpGroup) updateGroupName(newGroupName string) {
2213 if !ig.IsChannelBasedGroup {
2214 logger.Errorw(ctx, "Group name update not supported for GroupChannel based group", log.Fields{"Ig": ig})
2215 return
2216 }
2217 oldKey := ig.getKey()
2218 ig.GroupName = newGroupName
2219 for _, igd := range ig.Devices {
2220 igd.updateGroupName(newGroupName)
2221 }
2222 if err := ig.WriteToDb(); err != nil {
2223 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
2224 }
2225 if !ig.IsChannelBasedGroup {
2226 _ = db.DelIgmpGroup(oldKey)
2227 }
2228}
2229
2230//HandleGroupMigration - handles migration of group members between static & dynamic
2231func (ig *IgmpGroup) HandleGroupMigration(deviceID string, groupAddr net.IP) {
2232
2233 var group *layers.IGMPv3GroupRecord
2234 app := GetApplication()
2235 if deviceID == "" {
2236 logger.Infow(ctx, "Handle Group Migration Request for all devices", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
2237 for device := range ig.Devices {
2238 ig.HandleGroupMigration(device, groupAddr)
2239 }
2240 } else {
2241 logger.Infow(ctx, "Handle Group Migration Request", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName})
2242 var newIg *IgmpGroup
2243 receivers := ig.DelIgmpChannel(deviceID, groupAddr)
2244 if ig.NumDevicesActive() == 0 {
2245 app.DelIgmpGroup(ig)
2246 }
2247 if newIg = app.GetIgmpGroup(ig.Mvlan, groupAddr); newIg == nil {
2248 logger.Infow(ctx, "IG Group doesn't exist, creating new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "IG": ig.GroupName, "Mvlan": ig.Mvlan})
2249 if newIg = app.AddIgmpGroup(app.GetMvlanProfileByTag(ig.Mvlan).Name, groupAddr, deviceID); newIg == nil {
2250 logger.Errorw(ctx, "Group Creation failed during group migration", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr})
2251 return
2252 }
2253 }
2254 mvp := app.GetMvlanProfileByTag(ig.Mvlan)
2255 isStaticGroup := mvp.IsStaticGroup(ig.GroupName)
2256 logger.Infow(ctx, "Existing receivers for old group", log.Fields{"Receivers": receivers})
2257 newIg.IgmpGroupLock.Lock()
2258 for port, igp := range receivers {
2259 if !isStaticGroup && port == StaticPort {
2260 continue
2261 }
2262 group = nil
2263 var reqType layers.IGMPv3GroupRecordType
2264 srcAddresses := []net.IP{}
2265 if igp.Version == IgmpVersion3 {
2266 if igp.Exclude {
2267 srcAddresses = append(srcAddresses, igp.ExcludeList...)
2268 reqType = layers.IGMPIsEx
2269 } else {
2270 srcAddresses = append(srcAddresses, igp.IncludeList...)
2271 reqType = layers.IGMPIsIn
2272 }
2273 group = &layers.IGMPv3GroupRecord{
2274 SourceAddresses: srcAddresses,
2275 Type: reqType,
2276 }
2277 }
2278 logger.Infow(ctx, "Adding receiver to new group", log.Fields{"DeviceID": deviceID, "GroupAddr": groupAddr, "newIg": newIg.GroupName, "IGP": igp})
2279 ponPort := GetApplication().GetPonPortID(deviceID, port)
2280 newIg.AddReceiver(deviceID, port, groupAddr, group, igp.Version, igp.CVlan, igp.Pbit, ponPort)
2281 }
2282 newIg.IgmpGroupLock.Unlock()
2283 }
2284}
2285
2286// AddIgmpGroupDevice add a device to the group which happens when the first receiver of the device
2287// is added to the IGMP group.
2288func (ig *IgmpGroup) AddIgmpGroupDevice(device string, id uint32, version uint8) *IgmpGroupDevice {
2289 logger.Infow(ctx, "Adding Device to IGMP group", log.Fields{"Device": device, "GroupName": ig.GroupName})
2290 igd := NewIgmpGroupDevice(device, ig, id, version)
2291 ig.Devices[device] = igd
2292 if err := igd.WriteToDb(); err != nil {
2293 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device, "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
2294 }
2295 return igd
2296}
2297
2298// DelIgmpGroupDevice delete the device from the group which happens when we receive a leave or when
2299// there is not response for IGMP query from the receiver
2300func (ig *IgmpGroup) DelIgmpGroupDevice(igd *IgmpGroupDevice) {
2301 logger.Infow(ctx, "Deleting Device from IGMP group", log.Fields{"Device": igd.Device, "Name": ig.GroupName})
2302 va := GetApplication()
2303 countersToBeUpdated := false
2304 if igd.NumReceivers() != 0 {
2305 countersToBeUpdated = true
2306 }
2307 igd.DelAllChannels()
2308
2309 //Clear all internal maps so that the groups can be reused
2310 igd.PortChannelMap.Range(func(key, value interface{}) bool {
2311
2312 //Update the counters only if not already updated
2313 //(i.e) 1. In case of channel remove during Mvlan Update
2314 if countersToBeUpdated {
2315 port := key.(string)
2316 channelList := value.([]net.IP)
2317 ponPortID := va.GetPonPortID(igd.Device, port)
2318
2319 for _, channel := range channelList {
2320 igd.RemoveChannelFromChannelsPerPon(port, channel, ponPortID)
2321 }
2322 }
2323
2324 igd.PortChannelMap.Delete(key)
2325 return true
2326 })
2327 igd.PonPortChannelMap = util.NewConcurrentMap()
2328
2329 if mcastCfg := va.GetMcastConfig(igd.SerialNo, va.GetMvlanProfileByTag(igd.Mvlan).Name); mcastCfg != nil {
2330 mcastCfg.IgmpGroupDevices.Delete(igd.GroupID)
2331 logger.Debugw(ctx, "Igd deleted from mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
2332 }
2333 if !igd.GroupInstalled {
2334 _ = db.DelIgmpDevice(igd.Mvlan, ig.GroupName, ig.GroupAddr, igd.Device)
2335 delete(ig.Devices, igd.Device)
2336 }
2337}
2338
2339// AddReceiver delete the device from the group which happens when we receive a leave or when
2340// there is not response for IGMP query from the receiver
2341func (ig *IgmpGroup) AddReceiver(device string, port string, groupIP net.IP,
2342 group *layers.IGMPv3GroupRecord, ver uint8, cvlan uint16, pbit uint8, ponPort uint32) {
2343
2344 logger.Debugw(ctx, "Adding Receiver", log.Fields{"Port": port})
2345 if igd, ok := ig.getIgmpGroupDevice(device); !ok {
2346 igd = ig.AddIgmpGroupDevice(device, ig.GroupID, ver)
2347 igd.AddReceiver(port, groupIP, group, ver, cvlan, pbit, ponPort)
2348 } else {
2349 logger.Infow(ctx, "IGMP Group Receiver", log.Fields{"IGD": igd.Device})
2350 igd.AddReceiver(port, groupIP, group, ver, cvlan, pbit, ponPort)
2351 }
2352}
2353
2354func (ig *IgmpGroup) getIgmpGroupDevice(device string) (*IgmpGroupDevice, bool) {
2355 ig.PendingPoolLock.Lock()
2356 defer ig.PendingPoolLock.Unlock()
2357
2358 if _, ok := ig.PendingGroupForDevice[device]; ok {
2359 logger.Infow(ctx, "Removing the IgmpGroupDevice from pending pool", log.Fields{"GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
2360 delete(ig.PendingGroupForDevice, device)
2361 if err := ig.WriteToDb(); err != nil {
2362 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
2363 }
2364 }
2365 igd, ok := ig.Devices[device]
2366 return igd, ok
2367}
2368
2369// DelReceiveronDownInd deletes a receiver which is the combination of device (OLT)
2370// and port on Port Down event
2371func (ig *IgmpGroup) DelReceiveronDownInd(device string, port string, ponPortID uint32) {
2372 logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port})
2373
2374 mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
2375 mvp.mvpLock.RLock()
2376 defer mvp.mvpLock.RUnlock()
2377 igd, ok := ig.Devices[device]
2378 if !ok {
2379 logger.Infow(ctx, "IGMP Group device was not found for ", log.Fields{"Device": device})
2380 return
2381 }
2382 ipsList := []net.IP{}
2383 ipsListIntf, ok := igd.PortChannelMap.Load(port)
2384 if ok {
2385 ipsList = append(ipsList, ipsListIntf.([]net.IP)...)
2386 }
2387 logger.Infow(ctx, "Port Channel List", log.Fields{"Port": port, "IPsList": ipsList})
2388 igd.PortChannelMap.Range(printPortChannel)
2389
2390
2391 for _, groupAddr := range ipsList {
2392 logger.Debugw(ctx, "Port Channels", log.Fields{"Port": port, "IPsList": ipsList, "GroupAddr": groupAddr, "Len": len(ipsList)})
2393 igd.DelReceiver(groupAddr, port, nil, ponPortID)
2394 }
2395
2396 if igd.NumReceivers() == 0 {
2397 ig.DelIgmpGroupDevice(igd)
2398 }
2399}
2400
2401// DelReceiver deletes a receiver which is the combination of device (OLT)
2402// and port
2403func (ig *IgmpGroup) DelReceiver(device string, port string, groupAddr net.IP, group *layers.IGMPv3GroupRecord, ponPortID uint32) {
2404 logger.Debugw(ctx, "Deleting Receiver for Group", log.Fields{"Device": device, "port": port, "GroupIP": groupAddr.String()})
2405 if igd, ok := ig.Devices[device]; ok {
2406 //igd.DelReceiverForGroupAddr(groupAddr, port)
2407 igd.DelReceiver(groupAddr, port, group, ponPortID)
2408 if igd.NumReceivers() == 0 {
2409 ig.DelIgmpGroupDevice(igd)
2410 }
2411 }
2412}
2413
2414// GetAllIgmpChannelForDevice - Returns all channels with active members associated to the Igmp Group for the given device
2415func (ig *IgmpGroup) GetAllIgmpChannelForDevice(deviceID string) map[string]string {
2416
2417 if deviceID == "" {
2418 return ig.GetAllIgmpChannel()
2419 }
2420
2421 allChannels := make(map[string]string)
2422 igd := ig.Devices[deviceID]
2423 getAllChannels := func(key interface{}, value interface{}) bool {
2424 channels := key.(string)
2425 allChannels[channels] = channels //same value as only key is required
2426 return true
2427 }
2428 igd.GroupChannels.Range(getAllChannels)
2429
2430 return allChannels
2431}
2432
2433// GetAllIgmpChannel - Returns all channels with active members associated to the Igmp Group
2434func (ig *IgmpGroup) GetAllIgmpChannel() map[string]string {
2435 allChannels := make(map[string]string)
2436 for _, igd := range ig.Devices {
2437 getAllChannels := func(key interface{}, value interface{}) bool {
2438 channels := key.(string)
2439 allChannels[channels] = channels
2440 return true
2441 }
2442 igd.GroupChannels.Range(getAllChannels)
2443 }
2444 return allChannels
2445}
2446
2447// DelIgmpChannel deletes all receivers for the provided igmp group channel for the given device
2448func (ig *IgmpGroup) DelIgmpChannel(deviceID string, groupAddr net.IP) map[string]*IgmpGroupPort {
2449 logger.Infow(ctx, "Deleting Channel from devices", log.Fields{"Device": deviceID, "Group": ig.GroupName, "Channel": groupAddr.String()})
2450 if deviceID == "" {
2451 for device := range ig.Devices {
2452 ig.DelIgmpChannel(device, groupAddr)
2453 }
2454 return nil
2455 }
2456 igd := ig.Devices[deviceID]
2457 receivers := igd.DelChannelReceiver(groupAddr)
2458 if igd.NumReceivers() == 0 {
2459 ig.DelIgmpGroupDevice(igd)
2460 }
2461 return receivers
2462}
2463
2464// IsNewReceiver checks if the received port is new receiver or existing one.
2465// Returns true if new receiver.
2466func (ig *IgmpGroup) IsNewReceiver(device, uniPortID string, groupAddr net.IP) bool {
2467 if ig == nil {
2468 // IGMP group does not exists. So considering it as new receiver.
2469 return true
2470 }
2471 logger.Debugw(ctx, "IGMP Group", log.Fields{"channel": groupAddr, "groupName": ig.GroupName}) // TODO: Remove me
2472 igd, exists := ig.Devices[device]
2473 if !exists || !igd.GroupInstalled {
2474 // IGMP group not exists OR Group is not created in the device.
2475 // So this is a new receiver.
2476 logger.Debugw(ctx, "igd not exists or group is not created in device", log.Fields{"exists": exists}) // TODO: Remove me
2477 return true
2478 }
2479 if igc, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
2480 logger.Debugw(ctx, "IGMP Channel receivers", log.Fields{"igc-receivers": igc.(*IgmpGroupChannel).CurReceivers}) // TODO: Remove me
2481 _, rcvrExistCur := igc.(*IgmpGroupChannel).CurReceivers[uniPortID]
2482 _, rcvrExistNew := igc.(*IgmpGroupChannel).NewReceivers[uniPortID]
2483 if rcvrExistCur || rcvrExistNew {
2484 // Existing receiver
2485 return false
2486 }
2487 }
2488 return true
2489}
2490
2491// Tick for Addition of groups to an MVLAN profile
2492func (ig *IgmpGroup) Tick() {
2493 now := time.Now()
2494 for _, igd := range ig.Devices {
2495 var igdChangeCnt uint8
2496
2497 if _, ok := GetApplication().DevicesDisc.Load(igd.Device); !ok {
2498 logger.Info(ctx, "Skipping Query and Expiry check since Device is unavailable")
2499 continue
2500 }
2501 if now.After(igd.NextQueryTime) {
2502 // Set the next query time and the query expiry time to
2503 // KeepAliveInterval and MaxResp seconds after current time
2504 igd.NextQueryTime = now.Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
2505 igd.QueryExpiryTime = now.Add(time.Duration(igd.proxyCfg.MaxResp) * time.Second)
2506 logger.Debugw(ctx, "Query Start", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
2507 igdChangeCnt++
2508 logger.Debugw(ctx, "Sending Query to device", log.Fields{"Device": igd.Device})
2509 sendQueryForAllChannels := func(key interface{}, value interface{}) bool {
2510 igc := value.(*IgmpGroupChannel)
2511 //TODO - Do generic query to avoid multiple msgs
2512 igc.SendQuery()
2513 return true
2514 }
2515 igd.GroupChannels.Range(sendQueryForAllChannels)
2516 }
2517 if now.After(igd.QueryExpiryTime) {
2518 igd.QueryExpiry()
2519 // This will keep it quiet till the next query time and then
2520 // it will be reset to a value after the query initiation time
2521 igd.QueryExpiryTime = igd.NextQueryTime
2522 logger.Debugw(ctx, "Expiry", log.Fields{"NextQuery": igd.NextQueryTime, "Expiry": igd.QueryExpiryTime})
2523 igdChangeCnt++
2524 if igd.NumReceivers() == 0 {
2525 ig.DelIgmpGroupDevice(igd)
2526 continue
2527 }
2528 }
2529
2530 igdChangeCnt += igd.Tick()
2531
2532 if igdChangeCnt > 0 {
2533 if err := igd.WriteToDb(); err != nil {
2534 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
2535 "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
2536 }
2537 }
2538 }
2539}
2540
2541// QueryExpiry processes expiry of query sent to the receivers. Up on
2542// expiry, process the consolidated response for each of the devices participating
2543// in the MC stream. When a device has no receivers, the device is deleted
2544// from the group.
2545func (ig *IgmpGroup) QueryExpiry() {
2546 for _, igd := range ig.Devices {
2547 if _, ok := GetApplication().DevicesDisc.Load(igd.Device); ok {
2548 igd.QueryExpiry()
2549 if igd.NumReceivers() == 0 {
2550 ig.DelIgmpGroupDevice(igd)
2551 }
2552
2553 } else {
2554 logger.Info(ctx, "Skipping Expiry since Device is unavailable")
2555 }
2556 }
2557}
2558
2559// Hash : The IGMP group hash is used to distribute the processing of timers so that
2560// the processing is spread across doesn't spike at one instant. This also
2561// ensures that there is sufficient responsiveness to other requests happening
2562// simultaneously.
2563func (ig *IgmpGroup) Hash() uint16 {
2564 mvp := GetApplication().GetMvlanProfileByTag(ig.Mvlan)
2565
2566 if mvp == nil {
2567 return 0
2568 }
2569
2570 mvp.mvpLock.RLock()
2571 defer mvp.mvpLock.RUnlock()
2572 group := mvp.Groups[ig.GroupName]
2573
2574 //Case where mvlan update in-progress
2575 if group == nil || len(group.McIPs) == 0 {
2576 return 0
2577 }
2578 groupIP := group.McIPs[0]
2579 return uint16(groupIP[2])<<8 + uint16(groupIP[3])
2580}
2581
2582// NumDevicesAll returns the number of devices (OLT) active on the IGMP group. When
2583// the last device leaves the IGMP group is removed. If this is not done,
2584// the number of IGMP groups only keep increasing and can impact CPU when
2585// the system runs for a very long duration
2586func (ig *IgmpGroup) NumDevicesAll() int {
2587 return len(ig.Devices)
2588}
2589
2590// NumDevicesActive returns the number of devices (OLT) active on the IGMP group. When
2591// the last device leaves the IGMP group is removed. If this is not done,
2592// the number of IGMP groups only keep increasing and can impact CPU when
2593// the system runs for a very long duration
2594func (ig *IgmpGroup) NumDevicesActive() int {
2595 count := 0
2596 for _, igd := range ig.Devices {
2597 if igd.NumReceivers() == 0 && igd.GroupInstalled {
2598 continue
2599 }
2600 count++
2601 }
2602 return count
2603}
2604
2605// NumReceivers to return receiver list
2606func (ig *IgmpGroup) NumReceivers() map[string]int {
2607 receiverList := make(map[string]int)
2608 for device, igd := range ig.Devices {
2609 receiverList[device] = igd.NumReceivers()
2610 }
2611 return receiverList
2612}
2613
2614// RestoreDevices : IGMP group write to DB
2615func (ig *IgmpGroup) RestoreDevices() {
2616
2617 ig.migrateIgmpDevices()
2618 devices, _ := db.GetIgmpDevices(ig.Mvlan, ig.GroupName, ig.GroupAddr)
2619 for _, device := range devices {
2620 b, ok := device.Value.([]byte)
2621 if !ok {
2622 logger.Warn(ctx, "The value type is not []byte")
2623 continue
2624 }
2625 if igd, err := NewIgmpGroupDeviceFromBytes(b); err == nil {
2626 igd.PonPortChannelMap = util.NewConcurrentMap()
2627 // Update the proxy config pointers.
2628 var mcastCfg *McastConfig
2629 igd.proxyCfg, igd.IgmpProxyIP, mcastCfg = getIgmpProxyCfgAndIP(ig.Mvlan, igd.SerialNo)
2630 if mcastCfg != nil {
2631 mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
2632 logger.Debugw(ctx, "Igd added to mcast config", log.Fields{"mvlan": mcastCfg.MvlanProfileID, "groupId": igd.GroupID})
2633 }
2634
2635 mvp := GetApplication().GetMvlanProfileByTag(igd.Mvlan)
2636 igd.ServVersion = mvp.IgmpServVersion[igd.SerialNo]
2637
2638 // During vgc upgrade from old version, igd.NextQueryTime and igd.QueryExpiryTime will not be present in db.
2639 // hence they are initialized with current time offset.
2640 emptyTime := time.Time{}
2641 if emptyTime == igd.NextQueryTime {
2642 logger.Debugw(ctx, "VGC igd upgrade", log.Fields{"igd grp name": igd.GroupName})
2643 igd.NextQueryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
2644 igd.QueryExpiryTime = time.Now().Add(time.Duration(igd.proxyCfg.KeepAliveInterval) * time.Second)
2645 if err := igd.WriteToDb(); err != nil {
2646 logger.Errorw(ctx, "Igmp group device Write to DB failed", log.Fields{"Device": igd.Device,
2647 "GroupName": igd.GroupName, "GroupAddr": igd.GroupAddr.String()})
2648 }
2649 }
2650
2651 ig.Devices[igd.Device] = igd
2652 if ig.IsChannelBasedGroup {
2653 channel, _ := db.GetIgmpChannel(igd.Mvlan, igd.GroupName, igd.Device, igd.GroupAddr)
2654 igd.RestoreChannel([]byte(channel))
2655 } else {
2656 igd.RestoreChannels()
2657 }
2658 igd.PortChannelMap.Range(printPortChannel)
2659 logger.Infow(ctx, "Group Device Restored", log.Fields{"IGD": igd})
2660 } else {
2661 logger.Warnw(ctx, "Unable to decode device from database", log.Fields{"str": string(b)})
2662 }
2663 }
2664}
2665
2666// getKey to return group key
2667func (ig *IgmpGroup) getKey() string {
2668 profile, ok := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
2669 if ok {
2670 mvp := profile.(*MvlanProfile)
2671 return mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
2672 }
2673 return ""
2674}
2675
2676/*
2677// getKey to return group key
2678func (igd *IgmpGroupDevice) getKey() string {
2679 profile, ok := GetApplication().MvlanProfilesByTag.Load(igd.Mvlan)
2680 if ok {
2681 mvp := profile.(*MvlanProfile)
2682 return mvp.generateGroupKey(igd.GroupName, igd.GroupAddr.String())
2683 }
2684 return ""
2685}*/
2686
2687// generateGroupKey to generate group key
2688func (mvp *MvlanProfile) generateGroupKey(name string, ipAddr string) string {
2689 if mvp.IsChannelBasedGroup {
2690 return mvp.Mvlan.String() + "_" + ipAddr
2691 }
2692 return mvp.Mvlan.String() + "_" + name
2693}
2694
2695// WriteToDb is utility to write Igmp Group Info to database
2696func (ig *IgmpGroup) WriteToDb() error {
2697 ig.Version = database.PresentVersionMap[database.IgmpGroupPath]
2698 b, err := json.Marshal(ig)
2699 if err != nil {
2700 return err
2701 }
2702 if err1 := db.PutIgmpGroup(ig.getKey(), string(b)); err1 != nil {
2703 return err1
2704 }
2705 return nil
2706}
2707
2708// RestoreIgmpGroupsFromDb to restore igmp groups from database
2709func (va *VoltApplication) RestoreIgmpGroupsFromDb() {
2710
2711 groups, _ := db.GetIgmpGroups()
2712 for _, group := range groups {
2713 b, ok := group.Value.([]byte)
2714 if !ok {
2715 logger.Warn(ctx, "The value type is not []byte")
2716 continue
2717 }
2718 var ig IgmpGroup
2719 err := json.Unmarshal(b, &ig)
2720 if err != nil {
2721 logger.Warn(ctx, "Unmarshal of IGMP Group failed")
2722 continue
2723 }
2724 ig.Devices = make(map[string]*IgmpGroupDevice)
2725
2726 //For Upgrade Case
2727 if len(ig.PendingGroupForDevice) == 0 {
2728 ig.PendingGroupForDevice = make(map[string]time.Time)
2729 }
2730 logger.Infow(ctx, "Restoring Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan})
2731 grpKey := ig.getKey()
2732 va.IgmpGroups.Store(grpKey, &ig)
2733 // Just delete and lose the IGMP group with the same group Id
2734 if _, err := va.GetIgmpGroupID(ig.GroupID); err != nil {
2735 logger.Warnw(ctx, "GetIgmpGroupID Failed", log.Fields{"igGroupID": ig.GroupID, "Error": err})
2736 }
2737 ig.RestoreDevices()
2738
2739 if ig.NumDevicesActive() == 0 {
2740 va.AddGroupToPendingPool(&ig)
2741 }
2742 logger.Infow(ctx, "Restored Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan})
2743 }
2744}
2745
2746// AddIgmpGroup : When the first IGMP packet is received, the MVLAN profile is identified
2747// for the IGMP group and grp obj is obtained from the available pending pool of groups.
2748// If not, new group obj will be created based on available group IDs
2749func (va *VoltApplication) AddIgmpGroup(mvpName string, gip net.IP, device string) *IgmpGroup {
2750
2751 var ig *IgmpGroup
2752 if mvp, grpName := va.GetMvlanProfileForMcIP(mvpName, gip); mvp != nil {
2753 if ig = va.GetGroupFromPendingPool(mvp.Mvlan, device); ig != nil {
2754 logger.Infow(ctx, "Igmp Group obtained from global pending pool", log.Fields{"MvlanProfile": mvpName, "GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
2755 oldKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
2756 ig.IgmpGroupReInit(grpName, gip)
2757 ig.IsGroupStatic = mvp.Groups[grpName].IsStatic
2758 ig.UpdateIgmpGroup(oldKey, ig.getKey())
2759 } else {
2760 logger.Infow(ctx, "No Igmp Group available in global pending pool. Creating new Igmp Group", log.Fields{"MvlanProfile": mvpName, "Device": device, "GroupAddr": gip.String()})
2761 if ig = va.GetAvailIgmpGroupID(); ig == nil {
2762 logger.Error(ctx, "Igmp Group Creation Failed: Group Id Unavailable")
2763 return nil
2764 }
2765 ig.IgmpGroupInit(grpName, gip, mvp)
2766 grpKey := ig.getKey()
2767 va.IgmpGroups.Store(grpKey, ig)
2768 }
2769 if err := ig.WriteToDb(); err != nil {
2770 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
2771 }
2772 return ig
2773 }
Tinoj Joseph1d108322022-07-13 10:07:39 +05302774 logger.Errorw(ctx, "GetMvlan Pro failed", log.Fields{"Group": gip})
Naveen Sampath04696f72022-06-13 15:19:14 +05302775 return nil
2776}
2777
2778// GetIgmpGroup helps search for the IGMP group from the list of
2779// active IGMP groups. For now, the assumption is that a group
2780// cannot belong to more than on MVLAN. If we change that definition,
2781// we have to take a relook at this implementation. The key will include
2782// both MVLAN and the group IP.
2783func (va *VoltApplication) GetIgmpGroup(mvlan of.VlanType, gip net.IP) *IgmpGroup {
2784
2785 profile, _ := va.MvlanProfilesByTag.Load(mvlan)
2786 if profile == nil {
2787 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": mvlan, "GroupAddr": gip.String()})
2788 return nil
2789 }
2790 mvp := profile.(*MvlanProfile)
2791 _, gName := va.GetMvlanProfileForMcIP(mvp.Name, gip)
2792 grpKey := mvp.generateGroupKey(gName, gip.String())
2793 logger.Debugw(ctx, "Get IGMP Group", log.Fields{"Group": grpKey})
2794 igIntf, ok := va.IgmpGroups.Load(grpKey)
2795 if ok {
2796 logger.Debugw(ctx, "Get IGMP Group Success", log.Fields{"Group": grpKey})
2797 ig := igIntf.(*IgmpGroup)
2798
2799 //Case: Group was part of pending and Join came with same channel or different channel from same group
2800 // (from same or different device)
2801 // In that case, the same group will be allocated since the group is still part of va.IgmpGroups
2802 // So, the groups needs to be removed from global pending pool
2803 va.RemoveGroupDevicesFromPendingPool(ig)
2804 return ig
2805 }
2806 return nil
2807}
2808
2809// GetStaticGroupName to get static igmp group
2810func (mvp *MvlanProfile) GetStaticGroupName(gip net.IP) string {
2811 for _, mvg := range mvp.Groups {
2812 if mvg.IsStatic {
2813 if doesIPMatch(gip, mvg.McIPs) {
2814 return mvg.Name
2815 }
2816 }
2817 }
2818 return ""
2819}
2820
2821// GetStaticIgmpGroup to get static igmp group
2822func (mvp *MvlanProfile) GetStaticIgmpGroup(gip net.IP) *IgmpGroup {
2823
2824 staticGroupName := mvp.GetStaticGroupName(gip)
2825 grpKey := mvp.generateGroupKey(staticGroupName, gip.String())
2826 logger.Debugw(ctx, "Get Static IGMP Group", log.Fields{"Group": grpKey})
2827 ig, ok := GetApplication().IgmpGroups.Load(grpKey)
2828 if ok {
2829 logger.Debugw(ctx, "Get Static IGMP Group Success", log.Fields{"Group": grpKey})
2830 return ig.(*IgmpGroup)
2831 }
2832 return nil
2833}
2834
2835// UpdateIgmpGroup : When the pending group is allocated to new
2836func (ig *IgmpGroup) UpdateIgmpGroup(oldKey, newKey string) {
2837
2838 //If the group is allocated to same McastGroup, no need to update the
2839 //IgmpGroups map
2840 if oldKey == newKey {
2841 return
2842 }
2843 logger.Infow(ctx, "Updating Igmp Group with new MVP Group Info", log.Fields{"OldKey": oldKey, "NewKey": newKey, "GroupID": ig.GroupID})
2844
2845 GetApplication().IgmpGroups.Delete(oldKey)
2846 _ = db.DelIgmpGroup(oldKey)
2847
2848 GetApplication().IgmpGroups.Store(newKey, ig)
2849 if err := ig.WriteToDb(); err != nil {
2850 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
2851 }
2852}
2853
2854// DelIgmpGroup : When the last subscriber leaves the IGMP group across all the devices
2855// the IGMP group is removed.
2856func (va *VoltApplication) DelIgmpGroup(ig *IgmpGroup) {
2857
2858 profile, found := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
2859 if found {
2860 mvp := profile.(*MvlanProfile)
2861
2862 grpKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
2863
2864 if igIntf, ok := va.IgmpGroups.Load(grpKey); ok {
2865 ig := igIntf.(*IgmpGroup)
2866 ig.IgmpGroupLock.Lock()
2867 if ig.NumDevicesAll() == 0 {
2868 logger.Debugw(ctx, "Deleting IGMP Group", log.Fields{"Group": grpKey})
2869 va.PutIgmpGroupID(ig)
2870 va.IgmpGroups.Delete(grpKey)
2871 _ = db.DelIgmpGroup(grpKey)
2872 } else {
2873 logger.Infow(ctx, "Skipping IgmpGroup Device. Pending Igmp Group Devices present", log.Fields{"GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
2874 va.AddGroupToPendingPool(ig)
2875 if err := ig.WriteToDb(); err != nil {
2876 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
2877 }
2878 }
2879 ig.IgmpGroupLock.Unlock()
2880 }
2881
2882 }
2883}
2884
2885// GetPonPortID Gets the PON port ID from uniPortID
2886func (va *VoltApplication) GetPonPortID(device, uniPortID string) uint32 {
2887
2888 isNNI := strings.Contains(uniPortID, "nni")
2889 if isNNI || uniPortID == StaticPort {
2890 logger.Debugw(ctx, "Cannot get pon port from UNI port", log.Fields{"port": uniPortID})
2891 return 0xFF
2892 }
2893 dIntf, ok := va.DevicesDisc.Load(device)
2894 if !ok {
2895 return 0xFF
2896 }
2897 d := dIntf.(*VoltDevice)
2898
2899 uniPort := d.GetPort(uniPortID)
2900 if uniPort == nil {
2901 return 0xFF
2902 }
2903 return GetPonPortIDFromUNIPort(uniPort.ID)
2904}
2905
2906// AggActiveChannelsCountPerSub aggregates the active channel count for given uni port.
2907// It will iterate over all the groups and store the sum of active channels in VoltPort.
2908func (va *VoltApplication) AggActiveChannelsCountPerSub(device, uniPort string, port *VoltPort) {
2909 var activeChannelCount uint32
2910
2911 collectActiveChannelCount := func(key interface{}, value interface{}) bool {
2912 ig := value.(*IgmpGroup)
2913 igd := ig.Devices[device]
2914 if igd == nil {
2915 return true
2916 }
2917 if portChannels, ok := igd.PortChannelMap.Load(uniPort); ok {
2918 channelList := portChannels.([]net.IP)
2919 activeChannelCount += uint32(len(channelList))
2920 }
2921 return true
2922 }
2923 va.IgmpGroups.Range(collectActiveChannelCount)
2924
2925 logger.Debugw(ctx, "AggrActiveChannelCount for Subscriber",
2926 log.Fields{"UniPortID": uniPort, "count": activeChannelCount})
2927
2928 port.ActiveChannels = activeChannelCount
2929}
2930
2931// AggActiveChannelsCountForPonPort Aggregates the active channel count for given pon port.
2932// It will iterate over all the groups and store the sum of active channels in VoltDevice.
2933func (va *VoltApplication) AggActiveChannelsCountForPonPort(device string, ponPortID uint32, port *PonPortCfg) {
2934
2935 var activeChannelCount uint32
2936
2937 collectActiveChannelCount := func(key interface{}, value interface{}) bool {
2938 ig := value.(*IgmpGroup)
2939 igd := ig.Devices[device]
2940 if igd == nil {
2941 return true
2942 }
2943 if ponPortChannels, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
2944 activeChannelCount += ponPortChannels.(*PonPortChannels).GetActiveChannelCount()
2945 }
2946 return true
2947 }
2948 va.IgmpGroups.Range(collectActiveChannelCount)
2949
2950 logger.Debugw(ctx, "AggrActiveChannelCount for Pon Port",
2951 log.Fields{"PonPortID": ponPortID, "count": activeChannelCount})
2952
2953 port.ActiveIGMPChannels = activeChannelCount
2954}
2955
2956// UpdateActiveChannelCountForPonPort increments the global counter for active
2957// channel count per pon port.
2958func (va *VoltApplication) UpdateActiveChannelCountForPonPort(device, uniPortID string, ponPortID uint32, isAdd, isChannel bool, igd *IgmpGroupDevice) {
2959 incrDecr := func(value uint32) uint32 {
2960 if isAdd {
2961 return value + 1
2962 }
2963 return value - 1
2964 }
2965 if d, exists := va.DevicesDisc.Load(device); exists {
2966 voltDevice := d.(*VoltDevice)
2967
2968 if isChannel {
2969 voltDevice.ActiveChannelCountLock.Lock()
2970 // If New channel is added/deleted, then only update the ActiveChannelsPerPon
2971 if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok {
2972 port := value.(*PonPortCfg)
2973 port.ActiveIGMPChannels = incrDecr(port.ActiveIGMPChannels)
2974 voltDevice.ActiveChannelsPerPon.Store(ponPortID, port)
2975 logger.Debugw(ctx, "+++ActiveChannelsPerPon", log.Fields{"count": port.ActiveIGMPChannels}) // TODO: remove me
2976 }
2977 voltDevice.ActiveChannelCountLock.Unlock()
2978 }
2979 if uPort, ok := voltDevice.Ports.Load(uniPortID); ok {
2980 uniPort := uPort.(*VoltPort)
2981 uniPort.ActiveChannels = incrDecr(uniPort.ActiveChannels)
2982 voltDevice.Ports.Store(uniPortID, uniPort)
2983 logger.Debugw(ctx, "+++ActiveChannelsPerSub", log.Fields{"count": uniPort.ActiveChannels}) // TODO: remove me
2984 }
2985 }
2986}
2987
2988// IsMaxChannelsCountExceeded checks if the PON port active channel
2989// capacity and subscriber level channel capacity is reached to max allowed
2990// channel per pon threshold. If Exceeds, return true else return false.
2991func (va *VoltApplication) IsMaxChannelsCountExceeded(device, uniPortID string,
2992 ponPortID uint32, ig *IgmpGroup, channelIP net.IP, mvp *MvlanProfile) bool {
2993
2994 // New receiver check is required to identify the IgmpReportMsg received
2995 // in response to the IGMP Query sent from VGC.
2996 if newReceiver := ig.IsNewReceiver(device, uniPortID, channelIP); !newReceiver {
2997 logger.Debugw(ctx, "Not a new receiver. It is a response to IGMP Query",
2998 log.Fields{"port": uniPortID, "channel": channelIP})
2999 return false
3000 }
3001
3002 if vDev, exists := va.DevicesDisc.Load(device); exists {
3003 voltDevice := vDev.(*VoltDevice)
3004
3005 // Checking subscriber active channel count with maxChannelsAllowedPerSub
3006 if uniPort, present := voltDevice.Ports.Load(uniPortID); present {
3007 if uniPort.(*VoltPort).ActiveChannels >= mvp.MaxActiveChannels {
3008 logger.Errorw(ctx, "Max allowed channels per subscriber is exceeded",
3009 log.Fields{"activeCount": uniPort.(*VoltPort).ActiveChannels, "channel": channelIP, "UNI": uniPort.(*VoltPort).Name})
3010 if !(uniPort.(*VoltPort).ChannelPerSubAlarmRaised) {
3011 serviceName := GetMcastServiceForSubAlarm(uniPort.(*VoltPort), mvp)
3012 logger.Debugw(ctx, "Raising-SendActiveChannelPerSubscriberAlarm-Initiated", log.Fields{"ActiveChannels": uniPort.(*VoltPort).ActiveChannels, "ServiceName": serviceName})
3013 uniPort.(*VoltPort).ChannelPerSubAlarmRaised = true
3014 }
3015 return true
3016 }
3017 } else {
3018 logger.Errorw(ctx, "UNI port not found in VoltDevice", log.Fields{"uniPortID": uniPortID})
3019 }
3020 if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok {
3021 ponPort := value.(*PonPortCfg)
3022
3023 logger.Debugw(ctx, "----Active channels count for PON port",
3024 log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels,
3025 "maxAllowedChannelsPerPon": ponPort.MaxActiveChannels})
3026
3027 if ponPort.ActiveIGMPChannels < ponPort.MaxActiveChannels {
3028 // PON port active channel capacity is not yet reached to max allowed channels per pon.
3029 // So allowing to add receiver.
3030 return false
3031 } else if ponPort.ActiveIGMPChannels >= ponPort.MaxActiveChannels && ig != nil {
3032 // PON port active channel capacity is reached to max allowed channels per pon.
3033 // Check if same channel is already configured on that PON port.
3034 // If that channel is present, then allow AddReceiver else it will be rejected.
3035 igd, isPresent := ig.Devices[device]
3036 if isPresent {
3037 if channelListForPonPort, _ := igd.PonPortChannelMap.Get(ponPortID); channelListForPonPort != nil {
3038 if _, isExists := channelListForPonPort.(*PonPortChannels).ChannelList.Get(channelIP.String()); isExists {
3039 return false
3040 }
3041 }
3042 }
3043 }
3044 logger.Errorw(ctx, "Active channels count for PON port exceeded",
3045 log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels, "channel": channelIP, "UNI": uniPortID})
3046 } else {
3047 logger.Warnw(ctx, "PON port level active channel count does not exists",
3048 log.Fields{"ponPortID": ponPortID})
3049 return false
3050 }
3051 }
3052 logger.Warnw(ctx, "Max allowed channels per pon threshold is reached", log.Fields{"PonPortID": ponPortID})
3053 return true
3054}
3055
3056// ProcessIgmpv2Pkt : This is IGMPv2 packet.
3057func (va *VoltApplication) ProcessIgmpv2Pkt(device string, port string, pkt gopacket.Packet) {
3058 // First get the layers of interest
3059 dot1Q := pkt.Layer(layers.LayerTypeDot1Q).(*layers.Dot1Q)
3060 pktVlan := of.VlanType(dot1Q.VLANIdentifier)
3061 igmpv2 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMPv1or2)
3062
3063 ponPortID := va.GetPonPortID(device, port)
3064
3065 var vpv *VoltPortVnet
3066
3067 logger.Debugw(ctx, "Received IGMPv2 Type", log.Fields{"Type": igmpv2.Type})
3068
3069 if igmpv2.Type == layers.IGMPMembershipReportV2 || igmpv2.Type == layers.IGMPMembershipReportV1 {
3070
3071 logger.Infow(ctx, "IGMP Join received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port})
3072
3073 // This is a report coming from the PON. We must be able to first find the
3074 // subscriber from the VLAN tag and port and verify if the IGMP proxy is
3075 // enabled for the subscriber
3076 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
3077
3078 if vpv == nil {
3079 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
3080 return
3081 } else if !vpv.IgmpEnabled {
3082 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
3083 return
3084 }
3085
3086 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
3087 if mvp == nil {
3088 logger.Errorw(ctx, "Igmp Packet Received for Subscriber with Missing Mvlan Profile",
3089 log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName})
3090 return
3091 }
3092 mvlan := mvp.Mvlan
3093
3094 mvp.mvpLock.RLock()
3095 defer mvp.mvpLock.RUnlock()
3096 // The subscriber is validated and now process the IGMP report
3097 ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress)
3098
3099 if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, igmpv2.GroupAddress, mvp); yes {
3100 logger.Warnw(ctx, "Dropping IGMP Join v2: Active channel threshold exceeded",
3101 log.Fields{"PonPortID": ponPortID, "Addr": igmpv2.GroupAddress, "MvlanProfile": vpv.MvlanProfileName})
3102 return
3103 }
3104 if ig != nil {
3105 logger.Infow(ctx, "IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices})
3106 // If the IGMP group is already created. just add the receiver
3107 ig.IgmpGroupLock.Lock()
3108 // Check for port state to avoid race condition where PortDown event
3109 // acquired lock before packet processing
3110 vd := GetApplication().GetDevice(device)
3111 vp := vd.GetPort(port)
3112 if vp == nil || vp.State != PortStateUp {
3113 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
3114 log.Fields{"Port": port})
3115 ig.IgmpGroupLock.Unlock()
3116 return
3117 }
3118 ig.AddReceiver(device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
3119 ig.IgmpGroupLock.Unlock()
3120 } else {
3121 // Create the IGMP group and then add the receiver to the group
3122 if ig := va.AddIgmpGroup(vpv.MvlanProfileName, igmpv2.GroupAddress, device); ig != nil {
3123 logger.Infow(ctx, "New IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices})
3124 ig.IgmpGroupLock.Lock()
3125 // Check for port state to avoid race condition where PortDown event
3126 // acquired lock before packet processing
3127 vd := GetApplication().GetDevice(device)
3128 vp := vd.GetPort(port)
3129 if vp == nil || vp.State != PortStateUp {
3130 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
3131 log.Fields{"Port": port})
3132 ig.IgmpGroupLock.Unlock()
3133 return
3134 }
3135 ig.AddReceiver(device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
3136 ig.IgmpGroupLock.Unlock()
3137 } else {
3138 logger.Errorw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": igmpv2.GroupAddress})
3139 return
3140 }
3141 }
3142 } else if igmpv2.Type == layers.IGMPLeaveGroup {
3143 // This is a IGMP leave coming from one of the receivers. We essentially remove the
3144 // the receiver.
3145 logger.Infow(ctx, "IGMP Leave received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port})
3146
3147 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
3148 if vpv == nil {
3149 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
3150 return
3151 } else if !vpv.IgmpEnabled {
3152 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
3153 return
3154 }
3155
3156 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
3157 mvp.mvpLock.RLock()
3158 defer mvp.mvpLock.RUnlock()
3159 mvlan := mvp.Mvlan
3160 // The subscriber is validated and now process the IGMP report
3161 if ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress); ig != nil {
3162 ig.IgmpGroupLock.Lock()
3163 // Delete the receiver once the IgmpGroup is identified
3164 ig.DelReceiver(device, port, igmpv2.GroupAddress, nil, ponPortID)
3165 ig.IgmpGroupLock.Unlock()
3166 if ig.NumDevicesActive() == 0 {
3167 va.DelIgmpGroup(ig)
3168 }
3169 }
3170 } else {
3171 // This must be a query on the NNI port. However, we dont make that assumption.
3172 // Need to look for the IGMP group based on the VLAN in the packet as
3173 // the MVLAN
3174
3175 //Check if mvlan profile exist for the incoming pkt vlan
3176 profile, _ := va.MvlanProfilesByTag.Load(pktVlan)
3177 if profile == nil {
3178 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan})
3179 return
3180 }
3181 mvp := profile.(*MvlanProfile)
3182 mvp.mvpLock.RLock()
3183 defer mvp.mvpLock.RUnlock()
3184
3185 if net.ParseIP("0.0.0.0").Equal(igmpv2.GroupAddress) {
3186 va.processIgmpQueries(device, pktVlan, IgmpVersion2)
3187 } else {
3188 if ig := va.GetIgmpGroup(pktVlan, igmpv2.GroupAddress); ig != nil {
3189 ig.IgmpGroupLock.Lock()
3190 igd, ok := ig.Devices[device]
3191 if ok {
3192 igd.ProcessQuery(igmpv2.GroupAddress, IgmpVersion2)
3193 } else {
3194 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv2.GroupAddress})
3195 }
3196 ig.IgmpGroupLock.Unlock()
3197 }
3198 }
3199 }
3200}
3201
3202// ProcessIgmpv3Pkt : Process IGMPv3 packet
3203func (va *VoltApplication) ProcessIgmpv3Pkt(device string, port string, pkt gopacket.Packet) {
3204 // First get the layers of interest
3205 dot1QLayer := pkt.Layer(layers.LayerTypeDot1Q)
3206
3207 if dot1QLayer == nil {
3208 logger.Error(ctx, "Igmp Packet Received without Vlan - Dropping pkt")
3209 return
3210 }
3211 dot1Q := dot1QLayer.(*layers.Dot1Q)
3212 pktVlan := of.VlanType(dot1Q.VLANIdentifier)
3213 igmpv3 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMP)
3214
3215 ponPortID := va.GetPonPortID(device, port)
3216
3217 var vpv *VoltPortVnet
3218 logger.Debugw(ctx, "Received IGMPv3 Type", log.Fields{"Type": igmpv3.Type})
3219
3220 if igmpv3.Type == layers.IGMPMembershipReportV3 {
3221 // This is a report coming from the PON. We must be able to first find the
3222 // subscriber from the VLAN tag and port and verify if the IGMP proxy is
3223 // enabled for the subscriber
3224 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
3225 if vpv == nil {
3226 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
3227 return
3228 } else if !vpv.IgmpEnabled {
3229 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
3230 return
3231 }
3232 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
3233 if mvp == nil {
3234 logger.Errorw(ctx, "Igmp Packet received for Subscriber with Missing Mvlan Profile",
3235 log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName})
3236 return
3237 }
3238 mvp.mvpLock.RLock()
3239 defer mvp.mvpLock.RUnlock()
3240 mvlan := mvp.Mvlan
3241
3242 for _, group := range igmpv3.GroupRecords {
3243
3244 isJoin := isIgmpJoin(group.Type, group.SourceAddresses)
3245 // The subscriber is validated and now process the IGMP report
3246 ig := va.GetIgmpGroup(mvlan, group.MulticastAddress)
3247 if isJoin {
3248 if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, group.MulticastAddress, mvp); yes {
3249 logger.Warnw(ctx, "Dropping IGMP Join v3: Active channel threshold exceeded",
3250 log.Fields{"PonPortID": ponPortID, "Addr": group.MulticastAddress, "MvlanProfile": vpv.MvlanProfileName})
3251
3252 return
3253 }
3254 if ig != nil {
3255 // If the IGMP group is already created. just add the receiver
3256 logger.Infow(ctx, "IGMP Join received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
3257 ig.IgmpGroupLock.Lock()
3258 // Check for port state to avoid race condition where PortDown event
3259 // acquired lock before packet processing
3260 vd := GetApplication().GetDevice(device)
3261 vp := vd.GetPort(port)
3262 if vp == nil || vp.State != PortStateUp {
3263 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
3264 log.Fields{"Port": port})
3265 ig.IgmpGroupLock.Unlock()
3266 return
3267 }
3268 ig.AddReceiver(device, port, group.MulticastAddress, &group, IgmpVersion3,
3269 dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
3270 ig.IgmpGroupLock.Unlock()
3271 } else {
3272 // Create the IGMP group and then add the receiver to the group
3273 logger.Infow(ctx, "IGMP Join received for new group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
3274 if ig := va.AddIgmpGroup(vpv.MvlanProfileName, group.MulticastAddress, device); ig != nil {
3275 ig.IgmpGroupLock.Lock()
3276 // Check for port state to avoid race condition where PortDown event
3277 // acquired lock before packet processing
3278 vd := GetApplication().GetDevice(device)
3279 vp := vd.GetPort(port)
3280 if vp == nil || vp.State != PortStateUp {
3281 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
3282 log.Fields{"Port": port})
3283 ig.IgmpGroupLock.Unlock()
3284 return
3285 }
3286 ig.AddReceiver(device, port, group.MulticastAddress, &group, IgmpVersion3,
3287 dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
3288 ig.IgmpGroupLock.Unlock()
3289 } else {
3290 logger.Warnw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": group.MulticastAddress})
3291 }
3292 }
3293 } else if ig != nil {
3294 logger.Infow(ctx, "IGMP Leave received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
3295 ig.IgmpGroupLock.Lock()
3296 ig.DelReceiver(device, port, group.MulticastAddress, &group, ponPortID)
3297 ig.IgmpGroupLock.Unlock()
3298 if ig.NumDevicesActive() == 0 {
3299 va.DelIgmpGroup(ig)
3300 }
3301 } else {
3302 logger.Warnw(ctx, "IGMP Leave received for unknown group", log.Fields{"Addr": group.MulticastAddress})
3303 }
3304 }
3305 } else {
3306 // This must be a query on the NNI port. However, we dont make that assumption.
3307 // Need to look for the IGMP group based on the VLAN in the packet as
3308 // the MVLAN
3309
3310 //Check if mvlan profile exist for the incoming pkt vlan
3311 profile, _ := va.MvlanProfilesByTag.Load(pktVlan)
3312 if profile == nil {
3313 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan})
3314 return
3315 }
3316 mvp := profile.(*MvlanProfile)
3317 mvp.mvpLock.RLock()
3318 defer mvp.mvpLock.RUnlock()
3319
3320 if net.ParseIP("0.0.0.0").Equal(igmpv3.GroupAddress) {
3321 va.processIgmpQueries(device, pktVlan, IgmpVersion3)
3322 } else {
3323 if ig := va.GetIgmpGroup(pktVlan, igmpv3.GroupAddress); ig != nil {
3324 ig.IgmpGroupLock.Lock()
3325 igd, ok := ig.Devices[device]
3326 if ok {
3327 igd.ProcessQuery(igmpv3.GroupAddress, IgmpVersion3)
3328 } else {
3329 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv3.GroupAddress})
3330 }
3331 ig.IgmpGroupLock.Unlock()
3332 }
3333 }
3334 }
3335}
3336
3337// processIgmpQueries to process the igmp queries
3338func (va *VoltApplication) processIgmpQueries(device string, pktVlan of.VlanType, version uint8) {
3339 // This is a generic query and respond with all the groups channels in currently being viewed.
3340 processquery := func(key interface{}, value interface{}) bool {
3341 ig := value.(*IgmpGroup)
3342 ig.IgmpGroupLock.Lock()
3343 if ig.Mvlan != pktVlan {
3344 ig.IgmpGroupLock.Unlock()
3345 return true
3346 }
3347 igd, ok := ig.Devices[device]
3348 if !ok {
3349 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device})
3350 ig.IgmpGroupLock.Unlock()
3351 return true
3352 }
3353 processQueryForEachChannel := func(key interface{}, value interface{}) bool {
3354 groupAddr := key.(string)
3355 igd.ProcessQuery(net.ParseIP(groupAddr), version)
3356 return true
3357 }
3358 igd.GroupChannels.Range(processQueryForEachChannel)
3359 ig.IgmpGroupLock.Unlock()
3360 return true
3361 }
3362 va.IgmpGroups.Range(processquery)
3363}
3364
3365// isIgmpJoin to check if it is igmp join
3366func isIgmpJoin(recordType layers.IGMPv3GroupRecordType, sourceAddr []net.IP) bool {
3367 var join = false
3368
3369 if (layers.IGMPToEx == recordType) || (layers.IGMPIsEx == recordType) {
3370 join = true
3371 } else if layers.IGMPBlock == recordType {
3372 if len(sourceAddr) == 0 {
3373 join = true
3374 }
3375 } else if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) {
3376 if len(sourceAddr) != 0 {
3377 join = true
3378 }
3379 }
3380 return join
3381}
3382
3383func isIncl(recordType layers.IGMPv3GroupRecordType) bool {
3384
3385 if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) {
3386 return true
3387 }
3388 return false
3389}
3390
3391// IgmpProcessPkt to process the IGMP packet received. The packet received brings along with it
3392// the port on which the packet is received and the device the port is in.
3393func (va *VoltApplication) IgmpProcessPkt(device string, port string, pkt gopacket.Packet) {
3394 igmpl := pkt.Layer(layers.LayerTypeIGMP)
3395 if igmpl == nil {
3396 logger.Error(ctx, "Invalid IGMP packet arrived as IGMP packet")
3397 return
3398 }
3399 if igmp, ok := igmpl.(*layers.IGMPv1or2); ok {
3400 // This is an IGMPv2 packet.
3401 logger.Debugw(ctx, "IGMPv2 Packet Received", log.Fields{"IPAddr": igmp.GroupAddress})
3402 va.ProcessIgmpv2Pkt(device, port, pkt)
3403 return
3404 }
3405 if igmpv3, ok := igmpl.(*layers.IGMP); ok {
3406 logger.Debugw(ctx, "IGMPv3 Packet Received", log.Fields{"NumOfGroups": igmpv3.NumberOfGroupRecords})
3407 va.ProcessIgmpv3Pkt(device, port, pkt)
3408 }
3409}
3410
3411// IgmpPacketInd for igmp packet indication
3412func (va *VoltApplication) IgmpPacketInd(device string, port string, pkt gopacket.Packet) {
3413 pt := NewIgmpPacketTask(device, port, pkt)
3414 va.IgmpTasks.AddTask(pt)
3415}
3416
3417// ------------------------------------------------------------
3418// MVLAN related implemnetation
3419//
3420// Each MVLAN is configured with groups of multicast IPs. The idea of
3421// groups is to be able to group some multicast channels into an individual
3422// PON group and have a unique multicast GEM port for that set. However, in
3423// the current implementation, the concept of grouping is not fully utilized.
3424
3425// MvlanGroup structure
3426// A set of MC IPs form a group
3427
3428// MCGroupProxy identifies source specific multicast(SSM) config.
3429type MCGroupProxy struct {
3430 // Mode represents source list include/exclude
3431 Mode common.MulticastSrcListMode
3432 // SourceList represents list of multicast server IP addresses.
3433 SourceList []net.IP
3434}
3435
3436// MvlanGroup identifies MC group info
3437type MvlanGroup struct {
3438 Name string
3439 Wildcard bool
3440 McIPs []string
3441 IsStatic bool
3442}
3443
3444// OperInProgress type
3445type OperInProgress uint8
3446
3447const (
3448 // UpdateInProgress constant
3449 UpdateInProgress OperInProgress = 2
3450 // NoOp constant
3451 NoOp OperInProgress = 1
3452 // Nil constant
3453 Nil OperInProgress = 0
3454)
3455
3456// MvlanProfile : A set of groups of MC IPs for a MVLAN profile. It is assumed that
3457// the MVLAN IP is not repeated within multiples groups and across
3458// MVLAN profiles. The first match is used up on search to lcoate the
3459// MVLAN profile for an MC IP
3460type MvlanProfile struct {
3461 Name string
3462 Mvlan of.VlanType
3463 PonVlan of.VlanType
3464 Groups map[string]*MvlanGroup
3465 Proxy map[string]*MCGroupProxy
3466 Version string
3467 IsPonVlanPresent bool
3468 IsChannelBasedGroup bool
3469 DevicesList map[string]OperInProgress //device serial number //here
3470 oldGroups map[string]*MvlanGroup
3471 oldProxy map[string]*MCGroupProxy
3472 MaxActiveChannels uint32
3473 PendingDeleteFlow map[string]map[string]bool
3474 DeleteInProgress bool
3475 IgmpServVersion map[string]*uint8
3476 mvpLock sync.RWMutex
3477 mvpFlowLock sync.RWMutex
3478}
3479
3480// NewMvlanProfile is constructor for MVLAN profile.
3481func NewMvlanProfile(name string, mvlan of.VlanType, ponVlan of.VlanType, isChannelBasedGroup bool, OLTSerialNums []string, actChannelPerPon uint32) *MvlanProfile {
3482 var mvp MvlanProfile
3483 mvp.Name = name
3484 mvp.Mvlan = mvlan
3485 mvp.PonVlan = ponVlan
3486 mvp.mvpLock = sync.RWMutex{}
3487 mvp.Groups = make(map[string]*MvlanGroup)
3488 mvp.Proxy = make(map[string]*MCGroupProxy)
3489 mvp.DevicesList = make(map[string]OperInProgress)
3490 mvp.PendingDeleteFlow = make(map[string]map[string]bool)
3491 mvp.IsChannelBasedGroup = isChannelBasedGroup
3492 mvp.MaxActiveChannels = actChannelPerPon
3493 mvp.DeleteInProgress = false
3494 mvp.IgmpServVersion = make(map[string]*uint8)
3495
3496 if (ponVlan != of.VlanNone) && (ponVlan != 0) {
3497 mvp.IsPonVlanPresent = true
3498 }
3499 return &mvp
3500}
3501
3502// AddMvlanProxy for addition of groups to an MVLAN profile
3503func (mvp *MvlanProfile) AddMvlanProxy(name string, proxyInfo common.MulticastGroupProxy) {
3504 proxy := &MCGroupProxy{}
3505 proxy.Mode = proxyInfo.Mode
3506 proxy.SourceList = util.GetExpIPList(proxyInfo.SourceList)
3507
3508 if _, ok := mvp.Proxy[name]; !ok {
3509 logger.Debugw(ctx, "Added MVLAN Proxy", log.Fields{"Name": name, "Proxy": proxy})
3510 } else {
3511 logger.Debugw(ctx, "Updated MVLAN Proxy", log.Fields{"Name": name, "Proxy": proxy})
3512 }
3513 if proxyInfo.IsStatic == common.IsStaticYes {
3514 mvp.Groups[name].IsStatic = true
3515 }
3516 mvp.Proxy[name] = proxy
3517}
3518
3519// AddMvlanGroup for addition of groups to an MVLAN profile
3520func (mvp *MvlanProfile) AddMvlanGroup(name string, ips []string) {
3521 mvg := &MvlanGroup{}
3522 mvg.Name = name
3523 mvg.Wildcard = len(ips) == 0
3524 mvg.McIPs = ips
3525 mvg.IsStatic = false
3526 if _, ok := mvp.Groups[name]; !ok {
3527 logger.Debugw(ctx, "Added MVLAN Group", log.Fields{"VLAN": mvp.Mvlan, "Name": name, "mvg": mvg, "IPs": mvg.McIPs})
3528 } else {
3529 logger.Debugw(ctx, "Updated MVLAN Group", log.Fields{"VLAN": mvp.Mvlan, "Name": name})
3530 }
3531 mvp.Groups[name] = mvg
3532}
3533
3534// GetUsMatchVlan provides mvlan for US Match parameter
3535func (mvp *MvlanProfile) GetUsMatchVlan() of.VlanType {
3536 if mvp.IsPonVlanPresent {
3537 return mvp.PonVlan
3538 }
3539 return mvp.Mvlan
3540}
3541
3542// WriteToDb is utility to write Mvlan Profile Info to database
3543func (mvp *MvlanProfile) WriteToDb() error {
3544
3545 if mvp.DeleteInProgress {
3546 logger.Warnw(ctx, "Skipping Redis Update for MvlanProfile, MvlanProfile delete in progress", log.Fields{"Mvlan": mvp.Mvlan})
3547 return nil
3548 }
3549
3550 mvp.Version = database.PresentVersionMap[database.MvlanPath]
3551 b, err := json.Marshal(mvp)
3552 if err != nil {
3553 return err
3554 }
3555 if err1 := db.PutMvlan(uint16(mvp.Mvlan), string(b)); err1 != nil {
3556 return err1
3557 }
3558 return nil
3559}
3560
3561//isChannelStatic - Returns true if the given channel is part of static group in the Mvlan Profile
3562func (mvp *MvlanProfile) isChannelStatic(channel net.IP) bool {
3563 for _, mvg := range mvp.Groups {
3564 if mvg.IsStatic {
3565 if isChannelStatic := doesIPMatch(channel, mvg.McIPs); isChannelStatic {
3566 return true
3567 }
3568 }
3569 }
3570 return false
3571}
3572
3573//containsStaticChannels - Returns if any static channels is part of the Mvlan Profile
3574func (mvp *MvlanProfile) containsStaticChannels() bool {
3575 for _, mvg := range mvp.Groups {
3576 if mvg.IsStatic && len(mvg.McIPs) != 0 {
3577 return true
3578 }
3579 }
3580 return false
3581}
3582
3583//getAllStaticChannels - Returns all static channels in the Mvlan Profile
3584func (mvp *MvlanProfile) getAllStaticChannels() ([]net.IP, bool) {
3585 channelList := []net.IP{}
3586 containsStatic := false
3587 for _, mvg := range mvp.Groups {
3588 if mvg.IsStatic {
3589 staticChannels, _ := mvg.getAllChannels()
3590 channelList = append(channelList, staticChannels...)
3591 }
3592 }
3593 if len(channelList) > 0 {
3594 containsStatic = true
3595 }
3596 return channelList, containsStatic
3597}
3598
3599//getAllOldGroupStaticChannels - Returns all static channels in the Mvlan Profile
3600func (mvp *MvlanProfile) getAllOldGroupStaticChannels() ([]net.IP, bool) {
3601 channelList := []net.IP{}
3602 containsStatic := false
3603 for _, mvg := range mvp.oldGroups {
3604 if mvg.IsStatic {
3605 staticChannels, _ := mvg.getAllChannels()
3606 channelList = append(channelList, staticChannels...)
3607 }
3608 }
3609 if len(channelList) > 0 {
3610 containsStatic = true
3611 }
3612 return channelList, containsStatic
3613}
3614
3615//getAllChannels - Returns all channels in the Mvlan Profile
3616func (mvg *MvlanGroup) getAllChannels() ([]net.IP, bool) {
3617 channelList := []net.IP{}
3618
3619 if mvg == nil || len(mvg.McIPs) == 0 {
3620 return []net.IP{}, false
3621 }
3622
3623 grpChannelOrRange := mvg.McIPs
3624 for _, channelOrRange := range grpChannelOrRange {
3625 if strings.Contains(channelOrRange, "-") {
3626 var splits = strings.Split(channelOrRange, "-")
3627 ipStart := util.IP2LongConv(net.ParseIP(splits[0]))
3628 ipEnd := util.IP2LongConv(net.ParseIP(splits[1]))
3629
3630 for i := ipStart; i <= ipEnd; i++ {
3631 channelList = append(channelList, util.Long2ipConv(i))
3632 }
3633 } else {
3634 channelList = append(channelList, net.ParseIP(channelOrRange))
3635 }
3636 }
3637 return channelList, true
3638}
3639
3640//SetUpdateStatus - Sets profile update status for devices
3641func (mvp *MvlanProfile) SetUpdateStatus(serialNum string, status OperInProgress) {
3642 if serialNum != "" {
3643 mvp.DevicesList[serialNum] = status
3644 return
3645 }
3646
3647 for srNo := range mvp.DevicesList {
3648 mvp.DevicesList[srNo] = status
3649 }
3650}
3651
3652//isUpdateInProgress - checking is update is in progress for the mvlan profile
3653func (mvp *MvlanProfile) isUpdateInProgress() bool {
3654
3655 for srNo := range mvp.DevicesList {
3656 if mvp.DevicesList[srNo] == UpdateInProgress {
3657 return true
3658 }
3659 }
3660 return false
3661}
3662
3663//IsUpdateInProgressForDevice - Checks is Mvlan Profile update is is progress for the given device
3664func (mvp *MvlanProfile) IsUpdateInProgressForDevice(device string) bool {
3665 if vd := GetApplication().GetDevice(device); vd != nil {
3666 if mvp.DevicesList[vd.SerialNum] == UpdateInProgress {
3667 return true
3668 }
3669 }
3670 return false
3671}
3672
3673// DelFromDb to delere mvlan from database
3674func (mvp *MvlanProfile) DelFromDb() {
3675 _ = db.DelMvlan(uint16(mvp.Mvlan))
3676}
3677
3678// storeMvlansMap to store mvlan map
3679func (va *VoltApplication) storeMvlansMap(mvlan of.VlanType, name string, mvp *MvlanProfile) {
3680 va.MvlanProfilesByTag.Store(mvlan, mvp)
3681 va.MvlanProfilesByName.Store(name, mvp)
3682}
3683
3684// deleteMvlansMap to delete mvlan map
3685func (va *VoltApplication) deleteMvlansMap(mvlan of.VlanType, name string) {
3686 va.MvlanProfilesByTag.Delete(mvlan)
3687 va.MvlanProfilesByName.Delete(name)
3688}
3689
3690// RestoreMvlansFromDb to read from the DB and restore all the MVLANs
3691func (va *VoltApplication) RestoreMvlansFromDb() {
3692 mvlans, _ := db.GetMvlans()
3693 for _, mvlan := range mvlans {
3694 b, ok := mvlan.Value.([]byte)
3695 if !ok {
3696 logger.Warn(ctx, "The value type is not []byte")
3697 continue
3698 }
3699 var mvp MvlanProfile
3700 err := json.Unmarshal(b, &mvp)
3701 if err != nil {
3702 logger.Warn(ctx, "Unmarshal of MVLAN failed")
3703 continue
3704 }
3705 va.storeMvlansMap(mvp.Mvlan, mvp.Name, &mvp)
3706
3707 for srNo := range mvp.DevicesList {
3708 if mvp.IgmpServVersion[srNo] == nil {
3709 servVersion := IgmpVersion0
3710 mvp.IgmpServVersion[srNo] = &servVersion
3711 }
3712 }
3713 logger.Infow(ctx, "Restored Mvlan Profile", log.Fields{"MVPName": mvp.Name})
3714 }
3715}
3716
3717// GetMvlanProfileByTag fetches MVLAN profile based on the MC VLAN
3718func (va *VoltApplication) GetMvlanProfileByTag(vlan of.VlanType) *MvlanProfile {
3719 if mvp, ok := va.MvlanProfilesByTag.Load(vlan); ok {
3720 return mvp.(*MvlanProfile)
3721 }
3722 return nil
3723}
3724
3725// GetMvlanProfileByName fetches MVLAN profile based on the profile name.
3726func (va *VoltApplication) GetMvlanProfileByName(name string) *MvlanProfile {
3727 if mvp, ok := va.MvlanProfilesByName.Load(name); ok {
3728 return mvp.(*MvlanProfile)
3729 }
3730 return nil
3731}
3732
3733//UpdateMvlanProfile - only channel groups be updated
3734func (va *VoltApplication) UpdateMvlanProfile(name string, vlan of.VlanType, groups map[string][]string, activeChannelCount int, proxy map[string]common.MulticastGroupProxy) error {
3735
3736 mvpIntf, ok := va.MvlanProfilesByName.Load(name)
3737 if !ok {
3738 logger.Error(ctx, "Update Mvlan Failed: Profile does not exist")
3739 return errors.New("MVLAN profile not found")
3740 }
3741 mvp := mvpIntf.(*MvlanProfile)
3742 // check if groups are same then just update the OLTSerial numbers, push the config on new serial numbers
3743
3744 existingGroup := mvp.Groups
3745 existingProxy := mvp.Proxy
3746 mvp.Groups = make(map[string]*MvlanGroup)
3747 mvp.Proxy = make(map[string]*MCGroupProxy)
3748
3749 /* Need to protect groups and proxy write lock */
3750 mvp.mvpLock.Lock()
3751 for grpName, grpIPList := range groups {
3752 mvp.AddMvlanGroup(grpName, grpIPList)
3753 }
3754 for grpName, proxyInfo := range proxy {
3755 mvp.AddMvlanProxy(grpName, proxyInfo)
3756 }
3757 if _, ok := mvp.Groups[common.StaticGroup]; ok {
3758 if _, yes := mvp.Proxy[common.StaticGroup]; !yes {
3759 mvp.Groups[common.StaticGroup].IsStatic = true
3760 }
3761 }
3762 prevMaxActiveChannels := mvp.MaxActiveChannels
3763 if reflect.DeepEqual(mvp.Groups, existingGroup) && reflect.DeepEqual(mvp.Proxy, existingProxy) {
3764 logger.Info(ctx, "No change in groups config")
3765 if uint32(activeChannelCount) != mvp.MaxActiveChannels {
3766 mvp.MaxActiveChannels = uint32(activeChannelCount)
3767 if err := mvp.WriteToDb(); err != nil {
3768 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
3769 }
3770 if prevMaxActiveChannels != mvp.MaxActiveChannels {
3771 mvp.UpdateActiveChannelSubscriberAlarm()
3772 }
3773 }
3774 mvp.mvpLock.Unlock()
3775 return nil
3776 }
3777 mvp.mvpLock.Unlock()
3778 mvp.MaxActiveChannels = uint32(activeChannelCount)
3779
3780 // Status is maintained so that in the event of any crash or reboot during update,
3781 // the recovery is possible once the pod is UP again
3782 mvp.SetUpdateStatus("", UpdateInProgress)
3783 mvp.oldGroups = existingGroup
3784 mvp.oldProxy = existingProxy
3785 va.storeMvlansMap(vlan, name, mvp)
3786 if err := mvp.WriteToDb(); err != nil {
3787 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
3788 }
3789 if prevMaxActiveChannels != mvp.MaxActiveChannels {
3790 mvp.UpdateActiveChannelSubscriberAlarm()
3791 }
3792
3793 // The update task is added as part of Igm p task list, so that any parallel igmp pkt processing is avoided
3794 // Until, the update operation is completed, the igmp pkt processing will be enqueued
3795 updateTask := NewUpdateMvlanTask(mvp, "")
3796 va.IgmpTasks.AddTask(updateTask)
3797 return nil
3798}
3799
3800// isDeviceInList to check if device is the list
3801func isDeviceInList(serialNum string, OLTSerialNums []string) bool {
3802 for _, oltSerialNum := range OLTSerialNums {
3803 if serialNum == oltSerialNum {
3804 return true
3805 }
3806 }
3807 return false
3808}
3809
3810// McastConfigKey creates the key using the olt serial number and mvlan profile id
3811func McastConfigKey(oltSerialNum string, mvlanProfID string) string {
3812 return oltSerialNum + "_" + mvlanProfID
3813}
3814
3815// GetMcastConfig to get McastConfig Information by OLT and Mvlan Profile ID
3816func (va *VoltApplication) GetMcastConfig(oltSerialNum string, mvlanProfID string) *McastConfig {
3817 if mc, ok := va.McastConfigMap.Load(McastConfigKey(oltSerialNum, mvlanProfID)); ok {
3818 return mc.(*McastConfig)
3819 }
3820 return nil
3821}
3822
3823func (va *VoltApplication) storeMcastConfig(oltSerialNum string, mvlanProfID string, mcastConfig *McastConfig) {
3824 va.McastConfigMap.Store(McastConfigKey(oltSerialNum, mvlanProfID), mcastConfig)
3825}
3826
3827func (va *VoltApplication) deleteMcastConfig(oltSerialNum string, mvlanProfID string) {
3828 va.McastConfigMap.Delete(McastConfigKey(oltSerialNum, mvlanProfID))
3829}
3830
3831// AddMcastConfig for addition of a MVLAN profile
3832func (va *VoltApplication) AddMcastConfig(MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error {
3833 var mcastCfg *McastConfig
3834
3835 mcastCfg = va.GetMcastConfig(OltSerialNum, MvlanProfileID)
3836 if mcastCfg == nil {
3837 mcastCfg = &McastConfig{}
3838 } else {
3839 logger.Debugw(ctx, "Mcast Config already exists", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum,
3840 "MVLAN Profile ID": mcastCfg.MvlanProfileID})
3841 }
3842
3843 // Update all igds available
3844 mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID)
3845 if !ok {
3846 return errors.New("MVLAN profile not found during add mcast config")
3847 }
3848 mvlan := mvpIntf.(*MvlanProfile).Mvlan
3849
3850 mcastCfg.OltSerialNum = OltSerialNum
3851 mcastCfg.MvlanProfileID = MvlanProfileID
3852 mcastCfg.IgmpProfileID = IgmpProfileID
3853 mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP)
3854
3855 proxyCfg := va.getIgmpProfileMap(IgmpProfileID)
3856
3857 iterIgmpGroups := func(key interface{}, value interface{}) bool {
3858 ig := value.(*IgmpGroup)
3859 if ig.Mvlan != mvlan {
3860 return true
3861 }
3862
3863 for _, igd := range ig.Devices {
3864 if igd.SerialNo != OltSerialNum {
3865 continue
3866 }
3867 igd.proxyCfg = proxyCfg
3868 if IgmpProfileID == "" {
3869 igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP
3870 } else {
3871 igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP
3872 }
3873 mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
3874 logger.Debugw(ctx, "Igd updated with proxyCfg and proxyIP", log.Fields{"name": igd.GroupName,
3875 "IgmpProfileID": IgmpProfileID, "ProxyIP": mcastCfg.IgmpProxyIP})
3876 }
3877 return true
3878 }
3879 va.IgmpGroups.Range(iterIgmpGroups)
3880
3881 va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg)
3882 if err := mcastCfg.WriteToDb(); err != nil {
3883 logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID})
3884 }
3885 va.addOltToMvlan(MvlanProfileID, OltSerialNum)
3886
3887 return nil
3888}
3889
3890func (va *VoltApplication) addOltToMvlan(MvlanProfileID string, OltSerialNum string) {
3891 var mvp *MvlanProfile
3892 if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok {
3893 servVersion := IgmpVersion0
3894 mvp = mvpIntf.(*MvlanProfile)
3895 mvp.DevicesList[OltSerialNum] = NoOp
3896 mvp.IgmpServVersion[OltSerialNum] = &servVersion
3897 if err := mvp.WriteToDb(); err != nil {
3898 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
3899 }
3900 mvp.pushIgmpMcastFlows(OltSerialNum)
3901 }
3902}
3903
3904func (va *VoltApplication) delOltFromMvlan(MvlanProfileID string, OltSerialNum string) {
3905 var mvp *MvlanProfile
3906 if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok {
3907 mvp = mvpIntf.(*MvlanProfile)
3908 //Delete from mvp list
3909 mvp.removeIgmpMcastFlows(OltSerialNum)
3910 delete(mvp.DevicesList, OltSerialNum)
3911 if err := mvp.WriteToDb(); err != nil {
3912 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
3913 }
3914 }
3915}
3916
3917// DelMcastConfig for addition of a MVLAN profile
3918func (va *VoltApplication) DelMcastConfig(MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) {
3919
3920 va.delOltFromMvlan(MvlanProfileID, OltSerialNum)
3921 va.deleteMcastConfig(OltSerialNum, MvlanProfileID)
3922 _ = db.DelMcastConfig(McastConfigKey(OltSerialNum, MvlanProfileID))
3923 if d := va.GetDeviceBySerialNo(OltSerialNum); d != nil {
3924 if mvp := va.GetMvlanProfileByName(MvlanProfileID); mvp != nil {
3925 va.RemoveGroupsFromPendingPool(d.Name, mvp.Mvlan)
3926 }
3927 }
3928}
3929
3930// DelAllMcastConfig for deletion of all mcast config
3931func (va *VoltApplication) DelAllMcastConfig(OltSerialNum string) error {
3932
3933 deleteIndividualMcastConfig := func(key interface{}, value interface{}) bool {
3934 mcastCfg := value.(*McastConfig)
3935 if mcastCfg.OltSerialNum == OltSerialNum {
3936 va.DelMcastConfig(mcastCfg.MvlanProfileID, mcastCfg.IgmpProfileID, mcastCfg.IgmpProxyIP.String(), mcastCfg.OltSerialNum)
3937 }
3938 return true
3939 }
3940 va.McastConfigMap.Range(deleteIndividualMcastConfig)
3941 return nil
3942}
3943
3944// UpdateMcastConfig for addition of a MVLAN profile
3945func (va *VoltApplication) UpdateMcastConfig(MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error {
3946
3947 mcastCfg := va.GetMcastConfig(OltSerialNum, MvlanProfileID)
3948 if mcastCfg == nil {
3949 logger.Warnw(ctx, "Mcast Config not found. Unable to update", log.Fields{"Mvlan Profile ID": MvlanProfileID, "OltSerialNum": OltSerialNum})
3950 return nil
3951 }
3952
3953 oldProfID := mcastCfg.IgmpProfileID
3954 mcastCfg.IgmpProfileID = IgmpProfileID
3955 mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP)
3956
3957 va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg)
3958
3959 // Update all igds
3960 if oldProfID != mcastCfg.IgmpProfileID {
3961 updateIgdProxyCfg := func(key interface{}, value interface{}) bool {
3962 igd := value.(*IgmpGroupDevice)
3963 igd.proxyCfg = va.getIgmpProfileMap(mcastCfg.IgmpProfileID)
3964 if IgmpProfileID == "" {
3965 igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP
3966 } else {
3967 igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP
3968 }
3969 return true
3970 }
3971 mcastCfg.IgmpGroupDevices.Range(updateIgdProxyCfg)
3972 }
3973
3974 if err := mcastCfg.WriteToDb(); err != nil {
3975 logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID})
3976 }
3977
3978 return nil
3979}
3980
3981// WriteToDb is utility to write Mcast config Info to database
3982func (mc *McastConfig) WriteToDb() error {
3983 mc.Version = database.PresentVersionMap[database.McastConfigPath]
3984 b, err := json.Marshal(mc)
3985 if err != nil {
3986 return err
3987 }
3988 if err1 := db.PutMcastConfig(McastConfigKey(mc.OltSerialNum, mc.MvlanProfileID), string(b)); err1 != nil {
3989 return err1
3990 }
3991 return nil
3992}
3993
3994// RestoreMcastConfigsFromDb to read from the DB and restore Mcast configs
3995func (va *VoltApplication) RestoreMcastConfigsFromDb() {
3996 mcastConfigs, _ := db.GetMcastConfigs()
3997 for hash, mcastConfig := range mcastConfigs {
3998 b, ok := mcastConfig.Value.([]byte)
3999 if !ok {
4000 logger.Warn(ctx, "The value type is not []byte")
4001 continue
4002 }
4003 var mc McastConfig
4004 err := json.Unmarshal(b, &mc)
4005 if err != nil {
4006 logger.Warn(ctx, "Unmarshal of Mcast config failed")
4007 continue
4008 }
4009 va.storeMcastConfig(mc.OltSerialNum, mc.MvlanProfileID, &mc)
4010 logger.Infow(ctx, "Restored Mcast config", log.Fields{"OltSerialNum": mc.OltSerialNum, "MvlanProfileID": mc.MvlanProfileID, "hash": hash})
4011 }
4012}
4013
4014// AddMvlanProfile for addition of a MVLAN profile
4015func (va *VoltApplication) AddMvlanProfile(name string, mvlan of.VlanType, ponVlan of.VlanType,
4016 groups map[string][]string, isChannelBasedGroup bool, OLTSerialNum []string, activeChannelsPerPon int, proxy map[string]common.MulticastGroupProxy) error {
4017 var mvp *MvlanProfile
4018
4019 if mvp = va.GetMvlanProfileByTag(mvlan); mvp != nil {
4020 logger.Errorw(ctx, "Duplicate MVLAN ID configured", log.Fields{"mvlan": mvlan})
4021 return errors.New("MVLAN profile with same VLANID exists")
4022 }
4023 if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok {
4024 mvp = mvpIntf.(*MvlanProfile)
4025 for _, serialNum := range OLTSerialNum {
4026 if mvp.DevicesList[serialNum] != Nil {
4027 //This is backup restore scenario, just update the profile
4028 logger.Info(ctx, "Add Mvlan : Profile Name already exists, update-the-profile")
4029 return va.UpdateMvlanProfile(name, mvlan, groups, activeChannelsPerPon, proxy)
4030 }
4031 }
4032 }
4033
4034 if mvp == nil {
4035 mvp = NewMvlanProfile(name, mvlan, ponVlan, isChannelBasedGroup, OLTSerialNum, uint32(activeChannelsPerPon))
4036 }
4037
4038 va.storeMvlansMap(mvlan, name, mvp)
4039
4040 /* Need to protect groups and proxy write lock */
4041 mvp.mvpLock.Lock()
4042 for grpName, grpInfo := range groups {
4043 mvp.AddMvlanGroup(grpName, grpInfo)
4044 }
4045 for grpName, proxyInfo := range proxy {
4046 mvp.AddMvlanProxy(grpName, proxyInfo)
4047 }
4048 if _, ok := mvp.Groups[common.StaticGroup]; ok {
4049 if _, yes := mvp.Proxy[common.StaticGroup]; !yes {
4050 mvp.Groups[common.StaticGroup].IsStatic = true
4051 }
4052 }
4053
4054 logger.Debugw(ctx, "Added MVLAN Profile", log.Fields{"MVLAN": mvp.Mvlan, "PonVlan": mvp.PonVlan, "Name": mvp.Name, "Grp IPs": mvp.Groups, "IsPonVlanPresent": mvp.IsPonVlanPresent})
4055 mvp.mvpLock.Unlock()
4056
4057 if err := mvp.WriteToDb(); err != nil {
4058 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
4059 }
4060
4061 return nil
4062}
4063
4064//pushIgmpMcastFlows - Adds all IGMP related flows (generic DS flow & static group flows)
4065func (mvp *MvlanProfile) pushIgmpMcastFlows(OLTSerialNum string) {
4066
4067 mvp.mvpLock.RLock()
4068 defer mvp.mvpLock.RUnlock()
4069
4070 if mvp.DevicesList[OLTSerialNum] == Nil {
4071 logger.Infow(ctx, "Mvlan Profile not configure for device", log.Fields{"Device": OLTSerialNum, "Mvlan": mvp.Mvlan})
4072 return
4073 }
4074
4075 d := GetApplication().GetDeviceBySerialNo(OLTSerialNum)
4076 if d == nil {
4077 logger.Warnw(ctx, "Skipping Igmp & Mcast Flow processing: Device Not Found", log.Fields{"Device_SrNo": OLTSerialNum, "Mvlan": mvp.Mvlan})
4078 return
4079 }
4080
4081 p := d.GetPort(d.NniPort)
4082
4083 if p != nil && p.State == PortStateUp {
4084 logger.Infow(ctx, "NNI Port Status is: UP & Vlan Enabled", log.Fields{"Device": d, "port": p})
4085
4086 //Push Igmp DS Control Flows
4087 err := mvp.ApplyIgmpDSFlowForMvp(d.Name)
4088 if err != nil {
4089 logger.Errorw(ctx, "DS IGMP Flow Add Failed for device",
4090 log.Fields{"Reason": err.Error(), "device": d.Name})
4091 }
4092
4093 //Trigger Join for static channels
4094 if channelList, containsStatic := mvp.getAllStaticChannels(); containsStatic {
4095 mvp.ProcessStaticGroup(d.Name, channelList, true)
4096 } else {
4097 logger.Infow(ctx, "No Static Channels Present", log.Fields{"mvp": mvp.Name, "Mvlan": mvp.Mvlan})
4098 }
4099 }
4100}
4101
4102/*
4103//pushIgmpMcastFlowsToAllOlt - Adds all IGMP related flows (generic DS flow & static group flows) to all OLTs
4104func (mvp *MvlanProfile) pushIgmpMcastFlowsToAllOlt() {
4105
4106 //for all devices apply igmp DS trap flow rules
4107 pushIgmpFlows := func(key interface{}, value interface{}) bool {
4108 d := value.(*VoltDevice)
4109 p := d.GetPort(d.NniPort)
4110 if p != nil && p.State == PortStateUp {
4111 logger.Infow(ctx, "NNI Port Status is: UP & Vlan Enabled", log.Fields{"Device": d, "port": p})
4112
4113 //Push Igmp DS Control Flows
4114 err := mvp.ApplyIgmpDSFlowForMvp(d.Name)
4115 if err != nil {
4116 logger.Errorw(ctx, "DS IGMP Flow Add Failed for device",
4117 log.Fields{"Reason": err.Error(), "device": d.Name})
4118 }
4119
4120 //Trigger Join for static channels
4121 if channelList, containsStatic := mvp.getAllStaticChannels(); containsStatic {
4122 mvp.ProcessStaticGroup(d.Name, channelList, true)
4123 } else {
4124 logger.Infow(ctx, "No Static Channels Present", log.Fields{"mvp": mvp.Name, "Mvlan": mvp.Mvlan})
4125 }
4126 }
4127 return true
4128 }
4129 mvp.mvpLock.RLock()
4130 defer mvp.mvpLock.RUnlock()
4131 GetApplication().DevicesDisc.Range(pushIgmpFlows)
4132}
4133
4134//removeIgmpFlows - Removes all IGMP related flows (generic DS flow)
4135func (mvp *MvlanProfile) removeIgmpFlows(oltSerialNum string) {
4136
4137 if d := GetApplication().GetDeviceBySerialNo(oltSerialNum); d != nil {
4138 p := d.GetPort(d.NniPort)
4139 if p != nil {
4140 logger.Infow(ctx, "NNI Port Status is: UP", log.Fields{"Device": d, "port": p})
4141 err := mvp.RemoveIgmpDSFlowForMvp(d.Name)
4142 if err != nil {
4143 logger.Errorw(ctx, "DS IGMP Flow Del Failed", log.Fields{"Reason": err.Error(), "device": d.Name})
4144 }
4145 }
4146 }
4147}*/
4148
4149//removeIgmpMcastFlows - Removes all IGMP related flows (generic DS flow & static group flows)
4150func (mvp *MvlanProfile) removeIgmpMcastFlows(oltSerialNum string) {
4151
4152 mvp.mvpLock.RLock()
4153 defer mvp.mvpLock.RUnlock()
4154
4155 if d := GetApplication().GetDeviceBySerialNo(oltSerialNum); d != nil {
4156 p := d.GetPort(d.NniPort)
4157 if p != nil {
4158 logger.Infow(ctx, "NNI Port Status is: UP", log.Fields{"Device": d, "port": p})
4159
4160 // ***Do not change the order***
4161 // When Vlan is disabled, the process end is determined by the DS Igmp flag in device
4162
4163 //Trigger Leave for static channels
4164 if channelList, containsStatic := mvp.getAllStaticChannels(); containsStatic {
4165 mvp.ProcessStaticGroup(d.Name, channelList, false)
4166 } else {
4167 logger.Infow(ctx, "No Static Channels Present", log.Fields{"mvp": mvp.Name, "Mvlan": mvp.Mvlan})
4168 }
4169
4170 //Remove all dynamic members for the Mvlan Profile
4171 GetApplication().IgmpGroups.Range(func(key, value interface{}) bool {
4172 ig := value.(*IgmpGroup)
4173 if ig.Mvlan == mvp.Mvlan {
4174 igd := ig.Devices[d.Name]
4175 ig.DelIgmpGroupDevice(igd)
4176 if ig.NumDevicesActive() == 0 {
4177 GetApplication().DelIgmpGroup(ig)
4178 }
4179 }
4180 return true
4181 })
4182
4183 //Remove DS Igmp trap flow
4184 err := mvp.RemoveIgmpDSFlowForMvp(d.Name)
4185 if err != nil {
4186 logger.Errorw(ctx, "DS IGMP Flow Del Failed", log.Fields{"Reason": err.Error(), "device": d.Name})
4187 }
4188 }
4189 }
4190}
4191
4192// ApplyIgmpDSFlowForMvp to apply Igmp DS flow for mvlan.
4193func (mvp *MvlanProfile) ApplyIgmpDSFlowForMvp(device string) error {
4194 va := GetApplication()
4195 dIntf, ok := va.DevicesDisc.Load(device)
4196 if !ok {
4197 return errors.New("Device Doesn't Exist")
4198 }
4199 d := dIntf.(*VoltDevice)
4200 mvlan := mvp.Mvlan
4201
4202 flowAlreadyApplied, ok := d.IgmpDsFlowAppliedForMvlan[uint16(mvlan)]
4203 if !ok || !flowAlreadyApplied {
4204 flows, err := mvp.BuildIgmpDSFlows(device)
4205 if err == nil {
4206 err = cntlr.GetController().AddFlows(d.NniPort, device, flows)
4207 if err != nil {
4208 logger.Warnw(ctx, "Configuring IGMP Flow for device failed ", log.Fields{"Device": device, "err": err})
4209 return err
4210 }
4211 d.IgmpDsFlowAppliedForMvlan[uint16(mvlan)] = true
4212 logger.Infow(ctx, "Updating voltDevice that IGMP DS flow as \"added\" for ",
4213 log.Fields{"device": d.SerialNum, "mvlan": mvlan})
4214 } else {
4215 logger.Errorw(ctx, "DS IGMP Flow Add Failed", log.Fields{"Reason": err.Error(), "Mvlan": mvlan})
4216 }
4217 }
4218
4219 return nil
4220}
4221
4222// RemoveIgmpDSFlowForMvp to remove Igmp DS flow for mvlan.
4223func (mvp *MvlanProfile) RemoveIgmpDSFlowForMvp(device string) error {
4224
4225 va := GetApplication()
4226 mvlan := mvp.Mvlan
4227
4228 dIntf, ok := va.DevicesDisc.Load(device)
4229 if !ok {
4230 return errors.New("Device Doesn't Exist")
4231 }
4232 d := dIntf.(*VoltDevice)
4233 /* No need of strict check during DS IGMP deletion
4234 flowAlreadyApplied, ok := d.IgmpDsFlowAppliedForMvlan[uint16(mvlan)]
4235 if ok && flowAlreadyApplied
4236 */
4237 flows, err := mvp.BuildIgmpDSFlows(device)
4238 if err == nil {
4239 flows.ForceAction = true
4240
4241 err = mvp.DelFlows(d, flows)
4242 if err != nil {
4243 logger.Warnw(ctx, "De-Configuring IGMP Flow for device failed ", log.Fields{"Device": device, "err": err})
4244 return err
4245 }
4246 d.IgmpDsFlowAppliedForMvlan[uint16(mvlan)] = false
4247 logger.Infow(ctx, "Updating voltDevice that IGMP DS flow as \"removed\" for ",
4248 log.Fields{"device": d.SerialNum, "mvlan": mvlan})
4249 } else {
4250 logger.Errorw(ctx, "DS IGMP Flow Del Failed", log.Fields{"Reason": err.Error()})
4251 }
4252
4253 return nil
4254}
4255
4256// BuildIgmpDSFlows to build Igmp DS flows for NNI port
4257func (mvp *MvlanProfile) BuildIgmpDSFlows(device string) (*of.VoltFlow, error) {
4258 dIntf, ok := GetApplication().DevicesDisc.Load(device)
4259 if !ok {
4260 return nil, errors.New("Device Doesn't Exist")
4261 }
4262 d := dIntf.(*VoltDevice)
4263
4264 logger.Infow(ctx, "Building DS IGMP Flow for NNI port", log.Fields{"vs": d.NniPort, "Mvlan": mvp.Mvlan})
4265 flow := &of.VoltFlow{}
4266 flow.SubFlows = make(map[uint64]*of.VoltSubFlow)
4267 subFlow := of.NewVoltSubFlow()
4268 subFlow.SetTableID(0)
4269 subFlow.SetMatchVlan(mvp.Mvlan)
4270
4271 nniPort, err := GetApplication().GetNniPort(device)
4272 if err != nil {
4273 return nil, err
4274 }
4275 nniPortID, err1 := GetApplication().GetPortID(nniPort)
4276 if err1 != nil {
4277 return nil, errors.New("Unknown NNI outport")
4278 }
4279 subFlow.SetInPort(nniPortID)
4280 subFlow.SetIgmpMatch()
4281 subFlow.SetReportToController()
4282 subFlow.Cookie = uint64(nniPortID)<<32 | uint64(mvp.Mvlan)
4283 subFlow.Priority = of.IgmpFlowPriority
4284
4285 flow.SubFlows[subFlow.Cookie] = subFlow
4286 logger.Infow(ctx, "Built DS IGMP flow", log.Fields{"cookie": subFlow.Cookie, "subflow": subFlow})
4287 return flow, nil
4288}
4289
4290//updateStaticGroups - Generates static joins & leaves for newly added and removed static channels respectively
4291func (mvp *MvlanProfile) updateStaticGroups(deviceID string, added []net.IP, removed []net.IP) {
4292
4293 //Update static group configs for all associated devices
4294 updateGroups := func(key interface{}, value interface{}) bool {
4295 d := value.(*VoltDevice)
4296
4297 if mvp.DevicesList[d.SerialNum] == Nil {
4298 logger.Infow(ctx, "Mvlan Profile not configure for device", log.Fields{"Device": d, "Profile Device List": mvp.DevicesList})
4299 return true
4300 }
4301 //TODO if mvp.IsChannelBasedGroup {
4302 mvp.ProcessStaticGroup(d.Name, added, true)
4303 mvp.ProcessStaticGroup(d.Name, removed, false)
4304 //}
4305 return true
4306 }
4307
4308 if deviceID != "" {
4309 vd := GetApplication().GetDevice(deviceID)
4310 updateGroups(deviceID, vd)
4311 } else {
4312 GetApplication().DevicesDisc.Range(updateGroups)
4313 }
4314}
4315
4316//updateDynamicGroups - Generates joins with updated sources for existing channels
4317func (mvp *MvlanProfile) updateDynamicGroups(deviceID string, added []net.IP, removed []net.IP) {
4318
4319 //mvlan := mvp.Mvlan
4320 va := GetApplication()
4321
4322 updateGroups := func(key interface{}, value interface{}) bool {
4323 d := value.(*VoltDevice)
4324
4325 if mvp.DevicesList[d.SerialNum] == Nil {
4326 logger.Infow(ctx, "Mvlan Profile not configure for device", log.Fields{"Device": d, "Profile Device List": mvp.DevicesList})
4327 return true
4328 }
4329 for _, groupAddr := range added {
4330
4331 _, gName := va.GetMvlanProfileForMcIP(mvp.Name, groupAddr)
4332 grpKey := mvp.generateGroupKey(gName, groupAddr.String())
4333 logger.Debugw(ctx, "IGMP Group", log.Fields{"Group": grpKey, "groupAddr": groupAddr})
4334 if igIntf, ok := va.IgmpGroups.Load(grpKey); ok {
4335 ig := igIntf.(*IgmpGroup)
4336 if igd, ok := ig.getIgmpGroupDevice(d.Name); ok {
4337 if igcIntf, ok := igd.GroupChannels.Load(groupAddr.String()); ok {
4338 igc := igcIntf.(*IgmpGroupChannel)
4339 incl := false
4340 var ip []net.IP
4341 var groupModified = false
4342 if _, ok := mvp.Proxy[igc.GroupName]; ok {
4343 if mvp.Proxy[igc.GroupName].Mode == common.Include {
4344 incl = true
4345 }
4346 ip = mvp.Proxy[igc.GroupName].SourceList
4347 }
4348 for port, igp := range igc.NewReceivers {
4349 // Process the include/exclude list which may end up modifying the group
4350 if change, _ := igc.ProcessSources(port, ip, incl); change {
4351 groupModified = true
4352 }
4353 igc.ProcessMode(port, incl)
4354
4355 if err := igp.WriteToDb(igc.Mvlan, igc.GroupAddr, igc.Device); err != nil {
4356 logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
4357 }
4358 }
4359 // If the group is modified as this is the first receiver or due to include/exclude list modification
4360 // send a report to the upstream multicast servers
4361 if groupModified {
4362 logger.Debug(ctx, "Group Modified and IGMP report sent to the upstream server")
4363 igc.SendReport(false)
4364 }
4365 if err := igc.WriteToDb(); err != nil {
4366 logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
4367 }
4368 }
4369 }
4370 }
4371 }
4372
4373 return true
4374 }
4375
4376 if deviceID != "" {
4377 vd := GetApplication().GetDevice(deviceID)
4378 updateGroups(deviceID, vd)
4379 } else {
4380 GetApplication().DevicesDisc.Range(updateGroups)
4381 }
4382}
4383
4384//GroupsUpdated - Handles removing of Igmp Groups, flows & group table entries for
4385//channels removed as part of update
4386func (mvp *MvlanProfile) GroupsUpdated(deviceID string) {
4387
4388 deleteChannelIfRemoved := func(key interface{}, value interface{}) bool {
4389 ig := value.(*IgmpGroup)
4390
4391 if ig.Mvlan != mvp.Mvlan {
4392 return true
4393 }
4394 grpName := ig.GroupName
4395 logger.Infow(ctx, "###Update Cycle", log.Fields{"IG": ig.GroupName, "Addr": ig.GroupAddr})
4396 //Check if group exists and remove the entire group object otherwise
4397 if currentChannels := mvp.Groups[grpName]; currentChannels != nil {
4398
4399 if mvp.IsChannelBasedGroup {
4400 channelPresent := doesIPMatch(ig.GroupAddr, currentChannels.McIPs)
4401 if channelPresent || mvp.isChannelStatic(ig.GroupAddr) {
4402 return true
4403 }
4404 } else {
4405 allExistingChannels := ig.GetAllIgmpChannelForDevice(deviceID)
4406 for channel := range allExistingChannels {
4407 channelIP := net.ParseIP(channel)
4408 channelPresent := mvp.IsChannelPresent(channelIP, currentChannels.McIPs, mvp.IsStaticGroup(ig.GroupName))
4409 if channelPresent {
4410 staticChannel := mvp.isChannelStatic(channelIP)
4411 logger.Infow(ctx, "###Channel Comparision", log.Fields{"staticChannel": staticChannel, "Group": mvp.IsStaticGroup(ig.GroupName), "Channel": channel})
4412 // Logic:
4413 // If channel is Static & existing Group is also static - No migration required
4414 // If channel is not Static & existing Group is also not static - No migration required
4415
4416 // If channel is Static and existing Group is not static - Migrate (from dynamic to static)
4417 // (Channel already part of dynamic, added to static)
4418
4419 // If channel is not Static but existing Group is static - Migrate (from static to dynamic)
4420 // (Channel removed from satic but part of dynamic)
4421 if (staticChannel != mvp.IsStaticGroup(ig.GroupName)) || (ig.IsGroupStatic != mvp.IsStaticGroup(ig.GroupName)) { // Equivalent of XOR
4422 ig.HandleGroupMigration(deviceID, channelIP)
4423 } else {
4424 if (ig.IsGroupStatic) && mvp.IsStaticGroup(ig.GroupName) {
4425 if ig.GroupName != mvp.GetStaticGroupName(channelIP) {
4426 ig.HandleGroupMigration(deviceID, channelIP)
4427 }
4428 }
4429 continue
4430 }
4431 } else {
4432 logger.Debugw(ctx, "Channel Removed", log.Fields{"Channel": channel, "Group": grpName})
4433 ig.DelIgmpChannel(deviceID, net.ParseIP(channel))
4434 if ig.NumDevicesActive() == 0 {
4435 GetApplication().DelIgmpGroup(ig)
4436 }
4437 }
4438 }
4439 ig.IsGroupStatic = mvp.IsStaticGroup(ig.GroupName)
4440 if err := ig.WriteToDb(); err != nil {
4441 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
4442 }
4443 return true
4444 }
4445 }
4446 logger.Debugw(ctx, "Group Removed", log.Fields{"Channel": ig.GroupAddr, "Group": grpName, "ChannelBasedGroup": ig.IsChannelBasedGroup})
4447 ig.DelIgmpGroup()
4448 logger.Debugw(ctx, "Removed Igmp Group", log.Fields{"Channel": ig.GroupAddr, "Group": grpName})
4449 return true
4450 }
4451 GetApplication().IgmpGroups.Range(deleteChannelIfRemoved)
4452}
4453
4454// IsChannelPresent to check if channel is present
4455func (mvp *MvlanProfile) IsChannelPresent(channelIP net.IP, groupChannelList []string, IsStaticGroup bool) bool {
4456 // Only in case of static group, migration need to be supported.
4457 // Dynamic to dynamic group migration not supported currently
4458 if doesIPMatch(channelIP, groupChannelList) || mvp.isChannelStatic(channelIP) {
4459 return true
4460 } else if IsStaticGroup {
4461 return (mvp.GetMvlanGroup(channelIP) != "")
4462 }
4463
4464 return false
4465}
4466
4467// GetMvlanProfileForMcIP - Get an MVLAN profile for a given MC IP. This is used when an
4468// IGMP report is received from the PON port. The MVLAN profile
4469// located is used to idnetify the MC VLAN used in upstream for
4470// join/leave
4471func (va *VoltApplication) GetMvlanProfileForMcIP(profileName string, ip net.IP) (*MvlanProfile, string) {
4472 if mvpIntf, ok := va.MvlanProfilesByName.Load(profileName); ok {
4473 mvp := mvpIntf.(*MvlanProfile)
4474 if grpName := mvp.GetMvlanGroup(ip); grpName != "" {
4475 return mvp, grpName
4476 }
4477 } else {
4478 logger.Warnw(ctx, "Mvlan Profile not found for given profile name", log.Fields{"Profile": profileName})
4479 }
4480 return nil, ""
4481}
4482
4483// GetMvlanGroup to get mvlan group
4484func (mvp *MvlanProfile) GetMvlanGroup(ip net.IP) string {
4485 //Check for Static Group First
4486 if mvp.containsStaticChannels() {
4487 grpName := mvp.GetStaticGroupName(ip)
4488 if grpName != "" {
4489 return grpName
4490 }
4491 }
4492
4493 for _, mvg := range mvp.Groups {
4494 if mvg.Wildcard {
4495 return mvg.Name
4496 }
4497 if doesIPMatch(ip, mvg.McIPs) {
4498 return mvg.Name
4499 }
4500 }
4501 return ""
4502}
4503
4504// IgmpTick for igmp tick info
4505func (va *VoltApplication) IgmpTick() {
4506 tickCount++
4507 if (tickCount % 1000) == 0 {
4508 logger.Debugw(ctx, "Time @ Tick", log.Fields{"Tick": tickCount, "Time": time.Now()})
4509 }
4510 igmptick := func(key interface{}, value interface{}) bool {
4511 ig := value.(*IgmpGroup)
4512 if ig.NumDevicesActive() != 0 {
4513 if tickCount%10 == ig.Hash()%10 {
4514 ig.IgmpGroupLock.Lock()
4515 ig.Tick()
4516 ig.IgmpGroupLock.Unlock()
4517 if ig.NumDevicesActive() == 0 {
4518 va.DelIgmpGroup(ig)
4519 }
4520 }
4521 }
4522 return true
4523 }
4524 va.IgmpGroups.Range(igmptick)
4525}
4526
4527// Tick to add Tick Task
4528func (va *VoltApplication) Tick() {
4529 tt := NewTickTask()
4530 va.IgmpTasks.AddTask(tt)
4531 // va.IgmpTick()
4532}
4533
4534//AddIgmpProfile for addition of IGMP Profile
4535func (va *VoltApplication) AddIgmpProfile(igmpProfileConfig *common.IGMPConfig) error {
4536 var igmpProfile *IgmpProfile
4537
4538 if igmpProfileConfig.ProfileID == DefaultIgmpProfID {
4539 logger.Info(ctx, "Updating default IGMP profile")
4540 return va.UpdateIgmpProfile(igmpProfileConfig)
4541 }
4542
4543 igmpProfile = va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
4544 if igmpProfile == nil {
4545 igmpProfile = newIgmpProfile(igmpProfileConfig)
4546 } else {
4547 logger.Errorw(ctx, "IGMP profile already exists", log.Fields{"IgmpProfile": igmpProfileConfig.ProfileID})
4548 return errors.New("IGMP Profile already exists")
4549 }
4550
4551 va.storeIgmpProfileMap(igmpProfileConfig.ProfileID, igmpProfile)
4552
4553 if err := igmpProfile.WriteToDb(); err != nil {
4554 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID})
4555 }
4556
4557 return nil
4558}
4559
4560func newIgmpProfile(igmpProfileConfig *common.IGMPConfig) *IgmpProfile {
4561 var igmpProfile IgmpProfile
4562 igmpProfile.ProfileID = igmpProfileConfig.ProfileID
4563 igmpProfile.UnsolicitedTimeOut = uint32(igmpProfileConfig.UnsolicitedTimeOut)
4564 igmpProfile.MaxResp = uint32(igmpProfileConfig.MaxResp)
4565
4566 keepAliveInterval := uint32(igmpProfileConfig.KeepAliveInterval)
4567
4568 //KeepAliveInterval should have a min of 10 seconds
4569 if keepAliveInterval < MinKeepAliveInterval {
4570 keepAliveInterval = MinKeepAliveInterval
4571 logger.Infow(ctx, "Auto adjust keepAliveInterval - Value < 10", log.Fields{"Received": igmpProfileConfig.KeepAliveInterval, "Configured": keepAliveInterval})
4572 }
4573 igmpProfile.KeepAliveInterval = keepAliveInterval
4574
4575 igmpProfile.KeepAliveCount = uint32(igmpProfileConfig.KeepAliveCount)
4576 igmpProfile.LastQueryInterval = uint32(igmpProfileConfig.LastQueryInterval)
4577 igmpProfile.LastQueryCount = uint32(igmpProfileConfig.LastQueryCount)
4578 igmpProfile.FastLeave = *igmpProfileConfig.FastLeave
4579 igmpProfile.PeriodicQuery = *igmpProfileConfig.PeriodicQuery
4580 igmpProfile.IgmpCos = uint8(igmpProfileConfig.IgmpCos)
4581 igmpProfile.WithRAUpLink = *igmpProfileConfig.WithRAUpLink
4582 igmpProfile.WithRADownLink = *igmpProfileConfig.WithRADownLink
4583
4584 if igmpProfileConfig.IgmpVerToServer == "2" || igmpProfileConfig.IgmpVerToServer == "v2" {
4585 igmpProfile.IgmpVerToServer = "2"
4586 } else {
4587 igmpProfile.IgmpVerToServer = "3"
4588 }
4589 igmpProfile.IgmpSourceIP = net.ParseIP(igmpProfileConfig.IgmpSourceIP)
4590
4591 return &igmpProfile
4592}
4593
4594// checkIgmpProfileMap to get Igmp Profile. If not found return nil
4595func (va *VoltApplication) checkIgmpProfileMap(name string) *IgmpProfile {
4596 if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok {
4597 return igmpProfileIntf.(*IgmpProfile)
4598 }
4599 return nil
4600}
4601
4602// newDefaultIgmpProfile Igmp profiles with default values
4603func newDefaultIgmpProfile() *IgmpProfile {
4604 return &IgmpProfile{
4605 ProfileID: DefaultIgmpProfID,
4606 UnsolicitedTimeOut: 60,
4607 MaxResp: 10, // seconds
4608 KeepAliveInterval: 60, // seconds
4609 KeepAliveCount: 3, // TODO - May not be needed
4610 LastQueryInterval: 0, // TODO - May not be needed
4611 LastQueryCount: 0, // TODO - May not be needed
4612 FastLeave: true,
4613 PeriodicQuery: false, // TODO - May not be needed
4614 IgmpCos: 7, //p-bit value included in the IGMP packet
4615 WithRAUpLink: false, // TODO - May not be needed
4616 WithRADownLink: false, // TODO - May not be needed
4617 IgmpVerToServer: "3",
4618 IgmpSourceIP: net.ParseIP("172.27.0.1"), // This will be replaced by configuration
4619 }
4620}
4621
4622func (va *VoltApplication) resetIgmpProfileToDefault() {
4623 igmpProf := va.getIgmpProfileMap(DefaultIgmpProfID)
4624 defIgmpProf := newDefaultIgmpProfile()
4625
4626 igmpProf.UnsolicitedTimeOut = defIgmpProf.UnsolicitedTimeOut
4627 igmpProf.MaxResp = defIgmpProf.MaxResp
4628 igmpProf.KeepAliveInterval = defIgmpProf.KeepAliveInterval
4629 igmpProf.KeepAliveCount = defIgmpProf.KeepAliveCount
4630 igmpProf.LastQueryInterval = defIgmpProf.LastQueryInterval
4631 igmpProf.LastQueryCount = defIgmpProf.LastQueryCount
4632 igmpProf.FastLeave = defIgmpProf.FastLeave
4633 igmpProf.PeriodicQuery = defIgmpProf.PeriodicQuery
4634 igmpProf.IgmpCos = defIgmpProf.IgmpCos
4635 igmpProf.WithRAUpLink = defIgmpProf.WithRAUpLink
4636 igmpProf.WithRADownLink = defIgmpProf.WithRADownLink
4637 igmpProf.IgmpVerToServer = defIgmpProf.IgmpVerToServer
4638 igmpProf.IgmpSourceIP = defIgmpProf.IgmpSourceIP
4639
4640 if err := igmpProf.WriteToDb(); err != nil {
4641 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProf.ProfileID})
4642 }
4643}
4644
4645// getIgmpProfileMap to get Igmp Profile. If not found return default IGMP config
4646func (va *VoltApplication) getIgmpProfileMap(name string) *IgmpProfile {
4647 if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok {
4648 return igmpProfileIntf.(*IgmpProfile)
4649 }
4650
4651 // There will be always a default igmp profile.
4652 defaultIgmpProfileIntf, _ := va.IgmpProfilesByName.Load(DefaultIgmpProfID)
4653 return defaultIgmpProfileIntf.(*IgmpProfile)
4654}
4655
4656// storeIgmpProfileMap to store Igmp Profile
4657func (va *VoltApplication) storeIgmpProfileMap(name string, igmpProfile *IgmpProfile) {
4658 va.IgmpProfilesByName.Store(name, igmpProfile)
4659}
4660
4661// deleteIgmpProfileMap to delete Igmp Profile
4662func (va *VoltApplication) deleteIgmpProfileMap(name string) {
4663 va.IgmpProfilesByName.Delete(name)
4664}
4665
4666// WriteToDb is utility to write Igmp Config Info to database
4667func (igmpProfile *IgmpProfile) WriteToDb() error {
4668 igmpProfile.Version = database.PresentVersionMap[database.IgmpProfPath]
4669 b, err := json.Marshal(igmpProfile)
4670 if err != nil {
4671 return err
4672 }
4673 if err1 := db.PutIgmpProfile(igmpProfile.ProfileID, string(b)); err1 != nil {
4674 return err1
4675 }
4676 return nil
4677}
4678
4679//DelIgmpProfile for addition of IGMP Profile
4680func (va *VoltApplication) DelIgmpProfile(igmpProfileConfig *common.IGMPConfig) error {
4681 // Deletion of default igmp profile is blocked from submgr. Keeping additional check for safety.
4682 if igmpProfileConfig.ProfileID == DefaultIgmpProfID {
4683 logger.Info(ctx, "Resetting default IGMP profile")
4684 va.resetIgmpProfileToDefault()
4685 return nil
4686 }
4687 igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
4688 if igmpProfile == nil {
4689 logger.Warnw(ctx, "Igmp Profile not found. Unable to delete", log.Fields{"Profile ID": igmpProfileConfig.ProfileID})
4690 return nil
4691 }
4692
4693 va.deleteIgmpProfileMap(igmpProfileConfig.ProfileID)
4694
4695 _ = db.DelIgmpProfile(igmpProfileConfig.ProfileID)
4696
4697 return nil
4698}
4699
4700//UpdateIgmpProfile for addition of IGMP Profile
4701func (va *VoltApplication) UpdateIgmpProfile(igmpProfileConfig *common.IGMPConfig) error {
4702 igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
4703 if igmpProfile == nil {
4704 logger.Errorw(ctx, "Igmp Profile not found. Unable to update", log.Fields{"Profile ID": igmpProfileConfig.ProfileID})
4705 return errors.New("IGMP Profile not found")
4706 }
4707
4708 igmpProfile.ProfileID = igmpProfileConfig.ProfileID
4709 igmpProfile.UnsolicitedTimeOut = uint32(igmpProfileConfig.UnsolicitedTimeOut)
4710 igmpProfile.MaxResp = uint32(igmpProfileConfig.MaxResp)
4711
4712 keepAliveInterval := uint32(igmpProfileConfig.KeepAliveInterval)
4713
4714 //KeepAliveInterval should have a min of 10 seconds
4715 if keepAliveInterval < MinKeepAliveInterval {
4716 keepAliveInterval = MinKeepAliveInterval
4717 logger.Infow(ctx, "Auto adjust keepAliveInterval - Value < 10", log.Fields{"Received": igmpProfileConfig.KeepAliveInterval, "Configured": keepAliveInterval})
4718 }
4719 igmpProfile.KeepAliveInterval = keepAliveInterval
4720
4721 igmpProfile.KeepAliveCount = uint32(igmpProfileConfig.KeepAliveCount)
4722 igmpProfile.LastQueryInterval = uint32(igmpProfileConfig.LastQueryInterval)
4723 igmpProfile.LastQueryCount = uint32(igmpProfileConfig.LastQueryCount)
4724 igmpProfile.FastLeave = *igmpProfileConfig.FastLeave
4725 igmpProfile.PeriodicQuery = *igmpProfileConfig.PeriodicQuery
4726 igmpProfile.IgmpCos = uint8(igmpProfileConfig.IgmpCos)
4727 igmpProfile.WithRAUpLink = *igmpProfileConfig.WithRAUpLink
4728 igmpProfile.WithRADownLink = *igmpProfileConfig.WithRADownLink
4729
4730 if igmpProfileConfig.IgmpVerToServer == "2" || igmpProfileConfig.IgmpVerToServer == "v2" {
4731 igmpProfile.IgmpVerToServer = "2"
4732 } else {
4733 igmpProfile.IgmpVerToServer = "3"
4734 }
4735
4736 if igmpProfileConfig.IgmpSourceIP != "" {
4737 igmpProfile.IgmpSourceIP = net.ParseIP(igmpProfileConfig.IgmpSourceIP)
4738 }
4739
4740 if err := igmpProfile.WriteToDb(); err != nil {
4741 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID})
4742 }
4743
4744 return nil
4745}
4746
4747// RestoreIGMPProfilesFromDb to read from the DB and restore IGMP Profiles
4748func (va *VoltApplication) RestoreIGMPProfilesFromDb() {
4749 // Loading IGMP profiles
4750 igmpProfiles, _ := db.GetIgmpProfiles()
4751 for _, igmpProfile := range igmpProfiles {
4752 b, ok := igmpProfile.Value.([]byte)
4753 if !ok {
4754 logger.Warn(ctx, "The value type is not []byte")
4755 continue
4756 }
4757 var igmpProf IgmpProfile
4758 err := json.Unmarshal(b, &igmpProf)
4759 if err != nil {
4760 logger.Warn(ctx, "Unmarshal of IGMP Profile failed")
4761 continue
4762 }
4763 va.storeIgmpProfileMap(igmpProf.ProfileID, &igmpProf)
4764 logger.Infow(ctx, "Restored Igmp Profile", log.Fields{"Conf": igmpProf})
4765 }
4766}
4767
4768// InitIgmpSrcMac for initialization of igmp source mac
4769func (va *VoltApplication) InitIgmpSrcMac() {
4770 srcMac, err := getPodMacAddr()
4771 if err != nil {
4772 igmpSrcMac = "00:11:11:11:11:11"
4773 return
4774 }
4775 igmpSrcMac = srcMac
4776}
4777
4778// removeIPFromList to remove ip from the list
4779func removeIPFromList(s []net.IP, value net.IP) []net.IP {
4780 i := 0
4781 for i = 0; i < len(s); i++ {
4782 if s[i].Equal(value) {
4783 break
4784 }
4785 }
4786 if i != len(s) {
4787 //It means value is found in the slice
4788 return append(s[0:i], s[i+1:]...)
4789 }
4790 return s
4791}
4792
4793// DelMvlanProfile for deletion of a MVLAN group
4794func (va *VoltApplication) DelMvlanProfile(name string) error {
4795 if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok {
4796 mvp := mvpIntf.(*MvlanProfile)
4797
4798 if len(mvp.DevicesList) == 0 {
4799 mvp.DeleteInProgress = true
4800 mvp.DelFromDb()
4801 va.deleteMvlansMap(mvp.Mvlan, name)
4802 logger.Debugw(ctx, "Deleted MVLAN Profile", log.Fields{"Name": mvp.Name})
4803 } else {
4804 logger.Errorw(ctx, "Unable to delete Mvlan Profile as there is still an OLT attached to it", log.Fields{"Name": mvp.Name,
4805 "Device List": mvp.DevicesList})
4806 return errors.New("MVLAN attached to devices")
4807 }
4808
4809 return nil
4810 }
4811 logger.Errorw(ctx, "MVLAN Profile not found", log.Fields{"MvlanProfile Name": name})
4812 return nil
4813}
4814
4815// ReceiverUpInd for receiver up indication
4816func (va *VoltApplication) ReceiverUpInd(device string, port string, mvpName string, vlan of.VlanType, pbits []of.PbitType) {
4817 logger.Infow(ctx, "Receiver Indication: UP", log.Fields{"device": device, "port": port, "MVP": mvpName, "vlan": vlan, "pbits": pbits})
4818 if mvpIntf, ok := va.MvlanProfilesByName.Load(mvpName); ok {
4819 mvp := mvpIntf.(*MvlanProfile)
4820 if devIntf, ok := va.DevicesDisc.Load(device); ok {
4821 dev := devIntf.(*VoltDevice)
4822 proxyCfg, proxyIP, _ := getIgmpProxyCfgAndIP(mvp.Mvlan, dev.SerialNum)
4823 for _, pbit := range pbits {
4824 sendGeneralQuery(device, port, vlan, uint8(pbit), proxyCfg, proxyIP)
4825 }
4826 } else {
4827 logger.Warnw(ctx, "Device not found for given port", log.Fields{"device": device, "port": port})
4828 }
4829 } else {
4830 logger.Warnw(ctx, "Mvlan Profile not found for given profileName", log.Fields{"MVP": mvpName, "vlan": vlan})
4831 }
4832}
4833
4834// sendGeneralQuery to send general query
4835func sendGeneralQuery(device string, port string, cVlan of.VlanType, pbit uint8, proxyCfg *IgmpProfile, proxyIP *net.IP) {
4836
4837 if queryPkt, err := Igmpv2QueryPacket(NullIPAddr, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil {
4838 if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil {
4839 logger.Warnw(ctx, "General Igmpv2 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
4840 } else {
4841 logger.Debugw(ctx, "General Igmpv2 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
4842 }
4843 }
4844 if getVersion(proxyCfg.IgmpVerToServer) == IgmpVersion3 {
4845 if queryPkt, err := Igmpv3QueryPacket(NullIPAddr, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil {
4846 if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil {
4847 logger.Warnw(ctx, "General Igmpv3 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
4848 } else {
4849 logger.Debugw(ctx, "General Igmpv3 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
4850 }
4851 }
4852 }
4853}
4854
4855// ReceiverDownInd to send receiver down indication
4856func (va *VoltApplication) ReceiverDownInd(device string, port string) {
4857 logger.Infow(ctx, " Receiver Indication: DOWN", log.Fields{"device": device, "port": port})
4858
4859 ponPortID := va.GetPonPortID(device, port)
4860
4861 del := func(key interface{}, value interface{}) bool {
4862 ig := value.(*IgmpGroup)
4863 ig.IgmpGroupLock.Lock()
4864 ig.DelReceiveronDownInd(device, port, ponPortID)
4865 ig.IgmpGroupLock.Unlock()
4866 if ig.NumDevicesActive() == 0 {
4867 va.DelIgmpGroup(ig)
4868 }
4869 return true
4870 }
4871 va.IgmpGroups.Range(del)
4872}
4873
4874// doesIPMatch to check if ip match with any ip from the list
4875func doesIPMatch(ip net.IP, ipsOrRange []string) bool {
4876 for _, ipOrRange := range ipsOrRange {
4877 if strings.Contains(ipOrRange, "-") {
4878 var splits = strings.Split(ipOrRange, "-")
4879 ipStart := util.IP2LongConv(net.ParseIP(splits[0]))
4880 ipEnd := util.IP2LongConv(net.ParseIP(splits[1]))
4881 if ipEnd < ipStart {
4882 return false
4883 }
4884 ipInt := util.IP2LongConv(ip)
4885 if ipInt >= ipStart && ipInt <= ipEnd {
4886 return true
4887 }
4888 } else if ip.Equal(net.ParseIP(ipOrRange)) {
4889 return true
4890 }
4891 }
4892 return false
4893}
4894
4895// ProcessStaticGroup - Process Static Join/Leave Req for static channels
4896func (mvp *MvlanProfile) ProcessStaticGroup(device string, groupAddresses []net.IP, isJoin bool) {
4897
4898 logger.Debugw(ctx, "Received Static Group Request", log.Fields{"Device": device, "Join": isJoin, "Group Address List": groupAddresses})
4899
4900 mvlan := mvp.Mvlan
4901 va := GetApplication()
4902
4903 //TODO - Handle bulk add of groupAddr
4904 for _, groupAddr := range groupAddresses {
4905
4906 ig := mvp.GetStaticIgmpGroup(groupAddr)
4907 if isJoin {
4908 vd := va.GetDevice(device)
4909 igmpProf, _, _ := getIgmpProxyCfgAndIP(mvlan, vd.SerialNum)
4910 ver := igmpProf.IgmpVerToServer
4911
4912 if ig == nil {
4913 // First time group Creation: Create the IGMP group and then add the receiver to the group
4914 logger.Infow(ctx, "Static IGMP Add received for new group", log.Fields{"Addr": groupAddr, "Port": StaticPort})
4915 if ig := GetApplication().AddIgmpGroup(mvp.Name, groupAddr, device); ig != nil {
4916 ig.IgmpGroupLock.Lock()
4917 ig.AddReceiver(device, StaticPort, groupAddr, nil, getVersion(ver),
4918 0, 0, 0xFF)
4919 ig.IgmpGroupLock.Unlock()
4920 } else {
4921 logger.Warnw(ctx, "Static IGMP Group Creation Failed", log.Fields{"Addr": groupAddr})
4922 }
4923 } else {
4924 //Converting existing dynamic group to static group
4925 if !mvp.IsStaticGroup(ig.GroupName) {
4926 ig.updateGroupName(ig.GroupName)
4927 }
4928 // Update case: If the IGMP group is already created. just add the receiver
4929 logger.Infow(ctx, "Static IGMP Add received for existing group", log.Fields{"Addr": groupAddr, "Port": StaticPort})
4930 ig.IgmpGroupLock.Lock()
4931 ig.AddReceiver(device, StaticPort, groupAddr, nil, getVersion(ver),
4932 0, 0, 0xFF)
4933 ig.IgmpGroupLock.Unlock()
4934 }
4935 } else if ig != nil {
4936 logger.Infow(ctx, "Static IGMP Del received for existing group", log.Fields{"Addr": groupAddr, "Port": StaticPort})
4937
4938 if ig.IsChannelBasedGroup {
4939 grpName := mvp.GetMvlanGroup(ig.GroupAddr)
4940 if grpName != "" {
4941 ig.IgmpGroupLock.Lock()
4942 ig.DelReceiver(device, StaticPort, groupAddr, nil, 0xFF)
4943 ig.IgmpGroupLock.Unlock()
4944 ig.updateGroupName(grpName)
4945 } else {
4946 ig.DelIgmpGroup()
4947 }
4948 } else {
4949 ig.IgmpGroupLock.Lock()
4950 ig.DelReceiver(device, StaticPort, groupAddr, nil, 0xFF)
4951 ig.IgmpGroupLock.Unlock()
4952 }
4953 if ig.NumDevicesActive() == 0 {
4954 GetApplication().DelIgmpGroup(ig)
4955 }
4956 } else {
4957 logger.Warnw(ctx, "Static IGMP Del received for unknown group", log.Fields{"Addr": groupAddr})
4958 }
4959 }
4960}
4961
4962//getStaticChannelDiff - return the static channel newly added and removed from existing static group
4963func (mvp *MvlanProfile) getStaticChannelDiff() (newlyAdded []net.IP, removed []net.IP, common []net.IP) {
4964
4965 var commonChannels []net.IP
4966 newChannelList, _ := mvp.getAllStaticChannels()
4967 existingChannelList, _ := mvp.getAllOldGroupStaticChannels()
4968 if len(existingChannelList) == 0 {
4969 return newChannelList, []net.IP{}, []net.IP{}
4970 }
4971 for _, newChannel := range append([]net.IP{}, newChannelList...) {
4972 for _, existChannel := range append([]net.IP{}, existingChannelList...) {
4973
4974 //Remove common channels between existing and new list
4975 // The remaining in the below slices give the results
4976 // Remaining in newChannelList: Newly added
4977 // Remaining in existingChannelList: Removed channels
4978 if existChannel.Equal(newChannel) {
4979 existingChannelList = removeIPFromList(existingChannelList, existChannel)
4980 newChannelList = removeIPFromList(newChannelList, newChannel)
4981 commonChannels = append(commonChannels, newChannel)
4982 logger.Infow(ctx, "#############Channel: "+existChannel.String()+" New: "+newChannel.String(), log.Fields{"Added": newChannelList, "Removed": existingChannelList})
4983 break
4984 }
4985 }
4986 }
4987 return newChannelList, existingChannelList, commonChannels
4988}
4989
4990//getGroupChannelDiff - return the channel newly added and removed from existing group
4991func (mvp *MvlanProfile) getGroupChannelDiff(newGroup *MvlanGroup, oldGroup *MvlanGroup) (newlyAdded []net.IP, removed []net.IP, common []net.IP) {
4992
4993 var commonChannels []net.IP
4994 newChannelList, _ := newGroup.getAllChannels()
4995 existingChannelList, _ := oldGroup.getAllChannels()
4996 if len(existingChannelList) == 0 {
4997 return newChannelList, []net.IP{}, []net.IP{}
4998 }
4999 for _, newChannel := range append([]net.IP{}, newChannelList...) {
5000 for _, existChannel := range append([]net.IP{}, existingChannelList...) {
5001
5002 //Remove common channels between existing and new list
5003 // The remaining in the below slices give the results
5004 // Remaining in newChannelList: Newly added
5005 // Remaining in existingChannelList: Removed channels
5006 if existChannel.Equal(newChannel) {
5007 existingChannelList = removeIPFromList(existingChannelList, existChannel)
5008 newChannelList = removeIPFromList(newChannelList, newChannel)
5009 commonChannels = append(commonChannels, newChannel)
5010 logger.Infow(ctx, "#############Channel: "+existChannel.String()+" New: "+newChannel.String(), log.Fields{"Added": newChannelList, "Removed": existingChannelList})
5011 break
5012 }
5013 }
5014 }
5015 return newChannelList, existingChannelList, commonChannels
5016}
5017
5018// UpdateProfile - Updates the group & member info w.r.t the mvlan profile for the given device
5019func (mvp *MvlanProfile) UpdateProfile(deviceID string) {
5020 logger.Infow(ctx, "Update Mvlan Profile task triggered", log.Fields{"Mvlan": mvp.Mvlan})
5021 var removedStaticChannels []net.IP
5022 addedStaticChannels := []net.IP{}
5023 /* Taking mvpLock to protect the mvp groups and proxy */
5024 mvp.mvpLock.RLock()
5025 defer mvp.mvpLock.RUnlock()
5026
5027 serialNo := ""
5028 if deviceID != "" {
5029 if vd := GetApplication().GetDevice(deviceID); vd != nil {
5030 serialNo = vd.SerialNum
5031 if mvp.DevicesList[serialNo] != UpdateInProgress {
5032 logger.Warnw(ctx, "Exiting Update Task since device not present in MvlanProfile", log.Fields{"Device": deviceID, "SerialNum": vd.SerialNum, "MvlanProfile": mvp})
5033 return
5034 }
5035 } else {
5036 logger.Errorw(ctx, "Volt Device not found. Stopping Update Mvlan Profile processing for device", log.Fields{"SerialNo": deviceID, "MvlanProfile": mvp})
5037 return
5038 }
5039 }
5040
5041 //Update the groups based on static channels added & removed
5042 if mvp.containsStaticChannels() {
5043 addedStaticChannels, removedStaticChannels, _ = mvp.getStaticChannelDiff()
5044 logger.Debugw(ctx, "Update Task - Static Group Changes", log.Fields{"Added": addedStaticChannels, "Removed": removedStaticChannels})
5045
5046 if len(addedStaticChannels) > 0 || len(removedStaticChannels) > 0 {
5047 mvp.updateStaticGroups(deviceID, []net.IP{}, removedStaticChannels)
5048 }
5049 }
5050 mvp.GroupsUpdated(deviceID)
5051 if len(addedStaticChannels) > 0 {
5052 mvp.updateStaticGroups(deviceID, addedStaticChannels, []net.IP{})
5053 }
5054
5055 /* Need to handle if SSM params are modified for groups */
5056 for key := range mvp.Groups {
5057 _, _, commonChannels := mvp.getGroupChannelDiff(mvp.Groups[key], mvp.oldGroups[key])
5058 if mvp.checkStaticGrpSSMProxyDiff(mvp.oldProxy[key], mvp.Proxy[key]) {
5059 if mvp.Groups[key].IsStatic {
5060 /* Static group proxy modified, need to trigger membership report with new mode/src-list for existing channels */
5061 mvp.updateStaticGroups(deviceID, commonChannels, []net.IP{})
5062 } else {
5063 /* Dynamic group proxy modified, need to trigger membership report with new mode/src-list for existing channels */
5064 mvp.updateDynamicGroups(deviceID, commonChannels, []net.IP{})
5065 }
5066 }
5067 }
5068
5069 mvp.SetUpdateStatus(serialNo, NoOp)
5070
5071 if deviceID == "" || !mvp.isUpdateInProgress() {
5072 mvp.oldGroups = nil
5073 }
5074 if err := mvp.WriteToDb(); err != nil {
5075 logger.Errorw(ctx, "Mvlan profile write to DB failed", log.Fields{"ProfileName": mvp.Name})
5076 }
5077 logger.Debugw(ctx, "Updated MVLAN Profile", log.Fields{"VLAN": mvp.Mvlan, "Name": mvp.Name, "Grp IPs": mvp.Groups})
5078}
5079
5080//checkStaticGrpSSMProxyDiff- return true if the proxy of oldGroup is modified in newGroup
5081func (mvp *MvlanProfile) checkStaticGrpSSMProxyDiff(oldProxy *MCGroupProxy, newProxy *MCGroupProxy) bool {
5082
5083 if oldProxy == nil && newProxy == nil {
5084 return false
5085 }
5086 if (oldProxy == nil && newProxy != nil) ||
5087 (oldProxy != nil && newProxy == nil) {
5088 return true
5089 }
5090
5091 if oldProxy.Mode != newProxy.Mode {
5092 return true
5093 }
5094
5095 oldSrcLst := oldProxy.SourceList
5096 newSrcLst := newProxy.SourceList
5097 oLen := len(oldSrcLst)
5098 nLen := len(newSrcLst)
5099 if oLen != nLen {
5100 return true
5101 }
5102
5103 visited := make([]bool, nLen)
5104
5105 /* check if any new IPs added in the src list, return true if present */
5106 for i := 0; i < nLen; i++ {
5107 found := false
5108 element := newSrcLst[i]
5109 for j := 0; j < oLen; j++ {
5110 if visited[j] {
5111 continue
5112 }
5113 if element.Equal(oldSrcLst[j]) {
5114 visited[j] = true
5115 found = true
5116 break
5117 }
5118 }
5119 if !found {
5120 return true
5121 }
5122 }
5123
5124 visited = make([]bool, nLen)
5125 /* check if any IPs removed from existing src list, return true if removed */
5126 for i := 0; i < oLen; i++ {
5127 found := false
5128 element := oldSrcLst[i]
5129 for j := 0; j < nLen; j++ {
5130 if visited[j] {
5131 continue
5132 }
5133 if element.Equal(newSrcLst[j]) {
5134 visited[j] = true
5135 found = true
5136 break
5137 }
5138 }
5139 if !found {
5140 return true
5141 }
5142 }
5143 return false
5144}
5145
5146// ProcessMode process the received mode and updated the igp
5147func (igc *IgmpGroupChannel) ProcessMode(port string, incl bool) {
5148 /* Update the mode in igp if the mode has changed */
5149 igp := igc.GetReceiver(port)
5150 if igp.Exclude && incl {
5151 igp.Exclude = !incl
5152 if igc.Exclude > 0 {
5153 igc.Exclude--
5154 }
5155 } else if !incl && !igp.Exclude {
5156 igp.Exclude = !incl
5157 igc.Exclude++
5158 }
5159}
5160
5161func (ig *IgmpGroup) removeExpiredGroupFromDevice() {
5162 ig.PendingPoolLock.Lock()
5163 defer ig.PendingPoolLock.Unlock()
5164
5165 for device, timer := range ig.PendingGroupForDevice {
5166
5167 // To ensure no race-condition between the expiry time and the new Join,
5168 // ensure the group exists in pending pool before deletion
5169 groupExistsInPendingPool := true
5170
5171 if !time.Now().After(timer) {
5172 continue
5173 }
5174
5175 // Check if the IgmpGroup obj has no active member across any device
5176 // If Yes, then this group is part of global pending pool (IgmpPendingPool), hence if expired,
5177 // Remove only the IgmpGroup obj referenced to this device from global pool also.
5178 if ig.NumDevicesActive() == 0 {
5179 groupExistsInPendingPool = GetApplication().RemoveGroupFromPendingPool(device, ig)
5180 }
5181
5182 // Remove the group entry from device and remove the IgmpDev Obj
5183 // from IgmpGrp Pending pool
5184 if groupExistsInPendingPool {
5185 ig.DeleteIgmpGroupDevice(device)
5186 }
5187 }
5188}
5189
5190//DeleteIgmpGroupDevice - removes the IgmpGroupDevice obj from IgmpGroup and database
5191func (ig *IgmpGroup) DeleteIgmpGroupDevice(device string) {
5192
5193 logger.Infow(ctx, "Deleting IgmpGroupDevice from IG Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
5194
5195 igd := ig.Devices[device]
5196 igd.DelMcGroup(true)
5197 delete(ig.Devices, device)
5198 delete(ig.PendingGroupForDevice, device)
5199 _ = db.DelIgmpDevice(igd.Mvlan, igd.GroupName, igd.GroupAddr, igd.Device)
5200
5201 //If the group is not associated to any other device, then the entire Igmp Group obj itself can be removed
5202 if ig.NumDevicesAll() == 0 {
5203 logger.Infow(ctx, "Deleting IgmpGroup as all pending groups has expired", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
5204 GetApplication().DelIgmpGroup(ig)
5205 return
5206 }
5207 if err := ig.WriteToDb(); err != nil {
5208 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
5209 }
5210}
5211
5212//UpdateActiveChannelSubscriberAlarm - Updates the Active Channel Subscriber Alarm
5213func (mvp *MvlanProfile) UpdateActiveChannelSubscriberAlarm() {
5214 va := GetApplication()
5215 logger.Debugw(ctx, "Update of Active Channel Subscriber Alarm", log.Fields{"Mvlan": mvp.Mvlan})
5216 for srNo := range mvp.DevicesList {
5217 d := va.GetDeviceBySerialNo(srNo)
5218 if d == nil {
5219 logger.Warnw(ctx, "Device info not found", log.Fields{"Device_SrNo": srNo, "Mvlan": mvp.Mvlan})
5220 return
5221 }
5222 d.Ports.Range(func(key, value interface{}) bool {
5223 //port := key.(string)
5224 vp := value.(*VoltPort)
5225 if vp.Type != VoltPortTypeAccess {
5226 return true
5227 }
5228 if mvp.MaxActiveChannels > vp.ActiveChannels && vp.ChannelPerSubAlarmRaised {
5229 serviceName := GetMcastServiceForSubAlarm(vp, mvp)
5230 logger.Debugw(ctx, "Clearing-SendActiveChannelPerSubscriberAlarm-due-to-update", log.Fields{"ActiveChannels": vp.ActiveChannels, "ServiceName": serviceName})
5231 vp.ChannelPerSubAlarmRaised = false
5232 } else if mvp.MaxActiveChannels < vp.ActiveChannels && !vp.ChannelPerSubAlarmRaised {
5233 /* When the max active channel count is reduced via update, we raise an alarm.
5234 But the previous excess channels still exist until a leave or expiry */
5235 serviceName := GetMcastServiceForSubAlarm(vp, mvp)
5236 logger.Debugw(ctx, "Raising-SendActiveChannelPerSubscriberAlarm-due-to-update", log.Fields{"ActiveChannels": vp.ActiveChannels, "ServiceName": serviceName})
5237 vp.ChannelPerSubAlarmRaised = true
5238 }
5239 return true
5240 })
5241 }
5242}
5243
5244//TriggerAssociatedFlowDelete - Re-trigger delete for pending delete flows
5245func (mvp *MvlanProfile) TriggerAssociatedFlowDelete(device string) bool {
5246 mvp.mvpFlowLock.Lock()
5247
5248 cookieList := []uint64{}
5249 flowMap := mvp.PendingDeleteFlow[device]
5250
5251 for cookie := range flowMap {
5252 cookieList = append(cookieList, convertToUInt64(cookie))
5253 }
5254 mvp.mvpFlowLock.Unlock()
5255
5256 if len(cookieList) == 0 {
5257 return false
5258 }
5259
5260 for _, cookie := range cookieList {
5261 if vd := GetApplication().GetDevice(device); vd != nil {
5262 flow := &of.VoltFlow{}
5263 flow.SubFlows = make(map[uint64]*of.VoltSubFlow)
5264 subFlow := of.NewVoltSubFlow()
5265 subFlow.Cookie = cookie
5266 flow.SubFlows[cookie] = subFlow
5267 logger.Infow(ctx, "Retriggering Vnet Delete Flow", log.Fields{"Device": device, "Mvlan": mvp.Mvlan.String(), "Cookie": cookie})
5268 err := mvp.DelFlows(vd, flow)
5269 if err != nil {
5270 logger.Warnw(ctx, "De-Configuring IGMP Flow for device failed ", log.Fields{"Device": device, "err": err})
5271 }
5272 }
5273 }
5274 return true
5275}
Tinoj Josephc2ccd6b2022-07-19 04:32:15 +05305276
5277// JsonMarshal wrapper function for json Marshal MvlanProfile
5278func (mvp *MvlanProfile) JsonMarshal() ([]byte, error) {
5279 return json.Marshal(MvlanProfile{
5280 Name: mvp.Name,
5281 Mvlan: mvp.Mvlan,
5282 PonVlan: mvp.PonVlan,
5283 Groups: mvp.Groups,
5284 Proxy: mvp.Proxy,
5285 Version: mvp.Version,
5286 IsPonVlanPresent: mvp.IsPonVlanPresent,
5287 IsChannelBasedGroup: mvp.IsChannelBasedGroup,
5288 DevicesList: mvp.DevicesList,
5289 MaxActiveChannels: mvp.MaxActiveChannels,
5290 PendingDeleteFlow: mvp.PendingDeleteFlow,
5291 DeleteInProgress: mvp.DeleteInProgress,
5292 IgmpServVersion: mvp.IgmpServVersion,
5293 })
5294}