blob: ace976b35a115de9ea59d0a6bb8cc4dce14c7e08 [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07002 * Copyright 2021-present Open Networking Foundation
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
alshabibf0e7e702015-05-30 18:22:36 -070018import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080019import org.onlab.util.KryoNamespace;
Gamze Abaka1f62dd92020-05-07 08:58:13 +000020import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080021import org.onosproject.cluster.ClusterService;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020022import org.onosproject.cluster.LeadershipService;
alshabibf0e7e702015-05-30 18:22:36 -070023import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080025import org.onosproject.event.AbstractListenerManager;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020026import org.onosproject.mastership.MastershipService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010027import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070028import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010029import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070030import org.onosproject.net.DeviceId;
alshabibdec2e252016-01-15 12:20:25 -080031import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070032import org.onosproject.net.PortNumber;
33import org.onosproject.net.device.DeviceEvent;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080036import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080037import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070039import org.opencord.olt.AccessDeviceEvent;
40import org.opencord.olt.AccessDeviceListener;
41import org.opencord.olt.AccessDeviceService;
Gamze Abaka641fc072018-09-04 09:16:27 +000042import org.opencord.sadis.BaseInformationService;
43import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010044import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000045import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080046import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070047import org.osgi.service.component.annotations.Activate;
48import org.osgi.service.component.annotations.Component;
49import org.osgi.service.component.annotations.Deactivate;
50import org.osgi.service.component.annotations.Modified;
51import org.osgi.service.component.annotations.Reference;
52import org.osgi.service.component.annotations.ReferenceCardinality;
Ilayda Ozdemir90a93622021-02-25 09:40:58 +000053import org.osgi.service.component.annotations.ReferencePolicy;
alshabibf0e7e702015-05-30 18:22:36 -070054import org.slf4j.Logger;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070055import org.slf4j.LoggerFactory;
alshabibf0e7e702015-05-30 18:22:36 -070056
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010057import java.util.ArrayList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010058import java.util.Dictionary;
Andrea Campanella61650a12022-01-24 18:09:44 -080059import java.util.HashSet;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070060import java.util.Iterator;
61import java.util.LinkedList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010062import java.util.List;
63import java.util.Map;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010064import java.util.Properties;
65import java.util.Set;
Andrea Campanella61650a12022-01-24 18:09:44 -080066import java.util.concurrent.ArrayBlockingQueue;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010067import java.util.concurrent.ExecutorService;
68import java.util.concurrent.Executors;
69import java.util.concurrent.LinkedBlockingQueue;
70import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella61650a12022-01-24 18:09:44 -080071import java.util.concurrent.ThreadPoolExecutor;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010072import java.util.concurrent.TimeUnit;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070073import java.util.concurrent.locks.Lock;
74import java.util.concurrent.locks.ReentrantReadWriteLock;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010075
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010076import static com.google.common.base.Strings.isNullOrEmpty;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010077import static org.onlab.util.Tools.get;
78import static org.onlab.util.Tools.groupedThreads;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070079import static org.opencord.olt.impl.OltUtils.getPortName;
80import static org.opencord.olt.impl.OltUtils.portWithName;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010081import static org.opencord.olt.impl.OsgiPropertyConstants.*;
alshabibf0e7e702015-05-30 18:22:36 -070082
83/**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070084 * OLT Application.
alshabibf0e7e702015-05-30 18:22:36 -070085 */
Carmelo Casconeca931162019-07-15 18:22:24 -070086@Component(immediate = true,
87 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -070088 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +000089 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070090 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
Andrea Campanella61650a12022-01-24 18:09:44 -080091 FLOW_EXECUTOR_QUEUE_SIZE + ":Integer=" + FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070092 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
93 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
Carmelo Casconeca931162019-07-15 18:22:24 -070094 })
alshabib8e4fd2f2016-01-12 15:55:53 -080095public class Olt
96 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
97 implements AccessDeviceService {
alshabibf0e7e702015-05-30 18:22:36 -070098
Carmelo Casconeca931162019-07-15 18:22:24 -070099 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700100 protected DeviceService deviceService;
101
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected ComponentConfigService cfgService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100104
Carmelo Casconeca931162019-07-15 18:22:24 -0700105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700106 protected MastershipService mastershipService;
alshabibf0e7e702015-05-30 18:22:36 -0700107
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700109 protected ClusterService clusterService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected LeadershipService leadershipService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100113
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000114 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
115 bind = "bindSadisService",
116 unbind = "unbindSadisService",
117 policy = ReferencePolicy.DYNAMIC)
118 protected volatile SadisService sadisService;
Gamze Abaka641fc072018-09-04 09:16:27 +0000119
Carmelo Casconeca931162019-07-15 18:22:24 -0700120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700121 protected OltDeviceServiceInterface oltDeviceService;
alshabibe0559672016-02-21 14:49:51 -0800122
Carmelo Casconeca931162019-07-15 18:22:24 -0700123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700124 protected OltFlowServiceInterface oltFlowService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected OltMeterServiceInterface oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000128
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
130 protected StorageService storageService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700133 protected CoreService coreService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800134
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700135 protected ApplicationId appId;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200136
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700137 private static final String ONOS_OLT_SERVICE = "onos/olt-service";
Tunahan Sezena07fe962021-02-24 08:24:24 +0000138
Carmelo Casconeca931162019-07-15 18:22:24 -0700139 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800140 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700141 **/
142 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000143
Carmelo Casconeca931162019-07-15 18:22:24 -0700144 /**
Gamze Abaka51a34e82020-05-08 13:03:14 +0000145 * Default multicast service name.
Carmelo Casconeca931162019-07-15 18:22:24 -0700146 **/
Gamze Abaka51a34e82020-05-08 13:03:14 +0000147 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Gamze Abaka33feef52019-02-27 08:16:47 +0000148
Saurav Das2d3777a2020-08-07 18:48:51 -0700149 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700150 * Number of threads used to process flows.
Saurav Das2d3777a2020-08-07 18:48:51 -0700151 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700152 protected int flowProcessingThreads = FLOW_PROCESSING_THREADS_DEFAULT;
Saurav Das2d3777a2020-08-07 18:48:51 -0700153
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700154 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700155 * Number of threads used to process flows.
156 **/
Andrea Campanella61650a12022-01-24 18:09:44 -0800157 protected int flowExecutorQueueSize = FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT;
158
159 /**
160 * Number of threads used to process flows.
161 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700162 protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
163
164 /**
165 * Delay in ms to put an event back in the queue, used to avoid retrying things to often if conditions are not met.
166 **/
167 protected int requeueDelay = REQUEUE_DELAY_DEFAULT;
168
169 private final Logger log = LoggerFactory.getLogger(getClass());
170
171 /**
172 * A queue to asynchronously process events.
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700173 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700174 protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
alshabibf0e7e702015-05-30 18:22:36 -0700175
Gamze Abaka641fc072018-09-04 09:16:27 +0000176 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
alshabibf0e7e702015-05-30 18:22:36 -0700177
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700178 /**
179 * Listener for OLT devices events.
180 */
181 protected OltDeviceListener deviceListener = new OltDeviceListener();
182 protected ScheduledExecutorService discoveredSubscriberExecutor =
183 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
184 "discovered-cp-%d", log));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100185
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700186 protected ScheduledExecutorService queueExecutor =
187 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
188 "discovered-cp-restore-%d", log));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700189
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700190 /**
191 * Executor used to defer flow provisioning to a different thread pool.
192 */
193 protected ExecutorService flowsExecutor;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800194
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700195 /**
196 * Executor used to defer subscriber handling from API call to a different thread pool.
197 */
198 protected ExecutorService subscriberExecutor;
199
200 private static final String APP_NAME = "org.opencord.olt";
201
202 private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
203 private final Lock queueWriteLock = queueLock.writeLock();
204 private final Lock queueReadLock = queueLock.readLock();
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700205
alshabibf0e7e702015-05-30 18:22:36 -0700206 @Activate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700207 protected void activate(ComponentContext context) {
208 cfgService.registerProperties(getClass());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700209
alshabibe0559672016-02-21 14:49:51 -0800210 modified(context);
Saurav Das62ad75e2019-03-05 12:22:22 -0800211
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700212 appId = coreService.registerApplication(APP_NAME);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800213 KryoNamespace serializer = KryoNamespace.newBuilder()
214 .register(KryoNamespaces.API)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700215 .register(ConnectPoint.class)
216 .register(DiscoveredSubscriber.class)
217 .register(DiscoveredSubscriber.Status.class)
218 .register(SubscriberAndDeviceInformation.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800219 .register(UniTagInformation.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000220 .register(LinkedBlockingQueue.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800221 .build();
222
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700223 eventsQueues = storageService.<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder()
224 .withName("volt-subscriber-queues")
225 .withApplicationId(appId)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800226 .withSerializer(Serializer.using(serializer))
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700227 .build().asJavaMap();
alshabib4ceaed32016-03-03 18:00:58 -0800228
alshabibba357492016-01-27 13:49:46 -0800229 deviceService.addListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700230
231 discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
232 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
233 log.info("Started");
234
235 deviceListener.handleExistingPorts();
alshabibf0e7e702015-05-30 18:22:36 -0700236 }
237
238 @Deactivate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700239 protected void deactivate(ComponentContext context) {
240 cfgService.unregisterProperties(getClass(), false);
241 discoveredSubscriberExecutor.shutdown();
Andrea Campanellaeaf23952021-12-30 15:58:54 +0100242 deviceService.removeListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700243 flowsExecutor.shutdown();
244 subscriberExecutor.shutdown();
245 deviceListener.deactivate();
alshabibf0e7e702015-05-30 18:22:36 -0700246 log.info("Stopped");
247 }
248
alshabibe0559672016-02-21 14:49:51 -0800249 @Modified
250 public void modified(ComponentContext context) {
251 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700252 if (context != null) {
Andrea Campanella971d5b92020-05-07 11:20:43 +0200253 String bpId = get(properties, DEFAULT_BP_ID);
254 defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
Gamze Abakaad329652018-12-20 10:12:21 +0000255
Andrea Campanella971d5b92020-05-07 11:20:43 +0200256 String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
257 multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000258
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700259 String flowThreads = get(properties, FLOW_PROCESSING_THREADS);
260 int oldFlowProcessingThreads = flowProcessingThreads;
261 flowProcessingThreads = isNullOrEmpty(flowThreads) ?
262 oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
Saurav Das2d3777a2020-08-07 18:48:51 -0700263
Andrea Campanella61650a12022-01-24 18:09:44 -0800264 String executorQueueSize = get(properties, FLOW_EXECUTOR_QUEUE_SIZE);
265 int oldExecutorQueueSize = flowExecutorQueueSize;
266 flowExecutorQueueSize = isNullOrEmpty(executorQueueSize) ?
267 oldExecutorQueueSize : Integer.parseInt(executorQueueSize.trim());
268
269 if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads
270 || oldExecutorQueueSize != flowExecutorQueueSize) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700271 if (flowsExecutor != null) {
272 flowsExecutor.shutdown();
273 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800274
275 flowsExecutor = new ThreadPoolExecutor(0, flowProcessingThreads, 30,
276 TimeUnit.SECONDS, new ThreadPoolQueue(flowExecutorQueueSize),
277 new ThreadPoolExecutor.DiscardPolicy());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700278 }
Gamze Abaka33feef52019-02-27 08:16:47 +0000279
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700280 String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
281 int oldSubscriberProcessingThreads = subscriberProcessingThreads;
282 subscriberProcessingThreads = isNullOrEmpty(subscriberThreads) ?
283 oldSubscriberProcessingThreads : Integer.parseInt(subscriberThreads.trim());
284
285 if (subscriberExecutor == null || oldSubscriberProcessingThreads != subscriberProcessingThreads) {
286 if (subscriberExecutor != null) {
287 subscriberExecutor.shutdown();
288 }
289 subscriberExecutor = Executors.newFixedThreadPool(subscriberProcessingThreads,
290 groupedThreads(ONOS_OLT_SERVICE,
291 "subscriber-installer-%d"));
292 }
293
294 String queueDelay = get(properties, REQUEUE_DELAY);
295 requeueDelay = isNullOrEmpty(queueDelay) ?
296 REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
alshabibe0559672016-02-21 14:49:51 -0800297 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800298 log.info("Modified. Values = {}: {}, {}:{}, {}:{}," +
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700299 "{}:{}, {}:{}, {}:{}",
300 DEFAULT_BP_ID, defaultBpId,
301 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
302 FLOW_PROCESSING_THREADS, flowProcessingThreads,
Andrea Campanella61650a12022-01-24 18:09:44 -0800303 FLOW_EXECUTOR_QUEUE_SIZE, flowExecutorQueueSize,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700304 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
305 REQUEUE_DELAY, requeueDelay);
alshabibe0559672016-02-21 14:49:51 -0800306 }
307
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000308
alshabib32232c82016-02-25 17:57:24 -0500309 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700310 public boolean provisionSubscriber(ConnectPoint cp) {
311 subscriberExecutor.submit(() -> {
312 Device device = deviceService.getDevice(cp.deviceId());
313 Port port = deviceService.getPort(device.id(), cp.port());
314 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Hardik Windlass395ff372019-06-13 05:16:00 +0000315
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700316 if (oltDeviceService.isNniPort(device, port.number())) {
317 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
318 return false;
Saurav Das026650f2020-09-21 18:56:35 -0700319 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700320
321 log.info("Provisioning subscriber on {}", accessDevicePort);
322
323 if (oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
324 log.error("Subscriber on {} is already provisioned", accessDevicePort);
325 return false;
326 }
327
328 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
329 if (si == null) {
330 log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
331 return false;
332 }
333 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
334 DiscoveredSubscriber.Status.ADDED, true, si);
335
336 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
337 // regardless of the flow status
338 si.uniTagList().forEach(uti -> {
339 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
340 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
341 });
342
343 addSubscriberToQueue(sub);
344 return true;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100345 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700346 //NOTE this only means we have taken the request in, nothing more.
Amit Ghosh31939522018-08-16 13:28:21 +0100347 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800348 }
349
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000350 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700351 public boolean removeSubscriber(ConnectPoint cp) {
352 subscriberExecutor.submit(() -> {
353 Device device = deviceService.getDevice(DeviceId.deviceId(cp.deviceId().toString()));
354 Port port = deviceService.getPort(device.id(), cp.port());
355 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000356
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700357 if (oltDeviceService.isNniPort(device, port.number())) {
358 log.warn("will not un-provision a subscriber on the NNI {}",
359 accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100360 return false;
361 }
362
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700363 log.info("Un-provisioning subscriber on {}", accessDevicePort);
364
365 if (!oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
366 log.error("Subscriber on {} is not provisioned", accessDevicePort);
367 return false;
368 }
369
370 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
371 if (si == null) {
372 log.error("Subscriber information not found in sadis for port {}",
373 accessDevicePort);
374 // NOTE that we are returning true so that the subscriber is removed from the queue
375 // and we can move on provisioning others
376 return false;
377 }
378 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
379 DiscoveredSubscriber.Status.REMOVED, true, si);
380
381 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
382 // regardless of the flow status
383 si.uniTagList().forEach(uti -> {
384 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
385 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000386 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700387
388 addSubscriberToQueue(sub);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100389 return true;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700390 });
391 //NOTE this only means we have taken the request in, nothing more.
392 return true;
393 }
394
395 @Override
396 public boolean provisionSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
397 log.debug("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}",
398 cp, cTag, sTag, tpId);
399 Device device = deviceService.getDevice(cp.deviceId());
400 Port port = deviceService.getPort(device.id(), cp.port());
401 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
402
403 if (oltDeviceService.isNniPort(device, port.number())) {
404 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100405 return false;
406 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100407
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700408 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
409 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
410 if (specificService == null) {
411 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
412 "stag {}, tpId {}", cp, cTag, sTag, tpId);
Amit Ghosh31939522018-08-16 13:28:21 +0100413 return false;
414 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700415 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
416 uniTagInformationList.add(specificService);
417 si.setUniTagList(uniTagInformationList);
418 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
419 DiscoveredSubscriber.Status.ADDED, true, si);
Amit Ghosh31939522018-08-16 13:28:21 +0100420
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700421 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
422 // regardless of the flow status
423 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
424 if (oltFlowService.isSubscriberServiceProvisioned(sk)) {
425 log.error("Subscriber on {} is already provisioned", sk);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100426 return false;
427 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700428 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
429
430 addSubscriberToQueue(sub);
431 return true;
Amit Ghosh31939522018-08-16 13:28:21 +0100432 }
433
434 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700435 public boolean removeSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
436 log.debug("Un-provisioning subscriber on {} with cTag {}, stag {}, tpId {}",
437 cp, cTag, sTag, tpId);
438 Device device = deviceService.getDevice(cp.deviceId());
439 Port port = deviceService.getPort(device.id(), cp.port());
440 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
441
442 if (oltDeviceService.isNniPort(device, port.number())) {
443 log.warn("will not un-provision a subscriber on the NNI {}",
444 accessDevicePort);
445 return false;
446 }
447
448 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
449 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
450 if (specificService == null) {
451 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
452 "stag {}, tpId {}", cp, cTag, sTag, tpId);
453 return false;
454 }
455 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
456 uniTagInformationList.add(specificService);
457 si.setUniTagList(uniTagInformationList);
458 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
Andrea Campanella982fd332022-01-19 09:14:12 +0100459 DiscoveredSubscriber.Status.REMOVED, true, si);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700460
461 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
462 // regardless of the flow status
463 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
464 if (!oltFlowService.isSubscriberServiceProvisioned(sk)) {
465 log.error("Subscriber on {} is not provisioned", sk);
466 return false;
467 }
468 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
469
470 addSubscriberToQueue(sub);
471 return true;
Saurav Das82b8e6d2018-10-04 15:25:12 -0700472 }
473
474 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700475 public List<DeviceId> getConnectedOlts() {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100476 List<DeviceId> olts = new ArrayList<>();
477 Iterable<Device> devices = deviceService.getDevices();
478 for (Device d : devices) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700479 if (oltDeviceService.isOlt(d)) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700480 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100481 olts.add(d.id());
482 }
483 }
484 return olts;
alshabibe0559672016-02-21 14:49:51 -0800485 }
486
Amit Ghosh31939522018-08-16 13:28:21 +0100487 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700488 * Finds the connect-point to which a subscriber is connected.
Amit Ghosh31939522018-08-16 13:28:21 +0100489 *
490 * @param id The id of the subscriber, this is the same ID as in Sadis
491 * @return Subscribers ConnectPoint if found else null
492 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700493 @Override
494 public ConnectPoint findSubscriberConnectPoint(String id) {
Amit Ghosh31939522018-08-16 13:28:21 +0100495
496 Iterable<Device> devices = deviceService.getDevices();
497 for (Device d : devices) {
498 for (Port p : deviceService.getPorts(d.id())) {
499 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
500 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700501 log.debug("Found on {}", portWithName(p));
Amit Ghosh31939522018-08-16 13:28:21 +0100502 return new ConnectPoint(d.id(), p.number());
503 }
504 }
505 }
506 return null;
507 }
508
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700509 protected void processDiscoveredSubscribers() {
Gamze Abaka641fc072018-09-04 09:16:27 +0000510
Andrea Campanella61650a12022-01-24 18:09:44 -0800511 log.info("Started processDiscoveredSubscribers loop");
512 while (true) {
513 Set<ConnectPoint> discoveredCps;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700514 try {
515 queueReadLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800516 discoveredCps = new HashSet<>(eventsQueues.keySet());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700517 } catch (Exception e) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800518 log.error("Cannot read keys from queue map", e);
519 continue;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700520 } finally {
521 queueReadLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000522 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000523
Andrea Campanella61650a12022-01-24 18:09:44 -0800524 discoveredCps.forEach(cp -> {
525 LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
526
527 try {
528 queueReadLock.lock();
529 eventsQueue = eventsQueues.get(cp);
530 } catch (Exception e) {
531 log.error("Cannot get key from queue map", e);
532 return;
533 } finally {
534 queueReadLock.unlock();
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100535 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000536
Andrea Campanella61650a12022-01-24 18:09:44 -0800537 if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
538 // if we're not local leader for this device, ignore this queue
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700539 if (log.isTraceEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800540 log.trace("Ignoring queue on CP {} as not master of the device", cp);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700541 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800542 return;
543 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700544
Andrea Campanella61650a12022-01-24 18:09:44 -0800545 try {
546 flowsExecutor.execute(() -> {
547 if (!eventsQueue.isEmpty()) {
548 // we do not remove the event from the queue until it has been processed
549 // in that way we guarantee that events are processed in order
550 DiscoveredSubscriber sub = eventsQueue.peek();
551 if (sub == null) {
552 // the queue is empty
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700553 return;
554 }
555
Andrea Campanella61650a12022-01-24 18:09:44 -0800556 if (log.isTraceEnabled()) {
Andrea Campanellaccb32862022-02-17 16:29:10 +0100557 log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
Andrea Campanella61650a12022-01-24 18:09:44 -0800558 sub, portWithName(sub.port), sub.status, sub.hasSubscriber);
559 }
560
561 if (sub.hasSubscriber) {
562 // this is a provision subscriber call
563 if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
564 removeSubscriberFromQueue(sub);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700565 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800566 } else {
567 // this is a port event (ENABLED/DISABLED)
568 // means no subscriber was provisioned on that port
569
570 if (!deviceService.isAvailable(sub.device.id()) ||
571 deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
572 // If the device is not connected or the port is not available do nothing
573 // This can happen when we disable and then immediately delete the device,
574 // the queue is populated but the meters and flows are already gone
575 // thus there is nothing left to do
576 return;
577 }
578
579 if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
580 if (log.isTraceEnabled()) {
581 log.trace("Processing of port {} completed",
582 portWithName(sub.port));
583 }
584 removeSubscriberFromQueue(sub);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100585 } else {
586 log.debug("Not handling basic port flows " +
587 "for {}, leaving in the queue",
588 portWithName(sub.port));
Andrea Campanella61650a12022-01-24 18:09:44 -0800589 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700590 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000591 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800592 });
593 } catch (Exception e) {
594 log.error("Exception processing subscriber", e);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000595 }
596 });
Tunahan Sezena07fe962021-02-24 08:24:24 +0000597
Andrea Campanella61650a12022-01-24 18:09:44 -0800598 try {
599 TimeUnit.MILLISECONDS.sleep(requeueDelay);
600 } catch (InterruptedException e) {
601 log.debug("Interrupted while waiting to requeue", e);
602 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000603 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000604 }
605
Andrea Campanella61650a12022-01-24 18:09:44 -0800606
Tunahan Sezena07fe962021-02-24 08:24:24 +0000607 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000608 * Checks the subscriber uni tag list and find the uni tag information.
609 * using the pon c tag, pon s tag and the technology profile id
610 * May return Optional<null>
611 *
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700612 * @param portName port of the subscriber
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000613 * @param innerVlan pon c tag
614 * @param outerVlan pon s tag
615 * @param tpId the technology profile id
616 * @return the found uni tag information
617 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700618 private UniTagInformation getUniTagInformation(String portName, VlanId innerVlan,
619 VlanId outerVlan, int tpId) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000620 log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700621 portName, innerVlan, outerVlan, tpId);
622 SubscriberAndDeviceInformation subInfo = subsService.get(portName);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000623 if (subInfo == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700624 log.warn("Subscriber information doesn't exist for {}", portName);
625 return null;
Gamze Abakaf59c0912019-04-19 08:24:28 +0000626 }
627
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000628 List<UniTagInformation> uniTagList = subInfo.uniTagList();
629 if (uniTagList == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700630 log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), portName);
631 return null;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000632 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100633
Harsh Awasthic1e4bf52022-02-09 14:14:14 +0530634 UniTagInformation service = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
635
636 if (service == null) {
637 // Try again after invalidating cache for the particular port name.
638 subsService.invalidateId(portName);
639 subInfo = subsService.get(portName);
640 service = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000641 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000642
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000643 if (service == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700644 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700645 innerVlan, outerVlan, tpId, portName);
Saurav Das82b8e6d2018-10-04 15:25:12 -0700646 return null;
647 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100648
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700649 return service;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100650 }
651
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700652 protected void bindSadisService(SadisService service) {
653 sadisService = service;
654 subsService = sadisService.getSubscriberInfoService();
655
656 log.info("Sadis-service binds to onos.");
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100657 }
658
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700659 protected void unbindSadisService(SadisService service) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700660 deviceListener = null;
661 sadisService = null;
662 subsService = null;
663 log.info("Sadis-service unbinds from onos.");
Jonathan Hart1d34c8b2018-05-05 15:37:28 -0700664 }
665
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700666 protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700667 try {
Andrea Campanella61650a12022-01-24 18:09:44 -0800668 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800669
Andrea Campanella61650a12022-01-24 18:09:44 -0800670 try {
671 queueWriteLock.lock();
672 eventsQueues.compute(cp, (subcp, queue) -> {
673 queue = queue == null ? new LinkedBlockingQueue<>() : queue;
674 log.info("Adding subscriber {} to queue: {} with existing {}",
675 sub, portWithName(sub.port), queue);
676 queue.add(sub);
677 return queue;
678 });
679 } catch (UnsupportedOperationException | ClassCastException |
680 NullPointerException | IllegalArgumentException e) {
681 log.error("Cannot add subscriber {} to queue: {}", portWithName(sub.port), e.getMessage());
682 } finally {
683 queueWriteLock.unlock();
684 }
685 } catch (Exception e) {
686 log.error("Can't add {} to queue", sub, e);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800687 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800688 }
689
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700690 protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
691 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700692 try {
693 queueWriteLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800694 eventsQueues.compute(cp, (subcp, queue) -> {
695 if (log.isTraceEnabled()) {
696 log.trace("Removing subscriber {} from queue : {} " +
697 "with existing {}", sub,
698 portWithName(sub.port), queue);
699 }
700 if (queue == null) {
701 log.warn("Cannot find queue for connectPoint {}", cp);
702 return queue;
703 }
704 boolean removed = queue.remove(sub);
705 if (!removed) {
706 log.warn("Subscriber {} has not been removed from queue, " +
707 "is it still there? {}", sub, queue);
708 return queue;
709 } else {
710 log.debug("Subscriber {} has been removed from the queue {}",
711 sub, queue);
712 }
713
714 return queue;
715 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700716 } catch (UnsupportedOperationException | ClassCastException |
717 NullPointerException | IllegalArgumentException e) {
718 log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
719 } finally {
720 queueWriteLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000721 }
722 }
723
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700724 protected class OltDeviceListener
725 implements DeviceListener {
726
727 private final Logger log = LoggerFactory.getLogger(getClass());
728 protected ExecutorService eventExecutor;
729
730 /**
731 * Builds the listener with all the proper services and information needed.
732 */
733 public OltDeviceListener() {
734 this.eventExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
735 groupedThreads("onos/olt-device-listener-event", "event-%d", log));
736 }
737
738 public void deactivate() {
739 this.eventExecutor.shutdown();
740 }
Saurav Dasa9d5f442019-03-06 19:32:48 -0800741
alshabibf0e7e702015-05-30 18:22:36 -0700742 @Override
743 public void event(DeviceEvent event) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800744 if (log.isTraceEnabled()) {
745 log.trace("OltListener receives event {} for: {}/{}", event.type(), event.subject().id(),
746 event.port() != null ? event.port().number() : null);
747 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700748 eventExecutor.execute(() -> {
Andrea Campanella61650a12022-01-24 18:09:44 -0800749 if (log.isTraceEnabled()) {
750 log.trace("OltListener Executor receives event {} for: {}/{}",
751 event.type(), event.subject().id(),
752 event.port() != null ? event.port().number() : null);
753 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700754 boolean isOlt = oltDeviceService.isOlt(event.subject());
755 DeviceId deviceId = event.subject().id();
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700756 switch (event.type()) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700757 case PORT_STATS_UPDATED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700758 case DEVICE_ADDED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700759 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700760 case PORT_ADDED:
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700761 case PORT_REMOVED:
762 if (!isOlt) {
763 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
764 return;
765 }
766 if (!oltDeviceService.isLocalLeader(deviceId)) {
767 log.trace("Device {} is not local to this node", deviceId);
768 return;
769 }
770 // port added, updated and removed are treated in the same way as we only care whether the port
771 // is enabled or not
772 handleOltPort(event.type(), event.subject(), event.port());
773 return;
Andrea Campanellacc6dc7e2022-03-22 10:38:43 +0100774 case PORT_UPDATED:
775 if (!isOlt) {
776 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
777 return;
778 }
779 // port updated are handled only when the device is available, makes not sense otherwise.
780 // this also solves an issue with port events and device disconnection events handled
781 // in sparse order causing failures in the ofagent disconnect test
782 // (see https://jira.opencord.org/browse/VOL-4669)
783 if (!deviceService.isAvailable(deviceId)) {
784 log.debug("Ignoring port event {} on {} as it is disconnected", event, deviceId);
785 return;
786 }
787 if (!oltDeviceService.isLocalLeader(deviceId)) {
788 log.trace("Device {} is not local to this node", deviceId);
789 return;
790 }
791 handleOltPort(event.type(), event.subject(), event.port());
792 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700793 case DEVICE_AVAILABILITY_CHANGED:
794 if (!isOlt) {
795 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
796 return;
797 }
798 if (deviceService.isAvailable(deviceId)) {
799 if (!oltDeviceService.isLocalLeader(deviceId)) {
800 if (log.isTraceEnabled()) {
801 log.trace("Device {} is not local to this node, not handling available device",
802 deviceId);
803 }
804 } else {
805 log.info("Handling available device: {}", deviceId);
806 handleExistingPorts();
807 }
808 } else if (!deviceService.isAvailable(deviceId) && deviceService.getPorts(deviceId).isEmpty()) {
809 // NOTE that upon disconnection there is no mastership on the device,
810 // and we should anyway clear the local cache of the flows/meters across instances.
811 // We're only clearing the device if there are no available ports,
812 // otherwise we assume it's a temporary disconnection
813 log.info("Device {} availability changed to false and ports are empty, " +
814 "purging meters and flows", deviceId);
815 //NOTE all the instances will call these methods
816 oltFlowService.purgeDeviceFlows(deviceId);
817 oltMeterService.purgeDeviceMeters(deviceId);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100818 // cpStatus is a distributed map, thus only master will update it.
819 if (oltDeviceService.isLocalLeader(deviceId)) {
820 log.debug("Master, clearing cp status for {}", deviceId);
821 clearQueueForDevice(deviceId);
822 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700823 } else {
824 log.info("Device {} availability changed to false, but ports are still available, " +
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800825 "assuming temporary disconnection.",
826 deviceId);
827 if (log.isTraceEnabled()) {
828 log.trace("Available ports: {}", deviceService.getPorts(deviceId));
829 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700830 }
831 return;
832 case DEVICE_REMOVED:
833 if (!isOlt) {
834 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
835 return;
836 }
837 log.info("Device {} Removed, purging meters and flows", deviceId);
838 oltFlowService.purgeDeviceFlows(deviceId);
839 oltMeterService.purgeDeviceMeters(deviceId);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100840 if (oltDeviceService.isLocalLeader(deviceId)) {
841 log.debug("Master, clearing cp status for {}", deviceId);
842 clearQueueForDevice(deviceId);
843 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700844 return;
845 default:
846 if (log.isTraceEnabled()) {
847 log.trace("Not handling event: {}, ", event);
848 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700849 }
850 });
alshabibf0e7e702015-05-30 18:22:36 -0700851 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000852
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700853 protected void clearQueueForDevice(DeviceId devId) {
854 try {
855 queueWriteLock.lock();
856 Iterator<Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>> iter =
857 eventsQueues.entrySet().iterator();
858 while (iter.hasNext()) {
859 Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
860 if (entry.getKey().deviceId().equals(devId)) {
861 eventsQueues.remove(entry.getKey());
Andrea Campanellaccb32862022-02-17 16:29:10 +0100862 log.debug("Removing key from queue {}", entry.getKey());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700863 }
864 }
865 } finally {
866 queueWriteLock.unlock();
Gamze Abaka838d8142019-02-21 07:06:55 +0000867 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000868 }
869
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700870 protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
871 log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
872 portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
Andrea Campanella61650a12022-01-24 18:09:44 -0800873 boolean isNni = oltDeviceService.isNniPort(device, port.number());
874
875 if (!isNni && type == DeviceEvent.Type.PORT_ADDED) {
876 log.debug("Ignoring PORT_ADD on UNI port {}", portWithName(port));
877 return;
878 }
879
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700880 if (port.isEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800881 if (isNni) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700882 OltFlowService.FlowOperation action = OltFlowService.FlowOperation.ADD;
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800883 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
884 // In that case the flows are purged anyway, so there's no need to deal with them,
885 // it would actually be counter-productive as the openflow connection is severed and they won't
886 // be correctly processed
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700887 if (type == DeviceEvent.Type.PORT_REMOVED) {
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800888 log.debug("NNI port went down, " +
889 "ignoring event as flows will be removed in the generic device cleanup");
890 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700891 }
892 oltFlowService.handleNniFlows(device, port, action);
893 } else {
894 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
895 // NOTE if the subscriber was previously provisioned,
896 // then add it back to the queue to be re-provisioned
897 boolean provisionSubscriber = oltFlowService.
898 isSubscriberServiceProvisioned(new AccessDevicePort(port));
899 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
900 if (si == null) {
901 //NOTE this should not happen given that the subscriber was provisioned before
902 log.error("Subscriber information not found in sadis for port {}",
903 portWithName(port));
904 return;
905 }
906
907 DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
908 if (type == DeviceEvent.Type.PORT_REMOVED) {
909 status = DiscoveredSubscriber.Status.REMOVED;
910 }
911
912 DiscoveredSubscriber sub =
913 new DiscoveredSubscriber(device, port,
914 status, provisionSubscriber, si);
915 addSubscriberToQueue(sub);
916 }
917 } else {
Andrea Campanella61650a12022-01-24 18:09:44 -0800918 if (isNni) {
Matteo Scandolob6981dc2021-12-02 16:31:44 -0800919 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
920 // In that case the flows are purged anyway, so there's no need to deal with them,
921 // it would actually be counter-productive as the openflow connection is severed and they won't
922 // be correctly processed
923 log.debug("NNI port went down, " +
924 "ignoring event as flows will be removed in the generic device cleanup");
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700925 } else {
926 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
927 // NOTE we are assuming that if a subscriber has default eapol
928 // it does not have subscriber flows
929 if (oltFlowService.hasDefaultEapol(port)) {
930 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
931 if (si == null) {
932 //NOTE this should not happen given that the subscriber was provisioned before
933 log.error("Subscriber information not found in sadis for port {}",
934 portWithName(port));
935 return;
936 }
937 DiscoveredSubscriber sub =
938 new DiscoveredSubscriber(device, port,
939 DiscoveredSubscriber.Status.REMOVED, false, si);
940
941 addSubscriberToQueue(sub);
942
943 } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
944 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
945 if (si == null) {
946 //NOTE this should not happen given that the subscriber was provisioned before
947 log.error("Subscriber information not found in sadis for port {}",
948 portWithName(port));
949 return;
950 }
951 DiscoveredSubscriber sub =
952 new DiscoveredSubscriber(device, port,
953 DiscoveredSubscriber.Status.REMOVED, true, si);
954 addSubscriberToQueue(sub);
955 }
956 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000957 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000958 }
Gamze Abakada282b42019-03-11 13:16:48 +0000959
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700960 /**
961 * This method is invoked on app activation in order to deal
962 * with devices and ports that are already existing in the system
963 * and thus won't trigger an event.
964 * It is also needed on instance reboot and device reconnect
965 */
966 protected void handleExistingPorts() {
967 Iterable<DeviceId> devices = getConnectedOlts();
968 for (DeviceId deviceId : devices) {
969 log.info("Handling existing OLT Ports for device {}", deviceId);
970 if (oltDeviceService.isLocalLeader(deviceId)) {
971 List<Port> ports = deviceService.getPorts(deviceId);
972 for (Port p : ports) {
973 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
974 continue;
975 }
976 Device device = deviceService.getDevice(deviceId);
977 deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, device, p);
978 }
979 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800980 }
981 }
982 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800983
984 protected final class ThreadPoolQueue extends ArrayBlockingQueue<Runnable> {
985
986 public ThreadPoolQueue(int capacity) {
987 super(capacity);
988 }
989
990 @Override
991 public boolean offer(Runnable runnable) {
992 if (runnable == null) {
993 return false;
994 }
995 try {
996 put(runnable);
997 } catch (InterruptedException e1) {
998 Thread.currentThread().interrupt();
999 return false;
1000 }
1001 return true;
1002 }
1003
1004 }
Hardik Windlass395ff372019-06-13 05:16:00 +00001005}