blob: 00004124c89a9df1b536c8ec6fd762ece6166ed5 [file] [log] [blame]
Matteo Scandoloa4285862020-12-01 18:10:10 -08001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
David K. Bainbridge06631892021-08-19 13:07:00 +000018 "context"
Matteo Scandoloa4285862020-12-01 18:10:10 -080019 "errors"
20 "fmt"
21
22 "github.com/uber/jaeger-client-go/internal/reporterstats"
David K. Bainbridge06631892021-08-19 13:07:00 +000023 "github.com/uber/jaeger-client-go/log"
Matteo Scandoloa4285862020-12-01 18:10:10 -080024 "github.com/uber/jaeger-client-go/thrift"
25 j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
26 "github.com/uber/jaeger-client-go/utils"
27)
28
29// Empirically obtained constant for how many bytes in the message are used for envelope.
30// The total datagram size is:
31// sizeof(Span) * numSpans + processByteSize + emitBatchOverhead <= maxPacketSize
32//
33// Note that due to the use of Compact Thrift protocol, overhead grows with the number of spans
34// in the batch, because the length of the list is encoded as varint32, as well as SeqId.
35//
36// There is a unit test `TestEmitBatchOverhead` that validates this number, it fails at <68.
37const emitBatchOverhead = 70
38
39var errSpanTooLarge = errors.New("span is too large")
40
41type udpSender struct {
42 client *utils.AgentClientUDP
43 maxPacketSize int // max size of datagram in bytes
44 maxSpanBytes int // max number of bytes to record spans (excluding envelope) in the datagram
45 byteBufferSize int // current number of span bytes accumulated in the buffer
46 spanBuffer []*j.Span // spans buffered before a flush
47 thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
48 thriftProtocol thrift.TProtocol
49 process *j.Process
50 processByteSize int
51
52 // reporterStats provides access to stats that are only known to Reporter
53 reporterStats reporterstats.ReporterStats
54
55 // The following counters are always non-negative, but we need to send them in signed i64 Thrift fields,
56 // so we keep them as signed. At 10k QPS, overflow happens in about 300 million years.
57 batchSeqNo int64
58 tooLargeDroppedSpans int64
59 failedToEmitSpans int64
60}
61
David K. Bainbridge06631892021-08-19 13:07:00 +000062// UDPTransportParams allows specifying options for initializing a UDPTransport. An instance of this struct should
63// be passed to NewUDPTransportWithParams.
64type UDPTransportParams struct {
65 utils.AgentClientUDPParams
66}
67
68// NewUDPTransportWithParams creates a reporter that submits spans to jaeger-agent.
Matteo Scandoloa4285862020-12-01 18:10:10 -080069// TODO: (breaking change) move to transport/ package.
David K. Bainbridge06631892021-08-19 13:07:00 +000070func NewUDPTransportWithParams(params UDPTransportParams) (Transport, error) {
71 if len(params.HostPort) == 0 {
72 params.HostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
Matteo Scandoloa4285862020-12-01 18:10:10 -080073 }
David K. Bainbridge06631892021-08-19 13:07:00 +000074
75 if params.Logger == nil {
76 params.Logger = log.StdLogger
77 }
78
79 if params.MaxPacketSize == 0 {
80 params.MaxPacketSize = utils.UDPPacketMaxLength
Matteo Scandoloa4285862020-12-01 18:10:10 -080081 }
82
83 protocolFactory := thrift.NewTCompactProtocolFactory()
84
85 // Each span is first written to thriftBuffer to determine its size in bytes.
David K. Bainbridge06631892021-08-19 13:07:00 +000086 thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
Matteo Scandoloa4285862020-12-01 18:10:10 -080087 thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)
88
David K. Bainbridge06631892021-08-19 13:07:00 +000089 client, err := utils.NewAgentClientUDPWithParams(params.AgentClientUDPParams)
Matteo Scandoloa4285862020-12-01 18:10:10 -080090 if err != nil {
91 return nil, err
92 }
93
94 return &udpSender{
95 client: client,
David K. Bainbridge06631892021-08-19 13:07:00 +000096 maxSpanBytes: params.MaxPacketSize - emitBatchOverhead,
Matteo Scandoloa4285862020-12-01 18:10:10 -080097 thriftBuffer: thriftBuffer,
98 thriftProtocol: thriftProtocol,
99 }, nil
100}
101
David K. Bainbridge06631892021-08-19 13:07:00 +0000102// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
103// TODO: (breaking change) move to transport/ package.
104func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
105 return NewUDPTransportWithParams(UDPTransportParams{
106 AgentClientUDPParams: utils.AgentClientUDPParams{
107 HostPort: hostPort,
108 MaxPacketSize: maxPacketSize,
109 },
110 })
111}
112
Matteo Scandoloa4285862020-12-01 18:10:10 -0800113// SetReporterStats implements reporterstats.Receiver.
114func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) {
115 s.reporterStats = rs
116}
117
118func (s *udpSender) calcSizeOfSerializedThrift(thriftStruct thrift.TStruct) int {
119 s.thriftBuffer.Reset()
David K. Bainbridge06631892021-08-19 13:07:00 +0000120 _ = thriftStruct.Write(context.Background(), s.thriftProtocol)
Matteo Scandoloa4285862020-12-01 18:10:10 -0800121 return s.thriftBuffer.Len()
122}
123
124func (s *udpSender) Append(span *Span) (int, error) {
125 if s.process == nil {
126 s.process = BuildJaegerProcessThrift(span)
127 s.processByteSize = s.calcSizeOfSerializedThrift(s.process)
128 s.byteBufferSize += s.processByteSize
129 }
130 jSpan := BuildJaegerThrift(span)
131 spanSize := s.calcSizeOfSerializedThrift(jSpan)
132 if spanSize > s.maxSpanBytes {
133 s.tooLargeDroppedSpans++
134 return 1, errSpanTooLarge
135 }
136
137 s.byteBufferSize += spanSize
138 if s.byteBufferSize <= s.maxSpanBytes {
139 s.spanBuffer = append(s.spanBuffer, jSpan)
140 if s.byteBufferSize < s.maxSpanBytes {
141 return 0, nil
142 }
143 return s.Flush()
144 }
145 // the latest span did not fit in the buffer
146 n, err := s.Flush()
147 s.spanBuffer = append(s.spanBuffer, jSpan)
148 s.byteBufferSize = spanSize + s.processByteSize
149 return n, err
150}
151
152func (s *udpSender) Flush() (int, error) {
153 n := len(s.spanBuffer)
154 if n == 0 {
155 return 0, nil
156 }
157 s.batchSeqNo++
158 batchSeqNo := int64(s.batchSeqNo)
David K. Bainbridge06631892021-08-19 13:07:00 +0000159 err := s.client.EmitBatch(context.Background(), &j.Batch{
Matteo Scandoloa4285862020-12-01 18:10:10 -0800160 Process: s.process,
161 Spans: s.spanBuffer,
162 SeqNo: &batchSeqNo,
163 Stats: s.makeStats(),
164 })
165 s.resetBuffers()
166 if err != nil {
167 s.failedToEmitSpans += int64(n)
168 }
169 return n, err
170}
171
172func (s *udpSender) Close() error {
173 return s.client.Close()
174}
175
176func (s *udpSender) resetBuffers() {
177 for i := range s.spanBuffer {
178 s.spanBuffer[i] = nil
179 }
180 s.spanBuffer = s.spanBuffer[:0]
181 s.byteBufferSize = s.processByteSize
182}
183
184func (s *udpSender) makeStats() *j.ClientStats {
185 var dropped int64
186 if s.reporterStats != nil {
187 dropped = s.reporterStats.SpansDroppedFromQueue()
188 }
189 return &j.ClientStats{
190 FullQueueDroppedSpans: dropped,
191 TooLargeDroppedSpans: s.tooLargeDroppedSpans,
192 FailedToEmitSpans: s.failedToEmitSpans,
193 }
194}