blob: 68450d1343e6c7ffc27c957ce464778e741fbcea [file] [log] [blame]
Matteo Scandolo8a015832018-10-09 14:54:11 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.opencord.kafka.integrations;
18
19import com.fasterxml.jackson.databind.JsonNode;
20import com.fasterxml.jackson.databind.ObjectMapper;
21import com.fasterxml.jackson.databind.node.ArrayNode;
22import com.fasterxml.jackson.databind.node.ObjectNode;
Carmelo Cascone7e73fa12019-07-15 18:29:01 -070023import org.osgi.service.component.annotations.Activate;
24import org.osgi.service.component.annotations.Component;
25import org.osgi.service.component.annotations.Deactivate;
26import org.osgi.service.component.annotations.Reference;
27import org.osgi.service.component.annotations.ReferenceCardinality;
Matteo Scandolo8a015832018-10-09 14:54:11 -070028import org.onosproject.net.DeviceId;
29import org.onosproject.net.device.DeviceEvent;
30import org.onosproject.net.device.DeviceListener;
31import org.onosproject.net.device.DeviceService;
32import org.onosproject.net.device.PortStatistics;
Scott Baker900b48b2019-03-18 16:59:31 -070033import org.onosproject.net.Port;
Matteo Scandolo8a015832018-10-09 14:54:11 -070034import org.opencord.kafka.EventBusService;
35import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
38import java.time.Instant;
39import java.util.Iterator;
40import java.util.List;
41
42
43/**
44 * Listens for access device events and pushes them on a Kafka bus.
45 */
46@Component(immediate = true)
47public class DeviceKafkaIntegration {
48
49 public Logger log = LoggerFactory.getLogger(getClass());
50
Carmelo Cascone7e73fa12019-07-15 18:29:01 -070051 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandolo8a015832018-10-09 14:54:11 -070052 protected EventBusService eventBusService;
53
Carmelo Cascone7e73fa12019-07-15 18:29:01 -070054 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Matteo Scandolo8a015832018-10-09 14:54:11 -070055 protected DeviceService deviceService;
56
57 private final DeviceListener listener = new InternalDeviceListener();
58
59 private static final String TOPIC = "onos.kpis";
Scott Baker900b48b2019-03-18 16:59:31 -070060 private static final String PORT_EVENT_TOPIC = "onos.events.port";
Matteo Scandolo8a015832018-10-09 14:54:11 -070061
62 // event fields
63 private static final String TIMESTAMP = "timestamp";
64 private static final String DEVICE_ID = "deviceId";
65 private static final String PORTS = "ports";
66 private static final String PORT_ID = "portId";
67 private static final String PKT_RX = "pktRx";
68 private static final String PKT_TX = "pktTx";
69 private static final String BYTES_RX = "bytesRx";
70 private static final String BYTES_TX = "bytesTx";
71 private static final String PKT_RX_DROP = "pktRxDrp";
72 private static final String PKT_TX_DROP = "pktTxDrp";
Scott Baker900b48b2019-03-18 16:59:31 -070073 private static final String ENABLED = "enabled";
74 private static final String SPEED = "speed";
75 private static final String TYPE = "type";
Matteo Scandolo8a015832018-10-09 14:54:11 -070076
77 @Activate
78 public void activate() {
79 deviceService.addListener(listener);
80 log.info("Started");
81 }
82
83 @Deactivate
84 public void deactivate() {
85 deviceService.removeListener(listener);
86 log.info("Stopped");
87 }
88
89 private void handle(List<PortStatistics> stats, DeviceId deviceId) {
Scott Baker900b48b2019-03-18 16:59:31 -070090 eventBusService.send(TOPIC, serializeStats(stats, deviceId));
Matteo Scandolo8a015832018-10-09 14:54:11 -070091 }
92
Scott Baker900b48b2019-03-18 16:59:31 -070093 private void handlePortUpdate(Port port, DeviceId deviceId) {
94 eventBusService.send(PORT_EVENT_TOPIC, serializePort(port, deviceId));
95 }
96
97 private JsonNode serializeStats(List<PortStatistics> stats, DeviceId deviceId) {
Matteo Scandolo8a015832018-10-09 14:54:11 -070098
99 ObjectMapper mapper = new ObjectMapper();
100 ObjectNode kpis = mapper.createObjectNode();
101 ArrayNode ports = mapper.createArrayNode();
102
103 for (Iterator<PortStatistics> i = stats.iterator(); i.hasNext();) {
104 PortStatistics stat = i.next();
105
106 ObjectNode port = mapper.createObjectNode();
107 port.put(PORT_ID, stat.portNumber().toString());
108 port.put(PKT_RX, stat.packetsReceived());
109 port.put(PKT_TX, stat.packetsSent());
110 port.put(BYTES_RX, stat.bytesReceived());
111 port.put(BYTES_TX, stat.bytesSent());
112 port.put(PKT_RX_DROP, stat.packetsRxDropped());
113 port.put(PKT_TX_DROP, stat.packetsTxDropped());
114
115 ports.add(port);
116 }
117
118 kpis.put(TIMESTAMP, Instant.now().toString());
119 kpis.put(PORTS, ports);
120 kpis.put(DEVICE_ID, deviceId.toString());
121
122 return kpis;
123 }
124
Scott Baker900b48b2019-03-18 16:59:31 -0700125 private JsonNode serializePort(Port port, DeviceId deviceId) {
126
127 ObjectMapper mapper = new ObjectMapper();
128 ObjectNode update = mapper.createObjectNode();
129
130 update.put(TIMESTAMP, Instant.now().toString());
131 update.put(DEVICE_ID, deviceId.toString());
132 update.put(PORT_ID, port.number().toString());
133 update.put(ENABLED, port.isEnabled());
134 update.put(SPEED, port.portSpeed());
135 update.put(TYPE, port.type().toString());
136
137 return update;
138 }
139
Matteo Scandolo8a015832018-10-09 14:54:11 -0700140 private class InternalDeviceListener implements
141 DeviceListener {
142
143 @Override
144 public void event(DeviceEvent deviceEvent) {
Scott Baker900b48b2019-03-18 16:59:31 -0700145 final DeviceId deviceId;
Matteo Scandolo8a015832018-10-09 14:54:11 -0700146
147 if (deviceEvent.subject().manufacturer().contains("VOLTHA")) {
148 // TODO check the NNI port instead
149 return;
150 }
151
152 log.trace("Got DeviceEvent: " + deviceEvent.type());
153 switch (deviceEvent.type()) {
154 case PORT_STATS_UPDATED:
Scott Baker900b48b2019-03-18 16:59:31 -0700155 deviceId = deviceEvent.subject().id();
Matteo Scandolo8a015832018-10-09 14:54:11 -0700156 final List<PortStatistics> stats = deviceService.getPortStatistics(deviceId);
157 handle(stats, deviceId);
158 break;
Scott Baker900b48b2019-03-18 16:59:31 -0700159 case PORT_UPDATED:
160 deviceId = deviceEvent.subject().id();
161 final Port port = deviceEvent.port();
162 handlePortUpdate(port, deviceId);
163 break;
Matteo Scandolo8a015832018-10-09 14:54:11 -0700164 default:
165 break;
166 }
167 }
168 }
169}