blob: 96d01a23af69f1285eb8ac83761b2b83ec3a1dbb [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package wal
16
17import (
18 "bytes"
19 "errors"
20 "fmt"
21 "hash/crc32"
22 "io"
23 "os"
24 "path/filepath"
25 "sync"
26 "time"
27
28 "github.com/coreos/etcd/pkg/fileutil"
29 "github.com/coreos/etcd/pkg/pbutil"
30 "github.com/coreos/etcd/raft"
31 "github.com/coreos/etcd/raft/raftpb"
32 "github.com/coreos/etcd/wal/walpb"
33
34 "github.com/coreos/pkg/capnslog"
35)
36
const (
	// Record type tags stored in walpb.Record.Type. Numbering starts at 1
	// (iota + 1); ReadAll rejects any other type as unexpected.
	metadataType int64 = iota + 1 // metadata given to Create, repeated at the head of each segment
	entryType                     // a marshaled raftpb.Entry
	stateType                     // a marshaled raftpb.HardState
	crcType                       // checksum carried over from the previous segment (0 for the first)
	snapshotType                  // a marshaled walpb.Snapshot

	// warnSyncDuration is the amount of time allotted to an fsync before
	// logging a warning.
	warnSyncDuration = time.Second
)
48
var (
	// SegmentSizeBytes is the preallocated size of each wal segment file.
	// The actual size might be larger than this: Save only cuts a new
	// segment once the tail's write offset has reached or passed this size.
	// In general, the default value should be used, but this is defined as
	// an exported variable so that tests can set a different segment size.
	SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64 MB (decimal, i.e. 64,000,000 bytes, not MiB)

	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "wal")

	// ErrMetadataConflict is returned by ReadAll when two metadata records
	// in the WAL carry different data.
	ErrMetadataConflict = errors.New("wal: conflicting metadata found")
	// ErrFileNotFound is returned when opening a WAL finds no segment that
	// covers the requested snapshot index, or the segment sequence from that
	// point on is not valid.
	ErrFileNotFound = errors.New("wal: file not found")
	// ErrCRCMismatch is returned by ReadAll when a crc record does not match
	// the running checksum of the records decoded before it (a zero running
	// checksum is not checked).
	ErrCRCMismatch = errors.New("wal: crc mismatch")
	// ErrSnapshotMismatch is returned by ReadAll when a snapshot record has
	// the requested index but a different term.
	ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
	// ErrSnapshotNotFound is returned by ReadAll when no snapshot record with
	// the requested index was read.
	ErrSnapshotNotFound = errors.New("wal: snapshot not found")
	// crcTable is the CRC-32 Castagnoli table; presumably shared by the
	// record encoder/decoder defined elsewhere in this package.
	crcTable = crc32.MakeTable(crc32.Castagnoli)
)
65
// WAL is a logical representation of the stable storage.
// WAL is either in read mode or append mode but not both.
// A newly created WAL is in append mode, and ready for appending records.
// A just opened WAL is in read mode, and ready for reading records.
// The WAL will be ready for appending after reading out all the previous records.
type WAL struct {
	dir string // the living directory of the underlay files

	// dirFile is a fd for the wal directory, used to fsync the directory
	// after a segment file is renamed. Open and Create set it; OpenForRead
	// leaves it nil.
	dirFile *os.File

	metadata []byte           // metadata recorded at the head of each WAL segment
	state    raftpb.HardState // latest hardstate, re-recorded at the head of each new segment

	// Read-side state, only meaningful until ReadAll has consumed the records.
	start     walpb.Snapshot // snapshot to start reading; reset to zero by ReadAll
	decoder   *decoder       // decoder to decode records; set to nil by ReadAll
	readClose func() error   // closer for decode reader; nil in write mode

	// mu is held by ReadAll, Save, SaveSnapshot, ReleaseLockTo and Close.
	mu      sync.Mutex
	enti    uint64   // index of the last entry saved to the wal
	encoder *encoder // encoder to encode records; nil until the WAL is in append mode

	// locks holds one entry per open segment file, ordered by increasing
	// segment name; the last one is the tail being appended to. In read-only
	// mode (OpenForRead) every entry is a nil placeholder.
	locks []*fileutil.LockedFile
	fp    *filePipeline // supplies the next segment file when cutting; nil in read-only mode
}
91
92// Create creates a WAL ready for appending records. The given metadata is
93// recorded at the head of each WAL file, and can be retrieved with ReadAll.
94func Create(dirpath string, metadata []byte) (*WAL, error) {
95 if Exist(dirpath) {
96 return nil, os.ErrExist
97 }
98
99 // keep temporary wal directory so WAL initialization appears atomic
100 tmpdirpath := filepath.Clean(dirpath) + ".tmp"
101 if fileutil.Exist(tmpdirpath) {
102 if err := os.RemoveAll(tmpdirpath); err != nil {
103 return nil, err
104 }
105 }
106 if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
107 return nil, err
108 }
109
110 p := filepath.Join(tmpdirpath, walName(0, 0))
111 f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
112 if err != nil {
113 return nil, err
114 }
115 if _, err = f.Seek(0, io.SeekEnd); err != nil {
116 return nil, err
117 }
118 if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
119 return nil, err
120 }
121
122 w := &WAL{
123 dir: dirpath,
124 metadata: metadata,
125 }
126 w.encoder, err = newFileEncoder(f.File, 0)
127 if err != nil {
128 return nil, err
129 }
130 w.locks = append(w.locks, f)
131 if err = w.saveCrc(0); err != nil {
132 return nil, err
133 }
134 if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
135 return nil, err
136 }
137 if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
138 return nil, err
139 }
140
141 if w, err = w.renameWal(tmpdirpath); err != nil {
142 return nil, err
143 }
144
145 // directory was renamed; sync parent dir to persist rename
146 pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
147 if perr != nil {
148 return nil, perr
149 }
150 if perr = fileutil.Fsync(pdir); perr != nil {
151 return nil, perr
152 }
153 if perr = pdir.Close(); err != nil {
154 return nil, perr
155 }
156
157 return w, nil
158}
159
160func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
161 if err := os.RemoveAll(w.dir); err != nil {
162 return nil, err
163 }
164 // On non-Windows platforms, hold the lock while renaming. Releasing
165 // the lock and trying to reacquire it quickly can be flaky because
166 // it's possible the process will fork to spawn a process while this is
167 // happening. The fds are set up as close-on-exec by the Go runtime,
168 // but there is a window between the fork and the exec where another
169 // process holds the lock.
170 if err := os.Rename(tmpdirpath, w.dir); err != nil {
171 if _, ok := err.(*os.LinkError); ok {
172 return w.renameWalUnlock(tmpdirpath)
173 }
174 return nil, err
175 }
176 w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
177 df, err := fileutil.OpenDir(w.dir)
178 w.dirFile = df
179 return w, err
180}
181
182func (w *WAL) renameWalUnlock(tmpdirpath string) (*WAL, error) {
183 // rename of directory with locked files doesn't work on windows/cifs;
184 // close the WAL to release the locks so the directory can be renamed.
185 plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir)
186 w.Close()
187 if err := os.Rename(tmpdirpath, w.dir); err != nil {
188 return nil, err
189 }
190 // reopen and relock
191 newWAL, oerr := Open(w.dir, walpb.Snapshot{})
192 if oerr != nil {
193 return nil, oerr
194 }
195 if _, _, _, err := newWAL.ReadAll(); err != nil {
196 newWAL.Close()
197 return nil, err
198 }
199 return newWAL, nil
200}
201
202// Open opens the WAL at the given snap.
203// The snap SHOULD have been previously saved to the WAL, or the following
204// ReadAll will fail.
205// The returned WAL is ready to read and the first record will be the one after
206// the given snap. The WAL cannot be appended to before reading out all of its
207// previous records.
208func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
209 w, err := openAtIndex(dirpath, snap, true)
210 if err != nil {
211 return nil, err
212 }
213 if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
214 return nil, err
215 }
216 return w, nil
217}
218
219// OpenForRead only opens the wal files for read.
220// Write on a read only wal panics.
221func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) {
222 return openAtIndex(dirpath, snap, false)
223}
224
225func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
226 names, err := readWalNames(dirpath)
227 if err != nil {
228 return nil, err
229 }
230
231 nameIndex, ok := searchIndex(names, snap.Index)
232 if !ok || !isValidSeq(names[nameIndex:]) {
233 return nil, ErrFileNotFound
234 }
235
236 // open the wal files
237 rcs := make([]io.ReadCloser, 0)
238 rs := make([]io.Reader, 0)
239 ls := make([]*fileutil.LockedFile, 0)
240 for _, name := range names[nameIndex:] {
241 p := filepath.Join(dirpath, name)
242 if write {
243 l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
244 if err != nil {
245 closeAll(rcs...)
246 return nil, err
247 }
248 ls = append(ls, l)
249 rcs = append(rcs, l)
250 } else {
251 rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
252 if err != nil {
253 closeAll(rcs...)
254 return nil, err
255 }
256 ls = append(ls, nil)
257 rcs = append(rcs, rf)
258 }
259 rs = append(rs, rcs[len(rcs)-1])
260 }
261
262 closer := func() error { return closeAll(rcs...) }
263
264 // create a WAL ready for reading
265 w := &WAL{
266 dir: dirpath,
267 start: snap,
268 decoder: newDecoder(rs...),
269 readClose: closer,
270 locks: ls,
271 }
272
273 if write {
274 // write reuses the file descriptors from read; don't close so
275 // WAL can append without dropping the file lock
276 w.readClose = nil
277 if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
278 closer()
279 return nil, err
280 }
281 w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
282 }
283
284 return w, nil
285}
286
// ReadAll reads out the records of the WAL, starting from the snapshot the WAL
// was opened at. It returns the last metadata, the last HardState, and the
// entries whose index is greater than the start snapshot's index.
//
// If the WAL was opened in write mode, it must read out all records until
// io.EOF, or the decode error is returned. If it was opened in read mode
// (OpenForRead), a trailing partially written record (io.ErrUnexpectedEOF) is
// tolerated.
//
// If a snapshot record with the start snapshot's index but a different term
// is read, ErrSnapshotMismatch is returned and no records are returned. If no
// snapshot record with the start index is read, err is set to
// ErrSnapshotNotFound and the records read so far are returned. (See the
// NOTE(review) at the end: in write mode this error is currently overwritten.)
//
// After ReadAll, the WAL will be ready for appending new records.
//
// TODO: detect not-last-snap error.
// TODO: maybe loosen the checking of match.
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	rec := &walpb.Record{}
	decoder := w.decoder

	// match records whether a snapshot record with the start index was seen.
	var match bool
	for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
		switch rec.Type {
		case entryType:
			e := mustUnmarshalEntry(rec.Data)
			// Keep only entries after the start snapshot. An entry whose
			// index is already in ents truncates ents at that position, so
			// entries rewritten later in the log replace earlier ones.
			// NOTE(review): if indexes jump ahead
			// (e.Index-w.start.Index-1 > len(ents)), this slice expression
			// panics instead of returning an error.
			if e.Index > w.start.Index {
				ents = append(ents[:e.Index-w.start.Index-1], e)
			}
			w.enti = e.Index
		case stateType:
			state = mustUnmarshalState(rec.Data)
		case metadataType:
			// Every segment repeats the metadata; all copies must agree.
			if metadata != nil && !bytes.Equal(metadata, rec.Data) {
				state.Reset()
				return nil, state, nil, ErrMetadataConflict
			}
			metadata = rec.Data
		case crcType:
			crc := decoder.crc.Sum32()
			// current crc of decoder must match the crc of the record.
			// no need to match 0 crc, since the decoder is a new one in this case.
			if crc != 0 && rec.Validate(crc) != nil {
				state.Reset()
				return nil, state, nil, ErrCRCMismatch
			}
			decoder.updateCRC(rec.Crc)
		case snapshotType:
			var snap walpb.Snapshot
			pbutil.MustUnmarshal(&snap, rec.Data)
			if snap.Index == w.start.Index {
				if snap.Term != w.start.Term {
					state.Reset()
					return nil, state, nil, ErrSnapshotMismatch
				}
				match = true
			}
		default:
			state.Reset()
			return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
		}
	}

	// tail() is nil in read mode, because openAtIndex stores nil lock
	// placeholders, and non-nil in write mode.
	switch w.tail() {
	case nil:
		// We do not have to read out all entries in read mode.
		// The last record may be a partially written one, so
		// io.ErrUnexpectedEOF might be returned.
		if err != io.EOF && err != io.ErrUnexpectedEOF {
			state.Reset()
			return nil, state, nil, err
		}
	default:
		// We must read all of the entries if WAL is opened in write mode.
		if err != io.EOF {
			state.Reset()
			return nil, state, nil, err
		}
		// decodeRecord() will return io.EOF if it detects a zero record,
		// but this zero record may be followed by non-zero records from
		// a torn write. Overwriting some of these non-zero records, but
		// not all, will cause CRC errors on WAL open. Since the records
		// were never fully synced to disk in the first place, it's safe
		// to zero them out to avoid any CRC errors from new writes.
		if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil {
			return nil, state, nil, err
		}
		if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
			return nil, state, nil, err
		}
	}

	err = nil
	if !match {
		err = ErrSnapshotNotFound
	}

	// close decoder, disable reading
	if w.readClose != nil {
		w.readClose()
		w.readClose = nil
	}
	w.start = walpb.Snapshot{}

	w.metadata = metadata

	if w.tail() != nil {
		// create encoder (chain crc with the decoder), enable appending
		// NOTE(review): this assignment overwrites err, so in write mode a
		// pending ErrSnapshotNotFound is dropped whenever the encoder is
		// created successfully. Confirm whether callers rely on that.
		w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
		if err != nil {
			return
		}
	}
	w.decoder = nil

	return metadata, state, ents, err
}
400
// cut finishes the current tail segment and starts a new one ready for
// appending.
//
// The current tail is truncated at its write offset and synced. A new file is
// then obtained from the file pipeline, and the segment headers are written to
// it and synced: a crc record chaining the previous segment's checksum, the
// metadata record, and the latest hard state. The file is renamed to its final
// name, built from the next sequence number and the next entry index
// (w.enti+1), and the WAL directory is fsynced so the rename is durable.
// Finally, the file is reopened under its new name, and the encoder is rebuilt
// on the new descriptor.
func (w *WAL) cut() error {
	// close old wal file; truncate to avoid wasting space if an early cut
	off, serr := w.tail().Seek(0, io.SeekCurrent)
	if serr != nil {
		return serr
	}
	if err := w.tail().Truncate(off); err != nil {
		return err
	}
	if err := w.sync(); err != nil {
		return err
	}

	// Final name of the new segment: next sequence number, next entry index.
	fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))

	// create a temp wal file with name sequence + 1, or truncate the existing one
	newTail, err := w.fp.Open()
	if err != nil {
		return err
	}

	// update writer and save the previous crc
	w.locks = append(w.locks, newTail)
	prevCrc := w.encoder.crc.Sum32()
	w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
	if err != nil {
		return err
	}
	if err = w.saveCrc(prevCrc); err != nil {
		return err
	}
	if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
		return err
	}
	if err = w.saveState(&w.state); err != nil {
		return err
	}
	// Make the headers durable before the file is renamed into place.
	if err = w.sync(); err != nil {
		return err
	}

	// Remember the write offset so it can be restored after reopening.
	off, err = w.tail().Seek(0, io.SeekCurrent)
	if err != nil {
		return err
	}

	// atomically move temp wal file to wal file
	if err = os.Rename(newTail.Name(), fpath); err != nil {
		return err
	}
	if err = fileutil.Fsync(w.dirFile); err != nil {
		return err
	}

	// reopen newTail with its new path so calls to Name() match the wal filename format
	// NOTE(review): the Close error is ignored, and the new segment is not
	// locked between this Close and the LockFile below.
	newTail.Close()

	if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
		return err
	}
	if _, err = newTail.Seek(off, io.SeekStart); err != nil {
		return err
	}

	w.locks[len(w.locks)-1] = newTail

	// The current encoder writes to the descriptor that was just closed, so
	// rebuild it on the reopened file while keeping the running crc.
	prevCrc = w.encoder.crc.Sum32()
	w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
	if err != nil {
		return err
	}

	plog.Infof("segmented wal file %v is created", fpath)
	return nil
}
479
480func (w *WAL) sync() error {
481 if w.encoder != nil {
482 if err := w.encoder.flush(); err != nil {
483 return err
484 }
485 }
486 start := time.Now()
487 err := fileutil.Fdatasync(w.tail().File)
488
489 duration := time.Since(start)
490 if duration > warnSyncDuration {
491 plog.Warningf("sync duration of %v, expected less than %v", duration, warnSyncDuration)
492 }
493 syncDurations.Observe(duration.Seconds())
494
495 return err
496}
497
498// ReleaseLockTo releases the locks, which has smaller index than the given index
499// except the largest one among them.
500// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
501// lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
502func (w *WAL) ReleaseLockTo(index uint64) error {
503 w.mu.Lock()
504 defer w.mu.Unlock()
505
506 if len(w.locks) == 0 {
507 return nil
508 }
509
510 var smaller int
511 found := false
512
513 for i, l := range w.locks {
514 _, lockIndex, err := parseWalName(filepath.Base(l.Name()))
515 if err != nil {
516 return err
517 }
518 if lockIndex >= index {
519 smaller = i - 1
520 found = true
521 break
522 }
523 }
524
525 // if no lock index is greater than the release index, we can
526 // release lock up to the last one(excluding).
527 if !found {
528 smaller = len(w.locks) - 1
529 }
530
531 if smaller <= 0 {
532 return nil
533 }
534
535 for i := 0; i < smaller; i++ {
536 if w.locks[i] == nil {
537 continue
538 }
539 w.locks[i].Close()
540 }
541 w.locks = w.locks[smaller:]
542
543 return nil
544}
545
546func (w *WAL) Close() error {
547 w.mu.Lock()
548 defer w.mu.Unlock()
549
550 if w.fp != nil {
551 w.fp.Close()
552 w.fp = nil
553 }
554
555 if w.tail() != nil {
556 if err := w.sync(); err != nil {
557 return err
558 }
559 }
560 for _, l := range w.locks {
561 if l == nil {
562 continue
563 }
564 if err := l.Close(); err != nil {
565 plog.Errorf("failed to unlock during closing wal: %s", err)
566 }
567 }
568
569 return w.dirFile.Close()
570}
571
572func (w *WAL) saveEntry(e *raftpb.Entry) error {
573 // TODO: add MustMarshalTo to reduce one allocation.
574 b := pbutil.MustMarshal(e)
575 rec := &walpb.Record{Type: entryType, Data: b}
576 if err := w.encoder.encode(rec); err != nil {
577 return err
578 }
579 w.enti = e.Index
580 return nil
581}
582
583func (w *WAL) saveState(s *raftpb.HardState) error {
584 if raft.IsEmptyHardState(*s) {
585 return nil
586 }
587 w.state = *s
588 b := pbutil.MustMarshal(s)
589 rec := &walpb.Record{Type: stateType, Data: b}
590 return w.encoder.encode(rec)
591}
592
593func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
594 w.mu.Lock()
595 defer w.mu.Unlock()
596
597 // short cut, do not call sync
598 if raft.IsEmptyHardState(st) && len(ents) == 0 {
599 return nil
600 }
601
602 mustSync := raft.MustSync(st, w.state, len(ents))
603
604 // TODO(xiangli): no more reference operator
605 for i := range ents {
606 if err := w.saveEntry(&ents[i]); err != nil {
607 return err
608 }
609 }
610 if err := w.saveState(&st); err != nil {
611 return err
612 }
613
614 curOff, err := w.tail().Seek(0, io.SeekCurrent)
615 if err != nil {
616 return err
617 }
618 if curOff < SegmentSizeBytes {
619 if mustSync {
620 return w.sync()
621 }
622 return nil
623 }
624
625 return w.cut()
626}
627
628func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
629 b := pbutil.MustMarshal(&e)
630
631 w.mu.Lock()
632 defer w.mu.Unlock()
633
634 rec := &walpb.Record{Type: snapshotType, Data: b}
635 if err := w.encoder.encode(rec); err != nil {
636 return err
637 }
638 // update enti only when snapshot is ahead of last index
639 if w.enti < e.Index {
640 w.enti = e.Index
641 }
642 return w.sync()
643}
644
645func (w *WAL) saveCrc(prevCrc uint32) error {
646 return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
647}
648
649func (w *WAL) tail() *fileutil.LockedFile {
650 if len(w.locks) > 0 {
651 return w.locks[len(w.locks)-1]
652 }
653 return nil
654}
655
656func (w *WAL) seq() uint64 {
657 t := w.tail()
658 if t == nil {
659 return 0
660 }
661 seq, _, err := parseWalName(filepath.Base(t.Name()))
662 if err != nil {
663 plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
664 }
665 return seq
666}
667
// closeAll closes every given ReadCloser, even when some of them fail, so that
// one bad descriptor does not leak the rest. It returns the first error
// encountered, or nil if every Close succeeded.
func closeAll(rcs ...io.ReadCloser) error {
	var firstErr error
	for _, rc := range rcs {
		if err := rc.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}