blob: 20f434fe49567fc852621c9ba79de4bef8d86e9a [file] [log] [blame]
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"fmt"
"net/url"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/utils"
)
const (
// minimumCredits is the minimum credit balance an operation must have in order not to be
// throttled; it is also the amount deducted from the balance each time the operation is allowed.
// i.e. if currentCredits >= minimumCredits, then the operation will not be throttled.
minimumCredits = 1.0
)
var (
// errorUUIDNotSet is returned by Throttler.fetchCredits when no non-empty UUID has been
// stored on the Throttler (see SetProcess); in that case no request is sent to the agent.
errorUUIDNotSet = errors.New("Throttler UUID must be set")
)
// operationBalance is one entry of a creditResponse: the credit balance
// returned for a single named operation.
type operationBalance struct {
// Operation is the operation name, decoded from the "operation" JSON field.
Operation string `json:"operation"`
// Balance is the credit amount for Operation, decoded from the "balance" JSON field.
// The Throttler adds it to the credits it already holds for that operation.
Balance float64 `json:"balance"`
}
// creditResponse is the JSON payload decoded from the agent's credits endpoint.
type creditResponse struct {
// Balances holds one entry per operation, decoded from the "balances" JSON field.
Balances []operationBalance `json:"balances"`
}
// httpCreditManagerProxy fetches credits over HTTP from the agent reachable at
// hostPort.
type httpCreditManagerProxy struct {
	// hostPort is the "host:port" of the agent; it is interpolated into the
	// request URL by FetchCredits.
	hostPort string
}

