khenaidoo | 2672188 | 2021-08-11 17:42:52 -0400 | [diff] [blame] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package rafthttp |
| 16 | |
| 17 | import ( |
| 18 | "encoding/binary" |
| 19 | "fmt" |
| 20 | "io" |
| 21 | "time" |
| 22 | |
| 23 | "github.com/coreos/etcd/etcdserver/stats" |
| 24 | "github.com/coreos/etcd/pkg/pbutil" |
| 25 | "github.com/coreos/etcd/pkg/types" |
| 26 | "github.com/coreos/etcd/raft/raftpb" |
| 27 | ) |
| 28 | |
// Frame type tags for the msgappv2 stream. Every frame starts with one of
// these, written as a single byte; the numeric values are part of the wire
// format and must not change.
const (
	msgTypeLinkHeartbeat uint8 = 0
	msgTypeAppEntries    uint8 = 1
	msgTypeApp           uint8 = 2
)

// msgAppV2BufSize is the size in bytes (1 MiB) of the reusable scratch buffer
// held by each msgAppV2Encoder and msgAppV2Decoder. Entries at least this
// large bypass the scratch buffer and use a separate allocation.
const msgAppV2BufSize = 1 << 20
| 36 | |
| 37 | // msgappv2 stream sends three types of message: linkHeartbeatMessage, |
| 38 | // AppEntries and MsgApp. AppEntries is the MsgApp that is sent in |
| 39 | // replicate state in raft, whose index and term are fully predictable. |
| 40 | // |
| 41 | // Data format of linkHeartbeatMessage: |
| 42 | // | offset | bytes | description | |
| 43 | // +--------+-------+-------------+ |
| 44 | // | 0 | 1 | \x00 | |
| 45 | // |
| 46 | // Data format of AppEntries: |
| 47 | // | offset | bytes | description | |
| 48 | // +--------+-------+-------------+ |
| 49 | // | 0 | 1 | \x01 | |
| 50 | // | 1 | 8 | length of entries | |
| 51 | // | 9 | 8 | length of first entry | |
| 52 | // | 17 | n1 | first entry | |
| 53 | // ... |
| 54 | // | x | 8 | length of k-th entry data | |
| 55 | // | x+8 | nk | k-th entry data | |
| 56 | // | x+8+nk | 8 | commit index | |
| 57 | // |
| 58 | // Data format of MsgApp: |
| 59 | // | offset | bytes | description | |
| 60 | // +--------+-------+-------------+ |
| 61 | // | 0 | 1 | \x02 | |
| 62 | // | 1 | 8 | length of encoded message | |
| 63 | // | 9 | n | encoded message | |
| 64 | type msgAppV2Encoder struct { |
| 65 | w io.Writer |
| 66 | fs *stats.FollowerStats |
| 67 | |
| 68 | term uint64 |
| 69 | index uint64 |
| 70 | buf []byte |
| 71 | uint64buf []byte |
| 72 | uint8buf []byte |
| 73 | } |
| 74 | |
| 75 | func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder { |
| 76 | return &msgAppV2Encoder{ |
| 77 | w: w, |
| 78 | fs: fs, |
| 79 | buf: make([]byte, msgAppV2BufSize), |
| 80 | uint64buf: make([]byte, 8), |
| 81 | uint8buf: make([]byte, 1), |
| 82 | } |
| 83 | } |
| 84 | |
| 85 | func (enc *msgAppV2Encoder) encode(m *raftpb.Message) error { |
| 86 | start := time.Now() |
| 87 | switch { |
| 88 | case isLinkHeartbeatMessage(m): |
| 89 | enc.uint8buf[0] = byte(msgTypeLinkHeartbeat) |
| 90 | if _, err := enc.w.Write(enc.uint8buf); err != nil { |
| 91 | return err |
| 92 | } |
| 93 | case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term: |
| 94 | enc.uint8buf[0] = byte(msgTypeAppEntries) |
| 95 | if _, err := enc.w.Write(enc.uint8buf); err != nil { |
| 96 | return err |
| 97 | } |
| 98 | // write length of entries |
| 99 | binary.BigEndian.PutUint64(enc.uint64buf, uint64(len(m.Entries))) |
| 100 | if _, err := enc.w.Write(enc.uint64buf); err != nil { |
| 101 | return err |
| 102 | } |
| 103 | for i := 0; i < len(m.Entries); i++ { |
| 104 | // write length of entry |
| 105 | binary.BigEndian.PutUint64(enc.uint64buf, uint64(m.Entries[i].Size())) |
| 106 | if _, err := enc.w.Write(enc.uint64buf); err != nil { |
| 107 | return err |
| 108 | } |
| 109 | if n := m.Entries[i].Size(); n < msgAppV2BufSize { |
| 110 | if _, err := m.Entries[i].MarshalTo(enc.buf); err != nil { |
| 111 | return err |
| 112 | } |
| 113 | if _, err := enc.w.Write(enc.buf[:n]); err != nil { |
| 114 | return err |
| 115 | } |
| 116 | } else { |
| 117 | if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil { |
| 118 | return err |
| 119 | } |
| 120 | } |
| 121 | enc.index++ |
| 122 | } |
| 123 | // write commit index |
| 124 | binary.BigEndian.PutUint64(enc.uint64buf, m.Commit) |
| 125 | if _, err := enc.w.Write(enc.uint64buf); err != nil { |
| 126 | return err |
| 127 | } |
| 128 | enc.fs.Succ(time.Since(start)) |
| 129 | default: |
| 130 | if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil { |
| 131 | return err |
| 132 | } |
| 133 | // write size of message |
| 134 | if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil { |
| 135 | return err |
| 136 | } |
| 137 | // write message |
| 138 | if _, err := enc.w.Write(pbutil.MustMarshal(m)); err != nil { |
| 139 | return err |
| 140 | } |
| 141 | |
| 142 | enc.term = m.Term |
| 143 | enc.index = m.Index |
| 144 | if l := len(m.Entries); l > 0 { |
| 145 | enc.index = m.Entries[l-1].Index |
| 146 | } |
| 147 | enc.fs.Succ(time.Since(start)) |
| 148 | } |
| 149 | return nil |
| 150 | } |
| 151 | |
| 152 | type msgAppV2Decoder struct { |
| 153 | r io.Reader |
| 154 | local, remote types.ID |
| 155 | |
| 156 | term uint64 |
| 157 | index uint64 |
| 158 | buf []byte |
| 159 | uint64buf []byte |
| 160 | uint8buf []byte |
| 161 | } |
| 162 | |
| 163 | func newMsgAppV2Decoder(r io.Reader, local, remote types.ID) *msgAppV2Decoder { |
| 164 | return &msgAppV2Decoder{ |
| 165 | r: r, |
| 166 | local: local, |
| 167 | remote: remote, |
| 168 | buf: make([]byte, msgAppV2BufSize), |
| 169 | uint64buf: make([]byte, 8), |
| 170 | uint8buf: make([]byte, 1), |
| 171 | } |
| 172 | } |
| 173 | |
| 174 | func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) { |
| 175 | var ( |
| 176 | m raftpb.Message |
| 177 | typ uint8 |
| 178 | ) |
| 179 | if _, err := io.ReadFull(dec.r, dec.uint8buf); err != nil { |
| 180 | return m, err |
| 181 | } |
| 182 | typ = uint8(dec.uint8buf[0]) |
| 183 | switch typ { |
| 184 | case msgTypeLinkHeartbeat: |
| 185 | return linkHeartbeatMessage, nil |
| 186 | case msgTypeAppEntries: |
| 187 | m = raftpb.Message{ |
| 188 | Type: raftpb.MsgApp, |
| 189 | From: uint64(dec.remote), |
| 190 | To: uint64(dec.local), |
| 191 | Term: dec.term, |
| 192 | LogTerm: dec.term, |
| 193 | Index: dec.index, |
| 194 | } |
| 195 | |
| 196 | // decode entries |
| 197 | if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil { |
| 198 | return m, err |
| 199 | } |
| 200 | l := binary.BigEndian.Uint64(dec.uint64buf) |
| 201 | m.Entries = make([]raftpb.Entry, int(l)) |
| 202 | for i := 0; i < int(l); i++ { |
| 203 | if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil { |
| 204 | return m, err |
| 205 | } |
| 206 | size := binary.BigEndian.Uint64(dec.uint64buf) |
| 207 | var buf []byte |
| 208 | if size < msgAppV2BufSize { |
| 209 | buf = dec.buf[:size] |
| 210 | if _, err := io.ReadFull(dec.r, buf); err != nil { |
| 211 | return m, err |
| 212 | } |
| 213 | } else { |
| 214 | buf = make([]byte, int(size)) |
| 215 | if _, err := io.ReadFull(dec.r, buf); err != nil { |
| 216 | return m, err |
| 217 | } |
| 218 | } |
| 219 | dec.index++ |
| 220 | // 1 alloc |
| 221 | pbutil.MustUnmarshal(&m.Entries[i], buf) |
| 222 | } |
| 223 | // decode commit index |
| 224 | if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil { |
| 225 | return m, err |
| 226 | } |
| 227 | m.Commit = binary.BigEndian.Uint64(dec.uint64buf) |
| 228 | case msgTypeApp: |
| 229 | var size uint64 |
| 230 | if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil { |
| 231 | return m, err |
| 232 | } |
| 233 | buf := make([]byte, int(size)) |
| 234 | if _, err := io.ReadFull(dec.r, buf); err != nil { |
| 235 | return m, err |
| 236 | } |
| 237 | pbutil.MustUnmarshal(&m, buf) |
| 238 | |
| 239 | dec.term = m.Term |
| 240 | dec.index = m.Index |
| 241 | if l := len(m.Entries); l > 0 { |
| 242 | dec.index = m.Entries[l-1].Index |
| 243 | } |
| 244 | default: |
| 245 | return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ) |
| 246 | } |
| 247 | return m, nil |
| 248 | } |