blob: e20ecbb8f230bcdb416a9ee86990ea22d655b4fb [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.opencord.kafka.integrations;
18
19import com.fasterxml.jackson.databind.JsonNode;
20import com.fasterxml.jackson.databind.ObjectMapper;
21import com.fasterxml.jackson.databind.node.ObjectNode;
22import com.google.common.collect.ImmutableMap;
23import org.apache.commons.lang3.tuple.Pair;
24import org.onosproject.net.behaviour.BngProgrammable;
25import org.onosproject.net.pi.runtime.PiCounterCellData;
26import org.opencord.bng.BngStatsEvent;
27import org.opencord.bng.BngStatsEventListener;
28import org.opencord.bng.BngStatsEventSubject;
29import org.opencord.bng.BngStatsService;
30import org.opencord.kafka.EventBusService;
31import org.osgi.service.component.annotations.Activate;
32import org.osgi.service.component.annotations.Component;
33import org.osgi.service.component.annotations.Deactivate;
34import org.osgi.service.component.annotations.Reference;
35import org.osgi.service.component.annotations.ReferenceCardinality;
36import org.osgi.service.component.annotations.ReferencePolicy;
37
38import java.time.Instant;
39import java.util.Map;
40import java.util.concurrent.atomic.AtomicReference;
41
42/**
43 * Listens for statistic events from the BNG app and pushes them on a Kafka
44 * bus.
45 */
46@Component(immediate = true)
47public class BngStatsKafkaIntegration extends AbstractKafkaIntegration {
48
Daniele Moro16600ce2020-01-15 12:00:05 -080049 @Reference(cardinality = ReferenceCardinality.MANDATORY)
50 protected EventBusService eventBusService;
51
52 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
53 policy = ReferencePolicy.DYNAMIC,
54 bind = "bindBngStatsService",
55 unbind = "unbindBngStatsService")
56 protected volatile BngStatsService ignore;
57 private final AtomicReference<BngStatsService> bngStatsServiceRef = new AtomicReference<>();
58
Daniele Moro6f30ffd2019-12-06 16:10:40 -080059 private static final String TOPIC_STATS = "bng.stats";
60 private static final String SUBSCRIBER_S_TAG = "sTag";
61 private static final String SUBSCRIBER_C_TAG = "cTag";
62
Daniele Moro16600ce2020-01-15 12:00:05 -080063 private static final String UP_TX_BYTES = "upTxBytes";
64 private static final String UP_TX_PACKETS = "upTxPackets";
Daniele Moro6f30ffd2019-12-06 16:10:40 -080065
66 private static final String UP_RX_BYTES = "upRxBytes";
67 private static final String UP_RX_PACKETS = "upRxPackets";
68
69 private static final String UP_DROP_BYTES = "upDropBytes";
70 private static final String UP_DROP_PACKETS = "upDropPackets";
71
72 private static final String DOWN_RX_BYTES = "downRxBytes";
73 private static final String DOWN_RX_PACKETS = "downRxPackets";
74
75 private static final String DOWN_TX_BYTES = "downTxBytes";
76 private static final String DOWN_TX_PACKETS = "downTxPackets";
77
78 private static final String DOWN_DROP_BYTES = "downDropBytes";
79 private static final String DOWN_DROP_PACKETS = "downDropPackets";
80
81 private static final String CONTROL_PACKETS = "controlPackets";
82
83 private static final ImmutableMap<BngProgrammable.BngCounterType, Pair<String, String>> MAP_COUNTERS =
84 ImmutableMap.<BngProgrammable.BngCounterType, Pair<String, String>>builder()
85 .put(BngProgrammable.BngCounterType.UPSTREAM_RX, Pair.of(UP_RX_BYTES, UP_RX_PACKETS))
86 .put(BngProgrammable.BngCounterType.UPSTREAM_TX, Pair.of(UP_TX_BYTES, UP_TX_PACKETS))
87 .put(BngProgrammable.BngCounterType.UPSTREAM_DROPPED, Pair.of(UP_DROP_BYTES, UP_DROP_PACKETS))
88
89 .put(BngProgrammable.BngCounterType.DOWNSTREAM_RX, Pair.of(DOWN_RX_BYTES, DOWN_RX_PACKETS))
90 .put(BngProgrammable.BngCounterType.DOWNSTREAM_TX, Pair.of(DOWN_TX_BYTES, DOWN_TX_PACKETS))
91 .put(BngProgrammable.BngCounterType.DOWNSTREAM_DROPPED, Pair.of(DOWN_DROP_BYTES, DOWN_DROP_PACKETS))
92 .build();
93
94 private static final String TIMESTAMP = "timestamp";
95 private static final String ATTACHMENT_TYPE = "attachmentType";
96 private static final String DEVICE_ID = "deviceId";
97 private static final String PORT_NUMBER = "portNumber";
98 private static final String MAC_ADDRESS = "macAddress";
99 private static final String IP_ADDRESS = "ipAddress";
100 private static final String ONU_SERIAL_NUMBER = "onuSerialNumber";
101 private static final String PPPOE_SESSION_ID = "pppoeSessionId";
102
103 private final BngStatsEventListener statsListener = new InternalStatsListener();
104
Daniele Moro6f30ffd2019-12-06 16:10:40 -0800105 protected void bindBngStatsService(BngStatsService incomingService) {
106 bindAndAddListener(incomingService, bngStatsServiceRef, statsListener);
107 }
108
109 protected void unbindBngStatsService(BngStatsService outgoingService) {
110 unbindAndRemoveListener(outgoingService, bngStatsServiceRef, statsListener);
111 }
112
113 @Activate
114 public void activate() {
115 log.info("Started BngKafkaIntegration");
116 }
117
118 @Deactivate
119 public void deactivate() {
120 unbindBngStatsService(bngStatsServiceRef.get());
121 log.info("Stopped BngKafkaIntegration");
122 }
123
124 private JsonNode serializeBngStatsEvent(BngStatsEventSubject eventSubject) {
125 // Serialize stats in a JSON node
126 ObjectMapper mapper = new ObjectMapper();
127 ObjectNode attStatsNode = mapper.createObjectNode();
128
129 // Exposing statistics only for PPPoE attachment
130 var attachmentStats = eventSubject.getAttachmentStats();
131 var attachment = eventSubject.getBngAttachment();
132
133 attStatsNode.put(MAC_ADDRESS, attachment.macAddress().toString());
134 attStatsNode.put(IP_ADDRESS, attachment.ipAddress().toString());
135 attStatsNode.put(PPPOE_SESSION_ID, attachment.pppoeSessionId());
136
137 attStatsNode.put(SUBSCRIBER_S_TAG, attachment.sTag().toShort());
138 attStatsNode.put(SUBSCRIBER_C_TAG, attachment.cTag().toShort());
139
140 attStatsNode.put(ONU_SERIAL_NUMBER, attachment.onuSerial());
141 attStatsNode.put(ATTACHMENT_TYPE, attachment.type().toString());
142
143 attStatsNode.put(DEVICE_ID, attachment.oltConnectPoint().deviceId().toString());
144 attStatsNode.put(PORT_NUMBER, attachment.oltConnectPoint().port().toString());
145
146 // Add the statistics to the JSON
147 attStatsNode = createNodesStats(attachmentStats, attStatsNode);
148
149 // Control stats are different, only packets statistics
150 attStatsNode.put(CONTROL_PACKETS,
151 attachmentStats.get(BngProgrammable.BngCounterType.CONTROL_PLANE).packets());
152
153 attStatsNode.put(TIMESTAMP, Instant.now().toString());
154 return attStatsNode;
155 }
156
157 private ObjectNode createNodesStats(Map<BngProgrammable.BngCounterType,
158 PiCounterCellData> attStats, ObjectNode node) {
159 MAP_COUNTERS.forEach((counterType, pairStats) -> {
160 if (attStats.containsKey(counterType)) {
161 node.put(pairStats.getLeft(),
162 attStats.get(counterType).bytes());
163 node.put(pairStats.getRight(),
164 attStats.get(counterType).packets());
165 }
166 });
167 return node;
168 }
169
170 private class InternalStatsListener implements BngStatsEventListener {
171
172 @Override
173 public void event(BngStatsEvent event) {
174 eventBusService.send(TOPIC_STATS, serializeBngStatsEvent(event.subject()));
175 }
176 }
177}