blob: d9f07c3479d93e5c4b1c0c20dc76a73a22a56bc2 [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package rafthttp
16
17import (
18 "bytes"
19 "context"
20 "errors"
21 "io/ioutil"
22 "sync"
23 "time"
24
25 "github.com/coreos/etcd/etcdserver/stats"
26 "github.com/coreos/etcd/pkg/pbutil"
27 "github.com/coreos/etcd/pkg/types"
28 "github.com/coreos/etcd/raft"
29 "github.com/coreos/etcd/raft/raftpb"
30)
31
// Tuning knobs for the HTTP pipeline that carries raft messages to a peer.
const (
	// pipelineBufSize is the capacity of the pipeline's outbound message
	// queue. The buffer absorbs temporary network latency: on the good path
	// it lets the pipeline avoid dropping messages while the network is
	// unusable for less than about one second.
	pipelineBufSize = 64

	// connPerPipeline is how many handler goroutines drain the message
	// queue concurrently; each one POSTs the messages it dequeues.
	connPerPipeline = 4
)
40
var (
	// errStopped is a sentinel error whose message is "stopped".
	// NOTE(review): its users are outside this view; presumably it is
	// returned for work attempted after a stop — confirm against callers.
	errStopped = errors.New("stopped")
)
42
43type pipeline struct {
44 peerID types.ID
45
46 tr *Transport
47 picker *urlPicker
48 status *peerStatus
49 raft Raft
50 errorc chan error
51 // deprecate when we depercate v2 API
52 followerStats *stats.FollowerStats
53
54 msgc chan raftpb.Message
55 // wait for the handling routines
56 wg sync.WaitGroup
57 stopc chan struct{}
58}
59
60func (p *pipeline) start() {
61 p.stopc = make(chan struct{})
62 p.msgc = make(chan raftpb.Message, pipelineBufSize)
63 p.wg.Add(connPerPipeline)
64 for i := 0; i < connPerPipeline; i++ {
65 go p.handle()
66 }
67 plog.Infof("started HTTP pipelining with peer %s", p.peerID)
68}
69
70func (p *pipeline) stop() {
71 close(p.stopc)
72 p.wg.Wait()
73 plog.Infof("stopped HTTP pipelining with peer %s", p.peerID)
74}
75
76func (p *pipeline) handle() {
77 defer p.wg.Done()
78
79 for {
80 select {
81 case m := <-p.msgc:
82 start := time.Now()
83 err := p.post(pbutil.MustMarshal(&m))
84 end := time.Now()
85
86 if err != nil {
87 p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
88
89 if m.Type == raftpb.MsgApp && p.followerStats != nil {
90 p.followerStats.Fail()
91 }
92 p.raft.ReportUnreachable(m.To)
93 if isMsgSnap(m) {
94 p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
95 }
96 sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
97 continue
98 }
99
100 p.status.activate()
101 if m.Type == raftpb.MsgApp && p.followerStats != nil {
102 p.followerStats.Succ(end.Sub(start))
103 }
104 if isMsgSnap(m) {
105 p.raft.ReportSnapshot(m.To, raft.SnapshotFinish)
106 }
107 sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
108 case <-p.stopc:
109 return
110 }
111 }
112}
113
114// post POSTs a data payload to a url. Returns nil if the POST succeeds,
115// error on any failure.
116func (p *pipeline) post(data []byte) (err error) {
117 u := p.picker.pick()
118 req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
119
120 done := make(chan struct{}, 1)
121 ctx, cancel := context.WithCancel(context.Background())
122 req = req.WithContext(ctx)
123 go func() {
124 select {
125 case <-done:
126 case <-p.stopc:
127 waitSchedule()
128 cancel()
129 }
130 }()
131
132 resp, err := p.tr.pipelineRt.RoundTrip(req)
133 done <- struct{}{}
134 if err != nil {
135 p.picker.unreachable(u)
136 return err
137 }
138 b, err := ioutil.ReadAll(resp.Body)
139 if err != nil {
140 p.picker.unreachable(u)
141 return err
142 }
143 resp.Body.Close()
144
145 err = checkPostResponse(resp, b, req, p.peerID)
146 if err != nil {
147 p.picker.unreachable(u)
148 // errMemberRemoved is a critical error since a removed member should
149 // always be stopped. So we use reportCriticalError to report it to errorc.
150 if err == errMemberRemoved {
151 reportCriticalError(err, p.errorc)
152 }
153 return err
154 }
155
156 return nil
157}
158
// waitSchedule pauses the calling goroutine for one millisecond so that
// other goroutines get a chance to be scheduled before it continues.
func waitSchedule() {
	const pause = time.Millisecond
	time.Sleep(pause)
}