blob: 0c1436f76cc3ec663f19ceb6f8492ea4a86a9dea [file] [log] [blame]
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package zapcore
import (
"bufio"
"sync"
"time"
"go.uber.org/multierr"
)
const (
// _defaultBufferSize specifies the default size used by Buffer.
_defaultBufferSize = 256 * 1024 // 256 kB
// _defaultFlushInterval specifies the default flush interval for
// Buffer.
_defaultFlushInterval = 30 * time.Second
)
// A BufferedWriteSyncer is a WriteSyncer that buffers writes in-memory before
// flushing them to a wrapped WriteSyncer after reaching some limit, or at some
// fixed interval--whichever comes first.
//
// BufferedWriteSyncer is safe for concurrent use. You don't need to use
// zapcore.Lock for WriteSyncers with BufferedWriteSyncer.
type BufferedWriteSyncer struct {
// WS is the WriteSyncer around which BufferedWriteSyncer will buffer
// writes.
//
// This field is required.
WS WriteSyncer
// Size specifies the maximum amount of data the writer will buffered
// before flushing.
//
// Defaults to 256 kB if unspecified.
Size int
// FlushInterval specifies how often the writer should flush data if
// there have been no writes.
//
// Defaults to 30 seconds if unspecified.
FlushInterval time.Duration
// Clock, if specified, provides control of the source of time for the
// writer.
//
// Defaults to the system clock.
Clock Clock
// unexported fields for state
mu sync.Mutex
initialized bool // whether initialize() has run
writer *bufio.Writer
ticker *time.Ticker
stop chan struct{} // closed when flushLoop should stop
stopped bool // whether Stop() has run
done chan struct{} // closed when flushLoop has stopped
}
func (s *BufferedWriteSyncer) initialize() {
size := s.Size
if size == 0 {
size = _defaultBufferSize
}
flushInterval := s.FlushInterval
if flushInterval == 0 {
flushInterval = _defaultFlushInterval
}
if s.Clock == nil {
s.Clock = DefaultClock
}
s.ticker = s.Clock.NewTicker(flushInterval)
s.writer = bufio.NewWriterSize(s.WS, size)
s.stop = make(chan struct{})
s.done = make(chan struct{})
s.initialized = true
go s.flushLoop()
}
// Write writes log data into buffer syncer directly, multiple Write calls will be batched,
// and log data will be flushed to disk when the buffer is full or periodically.
func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.initialized {
s.initialize()
}
// To avoid partial writes from being flushed, we manually flush the existing buffer if:
// * The current write doesn't fit into the buffer fully, and
// * The buffer is not empty (since bufio will not split large writes when the buffer is empty)
if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 {
if err := s.writer.Flush(); err != nil {
return 0, err
}
}
return s.writer.Write(bs)
}
// Sync flushes buffered log data into disk directly.
func (s *BufferedWriteSyncer) Sync() error {
s.mu.Lock()
defer s.mu.Unlock()
var err error
if s.initialized {
err = s.writer.Flush()
}
return multierr.Append(err, s.WS.Sync())
}
// flushLoop flushes the buffer at the configured interval until Stop is
// called.
func (s *BufferedWriteSyncer) flushLoop() {
defer close(s.done)
for {
select {
case <-s.ticker.C:
// we just simply ignore error here
// because the underlying bufio writer stores any errors
// and we return any error from Sync() as part of the close
_ = s.Sync()
case <-s.stop:
return
}
}
}
// Stop closes the buffer, cleans up background goroutines, and flushes
// remaining unwritten data.
func (s *BufferedWriteSyncer) Stop() (err error) {
var stopped bool
// Critical section.
func() {
s.mu.Lock()
defer s.mu.Unlock()
if !s.initialized {
return
}
stopped = s.stopped
if stopped {
return
}
s.stopped = true
s.ticker.Stop()
close(s.stop) // tell flushLoop to stop
<-s.done // and wait until it has
}()
// Don't call Sync on consecutive Stops.
if !stopped {
err = s.Sync()
}
return err
}