blob: cc3ab9838197cac81d0e78c78d671b913a75186b [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
ke han81a38b92017-03-10 18:41:44 +080016package org.opencord.igmpproxy;
17
18import com.google.common.collect.Maps;
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.onlab.packet.EthType;
25import org.onlab.packet.Ethernet;
26import org.onlab.packet.IGMP;
27import org.onlab.packet.IGMPGroup;
28import org.onlab.packet.IGMPMembership;
29import org.onlab.packet.IGMPQuery;
30import org.onlab.packet.IPv4;
31import org.onlab.packet.Ip4Address;
32import org.onlab.packet.IpAddress;
33import org.onlab.packet.VlanId;
34import org.onosproject.core.ApplicationId;
35import org.onosproject.core.CoreService;
36import org.onosproject.incubator.net.config.basics.McastConfig;
37import org.onosproject.mastership.MastershipService;
38import org.onosproject.net.AnnotationKeys;
39import org.onosproject.net.ConnectPoint;
40import org.onosproject.net.DeviceId;
41import org.onosproject.net.Port;
42import org.onosproject.net.PortNumber;
43import org.onosproject.net.config.ConfigFactory;
44import org.onosproject.net.config.NetworkConfigEvent;
45import org.onosproject.net.config.NetworkConfigListener;
46import org.onosproject.net.config.NetworkConfigRegistry;
47import org.onosproject.net.config.basics.SubjectFactories;
48import org.onosproject.net.device.DeviceEvent;
49import org.onosproject.net.device.DeviceListener;
50import org.onosproject.net.device.DeviceService;
51import org.onosproject.net.flow.DefaultTrafficTreatment;
52import org.onosproject.net.flow.FlowRuleService;
53import org.onosproject.net.flow.criteria.Criteria;
54import org.onosproject.net.flowobjective.DefaultFilteringObjective;
55import org.onosproject.net.flowobjective.FilteringObjective;
56import org.onosproject.net.flowobjective.FlowObjectiveService;
57import org.onosproject.net.flowobjective.Objective;
58import org.onosproject.net.flowobjective.ObjectiveContext;
59import org.onosproject.net.flowobjective.ObjectiveError;
60import org.onosproject.net.mcast.McastRoute;
61import org.onosproject.net.mcast.MulticastRouteService;
62import org.onosproject.net.packet.InboundPacket;
63import org.onosproject.net.packet.PacketContext;
64import org.onosproject.net.packet.PacketProcessor;
65import org.onosproject.net.packet.PacketService;
66import org.opencord.cordconfig.access.AccessDeviceConfig;
67import org.opencord.cordconfig.access.AccessDeviceData;
68import org.slf4j.Logger;
69import org.slf4j.LoggerFactory;
70
71import java.util.ArrayList;
72import java.util.Collection;
73import java.util.Iterator;
74import java.util.List;
75import java.util.Map;
76import java.util.Set;
77import java.util.TimerTask;
78import java.util.concurrent.ConcurrentHashMap;
79import java.util.concurrent.Executors;
80import java.util.concurrent.ScheduledExecutorService;
81import java.util.concurrent.TimeUnit;
82
/**
 * IGMP processing application running in proxy mode. It supports first join / last leave,
 * fast leave, periodic query with keep-alive, and packet-out of IGMP messages to the uplink port.
 */
87@Component(immediate = true)
88public class IgmpManager {
89
90 private static final Class<AccessDeviceConfig> CONFIG_CLASS =
91 AccessDeviceConfig.class;
92 private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
93 IgmpproxyConfig.class;
94 private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
95 IgmpproxySsmTranslateConfig.class;
96 private static final Class<McastConfig> MCAST_CONFIG_CLASS =
97 McastConfig.class;
98 public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
99 private static ApplicationId appId;
100 private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
101 private static int unSolicitedTimeout = 3; // unit is 1 sec
102 private static int keepAliveCount = 3;
103 private static int lastQueryInterval = 2; //unit is 1 sec
104 private static int lastQueryCount = 2;
105 private static boolean fastLeave = true;
106 private static boolean withRAUplink = true;
107 private static boolean withRADownlink = false;
108 private static boolean periodicQuery = true;
109 private static short mvlan = 4000;
110 private static byte igmpCos = 7;
111 public static boolean connectPointMode = true;
112 public static ConnectPoint connectPoint = null;
113
ke han29af27b2017-09-08 10:29:12 +0800114 private static boolean pimSSmInterworking = false;
115 private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
ke han81a38b92017-03-10 18:41:44 +0800116 private final ScheduledExecutorService scheduledExecutorService =
117 Executors.newScheduledThreadPool(1);
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected CoreService coreService;
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected PacketService packetService;
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected MastershipService mastershipService;
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected FlowRuleService flowRuleService;
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
127 protected DeviceService deviceService;
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected FlowObjectiveService flowObjectiveService;
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected NetworkConfigRegistry networkConfig;
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected MulticastRouteService multicastService;
134 private IgmpPacketProcessor processor = new IgmpPacketProcessor();
135 private Logger log = LoggerFactory.getLogger(getClass());
136 private ApplicationId coreAppId;
137 private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
138 private InternalNetworkConfigListener configListener =
139 new InternalNetworkConfigListener();
140 private DeviceListener deviceListener = new InternalDeviceListener();
141 private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory = null;
142 private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
143 new ConfigFactory<ApplicationId, IgmpproxyConfig>(
144 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
145 @Override
146 public IgmpproxyConfig createConfig() {
147 return new IgmpproxyConfig();
148 }
149 };
150 private ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
151 new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
152 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
153 @Override
154 public IgmpproxySsmTranslateConfig createConfig() {
155 return new IgmpproxySsmTranslateConfig();
156 }
157 };
158 private int maxResp = 10; //unit is 1 sec
159 private int keepAliveInterval = 120; //unit is 1 sec
160
161 public static int getUnsolicitedTimeout() {
162 return unSolicitedTimeout;
163 }
164
165 @Activate
166 protected void activate() {
167 appId = coreService.registerApplication("org.opencord.igmpproxy");
168 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
169 packetService.addProcessor(processor, PacketProcessor.director(4));
170 IgmpSender.init(packetService, mastershipService);
171
172 if (networkConfig.getConfigFactory(CONFIG_CLASS) == null) {
173 configFactory =
174 new ConfigFactory<DeviceId, AccessDeviceConfig>(
175 SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
176 @Override
177 public AccessDeviceConfig createConfig() {
178 return new AccessDeviceConfig();
179 }
180 };
181 networkConfig.registerConfigFactory(configFactory);
182 }
183 networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
184 networkConfig.registerConfigFactory(igmpproxyConfigFactory);
185 networkConfig.addListener(configListener);
186
187 configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
188 configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
189
190 networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
191 subject -> {
192 AccessDeviceConfig config = networkConfig.getConfig(subject,
193 AccessDeviceConfig.class);
194 if (config != null) {
195 AccessDeviceData data = config.getAccessDevice();
196 oltData.put(data.deviceId(), data);
197 }
198 }
199 );
200
David K. Bainbridge4809c0e2017-08-17 09:54:40 -0700201 oltData.keySet().forEach(d -> provisionDefaultFlows(d));
ke han81a38b92017-03-10 18:41:44 +0800202 if (connectPointMode) {
203 provisionConnectPointFlows();
204 } else {
205 provisionUplinkFlows();
206 }
207
208 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
209 if (config != null) {
210 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530211 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800212 }
213 deviceService.addListener(deviceListener);
214 scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 0, 1000, TimeUnit.MILLISECONDS);
215
216 log.info("Started");
217 }
218
219 @Deactivate
220 protected void deactivate() {
221 scheduledExecutorService.shutdown();
222
223 // de-register and null our handler
224 networkConfig.removeListener(configListener);
225 if (configFactory != null) {
226 networkConfig.unregisterConfigFactory(configFactory);
227 }
228 networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
229 networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
230 deviceService.removeListener(deviceListener);
231 packetService.removeProcessor(processor);
232 flowRuleService.removeFlowRulesById(appId);
233
234 log.info("Stopped");
235 }
236
237 protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
238 try {
239 String[] mgmtAddress = deviceService.getDevice(ofDeviceId)
240 .annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":");
241 return Ip4Address.valueOf(mgmtAddress[0]);
242 } catch (Exception ex) {
243 log.info("No valid Ipaddress for " + ofDeviceId.toString());
244 return null;
245 }
246 }
247
248 private void processIgmpQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
249
250 DeviceId deviceId = cp.deviceId();
251 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000252 maxResp = calculateMaxResp(maxResp);
253 if (gAddr != null && !gAddr.isZero()) {
254 StateMachine.specialQuery(deviceId, gAddr, maxResp);
255 } else {
256 StateMachine.generalQuery(deviceId, maxResp);
257 }
258 }
ke han81a38b92017-03-10 18:41:44 +0800259
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000260 private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
261
262 DeviceId deviceId = cp.deviceId();
263 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
264 maxResp = calculateMaxResp(maxResp);
265 //The query is received on the ConnectPoint
266 // send query accordingly to the registered OLT devices.
267 if (gAddr != null && !gAddr.isZero()) {
268 for (DeviceId devId : oltData.keySet()) {
269 StateMachine.specialQuery(devId, gAddr, maxResp);
270 }
271 } else {
272 //Don't know which group is targeted by the query
273 //So query all the members(in all the OLTs) and proxy their reports
274 StateMachine.generalQuery(maxResp);
275 }
276 }
277
278
279 private int calculateMaxResp(int maxResp) {
ke han81a38b92017-03-10 18:41:44 +0800280 if (maxResp >= 128) {
281 int mant = maxResp & 0xf;
282 int exp = (maxResp >> 4) & 0x7;
283 maxResp = (mant | 0x10) << (exp + 3);
284 }
285
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000286 return (maxResp + 5) / 10;
ke han81a38b92017-03-10 18:41:44 +0800287 }
288
289 private Ip4Address ssmTranslateRoute(IpAddress group) {
290 return ssmTranslateTable.get(group);
291 }
292
293 private void processIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
294 DeviceId deviceId = cp.deviceId();
295 PortNumber portNumber = cp.port();
296
297 Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
298 if (!groupIp.isMulticast()) {
299 log.info(groupIp.toString() + " is not a valid group address");
300 return;
301 }
302 Ip4Address srcIp = getDeviceIp(deviceId);
303
304 byte recordType = igmpGroup.getRecordType();
305 boolean join = false;
306
307 ArrayList<Ip4Address> sourceList = new ArrayList<>();
308
309 if (igmpGroup.getSources().size() > 0) {
310 igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
311 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
312 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
313 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
314 join = false;
315 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
316 recordType == IGMPMembership.MODE_IS_INCLUDE ||
317 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
318 join = true;
319 }
320 } else {
ke han29af27b2017-09-08 10:29:12 +0800321 IpAddress src = null;
322 if (pimSSmInterworking) {
323 src = ssmTranslateRoute(groupIp);
324 if (src == null) {
325 log.info("no ssm translate for group " + groupIp.toString());
326 return;
327 }
328 } else {
329 src = IpAddress.valueOf(DEFAULT_PIMSSM_HOST);
ke han81a38b92017-03-10 18:41:44 +0800330 }
331 sourceList.add(src.getIp4Address());
332 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
333 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
334 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
335 join = true;
336 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
337 recordType == IGMPMembership.MODE_IS_INCLUDE ||
338 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
339 join = false;
340 }
341 }
342 String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
343 GroupMember groupMember = groupMemberMap.get(groupMemberKey);
344
345 if (join) {
346 if (groupMember == null) {
347 if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
348 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
349 } else {
350 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
351 }
352 StateMachine.join(deviceId, groupIp, srcIp);
353 groupMemberMap.put(groupMemberKey, groupMember);
354 groupMember.updateList(recordType, sourceList);
355 groupMember.getSourceList().forEach(source -> multicastService.addSink(
356 new McastRoute(source, groupIp, McastRoute.Type.IGMP), cp));
357 }
358 groupMember.resetAllTimers();
359 groupMember.updateList(recordType, sourceList);
360 groupMember.setLeave(false);
361 } else {
362 if (groupMember == null) {
363 log.info("receive leave but no instance, group " + groupIp.toString() +
364 " device:" + deviceId.toString() + " port:" + portNumber.toString());
365 return;
366 } else {
367 groupMember.setLeave(true);
368 if (fastLeave) {
369 leaveAction(groupMember);
370 } else {
371 sendQuery(groupMember);
372 }
373 }
374 }
375 }
376
377 private void leaveAction(GroupMember groupMember) {
378 ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
379 StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
380 groupMember.getSourceList().forEach(source -> multicastService.removeSink(
381 new McastRoute(source, groupMember.getGroupIp(),
382 McastRoute.Type.IGMP), cp));
383 groupMemberMap.remove(groupMember.getId());
384 }
385
386 private void sendQuery(GroupMember groupMember) {
387 Ethernet ethpkt;
388 Ip4Address srcIp = getDeviceIp(groupMember.getDeviceId());
389 if (groupMember.getv2()) {
390 ethpkt = IgmpSender.getInstance().buildIgmpV2Query(groupMember.getGroupIp(), srcIp);
391 } else {
392 ethpkt = IgmpSender.getInstance().buildIgmpV3Query(groupMember.getGroupIp(), srcIp);
393 }
394 IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
395 }
396
397 private void processFilterObjective(DeviceId devId, Port port, boolean remove) {
398
399 //TODO migrate to packet requests when packet service uses filtering objectives
400 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
401
402 builder = remove ? builder.deny() : builder.permit();
403
404 FilteringObjective igmp = builder
405 .withKey(Criteria.matchInPort(port.number()))
406 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
407 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
408 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
409 .fromApp(appId)
410 .withPriority(10000)
411 .add(new ObjectiveContext() {
412 @Override
413 public void onSuccess(Objective objective) {
414 log.info("IgmpProxy filter for {} on {} installed.",
415 devId, port);
416 }
417
418 @Override
419 public void onError(Objective objective, ObjectiveError error) {
420 log.info("IgmpProxy filter for {} on {} failed because {}.",
421 devId, port, error);
422 }
423 });
424
425 flowObjectiveService.filter(devId, igmp);
426 }
427
428 /**
429 * Packet processor responsible for forwarding packets along their paths.
430 */
431 private class IgmpPacketProcessor implements PacketProcessor {
432 @Override
433 public void process(PacketContext context) {
434
435 try {
436 InboundPacket pkt = context.inPacket();
437 Ethernet ethPkt = pkt.parsed();
438 if (ethPkt == null) {
439 return;
440 }
441
442 if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
443 return;
444 }
445
446 IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
447
448 if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
449 return;
450 }
451
452 short vlan = ethPkt.getVlanID();
453 DeviceId deviceId = pkt.receivedFrom().deviceId();
454
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000455 if (oltData.get(deviceId) == null &&
456 !(connectPointMode && deviceId.equals(connectPoint.deviceId()))) {
ke han81a38b92017-03-10 18:41:44 +0800457 log.error("Device not registered in netcfg :" + deviceId.toString());
458 return;
459 }
460
461 IGMP igmp = (IGMP) ipv4Pkt.getPayload();
462 switch (igmp.getIgmpType()) {
463 case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
464 //Discard Query from OLT’s non-uplink port’s
465 if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000466 if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
467 log.info("IGMP Picked up query from connectPoint");
468 //OK to process packet
469 processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
470 0xff & igmp.getMaxRespField());
471 break;
472 } else {
473 //Not OK to process packet
474 log.warn("IGMP Picked up query from non-uplink port");
475 return;
476 }
ke han81a38b92017-03-10 18:41:44 +0800477 }
478
479 processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
480 0xff & igmp.getMaxRespField());
481 break;
482 case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
483 log.debug("IGMP version 1 message types are not currently supported.");
484 break;
485 case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
486 case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
487 case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
488 //Discard join/leave from OLT’s uplink port’s
489 if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
490 log.info("IGMP Picked up join/leave from the olt uplink port");
491 return;
492 }
493
494 Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
495 while (itr.hasNext()) {
496 IGMPGroup group = itr.next();
497 if (group instanceof IGMPMembership) {
498 processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
499 pkt.receivedFrom(), igmp.getIgmpType());
500 } else if (group instanceof IGMPQuery) {
501 IGMPMembership mgroup;
502 mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
503 mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
504 IGMPMembership.MODE_IS_EXCLUDE : IGMPMembership.MODE_IS_INCLUDE);
505 processIgmpReport(mgroup, VlanId.vlanId(vlan),
506 pkt.receivedFrom(), igmp.getIgmpType());
507 }
508 }
509 break;
510
511 default:
512 log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
513 break;
514 }
515
516 } catch (Exception ex) {
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000517 log.error("igmp process error : {} ", ex);
ke han81a38b92017-03-10 18:41:44 +0800518 ex.printStackTrace();
519 }
520 }
521 }
522
523 private class IgmpProxyTimerTask extends TimerTask {
524 public void run() {
525 try {
526 IgmpTimer.timeOut1s();
527 queryMembers();
528 } catch (Exception ex) {
529 log.warn("Igmp timer task error : {}", ex.getMessage());
530 }
531 }
532
533 private void queryMembers() {
534 GroupMember groupMember;
535 Set groupMemberSet = groupMemberMap.entrySet();
536 Iterator itr = groupMemberSet.iterator();
537 while (itr.hasNext()) {
538 Map.Entry entry = (Map.Entry) itr.next();
539 groupMember = (GroupMember) entry.getValue();
540 DeviceId did = groupMember.getDeviceId();
541 if (mastershipService.isLocalMaster(did)) {
542 if (groupMember.isLeave()) {
543 lastQuery(groupMember);
544 } else if (periodicQuery) {
545 periodicQuery(groupMember);
546 }
547 }
548 }
549 }
550
551 private void lastQuery(GroupMember groupMember) {
552 if (groupMember.getLastQueryInterval() < lastQueryInterval) {
553 groupMember.lastQueryInterval(true); // count times
554 } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
555 sendQuery(groupMember);
556 groupMember.lastQueryInterval(false); // reset count number
557 groupMember.lastQueryCount(true); //count times
558 } else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
559 leaveAction(groupMember);
560 }
561 }
562
563 private void periodicQuery(GroupMember groupMember) {
564 if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
565 groupMember.keepAliveInterval(true);
566 } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
567 sendQuery(groupMember);
568 groupMember.keepAliveInterval(false);
569 groupMember.keepAliveQueryCount(true);
570 } else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
571 leaveAction(groupMember);
572 }
573 }
574
575 }
576
577 public static PortNumber getDeviceUplink(DeviceId devId) {
Deepa Vaddireddybcd52352017-09-21 05:04:48 +0000578 if (oltData.get(devId) != null) {
579 return oltData.get(devId).uplink();
580 } else {
581 return null;
582 }
ke han81a38b92017-03-10 18:41:44 +0800583 }
584
585 private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
586 //TODO migrate to packet requests when packet service uses filtering objectives
587 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
588
589 builder = remove ? builder.deny() : builder.permit();
590
591 FilteringObjective igmp = builder
592 .withKey(Criteria.matchInPort(port))
593 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
594 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
595 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
596 .fromApp(appId)
597 .withPriority(10000)
598 .add(new ObjectiveContext() {
599 @Override
600 public void onSuccess(Objective objective) {
601 log.info("IgmpProxy filter for {} on {} installed.",
602 devId, port);
603 }
604
605 @Override
606 public void onError(Objective objective, ObjectiveError error) {
607 log.info("IgmpProxy filter for {} on {} failed because {}.",
608 devId, port, error);
609 }
610 });
611
612 flowObjectiveService.filter(devId, igmp);
613 }
614
615 private boolean isConnectPoint(DeviceId device, PortNumber port) {
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530616 if (connectPoint != null) {
617 return (connectPointMode && connectPoint.deviceId().equals(device)
618 && connectPoint.port().equals(port));
619 } else {
620 log.info("connectPoint not configured for device {}", device);
621 return false;
622 }
ke han81a38b92017-03-10 18:41:44 +0800623 }
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530624
ke han81a38b92017-03-10 18:41:44 +0800625 private boolean isUplink(DeviceId device, PortNumber port) {
626 return ((!connectPointMode) && oltData.containsKey(device)
627 && oltData.get(device).uplink().equals(port));
628 }
629
630 private class InternalDeviceListener implements DeviceListener {
631 @Override
632 public void event(DeviceEvent event) {
633 DeviceId devId = event.subject().id();
634 PortNumber port;
635 if (oltData.get(devId) == null) {
636 return;
637 }
638 switch (event.type()) {
639
640 case DEVICE_ADDED:
641 case DEVICE_UPDATED:
642 case DEVICE_REMOVED:
643 case DEVICE_SUSPENDED:
644 case DEVICE_AVAILABILITY_CHANGED:
645 case PORT_STATS_UPDATED:
646 break;
647 case PORT_ADDED:
648 port = event.port().number();
649 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
650 processFilterObjective(devId, port, false);
651 } else if (isUplink(devId, port)) {
652 provisionUplinkFlows();
653 } else if (isConnectPoint(devId, port)) {
654 provisionConnectPointFlows();
655 }
656 break;
657 case PORT_UPDATED:
658 port = event.port().number();
659 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
660 if (event.port().isEnabled()) {
661 processFilterObjective(devId, port, false);
662 } else {
663 processFilterObjective(devId, port, true);
664 }
665 } else if (isUplink(devId, port)) {
666 if (event.port().isEnabled()) {
667 provisionUplinkFlows(devId);
668 } else {
669 processFilterObjective(devId, port, true);
670 }
671 } else if (isConnectPoint(devId, port)) {
672 if (event.port().isEnabled()) {
673 provisionConnectPointFlows();
674 } else {
675 unprovisionConnectPointFlows();
676 }
677 }
678 break;
679 case PORT_REMOVED:
680 port = event.port().number();
681 processFilterObjective(devId, port, true);
682 break;
683 default:
684 log.info("Unknown device event {}", event.type());
685 break;
686 }
687 }
688
689 @Override
690 public boolean isRelevant(DeviceEvent event) {
691 return true;
692 }
693 }
694
695 private class InternalNetworkConfigListener implements NetworkConfigListener {
696
697 private void reconfigureNetwork(IgmpproxyConfig cfg) {
698 IgmpproxyConfig newCfg;
699 if (cfg == null) {
700 newCfg = new IgmpproxyConfig();
701 } else {
702 newCfg = cfg;
703 }
704
705 unSolicitedTimeout = newCfg.unsolicitedTimeOut();
706 maxResp = newCfg.maxResp();
707 keepAliveInterval = newCfg.keepAliveInterval();
708 keepAliveCount = newCfg.keepAliveCount();
709 lastQueryInterval = newCfg.lastQueryInterval();
710 lastQueryCount = newCfg.lastQueryCount();
711 withRAUplink = newCfg.withRAUplink();
712 withRADownlink = newCfg.withRADownlink();
713 igmpCos = newCfg.igmpCos();
714 periodicQuery = newCfg.periodicQuery();
715 fastLeave = newCfg.fastLeave();
ke han29af27b2017-09-08 10:29:12 +0800716 pimSSmInterworking = newCfg.pimSsmInterworking();
ke han81a38b92017-03-10 18:41:44 +0800717 connectPoint = newCfg.connectPoint();
718 if (connectPointMode != newCfg.connectPointMode()) {
719 connectPointMode = newCfg.connectPointMode();
720 if (connectPointMode) {
721 unprovisionUplinkFlows();
722 provisionConnectPointFlows();
723 } else {
724 unprovisionConnectPointFlows();
725 provisionUplinkFlows();
726 }
727 }
728 if (connectPoint != null) {
729 log.info("connect point :" + connectPoint.toString());
730 }
731 log.info(" mode: " + connectPointMode);
732
733 IgmpSender.getInstance().setIgmpCos(igmpCos);
734 IgmpSender.getInstance().setMaxResp(maxResp);
735 IgmpSender.getInstance().setMvlan(mvlan);
736 IgmpSender.getInstance().setWithRADownlink(withRADownlink);
737 IgmpSender.getInstance().setWithRAUplink(withRAUplink);
738
739 }
740
741 public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
742 if (cfg == null) {
743 return;
744 }
745 Collection<McastRoute> translations = cfg.getSsmTranslations();
746 for (McastRoute route : translations) {
747 ssmTranslateTable.put(route.group().getIp4Address(), route.source().getIp4Address());
748 }
749 }
750
751 @Override
752 public void event(NetworkConfigEvent event) {
753 switch (event.type()) {
754 case CONFIG_ADDED:
755 case CONFIG_UPDATED:
756 if (event.configClass().equals(CONFIG_CLASS)) {
757 AccessDeviceConfig config =
758 networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
759 if (config != null) {
760 oltData.put(config.getAccessDevice().deviceId(), config.getAccessDevice());
761 provisionDefaultFlows((DeviceId) event.subject());
762 provisionUplinkFlows((DeviceId) event.subject());
763 }
764 }
765
766 if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
767 IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
768 if (config != null) {
769 reconfigureNetwork(config);
770 }
771 }
772
773 if (event.configClass().equals(IGMPPROXY_SSM_CONFIG_CLASS)) {
774 IgmpproxySsmTranslateConfig config = networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS);
775 if (config != null) {
776 reconfigureSsmTable(config);
777 }
778 }
779
780 if (event.configClass().equals(MCAST_CONFIG_CLASS)) {
781 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
782 if (config != null && mvlan != config.egressVlan().toShort()) {
783 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530784 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800785 groupMemberMap.values().forEach(m -> leaveAction(m));
786 }
787 }
788
789 log.info("Reconfigured");
790 break;
791 case CONFIG_REGISTERED:
792 case CONFIG_UNREGISTERED:
793 break;
794 case CONFIG_REMOVED:
795 if (event.configClass().equals(CONFIG_CLASS)) {
796 oltData.remove(event.subject());
797 }
798
799 default:
800 break;
801 }
802 }
803 }
804
805 private void provisionDefaultFlows(DeviceId deviceId) {
806 List<Port> ports = deviceService.getPorts(deviceId);
807 ports.stream()
808 .filter(p -> (!oltData.get(p.element().id()).uplink().equals(p.number()) && p.isEnabled()))
809 .forEach(p -> processFilterObjective((DeviceId) p.element().id(), p.number(), false));
810 }
811
812 private void provisionUplinkFlows(DeviceId deviceId) {
813 if (connectPointMode) {
814 return;
815 }
816
817 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), false);
818 }
819
820 private void provisionUplinkFlows() {
821 if (connectPointMode) {
822 return;
823 }
824
David K. Bainbridge4809c0e2017-08-17 09:54:40 -0700825 oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
ke han81a38b92017-03-10 18:41:44 +0800826 }
827 private void unprovisionUplinkFlows() {
828 oltData.keySet().forEach(deviceId ->
829 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), true));
830 }
831
832 private void provisionConnectPointFlows() {
833 if ((!connectPointMode) || connectPoint == null) {
834 return;
835 }
836
837 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
838 }
839 private void unprovisionConnectPointFlows() {
840 if (connectPoint == null) {
841 return;
842 }
843 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
844 }
845}