blob: a71a92c3e84ec3e4d077c87e67eed2842bdb30bd [file] [log] [blame]
Girish Gowdra631ef3d2020-06-15 10:45:52 -07001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18 "fmt"
19 "sync"
20 "sync/atomic"
21 "time"
22
23 "github.com/opentracing/opentracing-go"
24
25 "github.com/uber/jaeger-client-go/internal/reporterstats"
26 "github.com/uber/jaeger-client-go/log"
27)
28
// Reporter is called by the tracer when a span is completed to report the span to the tracing collector.
//
// Implementations in this file are the null, logging, in-memory, composite and remote reporters.
type Reporter interface {
	// Report submits a new span to collectors, possibly asynchronously and/or with buffering.
	// If the reporter is processing Span asynchronously then it needs to Retain() the span,
	// and then Release() it when no longer needed, to avoid span data corruption.
	Report(span *Span)

	// Close does a clean shutdown of the reporter, flushing any traces that may be buffered in memory.
	Close()
}
39
40// ------------------------------
41
42type nullReporter struct{}
43
44// NewNullReporter creates a no-op reporter that ignores all reported spans.
45func NewNullReporter() Reporter {
46 return &nullReporter{}
47}
48
49// Report implements Report() method of Reporter by doing nothing.
50func (r *nullReporter) Report(span *Span) {
51 // no-op
52}
53
54// Close implements Close() method of Reporter by doing nothing.
55func (r *nullReporter) Close() {
56 // no-op
57}
58
59// ------------------------------
60
61type loggingReporter struct {
62 logger Logger
63}
64
65// NewLoggingReporter creates a reporter that logs all reported spans to provided logger.
66func NewLoggingReporter(logger Logger) Reporter {
67 return &loggingReporter{logger}
68}
69
70// Report implements Report() method of Reporter by logging the span to the logger.
71func (r *loggingReporter) Report(span *Span) {
72 r.logger.Infof("Reporting span %+v", span)
73}
74
75// Close implements Close() method of Reporter by doing nothing.
76func (r *loggingReporter) Close() {
77 // no-op
78}
79
80// ------------------------------
81
82// InMemoryReporter is used for testing, and simply collects spans in memory.
83type InMemoryReporter struct {
84 spans []opentracing.Span
85 lock sync.Mutex
86}
87
88// NewInMemoryReporter creates a reporter that stores spans in memory.
89// NOTE: the Tracer should be created with options.PoolSpans = false.
90func NewInMemoryReporter() *InMemoryReporter {
91 return &InMemoryReporter{
92 spans: make([]opentracing.Span, 0, 10),
93 }
94}
95
96// Report implements Report() method of Reporter by storing the span in the buffer.
97func (r *InMemoryReporter) Report(span *Span) {
98 r.lock.Lock()
99 // Need to retain the span otherwise it will be released
100 r.spans = append(r.spans, span.Retain())
101 r.lock.Unlock()
102}
103
104// Close implements Close() method of Reporter
105func (r *InMemoryReporter) Close() {
106 r.Reset()
107}
108
109// SpansSubmitted returns the number of spans accumulated in the buffer.
110func (r *InMemoryReporter) SpansSubmitted() int {
111 r.lock.Lock()
112 defer r.lock.Unlock()
113 return len(r.spans)
114}
115
116// GetSpans returns accumulated spans as a copy of the buffer.
117func (r *InMemoryReporter) GetSpans() []opentracing.Span {
118 r.lock.Lock()
119 defer r.lock.Unlock()
120 copied := make([]opentracing.Span, len(r.spans))
121 copy(copied, r.spans)
122 return copied
123}
124
125// Reset clears all accumulated spans.
126func (r *InMemoryReporter) Reset() {
127 r.lock.Lock()
128 defer r.lock.Unlock()
129
130 // Before reset the collection need to release Span memory
131 for _, span := range r.spans {
132 span.(*Span).Release()
133 }
134 r.spans = r.spans[:0]
135}
136
137// ------------------------------
138
139type compositeReporter struct {
140 reporters []Reporter
141}
142
143// NewCompositeReporter creates a reporter that ignores all reported spans.
144func NewCompositeReporter(reporters ...Reporter) Reporter {
145 return &compositeReporter{reporters: reporters}
146}
147
148// Report implements Report() method of Reporter by delegating to each underlying reporter.
149func (r *compositeReporter) Report(span *Span) {
150 for _, reporter := range r.reporters {
151 reporter.Report(span)
152 }
153}
154
155// Close implements Close() method of Reporter by closing each underlying reporter.
156func (r *compositeReporter) Close() {
157 for _, reporter := range r.reporters {
158 reporter.Close()
159 }
160}
161
162// ------------- REMOTE REPORTER -----------------
163
// reporterQueueItemType tags what a reporterQueueItem carries through the remote reporter's queue.
type reporterQueueItemType int

const (
	// defaultQueueSize is the queue capacity NewRemoteReporter uses when the
	// configured queueSize is not positive.
	defaultQueueSize = 100
	// defaultBufferFlushInterval is the flush period NewRemoteReporter uses when the
	// configured bufferFlushInterval is not positive.
	defaultBufferFlushInterval = 1 * time.Second

	// NOTE: iota counts the constant specs of this shared const block, so
	// reporterQueueItemSpan is 2 and reporterQueueItemClose is 3; the zero value
	// of reporterQueueItemType matches neither of them.

	// reporterQueueItemSpan marks a queue item that carries a span to report.
	reporterQueueItemSpan reporterQueueItemType = iota
	// reporterQueueItemClose marks the shutdown request enqueued by sendCloseEvent.
	reporterQueueItemClose
)

