[VOL-3836] Extract the OLT pipeliners from ONOS
Change-Id: I0dc99aabcb17b46fc5dc8bbe8e3bbd5ece52058a
diff --git a/impl/pom.xml b/impl/pom.xml
new file mode 100644
index 0000000..99f4b4e
--- /dev/null
+++ b/impl/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2016-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.opencord</groupId>
+ <artifactId>olt</artifactId>
+ <version>4.4.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>olt-impl</artifactId>
+ <packaging>bundle</packaging>
+ <description>OLT application for CORD</description>
+
+ <properties>
+ <olt.api.version>${project.version}</olt.api.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opencord</groupId>
+ <artifactId>olt-api</artifactId>
+ <version>${olt.api.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opencord</groupId>
+ <artifactId>sadis-api</artifactId>
+ <version>${sadis.api.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-cli</artifactId>
+ <version>${onos.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <version>${onos.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-serializers</artifactId>
+ <version>${onos.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.karaf.shell</groupId>
+ <artifactId>org.apache.karaf.shell.console</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.karaf.shell</groupId>
+ <artifactId>org.apache.karaf.shell.core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Karaf-Commands>org.opencord.olt.cli</Karaf-Commands>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/impl/src/main/java/org/opencord/olt/cli/ShowBpMeterMappingsCommand.java b/impl/src/main/java/org/opencord/olt/cli/ShowBpMeterMappingsCommand.java
new file mode 100644
index 0000000..eb1b47d
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/cli/ShowBpMeterMappingsCommand.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.meter.MeterKey;
+import org.opencord.olt.internalapi.AccessDeviceMeterService;
+
+import java.util.Collection;
+import java.util.Map;
+
+@Service
+@Command(scope = "onos", name = "volt-bpmeter-mappings",
+ description = "Shows information about bandwidthProfile-meterKey (device / meter) mappings")
+public class ShowBpMeterMappingsCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ AccessDeviceMeterService service = AbstractShellCommand.get(AccessDeviceMeterService.class);
+ Map<String, Collection<MeterKey>> bpMeterMappings = service.getBpMeterMappings();
+ bpMeterMappings.forEach(this::display);
+ }
+
+ private void display(String bpInfo, Collection<MeterKey> meterKeyList) {
+ meterKeyList.forEach(meterKey ->
+ print("bpInfo=%s deviceId=%s meterId=%s",
+ bpInfo, meterKey.deviceId(), meterKey.meterId()));
+
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java b/impl/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java
new file mode 100644
index 0000000..5d6bf3d
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.ConnectPoint;
+import org.opencord.olt.AccessDeviceService;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Shows subscriber information for those subscriber which have been programmed
+ * in the data-plane.
+ */
+@Service
+@Command(scope = "onos", name = "volt-failed-subscribers",
+ description = "Shows subscribers awaiting for programming in the dataplane")
+public class ShowFailedSubscribersCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ AccessDeviceService service = AbstractShellCommand.get(AccessDeviceService.class);
+ Map<ConnectPoint, Set<UniTagInformation>> info = service.getFailedSubs();
+ info.forEach(this::display);
+ }
+
+ private void display(ConnectPoint cp, Set<UniTagInformation> uniTagInformation) {
+ uniTagInformation.forEach(uniTag ->
+ print("location=%s tagInformation=%s", cp, uniTag));
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/cli/ShowOltCommand.java b/impl/src/main/java/org/opencord/olt/cli/ShowOltCommand.java
new file mode 100644
index 0000000..08aaa48
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/cli/ShowOltCommand.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.cli;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.DeviceId;
+import org.opencord.olt.AccessDeviceService;
+
+import java.util.List;
+
+/**
+ * Shows configured OLTs.
+ */
+@Service
+@Command(scope = "onos", name = "volt-olts",
+ description = "Shows vOLTs connected to ONOS")
+public class ShowOltCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ AccessDeviceService service = AbstractShellCommand.get(AccessDeviceService.class);
+ if (outputJson()) {
+ print("%s", json(service.fetchOlts()));
+ } else {
+ service.fetchOlts().forEach(did -> print("OLT %s", did));
+ }
+
+ }
+
+ /**
+ * Returns JSON node representing the specified olts.
+ *
+ * @param olts collection of olts
+ * @return JSON node
+ */
+ private JsonNode json(List<DeviceId> olts) {
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode node = mapper.createObjectNode();
+ ArrayNode result = node.putArray("olts");
+ for (DeviceId olt : olts) {
+ result.add(olt.toString());
+ }
+ return node;
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/cli/ShowProgrammedMetersCommand.java b/impl/src/main/java/org/opencord/olt/cli/ShowProgrammedMetersCommand.java
new file mode 100644
index 0000000..a7dbd39
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/cli/ShowProgrammedMetersCommand.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.meter.MeterKey;
+import org.opencord.olt.internalapi.AccessDeviceMeterService;
+
+import java.util.Set;
+
+/**
+ * Shows information about device-meter mappings that have been programmed in the
+ * data-plane.
+ */
+@Service
+@Command(scope = "onos", name = "volt-programmed-meters",
+ description = "Shows device-meter mappings programmed in the data-plane")
+public class ShowProgrammedMetersCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ AccessDeviceMeterService service = AbstractShellCommand.get(AccessDeviceMeterService.class);
+ Set<MeterKey> programmedMeters = service.getProgMeters();
+ programmedMeters.forEach(this::display);
+ }
+
+ private void display(MeterKey meterKey) {
+ print("device=%s meter=%s", meterKey.deviceId(), meterKey.meterId());
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/cli/ShowProgrammedSubscribersCommand.java b/impl/src/main/java/org/opencord/olt/cli/ShowProgrammedSubscribersCommand.java
new file mode 100644
index 0000000..413272b
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/cli/ShowProgrammedSubscribersCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.ConnectPoint;
+import org.opencord.olt.AccessDeviceService;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Shows subscriber information for those subscriber which have been programmed
+ * in the data-plane.
+ */
+@Service
+@Command(scope = "onos", name = "volt-programmed-subscribers",
+ description = "Shows subscribers programmed in the dataplane")
+public class ShowProgrammedSubscribersCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ AccessDeviceService service = AbstractShellCommand.get(AccessDeviceService.class);
+ Map<ConnectPoint, Set<UniTagInformation>> info = service.getProgSubs();
+ info.forEach(this::display);
+ }
+
+ private void display(ConnectPoint cp, Set<UniTagInformation> uniTagInformation) {
+ uniTagInformation.forEach(uniTag ->
+ print("location=%s tagInformation=%s", cp, uniTag));
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/cli/SubscriberAddCommand.java b/impl/src/main/java/org/opencord/olt/cli/SubscriberAddCommand.java
new file mode 100644
index 0000000..b1a8720
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/cli/SubscriberAddCommand.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.cli.net.DeviceIdCompleter;
+import org.onosproject.cli.net.PortNumberCompleter;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.opencord.olt.AccessDeviceService;
+
+/**
+ * Adds a subscriber to an access device.
+ */
+@Service
+@Command(scope = "onos", name = "volt-add-subscriber-access",
+ description = "Adds a subscriber to an access device")
+public class SubscriberAddCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "deviceId", description = "Access device ID",
+ required = true, multiValued = false)
+ @Completion(DeviceIdCompleter.class)
+ private String strDeviceId = null;
+
+ @Argument(index = 1, name = "port", description = "Subscriber port number",
+ required = true, multiValued = false)
+ @Completion(PortNumberCompleter.class)
+ private String strPort = null;
+
+ @Override
+ protected void doExecute() {
+ AccessDeviceService service = AbstractShellCommand.get(AccessDeviceService.class);
+
+ DeviceId deviceId = DeviceId.deviceId(strDeviceId);
+ PortNumber port = PortNumber.portNumber(strPort);
+ ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
+
+ service.provisionSubscriber(connectPoint);
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/cli/SubscriberRemoveCommand.java b/impl/src/main/java/org/opencord/olt/cli/SubscriberRemoveCommand.java
new file mode 100644
index 0000000..79a7369
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/cli/SubscriberRemoveCommand.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.cli.net.DeviceIdCompleter;
+import org.onosproject.cli.net.PortNumberCompleter;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.opencord.olt.AccessDeviceService;
+
+/**
+ * Adds a subscriber to an access device.
+ */
+@Service
+@Command(scope = "onos", name = "volt-remove-subscriber-access",
+ description = "Removes a subscriber to an access device")
+public class SubscriberRemoveCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "deviceId", description = "Access device ID",
+ required = true, multiValued = false)
+ @Completion(DeviceIdCompleter.class)
+ private String strDeviceId = null;
+
+ @Argument(index = 1, name = "port", description = "Subscriber port number",
+ required = true, multiValued = false)
+ @Completion(PortNumberCompleter.class)
+ private String strPort = null;
+
+ @Override
+ protected void doExecute() {
+ AccessDeviceService service = AbstractShellCommand.get(AccessDeviceService.class);
+
+ DeviceId deviceId = DeviceId.deviceId(strDeviceId);
+ PortNumber port = PortNumber.portNumber(strPort);
+ ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
+
+ service.removeSubscriber(connectPoint);
+
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/cli/UniTagAddCommand.java b/impl/src/main/java/org/opencord/olt/cli/UniTagAddCommand.java
new file mode 100644
index 0000000..4eb6495
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/cli/UniTagAddCommand.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onlab.packet.VlanId;
+import org.onosproject.cli.AbstractShellCommand;
+import org.opencord.olt.AccessDeviceService;
+import org.opencord.olt.AccessSubscriberId;
+
+import java.util.Optional;
+
+/**
+ * Adds a subscriber uni tag.
+ */
+@Service
+@Command(scope = "onos", name = "volt-add-subscriber-unitag",
+ description = "Adds a uni tag to an access device")
+public class UniTagAddCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "portName", description = "Port name",
+ required = true, multiValued = false)
+ private String strPortName = null;
+
+ @Option(name = "--cTag", description = "Inner vlan id",
+ required = false, multiValued = false)
+ private String strCtag = null;
+
+ @Option(name = "--sTag", description = "Outer vlan id",
+ required = false, multiValued = false)
+ private String strStag = null;
+
+ @Option(name = "--tpId", description = "Technology profile id",
+ required = false, multiValued = false)
+ private String strTpId = null;
+
+ @Override
+ protected void doExecute() {
+
+ AccessDeviceService service = AbstractShellCommand.get(AccessDeviceService.class);
+ AccessSubscriberId portName = new AccessSubscriberId(strPortName);
+
+ Optional<VlanId> cTag = strCtag == null ? Optional.empty() : Optional.of(VlanId.vlanId(strCtag));
+ Optional<VlanId> sTag = strStag == null ? Optional.empty() : Optional.of(VlanId.vlanId(strStag));
+ Optional<Integer> tpId = strTpId == null ? Optional.empty() : Optional.of(Integer.parseInt(strTpId));
+ service.provisionSubscriber(portName, sTag, cTag, tpId);
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/cli/UniTagRemoveCommand.java b/impl/src/main/java/org/opencord/olt/cli/UniTagRemoveCommand.java
new file mode 100644
index 0000000..e256914
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/cli/UniTagRemoveCommand.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onlab.packet.VlanId;
+import org.onosproject.cli.AbstractShellCommand;
+import org.opencord.olt.AccessDeviceService;
+import org.opencord.olt.AccessSubscriberId;
+
+import java.util.Optional;
+
+/**
+ * Removes a uni tag from a subscriber (portname).
+ */
+@Service
+@Command(scope = "onos", name = "volt-remove-subscriber-unitag",
+ description = "Removes a uni tag from an access device")
+public class UniTagRemoveCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "portName", description = "Port name",
+ required = true, multiValued = false)
+ private String strPortName = null;
+
+ @Option(name = "--cTag", description = "Inner vlan id",
+ required = false, multiValued = false)
+ private String strCtag = null;
+
+ @Option(name = "--sTag", description = "Outer vlan id",
+ required = false, multiValued = false)
+ private String strStag = null;
+
+ @Option(name = "--tpId", description = "Technology profile id",
+ required = false, multiValued = false)
+ private String strTpId = null;
+
+ @Override
+ protected void doExecute() {
+
+ AccessDeviceService service = AbstractShellCommand.get(AccessDeviceService.class);
+ AccessSubscriberId portName = new AccessSubscriberId(strPortName);
+
+ Optional<VlanId> cTag = strCtag == null ? Optional.empty() : Optional.of(VlanId.vlanId(strCtag));
+ Optional<VlanId> sTag = strStag == null ? Optional.empty() : Optional.of(VlanId.vlanId(strStag));
+ Optional<Integer> tpId = strTpId == null ? Optional.empty() : Optional.of(Integer.parseInt(strTpId));
+ service.removeSubscriber(portName, sTag, cTag, tpId);
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/cli/package-info.java b/impl/src/main/java/org/opencord/olt/cli/package-info.java
new file mode 100644
index 0000000..b100077
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/cli/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * OLT application handling PMC OLT hardware.
+ */
+package org.opencord.olt.cli;
diff --git a/impl/src/main/java/org/opencord/olt/driver/NokiaOltPipeline.java b/impl/src/main/java/org/opencord/olt/driver/NokiaOltPipeline.java
new file mode 100644
index 0000000..f8b9009
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/driver/NokiaOltPipeline.java
@@ -0,0 +1,783 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.driver;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.osgi.ServiceDirectory;
+import org.onlab.packet.EthType;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.behaviour.Pipeliner;
+import org.onosproject.net.behaviour.PipelinerContext;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.IPCriterion;
+import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.criteria.PortCriterion;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupEvent;
+import org.onosproject.net.group.GroupKey;
+import org.onosproject.net.group.GroupListener;
+import org.onosproject.net.group.GroupService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Pipeliner for OLT device.
+ */
+
+public class NokiaOltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
+
+ private static final Integer QQ_TABLE = 1;
+ private static final short MCAST_VLAN = 4000;
+ private static final String OLTCOOKIES = "olt-cookies-must-be-unique";
+ private static final int EAPOL_FLOW_PRIORITY = 1200;
+ private final Logger log = getLogger(getClass());
+
+ private ServiceDirectory serviceDirectory;
+ private FlowRuleService flowRuleService;
+ private GroupService groupService;
+ private CoreService coreService;
+ private StorageService storageService;
+
+ private DeviceId deviceId;
+ private ApplicationId appId;
+
+
+ protected FlowObjectiveStore flowObjectiveStore;
+
+ private Cache<GroupKey, NextObjective> pendingGroups;
+
+ protected static KryoNamespace appKryo = new KryoNamespace.Builder()
+ .register(KryoNamespaces.API)
+ .register(GroupKey.class)
+ .register(DefaultGroupKey.class)
+ .register(OltPipelineGroup.class)
+ .build("OltPipeline");
+ @Override
+ public void init(DeviceId deviceId, PipelinerContext context) {
+ log.debug("Initiate OLT pipeline");
+ this.serviceDirectory = context.directory();
+ this.deviceId = deviceId;
+
+ flowRuleService = serviceDirectory.get(FlowRuleService.class);
+ coreService = serviceDirectory.get(CoreService.class);
+ groupService = serviceDirectory.get(GroupService.class);
+ flowObjectiveStore = context.store();
+ storageService = serviceDirectory.get(StorageService.class);
+
+ appId = coreService.registerApplication(
+ "org.onosproject.driver.OLTPipeline");
+
+
+ pendingGroups = CacheBuilder.newBuilder()
+ .expireAfterWrite(20, TimeUnit.SECONDS)
+ .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
+ if (notification.getCause() == RemovalCause.EXPIRED) {
+ fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
+ }
+ }).build();
+
+ groupService.addListener(new InnerGroupListener());
+
+ }
+
+ @Override
+ public void filter(FilteringObjective filter) {
+ Instructions.OutputInstruction output;
+
+ if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
+ output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
+ .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
+ .limit(1)
+ .findFirst().get();
+
+ if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
+ log.error("OLT can only filter packet to controller");
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ return;
+ }
+ } else {
+ fail(filter, ObjectiveError.BADPARAMS);
+ return;
+ }
+
+ if (filter.key().type() != Criterion.Type.IN_PORT) {
+ fail(filter, ObjectiveError.BADPARAMS);
+ return;
+ }
+
+ EthTypeCriterion ethType = (EthTypeCriterion)
+ filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
+
+ if (ethType == null) {
+ fail(filter, ObjectiveError.BADPARAMS);
+ return;
+ }
+
+ if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
+ provisionEapol(filter, ethType, output);
+ } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
+ IPProtocolCriterion ipProto = (IPProtocolCriterion)
+ filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
+ if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
+ provisionIgmp(filter, ethType, ipProto, output);
+ } else {
+ log.error("OLT can only filter igmp");
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ }
+ } else {
+ log.error("OLT can only filter eapol and igmp");
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ }
+
+ }
+
+ private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
+ FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
+ switch (objective.op()) {
+
+ case ADD:
+ flowBuilder.add(ruleBuilder.build());
+ break;
+ case REMOVE:
+ flowBuilder.remove(ruleBuilder.build());
+ break;
+ default:
+ log.warn("Unknown operation {}", objective.op());
+ }
+
+ flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
+ @Override
+ public void onSuccess(FlowRuleOperations ops) {
+ objective.context().ifPresent(context -> context.onSuccess(objective));
+ }
+
+ @Override
+ public void onError(FlowRuleOperations ops) {
+ objective.context()
+ .ifPresent(context -> context.onError(objective, ObjectiveError.FLOWINSTALLATIONFAILED));
+ }
+ }));
+ }
+
+ @Override
+ public void forward(ForwardingObjective fwd) {
+
+ if (checkForMulticast(fwd)) {
+ processMulticastRule(fwd);
+ return;
+ }
+
+ if (checkForEapol(fwd)) {
+ log.warn("Discarding EAPOL flow which is not supported on this pipeline");
+ return;
+ }
+
+ TrafficTreatment treatment = fwd.treatment();
+
+ List<Instruction> instructions = treatment.allInstructions();
+
+ Optional<Instruction> vlanIntruction = instructions.stream()
+ .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
+ .filter(i -> ((L2ModificationInstruction) i).subtype() ==
+ L2ModificationInstruction.L2SubType.VLAN_PUSH ||
+ ((L2ModificationInstruction) i).subtype() ==
+ L2ModificationInstruction.L2SubType.VLAN_POP)
+ .findAny();
+
+ if (vlanIntruction.isPresent()) {
+ L2ModificationInstruction vlanIns =
+ (L2ModificationInstruction) vlanIntruction.get();
+
+ if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
+ installUpstreamRules(fwd);
+ } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
+ installDownstreamRules(fwd);
+ } else {
+ log.error("Unknown OLT operation: {}", fwd);
+ fail(fwd, ObjectiveError.UNSUPPORTED);
+ return;
+ }
+
+ pass(fwd);
+ } else {
+ TrafficSelector selector = fwd.selector();
+
+ if (fwd.treatment() != null) {
+ // Deal with SPECIFIC and VERSATILE in the same manner.
+ FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+ .forDevice(deviceId)
+ .withSelector(selector)
+ .fromApp(fwd.appId())
+ .withPriority(fwd.priority())
+ .withTreatment(fwd.treatment());
+
+ if (fwd.permanent()) {
+ ruleBuilder.makePermanent();
+ } else {
+ ruleBuilder.makeTemporary(fwd.timeout());
+ }
+ installObjective(ruleBuilder, fwd);
+
+ } else {
+ log.error("No treatment error: {}", fwd);
+ fail(fwd, ObjectiveError.UNSUPPORTED);
+ }
+ }
+
+ }
+
+
+ @Override
+ public void next(NextObjective nextObjective) {
+ if (nextObjective.type() != NextObjective.Type.BROADCAST) {
+ log.error("OLT only supports broadcast groups.");
+ fail(nextObjective, ObjectiveError.BADPARAMS);
+ }
+
+ if (nextObjective.next().size() != 1) {
+ log.error("OLT only supports singleton broadcast groups.");
+ fail(nextObjective, ObjectiveError.BADPARAMS);
+ }
+
+ TrafficTreatment treatment = nextObjective.next().stream().findFirst().get();
+
+
+ GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
+ GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
+
+
+ pendingGroups.put(key, nextObjective);
+
+ switch (nextObjective.op()) {
+ case ADD:
+ GroupDescription groupDesc =
+ new DefaultGroupDescription(deviceId,
+ GroupDescription.Type.ALL,
+ new GroupBuckets(Collections.singletonList(bucket)),
+ key,
+ null,
+ nextObjective.appId());
+ groupService.addGroup(groupDesc);
+ break;
+ case REMOVE:
+ groupService.removeGroup(deviceId, key, nextObjective.appId());
+ break;
+ case ADD_TO_EXISTING:
+ groupService.addBucketsToGroup(deviceId, key,
+ new GroupBuckets(Collections.singletonList(bucket)),
+ key, nextObjective.appId());
+ break;
+ case REMOVE_FROM_EXISTING:
+ groupService.removeBucketsFromGroup(deviceId, key,
+ new GroupBuckets(Collections.singletonList(bucket)),
+ key, nextObjective.appId());
+ break;
+ default:
+ log.warn("Unknown next objective operation: {}", nextObjective.op());
+ }
+
+
+ }
+
+ private void processMulticastRule(ForwardingObjective fwd) {
+ if (fwd.nextId() == null) {
+ log.error("Multicast objective does not have a next id");
+ fail(fwd, ObjectiveError.BADPARAMS);
+ }
+
+ GroupKey key = getGroupForNextObjective(fwd.nextId());
+
+ if (key == null) {
+ log.error("Group for forwarding objective missing: {}", fwd);
+ fail(fwd, ObjectiveError.GROUPMISSING);
+ }
+
+ Group group = groupService.getGroup(deviceId, key);
+ TrafficTreatment treatment =
+ buildTreatment(Instructions.createGroup(group.id()));
+
+ FlowRule rule = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .forTable(0)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(fwd.selector())
+ .withTreatment(treatment)
+ .build();
+
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+ switch (fwd.op()) {
+
+ case ADD:
+ builder.add(rule);
+ break;
+ case REMOVE:
+ builder.remove(rule);
+ break;
+ case ADD_TO_EXISTING:
+ case REMOVE_FROM_EXISTING:
+ break;
+ default:
+ log.warn("Unknown forwarding operation: {}", fwd.op());
+ }
+
+ applyFlowRules(builder, fwd);
+
+ }
+
+ private boolean checkForMulticast(ForwardingObjective fwd) {
+
+ IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
+ Criterion.Type.IPV4_DST);
+
+ if (ip == null) {
+ return false;
+ }
+
+ return ip.ip().isMulticast();
+
+ }
+
+ private boolean checkForEapol(ForwardingObjective fwd) {
+ EthTypeCriterion ethType = (EthTypeCriterion)
+ filterForCriterion(fwd.selector().criteria(), Criterion.Type.ETH_TYPE);
+
+ return ethType != null && ethType.ethType().equals(EthType.EtherType.EAPOL.ethType());
+ }
+ private GroupKey getGroupForNextObjective(Integer nextId) {
+ NextGroup next = flowObjectiveStore.getNextGroup(nextId);
+ return appKryo.deserialize(next.data());
+
+ }
+
+ private void installDownstreamRules(ForwardingObjective fwd) {
+ List<Pair<Instruction, Instruction>> vlanOps =
+ vlanOps(fwd,
+ L2ModificationInstruction.L2SubType.VLAN_POP);
+
+ if (vlanOps == null) {
+ return;
+ }
+
+ Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, "downstream");
+
+ if (output == null) {
+ return;
+ }
+
+ Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
+
+ TrafficSelector selector = fwd.selector();
+
+ Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
+ Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
+ Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
+ Criterion bullshit = Criteria.matchMetadata(output.port().toLong());
+
+ if (outerVlan == null || innerVlan == null || inport == null) {
+ log.error("Forwarding objective is underspecified: {}", fwd);
+ fail(fwd, ObjectiveError.BADPARAMS);
+ return;
+ }
+
+ Criterion innerVid = Criteria.matchVlanId(((VlanIdCriterion) innerVlan).vlanId());
+
+ FlowRule.Builder outer = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(buildSelector(inport, outerVlan, bullshit))
+ .withTreatment(buildTreatment(popAndRewrite.getLeft(),
+ Instructions.transition(QQ_TABLE)));
+
+ FlowRule.Builder inner = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .forTable(QQ_TABLE)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(buildSelector(inport, innerVid))
+ .withTreatment(buildTreatment(popAndRewrite.getLeft(),
+ output));
+
+ applyRules(fwd, inner, outer);
+
+ }
+
+ private boolean hasUntaggedVlanTag(TrafficSelector selector) {
+ Iterator<Criterion> iter = selector.criteria().iterator();
+
+ while (iter.hasNext()) {
+ Criterion criterion = iter.next();
+ if (criterion.type() == Criterion.Type.VLAN_VID &&
+ ((VlanIdCriterion) criterion).vlanId().toShort() == VlanId.UNTAGGED) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private void installUpstreamRules(ForwardingObjective fwd) {
+ List<Pair<Instruction, Instruction>> vlanOps =
+ vlanOps(fwd,
+ L2ModificationInstruction.L2SubType.VLAN_PUSH);
+ FlowRule.Builder inner;
+
+ if (vlanOps == null) {
+ return;
+ }
+
+ Instruction output = fetchOutput(fwd, "upstream");
+
+ if (output == null) {
+ return;
+ }
+
+ Pair<Instruction, Instruction> innerPair = vlanOps.remove(0);
+
+ Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
+
+
+ if (hasUntaggedVlanTag(fwd.selector())) {
+ inner = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(fwd.selector())
+ .withTreatment(buildTreatment(innerPair.getLeft(),
+ innerPair.getRight(),
+ Instructions.transition(QQ_TABLE)));
+ } else {
+ inner = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(fwd.selector())
+ .withTreatment(buildTreatment(
+ innerPair.getRight(),
+ Instructions.transition(QQ_TABLE)));
+ }
+
+
+ PortCriterion inPort = (PortCriterion)
+ fwd.selector().getCriterion(Criterion.Type.IN_PORT);
+
+ VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
+ innerPair.getRight()).vlanId();
+
+ FlowRule.Builder outer = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .forTable(QQ_TABLE)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(buildSelector(inPort,
+ Criteria.matchVlanId(cVlanId)))
+ .withTreatment(buildTreatment(outerPair.getLeft(),
+ outerPair.getRight(),
+ output));
+
+ applyRules(fwd, inner, outer);
+
+ }
+
+ private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
+ Instruction output = fwd.treatment().allInstructions().stream()
+ .filter(i -> i.type() == Instruction.Type.OUTPUT)
+ .findFirst().orElse(null);
+
+ if (output == null) {
+ log.error("OLT {} rule has no output", direction);
+ fail(fwd, ObjectiveError.BADPARAMS);
+ return null;
+ }
+ return output;
+ }
+
+ private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
+ L2ModificationInstruction.L2SubType type) {
+
+ List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
+ fwd.treatment().allInstructions(), type);
+
+ if (vlanOps == null) {
+ String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
+ ? "downstream" : "upstream";
+ log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
+ fail(fwd, ObjectiveError.BADPARAMS);
+ return null;
+ }
+ return vlanOps;
+ }
+
+
+ private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
+ L2ModificationInstruction.L2SubType type) {
+
+ List<Instruction> vlanPushs = findL2Instructions(
+ type,
+ instructions);
+ List<Instruction> vlanSets = findL2Instructions(
+ L2ModificationInstruction.L2SubType.VLAN_ID,
+ instructions);
+
+ if (vlanPushs.size() != vlanSets.size()) {
+ return null;
+ }
+
+ List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
+
+ for (int i = 0; i < vlanPushs.size(); i++) {
+ pairs.add(new ImmutablePair<>(vlanPushs.get(i), vlanSets.get(i)));
+ }
+ return pairs;
+ }
+
+ private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
+ List<Instruction> actions) {
+ return actions.stream()
+ .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
+ .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
+ .collect(Collectors.toList());
+ }
+
+ private void provisionEapol(FilteringObjective filter,
+ EthTypeCriterion ethType,
+ Instructions.OutputInstruction output) {
+
+ TrafficSelector selector = buildSelector(filter.key(), ethType);
+ TrafficTreatment treatment = buildTreatment(output);
+ buildAndApplyRule(filter, selector, treatment, EAPOL_FLOW_PRIORITY);
+
+ }
+
+ private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
+ IPProtocolCriterion ipProto,
+ Instructions.OutputInstruction output) {
+ TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto);
+ TrafficTreatment treatment = buildTreatment(output);
+ buildAndApplyRule(filter, selector, treatment);
+ }
+
+ private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
+ TrafficTreatment treatment) {
+ buildAndApplyRule(filter, selector, treatment, filter.priority());
+ }
+
+ private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
+ TrafficTreatment treatment, int priority) {
+ FlowRule rule = DefaultFlowRule.builder()
+ .fromApp(filter.appId())
+ .forDevice(deviceId)
+ .forTable(0)
+ .makePermanent()
+ .withSelector(selector)
+ .withTreatment(treatment)
+ .withPriority(priority)
+ .build();
+
+ FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+
+ switch (filter.type()) {
+ case PERMIT:
+ opsBuilder.add(rule);
+ break;
+ case DENY:
+ opsBuilder.remove(rule);
+ break;
+ default:
+ log.warn("Unknown filter type : {}", filter.type());
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ }
+
+ applyFlowRules(opsBuilder, filter);
+ }
+
+ private void applyRules(ForwardingObjective fwd,
+ FlowRule.Builder inner, FlowRule.Builder outer) {
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+ switch (fwd.op()) {
+ case ADD:
+ builder.add(inner.build()).add(outer.build());
+ break;
+ case REMOVE:
+ builder.remove(inner.build()).remove(outer.build());
+ break;
+ case ADD_TO_EXISTING:
+ break;
+ case REMOVE_FROM_EXISTING:
+ break;
+ default:
+ log.warn("Unknown forwarding operation: {}", fwd.op());
+ }
+
+ applyFlowRules(builder, fwd);
+ }
+
+ private void applyFlowRules(FlowRuleOperations.Builder builder,
+ Objective objective) {
+ flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
+ @Override
+ public void onSuccess(FlowRuleOperations ops) {
+ pass(objective);
+ }
+
+ @Override
+ public void onError(FlowRuleOperations ops) {
+ fail(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
+ }
+ }));
+ }
+
+ private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
+ return criteria.stream()
+ .filter(c -> c.type().equals(type))
+ .limit(1)
+ .findFirst().orElse(null);
+ }
+
+ private TrafficSelector buildSelector(Criterion... criteria) {
+
+
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
+
+ for (Criterion c : criteria) {
+ sBuilder.add(c);
+ }
+
+ return sBuilder.build();
+ }
+
+ private TrafficTreatment buildTreatment(Instruction... instructions) {
+
+
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+
+ for (Instruction i : instructions) {
+ tBuilder.add(i);
+ }
+
+ return tBuilder.build();
+ }
+
+
+ private void fail(Objective obj, ObjectiveError error) {
+ obj.context().ifPresent(context -> context.onError(obj, error));
+ }
+
+ private void pass(Objective obj) {
+ obj.context().ifPresent(context -> context.onSuccess(obj));
+ }
+
+
+ private class InnerGroupListener implements GroupListener {
+ @Override
+ public void event(GroupEvent event) {
+ if (event.type() == GroupEvent.Type.GROUP_ADDED || event.type() == GroupEvent.Type.GROUP_UPDATED) {
+ GroupKey key = event.subject().appCookie();
+
+ NextObjective obj = pendingGroups.getIfPresent(key);
+ if (obj != null) {
+ flowObjectiveStore.putNextGroup(obj.id(), new OltPipelineGroup(key));
+ pass(obj);
+ pendingGroups.invalidate(key);
+ }
+ }
+ }
+ }
+
+ private static class OltPipelineGroup implements NextGroup {
+
+ private final GroupKey key;
+
+ public OltPipelineGroup(GroupKey key) {
+ this.key = key;
+ }
+
+ public GroupKey key() {
+ return key;
+ }
+
+ @Override
+ public byte[] data() {
+ return appKryo.serialize(key);
+ }
+
+ }
+
+ @Override
+ public List<String> getNextMappings(NextGroup nextGroup) {
+ // TODO Implementation deferred to vendor
+ return null;
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/driver/OltDriversLoader.java b/impl/src/main/java/org/opencord/olt/driver/OltDriversLoader.java
new file mode 100644
index 0000000..a91621f
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/driver/OltDriversLoader.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.driver;
+
+import org.onosproject.net.driver.AbstractDriverLoader;
+import org.osgi.service.component.annotations.Component;
+import org.slf4j.Logger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Loader for olt device drivers.
+ */
+@Component(immediate = true)
+public class OltDriversLoader extends AbstractDriverLoader {
+
+ private final Logger log = getLogger(getClass());
+
+ public OltDriversLoader() {
+ super("/olt-drivers.xml");
+ }
+
+ @Override
+ public void activate() {
+ super.activate();
+ }
+
+ @Override
+ public void deactivate() {
+ super.deactivate();
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java b/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java
new file mode 100644
index 0000000..d5beb63
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java
@@ -0,0 +1,1291 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.driver;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.osgi.ServiceDirectory;
+import org.onlab.packet.EthType;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.IPv6;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.VlanId;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.behaviour.Pipeliner;
+import org.onosproject.net.behaviour.PipelinerContext;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.driver.Driver;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.IPCriterion;
+import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.criteria.PortCriterion;
+import org.onosproject.net.flow.criteria.UdpPortCriterion;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupEvent;
+import org.onosproject.net.group.GroupKey;
+import org.onosproject.net.group.GroupListener;
+import org.onosproject.net.group.GroupService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.onosproject.core.CoreService.CORE_APP_NAME;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Pipeliner for OLT device.
+ */
+
+public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
+
+ private static final Integer QQ_TABLE = 1;
+ private static final int NO_ACTION_PRIORITY = 500;
+ private static final String DOWNSTREAM = "downstream";
+ private static final String UPSTREAM = "upstream";
+ private final Logger log = getLogger(getClass());
+
+ private ServiceDirectory serviceDirectory;
+ private FlowRuleService flowRuleService;
+ private GroupService groupService;
+ private CoreService coreService;
+ private StorageService storageService;
+
+ private DeviceId deviceId;
+ private ApplicationId appId;
+
+
+ protected FlowObjectiveStore flowObjectiveStore;
+
+ private Cache<GroupKey, NextObjective> pendingGroups;
+
+ protected static KryoNamespace appKryo = new KryoNamespace.Builder()
+ .register(KryoNamespaces.API)
+ .register(GroupKey.class)
+ .register(DefaultGroupKey.class)
+ .register(OltPipelineGroup.class)
+ .build("OltPipeline");
+
+ private static final Timer TIMER = new Timer("filterobj-batching");
+ private Accumulator<Pair<FilteringObjective, FlowRule>> accumulator;
+
+ // accumulator executor service
+ private ScheduledExecutorService accumulatorExecutorService
+ = newSingleThreadScheduledExecutor(groupedThreads("OltPipeliner", "acc-%d", log));
+
+ @Override
+ public void init(DeviceId deviceId, PipelinerContext context) {
+ log.debug("Initiate OLT pipeline");
+ this.serviceDirectory = context.directory();
+ this.deviceId = deviceId;
+
+ flowRuleService = serviceDirectory.get(FlowRuleService.class);
+ coreService = serviceDirectory.get(CoreService.class);
+ groupService = serviceDirectory.get(GroupService.class);
+ flowObjectiveStore = context.store();
+ storageService = serviceDirectory.get(StorageService.class);
+
+ appId = coreService.registerApplication(
+ "org.onosproject.driver.OLTPipeline");
+
+ // Init the accumulator, if enabled
+ if (isAccumulatorEnabled()) {
+ log.debug("Building accumulator with maxObjs {}, batchMs {}, idleMs {}",
+ context.accumulatorMaxObjectives(), context.accumulatorMaxBatchMillis(),
+ context.accumulatorMaxIdleMillis());
+ accumulator = new ObjectiveAccumulator(context.accumulatorMaxObjectives(),
+ context.accumulatorMaxBatchMillis(),
+ context.accumulatorMaxIdleMillis());
+ }
+
+
+ pendingGroups = CacheBuilder.newBuilder()
+ .expireAfterWrite(20, TimeUnit.SECONDS)
+ .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
+ if (notification.getCause() == RemovalCause.EXPIRED) {
+ fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
+ }
+ }).build();
+
+ groupService.addListener(new InnerGroupListener());
+
+ }
+
+ public boolean isAccumulatorEnabled() {
+ Driver driver = super.data().driver();
+ // we cannot determine the property
+ if (driver == null) {
+ return false;
+ }
+ return Boolean.parseBoolean(driver.getProperty(ACCUMULATOR_ENABLED));
+ }
+
+ @Override
+ public void filter(FilteringObjective filter) {
+ Instructions.OutputInstruction output;
+
+ if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
+ output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
+ .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
+ .limit(1)
+ .findFirst().get();
+
+ if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
+ log.warn("OLT can only filter packet to controller");
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ return;
+ }
+ } else {
+ fail(filter, ObjectiveError.BADPARAMS);
+ return;
+ }
+
+ if (filter.key().type() != Criterion.Type.IN_PORT) {
+ fail(filter, ObjectiveError.BADPARAMS);
+ return;
+ }
+
+ EthTypeCriterion ethType = (EthTypeCriterion)
+ filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
+
+ if (ethType == null) {
+ fail(filter, ObjectiveError.BADPARAMS);
+ return;
+ }
+ Optional<Instruction> vlanId = filter.meta().immediate().stream()
+ .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
+ && ((L2ModificationInstruction) t).subtype()
+ .equals(L2ModificationInstruction.L2SubType.VLAN_ID))
+ .limit(1)
+ .findFirst();
+
+ Optional<Instruction> vlanPcp = filter.meta().immediate().stream()
+ .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
+ && ((L2ModificationInstruction) t).subtype()
+ .equals(L2ModificationInstruction.L2SubType.VLAN_PCP))
+ .limit(1)
+ .findFirst();
+
+ Optional<Instruction> vlanPush = filter.meta().immediate().stream()
+ .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
+ && ((L2ModificationInstruction) t).subtype()
+ .equals(L2ModificationInstruction.L2SubType.VLAN_PUSH))
+ .limit(1)
+ .findFirst();
+
+ if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
+
+ if (vlanId.isEmpty() || vlanPush.isEmpty()) {
+ log.warn("Missing EAPOL vlan or vlanPush");
+ fail(filter, ObjectiveError.BADPARAMS);
+ return;
+ }
+ provisionEthTypeBasedFilter(filter, ethType, output,
+ (L2ModificationInstruction) vlanId.get(),
+ (L2ModificationInstruction) vlanPush.get());
+ } else if (ethType.ethType().equals(EthType.EtherType.PPPoED.ethType())) {
+ provisionPPPoED(filter, ethType, vlanId.orElse(null), vlanPcp.orElse(null), output);
+ } else if (ethType.ethType().equals(EthType.EtherType.LLDP.ethType())) {
+ provisionEthTypeBasedFilter(filter, ethType, output, null, null);
+ } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
+ IPProtocolCriterion ipProto = (IPProtocolCriterion)
+ filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
+ if (ipProto == null) {
+ log.warn("OLT can only filter IGMP and DHCP");
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ return;
+ }
+ if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
+ provisionIgmp(filter, ethType, ipProto, output,
+ vlanId.orElse(null),
+ vlanPcp.orElse(null));
+ } else if (ipProto.protocol() == IPv4.PROTOCOL_UDP) {
+ UdpPortCriterion udpSrcPort = (UdpPortCriterion)
+ filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
+
+ UdpPortCriterion udpDstPort = (UdpPortCriterion)
+ filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
+
+ if ((udpSrcPort.udpPort().toInt() == 67 && udpDstPort.udpPort().toInt() == 68) ||
+ (udpSrcPort.udpPort().toInt() == 68 && udpDstPort.udpPort().toInt() == 67)) {
+ provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
+ vlanPcp.orElse(null), output);
+ } else {
+ log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ }
+ } else {
+ log.warn("Currently supporting only IGMP and DHCP filters for IPv4 packets");
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ }
+ } else if (ethType.ethType().equals(EthType.EtherType.IPV6.ethType())) {
+ IPProtocolCriterion ipProto = (IPProtocolCriterion)
+ filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
+ if (ipProto == null) {
+ log.warn("OLT can only filter DHCP");
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ return;
+ }
+ if (ipProto.protocol() == IPv6.PROTOCOL_UDP) {
+ UdpPortCriterion udpSrcPort = (UdpPortCriterion)
+ filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
+
+ UdpPortCriterion udpDstPort = (UdpPortCriterion)
+ filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
+
+ if ((udpSrcPort.udpPort().toInt() == 546 && udpDstPort.udpPort().toInt() == 547) ||
+ (udpSrcPort.udpPort().toInt() == 547 && udpDstPort.udpPort().toInt() == 546)) {
+ provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
+ vlanPcp.orElse(null), output);
+ } else {
+ log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ }
+ } else {
+ log.warn("Currently supporting only DHCP filters for IPv6 packets");
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ }
+ } else {
+ log.warn("\nOnly the following are Supported in OLT for filter ->\n"
+ + "ETH TYPE : EAPOL, LLDP and IPV4\n"
+ + "IPV4 TYPE: IGMP and UDP (for DHCP)"
+ + "IPV6 TYPE: UDP (for DHCP)");
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ }
+
+ }
+
+
+ @Override
+ public void forward(ForwardingObjective fwd) {
+ log.debug("Installing forwarding objective {}", fwd);
+ if (checkForMulticast(fwd)) {
+ processMulticastRule(fwd);
+ return;
+ }
+
+ TrafficTreatment treatment = fwd.treatment();
+
+ List<Instruction> instructions = treatment.allInstructions();
+
+ Optional<Instruction> vlanInstruction = instructions.stream()
+ .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
+ .filter(i -> ((L2ModificationInstruction) i).subtype() ==
+ L2ModificationInstruction.L2SubType.VLAN_PUSH ||
+ ((L2ModificationInstruction) i).subtype() ==
+ L2ModificationInstruction.L2SubType.VLAN_POP)
+ .findAny();
+
+
+ if (!vlanInstruction.isPresent()) {
+ installNoModificationRules(fwd);
+ } else {
+ L2ModificationInstruction vlanIns =
+ (L2ModificationInstruction) vlanInstruction.get();
+ if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
+ installUpstreamRules(fwd);
+ } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
+ installDownstreamRules(fwd);
+ } else {
+ log.error("Unknown OLT operation: {}", fwd);
+ fail(fwd, ObjectiveError.UNSUPPORTED);
+ return;
+ }
+ }
+
+ pass(fwd);
+
+ }
+
+
+ @Override
+ public void next(NextObjective nextObjective) {
+ if (nextObjective.type() != NextObjective.Type.BROADCAST) {
+ log.error("OLT only supports broadcast groups.");
+ fail(nextObjective, ObjectiveError.BADPARAMS);
+ return;
+ }
+
+ if (nextObjective.next().size() != 1 && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
+ log.error("OLT only supports singleton broadcast groups.");
+ fail(nextObjective, ObjectiveError.BADPARAMS);
+ return;
+ }
+
+ Optional<TrafficTreatment> treatmentOpt = nextObjective.next().stream().findFirst();
+ if (treatmentOpt.isEmpty() && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
+ log.error("Next objective {} does not have a treatment", nextObjective);
+ fail(nextObjective, ObjectiveError.BADPARAMS);
+ return;
+ }
+
+ GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
+
+ pendingGroups.put(key, nextObjective);
+ log.trace("NextObjective Operation {}", nextObjective.op());
+ switch (nextObjective.op()) {
+ case ADD:
+ GroupDescription groupDesc =
+ new DefaultGroupDescription(deviceId,
+ GroupDescription.Type.ALL,
+ new GroupBuckets(
+ Collections.singletonList(
+ buildBucket(treatmentOpt.get()))),
+ key,
+ null,
+ nextObjective.appId());
+ groupService.addGroup(groupDesc);
+ break;
+ case REMOVE:
+ groupService.removeGroup(deviceId, key, nextObjective.appId());
+ break;
+ case ADD_TO_EXISTING:
+ groupService.addBucketsToGroup(deviceId, key,
+ new GroupBuckets(
+ Collections.singletonList(
+ buildBucket(treatmentOpt.get()))),
+ key, nextObjective.appId());
+ break;
+ case REMOVE_FROM_EXISTING:
+ groupService.removeBucketsFromGroup(deviceId, key,
+ new GroupBuckets(
+ Collections.singletonList(
+ buildBucket(treatmentOpt.get()))),
+ key, nextObjective.appId());
+ break;
+ default:
+ log.warn("Unknown next objective operation: {}", nextObjective.op());
+ }
+
+
+ }
+
+ private GroupBucket buildBucket(TrafficTreatment treatment) {
+ return DefaultGroupBucket.createAllGroupBucket(treatment);
+ }
+
+ private void processMulticastRule(ForwardingObjective fwd) {
+ if (fwd.nextId() == null) {
+ log.error("Multicast objective does not have a next id");
+ fail(fwd, ObjectiveError.BADPARAMS);
+ }
+
+ GroupKey key = getGroupForNextObjective(fwd.nextId());
+
+ if (key == null) {
+ log.error("Group for forwarding objective missing: {}", fwd);
+ fail(fwd, ObjectiveError.GROUPMISSING);
+ }
+
+ Group group = groupService.getGroup(deviceId, key);
+ TrafficTreatment treatment =
+ buildTreatment(Instructions.createGroup(group.id()));
+
+ TrafficSelector.Builder selectorBuilder = buildIpv4SelectorForMulticast(fwd);
+
+ FlowRule rule = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .forTable(0)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(selectorBuilder.build())
+ .withTreatment(treatment)
+ .build();
+
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+ switch (fwd.op()) {
+
+ case ADD:
+ builder.add(rule);
+ break;
+ case REMOVE:
+ builder.remove(rule);
+ break;
+ case ADD_TO_EXISTING:
+ case REMOVE_FROM_EXISTING:
+ break;
+ default:
+ log.warn("Unknown forwarding operation: {}", fwd.op());
+ }
+
+ applyFlowRules(ImmutableList.of(fwd), builder);
+
+
+ }
+
+ private TrafficSelector.Builder buildIpv4SelectorForMulticast(ForwardingObjective fwd) {
+ TrafficSelector.Builder builderToUpdate = DefaultTrafficSelector.builder();
+
+ Optional<Criterion> vlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.VLAN_VID);
+ if (vlanIdCriterion.isPresent()) {
+ VlanId assignedVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
+ builderToUpdate.matchVlanId(assignedVlan);
+ }
+
+ Optional<Criterion> innerVlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.INNER_VLAN_VID);
+ if (innerVlanIdCriterion.isPresent()) {
+ VlanId assignedInnerVlan = ((VlanIdCriterion) innerVlanIdCriterion.get()).vlanId();
+ builderToUpdate.matchMetadata(assignedInnerVlan.toShort());
+ }
+
+ Optional<Criterion> ethTypeCriterion = readFromSelector(fwd.selector(), Criterion.Type.ETH_TYPE);
+ if (ethTypeCriterion.isPresent()) {
+ EthType ethType = ((EthTypeCriterion) ethTypeCriterion.get()).ethType();
+ builderToUpdate.matchEthType(ethType.toShort());
+ }
+
+ Optional<Criterion> ipv4DstCriterion = readFromSelector(fwd.selector(), Criterion.Type.IPV4_DST);
+ if (ipv4DstCriterion.isPresent()) {
+ IpPrefix ipv4Dst = ((IPCriterion) ipv4DstCriterion.get()).ip();
+ builderToUpdate.matchIPDst(ipv4Dst);
+ }
+
+ return builderToUpdate;
+ }
+
+ static Optional<Criterion> readFromSelector(TrafficSelector selector, Criterion.Type type) {
+ if (selector == null) {
+ return Optional.empty();
+ }
+ Criterion criterion = selector.getCriterion(type);
+ return (criterion == null)
+ ? Optional.empty() : Optional.of(criterion);
+ }
+
+ private boolean checkForMulticast(ForwardingObjective fwd) {
+
+ IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
+ Criterion.Type.IPV4_DST);
+
+ if (ip == null) {
+ return false;
+ }
+
+ return ip.ip().isMulticast();
+
+ }
+
+ private GroupKey getGroupForNextObjective(Integer nextId) {
+ NextGroup next = flowObjectiveStore.getNextGroup(nextId);
+ return appKryo.deserialize(next.data());
+
+ }
+
+ private void installNoModificationRules(ForwardingObjective fwd) {
+ Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
+ Instructions.MetadataInstruction writeMetadata = fetchWriteMetadata(fwd);
+ Instructions.MeterInstruction meter = (Instructions.MeterInstruction) fetchMeter(fwd);
+
+ TrafficSelector selector = fwd.selector();
+
+ Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
+ Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
+ Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
+
+ if (inport == null || output == null || innerVlan == null || outerVlan == null) {
+ // Avoid logging a non-error from lldp, bbdp and eapol core flows.
+ if (!fwd.appId().name().equals(CORE_APP_NAME)) {
+ log.error("Forwarding objective is underspecified: {}", fwd);
+ } else {
+ log.debug("Not installing unsupported core generated flow {}", fwd);
+ }
+ fail(fwd, ObjectiveError.BADPARAMS);
+ return;
+ }
+
+
+ FlowRule.Builder outer = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(buildSelector(inport, outerVlan))
+ .withTreatment(buildTreatment(output, writeMetadata, meter));
+
+ applyRules(fwd, outer);
+ }
+
+ private void installDownstreamRules(ForwardingObjective fwd) {
+ Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
+
+ if (output == null) {
+ return;
+ }
+
+ TrafficSelector selector = fwd.selector();
+
+ Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
+ Criterion outerPbit = selector.getCriterion(Criterion.Type.VLAN_PCP);
+ Criterion innerVlanCriterion = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
+ Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
+ Criterion dstMac = selector.getCriterion(Criterion.Type.ETH_DST);
+
+ if (outerVlan == null || innerVlanCriterion == null || inport == null) {
+ // Avoid logging a non-error from lldp, bbdp and eapol core flows.
+ if (!fwd.appId().name().equals(CORE_APP_NAME)) {
+ log.error("Forwarding objective is underspecified: {}", fwd);
+ } else {
+ log.debug("Not installing unsupported core generated flow {}", fwd);
+ }
+ fail(fwd, ObjectiveError.BADPARAMS);
+ return;
+ }
+
+ VlanId innerVlan = ((VlanIdCriterion) innerVlanCriterion).vlanId();
+ Criterion innerVid = Criteria.matchVlanId(innerVlan);
+
+ // In the case where the C-tag is the same for all the subscribers,
+ // we add a metadata with the outport in the selector to make the flow unique
+ Criterion innerSelectorMeta = Criteria.matchMetadata(output.port().toLong());
+
+ if (innerVlan.toShort() == VlanId.ANY_VALUE) {
+ TrafficSelector outerSelector = buildSelector(inport, outerVlan, outerPbit, dstMac);
+ installDownstreamRulesForAnyVlan(fwd, output, outerSelector,
+ buildSelector(inport,
+ Criteria.matchVlanId(VlanId.ANY),
+ innerSelectorMeta));
+ } else {
+ // Required to differentiate the same match flows
+ // Please note that S tag and S p bit values will be same for the same service - so conflict flows!
+ // Metadata match criteria solves the conflict issue - but not used by the voltha
+ // Maybe - find a better way to solve the above problem
+ Criterion metadata = Criteria.matchMetadata(innerVlan.toShort());
+ TrafficSelector outerSelector = buildSelector(inport, metadata, outerVlan, outerPbit, dstMac);
+ installDownstreamRulesForVlans(fwd, output, outerSelector, buildSelector(inport, innerVid,
+ innerSelectorMeta));
+ }
+ }
+
+ private void installDownstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
+ TrafficSelector outerSelector, TrafficSelector innerSelector) {
+
+ List<Pair<Instruction, Instruction>> vlanOps =
+ vlanOps(fwd,
+ L2ModificationInstruction.L2SubType.VLAN_POP);
+
+ if (vlanOps == null || vlanOps.isEmpty()) {
+ return;
+ }
+
+ Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
+
+ TrafficTreatment innerTreatment;
+ VlanId setVlanId = ((L2ModificationInstruction.ModVlanIdInstruction) popAndRewrite.getRight()).vlanId();
+ if (VlanId.NONE.equals(setVlanId)) {
+ innerTreatment = (buildTreatment(popAndRewrite.getLeft(), fetchMeter(fwd),
+ writeMetadataIncludingOnlyTp(fwd), output));
+ } else {
+ innerTreatment = (buildTreatment(popAndRewrite.getRight(),
+ fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd), output));
+ }
+
+ List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
+ fwd.treatment().allInstructions());
+
+ Instruction innerPbitSet = null;
+
+ if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
+ innerPbitSet = setVlanPcps.get(0);
+ }
+
+ VlanId remarkInnerVlan = null;
+ Optional<Criterion> vlanIdCriterion = readFromSelector(innerSelector, Criterion.Type.VLAN_VID);
+ if (vlanIdCriterion.isPresent()) {
+ remarkInnerVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
+ }
+
+ Instruction modVlanId = null;
+ if (innerPbitSet != null) {
+ modVlanId = Instructions.modVlanId(remarkInnerVlan);
+ }
+
+ //match: in port (nni), s-tag
+ //action: pop vlan (s-tag), write metadata, go to table 1, meter
+ FlowRule.Builder outer = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(outerSelector)
+ .withTreatment(buildTreatment(popAndRewrite.getLeft(), modVlanId,
+ innerPbitSet, fetchMeter(fwd),
+ fetchWriteMetadata(fwd),
+ Instructions.transition(QQ_TABLE)));
+
+ //match: in port (nni), c-tag
+ //action: immediate: write metadata and pop, meter, output
+ FlowRule.Builder inner = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .forTable(QQ_TABLE)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(innerSelector)
+ .withTreatment(innerTreatment);
+ applyRules(fwd, inner, outer);
+ }
+
+ private void installDownstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
+ TrafficSelector outerSelector, TrafficSelector innerSelector) {
+
+ //match: in port (nni), s-tag
+ //action: immediate: write metadata, pop vlan, meter and go to table 1
+ FlowRule.Builder outer = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(outerSelector)
+ .withTreatment(buildTreatment(Instructions.popVlan(), fetchMeter(fwd),
+ fetchWriteMetadata(fwd), Instructions.transition(QQ_TABLE)));
+
+ //match: in port (nni) and s-tag
+ //action: immediate : write metadata, meter and output
+ FlowRule.Builder inner = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .forTable(QQ_TABLE)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(innerSelector)
+ .withTreatment(buildTreatment(fetchMeter(fwd),
+ writeMetadataIncludingOnlyTp(fwd), output));
+
+ applyRules(fwd, inner, outer);
+ }
+
+ private void installUpstreamRules(ForwardingObjective fwd) {
+ List<Pair<Instruction, Instruction>> vlanOps =
+ vlanOps(fwd,
+ L2ModificationInstruction.L2SubType.VLAN_PUSH);
+
+ if (vlanOps == null || vlanOps.isEmpty()) {
+ return;
+ }
+
+ Instruction output = fetchOutput(fwd, UPSTREAM);
+
+ if (output == null) {
+ return;
+ }
+
+ Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
+
+ boolean noneValueVlanStatus = checkNoneVlanCriteria(fwd);
+ boolean anyValueVlanStatus = checkAnyVlanMatchCriteria(fwd);
+
+ if (anyValueVlanStatus) {
+ installUpstreamRulesForAnyVlan(fwd, output, outerPair);
+ } else {
+ Pair<Instruction, Instruction> innerPair = outerPair;
+ outerPair = vlanOps.remove(0);
+ installUpstreamRulesForVlans(fwd, output, innerPair, outerPair, noneValueVlanStatus);
+ }
+ }
+
+ private void installUpstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
+ Pair<Instruction, Instruction> innerPair,
+ Pair<Instruction, Instruction> outerPair, Boolean noneValueVlanStatus) {
+
+ List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
+ fwd.treatment().allInstructions());
+
+ Instruction innerPbitSet = null;
+ Instruction outerPbitSet = null;
+
+ if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
+ innerPbitSet = setVlanPcps.get(0);
+ outerPbitSet = setVlanPcps.get(1);
+ }
+
+ TrafficTreatment innerTreatment;
+ if (noneValueVlanStatus) {
+ innerTreatment = buildTreatment(innerPair.getLeft(), innerPair.getRight(), fetchMeter(fwd),
+ fetchWriteMetadata(fwd), innerPbitSet,
+ Instructions.transition(QQ_TABLE));
+ } else {
+ innerTreatment = buildTreatment(innerPair.getRight(), fetchMeter(fwd), fetchWriteMetadata(fwd),
+ innerPbitSet, Instructions.transition(QQ_TABLE));
+ }
+
+ //match: in port, vlanId (0 or None)
+ //action:
+ //if vlanId None, push & set c-tag go to table 1
+ //if vlanId 0 or any specific vlan, set c-tag, write metadata, meter and go to table 1
+ FlowRule.Builder inner = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(fwd.selector())
+ .withTreatment(innerTreatment);
+
+ PortCriterion inPort = (PortCriterion)
+ fwd.selector().getCriterion(Criterion.Type.IN_PORT);
+
+ VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
+ innerPair.getRight()).vlanId();
+
+ //match: in port, c-tag
+ //action: immediate: push s-tag, write metadata, meter and output
+ FlowRule.Builder outer = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .forTable(QQ_TABLE)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
+ fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd),
+ outerPbitSet, output));
+
+ if (innerPbitSet != null) {
+ byte innerPbit = ((L2ModificationInstruction.ModVlanPcpInstruction)
+ innerPbitSet).vlanPcp();
+ outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId), Criteria.matchVlanPcp(innerPbit)));
+ } else {
+ outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId)));
+ }
+
+ applyRules(fwd, inner, outer);
+ }
+
+ private void installUpstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
+ Pair<Instruction, Instruction> outerPair) {
+
+ log.debug("Installing upstream rules for any value vlan");
+
+ //match: in port and any-vlan (coming from OLT app.)
+ //action: write metadata, go to table 1 and meter
+ FlowRule.Builder inner = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(fwd.selector())
+ .withTreatment(buildTreatment(Instructions.transition(QQ_TABLE), fetchMeter(fwd),
+ fetchWriteMetadata(fwd)));
+
+ //match: in port and any-vlan (coming from OLT app.)
+ //action: immediate: push:QinQ, vlanId (s-tag), write metadata, meter and output
+ FlowRule.Builder outer = DefaultFlowRule.builder()
+ .fromApp(fwd.appId())
+ .forDevice(deviceId)
+ .forTable(QQ_TABLE)
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(fwd.selector())
+ .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
+ fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd), output));
+
+ applyRules(fwd, inner, outer);
+ }
+
+ private boolean checkNoneVlanCriteria(ForwardingObjective fwd) {
+ // Add the VLAN_PUSH treatment if we're matching on VlanId.NONE
+ Criterion vlanMatchCriterion = filterForCriterion(fwd.selector().criteria(), Criterion.Type.VLAN_VID);
+ boolean noneValueVlanStatus = false;
+ if (vlanMatchCriterion != null) {
+ noneValueVlanStatus = ((VlanIdCriterion) vlanMatchCriterion).vlanId().equals(VlanId.NONE);
+ }
+ return noneValueVlanStatus;
+ }
+
+ private boolean checkAnyVlanMatchCriteria(ForwardingObjective fwd) {
+ Criterion anyValueVlanCriterion = fwd.selector().criteria().stream()
+ .filter(c -> c.type().equals(Criterion.Type.VLAN_VID))
+ .filter(vc -> ((VlanIdCriterion) vc).vlanId().toShort() == VlanId.ANY_VALUE)
+ .findAny().orElse(null);
+
+ if (anyValueVlanCriterion == null) {
+ log.debug("Any value vlan match criteria is not found, criteria {}",
+ fwd.selector().criteria());
+ return false;
+ }
+
+ return true;
+ }
+
+ private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
+ Instruction output = fwd.treatment().allInstructions().stream()
+ .filter(i -> i.type() == Instruction.Type.OUTPUT)
+ .findFirst().orElse(null);
+
+ if (output == null) {
+ log.error("OLT {} rule has no output", direction);
+ fail(fwd, ObjectiveError.BADPARAMS);
+ return null;
+ }
+ return output;
+ }
+
+ private Instruction fetchMeter(ForwardingObjective fwd) {
+ Instruction meter = fwd.treatment().metered();
+
+ if (meter == null) {
+ log.debug("Meter instruction is not found for the forwarding objective {}", fwd);
+ return null;
+ }
+
+ log.debug("Meter instruction is found.");
+ return meter;
+ }
+
+ private Instructions.MetadataInstruction fetchWriteMetadata(ForwardingObjective fwd) {
+ Instructions.MetadataInstruction writeMetadata = fwd.treatment().writeMetadata();
+
+ if (writeMetadata == null) {
+ log.warn("Write metadata is not found for the forwarding obj");
+ fail(fwd, ObjectiveError.BADPARAMS);
+ return null;
+ }
+
+ log.debug("Write metadata is found {}", writeMetadata);
+ return writeMetadata;
+ }
+
+ private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
+ L2ModificationInstruction.L2SubType type) {
+
+ List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
+ fwd.treatment().allInstructions(), type);
+
+ if (vlanOps == null || vlanOps.isEmpty()) {
+ String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
+ ? DOWNSTREAM : UPSTREAM;
+ log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
+ fail(fwd, ObjectiveError.BADPARAMS);
+ return ImmutableList.of();
+ }
+ return vlanOps;
+ }
+
+
+ private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
+ L2ModificationInstruction.L2SubType type) {
+
+ List<Instruction> vlanOperations = findL2Instructions(
+ type,
+ instructions);
+ List<Instruction> vlanSets = findL2Instructions(
+ L2ModificationInstruction.L2SubType.VLAN_ID,
+ instructions);
+
+ if (vlanOperations.size() != vlanSets.size()) {
+ return ImmutableList.of();
+ }
+
+ List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
+
+ for (int i = 0; i < vlanOperations.size(); i++) {
+ pairs.add(new ImmutablePair<>(vlanOperations.get(i), vlanSets.get(i)));
+ }
+ return pairs;
+ }
+
+ private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
+ List<Instruction> actions) {
+ return actions.stream()
+ .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
+ .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
+ .collect(Collectors.toList());
+ }
+
+ private void provisionEthTypeBasedFilter(FilteringObjective filter,
+ EthTypeCriterion ethType,
+ Instructions.OutputInstruction output,
+ L2ModificationInstruction vlanId,
+ L2ModificationInstruction vlanPush) {
+
+ Instruction meter = filter.meta().metered();
+ Instruction writeMetadata = filter.meta().writeMetadata();
+
+ TrafficSelector selector = buildSelector(filter.key(), ethType);
+ TrafficTreatment treatment;
+
+ if (vlanPush == null || vlanId == null) {
+ treatment = buildTreatment(output, meter, writeMetadata);
+ } else {
+ // we need to push the vlan because it came untagged (ATT)
+ treatment = buildTreatment(output, meter, vlanPush, vlanId, writeMetadata);
+ }
+
+ buildAndApplyRule(filter, selector, treatment);
+
+ }
+
+ private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
+ IPProtocolCriterion ipProto,
+ Instructions.OutputInstruction output,
+ Instruction vlan, Instruction pcp) {
+
+ Instruction meter = filter.meta().metered();
+ Instruction writeMetadata = filter.meta().writeMetadata();
+
+ // uniTagMatch
+ VlanIdCriterion vlanId = (VlanIdCriterion) filterForCriterion(filter.conditions(),
+ Criterion.Type.VLAN_VID);
+
+ TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto, vlanId);
+ TrafficTreatment treatment = buildTreatment(output, vlan, pcp, meter, writeMetadata);
+ buildAndApplyRule(filter, selector, treatment);
+ }
+
+ private void provisionDhcp(FilteringObjective filter, EthTypeCriterion ethType,
+ IPProtocolCriterion ipProto,
+ UdpPortCriterion udpSrcPort,
+ UdpPortCriterion udpDstPort,
+ Instruction vlanIdInstruction,
+ Instruction vlanPcpInstruction,
+ Instructions.OutputInstruction output) {
+
+ Instruction meter = filter.meta().metered();
+ Instruction writeMetadata = filter.meta().writeMetadata();
+
+ VlanIdCriterion matchVlanId = (VlanIdCriterion)
+ filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
+
+ TrafficSelector selector;
+ TrafficTreatment treatment;
+
+ if (matchVlanId != null) {
+ log.debug("Building selector with match VLAN, {}", matchVlanId);
+ // in case of TT upstream the packet comes tagged and the vlan is swapped.
+ selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort,
+ udpDstPort, matchVlanId);
+ treatment = buildTreatment(output, meter, writeMetadata,
+ vlanIdInstruction, vlanPcpInstruction);
+ } else {
+ log.debug("Building selector with no VLAN");
+ // in case of ATT upstream the packet comes in untagged and we need to push the vlan
+ selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort, udpDstPort);
+ treatment = buildTreatment(output, meter, vlanIdInstruction, writeMetadata);
+ }
+ //In case of downstream there will be no match on the VLAN, which is null,
+ // so it will just be output, meter, writeMetadata
+
+ buildAndApplyRule(filter, selector, treatment);
+ }
+
+ private void provisionPPPoED(FilteringObjective filter, EthTypeCriterion ethType,
+ Instruction vlanIdInstruction,
+ Instruction vlanPcpInstruction,
+ Instructions.OutputInstruction output) {
+ Instruction meter = filter.meta().metered();
+ Instruction writeMetadata = filter.meta().writeMetadata();
+
+ VlanIdCriterion matchVlanId = (VlanIdCriterion)
+ filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
+
+ TrafficSelector selector;
+ TrafficTreatment treatment;
+
+ if (matchVlanId != null) {
+ log.debug("Building pppoed selector with match VLAN {}.", matchVlanId);
+ } else {
+ log.debug("Building pppoed selector without match VLAN.");
+ }
+
+ selector = buildSelector(filter.key(), ethType, matchVlanId);
+ treatment = buildTreatment(output, meter, writeMetadata, vlanIdInstruction, vlanPcpInstruction);
+ buildAndApplyRule(filter, selector, treatment);
+ }
+
+ private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
+ TrafficTreatment treatment) {
+ FlowRule rule = DefaultFlowRule.builder()
+ .fromApp(filter.appId())
+ .forDevice(deviceId)
+ .forTable(0)
+ .makePermanent()
+ .withSelector(selector)
+ .withTreatment(treatment)
+ .withPriority(filter.priority())
+ .build();
+
+ if (accumulator != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Adding pair to batch: {}", Pair.of(filter, rule));
+ }
+ accumulator.add(Pair.of(filter, rule));
+ } else {
+ FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+ switch (filter.type()) {
+ case PERMIT:
+ opsBuilder.add(rule);
+ break;
+ case DENY:
+ opsBuilder.remove(rule);
+ break;
+ default:
+ log.warn("Unknown filter type : {}", filter.type());
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ }
+ applyFlowRules(ImmutableList.of(filter), opsBuilder);
+ }
+ }
+
+ private void applyRules(ForwardingObjective fwd, FlowRule.Builder... fwdBuilders) {
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+ switch (fwd.op()) {
+ case ADD:
+ for (FlowRule.Builder fwdBuilder : fwdBuilders) {
+ builder.add(fwdBuilder.build());
+ }
+ break;
+ case REMOVE:
+ for (FlowRule.Builder fwdBuilder : fwdBuilders) {
+ builder.remove(fwdBuilder.build());
+ }
+ break;
+ case ADD_TO_EXISTING:
+ break;
+ case REMOVE_FROM_EXISTING:
+ break;
+ default:
+ log.warn("Unknown forwarding operation: {}", fwd.op());
+ }
+
+ applyFlowRules(ImmutableList.of(fwd), builder);
+
+
+ }
+
+ private void applyFlowRules(List<Objective> objectives, FlowRuleOperations.Builder builder) {
+ flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
+ @Override
+ public void onSuccess(FlowRuleOperations ops) {
+ objectives.forEach(obj -> {
+ pass(obj);
+ });
+ }
+
+ @Override
+ public void onError(FlowRuleOperations ops) {
+ objectives.forEach(obj -> {
+ fail(obj, ObjectiveError.FLOWINSTALLATIONFAILED);
+ });
+
+ }
+ }));
+ }
+
+ // Builds the batch using the accumulated flow rules
+ private void sendFilters(List<Pair<FilteringObjective, FlowRule>> pairs) {
+ FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
+ log.debug("Sending batch of {} filter-objs", pairs.size());
+ List<Objective> filterObjs = Lists.newArrayList();
+ // Iterates over all accumulated flow rules and then build an unique batch
+ pairs.forEach(pair -> {
+ FilteringObjective filter = pair.getLeft();
+ FlowRule rule = pair.getRight();
+ switch (filter.type()) {
+ case PERMIT:
+ flowOpsBuilder.add(rule);
+ log.debug("Applying add filter-obj {} to device: {}", filter.id(), deviceId);
+ filterObjs.add(filter);
+ break;
+ case DENY:
+ flowOpsBuilder.remove(rule);
+ log.debug("Deleting flow rule {} to device: {}", rule, deviceId);
+ filterObjs.add(filter);
+ break;
+ default:
+ fail(filter, ObjectiveError.UNKNOWN);
+ log.warn("Unknown forwarding type {}", filter.type());
+ }
+ });
+ if (log.isDebugEnabled()) {
+ log.debug("Applying batch {}", flowOpsBuilder.build());
+ }
+ // Finally applies the operations
+ applyFlowRules(filterObjs, flowOpsBuilder);
+ }
+
+ private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
+ return criteria.stream()
+ .filter(c -> c.type().equals(type))
+ .limit(1)
+ .findFirst().orElse(null);
+ }
+
+ private TrafficSelector buildSelector(Criterion... criteria) {
+
+ TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
+
+ Arrays.stream(criteria).filter(Objects::nonNull).forEach(sBuilder::add);
+
+ return sBuilder.build();
+ }
+
+ private TrafficTreatment buildTreatment(Instruction... instructions) {
+
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+
+ Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add);
+
+ return tBuilder.build();
+ }
+
+ private Instruction writeMetadataIncludingOnlyTp(ForwardingObjective fwd) {
+
+ return Instructions.writeMetadata(
+ fetchWriteMetadata(fwd).metadata() & 0xFFFF00000000L, 0L);
+ }
+
+ private void fail(Objective obj, ObjectiveError error) {
+ obj.context().ifPresent(context -> context.onError(obj, error));
+ }
+
+ private void pass(Objective obj) {
+ obj.context().ifPresent(context -> context.onSuccess(obj));
+ }
+
+
+ private class InnerGroupListener implements GroupListener {
+ @Override
+ public void event(GroupEvent event) {
+ GroupKey key = event.subject().appCookie();
+ NextObjective obj = pendingGroups.getIfPresent(key);
+ if (obj == null) {
+ log.debug("No pending group for {}, moving on", key);
+ return;
+ }
+ log.trace("Event {} for group {}, handling pending" +
+ "NextGroup {}", event.type(), key, obj.id());
+ if (event.type() == GroupEvent.Type.GROUP_ADDED ||
+ event.type() == GroupEvent.Type.GROUP_UPDATED) {
+ flowObjectiveStore.putNextGroup(obj.id(), new OltPipelineGroup(key));
+ pass(obj);
+ pendingGroups.invalidate(key);
+ } else if (event.type() == GroupEvent.Type.GROUP_REMOVED) {
+ flowObjectiveStore.removeNextGroup(obj.id());
+ pass(obj);
+ pendingGroups.invalidate(key);
+ }
+ }
+ }
+
+ private static class OltPipelineGroup implements NextGroup {
+
+ private final GroupKey key;
+
+ public OltPipelineGroup(GroupKey key) {
+ this.key = key;
+ }
+
+ public GroupKey key() {
+ return key;
+ }
+
+ @Override
+ public byte[] data() {
+ return appKryo.serialize(key);
+ }
+
+ }
+
+ @Override
+ public List<String> getNextMappings(NextGroup nextGroup) {
+ // TODO Implementation deferred to vendor
+ return null;
+ }
+
+ // Flow rules accumulator for reducing the number of transactions required to the devices.
+ private final class ObjectiveAccumulator
+ extends AbstractAccumulator<Pair<FilteringObjective, FlowRule>> {
+
+ ObjectiveAccumulator(int maxFilter, int maxBatchMS, int maxIdleMS) {
+ super(TIMER, maxFilter, maxBatchMS, maxIdleMS);
+ }
+
+ @Override
+ public void processItems(List<Pair<FilteringObjective, FlowRule>> pairs) {
+ // Triggers creation of a batch using the list of flowrules generated from objs.
+ accumulatorExecutorService.execute(new FlowRulesBuilderTask(pairs));
+ }
+ }
+
+ // Task for building batch of flow rules in a separate thread.
+ private final class FlowRulesBuilderTask implements Runnable {
+ private final List<Pair<FilteringObjective, FlowRule>> pairs;
+
+ FlowRulesBuilderTask(List<Pair<FilteringObjective, FlowRule>> pairs) {
+ this.pairs = pairs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ sendFilters(pairs);
+ } catch (Exception e) {
+ log.warn("Unable to send objectives", e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/impl/src/main/java/org/opencord/olt/driver/package-info.java b/impl/src/main/java/org/opencord/olt/driver/package-info.java
new file mode 100644
index 0000000..dbf2a79
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/driver/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Drivers for the VOLTHA OLT application.
+ */
+package org.opencord.olt.driver;
diff --git a/impl/src/main/java/org/opencord/olt/impl/ConsistentHasher.java b/impl/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
new file mode 100644
index 0000000..52e9b96
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import com.google.common.hash.Hashing;
+import org.onosproject.cluster.NodeId;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Returns a server hosting a given key based on consistent hashing.
+ */
+public class ConsistentHasher {
+
+ private static class Entry implements Comparable<Entry> {
+ private final NodeId server;
+ private final int hash;
+
+ public Entry(NodeId server, int hash) {
+ this.server = server;
+ this.hash = hash;
+ }
+
+ public Entry(int hash) {
+ server = null;
+ this.hash = hash;
+ }
+
+ @Override
+ public int compareTo(Entry o) {
+ if (this.hash > o.hash) {
+ return 1;
+ } else if (this.hash < o.hash) {
+ return -1;
+ } // else
+ return 0;
+ }
+ }
+
+ private final int weight;
+
+ private List<Entry> table;
+
+ /**
+ * Creates a new hasher with the given list of servers.
+ *
+ * @param servers list of servers
+ * @param weight weight
+ */
+ public ConsistentHasher(List<NodeId> servers, int weight) {
+ this.weight = weight;
+
+ this.table = new ArrayList<>();
+
+ servers.forEach(this::addServer);
+ }
+
+ /**
+ * Adds a new server to the list of servers.
+ *
+ * @param server server ID
+ */
+ public synchronized void addServer(NodeId server) {
+ // Add weight number of buckets for each server
+ for (int i = 0; i < weight; i++) {
+ String label = server.toString() + i;
+ int hash = getHash(label);
+ Entry e = new Entry(server, hash);
+ table.add(e);
+ }
+
+ Collections.sort(table);
+ }
+
+ /**
+ * Removes a server from the list of servers.
+ *
+ * @param server server ID
+ */
+ public synchronized void removeServer(NodeId server) {
+ table.removeIf(e -> e.server.equals(server));
+ }
+
+ /**
+ * Hashes a given input string and finds that server that should
+ * handle the given ID.
+ *
+ * @param s input
+ * @return server ID
+ */
+ public synchronized NodeId hash(String s) {
+ Entry temp = new Entry(getHash(s));
+
+ int pos = Collections.binarySearch(this.table, temp);
+
+ // translate a negative not-found result into the closest following match
+ if (pos < 0) {
+ pos = Math.abs(pos + 1);
+ }
+
+ // wraparound if the hash was after the last entry in the table
+ if (pos == this.table.size()) {
+ pos = 0;
+ }
+
+ return table.get(pos).server;
+ }
+
+ private int getHash(String s) {
+ // stable, uniformly-distributed hash
+ return Hashing.murmur3_128().hashString(s, Charset.defaultCharset()).asInt();
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
new file mode 100644
index 0000000..24073b0
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -0,0 +1,1393 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.opencord.olt.AccessDeviceEvent;
+import org.opencord.olt.AccessDeviceListener;
+import org.opencord.olt.AccessDeviceService;
+import org.opencord.olt.AccessSubscriberId;
+import org.opencord.olt.internalapi.AccessDeviceFlowService;
+import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.stream.Collectors.*;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Provisions rules on access devices.
+ */
+@Component(immediate = true,
+ property = {
+ DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
+ DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
+ EAPOL_DELETE_RETRY_MAX_ATTEMPS + ":Integer=" +
+ EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT,
+ PROVISION_DELAY + ":Integer=" + PROVISION_DELAY_DEFAULT,
+ })
+public class Olt
+ extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
+ implements AccessDeviceService {
+ private static final String APP_NAME = "org.opencord.olt";
+
+ private static final short EAPOL_DEFAULT_VLAN = 4091;
+ private static final String NO_UPLINK_PORT = "No uplink port found for OLT device {}";
+
+ public static final int HASH_WEIGHT = 10;
+
+ private final Logger log = getLogger(getClass());
+
+ private static final String NNI = "nni-";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected FlowObjectiveService flowObjectiveService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected SadisService sadisService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected AccessDeviceFlowService oltFlowService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected AccessDeviceMeterService oltMeterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected FlowRuleService flowRuleService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService componentConfigService;
+
+ /**
+ * Default bandwidth profile id that is used for authentication trap flows.
+ **/
+ protected String defaultBpId = DEFAULT_BP_ID_DEFAULT;
+
+ /**
+ * Default multicast service name.
+ **/
+ protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+
+ /**
+ * Default amounts of eapol retry.
+ **/
+ protected int eapolDeleteRetryMaxAttempts = EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
+
+ /**
+ * Delay between EAPOL removal and data plane flows provisioning.
+ */
+ protected int provisionDelay = PROVISION_DELAY_DEFAULT;
+
+ private final DeviceListener deviceListener = new InternalDeviceListener();
+ private final ClusterEventListener clusterListener = new InternalClusterListener();
+
+ private ConsistentHasher hasher;
+
+ protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+ private BaseInformationService<BandwidthProfileInformation> bpService;
+
+ private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
+ groupedThreads("onos/olt-service",
+ "olt-installer-%d"));
+
+ protected ExecutorService eventExecutor;
+ protected ExecutorService retryExecutor;
+ protected ScheduledExecutorService provisionExecutor;
+
+ private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
+ private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
+
+ private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
+
+ @Activate
+ public void activate(ComponentContext context) {
+ eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
+ "events-%d", log));
+ retryExecutor = Executors.newCachedThreadPool();
+ provisionExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
+ "provision-%d", log));
+
+ modified(context);
+ ApplicationId appId = coreService.registerApplication(APP_NAME);
+ componentConfigService.registerProperties(getClass());
+
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(UniTagInformation.class)
+ .build();
+
+ programmedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
+ .withName("volt-programmed-subs")
+ .withSerializer(Serializer.using(serializer))
+ .withApplicationId(appId)
+ .build();
+
+ failedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
+ .withName("volt-failed-subs")
+ .withSerializer(Serializer.using(serializer))
+ .withApplicationId(appId)
+ .build();
+
+ pendingSubscribersForDevice = new ConcurrentHashMap<>();
+ eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
+
+ subsService = sadisService.getSubscriberInfoService();
+ bpService = sadisService.getBandwidthProfileService();
+
+ List<NodeId> readyNodes = clusterService.getNodes().stream()
+ .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
+ .map(ControllerNode::id)
+ .collect(toList());
+ hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
+ clusterService.addListener(clusterListener);
+
+ // look for all provisioned devices in Sadis and create EAPOL flows for the
+ // UNI ports
+ Iterable<Device> devices = deviceService.getDevices();
+ for (Device d : devices) {
+ if (isLocalLeader(d.id())) {
+ checkAndCreateDeviceFlows(d);
+ }
+ }
+
+ deviceService.addListener(deviceListener);
+ log.info("Started with Application ID {}", appId.id());
+ }
+
+ @Deactivate
+ public void deactivate() {
+ componentConfigService.unregisterProperties(getClass(), false);
+ clusterService.removeListener(clusterListener);
+ deviceService.removeListener(deviceListener);
+ eventDispatcher.removeSink(AccessDeviceEvent.class);
+ eventExecutor.shutdown();
+ retryExecutor.shutdown();
+ provisionExecutor.shutdown();
+ log.info("Stopped");
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+ try {
+ String bpId = get(properties, DEFAULT_BP_ID);
+ defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
+
+ String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
+ multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
+
+ String eapolDeleteRetryNew = get(properties, EAPOL_DELETE_RETRY_MAX_ATTEMPS);
+ eapolDeleteRetryMaxAttempts = isNullOrEmpty(eapolDeleteRetryNew) ? EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT :
+ Integer.parseInt(eapolDeleteRetryNew.trim());
+
+ log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}, EapolDeleteRetryMaxAttempts: {}",
+ defaultBpId, multicastServiceName, eapolDeleteRetryMaxAttempts);
+
+ } catch (Exception e) {
+ log.error("Error while modifying the properties", e);
+ defaultBpId = DEFAULT_BP_ID_DEFAULT;
+ multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+ }
+ }
+
+ @Override
+ public boolean provisionSubscriber(ConnectPoint connectPoint) {
+ log.info("Call to provision subscriber at {}", connectPoint);
+ DeviceId deviceId = connectPoint.deviceId();
+ PortNumber subscriberPortNo = connectPoint.port();
+ checkNotNull(deviceService.getPort(deviceId, subscriberPortNo),
+ "Invalid connect point:" + connectPoint);
+
+ if (isSubscriberInstalled(connectPoint)) {
+ log.warn("Subscriber at {} already provisioned or in the process .."
+ + " not taking any more action", connectPoint);
+ return true;
+ }
+
+ // Find the subscriber config at this connect point
+ SubscriberAndDeviceInformation sub = getSubscriber(connectPoint);
+ if (sub == null) {
+ log.warn("No subscriber found for {}", connectPoint);
+ return false;
+ }
+
+ // Get the uplink port
+ Port uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
+ if (uplinkPort == null) {
+ log.warn(NO_UPLINK_PORT, deviceId);
+ return false;
+ }
+
+ // delete Eapol authentication flow with default bandwidth
+ // wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
+ // retry deletion if it fails/times-out
+ retryExecutor.execute(new DeleteEapolInstallSub(connectPoint,
+ uplinkPort, sub, 1));
+ return true;
+ }
+
+ // returns true if subscriber is programmed or in the process of being programmed
+ private boolean isSubscriberInstalled(ConnectPoint connectPoint) {
+ Collection<? extends UniTagInformation> uniTagInformationSet =
+ programmedSubs.get(connectPoint).value();
+ if (!uniTagInformationSet.isEmpty()) {
+ return true;
+ }
+ //Check if the subscriber is already getting provisioned
+ // so we do not provision twice
+ AtomicBoolean isPending = new AtomicBoolean(false);
+ pendingSubscribersForDevice.computeIfPresent(connectPoint.deviceId(), (id, infos) -> {
+ for (SubscriberFlowInfo fi : infos) {
+ if (fi.getUniPort().equals(connectPoint.port())) {
+ isPending.set(true);
+ break;
+ }
+ }
+ return infos;
+ });
+
+ return isPending.get();
+ }
+
+ private class DeleteEapolInstallSub implements Runnable {
+ ConnectPoint cp;
+ Port uplinkPort;
+ SubscriberAndDeviceInformation sub;
+ private int attemptNumber;
+
+ DeleteEapolInstallSub(ConnectPoint cp, Port uplinkPort,
+ SubscriberAndDeviceInformation sub,
+ int attemptNumber) {
+ this.cp = cp;
+ this.uplinkPort = uplinkPort;
+ this.sub = sub;
+ this.attemptNumber = attemptNumber;
+ }
+
+ @Override
+ public void run() {
+ CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
+ oltFlowService.processEapolFilteringObjectives(cp.deviceId(), cp.port(),
+ defaultBpId, filterFuture,
+ VlanId.vlanId(EAPOL_DEFAULT_VLAN),
+ false);
+ filterFuture.thenAcceptAsync(filterStatus -> {
+ if (filterStatus == null) {
+ log.info("Default eapol flow deleted in attempt {} of {}"
+ + "... provisioning subscriber flows {}",
+ attemptNumber, eapolDeleteRetryMaxAttempts, cp);
+
+ // FIXME this is needed to prevent that default EAPOL flow removal and
+ // data plane flows install are received by the device at the same time
+ provisionExecutor.schedule(
+ () -> provisionUniTagList(cp, uplinkPort.number(), sub),
+ provisionDelay, TimeUnit.MILLISECONDS);
+ } else {
+ if (attemptNumber <= eapolDeleteRetryMaxAttempts) {
+ log.warn("The filtering future failed {} for subscriber {}"
+ + "... retrying {} of {} attempts",
+ filterStatus, cp, attemptNumber, eapolDeleteRetryMaxAttempts);
+ retryExecutor.execute(
+ new DeleteEapolInstallSub(cp, uplinkPort, sub,
+ attemptNumber + 1));
+ } else {
+ log.error("The filtering future failed {} for subscriber {}"
+ + "after {} attempts. Subscriber provisioning failed",
+ filterStatus, cp, eapolDeleteRetryMaxAttempts);
+ sub.uniTagList().forEach(ut -> failedSubs.put(cp, ut));
+ }
+ }
+ });
+ }
+
+ }
+
+ @Override
+ public boolean removeSubscriber(ConnectPoint connectPoint) {
+ log.info("Call to un-provision subscriber at {}", connectPoint);
+
+ // Get the subscriber connected to this port from the local cache
+ // If we don't know about the subscriber there's no need to remove it
+ DeviceId deviceId = connectPoint.deviceId();
+ PortNumber subscriberPortNo = connectPoint.port();
+
+ Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint).value();
+ if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
+ log.warn("Subscriber on connectionPoint {} was not previously programmed, " +
+ "no need to remove it", connectPoint);
+ return true;
+ }
+
+ // Get the uplink port
+ Port uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
+ if (uplinkPort == null) {
+ log.warn(NO_UPLINK_PORT, deviceId);
+ return false;
+ }
+
+ for (UniTagInformation uniTag : uniTagInformationSet) {
+
+ if (multicastServiceName.equals(uniTag.getServiceName())) {
+ continue;
+ }
+
+ unprovisionVlans(deviceId, uplinkPort.number(), subscriberPortNo, uniTag);
+
+ // remove eapol with subscriber bandwidth profile
+ oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPortNo,
+ uniTag.getUpstreamBandwidthProfile(),
+ null, uniTag.getPonCTag(), false);
+
+ Port port = deviceService.getPort(deviceId, subscriberPortNo);
+ if (port != null && port.isEnabled()) {
+ // reinstall eapol with default bandwidth profile
+ oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPortNo, defaultBpId,
+ null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
+ } else {
+ log.debug("Port {} is no longer enabled or it's unavailable. Not "
+ + "reprogramming default eapol flow", connectPoint);
+ }
+ }
+ return true;
+ }
+
+
+ @Override
+ public boolean provisionSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
+ Optional<VlanId> cTag, Optional<Integer> tpId) {
+
+ log.info("Provisioning subscriber using subscriberId {}, sTag {}, cTag {}, tpId {}" +
+ "", subscriberId, sTag, cTag, tpId);
+
+ // Check if we can find the connect point to which this subscriber is connected
+ ConnectPoint subsPort = findSubscriberConnectPoint(subscriberId.toString());
+ if (subsPort == null) {
+ log.warn("ConnectPoint for {} not found", subscriberId);
+ return false;
+ }
+
+ if (!sTag.isPresent() && !cTag.isPresent()) {
+ return provisionSubscriber(subsPort);
+ } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
+ Port uplinkPort = getUplinkPort(deviceService.getDevice(subsPort.deviceId()));
+ if (uplinkPort == null) {
+ log.warn(NO_UPLINK_PORT, subsPort.deviceId());
+ return false;
+ }
+
+ //delete Eapol authentication flow with default bandwidth
+ //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
+ //install subscriber flows
+ CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
+ oltFlowService.processEapolFilteringObjectives(subsPort.deviceId(), subsPort.port(), defaultBpId,
+ filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
+ filterFuture.thenAcceptAsync(filterStatus -> {
+ if (filterStatus == null) {
+ provisionUniTagInformation(subsPort.deviceId(), uplinkPort.number(), subsPort.port(),
+ cTag.get(), sTag.get(), tpId.get());
+ }
+ });
+ return true;
+ } else {
+ log.warn("Provisioning failed for subscriber: {}", subscriberId);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean removeSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
+ Optional<VlanId> cTag, Optional<Integer> tpId) {
+ // Check if we can find the connect point to which this subscriber is connected
+ ConnectPoint subsPort = findSubscriberConnectPoint(subscriberId.toString());
+ if (subsPort == null) {
+ log.warn("ConnectPoint for {} not found", subscriberId);
+ return false;
+ }
+
+ if (!sTag.isPresent() && !cTag.isPresent()) {
+ return removeSubscriber(subsPort);
+ } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
+ // Get the uplink port
+ Port uplinkPort = getUplinkPort(deviceService.getDevice(subsPort.deviceId()));
+ if (uplinkPort == null) {
+ log.warn(NO_UPLINK_PORT, subsPort.deviceId());
+ return false;
+ }
+
+ Optional<UniTagInformation> tagInfo = getUniTagInformation(subsPort, cTag.get(), sTag.get(), tpId.get());
+ if (!tagInfo.isPresent()) {
+ log.warn("UniTagInformation does not exist for Device/Port {}, cTag {}, sTag {}, tpId {}",
+ subsPort, cTag, sTag, tpId);
+ return false;
+ }
+
+ unprovisionVlans(subsPort.deviceId(), uplinkPort.number(), subsPort.port(), tagInfo.get());
+ return true;
+ } else {
+ log.warn("Removing subscriber is not possible - please check the provided information" +
+ "for the subscriber: {}", subscriberId);
+ return false;
+ }
+ }
+
+ @Override
+ public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs() {
+ return programmedSubs.stream()
+ .collect(collectingAndThen(
+ groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
+ ImmutableMap::copyOf));
+ }
+
+ @Override
+ public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getFailedSubs() {
+ return failedSubs.stream()
+ .collect(collectingAndThen(
+ groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
+ ImmutableMap::copyOf));
+ }
+
+ @Override
+ public List<DeviceId> fetchOlts() {
+ // look through all the devices and find the ones that are OLTs as per Sadis
+ List<DeviceId> olts = new ArrayList<>();
+ Iterable<Device> devices = deviceService.getDevices();
+ for (Device d : devices) {
+ if (getOltInfo(d) != null) {
+ // So this is indeed an OLT device
+ olts.add(d.id());
+ }
+ }
+ return olts;
+ }
+
+ /**
+ * Finds the connect point to which a subscriber is connected.
+ *
+ * @param id The id of the subscriber, this is the same ID as in Sadis
+ * @return Subscribers ConnectPoint if found else null
+ */
+ private ConnectPoint findSubscriberConnectPoint(String id) {
+
+ Iterable<Device> devices = deviceService.getDevices();
+ for (Device d : devices) {
+ for (Port p : deviceService.getPorts(d.id())) {
+ log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
+ if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
+ log.debug("Found on device {} port {}", d.id(), p.number());
+ return new ConnectPoint(d.id(), p.number());
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Gets the context of the bandwidth profile information for the given parameter.
+ *
+ * @param bandwidthProfile the bandwidth profile id
+ * @return the context of the bandwidth profile information
+ */
+ private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
+ if (bandwidthProfile == null) {
+ return null;
+ }
+ return bpService.get(bandwidthProfile);
+ }
+
+ /**
+ * Removes subscriber vlan flows.
+ *
+ * @param deviceId the device identifier
+ * @param uplink uplink port of the OLT
+ * @param subscriberPort uni port
+ * @param uniTag uni tag information
+ */
+ private void unprovisionVlans(DeviceId deviceId, PortNumber uplink,
+ PortNumber subscriberPort, UniTagInformation uniTag) {
+
+ log.info("Unprovisioning vlans for {} at {}/{}", uniTag, deviceId, subscriberPort);
+
+ CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
+ CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
+
+ VlanId deviceVlan = uniTag.getPonSTag();
+ VlanId subscriberVlan = uniTag.getPonCTag();
+
+ MeterId upstreamMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamBandwidthProfile());
+ MeterId downstreamMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamBandwidthProfile());
+
+ ForwardingObjective.Builder upFwd =
+ oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, uniTag);
+ ForwardingObjective.Builder downFwd =
+ oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, uniTag);
+
+ oltFlowService.processIgmpFilteringObjectives(deviceId, subscriberPort,
+ upstreamMeterId, uniTag, false, true);
+ oltFlowService.processDhcpFilteringObjectives(deviceId, subscriberPort,
+ upstreamMeterId, uniTag, false, true);
+ oltFlowService.processPPPoEDFilteringObjectives(deviceId, subscriberPort,
+ upstreamMeterId, uniTag, false, true);
+
+ flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ upFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ upFuture.complete(error);
+ }
+ }));
+
+ flowObjectiveService.forward(deviceId, downFwd.remove(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ downFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ downFuture.complete(error);
+ }
+ }));
+
+ upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
+ AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTERED;
+ if (upStatus == null && downStatus == null) {
+ log.debug("Uni tag information is unregistered successfully for cTag {}, sTag {}, tpId {}, and" +
+ "Device/Port{}", uniTag.getPonCTag(), uniTag.getPonSTag(),
+ uniTag.getTechnologyProfileId(), subscriberPort);
+ updateProgrammedSubscriber(new ConnectPoint(deviceId, subscriberPort), uniTag, false);
+ } else if (downStatus != null) {
+ log.error("Subscriber with vlan {} on device {} " +
+ "on port {} failed downstream uninstallation: {}",
+ subscriberVlan, deviceId, subscriberPort, downStatus);
+ type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
+ } else if (upStatus != null) {
+ log.error("Subscriber with vlan {} on device {} " +
+ "on port {} failed upstream uninstallation: {}",
+ subscriberVlan, deviceId, subscriberPort, upStatus);
+ type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
+ }
+ Port port = deviceService.getPort(deviceId, subscriberPort);
+ post(new AccessDeviceEvent(type, deviceId, port, deviceVlan, subscriberVlan,
+ uniTag.getTechnologyProfileId()));
+ }, oltInstallers);
+ }
+
+ /**
+ * Adds subscriber vlan flows, dhcp, eapol and igmp trap flows for the related uni port.
+ *
+ * @param connectPoint the connection point of the subscriber
+ * @param uplinkPort uplink port of the OLT (the nni port)
+ * @param sub subscriber information that includes s, c tags, tech profile and bandwidth profile references
+ */
+ private void provisionUniTagList(ConnectPoint connectPoint, PortNumber uplinkPort,
+ SubscriberAndDeviceInformation sub) {
+
+ log.debug("Provisioning vlans for subscriber on dev/port: {}", connectPoint.toString());
+ if (log.isTraceEnabled()) {
+ log.trace("Subscriber informations {}", sub);
+ }
+
+ if (sub.uniTagList() == null || sub.uniTagList().isEmpty()) {
+ log.warn("Unitaglist doesn't exist for the subscriber {} on dev/port {}",
+ sub.id(), connectPoint.toString());
+ return;
+ }
+
+ DeviceId deviceId = connectPoint.deviceId();
+ PortNumber subscriberPort = connectPoint.port();
+
+ for (UniTagInformation uniTag : sub.uniTagList()) {
+ handleSubscriberFlows(deviceId, uplinkPort, subscriberPort, uniTag);
+ }
+ }
+
+ /**
+ * Finds the uni tag information and provisions the found information.
+ * If the uni tag information is not found, returns
+ *
+ * @param deviceId the access device id
+ * @param uplinkPort the nni port
+ * @param subscriberPort the uni port
+ * @param innerVlan the pon c tag
+ * @param outerVlan the pon s tag
+ * @param tpId the technology profile id
+ */
+ private void provisionUniTagInformation(DeviceId deviceId, PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ VlanId innerVlan,
+ VlanId outerVlan,
+ Integer tpId) {
+
+ ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
+ Optional<UniTagInformation> gotTagInformation = getUniTagInformation(cp, innerVlan, outerVlan, tpId);
+ if (!gotTagInformation.isPresent()) {
+ return;
+ }
+ UniTagInformation tagInformation = gotTagInformation.get();
+ handleSubscriberFlows(deviceId, uplinkPort, subscriberPort, tagInformation);
+ }
+
+ private void updateProgrammedSubscriber(ConnectPoint connectPoint, UniTagInformation tagInformation, Boolean add) {
+ if (add) {
+ programmedSubs.put(connectPoint, tagInformation);
+ } else {
+ programmedSubs.remove(connectPoint, tagInformation);
+ }
+ }
+
+ /**
+ * Installs a uni tag information flow.
+ *
+ * @param deviceId the access device id
+ * @param uplinkPort the nni port
+ * @param subscriberPort the uni port
+ * @param tagInfo the uni tag information
+ */
+ private void handleSubscriberFlows(DeviceId deviceId, PortNumber uplinkPort, PortNumber subscriberPort,
+ UniTagInformation tagInfo) {
+
+ log.debug("Provisioning vlan-based flows for the uniTagInformation {} on dev/port {}/{}",
+ tagInfo, deviceId, subscriberPort);
+
+ Port port = deviceService.getPort(deviceId, subscriberPort);
+
+ if (multicastServiceName.equals(tagInfo.getServiceName())) {
+ // IGMP flows are taken care of along with VOD service
+ // Please note that for each service, Subscriber Registered event will be sent
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED,
+ deviceId, port, tagInfo.getPonSTag(), tagInfo.getPonCTag(),
+ tagInfo.getTechnologyProfileId()));
+ return;
+ }
+
+ BandwidthProfileInformation upstreamBpInfo =
+ getBandwidthProfileInformation(tagInfo.getUpstreamBandwidthProfile());
+ BandwidthProfileInformation downstreamBpInfo =
+ getBandwidthProfileInformation(tagInfo.getDownstreamBandwidthProfile());
+ if (upstreamBpInfo == null) {
+ log.warn("No meter installed since no Upstream BW Profile definition found for "
+ + "ctag {} stag {} tpId {} and dev/port: {}/{}",
+ tagInfo.getPonCTag(), tagInfo.getPonSTag(),
+ tagInfo.getTechnologyProfileId(), deviceId,
+ subscriberPort);
+ return;
+ }
+ if (downstreamBpInfo == null) {
+ log.warn("No meter installed since no Downstream BW Profile definition found for "
+ + "ctag {} stag {} tpId {} and dev/port: {}/{}",
+ tagInfo.getPonCTag(), tagInfo.getPonSTag(),
+ tagInfo.getTechnologyProfileId(), deviceId,
+ subscriberPort);
+ return;
+ }
+
+ // check for meterIds for the upstream and downstream bandwidth profiles
+ MeterId upMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, upstreamBpInfo.id());
+ MeterId downMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, downstreamBpInfo.id());
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(deviceId, uplinkPort, subscriberPort,
+ tagInfo, downMeterId, upMeterId,
+ downstreamBpInfo.id(), upstreamBpInfo.id());
+
+ if (upMeterId != null && downMeterId != null) {
+ log.debug("Meters are existing for upstream {} and downstream {} on dev/port {}/{}",
+ upstreamBpInfo.id(), downstreamBpInfo.id(), deviceId, subscriberPort);
+ handleSubFlowsWithMeters(fi);
+ } else {
+ log.debug("Adding {} on {}/{} to pending subs", fi, deviceId, subscriberPort);
+ // one or both meters are not ready. It's possible they are in the process of being
+ // created for other subscribers that share the same bandwidth profile.
+ pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
+ if (queue == null) {
+ queue = new LinkedBlockingQueue<>();
+ }
+ queue.add(fi);
+ log.info("Added {} to pending subscribers on {}/{}", fi, deviceId, subscriberPort);
+ return queue;
+ });
+
+ // queue up the meters to be created
+ if (upMeterId == null) {
+ log.debug("Missing meter for upstream {} on {}/{}", upstreamBpInfo.id(), deviceId, subscriberPort);
+ checkAndCreateDevMeter(deviceId, upstreamBpInfo);
+ }
+ if (downMeterId == null) {
+ log.debug("Missing meter for downstream {} on {}/{}", downstreamBpInfo.id(), deviceId, subscriberPort);
+ checkAndCreateDevMeter(deviceId, downstreamBpInfo);
+ }
+ }
+ }
+ private void checkAndCreateDevMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
+ //If false the meter is already being installed, skipping installation
+ if (!oltMeterService.checkAndAddPendingMeter(deviceId, bwpInfo)) {
+ return;
+ }
+ createMeter(deviceId, bwpInfo);
+ }
+
+ private void createMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
+ log.debug("Creating Meter with {} on {}", bwpInfo, deviceId);
+ CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+
+ MeterId meterId = oltMeterService.createMeter(deviceId, bwpInfo,
+ meterFuture);
+
+ meterFuture.thenAcceptAsync(result -> {
+ BlockingQueue<SubscriberFlowInfo> queue = pendingSubscribersForDevice.get(deviceId);
+ // iterate through the subscribers on hold
+ if (queue != null) {
+ while (true) {
+ //TODO this might return the reference and not the actual object so
+ // it can be actually swapped underneath us.
+ SubscriberFlowInfo fi = queue.peek();
+ if (fi == null) {
+ log.debug("No more subscribers pending on {}", deviceId);
+ break;
+ }
+ if (result == null) {
+ // meter install sent to device
+ log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
+
+ MeterId upMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
+ MeterId downMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
+ if (upMeterId != null && downMeterId != null) {
+ log.debug("Provisioning subscriber after meter {} " +
+ "installation and both meters are present " +
+ "upstream {} and downstream {} on {}/{}",
+ meterId, upMeterId, downMeterId, deviceId, fi.getUniPort());
+ // put in the meterIds because when fi was first
+ // created there may or may not have been a meterId
+ // depending on whether the meter was created or
+ // not at that time.
+ fi.setUpMeterId(upMeterId);
+ fi.setDownMeterId(downMeterId);
+ handleSubFlowsWithMeters(fi);
+ queue.remove(fi);
+ }
+ oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
+ } else {
+ // meter install failed
+ log.error("Addition of subscriber {} on {}/{} failed due to meter " +
+ "{} with result {}", fi, deviceId, fi.getUniPort(),
+ meterId, result);
+ queue.remove(fi);
+ oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
+ }
+ }
+ } else {
+ log.info("No pending subscribers on {}", deviceId);
+ }
+ });
+
+ }
+ /**
+ * Add subscriber flows given meter information for both upstream and
+ * downstream directions.
+ *
+ * @param subscriberFlowInfo relevant information for subscriber
+ */
+ private void handleSubFlowsWithMeters(SubscriberFlowInfo subscriberFlowInfo) {
+ log.debug("Provisioning subscriber flows on {}/{} based on {}",
+ subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort(), subscriberFlowInfo);
+ UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
+ CompletableFuture<ObjectiveError> upFuture = new CompletableFuture<>();
+ CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
+
+ ForwardingObjective.Builder upFwd =
+ oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), subscriberFlowInfo.getUniPort(),
+ subscriberFlowInfo.getUpId(), subscriberFlowInfo.getTagInfo());
+ flowObjectiveService.forward(subscriberFlowInfo.getDevId(), upFwd.add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.debug("Upstream HSIA flow {} installed successfully on {}/{}",
+ subscriberFlowInfo, subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort());
+ upFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ upFuture.complete(error);
+ }
+ }));
+
+ ForwardingObjective.Builder downFwd =
+ oltFlowService.createDownBuilder(subscriberFlowInfo.getNniPort(), subscriberFlowInfo.getUniPort(),
+ subscriberFlowInfo.getDownId(), subscriberFlowInfo.getTagInfo());
+ flowObjectiveService.forward(subscriberFlowInfo.getDevId(), downFwd.add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.debug("Downstream HSIA flow {} installed successfully on {}/{}",
+ subscriberFlowInfo, subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort());
+ downFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ downFuture.complete(error);
+ }
+ }));
+
+ upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
+ AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED;
+ if (downStatus != null) {
+ log.error("Flow with innervlan {} and outerVlan {} on {}/{} failed downstream installation: {}",
+ tagInfo.getPonCTag(), tagInfo.getPonSTag(), subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort(), downStatus);
+ type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
+ } else if (upStatus != null) {
+ log.error("Flow with innervlan {} and outerVlan {} on {}/{} failed downstream installation: {}",
+ tagInfo.getPonCTag(), tagInfo.getPonSTag(), subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort(), upStatus);
+ type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
+ } else {
+ log.debug("Upstream and downstream data plane flows are installed successfully " +
+ "for {}/{}", subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort());
+ oltFlowService.processEapolFilteringObjectives(subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort(),
+ tagInfo.getUpstreamBandwidthProfile(),
+ null, tagInfo.getPonCTag(), true);
+ oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort(),
+ subscriberFlowInfo.getUpId(),
+ tagInfo, true, true);
+
+ oltFlowService.processIgmpFilteringObjectives(subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort(),
+ subscriberFlowInfo.getUpId(),
+ tagInfo, true, true);
+
+ oltFlowService.processPPPoEDFilteringObjectives(subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort(),
+ subscriberFlowInfo.getUpId(),
+ tagInfo, true, true);
+ updateProgrammedSubscriber(new ConnectPoint(subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort()),
+ tagInfo, true);
+ }
+ post(new AccessDeviceEvent(type, subscriberFlowInfo.getDevId(),
+ deviceService.getPort(subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort()),
+ tagInfo.getPonSTag(), tagInfo.getPonCTag(),
+ tagInfo.getTechnologyProfileId()));
+ }, oltInstallers);
+ }
+
+ /**
+ * Checks the subscriber uni tag list and find the uni tag information.
+ * using the pon c tag, pon s tag and the technology profile id
+ * May return Optional<null>
+ *
+ * @param cp the connection point of the subscriber
+ * @param innerVlan pon c tag
+ * @param outerVlan pon s tag
+ * @param tpId the technology profile id
+ * @return the found uni tag information
+ */
+ private Optional<UniTagInformation> getUniTagInformation(ConnectPoint cp, VlanId innerVlan, VlanId outerVlan,
+ int tpId) {
+ log.debug("Getting uni tag information for cp: {}, innerVlan: {}, outerVlan: {}, tpId: {}",
+ cp.toString(), innerVlan, outerVlan, tpId);
+ SubscriberAndDeviceInformation subInfo = getSubscriber(cp);
+ if (subInfo == null) {
+ log.warn("Subscriber information doesn't exist for the connect point {}", cp.toString());
+ return Optional.empty();
+ }
+
+ List<UniTagInformation> uniTagList = subInfo.uniTagList();
+ if (uniTagList == null) {
+ log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), cp.toString());
+ return Optional.empty();
+ }
+
+ UniTagInformation service = null;
+ for (UniTagInformation tagInfo : subInfo.uniTagList()) {
+ if (innerVlan.equals(tagInfo.getPonCTag()) && outerVlan.equals(tagInfo.getPonSTag())
+ && tpId == tagInfo.getTechnologyProfileId()) {
+ service = tagInfo;
+ break;
+ }
+ }
+
+ if (service == null) {
+ log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
+ innerVlan, outerVlan, tpId, cp.toString());
+ return Optional.empty();
+ }
+
+ return Optional.of(service);
+ }
+
+ /**
+ * Creates trap flows for device, including DHCP and LLDP trap on NNI and
+ * EAPOL trap on the UNIs, if device is present in Sadis config.
+ *
+ * @param dev Device to look for
+ */
+ private void checkAndCreateDeviceFlows(Device dev) {
+ // check if this device is provisioned in Sadis
+ SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
+ log.info("checkAndCreateDeviceFlows: deviceInfo {}", deviceInfo);
+
+ if (deviceInfo != null) {
+ // This is an OLT device as per Sadis, we create flows for UNI and NNI ports
+ for (Port p : deviceService.getPorts(dev.id())) {
+ if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
+ continue;
+ }
+ if (isUniPort(dev, p)) {
+ if (!programmedSubs.containsKey(new ConnectPoint(dev.id(), p.number()))) {
+ log.info("Creating Eapol on {}/{}", dev.id(), p.number());
+ oltFlowService.processEapolFilteringObjectives(dev.id(), p.number(), defaultBpId, null,
+ VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
+ } else {
+ log.debug("Subscriber Eapol on {}/{} is already provisioned, not installing default",
+ dev.id(), p.number());
+ }
+ } else {
+ oltFlowService.processNniFilteringObjectives(dev.id(), p.number(), true);
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Get the uplink for of the OLT device.
+ * <p>
+ * This assumes that the OLT has a single uplink port. When more uplink ports need to be supported
+ * this logic needs to be changed
+ *
+ * @param dev Device to look for
+ * @return The uplink Port of the OLT
+ */
+ private Port getUplinkPort(Device dev) {
+ // check if this device is provisioned in Sadis
+ SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
+ log.trace("getUplinkPort: deviceInfo {}", deviceInfo);
+ if (deviceInfo == null) {
+ log.warn("Device {} is not configured in SADIS .. cannot fetch device"
+ + " info", dev.id());
+ return null;
+ }
+ // Return the port that has been configured as the uplink port of this OLT in Sadis
+ Optional<Port> optionalPort = deviceService.getPorts(dev.id()).stream()
+ .filter(port -> isNniPort(port) ||
+ (port.number().toLong() == deviceInfo.uplinkPort()))
+ .findFirst();
+ if (optionalPort.isPresent()) {
+ log.trace("getUplinkPort: Found port {}", optionalPort.get());
+ return optionalPort.get();
+ }
+
+ log.warn("getUplinkPort: " + NO_UPLINK_PORT, dev.id());
+ return null;
+ }
+
+ /**
+ * Return the subscriber on a port.
+ *
+ * @param cp ConnectPoint on which to find the subscriber
+ * @return subscriber if found else null
+ */
+ SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
+ Port port = deviceService.getPort(cp);
+ checkNotNull(port, "Invalid connect point");
+ String portName = port.annotations().value(AnnotationKeys.PORT_NAME);
+ return subsService.get(portName);
+ }
+
+ /**
+ * Checks whether the given port of the device is a uni port or not.
+ *
+ * @param d the access device
+ * @param p the port of the device
+ * @return true if the given port is a uni port
+ */
+ private boolean isUniPort(Device d, Port p) {
+ Port ulPort = getUplinkPort(d);
+ if (ulPort != null) {
+ return (ulPort.number().toLong() != p.number().toLong());
+ }
+ //handles a special case where NNI port is misconfigured in SADIS and getUplinkPort(d) returns null
+ //checks whether the port name starts with nni- which is the signature of an NNI Port
+ if (p.annotations().value(AnnotationKeys.PORT_NAME) != null &&
+ p.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI)) {
+ log.error("NNI port number {} is not matching with configured value", p.number().toLong());
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Gets the given device details from SADIS.
+ * If the device is not found, returns null
+ *
+ * @param dev the access device
+ * @return the olt information
+ */
+ private SubscriberAndDeviceInformation getOltInfo(Device dev) {
+ String devSerialNo = dev.serialNumber();
+ return subsService.get(devSerialNo);
+ }
+
+ // Custom-built function, when the device is not available we need a fallback mechanism
+ private boolean isLocalLeader(DeviceId deviceId) {
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ // When the device is available we just check the mastership
+ if (deviceService.isAvailable(deviceId)) {
+ return false;
+ }
+ // Fallback with Leadership service - device id is used as topic
+ NodeId leader = leadershipService.runForLeadership(
+ deviceId.toString()).leaderNodeId();
+ // Verify if this node is the leader
+ return clusterService.getLocalNode().id().equals(leader);
+ }
+ return true;
+ }
+
+ private boolean isNniPort(Port port) {
+ if (port.annotations().keys().contains(AnnotationKeys.PORT_NAME)) {
+ return port.annotations().value(AnnotationKeys.PORT_NAME).contains(NNI);
+ }
+ return false;
+ }
+
+ private class InternalDeviceListener implements DeviceListener {
+ private Set<DeviceId> programmedDevices = Sets.newConcurrentHashSet();
+
+ @Override
+ public void event(DeviceEvent event) {
+ eventExecutor.execute(() -> {
+ DeviceId devId = event.subject().id();
+ Device dev = event.subject();
+ Port port = event.port();
+ DeviceEvent.Type eventType = event.type();
+
+ if (DeviceEvent.Type.PORT_STATS_UPDATED.equals(eventType) ||
+ DeviceEvent.Type.DEVICE_SUSPENDED.equals(eventType) ||
+ DeviceEvent.Type.DEVICE_UPDATED.equals(eventType)) {
+ return;
+ }
+
+ boolean isLocalLeader = isLocalLeader(devId);
+ // Only handle the event if the device belongs to us
+ if (!isLocalLeader && event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED)
+ && !deviceService.isAvailable(devId) && deviceService.getPorts(devId).isEmpty()) {
+ log.info("Cleaning local state for non master instance upon " +
+ "device disconnection {}", devId);
+ // Since no mastership of the device is present upon disconnection
+ // the method in the FlowRuleManager only empties the local copy
+ // of the DeviceFlowTable thus this method needs to get called
+ // on every instance, see how it's done in the InternalDeviceListener
+ // in FlowRuleManager: no mastership check for purgeOnDisconnection
+ handleDeviceDisconnection(dev, false, false);
+ return;
+ } else if (!isLocalLeader) {
+ log.debug("Not handling event because instance is not leader for {}", devId);
+ return;
+ }
+
+ log.debug("OLT got {} event for {}/{}", eventType, event.subject(), event.port());
+
+ if (getOltInfo(dev) == null) {
+ // it's possible that we got an event for a previously
+ // programmed OLT that is no longer available in SADIS
+ // we let such events go through
+ if (!programmedDevices.contains(devId)) {
+ log.warn("No device info found for {}, this is either "
+ + "not an OLT or not known to sadis", dev);
+ return;
+ }
+ }
+
+ switch (event.type()) {
+ //TODO: Port handling and bookkeeping should be improved once
+ // olt firmware handles correct behaviour.
+ case PORT_ADDED:
+ if (!deviceService.isAvailable(devId)) {
+ log.warn("Received {} for disconnected device {}, ignoring", event, devId);
+ return;
+ }
+ if (isUniPort(dev, port)) {
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port));
+
+ if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
+ log.info("eapol will be sent for port added {}/{}", devId, port);
+ oltFlowService.processEapolFilteringObjectives(devId, port.number(), defaultBpId,
+ null,
+ VlanId.vlanId(EAPOL_DEFAULT_VLAN),
+ true);
+ }
+ } else {
+ SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
+ if (deviceInfo != null) {
+ oltFlowService.processNniFilteringObjectives(dev.id(), port.number(), true);
+ }
+ }
+ break;
+ case PORT_REMOVED:
+ if (isUniPort(dev, port)) {
+ // if no subscriber is provisioned we need to remove the default EAPOL
+ // if a subscriber was provisioned the default EAPOL will not be there and we can skip.
+ // The EAPOL with subscriber tag will be removed by removeSubscriber call.
+ Collection<? extends UniTagInformation> uniTagInformationSet =
+ programmedSubs.get(new ConnectPoint(port.element().id(), port.number())).value();
+ if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
+ log.info("No subscriber provisioned on port {}/{} in PORT_REMOVED event, " +
+ "removing default EAPOL flow", devId, port);
+ oltFlowService.processEapolFilteringObjectives(devId, port.number(), defaultBpId,
+ null,
+ VlanId.vlanId(EAPOL_DEFAULT_VLAN),
+ false);
+ } else {
+ removeSubscriber(new ConnectPoint(devId, port.number()));
+ }
+
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port));
+ }
+ break;
+ case PORT_UPDATED:
+ if (!deviceService.isAvailable(devId)) {
+ log.warn("Received {} for disconnected device {}, ignoring", event, devId);
+ return;
+ }
+ if (!isUniPort(dev, port)) {
+ SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
+ if (deviceInfo != null && port.isEnabled()) {
+ log.debug("NNI dev/port {}/{} enabled", dev.id(),
+ port.number());
+ oltFlowService.processNniFilteringObjectives(dev.id(),
+ port.number(), true);
+ }
+ return;
+ }
+ ConnectPoint cp = new ConnectPoint(devId, port.number());
+ Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(cp).value();
+ if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
+ if (!port.number().equals(PortNumber.LOCAL)) {
+ log.info("eapol will be {} for dev/port updated {}/{} with default vlan {}",
+ (port.isEnabled()) ? "added" : "removed",
+ devId, port.number(), EAPOL_DEFAULT_VLAN);
+ oltFlowService.processEapolFilteringObjectives(devId, port.number(), defaultBpId,
+ null,
+ VlanId.vlanId(EAPOL_DEFAULT_VLAN),
+ port.isEnabled());
+ }
+ } else {
+ log.info("eapol will be {} for dev/port updated {}/{}",
+ (port.isEnabled()) ? "added" : "removed",
+ devId, port.number());
+ uniTagInformationSet.forEach(uniTag ->
+ oltFlowService.processEapolFilteringObjectives(devId, port.number(),
+ uniTag.getUpstreamBandwidthProfile(), null,
+ uniTag.getPonCTag(), port.isEnabled()));
+ }
+ if (port.isEnabled()) {
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port));
+ } else {
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port));
+ }
+ break;
+ case DEVICE_ADDED:
+ handleDeviceConnection(dev, true);
+ break;
+ case DEVICE_REMOVED:
+ handleDeviceDisconnection(dev, true, true);
+ break;
+ case DEVICE_AVAILABILITY_CHANGED:
+ if (deviceService.isAvailable(devId)) {
+ log.info("Handling available device: {}", dev.id());
+ handleDeviceConnection(dev, false);
+ } else {
+ if (deviceService.getPorts(devId).isEmpty()) {
+ log.info("Handling controlled device disconnection .. "
+ + "flushing all state for dev:{}", devId);
+ handleDeviceDisconnection(dev, true, false);
+ } else {
+ log.info("Disconnected device has available ports .. "
+ + "assuming temporary disconnection, "
+ + "retaining state for device {}", devId);
+ }
+ }
+ break;
+ default:
+ log.debug("Not handling event {}", event);
+ return;
+ }
+ });
+ }
+
+ private void sendUniEvent(Device device, AccessDeviceEvent.Type eventType) {
+ deviceService.getPorts(device.id()).stream()
+ .filter(p -> !PortNumber.LOCAL.equals(p.number()))
+ .filter(p -> isUniPort(device, p))
+ .forEach(p -> post(new AccessDeviceEvent(eventType, device.id(), p)));
+ }
+
+ private void handleDeviceDisconnection(Device device, boolean sendDisconnectedEvent, boolean sendUniEvent) {
+ programmedDevices.remove(device.id());
+ removeAllSubscribers(device.id());
+ //Handle case where OLT disconnects during subscriber provisioning
+ pendingSubscribersForDevice.remove(device.id());
+ oltFlowService.clearDeviceState(device.id());
+
+ //Complete meter and flow purge
+ flowRuleService.purgeFlowRules(device.id());
+ oltMeterService.clearMeters(device.id());
+ if (sendDisconnectedEvent) {
+ post(new AccessDeviceEvent(
+ AccessDeviceEvent.Type.DEVICE_DISCONNECTED, device.id(),
+ null, null, null));
+ }
+ if (sendUniEvent) {
+ sendUniEvent(device, AccessDeviceEvent.Type.UNI_REMOVED);
+ }
+ }
+
+ private void handleDeviceConnection(Device dev, boolean sendUniEvent) {
+ post(new AccessDeviceEvent(
+ AccessDeviceEvent.Type.DEVICE_CONNECTED, dev.id(),
+ null, null, null));
+ programmedDevices.add(dev.id());
+ checkAndCreateDeviceFlows(dev);
+ if (sendUniEvent) {
+ sendUniEvent(dev, AccessDeviceEvent.Type.UNI_ADDED);
+ }
+ }
+
+ private void removeAllSubscribers(DeviceId deviceId) {
+ List<Map.Entry<ConnectPoint, UniTagInformation>> subs = programmedSubs.stream()
+ .filter(e -> e.getKey().deviceId().equals(deviceId))
+ .collect(toList());
+
+ subs.forEach(e -> programmedSubs.remove(e.getKey(), e.getValue()));
+ }
+
+ }
+
+ private class InternalClusterListener implements ClusterEventListener {
+
+ @Override
+ public void event(ClusterEvent event) {
+ if (event.type() == ClusterEvent.Type.INSTANCE_READY) {
+ hasher.addServer(event.subject().id());
+ }
+ if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
+ hasher.removeServer(event.subject().id());
+ }
+ }
+ }
+
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
new file mode 100644
index 0000000..14fa4f9
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -0,0 +1,903 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import org.onlab.packet.EthType;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.IPv6;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.TpPort;
+import org.onlab.packet.VlanId;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.meter.MeterId;
+import org.opencord.olt.internalapi.AccessDeviceFlowService;
+import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Provisions flow rules on access devices.
+ */
+@Component(immediate = true, property = {
+ ENABLE_DHCP_ON_NNI + ":Boolean=" + ENABLE_DHCP_ON_NNI_DEFAULT,
+ ENABLE_DHCP_V4 + ":Boolean=" + ENABLE_DHCP_V4_DEFAULT,
+ ENABLE_DHCP_V6 + ":Boolean=" + ENABLE_DHCP_V6_DEFAULT,
+ ENABLE_IGMP_ON_NNI + ":Boolean=" + ENABLE_IGMP_ON_NNI_DEFAULT,
+ ENABLE_EAPOL + ":Boolean=" + ENABLE_EAPOL_DEFAULT,
+ ENABLE_PPPOE + ":Boolean=" + ENABLE_PPPOE_DEFAULT,
+ DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT
+})
+public class OltFlowService implements AccessDeviceFlowService {
+
+ private static final String APP_NAME = "org.opencord.olt";
+ private static final int NONE_TP_ID = -1;
+ private static final int NO_PCP = -1;
+ private static final Integer MAX_PRIORITY = 10000;
+ private static final Integer MIN_PRIORITY = 1000;
+ private static final String INSTALLED = "installed";
+ private static final String REMOVED = "removed";
+ private static final String INSTALLATION = "installation";
+ private static final String REMOVAL = "removal";
+ private static final String V4 = "V4";
+ private static final String V6 = "V6";
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected FlowObjectiveService flowObjectiveService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected SadisService sadisService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected AccessDeviceMeterService oltMeterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService componentConfigService;
+
+ /**
+ * Create DHCP trap flow on NNI port(s).
+ */
+ protected boolean enableDhcpOnNni = ENABLE_DHCP_ON_NNI_DEFAULT;
+
+ /**
+ * Enable flows for DHCP v4 if dhcp is required in sadis config.
+ **/
+ protected boolean enableDhcpV4 = ENABLE_DHCP_V4_DEFAULT;
+
+ /**
+ * Enable flows for DHCP v6 if dhcp is required in sadis config.
+ **/
+ protected boolean enableDhcpV6 = ENABLE_DHCP_V6_DEFAULT;
+
+ /**
+ * Create IGMP trap flow on NNI port(s).
+ **/
+ protected boolean enableIgmpOnNni = ENABLE_IGMP_ON_NNI_DEFAULT;
+
+ /**
+ * Send EAPOL authentication trap flows before subscriber provisioning.
+ **/
+ protected boolean enableEapol = ENABLE_EAPOL_DEFAULT;
+
+ /**
+ * Send PPPoED authentication trap flows before subscriber provisioning.
+ **/
+ protected boolean enablePppoe = ENABLE_PPPOE_DEFAULT;
+
+ /**
+ * Default technology profile id that is used for authentication trap flows.
+ **/
+ protected int defaultTechProfileId = DEFAULT_TP_ID_DEFAULT;
+
+ protected ApplicationId appId;
+ protected BaseInformationService<BandwidthProfileInformation> bpService;
+ protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+ private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice
+ = new ConcurrentHashMap<>();
+
+ @Activate
+ public void activate(ComponentContext context) {
+ bpService = sadisService.getBandwidthProfileService();
+ subsService = sadisService.getSubscriberInfoService();
+ componentConfigService.registerProperties(getClass());
+ appId = coreService.getAppId(APP_NAME);
+ log.info("started");
+ }
+
+
+ @Deactivate
+ public void deactivate(ComponentContext context) {
+ componentConfigService.unregisterProperties(getClass(), false);
+ log.info("stopped");
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+
+ Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+ Boolean o = Tools.isPropertyEnabled(properties, ENABLE_DHCP_ON_NNI);
+ if (o != null) {
+ enableDhcpOnNni = o;
+ }
+
+ Boolean v4 = Tools.isPropertyEnabled(properties, ENABLE_DHCP_V4);
+ if (v4 != null) {
+ enableDhcpV4 = v4;
+ }
+
+ Boolean v6 = Tools.isPropertyEnabled(properties, ENABLE_DHCP_V6);
+ if (v6 != null) {
+ enableDhcpV6 = v6;
+ }
+
+ Boolean p = Tools.isPropertyEnabled(properties, ENABLE_IGMP_ON_NNI);
+ if (p != null) {
+ enableIgmpOnNni = p;
+ }
+
+ Boolean eap = Tools.isPropertyEnabled(properties, ENABLE_EAPOL);
+ if (eap != null) {
+ enableEapol = eap;
+ }
+
+ Boolean pppoe = Tools.isPropertyEnabled(properties, ENABLE_PPPOE);
+ if (pppoe != null) {
+ enablePppoe = pppoe;
+ }
+
+ String tpId = get(properties, DEFAULT_TP_ID);
+ defaultTechProfileId = isNullOrEmpty(tpId) ? DEFAULT_TP_ID_DEFAULT : Integer.parseInt(tpId.trim());
+
+ log.info("modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " +
+ "enableDhcpV6:{}, enableIgmpOnNni:{}, " +
+ "enableEapol{}, enablePppoe{}, defaultTechProfileId: {}",
+ enableDhcpOnNni, enableDhcpV4, enableDhcpV6,
+ enableIgmpOnNni, enableEapol, enablePppoe,
+ defaultTechProfileId);
+
+ }
+
+ @Override
+ public void processDhcpFilteringObjectives(DeviceId devId, PortNumber port,
+ MeterId upstreamMeterId,
+ UniTagInformation tagInformation,
+ boolean install,
+ boolean upstream) {
+ if (upstream) {
+ // for UNI ports
+ if (tagInformation != null && !tagInformation.getIsDhcpRequired()) {
+ log.debug("Dhcp provisioning is disabled for UNI port {} on "
+ + "device {} for service {}", port, devId,
+ tagInformation.getServiceName());
+ return;
+ }
+ } else {
+ // for NNI ports
+ if (!enableDhcpOnNni) {
+ log.debug("Dhcp provisioning is disabled for NNI port {} on "
+ + "device {}", port, devId);
+ return;
+ }
+ }
+ int techProfileId = tagInformation != null ? tagInformation.getTechnologyProfileId() : NONE_TP_ID;
+ VlanId cTag = tagInformation != null ? tagInformation.getPonCTag() : VlanId.NONE;
+ VlanId unitagMatch = tagInformation != null ? tagInformation.getUniTagMatch() : VlanId.ANY;
+ Byte vlanPcp = tagInformation != null && tagInformation.getUsPonCTagPriority() != NO_PCP
+ ? (byte) tagInformation.getUsPonCTagPriority() : null;
+
+
+ if (enableDhcpV4) {
+ int udpSrc = (upstream) ? 68 : 67;
+ int udpDst = (upstream) ? 67 : 68;
+
+ EthType ethType = EthType.EtherType.IPV4.ethType();
+ byte protocol = IPv4.PROTOCOL_UDP;
+
+ addDhcpFilteringObjectives(devId, port, udpSrc, udpDst, ethType,
+ upstreamMeterId, techProfileId, protocol, cTag, unitagMatch,
+ vlanPcp, upstream, install);
+ }
+
+ if (enableDhcpV6) {
+ int udpSrc = (upstream) ? 547 : 546;
+ int udpDst = (upstream) ? 546 : 547;
+
+ EthType ethType = EthType.EtherType.IPV6.ethType();
+ byte protocol = IPv6.PROTOCOL_UDP;
+
+ addDhcpFilteringObjectives(devId, port, udpSrc, udpDst, ethType,
+ upstreamMeterId, techProfileId, protocol, cTag, unitagMatch,
+ vlanPcp, upstream, install);
+ }
+ }
+
+ private void addDhcpFilteringObjectives(DeviceId devId, PortNumber port, int udpSrc, int udpDst,
+ EthType ethType, MeterId upstreamMeterId, int techProfileId, byte protocol,
+ VlanId cTag, VlanId unitagMatch,
+ Byte vlanPcp, boolean upstream,
+ boolean install) {
+
+ DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+
+ if (upstreamMeterId != null) {
+ treatmentBuilder.meter(upstreamMeterId);
+ }
+
+ if (techProfileId != NONE_TP_ID) {
+ treatmentBuilder.writeMetadata(createTechProfValueForWm(unitagMatch, techProfileId), 0);
+ }
+
+ FilteringObjective.Builder dhcpUpstreamBuilder = (install ? builder.permit() : builder.deny())
+ .withKey(Criteria.matchInPort(port))
+ .addCondition(Criteria.matchEthType(ethType))
+ .addCondition(Criteria.matchIPProtocol(protocol))
+ .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(udpSrc)))
+ .addCondition(Criteria.matchUdpDst(TpPort.tpPort(udpDst)))
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY);
+
+ //VLAN changes and PCP matching need to happen only in the upstream directions
+ if (upstream) {
+ treatmentBuilder.setVlanId(cTag);
+ if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
+ dhcpUpstreamBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
+ }
+ if (vlanPcp != null) {
+ treatmentBuilder.setVlanPcp(vlanPcp);
+ }
+ }
+
+ dhcpUpstreamBuilder.withMeta(treatmentBuilder
+ .setOutput(PortNumber.CONTROLLER).build());
+
+
+ FilteringObjective dhcpUpstream = dhcpUpstreamBuilder.add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("DHCP {} filter for dev/port {}/{} {}.",
+ (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
+ devId, port, (install) ? INSTALLED : REMOVED);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("DHCP {} filter for dev/port {}/{} failed {} because {}",
+ (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
+ devId, port, (install) ? INSTALLATION : REMOVAL,
+ error);
+ }
+ });
+ flowObjectiveService.filter(devId, dhcpUpstream);
+
+ }
+
+ @Override
+ public void processPPPoEDFilteringObjectives(DeviceId devId, PortNumber portNumber,
+ MeterId upstreamMeterId,
+ UniTagInformation tagInformation,
+ boolean install,
+ boolean upstream) {
+ if (!enablePppoe) {
+ log.debug("PPPoED filtering is disabled. Ignoring request.");
+ return;
+ }
+
+ int techProfileId = NONE_TP_ID;
+ VlanId cTag = VlanId.NONE;
+ VlanId unitagMatch = VlanId.ANY;
+ Byte vlanPcp = null;
+
+ if (tagInformation != null) {
+ techProfileId = tagInformation.getTechnologyProfileId();
+ cTag = tagInformation.getPonCTag();
+ unitagMatch = tagInformation.getUniTagMatch();
+ if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
+ vlanPcp = (byte) tagInformation.getUsPonCTagPriority();
+ }
+ }
+
+ DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+ CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+
+ if (upstreamMeterId != null) {
+ treatmentBuilder.meter(upstreamMeterId);
+ }
+
+ if (techProfileId != NONE_TP_ID) {
+ treatmentBuilder.writeMetadata(createTechProfValueForWm(cTag, techProfileId), 0);
+ }
+
+ DefaultFilteringObjective.Builder pppoedBuilder = (install ? builder.permit() : builder.deny())
+ .withKey(Criteria.matchInPort(portNumber))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.PPPoED.ethType()))
+ .fromApp(appId)
+ .withPriority(10000);
+
+ if (upstream) {
+ treatmentBuilder.setVlanId(cTag);
+ if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
+ pppoedBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
+ }
+ if (vlanPcp != null) {
+ treatmentBuilder.setVlanPcp(vlanPcp);
+ }
+ }
+ pppoedBuilder = pppoedBuilder.withMeta(treatmentBuilder.setOutput(PortNumber.CONTROLLER).build());
+
+ FilteringObjective pppoed = pppoedBuilder
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("PPPoED filter for {} on {} {}.",
+ devId, portNumber, (install) ? INSTALLED : REMOVED);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.info("PPPoED filter for {} on {} failed {} because {}",
+ devId, portNumber, (install) ? INSTALLATION : REMOVAL, error);
+ }
+ });
+ flowObjectiveService.filter(devId, pppoed);
+ }
+
+ @Override
+ public void processIgmpFilteringObjectives(DeviceId devId, PortNumber port,
+ MeterId upstreamMeterId,
+ UniTagInformation tagInformation,
+ boolean install,
+ boolean upstream) {
+ if (upstream) {
+ // for UNI ports
+ if (tagInformation != null && !tagInformation.getIsIgmpRequired()) {
+ log.debug("Igmp provisioning is disabled for UNI port {} on "
+ + "device {} for service {}", port, devId,
+ tagInformation.getServiceName());
+ return;
+ }
+ } else {
+ // for NNI ports
+ if (!enableIgmpOnNni) {
+ log.debug("Igmp provisioning is disabled for NNI port {} on device {}",
+ port, devId);
+ return;
+ }
+ }
+
+ log.debug("{} IGMP flows on {}:{}", (install) ?
+ "Installing" : "Removing", devId, port);
+ DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+ if (upstream) {
+
+ if (tagInformation.getTechnologyProfileId() != NONE_TP_ID) {
+ treatmentBuilder.writeMetadata(createTechProfValueForWm(null,
+ tagInformation.getTechnologyProfileId()), 0);
+ }
+
+
+ if (upstreamMeterId != null) {
+ treatmentBuilder.meter(upstreamMeterId);
+ }
+
+ if (!VlanId.vlanId(VlanId.NO_VID).equals(tagInformation.getUniTagMatch())) {
+ filterBuilder.addCondition(Criteria.matchVlanId(tagInformation.getUniTagMatch()));
+ }
+
+ if (!VlanId.vlanId(VlanId.NO_VID).equals(tagInformation.getPonCTag())) {
+ treatmentBuilder.setVlanId(tagInformation.getPonCTag());
+ }
+
+ if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
+ treatmentBuilder.setVlanPcp((byte) tagInformation.getUsPonCTagPriority());
+ }
+ }
+
+ filterBuilder = install ? filterBuilder.permit() : filterBuilder.deny();
+
+ FilteringObjective igmp = filterBuilder
+ .withKey(Criteria.matchInPort(port))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+ .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
+ .withMeta(treatmentBuilder
+ .setOutput(PortNumber.CONTROLLER).build())
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("Igmp filter for dev/port {}/{} {}.",
+ devId, port, (install) ? INSTALLED : REMOVED);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Igmp filter for dev/port {}/{} failed {} because {}.",
+ devId, port, (install) ? INSTALLATION : REMOVAL,
+ error);
+ }
+ });
+
+ flowObjectiveService.filter(devId, igmp);
+ }
+
+ @Override
+ public void processEapolFilteringObjectives(DeviceId devId, PortNumber portNumber, String bpId,
+ CompletableFuture<ObjectiveError> filterFuture,
+ VlanId vlanId, boolean install) {
+
+ if (!enableEapol) {
+ log.debug("Eapol filtering is disabled. Completing filteringFuture immediately for the device {}", devId);
+ if (filterFuture != null) {
+ filterFuture.complete(null);
+ }
+ return;
+ }
+ log.info("Processing EAPOL with Bandwidth profile {} on {}/{}", bpId,
+ devId, portNumber);
+ BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bpId);
+ if (bpInfo == null) {
+ log.warn("Bandwidth profile {} is not found. Authentication flow"
+ + " will not be installed on {}/{}", bpId, devId, portNumber);
+ if (filterFuture != null) {
+ filterFuture.complete(ObjectiveError.BADPARAMS);
+ }
+ return;
+ }
+
+ ConnectPoint cp = new ConnectPoint(devId, portNumber);
+ DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+ CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+ // check if meter exists and create it only for an install
+ final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(devId, bpInfo.id());
+ log.info("Meter id {} for Bandwidth profile {} associated to EAPOL on {}", meterId, bpInfo.id(), devId);
+ if (meterId == null) {
+ if (install) {
+ log.debug("Need to install meter for EAPOL with bwp {} on dev/port {}", bpInfo.id(), cp.toString());
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+ new UniTagInformation.Builder()
+ .setPonCTag(vlanId).build(),
+ null, null,
+ null, bpInfo.id());
+ pendingEapolForDevice.compute(devId, (id, queue) -> {
+ if (queue == null) {
+ queue = new LinkedBlockingQueue<>();
+ }
+ queue.add(fi);
+ return queue;
+ });
+
+ //If false the meter is already being installed, skipping installation
+ if (!oltMeterService.checkAndAddPendingMeter(devId, bpInfo)) {
+ return;
+ }
+ MeterId innerMeterId = oltMeterService.createMeter(devId, bpInfo,
+ meterFuture);
+ fi.setUpMeterId(innerMeterId);
+ } else {
+ // this case should not happen as the request to remove an eapol
+ // flow should mean that the flow points to a meter that exists.
+ // Nevertheless we can still delete the flow as we only need the
+ // correct 'match' to do so.
+ log.warn("Unknown meter id for bp {}, still proceeding with "
+ + "delete of eapol flow for {}", bpInfo.id(), cp.toString());
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+ new UniTagInformation.Builder()
+ .setPonCTag(vlanId).build(),
+ null, meterId,
+ null, bpInfo.id());
+ handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId);
+ }
+ } else {
+ log.debug("Meter {} was previously created for bp {} on {}", meterId, bpInfo.id(), cp.toString());
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+ new UniTagInformation.Builder()
+ .setPonCTag(vlanId).build(),
+ null, meterId,
+ null, bpInfo.id());
+ handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId);
+ //No need for the future, meter is present.
+ return;
+ }
+ meterFuture.thenAcceptAsync(result -> {
+ //for each pending eapol flow we check if the meter is there.
+ BlockingQueue<SubscriberFlowInfo> queue = pendingEapolForDevice.get(devId);
+ if (queue != null) {
+ while (true) {
+ SubscriberFlowInfo fi = queue.remove();
+ if (fi == null) {
+ break;
+ }
+ //TODO this might return the reference and not the actual object
+ // so it can be actually swapped underneath us.
+ log.debug("handing pending eapol on {}/{} for {}", fi.getDevId(), fi.getUniPort(), fi);
+ if (result == null) {
+ MeterId mId = oltMeterService
+ .getMeterIdFromBpMapping(devId, fi.getUpBpInfo());
+ if (mId != null) {
+ log.debug("Meter installation completed for subscriber on {}, handling EAPOL trap flow",
+ cp.toString());
+ handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
+ }
+ } else {
+ log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
+ "Result {} and MeterId {}", cp.toString(), result, meterId);
+ }
+ oltMeterService.removeFromPendingMeters(devId, bpInfo);
+ }
+ } else {
+ log.info("No pending EAPOLs on {}", devId);
+ }
+ });
+ }
+
+ private void handleEapol(CompletableFuture<ObjectiveError> filterFuture,
+ boolean install, ConnectPoint cp,
+ DefaultFilteringObjective.Builder filterBuilder,
+ TrafficTreatment.Builder treatmentBuilder,
+ SubscriberFlowInfo fi, MeterId mId) {
+ log.info("Meter {} for {} on {}/{} exists. {} EAPOL trap flow",
+ mId, fi.getUpBpInfo(), fi.getDevId(), fi.getUniPort(),
+ (install) ? "Installing" : "Removing");
+ int techProfileId = getDefaultTechProfileId(fi.getDevId(), fi.getUniPort());
+ // can happen in case of removal
+ if (mId != null) {
+ treatmentBuilder.meter(mId);
+ }
+ //Authentication trap flow uses only tech profile id as write metadata value
+ FilteringObjective eapol = (install ? filterBuilder.permit() : filterBuilder.deny())
+ .withKey(Criteria.matchInPort(fi.getUniPort()))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
+ .withMeta(treatmentBuilder
+ .writeMetadata(createTechProfValueForWm(
+ fi.getTagInfo().getPonCTag(),
+ techProfileId), 0)
+ .setOutput(PortNumber.CONTROLLER)
+ .pushVlan()
+ .setVlanId(fi.getTagInfo().getPonCTag())
+ .build())
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("Eapol filter {} for {} on {}/{} with meter {}.",
+ objective.id(), fi.getDevId(), fi.getUniPort(),
+ (install) ? INSTALLED : REMOVED, mId);
+ if (filterFuture != null) {
+ filterFuture.complete(null);
+ }
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Eapol filter {} for {}/{} with meter {} " +
+ "failed {} because {}", objective.id(),
+ fi.getDevId(), fi.getUniPort(), mId,
+ (install) ? INSTALLATION : REMOVAL,
+ error);
+ if (filterFuture != null) {
+ filterFuture.complete(error);
+ }
+ }
+ });
+ flowObjectiveService.filter(fi.getDevId(), eapol);
+ }
+
+ /**
+ * Installs trap filtering objectives for particular traffic types on an
+ * NNI port.
+ *
+ * @param devId device ID
+ * @param port port number
+ * @param install true to install, false to remove
+ */
+ @Override
+ public void processNniFilteringObjectives(DeviceId devId, PortNumber port, boolean install) {
+ log.info("{} flows for NNI port {} on device {}",
+ install ? "Adding" : "Removing", port, devId);
+ processLldpFilteringObjective(devId, port, install);
+ processDhcpFilteringObjectives(devId, port, null, null, install, false);
+ processIgmpFilteringObjectives(devId, port, null, null, install, false);
+ processPPPoEDFilteringObjectives(devId, port, null, null, install, false);
+ }
+
+
+ @Override
+ public void processLldpFilteringObjective(DeviceId devId, PortNumber port, boolean install) {
+ DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+
+ FilteringObjective lldp = (install ? builder.permit() : builder.deny())
+ .withKey(Criteria.matchInPort(port))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.LLDP.ethType()))
+ .withMeta(DefaultTrafficTreatment.builder()
+ .setOutput(PortNumber.CONTROLLER).build())
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("LLDP filter for dev/port {}/{} {}.",
+ devId, port, (install) ? INSTALLED : REMOVED);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("LLDP filter for dev/port {}/{} failed {} because {}",
+ devId, port, (install) ? INSTALLATION : REMOVAL,
+ error);
+ }
+ });
+
+ flowObjectiveService.filter(devId, lldp);
+ }
+
+ @Override
+ public ForwardingObjective.Builder createTransparentBuilder(PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ MeterId meterId,
+ UniTagInformation tagInfo,
+ boolean upstream) {
+
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchVlanId(tagInfo.getPonSTag())
+ .matchInPort(upstream ? subscriberPort : uplinkPort)
+ .matchInnerVlanId(tagInfo.getPonCTag())
+ .build();
+
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+ if (meterId != null) {
+ tBuilder.meter(meterId);
+ }
+
+ TrafficTreatment treatment = tBuilder
+ .setOutput(upstream ? uplinkPort : subscriberPort)
+ .writeMetadata(createMetadata(upstream ? tagInfo.getPonSTag() : tagInfo.getPonCTag(),
+ tagInfo.getTechnologyProfileId(), upstream ? uplinkPort : subscriberPort), 0)
+ .build();
+
+ return createForwardingObjectiveBuilder(selector, treatment, MIN_PRIORITY);
+ }
+
+ @Override
+ public ForwardingObjective.Builder createUpBuilder(PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ MeterId upstreamMeterId,
+ UniTagInformation uniTagInformation) {
+
+ TrafficSelector selector = DefaultTrafficSelector.builder()
+ .matchInPort(subscriberPort)
+ .matchVlanId(uniTagInformation.getUniTagMatch())
+ .build();
+
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+ //if the subscriberVlan (cTag) is different than ANY it needs to set.
+ if (uniTagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+ treatmentBuilder.pushVlan()
+ .setVlanId(uniTagInformation.getPonCTag());
+ }
+
+ if (uniTagInformation.getUsPonCTagPriority() != NO_PCP) {
+ treatmentBuilder.setVlanPcp((byte) uniTagInformation.getUsPonCTagPriority());
+ }
+
+ treatmentBuilder.pushVlan()
+ .setVlanId(uniTagInformation.getPonSTag());
+
+ if (uniTagInformation.getUsPonSTagPriority() != NO_PCP) {
+ treatmentBuilder.setVlanPcp((byte) uniTagInformation.getUsPonSTagPriority());
+ }
+
+ treatmentBuilder.setOutput(uplinkPort)
+ .writeMetadata(createMetadata(uniTagInformation.getPonCTag(),
+ uniTagInformation.getTechnologyProfileId(), uplinkPort), 0L);
+
+ if (upstreamMeterId != null) {
+ treatmentBuilder.meter(upstreamMeterId);
+ }
+
+ return createForwardingObjectiveBuilder(selector, treatmentBuilder.build(), MIN_PRIORITY);
+ }
+
+ @Override
+ public ForwardingObjective.Builder createDownBuilder(PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ MeterId downstreamMeterId,
+ UniTagInformation tagInformation) {
+
+ //subscriberVlan can be any valid Vlan here including ANY to make sure the packet is tagged
+ TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder()
+ .matchVlanId(tagInformation.getPonSTag())
+ .matchInPort(uplinkPort)
+ .matchInnerVlanId(tagInformation.getPonCTag());
+
+
+ if (tagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+ selectorBuilder.matchMetadata(tagInformation.getPonCTag().toShort());
+ }
+
+ if (tagInformation.getDsPonSTagPriority() != NO_PCP) {
+ selectorBuilder.matchVlanPcp((byte) tagInformation.getDsPonSTagPriority());
+ }
+
+ if (tagInformation.getConfiguredMacAddress() != null &&
+ !tagInformation.getConfiguredMacAddress().equals("") &&
+ !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()))) {
+ selectorBuilder.matchEthDst(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
+ }
+
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder()
+ .popVlan()
+ .setOutput(subscriberPort);
+
+ treatmentBuilder.writeMetadata(createMetadata(tagInformation.getPonCTag(),
+ tagInformation.getTechnologyProfileId(),
+ subscriberPort), 0);
+
+ // to remark inner vlan header
+ if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
+ treatmentBuilder.setVlanPcp((byte) tagInformation.getUsPonCTagPriority());
+ }
+
+ if (!VlanId.NONE.equals(tagInformation.getUniTagMatch()) &&
+ tagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+ treatmentBuilder.setVlanId(tagInformation.getUniTagMatch());
+ }
+
+ if (downstreamMeterId != null) {
+ treatmentBuilder.meter(downstreamMeterId);
+ }
+
+ return createForwardingObjectiveBuilder(selectorBuilder.build(), treatmentBuilder.build(), MIN_PRIORITY);
+ }
+
+ @Override
+ public void clearDeviceState(DeviceId deviceId) {
+ pendingEapolForDevice.remove(deviceId);
+ }
+
+ private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,
+ TrafficTreatment treatment,
+ Integer priority) {
+ return DefaultForwardingObjective.builder()
+ .withFlag(ForwardingObjective.Flag.VERSATILE)
+ .withPriority(priority)
+ .makePermanent()
+ .withSelector(selector)
+ .fromApp(appId)
+ .withTreatment(treatment);
+ }
+
+ /**
+ * Returns the write metadata value including tech profile reference and innerVlan.
+ * For param cVlan, null can be sent
+ *
+ * @param cVlan c (customer) tag of one subscriber
+ * @param techProfileId tech profile id of one subscriber
+ * @return the write metadata value including tech profile reference and innerVlan
+ */
+ private Long createTechProfValueForWm(VlanId cVlan, int techProfileId) {
+ if (cVlan == null || VlanId.NONE.equals(cVlan)) {
+ return (long) techProfileId << 32;
+ }
+ return ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
+ }
+
+ private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
+ if (bandwidthProfile == null) {
+ return null;
+ }
+ return bpService.get(bandwidthProfile);
+ }
+
+ /**
+ * It will be used to support AT&T use case (for EAPOL flows).
+ * If multiple services are found in uniServiceList, returns default tech profile id
+ * If one service is found, returns the found one
+ *
+ * @param devId
+ * @param portNumber
+ * @return the default technology profile id
+ */
+ private int getDefaultTechProfileId(DeviceId devId, PortNumber portNumber) {
+ Port port = deviceService.getPort(devId, portNumber);
+ if (port != null) {
+ SubscriberAndDeviceInformation info = subsService.get(port.annotations().value(AnnotationKeys.PORT_NAME));
+ if (info != null && info.uniTagList().size() == 1) {
+ return info.uniTagList().get(0).getTechnologyProfileId();
+ }
+ }
+ return defaultTechProfileId;
+ }
+
+ /**
+ * Write metadata instruction value (metadata) is 8 bytes.
+ * <p>
+ * MS 2 bytes: C Tag
+ * Next 2 bytes: Technology Profile Id
+ * Next 4 bytes: Port number (uni or nni)
+ */
+ private Long createMetadata(VlanId innerVlan, int techProfileId, PortNumber egressPort) {
+ if (techProfileId == NONE_TP_ID) {
+ techProfileId = DEFAULT_TP_ID_DEFAULT;
+ }
+
+ return ((long) (innerVlan.id()) << 48 | (long) techProfileId << 32) | egressPort.toLong();
+ }
+
+
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
new file mode 100644
index 0000000..d57f3c1
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -0,0 +1,420 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toSet;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS_DEFAULT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.meter.Band;
+import org.onosproject.net.meter.DefaultBand;
+import org.onosproject.net.meter.DefaultMeterRequest;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterContext;
+import org.onosproject.net.meter.MeterEvent;
+import org.onosproject.net.meter.MeterFailReason;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterKey;
+import org.onosproject.net.meter.MeterListener;
+import org.onosproject.net.meter.MeterRequest;
+import org.onosproject.net.meter.MeterService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+/**
+ * Provisions Meters on access devices.
+ */
+@Component(immediate = true, property = {
+ DELETE_METERS + ":Boolean=" + DELETE_METERS_DEFAULT,
+ })
+public class OltMeterService implements AccessDeviceMeterService {
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MeterService meterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService componentConfigService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ /**
+ * Delete meters when reference count drops to zero.
+ */
+ protected boolean deleteMeters = DELETE_METERS_DEFAULT;
+
+ private ApplicationId appId;
+ private static final String APP_NAME = "org.opencord.olt";
+
+ private final MeterListener meterListener = new InternalMeterListener();
+
+ private final Logger log = getLogger(getClass());
+
+ protected ExecutorService eventExecutor;
+
+ private Map<DeviceId, Set<BandwidthProfileInformation>> pendingMeters;
+ private Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
+ ConsistentMultimap<String, MeterKey> bpInfoToMeter;
+
+ @Activate
+ public void activate(ComponentContext context) {
+ eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
+ "events-%d", log));
+ appId = coreService.registerApplication(APP_NAME);
+ modified(context);
+
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(MeterKey.class)
+ .build();
+
+ bpInfoToMeter = storageService.<String, MeterKey>consistentMultimapBuilder()
+ .withName("volt-bp-info-to-meter")
+ .withSerializer(Serializer.using(serializer))
+ .withApplicationId(appId)
+ .build();
+
+ meterService.addListener(meterListener);
+ componentConfigService.registerProperties(getClass());
+ pendingMeters = Maps.newConcurrentMap();
+ pendingRemoveMeters = Maps.newConcurrentMap();
+ log.info("Olt Meter service started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ meterService.removeListener(meterListener);
+ }
+
+
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+ Boolean d = Tools.isPropertyEnabled(properties, "deleteMeters");
+ if (d != null) {
+ deleteMeters = d;
+ }
+ }
+
+ @Override
+ public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
+ return bpInfoToMeter.stream()
+ .collect(collectingAndThen(
+ groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
+ ImmutableMap::copyOf));
+ }
+
+ boolean addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
+ log.debug("adding bp {} to meter {} mapping for device {}",
+ bandwidthProfile, meterId, deviceId);
+ return bpInfoToMeter.put(bandwidthProfile, MeterKey.key(deviceId, meterId));
+ }
+
+ @Override
+ public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
+ if (bpInfoToMeter.get(bandwidthProfile).value().isEmpty()) {
+ log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
+ bandwidthProfile);
+ return null;
+ }
+
+ Optional<? extends MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile).value()
+ .stream()
+ .filter(meterKey -> meterKey.deviceId().equals(deviceId))
+ .findFirst();
+ if (meterKeyForDevice.isPresent()) {
+ log.debug("Found meter {} for bandwidth profile {} on {}",
+ meterKeyForDevice.get().meterId(), bandwidthProfile, deviceId);
+ return meterKeyForDevice.get().meterId();
+ } else {
+ log.warn("Bandwidth Profile '{}' is not currently mapped to a meter on {} , {}",
+ bandwidthProfile, deviceId, bpInfoToMeter.get(bandwidthProfile).value());
+ return null;
+ }
+ }
+
+ @Override
+ public ImmutableSet<MeterKey> getProgMeters() {
+ return bpInfoToMeter.stream()
+ .map(Map.Entry::getValue)
+ .collect(ImmutableSet.toImmutableSet());
+ }
+
+ @Override
+ public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
+ CompletableFuture<Object> meterFuture) {
+ log.debug("Creating meter on {} for {}", deviceId, bpInfo);
+ if (bpInfo == null) {
+ log.warn("Requested bandwidth profile on {} information is NULL", deviceId);
+ meterFuture.complete(ObjectiveError.BADPARAMS);
+ return null;
+ }
+
+ MeterId meterId = getMeterIdFromBpMapping(deviceId, bpInfo.id());
+ if (meterId != null) {
+ log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
+ meterFuture.complete(null);
+ return meterId;
+ }
+
+ List<Band> meterBands = createMeterBands(bpInfo);
+
+ final AtomicReference<MeterId> meterIdRef = new AtomicReference<>();
+ MeterRequest meterRequest = DefaultMeterRequest.builder()
+ .withBands(meterBands)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withContext(new MeterContext() {
+ @Override
+ public void onSuccess(MeterRequest op) {
+ log.debug("Meter {} for {} is installed on the device {}",
+ meterIdRef.get(), bpInfo.id(), deviceId);
+ boolean added = addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
+ if (added) {
+ meterFuture.complete(null);
+ } else {
+ log.error("Failed to add Meter {} for {} on {} to the meter-bandwidth mapping",
+ meterIdRef.get(), bpInfo.id(), deviceId);
+ meterFuture.complete(ObjectiveError.UNKNOWN);
+ }
+ }
+
+ @Override
+ public void onError(MeterRequest op, MeterFailReason reason) {
+ log.error("Failed installing meter {} on {} for {}",
+ meterIdRef.get(), deviceId, bpInfo.id());
+ bpInfoToMeter.remove(bpInfo.id(),
+ MeterKey.key(deviceId, meterIdRef.get()));
+ meterFuture.complete(reason);
+ }
+ })
+ .forDevice(deviceId)
+ .fromApp(appId)
+ .burst()
+ .add();
+
+ Meter meter = meterService.submit(meterRequest);
+ meterIdRef.set(meter.id());
+ log.info("Meter {} created and sent for installation on {} for {}",
+ meter.id(), deviceId, bpInfo);
+ return meter.id();
+ }
+
+ @Override
+ public void removeFromPendingMeters(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
+ if (deviceId == null) {
+ return;
+ }
+ pendingMeters.computeIfPresent(deviceId, (id, bwps) -> {
+ bwps.remove(bwpInfo);
+ return bwps;
+ });
+ }
+
+ @Override
+ public synchronized boolean checkAndAddPendingMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
+ if (pendingMeters.containsKey(deviceId)
+ && pendingMeters.get(deviceId).contains(bwpInfo)) {
+ log.debug("Meter is already pending on {} with bp {}",
+ deviceId, bwpInfo);
+ return false;
+ }
+ log.debug("Adding bandwidth profile {} to pending on {}",
+ bwpInfo, deviceId);
+ pendingMeters.compute(deviceId, (id, bwps) -> {
+ if (bwps == null) {
+ bwps = new HashSet<>();
+ }
+ bwps.add(bwpInfo);
+ return bwps;
+ });
+
+ return true;
+ }
+
+ @Override
+ public void clearMeters(DeviceId deviceId) {
+ log.debug("Removing all meters for device {}", deviceId);
+ clearDeviceState(deviceId);
+ meterService.purgeMeters(deviceId);
+ }
+
+ @Override
+ public void clearDeviceState(DeviceId deviceId) {
+ log.info("Clearing local device state for {}", deviceId);
+ pendingRemoveMeters.remove(deviceId);
+ removeMetersFromBpMapping(deviceId);
+ //Following call handles cornercase of OLT delete during meter provisioning
+ pendingMeters.remove(deviceId);
+ }
+
+ private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
+ List<Band> meterBands = new ArrayList<>();
+
+ meterBands.add(createMeterBand(bpInfo.committedInformationRate(), bpInfo.committedBurstSize()));
+ meterBands.add(createMeterBand(bpInfo.exceededInformationRate(), bpInfo.exceededBurstSize()));
+ meterBands.add(createMeterBand(bpInfo.assuredInformationRate(), 0L));
+
+ return meterBands;
+ }
+
+ private Band createMeterBand(long rate, Long burst) {
+ return DefaultBand.builder()
+ .withRate(rate) //already Kbps
+ .burstSize(burst) // already Kbits
+ .ofType(Band.Type.DROP) // no matter
+ .build();
+ }
+
+ private void removeMeterFromBpMapping(MeterKey meterKey) {
+ List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
+ .filter(e -> e.getValue().equals(meterKey))
+ .collect(Collectors.toList());
+
+ meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
+ }
+
+ private void removeMetersFromBpMapping(DeviceId deviceId) {
+ List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
+ .filter(e -> e.getValue().deviceId().equals(deviceId))
+ .collect(Collectors.toList());
+
+ meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
+ }
+
+ private class InternalMeterListener implements MeterListener {
+
+ @Override
+ public void event(MeterEvent meterEvent) {
+ eventExecutor.execute(() -> {
+ Meter meter = meterEvent.subject();
+ if (meter == null) {
+ log.error("Meter in event {} is null", meterEvent);
+ return;
+ }
+ MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+ if (deleteMeters && MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
+ log.info("Zero Count Meter Event is received. Meter is {} on {}",
+ meter.id(), meter.deviceId());
+ incrementMeterCount(meter.deviceId(), key);
+
+ if (appId.equals(meter.appId()) && pendingRemoveMeters.get(meter.deviceId())
+ .get(key).get() == 3) {
+ log.info("Deleting unreferenced, no longer programmed Meter {} on {}",
+ meter.id(), meter.deviceId());
+ deleteMeter(meter.deviceId(), meter.id());
+ }
+ }
+ if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
+ log.info("Meter Removed Event is received for {} on {}",
+ meter.id(), meter.deviceId());
+ pendingRemoveMeters.computeIfPresent(meter.deviceId(),
+ (id, meters) -> {
+ if (meters.get(key) == null) {
+ log.info("Meters is not pending " +
+ "{} on {}", key, id);
+ return meters;
+ }
+ meters.remove(key);
+ return meters;
+ });
+ removeMeterFromBpMapping(key);
+ }
+ });
+ }
+
+ private void incrementMeterCount(DeviceId deviceId, MeterKey key) {
+ if (key == null) {
+ return;
+ }
+ pendingRemoveMeters.compute(deviceId,
+ (id, meters) -> {
+ if (meters == null) {
+ meters = new HashMap<>();
+
+ }
+ if (meters.get(key) == null) {
+ meters.put(key, new AtomicInteger(1));
+ }
+ meters.get(key).addAndGet(1);
+ return meters;
+ });
+ }
+
+ private void deleteMeter(DeviceId deviceId, MeterId meterId) {
+ Meter meter = meterService.getMeter(deviceId, meterId);
+ if (meter != null) {
+ MeterRequest meterRequest = DefaultMeterRequest.builder()
+ .withBands(meter.bands())
+ .withUnit(meter.unit())
+ .forDevice(deviceId)
+ .fromApp(appId)
+ .burst()
+ .remove();
+
+ meterService.withdraw(meterRequest, meterId);
+ }
+ }
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
new file mode 100644
index 0000000..3d76ef6
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+/**
+ * Constants for default values of configurable properties.
+ */
+public final class OsgiPropertyConstants {
+
+ private OsgiPropertyConstants() {
+ }
+
+ public static final String DEFAULT_MCAST_SERVICE_NAME = "multicastServiceName";
+ public static final String DEFAULT_MCAST_SERVICE_NAME_DEFAULT = "MC";
+
+ public static final String ENABLE_DHCP_ON_NNI = "enableDhcpOnNni";
+ public static final boolean ENABLE_DHCP_ON_NNI_DEFAULT = false;
+
+ public static final String ENABLE_DHCP_V4 = "enableDhcpV4";
+ public static final boolean ENABLE_DHCP_V4_DEFAULT = true;
+
+ public static final String ENABLE_DHCP_V6 = "enableDhcpV6";
+ public static final boolean ENABLE_DHCP_V6_DEFAULT = false;
+
+ public static final String ENABLE_IGMP_ON_NNI = "enableIgmpOnNni";
+ public static final boolean ENABLE_IGMP_ON_NNI_DEFAULT = false;
+
+ public static final String DELETE_METERS = "deleteMeters";
+ public static final boolean DELETE_METERS_DEFAULT = true;
+
+ public static final String DEFAULT_TP_ID = "defaultTechProfileId";
+ public static final int DEFAULT_TP_ID_DEFAULT = 64;
+
+ public static final String DEFAULT_BP_ID = "defaultBpId";
+ public static final String DEFAULT_BP_ID_DEFAULT = "Default";
+
+ public static final String ENABLE_EAPOL = "enableEapol";
+ public static final boolean ENABLE_EAPOL_DEFAULT = true;
+
+ public static final String ENABLE_PPPOE = "enablePppoe";
+ public static final boolean ENABLE_PPPOE_DEFAULT = false;
+
+ public static final String EAPOL_DELETE_RETRY_MAX_ATTEMPS = "eapolDeleteRetryMaxAttempts";
+ public static final int EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT = 3;
+
+ public static final String PROVISION_DELAY = "provisionDelay";
+ public static final int PROVISION_DELAY_DEFAULT = 100;
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java b/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
new file mode 100644
index 0000000..1ac5b08
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.meter.MeterId;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.Objects;
+
+/**
+ * Contains the mapping of a given port to flow information, including bandwidth profile.
+ */
+class SubscriberFlowInfo {
+ private final DeviceId devId;
+ private final PortNumber nniPort;
+ private final PortNumber uniPort;
+ private final UniTagInformation tagInfo;
+ private MeterId downId;
+ private MeterId upId;
+ private final String downBpInfo;
+ private final String upBpInfo;
+
+ /**
+ * Builds the mapper of information.
+ * @param devId the device id
+ * @param nniPort the nni port
+ * @param uniPort the uni port
+ * @param tagInfo the tag info
+ * @param downId the downstream meter id
+ * @param upId the upstream meter id
+ * @param downBpInfo the downstream bandwidth profile
+ * @param upBpInfo the upstream bandwidth profile
+ */
+ SubscriberFlowInfo(DeviceId devId, PortNumber nniPort, PortNumber uniPort,
+ UniTagInformation tagInfo, MeterId downId, MeterId upId,
+ String downBpInfo, String upBpInfo) {
+ this.devId = devId;
+ this.nniPort = nniPort;
+ this.uniPort = uniPort;
+ this.tagInfo = tagInfo;
+ this.downId = downId;
+ this.upId = upId;
+ this.downBpInfo = downBpInfo;
+ this.upBpInfo = upBpInfo;
+ }
+
+ /**
+ * Gets the device id of this subscriber and flow information.
+ *
+ * @return device id
+ */
+ DeviceId getDevId() {
+ return devId;
+ }
+
+ /**
+ * Gets the nni of this subscriber and flow information.
+ *
+ * @return nni port
+ */
+ PortNumber getNniPort() {
+ return nniPort;
+ }
+
+ /**
+ * Gets the uni port of this subscriber and flow information.
+ *
+ * @return uni port
+ */
+ PortNumber getUniPort() {
+ return uniPort;
+ }
+
+ /**
+ * Gets the tag of this subscriber and flow information.
+ *
+ * @return tag of the subscriber
+ */
+ UniTagInformation getTagInfo() {
+ return tagInfo;
+ }
+
+ /**
+ * Gets the downstream meter id of this subscriber and flow information.
+ *
+ * @return downstream meter id
+ */
+ MeterId getDownId() {
+ return downId;
+ }
+
+ /**
+ * Gets the upstream meter id of this subscriber and flow information.
+ *
+ * @return upstream meter id
+ */
+ MeterId getUpId() {
+ return upId;
+ }
+
+ /**
+ * Gets the downstream bandwidth profile of this subscriber and flow information.
+ *
+ * @return downstream bandwidth profile
+ */
+ String getDownBpInfo() {
+ return downBpInfo;
+ }
+
+ /**
+ * Gets the upstream bandwidth profile of this subscriber and flow information.
+ *
+ * @return upstream bandwidth profile.
+ */
+ String getUpBpInfo() {
+ return upBpInfo;
+ }
+
+ /**
+ * Sets the upstream meter id.
+ * @param upMeterId the upstream meter id
+ */
+ void setUpMeterId(MeterId upMeterId) {
+ this.upId = upMeterId;
+ }
+
+ /**
+ * Sets the downstream meter id.
+ * @param downMeterId the downstream meter id
+ */
+ void setDownMeterId(MeterId downMeterId) {
+ this.downId = downMeterId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubscriberFlowInfo flowInfo = (SubscriberFlowInfo) o;
+ return devId.equals(flowInfo.devId) &&
+ nniPort.equals(flowInfo.nniPort) &&
+ uniPort.equals(flowInfo.uniPort) &&
+ tagInfo.equals(flowInfo.tagInfo) &&
+ downBpInfo.equals(flowInfo.downBpInfo) &&
+ upBpInfo.equals(flowInfo.upBpInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(devId, nniPort, uniPort, tagInfo, downBpInfo, upBpInfo);
+ }
+
+ @Override
+ public String toString() {
+ return com.google.common.base.MoreObjects.toStringHelper(this)
+ .add("devId", devId)
+ .add("nniPort", nniPort)
+ .add("uniPort", uniPort)
+ .add("tagInfo", tagInfo)
+ .add("downId", downId)
+ .add("upId", upId)
+ .add("downBpInfo", downBpInfo)
+ .add("upBpInfo", upBpInfo)
+ .toString();
+ }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/package-info.java b/impl/src/main/java/org/opencord/olt/impl/package-info.java
new file mode 100644
index 0000000..b5ba320
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * OLT application handling PMC OLT hardware.
+ */
+package org.opencord.olt.impl;
diff --git a/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java b/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java
new file mode 100644
index 0000000..7de5b1a
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.internalapi;
+
+import org.onlab.packet.VlanId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.meter.MeterId;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Olt service for flow operations.
+ */
+public interface AccessDeviceFlowService {
+
+ /**
+ * Provisions or removes trap-to-controller DHCP packets.
+ *
+ * @param devId the target device identifier
+ * @param port the uni port for which this trap flow is designated
+ * @param upstreamMeterId the upstream meter id that includes the upstream
+ * bandwidth profile values such as PIR,CIR. If no meter id needs to be referenced,
+ * null can be sent
+ * @param tagInformation the uni tag (ctag, stag) information
+ * @param install true to install the flow, false to remove the flow
+ * @param upstream true if trapped packets are flowing upstream towards
+ * server, false if packets are flowing downstream towards client
+ */
+ void processDhcpFilteringObjectives(DeviceId devId, PortNumber port,
+ MeterId upstreamMeterId,
+ UniTagInformation tagInformation,
+ boolean install,
+ boolean upstream);
+
+ /**
+ * Trap igmp packets to the controller.
+ *
+ * @param devId Device identifier to send the flow
+ * @param port Uni Port number
+ * @param upstreamMeterId upstream meter id that represents the upstream bandwidth profile
+ * @param tagInformation the uni tag information of the subscriber
+ * @param install the indicator to install or to remove the flow
+ * @param upstream determines the direction of the flow
+ */
+ void processIgmpFilteringObjectives(DeviceId devId, PortNumber port,
+ MeterId upstreamMeterId,
+ UniTagInformation tagInformation,
+ boolean install,
+ boolean upstream);
+
+ /**
+ * Trap eapol authentication packets to the controller.
+ *
+ * @param devId the device identifier
+ * @param portNumber the port for which this trap flow is designated
+ * @param bpId bandwidth profile id to add the related meter to the flow
+ * @param filterFuture completable future for this filtering objective operation
+ * @param vlanId the default or customer tag for a subscriber
+ * @param install true to install the flow, false to remove the flow
+ */
+ void processEapolFilteringObjectives(DeviceId devId, PortNumber portNumber, String bpId,
+ CompletableFuture<ObjectiveError> filterFuture,
+ VlanId vlanId, boolean install);
+
+ /**
+ * Trap PPPoE discovery packets to the controller.
+ *
+ * @param devId the target device identifier
+ * @param portNumber the uni port for which this trap flow is designated
+ * @param upstreamMeterId the upstream meter id that includes the upstream
+ * bandwidth profile values such as PIR,CIR. If no meter id needs to be referenced,
+ * null can be sent
+ * @param tagInformation the uni tag (ctag, stag) information
+ * @param install true to install the flow, false to remove the flow
+ * @param upstream true if trapped packets are flowing upstream towards
+ * server, false if packets are flowing downstream towards client
+ **/
+ void processPPPoEDFilteringObjectives(DeviceId devId, PortNumber portNumber,
+ MeterId upstreamMeterId, UniTagInformation tagInformation,
+ boolean install, boolean upstream);
+
+ /**
+ * Trap lldp packets to the controller.
+ *
+ * @param devId the device identifier
+ * @param port the port for which this trap flow is designated
+ * @param install true to install the flow, false to remove the flow
+ */
+ void processLldpFilteringObjective(DeviceId devId, PortNumber port, boolean install);
+
+ /**
+ * Installs trap filtering objectives for particular traffic types (LLDP, IGMP and DHCP) on an
+ * NNI port.
+ *
+ * @param devId device ID
+ * @param port port number
+ * @param install true to install, false to remove
+ */
+ void processNniFilteringObjectives(DeviceId devId, PortNumber port, boolean install);
+
+ /**
+ * Creates a ForwardingObjective builder with double-tag match criteria and output
+ * action. The treatment will not contain pop or push actions.
+ * If the last parameter is true, use the upstream meter id and vice versa.
+ *
+ * @param uplinkPort the nni port
+ * @param subscriberPort the uni port
+ * @param meterId the meter id that is assigned to upstream or downstream flows
+ * @param tagInfo the uni tag information
+ * @param upstream true to create upstream, false to create downstream builder
+ * @return ForwardingObjective.Builder
+ */
+ ForwardingObjective.Builder createTransparentBuilder(PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ MeterId meterId,
+ UniTagInformation tagInfo,
+ boolean upstream);
+
+ /**
+ * Creates a ForwardingObjective builder for the upstream flows.
+ * The treatment will contain push action
+ *
+ * @param uplinkPort the nni port
+ * @param subscriberPort the uni port
+ * @param upstreamMeterId the meter id that is assigned to upstream flows
+ * @param uniTagInformation the uni tag information
+ * @return ForwardingObjective.Builder
+ */
+ ForwardingObjective.Builder createUpBuilder(PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ MeterId upstreamMeterId,
+ UniTagInformation uniTagInformation);
+
+ /**
+ * Creates a ForwardingObjective builder for the downstream flows.
+ * The treatment will contain pop action
+ *
+ * @param uplinkPort the nni port
+ * @param subscriberPort the uni port
+ * @param downstreamMeterId the meter id that is assigned to downstream flows
+ * @param tagInformation the uni tag information
+ * @return ForwardingObjective.Builder
+ */
+ ForwardingObjective.Builder createDownBuilder(PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ MeterId downstreamMeterId,
+ UniTagInformation tagInformation);
+
+ /**
+ * Clears pending mappings and state for device.
+ * @param deviceId the device id
+ */
+ void clearDeviceState(DeviceId deviceId);
+}
diff --git a/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java b/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
new file mode 100644
index 0000000..ca04ed4
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.internalapi;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterKey;
+import org.opencord.sadis.BandwidthProfileInformation;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Olt service for meter operations.
+ */
+public interface AccessDeviceMeterService {
+
+ /**
+ * Returns information about bandwidthProfile-meterKey (device / meter) mappings
+ * that have been programmed in the data-plane.
+ *
+ * @return an immutable map of bandwidthProfile-meterKey (device / meter) mappings
+ */
+ ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings();
+
+ /**
+ * Returns the meter id for a given bandwidth profile.
+ *
+ * @param deviceId the access device id
+ * @param bandwidthProfile the bandwidth profile id
+ * @return the meter id
+ */
+ MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile);
+
+ /**
+ * Returns information about device-meter relations that have been programmed in the
+ * data-plane.
+ *
+ * @return an immutable set of device-meter mappings
+ */
+ ImmutableSet<MeterKey> getProgMeters();
+
+ /**
+ * Creates a meter and sends it to the device.
+ *
+ * @param deviceId the access device id
+ * @param bpInfo the bandwidth profile information
+ * @param meterFuture the meter future to indicate whether the meter creation is
+ * successful or not.
+ * @return meter id that is generated for the given parameters
+ */
+ MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
+ CompletableFuture<Object> meterFuture);
+
+ /**
+ * Removes the DeviceBandwidthProfile from the pendingMeters.
+ *
+ * @param deviceId the device
+ * @param bwpInfo the bandwidth profile info
+ *
+ */
+ void removeFromPendingMeters(DeviceId deviceId, BandwidthProfileInformation bwpInfo);
+
+ /**
+ * Checks if DeviceBandwidthProfile is pending installation.
+ * If so immediately returns false meaning that no further action is needed,
+ * if not it adds the bandwidth profile do the pending list and returns true,
+ * meaning that further action to install the meter is required.
+ *
+ * @param deviceId the device
+ * @param bwpInfo the bandwidth profile info
+ *
+ * @return true if it was added to pending and a create meter action is needed,
+ * false if it is already pending and no further action is needed.
+ */
+ boolean checkAndAddPendingMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo);
+
+ /**
+ * Clears out meters for the given device.
+ *
+ * @param deviceId device ID
+ */
+ void clearMeters(DeviceId deviceId);
+
+ /**
+ * Clears out local state for the given device.
+ *
+ * @param deviceId device ID
+ */
+ void clearDeviceState(DeviceId deviceId);
+}
diff --git a/impl/src/main/java/org/opencord/olt/internalapi/package-info.java b/impl/src/main/java/org/opencord/olt/internalapi/package-info.java
new file mode 100644
index 0000000..2b5d98d
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/internalapi/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal APIs for the OLT application.
+ */
+package org.opencord.olt.internalapi;
diff --git a/impl/src/main/resources/any_vlan_cfg.json b/impl/src/main/resources/any_vlan_cfg.json
new file mode 100644
index 0000000..0c42301
--- /dev/null
+++ b/impl/src/main/resources/any_vlan_cfg.json
@@ -0,0 +1,62 @@
+{
+"apps" : {
+ "org.opencord.sadis" : {
+ "sadis" : {
+ "integration" : {
+ "cache" : {
+ "enabled" : true,
+ "maxsize" : 40,
+ "ttl" : "PT1m"
+ }
+ },
+ "entries" : [ {
+ "id" : "s1-eth1",
+ "cTag" : 4096,
+ "sTag" : 4,
+ "nasPortId" : "s1-eth1",
+ "technologyProfileId" : 10,
+ "upstreamBandwidthProfile" : "High-Speed-Internet",
+ "downstreamBandwidthProfile" : "User1-Specific"
+ }, {
+ "id" : "1",
+ "hardwareIdentifier" : "00:00:00:00:00:01",
+ "ipAddress" : "127.0.0.1",
+ "uplinkPort": "2"
+ } ]
+ },
+ "bandwidthprofile":{
+ "integration":{
+ "cache":{
+ "enabled":true,
+ "maxsize":40,
+ "ttl":"PT1m"
+ }
+ },
+ "entries":[
+ {
+ "id":"High-Speed-Internet",
+ "cir":200000000,
+ "cbs":348000,
+ "eir":10000000,
+ "ebs":348000,
+ "air":10000000
+ },
+ {
+ "id":"User1-Specific",
+ "cir":300000000,
+ "cbs":348000,
+ "eir":20000000,
+ "ebs":348000
+ }
+ ]
+ }
+ }
+ },
+ "devices":{
+ "of:0000000000000001":{
+ "basic":{
+ "driver":"pmc-olt"
+ }
+ }
+ }
+}
diff --git a/impl/src/main/resources/cfg.json b/impl/src/main/resources/cfg.json
new file mode 100644
index 0000000..0b61e65
--- /dev/null
+++ b/impl/src/main/resources/cfg.json
@@ -0,0 +1,33 @@
+{
+"apps" : {
+ "org.opencord.sadis" : {
+ "sadis" : {
+ "integration" : {
+ "cache" : {
+ "enabled" : true,
+ "maxsize" : 50,
+ "ttl" : "PT1m"
+ }
+ },
+ "entries" : [ {
+ "id" : "s1-eth1",
+ "cTag" : 2,
+ "sTag" : 4,
+ "nasPortId" : "s1-eth1"
+ }, {
+ "id" : "1",
+ "hardwareIdentifier" : "00:00:00:00:00:01",
+ "ipAddress" : "127.0.0.1",
+ "uplinkPort": "2"
+ } ]
+ }
+ }
+ },
+ "devices":{
+ "of:0000000000000001":{
+ "basic":{
+ "driver":"pmc-olt"
+ }
+ }
+ }
+}
diff --git a/impl/src/main/resources/custom-topo.py b/impl/src/main/resources/custom-topo.py
new file mode 100644
index 0000000..dfb414d
--- /dev/null
+++ b/impl/src/main/resources/custom-topo.py
@@ -0,0 +1,69 @@
+'''
+ Copyright 2016-present Open Networking Foundation
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+'''
+from mininet.cli import CLI
+from mininet.log import setLogLevel
+from mininet.net import Mininet
+from mininet.topo import Topo
+from mininet.node import RemoteController, UserSwitch
+
+class MinimalTopo( Topo ):
+ "Minimal topology with a single switch and two hosts"
+
+ def build( self ):
+ # Create two hosts.
+ h1 = self.addHost( 'h1' )
+ h2 = self.addHost( 'h2' )
+
+ # Create a switch
+ s1 = self.addSwitch( 's1', cls=UserSwitch)
+
+ # Add links between the switch and each host
+ self.addLink( s1, h1 )
+ self.addLink( s1, h2 )
+
+def runMinimalTopo():
+ "Bootstrap a Mininet network using the Minimal Topology"
+
+ # Create an instance of our topology
+ topo = MinimalTopo()
+
+ # Create a network based on the topology using OVS and controlled by
+ # a remote controller.
+ net = Mininet(
+ topo=topo,
+ controller=lambda name: RemoteController( name, ip='127.0.0.1' ),
+ switch=UserSwitch,
+ autoSetMacs=True )
+
+ # Actually start the network
+ net.start()
+
+ # Drop the user in to a CLI so user can run commands.
+ CLI( net )
+
+ # After the user exits the CLI, shutdown the network.
+ net.stop()
+
+if __name__ == '__main__':
+ # This runs if this file is executed directly
+ setLogLevel( 'info' )
+ runMinimalTopo()
+
+# Allows the file to be imported using `mn --custom <filename> --topo minimal`
+topos = {
+ 'minimal': MinimalTopo
+}
diff --git a/impl/src/main/resources/olt-drivers.xml b/impl/src/main/resources/olt-drivers.xml
new file mode 100644
index 0000000..fa7b585
--- /dev/null
+++ b/impl/src/main/resources/olt-drivers.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2016-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<drivers>
+ <driver name="celestica" extends="default"
+ manufacturer="PMC GPON Networks" hwVersion="PAS5211 v2" swVersion="vOLT version 1.5.3.*">
+ <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+ impl="org.opencord.olt.driver.OltPipeline"/>
+ <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+ impl="org.onosproject.driver.query.FullMetersAvailable"/>
+ </driver>
+ <driver name="pmc-olt" extends="default"
+ manufacturer="PMC GPON Networks" hwVersion="PASffffffff v-1" swVersion="vOLT.*">
+ <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+ impl="org.opencord.olt.driver.OltPipeline"/>
+ <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+ impl="org.onosproject.driver.query.FullMetersAvailable"/>
+ </driver>
+ <driver name="voltha" extends="default"
+ manufacturer="VOLTHA Project" hwVersion=".*" swVersion=".*">
+ <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+ impl="org.opencord.olt.driver.OltPipeline"/>
+ <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+ impl="org.onosproject.driver.query.FullMetersAvailable"/>
+ <property name="accumulatorEnabled">true</property>
+ </driver>
+ <driver name="fj-olt" extends="default"
+ manufacturer="Fujitsu" hwVersion="svkOLT" swVersion="v1.0">
+ <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+ impl="org.opencord.olt.driver.OltPipeline"/>
+ <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+ impl="org.onosproject.driver.query.FullMetersAvailable"/>
+ </driver>
+ <driver name="nokia-olt" extends="default"
+ manufacturer="Nokia" hwVersion="SDOLT" swVersion="5.2.1">
+ <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+ impl="org.opencord.olt.driver.NokiaOltPipeline"/>
+ <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+ impl="org.onosproject.driver.query.FullMetersAvailable"/>
+ </driver>
+ <driver name="g.fast" extends="default"
+ manufacturer="TEST1" hwVersion="TEST2" swVersion="TEST3">
+ <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+ impl="org.opencord.olt.driver.OltPipeline"/>
+ <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+ impl="org.onosproject.driver.query.FullMetersAvailable"/>
+ </driver>
+</drivers>
+
diff --git a/impl/src/main/resources/vlan_cfg.json b/impl/src/main/resources/vlan_cfg.json
new file mode 100644
index 0000000..3bb577f
--- /dev/null
+++ b/impl/src/main/resources/vlan_cfg.json
@@ -0,0 +1,131 @@
+{
+ "apps" : {
+ "org.opencord.sadis" : {
+ "sadis" : {
+ "integration" : {
+ "cache" : {
+ "enabled" : true,
+ "maxsize" : 60,
+ "ttl" : "PT1m"
+ }
+ },
+ "entries" : [ {
+ "id" : "s1-eth1",
+ "nasPortId" : "s1-eth1",
+ "uniTagList": [
+ {
+ "uniTagMatch": 35,
+ "ponCTag":33,
+ "ponSTag":7,
+ "technologyProfileId": 2,
+ "upstreamBandwidthProfile":"High-Speed-Internet",
+ "downstreamBandwidthProfile":"Service1"
+ },
+ {
+ "uniTagMatch": 45,
+ "ponCTag":43,
+ "ponSTag":10,
+ "technologyProfileId": 3,
+ "upstreamBandwidthProfile":"VOIP",
+ "downstreamBandwidthProfile":"Service2",
+ "isDhcpRequired":"true"
+ },
+ {
+ "uniTagMatch": 55,
+ "ponCTag": 55,
+ "ponSTag": 550,
+ "technologyProfileId": 4,
+ "upstreamBandwidthProfile": "VOD",
+ "downstreamBandwidthProfile": "Service3",
+ "isDhcpRequired": "true",
+ "isIgmpRequired": "true"
+ },
+ {
+ "ponCTag": 55,
+ "ponSTag": 555,
+ "dsPonCTagPriority": 5,
+ "dsPonSTagPriority": 5,
+ "technologyProfileId": 4,
+ "serviceName": "MC"
+ }
+ ]
+ }, {
+ "id" : "1",
+ "hardwareIdentifier" : "00:00:00:00:00:01",
+ "ipAddress" : "127.0.0.1",
+ "uplinkPort": "2"
+ } ]
+ },
+ "bandwidthprofile":{
+ "integration":{
+ "cache":{
+ "enabled":true,
+ "maxsize":40,
+ "ttl":"PT1m"
+ }
+ },
+ "entries":[
+ {
+ "id":"High-Speed-Internet",
+ "cir": 500000,
+ "cbs": 10000,
+ "eir": 500000,
+ "ebs": 10000,
+ "air": 100000
+ },
+ {
+ "id":"VOIP",
+ "cir": 500000,
+ "cbs": 10000,
+ "eir": 500000,
+ "ebs": 10000,
+ "air": 100000
+ },
+ {
+ "id":"Service1",
+ "cir": 600000,
+ "cbs": 10000,
+ "eir": 400000,
+ "ebs": 10000
+ },
+ {
+ "id":"Service2",
+ "cir": 600000,
+ "cbs": 10000,
+ "eir": 400000,
+ "ebs": 10000
+ },
+ {
+ "id":"VOD",
+ "cir": 600000,
+ "cbs": 10000,
+ "eir": 400000,
+ "ebs": 10000
+ },
+ {
+ "id":"Service3",
+ "cir": 600000,
+ "cbs": 10000,
+ "eir": 400000,
+ "ebs": 10000
+ },
+ {
+ "id":"Default",
+ "cir": 0,
+ "cbs": 0,
+ "eir": 512,
+ "ebs": 30,
+ "air": 0
+ }
+ ]
+ }
+ }
+ },
+ "devices":{
+ "of:0000000000000001":{
+ "basic":{
+ "driver":"pmc-olt"
+ }
+ }
+ }
+}
diff --git a/impl/src/main/resources/vlan_cfg_with_default.json b/impl/src/main/resources/vlan_cfg_with_default.json
new file mode 100644
index 0000000..e168283
--- /dev/null
+++ b/impl/src/main/resources/vlan_cfg_with_default.json
@@ -0,0 +1,104 @@
+{
+ "apps" : {
+ "org.opencord.sadis" : {
+ "sadis" : {
+ "integration" : {
+ "cache" : {
+ "enabled" : true,
+ "maxsize" : 60,
+ "ttl" : "PT1m"
+ }
+ },
+ "entries" : [ {
+ "id" : "s1-eth1",
+ "nasPortId" : "s1-eth1",
+ "uniTagList": [
+ {
+ "uniTagMatch": 35,
+ "ponCTag":33,
+ "ponSTag":7,
+ "technologyProfileId": 2,
+ "upstreamBandwidthProfile":"High-Speed-Internet",
+ "downstreamBandwidthProfile":"Service1"
+ }
+ ]
+ }, {
+ "id" : "1",
+ "hardwareIdentifier" : "00:00:00:00:00:01",
+ "ipAddress" : "127.0.0.1",
+ "uplinkPort": "2"
+ } ]
+ },
+ "bandwidthprofile":{
+ "integration":{
+ "cache":{
+ "enabled":true,
+ "maxsize":40,
+ "ttl":"PT1m"
+ }
+ },
+ "entries":[
+ {
+ "id":"High-Speed-Internet",
+ "cir": 500000,
+ "cbs": 10000,
+ "eir": 500000,
+ "ebs": 10000,
+ "air": 100000
+ },
+ {
+ "id":"VOIP",
+ "cir": 500000,
+ "cbs": 10000,
+ "eir": 500000,
+ "ebs": 10000,
+ "air": 100000
+ },
+ {
+ "id":"Service1",
+ "cir": 600000,
+ "cbs": 10000,
+ "eir": 400000,
+ "ebs": 10000
+ },
+ {
+ "id":"Service2",
+ "cir": 600000,
+ "cbs": 10000,
+ "eir": 400000,
+ "ebs": 10000
+ },
+ {
+ "id":"VOD",
+ "cir": 600000,
+ "cbs": 10000,
+ "eir": 400000,
+ "ebs": 10000
+ },
+ {
+ "id":"Service3",
+ "cir": 600000,
+ "cbs": 10000,
+ "eir": 400000,
+ "ebs": 10000
+ },
+ {
+ "id":"Default",
+ "cir": 0,
+ "cbs": 0,
+ "eir": 512,
+ "ebs": 30,
+ "air": 0
+ }
+ ]
+ }
+ }
+ },
+ "devices":{
+ "of:0000000000000001":{
+ "basic":{
+ "driver":"pmc-olt"
+ }
+ }
+ }
+}
diff --git a/impl/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java b/impl/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java
new file mode 100644
index 0000000..1f682cb
--- /dev/null
+++ b/impl/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.DeviceId;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+public class ConsistentHasherTest {
+
+ private static final int WEIGHT = 10;
+
+ private static final NodeId N1 = new NodeId("10.0.0.1");
+ private static final NodeId N2 = new NodeId("10.0.0.2");
+ private static final NodeId N3 = new NodeId("10.0.0.3");
+
+ private ConsistentHasher hasher;
+
+ @Before
+ public void setUp() {
+ List<NodeId> servers = new ArrayList<>();
+ servers.add(N1);
+ servers.add(N2);
+
+ hasher = new ConsistentHasher(servers, WEIGHT);
+ }
+
+ @Test
+ public void testHasher() {
+ DeviceId deviceId = DeviceId.deviceId("foo");
+ NodeId server = hasher.hash(deviceId.toString());
+
+ assertThat(server, equalTo(N1));
+
+ deviceId = DeviceId.deviceId("bsaf");
+ server = hasher.hash(deviceId.toString());
+
+ assertThat(server, equalTo(N2));
+ }
+
+ @Test
+ public void testAddServer() {
+ DeviceId deviceId = DeviceId.deviceId("foo");
+ NodeId server = hasher.hash(deviceId.toString());
+
+ assertThat(server, equalTo(N1));
+
+ hasher.addServer(N3);
+
+ server = hasher.hash(deviceId.toString());
+
+ assertThat(server, equalTo(N3));
+ }
+}
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
new file mode 100644
index 0000000..3283886
--- /dev/null
+++ b/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
@@ -0,0 +1,556 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.packet.EthType;
+import org.onlab.packet.VlanId;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.RoleInfo;
+import org.onosproject.mastership.MastershipInfo;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.PortCriterion;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flowobjective.FilteringObjQueueKey;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.ForwardingObjQueueKey;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjQueueKey;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterKey;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.UniTagInformation;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
+
+public class OltFlowTest extends TestBase {
+ private OltFlowService oltFlowService;
+ PortNumber uniPortNumber = PortNumber.portNumber(1);
+ PortNumber uniPortNumber2 = PortNumber.portNumber(2);
+ PortNumber nniPortNumber = PortNumber.portNumber(65535);
+
+ UniTagInformation.Builder tagInfoBuilder = new UniTagInformation.Builder();
+ UniTagInformation uniTagInfo = tagInfoBuilder.setUniTagMatch(VlanId.vlanId((short) 35))
+ .setPonCTag(VlanId.vlanId((short) 33))
+ .setPonSTag(VlanId.vlanId((short) 7))
+ .setDsPonCTagPriority(0)
+ .setUsPonSTagPriority(0)
+ .setTechnologyProfileId(64)
+ .setDownstreamBandwidthProfile(dsBpId)
+ .setUpstreamBandwidthProfile(usBpId)
+ .setIsDhcpRequired(true)
+ .setIsIgmpRequired(true)
+ .build();
+
+ UniTagInformation.Builder tagInfoBuilderNoPcp = new UniTagInformation.Builder();
+ UniTagInformation uniTagInfoNoPcp = tagInfoBuilderNoPcp.setUniTagMatch(VlanId.vlanId((short) 35))
+ .setPonCTag(VlanId.vlanId((short) 34))
+ .setPonSTag(VlanId.vlanId((short) 7))
+ .setDsPonCTagPriority(-1)
+ .setUsPonSTagPriority(-1)
+ .setUsPonCTagPriority(-1)
+ .setDsPonSTagPriority(-1)
+ .setTechnologyProfileId(64)
+ .setDownstreamBandwidthProfile(dsBpId)
+ .setUpstreamBandwidthProfile(usBpId)
+ .setIsDhcpRequired(true)
+ .setIsIgmpRequired(true)
+ .build();
+
+ UniTagInformation.Builder tagInfoBuilder2 = new UniTagInformation.Builder();
+ UniTagInformation uniTagInfoNoDhcpNoIgmp = tagInfoBuilder2
+ .setUniTagMatch(VlanId.vlanId((short) 35))
+ .setPonCTag(VlanId.vlanId((short) 33))
+ .setPonSTag(VlanId.vlanId((short) 7))
+ .setDsPonCTagPriority(0)
+ .setUsPonSTagPriority(0)
+ .setTechnologyProfileId(64)
+ .setDownstreamBandwidthProfile(dsBpId)
+ .setUpstreamBandwidthProfile(usBpId)
+ .build();
+
+ @Before
+ public void setUp() {
+ oltFlowService = new OltFlowService();
+ oltFlowService.oltMeterService = new MockOltMeterService();
+ oltFlowService.flowObjectiveService = new MockOltFlowObjectiveService();
+ oltFlowService.mastershipService = new MockMastershipService();
+ oltFlowService.sadisService = new MockSadisService();
+ oltFlowService.bpService = oltFlowService.sadisService.getBandwidthProfileService();
+ oltFlowService.appId = appId;
+ }
+
+ @Test
+ public void testDhcpFiltering() {
+ oltFlowService.flowObjectiveService.clearQueue();
+ // ensure upstream dhcp traps can be added and removed
+ oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ usMeterId, uniTagInfo,
+ true, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 1;
+ oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ usMeterId, uniTagInfo,
+ false, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
+
+ // Ensure upstream flow has no pcp unless properly specified.
+ oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber2,
+ usMeterId, uniTagInfoNoPcp,
+ true, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
+
+ // ensure upstream flows are not added if uniTagInfo is missing dhcp requirement
+ oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ usMeterId, uniTagInfoNoDhcpNoIgmp,
+ true, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
+
+ // ensure downstream traps don't succeed without global config for nni ports
+ oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, nniPortNumber,
+ null, null,
+ true, false);
+ oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, nniPortNumber,
+ null, null,
+ false, false);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
+ // do global config for nni ports and now it should succeed
+ oltFlowService.enableDhcpOnNni = true;
+ oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, nniPortNumber,
+ null, null,
+ true, false);
+ oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, nniPortNumber,
+ null, null,
+ false, false);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 5;
+
+ // turn on DHCPv6 and we should get 2 flows
+ oltFlowService.enableDhcpV6 = true;
+ oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ usMeterId, uniTagInfo,
+ true, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 7;
+
+ // turn off DHCPv4 and it's only v6
+ oltFlowService.enableDhcpV4 = false;
+ oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ usMeterId, uniTagInfo,
+ true, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 8;
+
+ // cleanup
+ oltFlowService.flowObjectiveService.clearQueue();
+ oltFlowService.enableDhcpV4 = true;
+ oltFlowService.enableDhcpV6 = false;
+ }
+
+ @Test
+ public void testPppoedFiltering() {
+ oltFlowService.flowObjectiveService.clearQueue();
+
+ // ensure pppoed traps are not added if global config is off.
+ oltFlowService.enablePppoe = false;
+ oltFlowService.processPPPoEDFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ usMeterId, uniTagInfo,
+ true, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 0;
+
+ // ensure upstream pppoed traps can be added and removed
+ oltFlowService.enablePppoe = true;
+ oltFlowService.processPPPoEDFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ usMeterId, uniTagInfo,
+ true, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 1;
+ oltFlowService.processPPPoEDFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ usMeterId, uniTagInfo,
+ false, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
+
+ // ensure downstream pppoed traps can be added and removed
+ oltFlowService.processPPPoEDFilteringObjectives(DEVICE_ID_1, nniPortNumber,
+ null, null,
+ true, false);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
+ oltFlowService.processPPPoEDFilteringObjectives(DEVICE_ID_1, nniPortNumber,
+ null, null,
+ false, false);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 4;
+
+ // cleanup
+ oltFlowService.flowObjectiveService.clearQueue();
+ }
+
+ @Test
+ public void testIgmpFiltering() {
+ oltFlowService.flowObjectiveService.clearQueue();
+
+ // ensure igmp flows can be added and removed
+ oltFlowService.processIgmpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ usMeterId, uniTagInfo,
+ true, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 1;
+ oltFlowService.processIgmpFilteringObjectives(DEVICE_ID_1, uniPortNumber, usMeterId,
+ uniTagInfo,
+ false, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
+
+ // ensure igmp flow is not added if uniTag has no igmp requirement
+ oltFlowService.processIgmpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ usMeterId, uniTagInfoNoDhcpNoIgmp,
+ true, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
+
+ //ensure igmp flow on NNI fails without global setting
+ oltFlowService.processIgmpFilteringObjectives(DEVICE_ID_1, nniPortNumber,
+ null, null,
+ true, false);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
+
+ // igmp trap on NNI should succeed with global config
+ oltFlowService.enableIgmpOnNni = true;
+ oltFlowService.processIgmpFilteringObjectives(DEVICE_ID_1, nniPortNumber,
+ null, null,
+ true, false);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
+ // cleanup
+ oltFlowService.flowObjectiveService.clearQueue();
+
+ }
+
+ @Test
+ public void testEapolFiltering() {
+ addBandwidthProfile(uniTagInfo.getUpstreamBandwidthProfile());
+ oltFlowService.enableEapol = true;
+
+ //will install
+ oltFlowService.processEapolFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ uniTagInfo.getUpstreamBandwidthProfile(), new CompletableFuture<>(),
+ uniTagInfo.getUniTagMatch(), true);
+
+ //bp profile doesn't exist
+ oltFlowService.processEapolFilteringObjectives(DEVICE_ID_1, uniPortNumber,
+ uniTagInfo.getDownstreamBandwidthProfile(), new CompletableFuture<>(),
+ uniTagInfo.getUniTagMatch(), true);
+ }
+
+ @Test
+ public void testLldpFiltering() {
+ oltFlowService.processLldpFilteringObjective(DEVICE_ID_1, nniPortNumber, true);
+ oltFlowService.processLldpFilteringObjective(DEVICE_ID_1, nniPortNumber, false);
+ }
+
+ @Test
+ public void testNniFiltering() {
+ oltFlowService.flowObjectiveService.clearQueue();
+ oltFlowService.enableDhcpOnNni = true;
+ oltFlowService.enableIgmpOnNni = true;
+ oltFlowService.processNniFilteringObjectives(DEVICE_ID_1, nniPortNumber, true);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives()
+ .size() == 3;
+ oltFlowService.processNniFilteringObjectives(DEVICE_ID_1, nniPortNumber, false);
+ assert oltFlowService.flowObjectiveService.getPendingFlowObjectives()
+ .size() == 6;
+ oltFlowService.flowObjectiveService.clearQueue();
+ }
+
+ @Test
+ public void testUpBuilder() {
+ ForwardingObjective objective =
+ oltFlowService.createUpBuilder(nniPortNumber, uniPortNumber, usMeterId, uniTagInfo).add();
+ checkObjective(objective, true);
+ }
+
+ @Test
+ public void testDownBuilder() {
+ ForwardingObjective objective =
+ oltFlowService.createDownBuilder(nniPortNumber, uniPortNumber, dsMeterId, uniTagInfo).remove();
+ checkObjective(objective, false);
+ }
+
+ private void checkObjective(ForwardingObjective fwd, boolean upstream) {
+ TrafficTreatment treatment = fwd.treatment();
+
+ //check instructions
+ Set<Instructions.MeterInstruction> meters = treatment.meters();
+ assert !meters.isEmpty();
+
+ Instructions.MetadataInstruction writeMetadata = treatment.writeMetadata();
+ assert writeMetadata != null;
+
+ List<Instruction> immediateInstructions = treatment.immediate();
+ Optional<Instruction> vlanInstruction = immediateInstructions.stream()
+ .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
+ .filter(i -> ((L2ModificationInstruction) i).subtype() ==
+ L2ModificationInstruction.L2SubType.VLAN_PUSH ||
+ ((L2ModificationInstruction) i).subtype() ==
+ L2ModificationInstruction.L2SubType.VLAN_POP)
+ .findAny();
+
+ assert vlanInstruction.isPresent();
+
+ //check match criteria
+ TrafficSelector selector = fwd.selector();
+ assert selector.getCriterion(Criterion.Type.IN_PORT) != null;
+ assert selector.getCriterion(Criterion.Type.VLAN_VID) != null;
+
+ if (!upstream) {
+ assert selector.getCriterion(Criterion.Type.METADATA) != null;
+ }
+ }
+
+ private class MockOltMeterService implements org.opencord.olt.internalapi.AccessDeviceMeterService {
+ @Override
+ public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
+ return null;
+ }
+
+ @Override
+ public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
+ return null;
+ }
+
+
+ @Override
+ public ImmutableSet<MeterKey> getProgMeters() {
+ return null;
+ }
+
+ @Override
+ public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
+ CompletableFuture<Object> meterFuture) {
+ return usMeterId;
+ }
+
+ @Override
+ public void removeFromPendingMeters(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
+
+ }
+
+ @Override
+ public boolean checkAndAddPendingMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
+ return false;
+ }
+
+
+ @Override
+ public void clearMeters(DeviceId deviceId) {
+ }
+
+ @Override
+ public void clearDeviceState(DeviceId deviceId) {
+
+ }
+ }
+
+ private class MockOltFlowObjectiveService implements org.onosproject.net.flowobjective.FlowObjectiveService {
+ List<String> flowObjectives = new ArrayList<>();
+
+ @Override
+ public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
+ flowObjectives.add(filteringObjective.toString());
+ EthTypeCriterion ethType = (EthTypeCriterion)
+ filterForCriterion(filteringObjective.conditions(), Criterion.Type.ETH_TYPE);
+
+ Instructions.MeterInstruction meter = filteringObjective.meta().metered();
+ Instruction writeMetadata = filteringObjective.meta().writeMetadata();
+ VlanIdCriterion vlanIdCriterion = (VlanIdCriterion)
+ filterForCriterion(filteringObjective.conditions(), Criterion.Type.VLAN_VID);
+ PortCriterion portCriterion = (PortCriterion) filteringObjective.key();
+
+ filteringObjective.meta().allInstructions().forEach(instruction -> {
+ if (instruction.type().equals(Instruction.Type.L2MODIFICATION)) {
+ L2ModificationInstruction l2Instruction = (L2ModificationInstruction) instruction;
+ if (l2Instruction.subtype().equals(L2ModificationInstruction.L2SubType.VLAN_PCP)) {
+ //this, given the uniTagInfo we provide, should not be present
+ assert false;
+ }
+ }
+ });
+
+
+ if (ethType.ethType().equals(EthType.EtherType.LLDP.ethType()) ||
+ portCriterion.port().equals(nniPortNumber)) {
+ assert meter == null;
+ assert writeMetadata == null;
+ assert vlanIdCriterion == null;
+ } else {
+ assert meter.meterId().equals(usMeterId) || meter.meterId().equals(dsMeterId);
+ assert writeMetadata != null;
+ assert vlanIdCriterion == null || vlanIdCriterion.vlanId() == uniTagInfo.getUniTagMatch()
+ || vlanIdCriterion.vlanId() == uniTagInfoNoPcp.getUniTagMatch();
+ }
+
+ }
+
+ @Override
+ public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
+
+ }
+
+ @Override
+ public void next(DeviceId deviceId, NextObjective nextObjective) {
+
+ }
+
+ @Override
+ public int allocateNextId() {
+ return 0;
+ }
+
+ @Override
+ public void initPolicy(String s) {
+
+ }
+
+ @Override
+ public void apply(DeviceId deviceId, Objective objective) {
+
+ }
+
+ @Override
+ public Map<Pair<Integer, DeviceId>, List<String>> getNextMappingsChain() {
+ return null;
+ }
+
+ @Override
+ public List<String> getNextMappings() {
+ return null;
+ }
+
+ @Override
+ public List<String> getPendingFlowObjectives() {
+ return ImmutableList.copyOf(flowObjectives);
+ }
+
+ @Override
+ public ListMultimap<FilteringObjQueueKey, Objective> getFilteringObjQueue() {
+ return null;
+ }
+
+ @Override
+ public ListMultimap<ForwardingObjQueueKey, Objective> getForwardingObjQueue() {
+ return null;
+ }
+
+ @Override
+ public ListMultimap<NextObjQueueKey, Objective> getNextObjQueue() {
+ return null;
+ }
+
+ @Override
+ public Map<FilteringObjQueueKey, Objective> getFilteringObjQueueHead() {
+ return null;
+ }
+
+ @Override
+ public Map<ForwardingObjQueueKey, Objective> getForwardingObjQueueHead() {
+ return null;
+ }
+
+ @Override
+ public Map<NextObjQueueKey, Objective> getNextObjQueueHead() {
+ return null;
+ }
+
+ @Override
+ public void clearQueue() {
+ flowObjectives.clear();
+ }
+
+ private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
+ return criteria.stream()
+ .filter(c -> c.type().equals(type))
+ .limit(1)
+ .findFirst().orElse(null);
+ }
+ }
+
+ private class MockMastershipService implements org.onosproject.mastership.MastershipService {
+ @Override
+ public MastershipRole getLocalRole(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
+ public boolean isLocalMaster(DeviceId deviceId) {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> relinquishMastership(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
+ public NodeId getMasterFor(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
+ public RoleInfo getNodesFor(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
+ public MastershipInfo getMastershipFor(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
+ public Set<DeviceId> getDevicesOf(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public void addListener(MastershipListener mastershipListener) {
+
+ }
+
+ @Override
+ public void removeListener(MastershipListener mastershipListener) {
+
+ }
+ }
+}
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltMeterTest.java b/impl/src/test/java/org/opencord/olt/impl/OltMeterTest.java
new file mode 100644
index 0000000..000ea35
--- /dev/null
+++ b/impl/src/test/java/org/opencord/olt/impl/OltMeterTest.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.meter.DefaultMeter;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterKey;
+import org.onosproject.net.meter.MeterListener;
+import org.onosproject.net.meter.MeterRequest;
+import org.onosproject.store.service.TestStorageService;
+import org.opencord.sadis.BandwidthProfileInformation;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+public class OltMeterTest extends TestBase {
+ private OltMeterService oltMeterService;
+
+ private BandwidthProfileInformation bandwidthProfileInformation = new BandwidthProfileInformation();
+
+ @Before
+ public void setUp() {
+ oltMeterService = new OltMeterService();
+ oltMeterService.storageService = new TestStorageService();
+ oltMeterService.meterService = new MockMeterService();
+ oltMeterService.coreService = new CoreServiceAdapter();
+ oltMeterService.componentConfigService = new ComponentConfigAdapter();
+ oltMeterService.activate(null);
+ oltMeterService.bpInfoToMeter = new MockConsistentMultimap<>();
+ }
+
+ @Test
+ public void testAddAndGetMeterIdToBpMapping() {
+ oltMeterService.addMeterIdToBpMapping(DEVICE_ID_1, usMeterId, usBpId);
+ MeterId usMeterId = oltMeterService.getMeterIdFromBpMapping(DEVICE_ID_1, usBpId);
+ assert usMeterId.equals(this.usMeterId);
+
+ oltMeterService.addMeterIdToBpMapping(DEVICE_ID_1, dsMeterId, dsBpId);
+ MeterId dsMeterId = oltMeterService.getMeterIdFromBpMapping(DEVICE_ID_1, dsBpId);
+ assert dsMeterId.equals(this.dsMeterId);
+
+ ImmutableMap<String, Collection<MeterKey>> meterMappings = oltMeterService.getBpMeterMappings();
+ assert meterMappings.size() == 2;
+ }
+
+ @Test
+ public void testCreateMeter() {
+ //with provided bandwidth profile information
+ bandwidthProfileInformation.setId(usBpId);
+ bandwidthProfileInformation.setExceededInformationRate(10000);
+ bandwidthProfileInformation.setExceededBurstSize(10000L);
+ bandwidthProfileInformation.setCommittedBurstSize(10000L);
+ bandwidthProfileInformation.setCommittedInformationRate(10000);
+
+ oltMeterService.addMeterIdToBpMapping(DEVICE_ID_1, usMeterId, usBpId);
+
+
+ MeterId meterId =
+ oltMeterService.createMeter(DEVICE_ID_1, bandwidthProfileInformation, new CompletableFuture<>());
+ assert meterId != null;
+
+ //with null bandwidth profile information
+ meterId = oltMeterService.createMeter(DEVICE_ID_1, null, new CompletableFuture<>());
+ assert meterId == null;
+ }
+
+
+ private class MockMeterService implements org.onosproject.net.meter.MeterService {
+ @Override
+ public Meter submit(MeterRequest meterRequest) {
+ return DefaultMeter.builder()
+ .forDevice(DEVICE_ID_1)
+ .fromApp(appId)
+ .withId(usMeterId)
+ .build();
+ }
+
+ @Override
+ public void withdraw(MeterRequest meterRequest, MeterId meterId) {
+
+ }
+
+ @Override
+ public Meter getMeter(DeviceId deviceId, MeterId meterId) {
+ return null;
+ }
+
+ @Override
+ public Collection<Meter> getAllMeters() {
+ return null;
+ }
+
+ @Override
+ public Collection<Meter> getMeters(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
+ public MeterId allocateMeterId(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
+ public void freeMeterId(DeviceId deviceId, MeterId meterId) {
+
+ }
+
+ @Override
+ public void addListener(MeterListener meterListener) {
+
+ }
+
+ @Override
+ public void removeListener(MeterListener meterListener) {
+
+ }
+ }
+}
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltTest.java b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
new file mode 100644
index 0000000..c417590
--- /dev/null
+++ b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.onlab.packet.ChassisId;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Element;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.provider.ProviderId;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OltTest extends TestBase {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private Olt olt;
+
+
+ private static final String SCHEME_NAME = "olt";
+ private static final DefaultAnnotations DEVICE_ANNOTATIONS = DefaultAnnotations.builder()
+ .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase()).build();
+
+ @Before
+ public void setUp() {
+ olt = new Olt();
+ olt.deviceService = new MockDeviceService();
+ olt.sadisService = new MockSadisService();
+ olt.subsService = olt.sadisService.getSubscriberInfoService();
+ }
+
+ /**
+ * Tests that the getSubscriber method does throw a NullPointerException with a meaningful message.
+ */
+ @Test
+ public void testGetSubscriberError() {
+ ConnectPoint cp = ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1);
+ try {
+ olt.getSubscriber(cp);
+ } catch (NullPointerException e) {
+ assertEquals(e.getMessage(), "Invalid connect point");
+ }
+ }
+
+ /**
+ * Tests that the getSubscriber method returns Subscriber informations.
+ */
+ @Test
+ public void testGetSubscriber() {
+ ConnectPoint cp = ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 2);
+
+ SubscriberAndDeviceInformation s = olt.getSubscriber(cp);
+
+ assertEquals(s.circuitId(), CLIENT_CIRCUIT_ID);
+ assertEquals(s.nasPortId(), CLIENT_NAS_PORT_ID);
+ }
+
+ private class MockDevice extends DefaultDevice {
+
+ public MockDevice(ProviderId providerId, DeviceId id, Type type,
+ String manufacturer, String hwVersion, String swVersion,
+ String serialNumber, ChassisId chassisId, Annotations... annotations) {
+ super(providerId, id, type, manufacturer, hwVersion, swVersion, serialNumber,
+ chassisId, annotations);
+ }
+ }
+
+ private class MockDeviceService extends DeviceServiceAdapter {
+
+ private ProviderId providerId = new ProviderId("of", "foo");
+ private final Device device1 = new MockDevice(providerId, DEVICE_ID_1, Device.Type.SWITCH,
+ "foo.inc", "0", "0", OLT_DEV_ID, new ChassisId(),
+ DEVICE_ANNOTATIONS);
+
+ @Override
+ public Device getDevice(DeviceId devId) {
+ return device1;
+
+ }
+
+ @Override
+ public Port getPort(ConnectPoint cp) {
+ log.info("Looking up port {}", cp.port().toString());
+ if (cp.port().toString().equals("1")) {
+ return null;
+ }
+ return new MockPort();
+ }
+ }
+
+ private class MockPort implements Port {
+
+ @Override
+ public boolean isEnabled() {
+ return true;
+ }
+ @Override
+ public long portSpeed() {
+ return 1000;
+ }
+ @Override
+ public Element element() {
+ return null;
+ }
+ @Override
+ public PortNumber number() {
+ return null;
+ }
+ @Override
+ public Annotations annotations() {
+ return new MockAnnotations();
+ }
+ @Override
+ public Type type() {
+ return Port.Type.FIBER;
+ }
+
+ private class MockAnnotations implements Annotations {
+
+ @Override
+ public String value(String val) {
+ return "BRCM12345678";
+ }
+ @Override
+ public Set<String> keys() {
+ return null;
+ }
+ }
+ }
+
+
+
+
+
+}
\ No newline at end of file
diff --git a/impl/src/test/java/org/opencord/olt/impl/TestBase.java b/impl/src/test/java/org/opencord/olt/impl/TestBase.java
new file mode 100644
index 0000000..4d7bcd5
--- /dev/null
+++ b/impl/src/test/java/org/opencord/olt/impl/TestBase.java
@@ -0,0 +1,324 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.MacAddress;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.ConsistentMultimapBuilder;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.TestConsistentMultimap;
+import org.onosproject.store.service.Versioned;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class TestBase {
+
+ protected static final String CLIENT_NAS_PORT_ID = "PON 1/1";
+ protected static final String CLIENT_CIRCUIT_ID = "CIR-PON 1/1";
+ protected static final String OLT_DEV_ID = "of:00000000000000aa";
+ protected static final DeviceId DEVICE_ID_1 = DeviceId.deviceId(OLT_DEV_ID);
+ protected MeterId usMeterId = MeterId.meterId(1);
+ protected MeterId dsMeterId = MeterId.meterId(2);
+ protected String usBpId = "HSIA-US";
+ protected String dsBpId = "HSIA-DS";
+ protected DefaultApplicationId appId = new DefaultApplicationId(1, "OltServices");
+
+ Map<String, BandwidthProfileInformation> bpInformation = Maps.newConcurrentMap();
+
+ protected void addBandwidthProfile(String id) {
+ BandwidthProfileInformation bpInfo = new BandwidthProfileInformation();
+ bpInfo.setAssuredInformationRate(0);
+ bpInfo.setCommittedInformationRate(10000);
+ bpInfo.setCommittedBurstSize(1000L);
+ bpInfo.setExceededBurstSize(2000L);
+ bpInfo.setExceededInformationRate(20000);
+ bpInformation.put(id, bpInfo);
+ }
+
+ protected class MockSadisService implements SadisService {
+
+ @Override
+ public BaseInformationService<SubscriberAndDeviceInformation> getSubscriberInfoService() {
+ return new MockSubService();
+ }
+
+ @Override
+ public BaseInformationService<BandwidthProfileInformation> getBandwidthProfileService() {
+ return new MockBpService();
+ }
+ }
+
+ private class MockBpService implements BaseInformationService<BandwidthProfileInformation> {
+ @Override
+ public void invalidateAll() {
+
+ }
+
+ @Override
+ public void invalidateId(String id) {
+
+ }
+
+ @Override
+ public BandwidthProfileInformation get(String id) {
+ return bpInformation.get(id);
+ }
+
+ @Override
+ public BandwidthProfileInformation getfromCache(String id) {
+ return null;
+ }
+ }
+
+ private class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
+ MockSubscriberAndDeviceInformation sub =
+ new MockSubscriberAndDeviceInformation(CLIENT_NAS_PORT_ID,
+ CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
+
+ @Override
+ public SubscriberAndDeviceInformation get(String id) {
+ return sub;
+ }
+
+ @Override
+ public void invalidateAll() {
+ }
+
+ @Override
+ public void invalidateId(String id) {
+ }
+
+ @Override
+ public SubscriberAndDeviceInformation getfromCache(String id) {
+ return null;
+ }
+ }
+
+ private class MockSubscriberAndDeviceInformation extends SubscriberAndDeviceInformation {
+
+ MockSubscriberAndDeviceInformation(String id, String nasPortId,
+ String circuitId, MacAddress hardId,
+ Ip4Address ipAddress) {
+ this.setHardwareIdentifier(hardId);
+ this.setId(id);
+ this.setIPAddress(ipAddress);
+ this.setNasPortId(nasPortId);
+ this.setCircuitId(circuitId);
+ }
+ }
+
+ class MockConsistentMultimap<K, V> implements ConsistentMultimap<K, V> {
+ private HashMultimap<K, Versioned<V>> innermap;
+ private AtomicLong counter = new AtomicLong();
+
+ public MockConsistentMultimap() {
+ this.innermap = HashMultimap.create();
+ }
+
+ private Versioned<V> version(V v) {
+ return new Versioned<>(v, counter.incrementAndGet(), System.currentTimeMillis());
+ }
+
+ private Versioned<Collection<? extends V>> versionCollection(Collection<? extends V> collection) {
+ return new Versioned<>(collection, counter.incrementAndGet(), System.currentTimeMillis());
+ }
+
+ @Override
+ public int size() {
+ return innermap.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return innermap.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(K key) {
+ return innermap.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(V value) {
+ return innermap.containsValue(value);
+ }
+
+ @Override
+ public boolean containsEntry(K key, V value) {
+ return innermap.containsEntry(key, value);
+ }
+
+ @Override
+ public boolean put(K key, V value) {
+ return innermap.put(key, version(value));
+ }
+
+ @Override
+ public Versioned<Collection<? extends V>> putAndGet(K key, V value) {
+ innermap.put(key, version(value));
+ return (Versioned<Collection<? extends V>>) innermap.get(key);
+ }
+
+ @Override
+ public boolean remove(K key, V value) {
+ return innermap.remove(key, value);
+ }
+
+ @Override
+ public Versioned<Collection<? extends V>> removeAndGet(K key, V value) {
+ innermap.remove(key, value);
+ return (Versioned<Collection<? extends V>>) innermap.get(key);
+ }
+
+ @Override
+ public boolean removeAll(K key, Collection<? extends V> values) {
+ return false;
+ }
+
+ @Override
+ public Versioned<Collection<? extends V>> removeAll(K key) {
+ return null;
+ }
+
+ @Override
+ public boolean putAll(K key, Collection<? extends V> values) {
+ return false;
+ }
+
+ @Override
+ public Versioned<Collection<? extends V>> replaceValues(K key, Collection<V> values) {
+ return null;
+ }
+
+ @Override
+ public void clear() {
+ innermap.clear();
+ }
+
+ @Override
+ public Versioned<Collection<? extends V>> get(K key) {
+ Collection<? extends V> values = innermap.get(key).stream()
+ .map(v -> v.value())
+ .collect(Collectors.toList());
+ return versionCollection(values);
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return innermap.keySet();
+ }
+
+ @Override
+ public Multiset<K> keys() {
+ return innermap.keys();
+ }
+
+ @Override
+ public Multiset<V> values() {
+ return null;
+ }
+
+ @Override
+ public Collection<Map.Entry<K, V>> entries() {
+ return null;
+ }
+
+ @Override
+ public Iterator<Map.Entry<K, V>> iterator() {
+ return new ConsistentMultimapIterator(innermap.entries().iterator());
+ }
+
+ @Override
+ public Map<K, Collection<V>> asMap() {
+ return null;
+ }
+
+ @Override
+ public void addListener(MultimapEventListener<K, V> listener, Executor executor) {
+ }
+
+ @Override
+ public void removeListener(MultimapEventListener<K, V> listener) {
+ }
+
+ @Override
+ public String name() {
+ return "mock multimap";
+ }
+
+ @Override
+ public Type primitiveType() {
+ return null;
+ }
+
+ private class ConsistentMultimapIterator implements Iterator<Map.Entry<K, V>> {
+
+ private final Iterator<Map.Entry<K, Versioned<V>>> it;
+
+ public ConsistentMultimapIterator(Iterator<Map.Entry<K, Versioned<V>>> it) {
+ this.it = it;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public Map.Entry<K, V> next() {
+ Map.Entry<K, Versioned<V>> e = it.next();
+ return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().value());
+ }
+ }
+
+ }
+
+ public static TestConsistentMultimap.Builder builder() {
+ return new TestConsistentMultimap.Builder();
+ }
+
+ public static class Builder<K, V> extends ConsistentMultimapBuilder<K, V> {
+
+ @Override
+ public AsyncConsistentMultimap<K, V> buildMultimap() {
+ return null;
+ }
+
+ @Override
+ public ConsistentMultimap<K, V> build() {
+ return new TestConsistentMultimap<K, V>();
+ }
+ }
+}