blob: ad3f2a76b8334b5e6006cab4e882569dd61bdd91 [file] [log] [blame]
alshabib3b1eadc2016-02-01 17:57:00 -08001/*
Brian O'Connorcf85aa82017-08-03 22:46:01 -07002 * Copyright 2016-present Open Networking Foundation
alshabib3b1eadc2016-02-01 17:57:00 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib772e1582016-06-01 17:50:05 -070016package org.opencord.cordmcast;
alshabib3b1eadc2016-02-01 17:57:00 -080017
Esin Karaman39b24852019-08-28 13:57:30 +000018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Sets;
alshabib09069c92016-02-21 14:49:51 -080020import org.apache.commons.lang3.tuple.ImmutablePair;
Carmelo Cascone995fd682019-11-14 14:22:39 -080021import org.osgi.service.component.annotations.Activate;
22import org.osgi.service.component.annotations.Component;
23import org.osgi.service.component.annotations.Deactivate;
24import org.osgi.service.component.annotations.Modified;
25import org.osgi.service.component.annotations.Reference;
26import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib3b1eadc2016-02-01 17:57:00 -080027import org.onlab.packet.Ethernet;
alshabib3b1eadc2016-02-01 17:57:00 -080028import org.onlab.packet.IpAddress;
29import org.onlab.packet.VlanId;
Esin Karaman39b24852019-08-28 13:57:30 +000030import org.onlab.util.KryoNamespace;
Jonathan Hart28271642016-02-10 16:13:54 -080031import org.onosproject.cfg.ComponentConfigService;
Esin Karaman39b24852019-08-28 13:57:30 +000032import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.LeadershipService;
34import org.onosproject.cluster.NodeId;
alshabib3b1eadc2016-02-01 17:57:00 -080035import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
Esin Karaman39b24852019-08-28 13:57:30 +000037import org.onosproject.mastership.MastershipService;
38import org.onosproject.mcast.api.McastEvent;
39import org.onosproject.mcast.api.McastListener;
40import org.onosproject.mcast.api.McastRoute;
41import org.onosproject.mcast.api.MulticastRouteService;
alshabib3b1eadc2016-02-01 17:57:00 -080042import org.onosproject.net.ConnectPoint;
ke han9590c812017-02-28 15:02:26 +080043import org.onosproject.net.DeviceId;
Esin Karaman39b24852019-08-28 13:57:30 +000044import org.onosproject.net.HostId;
ke han9590c812017-02-28 15:02:26 +080045import org.onosproject.net.PortNumber;
ke hanf1709e82016-08-12 10:48:17 +080046import org.onosproject.net.config.ConfigFactory;
47import org.onosproject.net.config.NetworkConfigEvent;
48import org.onosproject.net.config.NetworkConfigListener;
49import org.onosproject.net.config.NetworkConfigRegistry;
Esin Karaman39b24852019-08-28 13:57:30 +000050import org.onosproject.net.config.basics.McastConfig;
ke hanf1709e82016-08-12 10:48:17 +080051import org.onosproject.net.config.basics.SubjectFactories;
Esin Karaman39b24852019-08-28 13:57:30 +000052import org.onosproject.net.device.DeviceService;
alshabib3b1eadc2016-02-01 17:57:00 -080053import org.onosproject.net.flow.DefaultTrafficSelector;
54import org.onosproject.net.flow.DefaultTrafficTreatment;
55import org.onosproject.net.flow.TrafficSelector;
56import org.onosproject.net.flowobjective.DefaultForwardingObjective;
57import org.onosproject.net.flowobjective.DefaultNextObjective;
Esin Karaman39b24852019-08-28 13:57:30 +000058import org.onosproject.net.flowobjective.DefaultObjectiveContext;
alshabib3b1eadc2016-02-01 17:57:00 -080059import org.onosproject.net.flowobjective.FlowObjectiveService;
60import org.onosproject.net.flowobjective.ForwardingObjective;
61import org.onosproject.net.flowobjective.NextObjective;
62import org.onosproject.net.flowobjective.Objective;
63import org.onosproject.net.flowobjective.ObjectiveContext;
64import org.onosproject.net.flowobjective.ObjectiveError;
Esin Karaman39b24852019-08-28 13:57:30 +000065import org.onosproject.store.serializers.KryoNamespaces;
66import org.onosproject.store.service.ConsistentMap;
67import org.onosproject.store.service.Serializer;
68import org.onosproject.store.service.StorageService;
69import org.onosproject.store.service.Versioned;
Charles Chane8ed8ee2016-06-13 16:37:01 -070070import org.opencord.cordconfig.CordConfigService;
Esin Karaman39b24852019-08-28 13:57:30 +000071import org.opencord.cordconfig.access.AccessDeviceData;
Jonathan Hart28271642016-02-10 16:13:54 -080072import org.osgi.service.component.ComponentContext;
alshabib3b1eadc2016-02-01 17:57:00 -080073import org.slf4j.Logger;
74
Jonathan Hart28271642016-02-10 16:13:54 -080075import java.util.Dictionary;
alshabib3b1eadc2016-02-01 17:57:00 -080076import java.util.Map;
ke han9590c812017-02-28 15:02:26 +080077import java.util.Objects;
Jonathan Hart0c194962016-05-23 17:08:15 -070078import java.util.Optional;
alshabibfc1cb032016-02-17 15:37:56 -080079import java.util.Properties;
Esin Karaman39b24852019-08-28 13:57:30 +000080import java.util.Set;
81import java.util.concurrent.ExecutorService;
82import java.util.concurrent.locks.Lock;
83import java.util.concurrent.locks.ReentrantLock;
alshabib3b1eadc2016-02-01 17:57:00 -080084
alshabibfc1cb032016-02-17 15:37:56 -080085import static com.google.common.base.Strings.isNullOrEmpty;
Esin Karaman39b24852019-08-28 13:57:30 +000086import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibfc1cb032016-02-17 15:37:56 -080087import static org.onlab.util.Tools.get;
Esin Karaman39b24852019-08-28 13:57:30 +000088import static org.onlab.util.Tools.groupedThreads;
Carmelo Cascone995fd682019-11-14 14:22:39 -080089import static org.opencord.cordmcast.OsgiPropertyConstants.DEFAULT_VLAN_ENABLED;
90import static org.opencord.cordmcast.OsgiPropertyConstants.DEFAULT_PRIORITY;
91import static org.opencord.cordmcast.OsgiPropertyConstants.PRIORITY;
92import static org.opencord.cordmcast.OsgiPropertyConstants.VLAN_ENABLED;
alshabib3b1eadc2016-02-01 17:57:00 -080093import static org.slf4j.LoggerFactory.getLogger;
94
Esin Karaman39b24852019-08-28 13:57:30 +000095
alshabib3b1eadc2016-02-01 17:57:00 -080096/**
Jonathan Hartc3f84eb2016-02-19 12:44:36 -080097 * CORD multicast provisioning application. Operates by listening to
Jonathan Hart28271642016-02-10 16:13:54 -080098 * events on the multicast rib and provisioning groups to program multicast
alshabib3b1eadc2016-02-01 17:57:00 -080099 * flows on the dataplane.
100 */
Carmelo Cascone995fd682019-11-14 14:22:39 -0800101@Component(immediate = true,
102 property = {
103 VLAN_ENABLED + ":Boolean=" + DEFAULT_VLAN_ENABLED,
104 PRIORITY + ":Integer=" + DEFAULT_PRIORITY,
105})
alshabib3b1eadc2016-02-01 17:57:00 -0800106public class CordMcast {
Charles Chanf867c4b2017-01-20 11:22:25 -0800107 private static final String APP_NAME = "org.opencord.cordmcast";
alshabib3b1eadc2016-02-01 17:57:00 -0800108
Jonathan Hart0c194962016-05-23 17:08:15 -0700109 private final Logger log = getLogger(getClass());
alshabib09069c92016-02-21 14:49:51 -0800110
alshabib09069c92016-02-21 14:49:51 -0800111 private static final int DEFAULT_PRIORITY = 500;
alshabib3b1eadc2016-02-01 17:57:00 -0800112 private static final short DEFAULT_MCAST_VLAN = 4000;
alshabibfc1cb032016-02-17 15:37:56 -0800113
Carmelo Cascone995fd682019-11-14 14:22:39 -0800114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800115 protected MulticastRouteService mcastService;
116
Carmelo Cascone995fd682019-11-14 14:22:39 -0800117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800118 protected FlowObjectiveService flowObjectiveService;
119
Carmelo Cascone995fd682019-11-14 14:22:39 -0800120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800121 protected CoreService coreService;
122
Carmelo Cascone995fd682019-11-14 14:22:39 -0800123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jonathan Hart28271642016-02-10 16:13:54 -0800124 protected ComponentConfigService componentConfigService;
125
Carmelo Cascone995fd682019-11-14 14:22:39 -0800126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jonathan Hart0c194962016-05-23 17:08:15 -0700127 protected CordConfigService cordConfigService;
alshabib09069c92016-02-21 14:49:51 -0800128
Carmelo Cascone995fd682019-11-14 14:22:39 -0800129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke hanf1709e82016-08-12 10:48:17 +0800130 protected NetworkConfigRegistry networkConfig;
131
Carmelo Cascone995fd682019-11-14 14:22:39 -0800132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000133 protected StorageService storageService;
134
Carmelo Cascone995fd682019-11-14 14:22:39 -0800135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000136 protected MastershipService mastershipService;
137
Carmelo Cascone995fd682019-11-14 14:22:39 -0800138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000139 public DeviceService deviceService;
140
Carmelo Cascone995fd682019-11-14 14:22:39 -0800141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000142 private ClusterService clusterService;
143
Carmelo Cascone995fd682019-11-14 14:22:39 -0800144 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000145 private LeadershipService leadershipService;
146
alshabib3b1eadc2016-02-01 17:57:00 -0800147 protected McastListener listener = new InternalMulticastListener();
ke hanf1709e82016-08-12 10:48:17 +0800148 private InternalNetworkConfigListener configListener =
149 new InternalNetworkConfigListener();
alshabib3b1eadc2016-02-01 17:57:00 -0800150
Esin Karaman39b24852019-08-28 13:57:30 +0000151 private ConsistentMap<NextKey, NextContent> groups;
alshabib3b1eadc2016-02-01 17:57:00 -0800152
alshabib3b1eadc2016-02-01 17:57:00 -0800153 private ApplicationId appId;
ke hanf1709e82016-08-12 10:48:17 +0800154 private ApplicationId coreAppId;
Esin Karaman39b24852019-08-28 13:57:30 +0000155 private short mcastVlan = DEFAULT_MCAST_VLAN;
alshabib3b1eadc2016-02-01 17:57:00 -0800156
Carmelo Cascone995fd682019-11-14 14:22:39 -0800157 /**
158 * Whether to use VLAN for multicast traffic.
159 **/
alshabib09069c92016-02-21 14:49:51 -0800160 private boolean vlanEnabled = DEFAULT_VLAN_ENABLED;
alshabibfc1cb032016-02-17 15:37:56 -0800161
Carmelo Cascone995fd682019-11-14 14:22:39 -0800162 /**
163 * Priority for multicast rules.
164 **/
alshabib3b1eadc2016-02-01 17:57:00 -0800165 private int priority = DEFAULT_PRIORITY;
166
ke hanf1709e82016-08-12 10:48:17 +0800167 private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
168 McastConfig.class;
169
170 private ConfigFactory<ApplicationId, McastConfig> cordMcastConfigFactory =
171 new ConfigFactory<ApplicationId, McastConfig>(
172 SubjectFactories.APP_SUBJECT_FACTORY, CORD_MCAST_CONFIG_CLASS, "multicast") {
173 @Override
174 public McastConfig createConfig() {
175 return new McastConfig();
176 }
177 };
Jonathan Hart28271642016-02-10 16:13:54 -0800178
Esin Karaman39b24852019-08-28 13:57:30 +0000179 // lock to synchronize local operations
180 private final Lock mcastLock = new ReentrantLock();
181 private void mcastLock() {
182 mcastLock.lock();
183 }
184 private void mcastUnlock() {
185 mcastLock.unlock();
186 }
187 private ExecutorService eventExecutor;
188
alshabib3b1eadc2016-02-01 17:57:00 -0800189 @Activate
Jonathan Hart435ffc42016-02-19 10:32:05 -0800190 public void activate(ComponentContext context) {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800191 componentConfigService.registerProperties(getClass());
Jonathan Hart435ffc42016-02-19 10:32:05 -0800192 modified(context);
193
Charles Chanf867c4b2017-01-20 11:22:25 -0800194 appId = coreService.registerApplication(APP_NAME);
ke hanf1709e82016-08-12 10:48:17 +0800195 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
Jonathan Hart28271642016-02-10 16:13:54 -0800196
Esin Karaman39b24852019-08-28 13:57:30 +0000197 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast",
198 "events-mcast-%d", log));
199
200 KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
201 .register(KryoNamespaces.API)
202 .register(NextKey.class)
203 .register(NextContent.class);
204 groups = storageService
205 .<NextKey, NextContent>consistentMapBuilder()
206 .withName("cord-mcast-groups-store")
207 .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
208 .build();
209
ke hanf1709e82016-08-12 10:48:17 +0800210 networkConfig.registerConfigFactory(cordMcastConfigFactory);
211 networkConfig.addListener(configListener);
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800212 mcastService.addListener(listener);
213
alshabib09069c92016-02-21 14:49:51 -0800214 mcastService.getRoutes().stream()
Esin Karaman39b24852019-08-28 13:57:30 +0000215 .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
alshabib09069c92016-02-21 14:49:51 -0800216 .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
Esin Karaman39b24852019-08-28 13:57:30 +0000217 .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(),
218 sink)));
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800219
ke hanf1709e82016-08-12 10:48:17 +0800220 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
Esin Karaman39b24852019-08-28 13:57:30 +0000221 updateConfig(config);
ke hanf1709e82016-08-12 10:48:17 +0800222
alshabib3b1eadc2016-02-01 17:57:00 -0800223 log.info("Started");
224 }
225
226 @Deactivate
227 public void deactivate() {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800228 componentConfigService.unregisterProperties(getClass(), false);
alshabib3b1eadc2016-02-01 17:57:00 -0800229 mcastService.removeListener(listener);
ke hanf1709e82016-08-12 10:48:17 +0800230 networkConfig.removeListener(configListener);
ke han9590c812017-02-28 15:02:26 +0800231 networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
Esin Karaman39b24852019-08-28 13:57:30 +0000232 eventExecutor.shutdown();
ke han9590c812017-02-28 15:02:26 +0800233 clearGroups();
Esin Karaman39b24852019-08-28 13:57:30 +0000234 groups.destroy();
alshabib3b1eadc2016-02-01 17:57:00 -0800235 log.info("Stopped");
236 }
237
ke han9590c812017-02-28 15:02:26 +0800238 public void clearGroups() {
Esin Karaman39b24852019-08-28 13:57:30 +0000239 mcastLock();
240 try {
241 groups.keySet().forEach(groupInfo -> {
242 if (!isLocalLeader(groupInfo.getDevice())) {
243 return;
244 }
245 NextContent next = groups.get(groupInfo).value();
246
247 ObjectiveContext context = new DefaultObjectiveContext(
248 (objective) -> log.debug("Successfully remove {}",
249 groupInfo.group),
250 (objective, error) -> log.warn("Failed to remove {}: {}",
251 groupInfo.group, error));
252 // remove the flow rule
253 flowObjectiveService.forward(groupInfo.getDevice(), fwdObject(next.getNextId(),
254 groupInfo.group).remove(context));
255 // remove all ports from the group
256 next.getOutPorts().stream().forEach(portNumber ->
257 flowObjectiveService.next(groupInfo.getDevice(), nextObject(next.getNextId(),
258 portNumber, NextType.RemoveFromExisting,
259 groupInfo.group))
260 );
261
262 });
263 groups.clear();
264 } finally {
265 mcastUnlock();
266 }
267 }
268
269 private VlanId multicastVlan() {
270 return VlanId.vlanId(mcastVlan);
271 }
272
273 private VlanId assignedVlan() {
274 return vlanEnabled ? multicastVlan() : VlanId.NONE;
ke han9590c812017-02-28 15:02:26 +0800275 }
276
Jonathan Hart28271642016-02-10 16:13:54 -0800277 @Modified
278 public void modified(ComponentContext context) {
alshabibfc1cb032016-02-17 15:37:56 -0800279 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
280
alshabibfc1cb032016-02-17 15:37:56 -0800281 try {
Esin Karaman39b24852019-08-28 13:57:30 +0000282 String s = get(properties, "vlanEnabled");
alshabib09069c92016-02-21 14:49:51 -0800283 vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
alshabibfc1cb032016-02-17 15:37:56 -0800284
285 s = get(properties, "priority");
286 priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
287
alshabibfc1cb032016-02-17 15:37:56 -0800288 } catch (Exception e) {
Esin Karaman39b24852019-08-28 13:57:30 +0000289 log.error("Unable to parse configuration parameter.", e);
alshabibfc1cb032016-02-17 15:37:56 -0800290 vlanEnabled = false;
291 priority = DEFAULT_PRIORITY;
292 }
Jonathan Hart28271642016-02-10 16:13:54 -0800293 }
294
alshabib3b1eadc2016-02-01 17:57:00 -0800295 private class InternalMulticastListener implements McastListener {
296 @Override
297 public void event(McastEvent event) {
Esin Karaman39b24852019-08-28 13:57:30 +0000298 eventExecutor.execute(() -> {
299 switch (event.type()) {
300 case ROUTE_ADDED:
301 case ROUTE_REMOVED:
302 case SOURCES_ADDED:
303 break;
304 case SINKS_ADDED:
305 addSinks(event);
306 break;
307 case SINKS_REMOVED:
308 removeSinks(event);
309 break;
310 default:
311 log.warn("Unknown mcast event {}", event.type());
312 }
313 });
314 }
315 }
316
317 /**
318 * Processes previous, and new sinks then finds the sinks to be removed.
319 * @param prevSinks the previous sinks to be evaluated
320 * @param newSinks the new sinks to be evaluated
321 * @returnt the set of the sinks to be removed
322 */
323 private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
324 Map<HostId, Set<ConnectPoint>> newSinks) {
325 return getSinksToBeProcessed(prevSinks, newSinks);
326 }
327
328
329 /**
330 * Processes previous, and new sinks then finds the sinks to be added.
331 * @param newSinks the new sinks to be processed
332 * @param allPrevSinks all previous sinks
333 * @return the set of the sinks to be added
334 */
335 private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
336 Map<HostId, Set<ConnectPoint>> allPrevSinks) {
337 return getSinksToBeProcessed(newSinks, allPrevSinks);
338 }
339
340 /**
341 * Gets single-homed sinks that are in set1 but not in set2.
342 * @param sinkSet1 the first sink map
343 * @param sinkSet2 the second sink map
344 * @return a set containing all the single-homed sinks found in set1 but not in set2
345 */
346 private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
347 Map<HostId, Set<ConnectPoint>> sinkSet2) {
348 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
349 sinkSet1.forEach(((hostId, connectPoints) -> {
350 if (HostId.NONE.equals(hostId)) {
351 //assume all connect points associated with HostId.NONE are single homed sinks
352 sinksToBeProcessed.addAll(connectPoints);
353 return;
354 }
355 }));
356 Set<ConnectPoint> singleHomedSinksOfSet2 = sinkSet2.get(HostId.NONE) == null ?
357 Sets.newHashSet() :
358 sinkSet2.get(HostId.NONE);
359 return Sets.difference(sinksToBeProcessed, singleHomedSinksOfSet2);
360 };
361
362
363 private void removeSinks(McastEvent event) {
364 mcastLock();
365 try {
366 Set<ConnectPoint> sinksToBeRemoved = getSinksToBeRemoved(event.prevSubject().sinks(),
367 event.subject().sinks());
368 sinksToBeRemoved.forEach(sink -> removeSink(event.subject().route().group(), sink));
369 } finally {
370 mcastUnlock();
371 }
372 }
373
374 private void removeSink(IpAddress group, ConnectPoint sink) {
375 if (!isLocalLeader(sink.deviceId())) {
376 log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
377 sink.deviceId(), sink, group);
378 return;
379 }
380
381 Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
382
383 if (!oltInfo.isPresent()) {
384 log.warn("Unknown OLT device : {}", sink.deviceId());
385 return;
386 }
387
388 log.debug("Removing sink {} from the group {}", sink, group);
389
390 NextKey key = new NextKey(sink.deviceId(), group);
391 groups.computeIfPresent(key, (k, v) -> {
392 flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
393 NextType.RemoveFromExisting, group));
394
395 Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
396 outPorts.remove(sink.port());
397
398 if (outPorts.isEmpty()) {
399 // this is the last sink
400 ObjectiveContext context = new DefaultObjectiveContext(
401 (objective) -> log.debug("Successfully remove {} on {}",
402 group, sink),
403 (objective, error) -> log.warn("Failed to remove {} on {}: {}",
404 group, sink, error));
405 ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
406 flowObjectiveService.forward(sink.deviceId(), fwdObj);
407 }
408 // remove the whole entity if no out port exists in the port list
409 return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
410 ImmutableSet.copyOf(outPorts));
411 });
412 }
413
414 private void addSinks(McastEvent event) {
415 mcastLock();
416 try {
417 Set<ConnectPoint> sinksToBeAdded = getSinksToBeAdded(event.subject().sinks(),
418 event.prevSubject().sinks());
419 sinksToBeAdded.forEach(sink -> addSink(event.subject().route(), sink));
420 } finally {
421 mcastUnlock();
422 }
423 }
424
425 private void addSink(McastRoute route, ConnectPoint sink) {
426 if (!isLocalLeader(sink.deviceId())) {
427 log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
428 sink.deviceId(), sink, route.group());
429 return;
430 }
431
432 Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
433
434 if (!oltInfo.isPresent()) {
435 log.warn("Unknown OLT device : {}", sink.deviceId());
436 return;
437 }
438
439 log.debug("Adding sink {} to the group {}", sink, route.group());
440
441 NextKey key = new NextKey(sink.deviceId(), route.group());
442 NextObjective newNextObj;
443
444 boolean theFirstSinkOfGroup = false;
445 if (!groups.containsKey(key)) {
446 // First time someone request this mcast group via this device
447 Integer nextId = flowObjectiveService.allocateNextId();
448 newNextObj = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
449 // Store the new port
450 groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
451 theFirstSinkOfGroup = true;
452 } else {
453 // This device already serves some subscribers of this mcast group
454 Versioned<NextContent> nextObj = groups.get(key);
455 if (nextObj.value().getOutPorts().contains(sink.port())) {
456 log.info("Group {} already serves the sink connected to {}", route.group(), sink);
457 return;
458 }
459 newNextObj = nextObject(nextObj.value().getNextId(), sink.port(),
460 NextType.AddToExisting, route.group());
461 // add new port to the group
462 Set<PortNumber> outPorts = Sets.newHashSet(nextObj.value().getOutPorts());
463 outPorts.add(sink.port());
464 groups.put(key, new NextContent(newNextObj.id(), ImmutableSet.copyOf(outPorts)));
465 }
466
467 ObjectiveContext context = new DefaultObjectiveContext(
468 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
469 route.group(), sink.deviceId(), sink.port().toLong(),
470 assignedVlan()),
471 (objective, error) -> {
472 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
473 route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan(),
474 error);
475 });
476
477 flowObjectiveService.next(sink.deviceId(), newNextObj);
478
479 if (theFirstSinkOfGroup) {
480 // create the necessary flow rule if this is the first sink request for the group
481 // on this device
482 flowObjectiveService.forward(sink.deviceId(), fwdObject(newNextObj.id(),
483 route.group()).add(context));
484 }
485 }
486
487 private class InternalNetworkConfigListener implements NetworkConfigListener {
488 @Override
489 public void event(NetworkConfigEvent event) {
490 eventExecutor.execute(() -> {
491 switch (event.type()) {
492
493 case CONFIG_ADDED:
494 case CONFIG_UPDATED:
495 if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
496 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
497 if (config != null) {
498 //TODO: Simply remove flows/groups, hosts will response period query
499 // and re-sent IGMP report, so the flows can be rebuild.
500 // However, better to remove and re-add mcast flow rules here
501 if (mcastVlan != config.egressVlan().toShort() && vlanEnabled) {
502 clearGroups();
503 }
504 updateConfig(config);
505 }
506 }
507 break;
508 case CONFIG_REGISTERED:
509 case CONFIG_UNREGISTERED:
510 case CONFIG_REMOVED:
511 break;
512 default:
513 break;
514 }
515 });
516 }
517 }
518
519 private void updateConfig(McastConfig config) {
520 if (config == null) {
521 return;
522 }
523 log.debug("multicast config received: {}", config);
524
525 if (config.egressVlan() != null) {
526 mcastVlan = config.egressVlan().toShort();
527 }
528 }
529
530 private class NextKey {
531 private DeviceId device;
532 private IpAddress group;
533
534 public NextKey(DeviceId deviceId, IpAddress groupAddress) {
535 device = deviceId;
536 group = groupAddress;
537 }
538
539 public DeviceId getDevice() {
540 return device;
541 }
542
543 public int hashCode() {
544 return Objects.hash(this.device, this.group);
545 }
546
547 public boolean equals(Object obj) {
548 if (this == obj) {
549 return true;
550 } else if (!(obj instanceof NextKey)) {
551 return false;
552 } else {
553 NextKey that = (NextKey) obj;
554 return this.getClass() == that.getClass() &&
555 Objects.equals(this.device, that.device) &&
556 Objects.equals(this.group, that.group);
557 }
558 }
559 }
560
561 private class NextContent {
562 private Integer nextId;
563 private Set<PortNumber> outPorts;
564
565 public NextContent(Integer nextId, Set<PortNumber> outPorts) {
566 this.nextId = nextId;
567 this.outPorts = outPorts;
568 }
569
570 public Integer getNextId() {
571 return nextId;
572 }
573
574 public Set<PortNumber> getOutPorts() {
575 return ImmutableSet.copyOf(outPorts);
576 }
577
578 public int hashCode() {
579 return Objects.hash(this.nextId, this.outPorts);
580 }
581
582 public boolean equals(Object obj) {
583 if (this == obj) {
584 return true;
585 } else if (!(obj instanceof NextContent)) {
586 return false;
587 } else {
588 NextContent that = (NextContent) obj;
589 return this.getClass() == that.getClass() &&
590 Objects.equals(this.nextId, that.nextId) &&
591 Objects.equals(this.outPorts, that.outPorts);
alshabib3b1eadc2016-02-01 17:57:00 -0800592 }
593 }
594 }
595
ke han9590c812017-02-28 15:02:26 +0800596 private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
597
Esin Karaman39b24852019-08-28 13:57:30 +0000598 private NextObjective nextObject(Integer nextId, PortNumber port,
599 NextType nextType, IpAddress mcastIp) {
ke han9590c812017-02-28 15:02:26 +0800600
Esin Karaman39b24852019-08-28 13:57:30 +0000601 // Build the meta selector with the fwd objective info
602 TrafficSelector.Builder metadata = DefaultTrafficSelector.builder()
603 .matchIPDst(mcastIp.toIpPrefix());
604
605 if (vlanEnabled) {
606 metadata.matchVlanId(multicastVlan());
607 }
608
ke han9590c812017-02-28 15:02:26 +0800609 DefaultNextObjective.Builder build = DefaultNextObjective.builder()
610 .fromApp(appId)
611 .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
612 .withType(NextObjective.Type.BROADCAST)
Esin Karaman39b24852019-08-28 13:57:30 +0000613 .withId(nextId)
614 .withMeta(metadata.build());
615
ke han9590c812017-02-28 15:02:26 +0800616 ObjectiveContext content = new ObjectiveContext() {
617 @Override
618 public void onSuccess(Objective objective) {
Esin Karaman39b24852019-08-28 13:57:30 +0000619 log.debug("Next Objective {} installed", objective.id());
ke han9590c812017-02-28 15:02:26 +0800620 }
621
622 @Override
623 public void onError(Objective objective, ObjectiveError error) {
Esin Karaman39b24852019-08-28 13:57:30 +0000624 log.debug("Next Objective {} failed, because {}",
625 objective.id(),
626 error);
ke han9590c812017-02-28 15:02:26 +0800627 }
628 };
629
630 switch (nextType) {
631 case AddNew:
632 return build.add(content);
633 case AddToExisting:
634 return build.addToExisting(content);
635 case Remove:
636 return build.remove(content);
637 case RemoveFromExisting:
638 return build.removeFromExisting(content);
639 default:
640 return null;
641 }
642 }
643
Esin Karaman39b24852019-08-28 13:57:30 +0000644 private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
645 TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
646 .matchEthType(Ethernet.TYPE_IPV4)
647 .matchIPDst(mcastIp.toIpPrefix());
ke han9590c812017-02-28 15:02:26 +0800648
Esin Karaman39b24852019-08-28 13:57:30 +0000649 //build the meta selector
650 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
651 if (vlanEnabled) {
652 metabuilder.matchVlanId(multicastVlan());
Jonathan Hart718c0452016-02-18 15:56:22 -0800653 }
654
Esin Karaman39b24852019-08-28 13:57:30 +0000655 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
656 .fromApp(appId)
657 .nextStep(nextId)
658 .makePermanent()
659 .withFlag(ForwardingObjective.Flag.SPECIFIC)
660 .withPriority(priority)
661 .withSelector(mcast.build())
662 .withMeta(metabuilder.build());
alshabibfc1cb032016-02-17 15:37:56 -0800663
Esin Karaman39b24852019-08-28 13:57:30 +0000664 return fwdBuilder;
alshabibfc1cb032016-02-17 15:37:56 -0800665 }
666
Esin Karaman39b24852019-08-28 13:57:30 +0000667 // Custom-built function, when the device is not available we need a fallback mechanism
668 private boolean isLocalLeader(DeviceId deviceId) {
669 if (!mastershipService.isLocalMaster(deviceId)) {
670 // When the device is available we just check the mastership
671 if (deviceService.isAvailable(deviceId)) {
ke han9590c812017-02-28 15:02:26 +0800672 return false;
ke han9590c812017-02-28 15:02:26 +0800673 }
Esin Karaman39b24852019-08-28 13:57:30 +0000674 // Fallback with Leadership service - device id is used as topic
675 NodeId leader = leadershipService.runForLeadership(
676 deviceId.toString()).leaderNodeId();
677 // Verify if this node is the leader
678 return clusterService.getLocalNode().id().equals(leader);
ke han9590c812017-02-28 15:02:26 +0800679 }
Esin Karaman39b24852019-08-28 13:57:30 +0000680 return true;
ke han9590c812017-02-28 15:02:26 +0800681 }
Esin Karaman39b24852019-08-28 13:57:30 +0000682
alshabib3b1eadc2016-02-01 17:57:00 -0800683}
ke hanf1709e82016-08-12 10:48:17 +0800684
ke han9590c812017-02-28 15:02:26 +0800685