Naveen Sampath | 04696f7 | 2022-06-13 15:19:14 +0530 | [diff] [blame] | 1 | // Copyright (c) 2017 Uber Technologies, Inc. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package jaeger |
| 16 | |
| 17 | import ( |
| 18 | "encoding/json" |
| 19 | "fmt" |
| 20 | "io/ioutil" |
| 21 | "net/http" |
| 22 | "net/url" |
| 23 | "sync" |
| 24 | "sync/atomic" |
| 25 | "time" |
| 26 | |
| 27 | "github.com/uber/jaeger-client-go/log" |
| 28 | "github.com/uber/jaeger-client-go/thrift-gen/sampling" |
| 29 | ) |
| 30 | |
// Defaults used by the remotely controlled sampler.
const (
	// defaultRemoteSamplingTimeout is applied as the response-header timeout
	// of the HTTP transport built for the sampling strategy fetcher.
	defaultRemoteSamplingTimeout = time.Second * 10
	// defaultSamplingRefreshInterval is presumably the polling period used
	// when no refresh interval is configured — confirm against samplerOptions.
	defaultSamplingRefreshInterval = 1 * time.Minute
)
| 35 | |
// SamplingStrategyFetcher retrieves sampling strategy updates from a remote
// server.
type SamplingStrategyFetcher interface {
	// Fetch returns the raw, still-encoded strategy response for the given
	// service name, or an error when it could not be retrieved.
	Fetch(service string) ([]byte, error)
}
| 40 | |
// SamplingStrategyParser decodes sampling strategy updates. The value it
// produces should be of a type the configured SamplerUpdaters recognize.
type SamplingStrategyParser interface {
	// Parse converts a raw strategy response into a strategy object, or
	// returns an error when the response cannot be decoded.
	Parse(response []byte) (interface{}, error)
}
| 46 | |
| 47 | // SamplerUpdater is used by RemotelyControlledSampler to apply sampling strategies, |
| 48 | // retrieved from remote config server, to the current sampler. The updater can modify |
| 49 | // the sampler in-place if sampler supports it, or create a new one. |
| 50 | // |
| 51 | // If the strategy does not contain configuration for the sampler in question, |
| 52 | // updater must return modifiedSampler=nil to give other updaters a chance to inspect |
| 53 | // the sampling strategy response. |
| 54 | // |
| 55 | // RemotelyControlledSampler invokes the updaters while holding a lock on the main sampler. |
| 56 | type SamplerUpdater interface { |
| 57 | Update(sampler SamplerV2, strategy interface{}) (modified SamplerV2, err error) |
| 58 | } |
| 59 | |
| 60 | // RemotelyControlledSampler is a delegating sampler that polls a remote server |
| 61 | // for the appropriate sampling strategy, constructs a corresponding sampler and |
| 62 | // delegates to it for sampling decisions. |
| 63 | type RemotelyControlledSampler struct { |
| 64 | // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment. |
| 65 | // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq |
| 66 | closed int64 // 0 - not closed, 1 - closed |
| 67 | |
| 68 | sync.RWMutex // used to serialize access to samplerOptions.sampler |
| 69 | samplerOptions |
| 70 | |
| 71 | serviceName string |
| 72 | doneChan chan *sync.WaitGroup |
| 73 | } |
| 74 | |
| 75 | // NewRemotelyControlledSampler creates a sampler that periodically pulls |
| 76 | // the sampling strategy from an HTTP sampling server (e.g. jaeger-agent). |
| 77 | func NewRemotelyControlledSampler( |
| 78 | serviceName string, |
| 79 | opts ...SamplerOption, |
| 80 | ) *RemotelyControlledSampler { |
| 81 | options := new(samplerOptions).applyOptionsAndDefaults(opts...) |
| 82 | sampler := &RemotelyControlledSampler{ |
| 83 | samplerOptions: *options, |
| 84 | serviceName: serviceName, |
| 85 | doneChan: make(chan *sync.WaitGroup), |
| 86 | } |
| 87 | go sampler.pollController() |
| 88 | return sampler |
| 89 | } |
| 90 | |
| 91 | // IsSampled implements IsSampled() of Sampler. |
| 92 | // TODO (breaking change) remove when Sampler V1 is removed |
| 93 | func (s *RemotelyControlledSampler) IsSampled(id TraceID, operation string) (bool, []Tag) { |
| 94 | return false, nil |
| 95 | } |
| 96 | |
| 97 | // OnCreateSpan implements OnCreateSpan of SamplerV2. |
| 98 | func (s *RemotelyControlledSampler) OnCreateSpan(span *Span) SamplingDecision { |
| 99 | s.RLock() |
| 100 | defer s.RUnlock() |
| 101 | return s.sampler.OnCreateSpan(span) |
| 102 | } |
| 103 | |
| 104 | // OnSetOperationName implements OnSetOperationName of SamplerV2. |
| 105 | func (s *RemotelyControlledSampler) OnSetOperationName(span *Span, operationName string) SamplingDecision { |
| 106 | s.RLock() |
| 107 | defer s.RUnlock() |
| 108 | return s.sampler.OnSetOperationName(span, operationName) |
| 109 | } |
| 110 | |
| 111 | // OnSetTag implements OnSetTag of SamplerV2. |
| 112 | func (s *RemotelyControlledSampler) OnSetTag(span *Span, key string, value interface{}) SamplingDecision { |
| 113 | s.RLock() |
| 114 | defer s.RUnlock() |
| 115 | return s.sampler.OnSetTag(span, key, value) |
| 116 | } |
| 117 | |
| 118 | // OnFinishSpan implements OnFinishSpan of SamplerV2. |
| 119 | func (s *RemotelyControlledSampler) OnFinishSpan(span *Span) SamplingDecision { |
| 120 | s.RLock() |
| 121 | defer s.RUnlock() |
| 122 | return s.sampler.OnFinishSpan(span) |
| 123 | } |
| 124 | |
| 125 | // Close implements Close() of Sampler. |
| 126 | func (s *RemotelyControlledSampler) Close() { |
| 127 | if swapped := atomic.CompareAndSwapInt64(&s.closed, 0, 1); !swapped { |
| 128 | s.logger.Error("Repeated attempt to close the sampler is ignored") |
| 129 | return |
| 130 | } |
| 131 | |
| 132 | var wg sync.WaitGroup |
| 133 | wg.Add(1) |
| 134 | s.doneChan <- &wg |
| 135 | wg.Wait() |
| 136 | } |
| 137 | |
| 138 | // Equal implements Equal() of Sampler. |
| 139 | func (s *RemotelyControlledSampler) Equal(other Sampler) bool { |
| 140 | // NB The Equal() function is expensive and will be removed. See PerOperationSampler.Equal() for |
| 141 | // more information. |
| 142 | return false |
| 143 | } |
| 144 | |
| 145 | func (s *RemotelyControlledSampler) pollController() { |
| 146 | ticker := time.NewTicker(s.samplingRefreshInterval) |
| 147 | defer ticker.Stop() |
| 148 | s.pollControllerWithTicker(ticker) |
| 149 | } |
| 150 | |
| 151 | func (s *RemotelyControlledSampler) pollControllerWithTicker(ticker *time.Ticker) { |
| 152 | for { |
| 153 | select { |
| 154 | case <-ticker.C: |
| 155 | s.UpdateSampler() |
| 156 | case wg := <-s.doneChan: |
| 157 | wg.Done() |
| 158 | return |
| 159 | } |
| 160 | } |
| 161 | } |
| 162 | |
| 163 | // Sampler returns the currently active sampler. |
| 164 | func (s *RemotelyControlledSampler) Sampler() SamplerV2 { |
| 165 | s.RLock() |
| 166 | defer s.RUnlock() |
| 167 | return s.sampler |
| 168 | } |
| 169 | func (s *RemotelyControlledSampler) setSampler(sampler SamplerV2) { |
| 170 | s.Lock() |
| 171 | defer s.Unlock() |
| 172 | s.sampler = sampler |
| 173 | } |
| 174 | |
| 175 | // UpdateSampler forces the sampler to fetch sampling strategy from backend server. |
| 176 | // This function is called automatically on a timer, but can also be safely called manually, e.g. from tests. |
| 177 | func (s *RemotelyControlledSampler) UpdateSampler() { |
| 178 | res, err := s.samplingFetcher.Fetch(s.serviceName) |
| 179 | if err != nil { |
| 180 | s.metrics.SamplerQueryFailure.Inc(1) |
| 181 | s.logger.Infof("failed to fetch sampling strategy: %v", err) |
| 182 | return |
| 183 | } |
| 184 | strategy, err := s.samplingParser.Parse(res) |
| 185 | if err != nil { |
| 186 | s.metrics.SamplerUpdateFailure.Inc(1) |
| 187 | s.logger.Infof("failed to parse sampling strategy response: %v", err) |
| 188 | return |
| 189 | } |
| 190 | |
| 191 | s.Lock() |
| 192 | defer s.Unlock() |
| 193 | |
| 194 | s.metrics.SamplerRetrieved.Inc(1) |
| 195 | if err := s.updateSamplerViaUpdaters(strategy); err != nil { |
| 196 | s.metrics.SamplerUpdateFailure.Inc(1) |
| 197 | s.logger.Infof("failed to handle sampling strategy response %+v. Got error: %v", res, err) |
| 198 | return |
| 199 | } |
| 200 | s.metrics.SamplerUpdated.Inc(1) |
| 201 | } |
| 202 | |
| 203 | // NB: this function should only be called while holding a Write lock |
| 204 | func (s *RemotelyControlledSampler) updateSamplerViaUpdaters(strategy interface{}) error { |
| 205 | for _, updater := range s.updaters { |
| 206 | sampler, err := updater.Update(s.sampler, strategy) |
| 207 | if err != nil { |
| 208 | return err |
| 209 | } |
| 210 | if sampler != nil { |
| 211 | s.logger.Debugf("sampler updated: %+v", sampler) |
| 212 | s.sampler = sampler |
| 213 | return nil |
| 214 | } |
| 215 | } |
| 216 | return fmt.Errorf("unsupported sampling strategy %+v", strategy) |
| 217 | } |
| 218 | |
| 219 | // ----------------------- |
| 220 | |
| 221 | // ProbabilisticSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration. |
| 222 | type ProbabilisticSamplerUpdater struct{} |
| 223 | |
| 224 | // Update implements Update of SamplerUpdater. |
| 225 | func (u *ProbabilisticSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) { |
| 226 | type response interface { |
| 227 | GetProbabilisticSampling() *sampling.ProbabilisticSamplingStrategy |
| 228 | } |
| 229 | var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check |
| 230 | if resp, ok := strategy.(response); ok { |
| 231 | if probabilistic := resp.GetProbabilisticSampling(); probabilistic != nil { |
| 232 | if ps, ok := sampler.(*ProbabilisticSampler); ok { |
| 233 | if err := ps.Update(probabilistic.SamplingRate); err != nil { |
| 234 | return nil, err |
| 235 | } |
| 236 | return sampler, nil |
| 237 | } |
| 238 | return newProbabilisticSampler(probabilistic.SamplingRate), nil |
| 239 | } |
| 240 | } |
| 241 | return nil, nil |
| 242 | } |
| 243 | |
| 244 | // ----------------------- |
| 245 | |
| 246 | // RateLimitingSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration. |
| 247 | type RateLimitingSamplerUpdater struct{} |
| 248 | |
| 249 | // Update implements Update of SamplerUpdater. |
| 250 | func (u *RateLimitingSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) { |
| 251 | type response interface { |
| 252 | GetRateLimitingSampling() *sampling.RateLimitingSamplingStrategy |
| 253 | } |
| 254 | var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check |
| 255 | if resp, ok := strategy.(response); ok { |
| 256 | if rateLimiting := resp.GetRateLimitingSampling(); rateLimiting != nil { |
| 257 | rateLimit := float64(rateLimiting.MaxTracesPerSecond) |
| 258 | if rl, ok := sampler.(*RateLimitingSampler); ok { |
| 259 | rl.Update(rateLimit) |
| 260 | return rl, nil |
| 261 | } |
| 262 | return NewRateLimitingSampler(rateLimit), nil |
| 263 | } |
| 264 | } |
| 265 | return nil, nil |
| 266 | } |
| 267 | |
| 268 | // ----------------------- |
| 269 | |
| 270 | // AdaptiveSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration. |
| 271 | // Fields have the same meaning as in PerOperationSamplerParams. |
| 272 | type AdaptiveSamplerUpdater struct { |
| 273 | MaxOperations int |
| 274 | OperationNameLateBinding bool |
| 275 | } |
| 276 | |
| 277 | // Update implements Update of SamplerUpdater. |
| 278 | func (u *AdaptiveSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) { |
| 279 | type response interface { |
| 280 | GetOperationSampling() *sampling.PerOperationSamplingStrategies |
| 281 | } |
| 282 | var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check |
| 283 | if p, ok := strategy.(response); ok { |
| 284 | if operations := p.GetOperationSampling(); operations != nil { |
| 285 | if as, ok := sampler.(*PerOperationSampler); ok { |
| 286 | as.update(operations) |
| 287 | return as, nil |
| 288 | } |
| 289 | return NewPerOperationSampler(PerOperationSamplerParams{ |
| 290 | MaxOperations: u.MaxOperations, |
| 291 | OperationNameLateBinding: u.OperationNameLateBinding, |
| 292 | Strategies: operations, |
| 293 | }), nil |
| 294 | } |
| 295 | } |
| 296 | return nil, nil |
| 297 | } |
| 298 | |
| 299 | // ----------------------- |
| 300 | |
| 301 | type httpSamplingStrategyFetcher struct { |
| 302 | serverURL string |
| 303 | logger log.DebugLogger |
| 304 | httpClient http.Client |
| 305 | } |
| 306 | |
| 307 | func newHTTPSamplingStrategyFetcher(serverURL string, logger log.DebugLogger) *httpSamplingStrategyFetcher { |
| 308 | customTransport := http.DefaultTransport.(*http.Transport).Clone() |
| 309 | customTransport.ResponseHeaderTimeout = defaultRemoteSamplingTimeout |
| 310 | |
| 311 | return &httpSamplingStrategyFetcher{ |
| 312 | serverURL: serverURL, |
| 313 | logger: logger, |
| 314 | httpClient: http.Client{ |
| 315 | Transport: customTransport, |
| 316 | }, |
| 317 | } |
| 318 | } |
| 319 | |
| 320 | func (f *httpSamplingStrategyFetcher) Fetch(serviceName string) ([]byte, error) { |
| 321 | v := url.Values{} |
| 322 | v.Set("service", serviceName) |
| 323 | uri := f.serverURL + "?" + v.Encode() |
| 324 | |
| 325 | resp, err := f.httpClient.Get(uri) |
| 326 | if err != nil { |
| 327 | return nil, err |
| 328 | } |
| 329 | |
| 330 | defer func() { |
| 331 | if err := resp.Body.Close(); err != nil { |
| 332 | f.logger.Error(fmt.Sprintf("failed to close HTTP response body: %+v", err)) |
| 333 | } |
| 334 | }() |
| 335 | |
| 336 | body, err := ioutil.ReadAll(resp.Body) |
| 337 | if err != nil { |
| 338 | return nil, err |
| 339 | } |
| 340 | |
| 341 | if resp.StatusCode >= 400 { |
| 342 | return nil, fmt.Errorf("StatusCode: %d, Body: %s", resp.StatusCode, body) |
| 343 | } |
| 344 | |
| 345 | return body, nil |
| 346 | } |
| 347 | |
| 348 | // ----------------------- |
| 349 | |
| 350 | type samplingStrategyParser struct{} |
| 351 | |
| 352 | func (p *samplingStrategyParser) Parse(response []byte) (interface{}, error) { |
| 353 | strategy := new(sampling.SamplingStrategyResponse) |
| 354 | if err := json.Unmarshal(response, strategy); err != nil { |
| 355 | return nil, err |
| 356 | } |
| 357 | return strategy, nil |
| 358 | } |