blob: 1d03e09c32849bdc232259abedf7fbf4b0b78882 [file] [log] [blame]
David K. Bainbridged77028f2017-08-01 12:47:55 -07001/*
Brian O'Connor4d084702017-08-03 22:45:58 -07002 * Copyright 2017-present Open Networking Foundation
David K. Bainbridged77028f2017-08-01 12:47:55 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
ke han81a38b92017-03-10 18:41:44 +080016package org.opencord.igmpproxy;
17
18import com.google.common.collect.Maps;
Esin Karamaneff10392019-06-27 18:09:13 +000019import com.google.common.collect.Sets;
Carmelo Casconebef302e2019-11-14 19:58:20 -080020import org.osgi.service.component.annotations.Activate;
21import org.osgi.service.component.annotations.Component;
22import org.osgi.service.component.annotations.Deactivate;
23import org.osgi.service.component.annotations.Reference;
24import org.osgi.service.component.annotations.ReferenceCardinality;
ke han81a38b92017-03-10 18:41:44 +080025import org.onlab.packet.EthType;
26import org.onlab.packet.Ethernet;
27import org.onlab.packet.IGMP;
28import org.onlab.packet.IGMPGroup;
29import org.onlab.packet.IGMPMembership;
30import org.onlab.packet.IGMPQuery;
31import org.onlab.packet.IPv4;
32import org.onlab.packet.Ip4Address;
33import org.onlab.packet.IpAddress;
34import org.onlab.packet.VlanId;
35import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
ke han81a38b92017-03-10 18:41:44 +080037import org.onosproject.mastership.MastershipService;
38import org.onosproject.net.AnnotationKeys;
39import org.onosproject.net.ConnectPoint;
40import org.onosproject.net.DeviceId;
41import org.onosproject.net.Port;
42import org.onosproject.net.PortNumber;
43import org.onosproject.net.config.ConfigFactory;
44import org.onosproject.net.config.NetworkConfigEvent;
45import org.onosproject.net.config.NetworkConfigListener;
46import org.onosproject.net.config.NetworkConfigRegistry;
Jonathan Hart488e1142018-05-02 17:30:05 -070047import org.onosproject.net.config.basics.McastConfig;
ke han81a38b92017-03-10 18:41:44 +080048import org.onosproject.net.config.basics.SubjectFactories;
49import org.onosproject.net.device.DeviceEvent;
50import org.onosproject.net.device.DeviceListener;
51import org.onosproject.net.device.DeviceService;
52import org.onosproject.net.flow.DefaultTrafficTreatment;
53import org.onosproject.net.flow.FlowRuleService;
54import org.onosproject.net.flow.criteria.Criteria;
55import org.onosproject.net.flowobjective.DefaultFilteringObjective;
56import org.onosproject.net.flowobjective.FilteringObjective;
57import org.onosproject.net.flowobjective.FlowObjectiveService;
58import org.onosproject.net.flowobjective.Objective;
59import org.onosproject.net.flowobjective.ObjectiveContext;
60import org.onosproject.net.flowobjective.ObjectiveError;
Esin Karamaneff10392019-06-27 18:09:13 +000061import org.onosproject.mcast.api.McastRoute;
62import org.onosproject.mcast.api.MulticastRouteService;
ke han81a38b92017-03-10 18:41:44 +080063import org.onosproject.net.packet.InboundPacket;
64import org.onosproject.net.packet.PacketContext;
65import org.onosproject.net.packet.PacketProcessor;
66import org.onosproject.net.packet.PacketService;
67import org.opencord.cordconfig.access.AccessDeviceConfig;
68import org.opencord.cordconfig.access.AccessDeviceData;
69import org.slf4j.Logger;
70import org.slf4j.LoggerFactory;
71
72import java.util.ArrayList;
73import java.util.Collection;
Esin Karamaneff10392019-06-27 18:09:13 +000074import java.util.HashSet;
ke han81a38b92017-03-10 18:41:44 +080075import java.util.Iterator;
76import java.util.List;
77import java.util.Map;
Esin Karamaneff10392019-06-27 18:09:13 +000078import java.util.Optional;
ke han81a38b92017-03-10 18:41:44 +080079import java.util.Set;
80import java.util.TimerTask;
81import java.util.concurrent.ConcurrentHashMap;
Esin Karamanb38700c2019-09-17 13:01:25 +000082import java.util.concurrent.ExecutorService;
ke han81a38b92017-03-10 18:41:44 +080083import java.util.concurrent.Executors;
84import java.util.concurrent.ScheduledExecutorService;
85import java.util.concurrent.TimeUnit;
86
Esin Karamanb38700c2019-09-17 13:01:25 +000087import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
88import static org.onlab.util.Tools.groupedThreads;
89
/**
 * IGMP proxy application. It works in proxy mode and supports first join / last leave,
 * fast leave, periodic query and keep-alive, and packet-out of IGMP messages to the uplink port.
 */
