blob: e1e1c557b8bb10bea702a24ee1dff534cc8a19d3 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package wal
16
17import (
18 "fmt"
19 "os"
20 "path/filepath"
21
22 "go.etcd.io/etcd/pkg/fileutil"
23
24 "go.uber.org/zap"
25)
26
27// filePipeline pipelines allocating disk space
28type filePipeline struct {
29 lg *zap.Logger
30
31 // dir to put files
32 dir string
33 // size of files to make, in bytes
34 size int64
35 // count number of files generated
36 count int
37
38 filec chan *fileutil.LockedFile
39 errc chan error
40 donec chan struct{}
41}
42
43func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline {
44 fp := &filePipeline{
45 lg: lg,
46 dir: dir,
47 size: fileSize,
48 filec: make(chan *fileutil.LockedFile),
49 errc: make(chan error, 1),
50 donec: make(chan struct{}),
51 }
52 go fp.run()
53 return fp
54}
55
56// Open returns a fresh file for writing. Rename the file before calling
57// Open again or there will be file collisions.
58func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
59 select {
60 case f = <-fp.filec:
61 case err = <-fp.errc:
62 }
63 return f, err
64}
65
66func (fp *filePipeline) Close() error {
67 close(fp.donec)
68 return <-fp.errc
69}
70
71func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
72 // count % 2 so this file isn't the same as the one last published
73 fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
74 if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
75 return nil, err
76 }
77 if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
78 if fp.lg != nil {
79 fp.lg.Warn("failed to preallocate space when creating a new WAL", zap.Int64("size", fp.size), zap.Error(err))
80 } else {
81 plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
82 }
83 f.Close()
84 return nil, err
85 }
86 fp.count++
87 return f, nil
88}
89
90func (fp *filePipeline) run() {
91 defer close(fp.errc)
92 for {
93 f, err := fp.alloc()
94 if err != nil {
95 fp.errc <- err
96 return
97 }
98 select {
99 case fp.filec <- f:
100 case <-fp.donec:
101 os.Remove(f.Name())
102 f.Close()
103 return
104 }
105 }
106}