blob: 7370d8007518d1d598bd50a70d22702168fdbdf0 [file] [log] [blame]
Takahiro Suzukid7bf8202020-12-17 20:21:59 +09001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18 "errors"
19 "fmt"
20
21 "github.com/uber/jaeger-client-go/internal/reporterstats"
22 "github.com/uber/jaeger-client-go/thrift"
23 j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
24 "github.com/uber/jaeger-client-go/utils"
25)
26
// emitBatchOverhead is the number of bytes of a datagram taken up by the
// message envelope. The value was obtained empirically. A datagram must satisfy:
//
//	sizeof(Span) * numSpans + processByteSize + emitBatchOverhead <= maxPacketSize
//
// Because the Compact Thrift protocol is used, the overhead grows with the
// number of spans in the batch: both the list length and the SeqId are
// encoded as varint32.
//
// The unit test `TestEmitBatchOverhead` validates this number; it fails for
// values below 68.
const emitBatchOverhead = 70

// errSpanTooLarge is returned by Append when a single serialized span is too
// big to be sent.
var errSpanTooLarge = errors.New("span is too large")
38
39type udpSender struct {
40 client *utils.AgentClientUDP
41 maxPacketSize int // max size of datagram in bytes
42 maxSpanBytes int // max number of bytes to record spans (excluding envelope) in the datagram
43 byteBufferSize int // current number of span bytes accumulated in the buffer
44 spanBuffer []*j.Span // spans buffered before a flush
45 thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
46 thriftProtocol thrift.TProtocol
47 process *j.Process
48 processByteSize int
49
50 // reporterStats provides access to stats that are only known to Reporter
51 reporterStats reporterstats.ReporterStats
52
53 // The following counters are always non-negative, but we need to send them in signed i64 Thrift fields,
54 // so we keep them as signed. At 10k QPS, overflow happens in about 300 million years.
55 batchSeqNo int64
56 tooLargeDroppedSpans int64
57 failedToEmitSpans int64
58}
59
60// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
61// TODO: (breaking change) move to transport/ package.
62func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
63 if len(hostPort) == 0 {
64 hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
65 }
66 if maxPacketSize == 0 {
67 maxPacketSize = utils.UDPPacketMaxLength
68 }
69
70 protocolFactory := thrift.NewTCompactProtocolFactory()
71
72 // Each span is first written to thriftBuffer to determine its size in bytes.
73 thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize)
74 thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)
75
76 client, err := utils.NewAgentClientUDP(hostPort, maxPacketSize)
77 if err != nil {
78 return nil, err
79 }
80
81 return &udpSender{
82 client: client,
83 maxSpanBytes: maxPacketSize - emitBatchOverhead,
84 thriftBuffer: thriftBuffer,
85 thriftProtocol: thriftProtocol,
86 }, nil
87}
88
89// SetReporterStats implements reporterstats.Receiver.
90func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) {
91 s.reporterStats = rs
92}
93
94func (s *udpSender) calcSizeOfSerializedThrift(thriftStruct thrift.TStruct) int {
95 s.thriftBuffer.Reset()
96 _ = thriftStruct.Write(s.thriftProtocol)
97 return s.thriftBuffer.Len()
98}
99
100func (s *udpSender) Append(span *Span) (int, error) {
101 if s.process == nil {
102 s.process = BuildJaegerProcessThrift(span)
103 s.processByteSize = s.calcSizeOfSerializedThrift(s.process)
104 s.byteBufferSize += s.processByteSize
105 }
106 jSpan := BuildJaegerThrift(span)
107 spanSize := s.calcSizeOfSerializedThrift(jSpan)
108 if spanSize > s.maxSpanBytes {
109 s.tooLargeDroppedSpans++
110 return 1, errSpanTooLarge
111 }
112
113 s.byteBufferSize += spanSize
114 if s.byteBufferSize <= s.maxSpanBytes {
115 s.spanBuffer = append(s.spanBuffer, jSpan)
116 if s.byteBufferSize < s.maxSpanBytes {
117 return 0, nil
118 }
119 return s.Flush()
120 }
121 // the latest span did not fit in the buffer
122 n, err := s.Flush()
123 s.spanBuffer = append(s.spanBuffer, jSpan)
124 s.byteBufferSize = spanSize + s.processByteSize
125 return n, err
126}
127
128func (s *udpSender) Flush() (int, error) {
129 n := len(s.spanBuffer)
130 if n == 0 {
131 return 0, nil
132 }
133 s.batchSeqNo++
134 batchSeqNo := int64(s.batchSeqNo)
135 err := s.client.EmitBatch(&j.Batch{
136 Process: s.process,
137 Spans: s.spanBuffer,
138 SeqNo: &batchSeqNo,
139 Stats: s.makeStats(),
140 })
141 s.resetBuffers()
142 if err != nil {
143 s.failedToEmitSpans += int64(n)
144 }
145 return n, err
146}
147
148func (s *udpSender) Close() error {
149 return s.client.Close()
150}
151
152func (s *udpSender) resetBuffers() {
153 for i := range s.spanBuffer {
154 s.spanBuffer[i] = nil
155 }
156 s.spanBuffer = s.spanBuffer[:0]
157 s.byteBufferSize = s.processByteSize
158}
159
160func (s *udpSender) makeStats() *j.ClientStats {
161 var dropped int64
162 if s.reporterStats != nil {
163 dropped = s.reporterStats.SpansDroppedFromQueue()
164 }
165 return &j.ClientStats{
166 FullQueueDroppedSpans: dropped,
167 TooLargeDroppedSpans: s.tooLargeDroppedSpans,
168 FailedToEmitSpans: s.failedToEmitSpans,
169 }
170}