blob: c5f7806167d12c5dd240c149629e6b5a11bee24d [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07002 * Copyright 2021-present Open Networking Foundation
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
alshabibf0e7e702015-05-30 18:22:36 -070018import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080019import org.onlab.util.KryoNamespace;
Gamze Abaka1f62dd92020-05-07 08:58:13 +000020import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080021import org.onosproject.cluster.ClusterService;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020022import org.onosproject.cluster.LeadershipService;
alshabibf0e7e702015-05-30 18:22:36 -070023import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080025import org.onosproject.event.AbstractListenerManager;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020026import org.onosproject.mastership.MastershipService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010027import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070028import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010029import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070030import org.onosproject.net.DeviceId;
alshabibdec2e252016-01-15 12:20:25 -080031import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070032import org.onosproject.net.PortNumber;
33import org.onosproject.net.device.DeviceEvent;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080036import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080037import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070039import org.opencord.olt.AccessDeviceEvent;
40import org.opencord.olt.AccessDeviceListener;
41import org.opencord.olt.AccessDeviceService;
Gamze Abaka641fc072018-09-04 09:16:27 +000042import org.opencord.sadis.BaseInformationService;
43import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010044import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000045import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080046import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070047import org.osgi.service.component.annotations.Activate;
48import org.osgi.service.component.annotations.Component;
49import org.osgi.service.component.annotations.Deactivate;
50import org.osgi.service.component.annotations.Modified;
51import org.osgi.service.component.annotations.Reference;
52import org.osgi.service.component.annotations.ReferenceCardinality;
Ilayda Ozdemir90a93622021-02-25 09:40:58 +000053import org.osgi.service.component.annotations.ReferencePolicy;
alshabibf0e7e702015-05-30 18:22:36 -070054import org.slf4j.Logger;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070055import org.slf4j.LoggerFactory;
alshabibf0e7e702015-05-30 18:22:36 -070056
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010057import java.util.ArrayList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010058import java.util.Dictionary;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070059import java.util.Iterator;
60import java.util.LinkedList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010061import java.util.List;
62import java.util.Map;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010063import java.util.Properties;
64import java.util.Set;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010065import java.util.concurrent.ExecutorService;
66import java.util.concurrent.Executors;
67import java.util.concurrent.LinkedBlockingQueue;
68import java.util.concurrent.ScheduledExecutorService;
69import java.util.concurrent.TimeUnit;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070070import java.util.concurrent.locks.Lock;
71import java.util.concurrent.locks.ReentrantReadWriteLock;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010072
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010073import static com.google.common.base.Strings.isNullOrEmpty;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010074import static org.onlab.util.Tools.get;
75import static org.onlab.util.Tools.groupedThreads;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070076import static org.opencord.olt.impl.OltUtils.getPortName;
77import static org.opencord.olt.impl.OltUtils.portWithName;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010078import static org.opencord.olt.impl.OsgiPropertyConstants.*;
alshabibf0e7e702015-05-30 18:22:36 -070079
80/**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070081 * OLT Application.
alshabibf0e7e702015-05-30 18:22:36 -070082 */
Carmelo Casconeca931162019-07-15 18:22:24 -070083@Component(immediate = true,
84 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -070085 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +000086 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070087 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
88 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
89 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
Carmelo Casconeca931162019-07-15 18:22:24 -070090 })
alshabib8e4fd2f2016-01-12 15:55:53 -080091public class Olt
92 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
93 implements AccessDeviceService {
alshabibf0e7e702015-05-30 18:22:36 -070094
Carmelo Casconeca931162019-07-15 18:22:24 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -070096 protected DeviceService deviceService;
97
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected ComponentConfigService cfgService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100100
Carmelo Casconeca931162019-07-15 18:22:24 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700102 protected MastershipService mastershipService;
alshabibf0e7e702015-05-30 18:22:36 -0700103
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700105 protected ClusterService clusterService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected LeadershipService leadershipService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100109
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000110 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
111 bind = "bindSadisService",
112 unbind = "unbindSadisService",
113 policy = ReferencePolicy.DYNAMIC)
114 protected volatile SadisService sadisService;
Gamze Abaka641fc072018-09-04 09:16:27 +0000115
Carmelo Casconeca931162019-07-15 18:22:24 -0700116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700117 protected OltDeviceServiceInterface oltDeviceService;
alshabibe0559672016-02-21 14:49:51 -0800118
Carmelo Casconeca931162019-07-15 18:22:24 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700120 protected OltFlowServiceInterface oltFlowService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
123 protected OltMeterServiceInterface oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000124
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
126 protected StorageService storageService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700129 protected CoreService coreService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800130
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700131 protected ApplicationId appId;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200132
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700133 private static final String ONOS_OLT_SERVICE = "onos/olt-service";
Tunahan Sezena07fe962021-02-24 08:24:24 +0000134
Carmelo Casconeca931162019-07-15 18:22:24 -0700135 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800136 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700137 **/
138 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000139
Carmelo Casconeca931162019-07-15 18:22:24 -0700140 /**
Gamze Abaka51a34e82020-05-08 13:03:14 +0000141 * Default multicast service name.
Carmelo Casconeca931162019-07-15 18:22:24 -0700142 **/
Gamze Abaka51a34e82020-05-08 13:03:14 +0000143 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Gamze Abaka33feef52019-02-27 08:16:47 +0000144
Saurav Das2d3777a2020-08-07 18:48:51 -0700145 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700146 * Number of threads used to process flows.
Saurav Das2d3777a2020-08-07 18:48:51 -0700147 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700148 protected int flowProcessingThreads = FLOW_PROCESSING_THREADS_DEFAULT;
Saurav Das2d3777a2020-08-07 18:48:51 -0700149
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700150 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700151 * Number of threads used to process subscriber requests coming from API calls.
152 **/
153 protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
154
155 /**
156 * Delay in ms to put an event back in the queue, used to avoid retrying things too often if conditions are not met.
157 **/
158 protected int requeueDelay = REQUEUE_DELAY_DEFAULT;
159
160 private final Logger log = LoggerFactory.getLogger(getClass());
161
162 /**
163 * A queue to asynchronously process events.
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700164 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700165 protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
alshabibf0e7e702015-05-30 18:22:36 -0700166
Gamze Abaka641fc072018-09-04 09:16:27 +0000167 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
alshabibf0e7e702015-05-30 18:22:36 -0700168
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700169 /**
170 * Listener for OLT devices events.
171 */
172 protected OltDeviceListener deviceListener = new OltDeviceListener();
173 protected ScheduledExecutorService discoveredSubscriberExecutor =
174 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
175 "discovered-cp-%d", log));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100176
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700177 protected ScheduledExecutorService queueExecutor =
178 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
179 "discovered-cp-restore-%d", log));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700180
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700181 /**
182 * Executor used to defer flow provisioning to a different thread pool.
183 */
184 protected ExecutorService flowsExecutor;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800185
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700186 /**
187 * Executor used to defer subscriber handling from API call to a different thread pool.
188 */
189 protected ExecutorService subscriberExecutor;
190
191 private static final String APP_NAME = "org.opencord.olt";
192
193 private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
194 private final Lock queueWriteLock = queueLock.writeLock();
195 private final Lock queueReadLock = queueLock.readLock();
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700196
alshabibf0e7e702015-05-30 18:22:36 -0700197 @Activate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700198 protected void activate(ComponentContext context) {
199 cfgService.registerProperties(getClass());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700200
alshabibe0559672016-02-21 14:49:51 -0800201 modified(context);
Saurav Das62ad75e2019-03-05 12:22:22 -0800202
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700203 appId = coreService.registerApplication(APP_NAME);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800204 KryoNamespace serializer = KryoNamespace.newBuilder()
205 .register(KryoNamespaces.API)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700206 .register(ConnectPoint.class)
207 .register(DiscoveredSubscriber.class)
208 .register(DiscoveredSubscriber.Status.class)
209 .register(SubscriberAndDeviceInformation.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800210 .register(UniTagInformation.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000211 .register(LinkedBlockingQueue.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800212 .build();
213
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700214 eventsQueues = storageService.<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder()
215 .withName("volt-subscriber-queues")
216 .withApplicationId(appId)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800217 .withSerializer(Serializer.using(serializer))
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700218 .build().asJavaMap();
alshabib4ceaed32016-03-03 18:00:58 -0800219
alshabibba357492016-01-27 13:49:46 -0800220 deviceService.addListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700221
222 discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
223 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
224 log.info("Started");
225
226 deviceListener.handleExistingPorts();
alshabibf0e7e702015-05-30 18:22:36 -0700227 }
228
229 @Deactivate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700230 protected void deactivate(ComponentContext context) {
231 cfgService.unregisterProperties(getClass(), false);
232 discoveredSubscriberExecutor.shutdown();
233 flowsExecutor.shutdown();
234 subscriberExecutor.shutdown();
235 deviceListener.deactivate();
alshabibf0e7e702015-05-30 18:22:36 -0700236 log.info("Stopped");
237 }
238
alshabibe0559672016-02-21 14:49:51 -0800239 @Modified
240 public void modified(ComponentContext context) {
241 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700242 if (context != null) {
Andrea Campanella971d5b92020-05-07 11:20:43 +0200243 String bpId = get(properties, DEFAULT_BP_ID);
244 defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
Gamze Abakaad329652018-12-20 10:12:21 +0000245
Andrea Campanella971d5b92020-05-07 11:20:43 +0200246 String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
247 multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000248
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700249 String flowThreads = get(properties, FLOW_PROCESSING_THREADS);
250 int oldFlowProcessingThreads = flowProcessingThreads;
251 flowProcessingThreads = isNullOrEmpty(flowThreads) ?
252 oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
Saurav Das2d3777a2020-08-07 18:48:51 -0700253
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700254 if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads) {
255 if (flowsExecutor != null) {
256 flowsExecutor.shutdown();
257 }
258 flowsExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
259 groupedThreads(ONOS_OLT_SERVICE,
260 "flows-installer-%d"));
261 }
Gamze Abaka33feef52019-02-27 08:16:47 +0000262
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700263 String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
264 int oldSubscriberProcessingThreads = subscriberProcessingThreads;
265 subscriberProcessingThreads = isNullOrEmpty(subscriberThreads) ?
266 oldSubscriberProcessingThreads : Integer.parseInt(subscriberThreads.trim());
267
268 if (subscriberExecutor == null || oldSubscriberProcessingThreads != subscriberProcessingThreads) {
269 if (subscriberExecutor != null) {
270 subscriberExecutor.shutdown();
271 }
272 subscriberExecutor = Executors.newFixedThreadPool(subscriberProcessingThreads,
273 groupedThreads(ONOS_OLT_SERVICE,
274 "subscriber-installer-%d"));
275 }
276
277 String queueDelay = get(properties, REQUEUE_DELAY);
278 requeueDelay = isNullOrEmpty(queueDelay) ?
279 REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
alshabibe0559672016-02-21 14:49:51 -0800280 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700281 log.info("Modified. Values = {}: {}, {}: {}, " +
282 "{}:{}, {}:{}, {}:{}",
283 DEFAULT_BP_ID, defaultBpId,
284 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
285 FLOW_PROCESSING_THREADS, flowProcessingThreads,
286 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
287 REQUEUE_DELAY, requeueDelay);
alshabibe0559672016-02-21 14:49:51 -0800288 }
289
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000290
alshabib32232c82016-02-25 17:57:24 -0500291 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700292 public boolean provisionSubscriber(ConnectPoint cp) {
293 subscriberExecutor.submit(() -> {
294 Device device = deviceService.getDevice(cp.deviceId());
295 Port port = deviceService.getPort(device.id(), cp.port());
296 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Hardik Windlass395ff372019-06-13 05:16:00 +0000297
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700298 if (oltDeviceService.isNniPort(device, port.number())) {
299 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
300 return false;
Saurav Das026650f2020-09-21 18:56:35 -0700301 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700302
303 log.info("Provisioning subscriber on {}", accessDevicePort);
304
305 if (oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
306 log.error("Subscriber on {} is already provisioned", accessDevicePort);
307 return false;
308 }
309
310 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
311 if (si == null) {
312 log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
313 return false;
314 }
315 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
316 DiscoveredSubscriber.Status.ADDED, true, si);
317
318 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
319 // regardless of the flow status
320 si.uniTagList().forEach(uti -> {
321 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
322 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
323 });
324
325 addSubscriberToQueue(sub);
326 return true;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100327 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700328 //NOTE this only means we have taken the request in, nothing more.
Amit Ghosh31939522018-08-16 13:28:21 +0100329 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800330 }
331
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000332 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700333 public boolean removeSubscriber(ConnectPoint cp) {
334 subscriberExecutor.submit(() -> {
335 Device device = deviceService.getDevice(DeviceId.deviceId(cp.deviceId().toString()));
336 Port port = deviceService.getPort(device.id(), cp.port());
337 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000338
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700339 if (oltDeviceService.isNniPort(device, port.number())) {
340 log.warn("will not un-provision a subscriber on the NNI {}",
341 accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100342 return false;
343 }
344
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700345 log.info("Un-provisioning subscriber on {}", accessDevicePort);
346
347 if (!oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
348 log.error("Subscriber on {} is not provisioned", accessDevicePort);
349 return false;
350 }
351
352 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
353 if (si == null) {
354 log.error("Subscriber information not found in sadis for port {}",
355 accessDevicePort);
356 // NOTE that we are returning true so that the subscriber is removed from the queue
357 // and we can move on provisioning others
358 return false;
359 }
360 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
361 DiscoveredSubscriber.Status.REMOVED, true, si);
362
363 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
364 // regardless of the flow status
365 si.uniTagList().forEach(uti -> {
366 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
367 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000368 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700369
370 addSubscriberToQueue(sub);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100371 return true;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700372 });
373 //NOTE this only means we have taken the request in, nothing more.
374 return true;
375 }
376
377 @Override
378 public boolean provisionSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
379 log.debug("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}",
380 cp, cTag, sTag, tpId);
381 Device device = deviceService.getDevice(cp.deviceId());
382 Port port = deviceService.getPort(device.id(), cp.port());
383 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
384
385 if (oltDeviceService.isNniPort(device, port.number())) {
386 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100387 return false;
388 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100389
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700390 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
391 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
392 if (specificService == null) {
393 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
394 "stag {}, tpId {}", cp, cTag, sTag, tpId);
Amit Ghosh31939522018-08-16 13:28:21 +0100395 return false;
396 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700397 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
398 uniTagInformationList.add(specificService);
399 si.setUniTagList(uniTagInformationList);
400 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
401 DiscoveredSubscriber.Status.ADDED, true, si);
Amit Ghosh31939522018-08-16 13:28:21 +0100402
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700403 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
404 // regardless of the flow status
405 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
406 if (oltFlowService.isSubscriberServiceProvisioned(sk)) {
407 log.error("Subscriber on {} is already provisioned", sk);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100408 return false;
409 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700410 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
411
412 addSubscriberToQueue(sub);
413 return true;
Amit Ghosh31939522018-08-16 13:28:21 +0100414 }
415
416 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700417 public boolean removeSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
418 log.debug("Un-provisioning subscriber on {} with cTag {}, stag {}, tpId {}",
419 cp, cTag, sTag, tpId);
420 Device device = deviceService.getDevice(cp.deviceId());
421 Port port = deviceService.getPort(device.id(), cp.port());
422 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
423
424 if (oltDeviceService.isNniPort(device, port.number())) {
425 log.warn("will not un-provision a subscriber on the NNI {}",
426 accessDevicePort);
427 return false;
428 }
429
430 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
431 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
432 if (specificService == null) {
433 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
434 "stag {}, tpId {}", cp, cTag, sTag, tpId);
435 return false;
436 }
437 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
438 uniTagInformationList.add(specificService);
439 si.setUniTagList(uniTagInformationList);
440 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
441 DiscoveredSubscriber.Status.ADDED, true, si);
442
443 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
444 // regardless of the flow status
445 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
446 if (!oltFlowService.isSubscriberServiceProvisioned(sk)) {
447 log.error("Subscriber on {} is not provisioned", sk);
448 return false;
449 }
450 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
451
452 addSubscriberToQueue(sub);
453 return true;
Saurav Das82b8e6d2018-10-04 15:25:12 -0700454 }
455
456 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700457 public List<DeviceId> getConnectedOlts() {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100458 List<DeviceId> olts = new ArrayList<>();
459 Iterable<Device> devices = deviceService.getDevices();
460 for (Device d : devices) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700461 if (oltDeviceService.isOlt(d)) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700462 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100463 olts.add(d.id());
464 }
465 }
466 return olts;
alshabibe0559672016-02-21 14:49:51 -0800467 }
468
Amit Ghosh31939522018-08-16 13:28:21 +0100469 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700470 * Finds the connect-point to which a subscriber is connected.
Amit Ghosh31939522018-08-16 13:28:21 +0100471 *
472 * @param id The id of the subscriber, this is the same ID as in Sadis
473 * @return Subscribers ConnectPoint if found else null
474 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700475 @Override
476 public ConnectPoint findSubscriberConnectPoint(String id) {
Amit Ghosh31939522018-08-16 13:28:21 +0100477
478 Iterable<Device> devices = deviceService.getDevices();
479 for (Device d : devices) {
480 for (Port p : deviceService.getPorts(d.id())) {
481 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
482 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700483 log.debug("Found on {}", portWithName(p));
Amit Ghosh31939522018-08-16 13:28:21 +0100484 return new ConnectPoint(d.id(), p.number());
485 }
486 }
487 }
488 return null;
489 }
490
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700491 protected void processDiscoveredSubscribers() {
492 log.info("Started processDiscoveredSubscribers loop");
493 while (true) {
494 Set<ConnectPoint> discoveredCps;
495 try {
496 queueReadLock.lock();
497 discoveredCps = eventsQueues.keySet();
498 } catch (Exception e) {
499 log.error("Cannot read keys from queue map", e);
500 return;
501 } finally {
502 queueReadLock.unlock();
503 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000504
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700505 discoveredCps.forEach(cp -> {
506 LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
alshabibbf23a1f2016-01-14 17:27:11 -0800507
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700508 try {
509 queueReadLock.lock();
510 eventsQueue = eventsQueues.get(cp);
511 } catch (Exception e) {
512 log.error("Cannot get key from queue map", e);
513 return;
514 } finally {
515 queueReadLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000516 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000517
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700518 if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
519 // if we're not local leader for this device, ignore this queue
520 if (log.isTraceEnabled()) {
521 log.trace("Ignoring queue on CP {} as not master of the device", cp);
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100522 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700523 return;
Tunahan Sezena07fe962021-02-24 08:24:24 +0000524 }
525
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700526 flowsExecutor.execute(() -> {
527 if (!eventsQueue.isEmpty()) {
528 // we do not remove the event from the queue until it has been processed
529 // in that way we guarantee that events are processed in order
530 DiscoveredSubscriber sub = eventsQueue.peek();
531 if (sub == null) {
532 // the queue is empty
533 return;
Tunahan Sezena07fe962021-02-24 08:24:24 +0000534 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700535
536 if (log.isTraceEnabled()) {
537 log.trace("Processing subscriber on port {} with status {}",
538 portWithName(sub.port), sub.status);
539 }
540
541 if (sub.hasSubscriber) {
542 // this is a provision subscriber call
543 if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
544 if (log.isTraceEnabled()) {
545 log.trace("Provisioning of subscriber on {} completed",
546 portWithName(sub.port));
547 }
548 removeSubscriberFromQueue(sub);
549 }
550 } else {
551 // this is a port event (ENABLED/DISABLED)
552 // means no subscriber was provisioned on that port
553
554 if (!deviceService.isAvailable(sub.device.id()) ||
555 deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
556 // If the device is not connected or the port is not available do nothing
557 // This can happen when we disable and then immediately delete the device,
558 // the queue is populated but the meters and flows are already gone
559 // thus there is nothing left to do
560 return;
561 }
562
563 if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
564 if (log.isTraceEnabled()) {
565 log.trace("Processing of port {} completed",
566 portWithName(sub.port));
567 }
568 removeSubscriberFromQueue(sub);
569 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000570 }
571 }
572 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700573 });
Tunahan Sezena07fe962021-02-24 08:24:24 +0000574
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700575 try {
576 TimeUnit.MILLISECONDS.sleep(requeueDelay);
577 } catch (InterruptedException e) {
578 continue;
Tunahan Sezena07fe962021-02-24 08:24:24 +0000579 }
580 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000581 }
582
583 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000584 * Checks the subscriber uni tag list and find the uni tag information.
585 * using the pon c tag, pon s tag and the technology profile id
586 * May return Optional<null>
587 *
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700588 * @param portName port of the subscriber
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000589 * @param innerVlan pon c tag
590 * @param outerVlan pon s tag
591 * @param tpId the technology profile id
592 * @return the found uni tag information
593 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700594 private UniTagInformation getUniTagInformation(String portName, VlanId innerVlan,
595 VlanId outerVlan, int tpId) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000596 log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700597 portName, innerVlan, outerVlan, tpId);
598 SubscriberAndDeviceInformation subInfo = subsService.get(portName);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000599 if (subInfo == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700600 log.warn("Subscriber information doesn't exist for {}", portName);
601 return null;
Gamze Abakaf59c0912019-04-19 08:24:28 +0000602 }
603
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000604 List<UniTagInformation> uniTagList = subInfo.uniTagList();
605 if (uniTagList == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700606 log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), portName);
607 return null;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000608 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100609
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000610 UniTagInformation service = null;
611 for (UniTagInformation tagInfo : subInfo.uniTagList()) {
612 if (innerVlan.equals(tagInfo.getPonCTag()) && outerVlan.equals(tagInfo.getPonSTag())
613 && tpId == tagInfo.getTechnologyProfileId()) {
614 service = tagInfo;
615 break;
Andy Bavier160e8682019-05-07 18:32:22 -0700616 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000617 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000618
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000619 if (service == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700620 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700621 innerVlan, outerVlan, tpId, portName);
Saurav Das82b8e6d2018-10-04 15:25:12 -0700622 return null;
623 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100624
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700625 return service;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100626 }
627
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700628 protected void bindSadisService(SadisService service) {
629 sadisService = service;
630 subsService = sadisService.getSubscriberInfoService();
631
632 log.info("Sadis-service binds to onos.");
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100633 }
634
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700635 protected void unbindSadisService(SadisService service) {
636 deviceService.removeListener(deviceListener);
637 deviceListener = null;
638 sadisService = null;
639 subsService = null;
640 log.info("Sadis-service unbinds from onos.");
Jonathan Hart1d34c8b2018-05-05 15:37:28 -0700641 }
642
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700643 protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
644 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
645 LinkedBlockingQueue<DiscoveredSubscriber> q = null;
646 try {
647 queueReadLock.lock();
648 q = eventsQueues.getOrDefault(cp, new LinkedBlockingQueue<>());
649 } finally {
650 queueReadLock.unlock();
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000651 }
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800652
653 log.info("Adding subscriber to queue: {} with status {} and subscriber {}",
654 portWithName(sub.port), sub.status, sub.hasSubscriber);
655 q.add(sub);
656
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700657 try {
658 queueWriteLock.lock();
659 eventsQueues.put(cp, q);
660 } catch (UnsupportedOperationException | ClassCastException |
661 NullPointerException | IllegalArgumentException e) {
662 log.error("Cannot add subscriber to queue: {}", e.getMessage());
663 } finally {
664 queueWriteLock.unlock();
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800665 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800666 }
667
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700668 protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
669 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
670 LinkedBlockingQueue<DiscoveredSubscriber> q = null;
671 if (log.isTraceEnabled()) {
672 log.trace("removing subscriber {} from queue", sub);
kdarapuaa5da252020-04-10 15:58:05 +0530673 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700674 try {
675 queueReadLock.lock();
676 q = eventsQueues.get(cp);
677 } finally {
678 queueReadLock.unlock();
679 }
680 if (q == null) {
681 log.warn("Cannot find queue for connectPoint {}", cp);
682 return;
683 }
684 boolean removed = q.remove(sub);
685 if (!removed) {
686 log.warn("Subscriber {} has not been removed from queue, is it still there? {}", sub, q);
687 return;
688 } else {
689 log.debug("Subscriber {} has been removed from the queue", sub);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000690 }
691
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700692 try {
693 queueWriteLock.lock();
694 eventsQueues.remove(cp); // am I needed??
695 eventsQueues.put(cp, q);
696 } catch (UnsupportedOperationException | ClassCastException |
697 NullPointerException | IllegalArgumentException e) {
698 log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
699 } finally {
700 queueWriteLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000701 }
702 }
703
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700704 protected class OltDeviceListener
705 implements DeviceListener {
706
707 private final Logger log = LoggerFactory.getLogger(getClass());
708 protected ExecutorService eventExecutor;
709
710 /**
711 * Builds the listener with all the proper services and information needed.
712 */
713 public OltDeviceListener() {
714 this.eventExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
715 groupedThreads("onos/olt-device-listener-event", "event-%d", log));
716 }
717
718 public void deactivate() {
719 this.eventExecutor.shutdown();
720 }
Saurav Dasa9d5f442019-03-06 19:32:48 -0800721
alshabibf0e7e702015-05-30 18:22:36 -0700722 @Override
723 public void event(DeviceEvent event) {
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700724 eventExecutor.execute(() -> {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700725 boolean isOlt = oltDeviceService.isOlt(event.subject());
726 DeviceId deviceId = event.subject().id();
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700727 switch (event.type()) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700728 case PORT_STATS_UPDATED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700729 case DEVICE_ADDED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700730 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700731 case PORT_ADDED:
732 case PORT_UPDATED:
733 case PORT_REMOVED:
734 if (!isOlt) {
735 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
736 return;
737 }
738 if (!oltDeviceService.isLocalLeader(deviceId)) {
739 log.trace("Device {} is not local to this node", deviceId);
740 return;
741 }
742 // port added, updated and removed are treated in the same way as we only care whether the port
743 // is enabled or not
744 handleOltPort(event.type(), event.subject(), event.port());
745 return;
746 case DEVICE_AVAILABILITY_CHANGED:
747 if (!isOlt) {
748 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
749 return;
750 }
751 if (deviceService.isAvailable(deviceId)) {
752 if (!oltDeviceService.isLocalLeader(deviceId)) {
753 if (log.isTraceEnabled()) {
754 log.trace("Device {} is not local to this node, not handling available device",
755 deviceId);
756 }
757 } else {
758 log.info("Handling available device: {}", deviceId);
759 handleExistingPorts();
760 }
761 } else if (!deviceService.isAvailable(deviceId) && deviceService.getPorts(deviceId).isEmpty()) {
762 // NOTE that upon disconnection there is no mastership on the device,
763 // and we should anyway clear the local cache of the flows/meters across instances.
764 // We're only clearing the device if there are no available ports,
765 // otherwise we assume it's a temporary disconnection
766 log.info("Device {} availability changed to false and ports are empty, " +
767 "purging meters and flows", deviceId);
768 //NOTE all the instances will call these methods
769 oltFlowService.purgeDeviceFlows(deviceId);
770 oltMeterService.purgeDeviceMeters(deviceId);
771 clearQueueForDevice(deviceId);
772 } else {
773 log.info("Device {} availability changed to false, but ports are still available, " +
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800774 "assuming temporary disconnection.",
775 deviceId);
776 if (log.isTraceEnabled()) {
777 log.trace("Available ports: {}", deviceService.getPorts(deviceId));
778 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700779 }
780 return;
781 case DEVICE_REMOVED:
782 if (!isOlt) {
783 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
784 return;
785 }
786 log.info("Device {} Removed, purging meters and flows", deviceId);
787 oltFlowService.purgeDeviceFlows(deviceId);
788 oltMeterService.purgeDeviceMeters(deviceId);
789 clearQueueForDevice(deviceId);
790 return;
791 default:
792 if (log.isTraceEnabled()) {
793 log.trace("Not handling event: {}, ", event);
794 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700795 }
796 });
alshabibf0e7e702015-05-30 18:22:36 -0700797 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000798
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700799 protected void clearQueueForDevice(DeviceId devId) {
800 try {
801 queueWriteLock.lock();
802 Iterator<Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>> iter =
803 eventsQueues.entrySet().iterator();
804 while (iter.hasNext()) {
805 Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
806 if (entry.getKey().deviceId().equals(devId)) {
807 eventsQueues.remove(entry.getKey());
808 }
809 }
810 } finally {
811 queueWriteLock.unlock();
Gamze Abaka838d8142019-02-21 07:06:55 +0000812 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000813 }
814
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700815 protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
816 log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
817 portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
818 if (port.isEnabled()) {
819 if (oltDeviceService.isNniPort(device, port.number())) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700820 OltFlowService.FlowOperation action = OltFlowService.FlowOperation.ADD;
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800821 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
822 // In that case the flows are purged anyway, so there's no need to deal with them,
823 // it would actually be counter-productive as the openflow connection is severed and they won't
824 // be correctly processed
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700825 if (type == DeviceEvent.Type.PORT_REMOVED) {
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800826 log.debug("NNI port went down, " +
827 "ignoring event as flows will be removed in the generic device cleanup");
828 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700829 }
830 oltFlowService.handleNniFlows(device, port, action);
831 } else {
832 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
833 // NOTE if the subscriber was previously provisioned,
834 // then add it back to the queue to be re-provisioned
835 boolean provisionSubscriber = oltFlowService.
836 isSubscriberServiceProvisioned(new AccessDevicePort(port));
837 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
838 if (si == null) {
839 //NOTE this should not happen given that the subscriber was provisioned before
840 log.error("Subscriber information not found in sadis for port {}",
841 portWithName(port));
842 return;
843 }
844
845 DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
846 if (type == DeviceEvent.Type.PORT_REMOVED) {
847 status = DiscoveredSubscriber.Status.REMOVED;
848 }
849
850 DiscoveredSubscriber sub =
851 new DiscoveredSubscriber(device, port,
852 status, provisionSubscriber, si);
853 addSubscriberToQueue(sub);
854 }
855 } else {
856 if (oltDeviceService.isNniPort(device, port.number())) {
Matteo Scandolob6981dc2021-12-02 16:31:44 -0800857 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
858 // In that case the flows are purged anyway, so there's no need to deal with them,
859 // it would actually be counter-productive as the openflow connection is severed and they won't
860 // be correctly processed
861 log.debug("NNI port went down, " +
862 "ignoring event as flows will be removed in the generic device cleanup");
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700863 } else {
864 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
865 // NOTE we are assuming that if a subscriber has default eapol
866 // it does not have subscriber flows
867 if (oltFlowService.hasDefaultEapol(port)) {
868 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
869 if (si == null) {
870 //NOTE this should not happen given that the subscriber was provisioned before
871 log.error("Subscriber information not found in sadis for port {}",
872 portWithName(port));
873 return;
874 }
875 DiscoveredSubscriber sub =
876 new DiscoveredSubscriber(device, port,
877 DiscoveredSubscriber.Status.REMOVED, false, si);
878
879 addSubscriberToQueue(sub);
880
881 } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
882 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
883 if (si == null) {
884 //NOTE this should not happen given that the subscriber was provisioned before
885 log.error("Subscriber information not found in sadis for port {}",
886 portWithName(port));
887 return;
888 }
889 DiscoveredSubscriber sub =
890 new DiscoveredSubscriber(device, port,
891 DiscoveredSubscriber.Status.REMOVED, true, si);
892 addSubscriberToQueue(sub);
893 }
894 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000895 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000896 }
Gamze Abakada282b42019-03-11 13:16:48 +0000897
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700898 /**
899 * This method is invoked on app activation in order to deal
900 * with devices and ports that are already existing in the system
901 * and thus won't trigger an event.
902 * It is also needed on instance reboot and device reconnect
903 */
904 protected void handleExistingPorts() {
905 Iterable<DeviceId> devices = getConnectedOlts();
906 for (DeviceId deviceId : devices) {
907 log.info("Handling existing OLT Ports for device {}", deviceId);
908 if (oltDeviceService.isLocalLeader(deviceId)) {
909 List<Port> ports = deviceService.getPorts(deviceId);
910 for (Port p : ports) {
911 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
912 continue;
913 }
914 Device device = deviceService.getDevice(deviceId);
915 deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, device, p);
916 }
917 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800918 }
919 }
920 }
Hardik Windlass395ff372019-06-13 05:16:00 +0000921}