blob: 119f0a1bb6b8d483347b11dc65557da4e4164f6d [file] [log] [blame]
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package jaeger
16
17import (
18 "encoding/json"
19 "fmt"
20 "io/ioutil"
21 "net/http"
22 "net/url"
23 "sync"
24 "sync/atomic"
25 "time"
26
27 "github.com/uber/jaeger-client-go/log"
28 "github.com/uber/jaeger-client-go/thrift-gen/sampling"
29)
30
const (
	// defaultRemoteSamplingTimeout is applied as the response-header timeout of the
	// HTTP transport used by the HTTP sampling strategy fetcher.
	defaultRemoteSamplingTimeout = 10 * time.Second

	// defaultSamplingRefreshInterval is presumably the default polling period for
	// sampling strategy updates; it is not referenced in this part of the file.
	defaultSamplingRefreshInterval = time.Minute
)
35
// SamplingStrategyFetcher retrieves the raw sampling strategy payload for a
// service from a remote server.
type SamplingStrategyFetcher interface {
	// Fetch returns the undecoded strategy response for the given service name.
	Fetch(service string) ([]byte, error)
}
40
// SamplingStrategyParser decodes a raw sampling strategy payload. The value it
// produces should be of a type that the configured SamplerUpdaters recognize.
type SamplingStrategyParser interface {
	// Parse converts the fetched bytes into a strategy object.
	Parse(response []byte) (interface{}, error)
}
46
47// SamplerUpdater is used by RemotelyControlledSampler to apply sampling strategies,
48// retrieved from remote config server, to the current sampler. The updater can modify
49// the sampler in-place if sampler supports it, or create a new one.
50//
51// If the strategy does not contain configuration for the sampler in question,
52// updater must return modifiedSampler=nil to give other updaters a chance to inspect
53// the sampling strategy response.
54//
55// RemotelyControlledSampler invokes the updaters while holding a lock on the main sampler.
56type SamplerUpdater interface {
57 Update(sampler SamplerV2, strategy interface{}) (modified SamplerV2, err error)
58}
59
60// RemotelyControlledSampler is a delegating sampler that polls a remote server
61// for the appropriate sampling strategy, constructs a corresponding sampler and
62// delegates to it for sampling decisions.
63type RemotelyControlledSampler struct {
64 // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
65 // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
66 closed int64 // 0 - not closed, 1 - closed
67
David K. Bainbridge06631892021-08-19 13:07:00 +000068 sync.RWMutex // used to serialize access to samplerOptions.sampler
Matteo Scandoloa4285862020-12-01 18:10:10 -080069 samplerOptions
70
71 serviceName string
72 doneChan chan *sync.WaitGroup
73}
74
75// NewRemotelyControlledSampler creates a sampler that periodically pulls
76// the sampling strategy from an HTTP sampling server (e.g. jaeger-agent).
77func NewRemotelyControlledSampler(
78 serviceName string,
79 opts ...SamplerOption,
80) *RemotelyControlledSampler {
81 options := new(samplerOptions).applyOptionsAndDefaults(opts...)
82 sampler := &RemotelyControlledSampler{
83 samplerOptions: *options,
84 serviceName: serviceName,
85 doneChan: make(chan *sync.WaitGroup),
86 }
87 go sampler.pollController()
88 return sampler
89}
90
91// IsSampled implements IsSampled() of Sampler.
92// TODO (breaking change) remove when Sampler V1 is removed
93func (s *RemotelyControlledSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
94 return false, nil
95}
96
97// OnCreateSpan implements OnCreateSpan of SamplerV2.
98func (s *RemotelyControlledSampler) OnCreateSpan(span *Span) SamplingDecision {
David K. Bainbridge06631892021-08-19 13:07:00 +000099 s.RLock()
100 defer s.RUnlock()
Matteo Scandoloa4285862020-12-01 18:10:10 -0800101 return s.sampler.OnCreateSpan(span)
102}
103
104// OnSetOperationName implements OnSetOperationName of SamplerV2.
105func (s *RemotelyControlledSampler) OnSetOperationName(span *Span, operationName string) SamplingDecision {
David K. Bainbridge06631892021-08-19 13:07:00 +0000106 s.RLock()
107 defer s.RUnlock()
Matteo Scandoloa4285862020-12-01 18:10:10 -0800108 return s.sampler.OnSetOperationName(span, operationName)
109}
110
111// OnSetTag implements OnSetTag of SamplerV2.
112func (s *RemotelyControlledSampler) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
David K. Bainbridge06631892021-08-19 13:07:00 +0000113 s.RLock()
114 defer s.RUnlock()
Matteo Scandoloa4285862020-12-01 18:10:10 -0800115 return s.sampler.OnSetTag(span, key, value)
116}
117
118// OnFinishSpan implements OnFinishSpan of SamplerV2.
119func (s *RemotelyControlledSampler) OnFinishSpan(span *Span) SamplingDecision {
David K. Bainbridge06631892021-08-19 13:07:00 +0000120 s.RLock()
121 defer s.RUnlock()
Matteo Scandoloa4285862020-12-01 18:10:10 -0800122 return s.sampler.OnFinishSpan(span)
123}
124
125// Close implements Close() of Sampler.
126func (s *RemotelyControlledSampler) Close() {
127 if swapped := atomic.CompareAndSwapInt64(&s.closed, 0, 1); !swapped {
128 s.logger.Error("Repeated attempt to close the sampler is ignored")
129 return
130 }
131
132 var wg sync.WaitGroup
133 wg.Add(1)
134 s.doneChan <- &wg
135 wg.Wait()
136}
137
138// Equal implements Equal() of Sampler.
139func (s *RemotelyControlledSampler) Equal(other Sampler) bool {
140 // NB The Equal() function is expensive and will be removed. See PerOperationSampler.Equal() for
141 // more information.
142 return false
143}
144
145func (s *RemotelyControlledSampler) pollController() {
146 ticker := time.NewTicker(s.samplingRefreshInterval)
147 defer ticker.Stop()
148 s.pollControllerWithTicker(ticker)
149}
150
151func (s *RemotelyControlledSampler) pollControllerWithTicker(ticker *time.Ticker) {
152 for {
153 select {
154 case <-ticker.C:
155 s.UpdateSampler()
156 case wg := <-s.doneChan:
157 wg.Done()
158 return
159 }
160 }
161}
162
163// Sampler returns the currently active sampler.
164func (s *RemotelyControlledSampler) Sampler() SamplerV2 {
David K. Bainbridge06631892021-08-19 13:07:00 +0000165 s.RLock()
166 defer s.RUnlock()
Matteo Scandoloa4285862020-12-01 18:10:10 -0800167 return s.sampler
168}
Matteo Scandoloa4285862020-12-01 18:10:10 -0800169func (s *RemotelyControlledSampler) setSampler(sampler SamplerV2) {
170 s.Lock()
171 defer s.Unlock()
172 s.sampler = sampler
173}
174
175// UpdateSampler forces the sampler to fetch sampling strategy from backend server.
176// This function is called automatically on a timer, but can also be safely called manually, e.g. from tests.
177func (s *RemotelyControlledSampler) UpdateSampler() {
178 res, err := s.samplingFetcher.Fetch(s.serviceName)
179 if err != nil {
180 s.metrics.SamplerQueryFailure.Inc(1)
181 s.logger.Infof("failed to fetch sampling strategy: %v", err)
182 return
183 }
184 strategy, err := s.samplingParser.Parse(res)
185 if err != nil {
186 s.metrics.SamplerUpdateFailure.Inc(1)
187 s.logger.Infof("failed to parse sampling strategy response: %v", err)
188 return
189 }
190
191 s.Lock()
192 defer s.Unlock()
193
194 s.metrics.SamplerRetrieved.Inc(1)
195 if err := s.updateSamplerViaUpdaters(strategy); err != nil {
196 s.metrics.SamplerUpdateFailure.Inc(1)
197 s.logger.Infof("failed to handle sampling strategy response %+v. Got error: %v", res, err)
198 return
199 }
200 s.metrics.SamplerUpdated.Inc(1)
201}
202
203// NB: this function should only be called while holding a Write lock
204func (s *RemotelyControlledSampler) updateSamplerViaUpdaters(strategy interface{}) error {
205 for _, updater := range s.updaters {
206 sampler, err := updater.Update(s.sampler, strategy)
207 if err != nil {
208 return err
209 }
210 if sampler != nil {
211 s.logger.Debugf("sampler updated: %+v", sampler)
212 s.sampler = sampler
213 return nil
214 }
215 }
216 return fmt.Errorf("unsupported sampling strategy %+v", strategy)
217}
218
219// -----------------------
220
221// ProbabilisticSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
222type ProbabilisticSamplerUpdater struct{}
223
224// Update implements Update of SamplerUpdater.
225func (u *ProbabilisticSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
226 type response interface {
227 GetProbabilisticSampling() *sampling.ProbabilisticSamplingStrategy
228 }
229 var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
230 if resp, ok := strategy.(response); ok {
231 if probabilistic := resp.GetProbabilisticSampling(); probabilistic != nil {
232 if ps, ok := sampler.(*ProbabilisticSampler); ok {
233 if err := ps.Update(probabilistic.SamplingRate); err != nil {
234 return nil, err
235 }
236 return sampler, nil
237 }
238 return newProbabilisticSampler(probabilistic.SamplingRate), nil
239 }
240 }
241 return nil, nil
242}
243
244// -----------------------
245
246// RateLimitingSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
247type RateLimitingSamplerUpdater struct{}
248
249// Update implements Update of SamplerUpdater.
250func (u *RateLimitingSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
251 type response interface {
252 GetRateLimitingSampling() *sampling.RateLimitingSamplingStrategy
253 }
254 var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
255 if resp, ok := strategy.(response); ok {
256 if rateLimiting := resp.GetRateLimitingSampling(); rateLimiting != nil {
257 rateLimit := float64(rateLimiting.MaxTracesPerSecond)
258 if rl, ok := sampler.(*RateLimitingSampler); ok {
259 rl.Update(rateLimit)
260 return rl, nil
261 }
262 return NewRateLimitingSampler(rateLimit), nil
263 }
264 }
265 return nil, nil
266}
267
268// -----------------------
269
270// AdaptiveSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
271// Fields have the same meaning as in PerOperationSamplerParams.
272type AdaptiveSamplerUpdater struct {
273 MaxOperations int
274 OperationNameLateBinding bool
275}
276
277// Update implements Update of SamplerUpdater.
278func (u *AdaptiveSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
279 type response interface {
280 GetOperationSampling() *sampling.PerOperationSamplingStrategies
281 }
282 var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
283 if p, ok := strategy.(response); ok {
284 if operations := p.GetOperationSampling(); operations != nil {
285 if as, ok := sampler.(*PerOperationSampler); ok {
286 as.update(operations)
287 return as, nil
288 }
289 return NewPerOperationSampler(PerOperationSamplerParams{
290 MaxOperations: u.MaxOperations,
291 OperationNameLateBinding: u.OperationNameLateBinding,
292 Strategies: operations,
293 }), nil
294 }
295 }
296 return nil, nil
297}
298
299// -----------------------
300
301type httpSamplingStrategyFetcher struct {
David K. Bainbridge06631892021-08-19 13:07:00 +0000302 serverURL string
303 logger log.DebugLogger
304 httpClient http.Client
305}
306
307func newHTTPSamplingStrategyFetcher(serverURL string, logger log.DebugLogger) *httpSamplingStrategyFetcher {
308 customTransport := http.DefaultTransport.(*http.Transport).Clone()
309 customTransport.ResponseHeaderTimeout = defaultRemoteSamplingTimeout
310
311 return &httpSamplingStrategyFetcher{
312 serverURL: serverURL,
313 logger: logger,
314 httpClient: http.Client{
315 Transport: customTransport,
316 },
317 }
Matteo Scandoloa4285862020-12-01 18:10:10 -0800318}
319
320func (f *httpSamplingStrategyFetcher) Fetch(serviceName string) ([]byte, error) {
321 v := url.Values{}
322 v.Set("service", serviceName)
323 uri := f.serverURL + "?" + v.Encode()
324
David K. Bainbridge06631892021-08-19 13:07:00 +0000325 resp, err := f.httpClient.Get(uri)
Matteo Scandoloa4285862020-12-01 18:10:10 -0800326 if err != nil {
327 return nil, err
328 }
329
330 defer func() {
331 if err := resp.Body.Close(); err != nil {
332 f.logger.Error(fmt.Sprintf("failed to close HTTP response body: %+v", err))
333 }
334 }()
335
336 body, err := ioutil.ReadAll(resp.Body)
337 if err != nil {
338 return nil, err
339 }
340
341 if resp.StatusCode >= 400 {
342 return nil, fmt.Errorf("StatusCode: %d, Body: %s", resp.StatusCode, body)
343 }
344
345 return body, nil
346}
347
348// -----------------------
349
350type samplingStrategyParser struct{}
351
352func (p *samplingStrategyParser) Parse(response []byte) (interface{}, error) {
353 strategy := new(sampling.SamplingStrategyResponse)
354 if err := json.Unmarshal(response, strategy); err != nil {
355 return nil, err
356 }
357 return strategy, nil
358}