blob: 9a627bed5aa5ec81f0167f79f9012cdc1f9a895e [file] [log] [blame]
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package jaeger
16
17import (
18 "fmt"
19 "io"
20 "math/rand"
21 "os"
22 "reflect"
23 "strconv"
24 "sync"
25 "time"
26
27 "github.com/opentracing/opentracing-go"
28 "github.com/opentracing/opentracing-go/ext"
29
30 "github.com/uber/jaeger-client-go/internal/baggage"
31 "github.com/uber/jaeger-client-go/internal/throttler"
32 "github.com/uber/jaeger-client-go/log"
33 "github.com/uber/jaeger-client-go/utils"
34)
35
// Tracer implements opentracing.Tracer.
//
// A Tracer is assembled by NewTracer, which first applies the caller's
// TracerOptions and then fills in a default for every collaborator that is
// still unset (random source, clock, logger, debug throttler, baggage setter).
type Tracer struct {
	serviceName string
	hostIPv4    uint32 // this is for zipkin endpoint conversion

	sampler  SamplerV2
	reporter Reporter
	metrics  Metrics
	logger   log.DebugLogger

	// Clock and random source. NewTracer installs time.Now and a pooled
	// math/rand based generator when these are left nil by the options.
	timeNow      func() time.Time
	randomNumber func() uint64

	options struct {
		gen128Bit                   bool // whether to generate 128bit trace IDs
		zipkinSharedRPCSpan         bool
		highTraceIDGenerator        func() uint64 // custom high trace ID generator
		maxTagValueLength           int
		noDebugFlagOnForcedSampling bool
		maxLogsPerSpan              int
		// more options to come
	}
	// allocator of Span objects
	spanAllocator SpanAllocator

	// Codecs keyed by propagation format; populated through addCodec and
	// looked up by Inject / Extract.
	injectors  map[interface{}]Injector
	extractors map[interface{}]Extractor

	observer compositeObserver

	// Tracer-level tags. NewTracer appends the client version, the hostname
	// and (when it can be determined) the host IP to whatever is already set.
	tags []Tag
	// Process descriptor built once in NewTracer from the service name,
	// a random UUID and the tracer-level tags.
	process Process

	baggageRestrictionManager baggage.RestrictionManager
	baggageSetter             *baggageSetter // used by setBaggage

	debugThrottler throttler.Throttler // consulted by isDebugAllowed
}
74
75// NewTracer creates Tracer implementation that reports tracing to Jaeger.
76// The returned io.Closer can be used in shutdown hooks to ensure that the internal
77// queue of the Reporter is drained and all buffered spans are submitted to collectors.
78// TODO (breaking change) return *Tracer only, without closer.
79func NewTracer(
80 serviceName string,
81 sampler Sampler,
82 reporter Reporter,
83 options ...TracerOption,
84) (opentracing.Tracer, io.Closer) {
85 t := &Tracer{
86 serviceName: serviceName,
87 sampler: samplerV1toV2(sampler),
88 reporter: reporter,
89 injectors: make(map[interface{}]Injector),
90 extractors: make(map[interface{}]Extractor),
91 metrics: *NewNullMetrics(),
92 spanAllocator: simpleSpanAllocator{},
93 }
94
95 for _, option := range options {
96 option(t)
97 }
98
99 // register default injectors/extractors unless they are already provided via options
100 textPropagator := NewTextMapPropagator(getDefaultHeadersConfig(), t.metrics)
101 t.addCodec(opentracing.TextMap, textPropagator, textPropagator)
102
103 httpHeaderPropagator := NewHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics)
104 t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator)
105
106 binaryPropagator := NewBinaryPropagator(t)
107 t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator)
108
109 // TODO remove after TChannel supports OpenTracing
110 interopPropagator := &jaegerTraceContextPropagator{tracer: t}
111 t.addCodec(SpanContextFormat, interopPropagator, interopPropagator)
112
113 zipkinPropagator := &zipkinPropagator{tracer: t}
114 t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator)
115
116 if t.baggageRestrictionManager != nil {
117 t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics)
118 } else {
119 t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics)
120 }
121 if t.debugThrottler == nil {
122 t.debugThrottler = throttler.DefaultThrottler{}
123 }
124
125 if t.randomNumber == nil {
126 seedGenerator := utils.NewRand(time.Now().UnixNano())
127 pool := sync.Pool{
128 New: func() interface{} {
129 return rand.NewSource(seedGenerator.Int63())
130 },
131 }
132
133 t.randomNumber = func() uint64 {
134 generator := pool.Get().(rand.Source)
135 number := uint64(generator.Int63())
136 pool.Put(generator)
137 return number
138 }
139 }
140 if t.timeNow == nil {
141 t.timeNow = time.Now
142 }
143 if t.logger == nil {
144 t.logger = log.NullLogger
145 }
146 // Set tracer-level tags
147 t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion})
148 if hostname, err := os.Hostname(); err == nil {
149 t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname})
150 }
151 if ipval, ok := t.getTag(TracerIPTagKey); ok {
152 ipv4, err := utils.ParseIPToUint32(ipval.(string))
153 if err != nil {
154 t.hostIPv4 = 0
155 t.logger.Error("Unable to convert the externally provided ip to uint32: " + err.Error())
156 } else {
157 t.hostIPv4 = ipv4
158 }
159 } else if ip, err := utils.HostIP(); err == nil {
160 t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()})
161 t.hostIPv4 = utils.PackIPAsUint32(ip)
162 } else {
163 t.logger.Error("Unable to determine this host's IP address: " + err.Error())
164 }
165
166 if t.options.gen128Bit {
167 if t.options.highTraceIDGenerator == nil {
168 t.options.highTraceIDGenerator = t.randomNumber
169 }
170 } else if t.options.highTraceIDGenerator != nil {
171 t.logger.Error("Overriding high trace ID generator but not generating " +
172 "128 bit trace IDs, consider enabling the \"Gen128Bit\" option")
173 }
174 if t.options.maxTagValueLength == 0 {
175 t.options.maxTagValueLength = DefaultMaxTagValueLength
176 }
177 t.process = Process{
178 Service: serviceName,
179 UUID: strconv.FormatUint(t.randomNumber(), 16),
180 Tags: t.tags,
181 }
182 if throttler, ok := t.debugThrottler.(ProcessSetter); ok {
183 throttler.SetProcess(t.process)
184 }
185
186 return t, t
187}
188
189// addCodec adds registers injector and extractor for given propagation format if not already defined.
190func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) {
191 if _, ok := t.injectors[format]; !ok {
192 t.injectors[format] = injector
193 }
194 if _, ok := t.extractors[format]; !ok {
195 t.extractors[format] = extractor
196 }
197}
198
199// StartSpan implements StartSpan() method of opentracing.Tracer.
200func (t *Tracer) StartSpan(
201 operationName string,
202 options ...opentracing.StartSpanOption,
203) opentracing.Span {
204 sso := opentracing.StartSpanOptions{}
205 for _, o := range options {
206 o.Apply(&sso)
207 }
208 return t.startSpanWithOptions(operationName, sso)
209}
210
// startSpanWithOptions builds a span named operationName from already resolved
// start options. It works in three phases:
//
//  1. scan options.References to pick a parent context, collect the
//     references to record on the span, and detect a self reference;
//  2. unless a self reference supplied the context, derive the new span
//     context (IDs, sampling state, baggage) either as a fresh trace or as a
//     child of the parent;
//  3. obtain a Span from the allocator, populate it, consult the sampler if
//     sampling is not finalized, notify the observer, apply tags and emit
//     metrics.
func (t *Tracer) startSpanWithOptions(
	operationName string,
	options opentracing.StartSpanOptions,
) opentracing.Span {
	// Default the start time to the tracer's clock when the caller gave none.
	if options.StartTime.IsZero() {
		options.StartTime = t.timeNow()
	}

	// Predicate whether the given span context is an empty reference
	// or may be used as parent / debug ID / baggage items source
	isEmptyReference := func(ctx SpanContext) bool {
		return !ctx.IsValid() && !ctx.isDebugIDContainerOnly() && len(ctx.baggage) == 0
	}

	var references []Reference
	var parent SpanContext
	var hasParent bool // need this because `parent` is a value, not reference
	var ctx SpanContext
	var isSelfRef bool
	for _, ref := range options.References {
		// Only this package's SpanContext is understood; anything else is
		// logged and skipped.
		ctxRef, ok := ref.ReferencedContext.(SpanContext)
		if !ok {
			t.logger.Error(fmt.Sprintf(
				"Reference contains invalid type of SpanReference: %s",
				reflect.ValueOf(ref.ReferencedContext)))
			continue
		}
		if isEmptyReference(ctxRef) {
			continue
		}

		// A self reference provides the span's own context verbatim; it is
		// neither recorded as a reference nor considered as a parent.
		if ref.Type == selfRefType {
			isSelfRef = true
			ctx = ctxRef
			continue
		}

		if ctxRef.IsValid() {
			// we don't want empty context that contains only debug-id or baggage
			references = append(references, Reference{Type: ref.Type, Context: ctxRef})
		}

		// Until a ChildOfRef is seen, each non-empty reference overwrites the
		// parent candidate; the first ChildOfRef locks the parent in.
		if !hasParent {
			parent = ctxRef
			hasParent = ref.Type == opentracing.ChildOfRef
		}
	}
	if !hasParent && !isEmptyReference(parent) {
		// If ChildOfRef wasn't found but a FollowFromRef exists, use the context from
		// the FollowFromRef as the parent
		hasParent = true
	}

	// The span kind tag may be given either as the typed enum or as its
	// plain string form; both are accepted.
	rpcServer := false
	if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok {
		rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum))
	}

	var internalTags []Tag
	newTrace := false
	if !isSelfRef {
		if !hasParent || !parent.IsValid() {
			// No usable parent: start a new trace. The root span ID is the
			// low half of the trace ID.
			newTrace = true
			ctx.traceID.Low = t.randomID()
			if t.options.gen128Bit {
				ctx.traceID.High = t.options.highTraceIDGenerator()
			}
			ctx.spanID = SpanID(ctx.traceID.Low)
			ctx.parentID = 0
			ctx.samplingState = &samplingState{
				localRootSpan: ctx.spanID,
			}
			// A parent that only carries a debug ID forces debug sampling,
			// subject to the debug throttler.
			if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
				ctx.samplingState.setDebugAndSampled()
				internalTags = append(internalTags, Tag{key: JaegerDebugHeader, value: parent.debugID})
			}
		} else {
			ctx.traceID = parent.traceID
			if rpcServer && t.options.zipkinSharedRPCSpan {
				// Support Zipkin's one-span-per-RPC model
				ctx.spanID = parent.spanID
				ctx.parentID = parent.parentID
			} else {
				ctx.spanID = SpanID(t.randomID())
				ctx.parentID = parent.spanID
			}
			// The sampling state pointer is taken from the parent, so the
			// updates below go through the same samplingState object.
			ctx.samplingState = parent.samplingState
			if parent.remote {
				ctx.samplingState.setFinal()
				ctx.samplingState.localRootSpan = ctx.spanID
			}
		}
		if hasParent {
			// copy baggage items
			if l := len(parent.baggage); l > 0 {
				ctx.baggage = make(map[string]string, len(parent.baggage))
				for k, v := range parent.baggage {
					ctx.baggage[k] = v
				}
			}
		}
	}

	sp := t.newSpan()
	sp.context = ctx
	sp.tracer = t
	sp.operationName = operationName
	sp.startTime = options.StartTime
	sp.duration = 0
	sp.references = references
	sp.firstInProcess = rpcServer || sp.context.parentID == 0

	// Ask the sampler only while the decision is still open.
	if !sp.context.isSamplingFinalized() {
		decision := t.sampler.OnCreateSpan(sp)
		sp.applySamplingDecision(decision, false)
	}
	sp.observer = t.observer.OnStartSpan(sp, operationName, options)

	if tagsTotalLength := len(options.Tags) + len(internalTags); tagsTotalLength > 0 {
		// Reuse the span's existing tag storage when it is large enough.
		if sp.tags == nil || cap(sp.tags) < tagsTotalLength {
			sp.tags = make([]Tag, 0, tagsTotalLength)
		}
		sp.tags = append(sp.tags, internalTags...)
		for k, v := range options.Tags {
			sp.setTagInternal(k, v, false)
		}
	}
	t.emitNewSpanMetrics(sp, newTrace)
	return sp
}
341
342// Inject implements Inject() method of opentracing.Tracer
343func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error {
344 c, ok := ctx.(SpanContext)
345 if !ok {
346 return opentracing.ErrInvalidSpanContext
347 }
348 if injector, ok := t.injectors[format]; ok {
349 return injector.Inject(c, carrier)
350 }
351 return opentracing.ErrUnsupportedFormat
352}
353
354// Extract implements Extract() method of opentracing.Tracer
355func (t *Tracer) Extract(
356 format interface{},
357 carrier interface{},
358) (opentracing.SpanContext, error) {
359 if extractor, ok := t.extractors[format]; ok {
360 spanCtx, err := extractor.Extract(carrier)
361 if err != nil {
362 return nil, err // ensure returned spanCtx is nil
363 }
364 spanCtx.remote = true
365 return spanCtx, nil
366 }
367 return nil, opentracing.ErrUnsupportedFormat
368}
369
370// Close releases all resources used by the Tracer and flushes any remaining buffered spans.
371func (t *Tracer) Close() error {
372 t.logger.Debugf("closing tracer")
373 t.reporter.Close()
374 t.sampler.Close()
375 if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
376 _ = mgr.Close()
377 }
378 if throttler, ok := t.debugThrottler.(io.Closer); ok {
379 _ = throttler.Close()
380 }
381 return nil
382}
383
384// Tags returns a slice of tracer-level tags.
385func (t *Tracer) Tags() []opentracing.Tag {
386 tags := make([]opentracing.Tag, len(t.tags))
387 for i, tag := range t.tags {
388 tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value}
389 }
390 return tags
391}
392
393// getTag returns the value of specific tag, if not exists, return nil.
394// TODO only used by tests, move there.
395func (t *Tracer) getTag(key string) (interface{}, bool) {
396 for _, tag := range t.tags {
397 if tag.key == key {
398 return tag.value, true
399 }
400 }
401 return nil, false
402}
403
404// newSpan returns an instance of a clean Span object.
405// If options.PoolSpans is true, the spans are retrieved from an object pool.
406func (t *Tracer) newSpan() *Span {
407 return t.spanAllocator.Get()
408}
409
410// emitNewSpanMetrics generates metrics on the number of started spans and traces.
411// newTrace param: we cannot simply check for parentID==0 because in Zipkin model the
412// server-side RPC span has the exact same trace/span/parent IDs as the
413// calling client-side span, but obviously the server side span is
414// no longer a root span of the trace.
415func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) {
416 if !sp.context.isSamplingFinalized() {
417 t.metrics.SpansStartedDelayedSampling.Inc(1)
418 if newTrace {
419 t.metrics.TracesStartedDelayedSampling.Inc(1)
420 }
421 // joining a trace is not possible, because sampling decision inherited from upstream is final
422 } else if sp.context.IsSampled() {
423 t.metrics.SpansStartedSampled.Inc(1)
424 if newTrace {
425 t.metrics.TracesStartedSampled.Inc(1)
426 } else if sp.firstInProcess {
427 t.metrics.TracesJoinedSampled.Inc(1)
428 }
429 } else {
430 t.metrics.SpansStartedNotSampled.Inc(1)
431 if newTrace {
432 t.metrics.TracesStartedNotSampled.Inc(1)
433 } else if sp.firstInProcess {
434 t.metrics.TracesJoinedNotSampled.Inc(1)
435 }
436 }
437}
438
439func (t *Tracer) reportSpan(sp *Span) {
440 ctx := sp.SpanContext()
441
442 if !ctx.isSamplingFinalized() {
443 t.metrics.SpansFinishedDelayedSampling.Inc(1)
444 } else if ctx.IsSampled() {
445 t.metrics.SpansFinishedSampled.Inc(1)
446 } else {
447 t.metrics.SpansFinishedNotSampled.Inc(1)
448 }
449
450 // Note: if the reporter is processing Span asynchronously then it needs to Retain() the span,
451 // and then Release() it when no longer needed.
452 // Otherwise, the span may be reused for another trace and its data may be overwritten.
453 if ctx.IsSampled() {
454 t.reporter.Report(sp)
455 }
456
457 sp.Release()
458}
459
460// randomID generates a random trace/span ID, using tracer.random() generator.
461// It never returns 0.
462func (t *Tracer) randomID() uint64 {
463 val := t.randomNumber()
464 for val == 0 {
465 val = t.randomNumber()
466 }
467 return val
468}
469
470// (NB) span must hold the lock before making this call
471func (t *Tracer) setBaggage(sp *Span, key, value string) {
472 t.baggageSetter.setBaggage(sp, key, value)
473}
474
475// (NB) span must hold the lock before making this call
476func (t *Tracer) isDebugAllowed(operation string) bool {
477 return t.debugThrottler.IsAllowed(operation)
478}
479
480// Sampler returns the sampler given to the tracer at creation.
481func (t *Tracer) Sampler() SamplerV2 {
482 return t.sampler
483}
484
485// SelfRef creates an opentracing compliant SpanReference from a jaeger
486// SpanContext. This is a factory function in order to encapsulate jaeger specific
487// types.
488func SelfRef(ctx SpanContext) opentracing.SpanReference {
489 return opentracing.SpanReference{
490 Type: selfRefType,
491 ReferencedContext: ctx,
492 }
493}