blob: c8860d0543a061c9b888ab93762cd3d9303d340f [file] [log] [blame]
alshabib3b1eadc2016-02-01 17:57:00 -08001/*
Brian O'Connorcf85aa82017-08-03 22:46:01 -07002 * Copyright 2016-present Open Networking Foundation
alshabib3b1eadc2016-02-01 17:57:00 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Daniele Moro8ea9e102020-03-24 18:56:52 +010016package org.opencord.cordmcast.impl;
alshabib3b1eadc2016-02-01 17:57:00 -080017
Esin Karaman39b24852019-08-28 13:57:30 +000018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Sets;
alshabib09069c92016-02-21 14:49:51 -080020import org.apache.commons.lang3.tuple.ImmutablePair;
alshabib3b1eadc2016-02-01 17:57:00 -080021import org.onlab.packet.Ethernet;
alshabib3b1eadc2016-02-01 17:57:00 -080022import org.onlab.packet.IpAddress;
23import org.onlab.packet.VlanId;
Esin Karaman39b24852019-08-28 13:57:30 +000024import org.onlab.util.KryoNamespace;
Jonathan Hart28271642016-02-10 16:13:54 -080025import org.onosproject.cfg.ComponentConfigService;
Esin Karaman39b24852019-08-28 13:57:30 +000026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.LeadershipService;
28import org.onosproject.cluster.NodeId;
alshabib3b1eadc2016-02-01 17:57:00 -080029import org.onosproject.core.ApplicationId;
30import org.onosproject.core.CoreService;
Esin Karaman39b24852019-08-28 13:57:30 +000031import org.onosproject.mastership.MastershipService;
32import org.onosproject.mcast.api.McastEvent;
33import org.onosproject.mcast.api.McastListener;
34import org.onosproject.mcast.api.McastRoute;
35import org.onosproject.mcast.api.MulticastRouteService;
alshabib3b1eadc2016-02-01 17:57:00 -080036import org.onosproject.net.ConnectPoint;
Daniele Moro8ea9e102020-03-24 18:56:52 +010037import org.onosproject.net.Device;
ke han9590c812017-02-28 15:02:26 +080038import org.onosproject.net.DeviceId;
Esin Karaman39b24852019-08-28 13:57:30 +000039import org.onosproject.net.HostId;
ke han9590c812017-02-28 15:02:26 +080040import org.onosproject.net.PortNumber;
ke hanf1709e82016-08-12 10:48:17 +080041import org.onosproject.net.config.ConfigFactory;
42import org.onosproject.net.config.NetworkConfigEvent;
43import org.onosproject.net.config.NetworkConfigListener;
44import org.onosproject.net.config.NetworkConfigRegistry;
Esin Karaman39b24852019-08-28 13:57:30 +000045import org.onosproject.net.config.basics.McastConfig;
ke hanf1709e82016-08-12 10:48:17 +080046import org.onosproject.net.config.basics.SubjectFactories;
Andrea Campanellac2f3ddd2020-05-18 11:09:11 +020047import org.onosproject.net.device.DeviceEvent;
48import org.onosproject.net.device.DeviceListener;
Esin Karaman39b24852019-08-28 13:57:30 +000049import org.onosproject.net.device.DeviceService;
alshabib3b1eadc2016-02-01 17:57:00 -080050import org.onosproject.net.flow.DefaultTrafficSelector;
51import org.onosproject.net.flow.DefaultTrafficTreatment;
52import org.onosproject.net.flow.TrafficSelector;
53import org.onosproject.net.flowobjective.DefaultForwardingObjective;
54import org.onosproject.net.flowobjective.DefaultNextObjective;
Esin Karaman39b24852019-08-28 13:57:30 +000055import org.onosproject.net.flowobjective.DefaultObjectiveContext;
alshabib3b1eadc2016-02-01 17:57:00 -080056import org.onosproject.net.flowobjective.FlowObjectiveService;
57import org.onosproject.net.flowobjective.ForwardingObjective;
58import org.onosproject.net.flowobjective.NextObjective;
59import org.onosproject.net.flowobjective.Objective;
60import org.onosproject.net.flowobjective.ObjectiveContext;
61import org.onosproject.net.flowobjective.ObjectiveError;
Andrea Campanellac2f3ddd2020-05-18 11:09:11 +020062import org.onosproject.net.group.GroupService;
Esin Karaman39b24852019-08-28 13:57:30 +000063import org.onosproject.store.serializers.KryoNamespaces;
64import org.onosproject.store.service.ConsistentMap;
65import org.onosproject.store.service.Serializer;
66import org.onosproject.store.service.StorageService;
67import org.onosproject.store.service.Versioned;
Daniele Moro8ea9e102020-03-24 18:56:52 +010068import org.opencord.cordmcast.CordMcastService;
69import org.opencord.cordmcast.CordMcastStatisticsService;
70import org.opencord.sadis.SadisService;
71import org.opencord.sadis.SubscriberAndDeviceInformation;
Jonathan Hart28271642016-02-10 16:13:54 -080072import org.osgi.service.component.ComponentContext;
Daniele Moro8ea9e102020-03-24 18:56:52 +010073import org.osgi.service.component.annotations.Activate;
74import org.osgi.service.component.annotations.Component;
75import org.osgi.service.component.annotations.Deactivate;
76import org.osgi.service.component.annotations.Modified;
77import org.osgi.service.component.annotations.Reference;
78import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib3b1eadc2016-02-01 17:57:00 -080079import org.slf4j.Logger;
80
Jonathan Hart28271642016-02-10 16:13:54 -080081import java.util.Dictionary;
alshabib3b1eadc2016-02-01 17:57:00 -080082import java.util.Map;
ke han9590c812017-02-28 15:02:26 +080083import java.util.Objects;
Jonathan Hart0c194962016-05-23 17:08:15 -070084import java.util.Optional;
alshabibfc1cb032016-02-17 15:37:56 -080085import java.util.Properties;
Esin Karaman39b24852019-08-28 13:57:30 +000086import java.util.Set;
87import java.util.concurrent.ExecutorService;
88import java.util.concurrent.locks.Lock;
89import java.util.concurrent.locks.ReentrantLock;
Andrea Campanella03652352020-05-06 16:12:00 +020090import java.util.function.Consumer;
alshabib3b1eadc2016-02-01 17:57:00 -080091
alshabibfc1cb032016-02-17 15:37:56 -080092import static com.google.common.base.Strings.isNullOrEmpty;
Esin Karaman39b24852019-08-28 13:57:30 +000093import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibfc1cb032016-02-17 15:37:56 -080094import static org.onlab.util.Tools.get;
Esin Karaman39b24852019-08-28 13:57:30 +000095import static org.onlab.util.Tools.groupedThreads;
Daniele Moro8ea9e102020-03-24 18:56:52 +010096import static org.opencord.cordmcast.impl.OsgiPropertyConstants.*;
alshabib3b1eadc2016-02-01 17:57:00 -080097import static org.slf4j.LoggerFactory.getLogger;
98
Esin Karaman39b24852019-08-28 13:57:30 +000099
alshabib3b1eadc2016-02-01 17:57:00 -0800100/**
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800101 * CORD multicast provisioning application. Operates by listening to
Jonathan Hart28271642016-02-10 16:13:54 -0800102 * events on the multicast rib and provisioning groups to program multicast
alshabib3b1eadc2016-02-01 17:57:00 -0800103 * flows on the dataplane.
104 */
Carmelo Cascone995fd682019-11-14 14:22:39 -0800105@Component(immediate = true,
106 property = {
107 VLAN_ENABLED + ":Boolean=" + DEFAULT_VLAN_ENABLED,
108 PRIORITY + ":Integer=" + DEFAULT_PRIORITY,
109})
Daniele Moro8ea9e102020-03-24 18:56:52 +0100110public class CordMcast implements CordMcastService {
111 private static final String APP_NAME = "org.opencord.mcast";
alshabib3b1eadc2016-02-01 17:57:00 -0800112
Jonathan Hart0c194962016-05-23 17:08:15 -0700113 private final Logger log = getLogger(getClass());
alshabib09069c92016-02-21 14:49:51 -0800114
alshabib09069c92016-02-21 14:49:51 -0800115 private static final int DEFAULT_PRIORITY = 500;
alshabib3b1eadc2016-02-01 17:57:00 -0800116 private static final short DEFAULT_MCAST_VLAN = 4000;
alshabibfc1cb032016-02-17 15:37:56 -0800117
Carmelo Cascone995fd682019-11-14 14:22:39 -0800118 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800119 protected MulticastRouteService mcastService;
120
Carmelo Cascone995fd682019-11-14 14:22:39 -0800121 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800122 protected FlowObjectiveService flowObjectiveService;
123
Carmelo Cascone995fd682019-11-14 14:22:39 -0800124 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800125 protected CoreService coreService;
126
Carmelo Cascone995fd682019-11-14 14:22:39 -0800127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jonathan Hart28271642016-02-10 16:13:54 -0800128 protected ComponentConfigService componentConfigService;
129
Carmelo Cascone995fd682019-11-14 14:22:39 -0800130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke hanf1709e82016-08-12 10:48:17 +0800131 protected NetworkConfigRegistry networkConfig;
132
Carmelo Cascone995fd682019-11-14 14:22:39 -0800133 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000134 protected StorageService storageService;
135
Carmelo Cascone995fd682019-11-14 14:22:39 -0800136 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000137 protected MastershipService mastershipService;
138
Carmelo Cascone995fd682019-11-14 14:22:39 -0800139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000140 public DeviceService deviceService;
141
Carmelo Cascone995fd682019-11-14 14:22:39 -0800142 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000143 private ClusterService clusterService;
144
Carmelo Cascone995fd682019-11-14 14:22:39 -0800145 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000146 private LeadershipService leadershipService;
147
Esin Karaman996177c2020-03-05 13:21:09 +0000148 @Reference(cardinality = ReferenceCardinality.MANDATORY)
149 protected SadisService sadisService;
150
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000151 @Reference(cardinality = ReferenceCardinality.MANDATORY)
152 protected CordMcastStatisticsService cordMcastStatisticsService;
153
Andrea Campanellac2f3ddd2020-05-18 11:09:11 +0200154 @Reference(cardinality = ReferenceCardinality.MANDATORY)
155 protected GroupService groupService;
156
alshabib3b1eadc2016-02-01 17:57:00 -0800157 protected McastListener listener = new InternalMulticastListener();
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000158
ke hanf1709e82016-08-12 10:48:17 +0800159 private InternalNetworkConfigListener configListener =
160 new InternalNetworkConfigListener();
alshabib3b1eadc2016-02-01 17:57:00 -0800161
Esin Karaman39b24852019-08-28 13:57:30 +0000162 private ConsistentMap<NextKey, NextContent> groups;
alshabib3b1eadc2016-02-01 17:57:00 -0800163
alshabib3b1eadc2016-02-01 17:57:00 -0800164 private ApplicationId appId;
ke hanf1709e82016-08-12 10:48:17 +0800165 private ApplicationId coreAppId;
Esin Karaman39b24852019-08-28 13:57:30 +0000166 private short mcastVlan = DEFAULT_MCAST_VLAN;
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000167 private VlanId mcastInnerVlan = VlanId.NONE;
alshabib3b1eadc2016-02-01 17:57:00 -0800168
Carmelo Cascone995fd682019-11-14 14:22:39 -0800169 /**
170 * Whether to use VLAN for multicast traffic.
171 **/
alshabib09069c92016-02-21 14:49:51 -0800172 private boolean vlanEnabled = DEFAULT_VLAN_ENABLED;
alshabibfc1cb032016-02-17 15:37:56 -0800173
Carmelo Cascone995fd682019-11-14 14:22:39 -0800174 /**
175 * Priority for multicast rules.
176 **/
alshabib3b1eadc2016-02-01 17:57:00 -0800177 private int priority = DEFAULT_PRIORITY;
178
ke hanf1709e82016-08-12 10:48:17 +0800179 private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
180 McastConfig.class;
181
182 private ConfigFactory<ApplicationId, McastConfig> cordMcastConfigFactory =
183 new ConfigFactory<ApplicationId, McastConfig>(
184 SubjectFactories.APP_SUBJECT_FACTORY, CORD_MCAST_CONFIG_CLASS, "multicast") {
185 @Override
186 public McastConfig createConfig() {
187 return new McastConfig();
188 }
189 };
Jonathan Hart28271642016-02-10 16:13:54 -0800190
Esin Karaman39b24852019-08-28 13:57:30 +0000191 // lock to synchronize local operations
192 private final Lock mcastLock = new ReentrantLock();
193 private void mcastLock() {
194 mcastLock.lock();
195 }
196 private void mcastUnlock() {
197 mcastLock.unlock();
198 }
199 private ExecutorService eventExecutor;
200
Andrea Campanellac2f3ddd2020-05-18 11:09:11 +0200201 //Device listener to purge groups upon device disconnection.
202 private DeviceListener deviceListener = new InternalDeviceListener();
203
alshabib3b1eadc2016-02-01 17:57:00 -0800204 @Activate
Jonathan Hart435ffc42016-02-19 10:32:05 -0800205 public void activate(ComponentContext context) {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800206 componentConfigService.registerProperties(getClass());
Jonathan Hart435ffc42016-02-19 10:32:05 -0800207 modified(context);
208
Charles Chanf867c4b2017-01-20 11:22:25 -0800209 appId = coreService.registerApplication(APP_NAME);
ke hanf1709e82016-08-12 10:48:17 +0800210 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
Jonathan Hart28271642016-02-10 16:13:54 -0800211
Esin Karaman39b24852019-08-28 13:57:30 +0000212 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast",
213 "events-mcast-%d", log));
214
215 KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
216 .register(KryoNamespaces.API)
217 .register(NextKey.class)
218 .register(NextContent.class);
219 groups = storageService
220 .<NextKey, NextContent>consistentMapBuilder()
221 .withName("cord-mcast-groups-store")
222 .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
223 .build();
224
ke hanf1709e82016-08-12 10:48:17 +0800225 networkConfig.registerConfigFactory(cordMcastConfigFactory);
226 networkConfig.addListener(configListener);
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800227 mcastService.addListener(listener);
228
alshabib09069c92016-02-21 14:49:51 -0800229 mcastService.getRoutes().stream()
Esin Karaman39b24852019-08-28 13:57:30 +0000230 .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
alshabib09069c92016-02-21 14:49:51 -0800231 .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
Esin Karaman39b24852019-08-28 13:57:30 +0000232 .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(),
233 sink)));
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800234
ke hanf1709e82016-08-12 10:48:17 +0800235 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
Esin Karaman39b24852019-08-28 13:57:30 +0000236 updateConfig(config);
Andrea Campanellac2f3ddd2020-05-18 11:09:11 +0200237 deviceService.addListener(deviceListener);
alshabib3b1eadc2016-02-01 17:57:00 -0800238 log.info("Started");
239 }
240
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000241 @Modified
242 public void modified(ComponentContext context) {
243 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
244
245 String s = get(properties, VLAN_ENABLED);
246 vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
247
248 try {
249 s = get(properties, PRIORITY);
250 priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
251 } catch (NumberFormatException ne) {
252 log.error("Unable to parse configuration parameter for priority", ne);
253 priority = DEFAULT_PRIORITY;
254 }
Esin Karamane4890012020-04-19 11:58:54 +0000255 feedStatsServiceWithVlanConfigValues();
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000256 }
257
alshabib3b1eadc2016-02-01 17:57:00 -0800258 @Deactivate
259 public void deactivate() {
Andrea Campanellac2f3ddd2020-05-18 11:09:11 +0200260 deviceService.removeListener(deviceListener);
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800261 componentConfigService.unregisterProperties(getClass(), false);
alshabib3b1eadc2016-02-01 17:57:00 -0800262 mcastService.removeListener(listener);
ke hanf1709e82016-08-12 10:48:17 +0800263 networkConfig.removeListener(configListener);
ke han9590c812017-02-28 15:02:26 +0800264 networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
Esin Karaman39b24852019-08-28 13:57:30 +0000265 eventExecutor.shutdown();
Andrea Campanella03652352020-05-06 16:12:00 +0200266 eventExecutor = null;
267 groups.clear();
Esin Karaman39b24852019-08-28 13:57:30 +0000268 groups.destroy();
alshabib3b1eadc2016-02-01 17:57:00 -0800269 log.info("Stopped");
270 }
271
Esin Karamane4890012020-04-19 11:58:54 +0000272 /**
273 * Updates the stats service with current VLAN config values.
274 */
275 private void feedStatsServiceWithVlanConfigValues() {
276 cordMcastStatisticsService.setVlanValue(assignedVlan());
277 cordMcastStatisticsService.setInnerVlanValue(assignedInnerVlan());
278 }
279
Andrea Campanella03652352020-05-06 16:12:00 +0200280 private void clearGroups() {
Esin Karaman39b24852019-08-28 13:57:30 +0000281 mcastLock();
282 try {
283 groups.keySet().forEach(groupInfo -> {
Andrea Campanella03652352020-05-06 16:12:00 +0200284 NextContent next = groups.get(groupInfo).value();
Esin Karaman39b24852019-08-28 13:57:30 +0000285 if (!isLocalLeader(groupInfo.getDevice())) {
286 return;
287 }
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000288 if (next != null) {
Andrea Campanella03652352020-05-06 16:12:00 +0200289 //On Success of removing the fwd objective we remove also the group.
290 Consumer<Objective> onSuccess = (objective) -> {
291 log.debug("Successfully removed fwd objective for {} on {}, " +
292 "removing next objective {}", groupInfo.group,
293 groupInfo.getDevice(), next.getNextId());
294 eventExecutor.submit(() -> flowObjectiveService.next(groupInfo.getDevice(),
295 nextObject(next.getNextId(),
296 null,
297 NextType.Remove, groupInfo.group)));
298 };
299
300 ObjectiveContext context =
301 new DefaultObjectiveContext(onSuccess, (objective, error) ->
302 log.warn("Failed to remove {} on {}: {}",
303 groupInfo.group, next.getNextId(), error));
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000304 // remove the flow rule
Andrea Campanella03652352020-05-06 16:12:00 +0200305 flowObjectiveService.forward(groupInfo.getDevice(),
306 fwdObject(next.getNextId(),
307 groupInfo.group).remove(context));
Esin Karaman39b24852019-08-28 13:57:30 +0000308
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000309 }
Esin Karaman39b24852019-08-28 13:57:30 +0000310 });
Esin Karaman39b24852019-08-28 13:57:30 +0000311 } finally {
312 mcastUnlock();
313 }
314 }
315
316 private VlanId multicastVlan() {
317 return VlanId.vlanId(mcastVlan);
318 }
319
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000320 protected VlanId assignedVlan() {
Esin Karaman39b24852019-08-28 13:57:30 +0000321 return vlanEnabled ? multicastVlan() : VlanId.NONE;
ke han9590c812017-02-28 15:02:26 +0800322 }
323
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000324 protected VlanId assignedInnerVlan() {
325 return vlanEnabled ? mcastInnerVlan : VlanId.NONE;
326 }
327
alshabib3b1eadc2016-02-01 17:57:00 -0800328 private class InternalMulticastListener implements McastListener {
329 @Override
330 public void event(McastEvent event) {
Esin Karaman39b24852019-08-28 13:57:30 +0000331 eventExecutor.execute(() -> {
332 switch (event.type()) {
333 case ROUTE_ADDED:
334 case ROUTE_REMOVED:
335 case SOURCES_ADDED:
336 break;
337 case SINKS_ADDED:
338 addSinks(event);
339 break;
340 case SINKS_REMOVED:
341 removeSinks(event);
342 break;
343 default:
344 log.warn("Unknown mcast event {}", event.type());
345 }
346 });
347 }
348 }
349
350 /**
351 * Processes previous, and new sinks then finds the sinks to be removed.
352 * @param prevSinks the previous sinks to be evaluated
353 * @param newSinks the new sinks to be evaluated
354 * @returnt the set of the sinks to be removed
355 */
356 private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
357 Map<HostId, Set<ConnectPoint>> newSinks) {
358 return getSinksToBeProcessed(prevSinks, newSinks);
359 }
360
361
362 /**
363 * Processes previous, and new sinks then finds the sinks to be added.
364 * @param newSinks the new sinks to be processed
365 * @param allPrevSinks all previous sinks
366 * @return the set of the sinks to be added
367 */
368 private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
369 Map<HostId, Set<ConnectPoint>> allPrevSinks) {
370 return getSinksToBeProcessed(newSinks, allPrevSinks);
371 }
372
373 /**
374 * Gets single-homed sinks that are in set1 but not in set2.
375 * @param sinkSet1 the first sink map
376 * @param sinkSet2 the second sink map
377 * @return a set containing all the single-homed sinks found in set1 but not in set2
378 */
379 private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
380 Map<HostId, Set<ConnectPoint>> sinkSet2) {
381 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
382 sinkSet1.forEach(((hostId, connectPoints) -> {
383 if (HostId.NONE.equals(hostId)) {
384 //assume all connect points associated with HostId.NONE are single homed sinks
385 sinksToBeProcessed.addAll(connectPoints);
386 return;
387 }
388 }));
389 Set<ConnectPoint> singleHomedSinksOfSet2 = sinkSet2.get(HostId.NONE) == null ?
390 Sets.newHashSet() :
391 sinkSet2.get(HostId.NONE);
392 return Sets.difference(sinksToBeProcessed, singleHomedSinksOfSet2);
393 };
394
395
396 private void removeSinks(McastEvent event) {
397 mcastLock();
398 try {
399 Set<ConnectPoint> sinksToBeRemoved = getSinksToBeRemoved(event.prevSubject().sinks(),
400 event.subject().sinks());
401 sinksToBeRemoved.forEach(sink -> removeSink(event.subject().route().group(), sink));
402 } finally {
403 mcastUnlock();
404 }
405 }
406
407 private void removeSink(IpAddress group, ConnectPoint sink) {
408 if (!isLocalLeader(sink.deviceId())) {
409 log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
410 sink.deviceId(), sink, group);
411 return;
412 }
413
Esin Karaman996177c2020-03-05 13:21:09 +0000414 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman39b24852019-08-28 13:57:30 +0000415
416 if (!oltInfo.isPresent()) {
417 log.warn("Unknown OLT device : {}", sink.deviceId());
418 return;
419 }
420
421 log.debug("Removing sink {} from the group {}", sink, group);
422
423 NextKey key = new NextKey(sink.deviceId(), group);
424 groups.computeIfPresent(key, (k, v) -> {
Esin Karaman39b24852019-08-28 13:57:30 +0000425
426 Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
427 outPorts.remove(sink.port());
428
429 if (outPorts.isEmpty()) {
Andrea Campanella03652352020-05-06 16:12:00 +0200430 log.debug("No more output ports for group {}, removing next and fwd objectives", group);
431
432 //On Success of removing the fwd objective we remove also the group.
433 Consumer<Objective> onSuccess = (objective) -> {
434 log.debug("Successfully removed fwd objective for {} on {}, " +
435 "removing next objective {}", group, sink, v.getNextId());
436 eventExecutor.execute(() -> {
437 //No port is needed since it's a remove Operation
438 flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(),
439 null,
440 NextType.Remove, group));
441 });
442 };
443
Esin Karaman39b24852019-08-28 13:57:30 +0000444 // this is the last sink
Andrea Campanella03652352020-05-06 16:12:00 +0200445 ObjectiveContext context = new DefaultObjectiveContext(onSuccess,
Esin Karaman39b24852019-08-28 13:57:30 +0000446 (objective, error) -> log.warn("Failed to remove {} on {}: {}",
447 group, sink, error));
448 ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
449 flowObjectiveService.forward(sink.deviceId(), fwdObj);
Andrea Campanella03652352020-05-06 16:12:00 +0200450 } else {
451 log.debug("Group {} has remaining {} ports, removing just {} " +
452 "from it's sinks", group, outPorts, sink.port());
453 flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
454 NextType.RemoveFromExisting, group));
Esin Karaman39b24852019-08-28 13:57:30 +0000455 }
456 // remove the whole entity if no out port exists in the port list
457 return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
458 ImmutableSet.copyOf(outPorts));
459 });
460 }
461
462 private void addSinks(McastEvent event) {
463 mcastLock();
464 try {
465 Set<ConnectPoint> sinksToBeAdded = getSinksToBeAdded(event.subject().sinks(),
466 event.prevSubject().sinks());
467 sinksToBeAdded.forEach(sink -> addSink(event.subject().route(), sink));
468 } finally {
469 mcastUnlock();
470 }
471 }
472
473 private void addSink(McastRoute route, ConnectPoint sink) {
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000474
Esin Karaman39b24852019-08-28 13:57:30 +0000475 if (!isLocalLeader(sink.deviceId())) {
476 log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
477 sink.deviceId(), sink, route.group());
478 return;
479 }
480
Esin Karaman996177c2020-03-05 13:21:09 +0000481 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman39b24852019-08-28 13:57:30 +0000482
483 if (!oltInfo.isPresent()) {
484 log.warn("Unknown OLT device : {}", sink.deviceId());
485 return;
486 }
487
488 log.debug("Adding sink {} to the group {}", sink, route.group());
489
490 NextKey key = new NextKey(sink.deviceId(), route.group());
491 NextObjective newNextObj;
492
493 boolean theFirstSinkOfGroup = false;
494 if (!groups.containsKey(key)) {
495 // First time someone request this mcast group via this device
496 Integer nextId = flowObjectiveService.allocateNextId();
497 newNextObj = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
498 // Store the new port
499 groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
500 theFirstSinkOfGroup = true;
501 } else {
502 // This device already serves some subscribers of this mcast group
503 Versioned<NextContent> nextObj = groups.get(key);
504 if (nextObj.value().getOutPorts().contains(sink.port())) {
505 log.info("Group {} already serves the sink connected to {}", route.group(), sink);
506 return;
507 }
508 newNextObj = nextObject(nextObj.value().getNextId(), sink.port(),
509 NextType.AddToExisting, route.group());
510 // add new port to the group
511 Set<PortNumber> outPorts = Sets.newHashSet(nextObj.value().getOutPorts());
512 outPorts.add(sink.port());
513 groups.put(key, new NextContent(newNextObj.id(), ImmutableSet.copyOf(outPorts)));
514 }
515
516 ObjectiveContext context = new DefaultObjectiveContext(
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000517 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}, inner vlan {}",
Esin Karaman39b24852019-08-28 13:57:30 +0000518 route.group(), sink.deviceId(), sink.port().toLong(),
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000519 assignedVlan(), assignedInnerVlan()),
Esin Karaman39b24852019-08-28 13:57:30 +0000520 (objective, error) -> {
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000521 log.warn("Failed to add {} on {}/{}, vlan {}, inner vlan {}: {}",
Esin Karaman39b24852019-08-28 13:57:30 +0000522 route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan(),
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000523 assignedInnerVlan(), error);
Esin Karaman39b24852019-08-28 13:57:30 +0000524 });
525
526 flowObjectiveService.next(sink.deviceId(), newNextObj);
527
528 if (theFirstSinkOfGroup) {
529 // create the necessary flow rule if this is the first sink request for the group
530 // on this device
531 flowObjectiveService.forward(sink.deviceId(), fwdObject(newNextObj.id(),
532 route.group()).add(context));
533 }
534 }
535
Esin Karaman996177c2020-03-05 13:21:09 +0000536 /**
537 * Fetches device information associated with the device serial number from SADIS.
538 *
539 * @param serialNumber serial number of a device
540 * @return device information; an empty Optional otherwise.
541 */
542 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
543 long start = System.currentTimeMillis();
544 try {
545 return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
546 } finally {
547 if (log.isDebugEnabled()) {
548 // SADIS may call remote systems to fetch device data and this calls can take a long time.
549 // This measurement is just for monitoring these kinds of situations.
550 log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
551 }
552
553 }
554 }
555
556 /**
557 * Fetches device information associated with the device serial number from SADIS.
558 *
559 * @param deviceId device id
560 * @return device information; an empty Optional otherwise.
561 */
562 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
563 Device device = deviceService.getDevice(deviceId);
564 if (device == null || device.serialNumber() == null) {
565 return Optional.empty();
566 }
567 return getSubscriberAndDeviceInformation(device.serialNumber());
568 }
569
Esin Karaman39b24852019-08-28 13:57:30 +0000570 private class InternalNetworkConfigListener implements NetworkConfigListener {
571 @Override
572 public void event(NetworkConfigEvent event) {
573 eventExecutor.execute(() -> {
574 switch (event.type()) {
575
576 case CONFIG_ADDED:
577 case CONFIG_UPDATED:
578 if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
579 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
580 if (config != null) {
581 //TODO: Simply remove flows/groups, hosts will response period query
582 // and re-sent IGMP report, so the flows can be rebuild.
583 // However, better to remove and re-add mcast flow rules here
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000584 if (vlanEnabled && (mcastVlan != config.egressVlan().toShort() ||
585 !mcastInnerVlan.equals(config.egressInnerVlan()))) {
Esin Karaman39b24852019-08-28 13:57:30 +0000586 clearGroups();
Andrea Campanellac2f3ddd2020-05-18 11:09:11 +0200587 groups.clear();
Esin Karaman39b24852019-08-28 13:57:30 +0000588 }
589 updateConfig(config);
590 }
591 }
592 break;
593 case CONFIG_REGISTERED:
594 case CONFIG_UNREGISTERED:
595 case CONFIG_REMOVED:
596 break;
597 default:
598 break;
599 }
600 });
601 }
602 }
603
604 private void updateConfig(McastConfig config) {
605 if (config == null) {
606 return;
607 }
608 log.debug("multicast config received: {}", config);
609
610 if (config.egressVlan() != null) {
611 mcastVlan = config.egressVlan().toShort();
612 }
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000613 if (config.egressInnerVlan() != null) {
614 mcastInnerVlan = config.egressInnerVlan();
615 }
Esin Karamane4890012020-04-19 11:58:54 +0000616 feedStatsServiceWithVlanConfigValues();
Esin Karaman39b24852019-08-28 13:57:30 +0000617 }
618
619 private class NextKey {
620 private DeviceId device;
621 private IpAddress group;
622
623 public NextKey(DeviceId deviceId, IpAddress groupAddress) {
624 device = deviceId;
625 group = groupAddress;
626 }
627
628 public DeviceId getDevice() {
629 return device;
630 }
631
632 public int hashCode() {
633 return Objects.hash(this.device, this.group);
634 }
635
636 public boolean equals(Object obj) {
637 if (this == obj) {
638 return true;
639 } else if (!(obj instanceof NextKey)) {
640 return false;
641 } else {
642 NextKey that = (NextKey) obj;
643 return this.getClass() == that.getClass() &&
644 Objects.equals(this.device, that.device) &&
645 Objects.equals(this.group, that.group);
646 }
647 }
648 }
649
650 private class NextContent {
651 private Integer nextId;
652 private Set<PortNumber> outPorts;
653
654 public NextContent(Integer nextId, Set<PortNumber> outPorts) {
655 this.nextId = nextId;
656 this.outPorts = outPorts;
657 }
658
659 public Integer getNextId() {
660 return nextId;
661 }
662
663 public Set<PortNumber> getOutPorts() {
664 return ImmutableSet.copyOf(outPorts);
665 }
666
667 public int hashCode() {
668 return Objects.hash(this.nextId, this.outPorts);
669 }
670
671 public boolean equals(Object obj) {
672 if (this == obj) {
673 return true;
674 } else if (!(obj instanceof NextContent)) {
675 return false;
676 } else {
677 NextContent that = (NextContent) obj;
678 return this.getClass() == that.getClass() &&
679 Objects.equals(this.nextId, that.nextId) &&
680 Objects.equals(this.outPorts, that.outPorts);
alshabib3b1eadc2016-02-01 17:57:00 -0800681 }
682 }
683 }
684
ke han9590c812017-02-28 15:02:26 +0800685 private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
686
Esin Karaman39b24852019-08-28 13:57:30 +0000687 private NextObjective nextObject(Integer nextId, PortNumber port,
688 NextType nextType, IpAddress mcastIp) {
ke han9590c812017-02-28 15:02:26 +0800689
Esin Karaman39b24852019-08-28 13:57:30 +0000690 // Build the meta selector with the fwd objective info
691 TrafficSelector.Builder metadata = DefaultTrafficSelector.builder()
692 .matchIPDst(mcastIp.toIpPrefix());
693
694 if (vlanEnabled) {
695 metadata.matchVlanId(multicastVlan());
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000696
697 if (!mcastInnerVlan.equals(VlanId.NONE)) {
698 metadata.matchInnerVlanId(mcastInnerVlan);
699 }
Esin Karaman39b24852019-08-28 13:57:30 +0000700 }
701
Andrea Campanella03652352020-05-06 16:12:00 +0200702 DefaultNextObjective.Builder builder = DefaultNextObjective.builder()
ke han9590c812017-02-28 15:02:26 +0800703 .fromApp(appId)
ke han9590c812017-02-28 15:02:26 +0800704 .withType(NextObjective.Type.BROADCAST)
Esin Karaman39b24852019-08-28 13:57:30 +0000705 .withId(nextId)
706 .withMeta(metadata.build());
707
Andrea Campanella03652352020-05-06 16:12:00 +0200708 if (port == null && !nextType.equals(NextType.Remove)) {
709 log.error("Port can't be null with operation {}", nextType);
710 return null;
711 } else if (port != null && !nextType.equals(NextType.Remove)) {
712 builder.addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build());
713 }
714
715 ObjectiveContext context = new ObjectiveContext() {
ke han9590c812017-02-28 15:02:26 +0800716 @Override
717 public void onSuccess(Objective objective) {
Andrea Campanella03652352020-05-06 16:12:00 +0200718 log.debug("Success for operation {} on Next Objective {}", objective.id(), nextType);
ke han9590c812017-02-28 15:02:26 +0800719 }
720
721 @Override
722 public void onError(Objective objective, ObjectiveError error) {
Esin Karaman39b24852019-08-28 13:57:30 +0000723 log.debug("Next Objective {} failed, because {}",
724 objective.id(),
725 error);
ke han9590c812017-02-28 15:02:26 +0800726 }
727 };
728
729 switch (nextType) {
730 case AddNew:
Andrea Campanella03652352020-05-06 16:12:00 +0200731 return builder.add(context);
ke han9590c812017-02-28 15:02:26 +0800732 case AddToExisting:
Andrea Campanella03652352020-05-06 16:12:00 +0200733 return builder.addToExisting(context);
ke han9590c812017-02-28 15:02:26 +0800734 case Remove:
Andrea Campanella03652352020-05-06 16:12:00 +0200735 return builder.remove(context);
ke han9590c812017-02-28 15:02:26 +0800736 case RemoveFromExisting:
Andrea Campanella03652352020-05-06 16:12:00 +0200737 return builder.removeFromExisting(context);
ke han9590c812017-02-28 15:02:26 +0800738 default:
739 return null;
740 }
741 }
742
Esin Karaman39b24852019-08-28 13:57:30 +0000743 private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
744 TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
745 .matchEthType(Ethernet.TYPE_IPV4)
746 .matchIPDst(mcastIp.toIpPrefix());
ke han9590c812017-02-28 15:02:26 +0800747
Esin Karaman39b24852019-08-28 13:57:30 +0000748 //build the meta selector
749 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
750 if (vlanEnabled) {
751 metabuilder.matchVlanId(multicastVlan());
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000752
753 if (!mcastInnerVlan.equals(VlanId.NONE)) {
754 metabuilder.matchInnerVlanId(mcastInnerVlan);
755 }
Jonathan Hart718c0452016-02-18 15:56:22 -0800756 }
757
Esin Karaman39b24852019-08-28 13:57:30 +0000758 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
759 .fromApp(appId)
760 .nextStep(nextId)
761 .makePermanent()
762 .withFlag(ForwardingObjective.Flag.SPECIFIC)
763 .withPriority(priority)
764 .withSelector(mcast.build())
765 .withMeta(metabuilder.build());
alshabibfc1cb032016-02-17 15:37:56 -0800766
Esin Karaman39b24852019-08-28 13:57:30 +0000767 return fwdBuilder;
alshabibfc1cb032016-02-17 15:37:56 -0800768 }
769
Esin Karaman39b24852019-08-28 13:57:30 +0000770 // Custom-built function, when the device is not available we need a fallback mechanism
771 private boolean isLocalLeader(DeviceId deviceId) {
772 if (!mastershipService.isLocalMaster(deviceId)) {
773 // When the device is available we just check the mastership
774 if (deviceService.isAvailable(deviceId)) {
ke han9590c812017-02-28 15:02:26 +0800775 return false;
ke han9590c812017-02-28 15:02:26 +0800776 }
Esin Karaman39b24852019-08-28 13:57:30 +0000777 // Fallback with Leadership service - device id is used as topic
778 NodeId leader = leadershipService.runForLeadership(
779 deviceId.toString()).leaderNodeId();
780 // Verify if this node is the leader
781 return clusterService.getLocalNode().id().equals(leader);
ke han9590c812017-02-28 15:02:26 +0800782 }
Esin Karaman39b24852019-08-28 13:57:30 +0000783 return true;
ke han9590c812017-02-28 15:02:26 +0800784 }
Esin Karaman39b24852019-08-28 13:57:30 +0000785
Andrea Campanellac2f3ddd2020-05-18 11:09:11 +0200786 private class InternalDeviceListener implements DeviceListener {
787
788 @Override
789 public void event(DeviceEvent event) {
790 eventExecutor.execute(() -> {
791 DeviceId devId = event.subject().id();
Andrea Campanella86bee262020-05-18 20:15:01 +0200792 if (!deviceService.isAvailable(devId) &&
793 isLocalLeader(event.subject().id())) {
Andrea Campanellac2f3ddd2020-05-18 11:09:11 +0200794 if (deviceService.getPorts(devId).isEmpty()) {
795 log.info("Handling controlled device disconnection .. "
796 + "flushing all state for dev:{}", devId);
797 groupService.purgeGroupEntries(devId);
798 groups.keySet().iterator().forEachRemaining(groupInfo -> {
799 if (groupInfo.device.equals(devId)) {
800 log.debug("Removing next key {} from distributed mcast map", groupInfo.group);
801 groups.remove(groupInfo);
802 }
803 });
804 } else {
805 log.info("Disconnected device has available ports .. "
806 + "assuming temporary disconnection, "
807 + "retaining state for device {}", devId);
808 }
Andrea Campanellac2f3ddd2020-05-18 11:09:11 +0200809 }
810 });
811
812 }
813
814 @Override
815 public boolean isRelevant(DeviceEvent event) {
Andrea Campanella86bee262020-05-18 20:15:01 +0200816 return event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED);
Andrea Campanellac2f3ddd2020-05-18 11:09:11 +0200817 }
818 }
819
alshabib3b1eadc2016-02-01 17:57:00 -0800820}
ke hanf1709e82016-08-12 10:48:17 +0800821
ke han9590c812017-02-28 15:02:26 +0800822