blob: 4e16adc5616a98792e19226df42fa39f1d5c6072 [file] [log] [blame]
Zack Williams9e8efd32018-10-17 15:01:13 -07001#!/usr/bin/env python
2
3# Copyright 2018-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
Zack Williams1f300022018-10-26 15:30:23 -070017from kafkaloghandler import KafkaLogHandler
Zack Williams9e8efd32018-10-17 15:01:13 -070018import json
19import logging
20import unittest
21
Zack Williams1f300022018-10-26 15:30:23 -070022# mock is a part of unittest in python 3
23try:
24 from mock import patch
25except ImportError:
26 from unittest.mock import patch
Zack Williams9e8efd32018-10-17 15:01:13 -070027
28
class FakeKafkaProducer():
    '''
    Test double that works like Producer in confluent_kafka, ref:
    https://docs.confluent.io/current/clients/confluent-kafka-python/#producer

    Instead of talking to Kafka, each method records the arguments of its
    most recent call as instance attributes so tests can assert on them.
    '''
    def __init__(self, config=None):
        # A mutable default ([]) is shared across all calls in Python, so a
        # None sentinel is used; the observable default stays an empty list.
        self.config = [] if config is None else config

    def produce(self, topic, value='', key=''):
        # Record the last "produced" message for later inspection.
        self.topic = topic
        self.value = value
        self.key = key

    def poll(self, timeout=1):
        # Real Producer serves delivery callbacks here; just record timeout.
        self.poll_timeout = timeout

    def flush(self, timeout=1):
        # Real Producer drains the send queue here; just record timeout.
        self.flush_timeout = timeout
