blob: fbb96d48b8b5feb716d4b8bd467ba49897303c9b [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18 "context"
19 "errors"
20 "fmt"
21 "io/ioutil"
22 "net/http"
23 "path"
24 "strings"
25 "time"
26
27 pioutil "github.com/coreos/etcd/pkg/ioutil"
28 "github.com/coreos/etcd/pkg/types"
29 "github.com/coreos/etcd/raft/raftpb"
30 "github.com/coreos/etcd/snap"
31 "github.com/coreos/etcd/version"
32 "github.com/dustin/go-humanize"
33)
34
const (
	// connReadLimitByte caps how many bytes a single read from the
	// connection may return.
	//
	// 64KB is large enough not to become a throughput bottleneck and
	// small enough that one read does not run into a read timeout.
	connReadLimitByte = 64 << 10
)
44
// URL path prefixes used by the raft HTTP transport.
var (
	// RaftPrefix is the root path; the pipeline handler is documented as
	// serving raft messages for it.
	RaftPrefix = "/raft"
	// ProbingPrefix is the "probing" sub-path of RaftPrefix.
	ProbingPrefix = path.Join(RaftPrefix, "probing")
	// RaftStreamPrefix is the "stream" sub-path of RaftPrefix.
	RaftStreamPrefix = path.Join(RaftPrefix, "stream")
	// RaftSnapshotPrefix is the "snapshot" sub-path of RaftPrefix.
	RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")
)

