[VOL-4180] Multi UNI feature implemented to OLT application.

Change-Id: I3d45719ebdce304ba94652ed9de553e40d76a77c

Fixed EAPOL flow bug

Review fixes finished

Multi UNI feature implemented to OLT application.

- It's possible to fetch a meter by annotations (OltPipeline).
- New meters can be created for bandwidth profiles of OLT device.
- The OLT meter ID is carried in writeMetadata so that voltha/rw-core can parse it and assign the correct meters to ONU and OLT flows.

Change-Id: Ia6c9909b5f03b0f3fe329bd11580f891bfab3a32
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 466d363..1868421 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -27,7 +27,9 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Annotations;
 import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
@@ -66,8 +68,11 @@
 import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
 
+import java.util.Arrays;
 import java.util.Dictionary;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
@@ -271,6 +276,7 @@
     @Override
     public void processDhcpFilteringObjectives(AccessDevicePort port,
                                                MeterId upstreamMeterId,
+                                               MeterId upstreamOltMeterId,
                                                UniTagInformation tagInformation,
                                                boolean install,
                                                boolean upstream,
@@ -306,8 +312,8 @@
             byte protocol = IPv4.PROTOCOL_UDP;
 
             addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
-                    upstreamMeterId, techProfileId, protocol, cTag, unitagMatch,
-                                       vlanPcp, upstream, install, dhcpFuture);
+                    upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
+                    vlanPcp, upstream, install, dhcpFuture);
         }
 
         if (enableDhcpV6) {
@@ -318,14 +324,14 @@
             byte protocol = IPv6.PROTOCOL_UDP;
 
             addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
-                    upstreamMeterId, techProfileId, protocol, cTag, unitagMatch,
-                                       vlanPcp, upstream, install, dhcpFuture);
+                    upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
+                    vlanPcp, upstream, install, dhcpFuture);
         }
     }
 
     private void addDhcpFilteringObjectives(AccessDevicePort port, int udpSrc, int udpDst,
-                                            EthType ethType, MeterId upstreamMeterId, int techProfileId, byte protocol,
-                                            VlanId cTag, VlanId unitagMatch,
+                                            EthType ethType, MeterId upstreamMeterId, MeterId upstreamOltMeterId,
+                                            int techProfileId, byte protocol, VlanId cTag, VlanId unitagMatch,
                                             Byte vlanPcp, boolean upstream,
                                             boolean install, Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
 
@@ -337,7 +343,7 @@
         }
 
         if (techProfileId != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(createTechProfValueForWm(unitagMatch, techProfileId), 0);
+            treatmentBuilder.writeMetadata(createTechProfValueForWm(unitagMatch, techProfileId, upstreamOltMeterId), 0);
         }
 
         FilteringObjective.Builder dhcpUpstreamBuilder = (install ? builder.permit() : builder.deny())
@@ -389,6 +395,7 @@
     @Override
     public void processPPPoEDFilteringObjectives(AccessDevicePort port,
                                                  MeterId upstreamMeterId,
+                                                 MeterId upstreamOltMeterId,
                                                  UniTagInformation tagInformation,
                                                  boolean install,
                                                  boolean upstream) {
@@ -420,7 +427,7 @@
         }
 
         if (techProfileId != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(createTechProfValueForWm(cTag, techProfileId), 0);
+            treatmentBuilder.writeMetadata(createTechProfValueForWm(cTag, techProfileId, upstreamOltMeterId), 0);
         }
 
         DefaultFilteringObjective.Builder pppoedBuilder = (install ? builder.permit() : builder.deny())
