khenaidoo | 106c61a | 2021-08-11 18:05:46 -0400 | [diff] [blame] | 1 | // Copyright (c) 2021 Uber Technologies, Inc. |
| 2 | // |
| 3 | // Permission is hereby granted, free of charge, to any person obtaining a copy |
| 4 | // of this software and associated documentation files (the "Software"), to deal |
| 5 | // in the Software without restriction, including without limitation the rights |
| 6 | // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| 7 | // copies of the Software, and to permit persons to whom the Software is |
| 8 | // furnished to do so, subject to the following conditions: |
| 9 | // |
| 10 | // The above copyright notice and this permission notice shall be included in |
| 11 | // all copies or substantial portions of the Software. |
| 12 | // |
| 13 | // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| 14 | // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| 15 | // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| 16 | // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| 17 | // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 18 | // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| 19 | // THE SOFTWARE. |
| 20 | |
| 21 | package zapcore |
| 22 | |
| 23 | import ( |
| 24 | "bufio" |
| 25 | "sync" |
| 26 | "time" |
| 27 | |
| 28 | "go.uber.org/multierr" |
| 29 | ) |
| 30 | |
// Fallback settings applied by BufferedWriteSyncer when the corresponding
// exported field is left at its zero value.
const (
	// _defaultBufferSize is the buffer capacity, in bytes, used when
	// BufferedWriteSyncer.Size is unset: 256 kB.
	_defaultBufferSize = 256 << 10

	// _defaultFlushInterval is the period between background flushes used
	// when BufferedWriteSyncer.FlushInterval is unset.
	_defaultFlushInterval = 30 * time.Second
)
| 39 | |
// A BufferedWriteSyncer is a WriteSyncer that buffers writes in-memory before
// flushing them to a wrapped WriteSyncer after reaching some limit, or at some
// fixed interval--whichever comes first.
//
// The buffer and the background flush goroutine are created lazily by the
// first call to Write. Call Stop to shut the goroutine down and flush any
// data still held in the buffer.
//
// BufferedWriteSyncer is safe for concurrent use. You don't need to use
// zapcore.Lock for WriteSyncers with BufferedWriteSyncer.
type BufferedWriteSyncer struct {
	// WS is the WriteSyncer around which BufferedWriteSyncer will buffer
	// writes.
	//
	// This field is required.
	WS WriteSyncer

	// Size specifies the maximum amount of data the writer will buffer
	// before flushing.
	//
	// Defaults to 256 kB if unspecified.
	Size int

	// FlushInterval specifies how often the writer flushes buffered data in
	// the background, independently of how full the buffer is.
	//
	// Defaults to 30 seconds if unspecified.
	FlushInterval time.Duration

	// Clock, if specified, provides control of the source of time for the
	// writer. It is used to create the ticker that drives periodic flushes.
	//
	// Defaults to the system clock.
	Clock Clock

	// unexported fields for state
	mu          sync.Mutex    // guards the fields below and serializes access to writer
	initialized bool          // whether initialize() has run
	writer      *bufio.Writer // buffer in front of WS; nil until initialized
	ticker      *time.Ticker  // fires once per flush interval; nil until initialized
	stop        chan struct{} // closed when flushLoop should stop
	stopped     bool          // whether Stop() has run
	done        chan struct{} // closed when flushLoop has stopped
}
| 80 | |
| 81 | func (s *BufferedWriteSyncer) initialize() { |
| 82 | size := s.Size |
| 83 | if size == 0 { |
| 84 | size = _defaultBufferSize |
| 85 | } |
| 86 | |
| 87 | flushInterval := s.FlushInterval |
| 88 | if flushInterval == 0 { |
| 89 | flushInterval = _defaultFlushInterval |
| 90 | } |
| 91 | |
| 92 | if s.Clock == nil { |
| 93 | s.Clock = DefaultClock |
| 94 | } |
| 95 | |
| 96 | s.ticker = s.Clock.NewTicker(flushInterval) |
| 97 | s.writer = bufio.NewWriterSize(s.WS, size) |
| 98 | s.stop = make(chan struct{}) |
| 99 | s.done = make(chan struct{}) |
| 100 | s.initialized = true |
| 101 | go s.flushLoop() |
| 102 | } |
| 103 | |
| 104 | // Write writes log data into buffer syncer directly, multiple Write calls will be batched, |
| 105 | // and log data will be flushed to disk when the buffer is full or periodically. |
| 106 | func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) { |
| 107 | s.mu.Lock() |
| 108 | defer s.mu.Unlock() |
| 109 | |
| 110 | if !s.initialized { |
| 111 | s.initialize() |
| 112 | } |
| 113 | |
| 114 | // To avoid partial writes from being flushed, we manually flush the existing buffer if: |
| 115 | // * The current write doesn't fit into the buffer fully, and |
| 116 | // * The buffer is not empty (since bufio will not split large writes when the buffer is empty) |
| 117 | if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 { |
| 118 | if err := s.writer.Flush(); err != nil { |
| 119 | return 0, err |
| 120 | } |
| 121 | } |
| 122 | |
| 123 | return s.writer.Write(bs) |
| 124 | } |
| 125 | |
| 126 | // Sync flushes buffered log data into disk directly. |
| 127 | func (s *BufferedWriteSyncer) Sync() error { |
| 128 | s.mu.Lock() |
| 129 | defer s.mu.Unlock() |
| 130 | |
| 131 | var err error |
| 132 | if s.initialized { |
| 133 | err = s.writer.Flush() |
| 134 | } |
| 135 | |
| 136 | return multierr.Append(err, s.WS.Sync()) |
| 137 | } |
| 138 | |
| 139 | // flushLoop flushes the buffer at the configured interval until Stop is |
| 140 | // called. |
| 141 | func (s *BufferedWriteSyncer) flushLoop() { |
| 142 | defer close(s.done) |
| 143 | |
| 144 | for { |
| 145 | select { |
| 146 | case <-s.ticker.C: |
| 147 | // we just simply ignore error here |
| 148 | // because the underlying bufio writer stores any errors |
| 149 | // and we return any error from Sync() as part of the close |
| 150 | _ = s.Sync() |
| 151 | case <-s.stop: |
| 152 | return |
| 153 | } |
| 154 | } |
| 155 | } |
| 156 | |
| 157 | // Stop closes the buffer, cleans up background goroutines, and flushes |
| 158 | // remaining unwritten data. |
| 159 | func (s *BufferedWriteSyncer) Stop() (err error) { |
| 160 | var stopped bool |
| 161 | |
| 162 | // Critical section. |
| 163 | func() { |
| 164 | s.mu.Lock() |
| 165 | defer s.mu.Unlock() |
| 166 | |
| 167 | if !s.initialized { |
| 168 | return |
| 169 | } |
| 170 | |
| 171 | stopped = s.stopped |
| 172 | if stopped { |
| 173 | return |
| 174 | } |
| 175 | s.stopped = true |
| 176 | |
| 177 | s.ticker.Stop() |
| 178 | close(s.stop) // tell flushLoop to stop |
| 179 | <-s.done // and wait until it has |
| 180 | }() |
| 181 | |
| 182 | // Don't call Sync on consecutive Stops. |
| 183 | if !stopped { |
| 184 | err = s.Sync() |
| 185 | } |
| 186 | |
| 187 | return err |
| 188 | } |