khenaidoo | 59ce9dd | 2019-11-11 13:05:32 -0500 | [diff] [blame] | 1 | // Copyright 2016 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package wal |
| 16 | |
| 17 | import ( |
| 18 | "fmt" |
| 19 | "os" |
| 20 | "path/filepath" |
| 21 | |
| 22 | "go.etcd.io/etcd/pkg/fileutil" |
| 23 | |
| 24 | "go.uber.org/zap" |
| 25 | ) |
| 26 | |
| 27 | // filePipeline pipelines allocating disk space |
| 28 | type filePipeline struct { |
| 29 | lg *zap.Logger |
| 30 | |
| 31 | // dir to put files |
| 32 | dir string |
| 33 | // size of files to make, in bytes |
| 34 | size int64 |
| 35 | // count number of files generated |
| 36 | count int |
| 37 | |
| 38 | filec chan *fileutil.LockedFile |
| 39 | errc chan error |
| 40 | donec chan struct{} |
| 41 | } |
| 42 | |
| 43 | func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline { |
| 44 | fp := &filePipeline{ |
| 45 | lg: lg, |
| 46 | dir: dir, |
| 47 | size: fileSize, |
| 48 | filec: make(chan *fileutil.LockedFile), |
| 49 | errc: make(chan error, 1), |
| 50 | donec: make(chan struct{}), |
| 51 | } |
| 52 | go fp.run() |
| 53 | return fp |
| 54 | } |
| 55 | |
| 56 | // Open returns a fresh file for writing. Rename the file before calling |
| 57 | // Open again or there will be file collisions. |
| 58 | func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) { |
| 59 | select { |
| 60 | case f = <-fp.filec: |
| 61 | case err = <-fp.errc: |
| 62 | } |
| 63 | return f, err |
| 64 | } |
| 65 | |
| 66 | func (fp *filePipeline) Close() error { |
| 67 | close(fp.donec) |
| 68 | return <-fp.errc |
| 69 | } |
| 70 | |
| 71 | func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) { |
| 72 | // count % 2 so this file isn't the same as the one last published |
| 73 | fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2)) |
| 74 | if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil { |
| 75 | return nil, err |
| 76 | } |
| 77 | if err = fileutil.Preallocate(f.File, fp.size, true); err != nil { |
| 78 | if fp.lg != nil { |
| 79 | fp.lg.Warn("failed to preallocate space when creating a new WAL", zap.Int64("size", fp.size), zap.Error(err)) |
| 80 | } else { |
| 81 | plog.Errorf("failed to allocate space when creating new wal file (%v)", err) |
| 82 | } |
| 83 | f.Close() |
| 84 | return nil, err |
| 85 | } |
| 86 | fp.count++ |
| 87 | return f, nil |
| 88 | } |
| 89 | |
| 90 | func (fp *filePipeline) run() { |
| 91 | defer close(fp.errc) |
| 92 | for { |
| 93 | f, err := fp.alloc() |
| 94 | if err != nil { |
| 95 | fp.errc <- err |
| 96 | return |
| 97 | } |
| 98 | select { |
| 99 | case fp.filec <- f: |
| 100 | case <-fp.donec: |
| 101 | os.Remove(f.Name()) |
| 102 | f.Close() |
| 103 | return |
| 104 | } |
| 105 | } |
| 106 | } |