// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafthttp

import (
	"context"
	"sync"
	"time"

	"github.com/coreos/etcd/etcdserver/stats"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/snap"

	"golang.org/x/time/rate"
)

const (
	// ConnReadTimeout and ConnWriteTimeout are the i/o timeouts set on each connection the rafthttp package creates.
	// A 5-second timeout is good enough for recycling bad connections. Otherwise, we would have to wait
	// for TCP keepalive to detect a bad connection, which takes minutes.
	// For long-term streaming connections, the rafthttp package sends application-level linkHeartbeatMessages
	// to keep the connection alive.
	// For short-term pipeline connections, the connection MUST be killed to avoid it being
	// put back into the http package's connection pool.
	ConnReadTimeout  = 5 * time.Second
	ConnWriteTimeout = 5 * time.Second

	recvBufSize = 4096
	// maxPendingProposals holds the proposals during one leader election process.
	// Generally one leader election takes at most 1 sec, with 0-2 election conflicts,
	// each of which takes 0.5 sec.
	// We assume the number of concurrent proposers is smaller than 4096.
	// One client blocks on its proposal for at least 1 sec, so 4096 is enough
	// to hold all proposals.
	maxPendingProposals = 4096

	streamAppV2 = "streamMsgAppV2"
	streamMsg   = "streamMsg"
	pipelineMsg = "pipeline"
	sendSnap    = "sendMsgSnap"
)
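
// A minimal sketch of how i/o timeouts like the two constants above are typically
// applied to a connection, assuming a plain net.Conn named conn (illustrative
// only; the package's actual wiring lives elsewhere):
//
//	conn.SetReadDeadline(time.Now().Add(ConnReadTimeout))
//	conn.SetWriteDeadline(time.Now().Add(ConnWriteTimeout))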

type Peer interface {
	// send sends the message to the remote peer. The function is non-blocking
	// and makes no promise that the message will be received by the remote.
	// When it fails to send a message out, it reports the status to the underlying
	// raft.
	send(m raftpb.Message)

	// sendSnap sends the merged snapshot message to the remote peer. Its behavior
	// is similar to send.
	sendSnap(m snap.Message)

	// update updates the urls of the remote peer.
	update(urls types.URLs)

	// attachOutgoingConn attaches the outgoing connection to the peer for
	// stream usage. After the call, ownership of the outgoing
	// connection is handed over to the peer. The peer will close the connection
	// when it is no longer used.
	attachOutgoingConn(conn *outgoingConn)
	// activeSince returns the time at which the connection with the
	// peer became active.
	activeSince() time.Time
	// stop performs any necessary finalization and terminates the peer
	// gracefully.
	stop()
}
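
// A rough usage sketch of the Peer interface from the transport's point of view,
// assuming a hypothetical Peer value p, fresh peer URLs urls, and a raft message m
// (illustrative only; the real call sites live in the transport):
//
//	p.update(urls)  // the remote advertised new peer URLs
//	p.send(m)       // best-effort, non-blocking send
//	if p.activeSince().IsZero() {
//		// no connection to the peer has become active
//	}
//	p.stop()        // graceful shutdown when the peer is removed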

// peer is the representative of a remote raft node. The local raft node sends
// messages to the remote through the peer.
// Each peer has two underlying mechanisms to send out a message: stream and
// pipeline.
// A stream is a receiver-initiated long-polling connection, which
// is always open to transfer messages. Besides the general stream, a peer also has
// an optimized stream for sending msgApp, since msgApp accounts for a large part
// of all messages. Only the raft leader uses the optimized stream to send msgApp
// to the remote follower node.
// A pipeline is a series of http clients that send http requests to the remote.
// It is only used when the stream has not been established.
type peer struct {
	// id of the remote raft peer node
	id types.ID
	r  Raft

	status *peerStatus

	picker *urlPicker

	msgAppV2Writer *streamWriter
	writer         *streamWriter
	pipeline       *pipeline
	snapSender     *snapshotSender // snapshot sender to send v3 snapshot messages
	msgAppV2Reader *streamReader
	msgAppReader   *streamReader

	recvc chan raftpb.Message
	propc chan raftpb.Message

	mu     sync.Mutex
	paused bool

	cancel context.CancelFunc // cancel pending work in goroutines created by peer.
	stopc  chan struct{}
}

func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
	plog.Infof("starting peer %s...", peerID)
	defer plog.Infof("started peer %s", peerID)

	status := newPeerStatus(peerID)
	picker := newURLPicker(urls)
	errorc := transport.ErrorC
	r := transport.Raft
	pipeline := &pipeline{
		peerID:        peerID,
		tr:            transport,
		picker:        picker,
		status:        status,
		followerStats: fs,
		raft:          r,
		errorc:        errorc,
	}
	pipeline.start()

	p := &peer{
		id:             peerID,
		r:              r,
		status:         status,
		picker:         picker,
		msgAppV2Writer: startStreamWriter(peerID, status, fs, r),
		writer:         startStreamWriter(peerID, status, fs, r),
		pipeline:       pipeline,
		snapSender:     newSnapshotSender(transport, picker, peerID, status),
		recvc:          make(chan raftpb.Message, recvBufSize),
		propc:          make(chan raftpb.Message, maxPendingProposals),
		stopc:          make(chan struct{}),
	}

	ctx, cancel := context.WithCancel(context.Background())
	p.cancel = cancel
	go func() {
		for {
			select {
			case mm := <-p.recvc:
				if err := r.Process(ctx, mm); err != nil {
					plog.Warningf("failed to process raft message (%v)", err)
				}
			case <-p.stopc:
				return
			}
		}
	}()

	// r.Process might block for processing proposal when there is no leader.
	// Thus propc must be put into a separate routine from recvc to avoid blocking
	// the processing of other raft messages.
	go func() {
		for {
			select {
			case mm := <-p.propc:
				if err := r.Process(ctx, mm); err != nil {
					plog.Warningf("failed to process raft message (%v)", err)
				}
			case <-p.stopc:
				return
			}
		}
	}()

	p.msgAppV2Reader = &streamReader{
		peerID: peerID,
		typ:    streamTypeMsgAppV2,
		tr:     transport,
		picker: picker,
		status: status,
		recvc:  p.recvc,
		propc:  p.propc,
		rl:     rate.NewLimiter(transport.DialRetryFrequency, 1),
	}
	p.msgAppReader = &streamReader{
		peerID: peerID,
		typ:    streamTypeMessage,
		tr:     transport,
		picker: picker,
		status: status,
		recvc:  p.recvc,
		propc:  p.propc,
		rl:     rate.NewLimiter(transport.DialRetryFrequency, 1),
	}

	p.msgAppV2Reader.start()
	p.msgAppReader.start()

	return p
}
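
// A rough sketch of how a transport-level caller would use startPeer, assuming a
// hypothetical *Transport tr, the remote member's ID peerID, and its follower
// stats fs (illustrative only; the actual construction happens where the
// transport adds peers):
//
//	urls, err := types.NewURLs([]string{"http://10.0.1.2:2380"})
//	if err != nil {
//		plog.Panicf("invalid peer urls (%v)", err)
//	}
//	p := startPeer(tr, urls, peerID, fs)
//	defer p.stop()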

func (p *peer) send(m raftpb.Message) {
	p.mu.Lock()
	paused := p.paused
	p.mu.Unlock()

	if paused {
		return
	}

	writec, name := p.pick(m)
	select {
	case writec <- m:
	default:
		p.r.ReportUnreachable(m.To)
		if isMsgSnap(m) {
			p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
		}
		if p.status.isActive() {
			plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
		}
		plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
	}
}

func (p *peer) sendSnap(m snap.Message) {
	go p.snapSender.send(m)
}

func (p *peer) update(urls types.URLs) {
	p.picker.update(urls)
}

func (p *peer) attachOutgoingConn(conn *outgoingConn) {
	var ok bool
	switch conn.t {
	case streamTypeMsgAppV2:
		ok = p.msgAppV2Writer.attach(conn)
	case streamTypeMessage:
		ok = p.writer.attach(conn)
	default:
		plog.Panicf("unhandled stream type %s", conn.t)
	}
	if !ok {
		conn.Close()
	}
}

func (p *peer) activeSince() time.Time { return p.status.activeSince() }

// Pause pauses the peer. The peer will simply drop all incoming
// messages without returning an error.
func (p *peer) Pause() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.paused = true
	p.msgAppReader.pause()
	p.msgAppV2Reader.pause()
}

// Resume resumes a paused peer.
func (p *peer) Resume() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.paused = false
	p.msgAppReader.resume()
	p.msgAppV2Reader.resume()
}

func (p *peer) stop() {
	plog.Infof("stopping peer %s...", p.id)
	defer plog.Infof("stopped peer %s", p.id)

	close(p.stopc)
	p.cancel()
	p.msgAppV2Writer.stop()
	p.writer.stop()
	p.pipeline.stop()
	p.snapSender.stop()
	p.msgAppV2Reader.stop()
	p.msgAppReader.stop()
}

// pick picks a chan for sending the given message. It returns the picked chan
// and the string name of that chan.
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
	var ok bool
	// Considering that MsgSnap may have a big size, e.g., 1GB, and will block the
	// stream for a long time, only use one of the N pipelines to send MsgSnap.
	if isMsgSnap(m) {
		return p.pipeline.msgc, pipelineMsg
	} else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
		return writec, streamAppV2
	} else if writec, ok = p.writer.writec(); ok {
		return writec, streamMsg
	}
	return p.pipeline.msgc, pipelineMsg
}

func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp }

func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }