blob: 2936e5330da2d4d09fe311e0e992e59e0c6bf784 [file] [log] [blame]
alshabib3b1eadc2016-02-01 17:57:00 -08001/*
Brian O'Connorcf85aa82017-08-03 22:46:01 -07002 * Copyright 2016-present Open Networking Foundation
alshabib3b1eadc2016-02-01 17:57:00 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib772e1582016-06-01 17:50:05 -070016package org.opencord.cordmcast;
alshabib3b1eadc2016-02-01 17:57:00 -080017
Esin Karaman73b0e572019-08-28 13:57:30 +000018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Sets;
alshabib09069c92016-02-21 14:49:51 -080020import org.apache.commons.lang3.tuple.ImmutablePair;
alshabib3b1eadc2016-02-01 17:57:00 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
Jonathan Hart28271642016-02-10 16:13:54 -080024import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
alshabib3b1eadc2016-02-01 17:57:00 -080026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.onlab.packet.Ethernet;
alshabib3b1eadc2016-02-01 17:57:00 -080029import org.onlab.packet.IpAddress;
30import org.onlab.packet.VlanId;
Esin Karaman73b0e572019-08-28 13:57:30 +000031import org.onlab.util.KryoNamespace;
Jonathan Hart28271642016-02-10 16:13:54 -080032import org.onosproject.cfg.ComponentConfigService;
Esin Karaman73b0e572019-08-28 13:57:30 +000033import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.LeadershipService;
35import org.onosproject.cluster.NodeId;
alshabib3b1eadc2016-02-01 17:57:00 -080036import org.onosproject.core.ApplicationId;
37import org.onosproject.core.CoreService;
Esin Karaman73b0e572019-08-28 13:57:30 +000038import org.onosproject.mastership.MastershipService;
39import org.onosproject.mcast.api.McastEvent;
40import org.onosproject.mcast.api.McastListener;
41import org.onosproject.mcast.api.McastRoute;
42import org.onosproject.mcast.api.MulticastRouteService;
alshabib3b1eadc2016-02-01 17:57:00 -080043import org.onosproject.net.ConnectPoint;
Esin Karaman09050642020-03-06 14:31:14 +000044import org.onosproject.net.Device;
ke han9590c812017-02-28 15:02:26 +080045import org.onosproject.net.DeviceId;
Esin Karaman73b0e572019-08-28 13:57:30 +000046import org.onosproject.net.HostId;
ke han9590c812017-02-28 15:02:26 +080047import org.onosproject.net.PortNumber;
ke hanf1709e82016-08-12 10:48:17 +080048import org.onosproject.net.config.ConfigFactory;
49import org.onosproject.net.config.NetworkConfigEvent;
50import org.onosproject.net.config.NetworkConfigListener;
51import org.onosproject.net.config.NetworkConfigRegistry;
Esin Karaman73b0e572019-08-28 13:57:30 +000052import org.onosproject.net.config.basics.McastConfig;
ke hanf1709e82016-08-12 10:48:17 +080053import org.onosproject.net.config.basics.SubjectFactories;
Esin Karaman73b0e572019-08-28 13:57:30 +000054import org.onosproject.net.device.DeviceService;
alshabib3b1eadc2016-02-01 17:57:00 -080055import org.onosproject.net.flow.DefaultTrafficSelector;
56import org.onosproject.net.flow.DefaultTrafficTreatment;
ke han9590c812017-02-28 15:02:26 +080057import org.onosproject.net.flow.FlowRuleService;
alshabib3b1eadc2016-02-01 17:57:00 -080058import org.onosproject.net.flow.TrafficSelector;
59import org.onosproject.net.flowobjective.DefaultForwardingObjective;
60import org.onosproject.net.flowobjective.DefaultNextObjective;
Esin Karaman73b0e572019-08-28 13:57:30 +000061import org.onosproject.net.flowobjective.DefaultObjectiveContext;
alshabib3b1eadc2016-02-01 17:57:00 -080062import org.onosproject.net.flowobjective.FlowObjectiveService;
63import org.onosproject.net.flowobjective.ForwardingObjective;
64import org.onosproject.net.flowobjective.NextObjective;
65import org.onosproject.net.flowobjective.Objective;
66import org.onosproject.net.flowobjective.ObjectiveContext;
67import org.onosproject.net.flowobjective.ObjectiveError;
Esin Karaman73b0e572019-08-28 13:57:30 +000068import org.onosproject.store.serializers.KryoNamespaces;
69import org.onosproject.store.service.ConsistentMap;
70import org.onosproject.store.service.Serializer;
71import org.onosproject.store.service.StorageService;
72import org.onosproject.store.service.Versioned;
Esin Karaman09050642020-03-06 14:31:14 +000073import org.opencord.sadis.SadisService;
74import org.opencord.sadis.SubscriberAndDeviceInformation;
Jonathan Hart28271642016-02-10 16:13:54 -080075import org.osgi.service.component.ComponentContext;
alshabib3b1eadc2016-02-01 17:57:00 -080076import org.slf4j.Logger;
77
Jonathan Hart28271642016-02-10 16:13:54 -080078import java.util.Dictionary;
alshabib3b1eadc2016-02-01 17:57:00 -080079import java.util.Map;
ke han9590c812017-02-28 15:02:26 +080080import java.util.Objects;
Jonathan Hart0c194962016-05-23 17:08:15 -070081import java.util.Optional;
alshabibfc1cb032016-02-17 15:37:56 -080082import java.util.Properties;
Esin Karaman73b0e572019-08-28 13:57:30 +000083import java.util.Set;
84import java.util.concurrent.ExecutorService;
85import java.util.concurrent.locks.Lock;
86import java.util.concurrent.locks.ReentrantLock;
alshabib3b1eadc2016-02-01 17:57:00 -080087
alshabibfc1cb032016-02-17 15:37:56 -080088import static com.google.common.base.Strings.isNullOrEmpty;
Esin Karaman73b0e572019-08-28 13:57:30 +000089import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibfc1cb032016-02-17 15:37:56 -080090import static org.onlab.util.Tools.get;
Esin Karaman73b0e572019-08-28 13:57:30 +000091import static org.onlab.util.Tools.groupedThreads;
alshabib3b1eadc2016-02-01 17:57:00 -080092import static org.slf4j.LoggerFactory.getLogger;
93
Esin Karaman73b0e572019-08-28 13:57:30 +000094
alshabib3b1eadc2016-02-01 17:57:00 -080095/**
Jonathan Hartc3f84eb2016-02-19 12:44:36 -080096 * CORD multicast provisioning application. Operates by listening to
Jonathan Hart28271642016-02-10 16:13:54 -080097 * events on the multicast rib and provisioning groups to program multicast
alshabib3b1eadc2016-02-01 17:57:00 -080098 * flows on the dataplane.
99 */
100@Component(immediate = true)
101public class CordMcast {
Charles Chanf867c4b2017-01-20 11:22:25 -0800102 private static final String APP_NAME = "org.opencord.cordmcast";
alshabib3b1eadc2016-02-01 17:57:00 -0800103
Jonathan Hart0c194962016-05-23 17:08:15 -0700104 private final Logger log = getLogger(getClass());
alshabib09069c92016-02-21 14:49:51 -0800105
alshabib09069c92016-02-21 14:49:51 -0800106 private static final int DEFAULT_PRIORITY = 500;
alshabib3b1eadc2016-02-01 17:57:00 -0800107 private static final short DEFAULT_MCAST_VLAN = 4000;
alshabib09069c92016-02-21 14:49:51 -0800108 private static final boolean DEFAULT_VLAN_ENABLED = true;
alshabibfc1cb032016-02-17 15:37:56 -0800109
alshabib3b1eadc2016-02-01 17:57:00 -0800110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected MulticastRouteService mcastService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
alshabib3b1eadc2016-02-01 17:57:00 -0800114 protected FlowObjectiveService flowObjectiveService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected CoreService coreService;
118
Jonathan Hart28271642016-02-10 16:13:54 -0800119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart28271642016-02-10 16:13:54 -0800120 protected ComponentConfigService componentConfigService;
121
alshabib09069c92016-02-21 14:49:51 -0800122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Esin Karaman09050642020-03-06 14:31:14 +0000123 protected SadisService sadisService;
alshabib09069c92016-02-21 14:49:51 -0800124
ke hanf1709e82016-08-12 10:48:17 +0800125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected NetworkConfigRegistry networkConfig;
127
ke han9590c812017-02-28 15:02:26 +0800128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected FlowRuleService flowRuleService;
130
Esin Karaman73b0e572019-08-28 13:57:30 +0000131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected StorageService storageService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected MastershipService mastershipService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 public DeviceService deviceService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 private ClusterService clusterService;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
144 private LeadershipService leadershipService;
145
alshabib3b1eadc2016-02-01 17:57:00 -0800146 protected McastListener listener = new InternalMulticastListener();
ke hanf1709e82016-08-12 10:48:17 +0800147 private InternalNetworkConfigListener configListener =
148 new InternalNetworkConfigListener();
alshabib3b1eadc2016-02-01 17:57:00 -0800149
Esin Karaman73b0e572019-08-28 13:57:30 +0000150 private ConsistentMap<NextKey, NextContent> groups;
alshabib3b1eadc2016-02-01 17:57:00 -0800151
alshabib3b1eadc2016-02-01 17:57:00 -0800152 private ApplicationId appId;
ke hanf1709e82016-08-12 10:48:17 +0800153 private ApplicationId coreAppId;
Esin Karaman73b0e572019-08-28 13:57:30 +0000154 private short mcastVlan = DEFAULT_MCAST_VLAN;
alshabib3b1eadc2016-02-01 17:57:00 -0800155
alshabib09069c92016-02-21 14:49:51 -0800156 @Property(name = "vlanEnabled", boolValue = DEFAULT_VLAN_ENABLED,
157 label = "Use vlan for multicast traffic?")
158 private boolean vlanEnabled = DEFAULT_VLAN_ENABLED;
alshabibfc1cb032016-02-17 15:37:56 -0800159
160 @Property(name = "priority", intValue = DEFAULT_PRIORITY,
161 label = "Priority for multicast rules")
alshabib3b1eadc2016-02-01 17:57:00 -0800162 private int priority = DEFAULT_PRIORITY;
163
ke hanf1709e82016-08-12 10:48:17 +0800164 private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
165 McastConfig.class;
166
167 private ConfigFactory<ApplicationId, McastConfig> cordMcastConfigFactory =
168 new ConfigFactory<ApplicationId, McastConfig>(
169 SubjectFactories.APP_SUBJECT_FACTORY, CORD_MCAST_CONFIG_CLASS, "multicast") {
170 @Override
171 public McastConfig createConfig() {
172 return new McastConfig();
173 }
174 };
Jonathan Hart28271642016-02-10 16:13:54 -0800175
Esin Karaman73b0e572019-08-28 13:57:30 +0000176 // lock to synchronize local operations
177 private final Lock mcastLock = new ReentrantLock();
178 private void mcastLock() {
179 mcastLock.lock();
180 }
181 private void mcastUnlock() {
182 mcastLock.unlock();
183 }
184 private ExecutorService eventExecutor;
185
alshabib3b1eadc2016-02-01 17:57:00 -0800186 @Activate
Jonathan Hart435ffc42016-02-19 10:32:05 -0800187 public void activate(ComponentContext context) {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800188 componentConfigService.registerProperties(getClass());
Jonathan Hart435ffc42016-02-19 10:32:05 -0800189 modified(context);
190
Charles Chanf867c4b2017-01-20 11:22:25 -0800191 appId = coreService.registerApplication(APP_NAME);
ke hanf1709e82016-08-12 10:48:17 +0800192 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
Jonathan Hart28271642016-02-10 16:13:54 -0800193
Esin Karaman73b0e572019-08-28 13:57:30 +0000194 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast",
195 "events-mcast-%d", log));
196
197 KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
198 .register(KryoNamespaces.API)
199 .register(NextKey.class)
200 .register(NextContent.class);
201 groups = storageService
202 .<NextKey, NextContent>consistentMapBuilder()
203 .withName("cord-mcast-groups-store")
204 .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
205 .build();
206
ke hanf1709e82016-08-12 10:48:17 +0800207 networkConfig.registerConfigFactory(cordMcastConfigFactory);
208 networkConfig.addListener(configListener);
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800209 mcastService.addListener(listener);
210
alshabib09069c92016-02-21 14:49:51 -0800211 mcastService.getRoutes().stream()
Esin Karaman73b0e572019-08-28 13:57:30 +0000212 .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
alshabib09069c92016-02-21 14:49:51 -0800213 .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
Esin Karaman73b0e572019-08-28 13:57:30 +0000214 .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(),
215 sink)));
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800216
ke hanf1709e82016-08-12 10:48:17 +0800217 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
Esin Karaman73b0e572019-08-28 13:57:30 +0000218 updateConfig(config);
ke hanf1709e82016-08-12 10:48:17 +0800219
alshabib3b1eadc2016-02-01 17:57:00 -0800220 log.info("Started");
221 }
222
223 @Deactivate
224 public void deactivate() {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800225 componentConfigService.unregisterProperties(getClass(), false);
alshabib3b1eadc2016-02-01 17:57:00 -0800226 mcastService.removeListener(listener);
ke hanf1709e82016-08-12 10:48:17 +0800227 networkConfig.removeListener(configListener);
ke han9590c812017-02-28 15:02:26 +0800228 networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
Esin Karaman73b0e572019-08-28 13:57:30 +0000229 eventExecutor.shutdown();
ke han9590c812017-02-28 15:02:26 +0800230 clearGroups();
Esin Karaman73b0e572019-08-28 13:57:30 +0000231 groups.destroy();
alshabib3b1eadc2016-02-01 17:57:00 -0800232 log.info("Stopped");
233 }
234
ke han9590c812017-02-28 15:02:26 +0800235 public void clearGroups() {
Esin Karaman73b0e572019-08-28 13:57:30 +0000236 mcastLock();
237 try {
238 groups.keySet().forEach(groupInfo -> {
239 if (!isLocalLeader(groupInfo.getDevice())) {
240 return;
241 }
242 NextContent next = groups.get(groupInfo).value();
243
Sonal Kasliwalee230262020-01-06 10:46:30 +0000244 if (next != null) {
245 ObjectiveContext context = new DefaultObjectiveContext(
246 (objective) -> log.debug("Successfully remove {}",
247 groupInfo.group),
248 (objective, error) -> log.warn("Failed to remove {}: {}",
249 groupInfo.group, error));
250 // remove the flow rule
251 flowObjectiveService.forward(groupInfo.getDevice(), fwdObject(next.getNextId(),
252 groupInfo.group).remove(context));
253 // remove all ports from the group
254 next.getOutPorts().stream().forEach(portNumber ->
255 flowObjectiveService.next(groupInfo.getDevice(), nextObject(next.getNextId(),
256 portNumber,
257 NextType.RemoveFromExisting,
258 groupInfo.group))
259 );
Esin Karaman73b0e572019-08-28 13:57:30 +0000260
Sonal Kasliwalee230262020-01-06 10:46:30 +0000261 }
Esin Karaman73b0e572019-08-28 13:57:30 +0000262 });
263 groups.clear();
264 } finally {
265 mcastUnlock();
266 }
267 }
268
269 private VlanId multicastVlan() {
270 return VlanId.vlanId(mcastVlan);
271 }
272
273 private VlanId assignedVlan() {
274 return vlanEnabled ? multicastVlan() : VlanId.NONE;
ke han9590c812017-02-28 15:02:26 +0800275 }
276
Jonathan Hart28271642016-02-10 16:13:54 -0800277 @Modified
278 public void modified(ComponentContext context) {
alshabibfc1cb032016-02-17 15:37:56 -0800279 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
280
alshabibfc1cb032016-02-17 15:37:56 -0800281 try {
Esin Karaman73b0e572019-08-28 13:57:30 +0000282 String s = get(properties, "vlanEnabled");
alshabib09069c92016-02-21 14:49:51 -0800283 vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
alshabibfc1cb032016-02-17 15:37:56 -0800284
285 s = get(properties, "priority");
286 priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
287
alshabibfc1cb032016-02-17 15:37:56 -0800288 } catch (Exception e) {
Esin Karaman73b0e572019-08-28 13:57:30 +0000289 log.error("Unable to parse configuration parameter.", e);
alshabibfc1cb032016-02-17 15:37:56 -0800290 vlanEnabled = false;
291 priority = DEFAULT_PRIORITY;
292 }
Jonathan Hart28271642016-02-10 16:13:54 -0800293 }
294
alshabib3b1eadc2016-02-01 17:57:00 -0800295 private class InternalMulticastListener implements McastListener {
296 @Override
297 public void event(McastEvent event) {
Esin Karaman73b0e572019-08-28 13:57:30 +0000298 eventExecutor.execute(() -> {
299 switch (event.type()) {
300 case ROUTE_ADDED:
301 case ROUTE_REMOVED:
302 case SOURCES_ADDED:
303 break;
304 case SINKS_ADDED:
305 addSinks(event);
306 break;
307 case SINKS_REMOVED:
308 removeSinks(event);
309 break;
310 default:
311 log.warn("Unknown mcast event {}", event.type());
312 }
313 });
314 }
315 }
316
317 /**
318 * Processes previous, and new sinks then finds the sinks to be removed.
319 * @param prevSinks the previous sinks to be evaluated
320 * @param newSinks the new sinks to be evaluated
321 * @returnt the set of the sinks to be removed
322 */
323 private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
324 Map<HostId, Set<ConnectPoint>> newSinks) {
325 return getSinksToBeProcessed(prevSinks, newSinks);
326 }
327
328
329 /**
330 * Processes previous, and new sinks then finds the sinks to be added.
331 * @param newSinks the new sinks to be processed
332 * @param allPrevSinks all previous sinks
333 * @return the set of the sinks to be added
334 */
335 private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
336 Map<HostId, Set<ConnectPoint>> allPrevSinks) {
337 return getSinksToBeProcessed(newSinks, allPrevSinks);
338 }
339
340 /**
341 * Gets single-homed sinks that are in set1 but not in set2.
342 * @param sinkSet1 the first sink map
343 * @param sinkSet2 the second sink map
344 * @return a set containing all the single-homed sinks found in set1 but not in set2
345 */
346 private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
347 Map<HostId, Set<ConnectPoint>> sinkSet2) {
348 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
349 sinkSet1.forEach(((hostId, connectPoints) -> {
350 if (HostId.NONE.equals(hostId)) {
351 //assume all connect points associated with HostId.NONE are single homed sinks
352 sinksToBeProcessed.addAll(connectPoints);
353 return;
354 }
355 }));
356 Set<ConnectPoint> singleHomedSinksOfSet2 = sinkSet2.get(HostId.NONE) == null ?
357 Sets.newHashSet() :
358 sinkSet2.get(HostId.NONE);
359 return Sets.difference(sinksToBeProcessed, singleHomedSinksOfSet2);
360 };
361
362
363 private void removeSinks(McastEvent event) {
364 mcastLock();
365 try {
366 Set<ConnectPoint> sinksToBeRemoved = getSinksToBeRemoved(event.prevSubject().sinks(),
367 event.subject().sinks());
368 sinksToBeRemoved.forEach(sink -> removeSink(event.subject().route().group(), sink));
369 } finally {
370 mcastUnlock();
371 }
372 }
373
374 private void removeSink(IpAddress group, ConnectPoint sink) {
375 if (!isLocalLeader(sink.deviceId())) {
376 log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
377 sink.deviceId(), sink, group);
378 return;
379 }
380
Esin Karaman09050642020-03-06 14:31:14 +0000381 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman73b0e572019-08-28 13:57:30 +0000382
383 if (!oltInfo.isPresent()) {
384 log.warn("Unknown OLT device : {}", sink.deviceId());
385 return;
386 }
387
388 log.debug("Removing sink {} from the group {}", sink, group);
389
390 NextKey key = new NextKey(sink.deviceId(), group);
391 groups.computeIfPresent(key, (k, v) -> {
392 flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
393 NextType.RemoveFromExisting, group));
394
395 Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
396 outPorts.remove(sink.port());
397
398 if (outPorts.isEmpty()) {
399 // this is the last sink
400 ObjectiveContext context = new DefaultObjectiveContext(
401 (objective) -> log.debug("Successfully remove {} on {}",
402 group, sink),
403 (objective, error) -> log.warn("Failed to remove {} on {}: {}",
404 group, sink, error));
405 ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
406 flowObjectiveService.forward(sink.deviceId(), fwdObj);
407 }
408 // remove the whole entity if no out port exists in the port list
409 return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
410 ImmutableSet.copyOf(outPorts));
411 });
412 }
413
414 private void addSinks(McastEvent event) {
415 mcastLock();
416 try {
417 Set<ConnectPoint> sinksToBeAdded = getSinksToBeAdded(event.subject().sinks(),
418 event.prevSubject().sinks());
419 sinksToBeAdded.forEach(sink -> addSink(event.subject().route(), sink));
420 } finally {
421 mcastUnlock();
422 }
423 }
424
425 private void addSink(McastRoute route, ConnectPoint sink) {
426 if (!isLocalLeader(sink.deviceId())) {
427 log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
428 sink.deviceId(), sink, route.group());
429 return;
430 }
431
Esin Karaman09050642020-03-06 14:31:14 +0000432 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman73b0e572019-08-28 13:57:30 +0000433
434 if (!oltInfo.isPresent()) {
435 log.warn("Unknown OLT device : {}", sink.deviceId());
436 return;
437 }
438
439 log.debug("Adding sink {} to the group {}", sink, route.group());
440
441 NextKey key = new NextKey(sink.deviceId(), route.group());
442 NextObjective newNextObj;
443
444 boolean theFirstSinkOfGroup = false;
445 if (!groups.containsKey(key)) {
446 // First time someone request this mcast group via this device
447 Integer nextId = flowObjectiveService.allocateNextId();
448 newNextObj = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
449 // Store the new port
450 groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
451 theFirstSinkOfGroup = true;
452 } else {
453 // This device already serves some subscribers of this mcast group
454 Versioned<NextContent> nextObj = groups.get(key);
455 if (nextObj.value().getOutPorts().contains(sink.port())) {
456 log.info("Group {} already serves the sink connected to {}", route.group(), sink);
457 return;
458 }
459 newNextObj = nextObject(nextObj.value().getNextId(), sink.port(),
460 NextType.AddToExisting, route.group());
461 // add new port to the group
462 Set<PortNumber> outPorts = Sets.newHashSet(nextObj.value().getOutPorts());
463 outPorts.add(sink.port());
464 groups.put(key, new NextContent(newNextObj.id(), ImmutableSet.copyOf(outPorts)));
465 }
466
467 ObjectiveContext context = new DefaultObjectiveContext(
468 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
469 route.group(), sink.deviceId(), sink.port().toLong(),
470 assignedVlan()),
471 (objective, error) -> {
472 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
473 route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan(),
474 error);
475 });
476
477 flowObjectiveService.next(sink.deviceId(), newNextObj);
478
479 if (theFirstSinkOfGroup) {
480 // create the necessary flow rule if this is the first sink request for the group
481 // on this device
482 flowObjectiveService.forward(sink.deviceId(), fwdObject(newNextObj.id(),
483 route.group()).add(context));
484 }
485 }
486
Esin Karaman09050642020-03-06 14:31:14 +0000487 /**
488 * Fetches device information associated with the device serial number from SADIS.
489 *
490 * @param serialNumber serial number of a device
491 * @return device information; an empty Optional otherwise.
492 */
493 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
494 long start = System.currentTimeMillis();
495 try {
496 return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
497 } finally {
498 if (log.isDebugEnabled()) {
499 // SADIS may call remote systems to fetch device data and this calls can take a long time.
500 // This measurement is just for monitoring these kinds of situations.
501 log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
502 }
503
504 }
505 }
506
507 /**
508 * Fetches device information associated with the device serial number from SADIS.
509 *
510 * @param deviceId device id
511 * @return device information; an empty Optional otherwise.
512 */
513 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
514 Device device = deviceService.getDevice(deviceId);
515 if (device == null || device.serialNumber() == null) {
516 return Optional.empty();
517 }
518 return getSubscriberAndDeviceInformation(device.serialNumber());
519 }
520
Esin Karaman73b0e572019-08-28 13:57:30 +0000521 private class InternalNetworkConfigListener implements NetworkConfigListener {
522 @Override
523 public void event(NetworkConfigEvent event) {
524 eventExecutor.execute(() -> {
525 switch (event.type()) {
526
527 case CONFIG_ADDED:
528 case CONFIG_UPDATED:
529 if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
530 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
531 if (config != null) {
532 //TODO: Simply remove flows/groups, hosts will response period query
533 // and re-sent IGMP report, so the flows can be rebuild.
534 // However, better to remove and re-add mcast flow rules here
535 if (mcastVlan != config.egressVlan().toShort() && vlanEnabled) {
536 clearGroups();
537 }
538 updateConfig(config);
539 }
540 }
541 break;
542 case CONFIG_REGISTERED:
543 case CONFIG_UNREGISTERED:
544 case CONFIG_REMOVED:
545 break;
546 default:
547 break;
548 }
549 });
550 }
551 }
552
553 private void updateConfig(McastConfig config) {
554 if (config == null) {
555 return;
556 }
557 log.debug("multicast config received: {}", config);
558
559 if (config.egressVlan() != null) {
560 mcastVlan = config.egressVlan().toShort();
561 }
562 }
563
564 private class NextKey {
565 private DeviceId device;
566 private IpAddress group;
567
568 public NextKey(DeviceId deviceId, IpAddress groupAddress) {
569 device = deviceId;
570 group = groupAddress;
571 }
572
573 public DeviceId getDevice() {
574 return device;
575 }
576
577 public int hashCode() {
578 return Objects.hash(this.device, this.group);
579 }
580
581 public boolean equals(Object obj) {
582 if (this == obj) {
583 return true;
584 } else if (!(obj instanceof NextKey)) {
585 return false;
586 } else {
587 NextKey that = (NextKey) obj;
588 return this.getClass() == that.getClass() &&
589 Objects.equals(this.device, that.device) &&
590 Objects.equals(this.group, that.group);
591 }
592 }
593 }
594
595 private class NextContent {
596 private Integer nextId;
597 private Set<PortNumber> outPorts;
598
599 public NextContent(Integer nextId, Set<PortNumber> outPorts) {
600 this.nextId = nextId;
601 this.outPorts = outPorts;
602 }
603
604 public Integer getNextId() {
605 return nextId;
606 }
607
608 public Set<PortNumber> getOutPorts() {
609 return ImmutableSet.copyOf(outPorts);
610 }
611
612 public int hashCode() {
613 return Objects.hash(this.nextId, this.outPorts);
614 }
615
616 public boolean equals(Object obj) {
617 if (this == obj) {
618 return true;
619 } else if (!(obj instanceof NextContent)) {
620 return false;
621 } else {
622 NextContent that = (NextContent) obj;
623 return this.getClass() == that.getClass() &&
624 Objects.equals(this.nextId, that.nextId) &&
625 Objects.equals(this.outPorts, that.outPorts);
alshabib3b1eadc2016-02-01 17:57:00 -0800626 }
627 }
628 }
629
ke han9590c812017-02-28 15:02:26 +0800630 private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
631
Esin Karaman73b0e572019-08-28 13:57:30 +0000632 private NextObjective nextObject(Integer nextId, PortNumber port,
633 NextType nextType, IpAddress mcastIp) {
ke han9590c812017-02-28 15:02:26 +0800634
Esin Karaman73b0e572019-08-28 13:57:30 +0000635 // Build the meta selector with the fwd objective info
636 TrafficSelector.Builder metadata = DefaultTrafficSelector.builder()
637 .matchIPDst(mcastIp.toIpPrefix());
638
639 if (vlanEnabled) {
640 metadata.matchVlanId(multicastVlan());
641 }
642
ke han9590c812017-02-28 15:02:26 +0800643 DefaultNextObjective.Builder build = DefaultNextObjective.builder()
644 .fromApp(appId)
645 .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
646 .withType(NextObjective.Type.BROADCAST)
Esin Karaman73b0e572019-08-28 13:57:30 +0000647 .withId(nextId)
648 .withMeta(metadata.build());
649
ke han9590c812017-02-28 15:02:26 +0800650 ObjectiveContext content = new ObjectiveContext() {
651 @Override
652 public void onSuccess(Objective objective) {
Esin Karaman73b0e572019-08-28 13:57:30 +0000653 log.debug("Next Objective {} installed", objective.id());
ke han9590c812017-02-28 15:02:26 +0800654 }
655
656 @Override
657 public void onError(Objective objective, ObjectiveError error) {
Esin Karaman73b0e572019-08-28 13:57:30 +0000658 log.debug("Next Objective {} failed, because {}",
659 objective.id(),
660 error);
ke han9590c812017-02-28 15:02:26 +0800661 }
662 };
663
664 switch (nextType) {
665 case AddNew:
666 return build.add(content);
667 case AddToExisting:
668 return build.addToExisting(content);
669 case Remove:
670 return build.remove(content);
671 case RemoveFromExisting:
672 return build.removeFromExisting(content);
673 default:
674 return null;
675 }
676 }
677
Esin Karaman73b0e572019-08-28 13:57:30 +0000678 private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
679 TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
680 .matchEthType(Ethernet.TYPE_IPV4)
681 .matchIPDst(mcastIp.toIpPrefix());
ke han9590c812017-02-28 15:02:26 +0800682
Esin Karaman73b0e572019-08-28 13:57:30 +0000683 //build the meta selector
684 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
685 if (vlanEnabled) {
686 metabuilder.matchVlanId(multicastVlan());
Jonathan Hart718c0452016-02-18 15:56:22 -0800687 }
688
Esin Karaman73b0e572019-08-28 13:57:30 +0000689 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
690 .fromApp(appId)
691 .nextStep(nextId)
692 .makePermanent()
693 .withFlag(ForwardingObjective.Flag.SPECIFIC)
694 .withPriority(priority)
695 .withSelector(mcast.build())
696 .withMeta(metabuilder.build());
alshabibfc1cb032016-02-17 15:37:56 -0800697
Esin Karaman73b0e572019-08-28 13:57:30 +0000698 return fwdBuilder;
alshabibfc1cb032016-02-17 15:37:56 -0800699 }
700
Esin Karaman73b0e572019-08-28 13:57:30 +0000701 // Custom-built function, when the device is not available we need a fallback mechanism
702 private boolean isLocalLeader(DeviceId deviceId) {
703 if (!mastershipService.isLocalMaster(deviceId)) {
704 // When the device is available we just check the mastership
705 if (deviceService.isAvailable(deviceId)) {
ke han9590c812017-02-28 15:02:26 +0800706 return false;
ke han9590c812017-02-28 15:02:26 +0800707 }
Esin Karaman73b0e572019-08-28 13:57:30 +0000708 // Fallback with Leadership service - device id is used as topic
709 NodeId leader = leadershipService.runForLeadership(
710 deviceId.toString()).leaderNodeId();
711 // Verify if this node is the leader
712 return clusterService.getLocalNode().id().equals(leader);
ke han9590c812017-02-28 15:02:26 +0800713 }
Esin Karaman73b0e572019-08-28 13:57:30 +0000714 return true;
ke han9590c812017-02-28 15:02:26 +0800715 }
Esin Karaman73b0e572019-08-28 13:57:30 +0000716
alshabib3b1eadc2016-02-01 17:57:00 -0800717}
ke hanf1709e82016-08-12 10:48:17 +0800718
ke han9590c812017-02-28 15:02:26 +0800719