blob: 1d6f14d3289df19a707c2caaea4c0f489aaeaf7b [file] [log] [blame]
khenaidooc6c7bda2020-06-17 17:20:18 -04001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package transport
16
17import (
18 "bytes"
khenaidood948f772021-08-11 17:49:24 -040019 "context"
khenaidooc6c7bda2020-06-17 17:20:18 -040020 "fmt"
21 "io"
22 "io/ioutil"
23 "net/http"
24 "time"
25
26 "github.com/uber/jaeger-client-go/thrift"
27
28 "github.com/uber/jaeger-client-go"
29 j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
30)
31
32// Default timeout for http request in seconds
33const defaultHTTPTimeout = time.Second * 5
34
35// HTTPTransport implements Transport by forwarding spans to a http server.
36type HTTPTransport struct {
37 url string
38 client *http.Client
39 batchSize int
40 spans []*j.Span
41 process *j.Process
42 httpCredentials *HTTPBasicAuthCredentials
43 headers map[string]string
44}
45
46// HTTPBasicAuthCredentials stores credentials for HTTP basic auth.
47type HTTPBasicAuthCredentials struct {
48 username string
49 password string
50}
51
52// HTTPOption sets a parameter for the HttpCollector
53type HTTPOption func(c *HTTPTransport)
54
55// HTTPTimeout sets maximum timeout for http request.
56func HTTPTimeout(duration time.Duration) HTTPOption {
57 return func(c *HTTPTransport) { c.client.Timeout = duration }
58}
59
60// HTTPBatchSize sets the maximum batch size, after which a collect will be
61// triggered. The default batch size is 100 spans.
62func HTTPBatchSize(n int) HTTPOption {
63 return func(c *HTTPTransport) { c.batchSize = n }
64}
65
66// HTTPBasicAuth sets the credentials required to perform HTTP basic auth
67func HTTPBasicAuth(username string, password string) HTTPOption {
68 return func(c *HTTPTransport) {
69 c.httpCredentials = &HTTPBasicAuthCredentials{username: username, password: password}
70 }
71}
72
73// HTTPRoundTripper configures the underlying Transport on the *http.Client
74// that is used
75func HTTPRoundTripper(transport http.RoundTripper) HTTPOption {
76 return func(c *HTTPTransport) {
77 c.client.Transport = transport
78 }
79}
80
81// HTTPHeaders defines the HTTP headers that will be attached to the jaeger client's HTTP request
82func HTTPHeaders(headers map[string]string) HTTPOption {
83 return func(c *HTTPTransport) {
84 c.headers = headers
85 }
86}
87
88// NewHTTPTransport returns a new HTTP-backend transport. url should be an http
89// url of the collector to handle POST request, typically something like:
90// http://hostname:14268/api/traces?format=jaeger.thrift
91func NewHTTPTransport(url string, options ...HTTPOption) *HTTPTransport {
92 c := &HTTPTransport{
93 url: url,
94 client: &http.Client{Timeout: defaultHTTPTimeout},
95 batchSize: 100,
96 spans: []*j.Span{},
97 }
98
99 for _, option := range options {
100 option(c)
101 }
102 return c
103}
104
105// Append implements Transport.
106func (c *HTTPTransport) Append(span *jaeger.Span) (int, error) {
107 if c.process == nil {
108 c.process = jaeger.BuildJaegerProcessThrift(span)
109 }
110 jSpan := jaeger.BuildJaegerThrift(span)
111 c.spans = append(c.spans, jSpan)
112 if len(c.spans) >= c.batchSize {
113 return c.Flush()
114 }
115 return 0, nil
116}
117
118// Flush implements Transport.
119func (c *HTTPTransport) Flush() (int, error) {
120 count := len(c.spans)
121 if count == 0 {
122 return 0, nil
123 }
124 err := c.send(c.spans)
125 c.spans = c.spans[:0]
126 return count, err
127}
128
129// Close implements Transport.
130func (c *HTTPTransport) Close() error {
131 return nil
132}
133
134func (c *HTTPTransport) send(spans []*j.Span) error {
135 batch := &j.Batch{
136 Spans: spans,
137 Process: c.process,
138 }
139 body, err := serializeThrift(batch)
140 if err != nil {
141 return err
142 }
143 req, err := http.NewRequest("POST", c.url, body)
144 if err != nil {
145 return err
146 }
147 req.Header.Set("Content-Type", "application/x-thrift")
148 for k, v := range c.headers {
149 req.Header.Set(k, v)
150 }
151
152 if c.httpCredentials != nil {
153 req.SetBasicAuth(c.httpCredentials.username, c.httpCredentials.password)
154 }
155
156 resp, err := c.client.Do(req)
157 if err != nil {
158 return err
159 }
160 io.Copy(ioutil.Discard, resp.Body)
161 resp.Body.Close()
162 if resp.StatusCode >= http.StatusBadRequest {
163 return fmt.Errorf("error from collector: %d", resp.StatusCode)
164 }
165 return nil
166}
167
168func serializeThrift(obj thrift.TStruct) (*bytes.Buffer, error) {
169 t := thrift.NewTMemoryBuffer()
170 p := thrift.NewTBinaryProtocolTransport(t)
khenaidood948f772021-08-11 17:49:24 -0400171 if err := obj.Write(context.Background(), p); err != nil {
khenaidooc6c7bda2020-06-17 17:20:18 -0400172 return nil, err
173 }
174 return t.Buffer, nil
175}