// Sentinel errors returned by checkClusterCompatibilityFromHeader.
var (
	errIncompatibleVersion = errors.New("incompatible version")
	errClusterIDMismatch   = errors.New("cluster ID mismatch")
)
54
55type peerGetter interface {
56 Get(id types.ID) Peer
57}
58
// writerToResponse is implemented by errors that can render themselves
// onto an HTTP response. The handlers in this file test errors returned by
// Raft.Process for it and, when it is satisfied, let the error write the
// response itself.
type writerToResponse interface {
	// WriteTo writes the receiver's representation to rw.
	WriteTo(rw http.ResponseWriter)
}
62
63type pipelineHandler struct {
64 tr Transporter
65 r Raft
66 cid types.ID
67}
68
69// newPipelineHandler returns a handler for handling raft messages
70// from pipeline for RaftPrefix.
71//
72// The handler reads out the raft message from request body,
73// and forwards it to the given raft state machine for processing.
74func newPipelineHandler(tr Transporter, r Raft, cid types.ID) http.Handler {
75 return &pipelineHandler{
76 tr: tr,
77 r: r,
78 cid: cid,
79 }
80}
81
82func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
83 if r.Method != "POST" {
84 w.Header().Set("Allow", "POST")
85 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
86 return
87 }
88
89 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
90
91 if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
92 http.Error(w, err.Error(), http.StatusPreconditionFailed)
93 return
94 }
95
96 addRemoteFromRequest(h.tr, r)
97
98 // Limit the data size that could be read from the request body, which ensures that read from
99 // connection will not time out accidentally due to possible blocking in underlying implementation.
100 limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)
101 b, err := ioutil.ReadAll(limitedr)
102 if err != nil {
103 plog.Errorf("failed to read raft message (%v)", err)
104 http.Error(w, "error reading raft message", http.StatusBadRequest)
105 recvFailures.WithLabelValues(r.RemoteAddr).Inc()
106 return
107 }
108
109 var m raftpb.Message
110 if err := m.Unmarshal(b); err != nil {
111 plog.Errorf("failed to unmarshal raft message (%v)", err)
112 http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
113 recvFailures.WithLabelValues(r.RemoteAddr).Inc()
114 return
115 }
116
117 receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b)))
118
119 if err := h.r.Process(context.TODO(), m); err != nil {
120 switch v := err.(type) {
121 case writerToResponse:
122 v.WriteTo(w)
123 default:
124 plog.Warningf("failed to process raft message (%v)", err)
125 http.Error(w, "error processing raft message", http.StatusInternalServerError)
126 w.(http.Flusher).Flush()
127 // disconnect the http stream
128 panic(err)
129 }
130 return
131 }
132
133 // Write StatusNoContent header after the message has been processed by
134 // raft, which facilitates the client to report MsgSnap status.
135 w.WriteHeader(http.StatusNoContent)
136}
137
138type snapshotHandler struct {
139 tr Transporter
140 r Raft
141 snapshotter *snap.Snapshotter
142 cid types.ID
143}
144
145func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler {
146 return &snapshotHandler{
147 tr: tr,
148 r: r,
149 snapshotter: snapshotter,
150 cid: cid,
151 }
152}
153
// unknownSnapshotSender is the metrics label used for snapshot receive
// failures that occur before the sender could be read from the message.
const (
	unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
)
155
156// ServeHTTP serves HTTP request to receive and process snapshot message.
157//
158// If request sender dies without closing underlying TCP connection,
159// the handler will keep waiting for the request body until TCP keepalive
160// finds out that the connection is broken after several minutes.
161// This is acceptable because
162// 1. snapshot messages sent through other TCP connections could still be
163// received and processed.
164// 2. this case should happen rarely, so no further optimization is done.
165func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
166 start := time.Now()
167
168 if r.Method != "POST" {
169 w.Header().Set("Allow", "POST")
170 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
171 snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
172 return
173 }
174
175 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
176
177 if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
178 http.Error(w, err.Error(), http.StatusPreconditionFailed)
179 snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
180 return
181 }
182
183 addRemoteFromRequest(h.tr, r)
184
185 dec := &messageDecoder{r: r.Body}
186 // let snapshots be very large since they can exceed 512MB for large installations
187 m, err := dec.decodeLimit(uint64(1 << 63))
188 from := types.ID(m.From).String()
189 if err != nil {
190 msg := fmt.Sprintf("failed to decode raft message (%v)", err)
191 plog.Errorf(msg)
192 http.Error(w, msg, http.StatusBadRequest)
193 recvFailures.WithLabelValues(r.RemoteAddr).Inc()
194 snapshotReceiveFailures.WithLabelValues(from).Inc()
195 return
196 }
197
198 msgSizeVal := m.Size()
199 msgSize := humanize.Bytes(uint64(msgSizeVal))
200 receivedBytes.WithLabelValues(from).Add(float64(msgSizeVal))
201
202 if m.Type != raftpb.MsgSnap {
203 plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
204 http.Error(w, "wrong raft message type", http.StatusBadRequest)
205 snapshotReceiveFailures.WithLabelValues(from).Inc()
206 return
207 }
208
209 snapshotReceiveInflights.WithLabelValues(from).Inc()
210 defer func() {
211 snapshotReceiveInflights.WithLabelValues(from).Dec()
212 }()
213 plog.Infof("receiving database snapshot [index: %d, from: %s, raft message size: %s]", m.Snapshot.Metadata.Index, types.ID(m.From), msgSize)
214 // save incoming database snapshot.
215 n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index)
216 if err != nil {
217 msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
218 plog.Error(msg)
219 http.Error(w, msg, http.StatusInternalServerError)
220 snapshotReceiveFailures.WithLabelValues(from).Inc()
221 return
222 }
223
224 downloadTook := time.Since(start)
225 dbSize := humanize.Bytes(uint64(n))
226 receivedBytes.WithLabelValues(from).Add(float64(n))
227 plog.Infof("successfully received and saved database snapshot [index: %d, from: %s, raft message size: %s, db size: %s, took: %s]", m.Snapshot.Metadata.Index, types.ID(m.From), msgSize, dbSize, downloadTook)
228
229 if err := h.r.Process(context.TODO(), m); err != nil {
230 switch v := err.(type) {
231 // Process may return writerToResponse error when doing some
232 // additional checks before calling raft.Node.Step.
233 case writerToResponse:
234 v.WriteTo(w)
235 default:
236 msg := fmt.Sprintf("failed to process raft message (%v)", err)
237 plog.Warningf(msg)
238 http.Error(w, msg, http.StatusInternalServerError)
239 snapshotReceiveFailures.WithLabelValues(from).Inc()
240 }
241 return
242 }
243 // Write StatusNoContent header after the message has been processed by
244 // raft, which facilitates the client to report MsgSnap status.
245 w.WriteHeader(http.StatusNoContent)
246
247 snapshotReceive.WithLabelValues(from).Inc()
248 snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
249}
250
251type streamHandler struct {
252 tr *Transport
253 peerGetter peerGetter
254 r Raft
255 id types.ID
256 cid types.ID
257}
258
259func newStreamHandler(tr *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler {
260 return &streamHandler{
261 tr: tr,
262 peerGetter: pg,
263 r: r,
264 id: id,
265 cid: cid,
266 }
267}
268
269func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
270 if r.Method != "GET" {
271 w.Header().Set("Allow", "GET")
272 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
273 return
274 }
275
276 w.Header().Set("X-Server-Version", version.Version)
277 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
278
279 if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
280 http.Error(w, err.Error(), http.StatusPreconditionFailed)
281 return
282 }
283
284 var t streamType
285 switch path.Dir(r.URL.Path) {
286 case streamTypeMsgAppV2.endpoint():
287 t = streamTypeMsgAppV2
288 case streamTypeMessage.endpoint():
289 t = streamTypeMessage
290 default:
291 plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
292 http.Error(w, "invalid path", http.StatusNotFound)
293 return
294 }
295
296 fromStr := path.Base(r.URL.Path)
297 from, err := types.IDFromString(fromStr)
298 if err != nil {
299 plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
300 http.Error(w, "invalid from", http.StatusNotFound)
301 return
302 }
303 if h.r.IsIDRemoved(uint64(from)) {
304 plog.Warningf("rejected the stream from peer %s since it was removed", from)
305 http.Error(w, "removed member", http.StatusGone)
306 return
307 }
308 p := h.peerGetter.Get(from)
309 if p == nil {
310 // This may happen in following cases:
311 // 1. user starts a remote peer that belongs to a different cluster
312 // with the same cluster ID.
313 // 2. local etcd falls behind of the cluster, and cannot recognize
314 // the members that joined after its current progress.
315 if urls := r.Header.Get("X-PeerURLs"); urls != "" {
316 h.tr.AddRemote(from, strings.Split(urls, ","))
317 }
318 plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
319 http.Error(w, "error sender not found", http.StatusNotFound)
320 return
321 }
322
323 wto := h.id.String()
324 if gto := r.Header.Get("X-Raft-To"); gto != wto {
325 plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
326 http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
327 return
328 }
329
330 w.WriteHeader(http.StatusOK)
331 w.(http.Flusher).Flush()
332
333 c := newCloseNotifier()
334 conn := &outgoingConn{
335 t: t,
336 Writer: w,
337 Flusher: w.(http.Flusher),
338 Closer: c,
339 }
340 p.attachOutgoingConn(conn)
341 <-c.closeNotify()
342}
343
344// checkClusterCompatibilityFromHeader checks the cluster compatibility of
345// the local member from the given header.
346// It checks whether the version of local member is compatible with
347// the versions in the header, and whether the cluster ID of local member
348// matches the one in the header.
349func checkClusterCompatibilityFromHeader(header http.Header, cid types.ID) error {
350 if err := checkVersionCompability(header.Get("X-Server-From"), serverVersion(header), minClusterVersion(header)); err != nil {
351 plog.Errorf("request version incompatibility (%v)", err)
352 return errIncompatibleVersion
353 }
354 if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() {
355 plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid)
356 return errClusterIDMismatch
357 }
358 return nil
359}
360
// closeNotifier provides a Close method whose only effect is to signal,
// through a channel, that it has been called.
type closeNotifier struct {
	// done is closed by Close.
	done chan struct{}
}

// newCloseNotifier returns a closeNotifier whose channel is still open.
func newCloseNotifier() *closeNotifier {
	n := new(closeNotifier)
	n.done = make(chan struct{})
	return n
}

// Close closes the notification channel and always returns nil. Because it
// closes the channel unconditionally, calling it a second time panics.
func (n *closeNotifier) Close() error {
	close(n.done)
	return nil
}

// closeNotify returns a receive-only view of the channel that Close closes.
func (n *closeNotifier) closeNotify() <-chan struct{} {
	return n.done
}