Girish Gowdra | 5d7d644 | 2020-09-08 17:03:11 -0700 | [diff] [blame^] | 1 | // Copyright (c) 2017-2018 Uber Technologies, Inc. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package jaeger |
| 16 | |
| 17 | import ( |
| 18 | "fmt" |
| 19 | "io" |
| 20 | "math/rand" |
| 21 | "os" |
| 22 | "reflect" |
| 23 | "strconv" |
| 24 | "sync" |
| 25 | "time" |
| 26 | |
| 27 | "github.com/opentracing/opentracing-go" |
| 28 | "github.com/opentracing/opentracing-go/ext" |
| 29 | |
| 30 | "github.com/uber/jaeger-client-go/internal/baggage" |
| 31 | "github.com/uber/jaeger-client-go/internal/throttler" |
| 32 | "github.com/uber/jaeger-client-go/log" |
| 33 | "github.com/uber/jaeger-client-go/utils" |
| 34 | ) |
| 35 | |
| 36 | // Tracer implements opentracing.Tracer. |
| 37 | type Tracer struct { |
| 38 | serviceName string |
| 39 | hostIPv4 uint32 // this is for zipkin endpoint conversion |
| 40 | |
| 41 | sampler SamplerV2 |
| 42 | reporter Reporter |
| 43 | metrics Metrics |
| 44 | logger log.DebugLogger |
| 45 | |
| 46 | timeNow func() time.Time |
| 47 | randomNumber func() uint64 |
| 48 | |
| 49 | options struct { |
| 50 | gen128Bit bool // whether to generate 128bit trace IDs |
| 51 | zipkinSharedRPCSpan bool |
| 52 | highTraceIDGenerator func() uint64 // custom high trace ID generator |
| 53 | maxTagValueLength int |
| 54 | noDebugFlagOnForcedSampling bool |
| 55 | maxLogsPerSpan int |
| 56 | // more options to come |
| 57 | } |
| 58 | // allocator of Span objects |
| 59 | spanAllocator SpanAllocator |
| 60 | |
| 61 | injectors map[interface{}]Injector |
| 62 | extractors map[interface{}]Extractor |
| 63 | |
| 64 | observer compositeObserver |
| 65 | |
| 66 | tags []Tag |
| 67 | process Process |
| 68 | |
| 69 | baggageRestrictionManager baggage.RestrictionManager |
| 70 | baggageSetter *baggageSetter |
| 71 | |
| 72 | debugThrottler throttler.Throttler |
| 73 | } |
| 74 | |
| 75 | // NewTracer creates Tracer implementation that reports tracing to Jaeger. |
| 76 | // The returned io.Closer can be used in shutdown hooks to ensure that the internal |
| 77 | // queue of the Reporter is drained and all buffered spans are submitted to collectors. |
| 78 | // TODO (breaking change) return *Tracer only, without closer. |
| 79 | func NewTracer( |
| 80 | serviceName string, |
| 81 | sampler Sampler, |
| 82 | reporter Reporter, |
| 83 | options ...TracerOption, |
| 84 | ) (opentracing.Tracer, io.Closer) { |
| 85 | t := &Tracer{ |
| 86 | serviceName: serviceName, |
| 87 | sampler: samplerV1toV2(sampler), |
| 88 | reporter: reporter, |
| 89 | injectors: make(map[interface{}]Injector), |
| 90 | extractors: make(map[interface{}]Extractor), |
| 91 | metrics: *NewNullMetrics(), |
| 92 | spanAllocator: simpleSpanAllocator{}, |
| 93 | } |
| 94 | |
| 95 | for _, option := range options { |
| 96 | option(t) |
| 97 | } |
| 98 | |
| 99 | // register default injectors/extractors unless they are already provided via options |
| 100 | textPropagator := NewTextMapPropagator(getDefaultHeadersConfig(), t.metrics) |
| 101 | t.addCodec(opentracing.TextMap, textPropagator, textPropagator) |
| 102 | |
| 103 | httpHeaderPropagator := NewHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics) |
| 104 | t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator) |
| 105 | |
| 106 | binaryPropagator := NewBinaryPropagator(t) |
| 107 | t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator) |
| 108 | |
| 109 | // TODO remove after TChannel supports OpenTracing |
| 110 | interopPropagator := &jaegerTraceContextPropagator{tracer: t} |
| 111 | t.addCodec(SpanContextFormat, interopPropagator, interopPropagator) |
| 112 | |
| 113 | zipkinPropagator := &zipkinPropagator{tracer: t} |
| 114 | t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator) |
| 115 | |
| 116 | if t.baggageRestrictionManager != nil { |
| 117 | t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics) |
| 118 | } else { |
| 119 | t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics) |
| 120 | } |
| 121 | if t.debugThrottler == nil { |
| 122 | t.debugThrottler = throttler.DefaultThrottler{} |
| 123 | } |
| 124 | |
| 125 | if t.randomNumber == nil { |
| 126 | seedGenerator := utils.NewRand(time.Now().UnixNano()) |
| 127 | pool := sync.Pool{ |
| 128 | New: func() interface{} { |
| 129 | return rand.NewSource(seedGenerator.Int63()) |
| 130 | }, |
| 131 | } |
| 132 | |
| 133 | t.randomNumber = func() uint64 { |
| 134 | generator := pool.Get().(rand.Source) |
| 135 | number := uint64(generator.Int63()) |
| 136 | pool.Put(generator) |
| 137 | return number |
| 138 | } |
| 139 | } |
| 140 | if t.timeNow == nil { |
| 141 | t.timeNow = time.Now |
| 142 | } |
| 143 | if t.logger == nil { |
| 144 | t.logger = log.NullLogger |
| 145 | } |
| 146 | // Set tracer-level tags |
| 147 | t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion}) |
| 148 | if hostname, err := os.Hostname(); err == nil { |
| 149 | t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname}) |
| 150 | } |
| 151 | if ipval, ok := t.getTag(TracerIPTagKey); ok { |
| 152 | ipv4, err := utils.ParseIPToUint32(ipval.(string)) |
| 153 | if err != nil { |
| 154 | t.hostIPv4 = 0 |
| 155 | t.logger.Error("Unable to convert the externally provided ip to uint32: " + err.Error()) |
| 156 | } else { |
| 157 | t.hostIPv4 = ipv4 |
| 158 | } |
| 159 | } else if ip, err := utils.HostIP(); err == nil { |
| 160 | t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()}) |
| 161 | t.hostIPv4 = utils.PackIPAsUint32(ip) |
| 162 | } else { |
| 163 | t.logger.Error("Unable to determine this host's IP address: " + err.Error()) |
| 164 | } |
| 165 | |
| 166 | if t.options.gen128Bit { |
| 167 | if t.options.highTraceIDGenerator == nil { |
| 168 | t.options.highTraceIDGenerator = t.randomNumber |
| 169 | } |
| 170 | } else if t.options.highTraceIDGenerator != nil { |
| 171 | t.logger.Error("Overriding high trace ID generator but not generating " + |
| 172 | "128 bit trace IDs, consider enabling the \"Gen128Bit\" option") |
| 173 | } |
| 174 | if t.options.maxTagValueLength == 0 { |
| 175 | t.options.maxTagValueLength = DefaultMaxTagValueLength |
| 176 | } |
| 177 | t.process = Process{ |
| 178 | Service: serviceName, |
| 179 | UUID: strconv.FormatUint(t.randomNumber(), 16), |
| 180 | Tags: t.tags, |
| 181 | } |
| 182 | if throttler, ok := t.debugThrottler.(ProcessSetter); ok { |
| 183 | throttler.SetProcess(t.process) |
| 184 | } |
| 185 | |
| 186 | return t, t |
| 187 | } |
| 188 | |
| 189 | // addCodec adds registers injector and extractor for given propagation format if not already defined. |
| 190 | func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) { |
| 191 | if _, ok := t.injectors[format]; !ok { |
| 192 | t.injectors[format] = injector |
| 193 | } |
| 194 | if _, ok := t.extractors[format]; !ok { |
| 195 | t.extractors[format] = extractor |
| 196 | } |
| 197 | } |
| 198 | |
| 199 | // StartSpan implements StartSpan() method of opentracing.Tracer. |
| 200 | func (t *Tracer) StartSpan( |
| 201 | operationName string, |
| 202 | options ...opentracing.StartSpanOption, |
| 203 | ) opentracing.Span { |
| 204 | sso := opentracing.StartSpanOptions{} |
| 205 | for _, o := range options { |
| 206 | o.Apply(&sso) |
| 207 | } |
| 208 | return t.startSpanWithOptions(operationName, sso) |
| 209 | } |
| 210 | |
| 211 | func (t *Tracer) startSpanWithOptions( |
| 212 | operationName string, |
| 213 | options opentracing.StartSpanOptions, |
| 214 | ) opentracing.Span { |
| 215 | if options.StartTime.IsZero() { |
| 216 | options.StartTime = t.timeNow() |
| 217 | } |
| 218 | |
| 219 | // Predicate whether the given span context is a valid reference |
| 220 | // which may be used as parent / debug ID / baggage items source |
| 221 | isValidReference := func(ctx SpanContext) bool { |
| 222 | return ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0 |
| 223 | } |
| 224 | |
| 225 | var references []Reference |
| 226 | var parent SpanContext |
| 227 | var hasParent bool // need this because `parent` is a value, not reference |
| 228 | var ctx SpanContext |
| 229 | var isSelfRef bool |
| 230 | for _, ref := range options.References { |
| 231 | ctxRef, ok := ref.ReferencedContext.(SpanContext) |
| 232 | if !ok { |
| 233 | t.logger.Error(fmt.Sprintf( |
| 234 | "Reference contains invalid type of SpanReference: %s", |
| 235 | reflect.ValueOf(ref.ReferencedContext))) |
| 236 | continue |
| 237 | } |
| 238 | if !isValidReference(ctxRef) { |
| 239 | continue |
| 240 | } |
| 241 | |
| 242 | if ref.Type == selfRefType { |
| 243 | isSelfRef = true |
| 244 | ctx = ctxRef |
| 245 | continue |
| 246 | } |
| 247 | |
| 248 | references = append(references, Reference{Type: ref.Type, Context: ctxRef}) |
| 249 | |
| 250 | if !hasParent { |
| 251 | parent = ctxRef |
| 252 | hasParent = ref.Type == opentracing.ChildOfRef |
| 253 | } |
| 254 | } |
| 255 | if !hasParent && isValidReference(parent) { |
| 256 | // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from |
| 257 | // the FollowFromRef as the parent |
| 258 | hasParent = true |
| 259 | } |
| 260 | |
| 261 | rpcServer := false |
| 262 | if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok { |
| 263 | rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum)) |
| 264 | } |
| 265 | |
| 266 | var internalTags []Tag |
| 267 | newTrace := false |
| 268 | if !isSelfRef { |
| 269 | if !hasParent || !parent.IsValid() { |
| 270 | newTrace = true |
| 271 | ctx.traceID.Low = t.randomID() |
| 272 | if t.options.gen128Bit { |
| 273 | ctx.traceID.High = t.options.highTraceIDGenerator() |
| 274 | } |
| 275 | ctx.spanID = SpanID(ctx.traceID.Low) |
| 276 | ctx.parentID = 0 |
| 277 | ctx.samplingState = &samplingState{ |
| 278 | localRootSpan: ctx.spanID, |
| 279 | } |
| 280 | if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) { |
| 281 | ctx.samplingState.setDebugAndSampled() |
| 282 | internalTags = append(internalTags, Tag{key: JaegerDebugHeader, value: parent.debugID}) |
| 283 | } |
| 284 | } else { |
| 285 | ctx.traceID = parent.traceID |
| 286 | if rpcServer && t.options.zipkinSharedRPCSpan { |
| 287 | // Support Zipkin's one-span-per-RPC model |
| 288 | ctx.spanID = parent.spanID |
| 289 | ctx.parentID = parent.parentID |
| 290 | } else { |
| 291 | ctx.spanID = SpanID(t.randomID()) |
| 292 | ctx.parentID = parent.spanID |
| 293 | } |
| 294 | ctx.samplingState = parent.samplingState |
| 295 | if parent.remote { |
| 296 | ctx.samplingState.setFinal() |
| 297 | ctx.samplingState.localRootSpan = ctx.spanID |
| 298 | } |
| 299 | } |
| 300 | if hasParent { |
| 301 | // copy baggage items |
| 302 | if l := len(parent.baggage); l > 0 { |
| 303 | ctx.baggage = make(map[string]string, len(parent.baggage)) |
| 304 | for k, v := range parent.baggage { |
| 305 | ctx.baggage[k] = v |
| 306 | } |
| 307 | } |
| 308 | } |
| 309 | } |
| 310 | |
| 311 | sp := t.newSpan() |
| 312 | sp.context = ctx |
| 313 | sp.tracer = t |
| 314 | sp.operationName = operationName |
| 315 | sp.startTime = options.StartTime |
| 316 | sp.duration = 0 |
| 317 | sp.references = references |
| 318 | sp.firstInProcess = rpcServer || sp.context.parentID == 0 |
| 319 | |
| 320 | if !sp.isSamplingFinalized() { |
| 321 | decision := t.sampler.OnCreateSpan(sp) |
| 322 | sp.applySamplingDecision(decision, false) |
| 323 | } |
| 324 | sp.observer = t.observer.OnStartSpan(sp, operationName, options) |
| 325 | |
| 326 | if tagsTotalLength := len(options.Tags) + len(internalTags); tagsTotalLength > 0 { |
| 327 | if sp.tags == nil || cap(sp.tags) < tagsTotalLength { |
| 328 | sp.tags = make([]Tag, 0, tagsTotalLength) |
| 329 | } |
| 330 | sp.tags = append(sp.tags, internalTags...) |
| 331 | for k, v := range options.Tags { |
| 332 | sp.setTagInternal(k, v, false) |
| 333 | } |
| 334 | } |
| 335 | t.emitNewSpanMetrics(sp, newTrace) |
| 336 | return sp |
| 337 | } |
| 338 | |
| 339 | // Inject implements Inject() method of opentracing.Tracer |
| 340 | func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error { |
| 341 | c, ok := ctx.(SpanContext) |
| 342 | if !ok { |
| 343 | return opentracing.ErrInvalidSpanContext |
| 344 | } |
| 345 | if injector, ok := t.injectors[format]; ok { |
| 346 | return injector.Inject(c, carrier) |
| 347 | } |
| 348 | return opentracing.ErrUnsupportedFormat |
| 349 | } |
| 350 | |
| 351 | // Extract implements Extract() method of opentracing.Tracer |
| 352 | func (t *Tracer) Extract( |
| 353 | format interface{}, |
| 354 | carrier interface{}, |
| 355 | ) (opentracing.SpanContext, error) { |
| 356 | if extractor, ok := t.extractors[format]; ok { |
| 357 | spanCtx, err := extractor.Extract(carrier) |
| 358 | if err != nil { |
| 359 | return nil, err // ensure returned spanCtx is nil |
| 360 | } |
| 361 | spanCtx.remote = true |
| 362 | return spanCtx, nil |
| 363 | } |
| 364 | return nil, opentracing.ErrUnsupportedFormat |
| 365 | } |
| 366 | |
| 367 | // Close releases all resources used by the Tracer and flushes any remaining buffered spans. |
| 368 | func (t *Tracer) Close() error { |
| 369 | t.logger.Debugf("closing tracer") |
| 370 | t.reporter.Close() |
| 371 | t.sampler.Close() |
| 372 | if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok { |
| 373 | _ = mgr.Close() |
| 374 | } |
| 375 | if throttler, ok := t.debugThrottler.(io.Closer); ok { |
| 376 | _ = throttler.Close() |
| 377 | } |
| 378 | return nil |
| 379 | } |
| 380 | |
| 381 | // Tags returns a slice of tracer-level tags. |
| 382 | func (t *Tracer) Tags() []opentracing.Tag { |
| 383 | tags := make([]opentracing.Tag, len(t.tags)) |
| 384 | for i, tag := range t.tags { |
| 385 | tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value} |
| 386 | } |
| 387 | return tags |
| 388 | } |
| 389 | |
| 390 | // getTag returns the value of specific tag, if not exists, return nil. |
| 391 | // TODO only used by tests, move there. |
| 392 | func (t *Tracer) getTag(key string) (interface{}, bool) { |
| 393 | for _, tag := range t.tags { |
| 394 | if tag.key == key { |
| 395 | return tag.value, true |
| 396 | } |
| 397 | } |
| 398 | return nil, false |
| 399 | } |
| 400 | |
| 401 | // newSpan returns an instance of a clean Span object. |
| 402 | // If options.PoolSpans is true, the spans are retrieved from an object pool. |
| 403 | func (t *Tracer) newSpan() *Span { |
| 404 | return t.spanAllocator.Get() |
| 405 | } |
| 406 | |
| 407 | // emitNewSpanMetrics generates metrics on the number of started spans and traces. |
| 408 | // newTrace param: we cannot simply check for parentID==0 because in Zipkin model the |
| 409 | // server-side RPC span has the exact same trace/span/parent IDs as the |
| 410 | // calling client-side span, but obviously the server side span is |
| 411 | // no longer a root span of the trace. |
| 412 | func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) { |
| 413 | if !sp.isSamplingFinalized() { |
| 414 | t.metrics.SpansStartedDelayedSampling.Inc(1) |
| 415 | if newTrace { |
| 416 | t.metrics.TracesStartedDelayedSampling.Inc(1) |
| 417 | } |
| 418 | // joining a trace is not possible, because sampling decision inherited from upstream is final |
| 419 | } else if sp.context.IsSampled() { |
| 420 | t.metrics.SpansStartedSampled.Inc(1) |
| 421 | if newTrace { |
| 422 | t.metrics.TracesStartedSampled.Inc(1) |
| 423 | } else if sp.firstInProcess { |
| 424 | t.metrics.TracesJoinedSampled.Inc(1) |
| 425 | } |
| 426 | } else { |
| 427 | t.metrics.SpansStartedNotSampled.Inc(1) |
| 428 | if newTrace { |
| 429 | t.metrics.TracesStartedNotSampled.Inc(1) |
| 430 | } else if sp.firstInProcess { |
| 431 | t.metrics.TracesJoinedNotSampled.Inc(1) |
| 432 | } |
| 433 | } |
| 434 | } |
| 435 | |
| 436 | func (t *Tracer) reportSpan(sp *Span) { |
| 437 | if !sp.isSamplingFinalized() { |
| 438 | t.metrics.SpansFinishedDelayedSampling.Inc(1) |
| 439 | } else if sp.context.IsSampled() { |
| 440 | t.metrics.SpansFinishedSampled.Inc(1) |
| 441 | } else { |
| 442 | t.metrics.SpansFinishedNotSampled.Inc(1) |
| 443 | } |
| 444 | |
| 445 | // Note: if the reporter is processing Span asynchronously then it needs to Retain() the span, |
| 446 | // and then Release() it when no longer needed. |
| 447 | // Otherwise, the span may be reused for another trace and its data may be overwritten. |
| 448 | if sp.context.IsSampled() { |
| 449 | t.reporter.Report(sp) |
| 450 | } |
| 451 | |
| 452 | sp.Release() |
| 453 | } |
| 454 | |
| 455 | // randomID generates a random trace/span ID, using tracer.random() generator. |
| 456 | // It never returns 0. |
| 457 | func (t *Tracer) randomID() uint64 { |
| 458 | val := t.randomNumber() |
| 459 | for val == 0 { |
| 460 | val = t.randomNumber() |
| 461 | } |
| 462 | return val |
| 463 | } |
| 464 | |
| 465 | // (NB) span must hold the lock before making this call |
| 466 | func (t *Tracer) setBaggage(sp *Span, key, value string) { |
| 467 | t.baggageSetter.setBaggage(sp, key, value) |
| 468 | } |
| 469 | |
| 470 | // (NB) span must hold the lock before making this call |
| 471 | func (t *Tracer) isDebugAllowed(operation string) bool { |
| 472 | return t.debugThrottler.IsAllowed(operation) |
| 473 | } |
| 474 | |
| 475 | // Sampler returns the sampler given to the tracer at creation. |
| 476 | func (t *Tracer) Sampler() SamplerV2 { |
| 477 | return t.sampler |
| 478 | } |
| 479 | |
| 480 | // SelfRef creates an opentracing compliant SpanReference from a jaeger |
| 481 | // SpanContext. This is a factory function in order to encapsulate jaeger specific |
| 482 | // types. |
| 483 | func SelfRef(ctx SpanContext) opentracing.SpanReference { |
| 484 | return opentracing.SpanReference{ |
| 485 | Type: selfRefType, |
| 486 | ReferencedContext: ctx, |
| 487 | } |
| 488 | } |