/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.opencord.kafka.integrations;
18
19import com.fasterxml.jackson.databind.JsonNode;
20import com.fasterxml.jackson.databind.ObjectMapper;
21import com.fasterxml.jackson.databind.node.ObjectNode;
22import com.google.common.collect.ImmutableMap;
23import org.apache.commons.lang3.tuple.Pair;
24import org.onosproject.net.behaviour.BngProgrammable;
25import org.onosproject.net.pi.runtime.PiCounterCellData;
26import org.opencord.bng.BngStatsEvent;
27import org.opencord.bng.BngStatsEventListener;
28import org.opencord.bng.BngStatsEventSubject;
29import org.opencord.bng.BngStatsService;
30import org.opencord.kafka.EventBusService;
31import org.osgi.service.component.annotations.Activate;
32import org.osgi.service.component.annotations.Component;
33import org.osgi.service.component.annotations.Deactivate;
34import org.osgi.service.component.annotations.Reference;
35import org.osgi.service.component.annotations.ReferenceCardinality;
36import org.osgi.service.component.annotations.ReferencePolicy;
37
38import java.time.Instant;
39import java.util.Map;
40import java.util.concurrent.atomic.AtomicReference;
41
42/**
43 * Listens for statistic events from the BNG app and pushes them on a Kafka
44 * bus.
45 */
46@Component(immediate = true)
47public class BngStatsKafkaIntegration extends AbstractKafkaIntegration {
48
49 private static final String TOPIC_STATS = "bng.stats";
50 private static final String SUBSCRIBER_S_TAG = "sTag";
51 private static final String SUBSCRIBER_C_TAG = "cTag";
52
53 private static final String UP_TX_BYTES = "upTermBytes";
54 private static final String UP_TX_PACKETS = "upTermPackets";
55
56 private static final String UP_RX_BYTES = "upRxBytes";
57 private static final String UP_RX_PACKETS = "upRxPackets";
58
59 private static final String UP_DROP_BYTES = "upDropBytes";
60 private static final String UP_DROP_PACKETS = "upDropPackets";
61
62 private static final String DOWN_RX_BYTES = "downRxBytes";
63 private static final String DOWN_RX_PACKETS = "downRxPackets";
64
65 private static final String DOWN_TX_BYTES = "downTxBytes";
66 private static final String DOWN_TX_PACKETS = "downTxPackets";
67
68 private static final String DOWN_DROP_BYTES = "downDropBytes";
69 private static final String DOWN_DROP_PACKETS = "downDropPackets";
70
71 private static final String CONTROL_PACKETS = "controlPackets";
72
73 private static final ImmutableMap<BngProgrammable.BngCounterType, Pair<String, String>> MAP_COUNTERS =
74 ImmutableMap.<BngProgrammable.BngCounterType, Pair<String, String>>builder()
75 .put(BngProgrammable.BngCounterType.UPSTREAM_RX, Pair.of(UP_RX_BYTES, UP_RX_PACKETS))
76 .put(BngProgrammable.BngCounterType.UPSTREAM_TX, Pair.of(UP_TX_BYTES, UP_TX_PACKETS))
77 .put(BngProgrammable.BngCounterType.UPSTREAM_DROPPED, Pair.of(UP_DROP_BYTES, UP_DROP_PACKETS))
78
79 .put(BngProgrammable.BngCounterType.DOWNSTREAM_RX, Pair.of(DOWN_RX_BYTES, DOWN_RX_PACKETS))
80 .put(BngProgrammable.BngCounterType.DOWNSTREAM_TX, Pair.of(DOWN_TX_BYTES, DOWN_TX_PACKETS))
81 .put(BngProgrammable.BngCounterType.DOWNSTREAM_DROPPED, Pair.of(DOWN_DROP_BYTES, DOWN_DROP_PACKETS))
82 .build();
83
84 private static final String TIMESTAMP = "timestamp";
85 private static final String ATTACHMENT_TYPE = "attachmentType";
86 private static final String DEVICE_ID = "deviceId";
87 private static final String PORT_NUMBER = "portNumber";
88 private static final String MAC_ADDRESS = "macAddress";
89 private static final String IP_ADDRESS = "ipAddress";
90 private static final String ONU_SERIAL_NUMBER = "onuSerialNumber";
91 private static final String PPPOE_SESSION_ID = "pppoeSessionId";
92
93 private final BngStatsEventListener statsListener = new InternalStatsListener();
94
95 private final AtomicReference<BngStatsService> bngStatsServiceRef = new AtomicReference<>();
96
97 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
98 policy = ReferencePolicy.DYNAMIC,
99 bind = "bindBngStatsService",
100 unbind = "unbindBngStatsService")
101 protected volatile BngStatsService bngStatsService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected EventBusService eventBusService;
105
106
107 protected void bindBngStatsService(BngStatsService incomingService) {
108 bindAndAddListener(incomingService, bngStatsServiceRef, statsListener);
109 }
110
111 protected void unbindBngStatsService(BngStatsService outgoingService) {
112 unbindAndRemoveListener(outgoingService, bngStatsServiceRef, statsListener);
113 }
114
115 @Activate
116 public void activate() {
117 log.info("Started BngKafkaIntegration");
118 }
119
120 @Deactivate
121 public void deactivate() {
122 unbindBngStatsService(bngStatsServiceRef.get());
123 log.info("Stopped BngKafkaIntegration");
124 }
125
126 private JsonNode serializeBngStatsEvent(BngStatsEventSubject eventSubject) {
127 // Serialize stats in a JSON node
128 ObjectMapper mapper = new ObjectMapper();
129 ObjectNode attStatsNode = mapper.createObjectNode();
130
131 // Exposing statistics only for PPPoE attachment
132 var attachmentStats = eventSubject.getAttachmentStats();
133 var attachment = eventSubject.getBngAttachment();
134
135 attStatsNode.put(MAC_ADDRESS, attachment.macAddress().toString());
136 attStatsNode.put(IP_ADDRESS, attachment.ipAddress().toString());
137 attStatsNode.put(PPPOE_SESSION_ID, attachment.pppoeSessionId());
138
139 attStatsNode.put(SUBSCRIBER_S_TAG, attachment.sTag().toShort());
140 attStatsNode.put(SUBSCRIBER_C_TAG, attachment.cTag().toShort());
141
142 attStatsNode.put(ONU_SERIAL_NUMBER, attachment.onuSerial());
143 attStatsNode.put(ATTACHMENT_TYPE, attachment.type().toString());
144
145 attStatsNode.put(DEVICE_ID, attachment.oltConnectPoint().deviceId().toString());
146 attStatsNode.put(PORT_NUMBER, attachment.oltConnectPoint().port().toString());
147
148 // Add the statistics to the JSON
149 attStatsNode = createNodesStats(attachmentStats, attStatsNode);
150
151 // Control stats are different, only packets statistics
152 attStatsNode.put(CONTROL_PACKETS,
153 attachmentStats.get(BngProgrammable.BngCounterType.CONTROL_PLANE).packets());
154
155 attStatsNode.put(TIMESTAMP, Instant.now().toString());
156 return attStatsNode;
157 }
158
159 private ObjectNode createNodesStats(Map<BngProgrammable.BngCounterType,
160 PiCounterCellData> attStats, ObjectNode node) {
161 MAP_COUNTERS.forEach((counterType, pairStats) -> {
162 if (attStats.containsKey(counterType)) {
163 node.put(pairStats.getLeft(),
164 attStats.get(counterType).bytes());
165 node.put(pairStats.getRight(),
166 attStats.get(counterType).packets());
167 }
168 });
169 return node;
170 }
171
172 private class InternalStatsListener implements BngStatsEventListener {
173
174 @Override
175 public void event(BngStatsEvent event) {
176 eventBusService.send(TOPIC_STATS, serializeBngStatsEvent(event.subject()));
177 }
178 }
179}