// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package wal
16
17import (
18 "encoding/binary"
19 "hash"
20 "io"
21 "os"
22 "sync"
23
24 "github.com/coreos/etcd/pkg/crc"
25 "github.com/coreos/etcd/pkg/ioutil"
26 "github.com/coreos/etcd/wal/walpb"
27)
28
29// walPageBytes is the alignment for flushing records to the backing Writer.
30// It should be a multiple of the minimum sector size so that WAL can safely
31// distinguish between torn writes and ordinary data corruption.
32const walPageBytes = 8 * minSectorSize
33
34type encoder struct {
35 mu sync.Mutex
36 bw *ioutil.PageWriter
37
38 crc hash.Hash32
39 buf []byte
40 uint64buf []byte
41}
42
43func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
44 return &encoder{
45 bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset),
46 crc: crc.New(prevCrc, crcTable),
47 // 1MB buffer
48 buf: make([]byte, 1024*1024),
49 uint64buf: make([]byte, 8),
50 }
51}
52
53// newFileEncoder creates a new encoder with current file offset for the page writer.
54func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
55 offset, err := f.Seek(0, io.SeekCurrent)
56 if err != nil {
57 return nil, err
58 }
59 return newEncoder(f, prevCrc, int(offset)), nil
60}
61
62func (e *encoder) encode(rec *walpb.Record) error {
63 e.mu.Lock()
64 defer e.mu.Unlock()
65
66 e.crc.Write(rec.Data)
67 rec.Crc = e.crc.Sum32()
68 var (
69 data []byte
70 err error
71 n int
72 )
73
74 if rec.Size() > len(e.buf) {
75 data, err = rec.Marshal()
76 if err != nil {
77 return err
78 }
79 } else {
80 n, err = rec.MarshalTo(e.buf)
81 if err != nil {
82 return err
83 }
84 data = e.buf[:n]
85 }
86
87 lenField, padBytes := encodeFrameSize(len(data))
88 if err = writeUint64(e.bw, lenField, e.uint64buf); err != nil {
89 return err
90 }
91
92 if padBytes != 0 {
93 data = append(data, make([]byte, padBytes)...)
94 }
95 _, err = e.bw.Write(data)
96 return err
97}
98
// encodeFrameSize computes the frame length field and the number of padding
// bytes for a payload of dataBytes bytes.
//
// padBytes is the count (0-7) of bytes needed to round dataBytes up to a
// multiple of 8. lenField holds dataBytes in its low bits; when padding is
// required, its most significant byte is set to 0x80|padBytes so the padding
// amount travels with the length.
func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) {
	lenField = uint64(dataBytes)
	// force 8 byte alignment so length never gets a torn write
	padBytes = (8 - (dataBytes % 8)) % 8
	if padBytes != 0 {
		lenField |= uint64(0x80|padBytes) << 56
	}
	return lenField, padBytes
}
108
109func (e *encoder) flush() error {
110 e.mu.Lock()
111 defer e.mu.Unlock()
112 return e.bw.Flush()
113}
114
// writeUint64 writes n to w as 8 little-endian bytes, using buf as scratch
// space so no allocation is needed. buf must have room for at least 8 bytes
// (the function panics otherwise); only the first 8 bytes are used and
// written, even if buf is larger. It returns the error from w.Write.
func writeUint64(w io.Writer, n uint64, buf []byte) error {
	// Restrict to exactly one uint64 so an oversized scratch buffer cannot
	// leak trailing bytes into the output.
	field := buf[:8]
	binary.LittleEndian.PutUint64(field, n)
	_, err := w.Write(field)
	return err
}