@@ -459,6 +466,7 @@
     @Override
     public void processIgmpFilteringObjectives(AccessDevicePort port,
                                                MeterId upstreamMeterId,
+                                               MeterId upstreamOltMeterId,
                                                UniTagInformation tagInformation,
                                                boolean install,
                                                boolean upstream) {
@@ -484,7 +492,7 @@
 
             if (tagInformation.getTechnologyProfileId() != NONE_TP_ID) {
                 treatmentBuilder.writeMetadata(createTechProfValueForWm(null,
-                        tagInformation.getTechnologyProfileId()), 0);
+                        tagInformation.getTechnologyProfileId(), upstreamOltMeterId), 0);
             }
 
 
@@ -532,7 +540,7 @@
     }
 
     @Override
-    public void processEapolFilteringObjectives(AccessDevicePort port, String bpId,
+    public void processEapolFilteringObjectives(AccessDevicePort port, String bpId, Optional<String> oltBpId,
                                                 CompletableFuture<ObjectiveError> filterFuture,
                                                 VlanId vlanId, boolean install) {
 
@@ -546,6 +554,12 @@
         }
         log.info("Processing EAPOL with Bandwidth profile {} on {}", bpId, port);
         BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bpId);
+        BandwidthProfileInformation oltBpInfo;
+        if (oltBpId.isPresent()) {
+            oltBpInfo = getBandwidthProfileInformation(oltBpId.get());
+        } else {
+            oltBpInfo = bpInfo;
+        }
         if (bpInfo == null) {
             log.warn("Bandwidth profile {} is not found. Authentication flow"
                     + " will not be installed on {}", bpId, port);
@@ -558,19 +572,22 @@
         ConnectPoint cp = new ConnectPoint(port.deviceId(), port.number());
         DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
-        CompletableFuture<Object> meterFuture = new CompletableFuture<>();
         // check if meter exists and create it only for an install
         final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(port.deviceId(), bpInfo.id());
-        log.info("Meter id {} for Bandwidth profile {} associated to EAPOL on {}",
-                meterId, bpInfo.id(), port.deviceId());
-        if (meterId == null) {
+        MeterId oltMeterId = null;
+        if (oltBpId.isPresent()) {
+            oltMeterId = oltBpId.map(id -> oltMeterService.getMeterIdFromBpMapping(port.deviceId(), id)).orElse(null);
+        }
+        log.info("Meter id {} for Bandwidth profile {} and OLT meter id {} for OLT Bandwidth profile {} " +
+                        "associated to EAPOL on {}", meterId, bpInfo.id(), oltMeterId, oltBpId, port.deviceId());
+        if (meterId == null || (oltBpId.isPresent() && oltMeterId == null)) {
             if (install) {
                 log.debug("Need to install meter for EAPOL with bwp {} on {}", bpInfo.id(), port);
                 SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                                                               new UniTagInformation.Builder()
-                                                                       .setPonCTag(vlanId).build(),
-                                                               null, null,
-                                                               null, bpInfo.id());
+                                                            new UniTagInformation.Builder()
+                                                                    .setPonCTag(vlanId).build(),
+                                                        null, meterId, null, oltMeterId,
+                                                    null, bpInfo.id(), null, oltBpInfo.id());
                 pendingEapolForDevice.compute(port.deviceId(), (id, queue) -> {
                     if (queue == null) {
                         queue = new LinkedBlockingQueue<>();
@@ -580,11 +597,14 @@
                 });
 
                 //If false the meter is already being installed, skipping installation
-                if (!oltMeterService.checkAndAddPendingMeter(port.deviceId(), bpInfo)) {
+                if (!oltMeterService.checkAndAddPendingMeter(port.deviceId(), bpInfo) &&
+                        !oltMeterService.checkAndAddPendingMeter(port.deviceId(), oltBpInfo)) {
                     return;
                 }
-                MeterId innerMeterId = oltMeterService.createMeter(port.deviceId(), bpInfo, meterFuture);
-                fi.setUpMeterId(innerMeterId);
+                List<BandwidthProfileInformation> bwpList = Arrays.asList(bpInfo, oltBpInfo);
+                bwpList.stream().distinct().filter(Objects::nonNull)
+                        .forEach(bwp -> createMeterAndProceedEapol(port, bwp, filterFuture, install,
+                        cp, filterBuilder, treatmentBuilder));
             } else {
                 // this case should not happen as the request to remove an eapol
                 // flow should mean that the flow points to a meter that exists.
@@ -593,54 +613,75 @@
                 log.warn("Unknown meter id for bp {}, still proceeding with "
                         + "delete of eapol flow on {}", bpInfo.id(), port);
                 SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                                                               new UniTagInformation.Builder()
-                                                                       .setPonCTag(vlanId).build(),
-                                                               null, meterId,
-                                                               null, bpInfo.id());
-                handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId);
+                        new UniTagInformation.Builder().setPonCTag(vlanId).build(),
+                        null, meterId, null, oltMeterId, null,
+                        bpInfo.id(), null, oltBpInfo.id());
+                handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
             }
         } else {
             log.debug("Meter {} was previously created for bp {} on {}", meterId, bpInfo.id(), port);
             SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                                                           new UniTagInformation.Builder()
-                                                                   .setPonCTag(vlanId).build(),
-                                                           null, meterId,
-                                                           null, bpInfo.id());
-            handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId);
+                    new UniTagInformation.Builder().setPonCTag(vlanId).build(),
+                    null, meterId, null, oltMeterId, null,
+                    bpInfo.id(), null, oltBpInfo.id());
+            handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
             //No need for the future, meter is present.
             return;
         }
