Minimally invasive set of logging fixes
- Cache structlog logger, make formatters generic
- Fix consul-specific naming in coordinator code
- Fix logging statements in envoyd that require formatting
- Clean up unicode-invalid binary strings from etcd
- Structured 'msg' key is removed by logging framework, change name
in a few places
- Move logging from INFO and above levels to DEBUG in a few places
Change-Id: Iea40f4969ad328f3d1180533dfc35cb9a2c0756b
diff --git a/common/structlog_setup.py b/common/structlog_setup.py
index cbbda89..a6950b7 100644
--- a/common/structlog_setup.py
+++ b/common/structlog_setup.py
@@ -18,59 +18,20 @@
import logging
import logging.config
-from collections import OrderedDict
import structlog
-from structlog.stdlib import BoundLogger, INFO
-
-try:
- from thread import get_ident as _get_ident
-except ImportError:
- from dummy_thread import get_ident as _get_ident
-class StructuredLogRenderer(object):
- def __call__(self, logger, name, event_dict):
- # in order to keep structured log data in event_dict to be forwarded as
- # is, we need to pass it into the logger framework as the first
- # positional argument.
- args = (event_dict,)
- kwargs = {}
- return args, kwargs
-
-
-class PlainRenderedOrderedDict(OrderedDict):
- """Our special version of OrderedDict that renders into string as a dict,
- to make the log stream output cleaner.
- """
- def __repr__(self, _repr_running={}):
- 'od.__repr__() <==> repr(od)'
- call_key = id(self), _get_ident()
- if call_key in _repr_running:
- return '...'
- _repr_running[call_key] = 1
- try:
- if not self:
- return '{}'
- return '{%s}' % ", ".join("%s: %s" % (k, v)
- for k, v in self.items())
- finally:
- del _repr_running[call_key]
-
-
-def setup_logging(log_config, instance_id, verbosity_adjust=0):
+def setup_logging(log_config, instance_id,
+ verbosity_adjust=0, cache_on_use=True):
"""
Set up logging such that:
- The primary logging entry method is structlog
(see http://structlog.readthedocs.io/en/stable/index.html)
- - By default, the logging backend is Python standard lib logger
+ - Optionally cache the logger on first use
+ :return: structured logger
"""
- def add_exc_info_flag_for_exception(_, name, event_dict):
- if name == 'exception':
- event_dict['exc_info'] = True
- return event_dict
-
def add_instance_id(_, __, event_dict):
event_dict['instance_id'] = instance_id
return event_dict
@@ -79,34 +40,34 @@
logging.config.dictConfig(log_config)
logging.root.level -= 10 * verbosity_adjust
- processors = [
- add_exc_info_flag_for_exception,
- structlog.processors.StackInfoRenderer(),
- structlog.processors.format_exc_info,
- add_instance_id,
- StructuredLogRenderer(),
- ]
- structlog.configure(logger_factory=structlog.stdlib.LoggerFactory(),
- context_class=PlainRenderedOrderedDict,
- wrapper_class=BoundLogger,
- processors=processors)
+ structlog.configure(
+ processors=[
+ structlog.stdlib.filter_by_level,
+ structlog.stdlib.PositionalArgumentsFormatter(),
+ structlog.processors.StackInfoRenderer(),
+ structlog.processors.format_exc_info,
+ add_instance_id,
+ structlog.processors.UnicodeEncoder(),
+ structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
+ ],
+ context_class=dict,
+ logger_factory=structlog.stdlib.LoggerFactory(),
+ wrapper_class=structlog.stdlib.BoundLogger,
+ cache_logger_on_first_use=cache_on_use,
+ )
# Mark first line of log
log = structlog.get_logger()
- log.info("first-line")
+ log.info("first-log-line, logging level %d" % logging.root.level)
return log
-def update_logging(instance_id, vcore_id):
+def update_logging(instance_id, vcore_id, cache_on_use=True):
"""
Add the vcore id to the structured logger
:param vcore_id: The assigned vcore id
- :return: structure logger
+ :return: structured logger
"""
- def add_exc_info_flag_for_exception(_, name, event_dict):
- if name == 'exception':
- event_dict['exc_info'] = True
- return event_dict
def add_instance_id(_, __, event_dict):
event_dict['instance_id'] = instance_id
@@ -116,15 +77,22 @@
event_dict['vcore_id'] = vcore_id
return event_dict
- processors = [
- add_exc_info_flag_for_exception,
- structlog.processors.StackInfoRenderer(),
- structlog.processors.format_exc_info,
- add_instance_id,
- add_vcore_id,
- StructuredLogRenderer(),
- ]
- structlog.configure(processors=processors)
+ structlog.configure(
+ processors=[
+ structlog.stdlib.filter_by_level,
+ structlog.stdlib.PositionalArgumentsFormatter(),
+ structlog.processors.StackInfoRenderer(),
+ structlog.processors.format_exc_info,
+ add_instance_id,
+ add_vcore_id,
+ structlog.processors.UnicodeEncoder(),
+ structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
+ ],
+ context_class=dict,
+ logger_factory=structlog.stdlib.LoggerFactory(),
+ wrapper_class=structlog.stdlib.BoundLogger,
+ cache_logger_on_first_use=cache_on_use,
+ )
# Mark first line of log
log = structlog.get_logger()
diff --git a/envoy/go/envoyd/envoyd.go b/envoy/go/envoyd/envoyd.go
index af542ad..1a688cb 100644
--- a/envoy/go/envoyd/envoyd.go
+++ b/envoy/go/envoyd/envoyd.go
@@ -218,7 +218,7 @@
return c.consulapi.Agent().ServiceDeregister(id)
}
-// Service return a service
+// Service return a service
func (c *client) Service(service, tag string) ([]*consulapi.ServiceEntry, *consulapi.QueryMeta, error) {
passingOnly := true
addrs, meta, err := c.consulapi.Health().Service(service, tag, passingOnly, nil)
@@ -247,17 +247,17 @@
stderr, err := cmd.StderrPipe()
if err != nil {
- log.Fatal("Couldn't attach to stderr running envoy command: %s", err.Error())
+ log.Fatalf("Couldn't attach to stderr running envoy command: %s", err.Error())
}
stdout, err := cmd.StdoutPipe()
if err != nil {
- log.Fatal("Couldn't attach to stdout running envoy command: %s", err.Error())
+ log.Fatalf("Couldn't attach to stdout running envoy command: %s", err.Error())
}
so := bufio.NewReader(stdout)
se := bufio.NewReader(stderr)
if err = cmd.Start(); err != nil {
- log.Fatal("Error starting envoy: %s", err.Error())
+ log.Fatalf("Error starting envoy: %s", err.Error())
}
log.Printf("Envoy(%d) started", curEpoch)
soEof := false
@@ -276,7 +276,7 @@
soEof = true
}
} else if err != nil {
- log.Fatal("Attempt to read envoy standard out failed: %s", err.Error())
+ log.Fatalf("Attempt to read envoy standard out failed: %s", err.Error())
} else if count > 0 {
log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data))
}
@@ -289,14 +289,14 @@
seEof = true
}
} else if err != nil {
- log.Fatal("Attempt to read envoy standard err failed: %s", err.Error())
+ log.Fatalf("Attempt to read envoy standard err failed: %s", err.Error())
} else if count > 0 {
log.Printf("ENVOY_LOG(%d)(%d): %s",curEpoch,count,string(data))
}
}
log.Printf("Waiting on envoy %d to exit", curEpoch)
if err = cmd.Wait(); err != nil {
- log.Fatal("Envoy %d exited with an unexpected exit code: %s", curEpoch, err.Error())
+ log.Fatalf("Envoy %d exited with an unexpected exit code: %s", curEpoch, err.Error())
}
log.Printf("Envoy %d exited", curEpoch)
// Check if this was the primary envoy, if so
@@ -313,7 +313,7 @@
}
// This function will use the provided templete file to generate
-// the targetfile substituting
+// the targetfile substituting
func (ec * EnvoyControl) updateEnvoyConfig(ecv * EnvoyConfigVars) (err error) {
var firstRun bool = true
var firstRun2 bool = true
@@ -331,23 +331,23 @@
// Slurp up the template file.
tplt, err := ioutil.ReadFile(ec.envoyConfigTemplate)
if err != nil {
- log.Fatal("ERROR reading the template file, aborting: %s", err.Error())
+ log.Fatalf("ERROR reading the template file, aborting: %s", err.Error())
}
//fmt.Println(string(tplt))
configTemplate, err := template.New("config").Funcs(funcs).Parse(string(tplt));
if err != nil {
- log.Fatal("Unexpected error loading the Envoy template, aborting: %s", err.Error())
+ log.Fatalf("Unexpected error loading the Envoy template, aborting: %s", err.Error())
}
outFile,err := os.Create(ec.envoyConfig)
if err != nil {
- log.Fatal("Unexpected error opening the Envoy config file for write, aborting: %s", err.Error())
+ log.Fatalf("Unexpected error opening the Envoy config file for write, aborting: %s", err.Error())
}
if err = configTemplate.Execute(outFile, ecv); err != nil {
- log.Fatal("Unexpected error executing the Envoy config template, aborting: %s", err.Error())
+ log.Fatalf("Unexpected error executing the Envoy config template, aborting: %s", err.Error())
}
//cfgFile, err := ioutil.ReadFile(ec.envoyConfig)
//if err != nil {
- // log.Fatal("ERROR reading the config file, aborting: %s", err.Error())
+ // log.Fatalf("ERROR reading the config file, aborting: %s", err.Error())
// panic(err)
//}
//fmt.Println(string(cfgFile))
@@ -363,7 +363,7 @@
//err = json.Unmarshal(jsonString, &f)
err = json.Unmarshal(jsonString, &f)
if err != nil {
- log.Fatal("Unable to parse json record %s : %s", jsonString, err.Error())
+ log.Fatalf("Unable to parse json record %s : %s", jsonString, err.Error())
} else {
m := f.(map[string]interface{})
for k, v := range m {
@@ -413,7 +413,7 @@
ecv.VolthaVip = ec.ipAddrs[ec.vcoreSvcName][0] + ":" + ec.vcorePort
// Extract all values from the KV record
- // In the future, the values should all be compared to what we currently have
+ // In the future, the values should all be compared to what we currently have
vCluster, err = ec.parseAssignment(keyValue)
if err == nil {
ec.vc = vCluster // For future use to determine if there's been a real change
@@ -425,7 +425,7 @@
ecv.VolthaRR = append(ecv.VolthaRR, vCluster[i].Host + ":" + ec.vcorePort)
}
} else {
- log.Fatal("Couldn't parse the KV record %s: %s", string(keyValue), err.Error())
+ log.Fatalf("Couldn't parse the KV record %s: %s", string(keyValue), err.Error())
}
return
}
@@ -435,7 +435,7 @@
var ecv EnvoyConfigVars
if err = ec.prepareEnvoyConfig(keyValue, &ecv); err != nil {
- log.Fatal("Error preparing envoy config variables, aborting: %s", err.Error())
+ log.Fatalf("Error preparing envoy config variables, aborting: %s", err.Error())
}
// Now that we have the data loaded, update the envoy config and start envoy
@@ -468,7 +468,7 @@
kvp, meta, err = kv.Get(ec.assignmentKey, qo)
}
if i == ec.retries {
- log.Fatal("Failed to read the assignment key after %d retries, aborting: %s", ec.retries, err.Error())
+ log.Fatalf("Failed to read the assignment key after %d retries, aborting: %s", ec.retries, err.Error())
}
}
return
@@ -497,7 +497,7 @@
resp, err = ec.etcd.Get(context.Background(), ec.assignmentKey)
}
if i == ec.retries {
- log.Fatal("Failed to read assignment key from etcd after %d retries, aborting: %s", ec.retries, err.Error())
+ log.Fatalf("Failed to read assignment key from etcd after %d retries, aborting: %s", ec.retries, err.Error())
}
}
return
@@ -525,7 +525,7 @@
}
val, meta, err = ec.readConsulKey(ec.assignmentKey, &qo)
if err != nil {
- log.Fatal("Unable to read assignment consul key: %s\n", err.Error())
+ log.Fatalf("Unable to read assignment consul key: %s\n", err.Error())
} else {
log.Println(string(val))
log.Printf("meta.LastIndex = %d", meta.LastIndex)
@@ -634,16 +634,16 @@
func (ec * EnvoyControl) Initialize() (err error) {
// Resolve KV store's virtual ip address
if err = ec.resolveServiceAddress(ec.kvSvcName); err != nil {
- log.Fatal("Can't proceed without KV store's vIP address: %s", err.Error())
+ log.Fatalf("Can't proceed without KV store's vIP address: %s", err.Error())
}
// Resolve voltha's virtual ip address
if err = ec.resolveServiceAddress(ec.vcoreSvcName); err != nil {
- log.Fatal("Can't proceed without voltha's vIP address: %s", err.Error())
+ log.Fatalf("Can't proceed without voltha's vIP address: %s", err.Error())
}
if err = ec.kvConnect[ec.kvStore](ec.kvSvcName, ec.kvPort); err != nil {
- log.Fatal("Failed to create KV client, aborting: %s", err.Error())
+ log.Fatalf("Failed to create KV client, aborting: %s", err.Error())
}
if ec.httpDisabled == true && ec.httpsDisabled == true {
@@ -672,7 +672,7 @@
log.Printf("KV-store %s at %s:%s", ec.kvStore, ec.kvSvcName, ec.kvPort)
if err = ec.Initialize(); err != nil {
- log.Fatal("Envoy control initialization failed, aborting: %s", err.Error())
+ log.Fatalf("Envoy control initialization failed, aborting: %s", err.Error())
}
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index f90c9f7..126dc29 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -35,7 +35,6 @@
from common.utils.dockerhelpers import get_my_containers_name
-log = get_logger()
# _ = third_party
class ConnectionManager(object):
@@ -45,8 +44,11 @@
vcore_retry_interval=0.5, devices_refresh_interval=5,
subscription_refresh_interval=5):
- log.info('init-connection-manager')
- log.info('list-of-controllers', controller_endpoints=controller_endpoints)
+ self.log = get_logger()
+ self.log.info('init-connection-manager')
+ self.log.info('list-of-controllers',
+ controller_endpoints=controller_endpoints)
+
self.controller_endpoints = controller_endpoints
self.consul_endpoint = consul_endpoint
self.vcore_endpoint = vcore_endpoint
@@ -74,7 +76,7 @@
if self.running:
return
- log.debug('starting')
+ self.log.debug('starting')
self.running = True
@@ -84,12 +86,12 @@
# Start monitoring logical devices and manage agents accordingly
reactor.callLater(0, self.monitor_logical_devices)
- log.info('started')
+ self.log.info('started')
return self
def stop(self):
- log.debug('stopping')
+ self.log.debug('stopping')
# clean up all controller connections
for agent in self.agent_map.itervalues():
agent.stop()
@@ -97,7 +99,7 @@
self._reset_grpc_attributes()
- log.info('stopped')
+ self.log.info('stopped')
def resolve_endpoint(self, endpoint):
ip_port_endpoint = endpoint
@@ -105,11 +107,11 @@
try:
ip_port_endpoint = get_endpoint_from_consul(
self.consul_endpoint, endpoint[1:])
- log.info(
+ self.log.info(
'{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
except Exception as e:
- log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
- log.error('committing-suicide')
+ self.log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
+ self.log.error('committing-suicide')
# Committing suicide in order to let docker restart ofagent
os.system("kill -15 {}".format(os.getpid()))
if ip_port_endpoint:
@@ -117,7 +119,7 @@
return host, int(port)
def _reset_grpc_attributes(self):
- log.debug('start-reset-grpc-attributes')
+ self.log.debug('start-reset-grpc-attributes')
if self.grpc_client is not None:
self.grpc_client.stop()
@@ -130,13 +132,13 @@
self.subscription = None
self.grpc_client = None
- log.debug('stop-reset-grpc-attributes')
+ self.log.debug('stop-reset-grpc-attributes')
def _assign_grpc_attributes(self):
- log.debug('start-assign-grpc-attributes')
+ self.log.debug('start-assign-grpc-attributes')
host, port = self.resolve_endpoint(self.vcore_endpoint)
- log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)
+ self.log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)
assert host is not None
assert port is not None
@@ -145,11 +147,11 @@
self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
self.is_alive = True
- log.debug('stop-assign-grpc-attributes')
+ self.log.debug('stop-assign-grpc-attributes')
@inlineCallbacks
def monitor_vcore_grpc_channel(self):
- log.debug('start-monitor-vcore-grpc-channel')
+ self.log.debug('start-monitor-vcore-grpc-channel')
while self.running:
try:
@@ -170,7 +172,7 @@
if subscription is not None and subscription.ofagent_id == container_name:
if self.subscription is None:
# Keep details on the current GRPC session and subscription
- log.debug('subscription-with-vcore-successful', subscription=subscription)
+ self.log.debug('subscription-with-vcore-successful', subscription=subscription)
self.subscription = subscription
self.grpc_client.start()
@@ -182,13 +184,13 @@
# The subscription did not succeed, reset and move on
else:
- log.info('subscription-with-vcore-unavailable', subscription=subscription)
+ self.log.info('subscription-with-vcore-unavailable', subscription=subscription)
except _Rendezvous, e:
- log.error('subscription-with-vcore-terminated',exception=e, status=e.code())
+ self.log.error('subscription-with-vcore-terminated',exception=e, status=e.code())
except Exception as e:
- log.exception('unexpected-subscription-termination-with-vcore', e=e)
+ self.log.exception('unexpected-subscription-termination-with-vcore', e=e)
# Reset grpc details
# The vcore instance is either not available for subscription
@@ -198,33 +200,33 @@
# Sleep for a short period and retry
yield asleep(self.vcore_retry_interval)
- log.debug('stop-monitor-vcore-grpc-channel')
+ self.log.debug('stop-monitor-vcore-grpc-channel')
@inlineCallbacks
def get_list_of_reachable_logical_devices_from_voltha(self):
while self.running:
- log.info('retrieve-logical-device-list')
+ self.log.debug('retrieve-logical-device-list')
try:
devices = yield \
self.grpc_client.list_reachable_logical_devices()
for device in devices:
- log.info("reachable-logical-device-entry", id=device.id,
- datapath_id=device.datapath_id)
+ self.log.debug("reachable-logical-device-entry", id=device.id,
+ datapath_id=device.datapath_id)
returnValue(devices)
except _Rendezvous, e:
status = e.code()
- log.error('vcore-communication-failure', exception=e, status=status)
+ self.log.error('vcore-communication-failure', exception=e, status=status)
if status == StatusCode.UNAVAILABLE or status == StatusCode.DEADLINE_EXCEEDED:
os.system("kill -15 {}".format(os.getpid()))
except Exception as e:
- log.exception('logical-devices-retrieval-failure', exception=e)
+ self.log.exception('logical-devices-retrieval-failure', exception=e)
- log.info('reconnect', after_delay=self.vcore_retry_interval)
+ self.log.info('reconnect', after_delay=self.vcore_retry_interval)
yield asleep(self.vcore_retry_interval)
def refresh_agent_connections(self, devices):
@@ -258,8 +260,8 @@
if device.datapath_id in to_add:
self.create_agent(device)
- log.debug('updated-agent-list', count=len(self.agent_map))
- log.debug('updated-device-id-to-datapath-id-map',
+ self.log.debug('updated-agent-list', count=len(self.agent_map))
+ self.log.debug('updated-device-id-to-datapath-id-map',
map=str(self.device_id_to_datapath_id_map))
def create_agent(self, device):
@@ -283,10 +285,10 @@
@inlineCallbacks
def monitor_logical_devices(self):
- log.debug('start-monitor-logical-devices')
+ self.log.debug('start-monitor-logical-devices')
while self.running:
- log.info('monitoring-logical-devices')
+ self.log.debug('monitoring-logical-devices')
# should change to a gRPC streaming call
# see https://jira.opencord.org/browse/CORD-821
@@ -301,18 +303,18 @@
# update agent list and mapping tables as needed
self.refresh_agent_connections(reachable_devices)
else:
- log.info('vcore-communication-unavailable')
+ self.log.warning('vcore-communication-unavailable')
# wait before next poll
yield asleep(self.devices_refresh_interval)
except _Rendezvous, e:
- log.error('vcore-communication-failure', exception=repr(e), status=e.code())
+ self.log.error('vcore-communication-failure', exception=repr(e), status=e.code())
except Exception as e:
- log.exception('unexpected-vcore-communication-failure', exception=repr(e))
+ self.log.exception('unexpected-vcore-communication-failure', exception=repr(e))
- log.debug('stop-monitor-logical-devices')
+ self.log.debug('stop-monitor-logical-devices')
def forward_packet_in(self, device_id, ofp_packet_in):
datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index a1678c5..16be6f0 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -34,13 +34,12 @@
from google.protobuf import empty_pb2
-log = get_logger()
-
-
class GrpcClient(object):
def __init__(self, connection_manager, channel, grpc_timeout):
+ self.log = get_logger()
+
self.connection_manager = connection_manager
self.channel = channel
self.grpc_timeout = grpc_timeout
@@ -53,19 +52,19 @@
self.change_event_queue = DeferredQueue() # queue change events
def start(self):
- log.debug('starting', grpc_timeout=self.grpc_timeout)
+ self.log.debug('starting', grpc_timeout=self.grpc_timeout)
self.start_packet_out_stream()
self.start_packet_in_stream()
self.start_change_event_in_stream()
reactor.callLater(0, self.packet_in_forwarder_loop)
reactor.callLater(0, self.change_event_processing_loop)
- log.info('started')
+ self.log.info('started')
return self
def stop(self):
- log.debug('stopping')
+ self.log.debug('stopping')
self.stopped = True
- log.info('stopped')
+ self.log.info('stopped')
def start_packet_out_stream(self):
@@ -84,7 +83,7 @@
try:
self.local_stub.StreamPacketsOut(generator)
except _Rendezvous, e:
- log.error('grpc-exception', status=e.code())
+ self.log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
@@ -99,11 +98,11 @@
for packet_in in iterator:
reactor.callFromThread(self.packet_in_queue.put,
packet_in)
- log.debug('enqued-packet-in',
+ self.log.debug('enqued-packet-in',
packet_in=packet_in,
queue_len=len(self.packet_in_queue.pending))
except _Rendezvous, e:
- log.error('grpc-exception', status=e.code())
+ self.log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
@@ -117,11 +116,11 @@
try:
for event in iterator:
reactor.callFromThread(self.change_event_queue.put, event)
- log.debug('enqued-change-event',
+ self.log.debug('enqued-change-event',
change_event=event,
queue_len=len(self.change_event_queue.pending))
except _Rendezvous, e:
- log.error('grpc-exception', status=e.code())
+ self.log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
@@ -135,7 +134,7 @@
device_id = event.id
self.connection_manager.forward_change_event(device_id, event)
except Exception, e:
- log.exception('failed-in-packet-in-handler', e=e)
+ self.log.exception('failed-in-packet-in-handler', e=e)
if self.stopped:
break
@@ -264,7 +263,6 @@
self.local_stub.Subscribe, subscriber, timeout=self.grpc_timeout)
returnValue(res)
-
@inlineCallbacks
def get_meter_stats(self, device_id):
req = ID(id=device_id)
diff --git a/voltha/adapters/asfvolt16_olt/bal.py b/voltha/adapters/asfvolt16_olt/bal.py
index 012a150..a1a7eef 100644
--- a/voltha/adapters/asfvolt16_olt/bal.py
+++ b/voltha/adapters/asfvolt16_olt/bal.py
@@ -254,7 +254,7 @@
# Set the Packet-out info
# TODO: Need to provide correct value for intf_id
obj.packet.data.pkt = pkt
- self.log.info('sending-packet-out',
+ self.log.debug('sending-packet-out',
packet_out_details=obj)
yield self.stub.BalCfgSet(obj, timeout=GRPC_TIMEOUT)
diff --git a/voltha/adapters/bbsimolt/bbsimolt_device.py b/voltha/adapters/bbsimolt/bbsimolt_device.py
index 3cbafc1..8bce86f 100644
--- a/voltha/adapters/bbsimolt/bbsimolt_device.py
+++ b/voltha/adapters/bbsimolt/bbsimolt_device.py
@@ -612,7 +612,7 @@
def packet_out(self, egress_port, msg):
pkt = Ether(msg)
- self.log.info('packet out', egress_port=egress_port,
+ self.log.debug('packet out', egress_port=egress_port,
packet=str(pkt).encode("HEX"))
# Find port type
diff --git a/voltha/adapters/cig_olt/cig_olt_handler.py b/voltha/adapters/cig_olt/cig_olt_handler.py
index b7c37a7..0cfaa64 100644
--- a/voltha/adapters/cig_olt/cig_olt_handler.py
+++ b/voltha/adapters/cig_olt/cig_olt_handler.py
@@ -582,8 +582,8 @@
self.log.exception('Exception during packet_in_msg_proc processing', e=e)
def packet_out(self, egress_port, msg):
- self.log.info('sending-packet-out', egress_port=egress_port,
- msg=hexify(msg))
+ self.log.debug('sending-packet-out', egress_port=egress_port,
+ msg_hex=hexify(msg))
pkt = Ether(msg)
out_pkt = (
Ether(src=pkt.src, dst=pkt.dst) /
diff --git a/voltha/adapters/iadapter.py b/voltha/adapters/iadapter.py
index db08945..e066365 100644
--- a/voltha/adapters/iadapter.py
+++ b/voltha/adapters/iadapter.py
@@ -295,7 +295,8 @@
log.exception('Exception', e=e)
def send_proxied_message(self, proxy_address, msg):
- log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+ log.debug('send-proxied-message', proxy_address=proxy_address,
+ proxied_msg=msg)
handler = self.devices_handlers[proxy_address.device_id]
handler.send_proxied_message(proxy_address, msg)
@@ -344,8 +345,8 @@
return device
def receive_proxied_message(self, proxy_address, msg):
- log.info('receive-proxied-message', proxy_address=proxy_address,
- device_id=proxy_address.device_id, msg=msg)
+ log.debug('receive-proxied-message', proxy_address=proxy_address,
+ device_id=proxy_address.device_id, proxied_msg=msg)
# Device_id from the proxy_address is the olt device id. We need to
# get the onu device id using the port number in the proxy_address
device = self.adapter_agent. \
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 896b04f..1a49ef9 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -1362,9 +1362,9 @@
self.log.info('send-proxied_message-exception', exc=str(e))
def packet_out(self, egress_port, msg):
- self.log.info('sending-packet-out',
- egress_port=egress_port,
- msg=hexify(msg))
+ self.log.debug('sending-packet-out',
+ egress_port=egress_port,
+ msg_hex=hexify(msg))
pkt = Ether(msg)
out_pkt = (
diff --git a/voltha/adapters/openolt/openolt.py b/voltha/adapters/openolt/openolt.py
index 50c84af..5c8768c 100644
--- a/voltha/adapters/openolt/openolt.py
+++ b/voltha/adapters/openolt/openolt.py
@@ -242,14 +242,16 @@
raise NotImplementedError()
def send_proxied_message(self, proxy_address, msg):
- log.debug('send-proxied-message', proxy_address=proxy_address, msg=msg)
+ log.debug('send-proxied-message',
+ proxy_address=proxy_address,
+ proxied_msg=msg)
handler = self.devices[proxy_address.device_id]
handler.send_proxied_message(proxy_address, msg)
def receive_proxied_message(self, proxy_address, msg):
log.debug('receive_proxied_message - Not implemented',
proxy_address=proxy_address,
- msg=msg)
+ proxied_msg=msg)
raise NotImplementedError()
def receive_packet_out(self, logical_device_id, egress_port_no, msg):
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index 9f4ba89..a752644 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -700,7 +700,7 @@
self.pm_metrics.update(pm_config)
def send_proxied_message(self, proxy_address, msg):
- self.log.info('sending-proxied-message')
+ self.log.debug('sending-proxied-message')
if isinstance(msg, FlowTable):
stub = ponsim_pb2_grpc.PonSimStub(self.get_channel())
self.log.info('pushing-onu-flow-table', port=msg.port)
@@ -708,13 +708,13 @@
self.adapter_agent.receive_proxied_message(proxy_address, res)
elif isinstance(msg, PonSimMetricsRequest):
stub = ponsim_pb2_grpc.PonSimStub(self.get_channel())
- self.log.info('proxying onu stats request', port=msg.port)
+ self.log.debug('proxying onu stats request', port=msg.port)
res = stub.GetStats(msg)
self.adapter_agent.receive_proxied_message(proxy_address, res)
def packet_out(self, egress_port, msg):
- self.log.info('sending-packet-out', egress_port=egress_port,
- msg=hexify(msg))
+ self.log.debug('sending-packet-out', egress_port=egress_port,
+ msg_hex=hexify(msg))
pkt = Ether(msg)
out_pkt = pkt
if egress_port != self.nni_port.port_no:
diff --git a/voltha/adapters/ponsim_onu/ponsim_onu.py b/voltha/adapters/ponsim_onu/ponsim_onu.py
index 77ab382..5eea6f7 100644
--- a/voltha/adapters/ponsim_onu/ponsim_onu.py
+++ b/voltha/adapters/ponsim_onu/ponsim_onu.py
@@ -289,7 +289,7 @@
if isinstance(msg, PonSimMetrics):
# Message is a reply to an ONU statistics request. Push it out to Kafka via adapter.submit_kpis().
if self.pm_metrics:
- self.log.info('Handling incoming ONU metrics')
+ self.log.debug('Handling incoming ONU metrics')
prefix = 'voltha.{}.{}'.format("ponsim_onu", self.device_id)
port_metrics = self.pm_metrics.extract_metrics(msg)
try:
@@ -307,7 +307,7 @@
}
)
- self.log.info('Submitting KPI for incoming ONU mnetrics')
+ self.log.debug('Submitting KPI for incoming ONU metrics')
# Step 3: submit
self.adapter_agent.submit_kpis(kpi_event)
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 8fb083a..aaf8f77 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -274,7 +274,7 @@
try:
log.info('membership-record-before')
is_timeout, (_, record) = yield \
- self.consul_get_with_timeout(
+ self.coordinator_get_with_timeout(
key=self.membership_record_key,
index=0,
timeout=5)
@@ -427,7 +427,7 @@
# this shall return only when update is made to leader key
# or expires after 5 seconds wait
is_timeout, (tmp_index, updated) = yield \
- self.consul_get_with_timeout(
+ self.coordinator_get_with_timeout(
key=self.leader_prefix,
index=index,
timeout=5)
@@ -437,8 +437,8 @@
continue
# After timeout event the index returned from
- # consul_get_with_timeout is None. If we are here it's not a
- # timeout, therefore the index is a valid one.
+ # coordinator_get_with_timeout is None. If we are here it's
+ # not a timeout, therefore the index is a valid one.
index=tmp_index
if updated is None or updated != last:
@@ -568,7 +568,7 @@
returnValue(result)
@inlineCallbacks
- def consul_get_with_timeout(self, key, timeout, **kw):
+ def coordinator_get_with_timeout(self, key, timeout, **kw):
"""
Query consul with a timeout
:param key: Key to query
diff --git a/voltha/coordinator_etcd.py b/voltha/coordinator_etcd.py
index c736104..99baacc 100644
--- a/voltha/coordinator_etcd.py
+++ b/voltha/coordinator_etcd.py
@@ -16,9 +16,6 @@
""" Etcd-based coordinator services """
-from consul import ConsulException
-from consul.twisted import Consul
-from requests import ConnectionError
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
@@ -30,7 +27,7 @@
from common.utils.message_queue import MessageQueue
from voltha.registry import IComponent
from worker import Worker
-from simplejson import dumps, loads
+from simplejson import dumps
from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
log = get_logger()
@@ -45,9 +42,9 @@
"""
An app shall instantiate only one Coordinator (singleton).
A single instance of this object shall take care of all external
- with consul, and via consul, all coordination activities with its
+ with etcd, and via etcd, all coordination activities with its
clustered peers. Roles include:
- - registering an ephemeral membership entry (k/v record) in consul
+ - registering an ephemeral membership entry (k/v record) in etcd
- participating in a symmetric leader election, and potentially assuming
the leader's role. What leadership entails is not a concern for the
coordination, it simply instantiates (and shuts down) a leader class
@@ -65,7 +62,6 @@
instance_id,
rest_port,
config,
- consul='localhost:8500',
etcd='localhost:2379',
container_name_regex='^.*\.([0-9]+)\..*$'):
@@ -118,12 +114,6 @@
self.worker = Worker(self.instance_id, self)
- self.host = consul.split(':')[0].strip()
- self.port = int(consul.split(':')[1].strip())
-
- # TODO need to handle reconnect events properly
- self.consul = Consul(host=self.host, port=self.port)
-
# Create etcd client
kv_host = etcd.split(':')[0].strip()
kv_port = etcd.split(':')[1].strip()
@@ -222,7 +212,7 @@
def _clear_backoff(self):
if self.retries:
- log.info('reconnected-to-consul', after_retries=self.retries)
+ log.info('reconnected-to-etcd', after_retries=self.retries)
self.retries = 0
@inlineCallbacks
@@ -244,7 +234,7 @@
try:
yield self.lease.revoke()
except Exception as e:
- log.exception('failed-to-delete-session',
+ log.exception('failed-to-delete-session %s' % e,
session_id=self.session_id)
@inlineCallbacks
@@ -285,16 +275,18 @@
@inlineCallbacks
def _assert_membership_record_valid(self):
try:
- log.info('membership-record-before')
+ log.debug('membership-record-before')
is_timeout, (_, record) = yield \
- self.consul_get_with_timeout(
+ self.coordinator_get_with_timeout(
key=self.membership_record_key,
index=0,
timeout=5)
if is_timeout:
+ log.debug('timeout creating membership record in etcd, key: %s' %
+ self.membership_record_key)
returnValue(False)
- log.info('membership-record-after', record=record)
+ log.debug('membership-record-after', record=record)
if record is None or \
'Session' not in record:
log.info('membership-record-change-detected',
@@ -349,12 +341,12 @@
def _renew_session(m_callback):
try:
time_left = yield self.lease.remaining()
- log.info('_renew_session', time_left=time_left)
+ log.debug('_renew_session', time_left=time_left)
result = yield self.lease.refresh()
- log.info('just-renewed-session', result=result)
+ log.debug('just-renewed-session', result=result)
if not m_callback.called:
# Triggering callback will cancel the timeout timer
- log.info('trigger-callback-to-cancel-timeout-timer')
+ log.debug('trigger-callback-to-cancel-timeout-timer')
m_callback.callback(result)
else:
# Timeout event has already been called. Just ignore
@@ -428,7 +420,7 @@
if result.kvs:
kv = result.kvs[0]
leader = kv.value
- log.info('get-leader-key', leader=leader, instance=self.instance_id)
+ log.debug('get-leader-key', leader=leader, instance=self.instance_id)
if leader is None:
log.error('get-leader-failed')
@@ -437,12 +429,12 @@
log.info('leadership-seized')
yield self._assert_leadership()
else:
- log.info('already-leader')
+ log.debug('already-leader')
else:
- log.info('leader-is-another', leader=leader)
+ log.debug('leader-is-another', leader=leader)
yield self._assert_nonleadership(leader)
- except Exception, e:
+ except Exception as e:
log.exception('unexpected-error-leader-tracking', e=e)
finally:
@@ -514,7 +506,7 @@
prefix = True
keyset = KeySet(bytes(args[0]), prefix=True)
kw.pop('recurse')
- log.info('start-op', operation=operation, args=args, kw=kw)
+ log.debug('start-op', operation=operation, args=args, kw=kw)
while 1:
try:
@@ -571,16 +563,16 @@
result = yield operation(*args, **kw)
self._clear_backoff()
break
- except Exception, e:
+ except Exception as e:
if not self.shutting_down:
log.exception(e)
- yield self._backoff('unknown-error')
+ yield self._backoff('etcd-unknown-error: %s' % e)
- log.info('end-op', operation=operation, args=args, kw=kw)
+ log.debug('end-op', operation=operation, args=args, kw=kw)
returnValue(result)
@inlineCallbacks
- def consul_get_with_timeout(self, key, timeout, **kw):
+ def coordinator_get_with_timeout(self, key, timeout, **kw):
"""
Query etcd with a timeout
:param key: Key to query
@@ -597,7 +589,8 @@
for name, value in kw.items():
if name == 'index':
mod_revision = value
- log.info('consul_get_with_timeout', index=mod_revision)
+ log.debug('coordinator-get-with-timeout-etcd',
+ index=mod_revision)
kw.pop('index')
break
diff --git a/voltha/core/config/config_backend.py b/voltha/core/config/config_backend.py
index 71eeddb..d906348 100644
--- a/voltha/core/config/config_backend.py
+++ b/voltha/core/config/config_backend.py
@@ -20,8 +20,6 @@
import etcd3
import structlog
-log = structlog.get_logger()
-
class ConsulStore(object):
""" Config kv store for consul with a cache for quicker subsequent reads
@@ -38,6 +36,8 @@
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
def __init__(self, host, port, path_prefix):
+
+ self.log = structlog.get_logger()
self._consul = Consul(host=host, port=port)
self.host = host
self.port = port
@@ -75,7 +75,7 @@
self._cache[key] = value
self._kv_put(self.make_path(key), value)
except Exception, e:
- log.exception('cannot-set-item', e=e)
+ self.log.exception('cannot-set-item', e=e)
def __delitem__(self, key):
self._cache.pop(key, None)
@@ -86,7 +86,7 @@
wait_time = self.RETRY_BACKOFF[min(self.retries,
len(self.RETRY_BACKOFF) - 1)]
self.retries += 1
- log.error(msg, retry_in=wait_time)
+ self.log.error(msg, retry_in=wait_time)
yield asleep(wait_time)
def _redo_consul_connection(self):
@@ -95,7 +95,7 @@
def _clear_backoff(self):
if self.retries:
- log.info('reconnected-to-consul', after_retries=self.retries)
+ self.log.info('reconnected-to-consul', after_retries=self.retries)
self.retries = 0
def _get_consul(self):
@@ -115,7 +115,7 @@
while 1:
try:
consul = self._get_consul()
- log.debug('consul', consul=consul, operation=operation,
+ self.log.debug('consul', consul=consul, operation=operation,
args=args)
if operation == 'GET':
index, result = consul.kv.get(*args, **kw)
@@ -129,13 +129,13 @@
self._clear_backoff()
break
except ConsulException, e:
- log.exception('consul-not-up', e=e)
+ self.log.exception('consul-not-up', e=e)
self._backoff('consul-not-up')
except ConnectionError, e:
- log.exception('cannot-connect-to-consul', e=e)
+ self.log.exception('cannot-connect-to-consul', e=e)
self._backoff('cannot-connect-to-consul')
except Exception, e:
- log.exception(e)
+ self.log.exception(e)
self._backoff('unknown-error')
self._redo_consul_connection()
@@ -157,6 +157,8 @@
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
def __init__(self, host, port, path_prefix):
+
+ self.log = structlog.get_logger()
self._etcd = etcd3.client(host=host, port=port)
self.host = host
self.port = port
@@ -193,7 +195,7 @@
self._cache[key] = value
self._kv_put(self.make_path(key), value)
except Exception, e:
- log.exception('cannot-set-item', e=e)
+ self.log.exception('cannot-set-item', e=e)
def __delitem__(self, key):
self._cache.pop(key, None)
@@ -204,7 +206,7 @@
wait_time = self.RETRY_BACKOFF[min(self.retries,
len(self.RETRY_BACKOFF) - 1)]
self.retries += 1
- log.error(msg, retry_in=wait_time)
+ self.log.error(msg, retry_in=wait_time)
yield asleep(wait_time)
def _redo_etcd_connection(self):
@@ -213,7 +215,7 @@
def _clear_backoff(self):
if self.retries:
- log.info('reconnected-to-etcd', after_retries=self.retries)
+ self.log.info('reconnected-to-etcd', after_retries=self.retries)
self.retries = 0
def _get_etcd(self):
@@ -230,12 +232,18 @@
return self._retry('DELETE', *args, **kw)
def _retry(self, operation, *args, **kw):
- log.info('backend-op', operation=operation, args=args, kw=kw)
+
+ # etcd data sometimes contains non-utf8 sequences, replace
+ self.log.debug('backend-op',
+ operation=operation,
+ args=map(lambda x : unicode(x,'utf8','replace'), args),
+ kw=kw)
+
while 1:
try:
etcd = self._get_etcd()
- log.debug('etcd', etcd=etcd, operation=operation,
- args=args)
+ self.log.debug('etcd', etcd=etcd, operation=operation,
+ args=map(lambda x : unicode(x,'utf8','replace'), args))
if operation == 'GET':
(value, meta) = etcd.get(*args, **kw)
result = (value, meta)
@@ -249,7 +257,7 @@
self._clear_backoff()
break
except Exception, e:
- log.exception(e)
+ self.log.exception(e)
self._backoff('unknown-error-with-etcd')
self._redo_etcd_connection()
diff --git a/voltha/core/dispatcher.py b/voltha/core/dispatcher.py
index 06f707e..9a3ffd1 100644
--- a/voltha/core/dispatcher.py
+++ b/voltha/core/dispatcher.py
@@ -68,19 +68,19 @@
id=None,
broadcast=False):
"""
- Called whenever a global request is received from the NBI. The
+ Called whenever a global request is received from the NBI. The
request will be dispatch as follows:
1) to a specific voltha Instance if the core_id is specified
- 2) to the local Voltha Instance if the request specifies an ID that
+ 2) to the local Voltha Instance if the request specifies an ID that
matches the core id of the local Voltha instance
- 3) to a remote Voltha Instance if the request specifies an ID that
+ 3) to a remote Voltha Instance if the request specifies an ID that
matches the core id of that voltha instance
- 4) to all Voltha Instances if it's a broadcast request,
- e.g. getDevices, i.e. broadcast=True. The id may or may not be
- None. In this case, the results will be returned as a list of
+ 4) to all Voltha Instances if it's a broadcast request,
+ e.g. getDevices, i.e. broadcast=True. The id may or may not be
+ None. In this case, the results will be returned as a list of
responses back to the global handler
- 5) to the local voltha instance if id=None and broadcast=False.
- This occurs in cases where any Voltha instance will return the same
+ 5) to the local voltha instance if id=None and broadcast=False.
+ This occurs in cases where any Voltha instance will return the same
output, e.g. getAdapters()
:param method_name: rpc name
:param id: the id in the request, if present.
@@ -88,7 +88,7 @@
:param context: grpc context
:return: the response of that dispatching request
"""
- log.info('start',
+ log.debug('start',
_method_name=method_name,
id=id,
request=request)
@@ -173,7 +173,7 @@
request,
context)
# Then get peers results
- log.info('maps', peers=self.peers_map, grpc=self.grpc_conn_map)
+ log.debug('maps', peers=self.peers_map, grpc=self.grpc_conn_map)
current_responses = [result]
for core_id in self.peers_map:
if core_id == self.core_store_id:
diff --git a/voltha/core/flow_decomposer.py b/voltha/core/flow_decomposer.py
index 125bfe4..7595e51 100644
--- a/voltha/core/flow_decomposer.py
+++ b/voltha/core/flow_decomposer.py
@@ -616,7 +616,7 @@
nni=self._nni_logical_port_no)
if in_port_no == self._nni_logical_port_no:
- log.info('trap-nni')
+ log.debug('trap-nni')
# Trap flow for NNI port
fl_lst.append(mk_flow_stat(
priority=flow.priority,
@@ -633,7 +633,7 @@
))
else:
- log.info('trap-uni')
+ log.debug('trap-uni')
# Trap flow for UNI port
# in_port_no is None for wildcard input case, do not include
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index 87ebc7f..f3a8417 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -518,7 +518,7 @@
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_OUT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def packet_out(self, ofp_packet_out):
- self.log.info('packet-out', packet=ofp_packet_out)
+ self.log.debug('packet-out', packet=ofp_packet_out)
topic = 'packet-out:{}'.format(self.logical_device_id)
self.event_bus.publish(topic, ofp_packet_out)
diff --git a/voltha/leader.py b/voltha/leader.py
index 54f1117..6d666a9 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -245,9 +245,9 @@
def _track_members(self, index):
previous_index = index
try:
- log.info('member-tracking-before')
+ log.debug('member-tracking-before')
is_timeout, (tmp_index, results) = yield \
- self.coord.consul_get_with_timeout(
+ self.coord.coordinator_get_with_timeout(
key=self.coord.membership_prefix,
recurse=True,
index=index,
@@ -270,7 +270,7 @@
return
# After timeout event the index returned from
- # consul_get_with_timeout is None. If we are here it's not a
+ # coordinator_get_with_timeout is None. If we are here it's not a
# timeout, therefore the index is a valid one.
index=tmp_index
diff --git a/voltha/main.py b/voltha/main.py
index a2f1df2..9485b18 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -299,11 +299,12 @@
verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
self.log = setup_logging(self.logconfig,
args.instance_id,
- verbosity_adjust=verbosity_adjust)
+ verbosity_adjust=verbosity_adjust,
+ cache_on_use=True)
self.log.info('core-number-extractor', regex=args.container_name_regex)
self.voltha_version = self.get_version()
- self.log.info('VOLTHA version is {}'.format(self.voltha_version))
+ self.log.info('VOLTHA version is %s' % self.voltha_version)
# configurable variables from voltha.yml file
#self.configurable_vars = self.config.get('Constants', {})
@@ -392,7 +393,6 @@
rest_port=self.args.rest_port,
instance_id=self.instance_id,
config=self.config,
- consul=self.args.consul,
etcd=self.args.etcd,
container_name_regex=self.args.container_name_regex)
).start()
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 4b53483..b615b19 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -160,13 +160,13 @@
log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
return
- log.debug('sending-kafka-msg', topic=topic, msg=msg)
+ log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)
msgs = [msg]
if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
d = deferToThread(self.kproducer.produce, topic, msg, key)
yield d
- log.debug('sent-kafka-msg', topic=topic, msg=msg)
+ log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
# send a lightweight poll to avoid an exception after 100k messages.
d1 = deferToThread(self.kproducer.poll, 0)
yield d1
@@ -175,7 +175,7 @@
except Exception, e:
self.faulty = True
- log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)
+ log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg, e=e)
# set the kafka producer to None. This is needed if the
# kafka docker went down and comes back up with a different
diff --git a/voltha/worker.py b/voltha/worker.py
index 154c931..ed4fd6d 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -151,7 +151,7 @@
if self.mycore_store_id:
# Wait for updates to the store assigment key
is_timeout, (tmp_index, mappings) = yield \
- self.coord.consul_get_with_timeout(
+ self.coord.coordinator_get_with_timeout(
key=self.coord.core_store_assignment_key,
recurse=True,
index=index,
@@ -161,8 +161,8 @@
return
# After timeout event the index returned from
- # consul_get_with_timeout is None. If we are here it's not a
- # timeout, therefore the index is a valid one.
+ # coordinator_get_with_timeout is None. If we are here it's
+ # not a timeout, therefore the index is a valid one.
index=tmp_index
if mappings and index != prev_index: