Girish Gowdra | 631ef3d | 2020-06-15 10:45:52 -0700 | [diff] [blame] | 1 | // Copyright (c) 2017-2018 Uber Technologies, Inc. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package jaeger |
| 16 | |
| 17 | import ( |
| 18 | "sync" |
| 19 | "sync/atomic" |
| 20 | "time" |
| 21 | |
| 22 | "github.com/opentracing/opentracing-go" |
| 23 | "github.com/opentracing/opentracing-go/ext" |
| 24 | "github.com/opentracing/opentracing-go/log" |
| 25 | ) |
| 26 | |
| 27 | // Span implements opentracing.Span |
| 28 | type Span struct { |
| 29 | // referenceCounter used to increase the lifetime of |
| 30 | // the object before return it into the pool. |
| 31 | referenceCounter int32 |
| 32 | |
| 33 | sync.RWMutex |
| 34 | |
| 35 | tracer *Tracer |
| 36 | |
| 37 | // TODO: (breaking change) change to use a pointer |
| 38 | context SpanContext |
| 39 | |
| 40 | // The name of the "operation" this span is an instance of. |
| 41 | // Known as a "span name" in some implementations. |
| 42 | operationName string |
| 43 | |
| 44 | // firstInProcess, if true, indicates that this span is the root of the (sub)tree |
| 45 | // of spans in the current process. In other words it's true for the root spans, |
| 46 | // and the ingress spans when the process joins another trace. |
| 47 | firstInProcess bool |
| 48 | |
| 49 | // startTime is the timestamp indicating when the span began, with microseconds precision. |
| 50 | startTime time.Time |
| 51 | |
| 52 | // duration returns duration of the span with microseconds precision. |
| 53 | // Zero value means duration is unknown. |
| 54 | duration time.Duration |
| 55 | |
| 56 | // tags attached to this span |
| 57 | tags []Tag |
| 58 | |
| 59 | // The span's "micro-log" |
| 60 | logs []opentracing.LogRecord |
| 61 | |
| 62 | // The number of logs dropped because of MaxLogsPerSpan. |
| 63 | numDroppedLogs int |
| 64 | |
| 65 | // references for this span |
| 66 | references []Reference |
| 67 | |
| 68 | observer ContribSpanObserver |
| 69 | } |
| 70 | |
// Tag is a minimal key/value pair attached to a span.
// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
type Tag struct {
	// key is the tag name.
	key string
	// value is the tag payload; it may hold a value of any type.
	value interface{}
}
| 77 | |
| 78 | // NewTag creates a new Tag. |
| 79 | // TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead. |
| 80 | func NewTag(key string, value interface{}) Tag { |
| 81 | return Tag{key: key, value: value} |
| 82 | } |
| 83 | |
| 84 | // SetOperationName sets or changes the operation name. |
| 85 | func (s *Span) SetOperationName(operationName string) opentracing.Span { |
| 86 | s.Lock() |
| 87 | s.operationName = operationName |
| 88 | s.Unlock() |
| 89 | if !s.isSamplingFinalized() { |
| 90 | decision := s.tracer.sampler.OnSetOperationName(s, operationName) |
| 91 | s.applySamplingDecision(decision, true) |
| 92 | } |
| 93 | s.observer.OnSetOperationName(operationName) |
| 94 | return s |
| 95 | } |
| 96 | |
| 97 | // SetTag implements SetTag() of opentracing.Span |
| 98 | func (s *Span) SetTag(key string, value interface{}) opentracing.Span { |
| 99 | return s.setTagInternal(key, value, true) |
| 100 | } |
| 101 | |
| 102 | func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span { |
| 103 | s.observer.OnSetTag(key, value) |
| 104 | if key == string(ext.SamplingPriority) && !setSamplingPriority(s, value) { |
| 105 | return s |
| 106 | } |
| 107 | if !s.isSamplingFinalized() { |
| 108 | decision := s.tracer.sampler.OnSetTag(s, key, value) |
| 109 | s.applySamplingDecision(decision, lock) |
| 110 | } |
| 111 | if s.isWriteable() { |
| 112 | if lock { |
| 113 | s.Lock() |
| 114 | defer s.Unlock() |
| 115 | } |
| 116 | s.appendTagNoLocking(key, value) |
| 117 | } |
| 118 | return s |
| 119 | } |
| 120 | |
| 121 | // SpanContext returns span context |
| 122 | func (s *Span) SpanContext() SpanContext { |
| 123 | s.Lock() |
| 124 | defer s.Unlock() |
| 125 | return s.context |
| 126 | } |
| 127 | |
| 128 | // StartTime returns span start time |
| 129 | func (s *Span) StartTime() time.Time { |
| 130 | s.Lock() |
| 131 | defer s.Unlock() |
| 132 | return s.startTime |
| 133 | } |
| 134 | |
| 135 | // Duration returns span duration |
| 136 | func (s *Span) Duration() time.Duration { |
| 137 | s.Lock() |
| 138 | defer s.Unlock() |
| 139 | return s.duration |
| 140 | } |
| 141 | |
| 142 | // Tags returns tags for span |
| 143 | func (s *Span) Tags() opentracing.Tags { |
| 144 | s.Lock() |
| 145 | defer s.Unlock() |
| 146 | var result = make(opentracing.Tags, len(s.tags)) |
| 147 | for _, tag := range s.tags { |
| 148 | result[tag.key] = tag.value |
| 149 | } |
| 150 | return result |
| 151 | } |
| 152 | |
| 153 | // Logs returns micro logs for span |
| 154 | func (s *Span) Logs() []opentracing.LogRecord { |
| 155 | s.Lock() |
| 156 | defer s.Unlock() |
| 157 | |
| 158 | logs := append([]opentracing.LogRecord(nil), s.logs...) |
| 159 | if s.numDroppedLogs != 0 { |
| 160 | fixLogs(logs, s.numDroppedLogs) |
| 161 | } |
| 162 | |
| 163 | return logs |
| 164 | } |
| 165 | |
| 166 | // References returns references for this span |
| 167 | func (s *Span) References() []opentracing.SpanReference { |
| 168 | s.Lock() |
| 169 | defer s.Unlock() |
| 170 | |
| 171 | if s.references == nil || len(s.references) == 0 { |
| 172 | return nil |
| 173 | } |
| 174 | |
| 175 | result := make([]opentracing.SpanReference, len(s.references)) |
| 176 | for i, r := range s.references { |
| 177 | result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context} |
| 178 | } |
| 179 | return result |
| 180 | } |
| 181 | |
| 182 | func (s *Span) appendTagNoLocking(key string, value interface{}) { |
| 183 | s.tags = append(s.tags, Tag{key: key, value: value}) |
| 184 | } |
| 185 | |
| 186 | // LogFields implements opentracing.Span API |
| 187 | func (s *Span) LogFields(fields ...log.Field) { |
| 188 | s.Lock() |
| 189 | defer s.Unlock() |
| 190 | if !s.context.IsSampled() { |
| 191 | return |
| 192 | } |
| 193 | s.logFieldsNoLocking(fields...) |
| 194 | } |
| 195 | |
| 196 | // this function should only be called while holding a Write lock |
| 197 | func (s *Span) logFieldsNoLocking(fields ...log.Field) { |
| 198 | lr := opentracing.LogRecord{ |
| 199 | Fields: fields, |
| 200 | Timestamp: time.Now(), |
| 201 | } |
| 202 | s.appendLogNoLocking(lr) |
| 203 | } |
| 204 | |
| 205 | // LogKV implements opentracing.Span API |
| 206 | func (s *Span) LogKV(alternatingKeyValues ...interface{}) { |
| 207 | s.RLock() |
| 208 | sampled := s.context.IsSampled() |
| 209 | s.RUnlock() |
| 210 | if !sampled { |
| 211 | return |
| 212 | } |
| 213 | fields, err := log.InterleavedKVToFields(alternatingKeyValues...) |
| 214 | if err != nil { |
| 215 | s.LogFields(log.Error(err), log.String("function", "LogKV")) |
| 216 | return |
| 217 | } |
| 218 | s.LogFields(fields...) |
| 219 | } |
| 220 | |
| 221 | // LogEvent implements opentracing.Span API |
| 222 | func (s *Span) LogEvent(event string) { |
| 223 | s.Log(opentracing.LogData{Event: event}) |
| 224 | } |
| 225 | |
| 226 | // LogEventWithPayload implements opentracing.Span API |
| 227 | func (s *Span) LogEventWithPayload(event string, payload interface{}) { |
| 228 | s.Log(opentracing.LogData{Event: event, Payload: payload}) |
| 229 | } |
| 230 | |
| 231 | // Log implements opentracing.Span API |
| 232 | func (s *Span) Log(ld opentracing.LogData) { |
| 233 | s.Lock() |
| 234 | defer s.Unlock() |
| 235 | if s.context.IsSampled() { |
| 236 | if ld.Timestamp.IsZero() { |
| 237 | ld.Timestamp = s.tracer.timeNow() |
| 238 | } |
| 239 | s.appendLogNoLocking(ld.ToLogRecord()) |
| 240 | } |
| 241 | } |
| 242 | |
| 243 | // this function should only be called while holding a Write lock |
| 244 | func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) { |
| 245 | maxLogs := s.tracer.options.maxLogsPerSpan |
| 246 | if maxLogs == 0 || len(s.logs) < maxLogs { |
| 247 | s.logs = append(s.logs, lr) |
| 248 | return |
| 249 | } |
| 250 | |
| 251 | // We have too many logs. We don't touch the first numOld logs; we treat the |
| 252 | // rest as a circular buffer and overwrite the oldest log among those. |
| 253 | numOld := (maxLogs - 1) / 2 |
| 254 | numNew := maxLogs - numOld |
| 255 | s.logs[numOld+s.numDroppedLogs%numNew] = lr |
| 256 | s.numDroppedLogs++ |
| 257 | } |
| 258 | |
| 259 | // rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at |
| 260 | // the end (i.e. pos circular left shifts). |
| 261 | func rotateLogBuffer(buf []opentracing.LogRecord, pos int) { |
| 262 | // This algorithm is described in: |
| 263 | // http://www.cplusplus.com/reference/algorithm/rotate |
| 264 | for first, middle, next := 0, pos, pos; first != middle; { |
| 265 | buf[first], buf[next] = buf[next], buf[first] |
| 266 | first++ |
| 267 | next++ |
| 268 | if next == len(buf) { |
| 269 | next = middle |
| 270 | } else if first == middle { |
| 271 | middle = next |
| 272 | } |
| 273 | } |
| 274 | } |
| 275 | |
| 276 | func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) { |
| 277 | // We dropped some log events, which means that we used part of Logs as a |
| 278 | // circular buffer (see appendLog). De-circularize it. |
| 279 | numOld := (len(logs) - 1) / 2 |
| 280 | numNew := len(logs) - numOld |
| 281 | rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew) |
| 282 | |
| 283 | // Replace the log in the middle (the oldest "new" log) with information |
| 284 | // about the dropped logs. This means that we are effectively dropping one |
| 285 | // more "new" log. |
| 286 | numDropped := numDroppedLogs + 1 |
| 287 | logs[numOld] = opentracing.LogRecord{ |
| 288 | // Keep the timestamp of the last dropped event. |
| 289 | Timestamp: logs[numOld].Timestamp, |
| 290 | Fields: []log.Field{ |
| 291 | log.String("event", "dropped Span logs"), |
| 292 | log.Int("dropped_log_count", numDropped), |
| 293 | log.String("component", "jaeger-client"), |
| 294 | }, |
| 295 | } |
| 296 | } |
| 297 | |
| 298 | func (s *Span) fixLogsIfDropped() { |
| 299 | if s.numDroppedLogs == 0 { |
| 300 | return |
| 301 | } |
| 302 | fixLogs(s.logs, s.numDroppedLogs) |
| 303 | s.numDroppedLogs = 0 |
| 304 | } |
| 305 | |
| 306 | // SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext |
| 307 | func (s *Span) SetBaggageItem(key, value string) opentracing.Span { |
| 308 | s.Lock() |
| 309 | defer s.Unlock() |
| 310 | s.tracer.setBaggage(s, key, value) |
| 311 | return s |
| 312 | } |
| 313 | |
| 314 | // BaggageItem implements BaggageItem() of opentracing.SpanContext |
| 315 | func (s *Span) BaggageItem(key string) string { |
| 316 | s.RLock() |
| 317 | defer s.RUnlock() |
| 318 | return s.context.baggage[key] |
| 319 | } |
| 320 | |
| 321 | // Finish implements opentracing.Span API |
| 322 | // After finishing the Span object it returns back to the allocator unless the reporter retains it again, |
| 323 | // so after that, the Span object should no longer be used because it won't be valid anymore. |
| 324 | func (s *Span) Finish() { |
| 325 | s.FinishWithOptions(opentracing.FinishOptions{}) |
| 326 | } |
| 327 | |
| 328 | // FinishWithOptions implements opentracing.Span API |
| 329 | func (s *Span) FinishWithOptions(options opentracing.FinishOptions) { |
| 330 | if options.FinishTime.IsZero() { |
| 331 | options.FinishTime = s.tracer.timeNow() |
| 332 | } |
| 333 | s.observer.OnFinish(options) |
| 334 | s.Lock() |
| 335 | s.duration = options.FinishTime.Sub(s.startTime) |
| 336 | s.Unlock() |
| 337 | if !s.isSamplingFinalized() { |
| 338 | decision := s.tracer.sampler.OnFinishSpan(s) |
| 339 | s.applySamplingDecision(decision, true) |
| 340 | } |
| 341 | if s.context.IsSampled() { |
| 342 | s.Lock() |
| 343 | s.fixLogsIfDropped() |
| 344 | if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 { |
| 345 | // Note: bulk logs are not subject to maxLogsPerSpan limit |
| 346 | if options.LogRecords != nil { |
| 347 | s.logs = append(s.logs, options.LogRecords...) |
| 348 | } |
| 349 | for _, ld := range options.BulkLogData { |
| 350 | s.logs = append(s.logs, ld.ToLogRecord()) |
| 351 | } |
| 352 | } |
| 353 | s.Unlock() |
| 354 | } |
| 355 | // call reportSpan even for non-sampled traces, to return span to the pool |
| 356 | // and update metrics counter |
| 357 | s.tracer.reportSpan(s) |
| 358 | } |
| 359 | |
| 360 | // Context implements opentracing.Span API |
| 361 | func (s *Span) Context() opentracing.SpanContext { |
| 362 | s.Lock() |
| 363 | defer s.Unlock() |
| 364 | return s.context |
| 365 | } |
| 366 | |
| 367 | // Tracer implements opentracing.Span API |
| 368 | func (s *Span) Tracer() opentracing.Tracer { |
| 369 | return s.tracer |
| 370 | } |
| 371 | |
| 372 | func (s *Span) String() string { |
| 373 | s.RLock() |
| 374 | defer s.RUnlock() |
| 375 | return s.context.String() |
| 376 | } |
| 377 | |
| 378 | // OperationName allows retrieving current operation name. |
| 379 | func (s *Span) OperationName() string { |
| 380 | s.RLock() |
| 381 | defer s.RUnlock() |
| 382 | return s.operationName |
| 383 | } |
| 384 | |
| 385 | // Retain increases object counter to increase the lifetime of the object |
| 386 | func (s *Span) Retain() *Span { |
| 387 | atomic.AddInt32(&s.referenceCounter, 1) |
| 388 | return s |
| 389 | } |
| 390 | |
| 391 | // Release decrements object counter and return to the |
| 392 | // allocator manager when counter will below zero |
| 393 | func (s *Span) Release() { |
| 394 | if atomic.AddInt32(&s.referenceCounter, -1) == -1 { |
| 395 | s.tracer.spanAllocator.Put(s) |
| 396 | } |
| 397 | } |
| 398 | |
| 399 | // reset span state and release unused data |
| 400 | func (s *Span) reset() { |
| 401 | s.firstInProcess = false |
| 402 | s.context = emptyContext |
| 403 | s.operationName = "" |
| 404 | s.tracer = nil |
| 405 | s.startTime = time.Time{} |
| 406 | s.duration = 0 |
| 407 | s.observer = nil |
| 408 | atomic.StoreInt32(&s.referenceCounter, 0) |
| 409 | |
| 410 | // Note: To reuse memory we can save the pointers on the heap |
| 411 | s.tags = s.tags[:0] |
| 412 | s.logs = s.logs[:0] |
| 413 | s.numDroppedLogs = 0 |
| 414 | s.references = s.references[:0] |
| 415 | } |
| 416 | |
| 417 | func (s *Span) serviceName() string { |
| 418 | return s.tracer.serviceName |
| 419 | } |
| 420 | |
| 421 | func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) { |
| 422 | if !decision.Retryable { |
| 423 | s.context.samplingState.setFinal() |
| 424 | } |
| 425 | if decision.Sample { |
| 426 | s.context.samplingState.setSampled() |
| 427 | if len(decision.Tags) > 0 { |
| 428 | if lock { |
| 429 | s.Lock() |
| 430 | defer s.Unlock() |
| 431 | } |
| 432 | for _, tag := range decision.Tags { |
| 433 | s.appendTagNoLocking(tag.key, tag.value) |
| 434 | } |
| 435 | } |
| 436 | } |
| 437 | } |
| 438 | |
| 439 | // Span can be written to if it is sampled or the sampling decision has not been finalized. |
| 440 | func (s *Span) isWriteable() bool { |
| 441 | state := s.context.samplingState |
| 442 | return !state.isFinal() || state.isSampled() |
| 443 | } |
| 444 | |
| 445 | func (s *Span) isSamplingFinalized() bool { |
| 446 | return s.context.samplingState.isFinal() |
| 447 | } |
| 448 | |
| 449 | // setSamplingPriority returns true if the flag was updated successfully, false otherwise. |
| 450 | // The behavior of setSamplingPriority is surprising |
| 451 | // If noDebugFlagOnForcedSampling is set |
| 452 | // setSamplingPriority(span, 1) always sets only flagSampled |
| 453 | // If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes |
| 454 | // setSamplingPriority(span, 1) sets both flagSampled and flagDebug |
| 455 | // However, |
| 456 | // setSamplingPriority(span, 0) always only resets flagSampled |
| 457 | // |
| 458 | // This means that doing a setSamplingPriority(span, 1) followed by setSamplingPriority(span, 0) can |
| 459 | // leave flagDebug set |
| 460 | func setSamplingPriority(s *Span, value interface{}) bool { |
| 461 | val, ok := value.(uint16) |
| 462 | if !ok { |
| 463 | return false |
| 464 | } |
| 465 | if val == 0 { |
| 466 | s.context.samplingState.unsetSampled() |
| 467 | s.context.samplingState.setFinal() |
| 468 | return true |
| 469 | } |
| 470 | if s.tracer.options.noDebugFlagOnForcedSampling { |
| 471 | s.context.samplingState.setSampled() |
| 472 | s.context.samplingState.setFinal() |
| 473 | return true |
| 474 | } else if s.tracer.isDebugAllowed(s.operationName) { |
| 475 | s.context.samplingState.setDebugAndSampled() |
| 476 | s.context.samplingState.setFinal() |
| 477 | return true |
| 478 | } |
| 479 | return false |
| 480 | } |
| 481 | |
| 482 | // EnableFirehose enables firehose flag on the span context |
| 483 | func EnableFirehose(s *Span) { |
| 484 | s.Lock() |
| 485 | defer s.Unlock() |
| 486 | s.context.samplingState.setFirehose() |
| 487 | } |