/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.opencord.kafka.integrations;
18
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.onosproject.cluster.ClusterService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.device.DeviceService;
import org.opencord.dhcpl2relay.DhcpAllocationInfo;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
import org.opencord.dhcpl2relay.DhcpL2RelayListener;
import org.opencord.dhcpl2relay.DhcpL2RelayService;
import org.opencord.kafka.EventBusService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Optional;
40
41/**
42 * Listens for DHCP L2 relay events and pushes them on a Kafka bus.
43 */
44@Component(immediate = true)
45public class DhcpL2RelayKafkaIntegration {
46
47 public Logger log = LoggerFactory.getLogger(getClass());
48
49 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
50 protected EventBusService eventBusService;
51
Matteo Scandolo580fb7f2019-04-17 15:33:15 -070052 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Marcos Aurelio Carrerob0617732019-12-04 16:09:49 -030053 protected ClusterService clusterService;
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Matteo Scandolo580fb7f2019-04-17 15:33:15 -070056 protected DeviceService deviceService;
57
Matteo Scandolo03f13c12019-03-20 14:38:12 -070058 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
Matteo Scandolod50a4d32019-04-24 12:10:21 -070059 policy = ReferencePolicy.DYNAMIC,
Matteo Scandolo03f13c12019-03-20 14:38:12 -070060 bind = "bindDhcpL2RelayService",
61 unbind = "unbindDhcpL2RelayService")
62 protected DhcpL2RelayService dhcpL2RelayService;
Jonathan Hart2aad7792018-07-31 15:09:17 -040063
64 private final DhcpL2RelayListener listener = new InternalDhcpL2RelayListener();
65
Marcos Aurelio Carrerob0617732019-12-04 16:09:49 -030066 // topics
Jonathan Hart2aad7792018-07-31 15:09:17 -040067 private static final String TOPIC = "dhcp.events";
Marcos Aurelio Carrerob0617732019-12-04 16:09:49 -030068 private static final String DHCP_STATS_TOPIC = "onos.dhcp.stats.kpis";
Jonathan Hart2aad7792018-07-31 15:09:17 -040069
70 private static final String TIMESTAMP = "timestamp";
71 private static final String DEVICE_ID = "deviceId";
Matteo Scandolo580fb7f2019-04-17 15:33:15 -070072 private static final String PORT_NUMBER = "portNumber";
73 private static final String SERIAL_NUMBER = "serialNumber";
Jonathan Hart2aad7792018-07-31 15:09:17 -040074 private static final String TYPE = "type";
75 private static final String MESSAGE_TYPE = "messageType";
Jonathan Hart2aad7792018-07-31 15:09:17 -040076 private static final String MAC_ADDRESS = "macAddress";
77 private static final String IP_ADDRESS = "ipAddress";
78
Marcos Aurelio Carrerob0617732019-12-04 16:09:49 -030079 // dhcp stats event params
80 static final String CONNECT_POINT = "connectPoint";
81 static final String INSTANCE_ID = "instance_id";
82 static final String METRICS = "metrics";
83 static final String SUBSCRIBER_ID = "subscriberId";
84 static final String SUBSCRIBER_INFO = "subscriberInfo";
85 static final String TS = "ts";
86 static final String TITLE = "title";
87
88 static final String GLOBAL_STATS_TITLE = "DHCP_L2_Relay_stats";
89 static final String PER_SUBSCRIBER_STATS_TITLE = "DHCP_L2_Relay_stats_Per_Subscriber";
90
Matteo Scandolo03f13c12019-03-20 14:38:12 -070091 protected void bindDhcpL2RelayService(DhcpL2RelayService dhcpL2RelayService) {
92 if (this.dhcpL2RelayService == null) {
93 log.info("Binding DhcpL2RelayService");
94 this.dhcpL2RelayService = dhcpL2RelayService;
95 log.info("Adding listener on DhcpL2RelayService");
96 dhcpL2RelayService.addListener(listener);
97 } else {
98 log.warn("Trying to bind DhcpL2RelayService but it is already bound");
99 }
100 }
101
102 protected void unbindDhcpL2RelayService(DhcpL2RelayService dhcpL2RelayService) {
103 if (this.dhcpL2RelayService == dhcpL2RelayService) {
104 log.info("Unbinding DhcpL2RelayService");
105 this.dhcpL2RelayService = null;
106 log.info("Removing listener on DhcpL2RelayService");
107 dhcpL2RelayService.removeListener(listener);
108 } else {
109 log.warn("Trying to unbind DhcpL2RelayService but it is already unbound");
110 }
111 }
112
Jonathan Hart2aad7792018-07-31 15:09:17 -0400113 @Activate
114 public void activate() {
Matteo Scandolod50a4d32019-04-24 12:10:21 -0700115 log.info("Started DhcpL2RelayKafkaIntegration");
Jonathan Hart2aad7792018-07-31 15:09:17 -0400116 }
117
118 @Deactivate
119 public void deactivate() {
Matteo Scandolod50a4d32019-04-24 12:10:21 -0700120 log.info("Stopped DhcpL2RelayKafkaIntegration");
Jonathan Hart2aad7792018-07-31 15:09:17 -0400121 }
122
123 private void handle(DhcpL2RelayEvent event) {
Marcos Aurelio Carrerob0617732019-12-04 16:09:49 -0300124 switch (event.type()) {
125 case STATS_UPDATE:
126 // pushes the stats based on the received event (per subscriber or global) on a Kafka bus
127 if (event.getSubscriberId() != null && event.subject() != null) {
128 eventBusService.send(DHCP_STATS_TOPIC, serializeStat(event, PER_SUBSCRIBER_STATS_TITLE));
129 } else {
130 eventBusService.send(DHCP_STATS_TOPIC, serializeStat(event, GLOBAL_STATS_TITLE));
131 }
132 log.info("Writing to kafka topic:{}, type:{}", DHCP_STATS_TOPIC,
133 DhcpL2RelayEvent.Type.STATS_UPDATE.toString());
134 break;
135 default:
136 eventBusService.send(TOPIC, serialize(event));
137 log.info("Writing to kafka topic:{}, type:{}", TOPIC, event.type().toString());
138 break;
139 }
Jonathan Hart2aad7792018-07-31 15:09:17 -0400140 }
141
142 private JsonNode serialize(DhcpL2RelayEvent event) {
Matteo Scandolo580fb7f2019-04-17 15:33:15 -0700143
144 String sn = deviceService.getPort(event.subject().location()).annotations().value(AnnotationKeys.PORT_NAME);
145
Jonathan Hart2aad7792018-07-31 15:09:17 -0400146 ObjectMapper mapper = new ObjectMapper();
147 ObjectNode dhcpEvent = mapper.createObjectNode();
148 DhcpAllocationInfo allocationInfo = event.subject();
149 dhcpEvent.put(TYPE, event.type().toString());
150 dhcpEvent.put(TIMESTAMP, Instant.now().toString());
151 dhcpEvent.put(DEVICE_ID, event.connectPoint().deviceId().toString());
Jonathan Hart2aad7792018-07-31 15:09:17 -0400152 dhcpEvent.put(PORT_NUMBER, event.connectPoint().port().toString());
Matteo Scandolo580fb7f2019-04-17 15:33:15 -0700153 dhcpEvent.put(SERIAL_NUMBER, sn);
154 dhcpEvent.put(MESSAGE_TYPE, allocationInfo.type().toString());
Jonathan Hart2aad7792018-07-31 15:09:17 -0400155 dhcpEvent.put(MAC_ADDRESS, allocationInfo.macAddress().toString());
156 dhcpEvent.put(IP_ADDRESS, allocationInfo.ipAddress().toString());
157 return dhcpEvent;
158 }
159
Marcos Aurelio Carrerob0617732019-12-04 16:09:49 -0300160 /**
161 * Returns a Json object that represents the DHCP L2 Relay stats.
162 *
163 * @param event DHCP L2 Relay event used for stats.
164 * @param title Describes the type of the received stats event (per subscriber or global).
165 */
166 private JsonNode serializeStat(DhcpL2RelayEvent event, String title) {
167 ObjectMapper mapper = new ObjectMapper();
168 ObjectNode statsEvent = mapper.createObjectNode();
169 Long ts = Instant.now().getEpochSecond();
170
171 // metrics for global and per subscriber stats
172 ObjectNode metrics = mapper.createObjectNode();
173 metrics.put(event.getCountersEntry().getKey(), event.getCountersEntry().getValue().longValue());
174
175 statsEvent.put(INSTANCE_ID, clusterService.getLocalNode().id().toString());
176 statsEvent.put(TITLE, title);
177 statsEvent.put(TS, ts);
178 statsEvent.put(METRICS, metrics);
179
180 // specific metrics for per subscriber stats
181 if (event.getSubscriberId() != null && event.subject() != null) {
182 String sn = deviceService.getDevice(event.subject().location().deviceId()).serialNumber();
183 ObjectNode subscriberInfo = mapper.createObjectNode();
184
185 statsEvent.put(SERIAL_NUMBER, sn);
186 subscriberInfo.put(SUBSCRIBER_ID, event.getSubscriberId());
187 subscriberInfo.put(CONNECT_POINT, event.subject().location().toString());
188 subscriberInfo.put(MAC_ADDRESS, event.subject().macAddress().toString());
189
190 statsEvent.put(SUBSCRIBER_INFO, subscriberInfo);
191 }
192
193 return statsEvent;
194 }
195
Jonathan Hart2aad7792018-07-31 15:09:17 -0400196 private class InternalDhcpL2RelayListener implements
197 DhcpL2RelayListener {
198
199 @Override
200 public void event(DhcpL2RelayEvent dhcpL2RelayEvent) {
201 handle(dhcpL2RelayEvent);
202 }
203 }
204}