blob: 6063b96de4a27393400722f82e7f077dff547709 [file] [log] [blame]
David K. Bainbridged77028f2017-08-01 12:47:55 -07001/*
Brian O'Connor4d084702017-08-03 22:45:58 -07002 * Copyright 2017-present Open Networking Foundation
David K. Bainbridged77028f2017-08-01 12:47:55 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
ke han81a38b92017-03-10 18:41:44 +080016package org.opencord.igmpproxy;
17
18import com.google.common.collect.Maps;
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.onlab.packet.EthType;
25import org.onlab.packet.Ethernet;
26import org.onlab.packet.IGMP;
27import org.onlab.packet.IGMPGroup;
28import org.onlab.packet.IGMPMembership;
29import org.onlab.packet.IGMPQuery;
30import org.onlab.packet.IPv4;
31import org.onlab.packet.Ip4Address;
32import org.onlab.packet.IpAddress;
33import org.onlab.packet.VlanId;
34import org.onosproject.core.ApplicationId;
35import org.onosproject.core.CoreService;
36import org.onosproject.incubator.net.config.basics.McastConfig;
37import org.onosproject.mastership.MastershipService;
38import org.onosproject.net.AnnotationKeys;
39import org.onosproject.net.ConnectPoint;
40import org.onosproject.net.DeviceId;
41import org.onosproject.net.Port;
42import org.onosproject.net.PortNumber;
43import org.onosproject.net.config.ConfigFactory;
44import org.onosproject.net.config.NetworkConfigEvent;
45import org.onosproject.net.config.NetworkConfigListener;
46import org.onosproject.net.config.NetworkConfigRegistry;
47import org.onosproject.net.config.basics.SubjectFactories;
48import org.onosproject.net.device.DeviceEvent;
49import org.onosproject.net.device.DeviceListener;
50import org.onosproject.net.device.DeviceService;
51import org.onosproject.net.flow.DefaultTrafficTreatment;
52import org.onosproject.net.flow.FlowRuleService;
53import org.onosproject.net.flow.criteria.Criteria;
54import org.onosproject.net.flowobjective.DefaultFilteringObjective;
55import org.onosproject.net.flowobjective.FilteringObjective;
56import org.onosproject.net.flowobjective.FlowObjectiveService;
57import org.onosproject.net.flowobjective.Objective;
58import org.onosproject.net.flowobjective.ObjectiveContext;
59import org.onosproject.net.flowobjective.ObjectiveError;
60import org.onosproject.net.mcast.McastRoute;
61import org.onosproject.net.mcast.MulticastRouteService;
62import org.onosproject.net.packet.InboundPacket;
63import org.onosproject.net.packet.PacketContext;
64import org.onosproject.net.packet.PacketProcessor;
65import org.onosproject.net.packet.PacketService;
66import org.opencord.cordconfig.access.AccessDeviceConfig;
67import org.opencord.cordconfig.access.AccessDeviceData;
68import org.slf4j.Logger;
69import org.slf4j.LoggerFactory;
70
71import java.util.ArrayList;
72import java.util.Collection;
73import java.util.Iterator;
74import java.util.List;
75import java.util.Map;
76import java.util.Set;
77import java.util.TimerTask;
78import java.util.concurrent.ConcurrentHashMap;
79import java.util.concurrent.Executors;
80import java.util.concurrent.ScheduledExecutorService;
81import java.util.concurrent.TimeUnit;
82
83/**
84 * Igmp process application, use proxy mode, support first join/ last leave , fast leave
85 * period query and keep alive, packet out igmp message to uplink port features.
86 */
87@Component(immediate = true)
88public class IgmpManager {
89
90 private static final Class<AccessDeviceConfig> CONFIG_CLASS =
91 AccessDeviceConfig.class;
92 private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
93 IgmpproxyConfig.class;
94 private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
95 IgmpproxySsmTranslateConfig.class;
96 private static final Class<McastConfig> MCAST_CONFIG_CLASS =
97 McastConfig.class;
98 public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
99 private static ApplicationId appId;
100 private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
101 private static int unSolicitedTimeout = 3; // unit is 1 sec
102 private static int keepAliveCount = 3;
103 private static int lastQueryInterval = 2; //unit is 1 sec
104 private static int lastQueryCount = 2;
105 private static boolean fastLeave = true;
106 private static boolean withRAUplink = true;
107 private static boolean withRADownlink = false;
108 private static boolean periodicQuery = true;
109 private static short mvlan = 4000;
110 private static byte igmpCos = 7;
111 public static boolean connectPointMode = true;
112 public static ConnectPoint connectPoint = null;
113
ke hanf9ed58c2017-09-08 10:29:12 +0800114 private static boolean pimSSmInterworking = false;
115 private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
ke han81a38b92017-03-10 18:41:44 +0800116 private final ScheduledExecutorService scheduledExecutorService =
117 Executors.newScheduledThreadPool(1);
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected CoreService coreService;
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected PacketService packetService;
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected MastershipService mastershipService;
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected FlowRuleService flowRuleService;
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
127 protected DeviceService deviceService;
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected FlowObjectiveService flowObjectiveService;
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected NetworkConfigRegistry networkConfig;
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected MulticastRouteService multicastService;
134 private IgmpPacketProcessor processor = new IgmpPacketProcessor();
135 private Logger log = LoggerFactory.getLogger(getClass());
136 private ApplicationId coreAppId;
137 private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
138 private InternalNetworkConfigListener configListener =
139 new InternalNetworkConfigListener();
140 private DeviceListener deviceListener = new InternalDeviceListener();
141 private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory = null;
142 private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
143 new ConfigFactory<ApplicationId, IgmpproxyConfig>(
144 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
145 @Override
146 public IgmpproxyConfig createConfig() {
147 return new IgmpproxyConfig();
148 }
149 };
150 private ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
151 new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
152 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
153 @Override
154 public IgmpproxySsmTranslateConfig createConfig() {
155 return new IgmpproxySsmTranslateConfig();
156 }
157 };
158 private int maxResp = 10; //unit is 1 sec
159 private int keepAliveInterval = 120; //unit is 1 sec
160
161 public static int getUnsolicitedTimeout() {
162 return unSolicitedTimeout;
163 }
164
165 @Activate
166 protected void activate() {
167 appId = coreService.registerApplication("org.opencord.igmpproxy");
168 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
169 packetService.addProcessor(processor, PacketProcessor.director(4));
170 IgmpSender.init(packetService, mastershipService);
171
172 if (networkConfig.getConfigFactory(CONFIG_CLASS) == null) {
173 configFactory =
174 new ConfigFactory<DeviceId, AccessDeviceConfig>(
175 SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
176 @Override
177 public AccessDeviceConfig createConfig() {
178 return new AccessDeviceConfig();
179 }
180 };
181 networkConfig.registerConfigFactory(configFactory);
182 }
183 networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
184 networkConfig.registerConfigFactory(igmpproxyConfigFactory);
185 networkConfig.addListener(configListener);
186
187 configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
188 configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
189
190 networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
191 subject -> {
192 AccessDeviceConfig config = networkConfig.getConfig(subject,
193 AccessDeviceConfig.class);
194 if (config != null) {
195 AccessDeviceData data = config.getAccessDevice();
196 oltData.put(data.deviceId(), data);
197 }
198 }
199 );
200
David K. Bainbridge4809c0e2017-08-17 09:54:40 -0700201 oltData.keySet().forEach(d -> provisionDefaultFlows(d));
ke han81a38b92017-03-10 18:41:44 +0800202 if (connectPointMode) {
203 provisionConnectPointFlows();
204 } else {
205 provisionUplinkFlows();
206 }
207
208 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
209 if (config != null) {
210 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530211 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800212 }
213 deviceService.addListener(deviceListener);
214 scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 0, 1000, TimeUnit.MILLISECONDS);
215
216 log.info("Started");
217 }
218
219 @Deactivate
220 protected void deactivate() {
221 scheduledExecutorService.shutdown();
222
223 // de-register and null our handler
224 networkConfig.removeListener(configListener);
225 if (configFactory != null) {
226 networkConfig.unregisterConfigFactory(configFactory);
227 }
228 networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
229 networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
230 deviceService.removeListener(deviceListener);
231 packetService.removeProcessor(processor);
232 flowRuleService.removeFlowRulesById(appId);
233
234 log.info("Stopped");
235 }
236
237 protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
238 try {
239 String[] mgmtAddress = deviceService.getDevice(ofDeviceId)
240 .annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":");
241 return Ip4Address.valueOf(mgmtAddress[0]);
242 } catch (Exception ex) {
243 log.info("No valid Ipaddress for " + ofDeviceId.toString());
244 return null;
245 }
246 }
247
248 private void processIgmpQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
249
250 DeviceId deviceId = cp.deviceId();
251 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
252
253 if (maxResp >= 128) {
254 int mant = maxResp & 0xf;
255 int exp = (maxResp >> 4) & 0x7;
256 maxResp = (mant | 0x10) << (exp + 3);
257 }
258
259 maxResp = (maxResp + 5) / 10;
260
261 if (gAddr != null && !gAddr.isZero()) {
262 StateMachine.specialQuery(deviceId, gAddr, maxResp);
263 } else {
264 StateMachine.generalQuery(deviceId, maxResp);
265 }
266
267 }
268
269 private Ip4Address ssmTranslateRoute(IpAddress group) {
270 return ssmTranslateTable.get(group);
271 }
272
273 private void processIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
274 DeviceId deviceId = cp.deviceId();
275 PortNumber portNumber = cp.port();
276
277 Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
278 if (!groupIp.isMulticast()) {
279 log.info(groupIp.toString() + " is not a valid group address");
280 return;
281 }
282 Ip4Address srcIp = getDeviceIp(deviceId);
283
284 byte recordType = igmpGroup.getRecordType();
285 boolean join = false;
286
287 ArrayList<Ip4Address> sourceList = new ArrayList<>();
288
289 if (igmpGroup.getSources().size() > 0) {
290 igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
291 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
292 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
293 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
294 join = false;
295 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
296 recordType == IGMPMembership.MODE_IS_INCLUDE ||
297 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
298 join = true;
299 }
300 } else {
ke hanf9ed58c2017-09-08 10:29:12 +0800301 IpAddress src = null;
302 if (pimSSmInterworking) {
303 src = ssmTranslateRoute(groupIp);
304 if (src == null) {
305 log.info("no ssm translate for group " + groupIp.toString());
306 return;
307 }
308 } else {
309 src = IpAddress.valueOf(DEFAULT_PIMSSM_HOST);
ke han81a38b92017-03-10 18:41:44 +0800310 }
311 sourceList.add(src.getIp4Address());
312 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
313 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
314 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
315 join = true;
316 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
317 recordType == IGMPMembership.MODE_IS_INCLUDE ||
318 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
319 join = false;
320 }
321 }
322 String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
323 GroupMember groupMember = groupMemberMap.get(groupMemberKey);
324
325 if (join) {
326 if (groupMember == null) {
327 if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
328 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
329 } else {
330 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
331 }
332 StateMachine.join(deviceId, groupIp, srcIp);
333 groupMemberMap.put(groupMemberKey, groupMember);
334 groupMember.updateList(recordType, sourceList);
335 groupMember.getSourceList().forEach(source -> multicastService.addSink(
336 new McastRoute(source, groupIp, McastRoute.Type.IGMP), cp));
337 }
338 groupMember.resetAllTimers();
339 groupMember.updateList(recordType, sourceList);
340 groupMember.setLeave(false);
341 } else {
342 if (groupMember == null) {
343 log.info("receive leave but no instance, group " + groupIp.toString() +
344 " device:" + deviceId.toString() + " port:" + portNumber.toString());
345 return;
346 } else {
347 groupMember.setLeave(true);
348 if (fastLeave) {
349 leaveAction(groupMember);
350 } else {
351 sendQuery(groupMember);
352 }
353 }
354 }
355 }
356
357 private void leaveAction(GroupMember groupMember) {
358 ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
359 StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
360 groupMember.getSourceList().forEach(source -> multicastService.removeSink(
361 new McastRoute(source, groupMember.getGroupIp(),
362 McastRoute.Type.IGMP), cp));
363 groupMemberMap.remove(groupMember.getId());
364 }
365
366 private void sendQuery(GroupMember groupMember) {
367 Ethernet ethpkt;
368 Ip4Address srcIp = getDeviceIp(groupMember.getDeviceId());
369 if (groupMember.getv2()) {
370 ethpkt = IgmpSender.getInstance().buildIgmpV2Query(groupMember.getGroupIp(), srcIp);
371 } else {
372 ethpkt = IgmpSender.getInstance().buildIgmpV3Query(groupMember.getGroupIp(), srcIp);
373 }
374 IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
375 }
376
377 private void processFilterObjective(DeviceId devId, Port port, boolean remove) {
378
379 //TODO migrate to packet requests when packet service uses filtering objectives
380 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
381
382 builder = remove ? builder.deny() : builder.permit();
383
384 FilteringObjective igmp = builder
385 .withKey(Criteria.matchInPort(port.number()))
386 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
387 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
388 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
389 .fromApp(appId)
390 .withPriority(10000)
391 .add(new ObjectiveContext() {
392 @Override
393 public void onSuccess(Objective objective) {
394 log.info("IgmpProxy filter for {} on {} installed.",
395 devId, port);
396 }
397
398 @Override
399 public void onError(Objective objective, ObjectiveError error) {
400 log.info("IgmpProxy filter for {} on {} failed because {}.",
401 devId, port, error);
402 }
403 });
404
405 flowObjectiveService.filter(devId, igmp);
406 }
407
408 /**
409 * Packet processor responsible for forwarding packets along their paths.
410 */
411 private class IgmpPacketProcessor implements PacketProcessor {
412 @Override
413 public void process(PacketContext context) {
414
415 try {
416 InboundPacket pkt = context.inPacket();
417 Ethernet ethPkt = pkt.parsed();
418 if (ethPkt == null) {
419 return;
420 }
421
422 if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
423 return;
424 }
425
426 IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
427
428 if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
429 return;
430 }
431
432 short vlan = ethPkt.getVlanID();
433 DeviceId deviceId = pkt.receivedFrom().deviceId();
434
435 if (oltData.get(deviceId) == null) {
436 log.error("Device not registered in netcfg :" + deviceId.toString());
437 return;
438 }
439
440 IGMP igmp = (IGMP) ipv4Pkt.getPayload();
441 switch (igmp.getIgmpType()) {
442 case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
443 //Discard Query from OLT’s non-uplink port’s
444 if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
445 log.info("IGMP Picked up query from non-uplink port");
446 return;
447 }
448
449 processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
450 0xff & igmp.getMaxRespField());
451 break;
452 case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
453 log.debug("IGMP version 1 message types are not currently supported.");
454 break;
455 case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
456 case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
457 case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
458 //Discard join/leave from OLT’s uplink port’s
459 if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
460 log.info("IGMP Picked up join/leave from the olt uplink port");
461 return;
462 }
463
464 Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
465 while (itr.hasNext()) {
466 IGMPGroup group = itr.next();
467 if (group instanceof IGMPMembership) {
468 processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
469 pkt.receivedFrom(), igmp.getIgmpType());
470 } else if (group instanceof IGMPQuery) {
471 IGMPMembership mgroup;
472 mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
473 mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
474 IGMPMembership.MODE_IS_EXCLUDE : IGMPMembership.MODE_IS_INCLUDE);
475 processIgmpReport(mgroup, VlanId.vlanId(vlan),
476 pkt.receivedFrom(), igmp.getIgmpType());
477 }
478 }
479 break;
480
481 default:
482 log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
483 break;
484 }
485
486 } catch (Exception ex) {
487 log.error("igmp process error : " + ex.toString());
488 ex.printStackTrace();
489 }
490 }
491 }
492
493 private class IgmpProxyTimerTask extends TimerTask {
494 public void run() {
495 try {
496 IgmpTimer.timeOut1s();
497 queryMembers();
498 } catch (Exception ex) {
499 log.warn("Igmp timer task error : {}", ex.getMessage());
500 }
501 }
502
503 private void queryMembers() {
504 GroupMember groupMember;
505 Set groupMemberSet = groupMemberMap.entrySet();
506 Iterator itr = groupMemberSet.iterator();
507 while (itr.hasNext()) {
508 Map.Entry entry = (Map.Entry) itr.next();
509 groupMember = (GroupMember) entry.getValue();
510 DeviceId did = groupMember.getDeviceId();
511 if (mastershipService.isLocalMaster(did)) {
512 if (groupMember.isLeave()) {
513 lastQuery(groupMember);
514 } else if (periodicQuery) {
515 periodicQuery(groupMember);
516 }
517 }
518 }
519 }
520
521 private void lastQuery(GroupMember groupMember) {
522 if (groupMember.getLastQueryInterval() < lastQueryInterval) {
523 groupMember.lastQueryInterval(true); // count times
524 } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
525 sendQuery(groupMember);
526 groupMember.lastQueryInterval(false); // reset count number
527 groupMember.lastQueryCount(true); //count times
528 } else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
529 leaveAction(groupMember);
530 }
531 }
532
533 private void periodicQuery(GroupMember groupMember) {
534 if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
535 groupMember.keepAliveInterval(true);
536 } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
537 sendQuery(groupMember);
538 groupMember.keepAliveInterval(false);
539 groupMember.keepAliveQueryCount(true);
540 } else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
541 leaveAction(groupMember);
542 }
543 }
544
545 }
546
547 public static PortNumber getDeviceUplink(DeviceId devId) {
548 return oltData.get(devId).uplink();
549 }
550
551 private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
552 //TODO migrate to packet requests when packet service uses filtering objectives
553 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
554
555 builder = remove ? builder.deny() : builder.permit();
556
557 FilteringObjective igmp = builder
558 .withKey(Criteria.matchInPort(port))
559 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
560 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
561 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
562 .fromApp(appId)
563 .withPriority(10000)
564 .add(new ObjectiveContext() {
565 @Override
566 public void onSuccess(Objective objective) {
567 log.info("IgmpProxy filter for {} on {} installed.",
568 devId, port);
569 }
570
571 @Override
572 public void onError(Objective objective, ObjectiveError error) {
573 log.info("IgmpProxy filter for {} on {} failed because {}.",
574 devId, port, error);
575 }
576 });
577
578 flowObjectiveService.filter(devId, igmp);
579 }
580
581 private boolean isConnectPoint(DeviceId device, PortNumber port) {
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530582 if (connectPoint != null) {
583 return (connectPointMode && connectPoint.deviceId().equals(device)
584 && connectPoint.port().equals(port));
585 } else {
586 log.info("connectPoint not configured for device {}", device);
587 return false;
588 }
ke han81a38b92017-03-10 18:41:44 +0800589 }
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530590
ke han81a38b92017-03-10 18:41:44 +0800591 private boolean isUplink(DeviceId device, PortNumber port) {
592 return ((!connectPointMode) && oltData.containsKey(device)
593 && oltData.get(device).uplink().equals(port));
594 }
595
596 private class InternalDeviceListener implements DeviceListener {
597 @Override
598 public void event(DeviceEvent event) {
599 DeviceId devId = event.subject().id();
600 PortNumber port;
601 if (oltData.get(devId) == null) {
602 return;
603 }
604 switch (event.type()) {
605
606 case DEVICE_ADDED:
607 case DEVICE_UPDATED:
608 case DEVICE_REMOVED:
609 case DEVICE_SUSPENDED:
610 case DEVICE_AVAILABILITY_CHANGED:
611 case PORT_STATS_UPDATED:
612 break;
613 case PORT_ADDED:
614 port = event.port().number();
615 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
616 processFilterObjective(devId, port, false);
617 } else if (isUplink(devId, port)) {
618 provisionUplinkFlows();
619 } else if (isConnectPoint(devId, port)) {
620 provisionConnectPointFlows();
621 }
622 break;
623 case PORT_UPDATED:
624 port = event.port().number();
625 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
626 if (event.port().isEnabled()) {
627 processFilterObjective(devId, port, false);
628 } else {
629 processFilterObjective(devId, port, true);
630 }
631 } else if (isUplink(devId, port)) {
632 if (event.port().isEnabled()) {
633 provisionUplinkFlows(devId);
634 } else {
635 processFilterObjective(devId, port, true);
636 }
637 } else if (isConnectPoint(devId, port)) {
638 if (event.port().isEnabled()) {
639 provisionConnectPointFlows();
640 } else {
641 unprovisionConnectPointFlows();
642 }
643 }
644 break;
645 case PORT_REMOVED:
646 port = event.port().number();
647 processFilterObjective(devId, port, true);
648 break;
649 default:
650 log.info("Unknown device event {}", event.type());
651 break;
652 }
653 }
654
655 @Override
656 public boolean isRelevant(DeviceEvent event) {
657 return true;
658 }
659 }
660
661 private class InternalNetworkConfigListener implements NetworkConfigListener {
662
663 private void reconfigureNetwork(IgmpproxyConfig cfg) {
664 IgmpproxyConfig newCfg;
665 if (cfg == null) {
666 newCfg = new IgmpproxyConfig();
667 } else {
668 newCfg = cfg;
669 }
670
671 unSolicitedTimeout = newCfg.unsolicitedTimeOut();
672 maxResp = newCfg.maxResp();
673 keepAliveInterval = newCfg.keepAliveInterval();
674 keepAliveCount = newCfg.keepAliveCount();
675 lastQueryInterval = newCfg.lastQueryInterval();
676 lastQueryCount = newCfg.lastQueryCount();
677 withRAUplink = newCfg.withRAUplink();
678 withRADownlink = newCfg.withRADownlink();
679 igmpCos = newCfg.igmpCos();
680 periodicQuery = newCfg.periodicQuery();
681 fastLeave = newCfg.fastLeave();
ke hanf9ed58c2017-09-08 10:29:12 +0800682 pimSSmInterworking = newCfg.pimSsmInterworking();
ke han81a38b92017-03-10 18:41:44 +0800683 connectPoint = newCfg.connectPoint();
684 if (connectPointMode != newCfg.connectPointMode()) {
685 connectPointMode = newCfg.connectPointMode();
686 if (connectPointMode) {
687 unprovisionUplinkFlows();
688 provisionConnectPointFlows();
689 } else {
690 unprovisionConnectPointFlows();
691 provisionUplinkFlows();
692 }
693 }
694 if (connectPoint != null) {
695 log.info("connect point :" + connectPoint.toString());
696 }
697 log.info(" mode: " + connectPointMode);
698
699 IgmpSender.getInstance().setIgmpCos(igmpCos);
700 IgmpSender.getInstance().setMaxResp(maxResp);
701 IgmpSender.getInstance().setMvlan(mvlan);
702 IgmpSender.getInstance().setWithRADownlink(withRADownlink);
703 IgmpSender.getInstance().setWithRAUplink(withRAUplink);
704
705 }
706
707 public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
708 if (cfg == null) {
709 return;
710 }
711 Collection<McastRoute> translations = cfg.getSsmTranslations();
712 for (McastRoute route : translations) {
713 ssmTranslateTable.put(route.group().getIp4Address(), route.source().getIp4Address());
714 }
715 }
716
717 @Override
718 public void event(NetworkConfigEvent event) {
719 switch (event.type()) {
720 case CONFIG_ADDED:
721 case CONFIG_UPDATED:
722 if (event.configClass().equals(CONFIG_CLASS)) {
723 AccessDeviceConfig config =
724 networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
725 if (config != null) {
726 oltData.put(config.getAccessDevice().deviceId(), config.getAccessDevice());
727 provisionDefaultFlows((DeviceId) event.subject());
728 provisionUplinkFlows((DeviceId) event.subject());
729 }
730 }
731
732 if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
733 IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
734 if (config != null) {
735 reconfigureNetwork(config);
736 }
737 }
738
739 if (event.configClass().equals(IGMPPROXY_SSM_CONFIG_CLASS)) {
740 IgmpproxySsmTranslateConfig config = networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS);
741 if (config != null) {
742 reconfigureSsmTable(config);
743 }
744 }
745
746 if (event.configClass().equals(MCAST_CONFIG_CLASS)) {
747 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
748 if (config != null && mvlan != config.egressVlan().toShort()) {
749 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530750 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800751 groupMemberMap.values().forEach(m -> leaveAction(m));
752 }
753 }
754
755 log.info("Reconfigured");
756 break;
757 case CONFIG_REGISTERED:
758 case CONFIG_UNREGISTERED:
759 break;
760 case CONFIG_REMOVED:
761 if (event.configClass().equals(CONFIG_CLASS)) {
762 oltData.remove(event.subject());
763 }
764
765 default:
766 break;
767 }
768 }
769 }
770
771 private void provisionDefaultFlows(DeviceId deviceId) {
772 List<Port> ports = deviceService.getPorts(deviceId);
773 ports.stream()
774 .filter(p -> (!oltData.get(p.element().id()).uplink().equals(p.number()) && p.isEnabled()))
775 .forEach(p -> processFilterObjective((DeviceId) p.element().id(), p.number(), false));
776 }
777
778 private void provisionUplinkFlows(DeviceId deviceId) {
779 if (connectPointMode) {
780 return;
781 }
782
783 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), false);
784 }
785
786 private void provisionUplinkFlows() {
787 if (connectPointMode) {
788 return;
789 }
790
David K. Bainbridge4809c0e2017-08-17 09:54:40 -0700791 oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
ke han81a38b92017-03-10 18:41:44 +0800792 }
793 private void unprovisionUplinkFlows() {
794 oltData.keySet().forEach(deviceId ->
795 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), true));
796 }
797
798 private void provisionConnectPointFlows() {
799 if ((!connectPointMode) || connectPoint == null) {
800 return;
801 }
802
803 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
804 }
805 private void unprovisionConnectPointFlows() {
806 if (connectPoint == null) {
807 return;
808 }
809 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
810 }
811}