blob: c47f9d1913d79cd891911cf857a9c3d16eccb3f1 [file] [log] [blame]
alshabib3b1eadc2016-02-01 17:57:00 -08001/*
Brian O'Connorcf85aa82017-08-03 22:46:01 -07002 * Copyright 2016-present Open Networking Foundation
alshabib3b1eadc2016-02-01 17:57:00 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Daniele Moro8ea9e102020-03-24 18:56:52 +010016package org.opencord.cordmcast.impl;
alshabib3b1eadc2016-02-01 17:57:00 -080017
Esin Karaman39b24852019-08-28 13:57:30 +000018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Sets;
alshabib09069c92016-02-21 14:49:51 -080020import org.apache.commons.lang3.tuple.ImmutablePair;
alshabib3b1eadc2016-02-01 17:57:00 -080021import org.onlab.packet.Ethernet;
alshabib3b1eadc2016-02-01 17:57:00 -080022import org.onlab.packet.IpAddress;
23import org.onlab.packet.VlanId;
Esin Karaman39b24852019-08-28 13:57:30 +000024import org.onlab.util.KryoNamespace;
Jonathan Hart28271642016-02-10 16:13:54 -080025import org.onosproject.cfg.ComponentConfigService;
Esin Karaman39b24852019-08-28 13:57:30 +000026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.LeadershipService;
28import org.onosproject.cluster.NodeId;
alshabib3b1eadc2016-02-01 17:57:00 -080029import org.onosproject.core.ApplicationId;
30import org.onosproject.core.CoreService;
Esin Karaman39b24852019-08-28 13:57:30 +000031import org.onosproject.mastership.MastershipService;
32import org.onosproject.mcast.api.McastEvent;
33import org.onosproject.mcast.api.McastListener;
34import org.onosproject.mcast.api.McastRoute;
35import org.onosproject.mcast.api.MulticastRouteService;
alshabib3b1eadc2016-02-01 17:57:00 -080036import org.onosproject.net.ConnectPoint;
Daniele Moro8ea9e102020-03-24 18:56:52 +010037import org.onosproject.net.Device;
ke han9590c812017-02-28 15:02:26 +080038import org.onosproject.net.DeviceId;
Esin Karaman39b24852019-08-28 13:57:30 +000039import org.onosproject.net.HostId;
ke han9590c812017-02-28 15:02:26 +080040import org.onosproject.net.PortNumber;
ke hanf1709e82016-08-12 10:48:17 +080041import org.onosproject.net.config.ConfigFactory;
42import org.onosproject.net.config.NetworkConfigEvent;
43import org.onosproject.net.config.NetworkConfigListener;
44import org.onosproject.net.config.NetworkConfigRegistry;
Esin Karaman39b24852019-08-28 13:57:30 +000045import org.onosproject.net.config.basics.McastConfig;
ke hanf1709e82016-08-12 10:48:17 +080046import org.onosproject.net.config.basics.SubjectFactories;
Esin Karaman39b24852019-08-28 13:57:30 +000047import org.onosproject.net.device.DeviceService;
alshabib3b1eadc2016-02-01 17:57:00 -080048import org.onosproject.net.flow.DefaultTrafficSelector;
49import org.onosproject.net.flow.DefaultTrafficTreatment;
50import org.onosproject.net.flow.TrafficSelector;
51import org.onosproject.net.flowobjective.DefaultForwardingObjective;
52import org.onosproject.net.flowobjective.DefaultNextObjective;
Esin Karaman39b24852019-08-28 13:57:30 +000053import org.onosproject.net.flowobjective.DefaultObjectiveContext;
alshabib3b1eadc2016-02-01 17:57:00 -080054import org.onosproject.net.flowobjective.FlowObjectiveService;
55import org.onosproject.net.flowobjective.ForwardingObjective;
56import org.onosproject.net.flowobjective.NextObjective;
57import org.onosproject.net.flowobjective.Objective;
58import org.onosproject.net.flowobjective.ObjectiveContext;
59import org.onosproject.net.flowobjective.ObjectiveError;
Esin Karaman39b24852019-08-28 13:57:30 +000060import org.onosproject.store.serializers.KryoNamespaces;
61import org.onosproject.store.service.ConsistentMap;
62import org.onosproject.store.service.Serializer;
63import org.onosproject.store.service.StorageService;
64import org.onosproject.store.service.Versioned;
Daniele Moro8ea9e102020-03-24 18:56:52 +010065import org.opencord.cordmcast.CordMcastService;
66import org.opencord.cordmcast.CordMcastStatisticsService;
67import org.opencord.sadis.SadisService;
68import org.opencord.sadis.SubscriberAndDeviceInformation;
Jonathan Hart28271642016-02-10 16:13:54 -080069import org.osgi.service.component.ComponentContext;
Daniele Moro8ea9e102020-03-24 18:56:52 +010070import org.osgi.service.component.annotations.Activate;
71import org.osgi.service.component.annotations.Component;
72import org.osgi.service.component.annotations.Deactivate;
73import org.osgi.service.component.annotations.Modified;
74import org.osgi.service.component.annotations.Reference;
75import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib3b1eadc2016-02-01 17:57:00 -080076import org.slf4j.Logger;
77
Jonathan Hart28271642016-02-10 16:13:54 -080078import java.util.Dictionary;
alshabib3b1eadc2016-02-01 17:57:00 -080079import java.util.Map;
ke han9590c812017-02-28 15:02:26 +080080import java.util.Objects;
Jonathan Hart0c194962016-05-23 17:08:15 -070081import java.util.Optional;
alshabibfc1cb032016-02-17 15:37:56 -080082import java.util.Properties;
Esin Karaman39b24852019-08-28 13:57:30 +000083import java.util.Set;
84import java.util.concurrent.ExecutorService;
85import java.util.concurrent.locks.Lock;
86import java.util.concurrent.locks.ReentrantLock;
Andrea Campanella03652352020-05-06 16:12:00 +020087import java.util.function.Consumer;
alshabib3b1eadc2016-02-01 17:57:00 -080088
alshabibfc1cb032016-02-17 15:37:56 -080089import static com.google.common.base.Strings.isNullOrEmpty;
Esin Karaman39b24852019-08-28 13:57:30 +000090import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibfc1cb032016-02-17 15:37:56 -080091import static org.onlab.util.Tools.get;
Esin Karaman39b24852019-08-28 13:57:30 +000092import static org.onlab.util.Tools.groupedThreads;
Daniele Moro8ea9e102020-03-24 18:56:52 +010093import static org.opencord.cordmcast.impl.OsgiPropertyConstants.*;
alshabib3b1eadc2016-02-01 17:57:00 -080094import static org.slf4j.LoggerFactory.getLogger;
95
Esin Karaman39b24852019-08-28 13:57:30 +000096
alshabib3b1eadc2016-02-01 17:57:00 -080097/**
Jonathan Hartc3f84eb2016-02-19 12:44:36 -080098 * CORD multicast provisioning application. Operates by listening to
Jonathan Hart28271642016-02-10 16:13:54 -080099 * events on the multicast rib and provisioning groups to program multicast
alshabib3b1eadc2016-02-01 17:57:00 -0800100 * flows on the dataplane.
101 */
Carmelo Cascone995fd682019-11-14 14:22:39 -0800102@Component(immediate = true,
103 property = {
104 VLAN_ENABLED + ":Boolean=" + DEFAULT_VLAN_ENABLED,
105 PRIORITY + ":Integer=" + DEFAULT_PRIORITY,
106})
Daniele Moro8ea9e102020-03-24 18:56:52 +0100107public class CordMcast implements CordMcastService {
108 private static final String APP_NAME = "org.opencord.mcast";
alshabib3b1eadc2016-02-01 17:57:00 -0800109
Jonathan Hart0c194962016-05-23 17:08:15 -0700110 private final Logger log = getLogger(getClass());
alshabib09069c92016-02-21 14:49:51 -0800111
alshabib09069c92016-02-21 14:49:51 -0800112 private static final int DEFAULT_PRIORITY = 500;
alshabib3b1eadc2016-02-01 17:57:00 -0800113 private static final short DEFAULT_MCAST_VLAN = 4000;
alshabibfc1cb032016-02-17 15:37:56 -0800114
Carmelo Cascone995fd682019-11-14 14:22:39 -0800115 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800116 protected MulticastRouteService mcastService;
117
Carmelo Cascone995fd682019-11-14 14:22:39 -0800118 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800119 protected FlowObjectiveService flowObjectiveService;
120
Carmelo Cascone995fd682019-11-14 14:22:39 -0800121 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800122 protected CoreService coreService;
123
Carmelo Cascone995fd682019-11-14 14:22:39 -0800124 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jonathan Hart28271642016-02-10 16:13:54 -0800125 protected ComponentConfigService componentConfigService;
126
Carmelo Cascone995fd682019-11-14 14:22:39 -0800127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke hanf1709e82016-08-12 10:48:17 +0800128 protected NetworkConfigRegistry networkConfig;
129
Carmelo Cascone995fd682019-11-14 14:22:39 -0800130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000131 protected StorageService storageService;
132
Carmelo Cascone995fd682019-11-14 14:22:39 -0800133 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000134 protected MastershipService mastershipService;
135
Carmelo Cascone995fd682019-11-14 14:22:39 -0800136 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000137 public DeviceService deviceService;
138
Carmelo Cascone995fd682019-11-14 14:22:39 -0800139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000140 private ClusterService clusterService;
141
Carmelo Cascone995fd682019-11-14 14:22:39 -0800142 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000143 private LeadershipService leadershipService;
144
Esin Karaman996177c2020-03-05 13:21:09 +0000145 @Reference(cardinality = ReferenceCardinality.MANDATORY)
146 protected SadisService sadisService;
147
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000148 @Reference(cardinality = ReferenceCardinality.MANDATORY)
149 protected CordMcastStatisticsService cordMcastStatisticsService;
150
alshabib3b1eadc2016-02-01 17:57:00 -0800151 protected McastListener listener = new InternalMulticastListener();
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000152
ke hanf1709e82016-08-12 10:48:17 +0800153 private InternalNetworkConfigListener configListener =
154 new InternalNetworkConfigListener();
alshabib3b1eadc2016-02-01 17:57:00 -0800155
Esin Karaman39b24852019-08-28 13:57:30 +0000156 private ConsistentMap<NextKey, NextContent> groups;
alshabib3b1eadc2016-02-01 17:57:00 -0800157
alshabib3b1eadc2016-02-01 17:57:00 -0800158 private ApplicationId appId;
ke hanf1709e82016-08-12 10:48:17 +0800159 private ApplicationId coreAppId;
Esin Karaman39b24852019-08-28 13:57:30 +0000160 private short mcastVlan = DEFAULT_MCAST_VLAN;
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000161 private VlanId mcastInnerVlan = VlanId.NONE;
alshabib3b1eadc2016-02-01 17:57:00 -0800162
Carmelo Cascone995fd682019-11-14 14:22:39 -0800163 /**
164 * Whether to use VLAN for multicast traffic.
165 **/
alshabib09069c92016-02-21 14:49:51 -0800166 private boolean vlanEnabled = DEFAULT_VLAN_ENABLED;
alshabibfc1cb032016-02-17 15:37:56 -0800167
Carmelo Cascone995fd682019-11-14 14:22:39 -0800168 /**
169 * Priority for multicast rules.
170 **/
alshabib3b1eadc2016-02-01 17:57:00 -0800171 private int priority = DEFAULT_PRIORITY;
172
ke hanf1709e82016-08-12 10:48:17 +0800173 private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
174 McastConfig.class;
175
176 private ConfigFactory<ApplicationId, McastConfig> cordMcastConfigFactory =
177 new ConfigFactory<ApplicationId, McastConfig>(
178 SubjectFactories.APP_SUBJECT_FACTORY, CORD_MCAST_CONFIG_CLASS, "multicast") {
179 @Override
180 public McastConfig createConfig() {
181 return new McastConfig();
182 }
183 };
Jonathan Hart28271642016-02-10 16:13:54 -0800184
Esin Karaman39b24852019-08-28 13:57:30 +0000185 // lock to synchronize local operations
186 private final Lock mcastLock = new ReentrantLock();
187 private void mcastLock() {
188 mcastLock.lock();
189 }
190 private void mcastUnlock() {
191 mcastLock.unlock();
192 }
193 private ExecutorService eventExecutor;
194
alshabib3b1eadc2016-02-01 17:57:00 -0800195 @Activate
Jonathan Hart435ffc42016-02-19 10:32:05 -0800196 public void activate(ComponentContext context) {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800197 componentConfigService.registerProperties(getClass());
Jonathan Hart435ffc42016-02-19 10:32:05 -0800198 modified(context);
199
Charles Chanf867c4b2017-01-20 11:22:25 -0800200 appId = coreService.registerApplication(APP_NAME);
ke hanf1709e82016-08-12 10:48:17 +0800201 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
Jonathan Hart28271642016-02-10 16:13:54 -0800202
Esin Karaman39b24852019-08-28 13:57:30 +0000203 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast",
204 "events-mcast-%d", log));
205
206 KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
207 .register(KryoNamespaces.API)
208 .register(NextKey.class)
209 .register(NextContent.class);
210 groups = storageService
211 .<NextKey, NextContent>consistentMapBuilder()
212 .withName("cord-mcast-groups-store")
213 .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
214 .build();
215
ke hanf1709e82016-08-12 10:48:17 +0800216 networkConfig.registerConfigFactory(cordMcastConfigFactory);
217 networkConfig.addListener(configListener);
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800218 mcastService.addListener(listener);
219
alshabib09069c92016-02-21 14:49:51 -0800220 mcastService.getRoutes().stream()
Esin Karaman39b24852019-08-28 13:57:30 +0000221 .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
alshabib09069c92016-02-21 14:49:51 -0800222 .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
Esin Karaman39b24852019-08-28 13:57:30 +0000223 .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(),
224 sink)));
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800225
ke hanf1709e82016-08-12 10:48:17 +0800226 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
Esin Karaman39b24852019-08-28 13:57:30 +0000227 updateConfig(config);
alshabib3b1eadc2016-02-01 17:57:00 -0800228 log.info("Started");
229 }
230
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000231 @Modified
232 public void modified(ComponentContext context) {
233 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
234
235 String s = get(properties, VLAN_ENABLED);
236 vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
237
238 try {
239 s = get(properties, PRIORITY);
240 priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
241 } catch (NumberFormatException ne) {
242 log.error("Unable to parse configuration parameter for priority", ne);
243 priority = DEFAULT_PRIORITY;
244 }
Esin Karamane4890012020-04-19 11:58:54 +0000245 feedStatsServiceWithVlanConfigValues();
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000246 }
247
alshabib3b1eadc2016-02-01 17:57:00 -0800248 @Deactivate
249 public void deactivate() {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800250 componentConfigService.unregisterProperties(getClass(), false);
alshabib3b1eadc2016-02-01 17:57:00 -0800251 mcastService.removeListener(listener);
ke hanf1709e82016-08-12 10:48:17 +0800252 networkConfig.removeListener(configListener);
ke han9590c812017-02-28 15:02:26 +0800253 networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
Esin Karaman39b24852019-08-28 13:57:30 +0000254 eventExecutor.shutdown();
Andrea Campanella03652352020-05-06 16:12:00 +0200255 eventExecutor = null;
256 groups.clear();
Esin Karaman39b24852019-08-28 13:57:30 +0000257 groups.destroy();
alshabib3b1eadc2016-02-01 17:57:00 -0800258 log.info("Stopped");
259 }
260
Esin Karamane4890012020-04-19 11:58:54 +0000261 /**
262 * Updates the stats service with current VLAN config values.
263 */
264 private void feedStatsServiceWithVlanConfigValues() {
265 cordMcastStatisticsService.setVlanValue(assignedVlan());
266 cordMcastStatisticsService.setInnerVlanValue(assignedInnerVlan());
267 }
268
Andrea Campanella03652352020-05-06 16:12:00 +0200269 private void clearGroups() {
Esin Karaman39b24852019-08-28 13:57:30 +0000270 mcastLock();
271 try {
272 groups.keySet().forEach(groupInfo -> {
Andrea Campanella03652352020-05-06 16:12:00 +0200273 NextContent next = groups.get(groupInfo).value();
Esin Karaman39b24852019-08-28 13:57:30 +0000274 if (!isLocalLeader(groupInfo.getDevice())) {
275 return;
276 }
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000277 if (next != null) {
Andrea Campanella03652352020-05-06 16:12:00 +0200278 //On Success of removing the fwd objective we remove also the group.
279 Consumer<Objective> onSuccess = (objective) -> {
280 log.debug("Successfully removed fwd objective for {} on {}, " +
281 "removing next objective {}", groupInfo.group,
282 groupInfo.getDevice(), next.getNextId());
283 eventExecutor.submit(() -> flowObjectiveService.next(groupInfo.getDevice(),
284 nextObject(next.getNextId(),
285 null,
286 NextType.Remove, groupInfo.group)));
287 };
288
289 ObjectiveContext context =
290 new DefaultObjectiveContext(onSuccess, (objective, error) ->
291 log.warn("Failed to remove {} on {}: {}",
292 groupInfo.group, next.getNextId(), error));
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000293 // remove the flow rule
Andrea Campanella03652352020-05-06 16:12:00 +0200294 flowObjectiveService.forward(groupInfo.getDevice(),
295 fwdObject(next.getNextId(),
296 groupInfo.group).remove(context));
Esin Karaman39b24852019-08-28 13:57:30 +0000297
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000298 }
Esin Karaman39b24852019-08-28 13:57:30 +0000299 });
Esin Karaman39b24852019-08-28 13:57:30 +0000300 } finally {
301 mcastUnlock();
302 }
303 }
304
305 private VlanId multicastVlan() {
306 return VlanId.vlanId(mcastVlan);
307 }
308
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000309 protected VlanId assignedVlan() {
Esin Karaman39b24852019-08-28 13:57:30 +0000310 return vlanEnabled ? multicastVlan() : VlanId.NONE;
ke han9590c812017-02-28 15:02:26 +0800311 }
312
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000313 protected VlanId assignedInnerVlan() {
314 return vlanEnabled ? mcastInnerVlan : VlanId.NONE;
315 }
316
alshabib3b1eadc2016-02-01 17:57:00 -0800317 private class InternalMulticastListener implements McastListener {
318 @Override
319 public void event(McastEvent event) {
Esin Karaman39b24852019-08-28 13:57:30 +0000320 eventExecutor.execute(() -> {
321 switch (event.type()) {
322 case ROUTE_ADDED:
323 case ROUTE_REMOVED:
324 case SOURCES_ADDED:
325 break;
326 case SINKS_ADDED:
327 addSinks(event);
328 break;
329 case SINKS_REMOVED:
330 removeSinks(event);
331 break;
332 default:
333 log.warn("Unknown mcast event {}", event.type());
334 }
335 });
336 }
337 }
338
339 /**
340 * Processes previous, and new sinks then finds the sinks to be removed.
341 * @param prevSinks the previous sinks to be evaluated
342 * @param newSinks the new sinks to be evaluated
343 * @returnt the set of the sinks to be removed
344 */
345 private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
346 Map<HostId, Set<ConnectPoint>> newSinks) {
347 return getSinksToBeProcessed(prevSinks, newSinks);
348 }
349
350
351 /**
352 * Processes previous, and new sinks then finds the sinks to be added.
353 * @param newSinks the new sinks to be processed
354 * @param allPrevSinks all previous sinks
355 * @return the set of the sinks to be added
356 */
357 private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
358 Map<HostId, Set<ConnectPoint>> allPrevSinks) {
359 return getSinksToBeProcessed(newSinks, allPrevSinks);
360 }
361
362 /**
363 * Gets single-homed sinks that are in set1 but not in set2.
364 * @param sinkSet1 the first sink map
365 * @param sinkSet2 the second sink map
366 * @return a set containing all the single-homed sinks found in set1 but not in set2
367 */
368 private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
369 Map<HostId, Set<ConnectPoint>> sinkSet2) {
370 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
371 sinkSet1.forEach(((hostId, connectPoints) -> {
372 if (HostId.NONE.equals(hostId)) {
373 //assume all connect points associated with HostId.NONE are single homed sinks
374 sinksToBeProcessed.addAll(connectPoints);
375 return;
376 }
377 }));
378 Set<ConnectPoint> singleHomedSinksOfSet2 = sinkSet2.get(HostId.NONE) == null ?
379 Sets.newHashSet() :
380 sinkSet2.get(HostId.NONE);
381 return Sets.difference(sinksToBeProcessed, singleHomedSinksOfSet2);
382 };
383
384
385 private void removeSinks(McastEvent event) {
386 mcastLock();
387 try {
388 Set<ConnectPoint> sinksToBeRemoved = getSinksToBeRemoved(event.prevSubject().sinks(),
389 event.subject().sinks());
390 sinksToBeRemoved.forEach(sink -> removeSink(event.subject().route().group(), sink));
391 } finally {
392 mcastUnlock();
393 }
394 }
395
396 private void removeSink(IpAddress group, ConnectPoint sink) {
397 if (!isLocalLeader(sink.deviceId())) {
398 log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
399 sink.deviceId(), sink, group);
400 return;
401 }
402
Esin Karaman996177c2020-03-05 13:21:09 +0000403 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman39b24852019-08-28 13:57:30 +0000404
405 if (!oltInfo.isPresent()) {
406 log.warn("Unknown OLT device : {}", sink.deviceId());
407 return;
408 }
409
410 log.debug("Removing sink {} from the group {}", sink, group);
411
412 NextKey key = new NextKey(sink.deviceId(), group);
413 groups.computeIfPresent(key, (k, v) -> {
Esin Karaman39b24852019-08-28 13:57:30 +0000414
415 Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
416 outPorts.remove(sink.port());
417
418 if (outPorts.isEmpty()) {
Andrea Campanella03652352020-05-06 16:12:00 +0200419 log.debug("No more output ports for group {}, removing next and fwd objectives", group);
420
421 //On Success of removing the fwd objective we remove also the group.
422 Consumer<Objective> onSuccess = (objective) -> {
423 log.debug("Successfully removed fwd objective for {} on {}, " +
424 "removing next objective {}", group, sink, v.getNextId());
425 eventExecutor.execute(() -> {
426 //No port is needed since it's a remove Operation
427 flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(),
428 null,
429 NextType.Remove, group));
430 });
431 };
432
Esin Karaman39b24852019-08-28 13:57:30 +0000433 // this is the last sink
Andrea Campanella03652352020-05-06 16:12:00 +0200434 ObjectiveContext context = new DefaultObjectiveContext(onSuccess,
Esin Karaman39b24852019-08-28 13:57:30 +0000435 (objective, error) -> log.warn("Failed to remove {} on {}: {}",
436 group, sink, error));
437 ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
438 flowObjectiveService.forward(sink.deviceId(), fwdObj);
Andrea Campanella03652352020-05-06 16:12:00 +0200439 } else {
440 log.debug("Group {} has remaining {} ports, removing just {} " +
441 "from it's sinks", group, outPorts, sink.port());
442 flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
443 NextType.RemoveFromExisting, group));
Esin Karaman39b24852019-08-28 13:57:30 +0000444 }
445 // remove the whole entity if no out port exists in the port list
446 return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
447 ImmutableSet.copyOf(outPorts));
448 });
449 }
450
451 private void addSinks(McastEvent event) {
452 mcastLock();
453 try {
454 Set<ConnectPoint> sinksToBeAdded = getSinksToBeAdded(event.subject().sinks(),
455 event.prevSubject().sinks());
456 sinksToBeAdded.forEach(sink -> addSink(event.subject().route(), sink));
457 } finally {
458 mcastUnlock();
459 }
460 }
461
462 private void addSink(McastRoute route, ConnectPoint sink) {
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000463
Esin Karaman39b24852019-08-28 13:57:30 +0000464 if (!isLocalLeader(sink.deviceId())) {
465 log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
466 sink.deviceId(), sink, route.group());
467 return;
468 }
469
Esin Karaman996177c2020-03-05 13:21:09 +0000470 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman39b24852019-08-28 13:57:30 +0000471
472 if (!oltInfo.isPresent()) {
473 log.warn("Unknown OLT device : {}", sink.deviceId());
474 return;
475 }
476
477 log.debug("Adding sink {} to the group {}", sink, route.group());
478
479 NextKey key = new NextKey(sink.deviceId(), route.group());
480 NextObjective newNextObj;
481
482 boolean theFirstSinkOfGroup = false;
483 if (!groups.containsKey(key)) {
484 // First time someone request this mcast group via this device
485 Integer nextId = flowObjectiveService.allocateNextId();
486 newNextObj = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
487 // Store the new port
488 groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
489 theFirstSinkOfGroup = true;
490 } else {
491 // This device already serves some subscribers of this mcast group
492 Versioned<NextContent> nextObj = groups.get(key);
493 if (nextObj.value().getOutPorts().contains(sink.port())) {
494 log.info("Group {} already serves the sink connected to {}", route.group(), sink);
495 return;
496 }
497 newNextObj = nextObject(nextObj.value().getNextId(), sink.port(),
498 NextType.AddToExisting, route.group());
499 // add new port to the group
500 Set<PortNumber> outPorts = Sets.newHashSet(nextObj.value().getOutPorts());
501 outPorts.add(sink.port());
502 groups.put(key, new NextContent(newNextObj.id(), ImmutableSet.copyOf(outPorts)));
503 }
504
505 ObjectiveContext context = new DefaultObjectiveContext(
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000506 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}, inner vlan {}",
Esin Karaman39b24852019-08-28 13:57:30 +0000507 route.group(), sink.deviceId(), sink.port().toLong(),
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000508 assignedVlan(), assignedInnerVlan()),
Esin Karaman39b24852019-08-28 13:57:30 +0000509 (objective, error) -> {
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000510 log.warn("Failed to add {} on {}/{}, vlan {}, inner vlan {}: {}",
Esin Karaman39b24852019-08-28 13:57:30 +0000511 route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan(),
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000512 assignedInnerVlan(), error);
Esin Karaman39b24852019-08-28 13:57:30 +0000513 });
514
515 flowObjectiveService.next(sink.deviceId(), newNextObj);
516
517 if (theFirstSinkOfGroup) {
518 // create the necessary flow rule if this is the first sink request for the group
519 // on this device
520 flowObjectiveService.forward(sink.deviceId(), fwdObject(newNextObj.id(),
521 route.group()).add(context));
522 }
523 }
524
Esin Karaman996177c2020-03-05 13:21:09 +0000525 /**
526 * Fetches device information associated with the device serial number from SADIS.
527 *
528 * @param serialNumber serial number of a device
529 * @return device information; an empty Optional otherwise.
530 */
531 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
532 long start = System.currentTimeMillis();
533 try {
534 return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
535 } finally {
536 if (log.isDebugEnabled()) {
537 // SADIS may call remote systems to fetch device data and this calls can take a long time.
538 // This measurement is just for monitoring these kinds of situations.
539 log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
540 }
541
542 }
543 }
544
545 /**
546 * Fetches device information associated with the device serial number from SADIS.
547 *
548 * @param deviceId device id
549 * @return device information; an empty Optional otherwise.
550 */
551 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
552 Device device = deviceService.getDevice(deviceId);
553 if (device == null || device.serialNumber() == null) {
554 return Optional.empty();
555 }
556 return getSubscriberAndDeviceInformation(device.serialNumber());
557 }
558
Esin Karaman39b24852019-08-28 13:57:30 +0000559 private class InternalNetworkConfigListener implements NetworkConfigListener {
560 @Override
561 public void event(NetworkConfigEvent event) {
562 eventExecutor.execute(() -> {
563 switch (event.type()) {
564
565 case CONFIG_ADDED:
566 case CONFIG_UPDATED:
567 if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
568 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
569 if (config != null) {
570 //TODO: Simply remove flows/groups, hosts will response period query
571 // and re-sent IGMP report, so the flows can be rebuild.
572 // However, better to remove and re-add mcast flow rules here
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000573 if (vlanEnabled && (mcastVlan != config.egressVlan().toShort() ||
574 !mcastInnerVlan.equals(config.egressInnerVlan()))) {
Esin Karaman39b24852019-08-28 13:57:30 +0000575 clearGroups();
576 }
577 updateConfig(config);
578 }
579 }
580 break;
581 case CONFIG_REGISTERED:
582 case CONFIG_UNREGISTERED:
583 case CONFIG_REMOVED:
584 break;
585 default:
586 break;
587 }
588 });
589 }
590 }
591
592 private void updateConfig(McastConfig config) {
593 if (config == null) {
594 return;
595 }
596 log.debug("multicast config received: {}", config);
597
598 if (config.egressVlan() != null) {
599 mcastVlan = config.egressVlan().toShort();
600 }
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000601 if (config.egressInnerVlan() != null) {
602 mcastInnerVlan = config.egressInnerVlan();
603 }
Esin Karamane4890012020-04-19 11:58:54 +0000604 feedStatsServiceWithVlanConfigValues();
Esin Karaman39b24852019-08-28 13:57:30 +0000605 }
606
607 private class NextKey {
608 private DeviceId device;
609 private IpAddress group;
610
611 public NextKey(DeviceId deviceId, IpAddress groupAddress) {
612 device = deviceId;
613 group = groupAddress;
614 }
615
616 public DeviceId getDevice() {
617 return device;
618 }
619
620 public int hashCode() {
621 return Objects.hash(this.device, this.group);
622 }
623
624 public boolean equals(Object obj) {
625 if (this == obj) {
626 return true;
627 } else if (!(obj instanceof NextKey)) {
628 return false;
629 } else {
630 NextKey that = (NextKey) obj;
631 return this.getClass() == that.getClass() &&
632 Objects.equals(this.device, that.device) &&
633 Objects.equals(this.group, that.group);
634 }
635 }
636 }
637
638 private class NextContent {
639 private Integer nextId;
640 private Set<PortNumber> outPorts;
641
642 public NextContent(Integer nextId, Set<PortNumber> outPorts) {
643 this.nextId = nextId;
644 this.outPorts = outPorts;
645 }
646
647 public Integer getNextId() {
648 return nextId;
649 }
650
651 public Set<PortNumber> getOutPorts() {
652 return ImmutableSet.copyOf(outPorts);
653 }
654
655 public int hashCode() {
656 return Objects.hash(this.nextId, this.outPorts);
657 }
658
659 public boolean equals(Object obj) {
660 if (this == obj) {
661 return true;
662 } else if (!(obj instanceof NextContent)) {
663 return false;
664 } else {
665 NextContent that = (NextContent) obj;
666 return this.getClass() == that.getClass() &&
667 Objects.equals(this.nextId, that.nextId) &&
668 Objects.equals(this.outPorts, that.outPorts);
alshabib3b1eadc2016-02-01 17:57:00 -0800669 }
670 }
671 }
672
ke han9590c812017-02-28 15:02:26 +0800673 private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
674
Esin Karaman39b24852019-08-28 13:57:30 +0000675 private NextObjective nextObject(Integer nextId, PortNumber port,
676 NextType nextType, IpAddress mcastIp) {
ke han9590c812017-02-28 15:02:26 +0800677
Esin Karaman39b24852019-08-28 13:57:30 +0000678 // Build the meta selector with the fwd objective info
679 TrafficSelector.Builder metadata = DefaultTrafficSelector.builder()
680 .matchIPDst(mcastIp.toIpPrefix());
681
682 if (vlanEnabled) {
683 metadata.matchVlanId(multicastVlan());
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000684
685 if (!mcastInnerVlan.equals(VlanId.NONE)) {
686 metadata.matchInnerVlanId(mcastInnerVlan);
687 }
Esin Karaman39b24852019-08-28 13:57:30 +0000688 }
689
Andrea Campanella03652352020-05-06 16:12:00 +0200690 DefaultNextObjective.Builder builder = DefaultNextObjective.builder()
ke han9590c812017-02-28 15:02:26 +0800691 .fromApp(appId)
ke han9590c812017-02-28 15:02:26 +0800692 .withType(NextObjective.Type.BROADCAST)
Esin Karaman39b24852019-08-28 13:57:30 +0000693 .withId(nextId)
694 .withMeta(metadata.build());
695
Andrea Campanella03652352020-05-06 16:12:00 +0200696 if (port == null && !nextType.equals(NextType.Remove)) {
697 log.error("Port can't be null with operation {}", nextType);
698 return null;
699 } else if (port != null && !nextType.equals(NextType.Remove)) {
700 builder.addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build());
701 }
702
703 ObjectiveContext context = new ObjectiveContext() {
ke han9590c812017-02-28 15:02:26 +0800704 @Override
705 public void onSuccess(Objective objective) {
Andrea Campanella03652352020-05-06 16:12:00 +0200706 log.debug("Success for operation {} on Next Objective {}", objective.id(), nextType);
ke han9590c812017-02-28 15:02:26 +0800707 }
708
709 @Override
710 public void onError(Objective objective, ObjectiveError error) {
Esin Karaman39b24852019-08-28 13:57:30 +0000711 log.debug("Next Objective {} failed, because {}",
712 objective.id(),
713 error);
ke han9590c812017-02-28 15:02:26 +0800714 }
715 };
716
717 switch (nextType) {
718 case AddNew:
Andrea Campanella03652352020-05-06 16:12:00 +0200719 return builder.add(context);
ke han9590c812017-02-28 15:02:26 +0800720 case AddToExisting:
Andrea Campanella03652352020-05-06 16:12:00 +0200721 return builder.addToExisting(context);
ke han9590c812017-02-28 15:02:26 +0800722 case Remove:
Andrea Campanella03652352020-05-06 16:12:00 +0200723 return builder.remove(context);
ke han9590c812017-02-28 15:02:26 +0800724 case RemoveFromExisting:
Andrea Campanella03652352020-05-06 16:12:00 +0200725 return builder.removeFromExisting(context);
ke han9590c812017-02-28 15:02:26 +0800726 default:
727 return null;
728 }
729 }
730
Esin Karaman39b24852019-08-28 13:57:30 +0000731 private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
732 TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
733 .matchEthType(Ethernet.TYPE_IPV4)
734 .matchIPDst(mcastIp.toIpPrefix());
ke han9590c812017-02-28 15:02:26 +0800735
Esin Karaman39b24852019-08-28 13:57:30 +0000736 //build the meta selector
737 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
738 if (vlanEnabled) {
739 metabuilder.matchVlanId(multicastVlan());
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000740
741 if (!mcastInnerVlan.equals(VlanId.NONE)) {
742 metabuilder.matchInnerVlanId(mcastInnerVlan);
743 }
Jonathan Hart718c0452016-02-18 15:56:22 -0800744 }
745
Esin Karaman39b24852019-08-28 13:57:30 +0000746 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
747 .fromApp(appId)
748 .nextStep(nextId)
749 .makePermanent()
750 .withFlag(ForwardingObjective.Flag.SPECIFIC)
751 .withPriority(priority)
752 .withSelector(mcast.build())
753 .withMeta(metabuilder.build());
alshabibfc1cb032016-02-17 15:37:56 -0800754
Esin Karaman39b24852019-08-28 13:57:30 +0000755 return fwdBuilder;
alshabibfc1cb032016-02-17 15:37:56 -0800756 }
757
Esin Karaman39b24852019-08-28 13:57:30 +0000758 // Custom-built function, when the device is not available we need a fallback mechanism
759 private boolean isLocalLeader(DeviceId deviceId) {
760 if (!mastershipService.isLocalMaster(deviceId)) {
761 // When the device is available we just check the mastership
762 if (deviceService.isAvailable(deviceId)) {
ke han9590c812017-02-28 15:02:26 +0800763 return false;
ke han9590c812017-02-28 15:02:26 +0800764 }
Esin Karaman39b24852019-08-28 13:57:30 +0000765 // Fallback with Leadership service - device id is used as topic
766 NodeId leader = leadershipService.runForLeadership(
767 deviceId.toString()).leaderNodeId();
768 // Verify if this node is the leader
769 return clusterService.getLocalNode().id().equals(leader);
ke han9590c812017-02-28 15:02:26 +0800770 }
Esin Karaman39b24852019-08-28 13:57:30 +0000771 return true;
ke han9590c812017-02-28 15:02:26 +0800772 }
Esin Karaman39b24852019-08-28 13:57:30 +0000773
alshabib3b1eadc2016-02-01 17:57:00 -0800774}
ke hanf1709e82016-08-12 10:48:17 +0800775
ke han9590c812017-02-28 15:02:26 +0800776