olt installs default rules on startup and when a port shows up

Change-Id: I0db62db020f94500aeae7191f7681745e1268672
diff --git a/src/main/java/org/onosproject/olt/Olt.java b/src/main/java/org/onosproject/olt/Olt.java
index d3120dd..4b2f55b 100644
--- a/src/main/java/org/onosproject/olt/Olt.java
+++ b/src/main/java/org/onosproject/olt/Olt.java
@@ -23,12 +23,15 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.EthType;
+import org.onlab.packet.IPv4;
 import org.onlab.packet.VlanId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigEvent;
@@ -42,7 +45,10 @@
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
 import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.Objective;
@@ -126,6 +132,7 @@
         networkConfig.registerConfigFactory(configFactory);
         networkConfig.addListener(configListener);
 
+
         networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
                 subject -> {
                     AccessDeviceConfig config = networkConfig.getConfig(subject, AccessDeviceConfig.class);
@@ -136,6 +143,12 @@
                 }
         );
 
+        oltData.keySet().stream()
+                .flatMap(did -> deviceService.getPorts(did).stream())
+                .filter(p -> oltData.get(p.element().id()).uplink() != p.number())
+                .filter(p -> p.isEnabled())
+                .forEach(p -> installFilteringObjectives((DeviceId) p.element().id(), p));
+
         log.info("Started with Application ID {}", appId.id());
     }
 
@@ -195,17 +208,17 @@
 
         fwds.stream().forEach(
                 fwd -> flowObjectiveService.forward(deviceId,
-                                                 fwd.remove(new ObjectiveContext() {
-                                                     @Override
-                                                     public void onSuccess(Objective objective) {
-                                                         upFuture.complete(null);
-                                                     }
+                                                    fwd.remove(new ObjectiveContext() {
+                                                        @Override
+                                                        public void onSuccess(Objective objective) {
+                                                            upFuture.complete(null);
+                                                        }
 
-                                                     @Override
-                                                     public void onError(Objective objective, ObjectiveError error) {
-                                                         upFuture.complete(error);
-                                                     }
-                                                 })));
+                                                        @Override
+                                                        public void onError(Objective objective, ObjectiveError error) {
+                                                            upFuture.complete(error);
+                                                        }
+                                                    })));
 
         upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
             if (upStatus == null && downStatus == null) {
@@ -327,6 +340,57 @@
 
     }
 
+    private void installFilteringObjectives(DeviceId devId, Port port) {
+        FilteringObjective eapol = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
+                .withMeta(DefaultTrafficTreatment.builder()
+                                  .setOutput(PortNumber.CONTROLLER).build())
+                .fromApp(appId)
+                .withPriority(1000)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("Eapol filter for {} on {} installed.",
+                                 devId, port);
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.info("Eapol filter for {} on {} failed because {}",
+                                 devId, port, error);
+                    }
+                });
+
+
+        FilteringObjective igmp = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
+                .withMeta(DefaultTrafficTreatment.builder()
+                                  .setOutput(PortNumber.CONTROLLER).build())
+                .fromApp(appId)
+                .withPriority(1000)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("Igmp filter for {} on {} installed.",
+                                 devId, port);
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.info("Igmp filter for {} on {} failed because {}.",
+                                 devId, port, error);
+                    }
+                });
+
+        flowObjectiveService.filter(devId, eapol);
+        flowObjectiveService.filter(devId, igmp);
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
@@ -336,7 +400,20 @@
                 return;
             }
             switch (event.type()) {
+                //TODO: Port handling and bookkeeping should be inproved once
+                // olt firmware handles correct behaviour.
                 case PORT_ADDED:
+                    if (event.port().isEnabled()) {
+                        installFilteringObjectives(devId, event.port());
+                    }
+                    break;
+                case PORT_REMOVED:
+                    AccessDeviceData olt = oltData.get(devId);
+                    unprovisionSubscriber(devId, olt.uplink(),
+                                          event.port().number(),
+                                          olt.vlan());
+                    installFilteringObjectives(devId, event.port());
+                    break;
                 case PORT_UPDATED:
                     break;
                 case DEVICE_ADDED:
@@ -352,7 +429,6 @@
                 case DEVICE_UPDATED:
                 case DEVICE_SUSPENDED:
                 case DEVICE_AVAILABILITY_CHANGED:
-                case PORT_REMOVED:
                 case PORT_STATS_UPDATED:
                 default:
                     return;