blob: 70a3753472db7722e6adc120acdc9b7593972fd5 [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Brian O'Connord6a135a2017-08-03 22:46:05 -07002 * Copyright 2016-present Open Networking Foundation
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010018import com.google.common.collect.ImmutableMap;
19import com.google.common.collect.Sets;
Tunahan Sezena07fe962021-02-24 08:24:24 +000020import org.onlab.packet.MacAddress;
alshabibf0e7e702015-05-30 18:22:36 -070021import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080022import org.onlab.util.KryoNamespace;
Gamze Abaka1f62dd92020-05-07 08:58:13 +000023import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080024import org.onosproject.cluster.ClusterEvent;
25import org.onosproject.cluster.ClusterEventListener;
26import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.ControllerNode;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020028import org.onosproject.cluster.LeadershipService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080029import org.onosproject.cluster.NodeId;
alshabibf0e7e702015-05-30 18:22:36 -070030import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080032import org.onosproject.event.AbstractListenerManager;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020033import org.onosproject.mastership.MastershipService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010034import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070035import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010036import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070037import org.onosproject.net.DeviceId;
Tunahan Sezena07fe962021-02-24 08:24:24 +000038import org.onosproject.net.Host;
alshabibdec2e252016-01-15 12:20:25 -080039import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070040import org.onosproject.net.PortNumber;
41import org.onosproject.net.device.DeviceEvent;
42import org.onosproject.net.device.DeviceListener;
43import org.onosproject.net.device.DeviceService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +010044import org.onosproject.net.driver.DriverService;
Hardik Windlassa58fbee2020-03-12 18:33:55 +053045import org.onosproject.net.flow.FlowRuleService;
alshabibf0e7e702015-05-30 18:22:36 -070046import org.onosproject.net.flowobjective.FlowObjectiveService;
47import org.onosproject.net.flowobjective.ForwardingObjective;
alshabib3ea82642016-01-12 18:06:53 -080048import org.onosproject.net.flowobjective.Objective;
49import org.onosproject.net.flowobjective.ObjectiveContext;
50import org.onosproject.net.flowobjective.ObjectiveError;
Tunahan Sezena07fe962021-02-24 08:24:24 +000051import org.onosproject.net.host.HostEvent;
52import org.onosproject.net.host.HostListener;
53import org.onosproject.net.host.HostService;
Saurav Daseae48de2019-06-19 13:26:15 -070054import org.onosproject.net.meter.MeterId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080055import org.onosproject.store.serializers.KryoNamespaces;
56import org.onosproject.store.service.ConsistentMultimap;
57import org.onosproject.store.service.Serializer;
58import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070059import org.opencord.olt.AccessDeviceEvent;
60import org.opencord.olt.AccessDeviceListener;
Tunahan Sezenf0843b92021-04-30 07:13:16 +000061import org.opencord.olt.AccessDevicePort;
alshabib36a4d732016-06-01 16:03:59 -070062import org.opencord.olt.AccessDeviceService;
Amit Ghosh31939522018-08-16 13:28:21 +010063import org.opencord.olt.AccessSubscriberId;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000064import org.opencord.olt.internalapi.AccessDeviceFlowService;
65import org.opencord.olt.internalapi.AccessDeviceMeterService;
Gamze Abaka641fc072018-09-04 09:16:27 +000066import org.opencord.sadis.BandwidthProfileInformation;
67import org.opencord.sadis.BaseInformationService;
68import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010069import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000070import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080071import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070072import org.osgi.service.component.annotations.Activate;
73import org.osgi.service.component.annotations.Component;
74import org.osgi.service.component.annotations.Deactivate;
75import org.osgi.service.component.annotations.Modified;
76import org.osgi.service.component.annotations.Reference;
77import org.osgi.service.component.annotations.ReferenceCardinality;
Ilayda Ozdemir90a93622021-02-25 09:40:58 +000078import org.osgi.service.component.annotations.ReferencePolicy;
alshabibf0e7e702015-05-30 18:22:36 -070079import org.slf4j.Logger;
80
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010081import java.util.ArrayList;
82import java.util.Collection;
83import java.util.Dictionary;
84import java.util.List;
85import java.util.Map;
86import java.util.Optional;
87import java.util.Properties;
88import java.util.Set;
89import java.util.concurrent.BlockingQueue;
90import java.util.concurrent.CompletableFuture;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010091import java.util.concurrent.ExecutorService;
92import java.util.concurrent.Executors;
93import java.util.concurrent.LinkedBlockingQueue;
94import java.util.concurrent.ScheduledExecutorService;
95import java.util.concurrent.TimeUnit;
96import java.util.concurrent.atomic.AtomicBoolean;
97
98import static com.google.common.base.Preconditions.checkNotNull;
99import static com.google.common.base.Strings.isNullOrEmpty;
100import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
101import static java.util.stream.Collectors.*;
102import static org.onlab.util.Tools.get;
103import static org.onlab.util.Tools.groupedThreads;
104import static org.opencord.olt.impl.OsgiPropertyConstants.*;
105import static org.slf4j.LoggerFactory.getLogger;
alshabibf0e7e702015-05-30 18:22:36 -0700106
107/**
Jonathan Harte533a422015-10-20 17:31:24 -0700108 * Provisions rules on access devices.
alshabibf0e7e702015-05-30 18:22:36 -0700109 */
Carmelo Casconeca931162019-07-15 18:22:24 -0700110@Component(immediate = true,
111 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -0700112 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000113 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Saurav Das2d3777a2020-08-07 18:48:51 -0700114 EAPOL_DELETE_RETRY_MAX_ATTEMPS + ":Integer=" +
115 EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT,
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700116 PROVISION_DELAY + ":Integer=" + PROVISION_DELAY_DEFAULT,
Carmelo Casconeca931162019-07-15 18:22:24 -0700117 })
alshabib8e4fd2f2016-01-12 15:55:53 -0800118public class Olt
119 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
120 implements AccessDeviceService {
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000121 private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
Charles Chan54f110f2017-01-20 11:22:42 -0800122 private static final String APP_NAME = "org.opencord.olt";
alshabibe0559672016-02-21 14:49:51 -0800123
Gamze Abakada282b42019-03-11 13:16:48 +0000124 private static final short EAPOL_DEFAULT_VLAN = 4091;
Gamze Abaka838d8142019-02-21 07:06:55 +0000125 private static final String NO_UPLINK_PORT = "No uplink port found for OLT device {}";
alshabibe0559672016-02-21 14:49:51 -0800126
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800127 public static final int HASH_WEIGHT = 10;
128
alshabibf0e7e702015-05-30 18:22:36 -0700129 private final Logger log = getLogger(getClass());
130
Thomas Lee Sd7735f92020-02-20 19:21:47 +0530131 private static final String NNI = "nni-";
132
Carmelo Casconeca931162019-07-15 18:22:24 -0700133 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700134 protected FlowObjectiveService flowObjectiveService;
135
Carmelo Casconeca931162019-07-15 18:22:24 -0700136 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700137 protected DeviceService deviceService;
138
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100139
Carmelo Casconeca931162019-07-15 18:22:24 -0700140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700141 protected CoreService coreService;
142
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100143 //Dependency on driver service is to ensure correct startup order
144 @Reference(cardinality = ReferenceCardinality.MANDATORY)
145 protected DriverService driverService;
146
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000147 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
148 bind = "bindSadisService",
149 unbind = "unbindSadisService",
150 policy = ReferencePolicy.DYNAMIC)
151 protected volatile SadisService sadisService;
Gamze Abaka641fc072018-09-04 09:16:27 +0000152
Carmelo Casconeca931162019-07-15 18:22:24 -0700153 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000154 protected AccessDeviceFlowService oltFlowService;
alshabibe0559672016-02-21 14:49:51 -0800155
Carmelo Casconeca931162019-07-15 18:22:24 -0700156 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000157 protected AccessDeviceMeterService oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000158
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800159 @Reference(cardinality = ReferenceCardinality.MANDATORY)
160 protected StorageService storageService;
161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
163 protected ClusterService clusterService;
164
Hardik Windlassa58fbee2020-03-12 18:33:55 +0530165 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200166 protected MastershipService mastershipService;
167
168 @Reference(cardinality = ReferenceCardinality.MANDATORY)
169 protected LeadershipService leadershipService;
170
171 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Hardik Windlassa58fbee2020-03-12 18:33:55 +0530172 protected FlowRuleService flowRuleService;
173
Gamze Abaka1f62dd92020-05-07 08:58:13 +0000174 @Reference(cardinality = ReferenceCardinality.MANDATORY)
175 protected ComponentConfigService componentConfigService;
176
Tunahan Sezena07fe962021-02-24 08:24:24 +0000177 @Reference(cardinality = ReferenceCardinality.MANDATORY)
178 protected HostService hostService;
179
Carmelo Casconeca931162019-07-15 18:22:24 -0700180 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800181 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700182 **/
183 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000184
Carmelo Casconeca931162019-07-15 18:22:24 -0700185 /**
Gamze Abaka51a34e82020-05-08 13:03:14 +0000186 * Default multicast service name.
Carmelo Casconeca931162019-07-15 18:22:24 -0700187 **/
Gamze Abaka51a34e82020-05-08 13:03:14 +0000188 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Gamze Abaka33feef52019-02-27 08:16:47 +0000189
Saurav Das2d3777a2020-08-07 18:48:51 -0700190 /**
191 * Default amounts of eapol retry.
192 **/
193 protected int eapolDeleteRetryMaxAttempts = EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
194
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700195 /**
196 * Delay between EAPOL removal and data plane flows provisioning.
197 */
198 protected int provisionDelay = PROVISION_DELAY_DEFAULT;
199
alshabibf0e7e702015-05-30 18:22:36 -0700200 private final DeviceListener deviceListener = new InternalDeviceListener();
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800201 private final ClusterEventListener clusterListener = new InternalClusterListener();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000202 private final HostListener hostListener = new InternalHostListener();
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800203
204 private ConsistentHasher hasher;
alshabibf0e7e702015-05-30 18:22:36 -0700205
Gamze Abaka641fc072018-09-04 09:16:27 +0000206 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
207 private BaseInformationService<BandwidthProfileInformation> bpService;
alshabibf0e7e702015-05-30 18:22:36 -0700208
Gamze Abaka641fc072018-09-04 09:16:27 +0000209 private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000210 groupedThreads("onos/olt-service",
211 "olt-installer-%d"));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100212
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700213 protected ExecutorService eventExecutor;
Tunahan Sezena07fe962021-02-24 08:24:24 +0000214 protected ExecutorService hostEventExecutor;
Saurav Das2d3777a2020-08-07 18:48:51 -0700215 protected ExecutorService retryExecutor;
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700216 protected ScheduledExecutorService provisionExecutor;
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700217
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800218 private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
Saurav Das2d3777a2020-08-07 18:48:51 -0700219 private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800220
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000221 protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
Tunahan Sezena07fe962021-02-24 08:24:24 +0000222 private ConsistentMultimap<ConnectPoint, SubscriberFlowInfo> waitingMacSubscribers;
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700223
alshabibf0e7e702015-05-30 18:22:36 -0700224 @Activate
alshabibe0559672016-02-21 14:49:51 -0800225 public void activate(ComponentContext context) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000226 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
227 "events-%d", log));
Tunahan Sezena07fe962021-02-24 08:24:24 +0000228 hostEventExecutor = Executors.newFixedThreadPool(8, groupedThreads("onos/olt", "mac-events-%d", log));
Saurav Das2d3777a2020-08-07 18:48:51 -0700229 retryExecutor = Executors.newCachedThreadPool();
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700230 provisionExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
231 "provision-%d", log));
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700232
alshabibe0559672016-02-21 14:49:51 -0800233 modified(context);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000234 ApplicationId appId = coreService.registerApplication(APP_NAME);
Gamze Abaka1f62dd92020-05-07 08:58:13 +0000235 componentConfigService.registerProperties(getClass());
Saurav Das62ad75e2019-03-05 12:22:22 -0800236
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800237 KryoNamespace serializer = KryoNamespace.newBuilder()
238 .register(KryoNamespaces.API)
239 .register(UniTagInformation.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000240 .register(SubscriberFlowInfo.class)
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000241 .register(AccessDevicePort.class)
242 .register(AccessDevicePort.Type.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000243 .register(LinkedBlockingQueue.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800244 .build();
245
246 programmedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
247 .withName("volt-programmed-subs")
248 .withSerializer(Serializer.using(serializer))
249 .withApplicationId(appId)
250 .build();
alshabibc4dfe852015-06-05 13:35:13 -0700251
Saurav Das2d3777a2020-08-07 18:48:51 -0700252 failedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
253 .withName("volt-failed-subs")
254 .withSerializer(Serializer.using(serializer))
255 .withApplicationId(appId)
256 .build();
257
Tunahan Sezena07fe962021-02-24 08:24:24 +0000258 KryoNamespace macSerializer = KryoNamespace.newBuilder()
259 .register(KryoNamespaces.API)
260 .register(SubscriberFlowInfo.class)
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000261 .register(AccessDevicePort.class)
262 .register(AccessDevicePort.Type.class)
Tunahan Sezena07fe962021-02-24 08:24:24 +0000263 .register(UniTagInformation.class)
264 .build();
265
266 waitingMacSubscribers = storageService.<ConnectPoint, SubscriberFlowInfo>consistentMultimapBuilder()
267 .withName("volt-waiting-mac-subs")
268 .withSerializer(Serializer.using(macSerializer))
269 .withApplicationId(appId)
270 .build();
271
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000272 pendingSubscribersForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
273 .withName("volt-pending-subs")
274 .withSerializer(Serializer.using(serializer))
275 .withApplicationId(appId)
276 .build().asJavaMap();
alshabib8e4fd2f2016-01-12 15:55:53 -0800277 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
278
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000279 if (sadisService != null) {
280 subsService = sadisService.getSubscriberInfoService();
281 bpService = sadisService.getBandwidthProfileService();
282 } else {
283 log.warn(SADIS_NOT_RUNNING);
284 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000285
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800286 List<NodeId> readyNodes = clusterService.getNodes().stream()
287 .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
288 .map(ControllerNode::id)
289 .collect(toList());
290 hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
291 clusterService.addListener(clusterListener);
292
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100293 // look for all provisioned devices in Sadis and create EAPOL flows for the
294 // UNI ports
295 Iterable<Device> devices = deviceService.getDevices();
296 for (Device d : devices) {
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200297 if (isLocalLeader(d.id())) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800298 checkAndCreateDeviceFlows(d);
299 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100300 }
alshabib4ceaed32016-03-03 18:00:58 -0800301
alshabibba357492016-01-27 13:49:46 -0800302 deviceService.addListener(deviceListener);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000303 hostService.addListener(hostListener);
alshabibf0e7e702015-05-30 18:22:36 -0700304 log.info("Started with Application ID {}", appId.id());
305 }
306
307 @Deactivate
308 public void deactivate() {
Gamze Abaka1f62dd92020-05-07 08:58:13 +0000309 componentConfigService.unregisterProperties(getClass(), false);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800310 clusterService.removeListener(clusterListener);
alshabib62e9ce72016-02-11 17:31:36 -0800311 deviceService.removeListener(deviceListener);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000312 hostService.removeListener(hostListener);
Jonathan Hart5f1c8142018-07-24 17:31:59 -0700313 eventDispatcher.removeSink(AccessDeviceEvent.class);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700314 eventExecutor.shutdown();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000315 hostEventExecutor.shutdown();
Saurav Das2d3777a2020-08-07 18:48:51 -0700316 retryExecutor.shutdown();
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700317 provisionExecutor.shutdown();
alshabibf0e7e702015-05-30 18:22:36 -0700318 log.info("Stopped");
319 }
320
alshabibe0559672016-02-21 14:49:51 -0800321 @Modified
322 public void modified(ComponentContext context) {
323 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
324
325 try {
Andrea Campanella971d5b92020-05-07 11:20:43 +0200326 String bpId = get(properties, DEFAULT_BP_ID);
327 defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
Gamze Abakaad329652018-12-20 10:12:21 +0000328
Andrea Campanella971d5b92020-05-07 11:20:43 +0200329 String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
330 multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000331
Saurav Das2d3777a2020-08-07 18:48:51 -0700332 String eapolDeleteRetryNew = get(properties, EAPOL_DELETE_RETRY_MAX_ATTEMPS);
333 eapolDeleteRetryMaxAttempts = isNullOrEmpty(eapolDeleteRetryNew) ? EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT :
334 Integer.parseInt(eapolDeleteRetryNew.trim());
335
336 log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}, EapolDeleteRetryMaxAttempts: {}",
337 defaultBpId, multicastServiceName, eapolDeleteRetryMaxAttempts);
Gamze Abaka33feef52019-02-27 08:16:47 +0000338
alshabibe0559672016-02-21 14:49:51 -0800339 } catch (Exception e) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000340 log.error("Error while modifying the properties", e);
Andrea Campanella971d5b92020-05-07 11:20:43 +0200341 defaultBpId = DEFAULT_BP_ID_DEFAULT;
342 multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
alshabibe0559672016-02-21 14:49:51 -0800343 }
344 }
345
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000346 protected void bindSadisService(SadisService service) {
347 sadisService = service;
348 bpService = sadisService.getBandwidthProfileService();
349 subsService = sadisService.getSubscriberInfoService();
350 log.info("Sadis-service binds to onos.");
351 }
352
353 protected void unbindSadisService(SadisService service) {
354 sadisService = null;
355 bpService = null;
356 subsService = null;
357 log.info("Sadis-service unbinds from onos.");
358 }
359
alshabib32232c82016-02-25 17:57:24 -0500360 @Override
Gamze Abaka838d8142019-02-21 07:06:55 +0000361 public boolean provisionSubscriber(ConnectPoint connectPoint) {
Andrea Campanella3ce4d282020-06-09 13:46:58 +0200362 log.info("Call to provision subscriber at {}", connectPoint);
Gamze Abaka838d8142019-02-21 07:06:55 +0000363 DeviceId deviceId = connectPoint.deviceId();
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000364 Port subscriberPortOnos = deviceService.getPort(deviceId, connectPoint.port());
365 checkNotNull(subscriberPortOnos, "Invalid connect point:" + connectPoint);
366 AccessDevicePort subscriberPort = new AccessDevicePort(subscriberPortOnos, AccessDevicePort.Type.UNI);
Hardik Windlass395ff372019-06-13 05:16:00 +0000367
Saurav Das026650f2020-09-21 18:56:35 -0700368 if (isSubscriberInstalled(connectPoint)) {
369 log.warn("Subscriber at {} already provisioned or in the process .."
370 + " not taking any more action", connectPoint);
371 return true;
372 }
373
374 // Find the subscriber config at this connect point
Gamze Abaka838d8142019-02-21 07:06:55 +0000375 SubscriberAndDeviceInformation sub = getSubscriber(connectPoint);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100376 if (sub == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000377 log.warn("No subscriber found for {}", connectPoint);
Amit Ghosh31939522018-08-16 13:28:21 +0100378 return false;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100379 }
Jonathan Harte533a422015-10-20 17:31:24 -0700380
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100381 // Get the uplink port
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000382 AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100383 if (uplinkPort == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000384 log.warn(NO_UPLINK_PORT, deviceId);
Amit Ghosh31939522018-08-16 13:28:21 +0100385 return false;
Jonathan Harte533a422015-10-20 17:31:24 -0700386 }
387
Saurav Das2d3777a2020-08-07 18:48:51 -0700388 // delete Eapol authentication flow with default bandwidth
389 // wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
390 // retry deletion if it fails/times-out
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000391 retryExecutor.execute(new DeleteEapolInstallSub(subscriberPort, uplinkPort, sub, 1));
Amit Ghosh31939522018-08-16 13:28:21 +0100392 return true;
alshabibb7a9e172016-01-13 11:23:53 -0800393 }
394
Saurav Das026650f2020-09-21 18:56:35 -0700395 // returns true if subscriber is programmed or in the process of being programmed
396 private boolean isSubscriberInstalled(ConnectPoint connectPoint) {
397 Collection<? extends UniTagInformation> uniTagInformationSet =
398 programmedSubs.get(connectPoint).value();
399 if (!uniTagInformationSet.isEmpty()) {
400 return true;
401 }
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100402 //Check if the subscriber is already getting provisioned
403 // so we do not provision twice
404 AtomicBoolean isPending = new AtomicBoolean(false);
405 pendingSubscribersForDevice.computeIfPresent(connectPoint.deviceId(), (id, infos) -> {
406 for (SubscriberFlowInfo fi : infos) {
407 if (fi.getUniPort().equals(connectPoint.port())) {
408 isPending.set(true);
409 break;
410 }
Saurav Das026650f2020-09-21 18:56:35 -0700411 }
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100412 return infos;
413 });
Saurav Das026650f2020-09-21 18:56:35 -0700414
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100415 return isPending.get();
Saurav Das026650f2020-09-21 18:56:35 -0700416 }
417
Saurav Das2d3777a2020-08-07 18:48:51 -0700418 private class DeleteEapolInstallSub implements Runnable {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000419 AccessDevicePort subscriberPort;
420 AccessDevicePort uplinkPort;
Saurav Das2d3777a2020-08-07 18:48:51 -0700421 SubscriberAndDeviceInformation sub;
422 private int attemptNumber;
423
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000424 DeleteEapolInstallSub(AccessDevicePort subscriberPort, AccessDevicePort uplinkPort,
Saurav Das2d3777a2020-08-07 18:48:51 -0700425 SubscriberAndDeviceInformation sub,
426 int attemptNumber) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000427 this.subscriberPort = subscriberPort;
Saurav Das2d3777a2020-08-07 18:48:51 -0700428 this.uplinkPort = uplinkPort;
429 this.sub = sub;
430 this.attemptNumber = attemptNumber;
431 }
432
433 @Override
434 public void run() {
Tunahan Sezena07fe962021-02-24 08:24:24 +0000435 CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000436 oltFlowService.processEapolFilteringObjectives(subscriberPort,
Saurav Das2d3777a2020-08-07 18:48:51 -0700437 defaultBpId, filterFuture,
438 VlanId.vlanId(EAPOL_DEFAULT_VLAN),
439 false);
440 filterFuture.thenAcceptAsync(filterStatus -> {
441 if (filterStatus == null) {
442 log.info("Default eapol flow deleted in attempt {} of {}"
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000443 + "... provisioning subscriber flows on {}",
444 attemptNumber, eapolDeleteRetryMaxAttempts, subscriberPort);
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700445
446 // FIXME this is needed to prevent that default EAPOL flow removal and
447 // data plane flows install are received by the device at the same time
448 provisionExecutor.schedule(
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000449 () -> provisionUniTagList(subscriberPort, uplinkPort, sub),
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700450 provisionDelay, TimeUnit.MILLISECONDS);
Saurav Das2d3777a2020-08-07 18:48:51 -0700451 } else {
452 if (attemptNumber <= eapolDeleteRetryMaxAttempts) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000453 log.warn("The filtering future failed {} for subscriber on {}"
Saurav Das2d3777a2020-08-07 18:48:51 -0700454 + "... retrying {} of {} attempts",
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000455 filterStatus, subscriberPort, attemptNumber, eapolDeleteRetryMaxAttempts);
Saurav Das2d3777a2020-08-07 18:48:51 -0700456 retryExecutor.execute(
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000457 new DeleteEapolInstallSub(subscriberPort, uplinkPort, sub,
Saurav Das2d3777a2020-08-07 18:48:51 -0700458 attemptNumber + 1));
459 } else {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000460 log.error("The filtering future failed {} for subscriber on {}"
Saurav Das2d3777a2020-08-07 18:48:51 -0700461 + "after {} attempts. Subscriber provisioning failed",
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000462 filterStatus, subscriberPort, eapolDeleteRetryMaxAttempts);
463 sub.uniTagList().forEach(ut ->
464 failedSubs.put(
465 new ConnectPoint(subscriberPort.deviceId(), subscriberPort.number()), ut));
Saurav Das2d3777a2020-08-07 18:48:51 -0700466 }
467 }
468 });
469 }
470
471 }
472
alshabibb7a9e172016-01-13 11:23:53 -0800473 @Override
Gamze Abaka838d8142019-02-21 07:06:55 +0000474 public boolean removeSubscriber(ConnectPoint connectPoint) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000475 Port subscriberPort = deviceService.getPort(connectPoint);
476 if (subscriberPort == null) {
477 log.error("Subscriber port not found at: {}", connectPoint);
478 return false;
479 }
480 return removeSubscriber(new AccessDevicePort(subscriberPort, AccessDevicePort.Type.UNI));
481 }
482
483 private boolean removeSubscriber(AccessDevicePort subscriberPort) {
484 log.info("Call to un-provision subscriber at {}", subscriberPort);
Gamze Abaka838d8142019-02-21 07:06:55 +0000485
Saurav Daseae48de2019-06-19 13:26:15 -0700486 // Get the subscriber connected to this port from the local cache
487 // If we don't know about the subscriber there's no need to remove it
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000488 DeviceId deviceId = subscriberPort.deviceId();
Gamze Abaka838d8142019-02-21 07:06:55 +0000489
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000490 ConnectPoint connectPoint = new ConnectPoint(deviceId, subscriberPort.number());
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800491 Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint).value();
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000492 if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000493 log.warn("Subscriber on connectionPoint {} was not previously programmed, " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000494 "no need to remove it", connectPoint);
Matteo Scandolo962a6ad2018-12-11 15:39:42 -0800495 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800496 }
497
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100498 // Get the uplink port
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000499 AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100500 if (uplinkPort == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000501 log.warn(NO_UPLINK_PORT, deviceId);
Amit Ghosh31939522018-08-16 13:28:21 +0100502 return false;
alshabib4ceaed32016-03-03 18:00:58 -0800503 }
504
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000505 for (UniTagInformation uniTag : uniTagInformationSet) {
Amit Ghosh95e2f652017-08-23 12:49:46 +0100506
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000507 if (multicastServiceName.equals(uniTag.getServiceName())) {
508 continue;
509 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000510
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000511 unprovisionVlans(uplinkPort, subscriberPort, uniTag);
alshabibbf23a1f2016-01-14 17:27:11 -0800512
Saurav Das9da7d522020-03-23 19:14:35 -0700513 // remove eapol with subscriber bandwidth profile
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000514 oltFlowService.processEapolFilteringObjectives(subscriberPort,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000515 uniTag.getUpstreamBandwidthProfile(),
516 null, uniTag.getPonCTag(), false);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100517
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000518 if (subscriberPort.port() != null && subscriberPort.isEnabled()) {
Saurav Das9da7d522020-03-23 19:14:35 -0700519 // reinstall eapol with default bandwidth profile
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000520 oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000521 null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
522 } else {
523 log.debug("Port {} is no longer enabled or it's unavailable. Not "
524 + "reprogramming default eapol flow", connectPoint);
525 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100526 }
Amit Ghosh31939522018-08-16 13:28:21 +0100527 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800528 }
529
Gamze Abakaf59c0912019-04-19 08:24:28 +0000530
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000531 @Override
532 public boolean provisionSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
533 Optional<VlanId> cTag, Optional<Integer> tpId) {
534
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000535 log.info("Provisioning subscriber using subscriberId {}, sTag {}, cTag {}, tpId {}",
536 subscriberId, sTag, cTag, tpId);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000537
Amit Ghosh31939522018-08-16 13:28:21 +0100538 // Check if we can find the connect point to which this subscriber is connected
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000539 ConnectPoint cp = findSubscriberConnectPoint(subscriberId.toString());
540 if (cp == null) {
Amit Ghosh31939522018-08-16 13:28:21 +0100541 log.warn("ConnectPoint for {} not found", subscriberId);
542 return false;
543 }
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000544 AccessDevicePort subscriberPort = new AccessDevicePort(deviceService.getPort(cp), AccessDevicePort.Type.UNI);
Amit Ghosh31939522018-08-16 13:28:21 +0100545
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100546 if (!sTag.isPresent() && !cTag.isPresent()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000547 return provisionSubscriber(cp);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000548 } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000549 AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(cp.deviceId()));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100550 if (uplinkPort == null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000551 log.warn(NO_UPLINK_PORT, cp.deviceId());
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100552 return false;
553 }
554
Gamze Abakaf59c0912019-04-19 08:24:28 +0000555 //delete Eapol authentication flow with default bandwidth
556 //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
Gamze Abakaf59c0912019-04-19 08:24:28 +0000557 //install subscriber flows
Tunahan Sezena07fe962021-02-24 08:24:24 +0000558 CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000559 oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000560 filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000561 filterFuture.thenAcceptAsync(filterStatus -> {
562 if (filterStatus == null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000563 provisionUniTagInformation(uplinkPort, subscriberPort, cTag.get(), sTag.get(), tpId.get());
Gamze Abakaf59c0912019-04-19 08:24:28 +0000564 }
565 });
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100566 return true;
567 } else {
568 log.warn("Provisioning failed for subscriber: {}", subscriberId);
569 return false;
570 }
Amit Ghosh31939522018-08-16 13:28:21 +0100571 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100572
alshabibe0559672016-02-21 14:49:51 -0800573 @Override
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000574 public boolean removeSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
575 Optional<VlanId> cTag, Optional<Integer> tpId) {
Amit Ghosh31939522018-08-16 13:28:21 +0100576 // Check if we can find the connect point to which this subscriber is connected
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000577 ConnectPoint cp = findSubscriberConnectPoint(subscriberId.toString());
578 if (cp == null) {
Amit Ghosh31939522018-08-16 13:28:21 +0100579 log.warn("ConnectPoint for {} not found", subscriberId);
580 return false;
581 }
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000582 AccessDevicePort subscriberPort = new AccessDevicePort(deviceService.getPort(cp), AccessDevicePort.Type.UNI);
Amit Ghosh31939522018-08-16 13:28:21 +0100583
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100584 if (!sTag.isPresent() && !cTag.isPresent()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000585 return removeSubscriber(cp);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000586 } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100587 // Get the uplink port
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000588 AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(cp.deviceId()));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100589 if (uplinkPort == null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000590 log.warn(NO_UPLINK_PORT, cp.deviceId());
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100591 return false;
592 }
593
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000594 Optional<UniTagInformation> tagInfo = getUniTagInformation(subscriberPort, cTag.get(),
595 sTag.get(), tpId.get());
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000596 if (!tagInfo.isPresent()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000597 log.warn("UniTagInformation does not exist for {}, cTag {}, sTag {}, tpId {}",
598 subscriberPort, cTag, sTag, tpId);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000599 return false;
600 }
Gamze Abakaf59c0912019-04-19 08:24:28 +0000601
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000602 unprovisionVlans(uplinkPort, subscriberPort, tagInfo.get());
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100603 return true;
604 } else {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000605 log.warn("Removing subscriber is not possible - please check the provided information" +
606 "for the subscriber: {}", subscriberId);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100607 return false;
608 }
Amit Ghosh31939522018-08-16 13:28:21 +0100609 }
610
611 @Override
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000612 public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs() {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800613 return programmedSubs.stream()
614 .collect(collectingAndThen(
615 groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
616 ImmutableMap::copyOf));
Saurav Das82b8e6d2018-10-04 15:25:12 -0700617 }
618
619 @Override
Saurav Das2d3777a2020-08-07 18:48:51 -0700620 public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getFailedSubs() {
621 return failedSubs.stream()
622 .collect(collectingAndThen(
623 groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
624 ImmutableMap::copyOf));
625 }
626
627 @Override
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100628 public List<DeviceId> fetchOlts() {
629 // look through all the devices and find the ones that are OLTs as per Sadis
630 List<DeviceId> olts = new ArrayList<>();
631 Iterable<Device> devices = deviceService.getDevices();
632 for (Device d : devices) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700633 if (getOltInfo(d) != null) {
634 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100635 olts.add(d.id());
636 }
637 }
638 return olts;
alshabibe0559672016-02-21 14:49:51 -0800639 }
640
Amit Ghosh31939522018-08-16 13:28:21 +0100641 /**
642 * Finds the connect point to which a subscriber is connected.
643 *
644 * @param id The id of the subscriber, this is the same ID as in Sadis
645 * @return Subscribers ConnectPoint if found else null
646 */
647 private ConnectPoint findSubscriberConnectPoint(String id) {
648
649 Iterable<Device> devices = deviceService.getDevices();
650 for (Device d : devices) {
651 for (Port p : deviceService.getPorts(d.id())) {
652 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
653 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
654 log.debug("Found on device {} port {}", d.id(), p.number());
655 return new ConnectPoint(d.id(), p.number());
656 }
657 }
658 }
659 return null;
660 }
661
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000662 /**
663 * Gets the context of the bandwidth profile information for the given parameter.
664 *
665 * @param bandwidthProfile the bandwidth profile id
666 * @return the context of the bandwidth profile information
667 */
Gamze Abaka641fc072018-09-04 09:16:27 +0000668 private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000669 if (bpService == null) {
670 log.warn(SADIS_NOT_RUNNING);
671 return null;
672 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000673 if (bandwidthProfile == null) {
674 return null;
675 }
676 return bpService.get(bandwidthProfile);
677 }
678
Gamze Abaka838d8142019-02-21 07:06:55 +0000679 /**
Gamze Abaka33feef52019-02-27 08:16:47 +0000680 * Removes subscriber vlan flows.
Gamze Abaka838d8142019-02-21 07:06:55 +0000681 *
Gamze Abaka838d8142019-02-21 07:06:55 +0000682 * @param uplink uplink port of the OLT
683 * @param subscriberPort uni port
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000684 * @param uniTag uni tag information
Gamze Abaka838d8142019-02-21 07:06:55 +0000685 */
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000686 private void unprovisionVlans(AccessDevicePort uplink, AccessDevicePort subscriberPort, UniTagInformation uniTag) {
687 log.info("Unprovisioning vlans for {} at {}", uniTag, subscriberPort);
688 DeviceId deviceId = subscriberPort.deviceId();
alshabibbf23a1f2016-01-14 17:27:11 -0800689
Tunahan Sezena07fe962021-02-24 08:24:24 +0000690 CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
691 CompletableFuture<ObjectiveError> upFuture = new CompletableFuture<>();
alshabibbf23a1f2016-01-14 17:27:11 -0800692
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000693 VlanId deviceVlan = uniTag.getPonSTag();
694 VlanId subscriberVlan = uniTag.getPonCTag();
Gamze Abaka641fc072018-09-04 09:16:27 +0000695
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000696 MeterId upstreamMeterId = oltMeterService
697 .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamBandwidthProfile());
698 MeterId downstreamMeterId = oltMeterService
699 .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamBandwidthProfile());
Gamze Abaka641fc072018-09-04 09:16:27 +0000700
Tunahan Sezena07fe962021-02-24 08:24:24 +0000701 Optional<SubscriberFlowInfo> waitingMacSubFlowInfo =
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000702 getAndRemoveWaitingMacSubFlowInfoForCTag(new ConnectPoint(deviceId, subscriberPort.number()),
703 subscriberVlan);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000704 if (waitingMacSubFlowInfo.isPresent()) {
705 // only dhcp objectives applied previously, so only dhcp uninstallation objective will be processed
706 log.debug("Waiting MAC service removed and dhcp uninstallation objective will be processed. " +
707 "waitingMacSubFlowInfo:{}", waitingMacSubFlowInfo.get());
708 CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000709 oltFlowService.processDhcpFilteringObjectives(subscriberPort,
Tunahan Sezena07fe962021-02-24 08:24:24 +0000710 upstreamMeterId, uniTag, false, true, Optional.of(dhcpFuture));
711 dhcpFuture.thenAcceptAsync(dhcpStatus -> {
712 AccessDeviceEvent.Type type;
713 if (dhcpStatus == null) {
714 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTERED;
715 log.debug("Dhcp uninstallation objective was processed successfully for cTag {}, sTag {}, " +
716 "tpId {} and Device/Port:{}", uniTag.getPonCTag(), uniTag.getPonSTag(),
717 uniTag.getTechnologyProfileId(), subscriberPort);
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000718 updateProgrammedSubscriber(subscriberPort, uniTag, false);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000719 } else {
720 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
721 log.error("Dhcp uninstallation objective was failed for cTag {}, sTag {}, " +
722 "tpId {} and Device/Port:{} :{}", uniTag.getPonCTag(), uniTag.getPonSTag(),
723 uniTag.getTechnologyProfileId(), subscriberPort, dhcpStatus);
724 }
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000725 post(new AccessDeviceEvent(type, deviceId, subscriberPort.port(),
Tunahan Sezena07fe962021-02-24 08:24:24 +0000726 deviceVlan, subscriberVlan, uniTag.getTechnologyProfileId()));
727 });
728 return;
729 } else {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000730 log.debug("There is no waiting MAC service for {} and subscriberVlan: {}", subscriberPort, subscriberVlan);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000731 }
732
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000733 ForwardingObjective.Builder upFwd =
734 oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, uniTag);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000735
736 Optional<MacAddress> macAddress = getMacAddress(deviceId, subscriberPort, uniTag);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000737 ForwardingObjective.Builder downFwd =
Tunahan Sezena07fe962021-02-24 08:24:24 +0000738 oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, uniTag, macAddress);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000739
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000740 oltFlowService.processIgmpFilteringObjectives(subscriberPort, upstreamMeterId, uniTag, false, true);
741 oltFlowService.processDhcpFilteringObjectives(subscriberPort,
742 upstreamMeterId, uniTag, false, true, Optional.empty());
743 oltFlowService.processPPPoEDFilteringObjectives(subscriberPort, upstreamMeterId, uniTag, false, true);
alshabibbf23a1f2016-01-14 17:27:11 -0800744
alshabib4ceaed32016-03-03 18:00:58 -0800745 flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
746 @Override
747 public void onSuccess(Objective objective) {
748 upFuture.complete(null);
749 }
alshabibbf23a1f2016-01-14 17:27:11 -0800750
alshabib4ceaed32016-03-03 18:00:58 -0800751 @Override
752 public void onError(Objective objective, ObjectiveError error) {
753 upFuture.complete(error);
754 }
755 }));
756
757 flowObjectiveService.forward(deviceId, downFwd.remove(new ObjectiveContext() {
758 @Override
759 public void onSuccess(Objective objective) {
760 downFuture.complete(null);
761 }
762
763 @Override
764 public void onError(Objective objective, ObjectiveError error) {
765 downFuture.complete(error);
766 }
767 }));
alshabibbf23a1f2016-01-14 17:27:11 -0800768
769 upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000770 AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTERED;
alshabibbf23a1f2016-01-14 17:27:11 -0800771 if (upStatus == null && downStatus == null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000772 log.debug("Uni tag information is unregistered successfully for cTag {}, sTag {}, tpId {} on {}",
773 uniTag.getPonCTag(), uniTag.getPonSTag(), uniTag.getTechnologyProfileId(), subscriberPort);
774 updateProgrammedSubscriber(subscriberPort, uniTag, false);
alshabibbf23a1f2016-01-14 17:27:11 -0800775 } else if (downStatus != null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000776 log.error("Subscriber with vlan {} on {} failed downstream uninstallation: {}",
777 subscriberVlan, subscriberPort, downStatus);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000778 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
alshabibbf23a1f2016-01-14 17:27:11 -0800779 } else if (upStatus != null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000780 log.error("Subscriber with vlan {} on {} failed upstream uninstallation: {}",
781 subscriberVlan, subscriberPort, upStatus);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000782 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
alshabibbf23a1f2016-01-14 17:27:11 -0800783 }
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000784 post(new AccessDeviceEvent(type, deviceId, subscriberPort.port(), deviceVlan, subscriberVlan,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000785 uniTag.getTechnologyProfileId()));
alshabibbf23a1f2016-01-14 17:27:11 -0800786 }, oltInstallers);
Jonathan Harte533a422015-10-20 17:31:24 -0700787 }
788
Tunahan Sezena07fe962021-02-24 08:24:24 +0000789 private Optional<SubscriberFlowInfo> getAndRemoveWaitingMacSubFlowInfoForCTag(ConnectPoint cp, VlanId cTag) {
790 SubscriberFlowInfo returnSubFlowInfo = null;
791 Collection<? extends SubscriberFlowInfo> subFlowInfoSet = waitingMacSubscribers.get(cp).value();
792 for (SubscriberFlowInfo subFlowInfo : subFlowInfoSet) {
793 if (subFlowInfo.getTagInfo().getPonCTag().equals(cTag)) {
794 returnSubFlowInfo = subFlowInfo;
795 break;
796 }
797 }
798 if (returnSubFlowInfo != null) {
799 waitingMacSubscribers.remove(cp, returnSubFlowInfo);
800 return Optional.of(returnSubFlowInfo);
801 }
802 return Optional.empty();
803 }
804
Gamze Abaka838d8142019-02-21 07:06:55 +0000805 /**
Gamze Abaka33feef52019-02-27 08:16:47 +0000806 * Adds subscriber vlan flows, dhcp, eapol and igmp trap flows for the related uni port.
Gamze Abaka838d8142019-02-21 07:06:55 +0000807 *
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000808 * @param subPort the connection point of the subscriber
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000809 * @param uplinkPort uplink port of the OLT (the nni port)
810 * @param sub subscriber information that includes s, c tags, tech profile and bandwidth profile references
Gamze Abaka838d8142019-02-21 07:06:55 +0000811 */
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000812 private void provisionUniTagList(AccessDevicePort subPort, AccessDevicePort uplinkPort,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000813 SubscriberAndDeviceInformation sub) {
Gamze Abaka641fc072018-09-04 09:16:27 +0000814
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000815 log.debug("Provisioning vlans for subscriber on {}", subPort);
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700816 if (log.isTraceEnabled()) {
817 log.trace("Subscriber informations {}", sub);
818 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000819
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000820 if (sub.uniTagList() == null || sub.uniTagList().isEmpty()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000821 log.warn("Unitaglist doesn't exist for the subscriber {} on {}", sub.id(), subPort);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000822 return;
823 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000824
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000825 for (UniTagInformation uniTag : sub.uniTagList()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000826 handleSubscriberFlows(uplinkPort, subPort, uniTag);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000827 }
828 }
alshabib3ea82642016-01-12 18:06:53 -0800829
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000830 /**
831 * Finds the uni tag information and provisions the found information.
832 * If the uni tag information is not found, returns
833 *
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000834 * @param uplinkPort the nni port
835 * @param subscriberPort the uni port
836 * @param innerVlan the pon c tag
837 * @param outerVlan the pon s tag
838 * @param tpId the technology profile id
839 */
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000840 private void provisionUniTagInformation(AccessDevicePort uplinkPort,
841 AccessDevicePort subscriberPort,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000842 VlanId innerVlan,
843 VlanId outerVlan,
844 Integer tpId) {
Jonathan Harte533a422015-10-20 17:31:24 -0700845
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000846 Optional<UniTagInformation> gotTagInformation = getUniTagInformation(subscriberPort, innerVlan,
847 outerVlan, tpId);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000848 if (!gotTagInformation.isPresent()) {
849 return;
850 }
851 UniTagInformation tagInformation = gotTagInformation.get();
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000852 handleSubscriberFlows(uplinkPort, subscriberPort, tagInformation);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000853 }
alshabib3ea82642016-01-12 18:06:53 -0800854
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000855 private void updateProgrammedSubscriber(AccessDevicePort port, UniTagInformation tagInformation, boolean add) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800856 if (add) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000857 programmedSubs.put(new ConnectPoint(port.deviceId(), port.number()), tagInformation);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800858 } else {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000859 programmedSubs.remove(new ConnectPoint(port.deviceId(), port.number()), tagInformation);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800860 }
Jonathan Harte533a422015-10-20 17:31:24 -0700861 }
862
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000863 /**
864 * Installs a uni tag information flow.
865 *
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000866 * @param uplinkPort the nni port
867 * @param subscriberPort the uni port
868 * @param tagInfo the uni tag information
869 */
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000870 private void handleSubscriberFlows(AccessDevicePort uplinkPort, AccessDevicePort subscriberPort,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000871 UniTagInformation tagInfo) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000872 log.debug("Provisioning vlan-based flows for the uniTagInformation {} on {}", tagInfo, subscriberPort);
873 DeviceId deviceId = subscriberPort.deviceId();
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000874
875 if (multicastServiceName.equals(tagInfo.getServiceName())) {
876 // IGMP flows are taken care of along with VOD service
877 // Please note that for each service, Subscriber Registered event will be sent
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000878 post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED, deviceId,
879 subscriberPort.port(), tagInfo.getPonSTag(), tagInfo.getPonCTag(),
880 tagInfo.getTechnologyProfileId()));
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000881 return;
Gamze Abaka641fc072018-09-04 09:16:27 +0000882 }
883
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000884 BandwidthProfileInformation upstreamBpInfo =
885 getBandwidthProfileInformation(tagInfo.getUpstreamBandwidthProfile());
886 BandwidthProfileInformation downstreamBpInfo =
887 getBandwidthProfileInformation(tagInfo.getDownstreamBandwidthProfile());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700888 if (upstreamBpInfo == null) {
889 log.warn("No meter installed since no Upstream BW Profile definition found for "
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000890 + "ctag {} stag {} tpId {} on {}",
891 tagInfo.getPonCTag(), tagInfo.getPonSTag(), tagInfo.getTechnologyProfileId(), subscriberPort);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700892 return;
893 }
894 if (downstreamBpInfo == null) {
895 log.warn("No meter installed since no Downstream BW Profile definition found for "
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000896 + "ctag {} stag {} tpId {} on {}",
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700897 tagInfo.getPonCTag(), tagInfo.getPonSTag(),
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000898 tagInfo.getTechnologyProfileId(), subscriberPort);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700899 return;
900 }
Gamze Abakaf59c0912019-04-19 08:24:28 +0000901
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700902 // check for meterIds for the upstream and downstream bandwidth profiles
903 MeterId upMeterId = oltMeterService
904 .getMeterIdFromBpMapping(deviceId, upstreamBpInfo.id());
905 MeterId downMeterId = oltMeterService
906 .getMeterIdFromBpMapping(deviceId, downstreamBpInfo.id());
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000907 SubscriberFlowInfo fi = new SubscriberFlowInfo(uplinkPort, subscriberPort,
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700908 tagInfo, downMeterId, upMeterId,
909 downstreamBpInfo.id(), upstreamBpInfo.id());
910
911 if (upMeterId != null && downMeterId != null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000912 log.debug("Meters are existing for upstream {} and downstream {} on {}",
913 upstreamBpInfo.id(), downstreamBpInfo.id(), subscriberPort);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700914 handleSubFlowsWithMeters(fi);
915 } else {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000916 log.debug("Adding {} on {} to pending subs", fi, subscriberPort);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700917 // one or both meters are not ready. It's possible they are in the process of being
918 // created for other subscribers that share the same bandwidth profile.
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100919 pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
920 if (queue == null) {
921 queue = new LinkedBlockingQueue<>();
922 }
923 queue.add(fi);
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000924 log.info("Added {} to pending subscribers on {}", fi, subscriberPort);
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100925 return queue;
926 });
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700927
928 // queue up the meters to be created
929 if (upMeterId == null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000930 log.debug("Missing meter for upstream {} on {}", upstreamBpInfo.id(), subscriberPort);
Andrea Campanella600d2e22020-06-22 11:00:31 +0200931 checkAndCreateDevMeter(deviceId, upstreamBpInfo);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700932 }
933 if (downMeterId == null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000934 log.debug("Missing meter for downstream {} on {}", downstreamBpInfo.id(), subscriberPort);
Andrea Campanella600d2e22020-06-22 11:00:31 +0200935 checkAndCreateDevMeter(deviceId, downstreamBpInfo);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700936 }
937 }
938 }
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000939
Andrea Campanella600d2e22020-06-22 11:00:31 +0200940 private void checkAndCreateDevMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
Andrea Campanellad1e26642020-10-23 12:08:32 +0200941 //If false the meter is already being installed, skipping installation
942 if (!oltMeterService.checkAndAddPendingMeter(deviceId, bwpInfo)) {
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700943 return;
944 }
Andrea Campanella600d2e22020-06-22 11:00:31 +0200945 createMeter(deviceId, bwpInfo);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700946 }
947
Andrea Campanella600d2e22020-06-22 11:00:31 +0200948 private void createMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700949 log.debug("Creating Meter with {} on {}", bwpInfo, deviceId);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700950 CompletableFuture<Object> meterFuture = new CompletableFuture<>();
Andrea Campanella3ce4d282020-06-09 13:46:58 +0200951
Andrea Campanella600d2e22020-06-22 11:00:31 +0200952 MeterId meterId = oltMeterService.createMeter(deviceId, bwpInfo,
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700953 meterFuture);
954
955 meterFuture.thenAcceptAsync(result -> {
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100956 BlockingQueue<SubscriberFlowInfo> queue = pendingSubscribersForDevice.get(deviceId);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700957 // iterate through the subscribers on hold
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100958 if (queue != null) {
959 while (true) {
960 //TODO this might return the reference and not the actual object so
961 // it can be actually swapped underneath us.
962 SubscriberFlowInfo fi = queue.peek();
963 if (fi == null) {
964 log.debug("No more subscribers pending on {}", deviceId);
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000965 pendingSubscribersForDevice.replace(deviceId, queue);
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100966 break;
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700967 }
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100968 if (result == null) {
969 // meter install sent to device
970 log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
971
972 MeterId upMeterId = oltMeterService
973 .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
974 MeterId downMeterId = oltMeterService
975 .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
976 if (upMeterId != null && downMeterId != null) {
977 log.debug("Provisioning subscriber after meter {} " +
978 "installation and both meters are present " +
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000979 "upstream {} and downstream {} on {}",
980 meterId, upMeterId, downMeterId, fi.getUniPort());
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100981 // put in the meterIds because when fi was first
982 // created there may or may not have been a meterId
983 // depending on whether the meter was created or
984 // not at that time.
985 fi.setUpMeterId(upMeterId);
986 fi.setDownMeterId(downMeterId);
987 handleSubFlowsWithMeters(fi);
988 queue.remove(fi);
989 }
990 oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
991 } else {
992 // meter install failed
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000993 log.error("Addition of subscriber {} on {} failed due to meter " +
994 "{} with result {}", fi, fi.getUniPort(), meterId, result);
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100995 queue.remove(fi);
996 oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
997 }
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700998 }
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100999 } else {
1000 log.info("No pending subscribers on {}", deviceId);
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001001 }
1002 });
Andrea Campanella7a1d7e72020-11-05 10:40:10 +01001003
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001004 }
Ilayda Ozdemir90a93622021-02-25 09:40:58 +00001005
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001006 /**
1007 * Add subscriber flows given meter information for both upstream and
1008 * downstream directions.
1009 *
1010 * @param subscriberFlowInfo relevant information for subscriber
1011 */
1012 private void handleSubFlowsWithMeters(SubscriberFlowInfo subscriberFlowInfo) {
Tunahan Sezena07fe962021-02-24 08:24:24 +00001013 log.debug("Provisioning subscriber flows based on {}", subscriberFlowInfo);
1014 UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
1015 if (tagInfo.getIsDhcpRequired()) {
1016 Optional<MacAddress> macAddress =
1017 getMacAddress(subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort(), tagInfo);
1018 if (subscriberFlowInfo.getTagInfo().getEnableMacLearning()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001019 ConnectPoint cp = new ConnectPoint(subscriberFlowInfo.getDevId(),
1020 subscriberFlowInfo.getUniPort().number());
Tunahan Sezena07fe962021-02-24 08:24:24 +00001021 if (macAddress.isPresent()) {
1022 log.debug("MAC Address {} obtained for {}", macAddress.get(), subscriberFlowInfo);
1023 } else {
1024 waitingMacSubscribers.put(cp, subscriberFlowInfo);
1025 log.debug("Adding sub to waiting mac map: {}", subscriberFlowInfo);
1026 }
1027
1028 CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001029 oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getUniPort(),
1030 subscriberFlowInfo.getUpId(), tagInfo, true, true, Optional.of(dhcpFuture));
Tunahan Sezena07fe962021-02-24 08:24:24 +00001031 dhcpFuture.thenAcceptAsync(dhcpStatus -> {
1032 if (dhcpStatus != null) {
1033 log.error("Dhcp Objective failed for {}: {}", subscriberFlowInfo, dhcpStatus);
1034 if (macAddress.isEmpty()) {
1035 waitingMacSubscribers.remove(cp, subscriberFlowInfo);
1036 }
1037 post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED,
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001038 subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort().port(),
Tunahan Sezena07fe962021-02-24 08:24:24 +00001039 tagInfo.getPonSTag(), tagInfo.getPonCTag(), tagInfo.getTechnologyProfileId()));
1040 } else {
1041 log.debug("Dhcp Objective success for: {}", subscriberFlowInfo);
1042 if (macAddress.isPresent()) {
1043 continueProvisioningSubs(subscriberFlowInfo, macAddress);
1044 }
1045 }
1046 });
1047 } else {
1048 log.debug("Dynamic MAC Learning disabled, so will not learn for: {}", subscriberFlowInfo);
1049 // dhcp flows will handle after data plane flows
1050 continueProvisioningSubs(subscriberFlowInfo, macAddress);
1051 }
1052 } else {
1053 // dhcp not required for this service
1054 continueProvisioningSubs(subscriberFlowInfo, Optional.empty());
1055 }
1056 }
1057
1058 private void continueProvisioningSubs(SubscriberFlowInfo subscriberFlowInfo, Optional<MacAddress> macAddress) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001059 AccessDevicePort uniPort = subscriberFlowInfo.getUniPort();
1060 log.debug("Provisioning subscriber flows on {} based on {}", uniPort, subscriberFlowInfo);
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001061 UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001062 CompletableFuture<ObjectiveError> upFuture = new CompletableFuture<>();
1063 CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
Gamze Abakaf59c0912019-04-19 08:24:28 +00001064
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001065 ForwardingObjective.Builder upFwd =
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001066 oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), uniPort,
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001067 subscriberFlowInfo.getUpId(), subscriberFlowInfo.getTagInfo());
1068 flowObjectiveService.forward(subscriberFlowInfo.getDevId(), upFwd.add(new ObjectiveContext() {
1069 @Override
1070 public void onSuccess(Objective objective) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001071 log.debug("Upstream HSIA flow {} installed successfully on {}", subscriberFlowInfo, uniPort);
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001072 upFuture.complete(null);
Gamze Abakaf59c0912019-04-19 08:24:28 +00001073 }
Gamze Abakaf59c0912019-04-19 08:24:28 +00001074
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001075 @Override
1076 public void onError(Objective objective, ObjectiveError error) {
1077 upFuture.complete(error);
Gamze Abakaf59c0912019-04-19 08:24:28 +00001078 }
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001079 }));
1080
1081 ForwardingObjective.Builder downFwd =
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001082 oltFlowService.createDownBuilder(subscriberFlowInfo.getNniPort(), uniPort,
Tunahan Sezena07fe962021-02-24 08:24:24 +00001083 subscriberFlowInfo.getDownId(), subscriberFlowInfo.getTagInfo(), macAddress);
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001084 flowObjectiveService.forward(subscriberFlowInfo.getDevId(), downFwd.add(new ObjectiveContext() {
1085 @Override
1086 public void onSuccess(Objective objective) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001087 log.debug("Downstream HSIA flow {} installed successfully on {}", subscriberFlowInfo, uniPort);
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001088 downFuture.complete(null);
1089 }
1090
1091 @Override
1092 public void onError(Objective objective, ObjectiveError error) {
1093 downFuture.complete(error);
1094 }
1095 }));
Gamze Abakaf59c0912019-04-19 08:24:28 +00001096
Amit Ghoshe1d3f092018-10-09 19:44:33 +01001097 upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001098 AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED;
Amit Ghoshe1d3f092018-10-09 19:44:33 +01001099 if (downStatus != null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001100 log.error("Flow with innervlan {} and outerVlan {} on {} failed downstream installation: {}",
1101 tagInfo.getPonCTag(), tagInfo.getPonSTag(), uniPort, downStatus);
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001102 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
Amit Ghoshe1d3f092018-10-09 19:44:33 +01001103 } else if (upStatus != null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001104 log.error("Flow with innervlan {} and outerVlan {} on {} failed upstream installation: {}",
1105 tagInfo.getPonCTag(), tagInfo.getPonSTag(), uniPort, upStatus);
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001106 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
Gamze Abakaf59c0912019-04-19 08:24:28 +00001107 } else {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001108 log.debug("Upstream and downstream data plane flows are installed successfully on {}", uniPort);
1109 oltFlowService.processEapolFilteringObjectives(uniPort, tagInfo.getUpstreamBandwidthProfile(),
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001110 null, tagInfo.getPonCTag(), true);
Tunahan Sezena07fe962021-02-24 08:24:24 +00001111
1112
1113 if (!tagInfo.getEnableMacLearning()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001114 oltFlowService.processDhcpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
Tunahan Sezena07fe962021-02-24 08:24:24 +00001115 tagInfo, true, true, Optional.empty());
1116 }
Gamze Abakaf59c0912019-04-19 08:24:28 +00001117
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001118 oltFlowService.processIgmpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001119 tagInfo, true, true);
Gustavo Silva5c492dd2021-02-12 10:21:11 -03001120
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001121 oltFlowService.processPPPoEDFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
Gustavo Silva5c492dd2021-02-12 10:21:11 -03001122 tagInfo, true, true);
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001123 updateProgrammedSubscriber(uniPort, tagInfo, true);
Amit Ghoshe1d3f092018-10-09 19:44:33 +01001124 }
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001125 post(new AccessDeviceEvent(type, subscriberFlowInfo.getDevId(), uniPort.port(),
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001126 tagInfo.getPonSTag(), tagInfo.getPonCTag(),
1127 tagInfo.getTechnologyProfileId()));
Amit Ghoshe1d3f092018-10-09 19:44:33 +01001128 }, oltInstallers);
Amit Ghoshe1d3f092018-10-09 19:44:33 +01001129 }
1130
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001131 /**
Tunahan Sezena07fe962021-02-24 08:24:24 +00001132 * Gets mac address from tag info if present, else checks the host service.
1133 *
1134 * @param deviceId device ID
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001135 * @param port uni port
Tunahan Sezena07fe962021-02-24 08:24:24 +00001136 * @param tagInformation tag info
1137 * @return MAC Address of subscriber
1138 */
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001139 private Optional<MacAddress> getMacAddress(DeviceId deviceId, AccessDevicePort port,
Tunahan Sezena07fe962021-02-24 08:24:24 +00001140 UniTagInformation tagInformation) {
1141 if (isMacAddressValid(tagInformation)) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001142 log.debug("Got MAC Address {} from the uniTagInformation for {} and cTag {}",
1143 tagInformation.getConfiguredMacAddress(), port, tagInformation.getPonCTag());
Tunahan Sezena07fe962021-02-24 08:24:24 +00001144 return Optional.of(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
1145 } else if (tagInformation.getEnableMacLearning()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001146 Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
Tunahan Sezena07fe962021-02-24 08:24:24 +00001147 .stream().filter(host -> host.vlan().equals(tagInformation.getPonCTag())).findFirst();
1148 if (optHost.isPresent()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001149 log.debug("Got MAC Address {} from the hostService for {} and cTag {}",
1150 optHost.get().mac(), port, tagInformation.getPonCTag());
Tunahan Sezena07fe962021-02-24 08:24:24 +00001151 return Optional.of(optHost.get().mac());
1152 }
1153 }
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001154 log.debug("Could not obtain MAC Address for {} and cTag {}", port, tagInformation.getPonCTag());
Tunahan Sezena07fe962021-02-24 08:24:24 +00001155 return Optional.empty();
1156 }
1157
1158 private boolean isMacAddressValid(UniTagInformation tagInformation) {
1159 return tagInformation.getConfiguredMacAddress() != null &&
1160 !tagInformation.getConfiguredMacAddress().trim().equals("") &&
1161 !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
1162 }
1163
1164 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001165 * Checks the subscriber uni tag list and find the uni tag information.
1166 * using the pon c tag, pon s tag and the technology profile id
1167 * May return Optional<null>
1168 *
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001169 * @param port port of the subscriber
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001170 * @param innerVlan pon c tag
1171 * @param outerVlan pon s tag
1172 * @param tpId the technology profile id
1173 * @return the found uni tag information
1174 */
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001175 private Optional<UniTagInformation> getUniTagInformation(AccessDevicePort port, VlanId innerVlan,
1176 VlanId outerVlan, int tpId) {
1177 log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
1178 port, innerVlan, outerVlan, tpId);
1179 SubscriberAndDeviceInformation subInfo = getSubscriber(new ConnectPoint(port.deviceId(), port.number()));
Gamze Abakaf59c0912019-04-19 08:24:28 +00001180 if (subInfo == null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001181 log.warn("Subscriber information doesn't exist for {}", port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001182 return Optional.empty();
Gamze Abakaf59c0912019-04-19 08:24:28 +00001183 }
1184
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001185 List<UniTagInformation> uniTagList = subInfo.uniTagList();
1186 if (uniTagList == null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001187 log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001188 return Optional.empty();
1189 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +01001190
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001191 UniTagInformation service = null;
1192 for (UniTagInformation tagInfo : subInfo.uniTagList()) {
1193 if (innerVlan.equals(tagInfo.getPonCTag()) && outerVlan.equals(tagInfo.getPonSTag())
1194 && tpId == tagInfo.getTechnologyProfileId()) {
1195 service = tagInfo;
1196 break;
Andy Bavier160e8682019-05-07 18:32:22 -07001197 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +00001198 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +00001199
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001200 if (service == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -07001201 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001202 innerVlan, outerVlan, tpId, port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001203 return Optional.empty();
Gamze Abaka33feef52019-02-27 08:16:47 +00001204 }
1205
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001206 return Optional.of(service);
Amit Ghosh95e2f652017-08-23 12:49:46 +01001207 }
1208
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001209 /**
Jonathan Hart403372d2018-08-22 11:44:13 -07001210 * Creates trap flows for device, including DHCP and LLDP trap on NNI and
1211 * EAPOL trap on the UNIs, if device is present in Sadis config.
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001212 *
1213 * @param dev Device to look for
1214 */
Jonathan Hart403372d2018-08-22 11:44:13 -07001215 private void checkAndCreateDeviceFlows(Device dev) {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001216 // check if this device is provisioned in Sadis
Gamze Abaka641fc072018-09-04 09:16:27 +00001217 SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001218 log.info("checkAndCreateDeviceFlows: deviceInfo {}", deviceInfo);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001219
1220 if (deviceInfo != null) {
Andrea Campanella438e1ad2021-03-26 11:41:16 +01001221 log.debug("Driver for device {} is {}", dev.id(),
1222 driverService.getDriver(dev.id()));
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001223 for (Port p : deviceService.getPorts(dev.id())) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001224 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001225 continue;
1226 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001227 if (isUniPort(dev, p)) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001228 AccessDevicePort port = new AccessDevicePort(p, AccessDevicePort.Type.UNI);
Andrea Campanellaa2491782020-03-13 18:09:31 +01001229 if (!programmedSubs.containsKey(new ConnectPoint(dev.id(), p.number()))) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001230 log.info("Creating Eapol on {}", port);
1231 oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
Andrea Campanellaa2491782020-03-13 18:09:31 +01001232 VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
1233 } else {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001234 log.debug("Subscriber Eapol on {} is already provisioned, not installing default", port);
Andrea Campanellaa2491782020-03-13 18:09:31 +01001235 }
Jonathan Hart403372d2018-08-22 11:44:13 -07001236 } else {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001237 AccessDevicePort port = new AccessDevicePort(p, AccessDevicePort.Type.NNI);
1238 oltFlowService.processNniFilteringObjectives(port, true);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001239 }
1240 }
1241 }
1242 }
1243
Jonathan Hart403372d2018-08-22 11:44:13 -07001244
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001245 /**
1246 * Get the uplink for of the OLT device.
Gamze Abakaad329652018-12-20 10:12:21 +00001247 * <p>
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001248 * This assumes that the OLT has a single uplink port. When more uplink ports need to be supported
1249 * this logic needs to be changed
1250 *
1251 * @param dev Device to look for
1252 * @return The uplink Port of the OLT
1253 */
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001254 private AccessDevicePort getUplinkPort(Device dev) {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001255 // check if this device is provisioned in Sadis
Gamze Abaka641fc072018-09-04 09:16:27 +00001256 SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
Saurav Daseae48de2019-06-19 13:26:15 -07001257 log.trace("getUplinkPort: deviceInfo {}", deviceInfo);
Saurav Das82b8e6d2018-10-04 15:25:12 -07001258 if (deviceInfo == null) {
1259 log.warn("Device {} is not configured in SADIS .. cannot fetch device"
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001260 + " info", dev.id());
Saurav Das82b8e6d2018-10-04 15:25:12 -07001261 return null;
1262 }
1263 // Return the port that has been configured as the uplink port of this OLT in Sadis
kdarapuaa5da252020-04-10 15:58:05 +05301264 Optional<Port> optionalPort = deviceService.getPorts(dev.id()).stream()
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001265 .filter(port -> isNniPort(port) ||
1266 (port.number().toLong() == deviceInfo.uplinkPort()))
1267 .findFirst();
kdarapuaa5da252020-04-10 15:58:05 +05301268 if (optionalPort.isPresent()) {
1269 log.trace("getUplinkPort: Found port {}", optionalPort.get());
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001270 return new AccessDevicePort(optionalPort.get(), AccessDevicePort.Type.NNI);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001271 }
1272
Saurav Daseae48de2019-06-19 13:26:15 -07001273 log.warn("getUplinkPort: " + NO_UPLINK_PORT, dev.id());
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001274 return null;
1275 }
1276
1277 /**
1278 * Return the subscriber on a port.
1279 *
Matteo Scandolo962a6ad2018-12-11 15:39:42 -08001280 * @param cp ConnectPoint on which to find the subscriber
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001281 * @return subscriber if found else null
1282 */
Ilayda Ozdemir90a93622021-02-25 09:40:58 +00001283 protected SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
1284 if (subsService == null) {
1285 log.warn(SADIS_NOT_RUNNING);
1286 return null;
1287 }
Matteo Scandolo962a6ad2018-12-11 15:39:42 -08001288 Port port = deviceService.getPort(cp);
1289 checkNotNull(port, "Invalid connect point");
1290 String portName = port.annotations().value(AnnotationKeys.PORT_NAME);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001291 return subsService.get(portName);
1292 }
1293
Gamze Abakaad329652018-12-20 10:12:21 +00001294 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001295 * Checks whether the given port of the device is a uni port or not.
1296 *
1297 * @param d the access device
1298 * @param p the port of the device
1299 * @return true if the given port is a uni port
Gamze Abakaad329652018-12-20 10:12:21 +00001300 */
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001301 private boolean isUniPort(Device d, Port p) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001302 AccessDevicePort ulPort = getUplinkPort(d);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001303 if (ulPort != null) {
1304 return (ulPort.number().toLong() != p.number().toLong());
1305 }
Thomas Lee Sd7735f92020-02-20 19:21:47 +05301306 //handles a special case where NNI port is misconfigured in SADIS and getUplinkPort(d) returns null
1307 //checks whether the port name starts with nni- which is the signature of an NNI Port
1308 if (p.annotations().value(AnnotationKeys.PORT_NAME) != null &&
1309 p.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI)) {
1310 log.error("NNI port number {} is not matching with configured value", p.number().toLong());
1311 return false;
1312 }
1313 return true;
Jonathan Hart1d34c8b2018-05-05 15:37:28 -07001314 }
1315
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001316 /**
1317 * Gets the given device details from SADIS.
1318 * If the device is not found, returns null
1319 *
1320 * @param dev the access device
1321 * @return the olt information
1322 */
Jonathan Hart4c538002018-08-23 10:11:54 -07001323 private SubscriberAndDeviceInformation getOltInfo(Device dev) {
Ilayda Ozdemir90a93622021-02-25 09:40:58 +00001324 if (subsService == null) {
1325 log.warn(SADIS_NOT_RUNNING);
1326 return null;
1327 }
Jonathan Hart4c538002018-08-23 10:11:54 -07001328 String devSerialNo = dev.serialNumber();
Gamze Abaka641fc072018-09-04 09:16:27 +00001329 return subsService.get(devSerialNo);
Jonathan Hart4c538002018-08-23 10:11:54 -07001330 }
1331
Andrea Campanella1edf8832021-05-06 12:51:33 +02001332 /**
1333 * Checks for mastership or falls back to leadership on deviceId.
1334 * If the device is available use mastership,
1335 * otherwise fallback on leadership.
1336 * Leadership on the device topic is needed because the master can be NONE
1337 * in case the device went away, we still need to handle events
1338 * consistently
1339 */
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001340 private boolean isLocalLeader(DeviceId deviceId) {
Andrea Campanella1edf8832021-05-06 12:51:33 +02001341 if (deviceService.isAvailable(deviceId)) {
1342 return mastershipService.isLocalMaster(deviceId);
1343 } else {
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001344 // Fallback with Leadership service - device id is used as topic
1345 NodeId leader = leadershipService.runForLeadership(
1346 deviceId.toString()).leaderNodeId();
1347 // Verify if this node is the leader
1348 return clusterService.getLocalNode().id().equals(leader);
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001349 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001350 }
1351
kdarapuaa5da252020-04-10 15:58:05 +05301352 private boolean isNniPort(Port port) {
1353 if (port.annotations().keys().contains(AnnotationKeys.PORT_NAME)) {
1354 return port.annotations().value(AnnotationKeys.PORT_NAME).contains(NNI);
1355 }
1356 return false;
1357 }
1358
Tunahan Sezena07fe962021-02-24 08:24:24 +00001359 private class InternalHostListener implements HostListener {
1360 @Override
1361 public void event(HostEvent event) {
1362 hostEventExecutor.execute(() -> {
1363 Host host = event.subject();
1364 switch (event.type()) {
1365 case HOST_ADDED:
1366 ConnectPoint cp = new ConnectPoint(host.location().deviceId(), host.location().port());
1367 Optional<SubscriberFlowInfo> optSubFlowInfo =
1368 getAndRemoveWaitingMacSubFlowInfoForCTag(cp, host.vlan());
1369 if (optSubFlowInfo.isPresent()) {
1370 log.debug("Continuing provisioning for waiting mac service. event: {}", event);
1371 continueProvisioningSubs(optSubFlowInfo.get(), Optional.of(host.mac()));
1372 } else {
1373 log.debug("There is no waiting mac sub. event: {}", event);
1374 }
1375 break;
1376 case HOST_UPDATED:
1377 if (event.prevSubject() != null && !event.prevSubject().mac().equals(event.subject().mac())) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001378 log.debug("Subscriber's MAC address changed from {} to {}. " +
1379 "devId/portNumber: {}/{} vlan: {}", event.prevSubject().mac(),
1380 event.subject().mac(), host.location().deviceId(), host.location().port(),
1381 host.vlan());
Tunahan Sezena07fe962021-02-24 08:24:24 +00001382 // TODO handle subscriber MAC Address changed
1383 } else {
1384 log.debug("Unhandled HOST_UPDATED event: {}", event);
1385 }
1386 break;
1387 default:
1388 log.debug("Unhandled host event received. event: {}", event);
1389 }
1390 });
1391 }
1392
1393 @Override
1394 public boolean isRelevant(HostEvent event) {
1395 return isLocalLeader(event.subject().location().deviceId());
1396 }
1397 }
1398
alshabibf0e7e702015-05-30 18:22:36 -07001399 private class InternalDeviceListener implements DeviceListener {
Saurav Dasa9d5f442019-03-06 19:32:48 -08001400 private Set<DeviceId> programmedDevices = Sets.newConcurrentHashSet();
1401
alshabibf0e7e702015-05-30 18:22:36 -07001402 @Override
1403 public void event(DeviceEvent event) {
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001404 eventExecutor.execute(() -> {
1405 DeviceId devId = event.subject().id();
1406 Device dev = event.subject();
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001407 Port p = event.port();
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001408 DeviceEvent.Type eventType = event.type();
Jonathan Hart4c538002018-08-23 10:11:54 -07001409
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001410 if (DeviceEvent.Type.PORT_STATS_UPDATED.equals(eventType) ||
1411 DeviceEvent.Type.DEVICE_SUSPENDED.equals(eventType) ||
1412 DeviceEvent.Type.DEVICE_UPDATED.equals(eventType)) {
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001413 return;
1414 }
Jonathan Hart4c538002018-08-23 10:11:54 -07001415
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001416 boolean isLocalLeader = isLocalLeader(devId);
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001417 // Only handle the event if the device belongs to us
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001418 if (!isLocalLeader && event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED)
1419 && !deviceService.isAvailable(devId) && deviceService.getPorts(devId).isEmpty()) {
1420 log.info("Cleaning local state for non master instance upon " +
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001421 "device disconnection {}", devId);
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001422 // Since no mastership of the device is present upon disconnection
1423 // the method in the FlowRuleManager only empties the local copy
1424 // of the DeviceFlowTable thus this method needs to get called
1425 // on every instance, see how it's done in the InternalDeviceListener
1426 // in FlowRuleManager: no mastership check for purgeOnDisconnection
Andrea Campanella3f34c992020-07-15 10:54:10 +02001427 handleDeviceDisconnection(dev, false, false);
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001428 return;
1429 } else if (!isLocalLeader) {
Andrea Campanella506df202020-05-21 10:26:12 +02001430 log.debug("Not handling event because instance is not leader for {}", devId);
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001431 return;
1432 }
1433
Matteo Scandolo19b56f62020-10-29 13:29:21 -07001434 log.debug("OLT got {} event for {}/{}", eventType, event.subject(), event.port());
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001435
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001436 if (getOltInfo(dev) == null) {
Saurav Dasa9d5f442019-03-06 19:32:48 -08001437 // it's possible that we got an event for a previously
1438 // programmed OLT that is no longer available in SADIS
1439 // we let such events go through
1440 if (!programmedDevices.contains(devId)) {
1441 log.warn("No device info found for {}, this is either "
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001442 + "not an OLT or not known to sadis", dev);
Saurav Dasa9d5f442019-03-06 19:32:48 -08001443 return;
1444 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001445 }
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001446 AccessDevicePort port = null;
1447 if (p != null) {
1448 if (isUniPort(dev, p)) {
1449 port = new AccessDevicePort(p, AccessDevicePort.Type.UNI);
1450 } else {
1451 port = new AccessDevicePort(p, AccessDevicePort.Type.NNI);
1452 }
1453 }
Jonathan Hart4c538002018-08-23 10:11:54 -07001454
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001455 switch (event.type()) {
1456 //TODO: Port handling and bookkeeping should be improved once
1457 // olt firmware handles correct behaviour.
1458 case PORT_ADDED:
Andrea Campanella3f34c992020-07-15 10:54:10 +02001459 if (!deviceService.isAvailable(devId)) {
1460 log.warn("Received {} for disconnected device {}, ignoring", event, devId);
1461 return;
1462 }
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001463 if (port.type().equals(AccessDevicePort.Type.UNI)) {
1464 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port.port()));
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001465
1466 if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001467 log.info("eapol will be sent for port added {}", port);
1468 oltFlowService.processEapolFilteringObjectives(port, defaultBpId,
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001469 null,
Andrea Campanella3f34c992020-07-15 10:54:10 +02001470 VlanId.vlanId(EAPOL_DEFAULT_VLAN),
1471 true);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001472 }
1473 } else {
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001474 SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
1475 if (deviceInfo != null) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001476 oltFlowService.processNniFilteringObjectives(port, true);
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001477 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001478 }
1479 break;
1480 case PORT_REMOVED:
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001481 if (port.type().equals(AccessDevicePort.Type.UNI)) {
Andrea Campanellacf0e3052020-08-27 11:05:39 +02001482 // if no subscriber is provisioned we need to remove the default EAPOL
1483 // if a subscriber was provisioned the default EAPOL will not be there and we can skip.
1484 // The EAPOL with subscriber tag will be removed by removeSubscriber call.
1485 Collection<? extends UniTagInformation> uniTagInformationSet =
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001486 programmedSubs.get(new ConnectPoint(port.deviceId(), port.number())).value();
Andrea Campanellacf0e3052020-08-27 11:05:39 +02001487 if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001488 log.info("No subscriber provisioned on port {} in PORT_REMOVED event, " +
1489 "removing default EAPOL flow", port);
1490 oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
Andrea Campanellacf0e3052020-08-27 11:05:39 +02001491 VlanId.vlanId(EAPOL_DEFAULT_VLAN),
1492 false);
1493 } else {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001494 removeSubscriber(port);
Andrea Campanellacf0e3052020-08-27 11:05:39 +02001495 }
Andy Bavier160e8682019-05-07 18:32:22 -07001496
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001497 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port.port()));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001498 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001499 break;
1500 case PORT_UPDATED:
Andrea Campanella3f34c992020-07-15 10:54:10 +02001501 if (!deviceService.isAvailable(devId)) {
1502 log.warn("Received {} for disconnected device {}, ignoring", event, devId);
1503 return;
1504 }
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001505 if (port.type().equals(AccessDevicePort.Type.NNI)) {
Saurav Das9da7d522020-03-23 19:14:35 -07001506 SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
1507 if (deviceInfo != null && port.isEnabled()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001508 log.debug("NNI {} enabled", port);
1509 oltFlowService.processNniFilteringObjectives(port, true);
Saurav Das9da7d522020-03-23 19:14:35 -07001510 }
1511 return;
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001512 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001513 ConnectPoint cp = new ConnectPoint(devId, port.number());
1514 Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(cp).value();
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001515 if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
Saurav Dasb776aef2020-03-09 14:29:46 -07001516 if (!port.number().equals(PortNumber.LOCAL)) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001517 log.info("eapol will be {} updated for {} with default vlan {}",
1518 (port.isEnabled()) ? "added" : "removed", port, EAPOL_DEFAULT_VLAN);
1519 oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001520 VlanId.vlanId(EAPOL_DEFAULT_VLAN),
1521 port.isEnabled());
1522 }
1523 } else {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001524 log.info("eapol will be {} updated for {}", (port.isEnabled()) ? "added" : "removed",
1525 port);
1526 for (UniTagInformation uniTag : uniTagInformationSet) {
1527 oltFlowService.processEapolFilteringObjectives(port,
1528 uniTag.getUpstreamBandwidthProfile(), null,
1529 uniTag.getPonCTag(), port.isEnabled());
1530 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001531 }
Gamze Abaka838d8142019-02-21 07:06:55 +00001532 if (port.isEnabled()) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001533 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port.port()));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001534 } else {
Tunahan Sezenf0843b92021-04-30 07:13:16 +00001535 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port.port()));
Jonathan Hart1d34c8b2018-05-05 15:37:28 -07001536 }
alshabibbb83aa22016-02-10 15:08:23 -08001537 break;
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001538 case DEVICE_ADDED:
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001539 handleDeviceConnection(dev, true);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001540 break;
1541 case DEVICE_REMOVED:
Andrea Campanella3f34c992020-07-15 10:54:10 +02001542 handleDeviceDisconnection(dev, true, true);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001543 break;
1544 case DEVICE_AVAILABILITY_CHANGED:
1545 if (deviceService.isAvailable(devId)) {
Saurav Dasbd3b6712020-03-31 23:28:35 -07001546 log.info("Handling available device: {}", dev.id());
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001547 handleDeviceConnection(dev, false);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001548 } else {
Hardik Windlassa58fbee2020-03-12 18:33:55 +05301549 if (deviceService.getPorts(devId).isEmpty()) {
Saurav Dasbd3b6712020-03-31 23:28:35 -07001550 log.info("Handling controlled device disconnection .. "
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001551 + "flushing all state for dev:{}", devId);
Andrea Campanella3f34c992020-07-15 10:54:10 +02001552 handleDeviceDisconnection(dev, true, false);
Saurav Dasbd3b6712020-03-31 23:28:35 -07001553 } else {
1554 log.info("Disconnected device has available ports .. "
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001555 + "assuming temporary disconnection, "
1556 + "retaining state for device {}", devId);
Hardik Windlassa58fbee2020-03-12 18:33:55 +05301557 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001558 }
1559 break;
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001560 default:
Andrea Campanella3f34c992020-07-15 10:54:10 +02001561 log.debug("Not handling event {}", event);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001562 return;
1563 }
1564 });
alshabibf0e7e702015-05-30 18:22:36 -07001565 }
Gamze Abaka838d8142019-02-21 07:06:55 +00001566
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001567 private void sendUniEvent(Device device, AccessDeviceEvent.Type eventType) {
1568 deviceService.getPorts(device.id()).stream()
1569 .filter(p -> !PortNumber.LOCAL.equals(p.number()))
1570 .filter(p -> isUniPort(device, p))
1571 .forEach(p -> post(new AccessDeviceEvent(eventType, device.id(), p)));
1572 }
1573
Andrea Campanella3f34c992020-07-15 10:54:10 +02001574 private void handleDeviceDisconnection(Device device, boolean sendDisconnectedEvent, boolean sendUniEvent) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001575 programmedDevices.remove(device.id());
1576 removeAllSubscribers(device.id());
Tunahan Sezena07fe962021-02-24 08:24:24 +00001577 removeWaitingMacSubs(device.id());
Andrea Campanella600d2e22020-06-22 11:00:31 +02001578 //Handle case where OLT disconnects during subscriber provisioning
Andrea Campanella7a1d7e72020-11-05 10:40:10 +01001579 pendingSubscribersForDevice.remove(device.id());
Andrea Campanella600d2e22020-06-22 11:00:31 +02001580 oltFlowService.clearDeviceState(device.id());
1581
1582 //Complete meter and flow purge
Hardik Windlassa58fbee2020-03-12 18:33:55 +05301583 flowRuleService.purgeFlowRules(device.id());
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001584 oltMeterService.clearMeters(device.id());
Andrea Campanella3f34c992020-07-15 10:54:10 +02001585 if (sendDisconnectedEvent) {
1586 post(new AccessDeviceEvent(
1587 AccessDeviceEvent.Type.DEVICE_DISCONNECTED, device.id(),
1588 null, null, null));
1589 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001590 if (sendUniEvent) {
1591 sendUniEvent(device, AccessDeviceEvent.Type.UNI_REMOVED);
Gamze Abaka838d8142019-02-21 07:06:55 +00001592 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001593 }
1594
1595 private void handleDeviceConnection(Device dev, boolean sendUniEvent) {
1596 post(new AccessDeviceEvent(
1597 AccessDeviceEvent.Type.DEVICE_CONNECTED, dev.id(),
1598 null, null, null));
1599 programmedDevices.add(dev.id());
1600 checkAndCreateDeviceFlows(dev);
1601 if (sendUniEvent) {
1602 sendUniEvent(dev, AccessDeviceEvent.Type.UNI_ADDED);
1603 }
Gamze Abaka838d8142019-02-21 07:06:55 +00001604 }
Gamze Abakada282b42019-03-11 13:16:48 +00001605
1606 private void removeAllSubscribers(DeviceId deviceId) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001607 List<Map.Entry<ConnectPoint, UniTagInformation>> subs = programmedSubs.stream()
1608 .filter(e -> e.getKey().deviceId().equals(deviceId))
1609 .collect(toList());
Gamze Abakada282b42019-03-11 13:16:48 +00001610
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001611 subs.forEach(e -> programmedSubs.remove(e.getKey(), e.getValue()));
Gamze Abakada282b42019-03-11 13:16:48 +00001612 }
Gamze Abaka641fc072018-09-04 09:16:27 +00001613
Tunahan Sezena07fe962021-02-24 08:24:24 +00001614 private void removeWaitingMacSubs(DeviceId deviceId) {
1615 List<ConnectPoint> waitingMacKeys = waitingMacSubscribers.stream()
1616 .filter(cp -> cp.getKey().deviceId().equals(deviceId))
1617 .map(Map.Entry::getKey)
1618 .collect(toList());
1619 waitingMacKeys.forEach(cp -> waitingMacSubscribers.removeAll(cp));
1620 }
1621
Gamze Abaka641fc072018-09-04 09:16:27 +00001622 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001623
1624 private class InternalClusterListener implements ClusterEventListener {
1625
1626 @Override
1627 public void event(ClusterEvent event) {
1628 if (event.type() == ClusterEvent.Type.INSTANCE_READY) {
1629 hasher.addServer(event.subject().id());
1630 }
1631 if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
1632 hasher.removeServer(event.subject().id());
1633 }
1634 }
1635 }
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001636
Hardik Windlass395ff372019-06-13 05:16:00 +00001637}