blob: 7483432652181c9aa818d3a4f0247b425d1f4896 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
alshabib772e1582016-06-01 17:50:05 -070016package org.opencord.cordmcast;
alshabib3b1eadc2016-02-01 17:57:00 -080017
Esin Karaman73b0e572019-08-28 13:57:30 +000018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Sets;
alshabib09069c92016-02-21 14:49:51 -080020import org.apache.commons.lang3.tuple.ImmutablePair;
alshabib3b1eadc2016-02-01 17:57:00 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
Jonathan Hart28271642016-02-10 16:13:54 -080024import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
alshabib3b1eadc2016-02-01 17:57:00 -080026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.onlab.packet.Ethernet;
alshabib3b1eadc2016-02-01 17:57:00 -080029import org.onlab.packet.IpAddress;
30import org.onlab.packet.VlanId;
Esin Karaman73b0e572019-08-28 13:57:30 +000031import org.onlab.util.KryoNamespace;
Jonathan Hart28271642016-02-10 16:13:54 -080032import org.onosproject.cfg.ComponentConfigService;
Esin Karaman73b0e572019-08-28 13:57:30 +000033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.LeadershipService;
35import org.onosproject.cluster.NodeId;
alshabib3b1eadc2016-02-01 17:57:00 -080036import org.onosproject.core.ApplicationId;
37import org.onosproject.core.CoreService;
Esin Karaman73b0e572019-08-28 13:57:30 +000038import org.onosproject.mastership.MastershipService;
39import org.onosproject.mcast.api.McastEvent;
40import org.onosproject.mcast.api.McastListener;
41import org.onosproject.mcast.api.McastRoute;
42import org.onosproject.mcast.api.MulticastRouteService;
alshabib3b1eadc2016-02-01 17:57:00 -080043import org.onosproject.net.ConnectPoint;
ke han9590c812017-02-28 15:02:26 +080044import org.onosproject.net.DeviceId;
Esin Karaman73b0e572019-08-28 13:57:30 +000045import org.onosproject.net.HostId;
ke han9590c812017-02-28 15:02:26 +080046import org.onosproject.net.PortNumber;
ke hanf1709e82016-08-12 10:48:17 +080047import org.onosproject.net.config.ConfigFactory;
48import org.onosproject.net.config.NetworkConfigEvent;
49import org.onosproject.net.config.NetworkConfigListener;
50import org.onosproject.net.config.NetworkConfigRegistry;
Esin Karaman73b0e572019-08-28 13:57:30 +000051import org.onosproject.net.config.basics.McastConfig;
ke hanf1709e82016-08-12 10:48:17 +080052import org.onosproject.net.config.basics.SubjectFactories;
Esin Karaman73b0e572019-08-28 13:57:30 +000053import org.onosproject.net.device.DeviceService;
alshabib3b1eadc2016-02-01 17:57:00 -080054import org.onosproject.net.flow.DefaultTrafficSelector;
55import org.onosproject.net.flow.DefaultTrafficTreatment;
ke han9590c812017-02-28 15:02:26 +080056import org.onosproject.net.flow.FlowRuleService;
alshabib3b1eadc2016-02-01 17:57:00 -080057import org.onosproject.net.flow.TrafficSelector;
58import org.onosproject.net.flowobjective.DefaultForwardingObjective;
59import org.onosproject.net.flowobjective.DefaultNextObjective;
Esin Karaman73b0e572019-08-28 13:57:30 +000060import org.onosproject.net.flowobjective.DefaultObjectiveContext;
alshabib3b1eadc2016-02-01 17:57:00 -080061import org.onosproject.net.flowobjective.FlowObjectiveService;
62import org.onosproject.net.flowobjective.ForwardingObjective;
63import org.onosproject.net.flowobjective.NextObjective;
64import org.onosproject.net.flowobjective.Objective;
65import org.onosproject.net.flowobjective.ObjectiveContext;
66import org.onosproject.net.flowobjective.ObjectiveError;
Esin Karaman73b0e572019-08-28 13:57:30 +000067import org.onosproject.store.serializers.KryoNamespaces;
68import org.onosproject.store.service.ConsistentMap;
69import org.onosproject.store.service.Serializer;
70import org.onosproject.store.service.StorageService;
71import org.onosproject.store.service.Versioned;
Charles Chane8ed8ee2016-06-13 16:37:01 -070072import org.opencord.cordconfig.CordConfigService;
Esin Karaman73b0e572019-08-28 13:57:30 +000073import org.opencord.cordconfig.access.AccessDeviceData;
Jonathan Hart28271642016-02-10 16:13:54 -080074import org.osgi.service.component.ComponentContext;
alshabib3b1eadc2016-02-01 17:57:00 -080075import org.slf4j.Logger;
76
Jonathan Hart28271642016-02-10 16:13:54 -080077import java.util.Dictionary;
alshabib3b1eadc2016-02-01 17:57:00 -080078import java.util.Map;
ke han9590c812017-02-28 15:02:26 +080079import java.util.Objects;
Jonathan Hart0c194962016-05-23 17:08:15 -070080import java.util.Optional;
alshabibfc1cb032016-02-17 15:37:56 -080081import java.util.Properties;
Esin Karaman73b0e572019-08-28 13:57:30 +000082import java.util.Set;
83import java.util.concurrent.ExecutorService;
84import java.util.concurrent.locks.Lock;
85import java.util.concurrent.locks.ReentrantLock;
alshabib3b1eadc2016-02-01 17:57:00 -080086
alshabibfc1cb032016-02-17 15:37:56 -080087import static com.google.common.base.Strings.isNullOrEmpty;
Esin Karaman73b0e572019-08-28 13:57:30 +000088import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibfc1cb032016-02-17 15:37:56 -080089import static org.onlab.util.Tools.get;
Esin Karaman73b0e572019-08-28 13:57:30 +000090import static org.onlab.util.Tools.groupedThreads;
alshabib3b1eadc2016-02-01 17:57:00 -080091import static org.slf4j.LoggerFactory.getLogger;
92
Esin Karaman73b0e572019-08-28 13:57:30 +000093
alshabib3b1eadc2016-02-01 17:57:00 -080094/**
Jonathan Hartc3f84eb2016-02-19 12:44:36 -080095 * CORD multicast provisioning application. Operates by listening to
Jonathan Hart28271642016-02-10 16:13:54 -080096 * events on the multicast rib and provisioning groups to program multicast
alshabib3b1eadc2016-02-01 17:57:00 -080097 * flows on the dataplane.
98 */
99@Component(immediate = true)
100public class CordMcast {
Charles Chanf867c4b2017-01-20 11:22:25 -0800101 private static final String APP_NAME = "org.opencord.cordmcast";
alshabib3b1eadc2016-02-01 17:57:00 -0800102
Jonathan Hart0c194962016-05-23 17:08:15 -0700103 private final Logger log = getLogger(getClass());
alshabib09069c92016-02-21 14:49:51 -0800104
alshabib09069c92016-02-21 14:49:51 -0800105 private static final int DEFAULT_PRIORITY = 500;
alshabib3b1eadc2016-02-01 17:57:00 -0800106 private static final short DEFAULT_MCAST_VLAN = 4000;
alshabib09069c92016-02-21 14:49:51 -0800107 private static final boolean DEFAULT_VLAN_ENABLED = true;
alshabibfc1cb032016-02-17 15:37:56 -0800108
alshabib3b1eadc2016-02-01 17:57:00 -0800109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected MulticastRouteService mcastService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib3b1eadc2016-02-01 17:57:00 -0800113 protected FlowObjectiveService flowObjectiveService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected CoreService coreService;
117
Jonathan Hart28271642016-02-10 16:13:54 -0800118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart28271642016-02-10 16:13:54 -0800119 protected ComponentConfigService componentConfigService;
120
alshabib09069c92016-02-21 14:49:51 -0800121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart0c194962016-05-23 17:08:15 -0700122 protected CordConfigService cordConfigService;
alshabib09069c92016-02-21 14:49:51 -0800123
ke hanf1709e82016-08-12 10:48:17 +0800124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected NetworkConfigRegistry networkConfig;
126
ke han9590c812017-02-28 15:02:26 +0800127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected FlowRuleService flowRuleService;
129
Esin Karaman73b0e572019-08-28 13:57:30 +0000130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected StorageService storageService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 protected MastershipService mastershipService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 public DeviceService deviceService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 private ClusterService clusterService;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 private LeadershipService leadershipService;
144
alshabib3b1eadc2016-02-01 17:57:00 -0800145 protected McastListener listener = new InternalMulticastListener();
ke hanf1709e82016-08-12 10:48:17 +0800146 private InternalNetworkConfigListener configListener =
147 new InternalNetworkConfigListener();
alshabib3b1eadc2016-02-01 17:57:00 -0800148
Esin Karaman73b0e572019-08-28 13:57:30 +0000149 private ConsistentMap<NextKey, NextContent> groups;
alshabib3b1eadc2016-02-01 17:57:00 -0800150
alshabib3b1eadc2016-02-01 17:57:00 -0800151 private ApplicationId appId;
ke hanf1709e82016-08-12 10:48:17 +0800152 private ApplicationId coreAppId;
Esin Karaman73b0e572019-08-28 13:57:30 +0000153 private short mcastVlan = DEFAULT_MCAST_VLAN;
alshabib3b1eadc2016-02-01 17:57:00 -0800154
alshabib09069c92016-02-21 14:49:51 -0800155 @Property(name = "vlanEnabled", boolValue = DEFAULT_VLAN_ENABLED,
156 label = "Use vlan for multicast traffic?")
157 private boolean vlanEnabled = DEFAULT_VLAN_ENABLED;
alshabibfc1cb032016-02-17 15:37:56 -0800158
159 @Property(name = "priority", intValue = DEFAULT_PRIORITY,
160 label = "Priority for multicast rules")
alshabib3b1eadc2016-02-01 17:57:00 -0800161 private int priority = DEFAULT_PRIORITY;
162
ke hanf1709e82016-08-12 10:48:17 +0800163 private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
164 McastConfig.class;
165
166 private ConfigFactory<ApplicationId, McastConfig> cordMcastConfigFactory =
167 new ConfigFactory<ApplicationId, McastConfig>(
168 SubjectFactories.APP_SUBJECT_FACTORY, CORD_MCAST_CONFIG_CLASS, "multicast") {
169 @Override
170 public McastConfig createConfig() {
171 return new McastConfig();
172 }
173 };
Jonathan Hart28271642016-02-10 16:13:54 -0800174
Esin Karaman73b0e572019-08-28 13:57:30 +0000175 // lock to synchronize local operations
176 private final Lock mcastLock = new ReentrantLock();
177 private void mcastLock() {
178 mcastLock.lock();
179 }
180 private void mcastUnlock() {
181 mcastLock.unlock();
182 }
183 private ExecutorService eventExecutor;
184
alshabib3b1eadc2016-02-01 17:57:00 -0800185 @Activate
Jonathan Hart435ffc42016-02-19 10:32:05 -0800186 public void activate(ComponentContext context) {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800187 componentConfigService.registerProperties(getClass());
Jonathan Hart435ffc42016-02-19 10:32:05 -0800188 modified(context);
189
Charles Chanf867c4b2017-01-20 11:22:25 -0800190 appId = coreService.registerApplication(APP_NAME);
ke hanf1709e82016-08-12 10:48:17 +0800191 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
Jonathan Hart28271642016-02-10 16:13:54 -0800192
Esin Karaman73b0e572019-08-28 13:57:30 +0000193 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast",
194 "events-mcast-%d", log));
195
196 KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
197 .register(KryoNamespaces.API)
198 .register(NextKey.class)
199 .register(NextContent.class);
200 groups = storageService
201 .<NextKey, NextContent>consistentMapBuilder()
202 .withName("cord-mcast-groups-store")
203 .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
204 .build();
205
ke hanf1709e82016-08-12 10:48:17 +0800206 networkConfig.registerConfigFactory(cordMcastConfigFactory);
207 networkConfig.addListener(configListener);
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800208 mcastService.addListener(listener);
209
alshabib09069c92016-02-21 14:49:51 -0800210 mcastService.getRoutes().stream()
Esin Karaman73b0e572019-08-28 13:57:30 +0000211 .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
alshabib09069c92016-02-21 14:49:51 -0800212 .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
Esin Karaman73b0e572019-08-28 13:57:30 +0000213 .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(),
214 sink)));
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800215
ke hanf1709e82016-08-12 10:48:17 +0800216 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
Esin Karaman73b0e572019-08-28 13:57:30 +0000217 updateConfig(config);
ke hanf1709e82016-08-12 10:48:17 +0800218
alshabib3b1eadc2016-02-01 17:57:00 -0800219 log.info("Started");
220 }
221
222 @Deactivate
223 public void deactivate() {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800224 componentConfigService.unregisterProperties(getClass(), false);
alshabib3b1eadc2016-02-01 17:57:00 -0800225 mcastService.removeListener(listener);
ke hanf1709e82016-08-12 10:48:17 +0800226 networkConfig.removeListener(configListener);
ke han9590c812017-02-28 15:02:26 +0800227 networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
Esin Karaman73b0e572019-08-28 13:57:30 +0000228 eventExecutor.shutdown();
ke han9590c812017-02-28 15:02:26 +0800229 clearGroups();
Esin Karaman73b0e572019-08-28 13:57:30 +0000230 groups.destroy();
alshabib3b1eadc2016-02-01 17:57:00 -0800231 log.info("Stopped");
232 }
233
ke han9590c812017-02-28 15:02:26 +0800234 public void clearGroups() {
Esin Karaman73b0e572019-08-28 13:57:30 +0000235 mcastLock();
236 try {
237 groups.keySet().forEach(groupInfo -> {
238 if (!isLocalLeader(groupInfo.getDevice())) {
239 return;
240 }
241 NextContent next = groups.get(groupInfo).value();
242
Sonal Kasliwalee230262020-01-06 10:46:30 +0000243 if (next != null) {
244 ObjectiveContext context = new DefaultObjectiveContext(
245 (objective) -> log.debug("Successfully remove {}",
246 groupInfo.group),
247 (objective, error) -> log.warn("Failed to remove {}: {}",
248 groupInfo.group, error));
249 // remove the flow rule
250 flowObjectiveService.forward(groupInfo.getDevice(), fwdObject(next.getNextId(),
251 groupInfo.group).remove(context));
252 // remove all ports from the group
253 next.getOutPorts().stream().forEach(portNumber ->
254 flowObjectiveService.next(groupInfo.getDevice(), nextObject(next.getNextId(),
255 portNumber,
256 NextType.RemoveFromExisting,
257 groupInfo.group))
258 );
Esin Karaman73b0e572019-08-28 13:57:30 +0000259
Sonal Kasliwalee230262020-01-06 10:46:30 +0000260 }
Esin Karaman73b0e572019-08-28 13:57:30 +0000261 });
262 groups.clear();
263 } finally {
264 mcastUnlock();
265 }
266 }
267
268 private VlanId multicastVlan() {
269 return VlanId.vlanId(mcastVlan);
270 }
271
272 private VlanId assignedVlan() {
273 return vlanEnabled ? multicastVlan() : VlanId.NONE;
ke han9590c812017-02-28 15:02:26 +0800274 }
275
Jonathan Hart28271642016-02-10 16:13:54 -0800276 @Modified
277 public void modified(ComponentContext context) {
alshabibfc1cb032016-02-17 15:37:56 -0800278 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
279
alshabibfc1cb032016-02-17 15:37:56 -0800280 try {
Esin Karaman73b0e572019-08-28 13:57:30 +0000281 String s = get(properties, "vlanEnabled");
alshabib09069c92016-02-21 14:49:51 -0800282 vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
alshabibfc1cb032016-02-17 15:37:56 -0800283
284 s = get(properties, "priority");
285 priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
286
alshabibfc1cb032016-02-17 15:37:56 -0800287 } catch (Exception e) {
Esin Karaman73b0e572019-08-28 13:57:30 +0000288 log.error("Unable to parse configuration parameter.", e);
alshabibfc1cb032016-02-17 15:37:56 -0800289 vlanEnabled = false;
290 priority = DEFAULT_PRIORITY;
291 }
Jonathan Hart28271642016-02-10 16:13:54 -0800292 }
293
alshabib3b1eadc2016-02-01 17:57:00 -0800294 private class InternalMulticastListener implements McastListener {
295 @Override
296 public void event(McastEvent event) {
Esin Karaman73b0e572019-08-28 13:57:30 +0000297 eventExecutor.execute(() -> {
298 switch (event.type()) {
299 case ROUTE_ADDED:
300 case ROUTE_REMOVED:
301 case SOURCES_ADDED:
302 break;
303 case SINKS_ADDED:
304 addSinks(event);
305 break;
306 case SINKS_REMOVED:
307 removeSinks(event);
308 break;
309 default:
310 log.warn("Unknown mcast event {}", event.type());
311 }
312 });
313 }
314 }
315
316 /**
317 * Processes previous, and new sinks then finds the sinks to be removed.
318 * @param prevSinks the previous sinks to be evaluated
319 * @param newSinks the new sinks to be evaluated
320 * @returnt the set of the sinks to be removed
321 */
322 private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
323 Map<HostId, Set<ConnectPoint>> newSinks) {
324 return getSinksToBeProcessed(prevSinks, newSinks);
325 }
326
327
328 /**
329 * Processes previous, and new sinks then finds the sinks to be added.
330 * @param newSinks the new sinks to be processed
331 * @param allPrevSinks all previous sinks
332 * @return the set of the sinks to be added
333 */
334 private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
335 Map<HostId, Set<ConnectPoint>> allPrevSinks) {
336 return getSinksToBeProcessed(newSinks, allPrevSinks);
337 }
338
339 /**
340 * Gets single-homed sinks that are in set1 but not in set2.
341 * @param sinkSet1 the first sink map
342 * @param sinkSet2 the second sink map
343 * @return a set containing all the single-homed sinks found in set1 but not in set2
344 */
345 private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
346 Map<HostId, Set<ConnectPoint>> sinkSet2) {
347 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
348 sinkSet1.forEach(((hostId, connectPoints) -> {
349 if (HostId.NONE.equals(hostId)) {
350 //assume all connect points associated with HostId.NONE are single homed sinks
351 sinksToBeProcessed.addAll(connectPoints);
352 return;
353 }
354 }));
355 Set<ConnectPoint> singleHomedSinksOfSet2 = sinkSet2.get(HostId.NONE) == null ?
356 Sets.newHashSet() :
357 sinkSet2.get(HostId.NONE);
358 return Sets.difference(sinksToBeProcessed, singleHomedSinksOfSet2);
359 };
360
361
362 private void removeSinks(McastEvent event) {
363 mcastLock();
364 try {
365 Set<ConnectPoint> sinksToBeRemoved = getSinksToBeRemoved(event.prevSubject().sinks(),
366 event.subject().sinks());
367 sinksToBeRemoved.forEach(sink -> removeSink(event.subject().route().group(), sink));
368 } finally {
369 mcastUnlock();
370 }
371 }
372
373 private void removeSink(IpAddress group, ConnectPoint sink) {
374 if (!isLocalLeader(sink.deviceId())) {
375 log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
376 sink.deviceId(), sink, group);
377 return;
378 }
379
380 Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
381
382 if (!oltInfo.isPresent()) {
383 log.warn("Unknown OLT device : {}", sink.deviceId());
384 return;
385 }
386
387 log.debug("Removing sink {} from the group {}", sink, group);
388
389 NextKey key = new NextKey(sink.deviceId(), group);
390 groups.computeIfPresent(key, (k, v) -> {
391 flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
392 NextType.RemoveFromExisting, group));
393
394 Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
395 outPorts.remove(sink.port());
396
397 if (outPorts.isEmpty()) {
398 // this is the last sink
399 ObjectiveContext context = new DefaultObjectiveContext(
400 (objective) -> log.debug("Successfully remove {} on {}",
401 group, sink),
402 (objective, error) -> log.warn("Failed to remove {} on {}: {}",
403 group, sink, error));
404 ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
405 flowObjectiveService.forward(sink.deviceId(), fwdObj);
406 }
407 // remove the whole entity if no out port exists in the port list
408 return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
409 ImmutableSet.copyOf(outPorts));
410 });
411 }
412
413 private void addSinks(McastEvent event) {
414 mcastLock();
415 try {
416 Set<ConnectPoint> sinksToBeAdded = getSinksToBeAdded(event.subject().sinks(),
417 event.prevSubject().sinks());
418 sinksToBeAdded.forEach(sink -> addSink(event.subject().route(), sink));
419 } finally {
420 mcastUnlock();
421 }
422 }
423
424 private void addSink(McastRoute route, ConnectPoint sink) {
425 if (!isLocalLeader(sink.deviceId())) {
426 log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
427 sink.deviceId(), sink, route.group());
428 return;
429 }
430
431 Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
432
433 if (!oltInfo.isPresent()) {
434 log.warn("Unknown OLT device : {}", sink.deviceId());
435 return;
436 }
437
438 log.debug("Adding sink {} to the group {}", sink, route.group());
439
440 NextKey key = new NextKey(sink.deviceId(), route.group());
441 NextObjective newNextObj;
442
443 boolean theFirstSinkOfGroup = false;
444 if (!groups.containsKey(key)) {
445 // First time someone request this mcast group via this device
446 Integer nextId = flowObjectiveService.allocateNextId();
447 newNextObj = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
448 // Store the new port
449 groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
450 theFirstSinkOfGroup = true;
451 } else {
452 // This device already serves some subscribers of this mcast group
453 Versioned<NextContent> nextObj = groups.get(key);
454 if (nextObj.value().getOutPorts().contains(sink.port())) {
455 log.info("Group {} already serves the sink connected to {}", route.group(), sink);
456 return;
457 }
458 newNextObj = nextObject(nextObj.value().getNextId(), sink.port(),
459 NextType.AddToExisting, route.group());
460 // add new port to the group
461 Set<PortNumber> outPorts = Sets.newHashSet(nextObj.value().getOutPorts());
462 outPorts.add(sink.port());
463 groups.put(key, new NextContent(newNextObj.id(), ImmutableSet.copyOf(outPorts)));
464 }
465
466 ObjectiveContext context = new DefaultObjectiveContext(
467 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
468 route.group(), sink.deviceId(), sink.port().toLong(),
469 assignedVlan()),
470 (objective, error) -> {
471 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
472 route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan(),
473 error);
474 });
475
476 flowObjectiveService.next(sink.deviceId(), newNextObj);
477
478 if (theFirstSinkOfGroup) {
479 // create the necessary flow rule if this is the first sink request for the group
480 // on this device
481 flowObjectiveService.forward(sink.deviceId(), fwdObject(newNextObj.id(),
482 route.group()).add(context));
483 }
484 }
485
486 private class InternalNetworkConfigListener implements NetworkConfigListener {
487 @Override
488 public void event(NetworkConfigEvent event) {
489 eventExecutor.execute(() -> {
490 switch (event.type()) {
491
492 case CONFIG_ADDED:
493 case CONFIG_UPDATED:
494 if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
495 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
496 if (config != null) {
497 //TODO: Simply remove flows/groups, hosts will response period query
498 // and re-sent IGMP report, so the flows can be rebuild.
499 // However, better to remove and re-add mcast flow rules here
500 if (mcastVlan != config.egressVlan().toShort() && vlanEnabled) {
501 clearGroups();
502 }
503 updateConfig(config);
504 }
505 }
506 break;
507 case CONFIG_REGISTERED:
508 case CONFIG_UNREGISTERED:
509 case CONFIG_REMOVED:
510 break;
511 default:
512 break;
513 }
514 });
515 }
516 }
517
518 private void updateConfig(McastConfig config) {
519 if (config == null) {
520 return;
521 }
522 log.debug("multicast config received: {}", config);
523
524 if (config.egressVlan() != null) {
525 mcastVlan = config.egressVlan().toShort();
526 }
527 }
528
529 private class NextKey {
530 private DeviceId device;
531 private IpAddress group;
532
533 public NextKey(DeviceId deviceId, IpAddress groupAddress) {
534 device = deviceId;
535 group = groupAddress;
536 }
537
538 public DeviceId getDevice() {
539 return device;
540 }
541
542 public int hashCode() {
543 return Objects.hash(this.device, this.group);
544 }
545
546 public boolean equals(Object obj) {
547 if (this == obj) {
548 return true;
549 } else if (!(obj instanceof NextKey)) {
550 return false;
551 } else {
552 NextKey that = (NextKey) obj;
553 return this.getClass() == that.getClass() &&
554 Objects.equals(this.device, that.device) &&
555 Objects.equals(this.group, that.group);
556 }
557 }
558 }
559
560 private class NextContent {
561 private Integer nextId;
562 private Set<PortNumber> outPorts;
563
564 public NextContent(Integer nextId, Set<PortNumber> outPorts) {
565 this.nextId = nextId;
566 this.outPorts = outPorts;
567 }
568
569 public Integer getNextId() {
570 return nextId;
571 }
572
573 public Set<PortNumber> getOutPorts() {
574 return ImmutableSet.copyOf(outPorts);
575 }
576
577 public int hashCode() {
578 return Objects.hash(this.nextId, this.outPorts);
579 }
580
581 public boolean equals(Object obj) {
582 if (this == obj) {
583 return true;
584 } else if (!(obj instanceof NextContent)) {
585 return false;
586 } else {
587 NextContent that = (NextContent) obj;
588 return this.getClass() == that.getClass() &&
589 Objects.equals(this.nextId, that.nextId) &&
590 Objects.equals(this.outPorts, that.outPorts);
alshabib3b1eadc2016-02-01 17:57:00 -0800591 }
592 }
593 }
594
ke han9590c812017-02-28 15:02:26 +0800595 private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
596
Esin Karaman73b0e572019-08-28 13:57:30 +0000597 private NextObjective nextObject(Integer nextId, PortNumber port,
598 NextType nextType, IpAddress mcastIp) {
ke han9590c812017-02-28 15:02:26 +0800599
Esin Karaman73b0e572019-08-28 13:57:30 +0000600 // Build the meta selector with the fwd objective info
601 TrafficSelector.Builder metadata = DefaultTrafficSelector.builder()
602 .matchIPDst(mcastIp.toIpPrefix());
603
604 if (vlanEnabled) {
605 metadata.matchVlanId(multicastVlan());
606 }
607
ke han9590c812017-02-28 15:02:26 +0800608 DefaultNextObjective.Builder build = DefaultNextObjective.builder()
609 .fromApp(appId)
610 .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
611 .withType(NextObjective.Type.BROADCAST)
Esin Karaman73b0e572019-08-28 13:57:30 +0000612 .withId(nextId)
613 .withMeta(metadata.build());
614
ke han9590c812017-02-28 15:02:26 +0800615 ObjectiveContext content = new ObjectiveContext() {
616 @Override
617 public void onSuccess(Objective objective) {
Esin Karaman73b0e572019-08-28 13:57:30 +0000618 log.debug("Next Objective {} installed", objective.id());
ke han9590c812017-02-28 15:02:26 +0800619 }
620
621 @Override
622 public void onError(Objective objective, ObjectiveError error) {
Esin Karaman73b0e572019-08-28 13:57:30 +0000623 log.debug("Next Objective {} failed, because {}",
624 objective.id(),
625 error);
ke han9590c812017-02-28 15:02:26 +0800626 }
627 };
628
629 switch (nextType) {
630 case AddNew:
631 return build.add(content);
632 case AddToExisting:
633 return build.addToExisting(content);
634 case Remove:
635 return build.remove(content);
636 case RemoveFromExisting:
637 return build.removeFromExisting(content);
638 default:
639 return null;
640 }
641 }
642
Esin Karaman73b0e572019-08-28 13:57:30 +0000643 private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
644 TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
645 .matchEthType(Ethernet.TYPE_IPV4)
646 .matchIPDst(mcastIp.toIpPrefix());
ke han9590c812017-02-28 15:02:26 +0800647
Esin Karaman73b0e572019-08-28 13:57:30 +0000648 //build the meta selector
649 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
650 if (vlanEnabled) {
651 metabuilder.matchVlanId(multicastVlan());
Jonathan Hart718c0452016-02-18 15:56:22 -0800652 }
653
Esin Karaman73b0e572019-08-28 13:57:30 +0000654 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
655 .fromApp(appId)
656 .nextStep(nextId)
657 .makePermanent()
658 .withFlag(ForwardingObjective.Flag.SPECIFIC)
659 .withPriority(priority)
660 .withSelector(mcast.build())
661 .withMeta(metabuilder.build());
alshabibfc1cb032016-02-17 15:37:56 -0800662
Esin Karaman73b0e572019-08-28 13:57:30 +0000663 return fwdBuilder;
alshabibfc1cb032016-02-17 15:37:56 -0800664 }
665
Esin Karaman73b0e572019-08-28 13:57:30 +0000666 // Custom-built function, when the device is not available we need a fallback mechanism
667 private boolean isLocalLeader(DeviceId deviceId) {
668 if (!mastershipService.isLocalMaster(deviceId)) {
669 // When the device is available we just check the mastership
670 if (deviceService.isAvailable(deviceId)) {
ke han9590c812017-02-28 15:02:26 +0800671 return false;
ke han9590c812017-02-28 15:02:26 +0800672 }
Esin Karaman73b0e572019-08-28 13:57:30 +0000673 // Fallback with Leadership service - device id is used as topic
674 NodeId leader = leadershipService.runForLeadership(
675 deviceId.toString()).leaderNodeId();
676 // Verify if this node is the leader
677 return clusterService.getLocalNode().id().equals(leader);
ke han9590c812017-02-28 15:02:26 +0800678 }
Esin Karaman73b0e572019-08-28 13:57:30 +0000679 return true;
ke han9590c812017-02-28 15:02:26 +0800680 }
Esin Karaman73b0e572019-08-28 13:57:30 +0000681
alshabib3b1eadc2016-02-01 17:57:00 -0800682}
ke hanf1709e82016-08-12 10:48:17 +0800683
ke han9590c812017-02-28 15:02:26 +0800684