blob: 747a0b4255d6c2e3a6fef63b502af10a517d624c [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07002 * Copyright 2021-present Open Networking Foundation
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
alshabibf0e7e702015-05-30 18:22:36 -070018import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080019import org.onlab.util.KryoNamespace;
Gamze Abaka1f62dd92020-05-07 08:58:13 +000020import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080021import org.onosproject.cluster.ClusterService;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020022import org.onosproject.cluster.LeadershipService;
alshabibf0e7e702015-05-30 18:22:36 -070023import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080025import org.onosproject.event.AbstractListenerManager;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020026import org.onosproject.mastership.MastershipService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010027import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070028import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010029import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070030import org.onosproject.net.DeviceId;
alshabibdec2e252016-01-15 12:20:25 -080031import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070032import org.onosproject.net.PortNumber;
33import org.onosproject.net.device.DeviceEvent;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080036import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080037import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070039import org.opencord.olt.AccessDeviceEvent;
40import org.opencord.olt.AccessDeviceListener;
41import org.opencord.olt.AccessDeviceService;
Gamze Abaka641fc072018-09-04 09:16:27 +000042import org.opencord.sadis.BaseInformationService;
43import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010044import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000045import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080046import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070047import org.osgi.service.component.annotations.Activate;
48import org.osgi.service.component.annotations.Component;
49import org.osgi.service.component.annotations.Deactivate;
50import org.osgi.service.component.annotations.Modified;
51import org.osgi.service.component.annotations.Reference;
52import org.osgi.service.component.annotations.ReferenceCardinality;
Ilayda Ozdemir90a93622021-02-25 09:40:58 +000053import org.osgi.service.component.annotations.ReferencePolicy;
alshabibf0e7e702015-05-30 18:22:36 -070054import org.slf4j.Logger;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070055import org.slf4j.LoggerFactory;
alshabibf0e7e702015-05-30 18:22:36 -070056
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010057import java.util.ArrayList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010058import java.util.Dictionary;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070059import java.util.Iterator;
60import java.util.LinkedList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010061import java.util.List;
62import java.util.Map;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010063import java.util.Properties;
64import java.util.Set;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010065import java.util.concurrent.ExecutorService;
66import java.util.concurrent.Executors;
67import java.util.concurrent.LinkedBlockingQueue;
68import java.util.concurrent.ScheduledExecutorService;
69import java.util.concurrent.TimeUnit;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070070import java.util.concurrent.locks.Lock;
71import java.util.concurrent.locks.ReentrantReadWriteLock;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010072
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010073import static com.google.common.base.Strings.isNullOrEmpty;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010074import static org.onlab.util.Tools.get;
75import static org.onlab.util.Tools.groupedThreads;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070076import static org.opencord.olt.impl.OltUtils.getPortName;
77import static org.opencord.olt.impl.OltUtils.portWithName;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010078import static org.opencord.olt.impl.OsgiPropertyConstants.*;
alshabibf0e7e702015-05-30 18:22:36 -070079
80/**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070081 * OLT Application.
alshabibf0e7e702015-05-30 18:22:36 -070082 */
Carmelo Casconeca931162019-07-15 18:22:24 -070083@Component(immediate = true,
84 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -070085 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +000086 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070087 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
88 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
89 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
Carmelo Casconeca931162019-07-15 18:22:24 -070090 })
alshabib8e4fd2f2016-01-12 15:55:53 -080091public class Olt
92 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
93 implements AccessDeviceService {
alshabibf0e7e702015-05-30 18:22:36 -070094
Carmelo Casconeca931162019-07-15 18:22:24 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -070096 protected DeviceService deviceService;
97
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected ComponentConfigService cfgService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100100
Carmelo Casconeca931162019-07-15 18:22:24 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700102 protected MastershipService mastershipService;
alshabibf0e7e702015-05-30 18:22:36 -0700103
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700105 protected ClusterService clusterService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected LeadershipService leadershipService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100109
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000110 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
111 bind = "bindSadisService",
112 unbind = "unbindSadisService",
113 policy = ReferencePolicy.DYNAMIC)
114 protected volatile SadisService sadisService;
Gamze Abaka641fc072018-09-04 09:16:27 +0000115
Carmelo Casconeca931162019-07-15 18:22:24 -0700116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700117 protected OltDeviceServiceInterface oltDeviceService;
alshabibe0559672016-02-21 14:49:51 -0800118
Carmelo Casconeca931162019-07-15 18:22:24 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700120 protected OltFlowServiceInterface oltFlowService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
123 protected OltMeterServiceInterface oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000124
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
126 protected StorageService storageService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700129 protected CoreService coreService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800130
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700131 protected ApplicationId appId;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200132
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700133 private static final String ONOS_OLT_SERVICE = "onos/olt-service";
Tunahan Sezena07fe962021-02-24 08:24:24 +0000134
Carmelo Casconeca931162019-07-15 18:22:24 -0700135 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800136 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700137 **/
138 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000139
Carmelo Casconeca931162019-07-15 18:22:24 -0700140 /**
Gamze Abaka51a34e82020-05-08 13:03:14 +0000141 * Default multicast service name.
Carmelo Casconeca931162019-07-15 18:22:24 -0700142 **/
Gamze Abaka51a34e82020-05-08 13:03:14 +0000143 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Gamze Abaka33feef52019-02-27 08:16:47 +0000144
Saurav Das2d3777a2020-08-07 18:48:51 -0700145 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700146 * Number of threads used to process flows.
Saurav Das2d3777a2020-08-07 18:48:51 -0700147 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700148 protected int flowProcessingThreads = FLOW_PROCESSING_THREADS_DEFAULT;
Saurav Das2d3777a2020-08-07 18:48:51 -0700149
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700150 /**
     * Number of threads used to process subscriber provisioning requests.
152 **/
153 protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
154
155 /**
     * Delay in ms to put an event back in the queue, used to avoid retrying things too often if conditions are not met.
157 **/
158 protected int requeueDelay = REQUEUE_DELAY_DEFAULT;
159
160 private final Logger log = LoggerFactory.getLogger(getClass());
161
162 /**
163 * A queue to asynchronously process events.
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700164 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700165 protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
alshabibf0e7e702015-05-30 18:22:36 -0700166
Gamze Abaka641fc072018-09-04 09:16:27 +0000167 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
alshabibf0e7e702015-05-30 18:22:36 -0700168
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700169 /**
170 * Listener for OLT devices events.
171 */
172 protected OltDeviceListener deviceListener = new OltDeviceListener();
173 protected ScheduledExecutorService discoveredSubscriberExecutor =
174 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
175 "discovered-cp-%d", log));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100176
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700177 protected ScheduledExecutorService queueExecutor =
178 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
179 "discovered-cp-restore-%d", log));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700180
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700181 /**
182 * Executor used to defer flow provisioning to a different thread pool.
183 */
184 protected ExecutorService flowsExecutor;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800185
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700186 /**
187 * Executor used to defer subscriber handling from API call to a different thread pool.
188 */
189 protected ExecutorService subscriberExecutor;
190
191 private static final String APP_NAME = "org.opencord.olt";
192
193 private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
194 private final Lock queueWriteLock = queueLock.writeLock();
195 private final Lock queueReadLock = queueLock.readLock();
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700196
alshabibf0e7e702015-05-30 18:22:36 -0700197 @Activate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700198 protected void activate(ComponentContext context) {
199 cfgService.registerProperties(getClass());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700200
alshabibe0559672016-02-21 14:49:51 -0800201 modified(context);
Saurav Das62ad75e2019-03-05 12:22:22 -0800202
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700203 appId = coreService.registerApplication(APP_NAME);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800204 KryoNamespace serializer = KryoNamespace.newBuilder()
205 .register(KryoNamespaces.API)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700206 .register(ConnectPoint.class)
207 .register(DiscoveredSubscriber.class)
208 .register(DiscoveredSubscriber.Status.class)
209 .register(SubscriberAndDeviceInformation.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800210 .register(UniTagInformation.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000211 .register(LinkedBlockingQueue.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800212 .build();
213
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700214 eventsQueues = storageService.<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder()
215 .withName("volt-subscriber-queues")
216 .withApplicationId(appId)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800217 .withSerializer(Serializer.using(serializer))
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700218 .build().asJavaMap();
alshabib4ceaed32016-03-03 18:00:58 -0800219
alshabibba357492016-01-27 13:49:46 -0800220 deviceService.addListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700221
222 discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
223 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
224 log.info("Started");
225
226 deviceListener.handleExistingPorts();
alshabibf0e7e702015-05-30 18:22:36 -0700227 }
228
229 @Deactivate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700230 protected void deactivate(ComponentContext context) {
231 cfgService.unregisterProperties(getClass(), false);
232 discoveredSubscriberExecutor.shutdown();
233 flowsExecutor.shutdown();
234 subscriberExecutor.shutdown();
235 deviceListener.deactivate();
alshabibf0e7e702015-05-30 18:22:36 -0700236 log.info("Stopped");
237 }
238
alshabibe0559672016-02-21 14:49:51 -0800239 @Modified
240 public void modified(ComponentContext context) {
241 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700242 if (context != null) {
Andrea Campanella971d5b92020-05-07 11:20:43 +0200243 String bpId = get(properties, DEFAULT_BP_ID);
244 defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
Gamze Abakaad329652018-12-20 10:12:21 +0000245
Andrea Campanella971d5b92020-05-07 11:20:43 +0200246 String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
247 multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000248
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700249 String flowThreads = get(properties, FLOW_PROCESSING_THREADS);
250 int oldFlowProcessingThreads = flowProcessingThreads;
251 flowProcessingThreads = isNullOrEmpty(flowThreads) ?
252 oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
Saurav Das2d3777a2020-08-07 18:48:51 -0700253
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700254 if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads) {
255 if (flowsExecutor != null) {
256 flowsExecutor.shutdown();
257 }
258 flowsExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
259 groupedThreads(ONOS_OLT_SERVICE,
260 "flows-installer-%d"));
261 }
Gamze Abaka33feef52019-02-27 08:16:47 +0000262
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700263 String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
264 int oldSubscriberProcessingThreads = subscriberProcessingThreads;
265 subscriberProcessingThreads = isNullOrEmpty(subscriberThreads) ?
266 oldSubscriberProcessingThreads : Integer.parseInt(subscriberThreads.trim());
267
268 if (subscriberExecutor == null || oldSubscriberProcessingThreads != subscriberProcessingThreads) {
269 if (subscriberExecutor != null) {
270 subscriberExecutor.shutdown();
271 }
272 subscriberExecutor = Executors.newFixedThreadPool(subscriberProcessingThreads,
273 groupedThreads(ONOS_OLT_SERVICE,
274 "subscriber-installer-%d"));
275 }
276
277 String queueDelay = get(properties, REQUEUE_DELAY);
278 requeueDelay = isNullOrEmpty(queueDelay) ?
279 REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
alshabibe0559672016-02-21 14:49:51 -0800280 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700281 log.info("Modified. Values = {}: {}, {}: {}, " +
282 "{}:{}, {}:{}, {}:{}",
283 DEFAULT_BP_ID, defaultBpId,
284 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
285 FLOW_PROCESSING_THREADS, flowProcessingThreads,
286 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
287 REQUEUE_DELAY, requeueDelay);
alshabibe0559672016-02-21 14:49:51 -0800288 }
289
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000290
alshabib32232c82016-02-25 17:57:24 -0500291 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700292 public boolean provisionSubscriber(ConnectPoint cp) {
293 subscriberExecutor.submit(() -> {
294 Device device = deviceService.getDevice(cp.deviceId());
295 Port port = deviceService.getPort(device.id(), cp.port());
296 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Hardik Windlass395ff372019-06-13 05:16:00 +0000297
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700298 if (oltDeviceService.isNniPort(device, port.number())) {
299 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
300 return false;
Saurav Das026650f2020-09-21 18:56:35 -0700301 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700302
303 log.info("Provisioning subscriber on {}", accessDevicePort);
304
305 if (oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
306 log.error("Subscriber on {} is already provisioned", accessDevicePort);
307 return false;
308 }
309
310 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
311 if (si == null) {
312 log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
313 return false;
314 }
315 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
316 DiscoveredSubscriber.Status.ADDED, true, si);
317
318 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
319 // regardless of the flow status
320 si.uniTagList().forEach(uti -> {
321 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
322 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
323 });
324
325 addSubscriberToQueue(sub);
326 return true;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100327 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700328 //NOTE this only means we have taken the request in, nothing more.
Amit Ghosh31939522018-08-16 13:28:21 +0100329 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800330 }
331
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000332 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700333 public boolean removeSubscriber(ConnectPoint cp) {
334 subscriberExecutor.submit(() -> {
335 Device device = deviceService.getDevice(DeviceId.deviceId(cp.deviceId().toString()));
336 Port port = deviceService.getPort(device.id(), cp.port());
337 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000338
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700339 if (oltDeviceService.isNniPort(device, port.number())) {
340 log.warn("will not un-provision a subscriber on the NNI {}",
341 accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100342 return false;
343 }
344
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700345 log.info("Un-provisioning subscriber on {}", accessDevicePort);
346
347 if (!oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
348 log.error("Subscriber on {} is not provisioned", accessDevicePort);
349 return false;
350 }
351
352 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
353 if (si == null) {
354 log.error("Subscriber information not found in sadis for port {}",
355 accessDevicePort);
356 // NOTE that we are returning true so that the subscriber is removed from the queue
357 // and we can move on provisioning others
358 return false;
359 }
360 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
361 DiscoveredSubscriber.Status.REMOVED, true, si);
362
363 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
364 // regardless of the flow status
365 si.uniTagList().forEach(uti -> {
366 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
367 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000368 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700369
370 addSubscriberToQueue(sub);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100371 return true;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700372 });
373 //NOTE this only means we have taken the request in, nothing more.
374 return true;
375 }
376
377 @Override
378 public boolean provisionSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
379 log.debug("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}",
380 cp, cTag, sTag, tpId);
381 Device device = deviceService.getDevice(cp.deviceId());
382 Port port = deviceService.getPort(device.id(), cp.port());
383 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
384
385 if (oltDeviceService.isNniPort(device, port.number())) {
386 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100387 return false;
388 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100389
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700390 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
391 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
392 if (specificService == null) {
393 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
394 "stag {}, tpId {}", cp, cTag, sTag, tpId);
Amit Ghosh31939522018-08-16 13:28:21 +0100395 return false;
396 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700397 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
398 uniTagInformationList.add(specificService);
399 si.setUniTagList(uniTagInformationList);
400 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
401 DiscoveredSubscriber.Status.ADDED, true, si);
Amit Ghosh31939522018-08-16 13:28:21 +0100402
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700403 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
404 // regardless of the flow status
405 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
406 if (oltFlowService.isSubscriberServiceProvisioned(sk)) {
407 log.error("Subscriber on {} is already provisioned", sk);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100408 return false;
409 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700410 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
411
412 addSubscriberToQueue(sub);
413 return true;
Amit Ghosh31939522018-08-16 13:28:21 +0100414 }
415
416 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700417 public boolean removeSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
418 log.debug("Un-provisioning subscriber on {} with cTag {}, stag {}, tpId {}",
419 cp, cTag, sTag, tpId);
420 Device device = deviceService.getDevice(cp.deviceId());
421 Port port = deviceService.getPort(device.id(), cp.port());
422 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
423
424 if (oltDeviceService.isNniPort(device, port.number())) {
425 log.warn("will not un-provision a subscriber on the NNI {}",
426 accessDevicePort);
427 return false;
428 }
429
430 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
431 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
432 if (specificService == null) {
433 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
434 "stag {}, tpId {}", cp, cTag, sTag, tpId);
435 return false;
436 }
437 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
438 uniTagInformationList.add(specificService);
439 si.setUniTagList(uniTagInformationList);
440 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
441 DiscoveredSubscriber.Status.ADDED, true, si);
442
443 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
444 // regardless of the flow status
445 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
446 if (!oltFlowService.isSubscriberServiceProvisioned(sk)) {
447 log.error("Subscriber on {} is not provisioned", sk);
448 return false;
449 }
450 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
451
452 addSubscriberToQueue(sub);
453 return true;
Saurav Das82b8e6d2018-10-04 15:25:12 -0700454 }
455
456 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700457 public List<DeviceId> getConnectedOlts() {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100458 List<DeviceId> olts = new ArrayList<>();
459 Iterable<Device> devices = deviceService.getDevices();
460 for (Device d : devices) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700461 if (oltDeviceService.isOlt(d)) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700462 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100463 olts.add(d.id());
464 }
465 }
466 return olts;
alshabibe0559672016-02-21 14:49:51 -0800467 }
468
Amit Ghosh31939522018-08-16 13:28:21 +0100469 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700470 * Finds the connect-point to which a subscriber is connected.
Amit Ghosh31939522018-08-16 13:28:21 +0100471 *
472 * @param id The id of the subscriber, this is the same ID as in Sadis
473 * @return Subscribers ConnectPoint if found else null
474 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700475 @Override
476 public ConnectPoint findSubscriberConnectPoint(String id) {
Amit Ghosh31939522018-08-16 13:28:21 +0100477
478 Iterable<Device> devices = deviceService.getDevices();
479 for (Device d : devices) {
480 for (Port p : deviceService.getPorts(d.id())) {
481 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
482 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700483 log.debug("Found on {}", portWithName(p));
Amit Ghosh31939522018-08-16 13:28:21 +0100484 return new ConnectPoint(d.id(), p.number());
485 }
486 }
487 }
488 return null;
489 }
490
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700491 protected void processDiscoveredSubscribers() {
492 log.info("Started processDiscoveredSubscribers loop");
493 while (true) {
494 Set<ConnectPoint> discoveredCps;
495 try {
496 queueReadLock.lock();
497 discoveredCps = eventsQueues.keySet();
498 } catch (Exception e) {
499 log.error("Cannot read keys from queue map", e);
500 return;
501 } finally {
502 queueReadLock.unlock();
503 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000504
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700505 discoveredCps.forEach(cp -> {
506 LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
alshabibbf23a1f2016-01-14 17:27:11 -0800507
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700508 try {
509 queueReadLock.lock();
510 eventsQueue = eventsQueues.get(cp);
511 } catch (Exception e) {
512 log.error("Cannot get key from queue map", e);
513 return;
514 } finally {
515 queueReadLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000516 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000517
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700518 if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
519 // if we're not local leader for this device, ignore this queue
520 if (log.isTraceEnabled()) {
521 log.trace("Ignoring queue on CP {} as not master of the device", cp);
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100522 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700523 return;
Tunahan Sezena07fe962021-02-24 08:24:24 +0000524 }
525
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700526 flowsExecutor.execute(() -> {
527 if (!eventsQueue.isEmpty()) {
528 // we do not remove the event from the queue until it has been processed
529 // in that way we guarantee that events are processed in order
530 DiscoveredSubscriber sub = eventsQueue.peek();
531 if (sub == null) {
532 // the queue is empty
533 return;
Tunahan Sezena07fe962021-02-24 08:24:24 +0000534 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700535
536 if (log.isTraceEnabled()) {
537 log.trace("Processing subscriber on port {} with status {}",
538 portWithName(sub.port), sub.status);
539 }
540
541 if (sub.hasSubscriber) {
542 // this is a provision subscriber call
543 if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
544 if (log.isTraceEnabled()) {
545 log.trace("Provisioning of subscriber on {} completed",
546 portWithName(sub.port));
547 }
548 removeSubscriberFromQueue(sub);
549 }
550 } else {
551 // this is a port event (ENABLED/DISABLED)
552 // means no subscriber was provisioned on that port
553
554 if (!deviceService.isAvailable(sub.device.id()) ||
555 deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
556 // If the device is not connected or the port is not available do nothing
557 // This can happen when we disable and then immediately delete the device,
558 // the queue is populated but the meters and flows are already gone
559 // thus there is nothing left to do
560 return;
561 }
562
563 if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
564 if (log.isTraceEnabled()) {
565 log.trace("Processing of port {} completed",
566 portWithName(sub.port));
567 }
568 removeSubscriberFromQueue(sub);
569 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000570 }
571 }
572 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700573 });
Tunahan Sezena07fe962021-02-24 08:24:24 +0000574
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700575 try {
576 TimeUnit.MILLISECONDS.sleep(requeueDelay);
577 } catch (InterruptedException e) {
578 continue;
Tunahan Sezena07fe962021-02-24 08:24:24 +0000579 }
580 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000581 }
582
583 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000584 * Checks the subscriber uni tag list and find the uni tag information.
585 * using the pon c tag, pon s tag and the technology profile id
586 * May return Optional<null>
587 *
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700588 * @param portName port of the subscriber
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000589 * @param innerVlan pon c tag
590 * @param outerVlan pon s tag
591 * @param tpId the technology profile id
592 * @return the found uni tag information
593 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700594 private UniTagInformation getUniTagInformation(String portName, VlanId innerVlan,
595 VlanId outerVlan, int tpId) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000596 log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700597 portName, innerVlan, outerVlan, tpId);
598 SubscriberAndDeviceInformation subInfo = subsService.get(portName);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000599 if (subInfo == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700600 log.warn("Subscriber information doesn't exist for {}", portName);
601 return null;
Gamze Abakaf59c0912019-04-19 08:24:28 +0000602 }
603
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000604 List<UniTagInformation> uniTagList = subInfo.uniTagList();
605 if (uniTagList == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700606 log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), portName);
607 return null;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000608 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100609
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000610 UniTagInformation service = null;
611 for (UniTagInformation tagInfo : subInfo.uniTagList()) {
612 if (innerVlan.equals(tagInfo.getPonCTag()) && outerVlan.equals(tagInfo.getPonSTag())
613 && tpId == tagInfo.getTechnologyProfileId()) {
614 service = tagInfo;
615 break;
Andy Bavier160e8682019-05-07 18:32:22 -0700616 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000617 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000618
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000619 if (service == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700620 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700621 innerVlan, outerVlan, tpId, portName);
Saurav Das82b8e6d2018-10-04 15:25:12 -0700622 return null;
623 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100624
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700625 return service;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100626 }
627
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700628 protected void bindSadisService(SadisService service) {
629 sadisService = service;
630 subsService = sadisService.getSubscriberInfoService();
631
632 log.info("Sadis-service binds to onos.");
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100633 }
634
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700635 protected void unbindSadisService(SadisService service) {
636 deviceService.removeListener(deviceListener);
637 deviceListener = null;
638 sadisService = null;
639 subsService = null;
640 log.info("Sadis-service unbinds from onos.");
Jonathan Hart1d34c8b2018-05-05 15:37:28 -0700641 }
642
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700643 protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
644 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
645 LinkedBlockingQueue<DiscoveredSubscriber> q = null;
646 try {
647 queueReadLock.lock();
648 q = eventsQueues.getOrDefault(cp, new LinkedBlockingQueue<>());
649 } finally {
650 queueReadLock.unlock();
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000651 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700652 if (!q.contains(sub)) {
653 log.info("Adding subscriber to queue: {} with status {} and subscriber {}",
654 portWithName(sub.port), sub.status, sub.hasSubscriber);
655 q.add(sub);
Andrea Campanella1edf8832021-05-06 12:51:33 +0200656 } else {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700657 log.debug("Not adding subscriber to queue as already present: {} with status {}",
658 portWithName(sub.port), sub.status);
659 // no need to update the queue in the map if nothing has changed
660 return;
661 }
662 try {
663 queueWriteLock.lock();
664 eventsQueues.put(cp, q);
665 } catch (UnsupportedOperationException | ClassCastException |
666 NullPointerException | IllegalArgumentException e) {
667 log.error("Cannot add subscriber to queue: {}", e.getMessage());
668 } finally {
669 queueWriteLock.unlock();
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800670 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800671 }
672
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700673 protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
674 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
675 LinkedBlockingQueue<DiscoveredSubscriber> q = null;
676 if (log.isTraceEnabled()) {
677 log.trace("removing subscriber {} from queue", sub);
kdarapuaa5da252020-04-10 15:58:05 +0530678 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700679 try {
680 queueReadLock.lock();
681 q = eventsQueues.get(cp);
682 } finally {
683 queueReadLock.unlock();
684 }
685 if (q == null) {
686 log.warn("Cannot find queue for connectPoint {}", cp);
687 return;
688 }
689 boolean removed = q.remove(sub);
690 if (!removed) {
691 log.warn("Subscriber {} has not been removed from queue, is it still there? {}", sub, q);
692 return;
693 } else {
694 log.debug("Subscriber {} has been removed from the queue", sub);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000695 }
696
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700697 try {
698 queueWriteLock.lock();
699 eventsQueues.remove(cp); // am I needed??
700 eventsQueues.put(cp, q);
701 } catch (UnsupportedOperationException | ClassCastException |
702 NullPointerException | IllegalArgumentException e) {
703 log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
704 } finally {
705 queueWriteLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000706 }
707 }
708
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700709 protected class OltDeviceListener
710 implements DeviceListener {
711
712 private final Logger log = LoggerFactory.getLogger(getClass());
713 protected ExecutorService eventExecutor;
714
715 /**
716 * Builds the listener with all the proper services and information needed.
717 */
718 public OltDeviceListener() {
719 this.eventExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
720 groupedThreads("onos/olt-device-listener-event", "event-%d", log));
721 }
722
723 public void deactivate() {
724 this.eventExecutor.shutdown();
725 }
Saurav Dasa9d5f442019-03-06 19:32:48 -0800726
alshabibf0e7e702015-05-30 18:22:36 -0700727 @Override
728 public void event(DeviceEvent event) {
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700729 eventExecutor.execute(() -> {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700730 boolean isOlt = oltDeviceService.isOlt(event.subject());
731 DeviceId deviceId = event.subject().id();
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700732 switch (event.type()) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700733 case PORT_STATS_UPDATED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700734 case DEVICE_ADDED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700735 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700736 case PORT_ADDED:
737 case PORT_UPDATED:
738 case PORT_REMOVED:
739 if (!isOlt) {
740 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
741 return;
742 }
743 if (!oltDeviceService.isLocalLeader(deviceId)) {
744 log.trace("Device {} is not local to this node", deviceId);
745 return;
746 }
747 // port added, updated and removed are treated in the same way as we only care whether the port
748 // is enabled or not
749 handleOltPort(event.type(), event.subject(), event.port());
750 return;
751 case DEVICE_AVAILABILITY_CHANGED:
752 if (!isOlt) {
753 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
754 return;
755 }
756 if (deviceService.isAvailable(deviceId)) {
757 if (!oltDeviceService.isLocalLeader(deviceId)) {
758 if (log.isTraceEnabled()) {
759 log.trace("Device {} is not local to this node, not handling available device",
760 deviceId);
761 }
762 } else {
763 log.info("Handling available device: {}", deviceId);
764 handleExistingPorts();
765 }
766 } else if (!deviceService.isAvailable(deviceId) && deviceService.getPorts(deviceId).isEmpty()) {
767 // NOTE that upon disconnection there is no mastership on the device,
768 // and we should anyway clear the local cache of the flows/meters across instances.
769 // We're only clearing the device if there are no available ports,
770 // otherwise we assume it's a temporary disconnection
771 log.info("Device {} availability changed to false and ports are empty, " +
772 "purging meters and flows", deviceId);
773 //NOTE all the instances will call these methods
774 oltFlowService.purgeDeviceFlows(deviceId);
775 oltMeterService.purgeDeviceMeters(deviceId);
776 clearQueueForDevice(deviceId);
777 } else {
778 log.info("Device {} availability changed to false, but ports are still available, " +
779 "assuming temporary disconnection. Ports: {}",
780 deviceId, deviceService.getPorts(deviceId));
781 }
782 return;
783 case DEVICE_REMOVED:
784 if (!isOlt) {
785 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
786 return;
787 }
788 log.info("Device {} Removed, purging meters and flows", deviceId);
789 oltFlowService.purgeDeviceFlows(deviceId);
790 oltMeterService.purgeDeviceMeters(deviceId);
791 clearQueueForDevice(deviceId);
792 return;
793 default:
794 if (log.isTraceEnabled()) {
795 log.trace("Not handling event: {}, ", event);
796 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700797 }
798 });
alshabibf0e7e702015-05-30 18:22:36 -0700799 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000800
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700801 protected void clearQueueForDevice(DeviceId devId) {
802 try {
803 queueWriteLock.lock();
804 Iterator<Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>> iter =
805 eventsQueues.entrySet().iterator();
806 while (iter.hasNext()) {
807 Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
808 if (entry.getKey().deviceId().equals(devId)) {
809 eventsQueues.remove(entry.getKey());
810 }
811 }
812 } finally {
813 queueWriteLock.unlock();
Gamze Abaka838d8142019-02-21 07:06:55 +0000814 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000815 }
816
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700817 protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
818 log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
819 portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
820 if (port.isEnabled()) {
821 if (oltDeviceService.isNniPort(device, port.number())) {
822 // NOTE in the NNI case we receive a PORT_REMOVED event with status ENABLED, thus we need to
823 // pass the floeAction to the handleNniFlows method
824 OltFlowService.FlowOperation action = OltFlowService.FlowOperation.ADD;
825 if (type == DeviceEvent.Type.PORT_REMOVED) {
826 action = OltFlowService.FlowOperation.REMOVE;
827 }
828 oltFlowService.handleNniFlows(device, port, action);
829 } else {
830 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
831 // NOTE if the subscriber was previously provisioned,
832 // then add it back to the queue to be re-provisioned
833 boolean provisionSubscriber = oltFlowService.
834 isSubscriberServiceProvisioned(new AccessDevicePort(port));
835 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
836 if (si == null) {
837 //NOTE this should not happen given that the subscriber was provisioned before
838 log.error("Subscriber information not found in sadis for port {}",
839 portWithName(port));
840 return;
841 }
842
843 DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
844 if (type == DeviceEvent.Type.PORT_REMOVED) {
845 status = DiscoveredSubscriber.Status.REMOVED;
846 }
847
848 DiscoveredSubscriber sub =
849 new DiscoveredSubscriber(device, port,
850 status, provisionSubscriber, si);
851 addSubscriberToQueue(sub);
852 }
853 } else {
854 if (oltDeviceService.isNniPort(device, port.number())) {
855 // NOTE this may need to be handled on DEVICE_REMOVE as we don't disable the NNI
856 oltFlowService.handleNniFlows(device, port, OltFlowService.FlowOperation.REMOVE);
857 } else {
858 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
859 // NOTE we are assuming that if a subscriber has default eapol
860 // it does not have subscriber flows
861 if (oltFlowService.hasDefaultEapol(port)) {
862 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
863 if (si == null) {
864 //NOTE this should not happen given that the subscriber was provisioned before
865 log.error("Subscriber information not found in sadis for port {}",
866 portWithName(port));
867 return;
868 }
869 DiscoveredSubscriber sub =
870 new DiscoveredSubscriber(device, port,
871 DiscoveredSubscriber.Status.REMOVED, false, si);
872
873 addSubscriberToQueue(sub);
874
875 } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
876 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
877 if (si == null) {
878 //NOTE this should not happen given that the subscriber was provisioned before
879 log.error("Subscriber information not found in sadis for port {}",
880 portWithName(port));
881 return;
882 }
883 DiscoveredSubscriber sub =
884 new DiscoveredSubscriber(device, port,
885 DiscoveredSubscriber.Status.REMOVED, true, si);
886 addSubscriberToQueue(sub);
887 }
888 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000889 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000890 }
Gamze Abakada282b42019-03-11 13:16:48 +0000891
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700892 /**
893 * This method is invoked on app activation in order to deal
894 * with devices and ports that are already existing in the system
895 * and thus won't trigger an event.
896 * It is also needed on instance reboot and device reconnect
897 */
898 protected void handleExistingPorts() {
899 Iterable<DeviceId> devices = getConnectedOlts();
900 for (DeviceId deviceId : devices) {
901 log.info("Handling existing OLT Ports for device {}", deviceId);
902 if (oltDeviceService.isLocalLeader(deviceId)) {
903 List<Port> ports = deviceService.getPorts(deviceId);
904 for (Port p : ports) {
905 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
906 continue;
907 }
908 Device device = deviceService.getDevice(deviceId);
909 deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, device, p);
910 }
911 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800912 }
913 }
914 }
Hardik Windlass395ff372019-06-13 05:16:00 +0000915}