Both the Upstream and Downstream Bandwidth Profiles can be 'named' and Referenced from a Subscriber Record in the SADIS DB on ONOS.

When the vOLT Application uses a Subscriber Record for OpenFlow message generation to VOLTHA, the vOLT Application must read the Bandwidth Profile name and be able to retrieve the Profile from a database of Bandwidth Profiles. The named Profile is converted to a Meter and Meter Band(s) and configured on the VOLTHA instance, and the Flows will reference the Meter.

Note: the code should follow the SADIS DB model, which is designed to reference an external database while allowing the DB records to be cached locally.

The Upstream Bandwidth Profile will consist of the following optional components:

EIR (Bits/Sec)

EBS (Bytes)

CIR (Bits/Sec)

CBS (Bytes)

AIR (Bits/Sec)

The Downstream Bandwidth Profile will consist of the following optional components:

EIR (Bits/Sec)

EBS (Bytes)

CIR (Bits/Sec)

CBS (Bytes)

Each bandwidth component will be interpreted as defined in the Technology Profile Whitepaper (vOLTHA_Access_Tech_AugmentationV0.6.pdf).

Change-Id: Ie4edf4e9f27e9b5b9a84e6c733dc4f283a9996a7
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index e2fdc55..a611723 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -80,15 +80,27 @@
 import org.onosproject.store.service.ConsistentMultimap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.net.meter.Band;
+import org.onosproject.net.meter.DefaultBand;
+import org.onosproject.net.meter.DefaultMeterRequest;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterService;
+import org.onosproject.net.meter.MeterListener;
+import org.onosproject.net.meter.MeterRequest;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterEvent;
 import org.opencord.olt.AccessDeviceEvent;
 import org.opencord.olt.AccessDeviceListener;
 import org.opencord.olt.AccessDeviceService;
 import org.opencord.olt.AccessSubscriberId;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
-import org.opencord.sadis.SubscriberAndDeviceInformationService;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+import java.util.HashMap;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -124,7 +136,10 @@
     protected ComponentConfigService componentConfigService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected SubscriberAndDeviceInformationService subsService;
+    protected SadisService sadisService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MeterService meterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
@@ -149,13 +164,23 @@
             label = "Create IGMP Flow rules when a subscriber is provisioned")
     protected boolean enableIgmpOnProvisioning = false;
 
+    // needed because no implementation for meter statistics in Voltha yet
+    @Property(name = "deleteMeters", boolValue = false,
+            label = "Deleting Meters based on flow count statistics")
+    protected boolean deleteMeters = false;
+
     private final DeviceListener deviceListener = new InternalDeviceListener();
+    private final MeterListener meterListener = new InternalMeterListener();
 
     private ApplicationId appId;
+    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+    private BaseInformationService<BandwidthProfileInformation> bpService;
 
-    private ExecutorService oltInstallers = Executors
-            .newFixedThreadPool(4, groupedThreads("onos/olt-service",
-                                                  "olt-installer-%d"));
+    private Map<String, MeterId> bpInfoToMeter = new HashMap<>();
+
+    private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
+            groupedThreads("onos/olt-service",
+                    "olt-installer-%d"));
 
     private ConsistentMultimap<ConnectPoint, Map.Entry<VlanId, VlanId>> additionalVlans;
 
@@ -173,6 +198,9 @@
 
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
+        subsService = sadisService.getSubscriberInfoService();
+        bpService = sadisService.getBandwidthProfileService();
+
         // look for all provisioned devices in Sadis and create EAPOL flows for the
         // UNI ports
         Iterable<Device> devices = deviceService.getDevices();
@@ -187,6 +215,7 @@
                 .build();
 
         deviceService.addListener(deviceListener);
+        meterService.addListener(meterListener);
 
         log.info("Started with Application ID {}", appId.id());
     }
@@ -195,6 +224,7 @@
     public void deactivate() {
         componentConfigService.unregisterProperties(getClass(), false);
         deviceService.removeListener(deviceListener);
+        meterService.removeListener(meterListener);
         eventDispatcher.removeSink(AccessDeviceEvent.class);
         log.info("Stopped");
     }
