blob: e8890d88a9b8ba9d9fc97d35c8ae8febc5191f5b [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package wal
16
17import (
18 "encoding/binary"
19 "hash"
20 "io"
21 "os"
22 "sync"
23
24 "github.com/coreos/etcd/pkg/crc"
25 "github.com/coreos/etcd/pkg/ioutil"
26 "github.com/coreos/etcd/wal/walpb"
27)
28
29// walPageBytes is the alignment for flushing records to the backing Writer.
30// It should be a multiple of the minimum sector size so that WAL can safely
31// distinguish between torn writes and ordinary data corruption.
32const walPageBytes = 8 * minSectorSize
33
34type encoder struct {
35 mu sync.Mutex
36 bw *ioutil.PageWriter
37
38 crc hash.Hash32
39 buf []byte
40 uint64buf []byte
41}
42
43func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
44 return &encoder{
45 bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset),
46 crc: crc.New(prevCrc, crcTable),
47 // 1MB buffer
48 buf: make([]byte, 1024*1024),
49 uint64buf: make([]byte, 8),
50 }
51}
52
53// newFileEncoder creates a new encoder with current file offset for the page writer.
54func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
55 offset, err := f.Seek(0, io.SeekCurrent)
56 if err != nil {
57 return nil, err
58 }
59 return newEncoder(f, prevCrc, int(offset)), nil
60}
61
62func (e *encoder) encode(rec *walpb.Record) error {
63 e.mu.Lock()
64 defer e.mu.Unlock()
65
66 e.crc.Write(rec.Data)
67 rec.Crc = e.crc.Sum32()
68 var (
69 data []byte
70 err error
71 n int
72 )
73
74 if rec.Size() > len(e.buf) {
75 data, err = rec.Marshal()
76 if err != nil {
77 return err
78 }
79 } else {
80 n, err = rec.MarshalTo(e.buf)
81 if err != nil {
82 return err
83 }
84 data = e.buf[:n]
85 }
86
87 lenField, padBytes := encodeFrameSize(len(data))
88 if err = writeUint64(e.bw, lenField, e.uint64buf); err != nil {
89 return err
90 }
91
92 if padBytes != 0 {
93 data = append(data, make([]byte, padBytes)...)
94 }
95 n, err = e.bw.Write(data)
96 walWriteBytes.Add(float64(n))
97 return err
98}
99
100func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) {
101 lenField = uint64(dataBytes)
102 // force 8 byte alignment so length never gets a torn write
103 padBytes = (8 - (dataBytes % 8)) % 8
104 if padBytes != 0 {
105 lenField |= uint64(0x80|padBytes) << 56
106 }
107 return lenField, padBytes
108}
109
110func (e *encoder) flush() error {
111 e.mu.Lock()
112 n, err := e.bw.FlushN()
113 e.mu.Unlock()
114 walWriteBytes.Add(float64(n))
115 return err
116}
117
118func writeUint64(w io.Writer, n uint64, buf []byte) error {
119 // http://golang.org/src/encoding/binary/binary.go
120 binary.LittleEndian.PutUint64(buf, n)
121 nv, err := w.Write(buf)
122 walWriteBytes.Add(float64(nv))
123 return err
124}