blob: 1f103d84b3053a7be0f638bf185461b75dc9316e [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07002 * Copyright 2021-present Open Networking Foundation
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
alshabibf0e7e702015-05-30 18:22:36 -070018import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080019import org.onlab.util.KryoNamespace;
Gamze Abaka1f62dd92020-05-07 08:58:13 +000020import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080021import org.onosproject.cluster.ClusterService;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020022import org.onosproject.cluster.LeadershipService;
alshabibf0e7e702015-05-30 18:22:36 -070023import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080025import org.onosproject.event.AbstractListenerManager;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020026import org.onosproject.mastership.MastershipService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010027import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070028import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010029import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070030import org.onosproject.net.DeviceId;
alshabibdec2e252016-01-15 12:20:25 -080031import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070032import org.onosproject.net.PortNumber;
33import org.onosproject.net.device.DeviceEvent;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080036import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080037import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070039import org.opencord.olt.AccessDeviceEvent;
40import org.opencord.olt.AccessDeviceListener;
41import org.opencord.olt.AccessDeviceService;
Gamze Abaka641fc072018-09-04 09:16:27 +000042import org.opencord.sadis.BaseInformationService;
43import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010044import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000045import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080046import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070047import org.osgi.service.component.annotations.Activate;
48import org.osgi.service.component.annotations.Component;
49import org.osgi.service.component.annotations.Deactivate;
50import org.osgi.service.component.annotations.Modified;
51import org.osgi.service.component.annotations.Reference;
52import org.osgi.service.component.annotations.ReferenceCardinality;
Ilayda Ozdemir90a93622021-02-25 09:40:58 +000053import org.osgi.service.component.annotations.ReferencePolicy;
alshabibf0e7e702015-05-30 18:22:36 -070054import org.slf4j.Logger;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070055import org.slf4j.LoggerFactory;
alshabibf0e7e702015-05-30 18:22:36 -070056
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010057import java.util.ArrayList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010058import java.util.Dictionary;
Andrea Campanella61650a12022-01-24 18:09:44 -080059import java.util.HashSet;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070060import java.util.Iterator;
61import java.util.LinkedList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010062import java.util.List;
63import java.util.Map;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010064import java.util.Properties;
65import java.util.Set;
Andrea Campanella61650a12022-01-24 18:09:44 -080066import java.util.concurrent.ArrayBlockingQueue;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010067import java.util.concurrent.ExecutorService;
68import java.util.concurrent.Executors;
69import java.util.concurrent.LinkedBlockingQueue;
70import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella61650a12022-01-24 18:09:44 -080071import java.util.concurrent.ThreadPoolExecutor;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010072import java.util.concurrent.TimeUnit;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070073import java.util.concurrent.locks.Lock;
74import java.util.concurrent.locks.ReentrantReadWriteLock;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010075
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010076import static com.google.common.base.Strings.isNullOrEmpty;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010077import static org.onlab.util.Tools.get;
78import static org.onlab.util.Tools.groupedThreads;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070079import static org.opencord.olt.impl.OltUtils.getPortName;
80import static org.opencord.olt.impl.OltUtils.portWithName;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010081import static org.opencord.olt.impl.OsgiPropertyConstants.*;
alshabibf0e7e702015-05-30 18:22:36 -070082
83/**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070084 * OLT Application.
alshabibf0e7e702015-05-30 18:22:36 -070085 */
Carmelo Casconeca931162019-07-15 18:22:24 -070086@Component(immediate = true,
87 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -070088 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +000089 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070090 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
Andrea Campanella61650a12022-01-24 18:09:44 -080091 FLOW_EXECUTOR_QUEUE_SIZE + ":Integer=" + FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070092 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
93 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
Carmelo Casconeca931162019-07-15 18:22:24 -070094 })
alshabib8e4fd2f2016-01-12 15:55:53 -080095public class Olt
96 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
97 implements AccessDeviceService {
alshabibf0e7e702015-05-30 18:22:36 -070098
Carmelo Casconeca931162019-07-15 18:22:24 -070099 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700100 protected DeviceService deviceService;
101
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected ComponentConfigService cfgService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100104
Carmelo Casconeca931162019-07-15 18:22:24 -0700105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700106 protected MastershipService mastershipService;
alshabibf0e7e702015-05-30 18:22:36 -0700107
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700109 protected ClusterService clusterService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected LeadershipService leadershipService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100113
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000114 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
115 bind = "bindSadisService",
116 unbind = "unbindSadisService",
117 policy = ReferencePolicy.DYNAMIC)
118 protected volatile SadisService sadisService;
Gamze Abaka641fc072018-09-04 09:16:27 +0000119
Carmelo Casconeca931162019-07-15 18:22:24 -0700120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700121 protected OltDeviceServiceInterface oltDeviceService;
alshabibe0559672016-02-21 14:49:51 -0800122
Carmelo Casconeca931162019-07-15 18:22:24 -0700123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700124 protected OltFlowServiceInterface oltFlowService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected OltMeterServiceInterface oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000128
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
130 protected StorageService storageService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700133 protected CoreService coreService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800134
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700135 protected ApplicationId appId;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200136
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700137 private static final String ONOS_OLT_SERVICE = "onos/olt-service";
Tunahan Sezena07fe962021-02-24 08:24:24 +0000138
Carmelo Casconeca931162019-07-15 18:22:24 -0700139 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800140 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700141 **/
142 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000143
Carmelo Casconeca931162019-07-15 18:22:24 -0700144 /**
Gamze Abaka51a34e82020-05-08 13:03:14 +0000145 * Default multicast service name.
Carmelo Casconeca931162019-07-15 18:22:24 -0700146 **/
Gamze Abaka51a34e82020-05-08 13:03:14 +0000147 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Gamze Abaka33feef52019-02-27 08:16:47 +0000148
Saurav Das2d3777a2020-08-07 18:48:51 -0700149 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700150 * Number of threads used to process flows.
Saurav Das2d3777a2020-08-07 18:48:51 -0700151 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700152 protected int flowProcessingThreads = FLOW_PROCESSING_THREADS_DEFAULT;
Saurav Das2d3777a2020-08-07 18:48:51 -0700153
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700154 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700155 * Number of threads used to process flows.
156 **/
Andrea Campanella61650a12022-01-24 18:09:44 -0800157 protected int flowExecutorQueueSize = FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT;
158
159 /**
160 * Number of threads used to process flows.
161 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700162 protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
163
164 /**
165 * Delay in ms to put an event back in the queue, used to avoid retrying things to often if conditions are not met.
166 **/
167 protected int requeueDelay = REQUEUE_DELAY_DEFAULT;
168
169 private final Logger log = LoggerFactory.getLogger(getClass());
170
171 /**
172 * A queue to asynchronously process events.
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700173 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700174 protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
alshabibf0e7e702015-05-30 18:22:36 -0700175
Gamze Abaka641fc072018-09-04 09:16:27 +0000176 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
alshabibf0e7e702015-05-30 18:22:36 -0700177
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700178 /**
179 * Listener for OLT devices events.
180 */
181 protected OltDeviceListener deviceListener = new OltDeviceListener();
182 protected ScheduledExecutorService discoveredSubscriberExecutor =
183 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
184 "discovered-cp-%d", log));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100185
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700186 protected ScheduledExecutorService queueExecutor =
187 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
188 "discovered-cp-restore-%d", log));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700189
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700190 /**
191 * Executor used to defer flow provisioning to a different thread pool.
192 */
193 protected ExecutorService flowsExecutor;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800194
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700195 /**
196 * Executor used to defer subscriber handling from API call to a different thread pool.
197 */
198 protected ExecutorService subscriberExecutor;
199
200 private static final String APP_NAME = "org.opencord.olt";
201
202 private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
203 private final Lock queueWriteLock = queueLock.writeLock();
204 private final Lock queueReadLock = queueLock.readLock();
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700205
alshabibf0e7e702015-05-30 18:22:36 -0700206 @Activate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700207 protected void activate(ComponentContext context) {
208 cfgService.registerProperties(getClass());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700209
alshabibe0559672016-02-21 14:49:51 -0800210 modified(context);
Saurav Das62ad75e2019-03-05 12:22:22 -0800211
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700212 appId = coreService.registerApplication(APP_NAME);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800213 KryoNamespace serializer = KryoNamespace.newBuilder()
214 .register(KryoNamespaces.API)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700215 .register(ConnectPoint.class)
216 .register(DiscoveredSubscriber.class)
217 .register(DiscoveredSubscriber.Status.class)
218 .register(SubscriberAndDeviceInformation.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800219 .register(UniTagInformation.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000220 .register(LinkedBlockingQueue.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800221 .build();
222
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700223 eventsQueues = storageService.<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder()
224 .withName("volt-subscriber-queues")
225 .withApplicationId(appId)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800226 .withSerializer(Serializer.using(serializer))
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700227 .build().asJavaMap();
alshabib4ceaed32016-03-03 18:00:58 -0800228
alshabibba357492016-01-27 13:49:46 -0800229 deviceService.addListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700230
231 discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
232 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
233 log.info("Started");
234
235 deviceListener.handleExistingPorts();
alshabibf0e7e702015-05-30 18:22:36 -0700236 }
237
238 @Deactivate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700239 protected void deactivate(ComponentContext context) {
240 cfgService.unregisterProperties(getClass(), false);
241 discoveredSubscriberExecutor.shutdown();
Andrea Campanellaeaf23952021-12-30 15:58:54 +0100242 deviceService.removeListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700243 flowsExecutor.shutdown();
244 subscriberExecutor.shutdown();
245 deviceListener.deactivate();
alshabibf0e7e702015-05-30 18:22:36 -0700246 log.info("Stopped");
247 }
248
alshabibe0559672016-02-21 14:49:51 -0800249 @Modified
250 public void modified(ComponentContext context) {
251 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700252 if (context != null) {
Andrea Campanella971d5b92020-05-07 11:20:43 +0200253 String bpId = get(properties, DEFAULT_BP_ID);
254 defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
Gamze Abakaad329652018-12-20 10:12:21 +0000255
Andrea Campanella971d5b92020-05-07 11:20:43 +0200256 String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
257 multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000258
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700259 String flowThreads = get(properties, FLOW_PROCESSING_THREADS);
260 int oldFlowProcessingThreads = flowProcessingThreads;
261 flowProcessingThreads = isNullOrEmpty(flowThreads) ?
262 oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
Saurav Das2d3777a2020-08-07 18:48:51 -0700263
Andrea Campanella61650a12022-01-24 18:09:44 -0800264 String executorQueueSize = get(properties, FLOW_EXECUTOR_QUEUE_SIZE);
265 int oldExecutorQueueSize = flowExecutorQueueSize;
266 flowExecutorQueueSize = isNullOrEmpty(executorQueueSize) ?
267 oldExecutorQueueSize : Integer.parseInt(executorQueueSize.trim());
268
269 if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads
270 || oldExecutorQueueSize != flowExecutorQueueSize) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700271 if (flowsExecutor != null) {
272 flowsExecutor.shutdown();
273 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800274
275 flowsExecutor = new ThreadPoolExecutor(0, flowProcessingThreads, 30,
276 TimeUnit.SECONDS, new ThreadPoolQueue(flowExecutorQueueSize),
277 new ThreadPoolExecutor.DiscardPolicy());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700278 }
Gamze Abaka33feef52019-02-27 08:16:47 +0000279
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700280 String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
281 int oldSubscriberProcessingThreads = subscriberProcessingThreads;
282 subscriberProcessingThreads = isNullOrEmpty(subscriberThreads) ?
283 oldSubscriberProcessingThreads : Integer.parseInt(subscriberThreads.trim());
284
285 if (subscriberExecutor == null || oldSubscriberProcessingThreads != subscriberProcessingThreads) {
286 if (subscriberExecutor != null) {
287 subscriberExecutor.shutdown();
288 }
289 subscriberExecutor = Executors.newFixedThreadPool(subscriberProcessingThreads,
290 groupedThreads(ONOS_OLT_SERVICE,
291 "subscriber-installer-%d"));
292 }
293
294 String queueDelay = get(properties, REQUEUE_DELAY);
295 requeueDelay = isNullOrEmpty(queueDelay) ?
296 REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
alshabibe0559672016-02-21 14:49:51 -0800297 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800298 log.info("Modified. Values = {}: {}, {}:{}, {}:{}," +
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700299 "{}:{}, {}:{}, {}:{}",
300 DEFAULT_BP_ID, defaultBpId,
301 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
302 FLOW_PROCESSING_THREADS, flowProcessingThreads,
Andrea Campanella61650a12022-01-24 18:09:44 -0800303 FLOW_EXECUTOR_QUEUE_SIZE, flowExecutorQueueSize,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700304 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
305 REQUEUE_DELAY, requeueDelay);
alshabibe0559672016-02-21 14:49:51 -0800306 }
307
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000308
alshabib32232c82016-02-25 17:57:24 -0500309 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700310 public boolean provisionSubscriber(ConnectPoint cp) {
311 subscriberExecutor.submit(() -> {
312 Device device = deviceService.getDevice(cp.deviceId());
313 Port port = deviceService.getPort(device.id(), cp.port());
314 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Hardik Windlass395ff372019-06-13 05:16:00 +0000315
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700316 if (oltDeviceService.isNniPort(device, port.number())) {
317 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
318 return false;
Saurav Das026650f2020-09-21 18:56:35 -0700319 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700320
321 log.info("Provisioning subscriber on {}", accessDevicePort);
322
323 if (oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
324 log.error("Subscriber on {} is already provisioned", accessDevicePort);
325 return false;
326 }
327
328 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
329 if (si == null) {
330 log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
331 return false;
332 }
333 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
334 DiscoveredSubscriber.Status.ADDED, true, si);
335
336 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
337 // regardless of the flow status
338 si.uniTagList().forEach(uti -> {
339 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
340 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
341 });
342
343 addSubscriberToQueue(sub);
344 return true;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100345 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700346 //NOTE this only means we have taken the request in, nothing more.
Amit Ghosh31939522018-08-16 13:28:21 +0100347 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800348 }
349
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000350 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700351 public boolean removeSubscriber(ConnectPoint cp) {
352 subscriberExecutor.submit(() -> {
353 Device device = deviceService.getDevice(DeviceId.deviceId(cp.deviceId().toString()));
354 Port port = deviceService.getPort(device.id(), cp.port());
355 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000356
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700357 if (oltDeviceService.isNniPort(device, port.number())) {
358 log.warn("will not un-provision a subscriber on the NNI {}",
359 accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100360 return false;
361 }
362
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700363 log.info("Un-provisioning subscriber on {}", accessDevicePort);
364
365 if (!oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
366 log.error("Subscriber on {} is not provisioned", accessDevicePort);
367 return false;
368 }
369
370 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
371 if (si == null) {
372 log.error("Subscriber information not found in sadis for port {}",
373 accessDevicePort);
374 // NOTE that we are returning true so that the subscriber is removed from the queue
375 // and we can move on provisioning others
376 return false;
377 }
378 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
379 DiscoveredSubscriber.Status.REMOVED, true, si);
380
381 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
382 // regardless of the flow status
383 si.uniTagList().forEach(uti -> {
384 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
385 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000386 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700387
388 addSubscriberToQueue(sub);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100389 return true;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700390 });
391 //NOTE this only means we have taken the request in, nothing more.
392 return true;
393 }
394
395 @Override
396 public boolean provisionSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
397 log.debug("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}",
398 cp, cTag, sTag, tpId);
399 Device device = deviceService.getDevice(cp.deviceId());
400 Port port = deviceService.getPort(device.id(), cp.port());
401 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
402
403 if (oltDeviceService.isNniPort(device, port.number())) {
404 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100405 return false;
406 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100407
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700408 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
409 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
410 if (specificService == null) {
411 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
412 "stag {}, tpId {}", cp, cTag, sTag, tpId);
Amit Ghosh31939522018-08-16 13:28:21 +0100413 return false;
414 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700415 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
416 uniTagInformationList.add(specificService);
417 si.setUniTagList(uniTagInformationList);
418 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
419 DiscoveredSubscriber.Status.ADDED, true, si);
Amit Ghosh31939522018-08-16 13:28:21 +0100420
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700421 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
422 // regardless of the flow status
423 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
424 if (oltFlowService.isSubscriberServiceProvisioned(sk)) {
425 log.error("Subscriber on {} is already provisioned", sk);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100426 return false;
427 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700428 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
429
430 addSubscriberToQueue(sub);
431 return true;
Amit Ghosh31939522018-08-16 13:28:21 +0100432 }
433
434 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700435 public boolean removeSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
436 log.debug("Un-provisioning subscriber on {} with cTag {}, stag {}, tpId {}",
437 cp, cTag, sTag, tpId);
438 Device device = deviceService.getDevice(cp.deviceId());
439 Port port = deviceService.getPort(device.id(), cp.port());
440 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
441
442 if (oltDeviceService.isNniPort(device, port.number())) {
443 log.warn("will not un-provision a subscriber on the NNI {}",
444 accessDevicePort);
445 return false;
446 }
447
448 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
449 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
450 if (specificService == null) {
451 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
452 "stag {}, tpId {}", cp, cTag, sTag, tpId);
453 return false;
454 }
455 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
456 uniTagInformationList.add(specificService);
457 si.setUniTagList(uniTagInformationList);
458 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
Andrea Campanella982fd332022-01-19 09:14:12 +0100459 DiscoveredSubscriber.Status.REMOVED, true, si);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700460
461 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
462 // regardless of the flow status
463 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
464 if (!oltFlowService.isSubscriberServiceProvisioned(sk)) {
465 log.error("Subscriber on {} is not provisioned", sk);
466 return false;
467 }
468 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
469
470 addSubscriberToQueue(sub);
471 return true;
Saurav Das82b8e6d2018-10-04 15:25:12 -0700472 }
473
474 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700475 public List<DeviceId> getConnectedOlts() {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100476 List<DeviceId> olts = new ArrayList<>();
477 Iterable<Device> devices = deviceService.getDevices();
478 for (Device d : devices) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700479 if (oltDeviceService.isOlt(d)) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700480 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100481 olts.add(d.id());
482 }
483 }
484 return olts;
alshabibe0559672016-02-21 14:49:51 -0800485 }
486
Amit Ghosh31939522018-08-16 13:28:21 +0100487 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700488 * Finds the connect-point to which a subscriber is connected.
Amit Ghosh31939522018-08-16 13:28:21 +0100489 *
490 * @param id The id of the subscriber, this is the same ID as in Sadis
491 * @return Subscribers ConnectPoint if found else null
492 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700493 @Override
494 public ConnectPoint findSubscriberConnectPoint(String id) {
Amit Ghosh31939522018-08-16 13:28:21 +0100495
496 Iterable<Device> devices = deviceService.getDevices();
497 for (Device d : devices) {
498 for (Port p : deviceService.getPorts(d.id())) {
499 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
500 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700501 log.debug("Found on {}", portWithName(p));
Amit Ghosh31939522018-08-16 13:28:21 +0100502 return new ConnectPoint(d.id(), p.number());
503 }
504 }
505 }
506 return null;
507 }
508
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700509 protected void processDiscoveredSubscribers() {
Gamze Abaka641fc072018-09-04 09:16:27 +0000510
Andrea Campanella61650a12022-01-24 18:09:44 -0800511 log.info("Started processDiscoveredSubscribers loop");
512 while (true) {
513 Set<ConnectPoint> discoveredCps;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700514 try {
515 queueReadLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800516 discoveredCps = new HashSet<>(eventsQueues.keySet());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700517 } catch (Exception e) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800518 log.error("Cannot read keys from queue map", e);
519 continue;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700520 } finally {
521 queueReadLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000522 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000523
Andrea Campanella61650a12022-01-24 18:09:44 -0800524 discoveredCps.forEach(cp -> {
525 LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
526
527 try {
528 queueReadLock.lock();
529 eventsQueue = eventsQueues.get(cp);
530 } catch (Exception e) {
531 log.error("Cannot get key from queue map", e);
532 return;
533 } finally {
534 queueReadLock.unlock();
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100535 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000536
Andrea Campanella61650a12022-01-24 18:09:44 -0800537 if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
538 // if we're not local leader for this device, ignore this queue
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700539 if (log.isTraceEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800540 log.trace("Ignoring queue on CP {} as not master of the device", cp);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700541 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800542 return;
543 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700544
Andrea Campanella61650a12022-01-24 18:09:44 -0800545 try {
546 flowsExecutor.execute(() -> {
547 if (!eventsQueue.isEmpty()) {
548 // we do not remove the event from the queue until it has been processed
549 // in that way we guarantee that events are processed in order
550 DiscoveredSubscriber sub = eventsQueue.peek();
551 if (sub == null) {
552 // the queue is empty
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700553 return;
554 }
555
Andrea Campanella61650a12022-01-24 18:09:44 -0800556 if (log.isTraceEnabled()) {
Andrea Campanellaccb32862022-02-17 16:29:10 +0100557 log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
Andrea Campanella61650a12022-01-24 18:09:44 -0800558 sub, portWithName(sub.port), sub.status, sub.hasSubscriber);
559 }
560
561 if (sub.hasSubscriber) {
562 // this is a provision subscriber call
563 if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
564 removeSubscriberFromQueue(sub);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700565 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800566 } else {
567 // this is a port event (ENABLED/DISABLED)
568 // means no subscriber was provisioned on that port
569
570 if (!deviceService.isAvailable(sub.device.id()) ||
571 deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
572 // If the device is not connected or the port is not available do nothing
573 // This can happen when we disable and then immediately delete the device,
574 // the queue is populated but the meters and flows are already gone
575 // thus there is nothing left to do
576 return;
577 }
578
579 if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
580 if (log.isTraceEnabled()) {
581 log.trace("Processing of port {} completed",
582 portWithName(sub.port));
583 }
584 removeSubscriberFromQueue(sub);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100585 } else {
586 log.debug("Not handling basic port flows " +
587 "for {}, leaving in the queue",
588 portWithName(sub.port));
Andrea Campanella61650a12022-01-24 18:09:44 -0800589 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700590 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000591 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800592 });
593 } catch (Exception e) {
594 log.error("Exception processing subscriber", e);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000595 }
596 });
Tunahan Sezena07fe962021-02-24 08:24:24 +0000597
Andrea Campanella61650a12022-01-24 18:09:44 -0800598 try {
599 TimeUnit.MILLISECONDS.sleep(requeueDelay);
600 } catch (InterruptedException e) {
601 log.debug("Interrupted while waiting to requeue", e);
602 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000603 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000604 }
605
Andrea Campanella61650a12022-01-24 18:09:44 -0800606
Tunahan Sezena07fe962021-02-24 08:24:24 +0000607 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000608 * Checks the subscriber uni tag list and find the uni tag information.
609 * using the pon c tag, pon s tag and the technology profile id
610 * May return Optional<null>
611 *
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700612 * @param portName port of the subscriber
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000613 * @param innerVlan pon c tag
614 * @param outerVlan pon s tag
615 * @param tpId the technology profile id
616 * @return the found uni tag information
617 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700618 private UniTagInformation getUniTagInformation(String portName, VlanId innerVlan,
619 VlanId outerVlan, int tpId) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000620 log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700621 portName, innerVlan, outerVlan, tpId);
622 SubscriberAndDeviceInformation subInfo = subsService.get(portName);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000623 if (subInfo == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700624 log.warn("Subscriber information doesn't exist for {}", portName);
625 return null;
Gamze Abakaf59c0912019-04-19 08:24:28 +0000626 }
627
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000628 List<UniTagInformation> uniTagList = subInfo.uniTagList();
629 if (uniTagList == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700630 log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), portName);
631 return null;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000632 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100633
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000634 UniTagInformation service = null;
635 for (UniTagInformation tagInfo : subInfo.uniTagList()) {
636 if (innerVlan.equals(tagInfo.getPonCTag()) && outerVlan.equals(tagInfo.getPonSTag())
637 && tpId == tagInfo.getTechnologyProfileId()) {
638 service = tagInfo;
639 break;
Andy Bavier160e8682019-05-07 18:32:22 -0700640 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000641 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000642
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000643 if (service == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700644 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700645 innerVlan, outerVlan, tpId, portName);
Saurav Das82b8e6d2018-10-04 15:25:12 -0700646 return null;
647 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100648
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700649 return service;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100650 }
651
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700652 protected void bindSadisService(SadisService service) {
653 sadisService = service;
654 subsService = sadisService.getSubscriberInfoService();
655
656 log.info("Sadis-service binds to onos.");
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100657 }
658
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700659 protected void unbindSadisService(SadisService service) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700660 deviceListener = null;
661 sadisService = null;
662 subsService = null;
663 log.info("Sadis-service unbinds from onos.");
Jonathan Hart1d34c8b2018-05-05 15:37:28 -0700664 }
665
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700666 protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700667 try {
Andrea Campanella61650a12022-01-24 18:09:44 -0800668 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800669
Andrea Campanella61650a12022-01-24 18:09:44 -0800670 try {
671 queueWriteLock.lock();
672 eventsQueues.compute(cp, (subcp, queue) -> {
673 queue = queue == null ? new LinkedBlockingQueue<>() : queue;
674 log.info("Adding subscriber {} to queue: {} with existing {}",
675 sub, portWithName(sub.port), queue);
676 queue.add(sub);
677 return queue;
678 });
679 } catch (UnsupportedOperationException | ClassCastException |
680 NullPointerException | IllegalArgumentException e) {
681 log.error("Cannot add subscriber {} to queue: {}", portWithName(sub.port), e.getMessage());
682 } finally {
683 queueWriteLock.unlock();
684 }
685 } catch (Exception e) {
686 log.error("Can't add {} to queue", sub, e);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800687 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800688 }
689
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700690 protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
691 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700692 try {
693 queueWriteLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800694 eventsQueues.compute(cp, (subcp, queue) -> {
695 if (log.isTraceEnabled()) {
696 log.trace("Removing subscriber {} from queue : {} " +
697 "with existing {}", sub,
698 portWithName(sub.port), queue);
699 }
700 if (queue == null) {
701 log.warn("Cannot find queue for connectPoint {}", cp);
702 return queue;
703 }
704 boolean removed = queue.remove(sub);
705 if (!removed) {
706 log.warn("Subscriber {} has not been removed from queue, " +
707 "is it still there? {}", sub, queue);
708 return queue;
709 } else {
710 log.debug("Subscriber {} has been removed from the queue {}",
711 sub, queue);
712 }
713
714 return queue;
715 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700716 } catch (UnsupportedOperationException | ClassCastException |
717 NullPointerException | IllegalArgumentException e) {
718 log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
719 } finally {
720 queueWriteLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000721 }
722 }
723
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700724 protected class OltDeviceListener
725 implements DeviceListener {
726
727 private final Logger log = LoggerFactory.getLogger(getClass());
728 protected ExecutorService eventExecutor;
729
730 /**
731 * Builds the listener with all the proper services and information needed.
732 */
733 public OltDeviceListener() {
734 this.eventExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
735 groupedThreads("onos/olt-device-listener-event", "event-%d", log));
736 }
737
738 public void deactivate() {
739 this.eventExecutor.shutdown();
740 }
Saurav Dasa9d5f442019-03-06 19:32:48 -0800741
alshabibf0e7e702015-05-30 18:22:36 -0700742 @Override
743 public void event(DeviceEvent event) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800744 if (log.isTraceEnabled()) {
745 log.trace("OltListener receives event {} for: {}/{}", event.type(), event.subject().id(),
746 event.port() != null ? event.port().number() : null);
747 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700748 eventExecutor.execute(() -> {
Andrea Campanella61650a12022-01-24 18:09:44 -0800749 if (log.isTraceEnabled()) {
750 log.trace("OltListener Executor receives event {} for: {}/{}",
751 event.type(), event.subject().id(),
752 event.port() != null ? event.port().number() : null);
753 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700754 boolean isOlt = oltDeviceService.isOlt(event.subject());
755 DeviceId deviceId = event.subject().id();
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700756 switch (event.type()) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700757 case PORT_STATS_UPDATED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700758 case DEVICE_ADDED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700759 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700760 case PORT_ADDED:
761 case PORT_UPDATED:
762 case PORT_REMOVED:
763 if (!isOlt) {
764 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
765 return;
766 }
767 if (!oltDeviceService.isLocalLeader(deviceId)) {
768 log.trace("Device {} is not local to this node", deviceId);
769 return;
770 }
771 // port added, updated and removed are treated in the same way as we only care whether the port
772 // is enabled or not
773 handleOltPort(event.type(), event.subject(), event.port());
774 return;
775 case DEVICE_AVAILABILITY_CHANGED:
776 if (!isOlt) {
777 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
778 return;
779 }
780 if (deviceService.isAvailable(deviceId)) {
781 if (!oltDeviceService.isLocalLeader(deviceId)) {
782 if (log.isTraceEnabled()) {
783 log.trace("Device {} is not local to this node, not handling available device",
784 deviceId);
785 }
786 } else {
787 log.info("Handling available device: {}", deviceId);
788 handleExistingPorts();
789 }
790 } else if (!deviceService.isAvailable(deviceId) && deviceService.getPorts(deviceId).isEmpty()) {
791 // NOTE that upon disconnection there is no mastership on the device,
792 // and we should anyway clear the local cache of the flows/meters across instances.
793 // We're only clearing the device if there are no available ports,
794 // otherwise we assume it's a temporary disconnection
795 log.info("Device {} availability changed to false and ports are empty, " +
796 "purging meters and flows", deviceId);
797 //NOTE all the instances will call these methods
798 oltFlowService.purgeDeviceFlows(deviceId);
799 oltMeterService.purgeDeviceMeters(deviceId);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100800 // cpStatus is a distributed map, thus only master will update it.
801 if (oltDeviceService.isLocalLeader(deviceId)) {
802 log.debug("Master, clearing cp status for {}", deviceId);
803 clearQueueForDevice(deviceId);
804 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700805 } else {
806 log.info("Device {} availability changed to false, but ports are still available, " +
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800807 "assuming temporary disconnection.",
808 deviceId);
809 if (log.isTraceEnabled()) {
810 log.trace("Available ports: {}", deviceService.getPorts(deviceId));
811 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700812 }
813 return;
814 case DEVICE_REMOVED:
815 if (!isOlt) {
816 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
817 return;
818 }
819 log.info("Device {} Removed, purging meters and flows", deviceId);
820 oltFlowService.purgeDeviceFlows(deviceId);
821 oltMeterService.purgeDeviceMeters(deviceId);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100822 if (oltDeviceService.isLocalLeader(deviceId)) {
823 log.debug("Master, clearing cp status for {}", deviceId);
824 clearQueueForDevice(deviceId);
825 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700826 return;
827 default:
828 if (log.isTraceEnabled()) {
829 log.trace("Not handling event: {}, ", event);
830 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700831 }
832 });
alshabibf0e7e702015-05-30 18:22:36 -0700833 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000834
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700835 protected void clearQueueForDevice(DeviceId devId) {
836 try {
837 queueWriteLock.lock();
838 Iterator<Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>> iter =
839 eventsQueues.entrySet().iterator();
840 while (iter.hasNext()) {
841 Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
842 if (entry.getKey().deviceId().equals(devId)) {
843 eventsQueues.remove(entry.getKey());
Andrea Campanellaccb32862022-02-17 16:29:10 +0100844 log.debug("Removing key from queue {}", entry.getKey());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700845 }
846 }
847 } finally {
848 queueWriteLock.unlock();
Gamze Abaka838d8142019-02-21 07:06:55 +0000849 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000850 }
851
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700852 protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
853 log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
854 portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
Andrea Campanella61650a12022-01-24 18:09:44 -0800855 boolean isNni = oltDeviceService.isNniPort(device, port.number());
856
857 if (!isNni && type == DeviceEvent.Type.PORT_ADDED) {
858 log.debug("Ignoring PORT_ADD on UNI port {}", portWithName(port));
859 return;
860 }
861
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700862 if (port.isEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800863 if (isNni) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700864 OltFlowService.FlowOperation action = OltFlowService.FlowOperation.ADD;
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800865 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
866 // In that case the flows are purged anyway, so there's no need to deal with them,
867 // it would actually be counter-productive as the openflow connection is severed and they won't
868 // be correctly processed
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700869 if (type == DeviceEvent.Type.PORT_REMOVED) {
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800870 log.debug("NNI port went down, " +
871 "ignoring event as flows will be removed in the generic device cleanup");
872 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700873 }
874 oltFlowService.handleNniFlows(device, port, action);
875 } else {
876 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
877 // NOTE if the subscriber was previously provisioned,
878 // then add it back to the queue to be re-provisioned
879 boolean provisionSubscriber = oltFlowService.
880 isSubscriberServiceProvisioned(new AccessDevicePort(port));
881 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
882 if (si == null) {
883 //NOTE this should not happen given that the subscriber was provisioned before
884 log.error("Subscriber information not found in sadis for port {}",
885 portWithName(port));
886 return;
887 }
888
889 DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
890 if (type == DeviceEvent.Type.PORT_REMOVED) {
891 status = DiscoveredSubscriber.Status.REMOVED;
892 }
893
894 DiscoveredSubscriber sub =
895 new DiscoveredSubscriber(device, port,
896 status, provisionSubscriber, si);
897 addSubscriberToQueue(sub);
898 }
899 } else {
Andrea Campanella61650a12022-01-24 18:09:44 -0800900 if (isNni) {
Matteo Scandolob6981dc2021-12-02 16:31:44 -0800901 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
902 // In that case the flows are purged anyway, so there's no need to deal with them,
903 // it would actually be counter-productive as the openflow connection is severed and they won't
904 // be correctly processed
905 log.debug("NNI port went down, " +
906 "ignoring event as flows will be removed in the generic device cleanup");
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700907 } else {
908 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
909 // NOTE we are assuming that if a subscriber has default eapol
910 // it does not have subscriber flows
911 if (oltFlowService.hasDefaultEapol(port)) {
912 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
913 if (si == null) {
914 //NOTE this should not happen given that the subscriber was provisioned before
915 log.error("Subscriber information not found in sadis for port {}",
916 portWithName(port));
917 return;
918 }
919 DiscoveredSubscriber sub =
920 new DiscoveredSubscriber(device, port,
921 DiscoveredSubscriber.Status.REMOVED, false, si);
922
923 addSubscriberToQueue(sub);
924
925 } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
926 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
927 if (si == null) {
928 //NOTE this should not happen given that the subscriber was provisioned before
929 log.error("Subscriber information not found in sadis for port {}",
930 portWithName(port));
931 return;
932 }
933 DiscoveredSubscriber sub =
934 new DiscoveredSubscriber(device, port,
935 DiscoveredSubscriber.Status.REMOVED, true, si);
936 addSubscriberToQueue(sub);
937 }
938 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000939 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000940 }
Gamze Abakada282b42019-03-11 13:16:48 +0000941
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700942 /**
943 * This method is invoked on app activation in order to deal
944 * with devices and ports that are already existing in the system
945 * and thus won't trigger an event.
946 * It is also needed on instance reboot and device reconnect
947 */
948 protected void handleExistingPorts() {
949 Iterable<DeviceId> devices = getConnectedOlts();
950 for (DeviceId deviceId : devices) {
951 log.info("Handling existing OLT Ports for device {}", deviceId);
952 if (oltDeviceService.isLocalLeader(deviceId)) {
953 List<Port> ports = deviceService.getPorts(deviceId);
954 for (Port p : ports) {
955 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
956 continue;
957 }
958 Device device = deviceService.getDevice(deviceId);
959 deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, device, p);
960 }
961 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800962 }
963 }
964 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800965
966 protected final class ThreadPoolQueue extends ArrayBlockingQueue<Runnable> {
967
968 public ThreadPoolQueue(int capacity) {
969 super(capacity);
970 }
971
972 @Override
973 public boolean offer(Runnable runnable) {
974 if (runnable == null) {
975 return false;
976 }
977 try {
978 put(runnable);
979 } catch (InterruptedException e1) {
980 Thread.currentThread().interrupt();
981 return false;
982 }
983 return true;
984 }
985
986 }
Hardik Windlass395ff372019-06-13 05:16:00 +0000987}