igmp proxy app
Change-Id: If053dc39b611a6176b1d59004a480d65bfac9091
diff --git a/DesignDoc.md b/DesignDoc.md
new file mode 100644
index 0000000..e75548b
--- /dev/null
+++ b/DesignDoc.md
@@ -0,0 +1,284 @@
+**Terminology**
+
+| **IGMP** | **Internet Group Management Protocol** |
+|------------|--------------------------------------------|
+| **CPE** | **Customer Premise Equipment** |
+| **ONOS** | **Open Network Operation System** |
+| **OF** | **OpenFlow** |
+| **vOLT** | **Virtualized Optical Line Terminal** |
+| **ONT** | **Optical Network Terminal** |
+| **SD-OLT** | **Software Defined Optical Line Terminal** |
+
+1. IGMP Overview and feature list
+
+ 1. Support Igmp join
+
+IGMP Proxy app receives and decodes igmp report messega from hosts. if the host
+ask to join a group, Igmpproxy will add flow rule for the multicast traffic to
+olt .And if it is the **first host join** into the group, Igmpproxy will forward
+the message to olt uplink port.
+
+If it is not the first host that joins into the multicast group, Igmpproxy will
+add bucket into the flow rule, but will not forward to uplink port.
+
+Support Igmp leave
+------------------
+
+IGMP Proxy app receives and decodes igmp report message from host, if the host
+ask to leave a group, igmpproxy will raise a query to the group and wait for
+hosts’ response. If any host not response for the query, Igmpproxy will remove
+the host from this group.
+
+Support Igmp fast leave
+-----------------------
+
+IGMP Proxy app receives and decodes igmp report message from host, if the host
+ask to leave a group, Igmpproxy will remove bucket from the group’s flow rule.
+
+If it is the latest host that existing in the group, Igmpproxy will forward the
+message to olt uplink port.
+
+Support Igmp period query and keep alive
+----------------------------------------
+
+Igmpproxy will periodical sent query message to each existing group, If any host
+not response for the query, Igmpproxy will remove the host from this group.
+
+1. Software Architecture
+
+ 1. IGMP Deployment Architecture
+
+ ![cid:image001.png\@01D2036E.8855DCA0](media/72caf6b6c9c6f9a128c5eca8423b3767.png)
+
+ 2. ONOS APP Architecture
+
+*App layer:*
+
+- *Igmpproxy:* Nokia developed app for igmp (use proxy mode).
+
+- *IgmpSnoop:* ONOS official app for igmp (use snoop mode).
+
+- *Cordmcast:* ONOS official app , as the adapter between onos core service
+ and specific igmp implements
+
+*Onos Core Layer:*
+
+- *Flow objective:* for specific device and behavior, to find a driver for
+ adaption
+
+- *Netconfig Service:* Used for our REST interface
+
+- *Packet Service:* To receive , decode, encode and dispatch openflow messages
+ from/to devices
+
+- *Device Service:* To communicate with device via connection point
+
+*Driver Layer:*
+
+- *Olt pipeline:* driver for our OLT device.
+
+ 1. Igmp Message Flow
+
+![](media/052750ec8402a955a5b6a9b3e513d9cc.png)
+
+Comment:
+
+For step 9, for performance purpose, is not really removing the rules. Just
+remove the output port from the group rule.
+
+IGMP Subsystem Structure
+------------------------
+
+*Apps and Sub-systems:*
+
+*Igmpproxy App*
+
+1. Host state machine(interact with multicast source), refer to RFC 2236
+
+- To join group, send **add sink** to Cordmcast
+
+- To leave group, send **remove sink** to Cordmcast
+
+1. source host management(interact with hosts)
+
+- accept and process Igmp join/leave packet form hosts
+
+- Find first join host and last leave host, inform the state machine
+
+1. Igmp packet encode/decode and dispatch
+
+- To decode v2/v3 igmp packet, and dispatch them into different subsystems
+
+- For v3 join packet, divided it with different group addresses
+
+1. Rest interface (use onos network configuration service) subsystems, for user
+ configuration
+
+ *Cordmcast App:*
+
+ Receive **add sink**/ **remove sink** message, translate into **forward**
+ and **next** primitives, and send those primitives to olt pipeline
+
+ *Olt pipeline:*
+
+ Receive **forward/next** primitives, translate to openflow rules, send
+ **group add/remove**, **flow add/remove** messages to OLT
+
+2. Subsystem Design
+
+ 1. host State Machine
+
+ Index: mvlan + group ip + devid , we use this index to indicate a host.
+ And for each host, we implement a independent state machine, see the
+ chart below, refer to RFC 2236
+
+**Event description:**
+
+1. join/leave comes from packet-in message , received from the uni port
+
+2. query comes from packet-in message m received from the uplink port
+
+**Action description:**
+
+1. send report/leave means send those IGMP packet to the uplink port of device;
+
+2. Timer:
+
+ for join event, the timer is [0, **Unsolicited Report Interval**],
+ **Unsolicited Report Interval** is an user configured value.
+
+ for query event,the timer is [0, **Max Resp Time**], **Max Resp Time** is a
+ field in query packet.
+
+**Implementation:**
+
+1. For general query,retrieve all hosts which belong to the group, and send
+ query message.
+
+ For specific query, only send the message to the specific host
+
+2. If last member leave the group, the state machine will be removed , and a
+ remove sink message will be sent to Cordmcast. **For performance purpose,
+ ONOS will not really remove the rules. Just remove the output port from the
+ group rule.**
+
+3. South host management
+
+ 1. Description
+
+South host management divided into 2 sub-module:Southbound **Hosts Management**
+and **Northbound Reporter**,as the chart below:
+
+### Southbound Hosts Management
+
+Index: mvlan + group ip + devid + port
+
+Action:
+
+1. When receive join/leave message to a new index, forward message to
+ Northbound Reporter;
+
+ Filter messages with same index
+
+2. When receive the leave message and the fast leave switch is turned off, send
+ query to all hosts which belong to the same group. It is because maybe some
+ hosts join into same group.
+
+3. Aging: we will send query message to each hosts in a fixed period, if no
+ response, we will remove the host.
+
+ 1. Northbound Reporter
+
+Index:mvlan + groupip + devid
+
+Action:
+
+1. Receive join leave messages from **Southbound Hosts Management.**
+
+2. If it is the first host that join into a specific group, send **add sink**
+ to Cordmcast;
+
+ If it is not the first host to join , just increase the group’s host
+ counter;
+
+ If it is the last host leave from the group, send **remove sink** to
+ Cordmcast;
+
+ If it is not the last host to leave, just decrease the group’s host counter.
+
+ 1. IGMP packet encode/decode and dispatch
+
+Action:
+
+1. Receive igmp message from device, decode it;
+
+2. If it is IGMP v3 reporter and include multiple groups, divided it into some
+ packet with single group;
+
+3. Dispatch the packet to different subsystems.
+
+The flow chart:
+
+1. External Interface
+
+ 1. NBI REST-API
+
+Parameter **Unsolicited TimeOut**, **Max Respose times**, **Keep Alive
+Interval**, **Last Query Interval**, **Last Query Count**, **Fast Leave flags**
+can be configured through REST-API provided by ONOS
+
+Please refer to REST-API for ONOS
+APP ([3HH-12876-0211-DDZZA](https://ct.web.alcatel-lucent.com/scm-lib4/show-entry.cgi?number=3HH-12876-0211-DDZZA))
+for details
+
+1. SBI Openflow
+
+ 1. Lift Rule:
+
+| **type** | **Call situation** | **Match field** | **Match value** | **Action field** | **Action value** |
+|-------------|------------------------------------------------------|-----------------|----------------------------------------------------------------------------------|------------------|------------------|
+| Add flow | igmp app activate | Ethernet | IPV4 | Output port | controller |
+| | | Protocol | IGMP | | |
+| | | port | {all activated port } | | |
+| Delete flow | igmp app deactivate, or port state change to disable | Ethernet | IPV4 | | |
+| | | Protocol | IGMP | | |
+| | | port | {All activated port if app goes down, or the port which change state to disable} | N/A | |
+
+This flow rule is used for instructing olt to transmit the received igmp packets
+to the controller. Once the igmp app is initialized , the device information is
+registered in ONOS, and the port is enabled, Igmp app will send this flow rule
+to the device for each of its activated port automatically.
+
+The flow rules would be removed while igmpproxy app deactivated.
+
+### Multicast Rule:
+
+| **type** | **Call situation** | **Match field** | **Match value** | **Action field** | **Action value** |
+|-------------------------|--------------------|--------------------------|-----------------|------------------|------------------|
+| Group add | Receive igmp join | With group id as the key | N/A | Output port | {host port} |
+| Group mod (delete port) | Receive igmp leave | With group id as the key | N/A | N/A | N/A |
+| Add flow | Receive igmp join | Ethernet | IPV4 | Group | Group id |
+| | | In Port | {uplink port} | | |
+| | | IPDst | {group address} | | |
+| | | Vlan Id | mcast vlan | | |
+
+This flow rule is used for instructing olt to forward the igmp traffic from
+uplink port to the ONU port. Once the **igmp join** packet was received and
+processed succesfully , Igmp app will send this flow rule (a **group add**
+message and a **flow add** message) to the port which report the igmp join;
+
+If host request to leave a group, a **group mod** message will send to olt, and
+the **out port** of group rule would be removed;
+
+If host re-send join message to ONOS, a **group mod** message will send to olt,
+which to add the **out port** again.
+
+IPTV STB/Source packet interface
+--------------------------------
+
+| Packet Types | Description |
+|-------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Membership Query Message | Membership Queries are sent by IP multicast routers to query the multicast reception state of neighboring interfaces |
+| Version 3 Membership Report Message | Version 3 Membership Reports are sent by IP systems to report (to neighboring routers) the current multicast reception state, or changes in the multicast reception state, of their interfaces. |
+
+**For more Igmp packet detail, please refer to RFC 3376**
diff --git a/app.xml b/app.xml
new file mode 100644
index 0000000..0b0389d
--- /dev/null
+++ b/app.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2017 Nokia
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<app name="org.opencord.igmpproxy" origin="Nokia" version="1.2-SNAPSHOT"
+ featuresRepo="mvn:org.opencord/onos-app-igmpproxy/1.2-SNAPSHOT/xml/features"
+ features="onos-app-igmpproxy" apps="org.opencord.mcast">
+ <description>IGMP PROXY APP</description>
+ <artifact>mvn:org.opencord/onos-app-igmpproxy/1.2-SNAPSHOT</artifact>
+</app>
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..4986eeb
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,192 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2015-present Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-dependencies</artifactId>
+ <version>1.8.0</version>
+ <relativePath></relativePath>
+ </parent>
+
+ <groupId>org.opencord</groupId>
+ <artifactId>onos-app-igmpproxy</artifactId>
+ <version>1.2-SNAPSHOT</version>
+ <packaging>bundle</packaging>
+
+ <description>IGMP PROXY APP</description>
+
+ <properties>
+ <onos.app.name>org.opencord.igmpproxy</onos.app.name>
+ <onos.version>1.8.0</onos.version>
+ <onos.app.category>Traffic Steering</onos.app.category>
+ <onos.app.title>IGMP proxy App</onos.app.title>
+ <onos.app.url>http://opencord.org</onos.app.url>
+ <onos.app.readme>IGMP implementation.</onos.app.readme>
+ <onos.app.requires>org.opencord.config</onos.app.requires>
+ <onos.app.requires>org.opencord.mcast</onos.app.requires>
+ </properties>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <version>${onos.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-cli</artifactId>
+ <version>${onos.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-osgi</artifactId>
+ <version>${onos.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-junit</artifactId>
+ <version>${onos.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-misc</artifactId>
+ <version>${onos.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-incubator-api</artifactId>
+ <version>${onos.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-incubator-net</artifactId>
+ <version>${onos.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opencord</groupId>
+ <artifactId>cord-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>3.0.1</version>
+ <extensions>true</extensions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.5.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ <version>1.21.0</version>
+ <executions>
+ <execution>
+ <id>generate-scr-srcdescriptor</id>
+ <goals>
+ <goal>scr</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <supportedProjectTypes>
+ <supportedProjectType>bundle</supportedProjectType>
+ <supportedProjectType>war</supportedProjectType>
+ </supportedProjectTypes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>cfg</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>cfg</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>swagger</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>swagger</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>app</id>
+ <phase>package</phase>
+ <goals>
+ <goal>app</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <repositories>
+ <repository>
+ <id>central</id>
+ <name>Central Repository</name>
+ <url>http://repo.maven.apache.org/maven2</url>
+ <layout>default</layout>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ <releases>
+ <enabled>true</enabled>
+ <updatePolicy>always</updatePolicy>
+ <checksumPolicy>fail</checksumPolicy>
+ </releases>
+ </repository>
+
+ <repository>
+ <id>snapshots</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ <snapshots>
+ <enabled>true</enabled>
+ <updatePolicy>always</updatePolicy>
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ </repository>
+ </repositories>
+</project>
diff --git a/src/main/java/org/opencord/igmpproxy/GroupMember.java b/src/main/java/org/opencord/igmpproxy/GroupMember.java
new file mode 100644
index 0000000..4fab7d4
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/GroupMember.java
@@ -0,0 +1,262 @@
+package org.opencord.igmpproxy;
+
+import org.onlab.packet.IGMPMembership;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.VlanId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Date struct to keep Igmp member infomations.
+ */
+public final class GroupMember {
+
+ private final VlanId vlan;
+ private final DeviceId deviceId;
+ private final PortNumber portNumber;
+ private final Ip4Address groupIp;
+ private final boolean v2;
+ private byte recordType = IGMPMembership.MODE_IS_INCLUDE;
+ private ArrayList<Ip4Address> sourceList = new ArrayList<>();
+ private int keepAliveQueryInterval = 0;
+ private int keepAliveQueryCount = 0;
+ private int lastQueryInterval = 0;
+ private int lastQueryCount = 0;
+ private boolean leave = false;
+
+ public GroupMember(Ip4Address groupIp, VlanId vlan, DeviceId deviceId, PortNumber portNum, boolean isV2) {
+ this.groupIp = groupIp;
+ this.vlan = vlan;
+ this.deviceId = deviceId;
+ this.portNumber = portNum;
+ v2 = isV2;
+ }
+
+ static String getkey(Ip4Address groupIp, DeviceId deviceId, PortNumber portNum) {
+ return groupIp.toString() + deviceId.toString() + portNum.toString();
+ }
+
+ public String getkey() {
+ return GroupMember.getkey(groupIp, deviceId, portNumber);
+ }
+
+ public String getId() {
+ return getkey();
+ }
+
+ public VlanId getvlan() {
+ return vlan;
+ }
+
+ public DeviceId getDeviceId() {
+ return deviceId;
+ }
+
+ public PortNumber getPortNumber() {
+ return portNumber;
+ }
+
+ public Ip4Address getGroupIp() {
+ return groupIp;
+ }
+
+ public byte getRecordType() {
+ return recordType;
+ }
+
+ public boolean getv2() {
+ return v2;
+ }
+
+ public ArrayList<Ip4Address> getSourceList() {
+ return sourceList;
+ }
+
+
+ public void updateList(byte recordType, ArrayList<Ip4Address> newSourceList) {
+ this.recordType = recordType;
+ this.sourceList.clear();
+ this.sourceList.addAll(newSourceList);
+
+ /*TODO : support SSM
+ if (this.recordType == IGMPMembership.MODE_IS_INCLUDE) {
+ switch (recordType) {
+ case IGMPMembership.MODE_IS_INCLUDE:
+ case IGMPMembership.CHANGE_TO_INCLUDE_MODE:
+ //however , set to include<B> anyway
+ this.sourceList = sourceList;
+ this.recordType = IGMPMembership.MODE_IS_INCLUDE;
+ break;
+ case IGMPMembership.MODE_IS_EXCLUDE:
+ case IGMPMembership.CHANGE_TO_EXCLUDE_MODE:
+ //set to exclude<B>
+ this.sourceList = sourceList;
+ this.recordType = IGMPMembership.MODE_IS_EXCLUDE;
+ break;
+ case IGMPMembership.ALLOW_NEW_SOURCES:
+ //set to include <A+B>
+ join(this.sourceList, sourceList);
+ break;
+ case IGMPMembership.BLOCK_OLD_SOURCES:
+ //set to include <A-B>
+ exclude(this.sourceList, sourceList);
+ break;
+ default:
+ break;
+ }
+ } else if (this.recordType == IGMPMembership.MODE_IS_EXCLUDE) {
+ switch (recordType) {
+ case IGMPMembership.MODE_IS_INCLUDE:
+ case IGMPMembership.CHANGE_TO_INCLUDE_MODE:
+ //set to include<B>
+ this.recordType = IGMPMembership.MODE_IS_INCLUDE;
+ this.sourceList = sourceList;
+ break;
+ case IGMPMembership.MODE_IS_EXCLUDE:
+ case IGMPMembership.CHANGE_TO_EXCLUDE_MODE:
+ this.sourceList = sourceList;
+ this.recordType = IGMPMembership.MODE_IS_EXCLUDE;
+ break;
+ case IGMPMembership.ALLOW_NEW_SOURCES:
+ //set to exclude <A-B>
+ exclude(this.sourceList, sourceList);
+ break;
+ case IGMPMembership.BLOCK_OLD_SOURCES:
+ //set to exclude <A+B>
+ join(this.sourceList, sourceList);
+ break;
+ default:
+ break;
+ }
+ }*/
+
+ return;
+ }
+
+
+ /*join B to A (A+B)*/
+ private void join(ArrayList<Integer> listA, ArrayList<Integer> listB) {
+ Iterator<Integer> iterA = null;
+ Iterator<Integer> iterB = listB.iterator();
+ boolean exists;
+ while (iterB.hasNext()) {
+ iterA = listA.iterator();
+ exists = false;
+ int ipToAdd = iterB.next();
+ while (iterA.hasNext()) {
+ if (iterA.next().equals(ipToAdd)) {
+ exists = true;
+ break;
+ }
+ }
+ if (!exists) {
+ listA.add(ipToAdd);
+ }
+ }
+ }
+
+ /* include A and B (A*B)*/
+ private void intersection(ArrayList<Integer> listA, ArrayList<Integer> listB) {
+ Iterator<Integer> iterA = listA.iterator();
+ Iterator<Integer> iterB;
+ boolean exists;
+
+ while (iterA.hasNext()) {
+ iterB = listB.iterator();
+ int ipToInclude = iterA.next();
+ exists = false;
+ while (iterB.hasNext()) {
+ if (iterB.next().equals(ipToInclude)) {
+ exists = true;
+ break;
+ }
+ }
+ if (!exists) {
+ iterA.remove();
+ }
+ }
+ }
+
+ /*exclude B from A (A-B)*/
+ private void exclude(ArrayList<Integer> listA, ArrayList<Integer> listB) {
+ Iterator<Integer> iterA = null;
+ Iterator<Integer> iterB = listB.iterator();
+
+ while (iterB.hasNext()) {
+ iterA = listA.iterator();
+ int ipToDel = iterB.next();
+ while (iterA.hasNext()) {
+ if (iterA.next().equals(ipToDel)) {
+ iterA.remove();
+ break;
+ }
+ }
+ }
+ }
+
+ public void setLeave(boolean l) {
+ leave = l;
+ }
+
+ public boolean isLeave() {
+ return leave;
+ }
+
+ public int getKeepAliveQueryInterval() {
+ return keepAliveQueryInterval;
+ }
+
+ public int getKeepAliveQueryCount() {
+ return keepAliveQueryCount;
+ }
+
+ public int getLastQueryInterval() {
+ return lastQueryInterval;
+ }
+
+ public int getLastQueryCount() {
+ return lastQueryCount;
+ }
+
+ public void keepAliveQueryCount(boolean add) {
+ if (add) {
+ keepAliveQueryCount++;
+ } else {
+ keepAliveQueryCount = 0;
+ }
+ }
+
+ public void lastQueryCount(boolean add) {
+ if (add) {
+ lastQueryCount++;
+ } else {
+ lastQueryCount = 0;
+ }
+ }
+
+ public void keepAliveInterval(boolean add) {
+ if (add) {
+ keepAliveQueryInterval++;
+ } else {
+ keepAliveQueryInterval = 0;
+ }
+ }
+
+ public void lastQueryInterval(boolean add) {
+ if (add) {
+ lastQueryInterval++;
+ } else {
+ lastQueryInterval = 0;
+ }
+ }
+
+ public void resetAllTimers() {
+ keepAliveQueryInterval = 0;
+ keepAliveQueryCount = 0;
+ lastQueryInterval = 0;
+ lastQueryCount = 0;
+ }
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
new file mode 100644
index 0000000..4194b92
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -0,0 +1,781 @@
+package org.opencord.igmpproxy;
+
+import com.google.common.collect.Maps;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.packet.EthType;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IGMP;
+import org.onlab.packet.IGMPGroup;
+import org.onlab.packet.IGMPMembership;
+import org.onlab.packet.IGMPQuery;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.incubator.net.config.basics.McastConfig;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.mcast.McastRoute;
+import org.onosproject.net.mcast.MulticastRouteService;
+import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.opencord.cordconfig.access.AccessDeviceConfig;
+import org.opencord.cordconfig.access.AccessDeviceData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Igmp process application, use proxy mode, support first join/ last leave , fast leave
+ * period query and keep alive, packet out igmp message to uplink port features.
+ */
+@Component(immediate = true)
+public class IgmpManager {
+
+ private static final Class<AccessDeviceConfig> CONFIG_CLASS =
+ AccessDeviceConfig.class;
+ private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
+ IgmpproxyConfig.class;
+ private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
+ IgmpproxySsmTranslateConfig.class;
+ private static final Class<McastConfig> MCAST_CONFIG_CLASS =
+ McastConfig.class;
+ public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
+ private static ApplicationId appId;
+ private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
+ private static int unSolicitedTimeout = 3; // unit is 1 sec
+ private static int keepAliveCount = 3;
+ private static int lastQueryInterval = 2; //unit is 1 sec
+ private static int lastQueryCount = 2;
+ private static boolean fastLeave = true;
+ private static boolean withRAUplink = true;
+ private static boolean withRADownlink = false;
+ private static boolean periodicQuery = true;
+ private static short mvlan = 4000;
+ private static byte igmpCos = 7;
+ public static boolean connectPointMode = true;
+ public static ConnectPoint connectPoint = null;
+
+ private final ScheduledExecutorService scheduledExecutorService =
+ Executors.newScheduledThreadPool(1);
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PacketService packetService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowRuleService flowRuleService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowObjectiveService flowObjectiveService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected NetworkConfigRegistry networkConfig;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MulticastRouteService multicastService;
+ private IgmpPacketProcessor processor = new IgmpPacketProcessor();
+ private Logger log = LoggerFactory.getLogger(getClass());
+ private ApplicationId coreAppId;
+ private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
+ private InternalNetworkConfigListener configListener =
+ new InternalNetworkConfigListener();
+ private DeviceListener deviceListener = new InternalDeviceListener();
+ private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory = null;
+ private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
+ new ConfigFactory<ApplicationId, IgmpproxyConfig>(
+ SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
+ @Override
+ public IgmpproxyConfig createConfig() {
+ return new IgmpproxyConfig();
+ }
+ };
+ private ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
+ new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
+ SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
+ @Override
+ public IgmpproxySsmTranslateConfig createConfig() {
+ return new IgmpproxySsmTranslateConfig();
+ }
+ };
+ private int maxResp = 10; //unit is 1 sec
+ private int keepAliveInterval = 120; //unit is 1 sec
+
+ public static int getUnsolicitedTimeout() {
+ return unSolicitedTimeout;
+ }
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication("org.opencord.igmpproxy");
+ coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
+ packetService.addProcessor(processor, PacketProcessor.director(4));
+ IgmpSender.init(packetService, mastershipService);
+
+ if (networkConfig.getConfigFactory(CONFIG_CLASS) == null) {
+ configFactory =
+ new ConfigFactory<DeviceId, AccessDeviceConfig>(
+ SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
+ @Override
+ public AccessDeviceConfig createConfig() {
+ return new AccessDeviceConfig();
+ }
+ };
+ networkConfig.registerConfigFactory(configFactory);
+ }
+ networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
+ networkConfig.registerConfigFactory(igmpproxyConfigFactory);
+ networkConfig.addListener(configListener);
+
+ configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
+ configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
+
+ networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
+ subject -> {
+ AccessDeviceConfig config = networkConfig.getConfig(subject,
+ AccessDeviceConfig.class);
+ if (config != null) {
+ AccessDeviceData data = config.getAccessDevice();
+ oltData.put(data.deviceId(), data);
+ }
+ }
+ );
+
+ oltData.keySet().forEach(d->provisionDefaultFlows(d));
+ if (connectPointMode) {
+ provisionConnectPointFlows();
+ } else {
+ provisionUplinkFlows();
+ }
+
+ McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
+ if (config != null) {
+ mvlan = config.egressVlan().toShort();
+ }
+ deviceService.addListener(deviceListener);
+ scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 0, 1000, TimeUnit.MILLISECONDS);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ scheduledExecutorService.shutdown();
+
+ // de-register and null our handler
+ networkConfig.removeListener(configListener);
+ if (configFactory != null) {
+ networkConfig.unregisterConfigFactory(configFactory);
+ }
+ networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
+ networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
+ deviceService.removeListener(deviceListener);
+ packetService.removeProcessor(processor);
+ flowRuleService.removeFlowRulesById(appId);
+
+ log.info("Stopped");
+ }
+
+ protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
+ try {
+ String[] mgmtAddress = deviceService.getDevice(ofDeviceId)
+ .annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":");
+ return Ip4Address.valueOf(mgmtAddress[0]);
+ } catch (Exception ex) {
+ log.info("No valid Ipaddress for " + ofDeviceId.toString());
+ return null;
+ }
+ }
+
+ private void processIgmpQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
+
+ DeviceId deviceId = cp.deviceId();
+ Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
+
+ if (maxResp >= 128) {
+ int mant = maxResp & 0xf;
+ int exp = (maxResp >> 4) & 0x7;
+ maxResp = (mant | 0x10) << (exp + 3);
+ }
+
+ maxResp = (maxResp + 5) / 10;
+
+ if (gAddr != null && !gAddr.isZero()) {
+ StateMachine.specialQuery(deviceId, gAddr, maxResp);
+ } else {
+ StateMachine.generalQuery(deviceId, maxResp);
+ }
+
+ }
+
+ private Ip4Address ssmTranslateRoute(IpAddress group) {
+ return ssmTranslateTable.get(group);
+ }
+
+ private void processIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
+ DeviceId deviceId = cp.deviceId();
+ PortNumber portNumber = cp.port();
+
+ Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
+ if (!groupIp.isMulticast()) {
+ log.info(groupIp.toString() + " is not a valid group address");
+ return;
+ }
+ Ip4Address srcIp = getDeviceIp(deviceId);
+
+ byte recordType = igmpGroup.getRecordType();
+ boolean join = false;
+
+ ArrayList<Ip4Address> sourceList = new ArrayList<>();
+
+ if (igmpGroup.getSources().size() > 0) {
+ igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
+ if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
+ recordType == IGMPMembership.MODE_IS_EXCLUDE ||
+ recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
+ join = false;
+ } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
+ recordType == IGMPMembership.MODE_IS_INCLUDE ||
+ recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
+ join = true;
+ }
+ } else {
+ IpAddress src = ssmTranslateRoute(groupIp);
+ if (src == null) {
+ log.info("no ssm translate for group " + groupIp.toString());
+ return;
+ }
+ sourceList.add(src.getIp4Address());
+ if (recordType == IGMPMembership.CHANGE_TO_EXCLUDE_MODE ||
+ recordType == IGMPMembership.MODE_IS_EXCLUDE ||
+ recordType == IGMPMembership.BLOCK_OLD_SOURCES) {
+ join = true;
+ } else if (recordType == IGMPMembership.CHANGE_TO_INCLUDE_MODE ||
+ recordType == IGMPMembership.MODE_IS_INCLUDE ||
+ recordType == IGMPMembership.ALLOW_NEW_SOURCES) {
+ join = false;
+ }
+ }
+ String groupMemberKey = GroupMember.getkey(groupIp, deviceId, portNumber);
+ GroupMember groupMember = groupMemberMap.get(groupMemberKey);
+
+ if (join) {
+ if (groupMember == null) {
+ if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
+ groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
+ } else {
+ groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
+ }
+ StateMachine.join(deviceId, groupIp, srcIp);
+ groupMemberMap.put(groupMemberKey, groupMember);
+ groupMember.updateList(recordType, sourceList);
+ groupMember.getSourceList().forEach(source -> multicastService.addSink(
+ new McastRoute(source, groupIp, McastRoute.Type.IGMP), cp));
+ }
+ groupMember.resetAllTimers();
+ groupMember.updateList(recordType, sourceList);
+ groupMember.setLeave(false);
+ } else {
+ if (groupMember == null) {
+ log.info("receive leave but no instance, group " + groupIp.toString() +
+ " device:" + deviceId.toString() + " port:" + portNumber.toString());
+ return;
+ } else {
+ groupMember.setLeave(true);
+ if (fastLeave) {
+ leaveAction(groupMember);
+ } else {
+ sendQuery(groupMember);
+ }
+ }
+ }
+ }
+
+ private void leaveAction(GroupMember groupMember) {
+ ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
+ StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
+ groupMember.getSourceList().forEach(source -> multicastService.removeSink(
+ new McastRoute(source, groupMember.getGroupIp(),
+ McastRoute.Type.IGMP), cp));
+ groupMemberMap.remove(groupMember.getId());
+ }
+
+ private void sendQuery(GroupMember groupMember) {
+ Ethernet ethpkt;
+ Ip4Address srcIp = getDeviceIp(groupMember.getDeviceId());
+ if (groupMember.getv2()) {
+ ethpkt = IgmpSender.getInstance().buildIgmpV2Query(groupMember.getGroupIp(), srcIp);
+ } else {
+ ethpkt = IgmpSender.getInstance().buildIgmpV3Query(groupMember.getGroupIp(), srcIp);
+ }
+ IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
+ }
+
+ private void processFilterObjective(DeviceId devId, Port port, boolean remove) {
+
+ //TODO migrate to packet requests when packet service uses filtering objectives
+ DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+
+ builder = remove ? builder.deny() : builder.permit();
+
+ FilteringObjective igmp = builder
+ .withKey(Criteria.matchInPort(port.number()))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+ .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
+ .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
+ .fromApp(appId)
+ .withPriority(10000)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("IgmpProxy filter for {} on {} installed.",
+ devId, port);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.info("IgmpProxy filter for {} on {} failed because {}.",
+ devId, port, error);
+ }
+ });
+
+ flowObjectiveService.filter(devId, igmp);
+ }
+
+ /**
+ * Packet processor responsible for forwarding packets along their paths.
+ */
+ private class IgmpPacketProcessor implements PacketProcessor {
+ @Override
+ public void process(PacketContext context) {
+
+ try {
+ InboundPacket pkt = context.inPacket();
+ Ethernet ethPkt = pkt.parsed();
+ if (ethPkt == null) {
+ return;
+ }
+
+ if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
+ return;
+ }
+
+ IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
+
+ if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
+ return;
+ }
+
+ short vlan = ethPkt.getVlanID();
+ DeviceId deviceId = pkt.receivedFrom().deviceId();
+
+ if (oltData.get(deviceId) == null) {
+ log.error("Device not registered in netcfg :" + deviceId.toString());
+ return;
+ }
+
+ IGMP igmp = (IGMP) ipv4Pkt.getPayload();
+ switch (igmp.getIgmpType()) {
+ case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+ //Discard Query from OLT’s non-uplink port’s
+ if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
+ log.info("IGMP Picked up query from non-uplink port");
+ return;
+ }
+
+ processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
+ 0xff & igmp.getMaxRespField());
+ break;
+ case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
+ log.debug("IGMP version 1 message types are not currently supported.");
+ break;
+ case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+ case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+ case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
+ //Discard join/leave from OLT’s uplink port’s
+ if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
+ log.info("IGMP Picked up join/leave from the olt uplink port");
+ return;
+ }
+
+ Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
+ while (itr.hasNext()) {
+ IGMPGroup group = itr.next();
+ if (group instanceof IGMPMembership) {
+ processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
+ pkt.receivedFrom(), igmp.getIgmpType());
+ } else if (group instanceof IGMPQuery) {
+ IGMPMembership mgroup;
+ mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
+ mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
+ IGMPMembership.MODE_IS_EXCLUDE : IGMPMembership.MODE_IS_INCLUDE);
+ processIgmpReport(mgroup, VlanId.vlanId(vlan),
+ pkt.receivedFrom(), igmp.getIgmpType());
+ }
+ }
+ break;
+
+ default:
+ log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
+ break;
+ }
+
+ } catch (Exception ex) {
+ log.error("igmp process error : " + ex.toString());
+ ex.printStackTrace();
+ }
+ }
+ }
+
+ private class IgmpProxyTimerTask extends TimerTask {
+ public void run() {
+ try {
+ IgmpTimer.timeOut1s();
+ queryMembers();
+ } catch (Exception ex) {
+ log.warn("Igmp timer task error : {}", ex.getMessage());
+ }
+ }
+
+ private void queryMembers() {
+ GroupMember groupMember;
+ Set groupMemberSet = groupMemberMap.entrySet();
+ Iterator itr = groupMemberSet.iterator();
+ while (itr.hasNext()) {
+ Map.Entry entry = (Map.Entry) itr.next();
+ groupMember = (GroupMember) entry.getValue();
+ DeviceId did = groupMember.getDeviceId();
+ if (mastershipService.isLocalMaster(did)) {
+ if (groupMember.isLeave()) {
+ lastQuery(groupMember);
+ } else if (periodicQuery) {
+ periodicQuery(groupMember);
+ }
+ }
+ }
+ }
+
+ private void lastQuery(GroupMember groupMember) {
+ if (groupMember.getLastQueryInterval() < lastQueryInterval) {
+ groupMember.lastQueryInterval(true); // count times
+ } else if (groupMember.getLastQueryCount() < lastQueryCount - 1) {
+ sendQuery(groupMember);
+ groupMember.lastQueryInterval(false); // reset count number
+ groupMember.lastQueryCount(true); //count times
+ } else if (groupMember.getLastQueryCount() == lastQueryCount - 1) {
+ leaveAction(groupMember);
+ }
+ }
+
+ private void periodicQuery(GroupMember groupMember) {
+ if (groupMember.getKeepAliveQueryInterval() < keepAliveInterval) {
+ groupMember.keepAliveInterval(true);
+ } else if (groupMember.getKeepAliveQueryCount() < keepAliveCount) {
+ sendQuery(groupMember);
+ groupMember.keepAliveInterval(false);
+ groupMember.keepAliveQueryCount(true);
+ } else if (groupMember.getKeepAliveQueryCount() == keepAliveCount) {
+ leaveAction(groupMember);
+ }
+ }
+
+ }
+
+ public static PortNumber getDeviceUplink(DeviceId devId) {
+ return oltData.get(devId).uplink();
+ }
+
+ private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
+ //TODO migrate to packet requests when packet service uses filtering objectives
+ DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+
+ builder = remove ? builder.deny() : builder.permit();
+
+ FilteringObjective igmp = builder
+ .withKey(Criteria.matchInPort(port))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+ .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
+ .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
+ .fromApp(appId)
+ .withPriority(10000)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("IgmpProxy filter for {} on {} installed.",
+ devId, port);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.info("IgmpProxy filter for {} on {} failed because {}.",
+ devId, port, error);
+ }
+ });
+
+ flowObjectiveService.filter(devId, igmp);
+ }
+
+ private boolean isConnectPoint(DeviceId device, PortNumber port) {
+ return (connectPointMode && connectPoint.deviceId().equals(device)
+ && connectPoint.port().equals(port));
+ }
+ private boolean isUplink(DeviceId device, PortNumber port) {
+ return ((!connectPointMode) && oltData.containsKey(device)
+ && oltData.get(device).uplink().equals(port));
+ }
+
+ private class InternalDeviceListener implements DeviceListener {
+ @Override
+ public void event(DeviceEvent event) {
+ DeviceId devId = event.subject().id();
+ PortNumber port;
+ if (oltData.get(devId) == null) {
+ return;
+ }
+ switch (event.type()) {
+
+ case DEVICE_ADDED:
+ case DEVICE_UPDATED:
+ case DEVICE_REMOVED:
+ case DEVICE_SUSPENDED:
+ case DEVICE_AVAILABILITY_CHANGED:
+ case PORT_STATS_UPDATED:
+ break;
+ case PORT_ADDED:
+ port = event.port().number();
+ if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ processFilterObjective(devId, port, false);
+ } else if (isUplink(devId, port)) {
+ provisionUplinkFlows();
+ } else if (isConnectPoint(devId, port)) {
+ provisionConnectPointFlows();
+ }
+ break;
+ case PORT_UPDATED:
+ port = event.port().number();
+ if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ if (event.port().isEnabled()) {
+ processFilterObjective(devId, port, false);
+ } else {
+ processFilterObjective(devId, port, true);
+ }
+ } else if (isUplink(devId, port)) {
+ if (event.port().isEnabled()) {
+ provisionUplinkFlows(devId);
+ } else {
+ processFilterObjective(devId, port, true);
+ }
+ } else if (isConnectPoint(devId, port)) {
+ if (event.port().isEnabled()) {
+ provisionConnectPointFlows();
+ } else {
+ unprovisionConnectPointFlows();
+ }
+ }
+ break;
+ case PORT_REMOVED:
+ port = event.port().number();
+ processFilterObjective(devId, port, true);
+ break;
+ default:
+ log.info("Unknown device event {}", event.type());
+ break;
+ }
+ }
+
+ @Override
+ public boolean isRelevant(DeviceEvent event) {
+ return true;
+ }
+ }
+
+ private class InternalNetworkConfigListener implements NetworkConfigListener {
+
+ private void reconfigureNetwork(IgmpproxyConfig cfg) {
+ IgmpproxyConfig newCfg;
+ if (cfg == null) {
+ newCfg = new IgmpproxyConfig();
+ } else {
+ newCfg = cfg;
+ }
+
+ unSolicitedTimeout = newCfg.unsolicitedTimeOut();
+ maxResp = newCfg.maxResp();
+ keepAliveInterval = newCfg.keepAliveInterval();
+ keepAliveCount = newCfg.keepAliveCount();
+ lastQueryInterval = newCfg.lastQueryInterval();
+ lastQueryCount = newCfg.lastQueryCount();
+ withRAUplink = newCfg.withRAUplink();
+ withRADownlink = newCfg.withRADownlink();
+ igmpCos = newCfg.igmpCos();
+ periodicQuery = newCfg.periodicQuery();
+ fastLeave = newCfg.fastLeave();
+
+ connectPoint = newCfg.connectPoint();
+ if (connectPointMode != newCfg.connectPointMode()) {
+ connectPointMode = newCfg.connectPointMode();
+ if (connectPointMode) {
+ unprovisionUplinkFlows();
+ provisionConnectPointFlows();
+ } else {
+ unprovisionConnectPointFlows();
+ provisionUplinkFlows();
+ }
+ }
+ if (connectPoint != null) {
+ log.info("connect point :" + connectPoint.toString());
+ }
+ log.info(" mode: " + connectPointMode);
+
+ IgmpSender.getInstance().setIgmpCos(igmpCos);
+ IgmpSender.getInstance().setMaxResp(maxResp);
+ IgmpSender.getInstance().setMvlan(mvlan);
+ IgmpSender.getInstance().setWithRADownlink(withRADownlink);
+ IgmpSender.getInstance().setWithRAUplink(withRAUplink);
+
+ }
+
+ public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
+ if (cfg == null) {
+ return;
+ }
+ Collection<McastRoute> translations = cfg.getSsmTranslations();
+ for (McastRoute route : translations) {
+ ssmTranslateTable.put(route.group().getIp4Address(), route.source().getIp4Address());
+ }
+ }
+
+ @Override
+ public void event(NetworkConfigEvent event) {
+ switch (event.type()) {
+ case CONFIG_ADDED:
+ case CONFIG_UPDATED:
+ if (event.configClass().equals(CONFIG_CLASS)) {
+ AccessDeviceConfig config =
+ networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
+ if (config != null) {
+ oltData.put(config.getAccessDevice().deviceId(), config.getAccessDevice());
+ provisionDefaultFlows((DeviceId) event.subject());
+ provisionUplinkFlows((DeviceId) event.subject());
+ }
+ }
+
+ if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
+ IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
+ if (config != null) {
+ reconfigureNetwork(config);
+ }
+ }
+
+ if (event.configClass().equals(IGMPPROXY_SSM_CONFIG_CLASS)) {
+ IgmpproxySsmTranslateConfig config = networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS);
+ if (config != null) {
+ reconfigureSsmTable(config);
+ }
+ }
+
+ if (event.configClass().equals(MCAST_CONFIG_CLASS)) {
+ McastConfig config = networkConfig.getConfig(coreAppId, MCAST_CONFIG_CLASS);
+ if (config != null && mvlan != config.egressVlan().toShort()) {
+ mvlan = config.egressVlan().toShort();
+ groupMemberMap.values().forEach(m -> leaveAction(m));
+ }
+ }
+
+ log.info("Reconfigured");
+ break;
+ case CONFIG_REGISTERED:
+ case CONFIG_UNREGISTERED:
+ break;
+ case CONFIG_REMOVED:
+ if (event.configClass().equals(CONFIG_CLASS)) {
+ oltData.remove(event.subject());
+ }
+
+ default:
+ break;
+ }
+ }
+ }
+
+ private void provisionDefaultFlows(DeviceId deviceId) {
+ List<Port> ports = deviceService.getPorts(deviceId);
+ ports.stream()
+ .filter(p -> (!oltData.get(p.element().id()).uplink().equals(p.number()) && p.isEnabled()))
+ .forEach(p -> processFilterObjective((DeviceId) p.element().id(), p.number(), false));
+ }
+
+ private void provisionUplinkFlows(DeviceId deviceId) {
+ if (connectPointMode) {
+ return;
+ }
+
+ processFilterObjective(deviceId, oltData.get(deviceId).uplink(), false);
+ }
+
+ private void provisionUplinkFlows() {
+ if (connectPointMode) {
+ return;
+ }
+
+ oltData.keySet().forEach(deviceId ->provisionUplinkFlows(deviceId));
+ }
+ private void unprovisionUplinkFlows() {
+ oltData.keySet().forEach(deviceId ->
+ processFilterObjective(deviceId, oltData.get(deviceId).uplink(), true));
+ }
+
+ private void provisionConnectPointFlows() {
+ if ((!connectPointMode) || connectPoint == null) {
+ return;
+ }
+
+ processFilterObjective(connectPoint.deviceId(), connectPoint.port(), false);
+ }
+ private void unprovisionConnectPointFlows() {
+ if (connectPoint == null) {
+ return;
+ }
+ processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
+ }
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpSender.java b/src/main/java/org/opencord/igmpproxy/IgmpSender.java
new file mode 100644
index 0000000..21db478
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpSender.java
@@ -0,0 +1,206 @@
+package org.opencord.igmpproxy;
+
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IGMP;
+import org.onlab.packet.IGMPMembership;
+import org.onlab.packet.IGMPQuery;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.MacAddress;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketService;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Message encode and send interface for igmpproxy.
+ */
+public final class IgmpSender {
+ static final String V3_REPORT_ADDRESS = "224.0.0.22";
+ static final String MAC_ADDRESS = "DE:AD:BE:EF:BA:11";
+ static final short DEFAULT_MVLAN = 4000;
+ static final byte DEFAULT_COS = 7;
+ static final int DEFAULT_MEX_RESP = 10;
+ static final byte[] RA_BYTES = {(byte) 0x94, (byte) 0x04, (byte) 0x00, (byte) 0x00};
+
+ private static IgmpSender instance = null;
+ private PacketService packetService;
+ private MastershipService mastershipService;
+ private boolean withRAUplink = true;
+ private boolean withRADownlink = false;
+ private short mvlan = DEFAULT_MVLAN;
+ private byte igmpCos = DEFAULT_COS;
+ private int maxResp = DEFAULT_MEX_RESP;
+
+ private IgmpSender(PacketService packetService, MastershipService mastershipService) {
+ this.packetService = packetService;
+ this.mastershipService = mastershipService;
+ }
+
+ public static void init(PacketService packetService, MastershipService mastershipService) {
+ instance = new IgmpSender(packetService, mastershipService);
+ }
+
+ public static IgmpSender getInstance() {
+ return instance;
+ }
+
+ public void setWithRAUplink(boolean withRaUplink) {
+ this.withRAUplink = withRaUplink;
+ }
+
+ public void setWithRADownlink(boolean withRADownlink) {
+ this.withRADownlink = withRADownlink;
+ }
+
+ public void setMvlan(short mvlan) {
+ this.mvlan = mvlan;
+ }
+
+ public void setIgmpCos(byte igmpCos) {
+ this.igmpCos = igmpCos;
+ }
+
+ public void setMaxResp(int maxResp) {
+ this.maxResp = maxResp;
+ }
+
+ public Ethernet buildIgmpV3Join(Ip4Address groupIp, Ip4Address sourceIp) {
+ IGMPMembership igmpMembership = new IGMPMembership(groupIp);
+ igmpMembership.setRecordType(IGMPMembership.CHANGE_TO_EXCLUDE_MODE);
+
+ return buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT, groupIp, igmpMembership, sourceIp, false);
+ }
+
+ public Ethernet buildIgmpV3ResponseQuery(Ip4Address groupIp, Ip4Address sourceIp) {
+ IGMPMembership igmpMembership = new IGMPMembership(groupIp);
+ igmpMembership.setRecordType(IGMPMembership.MODE_IS_EXCLUDE);
+
+ return buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT, groupIp, igmpMembership, sourceIp, false);
+ }
+
+ public Ethernet buildIgmpV3Leave(Ip4Address groupIp, Ip4Address sourceIp) {
+ IGMPMembership igmpMembership = new IGMPMembership(groupIp);
+ igmpMembership.setRecordType(IGMPMembership.CHANGE_TO_INCLUDE_MODE);
+
+ return buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT, groupIp, igmpMembership, sourceIp, false);
+ }
+
+ public Ethernet buildIgmpV2Query(Ip4Address groupIp, Ip4Address sourceIp) {
+ return buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY, groupIp, null, sourceIp, true);
+ }
+
+ public Ethernet buildIgmpV3Query(Ip4Address groupIp, Ip4Address sourceIp) {
+ return buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY, groupIp, null, sourceIp, false);
+ }
+
+ private Ethernet buildIgmpPacket(byte type, Ip4Address groupIp, IGMPMembership igmpMembership,
+ Ip4Address sourceIp, boolean isV2Query) {
+
+ IGMP igmpPacket;
+ if (isV2Query) {
+ igmpPacket = new IGMP.IGMPv2();
+ } else {
+ igmpPacket = new IGMP.IGMPv3();
+ }
+
+ IPv4 ip4Packet = new IPv4();
+ Ethernet ethPkt = new Ethernet();
+
+ igmpPacket.setIgmpType(type);
+
+ switch (type) {
+ case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+ igmpPacket.setMaxRespCode((byte) (maxResp * 10));
+ IGMPQuery igmpQuery = new IGMPQuery(groupIp, 0);
+
+ igmpPacket.addGroup(igmpQuery);
+ ip4Packet.setDestinationAddress(groupIp.toInt());
+ if (withRADownlink) {
+ ip4Packet.setOptions(RA_BYTES);
+ }
+ break;
+
+ case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+ if (igmpMembership == null) {
+ return null;
+ }
+ igmpPacket.addGroup(igmpMembership);
+ if (type == IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT) {
+ ip4Packet.setDestinationAddress(Ip4Address.valueOf(V3_REPORT_ADDRESS).toInt());
+ } else {
+ ip4Packet.setDestinationAddress(groupIp.toInt());
+ }
+ if (withRAUplink) {
+ ip4Packet.setOptions(RA_BYTES);
+ }
+ break;
+
+ case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+ case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
+ return null;
+ default:
+ return null;
+ }
+
+ igmpPacket.setParent(ip4Packet);
+ ip4Packet.setSourceAddress(sourceIp.toInt());
+ ip4Packet.setProtocol(IPv4.PROTOCOL_IGMP);
+ ip4Packet.setPayload(igmpPacket);
+ ip4Packet.setParent(ethPkt);
+ ip4Packet.setTtl((byte) 0x78);
+
+ ethPkt.setDestinationMACAddress(multiaddToMac(ip4Packet.getDestinationAddress()));
+ ethPkt.setSourceMACAddress(MAC_ADDRESS);
+ ethPkt.setEtherType(Ethernet.TYPE_IPV4);
+ ethPkt.setPayload(ip4Packet);
+ ethPkt.setVlanID(mvlan);
+ ethPkt.setPriorityCode(igmpCos);
+
+ return ethPkt;
+ }
+
+ private MacAddress multiaddToMac(int multiaddress) {
+ byte[] b = new byte[3];
+ b[0] = (byte) (multiaddress & 0xff);
+ b[1] = (byte) (multiaddress >> 8 & 0xff);
+ b[2] = (byte) (multiaddress >> 16 & 0x7f);
+ byte[] macByte = {0x01, 0x00, 0x5e, b[2], b[1], b[0]};
+
+ MacAddress mac = MacAddress.valueOf(macByte);
+ return mac;
+ }
+
+ public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId) {
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ return;
+ }
+
+
+ if (IgmpManager.connectPointMode) {
+ sendIgmpPacket(ethPkt, IgmpManager.connectPoint.deviceId(), IgmpManager.connectPoint.port());
+ } else {
+ PortNumber upLink = IgmpManager.getDeviceUplink(deviceId);
+ sendIgmpPacket(ethPkt, deviceId, upLink);
+ }
+ }
+
+ public void sendIgmpPacket(Ethernet ethPkt, DeviceId deviceId, PortNumber portNumber) {
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ return;
+ }
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .setOutput(portNumber).build();
+ OutboundPacket packet = new DefaultOutboundPacket(deviceId,
+ treatment, ByteBuffer.wrap(ethPkt.serialize()));
+ packetService.emit(packet);
+
+ }
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpTimer.java b/src/main/java/org/opencord/igmpproxy/IgmpTimer.java
new file mode 100644
index 0000000..0124b42
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpTimer.java
@@ -0,0 +1,69 @@
+package org.opencord.igmpproxy;
+
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Implement the timer for igmp state machine.
+ */
+public final class IgmpTimer {
+
+ public static final int INVALID_TIMER_ID = 0;
+ public static int timerId = INVALID_TIMER_ID + 1;
+ private static Map<Integer, SingleTimer> igmpTimerMap = Maps.newConcurrentMap();
+
+ private IgmpTimer(){
+
+ }
+ private static int getId() {
+ return timerId++;
+ }
+
+ public static int start(SingleStateMachine machine, int timeOut) {
+ int id = getId();
+ igmpTimerMap.put(id, new SingleTimer(machine, timeOut));
+ return id;
+ }
+
+ public static int reset(int oldId, SingleStateMachine machine, int timeOut) {
+ igmpTimerMap.remove(new Integer(oldId));
+ int id = getId();
+ igmpTimerMap.put(new Integer(id), new SingleTimer(machine, timeOut));
+ return id;
+ }
+
+ public static void cancel(int id) {
+ igmpTimerMap.remove(new Integer(id));
+ }
+
+
+ static void timeOut1s() {
+ Set mapSet = igmpTimerMap.entrySet();
+ Iterator itr = mapSet.iterator();
+ while (itr.hasNext()) {
+ Map.Entry entry = (Map.Entry) itr.next();
+ SingleTimer single = (SingleTimer) entry.getValue();
+ if (single.timeOut > 0) {
+ single.timeOut--;
+ } else {
+ single.machine.timeOut();
+ itr.remove();
+ }
+ }
+ }
+
+ static class SingleTimer {
+
+ public int timeOut; // unit is 1 second
+ public SingleStateMachine machine;
+
+ public SingleTimer(SingleStateMachine machine, int timeOut) {
+ this.machine = machine;
+ this.timeOut = timeOut;
+ }
+
+ }
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java b/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
new file mode 100644
index 0000000..0756218
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
@@ -0,0 +1,155 @@
+package org.opencord.igmpproxy;
+
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.config.basics.BasicElementConfig;
+
+/**
+ * Net configuration class for igmpproxy.
+ */
+public class IgmpproxyConfig extends Config<ApplicationId> {
+ protected static final String DEFAULT_UNSOLICITED_TIMEOUT = "2";
+ protected static final String DEFAULT_MAX_RESP = "10";
+ protected static final String DEFAULT_KEEP_ALIVE_INTERVAL = "120";
+ protected static final String DEFAULT_KEEP_ALIVE_COUNT = "3";
+ protected static final String DEFAULT_LAST_QUERY_INTERVAL = "2";
+ protected static final String DEFAULT_LAST_QUERY_COUNT = "2";
+ protected static final String DEFAULT_IGMP_COS = "7";
+ protected static final Boolean DEFAULT_FAST_LEAVE = false;
+ protected static final Boolean DEFAULT_PERIODIC_QUERY = true;
+ protected static final String DEFAULT_WITH_RA_UPLINK = "true";
+ protected static final String DEFAULT_WITH_RA_DOWNLINK = "true";
+ protected static final String CONNECT_POINT_MODE = "globalConnectPointMode";
+ protected static final String CONNECT_POINT = "globalConnectPoint";
+ private static final String UNSOLICITED_TIMEOUT = "UnsolicitedTimeOut";
+ private static final String MAX_RESP = "MaxResp";
+ private static final String KEEP_ALIVE_INTERVAL = "KeepAliveInterval";
+ private static final String KEEP_ALIVE_COUNT = "KeepAliveCount";
+ private static final String LAST_QUERY_INTERVAL = "LastQueryInterval";
+ private static final String LAST_QUERY_COUNT = "LastQueryCount";
+ private static final String FAST_LEAVE = "FastLeave";
+ private static final String PERIODIC_QUERY = "PeriodicQuery";
+ private static final String IGMP_COS = "IgmpCos";
+ private static final String WITH_RA_UPLINK = "withRAUpLink";
+ private static final String WITH_RA_DOWN_LINK = "withRADownLink";
+ private static final Boolean DEFAULT_CONNECT_POINT_MODE = true;
+
+ /**
+ * Gets the value of a string property, protecting for an empty
+ * JSON object.
+ *
+ * @param name name of the property
+ * @param defaultValue default value if none has been specified
+ * @return String value if one os found, default value otherwise
+ */
+ private String getStringProperty(String name, String defaultValue) {
+ if (object == null) {
+ return defaultValue;
+ }
+ return get(name, defaultValue);
+ }
+
+ public int unsolicitedTimeOut() {
+ return Integer.parseInt(getStringProperty(UNSOLICITED_TIMEOUT, DEFAULT_UNSOLICITED_TIMEOUT));
+ }
+
+ public BasicElementConfig unsolicitedTimeOut(int timeout) {
+ return (BasicElementConfig) setOrClear(UNSOLICITED_TIMEOUT, timeout);
+ }
+
+ public int maxResp() {
+ return Integer.parseInt(getStringProperty(MAX_RESP, DEFAULT_MAX_RESP));
+ }
+
+ public BasicElementConfig maxResp(int maxResp) {
+ return (BasicElementConfig) setOrClear(MAX_RESP, maxResp);
+ }
+
+ public int keepAliveInterval() {
+ return Integer.parseInt(getStringProperty(KEEP_ALIVE_INTERVAL, DEFAULT_KEEP_ALIVE_INTERVAL));
+ }
+
+ public BasicElementConfig keepAliveInterval(int interval) {
+ return (BasicElementConfig) setOrClear(KEEP_ALIVE_INTERVAL, interval);
+ }
+
+ public int keepAliveCount() {
+ return Integer.parseInt(getStringProperty(KEEP_ALIVE_COUNT, DEFAULT_KEEP_ALIVE_COUNT));
+ }
+
+ public BasicElementConfig keepAliveCount(int count) {
+ return (BasicElementConfig) setOrClear(KEEP_ALIVE_COUNT, count);
+ }
+
+ public int lastQueryInterval() {
+ return Integer.parseInt(getStringProperty(LAST_QUERY_INTERVAL, DEFAULT_LAST_QUERY_INTERVAL));
+ }
+
+ public BasicElementConfig lastQueryInterval(int interval) {
+ return (BasicElementConfig) setOrClear(LAST_QUERY_INTERVAL, interval);
+ }
+
+ public int lastQueryCount() {
+ return Integer.parseInt(getStringProperty(LAST_QUERY_COUNT, DEFAULT_LAST_QUERY_COUNT));
+ }
+
+ public BasicElementConfig lastQueryCount(int count) {
+ return (BasicElementConfig) setOrClear(LAST_QUERY_COUNT, count);
+ }
+
+ public boolean fastLeave() {
+ if (object == null || object.path(FAST_LEAVE) == null) {
+ return DEFAULT_FAST_LEAVE;
+ }
+ return Boolean.parseBoolean(getStringProperty(FAST_LEAVE, DEFAULT_FAST_LEAVE.toString()));
+ }
+
+ public BasicElementConfig fastLeave(boolean fastLeave) {
+ return (BasicElementConfig) setOrClear(FAST_LEAVE, fastLeave);
+ }
+
+ public boolean periodicQuery() {
+ if (object == null || object.path(PERIODIC_QUERY) == null) {
+ return DEFAULT_PERIODIC_QUERY;
+ }
+ return Boolean.parseBoolean(getStringProperty(PERIODIC_QUERY, DEFAULT_PERIODIC_QUERY.toString()));
+ }
+
+ public BasicElementConfig periodicQuery(boolean periodicQuery) {
+ return (BasicElementConfig) setOrClear(PERIODIC_QUERY, periodicQuery);
+ }
+
+ public byte igmpCos() {
+ return Byte.parseByte(getStringProperty(IGMP_COS, DEFAULT_IGMP_COS));
+ }
+
+ public boolean withRAUplink() {
+ if (object == null || object.path(WITH_RA_UPLINK) == null) {
+ return true;
+ }
+ return Boolean.parseBoolean(getStringProperty(WITH_RA_UPLINK, DEFAULT_WITH_RA_UPLINK));
+ }
+
+ public boolean withRADownlink() {
+ if (object == null || object.path(WITH_RA_DOWN_LINK) == null) {
+ return false;
+ }
+ return Boolean.parseBoolean(getStringProperty(WITH_RA_DOWN_LINK, DEFAULT_WITH_RA_DOWNLINK));
+ }
+
+ public boolean connectPointMode() {
+ if (object == null || object.path(CONNECT_POINT_MODE) == null) {
+ return DEFAULT_CONNECT_POINT_MODE;
+ }
+ return Boolean.parseBoolean(getStringProperty(CONNECT_POINT_MODE, DEFAULT_CONNECT_POINT_MODE.toString()));
+ }
+
+ public ConnectPoint connectPoint() {
+ if (object == null || object.path(CONNECT_POINT) == null) {
+ return null;
+ }
+
+ return ConnectPoint.deviceConnectPoint(getStringProperty(CONNECT_POINT, ""));
+ }
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpproxySsmTranslateConfig.java b/src/main/java/org/opencord/igmpproxy/IgmpproxySsmTranslateConfig.java
new file mode 100644
index 0000000..b8f42a5
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpproxySsmTranslateConfig.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onlab.packet.IpAddress;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.mcast.McastRoute;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * IGMP proxy SSM translate configuration.
+ */
+public class IgmpproxySsmTranslateConfig extends Config<ApplicationId> {
+
+ private static final String SOURCE = "source";
+ private static final String GROUP = "group";
+
+ @Override
+ public boolean isValid() {
+ for (JsonNode node : array) {
+ if (!hasOnlyFields((ObjectNode) node, SOURCE, GROUP)) {
+ return false;
+ }
+
+ if (!(isIpAddress((ObjectNode) node, SOURCE, FieldPresence.MANDATORY) &&
+ isIpAddress((ObjectNode) node, GROUP, FieldPresence.MANDATORY))) {
+ return false;
+ }
+
+ }
+ return true;
+ }
+
+ /**
+ * Gets the list of SSM translations.
+ *
+ * @return SSM translations
+ */
+ public List<McastRoute> getSsmTranslations() {
+ List<McastRoute> translations = new ArrayList();
+ for (JsonNode node : array) {
+ translations.add(
+ new McastRoute(
+ IpAddress.valueOf(node.path(SOURCE).asText().trim()),
+ IpAddress.valueOf(node.path(GROUP).asText().trim()),
+ McastRoute.Type.STATIC));
+ }
+
+ return translations;
+ }
+}
diff --git a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
new file mode 100644
index 0000000..ceaac52
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
@@ -0,0 +1,159 @@
+package org.opencord.igmpproxy;
+
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+
+import java.util.Random;
+
+/**
+ * State machine for single IGMP group member. The state machine is implemented on
+ * RFC 2236 "6. Host State Diagram".
+ */
+public class SingleStateMachine {
+ static final int STATE_NON = 0;
+ static final int STATE_DELAY = 1;
+ static final int STATE_IDLE = 2;
+ static final int TRANSITION_JOIN = 0;
+ static final int TRANSITION_LEAVE = 1;
+ static final int TRANSITION_QUERY = 2;
+ static final int TRANSITION_TIMEOUT = 3;
+ static final int DEFAULT_MAX_RESP = 0xfffffff;
+ static final int DEFAULT_COUNT = 1;
+ private DeviceId devId;
+ private Ip4Address groupIp;
+ private Ip4Address srcIp;
+
+ private int count = DEFAULT_COUNT;
+ private int timerId = IgmpTimer.INVALID_TIMER_ID;
+ private int timeOut = DEFAULT_MAX_RESP;
+ private State[] states =
+ {
+ new NonMember(), new DelayMember(), new IdleMember()
+ };
+ private int[] nonTransition =
+ {STATE_DELAY, STATE_NON, STATE_NON, STATE_NON};
+ private int[] delayTransition =
+ {STATE_DELAY, STATE_NON, STATE_DELAY, STATE_IDLE};
+ private int[] idleTransition =
+ {STATE_IDLE, STATE_NON, STATE_DELAY, STATE_IDLE};
+ //THE TRANSITION TABLE
+ private int[][] transition =
+ {nonTransition, delayTransition, idleTransition};
+ private int currentState = STATE_NON;
+
+ public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src) {
+ this.devId = devId;
+ this.groupIp = groupIp;
+ this.srcIp = src;
+ }
+
+
+ public DeviceId getDeviceId() {
+ return devId;
+ }
+ public boolean increaseCounter() {
+ count++;
+ return true;
+ }
+
+ public boolean decreaseCounter() {
+ if (count > 0) {
+ count--;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public int getCounter() {
+ return count;
+ }
+ public int currentState() {
+ return currentState;
+ }
+
+ private void next(int msg) {
+ currentState = transition[currentState][msg];
+ }
+
+ public void join() {
+ states[currentState].join();
+ next(TRANSITION_JOIN);
+ }
+
+ public void leave() {
+ states[currentState].leave();
+ next(TRANSITION_LEAVE);
+ }
+
+ public void query(int maxResp) {
+ states[currentState].query(maxResp);
+ next(TRANSITION_QUERY);
+ }
+
+ public void timeOut() {
+ states[currentState].timeOut();
+ next(TRANSITION_TIMEOUT);
+ }
+
+ int getTimeOut(int maxTimeOut) {
+ Random random = new Random();
+ return Math.abs(random.nextInt()) % maxTimeOut;
+ }
+
+ protected void cancelTimer() {
+ if (IgmpTimer.INVALID_TIMER_ID != timerId) {
+ IgmpTimer.cancel(timerId);
+ }
+ }
+
+ class State {
+ public void join() {
+ }
+
+ public void leave() {
+ Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ }
+
+ public void query(int maxResp) {
+ }
+
+ public void timeOut() {
+ }
+
+ }
+
+ class NonMember extends State {
+ public void join() {
+ Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
+ timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
+ }
+ }
+
+ class DelayMember extends State {
+ public void query(int maxResp) {
+ if (maxResp < timeOut) {
+ timeOut = getTimeOut(maxResp);
+ timerId = IgmpTimer.reset(timerId, SingleStateMachine.this, timeOut);
+ }
+ }
+
+ public void timeOut() {
+ Ethernet eth = IgmpSender.getInstance().buildIgmpV3ResponseQuery(groupIp, srcIp);
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ timeOut = DEFAULT_MAX_RESP;
+ }
+
+ }
+
+ class IdleMember extends State {
+ public void query(int maxResp) {
+ timeOut = getTimeOut(maxResp);
+ timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
+ }
+ }
+}
diff --git a/src/main/java/org/opencord/igmpproxy/StateMachine.java b/src/main/java/org/opencord/igmpproxy/StateMachine.java
new file mode 100644
index 0000000..6c12877
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/StateMachine.java
@@ -0,0 +1,97 @@
+package org.opencord.igmpproxy;
+
+import com.google.common.collect.Maps;
+import org.onlab.packet.Ip4Address;
+import org.onosproject.net.DeviceId;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * State machine for whole IGMP process. The state machine is implemented on
+ * RFC 2236 "6. Host State Diagram".
+ */
+public final class StateMachine {
+ private StateMachine() {
+
+ }
+ private static Map<String, SingleStateMachine> map = Maps.newConcurrentMap();
+
+ private static String getId(DeviceId devId, Ip4Address groupIp) {
+ return devId.toString() + "Group" + groupIp.toString();
+ }
+
+ private static SingleStateMachine get(DeviceId devId, Ip4Address groupIp) {
+ String id = getId(devId, groupIp);
+ return map.get(id);
+ }
+
+ public static void destorySingle(DeviceId devId, Ip4Address groupIp) {
+ SingleStateMachine machine = get(devId, groupIp);
+ if (null == machine) {
+ return;
+ }
+ machine.cancelTimer();
+ map.remove(getId(devId, groupIp));
+ }
+
+ public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP) {
+ SingleStateMachine machine = get(devId, groupIp);
+ if (null == machine) {
+ machine = new SingleStateMachine(devId, groupIp, srcIP);
+ map.put(getId(devId, groupIp), machine);
+ machine.join();
+ return true;
+ }
+ machine.increaseCounter();
+ return false;
+ }
+
+ public static boolean leave(DeviceId devId, Ip4Address groupIp) {
+ SingleStateMachine machine = get(devId, groupIp);
+ if (null == machine) {
+ return false;
+ }
+ machine.decreaseCounter();
+
+ if (machine.getCounter() == 0) {
+ machine.leave();
+ destorySingle(devId, groupIp);
+ return true;
+ }
+ return false;
+ }
+
+ static void specialQuery(DeviceId devId, Ip4Address groupIp, int maxResp) {
+ SingleStateMachine machine = get(devId, groupIp);
+ if (null == machine) {
+ return;
+ }
+ machine.query(maxResp);
+ }
+
+ static void generalQuery(DeviceId devId, int maxResp) {
+ for (Map.Entry<String, SingleStateMachine> entry : map.entrySet()) {
+ SingleStateMachine machine = entry.getValue();
+ if (devId.equals(machine.getDeviceId())) {
+ machine.query(maxResp);
+ }
+ }
+ }
+
+ public static Set<Map.Entry<String, SingleStateMachine>> entrySet() {
+ return map.entrySet();
+ }
+
+ public static void timeOut(DeviceId devId, Ip4Address groupIp) {
+ SingleStateMachine machine = get(devId, groupIp);
+ if (null == machine) {
+ return;
+ }
+ machine.timeOut();
+ }
+
+ public static void clearMap() {
+ map.clear();
+ }
+
+}
diff --git a/src/main/java/org/opencord/igmpproxy/package-info.java b/src/main/java/org/opencord/igmpproxy/package-info.java
new file mode 100644
index 0000000..ad33c32
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Created by onos on 17-3-9.
+ */
+package org.opencord.igmpproxy;
\ No newline at end of file