Reformatting kafka logs
Change-Id: I853d3db2324c6cf96ccd5c7d08fe295299e38688
diff --git a/common/utils/consulhelpers.py b/common/utils/consulhelpers.py
index 1d2b88f..681aaf4 100644
--- a/common/utils/consulhelpers.py
+++ b/common/utils/consulhelpers.py
@@ -28,8 +28,9 @@
def get_endpoint_from_consul(consul_endpoint, service_name):
"""Look up, from consul, the service name specified by service-name
"""
- log.debug('Retrieving endpoint {} from consul {}'.format(service_name,
- consul_endpoint))
+ log.debug('getting-service-endpoint', consul=consul_endpoint,
+ service=service_name)
+
host = consul_endpoint.split(':')[0].strip()
port = int(consul_endpoint.split(':')[1].strip())
diff --git a/common/utils/dockerhelpers.py b/common/utils/dockerhelpers.py
index 9379e2d..89acab6 100644
--- a/common/utils/dockerhelpers.py
+++ b/common/utils/dockerhelpers.py
@@ -80,7 +80,7 @@
docker_cli = Client(base_url=docker_socket)
info = docker_cli.inspect_container(id)
except Exception, e:
- log.debug('failed: {}'.format(e.message))
+ log.exception('failed-inspect-container', id=id, e=e)
raise
return info
@@ -94,7 +94,7 @@
host_config = docker_cli.create_host_config(binds=binds,
port_bindings=port_bindings)
except Exception, e:
- log.exception('failed host config creation', volumes, ports, e=e)
+ log.exception('failed-host-config-creation', volumes, ports, e=e)
raise
return host_config
@@ -103,8 +103,9 @@
try:
docker_cli = Client(base_url=docker_socket)
docker_cli.connect_container_to_network(container, net_id, links=links)
- except:
- log.exception('Failed to connect container {} to network {}'.format(container, net_id))
+ except Exception, e:
+ log.exception('failed-connection-to-network',
+ container=container, net_id=net_id, e=e)
raise
def create_networking_config(name, links):
@@ -120,7 +121,7 @@
name : docker_cli.create_endpoint_config(links=links)
})
except Exception, e:
- log.exception('failed network creation', name, e=e)
+ log.exception('failed-network-creation', name, e=e)
raise
return networking_config
@@ -165,7 +166,7 @@
def __init__(self, threads=1):
self.client = CustomClient(base_url=docker_socket)
self.events = self.client.events(decode=True)
- log.info("Starting event processor with {} threads".format(threads))
+ log.info("starting", threads=threads)
self.exec_service = ThreadPoolExecutor(max_workers=threads)
def stop_listening(self):
@@ -200,7 +201,7 @@
for k in ['time', 'Time']:
if k in event:
event[k] = datetime.fromtimestamp(event[k])
- log.debug('docker event: {}'.format(event))
+ log.debug('docker-event', event=event)
data = {}
i = get_id(event)
@@ -212,14 +213,14 @@
data = self.client.inspect_image(i)
data[i] = data
except errors.NotFound:
- log.debug('No data for container {}'.format(i))
+ log.debug('no-data-for-container', container=i)
status = get_status(event)
if status in handlers:
self.exec_service.submit(handlers[get_status(event)], event,
data, handlers['podder_config'])
else:
- log.debug("No handler for {}; skipping...".format(status))
+ log.debug('no-handler', handler=status)
class CustomGenerator(object):
"""
diff --git a/kafka/kafka-consumer.py b/kafka/kafka-consumer.py
index 47d788e..7ed354d 100644
--- a/kafka/kafka-consumer.py
+++ b/kafka/kafka-consumer.py
@@ -39,16 +39,17 @@
yield self._client.load_metadata_for_topics(self.topic)
e = self._client.metadata_error_for_topic(self.topic)
if e:
- log.warning("Error: %r getting metadata for topic: %s",
- e, self.topic)
+ log.warning('no-metadata-for-topic', error=e,
+ topic=self.topic)
else:
partitions = self._client.topic_partitions[self.topic]
except KafkaUnavailableError:
- log.error("Unable to communicate with any Kafka brokers")
+ log.error("unable-to-communicate-with-Kafka-brokers")
self.stop()
def _note_consumer_stopped(result, consumer):
- log.info("Consumer: %r stopped with result: %r", consumer, result)
+ log.info('consumer-stopped', consumer=consumer,
+ result=result)
for partition in partitions:
c = Consumer(self._client, self.topic, partition,
@@ -63,7 +64,7 @@
def stop(self):
log.info("\n")
- log.info("Time is up, stopping consumers...")
+ log.info('end-of-execution-stopping-consumers')
# Ask each of our consumers to stop. When a consumer fully stops, it
# fires the deferred returned from its start() method. We saved all
# those deferreds away (above, in start()) in self._consumer_d_list,
@@ -75,10 +76,9 @@
# Once the consumers are all stopped, then close our client
def _stop_client(result):
if isinstance(result, Failure):
- log.error("Error stopping consumers: %r", result)
+ log.error('error', result=result)
else:
- log.info("All consumers stopped. Stopping client: %r",
- self._client)
+ log.info('all-consumers-stopped', client=self._client)
self._client.close()
return result
@@ -93,7 +93,7 @@
def msg_processor(self, consumer, msglist):
for msg in msglist:
- log.info("proc: msg: %r", msg)
+ log.info('proc', msg=msg)
def parse_options():
parser = ArgumentParser("Consume kafka messages")
@@ -123,7 +123,7 @@
int(args.runtime))
reactor.callWhenRunning(consumer_example.start)
reactor.run()
- log.info("All Done!")
+ log.info("completed!")
if __name__ == "__main__":
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 5b9e161..4ee3e66 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -48,9 +48,7 @@
if KafkaProxy._kafka_instance:
raise Exception('Singleton exist for :{}'.format(KafkaProxy))
- log.info('KafkaProxy init with kafka endpoint:{}'.format(
- kafka_endpoint))
-
+ log.debug('initializing', endpoint=kafka_endpoint)
self.ack_timeout = ack_timeout
self.max_req_attempts = max_req_attempts
self.consul_endpoint = consul_endpoint
@@ -59,6 +57,7 @@
self.kclient = None
self.kproducer = None
self.event_bus_publisher = None
+ log.debug('initialized', endpoint=kafka_endpoint)
@inlineCallbacks
def start(self):
@@ -83,12 +82,11 @@
try:
_k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
self.kafka_endpoint[1:])
- log.info(
- 'Found kafka service at {}'.format(_k_endpoint))
+ log.debug('found-kafka-service', endpoint=_k_endpoint)
except Exception as e:
- log.error('Failure to locate a kafka service from '
- 'consul {}:'.format(repr(e)))
+ log.exception('no-kafka-service-in-consul', e=e)
+
self.kproducer = None
self.kclient = None
return