blob: 77eedfccbad7ab5ec53035dc3a4da298f930472e [file] [log] [blame]
Takahiro Suzukid7bf8202020-12-17 20:21:59 +09001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package raft
16
17import (
18 "fmt"
19 "log"
20
21 pb "go.etcd.io/etcd/raft/raftpb"
22)
23
24type raftLog struct {
25 // storage contains all stable entries since the last snapshot.
26 storage Storage
27
28 // unstable contains all unstable entries and snapshot.
29 // they will be saved into storage.
30 unstable unstable
31
32 // committed is the highest log position that is known to be in
33 // stable storage on a quorum of nodes.
34 committed uint64
35 // applied is the highest log position that the application has
36 // been instructed to apply to its state machine.
37 // Invariant: applied <= committed
38 applied uint64
39
40 logger Logger
41
42 // maxNextEntsSize is the maximum number aggregate byte size of the messages
43 // returned from calls to nextEnts.
44 maxNextEntsSize uint64
45}
46
47// newLog returns log using the given storage and default options. It
48// recovers the log to the state that it just commits and applies the
49// latest snapshot.
50func newLog(storage Storage, logger Logger) *raftLog {
51 return newLogWithSize(storage, logger, noLimit)
52}
53
54// newLogWithSize returns a log using the given storage and max
55// message size.
56func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog {
57 if storage == nil {
58 log.Panic("storage must not be nil")
59 }
60 log := &raftLog{
61 storage: storage,
62 logger: logger,
63 maxNextEntsSize: maxNextEntsSize,
64 }
65 firstIndex, err := storage.FirstIndex()
66 if err != nil {
67 panic(err) // TODO(bdarnell)
68 }
69 lastIndex, err := storage.LastIndex()
70 if err != nil {
71 panic(err) // TODO(bdarnell)
72 }
73 log.unstable.offset = lastIndex + 1
74 log.unstable.logger = logger
75 // Initialize our committed and applied pointers to the time of the last compaction.
76 log.committed = firstIndex - 1
77 log.applied = firstIndex - 1
78
79 return log
80}
81
82func (l *raftLog) String() string {
83 return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
84}
85
86// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
87// it returns (last index of new entries, true).
88func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
89 if l.matchTerm(index, logTerm) {
90 lastnewi = index + uint64(len(ents))
91 ci := l.findConflict(ents)
92 switch {
93 case ci == 0:
94 case ci <= l.committed:
95 l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
96 default:
97 offset := index + 1
98 l.append(ents[ci-offset:]...)
99 }
100 l.commitTo(min(committed, lastnewi))
101 return lastnewi, true
102 }
103 return 0, false
104}
105
106func (l *raftLog) append(ents ...pb.Entry) uint64 {
107 if len(ents) == 0 {
108 return l.lastIndex()
109 }
110 if after := ents[0].Index - 1; after < l.committed {
111 l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
112 }
113 l.unstable.truncateAndAppend(ents)
114 return l.lastIndex()
115}
116
117// findConflict finds the index of the conflict.
118// It returns the first pair of conflicting entries between the existing
119// entries and the given entries, if there are any.
120// If there is no conflicting entries, and the existing entries contains
121// all the given entries, zero will be returned.
122// If there is no conflicting entries, but the given entries contains new
123// entries, the index of the first new entry will be returned.
124// An entry is considered to be conflicting if it has the same index but
125// a different term.
126// The first entry MUST have an index equal to the argument 'from'.
127// The index of the given entries MUST be continuously increasing.
128func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
129 for _, ne := range ents {
130 if !l.matchTerm(ne.Index, ne.Term) {
131 if ne.Index <= l.lastIndex() {
132 l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
133 ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
134 }
135 return ne.Index
136 }
137 }
138 return 0
139}
140
141func (l *raftLog) unstableEntries() []pb.Entry {
142 if len(l.unstable.entries) == 0 {
143 return nil
144 }
145 return l.unstable.entries
146}
147
148// nextEnts returns all the available entries for execution.
149// If applied is smaller than the index of snapshot, it returns all committed
150// entries after the index of snapshot.
151func (l *raftLog) nextEnts() (ents []pb.Entry) {
152 off := max(l.applied+1, l.firstIndex())
153 if l.committed+1 > off {
154 ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize)
155 if err != nil {
156 l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
157 }
158 return ents
159 }
160 return nil
161}
162
163// hasNextEnts returns if there is any available entries for execution. This
164// is a fast check without heavy raftLog.slice() in raftLog.nextEnts().
165func (l *raftLog) hasNextEnts() bool {
166 off := max(l.applied+1, l.firstIndex())
167 return l.committed+1 > off
168}
169
170func (l *raftLog) snapshot() (pb.Snapshot, error) {
171 if l.unstable.snapshot != nil {
172 return *l.unstable.snapshot, nil
173 }
174 return l.storage.Snapshot()
175}
176
177func (l *raftLog) firstIndex() uint64 {
178 if i, ok := l.unstable.maybeFirstIndex(); ok {
179 return i
180 }
181 index, err := l.storage.FirstIndex()
182 if err != nil {
183 panic(err) // TODO(bdarnell)
184 }
185 return index
186}
187
188func (l *raftLog) lastIndex() uint64 {
189 if i, ok := l.unstable.maybeLastIndex(); ok {
190 return i
191 }
192 i, err := l.storage.LastIndex()
193 if err != nil {
194 panic(err) // TODO(bdarnell)
195 }
196 return i
197}
198
199func (l *raftLog) commitTo(tocommit uint64) {
200 // never decrease commit
201 if l.committed < tocommit {
202 if l.lastIndex() < tocommit {
203 l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
204 }
205 l.committed = tocommit
206 }
207}
208
209func (l *raftLog) appliedTo(i uint64) {
210 if i == 0 {
211 return
212 }
213 if l.committed < i || i < l.applied {
214 l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
215 }
216 l.applied = i
217}
218
219func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
220
221func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
222
223func (l *raftLog) lastTerm() uint64 {
224 t, err := l.term(l.lastIndex())
225 if err != nil {
226 l.logger.Panicf("unexpected error when getting the last term (%v)", err)
227 }
228 return t
229}
230
231func (l *raftLog) term(i uint64) (uint64, error) {
232 // the valid term range is [index of dummy entry, last index]
233 dummyIndex := l.firstIndex() - 1
234 if i < dummyIndex || i > l.lastIndex() {
235 // TODO: return an error instead?
236 return 0, nil
237 }
238
239 if t, ok := l.unstable.maybeTerm(i); ok {
240 return t, nil
241 }
242
243 t, err := l.storage.Term(i)
244 if err == nil {
245 return t, nil
246 }
247 if err == ErrCompacted || err == ErrUnavailable {
248 return 0, err
249 }
250 panic(err) // TODO(bdarnell)
251}
252
253func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) {
254 if i > l.lastIndex() {
255 return nil, nil
256 }
257 return l.slice(i, l.lastIndex()+1, maxsize)
258}
259
260// allEntries returns all entries in the log.
261func (l *raftLog) allEntries() []pb.Entry {
262 ents, err := l.entries(l.firstIndex(), noLimit)
263 if err == nil {
264 return ents
265 }
266 if err == ErrCompacted { // try again if there was a racing compaction
267 return l.allEntries()
268 }
269 // TODO (xiangli): handle error?
270 panic(err)
271}
272
273// isUpToDate determines if the given (lastIndex,term) log is more up-to-date
274// by comparing the index and term of the last entries in the existing logs.
275// If the logs have last entries with different terms, then the log with the
276// later term is more up-to-date. If the logs end with the same term, then
277// whichever log has the larger lastIndex is more up-to-date. If the logs are
278// the same, the given log is up-to-date.
279func (l *raftLog) isUpToDate(lasti, term uint64) bool {
280 return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
281}
282
283func (l *raftLog) matchTerm(i, term uint64) bool {
284 t, err := l.term(i)
285 if err != nil {
286 return false
287 }
288 return t == term
289}
290
291func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
292 if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
293 l.commitTo(maxIndex)
294 return true
295 }
296 return false
297}
298
299func (l *raftLog) restore(s pb.Snapshot) {
300 l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
301 l.committed = s.Metadata.Index
302 l.unstable.restore(s)
303}
304
305// slice returns a slice of log entries from lo through hi-1, inclusive.
306func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
307 err := l.mustCheckOutOfBounds(lo, hi)
308 if err != nil {
309 return nil, err
310 }
311 if lo == hi {
312 return nil, nil
313 }
314 var ents []pb.Entry
315 if lo < l.unstable.offset {
316 storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize)
317 if err == ErrCompacted {
318 return nil, err
319 } else if err == ErrUnavailable {
320 l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
321 } else if err != nil {
322 panic(err) // TODO(bdarnell)
323 }
324
325 // check if ents has reached the size limitation
326 if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo {
327 return storedEnts, nil
328 }
329
330 ents = storedEnts
331 }
332 if hi > l.unstable.offset {
333 unstable := l.unstable.slice(max(lo, l.unstable.offset), hi)
334 if len(ents) > 0 {
335 combined := make([]pb.Entry, len(ents)+len(unstable))
336 n := copy(combined, ents)
337 copy(combined[n:], unstable)
338 ents = combined
339 } else {
340 ents = unstable
341 }
342 }
343 return limitSize(ents, maxSize), nil
344}
345
346// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
347func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
348 if lo > hi {
349 l.logger.Panicf("invalid slice %d > %d", lo, hi)
350 }
351 fi := l.firstIndex()
352 if lo < fi {
353 return ErrCompacted
354 }
355
356 length := l.lastIndex() + 1 - fi
357 if lo < fi || hi > fi+length {
358 l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
359 }
360 return nil
361}
362
363func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 {
364 if err == nil {
365 return t
366 }
367 if err == ErrCompacted {
368 return 0
369 }
370 l.logger.Panicf("unexpected error (%v)", err)
371 return 0
372}