Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2022-present Open Networking Foundation |
| 3 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | * you may not use this file except in compliance with the License. |
| 5 | * You may obtain a copy of the License at |
| 6 | * |
| 7 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | * |
| 9 | * Unless required by applicable law or agreed to in writing, software |
| 10 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | * See the License for the specific language governing permissions and |
| 13 | * limitations under the License. |
| 14 | */ |
| 15 | |
| 16 | package application |
| 17 | |
| 18 | import ( |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 19 | "context" |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 20 | "encoding/json" |
| 21 | "net" |
| 22 | |
| 23 | "github.com/google/gopacket/layers" |
| 24 | |
| 25 | cntlr "voltha-go-controller/internal/pkg/controller" |
| 26 | "voltha-go-controller/internal/pkg/types" |
| 27 | "voltha-go-controller/internal/pkg/of" |
| 28 | "voltha-go-controller/log" |
| 29 | ) |
| 30 | |
// IgmpGroupChannel holds the state of a single multicast channel (group
// address) of an IGMP group on one device. The exported fields without a
// `json:"-"` tag are what WriteToDb serializes; the receiver maps and the
// pointers shared with the parent IgmpGroupDevice are not persisted.
type IgmpGroupChannel struct {
	// Device, GroupID, GroupName and Mvlan are copied from the parent
	// IgmpGroupDevice by NewIgmpGroupChannel.
	Device    string
	GroupID   uint32
	GroupName string
	// GroupAddr is the multicast group address of this channel.
	GroupAddr net.IP
	Mvlan     of.VlanType
	// Exclude counts the receivers that joined in exclude mode. When it is
	// greater than zero the v3 report is built from ExcludeList.
	Exclude int
	// ExcludeList and IncludeList are the channel-level source lists.
	ExcludeList []net.IP
	IncludeList []net.IP
	// Version is the IGMP version passed to NewIgmpGroupChannel.
	Version uint8
	// ServVersion is shared with the parent IgmpGroupDevice (same pointer).
	ServVersion *uint8 `json:"-"`
	// CurReceivers and NewReceivers are keyed by port name. GetReceiver
	// consults NewReceivers first and falls back to CurReceivers.
	CurReceivers map[string]*IgmpGroupPort `json:"-"`
	NewReceivers map[string]*IgmpGroupPort `json:"-"`
	// proxyCfg points at the parent device's proxyCfg field; being
	// unexported it is never serialized.
	proxyCfg **IgmpProfile
	// IgmpProxyIP points at the parent device's IgmpProxyIP field.
	IgmpProxyIP **net.IP `json:"-"`
}
| 48 | |
| 49 | // NewIgmpGroupChannel is constructor for a channel. The default IGMP version is set to 3 |
| 50 | // as the protocol defines the way to manage backward compatibility |
| 51 | // The implementation handles simultaneous presense of lower versioned |
| 52 | // receivers |
| 53 | func NewIgmpGroupChannel(igd *IgmpGroupDevice, groupAddr net.IP, version uint8) *IgmpGroupChannel { |
| 54 | var igc IgmpGroupChannel |
| 55 | igc.Device = igd.Device |
| 56 | igc.GroupID = igd.GroupID |
| 57 | igc.GroupName = igd.GroupName |
| 58 | igc.GroupAddr = groupAddr |
| 59 | igc.Mvlan = igd.Mvlan |
| 60 | igc.Version = version |
| 61 | igc.CurReceivers = make(map[string]*IgmpGroupPort) |
| 62 | igc.NewReceivers = make(map[string]*IgmpGroupPort) |
| 63 | igc.proxyCfg = &igd.proxyCfg |
| 64 | igc.IgmpProxyIP = &igd.IgmpProxyIP |
| 65 | igc.ServVersion = igd.ServVersion |
| 66 | return &igc |
| 67 | } |
| 68 | |
| 69 | // NewIgmpGroupChannelFromBytes create the IGMP group channel from a byte slice |
| 70 | func NewIgmpGroupChannelFromBytes(b []byte) (*IgmpGroupChannel, error) { |
| 71 | var igc IgmpGroupChannel |
| 72 | if err := json.Unmarshal(b, &igc); err != nil { |
| 73 | return nil, err |
| 74 | } |
| 75 | igc.CurReceivers = make(map[string]*IgmpGroupPort) |
| 76 | igc.NewReceivers = make(map[string]*IgmpGroupPort) |
| 77 | return &igc, nil |
| 78 | } |
| 79 | |
| 80 | // RestorePorts to restore ports |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 81 | func (igc *IgmpGroupChannel) RestorePorts(cntx context.Context) { |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 82 | |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 83 | igc.migrateIgmpPorts(cntx) |
| 84 | ports, _ := db.GetIgmpRcvrs(cntx, igc.Mvlan, igc.GroupAddr, igc.Device) |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 85 | for _, port := range ports { |
| 86 | b, ok := port.Value.([]byte) |
| 87 | if !ok { |
| 88 | logger.Warn(ctx, "The value type is not []byte") |
| 89 | continue |
| 90 | } |
| 91 | if igp, err := NewIgmpGroupPortFromBytes(b); err == nil { |
| 92 | igc.NewReceivers[igp.Port] = igp |
| 93 | logger.Infow(ctx, "Group Port Restored", log.Fields{"IGP": igp}) |
| 94 | } else { |
| 95 | logger.Warn(ctx, "Failed to decode port from DB") |
| 96 | } |
| 97 | } |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 98 | if err := igc.WriteToDb(cntx); err != nil { |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 99 | logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr}) |
| 100 | } |
| 101 | } |
| 102 | |
| 103 | // WriteToDb is utility to write IGMPGroupChannel Info to database |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 104 | func (igc *IgmpGroupChannel) WriteToDb(cntx context.Context) error { |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 105 | b, err := json.Marshal(igc) |
| 106 | if err != nil { |
| 107 | return err |
| 108 | } |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 109 | if err1 := db.PutIgmpChannel(cntx, igc.Mvlan, igc.GroupName, igc.Device, igc.GroupAddr, string(b)); err1 != nil { |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 110 | return err1 |
| 111 | } |
| 112 | logger.Info(ctx, "IGC Updated") |
| 113 | return nil |
| 114 | } |
| 115 | |
| 116 | |
| 117 | // InclSourceIsIn checks if a source is in include list |
| 118 | func (igc *IgmpGroupChannel) InclSourceIsIn(src net.IP) bool { |
| 119 | return IsIPPresent(src, igc.IncludeList) |
| 120 | } |
| 121 | |
| 122 | // ExclSourceIsIn checks if a source is in exclude list |
| 123 | func (igc *IgmpGroupChannel) ExclSourceIsIn(src net.IP) bool { |
| 124 | return IsIPPresent(src, igc.ExcludeList) |
| 125 | } |
| 126 | |
| 127 | // AddInclSource adds a source is in include list |
| 128 | func (igc *IgmpGroupChannel) AddInclSource(src net.IP) { |
| 129 | logger.Debugw(ctx, "Adding Include Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src}) |
| 130 | igc.IncludeList = append(igc.IncludeList, src) |
| 131 | } |
| 132 | |
| 133 | // AddExclSource adds a source is in exclude list |
| 134 | func (igc *IgmpGroupChannel) AddExclSource(src net.IP) { |
| 135 | logger.Debugw(ctx, "Adding Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src}) |
| 136 | igc.ExcludeList = append(igc.ExcludeList, src) |
| 137 | } |
| 138 | |
| 139 | // UpdateExclSource update excl source list for the given channel |
| 140 | func (igc *IgmpGroupChannel) UpdateExclSource(srcList []net.IP) bool { |
| 141 | |
| 142 | logger.Debugw(ctx, "Updating Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Current List": igc.ExcludeList, "Incoming List": srcList}) |
| 143 | if !igc.IsExclListChanged(srcList) { |
| 144 | return false |
| 145 | } |
| 146 | |
| 147 | if igc.NumReceivers() == 1 { |
| 148 | igc.ExcludeList = srcList |
| 149 | } else { |
| 150 | igc.ExcludeList = igc.computeExclList(srcList) |
| 151 | } |
| 152 | |
| 153 | logger.Debugw(ctx, "Updated Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Updated Excl List": igc.ExcludeList}) |
| 154 | return true |
| 155 | } |
| 156 | |
| 157 | // computeExclList computes intersection of pervious & current src list |
| 158 | func (igc *IgmpGroupChannel) computeExclList(srcList []net.IP) []net.IP { |
| 159 | |
| 160 | updatedSrcList := []net.IP{} |
| 161 | for _, src := range srcList { |
| 162 | for _, excl := range igc.ExcludeList { |
| 163 | if src.Equal(excl) { |
| 164 | updatedSrcList = append(updatedSrcList, src) |
| 165 | } |
| 166 | } |
| 167 | } |
| 168 | return updatedSrcList |
| 169 | } |
| 170 | |
| 171 | // IsExclListChanged checks if excl list has been updated |
| 172 | func (igc *IgmpGroupChannel) IsExclListChanged(srcList []net.IP) bool { |
| 173 | |
| 174 | srcPresent := false |
| 175 | if len(igc.ExcludeList) != len(srcList) { |
| 176 | return true |
| 177 | } |
| 178 | |
| 179 | for _, src := range srcList { |
| 180 | for _, excl := range igc.ExcludeList { |
| 181 | srcPresent = false |
| 182 | if src.Equal(excl) { |
| 183 | srcPresent = true |
| 184 | break |
| 185 | } |
| 186 | } |
| 187 | if !srcPresent { |
| 188 | return true |
| 189 | } |
| 190 | } |
| 191 | return false |
| 192 | } |
| 193 | |
| 194 | // DelInclSource deletes a source is in include list |
| 195 | func (igc *IgmpGroupChannel) DelInclSource(src net.IP) { |
| 196 | mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan) |
| 197 | /* If the SSM proxy is configured, then we can del the src ip from igc as whatever is in proxy that is final list */ |
| 198 | if _, ok := mvp.Proxy[igc.GroupName]; !ok { |
| 199 | logger.Debugw(ctx, "Deleting Include Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src}) |
| 200 | for _, igp := range igc.CurReceivers { |
| 201 | if igp.InclSourceIsIn(src) { |
| 202 | logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port}) |
| 203 | return |
| 204 | } |
| 205 | } |
| 206 | for _, igp := range igc.NewReceivers { |
| 207 | if igp.InclSourceIsIn(src) { |
| 208 | logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port}) |
| 209 | return |
| 210 | } |
| 211 | } |
| 212 | } else { |
| 213 | logger.Debug(ctx, "Proxy configured, not Deleting Include Source for Channel") |
| 214 | } |
| 215 | for i, addr := range igc.IncludeList { |
| 216 | if addr.Equal(src) { |
| 217 | igc.IncludeList = append(igc.IncludeList[:i], igc.IncludeList[i+1:]...) |
| 218 | return |
| 219 | } |
| 220 | } |
| 221 | } |
| 222 | |
| 223 | // DelExclSource deletes a source is in exclude list |
| 224 | func (igc *IgmpGroupChannel) DelExclSource(src net.IP) { |
| 225 | logger.Debugw(ctx, "Deleting Exclude Source for Channel", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device, "Src": src}) |
| 226 | |
| 227 | for _, igp := range igc.CurReceivers { |
| 228 | if igp.ExclSourceIsIn(src) { |
| 229 | logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port}) |
| 230 | return |
| 231 | } |
| 232 | } |
| 233 | for _, igp := range igc.NewReceivers { |
| 234 | if igp.ExclSourceIsIn(src) { |
| 235 | logger.Infow(ctx, "Skipping deletion: Source Present for another Receiver", log.Fields{"Receiver": igp.Port}) |
| 236 | return |
| 237 | } |
| 238 | } |
| 239 | for i, addr := range igc.ExcludeList { |
| 240 | if addr.Equal(src) { |
| 241 | igc.ExcludeList = append(igc.ExcludeList[:i], igc.ExcludeList[i+1:]...) |
| 242 | return |
| 243 | } |
| 244 | } |
| 245 | } |
| 246 | |
// ProcessSources processes the received list of either included sources
// (incl == true) or excluded sources (incl == false) for the receiver on port,
// updating both the receiver's and the channel's source lists.
//
// It returns two flags:
//   - the first is true when the channel-level lists were modified and the
//     change needs to be reported to the upstream multicast servers;
//   - the second is true when, while processing an exclude list, the
//     receiver's include list became empty.
//
// (false, false) is returned without any processing when both the channel
// version and the server version are IGMPv2, or when port is not a known
// receiver. Before returning, the receiver is written to the database; a
// write failure is only logged.
func (igc *IgmpGroupChannel) ProcessSources(cntx context.Context, port string, ip []net.IP, incl bool) (bool, bool) {
	groupChanged := false
	groupExclUpdated := false
	receiverSrcListEmpty := false
	// If the version type is 2, there isn't anything to process here
	if igc.Version == IgmpVersion2 && *igc.ServVersion == IgmpVersion2 {
		return false, false
	}

	igp := igc.GetReceiver(port)
	if igp == nil {
		logger.Warnw(ctx, "Receiver not found", log.Fields{"Port": port})
		return false, false
	}
	mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
	if incl {
		for _, src := range ip {

			// A source that is now included must no longer be excluded.
			if igp.ExclSourceIsIn(src) {
				igp.DelExclSource(src)
				if igc.ExclSourceIsIn(src) {
					igc.DelExclSource(src)
					groupChanged = true
				}
			}

			// If the source is not in the list of include sources for the port
			// add it. If so, check also if it is in list of include sources
			// at the device level.
			if !igp.InclSourceIsIn(src) {
				igp.AddInclSource(src)
				if !igc.InclSourceIsIn(src) {
					igc.AddInclSource(src)
					groupChanged = true
				}
			}
		}
		/* If any of the existing ip in the source list is removed we need to remove from the list in igp and igc */
		if _, ok := mvp.Proxy[igc.GroupName]; ok {
			/* If we get leave message from any subscriber, we do not have to delete the entries in the src list
			   Only if there is any modification in the src list by proxy config update only then we need to update */
			if len(ip) != 0 && len(ip) != len(igc.IncludeList) {
				// Iterate backwards because DelInclSource may shrink the list.
				for i := len(igc.IncludeList) - 1; i >= 0; i-- {
					src := igc.IncludeList[i]
					if !IsIPPresent(src, ip) {
						igp.DelInclSource(src)
						igc.DelInclSource(src)
						groupChanged = true
					}
				}
			}
		}
	} else {
		for _, src := range ip {

			// A source that is now excluded must no longer be included.
			if igp.InclSourceIsIn(src) {
				igp.DelInclSource(src)
				if igc.InclSourceIsIn(src) {
					igc.DelInclSource(src)
					groupChanged = true
				}
				if len(igp.IncludeList) == 0 {
					receiverSrcListEmpty = true
				}
			}

			// If the source is not in the list of exclude sources for the port
			// add it. If so, check also if it is in list of exclude sources
			// at the device level (only when a proxy is configured).
			if !igp.ExclSourceIsIn(src) {
				igp.AddExclSource(src)
				/* If there is any update in the src list of proxy we need to update the igc */
				if _, ok := mvp.Proxy[igc.GroupName]; ok {
					if !igc.ExclSourceIsIn(src) {
						igc.AddExclSource(src)
						groupChanged = true
					}
				}
			}
		}
		/* If any of the existing ip in the source list is removed we need to remove from the list in igp and igc */
		if _, ok := mvp.Proxy[igc.GroupName]; ok {
			if len(ip) != len(igc.ExcludeList) {
				// Iterate backwards because DelExclSource may shrink the list.
				for i := len(igc.ExcludeList) - 1; i >= 0; i-- {
					src := igc.ExcludeList[i]
					if !IsIPPresent(src, ip) {
						igp.DelExclSource(src)
						igc.DelExclSource(src)
						groupChanged = true
					}
				}
			}
		}
		groupExclUpdated = igc.UpdateExclSource(ip)
	}
	if err := igp.WriteToDb(cntx, igc.Mvlan, igc.GroupAddr, igc.Device); err != nil {
		logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
	}
	return (groupChanged || groupExclUpdated), receiverSrcListEmpty
}
| 350 | |
| 351 | // GetReceiver to get receiver info |
| 352 | func (igc *IgmpGroupChannel) GetReceiver(port string) *IgmpGroupPort { |
| 353 | igp := igc.NewReceivers[port] |
| 354 | if igp == nil { |
| 355 | igp = igc.CurReceivers[port] |
| 356 | } |
| 357 | return igp |
| 358 | } |
| 359 | |
// AddReceiver adds the receiver on port to the channel and performs the
// related actions: installing the multicast flow when this is the very first
// receiver, processing the include/exclude source list, and sending an IGMP
// report upstream if the group changed.
//
// The source list and mode come from the proxy configuration of the MVLAN
// profile when one exists for this group, otherwise from the received group
// record (when group is non-nil).
//
// It returns true only when port was not previously present in either
// receiver map. It returns false when the port was already known; if it was
// already in NewReceivers and nothing changed, the function returns before
// writing anything to the database.
func (igc *IgmpGroupChannel) AddReceiver(cntx context.Context, port string, group *layers.IGMPv3GroupRecord, cvlan uint16, pbit uint8) bool {

	var igp *IgmpGroupPort
	var groupModified = false
	var isNewReceiver = false

	var ip []net.IP
	incl := false
	mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
	// Proxy configuration, when present, overrides what the receiver sent.
	if _, ok := mvp.Proxy[igc.GroupName]; ok {
		if mvp.Proxy[igc.GroupName].Mode == common.Include {
			incl = true
		}
		ip = mvp.Proxy[igc.GroupName].SourceList
	} else if group != nil {
		incl = isIncl(group.Type)
		ip = group.SourceAddresses
	}
	logger.Debugw(ctx, "Attempting to add receiver", log.Fields{"Version": igc.Version, "Port": port, "Incl": incl, "srcIp": ip})

	//logger.Infow(ctx, "Receivers", log.Fields{"New": igc.NewReceivers, "Current": igc.CurReceivers})
	logger.Debugw(ctx, "Receiver Group", log.Fields{"Igd GId": igc.GroupID})
	logger.Debugw(ctx, "Receiver Channel", log.Fields{"Igd addr": igc.GroupAddr})
	logger.Debugw(ctx, "Receiver Mvlan", log.Fields{"Igd mvlan": igc.Mvlan})
	logger.Debugw(ctx, "Receiver Sources", log.Fields{"Igd addr": ip})

	ponPortID := GetApplication().GetPonPortID(igc.Device, port)

	// Process the IGMP receiver. If it is already in, we should only process the changes
	// to source list.
	var newRcvExists bool
	igp, newRcvExists = igc.NewReceivers[port]
	if !newRcvExists {
		// Add the receiver to the list of receivers and make the necessary group modification
		// if this is the first time the receiver is added
		var curRcvExists bool
		if igp, curRcvExists = igc.CurReceivers[port]; curRcvExists {
			// Known receiver: move it from the current map to the new map and
			// reset its query timeout counter.
			logger.Debugw(ctx, "Existing IGMP receiver", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
			delete(igc.CurReceivers, port)
			igp.QueryTimeoutCount = 0
			igc.NewReceivers[port] = igp
		} else {
			// New receiver who wasn't part of earlier list
			// Need to send out IGMP group modification for this port
			igp = NewIgmpGroupPort(port, cvlan, pbit, igc.Version, incl, uint32(ponPortID))
			igc.NewReceivers[port] = igp
			isNewReceiver = true
			logger.Debugw(ctx, "New IGMP receiver", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
			// First receiver of the channel: install the multicast flow.
			if len(igc.NewReceivers) == 1 && len(igc.CurReceivers) == 0 {
				groupModified = true
				igc.AddMcFlow(cntx)
				logger.Debugw(ctx, "Added New Flow", log.Fields{"Group": igc.GroupAddr.String(), "Port": port})
			}
			if !incl {
				igc.Exclude++
			}
		}
	}

	// Process the include/exclude list which may end up modifying the group
	if change, _ := igc.ProcessSources(cntx, port, ip, incl); change {
		groupModified = true
	}
	igc.ProcessMode(port, incl)

	// If the group is modified as this is the first receiver or due to include/exclude list modification
	// send a report to the upstream multicast servers
	if groupModified {
		logger.Debug(ctx, "Group Modified and IGMP report sent to the upstream server")
		igc.SendReport(false)
	} else if newRcvExists {
		// Already-registered receiver and no change: nothing to persist.
		return false
	}

	logger.Debugw(ctx, "Channel Receiver Added", log.Fields{"Group Channel": igc.GroupAddr, "Group Port": igp})

	if err := igc.WriteToDb(cntx); err != nil {
		logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
	}
	if err := igp.WriteToDb(cntx, igc.Mvlan, igc.GroupAddr, igc.Device); err != nil {
		logger.Errorw(ctx, "Igmp group port Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
	}
	return isNewReceiver
}
| 447 | |
// DelReceiver removes the receiver on port from the channel, e.g. when query
// expiry happened for that receiver.
//
// The source list is processed first. The receiver itself is removed (from
// NewReceivers or CurReceivers, and from the database) only when srcList is
// empty, the channel's include list is empty, or processing emptied the
// receiver's include list.
//
// Return value:
//   - false when removal was attempted but the port is in neither map;
//   - true when, afterwards, the channel has no receivers left (the multicast
//     flow is deleted and a leave is sent; the channel is not written to the
//     database on this path);
//   - otherwise, whether a receiver was actually removed.
func (igc *IgmpGroupChannel) DelReceiver(cntx context.Context, port string, incl bool, srcList []net.IP) bool {
	// The receiver may exist either in NewReceiver list or
	// the CurReceivers list. Find and remove it from either
	// of the lists.
	logger.Debugw(ctx, "Deleting Receiver from Channel", log.Fields{"Port": port, "SrcList": srcList, "Incl": incl})
	logger.Debugw(ctx, "New Receivers", log.Fields{"New": igc.NewReceivers})
	logger.Debugw(ctx, "Current Receivers", log.Fields{"Current": igc.CurReceivers})

	receiversUpdated := false
	groupModified, receiverSrcListEmpty := igc.ProcessSources(cntx, port, srcList, incl)

	if len(srcList) == 0 || len(igc.IncludeList) == 0 || receiverSrcListEmpty {
		if igp, ok := igc.NewReceivers[port]; ok {
			logger.Debug(ctx, "Deleting from NewReceivers")
			delete(igc.NewReceivers, port)
			receiversUpdated = true
			// Keep the exclude-mode receiver count in sync.
			if igp.Exclude {
				igc.Exclude--
			}
		} else {
			if igp, ok1 := igc.CurReceivers[port]; ok1 {
				logger.Debug(ctx, "Deleting from CurReceivers")
				delete(igc.CurReceivers, port)
				receiversUpdated = true
				// Keep the exclude-mode receiver count in sync.
				if igp.Exclude {
					igc.Exclude--
				}
			} else {
				logger.Debug(ctx, "Receiver doesnot exist. Dropping Igmp leave")
				return false
			}
		}
		// The result of the database delete is deliberately ignored here.
		_ = db.DelIgmpRcvr(cntx, igc.Mvlan, igc.GroupAddr, igc.Device, port)
	}

	if igc.NumReceivers() == 0 {
		igc.DelMcFlow(cntx)
		mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
		/* If proxy is configured and NumReceivers is 0, then we can reset the igc src list so that we send leave */
		if _, ok := mvp.Proxy[igc.GroupName]; ok {
			igc.IncludeList = []net.IP{}
		}
		igc.SendLeaveToServer()
		logger.Debugw(ctx, "Deleted the receiver Flow", log.Fields{"Num Receivers": igc.NumReceivers()})
		return true
	}
	if groupModified {
		igc.SendReport(false)
		logger.Infow(ctx, "Updated SourceList for Channel", log.Fields{"Current": igc.CurReceivers, "New": igc.NewReceivers})
	}
	if err := igc.WriteToDb(cntx); err != nil {
		logger.Errorw(ctx, "Igmp group channel Write to DB failed", log.Fields{"mvlan": igc.Mvlan, "GroupAddr": igc.GroupAddr})
	}
	logger.Infow(ctx, "Updated Receiver info for Channel", log.Fields{"Current": igc.CurReceivers, "New": igc.NewReceivers})

	return receiversUpdated
}
| 507 | |
| 508 | // DelAllReceivers deletes all receiver for the provided igmp device |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 509 | func (igc *IgmpGroupChannel) DelAllReceivers(cntx context.Context) { |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 510 | logger.Infow(ctx, "Deleting All Receiver for Channel", log.Fields{"Device": igc.Device, "Channel": igc.GroupAddr.String()}) |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 511 | _ = db.DelAllIgmpRcvr(cntx, igc.Mvlan, igc.GroupAddr, igc.Device) |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 512 | igc.Exclude = 0 |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 513 | igc.DelMcFlow(cntx) |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 514 | igc.SendLeaveToServer() |
| 515 | logger.Infow(ctx, "MC Flow deleted and Leave sent", log.Fields{"Channel": igc.GroupAddr.String(), "Device": igc.Device}) |
| 516 | } |
| 517 | |
| 518 | // Igmpv2ReportPacket build an IGMPv2 Report for the upstream servers |
| 519 | func (igc *IgmpGroupChannel) Igmpv2ReportPacket() ([]byte, error) { |
| 520 | logger.Debugw(ctx, "Buidling IGMP version 2 Report", log.Fields{"Device": igc.Device}) |
| 521 | return IgmpReportv2Packet(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP) |
| 522 | } |
| 523 | |
| 524 | // Igmpv3ReportPacket build an IGMPv3 Report for the upstream servers |
| 525 | func (igc *IgmpGroupChannel) Igmpv3ReportPacket() ([]byte, error) { |
| 526 | logger.Debugw(ctx, "Buidling IGMP version 3 Report", log.Fields{"Device": igc.Device, "Exclude": igc.Exclude}) |
| 527 | if igc.Exclude > 0 { |
| 528 | return Igmpv3ReportPacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP, false, igc.ExcludeList) |
| 529 | } |
| 530 | return Igmpv3ReportPacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP, true, igc.IncludeList) |
| 531 | } |
| 532 | |
// SendReport sends a consolidated report for this channel to the server.
//
// isQuery indicates that the report is being sent in response to a query.
// A v2 report is built when either the server version or the configured
// proxy version towards the server is v2; otherwise a v3 report is built.
// The report is sent as a packet-out on the device's NNI port. Failures to
// build the report or to find the NNI port are logged and the report is
// dropped; the result of the packet-out request is ignored.
func (igc *IgmpGroupChannel) SendReport(isQuery bool) {
	var report []byte
	var err error
	logger.Debugw(ctx, "Checking Version", log.Fields{"IGC Version": igc.Version, "Proxy Version": (*igc.proxyCfg).IgmpVerToServer,
		"Result": (getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2)})

	/**
	                               +------------------------------------------------------------------------+
	                               |              IGMP version(towards BNG) Configured at VGC               |
	                               +-------------------------------+----------------------------------------+
	                               |              v2               |                   v3                   |
	+===================+==========+===============================+========================================+
	| Received From RG  | V2 Join  | Process and Send as V2 to BNG | Process, Convert to v3 and Send to BNG |
	|                   |          |                               | Process, Send as v2, if the BNG is v2  |
	+===================+----------+-------------------------------+----------------------------------------+
	                    | V3 Join  | Process and Send as V2 to BNG | Process, Send v3 to BNG                |
	                    |          |                               | Process, Convert, Send as v2, if the   |
	                    |          |                               | BNG is v2                              |
	+===================+==========+===============================+========================================+
	| Received From BNG | V2 Query | V2 response to BNG            | V2 response to BNG                     |
	+===================+----------+-------------------------------+----------------------------------------+
	                    | V3 Query | Discard                       | V3 response to BNG                     |
	                    +==========+===============================+========================================+
	*/
	// igc.Version: igmp version received from RG.
	// igc.ServVersion: igmp version received from BNG or IgmpVerToServer present in proxy igmp conf.

	if isQuery && *igc.ServVersion == IgmpVersion3 && getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 {
		// This is the last scenario where we must discard the query processing.
		logger.Debug(ctx, "Dropping query packet since the server verion is v3 but igmp proxy version is v2")
		return
	}

	if *igc.ServVersion == IgmpVersion2 || getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 {
		report, err = igc.Igmpv2ReportPacket()
	} else {
		report, err = igc.Igmpv3ReportPacket()
	}
	if err != nil {
		logger.Warnw(ctx, "Error Preparing Report", log.Fields{"Device": igc.Device, "Ver": igc.Version, "Reason": err.Error()})
		return
	}
	nni, err := GetApplication().GetNniPort(igc.Device)
	if err == nil {
		// The NNI port is passed for both port arguments of the packet-out.
		_ = cntlr.GetController().PacketOutReq(igc.Device, nni, nni, report, false)
	} else {
		logger.Warnw(ctx, "Didn't find NNI port", log.Fields{"Device": igc.Device})
	}
}
| 583 | |
| 584 | // AddMcFlow adds flow to the device when the first receiver joins |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 585 | func (igc *IgmpGroupChannel) AddMcFlow(cntx context.Context) { |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 586 | flow, err := igc.BuildMcFlow() |
| 587 | if err != nil { |
| 588 | logger.Warnw(ctx, "MC Flow Build Failed", log.Fields{"Reason": err.Error()}) |
| 589 | return |
| 590 | } |
| 591 | port, _ := GetApplication().GetNniPort(igc.Device) |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 592 | _ = cntlr.GetController().AddFlows(cntx, port, igc.Device, flow) |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 593 | } |
| 594 | |
| 595 | // DelMcFlow deletes flow from the device when the last receiver leaves |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 596 | func (igc *IgmpGroupChannel) DelMcFlow(cntx context.Context) { |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 597 | flow, err := igc.BuildMcFlow() |
| 598 | if err != nil { |
| 599 | logger.Warnw(ctx, "MC Flow Build Failed", log.Fields{"Reason": err.Error()}) |
| 600 | return |
| 601 | } |
| 602 | flow.ForceAction = true |
| 603 | device := GetApplication().GetDevice(igc.Device) |
| 604 | |
| 605 | if mvpIntf, _ := GetApplication().MvlanProfilesByTag.Load(igc.Mvlan); mvpIntf != nil { |
| 606 | mvp := mvpIntf.(*MvlanProfile) |
Tinoj Joseph | 07cc537 | 2022-07-18 22:53:51 +0530 | [diff] [blame] | 607 | err := mvp.DelFlows(cntx, device, flow) |
Tinoj Joseph | cf161be | 2022-07-07 19:47:47 +0530 | [diff] [blame] | 608 | if err != nil { |
| 609 | logger.Warnw(ctx, "Delering IGMP Flow for device failed ", log.Fields{"Device": device, "err": err}) |
| 610 | } |
| 611 | } |
| 612 | } |
| 613 | |
// BuildMcFlow builds the multicast flow that AddMcFlow and DelMcFlow add to
// and delete from the device.
//
// The flow contains a single sub-flow that matches the channel's mvlan, IPv4
// and the group address as destination, and outputs to the channel's group
// ID. The sub-flow cookie is the group address (as an unsigned integer) in
// the upper 32 bits combined with the mvlan in the lower bits; the table
// metadata is the PON VLAN of the MVLAN profile. In the visible code the
// returned error is always nil.
func (igc *IgmpGroupChannel) BuildMcFlow() (*of.VoltFlow, error) {
	flow := &of.VoltFlow{}
	flow.SubFlows = make(map[uint64]*of.VoltSubFlow)
	//va := GetApplication()
	logger.Infow(ctx, "Building Mcast flow", log.Fields{"Mcast Group": igc.GroupAddr.String(), "Mvlan": igc.Mvlan.String()})
	uintGroupAddr := ipv4ToUint(igc.GroupAddr)
	subFlow := of.NewVoltSubFlow()
	subFlow.SetMatchVlan(igc.Mvlan)
	subFlow.SetIpv4Match()
	subFlow.SetMatchDstIpv4(igc.GroupAddr)
	mvp := GetApplication().GetMvlanProfileByTag(igc.Mvlan)
	// In-port matching on the NNI port is currently disabled:
	//nni, err := va.GetNniPort(igc.Device)
	//if err != nil {
	//	return nil, err
	//}
	//inport, err := va.GetPortID(nni)
	//if err != nil {
	//	return nil, err
	//}
	//subFlow.SetInPort(inport)
	subFlow.SetOutGroup(igc.GroupID)
	cookiePort := uintGroupAddr
	// Cookie layout: group address in the high 32 bits, mvlan in the low bits.
	subFlow.Cookie = uint64(cookiePort)<<32 | uint64(igc.Mvlan)
	subFlow.Priority = of.McFlowPriority
	metadata := uint64(mvp.PonVlan)
	subFlow.SetTableMetadata(metadata)

	// The sub-flow is indexed by its cookie.
	flow.SubFlows[subFlow.Cookie] = subFlow
	logger.Infow(ctx, "Built Mcast flow", log.Fields{"cookie": subFlow.Cookie, "subflow": subFlow})
	return flow, nil
}
| 646 | |
| 647 | // IgmpLeaveToServer sends IGMP leave to server. Called when the last receiver leaves the group |
| 648 | func (igc *IgmpGroupChannel) IgmpLeaveToServer() { |
| 649 | if leave, err := IgmpLeavePacket(igc.GroupAddr, igc.Mvlan, (*igc.proxyCfg).IgmpCos, **igc.IgmpProxyIP); err == nil { |
| 650 | nni, err1 := GetApplication().GetNniPort(igc.Device) |
| 651 | if err1 == nil { |
| 652 | _ = cntlr.GetController().PacketOutReq(igc.Device, nni, nni, leave, false) |
| 653 | } |
| 654 | } |
| 655 | } |
| 656 | |
| 657 | // SendLeaveToServer delete the group when the last receiver leaves the group |
| 658 | func (igc *IgmpGroupChannel) SendLeaveToServer() { |
| 659 | /** |
| 660 | +-------------------------------------------------------------------------+ |
| 661 | | IGMP version(towards BNG) Configured at VGC | |
| 662 | +-------------------------------+-----------------------------------------+ |
| 663 | | v2 | v3 | |
| 664 | +===================+==========+===============================+=========================================+ |
| 665 | | Received From RG | V2 Leave | Process and Send as V2 to BNG | Process, Convert to V3 and Send to BNG/ | |
| 666 | | | | | Process, Send as V2, if the BNG is V2 | |
| 667 | +===================+----------+-------------------------------+-----------------------------------------+ |
| 668 | | V3 Leave | Process and Send as V2 to BNG | Process, Send V3 to BNG | |
| 669 | | | | Process, Convert, Send as V2, if the | |
| 670 | | | | BNG is v2 | |
| 671 | +==========+===============================+=========================================+ |
| 672 | */ |
| 673 | // igc.Version: igmp version received from RG. |
| 674 | // igc.ServVersion: igmp version received from BNG or IgmpVerToServer present in proxy igmp conf. |
| 675 | |
| 676 | logger.Debugw(ctx, "Sending IGMP leave upstream", log.Fields{"Device": igc.Device}) |
| 677 | if *igc.ServVersion == IgmpVersion2 || getVersion((*igc.proxyCfg).IgmpVerToServer) == IgmpVersion2 { |
| 678 | igc.IgmpLeaveToServer() |
| 679 | } else { |
| 680 | igc.SendReport(false) |
| 681 | } |
| 682 | } |
| 683 | |
| 684 | // NumReceivers returns total number of receivers left on the group |
| 685 | func (igc *IgmpGroupChannel) NumReceivers() uint32 { |
| 686 | return uint32(len(igc.CurReceivers) + len(igc.NewReceivers)) |
| 687 | } |
| 688 | |
| 689 | // SendQuery sends query to the receivers for counting purpose |
| 690 | func (igc *IgmpGroupChannel) SendQuery() { |
| 691 | //var b []byte |
| 692 | //var err error |
| 693 | for portKey, port := range igc.NewReceivers { |
| 694 | igc.CurReceivers[portKey] = port |
| 695 | } |
| 696 | |
| 697 | igc.NewReceivers = make(map[string]*IgmpGroupPort) |
| 698 | |
| 699 | logger.Debugw(ctx, "Sending Query to receivers", log.Fields{"Receivers": igc.CurReceivers}) |
| 700 | for port, groupPort := range igc.CurReceivers { |
| 701 | if port == StaticPort { |
| 702 | continue |
| 703 | } |
| 704 | if queryPkt, err := igc.buildQuery(igc.GroupAddr, of.VlanType(groupPort.CVlan), groupPort.Pbit); err == nil { |
| 705 | _ = cntlr.GetController().PacketOutReq(igc.Device, port, port, queryPkt, false) |
| 706 | logger.Debugw(ctx, "Query Sent", log.Fields{"Device": igc.Device, "Port": port, "Packet": queryPkt}) |
| 707 | } else { |
| 708 | logger.Warnw(ctx, "Query Creation Failed", log.Fields{"Reason": err.Error()}) |
| 709 | } |
| 710 | } |
| 711 | |
| 712 | } |
| 713 | |
| 714 | // buildQuery to build query packet |
| 715 | func (igc *IgmpGroupChannel) buildQuery(groupAddr net.IP, cVlan of.VlanType, pbit uint8) ([]byte, error) { |
| 716 | if igc.Version == IgmpVersion2 { |
| 717 | return Igmpv2QueryPacket(igc.GroupAddr, cVlan, **igc.IgmpProxyIP, pbit, (*igc.proxyCfg).MaxResp) |
| 718 | } |
| 719 | return Igmpv3QueryPacket(igc.GroupAddr, cVlan, **igc.IgmpProxyIP, pbit, (*igc.proxyCfg).MaxResp) |
| 720 | } |
| 721 | |
| 722 | // ProcessMode process the received mode and updated the igp |
| 723 | func (igc *IgmpGroupChannel) ProcessMode(port string, incl bool) { |
| 724 | /* Update the mode in igp if the mode has changed */ |
| 725 | igp := igc.GetReceiver(port) |
| 726 | if igp.Exclude && incl { |
| 727 | igp.Exclude = !incl |
| 728 | if igc.Exclude > 0 { |
| 729 | igc.Exclude-- |
| 730 | } |
| 731 | } else if !incl && !igp.Exclude { |
| 732 | igp.Exclude = !incl |
| 733 | igc.Exclude++ |
| 734 | } |
| 735 | } |
| 736 | |