// newHTTPCreditManagerProxy returns a proxy that targets the given hostPort.
func newHTTPCreditManagerProxy(hostPort string) *httpCreditManagerProxy {
	proxy := new(httpCreditManagerProxy)
	proxy.hostPort = hostPort
	return proxy
}
// FetchCredits requests credit balances from the agent for the given client
// uuid, service name and operations. The request is an HTTP GET to
// http://<hostPort>/credits with "service", "uuid" and one "operations" query
// parameter per operation; the JSON reply is decoded into a creditResponse.
// A failure from the underlying GET/decode is wrapped and returned with a nil
// response.
//
// N.B. Operations list must not be empty.
func (m *httpCreditManagerProxy) FetchCredits(uuid, serviceName string, operations []string) (*creditResponse, error) {
	query := url.Values{}
	query.Set("service", serviceName)
	query.Set("uuid", uuid)
	for i := range operations {
		// Add (not Set) so that every operation becomes its own repeated parameter.
		query.Add("operations", operations[i])
	}
	endpoint := fmt.Sprintf("http://%s/credits?%s", m.hostPort, query.Encode())

	balances := new(creditResponse)
	if err := utils.GetJSON(endpoint, balances); err != nil {
		return nil, errors.Wrap(err, "Failed to receive credits from agent")
	}
	return balances, nil
}
// Throttler retrieves credits from agent and uses it to throttle operations.
// Credits are tracked per operation; a background goroutine (pollManager)
// periodically tops them up until Close is called.
type Throttler struct {
// options is embedded; the methods in this file read its hostPort, refreshInterval,
// synchronousInitialization, metrics and logger members.
options
// mux guards credits.
mux sync.RWMutex
// service is the service name sent with every credit request.
service string
// uuid holds the process UUID as a string once SetProcess has stored it;
// fetchCredits refuses to contact the agent while it is unset or empty.
uuid atomic.Value
// creditManager performs the HTTP requests for credits.
creditManager *httpCreditManagerProxy
credits map[string]float64 // map of operation->credits
// close is closed by Close to tell pollManager to stop.
close chan struct{}
// stopped is marked done when pollManager returns; Close waits on it.
stopped sync.WaitGroup
}
// NewThrottler returns a Throttler for the given service, configured by the
// supplied options. It also starts the background goroutine that polls the
// agent for credits; call Close to stop that goroutine.
func NewThrottler(service string, options ...Option) *Throttler {
	opts := applyOptions(options...)
	throttler := &Throttler{
		options:       opts,
		service:       service,
		creditManager: newHTTPCreditManagerProxy(opts.hostPort),
		credits:       map[string]float64{},
		close:         make(chan struct{}),
	}
	// Register the poller with the WaitGroup before launching it, so that a
	// Close racing with startup still waits for the goroutine to exit.
	throttler.stopped.Add(1)
	go throttler.pollManager()
	return throttler
}
// IsAllowed implements Throttler#IsAllowed.
//
// It reports whether the operation may proceed and, if so, spends
// minimumCredits from the operation's balance. When the operation is unknown
// or its balance is exactly zero, the outcome depends on
// synchronousInitialization: if disabled the call is counted as throttled and
// rejected; if enabled, credits for this operation are fetched from the agent
// before deciding. The write lock is held for the whole call, including that
// fetch.
func (t *Throttler) IsAllowed(operation string) bool {
	t.mux.Lock()
	defer t.mux.Unlock()

	balance, known := t.credits[operation]
	if known && balance != 0 {
		return t.isAllowed(operation)
	}

	if !known {
		// NOTE: Storing a zero looks like a no-op, but it registers the
		// operation key in the map, which is what Throttler#operations
		// relies on to know which operations to request credits for.
		t.credits[operation] = 0
	}

	if !t.synchronousInitialization {
		t.metrics.ThrottledDebugSpans.Inc(1)
		return false
	}

	// No usable balance yet: synchronously ask the agent for credits for
	// just this operation.
	fetched, err := t.fetchCredits([]string{operation})
	if err != nil {
		// Failed to receive credits from agent, try again next time.
		t.logger.Error("Failed to fetch credits: " + err.Error())
		return false
	}
	if len(fetched.Balances) == 0 {
		// This shouldn't happen but just in case.
		return false
	}
	for _, entry := range fetched.Balances {
		t.credits[entry.Operation] += entry.Balance
	}
	return t.isAllowed(operation)
}
// Close stops the throttler from fetching credits from remote and blocks until
// the background polling goroutine has exited. It always returns nil.
//
// Close may be called more than once, including concurrently: only the first
// call closes the stop channel, later calls just wait for the poller to be
// gone.
func (t *Throttler) Close() error {
	// Closing an already-closed channel panics, so do the check-and-close
	// atomically under the mutex.
	t.mux.Lock()
	select {
	case <-t.close:
		// Already closed by an earlier call; nothing to signal.
	default:
		close(t.close)
	}
	// Release the lock before waiting: the poller may need it (via
	// refreshCredits) to finish its current iteration and exit.
	t.mux.Unlock()

	t.stopped.Wait()
	return nil
}
// SetProcess implements ProcessSetter#SetProcess. It records the process UUID
// used to identify this client in credit requests; an empty UUID is ignored
// and leaves any previously stored value in place. It's imperative that the
// UUID is set before any remote requests are made.
func (t *Throttler) SetProcess(process jaeger.Process) {
	if process.UUID == "" {
		return
	}
	t.uuid.Store(process.UUID)
}
// isAllowed spends minimumCredits from the operation's balance and returns
// true when the balance is sufficient; otherwise it counts a throttled debug
// span, leaves the balance untouched and returns false.
//
// N.B. This function must be called with the Write Lock
func (t *Throttler) isAllowed(operation string) bool {
	remaining := t.credits[operation]
	throttled := remaining < minimumCredits
	if throttled {
		t.metrics.ThrottledDebugSpans.Inc(1)
	} else {
		t.credits[operation] = remaining - minimumCredits
	}
	return !throttled
}
// pollManager is the background loop started by NewThrottler. It refreshes
// credits once per refreshInterval tick and returns when the close channel is
// closed, marking the stopped WaitGroup as done on the way out.
func (t *Throttler) pollManager() {
	defer t.stopped.Done()

	refresh := time.NewTicker(t.refreshInterval)
	defer refresh.Stop()

	for {
		select {
		case <-t.close:
			return
		case <-refresh.C:
			t.refreshCredits()
		}
	}
}
// operations returns the names of all operations currently present in the
// credits map, in no particular order. It takes the read lock itself, so the
// caller must not already hold the write lock.
func (t *Throttler) operations() []string {
	t.mux.RLock()
	defer t.mux.RUnlock()

	names := make([]string, len(t.credits))
	next := 0
	for name := range t.credits {
		names[next] = name
		next++
	}
	return names
}
// refreshCredits asks the agent for credits for every known operation and adds
// the returned balances to the local ones. It does nothing when no operation
// has been seen yet. A failed fetch is logged and counted as an update
// failure; a successful one is counted as an update success.
func (t *Throttler) refreshCredits() {
	tracked := t.operations()
	if len(tracked) == 0 {
		return
	}

	// The fetch happens without holding the lock; only the merge below
	// needs exclusive access to the credits map.
	fetched, err := t.fetchCredits(tracked)
	if err != nil {
		t.metrics.ThrottlerUpdateFailure.Inc(1)
		t.logger.Error("Failed to fetch credits: " + err.Error())
		return
	}
	t.metrics.ThrottlerUpdateSuccess.Inc(1)

	t.mux.Lock()
	defer t.mux.Unlock()
	for i := range fetched.Balances {
		entry := &fetched.Balances[i]
		t.credits[entry.Operation] += entry.Balance
	}
}
// fetchCredits requests credits for the given operations from the credit
// manager on behalf of this throttler's service and UUID. It returns
// errorUUIDNotSet, without contacting the agent, when no non-empty UUID has
// been stored yet.
func (t *Throttler) fetchCredits(operations []string) (*creditResponse, error) {
	// If nothing has been stored, Load returns nil and the type assertion
	// leaves id as "", so a single emptiness check covers both cases.
	id, _ := t.uuid.Load().(string)
	if id == "" {
		return nil, errorUUIDNotSet
	}
	return t.creditManager.FetchCredits(id, t.service, operations)
}