blob: 6a885d724745ade3b8d0889d9e5268bd11d7c175 [file] [log] [blame]
ke han81a38b92017-03-10 18:41:44 +08001package org.opencord.igmpproxy;
2
3import com.google.common.collect.Maps;
4import org.apache.felix.scr.annotations.Activate;
5import org.apache.felix.scr.annotations.Component;
6import org.apache.felix.scr.annotations.Deactivate;
7import org.apache.felix.scr.annotations.Reference;
8import org.apache.felix.scr.annotations.ReferenceCardinality;
9import org.onlab.packet.EthType;
10import org.onlab.packet.Ethernet;
11import org.onlab.packet.IGMP;
12import org.onlab.packet.IGMPGroup;
13import org.onlab.packet.IGMPMembership;
14import org.onlab.packet.IGMPQuery;
15import org.onlab.packet.IPv4;
16import org.onlab.packet.Ip4Address;
17import org.onlab.packet.IpAddress;
18import org.onlab.packet.VlanId;
19import org.onosproject.core.ApplicationId;
20import org.onosproject.core.CoreService;
21import org.onosproject.incubator.net.config.basics.McastConfig;
22import org.onosproject.mastership.MastershipService;
23import org.onosproject.net.AnnotationKeys;
24import org.onosproject.net.ConnectPoint;
25import org.onosproject.net.DeviceId;
26import org.onosproject.net.Port;
27import org.onosproject.net.PortNumber;
28import org.onosproject.net.config.ConfigFactory;
29import org.onosproject.net.config.NetworkConfigEvent;
30import org.onosproject.net.config.NetworkConfigListener;
31import org.onosproject.net.config.NetworkConfigRegistry;
32import org.onosproject.net.config.basics.SubjectFactories;
33import org.onosproject.net.device.DeviceEvent;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
36import org.onosproject.net.flow.DefaultTrafficTreatment;
37import org.onosproject.net.flow.FlowRuleService;
38import org.onosproject.net.flow.criteria.Criteria;
39import org.onosproject.net.flowobjective.DefaultFilteringObjective;
40import org.onosproject.net.flowobjective.FilteringObjective;
41import org.onosproject.net.flowobjective.FlowObjectiveService;
42import org.onosproject.net.flowobjective.Objective;
43import org.onosproject.net.flowobjective.ObjectiveContext;
44import org.onosproject.net.flowobjective.ObjectiveError;
45import org.onosproject.net.mcast.McastRoute;
46import org.onosproject.net.mcast.MulticastRouteService;
47import org.onosproject.net.packet.InboundPacket;
48import org.onosproject.net.packet.PacketContext;
49import org.onosproject.net.packet.PacketProcessor;
50import org.onosproject.net.packet.PacketService;
51import org.opencord.cordconfig.access.AccessDeviceConfig;
52import org.opencord.cordconfig.access.AccessDeviceData;
53import org.slf4j.Logger;
54import org.slf4j.LoggerFactory;
55
56import java.util.ArrayList;
57import java.util.Collection;
58import java.util.Iterator;
59import java.util.List;
60import java.util.Map;
61import java.util.Set;
62import java.util.TimerTask;
63import java.util.concurrent.ConcurrentHashMap;
64import java.util.concurrent.Executors;
65import java.util.concurrent.ScheduledExecutorService;
66import java.util.concurrent.TimeUnit;
67
/**
 * IGMP proxy application. Runs in proxy mode and supports first-join / last-leave handling,
 * fast leave, periodic queries with keep-alive, and packet-out of IGMP messages to the
 * uplink port.
 */
