Girish Gowdra | 631ef3d | 2020-06-15 10:45:52 -0700 | [diff] [blame] | 1 | // Copyright (c) 2017 Uber Technologies, Inc. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package utils |
| 16 | |
| 17 | import ( |
| 18 | "errors" |
| 19 | "fmt" |
| 20 | "io" |
| 21 | "net" |
| 22 | |
| 23 | "github.com/uber/jaeger-client-go/thrift" |
| 24 | |
| 25 | "github.com/uber/jaeger-client-go/thrift-gen/agent" |
| 26 | "github.com/uber/jaeger-client-go/thrift-gen/jaeger" |
| 27 | "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" |
| 28 | ) |
| 29 | |
// UDPPacketMaxLength is the max size, in bytes, of a UDP packet we want to send,
// synced with jaeger-agent. NewAgentClientUDP uses it as the packet size limit
// when it is called with a maxPacketSize of 0.
const UDPPacketMaxLength = 65000
| 32 | |
// AgentClientUDP is a UDP client to Jaeger agent that implements agent.Agent interface.
//
// Each batch is serialized by the thrift client into an in-memory buffer and the
// buffer contents are then written to the UDP connection as a single datagram.
type AgentClientUDP struct {
	// Embedded interfaces. NewAgentClientUDP does not assign them; the methods
	// declared on *AgentClientUDP in this file (EmitZipkinBatch, EmitBatch, Close)
	// take precedence over the promoted ones of the same name.
	agent.Agent
	io.Closer

	connUDP       *net.UDPConn          // connection to the agent; written by EmitBatch, closed by Close
	client        *agent.AgentClient    // thrift client that serializes batches into thriftBuffer
	maxPacketSize int                   // max size of datagram in bytes
	thriftBuffer  *thrift.TMemoryBuffer // buffer the client writes into; used to measure and send the serialized batch
}
| 43 | |
| 44 | // NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. |
| 45 | func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) { |
| 46 | if maxPacketSize == 0 { |
| 47 | maxPacketSize = UDPPacketMaxLength |
| 48 | } |
| 49 | |
| 50 | thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize) |
| 51 | protocolFactory := thrift.NewTCompactProtocolFactory() |
| 52 | client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory) |
| 53 | |
| 54 | destAddr, err := net.ResolveUDPAddr("udp", hostPort) |
| 55 | if err != nil { |
| 56 | return nil, err |
| 57 | } |
| 58 | |
| 59 | connUDP, err := net.DialUDP(destAddr.Network(), nil, destAddr) |
| 60 | if err != nil { |
| 61 | return nil, err |
| 62 | } |
| 63 | if err := connUDP.SetWriteBuffer(maxPacketSize); err != nil { |
| 64 | return nil, err |
| 65 | } |
| 66 | |
| 67 | clientUDP := &AgentClientUDP{ |
| 68 | connUDP: connUDP, |
| 69 | client: client, |
| 70 | maxPacketSize: maxPacketSize, |
| 71 | thriftBuffer: thriftBuffer} |
| 72 | return clientUDP, nil |
| 73 | } |
| 74 | |
| 75 | // EmitZipkinBatch implements EmitZipkinBatch() of Agent interface |
| 76 | func (a *AgentClientUDP) EmitZipkinBatch(spans []*zipkincore.Span) error { |
| 77 | return errors.New("Not implemented") |
| 78 | } |
| 79 | |
| 80 | // EmitBatch implements EmitBatch() of Agent interface |
| 81 | func (a *AgentClientUDP) EmitBatch(batch *jaeger.Batch) error { |
| 82 | a.thriftBuffer.Reset() |
| 83 | a.client.SeqId = 0 // we have no need for distinct SeqIds for our one-way UDP messages |
| 84 | if err := a.client.EmitBatch(batch); err != nil { |
| 85 | return err |
| 86 | } |
| 87 | if a.thriftBuffer.Len() > a.maxPacketSize { |
| 88 | return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d", |
| 89 | a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans)) |
| 90 | } |
| 91 | _, err := a.connUDP.Write(a.thriftBuffer.Bytes()) |
| 92 | return err |
| 93 | } |
| 94 | |
| 95 | // Close implements Close() of io.Closer and closes the underlying UDP connection. |
| 96 | func (a *AgentClientUDP) Close() error { |
| 97 | return a.connUDP.Close() |
| 98 | } |