blob: d998ec5902078f1a25ed61ab5f403c8fc3353aac [file] [log] [blame]
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package etcdserver
16
17import (
18 "context"
19 "fmt"
20 "time"
21
22 "github.com/coreos/etcd/clientv3"
23 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
24 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
25 "github.com/coreos/etcd/mvcc"
26 "github.com/coreos/etcd/pkg/types"
27)
28
29// CheckInitialHashKV compares initial hash values with its peers
30// before serving any peer/client traffic. Only mismatch when hashes
31// are different at requested revision, with same compact revision.
32func (s *EtcdServer) CheckInitialHashKV() error {
33 if !s.Cfg.InitialCorruptCheck {
34 return nil
35 }
36
37 plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
38 h, rev, crev, err := s.kv.HashByRev(0)
39 if err != nil {
40 return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
41 }
42 peers := s.getPeerHashKVs(rev)
43 mismatch := 0
44 for _, p := range peers {
45 if p.resp != nil {
46 peerID := types.ID(p.resp.Header.MemberId)
47 if h != p.resp.Hash {
48 if crev == p.resp.CompactRevision {
49 plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
50 mismatch++
51 } else {
52 plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
53 }
54 }
55 continue
56 }
57 if p.err != nil {
58 switch p.err {
59 case rpctypes.ErrFutureRev:
60 plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
61 case rpctypes.ErrCompacted:
62 plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
63 }
64 }
65 }
66 if mismatch > 0 {
67 return fmt.Errorf("%s found data inconsistency with peers", s.ID())
68 }
69
70 plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
71 return nil
72}
73
74func (s *EtcdServer) monitorKVHash() {
75 t := s.Cfg.CorruptCheckTime
76 if t == 0 {
77 return
78 }
79 plog.Infof("enabled corruption checking with %s interval", t)
80 for {
81 select {
82 case <-s.stopping:
83 return
84 case <-time.After(t):
85 }
86 if !s.isLeader() {
87 continue
88 }
89 if err := s.checkHashKV(); err != nil {
90 plog.Debugf("check hash kv failed %v", err)
91 }
92 }
93}
94
95func (s *EtcdServer) checkHashKV() error {
96 h, rev, crev, err := s.kv.HashByRev(0)
97 if err != nil {
98 plog.Fatalf("failed to hash kv store (%v)", err)
99 }
100 peers := s.getPeerHashKVs(rev)
101
102 ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
103 err = s.linearizableReadNotify(ctx)
104 cancel()
105 if err != nil {
106 return err
107 }
108
109 h2, rev2, crev2, err := s.kv.HashByRev(0)
110 if err != nil {
111 plog.Warningf("failed to hash kv store (%v)", err)
112 return err
113 }
114
115 alarmed := false
116 mismatch := func(id uint64) {
117 if alarmed {
118 return
119 }
120 alarmed = true
121 a := &pb.AlarmRequest{
122 MemberID: uint64(id),
123 Action: pb.AlarmRequest_ACTIVATE,
124 Alarm: pb.AlarmType_CORRUPT,
125 }
126 s.goAttach(func() {
127 s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
128 })
129 }
130
131 if h2 != h && rev2 == rev && crev == crev2 {
132 plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev)
133 mismatch(uint64(s.ID()))
134 }
135
136 for _, p := range peers {
137 if p.resp == nil {
138 continue
139 }
140 id := p.resp.Header.MemberId
141
142 // leader expects follower's latest revision less than or equal to leader's
143 if p.resp.Header.Revision > rev2 {
144 plog.Warningf(
145 "revision %d from member %v, expected at most %d",
146 p.resp.Header.Revision,
147 types.ID(id),
148 rev2)
149 mismatch(id)
150 }
151
152 // leader expects follower's latest compact revision less than or equal to leader's
153 if p.resp.CompactRevision > crev2 {
154 plog.Warningf(
155 "compact revision %d from member %v, expected at most %d",
156 p.resp.CompactRevision,
157 types.ID(id),
158 crev2,
159 )
160 mismatch(id)
161 }
162
163 // follower's compact revision is leader's old one, then hashes must match
164 if p.resp.CompactRevision == crev && p.resp.Hash != h {
165 plog.Warningf(
166 "hash %d at revision %d from member %v, expected hash %d",
167 p.resp.Hash,
168 rev,
169 types.ID(id),
170 h,
171 )
172 mismatch(id)
173 }
174 }
175 return nil
176}
177
178type peerHashKVResp struct {
179 resp *clientv3.HashKVResponse
180 err error
181 eps []string
182}
183
184func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
185 // TODO: handle the case when "s.cluster.Members" have not
186 // been populated (e.g. no snapshot to load from disk)
187 mbs := s.cluster.Members()
188 pURLs := make([][]string, len(mbs))
189 for _, m := range mbs {
190 if m.ID == s.ID() {
191 continue
192 }
193 pURLs = append(pURLs, m.PeerURLs)
194 }
195
196 for _, purls := range pURLs {
197 if len(purls) == 0 {
198 continue
199 }
200 cli, cerr := clientv3.New(clientv3.Config{
201 DialTimeout: s.Cfg.ReqTimeout(),
202 Endpoints: purls,
203 })
204 if cerr != nil {
205 plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), purls, cerr.Error())
206 continue
207 }
208
209 respsLen := len(resps)
210 for _, c := range cli.Endpoints() {
211 ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
212 var resp *clientv3.HashKVResponse
213 resp, cerr = cli.HashKV(ctx, c, rev)
214 cancel()
215 if cerr == nil {
216 resps = append(resps, &peerHashKVResp{resp: resp})
217 break
218 }
219 plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
220 }
221 cli.Close()
222
223 if respsLen == len(resps) {
224 resps = append(resps, &peerHashKVResp{err: cerr, eps: purls})
225 }
226 }
227 return resps
228}
229
230type applierV3Corrupt struct {
231 applierV3
232}
233
234func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }
235
236func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
237 return nil, ErrCorrupt
238}
239
240func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
241 return nil, ErrCorrupt
242}
243
244func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
245 return nil, ErrCorrupt
246}
247
248func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
249 return nil, ErrCorrupt
250}
251
252func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
253 return nil, nil, ErrCorrupt
254}
255
256func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
257 return nil, ErrCorrupt
258}
259
260func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
261 return nil, ErrCorrupt
262}