Remove OLT-specific Kafka integration.

This has been moved into its own app (kafka-onos repo)

Change-Id: Ie8f0c01bc2fe14561686848dc5b5bc22b9560837
diff --git a/kafka/pom.xml b/kafka/pom.xml
deleted file mode 100644
index 2fc1aef..0000000
--- a/kafka/pom.xml
+++ /dev/null
@@ -1,78 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Copyright 2018-present Open Networking Foundation
-  ~
-  ~ Licensed under the Apache License, Version 2.0 (the "License");
-  ~ you may not use this file except in compliance with the License.
-  ~ You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <groupId>org.opencord</groupId>
-        <artifactId>olt</artifactId>
-        <version>2.0.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>olt-kafka</artifactId>
-
-    <packaging>bundle</packaging>
-    <description>Kafka event module for OLT application</description>
-
-      <dependencies>
-        <dependency>
-            <groupId>org.opencord</groupId>
-            <artifactId>olt-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.servicemix.bundles</groupId>
-            <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
-            <version>1.1.0_1</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.onosproject</groupId>
-                <artifactId>onos-maven-plugin</artifactId>
-                <version>1.11</version>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-bundle-plugin</artifactId>
-                <extensions>true</extensions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-scr-plugin</artifactId>
-                <version>1.21.0</version>
-                <executions>
-                    <execution>
-                        <id>generate-scr-srcdescriptor</id>
-                        <goals>
-                            <goal>scr</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <supportedProjectTypes>
-                        <supportedProjectType>bundle</supportedProjectType>
-                    </supportedProjectTypes>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java b/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java
deleted file mode 100644
index 7ecbaf6..0000000
--- a/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.olt.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
-import org.onosproject.net.AnnotationKeys;
-import org.onosproject.net.Port;
-import org.onosproject.net.config.ConfigFactory;
-import org.onosproject.net.config.NetworkConfigEvent;
-import org.onosproject.net.config.NetworkConfigListener;
-import org.onosproject.net.config.NetworkConfigRegistry;
-import org.onosproject.net.config.basics.SubjectFactories;
-import org.opencord.olt.AccessDeviceEvent;
-import org.opencord.olt.AccessDeviceListener;
-import org.opencord.olt.AccessDeviceService;
-import org.slf4j.Logger;
-
-import java.time.Instant;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Sends access device events to an external system.
- */
-@Component(immediate = true)
-public class KafkaIntegration {
-
-    private final Logger log = getLogger(getClass());
-    private static final Class<OltKafkaConfig>
-            OLT_KAFKA_CONFIG_CLASS = OltKafkaConfig.class;
-
-    private static final String APP_NAME = "org.opencord.olt";
-    private ApplicationId appId;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected CoreService coreService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected NetworkConfigRegistry configRegistry;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected AccessDeviceService accessDeviceService;
-
-    private static StringSerializer stringSerializer = new StringSerializer();
-
-    private KafkaProducer<String, String> kafkaProducer;
-
-    private InternalNetworkConfigListener configListener =
-            new InternalNetworkConfigListener();
-    private InternalAccessDeviceListener listener =
-            new InternalAccessDeviceListener();
-
-    private final ExecutorService executor = newSingleThreadExecutor(
-            groupedThreads(this.getClass().getSimpleName(), "events", log));
-
-    private ConfigFactory<ApplicationId, OltKafkaConfig> kafkaConfigFactory =
-            new ConfigFactory<ApplicationId, OltKafkaConfig>(
-                    SubjectFactories.APP_SUBJECT_FACTORY, OLT_KAFKA_CONFIG_CLASS,
-                    "kafka") {
-                @Override
-                public OltKafkaConfig createConfig() {
-                    return new OltKafkaConfig();
-                }
-            };
-
-    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
-    private static final String RETRIES = "retries";
-    private static final String RECONNECT_BACKOFF = "reconnect.backoff.ms";
-    private static final String INFLIGHT_REQUESTS =
-            "max.in.flight.requests.per.connection";
-    private static final String ACKS = "acks";
-    private static final String KEY_SERIALIZER = "key.serializer";
-    private static final String VALUE_SERIALIZER = "value.serializer";
-    private static final String STRING_SERIALIZER =
-            stringSerializer.getClass().getCanonicalName();
-
-    private static final String ONU_TOPIC = "onu.events";
-
-    private static final String STATUS = "status";
-    private static final String SERIAL_NUMBER = "serial_number";
-    private static final String UNI_PORT_ID = "uni_port_id";
-    private static final String OF_DPID = "of_dpid";
-    private static final String ACTIVATED = "activated";
-    private static final String TIMESTAMP = "timestamp";
-
-    @Activate
-    public void activate() {
-        appId = coreService.registerApplication(APP_NAME);
-        configRegistry.registerConfigFactory(kafkaConfigFactory);
-        configRegistry.addListener(configListener);
-        accessDeviceService.addListener(listener);
-
-        configure();
-
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        accessDeviceService.removeListener(listener);
-        configRegistry.removeListener(configListener);
-        configRegistry.unregisterConfigFactory(kafkaConfigFactory);
-
-        executor.shutdownNow();
-
-        shutdownKafka();
-        log.info("Stopped");
-    }
-
-    private void configure() {
-        OltKafkaConfig config =
-                configRegistry.getConfig(appId, OLT_KAFKA_CONFIG_CLASS);
-        if (config == null) {
-            log.info("OLT Kafka config not found");
-            return;
-        }
-        configure(config);
-    }
-
-    private void configure(OltKafkaConfig config) {
-        checkNotNull(config);
-
-        Properties properties = new Properties();
-        properties.put(BOOTSTRAP_SERVERS, config.getBootstrapServers());
-        properties.put(RETRIES, config.getRetries());
-        properties.put(RECONNECT_BACKOFF, config.getReconnectBackoff());
-        properties.put(INFLIGHT_REQUESTS, config.getInflightRequests());
-        properties.put(ACKS, config.getAcks());
-        properties.put(KEY_SERIALIZER, STRING_SERIALIZER);
-        properties.put(VALUE_SERIALIZER, STRING_SERIALIZER);
-
-        startKafka(properties);
-    }
-
-    private void unconfigure() {
-        shutdownKafka();
-    }
-
-    private void startKafka(Properties properties) {
-        shutdownKafka();
-
-        // Kafka client doesn't play nice with the default OSGi classloader
-        // This workaround temporarily changes the thread's classloader so that
-        // the Kafka client can load the serializer classes.
-        ClassLoader original = Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-        try {
-            kafkaProducer = new KafkaProducer<>(properties);
-        } finally {
-            Thread.currentThread().setContextClassLoader(original);
-        }
-    }
-
-    private void shutdownKafka() {
-        if (kafkaProducer != null) {
-            kafkaProducer.close();
-            kafkaProducer = null;
-        }
-    }
-
-    private void sendUniAddEvent(AccessDeviceEvent event) {
-        if (kafkaProducer == null) {
-            return;
-        }
-
-        Port port = event.port().get();
-        String serialNumber = port.annotations().value(AnnotationKeys.PORT_NAME);
-
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode onuNode = mapper.createObjectNode();
-        onuNode.put(TIMESTAMP, Instant.now().toString());
-        onuNode.put(STATUS, ACTIVATED);
-        onuNode.put(SERIAL_NUMBER, serialNumber);
-        onuNode.put(UNI_PORT_ID, port.number().toLong());
-        onuNode.put(OF_DPID, port.element().id().toString());
-
-        if (log.isDebugEnabled()) {
-            log.debug("Sending UNI ADD event: {}", onuNode.toString());
-        }
-
-        kafkaProducer.send(new ProducerRecord<>(ONU_TOPIC, onuNode.toString()),
-                (r, e) -> logException(e));
-    }
-
-    private void logException(Exception e) {
-        if (e != null) {
-            log.error("Exception while sending to Kafka", e);
-        }
-    }
-
-    private class InternalNetworkConfigListener implements NetworkConfigListener {
-
-        @Override
-        public void event(NetworkConfigEvent event) {
-            switch (event.type()) {
-            case CONFIG_ADDED:
-            case CONFIG_UPDATED:
-                configure((OltKafkaConfig) event.config().get());
-                break;
-            case CONFIG_REMOVED:
-                unconfigure();
-                break;
-            case CONFIG_REGISTERED:
-            case CONFIG_UNREGISTERED:
-            default:
-                break;
-            }
-        }
-
-        @Override
-        public boolean isRelevant(NetworkConfigEvent event) {
-            return event.configClass().equals(OLT_KAFKA_CONFIG_CLASS);
-        }
-    }
-
-    private class InternalAccessDeviceListener implements AccessDeviceListener {
-
-        @Override
-        public void event(AccessDeviceEvent accessDeviceEvent) {
-            log.debug("KafkaIntegration got {} event for {}/{}",
-                    accessDeviceEvent.type(), accessDeviceEvent.subject(), accessDeviceEvent.port());
-            switch (accessDeviceEvent.type()) {
-            case UNI_ADDED:
-                executor.execute(() ->
-                        KafkaIntegration.this.sendUniAddEvent(accessDeviceEvent));
-                break;
-            case UNI_REMOVED:
-            default:
-                break;
-            }
-        }
-    }
-}
diff --git a/kafka/src/main/java/org/opencord/olt/kafka/OltKafkaConfig.java b/kafka/src/main/java/org/opencord/olt/kafka/OltKafkaConfig.java
deleted file mode 100644
index afea805..0000000
--- a/kafka/src/main/java/org/opencord/olt/kafka/OltKafkaConfig.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.olt.kafka;
-
-import org.onosproject.core.ApplicationId;
-import org.onosproject.net.config.Config;
-
-/**
- * Configuration of the Kafka publishing endpoint.
- */
-public class OltKafkaConfig extends Config<ApplicationId> {
-
-    private static final String BOOTSTRAP_SERVERS = "bootstrapServers";
-    private static final String RETRIES = "retries";
-    private static final String RECONNECT_BACKOFF = "reconnectBackoff";
-    private static final String INFLIGHT_REQUESTS = "inflightRequests";
-    private static final String ACKS = "acks";
-
-    private static final String DEFAULT_RETRIES = "1";
-    private static final String DEFAULT_RECONNECT_BACKOFF = "500";
-    private static final String DEFAULT_INFLIGHT_REQUESTS = "5";
-    private static final String DEFAULT_ACKS = "1";
-
-    @Override
-    public boolean isValid() {
-        return hasOnlyFields(BOOTSTRAP_SERVERS, RETRIES, RECONNECT_BACKOFF,
-                INFLIGHT_REQUESTS, ACKS) && hasFields(BOOTSTRAP_SERVERS);
-    }
-
-    /**
-     * Returns the Kafka bootstrap servers.
-     * <p>
-     * This can be a hostname/IP and port (e.g. 10.1.1.1:9092).
-     * </p>
-     *
-     * @return Kafka bootstrap servers
-     */
-    public String getBootstrapServers() {
-        return object.path(BOOTSTRAP_SERVERS).asText();
-    }
-
-    /**
-     * Returns the number of retries.
-     *
-     * @return retries
-     */
-    public String getRetries() {
-        return get(RETRIES, DEFAULT_RETRIES);
-    }
-
-    /**
-     * Returns the reconnect backoff in milliseconds.
-     *
-     * @return reconnect backoff
-     */
-    public String getReconnectBackoff() {
-        return get(RECONNECT_BACKOFF, DEFAULT_RECONNECT_BACKOFF);
-    }
-
-    /**
-     * Returns the number of inflight requests.
-     *
-     * @return inflight requests
-     */
-    public String getInflightRequests() {
-        return get(INFLIGHT_REQUESTS, DEFAULT_INFLIGHT_REQUESTS);
-    }
-
-    /**
-     * Returns the number of acks.
-     *
-     * @return acks
-     */
-    public String getAcks() {
-        return get(ACKS, DEFAULT_ACKS);
-    }
-}
diff --git a/kafka/src/main/java/org/opencord/olt/kafka/package-info.java b/kafka/src/main/java/org/opencord/olt/kafka/package-info.java
deleted file mode 100644
index 7754563..0000000
--- a/kafka/src/main/java/org/opencord/olt/kafka/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Kafka event module for OLT application.
- */
-package org.opencord.olt.kafka;
diff --git a/pom.xml b/pom.xml
index 75e1f53..d44af86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,6 @@
 
     <modules>
         <module>api</module>
-        <module>kafka</module>
         <module>app</module>
     </modules>