@@ -230,6 +260,11 @@
             log.info("DHCP Settings [enableDhcpOnProvisioning: {}, enableDhcpV4: {}, enableDhcpV6: {}]",
                      enableDhcpOnProvisioning, enableDhcpV4, enableDhcpV6);
 
+            Boolean d = Tools.isPropertyEnabled(properties, "deleteMeters");
+            if (d != null) {
+                deleteMeters = d;
+            }
+
         } catch (Exception e) {
             defaultVlan = DEFAULT_VLAN;
         }
@@ -255,12 +290,11 @@
 
         if (enableDhcpOnProvisioning) {
             processDhcpFilteringObjectives(port.deviceId(), port.port(), true,
-                                           true);
+                    true);
         }
         log.info("Programming vlans for subscriber: {}", sub);
         Optional<VlanId> defaultVlan = Optional.empty();
-        provisionVlans(port.deviceId(), uplinkPort.number(), port.port(),
-                       sub.cTag(), sub.sTag(), defaultVlan);
+        provisionVlans(port, uplinkPort.number(), defaultVlan, sub);
 
         if (enableIgmpOnProvisioning) {
             processIgmpFilteringObjectives(port.deviceId(), port.port(), true);
@@ -289,13 +323,12 @@
 
         if (enableDhcpOnProvisioning) {
             processDhcpFilteringObjectives(port.deviceId(), port.port(), false,
-                                           true);
+                    true);
         }
 
         log.info("Removing programmed vlans for subscriber: {}", subscriber);
         Optional<VlanId> defaultVlan = Optional.empty();
-        unprovisionSubscriber(port.deviceId(), uplinkPort.number(), port.port(),
-                              subscriber.cTag(), subscriber.sTag(), defaultVlan);
+        unprovisionSubscriber(port.deviceId(), uplinkPort.number(), port.port(), subscriber, defaultVlan);
 
         if (enableIgmpOnProvisioning) {
             processIgmpFilteringObjectives(port.deviceId(), port.port(), false);
@@ -438,19 +471,32 @@
         return null;
     }
 
+    private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
+        if (bandwidthProfile == null) {
+            return null;
+        }
+        return bpService.get(bandwidthProfile);
+    }
+
     private void unprovisionSubscriber(DeviceId deviceId, PortNumber uplink,
-                                       PortNumber subscriberPort, VlanId cVlan,
-                                       VlanId sVlan, Optional<VlanId> defaultVlan) {
+                                       PortNumber subscriberPort, SubscriberAndDeviceInformation subscriber,
+                                       Optional<VlanId> defaultVlan) {
 
         CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
         CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
 
+        VlanId deviceVlan = subscriber.sTag();
+        VlanId subscriberVlan = subscriber.cTag();
+
+        MeterId upstreamMeterId = bpInfoToMeter.get(subscriber.upstreamBandwidthProfile());
+        MeterId downstreamMeterId = bpInfoToMeter.get(subscriber.downstreamBandwidthProfile());
+
         ForwardingObjective.Builder upFwd = upBuilder(uplink, subscriberPort,
-                                                      cVlan, sVlan,
-                                                      defaultVlan);
+                subscriberVlan, deviceVlan,
+                defaultVlan, upstreamMeterId, subscriber.technologyProfileId());
         ForwardingObjective.Builder downFwd = downBuilder(uplink, subscriberPort,
-                                                          cVlan, sVlan,
-                                                          defaultVlan);
+                subscriberVlan, deviceVlan,
+                defaultVlan, downstreamMeterId, subscriber.technologyProfileId());
 
         flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
             @Override
