khenaidoo | ab1f7bd | 2019-11-14 14:00:27 -0500 | [diff] [blame] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package rafthttp |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "sync" |
| 20 | "time" |
| 21 | |
| 22 | "go.etcd.io/etcd/etcdserver/api/snap" |
| 23 | stats "go.etcd.io/etcd/etcdserver/api/v2stats" |
| 24 | "go.etcd.io/etcd/pkg/types" |
| 25 | "go.etcd.io/etcd/raft" |
| 26 | "go.etcd.io/etcd/raft/raftpb" |
| 27 | |
| 28 | "go.uber.org/zap" |
| 29 | "golang.org/x/time/rate" |
| 30 | ) |
| 31 | |
// Connection I/O deadlines.
const (
	// ConnReadTimeout and ConnWriteTimeout are the read and write deadlines
	// applied to every connection created by the rafthttp package.
	// Five seconds is short enough to recycle a broken connection quickly;
	// without it, detection would fall back to TCP keepalive, which takes
	// minutes. Long-lived stream connections stay alive because rafthttp
	// sends an application-level linkHeartbeatMessage over them. Short-lived
	// pipeline connections MUST be killed on timeout so that they are not
	// returned to the http package's connection pool.
	ConnReadTimeout  = 5 * time.Second
	ConnWriteTimeout = 5 * time.Second
)

// Channel capacities used when a peer is started.
const (
	// recvBufSize is the capacity of a peer's receive channel.
	recvBufSize = 4096

	// maxPendingProposals bounds the proposals buffered while a leader
	// election is in progress. An election normally finishes within about
	// one second (0-2 conflicts at roughly 0.5 seconds each), and the number
	// of concurrent proposers is assumed to be below 4096. Since each client
	// blocks on its proposal for at least one second, 4096 slots are enough
	// to hold every outstanding proposal.
	maxPendingProposals = 4096
)

