blob: 0bbfe6d1b2f08268ed8852b42741608616af7668 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
ke han81a38b92017-03-10 18:41:44 +080016package org.opencord.igmpproxy;
17
18import com.google.common.collect.Maps;
Esin Karamaneff10392019-06-27 18:09:13 +000019import com.google.common.collect.Sets;
Matteo Scandolo7482bbe2020-02-12 14:37:09 -080020import org.onosproject.net.Device;
21import org.opencord.sadis.BaseInformationService;
22import org.opencord.sadis.SadisService;
23import org.opencord.sadis.SubscriberAndDeviceInformation;
Carmelo Casconebef302e2019-11-14 19:58:20 -080024import org.osgi.service.component.annotations.Activate;
25import org.osgi.service.component.annotations.Component;
26import org.osgi.service.component.annotations.Deactivate;
27import org.osgi.service.component.annotations.Reference;
28import org.osgi.service.component.annotations.ReferenceCardinality;
ke han81a38b92017-03-10 18:41:44 +080029import org.onlab.packet.EthType;
30import org.onlab.packet.Ethernet;
31import org.onlab.packet.IGMP;
32import org.onlab.packet.IGMPGroup;
33import org.onlab.packet.IGMPMembership;
34import org.onlab.packet.IGMPQuery;
35import org.onlab.packet.IPv4;
36import org.onlab.packet.Ip4Address;
37import org.onlab.packet.IpAddress;
38import org.onlab.packet.VlanId;
39import org.onosproject.core.ApplicationId;
40import org.onosproject.core.CoreService;
ke han81a38b92017-03-10 18:41:44 +080041import org.onosproject.mastership.MastershipService;
42import org.onosproject.net.AnnotationKeys;
43import org.onosproject.net.ConnectPoint;
44import org.onosproject.net.DeviceId;
45import org.onosproject.net.Port;
46import org.onosproject.net.PortNumber;
47import org.onosproject.net.config.ConfigFactory;
48import org.onosproject.net.config.NetworkConfigEvent;
49import org.onosproject.net.config.NetworkConfigListener;
50import org.onosproject.net.config.NetworkConfigRegistry;
Jonathan Hart488e1142018-05-02 17:30:05 -070051import org.onosproject.net.config.basics.McastConfig;
ke han81a38b92017-03-10 18:41:44 +080052import org.onosproject.net.config.basics.SubjectFactories;
53import org.onosproject.net.device.DeviceEvent;
54import org.onosproject.net.device.DeviceListener;
55import org.onosproject.net.device.DeviceService;
56import org.onosproject.net.flow.DefaultTrafficTreatment;
57import org.onosproject.net.flow.FlowRuleService;
58import org.onosproject.net.flow.criteria.Criteria;
59import org.onosproject.net.flowobjective.DefaultFilteringObjective;
60import org.onosproject.net.flowobjective.FilteringObjective;
61import org.onosproject.net.flowobjective.FlowObjectiveService;
62import org.onosproject.net.flowobjective.Objective;
63import org.onosproject.net.flowobjective.ObjectiveContext;
64import org.onosproject.net.flowobjective.ObjectiveError;
Esin Karamaneff10392019-06-27 18:09:13 +000065import org.onosproject.mcast.api.McastRoute;
66import org.onosproject.mcast.api.MulticastRouteService;
ke han81a38b92017-03-10 18:41:44 +080067import org.onosproject.net.packet.InboundPacket;
68import org.onosproject.net.packet.PacketContext;
69import org.onosproject.net.packet.PacketProcessor;
70import org.onosproject.net.packet.PacketService;
ke han81a38b92017-03-10 18:41:44 +080071import org.slf4j.Logger;
72import org.slf4j.LoggerFactory;
73
74import java.util.ArrayList;
75import java.util.Collection;
Esin Karamaneff10392019-06-27 18:09:13 +000076import java.util.HashSet;
ke han81a38b92017-03-10 18:41:44 +080077import java.util.Iterator;
78import java.util.List;
79import java.util.Map;
Esin Karamaneff10392019-06-27 18:09:13 +000080import java.util.Optional;
ke han81a38b92017-03-10 18:41:44 +080081import java.util.Set;
82import java.util.TimerTask;
83import java.util.concurrent.ConcurrentHashMap;
Esin Karamanb38700c2019-09-17 13:01:25 +000084import java.util.concurrent.ExecutorService;
ke han81a38b92017-03-10 18:41:44 +080085import java.util.concurrent.Executors;
86import java.util.concurrent.ScheduledExecutorService;
87import java.util.concurrent.TimeUnit;
88
Esin Karamanb38700c2019-09-17 13:01:25 +000089import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
90import static org.onlab.util.Tools.groupedThreads;
91
/**
 * IGMP processing application. It works in proxy mode and supports first join / last leave,
 * fast leave, periodic query and keep-alive, and packet-out of IGMP messages to the uplink port.
 */
