[SEBA-144] Operational Status DHCP L2 Relay.

Cherry-picked from dhcpl2relay-1.6

Change-Id: I58aab083152793af36dfd34fa91cd78385fd2ed7
Signed-off-by: Marcos Aurelio Carrero <mcarrero@furukawalatam.com>
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index c7e4913..411d763 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -19,6 +19,31 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
+import static org.onlab.packet.MacAddress.valueOf;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.HexDump;
 import org.onlab.packet.DHCP;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IPv4;
@@ -28,6 +53,7 @@
 import org.onlab.packet.UDP;
 import org.onlab.packet.VlanId;
 import org.onlab.packet.dhcp.DhcpOption;
+import org.onlab.util.SafeRecurringTask;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
@@ -80,24 +106,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Dictionary;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
-import static org.onlab.packet.MacAddress.valueOf;
-import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.*;
 
 /**
  * DHCP Relay Agent Application Component.
@@ -106,6 +115,8 @@
 property = {
         OPTION_82 + ":Boolean=" + OPTION_82_DEFAULT,
         ENABLE_DHCP_BROADCAST_REPLIES + ":Boolean=" + ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT,
+        PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
+        DHCP_COUNTERS_TOPIC + ":String=" + DHCP_COUNTERS_TOPIC_DEFAULT
 })
 public class DhcpL2Relay
         extends AbstractListenerManager<DhcpL2RelayEvent, DhcpL2RelayListener>
@@ -156,12 +167,24 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected FlowObjectiveService flowObjectiveService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DhcpL2RelayCountersStore dhcpL2RelayCounters;
+
+    // OSGi Properties
     /** Add option 82 to relayed packets. */
     protected boolean option82 = OPTION_82_DEFAULT;
-
     /** Ask the DHCP Server to send back replies as L2 broadcast. */
     protected boolean enableDhcpBroadcastReplies = ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
 
+    protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
+    private String dhcpCountersTopic = DHCP_COUNTERS_TOPIC_DEFAULT;
+
+
+    protected PublishCountersToKafka publishCountersToKafka;
+
+    ScheduledFuture<?> refreshTask;
+    ScheduledExecutorService refreshService = Executors.newSingleThreadScheduledExecutor();
+
     private DhcpRelayPacketProcessor dhcpRelayPacketProcessor =
             new DhcpRelayPacketProcessor();
 
@@ -170,14 +193,14 @@
 
     // connect points to the DHCP server
     Set<ConnectPoint> dhcpConnectPoints;
-    private AtomicReference<ConnectPoint> dhcpServerConnectPoint = new AtomicReference<>();
+    protected AtomicReference<ConnectPoint> dhcpServerConnectPoint = new AtomicReference<>();
     private MacAddress dhcpConnectMac = MacAddress.BROADCAST;
     private ApplicationId appId;
 
     static Map<String, DhcpAllocationInfo> allocationMap = Maps.newConcurrentMap();
-    private boolean modifyClientPktsSrcDstMac = false;
+    protected boolean modifyClientPktsSrcDstMac = false;
     //Whether to use the uplink port of the OLTs to send/receive messages to the DHCP server
-    private boolean useOltUplink = false;
+    protected boolean useOltUplink = false;
 
     private BaseInformationService<SubscriberAndDeviceInformation> subsService;
 
@@ -207,12 +230,21 @@
             modified(context);
         }
 
+        publishCountersToKafka = new PublishCountersToKafka();
+        subsService = sadisService.getSubscriberInfoService();
+        restartPublishCountersTask();
 
         log.info("DHCP-L2-RELAY Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        if (refreshTask != null) {
+            refreshTask.cancel(true);
+        }
+        if (refreshService != null) {
+            refreshService.shutdownNow();
+        }
         cfgService.removeListener(cfgListener);
         factories.forEach(cfgService::unregisterConfigFactory);
         packetService.removeProcessor(dhcpRelayPacketProcessor);
@@ -239,6 +271,62 @@
         if (o != null) {
             enableDhcpBroadcastReplies = o;
         }