@@ -479,38 +525,49 @@
         upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
             if (upStatus == null && downStatus == null) {
                 post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNREGISTERED,
-                                           deviceId,
-                                           sVlan,
-                                           cVlan));
+                        deviceId,
+                        deviceVlan,
+                        subscriberVlan));
             } else if (downStatus != null) {
                 log.error("Subscriber with vlan {} on device {} " +
-                                  "on port {} failed downstream uninstallation: {}",
-                          cVlan, deviceId, subscriberPort, downStatus);
+                                "on port {} failed downstream uninstallation: {}",
+                        subscriberVlan, deviceId, subscriberPort, downStatus);
             } else if (upStatus != null) {
                 log.error("Subscriber with vlan {} on device {} " +
-                                  "on port {} failed upstream uninstallation: {}",
-                          cVlan, deviceId, subscriberPort, upStatus);
+                                "on port {} failed upstream uninstallation: {}",
+                        subscriberVlan, deviceId, subscriberPort, upStatus);
             }
         }, oltInstallers);
 
     }
 
-    private void provisionVlans(DeviceId deviceId, PortNumber uplinkPort,
-                                PortNumber subscriberPort,
-                                VlanId subscriberVlan, VlanId deviceVlan,
-                                Optional<VlanId> defaultVlan) {
+    private void provisionVlans(ConnectPoint port, PortNumber uplinkPort, Optional<VlanId> defaultVlan,
+                                SubscriberAndDeviceInformation sub) {
+
+        log.info("Provisioning vlans...");
+
+        DeviceId deviceId = port.deviceId();
+        PortNumber subscriberPort = port.port();
+        VlanId deviceVlan = sub.sTag();
+        VlanId subscriberVlan = sub.cTag();
+        int techProfId = sub.technologyProfileId();
+
+        BandwidthProfileInformation upstreamBpInfo = getBandwidthProfileInformation(sub.upstreamBandwidthProfile());
+        BandwidthProfileInformation downstreamBpInfo = getBandwidthProfileInformation(sub.downstreamBandwidthProfile());
+
+        MeterId usptreamMeterId = createMeter(deviceId, upstreamBpInfo);
+        MeterId downstreamMeterId = createMeter(deviceId, downstreamBpInfo);
 
         CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
         CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
 
         ForwardingObjective.Builder upFwd = upBuilder(uplinkPort, subscriberPort,
-                                                      subscriberVlan, deviceVlan,
-                                                      defaultVlan);
-
+                subscriberVlan, deviceVlan,
+                defaultVlan, usptreamMeterId, techProfId);
 
         ForwardingObjective.Builder downFwd = downBuilder(uplinkPort, subscriberPort,
-                                                          subscriberVlan, deviceVlan,
-                                                          defaultVlan);
+                subscriberVlan, deviceVlan,
+                defaultVlan, downstreamMeterId, techProfId);
 
         flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
             @Override
@@ -539,39 +596,97 @@
         upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
             if (upStatus == null && downStatus == null) {
                 post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_REGISTERED,
-                                           deviceId,
-                                           deviceVlan,
-                                           subscriberVlan));
+                        deviceId,
+                        deviceVlan,
+                        subscriberVlan));
 
             } else if (downStatus != null) {
                 log.error("Subscriber with vlan {} on device {} " +
-                                  "on port {} failed downstream installation: {}",
-                          subscriberVlan, deviceId, subscriberPort, downStatus);
+                                "on port {} failed downstream installation: {}",
+                        subscriberVlan, deviceId, subscriberPort, downStatus);
             } else if (upStatus != null) {
                 log.error("Subscriber with vlan {} on device {} " +
-                                  "on port {} failed upstream installation: {}",
-                          subscriberVlan, deviceId, subscriberPort, upStatus);
+                                "on port {} failed upstream installation: {}",
+                        subscriberVlan, deviceId, subscriberPort, upStatus);
             }
         }, oltInstallers);
 
     }
 
