blob: fadd73e49aea2770c1c26565bb63e69a4a48d00f [file] [log] [blame]
Matteo Scandoloa4285862020-12-01 18:10:10 -08001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package utils
16
17import (
18 "errors"
19 "fmt"
20 "io"
21 "net"
22
23 "github.com/uber/jaeger-client-go/thrift"
24
25 "github.com/uber/jaeger-client-go/thrift-gen/agent"
26 "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
27 "github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
28)
29
// UDPPacketMaxLength is the default upper bound, in bytes, on the size of a
// UDP packet this package will send. The value is kept in sync with
// jaeger-agent.
const UDPPacketMaxLength = 65000
32
33// AgentClientUDP is a UDP client to Jaeger agent that implements agent.Agent interface.
34type AgentClientUDP struct {
35 agent.Agent
36 io.Closer
37
38 connUDP *net.UDPConn
39 client *agent.AgentClient
40 maxPacketSize int // max size of datagram in bytes
41 thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
42}
43
44// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP.
45func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) {
46 if maxPacketSize == 0 {
47 maxPacketSize = UDPPacketMaxLength
48 }
49
50 thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize)
51 protocolFactory := thrift.NewTCompactProtocolFactory()
52 client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory)
53
54 destAddr, err := net.ResolveUDPAddr("udp", hostPort)
55 if err != nil {
56 return nil, err
57 }
58
59 connUDP, err := net.DialUDP(destAddr.Network(), nil, destAddr)
60 if err != nil {
61 return nil, err
62 }
63 if err := connUDP.SetWriteBuffer(maxPacketSize); err != nil {
64 return nil, err
65 }
66
67 clientUDP := &AgentClientUDP{
68 connUDP: connUDP,
69 client: client,
70 maxPacketSize: maxPacketSize,
71 thriftBuffer: thriftBuffer}
72 return clientUDP, nil
73}
74
75// EmitZipkinBatch implements EmitZipkinBatch() of Agent interface
76func (a *AgentClientUDP) EmitZipkinBatch(spans []*zipkincore.Span) error {
77 return errors.New("Not implemented")
78}
79
80// EmitBatch implements EmitBatch() of Agent interface
81func (a *AgentClientUDP) EmitBatch(batch *jaeger.Batch) error {
82 a.thriftBuffer.Reset()
83 a.client.SeqId = 0 // we have no need for distinct SeqIds for our one-way UDP messages
84 if err := a.client.EmitBatch(batch); err != nil {
85 return err
86 }
87 if a.thriftBuffer.Len() > a.maxPacketSize {
88 return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d",
89 a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans))
90 }
91 _, err := a.connUDP.Write(a.thriftBuffer.Bytes())
92 return err
93}
94
95// Close implements Close() of io.Closer and closes the underlying UDP connection.
96func (a *AgentClientUDP) Close() error {
97 return a.connUDP.Close()
98}