blob: bb7eb00c97914fba1b88103cddf6ef42f53ba754 [file] [log] [blame]
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package transport
16
17import (
18 "bytes"
19 "fmt"
20 "io"
21 "io/ioutil"
22 "net/http"
23 "time"
24
25 "github.com/uber/jaeger-client-go/thrift"
26
27 "github.com/uber/jaeger-client-go"
28 j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
29)
30
// defaultHTTPTimeout is the timeout (five seconds) given to the HTTP client
// unless it is overridden with the HTTPTimeout option.
const defaultHTTPTimeout = 5 * time.Second
33
34// HTTPTransport implements Transport by forwarding spans to a http server.
35type HTTPTransport struct {
36 url string
37 client *http.Client
38 batchSize int
39 spans []*j.Span
40 process *j.Process
41 httpCredentials *HTTPBasicAuthCredentials
42 headers map[string]string
43}
44
// HTTPBasicAuthCredentials holds the username/password pair used for HTTP
// basic auth.
type HTTPBasicAuthCredentials struct {
	username, password string
}
50
51// HTTPOption sets a parameter for the HttpCollector
52type HTTPOption func(c *HTTPTransport)
53
54// HTTPTimeout sets maximum timeout for http request.
55func HTTPTimeout(duration time.Duration) HTTPOption {
56 return func(c *HTTPTransport) { c.client.Timeout = duration }
57}
58
59// HTTPBatchSize sets the maximum batch size, after which a collect will be
60// triggered. The default batch size is 100 spans.
61func HTTPBatchSize(n int) HTTPOption {
62 return func(c *HTTPTransport) { c.batchSize = n }
63}
64
65// HTTPBasicAuth sets the credentials required to perform HTTP basic auth
66func HTTPBasicAuth(username string, password string) HTTPOption {
67 return func(c *HTTPTransport) {
68 c.httpCredentials = &HTTPBasicAuthCredentials{username: username, password: password}
69 }
70}
71
72// HTTPRoundTripper configures the underlying Transport on the *http.Client
73// that is used
74func HTTPRoundTripper(transport http.RoundTripper) HTTPOption {
75 return func(c *HTTPTransport) {
76 c.client.Transport = transport
77 }
78}
79
80// HTTPHeaders defines the HTTP headers that will be attached to the jaeger client's HTTP request
81func HTTPHeaders(headers map[string]string) HTTPOption {
82 return func(c *HTTPTransport) {
83 c.headers = headers
84 }
85}
86
87// NewHTTPTransport returns a new HTTP-backend transport. url should be an http
88// url of the collector to handle POST request, typically something like:
89// http://hostname:14268/api/traces?format=jaeger.thrift
90func NewHTTPTransport(url string, options ...HTTPOption) *HTTPTransport {
91 c := &HTTPTransport{
92 url: url,
93 client: &http.Client{Timeout: defaultHTTPTimeout},
94 batchSize: 100,
95 spans: []*j.Span{},
96 }
97
98 for _, option := range options {
99 option(c)
100 }
101 return c
102}
103
104// Append implements Transport.
105func (c *HTTPTransport) Append(span *jaeger.Span) (int, error) {
106 if c.process == nil {
107 c.process = jaeger.BuildJaegerProcessThrift(span)
108 }
109 jSpan := jaeger.BuildJaegerThrift(span)
110 c.spans = append(c.spans, jSpan)
111 if len(c.spans) >= c.batchSize {
112 return c.Flush()
113 }
114 return 0, nil
115}
116
117// Flush implements Transport.
118func (c *HTTPTransport) Flush() (int, error) {
119 count := len(c.spans)
120 if count == 0 {
121 return 0, nil
122 }
123 err := c.send(c.spans)
124 c.spans = c.spans[:0]
125 return count, err
126}
127
128// Close implements Transport.
129func (c *HTTPTransport) Close() error {
130 return nil
131}
132
133func (c *HTTPTransport) send(spans []*j.Span) error {
134 batch := &j.Batch{
135 Spans: spans,
136 Process: c.process,
137 }
138 body, err := serializeThrift(batch)
139 if err != nil {
140 return err
141 }
142 req, err := http.NewRequest("POST", c.url, body)
143 if err != nil {
144 return err
145 }
146 req.Header.Set("Content-Type", "application/x-thrift")
147 for k, v := range c.headers {
148 req.Header.Set(k, v)
149 }
150
151 if c.httpCredentials != nil {
152 req.SetBasicAuth(c.httpCredentials.username, c.httpCredentials.password)
153 }
154
155 resp, err := c.client.Do(req)
156 if err != nil {
157 return err
158 }
159 io.Copy(ioutil.Discard, resp.Body)
160 resp.Body.Close()
161 if resp.StatusCode >= http.StatusBadRequest {
162 return fmt.Errorf("error from collector: %d", resp.StatusCode)
163 }
164 return nil
165}
166
167func serializeThrift(obj thrift.TStruct) (*bytes.Buffer, error) {
168 t := thrift.NewTMemoryBuffer()
169 p := thrift.NewTBinaryProtocolTransport(t)
170 if err := obj.Write(p); err != nil {
171 return nil, err
172 }
173 return t.Buffer, nil
174}