blob: e963997b1f88ad9188f171a9ae19dcea1b880336 [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07002 * Copyright 2021-present Open Networking Foundation
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
alshabibf0e7e702015-05-30 18:22:36 -070018import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080019import org.onlab.util.KryoNamespace;
Gamze Abaka1f62dd92020-05-07 08:58:13 +000020import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080021import org.onosproject.cluster.ClusterService;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020022import org.onosproject.cluster.LeadershipService;
alshabibf0e7e702015-05-30 18:22:36 -070023import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080025import org.onosproject.event.AbstractListenerManager;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020026import org.onosproject.mastership.MastershipService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010027import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070028import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010029import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070030import org.onosproject.net.DeviceId;
alshabibdec2e252016-01-15 12:20:25 -080031import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070032import org.onosproject.net.PortNumber;
33import org.onosproject.net.device.DeviceEvent;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080036import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080037import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070039import org.opencord.olt.AccessDeviceEvent;
40import org.opencord.olt.AccessDeviceListener;
Gustavo Silva29fb20e2022-05-26 09:59:54 -030041import org.opencord.olt.AccessDevicePort;
alshabib36a4d732016-06-01 16:03:59 -070042import org.opencord.olt.AccessDeviceService;
Gustavo Silva29fb20e2022-05-26 09:59:54 -030043import org.opencord.olt.DiscoveredSubscriber;
44import org.opencord.olt.OltDeviceServiceInterface;
45import org.opencord.olt.OltFlowServiceInterface;
46import org.opencord.olt.OltMeterServiceInterface;
47import org.opencord.olt.ServiceKey;
48import org.opencord.olt.FlowOperation;
Gamze Abaka641fc072018-09-04 09:16:27 +000049import org.opencord.sadis.BaseInformationService;
50import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010051import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000052import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080053import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070054import org.osgi.service.component.annotations.Activate;
55import org.osgi.service.component.annotations.Component;
56import org.osgi.service.component.annotations.Deactivate;
57import org.osgi.service.component.annotations.Modified;
58import org.osgi.service.component.annotations.Reference;
59import org.osgi.service.component.annotations.ReferenceCardinality;
Ilayda Ozdemir90a93622021-02-25 09:40:58 +000060import org.osgi.service.component.annotations.ReferencePolicy;
alshabibf0e7e702015-05-30 18:22:36 -070061import org.slf4j.Logger;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070062import org.slf4j.LoggerFactory;
alshabibf0e7e702015-05-30 18:22:36 -070063
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010064import java.util.ArrayList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010065import java.util.Dictionary;
Andrea Campanella61650a12022-01-24 18:09:44 -080066import java.util.HashSet;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070067import java.util.Iterator;
68import java.util.LinkedList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010069import java.util.List;
70import java.util.Map;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010071import java.util.Properties;
72import java.util.Set;
Andrea Campanella61650a12022-01-24 18:09:44 -080073import java.util.concurrent.ArrayBlockingQueue;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010074import java.util.concurrent.ExecutorService;
75import java.util.concurrent.Executors;
76import java.util.concurrent.LinkedBlockingQueue;
77import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella61650a12022-01-24 18:09:44 -080078import java.util.concurrent.ThreadPoolExecutor;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010079import java.util.concurrent.TimeUnit;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070080import java.util.concurrent.locks.Lock;
81import java.util.concurrent.locks.ReentrantReadWriteLock;
Andrea Campanella4bfc2422022-06-28 14:33:01 +020082import java.util.stream.Collectors;
83import java.util.Optional;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010084
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010085import static com.google.common.base.Strings.isNullOrEmpty;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010086import static org.onlab.util.Tools.get;
87import static org.onlab.util.Tools.groupedThreads;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070088import static org.opencord.olt.impl.OltUtils.getPortName;
89import static org.opencord.olt.impl.OltUtils.portWithName;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010090import static org.opencord.olt.impl.OsgiPropertyConstants.*;
alshabibf0e7e702015-05-30 18:22:36 -070091
92/**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070093 * OLT Application.
alshabibf0e7e702015-05-30 18:22:36 -070094 */
Carmelo Casconeca931162019-07-15 18:22:24 -070095@Component(immediate = true,
96 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -070097 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +000098 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070099 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
Andrea Campanella61650a12022-01-24 18:09:44 -0800100 FLOW_EXECUTOR_QUEUE_SIZE + ":Integer=" + FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700101 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
102 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
Carmelo Casconeca931162019-07-15 18:22:24 -0700103 })
alshabib8e4fd2f2016-01-12 15:55:53 -0800104public class Olt
105 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
106 implements AccessDeviceService {
alshabibf0e7e702015-05-30 18:22:36 -0700107
Carmelo Casconeca931162019-07-15 18:22:24 -0700108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700109 protected DeviceService deviceService;
110
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected ComponentConfigService cfgService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100113
Carmelo Casconeca931162019-07-15 18:22:24 -0700114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700115 protected MastershipService mastershipService;
alshabibf0e7e702015-05-30 18:22:36 -0700116
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700118 protected ClusterService clusterService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected LeadershipService leadershipService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100122
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000123 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
124 bind = "bindSadisService",
125 unbind = "unbindSadisService",
126 policy = ReferencePolicy.DYNAMIC)
127 protected volatile SadisService sadisService;
Gamze Abaka641fc072018-09-04 09:16:27 +0000128
Carmelo Casconeca931162019-07-15 18:22:24 -0700129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700130 protected OltDeviceServiceInterface oltDeviceService;
alshabibe0559672016-02-21 14:49:51 -0800131
Carmelo Casconeca931162019-07-15 18:22:24 -0700132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700133 protected OltFlowServiceInterface oltFlowService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
136 protected OltMeterServiceInterface oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000137
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
139 protected StorageService storageService;
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700142 protected CoreService coreService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800143
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700144 protected ApplicationId appId;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200145
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700146 private static final String ONOS_OLT_SERVICE = "onos/olt-service";
Tunahan Sezena07fe962021-02-24 08:24:24 +0000147
Carmelo Casconeca931162019-07-15 18:22:24 -0700148 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800149 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700150 **/
151 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000152
Carmelo Casconeca931162019-07-15 18:22:24 -0700153 /**
Gamze Abaka51a34e82020-05-08 13:03:14 +0000154 * Default multicast service name.
Carmelo Casconeca931162019-07-15 18:22:24 -0700155 **/
Gamze Abaka51a34e82020-05-08 13:03:14 +0000156 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Gamze Abaka33feef52019-02-27 08:16:47 +0000157
Saurav Das2d3777a2020-08-07 18:48:51 -0700158 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700159 * Number of threads used to process flows.
Saurav Das2d3777a2020-08-07 18:48:51 -0700160 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700161 protected int flowProcessingThreads = FLOW_PROCESSING_THREADS_DEFAULT;
Saurav Das2d3777a2020-08-07 18:48:51 -0700162
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700163 /**
 * Size of the queue used by the executor that processes flows.
165 **/
Andrea Campanella61650a12022-01-24 18:09:44 -0800166 protected int flowExecutorQueueSize = FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT;
167
168 /**
 * Number of threads used to process subscriber provisioning requests.
170 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700171 protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
172
173 /**
 * Delay in ms to put an event back in the queue, used to avoid retrying things too often if conditions are not met.
175 **/
176 protected int requeueDelay = REQUEUE_DELAY_DEFAULT;
177
178 private final Logger log = LoggerFactory.getLogger(getClass());
179
180 /**
181 * A queue to asynchronously process events.
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700182 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700183 protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
alshabibf0e7e702015-05-30 18:22:36 -0700184
Gamze Abaka641fc072018-09-04 09:16:27 +0000185 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
alshabibf0e7e702015-05-30 18:22:36 -0700186
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700187 /**
188 * Listener for OLT devices events.
189 */
190 protected OltDeviceListener deviceListener = new OltDeviceListener();
191 protected ScheduledExecutorService discoveredSubscriberExecutor =
192 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
193 "discovered-cp-%d", log));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100194
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700195 protected ScheduledExecutorService queueExecutor =
196 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
197 "discovered-cp-restore-%d", log));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700198
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700199 /**
200 * Executor used to defer flow provisioning to a different thread pool.
201 */
202 protected ExecutorService flowsExecutor;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800203
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700204 /**
205 * Executor used to defer subscriber handling from API call to a different thread pool.
206 */
207 protected ExecutorService subscriberExecutor;
208
209 private static final String APP_NAME = "org.opencord.olt";
210
211 private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
212 private final Lock queueWriteLock = queueLock.writeLock();
213 private final Lock queueReadLock = queueLock.readLock();
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700214
alshabibf0e7e702015-05-30 18:22:36 -0700215 @Activate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700216 protected void activate(ComponentContext context) {
217 cfgService.registerProperties(getClass());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700218
alshabibe0559672016-02-21 14:49:51 -0800219 modified(context);
Saurav Das62ad75e2019-03-05 12:22:22 -0800220
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700221 appId = coreService.registerApplication(APP_NAME);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800222 KryoNamespace serializer = KryoNamespace.newBuilder()
223 .register(KryoNamespaces.API)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700224 .register(ConnectPoint.class)
225 .register(DiscoveredSubscriber.class)
226 .register(DiscoveredSubscriber.Status.class)
227 .register(SubscriberAndDeviceInformation.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800228 .register(UniTagInformation.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000229 .register(LinkedBlockingQueue.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800230 .build();
231
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700232 eventsQueues = storageService.<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder()
233 .withName("volt-subscriber-queues")
234 .withApplicationId(appId)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800235 .withSerializer(Serializer.using(serializer))
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700236 .build().asJavaMap();
alshabib4ceaed32016-03-03 18:00:58 -0800237
alshabibba357492016-01-27 13:49:46 -0800238 deviceService.addListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700239
240 discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
241 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
242 log.info("Started");
243
244 deviceListener.handleExistingPorts();
alshabibf0e7e702015-05-30 18:22:36 -0700245 }
246
247 @Deactivate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700248 protected void deactivate(ComponentContext context) {
249 cfgService.unregisterProperties(getClass(), false);
250 discoveredSubscriberExecutor.shutdown();
Andrea Campanellaeaf23952021-12-30 15:58:54 +0100251 deviceService.removeListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700252 flowsExecutor.shutdown();
253 subscriberExecutor.shutdown();
254 deviceListener.deactivate();
alshabibf0e7e702015-05-30 18:22:36 -0700255 log.info("Stopped");
256 }
257
alshabibe0559672016-02-21 14:49:51 -0800258 @Modified
259 public void modified(ComponentContext context) {
260 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700261 if (context != null) {
Andrea Campanella971d5b92020-05-07 11:20:43 +0200262 String bpId = get(properties, DEFAULT_BP_ID);
263 defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
Gamze Abakaad329652018-12-20 10:12:21 +0000264
Andrea Campanella971d5b92020-05-07 11:20:43 +0200265 String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
266 multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000267
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700268 String flowThreads = get(properties, FLOW_PROCESSING_THREADS);
269 int oldFlowProcessingThreads = flowProcessingThreads;
270 flowProcessingThreads = isNullOrEmpty(flowThreads) ?
271 oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
Saurav Das2d3777a2020-08-07 18:48:51 -0700272
Andrea Campanella61650a12022-01-24 18:09:44 -0800273 String executorQueueSize = get(properties, FLOW_EXECUTOR_QUEUE_SIZE);
274 int oldExecutorQueueSize = flowExecutorQueueSize;
275 flowExecutorQueueSize = isNullOrEmpty(executorQueueSize) ?
276 oldExecutorQueueSize : Integer.parseInt(executorQueueSize.trim());
277
278 if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads
279 || oldExecutorQueueSize != flowExecutorQueueSize) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700280 if (flowsExecutor != null) {
281 flowsExecutor.shutdown();
282 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800283
284 flowsExecutor = new ThreadPoolExecutor(0, flowProcessingThreads, 30,
285 TimeUnit.SECONDS, new ThreadPoolQueue(flowExecutorQueueSize),
286 new ThreadPoolExecutor.DiscardPolicy());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700287 }
Gamze Abaka33feef52019-02-27 08:16:47 +0000288
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700289 String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
290 int oldSubscriberProcessingThreads = subscriberProcessingThreads;
291 subscriberProcessingThreads = isNullOrEmpty(subscriberThreads) ?
292 oldSubscriberProcessingThreads : Integer.parseInt(subscriberThreads.trim());
293
294 if (subscriberExecutor == null || oldSubscriberProcessingThreads != subscriberProcessingThreads) {
295 if (subscriberExecutor != null) {
296 subscriberExecutor.shutdown();
297 }
298 subscriberExecutor = Executors.newFixedThreadPool(subscriberProcessingThreads,
299 groupedThreads(ONOS_OLT_SERVICE,
300 "subscriber-installer-%d"));
301 }
302
303 String queueDelay = get(properties, REQUEUE_DELAY);
304 requeueDelay = isNullOrEmpty(queueDelay) ?
305 REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
alshabibe0559672016-02-21 14:49:51 -0800306 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800307 log.info("Modified. Values = {}: {}, {}:{}, {}:{}," +
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700308 "{}:{}, {}:{}, {}:{}",
309 DEFAULT_BP_ID, defaultBpId,
310 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
311 FLOW_PROCESSING_THREADS, flowProcessingThreads,
Andrea Campanella61650a12022-01-24 18:09:44 -0800312 FLOW_EXECUTOR_QUEUE_SIZE, flowExecutorQueueSize,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700313 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
314 REQUEUE_DELAY, requeueDelay);
alshabibe0559672016-02-21 14:49:51 -0800315 }
316
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000317
alshabib32232c82016-02-25 17:57:24 -0500318 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700319 public boolean provisionSubscriber(ConnectPoint cp) {
320 subscriberExecutor.submit(() -> {
321 Device device = deviceService.getDevice(cp.deviceId());
322 Port port = deviceService.getPort(device.id(), cp.port());
323 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Hardik Windlass395ff372019-06-13 05:16:00 +0000324
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700325 if (oltDeviceService.isNniPort(device, port.number())) {
326 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
327 return false;
Saurav Das026650f2020-09-21 18:56:35 -0700328 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700329
330 log.info("Provisioning subscriber on {}", accessDevicePort);
331
332 if (oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
333 log.error("Subscriber on {} is already provisioned", accessDevicePort);
334 return false;
335 }
336
337 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
338 if (si == null) {
339 log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
340 return false;
341 }
342 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
343 DiscoveredSubscriber.Status.ADDED, true, si);
344
345 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
346 // regardless of the flow status
347 si.uniTagList().forEach(uti -> {
348 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
349 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
350 });
351
352 addSubscriberToQueue(sub);
353 return true;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100354 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700355 //NOTE this only means we have taken the request in, nothing more.
Amit Ghosh31939522018-08-16 13:28:21 +0100356 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800357 }
358
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000359 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700360 public boolean removeSubscriber(ConnectPoint cp) {
361 subscriberExecutor.submit(() -> {
362 Device device = deviceService.getDevice(DeviceId.deviceId(cp.deviceId().toString()));
363 Port port = deviceService.getPort(device.id(), cp.port());
364 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000365
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700366 if (oltDeviceService.isNniPort(device, port.number())) {
367 log.warn("will not un-provision a subscriber on the NNI {}",
368 accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100369 return false;
370 }
371
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700372 log.info("Un-provisioning subscriber on {}", accessDevicePort);
373
374 if (!oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
375 log.error("Subscriber on {} is not provisioned", accessDevicePort);
376 return false;
377 }
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200378 SubscriberAndDeviceInformation si;
379 //First check if the subscriber is in the programmed subscriber map, if not fallback to sadis
380 List<Map.Entry<ServiceKey, UniTagInformation>> entries =
381 oltFlowService.getProgrammedSubscribers().entrySet().stream()
382 .filter(entry -> entry.getKey().getPort().equals(accessDevicePort))
383 .collect(Collectors.toList());
384 if (!entries.isEmpty()) {
385 List<UniTagInformation> programmedList = entries.stream()
386 .map(entry -> entry.getKey().getService())
387 .collect(Collectors.toList());
388 si = new SubscriberAndDeviceInformation();
389 si.setUniTagList(programmedList);
390 } else {
391 si = subsService.get(getPortName(port));
392 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700393 if (si == null) {
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200394 log.error("Subscriber information not found in programmed subscribers and sadis for port {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700395 accessDevicePort);
396 // NOTE that we are returning true so that the subscriber is removed from the queue
397 // and we can move on provisioning others
398 return false;
399 }
400 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
Andrea Campanella34ce61a2022-04-28 18:55:46 +0200401 DiscoveredSubscriber.Status.ADMIN_REMOVED, true, si);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700402
403 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
404 // regardless of the flow status
405 si.uniTagList().forEach(uti -> {
406 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200407 log.debug("Updating status for {} to false", sk);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700408 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000409 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700410
411 addSubscriberToQueue(sub);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100412 return true;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700413 });
414 //NOTE this only means we have taken the request in, nothing more.
415 return true;
416 }
417
418 @Override
419 public boolean provisionSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
420 log.debug("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}",
421 cp, cTag, sTag, tpId);
422 Device device = deviceService.getDevice(cp.deviceId());
423 Port port = deviceService.getPort(device.id(), cp.port());
424 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
425
426 if (oltDeviceService.isNniPort(device, port.number())) {
427 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100428 return false;
429 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100430
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700431 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200432 UniTagInformation specificService = getUniTagInformation(port, cTag, sTag, tpId);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700433 if (specificService == null) {
434 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
435 "stag {}, tpId {}", cp, cTag, sTag, tpId);
Amit Ghosh31939522018-08-16 13:28:21 +0100436 return false;
437 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700438 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
439 uniTagInformationList.add(specificService);
440 si.setUniTagList(uniTagInformationList);
441 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
442 DiscoveredSubscriber.Status.ADDED, true, si);
Amit Ghosh31939522018-08-16 13:28:21 +0100443
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700444 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
445 // regardless of the flow status
446 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
447 if (oltFlowService.isSubscriberServiceProvisioned(sk)) {
448 log.error("Subscriber on {} is already provisioned", sk);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100449 return false;
450 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700451 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
452
453 addSubscriberToQueue(sub);
454 return true;
Amit Ghosh31939522018-08-16 13:28:21 +0100455 }
456
457 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700458 public boolean removeSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
459 log.debug("Un-provisioning subscriber on {} with cTag {}, stag {}, tpId {}",
460 cp, cTag, sTag, tpId);
461 Device device = deviceService.getDevice(cp.deviceId());
462 Port port = deviceService.getPort(device.id(), cp.port());
463 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
464
465 if (oltDeviceService.isNniPort(device, port.number())) {
466 log.warn("will not un-provision a subscriber on the NNI {}",
467 accessDevicePort);
468 return false;
469 }
470
471 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200472 UniTagInformation specificService = getUniTagInformation(port, cTag, sTag, tpId);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700473 if (specificService == null) {
474 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
475 "stag {}, tpId {}", cp, cTag, sTag, tpId);
476 return false;
477 }
478 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
479 uniTagInformationList.add(specificService);
480 si.setUniTagList(uniTagInformationList);
481 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
Andrea Campanella34ce61a2022-04-28 18:55:46 +0200482 DiscoveredSubscriber.Status.ADMIN_REMOVED, true, si);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700483
484 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
485 // regardless of the flow status
486 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
487 if (!oltFlowService.isSubscriberServiceProvisioned(sk)) {
488 log.error("Subscriber on {} is not provisioned", sk);
489 return false;
490 }
491 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
492
493 addSubscriberToQueue(sub);
494 return true;
Saurav Das82b8e6d2018-10-04 15:25:12 -0700495 }
496
497 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700498 public List<DeviceId> getConnectedOlts() {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100499 List<DeviceId> olts = new ArrayList<>();
500 Iterable<Device> devices = deviceService.getDevices();
501 for (Device d : devices) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700502 if (oltDeviceService.isOlt(d)) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700503 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100504 olts.add(d.id());
505 }
506 }
507 return olts;
alshabibe0559672016-02-21 14:49:51 -0800508 }
509
Amit Ghosh31939522018-08-16 13:28:21 +0100510 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700511 * Finds the connect-point to which a subscriber is connected.
Amit Ghosh31939522018-08-16 13:28:21 +0100512 *
513 * @param id The id of the subscriber, this is the same ID as in Sadis
514 * @return Subscribers ConnectPoint if found else null
515 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700516 @Override
517 public ConnectPoint findSubscriberConnectPoint(String id) {
Amit Ghosh31939522018-08-16 13:28:21 +0100518
519 Iterable<Device> devices = deviceService.getDevices();
520 for (Device d : devices) {
521 for (Port p : deviceService.getPorts(d.id())) {
522 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
523 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700524 log.debug("Found on {}", portWithName(p));
Amit Ghosh31939522018-08-16 13:28:21 +0100525 return new ConnectPoint(d.id(), p.number());
526 }
527 }
528 }
529 return null;
530 }
531
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700532 protected void processDiscoveredSubscribers() {
Gamze Abaka641fc072018-09-04 09:16:27 +0000533
Andrea Campanella61650a12022-01-24 18:09:44 -0800534 log.info("Started processDiscoveredSubscribers loop");
535 while (true) {
536 Set<ConnectPoint> discoveredCps;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700537 try {
538 queueReadLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800539 discoveredCps = new HashSet<>(eventsQueues.keySet());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700540 } catch (Exception e) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800541 log.error("Cannot read keys from queue map", e);
542 continue;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700543 } finally {
544 queueReadLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000545 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000546
Andrea Campanella61650a12022-01-24 18:09:44 -0800547 discoveredCps.forEach(cp -> {
548 LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
549
550 try {
551 queueReadLock.lock();
552 eventsQueue = eventsQueues.get(cp);
553 } catch (Exception e) {
554 log.error("Cannot get key from queue map", e);
555 return;
556 } finally {
557 queueReadLock.unlock();
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100558 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000559
Andrea Campanella61650a12022-01-24 18:09:44 -0800560 if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
561 // if we're not local leader for this device, ignore this queue
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700562 if (log.isTraceEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800563 log.trace("Ignoring queue on CP {} as not master of the device", cp);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700564 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800565 return;
566 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700567
Andrea Campanella61650a12022-01-24 18:09:44 -0800568 try {
569 flowsExecutor.execute(() -> {
570 if (!eventsQueue.isEmpty()) {
571 // we do not remove the event from the queue until it has been processed
572 // in that way we guarantee that events are processed in order
573 DiscoveredSubscriber sub = eventsQueue.peek();
574 if (sub == null) {
575 // the queue is empty
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700576 return;
577 }
578
Andrea Campanella61650a12022-01-24 18:09:44 -0800579 if (log.isTraceEnabled()) {
Andrea Campanellaccb32862022-02-17 16:29:10 +0100580 log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
Andrea Campanella61650a12022-01-24 18:09:44 -0800581 sub, portWithName(sub.port), sub.status, sub.hasSubscriber);
582 }
583
584 if (sub.hasSubscriber) {
585 // this is a provision subscriber call
586 if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
587 removeSubscriberFromQueue(sub);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700588 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800589 } else {
590 // this is a port event (ENABLED/DISABLED)
591 // means no subscriber was provisioned on that port
592
593 if (!deviceService.isAvailable(sub.device.id()) ||
594 deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
595 // If the device is not connected or the port is not available do nothing
596 // This can happen when we disable and then immediately delete the device,
597 // the queue is populated but the meters and flows are already gone
598 // thus there is nothing left to do
599 return;
600 }
601
602 if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
603 if (log.isTraceEnabled()) {
604 log.trace("Processing of port {} completed",
605 portWithName(sub.port));
606 }
607 removeSubscriberFromQueue(sub);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100608 } else {
609 log.debug("Not handling basic port flows " +
610 "for {}, leaving in the queue",
611 portWithName(sub.port));
Andrea Campanella61650a12022-01-24 18:09:44 -0800612 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700613 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000614 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800615 });
616 } catch (Exception e) {
617 log.error("Exception processing subscriber", e);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000618 }
619 });
Tunahan Sezena07fe962021-02-24 08:24:24 +0000620
Andrea Campanella61650a12022-01-24 18:09:44 -0800621 try {
622 TimeUnit.MILLISECONDS.sleep(requeueDelay);
623 } catch (InterruptedException e) {
624 log.debug("Interrupted while waiting to requeue", e);
625 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000626 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000627 }
628
Andrea Campanella61650a12022-01-24 18:09:44 -0800629
Tunahan Sezena07fe962021-02-24 08:24:24 +0000630 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000631 * Checks the subscriber uni tag list and find the uni tag information.
632 * using the pon c tag, pon s tag and the technology profile id
633 * May return Optional<null>
634 *
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200635 * @param port port of the subscriber
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000636 * @param innerVlan pon c tag
637 * @param outerVlan pon s tag
638 * @param tpId the technology profile id
639 * @return the found uni tag information
640 */
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200641 private UniTagInformation getUniTagInformation(Port port, VlanId innerVlan,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700642 VlanId outerVlan, int tpId) {
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200643 String portName = portWithName(port);
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000644 log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700645 portName, innerVlan, outerVlan, tpId);
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200646 //First check if the subscriber is in the programmed subscriber map, if not fallback to sadis
647 //there should be only one sub service with these characteristics.
648 Optional<Map.Entry<ServiceKey, UniTagInformation>> service = oltFlowService.getProgrammedSubscribers()
649 .entrySet().stream()
650 .filter(entry -> entry.getKey().getPort().equals(new AccessDevicePort(port))
651 && entry.getValue().getPonSTag().equals(outerVlan)
652 && entry.getValue().getPonCTag().equals(innerVlan))
653 .findFirst();
654 if (service.isPresent()) {
655 log.debug("Subscriber was programmed with " +
656 "unit tag info for {}, {}, {}, {}", port, innerVlan, outerVlan, tpId);
657 return service.get().getValue();
658 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700659 SubscriberAndDeviceInformation subInfo = subsService.get(portName);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000660 if (subInfo == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700661 log.warn("Subscriber information doesn't exist for {}", portName);
662 return null;
Gamze Abakaf59c0912019-04-19 08:24:28 +0000663 }
664
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000665 List<UniTagInformation> uniTagList = subInfo.uniTagList();
666 if (uniTagList == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700667 log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), portName);
668 return null;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000669 }
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200670 UniTagInformation uniTagInformation = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
671 if (uniTagInformation == null) {
Harsh Awasthic1e4bf52022-02-09 14:14:14 +0530672 // Try again after invalidating cache for the particular port name.
673 subsService.invalidateId(portName);
674 subInfo = subsService.get(portName);
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200675 uniTagInformation = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000676 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000677
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200678 if (uniTagInformation == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700679 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700680 innerVlan, outerVlan, tpId, portName);
Saurav Das82b8e6d2018-10-04 15:25:12 -0700681 return null;
682 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100683
Andrea Campanella4bfc2422022-06-28 14:33:01 +0200684 return uniTagInformation;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100685 }
686
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700687 protected void bindSadisService(SadisService service) {
688 sadisService = service;
689 subsService = sadisService.getSubscriberInfoService();
690
691 log.info("Sadis-service binds to onos.");
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100692 }
693
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700694 protected void unbindSadisService(SadisService service) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700695 deviceListener = null;
696 sadisService = null;
697 subsService = null;
698 log.info("Sadis-service unbinds from onos.");
Jonathan Hart1d34c8b2018-05-05 15:37:28 -0700699 }
700
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700701 protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700702 try {
Andrea Campanella61650a12022-01-24 18:09:44 -0800703 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800704
Andrea Campanella61650a12022-01-24 18:09:44 -0800705 try {
706 queueWriteLock.lock();
707 eventsQueues.compute(cp, (subcp, queue) -> {
708 queue = queue == null ? new LinkedBlockingQueue<>() : queue;
709 log.info("Adding subscriber {} to queue: {} with existing {}",
710 sub, portWithName(sub.port), queue);
711 queue.add(sub);
712 return queue;
713 });
714 } catch (UnsupportedOperationException | ClassCastException |
715 NullPointerException | IllegalArgumentException e) {
716 log.error("Cannot add subscriber {} to queue: {}", portWithName(sub.port), e.getMessage());
717 } finally {
718 queueWriteLock.unlock();
719 }
720 } catch (Exception e) {
721 log.error("Can't add {} to queue", sub, e);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800722 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800723 }
724
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700725 protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
726 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700727 try {
728 queueWriteLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800729 eventsQueues.compute(cp, (subcp, queue) -> {
730 if (log.isTraceEnabled()) {
731 log.trace("Removing subscriber {} from queue : {} " +
732 "with existing {}", sub,
733 portWithName(sub.port), queue);
734 }
735 if (queue == null) {
736 log.warn("Cannot find queue for connectPoint {}", cp);
737 return queue;
738 }
739 boolean removed = queue.remove(sub);
740 if (!removed) {
741 log.warn("Subscriber {} has not been removed from queue, " +
742 "is it still there? {}", sub, queue);
743 return queue;
744 } else {
745 log.debug("Subscriber {} has been removed from the queue {}",
746 sub, queue);
747 }
748
749 return queue;
750 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700751 } catch (UnsupportedOperationException | ClassCastException |
752 NullPointerException | IllegalArgumentException e) {
753 log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
754 } finally {
755 queueWriteLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000756 }
757 }
758
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700759 protected class OltDeviceListener
760 implements DeviceListener {
761
762 private final Logger log = LoggerFactory.getLogger(getClass());
763 protected ExecutorService eventExecutor;
764
765 /**
766 * Builds the listener with all the proper services and information needed.
767 */
768 public OltDeviceListener() {
769 this.eventExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
770 groupedThreads("onos/olt-device-listener-event", "event-%d", log));
771 }
772
773 public void deactivate() {
774 this.eventExecutor.shutdown();
775 }
Saurav Dasa9d5f442019-03-06 19:32:48 -0800776
alshabibf0e7e702015-05-30 18:22:36 -0700777 @Override
778 public void event(DeviceEvent event) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800779 if (log.isTraceEnabled()) {
780 log.trace("OltListener receives event {} for: {}/{}", event.type(), event.subject().id(),
781 event.port() != null ? event.port().number() : null);
782 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700783 eventExecutor.execute(() -> {
Andrea Campanella61650a12022-01-24 18:09:44 -0800784 if (log.isTraceEnabled()) {
785 log.trace("OltListener Executor receives event {} for: {}/{}",
786 event.type(), event.subject().id(),
787 event.port() != null ? event.port().number() : null);
788 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700789 boolean isOlt = oltDeviceService.isOlt(event.subject());
790 DeviceId deviceId = event.subject().id();
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700791 switch (event.type()) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700792 case PORT_STATS_UPDATED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700793 case DEVICE_ADDED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700794 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700795 case PORT_ADDED:
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700796 case PORT_REMOVED:
797 if (!isOlt) {
798 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
799 return;
800 }
801 if (!oltDeviceService.isLocalLeader(deviceId)) {
802 log.trace("Device {} is not local to this node", deviceId);
803 return;
804 }
805 // port added, updated and removed are treated in the same way as we only care whether the port
806 // is enabled or not
807 handleOltPort(event.type(), event.subject(), event.port());
808 return;
Andrea Campanellacc6dc7e2022-03-22 10:38:43 +0100809 case PORT_UPDATED:
810 if (!isOlt) {
811 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
812 return;
813 }
814 // port updated are handled only when the device is available, makes not sense otherwise.
815 // this also solves an issue with port events and device disconnection events handled
816 // in sparse order causing failures in the ofagent disconnect test
817 // (see https://jira.opencord.org/browse/VOL-4669)
818 if (!deviceService.isAvailable(deviceId)) {
819 log.debug("Ignoring port event {} on {} as it is disconnected", event, deviceId);
820 return;
821 }
822 if (!oltDeviceService.isLocalLeader(deviceId)) {
823 log.trace("Device {} is not local to this node", deviceId);
824 return;
825 }
826 handleOltPort(event.type(), event.subject(), event.port());
827 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700828 case DEVICE_AVAILABILITY_CHANGED:
829 if (!isOlt) {
830 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
831 return;
832 }
833 if (deviceService.isAvailable(deviceId)) {
834 if (!oltDeviceService.isLocalLeader(deviceId)) {
835 if (log.isTraceEnabled()) {
836 log.trace("Device {} is not local to this node, not handling available device",
837 deviceId);
838 }
839 } else {
840 log.info("Handling available device: {}", deviceId);
841 handleExistingPorts();
842 }
843 } else if (!deviceService.isAvailable(deviceId) && deviceService.getPorts(deviceId).isEmpty()) {
844 // NOTE that upon disconnection there is no mastership on the device,
845 // and we should anyway clear the local cache of the flows/meters across instances.
846 // We're only clearing the device if there are no available ports,
847 // otherwise we assume it's a temporary disconnection
848 log.info("Device {} availability changed to false and ports are empty, " +
849 "purging meters and flows", deviceId);
850 //NOTE all the instances will call these methods
851 oltFlowService.purgeDeviceFlows(deviceId);
852 oltMeterService.purgeDeviceMeters(deviceId);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100853 // cpStatus is a distributed map, thus only master will update it.
854 if (oltDeviceService.isLocalLeader(deviceId)) {
855 log.debug("Master, clearing cp status for {}", deviceId);
856 clearQueueForDevice(deviceId);
857 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700858 } else {
859 log.info("Device {} availability changed to false, but ports are still available, " +
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800860 "assuming temporary disconnection.",
861 deviceId);
862 if (log.isTraceEnabled()) {
863 log.trace("Available ports: {}", deviceService.getPorts(deviceId));
864 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700865 }
866 return;
867 case DEVICE_REMOVED:
868 if (!isOlt) {
869 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
870 return;
871 }
872 log.info("Device {} Removed, purging meters and flows", deviceId);
873 oltFlowService.purgeDeviceFlows(deviceId);
874 oltMeterService.purgeDeviceMeters(deviceId);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100875 if (oltDeviceService.isLocalLeader(deviceId)) {
876 log.debug("Master, clearing cp status for {}", deviceId);
877 clearQueueForDevice(deviceId);
878 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700879 return;
880 default:
881 if (log.isTraceEnabled()) {
882 log.trace("Not handling event: {}, ", event);
883 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700884 }
885 });
alshabibf0e7e702015-05-30 18:22:36 -0700886 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000887
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700888 protected void clearQueueForDevice(DeviceId devId) {
889 try {
890 queueWriteLock.lock();
891 Iterator<Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>> iter =
892 eventsQueues.entrySet().iterator();
893 while (iter.hasNext()) {
894 Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
895 if (entry.getKey().deviceId().equals(devId)) {
896 eventsQueues.remove(entry.getKey());
Andrea Campanellaccb32862022-02-17 16:29:10 +0100897 log.debug("Removing key from queue {}", entry.getKey());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700898 }
899 }
900 } finally {
901 queueWriteLock.unlock();
Gamze Abaka838d8142019-02-21 07:06:55 +0000902 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000903 }
904
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700905 protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
906 log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
907 portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
Andrea Campanella61650a12022-01-24 18:09:44 -0800908 boolean isNni = oltDeviceService.isNniPort(device, port.number());
909
910 if (!isNni && type == DeviceEvent.Type.PORT_ADDED) {
911 log.debug("Ignoring PORT_ADD on UNI port {}", portWithName(port));
912 return;
913 }
914
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700915 if (port.isEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800916 if (isNni) {
Gustavo Silva29fb20e2022-05-26 09:59:54 -0300917 FlowOperation action = FlowOperation.ADD;
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800918 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
919 // In that case the flows are purged anyway, so there's no need to deal with them,
920 // it would actually be counter-productive as the openflow connection is severed and they won't
921 // be correctly processed
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700922 if (type == DeviceEvent.Type.PORT_REMOVED) {
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800923 log.debug("NNI port went down, " +
924 "ignoring event as flows will be removed in the generic device cleanup");
925 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700926 }
927 oltFlowService.handleNniFlows(device, port, action);
928 } else {
929 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
930 // NOTE if the subscriber was previously provisioned,
931 // then add it back to the queue to be re-provisioned
932 boolean provisionSubscriber = oltFlowService.
933 isSubscriberServiceProvisioned(new AccessDevicePort(port));
934 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
935 if (si == null) {
936 //NOTE this should not happen given that the subscriber was provisioned before
937 log.error("Subscriber information not found in sadis for port {}",
938 portWithName(port));
939 return;
940 }
941
942 DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
943 if (type == DeviceEvent.Type.PORT_REMOVED) {
944 status = DiscoveredSubscriber.Status.REMOVED;
945 }
946
947 DiscoveredSubscriber sub =
948 new DiscoveredSubscriber(device, port,
949 status, provisionSubscriber, si);
950 addSubscriberToQueue(sub);
951 }
952 } else {
Andrea Campanella61650a12022-01-24 18:09:44 -0800953 if (isNni) {
Matteo Scandolob6981dc2021-12-02 16:31:44 -0800954 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
955 // In that case the flows are purged anyway, so there's no need to deal with them,
956 // it would actually be counter-productive as the openflow connection is severed and they won't
957 // be correctly processed
958 log.debug("NNI port went down, " +
959 "ignoring event as flows will be removed in the generic device cleanup");
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700960 } else {
961 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
962 // NOTE we are assuming that if a subscriber has default eapol
963 // it does not have subscriber flows
964 if (oltFlowService.hasDefaultEapol(port)) {
965 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
966 if (si == null) {
967 //NOTE this should not happen given that the subscriber was provisioned before
968 log.error("Subscriber information not found in sadis for port {}",
969 portWithName(port));
970 return;
971 }
972 DiscoveredSubscriber sub =
973 new DiscoveredSubscriber(device, port,
974 DiscoveredSubscriber.Status.REMOVED, false, si);
975
976 addSubscriberToQueue(sub);
977
978 } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
979 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
980 if (si == null) {
981 //NOTE this should not happen given that the subscriber was provisioned before
982 log.error("Subscriber information not found in sadis for port {}",
983 portWithName(port));
984 return;
985 }
986 DiscoveredSubscriber sub =
987 new DiscoveredSubscriber(device, port,
988 DiscoveredSubscriber.Status.REMOVED, true, si);
989 addSubscriberToQueue(sub);
990 }
991 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000992 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000993 }
Gamze Abakada282b42019-03-11 13:16:48 +0000994
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700995 /**
996 * This method is invoked on app activation in order to deal
997 * with devices and ports that are already existing in the system
998 * and thus won't trigger an event.
999 * It is also needed on instance reboot and device reconnect
1000 */
1001 protected void handleExistingPorts() {
1002 Iterable<DeviceId> devices = getConnectedOlts();
1003 for (DeviceId deviceId : devices) {
1004 log.info("Handling existing OLT Ports for device {}", deviceId);
1005 if (oltDeviceService.isLocalLeader(deviceId)) {
1006 List<Port> ports = deviceService.getPorts(deviceId);
1007 for (Port p : ports) {
1008 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
1009 continue;
1010 }
1011 Device device = deviceService.getDevice(deviceId);
1012 deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, device, p);
1013 }
1014 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001015 }
1016 }
1017 }
Andrea Campanella61650a12022-01-24 18:09:44 -08001018
1019 protected final class ThreadPoolQueue extends ArrayBlockingQueue<Runnable> {
1020
1021 public ThreadPoolQueue(int capacity) {
1022 super(capacity);
1023 }
1024
1025 @Override
1026 public boolean offer(Runnable runnable) {
1027 if (runnable == null) {
1028 return false;
1029 }
1030 try {
1031 put(runnable);
1032 } catch (InterruptedException e1) {
1033 Thread.currentThread().interrupt();
1034 return false;
1035 }
1036 return true;
1037 }
1038
1039 }
Hardik Windlass395ff372019-06-13 05:16:00 +00001040}