blob: 42c9112c07603ecaac62864f7f024c379342c41c [file] [log] [blame]
mpagenkoaf801632020-07-03 10:00:42 +00001// Copyright (c) 2017-2018 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18 "sync"
19 "sync/atomic"
20 "time"
21
22 "github.com/opentracing/opentracing-go"
23 "github.com/opentracing/opentracing-go/ext"
24 "github.com/opentracing/opentracing-go/log"
25)
26
27// Span implements opentracing.Span
28type Span struct {
29 // referenceCounter used to increase the lifetime of
30 // the object before return it into the pool.
31 referenceCounter int32
32
33 sync.RWMutex
34
35 tracer *Tracer
36
37 // TODO: (breaking change) change to use a pointer
38 context SpanContext
39
40 // The name of the "operation" this span is an instance of.
41 // Known as a "span name" in some implementations.
42 operationName string
43
44 // firstInProcess, if true, indicates that this span is the root of the (sub)tree
45 // of spans in the current process. In other words it's true for the root spans,
46 // and the ingress spans when the process joins another trace.
47 firstInProcess bool
48
49 // startTime is the timestamp indicating when the span began, with microseconds precision.
50 startTime time.Time
51
52 // duration returns duration of the span with microseconds precision.
53 // Zero value means duration is unknown.
54 duration time.Duration
55
56 // tags attached to this span
57 tags []Tag
58
59 // The span's "micro-log"
60 logs []opentracing.LogRecord
61
62 // The number of logs dropped because of MaxLogsPerSpan.
63 numDroppedLogs int
64
65 // references for this span
66 references []Reference
67
68 observer ContribSpanObserver
69}
70
71// Tag is a simple key value wrapper.
72// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
73type Tag struct {
74 key string
75 value interface{}
76}
77
78// NewTag creates a new Tag.
79// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
80func NewTag(key string, value interface{}) Tag {
81 return Tag{key: key, value: value}
82}
83
84// SetOperationName sets or changes the operation name.
85func (s *Span) SetOperationName(operationName string) opentracing.Span {
86 s.Lock()
87 s.operationName = operationName
88 s.Unlock()
89 if !s.isSamplingFinalized() {
90 decision := s.tracer.sampler.OnSetOperationName(s, operationName)
91 s.applySamplingDecision(decision, true)
92 }
93 s.observer.OnSetOperationName(operationName)
94 return s
95}
96
97// SetTag implements SetTag() of opentracing.Span
98func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
99 return s.setTagInternal(key, value, true)
100}
101
102func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
103 s.observer.OnSetTag(key, value)
104 if key == string(ext.SamplingPriority) && !setSamplingPriority(s, value) {
105 return s
106 }
107 if !s.isSamplingFinalized() {
108 decision := s.tracer.sampler.OnSetTag(s, key, value)
109 s.applySamplingDecision(decision, lock)
110 }
111 if s.isWriteable() {
112 if lock {
113 s.Lock()
114 defer s.Unlock()
115 }
116 s.appendTagNoLocking(key, value)
117 }
118 return s
119}
120
121// SpanContext returns span context
122func (s *Span) SpanContext() SpanContext {
123 s.Lock()
124 defer s.Unlock()
125 return s.context
126}
127
128// StartTime returns span start time
129func (s *Span) StartTime() time.Time {
130 s.Lock()
131 defer s.Unlock()
132 return s.startTime
133}
134
135// Duration returns span duration
136func (s *Span) Duration() time.Duration {
137 s.Lock()
138 defer s.Unlock()
139 return s.duration
140}
141
142// Tags returns tags for span
143func (s *Span) Tags() opentracing.Tags {
144 s.Lock()
145 defer s.Unlock()
146 var result = make(opentracing.Tags, len(s.tags))
147 for _, tag := range s.tags {
148 result[tag.key] = tag.value
149 }
150 return result
151}
152
153// Logs returns micro logs for span
154func (s *Span) Logs() []opentracing.LogRecord {
155 s.Lock()
156 defer s.Unlock()
157
158 logs := append([]opentracing.LogRecord(nil), s.logs...)
159 if s.numDroppedLogs != 0 {
160 fixLogs(logs, s.numDroppedLogs)
161 }
162
163 return logs
164}
165
166// References returns references for this span
167func (s *Span) References() []opentracing.SpanReference {
168 s.Lock()
169 defer s.Unlock()
170
171 if s.references == nil || len(s.references) == 0 {
172 return nil
173 }
174
175 result := make([]opentracing.SpanReference, len(s.references))
176 for i, r := range s.references {
177 result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context}
178 }
179 return result
180}
181
182func (s *Span) appendTagNoLocking(key string, value interface{}) {
183 s.tags = append(s.tags, Tag{key: key, value: value})
184}
185
186// LogFields implements opentracing.Span API
187func (s *Span) LogFields(fields ...log.Field) {
188 s.Lock()
189 defer s.Unlock()
190 if !s.context.IsSampled() {
191 return
192 }
193 s.logFieldsNoLocking(fields...)
194}
195
196// this function should only be called while holding a Write lock
197func (s *Span) logFieldsNoLocking(fields ...log.Field) {
198 lr := opentracing.LogRecord{
199 Fields: fields,
200 Timestamp: time.Now(),
201 }
202 s.appendLogNoLocking(lr)
203}
204
205// LogKV implements opentracing.Span API
206func (s *Span) LogKV(alternatingKeyValues ...interface{}) {
207 s.RLock()
208 sampled := s.context.IsSampled()
209 s.RUnlock()
210 if !sampled {
211 return
212 }
213 fields, err := log.InterleavedKVToFields(alternatingKeyValues...)
214 if err != nil {
215 s.LogFields(log.Error(err), log.String("function", "LogKV"))
216 return
217 }
218 s.LogFields(fields...)
219}
220
221// LogEvent implements opentracing.Span API
222func (s *Span) LogEvent(event string) {
223 s.Log(opentracing.LogData{Event: event})
224}
225
226// LogEventWithPayload implements opentracing.Span API
227func (s *Span) LogEventWithPayload(event string, payload interface{}) {
228 s.Log(opentracing.LogData{Event: event, Payload: payload})
229}
230
231// Log implements opentracing.Span API
232func (s *Span) Log(ld opentracing.LogData) {
233 s.Lock()
234 defer s.Unlock()
235 if s.context.IsSampled() {
236 if ld.Timestamp.IsZero() {
237 ld.Timestamp = s.tracer.timeNow()
238 }
239 s.appendLogNoLocking(ld.ToLogRecord())
240 }
241}
242
243// this function should only be called while holding a Write lock
244func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
245 maxLogs := s.tracer.options.maxLogsPerSpan
246 if maxLogs == 0 || len(s.logs) < maxLogs {
247 s.logs = append(s.logs, lr)
248 return
249 }
250
251 // We have too many logs. We don't touch the first numOld logs; we treat the
252 // rest as a circular buffer and overwrite the oldest log among those.
253 numOld := (maxLogs - 1) / 2
254 numNew := maxLogs - numOld
255 s.logs[numOld+s.numDroppedLogs%numNew] = lr
256 s.numDroppedLogs++
257}
258
259// rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at
260// the end (i.e. pos circular left shifts).
261func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
262 // This algorithm is described in:
263 // http://www.cplusplus.com/reference/algorithm/rotate
264 for first, middle, next := 0, pos, pos; first != middle; {
265 buf[first], buf[next] = buf[next], buf[first]
266 first++
267 next++
268 if next == len(buf) {
269 next = middle
270 } else if first == middle {
271 middle = next
272 }
273 }
274}
275
276func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) {
277 // We dropped some log events, which means that we used part of Logs as a
278 // circular buffer (see appendLog). De-circularize it.
279 numOld := (len(logs) - 1) / 2
280 numNew := len(logs) - numOld
281 rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew)
282
283 // Replace the log in the middle (the oldest "new" log) with information
284 // about the dropped logs. This means that we are effectively dropping one
285 // more "new" log.
286 numDropped := numDroppedLogs + 1
287 logs[numOld] = opentracing.LogRecord{
288 // Keep the timestamp of the last dropped event.
289 Timestamp: logs[numOld].Timestamp,
290 Fields: []log.Field{
291 log.String("event", "dropped Span logs"),
292 log.Int("dropped_log_count", numDropped),
293 log.String("component", "jaeger-client"),
294 },
295 }
296}
297
298func (s *Span) fixLogsIfDropped() {
299 if s.numDroppedLogs == 0 {
300 return
301 }
302 fixLogs(s.logs, s.numDroppedLogs)
303 s.numDroppedLogs = 0
304}
305
306// SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext
307func (s *Span) SetBaggageItem(key, value string) opentracing.Span {
308 s.Lock()
309 defer s.Unlock()
310 s.tracer.setBaggage(s, key, value)
311 return s
312}
313
314// BaggageItem implements BaggageItem() of opentracing.SpanContext
315func (s *Span) BaggageItem(key string) string {
316 s.RLock()
317 defer s.RUnlock()
318 return s.context.baggage[key]
319}
320
321// Finish implements opentracing.Span API
322// After finishing the Span object it returns back to the allocator unless the reporter retains it again,
323// so after that, the Span object should no longer be used because it won't be valid anymore.
324func (s *Span) Finish() {
325 s.FinishWithOptions(opentracing.FinishOptions{})
326}
327
328// FinishWithOptions implements opentracing.Span API
329func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
330 if options.FinishTime.IsZero() {
331 options.FinishTime = s.tracer.timeNow()
332 }
333 s.observer.OnFinish(options)
334 s.Lock()
335 s.duration = options.FinishTime.Sub(s.startTime)
336 s.Unlock()
337 if !s.isSamplingFinalized() {
338 decision := s.tracer.sampler.OnFinishSpan(s)
339 s.applySamplingDecision(decision, true)
340 }
341 if s.context.IsSampled() {
342 s.Lock()
343 s.fixLogsIfDropped()
344 if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
345 // Note: bulk logs are not subject to maxLogsPerSpan limit
346 if options.LogRecords != nil {
347 s.logs = append(s.logs, options.LogRecords...)
348 }
349 for _, ld := range options.BulkLogData {
350 s.logs = append(s.logs, ld.ToLogRecord())
351 }
352 }
353 s.Unlock()
354 }
355 // call reportSpan even for non-sampled traces, to return span to the pool
356 // and update metrics counter
357 s.tracer.reportSpan(s)
358}
359
360// Context implements opentracing.Span API
361func (s *Span) Context() opentracing.SpanContext {
362 s.Lock()
363 defer s.Unlock()
364 return s.context
365}
366
367// Tracer implements opentracing.Span API
368func (s *Span) Tracer() opentracing.Tracer {
369 return s.tracer
370}
371
372func (s *Span) String() string {
373 s.RLock()
374 defer s.RUnlock()
375 return s.context.String()
376}
377
378// OperationName allows retrieving current operation name.
379func (s *Span) OperationName() string {
380 s.RLock()
381 defer s.RUnlock()
382 return s.operationName
383}
384
385// Retain increases object counter to increase the lifetime of the object
386func (s *Span) Retain() *Span {
387 atomic.AddInt32(&s.referenceCounter, 1)
388 return s
389}
390
391// Release decrements object counter and return to the
392// allocator manager when counter will below zero
393func (s *Span) Release() {
394 if atomic.AddInt32(&s.referenceCounter, -1) == -1 {
395 s.tracer.spanAllocator.Put(s)
396 }
397}
398
399// reset span state and release unused data
400func (s *Span) reset() {
401 s.firstInProcess = false
402 s.context = emptyContext
403 s.operationName = ""
404 s.tracer = nil
405 s.startTime = time.Time{}
406 s.duration = 0
407 s.observer = nil
408 atomic.StoreInt32(&s.referenceCounter, 0)
409
410 // Note: To reuse memory we can save the pointers on the heap
411 s.tags = s.tags[:0]
412 s.logs = s.logs[:0]
413 s.numDroppedLogs = 0
414 s.references = s.references[:0]
415}
416
417func (s *Span) serviceName() string {
418 return s.tracer.serviceName
419}
420
421func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
422 if !decision.Retryable {
423 s.context.samplingState.setFinal()
424 }
425 if decision.Sample {
426 s.context.samplingState.setSampled()
427 if len(decision.Tags) > 0 {
428 if lock {
429 s.Lock()
430 defer s.Unlock()
431 }
432 for _, tag := range decision.Tags {
433 s.appendTagNoLocking(tag.key, tag.value)
434 }
435 }
436 }
437}
438
439// Span can be written to if it is sampled or the sampling decision has not been finalized.
440func (s *Span) isWriteable() bool {
441 state := s.context.samplingState
442 return !state.isFinal() || state.isSampled()
443}
444
445func (s *Span) isSamplingFinalized() bool {
446 return s.context.samplingState.isFinal()
447}
448
449// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
450// The behavior of setSamplingPriority is surprising
451// If noDebugFlagOnForcedSampling is set
452// setSamplingPriority(span, 1) always sets only flagSampled
453// If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes
454// setSamplingPriority(span, 1) sets both flagSampled and flagDebug
455// However,
456// setSamplingPriority(span, 0) always only resets flagSampled
457//
458// This means that doing a setSamplingPriority(span, 1) followed by setSamplingPriority(span, 0) can
459// leave flagDebug set
460func setSamplingPriority(s *Span, value interface{}) bool {
461 val, ok := value.(uint16)
462 if !ok {
463 return false
464 }
465 if val == 0 {
466 s.context.samplingState.unsetSampled()
467 s.context.samplingState.setFinal()
468 return true
469 }
470 if s.tracer.options.noDebugFlagOnForcedSampling {
471 s.context.samplingState.setSampled()
472 s.context.samplingState.setFinal()
473 return true
474 } else if s.tracer.isDebugAllowed(s.operationName) {
475 s.context.samplingState.setDebugAndSampled()
476 s.context.samplingState.setFinal()
477 return true
478 }
479 return false
480}
481
482// EnableFirehose enables firehose flag on the span context
483func EnableFirehose(s *Span) {
484 s.Lock()
485 defer s.Unlock()
486 s.context.samplingState.setFirehose()
487}