blob: 82c3d7d3c6d7c3a5c1cb81299fa118409af6145d [file] [log] [blame]
Rohan Agrawalc32d9932020-06-15 11:01:47 +00001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17// File contains utility functions to support Open Tracing in conjunction with
18// Enhanced Logging based on context propagation
19
20package log
21
22import (
23 "context"
24 "errors"
Girish Kumarcd402012020-08-18 12:17:38 +000025 "fmt"
Rohan Agrawalc32d9932020-06-15 11:01:47 +000026 "github.com/opentracing/opentracing-go"
27 jtracing "github.com/uber/jaeger-client-go"
28 jcfg "github.com/uber/jaeger-client-go/config"
Rohan Agrawalc32d9932020-06-15 11:01:47 +000029 "io"
30 "os"
Girish Kumaradc3ba12020-06-15 14:22:55 +000031 "strings"
Girish Kumarcd402012020-08-18 12:17:38 +000032 "sync"
Rohan Agrawalc32d9932020-06-15 11:01:47 +000033)
34
// RootSpanNameKey is the baggage item key (and log field key) under which the
// name of the root level operation is carried along with a span.
const RootSpanNameKey = "op-name"
38
// LogFeaturesManager holds the global settings governing the Log Correlation
// and Tracing features. The fields should only be updated through the exposed
// public methods.
type LogFeaturesManager struct {
	// Current status of Trace Publishing.
	isTracePublishingEnabled bool
	// Current status of Log Correlation.
	isLogCorrelationEnabled bool
	// Name of component extracted from ENV variable.
	componentName string
	// Trace Agent address the manager was last initialized with.
	activeTraceAgentAddress string
	// Taken by the public getters/setters before they touch the fields above.
	lock sync.Mutex
}
Girish Kumaradc3ba12020-06-15 14:22:55 +000048
Girish Kumarcd402012020-08-18 12:17:38 +000049var globalLFM *LogFeaturesManager = &LogFeaturesManager{}
50
51func GetGlobalLFM() *LogFeaturesManager {
52 return globalLFM
53}
54
// ActiveTracerProxy is a wrapper that always utilizes the currently active
// Tracer instance. It is needed because the middleware library used for
// generating Spans for GRPC API calls does not support dynamically setting the
// active Tracer in the way the SetGlobalTracer method of the OpenTracing API does.
type ActiveTracerProxy struct{}
60
61func (atw ActiveTracerProxy) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span {
62 return opentracing.GlobalTracer().StartSpan(operationName, opts...)
63}
64
65func (atw ActiveTracerProxy) Inject(sm opentracing.SpanContext, format interface{}, carrier interface{}) error {
66 return opentracing.GlobalTracer().Inject(sm, format, carrier)
67}
68
69func (atw ActiveTracerProxy) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {
70 return opentracing.GlobalTracer().Extract(format, carrier)
71}
Girish Kumaradc3ba12020-06-15 14:22:55 +000072
73// Jaeger complaint Logger instance to redirect logs to Default Logger
74type traceLogger struct {
75 logger *clogger
76}
77
78func (tl traceLogger) Error(msg string) {
79 tl.logger.Error(context.Background(), msg)
80}
81
82func (tl traceLogger) Infof(msg string, args ...interface{}) {
83 // Tracing logs should be performed only at Debug Verbosity
84 tl.logger.Debugf(context.Background(), msg, args...)
85}
86
// traceCloser is a wrapper to handle the correct Closer call at the time of
// Process Termination.
type traceCloser struct{}
90
91func (c traceCloser) Close() error {
92 currentActiveTracer := opentracing.GlobalTracer()
93 if currentActiveTracer != nil {
94 if jTracer, ok := currentActiveTracer.(*jtracing.Tracer); ok {
95 jTracer.Close()
96 }
Girish Kumaradc3ba12020-06-15 14:22:55 +000097 }
98
Girish Kumarcd402012020-08-18 12:17:38 +000099 return nil
100}
Girish Kumaradc3ba12020-06-15 14:22:55 +0000101
Girish Kumarcd402012020-08-18 12:17:38 +0000102// Method to Initialize Jaeger based Tracing client based on initial status of Tracing Publish and Log Correlation
103func (lfm *LogFeaturesManager) InitTracingAndLogCorrelation(tracePublishEnabled bool, traceAgentAddress string, logCorrelationEnabled bool) (io.Closer, error) {
104 lfm.componentName = os.Getenv("COMPONENT_NAME")
105 if lfm.componentName == "" {
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000106 return nil, errors.New("Unable to retrieve PoD Component Name from Runtime env")
107 }
108
Girish Kumarcd402012020-08-18 12:17:38 +0000109 lfm.lock.Lock()
110 defer lfm.lock.Unlock()
111
112 // Use NoopTracer when both Tracing Publishing and Log Correlation are disabled
113 if !tracePublishEnabled && !logCorrelationEnabled {
114 logger.Info(context.Background(), "Skipping Global Tracer initialization as both Trace publish and Log correlation are configured as disabled")
115 lfm.isTracePublishingEnabled = false
116 lfm.isLogCorrelationEnabled = false
117 opentracing.SetGlobalTracer(opentracing.NoopTracer{})
118 return traceCloser{}, nil
119 }
120
121 tracer, _, err := lfm.constructJaegerTracer(tracePublishEnabled, traceAgentAddress, true)
122 if err != nil {
123 return nil, err
124 }
125
126 // Initialize variables representing Active Status
127 opentracing.SetGlobalTracer(tracer)
128 lfm.isTracePublishingEnabled = tracePublishEnabled
129 lfm.activeTraceAgentAddress = traceAgentAddress
130 lfm.isLogCorrelationEnabled = logCorrelationEnabled
131 return traceCloser{}, nil
132}
133
134// Method to replace Active Tracer along with graceful closer of previous tracer
135func (lfm *LogFeaturesManager) replaceActiveTracer(tracer opentracing.Tracer) {
136 currentActiveTracer := opentracing.GlobalTracer()
137 opentracing.SetGlobalTracer(tracer)
138
139 if currentActiveTracer != nil {
140 if jTracer, ok := currentActiveTracer.(*jtracing.Tracer); ok {
141 jTracer.Close()
142 }
143 }
144}
145
146func (lfm *LogFeaturesManager) GetLogCorrelationStatus() bool {
147 lfm.lock.Lock()
148 defer lfm.lock.Unlock()
149
150 return lfm.isLogCorrelationEnabled
151}
152
153func (lfm *LogFeaturesManager) SetLogCorrelationStatus(isEnabled bool) {
154 lfm.lock.Lock()
155 defer lfm.lock.Unlock()
156
157 if isEnabled == lfm.isLogCorrelationEnabled {
158 logger.Debugf(context.Background(), "Ignoring Log Correlation Set operation with value %t; current Status same as desired", isEnabled)
159 return
160 }
161
162 if isEnabled {
163 // Construct new Tracer instance if Log Correlation has been enabled and current active tracer is a NoopTracer instance.
164 // Continue using the earlier tracer instance in case of any error
165 if _, ok := opentracing.GlobalTracer().(opentracing.NoopTracer); ok {
166 tracer, _, err := lfm.constructJaegerTracer(lfm.isTracePublishingEnabled, lfm.activeTraceAgentAddress, false)
167 if err != nil {
168 logger.Warnf(context.Background(), "Log Correlation Enable operation failed with error: %s", err.Error())
169 return
170 }
171
172 lfm.replaceActiveTracer(tracer)
173 }
174
175 lfm.isLogCorrelationEnabled = true
176 logger.Info(context.Background(), "Log Correlation has been enabled")
177
178 } else {
179 // Switch to NoopTracer when Log Correlation has been disabled and Tracing Publish is already disabled
180 if _, ok := opentracing.GlobalTracer().(opentracing.NoopTracer); !ok && !lfm.isTracePublishingEnabled {
181 lfm.replaceActiveTracer(opentracing.NoopTracer{})
182 }
183
184 lfm.isLogCorrelationEnabled = false
185 logger.Info(context.Background(), "Log Correlation has been disabled")
186 }
187}
188
189func (lfm *LogFeaturesManager) GetTracePublishingStatus() bool {
190 lfm.lock.Lock()
191 defer lfm.lock.Unlock()
192
193 return lfm.isTracePublishingEnabled
194}
195
196func (lfm *LogFeaturesManager) SetTracePublishingStatus(isEnabled bool) {
197 lfm.lock.Lock()
198 defer lfm.lock.Unlock()
199
200 if isEnabled == lfm.isTracePublishingEnabled {
201 logger.Debugf(context.Background(), "Ignoring Trace Publishing Set operation with value %t; current Status same as desired", isEnabled)
202 return
203 }
204
205 if isEnabled {
206 // Construct new Tracer instance if Tracing Publish has been enabled (even if a Jaeger instance is already active)
207 // This is needed to ensure that a fresh lookup of Jaeger Agent address is performed again while performing
208 // Disable-Enable of Tracing
209 tracer, _, err := lfm.constructJaegerTracer(isEnabled, lfm.activeTraceAgentAddress, false)
210 if err != nil {
211 logger.Warnf(context.Background(), "Trace Publishing Enable operation failed with error: %s", err.Error())
212 return
213 }
214 lfm.replaceActiveTracer(tracer)
215
216 lfm.isTracePublishingEnabled = true
217 logger.Info(context.Background(), "Tracing Publishing has been enabled")
218 } else {
219 // Switch to NoopTracer when Tracing Publish has been disabled and Log Correlation is already disabled
220 if !lfm.isLogCorrelationEnabled {
221 lfm.replaceActiveTracer(opentracing.NoopTracer{})
222 } else {
223 // Else construct a new Jaeger Instance with publishing disabled
224 tracer, _, err := lfm.constructJaegerTracer(isEnabled, lfm.activeTraceAgentAddress, false)
225 if err != nil {
226 logger.Warnf(context.Background(), "Trace Publishing Disable operation failed with error: %s", err.Error())
227 return
228 }
229 lfm.replaceActiveTracer(tracer)
230 }
231
232 lfm.isTracePublishingEnabled = false
233 logger.Info(context.Background(), "Tracing Publishing has been disabled")
234 }
235}
236
237// Method to contruct a new Jaeger Tracer instance based on given Trace Agent address and Publish status.
238// The last attribute indicates whether to use Loopback IP for creating Jaeger Client when the DNS lookup
239// of supplied Trace Agent address has failed. It is fine to fallback during the initialization step, but
240// not later (when enabling/disabling the status dynamically)
241func (lfm *LogFeaturesManager) constructJaegerTracer(tracePublishEnabled bool, traceAgentAddress string, fallbackToLoopbackAllowed bool) (opentracing.Tracer, io.Closer, error) {
242 cfg := jcfg.Configuration{ServiceName: lfm.componentName}
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000243
Girish Kumaradc3ba12020-06-15 14:22:55 +0000244 var err error
245 var jReporterConfig jcfg.ReporterConfig
246 var jReporterCfgOption jtracing.Reporter
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000247
Girish Kumarcd402012020-08-18 12:17:38 +0000248 logger.Info(context.Background(), "Constructing new Jaeger Tracer instance")
Girish Kumaradc3ba12020-06-15 14:22:55 +0000249
Girish Kumarcd402012020-08-18 12:17:38 +0000250 // Attempt Trace Agent Address first; will fallback to Loopback IP if it fails
251 jReporterConfig = jcfg.ReporterConfig{LocalAgentHostPort: traceAgentAddress, LogSpans: true}
252 jReporterCfgOption, err = jReporterConfig.NewReporter(lfm.componentName, jtracing.NewNullMetrics(), traceLogger{logger: logger.(*clogger)})
253
254 if err != nil {
255 if !fallbackToLoopbackAllowed {
256 return nil, nil, errors.New("Reporter Creation for given Trace Agent address " + traceAgentAddress + " failed with error : " + err.Error())
Girish Kumaradc3ba12020-06-15 14:22:55 +0000257 }
Girish Kumaradc3ba12020-06-15 14:22:55 +0000258
Girish Kumarcd402012020-08-18 12:17:38 +0000259 logger.Infow(context.Background(), "Unable to create Reporter with given Trace Agent address",
260 Fields{"error": err, "address": traceAgentAddress})
261 // The Reporter initialization may fail due to Invalid Agent address or non-existent Agent (DNS lookup failure).
262 // It is essential for Tracer Instance to still start for correct Span propagation needed for log correlation.
263 // Thus, falback to use loopback IP for Reporter initialization before throwing back any error
264 tracePublishEnabled = false
265
Girish Kumaradc3ba12020-06-15 14:22:55 +0000266 jReporterConfig.LocalAgentHostPort = "127.0.0.1:6831"
Girish Kumarcd402012020-08-18 12:17:38 +0000267 jReporterCfgOption, err = jReporterConfig.NewReporter(lfm.componentName, jtracing.NewNullMetrics(), traceLogger{logger: logger.(*clogger)})
Girish Kumaradc3ba12020-06-15 14:22:55 +0000268 if err != nil {
Girish Kumarcd402012020-08-18 12:17:38 +0000269 return nil, nil, errors.New("Failed to initialize Jaeger Tracing due to Reporter creation error : " + err.Error())
Girish Kumaradc3ba12020-06-15 14:22:55 +0000270 }
271 }
272
273 // To start with, we are using Constant Sampling type
274 samplerParam := 0 // 0: Do not publish span, 1: Publish
275 if tracePublishEnabled {
276 samplerParam = 1
277 }
278 jSamplerConfig := jcfg.SamplerConfig{Type: "const", Param: float64(samplerParam)}
Girish Kumarcd402012020-08-18 12:17:38 +0000279 jSamplerCfgOption, err := jSamplerConfig.NewSampler(lfm.componentName, jtracing.NewNullMetrics())
Girish Kumaradc3ba12020-06-15 14:22:55 +0000280 if err != nil {
Girish Kumarcd402012020-08-18 12:17:38 +0000281 return nil, nil, errors.New("Unable to create Sampler : " + err.Error())
Girish Kumaradc3ba12020-06-15 14:22:55 +0000282 }
283
Girish Kumarcd402012020-08-18 12:17:38 +0000284 return cfg.NewTracer(jcfg.Reporter(jReporterCfgOption), jcfg.Sampler(jSamplerCfgOption))
285}
286
287func TerminateTracing(c io.Closer) {
288 err := c.Close()
289 if err != nil {
290 logger.Error(context.Background(), "error-while-closing-jaeger-tracer", Fields{"err": err})
291 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000292}
293
294// Extracts details of Execution Context as log fields from the Tracing Span injected into the
295// context instance. Following log fields are extracted:
296// 1. Operation Name : key as 'op-name' and value as Span operation name
297// 2. Operation Id : key as 'op-id' and value as 64 bit Span Id in hex digits string
298//
299// Additionally, any tags present in Span are also extracted to use as log fields e.g. device-id.
300//
301// If no Span is found associated with context, blank slice is returned without any log fields
Girish Kumarcd402012020-08-18 12:17:38 +0000302func (lfm *LogFeaturesManager) ExtractContextAttributes(ctx context.Context) []interface{} {
303 if !lfm.isLogCorrelationEnabled {
Girish Kumaradc3ba12020-06-15 14:22:55 +0000304 return make([]interface{}, 0)
305 }
306
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000307 attrMap := make(map[string]interface{})
308
309 if ctx != nil {
310 if span := opentracing.SpanFromContext(ctx); span != nil {
311 if jspan, ok := span.(*jtracing.Span); ok {
Girish Kumaradc3ba12020-06-15 14:22:55 +0000312 // Add Log fields for operation identified by Root Level Span (Trace)
Girish Kumarcd402012020-08-18 12:17:38 +0000313 opId := fmt.Sprintf("%016x", jspan.SpanContext().TraceID().Low) // Using Sprintf to avoid removal of leading 0s
Girish Kumaradc3ba12020-06-15 14:22:55 +0000314 opName := jspan.BaggageItem(RootSpanNameKey)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000315
Girish Kumarcd402012020-08-18 12:17:38 +0000316 taskId := fmt.Sprintf("%016x", uint64(jspan.SpanContext().SpanID())) // Using Sprintf to avoid removal of leading 0s
Girish Kumaradc3ba12020-06-15 14:22:55 +0000317 taskName := jspan.OperationName()
318
319 if opName == "" {
320 span.SetBaggageItem(RootSpanNameKey, taskName)
321 opName = taskName
322 }
323
324 attrMap["op-id"] = opId
325 attrMap["op-name"] = opName
326
327 // Add Log fields for task identified by Current Span, if it is different
328 // than operation
329 if taskId != opId {
330 attrMap["task-id"] = taskId
331 attrMap["task-name"] = taskName
332 }
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000333
334 for k, v := range jspan.Tags() {
Girish Kumaradc3ba12020-06-15 14:22:55 +0000335 // Ignore the special tags added by Jaeger, middleware (sampler.type, span.*) present in the span
336 if strings.HasPrefix(k, "sampler.") || strings.HasPrefix(k, "span.") || k == "component" {
337 continue
338 }
339
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000340 attrMap[k] = v
341 }
Girish Kumarcd402012020-08-18 12:17:38 +0000342
343 processBaggageItems := func(k, v string) bool {
344 if k != "rpc-span-name" {
345 attrMap[k] = v
346 }
347 return true
348 }
349
350 jspan.SpanContext().ForeachBaggageItem(processBaggageItems)
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000351 }
352 }
353 }
354
355 return serializeMap(attrMap)
356}
357
Girish Kumaradc3ba12020-06-15 14:22:55 +0000358// Method to inject additional log fields into Span e.g. device-id
359func EnrichSpan(ctx context.Context, keyAndValues ...Fields) {
360 span := opentracing.SpanFromContext(ctx)
361 if span != nil {
Girish Kumarcd402012020-08-18 12:17:38 +0000362 if jspan, ok := span.(*jtracing.Span); ok {
363 // Inject as a BaggageItem when the Span is the Root Span so that it propagates
364 // across the components along with Root Span (called as Trace)
365 // Else, inject as a Tag so that it is attached to the Child Task
366 isRootSpan := false
367 if jspan.SpanContext().TraceID().String() == jspan.SpanContext().SpanID().String() {
368 isRootSpan = true
369 }
370
371 for _, field := range keyAndValues {
372 for k, v := range field {
373 if isRootSpan {
374 span.SetBaggageItem(k, v.(string))
375 } else {
376 span.SetTag(k, v)
377 }
378 }
Girish Kumaradc3ba12020-06-15 14:22:55 +0000379 }
380 }
381 }
382}
383
384// Method to inject Error into the Span in event of any operation failure
385func MarkSpanError(ctx context.Context, err error) {
386 span := opentracing.SpanFromContext(ctx)
Girish Kumarcd402012020-08-18 12:17:38 +0000387 if span != nil {
388 span.SetTag("error", true)
389 span.SetTag("err", err)
390 }
Girish Kumaradc3ba12020-06-15 14:22:55 +0000391}
392
393// Creates a Child Span from Parent Span embedded in passed context. Should be used before starting a new major
394// operation in Synchronous or Asynchronous mode (go routine), such as following:
395// 1. Start of all implemented External API methods unless using a interceptor for auto-injection of Span (Server side impl)
396// 2. Just before calling an Third-Party lib which is invoking a External API (etcd, kafka)
397// 3. In start of a Go Routine responsible for performing a major task involving significant duration
398// 4. Any method which is suspected to be time consuming...
399func CreateChildSpan(ctx context.Context, taskName string, keyAndValues ...Fields) (opentracing.Span, context.Context) {
Girish Kumarcd402012020-08-18 12:17:38 +0000400 if !GetGlobalLFM().GetLogCorrelationStatus() && !GetGlobalLFM().GetTracePublishingStatus() {
Girish Kumaradc3ba12020-06-15 14:22:55 +0000401 return opentracing.NoopTracer{}.StartSpan(taskName), ctx
402 }
403
404 parentSpan := opentracing.SpanFromContext(ctx)
405 childSpan, newCtx := opentracing.StartSpanFromContext(ctx, taskName)
406
407 if parentSpan == nil || parentSpan.BaggageItem(RootSpanNameKey) == "" {
408 childSpan.SetBaggageItem(RootSpanNameKey, taskName)
409 }
410
411 EnrichSpan(newCtx, keyAndValues...)
412 return childSpan, newCtx
413}
414
415// Creates a Async Child Span with Follows-From relationship from Parent Span embedded in passed context.
416// Should be used only in scenarios when
417// a) There is dis-continuation in execution and thus result of Child span does not affect the Parent flow at all
418// b) The execution of Child Span is guaranteed to start after the completion of Parent Span
419// In case of any confusion, use CreateChildSpan method
420// Some situations where this method would be suitable includes Kafka Async RPC call, Propagation of Event across
421// a channel etc.
422func CreateAsyncSpan(ctx context.Context, taskName string, keyAndValues ...Fields) (opentracing.Span, context.Context) {
Girish Kumarcd402012020-08-18 12:17:38 +0000423 if !GetGlobalLFM().GetLogCorrelationStatus() && !GetGlobalLFM().GetTracePublishingStatus() {
Girish Kumaradc3ba12020-06-15 14:22:55 +0000424 return opentracing.NoopTracer{}.StartSpan(taskName), ctx
425 }
426
427 var asyncSpan opentracing.Span
428 var newCtx context.Context
429
430 parentSpan := opentracing.SpanFromContext(ctx)
431
432 // We should always be creating Aysnc span from a Valid parent span. If not, create a Child span instead
433 if parentSpan == nil {
Girish Kumarcd402012020-08-18 12:17:38 +0000434 logger.Warn(context.Background(), "Async span must be created with a Valid parent span only")
Girish Kumaradc3ba12020-06-15 14:22:55 +0000435 asyncSpan, newCtx = opentracing.StartSpanFromContext(ctx, taskName)
436 } else {
437 // Use Background context as the base for Follows-from case; else new span is getting both Child and FollowsFrom relationship
438 asyncSpan, newCtx = opentracing.StartSpanFromContext(context.Background(), taskName, opentracing.FollowsFrom(parentSpan.Context()))
439 }
440
441 if parentSpan == nil || parentSpan.BaggageItem(RootSpanNameKey) == "" {
442 asyncSpan.SetBaggageItem(RootSpanNameKey, taskName)
443 }
444
445 EnrichSpan(newCtx, keyAndValues...)
446 return asyncSpan, newCtx
447}
448
449// Extracts the span from Source context and injects into the supplied Target context.
450// This should be used in situations wherein we are calling a time-sensitive operation (etcd update) and hence
451// had a context.Background() used earlier to avoid any cancellation/timeout of operation by passed context.
452// This will allow propagation of span with a different base context (and not the original context)
453func WithSpanFromContext(targetCtx, sourceCtx context.Context) context.Context {
454 span := opentracing.SpanFromContext(sourceCtx)
455 return opentracing.ContextWithSpan(targetCtx, span)
456}
457
Rohan Agrawalc32d9932020-06-15 11:01:47 +0000458// Utility method to convert log Fields into array of interfaces expected by zap logger methods
459func serializeMap(fields Fields) []interface{} {
460 data := make([]interface{}, len(fields)*2)
461 i := 0
462 for k, v := range fields {
463 data[i] = k
464 data[i+1] = v
465 i = i + 2
466 }
467 return data
468}