khenaidoo | ffe076b | 2019-01-15 16:08:08 -0500 | [diff] [blame^] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package rafthttp |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "errors" |
| 20 | "fmt" |
| 21 | "io/ioutil" |
| 22 | "net/http" |
| 23 | "path" |
| 24 | "strings" |
| 25 | "time" |
| 26 | |
| 27 | pioutil "github.com/coreos/etcd/pkg/ioutil" |
| 28 | "github.com/coreos/etcd/pkg/types" |
| 29 | "github.com/coreos/etcd/raft/raftpb" |
| 30 | "github.com/coreos/etcd/snap" |
| 31 | "github.com/coreos/etcd/version" |
| 32 | ) |
| 33 | |
// connReadLimitByte caps how many bytes any single Read on a peer
// connection may return.
//
// 64KB is big enough that it does not become a throughput bottleneck, yet
// small enough that one read is not expected to run into a read timeout.
const connReadLimitByte = 64 << 10

var (
	// RaftPrefix is the root URL path for raft peer traffic; the other
	// prefixes below are joined onto it when the package is initialized.
	RaftPrefix         = "/raft"
	ProbingPrefix      = path.Join(RaftPrefix, "probing")
	RaftStreamPrefix   = path.Join(RaftPrefix, "stream")
	RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")

	// Sentinel errors returned by checkClusterCompatibilityFromHeader.
	errIncompatibleVersion = errors.New("incompatible version")
	errClusterIDMismatch   = errors.New("cluster ID mismatch")
)
| 53 | |
| 54 | type peerGetter interface { |
| 55 | Get(id types.ID) Peer |
| 56 | } |
| 57 | |
| 58 | type writerToResponse interface { |
| 59 | WriteTo(w http.ResponseWriter) |
| 60 | } |
| 61 | |
| 62 | type pipelineHandler struct { |
| 63 | tr Transporter |
| 64 | r Raft |
| 65 | cid types.ID |
| 66 | } |
| 67 | |
| 68 | // newPipelineHandler returns a handler for handling raft messages |
| 69 | // from pipeline for RaftPrefix. |
| 70 | // |
| 71 | // The handler reads out the raft message from request body, |
| 72 | // and forwards it to the given raft state machine for processing. |
| 73 | func newPipelineHandler(tr Transporter, r Raft, cid types.ID) http.Handler { |
| 74 | return &pipelineHandler{ |
| 75 | tr: tr, |
| 76 | r: r, |
| 77 | cid: cid, |
| 78 | } |
| 79 | } |
| 80 | |
| 81 | func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 82 | if r.Method != "POST" { |
| 83 | w.Header().Set("Allow", "POST") |
| 84 | http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) |
| 85 | return |
| 86 | } |
| 87 | |
| 88 | w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) |
| 89 | |
| 90 | if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil { |
| 91 | http.Error(w, err.Error(), http.StatusPreconditionFailed) |
| 92 | return |
| 93 | } |
| 94 | |
| 95 | addRemoteFromRequest(h.tr, r) |
| 96 | |
| 97 | // Limit the data size that could be read from the request body, which ensures that read from |
| 98 | // connection will not time out accidentally due to possible blocking in underlying implementation. |
| 99 | limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte) |
| 100 | b, err := ioutil.ReadAll(limitedr) |
| 101 | if err != nil { |
| 102 | plog.Errorf("failed to read raft message (%v)", err) |
| 103 | http.Error(w, "error reading raft message", http.StatusBadRequest) |
| 104 | recvFailures.WithLabelValues(r.RemoteAddr).Inc() |
| 105 | return |
| 106 | } |
| 107 | |
| 108 | var m raftpb.Message |
| 109 | if err := m.Unmarshal(b); err != nil { |
| 110 | plog.Errorf("failed to unmarshal raft message (%v)", err) |
| 111 | http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) |
| 112 | recvFailures.WithLabelValues(r.RemoteAddr).Inc() |
| 113 | return |
| 114 | } |
| 115 | |
| 116 | receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b))) |
| 117 | |
| 118 | if err := h.r.Process(context.TODO(), m); err != nil { |
| 119 | switch v := err.(type) { |
| 120 | case writerToResponse: |
| 121 | v.WriteTo(w) |
| 122 | default: |
| 123 | plog.Warningf("failed to process raft message (%v)", err) |
| 124 | http.Error(w, "error processing raft message", http.StatusInternalServerError) |
| 125 | w.(http.Flusher).Flush() |
| 126 | // disconnect the http stream |
| 127 | panic(err) |
| 128 | } |
| 129 | return |
| 130 | } |
| 131 | |
| 132 | // Write StatusNoContent header after the message has been processed by |
| 133 | // raft, which facilitates the client to report MsgSnap status. |
| 134 | w.WriteHeader(http.StatusNoContent) |
| 135 | } |
| 136 | |
| 137 | type snapshotHandler struct { |
| 138 | tr Transporter |
| 139 | r Raft |
| 140 | snapshotter *snap.Snapshotter |
| 141 | cid types.ID |
| 142 | } |
| 143 | |
| 144 | func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler { |
| 145 | return &snapshotHandler{ |
| 146 | tr: tr, |
| 147 | r: r, |
| 148 | snapshotter: snapshotter, |
| 149 | cid: cid, |
| 150 | } |
| 151 | } |
| 152 | |
| 153 | const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER" |
| 154 | |
| 155 | // ServeHTTP serves HTTP request to receive and process snapshot message. |
| 156 | // |
| 157 | // If request sender dies without closing underlying TCP connection, |
| 158 | // the handler will keep waiting for the request body until TCP keepalive |
| 159 | // finds out that the connection is broken after several minutes. |
| 160 | // This is acceptable because |
| 161 | // 1. snapshot messages sent through other TCP connections could still be |
| 162 | // received and processed. |
| 163 | // 2. this case should happen rarely, so no further optimization is done. |
| 164 | func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 165 | start := time.Now() |
| 166 | |
| 167 | if r.Method != "POST" { |
| 168 | w.Header().Set("Allow", "POST") |
| 169 | http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) |
| 170 | snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc() |
| 171 | return |
| 172 | } |
| 173 | |
| 174 | w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) |
| 175 | |
| 176 | if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil { |
| 177 | http.Error(w, err.Error(), http.StatusPreconditionFailed) |
| 178 | snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc() |
| 179 | return |
| 180 | } |
| 181 | |
| 182 | addRemoteFromRequest(h.tr, r) |
| 183 | |
| 184 | dec := &messageDecoder{r: r.Body} |
| 185 | // let snapshots be very large since they can exceed 512MB for large installations |
| 186 | m, err := dec.decodeLimit(uint64(1 << 63)) |
| 187 | from := types.ID(m.From).String() |
| 188 | if err != nil { |
| 189 | msg := fmt.Sprintf("failed to decode raft message (%v)", err) |
| 190 | plog.Errorf(msg) |
| 191 | http.Error(w, msg, http.StatusBadRequest) |
| 192 | recvFailures.WithLabelValues(r.RemoteAddr).Inc() |
| 193 | snapshotReceiveFailures.WithLabelValues(from).Inc() |
| 194 | return |
| 195 | } |
| 196 | |
| 197 | receivedBytes.WithLabelValues(from).Add(float64(m.Size())) |
| 198 | |
| 199 | if m.Type != raftpb.MsgSnap { |
| 200 | plog.Errorf("unexpected raft message type %s on snapshot path", m.Type) |
| 201 | http.Error(w, "wrong raft message type", http.StatusBadRequest) |
| 202 | snapshotReceiveFailures.WithLabelValues(from).Inc() |
| 203 | return |
| 204 | } |
| 205 | |
| 206 | plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From)) |
| 207 | // save incoming database snapshot. |
| 208 | n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index) |
| 209 | if err != nil { |
| 210 | msg := fmt.Sprintf("failed to save KV snapshot (%v)", err) |
| 211 | plog.Error(msg) |
| 212 | http.Error(w, msg, http.StatusInternalServerError) |
| 213 | snapshotReceiveFailures.WithLabelValues(from).Inc() |
| 214 | return |
| 215 | } |
| 216 | receivedBytes.WithLabelValues(from).Add(float64(n)) |
| 217 | plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From)) |
| 218 | |
| 219 | if err := h.r.Process(context.TODO(), m); err != nil { |
| 220 | switch v := err.(type) { |
| 221 | // Process may return writerToResponse error when doing some |
| 222 | // additional checks before calling raft.Node.Step. |
| 223 | case writerToResponse: |
| 224 | v.WriteTo(w) |
| 225 | default: |
| 226 | msg := fmt.Sprintf("failed to process raft message (%v)", err) |
| 227 | plog.Warningf(msg) |
| 228 | http.Error(w, msg, http.StatusInternalServerError) |
| 229 | snapshotReceiveFailures.WithLabelValues(from).Inc() |
| 230 | } |
| 231 | return |
| 232 | } |
| 233 | // Write StatusNoContent header after the message has been processed by |
| 234 | // raft, which facilitates the client to report MsgSnap status. |
| 235 | w.WriteHeader(http.StatusNoContent) |
| 236 | |
| 237 | snapshotReceive.WithLabelValues(from).Inc() |
| 238 | snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds()) |
| 239 | } |
| 240 | |
| 241 | type streamHandler struct { |
| 242 | tr *Transport |
| 243 | peerGetter peerGetter |
| 244 | r Raft |
| 245 | id types.ID |
| 246 | cid types.ID |
| 247 | } |
| 248 | |
| 249 | func newStreamHandler(tr *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler { |
| 250 | return &streamHandler{ |
| 251 | tr: tr, |
| 252 | peerGetter: pg, |
| 253 | r: r, |
| 254 | id: id, |
| 255 | cid: cid, |
| 256 | } |
| 257 | } |
| 258 | |
| 259 | func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 260 | if r.Method != "GET" { |
| 261 | w.Header().Set("Allow", "GET") |
| 262 | http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) |
| 263 | return |
| 264 | } |
| 265 | |
| 266 | w.Header().Set("X-Server-Version", version.Version) |
| 267 | w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) |
| 268 | |
| 269 | if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil { |
| 270 | http.Error(w, err.Error(), http.StatusPreconditionFailed) |
| 271 | return |
| 272 | } |
| 273 | |
| 274 | var t streamType |
| 275 | switch path.Dir(r.URL.Path) { |
| 276 | case streamTypeMsgAppV2.endpoint(): |
| 277 | t = streamTypeMsgAppV2 |
| 278 | case streamTypeMessage.endpoint(): |
| 279 | t = streamTypeMessage |
| 280 | default: |
| 281 | plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path) |
| 282 | http.Error(w, "invalid path", http.StatusNotFound) |
| 283 | return |
| 284 | } |
| 285 | |
| 286 | fromStr := path.Base(r.URL.Path) |
| 287 | from, err := types.IDFromString(fromStr) |
| 288 | if err != nil { |
| 289 | plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err) |
| 290 | http.Error(w, "invalid from", http.StatusNotFound) |
| 291 | return |
| 292 | } |
| 293 | if h.r.IsIDRemoved(uint64(from)) { |
| 294 | plog.Warningf("rejected the stream from peer %s since it was removed", from) |
| 295 | http.Error(w, "removed member", http.StatusGone) |
| 296 | return |
| 297 | } |
| 298 | p := h.peerGetter.Get(from) |
| 299 | if p == nil { |
| 300 | // This may happen in following cases: |
| 301 | // 1. user starts a remote peer that belongs to a different cluster |
| 302 | // with the same cluster ID. |
| 303 | // 2. local etcd falls behind of the cluster, and cannot recognize |
| 304 | // the members that joined after its current progress. |
| 305 | if urls := r.Header.Get("X-PeerURLs"); urls != "" { |
| 306 | h.tr.AddRemote(from, strings.Split(urls, ",")) |
| 307 | } |
| 308 | plog.Errorf("failed to find member %s in cluster %s", from, h.cid) |
| 309 | http.Error(w, "error sender not found", http.StatusNotFound) |
| 310 | return |
| 311 | } |
| 312 | |
| 313 | wto := h.id.String() |
| 314 | if gto := r.Header.Get("X-Raft-To"); gto != wto { |
| 315 | plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto) |
| 316 | http.Error(w, "to field mismatch", http.StatusPreconditionFailed) |
| 317 | return |
| 318 | } |
| 319 | |
| 320 | w.WriteHeader(http.StatusOK) |
| 321 | w.(http.Flusher).Flush() |
| 322 | |
| 323 | c := newCloseNotifier() |
| 324 | conn := &outgoingConn{ |
| 325 | t: t, |
| 326 | Writer: w, |
| 327 | Flusher: w.(http.Flusher), |
| 328 | Closer: c, |
| 329 | } |
| 330 | p.attachOutgoingConn(conn) |
| 331 | <-c.closeNotify() |
| 332 | } |
| 333 | |
| 334 | // checkClusterCompatibilityFromHeader checks the cluster compatibility of |
| 335 | // the local member from the given header. |
| 336 | // It checks whether the version of local member is compatible with |
| 337 | // the versions in the header, and whether the cluster ID of local member |
| 338 | // matches the one in the header. |
| 339 | func checkClusterCompatibilityFromHeader(header http.Header, cid types.ID) error { |
| 340 | if err := checkVersionCompability(header.Get("X-Server-From"), serverVersion(header), minClusterVersion(header)); err != nil { |
| 341 | plog.Errorf("request version incompatibility (%v)", err) |
| 342 | return errIncompatibleVersion |
| 343 | } |
| 344 | if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() { |
| 345 | plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid) |
| 346 | return errClusterIDMismatch |
| 347 | } |
| 348 | return nil |
| 349 | } |
| 350 | |
// closeNotifier implements Close by closing a channel, so anything waiting on
// closeNotify is released. Close must be called at most once: a second call
// panics because the channel is already closed.
type closeNotifier struct {
	done chan struct{}
}

// newCloseNotifier returns a closeNotifier that has not been closed yet.
func newCloseNotifier() *closeNotifier {
	n := new(closeNotifier)
	n.done = make(chan struct{})
	return n
}

// Close releases every receiver blocked on closeNotify. It always returns nil.
func (n *closeNotifier) Close() error {
	close(n.done)
	return nil
}

// closeNotify returns a channel that becomes closed once Close is called.
func (n *closeNotifier) closeNotify() <-chan struct{} {
	return n.done
}