blob: af49c18b1d90ed201fe07bc4e219480aaef45eb1 [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package rafthttp
16
17import (
18 "context"
19 "fmt"
20 "io"
21 "io/ioutil"
22 "net/http"
23 "path"
24 "strings"
25 "sync"
26 "time"
27
28 "golang.org/x/time/rate"
29
30 "github.com/coreos/etcd/etcdserver/stats"
31 "github.com/coreos/etcd/pkg/httputil"
32 "github.com/coreos/etcd/pkg/transport"
33 "github.com/coreos/etcd/pkg/types"
34 "github.com/coreos/etcd/raft/raftpb"
35 "github.com/coreos/etcd/version"
36 "github.com/coreos/go-semver/semver"
37)
38
39const (
40 streamTypeMessage streamType = "message"
41 streamTypeMsgAppV2 streamType = "msgappv2"
42
43 streamBufSize = 4096
44)
45
46var (
47 errUnsupportedStreamType = fmt.Errorf("unsupported stream type")
48
49 // the key is in string format "major.minor.patch"
50 supportedStream = map[string][]streamType{
51 "2.0.0": {},
52 "2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
53 "2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
54 "2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
55 "3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
56 "3.1.0": {streamTypeMsgAppV2, streamTypeMessage},
57 "3.2.0": {streamTypeMsgAppV2, streamTypeMessage},
58 "3.3.0": {streamTypeMsgAppV2, streamTypeMessage},
59 }
60)
61
// streamType identifies the kind of long-lived stream opened between two
// peers; it selects both the HTTP endpoint and the message codec.
type streamType string
63
64func (t streamType) endpoint() string {
65 switch t {
66 case streamTypeMsgAppV2:
67 return path.Join(RaftStreamPrefix, "msgapp")
68 case streamTypeMessage:
69 return path.Join(RaftStreamPrefix, "message")
70 default:
71 plog.Panicf("unhandled stream type %v", t)
72 return ""
73 }
74}
75
76func (t streamType) String() string {
77 switch t {
78 case streamTypeMsgAppV2:
79 return "stream MsgApp v2"
80 case streamTypeMessage:
81 return "stream Message"
82 default:
83 return "unknown stream"
84 }
85}
86
87var (
88 // linkHeartbeatMessage is a special message used as heartbeat message in
89 // link layer. It never conflicts with messages from raft because raft
90 // doesn't send out messages without From and To fields.
91 linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
92)
93
94func isLinkHeartbeatMessage(m *raftpb.Message) bool {
95 return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
96}
97
98type outgoingConn struct {
99 t streamType
100 io.Writer
101 http.Flusher
102 io.Closer
103}
104
105// streamWriter writes messages to the attached outgoingConn.
106type streamWriter struct {
107 peerID types.ID
108 status *peerStatus
109 fs *stats.FollowerStats
110 r Raft
111
112 mu sync.Mutex // guard field working and closer
113 closer io.Closer
114 working bool
115
116 msgc chan raftpb.Message
117 connc chan *outgoingConn
118 stopc chan struct{}
119 done chan struct{}
120}
121
122// startStreamWriter creates a streamWrite and starts a long running go-routine that accepts
123// messages and writes to the attached outgoing connection.
124func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
125 w := &streamWriter{
126 peerID: id,
127 status: status,
128 fs: fs,
129 r: r,
130 msgc: make(chan raftpb.Message, streamBufSize),
131 connc: make(chan *outgoingConn),
132 stopc: make(chan struct{}),
133 done: make(chan struct{}),
134 }
135 go w.run()
136 return w
137}
138
139func (cw *streamWriter) run() {
140 var (
141 msgc chan raftpb.Message
142 heartbeatc <-chan time.Time
143 t streamType
144 enc encoder
145 flusher http.Flusher
146 batched int
147 )
148 tickc := time.NewTicker(ConnReadTimeout / 3)
149 defer tickc.Stop()
150 unflushed := 0
151
152 plog.Infof("started streaming with peer %s (writer)", cw.peerID)
153
154 for {
155 select {
156 case <-heartbeatc:
157 err := enc.encode(&linkHeartbeatMessage)
158 unflushed += linkHeartbeatMessage.Size()
159 if err == nil {
160 flusher.Flush()
161 batched = 0
162 sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
163 unflushed = 0
164 continue
165 }
166
167 cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
168
169 sentFailures.WithLabelValues(cw.peerID.String()).Inc()
170 cw.close()
171 plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
172 heartbeatc, msgc = nil, nil
173
174 case m := <-msgc:
175 err := enc.encode(&m)
176 if err == nil {
177 unflushed += m.Size()
178
179 if len(msgc) == 0 || batched > streamBufSize/2 {
180 flusher.Flush()
181 sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
182 unflushed = 0
183 batched = 0
184 } else {
185 batched++
186 }
187
188 continue
189 }
190
191 cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
192 cw.close()
193 plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
194 heartbeatc, msgc = nil, nil
195 cw.r.ReportUnreachable(m.To)
196 sentFailures.WithLabelValues(cw.peerID.String()).Inc()
197
198 case conn := <-cw.connc:
199 cw.mu.Lock()
200 closed := cw.closeUnlocked()
201 t = conn.t
202 switch conn.t {
203 case streamTypeMsgAppV2:
204 enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
205 case streamTypeMessage:
206 enc = &messageEncoder{w: conn.Writer}
207 default:
208 plog.Panicf("unhandled stream type %s", conn.t)
209 }
210 flusher = conn.Flusher
211 unflushed = 0
212 cw.status.activate()
213 cw.closer = conn.Closer
214 cw.working = true
215 cw.mu.Unlock()
216
217 if closed {
218 plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
219 }
220 plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
221 heartbeatc, msgc = tickc.C, cw.msgc
222 case <-cw.stopc:
223 if cw.close() {
224 plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
225 }
226 plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
227 close(cw.done)
228 return
229 }
230 }
231}
232
233func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
234 cw.mu.Lock()
235 defer cw.mu.Unlock()
236 return cw.msgc, cw.working
237}
238
239func (cw *streamWriter) close() bool {
240 cw.mu.Lock()
241 defer cw.mu.Unlock()
242 return cw.closeUnlocked()
243}
244
245func (cw *streamWriter) closeUnlocked() bool {
246 if !cw.working {
247 return false
248 }
249 if err := cw.closer.Close(); err != nil {
250 plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err)
251 }
252 if len(cw.msgc) > 0 {
253 cw.r.ReportUnreachable(uint64(cw.peerID))
254 }
255 cw.msgc = make(chan raftpb.Message, streamBufSize)
256 cw.working = false
257 return true
258}
259
260func (cw *streamWriter) attach(conn *outgoingConn) bool {
261 select {
262 case cw.connc <- conn:
263 return true
264 case <-cw.done:
265 return false
266 }
267}
268
269func (cw *streamWriter) stop() {
270 close(cw.stopc)
271 <-cw.done
272}
273
274// streamReader is a long-running go-routine that dials to the remote stream
275// endpoint and reads messages from the response body returned.
276type streamReader struct {
277 peerID types.ID
278 typ streamType
279
280 tr *Transport
281 picker *urlPicker
282 status *peerStatus
283 recvc chan<- raftpb.Message
284 propc chan<- raftpb.Message
285
286 rl *rate.Limiter // alters the frequency of dial retrial attempts
287
288 errorc chan<- error
289
290 mu sync.Mutex
291 paused bool
292 closer io.Closer
293
294 ctx context.Context
295 cancel context.CancelFunc
296 done chan struct{}
297}
298
299func (cr *streamReader) start() {
300 cr.done = make(chan struct{})
301 if cr.errorc == nil {
302 cr.errorc = cr.tr.ErrorC
303 }
304 if cr.ctx == nil {
305 cr.ctx, cr.cancel = context.WithCancel(context.Background())
306 }
307 go cr.run()
308}
309
310func (cr *streamReader) run() {
311 t := cr.typ
312 plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
313 for {
314 rc, err := cr.dial(t)
315 if err != nil {
316 if err != errUnsupportedStreamType {
317 cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
318 }
319 } else {
320 cr.status.activate()
321 plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
322 err = cr.decodeLoop(rc, t)
323 plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
324 switch {
325 // all data is read out
326 case err == io.EOF:
327 // connection is closed by the remote
328 case transport.IsClosedConnError(err):
329 default:
330 cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
331 }
332 }
333 // Wait for a while before new dial attempt
334 err = cr.rl.Wait(cr.ctx)
335 if cr.ctx.Err() != nil {
336 plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
337 close(cr.done)
338 return
339 }
340 if err != nil {
341 plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
342 }
343 }
344}
345
346func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
347 var dec decoder
348 cr.mu.Lock()
349 switch t {
350 case streamTypeMsgAppV2:
351 dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
352 case streamTypeMessage:
353 dec = &messageDecoder{r: rc}
354 default:
355 plog.Panicf("unhandled stream type %s", t)
356 }
357 select {
358 case <-cr.ctx.Done():
359 cr.mu.Unlock()
360 if err := rc.Close(); err != nil {
361 return err
362 }
363 return io.EOF
364 default:
365 cr.closer = rc
366 }
367 cr.mu.Unlock()
368
369 for {
370 m, err := dec.decode()
371 if err != nil {
372 cr.mu.Lock()
373 cr.close()
374 cr.mu.Unlock()
375 return err
376 }
377
378 receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
379
380 cr.mu.Lock()
381 paused := cr.paused
382 cr.mu.Unlock()
383
384 if paused {
385 continue
386 }
387
388 if isLinkHeartbeatMessage(&m) {
389 // raft is not interested in link layer
390 // heartbeat message, so we should ignore
391 // it.
392 continue
393 }
394
395 recvc := cr.recvc
396 if m.Type == raftpb.MsgProp {
397 recvc = cr.propc
398 }
399
400 select {
401 case recvc <- m:
402 default:
403 if cr.status.isActive() {
404 plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
405 }
406 plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
407 recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
408 }
409 }
410}
411
412func (cr *streamReader) stop() {
413 cr.mu.Lock()
414 cr.cancel()
415 cr.close()
416 cr.mu.Unlock()
417 <-cr.done
418}
419
420func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
421 u := cr.picker.pick()
422 uu := u
423 uu.Path = path.Join(t.endpoint(), cr.tr.ID.String())
424
425 req, err := http.NewRequest("GET", uu.String(), nil)
426 if err != nil {
427 cr.picker.unreachable(u)
428 return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err)
429 }
430 req.Header.Set("X-Server-From", cr.tr.ID.String())
431 req.Header.Set("X-Server-Version", version.Version)
432 req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
433 req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
434 req.Header.Set("X-Raft-To", cr.peerID.String())
435
436 setPeerURLsHeader(req, cr.tr.URLs)
437
438 req = req.WithContext(cr.ctx)
439
440 cr.mu.Lock()
441 select {
442 case <-cr.ctx.Done():
443 cr.mu.Unlock()
444 return nil, fmt.Errorf("stream reader is stopped")
445 default:
446 }
447 cr.mu.Unlock()
448
449 resp, err := cr.tr.streamRt.RoundTrip(req)
450 if err != nil {
451 cr.picker.unreachable(u)
452 return nil, err
453 }
454
455 rv := serverVersion(resp.Header)
456 lv := semver.Must(semver.NewVersion(version.Version))
457 if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
458 httputil.GracefulClose(resp)
459 cr.picker.unreachable(u)
460 return nil, errUnsupportedStreamType
461 }
462
463 switch resp.StatusCode {
464 case http.StatusGone:
465 httputil.GracefulClose(resp)
466 cr.picker.unreachable(u)
467 reportCriticalError(errMemberRemoved, cr.errorc)
468 return nil, errMemberRemoved
469 case http.StatusOK:
470 return resp.Body, nil
471 case http.StatusNotFound:
472 httputil.GracefulClose(resp)
473 cr.picker.unreachable(u)
474 return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID)
475 case http.StatusPreconditionFailed:
476 b, err := ioutil.ReadAll(resp.Body)
477 if err != nil {
478 cr.picker.unreachable(u)
479 return nil, err
480 }
481 httputil.GracefulClose(resp)
482 cr.picker.unreachable(u)
483
484 switch strings.TrimSuffix(string(b), "\n") {
485 case errIncompatibleVersion.Error():
486 plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID)
487 return nil, errIncompatibleVersion
488 case errClusterIDMismatch.Error():
489 plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)",
490 cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID)
491 return nil, errClusterIDMismatch
492 default:
493 return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
494 }
495 default:
496 httputil.GracefulClose(resp)
497 cr.picker.unreachable(u)
498 return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
499 }
500}
501
502func (cr *streamReader) close() {
503 if cr.closer != nil {
504 if err := cr.closer.Close(); err != nil {
505 plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err)
506 }
507 }
508 cr.closer = nil
509}
510
511func (cr *streamReader) pause() {
512 cr.mu.Lock()
513 defer cr.mu.Unlock()
514 cr.paused = true
515}
516
517func (cr *streamReader) resume() {
518 cr.mu.Lock()
519 defer cr.mu.Unlock()
520 cr.paused = false
521}
522
523// checkStreamSupport checks whether the stream type is supported in the
524// given version.
525func checkStreamSupport(v *semver.Version, t streamType) bool {
526 nv := &semver.Version{Major: v.Major, Minor: v.Minor}
527 for _, s := range supportedStream[nv.String()] {
528 if s == t {
529 return true
530 }
531 }
532 return false
533}