72@Component(immediate = true)
73public class IgmpManager {
74
75 private static final Class<AccessDeviceConfig> CONFIG_CLASS =
76 AccessDeviceConfig.class;
77 private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
78 IgmpproxyConfig.class;
79 private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
80 IgmpproxySsmTranslateConfig.class;
81 private static final Class<McastConfig> MCAST_CONFIG_CLASS =
82 McastConfig.class;
83 public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
84 private static ApplicationId appId;
85 private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
86 private static int unSolicitedTimeout = 3; // unit is 1 sec
87 private static int keepAliveCount = 3;
88 private static int lastQueryInterval = 2; //unit is 1 sec
89 private static int lastQueryCount = 2;
90 private static boolean fastLeave = true;
91 private static boolean withRAUplink = true;
92 private static boolean withRADownlink = false;
93 private static boolean periodicQuery = true;
94 private static short mvlan = 4000;
95 private static byte igmpCos = 7;
96 public static boolean connectPointMode = true;
97 public static ConnectPoint connectPoint = null;
98
99 private final ScheduledExecutorService scheduledExecutorService =
100 Executors.newScheduledThreadPool(1);
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected CoreService coreService;
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected PacketService packetService;
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected MastershipService mastershipService;
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected FlowRuleService flowRuleService;
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected DeviceService deviceService;
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected FlowObjectiveService flowObjectiveService;
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected NetworkConfigRegistry networkConfig;
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected MulticastRouteService multicastService;
117 private IgmpPacketProcessor processor = new IgmpPacketProcessor();
118 private Logger log = LoggerFactory.getLogger(getClass());
119 private ApplicationId coreAppId;
120 private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
121 private InternalNetworkConfigListener configListener =
122 new InternalNetworkConfigListener();
123 private DeviceListener deviceListener = new InternalDeviceListener();
124 private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory = null;
125 private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
126 new ConfigFactory<ApplicationId, IgmpproxyConfig>(
127 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
128 @Override
129 public IgmpproxyConfig createConfig() {
130 return new IgmpproxyConfig();
131 }
132 };
133 private ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
134 new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
135 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
136 @Override
137 public IgmpproxySsmTranslateConfig createConfig() {
138 return new IgmpproxySsmTranslateConfig();
139 }
140 };
141 private int maxResp = 10; //unit is 1 sec
142 private int keepAliveInterval = 120; //unit is 1 sec
143
144 public static int getUnsolicitedTimeout() {
145 return unSolicitedTimeout;
146 }
147
148 @Activate
149 protected void activate() {
150 appId = coreService.registerApplication("org.opencord.igmpproxy");
151 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
152 packetService.addProcessor(processor, PacketProcessor.director(4));
153 IgmpSender.init(packetService, mastershipService);
154
155 if (networkConfig.getConfigFactory(CONFIG_CLASS) == null) {
156 configFactory =
157 new ConfigFactory<DeviceId, AccessDeviceConfig>(
158 SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
159 @Override
160 public AccessDeviceConfig createConfig() {
161 return new AccessDeviceConfig();
162 }
163 };
164 networkConfig.registerConfigFactory(configFactory);
165 }
166 networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
167 networkConfig.registerConfigFactory(igmpproxyConfigFactory);
168 networkConfig.addListener(configListener);
169
170 configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
171 configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
172
173 networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
174 subject -> {
175 AccessDeviceConfig config = networkConfig.getConfig(subject,
176 AccessDeviceConfig.class);
177 if (config != null) {
178 AccessDeviceData data = config.getAccessDevice();
179 oltData.put(data.deviceId(), data);
180 }
181 }
182 );
183
184 oltData.keySet().forEach(d->provisionDefaultFlows(d));
185 if (connectPointMode) {
186 provisionConnectPointFlows();
187 } else {
188 provisionUplinkFlows();
189 }
190
191 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
192 if (config != null) {
193 mvlan = config.egressVlan().toShort();
194 }
195 deviceService.addListener(deviceListener);
196 scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 0, 1000, TimeUnit.MILLISECONDS);
197
198 log.info("Started");
199 }
200
201 @Deactivate
202 protected void deactivate() {
203 scheduledExecutorService.shutdown();
204
205 // de-register and null our handler
206 networkConfig.removeListener(configListener);
207 if (configFactory != null) {
208 networkConfig.unregisterConfigFactory(configFactory);
209 }
210 networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
211 networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
212 deviceService.removeListener(deviceListener);
213 packetService.removeProcessor(processor);
214 flowRuleService.removeFlowRulesById(appId);
215
216 log.info("Stopped");
217 }
218
219 protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
220 try {
221 String[] mgmtAddress = deviceService.getDevice(ofDeviceId)
222 .annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":");
223 return Ip4Address.valueOf(mgmtAddress[0]);
224 } catch (Exception ex) {
225 log.info("No valid Ipaddress for " + ofDeviceId.toString());
226 return null;
227 }
228 }
229
230 private void processIgmpQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
231
232 DeviceId deviceId = cp.deviceId();
233 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
234
235 if (maxResp >= 128) {
236 int mant = maxResp & 0xf;
237 int exp = (maxResp >> 4) & 0x7;
238 maxResp = (mant | 0x10) << (exp + 3);
239 }
240
241 maxResp = (maxResp + 5) / 10;
242
243 if (gAddr != null && !gAddr.isZero()) {
244 StateMachine.specialQuery(deviceId, gAddr, maxResp);
245 } else {
246 StateMachine.generalQuery(deviceId, maxResp);
247 }
248
249 }
250
251 private Ip4Address ssmTranslateRoute(IpAddress group) {
252 return ssmTranslateTable.get(group);
253 }
254
255 private void processIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
256 DeviceId deviceId = cp.deviceId();
257 PortNumber portNumber = cp.port();
258
259 Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
260 if (!groupIp.isMulticast()) {
261 log.info(groupIp.toString() + " is not a valid group address");
262 return;
263 }
264 Ip4Address srcIp = getDeviceIp(deviceId);
265
266 byte recordType = igmpGroup.getRecordType();
267 boolean join = false;
268
269 ArrayList<Ip4Address> sourceList = new ArrayList<>();
270
271 if (igmpGroup.getSources().size() > 0) {
272 igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
273 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
274 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
275 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
276 join = false;
277 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
278 recordType == IGMPMembership.MODE_IS_INCLUDE ||
279 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
280 join = true;
281 }
282 } else {
283 IpAddress src = ssmTranslateRoute(groupIp);
284 if (src == null) {
285 log.info("no ssm translate for group " + groupIp.toString());
286 return;
287 }
288 sourceList.add(src.getIp4Address());
289 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
290 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
291 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
292 join = true;
293 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
294 recordType == IGMPMembership.MODE_IS_INCLUDE ||
295 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
296 join = false;
297 }
298 }
299 String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
300 GroupMember groupMember = groupMemberMap.get(groupMemberKey);
301
302 if (join) {
303 if (groupMember == null) {
304 if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
305 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
306 } else {
307 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
308 }
309 StateMachine.join(deviceId, groupIp, srcIp);
310 groupMemberMap.put(groupMemberKey, groupMember);
311 groupMember.updateList(recordType, sourceList);
312 groupMember.getSourceList().forEach(source -> multicastService.addSink(
313 new McastRoute(source, groupIp, McastRoute.Type.IGMP), cp));
314 }
315 groupMember.resetAllTimers();
316 groupMember.updateList(recordType, sourceList);
317 groupMember.setLeave(false);
318 } else {
319 if (groupMember == null) {
320 log.info("receive leave but no instance, group " + groupIp.toString() +
321 " device:" + deviceId.toString() + " port:" + portNumber.toString());
322 return;
323 } else {
324 groupMember.setLeave(true);
325 if (fastLeave) {
326 leaveAction(groupMember);
327 } else {
328 sendQuery(groupMember);
329 }
330 }
331 }
332 }
333
334 private void leaveAction(GroupMember groupMember) {
335 ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
336 StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
337 groupMember.getSourceList().forEach(source -> multicastService.removeSink(
338 new McastRoute(source, groupMember.getGroupIp(),
339 McastRoute.Type.IGMP), cp));
340 groupMemberMap.remove(groupMember.getId());
341 }
342
343 private void sendQuery(GroupMember groupMember) {
344 Ethernet ethpkt;
345 Ip4Address srcIp = getDeviceIp(groupMember.getDeviceId());
346 if (groupMember.getv2()) {
347 ethpkt = IgmpSender.getInstance().buildIgmpV2Query(groupMember.getGroupIp(), srcIp);
348 } else {
349 ethpkt = IgmpSender.getInstance().buildIgmpV3Query(groupMember.getGroupIp(), srcIp);
350 }
351 IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
352 }
353
354 private void processFilterObjective(DeviceId devId, Port port, boolean remove) {
355
356 //TODO migrate to packet requests when packet service uses filtering objectives
357 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
358
359 builder = remove ? builder.deny() : builder.permit();
360
361 FilteringObjective igmp = builder
362 .withKey(Criteria.matchInPort(port.number()))
363 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
364 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
365 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
366 .fromApp(appId)
367 .withPriority(10000)
368 .add(new ObjectiveContext() {
369 @Override
370 public void onSuccess(Objective objective) {
371 log.info("IgmpProxy filter for {} on {} installed.",
372 devId, port);
373 }
374
375 @Override
376 public void onError(Objective objective, ObjectiveError error) {
377 log.info("IgmpProxy filter for {} on {} failed because {}.",
378 devId, port, error);
379 }
380 });
381
382 flowObjectiveService.filter(devId, igmp);
383 }
384
385 /**
386 * Packet processor responsible for forwarding packets along their paths.
387 */
388 private class IgmpPacketProcessor implements PacketProcessor {
389 @Override
390 public void process(PacketContext context) {
391
392 try {
393 InboundPacket pkt = context.inPacket();
394 Ethernet ethPkt = pkt.parsed();
395 if (ethPkt == null) {
396 return;
397 }
398
399 if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
400 return;
401 }
402
403 IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
404
405 if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
406 return;
407 }
408
409 short vlan = ethPkt.getVlanID();
410 DeviceId deviceId = pkt.receivedFrom().deviceId();
411
412 if (oltData.get(deviceId) == null) {
413 log.error("Device not registered in netcfg :" + deviceId.toString());
414 return;
415 }
416
417 IGMP igmp = (IGMP) ipv4Pkt.getPayload();
418 switch (igmp.getIgmpType()) {
419 case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
420 //Discard Query from OLT’s non-uplink port’s
421 if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
422 log.info("IGMP Picked up query from non-uplink port");
423 return;
424 }
425
426 processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
427 0xff & igmp.getMaxRespField());
428 break;
429 case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
430 log.debug("IGMP version 1 message types are not currently supported.");
431 break;
432 case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
433 case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
434 case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
435 //Discard join/leave from OLT’s uplink port’s
436 if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
437 log.info("IGMP Picked up join/leave from the olt uplink port");
438 return;
439 }
440
441 Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
442 while (itr.hasNext()) {
443 IGMPGroup group = itr.next();
444 if (group instanceof IGMPMembership) {
445 processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
446 pkt.receivedFrom(), igmp.getIgmpType());
447 } else if (group instanceof IGMPQuery) {
448 IGMPMembership mgroup;
449 mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
450 mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
451 IGMPMembership.MODE_IS_EXCLUDE : IGMPMembership.MODE_IS_INCLUDE);
452 processIgmpReport(mgroup, VlanId.vlanId(vlan),
453 pkt.receivedFrom(), igmp.getIgmpType());
454 }
455 }
456 break;
457
458 default:
459 log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
460 break;
461 }
462
463 } catch (Exception ex) {
464 log.error("igmp process error : " + ex.toString());
465 ex.printStackTrace();
466 }
467 }
468 }
469
470 private class IgmpProxyTimerTask extends TimerTask {
471 public void run() {
472 try {
473 IgmpTimer.timeOut1s();
474 queryMembers();
475 } catch (Exception ex) {
476 log.warn("Igmp timer task error : {}", ex.getMessage());
477 }
478 }
479
480 private void queryMembers() {
481 GroupMember groupMember;
482 Set groupMemberSet = groupMemberMap.entrySet();
483 Iterator itr = groupMemberSet.iterator();
484 while (itr.hasNext()) {
485 Map.Entry entry = (Map.Entry) itr.next();
486 groupMember = (GroupMember) entry.getValue();
487 DeviceId did = groupMember.getDeviceId();
488 if (mastershipService.isLocalMaster(did)) {
489 if (groupMember.isLeave()) {
490 lastQuery(groupMember);
491 } else if (periodicQuery) {
492 periodicQuery(groupMember);
493 }
494 }
495 }
496 }
497
498 private void lastQuery(GroupMember groupMember) {
499 if (groupMember.getLastQueryInterval() < lastQueryInterval) {
500 groupMember.lastQueryInterval(true); // count times
501 } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
502 sendQuery(groupMember);
503 groupMember.lastQueryInterval(false); // reset count number
504 groupMember.lastQueryCount(true); //count times
505 } else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
506 leaveAction(groupMember);
507 }
508 }
509
510 private void periodicQuery(GroupMember groupMember) {
511 if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
512 groupMember.keepAliveInterval(true);
513 } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
514 sendQuery(groupMember);
515 groupMember.keepAliveInterval(false);
516 groupMember.keepAliveQueryCount(true);
517 } else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
518 leaveAction(groupMember);
519 }
520 }
521
522 }
523
524 public static PortNumber getDeviceUplink(DeviceId devId) {
525 return oltData.get(devId).uplink();
526 }
527
528 private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
529 //TODO migrate to packet requests when packet service uses filtering objectives
530 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
531
532 builder = remove ? builder.deny() : builder.permit();
533
534 FilteringObjective igmp = builder
535 .withKey(Criteria.matchInPort(port))
536 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
537 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
538 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
539 .fromApp(appId)
540 .withPriority(10000)
541 .add(new ObjectiveContext() {
542 @Override
543 public void onSuccess(Objective objective) {
544 log.info("IgmpProxy filter for {} on {} installed.",
545 devId, port);
546 }
547
548 @Override
549 public void onError(Objective objective, ObjectiveError error) {
550 log.info("IgmpProxy filter for {} on {} failed because {}.",
551 devId, port, error);
552 }
553 });
554
555 flowObjectiveService.filter(devId, igmp);
556 }
557
558 private boolean isConnectPoint(DeviceId device, PortNumber port) {
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530559 if (connectPoint != null) {
560 return (connectPointMode && connectPoint.deviceId().equals(device)
561 && connectPoint.port().equals(port));
562 } else {
563 log.info("connectPoint not configured for device {}", device);
564 return false;
565 }
ke han81a38b92017-03-10 18:41:44 +0800566 }
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530567
ke han81a38b92017-03-10 18:41:44 +0800568 private boolean isUplink(DeviceId device, PortNumber port) {
569 return ((!connectPointMode) && oltData.containsKey(device)
570 && oltData.get(device).uplink().equals(port));
571 }
572
573 private class InternalDeviceListener implements DeviceListener {
574 @Override
575 public void event(DeviceEvent event) {
576 DeviceId devId = event.subject().id();
577 PortNumber port;
578 if (oltData.get(devId) == null) {
579 return;
580 }
581 switch (event.type()) {
582
583 case DEVICE_ADDED:
584 case DEVICE_UPDATED:
585 case DEVICE_REMOVED:
586 case DEVICE_SUSPENDED:
587 case DEVICE_AVAILABILITY_CHANGED:
588 case PORT_STATS_UPDATED:
589 break;
590 case PORT_ADDED:
591 port = event.port().number();
592 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
593 processFilterObjective(devId, port, false);
594 } else if (isUplink(devId, port)) {
595 provisionUplinkFlows();
596 } else if (isConnectPoint(devId, port)) {
597 provisionConnectPointFlows();
598 }
599 break;
600 case PORT_UPDATED:
601 port = event.port().number();
602 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
603 if (event.port().isEnabled()) {
604 processFilterObjective(devId, port, false);
605 } else {
606 processFilterObjective(devId, port, true);
607 }
608 } else if (isUplink(devId, port)) {
609 if (event.port().isEnabled()) {
610 provisionUplinkFlows(devId);
611 } else {
612 processFilterObjective(devId, port, true);
613 }
614 } else if (isConnectPoint(devId, port)) {
615 if (event.port().isEnabled()) {
616 provisionConnectPointFlows();
617 } else {
618 unprovisionConnectPointFlows();
619 }
620 }
621 break;
622 case PORT_REMOVED:
623 port = event.port().number();
624 processFilterObjective(devId, port, true);
625 break;
626 default:
627 log.info("Unknown device event {}", event.type());
628 break;
629 }
630 }
631
632 @Override
633 public boolean isRelevant(DeviceEvent event) {
634 return true;
635 }
636 }
637
638 private class InternalNetworkConfigListener implements NetworkConfigListener {
639
640 private void reconfigureNetwork(IgmpproxyConfig cfg) {
641 IgmpproxyConfig newCfg;
642 if (cfg == null) {
643 newCfg = new IgmpproxyConfig();
644 } else {
645 newCfg = cfg;
646 }
647
648 unSolicitedTimeout = newCfg.unsolicitedTimeOut();
649 maxResp = newCfg.maxResp();
650 keepAliveInterval = newCfg.keepAliveInterval();
651 keepAliveCount = newCfg.keepAliveCount();
652 lastQueryInterval = newCfg.lastQueryInterval();
653 lastQueryCount = newCfg.lastQueryCount();
654 withRAUplink = newCfg.withRAUplink();
655 withRADownlink = newCfg.withRADownlink();
656 igmpCos = newCfg.igmpCos();
657 periodicQuery = newCfg.periodicQuery();
658 fastLeave = newCfg.fastLeave();
659
660 connectPoint = newCfg.connectPoint();
661 if (connectPointMode != newCfg.connectPointMode()) {
662 connectPointMode = newCfg.connectPointMode();
663 if (connectPointMode) {
664 unprovisionUplinkFlows();
665 provisionConnectPointFlows();
666 } else {
667 unprovisionConnectPointFlows();
668 provisionUplinkFlows();
669 }
670 }
671 if (connectPoint != null) {
672 log.info("connect point :" + connectPoint.toString());
673 }
674 log.info(" mode: " + connectPointMode);
675
676 IgmpSender.getInstance().setIgmpCos(igmpCos);
677 IgmpSender.getInstance().setMaxResp(maxResp);
678 IgmpSender.getInstance().setMvlan(mvlan);
679 IgmpSender.getInstance().setWithRADownlink(withRADownlink);
680 IgmpSender.getInstance().setWithRAUplink(withRAUplink);
681
682 }
683
684 public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
685 if (cfg == null) {
686 return;
687 }
688 Collection<McastRoute> translations = cfg.getSsmTranslations();
689 for (McastRoute route : translations) {
690 ssmTranslateTable.put(route.group().getIp4Address(), route.source().getIp4Address());
691 }
692 }
693
694 @Override
695 public void event(NetworkConfigEvent event) {
696 switch (event.type()) {
697 case CONFIG_ADDED:
698 case CONFIG_UPDATED:
699 if (event.configClass().equals(CONFIG_CLASS)) {
700 AccessDeviceConfig config =
701 networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
702 if (config != null) {
703 oltData.put(config.getAccessDevice().deviceId(), config.getAccessDevice());
704 provisionDefaultFlows((DeviceId) event.subject());
705 provisionUplinkFlows((DeviceId) event.subject());
706 }
707 }
708
709 if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
710 IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
711 if (config != null) {
712 reconfigureNetwork(config);
713 }
714 }
715
716 if (event.configClass().equals(IGMPPROXY_SSM_CONFIG_CLASS)) {
717 IgmpproxySsmTranslateConfig config = networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS);
718 if (config != null) {
719 reconfigureSsmTable(config);
720 }
721 }
722
723 if (event.configClass().equals(MCAST_CONFIG_CLASS)) {
724 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
725 if (config != null && mvlan != config.egressVlan().toShort()) {
726 mvlan = config.egressVlan().toShort();
727 groupMemberMap.values().forEach(m -> leaveAction(m));
728 }
729 }
730
731 log.info("Reconfigured");
732 break;
733 case CONFIG_REGISTERED:
734 case CONFIG_UNREGISTERED:
735 break;
736 case CONFIG_REMOVED:
737 if (event.configClass().equals(CONFIG_CLASS)) {
738 oltData.remove(event.subject());
739 }
740
741 default:
742 break;
743 }
744 }
745 }
746
747 private void provisionDefaultFlows(DeviceId deviceId) {
748 List<Port> ports = deviceService.getPorts(deviceId);
749 ports.stream()
750 .filter(p -> (!oltData.get(p.element().id()).uplink().equals(p.number()) && p.isEnabled()))
751 .forEach(p -> processFilterObjective((DeviceId) p.element().id(), p.number(), false));
752 }
753
754 private void provisionUplinkFlows(DeviceId deviceId) {
755 if (connectPointMode) {
756 return;
757 }
758
759 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), false);
760 }
761
762 private void provisionUplinkFlows() {
763 if (connectPointMode) {
764 return;
765 }
766
767 oltData.keySet().forEach(deviceId ->provisionUplinkFlows(deviceId));
768 }
769 private void unprovisionUplinkFlows() {
770 oltData.keySet().forEach(deviceId ->
771 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), true));
772 }
773
774 private void provisionConnectPointFlows() {
775 if ((!connectPointMode) || connectPoint == null) {
776 return;
777 }
778
779 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
780 }
781 private void unprovisionConnectPointFlows() {
782 if (connectPoint == null) {
783 return;
784 }
785 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
786 }
787}