+    private MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo) {
+
+        if (bpInfo == null) {
+            log.warn("Bandwidth profile information is not found");
+            return null;
+        }
+
+        MeterId meterId = bpInfoToMeter.get(bpInfo.id());
+        if (meterId != null) {
+            log.info("Meter is already added. MeterId {}", meterId);
+            return meterId;
+        }
+
+        List<Band> meterBands = createMeterBands(bpInfo);
+
+        MeterRequest meterRequest = DefaultMeterRequest.builder()
+                .withBands(meterBands)
+                .withUnit(Meter.Unit.KB_PER_SEC)
+                .forDevice(deviceId)
+                .fromApp(appId)
+                .burst()
+                .add();
+
+        Meter meter = meterService.submit(meterRequest);
+        bpInfoToMeter.put(bpInfo.id(), meter.id());
+        log.info("Meter is created. Meter Id {}", meter.id());
+        return meter.id();
+    }
+
+    private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
+        List<Band> meterBands = new ArrayList<>();
+
+        meterBands.add(createMeterBand(bpInfo.committedInformationRate(), bpInfo.committedBurstSize()));
+        meterBands.add(createMeterBand(bpInfo.exceededInformationRate(), bpInfo.exceededBurstSize()));
+
+        if (bpInfo.assuredInformationRate() != 0) {
+            meterBands.add(createMeterBand(bpInfo.assuredInformationRate(), 0L));
+        }
+        return meterBands;
+    }
+
+    private Band createMeterBand(long rate, Long burst) {
+        return DefaultBand.builder()
+                .withRate(rate) //already Kbps
+                .burstSize(burst) // already Kbits
+                .ofType(Band.Type.DROP) // no matter
+                .build();
+    }
+
     private ForwardingObjective.Builder downBuilder(PortNumber uplinkPort,
                                                     PortNumber subscriberPort,
                                                     VlanId subscriberVlan,
                                                     VlanId deviceVlan,
-                                                    Optional<VlanId> defaultVlan) {
+                                                    Optional<VlanId> defaultVlan,
+                                                    MeterId meterId,
+                                                    int techProfId) {
         TrafficSelector downstream = DefaultTrafficSelector.builder()
                 .matchVlanId(deviceVlan)
                 .matchInPort(uplinkPort)
                 .matchInnerVlanId(subscriberVlan)
                 .build();
 
-        TrafficTreatment downstreamTreatment = DefaultTrafficTreatment.builder()
+        TrafficTreatment.Builder downstreamTreatmentBuilder = DefaultTrafficTreatment.builder()
                 .popVlan()
                 .setVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
-                .setOutput(subscriberPort)
-                .build();
+                .setOutput(subscriberPort);
+
+        if (meterId != null) {
+            downstreamTreatmentBuilder.meter(meterId);
+        }
+
+        if (techProfId != -1) {
+            downstreamTreatmentBuilder.transition(techProfId);
+        }
 
         return DefaultForwardingObjective.builder()
                 .withFlag(ForwardingObjective.Flag.VERSATILE)
@@ -579,27 +694,45 @@
                 .makePermanent()
                 .withSelector(downstream)
                 .fromApp(appId)
-                .withTreatment(downstreamTreatment);
+                .withTreatment(downstreamTreatmentBuilder.build());
     }
 
     private ForwardingObjective.Builder upBuilder(PortNumber uplinkPort,
                                                   PortNumber subscriberPort,
                                                   VlanId subscriberVlan,
                                                   VlanId deviceVlan,
-                                                  Optional<VlanId> defaultVlan) {
+                                                  Optional<VlanId> defaultVlan,
+                                                  MeterId meterId,
+                                                  int technologyProfileId) {
+
+
+        VlanId dVlan = defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan));
+
+        if (subscriberVlan.toShort() == 4096) {
+            dVlan = subscriberVlan;
+        }
+
         TrafficSelector upstream = DefaultTrafficSelector.builder()
-                .matchVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
+                .matchVlanId(dVlan)
                 .matchInPort(subscriberPort)
                 .build();
 
 
-        TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder()
+        TrafficTreatment.Builder upstreamTreatmentBuilder = DefaultTrafficTreatment.builder()
                 .pushVlan()
                 .setVlanId(subscriberVlan)
                 .pushVlan()
                 .setVlanId(deviceVlan)
