Remove OLT-specific Kafka integration.
This has been moved into its own app (kafka-onos repo)
Change-Id: Ie8f0c01bc2fe14561686848dc5b5bc22b9560837
diff --git a/kafka/pom.xml b/kafka/pom.xml
deleted file mode 100644
index 2fc1aef..0000000
--- a/kafka/pom.xml
+++ /dev/null
@@ -1,78 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Copyright 2018-present Open Networking Foundation
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.opencord</groupId>
- <artifactId>olt</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>olt-kafka</artifactId>
-
- <packaging>bundle</packaging>
- <description>Kafka event module for OLT application</description>
-
- <dependencies>
- <dependency>
- <groupId>org.opencord</groupId>
- <artifactId>olt-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.servicemix.bundles</groupId>
- <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
- <version>1.1.0_1</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-maven-plugin</artifactId>
- <version>1.11</version>
- </plugin>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <extensions>true</extensions>
- </plugin>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-scr-plugin</artifactId>
- <version>1.21.0</version>
- <executions>
- <execution>
- <id>generate-scr-srcdescriptor</id>
- <goals>
- <goal>scr</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <supportedProjectTypes>
- <supportedProjectType>bundle</supportedProjectType>
- </supportedProjectTypes>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java b/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java
deleted file mode 100644
index 7ecbaf6..0000000
--- a/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.olt.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
-import org.onosproject.net.AnnotationKeys;
-import org.onosproject.net.Port;
-import org.onosproject.net.config.ConfigFactory;
-import org.onosproject.net.config.NetworkConfigEvent;
-import org.onosproject.net.config.NetworkConfigListener;
-import org.onosproject.net.config.NetworkConfigRegistry;
-import org.onosproject.net.config.basics.SubjectFactories;
-import org.opencord.olt.AccessDeviceEvent;
-import org.opencord.olt.AccessDeviceListener;
-import org.opencord.olt.AccessDeviceService;
-import org.slf4j.Logger;
-
-import java.time.Instant;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Sends access device events to an external system.
- */
-@Component(immediate = true)
-public class KafkaIntegration {
-
- private final Logger log = getLogger(getClass());
- private static final Class<OltKafkaConfig>
- OLT_KAFKA_CONFIG_CLASS = OltKafkaConfig.class;
-
- private static final String APP_NAME = "org.opencord.olt";
- private ApplicationId appId;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CoreService coreService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected NetworkConfigRegistry configRegistry;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected AccessDeviceService accessDeviceService;
-
- private static StringSerializer stringSerializer = new StringSerializer();
-
- private KafkaProducer<String, String> kafkaProducer;
-
- private InternalNetworkConfigListener configListener =
- new InternalNetworkConfigListener();
- private InternalAccessDeviceListener listener =
- new InternalAccessDeviceListener();
-
- private final ExecutorService executor = newSingleThreadExecutor(
- groupedThreads(this.getClass().getSimpleName(), "events", log));
-
- private ConfigFactory<ApplicationId, OltKafkaConfig> kafkaConfigFactory =
- new ConfigFactory<ApplicationId, OltKafkaConfig>(
- SubjectFactories.APP_SUBJECT_FACTORY, OLT_KAFKA_CONFIG_CLASS,
- "kafka") {
- @Override
- public OltKafkaConfig createConfig() {
- return new OltKafkaConfig();
- }
- };
-
- private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
- private static final String RETRIES = "retries";
- private static final String RECONNECT_BACKOFF = "reconnect.backoff.ms";
- private static final String INFLIGHT_REQUESTS =
- "max.in.flight.requests.per.connection";
- private static final String ACKS = "acks";
- private static final String KEY_SERIALIZER = "key.serializer";
- private static final String VALUE_SERIALIZER = "value.serializer";
- private static final String STRING_SERIALIZER =
- stringSerializer.getClass().getCanonicalName();
-
- private static final String ONU_TOPIC = "onu.events";
-
- private static final String STATUS = "status";
- private static final String SERIAL_NUMBER = "serial_number";
- private static final String UNI_PORT_ID = "uni_port_id";
- private static final String OF_DPID = "of_dpid";
- private static final String ACTIVATED = "activated";
- private static final String TIMESTAMP = "timestamp";
-
- @Activate
- public void activate() {
- appId = coreService.registerApplication(APP_NAME);
- configRegistry.registerConfigFactory(kafkaConfigFactory);
- configRegistry.addListener(configListener);
- accessDeviceService.addListener(listener);
-
- configure();
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- accessDeviceService.removeListener(listener);
- configRegistry.removeListener(configListener);
- configRegistry.unregisterConfigFactory(kafkaConfigFactory);
-
- executor.shutdownNow();
-
- shutdownKafka();
- log.info("Stopped");
- }
-
- private void configure() {
- OltKafkaConfig config =
- configRegistry.getConfig(appId, OLT_KAFKA_CONFIG_CLASS);
- if (config == null) {
- log.info("OLT Kafka config not found");
- return;
- }
- configure(config);
- }
-
- private void configure(OltKafkaConfig config) {
- checkNotNull(config);
-
- Properties properties = new Properties();
- properties.put(BOOTSTRAP_SERVERS, config.getBootstrapServers());
- properties.put(RETRIES, config.getRetries());
- properties.put(RECONNECT_BACKOFF, config.getReconnectBackoff());
- properties.put(INFLIGHT_REQUESTS, config.getInflightRequests());
- properties.put(ACKS, config.getAcks());
- properties.put(KEY_SERIALIZER, STRING_SERIALIZER);
- properties.put(VALUE_SERIALIZER, STRING_SERIALIZER);
-
- startKafka(properties);
- }
-
- private void unconfigure() {
- shutdownKafka();
- }
-
- private void startKafka(Properties properties) {
- shutdownKafka();
-
- // Kafka client doesn't play nice with the default OSGi classloader
- // This workaround temporarily changes the thread's classloader so that
- // the Kafka client can load the serializer classes.
- ClassLoader original = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- try {
- kafkaProducer = new KafkaProducer<>(properties);
- } finally {
- Thread.currentThread().setContextClassLoader(original);
- }
- }
-
- private void shutdownKafka() {
- if (kafkaProducer != null) {
- kafkaProducer.close();
- kafkaProducer = null;
- }
- }
-
- private void sendUniAddEvent(AccessDeviceEvent event) {
- if (kafkaProducer == null) {
- return;
- }
-
- Port port = event.port().get();
- String serialNumber = port.annotations().value(AnnotationKeys.PORT_NAME);
-
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode onuNode = mapper.createObjectNode();
- onuNode.put(TIMESTAMP, Instant.now().toString());
- onuNode.put(STATUS, ACTIVATED);
- onuNode.put(SERIAL_NUMBER, serialNumber);
- onuNode.put(UNI_PORT_ID, port.number().toLong());
- onuNode.put(OF_DPID, port.element().id().toString());
-
- if (log.isDebugEnabled()) {
- log.debug("Sending UNI ADD event: {}", onuNode.toString());
- }
-
- kafkaProducer.send(new ProducerRecord<>(ONU_TOPIC, onuNode.toString()),
- (r, e) -> logException(e));
- }
-
- private void logException(Exception e) {
- if (e != null) {
- log.error("Exception while sending to Kafka", e);
- }
- }
-
- private class InternalNetworkConfigListener implements NetworkConfigListener {
-
- @Override
- public void event(NetworkConfigEvent event) {
- switch (event.type()) {
- case CONFIG_ADDED:
- case CONFIG_UPDATED:
- configure((OltKafkaConfig) event.config().get());
- break;
- case CONFIG_REMOVED:
- unconfigure();
- break;
- case CONFIG_REGISTERED:
- case CONFIG_UNREGISTERED:
- default:
- break;
- }
- }
-
- @Override
- public boolean isRelevant(NetworkConfigEvent event) {
- return event.configClass().equals(OLT_KAFKA_CONFIG_CLASS);
- }
- }
-
- private class InternalAccessDeviceListener implements AccessDeviceListener {
-
- @Override
- public void event(AccessDeviceEvent accessDeviceEvent) {
- log.debug("KafkaIntegration got {} event for {}/{}",
- accessDeviceEvent.type(), accessDeviceEvent.subject(), accessDeviceEvent.port());
- switch (accessDeviceEvent.type()) {
- case UNI_ADDED:
- executor.execute(() ->
- KafkaIntegration.this.sendUniAddEvent(accessDeviceEvent));
- break;
- case UNI_REMOVED:
- default:
- break;
- }
- }
- }
-}
diff --git a/kafka/src/main/java/org/opencord/olt/kafka/OltKafkaConfig.java b/kafka/src/main/java/org/opencord/olt/kafka/OltKafkaConfig.java
deleted file mode 100644
index afea805..0000000
--- a/kafka/src/main/java/org/opencord/olt/kafka/OltKafkaConfig.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.olt.kafka;
-
-import org.onosproject.core.ApplicationId;
-import org.onosproject.net.config.Config;
-
-/**
- * Configuration of the Kafka publishing endpoint.
- */
-public class OltKafkaConfig extends Config<ApplicationId> {
-
- private static final String BOOTSTRAP_SERVERS = "bootstrapServers";
- private static final String RETRIES = "retries";
- private static final String RECONNECT_BACKOFF = "reconnectBackoff";
- private static final String INFLIGHT_REQUESTS = "inflightRequests";
- private static final String ACKS = "acks";
-
- private static final String DEFAULT_RETRIES = "1";
- private static final String DEFAULT_RECONNECT_BACKOFF = "500";
- private static final String DEFAULT_INFLIGHT_REQUESTS = "5";
- private static final String DEFAULT_ACKS = "1";
-
- @Override
- public boolean isValid() {
- return hasOnlyFields(BOOTSTRAP_SERVERS, RETRIES, RECONNECT_BACKOFF,
- INFLIGHT_REQUESTS, ACKS) && hasFields(BOOTSTRAP_SERVERS);
- }
-
- /**
- * Returns the Kafka bootstrap servers.
- * <p>
- * This can be a hostname/IP and port (e.g. 10.1.1.1:9092).
- * </p>
- *
- * @return Kafka bootstrap servers
- */
- public String getBootstrapServers() {
- return object.path(BOOTSTRAP_SERVERS).asText();
- }
-
- /**
- * Returns the number of retries.
- *
- * @return retries
- */
- public String getRetries() {
- return get(RETRIES, DEFAULT_RETRIES);
- }
-
- /**
- * Returns the reconnect backoff in milliseconds.
- *
- * @return reconnect backoff
- */
- public String getReconnectBackoff() {
- return get(RECONNECT_BACKOFF, DEFAULT_RECONNECT_BACKOFF);
- }
-
- /**
- * Returns the number of inflight requests.
- *
- * @return inflight requests
- */
- public String getInflightRequests() {
- return get(INFLIGHT_REQUESTS, DEFAULT_INFLIGHT_REQUESTS);
- }
-
- /**
- * Returns the number of acks.
- *
- * @return acks
- */
- public String getAcks() {
- return get(ACKS, DEFAULT_ACKS);
- }
-}
diff --git a/kafka/src/main/java/org/opencord/olt/kafka/package-info.java b/kafka/src/main/java/org/opencord/olt/kafka/package-info.java
deleted file mode 100644
index 7754563..0000000
--- a/kafka/src/main/java/org/opencord/olt/kafka/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Kafka event module for OLT application.
- */
-package org.opencord.olt.kafka;
diff --git a/pom.xml b/pom.xml
index 75e1f53..d44af86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,6 @@
<modules>
<module>api</module>
- <module>kafka</module>
<module>app</module>
</modules>