Minimally invasive set of logging fixes
- Cache structlog logger, make formatters generic
- Fix consul-specific naming in coordinator code
- Fix logging statements in envoyd that require formatting
- Clean up unicode-invalid binary strings from etcd
- Rename the structured 'msg' key in a few places, because the logging
framework removes it
- Demote log statements at INFO level and above to DEBUG in a few places
Change-Id: Iea40f4969ad328f3d1180533dfc35cb9a2c0756b
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index f90c9f7..126dc29 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -35,7 +35,6 @@
from common.utils.dockerhelpers import get_my_containers_name
-log = get_logger()
# _ = third_party
class ConnectionManager(object):
@@ -45,8 +44,11 @@
vcore_retry_interval=0.5, devices_refresh_interval=5,
subscription_refresh_interval=5):
- log.info('init-connection-manager')
- log.info('list-of-controllers', controller_endpoints=controller_endpoints)
+ self.log = get_logger()
+ self.log.info('init-connection-manager')
+ self.log.info('list-of-controllers',
+ controller_endpoints=controller_endpoints)
+
self.controller_endpoints = controller_endpoints
self.consul_endpoint = consul_endpoint
self.vcore_endpoint = vcore_endpoint
@@ -74,7 +76,7 @@
if self.running:
return
- log.debug('starting')
+ self.log.debug('starting')
self.running = True
@@ -84,12 +86,12 @@
# Start monitoring logical devices and manage agents accordingly
reactor.callLater(0, self.monitor_logical_devices)
- log.info('started')
+ self.log.info('started')
return self
def stop(self):
- log.debug('stopping')
+ self.log.debug('stopping')
# clean up all controller connections
for agent in self.agent_map.itervalues():
agent.stop()
@@ -97,7 +99,7 @@
self._reset_grpc_attributes()
- log.info('stopped')
+ self.log.info('stopped')
def resolve_endpoint(self, endpoint):
ip_port_endpoint = endpoint
@@ -105,11 +107,11 @@
try:
ip_port_endpoint = get_endpoint_from_consul(
self.consul_endpoint, endpoint[1:])
- log.info(
+ self.log.info(
'{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
except Exception as e:
- log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
- log.error('committing-suicide')
+ self.log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
+ self.log.error('committing-suicide')
# Committing suicide in order to let docker restart ofagent
os.system("kill -15 {}".format(os.getpid()))
if ip_port_endpoint:
@@ -117,7 +119,7 @@
return host, int(port)
def _reset_grpc_attributes(self):
- log.debug('start-reset-grpc-attributes')
+ self.log.debug('start-reset-grpc-attributes')
if self.grpc_client is not None:
self.grpc_client.stop()
@@ -130,13 +132,13 @@
self.subscription = None
self.grpc_client = None
- log.debug('stop-reset-grpc-attributes')
+ self.log.debug('stop-reset-grpc-attributes')
def _assign_grpc_attributes(self):
- log.debug('start-assign-grpc-attributes')
+ self.log.debug('start-assign-grpc-attributes')
host, port = self.resolve_endpoint(self.vcore_endpoint)
- log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)
+ self.log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)
assert host is not None
assert port is not None
@@ -145,11 +147,11 @@
self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
self.is_alive = True
- log.debug('stop-assign-grpc-attributes')
+ self.log.debug('stop-assign-grpc-attributes')
@inlineCallbacks
def monitor_vcore_grpc_channel(self):
- log.debug('start-monitor-vcore-grpc-channel')
+ self.log.debug('start-monitor-vcore-grpc-channel')
while self.running:
try:
@@ -170,7 +172,7 @@
if subscription is not None and subscription.ofagent_id == container_name:
if self.subscription is None:
# Keep details on the current GRPC session and subscription
- log.debug('subscription-with-vcore-successful', subscription=subscription)
+ self.log.debug('subscription-with-vcore-successful', subscription=subscription)
self.subscription = subscription
self.grpc_client.start()
@@ -182,13 +184,13 @@
# The subscription did not succeed, reset and move on
else:
- log.info('subscription-with-vcore-unavailable', subscription=subscription)
+ self.log.info('subscription-with-vcore-unavailable', subscription=subscription)
except _Rendezvous, e:
- log.error('subscription-with-vcore-terminated',exception=e, status=e.code())
+ self.log.error('subscription-with-vcore-terminated',exception=e, status=e.code())
except Exception as e:
- log.exception('unexpected-subscription-termination-with-vcore', e=e)
+ self.log.exception('unexpected-subscription-termination-with-vcore', e=e)
# Reset grpc details
# The vcore instance is either not available for subscription
@@ -198,33 +200,33 @@
# Sleep for a short period and retry
yield asleep(self.vcore_retry_interval)
- log.debug('stop-monitor-vcore-grpc-channel')
+ self.log.debug('stop-monitor-vcore-grpc-channel')
@inlineCallbacks
def get_list_of_reachable_logical_devices_from_voltha(self):
while self.running:
- log.info('retrieve-logical-device-list')
+ self.log.debug('retrieve-logical-device-list')
try:
devices = yield \
self.grpc_client.list_reachable_logical_devices()
for device in devices:
- log.info("reachable-logical-device-entry", id=device.id,
- datapath_id=device.datapath_id)
+ self.log.debug("reachable-logical-device-entry", id=device.id,
+ datapath_id=device.datapath_id)
returnValue(devices)
except _Rendezvous, e:
status = e.code()
- log.error('vcore-communication-failure', exception=e, status=status)
+ self.log.error('vcore-communication-failure', exception=e, status=status)
if status == StatusCode.UNAVAILABLE or status == StatusCode.DEADLINE_EXCEEDED:
os.system("kill -15 {}".format(os.getpid()))
except Exception as e:
- log.exception('logical-devices-retrieval-failure', exception=e)
+ self.log.exception('logical-devices-retrieval-failure', exception=e)
- log.info('reconnect', after_delay=self.vcore_retry_interval)
+ self.log.info('reconnect', after_delay=self.vcore_retry_interval)
yield asleep(self.vcore_retry_interval)
def refresh_agent_connections(self, devices):
@@ -258,8 +260,8 @@
if device.datapath_id in to_add:
self.create_agent(device)
- log.debug('updated-agent-list', count=len(self.agent_map))
- log.debug('updated-device-id-to-datapath-id-map',
+ self.log.debug('updated-agent-list', count=len(self.agent_map))
+ self.log.debug('updated-device-id-to-datapath-id-map',
map=str(self.device_id_to_datapath_id_map))
def create_agent(self, device):
@@ -283,10 +285,10 @@
@inlineCallbacks
def monitor_logical_devices(self):
- log.debug('start-monitor-logical-devices')
+ self.log.debug('start-monitor-logical-devices')
while self.running:
- log.info('monitoring-logical-devices')
+ self.log.debug('monitoring-logical-devices')
# should change to a gRPC streaming call
# see https://jira.opencord.org/browse/CORD-821
@@ -301,18 +303,18 @@
# update agent list and mapping tables as needed
self.refresh_agent_connections(reachable_devices)
else:
- log.info('vcore-communication-unavailable')
+ self.log.warning('vcore-communication-unavailable')
# wait before next poll
yield asleep(self.devices_refresh_interval)
except _Rendezvous, e:
- log.error('vcore-communication-failure', exception=repr(e), status=e.code())
+ self.log.error('vcore-communication-failure', exception=repr(e), status=e.code())
except Exception as e:
- log.exception('unexpected-vcore-communication-failure', exception=repr(e))
+ self.log.exception('unexpected-vcore-communication-failure', exception=repr(e))
- log.debug('stop-monitor-logical-devices')
+ self.log.debug('stop-monitor-logical-devices')
def forward_packet_in(self, device_id, ofp_packet_in):
datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index a1678c5..16be6f0 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -34,13 +34,12 @@
from google.protobuf import empty_pb2
-log = get_logger()
-
-
class GrpcClient(object):
def __init__(self, connection_manager, channel, grpc_timeout):
+ self.log = get_logger()
+
self.connection_manager = connection_manager
self.channel = channel
self.grpc_timeout = grpc_timeout
@@ -53,19 +52,19 @@
self.change_event_queue = DeferredQueue() # queue change events
def start(self):
- log.debug('starting', grpc_timeout=self.grpc_timeout)
+ self.log.debug('starting', grpc_timeout=self.grpc_timeout)
self.start_packet_out_stream()
self.start_packet_in_stream()
self.start_change_event_in_stream()
reactor.callLater(0, self.packet_in_forwarder_loop)
reactor.callLater(0, self.change_event_processing_loop)
- log.info('started')
+ self.log.info('started')
return self
def stop(self):
- log.debug('stopping')
+ self.log.debug('stopping')
self.stopped = True
- log.info('stopped')
+ self.log.info('stopped')
def start_packet_out_stream(self):
@@ -84,7 +83,7 @@
try:
self.local_stub.StreamPacketsOut(generator)
except _Rendezvous, e:
- log.error('grpc-exception', status=e.code())
+ self.log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
@@ -99,11 +98,11 @@
for packet_in in iterator:
reactor.callFromThread(self.packet_in_queue.put,
packet_in)
- log.debug('enqued-packet-in',
+ self.log.debug('enqued-packet-in',
packet_in=packet_in,
queue_len=len(self.packet_in_queue.pending))
except _Rendezvous, e:
- log.error('grpc-exception', status=e.code())
+ self.log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
@@ -117,11 +116,11 @@
try:
for event in iterator:
reactor.callFromThread(self.change_event_queue.put, event)
- log.debug('enqued-change-event',
+ self.log.debug('enqued-change-event',
change_event=event,
queue_len=len(self.change_event_queue.pending))
except _Rendezvous, e:
- log.error('grpc-exception', status=e.code())
+ self.log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
@@ -135,7 +134,7 @@
device_id = event.id
self.connection_manager.forward_change_event(device_id, event)
except Exception, e:
- log.exception('failed-in-packet-in-handler', e=e)
+ self.log.exception('failed-in-packet-in-handler', e=e)
if self.stopped:
break
@@ -264,7 +263,6 @@
self.local_stub.Subscribe, subscriber, timeout=self.grpc_timeout)
returnValue(res)
-
@inlineCallbacks
def get_meter_stats(self, device_id):
req = ID(id=device_id)