Naveen Sampath | 04696f7 | 2022-06-13 15:19:14 +0530 | [diff] [blame] | 1 | // Copyright (c) 2017-2018 Uber Technologies, Inc. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package jaeger |
| 16 | |
| 17 | import ( |
| 18 | "sync" |
| 19 | "sync/atomic" |
| 20 | "time" |
| 21 | |
| 22 | "github.com/opentracing/opentracing-go" |
| 23 | "github.com/opentracing/opentracing-go/ext" |
| 24 | "github.com/opentracing/opentracing-go/log" |
| 25 | ) |
| 26 | |
| 27 | // Span implements opentracing.Span |
| 28 | type Span struct { |
| 29 | // referenceCounter used to increase the lifetime of |
| 30 | // the object before return it into the pool. |
| 31 | referenceCounter int32 |
| 32 | |
| 33 | sync.RWMutex |
| 34 | |
| 35 | tracer *Tracer |
| 36 | |
| 37 | // TODO: (breaking change) change to use a pointer |
| 38 | context SpanContext |
| 39 | |
| 40 | // The name of the "operation" this span is an instance of. |
| 41 | // Known as a "span name" in some implementations. |
| 42 | operationName string |
| 43 | |
| 44 | // firstInProcess, if true, indicates that this span is the root of the (sub)tree |
| 45 | // of spans in the current process. In other words it's true for the root spans, |
| 46 | // and the ingress spans when the process joins another trace. |
| 47 | firstInProcess bool |
| 48 | |
| 49 | // startTime is the timestamp indicating when the span began, with microseconds precision. |
| 50 | startTime time.Time |
| 51 | |
| 52 | // duration returns duration of the span with microseconds precision. |
| 53 | // Zero value means duration is unknown. |
| 54 | duration time.Duration |
| 55 | |
| 56 | // tags attached to this span |
| 57 | tags []Tag |
| 58 | |
| 59 | // The span's "micro-log" |
| 60 | logs []opentracing.LogRecord |
| 61 | |
| 62 | // The number of logs dropped because of MaxLogsPerSpan. |
| 63 | numDroppedLogs int |
| 64 | |
| 65 | // references for this span |
| 66 | references []Reference |
| 67 | |
| 68 | observer ContribSpanObserver |
| 69 | } |
| 70 | |
// Tag pairs a tag key with an arbitrary value.
// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
type Tag struct {
	key   string
	value interface{}
}

