[VOL-4180] Multi UNI feature implemented in the OLT application.

Change-Id: I3d45719ebdce304ba94652ed9de553e40d76a77c

Fixed EAPOL flow bug

Review fixes finished

Multi UNI feature implemented in the OLT application.

- It's possible to fetch a meter by annotations. (OltPipeline)
- New meters can be created for bandwidth profiles of the OLT device.
- Olt meterId is transported via writeMetadata so that voltha/rw-core can parse it and assign the correct meters to ONU and OLT flows.

Change-Id: Ia6c9909b5f03b0f3fe329bd11580f891bfab3a32
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 70a3753..f11ac21 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -405,6 +405,7 @@
         pendingSubscribersForDevice.computeIfPresent(connectPoint.deviceId(), (id, infos) -> {
             for (SubscriberFlowInfo fi : infos) {
                 if (fi.getUniPort().equals(connectPoint.port())) {
+                    log.debug("Subscriber is already pending, {}", fi);
                     isPending.set(true);
                     break;
                 }
@@ -434,7 +435,7 @@
         public void run() {
             CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
             oltFlowService.processEapolFilteringObjectives(subscriberPort,
-                                                     defaultBpId, filterFuture,
+                                                     defaultBpId, Optional.empty(), filterFuture,
                                                      VlanId.vlanId(EAPOL_DEFAULT_VLAN),
                                                      false);
             filterFuture.thenAcceptAsync(filterStatus -> {
@@ -511,14 +512,17 @@
             unprovisionVlans(uplinkPort, subscriberPort, uniTag);
 
             // remove eapol with subscriber bandwidth profile
+            Optional<String> upstreamOltBw = uniTag.getUpstreamOltBandwidthProfile() == null ?
+                    Optional.empty() : Optional.of(uniTag.getUpstreamOltBandwidthProfile());
             oltFlowService.processEapolFilteringObjectives(subscriberPort,
                                                            uniTag.getUpstreamBandwidthProfile(),
+                                                           upstreamOltBw,
                                                            null, uniTag.getPonCTag(), false);
 
             if (subscriberPort.port() != null && subscriberPort.isEnabled()) {
                 // reinstall eapol with default bandwidth profile
-                oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId,
-                                                               null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
+                oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId, Optional.empty(),
+                        null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
             } else {
                 log.debug("Port {} is no longer enabled or it's unavailable. Not "
                                   + "reprogramming default eapol flow", connectPoint);
@@ -556,8 +560,8 @@
             //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
             //install subscriber flows
             CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
-            oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId,
-                                                           filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
+            oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId, Optional.empty(),
+                    filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
             filterFuture.thenAcceptAsync(filterStatus -> {
                 if (filterStatus == null) {
                     provisionUniTagInformation(uplinkPort, subscriberPort, cTag.get(), sTag.get(), tpId.get());
@@ -697,6 +701,10 @@
                 .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamBandwidthProfile());
         MeterId downstreamMeterId = oltMeterService
                 .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamBandwidthProfile());
+        MeterId upstreamOltMeterId = oltMeterService
+                .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamOltBandwidthProfile());
+        MeterId downstreamOltMeterId = oltMeterService
+                .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamOltBandwidthProfile());
 
         Optional<SubscriberFlowInfo> waitingMacSubFlowInfo =
                 getAndRemoveWaitingMacSubFlowInfoForCTag(new ConnectPoint(deviceId, subscriberPort.number()),
@@ -707,7 +715,7 @@
                     "waitingMacSubFlowInfo:{}", waitingMacSubFlowInfo.get());
             CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
             oltFlowService.processDhcpFilteringObjectives(subscriberPort,
-                    upstreamMeterId, uniTag, false, true, Optional.of(dhcpFuture));
+                    upstreamMeterId, upstreamOltMeterId, uniTag, false, true, Optional.of(dhcpFuture));
             dhcpFuture.thenAcceptAsync(dhcpStatus -> {
                 AccessDeviceEvent.Type type;
                 if (dhcpStatus == null) {
@@ -731,16 +739,19 @@
         }
 
         ForwardingObjective.Builder upFwd =
-                oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, uniTag);
+                oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag);
 
         Optional<MacAddress> macAddress = getMacAddress(deviceId, subscriberPort, uniTag);
         ForwardingObjective.Builder downFwd =
-                oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, uniTag, macAddress);
+                oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, downstreamOltMeterId,
+                        uniTag, macAddress);
 
-        oltFlowService.processIgmpFilteringObjectives(subscriberPort, upstreamMeterId, uniTag, false, true);
+        oltFlowService.processIgmpFilteringObjectives(subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag,
+                false, true);
         oltFlowService.processDhcpFilteringObjectives(subscriberPort,
-                upstreamMeterId, uniTag, false, true, Optional.empty());
-        oltFlowService.processPPPoEDFilteringObjectives(subscriberPort, upstreamMeterId, uniTag, false, true);
+                upstreamMeterId, upstreamOltMeterId, uniTag, false, true, Optional.empty());
+        oltFlowService.processPPPoEDFilteringObjectives(subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag,
+                false, true);
 
         flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
             @Override
@@ -885,6 +896,10 @@
                 getBandwidthProfileInformation(tagInfo.getUpstreamBandwidthProfile());
         BandwidthProfileInformation downstreamBpInfo =
                 getBandwidthProfileInformation(tagInfo.getDownstreamBandwidthProfile());
+        BandwidthProfileInformation upstreamOltBpInfo =
+                getBandwidthProfileInformation(tagInfo.getUpstreamOltBandwidthProfile());
+        BandwidthProfileInformation downstreamOltBpInfo =
+                getBandwidthProfileInformation(tagInfo.getDownstreamOltBandwidthProfile());
         if (upstreamBpInfo == null) {
             log.warn("No meter installed since no Upstream BW Profile definition found for "
                              + "ctag {} stag {} tpId {} on {}",
@@ -898,17 +913,45 @@
                      tagInfo.getTechnologyProfileId(), subscriberPort);
             return;
         }
+        if ((upstreamOltBpInfo != null && downstreamOltBpInfo == null) ||
+                (upstreamOltBpInfo == null && downstreamOltBpInfo != null)) {
+            log.warn("No meter installed since only one olt BW Profile definition found for "
+                            + "ctag {} stag {} tpId {} and Device/port: {}:{}",
+                    tagInfo.getPonCTag(), tagInfo.getPonSTag(),
+                    tagInfo.getTechnologyProfileId(), deviceId,
+                    subscriberPort);
+            return;
+        }
+
+        MeterId upOltMeterId = null;
+        MeterId downOltMeterId = null;
 
         // check for meterIds for the upstream and downstream bandwidth profiles
         MeterId upMeterId = oltMeterService
                 .getMeterIdFromBpMapping(deviceId, upstreamBpInfo.id());
         MeterId downMeterId = oltMeterService
                 .getMeterIdFromBpMapping(deviceId, downstreamBpInfo.id());
-        SubscriberFlowInfo fi = new SubscriberFlowInfo(uplinkPort, subscriberPort,
-                                                       tagInfo, downMeterId, upMeterId,
-                                                       downstreamBpInfo.id(), upstreamBpInfo.id());
 
-        if (upMeterId != null && downMeterId != null) {
+        if (upstreamOltBpInfo != null) {
+            // Multi UNI service
+            upOltMeterId = oltMeterService
+                    .getMeterIdFromBpMapping(deviceId, upstreamOltBpInfo.id());
+            downOltMeterId = oltMeterService
+                    .getMeterIdFromBpMapping(deviceId, downstreamOltBpInfo.id());
+        } else {
+            // NOT Multi UNI service
+            log.debug("OLT bandwidth profiles fields are set to ONU bandwidth profiles");
+            upstreamOltBpInfo = upstreamBpInfo;
+            downstreamOltBpInfo = downstreamBpInfo;
+            upOltMeterId = upMeterId;
+            downOltMeterId = downMeterId;
+        }
+        SubscriberFlowInfo fi = new SubscriberFlowInfo(uplinkPort, subscriberPort,
+                                                       tagInfo, downMeterId, upMeterId, downOltMeterId, upOltMeterId,
+                                                       downstreamBpInfo.id(), upstreamBpInfo.id(),
+                                                       downstreamOltBpInfo.id(), upstreamOltBpInfo.id());
+
+        if (upMeterId != null && downMeterId != null && upOltMeterId != null && downOltMeterId != null) {
             log.debug("Meters are existing for upstream {} and downstream {} on {}",
                     upstreamBpInfo.id(), downstreamBpInfo.id(), subscriberPort);
             handleSubFlowsWithMeters(fi);
@@ -925,21 +968,37 @@
                 return queue;
             });
 
+            List<BandwidthProfileInformation> bws = new ArrayList<>();
             // queue up the meters to be created
             if (upMeterId == null) {
                 log.debug("Missing meter for upstream {} on {}", upstreamBpInfo.id(), subscriberPort);
-                checkAndCreateDevMeter(deviceId, upstreamBpInfo);
+                bws.add(upstreamBpInfo);
             }
             if (downMeterId == null) {
                 log.debug("Missing meter for downstream {} on {}", downstreamBpInfo.id(), subscriberPort);
-                checkAndCreateDevMeter(deviceId, downstreamBpInfo);
+                bws.add(downstreamBpInfo);
             }
+            if (upOltMeterId == null) {
+                log.debug("Missing meter for upstreamOlt {} on {}", upstreamOltBpInfo.id(), subscriberPort);
+                bws.add(upstreamOltBpInfo);
+            }
+            if (downOltMeterId == null) {
+                log.debug("Missing meter for downstreamOlt {} on {}", downstreamOltBpInfo.id(), subscriberPort);
+                bws.add(downstreamOltBpInfo);
+            }
+            bws.stream().distinct().forEach(bw -> checkAndCreateDevMeter(deviceId, bw));
         }
     }
 
     private void checkAndCreateDevMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
