blob: 20f434fe49567fc852621c9ba79de4bef8d86e9a [file] [log] [blame]
Girish Gowdra631ef3d2020-06-15 10:45:52 -07001// Copyright (c) 2018 The Jaeger Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package remote
16
17import (
18 "fmt"
19 "net/url"
20 "sync"
21 "sync/atomic"
22 "time"
23
24 "github.com/pkg/errors"
25
26 "github.com/uber/jaeger-client-go"
27 "github.com/uber/jaeger-client-go/utils"
28)
29
// minimumCredits is both the smallest balance an operation needs in order to
// pass the throttler and the amount deducted from that balance for every
// allowed call. An operation is throttled when its balance is below this value.
const minimumCredits = 1.0
35
// errorUUIDNotSet is returned when credits are requested before a non-empty
// process UUID has been stored on the Throttler.
var errorUUIDNotSet = errors.New("Throttler UUID must be set")
39
// operationBalance is one entry of a credit response: the credits granted to a
// single operation.
type operationBalance struct {
	// Operation is the name of the operation the balance applies to.
	Operation string `json:"operation"`
	// Balance is the amount of credits reported for Operation.
	Balance float64 `json:"balance"`
}
44
45type creditResponse struct {
46 Balances []operationBalance `json:"balances"`
47}
48
// httpCreditManagerProxy fetches credits over HTTP from the credit manager
// listening at hostPort.
type httpCreditManagerProxy struct {
	// hostPort is the "host:port" part of the URL used by FetchCredits.
	hostPort string
}
52
53func newHTTPCreditManagerProxy(hostPort string) *httpCreditManagerProxy {
54 return &httpCreditManagerProxy{
55 hostPort: hostPort,
56 }
57}
58
59// N.B. Operations list must not be empty.
60func (m *httpCreditManagerProxy) FetchCredits(uuid, serviceName string, operations []string) (*creditResponse, error) {
61 params := url.Values{}
62 params.Set("service", serviceName)
63 params.Set("uuid", uuid)
64 for _, op := range operations {
65 params.Add("operations", op)
66 }
67 var resp creditResponse
68 if err := utils.GetJSON(fmt.Sprintf("http://%s/credits?%s", m.hostPort, params.Encode()), &resp); err != nil {
69 return nil, errors.Wrap(err, "Failed to receive credits from agent")
70 }
71 return &resp, nil
72}
73
74// Throttler retrieves credits from agent and uses it to throttle operations.
75type Throttler struct {
76 options
77
78 mux sync.RWMutex
79 service string
80 uuid atomic.Value
81 creditManager *httpCreditManagerProxy
82 credits map[string]float64 // map of operation->credits
83 close chan struct{}
84 stopped sync.WaitGroup
85}
86
87// NewThrottler returns a Throttler that polls agent for credits and uses them to throttle
88// the service.
89func NewThrottler(service string, options ...Option) *Throttler {
90 opts := applyOptions(options...)
91 creditManager := newHTTPCreditManagerProxy(opts.hostPort)
92 t := &Throttler{
93 options: opts,
94 creditManager: creditManager,
95 service: service,
96 credits: make(map[string]float64),
97 close: make(chan struct{}),
98 }
99 t.stopped.Add(1)
100 go t.pollManager()
101 return t
102}
103
104// IsAllowed implements Throttler#IsAllowed.
105func (t *Throttler) IsAllowed(operation string) bool {
106 t.mux.Lock()
107 defer t.mux.Unlock()
108 value, ok := t.credits[operation]
109 if !ok || value == 0 {
110 if !ok {
111 // NOTE: This appears to be a no-op at first glance, but it stores
112 // the operation key in the map. Necessary for functionality of
113 // Throttler#operations method.
114 t.credits[operation] = 0
115 }
116 if !t.synchronousInitialization {
117 t.metrics.ThrottledDebugSpans.Inc(1)
118 return false
119 }
120 // If it is the first time this operation is being checked, synchronously fetch
121 // the credits.
122 credits, err := t.fetchCredits([]string{operation})
123 if err != nil {
124 // Failed to receive credits from agent, try again next time
125 t.logger.Error("Failed to fetch credits: " + err.Error())
126 return false
127 }
128 if len(credits.Balances) == 0 {
129 // This shouldn't happen but just in case
130 return false
131 }
132 for _, opBalance := range credits.Balances {
133 t.credits[opBalance.Operation] += opBalance.Balance
134 }
135 }
136 return t.isAllowed(operation)
137}
138
139// Close stops the throttler from fetching credits from remote.
140func (t *Throttler) Close() error {
141 close(t.close)
142 t.stopped.Wait()
143 return nil
144}
145
146// SetProcess implements ProcessSetter#SetProcess. It's imperative that the UUID is set before any remote
147// requests are made.
148func (t *Throttler) SetProcess(process jaeger.Process) {
149 if process.UUID != "" {
150 t.uuid.Store(process.UUID)
151 }
152}
153
154// N.B. This function must be called with the Write Lock
155func (t *Throttler) isAllowed(operation string) bool {
156 credits := t.credits[operation]
157 if credits < minimumCredits {
158 t.metrics.ThrottledDebugSpans.Inc(1)
159 return false
160 }
161 t.credits[operation] = credits - minimumCredits
162 return true
163}
164
165func (t *Throttler) pollManager() {
166 defer t.stopped.Done()
167 ticker := time.NewTicker(t.refreshInterval)
168 defer ticker.Stop()
169 for {
170 select {
171 case <-ticker.C:
172 t.refreshCredits()
173 case <-t.close:
174 return
175 }
176 }
177}
178
179func (t *Throttler) operations() []string {
180 t.mux.RLock()
181 defer t.mux.RUnlock()
182 operations := make([]string, 0, len(t.credits))
183 for op := range t.credits {
184 operations = append(operations, op)
185 }
186 return operations
187}
188
189func (t *Throttler) refreshCredits() {
190 operations := t.operations()
191 if len(operations) == 0 {
192 return
193 }
194 newCredits, err := t.fetchCredits(operations)
195 if err != nil {
196 t.metrics.ThrottlerUpdateFailure.Inc(1)
197 t.logger.Error("Failed to fetch credits: " + err.Error())
198 return
199 }
200 t.metrics.ThrottlerUpdateSuccess.Inc(1)
201
202 t.mux.Lock()
203 defer t.mux.Unlock()
204 for _, opBalance := range newCredits.Balances {
205 t.credits[opBalance.Operation] += opBalance.Balance
206 }
207}
208
209func (t *Throttler) fetchCredits(operations []string) (*creditResponse, error) {
210 uuid := t.uuid.Load()
211 uuidStr, _ := uuid.(string)
212 if uuid == nil || uuidStr == "" {
213 return nil, errorUUIDNotSet
214 }
215 return t.creditManager.FetchCredits(uuidStr, t.service, operations)
216}