// NewTag builds a Tag holding the given key and value.
// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
func NewTag(key string, value interface{}) Tag {
	var t Tag
	t.key = key
	t.value = value
	return t
}
| 83 | |
| 84 | // SetOperationName sets or changes the operation name. |
| 85 | func (s *Span) SetOperationName(operationName string) opentracing.Span { |
| 86 | s.Lock() |
| 87 | s.operationName = operationName |
| 88 | ctx := s.context |
| 89 | s.Unlock() |
| 90 | if !ctx.isSamplingFinalized() { |
| 91 | decision := s.tracer.sampler.OnSetOperationName(s, operationName) |
| 92 | s.applySamplingDecision(decision, true) |
| 93 | } |
| 94 | s.observer.OnSetOperationName(operationName) |
| 95 | return s |
| 96 | } |
| 97 | |
| 98 | // SetTag implements SetTag() of opentracing.Span |
| 99 | func (s *Span) SetTag(key string, value interface{}) opentracing.Span { |
| 100 | return s.setTagInternal(key, value, true) |
| 101 | } |
| 102 | |
| 103 | func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span { |
| 104 | var ctx SpanContext |
| 105 | var operationName string |
| 106 | if lock { |
| 107 | ctx = s.SpanContext() |
| 108 | operationName = s.OperationName() |
| 109 | } else { |
| 110 | ctx = s.context |
| 111 | operationName = s.operationName |
| 112 | } |
| 113 | |
| 114 | s.observer.OnSetTag(key, value) |
| 115 | if key == string(ext.SamplingPriority) && !setSamplingPriority(ctx.samplingState, operationName, s.tracer, value) { |
| 116 | return s |
| 117 | } |
| 118 | if !ctx.isSamplingFinalized() { |
| 119 | decision := s.tracer.sampler.OnSetTag(s, key, value) |
| 120 | s.applySamplingDecision(decision, lock) |
| 121 | } |
| 122 | if ctx.isWriteable() { |
| 123 | if lock { |
| 124 | s.Lock() |
| 125 | defer s.Unlock() |
| 126 | } |
| 127 | s.appendTagNoLocking(key, value) |
| 128 | } |
| 129 | return s |
| 130 | } |
| 131 | |
| 132 | // SpanContext returns span context |
| 133 | func (s *Span) SpanContext() SpanContext { |
| 134 | s.Lock() |
| 135 | defer s.Unlock() |
| 136 | return s.context |
| 137 | } |
| 138 | |
| 139 | // StartTime returns span start time |
| 140 | func (s *Span) StartTime() time.Time { |
| 141 | s.Lock() |
| 142 | defer s.Unlock() |
| 143 | return s.startTime |
| 144 | } |
| 145 | |
| 146 | // Duration returns span duration |
| 147 | func (s *Span) Duration() time.Duration { |
| 148 | s.Lock() |
| 149 | defer s.Unlock() |
| 150 | return s.duration |
| 151 | } |
| 152 | |
| 153 | // Tags returns tags for span |
| 154 | func (s *Span) Tags() opentracing.Tags { |
| 155 | s.Lock() |
| 156 | defer s.Unlock() |
| 157 | var result = make(opentracing.Tags, len(s.tags)) |
| 158 | for _, tag := range s.tags { |
| 159 | result[tag.key] = tag.value |
| 160 | } |
| 161 | return result |
| 162 | } |
| 163 | |
| 164 | // Logs returns micro logs for span |
| 165 | func (s *Span) Logs() []opentracing.LogRecord { |
| 166 | s.Lock() |
| 167 | defer s.Unlock() |
| 168 | |
| 169 | logs := append([]opentracing.LogRecord(nil), s.logs...) |
| 170 | if s.numDroppedLogs != 0 { |
| 171 | fixLogs(logs, s.numDroppedLogs) |
| 172 | } |
| 173 | |
| 174 | return logs |
| 175 | } |
| 176 | |
| 177 | // References returns references for this span |
| 178 | func (s *Span) References() []opentracing.SpanReference { |
| 179 | s.Lock() |
| 180 | defer s.Unlock() |
| 181 | |
| 182 | if s.references == nil || len(s.references) == 0 { |
| 183 | return nil |
| 184 | } |
| 185 | |
| 186 | result := make([]opentracing.SpanReference, len(s.references)) |
| 187 | for i, r := range s.references { |
| 188 | result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context} |
| 189 | } |
| 190 | return result |
| 191 | } |
| 192 | |
| 193 | func (s *Span) appendTagNoLocking(key string, value interface{}) { |
| 194 | s.tags = append(s.tags, Tag{key: key, value: value}) |
| 195 | } |
| 196 | |
| 197 | // LogFields implements opentracing.Span API |
| 198 | func (s *Span) LogFields(fields ...log.Field) { |
| 199 | s.Lock() |
| 200 | defer s.Unlock() |
| 201 | if !s.context.IsSampled() { |
| 202 | return |
| 203 | } |
| 204 | s.logFieldsNoLocking(fields...) |
| 205 | } |
| 206 | |
| 207 | // this function should only be called while holding a Write lock |
| 208 | func (s *Span) logFieldsNoLocking(fields ...log.Field) { |
| 209 | lr := opentracing.LogRecord{ |
| 210 | Fields: fields, |
| 211 | Timestamp: time.Now(), |
| 212 | } |
| 213 | s.appendLogNoLocking(lr) |
| 214 | } |
| 215 | |
| 216 | // LogKV implements opentracing.Span API |
| 217 | func (s *Span) LogKV(alternatingKeyValues ...interface{}) { |
| 218 | s.RLock() |
| 219 | sampled := s.context.IsSampled() |
| 220 | s.RUnlock() |
| 221 | if !sampled { |
| 222 | return |
| 223 | } |
| 224 | fields, err := log.InterleavedKVToFields(alternatingKeyValues...) |
| 225 | if err != nil { |
| 226 | s.LogFields(log.Error(err), log.String("function", "LogKV")) |
| 227 | return |
| 228 | } |
| 229 | s.LogFields(fields...) |
| 230 | } |
| 231 | |
| 232 | // LogEvent implements opentracing.Span API |
| 233 | func (s *Span) LogEvent(event string) { |
| 234 | s.Log(opentracing.LogData{Event: event}) |
| 235 | } |
| 236 | |
| 237 | // LogEventWithPayload implements opentracing.Span API |
| 238 | func (s *Span) LogEventWithPayload(event string, payload interface{}) { |
| 239 | s.Log(opentracing.LogData{Event: event, Payload: payload}) |
| 240 | } |
| 241 | |
| 242 | // Log implements opentracing.Span API |
| 243 | func (s *Span) Log(ld opentracing.LogData) { |
| 244 | s.Lock() |
| 245 | defer s.Unlock() |
| 246 | if s.context.IsSampled() { |
| 247 | if ld.Timestamp.IsZero() { |
| 248 | ld.Timestamp = s.tracer.timeNow() |
| 249 | } |
| 250 | s.appendLogNoLocking(ld.ToLogRecord()) |
| 251 | } |
| 252 | } |
| 253 | |
| 254 | // this function should only be called while holding a Write lock |
| 255 | func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) { |
| 256 | maxLogs := s.tracer.options.maxLogsPerSpan |
| 257 | if maxLogs == 0 || len(s.logs) < maxLogs { |
| 258 | s.logs = append(s.logs, lr) |
| 259 | return |
| 260 | } |
| 261 | |
| 262 | // We have too many logs. We don't touch the first numOld logs; we treat the |
| 263 | // rest as a circular buffer and overwrite the oldest log among those. |
| 264 | numOld := (maxLogs - 1) / 2 |
| 265 | numNew := maxLogs - numOld |
| 266 | s.logs[numOld+s.numDroppedLogs%numNew] = lr |
| 267 | s.numDroppedLogs++ |
| 268 | } |
| 269 | |
| 270 | // rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at |
| 271 | // the end (i.e. pos circular left shifts). |
| 272 | func rotateLogBuffer(buf []opentracing.LogRecord, pos int) { |
| 273 | // This algorithm is described in: |
| 274 | // http://www.cplusplus.com/reference/algorithm/rotate |
| 275 | for first, middle, next := 0, pos, pos; first != middle; { |
| 276 | buf[first], buf[next] = buf[next], buf[first] |
| 277 | first++ |
| 278 | next++ |
| 279 | if next == len(buf) { |
| 280 | next = middle |
| 281 | } else if first == middle { |
| 282 | middle = next |
| 283 | } |
| 284 | } |
| 285 | } |
| 286 | |
| 287 | func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) { |
| 288 | // We dropped some log events, which means that we used part of Logs as a |
| 289 | // circular buffer (see appendLog). De-circularize it. |
| 290 | numOld := (len(logs) - 1) / 2 |
| 291 | numNew := len(logs) - numOld |
| 292 | rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew) |
| 293 | |
| 294 | // Replace the log in the middle (the oldest "new" log) with information |
| 295 | // about the dropped logs. This means that we are effectively dropping one |
| 296 | // more "new" log. |
| 297 | numDropped := numDroppedLogs + 1 |
| 298 | logs[numOld] = opentracing.LogRecord{ |
| 299 | // Keep the timestamp of the last dropped event. |
| 300 | Timestamp: logs[numOld].Timestamp, |
| 301 | Fields: []log.Field{ |
| 302 | log.String("event", "dropped Span logs"), |
| 303 | log.Int("dropped_log_count", numDropped), |
| 304 | log.String("component", "jaeger-client"), |
| 305 | }, |
| 306 | } |
| 307 | } |
| 308 | |
| 309 | func (s *Span) fixLogsIfDropped() { |
| 310 | if s.numDroppedLogs == 0 { |
| 311 | return |
| 312 | } |
| 313 | fixLogs(s.logs, s.numDroppedLogs) |
| 314 | s.numDroppedLogs = 0 |
| 315 | } |
| 316 | |
| 317 | // SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext. |
| 318 | // The call is proxied via tracer.baggageSetter to allow policies to be applied |
| 319 | // before allowing to set/replace baggage keys. |
| 320 | // The setter eventually stores a new SpanContext with extended baggage: |
| 321 | // |
| 322 | // span.context = span.context.WithBaggageItem(key, value) |
| 323 | // |
| 324 | // See SpanContext.WithBaggageItem() for explanation why it's done this way. |
| 325 | func (s *Span) SetBaggageItem(key, value string) opentracing.Span { |
| 326 | s.Lock() |
| 327 | defer s.Unlock() |
| 328 | s.tracer.setBaggage(s, key, value) |
| 329 | return s |
| 330 | } |
| 331 | |
| 332 | // BaggageItem implements BaggageItem() of opentracing.SpanContext |
| 333 | func (s *Span) BaggageItem(key string) string { |
| 334 | s.RLock() |
| 335 | defer s.RUnlock() |
| 336 | return s.context.baggage[key] |
| 337 | } |
| 338 | |
| 339 | // Finish implements opentracing.Span API |
| 340 | // After finishing the Span object it returns back to the allocator unless the reporter retains it again, |
| 341 | // so after that, the Span object should no longer be used because it won't be valid anymore. |
| 342 | func (s *Span) Finish() { |
| 343 | s.FinishWithOptions(opentracing.FinishOptions{}) |
| 344 | } |
| 345 | |
| 346 | // FinishWithOptions implements opentracing.Span API |
| 347 | func (s *Span) FinishWithOptions(options opentracing.FinishOptions) { |
| 348 | if options.FinishTime.IsZero() { |
| 349 | options.FinishTime = s.tracer.timeNow() |
| 350 | } |
| 351 | s.observer.OnFinish(options) |
| 352 | s.Lock() |
| 353 | s.duration = options.FinishTime.Sub(s.startTime) |
| 354 | ctx := s.context |
| 355 | s.Unlock() |
| 356 | if !ctx.isSamplingFinalized() { |
| 357 | decision := s.tracer.sampler.OnFinishSpan(s) |
| 358 | s.applySamplingDecision(decision, true) |
| 359 | } |
| 360 | if ctx.IsSampled() { |
| 361 | s.Lock() |
| 362 | s.fixLogsIfDropped() |
| 363 | if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 { |
| 364 | // Note: bulk logs are not subject to maxLogsPerSpan limit |
| 365 | if options.LogRecords != nil { |
| 366 | s.logs = append(s.logs, options.LogRecords...) |
| 367 | } |
| 368 | for _, ld := range options.BulkLogData { |
| 369 | s.logs = append(s.logs, ld.ToLogRecord()) |
| 370 | } |
| 371 | } |
| 372 | s.Unlock() |
| 373 | } |
| 374 | // call reportSpan even for non-sampled traces, to return span to the pool |
| 375 | // and update metrics counter |
| 376 | s.tracer.reportSpan(s) |
| 377 | } |
| 378 | |
| 379 | // Context implements opentracing.Span API |
| 380 | func (s *Span) Context() opentracing.SpanContext { |
| 381 | s.Lock() |
| 382 | defer s.Unlock() |
| 383 | return s.context |
| 384 | } |
| 385 | |
| 386 | // Tracer implements opentracing.Span API |
| 387 | func (s *Span) Tracer() opentracing.Tracer { |
| 388 | return s.tracer |
| 389 | } |
| 390 | |
| 391 | func (s *Span) String() string { |
| 392 | s.RLock() |
| 393 | defer s.RUnlock() |
| 394 | return s.context.String() |
| 395 | } |
| 396 | |
| 397 | // OperationName allows retrieving current operation name. |
| 398 | func (s *Span) OperationName() string { |
| 399 | s.RLock() |
| 400 | defer s.RUnlock() |
| 401 | return s.operationName |
| 402 | } |
| 403 | |
| 404 | // Retain increases object counter to increase the lifetime of the object |
| 405 | func (s *Span) Retain() *Span { |
| 406 | atomic.AddInt32(&s.referenceCounter, 1) |
| 407 | return s |
| 408 | } |
| 409 | |
| 410 | // Release decrements object counter and return to the |
| 411 | // allocator manager when counter will below zero |
| 412 | func (s *Span) Release() { |
| 413 | if atomic.AddInt32(&s.referenceCounter, -1) == -1 { |
| 414 | s.tracer.spanAllocator.Put(s) |
| 415 | } |
| 416 | } |
| 417 | |
| 418 | // reset span state and release unused data |
| 419 | func (s *Span) reset() { |
| 420 | s.firstInProcess = false |
| 421 | s.context = emptyContext |
| 422 | s.operationName = "" |
| 423 | s.tracer = nil |
| 424 | s.startTime = time.Time{} |
| 425 | s.duration = 0 |
| 426 | s.observer = nil |
| 427 | atomic.StoreInt32(&s.referenceCounter, 0) |
| 428 | |
| 429 | // Note: To reuse memory we can save the pointers on the heap |
| 430 | s.tags = s.tags[:0] |
| 431 | s.logs = s.logs[:0] |
| 432 | s.numDroppedLogs = 0 |
| 433 | s.references = s.references[:0] |
| 434 | } |
| 435 | |
| 436 | func (s *Span) serviceName() string { |
| 437 | return s.tracer.serviceName |
| 438 | } |
| 439 | |
| 440 | func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) { |
| 441 | var ctx SpanContext |
| 442 | if lock { |
| 443 | ctx = s.SpanContext() |
| 444 | } else { |
| 445 | ctx = s.context |
| 446 | } |
| 447 | |
| 448 | if !decision.Retryable { |
| 449 | ctx.samplingState.setFinal() |
| 450 | } |
| 451 | if decision.Sample { |
| 452 | ctx.samplingState.setSampled() |
| 453 | if len(decision.Tags) > 0 { |
| 454 | if lock { |
| 455 | s.Lock() |
| 456 | defer s.Unlock() |
| 457 | } |
| 458 | for _, tag := range decision.Tags { |
| 459 | s.appendTagNoLocking(tag.key, tag.value) |
| 460 | } |
| 461 | } |
| 462 | } |
| 463 | } |
| 464 | |
| 465 | // setSamplingPriority returns true if the flag was updated successfully, false otherwise. |
| 466 | // The behavior of setSamplingPriority is surprising |
| 467 | // If noDebugFlagOnForcedSampling is set |
| 468 | // setSamplingPriority(..., 1) always sets only flagSampled |
| 469 | // If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes |
| 470 | // setSamplingPriority(..., 1) sets both flagSampled and flagDebug |
| 471 | // However, |
| 472 | // setSamplingPriority(..., 0) always only resets flagSampled |
| 473 | // |
| 474 | // This means that doing a setSamplingPriority(..., 1) followed by setSamplingPriority(..., 0) can |
| 475 | // leave flagDebug set |
| 476 | func setSamplingPriority(state *samplingState, operationName string, tracer *Tracer, value interface{}) bool { |
| 477 | val, ok := value.(uint16) |
| 478 | if !ok { |
| 479 | return false |
| 480 | } |
| 481 | if val == 0 { |
| 482 | state.unsetSampled() |
| 483 | state.setFinal() |
| 484 | return true |
| 485 | } |
| 486 | if tracer.options.noDebugFlagOnForcedSampling { |
| 487 | state.setSampled() |
| 488 | state.setFinal() |
| 489 | return true |
| 490 | } else if tracer.isDebugAllowed(operationName) { |
| 491 | state.setDebugAndSampled() |
| 492 | state.setFinal() |
| 493 | return true |
| 494 | } |
| 495 | return false |
| 496 | } |
| 497 | |
| 498 | // EnableFirehose enables firehose flag on the span context |
| 499 | func EnableFirehose(s *Span) { |
| 500 | s.Lock() |
| 501 | defer s.Unlock() |
| 502 | s.context.samplingState.setFirehose() |
| 503 | } |