blob: 04c89a2b9aff69149a34c38cefbad3e5c2b20986 [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6import unittest
7import unittest.mock
8
9import metrics_pb2
10from common import metrics_export
11from orc8r.protos import metricsd_pb2
12from prometheus_client import (
13 CollectorRegistry,
14 Counter,
15 Gauge,
16 Histogram,
17 Summary,
18)
19
20
21class Service303MetricTests(unittest.TestCase):
22 """
23 Tests for the Service303 metrics interface
24 """
25
26 def setUp(self):
27 self.registry = CollectorRegistry()
28 self.maxDiff = None
29
30 def test_counter(self):
31 """Test that we can track counters in Service303"""
32 # Add a counter with a label to the regisry
33 c = Counter(
34 'process_max_fds', 'A counter', ['result'],
35 registry=self.registry,
36 )
37
38 # Create two series for value1 and value2
39 c.labels('success').inc(1.23)
40 c.labels('failure').inc(2.34)
41
42 # Build proto outputs
43 counter1 = metrics_pb2.Counter(value=1.23)
44 counter2 = metrics_pb2.Counter(value=2.34)
45 metric1 = metrics_pb2.Metric(
46 counter=counter1,
47 timestamp_ms=1234000,
48 )
49 metric2 = metrics_pb2.Metric(
50 counter=counter2,
51 timestamp_ms=1234000,
52 )
53 family = metrics_pb2.MetricFamily(
54 name=str(metricsd_pb2.process_max_fds),
55 type=metrics_pb2.COUNTER,
56 )
57 metric1.label.add(
58 name=str(metricsd_pb2.result),
59 value='success',
60 )
61 metric2.label.add(
62 name=str(metricsd_pb2.result),
63 value='failure',
64 )
65 family.metric.extend([metric1, metric2])
66
67 with unittest.mock.patch('time.time') as mock_time:
68 mock_time.side_effect = lambda: 1234
69 self.assertCountEqual(
70 list(metrics_export.get_metrics(self.registry))[0].metric,
71 family.metric,
72 )
73
74 def test_gauge(self):
75 """Test that we can track gauges in Service303"""
76 # Add a gauge with a label to the regisry
77 c = Gauge(
78 'process_max_fds', 'A gauge', ['result'],
79 registry=self.registry,
80 )
81
82 # Create two series for value1 and value2
83 c.labels('success').inc(1.23)
84 c.labels('failure').inc(2.34)
85
86 # Build proto outputs
87 gauge1 = metrics_pb2.Gauge(value=1.23)
88 gauge2 = metrics_pb2.Gauge(value=2.34)
89 metric1 = metrics_pb2.Metric(
90 gauge=gauge1,
91 timestamp_ms=1234000,
92 )
93 metric2 = metrics_pb2.Metric(
94 gauge=gauge2,
95 timestamp_ms=1234000,
96 )
97 family = metrics_pb2.MetricFamily(
98 name=str(metricsd_pb2.process_max_fds),
99 type=metrics_pb2.GAUGE,
100 )
101 metric1.label.add(
102 name=str(metricsd_pb2.result),
103 value='success',
104 )
105 metric2.label.add(
106 name=str(metricsd_pb2.result),
107 value='failure',
108 )
109 family.metric.extend([metric1, metric2])
110
111 with unittest.mock.patch('time.time') as mock_time:
112 mock_time.side_effect = lambda: 1234
113 self.assertCountEqual(
114 list(metrics_export.get_metrics(self.registry))[0].metric,
115 family.metric,
116 )
117
118 def test_summary(self):
119 """Test that we can track summaries in Service303"""
120 # Add a summary with a label to the regisry
121 c = Summary(
122 'process_max_fds', 'A summary', [
123 'result',
124 ], registry=self.registry,
125 )
126 c.labels('success').observe(1.23)
127 c.labels('failure').observe(2.34)
128
129 # Build proto outputs
130 summary1 = metrics_pb2.Summary(sample_count=1, sample_sum=1.23)
131 summary2 = metrics_pb2.Summary(sample_count=1, sample_sum=2.34)
132 metric1 = metrics_pb2.Metric(
133 summary=summary1,
134 timestamp_ms=1234000,
135 )
136 metric2 = metrics_pb2.Metric(
137 summary=summary2,
138 timestamp_ms=1234000,
139 )
140 family = metrics_pb2.MetricFamily(
141 name=str(metricsd_pb2.process_max_fds),
142 type=metrics_pb2.SUMMARY,
143 )
144 metric1.label.add(
145 name=str(metricsd_pb2.result),
146 value='success',
147 )
148 metric2.label.add(
149 name=str(metricsd_pb2.result),
150 value='failure',
151 )
152 family.metric.extend([metric1, metric2])
153
154 with unittest.mock.patch('time.time') as mock_time:
155 mock_time.side_effect = lambda: 1234
156 self.assertCountEqual(
157 list(metrics_export.get_metrics(self.registry))[0].metric,
158 family.metric,
159 )
160
161 def test_histogram(self):
162 """Test that we can track histogram in Service303"""
163 # Add a histogram with a label to the regisry
164 c = Histogram(
165 'process_max_fds', 'A summary', ['result'],
166 registry=self.registry, buckets=[0, 2, float('inf')],
167 )
168 c.labels('success').observe(1.23)
169 c.labels('failure').observe(2.34)
170
171 # Build proto outputs
172 histogram1 = metrics_pb2.Histogram(sample_count=1, sample_sum=1.23)
173 histogram1.bucket.add(upper_bound=0, cumulative_count=0)
174 histogram1.bucket.add(upper_bound=2, cumulative_count=1)
175 histogram1.bucket.add(upper_bound=float('inf'), cumulative_count=1)
176 histogram2 = metrics_pb2.Histogram(sample_count=1, sample_sum=2.34)
177 histogram2.bucket.add(upper_bound=0, cumulative_count=0)
178 histogram2.bucket.add(upper_bound=2, cumulative_count=0)
179 histogram2.bucket.add(upper_bound=float('inf'), cumulative_count=1)
180 metric1 = metrics_pb2.Metric(
181 histogram=histogram1,
182 timestamp_ms=1234000,
183 )
184 metric2 = metrics_pb2.Metric(
185 histogram=histogram2,
186 timestamp_ms=1234000,
187 )
188 family = metrics_pb2.MetricFamily(
189 name=str(metricsd_pb2.process_max_fds),
190 type=metrics_pb2.HISTOGRAM,
191 )
192 metric1.label.add(
193 name=str(metricsd_pb2.result),
194 value='success',
195 )
196 metric2.label.add(
197 name=str(metricsd_pb2.result),
198 value='failure',
199 )
200 family.metric.extend([metric1, metric2])
201
202 with unittest.mock.patch('time.time') as mock_time:
203 mock_time.side_effect = lambda: 1234
204 self.assertCountEqual(
205 list(metrics_export.get_metrics(self.registry))[0].metric,
206 family.metric,
207 )
208
209 def test_converted_enums(self):
210 """ Test that metric names and labels are auto converted """
211 # enum values (from metricsd.proto):
212 # mme_new_association => 500, result => 0
213 c = Counter(
214 'mme_new_association', 'A counter', ['result'],
215 registry=self.registry,
216 )
217
218 c.labels('success').inc(1.23)
219
220 metric_family = list(metrics_export.get_metrics(self.registry))[0]
221
222 self.assertEqual(
223 metric_family.name,
224 str(metricsd_pb2.mme_new_association),
225 )
226 metric_labels = metric_family.metric[0].label
227 # Order not guaranteed=
228 self.assertEqual(metric_labels[0].name, str(metricsd_pb2.result))
229 self.assertEqual(metric_labels[0].value, 'success')
230
231
232if __name__ == "__main__":
233 unittest.main()