| /* |
| * |
| * Copyright 2018 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package binarylog |
| |
| import ( |
| "net" |
| "strings" |
| "sync/atomic" |
| "time" |
| |
| "github.com/golang/protobuf/proto" |
| "github.com/golang/protobuf/ptypes" |
| pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" |
| "google.golang.org/grpc/grpclog" |
| "google.golang.org/grpc/metadata" |
| "google.golang.org/grpc/status" |
| ) |
| |
| type callIDGenerator struct { |
| id uint64 |
| } |
| |
| func (g *callIDGenerator) next() uint64 { |
| id := atomic.AddUint64(&g.id, 1) |
| return id |
| } |
| |
| // reset is for testing only, and doesn't need to be thread safe. |
| func (g *callIDGenerator) reset() { |
| g.id = 0 |
| } |
| |
| var idGen callIDGenerator |
| |
| // MethodLogger is the sub-logger for each method. |
| type MethodLogger struct { |
| headerMaxLen, messageMaxLen uint64 |
| |
| callID uint64 |
| idWithinCallGen *callIDGenerator |
| |
| sink Sink // TODO(blog): make this plugable. |
| } |
| |
| func newMethodLogger(h, m uint64) *MethodLogger { |
| return &MethodLogger{ |
| headerMaxLen: h, |
| messageMaxLen: m, |
| |
| callID: idGen.next(), |
| idWithinCallGen: &callIDGenerator{}, |
| |
| sink: defaultSink, // TODO(blog): make it plugable. |
| } |
| } |
| |
| // Log creates a proto binary log entry, and logs it to the sink. |
| func (ml *MethodLogger) Log(c LogEntryConfig) { |
| m := c.toProto() |
| timestamp, _ := ptypes.TimestampProto(time.Now()) |
| m.Timestamp = timestamp |
| m.CallId = ml.callID |
| m.SequenceIdWithinCall = ml.idWithinCallGen.next() |
| |
| switch pay := m.Payload.(type) { |
| case *pb.GrpcLogEntry_ClientHeader: |
| m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata()) |
| case *pb.GrpcLogEntry_ServerHeader: |
| m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata()) |
| case *pb.GrpcLogEntry_Message: |
| m.PayloadTruncated = ml.truncateMessage(pay.Message) |
| } |
| |
| ml.sink.Write(m) |
| } |
| |
| func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) { |
| if ml.headerMaxLen == maxUInt { |
| return false |
| } |
| var ( |
| bytesLimit = ml.headerMaxLen |
| index int |
| ) |
| // At the end of the loop, index will be the first entry where the total |
| // size is greater than the limit: |
| // |
| // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr. |
| for ; index < len(mdPb.Entry); index++ { |
| entry := mdPb.Entry[index] |
| if entry.Key == "grpc-trace-bin" { |
| // "grpc-trace-bin" is a special key. It's kept in the log entry, |
| // but not counted towards the size limit. |
| continue |
| } |
| currentEntryLen := uint64(len(entry.Value)) |
| if currentEntryLen > bytesLimit { |
| break |
| } |
| bytesLimit -= currentEntryLen |
| } |
| truncated = index < len(mdPb.Entry) |
| mdPb.Entry = mdPb.Entry[:index] |
| return truncated |
| } |
| |
| func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) { |
| if ml.messageMaxLen == maxUInt { |
| return false |
| } |
| if ml.messageMaxLen >= uint64(len(msgPb.Data)) { |
| return false |
| } |
| msgPb.Data = msgPb.Data[:ml.messageMaxLen] |
| return true |
| } |
| |
| // LogEntryConfig represents the configuration for binary log entry. |
| type LogEntryConfig interface { |
| toProto() *pb.GrpcLogEntry |
| } |
| |
| // ClientHeader configs the binary log entry to be a ClientHeader entry. |
| type ClientHeader struct { |
| OnClientSide bool |
| Header metadata.MD |
| MethodName string |
| Authority string |
| Timeout time.Duration |
| // PeerAddr is required only when it's on server side. |
| PeerAddr net.Addr |
| } |
| |
| func (c *ClientHeader) toProto() *pb.GrpcLogEntry { |
| // This function doesn't need to set all the fields (e.g. seq ID). The Log |
| // function will set the fields when necessary. |
| clientHeader := &pb.ClientHeader{ |
| Metadata: mdToMetadataProto(c.Header), |
| MethodName: c.MethodName, |
| Authority: c.Authority, |
| } |
| if c.Timeout > 0 { |
| clientHeader.Timeout = ptypes.DurationProto(c.Timeout) |
| } |
| ret := &pb.GrpcLogEntry{ |
| Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, |
| Payload: &pb.GrpcLogEntry_ClientHeader{ |
| ClientHeader: clientHeader, |
| }, |
| } |
| if c.OnClientSide { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT |
| } else { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER |
| } |
| if c.PeerAddr != nil { |
| ret.Peer = addrToProto(c.PeerAddr) |
| } |
| return ret |
| } |
| |
| // ServerHeader configs the binary log entry to be a ServerHeader entry. |
| type ServerHeader struct { |
| OnClientSide bool |
| Header metadata.MD |
| // PeerAddr is required only when it's on client side. |
| PeerAddr net.Addr |
| } |
| |
| func (c *ServerHeader) toProto() *pb.GrpcLogEntry { |
| ret := &pb.GrpcLogEntry{ |
| Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER, |
| Payload: &pb.GrpcLogEntry_ServerHeader{ |
| ServerHeader: &pb.ServerHeader{ |
| Metadata: mdToMetadataProto(c.Header), |
| }, |
| }, |
| } |
| if c.OnClientSide { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT |
| } else { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER |
| } |
| if c.PeerAddr != nil { |
| ret.Peer = addrToProto(c.PeerAddr) |
| } |
| return ret |
| } |
| |
| // ClientMessage configs the binary log entry to be a ClientMessage entry. |
| type ClientMessage struct { |
| OnClientSide bool |
| // Message can be a proto.Message or []byte. Other messages formats are not |
| // supported. |
| Message interface{} |
| } |
| |
| func (c *ClientMessage) toProto() *pb.GrpcLogEntry { |
| var ( |
| data []byte |
| err error |
| ) |
| if m, ok := c.Message.(proto.Message); ok { |
| data, err = proto.Marshal(m) |
| if err != nil { |
| grpclog.Infof("binarylogging: failed to marshal proto message: %v", err) |
| } |
| } else if b, ok := c.Message.([]byte); ok { |
| data = b |
| } else { |
| grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte") |
| } |
| ret := &pb.GrpcLogEntry{ |
| Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE, |
| Payload: &pb.GrpcLogEntry_Message{ |
| Message: &pb.Message{ |
| Length: uint32(len(data)), |
| Data: data, |
| }, |
| }, |
| } |
| if c.OnClientSide { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT |
| } else { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER |
| } |
| return ret |
| } |
| |
| // ServerMessage configs the binary log entry to be a ServerMessage entry. |
| type ServerMessage struct { |
| OnClientSide bool |
| // Message can be a proto.Message or []byte. Other messages formats are not |
| // supported. |
| Message interface{} |
| } |
| |
| func (c *ServerMessage) toProto() *pb.GrpcLogEntry { |
| var ( |
| data []byte |
| err error |
| ) |
| if m, ok := c.Message.(proto.Message); ok { |
| data, err = proto.Marshal(m) |
| if err != nil { |
| grpclog.Infof("binarylogging: failed to marshal proto message: %v", err) |
| } |
| } else if b, ok := c.Message.([]byte); ok { |
| data = b |
| } else { |
| grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte") |
| } |
| ret := &pb.GrpcLogEntry{ |
| Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE, |
| Payload: &pb.GrpcLogEntry_Message{ |
| Message: &pb.Message{ |
| Length: uint32(len(data)), |
| Data: data, |
| }, |
| }, |
| } |
| if c.OnClientSide { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT |
| } else { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER |
| } |
| return ret |
| } |
| |
| // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry. |
| type ClientHalfClose struct { |
| OnClientSide bool |
| } |
| |
| func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry { |
| ret := &pb.GrpcLogEntry{ |
| Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE, |
| Payload: nil, // No payload here. |
| } |
| if c.OnClientSide { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT |
| } else { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER |
| } |
| return ret |
| } |
| |
| // ServerTrailer configs the binary log entry to be a ServerTrailer entry. |
| type ServerTrailer struct { |
| OnClientSide bool |
| Trailer metadata.MD |
| // Err is the status error. |
| Err error |
| // PeerAddr is required only when it's on client side and the RPC is trailer |
| // only. |
| PeerAddr net.Addr |
| } |
| |
| func (c *ServerTrailer) toProto() *pb.GrpcLogEntry { |
| st, ok := status.FromError(c.Err) |
| if !ok { |
| grpclog.Info("binarylogging: error in trailer is not a status error") |
| } |
| var ( |
| detailsBytes []byte |
| err error |
| ) |
| stProto := st.Proto() |
| if stProto != nil && len(stProto.Details) != 0 { |
| detailsBytes, err = proto.Marshal(stProto) |
| if err != nil { |
| grpclog.Infof("binarylogging: failed to marshal status proto: %v", err) |
| } |
| } |
| ret := &pb.GrpcLogEntry{ |
| Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER, |
| Payload: &pb.GrpcLogEntry_Trailer{ |
| Trailer: &pb.Trailer{ |
| Metadata: mdToMetadataProto(c.Trailer), |
| StatusCode: uint32(st.Code()), |
| StatusMessage: st.Message(), |
| StatusDetails: detailsBytes, |
| }, |
| }, |
| } |
| if c.OnClientSide { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT |
| } else { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER |
| } |
| if c.PeerAddr != nil { |
| ret.Peer = addrToProto(c.PeerAddr) |
| } |
| return ret |
| } |
| |
| // Cancel configs the binary log entry to be a Cancel entry. |
| type Cancel struct { |
| OnClientSide bool |
| } |
| |
| func (c *Cancel) toProto() *pb.GrpcLogEntry { |
| ret := &pb.GrpcLogEntry{ |
| Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL, |
| Payload: nil, |
| } |
| if c.OnClientSide { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT |
| } else { |
| ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER |
| } |
| return ret |
| } |
| |
| // metadataKeyOmit returns whether the metadata entry with this key should be |
| // omitted. |
| func metadataKeyOmit(key string) bool { |
| switch key { |
| case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te": |
| return true |
| case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users. |
| return false |
| } |
| return strings.HasPrefix(key, "grpc-") |
| } |
| |
| func mdToMetadataProto(md metadata.MD) *pb.Metadata { |
| ret := &pb.Metadata{} |
| for k, vv := range md { |
| if metadataKeyOmit(k) { |
| continue |
| } |
| for _, v := range vv { |
| ret.Entry = append(ret.Entry, |
| &pb.MetadataEntry{ |
| Key: k, |
| Value: []byte(v), |
| }, |
| ) |
| } |
| } |
| return ret |
| } |
| |
| func addrToProto(addr net.Addr) *pb.Address { |
| ret := &pb.Address{} |
| switch a := addr.(type) { |
| case *net.TCPAddr: |
| if a.IP.To4() != nil { |
| ret.Type = pb.Address_TYPE_IPV4 |
| } else if a.IP.To16() != nil { |
| ret.Type = pb.Address_TYPE_IPV6 |
| } else { |
| ret.Type = pb.Address_TYPE_UNKNOWN |
| // Do not set address and port fields. |
| break |
| } |
| ret.Address = a.IP.String() |
| ret.IpPort = uint32(a.Port) |
| case *net.UnixAddr: |
| ret.Type = pb.Address_TYPE_UNIX |
| ret.Address = a.String() |
| default: |
| ret.Type = pb.Address_TYPE_UNKNOWN |
| } |
| return ret |
| } |