94@Component(immediate = true)
95public class IgmpManager {
96
97 private static final Class<AccessDeviceConfig> CONFIG_CLASS =
98 AccessDeviceConfig.class;
99 private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
100 IgmpproxyConfig.class;
101 private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
102 IgmpproxySsmTranslateConfig.class;
103 private static final Class<McastConfig> MCAST_CONFIG_CLASS =
104 McastConfig.class;
Esin Karamaneff10392019-06-27 18:09:13 +0000105
ke han81a38b92017-03-10 18:41:44 +0800106 public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
107 private static ApplicationId appId;
108 private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
109 private static int unSolicitedTimeout = 3; // unit is 1 sec
110 private static int keepAliveCount = 3;
111 private static int lastQueryInterval = 2; //unit is 1 sec
112 private static int lastQueryCount = 2;
113 private static boolean fastLeave = true;
114 private static boolean withRAUplink = true;
115 private static boolean withRADownlink = false;
116 private static boolean periodicQuery = true;
117 private static short mvlan = 4000;
118 private static byte igmpCos = 7;
119 public static boolean connectPointMode = true;
120 public static ConnectPoint connectPoint = null;
Esin Karamaneff10392019-06-27 18:09:13 +0000121 private static ConnectPoint sourceDeviceAndPort = null;
122 private static boolean enableIgmpProvisioning = false;
Esin Karamanb38700c2019-09-17 13:01:25 +0000123 private static boolean igmpOnPodBasis = false;
Esin Karamaneff10392019-06-27 18:09:13 +0000124
125 private static final Integer MAX_PRIORITY = 10000;
126 private static final String INSTALLED = "installed";
127 private static final String REMOVED = "removed";
128 private static final String INSTALLATION = "installation";
129 private static final String REMOVAL = "removal";
ke han81a38b92017-03-10 18:41:44 +0800130
ke han29af27b2017-09-08 10:29:12 +0800131 private static boolean pimSSmInterworking = false;
132 private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
ke han81a38b92017-03-10 18:41:44 +0800133 private final ScheduledExecutorService scheduledExecutorService =
134 Executors.newScheduledThreadPool(1);
Carmelo Casconebef302e2019-11-14 19:58:20 -0800135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800136 protected CoreService coreService;
Carmelo Casconebef302e2019-11-14 19:58:20 -0800137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800138 protected PacketService packetService;
Carmelo Casconebef302e2019-11-14 19:58:20 -0800139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800140 protected MastershipService mastershipService;
Carmelo Casconebef302e2019-11-14 19:58:20 -0800141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800142 protected FlowRuleService flowRuleService;
Carmelo Casconebef302e2019-11-14 19:58:20 -0800143 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800144 protected DeviceService deviceService;
Carmelo Casconebef302e2019-11-14 19:58:20 -0800145 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800146 protected FlowObjectiveService flowObjectiveService;
Carmelo Casconebef302e2019-11-14 19:58:20 -0800147 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800148 protected NetworkConfigRegistry networkConfig;
Carmelo Casconebef302e2019-11-14 19:58:20 -0800149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke han81a38b92017-03-10 18:41:44 +0800150 protected MulticastRouteService multicastService;
151 private IgmpPacketProcessor processor = new IgmpPacketProcessor();
152 private Logger log = LoggerFactory.getLogger(getClass());
153 private ApplicationId coreAppId;
154 private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
Esin Karamaneff10392019-06-27 18:09:13 +0000155
ke han81a38b92017-03-10 18:41:44 +0800156 private InternalNetworkConfigListener configListener =
157 new InternalNetworkConfigListener();
158 private DeviceListener deviceListener = new InternalDeviceListener();
159 private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory = null;
160 private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
161 new ConfigFactory<ApplicationId, IgmpproxyConfig>(
162 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
163 @Override
164 public IgmpproxyConfig createConfig() {
165 return new IgmpproxyConfig();
166 }
167 };
168 private ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
169 new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
170 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
171 @Override
172 public IgmpproxySsmTranslateConfig createConfig() {
173 return new IgmpproxySsmTranslateConfig();
174 }
175 };
Esin Karamaneff10392019-06-27 18:09:13 +0000176
ke han81a38b92017-03-10 18:41:44 +0800177 private int maxResp = 10; //unit is 1 sec
178 private int keepAliveInterval = 120; //unit is 1 sec
179
Esin Karamanb38700c2019-09-17 13:01:25 +0000180 private ExecutorService eventExecutor;
181
ke han81a38b92017-03-10 18:41:44 +0800182 public static int getUnsolicitedTimeout() {
183 return unSolicitedTimeout;
184 }
185
186 @Activate
187 protected void activate() {
188 appId = coreService.registerApplication("org.opencord.igmpproxy");
189 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
190 packetService.addProcessor(processor, PacketProcessor.director(4));
191 IgmpSender.init(packetService, mastershipService);
192
193 if (networkConfig.getConfigFactory(CONFIG_CLASS) == null) {
194 configFactory =
195 new ConfigFactory<DeviceId, AccessDeviceConfig>(
196 SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
197 @Override
198 public AccessDeviceConfig createConfig() {
199 return new AccessDeviceConfig();
200 }
201 };
202 networkConfig.registerConfigFactory(configFactory);
203 }
204 networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
205 networkConfig.registerConfigFactory(igmpproxyConfigFactory);
206 networkConfig.addListener(configListener);
207
208 configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
209 configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
210
211 networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
212 subject -> {
213 AccessDeviceConfig config = networkConfig.getConfig(subject,
214 AccessDeviceConfig.class);
215 if (config != null) {
216 AccessDeviceData data = config.getAccessDevice();
217 oltData.put(data.deviceId(), data);
218 }
219 }
220 );
221
David K. Bainbridge4809c0e2017-08-17 09:54:40 -0700222 oltData.keySet().forEach(d -> provisionDefaultFlows(d));
ke han81a38b92017-03-10 18:41:44 +0800223 if (connectPointMode) {
224 provisionConnectPointFlows();
225 } else {
226 provisionUplinkFlows();
227 }
228
229 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
230 if (config != null) {
231 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530232 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800233 }
234 deviceService.addListener(deviceListener);
235 scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 0, 1000, TimeUnit.MILLISECONDS);
Esin Karamanb38700c2019-09-17 13:01:25 +0000236 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
237 "events-igmp-%d", log));
ke han81a38b92017-03-10 18:41:44 +0800238
239 log.info("Started");
240 }
241
242 @Deactivate
243 protected void deactivate() {
244 scheduledExecutorService.shutdown();
Esin Karamanb38700c2019-09-17 13:01:25 +0000245 eventExecutor.shutdown();
ke han81a38b92017-03-10 18:41:44 +0800246
247 // de-register and null our handler
248 networkConfig.removeListener(configListener);
249 if (configFactory != null) {
250 networkConfig.unregisterConfigFactory(configFactory);
251 }
252 networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
253 networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
254 deviceService.removeListener(deviceListener);
255 packetService.removeProcessor(processor);
256 flowRuleService.removeFlowRulesById(appId);
257
258 log.info("Stopped");
259 }
260
261 protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
262 try {
263 String[] mgmtAddress = deviceService.getDevice(ofDeviceId)
264 .annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":");
265 return Ip4Address.valueOf(mgmtAddress[0]);
266 } catch (Exception ex) {
267 log.info("No valid Ipaddress for " + ofDeviceId.toString());
268 return null;
269 }
270 }
271
272 private void processIgmpQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
273
274 DeviceId deviceId = cp.deviceId();
275 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000276 maxResp = calculateMaxResp(maxResp);
277 if (gAddr != null && !gAddr.isZero()) {
278 StateMachine.specialQuery(deviceId, gAddr, maxResp);
279 } else {
280 StateMachine.generalQuery(deviceId, maxResp);
281 }
282 }
ke han81a38b92017-03-10 18:41:44 +0800283
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000284 private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
285
286 DeviceId deviceId = cp.deviceId();
287 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
288 maxResp = calculateMaxResp(maxResp);
289 //The query is received on the ConnectPoint
290 // send query accordingly to the registered OLT devices.
291 if (gAddr != null && !gAddr.isZero()) {
292 for (DeviceId devId : oltData.keySet()) {
293 StateMachine.specialQuery(devId, gAddr, maxResp);
294 }
295 } else {
296 //Don't know which group is targeted by the query
297 //So query all the members(in all the OLTs) and proxy their reports
298 StateMachine.generalQuery(maxResp);
299 }
300 }
301
302
303 private int calculateMaxResp(int maxResp) {
ke han81a38b92017-03-10 18:41:44 +0800304 if (maxResp >= 128) {
305 int mant = maxResp & 0xf;
306 int exp = (maxResp >> 4) & 0x7;
307 maxResp = (mant | 0x10) << (exp + 3);
308 }
309
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000310 return (maxResp + 5) / 10;
ke han81a38b92017-03-10 18:41:44 +0800311 }
312
313 private Ip4Address ssmTranslateRoute(IpAddress group) {
314 return ssmTranslateTable.get(group);
315 }
316
317 private void processIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
318 DeviceId deviceId = cp.deviceId();
319 PortNumber portNumber = cp.port();
320
321 Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
322 if (!groupIp.isMulticast()) {
323 log.info(groupIp.toString() + " is not a valid group address");
324 return;
325 }
326 Ip4Address srcIp = getDeviceIp(deviceId);
327
328 byte recordType = igmpGroup.getRecordType();
329 boolean join = false;
330
331 ArrayList<Ip4Address> sourceList = new ArrayList<>();
332
333 if (igmpGroup.getSources().size() > 0) {
334 igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
335 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
336 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
337 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
338 join = false;
339 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
340 recordType == IGMPMembership.MODE_IS_INCLUDE ||
341 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
342 join = true;
343 }
344 } else {
ke han29af27b2017-09-08 10:29:12 +0800345 IpAddress src = null;
346 if (pimSSmInterworking) {
347 src = ssmTranslateRoute(groupIp);
348 if (src == null) {
349 log.info("no ssm translate for group " + groupIp.toString());
350 return;
351 }
352 } else {
353 src = IpAddress.valueOf(DEFAULT_PIMSSM_HOST);
ke han81a38b92017-03-10 18:41:44 +0800354 }
355 sourceList.add(src.getIp4Address());
356 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
357 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
358 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
359 join = true;
360 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
361 recordType == IGMPMembership.MODE_IS_INCLUDE ||
362 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
363 join = false;
364 }
365 }
366 String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
367 GroupMember groupMember = groupMemberMap.get(groupMemberKey);
368
369 if (join) {
370 if (groupMember == null) {
371 if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
372 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
373 } else {
374 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
375 }
Esin Karamaneff10392019-06-27 18:09:13 +0000376
377 Optional<ConnectPoint> sourceConfigured = getSource();
378 if (!sourceConfigured.isPresent()) {
379 log.warn("Unable to process IGMP Join from {} since no source " +
380 "configuration is found.", deviceId);
381 return;
382 }
383 HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
384
ke han81a38b92017-03-10 18:41:44 +0800385 StateMachine.join(deviceId, groupIp, srcIp);
386 groupMemberMap.put(groupMemberKey, groupMember);
387 groupMember.updateList(recordType, sourceList);
Esin Karamaneff10392019-06-27 18:09:13 +0000388 groupMember.getSourceList().forEach(source -> {
389 McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
390 //add route
391 multicastService.add(route);
392 //add source to the route
393 multicastService.addSources(route, Sets.newHashSet(sourceConnectPoints));
394 //add sink to the route
395 multicastService.addSinks(route, Sets.newHashSet(cp));
396 });
397
ke han81a38b92017-03-10 18:41:44 +0800398 }
399 groupMember.resetAllTimers();
400 groupMember.updateList(recordType, sourceList);
401 groupMember.setLeave(false);
402 } else {
403 if (groupMember == null) {
404 log.info("receive leave but no instance, group " + groupIp.toString() +
405 " device:" + deviceId.toString() + " port:" + portNumber.toString());
406 return;
407 } else {
408 groupMember.setLeave(true);
409 if (fastLeave) {
410 leaveAction(groupMember);
411 } else {
412 sendQuery(groupMember);
413 }
414 }
415 }
416 }
417
418 private void leaveAction(GroupMember groupMember) {
419 ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
420 StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
Esin Karamaneff10392019-06-27 18:09:13 +0000421 groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
ke han81a38b92017-03-10 18:41:44 +0800422 new McastRoute(source, groupMember.getGroupIp(),
Esin Karamaneff10392019-06-27 18:09:13 +0000423 McastRoute.Type.IGMP), Sets.newHashSet(cp)));
ke han81a38b92017-03-10 18:41:44 +0800424 groupMemberMap.remove(groupMember.getId());
425 }
426
427 private void sendQuery(GroupMember groupMember) {
428 Ethernet ethpkt;
429 Ip4Address srcIp = getDeviceIp(groupMember.getDeviceId());
430 if (groupMember.getv2()) {
431 ethpkt = IgmpSender.getInstance().buildIgmpV2Query(groupMember.getGroupIp(), srcIp);
432 } else {
433 ethpkt = IgmpSender.getInstance().buildIgmpV3Query(groupMember.getGroupIp(), srcIp);
434 }
435 IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
436 }
437
Esin Karamaneff10392019-06-27 18:09:13 +0000438 /**
439 * @return connect point of the source if configured; and empty Optional otherwise.
440 */
441 public static Optional<ConnectPoint> getSource() {
442 return sourceDeviceAndPort == null ? Optional.empty() :
443 Optional.of(sourceDeviceAndPort);
ke han81a38b92017-03-10 18:41:44 +0800444 }
445
446 /**
447 * Packet processor responsible for forwarding packets along their paths.
448 */
449 private class IgmpPacketProcessor implements PacketProcessor {
450 @Override
451 public void process(PacketContext context) {
Esin Karamanb38700c2019-09-17 13:01:25 +0000452 eventExecutor.execute(() -> {
453 try {
454 InboundPacket pkt = context.inPacket();
455 Ethernet ethPkt = pkt.parsed();
456 if (ethPkt == null) {
457 return;
458 }
ke han81a38b92017-03-10 18:41:44 +0800459
Esin Karamanb38700c2019-09-17 13:01:25 +0000460 if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
461 return;
462 }
ke han81a38b92017-03-10 18:41:44 +0800463
Esin Karamanb38700c2019-09-17 13:01:25 +0000464 IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
ke han81a38b92017-03-10 18:41:44 +0800465
Esin Karamanb38700c2019-09-17 13:01:25 +0000466 if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
467 return;
468 }
ke han81a38b92017-03-10 18:41:44 +0800469
Esin Karamanb38700c2019-09-17 13:01:25 +0000470 short vlan = ethPkt.getVlanID();
471 DeviceId deviceId = pkt.receivedFrom().deviceId();
ke han81a38b92017-03-10 18:41:44 +0800472
Esin Karamanb38700c2019-09-17 13:01:25 +0000473 if (oltData.get(deviceId) == null &&
474 !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
475 log.error("Device not registered in netcfg :" + deviceId.toString());
476 return;
477 }
ke han81a38b92017-03-10 18:41:44 +0800478
Esin Karamanb38700c2019-09-17 13:01:25 +0000479 IGMP igmp = (IGMP) ipv4Pkt.getPayload();
480 switch (igmp.getIgmpType()) {
481 case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
482 //Discard Query from OLT’s non-uplink port’s
483 if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
484 if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
485 log.info("IGMP Picked up query from connectPoint");
486 //OK to process packet
487 processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
488 pkt.receivedFrom(),
489 0xff & igmp.getMaxRespField());
490 break;
491 } else {
492 //Not OK to process packet
493 log.warn("IGMP Picked up query from non-uplink port");
494 return;
495 }
496 }
ke han81a38b92017-03-10 18:41:44 +0800497
Esin Karamanb38700c2019-09-17 13:01:25 +0000498 processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
499 0xff & igmp.getMaxRespField());
500 break;
501 case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
502 log.debug("IGMP version 1 message types are not currently supported.");
503 break;
504 case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
505 case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
506 case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
507 //Discard join/leave from OLT’s uplink port’s
508 if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId)) ||
509 isConnectPoint(deviceId, pkt.receivedFrom().port())) {
510 log.info("IGMP Picked up join/leave from uplink/connectPoint port");
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000511 return;
512 }
ke han81a38b92017-03-10 18:41:44 +0800513
Esin Karamanb38700c2019-09-17 13:01:25 +0000514 Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
515 while (itr.hasNext()) {
516 IGMPGroup group = itr.next();
517 if (group instanceof IGMPMembership) {
518 processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
519 pkt.receivedFrom(), igmp.getIgmpType());
520 } else if (group instanceof IGMPQuery) {
521 IGMPMembership mgroup;
522 mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
523 mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
524 IGMPMembership.MODE_IS_EXCLUDE :
525 IGMPMembership.MODE_IS_INCLUDE);
526 processIgmpReport(mgroup, VlanId.vlanId(vlan),
527 pkt.receivedFrom(), igmp.getIgmpType());
528 }
ke han81a38b92017-03-10 18:41:44 +0800529 }
Esin Karamanb38700c2019-09-17 13:01:25 +0000530 break;
ke han81a38b92017-03-10 18:41:44 +0800531
Esin Karamanb38700c2019-09-17 13:01:25 +0000532 default:
533 log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
534 break;
535 }
536
537 } catch (Exception ex) {
538 log.error("igmp process error : {} ", ex);
539 ex.printStackTrace();
ke han81a38b92017-03-10 18:41:44 +0800540 }
Esin Karamanb38700c2019-09-17 13:01:25 +0000541 });
ke han81a38b92017-03-10 18:41:44 +0800542 }
543 }
544
545 private class IgmpProxyTimerTask extends TimerTask {
546 public void run() {
547 try {
548 IgmpTimer.timeOut1s();
549 queryMembers();
550 } catch (Exception ex) {
551 log.warn("Igmp timer task error : {}", ex.getMessage());
552 }
553 }
554
555 private void queryMembers() {
556 GroupMember groupMember;
557 Set groupMemberSet = groupMemberMap.entrySet();
558 Iterator itr = groupMemberSet.iterator();
559 while (itr.hasNext()) {
560 Map.Entry entry = (Map.Entry) itr.next();
561 groupMember = (GroupMember) entry.getValue();
562 DeviceId did = groupMember.getDeviceId();
563 if (mastershipService.isLocalMaster(did)) {
564 if (groupMember.isLeave()) {
565 lastQuery(groupMember);
566 } else if (periodicQuery) {
567 periodicQuery(groupMember);
568 }
569 }
570 }
571 }
572
573 private void lastQuery(GroupMember groupMember) {
574 if (groupMember.getLastQueryInterval() < lastQueryInterval) {
575 groupMember.lastQueryInterval(true); // count times
576 } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
577 sendQuery(groupMember);
578 groupMember.lastQueryInterval(false); // reset count number
579 groupMember.lastQueryCount(true); //count times
580 } else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
581 leaveAction(groupMember);
582 }
583 }
584
585 private void periodicQuery(GroupMember groupMember) {
586 if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
587 groupMember.keepAliveInterval(true);
588 } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
589 sendQuery(groupMember);
590 groupMember.keepAliveInterval(false);
591 groupMember.keepAliveQueryCount(true);
592 } else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
593 leaveAction(groupMember);
594 }
595 }
596
597 }
598
599 public static PortNumber getDeviceUplink(DeviceId devId) {
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000600 if (oltData.get(devId) != null) {
601 return oltData.get(devId).uplink();
602 } else {
603 return null;
604 }
ke han81a38b92017-03-10 18:41:44 +0800605 }
606
Esin Karamanb38700c2019-09-17 13:01:25 +0000607 public static boolean isIgmpOnPodBasis() {
608 return igmpOnPodBasis;
609 }
610
ke han81a38b92017-03-10 18:41:44 +0800611 private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
Esin Karamaneff10392019-06-27 18:09:13 +0000612 if (!enableIgmpProvisioning) {
613 log.debug("IGMP trap rules won't be installed since enableIgmpProvisioning flag is not set");
614 return;
615 }
ke han81a38b92017-03-10 18:41:44 +0800616 //TODO migrate to packet requests when packet service uses filtering objectives
617 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
618
619 builder = remove ? builder.deny() : builder.permit();
620
621 FilteringObjective igmp = builder
622 .withKey(Criteria.matchInPort(port))
623 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
624 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
625 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
626 .fromApp(appId)
Esin Karamaneff10392019-06-27 18:09:13 +0000627 .withPriority(MAX_PRIORITY)
ke han81a38b92017-03-10 18:41:44 +0800628 .add(new ObjectiveContext() {
629 @Override
630 public void onSuccess(Objective objective) {
Esin Karamaneff10392019-06-27 18:09:13 +0000631 log.info("Igmp filter for {} on {} {}.",
632 devId, port, (remove) ? REMOVED : INSTALLED);
ke han81a38b92017-03-10 18:41:44 +0800633 }
634
635 @Override
636 public void onError(Objective objective, ObjectiveError error) {
Esin Karamaneff10392019-06-27 18:09:13 +0000637 log.info("Igmp filter {} for device {} on port {} failed because of {}",
638 (remove) ? INSTALLATION : REMOVAL, devId, port,
639 error);
ke han81a38b92017-03-10 18:41:44 +0800640 }
641 });
642
643 flowObjectiveService.filter(devId, igmp);
Esin Karamaneff10392019-06-27 18:09:13 +0000644
ke han81a38b92017-03-10 18:41:44 +0800645 }
646
647 private boolean isConnectPoint(DeviceId device, PortNumber port) {
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530648 if (connectPoint != null) {
649 return (connectPointMode && connectPoint.deviceId().equals(device)
650 && connectPoint.port().equals(port));
651 } else {
652 log.info("connectPoint not configured for device {}", device);
653 return false;
654 }
ke han81a38b92017-03-10 18:41:44 +0800655 }
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530656
ke han81a38b92017-03-10 18:41:44 +0800657 private boolean isUplink(DeviceId device, PortNumber port) {
658 return ((!connectPointMode) && oltData.containsKey(device)
659 && oltData.get(device).uplink().equals(port));
660 }
661
662 private class InternalDeviceListener implements DeviceListener {
663 @Override
664 public void event(DeviceEvent event) {
665 DeviceId devId = event.subject().id();
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000666 Port p = event.port();
667 if (oltData.get(devId) == null &&
668 !(p != null && isConnectPoint(devId, p.number()))) {
ke han81a38b92017-03-10 18:41:44 +0800669 return;
670 }
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000671 PortNumber port;
672
ke han81a38b92017-03-10 18:41:44 +0800673 switch (event.type()) {
674
675 case DEVICE_ADDED:
676 case DEVICE_UPDATED:
677 case DEVICE_REMOVED:
678 case DEVICE_SUSPENDED:
679 case DEVICE_AVAILABILITY_CHANGED:
680 case PORT_STATS_UPDATED:
681 break;
682 case PORT_ADDED:
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000683 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800684 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
685 processFilterObjective(devId, port, false);
686 } else if (isUplink(devId, port)) {
687 provisionUplinkFlows();
688 } else if (isConnectPoint(devId, port)) {
689 provisionConnectPointFlows();
690 }
691 break;
692 case PORT_UPDATED:
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000693 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800694 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
695 if (event.port().isEnabled()) {
696 processFilterObjective(devId, port, false);
697 } else {
698 processFilterObjective(devId, port, true);
699 }
700 } else if (isUplink(devId, port)) {
701 if (event.port().isEnabled()) {
702 provisionUplinkFlows(devId);
703 } else {
704 processFilterObjective(devId, port, true);
705 }
706 } else if (isConnectPoint(devId, port)) {
707 if (event.port().isEnabled()) {
708 provisionConnectPointFlows();
709 } else {
710 unprovisionConnectPointFlows();
711 }
712 }
713 break;
714 case PORT_REMOVED:
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000715 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800716 processFilterObjective(devId, port, true);
717 break;
718 default:
719 log.info("Unknown device event {}", event.type());
720 break;
721 }
722 }
723
724 @Override
725 public boolean isRelevant(DeviceEvent event) {
726 return true;
727 }
728 }
729
730 private class InternalNetworkConfigListener implements NetworkConfigListener {
731
732 private void reconfigureNetwork(IgmpproxyConfig cfg) {
Esin Karamaneff10392019-06-27 18:09:13 +0000733 IgmpproxyConfig newCfg = cfg == null ? new IgmpproxyConfig() : cfg;
ke han81a38b92017-03-10 18:41:44 +0800734
735 unSolicitedTimeout = newCfg.unsolicitedTimeOut();
736 maxResp = newCfg.maxResp();
737 keepAliveInterval = newCfg.keepAliveInterval();
738 keepAliveCount = newCfg.keepAliveCount();
739 lastQueryInterval = newCfg.lastQueryInterval();
740 lastQueryCount = newCfg.lastQueryCount();
741 withRAUplink = newCfg.withRAUplink();
742 withRADownlink = newCfg.withRADownlink();
743 igmpCos = newCfg.igmpCos();
744 periodicQuery = newCfg.periodicQuery();
745 fastLeave = newCfg.fastLeave();
ke han29af27b2017-09-08 10:29:12 +0800746 pimSSmInterworking = newCfg.pimSsmInterworking();
Esin Karamaneff10392019-06-27 18:09:13 +0000747 enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
Esin Karamanb38700c2019-09-17 13:01:25 +0000748 igmpOnPodBasis = newCfg.igmpOnPodBasis();
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000749
750 if (connectPointMode != newCfg.connectPointMode() ||
751 connectPoint != newCfg.connectPoint()) {
ke han81a38b92017-03-10 18:41:44 +0800752 connectPointMode = newCfg.connectPointMode();
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000753 connectPoint = newCfg.connectPoint();
ke han81a38b92017-03-10 18:41:44 +0800754 if (connectPointMode) {
755 unprovisionUplinkFlows();
756 provisionConnectPointFlows();
757 } else {
758 unprovisionConnectPointFlows();
759 provisionUplinkFlows();
760 }
761 }
762 if (connectPoint != null) {
Esin Karamaneff10392019-06-27 18:09:13 +0000763 log.info("connect point : {}", connectPoint);
ke han81a38b92017-03-10 18:41:44 +0800764 }
Esin Karamaneff10392019-06-27 18:09:13 +0000765 log.info("mode: {}", connectPointMode);
766
767 getSourceConnectPoint(newCfg);
ke han81a38b92017-03-10 18:41:44 +0800768
769 IgmpSender.getInstance().setIgmpCos(igmpCos);
770 IgmpSender.getInstance().setMaxResp(maxResp);
771 IgmpSender.getInstance().setMvlan(mvlan);
772 IgmpSender.getInstance().setWithRADownlink(withRADownlink);
773 IgmpSender.getInstance().setWithRAUplink(withRAUplink);
Esin Karamaneff10392019-06-27 18:09:13 +0000774 }
ke han81a38b92017-03-10 18:41:44 +0800775
Esin Karamaneff10392019-06-27 18:09:13 +0000776 void getSourceConnectPoint(IgmpproxyConfig cfg) {
777 sourceDeviceAndPort = cfg.getSourceDeviceAndPort();
778 if (sourceDeviceAndPort != null) {
779 log.debug("source parameter configured to {}", sourceDeviceAndPort);
780 }
ke han81a38b92017-03-10 18:41:44 +0800781 }
782
783 public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
784 if (cfg == null) {
785 return;
786 }
787 Collection<McastRoute> translations = cfg.getSsmTranslations();
788 for (McastRoute route : translations) {
Esin Karamaneff10392019-06-27 18:09:13 +0000789 ssmTranslateTable.put(route.group().getIp4Address(), route.source().get().getIp4Address());
ke han81a38b92017-03-10 18:41:44 +0800790 }
791 }
792
793 @Override
794 public void event(NetworkConfigEvent event) {
795 switch (event.type()) {
796 case CONFIG_ADDED:
797 case CONFIG_UPDATED:
798 if (event.configClass().equals(CONFIG_CLASS)) {
799 AccessDeviceConfig config =
800 networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
801 if (config != null) {
802 oltData.put(config.getAccessDevice().deviceId(), config.getAccessDevice());
803 provisionDefaultFlows((DeviceId) event.subject());
804 provisionUplinkFlows((DeviceId) event.subject());
805 }
806 }
807
808 if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
809 IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
810 if (config != null) {
Esin Karamaneff10392019-06-27 18:09:13 +0000811 log.info("igmpproxy config received. {}", config);
ke han81a38b92017-03-10 18:41:44 +0800812 reconfigureNetwork(config);
813 }
814 }
815
816 if (event.configClass().equals(IGMPPROXY_SSM_CONFIG_CLASS)) {
817 IgmpproxySsmTranslateConfig config = networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS);
818 if (config != null) {
819 reconfigureSsmTable(config);
820 }
821 }
822
823 if (event.configClass().equals(MCAST_CONFIG_CLASS)) {
824 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
825 if (config != null && mvlan != config.egressVlan().toShort()) {
826 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530827 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800828 groupMemberMap.values().forEach(m -> leaveAction(m));
829 }
830 }
831
832 log.info("Reconfigured");
833 break;
834 case CONFIG_REGISTERED:
835 case CONFIG_UNREGISTERED:
836 break;
837 case CONFIG_REMOVED:
838 if (event.configClass().equals(CONFIG_CLASS)) {
839 oltData.remove(event.subject());
840 }
841
842 default:
843 break;
844 }
845 }
846 }
847
848 private void provisionDefaultFlows(DeviceId deviceId) {
849 List<Port> ports = deviceService.getPorts(deviceId);
850 ports.stream()
851 .filter(p -> (!oltData.get(p.element().id()).uplink().equals(p.number()) && p.isEnabled()))
852 .forEach(p -> processFilterObjective((DeviceId) p.element().id(), p.number(), false));
853 }
854
855 private void provisionUplinkFlows(DeviceId deviceId) {
856 if (connectPointMode) {
857 return;
858 }
859
860 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), false);
861 }
862
863 private void provisionUplinkFlows() {
864 if (connectPointMode) {
865 return;
866 }
867
David K. Bainbridge4809c0e2017-08-17 09:54:40 -0700868 oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
ke han81a38b92017-03-10 18:41:44 +0800869 }
870 private void unprovisionUplinkFlows() {
871 oltData.keySet().forEach(deviceId ->
872 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), true));
873 }
874
875 private void provisionConnectPointFlows() {
876 if ((!connectPointMode) || connectPoint == null) {
877 return;
878 }
879
880 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
881 }
882 private void unprovisionConnectPointFlows() {
883 if (connectPoint == null) {
884 return;
885 }
886 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
887 }
888}