blob: f2f01fd881c49884585d0b8fc71df9d6fbb8b78c [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package wal
16
import (
	"bufio"
	"bytes"
	"encoding/binary"
	"hash"
	"io"
	"sync"

	"go.etcd.io/etcd/pkg/crc"
	"go.etcd.io/etcd/pkg/pbutil"
	"go.etcd.io/etcd/raft/raftpb"
	"go.etcd.io/etcd/wal/walpb"
)
29
const (
	// minSectorSize is the sector size used to split a record's bytes on
	// sector boundaries when checking whether it was torn by a partial write.
	minSectorSize = 512

	// frameSizeBytes is the size in bytes of a frame header: one
	// little-endian int64 that encodes both the record size and the
	// padding size.
	frameSizeBytes = 8
)
34
// decoder reads framed WAL records from an ordered list of readers. It
// keeps a running CRC over record data and remembers where the last valid
// record ended in the reader currently being consumed.
type decoder struct {
	// mu serializes calls to decode.
	mu sync.Mutex
	// brs is the input still to be decoded, one buffered reader per input
	// (each treated as a separate file); brs[0] is the one being read now.
	brs []*bufio.Reader

	// lastValidOff is the file offset immediately following the last
	// successfully decoded record; it restarts at 0 for each new reader.
	lastValidOff int64
	// crc is the running checksum that record data is validated against.
	crc hash.Hash32
}
43
44func newDecoder(r ...io.Reader) *decoder {
45 readers := make([]*bufio.Reader, len(r))
46 for i := range r {
47 readers[i] = bufio.NewReader(r[i])
48 }
49 return &decoder{
50 brs: readers,
51 crc: crc.New(0, crcTable),
52 }
53}
54
55func (d *decoder) decode(rec *walpb.Record) error {
56 rec.Reset()
57 d.mu.Lock()
58 defer d.mu.Unlock()
59 return d.decodeRecord(rec)
60}
61
62func (d *decoder) decodeRecord(rec *walpb.Record) error {
63 if len(d.brs) == 0 {
64 return io.EOF
65 }
66
67 l, err := readInt64(d.brs[0])
68 if err == io.EOF || (err == nil && l == 0) {
69 // hit end of file or preallocated space
70 d.brs = d.brs[1:]
71 if len(d.brs) == 0 {
72 return io.EOF
73 }
74 d.lastValidOff = 0
75 return d.decodeRecord(rec)
76 }
77 if err != nil {
78 return err
79 }
80
81 recBytes, padBytes := decodeFrameSize(l)
82
83 data := make([]byte, recBytes+padBytes)
84 if _, err = io.ReadFull(d.brs[0], data); err != nil {
85 // ReadFull returns io.EOF only if no bytes were read
86 // the decoder should treat this as an ErrUnexpectedEOF instead.
87 if err == io.EOF {
88 err = io.ErrUnexpectedEOF
89 }
90 return err
91 }
92 if err := rec.Unmarshal(data[:recBytes]); err != nil {
93 if d.isTornEntry(data) {
94 return io.ErrUnexpectedEOF
95 }
96 return err
97 }
98
99 // skip crc checking if the record type is crcType
100 if rec.Type != crcType {
101 d.crc.Write(rec.Data)
102 if err := rec.Validate(d.crc.Sum32()); err != nil {
103 if d.isTornEntry(data) {
104 return io.ErrUnexpectedEOF
105 }
106 return err
107 }
108 }
109 // record decoded as valid; point last valid offset to end of record
110 d.lastValidOff += frameSizeBytes + recBytes + padBytes
111 return nil
112}
113
// decodeFrameSize splits a frame's length field into its two parts.
//
// The lower 56 bits hold the record size in bytes. When the field is
// negative (its most significant bit is set), the low 3 bits of the top byte
// hold the number of padding bytes that follow the record. Otherwise there
// is no padding.
func decodeFrameSize(lenField int64) (recBytes int64, padBytes int64) {
	const sizeMask = uint64(1)<<56 - 1

	raw := uint64(lenField)
	recBytes = int64(raw & sizeMask)
	if lenField >= 0 {
		return recBytes, 0
	}
	return recBytes, int64(raw>>56) & 0x7
}
124
125// isTornEntry determines whether the last entry of the WAL was partially written
126// and corrupted because of a torn write.
127func (d *decoder) isTornEntry(data []byte) bool {
128 if len(d.brs) != 1 {
129 return false
130 }
131
132 fileOff := d.lastValidOff + frameSizeBytes
133 curOff := 0
134 chunks := [][]byte{}
135 // split data on sector boundaries
136 for curOff < len(data) {
137 chunkLen := int(minSectorSize - (fileOff % minSectorSize))
138 if chunkLen > len(data)-curOff {
139 chunkLen = len(data) - curOff
140 }
141 chunks = append(chunks, data[curOff:curOff+chunkLen])
142 fileOff += int64(chunkLen)
143 curOff += chunkLen
144 }
145
146 // if any data for a sector chunk is all 0, it's a torn write
147 for _, sect := range chunks {
148 isZero := true
149 for _, v := range sect {
150 if v != 0 {
151 isZero = false
152 break
153 }
154 }
155 if isZero {
156 return true
157 }
158 }
159 return false
160}
161
162func (d *decoder) updateCRC(prevCrc uint32) {
163 d.crc = crc.New(prevCrc, crcTable)
164}
165
166func (d *decoder) lastCRC() uint32 {
167 return d.crc.Sum32()
168}
169
170func (d *decoder) lastOffset() int64 { return d.lastValidOff }
171
172func mustUnmarshalEntry(d []byte) raftpb.Entry {
173 var e raftpb.Entry
174 pbutil.MustUnmarshal(&e, d)
175 return e
176}
177
178func mustUnmarshalState(d []byte) raftpb.HardState {
179 var s raftpb.HardState
180 pbutil.MustUnmarshal(&s, d)
181 return s
182}
183
// readInt64 reads a single little-endian int64 from r.
//
// Errors follow io.ReadFull: io.EOF if r yields no bytes at all,
// io.ErrUnexpectedEOF if it ends partway through the value, and otherwise
// the underlying read error. On any error the returned value is 0.
func readInt64(r io.Reader) (int64, error) {
	var buf [8]byte
	if _, err := io.ReadFull(r, buf[:]); err != nil {
		return 0, err
	}
	return int64(binary.LittleEndian.Uint64(buf[:])), nil
}