blob: bd10980d7a90b250b80981d5e363f014d4ece849 [file] [log] [blame]
alshabib3b1eadc2016-02-01 17:57:00 -08001/*
Brian O'Connorcf85aa82017-08-03 22:46:01 -07002 * Copyright 2016-present Open Networking Foundation
alshabib3b1eadc2016-02-01 17:57:00 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib772e1582016-06-01 17:50:05 -070016package org.opencord.cordmcast;
alshabib3b1eadc2016-02-01 17:57:00 -080017
Esin Karaman39b24852019-08-28 13:57:30 +000018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Sets;
alshabib09069c92016-02-21 14:49:51 -080020import org.apache.commons.lang3.tuple.ImmutablePair;
Carmelo Cascone995fd682019-11-14 14:22:39 -080021import org.osgi.service.component.annotations.Activate;
22import org.osgi.service.component.annotations.Component;
23import org.osgi.service.component.annotations.Deactivate;
24import org.osgi.service.component.annotations.Modified;
25import org.osgi.service.component.annotations.Reference;
26import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib3b1eadc2016-02-01 17:57:00 -080027import org.onlab.packet.Ethernet;
alshabib3b1eadc2016-02-01 17:57:00 -080028import org.onlab.packet.IpAddress;
29import org.onlab.packet.VlanId;
Esin Karaman39b24852019-08-28 13:57:30 +000030import org.onlab.util.KryoNamespace;
Jonathan Hart28271642016-02-10 16:13:54 -080031import org.onosproject.cfg.ComponentConfigService;
Esin Karaman39b24852019-08-28 13:57:30 +000032import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.LeadershipService;
34import org.onosproject.cluster.NodeId;
alshabib3b1eadc2016-02-01 17:57:00 -080035import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
Esin Karaman39b24852019-08-28 13:57:30 +000037import org.onosproject.mastership.MastershipService;
38import org.onosproject.mcast.api.McastEvent;
39import org.onosproject.mcast.api.McastListener;
40import org.onosproject.mcast.api.McastRoute;
41import org.onosproject.mcast.api.MulticastRouteService;
alshabib3b1eadc2016-02-01 17:57:00 -080042import org.onosproject.net.ConnectPoint;
ke han9590c812017-02-28 15:02:26 +080043import org.onosproject.net.DeviceId;
Esin Karaman39b24852019-08-28 13:57:30 +000044import org.onosproject.net.HostId;
ke han9590c812017-02-28 15:02:26 +080045import org.onosproject.net.PortNumber;
ke hanf1709e82016-08-12 10:48:17 +080046import org.onosproject.net.config.ConfigFactory;
47import org.onosproject.net.config.NetworkConfigEvent;
48import org.onosproject.net.config.NetworkConfigListener;
49import org.onosproject.net.config.NetworkConfigRegistry;
Esin Karaman39b24852019-08-28 13:57:30 +000050import org.onosproject.net.config.basics.McastConfig;
ke hanf1709e82016-08-12 10:48:17 +080051import org.onosproject.net.config.basics.SubjectFactories;
Esin Karaman39b24852019-08-28 13:57:30 +000052import org.onosproject.net.device.DeviceService;
alshabib3b1eadc2016-02-01 17:57:00 -080053import org.onosproject.net.flow.DefaultTrafficSelector;
54import org.onosproject.net.flow.DefaultTrafficTreatment;
55import org.onosproject.net.flow.TrafficSelector;
56import org.onosproject.net.flowobjective.DefaultForwardingObjective;
57import org.onosproject.net.flowobjective.DefaultNextObjective;
Esin Karaman39b24852019-08-28 13:57:30 +000058import org.onosproject.net.flowobjective.DefaultObjectiveContext;
alshabib3b1eadc2016-02-01 17:57:00 -080059import org.onosproject.net.flowobjective.FlowObjectiveService;
60import org.onosproject.net.flowobjective.ForwardingObjective;
61import org.onosproject.net.flowobjective.NextObjective;
62import org.onosproject.net.flowobjective.Objective;
63import org.onosproject.net.flowobjective.ObjectiveContext;
64import org.onosproject.net.flowobjective.ObjectiveError;
Esin Karaman39b24852019-08-28 13:57:30 +000065import org.onosproject.store.serializers.KryoNamespaces;
66import org.onosproject.store.service.ConsistentMap;
67import org.onosproject.store.service.Serializer;
68import org.onosproject.store.service.StorageService;
69import org.onosproject.store.service.Versioned;
Charles Chane8ed8ee2016-06-13 16:37:01 -070070import org.opencord.cordconfig.CordConfigService;
Esin Karaman39b24852019-08-28 13:57:30 +000071import org.opencord.cordconfig.access.AccessDeviceData;
Jonathan Hart28271642016-02-10 16:13:54 -080072import org.osgi.service.component.ComponentContext;
alshabib3b1eadc2016-02-01 17:57:00 -080073import org.slf4j.Logger;
74
Jonathan Hart28271642016-02-10 16:13:54 -080075import java.util.Dictionary;
alshabib3b1eadc2016-02-01 17:57:00 -080076import java.util.Map;
ke han9590c812017-02-28 15:02:26 +080077import java.util.Objects;
Jonathan Hart0c194962016-05-23 17:08:15 -070078import java.util.Optional;
alshabibfc1cb032016-02-17 15:37:56 -080079import java.util.Properties;
Esin Karaman39b24852019-08-28 13:57:30 +000080import java.util.Set;
81import java.util.concurrent.ExecutorService;
82import java.util.concurrent.locks.Lock;
83import java.util.concurrent.locks.ReentrantLock;
alshabib3b1eadc2016-02-01 17:57:00 -080084
alshabibfc1cb032016-02-17 15:37:56 -080085import static com.google.common.base.Strings.isNullOrEmpty;
Esin Karaman39b24852019-08-28 13:57:30 +000086import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibfc1cb032016-02-17 15:37:56 -080087import static org.onlab.util.Tools.get;
Esin Karaman39b24852019-08-28 13:57:30 +000088import static org.onlab.util.Tools.groupedThreads;
Carmelo Cascone995fd682019-11-14 14:22:39 -080089import static org.opencord.cordmcast.OsgiPropertyConstants.DEFAULT_VLAN_ENABLED;
90import static org.opencord.cordmcast.OsgiPropertyConstants.DEFAULT_PRIORITY;
91import static org.opencord.cordmcast.OsgiPropertyConstants.PRIORITY;
92import static org.opencord.cordmcast.OsgiPropertyConstants.VLAN_ENABLED;
alshabib3b1eadc2016-02-01 17:57:00 -080093import static org.slf4j.LoggerFactory.getLogger;
94
Esin Karaman39b24852019-08-28 13:57:30 +000095
alshabib3b1eadc2016-02-01 17:57:00 -080096/**
Jonathan Hartc3f84eb2016-02-19 12:44:36 -080097 * CORD multicast provisioning application. Operates by listening to
Jonathan Hart28271642016-02-10 16:13:54 -080098 * events on the multicast rib and provisioning groups to program multicast
alshabib3b1eadc2016-02-01 17:57:00 -080099 * flows on the dataplane.
100 */
Carmelo Cascone995fd682019-11-14 14:22:39 -0800101@Component(immediate = true,
102 property = {
103 VLAN_ENABLED + ":Boolean=" + DEFAULT_VLAN_ENABLED,
104 PRIORITY + ":Integer=" + DEFAULT_PRIORITY,
105})
alshabib3b1eadc2016-02-01 17:57:00 -0800106public class CordMcast {
Charles Chanf867c4b2017-01-20 11:22:25 -0800107 private static final String APP_NAME = "org.opencord.cordmcast";
alshabib3b1eadc2016-02-01 17:57:00 -0800108
Jonathan Hart0c194962016-05-23 17:08:15 -0700109 private final Logger log = getLogger(getClass());
alshabib09069c92016-02-21 14:49:51 -0800110
alshabib09069c92016-02-21 14:49:51 -0800111 private static final int DEFAULT_PRIORITY = 500;
alshabib3b1eadc2016-02-01 17:57:00 -0800112 private static final short DEFAULT_MCAST_VLAN = 4000;
alshabibfc1cb032016-02-17 15:37:56 -0800113
Carmelo Cascone995fd682019-11-14 14:22:39 -0800114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800115 protected MulticastRouteService mcastService;
116
Carmelo Cascone995fd682019-11-14 14:22:39 -0800117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800118 protected FlowObjectiveService flowObjectiveService;
119
Carmelo Cascone995fd682019-11-14 14:22:39 -0800120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800121 protected CoreService coreService;
122
Carmelo Cascone995fd682019-11-14 14:22:39 -0800123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jonathan Hart28271642016-02-10 16:13:54 -0800124 protected ComponentConfigService componentConfigService;
125
Carmelo Cascone995fd682019-11-14 14:22:39 -0800126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jonathan Hart0c194962016-05-23 17:08:15 -0700127 protected CordConfigService cordConfigService;
alshabib09069c92016-02-21 14:49:51 -0800128
Carmelo Cascone995fd682019-11-14 14:22:39 -0800129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke hanf1709e82016-08-12 10:48:17 +0800130 protected NetworkConfigRegistry networkConfig;
131
Carmelo Cascone995fd682019-11-14 14:22:39 -0800132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000133 protected StorageService storageService;
134
Carmelo Cascone995fd682019-11-14 14:22:39 -0800135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000136 protected MastershipService mastershipService;
137
Carmelo Cascone995fd682019-11-14 14:22:39 -0800138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000139 public DeviceService deviceService;
140
Carmelo Cascone995fd682019-11-14 14:22:39 -0800141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000142 private ClusterService clusterService;
143
Carmelo Cascone995fd682019-11-14 14:22:39 -0800144 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000145 private LeadershipService leadershipService;
146
alshabib3b1eadc2016-02-01 17:57:00 -0800147 protected McastListener listener = new InternalMulticastListener();
ke hanf1709e82016-08-12 10:48:17 +0800148 private InternalNetworkConfigListener configListener =
149 new InternalNetworkConfigListener();
alshabib3b1eadc2016-02-01 17:57:00 -0800150
Esin Karaman39b24852019-08-28 13:57:30 +0000151 private ConsistentMap<NextKey, NextContent> groups;
alshabib3b1eadc2016-02-01 17:57:00 -0800152
alshabib3b1eadc2016-02-01 17:57:00 -0800153 private ApplicationId appId;
ke hanf1709e82016-08-12 10:48:17 +0800154 private ApplicationId coreAppId;
Esin Karaman39b24852019-08-28 13:57:30 +0000155 private short mcastVlan = DEFAULT_MCAST_VLAN;
alshabib3b1eadc2016-02-01 17:57:00 -0800156
Carmelo Cascone995fd682019-11-14 14:22:39 -0800157 /**
158 * Whether to use VLAN for multicast traffic.
159 **/
alshabib09069c92016-02-21 14:49:51 -0800160 private boolean vlanEnabled = DEFAULT_VLAN_ENABLED;
alshabibfc1cb032016-02-17 15:37:56 -0800161
Carmelo Cascone995fd682019-11-14 14:22:39 -0800162 /**
163 * Priority for multicast rules.
164 **/
alshabib3b1eadc2016-02-01 17:57:00 -0800165 private int priority = DEFAULT_PRIORITY;
166
ke hanf1709e82016-08-12 10:48:17 +0800167 private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
168 McastConfig.class;
169
170 private ConfigFactory<ApplicationId, McastConfig> cordMcastConfigFactory =
171 new ConfigFactory<ApplicationId, McastConfig>(
172 SubjectFactories.APP_SUBJECT_FACTORY, CORD_MCAST_CONFIG_CLASS, "multicast") {
173 @Override
174 public McastConfig createConfig() {
175 return new McastConfig();
176 }
177 };
Jonathan Hart28271642016-02-10 16:13:54 -0800178
Esin Karaman39b24852019-08-28 13:57:30 +0000179 // lock to synchronize local operations
180 private final Lock mcastLock = new ReentrantLock();
181 private void mcastLock() {
182 mcastLock.lock();
183 }
184 private void mcastUnlock() {
185 mcastLock.unlock();
186 }
187 private ExecutorService eventExecutor;
188
alshabib3b1eadc2016-02-01 17:57:00 -0800189 @Activate
Jonathan Hart435ffc42016-02-19 10:32:05 -0800190 public void activate(ComponentContext context) {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800191 componentConfigService.registerProperties(getClass());
Jonathan Hart435ffc42016-02-19 10:32:05 -0800192 modified(context);
193
Charles Chanf867c4b2017-01-20 11:22:25 -0800194 appId = coreService.registerApplication(APP_NAME);
ke hanf1709e82016-08-12 10:48:17 +0800195 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
Jonathan Hart28271642016-02-10 16:13:54 -0800196
Esin Karaman39b24852019-08-28 13:57:30 +0000197 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast",
198 "events-mcast-%d", log));
199
200 KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
201 .register(KryoNamespaces.API)
202 .register(NextKey.class)
203 .register(NextContent.class);
204 groups = storageService
205 .<NextKey, NextContent>consistentMapBuilder()
206 .withName("cord-mcast-groups-store")
207 .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
208 .build();
209
ke hanf1709e82016-08-12 10:48:17 +0800210 networkConfig.registerConfigFactory(cordMcastConfigFactory);
211 networkConfig.addListener(configListener);
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800212 mcastService.addListener(listener);
213
alshabib09069c92016-02-21 14:49:51 -0800214 mcastService.getRoutes().stream()
Esin Karaman39b24852019-08-28 13:57:30 +0000215 .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
alshabib09069c92016-02-21 14:49:51 -0800216 .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
Esin Karaman39b24852019-08-28 13:57:30 +0000217 .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(),
218 sink)));
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800219
ke hanf1709e82016-08-12 10:48:17 +0800220 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
Esin Karaman39b24852019-08-28 13:57:30 +0000221 updateConfig(config);
ke hanf1709e82016-08-12 10:48:17 +0800222
alshabib3b1eadc2016-02-01 17:57:00 -0800223 log.info("Started");
224 }
225
226 @Deactivate
227 public void deactivate() {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800228 componentConfigService.unregisterProperties(getClass(), false);
alshabib3b1eadc2016-02-01 17:57:00 -0800229 mcastService.removeListener(listener);
ke hanf1709e82016-08-12 10:48:17 +0800230 networkConfig.removeListener(configListener);
ke han9590c812017-02-28 15:02:26 +0800231 networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
Esin Karaman39b24852019-08-28 13:57:30 +0000232 eventExecutor.shutdown();
ke han9590c812017-02-28 15:02:26 +0800233 clearGroups();
Esin Karaman39b24852019-08-28 13:57:30 +0000234 groups.destroy();
alshabib3b1eadc2016-02-01 17:57:00 -0800235 log.info("Stopped");
236 }
237
ke han9590c812017-02-28 15:02:26 +0800238 public void clearGroups() {
Esin Karaman39b24852019-08-28 13:57:30 +0000239 mcastLock();
240 try {
241 groups.keySet().forEach(groupInfo -> {
242 if (!isLocalLeader(groupInfo.getDevice())) {
243 return;
244 }
245 NextContent next = groups.get(groupInfo).value();
246
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000247 if (next != null) {
248 ObjectiveContext context = new DefaultObjectiveContext(
249 (objective) -> log.debug("Successfully remove {}",
250 groupInfo.group),
251 (objective, error) -> log.warn("Failed to remove {}: {}",
252 groupInfo.group, error));
253 // remove the flow rule
254 flowObjectiveService.forward(groupInfo.getDevice(), fwdObject(next.getNextId(),
255 groupInfo.group).remove(context));
256 // remove all ports from the group
257 next.getOutPorts().stream().forEach(portNumber ->
258 flowObjectiveService.next(groupInfo.getDevice(), nextObject(next.getNextId(),
259 portNumber,
260 NextType.RemoveFromExisting,
261 groupInfo.group))
262 );
Esin Karaman39b24852019-08-28 13:57:30 +0000263
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000264 }
Esin Karaman39b24852019-08-28 13:57:30 +0000265 });
266 groups.clear();
267 } finally {
268 mcastUnlock();
269 }
270 }
271
272 private VlanId multicastVlan() {
273 return VlanId.vlanId(mcastVlan);
274 }
275
276 private VlanId assignedVlan() {
277 return vlanEnabled ? multicastVlan() : VlanId.NONE;
ke han9590c812017-02-28 15:02:26 +0800278 }
279
Jonathan Hart28271642016-02-10 16:13:54 -0800280 @Modified
281 public void modified(ComponentContext context) {
alshabibfc1cb032016-02-17 15:37:56 -0800282 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
283
alshabibfc1cb032016-02-17 15:37:56 -0800284 try {
Esin Karaman39b24852019-08-28 13:57:30 +0000285 String s = get(properties, "vlanEnabled");
alshabib09069c92016-02-21 14:49:51 -0800286 vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
alshabibfc1cb032016-02-17 15:37:56 -0800287
288 s = get(properties, "priority");
289 priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
290
alshabibfc1cb032016-02-17 15:37:56 -0800291 } catch (Exception e) {
Esin Karaman39b24852019-08-28 13:57:30 +0000292 log.error("Unable to parse configuration parameter.", e);
alshabibfc1cb032016-02-17 15:37:56 -0800293 vlanEnabled = false;
294 priority = DEFAULT_PRIORITY;
295 }
Jonathan Hart28271642016-02-10 16:13:54 -0800296 }
297
alshabib3b1eadc2016-02-01 17:57:00 -0800298 private class InternalMulticastListener implements McastListener {
299 @Override
300 public void event(McastEvent event) {
Esin Karaman39b24852019-08-28 13:57:30 +0000301 eventExecutor.execute(() -> {
302 switch (event.type()) {
303 case ROUTE_ADDED:
304 case ROUTE_REMOVED:
305 case SOURCES_ADDED:
306 break;
307 case SINKS_ADDED:
308 addSinks(event);
309 break;
310 case SINKS_REMOVED:
311 removeSinks(event);
312 break;
313 default:
314 log.warn("Unknown mcast event {}", event.type());
315 }
316 });
317 }
318 }
319
320 /**
321 * Processes previous, and new sinks then finds the sinks to be removed.
322 * @param prevSinks the previous sinks to be evaluated
323 * @param newSinks the new sinks to be evaluated
324 * @returnt the set of the sinks to be removed
325 */
326 private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
327 Map<HostId, Set<ConnectPoint>> newSinks) {
328 return getSinksToBeProcessed(prevSinks, newSinks);
329 }
330
331
332 /**
333 * Processes previous, and new sinks then finds the sinks to be added.
334 * @param newSinks the new sinks to be processed
335 * @param allPrevSinks all previous sinks
336 * @return the set of the sinks to be added
337 */
338 private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
339 Map<HostId, Set<ConnectPoint>> allPrevSinks) {
340 return getSinksToBeProcessed(newSinks, allPrevSinks);
341 }
342
343 /**
344 * Gets single-homed sinks that are in set1 but not in set2.
345 * @param sinkSet1 the first sink map
346 * @param sinkSet2 the second sink map
347 * @return a set containing all the single-homed sinks found in set1 but not in set2
348 */
349 private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
350 Map<HostId, Set<ConnectPoint>> sinkSet2) {
351 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
352 sinkSet1.forEach(((hostId, connectPoints) -> {
353 if (HostId.NONE.equals(hostId)) {
354 //assume all connect points associated with HostId.NONE are single homed sinks
355 sinksToBeProcessed.addAll(connectPoints);
356 return;
357 }
358 }));
359 Set<ConnectPoint> singleHomedSinksOfSet2 = sinkSet2.get(HostId.NONE) == null ?
360 Sets.newHashSet() :
361 sinkSet2.get(HostId.NONE);
362 return Sets.difference(sinksToBeProcessed, singleHomedSinksOfSet2);
363 };
364
365
366 private void removeSinks(McastEvent event) {
367 mcastLock();
368 try {
369 Set<ConnectPoint> sinksToBeRemoved = getSinksToBeRemoved(event.prevSubject().sinks(),
370 event.subject().sinks());
371 sinksToBeRemoved.forEach(sink -> removeSink(event.subject().route().group(), sink));
372 } finally {
373 mcastUnlock();
374 }
375 }
376
377 private void removeSink(IpAddress group, ConnectPoint sink) {
378 if (!isLocalLeader(sink.deviceId())) {
379 log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
380 sink.deviceId(), sink, group);
381 return;
382 }
383
384 Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
385
386 if (!oltInfo.isPresent()) {
387 log.warn("Unknown OLT device : {}", sink.deviceId());
388 return;
389 }
390
391 log.debug("Removing sink {} from the group {}", sink, group);
392
393 NextKey key = new NextKey(sink.deviceId(), group);
394 groups.computeIfPresent(key, (k, v) -> {
395 flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
396 NextType.RemoveFromExisting, group));
397
398 Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
399 outPorts.remove(sink.port());
400
401 if (outPorts.isEmpty()) {
402 // this is the last sink
403 ObjectiveContext context = new DefaultObjectiveContext(
404 (objective) -> log.debug("Successfully remove {} on {}",
405 group, sink),
406 (objective, error) -> log.warn("Failed to remove {} on {}: {}",
407 group, sink, error));
408 ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
409 flowObjectiveService.forward(sink.deviceId(), fwdObj);
410 }
411 // remove the whole entity if no out port exists in the port list
412 return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
413 ImmutableSet.copyOf(outPorts));
414 });
415 }
416
417 private void addSinks(McastEvent event) {
418 mcastLock();
419 try {
420 Set<ConnectPoint> sinksToBeAdded = getSinksToBeAdded(event.subject().sinks(),
421 event.prevSubject().sinks());
422 sinksToBeAdded.forEach(sink -> addSink(event.subject().route(), sink));
423 } finally {
424 mcastUnlock();
425 }
426 }
427
428 private void addSink(McastRoute route, ConnectPoint sink) {
429 if (!isLocalLeader(sink.deviceId())) {
430 log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
431 sink.deviceId(), sink, route.group());
432 return;
433 }
434
435 Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
436
437 if (!oltInfo.isPresent()) {
438 log.warn("Unknown OLT device : {}", sink.deviceId());
439 return;
440 }
441
442 log.debug("Adding sink {} to the group {}", sink, route.group());
443
444 NextKey key = new NextKey(sink.deviceId(), route.group());
445 NextObjective newNextObj;
446
447 boolean theFirstSinkOfGroup = false;
448 if (!groups.containsKey(key)) {
449 // First time someone request this mcast group via this device
450 Integer nextId = flowObjectiveService.allocateNextId();
451 newNextObj = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
452 // Store the new port
453 groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
454 theFirstSinkOfGroup = true;
455 } else {
456 // This device already serves some subscribers of this mcast group
457 Versioned<NextContent> nextObj = groups.get(key);
458 if (nextObj.value().getOutPorts().contains(sink.port())) {
459 log.info("Group {} already serves the sink connected to {}", route.group(), sink);
460 return;
461 }
462 newNextObj = nextObject(nextObj.value().getNextId(), sink.port(),
463 NextType.AddToExisting, route.group());
464 // add new port to the group
465 Set<PortNumber> outPorts = Sets.newHashSet(nextObj.value().getOutPorts());
466 outPorts.add(sink.port());
467 groups.put(key, new NextContent(newNextObj.id(), ImmutableSet.copyOf(outPorts)));
468 }
469
470 ObjectiveContext context = new DefaultObjectiveContext(
471 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
472 route.group(), sink.deviceId(), sink.port().toLong(),
473 assignedVlan()),
474 (objective, error) -> {
475 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
476 route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan(),
477 error);
478 });
479
480 flowObjectiveService.next(sink.deviceId(), newNextObj);
481
482 if (theFirstSinkOfGroup) {
483 // create the necessary flow rule if this is the first sink request for the group
484 // on this device
485 flowObjectiveService.forward(sink.deviceId(), fwdObject(newNextObj.id(),
486 route.group()).add(context));
487 }
488 }
489
490 private class InternalNetworkConfigListener implements NetworkConfigListener {
491 @Override
492 public void event(NetworkConfigEvent event) {
493 eventExecutor.execute(() -> {
494 switch (event.type()) {
495
496 case CONFIG_ADDED:
497 case CONFIG_UPDATED:
498 if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
499 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
500 if (config != null) {
501 //TODO: Simply remove flows/groups, hosts will response period query
502 // and re-sent IGMP report, so the flows can be rebuild.
503 // However, better to remove and re-add mcast flow rules here
504 if (mcastVlan != config.egressVlan().toShort() && vlanEnabled) {
505 clearGroups();
506 }
507 updateConfig(config);
508 }
509 }
510 break;
511 case CONFIG_REGISTERED:
512 case CONFIG_UNREGISTERED:
513 case CONFIG_REMOVED:
514 break;
515 default:
516 break;
517 }
518 });
519 }
520 }
521
522 private void updateConfig(McastConfig config) {
523 if (config == null) {
524 return;
525 }
526 log.debug("multicast config received: {}", config);
527
528 if (config.egressVlan() != null) {
529 mcastVlan = config.egressVlan().toShort();
530 }
531 }
532
533 private class NextKey {
534 private DeviceId device;
535 private IpAddress group;
536
537 public NextKey(DeviceId deviceId, IpAddress groupAddress) {
538 device = deviceId;
539 group = groupAddress;
540 }
541
542 public DeviceId getDevice() {
543 return device;
544 }
545
546 public int hashCode() {
547 return Objects.hash(this.device, this.group);
548 }
549
550 public boolean equals(Object obj) {
551 if (this == obj) {
552 return true;
553 } else if (!(obj instanceof NextKey)) {
554 return false;
555 } else {
556 NextKey that = (NextKey) obj;
557 return this.getClass() == that.getClass() &&
558 Objects.equals(this.device, that.device) &&
559 Objects.equals(this.group, that.group);
560 }
561 }
562 }
563
564 private class NextContent {
565 private Integer nextId;
566 private Set<PortNumber> outPorts;
567
568 public NextContent(Integer nextId, Set<PortNumber> outPorts) {
569 this.nextId = nextId;
570 this.outPorts = outPorts;
571 }
572
573 public Integer getNextId() {
574 return nextId;
575 }
576
577 public Set<PortNumber> getOutPorts() {
578 return ImmutableSet.copyOf(outPorts);
579 }
580
581 public int hashCode() {
582 return Objects.hash(this.nextId, this.outPorts);
583 }
584
585 public boolean equals(Object obj) {
586 if (this == obj) {
587 return true;
588 } else if (!(obj instanceof NextContent)) {
589 return false;
590 } else {
591 NextContent that = (NextContent) obj;
592 return this.getClass() == that.getClass() &&
593 Objects.equals(this.nextId, that.nextId) &&
594 Objects.equals(this.outPorts, that.outPorts);
alshabib3b1eadc2016-02-01 17:57:00 -0800595 }
596 }
597 }
598
ke han9590c812017-02-28 15:02:26 +0800599 private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
600
Esin Karaman39b24852019-08-28 13:57:30 +0000601 private NextObjective nextObject(Integer nextId, PortNumber port,
602 NextType nextType, IpAddress mcastIp) {
ke han9590c812017-02-28 15:02:26 +0800603
Esin Karaman39b24852019-08-28 13:57:30 +0000604 // Build the meta selector with the fwd objective info
605 TrafficSelector.Builder metadata = DefaultTrafficSelector.builder()
606 .matchIPDst(mcastIp.toIpPrefix());
607
608 if (vlanEnabled) {
609 metadata.matchVlanId(multicastVlan());
610 }
611
ke han9590c812017-02-28 15:02:26 +0800612 DefaultNextObjective.Builder build = DefaultNextObjective.builder()
613 .fromApp(appId)
614 .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
615 .withType(NextObjective.Type.BROADCAST)
Esin Karaman39b24852019-08-28 13:57:30 +0000616 .withId(nextId)
617 .withMeta(metadata.build());
618
ke han9590c812017-02-28 15:02:26 +0800619 ObjectiveContext content = new ObjectiveContext() {
620 @Override
621 public void onSuccess(Objective objective) {
Esin Karaman39b24852019-08-28 13:57:30 +0000622 log.debug("Next Objective {} installed", objective.id());
ke han9590c812017-02-28 15:02:26 +0800623 }
624
625 @Override
626 public void onError(Objective objective, ObjectiveError error) {
Esin Karaman39b24852019-08-28 13:57:30 +0000627 log.debug("Next Objective {} failed, because {}",
628 objective.id(),
629 error);
ke han9590c812017-02-28 15:02:26 +0800630 }
631 };
632
633 switch (nextType) {
634 case AddNew:
635 return build.add(content);
636 case AddToExisting:
637 return build.addToExisting(content);
638 case Remove:
639 return build.remove(content);
640 case RemoveFromExisting:
641 return build.removeFromExisting(content);
642 default:
643 return null;
644 }
645 }
646
Esin Karaman39b24852019-08-28 13:57:30 +0000647 private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
648 TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
649 .matchEthType(Ethernet.TYPE_IPV4)
650 .matchIPDst(mcastIp.toIpPrefix());
ke han9590c812017-02-28 15:02:26 +0800651
Esin Karaman39b24852019-08-28 13:57:30 +0000652 //build the meta selector
653 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
654 if (vlanEnabled) {
655 metabuilder.matchVlanId(multicastVlan());
Jonathan Hart718c0452016-02-18 15:56:22 -0800656 }
657
Esin Karaman39b24852019-08-28 13:57:30 +0000658 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
659 .fromApp(appId)
660 .nextStep(nextId)
661 .makePermanent()
662 .withFlag(ForwardingObjective.Flag.SPECIFIC)
663 .withPriority(priority)
664 .withSelector(mcast.build())
665 .withMeta(metabuilder.build());
alshabibfc1cb032016-02-17 15:37:56 -0800666
Esin Karaman39b24852019-08-28 13:57:30 +0000667 return fwdBuilder;
alshabibfc1cb032016-02-17 15:37:56 -0800668 }
669
Esin Karaman39b24852019-08-28 13:57:30 +0000670 // Custom-built function, when the device is not available we need a fallback mechanism
671 private boolean isLocalLeader(DeviceId deviceId) {
672 if (!mastershipService.isLocalMaster(deviceId)) {
673 // When the device is available we just check the mastership
674 if (deviceService.isAvailable(deviceId)) {
ke han9590c812017-02-28 15:02:26 +0800675 return false;
ke han9590c812017-02-28 15:02:26 +0800676 }
Esin Karaman39b24852019-08-28 13:57:30 +0000677 // Fallback with Leadership service - device id is used as topic
678 NodeId leader = leadershipService.runForLeadership(
679 deviceId.toString()).leaderNodeId();
680 // Verify if this node is the leader
681 return clusterService.getLocalNode().id().equals(leader);
ke han9590c812017-02-28 15:02:26 +0800682 }
Esin Karaman39b24852019-08-28 13:57:30 +0000683 return true;
ke han9590c812017-02-28 15:02:26 +0800684 }
Esin Karaman39b24852019-08-28 13:57:30 +0000685
alshabib3b1eadc2016-02-01 17:57:00 -0800686}
ke hanf1709e82016-08-12 10:48:17 +0800687
ke han9590c812017-02-28 15:02:26 +0800688