47
48
class TestKafkaLogHandler(unittest.TestCase):

    def setUp(self):
        '''
        Setup tests for KafkaLogHandler, mainly common init of logger.
        Handlers are cleared so each test attaches exactly one handler.
        '''
        self.logger = logging.getLogger(__name__)
        self.logger.handlers = []
        self.logger.setLevel(logging.INFO)

    def tearDown(self):
        logging.shutdown()

    def test_single_message(self):
        '''
        tests that emit is called once when there is one message
        '''

        with patch.object(KafkaLogHandler, 'emit') as emit:

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic")

            self.logger.addHandler(klh)

            # Logger.warn() is a deprecated alias; warning() is equivalent
            self.logger.warning('Warning')

            self.assertEqual(emit.call_count, 1)

    def test_config(self):
        '''
        tests that producer_config dictionary is built correctly
        '''

        # bootstrap.servers passed in extra_config should be overridden by
        # the bootstrap_servers argument, which is joined with commas
        ec = {"queue.buffering.max.messages": 1000,
              "queue.buffering.max.kbytes": 512,
              "bootstrap.servers": 'foo'}

        klh = KafkaLogHandler(
                bootstrap_servers=["tk1:9092", "tk2:9093"],
                extra_config=ec,
                topic="testtopic")

        self.assertEqual(klh.producer_config,
                         {"bootstrap.servers": "tk1:9092,tk2:9093",
                          "queue.buffering.max.messages": 1000,
                          "queue.buffering.max.kbytes": 512})

    def test_with_structure(self):
        '''
        tests structured serialization of log to JSON with default (deep)
        flattening: nested dicts become dotted keys
        '''

        with patch.object(KafkaLogHandler, '_connect'):

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic")

            # substitute the fake so produced messages can be inspected
            klh.producer = FakeKafkaProducer()

            self.logger.addHandler(klh)

            extra_data = {
                "foo": "value1",
                "bar": "value2",
                "l1": {"l2": {'l3': "nested"}},
            }

            self.logger.info('structured', extra=extra_data)

            decoded_message = json.loads(klh.producer.value)

            self.assertEqual(klh.producer.topic, 'testtopic')
            self.assertEqual(decoded_message['message'], 'structured')
            self.assertEqual(decoded_message['foo'], 'value1')
            self.assertEqual(decoded_message['bar'], 'value2')
            self.assertEqual(decoded_message['l1.l2.l3'], 'nested')

    def test_without_flatten(self):
        '''
        tests with flattening of objects disabled (flatten=0): nested
        structures are serialized as-is
        '''

        with patch.object(KafkaLogHandler, '_connect'):

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic",
                                  flatten=0)

            klh.producer = FakeKafkaProducer()

            self.logger.addHandler(klh)

            extra_data = {
                "foo": "value1",
                "l1": {"l2": {'l3': "nested"}},
            }

            self.logger.info('noflatten', extra=extra_data)

            decoded_message = json.loads(klh.producer.value)

            self.assertEqual(decoded_message['message'], 'noflatten')
            self.assertEqual(decoded_message['foo'], 'value1')
            self.assertEqual(decoded_message['l1'], {'l2': {'l3': "nested"}})

    def test_with_shallow_flatten(self):
        '''
        Tests with a shallow flattening of objects (flatten=1, only one
        level deep) and a non-default separator
        '''

        with patch.object(KafkaLogHandler, '_connect'):

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic",
                                  flatten=1,
                                  separator='_')

            klh.producer = FakeKafkaProducer()

            self.logger.addHandler(klh)

            extra_data = {
                "foo": "value1",
                "l1": {"l2": {'l3': "nested"}},
            }

            self.logger.info('oneflatten', extra=extra_data)

            decoded_message = json.loads(klh.producer.value)

            self.assertEqual(decoded_message['message'], 'oneflatten')
            self.assertEqual(decoded_message['foo'], 'value1')
            # only one level is flattened, so l2's value stays a dict
            self.assertEqual(decoded_message['l1_l2'], {'l3': 'nested'})

    def test_flatten_list(self):
        '''
        Tests flattening of lists: indices become key components
        '''

        with patch.object(KafkaLogHandler, '_connect'):

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic")

            klh.producer = FakeKafkaProducer()

            self.logger.addHandler(klh)

            extra_data = {
                "foo": "value1",
                "list": ['item0', 'item1', {'dict': 'in_list'}],
            }

            self.logger.info('listflatten', extra=extra_data)

            decoded_message = json.loads(klh.producer.value)

            self.assertEqual(decoded_message['message'], 'listflatten')
            self.assertEqual(decoded_message['foo'], 'value1')
            self.assertEqual(decoded_message['list.0'], 'item0')
            self.assertEqual(decoded_message['list.1'], 'item1')
            self.assertEqual(decoded_message['list.2.dict'], 'in_list')

    def test_override_key(self):
        '''
        Test setting the key argument to override the default
        '''

        with patch.object(KafkaLogHandler, '_connect'):

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic")

            klh.producer = FakeKafkaProducer()

            self.logger.addHandler(klh)

            extra_data = {
                "foo": "value1",
                "l1": {"l2": {'l3': "nested"}},
            }

            # log with default 'klh' key
            self.logger.info('defaultkey', extra=extra_data)

            decoded_message1 = json.loads(klh.producer.value)

            self.assertEqual(klh.producer.key, 'klh')
            self.assertEqual(decoded_message1['foo'], 'value1')
            self.assertEqual(decoded_message1['message'], 'defaultkey')
            self.assertEqual(decoded_message1['l1.l2.l3'], 'nested')

            # log with key overridden via the 'key' item in extra
            extra_data.update({'key': 'override'})
            self.logger.info('keyoverride', extra=extra_data)

            decoded_message2 = json.loads(klh.producer.value)

            self.assertEqual(klh.producer.key, 'override')
            self.assertEqual(decoded_message2['message'], 'keyoverride')
            self.assertEqual(decoded_message2['foo'], 'value1')
            self.assertEqual(decoded_message2['l1.l2.l3'], 'nested')

    def test_blacklist(self):
        '''
        tests that blacklisted items are dropped from the message
        '''

        with patch.object(KafkaLogHandler, '_connect'):

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic",
                                  blacklist=["bar"])

            klh.producer = FakeKafkaProducer()

            self.logger.addHandler(klh)

            extra_data = {
                "foo": "value1",
                "bar": "value2",
                "l1": {"l2": {'l3': "nested"}},
            }

            self.logger.info('blacklist', extra=extra_data)

            decoded_message = json.loads(klh.producer.value)

            self.assertEqual(klh.producer.topic, 'testtopic')
            self.assertEqual(decoded_message['message'], 'blacklist')
            self.assertEqual(decoded_message['foo'], 'value1')
            # blacklisted key must be absent entirely
            self.assertNotIn('bar', decoded_message)