khenaidoo | 2672188 | 2021-08-11 17:42:52 -0400 | [diff] [blame] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package rafthttp |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "sync" |
| 20 | "time" |
| 21 | |
| 22 | "github.com/coreos/etcd/etcdserver/stats" |
| 23 | "github.com/coreos/etcd/pkg/types" |
| 24 | "github.com/coreos/etcd/raft" |
| 25 | "github.com/coreos/etcd/raft/raftpb" |
| 26 | "github.com/coreos/etcd/snap" |
| 27 | |
| 28 | "golang.org/x/time/rate" |
| 29 | ) |
| 30 | |
// ConnReadTimeout and ConnWriteTimeout are the i/o timeouts applied to every
// connection created by the rafthttp package.
//
// Five seconds is short enough to recycle a bad connection quickly; relying
// on TCP keepalive instead would take minutes to notice the failure.
// Long-lived streaming connections stay alive because rafthttp sends an
// application-level linkHeartbeatMessage over them. Short-lived pipeline
// connections MUST be killed on timeout so that they are never returned to
// the http package's connection pool.
const (
	ConnReadTimeout  = 5 * time.Second
	ConnWriteTimeout = 5 * time.Second
)

const (
	// recvBufSize is the capacity of a peer's channel of received raft
	// messages.
	recvBufSize = 4096

	// maxPendingProposals is the number of proposals that can be buffered
	// during one leader election. An election generally takes at most one
	// second: it should see 0-2 election conflicts, each costing 0.5 sec.
	// The number of concurrent proposers is assumed to be below 4096, and a
	// client blocks on its proposal for at least one second, so 4096 slots
	// are enough to hold every pending proposal.
	maxPendingProposals = 4096
)