// Human-readable names of the sending paths, used in log messages.
const (
	streamAppV2 = "streamMsgAppV2"
	streamMsg   = "streamMsg"
	pipelineMsg = "pipeline"
	sendSnap    = "sendMsgSnap"
)
| 57 | |
| 58 | type Peer interface { |
| 59 | // send sends the message to the remote peer. The function is non-blocking |
| 60 | // and has no promise that the message will be received by the remote. |
| 61 | // When it fails to send message out, it will report the status to underlying |
| 62 | // raft. |
| 63 | send(m raftpb.Message) |
| 64 | |
| 65 | // sendSnap sends the merged snapshot message to the remote peer. Its behavior |
| 66 | // is similar to send. |
| 67 | sendSnap(m snap.Message) |
| 68 | |
| 69 | // update updates the urls of remote peer. |
| 70 | update(urls types.URLs) |
| 71 | |
| 72 | // attachOutgoingConn attaches the outgoing connection to the peer for |
| 73 | // stream usage. After the call, the ownership of the outgoing |
| 74 | // connection hands over to the peer. The peer will close the connection |
| 75 | // when it is no longer used. |
| 76 | attachOutgoingConn(conn *outgoingConn) |
| 77 | // activeSince returns the time that the connection with the |
| 78 | // peer becomes active. |
| 79 | activeSince() time.Time |
| 80 | // stop performs any necessary finalization and terminates the peer |
| 81 | // elegantly. |
| 82 | stop() |
| 83 | } |
| 84 | |
| 85 | // peer is the representative of a remote raft node. Local raft node sends |
| 86 | // messages to the remote through peer. |
| 87 | // Each peer has two underlying mechanisms to send out a message: stream and |
| 88 | // pipeline. |
| 89 | // A stream is a receiver initialized long-polling connection, which |
| 90 | // is always open to transfer messages. Besides general stream, peer also has |
| 91 | // a optimized stream for sending msgApp since msgApp accounts for large part |
| 92 | // of all messages. Only raft leader uses the optimized stream to send msgApp |
| 93 | // to the remote follower node. |
| 94 | // A pipeline is a series of http clients that send http requests to the remote. |
| 95 | // It is only used when the stream has not been established. |
| 96 | type peer struct { |
| 97 | lg *zap.Logger |
| 98 | |
| 99 | localID types.ID |
| 100 | // id of the remote raft peer node |
| 101 | id types.ID |
| 102 | |
| 103 | r Raft |
| 104 | |
| 105 | status *peerStatus |
| 106 | |
| 107 | picker *urlPicker |
| 108 | |
| 109 | msgAppV2Writer *streamWriter |
| 110 | writer *streamWriter |
| 111 | pipeline *pipeline |
| 112 | snapSender *snapshotSender // snapshot sender to send v3 snapshot messages |
| 113 | msgAppV2Reader *streamReader |
| 114 | msgAppReader *streamReader |
| 115 | |
| 116 | recvc chan raftpb.Message |
| 117 | propc chan raftpb.Message |
| 118 | |
| 119 | mu sync.Mutex |
| 120 | paused bool |
| 121 | |
| 122 | cancel context.CancelFunc // cancel pending works in go routine created by peer. |
| 123 | stopc chan struct{} |
| 124 | } |
| 125 | |
| 126 | func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer { |
| 127 | if t.Logger != nil { |
| 128 | t.Logger.Info("starting remote peer", zap.String("remote-peer-id", peerID.String())) |
| 129 | } else { |
| 130 | plog.Infof("starting peer %s...", peerID) |
| 131 | } |
| 132 | defer func() { |
| 133 | if t.Logger != nil { |
| 134 | t.Logger.Info("started remote peer", zap.String("remote-peer-id", peerID.String())) |
| 135 | } else { |
| 136 | plog.Infof("started peer %s", peerID) |
| 137 | } |
| 138 | }() |
| 139 | |
| 140 | status := newPeerStatus(t.Logger, t.ID, peerID) |
| 141 | picker := newURLPicker(urls) |
| 142 | errorc := t.ErrorC |
| 143 | r := t.Raft |
| 144 | pipeline := &pipeline{ |
| 145 | peerID: peerID, |
| 146 | tr: t, |
| 147 | picker: picker, |
| 148 | status: status, |
| 149 | followerStats: fs, |
| 150 | raft: r, |
| 151 | errorc: errorc, |
| 152 | } |
| 153 | pipeline.start() |
| 154 | |
| 155 | p := &peer{ |
| 156 | lg: t.Logger, |
| 157 | localID: t.ID, |
| 158 | id: peerID, |
| 159 | r: r, |
| 160 | status: status, |
| 161 | picker: picker, |
| 162 | msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r), |
| 163 | writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r), |
| 164 | pipeline: pipeline, |
| 165 | snapSender: newSnapshotSender(t, picker, peerID, status), |
| 166 | recvc: make(chan raftpb.Message, recvBufSize), |
| 167 | propc: make(chan raftpb.Message, maxPendingProposals), |
| 168 | stopc: make(chan struct{}), |
| 169 | } |
| 170 | |
| 171 | ctx, cancel := context.WithCancel(context.Background()) |
| 172 | p.cancel = cancel |
| 173 | go func() { |
| 174 | for { |
| 175 | select { |
| 176 | case mm := <-p.recvc: |
| 177 | if err := r.Process(ctx, mm); err != nil { |
| 178 | if t.Logger != nil { |
| 179 | t.Logger.Warn("failed to process Raft message", zap.Error(err)) |
| 180 | } else { |
| 181 | plog.Warningf("failed to process raft message (%v)", err) |
| 182 | } |
| 183 | } |
| 184 | case <-p.stopc: |
| 185 | return |
| 186 | } |
| 187 | } |
| 188 | }() |
| 189 | |
| 190 | // r.Process might block for processing proposal when there is no leader. |
| 191 | // Thus propc must be put into a separate routine with recvc to avoid blocking |
| 192 | // processing other raft messages. |
| 193 | go func() { |
| 194 | for { |
| 195 | select { |
| 196 | case mm := <-p.propc: |
| 197 | if err := r.Process(ctx, mm); err != nil { |
| 198 | plog.Warningf("failed to process raft message (%v)", err) |
| 199 | } |
| 200 | case <-p.stopc: |
| 201 | return |
| 202 | } |
| 203 | } |
| 204 | }() |
| 205 | |
| 206 | p.msgAppV2Reader = &streamReader{ |
| 207 | lg: t.Logger, |
| 208 | peerID: peerID, |
| 209 | typ: streamTypeMsgAppV2, |
| 210 | tr: t, |
| 211 | picker: picker, |
| 212 | status: status, |
| 213 | recvc: p.recvc, |
| 214 | propc: p.propc, |
| 215 | rl: rate.NewLimiter(t.DialRetryFrequency, 1), |
| 216 | } |
| 217 | p.msgAppReader = &streamReader{ |
| 218 | lg: t.Logger, |
| 219 | peerID: peerID, |
| 220 | typ: streamTypeMessage, |
| 221 | tr: t, |
| 222 | picker: picker, |
| 223 | status: status, |
| 224 | recvc: p.recvc, |
| 225 | propc: p.propc, |
| 226 | rl: rate.NewLimiter(t.DialRetryFrequency, 1), |
| 227 | } |
| 228 | |
| 229 | p.msgAppV2Reader.start() |
| 230 | p.msgAppReader.start() |
| 231 | |
| 232 | return p |
| 233 | } |
| 234 | |
| 235 | func (p *peer) send(m raftpb.Message) { |
| 236 | p.mu.Lock() |
| 237 | paused := p.paused |
| 238 | p.mu.Unlock() |
| 239 | |
| 240 | if paused { |
| 241 | return |
| 242 | } |
| 243 | |
| 244 | writec, name := p.pick(m) |
| 245 | select { |
| 246 | case writec <- m: |
| 247 | default: |
| 248 | p.r.ReportUnreachable(m.To) |
| 249 | if isMsgSnap(m) { |
| 250 | p.r.ReportSnapshot(m.To, raft.SnapshotFailure) |
| 251 | } |
| 252 | if p.status.isActive() { |
| 253 | if p.lg != nil { |
| 254 | p.lg.Warn( |
| 255 | "dropped internal Raft message since sending buffer is full (overloaded network)", |
| 256 | zap.String("message-type", m.Type.String()), |
| 257 | zap.String("local-member-id", p.localID.String()), |
| 258 | zap.String("from", types.ID(m.From).String()), |
| 259 | zap.String("remote-peer-id", p.id.String()), |
| 260 | zap.Bool("remote-peer-active", p.status.isActive()), |
| 261 | ) |
| 262 | } else { |
| 263 | plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name) |
| 264 | } |
| 265 | } else { |
| 266 | if p.lg != nil { |
| 267 | p.lg.Warn( |
| 268 | "dropped internal Raft message since sending buffer is full (overloaded network)", |
| 269 | zap.String("message-type", m.Type.String()), |
| 270 | zap.String("local-member-id", p.localID.String()), |
| 271 | zap.String("from", types.ID(m.From).String()), |
| 272 | zap.String("remote-peer-id", p.id.String()), |
| 273 | zap.Bool("remote-peer-active", p.status.isActive()), |
| 274 | ) |
| 275 | } else { |
| 276 | plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) |
| 277 | } |
| 278 | } |
| 279 | sentFailures.WithLabelValues(types.ID(m.To).String()).Inc() |
| 280 | } |
| 281 | } |
| 282 | |
| 283 | func (p *peer) sendSnap(m snap.Message) { |
| 284 | go p.snapSender.send(m) |
| 285 | } |
| 286 | |
| 287 | func (p *peer) update(urls types.URLs) { |
| 288 | p.picker.update(urls) |
| 289 | } |
| 290 | |
| 291 | func (p *peer) attachOutgoingConn(conn *outgoingConn) { |
| 292 | var ok bool |
| 293 | switch conn.t { |
| 294 | case streamTypeMsgAppV2: |
| 295 | ok = p.msgAppV2Writer.attach(conn) |
| 296 | case streamTypeMessage: |
| 297 | ok = p.writer.attach(conn) |
| 298 | default: |
| 299 | if p.lg != nil { |
| 300 | p.lg.Panic("unknown stream type", zap.String("type", conn.t.String())) |
| 301 | } else { |
| 302 | plog.Panicf("unhandled stream type %s", conn.t) |
| 303 | } |
| 304 | } |
| 305 | if !ok { |
| 306 | conn.Close() |
| 307 | } |
| 308 | } |
| 309 | |
| 310 | func (p *peer) activeSince() time.Time { return p.status.activeSince() } |
| 311 | |
| 312 | // Pause pauses the peer. The peer will simply drops all incoming |
| 313 | // messages without returning an error. |
| 314 | func (p *peer) Pause() { |
| 315 | p.mu.Lock() |
| 316 | defer p.mu.Unlock() |
| 317 | p.paused = true |
| 318 | p.msgAppReader.pause() |
| 319 | p.msgAppV2Reader.pause() |
| 320 | } |
| 321 | |
| 322 | // Resume resumes a paused peer. |
| 323 | func (p *peer) Resume() { |
| 324 | p.mu.Lock() |
| 325 | defer p.mu.Unlock() |
| 326 | p.paused = false |
| 327 | p.msgAppReader.resume() |
| 328 | p.msgAppV2Reader.resume() |
| 329 | } |
| 330 | |
| 331 | func (p *peer) stop() { |
| 332 | if p.lg != nil { |
| 333 | p.lg.Info("stopping remote peer", zap.String("remote-peer-id", p.id.String())) |
| 334 | } else { |
| 335 | plog.Infof("stopping peer %s...", p.id) |
| 336 | } |
| 337 | |
| 338 | defer func() { |
| 339 | if p.lg != nil { |
| 340 | p.lg.Info("stopped remote peer", zap.String("remote-peer-id", p.id.String())) |
| 341 | } else { |
| 342 | plog.Infof("stopped peer %s", p.id) |
| 343 | } |
| 344 | }() |
| 345 | |
| 346 | close(p.stopc) |
| 347 | p.cancel() |
| 348 | p.msgAppV2Writer.stop() |
| 349 | p.writer.stop() |
| 350 | p.pipeline.stop() |
| 351 | p.snapSender.stop() |
| 352 | p.msgAppV2Reader.stop() |
| 353 | p.msgAppReader.stop() |
| 354 | } |
| 355 | |
| 356 | // pick picks a chan for sending the given message. The picked chan and the picked chan |
| 357 | // string name are returned. |
| 358 | func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) { |
| 359 | var ok bool |
| 360 | // Considering MsgSnap may have a big size, e.g., 1G, and will block |
| 361 | // stream for a long time, only use one of the N pipelines to send MsgSnap. |
| 362 | if isMsgSnap(m) { |
| 363 | return p.pipeline.msgc, pipelineMsg |
| 364 | } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) { |
| 365 | return writec, streamAppV2 |
| 366 | } else if writec, ok = p.writer.writec(); ok { |
| 367 | return writec, streamMsg |
| 368 | } |
| 369 | return p.pipeline.msgc, pipelineMsg |
| 370 | } |
| 371 | |
| 372 | func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp } |
| 373 | |
| 374 | func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap } |