blob: 8a3fc97abbcc879084bf750a2364ed88ba41f2ff [file] [log] [blame]
Takahiro Suzuki241c10e2020-12-17 20:17:57 +09001// Copyright (c) 2017-2018 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18 "fmt"
19 "io"
20 "math/rand"
21 "os"
22 "reflect"
23 "strconv"
24 "sync"
25 "time"
26
27 "github.com/opentracing/opentracing-go"
28 "github.com/opentracing/opentracing-go/ext"
29
30 "github.com/uber/jaeger-client-go/internal/baggage"
31 "github.com/uber/jaeger-client-go/internal/throttler"
32 "github.com/uber/jaeger-client-go/log"
33 "github.com/uber/jaeger-client-go/utils"
34)
35
36// Tracer implements opentracing.Tracer.
37type Tracer struct {
38 serviceName string
39 hostIPv4 uint32 // this is for zipkin endpoint conversion
40
41 sampler SamplerV2
42 reporter Reporter
43 metrics Metrics
44 logger log.DebugLogger
45
46 timeNow func() time.Time
47 randomNumber func() uint64
48
49 options struct {
50 gen128Bit bool // whether to generate 128bit trace IDs
51 zipkinSharedRPCSpan bool
52 highTraceIDGenerator func() uint64 // custom high trace ID generator
53 maxTagValueLength int
54 noDebugFlagOnForcedSampling bool
55 maxLogsPerSpan int
56 // more options to come
57 }
58 // allocator of Span objects
59 spanAllocator SpanAllocator
60
61 injectors map[interface{}]Injector
62 extractors map[interface{}]Extractor
63
64 observer compositeObserver
65
66 tags []Tag
67 process Process
68
69 baggageRestrictionManager baggage.RestrictionManager
70 baggageSetter *baggageSetter
71
72 debugThrottler throttler.Throttler
73}
74
75// NewTracer creates Tracer implementation that reports tracing to Jaeger.
76// The returned io.Closer can be used in shutdown hooks to ensure that the internal
77// queue of the Reporter is drained and all buffered spans are submitted to collectors.
78// TODO (breaking change) return *Tracer only, without closer.
79func NewTracer(
80 serviceName string,
81 sampler Sampler,
82 reporter Reporter,
83 options ...TracerOption,
84) (opentracing.Tracer, io.Closer) {
85 t := &Tracer{
86 serviceName: serviceName,
87 sampler: samplerV1toV2(sampler),
88 reporter: reporter,
89 injectors: make(map[interface{}]Injector),
90 extractors: make(map[interface{}]Extractor),
91 metrics: *NewNullMetrics(),
92 spanAllocator: simpleSpanAllocator{},
93 }
94
95 for _, option := range options {
96 option(t)
97 }
98
99 // register default injectors/extractors unless they are already provided via options
100 textPropagator := NewTextMapPropagator(getDefaultHeadersConfig(), t.metrics)
101 t.addCodec(opentracing.TextMap, textPropagator, textPropagator)
102
103 httpHeaderPropagator := NewHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics)
104 t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator)
105
106 binaryPropagator := NewBinaryPropagator(t)
107 t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator)
108
109 // TODO remove after TChannel supports OpenTracing
110 interopPropagator := &jaegerTraceContextPropagator{tracer: t}
111 t.addCodec(SpanContextFormat, interopPropagator, interopPropagator)
112
113 zipkinPropagator := &zipkinPropagator{tracer: t}
114 t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator)
115
116 if t.baggageRestrictionManager != nil {
117 t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics)
118 } else {
119 t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics)
120 }
121 if t.debugThrottler == nil {
122 t.debugThrottler = throttler.DefaultThrottler{}
123 }
124
125 if t.randomNumber == nil {
126 seedGenerator := utils.NewRand(time.Now().UnixNano())
127 pool := sync.Pool{
128 New: func() interface{} {
129 return rand.NewSource(seedGenerator.Int63())
130 },
131 }
132
133 t.randomNumber = func() uint64 {
134 generator := pool.Get().(rand.Source)
135 number := uint64(generator.Int63())
136 pool.Put(generator)
137 return number
138 }
139 }
140 if t.timeNow == nil {
141 t.timeNow = time.Now
142 }
143 if t.logger == nil {
144 t.logger = log.NullLogger
145 }
146 // Set tracer-level tags
147 t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion})
148 if hostname, err := os.Hostname(); err == nil {
149 t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname})
150 }
151 if ipval, ok := t.getTag(TracerIPTagKey); ok {
152 ipv4, err := utils.ParseIPToUint32(ipval.(string))
153 if err != nil {
154 t.hostIPv4 = 0
155 t.logger.Error("Unable to convert the externally provided ip to uint32: " + err.Error())
156 } else {
157 t.hostIPv4 = ipv4
158 }
159 } else if ip, err := utils.HostIP(); err == nil {
160 t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()})
161 t.hostIPv4 = utils.PackIPAsUint32(ip)
162 } else {
163 t.logger.Error("Unable to determine this host's IP address: " + err.Error())
164 }
165
166 if t.options.gen128Bit {
167 if t.options.highTraceIDGenerator == nil {
168 t.options.highTraceIDGenerator = t.randomNumber
169 }
170 } else if t.options.highTraceIDGenerator != nil {
171 t.logger.Error("Overriding high trace ID generator but not generating " +
172 "128 bit trace IDs, consider enabling the \"Gen128Bit\" option")
173 }
174 if t.options.maxTagValueLength == 0 {
175 t.options.maxTagValueLength = DefaultMaxTagValueLength
176 }
177 t.process = Process{
178 Service: serviceName,
179 UUID: strconv.FormatUint(t.randomNumber(), 16),
180 Tags: t.tags,
181 }
182 if throttler, ok := t.debugThrottler.(ProcessSetter); ok {
183 throttler.SetProcess(t.process)
184 }
185
186 return t, t
187}
188
189// addCodec adds registers injector and extractor for given propagation format if not already defined.
190func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) {
191 if _, ok := t.injectors[format]; !ok {
192 t.injectors[format] = injector
193 }
194 if _, ok := t.extractors[format]; !ok {
195 t.extractors[format] = extractor
196 }
197}
198
199// StartSpan implements StartSpan() method of opentracing.Tracer.
200func (t *Tracer) StartSpan(
201 operationName string,
202 options ...opentracing.StartSpanOption,
203) opentracing.Span {
204 sso := opentracing.StartSpanOptions{}
205 for _, o := range options {
206 o.Apply(&sso)
207 }
208 return t.startSpanWithOptions(operationName, sso)
209}
210
211func (t *Tracer) startSpanWithOptions(
212 operationName string,
213 options opentracing.StartSpanOptions,
214) opentracing.Span {
215 if options.StartTime.IsZero() {
216 options.StartTime = t.timeNow()
217 }
218
219 // Predicate whether the given span context is a valid reference
220 // which may be used as parent / debug ID / baggage items source
221 isValidReference := func(ctx SpanContext) bool {
222 return ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0
223 }
224
225 var references []Reference
226 var parent SpanContext
227 var hasParent bool // need this because `parent` is a value, not reference
228 var ctx SpanContext
229 var isSelfRef bool
230 for _, ref := range options.References {
231 ctxRef, ok := ref.ReferencedContext.(SpanContext)
232 if !ok {
233 t.logger.Error(fmt.Sprintf(
234 "Reference contains invalid type of SpanReference: %s",
235 reflect.ValueOf(ref.ReferencedContext)))
236 continue
237 }
238 if !isValidReference(ctxRef) {
239 continue
240 }
241
242 if ref.Type == selfRefType {
243 isSelfRef = true
244 ctx = ctxRef
245 continue
246 }
247
248 references = append(references, Reference{Type: ref.Type, Context: ctxRef})
249
250 if !hasParent {
251 parent = ctxRef
252 hasParent = ref.Type == opentracing.ChildOfRef
253 }
254 }
255 if !hasParent && isValidReference(parent) {
256 // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from
257 // the FollowFromRef as the parent
258 hasParent = true
259 }
260
261 rpcServer := false
262 if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok {
263 rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum))
264 }
265
266 var internalTags []Tag
267 newTrace := false
268 if !isSelfRef {
269 if !hasParent || !parent.IsValid() {
270 newTrace = true
271 ctx.traceID.Low = t.randomID()
272 if t.options.gen128Bit {
273 ctx.traceID.High = t.options.highTraceIDGenerator()
274 }
275 ctx.spanID = SpanID(ctx.traceID.Low)
276 ctx.parentID = 0
277 ctx.samplingState = &samplingState{
278 localRootSpan: ctx.spanID,
279 }
280 if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
281 ctx.samplingState.setDebugAndSampled()
282 internalTags = append(internalTags, Tag{key: JaegerDebugHeader, value: parent.debugID})
283 }
284 } else {
285 ctx.traceID = parent.traceID
286 if rpcServer && t.options.zipkinSharedRPCSpan {
287 // Support Zipkin's one-span-per-RPC model
288 ctx.spanID = parent.spanID
289 ctx.parentID = parent.parentID
290 } else {
291 ctx.spanID = SpanID(t.randomID())
292 ctx.parentID = parent.spanID
293 }
294 ctx.samplingState = parent.samplingState
295 if parent.remote {
296 ctx.samplingState.setFinal()
297 ctx.samplingState.localRootSpan = ctx.spanID
298 }
299 }
300 if hasParent {
301 // copy baggage items
302 if l := len(parent.baggage); l > 0 {
303 ctx.baggage = make(map[string]string, len(parent.baggage))
304 for k, v := range parent.baggage {
305 ctx.baggage[k] = v
306 }
307 }
308 }
309 }
310
311 sp := t.newSpan()
312 sp.context = ctx
313 sp.tracer = t
314 sp.operationName = operationName
315 sp.startTime = options.StartTime
316 sp.duration = 0
317 sp.references = references
318 sp.firstInProcess = rpcServer || sp.context.parentID == 0
319
320 if !sp.isSamplingFinalized() {
321 decision := t.sampler.OnCreateSpan(sp)
322 sp.applySamplingDecision(decision, false)
323 }
324 sp.observer = t.observer.OnStartSpan(sp, operationName, options)
325
326 if tagsTotalLength := len(options.Tags) + len(internalTags); tagsTotalLength > 0 {
327 if sp.tags == nil || cap(sp.tags) < tagsTotalLength {
328 sp.tags = make([]Tag, 0, tagsTotalLength)
329 }
330 sp.tags = append(sp.tags, internalTags...)
331 for k, v := range options.Tags {
332 sp.setTagInternal(k, v, false)
333 }
334 }
335 t.emitNewSpanMetrics(sp, newTrace)
336 return sp
337}
338
339// Inject implements Inject() method of opentracing.Tracer
340func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error {
341 c, ok := ctx.(SpanContext)
342 if !ok {
343 return opentracing.ErrInvalidSpanContext
344 }
345 if injector, ok := t.injectors[format]; ok {
346 return injector.Inject(c, carrier)
347 }
348 return opentracing.ErrUnsupportedFormat
349}
350
351// Extract implements Extract() method of opentracing.Tracer
352func (t *Tracer) Extract(
353 format interface{},
354 carrier interface{},
355) (opentracing.SpanContext, error) {
356 if extractor, ok := t.extractors[format]; ok {
357 spanCtx, err := extractor.Extract(carrier)
358 if err != nil {
359 return nil, err // ensure returned spanCtx is nil
360 }
361 spanCtx.remote = true
362 return spanCtx, nil
363 }
364 return nil, opentracing.ErrUnsupportedFormat
365}
366
367// Close releases all resources used by the Tracer and flushes any remaining buffered spans.
368func (t *Tracer) Close() error {
369 t.logger.Debugf("closing tracer")
370 t.reporter.Close()
371 t.sampler.Close()
372 if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
373 _ = mgr.Close()
374 }
375 if throttler, ok := t.debugThrottler.(io.Closer); ok {
376 _ = throttler.Close()
377 }
378 return nil
379}
380
381// Tags returns a slice of tracer-level tags.
382func (t *Tracer) Tags() []opentracing.Tag {
383 tags := make([]opentracing.Tag, len(t.tags))
384 for i, tag := range t.tags {
385 tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value}
386 }
387 return tags
388}
389
390// getTag returns the value of specific tag, if not exists, return nil.
391// TODO only used by tests, move there.
392func (t *Tracer) getTag(key string) (interface{}, bool) {
393 for _, tag := range t.tags {
394 if tag.key == key {
395 return tag.value, true
396 }
397 }
398 return nil, false
399}
400
401// newSpan returns an instance of a clean Span object.
402// If options.PoolSpans is true, the spans are retrieved from an object pool.
403func (t *Tracer) newSpan() *Span {
404 return t.spanAllocator.Get()
405}
406
407// emitNewSpanMetrics generates metrics on the number of started spans and traces.
408// newTrace param: we cannot simply check for parentID==0 because in Zipkin model the
409// server-side RPC span has the exact same trace/span/parent IDs as the
410// calling client-side span, but obviously the server side span is
411// no longer a root span of the trace.
412func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) {
413 if !sp.isSamplingFinalized() {
414 t.metrics.SpansStartedDelayedSampling.Inc(1)
415 if newTrace {
416 t.metrics.TracesStartedDelayedSampling.Inc(1)
417 }
418 // joining a trace is not possible, because sampling decision inherited from upstream is final
419 } else if sp.context.IsSampled() {
420 t.metrics.SpansStartedSampled.Inc(1)
421 if newTrace {
422 t.metrics.TracesStartedSampled.Inc(1)
423 } else if sp.firstInProcess {
424 t.metrics.TracesJoinedSampled.Inc(1)
425 }
426 } else {
427 t.metrics.SpansStartedNotSampled.Inc(1)
428 if newTrace {
429 t.metrics.TracesStartedNotSampled.Inc(1)
430 } else if sp.firstInProcess {
431 t.metrics.TracesJoinedNotSampled.Inc(1)
432 }
433 }
434}
435
436func (t *Tracer) reportSpan(sp *Span) {
437 if !sp.isSamplingFinalized() {
438 t.metrics.SpansFinishedDelayedSampling.Inc(1)
439 } else if sp.context.IsSampled() {
440 t.metrics.SpansFinishedSampled.Inc(1)
441 } else {
442 t.metrics.SpansFinishedNotSampled.Inc(1)
443 }
444
445 // Note: if the reporter is processing Span asynchronously then it needs to Retain() the span,
446 // and then Release() it when no longer needed.
447 // Otherwise, the span may be reused for another trace and its data may be overwritten.
448 if sp.context.IsSampled() {
449 t.reporter.Report(sp)
450 }
451
452 sp.Release()
453}
454
455// randomID generates a random trace/span ID, using tracer.random() generator.
456// It never returns 0.
457func (t *Tracer) randomID() uint64 {
458 val := t.randomNumber()
459 for val == 0 {
460 val = t.randomNumber()
461 }
462 return val
463}
464
465// (NB) span must hold the lock before making this call
466func (t *Tracer) setBaggage(sp *Span, key, value string) {
467 t.baggageSetter.setBaggage(sp, key, value)
468}
469
470// (NB) span must hold the lock before making this call
471func (t *Tracer) isDebugAllowed(operation string) bool {
472 return t.debugThrottler.IsAllowed(operation)
473}
474
475// Sampler returns the sampler given to the tracer at creation.
476func (t *Tracer) Sampler() SamplerV2 {
477 return t.sampler
478}
479
480// SelfRef creates an opentracing compliant SpanReference from a jaeger
481// SpanContext. This is a factory function in order to encapsulate jaeger specific
482// types.
483func SelfRef(ctx SpanContext) opentracing.SpanReference {
484 return opentracing.SpanReference{
485 Type: selfRefType,
486 ReferencedContext: ctx,
487 }
488}