[VOL-2193] Create mocks for Kafka Client and Etcd
This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and represents an
Etcd server.
Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/wal/decoder.go b/vendor/go.etcd.io/etcd/wal/decoder.go
new file mode 100644
index 0000000..f2f01fd
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/wal/decoder.go
@@ -0,0 +1,188 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+ "bufio"
+ "encoding/binary"
+ "hash"
+ "io"
+ "sync"
+
+ "go.etcd.io/etcd/pkg/crc"
+ "go.etcd.io/etcd/pkg/pbutil"
+ "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/wal/walpb"
+)
+
+const minSectorSize = 512
+
+// frameSizeBytes is frame size in bytes, including record size and padding size.
+const frameSizeBytes = 8
+
+type decoder struct {
+ mu sync.Mutex
+ brs []*bufio.Reader
+
+ // lastValidOff file offset following the last valid decoded record
+ lastValidOff int64
+ crc hash.Hash32
+}
+
+func newDecoder(r ...io.Reader) *decoder {
+ readers := make([]*bufio.Reader, len(r))
+ for i := range r {
+ readers[i] = bufio.NewReader(r[i])
+ }
+ return &decoder{
+ brs: readers,
+ crc: crc.New(0, crcTable),
+ }
+}
+
+func (d *decoder) decode(rec *walpb.Record) error {
+ rec.Reset()
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ return d.decodeRecord(rec)
+}
+
+func (d *decoder) decodeRecord(rec *walpb.Record) error {
+ if len(d.brs) == 0 {
+ return io.EOF
+ }
+
+ l, err := readInt64(d.brs[0])
+ if err == io.EOF || (err == nil && l == 0) {
+ // hit end of file or preallocated space
+ d.brs = d.brs[1:]
+ if len(d.brs) == 0 {
+ return io.EOF
+ }
+ d.lastValidOff = 0
+ return d.decodeRecord(rec)
+ }
+ if err != nil {
+ return err
+ }
+
+ recBytes, padBytes := decodeFrameSize(l)
+
+ data := make([]byte, recBytes+padBytes)
+ if _, err = io.ReadFull(d.brs[0], data); err != nil {
+ // ReadFull returns io.EOF only if no bytes were read
+ // the decoder should treat this as an ErrUnexpectedEOF instead.
+ if err == io.EOF {
+ err = io.ErrUnexpectedEOF
+ }
+ return err
+ }
+ if err := rec.Unmarshal(data[:recBytes]); err != nil {
+ if d.isTornEntry(data) {
+ return io.ErrUnexpectedEOF
+ }
+ return err
+ }
+
+ // skip crc checking if the record type is crcType
+ if rec.Type != crcType {
+ d.crc.Write(rec.Data)
+ if err := rec.Validate(d.crc.Sum32()); err != nil {
+ if d.isTornEntry(data) {
+ return io.ErrUnexpectedEOF
+ }
+ return err
+ }
+ }
+ // record decoded as valid; point last valid offset to end of record
+ d.lastValidOff += frameSizeBytes + recBytes + padBytes
+ return nil
+}
+
+func decodeFrameSize(lenField int64) (recBytes int64, padBytes int64) {
+ // the record size is stored in the lower 56 bits of the 64-bit length
+ recBytes = int64(uint64(lenField) & ^(uint64(0xff) << 56))
+ // non-zero padding is indicated by set MSb / a negative length
+ if lenField < 0 {
+ // padding is stored in lower 3 bits of length MSB
+ padBytes = int64((uint64(lenField) >> 56) & 0x7)
+ }
+ return recBytes, padBytes
+}
+
+// isTornEntry determines whether the last entry of the WAL was partially written
+// and corrupted because of a torn write.
+func (d *decoder) isTornEntry(data []byte) bool {
+ if len(d.brs) != 1 {
+ return false
+ }
+
+ fileOff := d.lastValidOff + frameSizeBytes
+ curOff := 0
+ chunks := [][]byte{}
+ // split data on sector boundaries
+ for curOff < len(data) {
+ chunkLen := int(minSectorSize - (fileOff % minSectorSize))
+ if chunkLen > len(data)-curOff {
+ chunkLen = len(data) - curOff
+ }
+ chunks = append(chunks, data[curOff:curOff+chunkLen])
+ fileOff += int64(chunkLen)
+ curOff += chunkLen
+ }
+
+ // if any data for a sector chunk is all 0, it's a torn write
+ for _, sect := range chunks {
+ isZero := true
+ for _, v := range sect {
+ if v != 0 {
+ isZero = false
+ break
+ }
+ }
+ if isZero {
+ return true
+ }
+ }
+ return false
+}
+
+func (d *decoder) updateCRC(prevCrc uint32) {
+ d.crc = crc.New(prevCrc, crcTable)
+}
+
+func (d *decoder) lastCRC() uint32 {
+ return d.crc.Sum32()
+}
+
+func (d *decoder) lastOffset() int64 { return d.lastValidOff }
+
+func mustUnmarshalEntry(d []byte) raftpb.Entry {
+ var e raftpb.Entry
+ pbutil.MustUnmarshal(&e, d)
+ return e
+}
+
+func mustUnmarshalState(d []byte) raftpb.HardState {
+ var s raftpb.HardState
+ pbutil.MustUnmarshal(&s, d)
+ return s
+}
+
+func readInt64(r io.Reader) (int64, error) {
+ var n int64
+ err := binary.Read(r, binary.LittleEndian, &n)
+ return n, err
+}
diff --git a/vendor/go.etcd.io/etcd/wal/doc.go b/vendor/go.etcd.io/etcd/wal/doc.go
new file mode 100644
index 0000000..7ea348e
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/wal/doc.go
@@ -0,0 +1,75 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/*
+Package wal provides an implementation of a write ahead log that is used by
+etcd.
+
+A WAL is created at a particular directory and is made up of a number of
+segmented WAL files. Inside of each file the raft state and entries are appended
+to it with the Save method:
+
+ metadata := []byte{}
+ w, err := wal.Create(zap.NewExample(), "/var/lib/etcd", metadata)
+ ...
+ err := w.Save(s, ents)
+
+After saving a raft snapshot to disk, SaveSnapshot method should be called to
+record it. So WAL can match with the saved snapshot when restarting.
+
+ err := w.SaveSnapshot(walpb.Snapshot{Index: 10, Term: 2})
+
+When a user has finished using a WAL it must be closed:
+
+ w.Close()
+
+Each WAL file is a stream of WAL records. A WAL record is a length field and a wal record
+protobuf. The record protobuf contains a CRC, a type, and a data payload. The length field is a
+64-bit packed structure holding the length of the remaining logical record data in its lower
+56 bits and its physical padding in the first three bits of the most significant byte. Each
+record is 8-byte aligned so that the length field is never torn. The CRC contains the CRC32
+value of all record protobufs preceding the current record.
+
+WAL files are placed inside of the directory in the following format:
+$seq-$index.wal
+
+The first WAL file to be created will be 0000000000000000-0000000000000000.wal
+indicating an initial sequence of 0 and an initial raft index of 0. The first
+entry written to WAL MUST have raft index 0.
+
+WAL will cut its current tail wal file if its size exceeds 64MB. This will increment an internal
+sequence number and cause a new file to be created. If the last raft index saved
+was 0x20 and this is the first time cut has been called on this WAL then the sequence will
+increment from 0x0 to 0x1. The new file will be: 0000000000000001-0000000000000021.wal.
+If a second cut issues 0x10 entries with incremental index later then the file will be called:
+0000000000000002-0000000000000031.wal.
+
+At a later time a WAL can be opened at a particular snapshot. If there is no
+snapshot, an empty snapshot should be passed in.
+
+ w, err := wal.Open("/var/lib/etcd", walpb.Snapshot{Index: 10, Term: 2})
+ ...
+
+The snapshot must have been written to the WAL.
+
+Additional items cannot be Saved to this WAL until all of the items from the given
+snapshot to the end of the WAL are read first:
+
+ metadata, state, ents, err := w.ReadAll()
+
+This will give you the metadata, the last raft.State and the slice of
+raft.Entry items in the log.
+
+*/
+package wal
diff --git a/vendor/go.etcd.io/etcd/wal/encoder.go b/vendor/go.etcd.io/etcd/wal/encoder.go
new file mode 100644
index 0000000..d3877ed
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/wal/encoder.go
@@ -0,0 +1,120 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+ "encoding/binary"
+ "hash"
+ "io"
+ "os"
+ "sync"
+
+ "go.etcd.io/etcd/pkg/crc"
+ "go.etcd.io/etcd/pkg/ioutil"
+ "go.etcd.io/etcd/wal/walpb"
+)
+
+// walPageBytes is the alignment for flushing records to the backing Writer.
+// It should be a multiple of the minimum sector size so that WAL can safely
+// distinguish between torn writes and ordinary data corruption.
+const walPageBytes = 8 * minSectorSize
+
+type encoder struct {
+ mu sync.Mutex
+ bw *ioutil.PageWriter
+
+ crc hash.Hash32
+ buf []byte
+ uint64buf []byte
+}
+
+func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
+ return &encoder{
+ bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset),
+ crc: crc.New(prevCrc, crcTable),
+ // 1MB buffer
+ buf: make([]byte, 1024*1024),
+ uint64buf: make([]byte, 8),
+ }
+}
+
+// newFileEncoder creates a new encoder with current file offset for the page writer.
+func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
+ offset, err := f.Seek(0, io.SeekCurrent)
+ if err != nil {
+ return nil, err
+ }
+ return newEncoder(f, prevCrc, int(offset)), nil
+}
+
+func (e *encoder) encode(rec *walpb.Record) error {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ e.crc.Write(rec.Data)
+ rec.Crc = e.crc.Sum32()
+ var (
+ data []byte
+ err error
+ n int
+ )
+
+ if rec.Size() > len(e.buf) {
+ data, err = rec.Marshal()
+ if err != nil {
+ return err
+ }
+ } else {
+ n, err = rec.MarshalTo(e.buf)
+ if err != nil {
+ return err
+ }
+ data = e.buf[:n]
+ }
+
+ lenField, padBytes := encodeFrameSize(len(data))
+ if err = writeUint64(e.bw, lenField, e.uint64buf); err != nil {
+ return err
+ }
+
+ if padBytes != 0 {
+ data = append(data, make([]byte, padBytes)...)
+ }
+ _, err = e.bw.Write(data)
+ return err
+}
+
+func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) {
+ lenField = uint64(dataBytes)
+ // force 8 byte alignment so length never gets a torn write
+ padBytes = (8 - (dataBytes % 8)) % 8
+ if padBytes != 0 {
+ lenField |= uint64(0x80|padBytes) << 56
+ }
+ return lenField, padBytes
+}
+
+func (e *encoder) flush() error {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ return e.bw.Flush()
+}
+
+func writeUint64(w io.Writer, n uint64, buf []byte) error {
+ // http://golang.org/src/encoding/binary/binary.go
+ binary.LittleEndian.PutUint64(buf, n)
+ _, err := w.Write(buf)
+ return err
+}
diff --git a/vendor/go.etcd.io/etcd/wal/file_pipeline.go b/vendor/go.etcd.io/etcd/wal/file_pipeline.go
new file mode 100644
index 0000000..e1e1c55
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/wal/file_pipeline.go
@@ -0,0 +1,106 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+
+ "go.etcd.io/etcd/pkg/fileutil"
+
+ "go.uber.org/zap"
+)
+
+// filePipeline pipelines allocating disk space
+type filePipeline struct {
+ lg *zap.Logger
+
+ // dir to put files
+ dir string
+ // size of files to make, in bytes
+ size int64
+ // count number of files generated
+ count int
+
+ filec chan *fileutil.LockedFile
+ errc chan error
+ donec chan struct{}
+}
+
+func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline {
+ fp := &filePipeline{
+ lg: lg,
+ dir: dir,
+ size: fileSize,
+ filec: make(chan *fileutil.LockedFile),
+ errc: make(chan error, 1),
+ donec: make(chan struct{}),
+ }
+ go fp.run()
+ return fp
+}
+
+// Open returns a fresh file for writing. Rename the file before calling
+// Open again or there will be file collisions.
+func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
+ select {
+ case f = <-fp.filec:
+ case err = <-fp.errc:
+ }
+ return f, err
+}
+
+func (fp *filePipeline) Close() error {
+ close(fp.donec)
+ return <-fp.errc
+}
+
+func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
+ // count % 2 so this file isn't the same as the one last published
+ fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
+ if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
+ return nil, err
+ }
+ if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
+ if fp.lg != nil {
+ fp.lg.Warn("failed to preallocate space when creating a new WAL", zap.Int64("size", fp.size), zap.Error(err))
+ } else {
+ plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
+ }
+ f.Close()
+ return nil, err
+ }
+ fp.count++
+ return f, nil
+}
+
+func (fp *filePipeline) run() {
+ defer close(fp.errc)
+ for {
+ f, err := fp.alloc()
+ if err != nil {
+ fp.errc <- err
+ return
+ }
+ select {
+ case fp.filec <- f:
+ case <-fp.donec:
+ os.Remove(f.Name())
+ f.Close()
+ return
+ }
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/wal/metrics.go b/vendor/go.etcd.io/etcd/wal/metrics.go
new file mode 100644
index 0000000..22cb800
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/wal/metrics.go
@@ -0,0 +1,34 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var (
+ walFsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: "etcd",
+ Subsystem: "disk",
+ Name: "wal_fsync_duration_seconds",
+ Help: "The latency distributions of fsync called by WAL.",
+
+ // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+ // highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+ Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+ })
+)
+
+func init() {
+ prometheus.MustRegister(walFsyncSec)
+}
diff --git a/vendor/go.etcd.io/etcd/wal/repair.go b/vendor/go.etcd.io/etcd/wal/repair.go
new file mode 100644
index 0000000..15afed0
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/wal/repair.go
@@ -0,0 +1,141 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+ "io"
+ "os"
+ "path/filepath"
+
+ "go.etcd.io/etcd/pkg/fileutil"
+ "go.etcd.io/etcd/wal/walpb"
+
+ "go.uber.org/zap"
+)
+
+// Repair tries to repair ErrUnexpectedEOF in the
+// last wal file by truncating.
+func Repair(lg *zap.Logger, dirpath string) bool {
+ f, err := openLast(lg, dirpath)
+ if err != nil {
+ return false
+ }
+ defer f.Close()
+
+ if lg != nil {
+ lg.Info("repairing", zap.String("path", f.Name()))
+ } else {
+ plog.Noticef("repairing %v", f.Name())
+ }
+
+ rec := &walpb.Record{}
+ decoder := newDecoder(f)
+ for {
+ lastOffset := decoder.lastOffset()
+ err := decoder.decode(rec)
+ switch err {
+ case nil:
+ // update crc of the decoder when necessary
+ switch rec.Type {
+ case crcType:
+ crc := decoder.crc.Sum32()
+ // current crc of decoder must match the crc of the record.
+ // do no need to match 0 crc, since the decoder is a new one at this case.
+ if crc != 0 && rec.Validate(crc) != nil {
+ return false
+ }
+ decoder.updateCRC(rec.Crc)
+ }
+ continue
+
+ case io.EOF:
+ if lg != nil {
+ lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF))
+ }
+ return true
+
+ case io.ErrUnexpectedEOF:
+ bf, bferr := os.Create(f.Name() + ".broken")
+ if bferr != nil {
+ if lg != nil {
+ lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr))
+ } else {
+ plog.Errorf("could not repair %v, failed to create backup file", f.Name())
+ }
+ return false
+ }
+ defer bf.Close()
+
+ if _, err = f.Seek(0, io.SeekStart); err != nil {
+ if lg != nil {
+ lg.Warn("failed to read file", zap.String("path", f.Name()), zap.Error(err))
+ } else {
+ plog.Errorf("could not repair %v, failed to read file", f.Name())
+ }
+ return false
+ }
+
+ if _, err = io.Copy(bf, f); err != nil {
+ if lg != nil {
+ lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name()), zap.Error(err))
+ } else {
+ plog.Errorf("could not repair %v, failed to copy file", f.Name())
+ }
+ return false
+ }
+
+ if err = f.Truncate(lastOffset); err != nil {
+ if lg != nil {
+ lg.Warn("failed to truncate", zap.String("path", f.Name()), zap.Error(err))
+ } else {
+ plog.Errorf("could not repair %v, failed to truncate file", f.Name())
+ }
+ return false
+ }
+
+ if err = fileutil.Fsync(f.File); err != nil {
+ if lg != nil {
+ lg.Warn("failed to fsync", zap.String("path", f.Name()), zap.Error(err))
+ } else {
+ plog.Errorf("could not repair %v, failed to sync file", f.Name())
+ }
+ return false
+ }
+
+ if lg != nil {
+ lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.ErrUnexpectedEOF))
+ }
+ return true
+
+ default:
+ if lg != nil {
+ lg.Warn("failed to repair", zap.String("path", f.Name()), zap.Error(err))
+ } else {
+ plog.Errorf("could not repair error (%v)", err)
+ }
+ return false
+ }
+ }
+}
+
+// openLast opens the last wal file for read and write.
+func openLast(lg *zap.Logger, dirpath string) (*fileutil.LockedFile, error) {
+ names, err := readWALNames(lg, dirpath)
+ if err != nil {
+ return nil, err
+ }
+ last := filepath.Join(dirpath, names[len(names)-1])
+ return fileutil.LockFile(last, os.O_RDWR, fileutil.PrivateFileMode)
+}
diff --git a/vendor/go.etcd.io/etcd/wal/util.go b/vendor/go.etcd.io/etcd/wal/util.go
new file mode 100644
index 0000000..a3f314b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/wal/util.go
@@ -0,0 +1,124 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+ "errors"
+ "fmt"
+ "strings"
+
+ "go.etcd.io/etcd/pkg/fileutil"
+
+ "go.uber.org/zap"
+)
+
+var errBadWALName = errors.New("bad wal name")
+
+// Exist returns true if there are any files in a given directory.
+func Exist(dir string) bool {
+ names, err := fileutil.ReadDir(dir, fileutil.WithExt(".wal"))
+ if err != nil {
+ return false
+ }
+ return len(names) != 0
+}
+
+// searchIndex returns the last array index of names whose raft index section is
+// equal to or smaller than the given index.
+// The given names MUST be sorted.
+func searchIndex(lg *zap.Logger, names []string, index uint64) (int, bool) {
+ for i := len(names) - 1; i >= 0; i-- {
+ name := names[i]
+ _, curIndex, err := parseWALName(name)
+ if err != nil {
+ if lg != nil {
+ lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err))
+ } else {
+ plog.Panicf("parse correct name should never fail: %v", err)
+ }
+ }
+ if index >= curIndex {
+ return i, true
+ }
+ }
+ return -1, false
+}
+
+// names should have been sorted based on sequence number.
+// isValidSeq checks whether seq increases continuously.
+func isValidSeq(lg *zap.Logger, names []string) bool {
+ var lastSeq uint64
+ for _, name := range names {
+ curSeq, _, err := parseWALName(name)
+ if err != nil {
+ if lg != nil {
+ lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err))
+ } else {
+ plog.Panicf("parse correct name should never fail: %v", err)
+ }
+ }
+ if lastSeq != 0 && lastSeq != curSeq-1 {
+ return false
+ }
+ lastSeq = curSeq
+ }
+ return true
+}
+
+func readWALNames(lg *zap.Logger, dirpath string) ([]string, error) {
+ names, err := fileutil.ReadDir(dirpath)
+ if err != nil {
+ return nil, err
+ }
+ wnames := checkWalNames(lg, names)
+ if len(wnames) == 0 {
+ return nil, ErrFileNotFound
+ }
+ return wnames, nil
+}
+
+func checkWalNames(lg *zap.Logger, names []string) []string {
+ wnames := make([]string, 0)
+ for _, name := range names {
+ if _, _, err := parseWALName(name); err != nil {
+ // don't complain about left over tmp files
+ if !strings.HasSuffix(name, ".tmp") {
+ if lg != nil {
+ lg.Warn(
+ "ignored file in WAL directory",
+ zap.String("path", name),
+ )
+ } else {
+ plog.Warningf("ignored file %v in wal", name)
+ }
+ }
+ continue
+ }
+ wnames = append(wnames, name)
+ }
+ return wnames
+}
+
+func parseWALName(str string) (seq, index uint64, err error) {
+ if !strings.HasSuffix(str, ".wal") {
+ return 0, 0, errBadWALName
+ }
+ _, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
+ return seq, index, err
+}
+
+func walName(seq, index uint64) string {
+ return fmt.Sprintf("%016x-%016x.wal", seq, index)
+}
diff --git a/vendor/go.etcd.io/etcd/wal/wal.go b/vendor/go.etcd.io/etcd/wal/wal.go
new file mode 100644
index 0000000..e177c8e
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/wal/wal.go
@@ -0,0 +1,912 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "hash/crc32"
+ "io"
+ "os"
+ "path/filepath"
+ "sync"
+ "time"
+
+ "go.etcd.io/etcd/pkg/fileutil"
+ "go.etcd.io/etcd/pkg/pbutil"
+ "go.etcd.io/etcd/raft"
+ "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/wal/walpb"
+
+ "github.com/coreos/pkg/capnslog"
+ "go.uber.org/zap"
+)
+
+const (
+ metadataType int64 = iota + 1
+ entryType
+ stateType
+ crcType
+ snapshotType
+
+ // warnSyncDuration is the amount of time allotted to an fsync before
+ // logging a warning
+ warnSyncDuration = time.Second
+)
+
+var (
+ // SegmentSizeBytes is the preallocated size of each wal segment file.
+ // The actual size might be larger than this. In general, the default
+ // value should be used, but this is defined as an exported variable
+ // so that tests can set a different segment size.
+ SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
+
+ plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "wal")
+
+ ErrMetadataConflict = errors.New("wal: conflicting metadata found")
+ ErrFileNotFound = errors.New("wal: file not found")
+ ErrCRCMismatch = errors.New("wal: crc mismatch")
+ ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
+ ErrSnapshotNotFound = errors.New("wal: snapshot not found")
+ crcTable = crc32.MakeTable(crc32.Castagnoli)
+)
+
+// WAL is a logical representation of the stable storage.
+// WAL is either in read mode or append mode but not both.
+// A newly created WAL is in append mode, and ready for appending records.
+// A just opened WAL is in read mode, and ready for reading records.
+// The WAL will be ready for appending after reading out all the previous records.
+type WAL struct {
+ lg *zap.Logger
+
+ dir string // the living directory of the underlay files
+
+ // dirFile is a fd for the wal directory for syncing on Rename
+ dirFile *os.File
+
+ metadata []byte // metadata recorded at the head of each WAL
+ state raftpb.HardState // hardstate recorded at the head of WAL
+
+ start walpb.Snapshot // snapshot to start reading
+ decoder *decoder // decoder to decode records
+ readClose func() error // closer for decode reader
+
+ mu sync.Mutex
+ enti uint64 // index of the last entry saved to the wal
+ encoder *encoder // encoder to encode records
+
+ locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
+ fp *filePipeline
+}
+
+// Create creates a WAL ready for appending records. The given metadata is
+// recorded at the head of each WAL file, and can be retrieved with ReadAll.
+func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
+ if Exist(dirpath) {
+ return nil, os.ErrExist
+ }
+
+ // keep temporary wal directory so WAL initialization appears atomic
+ tmpdirpath := filepath.Clean(dirpath) + ".tmp"
+ if fileutil.Exist(tmpdirpath) {
+ if err := os.RemoveAll(tmpdirpath); err != nil {
+ return nil, err
+ }
+ }
+ if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
+ if lg != nil {
+ lg.Warn(
+ "failed to create a temporary WAL directory",
+ zap.String("tmp-dir-path", tmpdirpath),
+ zap.String("dir-path", dirpath),
+ zap.Error(err),
+ )
+ }
+ return nil, err
+ }
+
+ p := filepath.Join(tmpdirpath, walName(0, 0))
+ f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
+ if err != nil {
+ if lg != nil {
+ lg.Warn(
+ "failed to flock an initial WAL file",
+ zap.String("path", p),
+ zap.Error(err),
+ )
+ }
+ return nil, err
+ }
+ if _, err = f.Seek(0, io.SeekEnd); err != nil {
+ if lg != nil {
+ lg.Warn(
+ "failed to seek an initial WAL file",
+ zap.String("path", p),
+ zap.Error(err),
+ )
+ }
+ return nil, err
+ }
+ if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
+ if lg != nil {
+ lg.Warn(
+ "failed to preallocate an initial WAL file",
+ zap.String("path", p),
+ zap.Int64("segment-bytes", SegmentSizeBytes),
+ zap.Error(err),
+ )
+ }
+ return nil, err
+ }
+
+ w := &WAL{
+ lg: lg,
+ dir: dirpath,
+ metadata: metadata,
+ }
+ w.encoder, err = newFileEncoder(f.File, 0)
+ if err != nil {
+ return nil, err
+ }
+ w.locks = append(w.locks, f)
+ if err = w.saveCrc(0); err != nil {
+ return nil, err
+ }
+ if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
+ return nil, err
+ }
+ if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
+ return nil, err
+ }
+
+ if w, err = w.renameWAL(tmpdirpath); err != nil {
+ if lg != nil {
+ lg.Warn(
+ "failed to rename the temporary WAL directory",
+ zap.String("tmp-dir-path", tmpdirpath),
+ zap.String("dir-path", w.dir),
+ zap.Error(err),
+ )
+ }
+ return nil, err
+ }
+
+ var perr error
+ defer func() {
+ if perr != nil {
+ w.cleanupWAL(lg)
+ }
+ }()
+
+ // directory was renamed; sync parent dir to persist rename
+ pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
+ if perr != nil {
+ if lg != nil {
+ lg.Warn(
+ "failed to open the parent data directory",
+ zap.String("parent-dir-path", filepath.Dir(w.dir)),
+ zap.String("dir-path", w.dir),
+ zap.Error(perr),
+ )
+ }
+ return nil, perr
+ }
+ if perr = fileutil.Fsync(pdir); perr != nil {
+ if lg != nil {
+ lg.Warn(
+ "failed to fsync the parent data directory file",
+ zap.String("parent-dir-path", filepath.Dir(w.dir)),
+ zap.String("dir-path", w.dir),
+ zap.Error(perr),
+ )
+ }
+ return nil, perr
+ }
+ if perr = pdir.Close(); perr != nil {
+ if lg != nil {
+ lg.Warn(
+ "failed to close the parent data directory file",
+ zap.String("parent-dir-path", filepath.Dir(w.dir)),
+ zap.String("dir-path", w.dir),
+ zap.Error(perr),
+ )
+ }
+ return nil, perr
+ }
+
+ return w, nil
+}
+
+func (w *WAL) cleanupWAL(lg *zap.Logger) {
+ var err error
+ if err = w.Close(); err != nil {
+ if lg != nil {
+ lg.Panic("failed to close WAL during cleanup", zap.Error(err))
+ } else {
+ plog.Panicf("failed to close WAL during cleanup: %v", err)
+ }
+ }
+ brokenDirName := fmt.Sprintf("%s.broken.%v", w.dir, time.Now().Format("20060102.150405.999999"))
+ if err = os.Rename(w.dir, brokenDirName); err != nil {
+ if lg != nil {
+ lg.Panic(
+ "failed to rename WAL during cleanup",
+ zap.Error(err),
+ zap.String("source-path", w.dir),
+ zap.String("rename-path", brokenDirName),
+ )
+ } else {
+ plog.Panicf("failed to rename WAL during cleanup: %v", err)
+ }
+ }
+}
+
+func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) {
+ if err := os.RemoveAll(w.dir); err != nil {
+ return nil, err
+ }
+ // On non-Windows platforms, hold the lock while renaming. Releasing
+ // the lock and trying to reacquire it quickly can be flaky because
+ // it's possible the process will fork to spawn a process while this is
+ // happening. The fds are set up as close-on-exec by the Go runtime,
+ // but there is a window between the fork and the exec where another
+ // process holds the lock.
+ if err := os.Rename(tmpdirpath, w.dir); err != nil {
+ if _, ok := err.(*os.LinkError); ok {
+ return w.renameWALUnlock(tmpdirpath)
+ }
+ return nil, err
+ }
+ w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes)
+ df, err := fileutil.OpenDir(w.dir)
+ w.dirFile = df
+ return w, err
+}
+
+func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
+ // rename of directory with locked files doesn't work on windows/cifs;
+ // close the WAL to release the locks so the directory can be renamed.
+ if w.lg != nil {
+ w.lg.Info(
+ "closing WAL to release flock and retry directory renaming",
+ zap.String("from", tmpdirpath),
+ zap.String("to", w.dir),
+ )
+ } else {
+ plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir)
+ }
+ w.Close()
+
+ if err := os.Rename(tmpdirpath, w.dir); err != nil {
+ return nil, err
+ }
+
+ // reopen and relock
+ newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{})
+ if oerr != nil {
+ return nil, oerr
+ }
+ if _, _, _, err := newWAL.ReadAll(); err != nil {
+ newWAL.Close()
+ return nil, err
+ }
+ return newWAL, nil
+}
+
+// Open opens the WAL at the given snap.
+// The snap SHOULD have been previously saved to the WAL, or the following
+// ReadAll will fail.
+// The returned WAL is ready to read and the first record will be the one after
+// the given snap. The WAL cannot be appended to before reading out all of its
+// previous records.
+func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
+ w, err := openAtIndex(lg, dirpath, snap, true)
+ if err != nil {
+ return nil, err
+ }
+ if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
+ return nil, err
+ }
+ return w, nil
+}
+
+// OpenForRead only opens the wal files for read.
+// Write on a read only wal panics.
+func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
+ return openAtIndex(lg, dirpath, snap, false)
+}
+
+func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
+ names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
+ if err != nil {
+ return nil, err
+ }
+
+ rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write)
+ if err != nil {
+ return nil, err
+ }
+
+ // create a WAL ready for reading
+ w := &WAL{
+ lg: lg,
+ dir: dirpath,
+ start: snap,
+ decoder: newDecoder(rs...),
+ readClose: closer,
+ locks: ls,
+ }
+
+ if write {
+ // write reuses the file descriptors from read; don't close so
+ // WAL can append without dropping the file lock
+ w.readClose = nil
+ if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil {
+ closer()
+ return nil, err
+ }
+ w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes)
+ }
+
+ return w, nil
+}
+
+func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) {
+ names, err := readWALNames(lg, dirpath)
+ if err != nil {
+ return nil, -1, err
+ }
+
+ nameIndex, ok := searchIndex(lg, names, snap.Index)
+ if !ok || !isValidSeq(lg, names[nameIndex:]) {
+ err = ErrFileNotFound
+ return nil, -1, err
+ }
+
+ return names, nameIndex, nil
+}
+
+func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) {
+ rcs := make([]io.ReadCloser, 0)
+ rs := make([]io.Reader, 0)
+ ls := make([]*fileutil.LockedFile, 0)
+ for _, name := range names[nameIndex:] {
+ p := filepath.Join(dirpath, name)
+ if write {
+ l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
+ if err != nil {
+ closeAll(rcs...)
+ return nil, nil, nil, err
+ }
+ ls = append(ls, l)
+ rcs = append(rcs, l)
+ } else {
+ rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
+ if err != nil {
+ closeAll(rcs...)
+ return nil, nil, nil, err
+ }
+ ls = append(ls, nil)
+ rcs = append(rcs, rf)
+ }
+ rs = append(rs, rcs[len(rcs)-1])
+ }
+
+ closer := func() error { return closeAll(rcs...) }
+
+ return rs, ls, closer, nil
+}
+
+// ReadAll reads out records of the current WAL.
+// If opened in write mode, it must read out all records until EOF. Or an error
+// will be returned.
+// If opened in read mode, it will try to read all records if possible.
+// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
+// If loaded snap doesn't match with the expected one, it will return
+// all the records and error ErrSnapshotMismatch.
+// TODO: detect not-last-snap error.
+// TODO: maybe loose the checking of match.
+// After ReadAll, the WAL will be ready for appending new records.
+func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ rec := &walpb.Record{}
+ decoder := w.decoder
+
+ var match bool
+ for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
+ switch rec.Type {
+ case entryType:
+ e := mustUnmarshalEntry(rec.Data)
+ if e.Index > w.start.Index {
+ ents = append(ents[:e.Index-w.start.Index-1], e)
+ }
+ w.enti = e.Index
+
+ case stateType:
+ state = mustUnmarshalState(rec.Data)
+
+ case metadataType:
+ if metadata != nil && !bytes.Equal(metadata, rec.Data) {
+ state.Reset()
+ return nil, state, nil, ErrMetadataConflict
+ }
+ metadata = rec.Data
+
+ case crcType:
+ crc := decoder.crc.Sum32()
+ // current crc of decoder must match the crc of the record.
+ // do no need to match 0 crc, since the decoder is a new one at this case.
+ if crc != 0 && rec.Validate(crc) != nil {
+ state.Reset()
+ return nil, state, nil, ErrCRCMismatch
+ }
+ decoder.updateCRC(rec.Crc)
+
+ case snapshotType:
+ var snap walpb.Snapshot
+ pbutil.MustUnmarshal(&snap, rec.Data)
+ if snap.Index == w.start.Index {
+ if snap.Term != w.start.Term {
+ state.Reset()
+ return nil, state, nil, ErrSnapshotMismatch
+ }
+ match = true
+ }
+
+ default:
+ state.Reset()
+ return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
+ }
+ }
+
+ switch w.tail() {
+ case nil:
+ // We do not have to read out all entries in read mode.
+ // The last record maybe a partial written one, so
+ // ErrunexpectedEOF might be returned.
+ if err != io.EOF && err != io.ErrUnexpectedEOF {
+ state.Reset()
+ return nil, state, nil, err
+ }
+ default:
+ // We must read all of the entries if WAL is opened in write mode.
+ if err != io.EOF {
+ state.Reset()
+ return nil, state, nil, err
+ }
+ // decodeRecord() will return io.EOF if it detects a zero record,
+ // but this zero record may be followed by non-zero records from
+ // a torn write. Overwriting some of these non-zero records, but
+ // not all, will cause CRC errors on WAL open. Since the records
+ // were never fully synced to disk in the first place, it's safe
+ // to zero them out to avoid any CRC errors from new writes.
+ if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil {
+ return nil, state, nil, err
+ }
+ if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
+ return nil, state, nil, err
+ }
+ }
+
+ err = nil
+ if !match {
+ err = ErrSnapshotNotFound
+ }
+
+ // close decoder, disable reading
+ if w.readClose != nil {
+ w.readClose()
+ w.readClose = nil
+ }
+ w.start = walpb.Snapshot{}
+
+ w.metadata = metadata
+
+ if w.tail() != nil {
+ // create encoder (chain crc with the decoder), enable appending
+ w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
+ if err != nil {
+ return
+ }
+ }
+ w.decoder = nil
+
+ return metadata, state, ents, err
+}
+
+// Verify reads through the given WAL and verifies that it is not corrupted.
+// It creates a new decoder to read through the records of the given WAL.
+// It does not conflict with any open WAL, but it is recommended not to
+// call this function after opening the WAL for writing.
+// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
+// If the loaded snap doesn't match with the expected one, it will
+// return error ErrSnapshotMismatch.
+func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) error {
+ var metadata []byte
+ var err error
+ var match bool
+
+ rec := &walpb.Record{}
+
+ names, nameIndex, err := selectWALFiles(lg, walDir, snap)
+ if err != nil {
+ return err
+ }
+
+ // open wal files in read mode, so that there is no conflict
+ // when the same WAL is opened elsewhere in write mode
+ rs, _, closer, err := openWALFiles(lg, walDir, names, nameIndex, false)
+ if err != nil {
+ return err
+ }
+ defer func() {
+ if closer != nil {
+ closer()
+ }
+ }()
+
+ // create a new decoder from the readers on the WAL files
+ decoder := newDecoder(rs...)
+
+ for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
+ switch rec.Type {
+ case metadataType:
+ if metadata != nil && !bytes.Equal(metadata, rec.Data) {
+ return ErrMetadataConflict
+ }
+ metadata = rec.Data
+ case crcType:
+ crc := decoder.crc.Sum32()
+ // Current crc of decoder must match the crc of the record.
+ // We need not match 0 crc, since the decoder is a new one at this point.
+ if crc != 0 && rec.Validate(crc) != nil {
+ return ErrCRCMismatch
+ }
+ decoder.updateCRC(rec.Crc)
+ case snapshotType:
+ var loadedSnap walpb.Snapshot
+ pbutil.MustUnmarshal(&loadedSnap, rec.Data)
+ if loadedSnap.Index == snap.Index {
+ if loadedSnap.Term != snap.Term {
+ return ErrSnapshotMismatch
+ }
+ match = true
+ }
+ // We ignore all entry and state type records as these
+ // are not necessary for validating the WAL contents
+ case entryType:
+ case stateType:
+ default:
+ return fmt.Errorf("unexpected block type %d", rec.Type)
+ }
+ }
+
+ // We do not have to read out all the WAL entries
+ // as the decoder is opened in read mode.
+ if err != io.EOF && err != io.ErrUnexpectedEOF {
+ return err
+ }
+
+ if !match {
+ return ErrSnapshotNotFound
+ }
+
+ return nil
+}
+
+// cut closes current file written and creates a new one ready to append.
+// cut first creates a temp wal file and writes necessary headers into it.
+// Then cut atomically rename temp wal file to a wal file.
+func (w *WAL) cut() error {
+ // close old wal file; truncate to avoid wasting space if an early cut
+ off, serr := w.tail().Seek(0, io.SeekCurrent)
+ if serr != nil {
+ return serr
+ }
+
+ if err := w.tail().Truncate(off); err != nil {
+ return err
+ }
+
+ if err := w.sync(); err != nil {
+ return err
+ }
+
+ fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
+
+ // create a temp wal file with name sequence + 1, or truncate the existing one
+ newTail, err := w.fp.Open()
+ if err != nil {
+ return err
+ }
+
+ // update writer and save the previous crc
+ w.locks = append(w.locks, newTail)
+ prevCrc := w.encoder.crc.Sum32()
+ w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
+ if err != nil {
+ return err
+ }
+
+ if err = w.saveCrc(prevCrc); err != nil {
+ return err
+ }
+
+ if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
+ return err
+ }
+
+ if err = w.saveState(&w.state); err != nil {
+ return err
+ }
+
+ // atomically move temp wal file to wal file
+ if err = w.sync(); err != nil {
+ return err
+ }
+
+ off, err = w.tail().Seek(0, io.SeekCurrent)
+ if err != nil {
+ return err
+ }
+
+ if err = os.Rename(newTail.Name(), fpath); err != nil {
+ return err
+ }
+ if err = fileutil.Fsync(w.dirFile); err != nil {
+ return err
+ }
+
+ // reopen newTail with its new path so calls to Name() match the wal filename format
+ newTail.Close()
+
+ if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
+ return err
+ }
+ if _, err = newTail.Seek(off, io.SeekStart); err != nil {
+ return err
+ }
+
+ w.locks[len(w.locks)-1] = newTail
+
+ prevCrc = w.encoder.crc.Sum32()
+ w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
+ if err != nil {
+ return err
+ }
+
+ if w.lg != nil {
+ w.lg.Info("created a new WAL segment", zap.String("path", fpath))
+ } else {
+ plog.Infof("segmented wal file %v is created", fpath)
+ }
+ return nil
+}
+
+func (w *WAL) sync() error {
+ if w.encoder != nil {
+ if err := w.encoder.flush(); err != nil {
+ return err
+ }
+ }
+ start := time.Now()
+ err := fileutil.Fdatasync(w.tail().File)
+
+ took := time.Since(start)
+ if took > warnSyncDuration {
+ if w.lg != nil {
+ w.lg.Warn(
+ "slow fdatasync",
+ zap.Duration("took", took),
+ zap.Duration("expected-duration", warnSyncDuration),
+ )
+ } else {
+ plog.Warningf("sync duration of %v, expected less than %v", took, warnSyncDuration)
+ }
+ }
+ walFsyncSec.Observe(took.Seconds())
+
+ return err
+}
+
+// ReleaseLockTo releases the locks, which has smaller index than the given index
+// except the largest one among them.
+// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
+// lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
+func (w *WAL) ReleaseLockTo(index uint64) error {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ if len(w.locks) == 0 {
+ return nil
+ }
+
+ var smaller int
+ found := false
+ for i, l := range w.locks {
+ _, lockIndex, err := parseWALName(filepath.Base(l.Name()))
+ if err != nil {
+ return err
+ }
+ if lockIndex >= index {
+ smaller = i - 1
+ found = true
+ break
+ }
+ }
+
+ // if no lock index is greater than the release index, we can
+ // release lock up to the last one(excluding).
+ if !found {
+ smaller = len(w.locks) - 1
+ }
+
+ if smaller <= 0 {
+ return nil
+ }
+
+ for i := 0; i < smaller; i++ {
+ if w.locks[i] == nil {
+ continue
+ }
+ w.locks[i].Close()
+ }
+ w.locks = w.locks[smaller:]
+
+ return nil
+}
+
+// Close closes the current WAL file and directory.
+func (w *WAL) Close() error {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ if w.fp != nil {
+ w.fp.Close()
+ w.fp = nil
+ }
+
+ if w.tail() != nil {
+ if err := w.sync(); err != nil {
+ return err
+ }
+ }
+ for _, l := range w.locks {
+ if l == nil {
+ continue
+ }
+ if err := l.Close(); err != nil {
+ if w.lg != nil {
+ w.lg.Warn("failed to close WAL", zap.Error(err))
+ } else {
+ plog.Errorf("failed to unlock during closing wal: %s", err)
+ }
+ }
+ }
+
+ return w.dirFile.Close()
+}
+
+func (w *WAL) saveEntry(e *raftpb.Entry) error {
+ // TODO: add MustMarshalTo to reduce one allocation.
+ b := pbutil.MustMarshal(e)
+ rec := &walpb.Record{Type: entryType, Data: b}
+ if err := w.encoder.encode(rec); err != nil {
+ return err
+ }
+ w.enti = e.Index
+ return nil
+}
+
+func (w *WAL) saveState(s *raftpb.HardState) error {
+ if raft.IsEmptyHardState(*s) {
+ return nil
+ }
+ w.state = *s
+ b := pbutil.MustMarshal(s)
+ rec := &walpb.Record{Type: stateType, Data: b}
+ return w.encoder.encode(rec)
+}
+
+func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ // short cut, do not call sync
+ if raft.IsEmptyHardState(st) && len(ents) == 0 {
+ return nil
+ }
+
+ mustSync := raft.MustSync(st, w.state, len(ents))
+
+ // TODO(xiangli): no more reference operator
+ for i := range ents {
+ if err := w.saveEntry(&ents[i]); err != nil {
+ return err
+ }
+ }
+ if err := w.saveState(&st); err != nil {
+ return err
+ }
+
+ curOff, err := w.tail().Seek(0, io.SeekCurrent)
+ if err != nil {
+ return err
+ }
+ if curOff < SegmentSizeBytes {
+ if mustSync {
+ return w.sync()
+ }
+ return nil
+ }
+
+ return w.cut()
+}
+
+func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
+ b := pbutil.MustMarshal(&e)
+
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ rec := &walpb.Record{Type: snapshotType, Data: b}
+ if err := w.encoder.encode(rec); err != nil {
+ return err
+ }
+ // update enti only when snapshot is ahead of last index
+ if w.enti < e.Index {
+ w.enti = e.Index
+ }
+ return w.sync()
+}
+
+func (w *WAL) saveCrc(prevCrc uint32) error {
+ return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
+}
+
+func (w *WAL) tail() *fileutil.LockedFile {
+ if len(w.locks) > 0 {
+ return w.locks[len(w.locks)-1]
+ }
+ return nil
+}
+
+func (w *WAL) seq() uint64 {
+ t := w.tail()
+ if t == nil {
+ return 0
+ }
+ seq, _, err := parseWALName(filepath.Base(t.Name()))
+ if err != nil {
+ if w.lg != nil {
+ w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err))
+ } else {
+ plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
+ }
+ }
+ return seq
+}
+
+func closeAll(rcs ...io.ReadCloser) error {
+ for _, f := range rcs {
+ if err := f.Close(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/vendor/go.etcd.io/etcd/wal/walpb/record.go b/vendor/go.etcd.io/etcd/wal/walpb/record.go
new file mode 100644
index 0000000..30a05e0
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/wal/walpb/record.go
@@ -0,0 +1,29 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package walpb
+
+import "errors"
+
+var (
+ ErrCRCMismatch = errors.New("walpb: crc mismatch")
+)
+
+func (rec *Record) Validate(crc uint32) error {
+ if rec.Crc == crc {
+ return nil
+ }
+ rec.Reset()
+ return ErrCRCMismatch
+}
diff --git a/vendor/go.etcd.io/etcd/wal/walpb/record.pb.go b/vendor/go.etcd.io/etcd/wal/walpb/record.pb.go
new file mode 100644
index 0000000..3ce63dd
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/wal/walpb/record.pb.go
@@ -0,0 +1,504 @@
+// Code generated by protoc-gen-gogo. DO NOT EDIT.
+// source: record.proto
+
+/*
+ Package walpb is a generated protocol buffer package.
+
+ It is generated from these files:
+ record.proto
+
+ It has these top-level messages:
+ Record
+ Snapshot
+*/
+package walpb
+
+import (
+ "fmt"
+
+ proto "github.com/golang/protobuf/proto"
+
+ math "math"
+
+ _ "github.com/gogo/protobuf/gogoproto"
+
+ io "io"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Record struct {
+ Type int64 `protobuf:"varint,1,opt,name=type" json:"type"`
+ Crc uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
+ Data []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *Record) Reset() { *m = Record{} }
+func (m *Record) String() string { return proto.CompactTextString(m) }
+func (*Record) ProtoMessage() {}
+func (*Record) Descriptor() ([]byte, []int) { return fileDescriptorRecord, []int{0} }
+
+type Snapshot struct {
+ Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
+ Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *Snapshot) Reset() { *m = Snapshot{} }
+func (m *Snapshot) String() string { return proto.CompactTextString(m) }
+func (*Snapshot) ProtoMessage() {}
+func (*Snapshot) Descriptor() ([]byte, []int) { return fileDescriptorRecord, []int{1} }
+
+func init() {
+ proto.RegisterType((*Record)(nil), "walpb.Record")
+ proto.RegisterType((*Snapshot)(nil), "walpb.Snapshot")
+}
+func (m *Record) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *Record) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ dAtA[i] = 0x8
+ i++
+ i = encodeVarintRecord(dAtA, i, uint64(m.Type))
+ dAtA[i] = 0x10
+ i++
+ i = encodeVarintRecord(dAtA, i, uint64(m.Crc))
+ if m.Data != nil {
+ dAtA[i] = 0x1a
+ i++
+ i = encodeVarintRecord(dAtA, i, uint64(len(m.Data)))
+ i += copy(dAtA[i:], m.Data)
+ }
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func (m *Snapshot) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *Snapshot) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ dAtA[i] = 0x8
+ i++
+ i = encodeVarintRecord(dAtA, i, uint64(m.Index))
+ dAtA[i] = 0x10
+ i++
+ i = encodeVarintRecord(dAtA, i, uint64(m.Term))
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func encodeVarintRecord(dAtA []byte, offset int, v uint64) int {
+ for v >= 1<<7 {
+ dAtA[offset] = uint8(v&0x7f | 0x80)
+ v >>= 7
+ offset++
+ }
+ dAtA[offset] = uint8(v)
+ return offset + 1
+}
+func (m *Record) Size() (n int) {
+ var l int
+ _ = l
+ n += 1 + sovRecord(uint64(m.Type))
+ n += 1 + sovRecord(uint64(m.Crc))
+ if m.Data != nil {
+ l = len(m.Data)
+ n += 1 + l + sovRecord(uint64(l))
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *Snapshot) Size() (n int) {
+ var l int
+ _ = l
+ n += 1 + sovRecord(uint64(m.Index))
+ n += 1 + sovRecord(uint64(m.Term))
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func sovRecord(x uint64) (n int) {
+ for {
+ n++
+ x >>= 7
+ if x == 0 {
+ break
+ }
+ }
+ return n
+}
+func sozRecord(x uint64) (n int) {
+ return sovRecord(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *Record) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRecord
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: Record: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: Record: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
+ }
+ m.Type = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRecord
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Type |= (int64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 2:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Crc", wireType)
+ }
+ m.Crc = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRecord
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Crc |= (uint32(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 3:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
+ }
+ var byteLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRecord
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ byteLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if byteLen < 0 {
+ return ErrInvalidLengthRecord
+ }
+ postIndex := iNdEx + byteLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
+ if m.Data == nil {
+ m.Data = []byte{}
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRecord(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRecord
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *Snapshot) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRecord
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: Snapshot: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: Snapshot: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
+ }
+ m.Index = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRecord
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Index |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 2:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType)
+ }
+ m.Term = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRecord
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Term |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRecord(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRecord
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func skipRecord(dAtA []byte) (n int, err error) {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowRecord
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ wireType := int(wire & 0x7)
+ switch wireType {
+ case 0:
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowRecord
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ iNdEx++
+ if dAtA[iNdEx-1] < 0x80 {
+ break
+ }
+ }
+ return iNdEx, nil
+ case 1:
+ iNdEx += 8
+ return iNdEx, nil
+ case 2:
+ var length int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowRecord
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ length |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ iNdEx += length
+ if length < 0 {
+ return 0, ErrInvalidLengthRecord
+ }
+ return iNdEx, nil
+ case 3:
+ for {
+ var innerWire uint64
+ var start int = iNdEx
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowRecord
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ innerWire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ innerWireType := int(innerWire & 0x7)
+ if innerWireType == 4 {
+ break
+ }
+ next, err := skipRecord(dAtA[start:])
+ if err != nil {
+ return 0, err
+ }
+ iNdEx = start + next
+ }
+ return iNdEx, nil
+ case 4:
+ return iNdEx, nil
+ case 5:
+ iNdEx += 4
+ return iNdEx, nil
+ default:
+ return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
+ }
+ }
+ panic("unreachable")
+}
+
+var (
+ ErrInvalidLengthRecord = fmt.Errorf("proto: negative length found during unmarshaling")
+ ErrIntOverflowRecord = fmt.Errorf("proto: integer overflow")
+)
+
+func init() { proto.RegisterFile("record.proto", fileDescriptorRecord) }
+
+var fileDescriptorRecord = []byte{
+ // 186 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x4a, 0x4d, 0xce,
+ 0x2f, 0x4a, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2d, 0x4f, 0xcc, 0x29, 0x48, 0x92,
+ 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x8b, 0xe8, 0x83, 0x58, 0x10, 0x49, 0x25, 0x3f, 0x2e, 0xb6,
+ 0x20, 0xb0, 0x62, 0x21, 0x09, 0x2e, 0x96, 0x92, 0xca, 0x82, 0x54, 0x09, 0x46, 0x05, 0x46, 0x0d,
+ 0x66, 0x27, 0x96, 0x13, 0xf7, 0xe4, 0x19, 0x82, 0xc0, 0x22, 0x42, 0x62, 0x5c, 0xcc, 0xc9, 0x45,
+ 0xc9, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xbc, 0x50, 0x09, 0x90, 0x80, 0x90, 0x10, 0x17, 0x4b, 0x4a,
+ 0x62, 0x49, 0xa2, 0x04, 0xb3, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x98, 0xad, 0xe4, 0xc0, 0xc5, 0x11,
+ 0x9c, 0x97, 0x58, 0x50, 0x9c, 0x91, 0x5f, 0x22, 0x24, 0xc5, 0xc5, 0x9a, 0x99, 0x97, 0x92, 0x5a,
+ 0x01, 0x36, 0x92, 0x05, 0xaa, 0x13, 0x22, 0x04, 0xb6, 0x2d, 0xb5, 0x28, 0x17, 0x6c, 0x28, 0x0b,
+ 0xdc, 0xb6, 0xd4, 0xa2, 0x5c, 0x27, 0x91, 0x13, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc,
+ 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x19, 0x8f, 0xe5, 0x18, 0x00, 0x01, 0x00, 0x00,
+ 0xff, 0xff, 0x7f, 0x5e, 0x5c, 0x46, 0xd3, 0x00, 0x00, 0x00,
+}
diff --git a/vendor/go.etcd.io/etcd/wal/walpb/record.proto b/vendor/go.etcd.io/etcd/wal/walpb/record.proto
new file mode 100644
index 0000000..b694cb2
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/wal/walpb/record.proto
@@ -0,0 +1,20 @@
+syntax = "proto2";
+package walpb;
+
+import "gogoproto/gogo.proto";
+
+option (gogoproto.marshaler_all) = true;
+option (gogoproto.sizer_all) = true;
+option (gogoproto.unmarshaler_all) = true;
+option (gogoproto.goproto_getters_all) = false;
+
+message Record {
+ optional int64 type = 1 [(gogoproto.nullable) = false];
+ optional uint32 crc = 2 [(gogoproto.nullable) = false];
+ optional bytes data = 3;
+}
+
+message Snapshot {
+ optional uint64 index = 1 [(gogoproto.nullable) = false];
+ optional uint64 term = 2 [(gogoproto.nullable) = false];
+}