A set of fixes and features
Some updates to handle errors in certain scenarios.
Formatted code
Initial work to parse json.
Added code to map certain json values from incoming data into parts of the VES format.
Added lines to map type and severity to VES
Completed support for mapping alarm messages. Also added timing functionality for tracking message send times
Change-Id: Iaf9ce372552fce3f744476719f5fd79548d22ef4
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/config/config.properties b/config/config.properties
index bde07af..2c4a4c5 100644
--- a/config/config.properties
+++ b/config/config.properties
@@ -1,4 +1,4 @@
-onap_ves_address=onap2
+onap_ves_address=onap
onap_ves_port=30235
kafka_address=kafka.voltha.svc.cluster.local
diff --git a/libs/evel_javalib2-1.1.0.jar b/libs/evel_javalib2-1.1.0.jar
index 8d21f35..2d9da20 100644
--- a/libs/evel_javalib2-1.1.0.jar
+++ b/libs/evel_javalib2-1.1.0.jar
Binary files differ
diff --git a/pom.xml b/pom.xml
index c492835..0e2c36f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,19 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
- ~ Copyright 2018- Cisco
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
+~ Copyright 2018- Cisco
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -29,7 +29,7 @@
<properties>
<java.version>1.8</java.version>
- <docker.image.prefix>osam</docker.image.prefix>
+ <docker.image.prefix>osam</docker.image.prefix>
</properties>
<dependencies>
@@ -42,16 +42,22 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>1.1.0</version>
- </dependency>
- <dependency>
- <groupId>org.onap.vnfsdk.ves-agent</groupId>
- <artifactId>evel_javalib2</artifactId>
- <version>1.1.0</version>
- </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.vnfsdk.ves-agent</groupId>
+ <artifactId>evel_javalib2</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.5</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
@@ -60,19 +66,19 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>com.spotify</groupId>
- <artifactId>dockerfile-maven-plugin</artifactId>
- <version>1.3.6</version>
- <configuration>
- <repository>${docker.image.prefix}/${project.artifactId}</repository>
- <buildArgs>
- <JAR_FILE>target/${project.build.finalName}.jar</JAR_FILE>
- <PROPERTIES_FILE>config/config.properties</PROPERTIES_FILE>
- </buildArgs>
- </configuration>
- </plugin>
+ </plugin>
+ <plugin>
+ <groupId>com.spotify</groupId>
+ <artifactId>dockerfile-maven-plugin</artifactId>
+ <version>1.3.6</version>
+ <configuration>
+ <repository>${docker.image.prefix}/${project.artifactId}</repository>
+ <buildArgs>
+ <JAR_FILE>target/${project.build.finalName}.jar</JAR_FILE>
+ <PROPERTIES_FILE>config/config.properties</PROPERTIES_FILE>
+ </buildArgs>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/src/main/java/config/Config.java b/src/main/java/config/Config.java
index abf5d73..3e520e3 100644
--- a/src/main/java/config/Config.java
+++ b/src/main/java/config/Config.java
@@ -1,18 +1,18 @@
/*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package config;
import java.util.Properties;
@@ -23,46 +23,46 @@
public class Config {
- private static Properties properties;
+ private static Properties properties;
- private final static Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
+ private final static Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
- public static void loadProperties(String file) {
- // create application properties with default
- try {
- properties = new Properties();
+ public static void loadProperties(String file) {
+ // create application properties with default
+ try {
+ properties = new Properties();
- // now load properties
- // from last invocation
- FileInputStream in = new FileInputStream(file);
- properties.load(in);
- in.close();
- } catch (Exception e) {
- logger.error(e.getMessage());
+ // now load properties
+ // from last invocation
+ FileInputStream in = new FileInputStream(file);
+ properties.load(in);
+ in.close();
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
}
- }
- public static String get(String key) {
- return (String)properties.get(key);
- }
+ public static String get(String key) {
+ return (String)properties.get(key);
+ }
- public static String getVesAddress() {
- return get("onap_ves_address");
- }
+ public static String getVesAddress() {
+ return get("onap_ves_address");
+ }
- public static String getVesPort() {
- return get("onap_ves_port");
- }
+ public static String getVesPort() {
+ return get("onap_ves_port");
+ }
- public static String getKafkaAddress() {
- return get("kafka_address");
- }
+ public static String getKafkaAddress() {
+ return get("kafka_address");
+ }
- public static String getKafkaPort() {
- return get("kafka_port");
- }
+ public static String getKafkaPort() {
+ return get("kafka_port");
+ }
- public static String getKafkaTopic() {
- return get("kafka_topic");
- }
+ public static String getKafkaTopic() {
+ return get("kafka_topic");
+ }
}
diff --git a/src/main/java/controller/Application.java b/src/main/java/controller/Application.java
index 6ae05b8..d7a0d0e 100644
--- a/src/main/java/controller/Application.java
+++ b/src/main/java/controller/Application.java
@@ -1,18 +1,18 @@
/*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package controller;
import org.springframework.boot.SpringApplication;
@@ -40,7 +40,7 @@
public static void main(String[] args) {
Config.loadProperties("/opt/ves-agent/config.properties");
- KafkaThread kafka = new KafkaThread();
+ KafkaThread kafka = new KafkaThread();
kafka.start();
SpringApplication.run(Application.class, args);
}
@@ -51,13 +51,13 @@
private final static Logger logger = LoggerFactory.getLogger("KafkaThread");
public void run() {
- logger.debug("Start Kafka Consumer Thread");
- try {
- VolthaKafkaConsumer consumer = new VolthaKafkaConsumer();
- consumer.runConsumer();
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
- }
+ logger.debug("Start Kafka Consumer Thread");
+ try {
+ VolthaKafkaConsumer consumer = new VolthaKafkaConsumer();
+ consumer.runConsumer();
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage());
+ }
}
- }
+}
diff --git a/src/main/java/filter/MarkerFilter.java b/src/main/java/filter/MarkerFilter.java
index 8c20563..9887409 100644
--- a/src/main/java/filter/MarkerFilter.java
+++ b/src/main/java/filter/MarkerFilter.java
@@ -1,18 +1,18 @@
/*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package filter;
import java.util.Arrays;
@@ -31,10 +31,10 @@
Marker markerToMatch;
/*
- * (non-Javadoc)
- *
- * @see ch.qos.logback.core.filter.Filter#decide(java.lang.Object)
- */
+ * (non-Javadoc)
+ *
+ * @see ch.qos.logback.core.filter.Filter#decide(java.lang.Object)
+ */
@Override
public FilterReply decide(ILoggingEvent event) {
if (!isStarted()) {
@@ -53,10 +53,10 @@
}
/**
- * The marker to match in the event.
- *
- * @param markerToMatch
- */
+ * The marker to match in the event.
+ *
+ * @param markerToMatch
+ */
public void setMarker(String markerStr) {
if (markerStr != null) {
Marker marker = MarkerFactory.getMarker(markerStr);
@@ -65,10 +65,10 @@
}
/*
- * (non-Javadoc)
- *
- * @see ch.qos.logback.core.filter.Filter#start()
- */
+ * (non-Javadoc)
+ *
+ * @see ch.qos.logback.core.filter.Filter#start()
+ */
@Override
public void start() {
if (this.markerToMatch != null) {
diff --git a/src/main/java/kafka/VolthaKafkaConsumer.java b/src/main/java/kafka/VolthaKafkaConsumer.java
index a9f0603..dc431bb 100644
--- a/src/main/java/kafka/VolthaKafkaConsumer.java
+++ b/src/main/java/kafka/VolthaKafkaConsumer.java
@@ -1,18 +1,18 @@
/*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.Consumer;
@@ -26,6 +26,11 @@
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
+import java.time.Instant;
+import java.time.Duration;
+
+import com.google.gson.JsonSyntaxException;
+
import javax.xml.ws.http.HTTPException;
import java.util.concurrent.TimeUnit;
@@ -33,6 +38,7 @@
import ves.*;
import config.Config;
+
public class VolthaKafkaConsumer {
private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
@@ -42,80 +48,92 @@
private KafkaConsumer<Long, String> consumer;
public VolthaKafkaConsumer() {
- logger.debug("VolthaKafkaConsumer constructor called");
- initVesAgent();
- consumer = createConsumer();
+ logger.debug("VolthaKafkaConsumer constructor called");
+ initVesAgent();
+ try {
+ consumer = createConsumer();
+ } catch (KafkaException e) {
+ logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+ //Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
+ }
}
private void initVesAgent() {
- VesAgent.initVes();
+ VesAgent.initVes();
}
private KafkaConsumer<Long, String> createConsumer() {
- logger.debug("Creating Kafka Consumer");
+ logger.debug("Creating Kafka Consumer");
- String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
- final Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- kafkaAddress);
- props.put(ConsumerConfig.GROUP_ID_CONFIG,
- "KafkaExampleConsumer");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- LongDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
- false);
+ String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ kafkaAddress);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG,
+ "KafkaExampleConsumer");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ LongDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getName());
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ false);
- // Create the consumer using props.
- final KafkaConsumer<Long, String> consumer =
- new KafkaConsumer<>(props);
+ // Create the consumer using props.
+ final KafkaConsumer<Long, String> consumer =
+ new KafkaConsumer<>(props);
- // Subscribe to the topic.
- consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
- return consumer;
- }
+ // Subscribe to the topic.
+ consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
+ return consumer;
+ }
- public void runConsumer() throws InterruptedException {
+ public void runConsumer() throws InterruptedException {
- logger.debug("Starting Consumer");
+ logger.debug("Starting Consumer");
- while (true) {
- ConsumerRecords<Long, String> consumerRecords;
- try {
- consumerRecords = consumer.poll(100000);
- } catch (KafkaException e) {
- logger.debug("Error with Kafka connection. Retrying in 15 seconds.");
- this.consumer = createConsumer();
- TimeUnit.SECONDS.sleep(15);
- continue;
- }
- logger.info("{} Records retrieved from poll.", consumerRecords.count());
+ while (true) {
+ ConsumerRecords<Long, String> consumerRecords;
+ try {
+ if (consumer == null) {
+ this.consumer = createConsumer();
+ }
+ consumerRecords = consumer.poll(20000);
+ } catch (KafkaException e) {
+ logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+ consumer = null;
+ TimeUnit.SECONDS.sleep(15);
+ continue;
+ }
+ logger.info("{} Records retrieved from poll.", consumerRecords.count());
- boolean commit = true;
- try {
- consumerRecords.forEach(record -> {
- logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
- record.key(), record.value(),
- record.partition(), record.offset());
- logger.info("Attempting to send data to VES");
- boolean success = VesAgent.sendToVES(record.value());
- if (!success) {
- throw new HTTPException(0);
- } else {
- logger.info("Sent Ves Message");
- }
- });
- } catch (HTTPException e) {
- logger.info("Ves message failed. Going back to polling.");
- commit = false;
- }
- if (commit) {
- consumer.commitAsync();
- }
- }
- //consumer.close();
- //logger.debug("DONE");
+ boolean commit = true;
+ try {
+ consumerRecords.forEach(record -> {
+ Instant start = Instant.now();
+ logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
+ record.key(), record.value(),
+ record.partition(), record.offset());
+ logger.info("Attempting to send data to VES");
+ boolean success = VesAgent.sendToVES(record.value());
+ if (!success) {
+ throw new HTTPException(0);
+ } else {
+ Instant finish = Instant.now();
+ logger.info("Sent Ves Message. Took " + Duration.between(start, finish).toMillis() + " Milliseconds.");
+ }
+ });
+ } catch (HTTPException e) {
+ logger.info("Ves message failed. Going back to polling.");
+ commit = false;
+ } catch (JsonSyntaxException e) {
+ logger.error("Json Syntax Exception: ", e);
+ }
+ if (commit) {
+ consumer.commitAsync();
+ }
+ }
+ //consumer.close();
+ //logger.debug("DONE");
}
}
diff --git a/src/main/java/mapper/VesVolthaMapper.java b/src/main/java/mapper/VesVolthaMapper.java
new file mode 100644
index 0000000..a3ee1ab
--- /dev/null
+++ b/src/main/java/mapper/VesVolthaMapper.java
@@ -0,0 +1,36 @@
+/*
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package mapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class VesVolthaMapper {
+
+ private final Logger logger = LoggerFactory.getLogger("VesVolthaMapper");
+
+ private Gson gson;
+
+ public VesVolthaMapper() {
+ gson = new GsonBuilder().create();
+ }
+
+ public VesVolthaMessage parseJson(String json) {
+ return gson.fromJson(json, VesVolthaMessage.class);
+ }
+}
diff --git a/src/main/java/mapper/VesVolthaMessage.java b/src/main/java/mapper/VesVolthaMessage.java
new file mode 100644
index 0000000..f0327df
--- /dev/null
+++ b/src/main/java/mapper/VesVolthaMessage.java
@@ -0,0 +1,76 @@
+/*
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package mapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Map;
+
+public class VesVolthaMessage {
+ private String id = "";
+ private String logical_device_id = "";
+ private String raised_ts = "";
+ private String description = "";
+ private String type = "";
+ private String category = "";
+ private String severity = "";
+ private String state = "";
+ private String resource_id = "";
+ private Map<String,String> context;
+
+ public String getId() {
+ return id;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public String getLogicalDeviceId() {
+ return logical_device_id;
+ }
+
+ public String getRaisedTS() {
+ return raised_ts;
+ }
+
+ public String getCategory() {
+ //Passing type instead of category to map bewteen VES and VOLTHA.
+ return category;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getSeverity() {
+ return severity;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public Map<String,String> getContext() {
+ return context;
+ }
+
+ public String getResourceId() {
+ return resource_id;
+ }
+
+
+}
diff --git a/src/main/java/ves/VesAgent.java b/src/main/java/ves/VesAgent.java
index b0465f7..a9a723d 100644
--- a/src/main/java/ves/VesAgent.java
+++ b/src/main/java/ves/VesAgent.java
@@ -1,18 +1,18 @@
/*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package ves;
import evel_javalibrary.att.com.*;
@@ -32,17 +32,24 @@
import org.apache.log4j.Level;
import config.Config;
+import mapper.VesVolthaMapper;
+import mapper.VesVolthaMessage;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.gson.JsonSyntaxException;
public class VesAgent {
- private static final Logger logger = LoggerFactory.getLogger("VesAgent");
+ private static final Logger logger = LoggerFactory.getLogger("VesAgent");
- public static void initVes() {
- logger.info("Initializing VES Agent");
- try {
- AgentMain.evel_initialize("http://"+Config.getVesAddress(),
+ private static VesVolthaMapper mapper;
+
+ public static void initVes() {
+ logger.info("Initializing VES Agent");
+ try {
+ mapper = new VesVolthaMapper();
+ AgentMain.evel_initialize("http://"+Config.getVesAddress(),
Integer.parseInt(Config.getVesPort()),
// "http://1.2.3.4", 8080,
//"/vendor_event_listener","/example_vnf",
@@ -52,31 +59,62 @@
null, null, null,
//"/home/gokul/newwk/demo/vnfs/VES5.0/evel/sslcerts2/my-keystore.jks", "changeit", "changeit",
Level.TRACE);
- } catch( Exception e ) {
- e.printStackTrace();
+ } catch( Exception e ) {
+ e.printStackTrace();
+ }
}
- }
- public static boolean sendToVES(String json) {
-
- EvelFault flt = new EvelFault("Fault_VOLTHA_failed", "tbd_event_key_unique_to_source",
- "NIC error", "Hardware failed",
+ public static boolean sendToVES(String json) throws JsonSyntaxException {
+ VesVolthaMessage message = mapper.parseJson(json);
+ String id = message.getId();
+ String ldeviceId = message.getLogicalDeviceId();
+ String ts = message.getRaisedTS();
+ String description = message.getDescription();
+ //Type in Voltha needs to be category in VES
+ String category = message.getType();
+ //Category in VOLTHA needs to be type in VES
+ String type = message.getCategory();
+ String severity = message.getSeverity();
+ String state = message.getState();
+ String resourceId = message.getResourceId();
+
+ EVEL_SEVERITIES vesSeverity = mapSeverity(severity);
+ EVEL_SOURCE_TYPES vesType = mapType(type);
+ EvelFault flt = new EvelFault(
+ "Fault_VOLTHA_" + id,
+ ldeviceId + ":" + ts,
+ id,
+ description,
EvelHeader.PRIORITIES.EVEL_PRIORITY_HIGH,
- EVEL_SEVERITIES.EVEL_SEVERITY_MAJOR,
- EVEL_SOURCE_TYPES.EVEL_SOURCE_CARD,
+ vesSeverity,
+ vesType,
EVEL_VF_STATUSES.EVEL_VF_STATUS_ACTIVE);
- flt.evel_fault_addl_info_add("voltha", json);
- //flt.evel_fault_addl_info_add("nicsw", "fail");
- flt.evel_fault_category_set("Communication");
- logger.info("Sending fault event");
- int code = AgentMain.evel_post_event_immediate(flt);
- logger.info("Fault event http code received: " + code);
- if(code == 0 || code >= HttpURLConnection.HTTP_BAD_REQUEST )
- {
- return false;
- } else {
- return true;
- }
- }
+ flt.evel_fault_addl_info_add("voltha", json);
+ flt.evel_fault_addl_info_add("state", state);
+ flt.evel_fault_addl_info_add("resourceId", resourceId);
+ flt.evel_fault_category_set(category);
+ logger.info("Sending fault event");
+ int code = AgentMain.evel_post_event_immediate(flt);
+ logger.info("Fault event http code received: " + code);
+ if(code == 0 || code >= HttpURLConnection.HTTP_BAD_REQUEST ) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ private static EVEL_SEVERITIES mapSeverity(String severity) {
+ String severityUpper = severity.toUpperCase();
+ switch (severityUpper) {
+ case "INDETERMINATE":
+ return EVEL_SEVERITIES.EVEL_SEVERITY_NORMAL;
+ default:
+ return EVEL_SEVERITIES.valueOf("EVEL_SEVERITY_" + severityUpper);
+ }
+ }
+
+ private static EVEL_SOURCE_TYPES mapType(String type) {
+ return EVEL_SOURCE_TYPES.valueOf("EVEL_SOURCE_" + type.toUpperCase());
+ }
}
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 89b8950..e4fcb12 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -1,17 +1,17 @@
<!--
- Copyright 2018- Cisco
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Copyright 2018- Cisco
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
-->
<configuration>
@@ -24,45 +24,45 @@
<logger name="org.apache.kafka" level="INFO"/>
<logger name="org.apache.kafka.common.metrics" level="INFO"/>
-
+
<appender name="FILE_MAIN" class="ch.qos.logback.core.rolling.RollingFileAppender">
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>/opt/ves-agent/log-%d{yyyy-MM-dd}.txt</fileNamePattern>
- <maxHistory>30</maxHistory>
- <totalSizeCap>5GB</totalSizeCap>
- </rollingPolicy>
- <append>true</append>
- <encoder>
- <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
- </encoder>
- </appender>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>/opt/ves-agent/log-%d{yyyy-MM-dd}.txt</fileNamePattern>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>5GB</totalSizeCap>
+ </rollingPolicy>
+ <append>true</append>
+ <encoder>
+ <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
+ </encoder>
+ </appender>
- <appender name="FILE_DATA" class="ch.qos.logback.core.rolling.RollingFileAppender">
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>/opt/ves-agent/data-%d{yyyy-MM-dd}.txt</fileNamePattern>
- <maxHistory>30</maxHistory>
- <totalSizeCap>1GB</totalSizeCap>
- </rollingPolicy>
- <append>true</append>
- <filter class="filter.MarkerFilter">
- <marker>DATA</marker>
- <onMismatch>DENY</onMismatch>
- </filter>
- <filter class="filter.MarkerFilter">
- <marker>DATA</marker>
- <onMatch>ACCEPT</onMatch>
- </filter>
- <encoder>
- <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
- </encoder>
- </appender>
+ <appender name="FILE_DATA" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>/opt/ves-agent/data-%d{yyyy-MM-dd}.txt</fileNamePattern>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>1GB</totalSizeCap>
+ </rollingPolicy>
+ <append>true</append>
+ <filter class="filter.MarkerFilter">
+ <marker>DATA</marker>
+ <onMismatch>DENY</onMismatch>
+ </filter>
+ <filter class="filter.MarkerFilter">
+ <marker>DATA</marker>
+ <onMatch>ACCEPT</onMatch>
+ </filter>
+ <encoder>
+ <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
+ </encoder>
+ </appender>
- <logger name="org.springframework" level="WARNING"/>
+ <logger name="org.springframework" level="WARNING"/>
- <root level="debug">
- <appender-ref ref="FILE_MAIN" />
- <appender-ref ref="FILE_DATA" />
- <appender-ref ref="STDOUT" />
- </root>
+ <root level="debug">
+ <appender-ref ref="FILE_MAIN" />
+ <appender-ref ref="FILE_DATA" />
+ <appender-ref ref="STDOUT" />
+ </root>
</configuration>
diff --git a/ves-agent.yaml b/ves-agent.yaml
index 756f73e..29534de 100644
--- a/ves-agent.yaml
+++ b/ves-agent.yaml
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- */
+
apiVersion: v1
kind: Service
metadata:
@@ -52,8 +52,8 @@
valueFrom:
fieldRef:
fieldPath: metadata.namespace
-
+
ports:
- containerPort: 8080
name: rest-port
- imagePullPolicy: IfNotPresent
+ imagePullPolicy: Always