Girish Kumar | e48ec8a | 2020-08-18 12:28:51 +0000 | [diff] [blame] | 1 | // Copyright (c) 2017 Uber Technologies, Inc. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package jaeger |
| 16 | |
| 17 | import ( |
| 18 | "encoding/json" |
| 19 | "fmt" |
| 20 | "io/ioutil" |
| 21 | "net/http" |
| 22 | "net/url" |
| 23 | "sync" |
| 24 | "sync/atomic" |
| 25 | "time" |
| 26 | |
| 27 | "github.com/uber/jaeger-client-go/log" |
| 28 | "github.com/uber/jaeger-client-go/thrift-gen/sampling" |
| 29 | ) |
| 30 | |
| 31 | const ( |
| 32 | defaultSamplingRefreshInterval = time.Minute |
| 33 | ) |
| 34 | |
| 35 | // SamplingStrategyFetcher is used to fetch sampling strategy updates from remote server. |
| 36 | type SamplingStrategyFetcher interface { |
| 37 | Fetch(service string) ([]byte, error) |
| 38 | } |
| 39 | |
| 40 | // SamplingStrategyParser is used to parse sampling strategy updates. The output object |
| 41 | // should be of the type that is recognized by the SamplerUpdaters. |
| 42 | type SamplingStrategyParser interface { |
| 43 | Parse(response []byte) (interface{}, error) |
| 44 | } |
| 45 | |
| 46 | // SamplerUpdater is used by RemotelyControlledSampler to apply sampling strategies, |
| 47 | // retrieved from remote config server, to the current sampler. The updater can modify |
| 48 | // the sampler in-place if sampler supports it, or create a new one. |
| 49 | // |
| 50 | // If the strategy does not contain configuration for the sampler in question, |
| 51 | // updater must return modifiedSampler=nil to give other updaters a chance to inspect |
| 52 | // the sampling strategy response. |
| 53 | // |
| 54 | // RemotelyControlledSampler invokes the updaters while holding a lock on the main sampler. |
| 55 | type SamplerUpdater interface { |
| 56 | Update(sampler SamplerV2, strategy interface{}) (modified SamplerV2, err error) |
| 57 | } |
| 58 | |
| 59 | // RemotelyControlledSampler is a delegating sampler that polls a remote server |
| 60 | // for the appropriate sampling strategy, constructs a corresponding sampler and |
| 61 | // delegates to it for sampling decisions. |
| 62 | type RemotelyControlledSampler struct { |
| 63 | // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment. |
| 64 | // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq |
| 65 | closed int64 // 0 - not closed, 1 - closed |
| 66 | |
| 67 | sync.RWMutex |
| 68 | samplerOptions |
| 69 | |
| 70 | serviceName string |
| 71 | doneChan chan *sync.WaitGroup |
| 72 | } |
| 73 | |
| 74 | // NewRemotelyControlledSampler creates a sampler that periodically pulls |
| 75 | // the sampling strategy from an HTTP sampling server (e.g. jaeger-agent). |
| 76 | func NewRemotelyControlledSampler( |
| 77 | serviceName string, |
| 78 | opts ...SamplerOption, |
| 79 | ) *RemotelyControlledSampler { |
| 80 | options := new(samplerOptions).applyOptionsAndDefaults(opts...) |
| 81 | sampler := &RemotelyControlledSampler{ |
| 82 | samplerOptions: *options, |
| 83 | serviceName: serviceName, |
| 84 | doneChan: make(chan *sync.WaitGroup), |
| 85 | } |
| 86 | go sampler.pollController() |
| 87 | return sampler |
| 88 | } |
| 89 | |
| 90 | // IsSampled implements IsSampled() of Sampler. |
| 91 | // TODO (breaking change) remove when Sampler V1 is removed |
| 92 | func (s *RemotelyControlledSampler) IsSampled(id TraceID, operation string) (bool, []Tag) { |
| 93 | return false, nil |
| 94 | } |
| 95 | |
| 96 | // OnCreateSpan implements OnCreateSpan of SamplerV2. |
| 97 | func (s *RemotelyControlledSampler) OnCreateSpan(span *Span) SamplingDecision { |
| 98 | return s.sampler.OnCreateSpan(span) |
| 99 | } |
| 100 | |
| 101 | // OnSetOperationName implements OnSetOperationName of SamplerV2. |
| 102 | func (s *RemotelyControlledSampler) OnSetOperationName(span *Span, operationName string) SamplingDecision { |
| 103 | return s.sampler.OnSetOperationName(span, operationName) |
| 104 | } |
| 105 | |
| 106 | // OnSetTag implements OnSetTag of SamplerV2. |
| 107 | func (s *RemotelyControlledSampler) OnSetTag(span *Span, key string, value interface{}) SamplingDecision { |
| 108 | return s.sampler.OnSetTag(span, key, value) |
| 109 | } |
| 110 | |
| 111 | // OnFinishSpan implements OnFinishSpan of SamplerV2. |
| 112 | func (s *RemotelyControlledSampler) OnFinishSpan(span *Span) SamplingDecision { |
| 113 | return s.sampler.OnFinishSpan(span) |
| 114 | } |
| 115 | |
| 116 | // Close implements Close() of Sampler. |
| 117 | func (s *RemotelyControlledSampler) Close() { |
| 118 | if swapped := atomic.CompareAndSwapInt64(&s.closed, 0, 1); !swapped { |
| 119 | s.logger.Error("Repeated attempt to close the sampler is ignored") |
| 120 | return |
| 121 | } |
| 122 | |
| 123 | var wg sync.WaitGroup |
| 124 | wg.Add(1) |
| 125 | s.doneChan <- &wg |
| 126 | wg.Wait() |
| 127 | } |
| 128 | |
| 129 | // Equal implements Equal() of Sampler. |
| 130 | func (s *RemotelyControlledSampler) Equal(other Sampler) bool { |
| 131 | // NB The Equal() function is expensive and will be removed. See PerOperationSampler.Equal() for |
| 132 | // more information. |
| 133 | return false |
| 134 | } |
| 135 | |
| 136 | func (s *RemotelyControlledSampler) pollController() { |
| 137 | ticker := time.NewTicker(s.samplingRefreshInterval) |
| 138 | defer ticker.Stop() |
| 139 | s.pollControllerWithTicker(ticker) |
| 140 | } |
| 141 | |
| 142 | func (s *RemotelyControlledSampler) pollControllerWithTicker(ticker *time.Ticker) { |
| 143 | for { |
| 144 | select { |
| 145 | case <-ticker.C: |
| 146 | s.UpdateSampler() |
| 147 | case wg := <-s.doneChan: |
| 148 | wg.Done() |
| 149 | return |
| 150 | } |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | // Sampler returns the currently active sampler. |
| 155 | func (s *RemotelyControlledSampler) Sampler() SamplerV2 { |
| 156 | s.Lock() |
| 157 | defer s.Unlock() |
| 158 | return s.sampler |
| 159 | } |
| 160 | |
| 161 | func (s *RemotelyControlledSampler) setSampler(sampler SamplerV2) { |
| 162 | s.Lock() |
| 163 | defer s.Unlock() |
| 164 | s.sampler = sampler |
| 165 | } |
| 166 | |
| 167 | // UpdateSampler forces the sampler to fetch sampling strategy from backend server. |
| 168 | // This function is called automatically on a timer, but can also be safely called manually, e.g. from tests. |
| 169 | func (s *RemotelyControlledSampler) UpdateSampler() { |
| 170 | res, err := s.samplingFetcher.Fetch(s.serviceName) |
| 171 | if err != nil { |
| 172 | s.metrics.SamplerQueryFailure.Inc(1) |
| 173 | s.logger.Infof("failed to fetch sampling strategy: %v", err) |
| 174 | return |
| 175 | } |
| 176 | strategy, err := s.samplingParser.Parse(res) |
| 177 | if err != nil { |
| 178 | s.metrics.SamplerUpdateFailure.Inc(1) |
| 179 | s.logger.Infof("failed to parse sampling strategy response: %v", err) |
| 180 | return |
| 181 | } |
| 182 | |
| 183 | s.Lock() |
| 184 | defer s.Unlock() |
| 185 | |
| 186 | s.metrics.SamplerRetrieved.Inc(1) |
| 187 | if err := s.updateSamplerViaUpdaters(strategy); err != nil { |
| 188 | s.metrics.SamplerUpdateFailure.Inc(1) |
| 189 | s.logger.Infof("failed to handle sampling strategy response %+v. Got error: %v", res, err) |
| 190 | return |
| 191 | } |
| 192 | s.metrics.SamplerUpdated.Inc(1) |
| 193 | } |
| 194 | |
| 195 | // NB: this function should only be called while holding a Write lock |
| 196 | func (s *RemotelyControlledSampler) updateSamplerViaUpdaters(strategy interface{}) error { |
| 197 | for _, updater := range s.updaters { |
| 198 | sampler, err := updater.Update(s.sampler, strategy) |
| 199 | if err != nil { |
| 200 | return err |
| 201 | } |
| 202 | if sampler != nil { |
| 203 | s.logger.Debugf("sampler updated: %+v", sampler) |
| 204 | s.sampler = sampler |
| 205 | return nil |
| 206 | } |
| 207 | } |
| 208 | return fmt.Errorf("unsupported sampling strategy %+v", strategy) |
| 209 | } |
| 210 | |
| 211 | // ----------------------- |
| 212 | |
| 213 | // ProbabilisticSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration. |
| 214 | type ProbabilisticSamplerUpdater struct{} |
| 215 | |
| 216 | // Update implements Update of SamplerUpdater. |
| 217 | func (u *ProbabilisticSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) { |
| 218 | type response interface { |
| 219 | GetProbabilisticSampling() *sampling.ProbabilisticSamplingStrategy |
| 220 | } |
| 221 | var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check |
| 222 | if resp, ok := strategy.(response); ok { |
| 223 | if probabilistic := resp.GetProbabilisticSampling(); probabilistic != nil { |
| 224 | if ps, ok := sampler.(*ProbabilisticSampler); ok { |
| 225 | if err := ps.Update(probabilistic.SamplingRate); err != nil { |
| 226 | return nil, err |
| 227 | } |
| 228 | return sampler, nil |
| 229 | } |
| 230 | return newProbabilisticSampler(probabilistic.SamplingRate), nil |
| 231 | } |
| 232 | } |
| 233 | return nil, nil |
| 234 | } |
| 235 | |
| 236 | // ----------------------- |
| 237 | |
| 238 | // RateLimitingSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration. |
| 239 | type RateLimitingSamplerUpdater struct{} |
| 240 | |
| 241 | // Update implements Update of SamplerUpdater. |
| 242 | func (u *RateLimitingSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) { |
| 243 | type response interface { |
| 244 | GetRateLimitingSampling() *sampling.RateLimitingSamplingStrategy |
| 245 | } |
| 246 | var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check |
| 247 | if resp, ok := strategy.(response); ok { |
| 248 | if rateLimiting := resp.GetRateLimitingSampling(); rateLimiting != nil { |
| 249 | rateLimit := float64(rateLimiting.MaxTracesPerSecond) |
| 250 | if rl, ok := sampler.(*RateLimitingSampler); ok { |
| 251 | rl.Update(rateLimit) |
| 252 | return rl, nil |
| 253 | } |
| 254 | return NewRateLimitingSampler(rateLimit), nil |
| 255 | } |
| 256 | } |
| 257 | return nil, nil |
| 258 | } |
| 259 | |
| 260 | // ----------------------- |
| 261 | |
| 262 | // AdaptiveSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration. |
| 263 | // Fields have the same meaning as in PerOperationSamplerParams. |
| 264 | type AdaptiveSamplerUpdater struct { |
| 265 | MaxOperations int |
| 266 | OperationNameLateBinding bool |
| 267 | } |
| 268 | |
| 269 | // Update implements Update of SamplerUpdater. |
| 270 | func (u *AdaptiveSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) { |
| 271 | type response interface { |
| 272 | GetOperationSampling() *sampling.PerOperationSamplingStrategies |
| 273 | } |
| 274 | var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check |
| 275 | if p, ok := strategy.(response); ok { |
| 276 | if operations := p.GetOperationSampling(); operations != nil { |
| 277 | if as, ok := sampler.(*PerOperationSampler); ok { |
| 278 | as.update(operations) |
| 279 | return as, nil |
| 280 | } |
| 281 | return NewPerOperationSampler(PerOperationSamplerParams{ |
| 282 | MaxOperations: u.MaxOperations, |
| 283 | OperationNameLateBinding: u.OperationNameLateBinding, |
| 284 | Strategies: operations, |
| 285 | }), nil |
| 286 | } |
| 287 | } |
| 288 | return nil, nil |
| 289 | } |
| 290 | |
| 291 | // ----------------------- |
| 292 | |
| 293 | type httpSamplingStrategyFetcher struct { |
| 294 | serverURL string |
| 295 | logger log.DebugLogger |
| 296 | } |
| 297 | |
| 298 | func (f *httpSamplingStrategyFetcher) Fetch(serviceName string) ([]byte, error) { |
| 299 | v := url.Values{} |
| 300 | v.Set("service", serviceName) |
| 301 | uri := f.serverURL + "?" + v.Encode() |
| 302 | |
| 303 | // TODO create and reuse http.Client with proper timeout settings, etc. |
| 304 | resp, err := http.Get(uri) |
| 305 | if err != nil { |
| 306 | return nil, err |
| 307 | } |
| 308 | |
| 309 | defer func() { |
| 310 | if err := resp.Body.Close(); err != nil { |
| 311 | f.logger.Error(fmt.Sprintf("failed to close HTTP response body: %+v", err)) |
| 312 | } |
| 313 | }() |
| 314 | |
| 315 | body, err := ioutil.ReadAll(resp.Body) |
| 316 | if err != nil { |
| 317 | return nil, err |
| 318 | } |
| 319 | |
| 320 | if resp.StatusCode >= 400 { |
| 321 | return nil, fmt.Errorf("StatusCode: %d, Body: %s", resp.StatusCode, body) |
| 322 | } |
| 323 | |
| 324 | return body, nil |
| 325 | } |
| 326 | |
| 327 | // ----------------------- |
| 328 | |
| 329 | type samplingStrategyParser struct{} |
| 330 | |
| 331 | func (p *samplingStrategyParser) Parse(response []byte) (interface{}, error) { |
| 332 | strategy := new(sampling.SamplingStrategyResponse) |
| 333 | if err := json.Unmarshal(response, strategy); err != nil { |
| 334 | return nil, err |
| 335 | } |
| 336 | return strategy, nil |
| 337 | } |