// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
	"go.etcd.io/etcd/mvcc"
	"go.etcd.io/etcd/pkg/types"

	"go.uber.org/zap"
)

// CheckInitialHashKV compares the local KV hash with its peers' hashes
// before serving any peer/client traffic. A mismatch is counted only when
// the hashes differ at the requested revision while the compact revisions
// are the same.
func (s *EtcdServer) CheckInitialHashKV() error {
	if !s.Cfg.InitialCorruptCheck {
		return nil
	}

	lg := s.getLogger()

	if lg != nil {
		lg.Info(
			"starting initial corruption check",
			zap.String("local-member-id", s.ID().String()),
			zap.Duration("timeout", s.Cfg.ReqTimeout()),
		)
	} else {
		plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
	}

	h, rev, crev, err := s.kv.HashByRev(0)
	if err != nil {
		return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
	}
	peers := s.getPeerHashKVs(rev)
	mismatch := 0
	for _, p := range peers {
		if p.resp != nil {
			peerID := types.ID(p.resp.Header.MemberId)
			fields := []zap.Field{
				zap.String("local-member-id", s.ID().String()),
				zap.Int64("local-member-revision", rev),
				zap.Int64("local-member-compact-revision", crev),
				zap.Uint32("local-member-hash", h),
				zap.String("remote-peer-id", peerID.String()),
				zap.Strings("remote-peer-endpoints", p.eps),
				zap.Int64("remote-peer-revision", p.resp.Header.Revision),
				zap.Int64("remote-peer-compact-revision", p.resp.CompactRevision),
				zap.Uint32("remote-peer-hash", p.resp.Hash),
			}

			if h != p.resp.Hash {
				if crev == p.resp.CompactRevision {
					if lg != nil {
						lg.Warn("found different hash values from remote peer", fields...)
					} else {
						plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
					}
					mismatch++
				} else {
					if lg != nil {
						lg.Warn("found different compact revision values from remote peer", fields...)
					} else {
						plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
					}
				}
			}

			continue
		}

		if p.err != nil {
			switch p.err {
			case rpctypes.ErrFutureRev:
				if lg != nil {
					lg.Warn(
						"cannot fetch hash from slow remote peer",
						zap.String("local-member-id", s.ID().String()),
						zap.Int64("local-member-revision", rev),
						zap.Int64("local-member-compact-revision", crev),
						zap.Uint32("local-member-hash", h),
						zap.String("remote-peer-id", p.id.String()),
						zap.Strings("remote-peer-endpoints", p.eps),
						zap.Error(p.err),
					)
				} else {
					plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
				}
			case rpctypes.ErrCompacted:
				if lg != nil {
					lg.Warn(
						"cannot fetch hash from remote peer; local member is behind",
						zap.String("local-member-id", s.ID().String()),
						zap.Int64("local-member-revision", rev),
						zap.Int64("local-member-compact-revision", crev),
						zap.Uint32("local-member-hash", h),
						zap.String("remote-peer-id", p.id.String()),
						zap.Strings("remote-peer-endpoints", p.eps),
						zap.Error(p.err),
					)
				} else {
					plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
				}
			}
		}
	}
	if mismatch > 0 {
		return fmt.Errorf("%s found data inconsistency with peers", s.ID())
	}

	if lg != nil {
		lg.Info(
			"initial corruption checking passed; no corruption",
			zap.String("local-member-id", s.ID().String()),
		)
	} else {
		plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
	}
	return nil
}

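// monitorKVHash periodically runs checkHashKV on the leader at the interval
// configured by CorruptCheckTime. It returns immediately when the periodic
// check is disabled (interval of 0) and stops when the server is shutting down.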
func (s *EtcdServer) monitorKVHash() {
	t := s.Cfg.CorruptCheckTime
	if t == 0 {
		return
	}

	lg := s.getLogger()
	if lg != nil {
		lg.Info(
			"enabled corruption checking",
			zap.String("local-member-id", s.ID().String()),
			zap.Duration("interval", t),
		)
	} else {
		plog.Infof("enabled corruption checking with %s interval", t)
	}

	for {
		select {
		case <-s.stopping:
			return
		case <-time.After(t):
		}
		if !s.isLeader() {
			continue
		}
		if err := s.checkHashKV(); err != nil {
			if lg != nil {
				lg.Warn("failed to check hash KV", zap.Error(err))
			} else {
				plog.Debugf("check hash kv failed %v", err)
			}
		}
	}
}

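// checkHashKV hashes the local KV store at its latest revision, collects the
// corresponding hashes from all peers, and activates a CORRUPT alarm for the
// first member whose hash or revisions are inconsistent with the leader's view.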
func (s *EtcdServer) checkHashKV() error {
	lg := s.getLogger()

	h, rev, crev, err := s.kv.HashByRev(0)
	if err != nil {
		return err
	}
	peers := s.getPeerHashKVs(rev)

	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
	err = s.linearizableReadNotify(ctx)
	cancel()
	if err != nil {
		return err
	}

	h2, rev2, crev2, err := s.kv.HashByRev(0)
	if err != nil {
		return err
	}

	alarmed := false
	mismatch := func(id uint64) {
		if alarmed {
			return
		}
		alarmed = true
		a := &pb.AlarmRequest{
			MemberID: id,
			Action:   pb.AlarmRequest_ACTIVATE,
			Alarm:    pb.AlarmType_CORRUPT,
		}
		s.goAttach(func() {
			s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
		})
	}

	if h2 != h && rev2 == rev && crev == crev2 {
		if lg != nil {
			lg.Warn(
				"found hash mismatch",
				zap.Int64("revision-1", rev),
				zap.Int64("compact-revision-1", crev),
				zap.Uint32("hash-1", h),
				zap.Int64("revision-2", rev2),
				zap.Int64("compact-revision-2", crev2),
				zap.Uint32("hash-2", h2),
			)
		} else {
			plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev)
		}
		mismatch(uint64(s.ID()))
	}

	for _, p := range peers {
		if p.resp == nil {
			continue
		}
		id := p.resp.Header.MemberId

		// the leader expects the follower's latest revision to be less than or equal to the leader's
		if p.resp.Header.Revision > rev2 {
			if lg != nil {
				lg.Warn(
					"revision from follower must be less than or equal to leader's",
					zap.Int64("leader-revision", rev2),
					zap.Int64("follower-revision", p.resp.Header.Revision),
					zap.String("follower-peer-id", types.ID(id).String()),
				)
			} else {
				plog.Warningf(
					"revision %d from member %v, expected at most %d",
					p.resp.Header.Revision,
					types.ID(id),
					rev2)
			}
			mismatch(id)
		}

		// the leader expects the follower's latest compact revision to be less than or equal to the leader's
		if p.resp.CompactRevision > crev2 {
			if lg != nil {
				lg.Warn(
					"compact revision from follower must be less than or equal to leader's",
					zap.Int64("leader-compact-revision", crev2),
					zap.Int64("follower-compact-revision", p.resp.CompactRevision),
					zap.String("follower-peer-id", types.ID(id).String()),
				)
			} else {
				plog.Warningf(
					"compact revision %d from member %v, expected at most %d",
					p.resp.CompactRevision,
					types.ID(id),
					crev2,
				)
			}
			mismatch(id)
		}

		// if the follower's compact revision equals the leader's earlier compact revision, the hashes must match
		if p.resp.CompactRevision == crev && p.resp.Hash != h {
			if lg != nil {
				lg.Warn(
					"same compact revision then hashes must match",
					zap.Int64("leader-compact-revision", crev2),
					zap.Uint32("leader-hash", h),
					zap.Int64("follower-compact-revision", p.resp.CompactRevision),
					zap.Uint32("follower-hash", p.resp.Hash),
					zap.String("follower-peer-id", types.ID(id).String()),
				)
			} else {
				plog.Warningf(
					"hash %d at revision %d from member %v, expected hash %d",
					p.resp.Hash,
					rev,
					types.ID(id),
					h,
				)
			}
			mismatch(id)
		}
	}
	return nil
}

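// peerHashKVResp holds the outcome of a HashKV request to a single peer:
// either the peer's response or the error that prevented one.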
type peerHashKVResp struct {
	id  types.ID
	eps []string

	resp *clientv3.HashKVResponse
	err  error
}

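// getPeerHashKVs requests the KV hash at the given revision from every other
// cluster member over its peer URLs, recording for each reachable peer either
// the response or the last error returned by the HashKV request.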
func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
	// TODO: handle the case when "s.cluster.Members" has not
	// been populated (e.g. no snapshot to load from disk)
	mbs := s.cluster.Members()
	pss := make([]peerHashKVResp, 0, len(mbs))
	for _, m := range mbs {
		if m.ID == s.ID() {
			continue
		}
		pss = append(pss, peerHashKVResp{id: m.ID, eps: m.PeerURLs})
	}

	lg := s.getLogger()

	for _, p := range pss {
		if len(p.eps) == 0 {
			continue
		}
		cli, cerr := clientv3.New(clientv3.Config{
			DialTimeout: s.Cfg.ReqTimeout(),
			Endpoints:   p.eps,
		})
		if cerr != nil {
			if lg != nil {
				lg.Warn(
					"failed to create client to peer URL",
					zap.String("local-member-id", s.ID().String()),
					zap.String("remote-peer-id", p.id.String()),
					zap.Strings("remote-peer-endpoints", p.eps),
					zap.Error(cerr),
				)
			} else {
				plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), p.eps, cerr.Error())
			}
			continue
		}

		respsLen := len(resps)
		for _, c := range cli.Endpoints() {
			ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
			var resp *clientv3.HashKVResponse
			resp, cerr = cli.HashKV(ctx, c, rev)
			cancel()
			if cerr == nil {
				resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: resp, err: nil})
				break
			}
			if lg != nil {
				lg.Warn(
					"failed hash kv request",
					zap.String("local-member-id", s.ID().String()),
					zap.Int64("requested-revision", rev),
					zap.String("remote-peer-endpoint", c),
					zap.Error(cerr),
				)
			} else {
				plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
			}
		}
		cli.Close()

		if respsLen == len(resps) {
			resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: nil, err: cerr})
		}
	}
	return resps
}

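// applierV3Corrupt wraps an applierV3 and rejects every KV, compaction, and
// lease request with ErrCorrupt, so a member flagged as corrupted (e.g. after
// a CORRUPT alarm) stops applying client requests.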
type applierV3Corrupt struct {
	applierV3
}

func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }

func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
	return nil, ErrCorrupt
}

func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
	return nil, ErrCorrupt
}

func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
	return nil, ErrCorrupt
}

func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
	return nil, ErrCorrupt
}

func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
	return nil, nil, ErrCorrupt
}

func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
	return nil, ErrCorrupt
}

func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
	return nil, ErrCorrupt
}