blob: f141f1adafdaf80c01d8a7de30cc436e34a12638 [file] [log] [blame]
Arjun E Kc33d3442020-03-02 10:20:28 +00001/*
Joey Armstrong89c55e02023-01-09 18:09:41 -05002 * Copyright 2018-2023 Open Networking Foundation (ONF) and the ONF Contributors
Arjun E Kc33d3442020-03-02 10:20:28 +00003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.opencord.kafka.integrations;
18
19import com.fasterxml.jackson.databind.JsonNode;
20import com.fasterxml.jackson.databind.ObjectMapper;
21import com.fasterxml.jackson.databind.node.ArrayNode;
22import com.fasterxml.jackson.databind.node.ObjectNode;
23import org.opencord.cordmcast.CordMcastStatistics;
24import org.opencord.cordmcast.CordMcastStatisticsEvent;
25import org.opencord.cordmcast.CordMcastStatisticsEventListener;
26import org.opencord.cordmcast.CordMcastStatisticsService;
27import org.opencord.kafka.EventBusService;
28import org.osgi.service.component.annotations.Activate;
29import org.osgi.service.component.annotations.Component;
30import org.osgi.service.component.annotations.Deactivate;
31import org.osgi.service.component.annotations.Reference;
32import org.osgi.service.component.annotations.ReferenceCardinality;
33import org.osgi.service.component.annotations.ReferencePolicy;
34
35import java.time.Instant;
36import java.util.List;
37import java.util.concurrent.atomic.AtomicReference;
38
39/**
40 * Listens to Mcast events and pushes them into kafka bus.
41 */
42@Component(immediate = true)
43public class McastKafkaIntegration extends AbstractKafkaIntegration {
44
45 @Reference(cardinality = ReferenceCardinality.MANDATORY)
46 protected EventBusService eventBusService;
47
48 @Reference(cardinality = ReferenceCardinality.OPTIONAL,
49 policy = ReferencePolicy.DYNAMIC,
50 bind = "bindMcastStatisticsService",
51 unbind = "unbindMcastStatisticsService")
52 protected volatile CordMcastStatisticsService cordMcastStatisticsService;
53 protected final AtomicReference<CordMcastStatisticsService> cordMcastStatisticsServiceRef = new AtomicReference<>();
54
55 private final CordMcastStatisticsEventListener cordMcastStatisticsEventListener =
56 new InternalCorcMcastStatisticsListener();
57
kishoreefa3db52020-03-23 16:09:26 +053058 protected static final String MCAST_OPERATIONAL_STATUS_TOPIC = "mcastOperationalStatus.events";
Arjun E Kc33d3442020-03-02 10:20:28 +000059
60 //cord mcast stats event params
61 private static final String TIMESTAMP = "timestamp";
62 private static final String GROUP = "Group";
63 private static final String SOURCE = "Source";
64 private static final String VLAN = "Vlan";
65 private static final String MCAST_EVENT_DATA = "McastEventData";
66
67 protected void bindMcastStatisticsService(CordMcastStatisticsService cordMcastStatisticsService) {
68 bindAndAddListener(cordMcastStatisticsService, cordMcastStatisticsServiceRef, cordMcastStatisticsEventListener);
69 }
70
71 protected void unbindMcastStatisticsService(CordMcastStatisticsService cordMcastStatisticsService) {
72 unbindAndRemoveListener(cordMcastStatisticsService,
73 cordMcastStatisticsServiceRef, cordMcastStatisticsEventListener);
74 }
75
76 @Activate
77 public void activate() {
78 log.info("Started McastKafkaIntegration");
79 }
80
81 @Deactivate
82 public void deactivate() {
83 log.info("Stopped McastKafkaIntegration");
84 }
85
86 private void handleMcastStat(CordMcastStatisticsEvent mcastStatEvent) {
87 eventBusService.send(MCAST_OPERATIONAL_STATUS_TOPIC, serializeMcastStat(mcastStatEvent));
Andrea Campanellaf26dd352020-05-08 14:55:55 +020088 log.debug("CordMcastStatisticsEvent {} sent successfully", mcastStatEvent);
Arjun E Kc33d3442020-03-02 10:20:28 +000089 }
90
91 private JsonNode serializeMcastStat(CordMcastStatisticsEvent mcastStatEvent) {
92 log.debug("Serializing AuthenticationStatisticsEvent");
93 ObjectMapper mapper = new ObjectMapper();
94 ObjectNode mcastStat = mapper.createObjectNode();
95 mcastStat.put(TIMESTAMP, Instant.now().toString());
96 ArrayNode mcastArrayNode = mcastStat.putArray(MCAST_EVENT_DATA);
97 List<CordMcastStatistics> cordMcastStatsList = mcastStatEvent.subject();
98 cordMcastStatsList.forEach(stats -> {
99 ObjectNode mcastNode = mapper.createObjectNode();
100 if (stats.getGroupAddress() != null) {
101 mcastNode.put(GROUP, stats.getGroupAddress().toString());
102 }
103 if (stats.getSourceAddress() != null) {
104 mcastNode.put(SOURCE, stats.getSourceAddress().toString());
105 }
106 mcastNode.put(VLAN, stats.getVlanId().toShort());
107 mcastArrayNode.add(mcastNode);
108 });
109 return mcastStat;
110 }
111
112 private class InternalCorcMcastStatisticsListener implements CordMcastStatisticsEventListener {
113
114 @Override
115 public void event(CordMcastStatisticsEvent mcastStatEvent) {
116 handleMcastStat(mcastStatEvent);
117 }
118 }
119}