blob: 99a4773bd6ea334a542bcb3b0e6b7332a9dbd8d3 [file] [log] [blame]
David K. Bainbridged77028f2017-08-01 12:47:55 -07001/*
Brian O'Connor4d084702017-08-03 22:45:58 -07002 * Copyright 2017-present Open Networking Foundation
David K. Bainbridged77028f2017-08-01 12:47:55 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
ke han81a38b92017-03-10 18:41:44 +080016package org.opencord.igmpproxy;
17
18import com.google.common.collect.Maps;
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.onlab.packet.EthType;
25import org.onlab.packet.Ethernet;
26import org.onlab.packet.IGMP;
27import org.onlab.packet.IGMPGroup;
28import org.onlab.packet.IGMPMembership;
29import org.onlab.packet.IGMPQuery;
30import org.onlab.packet.IPv4;
31import org.onlab.packet.Ip4Address;
32import org.onlab.packet.IpAddress;
33import org.onlab.packet.VlanId;
34import org.onosproject.core.ApplicationId;
35import org.onosproject.core.CoreService;
36import org.onosproject.incubator.net.config.basics.McastConfig;
37import org.onosproject.mastership.MastershipService;
38import org.onosproject.net.AnnotationKeys;
39import org.onosproject.net.ConnectPoint;
40import org.onosproject.net.DeviceId;
41import org.onosproject.net.Port;
42import org.onosproject.net.PortNumber;
43import org.onosproject.net.config.ConfigFactory;
44import org.onosproject.net.config.NetworkConfigEvent;
45import org.onosproject.net.config.NetworkConfigListener;
46import org.onosproject.net.config.NetworkConfigRegistry;
47import org.onosproject.net.config.basics.SubjectFactories;
48import org.onosproject.net.device.DeviceEvent;
49import org.onosproject.net.device.DeviceListener;
50import org.onosproject.net.device.DeviceService;
51import org.onosproject.net.flow.DefaultTrafficTreatment;
52import org.onosproject.net.flow.FlowRuleService;
53import org.onosproject.net.flow.criteria.Criteria;
54import org.onosproject.net.flowobjective.DefaultFilteringObjective;
55import org.onosproject.net.flowobjective.FilteringObjective;
56import org.onosproject.net.flowobjective.FlowObjectiveService;
57import org.onosproject.net.flowobjective.Objective;
58import org.onosproject.net.flowobjective.ObjectiveContext;
59import org.onosproject.net.flowobjective.ObjectiveError;
60import org.onosproject.net.mcast.McastRoute;
61import org.onosproject.net.mcast.MulticastRouteService;
62import org.onosproject.net.packet.InboundPacket;
63import org.onosproject.net.packet.PacketContext;
64import org.onosproject.net.packet.PacketProcessor;
65import org.onosproject.net.packet.PacketService;
66import org.opencord.cordconfig.access.AccessDeviceConfig;
67import org.opencord.cordconfig.access.AccessDeviceData;
68import org.slf4j.Logger;
69import org.slf4j.LoggerFactory;
70
71import java.util.ArrayList;
72import java.util.Collection;
73import java.util.Iterator;
74import java.util.List;
75import java.util.Map;
76import java.util.Set;
77import java.util.TimerTask;
78import java.util.concurrent.ConcurrentHashMap;
79import java.util.concurrent.Executors;
80import java.util.concurrent.ScheduledExecutorService;
81import java.util.concurrent.TimeUnit;
82
/**
 * IGMP proxy application. Supports first-join / last-leave handling, fast leave,
 * periodic (keep-alive) queries, and sending IGMP messages out of the uplink port.
 */
