blob: a56515acab8690943e3f07f0840a03c1486027d2 [file] [log] [blame]
Girish Kumar182049b2020-07-08 18:53:34 +00001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package remote
16
17import (
18 "fmt"
19 "net/url"
20 "sync"
21 "time"
22
23 "github.com/uber/jaeger-client-go/internal/baggage"
24 thrift "github.com/uber/jaeger-client-go/thrift-gen/baggage"
25 "github.com/uber/jaeger-client-go/utils"
26)
27
// httpBaggageRestrictionManagerProxy fetches baggage restrictions over HTTP.
// It holds the fully-built request URL, which already includes the service
// name as a query parameter.
type httpBaggageRestrictionManagerProxy struct {
	// url is the complete endpoint URL passed to utils.GetJSON.
	url string
}
31
32func newHTTPBaggageRestrictionManagerProxy(hostPort, serviceName string) *httpBaggageRestrictionManagerProxy {
33 v := url.Values{}
34 v.Set("service", serviceName)
35 return &httpBaggageRestrictionManagerProxy{
36 url: fmt.Sprintf("http://%s/baggageRestrictions?%s", hostPort, v.Encode()),
37 }
38}
39
40func (s *httpBaggageRestrictionManagerProxy) GetBaggageRestrictions(serviceName string) ([]*thrift.BaggageRestriction, error) {
41 var out []*thrift.BaggageRestriction
42 if err := utils.GetJSON(s.url, &out); err != nil {
43 return nil, err
44 }
45 return out, nil
46}
47
48// RestrictionManager manages baggage restrictions by retrieving baggage restrictions from agent
49type RestrictionManager struct {
50 options
51
52 mux sync.RWMutex
53 serviceName string
54 restrictions map[string]*baggage.Restriction
55 thriftProxy thrift.BaggageRestrictionManager
56 pollStopped sync.WaitGroup
57 stopPoll chan struct{}
58 invalidRestriction *baggage.Restriction
59 validRestriction *baggage.Restriction
60
61 // Determines if the manager has successfully retrieved baggage restrictions from agent
62 initialized bool
63}
64
65// NewRestrictionManager returns a BaggageRestrictionManager that polls the agent for the latest
66// baggage restrictions.
67func NewRestrictionManager(serviceName string, options ...Option) *RestrictionManager {
68 // TODO there is a developing use case where a single tracer can generate traces on behalf of many services.
69 // restrictionsMap will need to exist per service
70 opts := applyOptions(options...)
71 m := &RestrictionManager{
72 serviceName: serviceName,
73 options: opts,
74 restrictions: make(map[string]*baggage.Restriction),
75 thriftProxy: newHTTPBaggageRestrictionManagerProxy(opts.hostPort, serviceName),
76 stopPoll: make(chan struct{}),
77 invalidRestriction: baggage.NewRestriction(false, 0),
78 validRestriction: baggage.NewRestriction(true, defaultMaxValueLength),
79 }
80 m.pollStopped.Add(1)
81 go m.pollManager()
82 return m
83}
84
85// isReady returns true if the manager has retrieved baggage restrictions from the remote source.
86func (m *RestrictionManager) isReady() bool {
87 m.mux.RLock()
88 defer m.mux.RUnlock()
89 return m.initialized
90}
91
92// GetRestriction implements RestrictionManager#GetRestriction.
93func (m *RestrictionManager) GetRestriction(service, key string) *baggage.Restriction {
94 m.mux.RLock()
95 defer m.mux.RUnlock()
96 if !m.initialized {
97 if m.denyBaggageOnInitializationFailure {
98 return m.invalidRestriction
99 }
100 return m.validRestriction
101 }
102 if restriction, ok := m.restrictions[key]; ok {
103 return restriction
104 }
105 return m.invalidRestriction
106}
107
108// Close stops remote polling and closes the RemoteRestrictionManager.
109func (m *RestrictionManager) Close() error {
110 close(m.stopPoll)
111 m.pollStopped.Wait()
112 return nil
113}
114
115func (m *RestrictionManager) pollManager() {
116 defer m.pollStopped.Done()
117 // attempt to initialize baggage restrictions
118 if err := m.updateRestrictions(); err != nil {
119 m.logger.Error(fmt.Sprintf("Failed to initialize baggage restrictions: %s", err.Error()))
120 }
121 ticker := time.NewTicker(m.refreshInterval)
122 defer ticker.Stop()
123
124 for {
125 select {
126 case <-ticker.C:
127 if err := m.updateRestrictions(); err != nil {
128 m.logger.Error(fmt.Sprintf("Failed to update baggage restrictions: %s", err.Error()))
129 }
130 case <-m.stopPoll:
131 return
132 }
133 }
134}
135
136func (m *RestrictionManager) updateRestrictions() error {
137 restrictions, err := m.thriftProxy.GetBaggageRestrictions(m.serviceName)
138 if err != nil {
139 m.metrics.BaggageRestrictionsUpdateFailure.Inc(1)
140 return err
141 }
142 newRestrictions := m.parseRestrictions(restrictions)
143 m.metrics.BaggageRestrictionsUpdateSuccess.Inc(1)
144 m.mux.Lock()
145 defer m.mux.Unlock()
146 m.initialized = true
147 m.restrictions = newRestrictions
148 return nil
149}
150
151func (m *RestrictionManager) parseRestrictions(restrictions []*thrift.BaggageRestriction) map[string]*baggage.Restriction {
152 setters := make(map[string]*baggage.Restriction, len(restrictions))
153 for _, restriction := range restrictions {
154 setters[restriction.BaggageKey] = baggage.NewRestriction(true, int(restriction.MaxValueLength))
155 }
156 return setters
157}