blob: c3036d3c90dd219d174998f06154831d87f5186a [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package raft
16
17import (
18 "fmt"
19 "log"
20
21 pb "github.com/coreos/etcd/raft/raftpb"
22)
23
// raftLog is the Raft log. It is split into a persisted prefix, read through
// storage, and a not-yet-persisted suffix (entries and possibly a snapshot)
// held in unstable; entries at or after unstable.offset come from unstable.
type raftLog struct {
	// storage contains all stable entries since the last snapshot.
	storage Storage

	// unstable contains all unstable entries and snapshot.
	// they will be saved into storage.
	unstable unstable

	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	committed uint64
	// applied is the highest log position that the application has
	// been instructed to apply to its state machine.
	// Invariant: applied <= committed
	applied uint64

	// logger receives informational messages and is used to report
	// invariant violations via Panicf.
	logger Logger
}
42
43// newLog returns log using the given storage. It recovers the log to the state
44// that it just commits and applies the latest snapshot.
45func newLog(storage Storage, logger Logger) *raftLog {
46 if storage == nil {
47 log.Panic("storage must not be nil")
48 }
49 log := &raftLog{
50 storage: storage,
51 logger: logger,
52 }
53 firstIndex, err := storage.FirstIndex()
54 if err != nil {
55 panic(err) // TODO(bdarnell)
56 }
57 lastIndex, err := storage.LastIndex()
58 if err != nil {
59 panic(err) // TODO(bdarnell)
60 }
61 log.unstable.offset = lastIndex + 1
62 log.unstable.logger = logger
63 // Initialize our committed and applied pointers to the time of the last compaction.
64 log.committed = firstIndex - 1
65 log.applied = firstIndex - 1
66
67 return log
68}
69
70func (l *raftLog) String() string {
71 return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
72}
73
74// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
75// it returns (last index of new entries, true).
76func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
77 if l.matchTerm(index, logTerm) {
78 lastnewi = index + uint64(len(ents))
79 ci := l.findConflict(ents)
80 switch {
81 case ci == 0:
82 case ci <= l.committed:
83 l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
84 default:
85 offset := index + 1
86 l.append(ents[ci-offset:]...)
87 }
88 l.commitTo(min(committed, lastnewi))
89 return lastnewi, true
90 }
91 return 0, false
92}
93
94func (l *raftLog) append(ents ...pb.Entry) uint64 {
95 if len(ents) == 0 {
96 return l.lastIndex()
97 }
98 if after := ents[0].Index - 1; after < l.committed {
99 l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
100 }
101 l.unstable.truncateAndAppend(ents)
102 return l.lastIndex()
103}
104
105// findConflict finds the index of the conflict.
106// It returns the first pair of conflicting entries between the existing
107// entries and the given entries, if there are any.
108// If there is no conflicting entries, and the existing entries contains
109// all the given entries, zero will be returned.
110// If there is no conflicting entries, but the given entries contains new
111// entries, the index of the first new entry will be returned.
112// An entry is considered to be conflicting if it has the same index but
113// a different term.
114// The first entry MUST have an index equal to the argument 'from'.
115// The index of the given entries MUST be continuously increasing.
116func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
117 for _, ne := range ents {
118 if !l.matchTerm(ne.Index, ne.Term) {
119 if ne.Index <= l.lastIndex() {
120 l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
121 ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
122 }
123 return ne.Index
124 }
125 }
126 return 0
127}
128
129func (l *raftLog) unstableEntries() []pb.Entry {
130 if len(l.unstable.entries) == 0 {
131 return nil
132 }
133 return l.unstable.entries
134}
135
136// nextEnts returns all the available entries for execution.
137// If applied is smaller than the index of snapshot, it returns all committed
138// entries after the index of snapshot.
139func (l *raftLog) nextEnts() (ents []pb.Entry) {
140 off := max(l.applied+1, l.firstIndex())
141 if l.committed+1 > off {
142 ents, err := l.slice(off, l.committed+1, noLimit)
143 if err != nil {
144 l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
145 }
146 return ents
147 }
148 return nil
149}
150
151// hasNextEnts returns if there is any available entries for execution. This
152// is a fast check without heavy raftLog.slice() in raftLog.nextEnts().
153func (l *raftLog) hasNextEnts() bool {
154 off := max(l.applied+1, l.firstIndex())
155 return l.committed+1 > off
156}
157
158func (l *raftLog) snapshot() (pb.Snapshot, error) {
159 if l.unstable.snapshot != nil {
160 return *l.unstable.snapshot, nil
161 }
162 return l.storage.Snapshot()
163}
164
165func (l *raftLog) firstIndex() uint64 {
166 if i, ok := l.unstable.maybeFirstIndex(); ok {
167 return i
168 }
169 index, err := l.storage.FirstIndex()
170 if err != nil {
171 panic(err) // TODO(bdarnell)
172 }
173 return index
174}
175
176func (l *raftLog) lastIndex() uint64 {
177 if i, ok := l.unstable.maybeLastIndex(); ok {
178 return i
179 }
180 i, err := l.storage.LastIndex()
181 if err != nil {
182 panic(err) // TODO(bdarnell)
183 }
184 return i
185}
186
187func (l *raftLog) commitTo(tocommit uint64) {
188 // never decrease commit
189 if l.committed < tocommit {
190 if l.lastIndex() < tocommit {
191 l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
192 }
193 l.committed = tocommit
194 }
195}
196
197func (l *raftLog) appliedTo(i uint64) {
198 if i == 0 {
199 return
200 }
201 if l.committed < i || i < l.applied {
202 l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
203 }
204 l.applied = i
205}
206
207func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
208
209func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
210
211func (l *raftLog) lastTerm() uint64 {
212 t, err := l.term(l.lastIndex())
213 if err != nil {
214 l.logger.Panicf("unexpected error when getting the last term (%v)", err)
215 }
216 return t
217}
218
219func (l *raftLog) term(i uint64) (uint64, error) {
220 // the valid term range is [index of dummy entry, last index]
221 dummyIndex := l.firstIndex() - 1
222 if i < dummyIndex || i > l.lastIndex() {
223 // TODO: return an error instead?
224 return 0, nil
225 }
226
227 if t, ok := l.unstable.maybeTerm(i); ok {
228 return t, nil
229 }
230
231 t, err := l.storage.Term(i)
232 if err == nil {
233 return t, nil
234 }
235 if err == ErrCompacted || err == ErrUnavailable {
236 return 0, err
237 }
238 panic(err) // TODO(bdarnell)
239}
240
241func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) {
242 if i > l.lastIndex() {
243 return nil, nil
244 }
245 return l.slice(i, l.lastIndex()+1, maxsize)
246}
247
248// allEntries returns all entries in the log.
249func (l *raftLog) allEntries() []pb.Entry {
250 ents, err := l.entries(l.firstIndex(), noLimit)
251 if err == nil {
252 return ents
253 }
254 if err == ErrCompacted { // try again if there was a racing compaction
255 return l.allEntries()
256 }
257 // TODO (xiangli): handle error?
258 panic(err)
259}
260
261// isUpToDate determines if the given (lastIndex,term) log is more up-to-date
262// by comparing the index and term of the last entries in the existing logs.
263// If the logs have last entries with different terms, then the log with the
264// later term is more up-to-date. If the logs end with the same term, then
265// whichever log has the larger lastIndex is more up-to-date. If the logs are
266// the same, the given log is up-to-date.
267func (l *raftLog) isUpToDate(lasti, term uint64) bool {
268 return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
269}
270
271func (l *raftLog) matchTerm(i, term uint64) bool {
272 t, err := l.term(i)
273 if err != nil {
274 return false
275 }
276 return t == term
277}
278
279func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
280 if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
281 l.commitTo(maxIndex)
282 return true
283 }
284 return false
285}
286
287func (l *raftLog) restore(s pb.Snapshot) {
288 l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
289 l.committed = s.Metadata.Index
290 l.unstable.restore(s)
291}
292
293// slice returns a slice of log entries from lo through hi-1, inclusive.
294func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
295 err := l.mustCheckOutOfBounds(lo, hi)
296 if err != nil {
297 return nil, err
298 }
299 if lo == hi {
300 return nil, nil
301 }
302 var ents []pb.Entry
303 if lo < l.unstable.offset {
304 storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize)
305 if err == ErrCompacted {
306 return nil, err
307 } else if err == ErrUnavailable {
308 l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
309 } else if err != nil {
310 panic(err) // TODO(bdarnell)
311 }
312
313 // check if ents has reached the size limitation
314 if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo {
315 return storedEnts, nil
316 }
317
318 ents = storedEnts
319 }
320 if hi > l.unstable.offset {
321 unstable := l.unstable.slice(max(lo, l.unstable.offset), hi)
322 if len(ents) > 0 {
323 ents = append([]pb.Entry{}, ents...)
324 ents = append(ents, unstable...)
325 } else {
326 ents = unstable
327 }
328 }
329 return limitSize(ents, maxSize), nil
330}
331
332// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
333func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
334 if lo > hi {
335 l.logger.Panicf("invalid slice %d > %d", lo, hi)
336 }
337 fi := l.firstIndex()
338 if lo < fi {
339 return ErrCompacted
340 }
341
342 length := l.lastIndex() + 1 - fi
343 if lo < fi || hi > fi+length {
344 l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
345 }
346 return nil
347}
348
349func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 {
350 if err == nil {
351 return t
352 }
353 if err == ErrCompacted {
354 return 0
355 }
356 l.logger.Panicf("unexpected error (%v)", err)
357 return 0
358}