blob: 0cdb41831509d83c64fe767d6d6c2bc29340615b [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package binarylog
20
21import (
22 "net"
23 "strings"
24 "sync/atomic"
25 "time"
26
27 "github.com/golang/protobuf/proto"
28 "github.com/golang/protobuf/ptypes"
29 pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
30 "google.golang.org/grpc/metadata"
31 "google.golang.org/grpc/status"
32)
33
34type callIDGenerator struct {
35 id uint64
36}
37
38func (g *callIDGenerator) next() uint64 {
39 id := atomic.AddUint64(&g.id, 1)
40 return id
41}
42
43// reset is for testing only, and doesn't need to be thread safe.
44func (g *callIDGenerator) reset() {
45 g.id = 0
46}
47
48var idGen callIDGenerator
49
50// MethodLogger is the sub-logger for each method.
51type MethodLogger struct {
52 headerMaxLen, messageMaxLen uint64
53
54 callID uint64
55 idWithinCallGen *callIDGenerator
56
57 sink Sink // TODO(blog): make this plugable.
58}
59
60func newMethodLogger(h, m uint64) *MethodLogger {
61 return &MethodLogger{
62 headerMaxLen: h,
63 messageMaxLen: m,
64
65 callID: idGen.next(),
66 idWithinCallGen: &callIDGenerator{},
67
68 sink: DefaultSink, // TODO(blog): make it plugable.
69 }
70}
71
72// Log creates a proto binary log entry, and logs it to the sink.
73func (ml *MethodLogger) Log(c LogEntryConfig) {
74 m := c.toProto()
75 timestamp, _ := ptypes.TimestampProto(time.Now())
76 m.Timestamp = timestamp
77 m.CallId = ml.callID
78 m.SequenceIdWithinCall = ml.idWithinCallGen.next()
79
80 switch pay := m.Payload.(type) {
81 case *pb.GrpcLogEntry_ClientHeader:
82 m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
83 case *pb.GrpcLogEntry_ServerHeader:
84 m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
85 case *pb.GrpcLogEntry_Message:
86 m.PayloadTruncated = ml.truncateMessage(pay.Message)
87 }
88
89 ml.sink.Write(m)
90}
91
92func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
93 if ml.headerMaxLen == maxUInt {
94 return false
95 }
96 var (
97 bytesLimit = ml.headerMaxLen
98 index int
99 )
100 // At the end of the loop, index will be the first entry where the total
101 // size is greater than the limit:
102 //
103 // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
104 for ; index < len(mdPb.Entry); index++ {
105 entry := mdPb.Entry[index]
106 if entry.Key == "grpc-trace-bin" {
107 // "grpc-trace-bin" is a special key. It's kept in the log entry,
108 // but not counted towards the size limit.
109 continue
110 }
111 currentEntryLen := uint64(len(entry.Value))
112 if currentEntryLen > bytesLimit {
113 break
114 }
115 bytesLimit -= currentEntryLen
116 }
117 truncated = index < len(mdPb.Entry)
118 mdPb.Entry = mdPb.Entry[:index]
119 return truncated
120}
121
122func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
123 if ml.messageMaxLen == maxUInt {
124 return false
125 }
126 if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
127 return false
128 }
129 msgPb.Data = msgPb.Data[:ml.messageMaxLen]
130 return true
131}
132
133// LogEntryConfig represents the configuration for binary log entry.
134type LogEntryConfig interface {
135 toProto() *pb.GrpcLogEntry
136}
137
138// ClientHeader configs the binary log entry to be a ClientHeader entry.
139type ClientHeader struct {
140 OnClientSide bool
141 Header metadata.MD
142 MethodName string
143 Authority string
144 Timeout time.Duration
145 // PeerAddr is required only when it's on server side.
146 PeerAddr net.Addr
147}
148
149func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
150 // This function doesn't need to set all the fields (e.g. seq ID). The Log
151 // function will set the fields when necessary.
152 clientHeader := &pb.ClientHeader{
153 Metadata: mdToMetadataProto(c.Header),
154 MethodName: c.MethodName,
155 Authority: c.Authority,
156 }
157 if c.Timeout > 0 {
158 clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
159 }
160 ret := &pb.GrpcLogEntry{
161 Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
162 Payload: &pb.GrpcLogEntry_ClientHeader{
163 ClientHeader: clientHeader,
164 },
165 }
166 if c.OnClientSide {
167 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
168 } else {
169 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
170 }
171 if c.PeerAddr != nil {
172 ret.Peer = addrToProto(c.PeerAddr)
173 }
174 return ret
175}
176
177// ServerHeader configs the binary log entry to be a ServerHeader entry.
178type ServerHeader struct {
179 OnClientSide bool
180 Header metadata.MD
181 // PeerAddr is required only when it's on client side.
182 PeerAddr net.Addr
183}
184
185func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
186 ret := &pb.GrpcLogEntry{
187 Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
188 Payload: &pb.GrpcLogEntry_ServerHeader{
189 ServerHeader: &pb.ServerHeader{
190 Metadata: mdToMetadataProto(c.Header),
191 },
192 },
193 }
194 if c.OnClientSide {
195 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
196 } else {
197 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
198 }
199 if c.PeerAddr != nil {
200 ret.Peer = addrToProto(c.PeerAddr)
201 }
202 return ret
203}
204
205// ClientMessage configs the binary log entry to be a ClientMessage entry.
206type ClientMessage struct {
207 OnClientSide bool
208 // Message can be a proto.Message or []byte. Other messages formats are not
209 // supported.
210 Message interface{}
211}
212
213func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
214 var (
215 data []byte
216 err error
217 )
218 if m, ok := c.Message.(proto.Message); ok {
219 data, err = proto.Marshal(m)
220 if err != nil {
221 grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
222 }
223 } else if b, ok := c.Message.([]byte); ok {
224 data = b
225 } else {
226 grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
227 }
228 ret := &pb.GrpcLogEntry{
229 Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
230 Payload: &pb.GrpcLogEntry_Message{
231 Message: &pb.Message{
232 Length: uint32(len(data)),
233 Data: data,
234 },
235 },
236 }
237 if c.OnClientSide {
238 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
239 } else {
240 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
241 }
242 return ret
243}
244
245// ServerMessage configs the binary log entry to be a ServerMessage entry.
246type ServerMessage struct {
247 OnClientSide bool
248 // Message can be a proto.Message or []byte. Other messages formats are not
249 // supported.
250 Message interface{}
251}
252
253func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
254 var (
255 data []byte
256 err error
257 )
258 if m, ok := c.Message.(proto.Message); ok {
259 data, err = proto.Marshal(m)
260 if err != nil {
261 grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
262 }
263 } else if b, ok := c.Message.([]byte); ok {
264 data = b
265 } else {
266 grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
267 }
268 ret := &pb.GrpcLogEntry{
269 Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
270 Payload: &pb.GrpcLogEntry_Message{
271 Message: &pb.Message{
272 Length: uint32(len(data)),
273 Data: data,
274 },
275 },
276 }
277 if c.OnClientSide {
278 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
279 } else {
280 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
281 }
282 return ret
283}
284
285// ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
286type ClientHalfClose struct {
287 OnClientSide bool
288}
289
290func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry {
291 ret := &pb.GrpcLogEntry{
292 Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
293 Payload: nil, // No payload here.
294 }
295 if c.OnClientSide {
296 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
297 } else {
298 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
299 }
300 return ret
301}
302
303// ServerTrailer configs the binary log entry to be a ServerTrailer entry.
304type ServerTrailer struct {
305 OnClientSide bool
306 Trailer metadata.MD
307 // Err is the status error.
308 Err error
309 // PeerAddr is required only when it's on client side and the RPC is trailer
310 // only.
311 PeerAddr net.Addr
312}
313
314func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
315 st, ok := status.FromError(c.Err)
316 if !ok {
317 grpclogLogger.Info("binarylogging: error in trailer is not a status error")
318 }
319 var (
320 detailsBytes []byte
321 err error
322 )
323 stProto := st.Proto()
324 if stProto != nil && len(stProto.Details) != 0 {
325 detailsBytes, err = proto.Marshal(stProto)
326 if err != nil {
327 grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
328 }
329 }
330 ret := &pb.GrpcLogEntry{
331 Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
332 Payload: &pb.GrpcLogEntry_Trailer{
333 Trailer: &pb.Trailer{
334 Metadata: mdToMetadataProto(c.Trailer),
335 StatusCode: uint32(st.Code()),
336 StatusMessage: st.Message(),
337 StatusDetails: detailsBytes,
338 },
339 },
340 }
341 if c.OnClientSide {
342 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
343 } else {
344 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
345 }
346 if c.PeerAddr != nil {
347 ret.Peer = addrToProto(c.PeerAddr)
348 }
349 return ret
350}
351
352// Cancel configs the binary log entry to be a Cancel entry.
353type Cancel struct {
354 OnClientSide bool
355}
356
357func (c *Cancel) toProto() *pb.GrpcLogEntry {
358 ret := &pb.GrpcLogEntry{
359 Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
360 Payload: nil,
361 }
362 if c.OnClientSide {
363 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
364 } else {
365 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
366 }
367 return ret
368}
369
370// metadataKeyOmit returns whether the metadata entry with this key should be
371// omitted.
372func metadataKeyOmit(key string) bool {
373 switch key {
374 case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
375 return true
376 case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users.
377 return false
378 }
379 return strings.HasPrefix(key, "grpc-")
380}
381
382func mdToMetadataProto(md metadata.MD) *pb.Metadata {
383 ret := &pb.Metadata{}
384 for k, vv := range md {
385 if metadataKeyOmit(k) {
386 continue
387 }
388 for _, v := range vv {
389 ret.Entry = append(ret.Entry,
390 &pb.MetadataEntry{
391 Key: k,
392 Value: []byte(v),
393 },
394 )
395 }
396 }
397 return ret
398}
399
400func addrToProto(addr net.Addr) *pb.Address {
401 ret := &pb.Address{}
402 switch a := addr.(type) {
403 case *net.TCPAddr:
404 if a.IP.To4() != nil {
405 ret.Type = pb.Address_TYPE_IPV4
406 } else if a.IP.To16() != nil {
407 ret.Type = pb.Address_TYPE_IPV6
408 } else {
409 ret.Type = pb.Address_TYPE_UNKNOWN
410 // Do not set address and port fields.
411 break
412 }
413 ret.Address = a.IP.String()
414 ret.IpPort = uint32(a.Port)
415 case *net.UnixAddr:
416 ret.Type = pb.Address_TYPE_UNIX
417 ret.Address = a.String()
418 default:
419 ret.Type = pb.Address_TYPE_UNKNOWN
420 }
421 return ret
422}