// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| 14 | |
| 15 | package rafthttp |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "errors" |
| 20 | "fmt" |
| 21 | "io/ioutil" |
| 22 | "net/http" |
| 23 | "path" |
| 24 | "strings" |
| 25 | "time" |
| 26 | |
| 27 | pioutil "github.com/coreos/etcd/pkg/ioutil" |
| 28 | "github.com/coreos/etcd/pkg/types" |
| 29 | "github.com/coreos/etcd/raft/raftpb" |
| 30 | "github.com/coreos/etcd/snap" |
| 31 | "github.com/coreos/etcd/version" |
| 32 | "github.com/dustin/go-humanize" |
| 33 | ) |
| 34 | |
// connReadLimitByte caps the number of bytes a single read from a
// connection may return.
//
// 64KB is large enough that it does not become a throughput bottleneck,
// and small enough that one read does not cause a read timeout.
const connReadLimitByte = 64 << 10
| 44 | |
// URL path prefixes of the raft HTTP endpoints. Every sub-prefix is derived
// from RaftPrefix.
var (
	RaftPrefix         = "/raft"
	RaftStreamPrefix   = path.Join(RaftPrefix, "stream")
	RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")
	ProbingPrefix      = path.Join(RaftPrefix, "probing")
)

// Sentinel errors returned by checkClusterCompatibilityFromHeader.
var (
	errClusterIDMismatch   = errors.New("cluster ID mismatch")
	errIncompatibleVersion = errors.New("incompatible version")
)
| 54 | |
| 55 | type peerGetter interface { |
| 56 | Get(id types.ID) Peer |
| 57 | } |
| 58 | |
// writerToResponse is satisfied by errors that render their own HTTP reply.
// When Raft.Process returns such an error, the handlers in this file call
// WriteTo instead of writing a generic error response.
type writerToResponse interface {
	// WriteTo writes the reply for this error to w.
	WriteTo(w http.ResponseWriter)
}
| 62 | |
| 63 | type pipelineHandler struct { |
| 64 | tr Transporter |
| 65 | r Raft |
| 66 | cid types.ID |
| 67 | } |
| 68 | |
| 69 | // newPipelineHandler returns a handler for handling raft messages |
| 70 | // from pipeline for RaftPrefix. |
| 71 | // |
| 72 | // The handler reads out the raft message from request body, |
| 73 | // and forwards it to the given raft state machine for processing. |
| 74 | func newPipelineHandler(tr Transporter, r Raft, cid types.ID) http.Handler { |
| 75 | return &pipelineHandler{ |
| 76 | tr: tr, |
| 77 | r: r, |
| 78 | cid: cid, |
| 79 | } |
| 80 | } |
| 81 | |
| 82 | func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 83 | if r.Method != "POST" { |
| 84 | w.Header().Set("Allow", "POST") |
| 85 | http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) |
| 86 | return |
| 87 | } |
| 88 | |
| 89 | w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) |
| 90 | |
| 91 | if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil { |
| 92 | http.Error(w, err.Error(), http.StatusPreconditionFailed) |
| 93 | return |
| 94 | } |
| 95 | |
| 96 | addRemoteFromRequest(h.tr, r) |
| 97 | |
| 98 | // Limit the data size that could be read from the request body, which ensures that read from |
| 99 | // connection will not time out accidentally due to possible blocking in underlying implementation. |
| 100 | limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte) |
| 101 | b, err := ioutil.ReadAll(limitedr) |
| 102 | if err != nil { |
| 103 | plog.Errorf("failed to read raft message (%v)", err) |
| 104 | http.Error(w, "error reading raft message", http.StatusBadRequest) |
| 105 | recvFailures.WithLabelValues(r.RemoteAddr).Inc() |
| 106 | return |
| 107 | } |
| 108 | |
| 109 | var m raftpb.Message |
| 110 | if err := m.Unmarshal(b); err != nil { |
| 111 | plog.Errorf("failed to unmarshal raft message (%v)", err) |
| 112 | http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) |
| 113 | recvFailures.WithLabelValues(r.RemoteAddr).Inc() |
| 114 | return |
| 115 | } |
| 116 | |
| 117 | receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b))) |
| 118 | |
| 119 | if err := h.r.Process(context.TODO(), m); err != nil { |
| 120 | switch v := err.(type) { |
| 121 | case writerToResponse: |
| 122 | v.WriteTo(w) |
| 123 | default: |
| 124 | plog.Warningf("failed to process raft message (%v)", err) |
| 125 | http.Error(w, "error processing raft message", http.StatusInternalServerError) |
| 126 | w.(http.Flusher).Flush() |
| 127 | // disconnect the http stream |
| 128 | panic(err) |
| 129 | } |
| 130 | return |
| 131 | } |
| 132 | |
| 133 | // Write StatusNoContent header after the message has been processed by |
| 134 | // raft, which facilitates the client to report MsgSnap status. |
| 135 | w.WriteHeader(http.StatusNoContent) |
| 136 | } |
| 137 | |
| 138 | type snapshotHandler struct { |
| 139 | tr Transporter |
| 140 | r Raft |
| 141 | snapshotter *snap.Snapshotter |
| 142 | cid types.ID |
| 143 | } |
| 144 | |
| 145 | func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler { |
| 146 | return &snapshotHandler{ |
| 147 | tr: tr, |
| 148 | r: r, |
| 149 | snapshotter: snapshotter, |
| 150 | cid: cid, |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER" |
| 155 | |
| 156 | // ServeHTTP serves HTTP request to receive and process snapshot message. |
| 157 | // |
| 158 | // If request sender dies without closing underlying TCP connection, |
| 159 | // the handler will keep waiting for the request body until TCP keepalive |
| 160 | // finds out that the connection is broken after several minutes. |
| 161 | // This is acceptable because |
| 162 | // 1. snapshot messages sent through other TCP connections could still be |
| 163 | // received and processed. |
| 164 | // 2. this case should happen rarely, so no further optimization is done. |
| 165 | func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 166 | start := time.Now() |
| 167 | |
| 168 | if r.Method != "POST" { |
| 169 | w.Header().Set("Allow", "POST") |
| 170 | http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) |
| 171 | snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc() |
| 172 | return |
| 173 | } |
| 174 | |
| 175 | w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) |
| 176 | |
| 177 | if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil { |
| 178 | http.Error(w, err.Error(), http.StatusPreconditionFailed) |
| 179 | snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc() |
| 180 | return |
| 181 | } |
| 182 | |
| 183 | addRemoteFromRequest(h.tr, r) |
| 184 | |
| 185 | dec := &messageDecoder{r: r.Body} |
| 186 | // let snapshots be very large since they can exceed 512MB for large installations |
| 187 | m, err := dec.decodeLimit(uint64(1 << 63)) |
| 188 | from := types.ID(m.From).String() |
| 189 | if err != nil { |
| 190 | msg := fmt.Sprintf("failed to decode raft message (%v)", err) |
| 191 | plog.Errorf(msg) |
| 192 | http.Error(w, msg, http.StatusBadRequest) |
| 193 | recvFailures.WithLabelValues(r.RemoteAddr).Inc() |
| 194 | snapshotReceiveFailures.WithLabelValues(from).Inc() |
| 195 | return |
| 196 | } |
| 197 | |
| 198 | msgSizeVal := m.Size() |
| 199 | msgSize := humanize.Bytes(uint64(msgSizeVal)) |
| 200 | receivedBytes.WithLabelValues(from).Add(float64(msgSizeVal)) |
| 201 | |
| 202 | if m.Type != raftpb.MsgSnap { |
| 203 | plog.Errorf("unexpected raft message type %s on snapshot path", m.Type) |
| 204 | http.Error(w, "wrong raft message type", http.StatusBadRequest) |
| 205 | snapshotReceiveFailures.WithLabelValues(from).Inc() |
| 206 | return |
| 207 | } |
| 208 | |
| 209 | snapshotReceiveInflights.WithLabelValues(from).Inc() |
| 210 | defer func() { |
| 211 | snapshotReceiveInflights.WithLabelValues(from).Dec() |
| 212 | }() |
| 213 | plog.Infof("receiving database snapshot [index: %d, from: %s, raft message size: %s]", m.Snapshot.Metadata.Index, types.ID(m.From), msgSize) |
| 214 | // save incoming database snapshot. |
| 215 | n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index) |
| 216 | if err != nil { |
| 217 | msg := fmt.Sprintf("failed to save KV snapshot (%v)", err) |
| 218 | plog.Error(msg) |
| 219 | http.Error(w, msg, http.StatusInternalServerError) |
| 220 | snapshotReceiveFailures.WithLabelValues(from).Inc() |
| 221 | return |
| 222 | } |
| 223 | |
| 224 | downloadTook := time.Since(start) |
| 225 | dbSize := humanize.Bytes(uint64(n)) |
| 226 | receivedBytes.WithLabelValues(from).Add(float64(n)) |
| 227 | plog.Infof("successfully received and saved database snapshot [index: %d, from: %s, raft message size: %s, db size: %s, took: %s]", m.Snapshot.Metadata.Index, types.ID(m.From), msgSize, dbSize, downloadTook) |
| 228 | |
| 229 | if err := h.r.Process(context.TODO(), m); err != nil { |
| 230 | switch v := err.(type) { |
| 231 | // Process may return writerToResponse error when doing some |
| 232 | // additional checks before calling raft.Node.Step. |
| 233 | case writerToResponse: |
| 234 | v.WriteTo(w) |
| 235 | default: |
| 236 | msg := fmt.Sprintf("failed to process raft message (%v)", err) |
| 237 | plog.Warningf(msg) |
| 238 | http.Error(w, msg, http.StatusInternalServerError) |
| 239 | snapshotReceiveFailures.WithLabelValues(from).Inc() |
| 240 | } |
| 241 | return |
| 242 | } |
| 243 | // Write StatusNoContent header after the message has been processed by |
| 244 | // raft, which facilitates the client to report MsgSnap status. |
| 245 | w.WriteHeader(http.StatusNoContent) |
| 246 | |
| 247 | snapshotReceive.WithLabelValues(from).Inc() |
| 248 | snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds()) |
| 249 | } |
| 250 | |
| 251 | type streamHandler struct { |
| 252 | tr *Transport |
| 253 | peerGetter peerGetter |
| 254 | r Raft |
| 255 | id types.ID |
| 256 | cid types.ID |
| 257 | } |
| 258 | |
| 259 | func newStreamHandler(tr *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler { |
| 260 | return &streamHandler{ |
| 261 | tr: tr, |
| 262 | peerGetter: pg, |
| 263 | r: r, |
| 264 | id: id, |
| 265 | cid: cid, |
| 266 | } |
| 267 | } |
| 268 | |
| 269 | func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 270 | if r.Method != "GET" { |
| 271 | w.Header().Set("Allow", "GET") |
| 272 | http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) |
| 273 | return |
| 274 | } |
| 275 | |
| 276 | w.Header().Set("X-Server-Version", version.Version) |
| 277 | w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) |
| 278 | |
| 279 | if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil { |
| 280 | http.Error(w, err.Error(), http.StatusPreconditionFailed) |
| 281 | return |
| 282 | } |
| 283 | |
| 284 | var t streamType |
| 285 | switch path.Dir(r.URL.Path) { |
| 286 | case streamTypeMsgAppV2.endpoint(): |
| 287 | t = streamTypeMsgAppV2 |
| 288 | case streamTypeMessage.endpoint(): |
| 289 | t = streamTypeMessage |
| 290 | default: |
| 291 | plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path) |
| 292 | http.Error(w, "invalid path", http.StatusNotFound) |
| 293 | return |
| 294 | } |
| 295 | |
| 296 | fromStr := path.Base(r.URL.Path) |
| 297 | from, err := types.IDFromString(fromStr) |
| 298 | if err != nil { |
| 299 | plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err) |
| 300 | http.Error(w, "invalid from", http.StatusNotFound) |
| 301 | return |
| 302 | } |
| 303 | if h.r.IsIDRemoved(uint64(from)) { |
| 304 | plog.Warningf("rejected the stream from peer %s since it was removed", from) |
| 305 | http.Error(w, "removed member", http.StatusGone) |
| 306 | return |
| 307 | } |
| 308 | p := h.peerGetter.Get(from) |
| 309 | if p == nil { |
| 310 | // This may happen in following cases: |
| 311 | // 1. user starts a remote peer that belongs to a different cluster |
| 312 | // with the same cluster ID. |
| 313 | // 2. local etcd falls behind of the cluster, and cannot recognize |
| 314 | // the members that joined after its current progress. |
| 315 | if urls := r.Header.Get("X-PeerURLs"); urls != "" { |
| 316 | h.tr.AddRemote(from, strings.Split(urls, ",")) |
| 317 | } |
| 318 | plog.Errorf("failed to find member %s in cluster %s", from, h.cid) |
| 319 | http.Error(w, "error sender not found", http.StatusNotFound) |
| 320 | return |
| 321 | } |
| 322 | |
| 323 | wto := h.id.String() |
| 324 | if gto := r.Header.Get("X-Raft-To"); gto != wto { |
| 325 | plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto) |
| 326 | http.Error(w, "to field mismatch", http.StatusPreconditionFailed) |
| 327 | return |
| 328 | } |
| 329 | |
| 330 | w.WriteHeader(http.StatusOK) |
| 331 | w.(http.Flusher).Flush() |
| 332 | |
| 333 | c := newCloseNotifier() |
| 334 | conn := &outgoingConn{ |
| 335 | t: t, |
| 336 | Writer: w, |
| 337 | Flusher: w.(http.Flusher), |
| 338 | Closer: c, |
| 339 | } |
| 340 | p.attachOutgoingConn(conn) |
| 341 | <-c.closeNotify() |
| 342 | } |
| 343 | |
| 344 | // checkClusterCompatibilityFromHeader checks the cluster compatibility of |
| 345 | // the local member from the given header. |
| 346 | // It checks whether the version of local member is compatible with |
| 347 | // the versions in the header, and whether the cluster ID of local member |
| 348 | // matches the one in the header. |
| 349 | func checkClusterCompatibilityFromHeader(header http.Header, cid types.ID) error { |
| 350 | if err := checkVersionCompability(header.Get("X-Server-From"), serverVersion(header), minClusterVersion(header)); err != nil { |
| 351 | plog.Errorf("request version incompatibility (%v)", err) |
| 352 | return errIncompatibleVersion |
| 353 | } |
| 354 | if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() { |
| 355 | plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid) |
| 356 | return errClusterIDMismatch |
| 357 | } |
| 358 | return nil |
| 359 | } |
| 360 | |
// closeNotifier turns a Close call into a channel event: the channel
// returned by closeNotify is closed when Close is called.
type closeNotifier struct {
	done chan struct{}
}

// newCloseNotifier returns a closeNotifier whose channel is still open.
func newCloseNotifier() *closeNotifier {
	n := new(closeNotifier)
	n.done = make(chan struct{})
	return n
}

// Close closes the notification channel and always returns nil.
// It closes the channel unconditionally, so it must be called at most once.
func (n *closeNotifier) Close() error {
	close(n.done)
	return nil
}

// closeNotify returns a receive-only view of the channel that Close closes.
func (n *closeNotifier) closeNotify() <-chan struct{} {
	return n.done
}