blob: b95bc087bf66868460d61fd4bc3d9e57cde1bd40 [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Brian O'Connord6a135a2017-08-03 22:46:05 -07002 * Copyright 2016-present Open Networking Foundation
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010018import com.google.common.collect.ImmutableMap;
19import com.google.common.collect.Sets;
alshabibf0e7e702015-05-30 18:22:36 -070020import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080021import org.onlab.util.KryoNamespace;
Gamze Abaka1f62dd92020-05-07 08:58:13 +000022import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080023import org.onosproject.cluster.ClusterEvent;
24import org.onosproject.cluster.ClusterEventListener;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.ControllerNode;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020027import org.onosproject.cluster.LeadershipService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080028import org.onosproject.cluster.NodeId;
alshabibf0e7e702015-05-30 18:22:36 -070029import org.onosproject.core.ApplicationId;
30import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080031import org.onosproject.event.AbstractListenerManager;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020032import org.onosproject.mastership.MastershipService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010033import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070034import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010035import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070036import org.onosproject.net.DeviceId;
alshabibdec2e252016-01-15 12:20:25 -080037import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070038import org.onosproject.net.PortNumber;
39import org.onosproject.net.device.DeviceEvent;
40import org.onosproject.net.device.DeviceListener;
41import org.onosproject.net.device.DeviceService;
Hardik Windlassa58fbee2020-03-12 18:33:55 +053042import org.onosproject.net.flow.FlowRuleService;
alshabibf0e7e702015-05-30 18:22:36 -070043import org.onosproject.net.flowobjective.FlowObjectiveService;
44import org.onosproject.net.flowobjective.ForwardingObjective;
alshabib3ea82642016-01-12 18:06:53 -080045import org.onosproject.net.flowobjective.Objective;
46import org.onosproject.net.flowobjective.ObjectiveContext;
47import org.onosproject.net.flowobjective.ObjectiveError;
Saurav Daseae48de2019-06-19 13:26:15 -070048import org.onosproject.net.meter.MeterId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080049import org.onosproject.store.serializers.KryoNamespaces;
50import org.onosproject.store.service.ConsistentMultimap;
51import org.onosproject.store.service.Serializer;
52import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070053import org.opencord.olt.AccessDeviceEvent;
54import org.opencord.olt.AccessDeviceListener;
55import org.opencord.olt.AccessDeviceService;
Amit Ghosh31939522018-08-16 13:28:21 +010056import org.opencord.olt.AccessSubscriberId;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000057import org.opencord.olt.internalapi.AccessDeviceFlowService;
58import org.opencord.olt.internalapi.AccessDeviceMeterService;
Gamze Abaka641fc072018-09-04 09:16:27 +000059import org.opencord.sadis.BandwidthProfileInformation;
60import org.opencord.sadis.BaseInformationService;
61import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010062import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000063import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080064import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070065import org.osgi.service.component.annotations.Activate;
66import org.osgi.service.component.annotations.Component;
67import org.osgi.service.component.annotations.Deactivate;
68import org.osgi.service.component.annotations.Modified;
69import org.osgi.service.component.annotations.Reference;
70import org.osgi.service.component.annotations.ReferenceCardinality;
Ilayda Ozdemir90a93622021-02-25 09:40:58 +000071import org.osgi.service.component.annotations.ReferencePolicy;
alshabibf0e7e702015-05-30 18:22:36 -070072import org.slf4j.Logger;
73
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010074import java.util.ArrayList;
75import java.util.Collection;
76import java.util.Dictionary;
77import java.util.List;
78import java.util.Map;
79import java.util.Optional;
80import java.util.Properties;
81import java.util.Set;
82import java.util.concurrent.BlockingQueue;
83import java.util.concurrent.CompletableFuture;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010084import java.util.concurrent.ExecutorService;
85import java.util.concurrent.Executors;
86import java.util.concurrent.LinkedBlockingQueue;
87import java.util.concurrent.ScheduledExecutorService;
88import java.util.concurrent.TimeUnit;
89import java.util.concurrent.atomic.AtomicBoolean;
90
91import static com.google.common.base.Preconditions.checkNotNull;
92import static com.google.common.base.Strings.isNullOrEmpty;
93import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
94import static java.util.stream.Collectors.*;
95import static org.onlab.util.Tools.get;
96import static org.onlab.util.Tools.groupedThreads;
97import static org.opencord.olt.impl.OsgiPropertyConstants.*;
98import static org.slf4j.LoggerFactory.getLogger;
alshabibf0e7e702015-05-30 18:22:36 -070099
100/**
Jonathan Harte533a422015-10-20 17:31:24 -0700101 * Provisions rules on access devices.
alshabibf0e7e702015-05-30 18:22:36 -0700102 */
Carmelo Casconeca931162019-07-15 18:22:24 -0700103@Component(immediate = true,
104 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -0700105 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000106 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Saurav Das2d3777a2020-08-07 18:48:51 -0700107 EAPOL_DELETE_RETRY_MAX_ATTEMPS + ":Integer=" +
108 EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT,
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700109 PROVISION_DELAY + ":Integer=" + PROVISION_DELAY_DEFAULT,
Carmelo Casconeca931162019-07-15 18:22:24 -0700110 })
alshabib8e4fd2f2016-01-12 15:55:53 -0800111public class Olt
112 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
113 implements AccessDeviceService {
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000114 private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
Charles Chan54f110f2017-01-20 11:22:42 -0800115 private static final String APP_NAME = "org.opencord.olt";
alshabibe0559672016-02-21 14:49:51 -0800116
Gamze Abakada282b42019-03-11 13:16:48 +0000117 private static final short EAPOL_DEFAULT_VLAN = 4091;
Gamze Abaka838d8142019-02-21 07:06:55 +0000118 private static final String NO_UPLINK_PORT = "No uplink port found for OLT device {}";
alshabibe0559672016-02-21 14:49:51 -0800119
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800120 public static final int HASH_WEIGHT = 10;
121
alshabibf0e7e702015-05-30 18:22:36 -0700122 private final Logger log = getLogger(getClass());
123
Thomas Lee Sd7735f92020-02-20 19:21:47 +0530124 private static final String NNI = "nni-";
125
Carmelo Casconeca931162019-07-15 18:22:24 -0700126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700127 protected FlowObjectiveService flowObjectiveService;
128
Carmelo Casconeca931162019-07-15 18:22:24 -0700129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700130 protected DeviceService deviceService;
131
Carmelo Casconeca931162019-07-15 18:22:24 -0700132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700133 protected CoreService coreService;
134
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000135 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
136 bind = "bindSadisService",
137 unbind = "unbindSadisService",
138 policy = ReferencePolicy.DYNAMIC)
139 protected volatile SadisService sadisService;
Gamze Abaka641fc072018-09-04 09:16:27 +0000140
Carmelo Casconeca931162019-07-15 18:22:24 -0700141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000142 protected AccessDeviceFlowService oltFlowService;
alshabibe0559672016-02-21 14:49:51 -0800143
Carmelo Casconeca931162019-07-15 18:22:24 -0700144 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000145 protected AccessDeviceMeterService oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000146
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800147 @Reference(cardinality = ReferenceCardinality.MANDATORY)
148 protected StorageService storageService;
149
150 @Reference(cardinality = ReferenceCardinality.MANDATORY)
151 protected ClusterService clusterService;
152
Hardik Windlassa58fbee2020-03-12 18:33:55 +0530153 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200154 protected MastershipService mastershipService;
155
156 @Reference(cardinality = ReferenceCardinality.MANDATORY)
157 protected LeadershipService leadershipService;
158
159 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Hardik Windlassa58fbee2020-03-12 18:33:55 +0530160 protected FlowRuleService flowRuleService;
161
Gamze Abaka1f62dd92020-05-07 08:58:13 +0000162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
163 protected ComponentConfigService componentConfigService;
164
Carmelo Casconeca931162019-07-15 18:22:24 -0700165 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800166 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700167 **/
168 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000169
Carmelo Casconeca931162019-07-15 18:22:24 -0700170 /**
Gamze Abaka51a34e82020-05-08 13:03:14 +0000171 * Default multicast service name.
Carmelo Casconeca931162019-07-15 18:22:24 -0700172 **/
Gamze Abaka51a34e82020-05-08 13:03:14 +0000173 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Gamze Abaka33feef52019-02-27 08:16:47 +0000174
Saurav Das2d3777a2020-08-07 18:48:51 -0700175 /**
176 * Default amounts of eapol retry.
177 **/
178 protected int eapolDeleteRetryMaxAttempts = EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
179
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700180 /**
181 * Delay between EAPOL removal and data plane flows provisioning.
182 */
183 protected int provisionDelay = PROVISION_DELAY_DEFAULT;
184
alshabibf0e7e702015-05-30 18:22:36 -0700185 private final DeviceListener deviceListener = new InternalDeviceListener();
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800186 private final ClusterEventListener clusterListener = new InternalClusterListener();
187
188 private ConsistentHasher hasher;
alshabibf0e7e702015-05-30 18:22:36 -0700189
Gamze Abaka641fc072018-09-04 09:16:27 +0000190 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
191 private BaseInformationService<BandwidthProfileInformation> bpService;
alshabibf0e7e702015-05-30 18:22:36 -0700192
Gamze Abaka641fc072018-09-04 09:16:27 +0000193 private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000194 groupedThreads("onos/olt-service",
195 "olt-installer-%d"));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100196
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700197 protected ExecutorService eventExecutor;
Saurav Das2d3777a2020-08-07 18:48:51 -0700198 protected ExecutorService retryExecutor;
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700199 protected ScheduledExecutorService provisionExecutor;
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700200
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800201 private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
Saurav Das2d3777a2020-08-07 18:48:51 -0700202 private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800203
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000204 protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700205
alshabibf0e7e702015-05-30 18:22:36 -0700206 @Activate
alshabibe0559672016-02-21 14:49:51 -0800207 public void activate(ComponentContext context) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000208 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
209 "events-%d", log));
Saurav Das2d3777a2020-08-07 18:48:51 -0700210 retryExecutor = Executors.newCachedThreadPool();
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700211 provisionExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
212 "provision-%d", log));
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700213
alshabibe0559672016-02-21 14:49:51 -0800214 modified(context);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000215 ApplicationId appId = coreService.registerApplication(APP_NAME);
Gamze Abaka1f62dd92020-05-07 08:58:13 +0000216 componentConfigService.registerProperties(getClass());
Saurav Das62ad75e2019-03-05 12:22:22 -0800217
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800218 KryoNamespace serializer = KryoNamespace.newBuilder()
219 .register(KryoNamespaces.API)
220 .register(UniTagInformation.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000221 .register(SubscriberFlowInfo.class)
222 .register(LinkedBlockingQueue.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800223 .build();
224
225 programmedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
226 .withName("volt-programmed-subs")
227 .withSerializer(Serializer.using(serializer))
228 .withApplicationId(appId)
229 .build();
alshabibc4dfe852015-06-05 13:35:13 -0700230
Saurav Das2d3777a2020-08-07 18:48:51 -0700231 failedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
232 .withName("volt-failed-subs")
233 .withSerializer(Serializer.using(serializer))
234 .withApplicationId(appId)
235 .build();
236
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000237 pendingSubscribersForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
238 .withName("volt-pending-subs")
239 .withSerializer(Serializer.using(serializer))
240 .withApplicationId(appId)
241 .build().asJavaMap();
alshabib8e4fd2f2016-01-12 15:55:53 -0800242 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
243
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000244 if (sadisService != null) {
245 subsService = sadisService.getSubscriberInfoService();
246 bpService = sadisService.getBandwidthProfileService();
247 } else {
248 log.warn(SADIS_NOT_RUNNING);
249 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000250
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800251 List<NodeId> readyNodes = clusterService.getNodes().stream()
252 .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
253 .map(ControllerNode::id)
254 .collect(toList());
255 hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
256 clusterService.addListener(clusterListener);
257
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100258 // look for all provisioned devices in Sadis and create EAPOL flows for the
259 // UNI ports
260 Iterable<Device> devices = deviceService.getDevices();
261 for (Device d : devices) {
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200262 if (isLocalLeader(d.id())) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800263 checkAndCreateDeviceFlows(d);
264 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100265 }
alshabib4ceaed32016-03-03 18:00:58 -0800266
alshabibba357492016-01-27 13:49:46 -0800267 deviceService.addListener(deviceListener);
alshabibf0e7e702015-05-30 18:22:36 -0700268 log.info("Started with Application ID {}", appId.id());
269 }
270
271 @Deactivate
272 public void deactivate() {
Gamze Abaka1f62dd92020-05-07 08:58:13 +0000273 componentConfigService.unregisterProperties(getClass(), false);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800274 clusterService.removeListener(clusterListener);
alshabib62e9ce72016-02-11 17:31:36 -0800275 deviceService.removeListener(deviceListener);
Jonathan Hart5f1c8142018-07-24 17:31:59 -0700276 eventDispatcher.removeSink(AccessDeviceEvent.class);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700277 eventExecutor.shutdown();
Saurav Das2d3777a2020-08-07 18:48:51 -0700278 retryExecutor.shutdown();
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700279 provisionExecutor.shutdown();
alshabibf0e7e702015-05-30 18:22:36 -0700280 log.info("Stopped");
281 }
282
alshabibe0559672016-02-21 14:49:51 -0800283 @Modified
284 public void modified(ComponentContext context) {
285 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
286
287 try {
Andrea Campanella971d5b92020-05-07 11:20:43 +0200288 String bpId = get(properties, DEFAULT_BP_ID);
289 defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
Gamze Abakaad329652018-12-20 10:12:21 +0000290
Andrea Campanella971d5b92020-05-07 11:20:43 +0200291 String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
292 multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000293
Saurav Das2d3777a2020-08-07 18:48:51 -0700294 String eapolDeleteRetryNew = get(properties, EAPOL_DELETE_RETRY_MAX_ATTEMPS);
295 eapolDeleteRetryMaxAttempts = isNullOrEmpty(eapolDeleteRetryNew) ? EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT :
296 Integer.parseInt(eapolDeleteRetryNew.trim());
297
298 log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}, EapolDeleteRetryMaxAttempts: {}",
299 defaultBpId, multicastServiceName, eapolDeleteRetryMaxAttempts);
Gamze Abaka33feef52019-02-27 08:16:47 +0000300
alshabibe0559672016-02-21 14:49:51 -0800301 } catch (Exception e) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000302 log.error("Error while modifying the properties", e);
Andrea Campanella971d5b92020-05-07 11:20:43 +0200303 defaultBpId = DEFAULT_BP_ID_DEFAULT;
304 multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
alshabibe0559672016-02-21 14:49:51 -0800305 }
306 }
307
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000308 protected void bindSadisService(SadisService service) {
309 sadisService = service;
310 bpService = sadisService.getBandwidthProfileService();
311 subsService = sadisService.getSubscriberInfoService();
312 log.info("Sadis-service binds to onos.");
313 }
314
315 protected void unbindSadisService(SadisService service) {
316 sadisService = null;
317 bpService = null;
318 subsService = null;
319 log.info("Sadis-service unbinds from onos.");
320 }
321
alshabib32232c82016-02-25 17:57:24 -0500322 @Override
Gamze Abaka838d8142019-02-21 07:06:55 +0000323 public boolean provisionSubscriber(ConnectPoint connectPoint) {
Andrea Campanella3ce4d282020-06-09 13:46:58 +0200324 log.info("Call to provision subscriber at {}", connectPoint);
Gamze Abaka838d8142019-02-21 07:06:55 +0000325 DeviceId deviceId = connectPoint.deviceId();
326 PortNumber subscriberPortNo = connectPoint.port();
Gamze Abaka838d8142019-02-21 07:06:55 +0000327 checkNotNull(deviceService.getPort(deviceId, subscriberPortNo),
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000328 "Invalid connect point:" + connectPoint);
Hardik Windlass395ff372019-06-13 05:16:00 +0000329
Saurav Das026650f2020-09-21 18:56:35 -0700330 if (isSubscriberInstalled(connectPoint)) {
331 log.warn("Subscriber at {} already provisioned or in the process .."
332 + " not taking any more action", connectPoint);
333 return true;
334 }
335
336 // Find the subscriber config at this connect point
Gamze Abaka838d8142019-02-21 07:06:55 +0000337 SubscriberAndDeviceInformation sub = getSubscriber(connectPoint);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100338 if (sub == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000339 log.warn("No subscriber found for {}", connectPoint);
Amit Ghosh31939522018-08-16 13:28:21 +0100340 return false;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100341 }
Jonathan Harte533a422015-10-20 17:31:24 -0700342
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100343 // Get the uplink port
Gamze Abaka838d8142019-02-21 07:06:55 +0000344 Port uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100345 if (uplinkPort == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000346 log.warn(NO_UPLINK_PORT, deviceId);
Amit Ghosh31939522018-08-16 13:28:21 +0100347 return false;
Jonathan Harte533a422015-10-20 17:31:24 -0700348 }
349
Saurav Das2d3777a2020-08-07 18:48:51 -0700350 // delete Eapol authentication flow with default bandwidth
351 // wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
352 // retry deletion if it fails/times-out
353 retryExecutor.execute(new DeleteEapolInstallSub(connectPoint,
354 uplinkPort, sub, 1));
Amit Ghosh31939522018-08-16 13:28:21 +0100355 return true;
alshabibb7a9e172016-01-13 11:23:53 -0800356 }
357
Saurav Das026650f2020-09-21 18:56:35 -0700358 // returns true if subscriber is programmed or in the process of being programmed
359 private boolean isSubscriberInstalled(ConnectPoint connectPoint) {
360 Collection<? extends UniTagInformation> uniTagInformationSet =
361 programmedSubs.get(connectPoint).value();
362 if (!uniTagInformationSet.isEmpty()) {
363 return true;
364 }
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100365 //Check if the subscriber is already getting provisioned
366 // so we do not provision twice
367 AtomicBoolean isPending = new AtomicBoolean(false);
368 pendingSubscribersForDevice.computeIfPresent(connectPoint.deviceId(), (id, infos) -> {
369 for (SubscriberFlowInfo fi : infos) {
370 if (fi.getUniPort().equals(connectPoint.port())) {
371 isPending.set(true);
372 break;
373 }
Saurav Das026650f2020-09-21 18:56:35 -0700374 }
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100375 return infos;
376 });
Saurav Das026650f2020-09-21 18:56:35 -0700377
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100378 return isPending.get();
Saurav Das026650f2020-09-21 18:56:35 -0700379 }
380
Saurav Das2d3777a2020-08-07 18:48:51 -0700381 private class DeleteEapolInstallSub implements Runnable {
382 ConnectPoint cp;
383 Port uplinkPort;
384 SubscriberAndDeviceInformation sub;
385 private int attemptNumber;
386
387 DeleteEapolInstallSub(ConnectPoint cp, Port uplinkPort,
388 SubscriberAndDeviceInformation sub,
389 int attemptNumber) {
390 this.cp = cp;
391 this.uplinkPort = uplinkPort;
392 this.sub = sub;
393 this.attemptNumber = attemptNumber;
394 }
395
396 @Override
397 public void run() {
398 CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
399 oltFlowService.processEapolFilteringObjectives(cp.deviceId(), cp.port(),
400 defaultBpId, filterFuture,
401 VlanId.vlanId(EAPOL_DEFAULT_VLAN),
402 false);
403 filterFuture.thenAcceptAsync(filterStatus -> {
404 if (filterStatus == null) {
405 log.info("Default eapol flow deleted in attempt {} of {}"
406 + "... provisioning subscriber flows {}",
407 attemptNumber, eapolDeleteRetryMaxAttempts, cp);
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700408
409 // FIXME this is needed to prevent that default EAPOL flow removal and
410 // data plane flows install are received by the device at the same time
411 provisionExecutor.schedule(
412 () -> provisionUniTagList(cp, uplinkPort.number(), sub),
413 provisionDelay, TimeUnit.MILLISECONDS);
Saurav Das2d3777a2020-08-07 18:48:51 -0700414 } else {
415 if (attemptNumber <= eapolDeleteRetryMaxAttempts) {
416 log.warn("The filtering future failed {} for subscriber {}"
417 + "... retrying {} of {} attempts",
418 filterStatus, cp, attemptNumber, eapolDeleteRetryMaxAttempts);
419 retryExecutor.execute(
420 new DeleteEapolInstallSub(cp, uplinkPort, sub,
421 attemptNumber + 1));
422 } else {
423 log.error("The filtering future failed {} for subscriber {}"
424 + "after {} attempts. Subscriber provisioning failed",
425 filterStatus, cp, eapolDeleteRetryMaxAttempts);
426 sub.uniTagList().forEach(ut -> failedSubs.put(cp, ut));
427 }
428 }
429 });
430 }
431
432 }
433
alshabibb7a9e172016-01-13 11:23:53 -0800434 @Override
Gamze Abaka838d8142019-02-21 07:06:55 +0000435 public boolean removeSubscriber(ConnectPoint connectPoint) {
Saurav Daseae48de2019-06-19 13:26:15 -0700436 log.info("Call to un-provision subscriber at {}", connectPoint);
Gamze Abaka838d8142019-02-21 07:06:55 +0000437
Saurav Daseae48de2019-06-19 13:26:15 -0700438 // Get the subscriber connected to this port from the local cache
439 // If we don't know about the subscriber there's no need to remove it
Gamze Abaka838d8142019-02-21 07:06:55 +0000440 DeviceId deviceId = connectPoint.deviceId();
441 PortNumber subscriberPortNo = connectPoint.port();
442
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800443 Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint).value();
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000444 if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000445 log.warn("Subscriber on connectionPoint {} was not previously programmed, " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000446 "no need to remove it", connectPoint);
Matteo Scandolo962a6ad2018-12-11 15:39:42 -0800447 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800448 }
449
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100450 // Get the uplink port
Gamze Abaka838d8142019-02-21 07:06:55 +0000451 Port uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100452 if (uplinkPort == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000453 log.warn(NO_UPLINK_PORT, deviceId);
Amit Ghosh31939522018-08-16 13:28:21 +0100454 return false;
alshabib4ceaed32016-03-03 18:00:58 -0800455 }
456
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000457 for (UniTagInformation uniTag : uniTagInformationSet) {
Amit Ghosh95e2f652017-08-23 12:49:46 +0100458
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000459 if (multicastServiceName.equals(uniTag.getServiceName())) {
460 continue;
461 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000462
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000463 unprovisionVlans(deviceId, uplinkPort.number(), subscriberPortNo, uniTag);
alshabibbf23a1f2016-01-14 17:27:11 -0800464
Saurav Das9da7d522020-03-23 19:14:35 -0700465 // remove eapol with subscriber bandwidth profile
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000466 oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPortNo,
467 uniTag.getUpstreamBandwidthProfile(),
468 null, uniTag.getPonCTag(), false);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100469
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000470 Port port = deviceService.getPort(deviceId, subscriberPortNo);
471 if (port != null && port.isEnabled()) {
Saurav Das9da7d522020-03-23 19:14:35 -0700472 // reinstall eapol with default bandwidth profile
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000473 oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPortNo, defaultBpId,
474 null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
475 } else {
476 log.debug("Port {} is no longer enabled or it's unavailable. Not "
477 + "reprogramming default eapol flow", connectPoint);
478 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100479 }
Amit Ghosh31939522018-08-16 13:28:21 +0100480 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800481 }
482
Gamze Abakaf59c0912019-04-19 08:24:28 +0000483
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000484 @Override
485 public boolean provisionSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
486 Optional<VlanId> cTag, Optional<Integer> tpId) {
487
488 log.info("Provisioning subscriber using subscriberId {}, sTag {}, cTag {}, tpId {}" +
489 "", subscriberId, sTag, cTag, tpId);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000490
Amit Ghosh31939522018-08-16 13:28:21 +0100491 // Check if we can find the connect point to which this subscriber is connected
492 ConnectPoint subsPort = findSubscriberConnectPoint(subscriberId.toString());
493 if (subsPort == null) {
494 log.warn("ConnectPoint for {} not found", subscriberId);
495 return false;
496 }
497
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100498 if (!sTag.isPresent() && !cTag.isPresent()) {
499 return provisionSubscriber(subsPort);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000500 } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100501 Port uplinkPort = getUplinkPort(deviceService.getDevice(subsPort.deviceId()));
502 if (uplinkPort == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000503 log.warn(NO_UPLINK_PORT, subsPort.deviceId());
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100504 return false;
505 }
506
Gamze Abakaf59c0912019-04-19 08:24:28 +0000507 //delete Eapol authentication flow with default bandwidth
508 //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
Gamze Abakaf59c0912019-04-19 08:24:28 +0000509 //install subscriber flows
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000510 CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
511 oltFlowService.processEapolFilteringObjectives(subsPort.deviceId(), subsPort.port(), defaultBpId,
512 filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000513 filterFuture.thenAcceptAsync(filterStatus -> {
514 if (filterStatus == null) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000515 provisionUniTagInformation(subsPort.deviceId(), uplinkPort.number(), subsPort.port(),
516 cTag.get(), sTag.get(), tpId.get());
Gamze Abakaf59c0912019-04-19 08:24:28 +0000517 }
518 });
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100519 return true;
520 } else {
521 log.warn("Provisioning failed for subscriber: {}", subscriberId);
522 return false;
523 }
Amit Ghosh31939522018-08-16 13:28:21 +0100524 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100525
alshabibe0559672016-02-21 14:49:51 -0800526 @Override
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000527 public boolean removeSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
528 Optional<VlanId> cTag, Optional<Integer> tpId) {
Amit Ghosh31939522018-08-16 13:28:21 +0100529 // Check if we can find the connect point to which this subscriber is connected
530 ConnectPoint subsPort = findSubscriberConnectPoint(subscriberId.toString());
531 if (subsPort == null) {
532 log.warn("ConnectPoint for {} not found", subscriberId);
533 return false;
534 }
535
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100536 if (!sTag.isPresent() && !cTag.isPresent()) {
537 return removeSubscriber(subsPort);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000538 } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100539 // Get the uplink port
540 Port uplinkPort = getUplinkPort(deviceService.getDevice(subsPort.deviceId()));
541 if (uplinkPort == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000542 log.warn(NO_UPLINK_PORT, subsPort.deviceId());
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100543 return false;
544 }
545
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000546 Optional<UniTagInformation> tagInfo = getUniTagInformation(subsPort, cTag.get(), sTag.get(), tpId.get());
547 if (!tagInfo.isPresent()) {
548 log.warn("UniTagInformation does not exist for Device/Port {}, cTag {}, sTag {}, tpId {}",
549 subsPort, cTag, sTag, tpId);
550 return false;
551 }
Gamze Abakaf59c0912019-04-19 08:24:28 +0000552
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000553 unprovisionVlans(subsPort.deviceId(), uplinkPort.number(), subsPort.port(), tagInfo.get());
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100554 return true;
555 } else {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000556 log.warn("Removing subscriber is not possible - please check the provided information" +
557 "for the subscriber: {}", subscriberId);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100558 return false;
559 }
Amit Ghosh31939522018-08-16 13:28:21 +0100560 }
561
562 @Override
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000563 public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs() {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800564 return programmedSubs.stream()
565 .collect(collectingAndThen(
566 groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
567 ImmutableMap::copyOf));
Saurav Das82b8e6d2018-10-04 15:25:12 -0700568 }
569
570 @Override
Saurav Das2d3777a2020-08-07 18:48:51 -0700571 public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getFailedSubs() {
572 return failedSubs.stream()
573 .collect(collectingAndThen(
574 groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
575 ImmutableMap::copyOf));
576 }
577
578 @Override
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100579 public List<DeviceId> fetchOlts() {
580 // look through all the devices and find the ones that are OLTs as per Sadis
581 List<DeviceId> olts = new ArrayList<>();
582 Iterable<Device> devices = deviceService.getDevices();
583 for (Device d : devices) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700584 if (getOltInfo(d) != null) {
585 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100586 olts.add(d.id());
587 }
588 }
589 return olts;
alshabibe0559672016-02-21 14:49:51 -0800590 }
591
Amit Ghosh31939522018-08-16 13:28:21 +0100592 /**
593 * Finds the connect point to which a subscriber is connected.
594 *
595 * @param id The id of the subscriber, this is the same ID as in Sadis
596 * @return Subscribers ConnectPoint if found else null
597 */
598 private ConnectPoint findSubscriberConnectPoint(String id) {
599
600 Iterable<Device> devices = deviceService.getDevices();
601 for (Device d : devices) {
602 for (Port p : deviceService.getPorts(d.id())) {
603 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
604 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
605 log.debug("Found on device {} port {}", d.id(), p.number());
606 return new ConnectPoint(d.id(), p.number());
607 }
608 }
609 }
610 return null;
611 }
612
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000613 /**
614 * Gets the context of the bandwidth profile information for the given parameter.
615 *
616 * @param bandwidthProfile the bandwidth profile id
617 * @return the context of the bandwidth profile information
618 */
Gamze Abaka641fc072018-09-04 09:16:27 +0000619 private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000620 if (bpService == null) {
621 log.warn(SADIS_NOT_RUNNING);
622 return null;
623 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000624 if (bandwidthProfile == null) {
625 return null;
626 }
627 return bpService.get(bandwidthProfile);
628 }
629
Gamze Abaka838d8142019-02-21 07:06:55 +0000630 /**
Gamze Abaka33feef52019-02-27 08:16:47 +0000631 * Removes subscriber vlan flows.
Gamze Abaka838d8142019-02-21 07:06:55 +0000632 *
633 * @param deviceId the device identifier
634 * @param uplink uplink port of the OLT
635 * @param subscriberPort uni port
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000636 * @param uniTag uni tag information
Gamze Abaka838d8142019-02-21 07:06:55 +0000637 */
Gamze Abaka33feef52019-02-27 08:16:47 +0000638 private void unprovisionVlans(DeviceId deviceId, PortNumber uplink,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000639 PortNumber subscriberPort, UniTagInformation uniTag) {
640
641 log.info("Unprovisioning vlans for {} at {}/{}", uniTag, deviceId, subscriberPort);
alshabibbf23a1f2016-01-14 17:27:11 -0800642
643 CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
644 CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
645
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000646 VlanId deviceVlan = uniTag.getPonSTag();
647 VlanId subscriberVlan = uniTag.getPonCTag();
Gamze Abaka641fc072018-09-04 09:16:27 +0000648
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000649 MeterId upstreamMeterId = oltMeterService
650 .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamBandwidthProfile());
651 MeterId downstreamMeterId = oltMeterService
652 .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamBandwidthProfile());
Gamze Abaka641fc072018-09-04 09:16:27 +0000653
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000654 ForwardingObjective.Builder upFwd =
655 oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, uniTag);
656 ForwardingObjective.Builder downFwd =
657 oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, uniTag);
658
Andrea Campanella7c49b792020-05-11 11:36:53 +0200659 oltFlowService.processIgmpFilteringObjectives(deviceId, subscriberPort,
660 upstreamMeterId, uniTag, false, true);
661 oltFlowService.processDhcpFilteringObjectives(deviceId, subscriberPort,
662 upstreamMeterId, uniTag, false, true);
Gustavo Silva5c492dd2021-02-12 10:21:11 -0300663 oltFlowService.processPPPoEDFilteringObjectives(deviceId, subscriberPort,
664 upstreamMeterId, uniTag, false, true);
alshabibbf23a1f2016-01-14 17:27:11 -0800665
alshabib4ceaed32016-03-03 18:00:58 -0800666 flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
667 @Override
668 public void onSuccess(Objective objective) {
669 upFuture.complete(null);
670 }
alshabibbf23a1f2016-01-14 17:27:11 -0800671
alshabib4ceaed32016-03-03 18:00:58 -0800672 @Override
673 public void onError(Objective objective, ObjectiveError error) {
674 upFuture.complete(error);
675 }
676 }));
677
678 flowObjectiveService.forward(deviceId, downFwd.remove(new ObjectiveContext() {
679 @Override
680 public void onSuccess(Objective objective) {
681 downFuture.complete(null);
682 }
683
684 @Override
685 public void onError(Objective objective, ObjectiveError error) {
686 downFuture.complete(error);
687 }
688 }));
alshabibbf23a1f2016-01-14 17:27:11 -0800689
690 upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000691 AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTERED;
alshabibbf23a1f2016-01-14 17:27:11 -0800692 if (upStatus == null && downStatus == null) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000693 log.debug("Uni tag information is unregistered successfully for cTag {}, sTag {}, tpId {}, and" +
694 "Device/Port{}", uniTag.getPonCTag(), uniTag.getPonSTag(),
695 uniTag.getTechnologyProfileId(), subscriberPort);
696 updateProgrammedSubscriber(new ConnectPoint(deviceId, subscriberPort), uniTag, false);
alshabibbf23a1f2016-01-14 17:27:11 -0800697 } else if (downStatus != null) {
698 log.error("Subscriber with vlan {} on device {} " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000699 "on port {} failed downstream uninstallation: {}",
700 subscriberVlan, deviceId, subscriberPort, downStatus);
701 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
alshabibbf23a1f2016-01-14 17:27:11 -0800702 } else if (upStatus != null) {
703 log.error("Subscriber with vlan {} on device {} " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000704 "on port {} failed upstream uninstallation: {}",
705 subscriberVlan, deviceId, subscriberPort, upStatus);
706 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
alshabibbf23a1f2016-01-14 17:27:11 -0800707 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000708 Port port = deviceService.getPort(deviceId, subscriberPort);
709 post(new AccessDeviceEvent(type, deviceId, port, deviceVlan, subscriberVlan,
710 uniTag.getTechnologyProfileId()));
alshabibbf23a1f2016-01-14 17:27:11 -0800711 }, oltInstallers);
Jonathan Harte533a422015-10-20 17:31:24 -0700712 }
713
Gamze Abaka838d8142019-02-21 07:06:55 +0000714 /**
Gamze Abaka33feef52019-02-27 08:16:47 +0000715 * Adds subscriber vlan flows, dhcp, eapol and igmp trap flows for the related uni port.
Gamze Abaka838d8142019-02-21 07:06:55 +0000716 *
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000717 * @param connectPoint the connection point of the subscriber
718 * @param uplinkPort uplink port of the OLT (the nni port)
719 * @param sub subscriber information that includes s, c tags, tech profile and bandwidth profile references
Gamze Abaka838d8142019-02-21 07:06:55 +0000720 */
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000721 private void provisionUniTagList(ConnectPoint connectPoint, PortNumber uplinkPort,
722 SubscriberAndDeviceInformation sub) {
Gamze Abaka641fc072018-09-04 09:16:27 +0000723
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700724 log.debug("Provisioning vlans for subscriber on dev/port: {}", connectPoint.toString());
725 if (log.isTraceEnabled()) {
726 log.trace("Subscriber informations {}", sub);
727 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000728
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000729 if (sub.uniTagList() == null || sub.uniTagList().isEmpty()) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700730 log.warn("Unitaglist doesn't exist for the subscriber {} on dev/port {}",
731 sub.id(), connectPoint.toString());
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000732 return;
733 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000734
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000735 DeviceId deviceId = connectPoint.deviceId();
736 PortNumber subscriberPort = connectPoint.port();
Gamze Abaka641fc072018-09-04 09:16:27 +0000737
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000738 for (UniTagInformation uniTag : sub.uniTagList()) {
739 handleSubscriberFlows(deviceId, uplinkPort, subscriberPort, uniTag);
740 }
741 }
alshabib3ea82642016-01-12 18:06:53 -0800742
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000743 /**
744 * Finds the uni tag information and provisions the found information.
745 * If the uni tag information is not found, returns
746 *
747 * @param deviceId the access device id
748 * @param uplinkPort the nni port
749 * @param subscriberPort the uni port
750 * @param innerVlan the pon c tag
751 * @param outerVlan the pon s tag
752 * @param tpId the technology profile id
753 */
754 private void provisionUniTagInformation(DeviceId deviceId, PortNumber uplinkPort,
755 PortNumber subscriberPort,
756 VlanId innerVlan,
757 VlanId outerVlan,
758 Integer tpId) {
Jonathan Harte533a422015-10-20 17:31:24 -0700759
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000760 ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
761 Optional<UniTagInformation> gotTagInformation = getUniTagInformation(cp, innerVlan, outerVlan, tpId);
762 if (!gotTagInformation.isPresent()) {
763 return;
764 }
765 UniTagInformation tagInformation = gotTagInformation.get();
766 handleSubscriberFlows(deviceId, uplinkPort, subscriberPort, tagInformation);
767 }
alshabib3ea82642016-01-12 18:06:53 -0800768
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000769 private void updateProgrammedSubscriber(ConnectPoint connectPoint, UniTagInformation tagInformation, Boolean add) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800770 if (add) {
771 programmedSubs.put(connectPoint, tagInformation);
772 } else {
773 programmedSubs.remove(connectPoint, tagInformation);
774 }
Jonathan Harte533a422015-10-20 17:31:24 -0700775 }
776
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000777 /**
778 * Installs a uni tag information flow.
779 *
780 * @param deviceId the access device id
781 * @param uplinkPort the nni port
782 * @param subscriberPort the uni port
783 * @param tagInfo the uni tag information
784 */
785 private void handleSubscriberFlows(DeviceId deviceId, PortNumber uplinkPort, PortNumber subscriberPort,
786 UniTagInformation tagInfo) {
787
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700788 log.debug("Provisioning vlan-based flows for the uniTagInformation {} on dev/port {}/{}",
789 tagInfo, deviceId, subscriberPort);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000790
791 Port port = deviceService.getPort(deviceId, subscriberPort);
792
793 if (multicastServiceName.equals(tagInfo.getServiceName())) {
794 // IGMP flows are taken care of along with VOD service
795 // Please note that for each service, Subscriber Registered event will be sent
796 post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED,
797 deviceId, port, tagInfo.getPonSTag(), tagInfo.getPonCTag(),
798 tagInfo.getTechnologyProfileId()));
799 return;
Gamze Abaka641fc072018-09-04 09:16:27 +0000800 }
801
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000802 BandwidthProfileInformation upstreamBpInfo =
803 getBandwidthProfileInformation(tagInfo.getUpstreamBandwidthProfile());
804 BandwidthProfileInformation downstreamBpInfo =
805 getBandwidthProfileInformation(tagInfo.getDownstreamBandwidthProfile());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700806 if (upstreamBpInfo == null) {
807 log.warn("No meter installed since no Upstream BW Profile definition found for "
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700808 + "ctag {} stag {} tpId {} and dev/port: {}/{}",
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700809 tagInfo.getPonCTag(), tagInfo.getPonSTag(),
810 tagInfo.getTechnologyProfileId(), deviceId,
811 subscriberPort);
812 return;
813 }
814 if (downstreamBpInfo == null) {
815 log.warn("No meter installed since no Downstream BW Profile definition found for "
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700816 + "ctag {} stag {} tpId {} and dev/port: {}/{}",
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700817 tagInfo.getPonCTag(), tagInfo.getPonSTag(),
818 tagInfo.getTechnologyProfileId(), deviceId,
819 subscriberPort);
820 return;
821 }
Gamze Abakaf59c0912019-04-19 08:24:28 +0000822
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700823 // check for meterIds for the upstream and downstream bandwidth profiles
824 MeterId upMeterId = oltMeterService
825 .getMeterIdFromBpMapping(deviceId, upstreamBpInfo.id());
826 MeterId downMeterId = oltMeterService
827 .getMeterIdFromBpMapping(deviceId, downstreamBpInfo.id());
828 SubscriberFlowInfo fi = new SubscriberFlowInfo(deviceId, uplinkPort, subscriberPort,
829 tagInfo, downMeterId, upMeterId,
830 downstreamBpInfo.id(), upstreamBpInfo.id());
831
832 if (upMeterId != null && downMeterId != null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700833 log.debug("Meters are existing for upstream {} and downstream {} on dev/port {}/{}",
834 upstreamBpInfo.id(), downstreamBpInfo.id(), deviceId, subscriberPort);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700835 handleSubFlowsWithMeters(fi);
836 } else {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700837 log.debug("Adding {} on {}/{} to pending subs", fi, deviceId, subscriberPort);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700838 // one or both meters are not ready. It's possible they are in the process of being
839 // created for other subscribers that share the same bandwidth profile.
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100840 pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
841 if (queue == null) {
842 queue = new LinkedBlockingQueue<>();
843 }
844 queue.add(fi);
845 log.info("Added {} to pending subscribers on {}/{}", fi, deviceId, subscriberPort);
846 return queue;
847 });
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700848
849 // queue up the meters to be created
850 if (upMeterId == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700851 log.debug("Missing meter for upstream {} on {}/{}", upstreamBpInfo.id(), deviceId, subscriberPort);
Andrea Campanella600d2e22020-06-22 11:00:31 +0200852 checkAndCreateDevMeter(deviceId, upstreamBpInfo);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700853 }
854 if (downMeterId == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700855 log.debug("Missing meter for downstream {} on {}/{}", downstreamBpInfo.id(), deviceId, subscriberPort);
Andrea Campanella600d2e22020-06-22 11:00:31 +0200856 checkAndCreateDevMeter(deviceId, downstreamBpInfo);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700857 }
858 }
859 }
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000860
Andrea Campanella600d2e22020-06-22 11:00:31 +0200861 private void checkAndCreateDevMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
Andrea Campanellad1e26642020-10-23 12:08:32 +0200862 //If false the meter is already being installed, skipping installation
863 if (!oltMeterService.checkAndAddPendingMeter(deviceId, bwpInfo)) {
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700864 return;
865 }
Andrea Campanella600d2e22020-06-22 11:00:31 +0200866 createMeter(deviceId, bwpInfo);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700867 }
868
Andrea Campanella600d2e22020-06-22 11:00:31 +0200869 private void createMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700870 log.debug("Creating Meter with {} on {}", bwpInfo, deviceId);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700871 CompletableFuture<Object> meterFuture = new CompletableFuture<>();
Andrea Campanella3ce4d282020-06-09 13:46:58 +0200872
Andrea Campanella600d2e22020-06-22 11:00:31 +0200873 MeterId meterId = oltMeterService.createMeter(deviceId, bwpInfo,
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700874 meterFuture);
875
876 meterFuture.thenAcceptAsync(result -> {
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100877 BlockingQueue<SubscriberFlowInfo> queue = pendingSubscribersForDevice.get(deviceId);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700878 // iterate through the subscribers on hold
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100879 if (queue != null) {
880 while (true) {
881 //TODO this might return the reference and not the actual object so
882 // it can be actually swapped underneath us.
883 SubscriberFlowInfo fi = queue.peek();
884 if (fi == null) {
885 log.debug("No more subscribers pending on {}", deviceId);
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000886 pendingSubscribersForDevice.replace(deviceId, queue);
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100887 break;
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700888 }
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100889 if (result == null) {
890 // meter install sent to device
891 log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
892
893 MeterId upMeterId = oltMeterService
894 .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
895 MeterId downMeterId = oltMeterService
896 .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
897 if (upMeterId != null && downMeterId != null) {
898 log.debug("Provisioning subscriber after meter {} " +
899 "installation and both meters are present " +
900 "upstream {} and downstream {} on {}/{}",
901 meterId, upMeterId, downMeterId, deviceId, fi.getUniPort());
902 // put in the meterIds because when fi was first
903 // created there may or may not have been a meterId
904 // depending on whether the meter was created or
905 // not at that time.
906 fi.setUpMeterId(upMeterId);
907 fi.setDownMeterId(downMeterId);
908 handleSubFlowsWithMeters(fi);
909 queue.remove(fi);
910 }
911 oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
912 } else {
913 // meter install failed
914 log.error("Addition of subscriber {} on {}/{} failed due to meter " +
915 "{} with result {}", fi, deviceId, fi.getUniPort(),
916 meterId, result);
917 queue.remove(fi);
918 oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
919 }
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700920 }
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100921 } else {
922 log.info("No pending subscribers on {}", deviceId);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700923 }
924 });
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100925
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700926 }
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000927
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700928 /**
929 * Add subscriber flows given meter information for both upstream and
930 * downstream directions.
931 *
932 * @param subscriberFlowInfo relevant information for subscriber
933 */
934 private void handleSubFlowsWithMeters(SubscriberFlowInfo subscriberFlowInfo) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700935 log.debug("Provisioning subscriber flows on {}/{} based on {}",
936 subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort(), subscriberFlowInfo);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700937 UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000938 CompletableFuture<ObjectiveError> upFuture = new CompletableFuture<>();
939 CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
Gamze Abakaf59c0912019-04-19 08:24:28 +0000940
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700941 ForwardingObjective.Builder upFwd =
942 oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), subscriberFlowInfo.getUniPort(),
943 subscriberFlowInfo.getUpId(), subscriberFlowInfo.getTagInfo());
944 flowObjectiveService.forward(subscriberFlowInfo.getDevId(), upFwd.add(new ObjectiveContext() {
945 @Override
946 public void onSuccess(Objective objective) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700947 log.debug("Upstream HSIA flow {} installed successfully on {}/{}",
948 subscriberFlowInfo, subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700949 upFuture.complete(null);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000950 }
Gamze Abakaf59c0912019-04-19 08:24:28 +0000951
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700952 @Override
953 public void onError(Objective objective, ObjectiveError error) {
954 upFuture.complete(error);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000955 }
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700956 }));
957
958 ForwardingObjective.Builder downFwd =
959 oltFlowService.createDownBuilder(subscriberFlowInfo.getNniPort(), subscriberFlowInfo.getUniPort(),
960 subscriberFlowInfo.getDownId(), subscriberFlowInfo.getTagInfo());
961 flowObjectiveService.forward(subscriberFlowInfo.getDevId(), downFwd.add(new ObjectiveContext() {
962 @Override
963 public void onSuccess(Objective objective) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700964 log.debug("Downstream HSIA flow {} installed successfully on {}/{}",
965 subscriberFlowInfo, subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700966 downFuture.complete(null);
967 }
968
969 @Override
970 public void onError(Objective objective, ObjectiveError error) {
971 downFuture.complete(error);
972 }
973 }));
Gamze Abakaf59c0912019-04-19 08:24:28 +0000974
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100975 upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000976 AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED;
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100977 if (downStatus != null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700978 log.error("Flow with innervlan {} and outerVlan {} on {}/{} failed downstream installation: {}",
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700979 tagInfo.getPonCTag(), tagInfo.getPonSTag(), subscriberFlowInfo.getDevId(),
980 subscriberFlowInfo.getUniPort(), downStatus);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000981 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100982 } else if (upStatus != null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700983 log.error("Flow with innervlan {} and outerVlan {} on {}/{} failed downstream installation: {}",
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700984 tagInfo.getPonCTag(), tagInfo.getPonSTag(), subscriberFlowInfo.getDevId(),
985 subscriberFlowInfo.getUniPort(), upStatus);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000986 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
Gamze Abakaf59c0912019-04-19 08:24:28 +0000987 } else {
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700988 log.debug("Upstream and downstream data plane flows are installed successfully " +
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700989 "for {}/{}", subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700990 oltFlowService.processEapolFilteringObjectives(subscriberFlowInfo.getDevId(),
991 subscriberFlowInfo.getUniPort(),
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000992 tagInfo.getUpstreamBandwidthProfile(),
993 null, tagInfo.getPonCTag(), true);
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700994 oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getDevId(),
995 subscriberFlowInfo.getUniPort(),
996 subscriberFlowInfo.getUpId(),
997 tagInfo, true, true);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000998
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700999 oltFlowService.processIgmpFilteringObjectives(subscriberFlowInfo.getDevId(),
1000 subscriberFlowInfo.getUniPort(),
1001 subscriberFlowInfo.getUpId(),
1002 tagInfo, true, true);
Gustavo Silva5c492dd2021-02-12 10:21:11 -03001003
1004 oltFlowService.processPPPoEDFilteringObjectives(subscriberFlowInfo.getDevId(),
1005 subscriberFlowInfo.getUniPort(),
1006 subscriberFlowInfo.getUpId(),
1007 tagInfo, true, true);
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001008 updateProgrammedSubscriber(new ConnectPoint(subscriberFlowInfo.getDevId(),
1009 subscriberFlowInfo.getUniPort()),
1010 tagInfo, true);
Amit Ghoshe1d3f092018-10-09 19:44:33 +01001011 }
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001012 post(new AccessDeviceEvent(type, subscriberFlowInfo.getDevId(),
1013 deviceService.getPort(subscriberFlowInfo.getDevId(),
1014 subscriberFlowInfo.getUniPort()),
1015 tagInfo.getPonSTag(), tagInfo.getPonCTag(),
1016 tagInfo.getTechnologyProfileId()));
Amit Ghoshe1d3f092018-10-09 19:44:33 +01001017 }, oltInstallers);
Amit Ghoshe1d3f092018-10-09 19:44:33 +01001018 }
1019
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001020 /**
1021 * Checks the subscriber uni tag list and find the uni tag information.
1022 * using the pon c tag, pon s tag and the technology profile id
1023 * May return Optional<null>
1024 *
1025 * @param cp the connection point of the subscriber
1026 * @param innerVlan pon c tag
1027 * @param outerVlan pon s tag
1028 * @param tpId the technology profile id
1029 * @return the found uni tag information
1030 */
1031 private Optional<UniTagInformation> getUniTagInformation(ConnectPoint cp, VlanId innerVlan, VlanId outerVlan,
1032 int tpId) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -07001033 log.debug("Getting uni tag information for cp: {}, innerVlan: {}, outerVlan: {}, tpId: {}",
1034 cp.toString(), innerVlan, outerVlan, tpId);
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001035 SubscriberAndDeviceInformation subInfo = getSubscriber(cp);
Gamze Abakaf59c0912019-04-19 08:24:28 +00001036 if (subInfo == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -07001037 log.warn("Subscriber information doesn't exist for the connect point {}", cp.toString());
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001038 return Optional.empty();
Gamze Abakaf59c0912019-04-19 08:24:28 +00001039 }
1040
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001041 List<UniTagInformation> uniTagList = subInfo.uniTagList();
1042 if (uniTagList == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -07001043 log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), cp.toString());
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001044 return Optional.empty();
1045 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +01001046
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001047 UniTagInformation service = null;
1048 for (UniTagInformation tagInfo : subInfo.uniTagList()) {
1049 if (innerVlan.equals(tagInfo.getPonCTag()) && outerVlan.equals(tagInfo.getPonSTag())
1050 && tpId == tagInfo.getTechnologyProfileId()) {
1051 service = tagInfo;
1052 break;
Andy Bavier160e8682019-05-07 18:32:22 -07001053 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +00001054 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +00001055
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001056 if (service == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -07001057 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
1058 innerVlan, outerVlan, tpId, cp.toString());
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001059 return Optional.empty();
Gamze Abaka33feef52019-02-27 08:16:47 +00001060 }
1061
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001062 return Optional.of(service);
Amit Ghosh95e2f652017-08-23 12:49:46 +01001063 }
1064
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001065 /**
Jonathan Hart403372d2018-08-22 11:44:13 -07001066 * Creates trap flows for device, including DHCP and LLDP trap on NNI and
1067 * EAPOL trap on the UNIs, if device is present in Sadis config.
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001068 *
1069 * @param dev Device to look for
1070 */
Jonathan Hart403372d2018-08-22 11:44:13 -07001071 private void checkAndCreateDeviceFlows(Device dev) {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001072 // check if this device is provisioned in Sadis
Gamze Abaka641fc072018-09-04 09:16:27 +00001073 SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001074 log.info("checkAndCreateDeviceFlows: deviceInfo {}", deviceInfo);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001075
1076 if (deviceInfo != null) {
Jonathan Hart403372d2018-08-22 11:44:13 -07001077 // This is an OLT device as per Sadis, we create flows for UNI and NNI ports
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001078 for (Port p : deviceService.getPorts(dev.id())) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001079 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001080 continue;
1081 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001082 if (isUniPort(dev, p)) {
Andrea Campanellaa2491782020-03-13 18:09:31 +01001083 if (!programmedSubs.containsKey(new ConnectPoint(dev.id(), p.number()))) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -07001084 log.info("Creating Eapol on {}/{}", dev.id(), p.number());
Andrea Campanellaa2491782020-03-13 18:09:31 +01001085 oltFlowService.processEapolFilteringObjectives(dev.id(), p.number(), defaultBpId, null,
1086 VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
1087 } else {
Matteo Scandolo19b56f62020-10-29 13:29:21 -07001088 log.debug("Subscriber Eapol on {}/{} is already provisioned, not installing default",
1089 dev.id(), p.number());
Andrea Campanellaa2491782020-03-13 18:09:31 +01001090 }
Jonathan Hart403372d2018-08-22 11:44:13 -07001091 } else {
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001092 oltFlowService.processNniFilteringObjectives(dev.id(), p.number(), true);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001093 }
1094 }
1095 }
1096 }
1097
Jonathan Hart403372d2018-08-22 11:44:13 -07001098
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001099 /**
1100 * Get the uplink for of the OLT device.
Gamze Abakaad329652018-12-20 10:12:21 +00001101 * <p>
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001102 * This assumes that the OLT has a single uplink port. When more uplink ports need to be supported
1103 * this logic needs to be changed
1104 *
1105 * @param dev Device to look for
1106 * @return The uplink Port of the OLT
1107 */
1108 private Port getUplinkPort(Device dev) {
1109 // check if this device is provisioned in Sadis
Gamze Abaka641fc072018-09-04 09:16:27 +00001110 SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
Saurav Daseae48de2019-06-19 13:26:15 -07001111 log.trace("getUplinkPort: deviceInfo {}", deviceInfo);
Saurav Das82b8e6d2018-10-04 15:25:12 -07001112 if (deviceInfo == null) {
1113 log.warn("Device {} is not configured in SADIS .. cannot fetch device"
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001114 + " info", dev.id());
Saurav Das82b8e6d2018-10-04 15:25:12 -07001115 return null;
1116 }
1117 // Return the port that has been configured as the uplink port of this OLT in Sadis
kdarapuaa5da252020-04-10 15:58:05 +05301118 Optional<Port> optionalPort = deviceService.getPorts(dev.id()).stream()
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001119 .filter(port -> isNniPort(port) ||
1120 (port.number().toLong() == deviceInfo.uplinkPort()))
1121 .findFirst();
kdarapuaa5da252020-04-10 15:58:05 +05301122 if (optionalPort.isPresent()) {
1123 log.trace("getUplinkPort: Found port {}", optionalPort.get());
1124 return optionalPort.get();
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001125 }
1126
Saurav Daseae48de2019-06-19 13:26:15 -07001127 log.warn("getUplinkPort: " + NO_UPLINK_PORT, dev.id());
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001128 return null;
1129 }
1130
1131 /**
1132 * Return the subscriber on a port.
1133 *
Matteo Scandolo962a6ad2018-12-11 15:39:42 -08001134 * @param cp ConnectPoint on which to find the subscriber
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001135 * @return subscriber if found else null
1136 */
Ilayda Ozdemir90a93622021-02-25 09:40:58 +00001137 protected SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
1138 if (subsService == null) {
1139 log.warn(SADIS_NOT_RUNNING);
1140 return null;
1141 }
Matteo Scandolo962a6ad2018-12-11 15:39:42 -08001142 Port port = deviceService.getPort(cp);
1143 checkNotNull(port, "Invalid connect point");
1144 String portName = port.annotations().value(AnnotationKeys.PORT_NAME);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001145 return subsService.get(portName);
1146 }
1147
Gamze Abakaad329652018-12-20 10:12:21 +00001148 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001149 * Checks whether the given port of the device is a uni port or not.
1150 *
1151 * @param d the access device
1152 * @param p the port of the device
1153 * @return true if the given port is a uni port
Gamze Abakaad329652018-12-20 10:12:21 +00001154 */
Amit Ghosh1ed9aef2018-07-17 17:08:16 +01001155 private boolean isUniPort(Device d, Port p) {
1156 Port ulPort = getUplinkPort(d);
1157 if (ulPort != null) {
1158 return (ulPort.number().toLong() != p.number().toLong());
1159 }
Thomas Lee Sd7735f92020-02-20 19:21:47 +05301160 //handles a special case where NNI port is misconfigured in SADIS and getUplinkPort(d) returns null
1161 //checks whether the port name starts with nni- which is the signature of an NNI Port
1162 if (p.annotations().value(AnnotationKeys.PORT_NAME) != null &&
1163 p.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI)) {
1164 log.error("NNI port number {} is not matching with configured value", p.number().toLong());
1165 return false;
1166 }
1167 return true;
Jonathan Hart1d34c8b2018-05-05 15:37:28 -07001168 }
1169
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001170 /**
1171 * Gets the given device details from SADIS.
1172 * If the device is not found, returns null
1173 *
1174 * @param dev the access device
1175 * @return the olt information
1176 */
Jonathan Hart4c538002018-08-23 10:11:54 -07001177 private SubscriberAndDeviceInformation getOltInfo(Device dev) {
Ilayda Ozdemir90a93622021-02-25 09:40:58 +00001178 if (subsService == null) {
1179 log.warn(SADIS_NOT_RUNNING);
1180 return null;
1181 }
Jonathan Hart4c538002018-08-23 10:11:54 -07001182 String devSerialNo = dev.serialNumber();
Gamze Abaka641fc072018-09-04 09:16:27 +00001183 return subsService.get(devSerialNo);
Jonathan Hart4c538002018-08-23 10:11:54 -07001184 }
1185
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001186 // Custom-built function, when the device is not available we need a fallback mechanism
1187 private boolean isLocalLeader(DeviceId deviceId) {
1188 if (!mastershipService.isLocalMaster(deviceId)) {
1189 // When the device is available we just check the mastership
1190 if (deviceService.isAvailable(deviceId)) {
1191 return false;
1192 }
1193 // Fallback with Leadership service - device id is used as topic
1194 NodeId leader = leadershipService.runForLeadership(
1195 deviceId.toString()).leaderNodeId();
1196 // Verify if this node is the leader
1197 return clusterService.getLocalNode().id().equals(leader);
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001198 }
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001199 return true;
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001200 }
1201
kdarapuaa5da252020-04-10 15:58:05 +05301202 private boolean isNniPort(Port port) {
1203 if (port.annotations().keys().contains(AnnotationKeys.PORT_NAME)) {
1204 return port.annotations().value(AnnotationKeys.PORT_NAME).contains(NNI);
1205 }
1206 return false;
1207 }
1208
alshabibf0e7e702015-05-30 18:22:36 -07001209 private class InternalDeviceListener implements DeviceListener {
Saurav Dasa9d5f442019-03-06 19:32:48 -08001210 private Set<DeviceId> programmedDevices = Sets.newConcurrentHashSet();
1211
alshabibf0e7e702015-05-30 18:22:36 -07001212 @Override
1213 public void event(DeviceEvent event) {
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001214 eventExecutor.execute(() -> {
1215 DeviceId devId = event.subject().id();
1216 Device dev = event.subject();
Gamze Abaka838d8142019-02-21 07:06:55 +00001217 Port port = event.port();
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001218 DeviceEvent.Type eventType = event.type();
Jonathan Hart4c538002018-08-23 10:11:54 -07001219
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001220 if (DeviceEvent.Type.PORT_STATS_UPDATED.equals(eventType) ||
1221 DeviceEvent.Type.DEVICE_SUSPENDED.equals(eventType) ||
1222 DeviceEvent.Type.DEVICE_UPDATED.equals(eventType)) {
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001223 return;
1224 }
Jonathan Hart4c538002018-08-23 10:11:54 -07001225
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001226 boolean isLocalLeader = isLocalLeader(devId);
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001227 // Only handle the event if the device belongs to us
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001228 if (!isLocalLeader && event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED)
1229 && !deviceService.isAvailable(devId) && deviceService.getPorts(devId).isEmpty()) {
1230 log.info("Cleaning local state for non master instance upon " +
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001231 "device disconnection {}", devId);
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001232 // Since no mastership of the device is present upon disconnection
1233 // the method in the FlowRuleManager only empties the local copy
1234 // of the DeviceFlowTable thus this method needs to get called
1235 // on every instance, see how it's done in the InternalDeviceListener
1236 // in FlowRuleManager: no mastership check for purgeOnDisconnection
Andrea Campanella3f34c992020-07-15 10:54:10 +02001237 handleDeviceDisconnection(dev, false, false);
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +02001238 return;
1239 } else if (!isLocalLeader) {
Andrea Campanella506df202020-05-21 10:26:12 +02001240 log.debug("Not handling event because instance is not leader for {}", devId);
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001241 return;
1242 }
1243
Matteo Scandolo19b56f62020-10-29 13:29:21 -07001244 log.debug("OLT got {} event for {}/{}", eventType, event.subject(), event.port());
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001245
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001246 if (getOltInfo(dev) == null) {
Saurav Dasa9d5f442019-03-06 19:32:48 -08001247 // it's possible that we got an event for a previously
1248 // programmed OLT that is no longer available in SADIS
1249 // we let such events go through
1250 if (!programmedDevices.contains(devId)) {
1251 log.warn("No device info found for {}, this is either "
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001252 + "not an OLT or not known to sadis", dev);
Saurav Dasa9d5f442019-03-06 19:32:48 -08001253 return;
1254 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001255 }
Jonathan Hart4c538002018-08-23 10:11:54 -07001256
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001257 switch (event.type()) {
1258 //TODO: Port handling and bookkeeping should be improved once
1259 // olt firmware handles correct behaviour.
1260 case PORT_ADDED:
Andrea Campanella3f34c992020-07-15 10:54:10 +02001261 if (!deviceService.isAvailable(devId)) {
1262 log.warn("Received {} for disconnected device {}, ignoring", event, devId);
1263 return;
1264 }
Gamze Abaka838d8142019-02-21 07:06:55 +00001265 if (isUniPort(dev, port)) {
1266 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port));
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001267
1268 if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -07001269 log.info("eapol will be sent for port added {}/{}", devId, port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001270 oltFlowService.processEapolFilteringObjectives(devId, port.number(), defaultBpId,
1271 null,
Andrea Campanella3f34c992020-07-15 10:54:10 +02001272 VlanId.vlanId(EAPOL_DEFAULT_VLAN),
1273 true);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001274 }
1275 } else {
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001276 SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
1277 if (deviceInfo != null) {
1278 oltFlowService.processNniFilteringObjectives(dev.id(), port.number(), true);
1279 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001280 }
1281 break;
1282 case PORT_REMOVED:
Gamze Abaka838d8142019-02-21 07:06:55 +00001283 if (isUniPort(dev, port)) {
Andrea Campanellacf0e3052020-08-27 11:05:39 +02001284 // if no subscriber is provisioned we need to remove the default EAPOL
1285 // if a subscriber was provisioned the default EAPOL will not be there and we can skip.
1286 // The EAPOL with subscriber tag will be removed by removeSubscriber call.
1287 Collection<? extends UniTagInformation> uniTagInformationSet =
1288 programmedSubs.get(new ConnectPoint(port.element().id(), port.number())).value();
1289 if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -07001290 log.info("No subscriber provisioned on port {}/{} in PORT_REMOVED event, " +
1291 "removing default EAPOL flow", devId, port);
Andrea Campanellacf0e3052020-08-27 11:05:39 +02001292 oltFlowService.processEapolFilteringObjectives(devId, port.number(), defaultBpId,
1293 null,
1294 VlanId.vlanId(EAPOL_DEFAULT_VLAN),
1295 false);
1296 } else {
1297 removeSubscriber(new ConnectPoint(devId, port.number()));
1298 }
Andy Bavier160e8682019-05-07 18:32:22 -07001299
Gamze Abaka838d8142019-02-21 07:06:55 +00001300 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001301 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001302 break;
1303 case PORT_UPDATED:
Andrea Campanella3f34c992020-07-15 10:54:10 +02001304 if (!deviceService.isAvailable(devId)) {
1305 log.warn("Received {} for disconnected device {}, ignoring", event, devId);
1306 return;
1307 }
Gamze Abaka838d8142019-02-21 07:06:55 +00001308 if (!isUniPort(dev, port)) {
Saurav Das9da7d522020-03-23 19:14:35 -07001309 SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
1310 if (deviceInfo != null && port.isEnabled()) {
1311 log.debug("NNI dev/port {}/{} enabled", dev.id(),
1312 port.number());
1313 oltFlowService.processNniFilteringObjectives(dev.id(),
1314 port.number(), true);
1315 }
1316 return;
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001317 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001318 ConnectPoint cp = new ConnectPoint(devId, port.number());
1319 Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(cp).value();
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001320 if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
Saurav Dasb776aef2020-03-09 14:29:46 -07001321 if (!port.number().equals(PortNumber.LOCAL)) {
Matteo Scandolo3a037a32020-04-01 12:17:50 -07001322 log.info("eapol will be {} for dev/port updated {}/{} with default vlan {}",
Saurav Dasb776aef2020-03-09 14:29:46 -07001323 (port.isEnabled()) ? "added" : "removed",
Matteo Scandolo3a037a32020-04-01 12:17:50 -07001324 devId, port.number(), EAPOL_DEFAULT_VLAN);
Matteo Scandolo27c471c2020-02-11 16:41:53 -08001325 oltFlowService.processEapolFilteringObjectives(devId, port.number(), defaultBpId,
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001326 null,
1327 VlanId.vlanId(EAPOL_DEFAULT_VLAN),
1328 port.isEnabled());
1329 }
1330 } else {
Saurav Dasb776aef2020-03-09 14:29:46 -07001331 log.info("eapol will be {} for dev/port updated {}/{}",
1332 (port.isEnabled()) ? "added" : "removed",
1333 devId, port.number());
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001334 uniTagInformationSet.forEach(uniTag ->
1335 oltFlowService.processEapolFilteringObjectives(devId, port.number(),
1336 uniTag.getUpstreamBandwidthProfile(), null,
1337 uniTag.getPonCTag(), port.isEnabled()));
1338 }
Gamze Abaka838d8142019-02-21 07:06:55 +00001339 if (port.isEnabled()) {
Gamze Abaka838d8142019-02-21 07:06:55 +00001340 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001341 } else {
Gamze Abaka838d8142019-02-21 07:06:55 +00001342 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port));
Jonathan Hart1d34c8b2018-05-05 15:37:28 -07001343 }
alshabibbb83aa22016-02-10 15:08:23 -08001344 break;
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001345 case DEVICE_ADDED:
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001346 handleDeviceConnection(dev, true);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001347 break;
1348 case DEVICE_REMOVED:
Andrea Campanella3f34c992020-07-15 10:54:10 +02001349 handleDeviceDisconnection(dev, true, true);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001350 break;
1351 case DEVICE_AVAILABILITY_CHANGED:
1352 if (deviceService.isAvailable(devId)) {
Saurav Dasbd3b6712020-03-31 23:28:35 -07001353 log.info("Handling available device: {}", dev.id());
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001354 handleDeviceConnection(dev, false);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001355 } else {
Hardik Windlassa58fbee2020-03-12 18:33:55 +05301356 if (deviceService.getPorts(devId).isEmpty()) {
Saurav Dasbd3b6712020-03-31 23:28:35 -07001357 log.info("Handling controlled device disconnection .. "
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001358 + "flushing all state for dev:{}", devId);
Andrea Campanella3f34c992020-07-15 10:54:10 +02001359 handleDeviceDisconnection(dev, true, false);
Saurav Dasbd3b6712020-03-31 23:28:35 -07001360 } else {
1361 log.info("Disconnected device has available ports .. "
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001362 + "assuming temporary disconnection, "
1363 + "retaining state for device {}", devId);
Hardik Windlassa58fbee2020-03-12 18:33:55 +05301364 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001365 }
1366 break;
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001367 default:
Andrea Campanella3f34c992020-07-15 10:54:10 +02001368 log.debug("Not handling event {}", event);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001369 return;
1370 }
1371 });
alshabibf0e7e702015-05-30 18:22:36 -07001372 }
Gamze Abaka838d8142019-02-21 07:06:55 +00001373
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001374 private void sendUniEvent(Device device, AccessDeviceEvent.Type eventType) {
1375 deviceService.getPorts(device.id()).stream()
1376 .filter(p -> !PortNumber.LOCAL.equals(p.number()))
1377 .filter(p -> isUniPort(device, p))
1378 .forEach(p -> post(new AccessDeviceEvent(eventType, device.id(), p)));
1379 }
1380
Andrea Campanella3f34c992020-07-15 10:54:10 +02001381 private void handleDeviceDisconnection(Device device, boolean sendDisconnectedEvent, boolean sendUniEvent) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001382 programmedDevices.remove(device.id());
1383 removeAllSubscribers(device.id());
Andrea Campanella600d2e22020-06-22 11:00:31 +02001384 //Handle case where OLT disconnects during subscriber provisioning
Andrea Campanella7a1d7e72020-11-05 10:40:10 +01001385 pendingSubscribersForDevice.remove(device.id());
Andrea Campanella600d2e22020-06-22 11:00:31 +02001386 oltFlowService.clearDeviceState(device.id());
1387
1388 //Complete meter and flow purge
Hardik Windlassa58fbee2020-03-12 18:33:55 +05301389 flowRuleService.purgeFlowRules(device.id());
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001390 oltMeterService.clearMeters(device.id());
Andrea Campanella3f34c992020-07-15 10:54:10 +02001391 if (sendDisconnectedEvent) {
1392 post(new AccessDeviceEvent(
1393 AccessDeviceEvent.Type.DEVICE_DISCONNECTED, device.id(),
1394 null, null, null));
1395 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001396 if (sendUniEvent) {
1397 sendUniEvent(device, AccessDeviceEvent.Type.UNI_REMOVED);
Gamze Abaka838d8142019-02-21 07:06:55 +00001398 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001399 }
1400
1401 private void handleDeviceConnection(Device dev, boolean sendUniEvent) {
1402 post(new AccessDeviceEvent(
1403 AccessDeviceEvent.Type.DEVICE_CONNECTED, dev.id(),
1404 null, null, null));
1405 programmedDevices.add(dev.id());
1406 checkAndCreateDeviceFlows(dev);
1407 if (sendUniEvent) {
1408 sendUniEvent(dev, AccessDeviceEvent.Type.UNI_ADDED);
1409 }
Gamze Abaka838d8142019-02-21 07:06:55 +00001410 }
Gamze Abakada282b42019-03-11 13:16:48 +00001411
1412 private void removeAllSubscribers(DeviceId deviceId) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001413 List<Map.Entry<ConnectPoint, UniTagInformation>> subs = programmedSubs.stream()
1414 .filter(e -> e.getKey().deviceId().equals(deviceId))
1415 .collect(toList());
Gamze Abakada282b42019-03-11 13:16:48 +00001416
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001417 subs.forEach(e -> programmedSubs.remove(e.getKey(), e.getValue()));
Gamze Abakada282b42019-03-11 13:16:48 +00001418 }
Gamze Abaka641fc072018-09-04 09:16:27 +00001419
Gamze Abaka641fc072018-09-04 09:16:27 +00001420 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001421
1422 private class InternalClusterListener implements ClusterEventListener {
1423
1424 @Override
1425 public void event(ClusterEvent event) {
1426 if (event.type() == ClusterEvent.Type.INSTANCE_READY) {
1427 hasher.addServer(event.subject().id());
1428 }
1429 if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
1430 hasher.removeServer(event.subject().id());
1431 }
1432 }
1433 }
Andrea Campanella0c3309d2020-05-29 01:51:18 -07001434
Hardik Windlass395ff372019-06-13 05:16:00 +00001435}