// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package rafthttp
16
17import (
18 "context"
19 "errors"
20 "fmt"
21 "io/ioutil"
22 "net/http"
23 "path"
24 "strings"
25 "time"
26
27 pioutil "github.com/coreos/etcd/pkg/ioutil"
28 "github.com/coreos/etcd/pkg/types"
29 "github.com/coreos/etcd/raft/raftpb"
30 "github.com/coreos/etcd/snap"
31 "github.com/coreos/etcd/version"
32)
33
// connReadLimitByte caps how many bytes a single read may pull from a
// connection.
//
// 64KB is meant to be large enough that it does not become a throughput
// bottleneck, yet small enough that one read does not cause a read timeout.
const connReadLimitByte = 64 * 1024
43
// URL path prefixes used by the raft HTTP handlers.
var (
	// RaftPrefix is the root path; the other prefixes are built on top of it.
	RaftPrefix = "/raft"

	// ProbingPrefix is RaftPrefix joined with "probing".
	ProbingPrefix = path.Join(RaftPrefix, "probing")

	// RaftStreamPrefix is RaftPrefix joined with "stream".
	RaftStreamPrefix = path.Join(RaftPrefix, "stream")

	// RaftSnapshotPrefix is RaftPrefix joined with "snapshot".
	RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")
)

