"""
Copyright 2020 The Magma Authors.

This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
13
14import unittest
15import unittest.mock
16
17import metrics_pb2
18from common import metrics_export
19from orc8r.protos import metricsd_pb2
20from prometheus_client import (
21 CollectorRegistry,
22 Counter,
23 Gauge,
24 Histogram,
25 Summary,
26)
27
28
class Service303MetricTests(unittest.TestCase):
    """
    Tests for the Service303 metrics interface

    Each test registers a prometheus_client collector in a fresh
    CollectorRegistry, exports the registry through
    metrics_export.get_metrics and compares the exported protos with
    hand-built metrics_pb2 messages.
    """

    # Value returned by the patched time.time() during export, and the
    # timestamp_ms the hand-built expected metrics carry for it.
    _MOCK_TIME_S = 1234
    _EXPECTED_TIMESTAMP_MS = 1234000

    def setUp(self):
        """Give every test its own empty registry and untruncated diffs."""
        self.registry = CollectorRegistry()
        self.maxDiff = None

    def _build_expected_family(
        self, family_type, value_field, labelled_values,
    ):
        """
        Build the MetricFamily proto a test expects for 'process_max_fds'.

        Args:
            family_type: metrics_pb2 metric type enum value for the family
            value_field: name of the Metric field that holds the payload
                ('counter', 'gauge', 'summary' or 'histogram')
            labelled_values: iterable of (result label value, payload proto)
                pairs, one pair per expected time series

        Returns:
            A metrics_pb2.MetricFamily holding one Metric per pair.
        """
        family = metrics_pb2.MetricFamily(
            name=str(metricsd_pb2.process_max_fds),
            type=family_type,
        )
        expected_metrics = []
        for result_value, payload in labelled_values:
            metric = metrics_pb2.Metric(
                timestamp_ms=self._EXPECTED_TIMESTAMP_MS,
                **{value_field: payload}
            )
            metric.label.add(
                name=str(metricsd_pb2.result),
                value=result_value,
            )
            expected_metrics.append(metric)
        family.metric.extend(expected_metrics)
        return family

    def _assert_exported_metrics(self, expected_family):
        """
        Export self.registry with time.time() patched and compare metrics.

        Only the metrics of the first exported family are compared, and the
        comparison ignores ordering. The name and type of expected_family
        are not part of the comparison.
        """
        with unittest.mock.patch(
            'time.time', return_value=self._MOCK_TIME_S,
        ):
            exported = list(metrics_export.get_metrics(self.registry))
            self.assertCountEqual(
                exported[0].metric,
                expected_family.metric,
            )

    @staticmethod
    def _build_histogram(sample_sum, buckets):
        """
        Build a single-observation Histogram proto.

        Args:
            sample_sum: sum of the observed values
            buckets: iterable of (upper_bound, cumulative_count) pairs,
                added to the proto in the given order
        """
        histogram = metrics_pb2.Histogram(
            sample_count=1, sample_sum=sample_sum,
        )
        for upper_bound, cumulative_count in buckets:
            histogram.bucket.add(
                upper_bound=upper_bound,
                cumulative_count=cumulative_count,
            )
        return histogram

    def test_counter(self):
        """Test that we can track counters in Service303"""
        # Add a counter with a label to the registry
        counter = Counter(
            'process_max_fds', 'A counter', ['result'],
            registry=self.registry,
        )

        # Create one series per label value
        counter.labels('success').inc(1.23)
        counter.labels('failure').inc(2.34)

        # Build proto outputs
        expected = self._build_expected_family(
            metrics_pb2.COUNTER, 'counter',
            [
                ('success', metrics_pb2.Counter(value=1.23)),
                ('failure', metrics_pb2.Counter(value=2.34)),
            ],
        )

        self._assert_exported_metrics(expected)

    def test_gauge(self):
        """Test that we can track gauges in Service303"""
        # Add a gauge with a label to the registry
        gauge = Gauge(
            'process_max_fds', 'A gauge', ['result'],
            registry=self.registry,
        )

        # Create one series per label value
        gauge.labels('success').inc(1.23)
        gauge.labels('failure').inc(2.34)

        # Build proto outputs
        expected = self._build_expected_family(
            metrics_pb2.GAUGE, 'gauge',
            [
                ('success', metrics_pb2.Gauge(value=1.23)),
                ('failure', metrics_pb2.Gauge(value=2.34)),
            ],
        )

        self._assert_exported_metrics(expected)

    def test_summary(self):
        """Test that we can track summaries in Service303"""
        # Add a summary with a label to the registry
        summary = Summary(
            'process_max_fds', 'A summary', [
                'result',
            ], registry=self.registry,
        )
        summary.labels('success').observe(1.23)
        summary.labels('failure').observe(2.34)

        # Build proto outputs
        expected = self._build_expected_family(
            metrics_pb2.SUMMARY, 'summary',
            [
                (
                    'success',
                    metrics_pb2.Summary(sample_count=1, sample_sum=1.23),
                ),
                (
                    'failure',
                    metrics_pb2.Summary(sample_count=1, sample_sum=2.34),
                ),
            ],
        )

        self._assert_exported_metrics(expected)

    def test_histogram(self):
        """Test that we can track histogram in Service303"""
        # Add a histogram with a label to the registry
        histogram = Histogram(
            'process_max_fds', 'A histogram', ['result'],
            registry=self.registry, buckets=[0, 2, float('inf')],
        )
        histogram.labels('success').observe(1.23)
        histogram.labels('failure').observe(2.34)

        # Build proto outputs: 1.23 lands in the (0, 2] bucket, 2.34 only
        # in the +inf bucket; counts are cumulative.
        expected = self._build_expected_family(
            metrics_pb2.HISTOGRAM, 'histogram',
            [
                (
                    'success',
                    self._build_histogram(
                        1.23, [(0, 0), (2, 1), (float('inf'), 1)],
                    ),
                ),
                (
                    'failure',
                    self._build_histogram(
                        2.34, [(0, 0), (2, 0), (float('inf'), 1)],
                    ),
                ),
            ],
        )

        self._assert_exported_metrics(expected)

    def test_converted_enums(self):
        """ Test that metric names and labels are auto converted """
        # enum values (from metricsd.proto):
        # mme_new_association => 500, result => 0
        counter = Counter(
            'mme_new_association', 'A counter', ['result'],
            registry=self.registry,
        )

        counter.labels('success').inc(1.23)

        metric_family = list(metrics_export.get_metrics(self.registry))[0]

        self.assertEqual(
            metric_family.name,
            str(metricsd_pb2.mme_new_association),
        )
        metric_labels = metric_family.metric[0].label
        # Label order is not guaranteed in general; this counter declares a
        # single label, so index 0 is checked directly.
        self.assertEqual(metric_labels[0].name, str(metricsd_pb2.result))
        self.assertEqual(metric_labels[0].value, 'success')
238
239
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()