-        meterFuture.thenAcceptAsync(result -> {
+    }
+
+    private void createMeterAndProceedEapol(AccessDevicePort port, BandwidthProfileInformation bwpInfo,
+                                            CompletableFuture<ObjectiveError> filterFuture,
+                                            boolean install, ConnectPoint cp,
+                                            DefaultFilteringObjective.Builder filterBuilder,
+                                            TrafficTreatment.Builder treatmentBuilder) {
+        CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+        MeterId meterId = oltMeterService.createMeter(port.deviceId(), bwpInfo, meterFuture);
+        DeviceId deviceId = port.deviceId();
+        meterFuture.thenAccept(result -> {
             //for each pending eapol flow we check if the meter is there.
-            BlockingQueue<SubscriberFlowInfo> queue = pendingEapolForDevice.get(port.deviceId());
-            if (queue != null) {
-                while (true) {
-                    SubscriberFlowInfo fi = queue.remove();
-                    if (fi == null) {
-                        pendingEapolForDevice.replace(port.deviceId(), queue);
-                        break;
-                    }
-                    //TODO this might return the reference and not the actual object
-                    // so it can be actually swapped underneath us.
-                    log.debug("handing pending eapol on {} for {}", fi.getUniPort(), fi);
-                    if (result == null) {
-                        MeterId mId = oltMeterService
-                                .getMeterIdFromBpMapping(port.deviceId(), fi.getUpBpInfo());
-                        if (mId != null) {
-                            log.debug("Meter installation completed for subscriber on {}, " +
-                                            "handling EAPOL trap flow", port);
-                            handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
+            pendingEapolForDevice.compute(deviceId, (id, queue) -> {
+                if (queue != null && !queue.isEmpty()) {
+                    while (true) {
+                        //TODO this might return the reference and not the actual object
+                        // so it can be actually swapped underneath us.
+                        SubscriberFlowInfo fi = queue.peek();
+                        if (fi == null) {
+                            log.debug("No more subscribers eapol flows on {}", deviceId);
+                            queue = new LinkedBlockingQueue<>();
+                            break;
                         }
-                    } else {
-                        log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
-                                         "Result {} and MeterId {}", port, result,
-                                meterId);
+                        log.debug("handing pending eapol on {} for {}", fi.getUniPort(), fi);
+                        if (result == null) {
+                            MeterId mId = oltMeterService
+                                    .getMeterIdFromBpMapping(port.deviceId(), fi.getUpBpInfo());
+                            MeterId oltMeterId = oltMeterService
+                                    .getMeterIdFromBpMapping(port.deviceId(), fi.getUpOltBpInfo());
+                            if (mId != null && oltMeterId != null) {
+                                log.debug("Meter installation completed for subscriber on {}, " +
+                                                  "handling EAPOL trap flow", port);
+                                fi.setUpMeterId(mId);
+                                fi.setUpOltMeterId(oltMeterId);
+                                handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi,
+                                            mId, oltMeterId);
+                                queue.remove(fi);
+                            } else {
+                                log.debug("Not all meters for {} are yet installed for EAPOL meterID {}, " +
+                                                  "oltMeterId {}", fi, meterId, oltMeterId);
+                            }
+                        } else {
+                            log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
+                                             "Result {} and MeterId {}", port, result, meterId);
+                            queue.remove(fi);
+                        }
+                        oltMeterService.removeFromPendingMeters(port.deviceId(), bwpInfo);
                     }
-                    oltMeterService.removeFromPendingMeters(port.deviceId(), bpInfo);
+                } else {
+                    log.info("No pending EAPOLs on {}", port.deviceId());
+                    queue = new LinkedBlockingQueue<>();
                 }
-            } else {
-                log.info("No pending EAPOLs on {}", port.deviceId());
-            }
+                return queue;
+            });
         });
     }
 
@@ -648,7 +689,7 @@
                              boolean install, ConnectPoint cp,
                              DefaultFilteringObjective.Builder filterBuilder,
                              TrafficTreatment.Builder treatmentBuilder,
-                             SubscriberFlowInfo fi, MeterId mId) {
+                             SubscriberFlowInfo fi, MeterId mId, MeterId oltMeterId) {
         log.info("Meter {} for {} on {} exists. {} EAPOL trap flow",
                  mId, fi.getUpBpInfo(), fi.getUniPort(),
                  (install) ? "Installing" : "Removing");
@@ -664,7 +705,7 @@
                 .withMeta(treatmentBuilder
                                   .writeMetadata(createTechProfValueForWm(
                                           fi.getTagInfo().getPonCTag(),
-                                          techProfileId), 0)
+                                          techProfileId, oltMeterId), 0)
                                   .setOutput(PortNumber.CONTROLLER)
                                   .pushVlan()
                                   .setVlanId(fi.getTagInfo().getPonCTag())