// Sentinel errors returned by checkClusterCompatibilityFromHeader.
var (
	errIncompatibleVersion = errors.New("incompatible version")
	errClusterIDMismatch   = errors.New("cluster ID mismatch")
)
53
54type peerGetter interface {
55 Get(id types.ID) Peer
56}
57
58type writerToResponse interface {
59 WriteTo(w http.ResponseWriter)
60}
61
62type pipelineHandler struct {
63 tr Transporter
64 r Raft
65 cid types.ID
66}
67
68// newPipelineHandler returns a handler for handling raft messages
69// from pipeline for RaftPrefix.
70//
71// The handler reads out the raft message from request body,
72// and forwards it to the given raft state machine for processing.
73func newPipelineHandler(tr Transporter, r Raft, cid types.ID) http.Handler {
74 return &pipelineHandler{
75 tr: tr,
76 r: r,
77 cid: cid,
78 }
79}
80
81func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
82 if r.Method != "POST" {
83 w.Header().Set("Allow", "POST")
84 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
85 return
86 }
87
88 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
89
90 if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
91 http.Error(w, err.Error(), http.StatusPreconditionFailed)
92 return
93 }
94
95 addRemoteFromRequest(h.tr, r)
96
97 // Limit the data size that could be read from the request body, which ensures that read from
98 // connection will not time out accidentally due to possible blocking in underlying implementation.
99 limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)
100 b, err := ioutil.ReadAll(limitedr)
101 if err != nil {
102 plog.Errorf("failed to read raft message (%v)", err)
103 http.Error(w, "error reading raft message", http.StatusBadRequest)
104 recvFailures.WithLabelValues(r.RemoteAddr).Inc()
105 return
106 }
107
108 var m raftpb.Message
109 if err := m.Unmarshal(b); err != nil {
110 plog.Errorf("failed to unmarshal raft message (%v)", err)
111 http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
112 recvFailures.WithLabelValues(r.RemoteAddr).Inc()
113 return
114 }
115
116 receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b)))
117
118 if err := h.r.Process(context.TODO(), m); err != nil {
119 switch v := err.(type) {
120 case writerToResponse:
121 v.WriteTo(w)
122 default:
123 plog.Warningf("failed to process raft message (%v)", err)
124 http.Error(w, "error processing raft message", http.StatusInternalServerError)
125 w.(http.Flusher).Flush()
126 // disconnect the http stream
127 panic(err)
128 }
129 return
130 }
131
132 // Write StatusNoContent header after the message has been processed by
133 // raft, which facilitates the client to report MsgSnap status.
134 w.WriteHeader(http.StatusNoContent)
135}
136
137type snapshotHandler struct {
138 tr Transporter
139 r Raft
140 snapshotter *snap.Snapshotter
141 cid types.ID
142}
143
144func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler {
145 return &snapshotHandler{
146 tr: tr,
147 r: r,
148 snapshotter: snapshotter,
149 cid: cid,
150 }
151}
152
153const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
154
155// ServeHTTP serves HTTP request to receive and process snapshot message.
156//
157// If request sender dies without closing underlying TCP connection,
158// the handler will keep waiting for the request body until TCP keepalive
159// finds out that the connection is broken after several minutes.
160// This is acceptable because
161// 1. snapshot messages sent through other TCP connections could still be
162// received and processed.
163// 2. this case should happen rarely, so no further optimization is done.
164func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
165 start := time.Now()
166
167 if r.Method != "POST" {
168 w.Header().Set("Allow", "POST")
169 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
170 snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
171 return
172 }
173
174 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
175
176 if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
177 http.Error(w, err.Error(), http.StatusPreconditionFailed)
178 snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
179 return
180 }
181
182 addRemoteFromRequest(h.tr, r)
183
184 dec := &messageDecoder{r: r.Body}
185 // let snapshots be very large since they can exceed 512MB for large installations
186 m, err := dec.decodeLimit(uint64(1 << 63))
187 from := types.ID(m.From).String()
188 if err != nil {
189 msg := fmt.Sprintf("failed to decode raft message (%v)", err)
190 plog.Errorf(msg)
191 http.Error(w, msg, http.StatusBadRequest)
192 recvFailures.WithLabelValues(r.RemoteAddr).Inc()
193 snapshotReceiveFailures.WithLabelValues(from).Inc()
194 return
195 }
196
197 receivedBytes.WithLabelValues(from).Add(float64(m.Size()))
198
199 if m.Type != raftpb.MsgSnap {
200 plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
201 http.Error(w, "wrong raft message type", http.StatusBadRequest)
202 snapshotReceiveFailures.WithLabelValues(from).Inc()
203 return
204 }
205
206 plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From))
207 // save incoming database snapshot.
208 n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index)
209 if err != nil {
210 msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
211 plog.Error(msg)
212 http.Error(w, msg, http.StatusInternalServerError)
213 snapshotReceiveFailures.WithLabelValues(from).Inc()
214 return
215 }
216 receivedBytes.WithLabelValues(from).Add(float64(n))
217 plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
218
219 if err := h.r.Process(context.TODO(), m); err != nil {
220 switch v := err.(type) {
221 // Process may return writerToResponse error when doing some
222 // additional checks before calling raft.Node.Step.
223 case writerToResponse:
224 v.WriteTo(w)
225 default:
226 msg := fmt.Sprintf("failed to process raft message (%v)", err)
227 plog.Warningf(msg)
228 http.Error(w, msg, http.StatusInternalServerError)
229 snapshotReceiveFailures.WithLabelValues(from).Inc()
230 }
231 return
232 }
233 // Write StatusNoContent header after the message has been processed by
234 // raft, which facilitates the client to report MsgSnap status.
235 w.WriteHeader(http.StatusNoContent)
236
237 snapshotReceive.WithLabelValues(from).Inc()
238 snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
239}
240
241type streamHandler struct {
242 tr *Transport
243 peerGetter peerGetter
244 r Raft
245 id types.ID
246 cid types.ID
247}
248
249func newStreamHandler(tr *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler {
250 return &streamHandler{
251 tr: tr,
252 peerGetter: pg,
253 r: r,
254 id: id,
255 cid: cid,
256 }
257}
258
259func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
260 if r.Method != "GET" {
261 w.Header().Set("Allow", "GET")
262 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
263 return
264 }
265
266 w.Header().Set("X-Server-Version", version.Version)
267 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
268
269 if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
270 http.Error(w, err.Error(), http.StatusPreconditionFailed)
271 return
272 }
273
274 var t streamType
275 switch path.Dir(r.URL.Path) {
276 case streamTypeMsgAppV2.endpoint():
277 t = streamTypeMsgAppV2
278 case streamTypeMessage.endpoint():
279 t = streamTypeMessage
280 default:
281 plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
282 http.Error(w, "invalid path", http.StatusNotFound)
283 return
284 }
285
286 fromStr := path.Base(r.URL.Path)
287 from, err := types.IDFromString(fromStr)
288 if err != nil {
289 plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
290 http.Error(w, "invalid from", http.StatusNotFound)
291 return
292 }
293 if h.r.IsIDRemoved(uint64(from)) {
294 plog.Warningf("rejected the stream from peer %s since it was removed", from)
295 http.Error(w, "removed member", http.StatusGone)
296 return
297 }
298 p := h.peerGetter.Get(from)
299 if p == nil {
300 // This may happen in following cases:
301 // 1. user starts a remote peer that belongs to a different cluster
302 // with the same cluster ID.
303 // 2. local etcd falls behind of the cluster, and cannot recognize
304 // the members that joined after its current progress.
305 if urls := r.Header.Get("X-PeerURLs"); urls != "" {
306 h.tr.AddRemote(from, strings.Split(urls, ","))
307 }
308 plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
309 http.Error(w, "error sender not found", http.StatusNotFound)
310 return
311 }
312
313 wto := h.id.String()
314 if gto := r.Header.Get("X-Raft-To"); gto != wto {
315 plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
316 http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
317 return
318 }
319
320 w.WriteHeader(http.StatusOK)
321 w.(http.Flusher).Flush()
322
323 c := newCloseNotifier()
324 conn := &outgoingConn{
325 t: t,
326 Writer: w,
327 Flusher: w.(http.Flusher),
328 Closer: c,
329 }
330 p.attachOutgoingConn(conn)
331 <-c.closeNotify()
332}
333
334// checkClusterCompatibilityFromHeader checks the cluster compatibility of
335// the local member from the given header.
336// It checks whether the version of local member is compatible with
337// the versions in the header, and whether the cluster ID of local member
338// matches the one in the header.
339func checkClusterCompatibilityFromHeader(header http.Header, cid types.ID) error {
340 if err := checkVersionCompability(header.Get("X-Server-From"), serverVersion(header), minClusterVersion(header)); err != nil {
341 plog.Errorf("request version incompatibility (%v)", err)
342 return errIncompatibleVersion
343 }
344 if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() {
345 plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid)
346 return errClusterIDMismatch
347 }
348 return nil
349}
350
// closeNotifier turns a Close call into a channel signal: the channel
// returned by closeNotify is closed when Close is called.
type closeNotifier struct {
	done chan struct{}
}

// newCloseNotifier returns a closeNotifier whose channel is still open.
func newCloseNotifier() *closeNotifier {
	n := new(closeNotifier)
	n.done = make(chan struct{})
	return n
}

// Close closes the notification channel and always returns nil. It must be
// called at most once; closing an already closed channel panics.
func (n *closeNotifier) Close() error {
	close(n.done)
	return nil
}

// closeNotify returns a receive-only view of the channel that Close closes.
func (n *closeNotifier) closeNotify() <-chan struct{} {
	return n.done
}