87@Component(immediate = true)
88public class IgmpManager {
89
90 private static final Class<AccessDeviceConfig> CONFIG_CLASS =
91 AccessDeviceConfig.class;
92 private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
93 IgmpproxyConfig.class;
94 private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
95 IgmpproxySsmTranslateConfig.class;
96 private static final Class<McastConfig> MCAST_CONFIG_CLASS =
97 McastConfig.class;
98 public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
99 private static ApplicationId appId;
100 private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
101 private static int unSolicitedTimeout = 3; // unit is 1 sec
102 private static int keepAliveCount = 3;
103 private static int lastQueryInterval = 2; //unit is 1 sec
104 private static int lastQueryCount = 2;
105 private static boolean fastLeave = true;
106 private static boolean withRAUplink = true;
107 private static boolean withRADownlink = false;
108 private static boolean periodicQuery = true;
109 private static short mvlan = 4000;
110 private static byte igmpCos = 7;
111 public static boolean connectPointMode = true;
112 public static ConnectPoint connectPoint = null;
113
ke hanf9ed58c2017-09-08 10:29:12 +0800114 private static boolean pimSSmInterworking = false;
115 private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
ke han81a38b92017-03-10 18:41:44 +0800116 private final ScheduledExecutorService scheduledExecutorService =
117 Executors.newScheduledThreadPool(1);
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected CoreService coreService;
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected PacketService packetService;
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected MastershipService mastershipService;
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected FlowRuleService flowRuleService;
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
127 protected DeviceService deviceService;
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected FlowObjectiveService flowObjectiveService;
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected NetworkConfigRegistry networkConfig;
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected MulticastRouteService multicastService;
134 private IgmpPacketProcessor processor = new IgmpPacketProcessor();
135 private Logger log = LoggerFactory.getLogger(getClass());
136 private ApplicationId coreAppId;
137 private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
138 private InternalNetworkConfigListener configListener =
139 new InternalNetworkConfigListener();
140 private DeviceListener deviceListener = new InternalDeviceListener();
141 private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory = null;
142 private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
143 new ConfigFactory<ApplicationId, IgmpproxyConfig>(
144 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
145 @Override
146 public IgmpproxyConfig createConfig() {
147 return new IgmpproxyConfig();
148 }
149 };
150 private ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
151 new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
152 SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
153 @Override
154 public IgmpproxySsmTranslateConfig createConfig() {
155 return new IgmpproxySsmTranslateConfig();
156 }
157 };
158 private int maxResp = 10; //unit is 1 sec
159 private int keepAliveInterval = 120; //unit is 1 sec
160
161 public static int getUnsolicitedTimeout() {
162 return unSolicitedTimeout;
163 }
164
165 @Activate
166 protected void activate() {
167 appId = coreService.registerApplication("org.opencord.igmpproxy");
168 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
169 packetService.addProcessor(processor, PacketProcessor.director(4));
170 IgmpSender.init(packetService, mastershipService);
171
172 if (networkConfig.getConfigFactory(CONFIG_CLASS) == null) {
173 configFactory =
174 new ConfigFactory<DeviceId, AccessDeviceConfig>(
175 SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
176 @Override
177 public AccessDeviceConfig createConfig() {
178 return new AccessDeviceConfig();
179 }
180 };
181 networkConfig.registerConfigFactory(configFactory);
182 }
183 networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
184 networkConfig.registerConfigFactory(igmpproxyConfigFactory);
185 networkConfig.addListener(configListener);
186
187 configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
188 configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
189
190 networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
191 subject -> {
192 AccessDeviceConfig config = networkConfig.getConfig(subject,
193 AccessDeviceConfig.class);
194 if (config != null) {
195 AccessDeviceData data = config.getAccessDevice();
196 oltData.put(data.deviceId(), data);
197 }
198 }
199 );
200
David K. Bainbridge4809c0e2017-08-17 09:54:40 -0700201 oltData.keySet().forEach(d -> provisionDefaultFlows(d));
ke han81a38b92017-03-10 18:41:44 +0800202 if (connectPointMode) {
203 provisionConnectPointFlows();
204 } else {
205 provisionUplinkFlows();
206 }
207
208 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
209 if (config != null) {
210 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530211 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800212 }
213 deviceService.addListener(deviceListener);
214 scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 0, 1000, TimeUnit.MILLISECONDS);
215
216 log.info("Started");
217 }
218
219 @Deactivate
220 protected void deactivate() {
221 scheduledExecutorService.shutdown();
222
223 // de-register and null our handler
224 networkConfig.removeListener(configListener);
225 if (configFactory != null) {
226 networkConfig.unregisterConfigFactory(configFactory);
227 }
228 networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
229 networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
230 deviceService.removeListener(deviceListener);
231 packetService.removeProcessor(processor);
232 flowRuleService.removeFlowRulesById(appId);
233
234 log.info("Stopped");
235 }
236
237 protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
238 try {
239 String[] mgmtAddress = deviceService.getDevice(ofDeviceId)
240 .annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":");
241 return Ip4Address.valueOf(mgmtAddress[0]);
242 } catch (Exception ex) {
243 log.info("No valid Ipaddress for " + ofDeviceId.toString());
244 return null;
245 }
246 }
247
248 private void processIgmpQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
249
250 DeviceId deviceId = cp.deviceId();
251 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
Deepa Vaddireddy8467a922017-09-21 05:04:48 +0000252 maxResp = calculateMaxResp(maxResp);
253 if (gAddr != null && !gAddr.isZero()) {
254 StateMachine.specialQuery(deviceId, gAddr, maxResp);
255 } else {
256 StateMachine.generalQuery(deviceId, maxResp);
257 }
258 }
ke han81a38b92017-03-10 18:41:44 +0800259
Deepa Vaddireddy8467a922017-09-21 05:04:48 +0000260 private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
261
262 DeviceId deviceId = cp.deviceId();
263 Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
264 maxResp = calculateMaxResp(maxResp);
265 //The query is received on the ConnectPoint
266 // send query accordingly to the registered OLT devices.
267 if (gAddr != null && !gAddr.isZero()) {
268 for (DeviceId devId : oltData.keySet()) {
269 StateMachine.specialQuery(devId, gAddr, maxResp);
270 }
271 } else {
272 //Don't know which group is targeted by the query
273 //So query all the members(in all the OLTs) and proxy their reports
274 StateMachine.generalQuery(maxResp);
275 }
276 }
277
278
279 private int calculateMaxResp(int maxResp) {
ke han81a38b92017-03-10 18:41:44 +0800280 if (maxResp >= 128) {
281 int mant = maxResp & 0xf;
282 int exp = (maxResp >> 4) & 0x7;
283 maxResp = (mant | 0x10) << (exp + 3);
284 }
285
Deepa Vaddireddy8467a922017-09-21 05:04:48 +0000286 return (maxResp + 5) / 10;
ke han81a38b92017-03-10 18:41:44 +0800287 }
288
289 private Ip4Address ssmTranslateRoute(IpAddress group) {
290 return ssmTranslateTable.get(group);
291 }
292
293 private void processIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
294 DeviceId deviceId = cp.deviceId();
295 PortNumber portNumber = cp.port();
296
297 Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
298 if (!groupIp.isMulticast()) {
299 log.info(groupIp.toString() + " is not a valid group address");
300 return;
301 }
302 Ip4Address srcIp = getDeviceIp(deviceId);
303
304 byte recordType = igmpGroup.getRecordType();
305 boolean join = false;
306
307 ArrayList<Ip4Address> sourceList = new ArrayList<>();
308
309 if (igmpGroup.getSources().size() > 0) {
310 igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
311 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
312 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
313 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
314 join = false;
315 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
316 recordType == IGMPMembership.MODE_IS_INCLUDE ||
317 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
318 join = true;
319 }
320 } else {
ke hanf9ed58c2017-09-08 10:29:12 +0800321 IpAddress src = null;
322 if (pimSSmInterworking) {
323 src = ssmTranslateRoute(groupIp);
324 if (src == null) {
325 log.info("no ssm translate for group " + groupIp.toString());
326 return;
327 }
328 } else {
329 src = IpAddress.valueOf(DEFAULT_PIMSSM_HOST);
ke han81a38b92017-03-10 18:41:44 +0800330 }
331 sourceList.add(src.getIp4Address());
332 if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
333 recordType == IGMPMembership.MODE_IS_EXCLUDE ||
334 recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
335 join = true;
336 } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
337 recordType == IGMPMembership.MODE_IS_INCLUDE ||
338 recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
339 join = false;
340 }
341 }
342 String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
343 GroupMember groupMember = groupMemberMap.get(groupMemberKey);
344
345 if (join) {
346 if (groupMember == null) {
347 if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
348 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
349 } else {
350 groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
351 }
352 StateMachine.join(deviceId, groupIp, srcIp);
353 groupMemberMap.put(groupMemberKey, groupMember);
354 groupMember.updateList(recordType, sourceList);
355 groupMember.getSourceList().forEach(source -> multicastService.addSink(
356 new McastRoute(source, groupIp, McastRoute.Type.IGMP), cp));
357 }
358 groupMember.resetAllTimers();
359 groupMember.updateList(recordType, sourceList);
360 groupMember.setLeave(false);
361 } else {
362 if (groupMember == null) {
363 log.info("receive leave but no instance, group " + groupIp.toString() +
364 " device:" + deviceId.toString() + " port:" + portNumber.toString());
365 return;
366 } else {
367 groupMember.setLeave(true);
368 if (fastLeave) {
369 leaveAction(groupMember);
370 } else {
371 sendQuery(groupMember);
372 }
373 }
374 }
375 }
376
377 private void leaveAction(GroupMember groupMember) {
378 ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
379 StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
380 groupMember.getSourceList().forEach(source -> multicastService.removeSink(
381 new McastRoute(source, groupMember.getGroupIp(),
382 McastRoute.Type.IGMP), cp));
383 groupMemberMap.remove(groupMember.getId());
384 }
385
386 private void sendQuery(GroupMember groupMember) {
387 Ethernet ethpkt;
388 Ip4Address srcIp = getDeviceIp(groupMember.getDeviceId());
389 if (groupMember.getv2()) {
390 ethpkt = IgmpSender.getInstance().buildIgmpV2Query(groupMember.getGroupIp(), srcIp);
391 } else {
392 ethpkt = IgmpSender.getInstance().buildIgmpV3Query(groupMember.getGroupIp(), srcIp);
393 }
394 IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
395 }
396
397 private void processFilterObjective(DeviceId devId, Port port, boolean remove) {
398
399 //TODO migrate to packet requests when packet service uses filtering objectives
400 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
401
402 builder = remove ? builder.deny() : builder.permit();
403
404 FilteringObjective igmp = builder
405 .withKey(Criteria.matchInPort(port.number()))
406 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
407 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
408 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
409 .fromApp(appId)
410 .withPriority(10000)
411 .add(new ObjectiveContext() {
412 @Override
413 public void onSuccess(Objective objective) {
414 log.info("IgmpProxy filter for {} on {} installed.",
415 devId, port);
416 }
417
418 @Override
419 public void onError(Objective objective, ObjectiveError error) {
420 log.info("IgmpProxy filter for {} on {} failed because {}.",
421 devId, port, error);
422 }
423 });
424
425 flowObjectiveService.filter(devId, igmp);
426 }
427
428 /**
429 * Packet processor responsible for forwarding packets along their paths.
430 */
431 private class IgmpPacketProcessor implements PacketProcessor {
432 @Override
433 public void process(PacketContext context) {
434
435 try {
436 InboundPacket pkt = context.inPacket();
437 Ethernet ethPkt = pkt.parsed();
438 if (ethPkt == null) {
439 return;
440 }
441
442 if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
443 return;
444 }
445
446 IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
447
448 if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
449 return;
450 }
451
452 short vlan = ethPkt.getVlanID();
453 DeviceId deviceId = pkt.receivedFrom().deviceId();
454
Deepa Vaddireddy8467a922017-09-21 05:04:48 +0000455 if (oltData.get(deviceId) == null &&
Deepa Vaddireddya29db762017-09-28 13:47:18 +0000456 !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
ke han81a38b92017-03-10 18:41:44 +0800457 log.error("Device not registered in netcfg :" + deviceId.toString());
458 return;
459 }
460
461 IGMP igmp = (IGMP) ipv4Pkt.getPayload();
462 switch (igmp.getIgmpType()) {
463 case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
464 //Discard Query from OLT’s non-uplink port’s
465 if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
Deepa Vaddireddy8467a922017-09-21 05:04:48 +0000466 if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
467 log.info("IGMP Picked up query from connectPoint");
468 //OK to process packet
469 processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
470 0xff & igmp.getMaxRespField());
471 break;
472 } else {
473 //Not OK to process packet
474 log.warn("IGMP Picked up query from non-uplink port");
475 return;
476 }
ke han81a38b92017-03-10 18:41:44 +0800477 }
478
479 processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
480 0xff & igmp.getMaxRespField());
481 break;
482 case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
483 log.debug("IGMP version 1 message types are not currently supported.");
484 break;
485 case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
486 case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
487 case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
488 //Discard join/leave from OLT’s uplink port’s
Deepa Vaddireddya29db762017-09-28 13:47:18 +0000489 if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId)) ||
490 isConnectPoint(deviceId, pkt.receivedFrom().port())) {
491 log.info("IGMP Picked up join/leave from uplink/connectPoint port");
ke han81a38b92017-03-10 18:41:44 +0800492 return;
493 }
494
495 Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
496 while (itr.hasNext()) {
497 IGMPGroup group = itr.next();
498 if (group instanceof IGMPMembership) {
499 processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
500 pkt.receivedFrom(), igmp.getIgmpType());
501 } else if (group instanceof IGMPQuery) {
502 IGMPMembership mgroup;
503 mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
504 mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
505 IGMPMembership.MODE_IS_EXCLUDE : IGMPMembership.MODE_IS_INCLUDE);
506 processIgmpReport(mgroup, VlanId.vlanId(vlan),
507 pkt.receivedFrom(), igmp.getIgmpType());
508 }
509 }
510 break;
511
512 default:
513 log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
514 break;
515 }
516
517 } catch (Exception ex) {
Deepa Vaddireddy8467a922017-09-21 05:04:48 +0000518 log.error("igmp process error : {} ", ex);
ke han81a38b92017-03-10 18:41:44 +0800519 ex.printStackTrace();
520 }
521 }
522 }
523
524 private class IgmpProxyTimerTask extends TimerTask {
525 public void run() {
526 try {
527 IgmpTimer.timeOut1s();
528 queryMembers();
529 } catch (Exception ex) {
530 log.warn("Igmp timer task error : {}", ex.getMessage());
531 }
532 }
533
534 private void queryMembers() {
535 GroupMember groupMember;
536 Set groupMemberSet = groupMemberMap.entrySet();
537 Iterator itr = groupMemberSet.iterator();
538 while (itr.hasNext()) {
539 Map.Entry entry = (Map.Entry) itr.next();
540 groupMember = (GroupMember) entry.getValue();
541 DeviceId did = groupMember.getDeviceId();
542 if (mastershipService.isLocalMaster(did)) {
543 if (groupMember.isLeave()) {
544 lastQuery(groupMember);
545 } else if (periodicQuery) {
546 periodicQuery(groupMember);
547 }
548 }
549 }
550 }
551
552 private void lastQuery(GroupMember groupMember) {
553 if (groupMember.getLastQueryInterval() < lastQueryInterval) {
554 groupMember.lastQueryInterval(true); // count times
555 } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
556 sendQuery(groupMember);
557 groupMember.lastQueryInterval(false); // reset count number
558 groupMember.lastQueryCount(true); //count times
559 } else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
560 leaveAction(groupMember);
561 }
562 }
563
564 private void periodicQuery(GroupMember groupMember) {
565 if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
566 groupMember.keepAliveInterval(true);
567 } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
568 sendQuery(groupMember);
569 groupMember.keepAliveInterval(false);
570 groupMember.keepAliveQueryCount(true);
571 } else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
572 leaveAction(groupMember);
573 }
574 }
575
576 }
577
578 public static PortNumber getDeviceUplink(DeviceId devId) {
Deepa Vaddireddy8467a922017-09-21 05:04:48 +0000579 if (oltData.get(devId) != null) {
580 return oltData.get(devId).uplink();
581 } else {
582 return null;
583 }
ke han81a38b92017-03-10 18:41:44 +0800584 }
585
586 private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
587 //TODO migrate to packet requests when packet service uses filtering objectives
588 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
589
590 builder = remove ? builder.deny() : builder.permit();
591
592 FilteringObjective igmp = builder
593 .withKey(Criteria.matchInPort(port))
594 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
595 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
596 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
597 .fromApp(appId)
598 .withPriority(10000)
599 .add(new ObjectiveContext() {
600 @Override
601 public void onSuccess(Objective objective) {
602 log.info("IgmpProxy filter for {} on {} installed.",
603 devId, port);
604 }
605
606 @Override
607 public void onError(Objective objective, ObjectiveError error) {
608 log.info("IgmpProxy filter for {} on {} failed because {}.",
609 devId, port, error);
610 }
611 });
612
613 flowObjectiveService.filter(devId, igmp);
614 }
615
616 private boolean isConnectPoint(DeviceId device, PortNumber port) {
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530617 if (connectPoint != null) {
618 return (connectPointMode && connectPoint.deviceId().equals(device)
619 && connectPoint.port().equals(port));
620 } else {
621 log.info("connectPoint not configured for device {}", device);
622 return false;
623 }
ke han81a38b92017-03-10 18:41:44 +0800624 }
Deepa Vaddireddy01f33b42017-07-02 13:26:53 +0530625
ke han81a38b92017-03-10 18:41:44 +0800626 private boolean isUplink(DeviceId device, PortNumber port) {
627 return ((!connectPointMode) && oltData.containsKey(device)
628 && oltData.get(device).uplink().equals(port));
629 }
630
631 private class InternalDeviceListener implements DeviceListener {
632 @Override
633 public void event(DeviceEvent event) {
634 DeviceId devId = event.subject().id();
Deepa Vaddireddya29db762017-09-28 13:47:18 +0000635 Port p = event.port();
636 if (oltData.get(devId) == null &&
637 !(p != null && isConnectPoint(devId, p.number()))) {
ke han81a38b92017-03-10 18:41:44 +0800638 return;
639 }
Deepa Vaddireddya29db762017-09-28 13:47:18 +0000640 PortNumber port;
641
ke han81a38b92017-03-10 18:41:44 +0800642 switch (event.type()) {
643
644 case DEVICE_ADDED:
645 case DEVICE_UPDATED:
646 case DEVICE_REMOVED:
647 case DEVICE_SUSPENDED:
648 case DEVICE_AVAILABILITY_CHANGED:
649 case PORT_STATS_UPDATED:
650 break;
651 case PORT_ADDED:
Deepa Vaddireddya29db762017-09-28 13:47:18 +0000652 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800653 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
654 processFilterObjective(devId, port, false);
655 } else if (isUplink(devId, port)) {
656 provisionUplinkFlows();
657 } else if (isConnectPoint(devId, port)) {
658 provisionConnectPointFlows();
659 }
660 break;
661 case PORT_UPDATED:
Deepa Vaddireddya29db762017-09-28 13:47:18 +0000662 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800663 if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
664 if (event.port().isEnabled()) {
665 processFilterObjective(devId, port, false);
666 } else {
667 processFilterObjective(devId, port, true);
668 }
669 } else if (isUplink(devId, port)) {
670 if (event.port().isEnabled()) {
671 provisionUplinkFlows(devId);
672 } else {
673 processFilterObjective(devId, port, true);
674 }
675 } else if (isConnectPoint(devId, port)) {
676 if (event.port().isEnabled()) {
677 provisionConnectPointFlows();
678 } else {
679 unprovisionConnectPointFlows();
680 }
681 }
682 break;
683 case PORT_REMOVED:
Deepa Vaddireddya29db762017-09-28 13:47:18 +0000684 port = p.number();
ke han81a38b92017-03-10 18:41:44 +0800685 processFilterObjective(devId, port, true);
686 break;
687 default:
688 log.info("Unknown device event {}", event.type());
689 break;
690 }
691 }
692
693 @Override
694 public boolean isRelevant(DeviceEvent event) {
695 return true;
696 }
697 }
698
699 private class InternalNetworkConfigListener implements NetworkConfigListener {
700
701 private void reconfigureNetwork(IgmpproxyConfig cfg) {
702 IgmpproxyConfig newCfg;
703 if (cfg == null) {
704 newCfg = new IgmpproxyConfig();
705 } else {
706 newCfg = cfg;
707 }
708
709 unSolicitedTimeout = newCfg.unsolicitedTimeOut();
710 maxResp = newCfg.maxResp();
711 keepAliveInterval = newCfg.keepAliveInterval();
712 keepAliveCount = newCfg.keepAliveCount();
713 lastQueryInterval = newCfg.lastQueryInterval();
714 lastQueryCount = newCfg.lastQueryCount();
715 withRAUplink = newCfg.withRAUplink();
716 withRADownlink = newCfg.withRADownlink();
717 igmpCos = newCfg.igmpCos();
718 periodicQuery = newCfg.periodicQuery();
719 fastLeave = newCfg.fastLeave();
ke hanf9ed58c2017-09-08 10:29:12 +0800720 pimSSmInterworking = newCfg.pimSsmInterworking();
Deepa Vaddireddya29db762017-09-28 13:47:18 +0000721
722 if (connectPointMode != newCfg.connectPointMode() ||
723 connectPoint != newCfg.connectPoint()) {
ke han81a38b92017-03-10 18:41:44 +0800724 connectPointMode = newCfg.connectPointMode();
Deepa Vaddireddya29db762017-09-28 13:47:18 +0000725 connectPoint = newCfg.connectPoint();
ke han81a38b92017-03-10 18:41:44 +0800726 if (connectPointMode) {
727 unprovisionUplinkFlows();
728 provisionConnectPointFlows();
729 } else {
730 unprovisionConnectPointFlows();
731 provisionUplinkFlows();
732 }
733 }
734 if (connectPoint != null) {
735 log.info("connect point :" + connectPoint.toString());
736 }
737 log.info(" mode: " + connectPointMode);
738
739 IgmpSender.getInstance().setIgmpCos(igmpCos);
740 IgmpSender.getInstance().setMaxResp(maxResp);
741 IgmpSender.getInstance().setMvlan(mvlan);
742 IgmpSender.getInstance().setWithRADownlink(withRADownlink);
743 IgmpSender.getInstance().setWithRAUplink(withRAUplink);
744
745 }
746
747 public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
748 if (cfg == null) {
749 return;
750 }
751 Collection<McastRoute> translations = cfg.getSsmTranslations();
752 for (McastRoute route : translations) {
753 ssmTranslateTable.put(route.group().getIp4Address(), route.source().getIp4Address());
754 }
755 }
756
757 @Override
758 public void event(NetworkConfigEvent event) {
759 switch (event.type()) {
760 case CONFIG_ADDED:
761 case CONFIG_UPDATED:
762 if (event.configClass().equals(CONFIG_CLASS)) {
763 AccessDeviceConfig config =
764 networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
765 if (config != null) {
766 oltData.put(config.getAccessDevice().deviceId(), config.getAccessDevice());
767 provisionDefaultFlows((DeviceId) event.subject());
768 provisionUplinkFlows((DeviceId) event.subject());
769 }
770 }
771
772 if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
773 IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
774 if (config != null) {
775 reconfigureNetwork(config);
776 }
777 }
778
779 if (event.configClass().equals(IGMPPROXY_SSM_CONFIG_CLASS)) {
780 IgmpproxySsmTranslateConfig config = networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS);
781 if (config != null) {
782 reconfigureSsmTable(config);
783 }
784 }
785
786 if (event.configClass().equals(MCAST_CONFIG_CLASS)) {
787 McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
788 if (config != null && mvlan != config.egressVlan().toShort()) {
789 mvlan = config.egressVlan().toShort();
Deepa Vaddireddy88134a42017-07-05 12:16:03 +0530790 IgmpSender.getInstance().setMvlan(mvlan);
ke han81a38b92017-03-10 18:41:44 +0800791 groupMemberMap.values().forEach(m -> leaveAction(m));
792 }
793 }
794
795 log.info("Reconfigured");
796 break;
797 case CONFIG_REGISTERED:
798 case CONFIG_UNREGISTERED:
799 break;
800 case CONFIG_REMOVED:
801 if (event.configClass().equals(CONFIG_CLASS)) {
802 oltData.remove(event.subject());
803 }
804
805 default:
806 break;
807 }
808 }
809 }
810
811 private void provisionDefaultFlows(DeviceId deviceId) {
812 List<Port> ports = deviceService.getPorts(deviceId);
813 ports.stream()
814 .filter(p -> (!oltData.get(p.element().id()).uplink().equals(p.number()) && p.isEnabled()))
815 .forEach(p -> processFilterObjective((DeviceId) p.element().id(), p.number(), false));
816 }
817
818 private void provisionUplinkFlows(DeviceId deviceId) {
819 if (connectPointMode) {
820 return;
821 }
822
823 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), false);
824 }
825
826 private void provisionUplinkFlows() {
827 if (connectPointMode) {
828 return;
829 }
830
David K. Bainbridge4809c0e2017-08-17 09:54:40 -0700831 oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
ke han81a38b92017-03-10 18:41:44 +0800832 }
833 private void unprovisionUplinkFlows() {
834 oltData.keySet().forEach(deviceId ->
835 processFilterObjective(deviceId, oltData.get(deviceId).uplink(), true));
836 }
837
838 private void provisionConnectPointFlows() {
839 if ((!connectPointMode) || connectPoint == null) {
840 return;
841 }
842
843 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
844 }
845 private void unprovisionConnectPointFlows() {
846 if (connectPoint == null) {
847 return;
848 }
849 processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
850 }
851}