Takahiro Suzuki | d7bf820 | 2020-12-17 20:21:59 +0900 | [diff] [blame^] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package raft |
| 16 | |
| 17 | import ( |
| 18 | "fmt" |
| 19 | "log" |
| 20 | |
| 21 | pb "go.etcd.io/etcd/raft/raftpb" |
| 22 | ) |
| 23 | |
| 24 | type raftLog struct { |
| 25 | // storage contains all stable entries since the last snapshot. |
| 26 | storage Storage |
| 27 | |
| 28 | // unstable contains all unstable entries and snapshot. |
| 29 | // they will be saved into storage. |
| 30 | unstable unstable |
| 31 | |
| 32 | // committed is the highest log position that is known to be in |
| 33 | // stable storage on a quorum of nodes. |
| 34 | committed uint64 |
| 35 | // applied is the highest log position that the application has |
| 36 | // been instructed to apply to its state machine. |
| 37 | // Invariant: applied <= committed |
| 38 | applied uint64 |
| 39 | |
| 40 | logger Logger |
| 41 | |
| 42 | // maxNextEntsSize is the maximum number aggregate byte size of the messages |
| 43 | // returned from calls to nextEnts. |
| 44 | maxNextEntsSize uint64 |
| 45 | } |
| 46 | |
| 47 | // newLog returns log using the given storage and default options. It |
| 48 | // recovers the log to the state that it just commits and applies the |
| 49 | // latest snapshot. |
| 50 | func newLog(storage Storage, logger Logger) *raftLog { |
| 51 | return newLogWithSize(storage, logger, noLimit) |
| 52 | } |
| 53 | |
| 54 | // newLogWithSize returns a log using the given storage and max |
| 55 | // message size. |
| 56 | func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog { |
| 57 | if storage == nil { |
| 58 | log.Panic("storage must not be nil") |
| 59 | } |
| 60 | log := &raftLog{ |
| 61 | storage: storage, |
| 62 | logger: logger, |
| 63 | maxNextEntsSize: maxNextEntsSize, |
| 64 | } |
| 65 | firstIndex, err := storage.FirstIndex() |
| 66 | if err != nil { |
| 67 | panic(err) // TODO(bdarnell) |
| 68 | } |
| 69 | lastIndex, err := storage.LastIndex() |
| 70 | if err != nil { |
| 71 | panic(err) // TODO(bdarnell) |
| 72 | } |
| 73 | log.unstable.offset = lastIndex + 1 |
| 74 | log.unstable.logger = logger |
| 75 | // Initialize our committed and applied pointers to the time of the last compaction. |
| 76 | log.committed = firstIndex - 1 |
| 77 | log.applied = firstIndex - 1 |
| 78 | |
| 79 | return log |
| 80 | } |
| 81 | |
| 82 | func (l *raftLog) String() string { |
| 83 | return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries)) |
| 84 | } |
| 85 | |
| 86 | // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, |
| 87 | // it returns (last index of new entries, true). |
| 88 | func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { |
| 89 | if l.matchTerm(index, logTerm) { |
| 90 | lastnewi = index + uint64(len(ents)) |
| 91 | ci := l.findConflict(ents) |
| 92 | switch { |
| 93 | case ci == 0: |
| 94 | case ci <= l.committed: |
| 95 | l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) |
| 96 | default: |
| 97 | offset := index + 1 |
| 98 | l.append(ents[ci-offset:]...) |
| 99 | } |
| 100 | l.commitTo(min(committed, lastnewi)) |
| 101 | return lastnewi, true |
| 102 | } |
| 103 | return 0, false |
| 104 | } |
| 105 | |
| 106 | func (l *raftLog) append(ents ...pb.Entry) uint64 { |
| 107 | if len(ents) == 0 { |
| 108 | return l.lastIndex() |
| 109 | } |
| 110 | if after := ents[0].Index - 1; after < l.committed { |
| 111 | l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) |
| 112 | } |
| 113 | l.unstable.truncateAndAppend(ents) |
| 114 | return l.lastIndex() |
| 115 | } |
| 116 | |
| 117 | // findConflict finds the index of the conflict. |
| 118 | // It returns the first pair of conflicting entries between the existing |
| 119 | // entries and the given entries, if there are any. |
| 120 | // If there is no conflicting entries, and the existing entries contains |
| 121 | // all the given entries, zero will be returned. |
| 122 | // If there is no conflicting entries, but the given entries contains new |
| 123 | // entries, the index of the first new entry will be returned. |
| 124 | // An entry is considered to be conflicting if it has the same index but |
| 125 | // a different term. |
| 126 | // The first entry MUST have an index equal to the argument 'from'. |
| 127 | // The index of the given entries MUST be continuously increasing. |
| 128 | func (l *raftLog) findConflict(ents []pb.Entry) uint64 { |
| 129 | for _, ne := range ents { |
| 130 | if !l.matchTerm(ne.Index, ne.Term) { |
| 131 | if ne.Index <= l.lastIndex() { |
| 132 | l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", |
| 133 | ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term) |
| 134 | } |
| 135 | return ne.Index |
| 136 | } |
| 137 | } |
| 138 | return 0 |
| 139 | } |
| 140 | |
| 141 | func (l *raftLog) unstableEntries() []pb.Entry { |
| 142 | if len(l.unstable.entries) == 0 { |
| 143 | return nil |
| 144 | } |
| 145 | return l.unstable.entries |
| 146 | } |
| 147 | |
| 148 | // nextEnts returns all the available entries for execution. |
| 149 | // If applied is smaller than the index of snapshot, it returns all committed |
| 150 | // entries after the index of snapshot. |
| 151 | func (l *raftLog) nextEnts() (ents []pb.Entry) { |
| 152 | off := max(l.applied+1, l.firstIndex()) |
| 153 | if l.committed+1 > off { |
| 154 | ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize) |
| 155 | if err != nil { |
| 156 | l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) |
| 157 | } |
| 158 | return ents |
| 159 | } |
| 160 | return nil |
| 161 | } |
| 162 | |
| 163 | // hasNextEnts returns if there is any available entries for execution. This |
| 164 | // is a fast check without heavy raftLog.slice() in raftLog.nextEnts(). |
| 165 | func (l *raftLog) hasNextEnts() bool { |
| 166 | off := max(l.applied+1, l.firstIndex()) |
| 167 | return l.committed+1 > off |
| 168 | } |
| 169 | |
| 170 | func (l *raftLog) snapshot() (pb.Snapshot, error) { |
| 171 | if l.unstable.snapshot != nil { |
| 172 | return *l.unstable.snapshot, nil |
| 173 | } |
| 174 | return l.storage.Snapshot() |
| 175 | } |
| 176 | |
| 177 | func (l *raftLog) firstIndex() uint64 { |
| 178 | if i, ok := l.unstable.maybeFirstIndex(); ok { |
| 179 | return i |
| 180 | } |
| 181 | index, err := l.storage.FirstIndex() |
| 182 | if err != nil { |
| 183 | panic(err) // TODO(bdarnell) |
| 184 | } |
| 185 | return index |
| 186 | } |
| 187 | |
| 188 | func (l *raftLog) lastIndex() uint64 { |
| 189 | if i, ok := l.unstable.maybeLastIndex(); ok { |
| 190 | return i |
| 191 | } |
| 192 | i, err := l.storage.LastIndex() |
| 193 | if err != nil { |
| 194 | panic(err) // TODO(bdarnell) |
| 195 | } |
| 196 | return i |
| 197 | } |
| 198 | |
| 199 | func (l *raftLog) commitTo(tocommit uint64) { |
| 200 | // never decrease commit |
| 201 | if l.committed < tocommit { |
| 202 | if l.lastIndex() < tocommit { |
| 203 | l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex()) |
| 204 | } |
| 205 | l.committed = tocommit |
| 206 | } |
| 207 | } |
| 208 | |
| 209 | func (l *raftLog) appliedTo(i uint64) { |
| 210 | if i == 0 { |
| 211 | return |
| 212 | } |
| 213 | if l.committed < i || i < l.applied { |
| 214 | l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) |
| 215 | } |
| 216 | l.applied = i |
| 217 | } |
| 218 | |
| 219 | func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) } |
| 220 | |
| 221 | func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } |
| 222 | |
| 223 | func (l *raftLog) lastTerm() uint64 { |
| 224 | t, err := l.term(l.lastIndex()) |
| 225 | if err != nil { |
| 226 | l.logger.Panicf("unexpected error when getting the last term (%v)", err) |
| 227 | } |
| 228 | return t |
| 229 | } |
| 230 | |
| 231 | func (l *raftLog) term(i uint64) (uint64, error) { |
| 232 | // the valid term range is [index of dummy entry, last index] |
| 233 | dummyIndex := l.firstIndex() - 1 |
| 234 | if i < dummyIndex || i > l.lastIndex() { |
| 235 | // TODO: return an error instead? |
| 236 | return 0, nil |
| 237 | } |
| 238 | |
| 239 | if t, ok := l.unstable.maybeTerm(i); ok { |
| 240 | return t, nil |
| 241 | } |
| 242 | |
| 243 | t, err := l.storage.Term(i) |
| 244 | if err == nil { |
| 245 | return t, nil |
| 246 | } |
| 247 | if err == ErrCompacted || err == ErrUnavailable { |
| 248 | return 0, err |
| 249 | } |
| 250 | panic(err) // TODO(bdarnell) |
| 251 | } |
| 252 | |
| 253 | func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) { |
| 254 | if i > l.lastIndex() { |
| 255 | return nil, nil |
| 256 | } |
| 257 | return l.slice(i, l.lastIndex()+1, maxsize) |
| 258 | } |
| 259 | |
| 260 | // allEntries returns all entries in the log. |
| 261 | func (l *raftLog) allEntries() []pb.Entry { |
| 262 | ents, err := l.entries(l.firstIndex(), noLimit) |
| 263 | if err == nil { |
| 264 | return ents |
| 265 | } |
| 266 | if err == ErrCompacted { // try again if there was a racing compaction |
| 267 | return l.allEntries() |
| 268 | } |
| 269 | // TODO (xiangli): handle error? |
| 270 | panic(err) |
| 271 | } |
| 272 | |
| 273 | // isUpToDate determines if the given (lastIndex,term) log is more up-to-date |
| 274 | // by comparing the index and term of the last entries in the existing logs. |
| 275 | // If the logs have last entries with different terms, then the log with the |
| 276 | // later term is more up-to-date. If the logs end with the same term, then |
| 277 | // whichever log has the larger lastIndex is more up-to-date. If the logs are |
| 278 | // the same, the given log is up-to-date. |
| 279 | func (l *raftLog) isUpToDate(lasti, term uint64) bool { |
| 280 | return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex()) |
| 281 | } |
| 282 | |
| 283 | func (l *raftLog) matchTerm(i, term uint64) bool { |
| 284 | t, err := l.term(i) |
| 285 | if err != nil { |
| 286 | return false |
| 287 | } |
| 288 | return t == term |
| 289 | } |
| 290 | |
| 291 | func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { |
| 292 | if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { |
| 293 | l.commitTo(maxIndex) |
| 294 | return true |
| 295 | } |
| 296 | return false |
| 297 | } |
| 298 | |
| 299 | func (l *raftLog) restore(s pb.Snapshot) { |
| 300 | l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term) |
| 301 | l.committed = s.Metadata.Index |
| 302 | l.unstable.restore(s) |
| 303 | } |
| 304 | |
| 305 | // slice returns a slice of log entries from lo through hi-1, inclusive. |
| 306 | func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) { |
| 307 | err := l.mustCheckOutOfBounds(lo, hi) |
| 308 | if err != nil { |
| 309 | return nil, err |
| 310 | } |
| 311 | if lo == hi { |
| 312 | return nil, nil |
| 313 | } |
| 314 | var ents []pb.Entry |
| 315 | if lo < l.unstable.offset { |
| 316 | storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize) |
| 317 | if err == ErrCompacted { |
| 318 | return nil, err |
| 319 | } else if err == ErrUnavailable { |
| 320 | l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset)) |
| 321 | } else if err != nil { |
| 322 | panic(err) // TODO(bdarnell) |
| 323 | } |
| 324 | |
| 325 | // check if ents has reached the size limitation |
| 326 | if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo { |
| 327 | return storedEnts, nil |
| 328 | } |
| 329 | |
| 330 | ents = storedEnts |
| 331 | } |
| 332 | if hi > l.unstable.offset { |
| 333 | unstable := l.unstable.slice(max(lo, l.unstable.offset), hi) |
| 334 | if len(ents) > 0 { |
| 335 | combined := make([]pb.Entry, len(ents)+len(unstable)) |
| 336 | n := copy(combined, ents) |
| 337 | copy(combined[n:], unstable) |
| 338 | ents = combined |
| 339 | } else { |
| 340 | ents = unstable |
| 341 | } |
| 342 | } |
| 343 | return limitSize(ents, maxSize), nil |
| 344 | } |
| 345 | |
| 346 | // l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries) |
| 347 | func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { |
| 348 | if lo > hi { |
| 349 | l.logger.Panicf("invalid slice %d > %d", lo, hi) |
| 350 | } |
| 351 | fi := l.firstIndex() |
| 352 | if lo < fi { |
| 353 | return ErrCompacted |
| 354 | } |
| 355 | |
| 356 | length := l.lastIndex() + 1 - fi |
| 357 | if lo < fi || hi > fi+length { |
| 358 | l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex()) |
| 359 | } |
| 360 | return nil |
| 361 | } |
| 362 | |
| 363 | func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 { |
| 364 | if err == nil { |
| 365 | return t |
| 366 | } |
| 367 | if err == ErrCompacted { |
| 368 | return 0 |
| 369 | } |
| 370 | l.logger.Panicf("unexpected error (%v)", err) |
| 371 | return 0 |
| 372 | } |