blob: 40d390e4568a68e3aea3359d6cc1b6b705956da7 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
ke han81a38b92017-03-10 18:41:44 +080016package org.opencord.igmpproxy;
17
18import com.google.common.collect.Maps;
Esin Karamaneff10392019-06-27 18:09:13 +000019import com.google.common.collect.Sets;
ke han81a38b92017-03-10 18:41:44 +080020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.onlab.packet.EthType;
26import org.onlab.packet.Ethernet;
27import org.onlab.packet.IGMP;
28import org.onlab.packet.IGMPGroup;
29import org.onlab.packet.IGMPMembership;
30import org.onlab.packet.IGMPQuery;
31import org.onlab.packet.IPv4;
32import org.onlab.packet.Ip4Address;
33import org.onlab.packet.IpAddress;
34import org.onlab.packet.VlanId;
35import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
ke han81a38b92017-03-10 18:41:44 +080037import org.onosproject.mastership.MastershipService;
38import org.onosproject.net.AnnotationKeys;
39import org.onosproject.net.ConnectPoint;
40import org.onosproject.net.DeviceId;
41import org.onosproject.net.Port;
42import org.onosproject.net.PortNumber;
43import org.onosproject.net.config.ConfigFactory;
44import org.onosproject.net.config.NetworkConfigEvent;
45import org.onosproject.net.config.NetworkConfigListener;
46import org.onosproject.net.config.NetworkConfigRegistry;
Jonathan Hart488e1142018-05-02 17:30:05 -070047import org.onosproject.net.config.basics.McastConfig;
ke han81a38b92017-03-10 18:41:44 +080048import org.onosproject.net.config.basics.SubjectFactories;
49import org.onosproject.net.device.DeviceEvent;
50import org.onosproject.net.device.DeviceListener;
51import org.onosproject.net.device.DeviceService;
52import org.onosproject.net.flow.DefaultTrafficTreatment;
53import org.onosproject.net.flow.FlowRuleService;
54import org.onosproject.net.flow.criteria.Criteria;
55import org.onosproject.net.flowobjective.DefaultFilteringObjective;
56import org.onosproject.net.flowobjective.FilteringObjective;
57import org.onosproject.net.flowobjective.FlowObjectiveService;
58import org.onosproject.net.flowobjective.Objective;
59import org.onosproject.net.flowobjective.ObjectiveContext;
60import org.onosproject.net.flowobjective.ObjectiveError;
Esin Karamaneff10392019-06-27 18:09:13 +000061import org.onosproject.mcast.api.McastRoute;
62import org.onosproject.mcast.api.MulticastRouteService;
ke han81a38b92017-03-10 18:41:44 +080063import org.onosproject.net.packet.InboundPacket;
64import org.onosproject.net.packet.PacketContext;
65import org.onosproject.net.packet.PacketProcessor;
66import org.onosproject.net.packet.PacketService;
67import org.opencord.cordconfig.access.AccessDeviceConfig;
68import org.opencord.cordconfig.access.AccessDeviceData;
69import org.slf4j.Logger;
70import org.slf4j.LoggerFactory;
71
72import java.util.ArrayList;
73import java.util.Collection;
Esin Karamaneff10392019-06-27 18:09:13 +000074import java.util.HashSet;
ke han81a38b92017-03-10 18:41:44 +080075import java.util.Iterator;
76import java.util.List;
77import java.util.Map;
Esin Karamaneff10392019-06-27 18:09:13 +000078import java.util.Optional;
ke han81a38b92017-03-10 18:41:44 +080079import java.util.Set;
80import java.util.TimerTask;
81import java.util.concurrent.ConcurrentHashMap;
82import java.util.concurrent.Executors;
83import java.util.concurrent.ScheduledExecutorService;
84import java.util.concurrent.TimeUnit;
85
/**
 * IGMP proxy application. It operates in proxy mode and supports first join / last leave,
 * fast leave, periodic query and keep-alive, and packet-out of IGMP messages to the uplink port.
 */