@@ -707,9 +748,9 @@
         log.info("{} flows for NNI port {}",
                  install ? "Adding" : "Removing", nniPort);
         processLldpFilteringObjective(nniPort, install);
-        processDhcpFilteringObjectives(nniPort, null, null, install, false, Optional.empty());
-        processIgmpFilteringObjectives(nniPort, null, null, install, false);
-        processPPPoEDFilteringObjectives(nniPort, null, null, install, false);
+        processDhcpFilteringObjectives(nniPort, null, null, null, install, false, Optional.empty());
+        processIgmpFilteringObjectives(nniPort, null, null, null, install, false);
+        processPPPoEDFilteringObjectives(nniPort, null, null, null, install, false);
     }
 
 
@@ -765,13 +806,15 @@
                         upstream ? uplinkPort.number() : subscriberPort.number()), 0)
                 .build();
 
-        return createForwardingObjectiveBuilder(selector, treatment, MIN_PRIORITY);
+        return createForwardingObjectiveBuilder(selector, treatment, MIN_PRIORITY,
+                DefaultAnnotations.builder().build());
     }
 
     @Override
     public ForwardingObjective.Builder createUpBuilder(AccessDevicePort uplinkPort,
                                                        AccessDevicePort subscriberPort,
                                                        MeterId upstreamMeterId,
+                                                       MeterId upstreamOltMeterId,
                                                        UniTagInformation uniTagInformation) {
 
         TrafficSelector selector = DefaultTrafficSelector.builder()
@@ -801,17 +844,26 @@
                 .writeMetadata(createMetadata(uniTagInformation.getPonCTag(),
                         uniTagInformation.getTechnologyProfileId(), uplinkPort.number()), 0L);
 
+        DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+
         if (upstreamMeterId != null) {
             treatmentBuilder.meter(upstreamMeterId);
+            annotationBuilder.set(UPSTREAM_ONU, upstreamMeterId.toString());
+        }
+        if (upstreamOltMeterId != null) {
+            treatmentBuilder.meter(upstreamOltMeterId);
+            annotationBuilder.set(UPSTREAM_OLT, upstreamOltMeterId.toString());
         }
 
-        return createForwardingObjectiveBuilder(selector, treatmentBuilder.build(), MIN_PRIORITY);
+        return createForwardingObjectiveBuilder(selector, treatmentBuilder.build(), MIN_PRIORITY,
+                annotationBuilder.build());
     }
 
     @Override
     public ForwardingObjective.Builder createDownBuilder(AccessDevicePort uplinkPort,
                                                          AccessDevicePort subscriberPort,
                                                          MeterId downstreamMeterId,
+                                                         MeterId downstreamOltMeterId,
                                                          UniTagInformation tagInformation,
                                                          Optional<MacAddress> macAddress) {
 
@@ -854,11 +906,20 @@
             treatmentBuilder.setVlanId(tagInformation.getUniTagMatch());
         }
 
