blob: 0c1436f76cc3ec663f19ceb6f8492ea4a86a9dea [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright (c) 2021 Uber Technologies, Inc.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19// THE SOFTWARE.
20
21package zapcore
22
23import (
24 "bufio"
25 "sync"
26 "time"
27
28 "go.uber.org/multierr"
29)
30
31const (
32 // _defaultBufferSize specifies the default size used by Buffer.
33 _defaultBufferSize = 256 * 1024 // 256 kB
34
35 // _defaultFlushInterval specifies the default flush interval for
36 // Buffer.
37 _defaultFlushInterval = 30 * time.Second
38)
39
40// A BufferedWriteSyncer is a WriteSyncer that buffers writes in-memory before
41// flushing them to a wrapped WriteSyncer after reaching some limit, or at some
42// fixed interval--whichever comes first.
43//
44// BufferedWriteSyncer is safe for concurrent use. You don't need to use
45// zapcore.Lock for WriteSyncers with BufferedWriteSyncer.
46type BufferedWriteSyncer struct {
47 // WS is the WriteSyncer around which BufferedWriteSyncer will buffer
48 // writes.
49 //
50 // This field is required.
51 WS WriteSyncer
52
53 // Size specifies the maximum amount of data the writer will buffered
54 // before flushing.
55 //
56 // Defaults to 256 kB if unspecified.
57 Size int
58
59 // FlushInterval specifies how often the writer should flush data if
60 // there have been no writes.
61 //
62 // Defaults to 30 seconds if unspecified.
63 FlushInterval time.Duration
64
65 // Clock, if specified, provides control of the source of time for the
66 // writer.
67 //
68 // Defaults to the system clock.
69 Clock Clock
70
71 // unexported fields for state
72 mu sync.Mutex
73 initialized bool // whether initialize() has run
74 writer *bufio.Writer
75 ticker *time.Ticker
76 stop chan struct{} // closed when flushLoop should stop
77 stopped bool // whether Stop() has run
78 done chan struct{} // closed when flushLoop has stopped
79}
80
81func (s *BufferedWriteSyncer) initialize() {
82 size := s.Size
83 if size == 0 {
84 size = _defaultBufferSize
85 }
86
87 flushInterval := s.FlushInterval
88 if flushInterval == 0 {
89 flushInterval = _defaultFlushInterval
90 }
91
92 if s.Clock == nil {
93 s.Clock = DefaultClock
94 }
95
96 s.ticker = s.Clock.NewTicker(flushInterval)
97 s.writer = bufio.NewWriterSize(s.WS, size)
98 s.stop = make(chan struct{})
99 s.done = make(chan struct{})
100 s.initialized = true
101 go s.flushLoop()
102}
103
104// Write writes log data into buffer syncer directly, multiple Write calls will be batched,
105// and log data will be flushed to disk when the buffer is full or periodically.
106func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) {
107 s.mu.Lock()
108 defer s.mu.Unlock()
109
110 if !s.initialized {
111 s.initialize()
112 }
113
114 // To avoid partial writes from being flushed, we manually flush the existing buffer if:
115 // * The current write doesn't fit into the buffer fully, and
116 // * The buffer is not empty (since bufio will not split large writes when the buffer is empty)
117 if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 {
118 if err := s.writer.Flush(); err != nil {
119 return 0, err
120 }
121 }
122
123 return s.writer.Write(bs)
124}
125
126// Sync flushes buffered log data into disk directly.
127func (s *BufferedWriteSyncer) Sync() error {
128 s.mu.Lock()
129 defer s.mu.Unlock()
130
131 var err error
132 if s.initialized {
133 err = s.writer.Flush()
134 }
135
136 return multierr.Append(err, s.WS.Sync())
137}
138
139// flushLoop flushes the buffer at the configured interval until Stop is
140// called.
141func (s *BufferedWriteSyncer) flushLoop() {
142 defer close(s.done)
143
144 for {
145 select {
146 case <-s.ticker.C:
147 // we just simply ignore error here
148 // because the underlying bufio writer stores any errors
149 // and we return any error from Sync() as part of the close
150 _ = s.Sync()
151 case <-s.stop:
152 return
153 }
154 }
155}
156
157// Stop closes the buffer, cleans up background goroutines, and flushes
158// remaining unwritten data.
159func (s *BufferedWriteSyncer) Stop() (err error) {
160 var stopped bool
161
162 // Critical section.
163 func() {
164 s.mu.Lock()
165 defer s.mu.Unlock()
166
167 if !s.initialized {
168 return
169 }
170
171 stopped = s.stopped
172 if stopped {
173 return
174 }
175 s.stopped = true
176
177 s.ticker.Stop()
178 close(s.stop) // tell flushLoop to stop
179 <-s.done // and wait until it has
180 }()
181
182 // Don't call Sync on consecutive Stops.
183 if !stopped {
184 err = s.Sync()
185 }
186
187 return err
188}