blob: 0d20e9c8578dc597e51d66d29155f20bbdaaadc5 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
alshabib772e1582016-06-01 17:50:05 -070016package org.opencord.cordmcast;
alshabib3b1eadc2016-02-01 17:57:00 -080017
Esin Karaman39b24852019-08-28 13:57:30 +000018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Sets;
alshabib09069c92016-02-21 14:49:51 -080020import org.apache.commons.lang3.tuple.ImmutablePair;
Esin Karaman996177c2020-03-05 13:21:09 +000021import org.onosproject.net.Device;
22import org.opencord.sadis.SadisService;
23import org.opencord.sadis.SubscriberAndDeviceInformation;
Carmelo Cascone995fd682019-11-14 14:22:39 -080024import org.osgi.service.component.annotations.Activate;
25import org.osgi.service.component.annotations.Component;
26import org.osgi.service.component.annotations.Deactivate;
27import org.osgi.service.component.annotations.Modified;
28import org.osgi.service.component.annotations.Reference;
29import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib3b1eadc2016-02-01 17:57:00 -080030import org.onlab.packet.Ethernet;
alshabib3b1eadc2016-02-01 17:57:00 -080031import org.onlab.packet.IpAddress;
32import org.onlab.packet.VlanId;
Esin Karaman39b24852019-08-28 13:57:30 +000033import org.onlab.util.KryoNamespace;
Jonathan Hart28271642016-02-10 16:13:54 -080034import org.onosproject.cfg.ComponentConfigService;
Esin Karaman39b24852019-08-28 13:57:30 +000035import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.LeadershipService;
37import org.onosproject.cluster.NodeId;
alshabib3b1eadc2016-02-01 17:57:00 -080038import org.onosproject.core.ApplicationId;
39import org.onosproject.core.CoreService;
Esin Karaman39b24852019-08-28 13:57:30 +000040import org.onosproject.mastership.MastershipService;
41import org.onosproject.mcast.api.McastEvent;
42import org.onosproject.mcast.api.McastListener;
43import org.onosproject.mcast.api.McastRoute;
44import org.onosproject.mcast.api.MulticastRouteService;
alshabib3b1eadc2016-02-01 17:57:00 -080045import org.onosproject.net.ConnectPoint;
ke han9590c812017-02-28 15:02:26 +080046import org.onosproject.net.DeviceId;
Esin Karaman39b24852019-08-28 13:57:30 +000047import org.onosproject.net.HostId;
ke han9590c812017-02-28 15:02:26 +080048import org.onosproject.net.PortNumber;
ke hanf1709e82016-08-12 10:48:17 +080049import org.onosproject.net.config.ConfigFactory;
50import org.onosproject.net.config.NetworkConfigEvent;
51import org.onosproject.net.config.NetworkConfigListener;
52import org.onosproject.net.config.NetworkConfigRegistry;
Esin Karaman39b24852019-08-28 13:57:30 +000053import org.onosproject.net.config.basics.McastConfig;
ke hanf1709e82016-08-12 10:48:17 +080054import org.onosproject.net.config.basics.SubjectFactories;
Esin Karaman39b24852019-08-28 13:57:30 +000055import org.onosproject.net.device.DeviceService;
alshabib3b1eadc2016-02-01 17:57:00 -080056import org.onosproject.net.flow.DefaultTrafficSelector;
57import org.onosproject.net.flow.DefaultTrafficTreatment;
58import org.onosproject.net.flow.TrafficSelector;
59import org.onosproject.net.flowobjective.DefaultForwardingObjective;
60import org.onosproject.net.flowobjective.DefaultNextObjective;
Esin Karaman39b24852019-08-28 13:57:30 +000061import org.onosproject.net.flowobjective.DefaultObjectiveContext;
alshabib3b1eadc2016-02-01 17:57:00 -080062import org.onosproject.net.flowobjective.FlowObjectiveService;
63import org.onosproject.net.flowobjective.ForwardingObjective;
64import org.onosproject.net.flowobjective.NextObjective;
65import org.onosproject.net.flowobjective.Objective;
66import org.onosproject.net.flowobjective.ObjectiveContext;
67import org.onosproject.net.flowobjective.ObjectiveError;
Esin Karaman39b24852019-08-28 13:57:30 +000068import org.onosproject.store.serializers.KryoNamespaces;
69import org.onosproject.store.service.ConsistentMap;
70import org.onosproject.store.service.Serializer;
71import org.onosproject.store.service.StorageService;
72import org.onosproject.store.service.Versioned;
Jonathan Hart28271642016-02-10 16:13:54 -080073import org.osgi.service.component.ComponentContext;
alshabib3b1eadc2016-02-01 17:57:00 -080074import org.slf4j.Logger;
75
Jonathan Hart28271642016-02-10 16:13:54 -080076import java.util.Dictionary;
alshabib3b1eadc2016-02-01 17:57:00 -080077import java.util.Map;
ke han9590c812017-02-28 15:02:26 +080078import java.util.Objects;
Jonathan Hart0c194962016-05-23 17:08:15 -070079import java.util.Optional;
alshabibfc1cb032016-02-17 15:37:56 -080080import java.util.Properties;
Esin Karaman39b24852019-08-28 13:57:30 +000081import java.util.Set;
82import java.util.concurrent.ExecutorService;
83import java.util.concurrent.locks.Lock;
84import java.util.concurrent.locks.ReentrantLock;
alshabib3b1eadc2016-02-01 17:57:00 -080085
alshabibfc1cb032016-02-17 15:37:56 -080086import static com.google.common.base.Strings.isNullOrEmpty;
Esin Karaman39b24852019-08-28 13:57:30 +000087import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
alshabibfc1cb032016-02-17 15:37:56 -080088import static org.onlab.util.Tools.get;
Esin Karaman39b24852019-08-28 13:57:30 +000089import static org.onlab.util.Tools.groupedThreads;
Arjun E Kabf9e6e2020-03-02 10:15:21 +000090
Carmelo Cascone995fd682019-11-14 14:22:39 -080091import static org.opencord.cordmcast.OsgiPropertyConstants.DEFAULT_PRIORITY;
92import static org.opencord.cordmcast.OsgiPropertyConstants.PRIORITY;
Arjun E Kabf9e6e2020-03-02 10:15:21 +000093import static org.opencord.cordmcast.OsgiPropertyConstants.DEFAULT_VLAN_ENABLED;
Carmelo Cascone995fd682019-11-14 14:22:39 -080094import static org.opencord.cordmcast.OsgiPropertyConstants.VLAN_ENABLED;
alshabib3b1eadc2016-02-01 17:57:00 -080095import static org.slf4j.LoggerFactory.getLogger;
96
Esin Karaman39b24852019-08-28 13:57:30 +000097
/**
 * CORD multicast provisioning application. Operates by listening to
 * events on the multicast rib and provisioning groups to program multicast
 * flows on the dataplane.
 */
