blob: 62efb0cdc3d8f37e276eeed79bff07991d2c341d [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18 "bytes"
19 "context"
20 "io"
21 "io/ioutil"
22 "net/http"
23 "time"
24
25 "go.etcd.io/etcd/etcdserver/api/snap"
26 "go.etcd.io/etcd/pkg/httputil"
27 pioutil "go.etcd.io/etcd/pkg/ioutil"
28 "go.etcd.io/etcd/pkg/types"
29 "go.etcd.io/etcd/raft"
30
31 "github.com/dustin/go-humanize"
32 "go.uber.org/zap"
33)
34
// snapResponseReadTimeout bounds how long post waits while reading the
// snapshot response body; once it elapses the response is closed.
var snapResponseReadTimeout = 5 * time.Second
39
40type snapshotSender struct {
41 from, to types.ID
42 cid types.ID
43
44 tr *Transport
45 picker *urlPicker
46 status *peerStatus
47 r Raft
48 errorc chan error
49
50 stopc chan struct{}
51}
52
53func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *peerStatus) *snapshotSender {
54 return &snapshotSender{
55 from: tr.ID,
56 to: to,
57 cid: tr.ClusterID,
58 tr: tr,
59 picker: picker,
60 status: status,
61 r: tr.Raft,
62 errorc: tr.ErrorC,
63 stopc: make(chan struct{}),
64 }
65}
66
67func (s *snapshotSender) stop() { close(s.stopc) }
68
69func (s *snapshotSender) send(merged snap.Message) {
70 start := time.Now()
71
72 m := merged.Message
73 to := types.ID(m.To).String()
74
75 body := createSnapBody(s.tr.Logger, merged)
76 defer body.Close()
77
78 u := s.picker.pick()
79 req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.tr.URLs, s.from, s.cid)
80
81 if s.tr.Logger != nil {
82 s.tr.Logger.Info(
83 "sending database snapshot",
84 zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
85 zap.String("remote-peer-id", to),
86 zap.Int64("bytes", merged.TotalSize),
87 zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
88 )
89 } else {
90 plog.Infof("start to send database snapshot [index: %d, to %s]...", m.Snapshot.Metadata.Index, types.ID(m.To))
91 }
92
93 snapshotSendInflights.WithLabelValues(to).Inc()
94 defer func() {
95 snapshotSendInflights.WithLabelValues(to).Dec()
96 }()
97
98 err := s.post(req)
99 defer merged.CloseWithError(err)
100 if err != nil {
101 if s.tr.Logger != nil {
102 s.tr.Logger.Warn(
103 "failed to send database snapshot",
104 zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
105 zap.String("remote-peer-id", to),
106 zap.Int64("bytes", merged.TotalSize),
107 zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
108 zap.Error(err),
109 )
110 } else {
111 plog.Warningf("database snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err)
112 }
113
114 // errMemberRemoved is a critical error since a removed member should
115 // always be stopped. So we use reportCriticalError to report it to errorc.
116 if err == errMemberRemoved {
117 reportCriticalError(err, s.errorc)
118 }
119
120 s.picker.unreachable(u)
121 s.status.deactivate(failureType{source: sendSnap, action: "post"}, err.Error())
122 s.r.ReportUnreachable(m.To)
123 // report SnapshotFailure to raft state machine. After raft state
124 // machine knows about it, it would pause a while and retry sending
125 // new snapshot message.
126 s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
127 sentFailures.WithLabelValues(to).Inc()
128 snapshotSendFailures.WithLabelValues(to).Inc()
129 return
130 }
131 s.status.activate()
132 s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
133
134 if s.tr.Logger != nil {
135 s.tr.Logger.Info(
136 "sent database snapshot",
137 zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
138 zap.String("remote-peer-id", to),
139 zap.Int64("bytes", merged.TotalSize),
140 zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
141 )
142 } else {
143 plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
144 }
145
146 sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
147 snapshotSend.WithLabelValues(to).Inc()
148 snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
149}
150
151// post posts the given request.
152// It returns nil when request is sent out and processed successfully.
153func (s *snapshotSender) post(req *http.Request) (err error) {
154 ctx, cancel := context.WithCancel(context.Background())
155 req = req.WithContext(ctx)
156 defer cancel()
157
158 type responseAndError struct {
159 resp *http.Response
160 body []byte
161 err error
162 }
163 result := make(chan responseAndError, 1)
164
165 go func() {
166 resp, err := s.tr.pipelineRt.RoundTrip(req)
167 if err != nil {
168 result <- responseAndError{resp, nil, err}
169 return
170 }
171
172 // close the response body when timeouts.
173 // prevents from reading the body forever when the other side dies right after
174 // successfully receives the request body.
175 time.AfterFunc(snapResponseReadTimeout, func() { httputil.GracefulClose(resp) })
176 body, err := ioutil.ReadAll(resp.Body)
177 result <- responseAndError{resp, body, err}
178 }()
179
180 select {
181 case <-s.stopc:
182 return errStopped
183 case r := <-result:
184 if r.err != nil {
185 return r.err
186 }
187 return checkPostResponse(r.resp, r.body, req, s.to)
188 }
189}
190
191func createSnapBody(lg *zap.Logger, merged snap.Message) io.ReadCloser {
192 buf := new(bytes.Buffer)
193 enc := &messageEncoder{w: buf}
194 // encode raft message
195 if err := enc.encode(&merged.Message); err != nil {
196 if lg != nil {
197 lg.Panic("failed to encode message", zap.Error(err))
198 } else {
199 plog.Panicf("encode message error (%v)", err)
200 }
201 }
202
203 return &pioutil.ReaderAndCloser{
204 Reader: io.MultiReader(buf, merged.ReadCloser),
205 Closer: merged.ReadCloser,
206 }
207}