blob: a2e7c346dd03603f7d6520deaf7a3a1a13f416e2 [file] [log] [blame]
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package binarylog
20
21import (
22 "bufio"
23 "encoding/binary"
24 "fmt"
25 "io"
26 "io/ioutil"
27 "sync"
28 "time"
29
30 "github.com/golang/protobuf/proto"
31 pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
32 "google.golang.org/grpc/grpclog"
33)
34
35var (
36 defaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
37)
38
39// SetDefaultSink sets the sink where binary logs will be written to.
40//
41// Not thread safe. Only set during initialization.
42func SetDefaultSink(s Sink) {
43 if defaultSink != nil {
44 defaultSink.Close()
45 }
46 defaultSink = s
47}
48
49// Sink writes log entry into the binary log sink.
50type Sink interface {
51 // Write will be called to write the log entry into the sink.
52 //
53 // It should be thread-safe so it can be called in parallel.
54 Write(*pb.GrpcLogEntry) error
55 // Close will be called when the Sink is replaced by a new Sink.
56 Close() error
57}
58
59type noopSink struct{}
60
61func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil }
62func (ns *noopSink) Close() error { return nil }
63
64// newWriterSink creates a binary log sink with the given writer.
65//
66// Write() marshals the proto message and writes it to the given writer. Each
67// message is prefixed with a 4 byte big endian unsigned integer as the length.
68//
69// No buffer is done, Close() doesn't try to close the writer.
70func newWriterSink(w io.Writer) *writerSink {
71 return &writerSink{out: w}
72}
73
74type writerSink struct {
75 out io.Writer
76}
77
78func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
79 b, err := proto.Marshal(e)
80 if err != nil {
81 grpclog.Infof("binary logging: failed to marshal proto message: %v", err)
82 }
83 hdr := make([]byte, 4)
84 binary.BigEndian.PutUint32(hdr, uint32(len(b)))
85 if _, err := ws.out.Write(hdr); err != nil {
86 return err
87 }
88 if _, err := ws.out.Write(b); err != nil {
89 return err
90 }
91 return nil
92}
93
94func (ws *writerSink) Close() error { return nil }
95
96type bufWriteCloserSink struct {
97 mu sync.Mutex
98 closer io.Closer
99 out *writerSink // out is built on buf.
100 buf *bufio.Writer // buf is kept for flush.
101
102 writeStartOnce sync.Once
103 writeTicker *time.Ticker
104}
105
106func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error {
107 // Start the write loop when Write is called.
108 fs.writeStartOnce.Do(fs.startFlushGoroutine)
109 fs.mu.Lock()
110 if err := fs.out.Write(e); err != nil {
111 fs.mu.Unlock()
112 return err
113 }
114 fs.mu.Unlock()
115 return nil
116}
117
118const (
119 bufFlushDuration = 60 * time.Second
120)
121
122func (fs *bufWriteCloserSink) startFlushGoroutine() {
123 fs.writeTicker = time.NewTicker(bufFlushDuration)
124 go func() {
125 for range fs.writeTicker.C {
126 fs.mu.Lock()
127 fs.buf.Flush()
128 fs.mu.Unlock()
129 }
130 }()
131}
132
133func (fs *bufWriteCloserSink) Close() error {
134 if fs.writeTicker != nil {
135 fs.writeTicker.Stop()
136 }
137 fs.mu.Lock()
138 fs.buf.Flush()
139 fs.closer.Close()
140 fs.out.Close()
141 fs.mu.Unlock()
142 return nil
143}
144
145func newBufWriteCloserSink(o io.WriteCloser) Sink {
146 bufW := bufio.NewWriter(o)
147 return &bufWriteCloserSink{
148 closer: o,
149 out: newWriterSink(bufW),
150 buf: bufW,
151 }
152}
153
154// NewTempFileSink creates a temp file and returns a Sink that writes to this
155// file.
156func NewTempFileSink() (Sink, error) {
157 tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt")
158 if err != nil {
159 return nil, fmt.Errorf("failed to create temp file: %v", err)
160 }
161 return newBufWriteCloserSink(tempFile), nil
162}