blob: f1ffc4326b6520fc75d76da61f335af9df1ecba0 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package wal
16
17import (
18 "bytes"
19 "errors"
20 "fmt"
21 "hash/crc32"
22 "io"
23 "os"
24 "path/filepath"
25 "sync"
26 "time"
27
28 "github.com/coreos/etcd/pkg/fileutil"
29 "github.com/coreos/etcd/pkg/pbutil"
30 "github.com/coreos/etcd/raft"
31 "github.com/coreos/etcd/raft/raftpb"
32 "github.com/coreos/etcd/wal/walpb"
33
34 "github.com/coreos/pkg/capnslog"
35)
36
// Record types written to the WAL; every walpb.Record carries one of these
// in its Type field. Numbering starts at 1, so a zero Type matches none of
// them.
const (
	metadataType int64 = iota + 1 // metadata written at the head of each WAL file
	entryType                     // a marshaled raftpb.Entry
	stateType                     // a marshaled raftpb.HardState
	crcType                       // the CRC carried over from the previous file (0 for the first)
	snapshotType                  // a marshaled walpb.Snapshot
)

// warnSyncDuration is the fsync duration above which a warning is logged.
const warnSyncDuration = time.Second
48
49var (
50 // SegmentSizeBytes is the preallocated size of each wal segment file.
51 // The actual size might be larger than this. In general, the default
52 // value should be used, but this is defined as an exported variable
53 // so that tests can set a different segment size.
54 SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
55
56 plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "wal")
57
58 ErrMetadataConflict = errors.New("wal: conflicting metadata found")
59 ErrFileNotFound = errors.New("wal: file not found")
60 ErrCRCMismatch = errors.New("wal: crc mismatch")
61 ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
62 ErrSnapshotNotFound = errors.New("wal: snapshot not found")
63 ErrSliceOutOfRange = errors.New("wal: slice bounds out of range")
64 ErrMaxWALEntrySizeLimitExceeded = errors.New("wal: max entry size limit exceeded")
65 ErrDecoderNotFound = errors.New("wal: decoder not found")
66 crcTable = crc32.MakeTable(crc32.Castagnoli)
67)
68
69// WAL is a logical representation of the stable storage.
70// WAL is either in read mode or append mode but not both.
71// A newly created WAL is in append mode, and ready for appending records.
72// A just opened WAL is in read mode, and ready for reading records.
73// The WAL will be ready for appending after reading out all the previous records.
74type WAL struct {
75 dir string // the living directory of the underlay files
76
77 // dirFile is a fd for the wal directory for syncing on Rename
78 dirFile *os.File
79
80 metadata []byte // metadata recorded at the head of each WAL
81 state raftpb.HardState // hardstate recorded at the head of WAL
82
83 start walpb.Snapshot // snapshot to start reading
84 decoder *decoder // decoder to decode records
85 readClose func() error // closer for decode reader
86
87 mu sync.Mutex
88 enti uint64 // index of the last entry saved to the wal
89 encoder *encoder // encoder to encode records
90
91 locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
92 fp *filePipeline
93}
94
95// Create creates a WAL ready for appending records. The given metadata is
96// recorded at the head of each WAL file, and can be retrieved with ReadAll
97// after the file is Open.
98func Create(dirpath string, metadata []byte) (*WAL, error) {
99 if Exist(dirpath) {
100 return nil, os.ErrExist
101 }
102
103 // keep temporary wal directory so WAL initialization appears atomic
104 tmpdirpath := filepath.Clean(dirpath) + ".tmp"
105 if fileutil.Exist(tmpdirpath) {
106 if err := os.RemoveAll(tmpdirpath); err != nil {
107 return nil, err
108 }
109 }
110 if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
111 return nil, err
112 }
113
114 p := filepath.Join(tmpdirpath, walName(0, 0))
115 f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
116 if err != nil {
117 return nil, err
118 }
119 if _, err = f.Seek(0, io.SeekEnd); err != nil {
120 return nil, err
121 }
122 if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
123 return nil, err
124 }
125
126 w := &WAL{
127 dir: dirpath,
128 metadata: metadata,
129 }
130 w.encoder, err = newFileEncoder(f.File, 0)
131 if err != nil {
132 return nil, err
133 }
134 w.locks = append(w.locks, f)
135 if err = w.saveCrc(0); err != nil {
136 return nil, err
137 }
138 if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
139 return nil, err
140 }
141 if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
142 return nil, err
143 }
144
145 if w, err = w.renameWal(tmpdirpath); err != nil {
146 return nil, err
147 }
148
149 // directory was renamed; sync parent dir to persist rename
150 pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
151 if perr != nil {
152 return nil, perr
153 }
154
155 start := time.Now()
156 if perr = fileutil.Fsync(pdir); perr != nil {
157 return nil, perr
158 }
159 syncDurations.Observe(time.Since(start).Seconds())
160
161 if perr = pdir.Close(); err != nil {
162 return nil, perr
163 }
164
165 return w, nil
166}
167
168func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
169 if err := os.RemoveAll(w.dir); err != nil {
170 return nil, err
171 }
172 // On non-Windows platforms, hold the lock while renaming. Releasing
173 // the lock and trying to reacquire it quickly can be flaky because
174 // it's possible the process will fork to spawn a process while this is
175 // happening. The fds are set up as close-on-exec by the Go runtime,
176 // but there is a window between the fork and the exec where another
177 // process holds the lock.
178 if err := os.Rename(tmpdirpath, w.dir); err != nil {
179 if _, ok := err.(*os.LinkError); ok {
180 return w.renameWalUnlock(tmpdirpath)
181 }
182 return nil, err
183 }
184 w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
185 df, err := fileutil.OpenDir(w.dir)
186 w.dirFile = df
187 return w, err
188}
189
190func (w *WAL) renameWalUnlock(tmpdirpath string) (*WAL, error) {
191 // rename of directory with locked files doesn't work on windows/cifs;
192 // close the WAL to release the locks so the directory can be renamed.
193 plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir)
194 w.Close()
195 if err := os.Rename(tmpdirpath, w.dir); err != nil {
196 return nil, err
197 }
198 // reopen and relock
199 newWAL, oerr := Open(w.dir, walpb.Snapshot{})
200 if oerr != nil {
201 return nil, oerr
202 }
203 if _, _, _, err := newWAL.ReadAll(); err != nil {
204 newWAL.Close()
205 return nil, err
206 }
207 return newWAL, nil
208}
209
210// Open opens the WAL at the given snap.
211// The snap SHOULD have been previously saved to the WAL, or the following
212// ReadAll will fail.
213// The returned WAL is ready to read and the first record will be the one after
214// the given snap. The WAL cannot be appended to before reading out all of its
215// previous records.
216func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
217 w, err := openAtIndex(dirpath, snap, true)
218 if err != nil {
219 return nil, err
220 }
221 if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
222 return nil, err
223 }
224 return w, nil
225}
226
227// OpenForRead only opens the wal files for read.
228// Write on a read only wal panics.
229func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) {
230 return openAtIndex(dirpath, snap, false)
231}
232
233func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
234 names, nameIndex, err := selectWALFiles(dirpath, snap)
235 if err != nil {
236 return nil, err
237 }
238
239 rs, ls, closer, err := openWALFiles(dirpath, names, nameIndex, write)
240 if err != nil {
241 return nil, err
242 }
243
244 // create a WAL ready for reading
245 w := &WAL{
246 dir: dirpath,
247 start: snap,
248 decoder: newDecoder(rs...),
249 readClose: closer,
250 locks: ls,
251 }
252
253 if write {
254 // write reuses the file descriptors from read; don't close so
255 // WAL can append without dropping the file lock
256 w.readClose = nil
257 if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
258 closer()
259 return nil, err
260 }
261 w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
262 }
263
264 return w, nil
265}
266
267func selectWALFiles(dirpath string, snap walpb.Snapshot) ([]string, int, error) {
268 names, err := readWalNames(dirpath)
269 if err != nil {
270 return nil, -1, err
271 }
272
273 nameIndex, ok := searchIndex(names, snap.Index)
274 if !ok || !isValidSeq(names[nameIndex:]) {
275 err = ErrFileNotFound
276 return nil, -1, err
277 }
278
279 return names, nameIndex, nil
280}
281
282func openWALFiles(dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) {
283 rcs := make([]io.ReadCloser, 0)
284 rs := make([]io.Reader, 0)
285 ls := make([]*fileutil.LockedFile, 0)
286 for _, name := range names[nameIndex:] {
287 p := filepath.Join(dirpath, name)
288 if write {
289 l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
290 if err != nil {
291 closeAll(rcs...)
292 return nil, nil, nil, err
293 }
294 ls = append(ls, l)
295 rcs = append(rcs, l)
296 } else {
297 rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
298 if err != nil {
299 closeAll(rcs...)
300 return nil, nil, nil, err
301 }
302 ls = append(ls, nil)
303 rcs = append(rcs, rf)
304 }
305 rs = append(rs, rcs[len(rcs)-1])
306 }
307
308 closer := func() error { return closeAll(rcs...) }
309
310 return rs, ls, closer, nil
311}
312
313// ReadAll reads out records of the current WAL.
314// If opened in write mode, it must read out all records until EOF. Or an error
315// will be returned.
316// If opened in read mode, it will try to read all records if possible.
317// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
318// If loaded snap doesn't match with the expected one, it will return
319// all the records and error ErrSnapshotMismatch.
320// TODO: detect not-last-snap error.
321// TODO: maybe loose the checking of match.
322// After ReadAll, the WAL will be ready for appending new records.
323func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
324 w.mu.Lock()
325 defer w.mu.Unlock()
326
327 rec := &walpb.Record{}
328
329 if w.decoder == nil {
330 return nil, state, nil, ErrDecoderNotFound
331 }
332 decoder := w.decoder
333
334 var match bool
335 for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
336 switch rec.Type {
337 case entryType:
338 e := mustUnmarshalEntry(rec.Data)
339 // 0 <= e.Index-w.start.Index - 1 < len(ents)
340 if e.Index > w.start.Index {
341 // prevent "panic: runtime error: slice bounds out of range [:13038096702221461992] with capacity 0"
342 up := e.Index - w.start.Index - 1
343 if up > uint64(len(ents)) {
344 // return error before append call causes runtime panic
345 return nil, state, nil, ErrSliceOutOfRange
346 }
347 ents = append(ents[:up], e)
348 }
349 w.enti = e.Index
350 case stateType:
351 state = mustUnmarshalState(rec.Data)
352 case metadataType:
353 if metadata != nil && !bytes.Equal(metadata, rec.Data) {
354 state.Reset()
355 return nil, state, nil, ErrMetadataConflict
356 }
357 metadata = rec.Data
358 case crcType:
359 crc := decoder.crc.Sum32()
360 // current crc of decoder must match the crc of the record.
361 // do no need to match 0 crc, since the decoder is a new one at this case.
362 if crc != 0 && rec.Validate(crc) != nil {
363 state.Reset()
364 return nil, state, nil, ErrCRCMismatch
365 }
366 decoder.updateCRC(rec.Crc)
367 case snapshotType:
368 var snap walpb.Snapshot
369 pbutil.MustUnmarshal(&snap, rec.Data)
370 if snap.Index == w.start.Index {
371 if snap.Term != w.start.Term {
372 state.Reset()
373 return nil, state, nil, ErrSnapshotMismatch
374 }
375 match = true
376 }
377 default:
378 state.Reset()
379 return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
380 }
381 }
382
383 switch w.tail() {
384 case nil:
385 // We do not have to read out all entries in read mode.
386 // The last record maybe a partial written one, so
387 // ErrunexpectedEOF might be returned.
388 if err != io.EOF && err != io.ErrUnexpectedEOF {
389 state.Reset()
390 return nil, state, nil, err
391 }
392 default:
393 // We must read all of the entries if WAL is opened in write mode.
394 if err != io.EOF {
395 state.Reset()
396 return nil, state, nil, err
397 }
398 // decodeRecord() will return io.EOF if it detects a zero record,
399 // but this zero record may be followed by non-zero records from
400 // a torn write. Overwriting some of these non-zero records, but
401 // not all, will cause CRC errors on WAL open. Since the records
402 // were never fully synced to disk in the first place, it's safe
403 // to zero them out to avoid any CRC errors from new writes.
404 if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil {
405 return nil, state, nil, err
406 }
407 if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
408 return nil, state, nil, err
409 }
410 }
411
412 err = nil
413 if !match {
414 err = ErrSnapshotNotFound
415 }
416
417 // close decoder, disable reading
418 if w.readClose != nil {
419 w.readClose()
420 w.readClose = nil
421 }
422 w.start = walpb.Snapshot{}
423
424 w.metadata = metadata
425
426 if w.tail() != nil {
427 // create encoder (chain crc with the decoder), enable appending
428 w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
429 if err != nil {
430 return
431 }
432 }
433 w.decoder = nil
434
435 return metadata, state, ents, err
436}
437
438// ValidSnapshotEntries returns all the valid snapshot entries in the wal logs in the given directory.
439// Snapshot entries are valid if their index is less than or equal to the most recent committed hardstate.
440func ValidSnapshotEntries(walDir string) ([]walpb.Snapshot, error) {
441 var snaps []walpb.Snapshot
442 var state raftpb.HardState
443 var err error
444
445 rec := &walpb.Record{}
446 names, err := readWalNames(walDir)
447 if err != nil {
448 return nil, err
449 }
450
451 // open wal files in read mode, so that there is no conflict
452 // when the same WAL is opened elsewhere in write mode
453 rs, _, closer, err := openWALFiles(walDir, names, 0, false)
454 if err != nil {
455 return nil, err
456 }
457 defer func() {
458 if closer != nil {
459 closer()
460 }
461 }()
462
463 // create a new decoder from the readers on the WAL files
464 decoder := newDecoder(rs...)
465
466 for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
467 switch rec.Type {
468 case snapshotType:
469 var loadedSnap walpb.Snapshot
470 pbutil.MustUnmarshal(&loadedSnap, rec.Data)
471 snaps = append(snaps, loadedSnap)
472 case stateType:
473 state = mustUnmarshalState(rec.Data)
474 case crcType:
475 crc := decoder.crc.Sum32()
476 // current crc of decoder must match the crc of the record.
477 // do no need to match 0 crc, since the decoder is a new one at this case.
478 if crc != 0 && rec.Validate(crc) != nil {
479 return nil, ErrCRCMismatch
480 }
481 decoder.updateCRC(rec.Crc)
482 }
483 }
484 // We do not have to read out all the WAL entries
485 // as the decoder is opened in read mode.
486 if err != io.EOF && err != io.ErrUnexpectedEOF {
487 return nil, err
488 }
489
490 // filter out any snaps that are newer than the committed hardstate
491 n := 0
492 for _, s := range snaps {
493 if s.Index <= state.Commit {
494 snaps[n] = s
495 n++
496 }
497 }
498 snaps = snaps[:n:n]
499
500 return snaps, nil
501}
502
503// Verify reads through the given WAL and verifies that it is not corrupted.
504// It creates a new decoder to read through the records of the given WAL.
505// It does not conflict with any open WAL, but it is recommended not to
506// call this function after opening the WAL for writing.
507// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
508// If the loaded snap doesn't match with the expected one, it will
509// return error ErrSnapshotMismatch.
510func Verify(walDir string, snap walpb.Snapshot) error {
511 var metadata []byte
512 var err error
513 var match bool
514
515 rec := &walpb.Record{}
516
517 names, nameIndex, err := selectWALFiles(walDir, snap)
518 if err != nil {
519 return err
520 }
521
522 // open wal files in read mode, so that there is no conflict
523 // when the same WAL is opened elsewhere in write mode
524 rs, _, closer, err := openWALFiles(walDir, names, nameIndex, false)
525 if err != nil {
526 return err
527 }
528
529 // create a new decoder from the readers on the WAL files
530 decoder := newDecoder(rs...)
531
532 for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
533 switch rec.Type {
534 case metadataType:
535 if metadata != nil && !bytes.Equal(metadata, rec.Data) {
536 return ErrMetadataConflict
537 }
538 metadata = rec.Data
539 case crcType:
540 crc := decoder.crc.Sum32()
541 // Current crc of decoder must match the crc of the record.
542 // We need not match 0 crc, since the decoder is a new one at this point.
543 if crc != 0 && rec.Validate(crc) != nil {
544 return ErrCRCMismatch
545 }
546 decoder.updateCRC(rec.Crc)
547 case snapshotType:
548 var loadedSnap walpb.Snapshot
549 pbutil.MustUnmarshal(&loadedSnap, rec.Data)
550 if loadedSnap.Index == snap.Index {
551 if loadedSnap.Term != snap.Term {
552 return ErrSnapshotMismatch
553 }
554 match = true
555 }
556 // We ignore all entry and state type records as these
557 // are not necessary for validating the WAL contents
558 case entryType:
559 case stateType:
560 default:
561 return fmt.Errorf("unexpected block type %d", rec.Type)
562 }
563 }
564
565 if closer != nil {
566 closer()
567 }
568
569 // We do not have to read out all the WAL entries
570 // as the decoder is opened in read mode.
571 if err != io.EOF && err != io.ErrUnexpectedEOF {
572 return err
573 }
574
575 if !match {
576 return ErrSnapshotNotFound
577 }
578
579 return nil
580}
581
582// cut closes current file written and creates a new one ready to append.
583// cut first creates a temp wal file and writes necessary headers into it.
584// Then cut atomically rename temp wal file to a wal file.
585func (w *WAL) cut() error {
586 // close old wal file; truncate to avoid wasting space if an early cut
587 off, serr := w.tail().Seek(0, io.SeekCurrent)
588 if serr != nil {
589 return serr
590 }
591 if err := w.tail().Truncate(off); err != nil {
592 return err
593 }
594 if err := w.sync(); err != nil {
595 return err
596 }
597
598 fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
599
600 // create a temp wal file with name sequence + 1, or truncate the existing one
601 newTail, err := w.fp.Open()
602 if err != nil {
603 return err
604 }
605
606 // update writer and save the previous crc
607 w.locks = append(w.locks, newTail)
608 prevCrc := w.encoder.crc.Sum32()
609 w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
610 if err != nil {
611 return err
612 }
613 if err = w.saveCrc(prevCrc); err != nil {
614 return err
615 }
616 if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
617 return err
618 }
619 if err = w.saveState(&w.state); err != nil {
620 return err
621 }
622 // atomically move temp wal file to wal file
623 if err = w.sync(); err != nil {
624 return err
625 }
626
627 off, err = w.tail().Seek(0, io.SeekCurrent)
628 if err != nil {
629 return err
630 }
631
632 if err = os.Rename(newTail.Name(), fpath); err != nil {
633 return err
634 }
635
636 start := time.Now()
637 if err = fileutil.Fsync(w.dirFile); err != nil {
638 return err
639 }
640 syncDurations.Observe(time.Since(start).Seconds())
641
642 // reopen newTail with its new path so calls to Name() match the wal filename format
643 newTail.Close()
644
645 if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
646 return err
647 }
648 if _, err = newTail.Seek(off, io.SeekStart); err != nil {
649 return err
650 }
651
652 w.locks[len(w.locks)-1] = newTail
653
654 prevCrc = w.encoder.crc.Sum32()
655 w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
656 if err != nil {
657 return err
658 }
659
660 plog.Infof("segmented wal file %v is created", fpath)
661 return nil
662}
663
664func (w *WAL) sync() error {
665 if w.encoder != nil {
666 if err := w.encoder.flush(); err != nil {
667 return err
668 }
669 }
670 start := time.Now()
671 err := fileutil.Fdatasync(w.tail().File)
672
673 duration := time.Since(start)
674 if duration > warnSyncDuration {
675 plog.Warningf("sync duration of %v, expected less than %v", duration, warnSyncDuration)
676 }
677 syncDurations.Observe(duration.Seconds())
678
679 return err
680}
681
682func (w *WAL) Sync() error {
683 return w.sync()
684}
685
686// ReleaseLockTo releases the locks, which has smaller index than the given index
687// except the largest one among them.
688// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
689// lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
690func (w *WAL) ReleaseLockTo(index uint64) error {
691 w.mu.Lock()
692 defer w.mu.Unlock()
693
694 if len(w.locks) == 0 {
695 return nil
696 }
697
698 var smaller int
699 found := false
700
701 for i, l := range w.locks {
702 _, lockIndex, err := parseWalName(filepath.Base(l.Name()))
703 if err != nil {
704 return err
705 }
706 if lockIndex >= index {
707 smaller = i - 1
708 found = true
709 break
710 }
711 }
712
713 // if no lock index is greater than the release index, we can
714 // release lock up to the last one(excluding).
715 if !found {
716 smaller = len(w.locks) - 1
717 }
718
719 if smaller <= 0 {
720 return nil
721 }
722
723 for i := 0; i < smaller; i++ {
724 if w.locks[i] == nil {
725 continue
726 }
727 w.locks[i].Close()
728 }
729 w.locks = w.locks[smaller:]
730
731 return nil
732}
733
734func (w *WAL) Close() error {
735 w.mu.Lock()
736 defer w.mu.Unlock()
737
738 if w.fp != nil {
739 w.fp.Close()
740 w.fp = nil
741 }
742
743 if w.tail() != nil {
744 if err := w.sync(); err != nil {
745 return err
746 }
747 }
748 for _, l := range w.locks {
749 if l == nil {
750 continue
751 }
752 if err := l.Close(); err != nil {
753 plog.Errorf("failed to unlock during closing wal: %s", err)
754 }
755 }
756
757 return w.dirFile.Close()
758}
759
760func (w *WAL) saveEntry(e *raftpb.Entry) error {
761 // TODO: add MustMarshalTo to reduce one allocation.
762 b := pbutil.MustMarshal(e)
763 rec := &walpb.Record{Type: entryType, Data: b}
764 if err := w.encoder.encode(rec); err != nil {
765 return err
766 }
767 w.enti = e.Index
768 return nil
769}
770
771func (w *WAL) saveState(s *raftpb.HardState) error {
772 if raft.IsEmptyHardState(*s) {
773 return nil
774 }
775 w.state = *s
776 b := pbutil.MustMarshal(s)
777 rec := &walpb.Record{Type: stateType, Data: b}
778 return w.encoder.encode(rec)
779}
780
781func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
782 w.mu.Lock()
783 defer w.mu.Unlock()
784
785 // short cut, do not call sync
786 if raft.IsEmptyHardState(st) && len(ents) == 0 {
787 return nil
788 }
789
790 mustSync := raft.MustSync(st, w.state, len(ents))
791
792 // TODO(xiangli): no more reference operator
793 for i := range ents {
794 if err := w.saveEntry(&ents[i]); err != nil {
795 return err
796 }
797 }
798 if err := w.saveState(&st); err != nil {
799 return err
800 }
801
802 curOff, err := w.tail().Seek(0, io.SeekCurrent)
803 if err != nil {
804 return err
805 }
806 if curOff < SegmentSizeBytes {
807 if mustSync {
808 return w.sync()
809 }
810 return nil
811 }
812
813 return w.cut()
814}
815
816func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
817 b := pbutil.MustMarshal(&e)
818
819 w.mu.Lock()
820 defer w.mu.Unlock()
821
822 rec := &walpb.Record{Type: snapshotType, Data: b}
823 if err := w.encoder.encode(rec); err != nil {
824 return err
825 }
826 // update enti only when snapshot is ahead of last index
827 if w.enti < e.Index {
828 w.enti = e.Index
829 }
830 return w.sync()
831}
832
833func (w *WAL) saveCrc(prevCrc uint32) error {
834 return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
835}
836
837func (w *WAL) tail() *fileutil.LockedFile {
838 if len(w.locks) > 0 {
839 return w.locks[len(w.locks)-1]
840 }
841 return nil
842}
843
844func (w *WAL) seq() uint64 {
845 t := w.tail()
846 if t == nil {
847 return 0
848 }
849 seq, _, err := parseWalName(filepath.Base(t.Name()))
850 if err != nil {
851 plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
852 }
853 return seq
854}
855
// closeAll closes every given ReadCloser and returns the first error
// encountered, or nil if all of them closed cleanly. A failing Close does not
// stop the remaining ones from being closed, so no descriptor is leaked.
func closeAll(rcs ...io.ReadCloser) error {
	var firstErr error
	for _, rc := range rcs {
		if err := rc.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}