Carmelo Cascone995fd682019-11-14 14:22:39 -0800103@Component(immediate = true,
104 property = {
105 VLAN_ENABLED + ":Boolean=" + DEFAULT_VLAN_ENABLED,
106 PRIORITY + ":Integer=" + DEFAULT_PRIORITY,
107})
alshabib3b1eadc2016-02-01 17:57:00 -0800108public class CordMcast {
Charles Chanf867c4b2017-01-20 11:22:25 -0800109 private static final String APP_NAME = "org.opencord.cordmcast";
alshabib3b1eadc2016-02-01 17:57:00 -0800110
Jonathan Hart0c194962016-05-23 17:08:15 -0700111 private final Logger log = getLogger(getClass());
alshabib09069c92016-02-21 14:49:51 -0800112
alshabib09069c92016-02-21 14:49:51 -0800113 private static final int DEFAULT_PRIORITY = 500;
alshabib3b1eadc2016-02-01 17:57:00 -0800114 private static final short DEFAULT_MCAST_VLAN = 4000;
alshabibfc1cb032016-02-17 15:37:56 -0800115
Carmelo Cascone995fd682019-11-14 14:22:39 -0800116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800117 protected MulticastRouteService mcastService;
118
Carmelo Cascone995fd682019-11-14 14:22:39 -0800119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800120 protected FlowObjectiveService flowObjectiveService;
121
Carmelo Cascone995fd682019-11-14 14:22:39 -0800122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib3b1eadc2016-02-01 17:57:00 -0800123 protected CoreService coreService;
124
Carmelo Cascone995fd682019-11-14 14:22:39 -0800125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jonathan Hart28271642016-02-10 16:13:54 -0800126 protected ComponentConfigService componentConfigService;
127
Carmelo Cascone995fd682019-11-14 14:22:39 -0800128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
ke hanf1709e82016-08-12 10:48:17 +0800129 protected NetworkConfigRegistry networkConfig;
130
Carmelo Cascone995fd682019-11-14 14:22:39 -0800131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000132 protected StorageService storageService;
133
Carmelo Cascone995fd682019-11-14 14:22:39 -0800134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000135 protected MastershipService mastershipService;
136
Carmelo Cascone995fd682019-11-14 14:22:39 -0800137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000138 public DeviceService deviceService;
139
Carmelo Cascone995fd682019-11-14 14:22:39 -0800140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000141 private ClusterService clusterService;
142
Carmelo Cascone995fd682019-11-14 14:22:39 -0800143 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Esin Karaman39b24852019-08-28 13:57:30 +0000144 private LeadershipService leadershipService;
145
Esin Karaman996177c2020-03-05 13:21:09 +0000146 @Reference(cardinality = ReferenceCardinality.MANDATORY)
147 protected SadisService sadisService;
148
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
150 protected CordMcastStatisticsService cordMcastStatisticsService;
151
alshabib3b1eadc2016-02-01 17:57:00 -0800152 protected McastListener listener = new InternalMulticastListener();
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000153
ke hanf1709e82016-08-12 10:48:17 +0800154 private InternalNetworkConfigListener configListener =
155 new InternalNetworkConfigListener();
alshabib3b1eadc2016-02-01 17:57:00 -0800156
Esin Karaman39b24852019-08-28 13:57:30 +0000157 private ConsistentMap<NextKey, NextContent> groups;
alshabib3b1eadc2016-02-01 17:57:00 -0800158
alshabib3b1eadc2016-02-01 17:57:00 -0800159 private ApplicationId appId;
ke hanf1709e82016-08-12 10:48:17 +0800160 private ApplicationId coreAppId;
Esin Karaman39b24852019-08-28 13:57:30 +0000161 private short mcastVlan = DEFAULT_MCAST_VLAN;
alshabib3b1eadc2016-02-01 17:57:00 -0800162
Carmelo Cascone995fd682019-11-14 14:22:39 -0800163 /**
164 * Whether to use VLAN for multicast traffic.
165 **/
alshabib09069c92016-02-21 14:49:51 -0800166 private boolean vlanEnabled = DEFAULT_VLAN_ENABLED;
alshabibfc1cb032016-02-17 15:37:56 -0800167
Carmelo Cascone995fd682019-11-14 14:22:39 -0800168 /**
169 * Priority for multicast rules.
170 **/
alshabib3b1eadc2016-02-01 17:57:00 -0800171 private int priority = DEFAULT_PRIORITY;
172
ke hanf1709e82016-08-12 10:48:17 +0800173 private static final Class<McastConfig> CORD_MCAST_CONFIG_CLASS =
174 McastConfig.class;
175
176 private ConfigFactory<ApplicationId, McastConfig> cordMcastConfigFactory =
177 new ConfigFactory<ApplicationId, McastConfig>(
178 SubjectFactories.APP_SUBJECT_FACTORY, CORD_MCAST_CONFIG_CLASS, "multicast") {
179 @Override
180 public McastConfig createConfig() {
181 return new McastConfig();
182 }
183 };
Jonathan Hart28271642016-02-10 16:13:54 -0800184
Esin Karaman39b24852019-08-28 13:57:30 +0000185 // lock to synchronize local operations
186 private final Lock mcastLock = new ReentrantLock();
187 private void mcastLock() {
188 mcastLock.lock();
189 }
190 private void mcastUnlock() {
191 mcastLock.unlock();
192 }
193 private ExecutorService eventExecutor;
194
alshabib3b1eadc2016-02-01 17:57:00 -0800195 @Activate
Jonathan Hart435ffc42016-02-19 10:32:05 -0800196 public void activate(ComponentContext context) {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800197 componentConfigService.registerProperties(getClass());
Jonathan Hart435ffc42016-02-19 10:32:05 -0800198 modified(context);
199
Charles Chanf867c4b2017-01-20 11:22:25 -0800200 appId = coreService.registerApplication(APP_NAME);
ke hanf1709e82016-08-12 10:48:17 +0800201 coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
Jonathan Hart28271642016-02-10 16:13:54 -0800202
Esin Karaman39b24852019-08-28 13:57:30 +0000203 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/mcast",
204 "events-mcast-%d", log));
205
206 KryoNamespace.Builder groupsKryo = new KryoNamespace.Builder()
207 .register(KryoNamespaces.API)
208 .register(NextKey.class)
209 .register(NextContent.class);
210 groups = storageService
211 .<NextKey, NextContent>consistentMapBuilder()
212 .withName("cord-mcast-groups-store")
213 .withSerializer(Serializer.using(groupsKryo.build("CordMcast-Groups")))
214 .build();
215
ke hanf1709e82016-08-12 10:48:17 +0800216 networkConfig.registerConfigFactory(cordMcastConfigFactory);
217 networkConfig.addListener(configListener);
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800218 mcastService.addListener(listener);
219
alshabib09069c92016-02-21 14:49:51 -0800220 mcastService.getRoutes().stream()
Esin Karaman39b24852019-08-28 13:57:30 +0000221 .map(r -> new ImmutablePair<>(r, mcastService.sinks(r)))
alshabib09069c92016-02-21 14:49:51 -0800222 .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
Esin Karaman39b24852019-08-28 13:57:30 +0000223 .forEach(pair -> pair.getRight().forEach(sink -> addSink(pair.getLeft(),
224 sink)));
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800225
ke hanf1709e82016-08-12 10:48:17 +0800226 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
Esin Karaman39b24852019-08-28 13:57:30 +0000227 updateConfig(config);
alshabib3b1eadc2016-02-01 17:57:00 -0800228 log.info("Started");
229 }
230
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000231 @Modified
232 public void modified(ComponentContext context) {
233 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
234
235 String s = get(properties, VLAN_ENABLED);
236 vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
237
238 try {
239 s = get(properties, PRIORITY);
240 priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
241 } catch (NumberFormatException ne) {
242 log.error("Unable to parse configuration parameter for priority", ne);
243 priority = DEFAULT_PRIORITY;
244 }
245 cordMcastStatisticsService.setVlanValue(assignedVlan());
246 }
247
alshabib3b1eadc2016-02-01 17:57:00 -0800248 @Deactivate
249 public void deactivate() {
Jonathan Hartc3f84eb2016-02-19 12:44:36 -0800250 componentConfigService.unregisterProperties(getClass(), false);
alshabib3b1eadc2016-02-01 17:57:00 -0800251 mcastService.removeListener(listener);
ke hanf1709e82016-08-12 10:48:17 +0800252 networkConfig.removeListener(configListener);
ke han9590c812017-02-28 15:02:26 +0800253 networkConfig.unregisterConfigFactory(cordMcastConfigFactory);
Esin Karaman39b24852019-08-28 13:57:30 +0000254 eventExecutor.shutdown();
ke han9590c812017-02-28 15:02:26 +0800255 clearGroups();
Esin Karaman39b24852019-08-28 13:57:30 +0000256 groups.destroy();
alshabib3b1eadc2016-02-01 17:57:00 -0800257 log.info("Stopped");
258 }
259
ke han9590c812017-02-28 15:02:26 +0800260 public void clearGroups() {
Esin Karaman39b24852019-08-28 13:57:30 +0000261 mcastLock();
262 try {
263 groups.keySet().forEach(groupInfo -> {
264 if (!isLocalLeader(groupInfo.getDevice())) {
265 return;
266 }
267 NextContent next = groups.get(groupInfo).value();
268
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000269 if (next != null) {
270 ObjectiveContext context = new DefaultObjectiveContext(
271 (objective) -> log.debug("Successfully remove {}",
272 groupInfo.group),
273 (objective, error) -> log.warn("Failed to remove {}: {}",
274 groupInfo.group, error));
275 // remove the flow rule
276 flowObjectiveService.forward(groupInfo.getDevice(), fwdObject(next.getNextId(),
277 groupInfo.group).remove(context));
278 // remove all ports from the group
279 next.getOutPorts().stream().forEach(portNumber ->
280 flowObjectiveService.next(groupInfo.getDevice(), nextObject(next.getNextId(),
281 portNumber,
282 NextType.RemoveFromExisting,
283 groupInfo.group))
284 );
Esin Karaman39b24852019-08-28 13:57:30 +0000285
Sonal Kasliwala0bbe6c2020-01-06 10:46:30 +0000286 }
Esin Karaman39b24852019-08-28 13:57:30 +0000287 });
288 groups.clear();
289 } finally {
290 mcastUnlock();
291 }
292 }
293
294 private VlanId multicastVlan() {
295 return VlanId.vlanId(mcastVlan);
296 }
297
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000298 protected VlanId assignedVlan() {
Esin Karaman39b24852019-08-28 13:57:30 +0000299 return vlanEnabled ? multicastVlan() : VlanId.NONE;
ke han9590c812017-02-28 15:02:26 +0800300 }
301
alshabib3b1eadc2016-02-01 17:57:00 -0800302 private class InternalMulticastListener implements McastListener {
303 @Override
304 public void event(McastEvent event) {
Esin Karaman39b24852019-08-28 13:57:30 +0000305 eventExecutor.execute(() -> {
306 switch (event.type()) {
307 case ROUTE_ADDED:
308 case ROUTE_REMOVED:
309 case SOURCES_ADDED:
310 break;
311 case SINKS_ADDED:
312 addSinks(event);
313 break;
314 case SINKS_REMOVED:
315 removeSinks(event);
316 break;
317 default:
318 log.warn("Unknown mcast event {}", event.type());
319 }
320 });
321 }
322 }
323
324 /**
325 * Processes previous, and new sinks then finds the sinks to be removed.
326 * @param prevSinks the previous sinks to be evaluated
327 * @param newSinks the new sinks to be evaluated
328 * @returnt the set of the sinks to be removed
329 */
330 private Set<ConnectPoint> getSinksToBeRemoved(Map<HostId, Set<ConnectPoint>> prevSinks,
331 Map<HostId, Set<ConnectPoint>> newSinks) {
332 return getSinksToBeProcessed(prevSinks, newSinks);
333 }
334
335
336 /**
337 * Processes previous, and new sinks then finds the sinks to be added.
338 * @param newSinks the new sinks to be processed
339 * @param allPrevSinks all previous sinks
340 * @return the set of the sinks to be added
341 */
342 private Set<ConnectPoint> getSinksToBeAdded(Map<HostId, Set<ConnectPoint>> newSinks,
343 Map<HostId, Set<ConnectPoint>> allPrevSinks) {
344 return getSinksToBeProcessed(newSinks, allPrevSinks);
345 }
346
347 /**
348 * Gets single-homed sinks that are in set1 but not in set2.
349 * @param sinkSet1 the first sink map
350 * @param sinkSet2 the second sink map
351 * @return a set containing all the single-homed sinks found in set1 but not in set2
352 */
353 private Set<ConnectPoint> getSinksToBeProcessed(Map<HostId, Set<ConnectPoint>> sinkSet1,
354 Map<HostId, Set<ConnectPoint>> sinkSet2) {
355 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
356 sinkSet1.forEach(((hostId, connectPoints) -> {
357 if (HostId.NONE.equals(hostId)) {
358 //assume all connect points associated with HostId.NONE are single homed sinks
359 sinksToBeProcessed.addAll(connectPoints);
360 return;
361 }
362 }));
363 Set<ConnectPoint> singleHomedSinksOfSet2 = sinkSet2.get(HostId.NONE) == null ?
364 Sets.newHashSet() :
365 sinkSet2.get(HostId.NONE);
366 return Sets.difference(sinksToBeProcessed, singleHomedSinksOfSet2);
367 };
368
369
370 private void removeSinks(McastEvent event) {
371 mcastLock();
372 try {
373 Set<ConnectPoint> sinksToBeRemoved = getSinksToBeRemoved(event.prevSubject().sinks(),
374 event.subject().sinks());
375 sinksToBeRemoved.forEach(sink -> removeSink(event.subject().route().group(), sink));
376 } finally {
377 mcastUnlock();
378 }
379 }
380
381 private void removeSink(IpAddress group, ConnectPoint sink) {
382 if (!isLocalLeader(sink.deviceId())) {
383 log.debug("Not the leader of {}. Skip sink_removed event for the sink {} and group {}",
384 sink.deviceId(), sink, group);
385 return;
386 }
387
Esin Karaman996177c2020-03-05 13:21:09 +0000388 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman39b24852019-08-28 13:57:30 +0000389
390 if (!oltInfo.isPresent()) {
391 log.warn("Unknown OLT device : {}", sink.deviceId());
392 return;
393 }
394
395 log.debug("Removing sink {} from the group {}", sink, group);
396
397 NextKey key = new NextKey(sink.deviceId(), group);
398 groups.computeIfPresent(key, (k, v) -> {
399 flowObjectiveService.next(sink.deviceId(), nextObject(v.getNextId(), sink.port(),
400 NextType.RemoveFromExisting, group));
401
402 Set<PortNumber> outPorts = Sets.newHashSet(v.getOutPorts());
403 outPorts.remove(sink.port());
404
405 if (outPorts.isEmpty()) {
406 // this is the last sink
407 ObjectiveContext context = new DefaultObjectiveContext(
408 (objective) -> log.debug("Successfully remove {} on {}",
409 group, sink),
410 (objective, error) -> log.warn("Failed to remove {} on {}: {}",
411 group, sink, error));
412 ForwardingObjective fwdObj = fwdObject(v.getNextId(), group).remove(context);
413 flowObjectiveService.forward(sink.deviceId(), fwdObj);
414 }
415 // remove the whole entity if no out port exists in the port list
416 return outPorts.isEmpty() ? null : new NextContent(v.getNextId(),
417 ImmutableSet.copyOf(outPorts));
418 });
419 }
420
421 private void addSinks(McastEvent event) {
422 mcastLock();
423 try {
424 Set<ConnectPoint> sinksToBeAdded = getSinksToBeAdded(event.subject().sinks(),
425 event.prevSubject().sinks());
426 sinksToBeAdded.forEach(sink -> addSink(event.subject().route(), sink));
427 } finally {
428 mcastUnlock();
429 }
430 }
431
432 private void addSink(McastRoute route, ConnectPoint sink) {
Arjun E Kabf9e6e2020-03-02 10:15:21 +0000433
Esin Karaman39b24852019-08-28 13:57:30 +0000434 if (!isLocalLeader(sink.deviceId())) {
435 log.debug("Not the leader of {}. Skip sink_added event for the sink {} and group {}",
436 sink.deviceId(), sink, route.group());
437 return;
438 }
439
Esin Karaman996177c2020-03-05 13:21:09 +0000440 Optional<SubscriberAndDeviceInformation> oltInfo = getSubscriberAndDeviceInformation(sink.deviceId());
Esin Karaman39b24852019-08-28 13:57:30 +0000441
442 if (!oltInfo.isPresent()) {
443 log.warn("Unknown OLT device : {}", sink.deviceId());
444 return;
445 }
446
447 log.debug("Adding sink {} to the group {}", sink, route.group());
448
449 NextKey key = new NextKey(sink.deviceId(), route.group());
450 NextObjective newNextObj;
451
452 boolean theFirstSinkOfGroup = false;
453 if (!groups.containsKey(key)) {
454 // First time someone request this mcast group via this device
455 Integer nextId = flowObjectiveService.allocateNextId();
456 newNextObj = nextObject(nextId, sink.port(), NextType.AddNew, route.group());
457 // Store the new port
458 groups.put(key, new NextContent(nextId, ImmutableSet.of(sink.port())));
459 theFirstSinkOfGroup = true;
460 } else {
461 // This device already serves some subscribers of this mcast group
462 Versioned<NextContent> nextObj = groups.get(key);
463 if (nextObj.value().getOutPorts().contains(sink.port())) {
464 log.info("Group {} already serves the sink connected to {}", route.group(), sink);
465 return;
466 }
467 newNextObj = nextObject(nextObj.value().getNextId(), sink.port(),
468 NextType.AddToExisting, route.group());
469 // add new port to the group
470 Set<PortNumber> outPorts = Sets.newHashSet(nextObj.value().getOutPorts());
471 outPorts.add(sink.port());
472 groups.put(key, new NextContent(newNextObj.id(), ImmutableSet.copyOf(outPorts)));
473 }
474
475 ObjectiveContext context = new DefaultObjectiveContext(
476 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
477 route.group(), sink.deviceId(), sink.port().toLong(),
478 assignedVlan()),
479 (objective, error) -> {
480 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
481 route.group(), sink.deviceId(), sink.port().toLong(), assignedVlan(),
482 error);
483 });
484
485 flowObjectiveService.next(sink.deviceId(), newNextObj);
486
487 if (theFirstSinkOfGroup) {
488 // create the necessary flow rule if this is the first sink request for the group
489 // on this device
490 flowObjectiveService.forward(sink.deviceId(), fwdObject(newNextObj.id(),
491 route.group()).add(context));
492 }
493 }
494
Esin Karaman996177c2020-03-05 13:21:09 +0000495 /**
496 * Fetches device information associated with the device serial number from SADIS.
497 *
498 * @param serialNumber serial number of a device
499 * @return device information; an empty Optional otherwise.
500 */
501 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
502 long start = System.currentTimeMillis();
503 try {
504 return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
505 } finally {
506 if (log.isDebugEnabled()) {
507 // SADIS may call remote systems to fetch device data and this calls can take a long time.
508 // This measurement is just for monitoring these kinds of situations.
509 log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
510 }
511
512 }
513 }
514
515 /**
516 * Fetches device information associated with the device serial number from SADIS.
517 *
518 * @param deviceId device id
519 * @return device information; an empty Optional otherwise.
520 */
521 private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
522 Device device = deviceService.getDevice(deviceId);
523 if (device == null || device.serialNumber() == null) {
524 return Optional.empty();
525 }
526 return getSubscriberAndDeviceInformation(device.serialNumber());
527 }
528
Esin Karaman39b24852019-08-28 13:57:30 +0000529 private class InternalNetworkConfigListener implements NetworkConfigListener {
530 @Override
531 public void event(NetworkConfigEvent event) {
532 eventExecutor.execute(() -> {
533 switch (event.type()) {
534
535 case CONFIG_ADDED:
536 case CONFIG_UPDATED:
537 if (event.configClass().equals(CORD_MCAST_CONFIG_CLASS)) {
538 McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
539 if (config != null) {
540 //TODO: Simply remove flows/groups, hosts will response period query
541 // and re-sent IGMP report, so the flows can be rebuild.
542 // However, better to remove and re-add mcast flow rules here
543 if (mcastVlan != config.egressVlan().toShort() && vlanEnabled) {
544 clearGroups();
545 }
546 updateConfig(config);
547 }
548 }
549 break;
550 case CONFIG_REGISTERED:
551 case CONFIG_UNREGISTERED:
552 case CONFIG_REMOVED:
553 break;
554 default:
555 break;
556 }
557 });
558 }
559 }
560
561 private void updateConfig(McastConfig config) {
562 if (config == null) {
563 return;
564 }
565 log.debug("multicast config received: {}", config);
566
567 if (config.egressVlan() != null) {
568 mcastVlan = config.egressVlan().toShort();
569 }
570 }
571
572 private class NextKey {
573 private DeviceId device;
574 private IpAddress group;
575
576 public NextKey(DeviceId deviceId, IpAddress groupAddress) {
577 device = deviceId;
578 group = groupAddress;
579 }
580
581 public DeviceId getDevice() {
582 return device;
583 }
584
585 public int hashCode() {
586 return Objects.hash(this.device, this.group);
587 }
588
589 public boolean equals(Object obj) {
590 if (this == obj) {
591 return true;
592 } else if (!(obj instanceof NextKey)) {
593 return false;
594 } else {
595 NextKey that = (NextKey) obj;
596 return this.getClass() == that.getClass() &&
597 Objects.equals(this.device, that.device) &&
598 Objects.equals(this.group, that.group);
599 }
600 }
601 }
602
603 private class NextContent {
604 private Integer nextId;
605 private Set<PortNumber> outPorts;
606
607 public NextContent(Integer nextId, Set<PortNumber> outPorts) {
608 this.nextId = nextId;
609 this.outPorts = outPorts;
610 }
611
612 public Integer getNextId() {
613 return nextId;
614 }
615
616 public Set<PortNumber> getOutPorts() {
617 return ImmutableSet.copyOf(outPorts);
618 }
619
620 public int hashCode() {
621 return Objects.hash(this.nextId, this.outPorts);
622 }
623
624 public boolean equals(Object obj) {
625 if (this == obj) {
626 return true;
627 } else if (!(obj instanceof NextContent)) {
628 return false;
629 } else {
630 NextContent that = (NextContent) obj;
631 return this.getClass() == that.getClass() &&
632 Objects.equals(this.nextId, that.nextId) &&
633 Objects.equals(this.outPorts, that.outPorts);
alshabib3b1eadc2016-02-01 17:57:00 -0800634 }
635 }
636 }
637
ke han9590c812017-02-28 15:02:26 +0800638 private enum NextType { AddNew, AddToExisting, Remove, RemoveFromExisting };
639
Esin Karaman39b24852019-08-28 13:57:30 +0000640 private NextObjective nextObject(Integer nextId, PortNumber port,
641 NextType nextType, IpAddress mcastIp) {
ke han9590c812017-02-28 15:02:26 +0800642
Esin Karaman39b24852019-08-28 13:57:30 +0000643 // Build the meta selector with the fwd objective info
644 TrafficSelector.Builder metadata = DefaultTrafficSelector.builder()
645 .matchIPDst(mcastIp.toIpPrefix());
646
647 if (vlanEnabled) {
648 metadata.matchVlanId(multicastVlan());
649 }
650
ke han9590c812017-02-28 15:02:26 +0800651 DefaultNextObjective.Builder build = DefaultNextObjective.builder()
652 .fromApp(appId)
653 .addTreatment(DefaultTrafficTreatment.builder().setOutput(port).build())
654 .withType(NextObjective.Type.BROADCAST)
Esin Karaman39b24852019-08-28 13:57:30 +0000655 .withId(nextId)
656 .withMeta(metadata.build());
657
ke han9590c812017-02-28 15:02:26 +0800658 ObjectiveContext content = new ObjectiveContext() {
659 @Override
660 public void onSuccess(Objective objective) {
Esin Karaman39b24852019-08-28 13:57:30 +0000661 log.debug("Next Objective {} installed", objective.id());
ke han9590c812017-02-28 15:02:26 +0800662 }
663
664 @Override
665 public void onError(Objective objective, ObjectiveError error) {
Esin Karaman39b24852019-08-28 13:57:30 +0000666 log.debug("Next Objective {} failed, because {}",
667 objective.id(),
668 error);
ke han9590c812017-02-28 15:02:26 +0800669 }
670 };
671
672 switch (nextType) {
673 case AddNew:
674 return build.add(content);
675 case AddToExisting:
676 return build.addToExisting(content);
677 case Remove:
678 return build.remove(content);
679 case RemoveFromExisting:
680 return build.removeFromExisting(content);
681 default:
682 return null;
683 }
684 }
685
Esin Karaman39b24852019-08-28 13:57:30 +0000686 private ForwardingObjective.Builder fwdObject(int nextId, IpAddress mcastIp) {
687 TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
688 .matchEthType(Ethernet.TYPE_IPV4)
689 .matchIPDst(mcastIp.toIpPrefix());
ke han9590c812017-02-28 15:02:26 +0800690
Esin Karaman39b24852019-08-28 13:57:30 +0000691 //build the meta selector
692 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
693 if (vlanEnabled) {
694 metabuilder.matchVlanId(multicastVlan());
Jonathan Hart718c0452016-02-18 15:56:22 -0800695 }
696
Esin Karaman39b24852019-08-28 13:57:30 +0000697 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
698 .fromApp(appId)
699 .nextStep(nextId)
700 .makePermanent()
701 .withFlag(ForwardingObjective.Flag.SPECIFIC)
702 .withPriority(priority)
703 .withSelector(mcast.build())
704 .withMeta(metabuilder.build());
alshabibfc1cb032016-02-17 15:37:56 -0800705
Esin Karaman39b24852019-08-28 13:57:30 +0000706 return fwdBuilder;
alshabibfc1cb032016-02-17 15:37:56 -0800707 }
708
Esin Karaman39b24852019-08-28 13:57:30 +0000709 // Custom-built function, when the device is not available we need a fallback mechanism
710 private boolean isLocalLeader(DeviceId deviceId) {
711 if (!mastershipService.isLocalMaster(deviceId)) {
712 // When the device is available we just check the mastership
713 if (deviceService.isAvailable(deviceId)) {
ke han9590c812017-02-28 15:02:26 +0800714 return false;
ke han9590c812017-02-28 15:02:26 +0800715 }
Esin Karaman39b24852019-08-28 13:57:30 +0000716 // Fallback with Leadership service - device id is used as topic
717 NodeId leader = leadershipService.runForLeadership(
718 deviceId.toString()).leaderNodeId();
719 // Verify if this node is the leader
720 return clusterService.getLocalNode().id().equals(leader);
ke han9590c812017-02-28 15:02:26 +0800721 }
Esin Karaman39b24852019-08-28 13:57:30 +0000722 return true;
ke han9590c812017-02-28 15:02:26 +0800723 }
Esin Karaman39b24852019-08-28 13:57:30 +0000724
alshabib3b1eadc2016-02-01 17:57:00 -0800725}
ke hanf1709e82016-08-12 10:48:17 +0800726
ke han9590c812017-02-28 15:02:26 +0800727