blob: f6537e5c70f56dfc4d17ea6fd329186e4d7c3036 [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Joey Armstrong7f6d6d22023-01-09 17:09:50 -05002 * Copyright 2021-2023 Open Networking Foundation (ONF) and the ONF Contributors
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
alshabibf0e7e702015-05-30 18:22:36 -070018import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080019import org.onlab.util.KryoNamespace;
Gamze Abaka1f62dd92020-05-07 08:58:13 +000020import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080021import org.onosproject.cluster.ClusterService;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020022import org.onosproject.cluster.LeadershipService;
alshabibf0e7e702015-05-30 18:22:36 -070023import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080025import org.onosproject.event.AbstractListenerManager;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020026import org.onosproject.mastership.MastershipService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010027import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070028import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010029import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070030import org.onosproject.net.DeviceId;
alshabibdec2e252016-01-15 12:20:25 -080031import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070032import org.onosproject.net.PortNumber;
33import org.onosproject.net.device.DeviceEvent;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080036import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080037import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070039import org.opencord.olt.AccessDeviceEvent;
40import org.opencord.olt.AccessDeviceListener;
Gustavo Silva29fb20e2022-05-26 09:59:54 -030041import org.opencord.olt.AccessDevicePort;
alshabib36a4d732016-06-01 16:03:59 -070042import org.opencord.olt.AccessDeviceService;
Gustavo Silva29fb20e2022-05-26 09:59:54 -030043import org.opencord.olt.DiscoveredSubscriber;
44import org.opencord.olt.OltDeviceServiceInterface;
45import org.opencord.olt.OltFlowServiceInterface;
46import org.opencord.olt.OltMeterServiceInterface;
47import org.opencord.olt.ServiceKey;
48import org.opencord.olt.FlowOperation;
Gamze Abaka641fc072018-09-04 09:16:27 +000049import org.opencord.sadis.BaseInformationService;
50import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010051import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000052import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080053import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070054import org.osgi.service.component.annotations.Activate;
55import org.osgi.service.component.annotations.Component;
56import org.osgi.service.component.annotations.Deactivate;
57import org.osgi.service.component.annotations.Modified;
58import org.osgi.service.component.annotations.Reference;
59import org.osgi.service.component.annotations.ReferenceCardinality;
Ilayda Ozdemir90a93622021-02-25 09:40:58 +000060import org.osgi.service.component.annotations.ReferencePolicy;
alshabibf0e7e702015-05-30 18:22:36 -070061import org.slf4j.Logger;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070062import org.slf4j.LoggerFactory;
alshabibf0e7e702015-05-30 18:22:36 -070063
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010064import java.util.ArrayList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010065import java.util.Dictionary;
Andrea Campanella61650a12022-01-24 18:09:44 -080066import java.util.HashSet;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070067import java.util.Iterator;
68import java.util.LinkedList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010069import java.util.List;
70import java.util.Map;
Gustavo Silva89e2f042022-08-01 09:58:04 -030071import java.util.Optional;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010072import java.util.Properties;
73import java.util.Set;
Andrea Campanella61650a12022-01-24 18:09:44 -080074import java.util.concurrent.ArrayBlockingQueue;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010075import java.util.concurrent.ExecutorService;
76import java.util.concurrent.Executors;
77import java.util.concurrent.LinkedBlockingQueue;
78import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella61650a12022-01-24 18:09:44 -080079import java.util.concurrent.ThreadPoolExecutor;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010080import java.util.concurrent.TimeUnit;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070081import java.util.concurrent.locks.Lock;
82import java.util.concurrent.locks.ReentrantReadWriteLock;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010083
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010084import static com.google.common.base.Strings.isNullOrEmpty;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010085import static org.onlab.util.Tools.get;
86import static org.onlab.util.Tools.groupedThreads;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070087import static org.opencord.olt.impl.OltUtils.getPortName;
88import static org.opencord.olt.impl.OltUtils.portWithName;
Gustavo Silva89e2f042022-08-01 09:58:04 -030089import static org.opencord.olt.impl.OltUtils.getProgrammedSubscriber;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010090import static org.opencord.olt.impl.OsgiPropertyConstants.*;
alshabibf0e7e702015-05-30 18:22:36 -070091
92/**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070093 * OLT Application.
alshabibf0e7e702015-05-30 18:22:36 -070094 */
Carmelo Casconeca931162019-07-15 18:22:24 -070095@Component(immediate = true,
96 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -070097 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +000098 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070099 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
Andrea Campanella61650a12022-01-24 18:09:44 -0800100 FLOW_EXECUTOR_QUEUE_SIZE + ":Integer=" + FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700101 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
102 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
Carmelo Casconeca931162019-07-15 18:22:24 -0700103 })
alshabib8e4fd2f2016-01-12 15:55:53 -0800104public class Olt
105 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
106 implements AccessDeviceService {
alshabibf0e7e702015-05-30 18:22:36 -0700107
Carmelo Casconeca931162019-07-15 18:22:24 -0700108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700109 protected DeviceService deviceService;
110
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected ComponentConfigService cfgService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100113
Carmelo Casconeca931162019-07-15 18:22:24 -0700114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700115 protected MastershipService mastershipService;
alshabibf0e7e702015-05-30 18:22:36 -0700116
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700118 protected ClusterService clusterService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected LeadershipService leadershipService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100122
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000123 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
124 bind = "bindSadisService",
125 unbind = "unbindSadisService",
126 policy = ReferencePolicy.DYNAMIC)
127 protected volatile SadisService sadisService;
Gamze Abaka641fc072018-09-04 09:16:27 +0000128
Carmelo Casconeca931162019-07-15 18:22:24 -0700129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700130 protected OltDeviceServiceInterface oltDeviceService;
alshabibe0559672016-02-21 14:49:51 -0800131
Carmelo Casconeca931162019-07-15 18:22:24 -0700132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700133 protected OltFlowServiceInterface oltFlowService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
136 protected OltMeterServiceInterface oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000137
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
139 protected StorageService storageService;
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700142 protected CoreService coreService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800143
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700144 protected ApplicationId appId;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200145
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700146 private static final String ONOS_OLT_SERVICE = "onos/olt-service";
Tunahan Sezena07fe962021-02-24 08:24:24 +0000147
Carmelo Casconeca931162019-07-15 18:22:24 -0700148 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800149 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700150 **/
151 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000152
Carmelo Casconeca931162019-07-15 18:22:24 -0700153 /**
Gamze Abaka51a34e82020-05-08 13:03:14 +0000154 * Default multicast service name.
Carmelo Casconeca931162019-07-15 18:22:24 -0700155 **/
Gamze Abaka51a34e82020-05-08 13:03:14 +0000156 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Gamze Abaka33feef52019-02-27 08:16:47 +0000157
Saurav Das2d3777a2020-08-07 18:48:51 -0700158 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700159 * Number of threads used to process flows.
Saurav Das2d3777a2020-08-07 18:48:51 -0700160 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700161 protected int flowProcessingThreads = FLOW_PROCESSING_THREADS_DEFAULT;
Saurav Das2d3777a2020-08-07 18:48:51 -0700162
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700163 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700164 * Number of threads used to process flows.
165 **/
Andrea Campanella61650a12022-01-24 18:09:44 -0800166 protected int flowExecutorQueueSize = FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT;
167
168 /**
169 * Number of threads used to process flows.
170 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700171 protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
172
173 /**
174 * Delay in ms to put an event back in the queue, used to avoid retrying things to often if conditions are not met.
175 **/
176 protected int requeueDelay = REQUEUE_DELAY_DEFAULT;
177
178 private final Logger log = LoggerFactory.getLogger(getClass());
179
180 /**
181 * A queue to asynchronously process events.
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700182 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700183 protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
alshabibf0e7e702015-05-30 18:22:36 -0700184
Gamze Abaka641fc072018-09-04 09:16:27 +0000185 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
alshabibf0e7e702015-05-30 18:22:36 -0700186
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700187 /**
188 * Listener for OLT devices events.
189 */
190 protected OltDeviceListener deviceListener = new OltDeviceListener();
191 protected ScheduledExecutorService discoveredSubscriberExecutor =
192 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
193 "discovered-cp-%d", log));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100194
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700195 protected ScheduledExecutorService queueExecutor =
196 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
197 "discovered-cp-restore-%d", log));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700198
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700199 /**
200 * Executor used to defer flow provisioning to a different thread pool.
201 */
202 protected ExecutorService flowsExecutor;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800203
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700204 /**
205 * Executor used to defer subscriber handling from API call to a different thread pool.
206 */
207 protected ExecutorService subscriberExecutor;
208
209 private static final String APP_NAME = "org.opencord.olt";
210
211 private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
212 private final Lock queueWriteLock = queueLock.writeLock();
213 private final Lock queueReadLock = queueLock.readLock();
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700214
alshabibf0e7e702015-05-30 18:22:36 -0700215 @Activate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700216 protected void activate(ComponentContext context) {
217 cfgService.registerProperties(getClass());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700218
alshabibe0559672016-02-21 14:49:51 -0800219 modified(context);
Saurav Das62ad75e2019-03-05 12:22:22 -0800220
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700221 appId = coreService.registerApplication(APP_NAME);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800222 KryoNamespace serializer = KryoNamespace.newBuilder()
223 .register(KryoNamespaces.API)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700224 .register(ConnectPoint.class)
225 .register(DiscoveredSubscriber.class)
226 .register(DiscoveredSubscriber.Status.class)
227 .register(SubscriberAndDeviceInformation.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800228 .register(UniTagInformation.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000229 .register(LinkedBlockingQueue.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800230 .build();
231
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700232 eventsQueues = storageService.<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder()
233 .withName("volt-subscriber-queues")
234 .withApplicationId(appId)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800235 .withSerializer(Serializer.using(serializer))
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700236 .build().asJavaMap();
alshabib4ceaed32016-03-03 18:00:58 -0800237
alshabibba357492016-01-27 13:49:46 -0800238 deviceService.addListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700239
240 discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
241 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
242 log.info("Started");
243
244 deviceListener.handleExistingPorts();
alshabibf0e7e702015-05-30 18:22:36 -0700245 }
246
247 @Deactivate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700248 protected void deactivate(ComponentContext context) {
249 cfgService.unregisterProperties(getClass(), false);
250 discoveredSubscriberExecutor.shutdown();
Andrea Campanellaeaf23952021-12-30 15:58:54 +0100251 deviceService.removeListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700252 flowsExecutor.shutdown();
253 subscriberExecutor.shutdown();
254 deviceListener.deactivate();
alshabibf0e7e702015-05-30 18:22:36 -0700255 log.info("Stopped");
256 }
257
alshabibe0559672016-02-21 14:49:51 -0800258 @Modified
259 public void modified(ComponentContext context) {
260 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700261 if (context != null) {
Andrea Campanella971d5b92020-05-07 11:20:43 +0200262 String bpId = get(properties, DEFAULT_BP_ID);
263 defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
Gamze Abakaad329652018-12-20 10:12:21 +0000264
Andrea Campanella971d5b92020-05-07 11:20:43 +0200265 String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
266 multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000267
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700268 String flowThreads = get(properties, FLOW_PROCESSING_THREADS);
269 int oldFlowProcessingThreads = flowProcessingThreads;
270 flowProcessingThreads = isNullOrEmpty(flowThreads) ?
271 oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
Saurav Das2d3777a2020-08-07 18:48:51 -0700272
Andrea Campanella61650a12022-01-24 18:09:44 -0800273 String executorQueueSize = get(properties, FLOW_EXECUTOR_QUEUE_SIZE);
274 int oldExecutorQueueSize = flowExecutorQueueSize;
275 flowExecutorQueueSize = isNullOrEmpty(executorQueueSize) ?
276 oldExecutorQueueSize : Integer.parseInt(executorQueueSize.trim());
277
278 if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads
279 || oldExecutorQueueSize != flowExecutorQueueSize) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700280 if (flowsExecutor != null) {
281 flowsExecutor.shutdown();
282 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800283
284 flowsExecutor = new ThreadPoolExecutor(0, flowProcessingThreads, 30,
285 TimeUnit.SECONDS, new ThreadPoolQueue(flowExecutorQueueSize),
286 new ThreadPoolExecutor.DiscardPolicy());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700287 }
Gamze Abaka33feef52019-02-27 08:16:47 +0000288
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700289 String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
290 int oldSubscriberProcessingThreads = subscriberProcessingThreads;
291 subscriberProcessingThreads = isNullOrEmpty(subscriberThreads) ?
292 oldSubscriberProcessingThreads : Integer.parseInt(subscriberThreads.trim());
293
294 if (subscriberExecutor == null || oldSubscriberProcessingThreads != subscriberProcessingThreads) {
295 if (subscriberExecutor != null) {
296 subscriberExecutor.shutdown();
297 }
298 subscriberExecutor = Executors.newFixedThreadPool(subscriberProcessingThreads,
299 groupedThreads(ONOS_OLT_SERVICE,
300 "subscriber-installer-%d"));
301 }
302
303 String queueDelay = get(properties, REQUEUE_DELAY);
304 requeueDelay = isNullOrEmpty(queueDelay) ?
305 REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
alshabibe0559672016-02-21 14:49:51 -0800306 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800307 log.info("Modified. Values = {}: {}, {}:{}, {}:{}," +
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700308 "{}:{}, {}:{}, {}:{}",
309 DEFAULT_BP_ID, defaultBpId,
310 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
311 FLOW_PROCESSING_THREADS, flowProcessingThreads,
Andrea Campanella61650a12022-01-24 18:09:44 -0800312 FLOW_EXECUTOR_QUEUE_SIZE, flowExecutorQueueSize,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700313 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
314 REQUEUE_DELAY, requeueDelay);
alshabibe0559672016-02-21 14:49:51 -0800315 }
316
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000317
alshabib32232c82016-02-25 17:57:24 -0500318 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700319 public boolean provisionSubscriber(ConnectPoint cp) {
320 subscriberExecutor.submit(() -> {
321 Device device = deviceService.getDevice(cp.deviceId());
322 Port port = deviceService.getPort(device.id(), cp.port());
323 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Hardik Windlass395ff372019-06-13 05:16:00 +0000324
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700325 if (oltDeviceService.isNniPort(device, port.number())) {
326 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
327 return false;
Saurav Das026650f2020-09-21 18:56:35 -0700328 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700329
330 log.info("Provisioning subscriber on {}", accessDevicePort);
331
332 if (oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
333 log.error("Subscriber on {} is already provisioned", accessDevicePort);
334 return false;
335 }
336
337 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
338 if (si == null) {
339 log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
340 return false;
341 }
342 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
343 DiscoveredSubscriber.Status.ADDED, true, si);
344
345 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
346 // regardless of the flow status
347 si.uniTagList().forEach(uti -> {
348 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
349 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
350 });
351
352 addSubscriberToQueue(sub);
353 return true;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100354 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700355 //NOTE this only means we have taken the request in, nothing more.
Amit Ghosh31939522018-08-16 13:28:21 +0100356 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800357 }
358
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000359 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700360 public boolean removeSubscriber(ConnectPoint cp) {
361 subscriberExecutor.submit(() -> {
362 Device device = deviceService.getDevice(DeviceId.deviceId(cp.deviceId().toString()));
363 Port port = deviceService.getPort(device.id(), cp.port());
364 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000365
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700366 if (oltDeviceService.isNniPort(device, port.number())) {
367 log.warn("will not un-provision a subscriber on the NNI {}",
368 accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100369 return false;
370 }
371
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700372 log.info("Un-provisioning subscriber on {}", accessDevicePort);
373
374 if (!oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
375 log.error("Subscriber on {} is not provisioned", accessDevicePort);
376 return false;
377 }
Matteo Scandolo27629042022-06-29 01:35:59 +0000378
Gustavo Silva89e2f042022-08-01 09:58:04 -0300379 //First check if the subscriber is in the programmed subscriber map, if not fallback to sadis
380 SubscriberAndDeviceInformation si = getProgrammedSubscriber(oltFlowService, accessDevicePort);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700381 if (si == null) {
Gustavo Silva89e2f042022-08-01 09:58:04 -0300382 si = subsService.get(getPortName(port));
383 }
384 // if it's still null we can't proceed
385 if (si == null) {
386 log.error("Subscriber information not found in programmed subscribers or sadis for port {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700387 accessDevicePort);
388 // NOTE that we are returning true so that the subscriber is removed from the queue
389 // and we can move on provisioning others
390 return false;
391 }
Gustavo Silva89e2f042022-08-01 09:58:04 -0300392
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700393 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
Andrea Campanella34ce61a2022-04-28 18:55:46 +0200394 DiscoveredSubscriber.Status.ADMIN_REMOVED, true, si);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700395
396 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
397 // regardless of the flow status
398 si.uniTagList().forEach(uti -> {
399 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
400 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000401 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700402
403 addSubscriberToQueue(sub);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100404 return true;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700405 });
406 //NOTE this only means we have taken the request in, nothing more.
407 return true;
408 }
409
410 @Override
411 public boolean provisionSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
412 log.debug("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}",
413 cp, cTag, sTag, tpId);
414 Device device = deviceService.getDevice(cp.deviceId());
415 Port port = deviceService.getPort(device.id(), cp.port());
416 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
417
418 if (oltDeviceService.isNniPort(device, port.number())) {
419 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100420 return false;
421 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100422
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700423 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
Gustavo Silva89e2f042022-08-01 09:58:04 -0300424 UniTagInformation specificService = getUniTagInformation(port, cTag, sTag, tpId);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700425 if (specificService == null) {
426 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
427 "stag {}, tpId {}", cp, cTag, sTag, tpId);
Amit Ghosh31939522018-08-16 13:28:21 +0100428 return false;
429 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700430 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
431 uniTagInformationList.add(specificService);
432 si.setUniTagList(uniTagInformationList);
433 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
434 DiscoveredSubscriber.Status.ADDED, true, si);
Amit Ghosh31939522018-08-16 13:28:21 +0100435
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700436 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
437 // regardless of the flow status
438 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
439 if (oltFlowService.isSubscriberServiceProvisioned(sk)) {
440 log.error("Subscriber on {} is already provisioned", sk);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100441 return false;
442 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700443 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
444
445 addSubscriberToQueue(sub);
446 return true;
Amit Ghosh31939522018-08-16 13:28:21 +0100447 }
448
449 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700450 public boolean removeSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
451 log.debug("Un-provisioning subscriber on {} with cTag {}, stag {}, tpId {}",
452 cp, cTag, sTag, tpId);
453 Device device = deviceService.getDevice(cp.deviceId());
454 Port port = deviceService.getPort(device.id(), cp.port());
455 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
456
457 if (oltDeviceService.isNniPort(device, port.number())) {
458 log.warn("will not un-provision a subscriber on the NNI {}",
459 accessDevicePort);
460 return false;
461 }
462
463 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
Gustavo Silva89e2f042022-08-01 09:58:04 -0300464 UniTagInformation specificService = getUniTagInformation(port, cTag, sTag, tpId);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700465 if (specificService == null) {
466 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
467 "stag {}, tpId {}", cp, cTag, sTag, tpId);
468 return false;
469 }
470 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
471 uniTagInformationList.add(specificService);
472 si.setUniTagList(uniTagInformationList);
473 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
Andrea Campanella34ce61a2022-04-28 18:55:46 +0200474 DiscoveredSubscriber.Status.ADMIN_REMOVED, true, si);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700475
476 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
477 // regardless of the flow status
478 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
479 if (!oltFlowService.isSubscriberServiceProvisioned(sk)) {
480 log.error("Subscriber on {} is not provisioned", sk);
481 return false;
482 }
483 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
484
485 addSubscriberToQueue(sub);
486 return true;
Saurav Das82b8e6d2018-10-04 15:25:12 -0700487 }
488
489 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700490 public List<DeviceId> getConnectedOlts() {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100491 List<DeviceId> olts = new ArrayList<>();
492 Iterable<Device> devices = deviceService.getDevices();
493 for (Device d : devices) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700494 if (oltDeviceService.isOlt(d)) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700495 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100496 olts.add(d.id());
497 }
498 }
499 return olts;
alshabibe0559672016-02-21 14:49:51 -0800500 }
501
Amit Ghosh31939522018-08-16 13:28:21 +0100502 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700503 * Finds the connect-point to which a subscriber is connected.
Amit Ghosh31939522018-08-16 13:28:21 +0100504 *
505 * @param id The id of the subscriber, this is the same ID as in Sadis
506 * @return Subscribers ConnectPoint if found else null
507 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700508 @Override
509 public ConnectPoint findSubscriberConnectPoint(String id) {
Amit Ghosh31939522018-08-16 13:28:21 +0100510
511 Iterable<Device> devices = deviceService.getDevices();
512 for (Device d : devices) {
513 for (Port p : deviceService.getPorts(d.id())) {
514 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
515 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700516 log.debug("Found on {}", portWithName(p));
Amit Ghosh31939522018-08-16 13:28:21 +0100517 return new ConnectPoint(d.id(), p.number());
518 }
519 }
520 }
521 return null;
522 }
523
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700524 protected void processDiscoveredSubscribers() {
Gamze Abaka641fc072018-09-04 09:16:27 +0000525
Andrea Campanella61650a12022-01-24 18:09:44 -0800526 log.info("Started processDiscoveredSubscribers loop");
527 while (true) {
528 Set<ConnectPoint> discoveredCps;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700529 try {
530 queueReadLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800531 discoveredCps = new HashSet<>(eventsQueues.keySet());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700532 } catch (Exception e) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800533 log.error("Cannot read keys from queue map", e);
534 continue;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700535 } finally {
536 queueReadLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000537 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000538
Andrea Campanella61650a12022-01-24 18:09:44 -0800539 discoveredCps.forEach(cp -> {
540 LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
541
542 try {
543 queueReadLock.lock();
544 eventsQueue = eventsQueues.get(cp);
545 } catch (Exception e) {
546 log.error("Cannot get key from queue map", e);
547 return;
548 } finally {
549 queueReadLock.unlock();
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100550 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000551
Andrea Campanella61650a12022-01-24 18:09:44 -0800552 if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
553 // if we're not local leader for this device, ignore this queue
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700554 if (log.isTraceEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800555 log.trace("Ignoring queue on CP {} as not master of the device", cp);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700556 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800557 return;
558 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700559
Andrea Campanella61650a12022-01-24 18:09:44 -0800560 try {
561 flowsExecutor.execute(() -> {
562 if (!eventsQueue.isEmpty()) {
563 // we do not remove the event from the queue until it has been processed
564 // in that way we guarantee that events are processed in order
565 DiscoveredSubscriber sub = eventsQueue.peek();
566 if (sub == null) {
567 // the queue is empty
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700568 return;
569 }
570
Andrea Campanella61650a12022-01-24 18:09:44 -0800571 if (log.isTraceEnabled()) {
Andrea Campanellaccb32862022-02-17 16:29:10 +0100572 log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
Andrea Campanella61650a12022-01-24 18:09:44 -0800573 sub, portWithName(sub.port), sub.status, sub.hasSubscriber);
574 }
575
576 if (sub.hasSubscriber) {
577 // this is a provision subscriber call
578 if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
579 removeSubscriberFromQueue(sub);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700580 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800581 } else {
582 // this is a port event (ENABLED/DISABLED)
583 // means no subscriber was provisioned on that port
584
585 if (!deviceService.isAvailable(sub.device.id()) ||
586 deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
587 // If the device is not connected or the port is not available do nothing
588 // This can happen when we disable and then immediately delete the device,
589 // the queue is populated but the meters and flows are already gone
590 // thus there is nothing left to do
591 return;
592 }
593
594 if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
595 if (log.isTraceEnabled()) {
596 log.trace("Processing of port {} completed",
597 portWithName(sub.port));
598 }
599 removeSubscriberFromQueue(sub);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100600 } else {
601 log.debug("Not handling basic port flows " +
602 "for {}, leaving in the queue",
603 portWithName(sub.port));
Andrea Campanella61650a12022-01-24 18:09:44 -0800604 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700605 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000606 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800607 });
608 } catch (Exception e) {
609 log.error("Exception processing subscriber", e);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000610 }
611 });
Tunahan Sezena07fe962021-02-24 08:24:24 +0000612
Andrea Campanella61650a12022-01-24 18:09:44 -0800613 try {
614 TimeUnit.MILLISECONDS.sleep(requeueDelay);
615 } catch (InterruptedException e) {
616 log.debug("Interrupted while waiting to requeue", e);
617 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000618 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000619 }
620
Andrea Campanella61650a12022-01-24 18:09:44 -0800621
Tunahan Sezena07fe962021-02-24 08:24:24 +0000622 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000623 * Checks the subscriber uni tag list and find the uni tag information.
624 * using the pon c tag, pon s tag and the technology profile id
625 * May return Optional<null>
626 *
Gustavo Silva89e2f042022-08-01 09:58:04 -0300627 * @param port port of the subscriber
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000628 * @param innerVlan pon c tag
629 * @param outerVlan pon s tag
630 * @param tpId the technology profile id
631 * @return the found uni tag information
632 */
Gustavo Silva89e2f042022-08-01 09:58:04 -0300633 private UniTagInformation getUniTagInformation(Port port, VlanId innerVlan,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700634 VlanId outerVlan, int tpId) {
Gustavo Silva89e2f042022-08-01 09:58:04 -0300635 String portName = portWithName(port);
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000636 log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700637 portName, innerVlan, outerVlan, tpId);
Gustavo Silva89e2f042022-08-01 09:58:04 -0300638 //First check if the subscriber is in the programmed subscriber map, if not fallback to sadis
639 //there should be only one sub service with these characteristics.
640 Optional<Map.Entry<ServiceKey, UniTagInformation>> service = oltFlowService.getProgrammedSubscribers()
641 .entrySet().stream()
642 .filter(entry -> entry.getKey().getPort().equals(new AccessDevicePort(port))
643 && entry.getValue().getPonSTag().equals(outerVlan)
644 && entry.getValue().getPonCTag().equals(innerVlan))
645 .findFirst();
646 if (service.isPresent()) {
647 log.debug("Subscriber was programmed with uni tag info for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
648 portName, innerVlan, outerVlan, tpId);
649 return service.get().getValue();
650 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700651 SubscriberAndDeviceInformation subInfo = subsService.get(portName);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000652 if (subInfo == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700653 log.warn("Subscriber information doesn't exist for {}", portName);
654 return null;
Gamze Abakaf59c0912019-04-19 08:24:28 +0000655 }
656
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000657 List<UniTagInformation> uniTagList = subInfo.uniTagList();
658 if (uniTagList == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700659 log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), portName);
660 return null;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000661 }
Matteo Scandolo27629042022-06-29 01:35:59 +0000662
Gustavo Silva89e2f042022-08-01 09:58:04 -0300663 UniTagInformation uniTagInformation = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
664 if (uniTagInformation == null) {
Matteo Scandolo27629042022-06-29 01:35:59 +0000665
Harsh Awasthic1e4bf52022-02-09 14:14:14 +0530666 // Try again after invalidating cache for the particular port name.
667 subsService.invalidateId(portName);
668 subInfo = subsService.get(portName);
Gustavo Silva89e2f042022-08-01 09:58:04 -0300669 uniTagInformation = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000670 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000671
Gustavo Silva89e2f042022-08-01 09:58:04 -0300672 if (uniTagInformation == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700673 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700674 innerVlan, outerVlan, tpId, portName);
Saurav Das82b8e6d2018-10-04 15:25:12 -0700675 return null;
676 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100677
Gustavo Silva89e2f042022-08-01 09:58:04 -0300678 return uniTagInformation;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100679 }
680
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700681 protected void bindSadisService(SadisService service) {
682 sadisService = service;
683 subsService = sadisService.getSubscriberInfoService();
684
685 log.info("Sadis-service binds to onos.");
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100686 }
687
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700688 protected void unbindSadisService(SadisService service) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700689 deviceListener = null;
690 sadisService = null;
691 subsService = null;
692 log.info("Sadis-service unbinds from onos.");
Jonathan Hart1d34c8b2018-05-05 15:37:28 -0700693 }
694
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700695 protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700696 try {
Andrea Campanella61650a12022-01-24 18:09:44 -0800697 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800698
Andrea Campanella61650a12022-01-24 18:09:44 -0800699 try {
700 queueWriteLock.lock();
701 eventsQueues.compute(cp, (subcp, queue) -> {
702 queue = queue == null ? new LinkedBlockingQueue<>() : queue;
703 log.info("Adding subscriber {} to queue: {} with existing {}",
704 sub, portWithName(sub.port), queue);
705 queue.add(sub);
706 return queue;
707 });
708 } catch (UnsupportedOperationException | ClassCastException |
709 NullPointerException | IllegalArgumentException e) {
710 log.error("Cannot add subscriber {} to queue: {}", portWithName(sub.port), e.getMessage());
711 } finally {
712 queueWriteLock.unlock();
713 }
714 } catch (Exception e) {
715 log.error("Can't add {} to queue", sub, e);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800716 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800717 }
718
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700719 protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
720 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700721 try {
722 queueWriteLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800723 eventsQueues.compute(cp, (subcp, queue) -> {
724 if (log.isTraceEnabled()) {
725 log.trace("Removing subscriber {} from queue : {} " +
726 "with existing {}", sub,
727 portWithName(sub.port), queue);
728 }
729 if (queue == null) {
730 log.warn("Cannot find queue for connectPoint {}", cp);
731 return queue;
732 }
733 boolean removed = queue.remove(sub);
734 if (!removed) {
735 log.warn("Subscriber {} has not been removed from queue, " +
736 "is it still there? {}", sub, queue);
737 return queue;
738 } else {
739 log.debug("Subscriber {} has been removed from the queue {}",
740 sub, queue);
741 }
742
743 return queue;
744 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700745 } catch (UnsupportedOperationException | ClassCastException |
746 NullPointerException | IllegalArgumentException e) {
747 log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
748 } finally {
749 queueWriteLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000750 }
751 }
752
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700753 protected class OltDeviceListener
754 implements DeviceListener {
755
756 private final Logger log = LoggerFactory.getLogger(getClass());
757 protected ExecutorService eventExecutor;
758
759 /**
760 * Builds the listener with all the proper services and information needed.
761 */
762 public OltDeviceListener() {
763 this.eventExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
764 groupedThreads("onos/olt-device-listener-event", "event-%d", log));
765 }
766
767 public void deactivate() {
768 this.eventExecutor.shutdown();
769 }
Saurav Dasa9d5f442019-03-06 19:32:48 -0800770
alshabibf0e7e702015-05-30 18:22:36 -0700771 @Override
772 public void event(DeviceEvent event) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800773 if (log.isTraceEnabled()) {
774 log.trace("OltListener receives event {} for: {}/{}", event.type(), event.subject().id(),
775 event.port() != null ? event.port().number() : null);
776 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700777 eventExecutor.execute(() -> {
Andrea Campanella61650a12022-01-24 18:09:44 -0800778 if (log.isTraceEnabled()) {
779 log.trace("OltListener Executor receives event {} for: {}/{}",
780 event.type(), event.subject().id(),
781 event.port() != null ? event.port().number() : null);
782 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700783 boolean isOlt = oltDeviceService.isOlt(event.subject());
784 DeviceId deviceId = event.subject().id();
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700785 switch (event.type()) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700786 case PORT_STATS_UPDATED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700787 case DEVICE_ADDED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700788 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700789 case PORT_ADDED:
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700790 case PORT_REMOVED:
791 if (!isOlt) {
792 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
793 return;
794 }
795 if (!oltDeviceService.isLocalLeader(deviceId)) {
796 log.trace("Device {} is not local to this node", deviceId);
797 return;
798 }
799 // port added, updated and removed are treated in the same way as we only care whether the port
800 // is enabled or not
801 handleOltPort(event.type(), event.subject(), event.port());
802 return;
Andrea Campanellacc6dc7e2022-03-22 10:38:43 +0100803 case PORT_UPDATED:
804 if (!isOlt) {
805 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
806 return;
807 }
808 // port updated are handled only when the device is available, makes not sense otherwise.
809 // this also solves an issue with port events and device disconnection events handled
810 // in sparse order causing failures in the ofagent disconnect test
811 // (see https://jira.opencord.org/browse/VOL-4669)
812 if (!deviceService.isAvailable(deviceId)) {
813 log.debug("Ignoring port event {} on {} as it is disconnected", event, deviceId);
814 return;
815 }
816 if (!oltDeviceService.isLocalLeader(deviceId)) {
817 log.trace("Device {} is not local to this node", deviceId);
818 return;
819 }
820 handleOltPort(event.type(), event.subject(), event.port());
821 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700822 case DEVICE_AVAILABILITY_CHANGED:
823 if (!isOlt) {
824 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
825 return;
826 }
827 if (deviceService.isAvailable(deviceId)) {
828 if (!oltDeviceService.isLocalLeader(deviceId)) {
829 if (log.isTraceEnabled()) {
830 log.trace("Device {} is not local to this node, not handling available device",
831 deviceId);
832 }
833 } else {
834 log.info("Handling available device: {}", deviceId);
835 handleExistingPorts();
836 }
837 } else if (!deviceService.isAvailable(deviceId) && deviceService.getPorts(deviceId).isEmpty()) {
838 // NOTE that upon disconnection there is no mastership on the device,
839 // and we should anyway clear the local cache of the flows/meters across instances.
840 // We're only clearing the device if there are no available ports,
841 // otherwise we assume it's a temporary disconnection
842 log.info("Device {} availability changed to false and ports are empty, " +
843 "purging meters and flows", deviceId);
844 //NOTE all the instances will call these methods
845 oltFlowService.purgeDeviceFlows(deviceId);
846 oltMeterService.purgeDeviceMeters(deviceId);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100847 // cpStatus is a distributed map, thus only master will update it.
848 if (oltDeviceService.isLocalLeader(deviceId)) {
849 log.debug("Master, clearing cp status for {}", deviceId);
850 clearQueueForDevice(deviceId);
851 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700852 } else {
853 log.info("Device {} availability changed to false, but ports are still available, " +
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800854 "assuming temporary disconnection.",
855 deviceId);
856 if (log.isTraceEnabled()) {
857 log.trace("Available ports: {}", deviceService.getPorts(deviceId));
858 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700859 }
860 return;
861 case DEVICE_REMOVED:
862 if (!isOlt) {
863 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
864 return;
865 }
866 log.info("Device {} Removed, purging meters and flows", deviceId);
867 oltFlowService.purgeDeviceFlows(deviceId);
868 oltMeterService.purgeDeviceMeters(deviceId);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100869 if (oltDeviceService.isLocalLeader(deviceId)) {
870 log.debug("Master, clearing cp status for {}", deviceId);
871 clearQueueForDevice(deviceId);
872 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700873 return;
874 default:
875 if (log.isTraceEnabled()) {
876 log.trace("Not handling event: {}, ", event);
877 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700878 }
879 });
alshabibf0e7e702015-05-30 18:22:36 -0700880 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000881
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700882 protected void clearQueueForDevice(DeviceId devId) {
883 try {
884 queueWriteLock.lock();
885 Iterator<Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>> iter =
886 eventsQueues.entrySet().iterator();
887 while (iter.hasNext()) {
888 Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
889 if (entry.getKey().deviceId().equals(devId)) {
890 eventsQueues.remove(entry.getKey());
Andrea Campanellaccb32862022-02-17 16:29:10 +0100891 log.debug("Removing key from queue {}", entry.getKey());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700892 }
893 }
894 } finally {
895 queueWriteLock.unlock();
Gamze Abaka838d8142019-02-21 07:06:55 +0000896 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000897 }
898
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700899 protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
900 log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
901 portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
Andrea Campanella61650a12022-01-24 18:09:44 -0800902 boolean isNni = oltDeviceService.isNniPort(device, port.number());
903
904 if (!isNni && type == DeviceEvent.Type.PORT_ADDED) {
905 log.debug("Ignoring PORT_ADD on UNI port {}", portWithName(port));
906 return;
907 }
Gustavo Silva89e2f042022-08-01 09:58:04 -0300908 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Andrea Campanella61650a12022-01-24 18:09:44 -0800909
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700910 if (port.isEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800911 if (isNni) {
Gustavo Silva29fb20e2022-05-26 09:59:54 -0300912 FlowOperation action = FlowOperation.ADD;
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800913 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
914 // In that case the flows are purged anyway, so there's no need to deal with them,
915 // it would actually be counter-productive as the openflow connection is severed and they won't
916 // be correctly processed
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700917 if (type == DeviceEvent.Type.PORT_REMOVED) {
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800918 log.debug("NNI port went down, " +
919 "ignoring event as flows will be removed in the generic device cleanup");
920 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700921 }
922 oltFlowService.handleNniFlows(device, port, action);
923 } else {
924 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
925 // NOTE if the subscriber was previously provisioned,
926 // then add it back to the queue to be re-provisioned
927 boolean provisionSubscriber = oltFlowService.
Gustavo Silva89e2f042022-08-01 09:58:04 -0300928 isSubscriberServiceProvisioned(accessDevicePort);
929
930 SubscriberAndDeviceInformation si;
931 DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
932 if (type == DeviceEvent.Type.PORT_REMOVED) {
933 status = DiscoveredSubscriber.Status.REMOVED;
934 si = getProgrammedSubscriber(oltFlowService, accessDevicePort);
935 } else {
936 si = subsService.get(getPortName(port));
937 }
938
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700939 if (si == null) {
940 //NOTE this should not happen given that the subscriber was provisioned before
941 log.error("Subscriber information not found in sadis for port {}",
942 portWithName(port));
943 return;
944 }
945
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700946 DiscoveredSubscriber sub =
947 new DiscoveredSubscriber(device, port,
948 status, provisionSubscriber, si);
949 addSubscriberToQueue(sub);
950 }
951 } else {
Andrea Campanella61650a12022-01-24 18:09:44 -0800952 if (isNni) {
Matteo Scandolob6981dc2021-12-02 16:31:44 -0800953 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
954 // In that case the flows are purged anyway, so there's no need to deal with them,
955 // it would actually be counter-productive as the openflow connection is severed and they won't
956 // be correctly processed
957 log.debug("NNI port went down, " +
958 "ignoring event as flows will be removed in the generic device cleanup");
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700959 } else {
960 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
961 // NOTE we are assuming that if a subscriber has default eapol
962 // it does not have subscriber flows
963 if (oltFlowService.hasDefaultEapol(port)) {
964 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
965 if (si == null) {
966 //NOTE this should not happen given that the subscriber was provisioned before
967 log.error("Subscriber information not found in sadis for port {}",
968 portWithName(port));
969 return;
970 }
971 DiscoveredSubscriber sub =
972 new DiscoveredSubscriber(device, port,
973 DiscoveredSubscriber.Status.REMOVED, false, si);
974
975 addSubscriberToQueue(sub);
976
977 } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
Gustavo Silva89e2f042022-08-01 09:58:04 -0300978 //First check if the subscriber is in the programmed subscriber map, if not fallback to sadis
979 SubscriberAndDeviceInformation si = getProgrammedSubscriber(oltFlowService, accessDevicePort);
980 if (si == null) {
981 si = subsService.get(getPortName(port));
982 }
983 // if it's still null we can't proceed
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700984 if (si == null) {
985 //NOTE this should not happen given that the subscriber was provisioned before
986 log.error("Subscriber information not found in sadis for port {}",
987 portWithName(port));
988 return;
989 }
990 DiscoveredSubscriber sub =
991 new DiscoveredSubscriber(device, port,
992 DiscoveredSubscriber.Status.REMOVED, true, si);
993 addSubscriberToQueue(sub);
994 }
995 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000996 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000997 }
Gamze Abakada282b42019-03-11 13:16:48 +0000998
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700999 /**
1000 * This method is invoked on app activation in order to deal
1001 * with devices and ports that are already existing in the system
1002 * and thus won't trigger an event.
1003 * It is also needed on instance reboot and device reconnect
1004 */
1005 protected void handleExistingPorts() {
1006 Iterable<DeviceId> devices = getConnectedOlts();
1007 for (DeviceId deviceId : devices) {
1008 log.info("Handling existing OLT Ports for device {}", deviceId);
1009 if (oltDeviceService.isLocalLeader(deviceId)) {
1010 List<Port> ports = deviceService.getPorts(deviceId);
1011 for (Port p : ports) {
1012 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
1013 continue;
1014 }
1015 Device device = deviceService.getDevice(deviceId);
1016 deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, device, p);
1017 }
1018 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001019 }
1020 }
1021 }
Andrea Campanella61650a12022-01-24 18:09:44 -08001022
1023 protected final class ThreadPoolQueue extends ArrayBlockingQueue<Runnable> {
1024
1025 public ThreadPoolQueue(int capacity) {
1026 super(capacity);
1027 }
1028
1029 @Override
1030 public boolean offer(Runnable runnable) {
1031 if (runnable == null) {
1032 return false;
1033 }
1034 try {
1035 put(runnable);
1036 } catch (InterruptedException e1) {
1037 Thread.currentThread().interrupt();
1038 return false;
1039 }
1040 return true;
1041 }
1042
1043 }
Hardik Windlass395ff372019-06-13 05:16:00 +00001044}