khenaidoo | ab1f7bd | 2019-11-14 14:00:27 -0500 | [diff] [blame] | 1 | // Copyright 2017 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package etcdserver |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "fmt" |
| 20 | "time" |
| 21 | |
| 22 | "go.etcd.io/etcd/clientv3" |
| 23 | "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" |
| 24 | pb "go.etcd.io/etcd/etcdserver/etcdserverpb" |
| 25 | "go.etcd.io/etcd/mvcc" |
| 26 | "go.etcd.io/etcd/pkg/types" |
| 27 | |
| 28 | "go.uber.org/zap" |
| 29 | ) |
| 30 | |
| 31 | // CheckInitialHashKV compares initial hash values with its peers |
| 32 | // before serving any peer/client traffic. Only mismatch when hashes |
| 33 | // are different at requested revision, with same compact revision. |
| 34 | func (s *EtcdServer) CheckInitialHashKV() error { |
| 35 | if !s.Cfg.InitialCorruptCheck { |
| 36 | return nil |
| 37 | } |
| 38 | |
| 39 | lg := s.getLogger() |
| 40 | |
| 41 | if lg != nil { |
| 42 | lg.Info( |
| 43 | "starting initial corruption check", |
| 44 | zap.String("local-member-id", s.ID().String()), |
| 45 | zap.Duration("timeout", s.Cfg.ReqTimeout()), |
| 46 | ) |
| 47 | } else { |
| 48 | plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout()) |
| 49 | } |
| 50 | |
| 51 | h, rev, crev, err := s.kv.HashByRev(0) |
| 52 | if err != nil { |
| 53 | return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err) |
| 54 | } |
| 55 | peers := s.getPeerHashKVs(rev) |
| 56 | mismatch := 0 |
| 57 | for _, p := range peers { |
| 58 | if p.resp != nil { |
| 59 | peerID := types.ID(p.resp.Header.MemberId) |
| 60 | fields := []zap.Field{ |
| 61 | zap.String("local-member-id", s.ID().String()), |
| 62 | zap.Int64("local-member-revision", rev), |
| 63 | zap.Int64("local-member-compact-revision", crev), |
| 64 | zap.Uint32("local-member-hash", h), |
| 65 | zap.String("remote-peer-id", peerID.String()), |
| 66 | zap.Strings("remote-peer-endpoints", p.eps), |
| 67 | zap.Int64("remote-peer-revision", p.resp.Header.Revision), |
| 68 | zap.Int64("remote-peer-compact-revision", p.resp.CompactRevision), |
| 69 | zap.Uint32("remote-peer-hash", p.resp.Hash), |
| 70 | } |
| 71 | |
| 72 | if h != p.resp.Hash { |
| 73 | if crev == p.resp.CompactRevision { |
| 74 | if lg != nil { |
| 75 | lg.Warn("found different hash values from remote peer", fields...) |
| 76 | } else { |
| 77 | plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev) |
| 78 | } |
| 79 | mismatch++ |
| 80 | } else { |
| 81 | if lg != nil { |
| 82 | lg.Warn("found different compact revision values from remote peer", fields...) |
| 83 | } else { |
| 84 | plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev) |
| 85 | } |
| 86 | } |
| 87 | } |
| 88 | |
| 89 | continue |
| 90 | } |
| 91 | |
| 92 | if p.err != nil { |
| 93 | switch p.err { |
| 94 | case rpctypes.ErrFutureRev: |
| 95 | if lg != nil { |
| 96 | lg.Warn( |
| 97 | "cannot fetch hash from slow remote peer", |
| 98 | zap.String("local-member-id", s.ID().String()), |
| 99 | zap.Int64("local-member-revision", rev), |
| 100 | zap.Int64("local-member-compact-revision", crev), |
| 101 | zap.Uint32("local-member-hash", h), |
| 102 | zap.String("remote-peer-id", p.id.String()), |
| 103 | zap.Strings("remote-peer-endpoints", p.eps), |
| 104 | zap.Error(err), |
| 105 | ) |
| 106 | } else { |
| 107 | plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error()) |
| 108 | } |
| 109 | case rpctypes.ErrCompacted: |
| 110 | if lg != nil { |
| 111 | lg.Warn( |
| 112 | "cannot fetch hash from remote peer; local member is behind", |
| 113 | zap.String("local-member-id", s.ID().String()), |
| 114 | zap.Int64("local-member-revision", rev), |
| 115 | zap.Int64("local-member-compact-revision", crev), |
| 116 | zap.Uint32("local-member-hash", h), |
| 117 | zap.String("remote-peer-id", p.id.String()), |
| 118 | zap.Strings("remote-peer-endpoints", p.eps), |
| 119 | zap.Error(err), |
| 120 | ) |
| 121 | } else { |
| 122 | plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error()) |
| 123 | } |
| 124 | } |
| 125 | } |
| 126 | } |
| 127 | if mismatch > 0 { |
| 128 | return fmt.Errorf("%s found data inconsistency with peers", s.ID()) |
| 129 | } |
| 130 | |
| 131 | if lg != nil { |
| 132 | lg.Info( |
| 133 | "initial corruption checking passed; no corruption", |
| 134 | zap.String("local-member-id", s.ID().String()), |
| 135 | ) |
| 136 | } else { |
| 137 | plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID()) |
| 138 | } |
| 139 | return nil |
| 140 | } |
| 141 | |
| 142 | func (s *EtcdServer) monitorKVHash() { |
| 143 | t := s.Cfg.CorruptCheckTime |
| 144 | if t == 0 { |
| 145 | return |
| 146 | } |
| 147 | |
| 148 | lg := s.getLogger() |
| 149 | if lg != nil { |
| 150 | lg.Info( |
| 151 | "enabled corruption checking", |
| 152 | zap.String("local-member-id", s.ID().String()), |
| 153 | zap.Duration("interval", t), |
| 154 | ) |
| 155 | } else { |
| 156 | plog.Infof("enabled corruption checking with %s interval", t) |
| 157 | } |
| 158 | |
| 159 | for { |
| 160 | select { |
| 161 | case <-s.stopping: |
| 162 | return |
| 163 | case <-time.After(t): |
| 164 | } |
| 165 | if !s.isLeader() { |
| 166 | continue |
| 167 | } |
| 168 | if err := s.checkHashKV(); err != nil { |
| 169 | if lg != nil { |
| 170 | lg.Warn("failed to check hash KV", zap.Error(err)) |
| 171 | } else { |
| 172 | plog.Debugf("check hash kv failed %v", err) |
| 173 | } |
| 174 | } |
| 175 | } |
| 176 | } |
| 177 | |
| 178 | func (s *EtcdServer) checkHashKV() error { |
| 179 | lg := s.getLogger() |
| 180 | |
| 181 | h, rev, crev, err := s.kv.HashByRev(0) |
| 182 | if err != nil { |
| 183 | return err |
| 184 | } |
| 185 | peers := s.getPeerHashKVs(rev) |
| 186 | |
| 187 | ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) |
| 188 | err = s.linearizableReadNotify(ctx) |
| 189 | cancel() |
| 190 | if err != nil { |
| 191 | return err |
| 192 | } |
| 193 | |
| 194 | h2, rev2, crev2, err := s.kv.HashByRev(0) |
| 195 | if err != nil { |
| 196 | return err |
| 197 | } |
| 198 | |
| 199 | alarmed := false |
| 200 | mismatch := func(id uint64) { |
| 201 | if alarmed { |
| 202 | return |
| 203 | } |
| 204 | alarmed = true |
| 205 | a := &pb.AlarmRequest{ |
| 206 | MemberID: id, |
| 207 | Action: pb.AlarmRequest_ACTIVATE, |
| 208 | Alarm: pb.AlarmType_CORRUPT, |
| 209 | } |
| 210 | s.goAttach(func() { |
| 211 | s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a}) |
| 212 | }) |
| 213 | } |
| 214 | |
| 215 | if h2 != h && rev2 == rev && crev == crev2 { |
| 216 | if lg != nil { |
| 217 | lg.Warn( |
| 218 | "found hash mismatch", |
| 219 | zap.Int64("revision-1", rev), |
| 220 | zap.Int64("compact-revision-1", crev), |
| 221 | zap.Uint32("hash-1", h), |
| 222 | zap.Int64("revision-2", rev2), |
| 223 | zap.Int64("compact-revision-2", crev2), |
| 224 | zap.Uint32("hash-2", h2), |
| 225 | ) |
| 226 | } else { |
| 227 | plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev) |
| 228 | } |
| 229 | mismatch(uint64(s.ID())) |
| 230 | } |
| 231 | |
| 232 | for _, p := range peers { |
| 233 | if p.resp == nil { |
| 234 | continue |
| 235 | } |
| 236 | id := p.resp.Header.MemberId |
| 237 | |
| 238 | // leader expects follower's latest revision less than or equal to leader's |
| 239 | if p.resp.Header.Revision > rev2 { |
| 240 | if lg != nil { |
| 241 | lg.Warn( |
| 242 | "revision from follower must be less than or equal to leader's", |
| 243 | zap.Int64("leader-revision", rev2), |
| 244 | zap.Int64("follower-revision", p.resp.Header.Revision), |
| 245 | zap.String("follower-peer-id", types.ID(id).String()), |
| 246 | ) |
| 247 | } else { |
| 248 | plog.Warningf( |
| 249 | "revision %d from member %v, expected at most %d", |
| 250 | p.resp.Header.Revision, |
| 251 | types.ID(id), |
| 252 | rev2) |
| 253 | } |
| 254 | mismatch(id) |
| 255 | } |
| 256 | |
| 257 | // leader expects follower's latest compact revision less than or equal to leader's |
| 258 | if p.resp.CompactRevision > crev2 { |
| 259 | if lg != nil { |
| 260 | lg.Warn( |
| 261 | "compact revision from follower must be less than or equal to leader's", |
| 262 | zap.Int64("leader-compact-revision", crev2), |
| 263 | zap.Int64("follower-compact-revision", p.resp.CompactRevision), |
| 264 | zap.String("follower-peer-id", types.ID(id).String()), |
| 265 | ) |
| 266 | } else { |
| 267 | plog.Warningf( |
| 268 | "compact revision %d from member %v, expected at most %d", |
| 269 | p.resp.CompactRevision, |
| 270 | types.ID(id), |
| 271 | crev2, |
| 272 | ) |
| 273 | } |
| 274 | mismatch(id) |
| 275 | } |
| 276 | |
| 277 | // follower's compact revision is leader's old one, then hashes must match |
| 278 | if p.resp.CompactRevision == crev && p.resp.Hash != h { |
| 279 | if lg != nil { |
| 280 | lg.Warn( |
| 281 | "same compact revision then hashes must match", |
| 282 | zap.Int64("leader-compact-revision", crev2), |
| 283 | zap.Uint32("leader-hash", h), |
| 284 | zap.Int64("follower-compact-revision", p.resp.CompactRevision), |
| 285 | zap.Uint32("follower-hash", p.resp.Hash), |
| 286 | zap.String("follower-peer-id", types.ID(id).String()), |
| 287 | ) |
| 288 | } else { |
| 289 | plog.Warningf( |
| 290 | "hash %d at revision %d from member %v, expected hash %d", |
| 291 | p.resp.Hash, |
| 292 | rev, |
| 293 | types.ID(id), |
| 294 | h, |
| 295 | ) |
| 296 | } |
| 297 | mismatch(id) |
| 298 | } |
| 299 | } |
| 300 | return nil |
| 301 | } |
| 302 | |
// peerHashKVResp is the outcome of a HashKV request to one cluster peer.
// Exactly one of resp/err is meaningful: resp is set on the first
// endpoint that answered; err holds the last endpoint error otherwise.
type peerHashKVResp struct {
	id types.ID // member ID of the peer
	eps []string // the peer's advertised peer URLs

	resp *clientv3.HashKVResponse
	err error
}
| 310 | |
| 311 | func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) { |
| 312 | // TODO: handle the case when "s.cluster.Members" have not |
| 313 | // been populated (e.g. no snapshot to load from disk) |
| 314 | mbs := s.cluster.Members() |
| 315 | pss := make([]peerHashKVResp, len(mbs)) |
| 316 | for _, m := range mbs { |
| 317 | if m.ID == s.ID() { |
| 318 | continue |
| 319 | } |
| 320 | pss = append(pss, peerHashKVResp{id: m.ID, eps: m.PeerURLs}) |
| 321 | } |
| 322 | |
| 323 | lg := s.getLogger() |
| 324 | |
| 325 | for _, p := range pss { |
| 326 | if len(p.eps) == 0 { |
| 327 | continue |
| 328 | } |
| 329 | cli, cerr := clientv3.New(clientv3.Config{ |
| 330 | DialTimeout: s.Cfg.ReqTimeout(), |
| 331 | Endpoints: p.eps, |
| 332 | }) |
| 333 | if cerr != nil { |
| 334 | if lg != nil { |
| 335 | lg.Warn( |
| 336 | "failed to create client to peer URL", |
| 337 | zap.String("local-member-id", s.ID().String()), |
| 338 | zap.String("remote-peer-id", p.id.String()), |
| 339 | zap.Strings("remote-peer-endpoints", p.eps), |
| 340 | zap.Error(cerr), |
| 341 | ) |
| 342 | } else { |
| 343 | plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), p.eps, cerr.Error()) |
| 344 | } |
| 345 | continue |
| 346 | } |
| 347 | |
| 348 | respsLen := len(resps) |
| 349 | for _, c := range cli.Endpoints() { |
| 350 | ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) |
| 351 | var resp *clientv3.HashKVResponse |
| 352 | resp, cerr = cli.HashKV(ctx, c, rev) |
| 353 | cancel() |
| 354 | if cerr == nil { |
| 355 | resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: resp, err: nil}) |
| 356 | break |
| 357 | } |
| 358 | if lg != nil { |
| 359 | lg.Warn( |
| 360 | "failed hash kv request", |
| 361 | zap.String("local-member-id", s.ID().String()), |
| 362 | zap.Int64("requested-revision", rev), |
| 363 | zap.String("remote-peer-endpoint", c), |
| 364 | zap.Error(cerr), |
| 365 | ) |
| 366 | } else { |
| 367 | plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev) |
| 368 | } |
| 369 | } |
| 370 | cli.Close() |
| 371 | |
| 372 | if respsLen == len(resps) { |
| 373 | resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: nil, err: cerr}) |
| 374 | } |
| 375 | } |
| 376 | return resps |
| 377 | } |
| 378 | |
// applierV3Corrupt wraps an applierV3 and rejects every KV/lease
// operation with ErrCorrupt. It is installed once a CORRUPT alarm is
// active so a corrupted member stops serving mutations and reads.
type applierV3Corrupt struct {
	applierV3
}

// newApplierV3Corrupt returns an applier that fails all operations with
// ErrCorrupt, delegating anything not overridden to the wrapped applier.
func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }

// Put always fails with ErrCorrupt.
func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
	return nil, ErrCorrupt
}

// Range always fails with ErrCorrupt.
func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
	return nil, ErrCorrupt
}

// DeleteRange always fails with ErrCorrupt.
func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
	return nil, ErrCorrupt
}

// Txn always fails with ErrCorrupt.
func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
	return nil, ErrCorrupt
}

// Compaction always fails with ErrCorrupt.
func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
	return nil, nil, ErrCorrupt
}

// LeaseGrant always fails with ErrCorrupt.
func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
	return nil, ErrCorrupt
}

// LeaseRevoke always fails with ErrCorrupt.
func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
	return nil, ErrCorrupt
}