+
+        Integer newPublishCountersRate = getIntegerProperty(properties, "publishCountersRate");
+        if (newPublishCountersRate != null) {
+            if (newPublishCountersRate != publishCountersRate && newPublishCountersRate >= 0) {
+                log.info("publishCountersRate modified from {} to {}", publishCountersRate, newPublishCountersRate);
+                publishCountersRate = newPublishCountersRate;
+            } else if (newPublishCountersRate < 0) {
+                log.error("Invalid newPublishCountersRate : {}, defaulting to 0", newPublishCountersRate);
+                publishCountersRate = 0;
+            }
+            restartPublishCountersTask();
+        }
+
+        String newDhcpCountersTopic = get(properties, "dhcpCountersTopic");
+        if (newDhcpCountersTopic != null && !newDhcpCountersTopic.equals(dhcpCountersTopic)) {
+            log.info("Property dhcpCountersTopic modified from {} to {}", dhcpCountersTopic, newDhcpCountersTopic);
+            dhcpCountersTopic = newDhcpCountersTopic;
+        }
+    }
+
+    /**
+     * Cancels any scheduled counter-publishing task and reschedules it when publishCountersRate is positive.
+     */
+    private void restartPublishCountersTask() {
+        if (refreshTask != null) {
+            refreshTask.cancel(true);
+        }
+        if (publishCountersRate <= 0) {
+            log.info("Refresh Rate set to 0, disabling calls to publishCountersToKafka");
+            return;
+        }
+        log.info("Refresh Rate set to {}, publishCountersToKafka will be called every {} seconds",
+                publishCountersRate, publishCountersRate);
+        refreshTask = refreshService.scheduleWithFixedDelay(SafeRecurringTask.wrap(publishCountersToKafka),
+                publishCountersRate, publishCountersRate, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Posts a STATS_UPDATE event on dhcpCountersTopic for every entry of the counters store.
+     */
+    private class PublishCountersToKafka implements Runnable {
+        @Override
+        public void run() {
+            dhcpL2RelayCounters.getCountersMap().forEach((counterKey, counterValue) -> {
+                AbstractMap.SimpleEntry<String, AtomicLong> counter =
+                        new AbstractMap.SimpleEntry<>(counterKey.counterTypeKey.toString(), counterValue);
+                if (counterKey.counterClassKey.equals(DhcpL2RelayEvent.GLOBAL_COUNTER)) { // global counters
+                    post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, null, null,
+                            counter, dhcpCountersTopic, null));
+                } else { // per-subscriber counters; the class key is also the allocationMap key
+                    DhcpAllocationInfo info = allocationMap.get(counterKey.counterClassKey);
+                    post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, info, null,
+                            counter, dhcpCountersTopic, counterKey.counterClassKey));
+                }
+            });
+        }
     }
 
     /**
@@ -246,7 +334,7 @@
      *
      * @return true if all information we need have been initialized
      */
-    private boolean configured() {
+    protected boolean configured() {
         if (!useOltUplink) {
             return dhcpServerConnectPoint.get() != null;
         }
@@ -606,6 +694,9 @@
                               toSendTo, packet);
                 }
                 packetService.emit(o);
+
+                SubscriberAndDeviceInformation entry = getSubscriberInfoFromClient(context);
+                updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_TO_SERVER"));
             } else {
                 log.error("No connect point to send msg to DHCP Server");
             }
@@ -623,6 +714,44 @@
             return null;
         }
 
