blob: 000d5e620918e85a73ef7426e94791d0a330ee0f [file] [log] [blame]
alshabibf0e7e702015-05-30 18:22:36 -07001/*
Brian O'Connord6a135a2017-08-03 22:46:05 -07002 * Copyright 2016-present Open Networking Foundation
alshabibf0e7e702015-05-30 18:22:36 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib36a4d732016-06-01 16:03:59 -070016package org.opencord.olt.impl;
alshabibf0e7e702015-05-30 18:22:36 -070017
Carmelo Casconeca931162019-07-15 18:22:24 -070018import com.google.common.collect.ImmutableMap;
Carmelo Casconeca931162019-07-15 18:22:24 -070019import com.google.common.collect.Sets;
alshabibf0e7e702015-05-30 18:22:36 -070020import org.onlab.packet.VlanId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080021import org.onlab.util.KryoNamespace;
alshabibe0559672016-02-21 14:49:51 -080022import org.onosproject.cfg.ComponentConfigService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080023import org.onosproject.cluster.ClusterEvent;
24import org.onosproject.cluster.ClusterEventListener;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.ControllerNode;
27import org.onosproject.cluster.NodeId;
alshabibf0e7e702015-05-30 18:22:36 -070028import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
alshabib8e4fd2f2016-01-12 15:55:53 -080030import org.onosproject.event.AbstractListenerManager;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010031import org.onosproject.net.AnnotationKeys;
Jonathan Harte533a422015-10-20 17:31:24 -070032import org.onosproject.net.ConnectPoint;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010033import org.onosproject.net.Device;
alshabibf0e7e702015-05-30 18:22:36 -070034import org.onosproject.net.DeviceId;
alshabibdec2e252016-01-15 12:20:25 -080035import org.onosproject.net.Port;
alshabibf0e7e702015-05-30 18:22:36 -070036import org.onosproject.net.PortNumber;
37import org.onosproject.net.device.DeviceEvent;
38import org.onosproject.net.device.DeviceListener;
39import org.onosproject.net.device.DeviceService;
alshabibf0e7e702015-05-30 18:22:36 -070040import org.onosproject.net.flowobjective.FlowObjectiveService;
41import org.onosproject.net.flowobjective.ForwardingObjective;
alshabib3ea82642016-01-12 18:06:53 -080042import org.onosproject.net.flowobjective.Objective;
43import org.onosproject.net.flowobjective.ObjectiveContext;
44import org.onosproject.net.flowobjective.ObjectiveError;
Saurav Daseae48de2019-06-19 13:26:15 -070045import org.onosproject.net.meter.MeterId;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080046import org.onosproject.store.serializers.KryoNamespaces;
47import org.onosproject.store.service.ConsistentMultimap;
48import org.onosproject.store.service.Serializer;
49import org.onosproject.store.service.StorageService;
alshabib36a4d732016-06-01 16:03:59 -070050import org.opencord.olt.AccessDeviceEvent;
51import org.opencord.olt.AccessDeviceListener;
52import org.opencord.olt.AccessDeviceService;
Amit Ghosh31939522018-08-16 13:28:21 +010053import org.opencord.olt.AccessSubscriberId;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000054import org.opencord.olt.internalapi.AccessDeviceFlowService;
55import org.opencord.olt.internalapi.AccessDeviceMeterService;
Gamze Abaka641fc072018-09-04 09:16:27 +000056import org.opencord.sadis.BandwidthProfileInformation;
57import org.opencord.sadis.BaseInformationService;
58import org.opencord.sadis.SadisService;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +010059import org.opencord.sadis.SubscriberAndDeviceInformation;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000060import org.opencord.sadis.UniTagInformation;
alshabibe0559672016-02-21 14:49:51 -080061import org.osgi.service.component.ComponentContext;
Carmelo Casconeca931162019-07-15 18:22:24 -070062import org.osgi.service.component.annotations.Activate;
63import org.osgi.service.component.annotations.Component;
64import org.osgi.service.component.annotations.Deactivate;
65import org.osgi.service.component.annotations.Modified;
66import org.osgi.service.component.annotations.Reference;
67import org.osgi.service.component.annotations.ReferenceCardinality;
alshabibf0e7e702015-05-30 18:22:36 -070068import org.slf4j.Logger;
69
Carmelo Casconeca931162019-07-15 18:22:24 -070070import java.util.ArrayList;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080071import java.util.Collection;
Carmelo Casconeca931162019-07-15 18:22:24 -070072import java.util.Dictionary;
73import java.util.List;
74import java.util.Map;
Carmelo Casconeca931162019-07-15 18:22:24 -070075import java.util.Optional;
76import java.util.Properties;
77import java.util.Set;
78import java.util.concurrent.CompletableFuture;
Carmelo Casconeca931162019-07-15 18:22:24 -070079import java.util.concurrent.ExecutorService;
80import java.util.concurrent.Executors;
Carmelo Casconeca931162019-07-15 18:22:24 -070081
82import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconeca931162019-07-15 18:22:24 -070083import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080084import static java.util.stream.Collectors.collectingAndThen;
85import static java.util.stream.Collectors.groupingBy;
86import static java.util.stream.Collectors.mapping;
87import static java.util.stream.Collectors.toList;
88import static java.util.stream.Collectors.toSet;
Carmelo Casconeca931162019-07-15 18:22:24 -070089import static org.onlab.util.Tools.get;
90import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080091import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID;
92import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
93import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME;
94import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
Carmelo Casconeca931162019-07-15 18:22:24 -070095import static org.slf4j.LoggerFactory.getLogger;
alshabibf0e7e702015-05-30 18:22:36 -070096
97/**
Jonathan Harte533a422015-10-20 17:31:24 -070098 * Provisions rules on access devices.
alshabibf0e7e702015-05-30 18:22:36 -070099 */
Carmelo Casconeca931162019-07-15 18:22:24 -0700100@Component(immediate = true,
101 property = {
Carmelo Casconeca931162019-07-15 18:22:24 -0700102 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000103 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
Carmelo Casconeca931162019-07-15 18:22:24 -0700104 })
alshabib8e4fd2f2016-01-12 15:55:53 -0800105public class Olt
106 extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
107 implements AccessDeviceService {
Charles Chan54f110f2017-01-20 11:22:42 -0800108 private static final String APP_NAME = "org.opencord.olt";
alshabibe0559672016-02-21 14:49:51 -0800109
Gamze Abakada282b42019-03-11 13:16:48 +0000110 private static final short EAPOL_DEFAULT_VLAN = 4091;
Gamze Abaka838d8142019-02-21 07:06:55 +0000111 private static final String NO_UPLINK_PORT = "No uplink port found for OLT device {}";
alshabibe0559672016-02-21 14:49:51 -0800112
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800113 public static final int HASH_WEIGHT = 10;
114
alshabibf0e7e702015-05-30 18:22:36 -0700115 private final Logger log = getLogger(getClass());
116
Thomas Lee Sd7735f92020-02-20 19:21:47 +0530117 private static final String NNI = "nni-";
118
Carmelo Casconeca931162019-07-15 18:22:24 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700120 protected FlowObjectiveService flowObjectiveService;
121
Carmelo Casconeca931162019-07-15 18:22:24 -0700122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700123 protected DeviceService deviceService;
124
Carmelo Casconeca931162019-07-15 18:22:24 -0700125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibf0e7e702015-05-30 18:22:36 -0700126 protected CoreService coreService;
127
Carmelo Casconeca931162019-07-15 18:22:24 -0700128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabibe0559672016-02-21 14:49:51 -0800129 protected ComponentConfigService componentConfigService;
130
Carmelo Casconeca931162019-07-15 18:22:24 -0700131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Gamze Abaka641fc072018-09-04 09:16:27 +0000132 protected SadisService sadisService;
133
Carmelo Casconeca931162019-07-15 18:22:24 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000135 protected AccessDeviceFlowService oltFlowService;
alshabibe0559672016-02-21 14:49:51 -0800136
Carmelo Casconeca931162019-07-15 18:22:24 -0700137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000138 protected AccessDeviceMeterService oltMeterService;
Gamze Abakaad329652018-12-20 10:12:21 +0000139
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
141 protected StorageService storageService;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY)
144 protected ClusterService clusterService;
145
Carmelo Casconeca931162019-07-15 18:22:24 -0700146 /**
Carmelo Cascone95ff5122019-11-14 14:19:13 -0800147 * Default bandwidth profile id that is used for authentication trap flows.
Carmelo Casconeca931162019-07-15 18:22:24 -0700148 **/
149 protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
Gamze Abakaad329652018-12-20 10:12:21 +0000150
Carmelo Casconeca931162019-07-15 18:22:24 -0700151 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000152 * Deleting Meters based on flow count statistics.
Carmelo Casconeca931162019-07-15 18:22:24 -0700153 **/
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000154 protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME;
Gamze Abaka33feef52019-02-27 08:16:47 +0000155
alshabibf0e7e702015-05-30 18:22:36 -0700156 private final DeviceListener deviceListener = new InternalDeviceListener();
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800157 private final ClusterEventListener clusterListener = new InternalClusterListener();
158
159 private ConsistentHasher hasher;
alshabibf0e7e702015-05-30 18:22:36 -0700160
Gamze Abaka641fc072018-09-04 09:16:27 +0000161 protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
162 private BaseInformationService<BandwidthProfileInformation> bpService;
alshabibf0e7e702015-05-30 18:22:36 -0700163
Gamze Abaka641fc072018-09-04 09:16:27 +0000164 private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000165 groupedThreads("onos/olt-service",
166 "olt-installer-%d"));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100167
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700168 protected ExecutorService eventExecutor;
169
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800170 private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
Saurav Dasa9d5f442019-03-06 19:32:48 -0800171
alshabibf0e7e702015-05-30 18:22:36 -0700172 @Activate
alshabibe0559672016-02-21 14:49:51 -0800173 public void activate(ComponentContext context) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000174 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
175 "events-%d", log));
alshabibe0559672016-02-21 14:49:51 -0800176 modified(context);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000177 ApplicationId appId = coreService.registerApplication(APP_NAME);
Saurav Das62ad75e2019-03-05 12:22:22 -0800178
179 // ensure that flow rules are purged from flow-store upon olt-disconnection
180 // when olt reconnects, the port-numbers may change for the ONUs
181 // making flows pushed earlier invalid
182 componentConfigService
183 .preSetProperty("org.onosproject.net.flow.impl.FlowRuleManager",
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000184 "purgeOnDisconnection", "true");
Gamze Abakada282b42019-03-11 13:16:48 +0000185 componentConfigService
186 .preSetProperty("org.onosproject.net.meter.impl.MeterManager",
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000187 "purgeOnDisconnection", "true");
alshabibe0559672016-02-21 14:49:51 -0800188 componentConfigService.registerProperties(getClass());
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800189
190 KryoNamespace serializer = KryoNamespace.newBuilder()
191 .register(KryoNamespaces.API)
192 .register(UniTagInformation.class)
193 .build();
194
195 programmedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
196 .withName("volt-programmed-subs")
197 .withSerializer(Serializer.using(serializer))
198 .withApplicationId(appId)
199 .build();
alshabibc4dfe852015-06-05 13:35:13 -0700200
alshabib8e4fd2f2016-01-12 15:55:53 -0800201 eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
202
Gamze Abaka641fc072018-09-04 09:16:27 +0000203 subsService = sadisService.getSubscriberInfoService();
204 bpService = sadisService.getBandwidthProfileService();
205
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800206 List<NodeId> readyNodes = clusterService.getNodes().stream()
207 .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
208 .map(ControllerNode::id)
209 .collect(toList());
210 hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
211 clusterService.addListener(clusterListener);
212
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100213 // look for all provisioned devices in Sadis and create EAPOL flows for the
214 // UNI ports
215 Iterable<Device> devices = deviceService.getDevices();
216 for (Device d : devices) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800217 if (isDeviceMine(d.id())) {
218 checkAndCreateDeviceFlows(d);
219 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100220 }
alshabib4ceaed32016-03-03 18:00:58 -0800221
alshabibba357492016-01-27 13:49:46 -0800222 deviceService.addListener(deviceListener);
alshabibf0e7e702015-05-30 18:22:36 -0700223 log.info("Started with Application ID {}", appId.id());
224 }
225
226 @Deactivate
227 public void deactivate() {
alshabibe0559672016-02-21 14:49:51 -0800228 componentConfigService.unregisterProperties(getClass(), false);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800229 clusterService.removeListener(clusterListener);
alshabib62e9ce72016-02-11 17:31:36 -0800230 deviceService.removeListener(deviceListener);
Jonathan Hart5f1c8142018-07-24 17:31:59 -0700231 eventDispatcher.removeSink(AccessDeviceEvent.class);
alshabibf0e7e702015-05-30 18:22:36 -0700232 log.info("Stopped");
233 }
234
alshabibe0559672016-02-21 14:49:51 -0800235 @Modified
236 public void modified(ComponentContext context) {
237 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
238
239 try {
Gamze Abakaad329652018-12-20 10:12:21 +0000240 String bpId = get(properties, "defaultBpId");
241 defaultBpId = bpId;
242
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000243 String mcastSN = get(properties, "multicastServiceName");
244 multicastServiceName = mcastSN;
245
246 log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}", defaultBpId, multicastServiceName);
Gamze Abaka33feef52019-02-27 08:16:47 +0000247
alshabibe0559672016-02-21 14:49:51 -0800248 } catch (Exception e) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000249 log.error("Error while modifying the properties", e);
alshabibe0559672016-02-21 14:49:51 -0800250 }
251 }
252
alshabib32232c82016-02-25 17:57:24 -0500253 @Override
Gamze Abaka838d8142019-02-21 07:06:55 +0000254 public boolean provisionSubscriber(ConnectPoint connectPoint) {
Saurav Daseae48de2019-06-19 13:26:15 -0700255 log.info("Call to provision subscriber at {}", connectPoint);
Gamze Abaka838d8142019-02-21 07:06:55 +0000256 DeviceId deviceId = connectPoint.deviceId();
257 PortNumber subscriberPortNo = connectPoint.port();
258
259 checkNotNull(deviceService.getPort(deviceId, subscriberPortNo),
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000260 "Invalid connect point:" + connectPoint);
Hardik Windlass395ff372019-06-13 05:16:00 +0000261
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100262 // Find the subscriber on this connect point
Gamze Abaka838d8142019-02-21 07:06:55 +0000263 SubscriberAndDeviceInformation sub = getSubscriber(connectPoint);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100264 if (sub == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000265 log.warn("No subscriber found for {}", connectPoint);
Amit Ghosh31939522018-08-16 13:28:21 +0100266 return false;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100267 }
Jonathan Harte533a422015-10-20 17:31:24 -0700268
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100269 // Get the uplink port
Gamze Abaka838d8142019-02-21 07:06:55 +0000270 Port uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100271 if (uplinkPort == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000272 log.warn(NO_UPLINK_PORT, deviceId);
Amit Ghosh31939522018-08-16 13:28:21 +0100273 return false;
Jonathan Harte533a422015-10-20 17:31:24 -0700274 }
275
Gamze Abaka838d8142019-02-21 07:06:55 +0000276 //delete Eapol authentication flow with default bandwidth
Gamze Abaka33feef52019-02-27 08:16:47 +0000277 //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
Gamze Abaka33feef52019-02-27 08:16:47 +0000278 //install subscriber flows
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000279 CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
280 oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPortNo, defaultBpId, filterFuture,
281 VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
Gamze Abaka33feef52019-02-27 08:16:47 +0000282 filterFuture.thenAcceptAsync(filterStatus -> {
283 if (filterStatus == null) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000284 provisionUniTagList(connectPoint, uplinkPort.number(), sub);
Gamze Abaka33feef52019-02-27 08:16:47 +0000285 }
286 });
Amit Ghosh31939522018-08-16 13:28:21 +0100287 return true;
alshabibb7a9e172016-01-13 11:23:53 -0800288 }
289
290 @Override
Gamze Abaka838d8142019-02-21 07:06:55 +0000291 public boolean removeSubscriber(ConnectPoint connectPoint) {
Saurav Daseae48de2019-06-19 13:26:15 -0700292 log.info("Call to un-provision subscriber at {}", connectPoint);
Gamze Abaka838d8142019-02-21 07:06:55 +0000293
Saurav Daseae48de2019-06-19 13:26:15 -0700294 // Get the subscriber connected to this port from the local cache
295 // If we don't know about the subscriber there's no need to remove it
Gamze Abaka838d8142019-02-21 07:06:55 +0000296 DeviceId deviceId = connectPoint.deviceId();
297 PortNumber subscriberPortNo = connectPoint.port();
298
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800299 Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint).value();
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000300 if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000301 log.warn("Subscriber on connectionPoint {} was not previously programmed, " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000302 "no need to remove it", connectPoint);
Matteo Scandolo962a6ad2018-12-11 15:39:42 -0800303 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800304 }
305
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100306 // Get the uplink port
Gamze Abaka838d8142019-02-21 07:06:55 +0000307 Port uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100308 if (uplinkPort == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000309 log.warn(NO_UPLINK_PORT, deviceId);
Amit Ghosh31939522018-08-16 13:28:21 +0100310 return false;
alshabib4ceaed32016-03-03 18:00:58 -0800311 }
312
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000313 for (UniTagInformation uniTag : uniTagInformationSet) {
Amit Ghosh95e2f652017-08-23 12:49:46 +0100314
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000315 if (multicastServiceName.equals(uniTag.getServiceName())) {
316 continue;
317 }
Gamze Abaka838d8142019-02-21 07:06:55 +0000318
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000319 unprovisionVlans(deviceId, uplinkPort.number(), subscriberPortNo, uniTag);
alshabibbf23a1f2016-01-14 17:27:11 -0800320
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000321 // re-install eapol with default bandwidth profile
322 oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPortNo,
323 uniTag.getUpstreamBandwidthProfile(),
324 null, uniTag.getPonCTag(), false);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100325
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000326 Port port = deviceService.getPort(deviceId, subscriberPortNo);
327 if (port != null && port.isEnabled()) {
328 oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPortNo, defaultBpId,
329 null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
330 } else {
331 log.debug("Port {} is no longer enabled or it's unavailable. Not "
332 + "reprogramming default eapol flow", connectPoint);
333 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100334 }
Amit Ghosh31939522018-08-16 13:28:21 +0100335 return true;
alshabibbf23a1f2016-01-14 17:27:11 -0800336 }
337
Gamze Abakaf59c0912019-04-19 08:24:28 +0000338
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000339 @Override
340 public boolean provisionSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
341 Optional<VlanId> cTag, Optional<Integer> tpId) {
342
343 log.info("Provisioning subscriber using subscriberId {}, sTag {}, cTag {}, tpId {}" +
344 "", subscriberId, sTag, cTag, tpId);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000345
Amit Ghosh31939522018-08-16 13:28:21 +0100346 // Check if we can find the connect point to which this subscriber is connected
347 ConnectPoint subsPort = findSubscriberConnectPoint(subscriberId.toString());
348 if (subsPort == null) {
349 log.warn("ConnectPoint for {} not found", subscriberId);
350 return false;
351 }
352
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100353 if (!sTag.isPresent() && !cTag.isPresent()) {
354 return provisionSubscriber(subsPort);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000355 } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100356 Port uplinkPort = getUplinkPort(deviceService.getDevice(subsPort.deviceId()));
357 if (uplinkPort == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000358 log.warn(NO_UPLINK_PORT, subsPort.deviceId());
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100359 return false;
360 }
361
Gamze Abakaf59c0912019-04-19 08:24:28 +0000362 //delete Eapol authentication flow with default bandwidth
363 //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
Gamze Abakaf59c0912019-04-19 08:24:28 +0000364 //install subscriber flows
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000365 CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
366 oltFlowService.processEapolFilteringObjectives(subsPort.deviceId(), subsPort.port(), defaultBpId,
367 filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000368 filterFuture.thenAcceptAsync(filterStatus -> {
369 if (filterStatus == null) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000370 provisionUniTagInformation(subsPort.deviceId(), uplinkPort.number(), subsPort.port(),
371 cTag.get(), sTag.get(), tpId.get());
Gamze Abakaf59c0912019-04-19 08:24:28 +0000372 }
373 });
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100374 return true;
375 } else {
376 log.warn("Provisioning failed for subscriber: {}", subscriberId);
377 return false;
378 }
Amit Ghosh31939522018-08-16 13:28:21 +0100379 }
Amit Ghosh95e2f652017-08-23 12:49:46 +0100380
alshabibe0559672016-02-21 14:49:51 -0800381 @Override
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000382 public boolean removeSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
383 Optional<VlanId> cTag, Optional<Integer> tpId) {
Amit Ghosh31939522018-08-16 13:28:21 +0100384 // Check if we can find the connect point to which this subscriber is connected
385 ConnectPoint subsPort = findSubscriberConnectPoint(subscriberId.toString());
386 if (subsPort == null) {
387 log.warn("ConnectPoint for {} not found", subscriberId);
388 return false;
389 }
390
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100391 if (!sTag.isPresent() && !cTag.isPresent()) {
392 return removeSubscriber(subsPort);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000393 } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100394 // Get the uplink port
395 Port uplinkPort = getUplinkPort(deviceService.getDevice(subsPort.deviceId()));
396 if (uplinkPort == null) {
Gamze Abaka838d8142019-02-21 07:06:55 +0000397 log.warn(NO_UPLINK_PORT, subsPort.deviceId());
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100398 return false;
399 }
400
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000401 Optional<UniTagInformation> tagInfo = getUniTagInformation(subsPort, cTag.get(), sTag.get(), tpId.get());
402 if (!tagInfo.isPresent()) {
403 log.warn("UniTagInformation does not exist for Device/Port {}, cTag {}, sTag {}, tpId {}",
404 subsPort, cTag, sTag, tpId);
405 return false;
406 }
Gamze Abakaf59c0912019-04-19 08:24:28 +0000407
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000408 unprovisionVlans(subsPort.deviceId(), uplinkPort.number(), subsPort.port(), tagInfo.get());
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100409 return true;
410 } else {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000411 log.warn("Removing subscriber is not possible - please check the provided information" +
412 "for the subscriber: {}", subscriberId);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100413 return false;
414 }
Amit Ghosh31939522018-08-16 13:28:21 +0100415 }
416
417 @Override
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000418 public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs() {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800419 return programmedSubs.stream()
420 .collect(collectingAndThen(
421 groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
422 ImmutableMap::copyOf));
Saurav Das82b8e6d2018-10-04 15:25:12 -0700423 }
424
425 @Override
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100426 public List<DeviceId> fetchOlts() {
427 // look through all the devices and find the ones that are OLTs as per Sadis
428 List<DeviceId> olts = new ArrayList<>();
429 Iterable<Device> devices = deviceService.getDevices();
430 for (Device d : devices) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700431 if (getOltInfo(d) != null) {
432 // So this is indeed an OLT device
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100433 olts.add(d.id());
434 }
435 }
436 return olts;
alshabibe0559672016-02-21 14:49:51 -0800437 }
438
Amit Ghosh31939522018-08-16 13:28:21 +0100439 /**
440 * Finds the connect point to which a subscriber is connected.
441 *
442 * @param id The id of the subscriber, this is the same ID as in Sadis
443 * @return Subscribers ConnectPoint if found else null
444 */
445 private ConnectPoint findSubscriberConnectPoint(String id) {
446
447 Iterable<Device> devices = deviceService.getDevices();
448 for (Device d : devices) {
449 for (Port p : deviceService.getPorts(d.id())) {
450 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
451 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
452 log.debug("Found on device {} port {}", d.id(), p.number());
453 return new ConnectPoint(d.id(), p.number());
454 }
455 }
456 }
457 return null;
458 }
459
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000460 /**
461 * Gets the context of the bandwidth profile information for the given parameter.
462 *
463 * @param bandwidthProfile the bandwidth profile id
464 * @return the context of the bandwidth profile information
465 */
Gamze Abaka641fc072018-09-04 09:16:27 +0000466 private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
467 if (bandwidthProfile == null) {
468 return null;
469 }
470 return bpService.get(bandwidthProfile);
471 }
472
Gamze Abaka838d8142019-02-21 07:06:55 +0000473 /**
Gamze Abaka33feef52019-02-27 08:16:47 +0000474 * Removes subscriber vlan flows.
Gamze Abaka838d8142019-02-21 07:06:55 +0000475 *
476 * @param deviceId the device identifier
477 * @param uplink uplink port of the OLT
478 * @param subscriberPort uni port
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000479 * @param uniTag uni tag information
Gamze Abaka838d8142019-02-21 07:06:55 +0000480 */
Gamze Abaka33feef52019-02-27 08:16:47 +0000481 private void unprovisionVlans(DeviceId deviceId, PortNumber uplink,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000482 PortNumber subscriberPort, UniTagInformation uniTag) {
483
484 log.info("Unprovisioning vlans for {} at {}/{}", uniTag, deviceId, subscriberPort);
alshabibbf23a1f2016-01-14 17:27:11 -0800485
486 CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
487 CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
488
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000489 VlanId deviceVlan = uniTag.getPonSTag();
490 VlanId subscriberVlan = uniTag.getPonCTag();
Gamze Abaka641fc072018-09-04 09:16:27 +0000491
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000492 MeterId upstreamMeterId = oltMeterService
493 .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamBandwidthProfile());
494 MeterId downstreamMeterId = oltMeterService
495 .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamBandwidthProfile());
Gamze Abaka641fc072018-09-04 09:16:27 +0000496
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000497 ForwardingObjective.Builder upFwd =
498 oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, uniTag);
499 ForwardingObjective.Builder downFwd =
500 oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, uniTag);
501
502 if (uniTag.getIsIgmpRequired()) {
503 oltFlowService.processIgmpFilteringObjectives(deviceId, subscriberPort,
504 upstreamMeterId, uniTag, false, true);
505 }
506 if (uniTag.getIsDhcpRequired()) {
507 oltFlowService.processDhcpFilteringObjectives(deviceId, subscriberPort,
508 upstreamMeterId, uniTag, false, true);
509 }
alshabibbf23a1f2016-01-14 17:27:11 -0800510
alshabib4ceaed32016-03-03 18:00:58 -0800511 flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
512 @Override
513 public void onSuccess(Objective objective) {
514 upFuture.complete(null);
515 }
alshabibbf23a1f2016-01-14 17:27:11 -0800516
alshabib4ceaed32016-03-03 18:00:58 -0800517 @Override
518 public void onError(Objective objective, ObjectiveError error) {
519 upFuture.complete(error);
520 }
521 }));
522
523 flowObjectiveService.forward(deviceId, downFwd.remove(new ObjectiveContext() {
524 @Override
525 public void onSuccess(Objective objective) {
526 downFuture.complete(null);
527 }
528
529 @Override
530 public void onError(Objective objective, ObjectiveError error) {
531 downFuture.complete(error);
532 }
533 }));
alshabibbf23a1f2016-01-14 17:27:11 -0800534
535 upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000536 AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTERED;
alshabibbf23a1f2016-01-14 17:27:11 -0800537 if (upStatus == null && downStatus == null) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000538 log.debug("Uni tag information is unregistered successfully for cTag {}, sTag {}, tpId {}, and" +
539 "Device/Port{}", uniTag.getPonCTag(), uniTag.getPonSTag(),
540 uniTag.getTechnologyProfileId(), subscriberPort);
541 updateProgrammedSubscriber(new ConnectPoint(deviceId, subscriberPort), uniTag, false);
alshabibbf23a1f2016-01-14 17:27:11 -0800542 } else if (downStatus != null) {
543 log.error("Subscriber with vlan {} on device {} " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000544 "on port {} failed downstream uninstallation: {}",
545 subscriberVlan, deviceId, subscriberPort, downStatus);
546 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
alshabibbf23a1f2016-01-14 17:27:11 -0800547 } else if (upStatus != null) {
548 log.error("Subscriber with vlan {} on device {} " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000549 "on port {} failed upstream uninstallation: {}",
550 subscriberVlan, deviceId, subscriberPort, upStatus);
551 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
alshabibbf23a1f2016-01-14 17:27:11 -0800552 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000553 Port port = deviceService.getPort(deviceId, subscriberPort);
554 post(new AccessDeviceEvent(type, deviceId, port, deviceVlan, subscriberVlan,
555 uniTag.getTechnologyProfileId()));
alshabibbf23a1f2016-01-14 17:27:11 -0800556 }, oltInstallers);
Jonathan Harte533a422015-10-20 17:31:24 -0700557 }
558
Gamze Abaka838d8142019-02-21 07:06:55 +0000559 /**
Gamze Abaka33feef52019-02-27 08:16:47 +0000560 * Adds subscriber vlan flows, dhcp, eapol and igmp trap flows for the related uni port.
Gamze Abaka838d8142019-02-21 07:06:55 +0000561 *
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000562 * @param connectPoint the connection point of the subscriber
563 * @param uplinkPort uplink port of the OLT (the nni port)
564 * @param sub subscriber information that includes s, c tags, tech profile and bandwidth profile references
Gamze Abaka838d8142019-02-21 07:06:55 +0000565 */
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000566 private void provisionUniTagList(ConnectPoint connectPoint, PortNumber uplinkPort,
567 SubscriberAndDeviceInformation sub) {
Gamze Abaka641fc072018-09-04 09:16:27 +0000568
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000569 log.info("Provisioning vlans for subscriber {} on dev/port: {}", sub, connectPoint);
Gamze Abaka641fc072018-09-04 09:16:27 +0000570
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000571 if (sub.uniTagList() == null || sub.uniTagList().isEmpty()) {
572 log.warn("Unitaglist doesn't exist for the subscriber {}", sub.id());
573 return;
574 }
Gamze Abaka641fc072018-09-04 09:16:27 +0000575
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000576 DeviceId deviceId = connectPoint.deviceId();
577 PortNumber subscriberPort = connectPoint.port();
Gamze Abaka641fc072018-09-04 09:16:27 +0000578
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000579 for (UniTagInformation uniTag : sub.uniTagList()) {
580 handleSubscriberFlows(deviceId, uplinkPort, subscriberPort, uniTag);
581 }
582 }
alshabib3ea82642016-01-12 18:06:53 -0800583
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000584 /**
585 * Finds the uni tag information and provisions the found information.
586 * If the uni tag information is not found, returns
587 *
588 * @param deviceId the access device id
589 * @param uplinkPort the nni port
590 * @param subscriberPort the uni port
591 * @param innerVlan the pon c tag
592 * @param outerVlan the pon s tag
593 * @param tpId the technology profile id
594 */
595 private void provisionUniTagInformation(DeviceId deviceId, PortNumber uplinkPort,
596 PortNumber subscriberPort,
597 VlanId innerVlan,
598 VlanId outerVlan,
599 Integer tpId) {
Jonathan Harte533a422015-10-20 17:31:24 -0700600
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000601 ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
602 Optional<UniTagInformation> gotTagInformation = getUniTagInformation(cp, innerVlan, outerVlan, tpId);
603 if (!gotTagInformation.isPresent()) {
604 return;
605 }
606 UniTagInformation tagInformation = gotTagInformation.get();
607 handleSubscriberFlows(deviceId, uplinkPort, subscriberPort, tagInformation);
608 }
alshabib3ea82642016-01-12 18:06:53 -0800609
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000610 private void updateProgrammedSubscriber(ConnectPoint connectPoint, UniTagInformation tagInformation, Boolean add) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800611 if (add) {
612 programmedSubs.put(connectPoint, tagInformation);
613 } else {
614 programmedSubs.remove(connectPoint, tagInformation);
615 }
Jonathan Harte533a422015-10-20 17:31:24 -0700616 }
617
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000618 /**
619 * Installs a uni tag information flow.
620 *
621 * @param deviceId the access device id
622 * @param uplinkPort the nni port
623 * @param subscriberPort the uni port
624 * @param tagInfo the uni tag information
625 */
626 private void handleSubscriberFlows(DeviceId deviceId, PortNumber uplinkPort, PortNumber subscriberPort,
627 UniTagInformation tagInfo) {
628
629 log.info("Provisioning vlan-based flows for the uniTagInformation {}", tagInfo);
630
631 Port port = deviceService.getPort(deviceId, subscriberPort);
632
633 if (multicastServiceName.equals(tagInfo.getServiceName())) {
634 // IGMP flows are taken care of along with VOD service
635 // Please note that for each service, Subscriber Registered event will be sent
636 post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED,
637 deviceId, port, tagInfo.getPonSTag(), tagInfo.getPonCTag(),
638 tagInfo.getTechnologyProfileId()));
639 return;
Gamze Abaka641fc072018-09-04 09:16:27 +0000640 }
641
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100642 ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
643
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000644 BandwidthProfileInformation upstreamBpInfo =
645 getBandwidthProfileInformation(tagInfo.getUpstreamBandwidthProfile());
646 BandwidthProfileInformation downstreamBpInfo =
647 getBandwidthProfileInformation(tagInfo.getDownstreamBandwidthProfile());
Gamze Abakaf59c0912019-04-19 08:24:28 +0000648
649 CompletableFuture<Object> upstreamMeterFuture = new CompletableFuture<>();
650 CompletableFuture<Object> downsteamMeterFuture = new CompletableFuture<>();
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000651 CompletableFuture<ObjectiveError> upFuture = new CompletableFuture<>();
652 CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
Gamze Abakaf59c0912019-04-19 08:24:28 +0000653
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000654 MeterId upstreamMeterId = oltMeterService.createMeter(deviceId, upstreamBpInfo, upstreamMeterFuture);
655
656 MeterId downstreamMeterId = oltMeterService.createMeter(deviceId, downstreamBpInfo, downsteamMeterFuture);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000657
658 upstreamMeterFuture.thenAcceptAsync(result -> {
659 if (result == null) {
660 log.info("Upstream Meter {} is sent to the device {}. " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000661 "Sending subscriber flows.", upstreamMeterId, deviceId);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000662
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000663 ForwardingObjective.Builder upFwd =
664 oltFlowService.createUpBuilder(uplinkPort, subscriberPort, upstreamMeterId, tagInfo);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000665
666 flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
667 @Override
668 public void onSuccess(Objective objective) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000669 log.info("Upstream flow installed successfully");
Gamze Abakaf59c0912019-04-19 08:24:28 +0000670 upFuture.complete(null);
671 }
672
673 @Override
674 public void onError(Objective objective, ObjectiveError error) {
675 upFuture.complete(error);
676 }
677 }));
678
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000679 } else if (upstreamBpInfo == null) {
680 log.warn("No meter installed since no Upstream BW Profile definition found for " +
681 "ctag {} stag {} tpId {} and Device/port: {}:{}",
682 tagInfo.getPonCTag(), tagInfo.getPonSTag(),
683 tagInfo.getTechnologyProfileId(),
684 deviceId, subscriberPort);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000685 } else {
686 log.warn("Meter installation error while sending upstream flows. " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000687 "Result {} and MeterId {}", result, upstreamMeterId);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000688 }
689 });
690
691 downsteamMeterFuture.thenAcceptAsync(result -> {
692 if (result == null) {
693 log.info("Downstream Meter {} is sent to the device {}. " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000694 "Sending subscriber flows.", downstreamMeterId, deviceId);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000695
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000696 ForwardingObjective.Builder downFwd =
697 oltFlowService.createDownBuilder(uplinkPort, subscriberPort, downstreamMeterId, tagInfo);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000698
699 flowObjectiveService.forward(deviceId, downFwd.add(new ObjectiveContext() {
700 @Override
701 public void onSuccess(Objective objective) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000702 log.info("Downstream flow installed successfully");
Gamze Abakaf59c0912019-04-19 08:24:28 +0000703 downFuture.complete(null);
704 }
705
706 @Override
707 public void onError(Objective objective, ObjectiveError error) {
708 downFuture.complete(error);
709 }
710 }));
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000711
712 } else if (downstreamBpInfo == null) {
713 log.warn("No meter installed since no Downstream BW Profile definition found for " +
714 "ctag {} stag {} tpId {} and Device/port: {}:{}",
715 tagInfo.getPonCTag(), tagInfo.getPonSTag(),
716 tagInfo.getTechnologyProfileId(),
717 deviceId, subscriberPort);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000718 } else {
719 log.warn("Meter installation error while sending upstream flows. " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000720 "Result {} and MeterId {}", result, downstreamMeterId);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000721 }
722 });
723
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100724 upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000725 AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED;
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100726 if (downStatus != null) {
727 log.error("Flow with innervlan {} and outerVlan {} on device {} " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000728 "on port {} failed downstream installation: {}",
729 tagInfo.getPonCTag(), tagInfo.getPonSTag(), deviceId, cp, downStatus);
730 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100731 } else if (upStatus != null) {
732 log.error("Flow with innerVlan {} and outerVlan {} on device {} " +
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000733 "on port {} failed upstream installation: {}",
734 tagInfo.getPonCTag(), tagInfo.getPonSTag(), deviceId, cp, upStatus);
735 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
Gamze Abakaf59c0912019-04-19 08:24:28 +0000736 } else {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000737 log.info("Upstream and downstream data plane flows are installed successfully.");
738 oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPort,
739 tagInfo.getUpstreamBandwidthProfile(),
740 null, tagInfo.getPonCTag(), true);
741 if (tagInfo.getIsDhcpRequired()) {
742 oltFlowService.processDhcpFilteringObjectives(deviceId, subscriberPort,
743 upstreamMeterId, tagInfo, true, true);
744 }
Gamze Abakaf59c0912019-04-19 08:24:28 +0000745
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000746 if (tagInfo.getIsIgmpRequired()) {
747 oltFlowService.processIgmpFilteringObjectives(deviceId, subscriberPort, upstreamMeterId, tagInfo,
748 true, true);
749 }
750 updateProgrammedSubscriber(cp, tagInfo, true);
751 post(new AccessDeviceEvent(type, deviceId, port, tagInfo.getPonSTag(), tagInfo.getPonCTag(),
752 tagInfo.getTechnologyProfileId()));
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100753 }
754 }, oltInstallers);
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100755 }
756
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000757 /**
758 * Checks the subscriber uni tag list and find the uni tag information.
759 * using the pon c tag, pon s tag and the technology profile id
760 * May return Optional<null>
761 *
762 * @param cp the connection point of the subscriber
763 * @param innerVlan pon c tag
764 * @param outerVlan pon s tag
765 * @param tpId the technology profile id
766 * @return the found uni tag information
767 */
768 private Optional<UniTagInformation> getUniTagInformation(ConnectPoint cp, VlanId innerVlan, VlanId outerVlan,
769 int tpId) {
770 log.info("Getting uni tag information for cp: {}, innerVlan: {}, outerVlan: {}, tpId: {}", cp, innerVlan,
771 outerVlan, tpId);
772 SubscriberAndDeviceInformation subInfo = getSubscriber(cp);
Gamze Abakaf59c0912019-04-19 08:24:28 +0000773 if (subInfo == null) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000774 log.warn("Subscriber information doesn't exist for the connect point {}", cp);
775 return Optional.empty();
Gamze Abakaf59c0912019-04-19 08:24:28 +0000776 }
777
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000778 List<UniTagInformation> uniTagList = subInfo.uniTagList();
779 if (uniTagList == null) {
780 log.warn("Uni tag list is not found for the subscriber {}", subInfo.id());
781 return Optional.empty();
782 }
Amit Ghoshe1d3f092018-10-09 19:44:33 +0100783
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000784 UniTagInformation service = null;
785 for (UniTagInformation tagInfo : subInfo.uniTagList()) {
786 if (innerVlan.equals(tagInfo.getPonCTag()) && outerVlan.equals(tagInfo.getPonSTag())
787 && tpId == tagInfo.getTechnologyProfileId()) {
788 service = tagInfo;
789 break;
Andy Bavier160e8682019-05-07 18:32:22 -0700790 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000791 }
Gamze Abaka1efc80c2019-02-15 12:10:54 +0000792
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000793 if (service == null) {
794 log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {}",
795 innerVlan, outerVlan, tpId);
796 return Optional.empty();
Gamze Abaka33feef52019-02-27 08:16:47 +0000797 }
798
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000799 return Optional.of(service);
Amit Ghosh95e2f652017-08-23 12:49:46 +0100800 }
801
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100802 /**
Jonathan Hart403372d2018-08-22 11:44:13 -0700803 * Creates trap flows for device, including DHCP and LLDP trap on NNI and
804 * EAPOL trap on the UNIs, if device is present in Sadis config.
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100805 *
806 * @param dev Device to look for
807 */
Jonathan Hart403372d2018-08-22 11:44:13 -0700808 private void checkAndCreateDeviceFlows(Device dev) {
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100809 // check if this device is provisioned in Sadis
Gamze Abaka641fc072018-09-04 09:16:27 +0000810 SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000811 log.info("checkAndCreateDeviceFlows: deviceInfo {}", deviceInfo);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100812
813 if (deviceInfo != null) {
Jonathan Hart403372d2018-08-22 11:44:13 -0700814 // This is an OLT device as per Sadis, we create flows for UNI and NNI ports
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100815 for (Port p : deviceService.getPorts(dev.id())) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800816 if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000817 continue;
818 }
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100819 if (isUniPort(dev, p)) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000820 log.info("Creating Eapol for the uni {}", p);
821 oltFlowService.processEapolFilteringObjectives(dev.id(), p.number(), defaultBpId, null,
822 VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
Jonathan Hart403372d2018-08-22 11:44:13 -0700823 } else {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000824 oltFlowService.processNniFilteringObjectives(dev.id(), p.number(), true);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100825 }
826 }
827 }
828 }
829
Jonathan Hart403372d2018-08-22 11:44:13 -0700830
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100831 /**
832 * Get the uplink for of the OLT device.
Gamze Abakaad329652018-12-20 10:12:21 +0000833 * <p>
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100834 * This assumes that the OLT has a single uplink port. When more uplink ports need to be supported
835 * this logic needs to be changed
836 *
837 * @param dev Device to look for
838 * @return The uplink Port of the OLT
839 */
840 private Port getUplinkPort(Device dev) {
841 // check if this device is provisioned in Sadis
Gamze Abaka641fc072018-09-04 09:16:27 +0000842 SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
Saurav Daseae48de2019-06-19 13:26:15 -0700843 log.trace("getUplinkPort: deviceInfo {}", deviceInfo);
Saurav Das82b8e6d2018-10-04 15:25:12 -0700844 if (deviceInfo == null) {
845 log.warn("Device {} is not configured in SADIS .. cannot fetch device"
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000846 + " info", dev.id());
Saurav Das82b8e6d2018-10-04 15:25:12 -0700847 return null;
848 }
849 // Return the port that has been configured as the uplink port of this OLT in Sadis
Gamze Abakaad329652018-12-20 10:12:21 +0000850 for (Port p : deviceService.getPorts(dev.id())) {
Saurav Das82b8e6d2018-10-04 15:25:12 -0700851 if (p.number().toLong() == deviceInfo.uplinkPort()) {
Saurav Daseae48de2019-06-19 13:26:15 -0700852 log.trace("getUplinkPort: Found port {}", p);
Saurav Das82b8e6d2018-10-04 15:25:12 -0700853 return p;
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100854 }
855 }
856
Saurav Daseae48de2019-06-19 13:26:15 -0700857 log.warn("getUplinkPort: " + NO_UPLINK_PORT, dev.id());
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100858 return null;
859 }
860
861 /**
862 * Return the subscriber on a port.
863 *
Matteo Scandolo962a6ad2018-12-11 15:39:42 -0800864 * @param cp ConnectPoint on which to find the subscriber
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100865 * @return subscriber if found else null
866 */
Matteo Scandolo962a6ad2018-12-11 15:39:42 -0800867 SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
868 Port port = deviceService.getPort(cp);
869 checkNotNull(port, "Invalid connect point");
870 String portName = port.annotations().value(AnnotationKeys.PORT_NAME);
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100871 return subsService.get(portName);
872 }
873
Gamze Abakaad329652018-12-20 10:12:21 +0000874 /**
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000875 * Checks whether the given port of the device is a uni port or not.
876 *
877 * @param d the access device
878 * @param p the port of the device
879 * @return true if the given port is a uni port
Gamze Abakaad329652018-12-20 10:12:21 +0000880 */
Amit Ghosh1ed9aef2018-07-17 17:08:16 +0100881 private boolean isUniPort(Device d, Port p) {
882 Port ulPort = getUplinkPort(d);
883 if (ulPort != null) {
884 return (ulPort.number().toLong() != p.number().toLong());
885 }
Thomas Lee Sd7735f92020-02-20 19:21:47 +0530886 //handles a special case where NNI port is misconfigured in SADIS and getUplinkPort(d) returns null
887 //checks whether the port name starts with nni- which is the signature of an NNI Port
888 if (p.annotations().value(AnnotationKeys.PORT_NAME) != null &&
889 p.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI)) {
890 log.error("NNI port number {} is not matching with configured value", p.number().toLong());
891 return false;
892 }
893 return true;
Jonathan Hart1d34c8b2018-05-05 15:37:28 -0700894 }
895
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000896 /**
897 * Gets the given device details from SADIS.
898 * If the device is not found, returns null
899 *
900 * @param dev the access device
901 * @return the olt information
902 */
Jonathan Hart4c538002018-08-23 10:11:54 -0700903 private SubscriberAndDeviceInformation getOltInfo(Device dev) {
904 String devSerialNo = dev.serialNumber();
Gamze Abaka641fc072018-09-04 09:16:27 +0000905 return subsService.get(devSerialNo);
Jonathan Hart4c538002018-08-23 10:11:54 -0700906 }
907
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800908 /**
909 * Determines if this instance should handle this device based on
910 * consistent hashing.
911 *
912 * @param id device ID
913 * @return true if this instance should handle the device, otherwise false
914 */
915 private boolean isDeviceMine(DeviceId id) {
916 NodeId nodeId = hasher.hash(id.toString());
917 if (log.isDebugEnabled()) {
918 log.debug("Node that will handle {} is {}", id, nodeId);
919 }
920 return nodeId.equals(clusterService.getLocalNode().id());
921 }
922
alshabibf0e7e702015-05-30 18:22:36 -0700923 private class InternalDeviceListener implements DeviceListener {
Saurav Dasa9d5f442019-03-06 19:32:48 -0800924 private Set<DeviceId> programmedDevices = Sets.newConcurrentHashSet();
925
alshabibf0e7e702015-05-30 18:22:36 -0700926 @Override
927 public void event(DeviceEvent event) {
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700928 eventExecutor.execute(() -> {
929 DeviceId devId = event.subject().id();
930 Device dev = event.subject();
Gamze Abaka838d8142019-02-21 07:06:55 +0000931 Port port = event.port();
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000932 DeviceEvent.Type eventType = event.type();
Jonathan Hart4c538002018-08-23 10:11:54 -0700933
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000934 if (DeviceEvent.Type.PORT_STATS_UPDATED.equals(eventType) ||
935 DeviceEvent.Type.DEVICE_SUSPENDED.equals(eventType) ||
936 DeviceEvent.Type.DEVICE_UPDATED.equals(eventType)) {
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700937 return;
938 }
Jonathan Hart4c538002018-08-23 10:11:54 -0700939
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800940 // Only handle the event if the device belongs to us
941 if (!isDeviceMine(event.subject().id())) {
942 return;
943 }
944
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000945 log.debug("OLT got {} event for {} {}", eventType, event.subject(), event.port());
946
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700947 if (getOltInfo(dev) == null) {
Saurav Dasa9d5f442019-03-06 19:32:48 -0800948 // it's possible that we got an event for a previously
949 // programmed OLT that is no longer available in SADIS
950 // we let such events go through
951 if (!programmedDevices.contains(devId)) {
952 log.warn("No device info found for {}, this is either "
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000953 + "not an OLT or not known to sadis", dev);
Saurav Dasa9d5f442019-03-06 19:32:48 -0800954 return;
955 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700956 }
Jonathan Hart4c538002018-08-23 10:11:54 -0700957
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700958 switch (event.type()) {
959 //TODO: Port handling and bookkeeping should be improved once
960 // olt firmware handles correct behaviour.
961 case PORT_ADDED:
Gamze Abaka838d8142019-02-21 07:06:55 +0000962 if (isUniPort(dev, port)) {
963 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port));
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000964
965 if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
966 log.info("eapol will be sent for port added {}", port);
967 oltFlowService.processEapolFilteringObjectives(devId, port.number(), defaultBpId,
968 null,
969 VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700970 }
971 } else {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000972 SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
973 if (deviceInfo != null) {
974 oltFlowService.processNniFilteringObjectives(dev.id(), port.number(), true);
975 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700976 }
977 break;
978 case PORT_REMOVED:
Gamze Abaka838d8142019-02-21 07:06:55 +0000979 if (isUniPort(dev, port)) {
Gamze Abaka853bf252019-03-25 10:27:06 +0000980 removeSubscriber(new ConnectPoint(devId, port.number()));
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000981 log.info("eapol will be send for port removed", port);
982 oltFlowService.processEapolFilteringObjectives(devId, port.number(), defaultBpId,
983 null,
984 VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
Andy Bavier160e8682019-05-07 18:32:22 -0700985
Gamze Abaka838d8142019-02-21 07:06:55 +0000986 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700987 }
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700988 break;
989 case PORT_UPDATED:
Gamze Abaka838d8142019-02-21 07:06:55 +0000990 if (!isUniPort(dev, port)) {
Matteo Scandolo632f0fc2018-09-07 12:21:45 -0700991 break;
992 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800993 ConnectPoint cp = new ConnectPoint(devId, port.number());
994 Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(cp).value();
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000995 if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
996 if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
997 log.info("eapol will be processed for port updated {}", port);
Matteo Scandolo27c471c2020-02-11 16:41:53 -0800998 oltFlowService.processEapolFilteringObjectives(devId, port.number(), defaultBpId,
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000999 null,
1000 VlanId.vlanId(EAPOL_DEFAULT_VLAN),
1001 port.isEnabled());
1002 }
1003 } else {
1004 log.info("eapol will be processed for port updated {}", port);
1005 uniTagInformationSet.forEach(uniTag ->
1006 oltFlowService.processEapolFilteringObjectives(devId, port.number(),
1007 uniTag.getUpstreamBandwidthProfile(), null,
1008 uniTag.getPonCTag(), port.isEnabled()));
1009 }
Gamze Abaka838d8142019-02-21 07:06:55 +00001010 if (port.isEnabled()) {
Gamze Abaka838d8142019-02-21 07:06:55 +00001011 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port));
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001012 } else {
Gamze Abaka838d8142019-02-21 07:06:55 +00001013 post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port));
Jonathan Hart1d34c8b2018-05-05 15:37:28 -07001014 }
alshabibbb83aa22016-02-10 15:08:23 -08001015 break;
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001016 case DEVICE_ADDED:
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001017 handleDeviceConnection(dev, true);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001018 break;
1019 case DEVICE_REMOVED:
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001020 handleDeviceDisconnection(dev, true);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001021 break;
1022 case DEVICE_AVAILABILITY_CHANGED:
1023 if (deviceService.isAvailable(devId)) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001024 handleDeviceConnection(dev, false);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001025 } else {
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001026 handleDeviceDisconnection(dev, false);
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001027 }
1028 break;
Matteo Scandolo632f0fc2018-09-07 12:21:45 -07001029 default:
1030 return;
1031 }
1032 });
alshabibf0e7e702015-05-30 18:22:36 -07001033 }
Gamze Abaka838d8142019-02-21 07:06:55 +00001034
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001035 private void sendUniEvent(Device device, AccessDeviceEvent.Type eventType) {
1036 deviceService.getPorts(device.id()).stream()
1037 .filter(p -> !PortNumber.LOCAL.equals(p.number()))
1038 .filter(p -> isUniPort(device, p))
1039 .forEach(p -> post(new AccessDeviceEvent(eventType, device.id(), p)));
1040 }
1041
1042 private void handleDeviceDisconnection(Device device, boolean sendUniEvent) {
1043 programmedDevices.remove(device.id());
1044 removeAllSubscribers(device.id());
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001045 oltMeterService.clearMeters(device.id());
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001046 post(new AccessDeviceEvent(
1047 AccessDeviceEvent.Type.DEVICE_DISCONNECTED, device.id(),
1048 null, null, null));
1049 if (sendUniEvent) {
1050 sendUniEvent(device, AccessDeviceEvent.Type.UNI_REMOVED);
Gamze Abaka838d8142019-02-21 07:06:55 +00001051 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001052 }
1053
1054 private void handleDeviceConnection(Device dev, boolean sendUniEvent) {
1055 post(new AccessDeviceEvent(
1056 AccessDeviceEvent.Type.DEVICE_CONNECTED, dev.id(),
1057 null, null, null));
1058 programmedDevices.add(dev.id());
1059 checkAndCreateDeviceFlows(dev);
1060 if (sendUniEvent) {
1061 sendUniEvent(dev, AccessDeviceEvent.Type.UNI_ADDED);
1062 }
Gamze Abaka838d8142019-02-21 07:06:55 +00001063 }
Gamze Abakada282b42019-03-11 13:16:48 +00001064
1065 private void removeAllSubscribers(DeviceId deviceId) {
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001066 List<Map.Entry<ConnectPoint, UniTagInformation>> subs = programmedSubs.stream()
1067 .filter(e -> e.getKey().deviceId().equals(deviceId))
1068 .collect(toList());
Gamze Abakada282b42019-03-11 13:16:48 +00001069
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001070 subs.forEach(e -> programmedSubs.remove(e.getKey(), e.getValue()));
Gamze Abakada282b42019-03-11 13:16:48 +00001071 }
Gamze Abaka641fc072018-09-04 09:16:27 +00001072
Gamze Abaka641fc072018-09-04 09:16:27 +00001073 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -08001074
1075 private class InternalClusterListener implements ClusterEventListener {
1076
1077 @Override
1078 public void event(ClusterEvent event) {
1079 if (event.type() == ClusterEvent.Type.INSTANCE_READY) {
1080 hasher.addServer(event.subject().id());
1081 }
1082 if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
1083 hasher.removeServer(event.subject().id());
1084 }
1085 }
1086 }
Hardik Windlass395ff372019-06-13 05:16:00 +00001087}