+        log.debug("Checking and Creating Meter with {} on {}", bwpInfo, deviceId);
+        if (bwpInfo == null) {
+            log.error("Can't create meter. Bandwidth profile is null for device : {}", deviceId);
+            return;
+        }
         //If false the meter is already being installed, skipping installation
         if (!oltMeterService.checkAndAddPendingMeter(deviceId, bwpInfo)) {
+            log.debug("Meter is already being installed on {} for {}", deviceId, bwpInfo);
             return;
         }
         createMeter(deviceId, bwpInfo);
@@ -953,52 +1012,69 @@
                                                       meterFuture);
 
         meterFuture.thenAcceptAsync(result -> {
-            BlockingQueue<SubscriberFlowInfo> queue = pendingSubscribersForDevice.get(deviceId);
-            // iterate through the subscribers on hold
-            if (queue != null) {
-                while (true) {
-                    //TODO this might return the reference and not the actual object so
-                    // it can be actually swapped underneath us.
-                    SubscriberFlowInfo fi = queue.peek();
-                    if (fi == null) {
-                        log.debug("No more subscribers pending on {}", deviceId);
-                        pendingSubscribersForDevice.replace(deviceId, queue);
-                        break;
-                    }
-                    if (result == null) {
-                        // meter install sent to device
-                        log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
+            log.debug("Meter Future for {} has completed", meterId);
+            pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
+                // iterate through the subscribers on hold
+                if (queue != null && !queue.isEmpty()) {
+                    while (true) {
+                        //TODO this might return the reference and not the actual object so
+                        // it can be actually swapped underneath us.
+                        SubscriberFlowInfo fi = queue.peek();
+                        if (fi == null) {
+                            log.debug("No more subscribers pending on {}", deviceId);
+                            queue = new LinkedBlockingQueue<>();
+                            break;
+                        }
+                        if (result == null) {
+                            // meter install sent to device
+                            log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
 
-                        MeterId upMeterId = oltMeterService
-                                .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
-                        MeterId downMeterId = oltMeterService
-                                .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
-                        if (upMeterId != null && downMeterId != null) {
-                            log.debug("Provisioning subscriber after meter {} " +
-                                              "installation and both meters are present " +
-                                              "upstream {} and downstream {} on {}",
-                                      meterId, upMeterId, downMeterId, fi.getUniPort());
-                            // put in the meterIds  because when fi was first
-                            // created there may or may not have been a meterId
-                            // depending on whether the meter was created or
-                            // not at that time.
-                            fi.setUpMeterId(upMeterId);
-                            fi.setDownMeterId(downMeterId);
-                            handleSubFlowsWithMeters(fi);
+                            MeterId upMeterId = oltMeterService
+                                    .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
+                            MeterId downMeterId = oltMeterService
+                                    .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
+                            MeterId upOltMeterId = oltMeterService
+                                    .getMeterIdFromBpMapping(deviceId, fi.getUpOltBpInfo());
+                            MeterId downOltMeterId = oltMeterService
+                                    .getMeterIdFromBpMapping(deviceId, fi.getDownOltBpInfo());
+                            if (upMeterId != null && downMeterId != null &&
+                                    upOltMeterId != null && downOltMeterId != null) {
+                                log.debug("Provisioning subscriber after meter {} " +
+                                                  "installation and all meters are present " +
+                                                  "upstream {} , downstream {} , oltUpstream {} " +
+                                                  "and oltDownstream {} on {}",
+                                          meterId, upMeterId, downMeterId, upOltMeterId,
+                                          downOltMeterId, fi.getUniPort());
+                                // put in the meterIds  because when fi was first
+                                // created there may or may not have been a meterId
+                                // depending on whether the meter was created or
+                                // not at that time.
+                                fi.setUpMeterId(upMeterId);
+                                fi.setDownMeterId(downMeterId);
+                                fi.setUpOltMeterId(upOltMeterId);
+                                fi.setDownOltMeterId(downOltMeterId);
+                                handleSubFlowsWithMeters(fi);
+                                queue.remove(fi);
+                            } else {
+                                log.debug("Not all meters for {} are yet installed up {}, " +
+                                                 "down {}, oltUp {}, oltDown {}", fi, upMeterId,
+                                         downMeterId, upOltMeterId, downOltMeterId);
+                            }
+                            oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
+                        } else {
+                            // meter install failed
+                            log.error("Addition of subscriber {} on {} failed due to meter " +
+                                              "{} with result {}", fi, fi.getUniPort(), meterId, result);
+                            oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
                             queue.remove(fi);
                         }
-                        oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
-                    } else {
-                        // meter install failed
-                        log.error("Addition of subscriber {} on {} failed due to meter " +
-                                          "{} with result {}", fi, fi.getUniPort(), meterId, result);
-                        queue.remove(fi);
-                        oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
                     }
+                } else {
+                    log.info("No pending subscribers on {}", deviceId);
+                    queue = new LinkedBlockingQueue<>();
                 }
-            } else {
-                log.info("No pending subscribers on {}", deviceId);
-            }
+                return queue;
+            });
         });
 
     }
