blob: 4c59ae9dd8adcdb543c8553c78ff9b7cb96188f2 [file] [log] [blame]
Girish Gowdra631ef3d2020-06-15 10:45:52 -07001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package utils
16
17import (
khenaidoo106c61a2021-08-11 18:05:46 -040018 "context"
Girish Gowdra631ef3d2020-06-15 10:45:52 -070019 "errors"
20 "fmt"
21 "io"
22 "net"
khenaidoo106c61a2021-08-11 18:05:46 -040023 "time"
Girish Gowdra631ef3d2020-06-15 10:45:52 -070024
khenaidoo106c61a2021-08-11 18:05:46 -040025 "github.com/uber/jaeger-client-go/log"
Girish Gowdra631ef3d2020-06-15 10:45:52 -070026 "github.com/uber/jaeger-client-go/thrift"
27
28 "github.com/uber/jaeger-client-go/thrift-gen/agent"
29 "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
30 "github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
31)
32
const (
	// UDPPacketMaxLength is the largest UDP packet, in bytes, that this
	// client is willing to send. The value is kept in sync with jaeger-agent
	// and is the default used by NewAgentClientUDPWithParams when
	// MaxPacketSize is left at zero.
	UDPPacketMaxLength = 65000
)
35
36// AgentClientUDP is a UDP client to Jaeger agent that implements agent.Agent interface.
37type AgentClientUDP struct {
38 agent.Agent
39 io.Closer
40
khenaidoo106c61a2021-08-11 18:05:46 -040041 connUDP udpConn
Girish Gowdra631ef3d2020-06-15 10:45:52 -070042 client *agent.AgentClient
43 maxPacketSize int // max size of datagram in bytes
44 thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
45}
46
// udpConn is the subset of connection behaviour AgentClientUDP relies on:
// writing a datagram, sizing the write buffer, and closing the connection.
// Both connection values assigned in NewAgentClientUDPWithParams must
// satisfy it.
type udpConn interface {
	io.Writer // Write([]byte) (int, error)
	io.Closer // Close() error

	// SetWriteBuffer is called once by the constructor with the configured
	// maximum packet size.
	SetWriteBuffer(bytes int) error
}
52
53// AgentClientUDPParams allows specifying options for initializing an AgentClientUDP. An instance of this struct should
54// be passed to NewAgentClientUDPWithParams.
55type AgentClientUDPParams struct {
56 HostPort string
57 MaxPacketSize int
58 Logger log.Logger
59 DisableAttemptReconnecting bool
60 AttemptReconnectInterval time.Duration
61}
62
63// NewAgentClientUDPWithParams creates a client that sends spans to Jaeger Agent over UDP.
64func NewAgentClientUDPWithParams(params AgentClientUDPParams) (*AgentClientUDP, error) {
65 // validate hostport
66 if _, _, err := net.SplitHostPort(params.HostPort); err != nil {
67 return nil, err
Girish Gowdra631ef3d2020-06-15 10:45:52 -070068 }
69
khenaidoo106c61a2021-08-11 18:05:46 -040070 if params.MaxPacketSize == 0 {
71 params.MaxPacketSize = UDPPacketMaxLength
72 }
73
74 if params.Logger == nil {
75 params.Logger = log.StdLogger
76 }
77
78 if !params.DisableAttemptReconnecting && params.AttemptReconnectInterval == 0 {
79 params.AttemptReconnectInterval = time.Second * 30
80 }
81
82 thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
Girish Gowdra631ef3d2020-06-15 10:45:52 -070083 protocolFactory := thrift.NewTCompactProtocolFactory()
84 client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory)
85
khenaidoo106c61a2021-08-11 18:05:46 -040086 var connUDP udpConn
87 var err error
88
89 if params.DisableAttemptReconnecting {
90 destAddr, err := net.ResolveUDPAddr("udp", params.HostPort)
91 if err != nil {
92 return nil, err
93 }
94
95 connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr)
96 if err != nil {
97 return nil, err
98 }
99 } else {
100 // host is hostname, setup resolver loop in case host record changes during operation
101 connUDP, err = newReconnectingUDPConn(params.HostPort, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger)
102 if err != nil {
103 return nil, err
104 }
105 }
106
107 if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil {
Girish Gowdra631ef3d2020-06-15 10:45:52 -0700108 return nil, err
109 }
110
khenaidoo106c61a2021-08-11 18:05:46 -0400111 return &AgentClientUDP{
Girish Gowdra631ef3d2020-06-15 10:45:52 -0700112 connUDP: connUDP,
113 client: client,
khenaidoo106c61a2021-08-11 18:05:46 -0400114 maxPacketSize: params.MaxPacketSize,
115 thriftBuffer: thriftBuffer,
116 }, nil
117}
118
119// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP.
120func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) {
121 return NewAgentClientUDPWithParams(AgentClientUDPParams{
122 HostPort: hostPort,
123 MaxPacketSize: maxPacketSize,
124 })
Girish Gowdra631ef3d2020-06-15 10:45:52 -0700125}
126
127// EmitZipkinBatch implements EmitZipkinBatch() of Agent interface
khenaidoo106c61a2021-08-11 18:05:46 -0400128func (a *AgentClientUDP) EmitZipkinBatch(context.Context, []*zipkincore.Span) error {
Girish Gowdra631ef3d2020-06-15 10:45:52 -0700129 return errors.New("Not implemented")
130}
131
132// EmitBatch implements EmitBatch() of Agent interface
khenaidoo106c61a2021-08-11 18:05:46 -0400133func (a *AgentClientUDP) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
Girish Gowdra631ef3d2020-06-15 10:45:52 -0700134 a.thriftBuffer.Reset()
khenaidoo106c61a2021-08-11 18:05:46 -0400135 if err := a.client.EmitBatch(ctx, batch); err != nil {
Girish Gowdra631ef3d2020-06-15 10:45:52 -0700136 return err
137 }
138 if a.thriftBuffer.Len() > a.maxPacketSize {
139 return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d",
140 a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans))
141 }
142 _, err := a.connUDP.Write(a.thriftBuffer.Bytes())
143 return err
144}
145
146// Close implements Close() of io.Closer and closes the underlying UDP connection.
147func (a *AgentClientUDP) Close() error {
148 return a.connUDP.Close()
149}