// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| 14 | |
| 15 | package jaeger |
| 16 | |
| 17 | import ( |
| 18 | "fmt" |
| 19 | "sync" |
| 20 | "sync/atomic" |
| 21 | "time" |
| 22 | |
| 23 | "github.com/opentracing/opentracing-go" |
| 24 | |
| 25 | "github.com/uber/jaeger-client-go/internal/reporterstats" |
| 26 | "github.com/uber/jaeger-client-go/log" |
| 27 | ) |
| 28 | |
| 29 | // Reporter is called by the tracer when a span is completed to report the span to the tracing collector. |
| 30 | type Reporter interface { |
| 31 | // Report submits a new span to collectors, possibly asynchronously and/or with buffering. |
| 32 | // If the reporter is processing Span asynchronously then it needs to Retain() the span, |
| 33 | // and then Release() it when no longer needed, to avoid span data corruption. |
| 34 | Report(span *Span) |
| 35 | |
| 36 | // Close does a clean shutdown of the reporter, flushing any traces that may be buffered in memory. |
| 37 | Close() |
| 38 | } |
| 39 | |
| 40 | // ------------------------------ |
| 41 | |
| 42 | type nullReporter struct{} |
| 43 | |
| 44 | // NewNullReporter creates a no-op reporter that ignores all reported spans. |
| 45 | func NewNullReporter() Reporter { |
| 46 | return &nullReporter{} |
| 47 | } |
| 48 | |
| 49 | // Report implements Report() method of Reporter by doing nothing. |
| 50 | func (r *nullReporter) Report(span *Span) { |
| 51 | // no-op |
| 52 | } |
| 53 | |
| 54 | // Close implements Close() method of Reporter by doing nothing. |
| 55 | func (r *nullReporter) Close() { |
| 56 | // no-op |
| 57 | } |
| 58 | |
| 59 | // ------------------------------ |
| 60 | |
| 61 | type loggingReporter struct { |
| 62 | logger Logger |
| 63 | } |
| 64 | |
| 65 | // NewLoggingReporter creates a reporter that logs all reported spans to provided logger. |
| 66 | func NewLoggingReporter(logger Logger) Reporter { |
| 67 | return &loggingReporter{logger} |
| 68 | } |
| 69 | |
| 70 | // Report implements Report() method of Reporter by logging the span to the logger. |
| 71 | func (r *loggingReporter) Report(span *Span) { |
| 72 | r.logger.Infof("Reporting span %+v", span) |
| 73 | } |
| 74 | |
| 75 | // Close implements Close() method of Reporter by doing nothing. |
| 76 | func (r *loggingReporter) Close() { |
| 77 | // no-op |
| 78 | } |
| 79 | |
| 80 | // ------------------------------ |
| 81 | |
| 82 | // InMemoryReporter is used for testing, and simply collects spans in memory. |
| 83 | type InMemoryReporter struct { |
| 84 | spans []opentracing.Span |
| 85 | lock sync.Mutex |
| 86 | } |
| 87 | |
| 88 | // NewInMemoryReporter creates a reporter that stores spans in memory. |
| 89 | // NOTE: the Tracer should be created with options.PoolSpans = false. |
| 90 | func NewInMemoryReporter() *InMemoryReporter { |
| 91 | return &InMemoryReporter{ |
| 92 | spans: make([]opentracing.Span, 0, 10), |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | // Report implements Report() method of Reporter by storing the span in the buffer. |
| 97 | func (r *InMemoryReporter) Report(span *Span) { |
| 98 | r.lock.Lock() |
| 99 | // Need to retain the span otherwise it will be released |
| 100 | r.spans = append(r.spans, span.Retain()) |
| 101 | r.lock.Unlock() |
| 102 | } |
| 103 | |
| 104 | // Close implements Close() method of Reporter |
| 105 | func (r *InMemoryReporter) Close() { |
| 106 | r.Reset() |
| 107 | } |
| 108 | |
| 109 | // SpansSubmitted returns the number of spans accumulated in the buffer. |
| 110 | func (r *InMemoryReporter) SpansSubmitted() int { |
| 111 | r.lock.Lock() |
| 112 | defer r.lock.Unlock() |
| 113 | return len(r.spans) |
| 114 | } |
| 115 | |
| 116 | // GetSpans returns accumulated spans as a copy of the buffer. |
| 117 | func (r *InMemoryReporter) GetSpans() []opentracing.Span { |
| 118 | r.lock.Lock() |
| 119 | defer r.lock.Unlock() |
| 120 | copied := make([]opentracing.Span, len(r.spans)) |
| 121 | copy(copied, r.spans) |
| 122 | return copied |
| 123 | } |
| 124 | |
| 125 | // Reset clears all accumulated spans. |
| 126 | func (r *InMemoryReporter) Reset() { |
| 127 | r.lock.Lock() |
| 128 | defer r.lock.Unlock() |
| 129 | |
| 130 | // Before reset the collection need to release Span memory |
| 131 | for _, span := range r.spans { |
| 132 | span.(*Span).Release() |
| 133 | } |
| 134 | r.spans = r.spans[:0] |
| 135 | } |
| 136 | |
| 137 | // ------------------------------ |
| 138 | |
| 139 | type compositeReporter struct { |
| 140 | reporters []Reporter |
| 141 | } |
| 142 | |
| 143 | // NewCompositeReporter creates a reporter that ignores all reported spans. |
| 144 | func NewCompositeReporter(reporters ...Reporter) Reporter { |
| 145 | return &compositeReporter{reporters: reporters} |
| 146 | } |
| 147 | |
| 148 | // Report implements Report() method of Reporter by delegating to each underlying reporter. |
| 149 | func (r *compositeReporter) Report(span *Span) { |
| 150 | for _, reporter := range r.reporters { |
| 151 | reporter.Report(span) |
| 152 | } |
| 153 | } |
| 154 | |
| 155 | // Close implements Close() method of Reporter by closing each underlying reporter. |
| 156 | func (r *compositeReporter) Close() { |
| 157 | for _, reporter := range r.reporters { |
| 158 | reporter.Close() |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | // ------------- REMOTE REPORTER ----------------- |
| 163 | |
// reporterQueueItemType tells the queue consumer what kind of item it has
// just received.
type reporterQueueItemType int

const (
	// defaultQueueSize is the queue capacity used when no positive size is
	// configured.
	defaultQueueSize = 100
	// defaultBufferFlushInterval is the flush period used when no positive
	// interval is configured.
	defaultBufferFlushInterval = time.Second

	// iota counts every constant in this block, including the two defaults
	// above, so the item types evaluate to 2 and 3 rather than 0 and 1.

	// reporterQueueItemSpan marks an item that carries a span.
	reporterQueueItemSpan reporterQueueItemType = iota
	// reporterQueueItemClose marks the shutdown request.
	reporterQueueItemClose
)
| 173 | |
| 174 | type reporterQueueItem struct { |
| 175 | itemType reporterQueueItemType |
| 176 | span *Span |
| 177 | close *sync.WaitGroup |
| 178 | } |
| 179 | |
// reporterStats implements reporterstats.ReporterStats. Its counter is only
// ever touched through sync/atomic.
type reporterStats struct {
	// droppedCount is provided to Transports so they can report data loss to
	// the backend.
	droppedCount int64
}

// SpansDroppedFromQueue implements reporterstats.ReporterStats by returning
// the current value of the dropped-span counter.
func (s *reporterStats) SpansDroppedFromQueue() int64 {
	dropped := atomic.LoadInt64(&s.droppedCount)
	return dropped
}

// incDroppedCount atomically records one more dropped span.
func (s *reporterStats) incDroppedCount() {
	atomic.AddInt64(&s.droppedCount, 1)
}
| 193 | |
| 194 | type remoteReporter struct { |
| 195 | // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment. |
| 196 | // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq |
| 197 | queueLength int64 // used to update metrics.Gauge |
| 198 | closed int64 // 0 - not closed, 1 - closed |
| 199 | |
| 200 | reporterOptions |
| 201 | |
| 202 | sender Transport |
| 203 | queue chan reporterQueueItem |
| 204 | reporterStats *reporterStats |
| 205 | } |
| 206 | |
| 207 | // NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender. |
| 208 | // Calls to Report(Span) return immediately (side effect: if internal buffer is full the span is dropped). |
| 209 | // Periodically the transport buffer is flushed even if it hasn't reached max packet size. |
| 210 | // Calls to Close() block until all spans reported prior to the call to Close are flushed. |
| 211 | func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter { |
| 212 | options := reporterOptions{} |
| 213 | for _, option := range opts { |
| 214 | option(&options) |
| 215 | } |
| 216 | if options.bufferFlushInterval <= 0 { |
| 217 | options.bufferFlushInterval = defaultBufferFlushInterval |
| 218 | } |
| 219 | if options.logger == nil { |
| 220 | options.logger = log.NullLogger |
| 221 | } |
| 222 | if options.metrics == nil { |
| 223 | options.metrics = NewNullMetrics() |
| 224 | } |
| 225 | if options.queueSize <= 0 { |
| 226 | options.queueSize = defaultQueueSize |
| 227 | } |
| 228 | reporter := &remoteReporter{ |
| 229 | reporterOptions: options, |
| 230 | sender: sender, |
| 231 | queue: make(chan reporterQueueItem, options.queueSize), |
| 232 | reporterStats: new(reporterStats), |
| 233 | } |
| 234 | if receiver, ok := sender.(reporterstats.Receiver); ok { |
| 235 | receiver.SetReporterStats(reporter.reporterStats) |
| 236 | } |
| 237 | go reporter.processQueue() |
| 238 | return reporter |
| 239 | } |
| 240 | |
| 241 | // Report implements Report() method of Reporter. |
| 242 | // It passes the span to a background go-routine for submission to Jaeger backend. |
| 243 | // If the internal queue is full, the span is dropped and metrics.ReporterDropped counter is incremented. |
| 244 | // If Report() is called after the reporter has been Close()-ed, the additional spans will not be |
| 245 | // sent to the backend, but the metrics.ReporterDropped counter may not reflect them correctly, |
| 246 | // because some of them may still be successfully added to the queue. |
| 247 | func (r *remoteReporter) Report(span *Span) { |
| 248 | select { |
| 249 | // Need to retain the span otherwise it will be released |
| 250 | case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span.Retain()}: |
| 251 | atomic.AddInt64(&r.queueLength, 1) |
| 252 | default: |
| 253 | r.metrics.ReporterDropped.Inc(1) |
| 254 | r.reporterStats.incDroppedCount() |
| 255 | } |
| 256 | } |
| 257 | |
| 258 | // Close implements Close() method of Reporter by waiting for the queue to be drained. |
| 259 | func (r *remoteReporter) Close() { |
| 260 | r.logger.Debugf("closing reporter") |
| 261 | if swapped := atomic.CompareAndSwapInt64(&r.closed, 0, 1); !swapped { |
| 262 | r.logger.Error("Repeated attempt to close the reporter is ignored") |
| 263 | return |
| 264 | } |
| 265 | r.sendCloseEvent() |
| 266 | _ = r.sender.Close() |
| 267 | } |
| 268 | |
| 269 | func (r *remoteReporter) sendCloseEvent() { |
| 270 | wg := &sync.WaitGroup{} |
| 271 | wg.Add(1) |
| 272 | item := reporterQueueItem{itemType: reporterQueueItemClose, close: wg} |
| 273 | |
| 274 | r.queue <- item // if the queue is full we will block until there is space |
| 275 | atomic.AddInt64(&r.queueLength, 1) |
| 276 | wg.Wait() |
| 277 | } |
| 278 | |
| 279 | // processQueue reads spans from the queue, converts them to Thrift, and stores them in an internal buffer. |
| 280 | // When the buffer length reaches batchSize, it is flushed by submitting the accumulated spans to Jaeger. |
| 281 | // Buffer also gets flushed automatically every batchFlushInterval seconds, just in case the tracer stopped |
| 282 | // reporting new spans. |
| 283 | func (r *remoteReporter) processQueue() { |
| 284 | // flush causes the Sender to flush its accumulated spans and clear the buffer |
| 285 | flush := func() { |
| 286 | if flushed, err := r.sender.Flush(); err != nil { |
| 287 | r.metrics.ReporterFailure.Inc(int64(flushed)) |
| 288 | r.logger.Error(fmt.Sprintf("failed to flush Jaeger spans to server: %s", err.Error())) |
| 289 | } else if flushed > 0 { |
| 290 | r.metrics.ReporterSuccess.Inc(int64(flushed)) |
| 291 | } |
| 292 | } |
| 293 | |
| 294 | timer := time.NewTicker(r.bufferFlushInterval) |
| 295 | for { |
| 296 | select { |
| 297 | case <-timer.C: |
| 298 | flush() |
| 299 | case item := <-r.queue: |
| 300 | atomic.AddInt64(&r.queueLength, -1) |
| 301 | switch item.itemType { |
| 302 | case reporterQueueItemSpan: |
| 303 | span := item.span |
| 304 | if flushed, err := r.sender.Append(span); err != nil { |
| 305 | r.metrics.ReporterFailure.Inc(int64(flushed)) |
| 306 | r.logger.Error(fmt.Sprintf("error reporting Jaeger span %q: %s", span.OperationName(), err.Error())) |
| 307 | } else if flushed > 0 { |
| 308 | r.metrics.ReporterSuccess.Inc(int64(flushed)) |
| 309 | // to reduce the number of gauge stats, we only emit queue length on flush |
| 310 | r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength)) |
| 311 | r.logger.Debugf("flushed %d spans", flushed) |
| 312 | } |
| 313 | span.Release() |
| 314 | case reporterQueueItemClose: |
| 315 | timer.Stop() |
| 316 | flush() |
| 317 | item.close.Done() |
| 318 | return |
| 319 | } |
| 320 | } |
| 321 | } |
| 322 | } |