blob: 2f58bb541a3907751069d355adc6dc76e76da8f3 [file] [log] [blame]
khenaidooc6c7bda2020-06-17 17:20:18 -04001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package remote
16
17import (
khenaidood948f772021-08-11 17:49:24 -040018 "context"
khenaidooc6c7bda2020-06-17 17:20:18 -040019 "fmt"
20 "net/url"
21 "sync"
22 "time"
23
24 "github.com/uber/jaeger-client-go/internal/baggage"
25 thrift "github.com/uber/jaeger-client-go/thrift-gen/baggage"
26 "github.com/uber/jaeger-client-go/utils"
27)
28
// httpBaggageRestrictionManagerProxy retrieves baggage restrictions over HTTP
// from a single, pre-built endpoint URL.
type httpBaggageRestrictionManagerProxy struct {
	// url is the complete request URL, query string included.
	url string
}
32
33func newHTTPBaggageRestrictionManagerProxy(hostPort, serviceName string) *httpBaggageRestrictionManagerProxy {
34 v := url.Values{}
35 v.Set("service", serviceName)
36 return &httpBaggageRestrictionManagerProxy{
37 url: fmt.Sprintf("http://%s/baggageRestrictions?%s", hostPort, v.Encode()),
38 }
39}
40
khenaidood948f772021-08-11 17:49:24 -040041func (s *httpBaggageRestrictionManagerProxy) GetBaggageRestrictions(context.Context, string) ([]*thrift.BaggageRestriction, error) {
khenaidooc6c7bda2020-06-17 17:20:18 -040042 var out []*thrift.BaggageRestriction
43 if err := utils.GetJSON(s.url, &out); err != nil {
44 return nil, err
45 }
46 return out, nil
47}
48
49// RestrictionManager manages baggage restrictions by retrieving baggage restrictions from agent
50type RestrictionManager struct {
51 options
52
53 mux sync.RWMutex
54 serviceName string
55 restrictions map[string]*baggage.Restriction
56 thriftProxy thrift.BaggageRestrictionManager
57 pollStopped sync.WaitGroup
58 stopPoll chan struct{}
59 invalidRestriction *baggage.Restriction
60 validRestriction *baggage.Restriction
61
62 // Determines if the manager has successfully retrieved baggage restrictions from agent
63 initialized bool
64}
65
66// NewRestrictionManager returns a BaggageRestrictionManager that polls the agent for the latest
67// baggage restrictions.
68func NewRestrictionManager(serviceName string, options ...Option) *RestrictionManager {
69 // TODO there is a developing use case where a single tracer can generate traces on behalf of many services.
70 // restrictionsMap will need to exist per service
71 opts := applyOptions(options...)
72 m := &RestrictionManager{
73 serviceName: serviceName,
74 options: opts,
75 restrictions: make(map[string]*baggage.Restriction),
76 thriftProxy: newHTTPBaggageRestrictionManagerProxy(opts.hostPort, serviceName),
77 stopPoll: make(chan struct{}),
78 invalidRestriction: baggage.NewRestriction(false, 0),
79 validRestriction: baggage.NewRestriction(true, defaultMaxValueLength),
80 }
81 m.pollStopped.Add(1)
82 go m.pollManager()
83 return m
84}
85
86// isReady returns true if the manager has retrieved baggage restrictions from the remote source.
87func (m *RestrictionManager) isReady() bool {
88 m.mux.RLock()
89 defer m.mux.RUnlock()
90 return m.initialized
91}
92
93// GetRestriction implements RestrictionManager#GetRestriction.
94func (m *RestrictionManager) GetRestriction(service, key string) *baggage.Restriction {
95 m.mux.RLock()
96 defer m.mux.RUnlock()
97 if !m.initialized {
98 if m.denyBaggageOnInitializationFailure {
99 return m.invalidRestriction
100 }
101 return m.validRestriction
102 }
103 if restriction, ok := m.restrictions[key]; ok {
104 return restriction
105 }
106 return m.invalidRestriction
107}
108
109// Close stops remote polling and closes the RemoteRestrictionManager.
110func (m *RestrictionManager) Close() error {
111 close(m.stopPoll)
112 m.pollStopped.Wait()
113 return nil
114}
115
116func (m *RestrictionManager) pollManager() {
117 defer m.pollStopped.Done()
118 // attempt to initialize baggage restrictions
119 if err := m.updateRestrictions(); err != nil {
120 m.logger.Error(fmt.Sprintf("Failed to initialize baggage restrictions: %s", err.Error()))
121 }
122 ticker := time.NewTicker(m.refreshInterval)
123 defer ticker.Stop()
124
125 for {
126 select {
127 case <-ticker.C:
128 if err := m.updateRestrictions(); err != nil {
129 m.logger.Error(fmt.Sprintf("Failed to update baggage restrictions: %s", err.Error()))
130 }
131 case <-m.stopPoll:
132 return
133 }
134 }
135}
136
137func (m *RestrictionManager) updateRestrictions() error {
khenaidood948f772021-08-11 17:49:24 -0400138 restrictions, err := m.thriftProxy.GetBaggageRestrictions(context.Background(), m.serviceName)
khenaidooc6c7bda2020-06-17 17:20:18 -0400139 if err != nil {
140 m.metrics.BaggageRestrictionsUpdateFailure.Inc(1)
141 return err
142 }
143 newRestrictions := m.parseRestrictions(restrictions)
144 m.metrics.BaggageRestrictionsUpdateSuccess.Inc(1)
145 m.mux.Lock()
146 defer m.mux.Unlock()
147 m.initialized = true
148 m.restrictions = newRestrictions
149 return nil
150}
151
152func (m *RestrictionManager) parseRestrictions(restrictions []*thrift.BaggageRestriction) map[string]*baggage.Restriction {
153 setters := make(map[string]*baggage.Restriction, len(restrictions))
154 for _, restriction := range restrictions {
155 setters[restriction.BaggageKey] = baggage.NewRestriction(true, int(restriction.MaxValueLength))
156 }
157 return setters
158}