blob: 112e3e1cb3ee660463671eab0e8a86153b10a4d1 [file] [log] [blame]
Rohan Agrawalc32d9932020-06-15 11:01:47 +00001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18 "encoding/json"
19 "fmt"
20 "io/ioutil"
21 "net/http"
22 "net/url"
23 "sync"
24 "sync/atomic"
25 "time"
26
27 "github.com/uber/jaeger-client-go/log"
28 "github.com/uber/jaeger-client-go/thrift-gen/sampling"
29)
30
// defaultSamplingRefreshInterval is one minute. Judging by its name it is the
// fallback polling period used when no refresh interval is configured; the
// code that applies it is not in this file.
const defaultSamplingRefreshInterval = time.Minute
34
// SamplingStrategyFetcher retrieves sampling strategy updates from a remote server.
//
// Fetch is given the name of the service whose strategy is wanted and returns
// the raw, still-encoded response, or an error if it could not be obtained.
type SamplingStrategyFetcher interface {
	Fetch(service string) (response []byte, err error)
}
39
// SamplingStrategyParser decodes sampling strategy updates.
//
// Parse receives the raw response bytes and returns the decoded strategy. The
// returned object should be of a type that the SamplerUpdaters recognize.
type SamplingStrategyParser interface {
	Parse(response []byte) (strategy interface{}, err error)
}
45
46// SamplerUpdater is used by RemotelyControlledSampler to apply sampling strategies,
47// retrieved from remote config server, to the current sampler. The updater can modify
48// the sampler in-place if sampler supports it, or create a new one.
49//
50// If the strategy does not contain configuration for the sampler in question,
51// updater must return modifiedSampler=nil to give other updaters a chance to inspect
52// the sampling strategy response.
53//
54// RemotelyControlledSampler invokes the updaters while holding a lock on the main sampler.
55type SamplerUpdater interface {
56 Update(sampler SamplerV2, strategy interface{}) (modified SamplerV2, err error)
57}
58
59// RemotelyControlledSampler is a delegating sampler that polls a remote server
60// for the appropriate sampling strategy, constructs a corresponding sampler and
61// delegates to it for sampling decisions.
62type RemotelyControlledSampler struct {
63 // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
64 // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
65 closed int64 // 0 - not closed, 1 - closed
66
67 sync.RWMutex
68 samplerOptions
69
70 serviceName string
71 doneChan chan *sync.WaitGroup
72}
73
74// NewRemotelyControlledSampler creates a sampler that periodically pulls
75// the sampling strategy from an HTTP sampling server (e.g. jaeger-agent).
76func NewRemotelyControlledSampler(
77 serviceName string,
78 opts ...SamplerOption,
79) *RemotelyControlledSampler {
80 options := new(samplerOptions).applyOptionsAndDefaults(opts...)
81 sampler := &RemotelyControlledSampler{
82 samplerOptions: *options,
83 serviceName: serviceName,
84 doneChan: make(chan *sync.WaitGroup),
85 }
86 go sampler.pollController()
87 return sampler
88}
89
90// IsSampled implements IsSampled() of Sampler.
91// TODO (breaking change) remove when Sampler V1 is removed
92func (s *RemotelyControlledSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
93 return false, nil
94}
95
96// OnCreateSpan implements OnCreateSpan of SamplerV2.
97func (s *RemotelyControlledSampler) OnCreateSpan(span *Span) SamplingDecision {
98 return s.sampler.OnCreateSpan(span)
99}
100
101// OnSetOperationName implements OnSetOperationName of SamplerV2.
102func (s *RemotelyControlledSampler) OnSetOperationName(span *Span, operationName string) SamplingDecision {
103 return s.sampler.OnSetOperationName(span, operationName)
104}
105
106// OnSetTag implements OnSetTag of SamplerV2.
107func (s *RemotelyControlledSampler) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
108 return s.sampler.OnSetTag(span, key, value)
109}
110
111// OnFinishSpan implements OnFinishSpan of SamplerV2.
112func (s *RemotelyControlledSampler) OnFinishSpan(span *Span) SamplingDecision {
113 return s.sampler.OnFinishSpan(span)
114}
115
116// Close implements Close() of Sampler.
117func (s *RemotelyControlledSampler) Close() {
118 if swapped := atomic.CompareAndSwapInt64(&s.closed, 0, 1); !swapped {
119 s.logger.Error("Repeated attempt to close the sampler is ignored")
120 return
121 }
122
123 var wg sync.WaitGroup
124 wg.Add(1)
125 s.doneChan <- &wg
126 wg.Wait()
127}
128
129// Equal implements Equal() of Sampler.
130func (s *RemotelyControlledSampler) Equal(other Sampler) bool {
131 // NB The Equal() function is expensive and will be removed. See PerOperationSampler.Equal() for
132 // more information.
133 return false
134}
135
136func (s *RemotelyControlledSampler) pollController() {
137 ticker := time.NewTicker(s.samplingRefreshInterval)
138 defer ticker.Stop()
139 s.pollControllerWithTicker(ticker)
140}
141
142func (s *RemotelyControlledSampler) pollControllerWithTicker(ticker *time.Ticker) {
143 for {
144 select {
145 case <-ticker.C:
146 s.UpdateSampler()
147 case wg := <-s.doneChan:
148 wg.Done()
149 return
150 }
151 }
152}
153
154// Sampler returns the currently active sampler.
155func (s *RemotelyControlledSampler) Sampler() SamplerV2 {
156 s.Lock()
157 defer s.Unlock()
158 return s.sampler
159}
160
161func (s *RemotelyControlledSampler) setSampler(sampler SamplerV2) {
162 s.Lock()
163 defer s.Unlock()
164 s.sampler = sampler
165}
166
167// UpdateSampler forces the sampler to fetch sampling strategy from backend server.
168// This function is called automatically on a timer, but can also be safely called manually, e.g. from tests.
169func (s *RemotelyControlledSampler) UpdateSampler() {
170 res, err := s.samplingFetcher.Fetch(s.serviceName)
171 if err != nil {
172 s.metrics.SamplerQueryFailure.Inc(1)
173 s.logger.Infof("failed to fetch sampling strategy: %v", err)
174 return
175 }
176 strategy, err := s.samplingParser.Parse(res)
177 if err != nil {
178 s.metrics.SamplerUpdateFailure.Inc(1)
179 s.logger.Infof("failed to parse sampling strategy response: %v", err)
180 return
181 }
182
183 s.Lock()
184 defer s.Unlock()
185
186 s.metrics.SamplerRetrieved.Inc(1)
187 if err := s.updateSamplerViaUpdaters(strategy); err != nil {
188 s.metrics.SamplerUpdateFailure.Inc(1)
189 s.logger.Infof("failed to handle sampling strategy response %+v. Got error: %v", res, err)
190 return
191 }
192 s.metrics.SamplerUpdated.Inc(1)
193}
194
195// NB: this function should only be called while holding a Write lock
196func (s *RemotelyControlledSampler) updateSamplerViaUpdaters(strategy interface{}) error {
197 for _, updater := range s.updaters {
198 sampler, err := updater.Update(s.sampler, strategy)
199 if err != nil {
200 return err
201 }
202 if sampler != nil {
203 s.logger.Debugf("sampler updated: %+v", sampler)
204 s.sampler = sampler
205 return nil
206 }
207 }
208 return fmt.Errorf("unsupported sampling strategy %+v", strategy)
209}
210
211// -----------------------
212
213// ProbabilisticSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
214type ProbabilisticSamplerUpdater struct{}
215
216// Update implements Update of SamplerUpdater.
217func (u *ProbabilisticSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
218 type response interface {
219 GetProbabilisticSampling() *sampling.ProbabilisticSamplingStrategy
220 }
221 var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
222 if resp, ok := strategy.(response); ok {
223 if probabilistic := resp.GetProbabilisticSampling(); probabilistic != nil {
224 if ps, ok := sampler.(*ProbabilisticSampler); ok {
225 if err := ps.Update(probabilistic.SamplingRate); err != nil {
226 return nil, err
227 }
228 return sampler, nil
229 }
230 return newProbabilisticSampler(probabilistic.SamplingRate), nil
231 }
232 }
233 return nil, nil
234}
235
236// -----------------------
237
238// RateLimitingSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
239type RateLimitingSamplerUpdater struct{}
240
241// Update implements Update of SamplerUpdater.
242func (u *RateLimitingSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
243 type response interface {
244 GetRateLimitingSampling() *sampling.RateLimitingSamplingStrategy
245 }
246 var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
247 if resp, ok := strategy.(response); ok {
248 if rateLimiting := resp.GetRateLimitingSampling(); rateLimiting != nil {
249 rateLimit := float64(rateLimiting.MaxTracesPerSecond)
250 if rl, ok := sampler.(*RateLimitingSampler); ok {
251 rl.Update(rateLimit)
252 return rl, nil
253 }
254 return NewRateLimitingSampler(rateLimit), nil
255 }
256 }
257 return nil, nil
258}
259
260// -----------------------
261
262// AdaptiveSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
263// Fields have the same meaning as in PerOperationSamplerParams.
264type AdaptiveSamplerUpdater struct {
265 MaxOperations int
266 OperationNameLateBinding bool
267}
268
269// Update implements Update of SamplerUpdater.
270func (u *AdaptiveSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
271 type response interface {
272 GetOperationSampling() *sampling.PerOperationSamplingStrategies
273 }
274 var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
275 if p, ok := strategy.(response); ok {
276 if operations := p.GetOperationSampling(); operations != nil {
277 if as, ok := sampler.(*PerOperationSampler); ok {
278 as.update(operations)
279 return as, nil
280 }
281 return NewPerOperationSampler(PerOperationSamplerParams{
282 MaxOperations: u.MaxOperations,
283 OperationNameLateBinding: u.OperationNameLateBinding,
284 Strategies: operations,
285 }), nil
286 }
287 }
288 return nil, nil
289}
290
291// -----------------------
292
293type httpSamplingStrategyFetcher struct {
294 serverURL string
295 logger log.DebugLogger
296}
297
298func (f *httpSamplingStrategyFetcher) Fetch(serviceName string) ([]byte, error) {
299 v := url.Values{}
300 v.Set("service", serviceName)
301 uri := f.serverURL + "?" + v.Encode()
302
303 // TODO create and reuse http.Client with proper timeout settings, etc.
304 resp, err := http.Get(uri)
305 if err != nil {
306 return nil, err
307 }
308
309 defer func() {
310 if err := resp.Body.Close(); err != nil {
311 f.logger.Error(fmt.Sprintf("failed to close HTTP response body: %+v", err))
312 }
313 }()
314
315 body, err := ioutil.ReadAll(resp.Body)
316 if err != nil {
317 return nil, err
318 }
319
320 if resp.StatusCode >= 400 {
321 return nil, fmt.Errorf("StatusCode: %d, Body: %s", resp.StatusCode, body)
322 }
323
324 return body, nil
325}
326
327// -----------------------
328
329type samplingStrategyParser struct{}
330
331func (p *samplingStrategyParser) Parse(response []byte) (interface{}, error) {
332 strategy := new(sampling.SamplingStrategyResponse)
333 if err := json.Unmarshal(response, strategy); err != nil {
334 return nil, err
335 }
336 return strategy, nil
337}