blob: d107f052beaadcc6afffa433dbbbf091a7a8dbd8 [file] [log] [blame]
alshabib3b1eadc2016-02-01 17:57:00 -08001/*
Brian O'Connorcf85aa82017-08-03 22:46:01 -07002 * Copyright 2016-present Open Networking Foundation
alshabib3b1eadc2016-02-01 17:57:00 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Daniele Moro8ea9e102020-03-24 18:56:52 +010016package org.opencord.cordmcast.impl;
alshabib3b1eadc2016-02-01 17:57:00 -080017
Esin Karaman39b24852019-08-28 13:57:30 +000018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Sets;
alshabib09069c92016-02-21 14:49:51 -080020import org.apache.commons.lang3.tuple.ImmutablePair;
alshabib3b1eadc2016-02-01 17:57:00 -080021import org.onlab.packet.Ethernet;
alshabib3b1eadc2016-02-01 17:57:00 -080022import org.onlab.packet.IpAddress;
23import org.onlab.packet.VlanId;
Esin Karaman39b24852019-08-28 13:57:30 +000024import org.onlab.util.KryoNamespace;
Jonathan Hart28271642016-02-10 16:13:54 -080025import org.onosproject.cfg.ComponentConfigService;
Esin Karaman39b24852019-08-28 13:57:30 +000026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.LeadershipService;
28import org.onosproject.cluster.NodeId;
alshabib3b1eadc2016-02-01 17:57:00 -080029import org.onosproject.core.ApplicationId;
30import org.onosproject.core.CoreService;
Esin Karaman39b24852019-08-28 13:57:30 +000031import org.onosproject.mastership.MastershipService;
32import org.onosproject.mcast.api.McastEvent;
33import org.onosproject.mcast.api.McastListener;
34import org.onosproject.mcast.api.McastRoute;
35import org.onosproject.mcast.api.MulticastRouteService;
alshabib3b1eadc2016-02-01 17:57:00 -080036import org.onosproject.net.ConnectPoint;
Daniele Moro8ea9e102020-03-24 18:56:52 +010037import org.onosproject.net.Device;
ke han9590c812017-02-28 15:02:26 +080038import org.onosproject.net.DeviceId;
Esin Karaman39b24852019-08-28 13:57:30 +000039import org.onosproject.net.HostId;
ke han9590c812017-02-28 15:02:26 +080040import org.onosproject.net.PortNumber;
ke hanf1709e82016-08-12 10:48:17 +080041import org.onosproject.net.config.ConfigFactory;
42import org.onosproject.net.config.NetworkConfigEvent;
43import org.onosproject.net.config.NetworkConfigListener;
44import org.onosproject.net.config.NetworkConfigRegistry;
Esin Karaman39b24852019-08-28 13:57:30 +000045import org.onosproject.net.config.basics.McastConfig;
ke hanf1709e82016-08-12 10:48:17 +080046import org.onosproject.net.config.basics.SubjectFactories;
Esin Karaman39b24852019-08-28 13:57:30 +000047import org.onosproject.net.device.DeviceService;
alshabib3b1eadc2016-02-01 17:57:00 -080048import org.onosproject.net.flow.DefaultTrafficSelector;
49import org.onosproject.net.flow.DefaultTrafficTreatment;
50import org.onosproject.net.flow.TrafficSelector;
51import org.onosproject.net.flowobjective.DefaultForwardingObjective;
52import org.onosproject.net.flowobjective.DefaultNextObjective;
Esin Karaman39b24852019-08-28 13:57:30 +000053import org.onosproject.net.flowobjective.DefaultObjectiveContext;
alshabib3b1eadc2016-02-01 17:57:00 -080054import org.onosproject.net.flowobjective.FlowObjectiveService;
55import org.onosproject.net.flowobjective.ForwardingObjective;
56import org.onosproject.net.flowobjective.NextObjective;
57import org.onosproject.net.flowobjective.Objective;
58import org.onosproject.net.flowobjective.ObjectiveContext;
59import org.onosproject.net.flowobjective.ObjectiveError;
Esin Karaman39b24852019-08-28 13:57:30 +000060import org.onosproject.store.serializers.KryoNamespaces;
61import org.onosproject.store.service.ConsistentMap;
62import org.onosproject.store.service.Serializer;
63import org.onosproject.store.service.StorageService;
64import org.onosproject.store.service.Versioned;
Daniele Moro8ea9e102020-03-24 18:56:52 +010065import org.opencord.cordmcast.CordMcastService;
66import org.opencord.cordmcast.CordMcastStatisticsService;
67import org.opencord.sadis.SadisService;
68import org.opencord.sadis.SubscriberAndDeviceInformation;
Jonathan Hart28271642016-02-10 16:13:54 -080069import org.osgi.service.component.ComponentContext;
Daniele Moro8ea9e102020-03-24 18:56:52 +010070import org.osgi.service.component.annotations.Activate;
71import org.osgi.service.component.annotations.Component;
72import org.osgi.service.component.annotations.Deactivate;
73import org.osgi.service.component.annotations.Modified;
74import org.osgi.service.component.annotations.Reference;
75import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib3b1eadc2016-02-01 17:57:00 -080076import org.slf4j.Logger;
77
Jonathan Hart28271642016-02-10 16:13:54 -080078import java.util.Dictionary;
alshabib3b1eadc2016-02-01 17:57:00 -080079import java.util.Map;
ke han9590c812017-02-28 15:02:26 +080080import java.util.Objects;
Jonathan Hart0c194962016-05-23 17:08:15 -070081import java.util.Optional;
alshabibfc1cb032016-02-17 15:37:56 -080082import java.util.Properties;
Esin Karaman39b24852019-08-28 13:57:30 +000083import java.util.Set;
84import java.util.concurrent.ExecutorService;
85import java.util.concurrent.locks.Lock;
86import java.util.concurrent.locks.ReentrantLock;
alshabib3b1eadc2016-02-01 17:57:00 -080087
alshabibfc1cb032016-02-17 15:37:56 -080088import static com.google.common.base.Strings.isNullOrEmpty;
Esin Karaman39b24852019-08-28 13:57:30 +000089import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibfc1cb032016-02-17 15:37:56 -080090import static org.onlab.util.Tools.get;
Esin Karaman39b24852019-08-28 13:57:30 +000091import static org.onlab.util.Tools.groupedThreads;
Daniele Moro8ea9e102020-03-24 18:56:52 +010092import static org.opencord.cordmcast.impl.OsgiPropertyConstants.*;
alshabib3b1eadc2016-02-01 17:57:00 -080093import static org.slf4j.LoggerFactory.getLogger;
94
Esin Karaman39b24852019-08-28 13:57:30 +000095
alshabib3b1eadc2016-02-01 17:57:00 -080096/**
Jonathan Hartc3f84eb2016-02-19 12:44:36 -080097 * CORD multicast provisioning application. Operates by listening to
Jonathan Hart28271642016-02-10 16:13:54 -080098 * events on the multicast rib and provisioning groups to program multicast
alshabib3b1eadc2016-02-01 17:57:00 -080099 * flows on the dataplane.
100 */
Carmelo Cascone995fd682019-11-14 14:22:39 -0800101@Component(immediate = true,
102 property = {
103 VLAN_ENABLED + ":Boolean=" + DEFAULT_VLAN_ENABLED,
104 PRIORITY + ":Integer=" + DEFAULT_PRIORITY,
105})
Daniele Moro8ea9e102020-03-24 18:56:52 +0100106public class CordMcast implements CordMcastService {
107 private static final String APP_NAME = "org.opencord.mcast";
alshabib3b1eadc2016-02-01 17:57:00 -0800108
Jonathan Hart0c194962016-05-23 17:08:15 -0700109 private final Logger log = getLogger(getClass());
alshabib09069c92016-02-21 14:49:51 -0800110
alshabib09069c92016-02-21 14:49:51 -0800111 private static final int DEFAULT_PRIORITY = 500;
alshabib3b1eadc2016-02-01 17:57:00 -0800112 private static final short DEFAULT_MCAST_VLAN = 4000;
alshabibfc1cb032016-02-17 15:37:56 -0800113
Carmelo Cascone995fd682019-11-14 14:22:39 -0800114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800115 protected MulticastRouteService mcastService;
116
Carmelo Cascone995fd682019-11-14 14:22:39 -0800117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800118 protected FlowObjectiveService flowObjectiveService;
119
Carmelo Cascone995fd682019-11-14 14:22:39 -0800120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800121 protected CoreService coreService;
122
Carmelo Cascone995fd682019-11-14 14:22:39 -0800123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jonathan Hart28271642016-02-10 16:13:54 -0800124 protected ComponentConfigService componentConfigService;
125
Carmelo Cascone995fd682019-11-14 14:22:39 -0800126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke hanf1709e82016-08-12 10:48:17 +0800127 protected NetworkConfigRegistry networkConfig;
128
Carmelo Cascone995fd682019-11-14 14:22:39 -0800129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000130 protected StorageService storageService;
131
Carmelo Cascone995fd682019-11-14 14:22:39 -0800132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000133 protected MastershipService mastershipService;
134
Carmelo Cascone995fd682019-11-14 14:22:39 -0800135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000136 public DeviceService deviceService;
137
Carmelo Cascone995fd682019-11-14 14:22:39 -0800138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000139 private ClusterService clusterService;
140
Carmelo Cascone995fd682019-11-14 14:22:39 -0800141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000142 private LeadershipService leadershipService;
143
Esin Karaman996177c2020-03-05 13:21:09 +0000144 @Reference(cardinality = ReferenceCardinality.MANDATORY)
145 protected SadisService sadisService;
146
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000147 @Reference(cardinality = ReferenceCardinality.MANDATORY)
148 protected CordMcastStatisticsService cordMcastStatisticsService;
149
alshabib3b1eadc2016-02-01 17:57:00 -0800150 protected McastListener listener = new InternalMulticastListener();
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000151
ke hanf1709e82016-08-12 10:48:17 +0800152 private InternalNetworkConfigListener configListener =
153 new InternalNetworkConfigListener();
alshabib3b1eadc2016-02-01 17:57:00 -0800154
Esin Karaman39b24852019-08-28 13:57:30 +0000155 private ConsistentMap<NextKey, NextContent> groups;
alshabib3b1eadc2016-02-01 17:57:00 -0800156
alshabib3b1eadc2016-02-01 17:57:00 -0800157 private ApplicationId appId;
ke hanf1709e82016-08-12 10:48:17 +0800158 private ApplicationId coreAppId;
Esin Karaman39b24852019-08-28 13:57:30 +0000159 private short mcastVlan = DEFAULT_MCAST_VLAN;
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000160 private VlanId mcastInnerVlan = VlanId.NONE;
alshabib3b1eadc2016-02-01 17:57:00 -0800161
Carmelo Cascone995fd682019-11-14 14:22:39 -0800162 /**
163 * Whether to use VLAN for multicast traffic.
164 **/
alshabib09069c92016-02-21 14:49:51 -0800165 private boolean vlanEnabled = DEFAULT_VLAN_ENABLED;
alshabibfc1cb032016-02-17 15:37:56 -0800166
Carmelo Cascone995fd682019-11-14 14:22:39 -0800167 /**
168 * Priority for multicast rules.
169 **/
alshabib3b1eadc2016-02-01 17:57:00 -0800170 private int priority = DEFAULT_PRIORITY;
171
ke hanf1709e82016-08-12 10:48:17 +0800172 private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
173 McastConfig.class;
174
175 private ConfigFactory<ApplicationId, McastConfig> cordMcastConfigFactory =
176 new ConfigFactory<ApplicationId, McastConfig>(
177 SubjectFactories.APP_SUBJECT_FACTORY, CORD_MCAST_CONFIG_CLASS, "multicast") {
178 @Override
179 public McastConfig createConfig() {
180 return new McastConfig();
181 }
182 };
Jonathan Hart28271642016-02-10 16:13:54 -0800183
Esin Karaman39b24852019-08-28 13:57:30 +0000184 // lock to synchronize local operations
185 private final Lock mcastLock = new ReentrantLock();
186 private void mcastLock() {
187 mcastLock.lock();
188 }
189 private void mcastUnlock() {
190 mcastLock.unlock();
191 }
192 private ExecutorService eventExecutor;
193
alshabib3b1eadc2016-02-01 17:57:00 -0800194 @Activate
Jonathan Hart435ffc42016-02-19 10:32:05 -0800195 public void activate(ComponentContext context) {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800196 componentConfigService.registerProperties(getClass());
Jonathan Hart435ffc42016-02-19 10:32:05 -0800197 modified(context);
198
Charles Chanf867c4b2017-01-20 11:22:25 -0800199 appId = coreService.registerApplication(APP_NAME);
ke hanf1709e82016-08-12 10:48:17 +0800200 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
Jonathan Hart28271642016-02-10 16:13:54 -0800201
Esin Karaman39b24852019-08-28 13:57:30 +0000202 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast",
203 "events-mcast-%d", log));
204
205 KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
206 .register(KryoNamespaces.API)
207 .register(NextKey.class)
208 .register(NextContent.class);
209 groups = storageService
210 .<NextKey, NextContent>consistentMapBuilder()
211 .withName("cord-mcast-groups-store")
212 .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
213 .build();
214
ke hanf1709e82016-08-12 10:48:17 +0800215 networkConfig.registerConfigFactory(cordMcastConfigFactory);
216 networkConfig.addListener(configListener);
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800217 mcastService.addListener(listener);
218
alshabib09069c92016-02-21 14:49:51 -0800219 mcastService.getRoutes().stream()
Esin Karaman39b24852019-08-28 13:57:30 +0000220 .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
alshabib09069c92016-02-21 14:49:51 -0800221 .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
Esin Karaman39b24852019-08-28 13:57:30 +0000222 .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(),
223 sink)));
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800224
ke hanf1709e82016-08-12 10:48:17 +0800225 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
Esin Karaman39b24852019-08-28 13:57:30 +0000226 updateConfig(config);
alshabib3b1eadc2016-02-01 17:57:00 -0800227 log.info("Started");
228 }
229
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000230 @Modified
231 public void modified(ComponentContext context) {
232 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
233
234 String s = get(properties, VLAN_ENABLED);
235 vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
236
237 try {
238 s = get(properties, PRIORITY);
239 priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
240 } catch (NumberFormatException ne) {
241 log.error("Unable to parse configuration parameter for priority", ne);
242 priority = DEFAULT_PRIORITY;
243 }
Esin Karamane4890012020-04-19 11:58:54 +0000244 feedStatsServiceWithVlanConfigValues();
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000245 }
246
alshabib3b1eadc2016-02-01 17:57:00 -0800247 @Deactivate
248 public void deactivate() {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800249 componentConfigService.unregisterProperties(getClass(), false);
alshabib3b1eadc2016-02-01 17:57:00 -0800250 mcastService.removeListener(listener);
ke hanf1709e82016-08-12 10:48:17 +0800251 networkConfig.removeListener(configListener);
ke han9590c812017-02-28 15:02:26 +0800252 networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
Esin Karaman39b24852019-08-28 13:57:30 +0000253 eventExecutor.shutdown();
ke han9590c812017-02-28 15:02:26 +0800254 clearGroups();
Esin Karaman39b24852019-08-28 13:57:30 +0000255 groups.destroy();
alshabib3b1eadc2016-02-01 17:57:00 -0800256 log.info("Stopped");
257 }
258
Esin Karamane4890012020-04-19 11:58:54 +0000259 /**
260 * Updates the stats service with current VLAN config values.
261 */
262 private void feedStatsServiceWithVlanConfigValues() {
263 cordMcastStatisticsService.setVlanValue(assignedVlan());
264 cordMcastStatisticsService.setInnerVlanValue(assignedInnerVlan());
265 }
266
ke han9590c812017-02-28 15:02:26 +0800267 public void clearGroups() {
Esin Karaman39b24852019-08-28 13:57:30 +0000268 mcastLock();
269 try {
270 groups.keySet().forEach(groupInfo -> {
271 if (!isLocalLeader(groupInfo.getDevice())) {
272 return;
273 }
274 NextContent next = groups.get(groupInfo).value();
275
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000276 if (next != null) {
277 ObjectiveContext context = new DefaultObjectiveContext(
278 (objective) -> log.debug("Successfully remove {}",
279 groupInfo.group),
280 (objective, error) -> log.warn("Failed to remove {}: {}",
281 groupInfo.group, error));
282 // remove the flow rule
283 flowObjectiveService.forward(groupInfo.getDevice(), fwdObject(next.getNextId(),
284 groupInfo.group).remove(context));
285 // remove all ports from the group
286 next.getOutPorts().stream().forEach(portNumber ->
287 flowObjectiveService.next(groupInfo.getDevice(), nextObject(next.getNextId(),
288 portNumber,
289 NextType.RemoveFromExisting,
290 groupInfo.group))
291 );
Esin Karaman39b24852019-08-28 13:57:30 +0000292
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000293 }
Esin Karaman39b24852019-08-28 13:57:30 +0000294 });
295 groups.clear();
296 } finally {
297 mcastUnlock();
298 }
299 }
300
301 private VlanId multicastVlan() {
302 return VlanId.vlanId(mcastVlan);
303 }
304
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000305 protected VlanId assignedVlan() {
Esin Karaman39b24852019-08-28 13:57:30 +0000306 return vlanEnabled ? multicastVlan() : VlanId.NONE;
ke han9590c812017-02-28 15:02:26 +0800307 }
308
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000309 protected VlanId assignedInnerVlan() {
310 return vlanEnabled ? mcastInnerVlan : VlanId.NONE;
311 }
312
alshabib3b1eadc2016-02-01 17:57:00 -0800313 private class InternalMulticastListener implements McastListener {
314 @Override
315 public void event(McastEvent event) {
Esin Karaman39b24852019-08-28 13:57:30 +0000316 eventExecutor.execute(() -> {
317 switch (event.type()) {
318 case ROUTE_ADDED:
319 case ROUTE_REMOVED:
320 case SOURCES_ADDED:
321 break;
322 case SINKS_ADDED:
323 addSinks(event);
324 break;
325 case SINKS_REMOVED:
326 removeSinks(event);
327 break;
328 default:
329 log.warn("Unknown mcast event {}", event.type());
330 }
331 });
332 }
333 }
334
335 /**
336 * Processes previous, and new sinks then finds the sinks to be removed.
337 * @param prevSinks the previous sinks to be evaluated
338 * @param newSinks the new sinks to be evaluated
339 * @returnt the set of the sinks to be removed
340 */
341 private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
342 Map<HostId, Set<ConnectPoint>> newSinks) {
343 return getSinksToBeProcessed(prevSinks, newSinks);
344 }
345
346
347 /**
348 * Processes previous, and new sinks then finds the sinks to be added.
349 * @param newSinks the new sinks to be processed
350 * @param allPrevSinks all previous sinks
351 * @return the set of the sinks to be added
352 */
353 private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
354 Map<HostId, Set<ConnectPoint>> allPrevSinks) {
355 return getSinksToBeProcessed(newSinks, allPrevSinks);
356 }
357
358 /**
359 * Gets single-homed sinks that are in set1 but not in set2.
360 * @param sinkSet1 the first sink map
361 * @param sinkSet2 the second sink map
362 * @return a set containing all the single-homed sinks found in set1 but not in set2
363 */
364 private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
365 Map<HostId, Set<ConnectPoint>> sinkSet2) {
366 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
367 sinkSet1.forEach(((hostId, connectPoints) -> {
368 if (HostId.NONE.equals(hostId)) {
369 //assume all connect points associated with HostId.NONE are single homed sinks
370 sinksToBeProcessed.addAll(connectPoints);
371 return;
372 }
373 }));
374 Set<ConnectPoint> singleHomedSinksOfSet2 = sinkSet2.get(HostId.NONE) == null ?
375 Sets.newHashSet() :
376 sinkSet2.get(HostId.NONE);
377 return Sets.difference(sinksToBeProcessed, singleHomedSinksOfSet2);
378 };
379
380
381 private void removeSinks(McastEvent event) {
382 mcastLock();
383 try {
384 Set<ConnectPoint> sinksToBeRemoved = getSinksToBeRemoved(event.prevSubject().sinks(),
385 event.subject().sinks());
386 sinksToBeRemoved.forEach(sink -> removeSink(event.subject().route().group(), sink));
387 } finally {
388 mcastUnlock();
389 }
390 }
391
392 private void removeSink(IpAddress group, ConnectPoint sink) {
393 if (!isLocalLeader(sink.deviceId())) {
394 log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
395 sink.deviceId(), sink, group);
396 return;
397 }
398
Esin Karaman996177c2020-03-05 13:21:09 +0000399 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman39b24852019-08-28 13:57:30 +0000400
401 if (!oltInfo.isPresent()) {
402 log.warn("Unknown OLT device : {}", sink.deviceId());
403 return;
404 }
405
406 log.debug("Removing sink {} from the group {}", sink, group);
407
408 NextKey key = new NextKey(sink.deviceId(), group);
409 groups.computeIfPresent(key, (k, v) -> {
410 flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
411 NextType.RemoveFromExisting, group));
412
413 Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
414 outPorts.remove(sink.port());
415
416 if (outPorts.isEmpty()) {
417 // this is the last sink
418 ObjectiveContext context = new DefaultObjectiveContext(
419 (objective) -> log.debug("Successfully remove {} on {}",
420 group, sink),
421 (objective, error) -> log.warn("Failed to remove {} on {}: {}",
422 group, sink, error));
423 ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
424 flowObjectiveService.forward(sink.deviceId(), fwdObj);
425 }
426 // remove the whole entity if no out port exists in the port list
427 return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
428 ImmutableSet.copyOf(outPorts));
429 });
430 }
431
432 private void addSinks(McastEvent event) {
433 mcastLock();
434 try {
435 Set<ConnectPoint> sinksToBeAdded = getSinksToBeAdded(event.subject().sinks(),
436 event.prevSubject().sinks());
437 sinksToBeAdded.forEach(sink -> addSink(event.subject().route(), sink));
438 } finally {
439 mcastUnlock();
440 }
441 }
442
443 private void addSink(McastRoute route, ConnectPoint sink) {
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000444
Esin Karaman39b24852019-08-28 13:57:30 +0000445 if (!isLocalLeader(sink.deviceId())) {
446 log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
447 sink.deviceId(), sink, route.group());
448 return;
449 }
450
Esin Karaman996177c2020-03-05 13:21:09 +0000451 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman39b24852019-08-28 13:57:30 +0000452
453 if (!oltInfo.isPresent()) {
454 log.warn("Unknown OLT device : {}", sink.deviceId());
455 return;
456 }
457
458 log.debug("Adding sink {} to the group {}", sink, route.group());
459
460 NextKey key = new NextKey(sink.deviceId(), route.group());
461 NextObjective newNextObj;
462
463 boolean theFirstSinkOfGroup = false;
464 if (!groups.containsKey(key)) {
465 // First time someone request this mcast group via this device
466 Integer nextId = flowObjectiveService.allocateNextId();
467 newNextObj = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
468 // Store the new port
469 groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
470 theFirstSinkOfGroup = true;
471 } else {
472 // This device already serves some subscribers of this mcast group
473 Versioned<NextContent> nextObj = groups.get(key);
474 if (nextObj.value().getOutPorts().contains(sink.port())) {
475 log.info("Group {} already serves the sink connected to {}", route.group(), sink);
476 return;
477 }
478 newNextObj = nextObject(nextObj.value().getNextId(), sink.port(),
479 NextType.AddToExisting, route.group());
480 // add new port to the group
481 Set<PortNumber> outPorts = Sets.newHashSet(nextObj.value().getOutPorts());
482 outPorts.add(sink.port());
483 groups.put(key, new NextContent(newNextObj.id(), ImmutableSet.copyOf(outPorts)));
484 }
485
486 ObjectiveContext context = new DefaultObjectiveContext(
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000487 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}, inner vlan {}",
Esin Karaman39b24852019-08-28 13:57:30 +0000488 route.group(), sink.deviceId(), sink.port().toLong(),
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000489 assignedVlan(), assignedInnerVlan()),
Esin Karaman39b24852019-08-28 13:57:30 +0000490 (objective, error) -> {
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000491 log.warn("Failed to add {} on {}/{}, vlan {}, inner vlan {}: {}",
Esin Karaman39b24852019-08-28 13:57:30 +0000492 route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan(),
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000493 assignedInnerVlan(), error);
Esin Karaman39b24852019-08-28 13:57:30 +0000494 });
495
496 flowObjectiveService.next(sink.deviceId(), newNextObj);
497
498 if (theFirstSinkOfGroup) {
499 // create the necessary flow rule if this is the first sink request for the group
500 // on this device
501 flowObjectiveService.forward(sink.deviceId(), fwdObject(newNextObj.id(),
502 route.group()).add(context));
503 }
504 }
505
Esin Karaman996177c2020-03-05 13:21:09 +0000506 /**
507 * Fetches device information associated with the device serial number from SADIS.
508 *
509 * @param serialNumber serial number of a device
510 * @return device information; an empty Optional otherwise.
511 */
512 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
513 long start = System.currentTimeMillis();
514 try {
515 return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
516 } finally {
517 if (log.isDebugEnabled()) {
518 // SADIS may call remote systems to fetch device data and this calls can take a long time.
519 // This measurement is just for monitoring these kinds of situations.
520 log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
521 }
522
523 }
524 }
525
526 /**
527 * Fetches device information associated with the device serial number from SADIS.
528 *
529 * @param deviceId device id
530 * @return device information; an empty Optional otherwise.
531 */
532 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
533 Device device = deviceService.getDevice(deviceId);
534 if (device == null || device.serialNumber() == null) {
535 return Optional.empty();
536 }
537 return getSubscriberAndDeviceInformation(device.serialNumber());
538 }
539
Esin Karaman39b24852019-08-28 13:57:30 +0000540 private class InternalNetworkConfigListener implements NetworkConfigListener {
541 @Override
542 public void event(NetworkConfigEvent event) {
543 eventExecutor.execute(() -> {
544 switch (event.type()) {
545
546 case CONFIG_ADDED:
547 case CONFIG_UPDATED:
548 if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
549 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
550 if (config != null) {
551 //TODO: Simply remove flows/groups, hosts will response period query
552 // and re-sent IGMP report, so the flows can be rebuild.
553 // However, better to remove and re-add mcast flow rules here
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000554 if (vlanEnabled && (mcastVlan != config.egressVlan().toShort() ||
555 !mcastInnerVlan.equals(config.egressInnerVlan()))) {
Esin Karaman39b24852019-08-28 13:57:30 +0000556 clearGroups();
557 }
558 updateConfig(config);
559 }
560 }
561 break;
562 case CONFIG_REGISTERED:
563 case CONFIG_UNREGISTERED:
564 case CONFIG_REMOVED:
565 break;
566 default:
567 break;
568 }
569 });
570 }
571 }
572
573 private void updateConfig(McastConfig config) {
574 if (config == null) {
575 return;
576 }
577 log.debug("multicast config received: {}", config);
578
579 if (config.egressVlan() != null) {
580 mcastVlan = config.egressVlan().toShort();
581 }
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000582 if (config.egressInnerVlan() != null) {
583 mcastInnerVlan = config.egressInnerVlan();
584 }
Esin Karamane4890012020-04-19 11:58:54 +0000585 feedStatsServiceWithVlanConfigValues();
Esin Karaman39b24852019-08-28 13:57:30 +0000586 }
587
588 private class NextKey {
589 private DeviceId device;
590 private IpAddress group;
591
592 public NextKey(DeviceId deviceId, IpAddress groupAddress) {
593 device = deviceId;
594 group = groupAddress;
595 }
596
597 public DeviceId getDevice() {
598 return device;
599 }
600
601 public int hashCode() {
602 return Objects.hash(this.device, this.group);
603 }
604
605 public boolean equals(Object obj) {
606 if (this == obj) {
607 return true;
608 } else if (!(obj instanceof NextKey)) {
609 return false;
610 } else {
611 NextKey that = (NextKey) obj;
612 return this.getClass() == that.getClass() &&
613 Objects.equals(this.device, that.device) &&
614 Objects.equals(this.group, that.group);
615 }
616 }
617 }
618
619 private class NextContent {
620 private Integer nextId;
621 private Set<PortNumber> outPorts;
622
623 public NextContent(Integer nextId, Set<PortNumber> outPorts) {
624 this.nextId = nextId;
625 this.outPorts = outPorts;
626 }
627
628 public Integer getNextId() {
629 return nextId;
630 }
631
632 public Set<PortNumber> getOutPorts() {
633 return ImmutableSet.copyOf(outPorts);
634 }
635
636 public int hashCode() {
637 return Objects.hash(this.nextId, this.outPorts);
638 }
639
640 public boolean equals(Object obj) {
641 if (this == obj) {
642 return true;
643 } else if (!(obj instanceof NextContent)) {
644 return false;
645 } else {
646 NextContent that = (NextContent) obj;
647 return this.getClass() == that.getClass() &&
648 Objects.equals(this.nextId, that.nextId) &&
649 Objects.equals(this.outPorts, that.outPorts);
alshabib3b1eadc2016-02-01 17:57:00 -0800650 }
651 }
652 }
653
ke han9590c812017-02-28 15:02:26 +0800654 private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
655
Esin Karaman39b24852019-08-28 13:57:30 +0000656 private NextObjective nextObject(Integer nextId, PortNumber port,
657 NextType nextType, IpAddress mcastIp) {
ke han9590c812017-02-28 15:02:26 +0800658
Esin Karaman39b24852019-08-28 13:57:30 +0000659 // Build the meta selector with the fwd objective info
660 TrafficSelector.Builder metadata = DefaultTrafficSelector.builder()
661 .matchIPDst(mcastIp.toIpPrefix());
662
663 if (vlanEnabled) {
664 metadata.matchVlanId(multicastVlan());
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000665
666 if (!mcastInnerVlan.equals(VlanId.NONE)) {
667 metadata.matchInnerVlanId(mcastInnerVlan);
668 }
Esin Karaman39b24852019-08-28 13:57:30 +0000669 }
670
ke han9590c812017-02-28 15:02:26 +0800671 DefaultNextObjective.Builder build = DefaultNextObjective.builder()
672 .fromApp(appId)
673 .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
674 .withType(NextObjective.Type.BROADCAST)
Esin Karaman39b24852019-08-28 13:57:30 +0000675 .withId(nextId)
676 .withMeta(metadata.build());
677
ke han9590c812017-02-28 15:02:26 +0800678 ObjectiveContext content = new ObjectiveContext() {
679 @Override
680 public void onSuccess(Objective objective) {
Esin Karaman39b24852019-08-28 13:57:30 +0000681 log.debug("Next Objective {} installed", objective.id());
ke han9590c812017-02-28 15:02:26 +0800682 }
683
684 @Override
685 public void onError(Objective objective, ObjectiveError error) {
Esin Karaman39b24852019-08-28 13:57:30 +0000686 log.debug("Next Objective {} failed, because {}",
687 objective.id(),
688 error);
ke han9590c812017-02-28 15:02:26 +0800689 }
690 };
691
692 switch (nextType) {
693 case AddNew:
694 return build.add(content);
695 case AddToExisting:
696 return build.addToExisting(content);
697 case Remove:
698 return build.remove(content);
699 case RemoveFromExisting:
700 return build.removeFromExisting(content);
701 default:
702 return null;
703 }
704 }
705
Esin Karaman39b24852019-08-28 13:57:30 +0000706 private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
707 TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
708 .matchEthType(Ethernet.TYPE_IPV4)
709 .matchIPDst(mcastIp.toIpPrefix());
ke han9590c812017-02-28 15:02:26 +0800710
Esin Karaman39b24852019-08-28 13:57:30 +0000711 //build the meta selector
712 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
713 if (vlanEnabled) {
714 metabuilder.matchVlanId(multicastVlan());
Esin Karamanbb35a3b2020-03-18 13:53:24 +0000715
716 if (!mcastInnerVlan.equals(VlanId.NONE)) {
717 metabuilder.matchInnerVlanId(mcastInnerVlan);
718 }
Jonathan Hart718c0452016-02-18 15:56:22 -0800719 }
720
Esin Karaman39b24852019-08-28 13:57:30 +0000721 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
722 .fromApp(appId)
723 .nextStep(nextId)
724 .makePermanent()
725 .withFlag(ForwardingObjective.Flag.SPECIFIC)
726 .withPriority(priority)
727 .withSelector(mcast.build())
728 .withMeta(metabuilder.build());
alshabibfc1cb032016-02-17 15:37:56 -0800729
Esin Karaman39b24852019-08-28 13:57:30 +0000730 return fwdBuilder;
alshabibfc1cb032016-02-17 15:37:56 -0800731 }
732
Esin Karaman39b24852019-08-28 13:57:30 +0000733 // Custom-built function, when the device is not available we need a fallback mechanism
734 private boolean isLocalLeader(DeviceId deviceId) {
735 if (!mastershipService.isLocalMaster(deviceId)) {
736 // When the device is available we just check the mastership
737 if (deviceService.isAvailable(deviceId)) {
ke han9590c812017-02-28 15:02:26 +0800738 return false;
ke han9590c812017-02-28 15:02:26 +0800739 }
Esin Karaman39b24852019-08-28 13:57:30 +0000740 // Fallback with Leadership service - device id is used as topic
741 NodeId leader = leadershipService.runForLeadership(
742 deviceId.toString()).leaderNodeId();
743 // Verify if this node is the leader
744 return clusterService.getLocalNode().id().equals(leader);
ke han9590c812017-02-28 15:02:26 +0800745 }
Esin Karaman39b24852019-08-28 13:57:30 +0000746 return true;
ke han9590c812017-02-28 15:02:26 +0800747 }
Esin Karaman39b24852019-08-28 13:57:30 +0000748
alshabib3b1eadc2016-02-01 17:57:00 -0800749}
ke hanf1709e82016-08-12 10:48:17 +0800750
ke han9590c812017-02-28 15:02:26 +0800751