blob: 160f6e8616f5bb01c4d94b86bd2ba5a50a378937 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package binarylog
20
21import (
22 "net"
23 "strings"
24 "sync/atomic"
25 "time"
26
27 "github.com/golang/protobuf/proto"
28 "github.com/golang/protobuf/ptypes"
29 pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
30 "google.golang.org/grpc/grpclog"
31 "google.golang.org/grpc/metadata"
32 "google.golang.org/grpc/status"
33)
34
35type callIDGenerator struct {
36 id uint64
37}
38
39func (g *callIDGenerator) next() uint64 {
40 id := atomic.AddUint64(&g.id, 1)
41 return id
42}
43
44// reset is for testing only, and doesn't need to be thread safe.
45func (g *callIDGenerator) reset() {
46 g.id = 0
47}
48
49var idGen callIDGenerator
50
51// MethodLogger is the sub-logger for each method.
52type MethodLogger struct {
53 headerMaxLen, messageMaxLen uint64
54
55 callID uint64
56 idWithinCallGen *callIDGenerator
57
58 sink Sink // TODO(blog): make this plugable.
59}
60
61func newMethodLogger(h, m uint64) *MethodLogger {
62 return &MethodLogger{
63 headerMaxLen: h,
64 messageMaxLen: m,
65
66 callID: idGen.next(),
67 idWithinCallGen: &callIDGenerator{},
68
69 sink: defaultSink, // TODO(blog): make it plugable.
70 }
71}
72
73// Log creates a proto binary log entry, and logs it to the sink.
74func (ml *MethodLogger) Log(c LogEntryConfig) {
75 m := c.toProto()
76 timestamp, _ := ptypes.TimestampProto(time.Now())
77 m.Timestamp = timestamp
78 m.CallId = ml.callID
79 m.SequenceIdWithinCall = ml.idWithinCallGen.next()
80
81 switch pay := m.Payload.(type) {
82 case *pb.GrpcLogEntry_ClientHeader:
83 m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
84 case *pb.GrpcLogEntry_ServerHeader:
85 m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
86 case *pb.GrpcLogEntry_Message:
87 m.PayloadTruncated = ml.truncateMessage(pay.Message)
88 }
89
90 ml.sink.Write(m)
91}
92
93func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
94 if ml.headerMaxLen == maxUInt {
95 return false
96 }
97 var (
98 bytesLimit = ml.headerMaxLen
99 index int
100 )
101 // At the end of the loop, index will be the first entry where the total
102 // size is greater than the limit:
103 //
104 // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
105 for ; index < len(mdPb.Entry); index++ {
106 entry := mdPb.Entry[index]
107 if entry.Key == "grpc-trace-bin" {
108 // "grpc-trace-bin" is a special key. It's kept in the log entry,
109 // but not counted towards the size limit.
110 continue
111 }
112 currentEntryLen := uint64(len(entry.Value))
113 if currentEntryLen > bytesLimit {
114 break
115 }
116 bytesLimit -= currentEntryLen
117 }
118 truncated = index < len(mdPb.Entry)
119 mdPb.Entry = mdPb.Entry[:index]
120 return truncated
121}
122
123func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
124 if ml.messageMaxLen == maxUInt {
125 return false
126 }
127 if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
128 return false
129 }
130 msgPb.Data = msgPb.Data[:ml.messageMaxLen]
131 return true
132}
133
134// LogEntryConfig represents the configuration for binary log entry.
135type LogEntryConfig interface {
136 toProto() *pb.GrpcLogEntry
137}
138
139// ClientHeader configs the binary log entry to be a ClientHeader entry.
140type ClientHeader struct {
141 OnClientSide bool
142 Header metadata.MD
143 MethodName string
144 Authority string
145 Timeout time.Duration
146 // PeerAddr is required only when it's on server side.
147 PeerAddr net.Addr
148}
149
150func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
151 // This function doesn't need to set all the fields (e.g. seq ID). The Log
152 // function will set the fields when necessary.
153 clientHeader := &pb.ClientHeader{
154 Metadata: mdToMetadataProto(c.Header),
155 MethodName: c.MethodName,
156 Authority: c.Authority,
157 }
158 if c.Timeout > 0 {
159 clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
160 }
161 ret := &pb.GrpcLogEntry{
162 Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
163 Payload: &pb.GrpcLogEntry_ClientHeader{
164 ClientHeader: clientHeader,
165 },
166 }
167 if c.OnClientSide {
168 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
169 } else {
170 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
171 }
172 if c.PeerAddr != nil {
173 ret.Peer = addrToProto(c.PeerAddr)
174 }
175 return ret
176}
177
178// ServerHeader configs the binary log entry to be a ServerHeader entry.
179type ServerHeader struct {
180 OnClientSide bool
181 Header metadata.MD
182 // PeerAddr is required only when it's on client side.
183 PeerAddr net.Addr
184}
185
186func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
187 ret := &pb.GrpcLogEntry{
188 Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
189 Payload: &pb.GrpcLogEntry_ServerHeader{
190 ServerHeader: &pb.ServerHeader{
191 Metadata: mdToMetadataProto(c.Header),
192 },
193 },
194 }
195 if c.OnClientSide {
196 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
197 } else {
198 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
199 }
200 if c.PeerAddr != nil {
201 ret.Peer = addrToProto(c.PeerAddr)
202 }
203 return ret
204}
205
206// ClientMessage configs the binary log entry to be a ClientMessage entry.
207type ClientMessage struct {
208 OnClientSide bool
209 // Message can be a proto.Message or []byte. Other messages formats are not
210 // supported.
211 Message interface{}
212}
213
214func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
215 var (
216 data []byte
217 err error
218 )
219 if m, ok := c.Message.(proto.Message); ok {
220 data, err = proto.Marshal(m)
221 if err != nil {
222 grpclog.Infof("binarylogging: failed to marshal proto message: %v", err)
223 }
224 } else if b, ok := c.Message.([]byte); ok {
225 data = b
226 } else {
227 grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte")
228 }
229 ret := &pb.GrpcLogEntry{
230 Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
231 Payload: &pb.GrpcLogEntry_Message{
232 Message: &pb.Message{
233 Length: uint32(len(data)),
234 Data: data,
235 },
236 },
237 }
238 if c.OnClientSide {
239 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
240 } else {
241 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
242 }
243 return ret
244}
245
246// ServerMessage configs the binary log entry to be a ServerMessage entry.
247type ServerMessage struct {
248 OnClientSide bool
249 // Message can be a proto.Message or []byte. Other messages formats are not
250 // supported.
251 Message interface{}
252}
253
254func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
255 var (
256 data []byte
257 err error
258 )
259 if m, ok := c.Message.(proto.Message); ok {
260 data, err = proto.Marshal(m)
261 if err != nil {
262 grpclog.Infof("binarylogging: failed to marshal proto message: %v", err)
263 }
264 } else if b, ok := c.Message.([]byte); ok {
265 data = b
266 } else {
267 grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte")
268 }
269 ret := &pb.GrpcLogEntry{
270 Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
271 Payload: &pb.GrpcLogEntry_Message{
272 Message: &pb.Message{
273 Length: uint32(len(data)),
274 Data: data,
275 },
276 },
277 }
278 if c.OnClientSide {
279 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
280 } else {
281 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
282 }
283 return ret
284}
285
286// ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
287type ClientHalfClose struct {
288 OnClientSide bool
289}
290
291func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry {
292 ret := &pb.GrpcLogEntry{
293 Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
294 Payload: nil, // No payload here.
295 }
296 if c.OnClientSide {
297 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
298 } else {
299 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
300 }
301 return ret
302}
303
304// ServerTrailer configs the binary log entry to be a ServerTrailer entry.
305type ServerTrailer struct {
306 OnClientSide bool
307 Trailer metadata.MD
308 // Err is the status error.
309 Err error
310 // PeerAddr is required only when it's on client side and the RPC is trailer
311 // only.
312 PeerAddr net.Addr
313}
314
315func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
316 st, ok := status.FromError(c.Err)
317 if !ok {
318 grpclog.Info("binarylogging: error in trailer is not a status error")
319 }
320 var (
321 detailsBytes []byte
322 err error
323 )
324 stProto := st.Proto()
325 if stProto != nil && len(stProto.Details) != 0 {
326 detailsBytes, err = proto.Marshal(stProto)
327 if err != nil {
328 grpclog.Infof("binarylogging: failed to marshal status proto: %v", err)
329 }
330 }
331 ret := &pb.GrpcLogEntry{
332 Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
333 Payload: &pb.GrpcLogEntry_Trailer{
334 Trailer: &pb.Trailer{
335 Metadata: mdToMetadataProto(c.Trailer),
336 StatusCode: uint32(st.Code()),
337 StatusMessage: st.Message(),
338 StatusDetails: detailsBytes,
339 },
340 },
341 }
342 if c.OnClientSide {
343 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
344 } else {
345 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
346 }
347 if c.PeerAddr != nil {
348 ret.Peer = addrToProto(c.PeerAddr)
349 }
350 return ret
351}
352
353// Cancel configs the binary log entry to be a Cancel entry.
354type Cancel struct {
355 OnClientSide bool
356}
357
358func (c *Cancel) toProto() *pb.GrpcLogEntry {
359 ret := &pb.GrpcLogEntry{
360 Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
361 Payload: nil,
362 }
363 if c.OnClientSide {
364 ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
365 } else {
366 ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
367 }
368 return ret
369}
370
371// metadataKeyOmit returns whether the metadata entry with this key should be
372// omitted.
373func metadataKeyOmit(key string) bool {
374 switch key {
375 case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
376 return true
377 case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users.
378 return false
379 }
380 return strings.HasPrefix(key, "grpc-")
381}
382
383func mdToMetadataProto(md metadata.MD) *pb.Metadata {
384 ret := &pb.Metadata{}
385 for k, vv := range md {
386 if metadataKeyOmit(k) {
387 continue
388 }
389 for _, v := range vv {
390 ret.Entry = append(ret.Entry,
391 &pb.MetadataEntry{
392 Key: k,
393 Value: []byte(v),
394 },
395 )
396 }
397 }
398 return ret
399}
400
401func addrToProto(addr net.Addr) *pb.Address {
402 ret := &pb.Address{}
403 switch a := addr.(type) {
404 case *net.TCPAddr:
405 if a.IP.To4() != nil {
406 ret.Type = pb.Address_TYPE_IPV4
407 } else if a.IP.To16() != nil {
408 ret.Type = pb.Address_TYPE_IPV6
409 } else {
410 ret.Type = pb.Address_TYPE_UNKNOWN
411 // Do not set address and port fields.
412 break
413 }
414 ret.Address = a.IP.String()
415 ret.IpPort = uint32(a.Port)
416 case *net.UnixAddr:
417 ret.Type = pb.Address_TYPE_UNIX
418 ret.Address = a.String()
419 default:
420 ret.Type = pb.Address_TYPE_UNKNOWN
421 }
422 return ret
423}