96@Component(immediate = true)
97public class IgmpManager {
98
Matteo Scandolo7482bbe2020-02-12 14:37:09 -080099 private static final String APP_NAME = "org.opencord.igmpproxy";
100
ke han81a38b92017-03-10 18:41:44 +0800101 private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
102 IgmpproxyConfig.class;
103 private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
104 IgmpproxySsmTranslateConfig.class;
105 private static final Class<McastConfig> MCAST_CONFIG_CLASS =
106 McastConfig.class;
Esin Karamaneff10392019-06-27 18:09:13 +0000107
ke han81a38b92017-03-10 18:41:44 +0800108 public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
109 private static ApplicationId appId;
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800110
ke han81a38b92017-03-10 18:41:44 +0800111 private static int unSolicitedTimeout = 3; // unit is 1 sec
112 private static int keepAliveCount = 3;
113 private static int lastQueryInterval = 2; //unit is 1 sec
114 private static int lastQueryCount = 2;
115 private static boolean fastLeave = true;
116 private static boolean withRAUplink = true;
117 private static boolean withRADownlink = false;
118 private static boolean periodicQuery = true;
119 private static short mvlan = 4000;
120 private static byte igmpCos = 7;
121 public static boolean connectPointMode = true;
122 public static ConnectPoint connectPoint = null;
Esin Karamaneff10392019-06-27 18:09:13 +0000123 private static ConnectPoint sourceDeviceAndPort = null;
124 private static boolean enableIgmpProvisioning = false;
Esin Karamanb38700c2019-09-17 13:01:25 +0000125 private static boolean igmpOnPodBasis = false;
Esin Karamaneff10392019-06-27 18:09:13 +0000126
127 private static final Integer MAX_PRIORITY = 10000;
128 private static final String INSTALLED = "installed";
129 private static final String REMOVED = "removed";
130 private static final String INSTALLATION = "installation";
131 private static final String REMOVAL = "removal";
ke han81a38b92017-03-10 18:41:44 +0800132
ke han29af27b2017-09-08 10:29:12 +0800133 private static boolean pimSSmInterworking = false;
134 private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
ke han81a38b92017-03-10 18:41:44 +0800135 private final ScheduledExecutorService scheduledExecutorService =
136 Executors.newScheduledThreadPool(1);
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800137
Carmelo Casconebef302e2019-11-14 19:58:20 -0800138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800139 protected CoreService coreService;
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800140
Carmelo Casconebef302e2019-11-14 19:58:20 -0800141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800142 protected PacketService packetService;
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800143
Carmelo Casconebef302e2019-11-14 19:58:20 -0800144 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800145 protected MastershipService mastershipService;
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800146
Carmelo Casconebef302e2019-11-14 19:58:20 -0800147 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800148 protected FlowRuleService flowRuleService;
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800149
Carmelo Casconebef302e2019-11-14 19:58:20 -0800150 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800151 protected DeviceService deviceService;
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800152
Carmelo Casconebef302e2019-11-14 19:58:20 -0800153 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800154 protected FlowObjectiveService flowObjectiveService;
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800155
Carmelo Casconebef302e2019-11-14 19:58:20 -0800156 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800157 protected NetworkConfigRegistry networkConfig;
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800158
Carmelo Casconebef302e2019-11-14 19:58:20 -0800159 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800160 protected MulticastRouteService multicastService;
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
163 protected SadisService sadisService;
164
ke han81a38b92017-03-10 18:41:44 +0800165 private IgmpPacketProcessor processor = new IgmpPacketProcessor();
166 private Logger log = LoggerFactory.getLogger(getClass());
167 private ApplicationId coreAppId;
168 private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
Esin Karamaneff10392019-06-27 18:09:13 +0000169
ke han81a38b92017-03-10 18:41:44 +0800170 private InternalNetworkConfigListener configListener =
171 new InternalNetworkConfigListener();
172 private DeviceListener deviceListener = new InternalDeviceListener();
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800173
ke han81a38b92017-03-10 18:41:44 +0800174 private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
175 new ConfigFactory<ApplicationId, IgmpproxyConfig>(
176 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
177 @Override
178 public IgmpproxyConfig createConfig() {
179 return new IgmpproxyConfig();
180 }
181 };
182 private ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
183 new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
184 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
185 @Override
186 public IgmpproxySsmTranslateConfig createConfig() {
187 return new IgmpproxySsmTranslateConfig();
188 }
189 };
Esin Karamaneff10392019-06-27 18:09:13 +0000190
ke han81a38b92017-03-10 18:41:44 +0800191 private int maxResp = 10; //unit is 1 sec
192 private int keepAliveInterval = 120; //unit is 1 sec
193
Esin Karamanb38700c2019-09-17 13:01:25 +0000194 private ExecutorService eventExecutor;
195
ke han81a38b92017-03-10 18:41:44 +0800196 public static int getUnsolicitedTimeout() {
197 return unSolicitedTimeout;
198 }
199
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800200 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
201 private static Map<DeviceId, SubscriberAndDeviceInformation> oltData = new ConcurrentHashMap<>();
202
203
ke han81a38b92017-03-10 18:41:44 +0800204 @Activate
205 protected void activate() {
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800206 appId = coreService.registerApplication(APP_NAME);
ke han81a38b92017-03-10 18:41:44 +0800207 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
208 packetService.addProcessor(processor, PacketProcessor.director(4));
209 IgmpSender.init(packetService, mastershipService);
210
ke han81a38b92017-03-10 18:41:44 +0800211 networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
212 networkConfig.registerConfigFactory(igmpproxyConfigFactory);
213 networkConfig.addListener(configListener);
214
215 configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
216 configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
217
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800218 subsService = sadisService.getSubscriberInfoService();
ke han81a38b92017-03-10 18:41:44 +0800219
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800220 networkConfig.getSubjects(DeviceId.class).forEach(subject -> {
221 SubscriberAndDeviceInformation olt = subsService.get(subject.toString());
222 if (olt != null) {
223 oltData.put(subject, olt);
224 }
225 });
226
ke han81a38b92017-03-10 18:41:44 +0800227 if (connectPointMode) {
228 provisionConnectPointFlows();
229 } else {
230 provisionUplinkFlows();
231 }
232
233 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
234 if (config != null) {
235 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530236 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800237 }
238 deviceService.addListener(deviceListener);
Sonal Kasliwalddc3ff22019-11-18 11:52:49 +0000239 scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
Esin Karamanb38700c2019-09-17 13:01:25 +0000240 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
241 "events-igmp-%d", log));
ke han81a38b92017-03-10 18:41:44 +0800242
243 log.info("Started");
244 }
245
246 @Deactivate
247 protected void deactivate() {
248 scheduledExecutorService.shutdown();
Esin Karamanb38700c2019-09-17 13:01:25 +0000249 eventExecutor.shutdown();
ke han81a38b92017-03-10 18:41:44 +0800250
251 // de-register and null our handler
252 networkConfig.removeListener(configListener);
ke han81a38b92017-03-10 18:41:44 +0800253 networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
254 networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
255 deviceService.removeListener(deviceListener);
256 packetService.removeProcessor(processor);
257 flowRuleService.removeFlowRulesById(appId);
258
259 log.info("Stopped");
260 }
261
262 protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
263 try {
264 String[] mgmtAddress = deviceService.getDevice(ofDeviceId)
265 .annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":");
266 return Ip4Address.valueOf(mgmtAddress[0]);
267 } catch (Exception ex) {
268 log.info("No valid Ipaddress for " + ofDeviceId.toString());
269 return null;
270 }
271 }
272
273 private void processIgmpQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
274
275 DeviceId deviceId = cp.deviceId();
276 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000277 maxResp = calculateMaxResp(maxResp);
278 if (gAddr != null && !gAddr.isZero()) {
279 StateMachine.specialQuery(deviceId, gAddr, maxResp);
280 } else {
281 StateMachine.generalQuery(deviceId, maxResp);
282 }
283 }
ke han81a38b92017-03-10 18:41:44 +0800284
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000285 private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
286
287 DeviceId deviceId = cp.deviceId();
288 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
289 maxResp = calculateMaxResp(maxResp);
290 //The query is received on the ConnectPoint
291 // send query accordingly to the registered OLT devices.
292 if (gAddr != null && !gAddr.isZero()) {
293 for (DeviceId devId : oltData.keySet()) {
294 StateMachine.specialQuery(devId, gAddr, maxResp);
295 }
296 } else {
297 //Don't know which group is targeted by the query
298 //So query all the members(in all the OLTs) and proxy their reports
299 StateMachine.generalQuery(maxResp);
300 }
301 }
302
303
304 private int calculateMaxResp(int maxResp) {
ke han81a38b92017-03-10 18:41:44 +0800305 if (maxResp >= 128) {
306 int mant = maxResp & 0xf;
307 int exp = (maxResp >> 4) & 0x7;
308 maxResp = (mant | 0x10) << (exp + 3);
309 }
310
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000311 return (maxResp + 5) / 10;
ke han81a38b92017-03-10 18:41:44 +0800312 }
313
314 private Ip4Address ssmTranslateRoute(IpAddress group) {
315 return ssmTranslateTable.get(group);
316 }
317
318 private void processIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
319 DeviceId deviceId = cp.deviceId();
320 PortNumber portNumber = cp.port();
321
322 Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
323 if (!groupIp.isMulticast()) {
324 log.info(groupIp.toString() + " is not a valid group address");
325 return;
326 }
327 Ip4Address srcIp = getDeviceIp(deviceId);
328
329 byte recordType = igmpGroup.getRecordType();
330 boolean join = false;
331
332 ArrayList<Ip4Address> sourceList = new ArrayList<>();
333
334 if (igmpGroup.getSources().size() > 0) {
335 igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
336 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
337 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
338 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
339 join = false;
340 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
341 recordType == IGMPMembership.MODE_IS_INCLUDE ||
342 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
343 join = true;
344 }
345 } else {
ke han29af27b2017-09-08 10:29:12 +0800346 IpAddress src = null;
347 if (pimSSmInterworking) {
348 src = ssmTranslateRoute(groupIp);
349 if (src == null) {
350 log.info("no ssm translate for group " + groupIp.toString());
351 return;
352 }
353 } else {
354 src = IpAddress.valueOf(DEFAULT_PIMSSM_HOST);
ke han81a38b92017-03-10 18:41:44 +0800355 }
356 sourceList.add(src.getIp4Address());
357 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
358 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
359 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
360 join = true;
361 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
362 recordType == IGMPMembership.MODE_IS_INCLUDE ||
363 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
364 join = false;
365 }
366 }
367 String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
368 GroupMember groupMember = groupMemberMap.get(groupMemberKey);
369
370 if (join) {
371 if (groupMember == null) {
372 if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
373 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
374 } else {
375 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
376 }
Esin Karamaneff10392019-06-27 18:09:13 +0000377
378 Optional<ConnectPoint> sourceConfigured = getSource();
379 if (!sourceConfigured.isPresent()) {
380 log.warn("Unable to process IGMP Join from {} since no source " +
381 "configuration is found.", deviceId);
382 return;
383 }
384 HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
385
ke han81a38b92017-03-10 18:41:44 +0800386 StateMachine.join(deviceId, groupIp, srcIp);
387 groupMemberMap.put(groupMemberKey, groupMember);
388 groupMember.updateList(recordType, sourceList);
Esin Karamaneff10392019-06-27 18:09:13 +0000389 groupMember.getSourceList().forEach(source -> {
390 McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
391 //add route
392 multicastService.add(route);
393 //add source to the route
394 multicastService.addSources(route, Sets.newHashSet(sourceConnectPoints));
395 //add sink to the route
396 multicastService.addSinks(route, Sets.newHashSet(cp));
397 });
398
ke han81a38b92017-03-10 18:41:44 +0800399 }
400 groupMember.resetAllTimers();
401 groupMember.updateList(recordType, sourceList);
402 groupMember.setLeave(false);
403 } else {
404 if (groupMember == null) {
405 log.info("receive leave but no instance, group " + groupIp.toString() +
406 " device:" + deviceId.toString() + " port:" + portNumber.toString());
407 return;
408 } else {
409 groupMember.setLeave(true);
410 if (fastLeave) {
411 leaveAction(groupMember);
412 } else {
413 sendQuery(groupMember);
414 }
415 }
416 }
417 }
418
419 private void leaveAction(GroupMember groupMember) {
420 ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
421 StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
Esin Karamaneff10392019-06-27 18:09:13 +0000422 groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
ke han81a38b92017-03-10 18:41:44 +0800423 new McastRoute(source, groupMember.getGroupIp(),
Esin Karamaneff10392019-06-27 18:09:13 +0000424 McastRoute.Type.IGMP), Sets.newHashSet(cp)));
ke han81a38b92017-03-10 18:41:44 +0800425 groupMemberMap.remove(groupMember.getId());
426 }
427
428 private void sendQuery(GroupMember groupMember) {
429 Ethernet ethpkt;
430 Ip4Address srcIp = getDeviceIp(groupMember.getDeviceId());
431 if (groupMember.getv2()) {
432 ethpkt = IgmpSender.getInstance().buildIgmpV2Query(groupMember.getGroupIp(), srcIp);
433 } else {
434 ethpkt = IgmpSender.getInstance().buildIgmpV3Query(groupMember.getGroupIp(), srcIp);
435 }
436 IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
437 }
438
Esin Karamaneff10392019-06-27 18:09:13 +0000439 /**
440 * @return connect point of the source if configured; and empty Optional otherwise.
441 */
442 public static Optional<ConnectPoint> getSource() {
443 return sourceDeviceAndPort == null ? Optional.empty() :
444 Optional.of(sourceDeviceAndPort);
ke han81a38b92017-03-10 18:41:44 +0800445 }
446
447 /**
448 * Packet processor responsible for forwarding packets along their paths.
449 */
450 private class IgmpPacketProcessor implements PacketProcessor {
451 @Override
452 public void process(PacketContext context) {
Esin Karamanb38700c2019-09-17 13:01:25 +0000453 eventExecutor.execute(() -> {
454 try {
455 InboundPacket pkt = context.inPacket();
456 Ethernet ethPkt = pkt.parsed();
457 if (ethPkt == null) {
458 return;
459 }
ke han81a38b92017-03-10 18:41:44 +0800460
Esin Karamanb38700c2019-09-17 13:01:25 +0000461 if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
462 return;
463 }
ke han81a38b92017-03-10 18:41:44 +0800464
Esin Karamanb38700c2019-09-17 13:01:25 +0000465 IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
ke han81a38b92017-03-10 18:41:44 +0800466
Esin Karamanb38700c2019-09-17 13:01:25 +0000467 if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
468 return;
469 }
ke han81a38b92017-03-10 18:41:44 +0800470
Esin Karamanb38700c2019-09-17 13:01:25 +0000471 short vlan = ethPkt.getVlanID();
472 DeviceId deviceId = pkt.receivedFrom().deviceId();
ke han81a38b92017-03-10 18:41:44 +0800473
Esin Karamanb38700c2019-09-17 13:01:25 +0000474 if (oltData.get(deviceId) == null &&
475 !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
476 log.error("Device not registered in netcfg :" + deviceId.toString());
477 return;
478 }
ke han81a38b92017-03-10 18:41:44 +0800479
Esin Karamanb38700c2019-09-17 13:01:25 +0000480 IGMP igmp = (IGMP) ipv4Pkt.getPayload();
481 switch (igmp.getIgmpType()) {
482 case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
483 //Discard Query from OLT’s non-uplink port’s
484 if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
485 if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
486 log.info("IGMP Picked up query from connectPoint");
487 //OK to process packet
488 processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
489 pkt.receivedFrom(),
490 0xff & igmp.getMaxRespField());
491 break;
492 } else {
493 //Not OK to process packet
494 log.warn("IGMP Picked up query from non-uplink port");
495 return;
496 }
497 }
ke han81a38b92017-03-10 18:41:44 +0800498
Esin Karamanb38700c2019-09-17 13:01:25 +0000499 processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
500 0xff & igmp.getMaxRespField());
501 break;
502 case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
503 log.debug("IGMP version 1 message types are not currently supported.");
504 break;
505 case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
506 case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
507 case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
508 //Discard join/leave from OLT’s uplink port’s
509 if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId)) ||
510 isConnectPoint(deviceId, pkt.receivedFrom().port())) {
511 log.info("IGMP Picked up join/leave from uplink/connectPoint port");
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000512 return;
513 }
ke han81a38b92017-03-10 18:41:44 +0800514
Esin Karamanb38700c2019-09-17 13:01:25 +0000515 Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
516 while (itr.hasNext()) {
517 IGMPGroup group = itr.next();
518 if (group instanceof IGMPMembership) {
519 processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
520 pkt.receivedFrom(), igmp.getIgmpType());
521 } else if (group instanceof IGMPQuery) {
522 IGMPMembership mgroup;
523 mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
524 mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
525 IGMPMembership.MODE_IS_EXCLUDE :
526 IGMPMembership.MODE_IS_INCLUDE);
527 processIgmpReport(mgroup, VlanId.vlanId(vlan),
528 pkt.receivedFrom(), igmp.getIgmpType());
529 }
ke han81a38b92017-03-10 18:41:44 +0800530 }
Esin Karamanb38700c2019-09-17 13:01:25 +0000531 break;
ke han81a38b92017-03-10 18:41:44 +0800532
Esin Karamanb38700c2019-09-17 13:01:25 +0000533 default:
534 log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
535 break;
536 }
537
538 } catch (Exception ex) {
539 log.error("igmp process error : {} ", ex);
540 ex.printStackTrace();
ke han81a38b92017-03-10 18:41:44 +0800541 }
Esin Karamanb38700c2019-09-17 13:01:25 +0000542 });
ke han81a38b92017-03-10 18:41:44 +0800543 }
544 }
545
546 private class IgmpProxyTimerTask extends TimerTask {
547 public void run() {
548 try {
549 IgmpTimer.timeOut1s();
550 queryMembers();
551 } catch (Exception ex) {
552 log.warn("Igmp timer task error : {}", ex.getMessage());
553 }
554 }
555
556 private void queryMembers() {
557 GroupMember groupMember;
558 Set groupMemberSet = groupMemberMap.entrySet();
559 Iterator itr = groupMemberSet.iterator();
560 while (itr.hasNext()) {
561 Map.Entry entry = (Map.Entry) itr.next();
562 groupMember = (GroupMember) entry.getValue();
563 DeviceId did = groupMember.getDeviceId();
564 if (mastershipService.isLocalMaster(did)) {
565 if (groupMember.isLeave()) {
566 lastQuery(groupMember);
567 } else if (periodicQuery) {
568 periodicQuery(groupMember);
569 }
570 }
571 }
572 }
573
574 private void lastQuery(GroupMember groupMember) {
575 if (groupMember.getLastQueryInterval() < lastQueryInterval) {
576 groupMember.lastQueryInterval(true); // count times
577 } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
578 sendQuery(groupMember);
579 groupMember.lastQueryInterval(false); // reset count number
580 groupMember.lastQueryCount(true); //count times
581 } else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
582 leaveAction(groupMember);
583 }
584 }
585
586 private void periodicQuery(GroupMember groupMember) {
587 if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
588 groupMember.keepAliveInterval(true);
589 } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
590 sendQuery(groupMember);
591 groupMember.keepAliveInterval(false);
592 groupMember.keepAliveQueryCount(true);
593 } else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
594 leaveAction(groupMember);
595 }
596 }
597
598 }
599
600 public static PortNumber getDeviceUplink(DeviceId devId) {
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000601 if (oltData.get(devId) != null) {
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800602 return PortNumber.portNumber(oltData.get(devId).uplinkPort());
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000603 } else {
604 return null;
605 }
ke han81a38b92017-03-10 18:41:44 +0800606 }
607
Esin Karamanb38700c2019-09-17 13:01:25 +0000608 public static boolean isIgmpOnPodBasis() {
609 return igmpOnPodBasis;
610 }
611
ke han81a38b92017-03-10 18:41:44 +0800612 private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
Esin Karamaneff10392019-06-27 18:09:13 +0000613 if (!enableIgmpProvisioning) {
614 log.debug("IGMP trap rules won't be installed since enableIgmpProvisioning flag is not set");
615 return;
616 }
ke han81a38b92017-03-10 18:41:44 +0800617 //TODO migrate to packet requests when packet service uses filtering objectives
618 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
619
620 builder = remove ? builder.deny() : builder.permit();
621
622 FilteringObjective igmp = builder
623 .withKey(Criteria.matchInPort(port))
624 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
625 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
626 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
627 .fromApp(appId)
Esin Karamaneff10392019-06-27 18:09:13 +0000628 .withPriority(MAX_PRIORITY)
ke han81a38b92017-03-10 18:41:44 +0800629 .add(new ObjectiveContext() {
630 @Override
631 public void onSuccess(Objective objective) {
Esin Karamaneff10392019-06-27 18:09:13 +0000632 log.info("Igmp filter for {} on {} {}.",
633 devId, port, (remove) ? REMOVED : INSTALLED);
ke han81a38b92017-03-10 18:41:44 +0800634 }
635
636 @Override
637 public void onError(Objective objective, ObjectiveError error) {
Esin Karamaneff10392019-06-27 18:09:13 +0000638 log.info("Igmp filter {} for device {} on port {} failed because of {}",
639 (remove) ? INSTALLATION : REMOVAL, devId, port,
640 error);
ke han81a38b92017-03-10 18:41:44 +0800641 }
642 });
643
644 flowObjectiveService.filter(devId, igmp);
Esin Karamaneff10392019-06-27 18:09:13 +0000645
ke han81a38b92017-03-10 18:41:44 +0800646 }
647
648 private boolean isConnectPoint(DeviceId device, PortNumber port) {
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530649 if (connectPoint != null) {
650 return (connectPointMode && connectPoint.deviceId().equals(device)
651 && connectPoint.port().equals(port));
652 } else {
653 log.info("connectPoint not configured for device {}", device);
654 return false;
655 }
ke han81a38b92017-03-10 18:41:44 +0800656 }
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530657
ke han81a38b92017-03-10 18:41:44 +0800658 private boolean isUplink(DeviceId device, PortNumber port) {
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800659 return ((!connectPointMode) && getDeviceUplink(device).equals(port));
ke han81a38b92017-03-10 18:41:44 +0800660 }
661
662 private class InternalDeviceListener implements DeviceListener {
663 @Override
664 public void event(DeviceEvent event) {
665 DeviceId devId = event.subject().id();
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000666 Port p = event.port();
667 if (oltData.get(devId) == null &&
668 !(p != null && isConnectPoint(devId, p.number()))) {
ke han81a38b92017-03-10 18:41:44 +0800669 return;
670 }
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000671 PortNumber port;
672
ke han81a38b92017-03-10 18:41:44 +0800673 switch (event.type()) {
674
675 case DEVICE_ADDED:
676 case DEVICE_UPDATED:
677 case DEVICE_REMOVED:
678 case DEVICE_SUSPENDED:
679 case DEVICE_AVAILABILITY_CHANGED:
680 case PORT_STATS_UPDATED:
681 break;
682 case PORT_ADDED:
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000683 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800684 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
685 processFilterObjective(devId, port, false);
686 } else if (isUplink(devId, port)) {
687 provisionUplinkFlows();
688 } else if (isConnectPoint(devId, port)) {
689 provisionConnectPointFlows();
690 }
691 break;
692 case PORT_UPDATED:
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000693 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800694 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
695 if (event.port().isEnabled()) {
696 processFilterObjective(devId, port, false);
697 } else {
698 processFilterObjective(devId, port, true);
699 }
700 } else if (isUplink(devId, port)) {
701 if (event.port().isEnabled()) {
702 provisionUplinkFlows(devId);
703 } else {
704 processFilterObjective(devId, port, true);
705 }
706 } else if (isConnectPoint(devId, port)) {
707 if (event.port().isEnabled()) {
708 provisionConnectPointFlows();
709 } else {
710 unprovisionConnectPointFlows();
711 }
712 }
713 break;
714 case PORT_REMOVED:
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000715 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800716 processFilterObjective(devId, port, true);
717 break;
718 default:
719 log.info("Unknown device event {}", event.type());
720 break;
721 }
722 }
723
724 @Override
725 public boolean isRelevant(DeviceEvent event) {
726 return true;
727 }
728 }
729
730 private class InternalNetworkConfigListener implements NetworkConfigListener {
731
732 private void reconfigureNetwork(IgmpproxyConfig cfg) {
Esin Karamaneff10392019-06-27 18:09:13 +0000733 IgmpproxyConfig newCfg = cfg == null ? new IgmpproxyConfig() : cfg;
ke han81a38b92017-03-10 18:41:44 +0800734
735 unSolicitedTimeout = newCfg.unsolicitedTimeOut();
736 maxResp = newCfg.maxResp();
737 keepAliveInterval = newCfg.keepAliveInterval();
738 keepAliveCount = newCfg.keepAliveCount();
739 lastQueryInterval = newCfg.lastQueryInterval();
740 lastQueryCount = newCfg.lastQueryCount();
741 withRAUplink = newCfg.withRAUplink();
742 withRADownlink = newCfg.withRADownlink();
743 igmpCos = newCfg.igmpCos();
744 periodicQuery = newCfg.periodicQuery();
745 fastLeave = newCfg.fastLeave();
ke han29af27b2017-09-08 10:29:12 +0800746 pimSSmInterworking = newCfg.pimSsmInterworking();
Esin Karamaneff10392019-06-27 18:09:13 +0000747 enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
Esin Karamanb38700c2019-09-17 13:01:25 +0000748 igmpOnPodBasis = newCfg.igmpOnPodBasis();
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000749
750 if (connectPointMode != newCfg.connectPointMode() ||
751 connectPoint != newCfg.connectPoint()) {
ke han81a38b92017-03-10 18:41:44 +0800752 connectPointMode = newCfg.connectPointMode();
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000753 connectPoint = newCfg.connectPoint();
ke han81a38b92017-03-10 18:41:44 +0800754 if (connectPointMode) {
755 unprovisionUplinkFlows();
756 provisionConnectPointFlows();
757 } else {
758 unprovisionConnectPointFlows();
759 provisionUplinkFlows();
760 }
761 }
762 if (connectPoint != null) {
Esin Karamaneff10392019-06-27 18:09:13 +0000763 log.info("connect point : {}", connectPoint);
ke han81a38b92017-03-10 18:41:44 +0800764 }
Esin Karamaneff10392019-06-27 18:09:13 +0000765 log.info("mode: {}", connectPointMode);
766
767 getSourceConnectPoint(newCfg);
ke han81a38b92017-03-10 18:41:44 +0800768
769 IgmpSender.getInstance().setIgmpCos(igmpCos);
770 IgmpSender.getInstance().setMaxResp(maxResp);
771 IgmpSender.getInstance().setMvlan(mvlan);
772 IgmpSender.getInstance().setWithRADownlink(withRADownlink);
773 IgmpSender.getInstance().setWithRAUplink(withRAUplink);
Esin Karamaneff10392019-06-27 18:09:13 +0000774 }
ke han81a38b92017-03-10 18:41:44 +0800775
Esin Karamaneff10392019-06-27 18:09:13 +0000776 void getSourceConnectPoint(IgmpproxyConfig cfg) {
777 sourceDeviceAndPort = cfg.getSourceDeviceAndPort();
778 if (sourceDeviceAndPort != null) {
779 log.debug("source parameter configured to {}", sourceDeviceAndPort);
780 }
ke han81a38b92017-03-10 18:41:44 +0800781 }
782
783 public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
784 if (cfg == null) {
785 return;
786 }
787 Collection<McastRoute> translations = cfg.getSsmTranslations();
788 for (McastRoute route : translations) {
Esin Karamaneff10392019-06-27 18:09:13 +0000789 ssmTranslateTable.put(route.group().getIp4Address(), route.source().get().getIp4Address());
ke han81a38b92017-03-10 18:41:44 +0800790 }
791 }
792
793 @Override
794 public void event(NetworkConfigEvent event) {
795 switch (event.type()) {
796 case CONFIG_ADDED:
797 case CONFIG_UPDATED:
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800798 // NOTE how to know if something has changed in sadis?
ke han81a38b92017-03-10 18:41:44 +0800799
800 if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
801 IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
802 if (config != null) {
Esin Karamaneff10392019-06-27 18:09:13 +0000803 log.info("igmpproxy config received. {}", config);
ke han81a38b92017-03-10 18:41:44 +0800804 reconfigureNetwork(config);
805 }
806 }
807
808 if (event.configClass().equals(IGMPPROXY_SSM_CONFIG_CLASS)) {
809 IgmpproxySsmTranslateConfig config = networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS);
810 if (config != null) {
811 reconfigureSsmTable(config);
812 }
813 }
814
815 if (event.configClass().equals(MCAST_CONFIG_CLASS)) {
816 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
817 if (config != null && mvlan != config.egressVlan().toShort()) {
818 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530819 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800820 groupMemberMap.values().forEach(m -> leaveAction(m));
821 }
822 }
823
824 log.info("Reconfigured");
825 break;
826 case CONFIG_REGISTERED:
827 case CONFIG_UNREGISTERED:
828 break;
829 case CONFIG_REMOVED:
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800830 // NOTE how to know if something has changed in sadis?
ke han81a38b92017-03-10 18:41:44 +0800831 default:
832 break;
833 }
834 }
835 }
836
837 private void provisionDefaultFlows(DeviceId deviceId) {
838 List<Port> ports = deviceService.getPorts(deviceId);
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800839
ke han81a38b92017-03-10 18:41:44 +0800840 ports.stream()
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800841 .filter(p -> (!getDeviceUplink(((Device) p.element()).id()).equals(p.number()) && p.isEnabled()))
ke han81a38b92017-03-10 18:41:44 +0800842 .forEach(p -> processFilterObjective((DeviceId) p.element().id(), p.number(), false));
843 }
844
845 private void provisionUplinkFlows(DeviceId deviceId) {
846 if (connectPointMode) {
847 return;
848 }
849
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800850 processFilterObjective(deviceId, getDeviceUplink(deviceId), false);
ke han81a38b92017-03-10 18:41:44 +0800851 }
852
853 private void provisionUplinkFlows() {
854 if (connectPointMode) {
855 return;
856 }
857
David K. Bainbridge4809c0e2017-08-17 09:54:40 -0700858 oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
ke han81a38b92017-03-10 18:41:44 +0800859 }
860 private void unprovisionUplinkFlows() {
861 oltData.keySet().forEach(deviceId ->
Matteo Scandolo7482bbe2020-02-12 14:37:09 -0800862 processFilterObjective(deviceId, getDeviceUplink(deviceId), true));
ke han81a38b92017-03-10 18:41:44 +0800863 }
864
865 private void provisionConnectPointFlows() {
866 if ((!connectPointMode) || connectPoint == null) {
867 return;
868 }
869
870 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
871 }
872 private void unprovisionConnectPointFlows() {
873 if (connectPoint == null) {
874 return;
875 }
876 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
877 }
878}