@@ -1027,7 +1103,8 @@
 
                 CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
                 oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getUniPort(),
-                        subscriberFlowInfo.getUpId(), tagInfo, true, true, Optional.of(dhcpFuture));
+                        subscriberFlowInfo.getUpId(), subscriberFlowInfo.getUpOltId(),
+                        tagInfo, true, true, Optional.of(dhcpFuture));
                 dhcpFuture.thenAcceptAsync(dhcpStatus -> {
                     if (dhcpStatus != null) {
                         log.error("Dhcp Objective failed for {}: {}", subscriberFlowInfo, dhcpStatus);
@@ -1063,8 +1140,8 @@
         CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
 
         ForwardingObjective.Builder upFwd =
-                oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), uniPort,
-                                               subscriberFlowInfo.getUpId(), subscriberFlowInfo.getTagInfo());
+                oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), uniPort, subscriberFlowInfo.getUpId(),
+                        subscriberFlowInfo.getUpOltId(), subscriberFlowInfo.getTagInfo());
         flowObjectiveService.forward(subscriberFlowInfo.getDevId(), upFwd.add(new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
@@ -1080,7 +1157,8 @@
 
         ForwardingObjective.Builder downFwd =
                 oltFlowService.createDownBuilder(subscriberFlowInfo.getNniPort(), uniPort,
-                        subscriberFlowInfo.getDownId(), subscriberFlowInfo.getTagInfo(), macAddress);
+                        subscriberFlowInfo.getDownId(), subscriberFlowInfo.getDownOltId(),
+                        subscriberFlowInfo.getTagInfo(), macAddress);
         flowObjectiveService.forward(subscriberFlowInfo.getDevId(), downFwd.add(new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
@@ -1106,20 +1184,23 @@
                 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
             } else {
                 log.debug("Upstream and downstream data plane flows are installed successfully on {}", uniPort);
+                Optional<String> upstreamOltBw = tagInfo.getUpstreamOltBandwidthProfile() == null ?
+                        Optional.empty() : Optional.of(tagInfo.getUpstreamOltBandwidthProfile());
                 oltFlowService.processEapolFilteringObjectives(uniPort, tagInfo.getUpstreamBandwidthProfile(),
-                                                               null, tagInfo.getPonCTag(), true);
-
+                                                               upstreamOltBw, null,
+                        tagInfo.getPonCTag(), true);
 
                 if (!tagInfo.getEnableMacLearning()) {
                     oltFlowService.processDhcpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
-                                                                  tagInfo, true, true, Optional.empty());
+                            subscriberFlowInfo.getUpOltId(), tagInfo, true, true, Optional.empty());
                 }
 
                 oltFlowService.processIgmpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
