blob: 52f9feb60cfdbbb6e1d7d858122dccb02f6430b8 [file] [log] [blame]
#!/usr/bin/env python

# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16
17import json
18import logging
19import unittest
20
21from mock import patch
22from kafkaloghandler import KafkaLogHandler
23
24
class FakeKafkaProducer():
    '''
    Works like Producer in confluent_kafka, ref:
     https://docs.confluent.io/current/clients/confluent-kafka-python/#producer

    Test double: nothing is sent anywhere. Each method only records the
    arguments of its most recent call as attributes on the instance, so a
    test can inspect them afterwards.
    '''
    def __init__(self, config=None):
        '''
        Store the producer configuration.

        config: configuration object kept as-is on self.config; when
                omitted, a new empty list is used.
        '''
        # A literal [] default would be created once and shared by every
        # instance built without a config; build a fresh list per instance.
        self.config = [] if config is None else config

    def produce(self, topic, value='', key=''):
        '''
        Record the topic, value and key of the last produced message
        '''
        self.topic = topic
        self.value = value
        self.key = key

    def flush(self, timeout=1):
        '''
        Record the timeout passed to the last flush call
        '''
        self.flush_timeout = timeout
40
41
class TestKafkaLogHandler(unittest.TestCase):
    '''
    Unit tests for KafkaLogHandler.

    No Kafka broker is contacted: each test patches either `emit` or
    `_connect` on the handler class, and the tests that inspect output
    replace the handler's producer with a FakeKafkaProducer.
    '''

    def setUp(self):
        '''
        Setup tests for KafkaLogHandler, mainly common init of logger
        '''
        self.logger = logging.getLogger(__name__)
        # the module logger is shared between tests, so drop any handler
        # attached by a previous test before adding a new one
        self.logger.handlers = []
        self.logger.setLevel(logging.INFO)

    def tearDown(self):
        '''
        Shut down the logging system after every test
        '''
        logging.shutdown()

    def test_single_message(self):
        '''
        tests that emit is called once when there is one message
        '''

        with patch.object(KafkaLogHandler, 'emit') as emit:

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic")

            self.logger.addHandler(klh)

            # Logger.warn is a deprecated alias (removed in Python 3.13)
            self.logger.warning('Warning')

            # Explicit count check: on old mock releases that lack
            # assert_called_once(), that call is an auto-created attribute
            # and would pass without checking anything.
            self.assertEqual(emit.call_count, 1)

    def test_with_structure(self):
        '''
        tests structured serialization of log to JSON
        '''

        with patch.object(KafkaLogHandler, '_connect'):

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic")

            klh.producer = FakeKafkaProducer()

            self.logger.addHandler(klh)

            extra_data = {
                "foo": "value1",
                "bar": "value2",
                "l1": {"l2": {'l3': "nested"}},
            }

            self.logger.info('structured', extra=extra_data)

            decoded_message = json.loads(klh.producer.value)

            self.assertEqual(klh.producer.topic, 'testtopic')
            self.assertEqual(decoded_message['msg'], 'structured')
            self.assertEqual(decoded_message['foo'], 'value1')
            self.assertEqual(decoded_message['bar'], 'value2')
            # nested dicts are expected under a single '.'-joined key
            self.assertEqual(decoded_message['l1.l2.l3'], 'nested')

    def test_without_flatten(self):
        '''
        tests with flattening of objects disabled
        '''

        with patch.object(KafkaLogHandler, '_connect'):

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic",
                                  flatten=0)

            klh.producer = FakeKafkaProducer()

            self.logger.addHandler(klh)

            extra_data = {
                "foo": "value1",
                "l1": {"l2": {'l3': "nested"}},
            }

            self.logger.info('noflatten', extra=extra_data)

            decoded_message = json.loads(klh.producer.value)

            self.assertEqual(decoded_message['msg'], 'noflatten')
            self.assertEqual(decoded_message['foo'], 'value1')
            # with flatten=0 the nested structure is expected unchanged
            self.assertEqual(decoded_message['l1'], {'l2': {'l3': "nested"}})

    def test_with_shallow_flatten(self):
        '''
        Tests with a shallow flattening of objects, and different separator
        '''

        with patch.object(KafkaLogHandler, '_connect'):

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic",
                                  flatten=1,
                                  separator='_')

            klh.producer = FakeKafkaProducer()

            self.logger.addHandler(klh)

            extra_data = {
                "foo": "value1",
                "l1": {"l2": {'l3': "nested"}},
            }

            self.logger.info('oneflatten', extra=extra_data)

            decoded_message = json.loads(klh.producer.value)

            self.assertEqual(decoded_message['msg'], 'oneflatten')
            self.assertEqual(decoded_message['foo'], 'value1')
            # only one level is merged into the key; the rest stays a dict
            self.assertEqual(decoded_message['l1_l2'], {'l3': 'nested'})

    def test_override_key(self):
        '''
        Test setting the key argument to override the default
        '''

        with patch.object(KafkaLogHandler, '_connect'):

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic")

            klh.producer = FakeKafkaProducer()

            self.logger.addHandler(klh)

            extra_data = {
                "foo": "value1",
                "l1": {"l2": {'l3': "nested"}},
            }

            # log with default 'klh' key
            self.logger.info('defaultkey', extra=extra_data)

            decoded_message1 = json.loads(klh.producer.value)

            self.assertEqual(klh.producer.key, 'klh')
            self.assertEqual(decoded_message1['foo'], 'value1')
            self.assertEqual(decoded_message1['msg'], 'defaultkey')
            self.assertEqual(decoded_message1['l1.l2.l3'], 'nested')

            # log with key overridden
            extra_data.update({'key': 'override'})
            self.logger.info('keyoverride', extra=extra_data)

            decoded_message2 = json.loads(klh.producer.value)

            self.assertEqual(klh.producer.key, 'override')
            self.assertEqual(decoded_message2['msg'], 'keyoverride')
            self.assertEqual(decoded_message2['foo'], 'value1')
            self.assertEqual(decoded_message2['l1.l2.l3'], 'nested')

    def test_blacklist(self):
        '''
        tests adding items to blacklist
        '''

        with patch.object(KafkaLogHandler, '_connect'):

            klh = KafkaLogHandler(bootstrap_servers=["test-kafka:9092"],
                                  topic="testtopic",
                                  blacklist=["bar"])

            klh.producer = FakeKafkaProducer()

            self.logger.addHandler(klh)

            extra_data = {
                "foo": "value1",
                "bar": "value2",
                "l1": {"l2": {'l3': "nested"}},
            }

            self.logger.info('blacklist', extra=extra_data)

            decoded_message = json.loads(klh.producer.value)

            self.assertEqual(klh.producer.topic, 'testtopic')
            self.assertEqual(decoded_message['msg'], 'blacklist')
            self.assertEqual(decoded_message['foo'], 'value1')
            # the blacklisted key must be absent from the produced message
            self.assertNotIn('bar', decoded_message)