blob: 1fa36deb39427677dc46f2e7b1032b6a926f130f [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18 "encoding/binary"
19 "fmt"
20 "io"
21 "time"
22
23 stats "go.etcd.io/etcd/etcdserver/api/v2stats"
24 "go.etcd.io/etcd/pkg/pbutil"
25 "go.etcd.io/etcd/pkg/types"
26 "go.etcd.io/etcd/raft/raftpb"
27)
28
// Frame type tags. Every frame on a msgappv2 stream starts with one of these
// as its first byte.
const (
	msgTypeLinkHeartbeat uint8 = iota // link heartbeat; no payload follows
	msgTypeAppEntries                 // compact MsgApp: entries and commit index only
	msgTypeApp                        // fully encoded raftpb.Message
)

// msgAppV2BufSize is the length of the reusable scratch buffer held by both the
// encoder and the decoder. Entries whose encoding is smaller than this go
// through that buffer; larger ones get a dedicated allocation.
const msgAppV2BufSize = 1 << 20
36
37// msgappv2 stream sends three types of message: linkHeartbeatMessage,
38// AppEntries and MsgApp. AppEntries is the MsgApp that is sent in
39// replicate state in raft, whose index and term are fully predictable.
40//
41// Data format of linkHeartbeatMessage:
42// | offset | bytes | description |
43// +--------+-------+-------------+
44// | 0 | 1 | \x00 |
45//
46// Data format of AppEntries:
47// | offset | bytes | description |
48// +--------+-------+-------------+
49// | 0 | 1 | \x01 |
50// | 1 | 8 | length of entries |
51// | 9 | 8 | length of first entry |
52// | 17 | n1 | first entry |
53// ...
54// | x | 8 | length of k-th entry data |
55// | x+8 | nk | k-th entry data |
56// | x+8+nk | 8 | commit index |
57//
58// Data format of MsgApp:
59// | offset | bytes | description |
60// +--------+-------+-------------+
61// | 0 | 1 | \x02 |
62// | 1 | 8 | length of encoded message |
63// | 9 | n | encoded message |
64type msgAppV2Encoder struct {
65 w io.Writer
66 fs *stats.FollowerStats
67
68 term uint64
69 index uint64
70 buf []byte
71 uint64buf []byte
72 uint8buf []byte
73}
74
75func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder {
76 return &msgAppV2Encoder{
77 w: w,
78 fs: fs,
79 buf: make([]byte, msgAppV2BufSize),
80 uint64buf: make([]byte, 8),
81 uint8buf: make([]byte, 1),
82 }
83}
84
85func (enc *msgAppV2Encoder) encode(m *raftpb.Message) error {
86 start := time.Now()
87 switch {
88 case isLinkHeartbeatMessage(m):
89 enc.uint8buf[0] = msgTypeLinkHeartbeat
90 if _, err := enc.w.Write(enc.uint8buf); err != nil {
91 return err
92 }
93 case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term:
94 enc.uint8buf[0] = msgTypeAppEntries
95 if _, err := enc.w.Write(enc.uint8buf); err != nil {
96 return err
97 }
98 // write length of entries
99 binary.BigEndian.PutUint64(enc.uint64buf, uint64(len(m.Entries)))
100 if _, err := enc.w.Write(enc.uint64buf); err != nil {
101 return err
102 }
103 for i := 0; i < len(m.Entries); i++ {
104 // write length of entry
105 binary.BigEndian.PutUint64(enc.uint64buf, uint64(m.Entries[i].Size()))
106 if _, err := enc.w.Write(enc.uint64buf); err != nil {
107 return err
108 }
109 if n := m.Entries[i].Size(); n < msgAppV2BufSize {
110 if _, err := m.Entries[i].MarshalTo(enc.buf); err != nil {
111 return err
112 }
113 if _, err := enc.w.Write(enc.buf[:n]); err != nil {
114 return err
115 }
116 } else {
117 if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil {
118 return err
119 }
120 }
121 enc.index++
122 }
123 // write commit index
124 binary.BigEndian.PutUint64(enc.uint64buf, m.Commit)
125 if _, err := enc.w.Write(enc.uint64buf); err != nil {
126 return err
127 }
128 enc.fs.Succ(time.Since(start))
129 default:
130 if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil {
131 return err
132 }
133 // write size of message
134 if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
135 return err
136 }
137 // write message
138 if _, err := enc.w.Write(pbutil.MustMarshal(m)); err != nil {
139 return err
140 }
141
142 enc.term = m.Term
143 enc.index = m.Index
144 if l := len(m.Entries); l > 0 {
145 enc.index = m.Entries[l-1].Index
146 }
147 enc.fs.Succ(time.Since(start))
148 }
149 return nil
150}
151
152type msgAppV2Decoder struct {
153 r io.Reader
154 local, remote types.ID
155
156 term uint64
157 index uint64
158 buf []byte
159 uint64buf []byte
160 uint8buf []byte
161}
162
163func newMsgAppV2Decoder(r io.Reader, local, remote types.ID) *msgAppV2Decoder {
164 return &msgAppV2Decoder{
165 r: r,
166 local: local,
167 remote: remote,
168 buf: make([]byte, msgAppV2BufSize),
169 uint64buf: make([]byte, 8),
170 uint8buf: make([]byte, 1),
171 }
172}
173
174func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) {
175 var (
176 m raftpb.Message
177 typ uint8
178 )
179 if _, err := io.ReadFull(dec.r, dec.uint8buf); err != nil {
180 return m, err
181 }
182 typ = dec.uint8buf[0]
183 switch typ {
184 case msgTypeLinkHeartbeat:
185 return linkHeartbeatMessage, nil
186 case msgTypeAppEntries:
187 m = raftpb.Message{
188 Type: raftpb.MsgApp,
189 From: uint64(dec.remote),
190 To: uint64(dec.local),
191 Term: dec.term,
192 LogTerm: dec.term,
193 Index: dec.index,
194 }
195
196 // decode entries
197 if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
198 return m, err
199 }
200 l := binary.BigEndian.Uint64(dec.uint64buf)
201 m.Entries = make([]raftpb.Entry, int(l))
202 for i := 0; i < int(l); i++ {
203 if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
204 return m, err
205 }
206 size := binary.BigEndian.Uint64(dec.uint64buf)
207 var buf []byte
208 if size < msgAppV2BufSize {
209 buf = dec.buf[:size]
210 if _, err := io.ReadFull(dec.r, buf); err != nil {
211 return m, err
212 }
213 } else {
214 buf = make([]byte, int(size))
215 if _, err := io.ReadFull(dec.r, buf); err != nil {
216 return m, err
217 }
218 }
219 dec.index++
220 // 1 alloc
221 pbutil.MustUnmarshal(&m.Entries[i], buf)
222 }
223 // decode commit index
224 if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
225 return m, err
226 }
227 m.Commit = binary.BigEndian.Uint64(dec.uint64buf)
228 case msgTypeApp:
229 var size uint64
230 if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
231 return m, err
232 }
233 buf := make([]byte, int(size))
234 if _, err := io.ReadFull(dec.r, buf); err != nil {
235 return m, err
236 }
237 pbutil.MustUnmarshal(&m, buf)
238
239 dec.term = m.Term
240 dec.index = m.Index
241 if l := len(m.Entries); l > 0 {
242 dec.index = m.Entries[l-1].Index
243 }
244 default:
245 return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ)
246 }
247 return m, nil
248}