+        DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+
         if (downstreamMeterId != null) {
             treatmentBuilder.meter(downstreamMeterId);
+            annotationBuilder.set(DOWNSTREAM_ONU, downstreamMeterId.toString());
         }
 
-        return createForwardingObjectiveBuilder(selectorBuilder.build(), treatmentBuilder.build(), MIN_PRIORITY);
+        if (downstreamOltMeterId != null) {
+            treatmentBuilder.meter(downstreamOltMeterId);
+            annotationBuilder.set(DOWNSTREAM_OLT, downstreamOltMeterId.toString());
+        }
+
+        return createForwardingObjectiveBuilder(selectorBuilder.build(), treatmentBuilder.build(), MIN_PRIORITY,
+                annotationBuilder.build());
     }
 
     @Override
@@ -868,12 +929,14 @@
 
     private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,
                                                                                 TrafficTreatment treatment,
-                                                                                Integer priority) {
+                                                                                Integer priority,
+                                                                                Annotations annotations) {
         return DefaultForwardingObjective.builder()
                 .withFlag(ForwardingObjective.Flag.VERSATILE)
                 .withPriority(priority)
                 .makePermanent()
                 .withSelector(selector)
+                .withAnnotations(annotations)
                 .fromApp(appId)
                 .withTreatment(treatment);
     }
@@ -882,15 +945,24 @@
      * Returns the write metadata value including tech profile reference and innerVlan.
      * For param cVlan, null can be sent
      *
-     * @param cVlan         c (customer) tag of one subscriber
-     * @param techProfileId tech profile id of one subscriber
+     * @param cVlan                 c (customer) tag of one subscriber
+     * @param techProfileId         tech profile id of one subscriber
+     * @param upstreamOltMeterId    upstream meter id for OLT device.
      * @return the write metadata value including tech profile reference and innerVlan
      */
-    private Long createTechProfValueForWm(VlanId cVlan, int techProfileId) {
+    private Long createTechProfValueForWm(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
+        Long writeMetadata;
+
         if (cVlan == null || VlanId.NONE.equals(cVlan)) {
-            return (long) techProfileId << 32;
+            writeMetadata = (long) techProfileId << 32;
+        } else {
+            writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
         }
-        return ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
+        if (upstreamOltMeterId == null) {
+            return writeMetadata;
+        } else {
+            return writeMetadata | upstreamOltMeterId.id();
+        }
     }
 
     private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {