Added id code and removed from name
Added funtionality to consumer and send kpi events to onap
Change-Id: I0849e3b9188b1fa53210c341f1fc6e6228c5de9b
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/config/config.properties b/config/config.properties
index ce035c3..999b4b1 100644
--- a/config/config.properties
+++ b/config/config.properties
@@ -1,9 +1,11 @@
onap_ves_address=onap
onap_ves_port=30235
-kafka_address=kafka.voltha.svc.cluster.local
+kafka_address=cord-kafka
kafka_port=9092
-kafka_topic=voltha.alarms
+
+kafka_alarms_topic=voltha.alarms
+kafka_kpis_topic=voltha.kpis
co_id=001
pod_id=001
diff --git a/src/main/java/config/Config.java b/src/main/java/config/Config.java
index ec27cfa..cce3045 100644
--- a/src/main/java/config/Config.java
+++ b/src/main/java/config/Config.java
@@ -60,10 +60,14 @@
public static String getKafkaPort() {
return get("kafka_port");
+ }
+
+ public static String getKafkaAlarmsTopic() {
+ return get("kafka_alarms_topic");
}
- public static String getKafkaTopic() {
- return get("kafka_topic");
+ public static String getKafkaKpisTopic() {
+ return get("kafka_kpis_topic");
}
public static String getCoId() {
@@ -73,4 +77,4 @@
public static String getPodId() {
return get("pod_id");
}
-}
+}
diff --git a/src/main/java/controller/Application.java b/src/main/java/controller/Application.java
index d7a0d0e..23566dd 100644
--- a/src/main/java/controller/Application.java
+++ b/src/main/java/controller/Application.java
@@ -20,6 +20,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import kafka.VolthaKafkaConsumer;
+import kafka.KafkaConsumerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,23 +41,40 @@
public static void main(String[] args) {
Config.loadProperties("/opt/ves-agent/config.properties");
- KafkaThread kafka = new KafkaThread();
- kafka.start();
- SpringApplication.run(Application.class, args);
+ KafkaAlarmsThread kafkaAlarms = new KafkaAlarmsThread();
+ kafkaAlarms.start();
+ KafkaKpisThread kafkaKpis = new KafkaKpisThread();
+ kafkaKpis.start();
+ //SpringApplication.run(Application.class, args);
}
}
-class KafkaThread extends Thread {
+class KafkaAlarmsThread extends Thread {
- private final static Logger logger = LoggerFactory.getLogger("KafkaThread");
+ private final static Logger logger = LoggerFactory.getLogger("KafkaAlarmsThread");
public void run() {
- logger.debug("Start Kafka Consumer Thread");
+ logger.debug("Start Kafka Alarms Consumer Thread");
try {
- VolthaKafkaConsumer consumer = new VolthaKafkaConsumer();
+ VolthaKafkaConsumer consumer = new VolthaKafkaConsumer(KafkaConsumerType.ALARMS);
consumer.runConsumer();
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
+ } catch (Exception e) {
+ logger.error("Error in Kafka Alarm thread", e);
+ }
+
+ }
+}
+class KafkaKpisThread extends Thread {
+
+ private final static Logger logger = LoggerFactory.getLogger("KafkaKpisThread");
+
+ public void run() {
+ logger.debug("Start Kafka KPIs Consumer Thread");
+ try {
+ VolthaKafkaConsumer consumer = new VolthaKafkaConsumer(KafkaConsumerType.KPIS);
+ consumer.runConsumer();
+ } catch (Exception e) {
+ logger.error("Error Kafka, KPI thread", e);
}
}
diff --git a/src/main/java/kafka/KafkaConsumerType.java b/src/main/java/kafka/KafkaConsumerType.java
new file mode 100644
index 0000000..13cd249
--- /dev/null
+++ b/src/main/java/kafka/KafkaConsumerType.java
@@ -0,0 +1,22 @@
+/*
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package kafka;
+
+public enum KafkaConsumerType {
+ ALARMS,
+ KPIS
+}
diff --git a/src/main/java/kafka/VolthaKafkaConsumer.java b/src/main/java/kafka/VolthaKafkaConsumer.java
index dc431bb..11deb81 100644
--- a/src/main/java/kafka/VolthaKafkaConsumer.java
+++ b/src/main/java/kafka/VolthaKafkaConsumer.java
@@ -12,6 +12,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+*
*/
package kafka;
import org.apache.kafka.clients.consumer.*;
@@ -38,7 +39,6 @@
import ves.*;
import config.Config;
-
public class VolthaKafkaConsumer {
private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
@@ -47,13 +47,17 @@
private KafkaConsumer<Long, String> consumer;
- public VolthaKafkaConsumer() {
+ private KafkaConsumerType type;
+
+ public VolthaKafkaConsumer(KafkaConsumerType type) {
logger.debug("VolthaKafkaConsumer constructor called");
initVesAgent();
+ this.type = type;
try {
consumer = createConsumer();
- } catch (KafkaException e) {
- logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+ } catch (Exception e) {
+ logger.error("Error with Kafka: ", e);
+ logger.error("Retrying in 15 seconds.");
//Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
}
}
@@ -70,26 +74,31 @@
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
- "KafkaExampleConsumer");
+ "VesAgent");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
false);
-
// Create the consumer using props.
final KafkaConsumer<Long, String> consumer =
new KafkaConsumer<>(props);
-
// Subscribe to the topic.
- consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
+ switch (type) {
+ case ALARMS:
+ consumer.subscribe(Collections.singletonList(Config.getKafkaAlarmsTopic()));
+ break;
+ case KPIS:
+ consumer.subscribe(Collections.singletonList(Config.getKafkaKpisTopic()));
+ break;
+ }
return consumer;
}
public void runConsumer() throws InterruptedException {
- logger.debug("Starting Consumer");
+ logger.debug("Starting Kafka Consumer");
while (true) {
ConsumerRecords<Long, String> consumerRecords;
@@ -98,8 +107,9 @@
this.consumer = createConsumer();
}
consumerRecords = consumer.poll(20000);
- } catch (KafkaException e) {
- logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+ } catch (Exception e) {
+ logger.error("Error with kafka: ", e);
+ logger.error("Retrying in 15 seconds.");
consumer = null;
TimeUnit.SECONDS.sleep(15);
continue;
@@ -114,7 +124,7 @@
record.key(), record.value(),
record.partition(), record.offset());
logger.info("Attempting to send data to VES");
- boolean success = VesAgent.sendToVES(record.value());
+ boolean success = VesAgent.sendToVES(type, record.value());
if (!success) {
throw new HTTPException(0);
} else {
diff --git a/src/main/java/mapper/VesVolthaMessage.java b/src/main/java/mapper/VesVolthaAlarm.java
similarity index 97%
rename from src/main/java/mapper/VesVolthaMessage.java
rename to src/main/java/mapper/VesVolthaAlarm.java
index f0327df..743b8f1 100644
--- a/src/main/java/mapper/VesVolthaMessage.java
+++ b/src/main/java/mapper/VesVolthaAlarm.java
@@ -19,7 +19,7 @@
import org.slf4j.LoggerFactory;
import java.util.Map;
-public class VesVolthaMessage {
+public class VesVolthaAlarm {
private String id = "";
private String logical_device_id = "";
private String raised_ts = "";
diff --git a/src/main/java/mapper/VesVolthaKpi.java b/src/main/java/mapper/VesVolthaKpi.java
new file mode 100644
index 0000000..b6bf02e
--- /dev/null
+++ b/src/main/java/mapper/VesVolthaKpi.java
@@ -0,0 +1,53 @@
+/*
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package mapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Map;
+import com.google.gson.annotations.JsonAdapter;
+
+public class VesVolthaKpi {
+ private String type = "";
+ private String ts = "";
+
+ //@JsonAdapter(StringTypeAdapter.class)
+ private String slice_data;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getTs() {
+ return ts;
+ }
+
+ public void setTs(String ts) {
+ this.ts = ts;
+ }
+
+ public String getSliceData() {
+ return slice_data;
+ }
+
+ public void setSliceData(String slice_data) {
+ this.slice_data = slice_data;
+ }
+}
diff --git a/src/main/java/mapper/VesVolthaMapper.java b/src/main/java/mapper/VesVolthaMapper.java
index a3ee1ab..6d927ca 100644
--- a/src/main/java/mapper/VesVolthaMapper.java
+++ b/src/main/java/mapper/VesVolthaMapper.java
@@ -19,6 +19,7 @@
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
public class VesVolthaMapper {
@@ -30,7 +31,17 @@
gson = new GsonBuilder().create();
}
- public VesVolthaMessage parseJson(String json) {
- return gson.fromJson(json, VesVolthaMessage.class);
+ public VesVolthaAlarm parseAlarm(String json) {
+ return gson.fromJson(json, VesVolthaAlarm.class);
+ }
+
+ public VesVolthaKpi parseKpi(String json) {
+ //return gson.fromJson(json, VesVolthaKpi.class);
+ JsonObject body = gson.fromJson(json, JsonObject.class);
+ VesVolthaKpi kpi = new VesVolthaKpi();
+ kpi.setType(body.get("type").getAsString());
+ kpi.setTs(body.get("ts").getAsString());
+ kpi.setSliceData(body.get("slice_data").toString());
+ return kpi;
}
}
diff --git a/src/main/java/ves/VesAgent.java b/src/main/java/ves/VesAgent.java
index bea964d..ac42115 100644
--- a/src/main/java/ves/VesAgent.java
+++ b/src/main/java/ves/VesAgent.java
@@ -33,7 +33,9 @@
import config.Config;
import mapper.VesVolthaMapper;
-import mapper.VesVolthaMessage;
+import mapper.VesVolthaAlarm;
+import mapper.VesVolthaKpi;
+import kafka.KafkaConsumerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,9 +65,31 @@
}
}
- public static boolean sendToVES(String json) throws JsonSyntaxException {
- VesVolthaMessage message = mapper.parseJson(json);
+ public static boolean sendToVES(KafkaConsumerType type, String json) throws JsonSyntaxException {
+ int code = 0;
+
+ switch (type) {
+ case ALARMS:
+ code = sendFault(json);
+ break;
+ case KPIS:
+ code = sendKpi(json);
+ break;
+ }
+
+ if(code == 0 || code >= HttpURLConnection.HTTP_BAD_REQUEST ) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ private static int sendFault(String json) {
+ VesVolthaAlarm message = mapper.parseAlarm(json);
+
String id = message.getId();
+ String[] idsplit = id.split("\\.");
+ String eventType = idsplit[idsplit.length-1];
String ldeviceId = message.getLogicalDeviceId();
String ts = message.getRaisedTS();
String description = message.getDescription();
@@ -80,7 +104,7 @@
EVEL_SEVERITIES vesSeverity = mapSeverity(severity);
EVEL_SOURCE_TYPES vesType = mapType(type);
EvelFault flt = new EvelFault(
- "Fault_VOLTHA_" + id,
+ "Fault_VOLTHA_" + eventType,
ldeviceId + ":" + ts,
id,
description,
@@ -98,11 +122,25 @@
logger.info("Sending fault event");
int code = AgentMain.evel_post_event_immediate(flt);
logger.info("Fault event http code received: " + code);
- if(code == 0 || code >= HttpURLConnection.HTTP_BAD_REQUEST ) {
- return false;
- } else {
- return true;
- }
+ return code;
+ }
+
+ private static int sendKpi(String json) {
+ VesVolthaKpi message = mapper.parseKpi(json);
+
+ EvelOther ev = new EvelOther("measurement_VOLTHA_KPI", "vmname_ip");
+ ev.evel_other_field_add("co_id", Config.getCoId());
+ ev.evel_other_field_add("pod_id", Config.getPodId());
+ ev.evel_other_field_add("type", message.getType());
+ ev.evel_other_field_add("ts", message.getTs());
+ ev.evel_other_field_add("slices", message.getSliceData());
+
+ ev.evel_other_field_add("voltha", json);
+
+ logger.info("Sending fault event");
+ int code = AgentMain.evel_post_event_immediate(ev);
+ logger.info("Fault event http code received: " + code);
+ return code;
}
private static EVEL_SEVERITIES mapSeverity(String severity) {
diff --git a/ves-agent.yaml b/ves-agent.yaml
index 29534de..8acf42b 100644
--- a/ves-agent.yaml
+++ b/ves-agent.yaml
@@ -16,7 +16,7 @@
kind: Service
metadata:
name: ves
- namespace: voltha
+ namespace: default
labels:
name: ves
spec:
@@ -32,7 +32,7 @@
kind: Deployment
metadata:
name: ves
- namespace: voltha
+ namespace: default
spec:
replicas: 1
template: