blob: dcb2223ca59ac9fb046851b8ad7dd751575df00a [file] [log] [blame]
khenaidoo59ce9dd2019-11-11 13:05:32 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18 "context"
19 "fmt"
20 "io"
21 "io/ioutil"
22 "net/http"
23 "path"
24 "strings"
25 "sync"
26 "time"
27
28 stats "go.etcd.io/etcd/etcdserver/api/v2stats"
29 "go.etcd.io/etcd/pkg/httputil"
30 "go.etcd.io/etcd/pkg/transport"
31 "go.etcd.io/etcd/pkg/types"
32 "go.etcd.io/etcd/raft/raftpb"
33 "go.etcd.io/etcd/version"
34
35 "github.com/coreos/go-semver/semver"
36 "go.uber.org/zap"
37 "golang.org/x/time/rate"
38)
39
40const (
41 streamTypeMessage streamType = "message"
42 streamTypeMsgAppV2 streamType = "msgappv2"
43
44 streamBufSize = 4096
45)
46
47var (
48 errUnsupportedStreamType = fmt.Errorf("unsupported stream type")
49
50 // the key is in string format "major.minor.patch"
51 supportedStream = map[string][]streamType{
52 "2.0.0": {},
53 "2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
54 "2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
55 "2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
56 "3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
57 "3.1.0": {streamTypeMsgAppV2, streamTypeMessage},
58 "3.2.0": {streamTypeMsgAppV2, streamTypeMessage},
59 "3.3.0": {streamTypeMsgAppV2, streamTypeMessage},
60 }
61)
62
63type streamType string
64
65func (t streamType) endpoint() string {
66 switch t {
67 case streamTypeMsgAppV2:
68 return path.Join(RaftStreamPrefix, "msgapp")
69 case streamTypeMessage:
70 return path.Join(RaftStreamPrefix, "message")
71 default:
72 plog.Panicf("unhandled stream type %v", t)
73 return ""
74 }
75}
76
77func (t streamType) String() string {
78 switch t {
79 case streamTypeMsgAppV2:
80 return "stream MsgApp v2"
81 case streamTypeMessage:
82 return "stream Message"
83 default:
84 return "unknown stream"
85 }
86}
87
88var (
89 // linkHeartbeatMessage is a special message used as heartbeat message in
90 // link layer. It never conflicts with messages from raft because raft
91 // doesn't send out messages without From and To fields.
92 linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
93)
94
95func isLinkHeartbeatMessage(m *raftpb.Message) bool {
96 return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
97}
98
99type outgoingConn struct {
100 t streamType
101 io.Writer
102 http.Flusher
103 io.Closer
104
105 localID types.ID
106 peerID types.ID
107}
108
109// streamWriter writes messages to the attached outgoingConn.
110type streamWriter struct {
111 lg *zap.Logger
112
113 localID types.ID
114 peerID types.ID
115
116 status *peerStatus
117 fs *stats.FollowerStats
118 r Raft
119
120 mu sync.Mutex // guard field working and closer
121 closer io.Closer
122 working bool
123
124 msgc chan raftpb.Message
125 connc chan *outgoingConn
126 stopc chan struct{}
127 done chan struct{}
128}
129
130// startStreamWriter creates a streamWrite and starts a long running go-routine that accepts
131// messages and writes to the attached outgoing connection.
132func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
133 w := &streamWriter{
134 lg: lg,
135
136 localID: local,
137 peerID: id,
138
139 status: status,
140 fs: fs,
141 r: r,
142 msgc: make(chan raftpb.Message, streamBufSize),
143 connc: make(chan *outgoingConn),
144 stopc: make(chan struct{}),
145 done: make(chan struct{}),
146 }
147 go w.run()
148 return w
149}
150
151func (cw *streamWriter) run() {
152 var (
153 msgc chan raftpb.Message
154 heartbeatc <-chan time.Time
155 t streamType
156 enc encoder
157 flusher http.Flusher
158 batched int
159 )
160 tickc := time.NewTicker(ConnReadTimeout / 3)
161 defer tickc.Stop()
162 unflushed := 0
163
164 if cw.lg != nil {
165 cw.lg.Info(
166 "started stream writer with remote peer",
167 zap.String("local-member-id", cw.localID.String()),
168 zap.String("remote-peer-id", cw.peerID.String()),
169 )
170 } else {
171 plog.Infof("started streaming with peer %s (writer)", cw.peerID)
172 }
173
174 for {
175 select {
176 case <-heartbeatc:
177 err := enc.encode(&linkHeartbeatMessage)
178 unflushed += linkHeartbeatMessage.Size()
179 if err == nil {
180 flusher.Flush()
181 batched = 0
182 sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
183 unflushed = 0
184 continue
185 }
186
187 cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
188
189 sentFailures.WithLabelValues(cw.peerID.String()).Inc()
190 cw.close()
191 if cw.lg != nil {
192 cw.lg.Warn(
193 "lost TCP streaming connection with remote peer",
194 zap.String("stream-writer-type", t.String()),
195 zap.String("local-member-id", cw.localID.String()),
196 zap.String("remote-peer-id", cw.peerID.String()),
197 )
198 } else {
199 plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
200 }
201 heartbeatc, msgc = nil, nil
202
203 case m := <-msgc:
204 err := enc.encode(&m)
205 if err == nil {
206 unflushed += m.Size()
207
208 if len(msgc) == 0 || batched > streamBufSize/2 {
209 flusher.Flush()
210 sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
211 unflushed = 0
212 batched = 0
213 } else {
214 batched++
215 }
216
217 continue
218 }
219
220 cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
221 cw.close()
222 if cw.lg != nil {
223 cw.lg.Warn(
224 "lost TCP streaming connection with remote peer",
225 zap.String("stream-writer-type", t.String()),
226 zap.String("local-member-id", cw.localID.String()),
227 zap.String("remote-peer-id", cw.peerID.String()),
228 )
229 } else {
230 plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
231 }
232 heartbeatc, msgc = nil, nil
233 cw.r.ReportUnreachable(m.To)
234 sentFailures.WithLabelValues(cw.peerID.String()).Inc()
235
236 case conn := <-cw.connc:
237 cw.mu.Lock()
238 closed := cw.closeUnlocked()
239 t = conn.t
240 switch conn.t {
241 case streamTypeMsgAppV2:
242 enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
243 case streamTypeMessage:
244 enc = &messageEncoder{w: conn.Writer}
245 default:
246 plog.Panicf("unhandled stream type %s", conn.t)
247 }
248 if cw.lg != nil {
249 cw.lg.Info(
250 "set message encoder",
251 zap.String("from", conn.localID.String()),
252 zap.String("to", conn.peerID.String()),
253 zap.String("stream-type", t.String()),
254 )
255 }
256 flusher = conn.Flusher
257 unflushed = 0
258 cw.status.activate()
259 cw.closer = conn.Closer
260 cw.working = true
261 cw.mu.Unlock()
262
263 if closed {
264 if cw.lg != nil {
265 cw.lg.Warn(
266 "closed TCP streaming connection with remote peer",
267 zap.String("stream-writer-type", t.String()),
268 zap.String("local-member-id", cw.localID.String()),
269 zap.String("remote-peer-id", cw.peerID.String()),
270 )
271 } else {
272 plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
273 }
274 }
275 if cw.lg != nil {
276 cw.lg.Warn(
277 "established TCP streaming connection with remote peer",
278 zap.String("stream-writer-type", t.String()),
279 zap.String("local-member-id", cw.localID.String()),
280 zap.String("remote-peer-id", cw.peerID.String()),
281 )
282 } else {
283 plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
284 }
285 heartbeatc, msgc = tickc.C, cw.msgc
286
287 case <-cw.stopc:
288 if cw.close() {
289 if cw.lg != nil {
290 cw.lg.Warn(
291 "closed TCP streaming connection with remote peer",
292 zap.String("stream-writer-type", t.String()),
293 zap.String("remote-peer-id", cw.peerID.String()),
294 )
295 } else {
296 plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
297 }
298 }
299 if cw.lg != nil {
300 cw.lg.Warn(
301 "stopped TCP streaming connection with remote peer",
302 zap.String("stream-writer-type", t.String()),
303 zap.String("remote-peer-id", cw.peerID.String()),
304 )
305 } else {
306 plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
307 }
308 close(cw.done)
309 return
310 }
311 }
312}
313
314func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
315 cw.mu.Lock()
316 defer cw.mu.Unlock()
317 return cw.msgc, cw.working
318}
319
320func (cw *streamWriter) close() bool {
321 cw.mu.Lock()
322 defer cw.mu.Unlock()
323 return cw.closeUnlocked()
324}
325
326func (cw *streamWriter) closeUnlocked() bool {
327 if !cw.working {
328 return false
329 }
330 if err := cw.closer.Close(); err != nil {
331 if cw.lg != nil {
332 cw.lg.Warn(
333 "failed to close connection with remote peer",
334 zap.String("remote-peer-id", cw.peerID.String()),
335 zap.Error(err),
336 )
337 } else {
338 plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err)
339 }
340 }
341 if len(cw.msgc) > 0 {
342 cw.r.ReportUnreachable(uint64(cw.peerID))
343 }
344 cw.msgc = make(chan raftpb.Message, streamBufSize)
345 cw.working = false
346 return true
347}
348
349func (cw *streamWriter) attach(conn *outgoingConn) bool {
350 select {
351 case cw.connc <- conn:
352 return true
353 case <-cw.done:
354 return false
355 }
356}
357
358func (cw *streamWriter) stop() {
359 close(cw.stopc)
360 <-cw.done
361}
362
363// streamReader is a long-running go-routine that dials to the remote stream
364// endpoint and reads messages from the response body returned.
365type streamReader struct {
366 lg *zap.Logger
367
368 peerID types.ID
369 typ streamType
370
371 tr *Transport
372 picker *urlPicker
373 status *peerStatus
374 recvc chan<- raftpb.Message
375 propc chan<- raftpb.Message
376
377 rl *rate.Limiter // alters the frequency of dial retrial attempts
378
379 errorc chan<- error
380
381 mu sync.Mutex
382 paused bool
383 closer io.Closer
384
385 ctx context.Context
386 cancel context.CancelFunc
387 done chan struct{}
388}
389
390func (cr *streamReader) start() {
391 cr.done = make(chan struct{})
392 if cr.errorc == nil {
393 cr.errorc = cr.tr.ErrorC
394 }
395 if cr.ctx == nil {
396 cr.ctx, cr.cancel = context.WithCancel(context.Background())
397 }
398 go cr.run()
399}
400
401func (cr *streamReader) run() {
402 t := cr.typ
403
404 if cr.lg != nil {
405 cr.lg.Info(
406 "started stream reader with remote peer",
407 zap.String("stream-reader-type", t.String()),
408 zap.String("local-member-id", cr.tr.ID.String()),
409 zap.String("remote-peer-id", cr.peerID.String()),
410 )
411 } else {
412 plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
413 }
414
415 for {
416 rc, err := cr.dial(t)
417 if err != nil {
418 if err != errUnsupportedStreamType {
419 cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
420 }
421 } else {
422 cr.status.activate()
423 if cr.lg != nil {
424 cr.lg.Info(
425 "established TCP streaming connection with remote peer",
426 zap.String("stream-reader-type", cr.typ.String()),
427 zap.String("local-member-id", cr.tr.ID.String()),
428 zap.String("remote-peer-id", cr.peerID.String()),
429 )
430 } else {
431 plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
432 }
433 err = cr.decodeLoop(rc, t)
434 if cr.lg != nil {
435 cr.lg.Warn(
436 "lost TCP streaming connection with remote peer",
437 zap.String("stream-reader-type", cr.typ.String()),
438 zap.String("local-member-id", cr.tr.ID.String()),
439 zap.String("remote-peer-id", cr.peerID.String()),
440 zap.Error(err),
441 )
442 } else {
443 plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
444 }
445 switch {
446 // all data is read out
447 case err == io.EOF:
448 // connection is closed by the remote
449 case transport.IsClosedConnError(err):
450 default:
451 cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
452 }
453 }
454 // Wait for a while before new dial attempt
455 err = cr.rl.Wait(cr.ctx)
456 if cr.ctx.Err() != nil {
457 if cr.lg != nil {
458 cr.lg.Info(
459 "stopped stream reader with remote peer",
460 zap.String("stream-reader-type", t.String()),
461 zap.String("local-member-id", cr.tr.ID.String()),
462 zap.String("remote-peer-id", cr.peerID.String()),
463 )
464 } else {
465 plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
466 }
467 close(cr.done)
468 return
469 }
470 if err != nil {
471 if cr.lg != nil {
472 cr.lg.Warn(
473 "rate limit on stream reader with remote peer",
474 zap.String("stream-reader-type", t.String()),
475 zap.String("local-member-id", cr.tr.ID.String()),
476 zap.String("remote-peer-id", cr.peerID.String()),
477 zap.Error(err),
478 )
479 } else {
480 plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
481 }
482 }
483 }
484}
485
486func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
487 var dec decoder
488 cr.mu.Lock()
489 switch t {
490 case streamTypeMsgAppV2:
491 dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
492 case streamTypeMessage:
493 dec = &messageDecoder{r: rc}
494 default:
495 if cr.lg != nil {
496 cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
497 } else {
498 plog.Panicf("unhandled stream type %s", t)
499 }
500 }
501 select {
502 case <-cr.ctx.Done():
503 cr.mu.Unlock()
504 if err := rc.Close(); err != nil {
505 return err
506 }
507 return io.EOF
508 default:
509 cr.closer = rc
510 }
511 cr.mu.Unlock()
512
513 // gofail: labelRaftDropHeartbeat:
514 for {
515 m, err := dec.decode()
516 if err != nil {
517 cr.mu.Lock()
518 cr.close()
519 cr.mu.Unlock()
520 return err
521 }
522
523 // gofail-go: var raftDropHeartbeat struct{}
524 // continue labelRaftDropHeartbeat
525 receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
526
527 cr.mu.Lock()
528 paused := cr.paused
529 cr.mu.Unlock()
530
531 if paused {
532 continue
533 }
534
535 if isLinkHeartbeatMessage(&m) {
536 // raft is not interested in link layer
537 // heartbeat message, so we should ignore
538 // it.
539 continue
540 }
541
542 recvc := cr.recvc
543 if m.Type == raftpb.MsgProp {
544 recvc = cr.propc
545 }
546
547 select {
548 case recvc <- m:
549 default:
550 if cr.status.isActive() {
551 if cr.lg != nil {
552 cr.lg.Warn(
553 "dropped internal Raft message since receiving buffer is full (overloaded network)",
554 zap.String("message-type", m.Type.String()),
555 zap.String("local-member-id", cr.tr.ID.String()),
556 zap.String("from", types.ID(m.From).String()),
557 zap.String("remote-peer-id", types.ID(m.To).String()),
558 zap.Bool("remote-peer-active", cr.status.isActive()),
559 )
560 } else {
561 plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
562 }
563 } else {
564 if cr.lg != nil {
565 cr.lg.Warn(
566 "dropped Raft message since receiving buffer is full (overloaded network)",
567 zap.String("message-type", m.Type.String()),
568 zap.String("local-member-id", cr.tr.ID.String()),
569 zap.String("from", types.ID(m.From).String()),
570 zap.String("remote-peer-id", types.ID(m.To).String()),
571 zap.Bool("remote-peer-active", cr.status.isActive()),
572 )
573 } else {
574 plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
575 }
576 }
577 recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
578 }
579 }
580}
581
582func (cr *streamReader) stop() {
583 cr.mu.Lock()
584 cr.cancel()
585 cr.close()
586 cr.mu.Unlock()
587 <-cr.done
588}
589
590func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
591 u := cr.picker.pick()
592 uu := u
593 uu.Path = path.Join(t.endpoint(), cr.tr.ID.String())
594
595 if cr.lg != nil {
596 cr.lg.Debug(
597 "dial stream reader",
598 zap.String("from", cr.tr.ID.String()),
599 zap.String("to", cr.peerID.String()),
600 zap.String("address", uu.String()),
601 )
602 }
603 req, err := http.NewRequest("GET", uu.String(), nil)
604 if err != nil {
605 cr.picker.unreachable(u)
606 return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err)
607 }
608 req.Header.Set("X-Server-From", cr.tr.ID.String())
609 req.Header.Set("X-Server-Version", version.Version)
610 req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
611 req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
612 req.Header.Set("X-Raft-To", cr.peerID.String())
613
614 setPeerURLsHeader(req, cr.tr.URLs)
615
616 req = req.WithContext(cr.ctx)
617
618 cr.mu.Lock()
619 select {
620 case <-cr.ctx.Done():
621 cr.mu.Unlock()
622 return nil, fmt.Errorf("stream reader is stopped")
623 default:
624 }
625 cr.mu.Unlock()
626
627 resp, err := cr.tr.streamRt.RoundTrip(req)
628 if err != nil {
629 cr.picker.unreachable(u)
630 return nil, err
631 }
632
633 rv := serverVersion(resp.Header)
634 lv := semver.Must(semver.NewVersion(version.Version))
635 if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
636 httputil.GracefulClose(resp)
637 cr.picker.unreachable(u)
638 return nil, errUnsupportedStreamType
639 }
640
641 switch resp.StatusCode {
642 case http.StatusGone:
643 httputil.GracefulClose(resp)
644 cr.picker.unreachable(u)
645 reportCriticalError(errMemberRemoved, cr.errorc)
646 return nil, errMemberRemoved
647
648 case http.StatusOK:
649 return resp.Body, nil
650
651 case http.StatusNotFound:
652 httputil.GracefulClose(resp)
653 cr.picker.unreachable(u)
654 return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID)
655
656 case http.StatusPreconditionFailed:
657 b, err := ioutil.ReadAll(resp.Body)
658 if err != nil {
659 cr.picker.unreachable(u)
660 return nil, err
661 }
662 httputil.GracefulClose(resp)
663 cr.picker.unreachable(u)
664
665 switch strings.TrimSuffix(string(b), "\n") {
666 case errIncompatibleVersion.Error():
667 if cr.lg != nil {
668 cr.lg.Warn(
669 "request sent was ignored by remote peer due to server version incompatibility",
670 zap.String("local-member-id", cr.tr.ID.String()),
671 zap.String("remote-peer-id", cr.peerID.String()),
672 zap.Error(errIncompatibleVersion),
673 )
674 } else {
675 plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID)
676 }
677 return nil, errIncompatibleVersion
678
679 case errClusterIDMismatch.Error():
680 if cr.lg != nil {
681 cr.lg.Warn(
682 "request sent was ignored by remote peer due to cluster ID mismatch",
683 zap.String("remote-peer-id", cr.peerID.String()),
684 zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")),
685 zap.String("local-member-id", cr.tr.ID.String()),
686 zap.String("local-member-cluster-id", cr.tr.ClusterID.String()),
687 zap.Error(errClusterIDMismatch),
688 )
689 } else {
690 plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)",
691 cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID)
692 }
693 return nil, errClusterIDMismatch
694
695 default:
696 return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
697 }
698
699 default:
700 httputil.GracefulClose(resp)
701 cr.picker.unreachable(u)
702 return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
703 }
704}
705
706func (cr *streamReader) close() {
707 if cr.closer != nil {
708 if err := cr.closer.Close(); err != nil {
709 if cr.lg != nil {
710 cr.lg.Warn(
711 "failed to close remote peer connection",
712 zap.String("local-member-id", cr.tr.ID.String()),
713 zap.String("remote-peer-id", cr.peerID.String()),
714 zap.Error(err),
715 )
716 } else {
717 plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err)
718 }
719 }
720 }
721 cr.closer = nil
722}
723
724func (cr *streamReader) pause() {
725 cr.mu.Lock()
726 defer cr.mu.Unlock()
727 cr.paused = true
728}
729
730func (cr *streamReader) resume() {
731 cr.mu.Lock()
732 defer cr.mu.Unlock()
733 cr.paused = false
734}
735
736// checkStreamSupport checks whether the stream type is supported in the
737// given version.
738func checkStreamSupport(v *semver.Version, t streamType) bool {
739 nv := &semver.Version{Major: v.Major, Minor: v.Minor}
740 for _, s := range supportedStream[nv.String()] {
741 if s == t {
742 return true
743 }
744 }
745 return false
746}