blob: d0be8ad500796c87d2218c135dd502059fe8447c [file] [log] [blame]
Takahiro Suzuki241c10e2020-12-17 20:17:57 +09001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18 "fmt"
19 "math"
20 "strings"
21 "sync"
22
23 "github.com/uber/jaeger-client-go/thrift-gen/sampling"
24 "github.com/uber/jaeger-client-go/utils"
25)
26
27const (
28 defaultMaxOperations = 2000
29)
30
31// Sampler decides whether a new trace should be sampled or not.
32type Sampler interface {
33 // IsSampled decides whether a trace with given `id` and `operation`
34 // should be sampled. This function will also return the tags that
35 // can be used to identify the type of sampling that was applied to
36 // the root span. Most simple samplers would return two tags,
37 // sampler.type and sampler.param, similar to those used in the Configuration
38 IsSampled(id TraceID, operation string) (sampled bool, tags []Tag)
39
40 // Close does a clean shutdown of the sampler, stopping any background
41 // go-routines it may have started.
42 Close()
43
44 // Equal checks if the `other` sampler is functionally equivalent
45 // to this sampler.
46 // TODO (breaking change) remove this function. See PerOperationSampler.Equals for explanation.
47 Equal(other Sampler) bool
48}
49
50// -----------------------
51
52// ConstSampler is a sampler that always makes the same decision.
53type ConstSampler struct {
54 legacySamplerV1Base
55 Decision bool
56 tags []Tag
57}
58
59// NewConstSampler creates a ConstSampler.
60func NewConstSampler(sample bool) *ConstSampler {
61 tags := []Tag{
62 {key: SamplerTypeTagKey, value: SamplerTypeConst},
63 {key: SamplerParamTagKey, value: sample},
64 }
65 s := &ConstSampler{
66 Decision: sample,
67 tags: tags,
68 }
69 s.delegate = s.IsSampled
70 return s
71}
72
73// IsSampled implements IsSampled() of Sampler.
74func (s *ConstSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
75 return s.Decision, s.tags
76}
77
78// Close implements Close() of Sampler.
79func (s *ConstSampler) Close() {
80 // nothing to do
81}
82
83// Equal implements Equal() of Sampler.
84func (s *ConstSampler) Equal(other Sampler) bool {
85 if o, ok := other.(*ConstSampler); ok {
86 return s.Decision == o.Decision
87 }
88 return false
89}
90
91// String is used to log sampler details.
92func (s *ConstSampler) String() string {
93 return fmt.Sprintf("ConstSampler(decision=%t)", s.Decision)
94}
95
96// -----------------------
97
98// ProbabilisticSampler is a sampler that randomly samples a certain percentage
99// of traces.
100type ProbabilisticSampler struct {
101 legacySamplerV1Base
102 samplingRate float64
103 samplingBoundary uint64
104 tags []Tag
105}
106
107const maxRandomNumber = ^(uint64(1) << 63) // i.e. 0x7fffffffffffffff
108
109// NewProbabilisticSampler creates a sampler that randomly samples a certain percentage of traces specified by the
110// samplingRate, in the range between 0.0 and 1.0.
111//
112// It relies on the fact that new trace IDs are 63bit random numbers themselves, thus making the sampling decision
113// without generating a new random number, but simply calculating if traceID < (samplingRate * 2^63).
114// TODO remove the error from this function for next major release
115func NewProbabilisticSampler(samplingRate float64) (*ProbabilisticSampler, error) {
116 if samplingRate < 0.0 || samplingRate > 1.0 {
117 return nil, fmt.Errorf("Sampling Rate must be between 0.0 and 1.0, received %f", samplingRate)
118 }
119 return newProbabilisticSampler(samplingRate), nil
120}
121
122func newProbabilisticSampler(samplingRate float64) *ProbabilisticSampler {
123 s := new(ProbabilisticSampler)
124 s.delegate = s.IsSampled
125 return s.init(samplingRate)
126}
127
128func (s *ProbabilisticSampler) init(samplingRate float64) *ProbabilisticSampler {
129 s.samplingRate = math.Max(0.0, math.Min(samplingRate, 1.0))
130 s.samplingBoundary = uint64(float64(maxRandomNumber) * s.samplingRate)
131 s.tags = []Tag{
132 {key: SamplerTypeTagKey, value: SamplerTypeProbabilistic},
133 {key: SamplerParamTagKey, value: s.samplingRate},
134 }
135 return s
136}
137
138// SamplingRate returns the sampling probability this sampled was constructed with.
139func (s *ProbabilisticSampler) SamplingRate() float64 {
140 return s.samplingRate
141}
142
143// IsSampled implements IsSampled() of Sampler.
144func (s *ProbabilisticSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
145 return s.samplingBoundary >= id.Low&maxRandomNumber, s.tags
146}
147
148// Close implements Close() of Sampler.
149func (s *ProbabilisticSampler) Close() {
150 // nothing to do
151}
152
153// Equal implements Equal() of Sampler.
154func (s *ProbabilisticSampler) Equal(other Sampler) bool {
155 if o, ok := other.(*ProbabilisticSampler); ok {
156 return s.samplingBoundary == o.samplingBoundary
157 }
158 return false
159}
160
161// Update modifies in-place the sampling rate. Locking must be done externally.
162func (s *ProbabilisticSampler) Update(samplingRate float64) error {
163 if samplingRate < 0.0 || samplingRate > 1.0 {
164 return fmt.Errorf("Sampling Rate must be between 0.0 and 1.0, received %f", samplingRate)
165 }
166 s.init(samplingRate)
167 return nil
168}
169
170// String is used to log sampler details.
171func (s *ProbabilisticSampler) String() string {
172 return fmt.Sprintf("ProbabilisticSampler(samplingRate=%v)", s.samplingRate)
173}
174
175// -----------------------
176
177// RateLimitingSampler samples at most maxTracesPerSecond. The distribution of sampled traces follows
178// burstiness of the service, i.e. a service with uniformly distributed requests will have those
179// requests sampled uniformly as well, but if requests are bursty, especially sub-second, then a
180// number of sequential requests can be sampled each second.
181type RateLimitingSampler struct {
182 legacySamplerV1Base
183 maxTracesPerSecond float64
184 rateLimiter *utils.ReconfigurableRateLimiter
185 tags []Tag
186}
187
188// NewRateLimitingSampler creates new RateLimitingSampler.
189func NewRateLimitingSampler(maxTracesPerSecond float64) *RateLimitingSampler {
190 s := new(RateLimitingSampler)
191 s.delegate = s.IsSampled
192 return s.init(maxTracesPerSecond)
193}
194
195func (s *RateLimitingSampler) init(maxTracesPerSecond float64) *RateLimitingSampler {
196 if s.rateLimiter == nil {
197 s.rateLimiter = utils.NewRateLimiter(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0))
198 } else {
199 s.rateLimiter.Update(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0))
200 }
201 s.maxTracesPerSecond = maxTracesPerSecond
202 s.tags = []Tag{
203 {key: SamplerTypeTagKey, value: SamplerTypeRateLimiting},
204 {key: SamplerParamTagKey, value: maxTracesPerSecond},
205 }
206 return s
207}
208
209// IsSampled implements IsSampled() of Sampler.
210func (s *RateLimitingSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
211 return s.rateLimiter.CheckCredit(1.0), s.tags
212}
213
214// Update reconfigures the rate limiter, while preserving its accumulated balance.
215// Locking must be done externally.
216func (s *RateLimitingSampler) Update(maxTracesPerSecond float64) {
217 if s.maxTracesPerSecond != maxTracesPerSecond {
218 s.init(maxTracesPerSecond)
219 }
220}
221
222// Close does nothing.
223func (s *RateLimitingSampler) Close() {
224 // nothing to do
225}
226
227// Equal compares with another sampler.
228func (s *RateLimitingSampler) Equal(other Sampler) bool {
229 if o, ok := other.(*RateLimitingSampler); ok {
230 return s.maxTracesPerSecond == o.maxTracesPerSecond
231 }
232 return false
233}
234
235// String is used to log sampler details.
236func (s *RateLimitingSampler) String() string {
237 return fmt.Sprintf("RateLimitingSampler(maxTracesPerSecond=%v)", s.maxTracesPerSecond)
238}
239
240// -----------------------
241
242// GuaranteedThroughputProbabilisticSampler is a sampler that leverages both ProbabilisticSampler and
243// RateLimitingSampler. The RateLimitingSampler is used as a guaranteed lower bound sampler such that
244// every operation is sampled at least once in a time interval defined by the lowerBound. ie a lowerBound
245// of 1.0 / (60 * 10) will sample an operation at least once every 10 minutes.
246//
247// The ProbabilisticSampler is given higher priority when tags are emitted, ie. if IsSampled() for both
248// samplers return true, the tags for ProbabilisticSampler will be used.
249type GuaranteedThroughputProbabilisticSampler struct {
250 probabilisticSampler *ProbabilisticSampler
251 lowerBoundSampler *RateLimitingSampler
252 tags []Tag
253 samplingRate float64
254 lowerBound float64
255}
256
257// NewGuaranteedThroughputProbabilisticSampler returns a delegating sampler that applies both
258// ProbabilisticSampler and RateLimitingSampler.
259func NewGuaranteedThroughputProbabilisticSampler(
260 lowerBound, samplingRate float64,
261) (*GuaranteedThroughputProbabilisticSampler, error) {
262 return newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate), nil
263}
264
265func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float64) *GuaranteedThroughputProbabilisticSampler {
266 s := &GuaranteedThroughputProbabilisticSampler{
267 lowerBoundSampler: NewRateLimitingSampler(lowerBound),
268 lowerBound: lowerBound,
269 }
270 s.setProbabilisticSampler(samplingRate)
271 return s
272}
273
274func (s *GuaranteedThroughputProbabilisticSampler) setProbabilisticSampler(samplingRate float64) {
275 if s.probabilisticSampler == nil {
276 s.probabilisticSampler = newProbabilisticSampler(samplingRate)
277 } else if s.samplingRate != samplingRate {
278 s.probabilisticSampler.init(samplingRate)
279 }
280 // since we don't validate samplingRate, sampler may have clamped it to [0, 1] interval
281 samplingRate = s.probabilisticSampler.SamplingRate()
282 if s.samplingRate != samplingRate || s.tags == nil {
283 s.samplingRate = s.probabilisticSampler.SamplingRate()
284 s.tags = []Tag{
285 {key: SamplerTypeTagKey, value: SamplerTypeLowerBound},
286 {key: SamplerParamTagKey, value: s.samplingRate},
287 }
288 }
289}
290
291// IsSampled implements IsSampled() of Sampler.
292func (s *GuaranteedThroughputProbabilisticSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
293 if sampled, tags := s.probabilisticSampler.IsSampled(id, operation); sampled {
294 s.lowerBoundSampler.IsSampled(id, operation)
295 return true, tags
296 }
297 sampled, _ := s.lowerBoundSampler.IsSampled(id, operation)
298 return sampled, s.tags
299}
300
301// Close implements Close() of Sampler.
302func (s *GuaranteedThroughputProbabilisticSampler) Close() {
303 s.probabilisticSampler.Close()
304 s.lowerBoundSampler.Close()
305}
306
307// Equal implements Equal() of Sampler.
308func (s *GuaranteedThroughputProbabilisticSampler) Equal(other Sampler) bool {
309 // NB The Equal() function is expensive and will be removed. See PerOperationSampler.Equal() for
310 // more information.
311 return false
312}
313
314// this function should only be called while holding a Write lock
315func (s *GuaranteedThroughputProbabilisticSampler) update(lowerBound, samplingRate float64) {
316 s.setProbabilisticSampler(samplingRate)
317 if s.lowerBound != lowerBound {
318 s.lowerBoundSampler.Update(lowerBound)
319 s.lowerBound = lowerBound
320 }
321}
322
323func (s GuaranteedThroughputProbabilisticSampler) String() string {
324 return fmt.Sprintf("GuaranteedThroughputProbabilisticSampler(lowerBound=%f, samplingRate=%f)", s.lowerBound, s.samplingRate)
325}
326
327// -----------------------
328
329// PerOperationSampler is a delegating sampler that applies GuaranteedThroughputProbabilisticSampler
330// on a per-operation basis.
331type PerOperationSampler struct {
332 sync.RWMutex
333
334 samplers map[string]*GuaranteedThroughputProbabilisticSampler
335 defaultSampler *ProbabilisticSampler
336 lowerBound float64
337 maxOperations int
338
339 // see description in PerOperationSamplerParams
340 operationNameLateBinding bool
341}
342
343// NewAdaptiveSampler returns a new PerOperationSampler.
344// Deprecated: please use NewPerOperationSampler.
345func NewAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, maxOperations int) (*PerOperationSampler, error) {
346 return NewPerOperationSampler(PerOperationSamplerParams{
347 MaxOperations: maxOperations,
348 Strategies: strategies,
349 }), nil
350}
351
352// PerOperationSamplerParams defines parameters when creating PerOperationSampler.
353type PerOperationSamplerParams struct {
354 // Max number of operations that will be tracked. Other operations will be given default strategy.
355 MaxOperations int
356
357 // Opt-in feature for applications that require late binding of span name via explicit call to SetOperationName.
358 // When this feature is enabled, the sampler will return retryable=true from OnCreateSpan(), thus leaving
359 // the sampling decision as non-final (and the span as writeable). This may lead to degraded performance
360 // in applications that always provide the correct span name on trace creation.
361 //
362 // For backwards compatibility this option is off by default.
363 OperationNameLateBinding bool
364
365 // Initial configuration of the sampling strategies (usually retrieved from the backend by Remote Sampler).
366 Strategies *sampling.PerOperationSamplingStrategies
367}
368
369// NewPerOperationSampler returns a new PerOperationSampler.
370func NewPerOperationSampler(params PerOperationSamplerParams) *PerOperationSampler {
371 if params.MaxOperations <= 0 {
372 params.MaxOperations = defaultMaxOperations
373 }
374 samplers := make(map[string]*GuaranteedThroughputProbabilisticSampler)
375 for _, strategy := range params.Strategies.PerOperationStrategies {
376 sampler := newGuaranteedThroughputProbabilisticSampler(
377 params.Strategies.DefaultLowerBoundTracesPerSecond,
378 strategy.ProbabilisticSampling.SamplingRate,
379 )
380 samplers[strategy.Operation] = sampler
381 }
382 return &PerOperationSampler{
383 samplers: samplers,
384 defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability),
385 lowerBound: params.Strategies.DefaultLowerBoundTracesPerSecond,
386 maxOperations: params.MaxOperations,
387 operationNameLateBinding: params.OperationNameLateBinding,
388 }
389}
390
391// IsSampled is not used and only exists to match Sampler V1 API.
392// TODO (breaking change) remove when upgrading everything to SamplerV2
393func (s *PerOperationSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
394 return false, nil
395}
396
397func (s *PerOperationSampler) trySampling(span *Span, operationName string) (bool, []Tag) {
398 samplerV1 := s.getSamplerForOperation(operationName)
399 var sampled bool
400 var tags []Tag
401 if span.context.samplingState.isLocalRootSpan(span.context.spanID) {
402 sampled, tags = samplerV1.IsSampled(span.context.TraceID(), operationName)
403 }
404 return sampled, tags
405}
406
407// OnCreateSpan implements OnCreateSpan of SamplerV2.
408func (s *PerOperationSampler) OnCreateSpan(span *Span) SamplingDecision {
409 sampled, tags := s.trySampling(span, span.OperationName())
410 return SamplingDecision{Sample: sampled, Retryable: s.operationNameLateBinding, Tags: tags}
411}
412
413// OnSetOperationName implements OnSetOperationName of SamplerV2.
414func (s *PerOperationSampler) OnSetOperationName(span *Span, operationName string) SamplingDecision {
415 sampled, tags := s.trySampling(span, operationName)
416 return SamplingDecision{Sample: sampled, Retryable: false, Tags: tags}
417}
418
419// OnSetTag implements OnSetTag of SamplerV2.
420func (s *PerOperationSampler) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
421 return SamplingDecision{Sample: false, Retryable: true}
422}
423
424// OnFinishSpan implements OnFinishSpan of SamplerV2.
425func (s *PerOperationSampler) OnFinishSpan(span *Span) SamplingDecision {
426 return SamplingDecision{Sample: false, Retryable: true}
427}
428
429func (s *PerOperationSampler) getSamplerForOperation(operation string) Sampler {
430 s.RLock()
431 sampler, ok := s.samplers[operation]
432 if ok {
433 defer s.RUnlock()
434 return sampler
435 }
436 s.RUnlock()
437 s.Lock()
438 defer s.Unlock()
439
440 // Check if sampler has already been created
441 sampler, ok = s.samplers[operation]
442 if ok {
443 return sampler
444 }
445 // Store only up to maxOperations of unique ops.
446 if len(s.samplers) >= s.maxOperations {
447 return s.defaultSampler
448 }
449 newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate())
450 s.samplers[operation] = newSampler
451 return newSampler
452}
453
454// Close invokes Close on all underlying samplers.
455func (s *PerOperationSampler) Close() {
456 s.Lock()
457 defer s.Unlock()
458 for _, sampler := range s.samplers {
459 sampler.Close()
460 }
461 s.defaultSampler.Close()
462}
463
464func (s *PerOperationSampler) String() string {
465 var sb strings.Builder
466
467 fmt.Fprintf(&sb, "PerOperationSampler(defaultSampler=%v, ", s.defaultSampler)
468 fmt.Fprintf(&sb, "lowerBound=%f, ", s.lowerBound)
469 fmt.Fprintf(&sb, "maxOperations=%d, ", s.maxOperations)
470 fmt.Fprintf(&sb, "operationNameLateBinding=%t, ", s.operationNameLateBinding)
471 fmt.Fprintf(&sb, "numOperations=%d,\n", len(s.samplers))
472 fmt.Fprintf(&sb, "samplers=[")
473 for operationName, sampler := range s.samplers {
474 fmt.Fprintf(&sb, "\n(operationName=%s, sampler=%v)", operationName, sampler)
475 }
476 fmt.Fprintf(&sb, "])")
477
478 return sb.String()
479}
480
481// Equal is not used.
482// TODO (breaking change) remove this in the future
483func (s *PerOperationSampler) Equal(other Sampler) bool {
484 // NB The Equal() function is overly expensive for PerOperationSampler since it's composed of multiple
485 // samplers which all need to be initialized before this function can be called for a comparison.
486 // Therefore, PerOperationSampler uses the update() function to only alter the samplers that need
487 // changing. Hence this function always returns false so that the update function can be called.
488 // Once the Equal() function is removed from the Sampler API, this will no longer be needed.
489 return false
490}
491
492func (s *PerOperationSampler) update(strategies *sampling.PerOperationSamplingStrategies) {
493 s.Lock()
494 defer s.Unlock()
495 newSamplers := map[string]*GuaranteedThroughputProbabilisticSampler{}
496 for _, strategy := range strategies.PerOperationStrategies {
497 operation := strategy.Operation
498 samplingRate := strategy.ProbabilisticSampling.SamplingRate
499 lowerBound := strategies.DefaultLowerBoundTracesPerSecond
500 if sampler, ok := s.samplers[operation]; ok {
501 sampler.update(lowerBound, samplingRate)
502 newSamplers[operation] = sampler
503 } else {
504 sampler := newGuaranteedThroughputProbabilisticSampler(
505 lowerBound,
506 samplingRate,
507 )
508 newSamplers[operation] = sampler
509 }
510 }
511 s.lowerBound = strategies.DefaultLowerBoundTracesPerSecond
512 if s.defaultSampler.SamplingRate() != strategies.DefaultSamplingProbability {
513 s.defaultSampler = newProbabilisticSampler(strategies.DefaultSamplingProbability)
514 }
515 s.samplers = newSamplers
516}