blob: c2fdd58b3198276cde5100b29c67e3c41ddb95e0 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package binarylog
20
21import (
22 "bufio"
23 "encoding/binary"
24 "io"
25 "sync"
26 "time"
27
28 "github.com/golang/protobuf/proto"
29 pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
30)
31
32var (
33 // DefaultSink is the sink where the logs will be written to. It's exported
34 // for the binarylog package to update.
35 DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
36)
37
// Sink writes log entries into the binary log sink.
//
// Sink is a copy of the exported binarylog.Sink, kept here to avoid a
// circular dependency.
type Sink interface {
	// Write will be called to write the log entry into the sink.
	//
	// It should be thread-safe so it can be called in parallel.
	Write(*pb.GrpcLogEntry) error
	// Close will be called when the Sink is replaced by a new Sink.
	Close() error
}
49
50type noopSink struct{}
51
52func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil }
53func (ns *noopSink) Close() error { return nil }
54
55// newWriterSink creates a binary log sink with the given writer.
56//
57// Write() marshals the proto message and writes it to the given writer. Each
58// message is prefixed with a 4 byte big endian unsigned integer as the length.
59//
60// No buffer is done, Close() doesn't try to close the writer.
61func newWriterSink(w io.Writer) Sink {
62 return &writerSink{out: w}
63}
64
// writerSink is a Sink that writes length-prefixed, marshaled log entries
// directly to an io.Writer.
type writerSink struct {
	// out receives the 4-byte length header followed by the marshaled entry.
	out io.Writer
}
68
69func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
70 b, err := proto.Marshal(e)
71 if err != nil {
72 grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
73 return err
74 }
75 hdr := make([]byte, 4)
76 binary.BigEndian.PutUint32(hdr, uint32(len(b)))
77 if _, err := ws.out.Write(hdr); err != nil {
78 return err
79 }
80 if _, err := ws.out.Write(b); err != nil {
81 return err
82 }
83 return nil
84}
85
86func (ws *writerSink) Close() error { return nil }
87
// bufferedSink is a Sink that writes entries into a bufio.Writer and flushes
// that buffer periodically from a background goroutine.
type bufferedSink struct {
	// mu is held by Write, Close and the flush goroutine.
	mu sync.Mutex
	// closer is closed by Close after the final flush.
	closer io.Closer
	out    Sink          // out is built on buf.
	buf    *bufio.Writer // buf is kept for flush.
	// flusherStarted records whether the flush goroutine has been started;
	// it is set on the first call to Write.
	flusherStarted bool

	// writeTicker drives the periodic flush; it is nil until the flush
	// goroutine is started.
	writeTicker *time.Ticker
	// done is closed by Close to tell the flush goroutine to exit.
	done chan struct{}
}
98
99func (fs *bufferedSink) Write(e *pb.GrpcLogEntry) error {
100 fs.mu.Lock()
101 defer fs.mu.Unlock()
102 if !fs.flusherStarted {
103 // Start the write loop when Write is called.
104 fs.startFlushGoroutine()
105 fs.flusherStarted = true
106 }
107 if err := fs.out.Write(e); err != nil {
108 return err
109 }
110 return nil
111}
112
// bufFlushDuration is the interval of the ticker that triggers the
// bufferedSink's periodic buffer flush.
const bufFlushDuration = 60 * time.Second
116
117func (fs *bufferedSink) startFlushGoroutine() {
118 fs.writeTicker = time.NewTicker(bufFlushDuration)
119 go func() {
120 for {
121 select {
122 case <-fs.done:
123 return
124 case <-fs.writeTicker.C:
125 }
126 fs.mu.Lock()
127 if err := fs.buf.Flush(); err != nil {
128 grpclogLogger.Warningf("failed to flush to Sink: %v", err)
129 }
130 fs.mu.Unlock()
131 }
132 }()
133}
134
135func (fs *bufferedSink) Close() error {
136 fs.mu.Lock()
137 defer fs.mu.Unlock()
138 if fs.writeTicker != nil {
139 fs.writeTicker.Stop()
140 }
141 close(fs.done)
142 if err := fs.buf.Flush(); err != nil {
143 grpclogLogger.Warningf("failed to flush to Sink: %v", err)
144 }
145 if err := fs.closer.Close(); err != nil {
146 grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err)
147 }
148 if err := fs.out.Close(); err != nil {
149 grpclogLogger.Warningf("failed to close the Sink: %v", err)
150 }
151 return nil
152}
153
154// NewBufferedSink creates a binary log sink with the given WriteCloser.
155//
156// Write() marshals the proto message and writes it to the given writer. Each
157// message is prefixed with a 4 byte big endian unsigned integer as the length.
158//
159// Content is kept in a buffer, and is flushed every 60 seconds.
160//
161// Close closes the WriteCloser.
162func NewBufferedSink(o io.WriteCloser) Sink {
163 bufW := bufio.NewWriter(o)
164 return &bufferedSink{
165 closer: o,
166 out: newWriterSink(bufW),
167 buf: bufW,
168 done: make(chan struct{}),
169 }
170}