blob: 3b3c2b902dba7ff9ba159e409a1a93eb1ef5dcc8 [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Joey Armstrong7f6d6d22023-01-09 17:09:50 -05002 * Copyright 2021-2023 Open Networking Foundation (ONF) and the ONF Contributors
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
alshabibf0e7e702015-05-30 18:22:36 -070018import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080019import org.onlab.util.KryoNamespace;
Gamze Abaka1f62dd92020-05-07 08:58:13 +000020import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080021import org.onosproject.cluster.ClusterService;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020022import org.onosproject.cluster.LeadershipService;
alshabibf0e7e702015-05-30 18:22:36 -070023import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080025import org.onosproject.event.AbstractListenerManager;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +020026import org.onosproject.mastership.MastershipService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010027import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070028import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010029import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070030import org.onosproject.net.DeviceId;
alshabibdec2e252016-01-15 12:20:25 -080031import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070032import org.onosproject.net.PortNumber;
33import org.onosproject.net.device.DeviceEvent;
34import org.onosproject.net.device.DeviceListener;
35import org.onosproject.net.device.DeviceService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080036import org.onosproject.store.serializers.KryoNamespaces;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080037import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070039import org.opencord.olt.AccessDeviceEvent;
40import org.opencord.olt.AccessDeviceListener;
Gustavo Silva29fb20e2022-05-26 09:59:54 -030041import org.opencord.olt.AccessDevicePort;
alshabib36a4d732016-06-01 16:03:59 -070042import org.opencord.olt.AccessDeviceService;
Gustavo Silva29fb20e2022-05-26 09:59:54 -030043import org.opencord.olt.DiscoveredSubscriber;
44import org.opencord.olt.OltDeviceServiceInterface;
45import org.opencord.olt.OltFlowServiceInterface;
46import org.opencord.olt.OltMeterServiceInterface;
47import org.opencord.olt.ServiceKey;
48import org.opencord.olt.FlowOperation;
Gamze Abaka641fc072018-09-04 09:16:27 +000049import org.opencord.sadis.BaseInformationService;
50import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010051import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000052import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080053import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070054import org.osgi.service.component.annotations.Activate;
55import org.osgi.service.component.annotations.Component;
56import org.osgi.service.component.annotations.Deactivate;
57import org.osgi.service.component.annotations.Modified;
58import org.osgi.service.component.annotations.Reference;
59import org.osgi.service.component.annotations.ReferenceCardinality;
Ilayda Ozdemir90a93622021-02-25 09:40:58 +000060import org.osgi.service.component.annotations.ReferencePolicy;
alshabibf0e7e702015-05-30 18:22:36 -070061import org.slf4j.Logger;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070062import org.slf4j.LoggerFactory;
alshabibf0e7e702015-05-30 18:22:36 -070063
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010064import java.util.ArrayList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010065import java.util.Dictionary;
Andrea Campanella61650a12022-01-24 18:09:44 -080066import java.util.HashSet;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070067import java.util.Iterator;
68import java.util.LinkedList;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010069import java.util.List;
70import java.util.Map;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010071import java.util.Properties;
72import java.util.Set;
Andrea Campanella61650a12022-01-24 18:09:44 -080073import java.util.concurrent.ArrayBlockingQueue;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010074import java.util.concurrent.ExecutorService;
75import java.util.concurrent.Executors;
76import java.util.concurrent.LinkedBlockingQueue;
77import java.util.concurrent.ScheduledExecutorService;
Andrea Campanella61650a12022-01-24 18:09:44 -080078import java.util.concurrent.ThreadPoolExecutor;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010079import java.util.concurrent.TimeUnit;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070080import java.util.concurrent.locks.Lock;
81import java.util.concurrent.locks.ReentrantReadWriteLock;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010082
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010083import static com.google.common.base.Strings.isNullOrEmpty;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010084import static org.onlab.util.Tools.get;
85import static org.onlab.util.Tools.groupedThreads;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070086import static org.opencord.olt.impl.OltUtils.getPortName;
87import static org.opencord.olt.impl.OltUtils.portWithName;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +010088import static org.opencord.olt.impl.OsgiPropertyConstants.*;
alshabibf0e7e702015-05-30 18:22:36 -070089
90/**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070091 * OLT Application.
alshabibf0e7e702015-05-30 18:22:36 -070092 */
Carmelo Casconeca931162019-07-15 18:22:24 -070093@Component(immediate = true,
94 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -070095 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +000096 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070097 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
Andrea Campanella61650a12022-01-24 18:09:44 -080098 FLOW_EXECUTOR_QUEUE_SIZE + ":Integer=" + FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -070099 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
100 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
Carmelo Casconeca931162019-07-15 18:22:24 -0700101 })
alshabib8e4fd2f2016-01-12 15:55:53 -0800102public class Olt
103 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
104 implements AccessDeviceService {
alshabibf0e7e702015-05-30 18:22:36 -0700105
Carmelo Casconeca931162019-07-15 18:22:24 -0700106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700107 protected DeviceService deviceService;
108
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected ComponentConfigService cfgService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100111
Carmelo Casconeca931162019-07-15 18:22:24 -0700112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700113 protected MastershipService mastershipService;
alshabibf0e7e702015-05-30 18:22:36 -0700114
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100115 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700116 protected ClusterService clusterService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY)
119 protected LeadershipService leadershipService;
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100120
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000121 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
122 bind = "bindSadisService",
123 unbind = "unbindSadisService",
124 policy = ReferencePolicy.DYNAMIC)
125 protected volatile SadisService sadisService;
Gamze Abaka641fc072018-09-04 09:16:27 +0000126
Carmelo Casconeca931162019-07-15 18:22:24 -0700127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700128 protected OltDeviceServiceInterface oltDeviceService;
alshabibe0559672016-02-21 14:49:51 -0800129
Carmelo Casconeca931162019-07-15 18:22:24 -0700130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700131 protected OltFlowServiceInterface oltFlowService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY)
134 protected OltMeterServiceInterface oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000135
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800136 @Reference(cardinality = ReferenceCardinality.MANDATORY)
137 protected StorageService storageService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700140 protected CoreService coreService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800141
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700142 protected ApplicationId appId;
Andrea Campanellaaf39b4c2020-05-13 14:07:44 +0200143
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700144 private static final String ONOS_OLT_SERVICE = "onos/olt-service";
Tunahan Sezena07fe962021-02-24 08:24:24 +0000145
Carmelo Casconeca931162019-07-15 18:22:24 -0700146 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800147 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700148 **/
149 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000150
Carmelo Casconeca931162019-07-15 18:22:24 -0700151 /**
Gamze Abaka51a34e82020-05-08 13:03:14 +0000152 * Default multicast service name.
Carmelo Casconeca931162019-07-15 18:22:24 -0700153 **/
Gamze Abaka51a34e82020-05-08 13:03:14 +0000154 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Gamze Abaka33feef52019-02-27 08:16:47 +0000155
Saurav Das2d3777a2020-08-07 18:48:51 -0700156 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700157 * Number of threads used to process flows.
Saurav Das2d3777a2020-08-07 18:48:51 -0700158 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700159 protected int flowProcessingThreads = FLOW_PROCESSING_THREADS_DEFAULT;
Saurav Das2d3777a2020-08-07 18:48:51 -0700160
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700161 /**
 * Size of the work queue used by the executor that processes flows.
163 **/
Andrea Campanella61650a12022-01-24 18:09:44 -0800164 protected int flowExecutorQueueSize = FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT;
165
166 /**
 * Number of threads used to process subscriber provisioning requests.
168 **/
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700169 protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
170
171 /**
 * Delay in ms to put an event back in the queue, used to avoid retrying things too often if conditions are not met.
173 **/
174 protected int requeueDelay = REQUEUE_DELAY_DEFAULT;
175
176 private final Logger log = LoggerFactory.getLogger(getClass());
177
178 /**
179 * A queue to asynchronously process events.
Matteo Scandoloa4aaa972020-10-23 15:24:38 -0700180 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700181 protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
alshabibf0e7e702015-05-30 18:22:36 -0700182
Gamze Abaka641fc072018-09-04 09:16:27 +0000183 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
alshabibf0e7e702015-05-30 18:22:36 -0700184
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700185 /**
186 * Listener for OLT devices events.
187 */
188 protected OltDeviceListener deviceListener = new OltDeviceListener();
189 protected ScheduledExecutorService discoveredSubscriberExecutor =
190 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
191 "discovered-cp-%d", log));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100192
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700193 protected ScheduledExecutorService queueExecutor =
194 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
195 "discovered-cp-restore-%d", log));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700196
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700197 /**
198 * Executor used to defer flow provisioning to a different thread pool.
199 */
200 protected ExecutorService flowsExecutor;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800201
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700202 /**
203 * Executor used to defer subscriber handling from API call to a different thread pool.
204 */
205 protected ExecutorService subscriberExecutor;
206
207 private static final String APP_NAME = "org.opencord.olt";
208
209 private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
210 private final Lock queueWriteLock = queueLock.writeLock();
211 private final Lock queueReadLock = queueLock.readLock();
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700212
alshabibf0e7e702015-05-30 18:22:36 -0700213 @Activate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700214 protected void activate(ComponentContext context) {
215 cfgService.registerProperties(getClass());
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700216
alshabibe0559672016-02-21 14:49:51 -0800217 modified(context);
Saurav Das62ad75e2019-03-05 12:22:22 -0800218
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700219 appId = coreService.registerApplication(APP_NAME);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800220 KryoNamespace serializer = KryoNamespace.newBuilder()
221 .register(KryoNamespaces.API)
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700222 .register(ConnectPoint.class)
223 .register(DiscoveredSubscriber.class)
224 .register(DiscoveredSubscriber.Status.class)
225 .register(SubscriberAndDeviceInformation.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800226 .register(UniTagInformation.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000227 .register(LinkedBlockingQueue.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800228 .build();
229
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700230 eventsQueues = storageService.<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder()
231 .withName("volt-subscriber-queues")
232 .withApplicationId(appId)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800233 .withSerializer(Serializer.using(serializer))
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700234 .build().asJavaMap();
alshabib4ceaed32016-03-03 18:00:58 -0800235
alshabibba357492016-01-27 13:49:46 -0800236 deviceService.addListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700237
238 discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
239 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
240 log.info("Started");
241
242 deviceListener.handleExistingPorts();
alshabibf0e7e702015-05-30 18:22:36 -0700243 }
244
245 @Deactivate
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700246 protected void deactivate(ComponentContext context) {
247 cfgService.unregisterProperties(getClass(), false);
248 discoveredSubscriberExecutor.shutdown();
Andrea Campanellaeaf23952021-12-30 15:58:54 +0100249 deviceService.removeListener(deviceListener);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700250 flowsExecutor.shutdown();
251 subscriberExecutor.shutdown();
252 deviceListener.deactivate();
alshabibf0e7e702015-05-30 18:22:36 -0700253 log.info("Stopped");
254 }
255
alshabibe0559672016-02-21 14:49:51 -0800256 @Modified
257 public void modified(ComponentContext context) {
258 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700259 if (context != null) {
Andrea Campanella971d5b92020-05-07 11:20:43 +0200260 String bpId = get(properties, DEFAULT_BP_ID);
261 defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
Gamze Abakaad329652018-12-20 10:12:21 +0000262
Andrea Campanella971d5b92020-05-07 11:20:43 +0200263 String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
264 multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000265
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700266 String flowThreads = get(properties, FLOW_PROCESSING_THREADS);
267 int oldFlowProcessingThreads = flowProcessingThreads;
268 flowProcessingThreads = isNullOrEmpty(flowThreads) ?
269 oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
Saurav Das2d3777a2020-08-07 18:48:51 -0700270
Andrea Campanella61650a12022-01-24 18:09:44 -0800271 String executorQueueSize = get(properties, FLOW_EXECUTOR_QUEUE_SIZE);
272 int oldExecutorQueueSize = flowExecutorQueueSize;
273 flowExecutorQueueSize = isNullOrEmpty(executorQueueSize) ?
274 oldExecutorQueueSize : Integer.parseInt(executorQueueSize.trim());
275
276 if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads
277 || oldExecutorQueueSize != flowExecutorQueueSize) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700278 if (flowsExecutor != null) {
279 flowsExecutor.shutdown();
280 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800281
282 flowsExecutor = new ThreadPoolExecutor(0, flowProcessingThreads, 30,
283 TimeUnit.SECONDS, new ThreadPoolQueue(flowExecutorQueueSize),
284 new ThreadPoolExecutor.DiscardPolicy());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700285 }
Gamze Abaka33feef52019-02-27 08:16:47 +0000286
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700287 String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
288 int oldSubscriberProcessingThreads = subscriberProcessingThreads;
289 subscriberProcessingThreads = isNullOrEmpty(subscriberThreads) ?
290 oldSubscriberProcessingThreads : Integer.parseInt(subscriberThreads.trim());
291
292 if (subscriberExecutor == null || oldSubscriberProcessingThreads != subscriberProcessingThreads) {
293 if (subscriberExecutor != null) {
294 subscriberExecutor.shutdown();
295 }
296 subscriberExecutor = Executors.newFixedThreadPool(subscriberProcessingThreads,
297 groupedThreads(ONOS_OLT_SERVICE,
298 "subscriber-installer-%d"));
299 }
300
301 String queueDelay = get(properties, REQUEUE_DELAY);
302 requeueDelay = isNullOrEmpty(queueDelay) ?
303 REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
alshabibe0559672016-02-21 14:49:51 -0800304 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800305 log.info("Modified. Values = {}: {}, {}:{}, {}:{}," +
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700306 "{}:{}, {}:{}, {}:{}",
307 DEFAULT_BP_ID, defaultBpId,
308 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
309 FLOW_PROCESSING_THREADS, flowProcessingThreads,
Andrea Campanella61650a12022-01-24 18:09:44 -0800310 FLOW_EXECUTOR_QUEUE_SIZE, flowExecutorQueueSize,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700311 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
312 REQUEUE_DELAY, requeueDelay);
alshabibe0559672016-02-21 14:49:51 -0800313 }
314
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000315
alshabib32232c82016-02-25 17:57:24 -0500316 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700317 public boolean provisionSubscriber(ConnectPoint cp) {
318 subscriberExecutor.submit(() -> {
319 Device device = deviceService.getDevice(cp.deviceId());
320 Port port = deviceService.getPort(device.id(), cp.port());
321 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Hardik Windlass395ff372019-06-13 05:16:00 +0000322
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700323 if (oltDeviceService.isNniPort(device, port.number())) {
324 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
325 return false;
Saurav Das026650f2020-09-21 18:56:35 -0700326 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700327
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700328 log.info("Provisioning subscriber on {}, deviceId {}", accessDevicePort, cp.deviceId());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700329
330 if (oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
331 log.error("Subscriber on {} is already provisioned", accessDevicePort);
332 return false;
333 }
334
335 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
336 if (si == null) {
337 log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
338 return false;
339 }
340 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
341 DiscoveredSubscriber.Status.ADDED, true, si);
342
343 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
344 // regardless of the flow status
345 si.uniTagList().forEach(uti -> {
346 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
347 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
348 });
349
350 addSubscriberToQueue(sub);
351 return true;
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100352 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700353 //NOTE this only means we have taken the request in, nothing more.
Amit Ghosh31939522018-08-16 13:28:21 +0100354 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800355 }
356
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000357 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700358 public boolean removeSubscriber(ConnectPoint cp) {
359 subscriberExecutor.submit(() -> {
360 Device device = deviceService.getDevice(DeviceId.deviceId(cp.deviceId().toString()));
361 Port port = deviceService.getPort(device.id(), cp.port());
362 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000363
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700364 if (oltDeviceService.isNniPort(device, port.number())) {
365 log.warn("will not un-provision a subscriber on the NNI {}",
366 accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100367 return false;
368 }
369
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700370 log.info("Unprovisioning subscriber on {}", accessDevicePort);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700371
372 if (!oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
373 log.error("Subscriber on {} is not provisioned", accessDevicePort);
374 return false;
375 }
Matteo Scandolo27629042022-06-29 01:35:59 +0000376
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700377 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700378 if (si == null) {
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700379 log.error("Subscriber information not found in sadis for port {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700380 accessDevicePort);
381 // NOTE that we are returning true so that the subscriber is removed from the queue
382 // and we can move on provisioning others
383 return false;
384 }
385 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
Andrea Campanella34ce61a2022-04-28 18:55:46 +0200386 DiscoveredSubscriber.Status.ADMIN_REMOVED, true, si);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700387
388 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
389 // regardless of the flow status
390 si.uniTagList().forEach(uti -> {
391 ServiceKey sk = new ServiceKey(accessDevicePort, uti);
392 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000393 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700394
395 addSubscriberToQueue(sub);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100396 return true;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700397 });
398 //NOTE this only means we have taken the request in, nothing more.
399 return true;
400 }
401
402 @Override
403 public boolean provisionSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700404 log.info("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700405 cp, cTag, sTag, tpId);
406 Device device = deviceService.getDevice(cp.deviceId());
407 Port port = deviceService.getPort(device.id(), cp.port());
408 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
409
410 if (oltDeviceService.isNniPort(device, port.number())) {
411 log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100412 return false;
413 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100414
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700415 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700416 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700417 if (specificService == null) {
418 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
419 "stag {}, tpId {}", cp, cTag, sTag, tpId);
Amit Ghosh31939522018-08-16 13:28:21 +0100420 return false;
421 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700422 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
423 uniTagInformationList.add(specificService);
424 si.setUniTagList(uniTagInformationList);
425 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
426 DiscoveredSubscriber.Status.ADDED, true, si);
Amit Ghosh31939522018-08-16 13:28:21 +0100427
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700428 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
429 // regardless of the flow status
430 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
431 if (oltFlowService.isSubscriberServiceProvisioned(sk)) {
432 log.error("Subscriber on {} is already provisioned", sk);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100433 return false;
434 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700435 oltFlowService.updateProvisionedSubscriberStatus(sk, true);
436
437 addSubscriberToQueue(sub);
438 return true;
Amit Ghosh31939522018-08-16 13:28:21 +0100439 }
440
441 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700442 public boolean removeSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700443 log.info("Unprovisioning subscriber on {} with cTag {}, stag {}, tpId {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700444 cp, cTag, sTag, tpId);
445 Device device = deviceService.getDevice(cp.deviceId());
446 Port port = deviceService.getPort(device.id(), cp.port());
447 AccessDevicePort accessDevicePort = new AccessDevicePort(port);
448
449 if (oltDeviceService.isNniPort(device, port.number())) {
450 log.warn("will not un-provision a subscriber on the NNI {}",
451 accessDevicePort);
452 return false;
453 }
454
455 SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700456 UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700457 if (specificService == null) {
458 log.error("Can't find Information for subscriber on {}, with cTag {}, " +
459 "stag {}, tpId {}", cp, cTag, sTag, tpId);
460 return false;
461 }
462 List<UniTagInformation> uniTagInformationList = new LinkedList<>();
463 uniTagInformationList.add(specificService);
464 si.setUniTagList(uniTagInformationList);
465 DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
Andrea Campanella34ce61a2022-04-28 18:55:46 +0200466 DiscoveredSubscriber.Status.ADMIN_REMOVED, true, si);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700467
468 // NOTE we need to keep a list of the subscribers that are provisioned on a port,
469 // regardless of the flow status
470 ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
471 if (!oltFlowService.isSubscriberServiceProvisioned(sk)) {
472 log.error("Subscriber on {} is not provisioned", sk);
473 return false;
474 }
475 oltFlowService.updateProvisionedSubscriberStatus(sk, false);
476
477 addSubscriberToQueue(sub);
478 return true;
Saurav Das82b8e6d2018-10-04 15:25:12 -0700479 }
480
481 @Override
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700482 public List<DeviceId> getConnectedOlts() {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100483 List<DeviceId> olts = new ArrayList<>();
484 Iterable<Device> devices = deviceService.getDevices();
485 for (Device d : devices) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700486 if (oltDeviceService.isOlt(d)) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700487 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100488 olts.add(d.id());
489 }
490 }
491 return olts;
alshabibe0559672016-02-21 14:49:51 -0800492 }
493
Amit Ghosh31939522018-08-16 13:28:21 +0100494 /**
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700495 * Finds the connect-point to which a subscriber is connected.
Amit Ghosh31939522018-08-16 13:28:21 +0100496 *
497 * @param id The id of the subscriber, this is the same ID as in Sadis
498 * @return Subscribers ConnectPoint if found else null
499 */
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700500 @Override
501 public ConnectPoint findSubscriberConnectPoint(String id) {
Amit Ghosh31939522018-08-16 13:28:21 +0100502
503 Iterable<Device> devices = deviceService.getDevices();
504 for (Device d : devices) {
505 for (Port p : deviceService.getPorts(d.id())) {
506 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
507 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700508 log.debug("Found on {}", portWithName(p));
Amit Ghosh31939522018-08-16 13:28:21 +0100509 return new ConnectPoint(d.id(), p.number());
510 }
511 }
512 }
513 return null;
514 }
515
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700516 protected void processDiscoveredSubscribers() {
Gamze Abaka641fc072018-09-04 09:16:27 +0000517
Andrea Campanella61650a12022-01-24 18:09:44 -0800518 log.info("Started processDiscoveredSubscribers loop");
519 while (true) {
520 Set<ConnectPoint> discoveredCps;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700521 try {
522 queueReadLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800523 discoveredCps = new HashSet<>(eventsQueues.keySet());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700524 } catch (Exception e) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800525 log.error("Cannot read keys from queue map", e);
526 continue;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700527 } finally {
528 queueReadLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000529 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000530
Andrea Campanella61650a12022-01-24 18:09:44 -0800531 discoveredCps.forEach(cp -> {
532 LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
533
534 try {
535 queueReadLock.lock();
536 eventsQueue = eventsQueues.get(cp);
537 } catch (Exception e) {
538 log.error("Cannot get key from queue map", e);
539 return;
540 } finally {
541 queueReadLock.unlock();
Andrea Campanella7a1d7e72020-11-05 10:40:10 +0100542 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000543
Andrea Campanella61650a12022-01-24 18:09:44 -0800544 if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
545 // if we're not local leader for this device, ignore this queue
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700546 if (log.isTraceEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800547 log.trace("Ignoring queue on CP {} as not master of the device", cp);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700548 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800549 return;
550 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700551
Andrea Campanella61650a12022-01-24 18:09:44 -0800552 try {
553 flowsExecutor.execute(() -> {
554 if (!eventsQueue.isEmpty()) {
555 // we do not remove the event from the queue until it has been processed
556 // in that way we guarantee that events are processed in order
557 DiscoveredSubscriber sub = eventsQueue.peek();
558 if (sub == null) {
559 // the queue is empty
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700560 return;
561 }
562
Andrea Campanella61650a12022-01-24 18:09:44 -0800563 if (log.isTraceEnabled()) {
Andrea Campanellaccb32862022-02-17 16:29:10 +0100564 log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
Andrea Campanella61650a12022-01-24 18:09:44 -0800565 sub, portWithName(sub.port), sub.status, sub.hasSubscriber);
566 }
567
568 if (sub.hasSubscriber) {
569 // this is a provision subscriber call
570 if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
571 removeSubscriberFromQueue(sub);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700572 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800573 } else {
574 // this is a port event (ENABLED/DISABLED)
575 // means no subscriber was provisioned on that port
576
577 if (!deviceService.isAvailable(sub.device.id()) ||
578 deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
579 // If the device is not connected or the port is not available do nothing
580 // This can happen when we disable and then immediately delete the device,
581 // the queue is populated but the meters and flows are already gone
582 // thus there is nothing left to do
583 return;
584 }
585
586 if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
587 if (log.isTraceEnabled()) {
588 log.trace("Processing of port {} completed",
589 portWithName(sub.port));
590 }
591 removeSubscriberFromQueue(sub);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100592 } else {
593 log.debug("Not handling basic port flows " +
594 "for {}, leaving in the queue",
595 portWithName(sub.port));
Andrea Campanella61650a12022-01-24 18:09:44 -0800596 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700597 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000598 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800599 });
600 } catch (Exception e) {
601 log.error("Exception processing subscriber", e);
Tunahan Sezena07fe962021-02-24 08:24:24 +0000602 }
603 });
Tunahan Sezena07fe962021-02-24 08:24:24 +0000604
Andrea Campanella61650a12022-01-24 18:09:44 -0800605 try {
606 TimeUnit.MILLISECONDS.sleep(requeueDelay);
607 } catch (InterruptedException e) {
608 log.debug("Interrupted while waiting to requeue", e);
609 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000610 }
Tunahan Sezena07fe962021-02-24 08:24:24 +0000611 }
612
Andrea Campanella61650a12022-01-24 18:09:44 -0800613
Tunahan Sezena07fe962021-02-24 08:24:24 +0000614 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000615 * Checks the subscriber uni tag list and find the uni tag information.
616 * using the pon c tag, pon s tag and the technology profile id
617 * May return Optional<null>
618 *
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700619 * @param portName port of the subscriber
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000620 * @param innerVlan pon c tag
621 * @param outerVlan pon s tag
622 * @param tpId the technology profile id
623 * @return the found uni tag information
624 */
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700625 private UniTagInformation getUniTagInformation(String portName, VlanId innerVlan,
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700626 VlanId outerVlan, int tpId) {
Tunahan Sezenf0843b92021-04-30 07:13:16 +0000627 log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700628 portName, innerVlan, outerVlan, tpId);
629 SubscriberAndDeviceInformation subInfo = subsService.get(portName);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000630 if (subInfo == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700631 log.warn("Subscriber information doesn't exist for {}", portName);
632 return null;
Gamze Abakaf59c0912019-04-19 08:24:28 +0000633 }
634
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000635 List<UniTagInformation> uniTagList = subInfo.uniTagList();
636 if (uniTagList == null) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700637 log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), portName);
638 return null;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000639 }
Matteo Scandolo27629042022-06-29 01:35:59 +0000640
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700641 UniTagInformation service = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
Matteo Scandolo27629042022-06-29 01:35:59 +0000642
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700643 if (service == null) {
Harsh Awasthic1e4bf52022-02-09 14:14:14 +0530644 // Try again after invalidating cache for the particular port name.
645 subsService.invalidateId(portName);
646 subInfo = subsService.get(portName);
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700647 service = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000648 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000649
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700650 if (service == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700651 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700652 innerVlan, outerVlan, tpId, portName);
Saurav Das82b8e6d2018-10-04 15:25:12 -0700653 return null;
654 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100655
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700656 return service;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100657 }
658
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700659 protected void bindSadisService(SadisService service) {
660 sadisService = service;
661 subsService = sadisService.getSubscriberInfoService();
662
663 log.info("Sadis-service binds to onos.");
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100664 }
665
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700666 protected void unbindSadisService(SadisService service) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700667 deviceListener = null;
668 sadisService = null;
669 subsService = null;
670 log.info("Sadis-service unbinds from onos.");
Jonathan Hart1d34c8b2018-05-05 15:37:28 -0700671 }
672
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700673 protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700674 try {
Andrea Campanella61650a12022-01-24 18:09:44 -0800675 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800676
Andrea Campanella61650a12022-01-24 18:09:44 -0800677 try {
678 queueWriteLock.lock();
679 eventsQueues.compute(cp, (subcp, queue) -> {
680 queue = queue == null ? new LinkedBlockingQueue<>() : queue;
681 log.info("Adding subscriber {} to queue: {} with existing {}",
682 sub, portWithName(sub.port), queue);
683 queue.add(sub);
684 return queue;
685 });
686 } catch (UnsupportedOperationException | ClassCastException |
687 NullPointerException | IllegalArgumentException e) {
688 log.error("Cannot add subscriber {} to queue: {}", portWithName(sub.port), e.getMessage());
689 } finally {
690 queueWriteLock.unlock();
691 }
692 } catch (Exception e) {
693 log.error("Can't add {} to queue", sub, e);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800694 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800695 }
696
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700697 protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
698 ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700699 try {
700 queueWriteLock.lock();
Andrea Campanella61650a12022-01-24 18:09:44 -0800701 eventsQueues.compute(cp, (subcp, queue) -> {
702 if (log.isTraceEnabled()) {
703 log.trace("Removing subscriber {} from queue : {} " +
704 "with existing {}", sub,
705 portWithName(sub.port), queue);
706 }
707 if (queue == null) {
708 log.warn("Cannot find queue for connectPoint {}", cp);
709 return queue;
710 }
711 boolean removed = queue.remove(sub);
712 if (!removed) {
713 log.warn("Subscriber {} has not been removed from queue, " +
714 "is it still there? {}", sub, queue);
715 return queue;
716 } else {
717 log.debug("Subscriber {} has been removed from the queue {}",
718 sub, queue);
719 }
720
721 return queue;
722 });
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700723 } catch (UnsupportedOperationException | ClassCastException |
724 NullPointerException | IllegalArgumentException e) {
725 log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
726 } finally {
727 queueWriteLock.unlock();
Tunahan Sezena07fe962021-02-24 08:24:24 +0000728 }
729 }
730
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700731 protected class OltDeviceListener
732 implements DeviceListener {
733
734 private final Logger log = LoggerFactory.getLogger(getClass());
735 protected ExecutorService eventExecutor;
736
737 /**
738 * Builds the listener with all the proper services and information needed.
739 */
740 public OltDeviceListener() {
741 this.eventExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
742 groupedThreads("onos/olt-device-listener-event", "event-%d", log));
743 }
744
745 public void deactivate() {
746 this.eventExecutor.shutdown();
747 }
Saurav Dasa9d5f442019-03-06 19:32:48 -0800748
alshabibf0e7e702015-05-30 18:22:36 -0700749 @Override
750 public void event(DeviceEvent event) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800751 if (log.isTraceEnabled()) {
752 log.trace("OltListener receives event {} for: {}/{}", event.type(), event.subject().id(),
753 event.port() != null ? event.port().number() : null);
754 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700755 eventExecutor.execute(() -> {
Andrea Campanella61650a12022-01-24 18:09:44 -0800756 if (log.isTraceEnabled()) {
757 log.trace("OltListener Executor receives event {} for: {}/{}",
758 event.type(), event.subject().id(),
759 event.port() != null ? event.port().number() : null);
760 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700761 boolean isOlt = oltDeviceService.isOlt(event.subject());
762 DeviceId deviceId = event.subject().id();
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700763 switch (event.type()) {
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700764 case PORT_STATS_UPDATED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700765 case DEVICE_ADDED:
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700766 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700767 case PORT_ADDED:
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700768 case PORT_REMOVED:
769 if (!isOlt) {
770 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
771 return;
772 }
773 if (!oltDeviceService.isLocalLeader(deviceId)) {
774 log.trace("Device {} is not local to this node", deviceId);
775 return;
776 }
777 // port added, updated and removed are treated in the same way as we only care whether the port
778 // is enabled or not
779 handleOltPort(event.type(), event.subject(), event.port());
780 return;
Andrea Campanellacc6dc7e2022-03-22 10:38:43 +0100781 case PORT_UPDATED:
782 if (!isOlt) {
783 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
784 return;
785 }
786 // port updated are handled only when the device is available, makes not sense otherwise.
787 // this also solves an issue with port events and device disconnection events handled
788 // in sparse order causing failures in the ofagent disconnect test
789 // (see https://jira.opencord.org/browse/VOL-4669)
790 if (!deviceService.isAvailable(deviceId)) {
791 log.debug("Ignoring port event {} on {} as it is disconnected", event, deviceId);
792 return;
793 }
794 if (!oltDeviceService.isLocalLeader(deviceId)) {
795 log.trace("Device {} is not local to this node", deviceId);
796 return;
797 }
798 handleOltPort(event.type(), event.subject(), event.port());
799 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700800 case DEVICE_AVAILABILITY_CHANGED:
801 if (!isOlt) {
802 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
803 return;
804 }
805 if (deviceService.isAvailable(deviceId)) {
806 if (!oltDeviceService.isLocalLeader(deviceId)) {
807 if (log.isTraceEnabled()) {
808 log.trace("Device {} is not local to this node, not handling available device",
809 deviceId);
810 }
811 } else {
812 log.info("Handling available device: {}", deviceId);
813 handleExistingPorts();
814 }
815 } else if (!deviceService.isAvailable(deviceId) && deviceService.getPorts(deviceId).isEmpty()) {
816 // NOTE that upon disconnection there is no mastership on the device,
817 // and we should anyway clear the local cache of the flows/meters across instances.
818 // We're only clearing the device if there are no available ports,
819 // otherwise we assume it's a temporary disconnection
820 log.info("Device {} availability changed to false and ports are empty, " +
821 "purging meters and flows", deviceId);
822 //NOTE all the instances will call these methods
823 oltFlowService.purgeDeviceFlows(deviceId);
824 oltMeterService.purgeDeviceMeters(deviceId);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100825 // cpStatus is a distributed map, thus only master will update it.
826 if (oltDeviceService.isLocalLeader(deviceId)) {
827 log.debug("Master, clearing cp status for {}", deviceId);
828 clearQueueForDevice(deviceId);
829 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700830 } else {
831 log.info("Device {} availability changed to false, but ports are still available, " +
Matteo Scandolod7aa89c2021-12-07 10:21:34 -0800832 "assuming temporary disconnection.",
833 deviceId);
834 if (log.isTraceEnabled()) {
835 log.trace("Available ports: {}", deviceService.getPorts(deviceId));
836 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700837 }
838 return;
839 case DEVICE_REMOVED:
840 if (!isOlt) {
841 log.trace("Ignoring event {}, this is not an OLT device", deviceId);
842 return;
843 }
844 log.info("Device {} Removed, purging meters and flows", deviceId);
845 oltFlowService.purgeDeviceFlows(deviceId);
846 oltMeterService.purgeDeviceMeters(deviceId);
Andrea Campanellaccb32862022-02-17 16:29:10 +0100847 if (oltDeviceService.isLocalLeader(deviceId)) {
848 log.debug("Master, clearing cp status for {}", deviceId);
849 clearQueueForDevice(deviceId);
850 }
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700851 return;
852 default:
853 if (log.isTraceEnabled()) {
854 log.trace("Not handling event: {}, ", event);
855 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700856 }
857 });
alshabibf0e7e702015-05-30 18:22:36 -0700858 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000859
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700860 protected void clearQueueForDevice(DeviceId devId) {
861 try {
862 queueWriteLock.lock();
863 Iterator<Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>> iter =
864 eventsQueues.entrySet().iterator();
865 while (iter.hasNext()) {
866 Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
867 if (entry.getKey().deviceId().equals(devId)) {
868 eventsQueues.remove(entry.getKey());
Andrea Campanellaccb32862022-02-17 16:29:10 +0100869 log.debug("Removing key from queue {}", entry.getKey());
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700870 }
871 }
872 } finally {
873 queueWriteLock.unlock();
Gamze Abaka838d8142019-02-21 07:06:55 +0000874 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000875 }
876
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700877 protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
878 log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
879 portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
Andrea Campanella61650a12022-01-24 18:09:44 -0800880 boolean isNni = oltDeviceService.isNniPort(device, port.number());
881
882 if (!isNni && type == DeviceEvent.Type.PORT_ADDED) {
883 log.debug("Ignoring PORT_ADD on UNI port {}", portWithName(port));
884 return;
885 }
886
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700887 if (port.isEnabled()) {
Andrea Campanella61650a12022-01-24 18:09:44 -0800888 if (isNni) {
Gustavo Silva29fb20e2022-05-26 09:59:54 -0300889 FlowOperation action = FlowOperation.ADD;
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800890 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
891 // In that case the flows are purged anyway, so there's no need to deal with them,
892 // it would actually be counter-productive as the openflow connection is severed and they won't
893 // be correctly processed
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700894 if (type == DeviceEvent.Type.PORT_REMOVED) {
Matteo Scandolo1f8de332021-12-06 12:18:24 -0800895 log.debug("NNI port went down, " +
896 "ignoring event as flows will be removed in the generic device cleanup");
897 return;
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700898 }
899 oltFlowService.handleNniFlows(device, port, action);
900 } else {
901 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
902 // NOTE if the subscriber was previously provisioned,
903 // then add it back to the queue to be re-provisioned
904 boolean provisionSubscriber = oltFlowService.
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700905 isSubscriberServiceProvisioned(new AccessDevicePort(port));
906 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700907 if (si == null) {
908 //NOTE this should not happen given that the subscriber was provisioned before
909 log.error("Subscriber information not found in sadis for port {}",
910 portWithName(port));
911 return;
912 }
913
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700914 DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
915 if (type == DeviceEvent.Type.PORT_REMOVED) {
916 status = DiscoveredSubscriber.Status.REMOVED;
917 }
918
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700919 DiscoveredSubscriber sub =
920 new DiscoveredSubscriber(device, port,
921 status, provisionSubscriber, si);
922 addSubscriberToQueue(sub);
923 }
924 } else {
Andrea Campanella61650a12022-01-24 18:09:44 -0800925 if (isNni) {
Matteo Scandolob6981dc2021-12-02 16:31:44 -0800926 // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
927 // In that case the flows are purged anyway, so there's no need to deal with them,
928 // it would actually be counter-productive as the openflow connection is severed and they won't
929 // be correctly processed
930 log.debug("NNI port went down, " +
931 "ignoring event as flows will be removed in the generic device cleanup");
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700932 } else {
933 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
934 // NOTE we are assuming that if a subscriber has default eapol
935 // it does not have subscriber flows
936 if (oltFlowService.hasDefaultEapol(port)) {
937 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
938 if (si == null) {
939 //NOTE this should not happen given that the subscriber was provisioned before
940 log.error("Subscriber information not found in sadis for port {}",
941 portWithName(port));
942 return;
943 }
944 DiscoveredSubscriber sub =
945 new DiscoveredSubscriber(device, port,
946 DiscoveredSubscriber.Status.REMOVED, false, si);
947
948 addSubscriberToQueue(sub);
949
950 } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
Mahir Gunyelc0bbe572024-03-19 12:58:20 -0700951 SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700952 if (si == null) {
953 //NOTE this should not happen given that the subscriber was provisioned before
954 log.error("Subscriber information not found in sadis for port {}",
955 portWithName(port));
956 return;
957 }
958 DiscoveredSubscriber sub =
959 new DiscoveredSubscriber(device, port,
960 DiscoveredSubscriber.Status.REMOVED, true, si);
961 addSubscriberToQueue(sub);
962 }
963 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000964 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000965 }
Gamze Abakada282b42019-03-11 13:16:48 +0000966
Matteo Scandoloaa2adde2021-09-13 12:45:32 -0700967 /**
968 * This method is invoked on app activation in order to deal
969 * with devices and ports that are already existing in the system
970 * and thus won't trigger an event.
971 * It is also needed on instance reboot and device reconnect
972 */
973 protected void handleExistingPorts() {
974 Iterable<DeviceId> devices = getConnectedOlts();
975 for (DeviceId deviceId : devices) {
976 log.info("Handling existing OLT Ports for device {}", deviceId);
977 if (oltDeviceService.isLocalLeader(deviceId)) {
978 List<Port> ports = deviceService.getPorts(deviceId);
979 for (Port p : ports) {
980 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
981 continue;
982 }
983 Device device = deviceService.getDevice(deviceId);
984 deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, device, p);
985 }
986 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800987 }
988 }
989 }
Andrea Campanella61650a12022-01-24 18:09:44 -0800990
991 protected final class ThreadPoolQueue extends ArrayBlockingQueue<Runnable> {
992
993 public ThreadPoolQueue(int capacity) {
994 super(capacity);
995 }
996
997 @Override
998 public boolean offer(Runnable runnable) {
999 if (runnable == null) {
1000 return false;
1001 }
1002 try {
1003 put(runnable);
1004 } catch (InterruptedException e1) {
1005 Thread.currentThread().interrupt();
1006 return false;
1007 }
1008 return true;
1009 }
1010
1011 }
Hardik Windlass395ff372019-06-13 05:16:00 +00001012}