blob: 108ceda7ad4a97827aecd655a834e6cdc4930184 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
alshabib772e1582016-06-01 17:50:05 -070016package org.opencord.cordmcast;
alshabib3b1eadc2016-02-01 17:57:00 -080017
Esin Karaman39b24852019-08-28 13:57:30 +000018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Sets;
alshabib09069c92016-02-21 14:49:51 -080020import org.apache.commons.lang3.tuple.ImmutablePair;
Esin Karaman996177c2020-03-05 13:21:09 +000021import org.onosproject.net.Device;
22import org.opencord.sadis.SadisService;
23import org.opencord.sadis.SubscriberAndDeviceInformation;
Carmelo Cascone995fd682019-11-14 14:22:39 -080024import org.osgi.service.component.annotations.Activate;
25import org.osgi.service.component.annotations.Component;
26import org.osgi.service.component.annotations.Deactivate;
27import org.osgi.service.component.annotations.Modified;
28import org.osgi.service.component.annotations.Reference;
29import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib3b1eadc2016-02-01 17:57:00 -080030import org.onlab.packet.Ethernet;
alshabib3b1eadc2016-02-01 17:57:00 -080031import org.onlab.packet.IpAddress;
32import org.onlab.packet.VlanId;
Esin Karaman39b24852019-08-28 13:57:30 +000033import org.onlab.util.KryoNamespace;
Jonathan Hart28271642016-02-10 16:13:54 -080034import org.onosproject.cfg.ComponentConfigService;
Esin Karaman39b24852019-08-28 13:57:30 +000035import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.LeadershipService;
37import org.onosproject.cluster.NodeId;
alshabib3b1eadc2016-02-01 17:57:00 -080038import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
Esin Karaman39b24852019-08-28 13:57:30 +000040import org.onosproject.mastership.MastershipService;
41import org.onosproject.mcast.api.McastEvent;
42import org.onosproject.mcast.api.McastListener;
43import org.onosproject.mcast.api.McastRoute;
44import org.onosproject.mcast.api.MulticastRouteService;
alshabib3b1eadc2016-02-01 17:57:00 -080045import org.onosproject.net.ConnectPoint;
ke han9590c812017-02-28 15:02:26 +080046import org.onosproject.net.DeviceId;
Esin Karaman39b24852019-08-28 13:57:30 +000047import org.onosproject.net.HostId;
ke han9590c812017-02-28 15:02:26 +080048import org.onosproject.net.PortNumber;
ke hanf1709e82016-08-12 10:48:17 +080049import org.onosproject.net.config.ConfigFactory;
50import org.onosproject.net.config.NetworkConfigEvent;
51import org.onosproject.net.config.NetworkConfigListener;
52import org.onosproject.net.config.NetworkConfigRegistry;
Esin Karaman39b24852019-08-28 13:57:30 +000053import org.onosproject.net.config.basics.McastConfig;
ke hanf1709e82016-08-12 10:48:17 +080054import org.onosproject.net.config.basics.SubjectFactories;
Esin Karaman39b24852019-08-28 13:57:30 +000055import org.onosproject.net.device.DeviceService;
alshabib3b1eadc2016-02-01 17:57:00 -080056import org.onosproject.net.flow.DefaultTrafficSelector;
57import org.onosproject.net.flow.DefaultTrafficTreatment;
58import org.onosproject.net.flow.TrafficSelector;
59import org.onosproject.net.flowobjective.DefaultForwardingObjective;
60import org.onosproject.net.flowobjective.DefaultNextObjective;
Esin Karaman39b24852019-08-28 13:57:30 +000061import org.onosproject.net.flowobjective.DefaultObjectiveContext;
alshabib3b1eadc2016-02-01 17:57:00 -080062import org.onosproject.net.flowobjective.FlowObjectiveService;
63import org.onosproject.net.flowobjective.ForwardingObjective;
64import org.onosproject.net.flowobjective.NextObjective;
65import org.onosproject.net.flowobjective.Objective;
66import org.onosproject.net.flowobjective.ObjectiveContext;
67import org.onosproject.net.flowobjective.ObjectiveError;
Esin Karaman39b24852019-08-28 13:57:30 +000068import org.onosproject.store.serializers.KryoNamespaces;
69import org.onosproject.store.service.ConsistentMap;
70import org.onosproject.store.service.Serializer;
71import org.onosproject.store.service.StorageService;
72import org.onosproject.store.service.Versioned;
Jonathan Hart28271642016-02-10 16:13:54 -080073import org.osgi.service.component.ComponentContext;
alshabib3b1eadc2016-02-01 17:57:00 -080074import org.slf4j.Logger;
75
Jonathan Hart28271642016-02-10 16:13:54 -080076import java.util.Dictionary;
alshabib3b1eadc2016-02-01 17:57:00 -080077import java.util.Map;
ke han9590c812017-02-28 15:02:26 +080078import java.util.Objects;
Jonathan Hart0c194962016-05-23 17:08:15 -070079import java.util.Optional;
alshabibfc1cb032016-02-17 15:37:56 -080080import java.util.Properties;
Esin Karaman39b24852019-08-28 13:57:30 +000081import java.util.Set;
82import java.util.concurrent.ExecutorService;
83import java.util.concurrent.locks.Lock;
84import java.util.concurrent.locks.ReentrantLock;
alshabib3b1eadc2016-02-01 17:57:00 -080085
alshabibfc1cb032016-02-17 15:37:56 -080086import static com.google.common.base.Strings.isNullOrEmpty;
Esin Karaman39b24852019-08-28 13:57:30 +000087import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibfc1cb032016-02-17 15:37:56 -080088import static org.onlab.util.Tools.get;
Esin Karaman39b24852019-08-28 13:57:30 +000089import static org.onlab.util.Tools.groupedThreads;
Carmelo Cascone995fd682019-11-14 14:22:39 -080090import static org.opencord.cordmcast.OsgiPropertyConstants.DEFAULT_VLAN_ENABLED;
91import static org.opencord.cordmcast.OsgiPropertyConstants.DEFAULT_PRIORITY;
92import static org.opencord.cordmcast.OsgiPropertyConstants.PRIORITY;
93import static org.opencord.cordmcast.OsgiPropertyConstants.VLAN_ENABLED;
alshabib3b1eadc2016-02-01 17:57:00 -080094import static org.slf4j.LoggerFactory.getLogger;
95
Esin Karaman39b24852019-08-28 13:57:30 +000096
alshabib3b1eadc2016-02-01 17:57:00 -080097/**
Jonathan Hartc3f84eb2016-02-19 12:44:36 -080098 * CORD multicast provisioning application. Operates by listening to
Jonathan Hart28271642016-02-10 16:13:54 -080099 * events on the multicast rib and provisioning groups to program multicast
alshabib3b1eadc2016-02-01 17:57:00 -0800100 * flows on the dataplane.
101 */
Carmelo Cascone995fd682019-11-14 14:22:39 -0800102@Component(immediate = true,
103 property = {
104 VLAN_ENABLED + ":Boolean=" + DEFAULT_VLAN_ENABLED,
105 PRIORITY + ":Integer=" + DEFAULT_PRIORITY,
106})
alshabib3b1eadc2016-02-01 17:57:00 -0800107public class CordMcast {
Charles Chanf867c4b2017-01-20 11:22:25 -0800108 private static final String APP_NAME = "org.opencord.cordmcast";
alshabib3b1eadc2016-02-01 17:57:00 -0800109
Jonathan Hart0c194962016-05-23 17:08:15 -0700110 private final Logger log = getLogger(getClass());
alshabib09069c92016-02-21 14:49:51 -0800111
alshabib09069c92016-02-21 14:49:51 -0800112 private static final int DEFAULT_PRIORITY = 500;
alshabib3b1eadc2016-02-01 17:57:00 -0800113 private static final short DEFAULT_MCAST_VLAN = 4000;
alshabibfc1cb032016-02-17 15:37:56 -0800114
Carmelo Cascone995fd682019-11-14 14:22:39 -0800115 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800116 protected MulticastRouteService mcastService;
117
Carmelo Cascone995fd682019-11-14 14:22:39 -0800118 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800119 protected FlowObjectiveService flowObjectiveService;
120
Carmelo Cascone995fd682019-11-14 14:22:39 -0800121 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800122 protected CoreService coreService;
123
Carmelo Cascone995fd682019-11-14 14:22:39 -0800124 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jonathan Hart28271642016-02-10 16:13:54 -0800125 protected ComponentConfigService componentConfigService;
126
Carmelo Cascone995fd682019-11-14 14:22:39 -0800127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke hanf1709e82016-08-12 10:48:17 +0800128 protected NetworkConfigRegistry networkConfig;
129
Carmelo Cascone995fd682019-11-14 14:22:39 -0800130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000131 protected StorageService storageService;
132
Carmelo Cascone995fd682019-11-14 14:22:39 -0800133 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000134 protected MastershipService mastershipService;
135
Carmelo Cascone995fd682019-11-14 14:22:39 -0800136 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000137 public DeviceService deviceService;
138
Carmelo Cascone995fd682019-11-14 14:22:39 -0800139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000140 private ClusterService clusterService;
141
Carmelo Cascone995fd682019-11-14 14:22:39 -0800142 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000143 private LeadershipService leadershipService;
144
Esin Karaman996177c2020-03-05 13:21:09 +0000145 @Reference(cardinality = ReferenceCardinality.MANDATORY)
146 protected SadisService sadisService;
147
alshabib3b1eadc2016-02-01 17:57:00 -0800148 protected McastListener listener = new InternalMulticastListener();
ke hanf1709e82016-08-12 10:48:17 +0800149 private InternalNetworkConfigListener configListener =
150 new InternalNetworkConfigListener();
alshabib3b1eadc2016-02-01 17:57:00 -0800151
Esin Karaman39b24852019-08-28 13:57:30 +0000152 private ConsistentMap<NextKey, NextContent> groups;
alshabib3b1eadc2016-02-01 17:57:00 -0800153
alshabib3b1eadc2016-02-01 17:57:00 -0800154 private ApplicationId appId;
ke hanf1709e82016-08-12 10:48:17 +0800155 private ApplicationId coreAppId;
Esin Karaman39b24852019-08-28 13:57:30 +0000156 private short mcastVlan = DEFAULT_MCAST_VLAN;
alshabib3b1eadc2016-02-01 17:57:00 -0800157
Carmelo Cascone995fd682019-11-14 14:22:39 -0800158 /**
159 * Whether to use VLAN for multicast traffic.
160 **/
alshabib09069c92016-02-21 14:49:51 -0800161 private boolean vlanEnabled = DEFAULT_VLAN_ENABLED;
alshabibfc1cb032016-02-17 15:37:56 -0800162
Carmelo Cascone995fd682019-11-14 14:22:39 -0800163 /**
164 * Priority for multicast rules.
165 **/
alshabib3b1eadc2016-02-01 17:57:00 -0800166 private int priority = DEFAULT_PRIORITY;
167
ke hanf1709e82016-08-12 10:48:17 +0800168 private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
169 McastConfig.class;
170
171 private ConfigFactory<ApplicationId, McastConfig> cordMcastConfigFactory =
172 new ConfigFactory<ApplicationId, McastConfig>(
173 SubjectFactories.APP_SUBJECT_FACTORY, CORD_MCAST_CONFIG_CLASS, "multicast") {
174 @Override
175 public McastConfig createConfig() {
176 return new McastConfig();
177 }
178 };
Jonathan Hart28271642016-02-10 16:13:54 -0800179
Esin Karaman39b24852019-08-28 13:57:30 +0000180 // lock to synchronize local operations
181 private final Lock mcastLock = new ReentrantLock();
182 private void mcastLock() {
183 mcastLock.lock();
184 }
185 private void mcastUnlock() {
186 mcastLock.unlock();
187 }
188 private ExecutorService eventExecutor;
189
alshabib3b1eadc2016-02-01 17:57:00 -0800190 @Activate
Jonathan Hart435ffc42016-02-19 10:32:05 -0800191 public void activate(ComponentContext context) {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800192 componentConfigService.registerProperties(getClass());
Jonathan Hart435ffc42016-02-19 10:32:05 -0800193 modified(context);
194
Charles Chanf867c4b2017-01-20 11:22:25 -0800195 appId = coreService.registerApplication(APP_NAME);
ke hanf1709e82016-08-12 10:48:17 +0800196 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
Jonathan Hart28271642016-02-10 16:13:54 -0800197
Esin Karaman39b24852019-08-28 13:57:30 +0000198 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast",
199 "events-mcast-%d", log));
200
201 KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
202 .register(KryoNamespaces.API)
203 .register(NextKey.class)
204 .register(NextContent.class);
205 groups = storageService
206 .<NextKey, NextContent>consistentMapBuilder()
207 .withName("cord-mcast-groups-store")
208 .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
209 .build();
210
ke hanf1709e82016-08-12 10:48:17 +0800211 networkConfig.registerConfigFactory(cordMcastConfigFactory);
212 networkConfig.addListener(configListener);
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800213 mcastService.addListener(listener);
214
alshabib09069c92016-02-21 14:49:51 -0800215 mcastService.getRoutes().stream()
Esin Karaman39b24852019-08-28 13:57:30 +0000216 .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
alshabib09069c92016-02-21 14:49:51 -0800217 .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
Esin Karaman39b24852019-08-28 13:57:30 +0000218 .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(),
219 sink)));
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800220
ke hanf1709e82016-08-12 10:48:17 +0800221 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
Esin Karaman39b24852019-08-28 13:57:30 +0000222 updateConfig(config);
ke hanf1709e82016-08-12 10:48:17 +0800223
alshabib3b1eadc2016-02-01 17:57:00 -0800224 log.info("Started");
225 }
226
227 @Deactivate
228 public void deactivate() {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800229 componentConfigService.unregisterProperties(getClass(), false);
alshabib3b1eadc2016-02-01 17:57:00 -0800230 mcastService.removeListener(listener);
ke hanf1709e82016-08-12 10:48:17 +0800231 networkConfig.removeListener(configListener);
ke han9590c812017-02-28 15:02:26 +0800232 networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
Esin Karaman39b24852019-08-28 13:57:30 +0000233 eventExecutor.shutdown();
ke han9590c812017-02-28 15:02:26 +0800234 clearGroups();
Esin Karaman39b24852019-08-28 13:57:30 +0000235 groups.destroy();
alshabib3b1eadc2016-02-01 17:57:00 -0800236 log.info("Stopped");
237 }
238
ke han9590c812017-02-28 15:02:26 +0800239 public void clearGroups() {
Esin Karaman39b24852019-08-28 13:57:30 +0000240 mcastLock();
241 try {
242 groups.keySet().forEach(groupInfo -> {
243 if (!isLocalLeader(groupInfo.getDevice())) {
244 return;
245 }
246 NextContent next = groups.get(groupInfo).value();
247
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000248 if (next != null) {
249 ObjectiveContext context = new DefaultObjectiveContext(
250 (objective) -> log.debug("Successfully remove {}",
251 groupInfo.group),
252 (objective, error) -> log.warn("Failed to remove {}: {}",
253 groupInfo.group, error));
254 // remove the flow rule
255 flowObjectiveService.forward(groupInfo.getDevice(), fwdObject(next.getNextId(),
256 groupInfo.group).remove(context));
257 // remove all ports from the group
258 next.getOutPorts().stream().forEach(portNumber ->
259 flowObjectiveService.next(groupInfo.getDevice(), nextObject(next.getNextId(),
260 portNumber,
261 NextType.RemoveFromExisting,
262 groupInfo.group))
263 );
Esin Karaman39b24852019-08-28 13:57:30 +0000264
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000265 }
Esin Karaman39b24852019-08-28 13:57:30 +0000266 });
267 groups.clear();
268 } finally {
269 mcastUnlock();
270 }
271 }
272
273 private VlanId multicastVlan() {
274 return VlanId.vlanId(mcastVlan);
275 }
276
277 private VlanId assignedVlan() {
278 return vlanEnabled ? multicastVlan() : VlanId.NONE;
ke han9590c812017-02-28 15:02:26 +0800279 }
280
Jonathan Hart28271642016-02-10 16:13:54 -0800281 @Modified
282 public void modified(ComponentContext context) {
alshabibfc1cb032016-02-17 15:37:56 -0800283 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
284
alshabibfc1cb032016-02-17 15:37:56 -0800285 try {
Esin Karaman39b24852019-08-28 13:57:30 +0000286 String s = get(properties, "vlanEnabled");
alshabib09069c92016-02-21 14:49:51 -0800287 vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
alshabibfc1cb032016-02-17 15:37:56 -0800288
289 s = get(properties, "priority");
290 priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
291
alshabibfc1cb032016-02-17 15:37:56 -0800292 } catch (Exception e) {
Esin Karaman39b24852019-08-28 13:57:30 +0000293 log.error("Unable to parse configuration parameter.", e);
alshabibfc1cb032016-02-17 15:37:56 -0800294 vlanEnabled = false;
295 priority = DEFAULT_PRIORITY;
296 }
Jonathan Hart28271642016-02-10 16:13:54 -0800297 }
298
alshabib3b1eadc2016-02-01 17:57:00 -0800299 private class InternalMulticastListener implements McastListener {
300 @Override
301 public void event(McastEvent event) {
Esin Karaman39b24852019-08-28 13:57:30 +0000302 eventExecutor.execute(() -> {
303 switch (event.type()) {
304 case ROUTE_ADDED:
305 case ROUTE_REMOVED:
306 case SOURCES_ADDED:
307 break;
308 case SINKS_ADDED:
309 addSinks(event);
310 break;
311 case SINKS_REMOVED:
312 removeSinks(event);
313 break;
314 default:
315 log.warn("Unknown mcast event {}", event.type());
316 }
317 });
318 }
319 }
320
321 /**
322 * Processes previous, and new sinks then finds the sinks to be removed.
323 * @param prevSinks the previous sinks to be evaluated
324 * @param newSinks the new sinks to be evaluated
325 * @returnt the set of the sinks to be removed
326 */
327 private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
328 Map<HostId, Set<ConnectPoint>> newSinks) {
329 return getSinksToBeProcessed(prevSinks, newSinks);
330 }
331
332
333 /**
334 * Processes previous, and new sinks then finds the sinks to be added.
335 * @param newSinks the new sinks to be processed
336 * @param allPrevSinks all previous sinks
337 * @return the set of the sinks to be added
338 */
339 private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
340 Map<HostId, Set<ConnectPoint>> allPrevSinks) {
341 return getSinksToBeProcessed(newSinks, allPrevSinks);
342 }
343
344 /**
345 * Gets single-homed sinks that are in set1 but not in set2.
346 * @param sinkSet1 the first sink map
347 * @param sinkSet2 the second sink map
348 * @return a set containing all the single-homed sinks found in set1 but not in set2
349 */
350 private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
351 Map<HostId, Set<ConnectPoint>> sinkSet2) {
352 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
353 sinkSet1.forEach(((hostId, connectPoints) -> {
354 if (HostId.NONE.equals(hostId)) {
355 //assume all connect points associated with HostId.NONE are single homed sinks
356 sinksToBeProcessed.addAll(connectPoints);
357 return;
358 }
359 }));
360 Set<ConnectPoint> singleHomedSinksOfSet2 = sinkSet2.get(HostId.NONE) == null ?
361 Sets.newHashSet() :
362 sinkSet2.get(HostId.NONE);
363 return Sets.difference(sinksToBeProcessed, singleHomedSinksOfSet2);
364 };
365
366
367 private void removeSinks(McastEvent event) {
368 mcastLock();
369 try {
370 Set<ConnectPoint> sinksToBeRemoved = getSinksToBeRemoved(event.prevSubject().sinks(),
371 event.subject().sinks());
372 sinksToBeRemoved.forEach(sink -> removeSink(event.subject().route().group(), sink));
373 } finally {
374 mcastUnlock();
375 }
376 }
377
378 private void removeSink(IpAddress group, ConnectPoint sink) {
379 if (!isLocalLeader(sink.deviceId())) {
380 log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
381 sink.deviceId(), sink, group);
382 return;
383 }
384
Esin Karaman996177c2020-03-05 13:21:09 +0000385 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman39b24852019-08-28 13:57:30 +0000386
387 if (!oltInfo.isPresent()) {
388 log.warn("Unknown OLT device : {}", sink.deviceId());
389 return;
390 }
391
392 log.debug("Removing sink {} from the group {}", sink, group);
393
394 NextKey key = new NextKey(sink.deviceId(), group);
395 groups.computeIfPresent(key, (k, v) -> {
396 flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
397 NextType.RemoveFromExisting, group));
398
399 Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
400 outPorts.remove(sink.port());
401
402 if (outPorts.isEmpty()) {
403 // this is the last sink
404 ObjectiveContext context = new DefaultObjectiveContext(
405 (objective) -> log.debug("Successfully remove {} on {}",
406 group, sink),
407 (objective, error) -> log.warn("Failed to remove {} on {}: {}",
408 group, sink, error));
409 ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
410 flowObjectiveService.forward(sink.deviceId(), fwdObj);
411 }
412 // remove the whole entity if no out port exists in the port list
413 return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
414 ImmutableSet.copyOf(outPorts));
415 });
416 }
417
418 private void addSinks(McastEvent event) {
419 mcastLock();
420 try {
421 Set<ConnectPoint> sinksToBeAdded = getSinksToBeAdded(event.subject().sinks(),
422 event.prevSubject().sinks());
423 sinksToBeAdded.forEach(sink -> addSink(event.subject().route(), sink));
424 } finally {
425 mcastUnlock();
426 }
427 }
428
429 private void addSink(McastRoute route, ConnectPoint sink) {
430 if (!isLocalLeader(sink.deviceId())) {
431 log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
432 sink.deviceId(), sink, route.group());
433 return;
434 }
435
Esin Karaman996177c2020-03-05 13:21:09 +0000436 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman39b24852019-08-28 13:57:30 +0000437
438 if (!oltInfo.isPresent()) {
439 log.warn("Unknown OLT device : {}", sink.deviceId());
440 return;
441 }
442
443 log.debug("Adding sink {} to the group {}", sink, route.group());
444
445 NextKey key = new NextKey(sink.deviceId(), route.group());
446 NextObjective newNextObj;
447
448 boolean theFirstSinkOfGroup = false;
449 if (!groups.containsKey(key)) {
450 // First time someone request this mcast group via this device
451 Integer nextId = flowObjectiveService.allocateNextId();
452 newNextObj = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
453 // Store the new port
454 groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
455 theFirstSinkOfGroup = true;
456 } else {
457 // This device already serves some subscribers of this mcast group
458 Versioned<NextContent> nextObj = groups.get(key);
459 if (nextObj.value().getOutPorts().contains(sink.port())) {
460 log.info("Group {} already serves the sink connected to {}", route.group(), sink);
461 return;
462 }
463 newNextObj = nextObject(nextObj.value().getNextId(), sink.port(),
464 NextType.AddToExisting, route.group());
465 // add new port to the group
466 Set<PortNumber> outPorts = Sets.newHashSet(nextObj.value().getOutPorts());
467 outPorts.add(sink.port());
468 groups.put(key, new NextContent(newNextObj.id(), ImmutableSet.copyOf(outPorts)));
469 }
470
471 ObjectiveContext context = new DefaultObjectiveContext(
472 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
473 route.group(), sink.deviceId(), sink.port().toLong(),
474 assignedVlan()),
475 (objective, error) -> {
476 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
477 route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan(),
478 error);
479 });
480
481 flowObjectiveService.next(sink.deviceId(), newNextObj);
482
483 if (theFirstSinkOfGroup) {
484 // create the necessary flow rule if this is the first sink request for the group
485 // on this device
486 flowObjectiveService.forward(sink.deviceId(), fwdObject(newNextObj.id(),
487 route.group()).add(context));
488 }
489 }
490
Esin Karaman996177c2020-03-05 13:21:09 +0000491 /**
492 * Fetches device information associated with the device serial number from SADIS.
493 *
494 * @param serialNumber serial number of a device
495 * @return device information; an empty Optional otherwise.
496 */
497 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
498 long start = System.currentTimeMillis();
499 try {
500 return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
501 } finally {
502 if (log.isDebugEnabled()) {
503 // SADIS may call remote systems to fetch device data and this calls can take a long time.
504 // This measurement is just for monitoring these kinds of situations.
505 log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
506 }
507
508 }
509 }
510
511 /**
512 * Fetches device information associated with the device serial number from SADIS.
513 *
514 * @param deviceId device id
515 * @return device information; an empty Optional otherwise.
516 */
517 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
518 Device device = deviceService.getDevice(deviceId);
519 if (device == null || device.serialNumber() == null) {
520 return Optional.empty();
521 }
522 return getSubscriberAndDeviceInformation(device.serialNumber());
523 }
524
Esin Karaman39b24852019-08-28 13:57:30 +0000525 private class InternalNetworkConfigListener implements NetworkConfigListener {
526 @Override
527 public void event(NetworkConfigEvent event) {
528 eventExecutor.execute(() -> {
529 switch (event.type()) {
530
531 case CONFIG_ADDED:
532 case CONFIG_UPDATED:
533 if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
534 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
535 if (config != null) {
536 //TODO: Simply remove flows/groups, hosts will response period query
537 // and re-sent IGMP report, so the flows can be rebuild.
538 // However, better to remove and re-add mcast flow rules here
539 if (mcastVlan != config.egressVlan().toShort() && vlanEnabled) {
540 clearGroups();
541 }
542 updateConfig(config);
543 }
544 }
545 break;
546 case CONFIG_REGISTERED:
547 case CONFIG_UNREGISTERED:
548 case CONFIG_REMOVED:
549 break;
550 default:
551 break;
552 }
553 });
554 }
555 }
556
557 private void updateConfig(McastConfig config) {
558 if (config == null) {
559 return;
560 }
561 log.debug("multicast config received: {}", config);
562
563 if (config.egressVlan() != null) {
564 mcastVlan = config.egressVlan().toShort();
565 }
566 }
567
568 private class NextKey {
569 private DeviceId device;
570 private IpAddress group;
571
572 public NextKey(DeviceId deviceId, IpAddress groupAddress) {
573 device = deviceId;
574 group = groupAddress;
575 }
576
577 public DeviceId getDevice() {
578 return device;
579 }
580
581 public int hashCode() {
582 return Objects.hash(this.device, this.group);
583 }
584
585 public boolean equals(Object obj) {
586 if (this == obj) {
587 return true;
588 } else if (!(obj instanceof NextKey)) {
589 return false;
590 } else {
591 NextKey that = (NextKey) obj;
592 return this.getClass() == that.getClass() &&
593 Objects.equals(this.device, that.device) &&
594 Objects.equals(this.group, that.group);
595 }
596 }
597 }
598
599 private class NextContent {
600 private Integer nextId;
601 private Set<PortNumber> outPorts;
602
603 public NextContent(Integer nextId, Set<PortNumber> outPorts) {
604 this.nextId = nextId;
605 this.outPorts = outPorts;
606 }
607
608 public Integer getNextId() {
609 return nextId;
610 }
611
612 public Set<PortNumber> getOutPorts() {
613 return ImmutableSet.copyOf(outPorts);
614 }
615
616 public int hashCode() {
617 return Objects.hash(this.nextId, this.outPorts);
618 }
619
620 public boolean equals(Object obj) {
621 if (this == obj) {
622 return true;
623 } else if (!(obj instanceof NextContent)) {
624 return false;
625 } else {
626 NextContent that = (NextContent) obj;
627 return this.getClass() == that.getClass() &&
628 Objects.equals(this.nextId, that.nextId) &&
629 Objects.equals(this.outPorts, that.outPorts);
alshabib3b1eadc2016-02-01 17:57:00 -0800630 }
631 }
632 }
633
ke han9590c812017-02-28 15:02:26 +0800634 private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
635
Esin Karaman39b24852019-08-28 13:57:30 +0000636 private NextObjective nextObject(Integer nextId, PortNumber port,
637 NextType nextType, IpAddress mcastIp) {
ke han9590c812017-02-28 15:02:26 +0800638
Esin Karaman39b24852019-08-28 13:57:30 +0000639 // Build the meta selector with the fwd objective info
640 TrafficSelector.Builder metadata = DefaultTrafficSelector.builder()
641 .matchIPDst(mcastIp.toIpPrefix());
642
643 if (vlanEnabled) {
644 metadata.matchVlanId(multicastVlan());
645 }
646
ke han9590c812017-02-28 15:02:26 +0800647 DefaultNextObjective.Builder build = DefaultNextObjective.builder()
648 .fromApp(appId)
649 .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
650 .withType(NextObjective.Type.BROADCAST)
Esin Karaman39b24852019-08-28 13:57:30 +0000651 .withId(nextId)
652 .withMeta(metadata.build());
653
ke han9590c812017-02-28 15:02:26 +0800654 ObjectiveContext content = new ObjectiveContext() {
655 @Override
656 public void onSuccess(Objective objective) {
Esin Karaman39b24852019-08-28 13:57:30 +0000657 log.debug("Next Objective {} installed", objective.id());
ke han9590c812017-02-28 15:02:26 +0800658 }
659
660 @Override
661 public void onError(Objective objective, ObjectiveError error) {
Esin Karaman39b24852019-08-28 13:57:30 +0000662 log.debug("Next Objective {} failed, because {}",
663 objective.id(),
664 error);
ke han9590c812017-02-28 15:02:26 +0800665 }
666 };
667
668 switch (nextType) {
669 case AddNew:
670 return build.add(content);
671 case AddToExisting:
672 return build.addToExisting(content);
673 case Remove:
674 return build.remove(content);
675 case RemoveFromExisting:
676 return build.removeFromExisting(content);
677 default:
678 return null;
679 }
680 }
681
Esin Karaman39b24852019-08-28 13:57:30 +0000682 private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
683 TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
684 .matchEthType(Ethernet.TYPE_IPV4)
685 .matchIPDst(mcastIp.toIpPrefix());
ke han9590c812017-02-28 15:02:26 +0800686
Esin Karaman39b24852019-08-28 13:57:30 +0000687 //build the meta selector
688 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
689 if (vlanEnabled) {
690 metabuilder.matchVlanId(multicastVlan());
Jonathan Hart718c0452016-02-18 15:56:22 -0800691 }
692
Esin Karaman39b24852019-08-28 13:57:30 +0000693 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
694 .fromApp(appId)
695 .nextStep(nextId)
696 .makePermanent()
697 .withFlag(ForwardingObjective.Flag.SPECIFIC)
698 .withPriority(priority)
699 .withSelector(mcast.build())
700 .withMeta(metabuilder.build());
alshabibfc1cb032016-02-17 15:37:56 -0800701
Esin Karaman39b24852019-08-28 13:57:30 +0000702 return fwdBuilder;
alshabibfc1cb032016-02-17 15:37:56 -0800703 }
704
Esin Karaman39b24852019-08-28 13:57:30 +0000705 // Custom-built function, when the device is not available we need a fallback mechanism
706 private boolean isLocalLeader(DeviceId deviceId) {
707 if (!mastershipService.isLocalMaster(deviceId)) {
708 // When the device is available we just check the mastership
709 if (deviceService.isAvailable(deviceId)) {
ke han9590c812017-02-28 15:02:26 +0800710 return false;
ke han9590c812017-02-28 15:02:26 +0800711 }
Esin Karaman39b24852019-08-28 13:57:30 +0000712 // Fallback with Leadership service - device id is used as topic
713 NodeId leader = leadershipService.runForLeadership(
714 deviceId.toString()).leaderNodeId();
715 // Verify if this node is the leader
716 return clusterService.getLocalNode().id().equals(leader);
ke han9590c812017-02-28 15:02:26 +0800717 }
Esin Karaman39b24852019-08-28 13:57:30 +0000718 return true;
ke han9590c812017-02-28 15:02:26 +0800719 }
Esin Karaman39b24852019-08-28 13:57:30 +0000720
alshabib3b1eadc2016-02-01 17:57:00 -0800721}
ke hanf1709e82016-08-12 10:48:17 +0800722
ke han9590c812017-02-28 15:02:26 +0800723