blob: 3a1c57c1c960eb628865a7243489cb86553e7b8c [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package wal
16
17import (
18 "fmt"
19 "os"
20 "path/filepath"
21
22 "github.com/coreos/etcd/pkg/fileutil"
23)
24
25// filePipeline pipelines allocating disk space
26type filePipeline struct {
27 // dir to put files
28 dir string
29 // size of files to make, in bytes
30 size int64
31 // count number of files generated
32 count int
33
34 filec chan *fileutil.LockedFile
35 errc chan error
36 donec chan struct{}
37}
38
39func newFilePipeline(dir string, fileSize int64) *filePipeline {
40 fp := &filePipeline{
41 dir: dir,
42 size: fileSize,
43 filec: make(chan *fileutil.LockedFile),
44 errc: make(chan error, 1),
45 donec: make(chan struct{}),
46 }
47 go fp.run()
48 return fp
49}
50
51// Open returns a fresh file for writing. Rename the file before calling
52// Open again or there will be file collisions.
53func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
54 select {
55 case f = <-fp.filec:
56 case err = <-fp.errc:
57 }
58 return f, err
59}
60
61func (fp *filePipeline) Close() error {
62 close(fp.donec)
63 return <-fp.errc
64}
65
66func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
67 // count % 2 so this file isn't the same as the one last published
68 fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
69 if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
70 return nil, err
71 }
72 if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
73 plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
74 f.Close()
75 return nil, err
76 }
77 fp.count++
78 return f, nil
79}
80
81func (fp *filePipeline) run() {
82 defer close(fp.errc)
83 for {
84 f, err := fp.alloc()
85 if err != nil {
86 fp.errc <- err
87 return
88 }
89 select {
90 case fp.filec <- f:
91 case <-fp.donec:
92 os.Remove(f.Name())
93 f.Close()
94 return
95 }
96 }
97}