| // Copyright (c) 2021 Uber Technologies, Inc. |
| // |
| // Permission is hereby granted, free of charge, to any person obtaining a copy |
| // of this software and associated documentation files (the "Software"), to deal |
| // in the Software without restriction, including without limitation the rights |
| // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| // copies of the Software, and to permit persons to whom the Software is |
| // furnished to do so, subject to the following conditions: |
| // |
| // The above copyright notice and this permission notice shall be included in |
| // all copies or substantial portions of the Software. |
| // |
| // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| // THE SOFTWARE. |
| |
| package zapcore |
| |
| import ( |
| "bufio" |
| "sync" |
| "time" |
| |
| "go.uber.org/multierr" |
| ) |
| |
| const ( |
| // _defaultBufferSize specifies the default size used by Buffer. |
| _defaultBufferSize = 256 * 1024 // 256 kB |
| |
| // _defaultFlushInterval specifies the default flush interval for |
| // Buffer. |
| _defaultFlushInterval = 30 * time.Second |
| ) |
| |
| // A BufferedWriteSyncer is a WriteSyncer that buffers writes in-memory before |
| // flushing them to a wrapped WriteSyncer after reaching some limit, or at some |
| // fixed interval--whichever comes first. |
| // |
| // BufferedWriteSyncer is safe for concurrent use. You don't need to use |
| // zapcore.Lock for WriteSyncers with BufferedWriteSyncer. |
| type BufferedWriteSyncer struct { |
| // WS is the WriteSyncer around which BufferedWriteSyncer will buffer |
| // writes. |
| // |
| // This field is required. |
| WS WriteSyncer |
| |
| // Size specifies the maximum amount of data the writer will buffered |
| // before flushing. |
| // |
| // Defaults to 256 kB if unspecified. |
| Size int |
| |
| // FlushInterval specifies how often the writer should flush data if |
| // there have been no writes. |
| // |
| // Defaults to 30 seconds if unspecified. |
| FlushInterval time.Duration |
| |
| // Clock, if specified, provides control of the source of time for the |
| // writer. |
| // |
| // Defaults to the system clock. |
| Clock Clock |
| |
| // unexported fields for state |
| mu sync.Mutex |
| initialized bool // whether initialize() has run |
| writer *bufio.Writer |
| ticker *time.Ticker |
| stop chan struct{} // closed when flushLoop should stop |
| stopped bool // whether Stop() has run |
| done chan struct{} // closed when flushLoop has stopped |
| } |
| |
| func (s *BufferedWriteSyncer) initialize() { |
| size := s.Size |
| if size == 0 { |
| size = _defaultBufferSize |
| } |
| |
| flushInterval := s.FlushInterval |
| if flushInterval == 0 { |
| flushInterval = _defaultFlushInterval |
| } |
| |
| if s.Clock == nil { |
| s.Clock = DefaultClock |
| } |
| |
| s.ticker = s.Clock.NewTicker(flushInterval) |
| s.writer = bufio.NewWriterSize(s.WS, size) |
| s.stop = make(chan struct{}) |
| s.done = make(chan struct{}) |
| s.initialized = true |
| go s.flushLoop() |
| } |
| |
| // Write writes log data into buffer syncer directly, multiple Write calls will be batched, |
| // and log data will be flushed to disk when the buffer is full or periodically. |
| func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| |
| if !s.initialized { |
| s.initialize() |
| } |
| |
| // To avoid partial writes from being flushed, we manually flush the existing buffer if: |
| // * The current write doesn't fit into the buffer fully, and |
| // * The buffer is not empty (since bufio will not split large writes when the buffer is empty) |
| if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 { |
| if err := s.writer.Flush(); err != nil { |
| return 0, err |
| } |
| } |
| |
| return s.writer.Write(bs) |
| } |
| |
| // Sync flushes buffered log data into disk directly. |
| func (s *BufferedWriteSyncer) Sync() error { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| |
| var err error |
| if s.initialized { |
| err = s.writer.Flush() |
| } |
| |
| return multierr.Append(err, s.WS.Sync()) |
| } |
| |
| // flushLoop flushes the buffer at the configured interval until Stop is |
| // called. |
| func (s *BufferedWriteSyncer) flushLoop() { |
| defer close(s.done) |
| |
| for { |
| select { |
| case <-s.ticker.C: |
| // we just simply ignore error here |
| // because the underlying bufio writer stores any errors |
| // and we return any error from Sync() as part of the close |
| _ = s.Sync() |
| case <-s.stop: |
| return |
| } |
| } |
| } |
| |
| // Stop closes the buffer, cleans up background goroutines, and flushes |
| // remaining unwritten data. |
| func (s *BufferedWriteSyncer) Stop() (err error) { |
| var stopped bool |
| |
| // Critical section. |
| func() { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| |
| if !s.initialized { |
| return |
| } |
| |
| stopped = s.stopped |
| if stopped { |
| return |
| } |
| s.stopped = true |
| |
| s.ticker.Stop() |
| close(s.stop) // tell flushLoop to stop |
| <-s.done // and wait until it has |
| }() |
| |
| // Don't call Sync on consecutive Stops. |
| if !stopped { |
| err = s.Sync() |
| } |
| |
| return err |
| } |