// reporterQueueItem is the element type of remoteReporter.queue.
type reporterQueueItem struct {
	itemType reporterQueueItemType
	span     *Span           // set on reporterQueueItemSpan items
	close    *sync.WaitGroup // set on reporterQueueItemClose items; processQueue calls Done() after the final flush
}
179
// reporterStats implements reporterstats.ReporterStats.
// Its counter is only read and written through sync/atomic.
type reporterStats struct {
	droppedCount int64 // provided to Transports to report data loss to the backend
}

// SpansDroppedFromQueue implements reporterstats.ReporterStats by returning
// the current value of the dropped-span counter.
func (r *reporterStats) SpansDroppedFromQueue() int64 {
	dropped := atomic.LoadInt64(&r.droppedCount)
	return dropped
}

// incDroppedCount records one more dropped span.
func (r *reporterStats) incDroppedCount() {
	const step int64 = 1
	atomic.AddInt64(&r.droppedCount, step)
}
193
194type remoteReporter struct {
195 // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
196 // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
197 queueLength int64 // used to update metrics.Gauge
198 closed int64 // 0 - not closed, 1 - closed
199
200 reporterOptions
201
202 sender Transport
203 queue chan reporterQueueItem
204 reporterStats *reporterStats
205}
206
207// NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender.
208// Calls to Report(Span) return immediately (side effect: if internal buffer is full the span is dropped).
209// Periodically the transport buffer is flushed even if it hasn't reached max packet size.
210// Calls to Close() block until all spans reported prior to the call to Close are flushed.
211func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
212 options := reporterOptions{}
213 for _, option := range opts {
214 option(&options)
215 }
216 if options.bufferFlushInterval <= 0 {
217 options.bufferFlushInterval = defaultBufferFlushInterval
218 }
219 if options.logger == nil {
220 options.logger = log.NullLogger
221 }
222 if options.metrics == nil {
223 options.metrics = NewNullMetrics()
224 }
225 if options.queueSize <= 0 {
226 options.queueSize = defaultQueueSize
227 }
228 reporter := &remoteReporter{
229 reporterOptions: options,
230 sender: sender,
231 queue: make(chan reporterQueueItem, options.queueSize),
232 reporterStats: new(reporterStats),
233 }
234 if receiver, ok := sender.(reporterstats.Receiver); ok {
235 receiver.SetReporterStats(reporter.reporterStats)
236 }
237 go reporter.processQueue()
238 return reporter
239}
240
241// Report implements Report() method of Reporter.
242// It passes the span to a background go-routine for submission to Jaeger backend.
243// If the internal queue is full, the span is dropped and metrics.ReporterDropped counter is incremented.
244// If Report() is called after the reporter has been Close()-ed, the additional spans will not be
245// sent to the backend, but the metrics.ReporterDropped counter may not reflect them correctly,
246// because some of them may still be successfully added to the queue.
247func (r *remoteReporter) Report(span *Span) {
248 select {
249 // Need to retain the span otherwise it will be released
250 case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span.Retain()}:
251 atomic.AddInt64(&r.queueLength, 1)
252 default:
253 r.metrics.ReporterDropped.Inc(1)
254 r.reporterStats.incDroppedCount()
255 }
256}
257
258// Close implements Close() method of Reporter by waiting for the queue to be drained.
259func (r *remoteReporter) Close() {
260 r.logger.Debugf("closing reporter")
261 if swapped := atomic.CompareAndSwapInt64(&r.closed, 0, 1); !swapped {
262 r.logger.Error("Repeated attempt to close the reporter is ignored")
263 return
264 }
265 r.sendCloseEvent()
266 _ = r.sender.Close()
267}
268
269func (r *remoteReporter) sendCloseEvent() {
270 wg := &sync.WaitGroup{}
271 wg.Add(1)
272 item := reporterQueueItem{itemType: reporterQueueItemClose, close: wg}
273
274 r.queue <- item // if the queue is full we will block until there is space
275 atomic.AddInt64(&r.queueLength, 1)
276 wg.Wait()
277}
278
279// processQueue reads spans from the queue, converts them to Thrift, and stores them in an internal buffer.
280// When the buffer length reaches batchSize, it is flushed by submitting the accumulated spans to Jaeger.
281// Buffer also gets flushed automatically every batchFlushInterval seconds, just in case the tracer stopped
282// reporting new spans.
283func (r *remoteReporter) processQueue() {
284 // flush causes the Sender to flush its accumulated spans and clear the buffer
285 flush := func() {
286 if flushed, err := r.sender.Flush(); err != nil {
287 r.metrics.ReporterFailure.Inc(int64(flushed))
288 r.logger.Error(fmt.Sprintf("failed to flush Jaeger spans to server: %s", err.Error()))
289 } else if flushed > 0 {
290 r.metrics.ReporterSuccess.Inc(int64(flushed))
291 }
292 }
293
294 timer := time.NewTicker(r.bufferFlushInterval)
295 for {
296 select {
297 case <-timer.C:
298 flush()
299 case item := <-r.queue:
300 atomic.AddInt64(&r.queueLength, -1)
301 switch item.itemType {
302 case reporterQueueItemSpan:
303 span := item.span
304 if flushed, err := r.sender.Append(span); err != nil {
305 r.metrics.ReporterFailure.Inc(int64(flushed))
306 r.logger.Error(fmt.Sprintf("error reporting Jaeger span %q: %s", span.OperationName(), err.Error()))
307 } else if flushed > 0 {
308 r.metrics.ReporterSuccess.Inc(int64(flushed))
309 // to reduce the number of gauge stats, we only emit queue length on flush
310 r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
311 r.logger.Debugf("flushed %d spans", flushed)
312 }
313 span.Release()
314 case reporterQueueItemClose:
315 timer.Stop()
316 flush()
317 item.close.Done()
318 return
319 }
320 }
321 }
322}