-                                                              tagInfo, true, true);
+                        subscriberFlowInfo.getUpOltId(), tagInfo, true, true);
 
                 oltFlowService.processPPPoEDFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
-                                                                tagInfo, true, true);
+                        subscriberFlowInfo.getUpOltId(), tagInfo, true, true);
+
                 updateProgrammedSubscriber(uniPort, tagInfo, true);
             }
             post(new AccessDeviceEvent(type, subscriberFlowInfo.getDevId(), uniPort.port(),
@@ -1228,8 +1309,8 @@
                     AccessDevicePort port = new AccessDevicePort(p, AccessDevicePort.Type.UNI);
                     if (!programmedSubs.containsKey(new ConnectPoint(dev.id(), p.number()))) {
                         log.info("Creating Eapol on {}", port);
-                        oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
-                                                                       VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
+                        oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
+                                null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
                     } else {
                         log.debug("Subscriber Eapol on {} is already provisioned, not installing default", port);
                     }
@@ -1465,7 +1546,7 @@
 
                             if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
                                 log.info("eapol will be sent for port added {}", port);
-                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId,
+                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
                                                                                null,
                                                                                VlanId.vlanId(EAPOL_DEFAULT_VLAN),
                                                                                true);
@@ -1487,9 +1568,8 @@
                             if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
                                 log.info("No subscriber provisioned on port {} in PORT_REMOVED event, " +
                                                  "removing default EAPOL flow", port);
-                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
-                                                                               VlanId.vlanId(EAPOL_DEFAULT_VLAN),
-                                                                               false);
+                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
+                                        null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
                             } else {
                                 removeSubscriber(port);
                             }
@@ -1516,17 +1596,17 @@
                             if (!port.number().equals(PortNumber.LOCAL)) {
                                 log.info("eapol will be {} updated for {} with default vlan {}",
                                          (port.isEnabled()) ? "added" : "removed", port, EAPOL_DEFAULT_VLAN);
-                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
-                                                                               VlanId.vlanId(EAPOL_DEFAULT_VLAN),
-                                                                               port.isEnabled());
+                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
+                                        null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), port.isEnabled());
                             }
                         } else {
                             log.info("eapol will be {} updated for {}", (port.isEnabled()) ? "added" : "removed",
                                      port);
                             for (UniTagInformation uniTag : uniTagInformationSet) {
                                 oltFlowService.processEapolFilteringObjectives(port,
-                                        uniTag.getUpstreamBandwidthProfile(), null,
-                                        uniTag.getPonCTag(), port.isEnabled());
+                                        uniTag.getUpstreamBandwidthProfile(),
+                                        Optional.of(uniTag.getUpstreamOltBandwidthProfile()),
+                                        null, uniTag.getPonCTag(), port.isEnabled());
                             }
                         }
                         if (port.isEnabled()) {