blob: 73aeb000f895741c206adb26533eaae6cb96f635 [file] [log] [blame]
Takahiro Suzuki241c10e2020-12-17 20:17:57 +09001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18 "encoding/binary"
19 "fmt"
20 "time"
21
22 "github.com/opentracing/opentracing-go/ext"
23
24 "github.com/uber/jaeger-client-go/internal/spanlog"
25 z "github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
26 "github.com/uber/jaeger-client-go/utils"
27)
28
29const (
30 // Zipkin UI does not work well with non-string tag values
31 allowPackedNumbers = false
32)
33
34var specialTagHandlers = map[string]func(*zipkinSpan, interface{}){
35 string(ext.SpanKind): setSpanKind,
36 string(ext.PeerHostIPv4): setPeerIPv4,
37 string(ext.PeerPort): setPeerPort,
38 string(ext.PeerService): setPeerService,
39 TracerIPTagKey: removeTag,
40}
41
42// BuildZipkinThrift builds thrift span based on internal span.
43// TODO: (breaking change) move to transport/zipkin and make private.
44func BuildZipkinThrift(s *Span) *z.Span {
45 span := &zipkinSpan{Span: s}
46 span.handleSpecialTags()
47 parentID := int64(span.context.parentID)
48 var ptrParentID *int64
49 if parentID != 0 {
50 ptrParentID = &parentID
51 }
52 traceIDHigh := int64(span.context.traceID.High)
53 var ptrTraceIDHigh *int64
54 if traceIDHigh != 0 {
55 ptrTraceIDHigh = &traceIDHigh
56 }
57 timestamp := utils.TimeToMicrosecondsSinceEpochInt64(span.startTime)
58 duration := span.duration.Nanoseconds() / int64(time.Microsecond)
59 endpoint := &z.Endpoint{
60 ServiceName: span.tracer.serviceName,
61 Ipv4: int32(span.tracer.hostIPv4)}
62 thriftSpan := &z.Span{
63 TraceID: int64(span.context.traceID.Low),
64 TraceIDHigh: ptrTraceIDHigh,
65 ID: int64(span.context.spanID),
66 ParentID: ptrParentID,
67 Name: span.operationName,
68 Timestamp: &timestamp,
69 Duration: &duration,
70 Debug: span.context.IsDebug(),
71 Annotations: buildAnnotations(span, endpoint),
72 BinaryAnnotations: buildBinaryAnnotations(span, endpoint)}
73 return thriftSpan
74}
75
76func buildAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.Annotation {
77 // automatically adding 2 Zipkin CoreAnnotations
78 annotations := make([]*z.Annotation, 0, 2+len(span.logs))
79 var startLabel, endLabel string
80 if span.spanKind == string(ext.SpanKindRPCClientEnum) {
81 startLabel, endLabel = z.CLIENT_SEND, z.CLIENT_RECV
82 } else if span.spanKind == string(ext.SpanKindRPCServerEnum) {
83 startLabel, endLabel = z.SERVER_RECV, z.SERVER_SEND
84 }
85 if !span.startTime.IsZero() && startLabel != "" {
86 start := &z.Annotation{
87 Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(span.startTime),
88 Value: startLabel,
89 Host: endpoint}
90 annotations = append(annotations, start)
91 if span.duration != 0 {
92 endTs := span.startTime.Add(span.duration)
93 end := &z.Annotation{
94 Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(endTs),
95 Value: endLabel,
96 Host: endpoint}
97 annotations = append(annotations, end)
98 }
99 }
100 for _, log := range span.logs {
101 anno := &z.Annotation{
102 Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(log.Timestamp),
103 Host: endpoint}
104 if content, err := spanlog.MaterializeWithJSON(log.Fields); err == nil {
105 anno.Value = truncateString(string(content), span.tracer.options.maxTagValueLength)
106 } else {
107 anno.Value = err.Error()
108 }
109 annotations = append(annotations, anno)
110 }
111 return annotations
112}
113
114func buildBinaryAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.BinaryAnnotation {
115 // automatically adding local component or server/client address tag, and client version
116 annotations := make([]*z.BinaryAnnotation, 0, 2+len(span.tags))
117
118 if span.peerDefined() && span.isRPC() {
119 peer := z.Endpoint{
120 Ipv4: span.peer.Ipv4,
121 Port: span.peer.Port,
122 ServiceName: span.peer.ServiceName}
123 label := z.CLIENT_ADDR
124 if span.isRPCClient() {
125 label = z.SERVER_ADDR
126 }
127 anno := &z.BinaryAnnotation{
128 Key: label,
129 Value: []byte{1},
130 AnnotationType: z.AnnotationType_BOOL,
131 Host: &peer}
132 annotations = append(annotations, anno)
133 }
134 if !span.isRPC() {
135 componentName := endpoint.ServiceName
136 for _, tag := range span.tags {
137 if tag.key == string(ext.Component) {
138 componentName = stringify(tag.value)
139 break
140 }
141 }
142 local := &z.BinaryAnnotation{
143 Key: z.LOCAL_COMPONENT,
144 Value: []byte(componentName),
145 AnnotationType: z.AnnotationType_STRING,
146 Host: endpoint}
147 annotations = append(annotations, local)
148 }
149 for _, tag := range span.tags {
150 // "Special tags" are already handled by this point, we'd be double reporting the
151 // tags if we don't skip here
152 if _, ok := specialTagHandlers[tag.key]; ok {
153 continue
154 }
155 if anno := buildBinaryAnnotation(tag.key, tag.value, span.tracer.options.maxTagValueLength, nil); anno != nil {
156 annotations = append(annotations, anno)
157 }
158 }
159 return annotations
160}
161
162func buildBinaryAnnotation(key string, val interface{}, maxTagValueLength int, endpoint *z.Endpoint) *z.BinaryAnnotation {
163 bann := &z.BinaryAnnotation{Key: key, Host: endpoint}
164 if value, ok := val.(string); ok {
165 bann.Value = []byte(truncateString(value, maxTagValueLength))
166 bann.AnnotationType = z.AnnotationType_STRING
167 } else if value, ok := val.([]byte); ok {
168 if len(value) > maxTagValueLength {
169 value = value[:maxTagValueLength]
170 }
171 bann.Value = value
172 bann.AnnotationType = z.AnnotationType_BYTES
173 } else if value, ok := val.(int32); ok && allowPackedNumbers {
174 bann.Value = int32ToBytes(value)
175 bann.AnnotationType = z.AnnotationType_I32
176 } else if value, ok := val.(int64); ok && allowPackedNumbers {
177 bann.Value = int64ToBytes(value)
178 bann.AnnotationType = z.AnnotationType_I64
179 } else if value, ok := val.(int); ok && allowPackedNumbers {
180 bann.Value = int64ToBytes(int64(value))
181 bann.AnnotationType = z.AnnotationType_I64
182 } else if value, ok := val.(bool); ok {
183 bann.Value = []byte{boolToByte(value)}
184 bann.AnnotationType = z.AnnotationType_BOOL
185 } else {
186 value := stringify(val)
187 bann.Value = []byte(truncateString(value, maxTagValueLength))
188 bann.AnnotationType = z.AnnotationType_STRING
189 }
190 return bann
191}
192
193func stringify(value interface{}) string {
194 if s, ok := value.(string); ok {
195 return s
196 }
197 return fmt.Sprintf("%+v", value)
198}
199
200func truncateString(value string, maxLength int) string {
201 // we ignore the problem of utf8 runes possibly being sliced in the middle,
202 // as it is rather expensive to iterate through each tag just to find rune
203 // boundaries.
204 if len(value) > maxLength {
205 return value[:maxLength]
206 }
207 return value
208}
209
210func boolToByte(b bool) byte {
211 if b {
212 return 1
213 }
214 return 0
215}
216
217// int32ToBytes converts int32 to bytes.
218func int32ToBytes(i int32) []byte {
219 buf := make([]byte, 4)
220 binary.BigEndian.PutUint32(buf, uint32(i))
221 return buf
222}
223
224// int64ToBytes converts int64 to bytes.
225func int64ToBytes(i int64) []byte {
226 buf := make([]byte, 8)
227 binary.BigEndian.PutUint64(buf, uint64(i))
228 return buf
229}
230
231type zipkinSpan struct {
232 *Span
233
234 // peer points to the peer service participating in this span,
235 // e.g. the Client if this span is a server span,
236 // or Server if this span is a client span
237 peer struct {
238 Ipv4 int32
239 Port int16
240 ServiceName string
241 }
242
243 // used to distinguish local vs. RPC Server vs. RPC Client spans
244 spanKind string
245}
246
247func (s *zipkinSpan) handleSpecialTags() {
248 s.Lock()
249 defer s.Unlock()
250 if s.firstInProcess {
251 // append the process tags
252 s.tags = append(s.tags, s.tracer.tags...)
253 }
254 filteredTags := make([]Tag, 0, len(s.tags))
255 for _, tag := range s.tags {
256 if handler, ok := specialTagHandlers[tag.key]; ok {
257 handler(s, tag.value)
258 } else {
259 filteredTags = append(filteredTags, tag)
260 }
261 }
262 s.tags = filteredTags
263}
264
265func setSpanKind(s *zipkinSpan, value interface{}) {
266 if val, ok := value.(string); ok {
267 s.spanKind = val
268 return
269 }
270 if val, ok := value.(ext.SpanKindEnum); ok {
271 s.spanKind = string(val)
272 }
273}
274
275func setPeerIPv4(s *zipkinSpan, value interface{}) {
276 if val, ok := value.(string); ok {
277 if ip, err := utils.ParseIPToUint32(val); err == nil {
278 s.peer.Ipv4 = int32(ip)
279 return
280 }
281 }
282 if val, ok := value.(uint32); ok {
283 s.peer.Ipv4 = int32(val)
284 return
285 }
286 if val, ok := value.(int32); ok {
287 s.peer.Ipv4 = val
288 }
289}
290
291func setPeerPort(s *zipkinSpan, value interface{}) {
292 if val, ok := value.(string); ok {
293 if port, err := utils.ParsePort(val); err == nil {
294 s.peer.Port = int16(port)
295 return
296 }
297 }
298 if val, ok := value.(uint16); ok {
299 s.peer.Port = int16(val)
300 return
301 }
302 if val, ok := value.(int); ok {
303 s.peer.Port = int16(val)
304 }
305}
306
307func setPeerService(s *zipkinSpan, value interface{}) {
308 if val, ok := value.(string); ok {
309 s.peer.ServiceName = val
310 }
311}
312
313func removeTag(s *zipkinSpan, value interface{}) {}
314
315func (s *zipkinSpan) peerDefined() bool {
316 return s.peer.ServiceName != "" || s.peer.Ipv4 != 0 || s.peer.Port != 0
317}
318
319func (s *zipkinSpan) isRPC() bool {
320 s.RLock()
321 defer s.RUnlock()
322 return s.spanKind == string(ext.SpanKindRPCClientEnum) || s.spanKind == string(ext.SpanKindRPCServerEnum)
323}
324
325func (s *zipkinSpan) isRPCClient() bool {
326 s.RLock()
327 defer s.RUnlock()
328 return s.spanKind == string(ext.SpanKindRPCClientEnum)
329}