blob: 79d30c68a205e186337b125a7408817f39392bb9 [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07002 * Copyright 2021-present Open Networking Foundation
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
alshabibf0e7e702015-05-30 18:22:36 -070018import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080019import org.onlab.util.KryoNamespace;
Gamze Abaka1f62dd92020-05-07 08:58:13 +000020import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080021import org.onosproject.cluster.ClusterService;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020022import org.onosproject.cluster.LeadershipService;
alshabibf0e7e702015-05-30 18:22:36 -070023import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080025import org.onosproject.event.AbstractListenerManager;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020026import org.onosproject.mastership.MastershipService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010027import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070028import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010029import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070030import org.onosproject.net.DeviceId;
alshabibdec2e252016-01-15 12:20:25 -080031import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070032import org.onosproject.net.PortNumber;
33import org.onosproject.net.device.DeviceEvent;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080036import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080037import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070039import org.opencord.olt.AccessDeviceEvent;
40import org.opencord.olt.AccessDeviceListener;
41import org.opencord.olt.AccessDeviceService;
Gamze Abaka641fc072018-09-04 09:16:27 +000042import org.opencord.sadis.BaseInformationService;
43import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010044import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000045import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080046import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070047import org.osgi.service.component.annotations.Activate;
48import org.osgi.service.component.annotations.Component;
49import org.osgi.service.component.annotations.Deactivate;
50import org.osgi.service.component.annotations.Modified;
51import org.osgi.service.component.annotations.Reference;
52import org.osgi.service.component.annotations.ReferenceCardinality;
Ilayda Ozdemir90a93622021-02-25 09:40:58 +000053import org.osgi.service.component.annotations.ReferencePolicy;
alshabibf0e7e702015-05-30 18:22:36 -070054import org.slf4j.Logger;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070055import org.slf4j.LoggerFactory;
alshabibf0e7e702015-05-30 18:22:36 -070056
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010057import java.util.ArrayList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010058import java.util.Dictionary;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070059import java.util.Iterator;
60import java.util.LinkedList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010061import java.util.List;
62import java.util.Map;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010063import java.util.Properties;
64import java.util.Set;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010065import java.util.concurrent.ExecutorService;
66import java.util.concurrent.Executors;
67import java.util.concurrent.LinkedBlockingQueue;
68import java.util.concurrent.ScheduledExecutorService;
69import java.util.concurrent.TimeUnit;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070070import java.util.concurrent.locks.Lock;
71import java.util.concurrent.locks.ReentrantReadWriteLock;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010072
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010073import static com.google.common.base.Strings.isNullOrEmpty;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010074import static org.onlab.util.Tools.get;
75import static org.onlab.util.Tools.groupedThreads;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070076import static org.opencord.olt.impl.OltUtils.getPortName;
77import static org.opencord.olt.impl.OltUtils.portWithName;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010078import static org.opencord.olt.impl.OsgiPropertyConstants.*;
alshabibf0e7e702015-05-30 18:22:36 -070079
80/**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070081 * OLT Application.
alshabibf0e7e702015-05-30 18:22:36 -070082 */
Carmelo Casconeca931162019-07-15 18:22:24 -070083@Component(immediate = true,
84 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -070085 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +000086 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070087 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
88 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
89 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
Carmelo Casconeca931162019-07-15 18:22:24 -070090 })
alshabib8e4fd2f2016-01-12 15:55:53 -080091public class Olt
92 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
93 implements AccessDeviceService {
alshabibf0e7e702015-05-30 18:22:36 -070094
Carmelo Casconeca931162019-07-15 18:22:24 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -070096 protected DeviceService deviceService;
97
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected ComponentConfigService cfgService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100100
Carmelo Casconeca931162019-07-15 18:22:24 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700102 protected MastershipService mastershipService;
alshabibf0e7e702015-05-30 18:22:36 -0700103
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700105 protected ClusterService clusterService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected LeadershipService leadershipService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100109
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000110 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
111 bind = "bindSadisService",
112 unbind = "unbindSadisService",
113 policy = ReferencePolicy.DYNAMIC)
114 protected volatile SadisService sadisService;
Gamze Abaka641fc072018-09-04 09:16:27 +0000115
Carmelo Casconeca931162019-07-15 18:22:24 -0700116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700117 protected OltDeviceServiceInterface oltDeviceService;
alshabibe0559672016-02-21 14:49:51 -0800118
Carmelo Casconeca931162019-07-15 18:22:24 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700120 protected OltFlowServiceInterface oltFlowService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
123 protected OltMeterServiceInterface oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000124
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
126 protected StorageService storageService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700129 protected CoreService coreService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800130
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700131 protected ApplicationId appId;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200132
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700133 private static final String ONOS_OLT_SERVICE = "onos/olt-service";
Tunahan Sezena07fe962021-02-24 08:24:24 +0000134
Carmelo Casconeca931162019-07-15 18:22:24 -0700135 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800136 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700137 **/
138 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000139
Carmelo Casconeca931162019-07-15 18:22:24 -0700140 /**
Gamze Abaka51a34e82020-05-08 13:03:14 +0000141 * Default multicast service name.
Carmelo Casconeca931162019-07-15 18:22:24 -0700142 **/
Gamze Abaka51a34e82020-05-08 13:03:14 +0000143 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Gamze Abaka33feef52019-02-27 08:16:47 +0000144
Saurav Das2d3777a2020-08-07 18:48:51 -0700145 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700146 * Number of threads used to process flows.
Saurav Das2d3777a2020-08-07 18:48:51 -0700147 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700148 protected int flowProcessingThreads = FLOW_PROCESSING_THREADS_DEFAULT;
Saurav Das2d3777a2020-08-07 18:48:51 -0700149
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700150 /**
 * Number of threads used to handle subscriber provisioning and removal requests.
152 **/
153 protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
154
155 /**
 * Delay in ms to put an event back in the queue, used to avoid retrying things too often if conditions are not met.
157 **/
158 protected int requeueDelay = REQUEUE_DELAY_DEFAULT;
159
160 private final Logger log = LoggerFactory.getLogger(getClass());
161
162 /**
163 * A queue to asynchronously process events.
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700164 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700165 protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
alshabibf0e7e702015-05-30 18:22:36 -0700166
Gamze Abaka641fc072018-09-04 09:16:27 +0000167 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
alshabibf0e7e702015-05-30 18:22:36 -0700168
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700169 /**
170 * Listener for OLT devices events.
171 */
172 protected OltDeviceListener deviceListener = new OltDeviceListener();
173 protected ScheduledExecutorService discoveredSubscriberExecutor =
174 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
175 "discovered-cp-%d", log));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100176
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700177 protected ScheduledExecutorService queueExecutor =
178 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
179 "discovered-cp-restore-%d", log));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700180
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700181 /**
182 * Executor used to defer flow provisioning to a different thread pool.
183 */
184 protected ExecutorService flowsExecutor;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800185
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700186 /**
187 * Executor used to defer subscriber handling from API call to a different thread pool.
188 */
189 protected ExecutorService subscriberExecutor;
190
191 private static final String APP_NAME = "org.opencord.olt";
192
193 private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
194 private final Lock queueWriteLock = queueLock.writeLock();
195 private final Lock queueReadLock = queueLock.readLock();
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700196
alshabibf0e7e702015-05-30 18:22:36 -0700197 @Activate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700198 protected void activate(ComponentContext context) {
199 cfgService.registerProperties(getClass());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700200
alshabibe0559672016-02-21 14:49:51 -0800201 modified(context);
Saurav Das62ad75e2019-03-05 12:22:22 -0800202
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700203 appId = coreService.registerApplication(APP_NAME);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800204 KryoNamespace serializer = KryoNamespace.newBuilder()
205 .register(KryoNamespaces.API)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700206 .register(ConnectPoint.class)
207 .register(DiscoveredSubscriber.class)
208 .register(DiscoveredSubscriber.Status.class)
209 .register(SubscriberAndDeviceInformation.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800210 .register(UniTagInformation.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000211 .register(LinkedBlockingQueue.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800212 .build();
213
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700214 eventsQueues = storageService.<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder()
215 .withName("volt-subscriber-queues")
216 .withApplicationId(appId)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800217 .withSerializer(Serializer.using(serializer))
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700218 .build().asJavaMap();
alshabib4ceaed32016-03-03 18:00:58 -0800219
alshabibba357492016-01-27 13:49:46 -0800220 deviceService.addListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700221
222 discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
223 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
224 log.info("Started");
225
226 deviceListener.handleExistingPorts();
alshabibf0e7e702015-05-30 18:22:36 -0700227 }
228
229 @Deactivate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700230 protected void deactivate(ComponentContext context) {
231 cfgService.unregisterProperties(getClass(), false);
232 discoveredSubscriberExecutor.shutdown();
Andrea Campanellaeaf23952021-12-30 15:58:54 +0100233 deviceService.removeListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700234 flowsExecutor.shutdown();
235 subscriberExecutor.shutdown();
236 deviceListener.deactivate();
alshabibf0e7e702015-05-30 18:22:36 -0700237 log.info("Stopped");
238 }
239
alshabibe0559672016-02-21 14:49:51 -0800240 @Modified
241 public void modified(ComponentContext context) {
242 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700243 if (context != null) {
Andrea Campanella971d5b92020-05-07 11:20:43 +0200244 String bpId = get(properties, DEFAULT_BP_ID);
245 defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
Gamze Abakaad329652018-12-20 10:12:21 +0000246
Andrea Campanella971d5b92020-05-07 11:20:43 +0200247 String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
248 multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000249
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700250 String flowThreads = get(properties, FLOW_PROCESSING_THREADS);
251 int oldFlowProcessingThreads = flowProcessingThreads;
252 flowProcessingThreads = isNullOrEmpty(flowThreads) ?
253 oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
Saurav Das2d3777a2020-08-07 18:48:51 -0700254
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700255 if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads) {
256 if (flowsExecutor != null) {
257 flowsExecutor.shutdown();
258 }
259 flowsExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
260 groupedThreads(ONOS_OLT_SERVICE,
261 "flows-installer-%d"));
262 }
Gamze Abaka33feef52019-02-27 08:16:47 +0000263
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700264 String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
265 int oldSubscriberProcessingThreads = subscriberProcessingThreads;
266 subscriberProcessingThreads = isNullOrEmpty(subscriberThreads) ?
267 oldSubscriberProcessingThreads : Integer.parseInt(subscriberThreads.trim());
268
269 if (subscriberExecutor == null || oldSubscriberProcessingThreads != subscriberProcessingThreads) {
270 if (subscriberExecutor != null) {
271 subscriberExecutor.shutdown();
272 }
273 subscriberExecutor = Executors.newFixedThreadPool(subscriberProcessingThreads,
274 groupedThreads(ONOS_OLT_SERVICE,
275 "subscriber-installer-%d"));
276 }
277
278 String queueDelay = get(properties, REQUEUE_DELAY);
279 requeueDelay = isNullOrEmpty(queueDelay) ?
280 REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
alshabibe0559672016-02-21 14:49:51 -0800281 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700282 log.info("Modified. Values = {}: {}, {}: {}, " +
283 "{}:{}, {}:{}, {}:{}",
284 DEFAULT_BP_ID, defaultBpId,
285 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
286 FLOW_PROCESSING_THREADS, flowProcessingThreads,
287 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
288 REQUEUE_DELAY, requeueDelay);
alshabibe0559672016-02-21 14:49:51 -0800289 }
290
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000291
alshabib32232c82016-02-25 17:57:24 -0500292 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700293 public boolean provisionSubscriber(ConnectPoint cp) {
294 subscriberExecutor.submit(() -> {
295 Device device = deviceService.getDevice(cp.deviceId());
296 Port port = deviceService.getPort(device.id(), cp.port());
297 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Hardik Windlass395ff372019-06-13 05:16:00 +0000298
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700299 if (oltDeviceService.isNniPort(device, port.number())) {
300 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
301 return false;
Saurav Das026650f2020-09-21 18:56:35 -0700302 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700303
304 log.info("Provisioning subscriber on {}", accessDevicePort);
305
306 if (oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
307 log.error("Subscriber on {} is already provisioned", accessDevicePort);
308 return false;
309 }
310
311 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
312 if (si == null) {
313 log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
314 return false;
315 }
316 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
317 DiscoveredSubscriber.Status.ADDED, true, si);
318
319 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
320 // regardless of the flow status
321 si.uniTagList().forEach(uti -> {
322 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
323 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
324 });
325
326 addSubscriberToQueue(sub);
327 return true;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100328 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700329 //NOTE this only means we have taken the request in, nothing more.
Amit Ghosh31939522018-08-16 13:28:21 +0100330 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800331 }
332
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000333 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700334 public boolean removeSubscriber(ConnectPoint cp) {
335 subscriberExecutor.submit(() -> {
336 Device device = deviceService.getDevice(DeviceId.deviceId(cp.deviceId().toString()));
337 Port port = deviceService.getPort(device.id(), cp.port());
338 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000339
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700340 if (oltDeviceService.isNniPort(device, port.number())) {
341 log.warn("will not un-provision a subscriber on the NNI {}",
342 accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100343 return false;
344 }
345
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700346 log.info("Un-provisioning subscriber on {}", accessDevicePort);
347
348 if (!oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
349 log.error("Subscriber on {} is not provisioned", accessDevicePort);
350 return false;
351 }
352
353 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
354 if (si == null) {
355 log.error("Subscriber information not found in sadis for port {}",
356 accessDevicePort);
357 // NOTE that we are returning true so that the subscriber is removed from the queue
358 // and we can move on provisioning others
359 return false;
360 }
361 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
362 DiscoveredSubscriber.Status.REMOVED, true, si);
363
364 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
365 // regardless of the flow status
366 si.uniTagList().forEach(uti -> {
367 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
368 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000369 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700370
371 addSubscriberToQueue(sub);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100372 return true;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700373 });
374 //NOTE this only means we have taken the request in, nothing more.
375 return true;
376 }
377
378 @Override
379 public boolean provisionSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
380 log.debug("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}",
381 cp, cTag, sTag, tpId);
382 Device device = deviceService.getDevice(cp.deviceId());
383 Port port = deviceService.getPort(device.id(), cp.port());
384 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
385
386 if (oltDeviceService.isNniPort(device, port.number())) {
387 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100388 return false;
389 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100390
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700391 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
392 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
393 if (specificService == null) {
394 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
395 "stag {}, tpId {}", cp, cTag, sTag, tpId);
Amit Ghosh31939522018-08-16 13:28:21 +0100396 return false;
397 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700398 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
399 uniTagInformationList.add(specificService);
400 si.setUniTagList(uniTagInformationList);
401 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
402 DiscoveredSubscriber.Status.ADDED, true, si);
Amit Ghosh31939522018-08-16 13:28:21 +0100403
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700404 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
405 // regardless of the flow status
406 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
407 if (oltFlowService.isSubscriberServiceProvisioned(sk)) {
408 log.error("Subscriber on {} is already provisioned", sk);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100409 return false;
410 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700411 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
412
413 addSubscriberToQueue(sub);
414 return true;
Amit Ghosh31939522018-08-16 13:28:21 +0100415 }
416
417 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700418 public boolean removeSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
419 log.debug("Un-provisioning subscriber on {} with cTag {}, stag {}, tpId {}",
420 cp, cTag, sTag, tpId);
421 Device device = deviceService.getDevice(cp.deviceId());
422 Port port = deviceService.getPort(device.id(), cp.port());
423 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
424
425 if (oltDeviceService.isNniPort(device, port.number())) {
426 log.warn("will not un-provision a subscriber on the NNI {}",
427 accessDevicePort);
428 return false;
429 }
430
431 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
432 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
433 if (specificService == null) {
434 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
435 "stag {}, tpId {}", cp, cTag, sTag, tpId);
436 return false;
437 }
438 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
439 uniTagInformationList.add(specificService);
440 si.setUniTagList(uniTagInformationList);
441 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
442 DiscoveredSubscriber.Status.ADDED, true, si);
443
444 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
445 // regardless of the flow status
446 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
447 if (!oltFlowService.isSubscriberServiceProvisioned(sk)) {
448 log.error("Subscriber on {} is not provisioned", sk);
449 return false;
450 }
451 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
452
453 addSubscriberToQueue(sub);
454 return true;
Saurav Das82b8e6d2018-10-04 15:25:12 -0700455 }
456
457 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700458 public List<DeviceId> getConnectedOlts() {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100459 List<DeviceId> olts = new ArrayList<>();
460 Iterable<Device> devices = deviceService.getDevices();
461 for (Device d : devices) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700462 if (oltDeviceService.isOlt(d)) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700463 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100464 olts.add(d.id());
465 }
466 }
467 return olts;
alshabibe0559672016-02-21 14:49:51 -0800468 }
469
Amit Ghosh31939522018-08-16 13:28:21 +0100470 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700471 * Finds the connect-point to which a subscriber is connected.
Amit Ghosh31939522018-08-16 13:28:21 +0100472 *
473 * @param id The id of the subscriber, this is the same ID as in Sadis
474 * @return Subscribers ConnectPoint if found else null
475 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700476 @Override
477 public ConnectPoint findSubscriberConnectPoint(String id) {
Amit Ghosh31939522018-08-16 13:28:21 +0100478
479 Iterable<Device> devices = deviceService.getDevices();
480 for (Device d : devices) {
481 for (Port p : deviceService.getPorts(d.id())) {
482 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
483 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700484 log.debug("Found on {}", portWithName(p));
Amit Ghosh31939522018-08-16 13:28:21 +0100485 return new ConnectPoint(d.id(), p.number());
486 }
487 }
488 }
489 return null;
490 }
491
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700492 protected void processDiscoveredSubscribers() {
493 log.info("Started processDiscoveredSubscribers loop");
494 while (true) {
495 Set<ConnectPoint> discoveredCps;
496 try {
497 queueReadLock.lock();
498 discoveredCps = eventsQueues.keySet();
499 } catch (Exception e) {
500 log.error("Cannot read keys from queue map", e);
501 return;
502 } finally {
503 queueReadLock.unlock();
504 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000505
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700506 discoveredCps.forEach(cp -> {
507 LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
alshabibbf23a1f2016-01-14 17:27:11 -0800508
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700509 try {
510 queueReadLock.lock();
511 eventsQueue = eventsQueues.get(cp);
512 } catch (Exception e) {
513 log.error("Cannot get key from queue map", e);
514 return;
515 } finally {
516 queueReadLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000517 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000518
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700519 if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
520 // if we're not local leader for this device, ignore this queue
521 if (log.isTraceEnabled()) {
522 log.trace("Ignoring queue on CP {} as not master of the device", cp);
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100523 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700524 return;
Tunahan Sezena07fe962021-02-24 08:24:24 +0000525 }
526
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700527 flowsExecutor.execute(() -> {
528 if (!eventsQueue.isEmpty()) {
529 // we do not remove the event from the queue until it has been processed
530 // in that way we guarantee that events are processed in order
531 DiscoveredSubscriber sub = eventsQueue.peek();
532 if (sub == null) {
533 // the queue is empty
534 return;
Tunahan Sezena07fe962021-02-24 08:24:24 +0000535 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700536
537 if (log.isTraceEnabled()) {
538 log.trace("Processing subscriber on port {} with status {}",
539 portWithName(sub.port), sub.status);
540 }
541
542 if (sub.hasSubscriber) {
543 // this is a provision subscriber call
544 if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
545 if (log.isTraceEnabled()) {
546 log.trace("Provisioning of subscriber on {} completed",
547 portWithName(sub.port));
548 }
549 removeSubscriberFromQueue(sub);
550 }
551 } else {
552 // this is a port event (ENABLED/DISABLED)
553 // means no subscriber was provisioned on that port
554
555 if (!deviceService.isAvailable(sub.device.id()) ||
556 deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
557 // If the device is not connected or the port is not available do nothing
558 // This can happen when we disable and then immediately delete the device,
559 // the queue is populated but the meters and flows are already gone
560 // thus there is nothing left to do
561 return;
562 }
563
564 if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
565 if (log.isTraceEnabled()) {
566 log.trace("Processing of port {} completed",
567 portWithName(sub.port));
568 }
569 removeSubscriberFromQueue(sub);
570 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000571 }
572 }
573 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700574 });
Tunahan Sezena07fe962021-02-24 08:24:24 +0000575
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700576 try {
577 TimeUnit.MILLISECONDS.sleep(requeueDelay);
578 } catch (InterruptedException e) {
579 continue;
Tunahan Sezena07fe962021-02-24 08:24:24 +0000580 }
581 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000582 }
583
584 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000585 * Checks the subscriber uni tag list and find the uni tag information.
586 * using the pon c tag, pon s tag and the technology profile id
587 * May return Optional<null>
588 *
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700589 * @param portName port of the subscriber
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000590 * @param innerVlan pon c tag
591 * @param outerVlan pon s tag
592 * @param tpId the technology profile id
593 * @return the found uni tag information
594 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700595 private UniTagInformation getUniTagInformation(String portName, VlanId innerVlan,
596 VlanId outerVlan, int tpId) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000597 log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700598 portName, innerVlan, outerVlan, tpId);
599 SubscriberAndDeviceInformation subInfo = subsService.get(portName);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000600 if (subInfo == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700601 log.warn("Subscriber information doesn't exist for {}", portName);
602 return null;
Gamze Abakaf59c0912019-04-19 08:24:28 +0000603 }
604
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000605 List<UniTagInformation> uniTagList = subInfo.uniTagList();
606 if (uniTagList == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700607 log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), portName);
608 return null;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000609 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100610
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000611 UniTagInformation service = null;
612 for (UniTagInformation tagInfo : subInfo.uniTagList()) {
613 if (innerVlan.equals(tagInfo.getPonCTag()) && outerVlan.equals(tagInfo.getPonSTag())
614 && tpId == tagInfo.getTechnologyProfileId()) {
615 service = tagInfo;
616 break;
Andy Bavier160e8682019-05-07 18:32:22 -0700617 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000618 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000619
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000620 if (service == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700621 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700622 innerVlan, outerVlan, tpId, portName);
Saurav Das82b8e6d2018-10-04 15:25:12 -0700623 return null;
624 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100625
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700626 return service;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100627 }
628
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700629 protected void bindSadisService(SadisService service) {
630 sadisService = service;
631 subsService = sadisService.getSubscriberInfoService();
632
633 log.info("Sadis-service binds to onos.");
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100634 }
635
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700636 protected void unbindSadisService(SadisService service) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700637 deviceListener = null;
638 sadisService = null;
639 subsService = null;
640 log.info("Sadis-service unbinds from onos.");
Jonathan Hart1d34c8b2018-05-05 15:37:28 -0700641 }
642
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700643 protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
644 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
645 LinkedBlockingQueue<DiscoveredSubscriber> q = null;
646 try {
647 queueReadLock.lock();
648 q = eventsQueues.getOrDefault(cp, new LinkedBlockingQueue<>());
649 } finally {
650 queueReadLock.unlock();
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000651 }
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800652
653 log.info("Adding subscriber to queue: {} with status {} and subscriber {}",
654 portWithName(sub.port), sub.status, sub.hasSubscriber);
655 q.add(sub);
656
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700657 try {
658 queueWriteLock.lock();
659 eventsQueues.put(cp, q);
660 } catch (UnsupportedOperationException | ClassCastException |
661 NullPointerException | IllegalArgumentException e) {
662 log.error("Cannot add subscriber to queue: {}", e.getMessage());
663 } finally {
664 queueWriteLock.unlock();
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800665 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800666 }
667
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700668 protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
669 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
670 LinkedBlockingQueue<DiscoveredSubscriber> q = null;
671 if (log.isTraceEnabled()) {
672 log.trace("removing subscriber {} from queue", sub);
kdarapuaa5da252020-04-10 15:58:05 +0530673 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700674 try {
675 queueReadLock.lock();
676 q = eventsQueues.get(cp);
677 } finally {
678 queueReadLock.unlock();
679 }
680 if (q == null) {
681 log.warn("Cannot find queue for connectPoint {}", cp);
682 return;
683 }
684 boolean removed = q.remove(sub);
685 if (!removed) {
686 log.warn("Subscriber {} has not been removed from queue, is it still there? {}", sub, q);
687 return;
688 } else {
689 log.debug("Subscriber {} has been removed from the queue", sub);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000690 }
691
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700692 try {
693 queueWriteLock.lock();
694 eventsQueues.remove(cp); // am I needed??
695 eventsQueues.put(cp, q);
696 } catch (UnsupportedOperationException | ClassCastException |
697 NullPointerException | IllegalArgumentException e) {
698 log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
699 } finally {
700 queueWriteLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000701 }
702 }
703
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700704 protected class OltDeviceListener
705 implements DeviceListener {
706
707 private final Logger log = LoggerFactory.getLogger(getClass());
708 protected ExecutorService eventExecutor;
709
710 /**
711 * Builds the listener with all the proper services and information needed.
712 */
713 public OltDeviceListener() {
714 this.eventExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
715 groupedThreads("onos/olt-device-listener-event", "event-%d", log));
716 }
717
718 public void deactivate() {
719 this.eventExecutor.shutdown();
720 }
Saurav Dasa9d5f442019-03-06 19:32:48 -0800721
alshabibf0e7e702015-05-30 18:22:36 -0700722 @Override
723 public void event(DeviceEvent event) {
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700724 eventExecutor.execute(() -> {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700725 boolean isOlt = oltDeviceService.isOlt(event.subject());
726 DeviceId deviceId = event.subject().id();
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700727 switch (event.type()) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700728 case PORT_STATS_UPDATED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700729 case DEVICE_ADDED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700730 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700731 case PORT_ADDED:
732 case PORT_UPDATED:
733 case PORT_REMOVED:
734 if (!isOlt) {
735 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
736 return;
737 }
738 if (!oltDeviceService.isLocalLeader(deviceId)) {
739 log.trace("Device {} is not local to this node", deviceId);
740 return;
741 }
742 // port added, updated and removed are treated in the same way as we only care whether the port
743 // is enabled or not
744 handleOltPort(event.type(), event.subject(), event.port());
745 return;
746 case DEVICE_AVAILABILITY_CHANGED:
747 if (!isOlt) {
748 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
749 return;
750 }
751 if (deviceService.isAvailable(deviceId)) {
752 if (!oltDeviceService.isLocalLeader(deviceId)) {
753 if (log.isTraceEnabled()) {
754 log.trace("Device {} is not local to this node, not handling available device",
755 deviceId);
756 }
757 } else {
758 log.info("Handling available device: {}", deviceId);
759 handleExistingPorts();
760 }
761 } else if (!deviceService.isAvailable(deviceId) && deviceService.getPorts(deviceId).isEmpty()) {
762 // NOTE that upon disconnection there is no mastership on the device,
763 // and we should anyway clear the local cache of the flows/meters across instances.
764 // We're only clearing the device if there are no available ports,
765 // otherwise we assume it's a temporary disconnection
766 log.info("Device {} availability changed to false and ports are empty, " +
767 "purging meters and flows", deviceId);
768 //NOTE all the instances will call these methods
769 oltFlowService.purgeDeviceFlows(deviceId);
770 oltMeterService.purgeDeviceMeters(deviceId);
771 clearQueueForDevice(deviceId);
772 } else {
773 log.info("Device {} availability changed to false, but ports are still available, " +
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800774 "assuming temporary disconnection.",
775 deviceId);
776 if (log.isTraceEnabled()) {
777 log.trace("Available ports: {}", deviceService.getPorts(deviceId));
778 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700779 }
780 return;
781 case DEVICE_REMOVED:
782 if (!isOlt) {
783 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
784 return;
785 }
786 log.info("Device {} Removed, purging meters and flows", deviceId);
787 oltFlowService.purgeDeviceFlows(deviceId);
788 oltMeterService.purgeDeviceMeters(deviceId);
789 clearQueueForDevice(deviceId);
790 return;
791 default:
792 if (log.isTraceEnabled()) {
793 log.trace("Not handling event: {}, ", event);
794 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700795 }
796 });
alshabibf0e7e702015-05-30 18:22:36 -0700797 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000798
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700799 protected void clearQueueForDevice(DeviceId devId) {
800 try {
801 queueWriteLock.lock();
802 Iterator<Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>> iter =
803 eventsQueues.entrySet().iterator();
804 while (iter.hasNext()) {
805 Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
806 if (entry.getKey().deviceId().equals(devId)) {
807 eventsQueues.remove(entry.getKey());
808 }
809 }
810 } finally {
811 queueWriteLock.unlock();
Gamze Abaka838d8142019-02-21 07:06:55 +0000812 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000813 }
814
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700815 protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
816 log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
817 portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
818 if (port.isEnabled()) {
819 if (oltDeviceService.isNniPort(device, port.number())) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700820 OltFlowService.FlowOperation action = OltFlowService.FlowOperation.ADD;
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800821 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
822 // In that case the flows are purged anyway, so there's no need to deal with them,
823 // it would actually be counter-productive as the openflow connection is severed and they won't
824 // be correctly processed
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700825 if (type == DeviceEvent.Type.PORT_REMOVED) {
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800826 log.debug("NNI port went down, " +
827 "ignoring event as flows will be removed in the generic device cleanup");
828 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700829 }
830 oltFlowService.handleNniFlows(device, port, action);
831 } else {
832 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
833 // NOTE if the subscriber was previously provisioned,
834 // then add it back to the queue to be re-provisioned
835 boolean provisionSubscriber = oltFlowService.
836 isSubscriberServiceProvisioned(new AccessDevicePort(port));
837 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
838 if (si == null) {
839 //NOTE this should not happen given that the subscriber was provisioned before
840 log.error("Subscriber information not found in sadis for port {}",
841 portWithName(port));
842 return;
843 }
844
845 DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
846 if (type == DeviceEvent.Type.PORT_REMOVED) {
847 status = DiscoveredSubscriber.Status.REMOVED;
848 }
849
850 DiscoveredSubscriber sub =
851 new DiscoveredSubscriber(device, port,
852 status, provisionSubscriber, si);
853 addSubscriberToQueue(sub);
854 }
855 } else {
856 if (oltDeviceService.isNniPort(device, port.number())) {
Matteo Scandolob6981dc2021-12-02 16:31:44 -0800857 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
858 // In that case the flows are purged anyway, so there's no need to deal with them,
859 // it would actually be counter-productive as the openflow connection is severed and they won't
860 // be correctly processed
861 log.debug("NNI port went down, " +
862 "ignoring event as flows will be removed in the generic device cleanup");
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700863 } else {
864 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
865 // NOTE we are assuming that if a subscriber has default eapol
866 // it does not have subscriber flows
867 if (oltFlowService.hasDefaultEapol(port)) {
868 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
869 if (si == null) {
870 //NOTE this should not happen given that the subscriber was provisioned before
871 log.error("Subscriber information not found in sadis for port {}",
872 portWithName(port));
873 return;
874 }
875 DiscoveredSubscriber sub =
876 new DiscoveredSubscriber(device, port,
877 DiscoveredSubscriber.Status.REMOVED, false, si);
878
879 addSubscriberToQueue(sub);
880
881 } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
882 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
883 if (si == null) {
884 //NOTE this should not happen given that the subscriber was provisioned before
885 log.error("Subscriber information not found in sadis for port {}",
886 portWithName(port));
887 return;
888 }
889 DiscoveredSubscriber sub =
890 new DiscoveredSubscriber(device, port,
891 DiscoveredSubscriber.Status.REMOVED, true, si);
892 addSubscriberToQueue(sub);
893 }
894 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000895 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000896 }
Gamze Abakada282b42019-03-11 13:16:48 +0000897
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700898 /**
899 * This method is invoked on app activation in order to deal
900 * with devices and ports that are already existing in the system
901 * and thus won't trigger an event.
902 * It is also needed on instance reboot and device reconnect
903 */
904 protected void handleExistingPorts() {
905 Iterable<DeviceId> devices = getConnectedOlts();
906 for (DeviceId deviceId : devices) {
907 log.info("Handling existing OLT Ports for device {}", deviceId);
908 if (oltDeviceService.isLocalLeader(deviceId)) {
909 List<Port> ports = deviceService.getPorts(deviceId);
910 for (Port p : ports) {
911 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
912 continue;
913 }
914 Device device = deviceService.getDevice(deviceId);
915 deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, device, p);
916 }
917 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800918 }
919 }
920 }
Hardik Windlass395ff372019-06-13 05:16:00 +0000921}