-                .setOutput(uplinkPort)
-                .build();
+                .setOutput(uplinkPort);
+
+        if (meterId != null) {
+            upstreamTreatmentBuilder.meter(meterId);
+        }
+
+        if (technologyProfileId != -1) {
+            upstreamTreatmentBuilder.transition(technologyProfileId);
+
+        }
 
         return DefaultForwardingObjective.builder()
                 .withFlag(ForwardingObjective.Flag.VERSATILE)
@@ -607,9 +740,8 @@
                 .makePermanent()
                 .withSelector(upstream)
                 .fromApp(appId)
-                .withTreatment(upstreamTreatment);
+                .withTreatment(upstreamTreatmentBuilder.build());
     }
-
     private void provisionTransparentFlows(DeviceId deviceId, PortNumber uplinkPort,
                                            PortNumber subscriberPort,
                                            VlanId innerVlan,
@@ -656,11 +788,11 @@
         upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
             if (downStatus != null) {
                 log.error("Flow with innervlan {} and outerVlan {} on device {} " +
-                        "on port {} failed downstream installation: {}",
+                                "on port {} failed downstream installation: {}",
                         innerVlan, outerVlan, deviceId, subscriberPort, downStatus);
             } else if (upStatus != null) {
                 log.error("Flow with innerVlan {} and outerVlan {} on device {} " +
-                        "on port {} failed upstream installation: {}",
+                                "on port {} failed upstream installation: {}",
                         innerVlan, outerVlan, deviceId, subscriberPort, upStatus);
             }
         }, oltInstallers);
@@ -757,11 +889,11 @@
         upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
             if (downStatus != null) {
                 log.error("Flow with innerVlan {} and outerVlan {} on device {} " +
-                        "on port {} failed downstream uninstallation: {}",
+                                "on port {} failed downstream uninstallation: {}",
                         innerVlan, outerVlan, deviceId, subscriberPort, downStatus);
             } else if (upStatus != null) {
                 log.error("Flow with innerVlan {} and outerVlan {} on device {} " +
-                        "on port {} failed upstream uninstallation: {}",
+                                "on port {} failed upstream uninstallation: {}",
                         innerVlan, outerVlan, deviceId, subscriberPort, upStatus);
             }
         }, oltInstallers);
@@ -778,21 +910,21 @@
                 .withKey(Criteria.matchInPort(port))
                 .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
                 .withMeta(DefaultTrafficTreatment.builder()
-                                  .setOutput(PortNumber.CONTROLLER).build())
+                        .setOutput(PortNumber.CONTROLLER).build())
                 .fromApp(appId)
                 .withPriority(10000)
                 .add(new ObjectiveContext() {
                     @Override
                     public void onSuccess(Objective objective) {
                         log.info("Eapol filter for {} on {} {}.",
-                                 devId, port, (install) ? "installed" : "removed");
+                                devId, port, (install) ? "installed" : "removed");
                     }
 
                     @Override
                     public void onError(Objective objective, ObjectiveError error) {
                         log.info("Eapol filter for {} on {} failed {} because {}",
-                                 devId, port, (install) ? "installation" : "removal",
-                                 error);
+                                devId, port, (install) ? "installation" : "removal",
+                                error);
                     }
                 });
 
