blob: 997cffdd88ff533fd4778d0e3662bc8c43fd6a9e [file] [log] [blame]
khenaidooc6c7bda2020-06-17 17:20:18 -04001// Copyright (c) 2017-2018 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18 "sync"
19 "sync/atomic"
20 "time"
21
22 "github.com/opentracing/opentracing-go"
23 "github.com/opentracing/opentracing-go/ext"
24 "github.com/opentracing/opentracing-go/log"
25)
26
27// Span implements opentracing.Span
28type Span struct {
29 // referenceCounter used to increase the lifetime of
30 // the object before return it into the pool.
31 referenceCounter int32
32
33 sync.RWMutex
34
35 tracer *Tracer
36
37 // TODO: (breaking change) change to use a pointer
38 context SpanContext
39
40 // The name of the "operation" this span is an instance of.
41 // Known as a "span name" in some implementations.
42 operationName string
43
44 // firstInProcess, if true, indicates that this span is the root of the (sub)tree
45 // of spans in the current process. In other words it's true for the root spans,
46 // and the ingress spans when the process joins another trace.
47 firstInProcess bool
48
49 // startTime is the timestamp indicating when the span began, with microseconds precision.
50 startTime time.Time
51
52 // duration returns duration of the span with microseconds precision.
53 // Zero value means duration is unknown.
54 duration time.Duration
55
56 // tags attached to this span
57 tags []Tag
58
59 // The span's "micro-log"
60 logs []opentracing.LogRecord
61
62 // The number of logs dropped because of MaxLogsPerSpan.
63 numDroppedLogs int
64
65 // references for this span
66 references []Reference
67
68 observer ContribSpanObserver
69}
70
71// Tag is a simple key value wrapper.
72// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
73type Tag struct {
74 key string
75 value interface{}
76}
77
78// NewTag creates a new Tag.
79// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
80func NewTag(key string, value interface{}) Tag {
81 return Tag{key: key, value: value}
82}
83
84// SetOperationName sets or changes the operation name.
85func (s *Span) SetOperationName(operationName string) opentracing.Span {
86 s.Lock()
87 s.operationName = operationName
khenaidood948f772021-08-11 17:49:24 -040088 ctx := s.context
khenaidooc6c7bda2020-06-17 17:20:18 -040089 s.Unlock()
khenaidood948f772021-08-11 17:49:24 -040090 if !ctx.isSamplingFinalized() {
khenaidooc6c7bda2020-06-17 17:20:18 -040091 decision := s.tracer.sampler.OnSetOperationName(s, operationName)
92 s.applySamplingDecision(decision, true)
93 }
94 s.observer.OnSetOperationName(operationName)
95 return s
96}
97
98// SetTag implements SetTag() of opentracing.Span
99func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
100 return s.setTagInternal(key, value, true)
101}
102
103func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
khenaidood948f772021-08-11 17:49:24 -0400104 var ctx SpanContext
105 var operationName string
106 if lock {
107 ctx = s.SpanContext()
108 operationName = s.OperationName()
109 } else {
110 ctx = s.context
111 operationName = s.operationName
112 }
113
khenaidooc6c7bda2020-06-17 17:20:18 -0400114 s.observer.OnSetTag(key, value)
khenaidood948f772021-08-11 17:49:24 -0400115 if key == string(ext.SamplingPriority) && !setSamplingPriority(ctx.samplingState, operationName, s.tracer, value) {
khenaidooc6c7bda2020-06-17 17:20:18 -0400116 return s
117 }
khenaidood948f772021-08-11 17:49:24 -0400118 if !ctx.isSamplingFinalized() {
khenaidooc6c7bda2020-06-17 17:20:18 -0400119 decision := s.tracer.sampler.OnSetTag(s, key, value)
120 s.applySamplingDecision(decision, lock)
121 }
khenaidood948f772021-08-11 17:49:24 -0400122 if ctx.isWriteable() {
khenaidooc6c7bda2020-06-17 17:20:18 -0400123 if lock {
124 s.Lock()
125 defer s.Unlock()
126 }
127 s.appendTagNoLocking(key, value)
128 }
129 return s
130}
131
132// SpanContext returns span context
133func (s *Span) SpanContext() SpanContext {
134 s.Lock()
135 defer s.Unlock()
136 return s.context
137}
138
139// StartTime returns span start time
140func (s *Span) StartTime() time.Time {
141 s.Lock()
142 defer s.Unlock()
143 return s.startTime
144}
145
146// Duration returns span duration
147func (s *Span) Duration() time.Duration {
148 s.Lock()
149 defer s.Unlock()
150 return s.duration
151}
152
153// Tags returns tags for span
154func (s *Span) Tags() opentracing.Tags {
155 s.Lock()
156 defer s.Unlock()
157 var result = make(opentracing.Tags, len(s.tags))
158 for _, tag := range s.tags {
159 result[tag.key] = tag.value
160 }
161 return result
162}
163
164// Logs returns micro logs for span
165func (s *Span) Logs() []opentracing.LogRecord {
166 s.Lock()
167 defer s.Unlock()
168
169 logs := append([]opentracing.LogRecord(nil), s.logs...)
170 if s.numDroppedLogs != 0 {
171 fixLogs(logs, s.numDroppedLogs)
172 }
173
174 return logs
175}
176
177// References returns references for this span
178func (s *Span) References() []opentracing.SpanReference {
179 s.Lock()
180 defer s.Unlock()
181
182 if s.references == nil || len(s.references) == 0 {
183 return nil
184 }
185
186 result := make([]opentracing.SpanReference, len(s.references))
187 for i, r := range s.references {
188 result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context}
189 }
190 return result
191}
192
193func (s *Span) appendTagNoLocking(key string, value interface{}) {
194 s.tags = append(s.tags, Tag{key: key, value: value})
195}
196
197// LogFields implements opentracing.Span API
198func (s *Span) LogFields(fields ...log.Field) {
199 s.Lock()
200 defer s.Unlock()
201 if !s.context.IsSampled() {
202 return
203 }
204 s.logFieldsNoLocking(fields...)
205}
206
207// this function should only be called while holding a Write lock
208func (s *Span) logFieldsNoLocking(fields ...log.Field) {
209 lr := opentracing.LogRecord{
210 Fields: fields,
211 Timestamp: time.Now(),
212 }
213 s.appendLogNoLocking(lr)
214}
215
216// LogKV implements opentracing.Span API
217func (s *Span) LogKV(alternatingKeyValues ...interface{}) {
218 s.RLock()
219 sampled := s.context.IsSampled()
220 s.RUnlock()
221 if !sampled {
222 return
223 }
224 fields, err := log.InterleavedKVToFields(alternatingKeyValues...)
225 if err != nil {
226 s.LogFields(log.Error(err), log.String("function", "LogKV"))
227 return
228 }
229 s.LogFields(fields...)
230}
231
232// LogEvent implements opentracing.Span API
233func (s *Span) LogEvent(event string) {
234 s.Log(opentracing.LogData{Event: event})
235}
236
237// LogEventWithPayload implements opentracing.Span API
238func (s *Span) LogEventWithPayload(event string, payload interface{}) {
239 s.Log(opentracing.LogData{Event: event, Payload: payload})
240}
241
242// Log implements opentracing.Span API
243func (s *Span) Log(ld opentracing.LogData) {
244 s.Lock()
245 defer s.Unlock()
246 if s.context.IsSampled() {
247 if ld.Timestamp.IsZero() {
248 ld.Timestamp = s.tracer.timeNow()
249 }
250 s.appendLogNoLocking(ld.ToLogRecord())
251 }
252}
253
254// this function should only be called while holding a Write lock
255func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
256 maxLogs := s.tracer.options.maxLogsPerSpan
257 if maxLogs == 0 || len(s.logs) < maxLogs {
258 s.logs = append(s.logs, lr)
259 return
260 }
261
262 // We have too many logs. We don't touch the first numOld logs; we treat the
263 // rest as a circular buffer and overwrite the oldest log among those.
264 numOld := (maxLogs - 1) / 2
265 numNew := maxLogs - numOld
266 s.logs[numOld+s.numDroppedLogs%numNew] = lr
267 s.numDroppedLogs++
268}
269
270// rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at
271// the end (i.e. pos circular left shifts).
272func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
273 // This algorithm is described in:
274 // http://www.cplusplus.com/reference/algorithm/rotate
275 for first, middle, next := 0, pos, pos; first != middle; {
276 buf[first], buf[next] = buf[next], buf[first]
277 first++
278 next++
279 if next == len(buf) {
280 next = middle
281 } else if first == middle {
282 middle = next
283 }
284 }
285}
286
287func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) {
288 // We dropped some log events, which means that we used part of Logs as a
289 // circular buffer (see appendLog). De-circularize it.
290 numOld := (len(logs) - 1) / 2
291 numNew := len(logs) - numOld
292 rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew)
293
294 // Replace the log in the middle (the oldest "new" log) with information
295 // about the dropped logs. This means that we are effectively dropping one
296 // more "new" log.
297 numDropped := numDroppedLogs + 1
298 logs[numOld] = opentracing.LogRecord{
299 // Keep the timestamp of the last dropped event.
300 Timestamp: logs[numOld].Timestamp,
301 Fields: []log.Field{
302 log.String("event", "dropped Span logs"),
303 log.Int("dropped_log_count", numDropped),
304 log.String("component", "jaeger-client"),
305 },
306 }
307}
308
309func (s *Span) fixLogsIfDropped() {
310 if s.numDroppedLogs == 0 {
311 return
312 }
313 fixLogs(s.logs, s.numDroppedLogs)
314 s.numDroppedLogs = 0
315}
316
khenaidood948f772021-08-11 17:49:24 -0400317// SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext.
318// The call is proxied via tracer.baggageSetter to allow policies to be applied
319// before allowing to set/replace baggage keys.
320// The setter eventually stores a new SpanContext with extended baggage:
321//
322// span.context = span.context.WithBaggageItem(key, value)
323//
324// See SpanContext.WithBaggageItem() for explanation why it's done this way.
khenaidooc6c7bda2020-06-17 17:20:18 -0400325func (s *Span) SetBaggageItem(key, value string) opentracing.Span {
326 s.Lock()
327 defer s.Unlock()
328 s.tracer.setBaggage(s, key, value)
329 return s
330}
331
332// BaggageItem implements BaggageItem() of opentracing.SpanContext
333func (s *Span) BaggageItem(key string) string {
334 s.RLock()
335 defer s.RUnlock()
336 return s.context.baggage[key]
337}
338
339// Finish implements opentracing.Span API
340// After finishing the Span object it returns back to the allocator unless the reporter retains it again,
341// so after that, the Span object should no longer be used because it won't be valid anymore.
342func (s *Span) Finish() {
343 s.FinishWithOptions(opentracing.FinishOptions{})
344}
345
346// FinishWithOptions implements opentracing.Span API
347func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
348 if options.FinishTime.IsZero() {
349 options.FinishTime = s.tracer.timeNow()
350 }
351 s.observer.OnFinish(options)
352 s.Lock()
353 s.duration = options.FinishTime.Sub(s.startTime)
khenaidood948f772021-08-11 17:49:24 -0400354 ctx := s.context
khenaidooc6c7bda2020-06-17 17:20:18 -0400355 s.Unlock()
khenaidood948f772021-08-11 17:49:24 -0400356 if !ctx.isSamplingFinalized() {
khenaidooc6c7bda2020-06-17 17:20:18 -0400357 decision := s.tracer.sampler.OnFinishSpan(s)
358 s.applySamplingDecision(decision, true)
359 }
khenaidood948f772021-08-11 17:49:24 -0400360 if ctx.IsSampled() {
khenaidooc6c7bda2020-06-17 17:20:18 -0400361 s.Lock()
362 s.fixLogsIfDropped()
363 if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
364 // Note: bulk logs are not subject to maxLogsPerSpan limit
365 if options.LogRecords != nil {
366 s.logs = append(s.logs, options.LogRecords...)
367 }
368 for _, ld := range options.BulkLogData {
369 s.logs = append(s.logs, ld.ToLogRecord())
370 }
371 }
372 s.Unlock()
373 }
374 // call reportSpan even for non-sampled traces, to return span to the pool
375 // and update metrics counter
376 s.tracer.reportSpan(s)
377}
378
379// Context implements opentracing.Span API
380func (s *Span) Context() opentracing.SpanContext {
381 s.Lock()
382 defer s.Unlock()
383 return s.context
384}
385
386// Tracer implements opentracing.Span API
387func (s *Span) Tracer() opentracing.Tracer {
388 return s.tracer
389}
390
391func (s *Span) String() string {
392 s.RLock()
393 defer s.RUnlock()
394 return s.context.String()
395}
396
397// OperationName allows retrieving current operation name.
398func (s *Span) OperationName() string {
399 s.RLock()
400 defer s.RUnlock()
401 return s.operationName
402}
403
404// Retain increases object counter to increase the lifetime of the object
405func (s *Span) Retain() *Span {
406 atomic.AddInt32(&s.referenceCounter, 1)
407 return s
408}
409
410// Release decrements object counter and return to the
411// allocator manager when counter will below zero
412func (s *Span) Release() {
413 if atomic.AddInt32(&s.referenceCounter, -1) == -1 {
414 s.tracer.spanAllocator.Put(s)
415 }
416}
417
418// reset span state and release unused data
419func (s *Span) reset() {
420 s.firstInProcess = false
421 s.context = emptyContext
422 s.operationName = ""
423 s.tracer = nil
424 s.startTime = time.Time{}
425 s.duration = 0
426 s.observer = nil
427 atomic.StoreInt32(&s.referenceCounter, 0)
428
429 // Note: To reuse memory we can save the pointers on the heap
430 s.tags = s.tags[:0]
431 s.logs = s.logs[:0]
432 s.numDroppedLogs = 0
433 s.references = s.references[:0]
434}
435
436func (s *Span) serviceName() string {
437 return s.tracer.serviceName
438}
439
440func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
khenaidood948f772021-08-11 17:49:24 -0400441 var ctx SpanContext
442 if lock {
443 ctx = s.SpanContext()
444 } else {
445 ctx = s.context
446 }
447
khenaidooc6c7bda2020-06-17 17:20:18 -0400448 if !decision.Retryable {
khenaidood948f772021-08-11 17:49:24 -0400449 ctx.samplingState.setFinal()
khenaidooc6c7bda2020-06-17 17:20:18 -0400450 }
451 if decision.Sample {
khenaidood948f772021-08-11 17:49:24 -0400452 ctx.samplingState.setSampled()
khenaidooc6c7bda2020-06-17 17:20:18 -0400453 if len(decision.Tags) > 0 {
454 if lock {
455 s.Lock()
456 defer s.Unlock()
457 }
458 for _, tag := range decision.Tags {
459 s.appendTagNoLocking(tag.key, tag.value)
460 }
461 }
462 }
463}
464
khenaidooc6c7bda2020-06-17 17:20:18 -0400465// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
466// The behavior of setSamplingPriority is surprising
467// If noDebugFlagOnForcedSampling is set
khenaidood948f772021-08-11 17:49:24 -0400468// setSamplingPriority(..., 1) always sets only flagSampled
khenaidooc6c7bda2020-06-17 17:20:18 -0400469// If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes
khenaidood948f772021-08-11 17:49:24 -0400470// setSamplingPriority(..., 1) sets both flagSampled and flagDebug
khenaidooc6c7bda2020-06-17 17:20:18 -0400471// However,
khenaidood948f772021-08-11 17:49:24 -0400472// setSamplingPriority(..., 0) always only resets flagSampled
khenaidooc6c7bda2020-06-17 17:20:18 -0400473//
khenaidood948f772021-08-11 17:49:24 -0400474// This means that doing a setSamplingPriority(..., 1) followed by setSamplingPriority(..., 0) can
khenaidooc6c7bda2020-06-17 17:20:18 -0400475// leave flagDebug set
khenaidood948f772021-08-11 17:49:24 -0400476func setSamplingPriority(state *samplingState, operationName string, tracer *Tracer, value interface{}) bool {
khenaidooc6c7bda2020-06-17 17:20:18 -0400477 val, ok := value.(uint16)
478 if !ok {
479 return false
480 }
481 if val == 0 {
khenaidood948f772021-08-11 17:49:24 -0400482 state.unsetSampled()
483 state.setFinal()
khenaidooc6c7bda2020-06-17 17:20:18 -0400484 return true
485 }
khenaidood948f772021-08-11 17:49:24 -0400486 if tracer.options.noDebugFlagOnForcedSampling {
487 state.setSampled()
488 state.setFinal()
khenaidooc6c7bda2020-06-17 17:20:18 -0400489 return true
khenaidood948f772021-08-11 17:49:24 -0400490 } else if tracer.isDebugAllowed(operationName) {
491 state.setDebugAndSampled()
492 state.setFinal()
khenaidooc6c7bda2020-06-17 17:20:18 -0400493 return true
494 }
495 return false
496}
497
498// EnableFirehose enables firehose flag on the span context
499func EnableFirehose(s *Span) {
500 s.Lock()
501 defer s.Unlock()
502 s.context.samplingState.setFirehose()
503}