Added initial set of files for the VES Agent to Gerrit

This version is functional.
Changed to use the standard vnfsdk ves-agent library, now that the changes to it have been merged
Added license headers
Separated docker commands into a separate script
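
For reference, the agent reads its configuration from /opt/ves-agent/config.properties
(see Config.java and Application.java below). A minimal example, with placeholder
values only, might look like:

  onap_ves_address=<ves-collector-host>
  onap_ves_port=<ves-collector-port>
  kafka_address=<kafka-broker-host>
  kafka_port=<kafka-broker-port>
  kafka_topic=<voltha-events-topic>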
Change-Id: I802bd50cb6e9b2272317822e94ed29fbdde172b4
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/src/main/java/config/Config.java b/src/main/java/config/Config.java
new file mode 100644
index 0000000..abf5d73
--- /dev/null
+++ b/src/main/java/config/Config.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2018- Cisco
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package config;
+
+import java.util.Properties;
+import java.io.FileInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Config {
+
+ private static Properties properties;
+
+ private static final Logger logger = LoggerFactory.getLogger("Config");
+
+ public static void loadProperties(String file) {
+ // load the agent configuration from the given properties file; on
+ // failure the agent simply runs without any configured values
+ properties = new Properties();
+ try (FileInputStream in = new FileInputStream(file)) {
+ properties.load(in);
+ } catch (Exception e) {
+ logger.error("Failed to load properties from " + file, e);
+ }
+ }
+
+ public static String get(String key) {
+ return properties.getProperty(key);
+ }
+
+ public static String getVesAddress() {
+ return get("onap_ves_address");
+ }
+
+ public static String getVesPort() {
+ return get("onap_ves_port");
+ }
+
+ public static String getKafkaAddress() {
+ return get("kafka_address");
+ }
+
+ public static String getKafkaPort() {
+ return get("kafka_port");
+ }
+
+ public static String getKafkaTopic() {
+ return get("kafka_topic");
+ }
+}
diff --git a/src/main/java/controller/Application.java b/src/main/java/controller/Application.java
new file mode 100644
index 0000000..6ae05b8
--- /dev/null
+++ b/src/main/java/controller/Application.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018- Cisco
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package controller;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import kafka.VolthaKafkaConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.InterruptedException;
+
+import config.Config;
+
+
+@SpringBootApplication
+@RestController
+public class Application {
+
+ @RequestMapping("/")
+ public String home() {
+ return "Hello Docker World";
+ }
+
+ public static void main(String[] args) {
+ Config.loadProperties("/opt/ves-agent/config.properties");
+ KafkaThread kafka = new KafkaThread();
+ kafka.start();
+ SpringApplication.run(Application.class, args);
+ }
+
+}
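+// Runs the Kafka consumer loop on a background thread so that it does not
+// block the Spring Boot web application started in main().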
+class KafkaThread extends Thread {
+
+ private final static Logger logger = LoggerFactory.getLogger("KafkaThread");
+
+ public void run() {
+ logger.debug("Start Kafka Consumer Thread");
+ try {
+ VolthaKafkaConsumer consumer = new VolthaKafkaConsumer();
+ consumer.runConsumer();
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage());
+ }
+
+ }
+ }
diff --git a/src/main/java/filter/MarkerFilter.java b/src/main/java/filter/MarkerFilter.java
new file mode 100644
index 0000000..8c20563
--- /dev/null
+++ b/src/main/java/filter/MarkerFilter.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2018- Cisco
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package filter;
+
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.filter.AbstractMatcherFilter;
+import ch.qos.logback.core.spi.FilterReply;
+
+public class MarkerFilter extends AbstractMatcherFilter<ILoggingEvent> {
+
+ Marker markerToMatch;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see ch.qos.logback.core.filter.Filter#decide(java.lang.Object)
+ */
+ @Override
+ public FilterReply decide(ILoggingEvent event) {
+ if (!isStarted()) {
+ return FilterReply.NEUTRAL;
+ }
+ Marker marker = event.getMarker();
+ if (marker == null) {
+ return onMismatch;
+ }
+
+ if (markerToMatch.contains(marker)) {
+ return onMatch;
+ } else {
+ return onMismatch;
+ }
+ }
+
+ /**
+ * Sets the marker to match against logging events.
+ *
+ * @param markerStr the name of the marker to match
+ */
+ public void setMarker(String markerStr) {
+ if (markerStr != null) {
+ Marker marker = MarkerFactory.getMarker(markerStr);
+ this.markerToMatch = marker;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see ch.qos.logback.core.filter.Filter#start()
+ */
+ @Override
+ public void start() {
+ if (this.markerToMatch != null) {
+ super.start();
+ }
+ }
+}
diff --git a/src/main/java/kafka/VolthaKafkaConsumer.java b/src/main/java/kafka/VolthaKafkaConsumer.java
new file mode 100644
index 0000000..a9f0603
--- /dev/null
+++ b/src/main/java/kafka/VolthaKafkaConsumer.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2018- Cisco
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.KafkaException;
+import java.util.Collections;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+
+import javax.xml.ws.http.HTTPException;
+
+import java.util.concurrent.TimeUnit;
+
+import ves.*;
+import config.Config;
+
+public class VolthaKafkaConsumer {
+
+ private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
+ private final String dataMarkerText = "DATA";
+ private final Marker dataMarker = MarkerFactory.getMarker(dataMarkerText);
+
+ private KafkaConsumer<Long, String> consumer;
+
+ public VolthaKafkaConsumer() {
+ logger.debug("VolthaKafkaConsumer constructor called");
+ initVesAgent();
+ consumer = createConsumer();
+ }
+
+ private void initVesAgent() {
+ VesAgent.initVes();
+ }
+
+ private KafkaConsumer<Long, String> createConsumer() {
+ logger.debug("Creating Kafka Consumer");
+
+ String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ kafkaAddress);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG,
+ "KafkaExampleConsumer");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ LongDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getName());
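+ // disable auto-commit: offsets are committed manually in runConsumer(),
+ // only after a batch has been successfully forwarded to VES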
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ false);
+
+ // Create the consumer using props.
+ final KafkaConsumer<Long, String> consumer =
+ new KafkaConsumer<>(props);
+
+ // Subscribe to the topic.
+ consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
+ return consumer;
+ }
+
+ public void runConsumer() throws InterruptedException {
+
+ logger.debug("Starting Consumer");
+
+ while (true) {
+ ConsumerRecords<Long, String> consumerRecords;
+ try {
+ consumerRecords = consumer.poll(100000);
+ } catch (KafkaException e) {
+ logger.warn("Error with Kafka connection; retrying in 15 seconds.", e);
+ this.consumer = createConsumer();
+ TimeUnit.SECONDS.sleep(15);
+ continue;
+ }
+ logger.info("{} Records retrieved from poll.", consumerRecords.count());
+
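+ // forward every record in this batch to VES; offsets are committed only when
+ // the whole batch is accepted, so a failed batch is re-delivered on the next poll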
+ boolean commit = true;
+ try {
+ consumerRecords.forEach(record -> {
+ logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
+ record.key(), record.value(),
+ record.partition(), record.offset());
+ logger.info("Attempting to send data to VES");
+ boolean success = VesAgent.sendToVES(record.value());
+ if (!success) {
+ throw new HTTPException(0);
+ } else {
+ logger.info("Sent Ves Message");
+ }
+ });
+ } catch (HTTPException e) {
+ logger.info("Ves message failed. Going back to polling.");
+ commit = false;
+ }
+ if (commit) {
+ consumer.commitAsync();
+ }
+ }
+ //consumer.close();
+ //logger.debug("DONE");
+ }
+
+}
diff --git a/src/main/java/ves/VesAgent.java b/src/main/java/ves/VesAgent.java
new file mode 100644
index 0000000..b0465f7
--- /dev/null
+++ b/src/main/java/ves/VesAgent.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2018- Cisco
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ves;
+
+import evel_javalibrary.att.com.*;
+import evel_javalibrary.att.com.EvelFault.EVEL_SEVERITIES;
+import evel_javalibrary.att.com.EvelFault.EVEL_SOURCE_TYPES;
+import evel_javalibrary.att.com.EvelFault.EVEL_VF_STATUSES;
+import java.net.HttpURLConnection;
+
+import org.apache.log4j.Level;
+import config.Config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VesAgent {
+
+ private static final Logger logger = LoggerFactory.getLogger("VesAgent");
+
+ public static void initVes() {
+ logger.info("Initializing VES Agent");
+ try {
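+ // initialize the EVEL library with the collector address and port taken from
+ // config.properties; the remaining arguments (path/topic, credentials, keystore)
+ // are fixed placeholder values here, as in the commented-out examples below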
+ AgentMain.evel_initialize("http://"+Config.getVesAddress(),
+ Integer.parseInt(Config.getVesPort()),
+ // "http://1.2.3.4", 8080,
+ //"/vendor_event_listener","/example_vnf",
+ null,null,
+ "will",
+ "pill",
+ null, null, null,
+ //"/home/gokul/newwk/demo/vnfs/VES5.0/evel/sslcerts2/my-keystore.jks", "changeit", "changeit",
+ Level.TRACE);
+ } catch( Exception e ) {
+ e.printStackTrace();
+ }
+ }
+
+ public static boolean sendToVES(String json) {
+
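+ // wrap the raw VOLTHA event JSON in a VES fault event, attaching the payload
+ // unmodified as an additional-info field; a zero or >= 400 HTTP code from the
+ // collector is reported to the caller as a send failure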
+ EvelFault flt = new EvelFault("Fault_VOLTHA_failed", "tbd_event_key_unique_to_source",
+ "NIC error", "Hardware failed",
+ EvelHeader.PRIORITIES.EVEL_PRIORITY_HIGH,
+ EVEL_SEVERITIES.EVEL_SEVERITY_MAJOR,
+ EVEL_SOURCE_TYPES.EVEL_SOURCE_CARD,
+ EVEL_VF_STATUSES.EVEL_VF_STATUS_ACTIVE);
+ flt.evel_fault_addl_info_add("voltha", json);
+ //flt.evel_fault_addl_info_add("nicsw", "fail");
+ flt.evel_fault_category_set("Communication");
+ logger.info("Sending fault event");
+ int code = AgentMain.evel_post_event_immediate(flt);
+ logger.info("Fault event http code received: " + code);
+ if(code == 0 || code >= HttpURLConnection.HTTP_BAD_REQUEST )
+ {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+}