@@ -901,7 +1033,7 @@
                 .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(udpSrc)))
                 .addCondition(Criteria.matchUdpDst(TpPort.tpPort(udpDst)))
                 .withMeta(DefaultTrafficTreatment.builder()
-                                  .setOutput(PortNumber.CONTROLLER).build())
+                        .setOutput(PortNumber.CONTROLLER).build())
                 .fromApp(appId)
                 .withPriority(10000)
                 .add(new ObjectiveContext() {
@@ -929,7 +1061,7 @@
             return;
         }
 
-       DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+        DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
 
         builder = install ? builder.permit() : builder.deny();
 
@@ -938,21 +1070,21 @@
                 .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
                 .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
                 .withMeta(DefaultTrafficTreatment.builder()
-                                  .setOutput(PortNumber.CONTROLLER).build())
+                        .setOutput(PortNumber.CONTROLLER).build())
                 .fromApp(appId)
                 .withPriority(10000)
                 .add(new ObjectiveContext() {
                     @Override
                     public void onSuccess(Objective objective) {
                         log.info("Igmp filter for {} on {} {}.",
-                                 devId, port, (install) ? "installed" : "removed");
+                                devId, port, (install) ? "installed" : "removed");
                     }
 
                     @Override
                     public void onError(Objective objective, ObjectiveError error) {
                         log.info("Igmp filter for {} on {} failed {} because {}.",
-                                 devId, port, (install) ? "installation" : "removal",
-                                 error);
+                                devId, port, (install) ? "installation" : "removal",
+                                error);
                     }
                 });
 
@@ -968,11 +1100,10 @@
     private void checkAndCreateDeviceFlows(Device dev) {
         // we create only for the ones we are master of
         if (!mastershipService.isLocalMaster(dev.id())) {
-                return;
+            return;
         }
         // check if this device is provisioned in Sadis
-        String devSerialNo = dev.serialNumber();
-        SubscriberAndDeviceInformation deviceInfo = subsService.get(devSerialNo);
+        SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
         log.debug("checkAndCreateDeviceFlows: deviceInfo {}", deviceInfo);
 
         if (deviceInfo != null) {
@@ -999,8 +1130,7 @@
      */
     private Port getUplinkPort(Device dev) {
         // check if this device is provisioned in Sadis
-        String devSerialNo = dev.serialNumber();
-        SubscriberAndDeviceInformation deviceInfo = subsService.get(devSerialNo);
+        SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
         log.debug("getUplinkPort: deviceInfo {}", deviceInfo);
         if (deviceInfo == null) {
             log.warn("Device {} is not configured in SADIS .. cannot fetch device"
@@ -1042,8 +1172,7 @@
 
     private SubscriberAndDeviceInformation getOltInfo(Device dev) {
         String devSerialNo = dev.serialNumber();
-        SubscriberAndDeviceInformation deviceInfo = subsService.get(devSerialNo);
-        return deviceInfo;
+        return subsService.get(devSerialNo);
     }
 
     private class InternalDeviceListener implements DeviceListener {
@@ -1147,4 +1276,45 @@
             });
         }
     }
-}
+
+    private class InternalMeterListener implements MeterListener {
+
+        @Override
+        public void event(MeterEvent meterEvent) {
+            if (deleteMeters && MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
+                log.debug("Zero Count Meter Event is received. Meter is {}", meterEvent.subject());
+                Meter meter = meterEvent.subject();
+                if (meter != null && appId.equals(meter.appId())) {
+                    deleteMeter(meter.deviceId(), meter.id());
+                }
+            } else if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
+                log.debug("Meter removed event is received. Meter is {}", meterEvent.subject());
+                removeMeterFromBpMap(meterEvent.subject());
+            }
+        }
+
+        private void deleteMeter(DeviceId deviceId, MeterId meterId) {
+            Meter meter = meterService.getMeter(deviceId, meterId);
+            MeterRequest meterRequest = DefaultMeterRequest.builder()
+                    .withBands(meter.bands())
+                    .withUnit(meter.unit())
+                    .forDevice(deviceId)
+                    .fromApp(appId)
+                    .burst()
+                    .remove();
+
+            meterService.withdraw(meterRequest, meterId);
+
+        }
+
+        private void removeMeterFromBpMap(Meter meter) {
+            for (Map.Entry<String, MeterId> entry : bpInfoToMeter.entrySet()) {
+                if (entry.getValue().equals(meter.id())) {
+                    bpInfoToMeter.remove(entry.getKey());
+                    log.info("Deleted from the internal map. Profile {} and Meter {}", entry.getKey(), meter.id());
+                    break;
+                }
+            }
+        }
+    }
+}
\ No newline at end of file