+        private void updateDhcpRelayCountersStore(SubscriberAndDeviceInformation entry,
+                                                  DhcpL2RelayCounters counterType) {
+            // The global counter is incremented for every call, subscriber known or not
+            dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
+            if (entry == null) {
+                log.warn("Counter not updated as subscriber info not found.");
+                return;
+            }
+            // The per-subscriber counter is keyed by the subscriber id
+            dhcpL2RelayCounters.incrementCounter(entry.id(), counterType);
+        }
+
+        /*
+         * Returns the subscriber resolved by getSubscriber(context), or null when context is null.
+         */
+        private SubscriberAndDeviceInformation getSubscriberInfoFromClient(PacketContext context) {
+            if (context == null) {
+                return null;
+            }
+            return getSubscriber(context);
+        }
+
+        /*
+         * Looks up the subscriber whose client hardware address appears in a DHCP payload; null if unresolved.
+         */
+        private SubscriberAndDeviceInformation getSubscriberInfoFromServer(DHCP dhcpPayload) {
+            if (dhcpPayload == null) {
+                return null;
+            }
+            // Resolve the client's connect point from the hardware address carried in the payload
+            MacAddress clientMac = valueOf(dhcpPayload.getClientHardwareAddress());
+            ConnectPoint clientCp = getConnectPointOfClient(clientMac);
+            if (clientCp == null) {
+                return null;
+            }
+            return subsService.get(nasPortId(clientCp));
+        }
+
         //process the dhcp packet before sending to server
         private void processDhcpPacket(PacketContext context, Ethernet packet,
                                        DHCP dhcpPayload) {
@@ -632,6 +761,18 @@
             }
 
             DHCP.MsgType incomingPacketType = getDhcpPacketType(dhcpPayload);
+            if (incomingPacketType == null) {
+                log.warn("DHCP packet type not found. Dump of ethernet pkt in hex format for troubleshooting.");
+                byte[] array = packet.serialize();
+                ByteArrayOutputStream buf = new ByteArrayOutputStream();
+                try {
+                    HexDump.dump(array, 0, buf, 0);
+                    log.trace(buf.toString());
+                } catch (Exception e) { }
+                return;
+            }
+
+            SubscriberAndDeviceInformation entry = null;
 
             log.info("Received DHCP Packet of type {} from {}",
                      incomingPacketType, context.inPacket().receivedFrom());
@@ -643,6 +784,8 @@
                     if (ethernetPacketDiscover != null) {
                         forwardPacket(ethernetPacketDiscover, context);
                     }
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"));
                     break;
                 case DHCPOFFER:
                     //reply to dhcp client.
@@ -651,6 +794,8 @@
                     if (ethernetPacketOffer != null) {
                         sendReply(ethernetPacketOffer, dhcpPayload);
                     }
+                    entry = getSubscriberInfoFromServer(dhcpPayload);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPOFFER"));
                     break;
                 case DHCPREQUEST:
                     Ethernet ethernetPacketRequest =
@@ -658,6 +803,8 @@
                     if (ethernetPacketRequest != null) {
                         forwardPacket(ethernetPacketRequest, context);
                     }
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPREQUEST"));
                     break;
                 case DHCPACK:
                     //reply to dhcp client.
@@ -666,6 +813,20 @@
                     if (ethernetPacketAck != null) {
                         sendReply(ethernetPacketAck, dhcpPayload);
                     }
+                    entry = getSubscriberInfoFromServer(dhcpPayload);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPACK"));
+                    break;
+                case DHCPDECLINE:
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDECLINE"));
+                    break;
+                case DHCPNAK:
+                    entry = getSubscriberInfoFromServer(dhcpPayload);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPNACK"));
+                    break;
+                case DHCPRELEASE:
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPRELEASE"));
                     break;
                 default:
                     break;
@@ -709,7 +870,7 @@
                     context.inPacket().receivedFrom(), dhcpPacket.getPacketType(),
                     entry.nasPortId(), clientMac, clientIp);
 
-            allocationMap.put(entry.nasPortId(), info);
+            allocationMap.put(entry.id(), info);
 
             post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info,
                                       context.inPacket().receivedFrom()));
@@ -781,12 +942,15 @@
                     DhcpAllocationInfo info = new DhcpAllocationInfo(subsCp,
                             dhcpPayload.getPacketType(), circuitId, dstMac, ip);
 
-                    allocationMap.put(sub.nasPortId(), info);
+                    allocationMap.put(sub.id(), info);
 
                     post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info, subsCp));
                 }
             } // end storing of info
 
+            SubscriberAndDeviceInformation entry = getSubscriberInfoFromServer(dhcpPayload);
+            updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_FROM_SERVER"));
+
             // we leave the srcMac from the original packet
             etherReply.setDestinationMACAddress(dstMac);
             etherReply.setQinQVID(sTag(subsCp).toShort());