blob: f2f0b26fd076f4e0f99f3f7a691def0a17a3eb36 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package wal
16
17import (
18 "bufio"
19 "encoding/binary"
20 "hash"
21 "io"
22 "sync"
23
24 "github.com/coreos/etcd/pkg/crc"
25 "github.com/coreos/etcd/pkg/pbutil"
26 "github.com/coreos/etcd/raft/raftpb"
27 "github.com/coreos/etcd/wal/walpb"
28)
29
const (
	// minSectorSize is the sector granularity, in bytes, that isTornEntry
	// uses when splitting a record into sector-aligned spans.
	minSectorSize = 512

	// frameSizeBytes is the width of the little-endian length field that
	// precedes every record; that field encodes both the record size and the
	// padding size.
	frameSizeBytes = 8
)
34
// decoder reads length-framed WAL records from a sequence of readers,
// consuming them one after another.
type decoder struct {
	// mu is held by decode for the duration of each record decode.
	mu sync.Mutex
	// brs are the buffered readers still to be consumed; brs[0] is the one
	// currently being read.
	brs []*bufio.Reader

	// lastValidOff is the offset, within the current reader, just past the
	// last record that decoded and validated successfully. It is reset to 0
	// whenever decoding moves on to the next reader.
	lastValidOff int64
	// crc is the running checksum that each non-CRC record's data is folded
	// into before the record is validated.
	crc hash.Hash32
}
43
44func newDecoder(r ...io.Reader) *decoder {
45 readers := make([]*bufio.Reader, len(r))
46 for i := range r {
47 readers[i] = bufio.NewReader(r[i])
48 }
49 return &decoder{
50 brs: readers,
51 crc: crc.New(0, crcTable),
52 }
53}
54
55func (d *decoder) decode(rec *walpb.Record) error {
56 rec.Reset()
57 d.mu.Lock()
58 defer d.mu.Unlock()
59 return d.decodeRecord(rec)
60}
61
// maxWALEntrySizeLimit bounds the size of a single decoded frame (record
// bytes plus padding bytes). decodeRecord rejects any frame whose combined
// size reaches this limit with ErrMaxWALEntrySizeLimitExceeded.
//
// The etcd server caps raft messages at 1 MB. Projects are assumed to set a
// similarly reasonable message size limit, so a legitimate entry should
// never exceed 10 MB.
const maxWALEntrySizeLimit int64 = 10 * 1024 * 1024
66
67func (d *decoder) decodeRecord(rec *walpb.Record) error {
68 if len(d.brs) == 0 {
69 return io.EOF
70 }
71
72 l, err := readInt64(d.brs[0])
73 if err == io.EOF || (err == nil && l == 0) {
74 // hit end of file or preallocated space
75 d.brs = d.brs[1:]
76 if len(d.brs) == 0 {
77 return io.EOF
78 }
79 d.lastValidOff = 0
80 return d.decodeRecord(rec)
81 }
82 if err != nil {
83 return err
84 }
85
86 recBytes, padBytes := decodeFrameSize(l)
87 if recBytes >= maxWALEntrySizeLimit-padBytes {
88 return ErrMaxWALEntrySizeLimitExceeded
89 }
90
91 data := make([]byte, recBytes+padBytes)
92 if _, err = io.ReadFull(d.brs[0], data); err != nil {
93 // ReadFull returns io.EOF only if no bytes were read
94 // the decoder should treat this as an ErrUnexpectedEOF instead.
95 if err == io.EOF {
96 err = io.ErrUnexpectedEOF
97 }
98 return err
99 }
100 if err := rec.Unmarshal(data[:recBytes]); err != nil {
101 if d.isTornEntry(data) {
102 return io.ErrUnexpectedEOF
103 }
104 return err
105 }
106
107 // skip crc checking if the record type is crcType
108 if rec.Type != crcType {
109 d.crc.Write(rec.Data)
110 if err := rec.Validate(d.crc.Sum32()); err != nil {
111 if d.isTornEntry(data) {
112 return io.ErrUnexpectedEOF
113 }
114 return err
115 }
116 }
117 // record decoded as valid; point last valid offset to end of record
118 d.lastValidOff += frameSizeBytes + recBytes + padBytes
119 return nil
120}
121
// decodeFrameSize splits a frame's 64-bit length field into its two parts:
//   - recBytes: the record size, held in the low 56 bits.
//   - padBytes: the padding size, held in the low 3 bits of the top byte.
//     It is honored only when the most significant bit is set, i.e. when
//     lenField is negative as an int64; otherwise padBytes is 0.
func decodeFrameSize(lenField int64) (recBytes int64, padBytes int64) {
	const sizeMask = uint64(1)<<56 - 1

	bits := uint64(lenField)
	size := int64(bits & sizeMask)

	var pad int64
	if bits>>63 == 1 {
		pad = int64((bits >> 56) & 0x7)
	}
	return size, pad
}
132
133// isTornEntry determines whether the last entry of the WAL was partially written
134// and corrupted because of a torn write.
135func (d *decoder) isTornEntry(data []byte) bool {
136 if len(d.brs) != 1 {
137 return false
138 }
139
140 fileOff := d.lastValidOff + frameSizeBytes
141 curOff := 0
142 chunks := [][]byte{}
143 // split data on sector boundaries
144 for curOff < len(data) {
145 chunkLen := int(minSectorSize - (fileOff % minSectorSize))
146 if chunkLen > len(data)-curOff {
147 chunkLen = len(data) - curOff
148 }
149 chunks = append(chunks, data[curOff:curOff+chunkLen])
150 fileOff += int64(chunkLen)
151 curOff += chunkLen
152 }
153
154 // if any data for a sector chunk is all 0, it's a torn write
155 for _, sect := range chunks {
156 isZero := true
157 for _, v := range sect {
158 if v != 0 {
159 isZero = false
160 break
161 }
162 }
163 if isZero {
164 return true
165 }
166 }
167 return false
168}
169
170func (d *decoder) updateCRC(prevCrc uint32) {
171 d.crc = crc.New(prevCrc, crcTable)
172}
173
174func (d *decoder) lastCRC() uint32 {
175 return d.crc.Sum32()
176}
177
178func (d *decoder) lastOffset() int64 { return d.lastValidOff }
179
180func mustUnmarshalEntry(d []byte) raftpb.Entry {
181 var e raftpb.Entry
182 pbutil.MustUnmarshal(&e, d)
183 return e
184}
185
186func mustUnmarshalState(d []byte) raftpb.HardState {
187 var s raftpb.HardState
188 pbutil.MustUnmarshal(&s, d)
189 return s
190}
191
// readInt64 reads exactly eight bytes from r and decodes them as a
// little-endian signed 64-bit integer.
//
// Errors:
//   - io.EOF if r yields no bytes at all.
//   - io.ErrUnexpectedEOF if r ends after 1-7 bytes.
//   - any other error from r, passed through unchanged.
//
// The returned value is always 0 when err is non-nil.
func readInt64(r io.Reader) (int64, error) {
	var buf [8]byte
	if _, err := io.ReadFull(r, buf[:]); err != nil {
		return 0, err
	}
	return int64(binary.LittleEndian.Uint64(buf[:])), nil
}