// Names of the sending mechanisms.
const (
	streamAppV2 = "streamMsgAppV2"
	streamMsg   = "streamMsg"
	pipelineMsg = "pipeline"
	sendSnap    = "sendMsgSnap"
)
| 56 | |
| 57 | type Peer interface { |
| 58 | // send sends the message to the remote peer. The function is non-blocking |
| 59 | // and has no promise that the message will be received by the remote. |
| 60 | // When it fails to send message out, it will report the status to underlying |
| 61 | // raft. |
| 62 | send(m raftpb.Message) |
| 63 | |
| 64 | // sendSnap sends the merged snapshot message to the remote peer. Its behavior |
| 65 | // is similar to send. |
| 66 | sendSnap(m snap.Message) |
| 67 | |
| 68 | // update updates the urls of remote peer. |
| 69 | update(urls types.URLs) |
| 70 | |
| 71 | // attachOutgoingConn attaches the outgoing connection to the peer for |
| 72 | // stream usage. After the call, the ownership of the outgoing |
| 73 | // connection hands over to the peer. The peer will close the connection |
| 74 | // when it is no longer used. |
| 75 | attachOutgoingConn(conn *outgoingConn) |
| 76 | // activeSince returns the time that the connection with the |
| 77 | // peer becomes active. |
| 78 | activeSince() time.Time |
| 79 | // stop performs any necessary finalization and terminates the peer |
| 80 | // elegantly. |
| 81 | stop() |
| 82 | } |
| 83 | |
| 84 | // peer is the representative of a remote raft node. Local raft node sends |
| 85 | // messages to the remote through peer. |
| 86 | // Each peer has two underlying mechanisms to send out a message: stream and |
| 87 | // pipeline. |
| 88 | // A stream is a receiver initialized long-polling connection, which |
| 89 | // is always open to transfer messages. Besides general stream, peer also has |
| 90 | // a optimized stream for sending msgApp since msgApp accounts for large part |
| 91 | // of all messages. Only raft leader uses the optimized stream to send msgApp |
| 92 | // to the remote follower node. |
| 93 | // A pipeline is a series of http clients that send http requests to the remote. |
| 94 | // It is only used when the stream has not been established. |
| 95 | type peer struct { |
| 96 | // id of the remote raft peer node |
| 97 | id types.ID |
| 98 | r Raft |
| 99 | |
| 100 | status *peerStatus |
| 101 | |
| 102 | picker *urlPicker |
| 103 | |
| 104 | msgAppV2Writer *streamWriter |
| 105 | writer *streamWriter |
| 106 | pipeline *pipeline |
| 107 | snapSender *snapshotSender // snapshot sender to send v3 snapshot messages |
| 108 | msgAppV2Reader *streamReader |
| 109 | msgAppReader *streamReader |
| 110 | |
| 111 | recvc chan raftpb.Message |
| 112 | propc chan raftpb.Message |
| 113 | |
| 114 | mu sync.Mutex |
| 115 | paused bool |
| 116 | |
| 117 | cancel context.CancelFunc // cancel pending works in go routine created by peer. |
| 118 | stopc chan struct{} |
| 119 | } |
| 120 | |
| 121 | func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer { |
| 122 | plog.Infof("starting peer %s...", peerID) |
| 123 | defer plog.Infof("started peer %s", peerID) |
| 124 | |
| 125 | status := newPeerStatus(peerID) |
| 126 | picker := newURLPicker(urls) |
| 127 | errorc := transport.ErrorC |
| 128 | r := transport.Raft |
| 129 | pipeline := &pipeline{ |
| 130 | peerID: peerID, |
| 131 | tr: transport, |
| 132 | picker: picker, |
| 133 | status: status, |
| 134 | followerStats: fs, |
| 135 | raft: r, |
| 136 | errorc: errorc, |
| 137 | } |
| 138 | pipeline.start() |
| 139 | |
| 140 | p := &peer{ |
| 141 | id: peerID, |
| 142 | r: r, |
| 143 | status: status, |
| 144 | picker: picker, |
| 145 | msgAppV2Writer: startStreamWriter(peerID, status, fs, r), |
| 146 | writer: startStreamWriter(peerID, status, fs, r), |
| 147 | pipeline: pipeline, |
| 148 | snapSender: newSnapshotSender(transport, picker, peerID, status), |
| 149 | recvc: make(chan raftpb.Message, recvBufSize), |
| 150 | propc: make(chan raftpb.Message, maxPendingProposals), |
| 151 | stopc: make(chan struct{}), |
| 152 | } |
| 153 | |
| 154 | ctx, cancel := context.WithCancel(context.Background()) |
| 155 | p.cancel = cancel |
| 156 | go func() { |
| 157 | for { |
| 158 | select { |
| 159 | case mm := <-p.recvc: |
| 160 | if err := r.Process(ctx, mm); err != nil { |
| 161 | plog.Warningf("failed to process raft message (%v)", err) |
| 162 | } |
| 163 | case <-p.stopc: |
| 164 | return |
| 165 | } |
| 166 | } |
| 167 | }() |
| 168 | |
| 169 | // r.Process might block for processing proposal when there is no leader. |
| 170 | // Thus propc must be put into a separate routine with recvc to avoid blocking |
| 171 | // processing other raft messages. |
| 172 | go func() { |
| 173 | for { |
| 174 | select { |
| 175 | case mm := <-p.propc: |
| 176 | if err := r.Process(ctx, mm); err != nil { |
| 177 | plog.Warningf("failed to process raft message (%v)", err) |
| 178 | } |
| 179 | case <-p.stopc: |
| 180 | return |
| 181 | } |
| 182 | } |
| 183 | }() |
| 184 | |
| 185 | p.msgAppV2Reader = &streamReader{ |
| 186 | peerID: peerID, |
| 187 | typ: streamTypeMsgAppV2, |
| 188 | tr: transport, |
| 189 | picker: picker, |
| 190 | status: status, |
| 191 | recvc: p.recvc, |
| 192 | propc: p.propc, |
| 193 | rl: rate.NewLimiter(transport.DialRetryFrequency, 1), |
| 194 | } |
| 195 | p.msgAppReader = &streamReader{ |
| 196 | peerID: peerID, |
| 197 | typ: streamTypeMessage, |
| 198 | tr: transport, |
| 199 | picker: picker, |
| 200 | status: status, |
| 201 | recvc: p.recvc, |
| 202 | propc: p.propc, |
| 203 | rl: rate.NewLimiter(transport.DialRetryFrequency, 1), |
| 204 | } |
| 205 | |
| 206 | p.msgAppV2Reader.start() |
| 207 | p.msgAppReader.start() |
| 208 | |
| 209 | return p |
| 210 | } |
| 211 | |
| 212 | func (p *peer) send(m raftpb.Message) { |
| 213 | p.mu.Lock() |
| 214 | paused := p.paused |
| 215 | p.mu.Unlock() |
| 216 | |
| 217 | if paused { |
| 218 | return |
| 219 | } |
| 220 | |
| 221 | writec, name := p.pick(m) |
| 222 | select { |
| 223 | case writec <- m: |
| 224 | default: |
| 225 | p.r.ReportUnreachable(m.To) |
| 226 | if isMsgSnap(m) { |
| 227 | p.r.ReportSnapshot(m.To, raft.SnapshotFailure) |
| 228 | } |
| 229 | if p.status.isActive() { |
| 230 | plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name) |
| 231 | } |
| 232 | plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) |
| 233 | sentFailures.WithLabelValues(types.ID(m.To).String()).Inc() |
| 234 | } |
| 235 | } |
| 236 | |
| 237 | func (p *peer) sendSnap(m snap.Message) { |
| 238 | go p.snapSender.send(m) |
| 239 | } |
| 240 | |
| 241 | func (p *peer) update(urls types.URLs) { |
| 242 | p.picker.update(urls) |
| 243 | } |
| 244 | |
| 245 | func (p *peer) attachOutgoingConn(conn *outgoingConn) { |
| 246 | var ok bool |
| 247 | switch conn.t { |
| 248 | case streamTypeMsgAppV2: |
| 249 | ok = p.msgAppV2Writer.attach(conn) |
| 250 | case streamTypeMessage: |
| 251 | ok = p.writer.attach(conn) |
| 252 | default: |
| 253 | plog.Panicf("unhandled stream type %s", conn.t) |
| 254 | } |
| 255 | if !ok { |
| 256 | conn.Close() |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | func (p *peer) activeSince() time.Time { return p.status.activeSince() } |
| 261 | |
| 262 | // Pause pauses the peer. The peer will simply drops all incoming |
| 263 | // messages without returning an error. |
| 264 | func (p *peer) Pause() { |
| 265 | p.mu.Lock() |
| 266 | defer p.mu.Unlock() |
| 267 | p.paused = true |
| 268 | p.msgAppReader.pause() |
| 269 | p.msgAppV2Reader.pause() |
| 270 | } |
| 271 | |
| 272 | // Resume resumes a paused peer. |
| 273 | func (p *peer) Resume() { |
| 274 | p.mu.Lock() |
| 275 | defer p.mu.Unlock() |
| 276 | p.paused = false |
| 277 | p.msgAppReader.resume() |
| 278 | p.msgAppV2Reader.resume() |
| 279 | } |
| 280 | |
| 281 | func (p *peer) stop() { |
| 282 | plog.Infof("stopping peer %s...", p.id) |
| 283 | defer plog.Infof("stopped peer %s", p.id) |
| 284 | |
| 285 | close(p.stopc) |
| 286 | p.cancel() |
| 287 | p.msgAppV2Writer.stop() |
| 288 | p.writer.stop() |
| 289 | p.pipeline.stop() |
| 290 | p.snapSender.stop() |
| 291 | p.msgAppV2Reader.stop() |
| 292 | p.msgAppReader.stop() |
| 293 | } |
| 294 | |
| 295 | // pick picks a chan for sending the given message. The picked chan and the picked chan |
| 296 | // string name are returned. |
| 297 | func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) { |
| 298 | var ok bool |
| 299 | // Considering MsgSnap may have a big size, e.g., 1G, and will block |
| 300 | // stream for a long time, only use one of the N pipelines to send MsgSnap. |
| 301 | if isMsgSnap(m) { |
| 302 | return p.pipeline.msgc, pipelineMsg |
| 303 | } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) { |
| 304 | return writec, streamAppV2 |
| 305 | } else if writec, ok = p.writer.writec(); ok { |
| 306 | return writec, streamMsg |
| 307 | } |
| 308 | return p.pipeline.msgc, pipelineMsg |
| 309 | } |
| 310 | |
| 311 | func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp } |
| 312 | |
| 313 | func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap } |