blob: e3915375411ae44b55340c1143a863fb15c3a10d [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07002 * Copyright 2021-present Open Networking Foundation
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
alshabibf0e7e702015-05-30 18:22:36 -070018import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080019import org.onlab.util.KryoNamespace;
Gamze Abaka1f62dd92020-05-07 08:58:13 +000020import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080021import org.onosproject.cluster.ClusterService;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020022import org.onosproject.cluster.LeadershipService;
alshabibf0e7e702015-05-30 18:22:36 -070023import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080025import org.onosproject.event.AbstractListenerManager;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020026import org.onosproject.mastership.MastershipService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010027import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070028import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010029import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070030import org.onosproject.net.DeviceId;
alshabibdec2e252016-01-15 12:20:25 -080031import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070032import org.onosproject.net.PortNumber;
33import org.onosproject.net.device.DeviceEvent;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080036import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080037import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070039import org.opencord.olt.AccessDeviceEvent;
40import org.opencord.olt.AccessDeviceListener;
41import org.opencord.olt.AccessDeviceService;
Gamze Abaka641fc072018-09-04 09:16:27 +000042import org.opencord.sadis.BaseInformationService;
43import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010044import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000045import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080046import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070047import org.osgi.service.component.annotations.Activate;
48import org.osgi.service.component.annotations.Component;
49import org.osgi.service.component.annotations.Deactivate;
50import org.osgi.service.component.annotations.Modified;
51import org.osgi.service.component.annotations.Reference;
52import org.osgi.service.component.annotations.ReferenceCardinality;
Ilayda Ozdemir90a93622021-02-25 09:40:58 +000053import org.osgi.service.component.annotations.ReferencePolicy;
alshabibf0e7e702015-05-30 18:22:36 -070054import org.slf4j.Logger;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070055import org.slf4j.LoggerFactory;
alshabibf0e7e702015-05-30 18:22:36 -070056
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010057import java.util.ArrayList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010058import java.util.Dictionary;
Andrea Campanella61650a12022-01-24 18:09:44 -080059import java.util.HashSet;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070060import java.util.Iterator;
61import java.util.LinkedList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010062import java.util.List;
63import java.util.Map;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010064import java.util.Properties;
65import java.util.Set;
Andrea Campanella61650a12022-01-24 18:09:44 -080066import java.util.concurrent.ArrayBlockingQueue;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010067import java.util.concurrent.ExecutorService;
68import java.util.concurrent.Executors;
69import java.util.concurrent.LinkedBlockingQueue;
70import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella61650a12022-01-24 18:09:44 -080071import java.util.concurrent.ThreadPoolExecutor;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010072import java.util.concurrent.TimeUnit;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070073import java.util.concurrent.locks.Lock;
74import java.util.concurrent.locks.ReentrantReadWriteLock;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010075
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010076import static com.google.common.base.Strings.isNullOrEmpty;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010077import static org.onlab.util.Tools.get;
78import static org.onlab.util.Tools.groupedThreads;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070079import static org.opencord.olt.impl.OltUtils.getPortName;
80import static org.opencord.olt.impl.OltUtils.portWithName;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010081import static org.opencord.olt.impl.OsgiPropertyConstants.*;
alshabibf0e7e702015-05-30 18:22:36 -070082
83/**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070084 * OLT Application.
alshabibf0e7e702015-05-30 18:22:36 -070085 */
Carmelo Casconeca931162019-07-15 18:22:24 -070086@Component(immediate = true,
87 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -070088 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +000089 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070090 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
Andrea Campanella61650a12022-01-24 18:09:44 -080091 FLOW_EXECUTOR_QUEUE_SIZE + ":Integer=" + FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070092 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
93 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
Carmelo Casconeca931162019-07-15 18:22:24 -070094 })
alshabib8e4fd2f2016-01-12 15:55:53 -080095public class Olt
96 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
97 implements AccessDeviceService {
alshabibf0e7e702015-05-30 18:22:36 -070098
Carmelo Casconeca931162019-07-15 18:22:24 -070099 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700100 protected DeviceService deviceService;
101
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected ComponentConfigService cfgService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100104
Carmelo Casconeca931162019-07-15 18:22:24 -0700105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700106 protected MastershipService mastershipService;
alshabibf0e7e702015-05-30 18:22:36 -0700107
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700109 protected ClusterService clusterService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected LeadershipService leadershipService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100113
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000114 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
115 bind = "bindSadisService",
116 unbind = "unbindSadisService",
117 policy = ReferencePolicy.DYNAMIC)
118 protected volatile SadisService sadisService;
Gamze Abaka641fc072018-09-04 09:16:27 +0000119
Carmelo Casconeca931162019-07-15 18:22:24 -0700120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700121 protected OltDeviceServiceInterface oltDeviceService;
alshabibe0559672016-02-21 14:49:51 -0800122
Carmelo Casconeca931162019-07-15 18:22:24 -0700123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700124 protected OltFlowServiceInterface oltFlowService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected OltMeterServiceInterface oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000128
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
130 protected StorageService storageService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700133 protected CoreService coreService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800134
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700135 protected ApplicationId appId;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200136
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700137 private static final String ONOS_OLT_SERVICE = "onos/olt-service";
Tunahan Sezena07fe962021-02-24 08:24:24 +0000138
Carmelo Casconeca931162019-07-15 18:22:24 -0700139 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800140 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700141 **/
142 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000143
Carmelo Casconeca931162019-07-15 18:22:24 -0700144 /**
Gamze Abaka51a34e82020-05-08 13:03:14 +0000145 * Default multicast service name.
Carmelo Casconeca931162019-07-15 18:22:24 -0700146 **/
Gamze Abaka51a34e82020-05-08 13:03:14 +0000147 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Gamze Abaka33feef52019-02-27 08:16:47 +0000148
Saurav Das2d3777a2020-08-07 18:48:51 -0700149 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700150 * Number of threads used to process flows.
Saurav Das2d3777a2020-08-07 18:48:51 -0700151 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700152 protected int flowProcessingThreads = FLOW_PROCESSING_THREADS_DEFAULT;
Saurav Das2d3777a2020-08-07 18:48:51 -0700153
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700154 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700155 * Capacity of the work queue of the executor used to process flows.
156 **/
Andrea Campanella61650a12022-01-24 18:09:44 -0800157 protected int flowExecutorQueueSize = FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT;
158
159 /**
160 * Number of threads used to handle subscriber provisioning requests.
161 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700162 protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
163
164 /**
165 * Delay in ms to put an event back in the queue, used to avoid retrying things too often if conditions are not met.
166 **/
167 protected int requeueDelay = REQUEUE_DELAY_DEFAULT;
168
169 private final Logger log = LoggerFactory.getLogger(getClass());
170
171 /**
172 * A queue to asynchronously process events.
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700173 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700174 protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
alshabibf0e7e702015-05-30 18:22:36 -0700175
Gamze Abaka641fc072018-09-04 09:16:27 +0000176 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
alshabibf0e7e702015-05-30 18:22:36 -0700177
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700178 /**
179 * Listener for OLT devices events.
180 */
181 protected OltDeviceListener deviceListener = new OltDeviceListener();
182 protected ScheduledExecutorService discoveredSubscriberExecutor =
183 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
184 "discovered-cp-%d", log));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100185
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700186 protected ScheduledExecutorService queueExecutor =
187 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
188 "discovered-cp-restore-%d", log));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700189
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700190 /**
191 * Executor used to defer flow provisioning to a different thread pool.
192 */
193 protected ExecutorService flowsExecutor;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800194
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700195 /**
196 * Executor used to defer subscriber handling from API call to a different thread pool.
197 */
198 protected ExecutorService subscriberExecutor;
199
200 private static final String APP_NAME = "org.opencord.olt";
201
202 private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
203 private final Lock queueWriteLock = queueLock.writeLock();
204 private final Lock queueReadLock = queueLock.readLock();
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700205
alshabibf0e7e702015-05-30 18:22:36 -0700206 @Activate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700207 protected void activate(ComponentContext context) {
208 cfgService.registerProperties(getClass());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700209
alshabibe0559672016-02-21 14:49:51 -0800210 modified(context);
Saurav Das62ad75e2019-03-05 12:22:22 -0800211
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700212 appId = coreService.registerApplication(APP_NAME);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800213 KryoNamespace serializer = KryoNamespace.newBuilder()
214 .register(KryoNamespaces.API)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700215 .register(ConnectPoint.class)
216 .register(DiscoveredSubscriber.class)
217 .register(DiscoveredSubscriber.Status.class)
218 .register(SubscriberAndDeviceInformation.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800219 .register(UniTagInformation.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000220 .register(LinkedBlockingQueue.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800221 .build();
222
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700223 eventsQueues = storageService.<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder()
224 .withName("volt-subscriber-queues")
225 .withApplicationId(appId)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800226 .withSerializer(Serializer.using(serializer))
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700227 .build().asJavaMap();
alshabib4ceaed32016-03-03 18:00:58 -0800228
alshabibba357492016-01-27 13:49:46 -0800229 deviceService.addListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700230
231 discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
232 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
233 log.info("Started");
234
235 deviceListener.handleExistingPorts();
alshabibf0e7e702015-05-30 18:22:36 -0700236 }
237
238 @Deactivate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700239 protected void deactivate(ComponentContext context) {
240 cfgService.unregisterProperties(getClass(), false);
241 discoveredSubscriberExecutor.shutdown();
Andrea Campanellaeaf23952021-12-30 15:58:54 +0100242 deviceService.removeListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700243 flowsExecutor.shutdown();
244 subscriberExecutor.shutdown();
245 deviceListener.deactivate();
alshabibf0e7e702015-05-30 18:22:36 -0700246 log.info("Stopped");
247 }
248
alshabibe0559672016-02-21 14:49:51 -0800249 @Modified
250 public void modified(ComponentContext context) {
251 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700252 if (context != null) {
Andrea Campanella971d5b92020-05-07 11:20:43 +0200253 String bpId = get(properties, DEFAULT_BP_ID);
254 defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
Gamze Abakaad329652018-12-20 10:12:21 +0000255
Andrea Campanella971d5b92020-05-07 11:20:43 +0200256 String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
257 multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000258
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700259 String flowThreads = get(properties, FLOW_PROCESSING_THREADS);
260 int oldFlowProcessingThreads = flowProcessingThreads;
261 flowProcessingThreads = isNullOrEmpty(flowThreads) ?
262 oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
Saurav Das2d3777a2020-08-07 18:48:51 -0700263
Andrea Campanella61650a12022-01-24 18:09:44 -0800264 String executorQueueSize = get(properties, FLOW_EXECUTOR_QUEUE_SIZE);
265 int oldExecutorQueueSize = flowExecutorQueueSize;
266 flowExecutorQueueSize = isNullOrEmpty(executorQueueSize) ?
267 oldExecutorQueueSize : Integer.parseInt(executorQueueSize.trim());
268
269 if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads
270 || oldExecutorQueueSize != flowExecutorQueueSize) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700271 if (flowsExecutor != null) {
272 flowsExecutor.shutdown();
273 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800274
275 flowsExecutor = new ThreadPoolExecutor(0, flowProcessingThreads, 30,
276 TimeUnit.SECONDS, new ThreadPoolQueue(flowExecutorQueueSize),
277 new ThreadPoolExecutor.DiscardPolicy());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700278 }
Gamze Abaka33feef52019-02-27 08:16:47 +0000279
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700280 String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
281 int oldSubscriberProcessingThreads = subscriberProcessingThreads;
282 subscriberProcessingThreads = isNullOrEmpty(subscriberThreads) ?
283 oldSubscriberProcessingThreads : Integer.parseInt(subscriberThreads.trim());
284
285 if (subscriberExecutor == null || oldSubscriberProcessingThreads != subscriberProcessingThreads) {
286 if (subscriberExecutor != null) {
287 subscriberExecutor.shutdown();
288 }
289 subscriberExecutor = Executors.newFixedThreadPool(subscriberProcessingThreads,
290 groupedThreads(ONOS_OLT_SERVICE,
291 "subscriber-installer-%d"));
292 }
293
294 String queueDelay = get(properties, REQUEUE_DELAY);
295 requeueDelay = isNullOrEmpty(queueDelay) ?
296 REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
alshabibe0559672016-02-21 14:49:51 -0800297 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800298 log.info("Modified. Values = {}: {}, {}:{}, {}:{}," +
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700299 "{}:{}, {}:{}, {}:{}",
300 DEFAULT_BP_ID, defaultBpId,
301 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
302 FLOW_PROCESSING_THREADS, flowProcessingThreads,
Andrea Campanella61650a12022-01-24 18:09:44 -0800303 FLOW_EXECUTOR_QUEUE_SIZE, flowExecutorQueueSize,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700304 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
305 REQUEUE_DELAY, requeueDelay);
alshabibe0559672016-02-21 14:49:51 -0800306 }
307
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000308
alshabib32232c82016-02-25 17:57:24 -0500309 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700310 public boolean provisionSubscriber(ConnectPoint cp) {
311 subscriberExecutor.submit(() -> {
312 Device device = deviceService.getDevice(cp.deviceId());
313 Port port = deviceService.getPort(device.id(), cp.port());
314 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Hardik Windlass395ff372019-06-13 05:16:00 +0000315
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700316 if (oltDeviceService.isNniPort(device, port.number())) {
317 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
318 return false;
Saurav Das026650f2020-09-21 18:56:35 -0700319 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700320
321 log.info("Provisioning subscriber on {}", accessDevicePort);
322
323 if (oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
324 log.error("Subscriber on {} is already provisioned", accessDevicePort);
325 return false;
326 }
327
328 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
329 if (si == null) {
330 log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
331 return false;
332 }
333 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
334 DiscoveredSubscriber.Status.ADDED, true, si);
335
336 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
337 // regardless of the flow status
338 si.uniTagList().forEach(uti -> {
339 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
340 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
341 });
342
343 addSubscriberToQueue(sub);
344 return true;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100345 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700346 //NOTE this only means we have taken the request in, nothing more.
Amit Ghosh31939522018-08-16 13:28:21 +0100347 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800348 }
349
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000350 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700351 public boolean removeSubscriber(ConnectPoint cp) {
352 subscriberExecutor.submit(() -> {
353 Device device = deviceService.getDevice(DeviceId.deviceId(cp.deviceId().toString()));
354 Port port = deviceService.getPort(device.id(), cp.port());
355 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000356
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700357 if (oltDeviceService.isNniPort(device, port.number())) {
358 log.warn("will not un-provision a subscriber on the NNI {}",
359 accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100360 return false;
361 }
362
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700363 log.info("Un-provisioning subscriber on {}", accessDevicePort);
364
365 if (!oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
366 log.error("Subscriber on {} is not provisioned", accessDevicePort);
367 return false;
368 }
369
370 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
371 if (si == null) {
372 log.error("Subscriber information not found in sadis for port {}",
373 accessDevicePort);
374 // NOTE that we are returning true so that the subscriber is removed from the queue
375 // and we can move on provisioning others
376 return false;
377 }
378 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
379 DiscoveredSubscriber.Status.REMOVED, true, si);
380
381 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
382 // regardless of the flow status
383 si.uniTagList().forEach(uti -> {
384 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
385 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000386 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700387
388 addSubscriberToQueue(sub);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100389 return true;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700390 });
391 //NOTE this only means we have taken the request in, nothing more.
392 return true;
393 }
394
395 @Override
396 public boolean provisionSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
397 log.debug("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}",
398 cp, cTag, sTag, tpId);
399 Device device = deviceService.getDevice(cp.deviceId());
400 Port port = deviceService.getPort(device.id(), cp.port());
401 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
402
403 if (oltDeviceService.isNniPort(device, port.number())) {
404 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100405 return false;
406 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100407
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700408 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
409 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
410 if (specificService == null) {
411 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
412 "stag {}, tpId {}", cp, cTag, sTag, tpId);
Amit Ghosh31939522018-08-16 13:28:21 +0100413 return false;
414 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700415 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
416 uniTagInformationList.add(specificService);
417 si.setUniTagList(uniTagInformationList);
418 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
419 DiscoveredSubscriber.Status.ADDED, true, si);
Amit Ghosh31939522018-08-16 13:28:21 +0100420
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700421 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
422 // regardless of the flow status
423 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
424 if (oltFlowService.isSubscriberServiceProvisioned(sk)) {
425 log.error("Subscriber on {} is already provisioned", sk);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100426 return false;
427 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700428 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
429
430 addSubscriberToQueue(sub);
431 return true;
Amit Ghosh31939522018-08-16 13:28:21 +0100432 }
433
434 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700435 public boolean removeSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
436 log.debug("Un-provisioning subscriber on {} with cTag {}, stag {}, tpId {}",
437 cp, cTag, sTag, tpId);
438 Device device = deviceService.getDevice(cp.deviceId());
439 Port port = deviceService.getPort(device.id(), cp.port());
440 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
441
442 if (oltDeviceService.isNniPort(device, port.number())) {
443 log.warn("will not un-provision a subscriber on the NNI {}",
444 accessDevicePort);
445 return false;
446 }
447
448 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
449 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
450 if (specificService == null) {
451 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
452 "stag {}, tpId {}", cp, cTag, sTag, tpId);
453 return false;
454 }
455 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
456 uniTagInformationList.add(specificService);
457 si.setUniTagList(uniTagInformationList);
458 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
Andrea Campanella982fd332022-01-19 09:14:12 +0100459 DiscoveredSubscriber.Status.REMOVED, true, si);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700460
461 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
462 // regardless of the flow status
463 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
464 if (!oltFlowService.isSubscriberServiceProvisioned(sk)) {
465 log.error("Subscriber on {} is not provisioned", sk);
466 return false;
467 }
468 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
469
470 addSubscriberToQueue(sub);
471 return true;
Saurav Das82b8e6d2018-10-04 15:25:12 -0700472 }
473
474 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700475 public List<DeviceId> getConnectedOlts() {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100476 List<DeviceId> olts = new ArrayList<>();
477 Iterable<Device> devices = deviceService.getDevices();
478 for (Device d : devices) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700479 if (oltDeviceService.isOlt(d)) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700480 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100481 olts.add(d.id());
482 }
483 }
484 return olts;
alshabibe0559672016-02-21 14:49:51 -0800485 }
486
Amit Ghosh31939522018-08-16 13:28:21 +0100487 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700488 * Finds the connect-point to which a subscriber is connected.
Amit Ghosh31939522018-08-16 13:28:21 +0100489 *
490 * @param id The id of the subscriber, this is the same ID as in Sadis
491 * @return Subscribers ConnectPoint if found else null
492 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700493 @Override
494 public ConnectPoint findSubscriberConnectPoint(String id) {
Amit Ghosh31939522018-08-16 13:28:21 +0100495
496 Iterable<Device> devices = deviceService.getDevices();
497 for (Device d : devices) {
498 for (Port p : deviceService.getPorts(d.id())) {
499 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
500 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700501 log.debug("Found on {}", portWithName(p));
Amit Ghosh31939522018-08-16 13:28:21 +0100502 return new ConnectPoint(d.id(), p.number());
503 }
504 }
505 }
506 return null;
507 }
508
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700509 protected void processDiscoveredSubscribers() {
Gamze Abaka641fc072018-09-04 09:16:27 +0000510
Andrea Campanella61650a12022-01-24 18:09:44 -0800511 log.info("Started processDiscoveredSubscribers loop");
512 while (true) {
513 Set<ConnectPoint> discoveredCps;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700514 try {
515 queueReadLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800516 discoveredCps = new HashSet<>(eventsQueues.keySet());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700517 } catch (Exception e) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800518 log.error("Cannot read keys from queue map", e);
519 continue;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700520 } finally {
521 queueReadLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000522 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000523
Andrea Campanella61650a12022-01-24 18:09:44 -0800524 discoveredCps.forEach(cp -> {
525 LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
526
527 try {
528 queueReadLock.lock();
529 eventsQueue = eventsQueues.get(cp);
530 } catch (Exception e) {
531 log.error("Cannot get key from queue map", e);
532 return;
533 } finally {
534 queueReadLock.unlock();
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100535 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000536
Andrea Campanella61650a12022-01-24 18:09:44 -0800537 if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
538 // if we're not local leader for this device, ignore this queue
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700539 if (log.isTraceEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800540 log.trace("Ignoring queue on CP {} as not master of the device", cp);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700541 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800542 return;
543 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700544
Andrea Campanella61650a12022-01-24 18:09:44 -0800545 try {
546 flowsExecutor.execute(() -> {
547 if (!eventsQueue.isEmpty()) {
548 // we do not remove the event from the queue until it has been processed
549 // in that way we guarantee that events are processed in order
550 DiscoveredSubscriber sub = eventsQueue.peek();
551 if (sub == null) {
552 // the queue is empty
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700553 return;
554 }
555
Andrea Campanella61650a12022-01-24 18:09:44 -0800556 if (log.isTraceEnabled()) {
557 log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
558 sub, portWithName(sub.port), sub.status, sub.hasSubscriber);
559 }
560
561 if (sub.hasSubscriber) {
562 // this is a provision subscriber call
563 if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
564 removeSubscriberFromQueue(sub);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700565 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800566 } else {
567 // this is a port event (ENABLED/DISABLED)
568 // means no subscriber was provisioned on that port
569
570 if (!deviceService.isAvailable(sub.device.id()) ||
571 deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
572 // If the device is not connected or the port is not available do nothing
573 // This can happen when we disable and then immediately delete the device,
574 // the queue is populated but the meters and flows are already gone
575 // thus there is nothing left to do
576 return;
577 }
578
579 if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
580 if (log.isTraceEnabled()) {
581 log.trace("Processing of port {} completed",
582 portWithName(sub.port));
583 }
584 removeSubscriberFromQueue(sub);
585 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700586 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000587 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800588 });
589 } catch (Exception e) {
590 log.error("Exception processing subscriber", e);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000591 }
592 });
Tunahan Sezena07fe962021-02-24 08:24:24 +0000593
Andrea Campanella61650a12022-01-24 18:09:44 -0800594 try {
595 TimeUnit.MILLISECONDS.sleep(requeueDelay);
596 } catch (InterruptedException e) {
597 log.debug("Interrupted while waiting to requeue", e);
598 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000599 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000600 }
601
Andrea Campanella61650a12022-01-24 18:09:44 -0800602
Tunahan Sezena07fe962021-02-24 08:24:24 +0000603 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000604 * Checks the subscriber uni tag list and find the uni tag information.
605 * using the pon c tag, pon s tag and the technology profile id
606 * May return Optional<null>
607 *
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700608 * @param portName port of the subscriber
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000609 * @param innerVlan pon c tag
610 * @param outerVlan pon s tag
611 * @param tpId the technology profile id
612 * @return the found uni tag information
613 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700614 private UniTagInformation getUniTagInformation(String portName, VlanId innerVlan,
615 VlanId outerVlan, int tpId) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000616 log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700617 portName, innerVlan, outerVlan, tpId);
618 SubscriberAndDeviceInformation subInfo = subsService.get(portName);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000619 if (subInfo == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700620 log.warn("Subscriber information doesn't exist for {}", portName);
621 return null;
Gamze Abakaf59c0912019-04-19 08:24:28 +0000622 }
623
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000624 List<UniTagInformation> uniTagList = subInfo.uniTagList();
625 if (uniTagList == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700626 log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), portName);
627 return null;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000628 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100629
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000630 UniTagInformation service = null;
631 for (UniTagInformation tagInfo : subInfo.uniTagList()) {
632 if (innerVlan.equals(tagInfo.getPonCTag()) && outerVlan.equals(tagInfo.getPonSTag())
633 && tpId == tagInfo.getTechnologyProfileId()) {
634 service = tagInfo;
635 break;
Andy Bavier160e8682019-05-07 18:32:22 -0700636 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000637 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000638
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000639 if (service == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700640 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700641 innerVlan, outerVlan, tpId, portName);
Saurav Das82b8e6d2018-10-04 15:25:12 -0700642 return null;
643 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100644
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700645 return service;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100646 }
647
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700648 protected void bindSadisService(SadisService service) {
649 sadisService = service;
650 subsService = sadisService.getSubscriberInfoService();
651
652 log.info("Sadis-service binds to onos.");
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100653 }
654
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700655 protected void unbindSadisService(SadisService service) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700656 deviceListener = null;
657 sadisService = null;
658 subsService = null;
659 log.info("Sadis-service unbinds from onos.");
Jonathan Hart1d34c8b2018-05-05 15:37:28 -0700660 }
661
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700662 protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700663 try {
Andrea Campanella61650a12022-01-24 18:09:44 -0800664 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800665
Andrea Campanella61650a12022-01-24 18:09:44 -0800666 try {
667 queueWriteLock.lock();
668 eventsQueues.compute(cp, (subcp, queue) -> {
669 queue = queue == null ? new LinkedBlockingQueue<>() : queue;
670 log.info("Adding subscriber {} to queue: {} with existing {}",
671 sub, portWithName(sub.port), queue);
672 queue.add(sub);
673 return queue;
674 });
675 } catch (UnsupportedOperationException | ClassCastException |
676 NullPointerException | IllegalArgumentException e) {
677 log.error("Cannot add subscriber {} to queue: {}", portWithName(sub.port), e.getMessage());
678 } finally {
679 queueWriteLock.unlock();
680 }
681 } catch (Exception e) {
682 log.error("Can't add {} to queue", sub, e);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800683 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800684 }
685
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700686 protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
687 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700688 try {
689 queueWriteLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800690 eventsQueues.compute(cp, (subcp, queue) -> {
691 if (log.isTraceEnabled()) {
692 log.trace("Removing subscriber {} from queue : {} " +
693 "with existing {}", sub,
694 portWithName(sub.port), queue);
695 }
696 if (queue == null) {
697 log.warn("Cannot find queue for connectPoint {}", cp);
698 return queue;
699 }
700 boolean removed = queue.remove(sub);
701 if (!removed) {
702 log.warn("Subscriber {} has not been removed from queue, " +
703 "is it still there? {}", sub, queue);
704 return queue;
705 } else {
706 log.debug("Subscriber {} has been removed from the queue {}",
707 sub, queue);
708 }
709
710 return queue;
711 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700712 } catch (UnsupportedOperationException | ClassCastException |
713 NullPointerException | IllegalArgumentException e) {
714 log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
715 } finally {
716 queueWriteLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000717 }
718 }
719
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700720 protected class OltDeviceListener
721 implements DeviceListener {
722
723 private final Logger log = LoggerFactory.getLogger(getClass());
724 protected ExecutorService eventExecutor;
725
/**
 * Creates the listener together with the fixed-size thread pool
 * on which the received device events are processed.
 */
public OltDeviceListener() {
    eventExecutor = Executors.newFixedThreadPool(
            flowProcessingThreads,
            groupedThreads("onos/olt-device-listener-event", "event-%d", log));
}
733
734 public void deactivate() {
735 this.eventExecutor.shutdown();
736 }
Saurav Dasa9d5f442019-03-06 19:32:48 -0800737
alshabibf0e7e702015-05-30 18:22:36 -0700738 @Override
739 public void event(DeviceEvent event) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800740 if (log.isTraceEnabled()) {
741 log.trace("OltListener receives event {} for: {}/{}", event.type(), event.subject().id(),
742 event.port() != null ? event.port().number() : null);
743 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700744 eventExecutor.execute(() -> {
Andrea Campanella61650a12022-01-24 18:09:44 -0800745 if (log.isTraceEnabled()) {
746 log.trace("OltListener Executor receives event {} for: {}/{}",
747 event.type(), event.subject().id(),
748 event.port() != null ? event.port().number() : null);
749 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700750 boolean isOlt = oltDeviceService.isOlt(event.subject());
751 DeviceId deviceId = event.subject().id();
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700752 switch (event.type()) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700753 case PORT_STATS_UPDATED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700754 case DEVICE_ADDED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700755 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700756 case PORT_ADDED:
757 case PORT_UPDATED:
758 case PORT_REMOVED:
759 if (!isOlt) {
760 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
761 return;
762 }
763 if (!oltDeviceService.isLocalLeader(deviceId)) {
764 log.trace("Device {} is not local to this node", deviceId);
765 return;
766 }
767 // port added, updated and removed are treated in the same way as we only care whether the port
768 // is enabled or not
769 handleOltPort(event.type(), event.subject(), event.port());
770 return;
771 case DEVICE_AVAILABILITY_CHANGED:
772 if (!isOlt) {
773 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
774 return;
775 }
776 if (deviceService.isAvailable(deviceId)) {
777 if (!oltDeviceService.isLocalLeader(deviceId)) {
778 if (log.isTraceEnabled()) {
779 log.trace("Device {} is not local to this node, not handling available device",
780 deviceId);
781 }
782 } else {
783 log.info("Handling available device: {}", deviceId);
784 handleExistingPorts();
785 }
786 } else if (!deviceService.isAvailable(deviceId) && deviceService.getPorts(deviceId).isEmpty()) {
787 // NOTE that upon disconnection there is no mastership on the device,
788 // and we should anyway clear the local cache of the flows/meters across instances.
789 // We're only clearing the device if there are no available ports,
790 // otherwise we assume it's a temporary disconnection
791 log.info("Device {} availability changed to false and ports are empty, " +
792 "purging meters and flows", deviceId);
793 //NOTE all the instances will call these methods
794 oltFlowService.purgeDeviceFlows(deviceId);
795 oltMeterService.purgeDeviceMeters(deviceId);
796 clearQueueForDevice(deviceId);
797 } else {
798 log.info("Device {} availability changed to false, but ports are still available, " +
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800799 "assuming temporary disconnection.",
800 deviceId);
801 if (log.isTraceEnabled()) {
802 log.trace("Available ports: {}", deviceService.getPorts(deviceId));
803 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700804 }
805 return;
806 case DEVICE_REMOVED:
807 if (!isOlt) {
808 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
809 return;
810 }
811 log.info("Device {} Removed, purging meters and flows", deviceId);
812 oltFlowService.purgeDeviceFlows(deviceId);
813 oltMeterService.purgeDeviceMeters(deviceId);
814 clearQueueForDevice(deviceId);
815 return;
816 default:
817 if (log.isTraceEnabled()) {
818 log.trace("Not handling event: {}, ", event);
819 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700820 }
821 });
alshabibf0e7e702015-05-30 18:22:36 -0700822 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000823
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700824 protected void clearQueueForDevice(DeviceId devId) {
825 try {
826 queueWriteLock.lock();
827 Iterator<Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>> iter =
828 eventsQueues.entrySet().iterator();
829 while (iter.hasNext()) {
830 Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
831 if (entry.getKey().deviceId().equals(devId)) {
832 eventsQueues.remove(entry.getKey());
833 }
834 }
835 } finally {
836 queueWriteLock.unlock();
Gamze Abaka838d8142019-02-21 07:06:55 +0000837 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000838 }
839
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700840 protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
841 log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
842 portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
Andrea Campanella61650a12022-01-24 18:09:44 -0800843 boolean isNni = oltDeviceService.isNniPort(device, port.number());
844
845 if (!isNni && type == DeviceEvent.Type.PORT_ADDED) {
846 log.debug("Ignoring PORT_ADD on UNI port {}", portWithName(port));
847 return;
848 }
849
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700850 if (port.isEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800851 if (isNni) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700852 OltFlowService.FlowOperation action = OltFlowService.FlowOperation.ADD;
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800853 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
854 // In that case the flows are purged anyway, so there's no need to deal with them,
855 // it would actually be counter-productive as the openflow connection is severed and they won't
856 // be correctly processed
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700857 if (type == DeviceEvent.Type.PORT_REMOVED) {
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800858 log.debug("NNI port went down, " +
859 "ignoring event as flows will be removed in the generic device cleanup");
860 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700861 }
862 oltFlowService.handleNniFlows(device, port, action);
863 } else {
864 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
865 // NOTE if the subscriber was previously provisioned,
866 // then add it back to the queue to be re-provisioned
867 boolean provisionSubscriber = oltFlowService.
868 isSubscriberServiceProvisioned(new AccessDevicePort(port));
869 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
870 if (si == null) {
871 //NOTE this should not happen given that the subscriber was provisioned before
872 log.error("Subscriber information not found in sadis for port {}",
873 portWithName(port));
874 return;
875 }
876
877 DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
878 if (type == DeviceEvent.Type.PORT_REMOVED) {
879 status = DiscoveredSubscriber.Status.REMOVED;
880 }
881
882 DiscoveredSubscriber sub =
883 new DiscoveredSubscriber(device, port,
884 status, provisionSubscriber, si);
885 addSubscriberToQueue(sub);
886 }
887 } else {
Andrea Campanella61650a12022-01-24 18:09:44 -0800888 if (isNni) {
Matteo Scandolob6981dc2021-12-02 16:31:44 -0800889 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
890 // In that case the flows are purged anyway, so there's no need to deal with them,
891 // it would actually be counter-productive as the openflow connection is severed and they won't
892 // be correctly processed
893 log.debug("NNI port went down, " +
894 "ignoring event as flows will be removed in the generic device cleanup");
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700895 } else {
896 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
897 // NOTE we are assuming that if a subscriber has default eapol
898 // it does not have subscriber flows
899 if (oltFlowService.hasDefaultEapol(port)) {
900 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
901 if (si == null) {
902 //NOTE this should not happen given that the subscriber was provisioned before
903 log.error("Subscriber information not found in sadis for port {}",
904 portWithName(port));
905 return;
906 }
907 DiscoveredSubscriber sub =
908 new DiscoveredSubscriber(device, port,
909 DiscoveredSubscriber.Status.REMOVED, false, si);
910
911 addSubscriberToQueue(sub);
912
913 } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
914 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
915 if (si == null) {
916 //NOTE this should not happen given that the subscriber was provisioned before
917 log.error("Subscriber information not found in sadis for port {}",
918 portWithName(port));
919 return;
920 }
921 DiscoveredSubscriber sub =
922 new DiscoveredSubscriber(device, port,
923 DiscoveredSubscriber.Status.REMOVED, true, si);
924 addSubscriberToQueue(sub);
925 }
926 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000927 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000928 }
Gamze Abakada282b42019-03-11 13:16:48 +0000929
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700930 /**
931 * This method is invoked on app activation in order to deal
932 * with devices and ports that are already existing in the system
933 * and thus won't trigger an event.
934 * It is also needed on instance reboot and device reconnect
935 */
936 protected void handleExistingPorts() {
937 Iterable<DeviceId> devices = getConnectedOlts();
938 for (DeviceId deviceId : devices) {
939 log.info("Handling existing OLT Ports for device {}", deviceId);
940 if (oltDeviceService.isLocalLeader(deviceId)) {
941 List<Port> ports = deviceService.getPorts(deviceId);
942 for (Port p : ports) {
943 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
944 continue;
945 }
946 Device device = deviceService.getDevice(deviceId);
947 deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, device, p);
948 }
949 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800950 }
951 }
952 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800953
954 protected final class ThreadPoolQueue extends ArrayBlockingQueue<Runnable> {
955
956 public ThreadPoolQueue(int capacity) {
957 super(capacity);
958 }
959
960 @Override
961 public boolean offer(Runnable runnable) {
962 if (runnable == null) {
963 return false;
964 }
965 try {
966 put(runnable);
967 } catch (InterruptedException e1) {
968 Thread.currentThread().interrupt();
969 return false;
970 }
971 return true;
972 }
973
974 }
Hardik Windlass395ff372019-06-13 05:16:00 +0000975}