90@Component(immediate = true)
91public class IgmpManager {
92
93 private static final Class<AccessDeviceConfig> CONFIG_CLASS =
94 AccessDeviceConfig.class;
95 private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
96 IgmpproxyConfig.class;
97 private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
98 IgmpproxySsmTranslateConfig.class;
99 private static final Class<McastConfig> MCAST_CONFIG_CLASS =
100 McastConfig.class;
Esin Karamaneff10392019-06-27 18:09:13 +0000101
ke han81a38b92017-03-10 18:41:44 +0800102 public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
103 private static ApplicationId appId;
104 private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
105 private static int unSolicitedTimeout = 3; // unit is 1 sec
106 private static int keepAliveCount = 3;
107 private static int lastQueryInterval = 2; //unit is 1 sec
108 private static int lastQueryCount = 2;
109 private static boolean fastLeave = true;
110 private static boolean withRAUplink = true;
111 private static boolean withRADownlink = false;
112 private static boolean periodicQuery = true;
113 private static short mvlan = 4000;
114 private static byte igmpCos = 7;
115 public static boolean connectPointMode = true;
116 public static ConnectPoint connectPoint = null;
Esin Karamaneff10392019-06-27 18:09:13 +0000117 private static ConnectPoint sourceDeviceAndPort = null;
118 private static boolean enableIgmpProvisioning = false;
119
120 private static final Integer MAX_PRIORITY = 10000;
121 private static final String INSTALLED = "installed";
122 private static final String REMOVED = "removed";
123 private static final String INSTALLATION = "installation";
124 private static final String REMOVAL = "removal";
ke han81a38b92017-03-10 18:41:44 +0800125
ke han29af27b2017-09-08 10:29:12 +0800126 private static boolean pimSSmInterworking = false;
127 private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
ke han81a38b92017-03-10 18:41:44 +0800128 private final ScheduledExecutorService scheduledExecutorService =
129 Executors.newScheduledThreadPool(1);
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected CoreService coreService;
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected PacketService packetService;
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected MastershipService mastershipService;
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected FlowRuleService flowRuleService;
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected DeviceService deviceService;
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected FlowObjectiveService flowObjectiveService;
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 protected NetworkConfigRegistry networkConfig;
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected MulticastRouteService multicastService;
146 private IgmpPacketProcessor processor = new IgmpPacketProcessor();
147 private Logger log = LoggerFactory.getLogger(getClass());
148 private ApplicationId coreAppId;
149 private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
Esin Karamaneff10392019-06-27 18:09:13 +0000150
ke han81a38b92017-03-10 18:41:44 +0800151 private InternalNetworkConfigListener configListener =
152 new InternalNetworkConfigListener();
153 private DeviceListener deviceListener = new InternalDeviceListener();
154 private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory = null;
155 private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
156 new ConfigFactory<ApplicationId, IgmpproxyConfig>(
157 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
158 @Override
159 public IgmpproxyConfig createConfig() {
160 return new IgmpproxyConfig();
161 }
162 };
163 private ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
164 new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
165 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
166 @Override
167 public IgmpproxySsmTranslateConfig createConfig() {
168 return new IgmpproxySsmTranslateConfig();
169 }
170 };
Esin Karamaneff10392019-06-27 18:09:13 +0000171
ke han81a38b92017-03-10 18:41:44 +0800172 private int maxResp = 10; //unit is 1 sec
173 private int keepAliveInterval = 120; //unit is 1 sec
174
175 public static int getUnsolicitedTimeout() {
176 return unSolicitedTimeout;
177 }
178
179 @Activate
180 protected void activate() {
181 appId = coreService.registerApplication("org.opencord.igmpproxy");
182 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
183 packetService.addProcessor(processor, PacketProcessor.director(4));
184 IgmpSender.init(packetService, mastershipService);
185
186 if (networkConfig.getConfigFactory(CONFIG_CLASS) == null) {
187 configFactory =
188 new ConfigFactory<DeviceId, AccessDeviceConfig>(
189 SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
190 @Override
191 public AccessDeviceConfig createConfig() {
192 return new AccessDeviceConfig();
193 }
194 };
195 networkConfig.registerConfigFactory(configFactory);
196 }
197 networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
198 networkConfig.registerConfigFactory(igmpproxyConfigFactory);
199 networkConfig.addListener(configListener);
200
201 configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
202 configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
203
204 networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
205 subject -> {
206 AccessDeviceConfig config = networkConfig.getConfig(subject,
207 AccessDeviceConfig.class);
208 if (config != null) {
209 AccessDeviceData data = config.getAccessDevice();
210 oltData.put(data.deviceId(), data);
211 }
212 }
213 );
214
David K. Bainbridge4809c0e2017-08-17 09:54:40 -0700215 oltData.keySet().forEach(d -> provisionDefaultFlows(d));
ke han81a38b92017-03-10 18:41:44 +0800216 if (connectPointMode) {
217 provisionConnectPointFlows();
218 } else {
219 provisionUplinkFlows();
220 }
221
222 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
223 if (config != null) {
224 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530225 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800226 }
227 deviceService.addListener(deviceListener);
228 scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 0, 1000, TimeUnit.MILLISECONDS);
229
230 log.info("Started");
231 }
232
233 @Deactivate
234 protected void deactivate() {
235 scheduledExecutorService.shutdown();
236
237 // de-register and null our handler
238 networkConfig.removeListener(configListener);
239 if (configFactory != null) {
240 networkConfig.unregisterConfigFactory(configFactory);
241 }
242 networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
243 networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
244 deviceService.removeListener(deviceListener);
245 packetService.removeProcessor(processor);
246 flowRuleService.removeFlowRulesById(appId);
247
248 log.info("Stopped");
249 }
250
251 protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
252 try {
253 String[] mgmtAddress = deviceService.getDevice(ofDeviceId)
254 .annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":");
255 return Ip4Address.valueOf(mgmtAddress[0]);
256 } catch (Exception ex) {
257 log.info("No valid Ipaddress for " + ofDeviceId.toString());
258 return null;
259 }
260 }
261
262 private void processIgmpQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
263
264 DeviceId deviceId = cp.deviceId();
265 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000266 maxResp = calculateMaxResp(maxResp);
267 if (gAddr != null && !gAddr.isZero()) {
268 StateMachine.specialQuery(deviceId, gAddr, maxResp);
269 } else {
270 StateMachine.generalQuery(deviceId, maxResp);
271 }
272 }
ke han81a38b92017-03-10 18:41:44 +0800273
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000274 private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
275
276 DeviceId deviceId = cp.deviceId();
277 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
278 maxResp = calculateMaxResp(maxResp);
279 //The query is received on the ConnectPoint
280 // send query accordingly to the registered OLT devices.
281 if (gAddr != null && !gAddr.isZero()) {
282 for (DeviceId devId : oltData.keySet()) {
283 StateMachine.specialQuery(devId, gAddr, maxResp);
284 }
285 } else {
286 //Don't know which group is targeted by the query
287 //So query all the members(in all the OLTs) and proxy their reports
288 StateMachine.generalQuery(maxResp);
289 }
290 }
291
292
293 private int calculateMaxResp(int maxResp) {
ke han81a38b92017-03-10 18:41:44 +0800294 if (maxResp >= 128) {
295 int mant = maxResp & 0xf;
296 int exp = (maxResp >> 4) & 0x7;
297 maxResp = (mant | 0x10) << (exp + 3);
298 }
299
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000300 return (maxResp + 5) / 10;
ke han81a38b92017-03-10 18:41:44 +0800301 }
302
303 private Ip4Address ssmTranslateRoute(IpAddress group) {
304 return ssmTranslateTable.get(group);
305 }
306
307 private void processIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
308 DeviceId deviceId = cp.deviceId();
309 PortNumber portNumber = cp.port();
310
311 Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
312 if (!groupIp.isMulticast()) {
313 log.info(groupIp.toString() + " is not a valid group address");
314 return;
315 }
316 Ip4Address srcIp = getDeviceIp(deviceId);
317
318 byte recordType = igmpGroup.getRecordType();
319 boolean join = false;
320
321 ArrayList<Ip4Address> sourceList = new ArrayList<>();
322
323 if (igmpGroup.getSources().size() > 0) {
324 igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
325 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
326 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
327 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
328 join = false;
329 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
330 recordType == IGMPMembership.MODE_IS_INCLUDE ||
331 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
332 join = true;
333 }
334 } else {
ke han29af27b2017-09-08 10:29:12 +0800335 IpAddress src = null;
336 if (pimSSmInterworking) {
337 src = ssmTranslateRoute(groupIp);
338 if (src == null) {
339 log.info("no ssm translate for group " + groupIp.toString());
340 return;
341 }
342 } else {
343 src = IpAddress.valueOf(DEFAULT_PIMSSM_HOST);
ke han81a38b92017-03-10 18:41:44 +0800344 }
345 sourceList.add(src.getIp4Address());
346 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
347 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
348 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
349 join = true;
350 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
351 recordType == IGMPMembership.MODE_IS_INCLUDE ||
352 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
353 join = false;
354 }
355 }
356 String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
357 GroupMember groupMember = groupMemberMap.get(groupMemberKey);
358
359 if (join) {
360 if (groupMember == null) {
361 if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
362 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
363 } else {
364 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
365 }
Esin Karamaneff10392019-06-27 18:09:13 +0000366
367 Optional<ConnectPoint> sourceConfigured = getSource();
368 if (!sourceConfigured.isPresent()) {
369 log.warn("Unable to process IGMP Join from {} since no source " +
370 "configuration is found.", deviceId);
371 return;
372 }
373 HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
374
ke han81a38b92017-03-10 18:41:44 +0800375 StateMachine.join(deviceId, groupIp, srcIp);
376 groupMemberMap.put(groupMemberKey, groupMember);
377 groupMember.updateList(recordType, sourceList);
Esin Karamaneff10392019-06-27 18:09:13 +0000378 groupMember.getSourceList().forEach(source -> {
379 McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
380 //add route
381 multicastService.add(route);
382 //add source to the route
383 multicastService.addSources(route, Sets.newHashSet(sourceConnectPoints));
384 //add sink to the route
385 multicastService.addSinks(route, Sets.newHashSet(cp));
386 });
387
ke han81a38b92017-03-10 18:41:44 +0800388 }
389 groupMember.resetAllTimers();
390 groupMember.updateList(recordType, sourceList);
391 groupMember.setLeave(false);
392 } else {
393 if (groupMember == null) {
394 log.info("receive leave but no instance, group " + groupIp.toString() +
395 " device:" + deviceId.toString() + " port:" + portNumber.toString());
396 return;
397 } else {
398 groupMember.setLeave(true);
399 if (fastLeave) {
400 leaveAction(groupMember);
401 } else {
402 sendQuery(groupMember);
403 }
404 }
405 }
406 }
407
408 private void leaveAction(GroupMember groupMember) {
409 ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
410 StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
Esin Karamaneff10392019-06-27 18:09:13 +0000411 groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
ke han81a38b92017-03-10 18:41:44 +0800412 new McastRoute(source, groupMember.getGroupIp(),
Esin Karamaneff10392019-06-27 18:09:13 +0000413 McastRoute.Type.IGMP), Sets.newHashSet(cp)));
ke han81a38b92017-03-10 18:41:44 +0800414 groupMemberMap.remove(groupMember.getId());
415 }
416
417 private void sendQuery(GroupMember groupMember) {
418 Ethernet ethpkt;
419 Ip4Address srcIp = getDeviceIp(groupMember.getDeviceId());
420 if (groupMember.getv2()) {
421 ethpkt = IgmpSender.getInstance().buildIgmpV2Query(groupMember.getGroupIp(), srcIp);
422 } else {
423 ethpkt = IgmpSender.getInstance().buildIgmpV3Query(groupMember.getGroupIp(), srcIp);
424 }
425 IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
426 }
427
Esin Karamaneff10392019-06-27 18:09:13 +0000428 /**
429 * @return connect point of the source if configured; and empty Optional otherwise.
430 */
431 public static Optional<ConnectPoint> getSource() {
432 return sourceDeviceAndPort == null ? Optional.empty() :
433 Optional.of(sourceDeviceAndPort);
ke han81a38b92017-03-10 18:41:44 +0800434 }
435
436 /**
437 * Packet processor responsible for forwarding packets along their paths.
438 */
439 private class IgmpPacketProcessor implements PacketProcessor {
440 @Override
441 public void process(PacketContext context) {
442
443 try {
444 InboundPacket pkt = context.inPacket();
445 Ethernet ethPkt = pkt.parsed();
446 if (ethPkt == null) {
447 return;
448 }
449
450 if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
451 return;
452 }
453
454 IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
455
456 if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
457 return;
458 }
459
460 short vlan = ethPkt.getVlanID();
461 DeviceId deviceId = pkt.receivedFrom().deviceId();
462
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000463 if (oltData.get(deviceId) == null &&
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000464 !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
ke han81a38b92017-03-10 18:41:44 +0800465 log.error("Device not registered in netcfg :" + deviceId.toString());
466 return;
467 }
468
469 IGMP igmp = (IGMP) ipv4Pkt.getPayload();
470 switch (igmp.getIgmpType()) {
471 case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
472 //Discard Query from OLT’s non-uplink port’s
473 if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000474 if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
475 log.info("IGMP Picked up query from connectPoint");
476 //OK to process packet
477 processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
478 0xff & igmp.getMaxRespField());
479 break;
480 } else {
481 //Not OK to process packet
482 log.warn("IGMP Picked up query from non-uplink port");
483 return;
484 }
ke han81a38b92017-03-10 18:41:44 +0800485 }
486
487 processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
488 0xff & igmp.getMaxRespField());
489 break;
490 case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
491 log.debug("IGMP version 1 message types are not currently supported.");
492 break;
493 case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
494 case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
495 case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
496 //Discard join/leave from OLT’s uplink port’s
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000497 if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId)) ||
498 isConnectPoint(deviceId, pkt.receivedFrom().port())) {
499 log.info("IGMP Picked up join/leave from uplink/connectPoint port");
ke han81a38b92017-03-10 18:41:44 +0800500 return;
501 }
502
503 Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
504 while (itr.hasNext()) {
505 IGMPGroup group = itr.next();
506 if (group instanceof IGMPMembership) {
507 processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
508 pkt.receivedFrom(), igmp.getIgmpType());
509 } else if (group instanceof IGMPQuery) {
510 IGMPMembership mgroup;
511 mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
512 mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
513 IGMPMembership.MODE_IS_EXCLUDE : IGMPMembership.MODE_IS_INCLUDE);
514 processIgmpReport(mgroup, VlanId.vlanId(vlan),
515 pkt.receivedFrom(), igmp.getIgmpType());
516 }
517 }
518 break;
519
520 default:
521 log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
522 break;
523 }
524
525 } catch (Exception ex) {
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000526 log.error("igmp process error : {} ", ex);
ke han81a38b92017-03-10 18:41:44 +0800527 ex.printStackTrace();
528 }
529 }
530 }
531
532 private class IgmpProxyTimerTask extends TimerTask {
533 public void run() {
534 try {
535 IgmpTimer.timeOut1s();
536 queryMembers();
537 } catch (Exception ex) {
538 log.warn("Igmp timer task error : {}", ex.getMessage());
539 }
540 }
541
542 private void queryMembers() {
543 GroupMember groupMember;
544 Set groupMemberSet = groupMemberMap.entrySet();
545 Iterator itr = groupMemberSet.iterator();
546 while (itr.hasNext()) {
547 Map.Entry entry = (Map.Entry) itr.next();
548 groupMember = (GroupMember) entry.getValue();
549 DeviceId did = groupMember.getDeviceId();
550 if (mastershipService.isLocalMaster(did)) {
551 if (groupMember.isLeave()) {
552 lastQuery(groupMember);
553 } else if (periodicQuery) {
554 periodicQuery(groupMember);
555 }
556 }
557 }
558 }
559
560 private void lastQuery(GroupMember groupMember) {
561 if (groupMember.getLastQueryInterval() < lastQueryInterval) {
562 groupMember.lastQueryInterval(true); // count times
563 } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
564 sendQuery(groupMember);
565 groupMember.lastQueryInterval(false); // reset count number
566 groupMember.lastQueryCount(true); //count times
567 } else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
568 leaveAction(groupMember);
569 }
570 }
571
572 private void periodicQuery(GroupMember groupMember) {
573 if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
574 groupMember.keepAliveInterval(true);
575 } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
576 sendQuery(groupMember);
577 groupMember.keepAliveInterval(false);
578 groupMember.keepAliveQueryCount(true);
579 } else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
580 leaveAction(groupMember);
581 }
582 }
583
584 }
585
586 public static PortNumber getDeviceUplink(DeviceId devId) {
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000587 if (oltData.get(devId) != null) {
588 return oltData.get(devId).uplink();
589 } else {
590 return null;
591 }
ke han81a38b92017-03-10 18:41:44 +0800592 }
593
594 private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
Esin Karamaneff10392019-06-27 18:09:13 +0000595 if (!enableIgmpProvisioning) {
596 log.debug("IGMP trap rules won't be installed since enableIgmpProvisioning flag is not set");
597 return;
598 }
ke han81a38b92017-03-10 18:41:44 +0800599 //TODO migrate to packet requests when packet service uses filtering objectives
600 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
601
602 builder = remove ? builder.deny() : builder.permit();
603
604 FilteringObjective igmp = builder
605 .withKey(Criteria.matchInPort(port))
606 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
607 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
608 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
609 .fromApp(appId)
Esin Karamaneff10392019-06-27 18:09:13 +0000610 .withPriority(MAX_PRIORITY)
ke han81a38b92017-03-10 18:41:44 +0800611 .add(new ObjectiveContext() {
612 @Override
613 public void onSuccess(Objective objective) {
Esin Karamaneff10392019-06-27 18:09:13 +0000614 log.info("Igmp filter for {} on {} {}.",
615 devId, port, (remove) ? REMOVED : INSTALLED);
ke han81a38b92017-03-10 18:41:44 +0800616 }
617
618 @Override
619 public void onError(Objective objective, ObjectiveError error) {
Esin Karamaneff10392019-06-27 18:09:13 +0000620 log.info("Igmp filter {} for device {} on port {} failed because of {}",
621 (remove) ? INSTALLATION : REMOVAL, devId, port,
622 error);
ke han81a38b92017-03-10 18:41:44 +0800623 }
624 });
625
626 flowObjectiveService.filter(devId, igmp);
Esin Karamaneff10392019-06-27 18:09:13 +0000627
ke han81a38b92017-03-10 18:41:44 +0800628 }
629
630 private boolean isConnectPoint(DeviceId device, PortNumber port) {
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530631 if (connectPoint != null) {
632 return (connectPointMode && connectPoint.deviceId().equals(device)
633 && connectPoint.port().equals(port));
634 } else {
635 log.info("connectPoint not configured for device {}", device);
636 return false;
637 }
ke han81a38b92017-03-10 18:41:44 +0800638 }
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530639
ke han81a38b92017-03-10 18:41:44 +0800640 private boolean isUplink(DeviceId device, PortNumber port) {
641 return ((!connectPointMode) && oltData.containsKey(device)
642 && oltData.get(device).uplink().equals(port));
643 }
644
645 private class InternalDeviceListener implements DeviceListener {
646 @Override
647 public void event(DeviceEvent event) {
648 DeviceId devId = event.subject().id();
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000649 Port p = event.port();
650 if (oltData.get(devId) == null &&
651 !(p != null && isConnectPoint(devId, p.number()))) {
ke han81a38b92017-03-10 18:41:44 +0800652 return;
653 }
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000654 PortNumber port;
655
ke han81a38b92017-03-10 18:41:44 +0800656 switch (event.type()) {
657
658 case DEVICE_ADDED:
659 case DEVICE_UPDATED:
660 case DEVICE_REMOVED:
661 case DEVICE_SUSPENDED:
662 case DEVICE_AVAILABILITY_CHANGED:
663 case PORT_STATS_UPDATED:
664 break;
665 case PORT_ADDED:
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000666 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800667 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
668 processFilterObjective(devId, port, false);
669 } else if (isUplink(devId, port)) {
670 provisionUplinkFlows();
671 } else if (isConnectPoint(devId, port)) {
672 provisionConnectPointFlows();
673 }
674 break;
675 case PORT_UPDATED:
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000676 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800677 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
678 if (event.port().isEnabled()) {
679 processFilterObjective(devId, port, false);
680 } else {
681 processFilterObjective(devId, port, true);
682 }
683 } else if (isUplink(devId, port)) {
684 if (event.port().isEnabled()) {
685 provisionUplinkFlows(devId);
686 } else {
687 processFilterObjective(devId, port, true);
688 }
689 } else if (isConnectPoint(devId, port)) {
690 if (event.port().isEnabled()) {
691 provisionConnectPointFlows();
692 } else {
693 unprovisionConnectPointFlows();
694 }
695 }
696 break;
697 case PORT_REMOVED:
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000698 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800699 processFilterObjective(devId, port, true);
700 break;
701 default:
702 log.info("Unknown device event {}", event.type());
703 break;
704 }
705 }
706
707 @Override
708 public boolean isRelevant(DeviceEvent event) {
709 return true;
710 }
711 }
712
713 private class InternalNetworkConfigListener implements NetworkConfigListener {
714
715 private void reconfigureNetwork(IgmpproxyConfig cfg) {
Esin Karamaneff10392019-06-27 18:09:13 +0000716 IgmpproxyConfig newCfg = cfg == null ? new IgmpproxyConfig() : cfg;
ke han81a38b92017-03-10 18:41:44 +0800717
718 unSolicitedTimeout = newCfg.unsolicitedTimeOut();
719 maxResp = newCfg.maxResp();
720 keepAliveInterval = newCfg.keepAliveInterval();
721 keepAliveCount = newCfg.keepAliveCount();
722 lastQueryInterval = newCfg.lastQueryInterval();
723 lastQueryCount = newCfg.lastQueryCount();
724 withRAUplink = newCfg.withRAUplink();
725 withRADownlink = newCfg.withRADownlink();
726 igmpCos = newCfg.igmpCos();
727 periodicQuery = newCfg.periodicQuery();
728 fastLeave = newCfg.fastLeave();
ke han29af27b2017-09-08 10:29:12 +0800729 pimSSmInterworking = newCfg.pimSsmInterworking();
Esin Karamaneff10392019-06-27 18:09:13 +0000730 enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000731
732 if (connectPointMode != newCfg.connectPointMode() ||
733 connectPoint != newCfg.connectPoint()) {
ke han81a38b92017-03-10 18:41:44 +0800734 connectPointMode = newCfg.connectPointMode();
Deepa Vaddireddyca7b25d2017-09-28 13:47:18 +0000735 connectPoint = newCfg.connectPoint();
ke han81a38b92017-03-10 18:41:44 +0800736 if (connectPointMode) {
737 unprovisionUplinkFlows();
738 provisionConnectPointFlows();
739 } else {
740 unprovisionConnectPointFlows();
741 provisionUplinkFlows();
742 }
743 }
744 if (connectPoint != null) {
Esin Karamaneff10392019-06-27 18:09:13 +0000745 log.info("connect point : {}", connectPoint);
ke han81a38b92017-03-10 18:41:44 +0800746 }
Esin Karamaneff10392019-06-27 18:09:13 +0000747 log.info("mode: {}", connectPointMode);
748
749 getSourceConnectPoint(newCfg);
ke han81a38b92017-03-10 18:41:44 +0800750
751 IgmpSender.getInstance().setIgmpCos(igmpCos);
752 IgmpSender.getInstance().setMaxResp(maxResp);
753 IgmpSender.getInstance().setMvlan(mvlan);
754 IgmpSender.getInstance().setWithRADownlink(withRADownlink);
755 IgmpSender.getInstance().setWithRAUplink(withRAUplink);
Esin Karamaneff10392019-06-27 18:09:13 +0000756 }
ke han81a38b92017-03-10 18:41:44 +0800757
Esin Karamaneff10392019-06-27 18:09:13 +0000758 void getSourceConnectPoint(IgmpproxyConfig cfg) {
759 sourceDeviceAndPort = cfg.getSourceDeviceAndPort();
760 if (sourceDeviceAndPort != null) {
761 log.debug("source parameter configured to {}", sourceDeviceAndPort);
762 }
ke han81a38b92017-03-10 18:41:44 +0800763 }
764
765 public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
766 if (cfg == null) {
767 return;
768 }
769 Collection<McastRoute> translations = cfg.getSsmTranslations();
770 for (McastRoute route : translations) {
Esin Karamaneff10392019-06-27 18:09:13 +0000771 ssmTranslateTable.put(route.group().getIp4Address(), route.source().get().getIp4Address());
ke han81a38b92017-03-10 18:41:44 +0800772 }
773 }
774
775 @Override
776 public void event(NetworkConfigEvent event) {
777 switch (event.type()) {
778 case CONFIG_ADDED:
779 case CONFIG_UPDATED:
780 if (event.configClass().equals(CONFIG_CLASS)) {
781 AccessDeviceConfig config =
782 networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
783 if (config != null) {
784 oltData.put(config.getAccessDevice().deviceId(), config.getAccessDevice());
785 provisionDefaultFlows((DeviceId) event.subject());
786 provisionUplinkFlows((DeviceId) event.subject());
787 }
788 }
789
790 if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
791 IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
792 if (config != null) {
Esin Karamaneff10392019-06-27 18:09:13 +0000793 log.info("igmpproxy config received. {}", config);
ke han81a38b92017-03-10 18:41:44 +0800794 reconfigureNetwork(config);
795 }
796 }
797
798 if (event.configClass().equals(IGMPPROXY_SSM_CONFIG_CLASS)) {
799 IgmpproxySsmTranslateConfig config = networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS);
800 if (config != null) {
801 reconfigureSsmTable(config);
802 }
803 }
804
805 if (event.configClass().equals(MCAST_CONFIG_CLASS)) {
806 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
807 if (config != null && mvlan != config.egressVlan().toShort()) {
808 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530809 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800810 groupMemberMap.values().forEach(m -> leaveAction(m));
811 }
812 }
813
814 log.info("Reconfigured");
815 break;
816 case CONFIG_REGISTERED:
817 case CONFIG_UNREGISTERED:
818 break;
819 case CONFIG_REMOVED:
820 if (event.configClass().equals(CONFIG_CLASS)) {
821 oltData.remove(event.subject());
822 }
823
824 default:
825 break;
826 }
827 }
828 }
829
830 private void provisionDefaultFlows(DeviceId deviceId) {
831 List<Port> ports = deviceService.getPorts(deviceId);
832 ports.stream()
833 .filter(p -> (!oltData.get(p.element().id()).uplink().equals(p.number()) && p.isEnabled()))
834 .forEach(p -> processFilterObjective((DeviceId) p.element().id(), p.number(), false));
835 }
836
837 private void provisionUplinkFlows(DeviceId deviceId) {
838 if (connectPointMode) {
839 return;
840 }
841
842 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), false);
843 }
844
845 private void provisionUplinkFlows() {
846 if (connectPointMode) {
847 return;
848 }
849
David K. Bainbridge4809c0e2017-08-17 09:54:40 -0700850 oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
ke han81a38b92017-03-10 18:41:44 +0800851 }
852 private void unprovisionUplinkFlows() {
853 oltData.keySet().forEach(deviceId ->
854 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), true));
855 }
856
857 private void provisionConnectPointFlows() {
858 if ((!connectPointMode) || connectPoint == null) {
859 return;
860 }
861
862 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
863 }
864 private void unprovisionConnectPointFlows() {
865 if (connectPoint == null) {
866 return;
867 }
868 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
869 }
870}