blob: e177c8e00e366f4426027173c50829ea2de0f080 [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package wal
16
17import (
18 "bytes"
19 "errors"
20 "fmt"
21 "hash/crc32"
22 "io"
23 "os"
24 "path/filepath"
25 "sync"
26 "time"
27
28 "go.etcd.io/etcd/pkg/fileutil"
29 "go.etcd.io/etcd/pkg/pbutil"
30 "go.etcd.io/etcd/raft"
31 "go.etcd.io/etcd/raft/raftpb"
32 "go.etcd.io/etcd/wal/walpb"
33
34 "github.com/coreos/pkg/capnslog"
35 "go.uber.org/zap"
36)
37
// Record type tags carried in walpb.Record.Type. Numbering starts at 1.
const (
	metadataType int64 = iota + 1
	entryType
	stateType
	crcType
	snapshotType
)

// warnSyncDuration is the amount of time allotted to an fsync before
// a warning is logged.
const warnSyncDuration = time.Second
49
50var (
51 // SegmentSizeBytes is the preallocated size of each wal segment file.
52 // The actual size might be larger than this. In general, the default
53 // value should be used, but this is defined as an exported variable
54 // so that tests can set a different segment size.
55 SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
56
57 plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "wal")
58
59 ErrMetadataConflict = errors.New("wal: conflicting metadata found")
60 ErrFileNotFound = errors.New("wal: file not found")
61 ErrCRCMismatch = errors.New("wal: crc mismatch")
62 ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
63 ErrSnapshotNotFound = errors.New("wal: snapshot not found")
64 crcTable = crc32.MakeTable(crc32.Castagnoli)
65)
66
67// WAL is a logical representation of the stable storage.
68// WAL is either in read mode or append mode but not both.
69// A newly created WAL is in append mode, and ready for appending records.
70// A just opened WAL is in read mode, and ready for reading records.
71// The WAL will be ready for appending after reading out all the previous records.
72type WAL struct {
73 lg *zap.Logger
74
75 dir string // the living directory of the underlay files
76
77 // dirFile is a fd for the wal directory for syncing on Rename
78 dirFile *os.File
79
80 metadata []byte // metadata recorded at the head of each WAL
81 state raftpb.HardState // hardstate recorded at the head of WAL
82
83 start walpb.Snapshot // snapshot to start reading
84 decoder *decoder // decoder to decode records
85 readClose func() error // closer for decode reader
86
87 mu sync.Mutex
88 enti uint64 // index of the last entry saved to the wal
89 encoder *encoder // encoder to encode records
90
91 locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
92 fp *filePipeline
93}
94
95// Create creates a WAL ready for appending records. The given metadata is
96// recorded at the head of each WAL file, and can be retrieved with ReadAll.
97func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
98 if Exist(dirpath) {
99 return nil, os.ErrExist
100 }
101
102 // keep temporary wal directory so WAL initialization appears atomic
103 tmpdirpath := filepath.Clean(dirpath) + ".tmp"
104 if fileutil.Exist(tmpdirpath) {
105 if err := os.RemoveAll(tmpdirpath); err != nil {
106 return nil, err
107 }
108 }
109 if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
110 if lg != nil {
111 lg.Warn(
112 "failed to create a temporary WAL directory",
113 zap.String("tmp-dir-path", tmpdirpath),
114 zap.String("dir-path", dirpath),
115 zap.Error(err),
116 )
117 }
118 return nil, err
119 }
120
121 p := filepath.Join(tmpdirpath, walName(0, 0))
122 f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
123 if err != nil {
124 if lg != nil {
125 lg.Warn(
126 "failed to flock an initial WAL file",
127 zap.String("path", p),
128 zap.Error(err),
129 )
130 }
131 return nil, err
132 }
133 if _, err = f.Seek(0, io.SeekEnd); err != nil {
134 if lg != nil {
135 lg.Warn(
136 "failed to seek an initial WAL file",
137 zap.String("path", p),
138 zap.Error(err),
139 )
140 }
141 return nil, err
142 }
143 if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
144 if lg != nil {
145 lg.Warn(
146 "failed to preallocate an initial WAL file",
147 zap.String("path", p),
148 zap.Int64("segment-bytes", SegmentSizeBytes),
149 zap.Error(err),
150 )
151 }
152 return nil, err
153 }
154
155 w := &WAL{
156 lg: lg,
157 dir: dirpath,
158 metadata: metadata,
159 }
160 w.encoder, err = newFileEncoder(f.File, 0)
161 if err != nil {
162 return nil, err
163 }
164 w.locks = append(w.locks, f)
165 if err = w.saveCrc(0); err != nil {
166 return nil, err
167 }
168 if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
169 return nil, err
170 }
171 if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
172 return nil, err
173 }
174
175 if w, err = w.renameWAL(tmpdirpath); err != nil {
176 if lg != nil {
177 lg.Warn(
178 "failed to rename the temporary WAL directory",
179 zap.String("tmp-dir-path", tmpdirpath),
180 zap.String("dir-path", w.dir),
181 zap.Error(err),
182 )
183 }
184 return nil, err
185 }
186
187 var perr error
188 defer func() {
189 if perr != nil {
190 w.cleanupWAL(lg)
191 }
192 }()
193
194 // directory was renamed; sync parent dir to persist rename
195 pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
196 if perr != nil {
197 if lg != nil {
198 lg.Warn(
199 "failed to open the parent data directory",
200 zap.String("parent-dir-path", filepath.Dir(w.dir)),
201 zap.String("dir-path", w.dir),
202 zap.Error(perr),
203 )
204 }
205 return nil, perr
206 }
207 if perr = fileutil.Fsync(pdir); perr != nil {
208 if lg != nil {
209 lg.Warn(
210 "failed to fsync the parent data directory file",
211 zap.String("parent-dir-path", filepath.Dir(w.dir)),
212 zap.String("dir-path", w.dir),
213 zap.Error(perr),
214 )
215 }
216 return nil, perr
217 }
218 if perr = pdir.Close(); perr != nil {
219 if lg != nil {
220 lg.Warn(
221 "failed to close the parent data directory file",
222 zap.String("parent-dir-path", filepath.Dir(w.dir)),
223 zap.String("dir-path", w.dir),
224 zap.Error(perr),
225 )
226 }
227 return nil, perr
228 }
229
230 return w, nil
231}
232
233func (w *WAL) cleanupWAL(lg *zap.Logger) {
234 var err error
235 if err = w.Close(); err != nil {
236 if lg != nil {
237 lg.Panic("failed to close WAL during cleanup", zap.Error(err))
238 } else {
239 plog.Panicf("failed to close WAL during cleanup: %v", err)
240 }
241 }
242 brokenDirName := fmt.Sprintf("%s.broken.%v", w.dir, time.Now().Format("20060102.150405.999999"))
243 if err = os.Rename(w.dir, brokenDirName); err != nil {
244 if lg != nil {
245 lg.Panic(
246 "failed to rename WAL during cleanup",
247 zap.Error(err),
248 zap.String("source-path", w.dir),
249 zap.String("rename-path", brokenDirName),
250 )
251 } else {
252 plog.Panicf("failed to rename WAL during cleanup: %v", err)
253 }
254 }
255}
256
257func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) {
258 if err := os.RemoveAll(w.dir); err != nil {
259 return nil, err
260 }
261 // On non-Windows platforms, hold the lock while renaming. Releasing
262 // the lock and trying to reacquire it quickly can be flaky because
263 // it's possible the process will fork to spawn a process while this is
264 // happening. The fds are set up as close-on-exec by the Go runtime,
265 // but there is a window between the fork and the exec where another
266 // process holds the lock.
267 if err := os.Rename(tmpdirpath, w.dir); err != nil {
268 if _, ok := err.(*os.LinkError); ok {
269 return w.renameWALUnlock(tmpdirpath)
270 }
271 return nil, err
272 }
273 w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes)
274 df, err := fileutil.OpenDir(w.dir)
275 w.dirFile = df
276 return w, err
277}
278
279func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
280 // rename of directory with locked files doesn't work on windows/cifs;
281 // close the WAL to release the locks so the directory can be renamed.
282 if w.lg != nil {
283 w.lg.Info(
284 "closing WAL to release flock and retry directory renaming",
285 zap.String("from", tmpdirpath),
286 zap.String("to", w.dir),
287 )
288 } else {
289 plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir)
290 }
291 w.Close()
292
293 if err := os.Rename(tmpdirpath, w.dir); err != nil {
294 return nil, err
295 }
296
297 // reopen and relock
298 newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{})
299 if oerr != nil {
300 return nil, oerr
301 }
302 if _, _, _, err := newWAL.ReadAll(); err != nil {
303 newWAL.Close()
304 return nil, err
305 }
306 return newWAL, nil
307}
308
309// Open opens the WAL at the given snap.
310// The snap SHOULD have been previously saved to the WAL, or the following
311// ReadAll will fail.
312// The returned WAL is ready to read and the first record will be the one after
313// the given snap. The WAL cannot be appended to before reading out all of its
314// previous records.
315func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
316 w, err := openAtIndex(lg, dirpath, snap, true)
317 if err != nil {
318 return nil, err
319 }
320 if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
321 return nil, err
322 }
323 return w, nil
324}
325
326// OpenForRead only opens the wal files for read.
327// Write on a read only wal panics.
328func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
329 return openAtIndex(lg, dirpath, snap, false)
330}
331
332func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
333 names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
334 if err != nil {
335 return nil, err
336 }
337
338 rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write)
339 if err != nil {
340 return nil, err
341 }
342
343 // create a WAL ready for reading
344 w := &WAL{
345 lg: lg,
346 dir: dirpath,
347 start: snap,
348 decoder: newDecoder(rs...),
349 readClose: closer,
350 locks: ls,
351 }
352
353 if write {
354 // write reuses the file descriptors from read; don't close so
355 // WAL can append without dropping the file lock
356 w.readClose = nil
357 if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil {
358 closer()
359 return nil, err
360 }
361 w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes)
362 }
363
364 return w, nil
365}
366
367func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) {
368 names, err := readWALNames(lg, dirpath)
369 if err != nil {
370 return nil, -1, err
371 }
372
373 nameIndex, ok := searchIndex(lg, names, snap.Index)
374 if !ok || !isValidSeq(lg, names[nameIndex:]) {
375 err = ErrFileNotFound
376 return nil, -1, err
377 }
378
379 return names, nameIndex, nil
380}
381
382func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) {
383 rcs := make([]io.ReadCloser, 0)
384 rs := make([]io.Reader, 0)
385 ls := make([]*fileutil.LockedFile, 0)
386 for _, name := range names[nameIndex:] {
387 p := filepath.Join(dirpath, name)
388 if write {
389 l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
390 if err != nil {
391 closeAll(rcs...)
392 return nil, nil, nil, err
393 }
394 ls = append(ls, l)
395 rcs = append(rcs, l)
396 } else {
397 rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
398 if err != nil {
399 closeAll(rcs...)
400 return nil, nil, nil, err
401 }
402 ls = append(ls, nil)
403 rcs = append(rcs, rf)
404 }
405 rs = append(rs, rcs[len(rcs)-1])
406 }
407
408 closer := func() error { return closeAll(rcs...) }
409
410 return rs, ls, closer, nil
411}
412
413// ReadAll reads out records of the current WAL.
414// If opened in write mode, it must read out all records until EOF. Or an error
415// will be returned.
416// If opened in read mode, it will try to read all records if possible.
417// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
418// If loaded snap doesn't match with the expected one, it will return
419// all the records and error ErrSnapshotMismatch.
420// TODO: detect not-last-snap error.
421// TODO: maybe loose the checking of match.
422// After ReadAll, the WAL will be ready for appending new records.
423func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
424 w.mu.Lock()
425 defer w.mu.Unlock()
426
427 rec := &walpb.Record{}
428 decoder := w.decoder
429
430 var match bool
431 for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
432 switch rec.Type {
433 case entryType:
434 e := mustUnmarshalEntry(rec.Data)
435 if e.Index > w.start.Index {
436 ents = append(ents[:e.Index-w.start.Index-1], e)
437 }
438 w.enti = e.Index
439
440 case stateType:
441 state = mustUnmarshalState(rec.Data)
442
443 case metadataType:
444 if metadata != nil && !bytes.Equal(metadata, rec.Data) {
445 state.Reset()
446 return nil, state, nil, ErrMetadataConflict
447 }
448 metadata = rec.Data
449
450 case crcType:
451 crc := decoder.crc.Sum32()
452 // current crc of decoder must match the crc of the record.
453 // do no need to match 0 crc, since the decoder is a new one at this case.
454 if crc != 0 && rec.Validate(crc) != nil {
455 state.Reset()
456 return nil, state, nil, ErrCRCMismatch
457 }
458 decoder.updateCRC(rec.Crc)
459
460 case snapshotType:
461 var snap walpb.Snapshot
462 pbutil.MustUnmarshal(&snap, rec.Data)
463 if snap.Index == w.start.Index {
464 if snap.Term != w.start.Term {
465 state.Reset()
466 return nil, state, nil, ErrSnapshotMismatch
467 }
468 match = true
469 }
470
471 default:
472 state.Reset()
473 return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
474 }
475 }
476
477 switch w.tail() {
478 case nil:
479 // We do not have to read out all entries in read mode.
480 // The last record maybe a partial written one, so
481 // ErrunexpectedEOF might be returned.
482 if err != io.EOF && err != io.ErrUnexpectedEOF {
483 state.Reset()
484 return nil, state, nil, err
485 }
486 default:
487 // We must read all of the entries if WAL is opened in write mode.
488 if err != io.EOF {
489 state.Reset()
490 return nil, state, nil, err
491 }
492 // decodeRecord() will return io.EOF if it detects a zero record,
493 // but this zero record may be followed by non-zero records from
494 // a torn write. Overwriting some of these non-zero records, but
495 // not all, will cause CRC errors on WAL open. Since the records
496 // were never fully synced to disk in the first place, it's safe
497 // to zero them out to avoid any CRC errors from new writes.
498 if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil {
499 return nil, state, nil, err
500 }
501 if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
502 return nil, state, nil, err
503 }
504 }
505
506 err = nil
507 if !match {
508 err = ErrSnapshotNotFound
509 }
510
511 // close decoder, disable reading
512 if w.readClose != nil {
513 w.readClose()
514 w.readClose = nil
515 }
516 w.start = walpb.Snapshot{}
517
518 w.metadata = metadata
519
520 if w.tail() != nil {
521 // create encoder (chain crc with the decoder), enable appending
522 w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
523 if err != nil {
524 return
525 }
526 }
527 w.decoder = nil
528
529 return metadata, state, ents, err
530}
531
532// Verify reads through the given WAL and verifies that it is not corrupted.
533// It creates a new decoder to read through the records of the given WAL.
534// It does not conflict with any open WAL, but it is recommended not to
535// call this function after opening the WAL for writing.
536// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
537// If the loaded snap doesn't match with the expected one, it will
538// return error ErrSnapshotMismatch.
539func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) error {
540 var metadata []byte
541 var err error
542 var match bool
543
544 rec := &walpb.Record{}
545
546 names, nameIndex, err := selectWALFiles(lg, walDir, snap)
547 if err != nil {
548 return err
549 }
550
551 // open wal files in read mode, so that there is no conflict
552 // when the same WAL is opened elsewhere in write mode
553 rs, _, closer, err := openWALFiles(lg, walDir, names, nameIndex, false)
554 if err != nil {
555 return err
556 }
557 defer func() {
558 if closer != nil {
559 closer()
560 }
561 }()
562
563 // create a new decoder from the readers on the WAL files
564 decoder := newDecoder(rs...)
565
566 for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
567 switch rec.Type {
568 case metadataType:
569 if metadata != nil && !bytes.Equal(metadata, rec.Data) {
570 return ErrMetadataConflict
571 }
572 metadata = rec.Data
573 case crcType:
574 crc := decoder.crc.Sum32()
575 // Current crc of decoder must match the crc of the record.
576 // We need not match 0 crc, since the decoder is a new one at this point.
577 if crc != 0 && rec.Validate(crc) != nil {
578 return ErrCRCMismatch
579 }
580 decoder.updateCRC(rec.Crc)
581 case snapshotType:
582 var loadedSnap walpb.Snapshot
583 pbutil.MustUnmarshal(&loadedSnap, rec.Data)
584 if loadedSnap.Index == snap.Index {
585 if loadedSnap.Term != snap.Term {
586 return ErrSnapshotMismatch
587 }
588 match = true
589 }
590 // We ignore all entry and state type records as these
591 // are not necessary for validating the WAL contents
592 case entryType:
593 case stateType:
594 default:
595 return fmt.Errorf("unexpected block type %d", rec.Type)
596 }
597 }
598
599 // We do not have to read out all the WAL entries
600 // as the decoder is opened in read mode.
601 if err != io.EOF && err != io.ErrUnexpectedEOF {
602 return err
603 }
604
605 if !match {
606 return ErrSnapshotNotFound
607 }
608
609 return nil
610}
611
612// cut closes current file written and creates a new one ready to append.
613// cut first creates a temp wal file and writes necessary headers into it.
614// Then cut atomically rename temp wal file to a wal file.
615func (w *WAL) cut() error {
616 // close old wal file; truncate to avoid wasting space if an early cut
617 off, serr := w.tail().Seek(0, io.SeekCurrent)
618 if serr != nil {
619 return serr
620 }
621
622 if err := w.tail().Truncate(off); err != nil {
623 return err
624 }
625
626 if err := w.sync(); err != nil {
627 return err
628 }
629
630 fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
631
632 // create a temp wal file with name sequence + 1, or truncate the existing one
633 newTail, err := w.fp.Open()
634 if err != nil {
635 return err
636 }
637
638 // update writer and save the previous crc
639 w.locks = append(w.locks, newTail)
640 prevCrc := w.encoder.crc.Sum32()
641 w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
642 if err != nil {
643 return err
644 }
645
646 if err = w.saveCrc(prevCrc); err != nil {
647 return err
648 }
649
650 if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
651 return err
652 }
653
654 if err = w.saveState(&w.state); err != nil {
655 return err
656 }
657
658 // atomically move temp wal file to wal file
659 if err = w.sync(); err != nil {
660 return err
661 }
662
663 off, err = w.tail().Seek(0, io.SeekCurrent)
664 if err != nil {
665 return err
666 }
667
668 if err = os.Rename(newTail.Name(), fpath); err != nil {
669 return err
670 }
671 if err = fileutil.Fsync(w.dirFile); err != nil {
672 return err
673 }
674
675 // reopen newTail with its new path so calls to Name() match the wal filename format
676 newTail.Close()
677
678 if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
679 return err
680 }
681 if _, err = newTail.Seek(off, io.SeekStart); err != nil {
682 return err
683 }
684
685 w.locks[len(w.locks)-1] = newTail
686
687 prevCrc = w.encoder.crc.Sum32()
688 w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
689 if err != nil {
690 return err
691 }
692
693 if w.lg != nil {
694 w.lg.Info("created a new WAL segment", zap.String("path", fpath))
695 } else {
696 plog.Infof("segmented wal file %v is created", fpath)
697 }
698 return nil
699}
700
701func (w *WAL) sync() error {
702 if w.encoder != nil {
703 if err := w.encoder.flush(); err != nil {
704 return err
705 }
706 }
707 start := time.Now()
708 err := fileutil.Fdatasync(w.tail().File)
709
710 took := time.Since(start)
711 if took > warnSyncDuration {
712 if w.lg != nil {
713 w.lg.Warn(
714 "slow fdatasync",
715 zap.Duration("took", took),
716 zap.Duration("expected-duration", warnSyncDuration),
717 )
718 } else {
719 plog.Warningf("sync duration of %v, expected less than %v", took, warnSyncDuration)
720 }
721 }
722 walFsyncSec.Observe(took.Seconds())
723
724 return err
725}
726
727// ReleaseLockTo releases the locks, which has smaller index than the given index
728// except the largest one among them.
729// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
730// lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
731func (w *WAL) ReleaseLockTo(index uint64) error {
732 w.mu.Lock()
733 defer w.mu.Unlock()
734
735 if len(w.locks) == 0 {
736 return nil
737 }
738
739 var smaller int
740 found := false
741 for i, l := range w.locks {
742 _, lockIndex, err := parseWALName(filepath.Base(l.Name()))
743 if err != nil {
744 return err
745 }
746 if lockIndex >= index {
747 smaller = i - 1
748 found = true
749 break
750 }
751 }
752
753 // if no lock index is greater than the release index, we can
754 // release lock up to the last one(excluding).
755 if !found {
756 smaller = len(w.locks) - 1
757 }
758
759 if smaller <= 0 {
760 return nil
761 }
762
763 for i := 0; i < smaller; i++ {
764 if w.locks[i] == nil {
765 continue
766 }
767 w.locks[i].Close()
768 }
769 w.locks = w.locks[smaller:]
770
771 return nil
772}
773
774// Close closes the current WAL file and directory.
775func (w *WAL) Close() error {
776 w.mu.Lock()
777 defer w.mu.Unlock()
778
779 if w.fp != nil {
780 w.fp.Close()
781 w.fp = nil
782 }
783
784 if w.tail() != nil {
785 if err := w.sync(); err != nil {
786 return err
787 }
788 }
789 for _, l := range w.locks {
790 if l == nil {
791 continue
792 }
793 if err := l.Close(); err != nil {
794 if w.lg != nil {
795 w.lg.Warn("failed to close WAL", zap.Error(err))
796 } else {
797 plog.Errorf("failed to unlock during closing wal: %s", err)
798 }
799 }
800 }
801
802 return w.dirFile.Close()
803}
804
805func (w *WAL) saveEntry(e *raftpb.Entry) error {
806 // TODO: add MustMarshalTo to reduce one allocation.
807 b := pbutil.MustMarshal(e)
808 rec := &walpb.Record{Type: entryType, Data: b}
809 if err := w.encoder.encode(rec); err != nil {
810 return err
811 }
812 w.enti = e.Index
813 return nil
814}
815
816func (w *WAL) saveState(s *raftpb.HardState) error {
817 if raft.IsEmptyHardState(*s) {
818 return nil
819 }
820 w.state = *s
821 b := pbutil.MustMarshal(s)
822 rec := &walpb.Record{Type: stateType, Data: b}
823 return w.encoder.encode(rec)
824}
825
826func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
827 w.mu.Lock()
828 defer w.mu.Unlock()
829
830 // short cut, do not call sync
831 if raft.IsEmptyHardState(st) && len(ents) == 0 {
832 return nil
833 }
834
835 mustSync := raft.MustSync(st, w.state, len(ents))
836
837 // TODO(xiangli): no more reference operator
838 for i := range ents {
839 if err := w.saveEntry(&ents[i]); err != nil {
840 return err
841 }
842 }
843 if err := w.saveState(&st); err != nil {
844 return err
845 }
846
847 curOff, err := w.tail().Seek(0, io.SeekCurrent)
848 if err != nil {
849 return err
850 }
851 if curOff < SegmentSizeBytes {
852 if mustSync {
853 return w.sync()
854 }
855 return nil
856 }
857
858 return w.cut()
859}
860
861func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
862 b := pbutil.MustMarshal(&e)
863
864 w.mu.Lock()
865 defer w.mu.Unlock()
866
867 rec := &walpb.Record{Type: snapshotType, Data: b}
868 if err := w.encoder.encode(rec); err != nil {
869 return err
870 }
871 // update enti only when snapshot is ahead of last index
872 if w.enti < e.Index {
873 w.enti = e.Index
874 }
875 return w.sync()
876}
877
878func (w *WAL) saveCrc(prevCrc uint32) error {
879 return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
880}
881
882func (w *WAL) tail() *fileutil.LockedFile {
883 if len(w.locks) > 0 {
884 return w.locks[len(w.locks)-1]
885 }
886 return nil
887}
888
889func (w *WAL) seq() uint64 {
890 t := w.tail()
891 if t == nil {
892 return 0
893 }
894 seq, _, err := parseWALName(filepath.Base(t.Name()))
895 if err != nil {
896 if w.lg != nil {
897 w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err))
898 } else {
899 plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
900 }
901 }
902 return seq
903}
904
// closeAll closes every given ReadCloser and returns the first error
// encountered, or nil when all of them closed cleanly. A failing Close does
// not stop the remaining ones from being closed, so no file is leaked.
func closeAll(rcs ...io.ReadCloser) error {
	var firstErr error
	for _, f := range rcs {
		if err := f.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}