[VOL-2471] Update library to use package logger
This commit consists of the following:
1) Add a GetLogLevel() API to make it easier to use a specific
logger. There is also the V() API, which does something broadly
similar.
2) Add a common.go file to some heavily used packages in order
to dynamically set their log level and also to set a specific
logger per package.
3) Use a per-package logger for some of the heavily used packages
for improved performance.
Change-Id: If22a2c82d87d808f305677a2e793f8064f33291e
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 4e04b30..652bdfa 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -31,11 +31,6 @@
"time"
)
-// Initialize the logger - gets the default until the main function setup the logger
-func init() {
- log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
const (
DefaultMaxRetries = 3
DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
@@ -148,11 +143,11 @@
}
func (kp *InterContainerProxy) Start() error {
- log.Info("Starting-Proxy")
+ logger.Info("Starting-Proxy")
// Kafka MsgClient should already have been created. If not, output fatal error
if kp.kafkaClient == nil {
- log.Fatal("kafka-client-not-set")
+ logger.Fatal("kafka-client-not-set")
}
// Create the Done channel
@@ -160,7 +155,7 @@
// Start the kafka client
if err := kp.kafkaClient.Start(); err != nil {
- log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
+ logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
return err
}
@@ -177,7 +172,7 @@
}
func (kp *InterContainerProxy) Stop() {
- log.Info("stopping-intercontainer-proxy")
+ logger.Info("stopping-intercontainer-proxy")
kp.doneCh <- 1
// TODO : Perform cleanup
kp.kafkaClient.Stop()
@@ -188,10 +183,10 @@
// DeviceDiscovered publish the discovered device onto the kafka messaging bus
func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
- log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
+ logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
// Simple validation
if deviceId == "" || deviceType == "" {
- log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
+ logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
return errors.New("invalid-parameters")
}
// Create the device discovery message
@@ -212,7 +207,7 @@
var marshalledData *any.Any
var err error
if marshalledData, err = ptypes.MarshalAny(body); err != nil {
- log.Errorw("cannot-marshal-request", log.Fields{"error": err})
+ logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
return err
}
msg := &ic.InterContainerMessage{
@@ -222,7 +217,7 @@
// Send the message
if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
- log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
+ logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
return err
}
return nil
@@ -242,7 +237,7 @@
// Encode the request
protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
if err != nil {
- log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+ logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
return false, nil
}
@@ -251,7 +246,7 @@
if waitForResponse {
var err error
if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
- log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+ logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
}
}
@@ -259,7 +254,7 @@
// specific key, hence ensuring a single partition is used to publish the request. This ensures that the
// subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
//key := GetDeviceIdFromTopic(*toTopic)
- log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+ logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
go kp.kafkaClient.Send(protoRequest, toTopic, key)
if waitForResponse {
@@ -279,7 +274,7 @@
select {
case msg, ok := <-ch:
if !ok {
- log.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
protoError := &ic.Error{Reason: "channel-closed"}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
@@ -287,15 +282,15 @@
}
return false, marshalledArg
}
- log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+ logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
var responseBody *ic.InterContainerResponseBody
var err error
if responseBody, err = decodeResponse(msg); err != nil {
- log.Errorw("decode-response-error", log.Fields{"error": err})
+ logger.Errorw("decode-response-error", log.Fields{"error": err})
}
return responseBody.Success, responseBody.Result
case <-ctx.Done():
- log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+ logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
// pack the error as proto any type
protoError := &ic.Error{Reason: ctx.Err().Error()}
var marshalledArg *any.Any
@@ -304,7 +299,7 @@
}
return false, marshalledArg
case <-childCtx.Done():
- log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
+ logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
// pack the error as proto any type
protoError := &ic.Error{Reason: childCtx.Err().Error()}
var marshalledArg *any.Any
@@ -313,7 +308,7 @@
}
return false, marshalledArg
case <-kp.doneCh:
- log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+ logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
return true, nil
}
}
@@ -329,7 +324,7 @@
var err error
if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
//if ch, err = kp.Subscribe(topic); err != nil {
- log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+ logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
return err
}
@@ -348,7 +343,7 @@
var ch <-chan *ic.InterContainerMessage
var err error
if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
- log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+ logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
return err
}
kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
@@ -387,7 +382,7 @@
// Unsubscribe to this topic first - this will close the subscribed channel
var err error
if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
- log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
+ logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
}
delete(kp.topicToResponseChannelMap, topic)
return err
@@ -403,7 +398,7 @@
for topic, _ := range kp.topicToResponseChannelMap {
// Unsubscribe to this topic first - this will close the subscribed channel
if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
- log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
}
delete(kp.topicToResponseChannelMap, topic)
}
@@ -438,7 +433,7 @@
for topic, _ := range kp.topicToRequestHandlerChannelMap {
// Close the kafka client client first by unsubscribing to this topic
if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
- log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
}
delete(kp.topicToRequestHandlerChannelMap, topic)
}
@@ -486,10 +481,10 @@
func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
// If we have any consumers on that topic we need to close them
if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
- log.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
+ logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
}
if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
- log.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
+ logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
}
kp.deleteTopicTransactionIdToChannelMap(topic.Name)
@@ -503,7 +498,7 @@
}
protoValue, ok := returnedVal.(proto.Message)
if !ok {
- log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
+ logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
err := errors.New("response-value-not-proto-message")
return nil, err
}
@@ -512,7 +507,7 @@
var marshalledReturnedVal *any.Any
var err error
if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
- log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
+ logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
return nil, err
}
return marshalledReturnedVal, nil
@@ -534,7 +529,7 @@
var err error
// Error should never happen here
if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
- log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
+ logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
}
return &ic.InterContainerMessage{
@@ -547,7 +542,7 @@
//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
//or an error on failure
func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
- //log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
+ //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
responseHeader := &ic.Header{
Id: request.Header.Id,
Type: ic.MessageType_RESPONSE,
@@ -562,7 +557,7 @@
var err error
for _, returnVal := range returnedValues {
if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
- log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
+ logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
}
break // for now we support only 1 returned value - (excluding the error)
}
@@ -575,7 +570,7 @@
// Marshal the response body
var marshalledResponseBody *any.Any
if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
- log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
+ logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
return nil, err
}
@@ -611,7 +606,7 @@
var marshalledArg *any.Any
var err error
if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
- log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+ logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
return currentArgs
}
protoArg := &ic.Argument{
@@ -625,7 +620,7 @@
var marshalledArg *any.Any
var err error
if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
- log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+ logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
return currentArgs
}
protoArg := &ic.Argument{
@@ -645,9 +640,9 @@
// Get the request body
requestBody := &ic.InterContainerRequestBody{}
if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
- log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
} else {
- log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
+ logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
// let the callee unpack the arguments as its the only one that knows the real proto type
// Augment the requestBody with the message Id as it will be used in scenarios where cores
// are set in pairs and competing
@@ -659,7 +654,7 @@
out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
if err != nil {
- log.Warn(err)
+ logger.Warn(err)
}
}
// Response required?
@@ -679,7 +674,7 @@
if out[lastIndex].Interface() != nil { // Error
if retError, ok := out[lastIndex].Interface().(error); ok {
if retError.Error() == ErrorTransactionNotAcquired.Error() {
- log.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
+ logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
return // Ignore - process is in competing mode and ignored transaction
}
returnError = &ic.Error{Reason: retError.Error()}
@@ -689,12 +684,12 @@
returnedValues = append(returnedValues, returnError)
}
} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
- log.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
+ logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
return // Ignore - should not happen
} else { // Non-error case
success = true
for idx, val := range out {
- //log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
+ //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
if idx != lastIndex {
returnedValues = append(returnedValues, val.Interface())
}
@@ -704,7 +699,7 @@
var icm *ic.InterContainerMessage
if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
- log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
+ logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
icm = encodeDefaultFailedResponse(msg)
}
// To preserve ordering of messages, all messages to a given topic are sent to the same partition
@@ -713,22 +708,22 @@
// partitions.
replyTopic := &Topic{Name: msg.Header.FromTopic}
key := msg.Header.KeyTopic
- log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
+ logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
// TODO: handle error response.
go kp.kafkaClient.Send(icm, replyTopic, key)
}
} else if msg.Header.Type == ic.MessageType_RESPONSE {
- log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
+ logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
go kp.dispatchResponse(msg)
} else {
- log.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
+ logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
}
}
func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
// Wait for messages
for msg := range ch {
- //log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
+ //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
go kp.handleMessage(msg, targetInterface)
}
}
@@ -737,7 +732,7 @@
kp.lockTransactionIdToChannelMap.RLock()
defer kp.lockTransactionIdToChannelMap.RUnlock()
if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
- log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
+ logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
return
}
kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
@@ -748,7 +743,7 @@
// API. There is one response channel waiting for kafka messages before dispatching the message to the
// corresponding waiting channel
func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
- log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
+ logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
// Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
// broadcast any message for this topic to all channels waiting on it.
@@ -759,7 +754,7 @@
}
func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
- log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
+ logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
kp.deleteFromTransactionIdToChannelMap(trnsId)
return nil
}
@@ -803,12 +798,12 @@
// ascertain the value interface type is a proto.Message
protoValue, ok := arg.Value.(proto.Message)
if !ok {
- log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
+ logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
err := errors.New("argument-value-not-proto-message")
return nil, err
}
if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
- log.Warnw("cannot-marshal-request", log.Fields{"error": err})
+ logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
return nil, err
}
protoArg := &ic.Argument{
@@ -821,7 +816,7 @@
var marshalledData *any.Any
var err error
if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
- log.Warnw("cannot-marshal-request", log.Fields{"error": err})
+ logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
return nil, err
}
request := &ic.InterContainerMessage{
@@ -835,10 +830,10 @@
// Extract the message body
responseBody := ic.InterContainerResponseBody{}
if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
return nil, err
}
- //log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
+ //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
return &responseBody, nil